본문 바로가기
실습/리눅스 서버 + 스프링 부트

[Spring Webflux] HDFS에 파일 업로드

by 이민우 2023. 12. 12.
728x90
반응형

오늘은 Webflux 모듈에서 HDFS에 파일을 업로드 하는 코드를 작성해보고자 한다.

 

우선 모듈은 이전의 두 개 포스팅에서 작성한 모듈을 이어서 사용했다.

https://123okk2.tistory.com/483

 

[Spring Webflux] Mybatis를 비동기로 돌리기

다음 프로젝트에서는 Spring Webflux 기반 프로젝트에 MaiaDB와 Hive를 동시에 연동시켜서 진행할 예정이다. 두 개의 DB에 대한 요구사항은 얼추 아래와 같이 구성할 수 있다. 프로그램 실행에 사용되는

123okk2.tistory.com

 

 

설정 추가

hdfs와의 연동을 위해 필요한 의존성을 추가한다.

참고로 설치된 하둡이 3.2.3 버전이라 3.2.3 버전의 디펜던시를 채택했다.

 

pom.xml

<dependency>
	<groupId>org.apache.hadoop</groupId>
	<artifactId>hadoop-client</artifactId>
	<version>3.2.3</version>
</dependency>

<dependency>
	<groupId>org.apache.hadoop</groupId>
	<artifactId>hadoop-hdfs</artifactId>
	<version>3.2.3</version>
</dependency>

 

의존성 설정 후에는 필요한 일부 설정을 application.properties에 추가한다.

 

application.properties

hadoop.hdfs.url=hdfs://{IP}:8020/
hadoop.hdfs.user=hadoop
hadoop.hdfs.dir=/hdfstest

 

 

위 항목들의 네이밍은 내가 임의로 정한 것이므로, 굳이 따를 필요는 없다.

 

 

 

코드 작성

 

코드는 아래와 같이 세 개의 클래스가 추가될 예정이다.

  • HadoopConfig : HDFS 연결 설정
  • HadoopRestController : 사용자에게 파일을 업로드받는 컨트롤러
  • HadoopService : 비즈니스 로직 수행

 

우선 HDFS 연결 설정을 위한 HadoopConfig를 작성해서 연결을 위한 빈을 생성한다.

 

HadoopConfog.java

package com.mwlee.test.wfx.hadoop.config;

import org.apache.hadoop.fs.FileSystem;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;

import java.net.URI;

@Configuration
public class HadoopConfig {

	// 하둡 연결 주소 hdfs://{ip}:8020
	@Value("${hadoop.hdfs.url}")
    private String HDFS_URI;
	
	// 하둡의 root 사용자 (나같은 경우는 hadoop 계정으로 설치해서 hadoop이 root 계정임.)
	@Value("${hadoop.hdfs.user}")
	private String HADOOP_USER;
	
	/**
	 * Hadoop 연결을 위한 설정 수행
	 * Configuration 어노테이션과 겹처서 아래와 같이 사용
	 * @return
	 */
    @Bean
    @Primary
    public org.apache.hadoop.conf.Configuration configuration() {
    	
    	/**
    	 * 시스템 변수로 hadoop 사용자가 설정되어 있어야 한다.
    	 * 만약 설정하지 않으면 root 등 현재 사용자 이름으로 연결이 설정되어 Permission denied가 발생한다.
    	 * org.apache.hadoop.security.AccessControlException: Permission denied: user=À̹οì, access=WRITE, inode="/":hadoop:supergroup:drwxr-xr-x
    	 */
    	System.setProperty("HADOOP_USER_NAME", HADOOP_USER);
    	
    	org.apache.hadoop.conf.Configuration configuration = new org.apache.hadoop.conf.Configuration();
        
        return configuration;
    }

    @Bean
    public FileSystem fileSystem(org.apache.hadoop.conf.Configuration configuration) throws Exception {
    	return FileSystem.get(new URI(HDFS_URI), configuration);
    }
}

 

주석으로도 작성해놓았지만, 개발은 윈도우에서 진행되는 반면 hadoop은 리눅스 기반으로 설치된 경우가 있을 것이다. 그리고 이에 따라 윈도우 사용자 이름과 리눅스 내 hadoop 계정의 이름은 다를 수 있다. 이 경우 Permission denied 에러가 발생하는데, 이를 방지하기 위해 시스템 변수 내 사용자 이름을 hadoop의 최고 관리자 계정으로 변경한다.

 

다음으로 비즈니스 로직을 수행하는 서비스 클래스를 작성한다.

hdfs에 파일을 업로드하는 방법은 간단하게 원본 파일의 InputStream을 추출해서 IOUtils를 사용해서 업로드하면 된다.

 

HadoopService.java

package com.mwlee.test.wfx.hadoop.service;

import java.io.IOException;
import java.io.OutputStream;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.http.codec.multipart.FilePart;
import org.springframework.stereotype.Service;

import jakarta.annotation.PostConstruct;
import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Mono;

@Slf4j
@Service
public class HadoopService {

	@Autowired FileSystem hdfsConnection;
	@Autowired Configuration hdfsConfig;
	
	@Value("${hadoop.hdfs.dir}")
	private String TARGET_HDFS_PATH;
	
	/**
	 * HDFS내 파일을 저장할 타겟 디렉터리가 존재하는지 확인 후 존재하지 않는다면 생성
	 */
	@PostConstruct
	public void checkIfTargetPathExists() {
		try {
			
			Path path = new Path(TARGET_HDFS_PATH);
			
			if(!hdfsConnection.exists(path)) {
				// 데이터를 적재할 폴더가 존재하지 않으면 폴더 생성
				if(hdfsConnection.mkdirs(path)) {
					// 폴더 생성 성공
					log.info("@@ Folder {} Created.", TARGET_HDFS_PATH);
				}
				else {
					// 폴더 생성 실패 및 프로그램이 정상 작동할 수 없으므로 강제 종료
					log.error("@@ Can't Create Folder {}", TARGET_HDFS_PATH);
					System.exit(0);
				}
			}
			else {
				log.info("@@ Folder {} Already Exists.", TARGET_HDFS_PATH);
			}
			
			log.info(Boolean.toString(hdfsConnection.exists(path)));
			
		} catch (IOException e) {
			log.error("@@ IOError Occured : {}", e.getMessage());
			System.exit(0);
		} catch (Exception e) {
			log.error("@@ Error Occured : {}", e.getMessage());
			System.exit(0);
		}
	}

	/**
	 * 파일 업로드 수행
	 * 
	 * @param file
	 * @return
	 */
	public Mono<Boolean> uploadFile(FilePart file) {
		return file
				// file 내 inputstream 뽑아내기
				.content()
				.flatMap(dataBuffer -> {
					return Mono.just(dataBuffer.asInputStream());
				})
				// asInputStream이 FLUX를 뽑아내므로 collect 실행
				.collectList()
				// hdfs에 업로드
				.flatMap(inputStreamList -> {
					try {
						// 저장 위치에 동명의 파일로 저장.
						Path path = new Path(TARGET_HDFS_PATH + "/" + file.filename());
						// org.apache.hadoop.io.IOUtils 를 사용해 업로드
						OutputStream outputStream = hdfsConnection.create(path);
						// 어차피 파일은 하나이므로 리스트 안에는 1개만 들어있음.
						IOUtils.copyBytes(inputStreamList.get(0), outputStream, hdfsConfig);
						
					} catch (IOException e) {
						log.error("@@ IOError Occured : {}", e.getMessage());
						return Mono.just(false);
					} catch (Exception e) {
						log.error("@@ Error Occured : {}", e.getMessage());
						return Mono.just(false);
					}
					
					return Mono.just(true);
				});
	}
}

 

 

코드를 보면 일반적으로 사용되는 MultipartFile이 아니라 FilePart를 사용하고 있다. 이유라면 Webflux가 MultipartFile을 지원하지 않기 때문이다.

 

 

실험을 위해 MultipartFile을 RestController 로 받으려고 했는데, 아래와 같은 에러가 발생했다.

Resolved [UnsupportedMediaTypeStatusException: 415 UNSUPPORTED_MEDIA_TYPE "Content type 'multipart/form-data' not supported for bodyType=org.springframework.web.multipart.MultipartFile"] for HTTP POST /hadoop

 

고로 Webflux에서는 MultipartFile 대신 FilePart를 사용한다. MultipartFile을 사용하는 MVC 모델 코드는 APPENDIX로 하단에 작성하겠다.

 

서비스 작성이 완료되었다면 파일을 전달받는 RestController를 작성한다.

 

HadoopRestController.java

package com.mwlee.test.wfx.hadoop.controller;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.codec.multipart.FilePart;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestPart;
import org.springframework.web.bind.annotation.RestController;

import com.mwlee.test.wfx.hadoop.service.HadoopService;

import reactor.core.publisher.Mono;

@RestController
public class HadoopRestController {

	@Autowired HadoopService hadoopService;
	
	/**
	 * 사용자에게 파일을 업로드 받아 서비스에 전달하는 컨트롤러
	 * 
	 * @param file
	 * @return
	 */
	@PostMapping(path="/hadoop")
	public Mono<Boolean> uploadHadoop(@RequestPart("file") Mono<FilePart> file) {
		
		return file.flatMap(singleFile -> {
			return hadoopService.uploadFile(singleFile);
		});
	}
	
}

 

 

테스트

 

코드 작성이 완료되었으므로, 실제로 업로드를 실행해본다.

 

일반적인 하둡 3이라면 50070 포트에 진입하면 HDFS내 파일 목록을 확인할 수 있다.

 

 

이제 스프링 어플리케이션을 실행시켜보자. 어플리케이션이 정상 동작하고 코드 내용대로 /hdfstest 폴더가 생성된다면 연동에 성공한 것이다.

 

 

 

 

정상적으로 생성이 완료되었다.

 

그러면 이제 실제로 파일을 업로드해보자. 우선 아래와 같은 txt 파일을 생성했다.

 

그리고 postman에 들어가 아래와 같이 post 메서드를 날려보자.

Content-Type을 multipart/form-data 로 설정
body를 form-data로 변경 후 file 키에 파일 탑재 (키 이름 우측의 Text를 File로 변경해야 함)

 

설정 완료 후 Send 버튼을 눌러 API에 파일을 업로드한다. 그리고 아까 확인했던 HDFS /hdfstest 디렉터리 내에 해당 파일이 정상 업로드되었는지 확인한다.

 

목록을 통해 정상 업로드 확인
Head the file 을 선택해 내용도 정상 입력되었는지 확인

 

정상 업로드 되었음을 확인할 수 있다.

 

 

APPENDIX

 

Webflux 의 FilePart 기반으로 hdfs에 파일을 업로드 하는 방법까지는 확인헀는데, MultipartFile로 업로드하는 방법은 확인하지 않았다.

 

사실 로직 자체는 크게 다르지 않다. 그냥 MultipartFile의 inputstream을 추출하고 IOUtils에 넣기만 하면 된다.

 

아래 코드는 MultipartFile을 hdfs에 업로드하는 코드이다. hdfsConnection, hdfsConfig는 상단의 HadoopService.java의 @Autowired 부분을 참고하면 된다.

Path path = new Path(TARGET_HDFS_PATH + "/" + file.getName());

OutputStream outputStream = hdfsConnection.create(path);
IOUtils.copyBytes(file, outputStream, hdfsConfig);

 

728x90
반응형