본문 바로가기

공부/Apache Flink

[Apache Flink] CEP-Complex Event Processing

728x90
반응형

○ CEP ?

내용높은 빈도로 발생하면서 낮은 지연 시간을 요구하는 이벤트의 스트림을 분석합니다.

CEP는 이벤트, 하위 이벤트 및 시쿼스의 스트림에서 패턴을 파악할 수 있습니다.

서로 관련이 없는 이벤트들 사이에서 의미 있는 패턴 및 복잡한 관계를 알아내고, 실시간으로 알림을 전송하는

등 대응을 통해 피해를 사전에 막을 수 있도록 해준다.

 

○ Flink CEP 라이브러리 주요 구성 요소

1. 이벤트 스트림 ( Event Stream )

2. 패턴정의( Pattern definition )

3. 패턴 추적( Pattren detection )

4. 경보 조치 생성( Alert generation )



이벤트 스트림 ( Event Stream )

데이터를 입력받기 위한 이벤트 스트림을 이용하려면 이벤트에 대한 자바 POJO를 정의하는 것이다.
추상클래스를 정의하고 이를 상속받아 사용한다.

public abstract class MonitoringEvent {
	private String machineName;

	public String getMachineName() {
		return machineName;
	}

	public void setMachineName(String machineName) {
		this.machineName = machineName;
	}

	@Override
	public int hashCode() {
		final int prime = 31;
		int result = 1;
		result = prime * result + ((machineName == null) ? 0 : machineName.hashCode());
		return result;
	}

	@Override
	public boolean equals(Object obj) {
		if (this == obj)
			return true;
		if (obj == null)
			return false;
		if (getClass() != obj.getClass())
			return false;
		MonitoringEvent other = (MonitoringEvent) obj;
		if (machineName == null) {
			if (other.machineName != null)
				return false;
		} else if (!machineName.equals(other.machineName))
			return false;
		return true;
	}

	public MonitoringEvent(String machineName) {
		super();
		this.machineName = machineName;
	}
}

    ↑ 추상클래스 생성. hashCode()와 equals() 메서드를 구형해야한다. 

 

public class TemperatureEvent extends MonitoringEvent {

	public TemperatureEvent(String machineName) {
		super(machineName);
	}

	private double temperature;

	public double getTemperature() {
		return temperature;
	}

	public void setTemperature(double temperature) {
		this.temperature = temperature;
	}

	@Override
	public int hashCode() {
		final int prime = 31;
		int result = super.hashCode();
		long temp;
		temp = Double.doubleToLongBits(temperature);
		result = prime * result + (int) (temp ^ (temp >>> 32));
		return result;
	}

	@Override
	public boolean equals(Object obj) {
		if (this == obj)
			return true;
		if (!super.equals(obj))
			return false;
		if (getClass() != obj.getClass())
			return false;
		TemperatureEvent other = (TemperatureEvent) obj;
		if (Double.doubleToLongBits(temperature) != Double.doubleToLongBits(other.temperature))
			return false;
		return true;
	}

	public TemperatureEvent(String machineName, double temperature) {
		super(machineName);
		this.temperature = temperature;
	}

	@Override
	public String toString() {
		return "TemperatureEvent [getTemperature()=" + getTemperature() + ", getMachineName()=" + getMachineName() + "]";
	}
}

    ↑ 추상클래스를 상속받아 POJO를 생성

 



Pattern API

 

Pattern API 를 이용하여 이벤트 패턴을 쉽게 정의하고 분석이 가능하다.  Pattern은 여러 개의 state 로 구성되고 하나의 state에서 다른 state로 넘어가기 위해 condition 정의해야한다.

condition은 이벤트를 지속하게 할 수도 있고 필터링할 수도 있다.

 

▷ 시작(Begin)

초기 state를 정의한다.

Pettern(Event, ?> stat = Pattren.<Event>begin(“start”);

 

▷ Filter

필터 조건을 명시한다.

Start.where(new FilterFunction<Event>() {
    @Override
    public boolean filter(SubEvent value) {
        return ... // 조건문
    }
});

 

Subtype

Subtype() 메서드를 이용해 이벤트를 필터링할 수도 있다.

Start.subtype(SubEvent.class).where(new FilterFunction<SubEvent>() {
    @Override
    public boolean filter(SubEvent value) {
        return ... // 조건문
    }
});

 

OR / AND

ORAND 연산자를 이용ㅇ하여 여러 개의 컨디션을 한꺼번에 정의할 수 있다.

start.where(new FilterFunction<Event>() {
    @Override
    public boolean filter(Event value) {
        return ... // 조건문 1
    }
}).or(new FilterFunction<Event>() {
    @Override
    public boolean filter(Event value) {
        return ... // 조건문 2
    };
});

 

컨티뉴이티 (  Continuity )

이벤트를 필터링 하지 않고 지속되어야 하는경우 사용.

 

    - 엄격한 컨티뉴이티 (Strict Contiguity )
        두 이벤트 사이에 다른 이벤트가 없어야 한다는 의미이고, 두 이벤트의 연속성을 뜻한다.

Pattern<Event, ?> strictNext = start.next("middle");

    - 엄격하지 않는 컨티뉴이티 ( Non-strict Contiguity, Relaxed Contiguity )
        두 이벤트 사이에 다른 이벤트가 있어도 된다.

Pattern<Event, ?> strictNext = start.followedBy("middle");

 

Within

시간 간격(Time interval)을 기반으로 패턴 매칭을 할 수 있다.

next.within(Time.seconds(30));

 

패턴의 추적

패턴에대해 스트림을 실행 시킨다.

CEP.pattern()을 실행시키면 PatternStream 리턴한다.

// ex) 온도가 10초 간격으로 26.0도보다 높은지 확인한다.
Pattern<TemperatureEvent, ?> warningPattern = Pattern.<TemperatureEvent>
    .begin("first")
    .subtype(TemperatureEvent.class).where(new FilterFunction<TemperatureEvent>() {
        public voolean filter(TemperatureEvent value) {
            if(value.getTemperature() >= 26.0) {
                return true;
            } 
            return false;
        }
    }).within(Time.seconds(10));

PatternStream<TemperatureEvent> patternStream = CEP.pattern(inputEventStream, warningPattern);

 

패턴의 선택

패턴 스트림에 대해 적절한 액션을 취해야 한다.

패턴에서 데이터를 선택하고자 할 때 selectflatSelect 메서드를 사용한다.

 

    - select

        Select 매서드를 사용하려면 PatternSelectionFunction을 구현해야 한다. Select 메서드는
        각 이벤트 시퀀스별로 존재하며, 패턴과 일치하는 이벤트에 대해 string/event쌍의 맵을 리턴한다.
        결과를 취합하기 위해서는 아웃풋 POJO를 정의해야한다.

class MyPatternSelectFunction<IN, OUT> implements PatternSelectFunction<IN, OUT> {
    @Override
    public OUT select(Map<String, IN> pattern) {
        IN startEvent = pattern.get("start");
        IN endEvent = pattern.get("end");
        return new OUT(startEvent, endEvent);
    }
}

    - flatSelect

        Flatselct 는 결과를 임의의 개수 만큼 리턴 할수 있다. Collector 파라미터를 가지고있다.

class MyPatternFlatSelectFunction<IN, OUT> implements PatternFlatSelectFunction<IN, OUT> {
    @Override
    public OUT select(Map<String, IN> pattern, Collector<OUT> collector) {
        IN startEvent = pattern.get("start");
        IN endEvent = pattern.get("end");
        
        for (int i=0; i<startEvent.getValue(); i++) {
            collector.collect(new OUT(startEvent, endEvent));
        }
    }
}

 

타임아웃 패턴 처리

일정 크기 만큼 시간을 잡는 형태로 패턴에 제약을 두면, 종종 이벤트를 놓치는 경우가 발생한다.

타임아웃이 일어난 이벤트에 대해 어떤 액션을 취하고 싶을때, selectflatselect 메서드에서

타임아웃을 처리할 수 있도록 해준다. patternSelectFunctionpatternTimeoutFunction 2개의 파라미터를 포함한다.

PatternStream<Event> patternStream = CEP.pattern(input, pattern);

DataStream<Either<TimeoutEvent, ComplexEvent>> result = 
    patternStrem.select(
        new PatternTimeoutFunction<Event, TimeoutEvent>() { ... },
        new PatternSelectFunction<Event, ComplexEvent>() { ... }
    );

DataStream<Either<TimeoutEvent, ComplexEvent>> result = 
    patternStrem.flatSelect(
        new PatternFlatTimeoutFunction<Event, TimeoutEvent>() { ... },
        new PatternFlatSelectFunction<Event, ComplexEvent>() { ... }
    );


▶ 마무리

    Apache Flink 에서 사용하는 CEP API에 대해 간단하게 알아봤습니다.

    다음에는 예제를 통해 API를 활용해보겠습니다.

728x90
반응형