오늘은 Webflux 모듈에서 HDFS에 파일을 업로드 하는 코드를 작성해보고자 한다.
우선 모듈은 이전의 두 개 포스팅에서 작성한 모듈을 이어서 사용했다.
https://123okk2.tistory.com/483
설정 추가
hdfs와의 연동을 위해 필요한 의존성을 추가한다.
참고로 설치된 하둡이 3.2.3 버전이라 3.2.3 버전의 디펜던시를 채택했다.
pom.xml
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>3.2.3</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<version>3.2.3</version>
</dependency>
의존성 설정 후에는 필요한 일부 설정을 application.properties에 추가한다.
application.properties
hadoop.hdfs.url=hdfs://{IP}:8020/
hadoop.hdfs.user=hadoop
hadoop.hdfs.dir=/hdfstest
위 항목들의 네이밍은 내가 임의로 정한 것이므로, 굳이 따를 필요는 없다.
코드 작성
코드는 아래와 같이 세 개의 클래스가 추가될 예정이다.
- HadoopConfig : HDFS 연결 설정
- HadoopRestController : 사용자에게 파일을 업로드받는 컨트롤러
- HadoopService : 비즈니스 로직 수행
우선 HDFS 연결 설정을 위한 HadoopConfig를 작성해서 연결을 위한 빈을 생성한다.
HadoopConfog.java
package com.mwlee.test.wfx.hadoop.config;
import org.apache.hadoop.fs.FileSystem;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import java.net.URI;
@Configuration
public class HadoopConfig {
// 하둡 연결 주소 hdfs://{ip}:8020
@Value("${hadoop.hdfs.url}")
private String HDFS_URI;
// 하둡의 root 사용자 (나같은 경우는 hadoop 계정으로 설치해서 hadoop이 root 계정임.)
@Value("${hadoop.hdfs.user}")
private String HADOOP_USER;
/**
* Hadoop 연결을 위한 설정 수행
* Configuration 어노테이션과 겹처서 아래와 같이 사용
* @return
*/
@Bean
@Primary
public org.apache.hadoop.conf.Configuration configuration() {
/**
* 시스템 변수로 hadoop 사용자가 설정되어 있어야 한다.
* 만약 설정하지 않으면 root 등 현재 사용자 이름으로 연결이 설정되어 Permission denied가 발생한다.
* org.apache.hadoop.security.AccessControlException: Permission denied: user=À̹οì, access=WRITE, inode="/":hadoop:supergroup:drwxr-xr-x
*/
System.setProperty("HADOOP_USER_NAME", HADOOP_USER);
org.apache.hadoop.conf.Configuration configuration = new org.apache.hadoop.conf.Configuration();
return configuration;
}
@Bean
public FileSystem fileSystem(org.apache.hadoop.conf.Configuration configuration) throws Exception {
return FileSystem.get(new URI(HDFS_URI), configuration);
}
}
주석으로도 작성해놓았지만, 개발은 윈도우에서 진행되는 반면 hadoop은 리눅스 기반으로 설치된 경우가 있을 것이다. 그리고 이에 따라 윈도우 사용자 이름과 리눅스 내 hadoop 계정의 이름은 다를 수 있다. 이 경우 Permission denied 에러가 발생하는데, 이를 방지하기 위해 시스템 변수 내 사용자 이름을 hadoop의 최고 관리자 계정으로 변경한다.
다음으로 비즈니스 로직을 수행하는 서비스 클래스를 작성한다.
hdfs에 파일을 업로드하는 방법은 간단하게 원본 파일의 InputStream을 추출해서 IOUtils를 사용해서 업로드하면 된다.
HadoopService.java
package com.mwlee.test.wfx.hadoop.service;
import java.io.IOException;
import java.io.OutputStream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.http.codec.multipart.FilePart;
import org.springframework.stereotype.Service;
import jakarta.annotation.PostConstruct;
import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Mono;
@Slf4j
@Service
public class HadoopService {
@Autowired FileSystem hdfsConnection;
@Autowired Configuration hdfsConfig;
@Value("${hadoop.hdfs.dir}")
private String TARGET_HDFS_PATH;
/**
* HDFS내 파일을 저장할 타겟 디렉터리가 존재하는지 확인 후 존재하지 않는다면 생성
*/
@PostConstruct
public void checkIfTargetPathExists() {
try {
Path path = new Path(TARGET_HDFS_PATH);
if(!hdfsConnection.exists(path)) {
// 데이터를 적재할 폴더가 존재하지 않으면 폴더 생성
if(hdfsConnection.mkdirs(path)) {
// 폴더 생성 성공
log.info("@@ Folder {} Created.", TARGET_HDFS_PATH);
}
else {
// 폴더 생성 실패 및 프로그램이 정상 작동할 수 없으므로 강제 종료
log.error("@@ Can't Create Folder {}", TARGET_HDFS_PATH);
System.exit(0);
}
}
else {
log.info("@@ Folder {} Already Exists.", TARGET_HDFS_PATH);
}
log.info(Boolean.toString(hdfsConnection.exists(path)));
} catch (IOException e) {
log.error("@@ IOError Occured : {}", e.getMessage());
System.exit(0);
} catch (Exception e) {
log.error("@@ Error Occured : {}", e.getMessage());
System.exit(0);
}
}
/**
* 파일 업로드 수행
*
* @param file
* @return
*/
public Mono<Boolean> uploadFile(FilePart file) {
return file
// file 내 inputstream 뽑아내기
.content()
.flatMap(dataBuffer -> {
return Mono.just(dataBuffer.asInputStream());
})
// asInputStream이 FLUX를 뽑아내므로 collect 실행
.collectList()
// hdfs에 업로드
.flatMap(inputStreamList -> {
try {
// 저장 위치에 동명의 파일로 저장.
Path path = new Path(TARGET_HDFS_PATH + "/" + file.filename());
// org.apache.hadoop.io.IOUtils 를 사용해 업로드
OutputStream outputStream = hdfsConnection.create(path);
// 어차피 파일은 하나이므로 리스트 안에는 1개만 들어있음.
IOUtils.copyBytes(inputStreamList.get(0), outputStream, hdfsConfig);
} catch (IOException e) {
log.error("@@ IOError Occured : {}", e.getMessage());
return Mono.just(false);
} catch (Exception e) {
log.error("@@ Error Occured : {}", e.getMessage());
return Mono.just(false);
}
return Mono.just(true);
});
}
}
코드를 보면 일반적으로 사용되는 MultipartFile이 아니라 FilePart를 사용하고 있다. 이유라면 Webflux가 MultipartFile을 지원하지 않기 때문이다.
실험을 위해 MultipartFile을 RestController 로 받으려고 했는데, 아래와 같은 에러가 발생했다.
Resolved [UnsupportedMediaTypeStatusException: 415 UNSUPPORTED_MEDIA_TYPE "Content type 'multipart/form-data' not supported for bodyType=org.springframework.web.multipart.MultipartFile"] for HTTP POST /hadoop
고로 Webflux에서는 MultipartFile 대신 FilePart를 사용한다. MultipartFile을 사용하는 MVC 모델 코드는 APPENDIX로 하단에 작성하겠다.
서비스 작성이 완료되었다면 파일을 전달받는 RestController를 작성한다.
HadoopRestController.java
package com.mwlee.test.wfx.hadoop.controller;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.codec.multipart.FilePart;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestPart;
import org.springframework.web.bind.annotation.RestController;
import com.mwlee.test.wfx.hadoop.service.HadoopService;
import reactor.core.publisher.Mono;
@RestController
public class HadoopRestController {
@Autowired HadoopService hadoopService;
/**
* 사용자에게 파일을 업로드 받아 서비스에 전달하는 컨트롤러
*
* @param file
* @return
*/
@PostMapping(path="/hadoop")
public Mono<Boolean> uploadHadoop(@RequestPart("file") Mono<FilePart> file) {
return file.flatMap(singleFile -> {
return hadoopService.uploadFile(singleFile);
});
}
}
테스트
코드 작성이 완료되었으므로, 실제로 업로드를 실행해본다.
일반적인 하둡 3이라면 50070 포트에 진입하면 HDFS내 파일 목록을 확인할 수 있다.
이제 스프링 어플리케이션을 실행시켜보자. 어플리케이션이 정상 동작하고 코드 내용대로 /hdfstest 폴더가 생성된다면 연동에 성공한 것이다.
정상적으로 생성이 완료되었다.
그러면 이제 실제로 파일을 업로드해보자. 우선 아래와 같은 txt 파일을 생성했다.
그리고 postman에 들어가 아래와 같이 post 메서드를 날려보자.
설정 완료 후 Send 버튼을 눌러 API에 파일을 업로드한다. 그리고 아까 확인했던 HDFS /hdfstest 디렉터리 내에 해당 파일이 정상 업로드되었는지 확인한다.
정상 업로드 되었음을 확인할 수 있다.
APPENDIX
Webflux 의 FilePart 기반으로 hdfs에 파일을 업로드 하는 방법까지는 확인헀는데, MultipartFile로 업로드하는 방법은 확인하지 않았다.
사실 로직 자체는 크게 다르지 않다. 그냥 MultipartFile의 inputstream을 추출하고 IOUtils에 넣기만 하면 된다.
아래 코드는 MultipartFile을 hdfs에 업로드하는 코드이다. hdfsConnection, hdfsConfig는 상단의 HadoopService.java의 @Autowired 부분을 참고하면 된다.
Path path = new Path(TARGET_HDFS_PATH + "/" + file.getName());
OutputStream outputStream = hdfsConnection.create(path);
IOUtils.copyBytes(file, outputStream, hdfsConfig);
'실습 > 리눅스 서버 + 스프링 부트' 카테고리의 다른 글
[Spring JPA] Dynamic Insert/Update (0) | 2023.12.19 |
---|---|
[Spring JPA] JPA에서도 PK가 변경될까? (1) | 2023.12.18 |
[Spring Webflux] Mybatis를 비동기로 돌리기 (0) | 2023.12.05 |
[Spring Webflux] MariaDB CRUD API (2) | 2023.12.03 |
Spring MVC vs Spring Webflux (0) | 2023.10.30 |