JAVA/Reactor

Flux, Mono 생성 후 Subscribe

lovineff 2020. 6. 4. 18:51

Flux

// String 시퀀스 생성
Flux<String> seq1 = Flux.just("foo", "bar", "foobar");    

// 객체로 부터 시퀀스 생성
List<String> iterable = Arrays.asList("foo", "bar", "foobar");
Flux<String> seq2 = Flux.fromIterable(iterable);

// 5부터 7까지 Integer 시퀀스 Flux 생성
Flux<Integer> numbersFromFiveToSeven = Flux.range(5, 3);

 

Mono

Mono 생성 참고 자료

 - Mono.just vs Mono.create

 - https://stackoverflow.com/questions/56115447/mono-defer-vs-mono-create-vs-mono-just 

 

Mono.Defer() vs Mono.create() vs Mono.just()?

Could someone help me to understand the difference between Mono.defer(), Mono.create() and Mono.just()? How to use it properly?

stackoverflow.com

// 값이 없는 Mono 생성
Mono<String> noData = Mono.empty();

// String Mono 시퀀스 생성
Mono<String> data = Mono.just("foo");

 

subscribe() 함수

Consumer : next 신호 처리

errorConsumer : error 신호 처리

completeConsumer : complete 신호 처리

subscriptionConsumer : Subscriber의 onSubscribe 메서드에 대응

// Subscribe를 수행하고 시퀀스를 작동시킨다
subscribe(); 

// 생성된 값으로 무언가를 수행한다
subscribe(Consumer<? super T> consumer); 

// 값을 처리하고 에러 반응한다
subscribe(Consumer<? super T> consumer,
          Consumer<? super Throwable> errorConsumer); 

// 값과 에러를 처리하고, 시퀀스가 성공적으로 종료되면 무언가를 수행한다
subscribe(Consumer<? super T> consumer,
          Consumer<? super Throwable> errorConsumer,
          Runnable completeConsumer); 

// 값과 에러를 처리하고 성공적인 종료도 다루며, subscibe가 호출되며 생성된 Subscription을 처리한다
subscribe(Consumer<? super T> consumer,
          Consumer<? super Throwable> errorConsumer,
          Runnable completeConsumer,
          Consumer<? super Subscription> subscriptionConsumer);

 

subscribe 함수 예제

인자를 받지 않는 기본 함수 사용

수행 내역이 output에 보이지 않으나, 정상 동작

Flux<Integer> ints = Flux.range(1, 3); 
ints.subscribe();

인자를 받지 않는 기본 함수 사용

Flux<Integer> ints = Flux.range(1, 3);        // 3개의 시퀀스값을 가지는 Flux 생성
ints.subscribe(i -> System.out.println(i)); // Subscriber를 통해 subscribe를 하며 값을 출력한다.

ints.subscribe(System.out::println);    // 함수 간단 표현

고의적으로 에러를 발생시키는 예제

Flux<Integer> ints = Flux.range(1, 4)                     // subscriber가 결합할 4개의 값을 가진 Flux를 생성
      .map(i -> {                                        // 값을 다르게 다룰수 있도록 map 사용
        if (i <= 3) return i; 
        throw new RuntimeException("Got to 4"); 
      });
ints.subscribe(i -> System.out.println(i),          
      error -> System.err.println("Error: " + error));   // subscribe 에러 핸들러 

 

에러 핸들러와 이벤트 종료 핸들러를 수행하는 subscribe

Flux<Integer> ints = Flux.range(1, 4); 
ints.subscribe(i -> System.out.println(i),
    error -> System.err.println("Error " + error),       // 에러 핸들러
    () -> System.out.println("Done"));                   // 완료 이벤트 핸들러

 

Consumer<Subscription>을 보함하는 subscribe

Flux<Integer> ints = Flux.range(1, 4);
ints.subscribe(i -> System.out.println(i),
    error -> System.err.println("Error " + error),
    () -> System.out.println("Done"),
    sub -> sub.request(10));                             // 10개의 엘리먼트만 사용한다고 신호를 준다

 

subscribe() 취소(Disposable)

람다 기반의 subscribe() 변형들은 Disposable 리턴 타입을 가지고있다.

Disposable 인터페이스는 dispose() 메소드를 호출하여 subscription을 취소할 수 있다

Flux, Mono의 경우 cancellation은 소스가 엘레먼츠를 생성을 중단해야하는 신호이다

 - 그러나, 즉시 중지가 보장되지 않는다.

 - 몇몇의 소스는 요소 생성이 너무 빨라서 cancel 지시를 받기 전에 완료된다

 

'JAVA > Reactor' 카테고리의 다른 글

비동기 멀티 스레드 생성  (0) 2020.06.08
프로그래밍 방식의 시퀀스 생성  (0) 2020.06.04
배압과 요청 재구성 방법  (0) 2020.06.04
람다의 대안 : BaseSubscriber  (0) 2020.06.04
React 정리  (0) 2020.06.04