@Async
アノテーションとは
非同期に処理を実行できるようにする仕組み。
参考 7. Task Execution and Scheduling
参考 Spring MVC(+Spring Boot)上での非同期リクエストを理解する -前編-
簡単なサンプル
pom.xml
<parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.0.2.RELEASE</version> <relativePath/> <!-- lookup parent from repository --> </parent>
非同期にする処理に@Async
を付与する。(@Async
はメソッド単位でもクラス単位でも付与できる)
@Slf4j @Component class HeavyJob { @Async public void execute() { log.info("before heavy task"); try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } log.info("after heavy task"); } }
@EnableAsync
で、非同期処理を有効化する。
@EnableAsync
アノテーションをつけると有効になる仕組みは、前に書きました。
参照 アノテーションを処理するには
@EnableAsync @SpringBootApplication @Slf4j @RequiredArgsConstructor public class DemoApplication { public static void main(String[] args) { SpringApplication.run(DemoApplication.class, args); } private final HeavyJob heavyJob; @Bean public CommandLineRunner getCommandLineRunner() { return args -> { log.info("before heavyJob.execute()"); heavyJob.execute(); log.info("after heavyJob.execute()"); }; } }
すると、以下のように別スレッドで実行される。
2018-06-02 13:20:17.559 INFO 16620 --- [ main] com.example.demo.DemoApplication : before heavyJob.execute() 2018-06-02 13:20:17.561 INFO 16620 --- [ main] .s.a.AnnotationAsyncExecutionInterceptor : No task executor bean found for async processing: no bean of type TaskExecutor and no bean named 'taskExecutor' either 2018-06-02 13:20:17.562 INFO 16620 --- [ main] com.example.demo.DemoApplication : after heavyJob.execute() 2018-06-02 13:20:17.564 INFO 16620 --- [cTaskExecutor-1] com.example.demo.HeavyJob : before heavy task 2018-06-02 13:20:18.564 INFO 16620 --- [cTaskExecutor-1] com.example.demo.HeavyJob : after heavy task
注意点
スレッドの生成
デフォルトのスレッド生成クラスはSimpleAsyncTaskExecutorで、要求ごとにスレッドを生成する。そのため、ThreadPoolTaskExecutorなどを利用して、スレッドを生成しすぎないように、また、再利用するように設定すべき。
参考 7.2.1. TaskExecutor types
参考 Spring MVC(+Spring Boot)上での非同期リクエストを理解する -前編-
例外ハンドリング
デフォルトだと、@Async
メソッドで発生した例外は例外トレースを出力するだけ。
@Slf4j @Component @RequiredArgsConstructor class HeavyJob { @Async public void execute() { log.info("before exception"); throw new RuntimeException("oops"); } }
以下のように、例外トレースが出力される。
2018-06-02 13:23:19.098 INFO 19120 --- [ main] com.example.demo.DemoApplication : before heavyJob.execute() 2018-06-02 13:23:19.100 INFO 19120 --- [ main] .s.a.AnnotationAsyncExecutionInterceptor : No task executor bean found for async processing: no bean of type TaskExecutor and no bean named 'taskExecutor' either 2018-06-02 13:23:19.102 INFO 19120 --- [ main] com.example.demo.DemoApplication : after heavyJob.execute() 2018-06-02 13:23:19.104 INFO 19120 --- [cTaskExecutor-1] com.example.demo.HeavyJob : before exception 2018-06-02 13:23:19.107 ERROR 19120 --- [cTaskExecutor-1] .a.i.SimpleAsyncUncaughtExceptionHandler : Unexpected error occurred invoking async method 'public void com.example.demo.HeavyJob.execute()'. java.lang.RuntimeException: oops at com.example.demo.HeavyJob.execute(HeavyJob.java:16) ~[classes/:na] at com.example.demo.HeavyJob$$FastClassBySpringCGLIB$$bbd7ff10.invoke(<generated>) ~[classes/:na] at org.springframework.cglib.proxy.MethodProxy.invoke(MethodProxy.java:204) ~[spring-core-5.0.6.RELEASE.jar:5.0.6.RELEASE] at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.invokeJoinpoint(CglibAopProxy.java:746) ~[spring-aop-5.0.6.RELEASE.jar:5.0.6.RELEASE] at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163) ~[spring-aop-5.0.6.RELEASE.jar:5.0.6.RELEASE] at org.springframework.aop.interceptor.AsyncExecutionInterceptor.lambda$invoke$0(AsyncExecutionInterceptor.java:115) ~[spring-aop-5.0.6.RELEASE.jar:5.0.6.RELEASE] at org.springframework.aop.interceptor.AsyncExecutionInterceptor$$Lambda$313/1027200.call(Unknown Source) [spring-aop-5.0.6.RELEASE.jar:5.0.6.RELEASE] at java.util.concurrent.FutureTask.run(FutureTask.java:266) [na:1.8.0_25] at java.lang.Thread.run(Thread.java:745) [na:1.8.0_25]
独自のハンドリングを入れたければ、AsyncUncaughtExceptionHandler を実装する。
@Slf4j @Configuration public class MyAsyncConfig extends AsyncConfigurerSupport { @Override public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() { return (ex, method, params) -> { log.info("-----------------"); log.info("handle exception"); log.info("-----------------"); }; } }
2018-06-02 13:25:07.260 INFO 9428 --- [ main] com.example.demo.DemoApplication : before heavyJob.execute() 2018-06-02 13:25:07.262 INFO 9428 --- [ main] .s.a.AnnotationAsyncExecutionInterceptor : No task executor bean found for async processing: no bean of type TaskExecutor and no bean named 'taskExecutor' either 2018-06-02 13:25:07.263 INFO 9428 --- [ main] com.example.demo.DemoApplication : after heavyJob.execute() 2018-06-02 13:25:07.265 INFO 9428 --- [cTaskExecutor-1] com.example.demo.HeavyJob : before exception 2018-06-02 13:25:07.265 INFO 9428 --- [cTaskExecutor-1] com.example.demo.MyAsyncConfig : ----------------- 2018-06-02 13:25:07.265 INFO 9428 --- [cTaskExecutor-1] com.example.demo.MyAsyncConfig : handle exception 2018-06-02 13:25:07.265 INFO 9428 --- [cTaskExecutor-1] com.example.demo.MyAsyncConfig : -----------------
スレッドローカルへのアクセス
@Async
を付けたメソッドは別スレッドで実行されるため、スレッドローカルで管理している値は参照できない。
例えば、@RequestScope
のBeanや@SessionScope
のBeanはDIできないし、RequestContextHolderも利用できない。
どうしても処理で使いたいなら、メソッド引数として渡す必要がある。
2018-06-02 13:30:24.631 ERROR 17784 --- [cTaskExecutor-1] .a.i.SimpleAsyncUncaughtExceptionHandler : Unexpected error occurred invoking async method 'public void com.example.demo.HeavyJob.execute()'. org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'scopedTarget.user': Scope 'request' is not active for the current thread; consider defining a scoped proxy for this bean if you intend to refer to it from a singleton; nested exception is java.lang.IllegalStateException: No thread-bound request found: Are you referring to request attributes outside of an actual web request, or processing a request outside of the originally receiving thread? If you are actually operating within a web request and still receive this message, your code is probably running outside of DispatcherServlet/DispatcherPortlet: In this case, use RequestContextListener or RequestContextFilter to expose the current request. at org.springframework.beans.factory.support.AbstractBeanFactory.doGetBean(AbstractBeanFactory.java:362) ~[spring-beans-5.0.6.RELEASE.jar:5.0.6.RELEASE] at org.springframework.beans.factory.support.AbstractBeanFactory.getBean(AbstractBeanFactory.java:199) ~[spring-beans-5.0.6.RELEASE.jar:5.0.6.RELEASE] at org.springframework.aop.target.SimpleBeanTargetSource.getTarget(SimpleBeanTargetSource.java:35) ~[spring-aop-5.0.6.RELEASE.jar:5.0.6.RELEASE] at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:672) ~[spring-aop-5.0.6.RELEASE.jar:5.0.6.RELEASE] at com.example.demo.User$$EnhancerBySpringCGLIB$$da7d348c.getName(<generated>) ~[classes/:na] at com.example.demo.HeavyJob.execute(HeavyJob.java:19) ~[classes/:na] at com.example.demo.HeavyJob$$FastClassBySpringCGLIB$$bbd7ff10.invoke(<generated>) ~[classes/:na] at org.springframework.cglib.proxy.MethodProxy.invoke(MethodProxy.java:204) ~[spring-core-5.0.6.RELEASE.jar:5.0.6.RELEASE] at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.invokeJoinpoint(CglibAopProxy.java:746) ~[spring-aop-5.0.6.RELEASE.jar:5.0.6.RELEASE] at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163) ~[spring-aop-5.0.6.RELEASE.jar:5.0.6.RELEASE] at org.springframework.aop.interceptor.AsyncExecutionInterceptor.lambda$invoke$0(AsyncExecutionInterceptor.java:115) ~[spring-aop-5.0.6.RELEASE.jar:5.0.6.RELEASE] at org.springframework.aop.interceptor.AsyncExecutionInterceptor$$Lambda$321/1810268094.call(Unknown Source) [spring-aop-5.0.6.RELEASE.jar:5.0.6.RELEASE] at java.util.concurrent.FutureTask.run(FutureTask.java:266) [na:1.8.0_25] at java.lang.Thread.run(Thread.java:745) [na:1.8.0_25] Caused by: java.lang.IllegalStateException: No thread-bound request found: Are you referring to request attributes outside of an actual web request, or processing a request outside of the originally receiving thread? If you are actually operating within a web request and still receive this message, your code is probably running outside of DispatcherServlet/DispatcherPortlet: In this case, use RequestContextListener or RequestContextFilter to expose the current request. at org.springframework.web.context.request.RequestContextHolder.currentRequestAttributes(RequestContextHolder.java:131) ~[spring-web-5.0.6.RELEASE.jar:5.0.6.RELEASE] at org.springframework.web.context.request.AbstractRequestAttributesScope.get(AbstractRequestAttributesScope.java:42) ~[spring-web-5.0.6.RELEASE.jar:5.0.6.RELEASE] at org.springframework.beans.factory.support.AbstractBeanFactory.doGetBean(AbstractBeanFactory.java:350) ~[spring-beans-5.0.6.RELEASE.jar:5.0.6.RELEASE] ... 13 common frames omitted
トランザクション
DataSourceTransactionManagerはスレッドごとにJDBCコネクションを管理するため、@Async
でトランザクションが分かれる。
参考 DataSourceTransactionManager
Binds a JDBC Connection from the specified DataSource to the current thread, potentially allowing for one thread-bound Connection per DataSource. Note: The DataSource that this transaction manager operates on needs to return independent Connections. The Connections may come from a pool (the typical case), but the DataSource must not return thread-scoped / request-scoped Connections or the like. This transaction manager will associate Connections with thread-bound transactions itself, according to the specified propagation behavior. It assumes that a separate, independent Connection can be obtained even during an ongoing transaction.
検証する
pom.xml
<parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.0.2.RELEASE</version> <relativePath/> <!-- lookup parent from repository --> </parent>
- 「タスクを登録する」ボタンを押すと、サーバ側でDBに0-10件のランダムなタスクを登録する。
- サーバ側では、
@Async
アノテーションをつけたクラスを用意し、非同期にタスクを消化する。 - 非同期処理を開始したら、クライアントにレスポンスをとりあえず返す
- タスクは1sごとに1件消化し、その都度コミットする。
- dbはh2を利用する。
- h2のデフォルトのisolationレベルは「READ COMMITTED」
- Txがコミットされたタイミングで、別のTxから参照可能になる
- Springの
@Transactional
のpropagationはREQUIRED- Txが既に開始されていれば、それに参加する
- JDBCコネクションの管理がスレッドごとなら、別Txになるはず
したがって、画面から見たときにリアルタイムに更新されれば、Txが呼び出し元とは分かれていると言えるはず。
ソースコード
登録と非同期更新部分を抜粋。全ソースコードはgithubを参照してください。
参考 github
@Slf4j @RestController @AllArgsConstructor @RequestMapping("api/tasks") public class TaskRestController { final TaskService service; ... @PostMapping public Task execute() { Task task = service.register(); service.execute(task.getId()); return task; } }
非同期に、1sごとに1タスクを消化する。
@Slf4j @Service @AllArgsConstructor public class TaskServiceImpl implements TaskService { ... @Override @Transactional public void execute(int id) { Task task = taskMapper.findOne(id); log.info(task.toString()); // 1sごとにtaskを1こずつ消化していく while (task.getDone() < task.getAmount()) { try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } service.execute(task); } } }
メソッドに@Transactional
をつけ、親Txに参加するかどうか調べる。
@Slf4j @Service @AllArgsConstructor public class ExecuteServiceImpl implements ExecuteService { final TaskMapper taskMapper; @Override @Async @Transactional public void execute(Task task) { task.setDone(task.getDone() + 1); taskMapper.update(task); log.info(task.toString()); } }
検証結果
@Async
でTxが分かれる。