정의


non-blocking, backPressure 를 이용해서 비용기 서비스를 할 때 기본이 되는 스팩이다. Java RxJava, Spring5 Webflux의 Core 에 있는 ProjectReactor 프로젝트 모두 해당 스팩을 사용하고 있습니다.

따라서 비동기 프로젝트를 잘 이해하기 위해서 기본 스펙이 되는 Reactive Stream 에 대한 이해가 필요하다.

 

 

 

개요


non-blocking backpressure 를 이용하여 비동기 스트림 처리의 표준을 제공하는 것

 

 

 

목적, 설계, 범위


지속적으로 들어오는 스트림 데이터를 효율적으로 처리하기 위해서는 비동기 시스템이 효과적이다. 비동기 처리를 하면서 가장 중요한 문제는 데이터 처리가 목적지의 리소스 소비를 예측가능한 범위에서 신중하게 제어할 수 있어야 한다는 것이다. 네크워크를 통한 서버간의 협업 또는 단일 서버에서 컴퓨팅 리소스를 동시에 사용할 때 주로 씁니다.

Reactive Stream 의 주된 목적은 비동기의 경계를 명확히하여 스트림 데이터의 교환을 효과적으로 관리하는 것에 있음. 즉, 비동기로 데이터를 처리하는 시스템이 어느정도의 data 가 들어올 지 예측가능하도록 하는 것.

Reactive Stream 에서는 BackPressure가 이를 달성할 수 있게 해주는 중요한 부분이다.

  • Java 가 제공하는 Flow API 의 구현체인 SubmissionPublisher 라이브러리를 예시로 들자면, 비동기 요청으로 받는 쪽(subscriber)에서 요청을 보내는 쪽(publisher)에 “n개의 데이터 처리가 가능한 상태입니다" 하고 알림을 주면 publisher 가 전송할 데이터가 있을 때 subscriber 에게 데이터를 보냅니다.

Reactive Stream은 다음과 같은 스트림 지향 라이브러리에 대한 표준 및 사양입니다.

  1. 잠재적으로 무한한 숫자의 데이터 처리
  2. 순서대로 처리
  3. 컴포넌트간에 데이터를 비동기적으로 전달
  4. backpressure 를 이용한 데이터 흐름제어

 

 

 

BackPressure (배압)


리액티브 선언문

한 컴포넌트가 부하를 이겨내기 힘들 때, 시스템 전체가 합리적인 방법으로 대응해야 한다.

즉, 컴포넌트가 대처할 수 없고 장애가 발생해선 안 되기 때문에 컴포넌트는 상류 컴포넌트들에 자신이 과부하 상태라는 것을 알려 부하를 줄이도록 해야 한다.

이러한 배압은 시스템이 부하로 인해 무너지지 않고 정상적으로 응답할 수 있게 하는 중요한 피드백 방법이다. 배압이 사용자에게까지 전달되어 응답성이 떨어질 수 있지만, 이 메커니즘은 부하에 대한 시스템의 복원력을 보장하고 시스템 자체가 부하를 분산할 수 있는 자원을 제공할 수 있는지 정보를 제공할 것이다.

 

 

 

API Components


Reactive Stream API 의 구성요소는 아래와 같다.

  1. Publisher
  2. Subscriber
  3. Subscription
  4. Processor

Publisher 는 무한한 data를 제공합니다. 제공된 data 는 Subscriber 가 구독하는 형식으로 처리됩니다. Publisher.subscribe(Subscriber) 의 형식으로 data 제공자와 구독자가 연결을 맺게 됩니다.

그리고 호출되는 순서는 아래와 같습니다.

  • onSubscribe
    • publisher 가 생산하는 data를 Subscriber 가 받을 준비가 되어 있다는 의미이며, onNext로 데이터를 수신합니다.
  • onNext
    • 데이터를 수신한다는 의미입니다.
  • onError
    • 실패가 있는 경우 호출됩니다.
  • onComplete
    • 더 이상 사용할 수 있는 신호가 없을 경우 Publisher 는 onComplete 을 호출합니다.

 

 

 

API Components 명세서


public interface Publisher<T> {
    public void subscribe(Subscriber<? super T> s);
}

Publisher 를 Subscriber 가 구독할 수 있도록 메서드를 제공합니다.

public interface Subscriber<T> {
    public void onSubscribe(Subscription s);
    public void onNext(T t);
    public void onError(Throwable t);
    public void onComplete();
}

Subscriber 는 Subscription 를 등록해야하고, Subscription 에서 오는 신호에 따라서 동작합니다. Subscriber 에게 오는 신호는 onNext, onError, onComplete 가 있습니다.

public interface Subscription {
    public void request(long n);
    public void cancel();
}

Subscription 은 Publisher 와 Subscriber 사이에서 중계하는 역할을 합니다.

request 메서드는 Subscriber 가 Publisher 에게 데이터를 요청하는 개수이며 (요청할 수 있다는 준비상태를 알림) cancel 은 구독을 취소하겠다는 의미입니다.

위 interface 를 토대로 아래와 같은 flow 를 만들 수 있습니다.

Reactive Stream Flow,  출처 : https://grokonez.com/java/java-9-flow-api-reactive-streams

  1. Publisher 에 본인이 소유할 Subscription 를 구현하고 publishing 할 data 를 만듭니다.
  2. Publisher 는 subscribe() 메서드를 통해서 Subscriber 를 등록합니다.
  3. Subscriber 는 onSubscribe() 메서드를 통해서 Subscription 을 등록하고 Publisher 를 구독하기 시작합니다. 이는 Publisher 에 구현된 Subscription 을 통해서 이루어집니다. 이렇게 하면 Publisher 와 Subscriber는 Subscription 을 통해 연결된 상태가 됩니다. onSubscribe() 내부에서 Subscription 의 request() 를 요청하면 그 때부터 data 에 대한 구독이 시작됩니다.
  4. Subscriber 는 Subscription 메서드의 request() 또는 cancel() 을 호출해서 data 흐름을 제어할 수 있습니다.
  5. Subscription 의 request() 에는 조건에 따라 Subscriber 의 onNext(), onComplete() 또는 onError()를 호툴합니다. 그러면 Subscriber 의 해당 메서드 로직에 따라서 request() 또는 cancel() 로 제어하게 됩니다.

 

 

 

샘플 예제

 

public class Sample {
    public static void main(String[] args) {
				Flow.Publisher<String> publisher = new SubmissionPublisher<>();

        Flow.Subscriber<String> subscriber = new Flow.Subscriber<String>() {

            private Flow.Subscription subscription;

            @Override
            public void onSubscribe(Flow.Subscription subscription) {
                // 구독을 수행합니다.
                System.out.println("구독");
                // Subscription 를 등록합니다.
                this.subscription = subscription;
                // 1개의 데이터를 요청합니다.
                this.subscription.request(1);
            }

            @Override
            public void onNext(String item) {
                // Subscriber 가 데이터를 받으면 next 가 호출됩니다.
                System.out.println("onNext call");
                System.out.println(item);
                // 다음 메세지를 호출할 수 있는 상태인지 Subscription 을 통해서 Publisher 에게 알립니다.
                subscription.request(1);
            }

            @Override
            public void onError(Throwable throwable) {
                throwable.printStackTrace();
            }

            @Override
            public void onComplete() {
                System.out.println("completed");
            }
        };

        publisher.subscribe(subscriber);
        ((SubmissionPublisher)publisher).submit("submit1");
        ((SubmissionPublisher)publisher).submit("submit2");
        ((SubmissionPublisher)publisher).submit("submit3");
        ((SubmissionPublisher)publisher).submit("submit4");
        ((SubmissionPublisher)publisher).close();
        System.out.println("끝");

    }
}

 

 

 

결과

 

끝
구독
onNext call
submit1
onNext call
submit2
onNext call
submit3
onNext call
submit4
completed

보면 비동기로 요청이 일어나기 때문에 “끝" 이라는 메세지가 먼저 호출된 것을 알 수 있다.

구독 후에 publisher 에게 메세지를 submit 하면 Publisher는 Subscriber 에게 onNext 신호를 주고 데이터 처리를 진행합니다.

 

 

 

 

참고

https://sabarada.tistory.com/98

 

[Java] Reactive Stream 이란?

reactive stream이란 non-blocking(넌블럭킹) backPressure(역압)을 이용하여 비동기 서비스를 할 때 기본이 되는 스펙입니다. java의 RxJava, Spring5 Webflux의 Core에 있는 ProjectReactor 프로젝트 모두 해당..

sabarada.tistory.com