JAVA/Reactor

프로그래밍 방식의 시퀀스 생성

lovineff 2020. 6. 4. 18:57

프로그래밍 방식의 시퀀스 생성 

Flux, Mono를 프로그래밍 방식으로 관련 이벤트를 통해 생성하는 것을 설명한다.(onNextonErroronComplete)

 

 

동기 방식의 시퀀스 생성

generate 사용

Flux 생성하는 가장 간단한 방법

동기 방식으로 처리하게 되며, 1번의 1개의 데이터를 생성하므로, next() 메소드는 콜백 호출 당 최대 한번만 호출된다.

Subscriber로 요청이 왔을 때, next() 신호를 발생하는 Flux를 생성

error(Throwable), complete()를 추가할 수 있다 

generate()가 생성한 Flux는 다음과 같은 방식으로 신호를 발생한다.

- Subscriber의 요청에 대해 인자로 전달받은 generator를 실행한다. generator를 실행할 때 인자로 SynchronousSink를 전달한다.

- generator는 전달받은 SynchronousSink를 사용해서 next, complete, error 신호를 발생한다. 한 번에 1개의 next() 신호만 발생할 수 있다.

Flux<String> flux = Flux.generate(
    () -> 0,                                             // 초기 상태값 0 설정
    (state, sink) -> {
      sink.next("3 x " + state + " = " + 3*state);      // state 값으로 방출할 값을 선택한다.
      if (state == 10) sink.complete();                 // 정지하기 위해 사용
      return state + 1;                                 // 다음 호출에서 사용할 state값 반환
    });

flux.subscribe(System.out::println);                    // subscribe

 

SynchronousSink를 이용한 동기 신호 생성

Consumer<SynchronousSink<Integer>> randGen = new Consumer<SynchronousSink<Integer>>() {
    private int emitCount = 0;
    private Random rand = new Random();

    @Override
    public void accept(SynchronousSink<Integer> sink) {
        emitCount++;
        int data = rand.nextInt(100) + 1;       // 1~100 사이 임의 정수 생성
        log.info("Generator sink next " + data);
        sink.next(data);                        // 신호 발생
        if (emitCount == 10) {                  // 10개 데이터를 발생했으면
            log.info("Generator sink complete");
            sink.complete();                    // 완료 신호 발생
        }
    }
};

Flux.generate(randGen).subscribe(new BaseSubscriber<Integer>() {
    private int receiveCount = 0;
    @Override
    protected void hookOnSubscribe(Subscription subscription) {  // 구독 시작
        log.info("Subscriber#onSubscribe");
        log.info("Subscriber request first 3 items");
        request(3);                                              // 3개 신호 요청
    }

    @Override
    protected void hookOnNext(Integer value) {                   // 다음 신호 요청시 수행
        log.info("Subscriber#onNext: " + value);
        receiveCount++;
        if (receiveCount % 3 == 0) {
            log.info("Subscriber request next 3 items");
            request(3);                                          // 3개 신호 요청
        }
    }

    @Override
    protected void hookOnComplete() {                            // 구독 완료
        log.info("Subscriber#onComplete");
    }
});

 

'JAVA > Reactor' 카테고리의 다른 글

시퀀스 변환  (0) 2020.06.08
비동기 멀티 스레드 생성  (0) 2020.06.08
배압과 요청 재구성 방법  (0) 2020.06.04
람다의 대안 : BaseSubscriber  (0) 2020.06.04
Flux, Mono 생성 후 Subscribe  (0) 2020.06.04