JAVA/JAVA

CompletableFuture

lovineff 2020. 11. 13. 11:56

ParallelStream VS CompletableFuture

 

병렬 처리에 영향을 미치는 다음 3가지 요인을 잘 살펴보아야 한다.
1)  요소의 수와 요소당 처리 시간
컬렉션에 요소의 수가 적고 요소당 처리 시간이 짧으면 순차 처리가 오히려 병렬 처리보다 빠를 수 있음. 병렬 처리는 스레드풀 생성, 스레드 생성이라는 추가적인 비용이 발생하기 때문

 2)  스트림 소스의 종류
ArrayList, 배열은 인덱스로 요소를 관리하기 때문에 포크 단계에서 요소를 쉽게 분리할 수 있어 병렬 처리 시간이 절약된다. 
반면 HashSet, TreeSet은 요소 분리가 쉽지 않고, LinkedList 역시 링크를 따라가야 하므로 요소 분리가 쉽지 않음. 
따라서 이 소스들은 ArrayList, 배열보다는 상대적으로 병렬 처리가 늦다.

3)  코어(Core)의 수
싱글 코어 CPU일 경우에는 순차 처리가 빠르고. 병렬 스트림을 사용할 경우 스레드의 수만 증가하고 동시성 작업으로 처리되기 때문에 좋지 못한 결과를 보여준다. 
코어의 수가 많으면 많을수록 병렬 작업 처리 속도는 빨라짐. 

 

ParallelStream

시스템에서 제공하는 쓰레드 수만 사용이 가능하다.

CompletableFuture

쓰레드 수를 직접 지정하여 사용 가능하다.

동시에 여러 블록을 실행할 수 있다.

ParallelStream과 속도가 비슷하거나 빠르다.

시스템에서 사용하는 데몬 쓰레드수를 재정의하여 사용

작성시 테스트를 반드시 수행한다 

 

CompletableFuture 사용법

비동기 코드 실행 방법

CompletableFuture의 runAsync, supplyAsync 함수 사용

runAsync 는 return 타입이 없는 Runnable을 매개변수로 받고, supplyAsync는 return 타입이 있는 Supplier를 매개변수로 받다.

CompletableFuture의 수행 결과를 받아 다시 비동기 코드를 실행 하고 싶은 경우

thenApply

함수를 통해 결과값을 Function 으로 보낼 수 있다.

thenAccept

return 값이 필요없는 Consumer를 매개변수로 받는다.

독립된 CompletableFuture의 실행 결과를 받아 처리

thenCompose

독립된 CompletableFuture의 실행 결과값이 필요 없는 경우

thenCombine

독립된 CompletableFuture의 실행 결과값을 받아 처리해야 하는 경우

최종 반환값이 있다.

thenAcceptBoth

독립된 CompletableFuture의 실행 결과값을 받아 처리해야 하는 경우

최종 반환값이 없다

여러 CompletableFuture을 병렬로 실행하고 모든 프로세스가 끝나길 기다리는 경우

allOf(...)

 

 

then 이라는 접두어는 앞의 타스크들은 모두 반드시 완료되어야 한다는 의미이다. 앞의 타스크들은 하나혹은 두개일수 있다.
Apply라는 동사는 뒤에 Function형 람다식이 올거라는 것을 의미한다.
Function형은 첫번째 임의의 형태<T>의 입력값을 받아 처리한후 두번째 임의의 형태<R>의 값으로 출력하는 람다식이다.

 

CompletableFuture  병렬 처리 방법

함수 연속 호출 후 결과값 처리

함수를 2번 연속 호출한 결과값 반환

Executor 선언시 설정한 쓰레드수에 따라 결과가 다르다.

private final Executor executor = Executors.newFixedThreadPool(100, new ThreadFactory() { @Override public Thread newThread(Runnable r) { Thread t = new Thread(r); t.setDaemon(true); return t; } });
 
List<Integer> list = IntStream.rangeClosed(1, 100)
        .boxed()
        .collect(Collectors.toList());
 
// parallelStream 사용, 약 7초 소요
List<Integer> finalList = list.parallelStream()
        .map(n -> addTenAsync("parallelStream1", n))
        .map(m -> addTenAsync("parallelStream2", m))
        .collect(Collectors.toList());
 
finalList.stream().forEach(num -> log.info(String.valueOf(num)));
 
// CompletableFuture 사용, 약 1초 소요
List<CompletableFuture<Integer>> caledList = list.stream()
        .map(n -> CompletableFuture.supplyAsync(() -> addTenAsync("CompletableFuture1", n), executor))
        .map(future -> future.thenApplyAsync(m -> addTenAsync("CompletableFuture2", m), executor))
        .collect(Collectors.toList());
 
caledList.stream()
        .map(CompletableFuture::join)
        .collect(Collectors.toList())
        .stream().forEach(num -> log.info(String.valueOf(num)));
 
 
 
// 함수 정의
public int addTenAsync(String worker, int num){
    try {
        Thread.sleep(500);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    log.debug("worker : {} / num : {} end ", worker, num);
    return num + 10;
}

블록 함수 호출 비동기 처리

private final Executor executor = Executors.newFixedThreadPool(100, new ThreadFactory() { @Override public Thread newThread(Runnable r) { Thread t = new Thread(r); t.setDaemon(true); return t; } });
 
List<Integer> list = IntStream.rangeClosed(1, 100)
        .boxed()
        .collect(Collectors.toList());
 
 
 
// 1번 방법
List<CompletableFuture<Void>> collect = list.stream()
        .map(num -> {
            // 여러 CompletableFuture을 병렬로 실행하고 모든 프로세스가 끝나길 기다리는 경우
            CompletableFuture<Void> callApi1 = CompletableFuture.runAsync(() -> api1(num), executor);   // 0.5초 딜레이
            CompletableFuture<Void> callApi2 = CompletableFuture.runAsync(() -> api2(num), executor);   // 0.5초 딜레이
 
            return CompletableFuture.allOf(callApi1, callApi2);
        }).collect(Collectors.toList());
 
collect.stream()
        .map(CompletableFuture::join)
        .collect(Collectors.toList());
}
 
 
// 2번 방법
List<CompletableFuture<Integer>> collect = list.stream()
        .map(num -> {
            // 독립된 CompletableFuture의 실행 결과를 받아 처리
            return CompletableFuture.supplyAsync(() -> api1(num), executor)
                    .thenCombine(CompletableFuture.supplyAsync(() -> api2(num), executor), (r1, r2) ->{
                        log.info("result {} / {} ", r1, r2);
                        return r1 + r2;
                    })
            ;
        }).collect(Collectors.toList());
 
collect.stream()
        .map(CompletableFuture::join)
        .collect(Collectors.toList());
 
 
 
 
// 함수 정의
public int api1(int num){
    try {
        Thread.sleep(500);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    log.info("api1 > {}", num);
    return num;
}
 
public int api2(int num){
    try {
        Thread.sleep(500);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    log.info("api2 > {}", num);
    return num;
}

'JAVA > JAVA' 카테고리의 다른 글

이메일 형식 검사  (0) 2021.03.09
전화번호 구분자 추가  (0) 2021.03.09
queryDsl QClass 빌드 설정(gradle)  (0) 2020.11.11
HashMap 반복문 조회  (0) 2020.11.09
Integral division result cast to double or float  (0) 2020.11.09