JAVA/Reactor

시퀀스 변환

lovineff 2020. 6. 8. 18:30

1:1 변환(map)

한개의 데이터를 1:1 방식으로 변환해주며, 자바 스트림의 map()과 유사하다.

Flux.just("a", "bc", "def", "wxyz")
        .map(String::length)        // 문자열을 Integer 값으로 1-1 변환
        .subscribe(System.out::println);

 

1:N 변환(flatMap)

1개의 데이터로부터 시퀀스 생성할때 사용, 1:N 방식의 변환

flatMap에 전달한 함수가 생성하는 Flux는 하나의 시퀀스처럼 연결되므로, flatMap의 결과는 Flux 타입이 Flux<Flux<Integer>>가 아니라 Flux<Integer>이다.

Flux<Integer> seq = Flux.just(1, 2, 3)
        .flatMap(i -> Flux.range(1, i));     // Integer를 Flux<Integer>로 1-N 변환
seq.subscribe(System.out::println);

 

필터(filter)

시퀀스가 생성한 데이터를 필터링할 수 있다.

java stream 의 filter와 유사하게 동작한다

- filter() 함수의 결과가 true인 데이터만 전달하고, false인 데이터는 전달하지 않는다.

Flux.range(1, 10)
        .filter(num -> num % 2 == 0)
        .subscribe(System.out::println);

 

특정 값으로 시작하는 시퀀스로 변환(startWith)

1, 2, 3을 생성하는 시퀀스에 -1, 0으로 시작하는 신호를 추가

Flux<Integer> seq1 = Flux.just(1, 2, 3);
Flux<Integer> seq2 = seq1.startWith(-1, 0);
seq2.subscribe(System.out::println);

 

특정 값으로 끝나는 시퀀스로 변환(concatWithValues)

생성된 시퀀스가 특정 값으로 끝나도록 변환

Flux<Integer> seq1 = Flux.just(1, 2, 3);
Flux<Integer> seq = seq1.concatWithValues(100);
seq.subscribe(System.out::println);

 

시퀀스를 순서대로 연걸(concatWith)

여러 시퀀스를 순서대로 연결

Flux<Integer> seq1 = Flux.just(1, 2, 3);
Flux<Integer> seq2 = Flux.just(4, 5, 6);
Flux<Integer> seq3 = Flux.just(7, 8, 9);

seq1.concatWith(seq2).concatWith(seq3).subscribe(System.out::println);

 

시퀀스 발생 순서대로 섞기(mergeWith)

연결 순서가 아니라 시퀀스가 발생하는 데이터 순서대로 섞을때 사용

Flux<String> tick1 = Flux.interval(Duration.ofSeconds(1)).map(tick -> tick + " a");
Flux<String> tick2 = Flux.interval(Duration.ofMillis(700)).map(tick -> tick + " bb");
tick1.mergeWith(tick2).subscribe(System.out::println);

 

시퀀스 묶기(zipWith)

두 시퀀스의 값을 묶은 값 쌍을 생성하는 시퀀스를 생성

개수에 맞춰 두 시퀀스의 데이터를 묶는다

변경된 시퀀스는 1:1 쌍이 된다

Flux<String> tick1 = Flux.interval(Duration.ofSeconds(1)).map(tick -> tick + "초틱");
Flux<String> tick2 = Flux.interval(Duration.ofMillis(700)).map(tick -> tick + "밀리초틱");
tick1.zipWith(tick2).subscribe(System.out::println);

 

시퀀스 묶기(combineLatest)

가장 최근의 데이터를 쌍으로 만든다.

Flux<String> tick1 = Flux.interval(Duration.ofSeconds(1)).map(tick -> tick + "초틱");
Flux<String> tick2 = Flux.interval(Duration.ofMillis(700)).map(tick -> tick + "밀리초틱");
Flux.combineLatest(tick1, tick2, (a, b) -> a + " - " + b).subscribe(System.out::println);

 

지정한 개수 / 시간에 해당하는 데이터만 유지(take, takeLast)

take(long)

- 시퀀스에서 처음 N 개의 데이터만 유지할 때

take(Duration)

- 지정한 시간 동안 발생한 데이터만 유지할 때

takeLast(long)

- 마지막 N개의 데이터만 유지할 때

Flux<Integer> someSeq = Flux.range(1, 20);

Flux<Integer> seq1 = someSeq.take(10); // 최초 10개 데이터만 유지
Flux<Integer> seq2 = someSeq.take(Duration.ofSeconds(10)); // 최초 10초 동안 데이터 유지
Flux<Integer> seq3 = someSeq.takeLast(5); // 마지막 5개 데이터만 유지

System.out.println("take(10)");
seq1.subscribe(System.out::println);

System.out.println("take(Duration.ofSeconds(10)");
seq2.subscribe(System.out::println);

System.out.println("takeLast(5)");
seq3.subscribe(System.out::println);

 

지정한 개수 / 시간만큼 데이터 거르기(skip, skipLast)

skip

- 처음 N개의 데이터를 거를때

skip(Duration)

- 지정한 처음 시간 동안 발생한 데이터를 거르고 싶을 때

skipLast

- 마지막 N개의 데이터를 거를때

Flux<Integer> someSeq = Flux.range(1, 20);

Flux<Integer> seq1 = someSeq.skip(10);                        // 최초 10개 데이터 제외
Flux<Integer> seq2 = someSeq.skip(Duration.ofSeconds(2));     // 최초 2초 동안 데이터 제외
Flux<Integer> seq3 = someSeq.skipLast(5);                     // 마지막 5개 데이터 제외

System.out.println("skip(10)");
seq1.subscribe(System.out::println);

System.out.println("skip(Duration.ofSeconds(10)");
seq2.subscribe(System.out::println);

System.out.println("skipLast(5)");
seq3.subscribe(System.out::println);

 

'JAVA > Reactor' 카테고리의 다른 글

리액터 쓰레드 스케줄링  (0) 2020.06.08
에러 처리  (0) 2020.06.08
비동기 멀티 스레드 생성  (0) 2020.06.08
프로그래밍 방식의 시퀀스 생성  (0) 2020.06.04
배압과 요청 재구성 방법  (0) 2020.06.04