Reactor는 Reative Streams 명세를 기반으로 JVM에서 반응성 Non-Blocking 애플리케이션을 생성하기 위한 JAVA 라이브러리입니다.
처음에는 이 라이브러리로 작업하는것이 어려울 수 있습니다. 이 시리즈는 Mono 및 Flux 클래스를 통해 Reactor를 제공하는 Reactive Streams의 실행을 생성, 조작, 관리하는 과정을 설명합니다.
Mono와 Flux는 모두 반응성 스트림이지만 표현하는 것이 다릅니다.
- Mono: 0개 또는 1개의 요소의 Stream
- Flux: 0개 ~ N개의 요소의 Stream
예를들어 HTTP 서버에 요청을 하면 응답이 없거나 한 개의 응답을 받으므로 이 경우 Flux보다는 Mono가 더 적절합니다. 반대로 어떤 구간에서 수학적인 함수의 결과를 계산한다면, 그 구간에서 숫자당 하나의 결과가 나올것을 예샹하므로 이 경우에는 Flux가 더 적절합니다.
Mono And Flux are Lazy
지연은 Reactive Stream의 속성중 하나입니다. 이는 얼마나 많은 함수들이 Stream을 함수로 호출하던지 간에 이를 소비하기전까지는 실행되지 않는다는 것입니다.
Mono와 Flux에서 이를 소비하는 메서드는 .subscribe() 입니다. 이를 호출하기 전까지 Mono와 Flux는 어떠한 동작도 수행하지 않습니다.
Flux를 생성하는 일반적인 방법
Flux를 생성하는데는 몇 가지 방법이 있습니다. 아래의 코드는 Flux를 사용하기 위해 일반적으로 사용하는 방법입니다.
//1, 2, 3의 값을 가지는 Flux를 생성합니다.
Flux<Integer> integerFlux = Flux.just(1, 2, 3);
//"Hello", "foo", "bar"값을 가지는 Flux를 생성합니다.
Flux<String> stringFlux = Flux.just("Hello", "foo", "bar");
//Iterable한 요소로부터 Flux를 생성합니다. 여기서는 List가 사용되었습니다.
List<String> stringList = Arrays.asList("Hello", "foo", "bar");
Flux<String> fluxFromList = Flux.fromIterable(stringList);
//Java Stream에서도 동일하게 동작합니다.
Stream<String> stringStream = stringList.stream();
Flux<String> fluxFromStream = Flux.fromStream(stringStream);
//범위로 Flux 만들기
Flux<Integer> rangeFlux = Flux.range(1, 5); // -> Flux(1, 2, 3, 4, 5)
//100ms 마다 새로운 값을 만들어서 Flux를 생성한다. 값은 1부터 시작하여 증가한다.
Flux<Integer> intervalFlux = Flux.interval(Duration.ofMillis(100));
//다른 Flux또는 Mono로부터 생성
Flux<String> fluxCopy = Flux.from(fluxFromList);
Mono를 생성하는 일반적인 방법
just를 제외하고는 Mono와 Flux는 다른 메서드를 가집니다.
//"Hello World!"를 가지는 Mono생성
Mono<String> helloWorld = Mono.just("Hello World");
//빈 Mono 생성
Mono<T> empty = Mono.empty();
//Callable로 부터 Mono 생성
Mono<String> helloWorldCallable = Mono.fromCallable(() -> "Hello World!");
Mono<User> user = Mono.fromCallable(UserService::fetchAnyUser);
//Future로 Mono 생성
CompletableFuture<String> helloWorldFuture = MyApi.getHelloWorldAsync();
Mono<String> monoFromFuture = Mono.fromFuture(helloWorldFuture);
//supplier로 부터 Mono생성
Ramdom rand = new Ramdom();
Mono<Double> MonoFromSupplier = mono.fromSupplier(rand::nextDouble);
//다른 Mono또는 Flux로 생성
Mono<Double> monoCopy = Mono.from(monoFromSupplier);
Mono<Integer> monoFromFlux = Mono.from(Flux.range(1, 10));
Flux나 Mono를 생성하는 일반적인 메서드
Mono와 Flux는 이를 생성하기 위한 3개의 메서드를 함께 가지고 있습니다.
1) Error
Mono.error(Throwable t)와 Flux.error(Throwable t) 는 Reactive Stream을 사용하면서 에러를 다루는데 매우 유용합니다.
2) Defer
Mono.defer(Supplier<? extends Mono<? extends T>>)는 Mono.fromCallable과 매우 유사합니다. 그러나 fromCallable은 T 타입을 반환하는 반면에 defer는 Mono를 반환한다는 차이가 있습니다.
또한 만약 예외가 발생하는 코드 호출시, defer는 개발자가 스스로 이를 잡는 코드를 작성해야 하지만, fromCallable의 경우에는 내부적으로 Mono.error를 호출하게 됩니다.
- 두 방법 모두 동일한 결과를 제공합니다.
- defer 메서드가 좀더 장황하지만 예외를 다른 종류의 Mono로 다시 매핑하거나 새로운 예외를 생성할 수 있으므로 코드를 더 쉽고 이해하고 유연성을 높일 수 있습니다.
//두 예제는 같은 결과를 가져다줍니다.
Integer getAnyInteger() throws Exception {
throw new RuntimeException("An error as occured for no reason");
}
//두 메서드의 비교
Mono<Integer> fromCallable = Mono.fromCallable(this::getAnyInteger);
// result -> Mono.error(RuntimeException("An error as occured for no reason."))
Mono<Integer> defer = Mono.defer(() -> {
try {
Integer res = this.getAnyInteger();
return Mono.just(res);
} catch (Exception e) {
return Mono.error(e);
}
})
// result -> Mono.error(RuntimeException("An error as occured for no reason"))
3) Create
create(Consumer<MonoSink> callback) 메서드는 우리가 위에서 살펴본 방식보다 더 low-level의 방식입니다. 이 방식은 Mono와 Flux의 내부 신호를 다룹니다.
Mono
Integer getAnyInteger() throws Exception {
throw new RuntimeException("An error as occured for no reason.");
}
Mono<Integer> = Mono.create(callback -> {
try { callback.success(this.getAnyInteger()); }
catch (Exception e) { callback.error(e); }
});
Flux
Flux<Double> flux = Flux.create(emitter -> {
Random rnd = new Random();
for(int i = 0; i <= 10; i++) emitter.next(rnd.nextDouble());
int random = rnd.nextInt(2);
if(random < 1) emitter.complete();
else emitter.error(new RuntimeException("Bad luck, you had one change out of 2 to complete Flux));
});
Mono와 Flux를 사용(소비)하는 방법은?
위에서 Flux와 Mono를 만드는 방법을 보았으므로, 이들이 보유한 값을 사용하는 방법을 보겠습니다. 이 작업은 스트림을 소비하는 작업입니다.
1) Mono
Mono.just("Hello World.").subscribe(
successValue -> System.out.println(successValue),
error -> System.error.println(error.getMessage()),
() -> System.out.println("Mono consumed.")
);
결과
- 성공시
Hello World!
Mono consumed.
- 에러 발생시
**the Error Message**
Mono consumed.
2) Flux
Flux에서 데이터와 그에 대한 처리는 순차적으로 진행됩니다. 이는 아래의 경우 1, 2, 3, 4, 5 가 순차적으로 처리됨을 의미합니다. 따라서 값 3이 2보다 먼저 표시되지 않습니다.
Flux.range(1, 5).subscribe(
successValue -> System.out.println(successValue),
error -> System.out.println(error.getMessage()),
() -> System.out.println("Flux consumed.")
)
결과
- 성공시
1
2
3
4
5
Flux comsumed.
- 실패시
1
2
3
// 이번에는 Flux comsumed. 가 출력되지 않는다.
// Flux는 에러가 발생하여 미래의 값을 다루는 것이 멈추면(모든 작업이 완전히 소비되지 않으면) Flux comsumed. 가 출력되지 않는다.
안좋은 예
Flux와 Mono가 호출하지 말아야할 메서드를 소개합니다.
Mono에는 block() 메서드가 있습니다. 이를 호출하면서 메서드는 현재 동작중인 쓰레드를 차단합니다. 따라서 이후 프로그램이 더 이상 반응하지 않습니다. 따라서 이를 사용하지 않고 문제를 해결하는 방법을 찾는것을 권장합니다.
Flux에는 이에 해당하는 것이 blockFirst(), blockFirst(Duration timeout) 및 blockLast() , blockLast(Duration timeout)
참조
https://pearlluck.tistory.com/730
Reactor Java #1 How to create Mono and Flux ? | by Antoine Cheron | Medium
'web' 카테고리의 다른 글
[JAVA] Stream for each vs for-loop (0) | 2024.06.11 |
---|---|
[WEB] 블로킹/논블로킹 and 동기 / 비동기 (0) | 2023.11.23 |
[MSA] Spring Cloud를 사용해보자(3)-API Gateway (1) | 2023.11.22 |
[WEB] Reactive Streams (1) | 2023.11.08 |
HTTP 인증에 대한 처리 방식 (0) | 2023.08.22 |