에러 처리
에러 신호는 종료 신호이며, 에러 신호가 발생하면 시퀀스는 종료된다.
에러 신호 처리
에러 신호가 발생하면 Subscriber의 onError 메서드가 호출된다.
이 메서드를 구현한 Subscriber를 이용해서 구독하면 에러 신호를 원하는대로 처리할 수 있다.
에러 처리를 위한 Consumerfmf 파라미터로 갖는 subscribe() 메서드를 사용해서 Exception 처리할 수 있다.
에러 처리를 위한 Consumer를 파라미터로 갖는 subscribe() 메서드 사용
Flux.range(1, 10)
.map(x -> {
if (x == 5) throw new RuntimeException("new RuntimeException"); // 에러 발생
else return x;
})
.subscribe(
i -> System.out.println(i), // next 신호 처리
e -> System.err.println(e.getMessage()), // error 신호 처리
() -> System.out.println("complete") // complete 신호 처리
);
에러 발생시 기본 값 반환(onErrorReturn)
위 코드와 동일한 에러를 발생시키나, 에러 대신 값이 발생하고 시퀀스는 종료된다.
Flux.range(1, 10)
.map(x -> {
if (x == 5) throw new RuntimeException("new RuntimeException"); // 에러 발생
else return x;
})
.onErrorReturn(-1)
.subscribe(
i -> System.out.println(i), // next 신호 처리
e -> System.err.println(e.getMessage()), // error 신호 처리
() -> System.out.println("complete") // complete 신호 처리
);
Flux<T> onErrorReturn(Predicate<? super Throwable> predicate, T fallbackValue)
Exception이 특정 조건을 충족하는 경우에만 특정 값을 발생
<E extends Throwable> Flux<T> onErrorReturn(Class<E> type, T fallbackValue)
익셉션이 특정 타입(Class)인 경우 특정 값을 발생
에러 발생시 다른 신호(시퀀스)나 다른 에러로 대체(onErrorResume)
시퀀스에서 발생한 에러를 입력으로 받아 결과로 Publisher를 리턴한다.(에러 발생시 이를 다른 데이터 신호로 대체)
Random random = new Random();
Flux.range(1, 10)
.map(x -> {
int rand = random.nextInt(8);
if (rand == 0) throw new IllegalArgumentException("illarg");
if (rand == 1) throw new IllegalStateException("illstate");
if (rand == 2) throw new RuntimeException("exception");
return x;
})
.onErrorResume(error -> {
if (error instanceof IllegalArgumentException) { // 에러가 IllegalArgumentException인 경우 다른 데이터 신호로 대체
return Flux.just(21, 22);
}
if (error instanceof IllegalStateException) { // 에러가 IllegalStateException인 경우 다른 데이터 신호로 대체
return Flux.just(31, 32);
}
return Flux.error(error); // 그 외의 에러인 경우 에러를 발생시킴
})
.subscribe(System.out::println);
에러를 다른 에러로 변환(onErrorMap)
Predicate이나 Class 타입을 파라미터로 갖는 메서드를 이용해서 변환할 익셉션을 제한할 수 있다.
Random random = new Random();
Flux.range(1, 10)
.map(x -> {
int rand = random.nextInt(8);
if (rand == 0) throw new IllegalArgumentException("illarg");
if (rand == 1) throw new IllegalStateException("illstate");
if (rand == 2) throw new RuntimeException("exception");
return x;
})
.onErrorMap(error -> new Exception("new Exception!!"))
.subscribe(System.out::println);
재시도(retry)
에러 발생시 구독을 재시도한다.
Flux.range(1, 5)
.map(x -> {
if (x < 4) return "num " + x;
throw new RuntimeException("boom");
})
.retry(2) // 에러 발생시 2번 재시도한다.
.subscribe(System.out::println, System.err::println);
재시도(retryWhen)
단순 재시도가 아닌 좀 더 복잡한 상황에서 재시도
retryWhen(Function< Flux<Throwable>, ? extends Publisher<?>> whenFactory)
- Flux<Throwable> : 시퀀스가 발생하는 익셉션 신호
- whenFactory : 재시도 조건에 맞게 변환한 Flux를 리턴
'JAVA > Reactor' 카테고리의 다른 글
모으기(aggregation) 연산 (0) | 2020.06.08 |
---|---|
리액터 쓰레드 스케줄링 (0) | 2020.06.08 |
시퀀스 변환 (0) | 2020.06.08 |
비동기 멀티 스레드 생성 (0) | 2020.06.08 |
프로그래밍 방식의 시퀀스 생성 (0) | 2020.06.04 |