상세 컨텐츠

본문 제목

Spring Batch - Chunk 지향 프로세싱(Mybatis)

Spring/Spring Batch

by Chan.94 2022. 10. 11. 08:08

본문

반응형

Step

일반적으로 Spring Batch는 대용량 데이터를 다루는 경우가 많기 때문에 Tasklet보다 상대적으로 트랜잭션의 단위를 짧게 하여 처리할 수 있는 ItemReader, ItemProcessor, ItemWriter를 이용한 Chunk 지향 프로세싱을 이용한다.

작업 로직이 Chunk 지향 처리를 하기에는 너무 간단하거나, 부 자연스러운 경우 Tasklet을 사용하는 것이 좋다.

 

Chunk
한 번에 하나씩 데이터(row)를 읽어 Chunk라는 덩어리를 만든 뒤, Chunk 단위로 트랜잭션을 다루는 것
Chunk 단위로 트랜잭션을 수행하기 때문에 실패할 경우엔 해당 Chunk 만큼만 롤백이 되고, 이전에 커밋된 트랜잭션 범위까지는 반영이 된다.
Chunk 기반 Step은 ItemReader, ItemProcessor, ItemWriter라는 3개의 주요 부분으로 구성될 수 있다.

ItemReader, ItemProcessor, ItemWriter
Chunk 모델을 구현하면서 데이터의 입력/처리/출력 3가지 프로세스로 분할하기 위한 인터페이스로 데이터 IO를 담당하는 ItemReader와 ItemWriter는 데이터 베이스와 파일을 Java객체 컨버팅을 제공하기에 Spring Batch를 사용하는 것으로 충분히 컨버팅이 가능하다. ItemProcessor는 입력 확인 및 비즈니스 로직을 구현한다. 

  • 읽기(Read) — Database에서 배치 처리를 할 Data를 읽어온다.
  • 처리(Processing) — 읽어온 Data를 가공, 처리를 한다. (필수사항X)
  • 쓰기(Write) — 가공, 처리한 데이터를 Database에 저장한다.

 

Spring Batch에는 다양한 ItemReader와 ItemWriter가 존재한다.

대용량 배치 처리를 하게 되면 Item을 읽어 올 때 Paging 처리를 하는 게 효과적이다.

Spring Batch Reader에서는 이러한 Paging 처리를 지원하고 있다. 또한 적절한 Paging처리와 Chunk Size(한 번에 처리될 트랜잭션)를 설정하여 더욱 효과적인 배치 처리를 할 수 있다.

 

Paging Size와 Chunk Size

Paging Size가 10이며 Chunk Size가 20일 경우 2번의 Read가 이루어진 후에 1번의 Transaction이 수행된다.

한 번의 Transaction을 위해 2번의 쿼리 수행이 발생하게 되는 것이다.

적절한 Paging Size와 Chunk Size에 대해 Spring Batch에는 다음과 같이 적혀 있다.

Setting a fairly large page size and using a commit interval that matches the page size should provide better performance.
페이지 크기를 상당히 크게 설정하고 페이지 크기와 일치하는 커밋 간격을 사용하면 성능이 향상됩니다.

한번의 Read 쿼리 수행 시 1번의 Transaction을 위해 두 설정의 값을 일치를 시키는 게 가장 좋은 성능 향상 방법이며 특별한 이유가 없는 한 Paging Size와 Chunk Size를 동일하게 설정하는 것을 추천한다.

 

PagingReader 사용 시 필수 사항

  • SQL에 Order By 지정
  • SQL에 Offset과 Limit 지정 

 

Chunk 지향 프로세싱의 장점

Chunk 지향 프로세싱을 사용하지 않는다 하더라도 개발자가 충분히 비슷한 로직으로 구현을 할 수도 있다.

하지만 Chunk 지향 프로세싱은 단순히 Chunk 단위의 트랜잭션만 제공해주는 것은 아니다.
내결함성 (Falut tolernat)을 위한 다양한 기능들을 제공하고 있다는 것이다.

 

레코드 건너뛰기(Skip)

public Step step() throws Exception {
        return stepBuilderFactory.get("ChunkStep")
        		//<Input Type, Output Type>
                .<UserVO,UserVO>chunk(CHUNK_AND_PAGE_SIZE)
                .reader(reader())
                .processor(processor())
                .writer(writer())
                .faultTolerant()
                .skip(Exception.class)
                .noSkip(IllegalAccessException.class)
                .skipLimit(10)
                .listener(loggingChunkListener)
                .build();
}

입력에서 레코드를 읽는 중에 에러가 발생했을 때는 몇 가지 선택지가 존재한다. 먼저 예외를 던져 처리를 멈추는 것이다.

이 방법은 너무 극단적이다. 1만 명에 사람에게 만원씩 입금해야 한다고 했을 때 1000번째에서 예외가 발생하였다고 생각해보자. 나머지 9000명은 돈을 받을 수 없다. 

 

Spring Batch는 그 대신 특정 예외가 발생했을 때 레코드를 건너뛰는 Skip 기능을 제공한다. 

 

레코드를 건너뛸지 여부를 결정할 때 고려해야 할 두 가지 요소가 있다. 
어떤 조건에서 레코드를 건너뛸 것인가, 특히 어떤 예외를 무시할 것인가이다. 
레코드를 건너뛰려면 어떤 예외를 건너뛰게 할지, 몇 번까지 예외를 허용할지 설정하기만 하면 된다.

 

건너뛰고 싶은 예외보다는 건너뛰고 싶지 않은 예외를 설정하는 구성이 더 간편할 수도 있다.

 

소스에 대한 해석은 다음과 같다.

Exception을 Skip 하고 IllegalAccessException은 Skip하지 않는다. 즉, IllegalAccessException만 Skip하지 않는다는 뜻이다. IllegalAccessException을 제외한 Exception을 상속한 모든 예외를 10번까지 건너뛸 수 있음을 나타낸다.

 

.listener(loggingChunkListener)

 

 

@Component
public class LoggingChunkListener implements ChunkListener {
	private final Logger logger = LoggerFactory.getLogger(this.getClass());

	//Chunk 실행 전
	@Override
	public void beforeChunk(ChunkContext context) {
		StepContext stepContext = context.getStepContext();
		StepExecution stepExecution = stepContext.getStepExecution();
		logger.debug("###### beforeChunk : " + stepExecution.getReadCount());
	}

	//Chunk 실행 후
	@Override
	public void afterChunk(ChunkContext context) {	
		StepContext stepContext = context.getStepContext();
		StepExecution stepExecution = stepContext.getStepExecution();
		logger.debug("###### afterChunk : " + stepExecution.getCommitCount());
	}

	//Chunk 수행 중 Error가 발생했을시
	@Override
	public void afterChunkError(ChunkContext context) {
		StepContext stepContext = context.getStepContext();
		StepExecution stepExecution = stepContext.getStepExecution();
		logger.debug("##### afterChunkError : " + stepExecution.getRollbackCount());
	}

}

listener를 통해 Chunk별로 Log를 출력할 수 있다. afterChunkError메소드를 활용하면 예외처리가 발생한 Chunk들의 정보를 출력할 수 있을 것이다.


Example (Mybatis)

@Configuration
public class ExampleJobChunkConfiguration {

	@Autowired
	private JobBuilderFactory jobBuilderFactory;
	@Autowired
    private StepBuilderFactory stepBuilderFactory;
    @Autowired
    public SqlSessionFactory sqlSessionFactory;
    
    private int CHUNK_SIZE = 2;
    
    @Bean
    public Job exampleJobChunk() throws Exception {
        
        Job exampleJob = jobBuilderFactory.get("ExampleJobChunk")
                .start(step())
                .build();

        return exampleJob;
    }

    @Bean
    @JobScope
    public Step step() throws Exception {
        return stepBuilderFactory.get("ChunkStep")
        		//<Input Type, Output Type>
                .<UserVO,UserVO>chunk(CHUNK_SIZE)
                .reader(reader())
                .processor(processor())
                .writer(writer())
                .build();
    }

    @Bean
    @StepScope
    public MyBatisPagingItemReader<UserVO> reader() throws Exception {
		/*
		 * Paging 처리 시 OrderBy는 필수
		 */
    	MyBatisPagingItemReader<UserVO> reader = new MyBatisPagingItemReader<>();
    	reader.setPageSize(CHUNK_SIZE);
    	reader.setSqlSessionFactory(sqlSessionFactory);
    	reader.setQueryId("Mapper.id");
    	
    	return reader;
    }

    @Bean
    @StepScope
    public ItemProcessor<UserVO, UserVO> processor(){

    	return new ItemProcessor<UserVO, UserVO>() {
            @Override
            public UserVO process(UserVO model) throws Exception {

            	model.setNick(model.getNick() + "1");

                return model;
            }
        };
    }

    @Bean
    @StepScope
    public MyBatisBatchItemWriter<UserVO> writer(){
    	MyBatisBatchItemWriter<UserVO> writer = new MyBatisBatchItemWriter<>();
    	writer.setSqlSessionFactory(sqlSessionFactory);
    	writer.setStatementId("mapper.id");
    	return writer;
    }

}

 

<select id="chunkSelectTest" resultType="VO경로">
	SELECT
		...
	FROM
		...
	ORDER BY COLUMN1, COLUMN2, ...
	LIMIT #{_skiprows}, #{_pagesize}
</select>

SQL에는 정렬을 해야 한다. Paging처리를 할 때마다 SQL이 실행되기 때문에 순서가 보장되어야 하기 때문이다.

또한 Offset과 Limit을 지정해야 Paging처리를 하여 순차적으로 조회할 수 있다.

 

@JobScope, @StepScope

Chunk 지향 처리 Example을 확인하면 @JobScope와 @StepScope Annotation을 확인할 수 있다.

@JobScope는 Step 선언문에 사용 가능하며 @StepScope는 Step을 구성하는 ItemReader, ItemProcessor, ItemWriter에 사용이 가능합니다.

@JobScope와 @StepScope는 Singleton 패턴이 아닌 Annotation이 명시된 메소드의 실행 시점에 Bean이 생성되게 된다. 또한 @JobScope와 @StepScope Bean이 생성될 때 JobParameter가 생성되기 때문에 JobParameter 사용하기 위해선 반드시 Scope를 지정해주어야 한다. 이는 LateBinding을 하여 JobParameter를 비즈니스 로직 단계에서 할당하여 보다 유연한 설계를 가능하게 하고 서로 다른 Step이 서로를 침범하지 않고 병렬로 실행되게 하기 위함입니다.

 


 

반응형

관련글 더보기

댓글 영역

>