본문 바로가기
IT 지식/빅데이터 & 분석

NIFI 설치 및 사용법

by 이민우 2023. 5. 21.
728x90
반응형

현재 재직중인 회사가 빅데이터를 주로 다루는 회사이다보니, 프로그램 구축이 끝나고난 후 해당 프로그램을 통해 모인 데이터를 데이터 마트에 옮겨주는 작업까지 해야할 일이 종종 생겼다.

 

지난 프로젝트에서는 위의 작업을 Hue UI 내에서 Oozie와 Sqoop, 그리고 Hive의 유틸인 Beeline과 직접 만든 java application을 이용해서 데이터 마트 적재 플로우를 구축했다. 하지만 이번 프로젝트에서는 Hue가 설치되어 있지 않아 위와 같은 방법을 사용할 수 없었다. 그래서 대신 NIFI를 이용해 데이터 마트를 구축하기로 결정했다.

 

 

nifi

nifi란 데이터 ETL 툴 중 하나로, 분산환경에서 대량의 데이터를 수집하기에 용이한 프로그램이다.

nifi는 실시간 처리에 적합하고, 웹 UI를 지원하기에 간편한 사용이 가능하다는 장점이 있다. 다만 배치작업의 효율이 떨어지고, 복잡한 연산이 불가능하다는 단점이 있다.

 

이번 프로젝트에서는 복잡한 연산은 굳이 필요하지 않고 단순하게 데이터를 퍼오기만 하면 되기에 nifi를 이용한 데이터 마트를 구축해도 좋을 것 같다는 결론이 내려졌다.

 

nifi 설치

 

우선 nifi를 사용하기 위해서는 설치를 해야 한다.

만약 간편하게 nifi를 설치하고 활용하고 싶다면 아래와 같이 도커 기반으로 설치하면 된다.

docker run \
--name nifi \
-p 8443:8443 \
-d \
-e NIFI_WEB_HTTPS_PORTS='8443' \
-e SINGLE_USER_CREDENTIALS_USERNAME=nifi \
-e SINGLE_USER_CREDENTIALS_PASSWORD=password1234 \
apache/nifi:latest

위 설정에서 포트 설정은 8443이 기본이다. 만약 다른 포트를 원한다면 설정 후 -p 옵션에서 함께 설정한다.

사용자의 패스워드는 12자 이상이어야 하며, 사용자를 설정하지 않을 경우 별도의 로그인 과정 없이 로그인이 가능하다.

 

도커가 아닌 일반적인 설치를 원한다면 다음과 같이 설치할 수 있다.

우선 다음 사이트에서 버전을 먼저 확인하고 진행한다.

# wget, java 설치
# 굳이 기재하진 않겠지만, JAVA_HOME까지 설정해줘야 원활하게 작동한다. (안해줘도 돌아가긴 함)
yum install -y wget java-1.8.0-devel

# 바이너리 설치파일 다운로드
wget --no-check-certificate https://dlcdn.apache.org/nifi/1.21.0/nifi-1.21.0-bin.zip

# 압축 해제
unzip nifi-1.21.0-bin.zip

 

여기서 현재 포스팅에서 PostgreSQL을 주로 사용할 예정이므로, PostgreSQL JDBC Driver를 추가로 설치해준다.

cd nifi-1.21.0/lib
wget https://repo1.maven.org/maven2/org/postgresql/postgresql/42.2.24/postgresql-42.2.24.jar

 

다음으로 nifi를 설정한다.

vim ./../conf/nifi.properties

# IP, Port 설정. 이 때 IP는 사용자가 직접 웹에 입력할 IP를 입력한다.
nifi.web.https.host=172.30.xx.xx
nifi.web.https.port=8443

 

설정이 완료되었다면 사용자를 생성한다. 비밀번호는 마찬가지로 12자 이상으로 설정한다.

만약 별도의 로그인이 필요하지 않다면 아래 과정은 생략해도 무방하다.

cd ./../bin
./nifi.sh set-single-user-credentials {id} {password}

 

마지막으로 NIFI의 메모리를 설정한다.

기본적으로 512 mb로 메모리가 설정되어 있는데, NIFI는 메모리를 많이 사용하기에 이 정도 메모리로는 Out of memory 에러가 발생할 확률이 높으므로 메모리를 올려준다.

vim ./../conf/bootstrap.conf

java.arg.2=-Xms2048m
java.arg.3=-Xms4096m

 

설정 완료되었다면 아래 명령어를 통해 실행한다.

cd ..
# 실행
./bin/nifi.sh start

# 종료
./bin/nifi.sh stop

 

nifi는 무거운 프로그램이기에 실행되는데 조금 긴 시간이 소요된다.

 

 

nifi RDB to RDB 플로우 만들기

 

우선 다음과 같은 테이블이 존재한다고 가정하자. 해당 테이블은 mart_db 테이블의 mart_scm 스키마 안에도 mart_tbl 이름으로 동일하게 생성되어있다.

test_db > test_scm > test_tbl 구조

 

https://{IP}:8443/nifi URL로 접속한 후 설정한 ID/PW로 로그인한다.

우선 RDB와 Mart에 접속하는 컨트롤러를 생성한다. 배경화면 우클릭 후 Configure를 선택한다.

 

아래와 같은 창이 나타나면 CONTROLLER SERVICES를 선택하고 우측 상단의 + 버튼을 누른다.

 

Controller Service 중 DBCPConnectionPool을 선택하고 다음과 같이 입력해준다.

 

이 작업으로 target_dbsource_db 두 개의 커넥션 풀을 생성하고 enable한다.

*스크린샷에서는 빼먹었는데, AvroReader도 만들어줘야 한다.

 

다음으로 이제 데이터 플로우를 생성한다.

크게 다음 로직을 따라 수행하도록 설정한다.

  1. Source DB에 타겟 테이블 존재여부 확인
  2. Target DB에 타겟 테이블 존재여부 확인
  3. Source DB로부터 데이터 추출
  4. Target DB에 추출한 데이터 UPSERT

 

먼저 테이블 존재 여부는 ExecuteSQL과 RouteOnAttribute를 사용해 구성한다.

추가로 에러, unmatch 내역은 로그 파일에 저장하도록 구성한다.

에러는 테스트 삼아 일부러 발생시켰다.

Name Type Properties
source db 테이블 확인 ExecuteSQL 1.21.0 Database Connection Pooling Service : source_db
SQL select query : SELECT * FROM information_Schema.tables WHERE table_name='test_tbl' AND table_schema='test_scm' AND table_catalog='test_db';
결과가 1인지 확인 RouteOnAttribute 1.21.0 Routing Strategy : Route to 'matched' if any matches
checkResult : ${executesql.row.count:equals(1)}
에러 로깅 PutFile 1.21.0 Directory : /tmp/rdb/${now():toNumber():minus(864000000):format('yyyy-MM-dd')}/
Conflict Resolution Strategy : replace
Create Missing Directories : true

*checkResult는 직접 만드는 것으로, 결과의 갯수가 1인지 확인한다. (테이블이 존재하지 않으면 0이기 때문)

*target db에 대한 내용은 위를 참고하여 생성하면 되므로 기재하지 않았다.

*에러는 위와 같이 설정할 경우 /tmp/rdb/날짜/ 폴더가 생성되며, 플로우 내용이 기재된다.

 

다음으로 데이터를 퍼오는 부분은 아래와 같이 설정한다.

Name Type Properties
데이터 퍼오기 ExecuteSQL 1.21.0 Database Connection Pooling Service : source_db
SQL select query : SELECT * FROM test_scm.test_tbl;
퍼온 데이터 저장 PutDatabaseRecord 1.21.0 Record Reader : AvroReader
Database Type : PostgreSQL
Statement Type : UPSERT
Database Connection Pooling Serivce : target_db
Schema Name : mart_scm
Table Name : mart_tbl

 

이대로 실행해도 되지만, 그러면 계속해서 데이터를 받아오므로 스케줄링을 설정하고 실행한다.

 

물론 Stopped 상태에서는 우클릭을 하면 "Run Once" 기능이 있는데, 이걸 누르면 이름 그대로 단 한 번만 실행된다.

다른 모든 프로세서는 Run 상태로 두고, 시작이 되는 프로세서만 Stopped 상태에서 Run Once 기능을 실행해주면 된다.

하지만 스케줄링을 거는 방법도 기재해놓기 위해 스케줄링을 기반으로 1분에 한 번 실행되도록 설정한다.

 

스케줄링은 시작이 되는 프로세서에만 걸어주면 되므로, "source db 테이블 확인" 프로세서에 걸어준다.

Cron 기반이며, nifi cron은 "초 분 시 일 월 요일" 이다.

1분에 한 번 실행

 

 

이제 전체 프로세서를 실행하면 매 분마다 데이터 플로우가 동작하며, 아래와 같이 타겟 DB에 데이터가 정상적으로 입력됨을 확인할 수 있다.

 

 

PostgreSQL의 Array 타입은 지원하지 않음!

PostgreSQL에는 Character Varying[] 혹은 Integer[] 같은 Array 형 변수가 존재할 수 있다.
다만 이 부분은 실험 결과 NIFI로 퍼올 때 테이블 구조를 bytes로 퍼오기 때문에, 퍼오는 것은 가능하나 다른 데이터베이스에 삽입할 때는 에러가 발생한다. (Object를 Array로 변환할 수 없다는 에러가 발생)

 

HIVE to RDB

 

Hadoop의 DB (Hive, HBase)의 경우 하는 방법은 알아냈으나, 현재 포스팅을 작성중인 집 컴퓨터에는 하둡이 설치되어 있지 않아 회사에서 연습했던 걸 기반으로 그냥 어떻게 하는 지 정도만 기재해놓을까 한다.

 

우선 Hive에 연결할 컨트롤러를 작성한다.

Name Type Properties
hive_db HiveConnectionPool Database Connection URL : jdbc:hive://{ip}:{port}/{db}
Database User : hadoop
Database Password : {password}

 

다음으로 데이터 플로우를 작성한다. 순서는 RDB와 마찬가지로 아래와 같이 지정한다. (위와 중복되는 부분은 - 로 기재한다.)

Name Type Properties
Hive 테이블 존재여부 확인 SelectHiveQL Hive Database Connection Pooling Service : hive_db
HiveQL Select Query : SHOW TABLES IN default LIKE '{table_name}'
Output Format : Avro
Normalize Table/Columns Names : false
결과가 1인지 확인 RouteOnAttribute Routing Strategy : Route to 'matched' if any matches
checkResult : ${selecthiveql.row.count:equals(1)}
RDB 테이블 존재여부 확인 - -
결과가 1인지 확인 - -
Hive 데이터 추출 SelectHiveQL Hive Database Connection Pooling Service : hive_db
HiveQL Select Query : SELECT * FROM '{table_name}'
Output Format : Avro
Normalize Table/Columns Names : false
추출한 데이터 저장 - -
에러 로깅 - -

 

 

HBase to RDB

 

마지막은 HBase의 데이터를 RDB에 저장하는 작업이다.

개인적으로 Hive/RDB to RDB는 중간에 처리하는 일 없이 그대로 삽입할 수 있었으나, HBase는 그대로 삽입을 할 수가 없어 이 부분이 조금 어려웠던 것 같다.

 

그래서 떠올린 방법은 ReplaceText로 HBase 양식대로 퍼와진 데이터를 RDB에 삽입할 수 있는 양식으로 변경하는 것이다.

 

마찬가지로 집 컴퓨터에 하둡이 설치되어 있지 않아 회사에서 한 연습한 내용만 기재한다.

 

컨트롤러는 아래와 같이 구성한다.

*JsonReader도 필요했던 것으로 기억한다. 굳이 아래에 추가하진 않았다.

Name Type Properties
hbase_db HBase_1_1_2_ClientService Zookeeper Quorum : {hostname}
Zookeeper Client Port : 2181
Zookeeper ZNode Parent : /hbase

 

데이터 플로우는 아래와 같이 구성했다.

참고로 테이블 존재 여부 확인은 HBase에서 어떻게 하는지 몰라 제외했다.

Name Type Properties
HBase 데이터 추출 GetHBase HBase Client Service : HBaseDB
Table Name : {table}
결과 Flatten FlattenJson -
텍스트 제거 ReplaceText Search Value : cells.
Replacement Value : "Set empty string" 선택
추출한 데이터 저장 - -
에러 로깅 - -

*결과를 flatten 하면 각 컬럼값이 cells.{column}이 되므로, 텍스트 제거 단계에서 "cells."를 제거했다.

*이 부분은 테이블을 어떻게 구성했느냐에 따라 다르므로, 텍스트 제거 프로세서를 여러 개 두고 데이터 타입에 맞게 작업하면 된다.

728x90
반응형