Flux
// String 시퀀스 생성
Flux<String> seq1 = Flux.just("foo", "bar", "foobar");
// 객체로 부터 시퀀스 생성
List<String> iterable = Arrays.asList("foo", "bar", "foobar");
Flux<String> seq2 = Flux.fromIterable(iterable);
// 5부터 7까지 Integer 시퀀스 Flux 생성
Flux<Integer> numbersFromFiveToSeven = Flux.range(5, 3);
Mono
Mono 생성 참고 자료
- Mono.just vs Mono.create
- https://stackoverflow.com/questions/56115447/mono-defer-vs-mono-create-vs-mono-just
// 값이 없는 Mono 생성
Mono<String> noData = Mono.empty();
// String Mono 시퀀스 생성
Mono<String> data = Mono.just("foo");
subscribe() 함수
Consumer : next 신호 처리
errorConsumer : error 신호 처리
completeConsumer : complete 신호 처리
subscriptionConsumer : Subscriber의 onSubscribe 메서드에 대응
// Subscribe를 수행하고 시퀀스를 작동시킨다
subscribe();
// 생성된 값으로 무언가를 수행한다
subscribe(Consumer<? super T> consumer);
// 값을 처리하고 에러 반응한다
subscribe(Consumer<? super T> consumer,
Consumer<? super Throwable> errorConsumer);
// 값과 에러를 처리하고, 시퀀스가 성공적으로 종료되면 무언가를 수행한다
subscribe(Consumer<? super T> consumer,
Consumer<? super Throwable> errorConsumer,
Runnable completeConsumer);
// 값과 에러를 처리하고 성공적인 종료도 다루며, subscibe가 호출되며 생성된 Subscription을 처리한다
subscribe(Consumer<? super T> consumer,
Consumer<? super Throwable> errorConsumer,
Runnable completeConsumer,
Consumer<? super Subscription> subscriptionConsumer);
subscribe 함수 예제
인자를 받지 않는 기본 함수 사용
수행 내역이 output에 보이지 않으나, 정상 동작
Flux<Integer> ints = Flux.range(1, 3);
ints.subscribe();
인자를 받지 않는 기본 함수 사용
Flux<Integer> ints = Flux.range(1, 3); // 3개의 시퀀스값을 가지는 Flux 생성
ints.subscribe(i -> System.out.println(i)); // Subscriber를 통해 subscribe를 하며 값을 출력한다.
ints.subscribe(System.out::println); // 함수 간단 표현
고의적으로 에러를 발생시키는 예제
Flux<Integer> ints = Flux.range(1, 4) // subscriber가 결합할 4개의 값을 가진 Flux를 생성
.map(i -> { // 값을 다르게 다룰수 있도록 map 사용
if (i <= 3) return i;
throw new RuntimeException("Got to 4");
});
ints.subscribe(i -> System.out.println(i),
error -> System.err.println("Error: " + error)); // subscribe 에러 핸들러
에러 핸들러와 이벤트 종료 핸들러를 수행하는 subscribe
Flux<Integer> ints = Flux.range(1, 4);
ints.subscribe(i -> System.out.println(i),
error -> System.err.println("Error " + error), // 에러 핸들러
() -> System.out.println("Done")); // 완료 이벤트 핸들러
Consumer<Subscription>을 보함하는 subscribe
Flux<Integer> ints = Flux.range(1, 4);
ints.subscribe(i -> System.out.println(i),
error -> System.err.println("Error " + error),
() -> System.out.println("Done"),
sub -> sub.request(10)); // 10개의 엘리먼트만 사용한다고 신호를 준다
subscribe() 취소(Disposable)
람다 기반의 subscribe() 변형들은 Disposable 리턴 타입을 가지고있다.
Disposable 인터페이스는 dispose() 메소드를 호출하여 subscription을 취소할 수 있다
Flux, Mono의 경우 cancellation은 소스가 엘레먼츠를 생성을 중단해야하는 신호이다
- 그러나, 즉시 중지가 보장되지 않는다.
- 몇몇의 소스는 요소 생성이 너무 빨라서 cancel 지시를 받기 전에 완료된다
'JAVA > Reactor' 카테고리의 다른 글
비동기 멀티 스레드 생성 (0) | 2020.06.08 |
---|---|
프로그래밍 방식의 시퀀스 생성 (0) | 2020.06.04 |
배압과 요청 재구성 방법 (0) | 2020.06.04 |
람다의 대안 : BaseSubscriber (0) | 2020.06.04 |
React 정리 (0) | 2020.06.04 |