ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • 챕터[16] 안정적 비동기 프로그래밍
    모던자바인액션 2020. 10. 21. 16:06

    서론

    비동기 작업을 만들고 결과 얻기

    비블록 동작으로 생산성 높이기

    비동기 API 설계 및 구현


    Future

    자바 8 이전의 코드에서는 Future를 통하여 비동기 작업을 진행하였지만 Future의 값을 얻을려고 접근을 하게 될 경우 블록이 되어 작업이 끝날때까지 무기한(쓰레드 대기시간) 기다리는 문제가 발생할 수 있다.

    ExecutorService executorService = Executors.newCachedThreadPool();
            Future<Double> future = executorService.submit(new Callable<Double>() {
                @Override
                public Double call() throws Exception {
                    // 오래걸리는 작업은 다른 스레드에서 실행되게 (실제로는 null이 아님)
                    return null;
                }
            });
            try {
                // 블록 방지 ( 1초간 대기 )
                Double result = future.get(1, TimeUnit.SECONDS);
            } catch (ExecutionException e) {
                // 계산 중 예외 발생
            } catch (InterruptedException e2) {
                // 현재 스레드에서 대기 중 인터럽트 발생
            } catch (TimeoutException e3) {
                // Future가 완료되지 전에 타임아웃 발생
            }

    동기 API 와 비동기 API

    전통적인 동기 API는 메서드를 호출한 다음 메서드가 계산을 완료할 때까지 기다렸다 메서드가 반환되면 호출자는 반환된 값을 이용하여 다른 작업을 수행한다. 호출자와 피호출자가 각각 다른 스레드에서 실행되는 상황이었더라도 호출자는 피호출자의 동작 완료를 기다렸을 것 이다. 이처럼 동기 API를 사용하는 상황을 블록 호출이라고 한다.

    비동기 API 에서는 메서드가 즉시 반환되며 끝내지 못한 나머지 작업을 호출자 스레드와 동기적으로 실행될 수 있도록 다른 스레드에 할당한다. 이와 같은 비동기 API를 사용하는 상황을 비블록 호출이라고 한다.

     

    에러처리

    아래의 샘플 코드는 문제없이 작동을 하지만 futurePrice 값을 계산하는 과정에서 에러가 발생할 경우 해당 스레드에만 영향을 미친다. 그렇기에 에러가 발생해도 가격 계산은 계속 진행되며 일의 순서가 꼬인다. 결과적으로 클라이언트는 get 메서드가 반환될 때까지 영원히 기다리게 될 수 있다. 그렇기에 이처럼 블록 문제가 발생할 수 있는 상황에서는 타임아웃을 활용해야 문제가 발생 시 클라이언트가 영원히 블록되지 않고 타임아웃 시간이 지나면 TImeoutException을 받을 수 있다.

    
        @Test
        public void asyncFuture() {
            Shop shop = new Shop("BestShop");
            long start = System.nanoTime();
            Future<Double> future = shop.getPriceAsync1("myProduct");
            long invocationTime = ((System.nanoTime() - start)) / 1_000_000;
            log.info("Invocation returned after invocationTime: {}", invocationTime);
            // 다른 스레드 작업 메소드
            try {
                double price = future.get(); // 가격정보가 있으면 Future에서 가격 정보를 읽고 없으면 블록한다.
                log.info("price: {}", price);
            } catch (Exception e) {
                throw new RuntimeException(e);
                completableFuture.completeExceptionally(e); // CompletableFuture 내부에서 발생한 에러 전파
            }
            long retrievalTime = ((System.nanoTime() - start) / 1_000_000);
            log.info("price return after: {}", retrievalTime);
            // invocationTime: 3
            // price: 146.6...
            // after: 1033
        }
        

    비블록 코드 만들기

    
        // 순차 검색 코드
        @Test
        public void noneBlockCode1() {
            long start = System.nanoTime();
            findPrices("myPhone").forEach(price -> log.info("price: {}", price));
            long end = (System.nanoTime() - start) / 1_000_000;
            log.info("endTime: {}", end);
            // 4035
        }
        private List<String> findPrices(String product) {
            return shops.stream()
                    .map(shop -> String.format("%s price is %.3f", shop.getName(), shop.getPrice(product)))
                    .collect(Collectors.toList());
        }
        
        // 병렬 검색 코드
        @Test
        public void noneBlockCode2() {
            long start = System.nanoTime();
            findPricesParallel("myPhone").forEach(price -> log.info("price: {}", price));
            long end = (System.nanoTime() - start) / 1_000_000;
            log.info("endTime: {}", end);
            // 1025
        }
        private List<String> findPricesParallel(String product) {
            return shops.parallelStream()
                    .map(shop -> String.format("%s price is %.3f", shop.getName(), shop.getPrice(product)))
                    .collect(Collectors.toList());
        }

    4개의 상점에서 가격을 검색하는 코드의 샘플에서 가격을 검색하는 동안 각각 1초의 대기시간이 있으므로 전체 검색 결과 시간은 4초 이상이 걸린다 하지만 이걸 병렬 스트림으로 처리할 경우 4개의 상점에서 병렬로 검색이 진행되기에 더욱더 빠른 속도로 검색을 할 수 있는 코드를 완성할 수 있다.

    확장성이 더 좋은 해결 방법

    병렬 스트림 버전의 코드는 정확히 4개의 상점에 하나의 스레드를 할당해서 4개의 작업을 병렬로 수행하며 시간을 단축시켰다. 반면 작업이 5개로 증가를 하게 된다면 5번째 상점을 처리하는데 추가로 1초가 증가하여 순차 검색 코드에서는 1초가 더 추가가 되어 5000~ 이상의 시간이 소요될 것이며, 병렬 검색 또한 4개의 상점을 동시에 처리 이후 1개의 상점을 처리하기에 기존의 2배의 시간인 2000~ 이상의 시간이 소요될 것이다. 그렇기에 증가하는 상점의수에 맞게 스레드 수를 증가하는 것이 더욱더 높은 확장성 코드가 될 것이다.

     

        @Test
        public void nonBlockExecutorCode() {
            long start = System.nanoTime();
            findPricesAsyncExecutor("myPhone").forEach(price -> log.info("price: {}", price));
            long end = (System.nanoTime() - start) / 1_000_000;
            log.info("endTime:{}", end);
            // 2016
        }
                                                                    // 검색할 상점의 수만큼, 최대 크기
        private final Executor executor = Executors.newFixedThreadPool(Math.min(shops.size(), 100), new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                Thread t = new Thread();
                // 자바에서 일반 스레드가 싱행 중이라면 자바 프로그램은 종료되지 않는다.
                // 반면 데몬 스레드같은경우 자바 프로그램이 종료될때 스레드를 강제로 다운시킬 수 있다.
                t.setDaemon(true);
                return t;
            }
        });
    
        private List<String> findPricesAsyncExecutor(String product) {
            List<CompletableFuture<String>> priceFutures = shops.stream()
                    .map(shop -> CompletableFuture.supplyAsync(() -> String.format("%s price is %.3f", shop.getName(), shop.getPrice(product), executor)))
                    .collect(Collectors.toList());
    
            return priceFutures.stream()
                    .map(CompletableFuture::join)
                    .collect(Collectors.toList());
        }
    

     


     그나마 git : github.com/hodolee246/ModernJavaStudy/blob/master/src/test/java/com/example/modernjava/ch16/ch16AsyncCompletableFuture.java

    댓글

Designed by Tistory.