2024.05.22 - [Spring/대용량 트래픽] - Spring MVC와 Webflux
이전 글에서는 Spring MVC와 Webflux의 비교, Webflux에서 첫번째로 처리 해주는 컴포넌트이자, 고성능 프레임워크 Netty에 대해서 알아보았다. 이제 뒷단에 있는 Reactor에 대해서 자세히 알아보자.
우선 알아야할것은 Reactive Stream이란 무엇인가? 이다.
Reactive Stream
비동기 데이터 스트림 처리를 위해 표준화된 스펙(인터페이스)을 말한다.
이 스펙을 구현한것이 바로 Reactor 이다. Spring Webflux에서 Reactor를 활용하므로써, Web요청에 대한 비동기처리를 진행한다.
Reactive Stream에 대한 구성요소는 크게 4가지라고 볼수 있다.
- Publisher : 데이터 발행
- subscriber : 구독을 통해 데이터를 소비
- subscription : subscriber ↔ Publisher 간의 요청/응답을 주고 받는 객체
- processor : subscriber, Publisher를 상속받는 인터페이스, 데이터 가공 or 가공된 데이터 전달
당연하게도 대표적인 특징은 asynchronous(비동기)처리가 가능하다는 점이다. 데이터 처리 단계에서 같은 스레드나, 별도의 스레드로 동작시킬수 있고, 병렬처리 또한 가능하다.
다음 대표적인 특징으로는 back pressure이다. publisher에 의해 생산되는 데이터를 subscriber가 충분히 처리하지 못할때 나타난다. 이런 상황에서 속도의 불균형을 해결할 수 있는 장치를 back pressure이라고 한다.
실제로 subscriber는 request(n) 메서드에 인자로 처리가능한 양을 전달함으로써 해결하고, publisher는 이 메서드를 받고 subscriber에게 그에 맞는 데이터를 전달하게 된다.
** subscriber가 request(n) 메서드를 호출할때, 데이터를 받을때 까지 blocking 하지 않는다. 그래서 정확히 하자면 non-blocking back pressure이라고 한다.
Project reactor
Reactive Stream의 구현체중 하나이다. Spring Webflux에서 사용한다. 또한 VMWare에서 관리하는 오픈소스이다.
특징으로는 다음의 것들이있다.
- Fully Non-Blocking 지원
- Functional API, Completable Future, Stream, Duration과 직접 상호작용
- Netty와 연동
그렇다면 Stream은 대체 뭘까?
- Stream : 아래 두 개념으로 정의 될 수 있다.
- Flux : 한개 이상의 데이터
- Mono : 단일 또는 데이터가 없음을 의미
공식 홈페이지에서 프로젝트 세팅을 위한 다양한 라이브러리와 문서를 볼수 있다.
프로젝트 준비
나는 Reactor Core에 대해서 의존성 설정을 해보겠다.
plugins {
id "io.spring.dependency-management" version "1.0.7.RELEASE"
}
dependencyManagement {
imports {
mavenBom "io.projectreactor:reactor-bom:2023.0.6"
}
}
dependencies {
implementation 'io.projectreactor:reactor-core'
}
dependencies {
implementation platform('io.projectreactor:reactor-bom:2023.0.6')
implementation 'io.projectreactor:reactor-core'
}
InteliJ에서 새로운 프로젝트를 생성해주고, 위 의존성들을 추가해주었다.
외부 라이브러리를 보면 reactor-core 뿐만 아니라, reactor-core가 의존하는 reactive-streams도 보인다.
Reactor에서 제공하는 두가지 Publisher는 위에서 언급했던 두가지이다.
- Flux
- onNext(~N) : 데이터 개수만큼 호출해서 데이터전달
- onComplete / onError : 해당스트림 종료
- Mono : 단일값 처리 or 예외처리, 함수는 Flux와 같다.
이제 Flux와 Mono를 사용해보기전에, Reactive Stream에서는 Publisher만 한다고 동작하지 않고, Subscriber까지 구현해줘야 동작한다!
public class Publisher {
public Flux<Integer> startFlux(){
return Flux.range(1,10).log();
}
}
log() 메서드를 통해 어느 스레드에서 동작했고, publisher 와 subscriber 가 어떤 메서드를 주고받았는지 확인할수 있다.
public class Main {
public static void main(String[] args) {
Publisher publisher = new Publisher();
publisher.startFlux()
.subscribe(System.out::println);
System.out.println("Hello world!");
}
}
간단하게 publisher에서 생성한 데이터를 단순하게 subscriber에서 받아서 출력하는 코드이다.
위 로그에서 unbounded는 제한없이 데이터를 요청한다는 의미이다.
이제 Mono에 대해서 보자.
public Mono<Integer> startMono(){
return Mono.just(1).log();
}
publisher.startMono()
.subscribe();
하나의 데이터만 전달하는 것을 확인할수 있고, 아무것도 없는 데이터에 대해서도 정상적으로 작동하는지 확인해보자.
public Mono<?> startMono2(){
return Mono.empty().log();
}
데이터를 전달하지 않았으므로 onNext()를 실행하지 않았고, 이것도 잘 실행된다.
Reactor test
Reactor에서는 자동화된 테스트를 작성할 수있는 라이브러리를 제공한다.
특히 시나리오 검증에 관련된 StepVerifier 를 제공하는데 이것에 대해서 살펴보자.
이것 또한 위에 링크했던 Project Reactor 공식문서에 나와있다.
의존성 추가의 부분이다.
dependencies {
testImplementation 'io.projectreactor:reactor-test'
}
예상 시나리오 부분이다.
- appendBoomError()라는 구현된 메서드에서 Flux를 인자로 받아 concatWith로 Mono.error()를 추가해서 반환한다.
- 테스트코드에서는 StepVerifier에서 create()를 통해 위 메서드를 인자로 포함시킨다.
- expectNext()로 검증할 값들을 세팅한다.
- 에러를 받을 경우 expectErrorMessage()를 통해서 에러를 발생시킨다.
- verify()를 호출하므로써 전체 테스트를 진행한다.
전반적인 흐름을 가지고 위에서 Flux와 Mono에 대해 연습한 코드를 테스트 코드를 만들어보자.
publisher.java
public class Publisher {
public Flux<Integer> startFlux(){
return Flux.range(1,10).log();
}
public Flux<String> startFlux2(){
return Flux.fromIterable(List.of("a","b","c","d")).log();
}
public Mono<Integer> startMono(){
return Mono.just(1).log();
}
public Mono<?> startMono2(){
return Mono.empty().log();
}
public Mono<?> startMono3(){
return Mono.error(new Exception("hello reactor")).log();
}
}
- range(시작, 끝)
- fromIterable(리스트)
- just(값, 값, ...)
- empty() : 아무 데이터도 전달 안하는것
- error(Exception객체) : 에러가 발생하는것
publisherTest.java
class PublisherTest {
private Publisher publisher = new Publisher();
@Test
void startFlux() {
StepVerifier.create(publisher.startFlux())
.expectNext(1,2,3,4,5,6,7,8,9,10)
.verifyComplete();
}
@Test
void startMono() {
StepVerifier.create(publisher.startMono())
.expectNext(1)
.verifyComplete();
}
@Test
void startMono2() {
StepVerifier.create(publisher.startMono2())
.verifyComplete();
}
@Test
void startMono3() {
StepVerifier.create(publisher.startMono3())
.expectError(Exception.class)
.verify();
}
@Test
void startFlux2() {
StepVerifier.create(publisher.startFlux2())
.expectNext("a","b","c","d")
.verifyComplete();
}
}
이렇게 StepVerifier로 편리하게 Flux와 Mono에 대해서 테스트를 진행할 수 있다.
Operator
Flux / Mono의 데이터를 가공하거나, 집계, 필터링 할수 있는 함수형 연산자이다. 결국에는 Stream의 데이터를 어떻게 처리하느냐의 관점에서 바라봐야한다. 먼저 대표적인 4개의 Operator이다.
- map : Stream데이터에 대해 1:1 변환하는 것이다.
- filter : Stream데이터에 대해 조건에 맞게 걸러내는것이다.
- take : Stream데이터중 일부만 사용하게 된다.
- flatMap : 1개의 데이터를 1개이상의 Stream으로 변환한다.(비동기로 동작 → 순서보장 안됨)
Operator1.java
public class Operator1 {
public Flux<Integer> fluxMap(){
return Flux.range(1,5)
.map(i -> i*2)
.log();
}
public Flux<Integer> fluxFilter(){
return Flux.range(1,10)
.filter(i -> i > 5)
.log();
}
public Flux<Integer> fluxTake(){
return Flux.range(1,10)
.filter(i -> i > 5)
.take(3)
.log();
}
public Flux<Integer> fluxFlatMap(){
return Flux.range(1,10)
.flatMap(i -> Flux.range(i*10, 10))
.log();
}
public Flux<Integer> fluxFlatMap2(){
return Flux.range(1,9)
.flatMap(i -> Flux.range(1, 9)
.map(j ->{
System.out.printf("%d * %d = %d\n",i,j,i*j);
return i*j;
}));
}
}
Operator1Test.java
class Operator1Test {
private Operator1 operator1 = new Operator1();
@Test
void fluxMap() {
StepVerifier.create(operator1.fluxMap())
.expectNext(2,4,6,8,10)
.verifyComplete();
}
@Test
void fluxFilter() {
StepVerifier.create(operator1.fluxFilter())
.expectNext(6,7,8,9,10)
.verifyComplete();
}
@Test
void fluxTake() {
StepVerifier.create(operator1.fluxTake())
.expectNext(6,7,8)
.verifyComplete();
}
@Test
void fluxFlatMap() {
StepVerifier.create(operator1.fluxFlatMap())
.expectNextCount(100)
.verifyComplete();
}
@Test
void fluxFlatMap2() {
StepVerifier.create(operator1.fluxFlatMap2())
.expectNextCount(81)
.verifyComplete();
}
}
fluxFlatMap2()에서는 flatMap과 map을 연계해서 구구단을 계산하는 로직을 만들었다.
이제 다음 연산자 4가지를 살펴보자.
- concatMap : flatMap과 비슷하다. 차이점은 Sync(동기)적으로 작동한다는것이다. 순서가 중요한 작업일때 사용한다.
- flatMapMany : Mono의 단일아이템에 대해 Flux로 변환할때 사용
- switchIfEmpty / defaultIfEmpty : 여러가지 연산자를 통해 empty일 경우 어떡할건지 도와주는 연산자
- switchIfEmpty : 새로운 Publisher로 시작할수 있게 해준다. 에러 내지 혹은 콜백으로 새로운 값을 정의내릴수 있다.
- defaultIfEmpty : 콜백을 위한 값만 설정할수 있다.
- merge / zip : 여러개의 스트림을 하나로 모아준다.
- zip : 각 Publisher의 인덱스가 동일 한것끼리 합쳐준다.
- mono의 경우 merge가 아닌 mergeWith() 메서드를 이용한다.
operator2.java
public class Operator2 {
public Flux<Integer> fluxConcatMap(){
return Flux.range(1,10)
.concatMap(i -> Flux.range(i*10, 10)
.delayElements(Duration.ofMillis(100)))
.log();
}
public Flux<Integer> monoFlatMapMany(){
return Mono.just(10)
.flatMapMany(i -> Flux.range(1,i))
.log();
}
public Mono<Integer> defalutIfEmpty1(){
return Mono.just(100)
.filter(i -> i > 100)
.defaultIfEmpty(30);
}
public Mono<Integer> switchIfEmpty1(){
return Mono.just(100)
.filter(i -> i > 100)
.switchIfEmpty(Mono.just(30).map(i -> i*2));
}
public Mono<Integer> switchIfEmpty2(){
return Mono.just(100)
.filter(i -> i > 100)
.switchIfEmpty(Mono.error(new Exception("no value")));
}
public Flux<String> fluxMerge(){
return Flux.merge(Flux.fromIterable(List.of("1","2","3")), Flux.just("4"))
.log();
}
public Flux<String> monoMerge(){
return Mono.just("1").mergeWith(Mono.just("2")).mergeWith(Mono.just("3"));
}
public Flux<String> fluxZip(){
return Flux.zip(Flux.just("a","b","c"), Flux.just("d","e","f"))
.map(i -> i.getT1() + i.getT2())
.log();
}
public Mono<Integer> monoZip(){
return Mono.zip(Mono.just(1), Mono.just(2), Mono.just(3))
.map(i -> i.getT1() + i.getT2() + i.getT3());
}
}
fluxConcatMap()메서드에서 delayElements() 메서드는 flatMap()메서드와의 속도차이비교, 순서 비교를 위해 각 요소들간의 반환되는 시간의 완급을 조절해준것이고, 테스트 결과 flatMap이 훨씬 빠르지만 순서가 없이 랜덤하게 리턴하는 것을 확인할수 있었다.
operator2Test.java
class Operator2Test {
private Operator2 operator2 = new Operator2();
@Test
void fluxConcatMap() {
StepVerifier.create(operator2.fluxConcatMap())
.expectNextCount(100)
.verifyComplete();
}
@Test
void monoFlatMapMany() {
StepVerifier.create(operator2.monoFlatMapMany())
.expectNextCount(10)
.verifyComplete();
}
@Test
void defalutIfEmpty1() {
StepVerifier.create(operator2.defalutIfEmpty1())
.expectNext(30)
.verifyComplete();
}
@Test
void switchIfEmpty1() {
StepVerifier.create(operator2.switchIfEmpty1())
.expectNext(60)
.verifyComplete();
}
@Test
void switchIfEmpty2() {
StepVerifier.create(operator2.switchIfEmpty2())
.expectError()
.verify();
}
@Test
void fluxMerge() {
StepVerifier.create(operator2.fluxMerge())
.expectNext("1","2","3","4")
.verifyComplete();
}
@Test
void monoMerge() {
StepVerifier.create(operator2.monoMerge())
.expectNext("1","2","3")
.verifyComplete();
}
@Test
void fluxZip() {
StepVerifier.create(operator2.fluxZip())
.expectNext("ad","be","cf")
.verifyComplete();
}
@Test
void monoZip() {
StepVerifier.create(operator2.monoZip())
.expectNext(6)
.verifyComplete();
}
}
다음으로 또 집계할수있는 명령어를 알아보자.
- count : 요소의 개수를 단일값으로 리턴한다.
- distinct : 중복 요소를 찾아서 삭제한다.
- reduce : 연속적인 숫자를 더할때 용이하다.
- groupby : 동일한 값에 대해서 묶어서 처리할수 있게 한다.
Operator3.java
public class Operator3 {
public Mono<Long> fluxCount(){
return Flux.range(1, 10)
.count()
.log();
}
public Flux<String> fluxDistinct(){
return Flux.fromIterable(List.of("a","b","a","b","c"))
.distinct();
}
public Mono<Integer> fluxReduce(){
return Flux.range(1,10)
.reduce((i, j) -> i+j)
.log();
}
public Flux<Integer> fluxGroupBy(){
return Flux.range(1, 10)
.groupBy(i -> (i%2 == 0) ? "even" : "odd")
.flatMap(group -> group.reduce((i,j) -> i+j))
.log();
}
}
Operator3Test.java
class Operator3Test {
private Operator3 operator3 = new Operator3();
@Test
void fluxCount() {
StepVerifier.create(operator3.fluxCount())
.expectNext(10L)
.verifyComplete();
}
@Test
void fluxDistinct() {
StepVerifier.create(operator3.fluxDistinct())
.expectNext("a","b","c")
.verifyComplete();
}
@Test
void fluxReduce() {
StepVerifier.create(operator3.fluxReduce())
.expectNext(55)
.verifyComplete();
}
@Test
void fluxGroupBy() {
StepVerifier.create(operator3.fluxGroupBy())
.expectNext(30)
.expectNext(25)
.verifyComplete();
}
}
여기서부터는 back pressure와 관련된 명령어이다.
- delaySequence / limitRate
- delaySequence : (limitRate() - 몇개씩 전달?) 와 함께 사용되며 전달되는 속도를 조절해준다.
- sample : 설정한 시간동안 받은 데이터중 일부만 전달한다.
Operator4.java
public class Operator4 {
public Flux<Integer> fluxDelayAndLimit(){
return Flux.range(1,10)
.delaySequence(Duration.ofSeconds(1))
.log()
.limitRate(2);
}
public Flux<Integer> fluxSample(){
return Flux.range(1,100)
.delayElements(Duration.ofMillis(100))
.sample(Duration.ofMillis(300))
.log();
}
}
Operator4Test.java
class Operator4Test {
private Operator4 operator4 = new Operator4();
@Test
void fluxDelayAndLimit() {
StepVerifier.create(operator4.fluxDelayAndLimit())
.expectNext(1,2,3,4,5,6,7,8,9,10)
.verifyComplete();
}
@Test
void fluxSample() {
StepVerifier.create(operator4.fluxSample())
.expectNextCount(1000)
.verifyComplete();
}
}
로그를 보면 sample은 값을 일부만 보내서 테스트 코드에서 에러가 뜨는것을 확인할수 있다.
Schedulers
여러가지 Operator에 대해서 알아봤고, 이제는 비동기 작업을 실행하고 관리해주는 Scheduler를 알아야한다.
스케쥴러 또한 다양한 타입으로 제공한다. 스레드 풀 관리와 동시성 제어에 사용된다.
- Schedulers.immediate() : 현재 스레드에서 동기적인 작업 실행
- Schedulers.single() : 단일 백그라운드 스레드에서 작업 실행
- Schedulers.parallel() : 병렬로 사용할수 있는 별도의 스레드 풀, CPU 개수에 맞게 고정크기로 만들어놓음
- Schedulers.boundedElastic() : 크기에 제한이 있지만 유연하게 늘어나고, 줄어들수있는 스레드풀
위와 같이 어떻게 설정할수 있느냐?
- publishOn : 해당 메서드를 실행한 이후 설정한 스케쥴러를 반영
- subscribeOn : 설정한 스케쥴러를 전체반영
위 코드를 가지고 위에서 설명한 타입들을 매개변수로 설정할수 있다.
Scheduler1.java
public class Scheduler1 {
public Flux<Integer> fluxMapWithSubscribeOn(){
return Flux.range(1,10)
.map(i -> i*2)
.subscribeOn(Schedulers.boundedElastic())
.log();
}
public Flux<Integer> fluxMapWithPublishOn(){
return Flux.range(1,10)
.map(i -> i + 1)
.log()
.publishOn(Schedulers.boundedElastic())
.log()
.map(i -> i*2);
}
}
Scheduler1Test.java
class Scheduler1Test {
private Scheduler1 scheduler1 = new Scheduler1();
@Test
void fluxMapWithSubscribeOn() {
StepVerifier.create(scheduler1.fluxMapWithSubscribeOn())
.expectNextCount(10)
.verifyComplete();
}
@Test
void fluxMapWithPublishOn() {
StepVerifier.create(scheduler1.fluxMapWithPublishOn())
.expectNextCount(10)
.verifyComplete();
}
}
- subscribeOn 테스트 코드의 실행결과
기본 스레드는 Test worker 스레드인데, 로그 정보를 보면 boundedElastic-1 이라는 별도의 스레드에서 동작 하는것을 확인할 수있다.
- publishOn 테스트 코드의 실행결과
publishOn()메서드 실행 이후에 값들에 대해서만 스케쥴러 설정에대한 스레드에서 동작하는 것을 확인할수 있고, 그 이전 로그 정보들은 기본 스레드인 Test worker에서 실행되는것을 확인할수 있다.
휴,,, 이렇게 Spring Webflux에서 사용되는 Project Reactor와 Publisher ↔ Subscriber 간의 데이터 전달이 어떻게 이루어지는지, 이것에 대한 테스트는 어떻게 하는지, Stream의 종류인 Flux와 Mono와 이것을 다루기 위한 연산자는 무엇이 있는지, 이러한 비동기작업들을 어떤 정책으로 관리할것인지에 대한 Scheduler 까지 직접 코드를 치며 알아봤다.
'Spring > 대용량 트래픽' 카테고리의 다른 글
Spring Webflux에서 WebClient 사용하기 (0) | 2024.07.03 |
---|---|
Spring Webflux 사용해보기 (0) | 2024.05.29 |
Spring MVC와 Webflux (0) | 2024.05.22 |
sync/async 와 block/non-block (0) | 2024.05.21 |
CPU Bound vs I/O Bound (1) | 2024.05.21 |