1:1 변환(map)
한개의 데이터를 1:1 방식으로 변환해주며, 자바 스트림의 map()과 유사하다.
Flux.just("a", "bc", "def", "wxyz")
.map(String::length) // 문자열을 Integer 값으로 1-1 변환
.subscribe(System.out::println);
1:N 변환(flatMap)
1개의 데이터로부터 시퀀스 생성할때 사용, 1:N 방식의 변환
flatMap에 전달한 함수가 생성하는 Flux는 하나의 시퀀스처럼 연결되므로, flatMap의 결과는 Flux 타입이 Flux<Flux<Integer>>가 아니라 Flux<Integer>이다.
Flux<Integer> seq = Flux.just(1, 2, 3)
.flatMap(i -> Flux.range(1, i)); // Integer를 Flux<Integer>로 1-N 변환
seq.subscribe(System.out::println);
필터(filter)
시퀀스가 생성한 데이터를 필터링할 수 있다.
java stream 의 filter와 유사하게 동작한다
- filter() 함수의 결과가 true인 데이터만 전달하고, false인 데이터는 전달하지 않는다.
Flux.range(1, 10)
.filter(num -> num % 2 == 0)
.subscribe(System.out::println);
특정 값으로 시작하는 시퀀스로 변환(startWith)
1, 2, 3을 생성하는 시퀀스에 -1, 0으로 시작하는 신호를 추가
Flux<Integer> seq1 = Flux.just(1, 2, 3);
Flux<Integer> seq2 = seq1.startWith(-1, 0);
seq2.subscribe(System.out::println);
특정 값으로 끝나는 시퀀스로 변환(concatWithValues)
생성된 시퀀스가 특정 값으로 끝나도록 변환
Flux<Integer> seq1 = Flux.just(1, 2, 3);
Flux<Integer> seq = seq1.concatWithValues(100);
seq.subscribe(System.out::println);
시퀀스를 순서대로 연걸(concatWith)
여러 시퀀스를 순서대로 연결
Flux<Integer> seq1 = Flux.just(1, 2, 3);
Flux<Integer> seq2 = Flux.just(4, 5, 6);
Flux<Integer> seq3 = Flux.just(7, 8, 9);
seq1.concatWith(seq2).concatWith(seq3).subscribe(System.out::println);
시퀀스 발생 순서대로 섞기(mergeWith)
연결 순서가 아니라 시퀀스가 발생하는 데이터 순서대로 섞을때 사용
Flux<String> tick1 = Flux.interval(Duration.ofSeconds(1)).map(tick -> tick + " a");
Flux<String> tick2 = Flux.interval(Duration.ofMillis(700)).map(tick -> tick + " bb");
tick1.mergeWith(tick2).subscribe(System.out::println);
시퀀스 묶기(zipWith)
두 시퀀스의 값을 묶은 값 쌍을 생성하는 시퀀스를 생성
개수에 맞춰 두 시퀀스의 데이터를 묶는다
변경된 시퀀스는 1:1 쌍이 된다
Flux<String> tick1 = Flux.interval(Duration.ofSeconds(1)).map(tick -> tick + "초틱");
Flux<String> tick2 = Flux.interval(Duration.ofMillis(700)).map(tick -> tick + "밀리초틱");
tick1.zipWith(tick2).subscribe(System.out::println);
시퀀스 묶기(combineLatest)
가장 최근의 데이터를 쌍으로 만든다.
Flux<String> tick1 = Flux.interval(Duration.ofSeconds(1)).map(tick -> tick + "초틱");
Flux<String> tick2 = Flux.interval(Duration.ofMillis(700)).map(tick -> tick + "밀리초틱");
Flux.combineLatest(tick1, tick2, (a, b) -> a + " - " + b).subscribe(System.out::println);
지정한 개수 / 시간에 해당하는 데이터만 유지(take, takeLast)
take(long)
- 시퀀스에서 처음 N 개의 데이터만 유지할 때
take(Duration)
- 지정한 시간 동안 발생한 데이터만 유지할 때
takeLast(long)
- 마지막 N개의 데이터만 유지할 때
Flux<Integer> someSeq = Flux.range(1, 20);
Flux<Integer> seq1 = someSeq.take(10); // 최초 10개 데이터만 유지
Flux<Integer> seq2 = someSeq.take(Duration.ofSeconds(10)); // 최초 10초 동안 데이터 유지
Flux<Integer> seq3 = someSeq.takeLast(5); // 마지막 5개 데이터만 유지
System.out.println("take(10)");
seq1.subscribe(System.out::println);
System.out.println("take(Duration.ofSeconds(10)");
seq2.subscribe(System.out::println);
System.out.println("takeLast(5)");
seq3.subscribe(System.out::println);
지정한 개수 / 시간만큼 데이터 거르기(skip, skipLast)
skip
- 처음 N개의 데이터를 거를때
skip(Duration)
- 지정한 처음 시간 동안 발생한 데이터를 거르고 싶을 때
skipLast
- 마지막 N개의 데이터를 거를때
Flux<Integer> someSeq = Flux.range(1, 20);
Flux<Integer> seq1 = someSeq.skip(10); // 최초 10개 데이터 제외
Flux<Integer> seq2 = someSeq.skip(Duration.ofSeconds(2)); // 최초 2초 동안 데이터 제외
Flux<Integer> seq3 = someSeq.skipLast(5); // 마지막 5개 데이터 제외
System.out.println("skip(10)");
seq1.subscribe(System.out::println);
System.out.println("skip(Duration.ofSeconds(10)");
seq2.subscribe(System.out::println);
System.out.println("skipLast(5)");
seq3.subscribe(System.out::println);
'JAVA > Reactor' 카테고리의 다른 글
리액터 쓰레드 스케줄링 (0) | 2020.06.08 |
---|---|
에러 처리 (0) | 2020.06.08 |
비동기 멀티 스레드 생성 (0) | 2020.06.08 |
프로그래밍 방식의 시퀀스 생성 (0) | 2020.06.04 |
배압과 요청 재구성 방법 (0) | 2020.06.04 |