Spring Batch는 엔터프라이즈 시스템의 운영에 있어 대용량 일괄처리의 편의를 위해 설계된 가볍고 포괄적인 배치 프레임워크
아래와 같은 경우 많이 사용한다.
스프링 배치는 계층 구조가 다음과 같이 설계되어 있기 때문에 개발자는 Application 계층의 비즈니스 로직에 집중할 수 있고, 배치의 동작과 관련된 것은 Batch Core에 있는 클래스들을 이용하여 제어할 수 있다.
Job
Job은 배치 처리 과정을 하나의 단위로 만들어 표현한 객체이고 여러 Step 인스턴스를 포함하는 컨테이너이다.
반드시 하나 이상의 Step으로 구성해야한다.
JobInstance
JobInstance는 Job의 실행의 단위. Job을 실행시키게 되면 하나의 JobInstance가 생성된다. 예를 들어 1월 1일 실행, 1월 2일 실행을 하게 되면 각각의 JobInstance가 생성되며 1월 1일 실행한 JobInstance가 실패하여 다시 실행을 시키더라도 이 JobInstance는 1월 1일에 대한 데이터만 처리하게 된다.
JobParameters
JobInstance는 Job의 실행 단위라고 했다. 그렇다면 JonInstance는 어떻게 구별할까? 이는 바로 JobParameters 객체로 구분하게 된다.. JobParameters는 JobInstance 구별 외에도 개발자 JobInstacne에 전달되는 매개변수 역할도 하고 있다.
또한 JobParameters는 String, Double, Long, Date 4가지 형식만을 지원하고 있다.
JobExecution
JobExecution은 JobInstance에 대한 실행 시도에 대한 객체이다. 1월 1일에 실행한 JobInstacne가 실패하여 재실행을 하여도 동일한 JobInstance를 실행시키지만 2번에 실행에 대한 JobExecution은 개별로 생기게 된다. JobExecution는 이러한 JobInstance 실행에 대한 상태, 시작시간, 종료시간, 생성시간 등의 정보를 담고 있다
ExecutionContext
ExecutionContext란 Job에서 데이터를 공유할 수 있는 데이터 저장소이다. Spring Batch에서 제공하는 ExecutionContext는 JobExecutionContext, StepExecutionContext 두 가지 종류가 있으나 이 두 가지는 지정되는 범위가 다르다.
ExecutionContext를 통해 Step 간 Data 공유가 가능하며 Job 실패 시 ExecutionContext를 통한 마지막 실행 값을 재구성할 수 있습니다.
JobRepository
JobRepository는 위에서 말한 모든 배치 처리 정보를 담고 있다. Job이 실행되게 되면 JobRepository에 JobExecution과 StepExecution을 생성하게 되며 JobRepository에서 Execution 정보들을 저장하고 조회하며 사용하게 된다.
JobLauncher
Job을 실행하는 역할을 담당한다. Job.execute을 호출하는 역할이다.
Job의 재실행 가능 여부 검증, 잡의 실행 방법, 파라미터 유효성 검증 등을 수행한다.
스프링 부트의 환경에서는 부트가 Job을 시작하는 기능을 제공하므로, 일반적으로 직접 다룰 필요가 없는 컴포넌트다.
Job을 실행하면 해당 JOB은 각 Step을 실행한다. 각 스텝이 실행되면 JobRepository는 현재 상태로 갱신된다.
Step
Batch Job을 구성하는 독립적인 하나의 단계로 스프링 배치에서 가장 일반적인 상태 단위이다.
모든 Job은 하나 이상의 Step으로 구성되며, 배치 작업을 어떻게 구성하고 실행할 것인지 Job의 세부 작업을 Task 기반으로 설정하고 명세해놓은 객체이다.
Step에는 Tasklet, Chunk 기반으로 2가지가 있다.
StepExecution
StepExecution은 JobExecution과 동일하게 Step 실행 시도에 대한 객체를 나타낸다. 하지만 Job이 여러 개의 Step으로 구성되어 있을 경우 이전 단계의 Step이 실패하게 되면 다음 단계가 실행되지 않음으로 실패 이후 StepExecution은 생성되지 않는다. StepExecution 또한 JobExecution과 동일하게 실제 시작이 될 때만 생성된다. StepExecution에는 JobExecution에 저장되는 정보 외에 read 수, write 수, commit 수, skip 수 등의 정보들도 저장이 된다.
Tasklet
Step이 중지될 때까지 execute 메서드가 계속 반복해서 수행하고 수행할 때마다 독립적인 트랜잭션이 얻어진다. 초기화, 저장 프로시저 실행, 알림 전송과 같은 잡에서 일반적으로 사용된다.
Chunk
한 번에 하나씩 데이터(row)를 읽어 Chunk라는 덩어리를 만든 뒤, Chunk 단위로 트랜잭션을 다루는 것
Chunk 단위로 트랜잭션을 수행하기 때문에 실패할 경우엔 해당 Chunk 만큼만 롤백이 되고, 이전에 커밋된 트랜잭션 범위까지는 반영이 된다.
Chunk 기반 Step은 ItemReader, ItemProcessor, ItemWriter라는 3개의 주요 부분으로 구성될 수 있다.
ItemReader, ItemProcessor, ItemWriter
Chunk 모델을 구현하면서 데이터의 입력/처리/출력 3가지 프로세스로 분할하기 위한 인터페이스로 데이터 IO를 담당하는 ItemReader와 ItemWriter는 데이터 베이스와 파일을 Java객체 컨버팅을 제공하기에 Spring Batch를 사용하는 것으로 충분히 컨버팅이 가능하다. ItemProcessor는 입력 확인 및 비즈니스 로직을 구현한다.
일반적으로 스프링 배치는 대용량 데이터를 다루는 경우가 많기 때문에 Tasklet보다 상대적으로 트랜잭션의 단위를 짧게 하여 처리할 수 있는 ItemReader, ItemProcessor, ItemWriter를 이용한 Chunk 지향 프로세싱을 이용한다.
작업로직이 Chunk 지향 처리를 하기에는 너무 간단하거나, 부자연스러운경우 Tasklet을 사용하는 것이 좋다.
@EnableBatchProcessing
@SpringBootApplication
public class SpringBatchApplication {
public static void main(String[] args) {
SpringApplication.run(SpringBatchApplication.class, args);
}
}
@EnableBatchProcessing
Batch Infrastructure를 위한 대부분의 스프링 빈 정의를 제공한다. 그렇기에 아래 컴포넌트를 직접 포함시킬 필요가 없다.
- JobRepository : 실행 중인 잡의 상태를 기록하는 데 사용
- JobLauncher : 잡을 구동 하는 데 사용
- JobExplorer : JobRepository를 사용해 읽기 전용 작업을 수행하는 데 사용
- JobRegistry : 특정한 런처 구현체를 사용할 때 잡을 찾는 용도로 사용
- PlaformTransactionManager : 잡 진행 과정에서 트랜잭션을 다루는 데 사용
- JobBuilderFactory : 잡을 생성하는 빌더
- StepBuilderFactory : 스텝을 생성하는 빌더
@Configuration // Spring Batch의 모든 Job은 @Configuration으로 등록해서 사용해야 한다.
public class HelloJobConfiguration {
private final Logger logger = LoggerFactory.getLogger(this.getClass());
@Autowired
private JobBuilderFactory jobBuilderFactory; // Job 빌더 생성용
@Autowired
private StepBuilderFactory stepBuilderFactory; // Step 빌더 생성용
/*
* JobBuilderFactory를 통해서 helloJob을 생성
*/
@Bean
public Job helloJob() {
return jobBuilderFactory.get("helloJob") // helloJob이라는 이름으로 Job을 생성한다.
.start(helloStep())
.next(helloNextStep())
.build();
}
/*
* StepBuilderFactory를 통해서 helloStep을 생성
*/
@Bean
public Step helloStep() {
return stepBuilderFactory.get("helloStep") //helloStep이라는 이름으로 Step을 생성한다.
.tasklet((contribution, chunkContext) -> {
logger.debug("hello Step");
return RepeatStatus.FINISHED;
})
.build();
}
/*
* StepBuilderFactory를 통해서 helloNextStep을 생성
*/
@Bean
public Step helloNextStep() {
return stepBuilderFactory.get("helloNextStep") //helloNextStep이라는 이름으로 Step을 생성한다.
.tasklet(new HelloNextTasklet()) // Tasklet 설정
.build();
}
}
public class HelloNextTasklet implements Tasklet{
private final Logger logger = LoggerFactory.getLogger(this.getClass());
@Override
public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
logger.debug("excute HelloNext tasklet");
return RepeatStatus.FINISHED;
}
}
API
@Configuration
public class JobFlowStepConfiguration {
private final Logger logger = LoggerFactory.getLogger(this.getClass());
@Autowired
private JobBuilderFactory jobBuilderFactory;
@Autowired
private StepBuilderFactory stepBuilderFactory;
@Bean
public Job jobFlow(){
Job exampleJob = jobBuilderFactory.get("jobFlow")
.start(startStep())
.on("FAILED").to(failOverStep()) //startStep의 ExitStatus가 FAILED일 경우 failOverStep을 실행 시킨다.
.on("*").to(writeStep()) //failOverStep의 결과와 상관없이 writeStep을 실행 시킨다.
.on("*").end() //writeStep의 결과와 상관없이 Flow를 종료시킨다.
.from(startStep())
.on("COMPLETED").to(processStep()) //startStep이 FAILED가 아니고 COMPLETED일 경우 processStep을 실행 시킨다
.on("*").to(writeStep()) //processStep의 결과와 상관없이 writeStep을 실행 시킨다.
.on("*").end() //writeStep의 결과와 상관없이 Flow를 종료시킨다.
.from(startStep()).on("*") //startStep의 결과가 FAILED, COMPLETED가 아닌 모든 경우
.to(writeStep()) //write Step을 실행시킨다.
.on("*").end() //writeStep의 결과와 상관없이 Flow를 종료시킨다.
.end()
.build();
return exampleJob;
}
@Bean
public Step startStep() {
return stepBuilderFactory.get("startStep")
.tasklet((contribution, chunkContext) -> {
logger.debug("======================Start Step START======================");
logger.debug("Start Step!");
logger.debug("======================Start Step END======================");
String result = "COMPLETED";
//String result = "FAIL";
//String result = "UNKNOWN";
//Flow에서 on은 RepeatStatus가 아닌 ExitStatus를 바라본다.
if(result.equals("COMPLETED"))
contribution.setExitStatus(ExitStatus.COMPLETED);
else if(result.equals("FAIL"))
contribution.setExitStatus(ExitStatus.FAILED);
else if(result.equals("UNKNOWN"))
contribution.setExitStatus(ExitStatus.UNKNOWN);
return RepeatStatus.FINISHED;
})
.build();
}
@Bean
public Step failOverStep(){
return stepBuilderFactory.get("nextStep")
.tasklet((contribution, chunkContext) -> {
logger.debug("======================FailOver Step START======================");
logger.debug("FailOver Step!");
logger.debug("======================FailOver Step END======================");
return RepeatStatus.FINISHED;
})
.build();
}
@Bean
public Step processStep(){
return stepBuilderFactory.get("processStep")
.tasklet((contribution, chunkContext) -> {
logger.debug("======================Process Step START======================");
logger.debug("Process Step!");
logger.debug("======================Process Step END======================");
return RepeatStatus.FINISHED;
})
.build();
}
@Bean
public Step writeStep(){
return stepBuilderFactory.get("writeStep")
.tasklet((contribution, chunkContext) -> {
logger.debug("======================Process Step START======================");
logger.debug("Write Step!");
logger.debug("======================Process Step END======================");
return RepeatStatus.FINISHED;
})
.build();
}
}
spring.batch.job.enabled : true #실행시 Spring Batch 자동 실행 옵션
Schedule을 등록하지 않았기 때문에 applcation 설정 파일에 값을 추가하여 테스트하도록 하자.
CREATE TABLE BATCH_JOB_INSTANCE (
JOB_INSTANCE_ID BIGINT NOT NULL PRIMARY KEY ,
VERSION BIGINT ,
JOB_NAME VARCHAR(100) NOT NULL,
JOB_KEY VARCHAR(32) NOT NULL,
constraint JOB_INST_UN unique (JOB_NAME, JOB_KEY)
) ENGINE=InnoDB;
CREATE TABLE BATCH_JOB_EXECUTION (
JOB_EXECUTION_ID BIGINT NOT NULL PRIMARY KEY ,
VERSION BIGINT ,
JOB_INSTANCE_ID BIGINT NOT NULL,
CREATE_TIME DATETIME NOT NULL,
START_TIME DATETIME DEFAULT NULL ,
END_TIME DATETIME DEFAULT NULL ,
STATUS VARCHAR(10) ,
EXIT_CODE VARCHAR(2500) ,
EXIT_MESSAGE VARCHAR(2500) ,
LAST_UPDATED DATETIME,
JOB_CONFIGURATION_LOCATION VARCHAR(2500) NULL,
constraint JOB_INST_EXEC_FK foreign key (JOB_INSTANCE_ID)
references BATCH_JOB_INSTANCE(JOB_INSTANCE_ID)
) ENGINE=InnoDB;
CREATE TABLE BATCH_JOB_EXECUTION_PARAMS (
JOB_EXECUTION_ID BIGINT NOT NULL ,
TYPE_CD VARCHAR(6) NOT NULL ,
KEY_NAME VARCHAR(100) NOT NULL ,
STRING_VAL VARCHAR(250) ,
DATE_VAL DATETIME DEFAULT NULL ,
LONG_VAL BIGINT ,
DOUBLE_VAL DOUBLE PRECISION ,
IDENTIFYING CHAR(1) NOT NULL ,
constraint JOB_EXEC_PARAMS_FK foreign key (JOB_EXECUTION_ID)
references BATCH_JOB_EXECUTION(JOB_EXECUTION_ID)
) ENGINE=InnoDB;
CREATE TABLE BATCH_STEP_EXECUTION (
STEP_EXECUTION_ID BIGINT NOT NULL PRIMARY KEY ,
VERSION BIGINT NOT NULL,
STEP_NAME VARCHAR(100) NOT NULL,
JOB_EXECUTION_ID BIGINT NOT NULL,
START_TIME DATETIME NOT NULL ,
END_TIME DATETIME DEFAULT NULL ,
STATUS VARCHAR(10) ,
COMMIT_COUNT BIGINT ,
READ_COUNT BIGINT ,
FILTER_COUNT BIGINT ,
WRITE_COUNT BIGINT ,
READ_SKIP_COUNT BIGINT ,
WRITE_SKIP_COUNT BIGINT ,
PROCESS_SKIP_COUNT BIGINT ,
ROLLBACK_COUNT BIGINT ,
EXIT_CODE VARCHAR(2500) ,
EXIT_MESSAGE VARCHAR(2500) ,
LAST_UPDATED DATETIME,
constraint JOB_EXEC_STEP_FK foreign key (JOB_EXECUTION_ID)
references BATCH_JOB_EXECUTION(JOB_EXECUTION_ID)
) ENGINE=InnoDB;
CREATE TABLE BATCH_STEP_EXECUTION_CONTEXT (
STEP_EXECUTION_ID BIGINT NOT NULL PRIMARY KEY,
SHORT_CONTEXT VARCHAR(2500) NOT NULL,
SERIALIZED_CONTEXT TEXT ,
constraint STEP_EXEC_CTX_FK foreign key (STEP_EXECUTION_ID)
references BATCH_STEP_EXECUTION(STEP_EXECUTION_ID)
) ENGINE=InnoDB;
CREATE TABLE BATCH_JOB_EXECUTION_CONTEXT (
JOB_EXECUTION_ID BIGINT NOT NULL PRIMARY KEY,
SHORT_CONTEXT VARCHAR(2500) NOT NULL,
SERIALIZED_CONTEXT TEXT ,
constraint JOB_EXEC_CTX_FK foreign key (JOB_EXECUTION_ID)
references BATCH_JOB_EXECUTION(JOB_EXECUTION_ID)
) ENGINE=InnoDB;
CREATE TABLE BATCH_STEP_EXECUTION_SEQ (
ID BIGINT NOT NULL,
UNIQUE_KEY CHAR(1) NOT NULL,
constraint UNIQUE_KEY_UN unique (UNIQUE_KEY)
) ENGINE=InnoDB;
INSERT INTO BATCH_STEP_EXECUTION_SEQ (ID, UNIQUE_KEY) select * from (select 0 as ID, '0' as UNIQUE_KEY) as tmp where not exists(select * from BATCH_STEP_EXECUTION_SEQ);
CREATE TABLE BATCH_JOB_EXECUTION_SEQ (
ID BIGINT NOT NULL,
UNIQUE_KEY CHAR(1) NOT NULL,
constraint UNIQUE_KEY_UN unique (UNIQUE_KEY)
) ENGINE=InnoDB;
INSERT INTO BATCH_JOB_EXECUTION_SEQ (ID, UNIQUE_KEY) select * from (select 0 as ID, '0' as UNIQUE_KEY) as tmp where not exists(select * from BATCH_JOB_EXECUTION_SEQ);
CREATE TABLE BATCH_JOB_SEQ (
ID BIGINT NOT NULL,
UNIQUE_KEY CHAR(1) NOT NULL,
constraint UNIQUE_KEY_UN unique (UNIQUE_KEY)
) ENGINE=InnoDB;
INSERT INTO BATCH_JOB_SEQ (ID, UNIQUE_KEY) select * from (select 0 as ID, '0' as UNIQUE_KEY) as tmp where not exists(select * from BATCH_JOB_SEQ);
Spring Batch - Scheduler 등록 (0) | 2022.10.14 |
---|---|
Spring Batch - Step 데이터 공유(Excution Context) (0) | 2022.10.13 |
Spring Batch - 메타 테이블 (0) | 2022.10.12 |
Spring Batch - Chunk 지향 프로세싱(Mybatis) (0) | 2022.10.11 |
댓글 영역