JAVA/Reactor

배압과 요청 재구성 방법

lovineff 2020. 6. 4. 18:55

배압이란?

Subsciber가 Publisher로 데이터 방출 속도를 제한하는 신호(signal)을 보내는 피드백 메커니즘

 

요청 재구성 방법

소비자 압력이 소스로 다시 전파되는 방식은 업스트림 연산자에게 요청을 보내는 것이다.

현재 요청의 합은 때때로 "요구(demand)", "보류중인 요청(pending request)"이라고 함

무한한 요청은 Long.MAX_VALUE의 값으로 제한된다

 

간단한 요청 재구성 방법

BaseSubscriber로 subscribe하면 되고, hookOnSubscribe 메소드를 Override하여 사용한다

Flux.range(1, 10)
    .doOnRequest(r -> System.out.println("request of " + r))
    .subscribe(new BaseSubscriber<Integer>() {

      @Override
      public void hookOnSubscribe(Subscription subscription) {
        request(1);
      }

     @Override
      public void hookOnNext(Integer integer) {
        System.out.println("Cancelling after having received " + integer);
      }
    });

 

주의사항

요청을 조작 할 때 시퀀스가 진행되기에 충분한 수요를 생성해야하며. 그렇지 않으면 Flux가 "고정"될 수 있다.

이 이유 때문에 BseSubscriber가 기본적으로 hookOnSubscribe를 무제한으로 설정 되어있다.

hookOnSubscribe를 재정의하는 경우 요청을 한번 이상 호출해야 한다

 

다운 스트림에서 요청을 수정하는 연산자

subscribe level에서 표현된 수요는 업스트림 체인의 각 연산자에 의해 재구성될 수 있다

request(2)를 받으면, 두개의 풀 버퍼에 대한 요구로 해석된다

결과적으로 버퍼에는 N 개의 요소가 가득찬것으로 관주되므로 버퍼 연산자는 요청을 2 * N으로 재구성한다.

요청을 수정할 수 있는 연산자

- limitRate

다운 스트림 요청을 분할하여 작은 배치로 업스트림에 전파되도록함

- limitRequest

다운 스트림 요청을 최대 N개로 제한

 

'JAVA > Reactor' 카테고리의 다른 글

비동기 멀티 스레드 생성  (0) 2020.06.08
프로그래밍 방식의 시퀀스 생성  (0) 2020.06.04
람다의 대안 : BaseSubscriber  (0) 2020.06.04
Flux, Mono 생성 후 Subscribe  (0) 2020.06.04
React 정리  (0) 2020.06.04