Spring Batch Step

2024. 3. 13. 15:59·Spring/Batch Programming

2024.03.13 - [Spring/Batch Programming] - Spring Batch Job

 

Spring Batch Job

2024.03.12 - [Spring/Batch Programming] - Spring Batch 도메인 용어, 실전 Spring Batch 도메인 용어, 실전 2024.03.10 - [Spring/Batch Programming] - Spring Batch 실습(4) Spring Batch 실습(4) 2024.03.09 - [Spring/Batch Programming] - Spring B

lsdiary.tistory.com

Step이란?

배치 작업의 독립적이고, 순차적인 단계를 캡슐화한 도메인이다.

read, processing, write에 대한 모든 설정을 포함한다!

Step 인터페이스

 

이를 구현한 Step 클래스들도 Template Method Pattern으로 구현되어있다.

 

Tasklet 기반 JobConfiguration.java

@Slf4j
@Configuration
public class JobConfiguration {

    @Bean
    public Job job(JobRepository jobRepository, Step step) {
        return new JobBuilder("job", jobRepository)
                .start(step)
                .build();
    }

    @Bean
    public Step step(JobRepository jobRepository, PlatformTransactionManager platformTransactionManager) {
        final Tasklet tasklet = new Tasklet() {

            private int count = 0;
            @Override
            public RepeatStatus execute(StepContribution a, ChunkContext b) throws Exception {
                count++;
                if(count == 15){
                    log.info("Tasklet FINISHED");
                    return RepeatStatus.FINISHED;
                }
                log.info("Tasklet CONTINUABLE {}", count);
                return RepeatStatus.CONTINUABLE;
            }
        };
        return new StepBuilder("step", jobRepository)
                .tasklet(tasklet, platformTransactionManager)
                .build();
    }
}

 


tasklet을 통해서 단일 실행도 할수 있고, 여러개를 실행할 수도있다.

☞ 하나씩 커밋해서 대용량 데이터일 경우 느리다

빠르게 하려면 어떻게 해야할까,,

Chunk(덩어리)

 

여러개씩 묶어서 처리하는 방식, Spring Batch의 일반적인 사용방식.

Chunk-oriented process : 데이터를 한번에 하나씩 읽고 트랜잭션 경계 내에 기록되는 'Chunk'를 생성하는 것을 의미.

https://docs.spring.io/spring-batch/reference/step/chunk-oriented-processing.html

 

Chunk-oriented Processing :: Spring Batch

Spring Batch uses a “chunk-oriented” processing style in its most common implementation. Chunk oriented processing refers to reading the data one at a time and creating 'chunks' that are written out within a transaction boundary. Once the number of ite

docs.spring.io

스프링 공식 문서에 보면 어떻게 처리하는지 그림으로 잘 나와있다.

정리해보면 execute() 실행 이후, 한번 읽고 한번 쓰고를 하는것이아닌, 쭉~~ 읽고 모아서 한번에 쓴다는 것이다.

chunk-size는 당연하게도 read의 횟수가 되겠다. 다르게 말하면 커밋 간격을 의미한다.

ex) chunk-size = 10000 → read = 10000, process = 10000, write = 1

 

그러면 chunk-size를 그냥 크게하는게 무조건 좋은 거 아닌가?

물론 커밋 비용이 줄긴하지만, 답은 NO 입니다. 위험성이 올라가기 때문이다.

  • 만약 백만번의 사이즈인 chunk가 있다고 생각해보자 99만 9999번째에서 실패를 하게 되면 전부 롤백이 되어버린다,,,

그래서 적절히 사용하는 것이 중요하다!

 

Chunk기반 JobConfiguration.java

@Slf4j
@Configuration
public class JobConfiguration {

    @Bean
    public Job job(JobRepository jobRepository, Step step) {
        return new JobBuilder("job", jobRepository)
                .start(step)
                .build();
    }

    @Bean
    public Step step(JobRepository jobRepository, PlatformTransactionManager platformTransactionManager) {

        ItemReader<Integer> itemReader = new ItemReader<>() {
            private int count = 0;

            @Override
            public Integer read() throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException {
                count++;

                log.info("Read : {}", count);
                if (count == 15) {
                    return null;
                }
                return count;
            }

        };
        return new StepBuilder("step", jobRepository)
                .chunk(10, platformTransactionManager)
                .reader(itemReader)
//                .processor()
                .writer(read -> {})
                .build();
    }
}

Tasklet 기반보다 훨씬 더 자세히 정보를 보여준다.

15번 실행되는 로그가 잘 찍힌것을 볼 수있다.

 

커밋수도 2번!
ChunkOrientedTasklet.java

 

실제로 Tasklet에 기반해서 구현된 부분이다.

 


다음으로는 Step의 옵션들을 살펴 본다.

 

Restart

  • allowStartIfComplete(true/false) : Step이 성공해도 재시작 허용
  • startLimit(int count) : Step 시작 제한 수

여러 번 재시작했을때, 문제가 있다고 판단될 때 사용한다.

※ 단, Job에 preventRestart가 선언되지 않아야 사용할 수 있다.

 

@Slf4j
@Configuration
public class JobConfiguration {

    @Bean
    public Job job(JobRepository jobRepository, Step step) {
        return new JobBuilder("job", jobRepository)
                .start(step)
                .build();
    }

    @Bean
    public Step step(JobRepository jobRepository, PlatformTransactionManager platformTransactionManager) {

        return new StepBuilder("step", jobRepository)
                .tasklet((contribution, chunkContext) ->{
                    log.info("step 실행");
                    return RepeatStatus.FINISHED;
                }, platformTransactionManager)
                .allowStartIfComplete(true)
                .startLimit(5)
                .build();
    }
}

JobInstance는 그대로 이고, Job_execution이 새로 생성되는것을 DB에서 확인 할 수있다.

 


 

Skip

한 번 실패하면 Job, Step이 전부 멈추므로, 사소한 에러는 Skip하고 넘어 갈 필요도 있다.

이 또한 스프링에서 제공하는 기능이다.

  • faultTolerant() : 메소드선언
  • skipLimit(int skipcount) : 몇 번까지 스킵? 
  • skip(해당 클래스) : 어느 예외 클래스로 지정?
  • noSkip(해당 클래스) : 이 예외 클래스는 Skip NO!

SkipPolicy.java

인터페이스도 제공해주고 있다.

@Slf4j
@Configuration
public class JobConfiguration {

    @Bean
    public Job job(JobRepository jobRepository, Step step) {
        return new JobBuilder("job", jobRepository)
                .start(step)
                .build();
    }
        @Bean
    public Step step(JobRepository jobRepository, PlatformTransactionManager platformTransactionManager) {

        ItemReader<Integer> itemReader = new ItemReader<>() {
            private int count = 0;

            @Override
            public Integer read() throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException {
                count++;

                log.info("Read : {}", count);
                if (count >= 15) {
                    throw new IllegalStateException("예외 발생");
                }
                return count;
            }

        };
        return new StepBuilder("step", jobRepository)
                .chunk(10, platformTransactionManager)
                .reader(itemReader)
//                .processor()
                .writer(read -> {})
                .faultTolerant()
                .skip(IllegalStateException.class)
                .skipLimit(5)
                .build();
    }
}

 

위 코드에서는 반드시 skipLimit값을 설정해줘야한다. 설정안한 Default값이 0으로 설정되어있기 때문이다.

 

예외 발생
skip을 하고 rollback까지 함

 

5번까지 skipLimit을 설정해줬는데 5번 이상이되어서 rollback된 것을 알 수있다.

여기에 더해서 skipPolicy로 더욱 복잡한 로직을 구현할 수 있다.

                .skipPolicy((t, skipCount) -> t instanceof IllegalStateException && skipCount < 5)

 


Retry

특정 에러는 다시 시도해서 성공 하는 경우도 있다.

Skip 뿐 아니라 재시도를 해서 회복탄력성을 가지게 할 수도 있다.

  • retryLimit(int retryCount)
  • retry(해당 클래스)

Skip과 방법이 동일하다, 인터페이스 또한 제공하고 있다.

retry는 itemReader에는 제공되지 않는다. 따라서 itemProcessor이 필요하다.

 

  return new StepBuilder("step", jobRepository)
                .<Integer, Integer>chunk(10, platformTransactionManager)
                .reader(itemReader)
                .processor(itemProcessor)
                .writer(read -> {})
                .faultTolerant()
                .retry(IllegalStateException.class)
                .retryLimit(5)
                .build();

설정안한 Default값이 0으로 설정되어있기 때문에 skip과 마찬가지로 따로 설정해줘야한다.

처음 + 5번의 재시도 실패&nbsp;&rarr; 롤백,,

 

여기선 전부 실패했지만, 네트워크 지연 등의 문제에서는 장애가 회복 될 수도있다!


 

Rollback

위에서 Chunk안에서 작업을 수행하다가 실패하면 rollback이 일어난다고 했다.

그렇다면 rollback을 안하고 싶으면 어떻게 해야할까?

  • noRollback(해당 클래스) 
                .noRollback(IllegalStateException.class)

Intercepting Step

 

Step은 Job과 다르게 다양한 Listener를 제공한다.

우리가 사용하고자하는 비즈니스에 맞게 구현 할 수있다.

어노테이션으로도 사용이 가능하다.

 

Late Binding

 

이때까지는

Job파라미터 주입

위 설정 파일에서 직접 파라미터를 주입해서 사용했다.

비즈니스적으로 Step이나 Item,  Tasklet 까지 전달되어 사용되는 경우도 있다.

이런 것들은 지연된 바인딩으로 획득할 수있다!

 

  • 어플리케이션 구동 시점이 아니라 빈의 실행 시점에 적용하기 위해 사용.

병렬 처리 시 개별의 Scope 빈이 할당되어 Thread가 안전한 상태다.

 

그러면 언제 사용할까 이건? 

☞ JobParameter에서 값을 활용할 때 사용한다.

@JobScope, @StepScope로 쓸 수 있다. 

@Bean
    @JobScope
    public Step step(JobRepository jobRepository,
                     PlatformTransactionManager platformTransactionManager,
                     @Value("#{jobParameters['name']}") String name){
        log.info("name : {}", name);
        return new StepBuilder("step", jobRepository)
                .tasklet((a,b) ->{
                    return RepeatStatus.FINISHED;
                },platformTransactionManager)
                .build();
    }

여기서 @JobScope를 안쓴다면 어떻게 될까?

→ Bean이 생성되는 시점보다 Job 파라미터를 주입받는 시점이 느린데, 지연이 되지않아 에러가 발생한다!

 

batch_job_execution_params 테이블

name=minsoo 로 설정해줬을때 잘 동작하는 것을 확인할 수있다.

@StepScope도 마찬가지지만, Tasklet이나 Item에 적용할 수있다.


지금까지 step1 → step2 → step3 ... 순으로 Sequential 하게 흐름을 보이는 Simple Job을 살펴봤다.

상황에 따라 다르게 동작하는 경우에는 어떨까?

Conditional Flow

 

만약 step1이 성공하면 step2를 실행하고, 실패하면 step3를 실행하게 된다면?

이것도 스프링에서 다 처리 해두었다. (내가 할 게없는듯,,,)

별반 다르지 않은게 똑같이 JobBuilder를 사용하면 된다.

return new JobBuilder("job", jobRepository).start(step1).on("*").to(step2)
						.from(step1).on("FAILED").to(step3).end().build();

 

  • on(String) : ExitStatus의 반환물과 매칭
'*' : 0개 이상의 문자열과 일치
'?' : 정확하게 문자열과 일치
  • to(Step) : on 조건에 만족하면 해당 Step으로 이동
  • from : 이전에 등록한 단계로 돌아가서 새 경로를 시작
 Flow가 끝났을때 3가지상태
1. Completed : end() -> 강제로 Job이 완성되었다고 하게 함, Ste이 실패더라
2. Failed : fail()
3. Stopped : stopAndRestart ()

 

비즈니스 로직이 복잡할 수록 배치 요구사항이 늘어나고 Flow JOB을 사용할 수있지만, 꼭 필요할때만 쓰자!

 

2024.03.20 - [Spring/Batch Programming] - Spring Batch 3 - Items(Reader)

 

Spring Batch 3 - Items(Reader)

2024.03.13 - [Spring/Batch Programming] - Spring Batch Step Spring Batch Step 2024.03.13 - [Spring/Batch Programming] - Spring Batch Job Spring Batch Job 2024.03.12 - [Spring/Batch Programming] - Spring Batch 도메인 용어, 실전 Spring Batch 도메인

lsdiary.tistory.com

 

'Spring > Batch Programming' 카테고리의 다른 글

Spring Batch 3 - Items(Writer)  (0) 2024.03.20
Spring Batch 3 - Items(Reader)  (0) 2024.03.20
Spring Batch Job  (0) 2024.03.13
Spring Batch 도메인 용어, 실전  (3) 2024.03.12
Spring Batch 실습(4)  (0) 2024.03.10
'Spring/Batch Programming' 카테고리의 다른 글
  • Spring Batch 3 - Items(Writer)
  • Spring Batch 3 - Items(Reader)
  • Spring Batch Job
  • Spring Batch 도메인 용어, 실전
Ls._.Rain
Ls._.Rain
안되면 될때까지 삽질했던 기록
  • Ls._.Rain
    Ls{Diary}
    Ls._.Rain
  • 전체
    오늘
    어제
    • 분류 전체보기 (136)
      • Github (2)
      • Spring (51)
        • Batch Programming (13)
        • 결제 (4)
        • 대용량 트래픽 (32)
        • OpenAI (0)
        • Security (0)
        • WebSocket (0)
        • JPA (1)
      • Algorithm (67)
        • DFS (6)
        • BFS (6)
        • Dynamic Programming (10)
        • Brute Force (4)
        • Binary Search (6)
        • 구현, 시뮬레이션 (15)
        • Stack (1)
        • Greedy (4)
        • Priority_Queue (2)
        • Back Tracking (3)
        • Geometry (2)
        • SCC (1)
        • 투포인터 (4)
        • 최대유량 (1)
        • 정렬 (1)
      • OS (0)
      • DevOps (15)
        • AWS (11)
        • Docker (4)
  • 블로그 메뉴

    • 홈
    • 태그
    • 방명록
  • 링크

  • hELLO· Designed By정상우.v4.10.0
Ls._.Rain
Spring Batch Step
상단으로

티스토리툴바