2024.03.13 - [Spring/Batch Programming] - Spring Batch Job
Step이란?
배치 작업의 독립적이고, 순차적인 단계를 캡슐화한 도메인이다.
read, processing, write에 대한 모든 설정을 포함한다!
이를 구현한 Step 클래스들도 Template Method Pattern으로 구현되어있다.
Tasklet 기반 JobConfiguration.java
@Slf4j
@Configuration
public class JobConfiguration {
@Bean
public Job job(JobRepository jobRepository, Step step) {
return new JobBuilder("job", jobRepository)
.start(step)
.build();
}
@Bean
public Step step(JobRepository jobRepository, PlatformTransactionManager platformTransactionManager) {
final Tasklet tasklet = new Tasklet() {
private int count = 0;
@Override
public RepeatStatus execute(StepContribution a, ChunkContext b) throws Exception {
count++;
if(count == 15){
log.info("Tasklet FINISHED");
return RepeatStatus.FINISHED;
}
log.info("Tasklet CONTINUABLE {}", count);
return RepeatStatus.CONTINUABLE;
}
};
return new StepBuilder("step", jobRepository)
.tasklet(tasklet, platformTransactionManager)
.build();
}
}
tasklet을 통해서 단일 실행도 할수 있고, 여러개를 실행할 수도있다.
☞ 하나씩 커밋해서 대용량 데이터일 경우 느리다
빠르게 하려면 어떻게 해야할까,,
Chunk(덩어리)
여러개씩 묶어서 처리하는 방식, Spring Batch의 일반적인 사용방식.
Chunk-oriented process : 데이터를 한번에 하나씩 읽고 트랜잭션 경계 내에 기록되는 'Chunk'를 생성하는 것을 의미.
https://docs.spring.io/spring-batch/reference/step/chunk-oriented-processing.html
스프링 공식 문서에 보면 어떻게 처리하는지 그림으로 잘 나와있다.
정리해보면 execute() 실행 이후, 한번 읽고 한번 쓰고를 하는것이아닌, 쭉~~ 읽고 모아서 한번에 쓴다는 것이다.
chunk-size는 당연하게도 read의 횟수가 되겠다. 다르게 말하면 커밋 간격을 의미한다.
ex) chunk-size = 10000 → read = 10000, process = 10000, write = 1
그러면 chunk-size를 그냥 크게하는게 무조건 좋은 거 아닌가?
물론 커밋 비용이 줄긴하지만, 답은 NO 입니다. 위험성이 올라가기 때문이다.
- 만약 백만번의 사이즈인 chunk가 있다고 생각해보자 99만 9999번째에서 실패를 하게 되면 전부 롤백이 되어버린다,,,
그래서 적절히 사용하는 것이 중요하다!
Chunk기반 JobConfiguration.java
@Slf4j
@Configuration
public class JobConfiguration {
@Bean
public Job job(JobRepository jobRepository, Step step) {
return new JobBuilder("job", jobRepository)
.start(step)
.build();
}
@Bean
public Step step(JobRepository jobRepository, PlatformTransactionManager platformTransactionManager) {
ItemReader<Integer> itemReader = new ItemReader<>() {
private int count = 0;
@Override
public Integer read() throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException {
count++;
log.info("Read : {}", count);
if (count == 15) {
return null;
}
return count;
}
};
return new StepBuilder("step", jobRepository)
.chunk(10, platformTransactionManager)
.reader(itemReader)
// .processor()
.writer(read -> {})
.build();
}
}
Tasklet 기반보다 훨씬 더 자세히 정보를 보여준다.
실제로 Tasklet에 기반해서 구현된 부분이다.
다음으로는 Step의 옵션들을 살펴 본다.
Restart
- allowStartIfComplete(true/false) : Step이 성공해도 재시작 허용
- startLimit(int count) : Step 시작 제한 수
여러 번 재시작했을때, 문제가 있다고 판단될 때 사용한다.
※ 단, Job에 preventRestart가 선언되지 않아야 사용할 수 있다.
@Slf4j
@Configuration
public class JobConfiguration {
@Bean
public Job job(JobRepository jobRepository, Step step) {
return new JobBuilder("job", jobRepository)
.start(step)
.build();
}
@Bean
public Step step(JobRepository jobRepository, PlatformTransactionManager platformTransactionManager) {
return new StepBuilder("step", jobRepository)
.tasklet((contribution, chunkContext) ->{
log.info("step 실행");
return RepeatStatus.FINISHED;
}, platformTransactionManager)
.allowStartIfComplete(true)
.startLimit(5)
.build();
}
}
JobInstance는 그대로 이고, Job_execution이 새로 생성되는것을 DB에서 확인 할 수있다.
Skip
한 번 실패하면 Job, Step이 전부 멈추므로, 사소한 에러는 Skip하고 넘어 갈 필요도 있다.
이 또한 스프링에서 제공하는 기능이다.
- faultTolerant() : 메소드선언
- skipLimit(int skipcount) : 몇 번까지 스킵?
- skip(해당 클래스) : 어느 예외 클래스로 지정?
- noSkip(해당 클래스) : 이 예외 클래스는 Skip NO!
인터페이스도 제공해주고 있다.
@Slf4j
@Configuration
public class JobConfiguration {
@Bean
public Job job(JobRepository jobRepository, Step step) {
return new JobBuilder("job", jobRepository)
.start(step)
.build();
}
@Bean
public Step step(JobRepository jobRepository, PlatformTransactionManager platformTransactionManager) {
ItemReader<Integer> itemReader = new ItemReader<>() {
private int count = 0;
@Override
public Integer read() throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException {
count++;
log.info("Read : {}", count);
if (count >= 15) {
throw new IllegalStateException("예외 발생");
}
return count;
}
};
return new StepBuilder("step", jobRepository)
.chunk(10, platformTransactionManager)
.reader(itemReader)
// .processor()
.writer(read -> {})
.faultTolerant()
.skip(IllegalStateException.class)
.skipLimit(5)
.build();
}
}
위 코드에서는 반드시 skipLimit값을 설정해줘야한다. 설정안한 Default값이 0으로 설정되어있기 때문이다.
5번까지 skipLimit을 설정해줬는데 5번 이상이되어서 rollback된 것을 알 수있다.
여기에 더해서 skipPolicy로 더욱 복잡한 로직을 구현할 수 있다.
.skipPolicy((t, skipCount) -> t instanceof IllegalStateException && skipCount < 5)
Retry
특정 에러는 다시 시도해서 성공 하는 경우도 있다.
Skip 뿐 아니라 재시도를 해서 회복탄력성을 가지게 할 수도 있다.
- retryLimit(int retryCount)
- retry(해당 클래스)
Skip과 방법이 동일하다, 인터페이스 또한 제공하고 있다.
retry는 itemReader에는 제공되지 않는다. 따라서 itemProcessor이 필요하다.
return new StepBuilder("step", jobRepository)
.<Integer, Integer>chunk(10, platformTransactionManager)
.reader(itemReader)
.processor(itemProcessor)
.writer(read -> {})
.faultTolerant()
.retry(IllegalStateException.class)
.retryLimit(5)
.build();
설정안한 Default값이 0으로 설정되어있기 때문에 skip과 마찬가지로 따로 설정해줘야한다.
여기선 전부 실패했지만, 네트워크 지연 등의 문제에서는 장애가 회복 될 수도있다!
Rollback
위에서 Chunk안에서 작업을 수행하다가 실패하면 rollback이 일어난다고 했다.
그렇다면 rollback을 안하고 싶으면 어떻게 해야할까?
- noRollback(해당 클래스)
.noRollback(IllegalStateException.class)
Intercepting Step
Step은 Job과 다르게 다양한 Listener를 제공한다.
우리가 사용하고자하는 비즈니스에 맞게 구현 할 수있다.
어노테이션으로도 사용이 가능하다.
Late Binding
이때까지는
위 설정 파일에서 직접 파라미터를 주입해서 사용했다.
비즈니스적으로 Step이나 Item, Tasklet 까지 전달되어 사용되는 경우도 있다.
이런 것들은 지연된 바인딩으로 획득할 수있다!
- 어플리케이션 구동 시점이 아니라 빈의 실행 시점에 적용하기 위해 사용.
병렬 처리 시 개별의 Scope 빈이 할당되어 Thread가 안전한 상태다.
그러면 언제 사용할까 이건?
☞ JobParameter에서 값을 활용할 때 사용한다.
@JobScope, @StepScope로 쓸 수 있다.
@Bean
@JobScope
public Step step(JobRepository jobRepository,
PlatformTransactionManager platformTransactionManager,
@Value("#{jobParameters['name']}") String name){
log.info("name : {}", name);
return new StepBuilder("step", jobRepository)
.tasklet((a,b) ->{
return RepeatStatus.FINISHED;
},platformTransactionManager)
.build();
}
여기서 @JobScope를 안쓴다면 어떻게 될까?
→ Bean이 생성되는 시점보다 Job 파라미터를 주입받는 시점이 느린데, 지연이 되지않아 에러가 발생한다!
name=minsoo 로 설정해줬을때 잘 동작하는 것을 확인할 수있다.
@StepScope도 마찬가지지만, Tasklet이나 Item에 적용할 수있다.
지금까지 step1 → step2 → step3 ... 순으로 Sequential 하게 흐름을 보이는 Simple Job을 살펴봤다.
상황에 따라 다르게 동작하는 경우에는 어떨까?
Conditional Flow
만약 step1이 성공하면 step2를 실행하고, 실패하면 step3를 실행하게 된다면?
이것도 스프링에서 다 처리 해두었다. (내가 할 게없는듯,,,)
별반 다르지 않은게 똑같이 JobBuilder를 사용하면 된다.
return new JobBuilder("job", jobRepository).start(step1).on("*").to(step2)
.from(step1).on("FAILED").to(step3).end().build();
- on(String) : ExitStatus의 반환물과 매칭
'*' : 0개 이상의 문자열과 일치
'?' : 정확하게 문자열과 일치
- to(Step) : on 조건에 만족하면 해당 Step으로 이동
- from : 이전에 등록한 단계로 돌아가서 새 경로를 시작
Flow가 끝났을때 3가지상태
1. Completed : end() -> 강제로 Job이 완성되었다고 하게 함, Ste이 실패더라
2. Failed : fail()
3. Stopped : stopAndRestart ()
비즈니스 로직이 복잡할 수록 배치 요구사항이 늘어나고 Flow JOB을 사용할 수있지만, 꼭 필요할때만 쓰자!
2024.03.20 - [Spring/Batch Programming] - Spring Batch 3 - Items(Reader)
'Spring > Batch Programming' 카테고리의 다른 글
Spring Batch 3 - Items(Writer) (0) | 2024.03.20 |
---|---|
Spring Batch 3 - Items(Reader) (0) | 2024.03.20 |
Spring Batch Job (0) | 2024.03.13 |
Spring Batch 도메인 용어, 실전 (3) | 2024.03.12 |
Spring Batch 실습(4) (0) | 2024.03.10 |