분류 전체보기 191

리액터 쓰레드 스케줄링

리엑터는 비동기 실행을 강제하지 않는다. 모두 main 쓰레드에서 실행된다. Flux.range(1, 3) .map(i -> { log.info("map {} to {}", i, i + 2); return i + 2; }) .flatMap(i -> { log.info("flatMap {} to Flux.range({}, {})", i, 1, i); return Flux.range(1, i); }) .subscribe(i -> log.info("next " + i)); publishOh을 이용한 신호 처리 쓰레드 스케줄링 publishOn()은 두개의 인자를 받는다. 첫번째 인자는 비동기로 신호를 처리할 스케쥴러이다. 두번째 인자는 스케줄러가 신호를 처리하기 전에 미리 가져올(prefetch) 데이터 개수..

JAVA/Reactor 2020.06.08

에러 처리

에러 처리 에러 신호는 종료 신호이며, 에러 신호가 발생하면 시퀀스는 종료된다. 에러 신호 처리 에러 신호가 발생하면 Subscriber의 onError 메서드가 호출된다. 이 메서드를 구현한 Subscriber를 이용해서 구독하면 에러 신호를 원하는대로 처리할 수 있다. 에러 처리를 위한 Consumerfmf 파라미터로 갖는 subscribe() 메서드를 사용해서 Exception 처리할 수 있다. 에러 처리를 위한 Consumer를 파라미터로 갖는 subscribe() 메서드 사용 Flux.range(1, 10) .map(x -> { if (x == 5) throw new RuntimeException("new RuntimeException"); // 에러 발생 else return x; }) .su..

JAVA/Reactor 2020.06.08

시퀀스 변환

1:1 변환(map) 한개의 데이터를 1:1 방식으로 변환해주며, 자바 스트림의 map()과 유사하다. Flux.just("a", "bc", "def", "wxyz") .map(String::length) // 문자열을 Integer 값으로 1-1 변환 .subscribe(System.out::println); 1:N 변환(flatMap) 1개의 데이터로부터 시퀀스 생성할때 사용, 1:N 방식의 변환 flatMap에 전달한 함수가 생성하는 Flux는 하나의 시퀀스처럼 연결되므로, flatMap의 결과는 Flux 타입이 Flux가 아니라 Flux이다. Flux seq = Flux.just(1, 2, 3) .flatMap(i -> Flux.range(1, i)); // Integer를 Flux로 1-N 변환..

JAVA/Reactor 2020.06.08

비동기 멀티 스레드 생성

create() Flux의 고급 프로그래밍 방식의 생성 방법으로, 멀티 스레드에서 라운드당 다중 배출에 적합하다. generate와는 달리 상태기반 변형이 없다 리스너 기반 비동기 API이다. pull 방식, push 방식 모두 생성 가능하다. 주의사항 create는 코드를 병렬화하거나 비동기 API와 사용할 수 있지만 코드를 비동기화하지 않는다. create lambda 내에서 차단하면, deadlock과 유사한 부작용이 발생할수 있다. Flux.create()를 이용한 pull 방식 메시지 생성 Flux.generate()의 경우 한 번에 한개의 next 신호만 발생하나, Flux.create()는 한 번에 한개 이상의 next() 신호를 발생 시킨다. subscriber로 부터 request를 받아..

JAVA/Reactor 2020.06.08

프로그래밍 방식의 시퀀스 생성

프로그래밍 방식의 시퀀스 생성 Flux, Mono를 프로그래밍 방식으로 관련 이벤트를 통해 생성하는 것을 설명한다.(onNext, onError, onComplete) 동기 방식의 시퀀스 생성 generate 사용 Flux 생성하는 가장 간단한 방법 동기 방식으로 처리하게 되며, 1번의 1개의 데이터를 생성하므로, next() 메소드는 콜백 호출 당 최대 한번만 호출된다. Subscriber로 요청이 왔을 때, next() 신호를 발생하는 Flux를 생성 error(Throwable), complete()를 추가할 수 있다 generate()가 생성한 Flux는 다음과 같은 방식으로 신호를 발생한다. - Subscriber의 요청에 대해 인자로 전달받은 generator를 실행한다. generator를 ..

JAVA/Reactor 2020.06.04

배압과 요청 재구성 방법

배압이란? Subsciber가 Publisher로 데이터 방출 속도를 제한하는 신호(signal)을 보내는 피드백 메커니즘 요청 재구성 방법 소비자 압력이 소스로 다시 전파되는 방식은 업스트림 연산자에게 요청을 보내는 것이다. 현재 요청의 합은 때때로 "요구(demand)", "보류중인 요청(pending request)"이라고 함 무한한 요청은 Long.MAX_VALUE의 값으로 제한된다 간단한 요청 재구성 방법 BaseSubscriber로 subscribe하면 되고, hookOnSubscribe 메소드를 Override하여 사용한다 Flux.range(1, 10) .doOnRequest(r -> System.out.println("request of " + r)) .subscribe(new BaseS..

JAVA/Reactor 2020.06.04

람다의 대안 : BaseSubscriber

람다로 구성하는 대신 전체 Subscriber를 가져오는 subscribe 메소드가 있다 BaseSubsriber 객체는 단일 사용이다 - 첫번째 Publisher를 cancel하면 두번째 Publisher도 cancel한다. - 인스턴스를 두 번 사용하면 구독자의 onNext 메소드를 병렬로 호출하면 안되는 Reactive Streams 규칙을 위반하기 때문입니다. - 결과적으로 익명 구현은 Publisher 호출 내에서 직접 선언 된 경우에만 적합합니다. SampleSubscriber 예시 BaseSubscriber는 subscriber의 행동을 대신하는 훅을 제공한다 무제한 요청을 트리거하고 subscribe() 와 같이 작동함 BaseSubscriber 상속은 요청량을 조절하는데 매우 유용하다 S..

JAVA/Reactor 2020.06.04

Flux, Mono 생성 후 Subscribe

Flux // String 시퀀스 생성 Flux seq1 = Flux.just("foo", "bar", "foobar"); // 객체로 부터 시퀀스 생성 List iterable = Arrays.asList("foo", "bar", "foobar"); Flux seq2 = Flux.fromIterable(iterable); // 5부터 7까지 Integer 시퀀스 Flux 생성 Flux numbersFromFiveToSeven = Flux.range(5, 3); Mono Mono 생성 참고 자료 - Mono.just vs Mono.create - https://stackoverflow.com/questions/56115447/mono-defer-vs-mono-create-vs-mono-just Mono...

JAVA/Reactor 2020.06.04

React 정리

React 프로그래밍이란? 모든것을 Stream으로 보고, 모든 데이터의 흐름을 시간 순서에 의해 전달되어지는 스트림으로 처리 하나의 문제를 비동기와 논블로킹 방식으로 실행될 수 있는 여러 단계로 분리할 수 있으며, 무한한 입력이나 입.출력을 생성할 수 있는 작업흐름(workflow)을 만들기 위해 결함 비동기는 클라이언트에서 서비스로 전송이 요청온 이후 임의의 시점에 처리된다는 의미이며, 논블로킹을 가능하게 하는 리액티브 프로그래밍에서 굉장히 중요한 기술 필요한 경우에만 스레드를 생성 후 메시지 형태로 전달하기 때문에 효율적으로 컴퓨터 리소스를 사용할 수 있다. React 프로그래밍 원칙 즉각 반응, 반응성(Responsive) - 지속적으로 긍정적인 사용자 환경을 보장하기 위해서, 모든 사용자에게 신..

JAVA/Reactor 2020.06.04

도커 파일 생성

도커 파일 생성(Dockerfile) - 프로젝트 root 경로에 Dockerfile을 생성한다. - Dockerfile 예제 # 실행 파일 빌드 부분 # 이미지는 openjdk 버전 8-jdk-alpine에서 실행된다. FROM openjdk:8-jdk-alpine as TEMP_BUILD_IMAGE # 변수 사용 ENV APP_HOME=/usr/app/ # 작업 폴더 위치 설정 WORKDIR $APP_HOME # 실행시 필수 파일 복사 COPY gradlew $APP_HOME COPY build.gradle $APP_HOME COPY settings.gradle $APP_HOME COPY gradle $APP_HOME/gradle RUN ./gradlew -x test build || return 0..

SERVER/Docker 2020.06.04