비동기로 요청을 처리하면 작업이 순차적으로 이루어지는 것이 아닌 병렬적으로 처리한다는 뜻이다.
Spring에서는 간단히 비동기를 구현할 수 있도록 @Async 어노테이션을 제공한다.
최종적으로는 CompletableFuture와 @Async를 함께 사용하여 N개의 Task를 병렬처리할 것이다.
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;
import org.springframework.aop.interceptor.AsyncUncaughtExceptionHandler;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.AsyncConfigurer;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import com.devlog.common.exception.AsyncExceptionHandler;
@EnableAsync
@Configuration
public class AsyncConfiguration implements AsyncConfigurer{
@Bean(name = "asyncThreadPoolTaskExecutor")
public Executor threadPoolTaskExecutor(){
ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
taskExecutor.setCorePoolSize(5);
taskExecutor.setMaxPoolSize(20);
taskExecutor.setQueueCapacity(100);
taskExecutor.setThreadNamePrefix("asyncThreadPoolTaskExecutor-");
taskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
taskExecutor.initialize();
return taskExecutor;
}
}
AsyncConfigurer 인터페이스를 구현하여 별도의 TaskExecutor를 설정한다.
별도의 TaskExecutor를 설정을 해주지 않으면, SimpleAsyncTaskExecutor가 기본적으로 사용된다.
SimpleAsyncTaskExecutor는 스레드 풀을 사용하지 않고, 매 요청마다 새로운 스레드를 생성해 작업을 수행한다.
따라서 스레드 풀 기반의 TaskExecutor을 사용하도록 설정해야 한다.
명시적으로 적지 않아도 빈으로 등록될 때 initialize()한다.
@EnableAsync
@Configuration
public class AsyncConfiguration implements AsyncConfigurer{
@Override
public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler(){
//Custom Exception
return new AsyncExceptionHandler();
}
}
반환 값이 void인 경우 예외는 호출 스레드에 전달되지 않는다.
getAsyncUncaughtExceptionHandler을 Override 하여 호출 스레드에 전달될 수 있도록 설정한다.
@Slf4j
public class AsyncExceptionHandler implements AsyncUncaughtExceptionHandler{
@Override
public void handleUncaughtException(Throwable ex, Method method, Object... params) {
log.error( ex.getMessage(), ex);
}
}
@RequestMapping("/VoidAsync")
public ResponseEntity<Void> voidAsync(@RequestParam Map<String, Object> paramMap) {
log.info("=============================voidAsync START==============================");
for(int i = 0 ; i < 5 ; i++){
asyncService.voidAsyncMethod(i);
}
log.info("=============================voidAsync END==============================");
return ResponseEntity.ok().build();
}
@Slf4j
@Service
@RequiredArgsConstructor
public class AsyncService {
@Async("asyncThreadPoolTaskExecutor")
public void voidAsyncMethod(int number){
if (number % 2 != 0){
log.info(String.format("[%s] Exception", number));
throw new RuntimeException();
}
log.info("Thread Name : " + Thread.currentThread().getName());
}
}
반환 값이 void인 경우 예외는 호출 스레드에 전달되지 않는다.
getAsyncUncaughtExceptionHandler을 Override 하여 호출 스레드에 전달될 수 있도록 설정한다.
@RequestMapping("/ListenableFutureAsyncMethod")
public ResponseEntity<Void> listenableFutureAsyncMethod(@RequestParam Map<String, Object> paramMap) {
log.info("=============================listenableFutureAsyncMethod START==============================");
for (int i = 1; i <=10; i++){
ListenableFuture<String> listenableFuture = asyncService.listenableFutureAsyncMethod(i);
listenableFuture.addCallback(result -> log.info(result),
error -> error.printStackTrace());
/*
* addCallback(success, failure)
* addCallback(ListenableFutureCallback 구현)
*/
}
log.info("=============================listenableFutureAsyncMethod END==============================");
return ResponseEntity.ok().build();
}
addCallback메서드의 첫 번째 파라미터는 성공 시 실행할 것을, 두 번째 파라미터는 실패 시 실행할 것을 지정해 주면 된다.
@Slf4j
@Service
@RequiredArgsConstructor
public class AsyncService {
@Async("asyncThreadPoolTaskExecutor")
public ListenableFuture<String> listenableFutureAsyncMethod(int number){
if (number % 2 != 0){
throw new RuntimeException();
}
return new AsyncResult<>("성공" + number);
}
}
addCallback 메서드를 사용하여 호출 스레드에서 성공, 실패를 제어할 수 있
@RequestMapping("/CompletableFutureAsyncMethod")
public ResponseEntity<Void> completableFutureAsyncMethod(@RequestParam Map<String, Object> paramMap) {
log.info("=============================completableFutureAsyncMethod START==============================");
List<CompletableFuture<String>> completableFutureList = new ArrayList<CompletableFuture<String>>();
for (int i = 1; i <=30; i++){
CompletableFuture<String> result = asyncService.completableFutureAsyncMethod(i);
completableFutureList.add(result);
}
List<String> resultList = CompletableFuture.allOf(completableFutureList.toArray(new CompletableFuture[completableFutureList.size()]))
.thenApply(result -> completableFutureList.stream()
.map(completableFuture -> completableFuture.join())
.collect(Collectors.toList()))
.join();
log.info(resultList.toString());
log.info("=============================completableFutureAsyncMethod END==============================");
return ResponseEntity.ok().build();
}
allOf 메서드를 사용하면 여러 개의 CompletableFuture을 동시에 실행하고, 모든 작업 결과에 콜백을 실행한다.
하지만, 모든 CompletableFuture의 결과를 결합한 결괏값을 반환할 수 없는 한계가 있다.
join 메서드를 활용하여 allOf 메서드의 한계를 극복할 수 있다.
CompletableFuture의 결과가 사용 가능할 때까지 기다리며 결과가 사용 가능해지면 그 결과를 반환한다.
단, CompletableFuture가 정상적으로 완료되지 않을 경우 UncheckedExecutionException이 발생할 수 있다 는 것을 고려해야 한다.
@Slf4j
@Service
@RequiredArgsConstructor
public class AsyncService {
@Async("asyncThreadPoolTaskExecutor")
public CompletableFuture<String> completableFutureAsyncMethod(int number){
try {
if (number % 2 != 0){
log.info(String.format("[%s] RuntimeException", number));
throw new RuntimeException();
}
return CompletableFuture.completedFuture("성공" + number );
}catch (Exception e) {
return CompletableFuture.completedFuture("실패" + number );
}
}
}
호출 스레드가 모든 Task의 수행결과를 기다린 후 수행되는 것을 START, END 로그로 확인할 수 있다.
댓글 영역