○ Apache Flink DataStream API 라이브러리 주요 구성 요소
1.실행환경
2.데이터 소스
3.데이터 트랜스포메이션
4.데이터 싱크
5.커넥터
● 실행 환경 ( Execution Environment )
▷ 실행환경 구분
- 기존 플링크 환경 입력
- 로컬 실행환경 생성
- 원격 실행환경 생성
보통 실행시 getExecutionEnviroment() 를 사용한다.
그럼 IDE의 로컬에서 실행 중인 경우 로컬실행환경이 시작되고,
아닐 경우 클러스터 관리자가 분산형태로 실행한다.
직접 구분하고 싶으면
로컬 환경은 createLocalEnvironment() 를 사용.
원격은 createRemoteEnvironment(String host, int port, String, jar파일 등) 사용
● 데이터 소스 (Data Source)
데이터를 가져오는 역활
▷ 소켓기반
socketTextStream(hostName, port)
socketTextStream(hostName, port, delimiter) // 구분자
socketTextStream(hostName, port, delimiter, maxRetry) // 데이터 패치를 시도하는 최대 횟수
▷ 파일기반
readTextFile(Strinf filePath) // 텍스트 파일을 한 줄씩 순차적으로 읽는다
readFile(FileInputFormat<Out> inputFormat, String path) // 텍스트 파일이 아닌경우 사용
readFilesStream(String path, long intervalMills, FileMonitoringFunction.WatchType watchType)
// 파일스트림을 읽을때 사용
● 데이터 소스 (Data Source)
받아온 데이터를 다른 형태로 변환하는 작업
▷ Map
inputStream.map(new MapFunction<Integer, Integer>() {
@override
public Integer map(Integer value) throws Exception {
return 5 * value;
}
});
▷ FlatMap
inputStream.flotMap(new FlatMapFunction<String, String>() {
@Override
public Integer flatMap(String value, Collector<String> out) throws Exception {
for(String word : value.split(" ")) {
out.collect(word);
}
}
});
▷ Filter
결과가 true인 경우 레코드를 생성
inputStream.filter(new FilterFunction<Integer>() {
@override
public boolean filter(Integer value) throws Exception {
return value != 1;
}
});
▷ KeyBy
Key를 기반으로 스트림을 파티션하는 역할을 한다. 리턴값은 keyedDataStream.
inputStream.keyBy(“someKey”);
▷ Reduce
Key를 기반으로 스트림을 파티션하는 역할을 한다. 리턴값은 keyedDataStream.
inputStream.keyBy(“someKey”)
▷ Reduce
현재 값을 갖고 가장 마지막으로Reduce한 결과를 keyedDataStream으로 넘기는 역할.
keyedInputStream.reduce(new ReduceFunction<Integer>() {
@Override
public Integer reduce(Integer value1, Integer value2) throws Exception {
return value1 + value2;
}
});
▷ Fold
현재 값을 갖고 가장 마지막 폴더에 조합한 결과를 keyedDataStream으로 넘기는 역할.
keyedInputStream keyedStream.fold("Start", new FoldFunction<Integer, String>() {
@Override
public Integer fold(String current, Integer value) throws Exception {
return current + "=" + value;
}
});
결과예시 : 스트림 (1,2,3,4,5) 를 적용하면 결과는 Start=1=2=3=4=5
▷ 집계함수
keyedInputStream.sum(0)
keyedInputStream.sum("key")
keyedInputStream.min(0)
keyedInputStream.min("key")
keyedInputStream.max(0)
keyedInputStream.max("key")
keyedInputStream.minBy(0)
keyedInputStream.minBy("key")
keyedInputStream.maxBy(0)
keyedInputStream.maxBy("key")
Max는 가장 큰값을 리턴, maxby는 가장 큰값의 key를 리턴
▷ Window
시간 또는 그 외에 다른 조건으로 그룹화할 수 있도록 해준다
inputStream.keyBy(0).window(TumblingEventTimeWindows.of(Time.seconds(10)));
플링크는 무제한의 데이터 스트림을 처리하기 위해 데이터 슬라이스를 정의한다.(sliding window)
이는 데이터를 chunk 단위로 처리할 수 있게 해준다. 스트림을 윈도우 하기위해
분산시키는 key와 수행할 트랜스포메이션을 설명하는 함수를 할당해야한다.
이미 구현되어있는 플링크 윈도우 할당기 ( Window assginer )
- 글로벌 윈도우
영원히 유지되는 윈도우. Key별로 하나씩 할당
- 텀블링 윈도우
고정된 시간으로 생성되는 윈도우. 중복이 없다
- 슬라이딩 윈도우
텀블링 윈도우와 유사하지만, 오버랩을 허용한다.
- 세션 윈도우
윈도우의 시작시간과 윈도우 크기를 유연하게 설정하며, 종료되기 전에
얼마나 지속할지를 나타내는 세션갭 설정이 가능하다.
▷ WindowAll
정규화된 데이터 스트림을 그룹화 할수 있도록 한다.
파티션이 안된 데이터에 동작하므로 병렬로 처리되지 않는다.
inputStream.windowAll(TumblingEventTimeWindows.of(Time.seconds(10)));
▷ Union
두개 이상의 데이터 스트림을 하나로 합친다.
파티션이 안된 데이터에 동작하므로 병렬로 처리되지 않는다.
inputStream.union(inputStream1, inputStream2, … );
▷ Window Join
특정 key를 이용해 2개의 데이터 스트림을 공동 윈도우로 조인한다.
inputStream.join(inputStream1)
.where(0).equalTo(1)
.window(TumblingEventTimeWindows.of(Time.seconds(5)))
.apply(new JoinFunction() { ... });
▷ Split
두개이상의 스트림으로 분할하는 역할.
SplitStream<Integer> split = inputStream.split(new OuputSelector<Integer>() {
@Override
public Iterable<String> select(Integer value) {
List<String> output = new ArrayList<String>();
if( value%2 == 0 ) {
output.add("even");
} else {
output.add("odd");
}
return output;
}
});
▷ Select
분류된 특정 스트림을 선택하는 데 사용.
SplitStream<Integer> split;
DataStream<Integer> even = split.selct("even");
DataStream<Integer> odd = split.selct("odd");
DataStream<Integer> all = split.selct("even", "odd");
▷ Project
이벤트 스트림에서 일부 어트리뷰트를 선택하고 다음단계로 보낸다.
DataStream<Tuple4<Integer, Double, String, String >> in = // […]
DataStream<Tuple2<String, String >> out = in.project(3,2);
▷ 물리적 파티셔닝
- 커스텀 파티셔닝
파티션 함수를 사용자가 원하는대로 구현 가능
inputStream.partitionCustom(partitioner, “someKey”)
inputStream.partitionCustom(partitioner, 0)
- 랜덤 파티셔닝
비슷한 크기를 갖도록 무자위로 파티셔닝하는 기능
inputStream.shuffle();
- 리밸런싱 파티셔닝
데이터를 골고루 분산시키는 역할
inputStream.rebalance();
- 리스케일링
운영과정에서 데이터를 분산시키는데 사용.
단일 노드에서 일어나므로 네트워크상에서 데이터 전송작업은 없다.
inputStream.rescale();
▷ 브로드캐스팅
모든 레코드를 각 파티션에 분산시키는 역할
inputStream.broadcast();
● 데이터 싱크 (Data Sinks)
데이터 트랜스포메이션 작업 완료 후 저장하기 위한 작업
-writeAsText() : 스트링형태로 한 번에 한 라인씩 레코드를 쓴다.
-writeAsCsv() : 콤마로 구분된 형태로 파일 저장
-Print(), printErr() : 화면에 표시
-writeUsingOutputFormat() : 사용자가 지정한 대로 출력 형태를 제공.
-writeToSocker() : 특정 소켓을 이용해 데이터 쓰기 작업을 수행.
● 이벤트 타임과 워터마크
플링크는 Streaming API 에 대한 타임 관점을 여러가지 지원한다.
▷ 이벤트 타임
이벤트가 디바이스에서 생성된 타임을 의미.
타임값을 추출해서 사용. 순서 상관없이 유입되는 이벤트를 처리시 사용
▷ 프로세싱 타임
데이터 스트림이 유입되서 실제 서버가 실행하는 시간을 의미.
가장 간단항 스트림 처리방법.
▷ 인제스천 타임
특정 이벤트가 플링크에 입력되는 타임을 의미.
순서를 무시한 이벤트를 처리할 수 없다.
● 커넥터 ( Connecter )
플링크는 데이터를 읽고 쓸 수 있도록 커넥터를 지원한다.
▷ 카프카 커넥터
이벤트가 디바이스에서 생성된 타임을 의미.
타임값을 추출해서 사용. 순서 상관없이 유입되는 이벤트를 처리시 사용
// Consumer
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
properties.setProperty("group.id", "test");
DataStream<String> input = env.addSource(
new FlinkKafkaConsumer09<String>("mytopic", new SimpleStringSchema(), properties));
// Producer
stream.addSink(
new FlinkKafkaProducer09<String>(
"localhost:9092"
, "mytopic"
, new SimpleStringSchema()
)
);
▷ 트위터 커넥터
▷ 카산드라 커넥터
▷ RabbitMQ 커넥터
▷ Elasticsearch 커넥터
- 임베디드 모드
DataStream<String> input = ...;
Map<String, String> config = Maps,newHashMap();
config.put("bulk.flush.max.actions", "1");
config.put("cluster.name", "cluster-name");
input.addSick(New Elasticsearchsink<>(config, new IndexRequestBuilder<String() {
@Override
public IndexRequest createIndexRequest(String element, RuntimeContext ctx) {
Map<String, Object> json = new HashMap<>();
json.put("data", element);
return Requests.indexRequest()
.index("my-index")
.type("my-type")
.source(json);
}
}));
- 트랜스포트 클라이언트 모드
DataStream<String> input = ...;
Map<String, String> config = Maps,newHashMap();
config.put("bulk.flush.max.actions", "1");
config.put("cluster.name", "cluster-name");
List<TransportAddress> transports = new ArrayList<String>();
transports.add(new InetSocketTransportAddress("es-node-1", 9300));
transports.add(new InetSocketTransportAddress("es-node-2", 9300));
transports.add(new InetSocketTransportAddress("es-node-3", 9300));
input.addSink(new Elasticsearchsink<>(
config, tranports, new IndexRequestBuilder<String() {
@Override
public IndexRequest createIndexRequest(String element, RuntimeContext ctx) {
Map<String, Object> json = new HashMap<>();
json.put("data", element);
return Requests.indexRequest()
.index("my-index")
.type("my-type")
.source(json);
}
}));
▶ 마무리
커넥터는 제가 사용할 내용만 정리하였습니다.
카프카로 데이터를 받고 Elasticsearch로 데이터를 저장하는 방식이 될꺼같습니다.
'공부 > Apache Flink' 카테고리의 다른 글
[Apache Flink] Standalone Cluster 구성 (0) | 2021.04.15 |
---|---|
[Apache Flink] CEP-Complex Event Processing (0) | 2021.03.30 |
[Apache Flink] 설치하기 (windows 10) (0) | 2021.03.16 |