2.1 리액티브 스트림즈(Reactive Streams)
- 개발자가 리액티브한 코드를 작성하기 위해서는 코드 구성을 용이하게 해주는 리액티브 라이브러리가 필요하다.
- 리액티브 라이브러리를 어떻게 구현할지 정의해 놓은 별도의 표준 사양이 리액티브 스트림즈이다.
- 데이터 스트림을 Non- Blocking이면서 비동기적인 방식으로 처리하기위한 리액티브 라이브러리의 표준사양
- 리액티브 스트림즈를 구현한 구현체로 RxJava, Reactor, Akka Streams, Java9 Flow APi 등이 있다.
2.2 리액티브 스트림즈의 구성 요소
리액티브 스트림즈를 통해 구현해야 하는 API 컴포넌트에는 Publisher, Subscriber, Subscription, Processor가 있다.
Publisher: 데이터를 생성하고, 통지하는 역할을 한다.
Subscriber: 구독한 Publisher로 통지된 데이터를 전달받아서 처리하는 역할을 한다.
Subscription: Publisher에 요청할 데이터의 개수를 지정하고, 데이터의 구독을 취소하는 역할을 한다
Processor: Publisher와 Subscriber의 기능을 모두 가지고 있다. 즉 Subscriber로서 다른 Publisher를 구독할 수 있고, Publisher로서 다른 Subscriber가 구독할 수 있다.
Publisher와 Subscriber 간에 데이터가 전달되는 동작 과정
- 먼저 Subscriber는 전달받을 데이터를 구독합니다.(subscribe)
- 다음으로 publisher는 데이터를 통지할 준비가 되었음을 Subscriber에게 알립니다. (onSubscribe)
- Publisher가 데이터를 통지할 준비가 되었음을 알림을 받은 Subscriber는 전달받기를 원하는 데이터의 개수를 Publisher에게 요청합니다. (Subscription.request)
- 다음으로 Publisher는 Subscriber로 부터 요청받은 만큼의 데이터를 통지합니다. (onNext)
- Publisher와 Subscriber 간에 데이터 통지, 수신, 요청의 과정을 반복하다가 마지막으로 데이터 전송이 완료되었음을 Subscriber에게 통지합니다. (onComplete) Publisher가 데이터 통지시 오류 발생하면 Subscriber에게 에러가 발생했음을 알립니다. (onError)
Subscriber가 왜 Subscription.request를 통해 데이터의 요청 개수를 지정할까?
→ 실제로 Publisher와 Subscriber는 각각 다른 스레드에서 비동기적으로 상호작용 하는 경우가 대부분이다.
Publisher가 데이터를 통지하는 속도가 Publisher로 부터 통지받은 데이터를 Subscriber가 처리하는 속도보다 빠르면 처리를 기다리는 데이터는 쌓이게 되어 시스템 부하 발생
리액티브 시스템 컴포넌트 코드
컴포넌트는 실제 코드에서 인터페이스 형태로 정의되며, 이 인터페이스를 구현해서 해당 컴포넌트를 사용하게 된다.
Publisher
public interface Publisher<T> {
public void subscribe(Subscriber<? super T> s);
// <? super T> 하한 경계 와이들 카드를 사용하여, 하위 클래스 제한 (T와 그 조상들만 가능)
- subscribe의 메서드는 파라미터로 전달받은 Subscriber를 등록하는 역할을 한다.
- 왜 publisher에 subscribe 메서드가 정의되어 있을까?
Kafka에서 Pub/Sub 모델
- Publisher와 Subscriber 중간에 메시지 브로커가 있다. 이 브로커 내에 여러개의 토픽이 존재한다.
- Publisher와 Subscriber는 브로커에 있는 특정 토픽을 바라보는 구조로 이루어져 있다.
→ Publisher와 Subscriber는 느슨한 결합 구조로 이루어져 있다.
리액티브 스트림즈에서의 Pub/Sub 모델
- Publisher가 subscribe 메서드의 파라미터인 Subscriber를 등록하는 형태로 구독이 이루어져 있다.
2.3.2 Subscriber
public interface Subscriber<T> {
public void onSubscribe(Subscription s);
public void onNext(T t);
public void onError(Throwable t);
public void onComplete();
}
- onSubscribe 메서드는 구독 시작 시점에 어떤 처리를 하는 역할을 한다. Publisher에게 요청할 데이터의 개수를 저장하거나, 구독을 해지한다.
- onNext: publisher가 통지한 데이터를 처리하는 역할을 한다.
- onError: publisher가 데이터 통지를 위한 처리 과정에서 에러가 발생했을 때 해당 에러를 처리하는 역할을 한다.
- onComplete: publisher가 데이터 통지를 완료했음을 알릴 때 호출되는 메서드이다.
2.3.3 Subscription
public interface Subscription {
public void request(long n);
public void cancel();
}
- request를 통해서 publisher에게 데이터 개수 요청 가능
- cancel 메서드를 통해서 구독 해지 가능
2.3.4 Processor
public interface Processor<T, R> extends Subscriber<T>, Publisher<R>
- Subscriber 인터페이스와 Publisher 인터페이스를 다중 상속한다.
- Publisher와 Subscriber의 기능을 모두 가지고 있다.
2.4 리액티브 스트림즈 관련 용어 정리
- Signal
Publisher와 Subscriber 간에 주고받는 상호작용
onSubscribe, onComplete, onError, request 또는 cancel 메서드는 Publisher가 Subscriber에게 보내는 Signal
request와 cancel 메서드는 subscription 인터페이스 코드에 적용되지만, 실 사용 주체는 Subscriber이므로, Subscriber가 Publisher에게 보내는 Signal
- Demand
Publisher가 아직 Subscriber에게 전달하지 않은 Subscriber가 요청한 데이터를 말한다.
- Emit
Publisher가 emit하는 Signal 중에서 데이터를 전달하기 위한 onNext Signal을 데이터를 emit한다고 표현한다.
Upstream/Downstream
public class Example {
public static void main(String[] args) {
Flux
.just(1, 2, 3, 4, 5, 6)
.filter(n -> n % 2 == 0)
.map(n -> n * 2)
.subscribe(System.out::println);
}
}
- just 메서드를 사용해서 데이터를 생성한 후, emit하게 되는데 여기서 just 메서드는 리액티브 스트림즈 컴포넌트의 Publisher 역할을 한다.
- 메서드가 하나로 연결된 것처럼 보이는 메서드 체인 방식 사용
- subscribe 제외, 반환 값이 모두 Flux 객체이기 때문에 메서드 호출이 가능해진다.
<aside>
현재 호출한 메서드에서 반환된 Flux의 위치에서 자신보다 더 상위에 있는 Flux는 Upstream, 하위에 있는 Flux는 Downstream이 된다.
</aside>
Sequence
- Sequence는 Publisher가 emit하는 데이터의 연속적인 흐름을 말한다.
public class Example {
public static void main(String[] args) {
Flux
.just(1, 2, 3, 4, 5, 6)
.filter(n -> n % 2 == 0)
.map(n -> n * 2)
.subscribe(System.out::println);
}
}
- Flux를 통하여 데이터 생성 → filtering → map 메서드를 통해 반환하는 과정이 sequence라고 한다.
- 다양한 Operator 로 데이터의 연속적인 흐름을 정의
Operator
- just, filter, map … 같은 메서드를 연산자라고 부른다.
Source
- 최초에 가장 먼저 생성된 무언가 (Data Source, Source Publisher, SourceFlux 등)
2.5 리액티브 스트림즈의 구현 규칙
- Publisher가 Subscriber에게 보내는 onNext signal의 총 개수는 항상 구독을 통해 요청된 데이터의 총 개수보다 더 작거나 같아야 한다.
- Publisher가 요청된 것보다 적은 수의 onNext signal을 보내고 onComplete 혹은 onError를 호출하여 구독을 종료할 수 있다.
- Publisher의 데이터 처리가 실패하면 onError signal을 보내야 한다.
- Publisher의 데이터 처리가 성공적으로 종료되면 onComplete signal을 보내야 한다.
- Publisher가 Subscriber에게 onError 또는onComplete signal을 보내는경우 해당 Subscriber 의 구독 은 취소된 것으로 간주되어야 한다.
- 일단 종료 상태 signal을 받으면 (onError, onComplete) 더이상 signal이 발생되지 않아야 한다.
- 구독이 취소되면 Subscriber는 결국 signal을 받는 것을 중지해야 한다.
Subscriber 구현을 위한 주요 기본 규칙
- Subscriber는 publisher로부터 onNext signal을 수신하기 위해 Subscription.request(n)를 통해 Demand signal을 Publisher에게 보내야 한다.
- Subscriber.onComplete() 및 Subscriber.onError(Throwable T)는 Subscription 또는 Publisher의 메서드를 호출해서는 안된다.
- Subscriber.onComplete( ) 및 Subscriber.onError(Throwable 1)는 signal을 수신한 후 구독이 취소된것으로 간주해야한다.
- 구독이 더이상 필요하지 않은 경우 Subscriber는 Subscription.cancel( )을 호출해야 한다.
- Subscriber.onSubscribe( )는 지정된 Subscriber에 대해 최대 한 번만 호출 되어야 한다.
Subscription 구현을 위한 주요 기본 규칙
- 구독은 Subscriber가 onNext 또는 onSubscribe 내에서 동기적으로 Subscription.request를 호출하도록 허용해야 한다.
- 구독이 취소된 후 추가적으로 호출되는 Subscription.request(long n)는 효력이 없어야 한다.
- 구독이 취소된 후 추가적으로 호출되는 Subscription.cancel()은 효력이 없어야 한다.
- 구독이 취소되지 않은 동안 Subscription.request(long n)의 매개변수가 0보다 작거나 같으면 java.lang.IllegalArgumentException과 함께 onError signal을 보내야 한다.
- 구독이 취소되지 않은 동안 Subscription.cancel()은 Publisher가 Subscriber에게 보내는 signal을 결국 중지하도록 요청해야 한다.
- 구독이 취소되지 않은 동안 Subscription.cancel()은 Publisher에게 해당 구독자에 대한 참조를 결국 삭제하도록 요청해야 한다.
- Subscription.cancel(), Subscription.request() 호출에 대한 응답으로 예외를 던지는 것을 허용하지 않는다.
- Return normally : 유효한 값 이외에는 어떠한 예외도 던지지 않는다는 의미
- 리액티브 시스템즈에서는 예외가 발생하면, 메서드를 호출한 쪽으로 예외를 던지는데, 에외가 발생하면 해당 예외를 onError Signal과 함께 보내도록 규정한다.
- 구독은 무제한 수의 request 호출을 지원해야 하고 최대 2^63-1개의 Demand를 지원해야 한다.
2.6 리액티브 스트림즈 구현체
- RxJava (Reactive Extensions(리액티브 확장))는 .NET 환경의 리액티브 확장 라이브러리를 넷플릭스에서 Java 언어로 포팅하여 만든 JVM 기반의 대표적인 리액티브 확장 라이브러리
- 1.x 버전과 2.0 이후 버전의 근본적인 차이는 Backpressure를 지원하느냐에 갈린다.
Backpressure
- Backpressure은 Publisher가 끊임없이 emit하는 무수히 많은 데이터를 적절하게 제어하여 데이터 처리에 과부하가 걸리지 않도록 제어하는 것.
Project Reactor
- Project Reactor는 Spring Framework 팀에의해 주도적으로 개발된 리액티브 스트림즈의 구현체.
- Reactor 3.x가 SpringFramework5 버전부터 리액티브 스택에 포함되어 리액티브 프로그래밍의 핵심 역할을 담당.
Akka Streams
- Akka는 JVM상에서의 동시성과 분산 애플리케이션을 단순화해주는 오픈소스 툴킷
- Actor Model을 사용하며, Actor들 간의 통신은 메시지를 통해서만 이루어지고 Actor들은 서로 독립적이어서, 느슨한 결합과 높은 응집력이 보장된다.
- Akka 툴킷 위에 리액티브 스트림즈를 구현 한 것
Java Flow API
- Java 9부터 Flow API를 사용하여 리액티브 스트림즈를 지원
- Flow API는 Reactor, RxJava, Akka Streams처럼 리액티브 스트림즈를 구현한 구현체가 아니라, 리액티브 스트림즈의 표준사양이 SPI (Service Provider Interface (SPI), 확장기능에 필요한 최소한의 조건을 명시하는 인터페이스)로써 JAVA API에 정의되어 있다.
SPI(Service Provider Interface)
Java에서 사용하는 JDBC(Java Database Connectivity)는 Java에서 Database를 사용할수 있도록 Java 애플리케이션과 Database 를 연결해주는 API입니다.
JDBC API를 사용해서 Database에 저장된 데이터를 조회하고추가, 수정, 삭제를 할수 있습니다. 그런데 JDBC API는 인터페이스로 정의 되었으며 Oracle이나 MySQL등의 Database 벤더들이JDBC인터페이스를 구현한 구현체를 라이브러리 형태로 제공합니다. 여기서 JDBC 인터페이스 구현체를 사용하는 사용자 입장에서는 JDBC가 말 그대로 API가되지만 JDBC인터페이스를 구현해야 하는 벤더 입장에서 JDBC는 SPI가 되는것입니다.
'📗 BOOK > 스프링으로 시작하는 리액티브 프로그래밍' 카테고리의 다른 글
4장 함수형 인터페이스 (1) | 2024.11.20 |
---|---|
3장 Blocking I/O vs Non-Blocking I/O (0) | 2024.11.20 |
1장 리액티브 시스템과 리액티브 프로그래밍 (0) | 2024.11.20 |