본문 바로가기
web

[WEB] Reactive Streams

by 뽀리님 2023. 11. 8.

Reactive Streams?

  • 리액티브 스트림은 자바와 다른 언어에서 사용되는 프로그래밍 패러다임으로, 데이터 스트림을 논블로킹(non-blocking) 및 비동기(asynchronous) 방식으로 처리하는 데 중점을 둔다
  • 이 접근 방식은 대량의 데이터 또는 높은 동시성 수준을 처리할 때 리소스를 보다 효율적으로 사용하고 성능을 향상시킬 수 있다
  • 리액티브 프로그래밍은 실시간 데이터(예: 실시간 피드 또는 사용자 상호 작용)를 다룰 때 특히 유용하다.
  • 전통적인 순차 또는 명령형 프로그래밍에서는 코드가 단계별로 실행되며, 이전 작업이 완료될 때까지 각 작업이 대기한다. 이는 I/O 바운드 작업이나 오래 실행되는 작업을 처리할 때 비효율적일 수 있다.
  • 반면에 논블로킹(non-blocking) 실행은 작업이 완료될 때까지 기다리는 동안 다른 작업을 계속 처리할 수 있도록 한다. 이는 콜백(callbacks), 프로미스(promises) 또는 다른 비동기 프로그래밍 구성 요소를 사용하여 이루어진다

Java Reactive Streams 구현체

  • 자바에서는 리액티브 스트림 API가 java.util.concurrent.Flow 패키지의 일부로 구현되어 있으며, 네 가지 주요 인터페이스가 포함되어 있다:
  • 자바에서 리액티브 스트림을 사용하려면 Project Reactor 또는 RxJava 같은 인기 라이브러리를 사용할 있으며, 이러한 라이브러리는 리액티브 데이터 스트림을 처리하는 사용되는 고수준 추상화 유틸리티 메소드를 제공한다

 

Reactive 구성요소

리액티브 스트림을 통해 구현해야 되는 API 컴포넌트에는 Publisher, Subscriber, Subscription, Processor 가 있다. 이 4개의 컴포넌트를 반드시 기억해야한다. 아무래도 이 4개의 역할이 헷갈리면 전체적인 동작과정까지도 헷갈리게되기 때문에 확실히 짚고 넘어가자.

컴포넌트 설명
Publisher 데이터를 생성하고 통지(발행, 게시, 방출)하는 역할을 한다.
Subsriber 구독한 Publisher로부터 통지된 데이터를 전달받아서 처리한다.
Subscription Publisher 요청할 데이터의 개수를 지정하고, 데이터의 구독을 취소하는 역할을 한다.
Processor Publisher Subscriber 기능을 모두 가지고 있다. , Subscriber로서 다른 Publisher 구독할 있고, Publisher로서 다른 Subscriber 구독할 있다. 주로 발행자와 최종 구독자 사이에서 중간 로직을 처리하기 위해 사용된다

 

출처 https://devfunny.tistory.com/928

 

 

그림으로 자세히 설명된게 있어 가져왔다.

1. 데이터를 구독한다. (subscribe)
먼저 Subscriber는 전달받을 데이터를 구독한다.
 
2. 데이터를 통지할 준비가 되었음을 알린다. (onSubscribe)
Publisher는 데이터를 통지할 준비가 되었음을 Subscriber에 알린다.
 
3. 전달 받을 통지 데이터 개수를 요청한다. (Subscription.request)
Publisher가 데이터를 통지할 준비가 되었다는 알림을 받은 Subscriber는 전달받기를 원하는 데이터의 개수를 
Publisher에게 요청한다.
 
▶ 데이터의 요청 개수를 지정하는 이유가 뭘까?
Subscriber가 Subscription.reuqest를 통해 데이터의 요청 개수를 지정한다.
이는 실제로 Publisher와 Subscriber는 각각 다른 스레드에서 비동기적으로 상호작용하는 경우가 대부분이기 때문이다. 
Publisher의 데이터 통지 속도 > 통지된 데이터를 처리하는 Subscriber 속도 
간단하게 생각해보면 데이터가 전송되는 속도가 데이터가 처리되는 속도보다 빠르면 계속해서 처리해야하는 
데이터가 밀리기 때문인 것이다.
 
4. 데이터를 생성한다.
5. 요청 받은 개수만큼 데이터를 통지한다. (onNext)
Publisher는 Subscriber로부터 요청받은 만큼의 데이터를 통지한다.
 
6. 데이터 처리를 완료할때까지 위 3~5번의 과정을 반복한다.
이렇게 Publisehr와 Subscriber 간에 데이터 통지, 데이터 수신, 데이터 요청의 과정을 반복한다.
 
7. 완료 또는 에러가(onError) 발생할때까지 데이터 생성, 통지, 요청을 계속한다.
8. 데이터 통지가 완료되었음을 알린다. (onComplete)
반복하다가 Publisher가 모든 데이터를 통지하게 되면 마지막으로 데이터 전송이 완료되었음을 Subscriber에게 알린다.
만약에 Publisher가 데이터를 처리하는 과정에서 에러가 발생하면 에러가 발생했음을 Subscriber에게 알린다.

 

 

 

예제

public static void main(String[] args) {
    Flux<Integer> numbers = Flux.range(1, 10); // 1부터 10까지의 숫자 스트림 생성
    numbers.subscribe(System.out::println); // 스트림에 구독하고 각 숫자를 출력
}

 

numbers.subscribe()는 구독자 또는 콜백 집합(예: System.out::println)을 사용하여 데이터 스트림 수명 주기의 다양한 측면을 처리하는 메서드이다.

numbers.subscribe(System.out::println)를 호출하면 System.out.println에 대한 메서드 참조를 전달한다.

이는 onNext() 메서드를 System.out.println으로 설정한 구독자를 정의하는 것과 동일하다. onNext() 메서드는 발행자가 새 항목을 내보낼 때마다 호출된다.

 

다음은 내부에서 발생하는 동작이다:

  1. subscribe를 호출하면 제공된 콜백(이 경우 onNext를 위한 System.out::println)을 사용하여 구독자 객체가 생성된다.
  2. 구독자는 발행자(이 경우 Flux 인스턴스)에 구독한다.
  3. 발행자는 발행자와 구독자 사이의 연결을 나타내는 구독 객체를 생성한다. 이 구독 객체는 구독자의 onSubscribe 메서드에 전달된다.
  4. 구독자는 구독 객체에서 request(n)을 호출하여 발행자로부터 항목을 요청한다. 여기서 n은 처리할 준비가 된 항목 수이다. 기본적으로 Reactor의 Flux는 제한 없는 요청 전략을 사용하여 Long.MAX_VALUE 항목을 요청한다.
  5. 발행자는 구독자에게 항목을 내보낸다. 각 항목에 대해 구독자의 onNext 메서드(이 경우 System.out::println)를 호출한다. 이로 인해 숫자가 콘솔에 출력된다.
  6. 발행자가 모든 항목을 내보낸 후에는 구독자의 onComplete 메서드를 호출하여 스트림이 끝났음을 알린다. 처리 중 오류가 발생하면 발행자는 관련 예외와 함께 구독자의 onError 메서드를 호출한다.
  • 리액티브 프레임워크는 작업이 논블로킹, 비동기 방식으로 이루어지도록 보장하여 데이터 스트림 처리를 효율적으로 수행한다.

 

 

대충 간단한 개념만 짚어봤다.

사실 아직도 잘모르겠다 ㅠㅠ..  개념이 너무 어렵다.

실제로 적용해서 써보지 않는한 예제코드를 봐도 사실 모르기에....

많은 연습이 필요할 거  같다.

 

참조

https://devfunny.tistory.com/928

https://velog.io/@cjstk3221/%EB%A6%AC%EC%95%A1%ED%8B%B0%EB%B8%8C-%EC%8A%A4%ED%8A%B8%EB%A6%BC%EC%A6%88Reactive-Streams%EB%9E%80

https://velog.io/@sangmin7648/Reactive-Streams