본문 바로가기
실습/리눅스 서버 + 스프링 부트

Spring Batch

by 이민우 2023. 7. 19.
728x90
반응형

조금 옛날 일이긴 한데, 데이터 코어라는 모듈을 개발하던 중 적게는 수 개에서 많게는 수만 개의 데이터를 일괄적으로 전송받아 처리하는 Batch 모듈을 개발한 경험이 있다.

 

사실 이 때 이름도 이름인지라 개발을 "Spring Batch"로 하려고 했던 적이 있다. Spring Batch에 대한 경험은 없었으나, 그냥 이름이 동일하고 대량의 데이터를 처리하기 적합한 기술이라는 것 정도는 알고 있었기 때문이다.

 

하지만 실제로는 Spring Batch로는 이를 구현할 수 없었다. 컨트롤러에서 받은 사용자의 입력을 스프링 배치로 선언한 job에 넣어주려고 보니, 도저히 넣을 수 있는 파라미터 설정 방법이 보이지 않았다. 스프링 배치는 애초에 컨트롤러에서 데이터를 받아 이를 처리하는 모듈이 아니라, 그냥 내부에서 정기적으로 실행되는 작업을 자동화하는 것이 불과했다. 즉 주기적인 로그 데이터 분석이나 통계 등에 사용되는 기술이지, 사용자의 입력을 토대로 비즈니스 로직을 돌릴 수 있는 툴이 아니었다.

 

 

Spring Batch

스프링 배치는 복잡하고 대량의 데이터를 처리하는 배치 어플리케이션 개발을 위한 프레임워크이다.

 

스프링 배치의 특징은 아래와 같다.

  • Job, Step, Tasklet 등 다양한 컴포넌트를 제공하고, 체계적인 구조로 배치 어플리케이션 생성이 가능하다.
  • Quartz 등 다른 스케줄링 라이브러리와 연동이 가능하다.
  • 배치 처리 중 발생하는 예외 상황에 대해 재시도, 건너뛰기 등 다양한 기능을 제공하기에 자동화가 가능하다.
  • 여러 스레드에서 병렬로 데이터를 처리하는 기능과 여러 step을 병렬로 수행하는 기능을 제공해 병렬 처리가 뛰어나다.

 

다음으로 장점은 아래와 같다.

  • 청크, 페이지네이션 등을 제공하기에 대용량 데이터 처리에 적합하다.
  • 쓰레드, 트랜잭션 등 자원을 효율적으로 관리한다.
  • 확장성과 유연성이 뛰어나다.

마지막으로 단점은 간단한 작업을 배치로 만들면 오히려 복잡성과 오버헤드가 증가될 수 있다.

 

 

추가로 Spring Batch에서 사용하는 용어들이 헷갈리는데, 아래와 같이 정리할 수 있을 것 같다.

  • Job: 배치 처리 과정을 포괄하는 객체로, 전체 배치 처리에 대한 단위 작업. Job 안에는 여러 개의 Step이 포함.
  • Step: Job을 구성하는 하나의 단위 작업으로, ItemReader, ItemProcessor, ItemWriter를 포함.
  • JobInstance: Job Parameter에 따라 구분되는 Job 실행의 논리적인 실행 단위. 같은 파라미터로 Job을 재실행하는 경우 같은 JobInstance를 사용.
  • JobExecution: JobInstance가 한 번 실행되는 것을 의미. JobInstance는 여러 번 실행될 수 있으므로, JobExecution은 여러 개가 될 수 있음.
  • JobParameter: 배치 실행 시점에 외부에서 받을 수 있는 파라미터. JobInstance를 구분하는 기준.
  • ItemReader: 데이터를 읽는 역할을 하는 인터페이스.
  • ItemProcessor: 읽은 데이터를 가공하는 역할을 하는 인터페이스.
  • ItemWriter: 가공한 데이터를 다시 쓰는 역할을 하는 인터페이스.

 

실습

이제 실습을 해볼까 한다.

 

바로 위에서 간단한 작업을 배치로 만들면 안된다고 말해놓고 간단한 예시를 들기 민망하지만,

아래와 같은 테이블이 존재한다고 가정한다.

딱 봐도 알 수 있겠지만, 하단의 access_log는 특정 API에 대한 접근자에 대한 히스토리 정보이고, access_statistics는 각 API에 대한 시간별 접근 통계이다.

 

이제부터 Spring Batch를 이용해 매 00분마다 access_log의 데이터를 읽어와 API 접근 통계에 통계 정보를 저장하려고 한다.

 

우선 임시 데이터를 넣어주었다.

CREATE TABLE IF NOT EXISTS access_log (
	access_time 	TIMESTAMP 	NOT NULL,
	access_user 	VARCHAR(50) 	NOT NULL,
	access_url	VARCHAR(50) 	NOT NULL,
	access_result	BOOLEAN 	NULL,
	PRIMARY KEY (access_time, access_user, access_url)
);

CREATE TABLE IF NOT EXISTS access_statistics (
	access_time 	VARCHAR(15)		NOT NULL,
	access_url	VARCHAR(50)		NOT NULL,
	access_success	INT			DEFAULT 0,
	access_fail	INT			DEFAULT 0,
	PRIMARY KEY (access_time, access_url)
);

DELETE FROM access_log;

INSERT INTO access_log VALUES 
	(DATE_SUB(NOW(), INTERVAL 1 MINUTE), 'a', '/test1', true),
	(DATE_SUB(NOW(), INTERVAL 2 MINUTE), 'b', '/test1', false),
	(DATE_SUB(NOW(), INTERVAL 3 MINUTE), 'c', '/test2', null),
	(DATE_SUB(NOW(), INTERVAL 4 MINUTE), 'd', '/test2', true),
	(DATE_SUB(NOW(), INTERVAL 5 MINUTE), 'a', '/test1', true),
	(DATE_SUB(NOW(), INTERVAL 6 MINUTE), 'b', '/test2', true),
	(DATE_SUB(NOW(), INTERVAL 7 MINUTE), 'c', '/test1', null),
	(DATE_SUB(NOW(), INTERVAL 8 MINUTE), 'd', '/test1', false),
	(DATE_SUB(NOW(), INTERVAL 9 MINUTE), 'a', '/test1', true),
	(DATE_SUB(NOW(), INTERVAL 8 MINUTE), 'b', '/test2', true),
	(DATE_SUB(NOW(), INTERVAL 7 MINUTE), 'c', '/test2', false),
	(DATE_SUB(NOW(), INTERVAL 6 MINUTE), 'd', '/test3', null),
	(DATE_SUB(NOW(), INTERVAL 5 MINUTE), 'd', '/test3', true),
	(DATE_SUB(NOW(), INTERVAL 4 MINUTE), 'a', '/test2', true),
	(DATE_SUB(NOW(), INTERVAL 3 MINUTE), 'c', '/test1', true),
	(DATE_SUB(NOW(), INTERVAL 2 MINUTE), 'b', '/test3', false),
	(DATE_SUB(NOW(), INTERVAL 1 MINUTE), 'a', '/test3', true),
	(DATE_SUB(NOW(), INTERVAL 2 MINUTE), 'c', '/test3', null),
	(DATE_SUB(NOW(), INTERVAL 3 MINUTE), 'f', '/test2', true),
	(DATE_SUB(NOW(), INTERVAL 4 MINUTE), 'd', '/test3', true),
	(DATE_SUB(NOW(), INTERVAL 5 MINUTE), 'b', '/test1', true),
	(DATE_SUB(NOW(), INTERVAL 6 MINUTE), 'q', '/test2', true),
	(DATE_SUB(NOW(), INTERVAL 7 MINUTE), 'd', '/test3', null),
	(DATE_SUB(NOW(), INTERVAL 8 MINUTE), 'z', '/test1', true),
	(DATE_SUB(NOW(), INTERVAL 9 MINUTE), 'k', '/test2', false),
	(DATE_SUB(NOW(), INTERVAL 8 MINUTE), 'l', '/test3', true),
	(DATE_SUB(NOW(), INTERVAL 7 MINUTE), 'a', '/test1', true),
	(DATE_SUB(NOW(), INTERVAL 6 MINUTE), 'l', '/test3', null),
	(DATE_SUB(NOW(), INTERVAL 5 MINUTE), 'k', '/test2', true),
	(DATE_SUB(NOW(), INTERVAL 4 MINUTE), 'j', '/test3', true),
	(DATE_SUB(NOW(), INTERVAL 3 MINUTE), 'i', '/test1', null),
	(DATE_SUB(NOW(), INTERVAL 2 MINUTE), 'q', '/test2', false),
	(DATE_SUB(NOW(), INTERVAL 1 MINUTE), 'p', '/test1', true);

이제 테스트용 코드 작성을 시작해보자.

우선 아래 dependencies를 이용해 스프링 부트 프로젝트를 생성했다.

 

JPA가 dependency에 포함된 것을 보면 알겠지만, Repository와 JPA를 이용해 만들 예정이다.

 

그러므로 우선 두 개의 테이블에 대한 Entity 클래스를 생성했다. 참고로 PK가 여러개이기에 pk 클래스도 함께 작성했다.

 

AccessLog.java

package com.mwlee.batchtest.domain;

import java.util.Date;

import javax.persistence.Entity;
import javax.persistence.Id;
import javax.persistence.IdClass;
import javax.persistence.Table;

import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
import lombok.ToString;

@Entity
@Getter
@Setter
@NoArgsConstructor
@AllArgsConstructor
@Table(name="access_log")
@ToString
@IdClass(AccessLogPk.class)
public class AccessLog {
	@Id
	private Date accessTime;
	@Id
	private String accessUser;
	@Id
	private String accessUrl;
	
	private Boolean accessResult;
}

AccessLogPk.java

package com.mwlee.batchtest.domain;

import java.io.Serializable;
import java.util.Date;

import javax.persistence.Id;

import lombok.Getter;
import lombok.Setter;
import lombok.ToString;

@Getter
@Setter
@ToString
public class AccessLogPk implements Serializable {
	@Id
	private Date accessTime;
	@Id
	private String accessUser;
	@Id
	private String accessUrl;
}

AccessStatistics.java

package com.mwlee.batchtest.domain;

import javax.persistence.Entity;
import javax.persistence.Id;
import javax.persistence.IdClass;
import javax.persistence.Table;

import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
import lombok.ToString;

@Entity
@Getter
@Setter
@NoArgsConstructor
@AllArgsConstructor
@Table(name="access_statistics")
@ToString
@IdClass(AccessStatisticsPk.class)
public class AccessStatistics {
	@Id
	private String accessTime;
	@Id
	private String accessUrl;
	
	private long accessSuccess;
	private long accessFail;
}

AccessStatisticsPk.java

package com.mwlee.batchtest.domain;

import java.io.Serializable;

import javax.persistence.Id;

import lombok.Getter;
import lombok.Setter;
import lombok.ToString;

@Getter
@Setter
@ToString
public class AccessStatisticsPk implements Serializable {
	@Id
	private String accessTime;
	@Id
	private String accessUrl;

}

 

그리고 각 엔티티 클래스에 대한 Repository를 작성한다.

AccessLogRepository.java

package com.mwlee.batchtest.repository;

import java.util.Date;
import java.util.List;
import java.util.Map;

import org.springframework.data.jpa.repository.Query;
import org.springframework.data.repository.CrudRepository;
import org.springframework.stereotype.Repository;

import com.mwlee.batchtest.domain.AccessLog;
import com.mwlee.batchtest.domain.AccessLogPk;

@Repository
public interface AccessLogRepository extends CrudRepository<AccessLog, AccessLogPk> {
	
	// 한 시간 이내 데이터 불러오기용 함수 : 따로 VO로 선언하기 귀찮아서 List<Map>으로 반환.
	@Query("	SELECT new map("
			+ "		function('date_format', a.accessTime, '%Y-%m-%d %H') as accessTime, "
			+ "		a.accessUrl as accessUrl, "
			+ "		SUM(CASE 	WHEN a.accessResult = true THEN 1 ELSE 0 END) as success, "
			+ "		SUM(CASE 	WHEN a.accessResult = false THEN 1 "
			+ "					WHEN a.accessResult = null THEN 1 "
			+ "					ELSE 0 END) as fail "
			+ "	) "
			+ "	FROM "
			+ "		AccessLog a "
			+ "	WHERE "
			+ "		a.accessTime BETWEEN :start AND :end "
			+ "	GROUP BY "
			+ "		accessTime, a.accessUrl")
    List<Map<String, Object>> aggregateAccessLog(Date start, Date end);
	
}

AccessStatisticsRepository.java

package com.mwlee.batchtest.repository;

import org.springframework.data.repository.CrudRepository;
import org.springframework.stereotype.Repository;

import com.mwlee.batchtest.domain.AccessStatistics;
import com.mwlee.batchtest.domain.AccessStatisticsPk;

@Repository
public interface AccessStatisticsRepository extends CrudRepository<AccessStatistics, AccessStatisticsPk> {

}

 

다음으로 이제 가장 중요한 배치를 만들자.

 

배치를 생성하기 전에 우선 해야할 작업이 있다. 배치 정보를 저장할 DB 테이블을 만드는 것이다.

배치는 메타 데이터를 저장하기 위해 몇 가지 테이블을 사용하는데, 이 테이블들은 배치 잡의 상태 추적에 사용되고, 만약 존재하지 않으면 배치에서 아래 에러가 발생한다.

Table 'testdb.BATCH_JOB_INSTANCE' doesn't exist

 

그렇다고 테이블을 직접 생성해줘야 하느냐? 그건 또 아니다. 그냥 application.properties에 아래 옵션만 추가해주면 된다.

spring.batch.jdbc.initialize-schema=always

 

만약 테이블을 추가하기 싫으면 embedded 옵션을 사용하면 된다.

 

설정 완료 후 BatchConfiguration을 생성해보자.

 

BatchConfiguration.java

package com.mwlee.batchtest.batch.config;

import java.util.Calendar;
import java.util.Date;
import java.util.Map;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.support.ListItemReader;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import com.mwlee.batchtest.domain.AccessStatistics;
import com.mwlee.batchtest.repository.AccessLogRepository;
import com.mwlee.batchtest.repository.AccessStatisticsRepository;

import lombok.extern.slf4j.Slf4j;

@Slf4j
@Configuration
@EnableBatchProcessing
public class BatchConfiguration {
	
	@Autowired private JobBuilderFactory jobBuilderFactory;
	@Autowired private StepBuilderFactory stepBuilderFactory;
	@Autowired private AccessLogRepository accessLogRepository;
	@Autowired private AccessStatisticsRepository accessStatisticsRepository;

	
	@Bean
	public ItemReader<Map<String, Object>> accessLogReader() {
		
		// 매 0분에 실행되므로 직전 시간의 0분0초~59분59초 까지
		Calendar calendar = Calendar.getInstance();
		calendar.add(Calendar.HOUR, -1); // 1시간 전 > 만약 00시일 경우 23시로 넘어가며 일도 바뀜
		calendar.set(Calendar.MINUTE, 0); // 0분
		calendar.set(Calendar.SECOND, 0); // 0초
		calendar.set(Calendar.MILLISECOND, 0); // 0밀리세컨드
		
		Date startDate = calendar.getTime();
		
		calendar.set(Calendar.MINUTE, 59);
		calendar.set(Calendar.SECOND, 59);
		calendar.set(Calendar.MILLISECOND, 999);
		
		Date endDate = calendar.getTime();
		
		// 00분 00초 000밀리초 ~ 59분 59초 999밀리초 데이터 반환
		return new ListItemReader<>(accessLogRepository.aggregateAccessLog(startDate, endDate));
	}
	
	@Bean
	public ItemProcessor<Map<String, Object>, AccessStatistics> aggregateLog() {
		return item -> {
			log.info("@@@ {}" + item.toString());
			
			AccessStatistics statistics = new AccessStatistics();

			statistics.setAccessTime(item.get("accessTime").toString());
			statistics.setAccessUrl(item.get("accessUrl").toString());
			statistics.setAccessSuccess((long) item.get("success"));
			statistics.setAccessFail((long) item.get("fail"));
			
			return statistics;
		};
	}
	
	@Bean
	public ItemWriter<AccessStatistics> statisticsWriter() {
		return items -> {
			accessStatisticsRepository.saveAll(items);
			log.info("{} saved!" + items);
		};
	}
	
	@Bean
	public Step aggregateLogStep(
			ItemReader<Map<String, Object>> accessLogReader, 
			ItemProcessor<Map<String, Object>, AccessStatistics> aggregateLog, 
			ItemWriter<AccessStatistics> statisticsWriter
			) {
		return stepBuilderFactory.get("aggregateLogStep")
				// 한 번에 수행할 작업 설정
				// 만약 예시대로 쿼리 입력 시 reader에서 3개의 데이터가 반환되는데, 
				// 만약 아래처럼 2로 해놓으면 2 개가 동시에 processor에서 실행되어 writer로 넘어가고,
				// 나머지 1개가 따로 실행됨.
				// reader, writer에 log.info를 찍어보면 알 수 있음.
				.<Map<String, Object>, AccessStatistics>chunk(2)
				// 로직 수행
				.reader(accessLogReader)
				.processor(aggregateLog)
				.writer(statisticsWriter)
				// 특정 에러 발생 시 3회 재수행
				.faultTolerant()
				.retry(Exception.class)
				.retryLimit(3)
				// 빌드
				.build();
	}
	
	@Bean
	public Job aggregateLogJob(Step aggregateLogStep) {
		return jobBuilderFactory.get("aggregateLogJob")
				.start(aggregateLogStep)
				.build();
	}
}

 

작성 완료 후 테스트를 수행해본다.

    @Autowired
    private JobLauncher jobLauncher;
    @Autowired
    private Job aggregateLogJob;

	@Test
	void contextLoads() {
		
        JobParameters param = new JobParametersBuilder()
                .addString("JobID", String.valueOf(System.currentTimeMillis()))
                .toJobParameters();

        try {
			jobLauncher.run(aggregateLogJob, param);
		} catch (JobExecutionAlreadyRunningException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		} catch (JobRestartException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		} catch (JobInstanceAlreadyCompleteException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		} catch (JobParametersInvalidException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
	}

 

이렇게 동작시키고 DB를 확인하면 배치가 정상 작동해서 데이터가 들어와 있음을 확인할 수 있다.

 

마지막으로 매 0분 0초에 해당 로직이 작동하도록 Test 코드를 Scheduler에 옮겨주기만 하면 자동 집계 기능이 완성된다.

 

SchedulingConfig.java

package com.mwlee.batchtest.batch.config;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.JobParametersInvalidException;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.core.repository.JobExecutionAlreadyRunningException;
import org.springframework.batch.core.repository.JobInstanceAlreadyCompleteException;
import org.springframework.batch.core.repository.JobRestartException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

@EnableScheduling
@Component
public class SchedulingConfig {
	

    @Autowired private JobLauncher jobLauncher;
    @Autowired private Job aggregateLogJob;

	@Scheduled(cron = "0 0 * * * ?") // 매 0분 0초
	public void aggregateLog() {

        JobParameters param = new JobParametersBuilder()
                .addString("JobID", String.valueOf(System.currentTimeMillis()))
                .toJobParameters();

        try {
			jobLauncher.run(aggregateLogJob, param);
		} catch (JobExecutionAlreadyRunningException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		} catch (JobRestartException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		} catch (JobInstanceAlreadyCompleteException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		} catch (JobParametersInvalidException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
	}
	
}

 

728x90
반응형

'실습 > 리눅스 서버 + 스프링 부트' 카테고리의 다른 글

동시성 제어  (0) 2023.10.16
[Apache Server] 리버스 프록시_CentOS, Ubuntu  (1) 2023.09.28
OSIV  (0) 2023.07.14
[SPRING JPA] N+1 문제  (0) 2023.07.04
Spring Boot + Mybatis  (0) 2023.07.02