본문 바로가기

공부/Apache Flink

[Apache Flink] DataStream API를 이용한 데이터 처리

728x90
반응형

○ Apache Flink DataStream API 라이브러리 주요 구성 요소

    1.실행환경

    2.데이터 소스

    3.데이터 트랜스포메이션

    4.데이터 싱크

    5.커넥터


실행 환경 ( Execution Environment )

 

실행환경 구분

    - 기존 플링크 환경 입력

    - 로컬 실행환경 생성

    - 원격 실행환경 생성

 

보통 실행시 getExecutionEnviroment() 를 사용한다.

그럼 IDE의 로컬에서 실행 중인 경우 로컬실행환경이 시작되고,

아닐 경우 클러스터 관리자가 분산형태로 실행한다.

 

직접 구분하고 싶으면

  로컬 환경은 createLocalEnvironment() 를 사용.

  원격은 createRemoteEnvironment(String host, int port, String, jar파일 등) 사용

 



데이터 소스 (Data Source)

데이터를 가져오는 역활

 

 소켓기반

socketTextStream(hostName, port) 
socketTextStream(hostName, port, delimiter)  // 구분자
socketTextStream(hostName, port, delimiter, maxRetry) // 데이터 패치를 시도하는 최대 횟수

 

파일기반

readTextFile(Strinf filePath) // 텍스트 파일을 한 줄씩 순차적으로 읽는다
readFile(FileInputFormat<Out> inputFormat, String path) // 텍스트 파일이 아닌경우 사용
readFilesStream(String path, long intervalMills, FileMonitoringFunction.WatchType watchType)
    // 파일스트림을 읽을때 사용


 데이터 소스 (Data Source)

 

받아온 데이터를 다른 형태로 변환하는 작업

 

 Map

inputStream.map(new MapFunction<Integer, Integer>() {
    @override
    public Integer map(Integer value) throws Exception {
        return 5 * value;
    }
});

 

 FlatMap

inputStream.flotMap(new FlatMapFunction<String, String>() {    
	@Override    
    public Integer flatMap(String value, Collector<String> out) throws Exception {        
    	for(String word : value.split(" ")) {
        	out.collect(word);        
        }
    }
});

 

 Filter

결과가 true인 경우 레코드를 생성

inputStream.filter(new FilterFunction<Integer>() {
    @override
    public boolean filter(Integer value) throws Exception {
        return value != 1;
    }
});

 

 KeyBy

Key를 기반으로 스트림을 파티션하는 역할을 한다. 리턴값은 keyedDataStream.

inputStream.keyBy(“someKey”);

 

 Reduce

Key를 기반으로 스트림을 파티션하는 역할을 한다. 리턴값은 keyedDataStream.

inputStream.keyBy(“someKey”)

 

 Reduce

현재 값을 갖고 가장 마지막으로Reduce한 결과를 keyedDataStream으로 넘기는 역할.

keyedInputStream.reduce(new ReduceFunction<Integer>() {
    @Override
    public Integer reduce(Integer value1, Integer value2) throws Exception {
        return value1 + value2;
    }
});

 

 Fold

현재 값을 갖고 가장 마지막 폴더에 조합한 결과를 keyedDataStream으로 넘기는 .

keyedInputStream keyedStream.fold("Start", new FoldFunction<Integer, String>() {
    @Override
    public Integer fold(String current, Integer value) throws Exception {
        return current + "=" + value;
    }
});

결과예시 : 스트림 (1,2,3,4,5) 를 적용하면 결과는 Start=1=2=3=4=5

 

 

 집계함수

keyedInputStream.sum(0)
keyedInputStream.sum("key")
keyedInputStream.min(0)
keyedInputStream.min("key")
keyedInputStream.max(0)
keyedInputStream.max("key")
keyedInputStream.minBy(0)
keyedInputStream.minBy("key")
keyedInputStream.maxBy(0)
keyedInputStream.maxBy("key")

Max는 가장 큰값을 리턴, maxby는 가장 큰값의 key를 리턴

 

 

 Window

시간 또는 그 외에 다른 조건으로 그룹화할 수 있도록 해준다

inputStream.keyBy(0).window(TumblingEventTimeWindows.of(Time.seconds(10)));

 

플링크는 무제한의 데이터 스트림을 처리하기 위해 데이터 슬라이스를 정의한다.(sliding window)

이는 데이터를 chunk 단위로 처리할 수 있게 해준다. 스트림을 윈도우 하기위해

분산시키는 key와 수행할 트랜스포메이션을 설명하는 함수를 할당해야한다.

 

이미 구현되어있는 플링크 윈도우 할당기 ( Window assginer )

    - 글로벌 윈도우

      영원히 유지되는 윈도우. Key별로 하나씩 할당

    - 텀블링 윈도우

      고정된 시간으로 생성되는 윈도우. 중복이 없다

    - 슬라이딩 윈도우

      텀블링 윈도우와 유사하지만, 오버랩을 허용한다.

    - 세션 윈도우

      윈도우의 시작시간과 윈도우 크기를 유연하게 설정하며, 종료되기 전에

      얼마나 지속할지를 나타내는 세션갭 설정이 가능하다.

 

 WindowAll

정규화된 데이터 스트림을 그룹화 할수 있도록 한다.

파티션이 안된 데이터에 동작하므로 병렬로 처리되지 않는다.

inputStream.windowAll(TumblingEventTimeWindows.of(Time.seconds(10)));

 Union

두개 이상의 데이터 스트림을 하나로 합친다.

파티션이 안된 데이터에 동작하므로 병렬로 처리되지 않는다.

inputStream.union(inputStream1, inputStream2, … );

 

 Window Join

특정 key를 이용해 2개의 데이터 스트림을 공동 윈도우로 조인한다.

inputStream.join(inputStream1)
    .where(0).equalTo(1)
    .window(TumblingEventTimeWindows.of(Time.seconds(5)))
    .apply(new JoinFunction() { ... });

 

 Split

두개이상의 스트림으로 분할하는 역할.

SplitStream<Integer> split = inputStream.split(new OuputSelector<Integer>() {
    @Override
    public Iterable<String> select(Integer value) {
        List<String> output = new ArrayList<String>();
        if( value%2 == 0 ) {
            output.add("even");
        } else {
            output.add("odd");
        }
        return output;
    }
});

 

 Select

분류된 특정 스트림을 선택하는 데 사용.

SplitStream<Integer> split;
DataStream<Integer> even = split.selct("even");
DataStream<Integer> odd = split.selct("odd");
DataStream<Integer> all = split.selct("even", "odd");

 

 Project

이벤트 스트림에서 일부 어트리뷰트를 선택하고 다음단계로 보낸다.

DataStream<Tuple4<Integer, Double, String, String >> in = // […]
DataStream<Tuple2<String, String >> out = in.project(3,2);

 

 물리적 파티셔닝

    - 커스텀 파티셔닝

      파티션 함수를 사용자가 원하는대로 구현 가능

inputStream.partitionCustom(partitioner, “someKey”)
inputStream.partitionCustom(partitioner, 0)

    - 랜덤 파티셔닝

      비슷한 크기를 갖도록 무자위로 파티셔닝하는 기능

inputStream.shuffle();

    - 리밸런싱 파티셔닝

      데이터를 골고루 분산시키는 역할

inputStream.rebalance();

    - 리스케일링

      운영과정에서 데이터를 분산시키는데 사용.

      단일 노드에서 일어나므로 네트워크상에서 데이터 전송작업은 없다.

inputStream.rescale();

 

 브로드캐스팅

모든 레코드를 각 파티션에 분산시키는 역할

inputStream.broadcast();

 



데이터 (Data Sinks)

 

데이터 트랜스포메이션 작업 완료 후 저장하기 위한 작업

-writeAsText() : 스트링형태로 한 번에 한 라인씩 레코드를 쓴다.

-writeAsCsv() : 콤마로 구분된 형태로 파일 저장

-Print(), printErr() : 화면에 표시

-writeUsingOutputFormat() : 사용자가 지정한 대로 출력 형태를 제공.

-writeToSocker() : 특정 소켓을 이용해 데이터 쓰기 작업을 수행.

 



이벤트 타임과 워터마크

플링크는 Streaming API 에 대한 타임 관점을 여러가지 지원한다.

 이벤트 타임

이벤트가 디바이스에서 생성된 타임을 의미.

타임값을 추출해서 사용. 순서 상관없이 유입되는 이벤트를 처리시 사용

 

 프로세싱 타임

데이터 스트림이 유입되서 실제 서버가 실행하는 시간을 의미.

가장 간단항 스트림 처리방법.

 

 인제스천 타임

특정 이벤트가 플링크에 입력되는 타임을 의미.

순서를 무시한 이벤트를 처리할 수 없다.



커넥터 ( Connecter )

플링크는 데이터를 읽고 쓸 수 있도록 커넥터를 지원한다.

 

 카프카 커넥터

이벤트가 디바이스에서 생성된 타임을 의미.

타임값을 추출해서 사용. 순서 상관없이 유입되는 이벤트를 처리시 사용

// Consumer
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
properties.setProperty("group.id", "test");
DataStream<String> input = env.addSource(
    new FlinkKafkaConsumer09<String>("mytopic", new SimpleStringSchema(), properties));

// Producer
stream.addSink(
    new FlinkKafkaProducer09<String>(
        "localhost:9092"
        , "mytopic"
        , new SimpleStringSchema()
    )
);

 

 트위 커넥터

 

 카산드라 커넥터

 

 RabbitMQ 커넥터

 

 Elasticsearch 커넥터

    - 임베디드 모드

DataStream<String> input = ...;

Map<String, String> config = Maps,newHashMap();
config.put("bulk.flush.max.actions", "1");
config.put("cluster.name", "cluster-name");
input.addSick(New Elasticsearchsink<>(config, new IndexRequestBuilder<String() {
    @Override
    public IndexRequest createIndexRequest(String element, RuntimeContext ctx) {
        Map<String, Object> json = new HashMap<>();
        json.put("data", element);
        return Requests.indexRequest()
                    .index("my-index")
                    .type("my-type")
                    .source(json);
    }
}));

 

    - 트랜스포트 클라이언트 모드

DataStream<String> input = ...;

Map<String, String> config = Maps,newHashMap();
config.put("bulk.flush.max.actions", "1");
config.put("cluster.name", "cluster-name");
List<TransportAddress> transports = new ArrayList<String>();
transports.add(new InetSocketTransportAddress("es-node-1", 9300));
transports.add(new InetSocketTransportAddress("es-node-2", 9300));
transports.add(new InetSocketTransportAddress("es-node-3", 9300));
input.addSink(new Elasticsearchsink<>(
    config, tranports, new IndexRequestBuilder<String() {
    @Override
    public IndexRequest createIndexRequest(String element, RuntimeContext ctx) {
        Map<String, Object> json = new HashMap<>();
        json.put("data", element);
        return Requests.indexRequest()
                    .index("my-index")
                    .type("my-type")
                    .source(json);
    }
}));

 

 


▶ 마무리

    커넥터는 제가 사용할 내용만 정리하였습니다.

    카프카로 데이터를 받고 Elasticsearch로 데이터를 저장하는 방식이 될꺼같습니다.

    

728x90
반응형