본문 바로가기
실습/리눅스 서버 + 스프링 부트

[Webflux] Hadoop HTTPFS에 파일 업로드

by 이민우 2024. 3. 30.
728x90
반응형

이번에 또다시 혼자서 개발을 전담하며, Hadoop의 에코 시스템과 통신해서 데이터를 업로드/다운로드 하는 프로그램의 개발을 맡게 되었다. 그 과정에서 여러 가지 새로운 것들을 알아냈고, 언제 다시 사용할 지 모르니 복기 차원에서 블로그 포스팅을 해볼까 한다.

 

우선 첫 번째는 HTTPFS에 파일을 업로드 하는 것이다. 프로젝트는 아래와 같이 설정되어 있다.

  • Spring Boot 3.2.4
  • Java 17

 

HTTPFS

HTTPFS는 HDFS과 같은 대용량 데이터 저장 시스템에 HTTP(S) 인터페이스를 제공하는 파일 시스템이다. 쉽게 말하자면, HTTP API를 통해 HDFS에 파일을 업로드/다운로드/조회 할 수 있도록 지원하는 API라고 생각하면 쉽다.

 

왠만해서는 하둡 설치 시 함께 동봉되어 설치되나, start-all.sh로는 켜지지 않고, 별도의 쉘 스크립트 파일을 이용해서 켜야한다.

쉘 스크립트 파일은 start-all.sh가 위치한 sbin 폴더 내에 존재한다.

./httpfs.sh start

 

실행 시 14000 포트를 통해 httpfs가 실행된다.

 

 

HTTPFS API

HTTPFS 에서 제공하는 API는 여러개가 있지만, 이번 프로젝트에서 사용한 몇 가지만 알아보고자 한다.

  • LISTSTATUS
    • 폴더 내 파일 목록 조회
    • GET 사용
    • 파일에 사용 시 해당 파일에 대한 정보가 조회됨
  • GETFILESTATUS
    • 파일 상태 조회
    • GET 사용
  • MKDIRS
    • 폴더 생성
    • PUT 사용
  • CREATE
    • 파일 생성 및 업로드
    • PUT 사용
    • overwrite={true|false} 쿼리 파라미터로 중복 방지 가능
    • permisstion={perm}으로 권한 생성 가능
    • 업로드 시 content-type은 octstream이어야 함.
  • DELETE
    • 폴더 혹은 파일 삭제
    • DELETE 사용
    • recursive={true|false} 사용으로 폴더 삭재 시 재귀적 삭제 여부 설정

 

위 목록은 쿼리 파라미터의 op에 들어가며, 사용자는 user.name 쿼리 파라미터에 삽입된다.

# 예시 : hadoop 계정으로 /test/test_folder_1 폴더 내 파일 목록 조회
[GET] http://{ip}:14000/webhdfs/v1/test/test_folder_1?op=LISTSTATUS&user.name=hadoop

 

 

자바 코드와 연동

우선 HTTP를 통해 통신하기 때문에 별도의 디펜던시 추가는 불필요하다.

다만 Webflux를 기반으로 개발해서 이를 기반으로 설명할 것이기에 Spring Reactive Web 정도만 추가한다.

 

 

코드 생성

참고로 모든 API에 대한 연동을 작성할 건 아니다. 그저 Controller를 통해 사용자에게 파일을 수신받아, HTTPFS에 파일을 업로드 하는 정도의 간단한 코드만을 작성할까 한다.

 

해당 메소드의 로직은 아래와 같이 수행한다.

  1. 프로그램 실행 시 기본 파일 저장 폴더가 존재하는지 확인하고, 존재하지 않을 경우 폴더를 생성한다.
  2. 사용자가 파일을 업로드한다.
  3. 기본 파일 저장 폴더 내 동명의 파일이 존재하는지 여부를 체크한다.
  4. 존재하지 않으면 파일을 업로드한다.

참고로 밑줄을 쳐놓은 이유는 1, 3번 로직은 필요하지 않은 로직이기 때문이다.

HTTPFS의 경우 특정 폴더 내 파일을 저장하려고 하면 해당 디렉터리가 없을 경우 자동으로 생성해주고, 동명의 파일 체크는 CREATE API의 overwrite 쿼리 파라미터를 false로 두면 자동으로 방지되기 때문이다.

 

하지만 위에서 언급한 API를 최대한 사용해보고자 한 번 코드를 작성해보았다. 참고로 동명의 파일 존재 여부는 GETFILESTATUS가 아니라 LISTSTATUS API를 사용했다.

 

필요한 코드를 생성한다. DB 작업이 없기에 Service와 Controller, 그리고 두 클래스가 통신하기 위한 수단을 기재한 CommonCode, 사용자에게 전달할 Response인 CommonResponse, 그리고 CommonResponse에 들어갈 간단한 메시지를 저장하는 CommonMessage 정도만 추가로 작성했다.

 

샘플이므로 application.properties 혹은 yml 설정은 제외하고, 필요한 설정은 코드 내에서 설정했다.

 

우선 CommonCode 클래스는 아래와 같이 간단하게 작성한다. 이제 아래 시그널을 이용해서 Service 클래스가 Controller 클래스에 연산 결과를 공지할 것이다.

 

CommonCode.java

package com.mwlee.test.common;

public class CommonCode {

	public final static Integer SUCCESS=1;
	public final static Integer FAIL=2;
	public final static Integer ERR_INTERNAL=3;
	public final static Integer NOT_EXISTS=4;
	public final static Integer EXISTS=5;
}

 

CommonResponse는 StatusCode와 사용자에게 전달될 메시지를 저장하여 전달한다.

 

CommonResponse.java

package com.mwlee.test.common;

import java.io.Serializable;

import org.springframework.http.HttpStatus;

import lombok.Data;
import lombok.Getter;
import lombok.Setter;

@Data
@Getter
@Setter
public class CommonResponse implements Serializable {
	
	private static final long serialVersionUID = 6627561697013941046L;

	private HttpStatus status;
	private String message;
	
}

 

CommonMessage는 CommonResponse의 message에 들어갈 데이터로, 현재는 간단하게 3개의 상황만 가정한다.

 

CommonMessage.java

package com.mwlee.test.common;

public class CommonMessage {

	public static final String RESPONSE_MESSAGE_FAILED = "업로드에 실패했습니다.";
	public static final String RESPONSE_MESSAGE_UPLOADED = "업로드되었습니다.";
	public static final String ALREADY_EXISTS = "동명의 파일이 존재합니다.";

}

 

다음으로 Service 클래스를 작성해보았다.

별도의 설명은 생략하며 필요 내용은 코드 내에 주석으로 표시해두었다.

 

HttpfsService.java

package com.mwlee.test.service;

import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.file.Files;

import jakarta.annotation.PostConstruct;

import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DefaultDataBufferFactory;
import org.springframework.http.MediaType;
import org.springframework.http.client.reactive.ClientHttpConnector;
import org.springframework.http.client.reactive.ReactorClientHttpConnector;
import org.springframework.http.codec.multipart.FilePart;
import org.springframework.stereotype.Service;
import org.springframework.web.reactive.function.BodyInserters;
import org.springframework.web.reactive.function.client.WebClient;
import org.springframework.web.reactive.function.client.WebClientResponseException;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonMappingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.mwlee.test.common.CommonCode;

import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Mono;

@Slf4j
@Service
public class HttpfsService {

	// httpfs 주소
	private String HTTPFS_URL = "http://IP:14000/webhdfs/v1";
	// hadoop에서 사용하는 사용자
	private String HTTPFS_USER = "hadoop";
	// 파일을 저장할 기본 디렉터리
	public String HTTPFS_DIR = "/BLOG_UPLOAD";
	// 중복저장 여부
	private String OVERWRITE_YN = "N";
	
	// HTTPFS와 통신을 위한 객체
	private WebClient webClient;
	private ObjectMapper mapper; 
	
	public HttpfsService() {
		// webclient 생성
		ClientHttpConnector connector = new ReactorClientHttpConnector();
        this.webClient = WebClient.builder().clientConnector(connector).build();

        // objectMapper 생성
        mapper = new ObjectMapper();
	}
	
	// HTTPFS 연산자들
	// [GET] http://{ip}:{port}/webhdfs/v1/{path}?op=LISTATATUS&user.name=hadoop
	private String SHOW_LIST="LISTSTATUS"; // 목록 조회 (GET)
	// [PUT] http://{ip}:{port}/webhdfs/v1/{path}?op=MKDIRS&user.name=hadoop
	private String CREATE_FOLDER="MKDIRS"; // 폴더 생성 (PUT)
	// [PUT] http://{ip}:{port}/webhdfs/v1/{path}?op=CREATE&user.name=hadoop&overwrite=<OVERWRITE>&permission=<PERMISSION>
	private String UPLOAD_FILE="CREATE"; // 파일 생성 및 업로드, OVERWRITE은 true or false

	/**
	 * HTTPFS에 연동해 파일/폴더가 존재하는지 여부 확인
	 * [GET] API_URL?op=LISTSTATUS&user.name=hadoop
	 * 
	 * 존재할 경우 : 
	 * { "FileStatuses": { "FileStatus": [ { ... } ] } }
	 * 
	 * 존재하지 않을 경우 :
	 * { "RemoteException": { "message": "File {FILE_NAME} does not exist.", "exception": "FileNotFoundException", "javaClassName": "java.io.FileNotFoundException" } }
	 * 
	 * @param path
	 * @return
	 * @throws Exception
	 */
	public Mono<Integer> isExistsOnHdfs(String path) {
		
		return webClient
				// GET API 생성
				.get()
				// URI 입력 http://{IP}:{PORT}/webhdfs/v1/{path}?op=LISTSTATUS&user.name=hadoop
				.uri(String.format("%s%s?op=%s&user.name=%s", HTTPFS_URL, path, SHOW_LIST, HTTPFS_USER))
				// API 전송
				.retrieve()
				// Body를 가져옴
				.bodyToMono(String.class)
				// Response 파싱
				.map(jsonStr -> {
					log.info("@@ Get Response From HTTPFS : {}", jsonStr);
					try {
			            // 결과를 JSON으로 변경
						JsonNode rootNode = mapper.readTree(jsonStr);

			            // 존재 여부 확인
			            if (rootNode.path("RemoteException").path("exception").asText().equals("FileNotFoundException")) {
			                // 파일이 없음.
							log.info("@@ Folder or File {} Not Exists.", path);
							return CommonCode.NOT_EXISTS;
			            }
			            else {
			                // 파일이 있음.
							log.info("@@ Folder or File {} Already Exists.", path);
							return CommonCode.EXISTS;
			            }
			            
					} catch (JsonMappingException e) {
						log.error("@@ JsonMappingException Occured While Check Folder or File Exists : {}", e.getMessage());
						return CommonCode.ERR_INTERNAL;
					} catch (JsonProcessingException e) {
						log.error("@@ JsonProcessingException Occured While Check Folder or File Exists : {}", e.getMessage());
						return CommonCode.ERR_INTERNAL;
					}
		            
				})
				.onErrorResume(WebClientResponseException.class, e -> {
					log.warn("@@ WebClientResponseException Occured While Check Folder or File Exists : {}", e.getMessage());
					if(e.getMessage().contains("404 Not Found")) {
						// 404일 경우 즉 파일이 없을 경우
						return Mono.just(CommonCode.NOT_EXISTS);
					}
					return Mono.just(CommonCode.ERR_INTERNAL);
				})
				.onErrorResume(IOException.class, e -> {
					log.error("@@ IOException Occured While Check Folder or File Exists : {}", e.getMessage());
					return Mono.just(CommonCode.ERR_INTERNAL);
				})
				.onErrorResume(Exception.class, e -> {
					log.error("@@ Exception Occured While Check Folder or File Exists : {}", e.getMessage());
					return Mono.just(CommonCode.ERR_INTERNAL);
				});
	}
	
	/**
	 * HTTPFS에 연동해 폴더 생성
	 * [PUT] API_URL?op=MKDIRS&user.name=hadoop
	 * 
	 * @param path
	 */
	public Mono<Integer> createDir(String path) {
		
        return isExistsOnHdfs(path)
                .flatMap(exists -> {
                    if (exists.equals(CommonCode.NOT_EXISTS)) {
                    	// 존재하지 않을 경우
                        String createUri = String.format("%s%s?op=%s&user.name=%s", HTTPFS_URL, path, CREATE_FOLDER, HTTPFS_USER);
                        return webClient.put()
                                .uri(createUri)
                                .retrieve()
                                .bodyToMono(String.class)
                                .map(response -> {
                                    log.info("@@ Folder Created: {}", response);
                                    return CommonCode.SUCCESS;
                                })
                				.onErrorResume(WebClientResponseException.class, e -> {
                					// 404일 경우 여기로 넘어옴.
                					log.error("@@ WebClientResponseException Occured While Make Folder : {}", e.getMessage());
                					return Mono.just(CommonCode.ERR_INTERNAL);
                				})
                				.onErrorResume(IOException.class, e -> {
                					log.error("@@ IOException Occured While Make Folder : {}", e.getMessage());
                					return Mono.just(CommonCode.ERR_INTERNAL);
                				})
                				.onErrorResume(Exception.class, e -> {
                					log.error("@@ Exception Occured While Make Folder : {}", e.getMessage());
                					return Mono.just(CommonCode.ERR_INTERNAL);
                				});
                    } 
                    else if (CommonCode.EXISTS.equals(CommonCode.EXISTS)) {
                        log.info("@@ Folder Already Exists: {}", path);
                        return Mono.just(CommonCode.SUCCESS);
                    } 
                    else {
                        return Mono.just(CommonCode.FAIL);
                    }
                });
	}
	
	/**
	 * 파일 저장을 위한 폴더 생성
	 */
	@PostConstruct
	public void createBasicDir() {
		
		Integer result = createDir(HTTPFS_DIR).block();
		
		if(result != CommonCode.SUCCESS) {
			log.error("@@ Can't Create Basic Folder On Hdfs");
			System.exit(1);
		}
		
	}
	
	/**
	 * 파일 업로드
	 * 
	 * @param file
	 * @return
	 */
	public Mono<Integer> uploadFile(File file) {
		
		StringBuffer path = new StringBuffer();
		path.append(HTTPFS_DIR);
		path.append("/");
		path.append(file.getName());
		
		return 
				// 동명의 파일이 존재하는지 확인
				isExistsOnHdfs(path.toString())
				.flatMap(result -> {
					if(result == CommonCode.EXISTS || result == CommonCode.ERR_INTERNAL) {
						// 파일이 존재하거나 인터널 에러가 발생하면 그냥 RETURN
						return Mono.just(result);
					}
					else {
						// 파일이 존재하지 않으면 업로드 수행
						// 쿼리 파라미터가 좀 많으므로 String.format 대신 스트링버퍼 이용.
						// HTTP URL 구성
						StringBuffer uploadUri = new StringBuffer();
						uploadUri.append(HTTPFS_URL).append(path.toString());
						uploadUri.append("?op=").append(UPLOAD_FILE);
						uploadUri.append("&user.name=").append(HTTPFS_USER);
						uploadUri.append("&data=true");
						uploadUri.append("&overwrite=");
						if("Y".equals(OVERWRITE_YN)) {
							// 중복 허용
							uploadUri.append("true");
						}
						else {
							uploadUri.append("false");
						}

						// 파일 내용을 byte[]로 변경
				        byte[] fileData = null;
				        try {
				            fileData = Files.readAllBytes(file.toPath());
				        } catch (IOException e) {
				            e.printStackTrace();
				        }
				        // ByteBuffer를 DataBuffer로 변환
				        ByteBuffer byteBuffer = ByteBuffer.wrap(fileData);
				        DefaultDataBufferFactory bufferFactory = new DefaultDataBufferFactory();
				        DataBuffer dataBuffer = bufferFactory.wrap(byteBuffer);
				        
				        // 업로드 수행
				        return webClient
				    			.put()
				    			.uri(uploadUri.toString())
				    			.contentType(MediaType.APPLICATION_OCTET_STREAM)
				    			.body(BodyInserters.fromDataBuffers(Mono.just(dataBuffer)))
				    			// 만약 file의 타입이 File이 아니라 FilePart일 경우 아래와 같이 만들면 됨.
				    			//.body(BodyInserters.fromDataBuffers(file.content()))
				    			.exchange()
				    			.flatMap(apiResult -> {
				    				
				    				// 끝났으면 임시파일 삭제
				    				file.delete();
				    				
				    				if(apiResult.statusCode().is2xxSuccessful()) {
				    					// 업로드 성공을 의미
				    					log.info("@@ File Uploaded On Hdfs : {}", file.getName());
				    					return Mono.just(CommonCode.SUCCESS);
				    				}
				    				
				    				return apiResult.bodyToMono(String.class)
				    						.flatMap(responseBody -> {
				    							// 에러 메시지가 반환되었으면 출력
						    					log.error("@@ Can't Upload File On Hdfs : {} ", responseBody);
						    					return Mono.just(CommonCode.FAIL);
				    						})
				    						// 에러 메시지도 없으면 로그 출력 없이 반환
				    						.switchIfEmpty(Mono.just(CommonCode.FAIL));
				    			});
					}
				})
				.onErrorResume(Exception.class, e -> {
    				// 끝났으면 임시파일 삭제
    				file.delete();
					log.error("@@ Exception Occured While Upload File : {}", e.getMessage());
					return Mono.just(CommonCode.ERR_INTERNAL);
				});
	}
	
	// 사용자에게 받은 FilePart를 File로 임시 저장
	// 굳이 저장하는 이유는 일정 용량 이상의 파일 업로드 시 스프링이 해당 파일을 가져오지 못해서
	private String TMP_DIR="C:\\files\\";
	public Mono<Integer> uploadFile(FilePart file) {
		// 임시파일 생성
		File tmpFile = new File(TMP_DIR + file.filename());
		
		return file
			// 임시파일에 업로드된 파일 저장
			.transferTo(tmpFile)
			// 파일 업로드 수행
			.then(uploadFile(tmpFile));
	}
}

 

마지막으로 HttpfsController는 간단하게 작성한다.

Router보다 @PostMapping 같은 게 편해서 아래와 같이 작성했다.

 

HttpfsController.java

package com.mwlee.test.controller;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.HttpStatus;
import org.springframework.http.codec.multipart.FilePart;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestPart;
import org.springframework.web.bind.annotation.RestController;

import com.mwlee.test.common.CommonCode;
import com.mwlee.test.common.CommonMessage;
import com.mwlee.test.common.CommonResponse;
import com.mwlee.test.service.HttpfsService;

import reactor.core.publisher.Mono;

@RestController
public class HttpfsController {
	
	@Autowired HttpfsService service;
	
	/**
	 * 파일 업로드
	 * @param file
	 * @return
	 */
	@PostMapping("/upload")
	public Mono<CommonResponse> upload(@RequestPart("file") Mono<FilePart> file) {

		CommonResponse errorResponse = new CommonResponse();
		
		errorResponse.setStatus(HttpStatus.INTERNAL_SERVER_ERROR);
		errorResponse.setMessage(CommonMessage.RESPONSE_MESSAGE_FAILED);
		
		return file
				.flatMap(singleFile -> {
					return service
							.uploadFile(singleFile)
							.flatMap(result -> {
								// 서비스에서 반환한 result에 따라 응답 분기
								CommonResponse response = new CommonResponse();
								
								if(result == CommonCode.SUCCESS) {
									response.setStatus(HttpStatus.CREATED);
									response.setMessage(CommonMessage.RESPONSE_MESSAGE_UPLOADED);
								}
								else if(result == CommonCode.ERR_INTERNAL) {
									response.setStatus(HttpStatus.INTERNAL_SERVER_ERROR);
									response.setMessage(CommonMessage.RESPONSE_MESSAGE_FAILED);
								}
								else if(result == CommonCode.EXISTS) {
									response.setStatus(HttpStatus.BAD_REQUEST);
									response.setMessage(CommonMessage.ALREADY_EXISTS);
								}
								
								return Mono.just(response);
								
							}).onErrorResume(e -> {
								e.printStackTrace();
								return Mono.just(errorResponse);
							}).switchIfEmpty(Mono.just(errorResponse));
					});
	}
}

 

참고로 Webflux이므로 MultipartFile이 아니라 FilePart를 사용했다.

# FilePart

멀티파트 요청에서 단일 파일을 나타내며, 리액티브 스트림 프로그래밍에 적합한 인터페이스를 제공함.

 

또한 굳이 FilePart로 업로드된 파일을 별도로 저장한 후 다시 업로드하는 행위는 파일의 용량이 일정 수치 이상보다 높아지면 임시 파일에서 꺼내오지를 못해 저장 후 가져오도록 만들었다.

 

 

테스트

마지막으로 잘 동작하는지 테스트를 수행해보자.

 

프로그램을 실행시키고 Hue를 통해 hdfs를 확인해보면 아래 사진과 같이 BLOG_UPLOAD 폴더가 자동으로 생성되었음을 확인할 수 있다.

 

 

이제 아래와 같은 텍스트 파일을 생성했다.

 

이제 Postman을 통해 생성한 api에 데이터를 보내보자

Content-Type은 multipart/form-data로 두고, Body를 form-data로 두어 아래와 같이 데이터를 전송한다.

 

데이터를 전송해보면 아래와 같은 response가 출력되며 정상적으로 동작했음을 확인할 수 있다.

 

그리고 hdfs에도 정상적으로 파일이 업로드되었다.

 

그러면 다시 한 번 Send를 눌러보자. 눌렀을 때 이미 존재하는 파일이라는 response가 출력되면 정상적으로 개발이 완료된 것이다.

728x90
반응형