조금 옛날 일이긴 한데, 데이터 코어라는 모듈을 개발하던 중 적게는 수 개에서 많게는 수만 개의 데이터를 일괄적으로 전송받아 처리하는 Batch 모듈을 개발한 경험이 있다.
사실 이 때 이름도 이름인지라 개발을 "Spring Batch"로 하려고 했던 적이 있다. Spring Batch에 대한 경험은 없었으나, 그냥 이름이 동일하고 대량의 데이터를 처리하기 적합한 기술이라는 것 정도는 알고 있었기 때문이다.
하지만 실제로는 Spring Batch로는 이를 구현할 수 없었다. 컨트롤러에서 받은 사용자의 입력을 스프링 배치로 선언한 job에 넣어주려고 보니, 도저히 넣을 수 있는 파라미터 설정 방법이 보이지 않았다. 스프링 배치는 애초에 컨트롤러에서 데이터를 받아 이를 처리하는 모듈이 아니라, 그냥 내부에서 정기적으로 실행되는 작업을 자동화하는 것이 불과했다. 즉 주기적인 로그 데이터 분석이나 통계 등에 사용되는 기술이지, 사용자의 입력을 토대로 비즈니스 로직을 돌릴 수 있는 툴이 아니었다.
Spring Batch
스프링 배치는 복잡하고 대량의 데이터를 처리하는 배치 어플리케이션 개발을 위한 프레임워크이다.
스프링 배치의 특징은 아래와 같다.
- Job, Step, Tasklet 등 다양한 컴포넌트를 제공하고, 체계적인 구조로 배치 어플리케이션 생성이 가능하다.
- Quartz 등 다른 스케줄링 라이브러리와 연동이 가능하다.
- 배치 처리 중 발생하는 예외 상황에 대해 재시도, 건너뛰기 등 다양한 기능을 제공하기에 자동화가 가능하다.
- 여러 스레드에서 병렬로 데이터를 처리하는 기능과 여러 step을 병렬로 수행하는 기능을 제공해 병렬 처리가 뛰어나다.
다음으로 장점은 아래와 같다.
- 청크, 페이지네이션 등을 제공하기에 대용량 데이터 처리에 적합하다.
- 쓰레드, 트랜잭션 등 자원을 효율적으로 관리한다.
- 확장성과 유연성이 뛰어나다.
마지막으로 단점은 간단한 작업을 배치로 만들면 오히려 복잡성과 오버헤드가 증가될 수 있다.
추가로 Spring Batch에서 사용하는 용어들이 헷갈리는데, 아래와 같이 정리할 수 있을 것 같다.
- Job: 배치 처리 과정을 포괄하는 객체로, 전체 배치 처리에 대한 단위 작업. Job 안에는 여러 개의 Step이 포함.
- Step: Job을 구성하는 하나의 단위 작업으로, ItemReader, ItemProcessor, ItemWriter를 포함.
- JobInstance: Job Parameter에 따라 구분되는 Job 실행의 논리적인 실행 단위. 같은 파라미터로 Job을 재실행하는 경우 같은 JobInstance를 사용.
- JobExecution: JobInstance가 한 번 실행되는 것을 의미. JobInstance는 여러 번 실행될 수 있으므로, JobExecution은 여러 개가 될 수 있음.
- JobParameter: 배치 실행 시점에 외부에서 받을 수 있는 파라미터. JobInstance를 구분하는 기준.
- ItemReader: 데이터를 읽는 역할을 하는 인터페이스.
- ItemProcessor: 읽은 데이터를 가공하는 역할을 하는 인터페이스.
- ItemWriter: 가공한 데이터를 다시 쓰는 역할을 하는 인터페이스.
실습
이제 실습을 해볼까 한다.
바로 위에서 간단한 작업을 배치로 만들면 안된다고 말해놓고 간단한 예시를 들기 민망하지만,
아래와 같은 테이블이 존재한다고 가정한다.
딱 봐도 알 수 있겠지만, 하단의 access_log는 특정 API에 대한 접근자에 대한 히스토리 정보이고, access_statistics는 각 API에 대한 시간별 접근 통계이다.
이제부터 Spring Batch를 이용해 매 00분마다 access_log의 데이터를 읽어와 API 접근 통계에 통계 정보를 저장하려고 한다.
우선 임시 데이터를 넣어주었다.
CREATE TABLE IF NOT EXISTS access_log (
access_time TIMESTAMP NOT NULL,
access_user VARCHAR(50) NOT NULL,
access_url VARCHAR(50) NOT NULL,
access_result BOOLEAN NULL,
PRIMARY KEY (access_time, access_user, access_url)
);
CREATE TABLE IF NOT EXISTS access_statistics (
access_time VARCHAR(15) NOT NULL,
access_url VARCHAR(50) NOT NULL,
access_success INT DEFAULT 0,
access_fail INT DEFAULT 0,
PRIMARY KEY (access_time, access_url)
);
DELETE FROM access_log;
INSERT INTO access_log VALUES
(DATE_SUB(NOW(), INTERVAL 1 MINUTE), 'a', '/test1', true),
(DATE_SUB(NOW(), INTERVAL 2 MINUTE), 'b', '/test1', false),
(DATE_SUB(NOW(), INTERVAL 3 MINUTE), 'c', '/test2', null),
(DATE_SUB(NOW(), INTERVAL 4 MINUTE), 'd', '/test2', true),
(DATE_SUB(NOW(), INTERVAL 5 MINUTE), 'a', '/test1', true),
(DATE_SUB(NOW(), INTERVAL 6 MINUTE), 'b', '/test2', true),
(DATE_SUB(NOW(), INTERVAL 7 MINUTE), 'c', '/test1', null),
(DATE_SUB(NOW(), INTERVAL 8 MINUTE), 'd', '/test1', false),
(DATE_SUB(NOW(), INTERVAL 9 MINUTE), 'a', '/test1', true),
(DATE_SUB(NOW(), INTERVAL 8 MINUTE), 'b', '/test2', true),
(DATE_SUB(NOW(), INTERVAL 7 MINUTE), 'c', '/test2', false),
(DATE_SUB(NOW(), INTERVAL 6 MINUTE), 'd', '/test3', null),
(DATE_SUB(NOW(), INTERVAL 5 MINUTE), 'd', '/test3', true),
(DATE_SUB(NOW(), INTERVAL 4 MINUTE), 'a', '/test2', true),
(DATE_SUB(NOW(), INTERVAL 3 MINUTE), 'c', '/test1', true),
(DATE_SUB(NOW(), INTERVAL 2 MINUTE), 'b', '/test3', false),
(DATE_SUB(NOW(), INTERVAL 1 MINUTE), 'a', '/test3', true),
(DATE_SUB(NOW(), INTERVAL 2 MINUTE), 'c', '/test3', null),
(DATE_SUB(NOW(), INTERVAL 3 MINUTE), 'f', '/test2', true),
(DATE_SUB(NOW(), INTERVAL 4 MINUTE), 'd', '/test3', true),
(DATE_SUB(NOW(), INTERVAL 5 MINUTE), 'b', '/test1', true),
(DATE_SUB(NOW(), INTERVAL 6 MINUTE), 'q', '/test2', true),
(DATE_SUB(NOW(), INTERVAL 7 MINUTE), 'd', '/test3', null),
(DATE_SUB(NOW(), INTERVAL 8 MINUTE), 'z', '/test1', true),
(DATE_SUB(NOW(), INTERVAL 9 MINUTE), 'k', '/test2', false),
(DATE_SUB(NOW(), INTERVAL 8 MINUTE), 'l', '/test3', true),
(DATE_SUB(NOW(), INTERVAL 7 MINUTE), 'a', '/test1', true),
(DATE_SUB(NOW(), INTERVAL 6 MINUTE), 'l', '/test3', null),
(DATE_SUB(NOW(), INTERVAL 5 MINUTE), 'k', '/test2', true),
(DATE_SUB(NOW(), INTERVAL 4 MINUTE), 'j', '/test3', true),
(DATE_SUB(NOW(), INTERVAL 3 MINUTE), 'i', '/test1', null),
(DATE_SUB(NOW(), INTERVAL 2 MINUTE), 'q', '/test2', false),
(DATE_SUB(NOW(), INTERVAL 1 MINUTE), 'p', '/test1', true);
이제 테스트용 코드 작성을 시작해보자.
우선 아래 dependencies를 이용해 스프링 부트 프로젝트를 생성했다.
JPA가 dependency에 포함된 것을 보면 알겠지만, Repository와 JPA를 이용해 만들 예정이다.
그러므로 우선 두 개의 테이블에 대한 Entity 클래스를 생성했다. 참고로 PK가 여러개이기에 pk 클래스도 함께 작성했다.
AccessLog.java
package com.mwlee.batchtest.domain;
import java.util.Date;
import javax.persistence.Entity;
import javax.persistence.Id;
import javax.persistence.IdClass;
import javax.persistence.Table;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
import lombok.ToString;
@Entity
@Getter
@Setter
@NoArgsConstructor
@AllArgsConstructor
@Table(name="access_log")
@ToString
@IdClass(AccessLogPk.class)
public class AccessLog {
@Id
private Date accessTime;
@Id
private String accessUser;
@Id
private String accessUrl;
private Boolean accessResult;
}
AccessLogPk.java
package com.mwlee.batchtest.domain;
import java.io.Serializable;
import java.util.Date;
import javax.persistence.Id;
import lombok.Getter;
import lombok.Setter;
import lombok.ToString;
@Getter
@Setter
@ToString
public class AccessLogPk implements Serializable {
@Id
private Date accessTime;
@Id
private String accessUser;
@Id
private String accessUrl;
}
AccessStatistics.java
package com.mwlee.batchtest.domain;
import javax.persistence.Entity;
import javax.persistence.Id;
import javax.persistence.IdClass;
import javax.persistence.Table;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
import lombok.ToString;
@Entity
@Getter
@Setter
@NoArgsConstructor
@AllArgsConstructor
@Table(name="access_statistics")
@ToString
@IdClass(AccessStatisticsPk.class)
public class AccessStatistics {
@Id
private String accessTime;
@Id
private String accessUrl;
private long accessSuccess;
private long accessFail;
}
AccessStatisticsPk.java
package com.mwlee.batchtest.domain;
import java.io.Serializable;
import javax.persistence.Id;
import lombok.Getter;
import lombok.Setter;
import lombok.ToString;
@Getter
@Setter
@ToString
public class AccessStatisticsPk implements Serializable {
@Id
private String accessTime;
@Id
private String accessUrl;
}
그리고 각 엔티티 클래스에 대한 Repository를 작성한다.
AccessLogRepository.java
package com.mwlee.batchtest.repository;
import java.util.Date;
import java.util.List;
import java.util.Map;
import org.springframework.data.jpa.repository.Query;
import org.springframework.data.repository.CrudRepository;
import org.springframework.stereotype.Repository;
import com.mwlee.batchtest.domain.AccessLog;
import com.mwlee.batchtest.domain.AccessLogPk;
@Repository
public interface AccessLogRepository extends CrudRepository<AccessLog, AccessLogPk> {
// 한 시간 이내 데이터 불러오기용 함수 : 따로 VO로 선언하기 귀찮아서 List<Map>으로 반환.
@Query(" SELECT new map("
+ " function('date_format', a.accessTime, '%Y-%m-%d %H') as accessTime, "
+ " a.accessUrl as accessUrl, "
+ " SUM(CASE WHEN a.accessResult = true THEN 1 ELSE 0 END) as success, "
+ " SUM(CASE WHEN a.accessResult = false THEN 1 "
+ " WHEN a.accessResult = null THEN 1 "
+ " ELSE 0 END) as fail "
+ " ) "
+ " FROM "
+ " AccessLog a "
+ " WHERE "
+ " a.accessTime BETWEEN :start AND :end "
+ " GROUP BY "
+ " accessTime, a.accessUrl")
List<Map<String, Object>> aggregateAccessLog(Date start, Date end);
}
AccessStatisticsRepository.java
package com.mwlee.batchtest.repository;
import org.springframework.data.repository.CrudRepository;
import org.springframework.stereotype.Repository;
import com.mwlee.batchtest.domain.AccessStatistics;
import com.mwlee.batchtest.domain.AccessStatisticsPk;
@Repository
public interface AccessStatisticsRepository extends CrudRepository<AccessStatistics, AccessStatisticsPk> {
}
다음으로 이제 가장 중요한 배치를 만들자.
배치를 생성하기 전에 우선 해야할 작업이 있다. 배치 정보를 저장할 DB 테이블을 만드는 것이다.
배치는 메타 데이터를 저장하기 위해 몇 가지 테이블을 사용하는데, 이 테이블들은 배치 잡의 상태 추적에 사용되고, 만약 존재하지 않으면 배치에서 아래 에러가 발생한다.
Table 'testdb.BATCH_JOB_INSTANCE' doesn't exist
그렇다고 테이블을 직접 생성해줘야 하느냐? 그건 또 아니다. 그냥 application.properties에 아래 옵션만 추가해주면 된다.
spring.batch.jdbc.initialize-schema=always
만약 테이블을 추가하기 싫으면 embedded 옵션을 사용하면 된다.
설정 완료 후 BatchConfiguration을 생성해보자.
BatchConfiguration.java
package com.mwlee.batchtest.batch.config;
import java.util.Calendar;
import java.util.Date;
import java.util.Map;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.support.ListItemReader;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import com.mwlee.batchtest.domain.AccessStatistics;
import com.mwlee.batchtest.repository.AccessLogRepository;
import com.mwlee.batchtest.repository.AccessStatisticsRepository;
import lombok.extern.slf4j.Slf4j;
@Slf4j
@Configuration
@EnableBatchProcessing
public class BatchConfiguration {
@Autowired private JobBuilderFactory jobBuilderFactory;
@Autowired private StepBuilderFactory stepBuilderFactory;
@Autowired private AccessLogRepository accessLogRepository;
@Autowired private AccessStatisticsRepository accessStatisticsRepository;
@Bean
public ItemReader<Map<String, Object>> accessLogReader() {
// 매 0분에 실행되므로 직전 시간의 0분0초~59분59초 까지
Calendar calendar = Calendar.getInstance();
calendar.add(Calendar.HOUR, -1); // 1시간 전 > 만약 00시일 경우 23시로 넘어가며 일도 바뀜
calendar.set(Calendar.MINUTE, 0); // 0분
calendar.set(Calendar.SECOND, 0); // 0초
calendar.set(Calendar.MILLISECOND, 0); // 0밀리세컨드
Date startDate = calendar.getTime();
calendar.set(Calendar.MINUTE, 59);
calendar.set(Calendar.SECOND, 59);
calendar.set(Calendar.MILLISECOND, 999);
Date endDate = calendar.getTime();
// 00분 00초 000밀리초 ~ 59분 59초 999밀리초 데이터 반환
return new ListItemReader<>(accessLogRepository.aggregateAccessLog(startDate, endDate));
}
@Bean
public ItemProcessor<Map<String, Object>, AccessStatistics> aggregateLog() {
return item -> {
log.info("@@@ {}" + item.toString());
AccessStatistics statistics = new AccessStatistics();
statistics.setAccessTime(item.get("accessTime").toString());
statistics.setAccessUrl(item.get("accessUrl").toString());
statistics.setAccessSuccess((long) item.get("success"));
statistics.setAccessFail((long) item.get("fail"));
return statistics;
};
}
@Bean
public ItemWriter<AccessStatistics> statisticsWriter() {
return items -> {
accessStatisticsRepository.saveAll(items);
log.info("{} saved!" + items);
};
}
@Bean
public Step aggregateLogStep(
ItemReader<Map<String, Object>> accessLogReader,
ItemProcessor<Map<String, Object>, AccessStatistics> aggregateLog,
ItemWriter<AccessStatistics> statisticsWriter
) {
return stepBuilderFactory.get("aggregateLogStep")
// 한 번에 수행할 작업 설정
// 만약 예시대로 쿼리 입력 시 reader에서 3개의 데이터가 반환되는데,
// 만약 아래처럼 2로 해놓으면 2 개가 동시에 processor에서 실행되어 writer로 넘어가고,
// 나머지 1개가 따로 실행됨.
// reader, writer에 log.info를 찍어보면 알 수 있음.
.<Map<String, Object>, AccessStatistics>chunk(2)
// 로직 수행
.reader(accessLogReader)
.processor(aggregateLog)
.writer(statisticsWriter)
// 특정 에러 발생 시 3회 재수행
.faultTolerant()
.retry(Exception.class)
.retryLimit(3)
// 빌드
.build();
}
@Bean
public Job aggregateLogJob(Step aggregateLogStep) {
return jobBuilderFactory.get("aggregateLogJob")
.start(aggregateLogStep)
.build();
}
}
작성 완료 후 테스트를 수행해본다.
@Autowired
private JobLauncher jobLauncher;
@Autowired
private Job aggregateLogJob;
@Test
void contextLoads() {
JobParameters param = new JobParametersBuilder()
.addString("JobID", String.valueOf(System.currentTimeMillis()))
.toJobParameters();
try {
jobLauncher.run(aggregateLogJob, param);
} catch (JobExecutionAlreadyRunningException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (JobRestartException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (JobInstanceAlreadyCompleteException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (JobParametersInvalidException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
이렇게 동작시키고 DB를 확인하면 배치가 정상 작동해서 데이터가 들어와 있음을 확인할 수 있다.
마지막으로 매 0분 0초에 해당 로직이 작동하도록 Test 코드를 Scheduler에 옮겨주기만 하면 자동 집계 기능이 완성된다.
SchedulingConfig.java
package com.mwlee.batchtest.batch.config;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.JobParametersInvalidException;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.core.repository.JobExecutionAlreadyRunningException;
import org.springframework.batch.core.repository.JobInstanceAlreadyCompleteException;
import org.springframework.batch.core.repository.JobRestartException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
@EnableScheduling
@Component
public class SchedulingConfig {
@Autowired private JobLauncher jobLauncher;
@Autowired private Job aggregateLogJob;
@Scheduled(cron = "0 0 * * * ?") // 매 0분 0초
public void aggregateLog() {
JobParameters param = new JobParametersBuilder()
.addString("JobID", String.valueOf(System.currentTimeMillis()))
.toJobParameters();
try {
jobLauncher.run(aggregateLogJob, param);
} catch (JobExecutionAlreadyRunningException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (JobRestartException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (JobInstanceAlreadyCompleteException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (JobParametersInvalidException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
'실습 > 리눅스 서버 + 스프링 부트' 카테고리의 다른 글
동시성 제어 (0) | 2023.10.16 |
---|---|
[Apache Server] 리버스 프록시_CentOS, Ubuntu (1) | 2023.09.28 |
OSIV (0) | 2023.07.14 |
[SPRING JPA] N+1 문제 (0) | 2023.07.04 |
Spring Boot + Mybatis (0) | 2023.07.02 |