빅데이터를 다루는 데이터 허브 시스템을 개발했다.
물론 굳이 따지자면 개발한 건 아니고, 과거의 누군가 만들어놓은 시스템을 가져와 고객의 입맛에 맞게 기능을 추가 및 수정하고 잡다한 에러를 수정했다.
어쨌든 이 시스템을 고치면서 한 가지 의문점이 들었다.
"이거 왜 Webflux가 아니라 MVC로 개발했지?"
빅데이터를 다루는 시스템이다보니, 여러 데이터 소스에서 데이터가 유입된다. 물론 사업 관련이라 자세한 이야기를 기재할 수 없지만 데이터 소스가 그렇게 많지 않다는 고객의 요구사항에서는 크게 상관 없는 부분이긴 하다.
하지만 그래도 명색에 "빅데이터를 다루는 시스템"인데 수많은 데이터 소스에서 데이터가 들어올 수 있는 것을 고려하면 당장은 아니더라도 추후 Webflux로 재개발하는 것이 맞을 것 같다는 생각이 들었다.
그런데 여기까지 생각이 미치자 또 생각이 확장되었다.
"그런데 MVC랑 Webflux랑 명확하게 차이가 뭐야?"
궁금한 건 못참고, 또 알아두면 분명 두고두고 도움이 될 것 같아서 위 의문에 대해 한 번 공부를 해볼까 한다.
동기/블로킹과 비동기/논블로킹
동기식(Synchronous)는 요청을 보낸 후 해당 요청에 대한 응답을 받아야 다음 동작을 실행하는 방식을 의미한다. 즉 작업이 직렬로 배치되어 실행되고, 작업의 실행 순서가 고정된 방식이다.
그리고 블로킹(Blocking)은 다른 함수 호출 시 제어권을 넘기고, 실행중이던 함수는 호출한 함수로부터 return을 받을 때까지 멈추는 방식을 의미한다.
비동기식(Asynchronous)는 요청을 보낸 후 응답과 관계없이 다음 동작을 실행하는 방식을 의미한다. 즉 작업이 병렬로 배치되고, 작업의 실행 순서가 고정되지 않은 방식이다.
그리고 논블로킹(Non-Blocking)은 다른 함수 호출 시에도 제어권을 넘기지 않고, 계속해서 원래 함수를 실행시키는 방식이다.
Spring MVC와 Spring Webflux
우선 Spring MVC는 전통적인 서블릿 기반의 구조를 사용하며, 일반적으로 블로킹 방식으로 요청을 처리하는 동기식 방식이다.
서블릿 기반 구조
Java 웹 어플리케이션 개발에 사용되는 기술로, HTTP 요청을 받아 HTTP 응답을 반환하는 역할을 한다.
Tomcat, Jetty 같은 "서블릿 컨테이너"라는 환경에서 실행되며, 웹 브라우저나 클라이언트의 요청을 HttpServletRequest 형태로 받아 HttpServletResponse 형태로 돌려준다.
Spring MVC는 일반적으로 요청을 받을 경우 한 스레드가 해당 요청이 끝날 때까지 블로킹된다. 즉, 요청이 완료될 때까지 해당 스레드는 대기 상태가 되어 묶이고, 다른 요청을 처리할 수 없다. 이를 Thread Per Request라고 한다.
그렇다면 100개의 가용 스레드가 존재하는 MVC 어플리케이션에 1,000명의 사용자가 동시에 접속하면 어떻게 될까? 당연히 그만큼 대기 시간이 길어질 것이다.
*참고로 Tomcat에 아무런 옵션도 주지 않을 경우 스레드 풀 내 기본 스레드 수는 200개이다.
이러한 대용량 트래픽을 감당하기 위해 등장한 것이 Spring Webflux이다. Spring Webflux는 비동기, 논블로킹 방식의 IO를 사용한다. 스레드가 요청이 끝날 때까지 블로킹되지 않으며, 이로 인해 대용량 트래픽을 안정적으로 처리할 수 있는 기술이다.
*참고로 Webflux의 스레드 수는 CPU 코어 수의 두 배이다.
그러면 어떻게 Webflux 같은 비동기 방식은 한정된 스레드로 여러 요청을 처리할 수 있는 것일까? 이를 알아보기 위해서는 이벤트 루프 (Event Loop)에 대해 알아볼 필요가 있다.
이벤트 루프 (Event Loop)는 프로그램이 외부의 이벤트를 기다리고, 해당 이벤트가 발생했을 때 미리 정의된 콜백 함수를 실행하는 역할을 의미한다.
이벤트 루프는 이벤트(파일 읽기나 HTTP 요청 등)를 등록하고, 이벤트가 발생할 때까지 대기한 후, 이벤트 발생 시 미리 정의된 콜백 함수를 실행하는 형식으로 동작한다.
Webflux가 동일한 양의 스레드로 MVC보다 많은 양의 요청을 처리할 수 있는 것은 이러한 이벤트 루프를 사용하기 때문이다. 요청 및 내부 작업을 이벤트 단위로 관리하며, 이는 이벤트 큐에 적재되어 차례대로 수행된다. 그리고 이벤트를 처리하는 스레드 풀이 별도로 존재하며, 이 풀이 바로 이벤트 루프이다.
필요할 때만 이벤트 루프가 스레드에게 이벤트를 할당하고, 기타 외부 API 호출 및 IO 작업에서는 스레드가 대기상태가 아니라 다른 작업을 수행할 수 있는 상태가 되기 때문에 보다 많은 양의 요청을 처리할 수 있는 것이다.
요약하면 두 모델 사이의 차이점을 요약하면 아래 표처럼 기재할 수 있을 것 같다.
MVC | Webflux | |
모델 | 서블릿 구조 | 반응형 프로그래밍 |
동시성 | 한 스레드가 요청을 처리하는 동안 블로킹 | 비동기 및 논블로킹으로 동일한 수의 스레드로 더 많은 동시 연결 처리 가능 |
활용 | 블로킹 호출 또는 단순한 CRUD 어플리케이션에 사용 | 스트리밍 같은 고도의 동시적인 요청을 처리해야 하는 상황에 사용 |
테스트
물론 더 많은 내용이 있겠지만, 간단하게 개념 정도만 알기 위해서는 위의 내용 정도만 알아도 충분할 것 같다.
그렇다면 이제 직접적으로 차이점을 확인하기 위해 테스트를 진행해보자.
우선 동기와 비동기의 차이는 IO나 HTTP 요청에서 발생한다. 고로 아래와 같이 간단하게 HTTP 요청을 받아 잠시 지연을 시키는 API를 만들어 9999 포트에 띄워놓았다.
@RestController
public class ApiController {
@GetMapping("/hello")
public ResponseEntity<String> hello() throws InterruptedException {
// 지연을 주기 위해 3초간 대기
Thread.sleep(3000);
return ResponseEntity.ok("HELLO");
}
}
그리고 추가로 두 개의 프로젝트를 생성했다.
*하나의 프로젝트로 생성해도 무방하긴 하지만, MVC와 Webflux 의존성을 동시에 가진 프로젝트 생성 시 기본적으로 서블렛 기반으로 작동해서 이렇게 분리했다.
그리고 이제 앞서 실행한 9999포트로부터 API 요청을 받는 컨트롤러, 서비스, 그리고 이를 테스트하기 위한 테스트 코드를 작성한다.
우선 MVC 테스트용 프로젝트이다.
MvcController.java
package com.mwlee.test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import lombok.extern.slf4j.Slf4j;
@Slf4j
@RestController
public class MvcController {
@Autowired private MvcService testService;
@GetMapping("/testmvc")
public ResponseEntity<String> testMvc(@RequestParam Integer order) {
log.info("Order: " + order + "\t | MVC Controller Before Service Call :\t" + Thread.currentThread().getName());
String response = testService.callApiMvc(order);
log.info("Order: " + order + "\t | MVC Controller After Service Call :\t" + Thread.currentThread().getName());
return ResponseEntity.ok(response);
}
}
MvcService.java
package com.mwlee.test;
import org.springframework.stereotype.Service;
import org.springframework.web.client.RestTemplate;
import lombok.extern.slf4j.Slf4j;
@Slf4j
@Service
public class MvcService {
private String targetApi = "http://localhost:9999";
public String callApiMvc(Integer order) {
log.info("Order: " + order + "\t | MVC Service Thread Before API Call :\t" + Thread.currentThread().getName());
RestTemplate restTemplate = new RestTemplate();
String response = restTemplate.getForObject(targetApi + "/hello", String.class);
log.info("Order: " + order + "\t | MVC Service Thread After API Call :\t" + Thread.currentThread().getName());
return response;
}
}
TestmvcApplicationTests.java
package com.mwlee.test;
import static org.junit.jupiter.api.Assertions.assertEquals;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.web.client.TestRestTemplate;
import lombok.extern.slf4j.Slf4j;
// 아래 랜덤포트 미지정 시 TestRestTemplate에서 Error creating bean 에러 발생
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
@Slf4j
class TestmvcApplicationTests {
@Autowired private TestRestTemplate restTemplate;
private int CNT=3;
@Test
void contextLoads() throws InterruptedException {
ExecutorService executorServiceMvc = Executors.newFixedThreadPool(CNT);
CountDownLatch latchMvc = new CountDownLatch(CNT);
log.info("@@ MVC test start");
for(int i=1; i<=CNT; i++) {
final int cnt = i;
executorServiceMvc.execute(() -> {
String result = restTemplate.getForObject("/testmvc?order="+cnt, String.class);
assertEquals(result, "HELLO");
latchMvc.countDown();
});
}
latchMvc.await(); // 위 요청이 모두 끝날 때까지 대기
executorServiceMvc.shutdown();
}
}
다음은 Webflux 프로젝트의 컨트롤러, 서비스, 테스트 코드이다.
WfxController.java
package com.mwlee.test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Mono;
@Slf4j
@RestController
public class WfxController {
@Autowired WfxService testService;
@GetMapping("/testwfx")
public Mono<String> testWfx(Integer order) {
return Mono.defer(() -> {
log.info("Order: " + order + "\t | WFX Controller Before Service Call :\t" + Thread.currentThread().getName());
Mono<String> result = testService.callApiWfx(order);
log.info("Order: " + order + "\t | WFX Controller After Service Call :\t" + Thread.currentThread().getName());
return result;
});
}
}
WfxService.java
package com.mwlee.test;
import org.springframework.stereotype.Service;
import org.springframework.web.reactive.function.client.WebClient;
import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Mono;
@Slf4j
@Service
public class WfxService {
private String targetApi = "http://localhost:9999";
private WebClient webClient = WebClient.create(targetApi);
public Mono<String> callApiWfx(Integer order) {
return Mono.defer(() -> {
log.info("Order: " + order + "\t | WFX Service Thread Before API Call :\t" + Thread.currentThread().getName());
Mono<String> response = webClient.get().uri("/hello").retrieve().bodyToMono(String.class);
response.subscribe(result -> {
log.info("Order: " + order + "\t | WFX Service Thread After API Call :\t" + Thread.currentThread().getName());
});
return response;
});
}
}
TestwfxApplicationTests.java
package com.mwlee.test;
import static org.junit.jupiter.api.Assertions.assertEquals;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.web.reactive.server.WebTestClient;
import lombok.extern.slf4j.Slf4j;
//아래 랜덤포트 미지정 시 webTestClient에서 Error creating bean 에러 발생
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
@Slf4j
class TestwfxApplicationTests {
@Autowired private WebTestClient webTestClient;
private int CNT=3;
@Test
void contextLoads() throws InterruptedException {
ExecutorService executorServiceWfx = Executors.newFixedThreadPool(CNT);
CountDownLatch latchWfx = new CountDownLatch(CNT);
log.info("@@ WFX test start");
for(int i=1; i<=CNT; i++) {
final int cnt = i;
executorServiceWfx.execute(() -> {
String result =
webTestClient.get()
.uri(uriBuilder -> uriBuilder.path("/testwfx").queryParam("order", cnt).build())
.exchange()
.expectStatus().isOk()
.expectBody(String.class)
.returnResult().getResponseBody();
assertEquals(result, "HELLO");
latchWfx.countDown();
});
}
latchWfx.await(); // 위 요청이 모두 끝날 때까지 대기
executorServiceWfx.shutdown();
}
}
이제 실행을 한 번 시켜보자.
우선 MVC의 결과이다.
http-nio 스레드를 사용하고 있으며, Controller가 Service 호출 후 Service의 결과를 기다리고 있음을 확인할 수 있다. 즉 모든 코드가 순차적으로 수행된다.
그렇다면 Webflux는 어떤 결과를 보여줄까? 다음은 Webflux의 결과이다.
reactor-http-nio 스레드를 사용하고 있으며, Controller와 Service가 각각 다른 함수를 호출했으나 해당 함수의 결과를 기다리지 않고 곧바로 다음 로직으로 넘어가는 비동기식으로 작동했음을 확인할 수 있다.
http-nio : Tomcat의 NIO 기반 웹 런타임으로, 전통적인 서블렛 API와 함께 작동한다.
ractor-http-nio : Reactor Netty 기반의 웹 런타임으로, Reactive Streams API와 함께 작동한다.
그러면 이제 과연 3초라는 API의 통신 시간동안 이를 수행하던 스레드는 어떤 일을 하는지 확인해보자.
각 테스트 클래스의 시도 회수인 CNT를 3에서 99으로 변경하고, 전체 테스트 소요시간과 할당되는 스레드의 변화를 감지해보았다.
MVC의 경우 3.99 초가 소요되었고, CTRL+F를 통해 특정 스레드를 찾아보면, 동일한 스레드가 다른 HTTP 요청을 처리한 흔적이 존재하지 않는다.
그렇다면 Webflux는 어떨까?
고작 99개밖에 되지 않는 요청이기에 소요시간에 유의미한 차이가 존재하지는 않지만, 동일한 스레드가 여러 Order를 동시에 처리하는 모습이 확인되었다.
(위 테스트를 기준으로 고작 8개의 스레드가 100개의 요청을 모두 처리했다.)
'실습 > 리눅스 서버 + 스프링 부트' 카테고리의 다른 글
[Spring Webflux] Mybatis를 비동기로 돌리기 (0) | 2023.12.05 |
---|---|
[Spring Webflux] MariaDB CRUD API (2) | 2023.12.03 |
동시성 제어 (0) | 2023.10.16 |
[Apache Server] 리버스 프록시_CentOS, Ubuntu (1) | 2023.09.28 |
Spring Batch (0) | 2023.07.19 |