○ CEP 란?
내용높은 빈도로 발생하면서 낮은 지연 시간을 요구하는 이벤트의 스트림을 분석합니다.
CEP는 이벤트, 하위 이벤트 및 시쿼스의 스트림에서 패턴을 파악할 수 있습니다.
서로 관련이 없는 이벤트들 사이에서 의미 있는 패턴 및 복잡한 관계를 알아내고, 실시간으로 알림을 전송하는
등 대응을 통해 피해를 사전에 막을 수 있도록 해준다.
○ Flink CEP 라이브러리 주요 구성 요소
1. 이벤트 스트림 ( Event Stream )
2. 패턴정의( Pattern definition )
3. 패턴 추적( Pattren detection )
4. 경보 조치 생성( Alert generation )
● 이벤트 스트림 ( Event Stream )
데이터를 입력받기 위한 이벤트 스트림을 이용하려면 이벤트에 대한 자바 POJO를 정의하는 것이다.
추상클래스를 정의하고 이를 상속받아 사용한다.
public abstract class MonitoringEvent {
private String machineName;
public String getMachineName() {
return machineName;
}
public void setMachineName(String machineName) {
this.machineName = machineName;
}
@Override
public int hashCode() {
final int prime = 31;
int result = 1;
result = prime * result + ((machineName == null) ? 0 : machineName.hashCode());
return result;
}
@Override
public boolean equals(Object obj) {
if (this == obj)
return true;
if (obj == null)
return false;
if (getClass() != obj.getClass())
return false;
MonitoringEvent other = (MonitoringEvent) obj;
if (machineName == null) {
if (other.machineName != null)
return false;
} else if (!machineName.equals(other.machineName))
return false;
return true;
}
public MonitoringEvent(String machineName) {
super();
this.machineName = machineName;
}
}
↑ 추상클래스 생성. hashCode()와 equals() 메서드를 구형해야한다.
public class TemperatureEvent extends MonitoringEvent {
public TemperatureEvent(String machineName) {
super(machineName);
}
private double temperature;
public double getTemperature() {
return temperature;
}
public void setTemperature(double temperature) {
this.temperature = temperature;
}
@Override
public int hashCode() {
final int prime = 31;
int result = super.hashCode();
long temp;
temp = Double.doubleToLongBits(temperature);
result = prime * result + (int) (temp ^ (temp >>> 32));
return result;
}
@Override
public boolean equals(Object obj) {
if (this == obj)
return true;
if (!super.equals(obj))
return false;
if (getClass() != obj.getClass())
return false;
TemperatureEvent other = (TemperatureEvent) obj;
if (Double.doubleToLongBits(temperature) != Double.doubleToLongBits(other.temperature))
return false;
return true;
}
public TemperatureEvent(String machineName, double temperature) {
super(machineName);
this.temperature = temperature;
}
@Override
public String toString() {
return "TemperatureEvent [getTemperature()=" + getTemperature() + ", getMachineName()=" + getMachineName() + "]";
}
}
↑ 추상클래스를 상속받아 POJO를 생성
● Pattern API
Pattern API 를 이용하여 이벤트 패턴을 쉽게 정의하고 분석이 가능하다. 각 Pattern은 여러 개의 state 로 구성되고 하나의 state에서 다른 state로 넘어가기 위해 condition을 정의해야한다.
condition은 이벤트를 지속하게 할 수도 있고 필터링할 수도 있다.
▷ 시작(Begin)
초기 state를 정의한다.
Pettern(Event, ?> stat = Pattren.<Event>begin(“start”);
▷ Filter
필터 조건을 명시한다.
Start.where(new FilterFunction<Event>() {
@Override
public boolean filter(SubEvent value) {
return ... // 조건문
}
});
▷ Subtype
Subtype() 메서드를 이용해 이벤트를 필터링할 수도 있다.
Start.subtype(SubEvent.class).where(new FilterFunction<SubEvent>() {
@Override
public boolean filter(SubEvent value) {
return ... // 조건문
}
});
▷ OR / AND
OR과 AND 연산자를 이용ㅇ하여 여러 개의 컨디션을 한꺼번에 정의할 수 있다.
start.where(new FilterFunction<Event>() {
@Override
public boolean filter(Event value) {
return ... // 조건문 1
}
}).or(new FilterFunction<Event>() {
@Override
public boolean filter(Event value) {
return ... // 조건문 2
};
});
▷ 컨티뉴이티 ( Continuity )
이벤트를 필터링 하지 않고 지속되어야 하는경우 사용.
- 엄격한 컨티뉴이티 (Strict Contiguity )
두 이벤트 사이에 다른 이벤트가 없어야 한다는 의미이고, 두 이벤트의 연속성을 뜻한다.
Pattern<Event, ?> strictNext = start.next("middle");
- 엄격하지 않는 컨티뉴이티 ( Non-strict Contiguity, Relaxed Contiguity )
두 이벤트 사이에 다른 이벤트가 있어도 된다.
Pattern<Event, ?> strictNext = start.followedBy("middle");
▷ Within
시간 간격(Time interval)을 기반으로 패턴 매칭을 할 수 있다.
next.within(Time.seconds(30));
▷ 패턴의 추적
패턴에대해 스트림을 실행 시킨다.
CEP.pattern()을 실행시키면 PatternStream을 리턴한다.
// ex) 온도가 10초 간격으로 26.0도보다 높은지 확인한다.
Pattern<TemperatureEvent, ?> warningPattern = Pattern.<TemperatureEvent>
.begin("first")
.subtype(TemperatureEvent.class).where(new FilterFunction<TemperatureEvent>() {
public voolean filter(TemperatureEvent value) {
if(value.getTemperature() >= 26.0) {
return true;
}
return false;
}
}).within(Time.seconds(10));
PatternStream<TemperatureEvent> patternStream = CEP.pattern(inputEventStream, warningPattern);
▷ 패턴의 선택
패턴 스트림에 대해 적절한 액션을 취해야 한다.
패턴에서 데이터를 선택하고자 할 때 select와 flatSelect 메서드를 사용한다.
- select
Select 매서드를 사용하려면 PatternSelectionFunction을 구현해야 한다. Select 메서드는
각 이벤트 시퀀스별로 존재하며, 패턴과 일치하는 이벤트에 대해 string/event쌍의 맵을 리턴한다.
결과를 취합하기 위해서는 아웃풋 POJO를 정의해야한다.
class MyPatternSelectFunction<IN, OUT> implements PatternSelectFunction<IN, OUT> {
@Override
public OUT select(Map<String, IN> pattern) {
IN startEvent = pattern.get("start");
IN endEvent = pattern.get("end");
return new OUT(startEvent, endEvent);
}
}
- flatSelect
Flatselct 는 결과를 임의의 개수 만큼 리턴 할수 있다. Collector 파라미터를 가지고있다.
class MyPatternFlatSelectFunction<IN, OUT> implements PatternFlatSelectFunction<IN, OUT> {
@Override
public OUT select(Map<String, IN> pattern, Collector<OUT> collector) {
IN startEvent = pattern.get("start");
IN endEvent = pattern.get("end");
for (int i=0; i<startEvent.getValue(); i++) {
collector.collect(new OUT(startEvent, endEvent));
}
}
}
▷ 타임아웃 패턴 처리
일정 크기 만큼 시간을 잡는 형태로 패턴에 제약을 두면, 종종 이벤트를 놓치는 경우가 발생한다.
타임아웃이 일어난 이벤트에 대해 어떤 액션을 취하고 싶을때, select와 flatselect 메서드에서
타임아웃을 처리할 수 있도록 해준다. patternSelectFunction과 patternTimeoutFunction 2개의 파라미터를 포함한다.
PatternStream<Event> patternStream = CEP.pattern(input, pattern);
DataStream<Either<TimeoutEvent, ComplexEvent>> result =
patternStrem.select(
new PatternTimeoutFunction<Event, TimeoutEvent>() { ... },
new PatternSelectFunction<Event, ComplexEvent>() { ... }
);
DataStream<Either<TimeoutEvent, ComplexEvent>> result =
patternStrem.flatSelect(
new PatternFlatTimeoutFunction<Event, TimeoutEvent>() { ... },
new PatternFlatSelectFunction<Event, ComplexEvent>() { ... }
);
▶ 마무리
Apache Flink 에서 사용하는 CEP API에 대해 간단하게 알아봤습니다.
다음에는 예제를 통해 API를 활용해보겠습니다.
'공부 > Apache Flink' 카테고리의 다른 글
[Apache Flink] Standalone Cluster 구성 (0) | 2021.04.15 |
---|---|
[Apache Flink] DataStream API를 이용한 데이터 처리 (0) | 2021.03.30 |
[Apache Flink] 설치하기 (windows 10) (0) | 2021.03.16 |