JAVA/Reactor 11

Blocking I/O 처리 방안

blocking calll 부분을 별도의 쓰레드에서 background로 돌리는 방법을 사용. Mono blockingWrapper = Mono.fromCallable(() -> { return /* make a remote synchronous call */ }).subscribeOn(Schedulers.elastic()); Mono.fromCallable을 사용하여 blocking call 부분의 실행을 미루고, 이를 Schedulers.elastic()을 사용하여 blocking 자원을 기다리는 별도의 쓰레드를 생성하여 실행 DB Callback 부분을 아래와 같이 처리 public Mono save(MemoRequestDTO memoRequestDTO) { return Mono.fromCallab..

JAVA/Reactor 2020.06.08

모으기(aggregation) 연산

List 콜렉션으로 모으기(collectList) collectList()의 리턴 타입은 Mono이므로 Mono를 구독해서 값을 사용하면 된다. Mono mono = someFlux.collectList(); mono.subscribe(lst -> System.out.println(lst)); Map 콜렉션으로 모으기(collectMap) keyExtractor : 데이터에서 맵의 키를 제공하는 함수 valueExtractor : 데이터에서 맵의 값을 제공하는 함수 mapSupplier : 사용할 Map 객체를 제공(mapSupplier가 없는 메서드는 기본으로 HashMap 사용) Mono mono = someFlux.collectList(); mono.subscribe(lst -> System.out..

JAVA/Reactor 2020.06.08

리액터 쓰레드 스케줄링

리엑터는 비동기 실행을 강제하지 않는다. 모두 main 쓰레드에서 실행된다. Flux.range(1, 3) .map(i -> { log.info("map {} to {}", i, i + 2); return i + 2; }) .flatMap(i -> { log.info("flatMap {} to Flux.range({}, {})", i, 1, i); return Flux.range(1, i); }) .subscribe(i -> log.info("next " + i)); publishOh을 이용한 신호 처리 쓰레드 스케줄링 publishOn()은 두개의 인자를 받는다. 첫번째 인자는 비동기로 신호를 처리할 스케쥴러이다. 두번째 인자는 스케줄러가 신호를 처리하기 전에 미리 가져올(prefetch) 데이터 개수..

JAVA/Reactor 2020.06.08

에러 처리

에러 처리 에러 신호는 종료 신호이며, 에러 신호가 발생하면 시퀀스는 종료된다. 에러 신호 처리 에러 신호가 발생하면 Subscriber의 onError 메서드가 호출된다. 이 메서드를 구현한 Subscriber를 이용해서 구독하면 에러 신호를 원하는대로 처리할 수 있다. 에러 처리를 위한 Consumerfmf 파라미터로 갖는 subscribe() 메서드를 사용해서 Exception 처리할 수 있다. 에러 처리를 위한 Consumer를 파라미터로 갖는 subscribe() 메서드 사용 Flux.range(1, 10) .map(x -> { if (x == 5) throw new RuntimeException("new RuntimeException"); // 에러 발생 else return x; }) .su..

JAVA/Reactor 2020.06.08

시퀀스 변환

1:1 변환(map) 한개의 데이터를 1:1 방식으로 변환해주며, 자바 스트림의 map()과 유사하다. Flux.just("a", "bc", "def", "wxyz") .map(String::length) // 문자열을 Integer 값으로 1-1 변환 .subscribe(System.out::println); 1:N 변환(flatMap) 1개의 데이터로부터 시퀀스 생성할때 사용, 1:N 방식의 변환 flatMap에 전달한 함수가 생성하는 Flux는 하나의 시퀀스처럼 연결되므로, flatMap의 결과는 Flux 타입이 Flux가 아니라 Flux이다. Flux seq = Flux.just(1, 2, 3) .flatMap(i -> Flux.range(1, i)); // Integer를 Flux로 1-N 변환..

JAVA/Reactor 2020.06.08

비동기 멀티 스레드 생성

create() Flux의 고급 프로그래밍 방식의 생성 방법으로, 멀티 스레드에서 라운드당 다중 배출에 적합하다. generate와는 달리 상태기반 변형이 없다 리스너 기반 비동기 API이다. pull 방식, push 방식 모두 생성 가능하다. 주의사항 create는 코드를 병렬화하거나 비동기 API와 사용할 수 있지만 코드를 비동기화하지 않는다. create lambda 내에서 차단하면, deadlock과 유사한 부작용이 발생할수 있다. Flux.create()를 이용한 pull 방식 메시지 생성 Flux.generate()의 경우 한 번에 한개의 next 신호만 발생하나, Flux.create()는 한 번에 한개 이상의 next() 신호를 발생 시킨다. subscriber로 부터 request를 받아..

JAVA/Reactor 2020.06.08

프로그래밍 방식의 시퀀스 생성

프로그래밍 방식의 시퀀스 생성 Flux, Mono를 프로그래밍 방식으로 관련 이벤트를 통해 생성하는 것을 설명한다.(onNext, onError, onComplete) 동기 방식의 시퀀스 생성 generate 사용 Flux 생성하는 가장 간단한 방법 동기 방식으로 처리하게 되며, 1번의 1개의 데이터를 생성하므로, next() 메소드는 콜백 호출 당 최대 한번만 호출된다. Subscriber로 요청이 왔을 때, next() 신호를 발생하는 Flux를 생성 error(Throwable), complete()를 추가할 수 있다 generate()가 생성한 Flux는 다음과 같은 방식으로 신호를 발생한다. - Subscriber의 요청에 대해 인자로 전달받은 generator를 실행한다. generator를 ..

JAVA/Reactor 2020.06.04

배압과 요청 재구성 방법

배압이란? Subsciber가 Publisher로 데이터 방출 속도를 제한하는 신호(signal)을 보내는 피드백 메커니즘 요청 재구성 방법 소비자 압력이 소스로 다시 전파되는 방식은 업스트림 연산자에게 요청을 보내는 것이다. 현재 요청의 합은 때때로 "요구(demand)", "보류중인 요청(pending request)"이라고 함 무한한 요청은 Long.MAX_VALUE의 값으로 제한된다 간단한 요청 재구성 방법 BaseSubscriber로 subscribe하면 되고, hookOnSubscribe 메소드를 Override하여 사용한다 Flux.range(1, 10) .doOnRequest(r -> System.out.println("request of " + r)) .subscribe(new BaseS..

JAVA/Reactor 2020.06.04

람다의 대안 : BaseSubscriber

람다로 구성하는 대신 전체 Subscriber를 가져오는 subscribe 메소드가 있다 BaseSubsriber 객체는 단일 사용이다 - 첫번째 Publisher를 cancel하면 두번째 Publisher도 cancel한다. - 인스턴스를 두 번 사용하면 구독자의 onNext 메소드를 병렬로 호출하면 안되는 Reactive Streams 규칙을 위반하기 때문입니다. - 결과적으로 익명 구현은 Publisher 호출 내에서 직접 선언 된 경우에만 적합합니다. SampleSubscriber 예시 BaseSubscriber는 subscriber의 행동을 대신하는 훅을 제공한다 무제한 요청을 트리거하고 subscribe() 와 같이 작동함 BaseSubscriber 상속은 요청량을 조절하는데 매우 유용하다 S..

JAVA/Reactor 2020.06.04

Flux, Mono 생성 후 Subscribe

Flux // String 시퀀스 생성 Flux seq1 = Flux.just("foo", "bar", "foobar"); // 객체로 부터 시퀀스 생성 List iterable = Arrays.asList("foo", "bar", "foobar"); Flux seq2 = Flux.fromIterable(iterable); // 5부터 7까지 Integer 시퀀스 Flux 생성 Flux numbersFromFiveToSeven = Flux.range(5, 3); Mono Mono 생성 참고 자료 - Mono.just vs Mono.create - https://stackoverflow.com/questions/56115447/mono-defer-vs-mono-create-vs-mono-just Mono...

JAVA/Reactor 2020.06.04