2024.05.22 - [Spring/대용량 트래픽] - Spring Webflux Reactor와 다양한 연산자
이전글에서 Stream의 종류인 Flux와 Mono에 대한 연산자를 Publisher와 Subscriber의 관점에서 알아봤다.
이제 진짜로 Spring Webflux를 사용해볼건데 어떤기능을 제공하는지부터 알고가야한다.
- Controller : 2가지를 제공한다.
- functional endpoint
- annotation endpoint : Spring MVC에서 많이 보던것
- Service
- Repository
그리고 위의 기능들을 사용해볼건데, 준비물은 당연하지만 스프링 프로젝트와 Spring Reactive Web의 의존성이다.
어떻게 동작하는지 관찰하기 위해서 application.yaml 파일에 설정 log를 DEBUG로 해줬다.
application.yaml
logging:
level:
root: DEBUG
전반적인 설정은 끝냈고, 기능 하나하나를 파헤쳐보자!
functional endpoint
RouteConfig.java
@Configuration
@RequiredArgsConstructor
public class RouteConfig {
private final SampleHandler sampleHandler;
@Bean
public RouterFunction<ServerResponse> route(){
return RouterFunctions.route()
.GET("/hello-functional",sampleHandler::getString)
.build();
}
}
- route() : 함수를 호출한 이후 HTTP메서드에 관해 설정할수 있다.
- endPoint와 Handler 설정을 인자로 넣어줘야한다.
SampleHandler.java
@Component
public class SampleHandler {
public Mono<ServerResponse> getString(ServerRequest request){
return ServerResponse.ok().bodyValue("hello, functional endpoint");
}
}
간단하게 문자결과값을 리턴하는 컨트롤러를 구현했다.
annotation endpoint
기존 Spring MVC에서 했던대로 하면된다.
SampleController.java
@RestController
public class SampleController {
@GetMapping("sample/hello")
public Mono<String> getHello(){
return Mono.just("hello rest controller with webflux");
}
}
- ServerResponse의 형태를 리턴하는것을 어노테이션에서 기능을 제공하고 있으므로, 반환하고 싶은형을 리턴하면 된다.
그런데 조금 이상한점이있다. 위 코드는 Publlisher에 해당하는 Mono를 생산만하고 있어서 Subscriber가 없다. 이전글에서 reactor에 대해 볼때, 단순히 Publisher만 있고 Subscriber가 없으면 동작하지 않는다고 배웠는데, 이게 잘 돌아갈까?
잘 돌아간다.
왜? Spring Webflux자체적으로 별도의 구독을 하고 있기때문이다!
Service, Repository
정말 간단하게 CRUD API에 대해서 다루겠다.
사용자 정보에 대해서 다룬다고 가정하면 기본적으로 사용자 엔티티가 있어야한다.
User.java
@Data
@Builder
@AllArgsConstructor
public class User {
private Long id;
private String name;
private String email;
private LocalDateTime createdAt;
private LocalDateTime updatedAt;
}
이제 DB에 연동하기 위한 repository에 함수를 직접 짠다.
UserRepository.java
public interface UserRepository {
Mono<User> save(User user);
Flux<User> findAll();
Mono<User> findById(Long id);
Mono<Integer> deleteById(Long id);
}
UserRepositoryImpl.java
@Repository
public class UserRepositoryImpl implements UserRepository{
private final ConcurrentHashMap<Long, User> userHashMap = new ConcurrentHashMap<>();
private AtomicLong sequence = new AtomicLong(1L);
@Override
public Mono<User> save(User user) {
LocalDateTime now = LocalDateTime.now();
if(user.getId() == null){
user.setId(sequence.getAndAdd(1));
user.setCreatedAt(now);
}
user.setUpdatedAt(now);
userHashMap.put(user.getId(), user);
return Mono.just(user);
}
@Override
public Flux<User> findAll() {
return Flux.fromIterable(userHashMap.values());
}
@Override
public Mono<User> findById(Long id) {
return Mono.justOrEmpty(userHashMap.getOrDefault(id, null));
}
@Override
public Mono<Integer> deleteById(Long id) {
User user = userHashMap.getOrDefault(id, null);
if(user == null){
return Mono.just(0);
}
userHashMap.remove(id, user);
return Mono.just(1);
}
}
Spring Data라는 편리한 기능을 쓰는게 아니므로, 내가 직접 자료구조를 만들어줘야한다.
특히 Webflux라는 비동기처리로직을 짤때는 데이터의 일관성이 중요하므로 그에 맞는 자료구조를 쓰는것이좋다.
사용자는 ID와 그 이외의 정보들로 이루어져있으므로, map이 좋은 자료구조이다. 그러나 일반적인 HashMap은 동기적임을 보장할때 쓰는것이고, 비동기로직처리는 아래 자료구조를 쓴다.
- ConcurrentHashMap : Thread safe 하기 때문에 멀티 스레드 환경에서도 안전하고, 각 entry마다 lock을 걸어서 consistent(일관성)을 유지한다.
- save() : 멀티스레딩의 save() 상황에서 ID가 없는 유저의 정보를 저장할때, atomic하게 새로운 ID를 자동으로 부여해주기 위해서 AtomicLong을 사용한다.
- put() : ConcurrentHashMap에서의 put()은 원래 있던 데이터에 추가하는것이 아니라 아예 값자체를 교체한다.
- values() : ConcurrentHashMap에서 Key가 있는 값들에대해 Collection형으로 묶어서 리턴한다.
- 반복조회할수 있는 자료형이므로 Flux의 fromIterable을 이용해서 리턴해준다.
Service단을 만들러 가자!
근데 자꾸 Mono랑 Flux형으로 리턴하는 이유? 우리가 작성하는 코드가 Reactor를 기반으로 사용하는 Spring Webflux이기도 하고, 내가 하고있는 것이 Webflux내에서 Subscribe를 하고있기때문에, 결국 Controller에서 리턴되는 값이 Publisher값이 나와야 되기 때문이다.
UserService.java
@Service
@RequiredArgsConstructor
public class UserService {
private final UserRepository userRepository;
public Mono<User> create(String name, String email){
return userRepository.save(User.builder().name(name).email(email).build());
}
public Flux<User> findAll(){
return userRepository.findAll();
}
public Mono<User> findById(Long id){
return userRepository.findById(id);
}
public Mono<Integer> deleteById(Long id){
return userRepository.deleteById(id);
}
public Mono<User> update(Long id, String name, String email){
userRepository.findById(id)
.flatMap(u -> {
u.setName(name);
u.setEmail(email);
return userRepository.save(u);
});
}
}
update를 빼고는 그냥 단순하게 repository에서 만들어준 함수를 써서 기능을 구현했다.
- flatMap() : 이전글에서 살펴봤던 연산자이다. map말고 이걸 쓴이유는?
- 위에 repository에서 save()함수를 보면 Mono형을 리턴하고 있는데, map()을 쓰면 Mono<Mono<User>>형으로 두 번 감싸서 나오게 되어 원하지 않는 형이 리턴된다. flatMap()을 쓰면 조건에 맞는 새로운 Steam을 만들어내는 것이므로 Mono<User>이 리턴되어 원하는값이 나온다.
Controller
UserController
@RestController
@RequiredArgsConstructor
@RequestMapping("/users")
public class UserController {
private final UserService userService;
@PostMapping("")
public Mono<UserResponse> createUser(@RequestBody UserCreateRequest userCreateRequest){
return userService.create(userCreateRequest.getName(), userCreateRequest.getEmail())
.map(UserResponse::of);
}
@GetMapping("")
public Flux<UserResponse> findAllUsers(){
return userService.findAll()
.map(UserResponse::of);
}
@GetMapping("/{id}")
public Mono<ResponseEntity<UserResponse>> findUser(@PathVariable Long id){
return userService.findById(id)
.map(u -> ResponseEntity.ok(UserResponse.of(u)))
.switchIfEmpty(Mono.just(ResponseEntity.notFound().build()));
}
@DeleteMapping("/{id}")
public Mono<ResponseEntity<?>> deleteUser(@PathVariable Long id){
//정상응답 205가 아니라 204 (no content) 응답으로 주고 싶다.
return userService.deleteById(id).then(
Mono.just(ResponseEntity.noContent().build())
);
}
@PutMapping("/{id}")
public Mono<ResponseEntity<UserResponse>> updateUser(@PathVariable Long id, @RequestBody UserUpdateRequest request){
return userService.update(id, request.getName(), request.getEmail())
.map(u -> ResponseEntity.ok(UserResponse.of(u)))
.switchIfEmpty(Mono.just(ResponseEntity.notFound().build()));
}
}
createUser
deleteUser
HTTP요청에 대한 응답값중 200대의 숫자가 응답을 성공적으로 했다는 것은 알것이다. 205의 경우 객체를 담아서 리턴한것을 의미하는데, 그렇다면 객체가 없이 비어있다고 응답을 받기위해선 어떻게 해야할까?
ResponseEntity클래스의 noContent()함수를 사용해서 요청에 성공했을때 응답으로 204(no content)를 설정할 수있다.
findUser, updateUser
기존 있던 사용자를 검색, 업데이트하는것은 문제없지만, 존재하지 않는 사용자에 대해서 업데이트 요청을 하면 어떻게 할까?
200번대 성공 메세지가 아니라 400번대 클라이언트에서 잘못된 요청을 했다는 메세지를 받아야한다.
Repository에서 아이디를 검색하는 부분을 봐야하는데 다시 한번 함수를보자
@Override
public Mono<User> findById(Long id) {
return Mono.justOrEmpty(userHashMap.getOrDefault(id, null));
}
없는 사용자라면 이 함수에서 null값을 반환할거다. 뭔가 이전글에서 봤던 명령어를 활용할수 있을거 같다.
switchIfEmpty, defaultIfEmpty, filter등을 활용해서 로직을 처리했다.
이렇게 CRUD가 가능한 RestfulAPI를 만들어봤다.
이제 Postman말고 테스트코드를 만들어보자.
WebFluxTest
실제로는 코드를 작성해서 개발을 하는것도 중요하지만, Test의 비중이 크고 그에 따라 테스트코드 짜는 방법을 잘 알아두는 것이 좋다고 한다. 테스트 하기에 앞서서 용어부터 알아보고 들어가자.
- @WebFluxTest : Spring Webflux에서 지원하는 테스트용 어노테이션이다. 컨트롤러와 관련된 것만 빈으로 등록하고, Service, Repository등 관련 없는 레이어는 빈으로 등록하지 않고 의존성을 끊는다.
- WebTestClient : Spring Webflux에서 지원하는 테스트용 웹클라이언트 객체이다. 뒤에 HTTP 메서드등 원하는 요청을 설정할수 있다.
- exchange : 요청설정에서 응답설정으로 전환해주는 함수다.
- Mockito : 테스트를 하기위한 가짜객체를 만드는 Mocking FrameWork이다.
- @MockBean : 가짜빈이라고 생각하면된다. 통합테스트를 실행할때 사용되고, @WebFluxtest는 빈으로 등록해서 사용하는 통합테스트임으로 같이 사용해줘야한다.
- when : 의미그대로 Mock객체의 행동을 설정해주는 함수다. (~했을때에 대응시키면 된다.)
- thenAnswer : when과 함께 쓰이며 이 함수를 써도 되고 세분화해서 밑에 두개의 형식으로 써도된다. when 상황에서 어떤 것을 하는지에 대한 설정을 하는 함수다.
- thenReturn
- thenThrow
이렇게 쓰이는 어노테이션이나 함수를 보고 든 생각은 왜 when을 써주는거지이다. 조금만 생각해보면 @WebFluxTest로 만들어둔 Controller만 빈으로 등록하고, 나머지 빈들은 연결이 끊어진다.
Ex. 컨트롤러를 테스트한다면 Service단과, Repository단은 거의 필수로 쓰일건데 위 어노테이션을 쓴상태에서는 연결이 없으므로 쓰지못한다. 따라서 임의로 직접 when을 통해서 구현해주는 것이다.
UserControllerTest.java
createUser
@WebFluxTest(UserController.class)
@AutoConfigureWebTestClient
class UserControllerTest {
@Autowired
private WebTestClient webTestClient;
@MockBean
private UserService userService;
@Test
void createUser() {
when(userService.create("uni1", "uni1@gmail.com")).thenReturn(
Mono.just(new User(1L, "uni1", "uni1@gmail.com", LocalDateTime.now(),LocalDateTime.now()))
);
webTestClient.post().uri("/users")
.contentType(MediaType.APPLICATION_JSON)
.bodyValue(new UserCreateRequest("uni1", "uni1@gmail.com"))
.exchange()
.expectStatus().is2xxSuccessful()
.expectBody(UserResponse.class)
.value(res -> {
assertEquals("uni1", res.getName());
assertEquals("uni1@gmail.com", res.getEmail());
});
}
findAllUsers
@Test
void findAllUsers() {
when(userService.findAll()).thenReturn(
Flux.just(
new User(1L, "uni1", "uni1@gmail.com", LocalDateTime.now(),LocalDateTime.now()),
new User(2L, "uni2", "uni2@gmail.com", LocalDateTime.now(),LocalDateTime.now()),
new User(3L, "uni3", "uni3@gmail.com", LocalDateTime.now(),LocalDateTime.now())
)
);
webTestClient.get().uri("/users")
.exchange()
.expectStatus().is2xxSuccessful()
.expectBodyList(UserResponse.class)
.hasSize(3);
}
findUser
@Test
void findUser() {
when(userService.findById(1L)).thenReturn(
Mono.just(new User(1L, "uni1", "uni1@gmail.com", LocalDateTime.now(),LocalDateTime.now())
));
webTestClient.get().uri("/users/1")
.exchange()
.expectStatus().is2xxSuccessful()
.expectBody(UserResponse.class)
.value(res -> {
assertEquals("uni1", res.getName());
assertEquals("uni1@gmail.com", res.getEmail());
}
);
}
유저가 없는데 유저를 검색한 경우
@Test
void notFoundUser() {
when(userService.findById(1L)).thenReturn(Mono.empty());
webTestClient.get().uri("/users/1")
.exchange()
.expectStatus().is4xxClientError();
}
deleteUser
@Test
void deleteUser() {
when(userService.deleteById(1L)).thenReturn(Mono.just(1));
webTestClient.delete().uri("/users/1")
.exchange()
.expectStatus().is2xxSuccessful();
}
updateUser
@Test
void updateUser() {
when(userService.update(1L, "uni1", "uni1@gmail.com")).thenReturn(
Mono.just(new User(1L, "uni1", "uni1@gmail.com", LocalDateTime.now(),LocalDateTime.now()))
);
webTestClient.put().uri("/users/1")
.contentType(MediaType.APPLICATION_JSON)
.bodyValue(new UserCreateRequest("uni1", "uni1@gmail.com"))
.exchange()
.expectStatus().is2xxSuccessful()
.expectBody(UserResponse.class)
.value(res -> {
assertEquals("uni1", res.getName());
assertEquals("uni1@gmail.com", res.getEmail());
});
}
Repository Test
이어서 Repository 테스트도 진행한다.
Repository는 Controller와 달리 Repository자체의 기능을 테스트하는것이 중요하기 때문에 이전글에서 봤던 Reactor 테스트에서 썼던 StepVerifier를 써서 테스트한다.
save
@Test
void save() {
User user = User.builder().name("uni1").email("uni1@gmail.com").build();
StepVerifier.create(userRepository.save(user))
.assertNext(u -> {
assertEquals(1L, u.getId());
assertEquals("uni1", u.getName());
assertEquals("uni1@gmail.com", u.getEmail());
})
.verifyComplete();
}
findAll
@Test
void findAll() {
userRepository.save(User.builder().name("uni1").email("uni1@gmail.com").build());
userRepository.save(User.builder().name("uni2").email("uni2@gmail.com").build());
userRepository.save(User.builder().name("uni3").email("uni3@gmail.com").build());
StepVerifier.create(userRepository.findAll())
.expectNextCount(3)
.verifyComplete();
}
findById
@Test
void findById() {
userRepository.save(User.builder().name("uni1").email("uni1@gmail.com").build());
userRepository.save(User.builder().name("uni2").email("uni2@gmail.com").build());
StepVerifier.create(userRepository.findById(2L))
.assertNext(u -> {
assertEquals(2L, u.getId());
assertEquals("uni2", u.getName());
})
.verifyComplete();
}
deleteById
@Test
void deleteById() {
userRepository.save(User.builder().name("uni1").email("uni1@gmail.com").build());
userRepository.save(User.builder().name("uni2").email("uni2@gmail.com").build());
StepVerifier.create(userRepository.deleteById(2L))
.expectNextCount(1)
.verifyComplete();
}
이렇게 Spring MVC와 같이 어노테이션기반의 Spring Webflux로 구독자가 명확히 없는 Restful API를 만들고, 테스트까지 상세히 해봤다. 다음 글은 실제 외부 서비스 요청을 할수 있는 WebClient에 대해 다뤄보자!
2024.07.03 - [Spring/대용량 트래픽] - Spring Webflux에서 WebClient 사용하기
'Spring > 대용량 트래픽' 카테고리의 다른 글
R2DBC 알아보기!! (0) | 2024.07.03 |
---|---|
Spring Webflux에서 WebClient 사용하기 (0) | 2024.07.03 |
Spring Webflux Reactor와 다양한 연산자 (0) | 2024.05.22 |
Spring MVC와 Webflux (0) | 2024.05.22 |
sync/async 와 block/non-block (0) | 2024.05.21 |