이번 시간에는 Reactor Sequence 상에서 발생하는 signal의 전파 흐름을 알아보도록 하겠습니다.

 

일반적인 Signal의 전파 흐름

1
2
3
4
5
6
7
8
9
10
11
12
13
import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Flux;
 
@Slf4j
public class ReactorSignalEventPropagationExample01 {
    public static void main(String[] args) {
        Flux
            .range(11)
            .filter(n -> n > 0)
            .map(n -> n * 2)
            .subscribe(data -> log.info("# onNext: {}", data));
    }
}
코드 1-1 일반적인 signal 이벤트 흐름
 

코드 1-1은 Reactor Sequence 상에서 발생하는 signal의 전파 흐름을 확인하기 위한 예제 코드입니다.

예제 코드 상에는 log() Operator가 빠져있지만 range(), filter(), map() Operator 하단에 각각 log() Operator를 추가해서 실행하면 아래와 같은 로그를 출력합니다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
18:51:02.355 [main] DEBUG reactor.util.Loggers - Using Slf4j logging framework
18:51:02.369 [main] INFO log.range - | onSubscribe([Synchronous Fuseable] Operators.ScalarSubscription)
18:51:02.371 [main] INFO log.filter - | onSubscribe([Fuseable] FluxFilterFuseable.FilterFuseableSubscriber)
18:51:02.371 [main] INFO log.map - | onSubscribe([Fuseable] FluxMapFuseable.MapFuseableSubscriber)
18:51:02.372 [main] INFO log.map - | request(unbounded)
18:51:02.372 [main] INFO log.filter - | request(unbounded)
18:51:02.372 [main] INFO log.range - | request(unbounded)
18:51:02.372 [main] INFO log.range - | onNext(1)
18:51:02.372 [main] INFO log.filter - | onNext(1)
18:51:02.372 [main] INFO log.map - | onNext(2)
18:51:02.372 [main] INFO com.codestates.example.schedulers.SchedulersExample04 - # onNext: 2
18:51:02.373 [main] INFO log.range - | onComplete()
18:51:02.373 [main] INFO log.filter - | onComplete()
18:51:02.373 [main] INFO log.map - | onComplete()
 
 

로그를 보면, 코드 1-1에서 발생하는 signal은 아래와 같은 순서로 전파됩니다.

range | onSubscribe() --> filter | onSubscribe() --> map | onSubscribe()
--> map | request() --> filter | request() --> range | request()
--> range | onNext() --> filter | onNext() --> map | onNext()
--> range | onComplete() --> filter | onComplete() --> map | onComplete()
 

 

이를 통해 알 수 있는 사실 두 가지를 살펴보면,

먼저 대부분의 Operator들은 각각 자신만의 Subscriber를 포함하고 있고, 이 Subscriber와 서로 signal을 주고 받으면서 데이터를 전송하고 데이터를 전달 받는다는 것을 알 수 있습니다.

다름으로 onSubscribe, onNext, onComplete signal은 Downstream 쪽으로 전파되고, request signal은 Upstream 쪽으로 전파되는 것을 알 수 있습니다.

 

subscribeOn()이 추가되었을 때 Signal의 전파 흐름

그렇다면 subscribeOn() Operator가 Operator 체인에 추가되면 signal의 전파 흐름은 어떻게 변할까요?

 

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;
 
@Slf4j
public class ReactorSignalEventPropagationExample02 {
    public static void main(String[] args) throws InterruptedException {
        Flux
            .range(11)
            .doOnSubscribe(subscription -> log.info("# doOnSubscribe"))
          .subscribeOn(Schedulers.boundedElastic()) // (1)
            .map(n -> n * 2)
            .subscribe(data -> log.info("# onNext: {}", data));
 
        Thread.sleep(100L);
    }
}
 

코드 1-2는 subscribeOne() Operator가 Operator체인에 추가되었을 때 signal의 전파 흐름을 살펴보기 위한 예제 코드입니다. ( 로그 출력 결과가 복잡해 보일 것 같아 filter() Operator는 제외했습니다.)

코드 1-1과 마찬가지로 log() Operator를 각각의 Operator 다음에 추가해서 코드를 실행하면 로그 출력 결과는 아래와 같습니다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
21:48:27.992 [main] INFO log.subscribeOn - onSubscribe(FluxSubscribeOn.SubscribeOnSubscriber)
21:48:27.993 [main] INFO log.map - onSubscribe(FluxMap.MapSubscriber)
21:48:27.994 [main] INFO log.map - request(unbounded)
21:48:27.994 [main] INFO log.subscribeOn - request(unbounded)
21:48:28.003 [boundedElastic-1] INFO log.range - | onSubscribe([Synchronous Fuseable] Operators.ScalarSubscription)
21:48:28.004 [boundedElastic-1] INFO com.codestates.example.schedulers.SchedulersExample05 - # doOnSubscribe
21:48:28.005 [boundedElastic-1] INFO log.doOnSubscribe - | onSubscribe([Fuseable] FluxPeekFuseable.PeekFuseableSubscriber)
21:48:28.006 [boundedElastic-1] INFO log.doOnSubscribe - | request(unbounded)
21:48:28.006 [boundedElastic-1] INFO log.range - | request(unbounded)
21:48:28.006 [boundedElastic-1] INFO log.range - | onNext(1)
21:48:28.006 [boundedElastic-1] INFO log.doOnSubscribe - | onNext(1)
21:48:28.006 [boundedElastic-1] INFO log.subscribeOn - onNext(1)
21:48:28.006 [boundedElastic-1] INFO log.map - onNext(2)
21:48:28.006 [boundedElastic-1] INFO com.codestates.example.schedulers.SchedulersExample05 - # onNext: 2
21:48:28.006 [boundedElastic-1] INFO log.range - | onComplete()
21:48:28.006 [boundedElastic-1] INFO log.doOnSubscribe - | onComplete()
21:48:28.006 [boundedElastic-1] INFO log.subscribeOn - onComplete()
21:48:28.006 [boundedElastic-1] INFO log.map - onComplete()
 
 

로그가 좀 복잡해보이는데 어쨌든 코드 1-2의 signal 전파 흐름을 보면 코드 1-1과 조금 다르다른 것을 알 수 있습니다.

subscribeOn()이 추가되지 않았던 코드 1-1에서는 range()부터 차례대로 onSubscribe signal이 전파되었는데 코드 1-2의 경우에는 subscribeOn()부터 onSubscribe signal이 Downstream쪽으로 전파되고, request signal이 다시 Upstream 쪽으로 전파되지만 range()까지 전파되는 것이 아니라 subscribeOn()까지만 전파되고 이 후 부터는 range()부터 onSubscribe signalsubscribeOn()까지만 전파된 후에 request signal이 다시 range()까지 전파되고 이 후부터는 onNext signal이 Downstream 끝까지 전파되고, 다음에 onComplete signal 역시 Downstream 끝까지 전파되는 것을 알 수 있습니다.

signal의 전파 흐름이 굉장히 복잡한데 이 복잡함을 덜 복잡하게 이해하기 위해서는 signal의 전파 흐름을 subscribeOn() Operator를 기준으로 생각하면 됩니다.

subscribeOn() Operator를 기준으로 signal 흐름을 요약하면,

먼저 onSubscribe signal이 Downstream 쪽으로 전파
-> request signal이 Upstream 쪽으로 전파되지만 subscribeOn() Operator까지만 전파
-> range() 부터 onSubscribe signal이 전파되지만 subscribeOn() 까지만 전파
-> request signal이 Upstream 쪽으로 전파
-> onNext signal이 Downstream 끝까지 전파
-> onComplete signal이 Downstream 끝까지 전파

 

위에서 설명한 signal 전파 흐름을 코드 상의 흐름으로 간략하게 표현하면 아래의 [그림 1-1]과 같습니다.

[그림 1-1] subscribeOn()이 추가될 경우의 signal 전파 흐름

 

 

그리고, subscribeOn() Operator가 추가 되었을 때 signal 전파 흐름이 어떻게 되는지 Reactor 코드 내부에서 사용되는 코드를 통해 조금 더 구체적으로 표현하면 아래의 [그림 1-2]와 같습니다.

[그림 1-2] subscribeOn()이 추가될 경우의 signal 전파 흐름(detail description)

[그림 1-2]의 signal 전파 흐름이 꽤 복잡해 보이지만 Reactor에서 실제 사용되는 컴포넌트들을 이용해 [그림 1-1]을 조금 더 구체적으로 표현한 것 뿐입니다.

만일 [그림 1-2]를 이해하는것이 어렵게 느껴진다면 [그림 1-1]만 이해해도 충분하다고 생각합니다.

 

Operator 체인상에서 doOnSubscribe()의 위치에 따른 실행 쓰레드의 변경

[그림 1-2]를 보면 doOnSubscribe() Operator는 subscribeOn() 위 쪽에 추가되어있습니다. 

이 경우, doOnSubscribe()의 실행 쓰레드는 코드 1-2의 로그 출력 결과에서도 확인할 수 있다시피 boundedElastic-1 쓰레드에서 실행되는 이유가 뭘까요?

doOnSubscribe() Operator는 onSubscribe()가 호출되었을 때 이어서 호출되는 일종의 콜백입니다.
따라서 onSubscribe()가 호출될 때의 실행 쓰레드와 동일한 쓰레드에서 실행이 됩니다.

doOnSubscribe()를 호출되게 하는 onSubscribe()는 어디서 호출될까요?

[그림 1-2]의 signal 전파 흐름에서도 알 수 있듯이 range()에서 onSubscribe()가 호출되면 뒤이어 doOnSubscribe()가 호출됩니다.

range()의 실행 쓰레드는? 

바로 boundedElastic-1 쓰레드입니다. 

따라서 range()에서 호출하는 onSubscribe()도 boundedElastic-1 쓰레드이고, doOnSubscribe()의 실행 쓰레드 역시 boundedElastic-1 쓰레드입니다.

 

만약 doOnSubscribe()subscribeOn() 바로 아래에 추가한다면 doOnSubscribe()의 실행 쓰레드는 무엇일까요?

이 경우, doOnSubscribe()의 실행 쓰레드는 subscribeOn()에서 호출되는 onSubscribe()의 영향을 받습니다.

[그림 1-2]에서 subscribeOn()의 실행 쓰레드는 무엇인가요?

바로 main 쓰레드입니다.

따라서 subscribeOn()에서 호출하는 onSubscribe() main 쓰레드이고, doOnSubscribe()의 실행 쓰레드 역시 main 쓰레드입니다.

 

헷갈리겠지만 이것 하나만 기억하면 됩니다.

doOnSubscribe()의 실행 쓰레드는 doOnSubscribe() 바로 위 쪽에 있는 Operator가 호출하는 onSubscribe()의 실행 쓰레드와 같다.

 

이 글이 Reactor Sequence 상에서 발생하는 signal의 전파 흐름과 subscribeOn()과 doOnSubscribe()의 실행 쓰레드의 관계를 이해하는데 조금이라도 도움이 될 수 있길 바래봅니다.

'리액티브 프로그래밍 > Reactor' 카테고리의 다른 글

Hello Reactor 들여다 보기  (2) 2021.01.06
리액터와 WebFlux  (0) 2021.01.06

+ Recent posts

출처: http://large.tistory.com/23 [Large]