JAVA/Reactor

Blocking I/O 처리 방안

lovineff 2020. 6. 8. 18:47

blocking calll 부분을 별도의 쓰레드에서 background로 돌리는 방법을 사용.

Mono blockingWrapper = Mono.fromCallable(() -> { 
    return /* make a remote synchronous call */ 
}).subscribeOn(Schedulers.elastic());

 

Mono.fromCallable을 사용하여 blocking call 부분의 실행을 미루고, 이를 Schedulers.elastic()을 사용하여 blocking 자원을 기다리는 별도의 쓰레드를 생성하여 실행

 

DB Callback 부분을 아래와 같이 처리

public Mono<Memo> save(MemoRequestDTO memoRequestDTO) { 
    return Mono.fromCallable(() -> memoRepository.save(Memo.of(memoRequestDTO)))
                .subscribeOn(Schedulers.elastic())
                .log(); 
}

 

Mono to Flux 처리 방법

// blocking call 부분을 별도의 쓰레드에서 처리하기 위한 방법
return Mono.fromCallable(userRepository::findAllByQueryDsl) // Mono<List<User>>
        .flatMapMany(Flux::fromIterable)    // Flux<User>
        .subscribeOn(Schedulers.elastic());

// Blocking Call
return Flux.fromIterable(userRepository.findAllByQueryDsl())    // 쿼리 수행은 Blocking IO에서 처리된다.
        .collectList()  // 변환 작업은 Blocking IO에서 처리된다.
        .subscribeOn(Schedulers.elastic()); // 최종 결과 전달을 별도의 쓰레드에서 처리한다.

 

'JAVA > Reactor' 카테고리의 다른 글

모으기(aggregation) 연산  (0) 2020.06.08
리액터 쓰레드 스케줄링  (0) 2020.06.08
에러 처리  (0) 2020.06.08
시퀀스 변환  (0) 2020.06.08
비동기 멀티 스레드 생성  (0) 2020.06.08