2024.03.27 - [Spring/결제] - Spring으로 결제서비스 만들기(2)
고객이 유료 API를 사용한 정보에 대해서는 csv파일로 만들었다.
이제 결제한 정보에 대해서 특정 고객에게 결제대금등 정보를 줄 API를 만들것이다.
먼저 API를 만들기 위해 필요한 정보를 담은 클래스이다.
SettleDetail.java
@Entity
@NoArgsConstructor
@ToString
public class SettleDetail {
@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
private Long id;
private Long customerId;
private Long serviceId;
private Long count;
private Long fee;
private LocalDate targetDate;
}
- 첫 번째 스텝은 파일의 고객 + 서비스 별로 집계를해서 Execution Context 안에 넣는다.
- 두 번째 스텝은 집계된 Execution Context 데이터를 가지고 DB에 write를 한다.
이런식으로 새로운 Step을 구성 해보겠다.
1. 첫 번째 스텝은 파일의 고객 + 서비스 별로 집계를해서 Execution Context 안에 넣는다.
SettleDetailStepConfiguration.java
@Configuration
@RequiredArgsConstructor
public class SettleDetailStepConfiguration {
private final JobRepository jobRepository;
private final PlatformTransactionManager platformTransactionManager;
// 첫 번째 스텝은 파일의 고객 + 서비스 별로 집계를해서 Execution Context 안에 넣는다.
// 두 번째 스텝은 집계된 Execution Context 데이터를 가지고 DB에 write를 한다.
@Bean
public Step preSettleDetailStep(
FlatFileItemReader<ApiOrder> preSettleDetailReader,
PreSettleDetailWriter preSettleDetailWriter,
ExecutionContextPromotionListener executionContextPromotionListener
){
return new StepBuilder("preSettleDetailStep", jobRepository)
.<ApiOrder, Key>chunk(500, platformTransactionManager)
.reader(preSettleDetailReader)
.processor(new PreSettleDetailProcessor())
.writer(preSettleDetailWriter)
.listener(executionContextPromotionListener)
.build();
}
}
ItemReader 부분이다. 이미 생성된 csv파일을 단순히 읽어오는 역할을 수행한다.
@Bean
@StepScope
public FlatFileItemReader<ApiOrder> preSettleDetailReader(
@Value("#{jobParameters['targetDate']}") String targetDate
){
String fileName = targetDate + "_api_orders_csv";
return new FlatFileItemReaderBuilder<ApiOrder>()
.name("preSettleDetailReader")
.resource(new ClassPathResource("/datas/" + fileName))
.linesToSkip(1)
.delimited()
.names("id", "customerId", "url", "state", "createdAt")
.targetType(ApiOrder.class)
.build();
}
targetType을 이용해서 내가 만든 ApiOrder클래스에 데이터들을 매핑시킨다.
ItemProcessor 부분에서 출력값으로 Key를 주기 위해 레코드를 만들었다.
record Key(Long customerId, Long serviceId) implements Serializable {
}
ApiOrder가 실패한 경우에는 null을 반환하여 스킵하도록 하고, 성공한 경우는 url을 받아서 그것의 Id를 얻어서 서비스 Id를 찾고, 고객 Id와 서비스Id를 반환하도록했다.
public class PreSettleDetailProcessor implements ItemProcessor<ApiOrder, Key> {
@Override
public Key process(ApiOrder item) throws Exception {
if(item.getState() == ApiOrder.State.FAIL){
return null;
}
Long serviceId = ServicePolicy.findByUrl(item.getUrl())
.getId();
return new Key(
item.getCustomerId(),
serviceId
);
}
}
인자로 받은 url로 어떤 서비스인지 찾아준다.
public static ServicePolicy findByUrl(String url){
return Arrays.stream(values())
.filter(it -> it.url.equals(url))
.findFirst()
.orElseThrow();
}
그리고 마지막으로 StepExecution에 데이터를 저장할 것이므로, StepExecutionListener를 ItemWriter부분에 재정의 해준다.
단순히 Map 자료구조에 value로 특정 고객의 서비스 이용횟수를 집계하는 로직이다.
public class PreSettleDetailWriter implements ItemWriter<Key>, StepExecutionListener {
private StepExecution stepExecution;
@Override
public void write(Chunk<? extends Key> chunk) throws Exception {
ConcurrentMap<Key, Long> snapshotMap = (ConcurrentMap<Key, Long>) stepExecution.getExecutionContext().get("snapshots");
chunk.forEach(it ->{
snapshotMap.compute(it, (k, v) -> (v==null) ? 1 : v+1);
});
}
@Override
public void beforeStep(StepExecution stepExecution) {
this.stepExecution = stepExecution;
ConcurrentMap<Key, Long> snapshotMap = new ConcurrentHashMap<>();
stepExecution.getExecutionContext().put("snapshots", snapshotMap);
}
}
여기서 사용된 ConcurrentMap이란? 기능적으로 봤을때는 그냥 HashMap이랑 같지만, 멀티스레드에 특화된 자료구조다.
그리고 compute는 chunk를 돌면서 충돌이 날 경우를 대비한 코드인데,
이런식으로 구성되어 있다. 맵의 키를 먼저 넣고, 다음에 새롭게 커스터마이징한 람다함수로 인자를 넣어줬다.
그리고 ExecutionContext에 데이터를 넣기위해서, StepExecutionListener를 상속받았고, step이 실행되기 이전에 stepExecution을 할당하고, 그안에 snapshotMap을 넣어줬다.
2. 두 번째 스텝은 집계된 Execution Context 데이터를 가지고 DB에 write를 한다.
두 번째 스텝
@Bean
public Step settleDetailStep(
SettleDetailReader settleDetailReader,
SettleDetailProcessor settleDetailProcessor,
JpaItemWriter<SettleDetail> settleDetailWriter
){
return new StepBuilder("settleDetailStep", jobRepository)
.<KeyAndCount, SettleDetail>chunk(1000, platformTransactionManager)
.reader(settleDetailReader)
.processor(settleDetailProcessor)
.writer(settleDetailWriter)
.build();
}
ItemReader 부분
@Component
@RequiredArgsConstructor
class SettleDetailReader implements ItemReader<KeyAndCount>, StepExecutionListener {
private Iterator<Map.Entry<Key, Long>> iterator;
@Override
public KeyAndCount read() throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException {
if(iterator.hasNext())
return null;
Map.Entry<Key, Long> map = iterator.next();
return new KeyAndCount(map.getKey(), map.getValue());
}
@Override
public void beforeStep(StepExecution stepExecution) {
JobExecution jobExecution = stepExecution.getJobExecution();
Map<Key, Long> snapshots = (ConcurrentHashMap<Key, Long>) jobExecution.getExecutionContext().get("snapshots");
iterator = snapshots.entrySet().iterator();
}
}
entrySet()함수는 맵을 집합형태로 반환해준다. 여기서 iterator로 가져오고, iterator를 가지고 집합전체를 하나씩 조회한다.
엥? 근데 위 코드를 봤을때 이상한점이 있다.
분명 이전 writer에서 step레벨의 executionContext에 snapshots를 넣었는데, 왜 jobExecution을 가져오지?
Step내에서만 StepExecution이 적용되기때문에 다른 Step에서는 활용을 할 수가없다!
그러니 Job레벨로 올려주는 작업이 필요하다. 스프링은 이러한 리스너도 제공을 하고있다.
@Bean
public ExecutionContextPromotionListener promotionListener(){
ExecutionContextPromotionListener listener = new ExecutionContextPromotionListener();
listener.setKeys(new String[]{"snapshots"});
return listener;
}
설정파일에 리스너를 추가해줌으로써, Job레벨로 올라가게된다. 이제 쓸수있게 되었다!
ItemProcessor
public class SettleDetailProcessor implements ItemProcessor<KeyAndCount, SettleDetail>, StepExecutionListener {
private final DateTimeFormatter dateTimeFormatter = DateTimeFormatter.ofPattern("yyyyMMdd");
private StepExecution stepExecution;
@Override
public SettleDetail process(KeyAndCount item) throws Exception {
Key key = item.key();
ServicePolicy servicePolicy = ServicePolicy.findById(key.serviceId());
Long count = item.count();
String targetDate = stepExecution.getJobParameters().getString("targetDate");
return new SettleDetail(
key.customerId(),
key.serviceId(),
count,
servicePolicy.getFee() * count,
LocalDate.parse(targetDate, dateTimeFormatter)
);
}
@Override
public void beforeStep(StepExecution stepExecution) {
this.stepExecution = stepExecution;
}
}
targetDate를 알기위해서 JobParameters를 얻어왔고, 생성자를 통해 SettleDetail을 반환한다.
당연히 이것도 Step실행전에 stepExecution을 받아와준다.
ItemWriter
@Bean
public JpaItemWriter<SettleDetail> settleDetailWriter(EntityManagerFactory entityManagerFactory){
return new JpaItemWriterBuilder<SettleDetail>()
.entityManagerFactory(entityManagerFactory)
.build();
}
단순하게 DB에 저장할것이므로, JpaItemWriter로 구현했다.
그리고 이제까지 Job을 빼고 Step만 만들어줬기때문에 전체적인 흐름을 구성할 Job을 만든다.
SettleJobConfiguration.java
@Configuration
@RequiredArgsConstructor
public class SettleJobConfiguration {
private final JobRepository jobRepository;
@Bean
public Job settleJob(
Step preSettleDetailStep,
Step settleDetailStep
){
return new JobBuilder("settleJob", jobRepository)
.validator(new DateFormatJobParametersValidator(new String[]{"targetDate"}))
.start(preSettleDetailStep)
.next(settleDetailStep)
.build();
}
}
여기서 validator로 targetDate 파라미터가 내가 원하는 'yyyy-MM-dd' 형식으로 들어오는지 확인하기 위해서, DateFormatJobParametersValidator를 따로 만들어줬다.
DateFormatJobParametersValidator.java
public class DateFormatJobParametersValidator implements JobParametersValidator {
private final DateTimeFormatter dateTimeFormatter = DateTimeFormatter.ofPattern("yyyyMMdd");
private final String[] names;
public DateFormatJobParametersValidator(String[] names) {
this.names = names;
}
@Override
public void validate(JobParameters parameters) throws JobParametersInvalidException {
for(String name : names){
validateDateFormat(parameters, name);
}
}
private void validateDateFormat(JobParameters parameters, String name) throws JobParametersInvalidException {
try{
String string = parameters.getString(name);
LocalDate.parse(Objects.requireNonNull(string), dateTimeFormatter);
}catch (Exception e){
throw new JobParametersInvalidException("yyyyMMdd 형식만을 지원합니다.");
}
}
}
코드는 별거없다, 날짜 형식을 String 배열로 받아와서, 하나씩 순회하며 내가 원한 형식과 맞는지 체크하고 형식이 바르지 않다면, 런타임 예외를 발생시켜주는 것이다.
사실 이렇게 하고 실행을 시켜봤는데, 실행은 성공적으로 되지만 계속해서 DB에 데이터가 들어가지않고, 빈테이블이 나왔다,,,, 5시간정도 걸렸나,, 그렇게 뭐가 문젠지 찾아보려고 스택트레이스와, batch_Step_execution 테이블을 찾아봤다.
역시나 했는데, preSettleDetailStep에서 write한 데이터를 settleDetailStep에서 읽지를 못하는 것이었다. 그래서 settleDetailReader부분을 자세히 보니,
@Override
public KeyAndCount read() throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException {
if(iterator.hasNext())
return null;
Map.Entry<Key, Long> map = iterator.next();
return new KeyAndCount(map.getKey(), map.getValue());
}
이상한 점이 보이는가,,, if문에서 hasNext()함수는 iterator가 다음으로 가리키는 값이 있는가? 에서 있을경우 true를 반환하는 함수인데, 위코드 처럼 해버리면 있는 값에 대해서 계속 null을 반환하게 되어 아무것도 읽어들이지 못한다. 바보같은 짓을했다,, ㅎㅎ 그래도 이번 기회로 이상한점이있을때, 어디를 수정해야할지 보는 감이 생긴것같다.
2023-07-07일, 횟수, 고객ID, 요금, 서비스ID 가 잘출력되는 걸 확인할 수있다!
이렇게 일일 정산 배치 시스템을 구현해봤다.
그런데 만약 읽어 와야할 데이터가 flatfile이 아니라, 다른 DB나 json이면 어떡하냐?
reader부분만 따로 정의해서 파라미터에 갈아 끼워 넣어 주기만하면된다!
다음글은 주간 정산 배치 시스템을 구현해보겠다.
2024.03.31 - [Spring/결제] - Spring으로 결제서비스 만들기(4)
'Spring > 결제' 카테고리의 다른 글
Spring으로 결제서비스 만들기(4) (1) | 2024.03.31 |
---|---|
Spring으로 결제서비스 만들기(2) (0) | 2024.03.27 |
Spring으로 결제서비스 만들기(1) (0) | 2024.03.27 |