JAVA/Reactor

비동기 멀티 스레드 생성

lovineff 2020. 6. 8. 18:28

create()

Flux의 고급 프로그래밍 방식의 생성 방법으로, 멀티 스레드에서 라운드당 다중 배출에 적합하다.

generate와는 달리 상태기반 변형이 없다

리스너 기반 비동기 API이다.

pull 방식, push 방식 모두 생성 가능하다.

 

주의사항

create는 코드를 병렬화하거나 비동기 API와 사용할 수 있지만 코드를 비동기화하지 않는다.

create lambda 내에서 차단하면, deadlock과 유사한 부작용이 발생할수 있다.

 

Flux.create()를 이용한 pull 방식 메시지 생성

Flux.generate()의 경우 한 번에 한개의 next 신호만 발생하나, Flux.create()는 한 번에 한개 이상의 next() 신호를 발생 시킨다.

subscriber로 부터 request를 받아 subscribe

Flux<Integer> flux = Flux.create((FluxSink<Integer> sink) -> {
    sink.onRequest(request -> {                  // request는 Subscriber가 요청한 데이터 개수
        for (int i = 1; i <= request; i++) {
            sink.next(i);                           // Flux.generate()의 경우와 달리 한 번에 한 개 이상의 next() 신호 발생 가능
        }
    });
});

flux.limitRequest(10).subscribe(System.out::println); // 10개의 request 요청 request 제한이 없다면 Integer.MAX_VALUE 만큼 요청된다.

 

Flux.create()를 이용한 push 방식 메시지 생성

Subscriber의 요청과 상관없이 비동기로 신호 발생

DataPump pump = new DataPump();

Flux<Integer> bridge = Flux.create((FluxSink<Integer> sink) -> {
    pump.setListener(new DataListener<Integer>() {
        @Override
        public void onData(List<Integer> chunk) {
            chunk.forEach(s -> {
                sink.next(s);                           // Subscriber의 요청에 상관없이 신호 발생
            });
        }

		@Override
        public void complete() {
            logger.info("complete");
            sink.complete();
        }
    });
});

 

Flux.create()와 배압

Subscriber로부터 요청이 왔을 때(FluxSink#onRequest) 데이터를 전송하거나(pull 방식) Subscriber의 요청에 상관없이 데이터를 전송하거나(push 방식) 두 방식 모두 Subscriber가 요청한 개수보다 더 많은 데이터를 발생할 수 있다

Flux.create()로 생성한 Flux는 초과로 발생한 데이터를 버퍼에 저장한다.

버퍼에 저장된 데이터는 다음에 Subscriber가 데이터를 요청할 때 전달된다.

Flux<Integer> flux = Flux.create( (FluxSink<Integer> sink) -> {
    sink.onRequest(request -> {
        for (int i = 1; i <= request + 3 ; i++) {        // Subscriber가 요청한 것보다 3개 더 발생
            sink.next(i);
        }
    });
});

 

Flux.fromStream(), Flux.fromIterable()을 이용한 Flux 생성

Flux.fromStream()

 - 자바 8의 Stream에서 Flux 생성

Flux.fromIterable()

 - List, Set과 같은 Collection에서 Flux 생성

Stream<String> straem = Files.lines(Paths.get(filePath));
Flux<String> seq = Flux.fromStream(straem);
seq.subscribe(System.out::println);
 

'JAVA > Reactor' 카테고리의 다른 글

에러 처리  (0) 2020.06.08
시퀀스 변환  (0) 2020.06.08
프로그래밍 방식의 시퀀스 생성  (0) 2020.06.04
배압과 요청 재구성 방법  (0) 2020.06.04
람다의 대안 : BaseSubscriber  (0) 2020.06.04