다음 프로젝트에서는 Spring Webflux 기반 프로젝트에 MaiaDB와 Hive를 동시에 연동시켜서 진행할 예정이다.
두 개의 DB에 대한 요구사항은 얼추 아래와 같이 구성할 수 있다.
- 프로그램 실행에 사용되는 데이터는 MariaDB에 저장한다.
- 사용자가 입력하는 데이터는 Hive에 저장한다.
- Hive 테이블 구조는 사용자가 직접 설정하며, 해당 구조는 MariaDB에 저장된다.
- 즉 Hive에 데이터 입력은 고정된 형식으로만 제공되지 않으며, 동적으로 DDL과 DML 설정이 가능해야 한다.
전체 요구사항을 말할 수 없어 이렇게만 적으면 "무슨 소리지" 싶을 수 있겠지만, 아무튼 그렇다.
위 요구사항 만족을 위해 MariaDB는 일반적인 방법으로 r2dbc를 사용해 해결할 수 있다.
하지만 Hive는 그렇지 않다. 이유라면 두 가지다.
- 동적으로 DDL, DML 실행이 가능해야 함.
- 애초에 r2dbc는 Hive를 지원하지 않음.
결국 위 요구사항 충족을 위해 Hive 연동 부분은 Mybatis를 사용해야 한다.
그런데 무작정 Mybatis를 사용하기에는 문제가 존재했다.
MyBatis는 동기식(synchronous) 방식으로 작동하는 ORM(Object-Relational Mapping) 프레임워크이다.
앞단에서 비동기로 짜봤자 Mybatis가 동기식으로 작동하면 비동기의 의미가 퇴색된다.
그래서 이에 대한 해결법을 모색하기 위해 전지전능한 장난감 ChatGPT에게 문의를 해보았다.
chatGPT가 내놓은 방법 중 2번 방법인 CompletableFuture 사용이 4개의 방법 중 가장 무난해보였고, 한 번 시도해보았다. 그리고 그 결과를 포스팅하고자 한다.
프로젝트 세팅
우선 직전 포스팅에서 Spring Webflux를 사용해 만든 프로젝트가 있기에 그대로 사용했다.
https://123okk2.tistory.com/482
*APPENDIX #2까지 전부 되어있는 상태이다.
우선 Mybatis 사용을 위해 Mybatis Dependency를 추가한다.
그리고 Hive 연동을 위한 dpcp와 hive-jar을 pom.xml에 추가했다.
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-dbcp2</artifactId>
</dependency>
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-jdbc</artifactId>
<version>2.3.7</version>
<exclusions>
<exclusion>
<groupId>jdk.tools</groupId>
<artifactId>jdk.tools</artifactId>
</exclusion>
</exclusions>
</dependency>
완료되었다면 application.properties에 Hive 연동을 위한 환경설정을 추가한다.
hive.driver.classname=org.apache.hive.jdbc.HiveDriver
hive.connection.url=jdbc:hive2://{IP}:10000/default
hive.connection.username=hadoop
hive.connection.password=hadoop
이제 코드를 작성한다.
코드는 아래의 구조로 작성할 예정이다.
- HiveConfig : 하이브 연결 설정 및 Bean 생성
- HiveRestController : 하이브 관련 API를 제공하는 Restful API Controller
- HiveRepository : 하이브에 쿼리를 전송하는 DAO
- HiveService : 하이브 관련 비즈니스 로직을 담당하는 클래스
우선 HiveConfig 클래스에서 하이브 연결을 설정한다.
HiveConfig.java
package com.mwlee.test.wfx.hive.config;
import javax.sql.DataSource;
import org.apache.commons.dbcp2.BasicDataSource;
import org.apache.ibatis.session.AutoMappingBehavior;
import org.apache.ibatis.session.ExecutorType;
import org.apache.ibatis.session.SqlSessionFactory;
import org.mybatis.spring.SqlSessionFactoryBean;
import org.mybatis.spring.SqlSessionTemplate;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.support.PathMatchingResourcePatternResolver;
import org.springframework.jdbc.datasource.DataSourceTransactionManager;
import org.springframework.transaction.annotation.EnableTransactionManagement;
@Configuration
@EnableTransactionManagement
public class HiveConfig {
@Value("${hive.driver.classname}")
private String hiveDriverClassName ;
@Value("${hive.connection.url}")
private String hiveJdbcUrl;
@Value("${hive.connection.username}")
private String hiveJdbcUsername;
@Value("${hive.connection.password}")
private String hiveJdbcPassword;
@Bean
@Qualifier("hiveDataSource")
public DataSource hiveDataSource() {
BasicDataSource dataSource = new BasicDataSource();
dataSource.setDriverClassName(hiveDriverClassName);
dataSource.setUrl(hiveJdbcUrl);
dataSource.setUsername(hiveJdbcUsername);
dataSource.setPassword(hiveJdbcPassword);
dataSource.setMinIdle(10);
return dataSource;
}
@Bean
@Qualifier("hiveDataSourceTransactionManager")
public DataSourceTransactionManager hiveDataSourceTransactionManager(@Qualifier("hiveDataSource") DataSource hiveDataSource) {
return new DataSourceTransactionManager(hiveDataSource);
}
@Bean
@Qualifier("hiveSqlSessionFactory")
public SqlSessionFactory hiveSqlSessionFactory(@Qualifier("hiveDataSource") DataSource hiveDataSource) throws Exception {
SqlSessionFactoryBean bean = new SqlSessionFactoryBean();
bean.setDataSource(hiveDataSource);
bean.setMapperLocations(new PathMatchingResourcePatternResolver().getResources("classpath:mapper/hive.xml"));
// *Configuration annotation과 겹쳐서 이렇게 작성
org.apache.ibatis.session.Configuration configuration = new org.apache.ibatis.session.Configuration();
configuration.setCacheEnabled(true);
configuration.setUseGeneratedKeys(false);
configuration.setDefaultExecutorType(ExecutorType.SIMPLE);
configuration.setLazyLoadingEnabled(false);
configuration.setAggressiveLazyLoading(true);
configuration.setUseColumnLabel(true);
configuration.setAutoMappingBehavior(AutoMappingBehavior.PARTIAL);
configuration.setMultipleResultSetsEnabled(true);
configuration.setSafeRowBoundsEnabled(true);
configuration.setMapUnderscoreToCamelCase(false);
bean.setConfiguration(configuration);
return bean.getObject();
}
@Bean
@Qualifier("hiveSqlSession")
public SqlSessionTemplate hiveSqlSession(@Qualifier("hiveSqlSessionFactory") SqlSessionFactory hiveSqlSessionFactory) {
return new SqlSessionTemplate(hiveSqlSessionFactory);
}
}
위 코드의 hiveMSqlSessionFactory 메서드에서 세 번째 줄을 보면 알 수 있듯, 매퍼 xml의 위치를 mapper/hive.xml로 설정해놓았다. 이에 따라 resource 하위에 해당 폴더와 xml 파일을 생성한다.
그리고 hive.xml 파일에 아래와 같은 스크립트를 작성한다. 단순 테스트 용이므로 단순한 쿼리문만을 탑재했다.
hive.xml
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
"http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="mapper.hive">
<select id="test" resultType = "Integer">
SELECT 1
</select>
</mapper>
완료되었다면 위 xml의 mapper.hive.test 에 작성한 쿼리를 불러오는 쿼리를 HiveRepository에 작성한다.
HiveRepository.java
package com.mwlee.test.wfx.hive.repository;
import org.mybatis.spring.SqlSessionTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Repository;
@Repository
public class HiveRepository {
@Autowired
@Qualifier("hiveSqlSession")
private SqlSessionTemplate sqlSession;
public Integer test() {
return sqlSession.selectOne("mapper.hive.test");
}
}
그 후에는 HiveService를 작성한다. HiveRepository의 test() 메서드를 호출하며, 실행 순서를 보기 위해 아래와 같이 Mono 내부에 코드를 작성하지 않고 본문에서 작성하며 위 아래에 로그를 탑재했다.
HiveService.java
package com.mwlee.test.wfx.hive.service;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import com.mwlee.test.wfx.hive.repository.HiveRepository;
import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Mono;
@Slf4j
@Service
public class HiveService {
@Autowired HiveRepository repo;
public Mono<Integer> doTest() {
log.info("start");
Mono<Integer> result = Mono.just(repo.test());
log.info("end");
return result;
}
}
마지막으로 HiveRestController.java 클래스를 작성한다.
HiveRestController.java
package com.mwlee.test.wfx.hive.controller;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import com.mwlee.test.wfx.hive.service.HiveService;
import reactor.core.publisher.Mono;
@RestController
public class HiveRestController {
@Autowired HiveService service;
@GetMapping("/hive")
public Mono<Integer> getTest() {
return service.doTest();
}
}
이제 Postman을 활용해서 API를 테스트한다. (jdk.tools가 없어 JUnitTest는 불가능)
만약 비동기로 호출이 된다면 Service 내의 로그가 전부 출력된 이후 Hive가 연동될 것이다.
start 이후 DB가 연동되고, DB 연동이 끝난 후에야 end 로그가 찍혔다.
즉, 비동기로 돌아가지 않고 동기식으로 호출되었음을 의미한다.
그렇다면 이제 ChatGPT가 알려준 방식으로 코드를 작성해보자.
우선 HiveRepository 의 test() 메서드를 수정한다.
방법은 간단하게 기존 코드를 CompletableFuture로 감싸면 된다.
HiveRepository.java
package com.mwlee.test.wfx.hive.repository;
import java.util.concurrent.CompletableFuture;
import org.mybatis.spring.SqlSessionTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Repository;
@Repository
public class HiveRepository {
@Autowired
@Qualifier("hiveSqlSession")
private SqlSessionTemplate sqlSession;
public CompletableFuture<Integer> test() {
return CompletableFuture.supplyAsync(() -> {
return sqlSession.selectOne("mapper.hive.test");
});
}
}
그리고 CompletableFuture로 return된 내용을 service에서 Mono.fromFuture로 받아준다.
HiveService.java
package com.mwlee.test.wfx.hive.service;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import com.mwlee.test.wfx.hive.repository.HiveRepository;
import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Mono;
@Slf4j
@Service
public class HiveService {
@Autowired HiveRepository repo;
public Mono<Integer> doTest() {
log.info("start");
Mono<Integer> result = Mono.fromFuture(repo.test());
log.info("end");
return result;
}
}
이제 프로젝트를 재실행하고 다시 한 번 Postman으로 호출을 실행해본다.
start, end 로그가 먼저 찍히고 DB에 연동되었음을 알리는 로그가 찍히는 것을 토대로 비동기로 쿼리가 호출되었음을 확인할 수 있다.
APPENDIX
그렇다면 Mono 안에 sqlSession.select를 넣어서 날리면 혹시 비동기가 되지 않을까? 하는 의문이 생겼다.
궁금한 건 해소해야 하므로 테스트를 진행해보았다.
코드는 간단하게 아래와 같이 작성했다.
HiveRepository.java
package com.mwlee.test.wfx.hive.repository;
import org.mybatis.spring.SqlSessionTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Repository;
import reactor.core.publisher.Mono;
@Repository
public class HiveRepository {
@Autowired
@Qualifier("hiveSqlSession")
private SqlSessionTemplate sqlSession;
public Mono<Integer> test() {
return Mono.just(sqlSession.selectOne("mapper.hive.test"));
}
}
HiveService.java
package com.mwlee.test.wfx.hive.service;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import com.mwlee.test.wfx.hive.repository.HiveRepository;
import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Mono;
@Slf4j
@Service
public class HiveService {
@Autowired HiveRepository repo;
public Mono<Integer> doTest() {
log.info("start");
Mono<Integer> result = repo.test();
log.info("end");
return result;
}
}
이제 테스트를 수행해보자.
아쉽지만 Mono로 감쌌다고 해서 무조건 비동기로 실행되는 것은 아닌 것 같다.
'실습 > 리눅스 서버 + 스프링 부트' 카테고리의 다른 글
[Spring JPA] JPA에서도 PK가 변경될까? (1) | 2023.12.18 |
---|---|
[Spring Webflux] HDFS에 파일 업로드 (0) | 2023.12.12 |
[Spring Webflux] MariaDB CRUD API (2) | 2023.12.03 |
Spring MVC vs Spring Webflux (0) | 2023.10.30 |
동시성 제어 (0) | 2023.10.16 |