리엑터는 비동기 실행을 강제하지 않는다.
모두 main 쓰레드에서 실행된다.
Flux.range(1, 3)
.map(i -> {
log.info("map {} to {}", i, i + 2);
return i + 2;
})
.flatMap(i -> {
log.info("flatMap {} to Flux.range({}, {})", i, 1, i);
return Flux.range(1, i);
})
.subscribe(i -> log.info("next " + i));
publishOh을 이용한 신호 처리 쓰레드 스케줄링
publishOn()은 두개의 인자를 받는다.
첫번째 인자는 비동기로 신호를 처리할 스케쥴러이다.
두번째 인자는 스케줄러가 신호를 처리하기 전에 미리 가져올(prefetch) 데이터 개수이다.
- 스케줄러가 생성하는 비동기 경계 시점에 보관할 수 있는 데이터의 개수로 일종의 버퍼 크기가 된다.
CountDownLatch latch = new CountDownLatch(1);
Flux.range(1, 6)
.map(i -> {
log.info("map 1: {} + 10", i);
return i + 10;
})
.publishOn(Schedulers.newElastic("PUB"), 2) // 시퀀스 2개 이후의 데이터가 수행된다???
.map(i -> { // publishOn에서 지정한 PUB 스케줄러가 실행
log.info("map 2: {} + 10", i);
return i + 10;
})
.subscribe(new BaseSubscriber<Integer>() {
@Override
protected void hookOnSubscribe(Subscription subscription) {
log.info("hookOnSubscribe");
requestUnbounded();
}
@Override
protected void hookOnNext(Integer value) {
log.info("hookOnNext: " + value); // publishOn에서 지정한 스케줄러가 실행
}
@Override
protected void hookOnComplete() {
log.info("hookOnComplete"); // publishOn에서 지정한 스케줄러가 실행
latch.countDown();
}
});
latch.await();
publicOn()에 지정한 스케줄러는 다음 publishOn()을 설정할 때까지 적용된다.
- 첫 번째 publishOn()과 두 번째 publishOn() 사이의 map() 처리는 PUB1 스케줄러가 실행하고 두 번째 publishOn() 이후의 map(), 신호 처리는 PUB2 스케줄러가 실행한 것을 알 수 있다.
CountDownLatch latch = new CountDownLatch(1);
Flux.range(1, 6)
.publishOn(Schedulers.newElastic("PUB1"), 2)
.map(i -> {
log.info("map 1: {} + 10", i);
return i + 10;
})
.publishOn(Schedulers.newElastic("PUB2"))
.map(i -> {
log.info("map 2: {} + 10", i);
return i + 10;
})
.subscribe(new BaseSubscriber<Integer>() {
@Override
protected void hookOnSubscribe(Subscription subscription) {
log.info("hookOnSubscribe");
requestUnbounded();
}
@Override
protected void hookOnNext(Integer value) {
log.info("hookOnNext: " + value);
}
@Override
protected void hookOnComplete() {
log.info("hookOnComplete");
latch.countDown();
}
});
latch.await();
subscribeOn을 이용한 구독 처리 쓰레드 스케줄링
subscribeOn()을 사용하면 Subscriber가 시퀀스에 대한 request신호를 별도 스케줄러로 처리한다.(시퀀스를 실행할 스케줄러를 지정한다)
Flux.range()가 생성한 시퀀스의 신호 발생뿐만 아니라 map() 실행, Subscriber의 next, complete 신호 처리를 "SUB" 스케줄러가 실행한다
CountDownLatch latch = new CountDownLatch(1);
Flux.range(1, 6)
.log() // 보다 상세한 로그 출력 위함
.subscribeOn(Schedulers.newElastic("SUB"))
.map(i -> {
log.info("map: {} + 10", i);
return i + 10;
})
.subscribe(new BaseSubscriber<Integer>() {
@Override
protected void hookOnSubscribe(Subscription subscription) {
log.info("hookOnSubscribe"); // main thread
request(1);
}
@Override
protected void hookOnNext(Integer value) {
log.info("hookOnNext: " + value); // SUB 쓰레드
request(1);
}
@Override
protected void hookOnComplete() {
log.info("hookOnComplete"); // SUB 쓰레드
latch.countDown();
}
});
latch.await();
SubscribeOn() + publishOn() 조합
publishOn()으로 PUB1 스케줄러를 지정하기 전까지는 SUB 스케줄러가 request 요청과 map1, mapByPub1 변환 처리하는것을 확인할 수 있다.
subscribeOn()이 publishOn() 뒤에 위치하면 실질적으로 prefetch할 때를 제외하면 적용되지 않는다.
subscribeOn()은 원본 시퀀스의 신호 발생을 처리할 스케줄러를 지정하므로 시퀀스 생성 바로 뒤에 subscribeOn()을 지정해야한다.
또한 두 개 이상의 subscribeOn()을 지정해도 첫 번째 subscribeOn()만 적용된다
CountDownLatch latch = new CountDownLatch(1);
Flux.range(1, 6)
.log()
.subscribeOn(Schedulers.newElastic("SUB"))
.map(i -> {
log.info("map1: " + i + " --> " + (i + 20));
return i + 20;
})
.map(i -> {
log.info("mapBySub: " + i + " --> " + (i + 100));
return i + 100;
})
.publishOn(Schedulers.newElastic("PUB1"), 2)
.map(i -> {
log.info("mapByPub1: " + i + " --> " + (i + 1000));
return i + 1000;
})
.publishOn(Schedulers.newElastic("PUB2"), 2)
.subscribe(new BaseSubscriber<Integer>() {
@Override
protected void hookOnSubscribe(Subscription subscription) {
log.info("hookOnSubscribe");
request(1);
}
@Override
protected void hookOnNext(Integer value) {
log.info("hookOnNext: " + value);
request(1);
}
@Override
protected void hookOnComplete() {
log.info("hookOnComplete");
latch.countDown();
}
});
latch.await();
스케줄러 종류
Schedulers.immediate() : 현재 쓰레드에서 실행한다.
Schedulers.single() : 쓰레드가 한 개인 쓰레드 풀을 이용해서 실행한다. 즉 한 쓰레드를 공유한다.
Schedulers.elastic() : 쓰레드 풀을 이용해서 실행한다. 블로킹 IO를 리액터로 처리할 때 적합하다. 쓰레드가 필요하면 새로 생성하고 일정 시간(기본 60초) 이상 유휴 상태인 쓰레드는 제거한다. 데몬 쓰레드를 생성한다.
Schedulers.parallel() : 고정 크기 쓰레드 풀을 이용해서 실행한다. 병렬 작업에 적합하다.
스케줄러 생성 방법
newSingle(String name)
newSingle(String name, boolean daemon)
newElastic(String name)
newElastic(String name, int ttlSeconds)
newElastic(String name, int ttlSeconds, boolean daemon)
newParallel(String name)
newParallel(String name, int parallelism)
newParallel(String name, int parallelism, boolean daemon)
파라미터는 다음과 같다
name : 쓰레드 이름으로 사용할 접두사이다.
daemon : 데몬 쓰레드 여부를 지정한다. 지정하지 않으면 false이다. 데몬 쓰레드가 아닌 경우 JVM 종료시에 생성한 스케줄러의 dispose()를 호출해서 풀에 있는 쓰레드를 종료해야 한다.
ttlSeconds : elastic 쓰레드 풀의 쓰레드 유휴 시간을 지정한다. 지정하지 않으면 60(초)이다.
parallelism : 작업 쓰레드 개수를 지정한다. 지정하지 않으면 Runtime.getRuntime().availableProcessors()이 리턴한 값을 사용한다.
new()로 생성하는 쓰레드 풀은 기본적으로 데몬 쓰레드가 아니기 때문에 어플리케이션 종료시에는 다음과 같이 dispose() 메서드를 호출해서 쓰레드를 종료해야 한다.
그렇지 않으면 어플리케이션이 종료되지 않을수가 있다.
Scheduler scheduler = Schedulers.newElastic("SUB", 60, false);
Flux.range(1, 6)
.log()
.subscribeOn(scheduler)
.map(i -> {
log.info("map1: " + i + " --> " + (i + 20));
return i + 20;
})
.subscribe(new BaseSubscriber<Integer>() {
@Override
protected void hookOnSubscribe(Subscription subscription) {
log.info("hookOnSubscribe");
request(1);
}
@Override
protected void hookOnNext(Integer value) {
log.info("hookOnNext: " + value);
request(1);
}
@Override
protected void hookOnComplete() {
log.info("hookOnComplete");
}
});
scheduler.dispose(); // dispose 처리하지 않으면 데몬이 살아있다.
일정 주기로 tick 발생(Flux.interval)
Flux.interval()을 사용하여 일정 주기마다 신호를 발생시킬 수 있다.
Schedulers.parallel()을 사용한다.
다른 스케줄러를 사용하고 싶다면 interval(Duration, Scheduler) 메서드를 사용하면 된다.
Flux.interval(Duration.ofSeconds(1))
.subscribe(tick -> log.info("Tick " + tick));
Thread.sleep(5000); // 5초 이후 main 쓰ㅔ드가 종료된다.
병렬처리(Parallel)
시퀀스는 순차적으로 next 신호를 발생시키고 Subscriber는 순차적으로 신호를 처리하고, 리액터는 next 신호를 병렬로 처리할 수 있는 방법을 제공한다.
parallel()과 runOn()으로 Flux 병렬 처리
parallel(N) 메서드는 Flux가 생성하는 next 신호를 parallelism 개수만큼 라운드 로빈 방식으로 나눈다.
분리한 신호는 일종의 신호를 전달할 레일을 구성한다.
parallel()로 여러 레일을 만든다고 병렬 신호 처리를 하는것은 아니다.
- parallel()은 병렬로 신호를 처리할 수 있는 ParallelFlux를 리턴하는데, ParallelFlux의 runOn() 메서드에 다중 쓰레드를 사용하는 스케줄러를 전달해야 병렬로 신호를 처리한다.
Flux.range(1, 20)
.parallel(2) // 2개의 레일을 생성(홀수 레일, 짝수 레일)
.runOn(Schedulers.newParallel("PAR", 2)) // 2개의 레일을 병렬로 실행
.map(x -> {
int sleepTime = nextSleepTime(x % 2 == 0 ? 50 : 100, x % 2 == 0 ? 150 : 300); // nextSleepTime은 인자로 받은 두 정수 값 범위에 해당하는 임의의 값을 생성한다고 가정
log.info("map1 {}, sleepTime {}", x, sleepTime);
try {
sleep(sleepTime);
} catch (InterruptedException e) {
e.printStackTrace();
}
return String.format("%02d", x);
})
.subscribe(i -> log.info("next {}", i));
private static int nextSleepTime(int a, int b) {
return (int)(Math.random() * (a - b + 1)) + b;
}
쓰레드마다 1개의 레일을 처리하는 것을 확인할 수 있다(짝, 홀수 구분)
Mono 병렬 처리(Mono.zip() 사용)
쓰레드 마다 1개씩 처리하는 것을 확인할 수 있다.
DB도 마찬가지로 병렬 쓰레드 처리가 가능하다.
Mono m1 = Mono.just(1).map(x -> {
log.info("1 sleep");
try {
sleep(1500);
} catch (InterruptedException e) {
e.printStackTrace();
}
return x;
}).subscribeOn(Schedulers.parallel());
Mono m2 = Mono.just(2).map(x -> {
log.info("2 sleep");
try {
sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return x;
}).subscribeOn(Schedulers.parallel());
Mono m3 = Mono.just(3).map(x -> {
log.info("3 sleep");
try {
sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return x;
}).subscribeOn(Schedulers.parallel());
log.info("Mono.zip(m1, m2, m3)");
Mono.zip(m1, m2, m3)
.subscribe(tup -> log.info("next: " + tup));
sleep(5000); // parallel 스케줄러는 데몬 스레드이며, main thread는 비 데몬 스레드로 main문이 종되면 데몬 스레드는 자동 종료되므로, sleep 처리함.
'JAVA > Reactor' 카테고리의 다른 글
Blocking I/O 처리 방안 (0) | 2020.06.08 |
---|---|
모으기(aggregation) 연산 (0) | 2020.06.08 |
에러 처리 (0) | 2020.06.08 |
시퀀스 변환 (0) | 2020.06.08 |
비동기 멀티 스레드 생성 (0) | 2020.06.08 |