2024.03.25 - [Spring/Batch Programming] - Spring Batch 3 - Items(Processor)
이때 까지 Spring Batch에 대해 전반적으로 알아봤다.
사용자가 많아지고, 비즈니스가 계속해서 복잡해지면 시간이 오래 걸릴 수있다.
이에 따라 Spring Batch를 확장해서 성능을 개선할수있다.
어떻게?
확장 방식
- Multi-threaded Step
- Parallel Steps
- Partitioning
- Remote Chunking
Multi-threaded Step
앞에서 배웠던 청크 단위로 차례대로 커밋하는 과정을 멀티스레드로 구현한다면, 동시에 수행할수있다.
당연히 훨씬 빠르고 처리량이 많아지지만, 경쟁상태(race condition)가 발생할 수 있다.
경쟁상태란 간단하게 말해서 여러개의 실행흐름이 있을때, 공유자원에(ex. 전역변수) 동시접근하여 공유자원의 값이 실행순서나, 접근순서에 따라 달라 질수가 있는 상태를 의미한다. 즉 consistancy가 깨지는 것이다. 멀티스레드를 할때 절대로 이런 일관성없는 상황이 발생하면 안되는 것은 운영체제를 공부한 사람이라면 알 것이라고 생각한다.
그렇다면 어떻게 경쟁상태를 방지할 수 있을까?
☞ 스레드 Safe한 구현체인지 살펴봐야한다!!
스레드 Safe의 판단기준은 Spring 공식문서나, Java Docs에서 thread-safe라고 적혀있는지 확인 하는 방법과,
직접 구현할 경우, Syncronization을 잘 구현해주는 방법이있다.
스프링에서는 어떻게 적용하지?
.taskExecutor(new SimpleAsyncTaskExecutor())
Step에서 taskExecutor만 SimpleAsyncTaskExecutor 로 바꿔주면 스프링배치는 멀티스레드로 동작하게 된다.
청크별로 진행하다보니, 실패지점에서 재시작하는 것은 불가능하다,,
따라서 이런 단점보다 장점이 크다고 생각할때, 멀티스레딩을 쓰는것이 바람직하다.
실습
@Slf4j
@Configuration
public class MultiThreadedJobConfig {
@Bean
public Job job(
JobRepository jobRepository,
Step step
){
return new JobBuilder("multithreadedjob", jobRepository)
.start(step)
.incrementer(new RunIdIncrementer())
.build();
}
@Bean
public Step step(
JobRepository jobRepository,
PlatformTransactionManager platformTransactionManager,
JpaPagingItemReader<User> jpaPagingItemReader
){
return new StepBuilder("step", jobRepository)
.<User,User>chunk(5, platformTransactionManager)
.reader(jpaPagingItemReader)
.writer(result -> log.info(result.toString()))
.build();
}
@Bean
public JpaPagingItemReader<User> jpaPagingItemReader(
EntityManagerFactory entityManagerFactory
){
return new JpaPagingItemReaderBuilder<User>()
.name("jpaPagingItemReader")
.entityManagerFactory(entityManagerFactory)
.pageSize(5)
.queryString("SELECT u FROM User u ORDER BY u.id")
.build();
}
}
우선 paging으로 DB에서 데이터를 가져오는데 나는 임의로 100개의 데이터를 만들어놨다.
아무튼 잘 가져왔다.
@Bean
public Step step(
JobRepository jobRepository,
PlatformTransactionManager platformTransactionManager,
JpaPagingItemReader<User> jpaPagingItemReader
){
return new StepBuilder("step", jobRepository)
.<User,User>chunk(5, platformTransactionManager)
.reader(jpaPagingItemReader)
.writer(result -> log.info(result.toString()))
.taskExecutor(new SimpleAsyncTaskExecutor())
.build();
}
@Bean
public JpaPagingItemReader<User> jpaPagingItemReader(
EntityManagerFactory entityManagerFactory
){
return new JpaPagingItemReaderBuilder<User>()
.name("jpaPagingItemReader")
.entityManagerFactory(entityManagerFactory)
.pageSize(5)
.saveState(false)
.queryString("SELECT u FROM User u ORDER BY u.id")
.build();
}
이제 스레드를 구현해봤는데 달라진게 딱 두줄 있다.
.taskExecutor(new SimpleAsyncTaskExecutor())
와
.saveState(false)
인데 saveState는 왜? 이걸 해줘야 실패했을때 지점을 알려줘서 재시도할때 거기서부터 시작할 수있게 해주는기능인데 왜끄냐?! 청크단위로 다같이 돌기때문에, 실패해도 어느 스레드에서 다수의 실패가 일어났는지 알수없다.
그래서 false로 두는 것이 바람직하다.
메인 스레드에서 돌다가 이제 세부적인 TaskExecutor를 통해서 실행되는것도 확인할수 있다.
그렇다면 가장 중요한 성능은? 실행속도는 어떻게 되었는가?
지금은 100개 밖에 안되어서 차이가 크게 나진 않지만, 데이터가 많아지면 많아질수록 그 차이는 크게 난다.
Parallel Steps
위 그림은 step1실행 후, step2와 (step3, 4)가 각자 실행된다. 완료된 이후 step5가 실행된다.
굳이 step이 순차적으로 진행될 필요없고, 특정 부분은 병렬처리 되어도 될때 활용할 수 있다.
멀티스레딩과의 차이점은 Chunk → Step 단위로 성능의 이점을 보는 것이다.
구현은 어떻게?
☞ Flow Step을 활용해서 구현한다.
실습
실습에 앞서 구현하고자 하는 전체 흐름은
flow1 : (step1, step2)
→ step4
flow2 : (step3)
이런식이다.
package com.fastcampus.springbatch;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.job.builder.FlowBuilder;
import org.springframework.batch.core.job.builder.JobBuilder;
import org.springframework.batch.core.job.flow.Flow;
import org.springframework.batch.core.job.flow.support.SimpleFlow;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.step.builder.StepBuilder;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.task.SimpleAsyncTaskExecutor;
import org.springframework.transaction.PlatformTransactionManager;
@Slf4j
@Configuration
public class ParallelStepConfig {
// flow1 : (step1, step2)
// -> step4
// flow2 : (step3)
@Bean
public Job job(JobRepository jobRepository, Step step4, Flow splitFlow){
return new JobBuilder("job", jobRepository)
.start(splitFlow)
.next(step4)
.build()
.build();
}
@Bean
public Flow splitFlow(Flow flow1, Flow flow2){
return new FlowBuilder<SimpleFlow>("splitFlow")
.split(new SimpleAsyncTaskExecutor())
.add(flow1, flow2)
.build();
}
@Bean
public Flow flow1(Step step1, Step step2){
return new FlowBuilder<SimpleFlow>("flow1")
.start(step1)
.next(step2)
.build();
}
@Bean
public Flow flow2(Step step3){
return new FlowBuilder<SimpleFlow>("flow1")
.start(step3)
.build();
}
@Bean
public Step step1(JobRepository jobRepository, PlatformTransactionManager platformTransactionManager){
return new StepBuilder("step1", jobRepository)
.tasklet((a,b)->{
Thread.sleep(1000);
log.info("step1");
return RepeatStatus.FINISHED;
}, platformTransactionManager)
.build();
}
@Bean
public Step step2(JobRepository jobRepository, PlatformTransactionManager platformTransactionManager){
return new StepBuilder("step2", jobRepository)
.tasklet((a,b)->{
Thread.sleep(2000);
log.info("step2");
return RepeatStatus.FINISHED;
}, platformTransactionManager)
.build();
}
@Bean
public Step step3(JobRepository jobRepository, PlatformTransactionManager platformTransactionManager){
return new StepBuilder("step3", jobRepository)
.tasklet((a,b)->{
Thread.sleep(2500);
log.info("step3");
return RepeatStatus.FINISHED;
}, platformTransactionManager)
.build();
}
@Bean
public Step step4(JobRepository jobRepository, PlatformTransactionManager platformTransactionManager){
return new StepBuilder("step1", jobRepository)
.tasklet((a,b)->{
Thread.sleep(1000);
log.info("step4");
return RepeatStatus.FINISHED;
}, platformTransactionManager)
.build();
}
}
step1, 3가 같이 돌고, step1이 먼저 끝나고, step2가 3랑 같이돌다가 step3가 종료, 그 이후 step2가 종료 후 마지막으로 step4가 실행된다고 실행화면에 뜨는데 내가 구상했던 흐름도랑 일치한다!
Partitioning
Manager Step이 있고, Worker Step이 이를 여러개로 분할해서 돌린다.
Manager(Master Step이라고도 함)는 Partitioner과 PartitionHandler 가 존재한다.
멀티스레딩과 상당히 유사해 보이지만, 각각의 워커는 Reader, Processor, Writer가 존재해서 데이터를 전부 저장하고 있기때문에, 어디가 실패했는지 알 수있고, 재시작 할수있다는 점에서 차이가 있다.
Master Step이 Worker Step을 어떻게 다룰지 정의한다.
파티셔닝할 스텝 Grid사이즈지정, 병렬로 실행, 스레드 풀은 어떻게? 등등을 관리한다.
나는 TaskExecutorPartitionHandler에 대해서 알아보겠다.
단순하게 객체를 생성하고 비동기로 돌리기위한 SimpleAsyncTaskExecutor를 주입하고 Step과 GridSize를 넣는다.
Worker Step을 위한 Step Execution을 생성하는 인터페이스이다.
각각 Worker들이 어디서부터 어디까지 실행할지를 지정해준다.
https://docs.spring.io/spring-batch/reference/scalability.html
스프링 공식문서에 너무 잘 정리되어서 한번쯤 읽어보는걸 추천함니다,,,
공식 스프링문서에서 제공하고 있는 파티셔닝 활용 샘플 코드를 살펴보자.
min = 1, max = 100, gridSize = 5 라고 가정하자.
100개의 데이터가 있고, Worker Step은 5개가 있다는 의미이다. 반환은 Map에서 String과 ExecutionContext를 반환한다. 그렇다면 결과적으로 파티셔닝이 5개로 분할되고 각각 (1~20), (21~40), (41~60), (61~80), (81,100) 이 범위가 된다.
실습
실습을 진행하는데 파티셔닝부분이 너무 복잡했다,,,,
구현을 하면서도 할 부분이 너무 많아서 헷갈렸지만 일단 정리해봤다.
PartitionJobConfiguration.java
@Slf4j
@Configuration
public class PartitionJobConfiguration {
@Bean
public Job job(JobRepository jobRepository, Step managerStep) {
return new JobBuilder("partitionJob", jobRepository)
.start(managerStep)
.incrementer(new RunIdIncrementer())
.build();
}
@Bean
public Step managerStep(
JobRepository jobRepository,
Step step,
PartitionHandler partitionHandler,
DataSource dataSource
) {
return new StepBuilder("managerStep", jobRepository)
.partitioner("delegateStep", new ColumnRangePartitioner(dataSource))
.step(step)
.partitionHandler(partitionHandler)
.build();
}
@Bean
public PartitionHandler partitionHandler(Step step){
TaskExecutorPartitionHandler taskExecutorPartitionHandler = new TaskExecutorPartitionHandler();
taskExecutorPartitionHandler.setStep(step);
taskExecutorPartitionHandler.setTaskExecutor(new SimpleAsyncTaskExecutor());
taskExecutorPartitionHandler.setGridSize(5);
return taskExecutorPartitionHandler;
}
@Bean
public Step step(
JobRepository jobRepository,
JpaPagingItemReader<User> jpaPagingItemReader,
PlatformTransactionManager platformTransactionManager
){
return new StepBuilder("step", jobRepository)
.<User, User>chunk(4, platformTransactionManager)
.reader(jpaPagingItemReader)
.writer(result -> log.info(result.toString()))
.build();
}
@Bean
@StepScope
public JpaPagingItemReader<User> itemReader(
@Value("#{stepExecutionContext[minValue]}") Long minValue,
@Value("#{stepExecutionContext[maxValue]}") Long maxValue,
EntityManagerFactory entityManagerFactory
){
log.info("minValue : {}, maxValue : {}", minValue, maxValue);
final Map<String, Object> params = new HashMap<>();
params.put("minValue", minValue);
params.put("maxValue", maxValue);
return new JpaPagingItemReaderBuilder<User>()
.name("itemReader")
.entityManagerFactory(entityManagerFactory)
.pageSize(5)
.queryString("""
SELECT u FROM User u
WHERE u.id BETWEEN :minValue AND :maxValue
""")
.parameterValues(params)
.build();
}
}
- Job 구성
- managerStep을 ColumnRangePartitioner(구현체)를 가지고 있다.
- PartitionHandler에서
- Worker의 Step
- 병렬여부
- GridSize (Worker Step 사이즈) 설정
- 실제 실행 Step에서 Reader 참조, Writer는 로그만 기록
- ItemReader에서 StepExecutionContext의 min, max value를 가지고 와서, JQPL로 id칼럼의 최소부터 최대까지 조회.
- @StepScope를 써서 Job내부에 있는 StepExecutionContext 까지 value를 가지고 옴.
ColumnRangePartitioner.java
public class ColumnRangePartitioner implements Partitioner {
private final JdbcTemplate jdbcTemplate;
public ColumnRangePartitioner(DataSource dataSource) {
this.jdbcTemplate = new JdbcTemplate(dataSource);
}
@Override
public Map<String, ExecutionContext> partition(int gridSize) { //5
Integer min = jdbcTemplate.queryForObject("SELECT MIN(id) from USER", Integer.class);
Integer max = jdbcTemplate.queryForObject("SELECT MAX(id) from USER", Integer.class);
int targetSize = (max - min) / gridSize + 1; //20
//반환할 객체 선언
Map<String, ExecutionContext> result = new HashMap<>();
int number = 0;
int start = min;
int end = start + targetSize - 1;
while(start <= max){
ExecutionContext value = new ExecutionContext();
result.put("partition" + number, value);
if(end >= max){
end = max;
}
value.putInt("minValue", start);
value.putInt("maxValue", end);
start += targetSize;
end += targetSize;
number++;
}
return result;
}
//partition 0 : 1, 20
//partition 1 : 21, 40
//partition 2 : 41, 60
//partition 3 : 61, 80
//partition 4 : 81, 100
}
- ExecutionContext에 어디서부터 어디까지 읽어 올지 지정해준다.
이렇게 Spring Batch에 대해서 전반적으로 모두 알아봤다.
상황에 따라서 방법을 선택해 확장 시킬수도 있고, 그에 따라 대용량 데이터를 더욱 효율적으로 처리 할 수 있는데, 실제 프로젝트에도 적용해서 테스트 해볼 계획이다. 모쪼록 이 글이 도움이 되었으면 좋겠다 ^^
'Spring > Batch Programming' 카테고리의 다른 글
Spring Batch 3 - Items(Processor) (0) | 2024.03.25 |
---|---|
Spring Batch 3 - Items(Writer) (0) | 2024.03.20 |
Spring Batch 3 - Items(Reader) (0) | 2024.03.20 |
Spring Batch Step (0) | 2024.03.13 |
Spring Batch Job (0) | 2024.03.13 |