ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • 챕터[17] 리엑티브 프로그래밍
    모던자바인액션 2020. 10. 22. 00:43

    서론

    리액티브 애플리케이션 및 시스템 개발 원칙

    자바 9 Flow API Publisher, Subscriber, Subscirption, Processor


    리액티브 프로그래밍 패러다임의 중요성

     

    수년 전까지 대규모 애플리케이션은 수십 대의 서버, 기가바이트의 데이터, 수초의 응답 시간, 당연히 여겨졌던 몇 시간의 유지보수 시간 등의 특징을 가졌다. 오늘날에는 다음과 같은 적어도 세 가지 이유로 상황이 변하고 있다.

    1. 빅데이터 : 보통 빅테이터는 페타바이트 단위로 구성되며 매일 증가한다.
    2. 다양한 환경 : 모바일 디바이스에서 수천 개의 멀티 코어 프로세서로 실행되는 클라우드 기반 클러스터에 이르기까지 다양한 환경에 애플리케이션이 배포된다.
    3. 사용 패턴 : 사영자는 1년 내내 항상 서비스를 이용할 수 있으며 밀리초단위의 응답 시간을 기대한다.

    리액티브 매니패스토 www.reactivemanifesto.org/

    리액티브 애플리케이션과 시스템 개발의 핵심 원칙을 공식적으로 정의한다.

     

    • 반응성

      1. 리액티브 시스템은 빠를 뿐만 아니라 더 중요한 특징으로 일정하고 예상할 수 있는 반응 시간을 제공한다.

      2. 사용자가 기대치를 가질 수 있다.

      3. 기대치를 통하여 사용자의 확신이 증가하면서 사용할 수 있는 애플리케이션이라는 확인을 제공한다.

    • 회복성

      1. 장애가 발생해도 시스템이 반응하고 복구한다.

      2. 컴포넌트 실행 복제, 여러 컴포넌트의 시간과 공간분리, 각 컴포넌트가 비동기적으로 다른 작업을 다른 컴포넌트에 위임하는 등 다양한 회복성 기법 제공

    • 탄력성

      1. 애플리케이션 생명주기 동안 작업부하를 받게 되어 반응성이 위협받을 경우 자동으로 컴포넌트에 할당된 자원을 늘려준다.

    • 메시지 주도

      1. 회복성과 탄력성을 지원하려면 약한 결합, 고립, 위치, 투명성 등을 지원할 수 있도록 시스템을 구성하는 컴포넌트의 경계를 명확하게 정의해야 한다.

      2. 비동기 메시지를 전달해 컴포넌트 끼리의 통신이 이루어 진다.

      3. 이로 인하여 회복성, 탄력성을 얻을 수 있다.

    애플리케이션 수준의 리액티브

    1. 주요기능은 비동기로 작업을 수행할 수 있는점

    2. 이벤트 스트림을 블록하지 않고 비동기로 처리하는 것이 CPU의 사용률을 극대화 할 수 있는 방법이다. 이 목표를 달성할 수 있도록 리액티브 프레임워크와 라이브러리는 스레드를 퓨처, 액터, 일련의 콜백을 발생시키는 이벤트 루프 등과 공유하고 처리할 이벤트를 변환하고 관리한다.

      1. 개발자는 이로 인하여 애플리케이션 구현의 추상수준을 높이며

      2. 동기 블록, 경쟁 조건, 데드락 같은 저 수준 멀티스레드 문제를 처리할 필요가 없어지며 비지니스 로직 구현에 집중할 수 있다.

    시스템 수준의 리액티브이란?

    1. 리액티브 시스템은 여러 애플리케이션이 한 개의 일관적인, 회복할 수 있는 플랫폼을 구성할 수 있음 뿐만 아니라 다른 애플리케이션이 실패해도 전체 시스템이 운영될 수 있도록 도와주는 소프트웨어 아키텍처 이다.

    2. 주요 속성으로는 메시지 주도를 꼽을 수 있다.

    3. 각 컴포넌트를 완전히 고립하려면 이들이 결합되지 않도록 해야 하며 그래야만 시스템 장애(회복성)와 높은 부하(탄력성)에서도 반응성을 유지할 수 있다.

    4. 리액티브 아키텍처 에서는 컴포넌트에서 발생한 장애를 고립시켜 문제가 퍼저나가는걸 막음으로 회복성을 제공한다. 이러한 맥락에서 회복성은 경함 허용 능력과 같은 의미를 지닌다.

    5. 고립과 비결합이 회복성의 핵심이라면 탄력성의 핵심은 위치 투명성이다. 위치 투명성은 리액티브 시스템의 모든 컴포넌트가 수신자의 위치와 상관없이 다른 서비스와 통신할 수 있음을 의미한다. 그렇기에 별도의 시스템을 복제하지 않아도 현재 작업 부하에 따라 애플리케이션을 확장할 수 있다.

    Flow API docs.oracle.com/javase/9/docs/api/java/util/concurrent/Flow.html

    • Publisher

      • subscribe(Flow.Subscriber<? super T> subscriber) : 지정된 구독자를 추가한다.

    • Subscriber

      • onComplete() : 오류로 인하여 종료되지 않은 구독에 대해 추가 구독자 메서드 호출이 발생하지 않는다는 것이 알려진 경우 호출하는 메서드이며 다른 구독자 메서드가 구독에 의해 호출되지 않는다.

      • onError(Throwable t) : 게시자 또는 구독에서 복구할 수 없는 오류 발생시 호출 되는 메서드이며 다른 구독자 메서드를 호출하지 않는다.

      • onNext(T item) : 구독의 다음 항목으로 호출되는 메서드

      • onSubscribe(Flow.subscription subscription) : 지정된 구독에 대해 다른 구독자 메서드를 호출하기 전에 호출 하는 메서드

    • Subscription

      • cancel() : 구독자가 메시지 수신을 중지하도록 시킨다

      • request(long n) n이 구독에 대하여 충족되지 않은 수요에 주어진 항목 수를 추가한다.

    • Processor

    Flow 인터페이스 규칙

    • Publisher는 반드시 Subscription request 요구된 요소 보다 적은 수를 Subscriber 에게 전달해야 하며, 동작이 성공적으로 끝났으면 onCompleate() 에러가 발생했으면 onError를 호출하여 Subscription을 종료한다. Publisher가 항목을 발생하면 Subscriber가 한 개씩 또는 한 번에 여러 항목을 소비하는데 Subscription이 이 과정을 관리할 수 있도록 Flow 클래스는 관련된 인터페이스와 정적 메서드를 제공한다. Publisher는 수많은 일련의 이벤트를 제공할 수 있지만 Subscriber의 요구사항에 따라 역압력 기법에 의해 이벤트 제공 속도가 제한된다.

    • Subscriber는 요소를 받아 처리할 수 있음을 Publisher에게 알려야 한다 (역압력 행사를 위해) onCompleate(), onError() 신호를 처리하는 상황에서 Subscriber는 Publisher나 Subscription의 어떤 메서드도 호출할 수 없으며 Subscription이 취소되었다고 가정해야한다. Subsciber는 Subscritpion request() 메서드 호출이 없이도 언제든지 종료 시그널을 받을 준비가 되어있어야 하며 Subscription.cancel()이 호출된 이후 한 개이상의 onNext()를 받을 준비가 되어 있어야한다.

    • Publisher와 Subscriber는 정확하게 Subscritpion을 공유해야 하며 각각이 고유한 역할을 수행해야 한다. 그러려면 onsubscribe와 onNext 메서드에서 Subscriber는 request메서드를 동기적으로 호출할 수 있어야 한다. 표준에서는 Subscrition의 cancel 메서드는 몇 번을 호출해도 한 번 호출한 것과 같은 효과를 가져야 하며, 여러 번 이 메서드를 호출해도 다른 추가 호출에 별 영향이 없도록 스레드에 안전해야 한다고 명시한다. 같은 Subscriber 객체에 다시 가입하는 것은 관장하지 않지만 이런 상항에서 예외가 발생해야 한다면 명세서가 강제하진 않는데. 이전에 취소된 가입이 영구적으로 적용되었다면 이후의 기능에 영향을 주지 않을 가능성도 있기 때문이다.

    이벤트는 아래 프로토콜에서 정의한 순서로 호출을 통해 발행 되어야 한다.

    onSubscribe는 맨 처음에 호출 되어야 하며 이어서 onNext가 여러번 호출 가능하며 이후
    이벤트 스트림이 지속되거나 onComplete 콜백을 통해 더 이상 데이터가 없고 종료됨을 알릴 수
    있으며 또는 Publisher에 에러가 발생하여 onError를 호출할 수 있다.
    
    onSubscribe onNext* (onError | onComplete)?

    Subscriber가 Publisher에 자신을 등록할 때 처음으로 onSubscribe 메서드를 호출해 Subscription 객체를 전달한다. Subscription 인터페이스는 메서드 두 개를 정의한다. Subscription은 첫 번째 메서드로 Publisher에게 주어진 개수의 이벤트를 처리할 준비가 되었음을 알릴 수 있다. 두 번째 메서드로는 Subscription을 취소, 즉 Publisher에게 더 이상 이벤트를 받지 않음을 통지한다.

     

    자바에서 Flow API 구현을 제공하지 않는 이유

    • Flow ApI를 만들 당시 Akka, RxJava 등 다양한 리액티브 스트림의 자바 코드 라이브러리가 이미 존재했기에 같은 발행 - 구독 사상에 기반하여 리액티브 프로그래밍을 구현하였지만, 이득 라이브러리는 독립적으로 개발되었고, 서로 다른 이름 규칙과 API를 사용했다.

    • 자바 9의 표준화 과정에서 기존 방법이 아닌 Flow 인터페이스 기반으로 리액티브 개념을 구현하여 쉽게 다양한 라이브러리가 협력할 수 있게 되었다.

    Flow API 이용한 예제

    package com.example.modernjava.ch17;
    
    import java.util.Random;
    // 544p 현재 보고된 온도를 전달하는 자바 빈
    public class TempInfo {
    
        public static final Random random = new Random();
    
        private final String town;
        private final int temp;
    
        public TempInfo(String town, int temp) {
            this.town = town;
            this.temp = temp;
        }
        // 정적 팩토리 메서드를 이용해 해당 도시의 TempInfo 인스턴스를 생성한다.
        public static TempInfo fetch(String town) {
            // 10% 확률로 에러발생
            if(random.nextInt(10) == 0)
                throw new RuntimeException("Error!!!!!");
            return new TempInfo(town, random.nextInt(100));
        }
    
        public String getTown() {
            return town;
        }
    
        public int getTemp() {
            return temp;
        }
    
        @Override
        public String toString() {
            return "TempInfo{" +
                    "town='" + town + '\'' +
                    ", temp=" + temp +
                    '}';
        }
    }
    
    package com.example.modernjava.ch17;
    
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Flow.*;
    
    public class TempSubscription implements Subscription {
    
        // 스택오버플로 해결용 Executor
        private static final ExecutorService executor = Executors.newSingleThreadExecutor();
        private final Subscriber<? super TempInfo> subscriber;
        private final String town;
    
        public TempSubscription(Subscriber<? super TempInfo> subscriber, String town) {
            this.subscriber = subscriber;
            this.town = town;
        }
    
        @Override
        public void request(long n) {
            // 다른 스레드에서 다음 요소를 구독자에게 전송한다.
            executor.submit( () -> {
                // Subscriber가 만든 요청을 하나씩 반복한다.
                for(long i = 0L; i < n; i++) {
                    try {
                        // 현재 온도를 Subscriber에게 전달
                        subscriber.onNext(TempInfo.fetch(town));
                    } catch (Exception e) {
                        // error 발생시 Subscriber에게 에러 전송
                        subscriber.onError(e);
                        break;
                    }
                }
            });
        }
    
        @Override
        public void cancel() {
            // 구독이 취소되면 완료 신호를 Subscription에게 전송
            subscriber.onComplete();
        }
    }
    
    package com.example.modernjava.ch17;
    
    import lombok.extern.slf4j.Slf4j;
    
    import java.util.concurrent.Flow.*;
    
    @Slf4j
    public class TempSubscriber implements Subscriber<TempInfo> {
    
        private Subscription subscription;
    
        @Override
        public void onSubscribe(Subscription subscription) {
            // 구독을 저장한다.
            this.subscription = subscription;
            // 첫 번째 요청을 전달한다.
            subscription.request(1L);
        }
    
        @Override
        public void onNext(TempInfo tempInfo) {
            // 수신정보 출력 후 다음 정보를 요청한다.
            log.info("TempInfo: {}", tempInfo);
            subscription.request(1L);
        }
    
        @Override
        public void onError(Throwable throwable) {
            // 에러가 발생할 경우
            log.error(throwable.getMessage());
        }
    
        @Override
        public void onComplete() {
            // 전부 종료되면
            log.info("Done!!");
        }
    }
    

    Processer를 이용한 구현

    package com.example.modernjava.ch17;
    
    import java.util.concurrent.Flow.*;
    // TempInfo를 다른 TempInfo로 변환하는 프로세스
    public class TempProcessor implements Processor<TempInfo, TempInfo> {
    
        private Subscriber<? super TempInfo> subscriber;
    
        @Override
        public void subscribe(Subscriber<? super TempInfo> subscriber) {
            this.subscriber = subscriber;
        }
    
        @Override
        public void onSubscribe(Subscription subscription) {
            subscriber.onSubscribe(subscription);
        }
    
        @Override
        public void onNext(TempInfo temp) {
            subscriber.onNext(new TempInfo(temp.getTown(), temp.getTemp() -32 * 5 / 9));
        }
    
        @Override
        public void onError(Throwable throwable) {
            subscriber.onError(throwable);
        }
    
        @Override
        public void onComplete() {
            subscriber.onComplete();
        }
    }
    
        @Test
        public void subscribeTempInfo() {
            getTemperatures("New York").subscribe(new TempSubscriber());
        }
    
        @Test
        public void subscribeProcessTempInfo() {
            getCelsiusTemperatures("New York").subscribe(new TempSubscriber());
        }
    
        private static Publisher<TempInfo> getTemperatures(String town) {
            return subscriber -> subscriber.onSubscribe(new TempSubscription(subscriber, town));
        }
    
        public static Publisher<TempInfo> getCelsiusTemperatures(String town) {
            return subscriber -> {
                TempProcessor processor = new TempProcessor();
                processor.subscribe(subscriber);
                processor.onSubscribe(new TempSubscription(processor, town));
            };
        }

    RXJava

    RXJava는 자바로 리액티브 애플리케이션을 구현하는 데 사용하는 라이브러리이다.

     


    github : github.com/hodolee246/ModernJavaStudy/blob/master/src/main/java/com/example/modernjava/ch17/TempSubscription.java

    댓글

Designed by Tistory.