이번 시간에는 Reactor Sequence 상에서 발생하는 signal의 전파 흐름을 알아보도록 하겠습니다.
일반적인 Signal의 전파 흐름
1
2
3
4
5
6
7
8
9
10
11
12
13
|
import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Flux;
@Slf4j
public class ReactorSignalEventPropagationExample01 {
public static void main(String[] args) {
Flux
.range(1, 1)
.filter(n -> n > 0)
.map(n -> n * 2)
.subscribe(data -> log.info("# onNext: {}", data));
}
}
코드 1-1 일반적인 signal 이벤트 흐름
|
코드 1-1은 Reactor Sequence 상에서 발생하는 signal의 전파 흐름을 확인하기 위한 예제 코드입니다.
예제 코드 상에는 log()
Operator가 빠져있지만 range()
, filter()
, map()
Operator 하단에 각각 log()
Operator를 추가해서 실행하면 아래와 같은 로그를 출력합니다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
|
18:51:02.355 [main] DEBUG reactor.util.Loggers - Using Slf4j logging framework
18:51:02.369 [main] INFO log.range - | onSubscribe([Synchronous Fuseable] Operators.ScalarSubscription)
18:51:02.371 [main] INFO log.filter - | onSubscribe([Fuseable] FluxFilterFuseable.FilterFuseableSubscriber)
18:51:02.371 [main] INFO log.map - | onSubscribe([Fuseable] FluxMapFuseable.MapFuseableSubscriber)
18:51:02.372 [main] INFO log.map - | request(unbounded)
18:51:02.372 [main] INFO log.filter - | request(unbounded)
18:51:02.372 [main] INFO log.range - | request(unbounded)
18:51:02.372 [main] INFO log.range - | onNext(1)
18:51:02.372 [main] INFO log.filter - | onNext(1)
18:51:02.372 [main] INFO log.map - | onNext(2)
18:51:02.372 [main] INFO com.codestates.example.schedulers.SchedulersExample04 - # onNext: 2
18:51:02.373 [main] INFO log.range - | onComplete()
18:51:02.373 [main] INFO log.filter - | onComplete()
18:51:02.373 [main] INFO log.map - | onComplete()
|
로그를 보면, 코드 1-1에서 발생하는 signal은 아래와 같은 순서로 전파됩니다.
range | onSubscribe() --> filter | onSubscribe() --> map | onSubscribe()
--> map | request() --> filter | request() --> range | request()
--> range | onNext() --> filter | onNext() --> map | onNext()
--> range | onComplete() --> filter | onComplete() --> map | onComplete()
먼저 대부분의 Operator들은 각각 자신만의 Subscriber
를 포함하고 있고, 이 Subscriber
와 서로 signal을 주고 받으면서 데이터를 전송하고 데이터를 전달 받는다는 것을 알 수 있습니다.
다름으로 onSubscribe, onNext, onComplete signal은 Downstream 쪽으로 전파되고, request signal은 Upstream 쪽으로 전파되는 것을 알 수 있습니다.
subscribeOn()이 추가되었을 때 Signal의 전파 흐름
그렇다면 subscribeOn() Operator가 Operator 체인에 추가되면 signal의 전파 흐름은 어떻게 변할까요?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
|
import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;
@Slf4j
public class ReactorSignalEventPropagationExample02 {
public static void main(String[] args) throws InterruptedException {
Flux
.range(1, 1)
.doOnSubscribe(subscription -> log.info("# doOnSubscribe"))
.subscribeOn(Schedulers.boundedElastic()) // (1)
.map(n -> n * 2)
.subscribe(data -> log.info("# onNext: {}", data));
Thread.sleep(100L);
}
}
|
코드 1-2는 subscribeOne() Operator가 Operator체인에 추가되었을 때 signal의 전파 흐름을 살펴보기 위한 예제 코드입니다. ( 로그 출력 결과가 복잡해 보일 것 같아 filter() Operator는 제외했습니다.)
코드 1-1과 마찬가지로 log() Operator를 각각의 Operator 다음에 추가해서 코드를 실행하면 로그 출력 결과는 아래와 같습니다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
|
21:48:27.992 [main] INFO log.subscribeOn - onSubscribe(FluxSubscribeOn.SubscribeOnSubscriber)
21:48:27.993 [main] INFO log.map - onSubscribe(FluxMap.MapSubscriber)
21:48:27.994 [main] INFO log.map - request(unbounded)
21:48:27.994 [main] INFO log.subscribeOn - request(unbounded)
21:48:28.003 [boundedElastic-1] INFO log.range - | onSubscribe([Synchronous Fuseable] Operators.ScalarSubscription)
21:48:28.004 [boundedElastic-1] INFO com.codestates.example.schedulers.SchedulersExample05 - # doOnSubscribe
21:48:28.005 [boundedElastic-1] INFO log.doOnSubscribe - | onSubscribe([Fuseable] FluxPeekFuseable.PeekFuseableSubscriber)
21:48:28.006 [boundedElastic-1] INFO log.doOnSubscribe - | request(unbounded)
21:48:28.006 [boundedElastic-1] INFO log.range - | request(unbounded)
21:48:28.006 [boundedElastic-1] INFO log.range - | onNext(1)
21:48:28.006 [boundedElastic-1] INFO log.doOnSubscribe - | onNext(1)
21:48:28.006 [boundedElastic-1] INFO log.subscribeOn - onNext(1)
21:48:28.006 [boundedElastic-1] INFO log.map - onNext(2)
21:48:28.006 [boundedElastic-1] INFO com.codestates.example.schedulers.SchedulersExample05 - # onNext: 2
21:48:28.006 [boundedElastic-1] INFO log.range - | onComplete()
21:48:28.006 [boundedElastic-1] INFO log.doOnSubscribe - | onComplete()
21:48:28.006 [boundedElastic-1] INFO log.subscribeOn - onComplete()
21:48:28.006 [boundedElastic-1] INFO log.map - onComplete()
|
로그가 좀 복잡해보이는데 어쨌든 코드 1-2의 signal 전파 흐름을 보면 코드 1-1과 조금 다르다른 것을 알 수 있습니다.
subscribeOn()
이 추가되지 않았던 코드 1-1에서는 range()
부터 차례대로 onSubscribe signal
이 전파되었는데 코드 1-2의 경우에는 subscribeOn()
부터 onSubscribe signal
이 Downstream쪽으로 전파되고, request signal
이 다시 Upstream 쪽으로 전파되지만 range()
까지 전파되는 것이 아니라 subscribeOn()
까지만 전파되고 이 후 부터는 range()
부터 onSubscribe signal
이 subscribeOn()
까지만 전파된 후에 request signal
이 다시 range()까지 전파되고 이 후부터는 onNext signal
이 Downstream 끝까지 전파되고, 다음에 onComplete signal 역시 Downstream 끝까지 전파되는 것을 알 수 있습니다.
signal의 전파 흐름이 굉장히 복잡한데 이 복잡함을 덜 복잡하게 이해하기 위해서는 signal의 전파 흐름을 subscribeOn() Operator를 기준으로 생각하면 됩니다.
subscribeOn()
Operator를 기준으로 signal 흐름을 요약하면,
먼저onSubscribe signal
이 Downstream 쪽으로 전파
->request signal
이 Upstream 쪽으로 전파되지만subscribeOn()
Operator까지만 전파
->range()
부터onSubscribe signal
이 전파되지만subscribeOn()
까지만 전파
->request signal
이 Upstream 쪽으로 전파
->onNext signal
이 Downstream 끝까지 전파
->onComplete signal
이 Downstream 끝까지 전파
위에서 설명한 signal 전파 흐름을 코드 상의 흐름으로 간략하게 표현하면 아래의 [그림 1-1]과 같습니다.
그리고, subscribeOn()
Operator가 추가 되었을 때 signal 전파 흐름이 어떻게 되는지 Reactor 코드 내부에서 사용되는 코드를 통해 조금 더 구체적으로 표현하면 아래의 [그림 1-2]와 같습니다.
[그림 1-2]의 signal 전파 흐름이 꽤 복잡해 보이지만 Reactor에서 실제 사용되는 컴포넌트들을 이용해 [그림 1-1]을 조금 더 구체적으로 표현한 것 뿐입니다.
만일 [그림 1-2]를 이해하는것이 어렵게 느껴진다면 [그림 1-1]만 이해해도 충분하다고 생각합니다.
Operator 체인상에서 doOnSubscribe()의 위치에 따른 실행 쓰레드의 변경
[그림 1-2]를 보면 doOnSubscribe()
Operator는 subscribeOn()
위 쪽에 추가되어있습니다.
이 경우, doOnSubscribe()
의 실행 쓰레드는 코드 1-2의 로그 출력 결과에서도 확인할 수 있다시피 boundedElastic-1
쓰레드에서 실행되는 이유가 뭘까요?
doOnSubscribe()
Operator는 onSubscribe()가 호출되었을 때 이어서 호출되는 일종의 콜백입니다.
따라서 onSubscribe()
가 호출될 때의 실행 쓰레드와 동일한 쓰레드에서 실행이 됩니다.
doOnSubscribe()
를 호출되게 하는 onSubscribe()
는 어디서 호출될까요?
[그림 1-2]의 signal 전파 흐름에서도 알 수 있듯이 range()
에서 onSubscribe()
가 호출되면 뒤이어 doOnSubscribe()
가 호출됩니다.
range()의 실행 쓰레드는?
바로 boundedElastic-1
쓰레드입니다.
따라서 range()에서 호출하는 onSubscribe()도 boundedElastic-1
쓰레드이고, doOnSubscribe()의 실행 쓰레드 역시 boundedElastic-1
쓰레드입니다.
만약 doOnSubscribe()
를 subscribeOn()
바로 아래에 추가한다면 doOnSubscribe()의 실행 쓰레드는 무엇일까요?
이 경우, doOnSubscribe()
의 실행 쓰레드는 subscribeOn()
에서 호출되는 onSubscribe()
의 영향을 받습니다.
[그림 1-2]에서 subscribeOn()의 실행 쓰레드는 무엇인가요?
바로 main
쓰레드입니다.
따라서 subscribeOn()
에서 호출하는 onSubscribe()
도 main
쓰레드이고, doOnSubscribe()의 실행 쓰레드 역시 main
쓰레드입니다.
헷갈리겠지만 이것 하나만 기억하면 됩니다.
doOnSubscribe()의 실행 쓰레드는 doOnSubscribe() 바로 위 쪽에 있는 Operator가 호출하는 onSubscribe()의 실행 쓰레드와 같다.
이 글이 Reactor Sequence 상에서 발생하는 signal의 전파 흐름과 subscribeOn()과 doOnSubscribe()의 실행 쓰레드의 관계를 이해하는데 조금이라도 도움이 될 수 있길 바래봅니다.
'리액티브 프로그래밍 > Reactor' 카테고리의 다른 글
Hello Reactor 들여다 보기 (2) | 2021.01.06 |
---|---|
리액터와 WebFlux (0) | 2021.01.06 |