이번 시간에는 Reactor Sequence 상에서 발생하는 signal의 전파 흐름을 알아보도록 하겠습니다.

 

일반적인 Signal의 전파 흐름

1
2
3
4
5
6
7
8
9
10
11
12
13
import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Flux;
 
@Slf4j
public class ReactorSignalEventPropagationExample01 {
    public static void main(String[] args) {
        Flux
            .range(11)
            .filter(n -> n > 0)
            .map(n -> n * 2)
            .subscribe(data -> log.info("# onNext: {}", data));
    }
}
코드 1-1 일반적인 signal 이벤트 흐름
 

코드 1-1은 Reactor Sequence 상에서 발생하는 signal의 전파 흐름을 확인하기 위한 예제 코드입니다.

예제 코드 상에는 log() Operator가 빠져있지만 range(), filter(), map() Operator 하단에 각각 log() Operator를 추가해서 실행하면 아래와 같은 로그를 출력합니다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
18:51:02.355 [main] DEBUG reactor.util.Loggers - Using Slf4j logging framework
18:51:02.369 [main] INFO log.range - | onSubscribe([Synchronous Fuseable] Operators.ScalarSubscription)
18:51:02.371 [main] INFO log.filter - | onSubscribe([Fuseable] FluxFilterFuseable.FilterFuseableSubscriber)
18:51:02.371 [main] INFO log.map - | onSubscribe([Fuseable] FluxMapFuseable.MapFuseableSubscriber)
18:51:02.372 [main] INFO log.map - | request(unbounded)
18:51:02.372 [main] INFO log.filter - | request(unbounded)
18:51:02.372 [main] INFO log.range - | request(unbounded)
18:51:02.372 [main] INFO log.range - | onNext(1)
18:51:02.372 [main] INFO log.filter - | onNext(1)
18:51:02.372 [main] INFO log.map - | onNext(2)
18:51:02.372 [main] INFO com.codestates.example.schedulers.SchedulersExample04 - # onNext: 2
18:51:02.373 [main] INFO log.range - | onComplete()
18:51:02.373 [main] INFO log.filter - | onComplete()
18:51:02.373 [main] INFO log.map - | onComplete()
 
 

로그를 보면, 코드 1-1에서 발생하는 signal은 아래와 같은 순서로 전파됩니다.

range | onSubscribe() --> filter | onSubscribe() --> map | onSubscribe()
--> map | request() --> filter | request() --> range | request()
--> range | onNext() --> filter | onNext() --> map | onNext()
--> range | onComplete() --> filter | onComplete() --> map | onComplete()
 

 

이를 통해 알 수 있는 사실 두 가지를 살펴보면,

먼저 대부분의 Operator들은 각각 자신만의 Subscriber를 포함하고 있고, 이 Subscriber와 서로 signal을 주고 받으면서 데이터를 전송하고 데이터를 전달 받는다는 것을 알 수 있습니다.

다름으로 onSubscribe, onNext, onComplete signal은 Downstream 쪽으로 전파되고, request signal은 Upstream 쪽으로 전파되는 것을 알 수 있습니다.

 

subscribeOn()이 추가되었을 때 Signal의 전파 흐름

그렇다면 subscribeOn() Operator가 Operator 체인에 추가되면 signal의 전파 흐름은 어떻게 변할까요?

 

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;
 
@Slf4j
public class ReactorSignalEventPropagationExample02 {
    public static void main(String[] args) throws InterruptedException {
        Flux
            .range(11)
            .doOnSubscribe(subscription -> log.info("# doOnSubscribe"))
          .subscribeOn(Schedulers.boundedElastic()) // (1)
            .map(n -> n * 2)
            .subscribe(data -> log.info("# onNext: {}", data));
 
        Thread.sleep(100L);
    }
}
 

코드 1-2는 subscribeOne() Operator가 Operator체인에 추가되었을 때 signal의 전파 흐름을 살펴보기 위한 예제 코드입니다. ( 로그 출력 결과가 복잡해 보일 것 같아 filter() Operator는 제외했습니다.)

코드 1-1과 마찬가지로 log() Operator를 각각의 Operator 다음에 추가해서 코드를 실행하면 로그 출력 결과는 아래와 같습니다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
21:48:27.992 [main] INFO log.subscribeOn - onSubscribe(FluxSubscribeOn.SubscribeOnSubscriber)
21:48:27.993 [main] INFO log.map - onSubscribe(FluxMap.MapSubscriber)
21:48:27.994 [main] INFO log.map - request(unbounded)
21:48:27.994 [main] INFO log.subscribeOn - request(unbounded)
21:48:28.003 [boundedElastic-1] INFO log.range - | onSubscribe([Synchronous Fuseable] Operators.ScalarSubscription)
21:48:28.004 [boundedElastic-1] INFO com.codestates.example.schedulers.SchedulersExample05 - # doOnSubscribe
21:48:28.005 [boundedElastic-1] INFO log.doOnSubscribe - | onSubscribe([Fuseable] FluxPeekFuseable.PeekFuseableSubscriber)
21:48:28.006 [boundedElastic-1] INFO log.doOnSubscribe - | request(unbounded)
21:48:28.006 [boundedElastic-1] INFO log.range - | request(unbounded)
21:48:28.006 [boundedElastic-1] INFO log.range - | onNext(1)
21:48:28.006 [boundedElastic-1] INFO log.doOnSubscribe - | onNext(1)
21:48:28.006 [boundedElastic-1] INFO log.subscribeOn - onNext(1)
21:48:28.006 [boundedElastic-1] INFO log.map - onNext(2)
21:48:28.006 [boundedElastic-1] INFO com.codestates.example.schedulers.SchedulersExample05 - # onNext: 2
21:48:28.006 [boundedElastic-1] INFO log.range - | onComplete()
21:48:28.006 [boundedElastic-1] INFO log.doOnSubscribe - | onComplete()
21:48:28.006 [boundedElastic-1] INFO log.subscribeOn - onComplete()
21:48:28.006 [boundedElastic-1] INFO log.map - onComplete()
 
 

로그가 좀 복잡해보이는데 어쨌든 코드 1-2의 signal 전파 흐름을 보면 코드 1-1과 조금 다르다른 것을 알 수 있습니다.

subscribeOn()이 추가되지 않았던 코드 1-1에서는 range()부터 차례대로 onSubscribe signal이 전파되었는데 코드 1-2의 경우에는 subscribeOn()부터 onSubscribe signal이 Downstream쪽으로 전파되고, request signal이 다시 Upstream 쪽으로 전파되지만 range()까지 전파되는 것이 아니라 subscribeOn()까지만 전파되고 이 후 부터는 range()부터 onSubscribe signalsubscribeOn()까지만 전파된 후에 request signal이 다시 range()까지 전파되고 이 후부터는 onNext signal이 Downstream 끝까지 전파되고, 다음에 onComplete signal 역시 Downstream 끝까지 전파되는 것을 알 수 있습니다.

signal의 전파 흐름이 굉장히 복잡한데 이 복잡함을 덜 복잡하게 이해하기 위해서는 signal의 전파 흐름을 subscribeOn() Operator를 기준으로 생각하면 됩니다.

subscribeOn() Operator를 기준으로 signal 흐름을 요약하면,

먼저 onSubscribe signal이 Downstream 쪽으로 전파
-> request signal이 Upstream 쪽으로 전파되지만 subscribeOn() Operator까지만 전파
-> range() 부터 onSubscribe signal이 전파되지만 subscribeOn() 까지만 전파
-> request signal이 Upstream 쪽으로 전파
-> onNext signal이 Downstream 끝까지 전파
-> onComplete signal이 Downstream 끝까지 전파

 

위에서 설명한 signal 전파 흐름을 코드 상의 흐름으로 간략하게 표현하면 아래의 [그림 1-1]과 같습니다.

[그림 1-1] subscribeOn()이 추가될 경우의 signal 전파 흐름

 

 

그리고, subscribeOn() Operator가 추가 되었을 때 signal 전파 흐름이 어떻게 되는지 Reactor 코드 내부에서 사용되는 코드를 통해 조금 더 구체적으로 표현하면 아래의 [그림 1-2]와 같습니다.

[그림 1-2] subscribeOn()이 추가될 경우의 signal 전파 흐름(detail description)

[그림 1-2]의 signal 전파 흐름이 꽤 복잡해 보이지만 Reactor에서 실제 사용되는 컴포넌트들을 이용해 [그림 1-1]을 조금 더 구체적으로 표현한 것 뿐입니다.

만일 [그림 1-2]를 이해하는것이 어렵게 느껴진다면 [그림 1-1]만 이해해도 충분하다고 생각합니다.

 

Operator 체인상에서 doOnSubscribe()의 위치에 따른 실행 쓰레드의 변경

[그림 1-2]를 보면 doOnSubscribe() Operator는 subscribeOn() 위 쪽에 추가되어있습니다. 

이 경우, doOnSubscribe()의 실행 쓰레드는 코드 1-2의 로그 출력 결과에서도 확인할 수 있다시피 boundedElastic-1 쓰레드에서 실행되는 이유가 뭘까요?

doOnSubscribe() Operator는 onSubscribe()가 호출되었을 때 이어서 호출되는 일종의 콜백입니다.
따라서 onSubscribe()가 호출될 때의 실행 쓰레드와 동일한 쓰레드에서 실행이 됩니다.

doOnSubscribe()를 호출되게 하는 onSubscribe()는 어디서 호출될까요?

[그림 1-2]의 signal 전파 흐름에서도 알 수 있듯이 range()에서 onSubscribe()가 호출되면 뒤이어 doOnSubscribe()가 호출됩니다.

range()의 실행 쓰레드는? 

바로 boundedElastic-1 쓰레드입니다. 

따라서 range()에서 호출하는 onSubscribe()도 boundedElastic-1 쓰레드이고, doOnSubscribe()의 실행 쓰레드 역시 boundedElastic-1 쓰레드입니다.

 

만약 doOnSubscribe()subscribeOn() 바로 아래에 추가한다면 doOnSubscribe()의 실행 쓰레드는 무엇일까요?

이 경우, doOnSubscribe()의 실행 쓰레드는 subscribeOn()에서 호출되는 onSubscribe()의 영향을 받습니다.

[그림 1-2]에서 subscribeOn()의 실행 쓰레드는 무엇인가요?

바로 main 쓰레드입니다.

따라서 subscribeOn()에서 호출하는 onSubscribe() main 쓰레드이고, doOnSubscribe()의 실행 쓰레드 역시 main 쓰레드입니다.

 

헷갈리겠지만 이것 하나만 기억하면 됩니다.

doOnSubscribe()의 실행 쓰레드는 doOnSubscribe() 바로 위 쪽에 있는 Operator가 호출하는 onSubscribe()의 실행 쓰레드와 같다.

 

이 글이 Reactor Sequence 상에서 발생하는 signal의 전파 흐름과 subscribeOn()과 doOnSubscribe()의 실행 쓰레드의 관계를 이해하는데 조금이라도 도움이 될 수 있길 바래봅니다.

'리액티브 프로그래밍 > Reactor' 카테고리의 다른 글

Hello Reactor 들여다 보기  (2) 2021.01.06
리액터와 WebFlux  (0) 2021.01.06

안녕하세요? [Kevin의 알기 쉬운 RxJava] 강의를 진행하고 있는 Kevin입니다.

이번 시간에는 'Hello, Reactor' 코드를 가지고 Reactor의 기본 구조를 들여다보는 시간을 가져보겠습니다.

개발에 처음 입문하실때 대부분 'Hello World!' 코드를 실행해보셨을텐데요. Reactor 역시 마찬가지로 'Hello, Reactor' 메시지를 출력해보면서 Reactor의 기본 구조를 간단하게 살펴보도록 하겠습니다.

어렵지 않으니 가벼운 마음으로 글을 읽어주시면 감사드리겠습니다.^^

먼저 아래의 코드를 보시죠.

[Hello Reactor 코드]

 

여러분들이 아시다 시피 Reactor는 RxJava와 마찬가지로 Reactive Streams라는 표준 사양을 구현한 구현체입니다. 따라서 Reactive Streams에서 정의하고 있는 생산자 즉, Publisher와 소비자 즉, Subscriber를 구현하고 있는데요.

위의 코드에서 보시는것처럼 Reactor에서는 Flux가 대표적인 생산자 중에 하나입니다.
위의 코드에서 소비자라고 되어 있는 부분은 람다 베이스 Subscriber가 되겠습니다.

생산자 쪽에서는 just라는 Operator(연산자)를 사용해서 두 개의 데이터를 소비자 쪽으로 통지를 하는데요. Reactor에서는 생산자 쪽에서 통지할 데이터를 정의하는 과정 자체를 묶어서 'Sequence(시퀀스)'라고 합니다. 이 Sequence라는 용어는 Reactor를 접하다보면 자주 나오는 용어이니 기억을 해두시길 바래볼게요.

생산자 쪽에서 소비자 쪽으로 데이터를 통지하는것이 맞긴하지만 최종 소비자까지 데이터가 전달 되기까지는 중간에 여러 Operator를 만날 수 있기때문에 엄밀히 말하자면 생산자가 소비자에게 데이터를 통지한다라고 말하기보다는 생산자가 Downstream으로 데이터를 내보낸다라는 표현이 더 정확할 것 같네요.

용어에 대한 부분은 아래쪽에서 다시 정의를 해보도록 하구요.

아무튼 생산자 쪽에서 데이터를 통지하게 되면 최종 소비자 쪽에 바로 데이터가 전달되는 것이 아니라 map( )이라는 Operator에서 원본 데이터가 가공 처리 된 후에 최종 소비자 쪽으로 전달이 되는것을 볼 수 있는데요.

여기서는 통지된 String 데이터를 소문자로 변환을 한 후에 최종 소비자쪽으로 전달하고 있네요.

자, RxJava 강의 시간에도 말씀을 드렸지만 리액티브 프로그래밍은 크게 세가지의 step으로 이루어져있습니다.

1 step : 생산자가 데이터를 통지한다.
2 step : 통지된 데이터를 Operator로 가공한다.
3 step : 가공 처리된 데이터를 최종 소비자에게 전달한다.

이 세가지 큰 흐름은 기본적으로 꼭 기억을 해두시고, 나머지는 데이터를 통지하는 다양한 방법, 데이터를 가공하는 다양한 방법, 비동기 프로그래밍을 위한 쓰레드를 할당하는 방법, 에러 처리 방법 등을 단계적으로 적용 하는 식으로 학습을 진행해 나가면 리액티브 프로그래밍을 조금 더 수월하게 이해하실 수 있을거라는 생각이 드네요. ^^;

자, 그럼 마지막으로 Reactor에 대한 에피소드를 계속 진행가기 전에 Reactor에서 사용되는 여러가지 용어를 정의하고 에피소드를 마무리 하도록 하겠습니다.

먼저 아래 용어들을 보시죠.

  • Publisher : 발행자, 게시자, 생산자, 방출자(Emitter)
  • Subscriber : 구독자, 소비자 
  • Emit : Publisher가 데이터를 내보내는 것(방출하다. 내보내다. 통지하다.) 
  • Sequence : Publisher가 emit하는 데이터의 연속적인 흐름. 스트림과 같은 의미라고 보면 됨 
  • Subscribe : SubscriberSequence를 구독하는 것
  • Dispose : Suscriber가 Sequence 구독을 해지 하는 것 
  • Downstream : 현재 Operator 체인의 위치에서 봤을때 데이터가 전달 되는 하위 Operator 및 method 체인
  • Upstream : 현재 Operator 체인의 위치에서 봤을때 상위 Operator 및 method 체인

여태껏 제가 데이터를 통지하는 쪽은 생산자로 지칭했는데요. 보시는대로 생산자 이외에 발행자, 게시자, 방출자 등 다양하게 사용이 되고 있습니다. 모두 같은 의미라고 보시면 되겠지만 앞으로 에피소드를 진행할 때는 그냥 영어로 Publisher라고 지칭하도록 하겠습니다. ^^;

따라서 소비자 역시 영어로 Subscriber라고 지칭하면 되겠구요.

데이터를 통지하는 행위 역시 영어 그대로 Emit 한다라고 하겠습니다.

Sequence는 위에서 잠깐 말씀 드렸죠? Publisher쪽에서 데이터를 emit하는 흐름 자체를 통칭하는 것을 의미하는데, 예를 들어서 Flux라는 Publisher가 데이터의 Emit을 정의하는 부분을 설명할때 Sequence를 생성한다 라고 표현할 수 있겠습니다.

Subscribe와 Dispose는 구독하다, 구독을 해지하다라고 표현하도록 하겠습니다.

마지막으로 Downstream과 Upstream이라는 용어인데요. Reactor를 학습하다보면 종종 나오는 용어인데 개념적으로 이해하기 모호한 면이 있는 용어중에 하나이기도 합니다.

이럴때는 그냥 단순하게 이해를 하는게 제일 좋을거라고 생각을 해보게되네요. ^^;
예를 들어 위의 코드에서 just( )의 위치에서 Downstream은 map( )부터 subscribe( )까지 하위 체인이 되겠습니다. 그리고 subscribe( ) 입장에서는 map( )과 just( )가 Upstream이 되겠구요. 

just( )의 내부를 들여다보면 return 값으로 Flux를 반환하는데요. just( )에서 반환하는 Flux를 원본 Flux가 되겠습니다. 그리고 map( )에서 반환하는 return 값 역시 Flux인데요. 여기서의 Flux는 원본 Flux가 아닌 가공 처리된 데이터를 가지고 있는 새로운 Flux입니다. 

Flux(또는 Mono)의 이러한 처리 흐름때문에 Downstream이나 Upstream이라는 용어가 생겨난것 같은데 위에서 언급드린것처럼 단순하게 생각해주시면 될것 같아요.

자, 오늘은 'Hello, Reactor' 코드를 통해서 Reactor 의 기본 구조를 살펴보았는데요. 
다음 시간에 다른 에피소드를 가지고 다시 찾아뵙도록 하겠습니다. ^^

감사합니다!

[Kevin의 알기 쉬운 RxJava 1부] 강의 바로가기

 

Kevin의 알기 쉬운 RxJava 1부 - 인프런

리액티브 프로그래밍이라는 진입 장벽을 넘고 싶으신가요? Kevin의 알기 쉬운 RxJava가 그 벽을 넘을 수 있는 힘을 키워드리겠습니다. 초급 프레임워크 및 라이브러리 함수형 프로그래밍 RxJava Reacti

www.inflearn.com

[Kevin의 알기 쉬운 RxJava 2부] 강의 바로가기

 

Kevin의 알기 쉬운 RxJava 2부 - 인프런

리액티브 프로그래밍이라는 진입 장벽을 넘고 싶으신가요? Kevin의 알기 쉬운 RxJava가 그 벽을 넘을 수 있는 힘을 키워드리겠습니다. 초급 프레임워크 및 라이브러리 함수형 프로그래밍 RxJava Reacti

www.inflearn.com

 

여러분 안녕하세요? [Kevin의 알기 쉬운 RxJava 강의]를 진행하고 있는 Kevin이라고 합니다. 지난번 공지에서 잠깐 말씀을 드렸다시피 매주 한두번 정도 시간을 내서 강의에서 제대로 얘기하지 못했던 리액티브 프로그래밍에 대한 뒷 얘기들을 조금씩 해볼까 합니다. 리액티브 프로그래밍을 배우고자 하시는 분들께 조금이나마 더 도움이 되길 바래보겠습니다.

리액터(Reactor)란 무엇일까요?

리액티브 프로그래밍에 대해서 처음 들으시는 분들은 리액터에 대해서 당연히 모르실거 같은데 아마도 스프링을 학습하면서 들어 보신분도 있지 않을까하는 생각도 들긴하네요. ^^예상하신분도 계시겠지만 리액터는 Java로 구현 된 리액티브 프로그래밍의 한 종류입니다. 더 정확히 얘길 하자면 Reactive Streams를 구현한 구현체라고 볼 수 있는데요.강의를 수강하신 분들은 이미 알고 계시지만 RxJava 역시 Reactive Streams를 구현한 구현체의 하나라고 볼 수 있습니다. 일반적으로 Reactive Streams를 구현한 구현체들은 대부분 Rx~로 시작을 하는데 Reactor는 그것보다 더 확실하게 "나는 리액티브 프로그래밍을 사랑하는 Reactor야" 라고 얘길 하는것 같군요.^^Reactor라는 이름에서 리액티브 프로그래밍의 느낌이 팍팍 오지 않으신가요?ㅎ

리액터(Reactor)를 배워야 할까 말아야 할까

결론부터 말씀드리자면 리액티브 프로그래밍을 배우기로 하셨다면 리액터는 배우는것이 맞다고 생각합니다. 두가지 이유만 얘길 드리자면 먼저 리액터를 통해서 저희는 명령형 프로그래밍의 한계를 넘어설 수 있는 그리고 시대에 맞는 개발 패러다임을 이해하고 실무에 점차적으로 사용을 하게 될 것입니다. 끊임없이 생산되는 데이터를 효과적인 비동기 상태로 처리할 수 있어야 한다는 사실은 필연적이라는 생각이 드니까요. 그렇기 때문에 Reactor는 꼭 학습을 하시는게 좋을 것이라는 생각이 드네요.

굳이 왜 리액터일까?

근데 왜 굳이 리액터를 배워야 되는가.. 라고 생각을 하실수도 있겠네요.^^; 저희는 지금 RxJava를 배우고 있는데 말이죠. 음.. Java 애플리케이션을 개발해보신 분들은 아시겠지만 Java 애플리케이션을 개발하면서 Spring Framework을 빼놓고 얘기하는게 어려워진 것은 오래되었습니다.물론 Spring을 사용하지 않고, 애플리케이션을 만들 수는 있지만 Spring을 일단 한 번 사용하게되면 Spring 없이 개발하겠다고 얘기하는 분들은 많지 않을거 같아요.Spring 얘기를 왜 하느냐하면 Reactor가 바로 Spring5에서 지원하는 Reactive 프로그래밍의 기본이 되기 때문인데요. Spring을 사용하시는 분들, 그리고 앞으로 Spring을 접하실 분들이라면 Spring과 궁합이 딱 맞는 Reactor를 배우지 않을 이유가 있을까 싶습니다. Spring과 Reactor를 사용해서 Spring MVC 기반의 애플리케이션 대신에 리액티브 애플리케이션을 만들어보는것. 재미있을 것 같지 않나요? ^^

그럼 RxJava는 배우지 말라는거?

그건 당연히 아닙니다. 제가 RxJava 강의를 오픈한 가장 큰 이유 중에 한 가지는 RxJava라는 기술 자체 보다는 리액티브 프로그래밍에 대해서 알려드리고 싶어서이기 때문인데요. 리액티브 프로그래밍은 처음 접하게 되면 이해하기가 쉽지 않기때문에 리액티브 프로그래밍에 대한 기본 개념을 잡을 수 있도록 하는것이 제 강의의 가장 큰 목표라고 볼 수 있습니다. 그런 의미에서 제 강의는 다른 의미로 여러분들한테 충분히 의미있는 강의가 될거라고 믿어보겠습니다.^^제 강의를 듣고 Reactor를 접하게 된 후에 생각했던 것보다 Reactor가 어렵지 않구나라는 얘기를 꼭 들었으면 좋겠네요. ^^

Reactor는 이제 알겠는데 WebFlux는 뭐지?

저희 강의를 조금이라도 수강을 하신 분들은 알고 계실텐데요. RxJava에서 데이터를 통지(또는 발행, 방출)하는 생산자의 역할을 하는 놈이 Observable, Flowable, Single, Maybe 등인데, Reactor에서 이러한 역할을 하는 놈이 바로 Flux와 Mono라는 놈입니다.RxJava에서는 데이터를 한 건만 통지할 때에는 Single을 사용하고, 데이터를 한 건도 통지하지 않거나 한 건만 통지할 때는 Maybe를 사용 하죠. 그리고 데이터를 한 건 이상 통지할 때에는 Observable 또는 Flowable을 사용을 합니다. 뭔가 좀 복잡하죠? Reactor에서는 이걸 조금 단순화해서 Mono라는 놈은 딱 한 건의 데이터만 처리하고, Flux는 한건도 통지하지않거나 한건 이상 통지하는데 사용을 하도록했습니다.Flux에 대해서 말씀을 드렸으니 WebFlux는 무얼하는 놈인지 대충 감이 오시죠?
'혹시 Web에서 사용하는 Flux?' 맞습니다.^^ Spring MVC가 웹 계층에 특화된 웹프레임워크인 것과 마찬가지로 WebFlux는 웹 계층에서 비동기 처리를 하는데 특화된 리액티브 프로그래밍 기반의 웹 프레임워크라고 할 수 있습니다. 그러니까 우리는 WebFlux라는 저놈을 Spring MVC 처럼 효과적으로 잘 사용 하기 위해서 Reactor를 먼저 배우게 되는 것입니다. 구체적으로 어떻게 사용하는지는 아직 자세히 모르더라도 WebFlux가 대충 뭐하는 놈인지는 대충 이해가 되시죠?그러면 Reactor의 Flux를 사용하는 코드 예시를 조금 살펴보고 오늘 이야기를 마무리 하도록할게요. 먼저 RxJava에서 사용하는 코드부터 간단하게 살펴보겠습니다.

@Test
public void ObservableFilterTest() {
	Observable<Integer> observable = Observable.just(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
		.filter(n -> n % 2 == 0);

    observable.test()
    		.assertValues(2, 4, 6, 8, 10)
    		.assertComplete();
}

RxJava 2부 강의에서 만나보시겠지만 Observable에서 통지하는 데이터들에 대한 Unit Test 코드입니다. Observable의 just() 함수를 사용해서 1부터 10까지 10개의 데이터를 통지하면 filter() 함수에서 통지된 데이터를 2로 나눈 나머지가 0인 숫자 즉, 짝수인 숫자만 필터링을 해서 최종적으로 소비자 쪽에 통지를 하는 간단한 코드인데요.이렇게 통지된 데이터가 정상적으로 소비자 쪽에 전달이 되는지 검증하기 위해서 Observable의 test() 함수를 이용해서 검증을 합니다.자, 그러면 이번에는 Reactor에서 사용하는 코드를 한번 보실까요?

@Test
public void fluxFilterTest() {
  Flux<Integer> flux = Flux.just(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
  		.filter(n -> n % 2 == 0);

  StepVerifier.create(flux)
  		.expectNext(2, 4, 6, 8, 10)
 	 	.verifyComplete();
}

Flux의 just()함수를 사용해서 1부터 10까지 10개의 데이터를 통지하면 filter()함수에서 역시 짝수인 숫자만 필터링을 해서 소비자 쪽에 통지를 합니다. 그러면 StepVerifier 클래스를 이용해서 소비자 쪽에 전달되는 데이터를 검증을 하고 있습니다.RxJava와 거의 똑같다는거 느껴지시죠? 그렇기때문에 여러분들은 저희 강의를 통해서 리액티브 프로그래밍의 기본 개념을 확실하게 자기 것으로 만든 후에 Reactor의 코드를 보면 '아.. 어렵지 않네?'라고 말씀하실수 있을거라고 생각합니다. ^^그럼 저는 다음 시간에 리액티브 프로그래밍의 또 다른 에피소드를 가지고 다시 뵙도록 할게요. 감사합니다!

 

 

Kevin의 알기 쉬운 RxJava 1부 - 인프런

리액티브 프로그래밍이라는 진입 장벽을 넘고 싶으신가요? Kevin의 알기 쉬운 RxJava가 그 벽을 넘을 수 있는 힘을 키워드리겠습니다. 초급 프레임워크 및 라이브러리 함수형 프로그래밍 RxJava Reacti

www.inflearn.com

 

Kevin의 알기 쉬운 RxJava 2부 - 인프런

리액티브 프로그래밍이라는 진입 장벽을 넘고 싶으신가요? Kevin의 알기 쉬운 RxJava가 그 벽을 넘을 수 있는 힘을 키워드리겠습니다. 초급 프레임워크 및 라이브러리 함수형 프로그래밍 RxJava Reacti

www.inflearn.com

 

+ Recent posts

출처: http://large.tistory.com/23 [Large]