Reactive Stream 이란?, backPressure 이란?
정의
non-blocking, backPressure 를 이용해서 비용기 서비스를 할 때 기본이 되는 스팩이다. Java RxJava, Spring5 Webflux의 Core 에 있는 ProjectReactor 프로젝트 모두 해당 스팩을 사용하고 있습니다.
따라서 비동기 프로젝트를 잘 이해하기 위해서 기본 스펙이 되는 Reactive Stream 에 대한 이해가 필요하다.
개요
non-blocking backpressure 를 이용하여 비동기 스트림 처리의 표준을 제공하는 것
목적, 설계, 범위
지속적으로 들어오는 스트림 데이터를 효율적으로 처리하기 위해서는 비동기 시스템이 효과적이다. 비동기 처리를 하면서 가장 중요한 문제는 데이터 처리가 목적지의 리소스 소비를 예측가능한 범위에서 신중하게 제어할 수 있어야 한다는 것이다. 네크워크를 통한 서버간의 협업 또는 단일 서버에서 컴퓨팅 리소스를 동시에 사용할 때 주로 씁니다.
Reactive Stream 의 주된 목적은 비동기의 경계를 명확히하여 스트림 데이터의 교환을 효과적으로 관리하는 것에 있음. 즉, 비동기로 데이터를 처리하는 시스템이 어느정도의 data 가 들어올 지 예측가능하도록 하는 것.
Reactive Stream 에서는 BackPressure가 이를 달성할 수 있게 해주는 중요한 부분이다.
- Java 가 제공하는 Flow API 의 구현체인 SubmissionPublisher 라이브러리를 예시로 들자면, 비동기 요청으로 받는 쪽(subscriber)에서 요청을 보내는 쪽(publisher)에 “n개의 데이터 처리가 가능한 상태입니다" 하고 알림을 주면 publisher 가 전송할 데이터가 있을 때 subscriber 에게 데이터를 보냅니다.
Reactive Stream은 다음과 같은 스트림 지향 라이브러리에 대한 표준 및 사양입니다.
- 잠재적으로 무한한 숫자의 데이터 처리
- 순서대로 처리
- 컴포넌트간에 데이터를 비동기적으로 전달
- backpressure 를 이용한 데이터 흐름제어
BackPressure (배압)
한 컴포넌트가 부하를 이겨내기 힘들 때, 시스템 전체가 합리적인 방법으로 대응해야 한다.
즉, 컴포넌트가 대처할 수 없고 장애가 발생해선 안 되기 때문에 컴포넌트는 상류 컴포넌트들에 자신이 과부하 상태라는 것을 알려 부하를 줄이도록 해야 한다.
이러한 배압은 시스템이 부하로 인해 무너지지 않고 정상적으로 응답할 수 있게 하는 중요한 피드백 방법이다. 배압이 사용자에게까지 전달되어 응답성이 떨어질 수 있지만, 이 메커니즘은 부하에 대한 시스템의 복원력을 보장하고 시스템 자체가 부하를 분산할 수 있는 자원을 제공할 수 있는지 정보를 제공할 것이다.
API Components
Reactive Stream API 의 구성요소는 아래와 같다.
- Publisher
- Subscriber
- Subscription
- Processor
Publisher 는 무한한 data를 제공합니다. 제공된 data 는 Subscriber 가 구독하는 형식으로 처리됩니다. Publisher.subscribe(Subscriber) 의 형식으로 data 제공자와 구독자가 연결을 맺게 됩니다.
그리고 호출되는 순서는 아래와 같습니다.
- onSubscribe
- publisher 가 생산하는 data를 Subscriber 가 받을 준비가 되어 있다는 의미이며, onNext로 데이터를 수신합니다.
- onNext
- 데이터를 수신한다는 의미입니다.
- onError
- 실패가 있는 경우 호출됩니다.
- onComplete
- 더 이상 사용할 수 있는 신호가 없을 경우 Publisher 는 onComplete 을 호출합니다.
API Components 명세서
public interface Publisher<T> {
public void subscribe(Subscriber<? super T> s);
}
Publisher 를 Subscriber 가 구독할 수 있도록 메서드를 제공합니다.
public interface Subscriber<T> {
public void onSubscribe(Subscription s);
public void onNext(T t);
public void onError(Throwable t);
public void onComplete();
}
Subscriber 는 Subscription 를 등록해야하고, Subscription 에서 오는 신호에 따라서 동작합니다. Subscriber 에게 오는 신호는 onNext, onError, onComplete 가 있습니다.
public interface Subscription {
public void request(long n);
public void cancel();
}
Subscription 은 Publisher 와 Subscriber 사이에서 중계하는 역할을 합니다.
request 메서드는 Subscriber 가 Publisher 에게 데이터를 요청하는 개수이며 (요청할 수 있다는 준비상태를 알림) cancel 은 구독을 취소하겠다는 의미입니다.
위 interface 를 토대로 아래와 같은 flow 를 만들 수 있습니다.
Reactive Stream Flow, 출처 : https://grokonez.com/java/java-9-flow-api-reactive-streams
- Publisher 에 본인이 소유할 Subscription 를 구현하고 publishing 할 data 를 만듭니다.
- Publisher 는 subscribe() 메서드를 통해서 Subscriber 를 등록합니다.
- Subscriber 는 onSubscribe() 메서드를 통해서 Subscription 을 등록하고 Publisher 를 구독하기 시작합니다. 이는 Publisher 에 구현된 Subscription 을 통해서 이루어집니다. 이렇게 하면 Publisher 와 Subscriber는 Subscription 을 통해 연결된 상태가 됩니다. onSubscribe() 내부에서 Subscription 의 request() 를 요청하면 그 때부터 data 에 대한 구독이 시작됩니다.
- Subscriber 는 Subscription 메서드의 request() 또는 cancel() 을 호출해서 data 흐름을 제어할 수 있습니다.
- Subscription 의 request() 에는 조건에 따라 Subscriber 의 onNext(), onComplete() 또는 onError()를 호툴합니다. 그러면 Subscriber 의 해당 메서드 로직에 따라서 request() 또는 cancel() 로 제어하게 됩니다.
샘플 예제
public class Sample {
public static void main(String[] args) {
Flow.Publisher<String> publisher = new SubmissionPublisher<>();
Flow.Subscriber<String> subscriber = new Flow.Subscriber<String>() {
private Flow.Subscription subscription;
@Override
public void onSubscribe(Flow.Subscription subscription) {
// 구독을 수행합니다.
System.out.println("구독");
// Subscription 를 등록합니다.
this.subscription = subscription;
// 1개의 데이터를 요청합니다.
this.subscription.request(1);
}
@Override
public void onNext(String item) {
// Subscriber 가 데이터를 받으면 next 가 호출됩니다.
System.out.println("onNext call");
System.out.println(item);
// 다음 메세지를 호출할 수 있는 상태인지 Subscription 을 통해서 Publisher 에게 알립니다.
subscription.request(1);
}
@Override
public void onError(Throwable throwable) {
throwable.printStackTrace();
}
@Override
public void onComplete() {
System.out.println("completed");
}
};
publisher.subscribe(subscriber);
((SubmissionPublisher)publisher).submit("submit1");
((SubmissionPublisher)publisher).submit("submit2");
((SubmissionPublisher)publisher).submit("submit3");
((SubmissionPublisher)publisher).submit("submit4");
((SubmissionPublisher)publisher).close();
System.out.println("끝");
}
}
결과
끝
구독
onNext call
submit1
onNext call
submit2
onNext call
submit3
onNext call
submit4
completed
보면 비동기로 요청이 일어나기 때문에 “끝" 이라는 메세지가 먼저 호출된 것을 알 수 있다.
구독 후에 publisher 에게 메세지를 submit 하면 Publisher는 Subscriber 에게 onNext 신호를 주고 데이터 처리를 진행합니다.
참고
https://sabarada.tistory.com/98