当前位置: 首页 > news >正文

@EnableAsync+@Async源码学习笔记之六

接上文,我们本文分析 AsyncExecutionAspectSupport 的源码:


package org.springframework.aop.interceptor;import java.lang.reflect.Method;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.Future;import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;import org.springframework.beans.factory.BeanFactory;
import org.springframework.beans.factory.BeanFactoryAware;
import org.springframework.beans.factory.NoSuchBeanDefinitionException;
import org.springframework.beans.factory.NoUniqueBeanDefinitionException;
import org.springframework.beans.factory.annotation.BeanFactoryAnnotationUtils;
import org.springframework.core.task.AsyncListenableTaskExecutor;
import org.springframework.core.task.AsyncTaskExecutor;
import org.springframework.core.task.TaskExecutor;
import org.springframework.core.task.support.TaskExecutorAdapter;
import org.springframework.lang.Nullable;
import org.springframework.util.ReflectionUtils;
import org.springframework.util.StringUtils;
import org.springframework.util.concurrent.ListenableFuture;/*** Base class for asynchronous method execution aspects, such as* {@code org.springframework.scheduling.annotation.AnnotationAsyncExecutionInterceptor}* or {@code org.springframework.scheduling.aspectj.AnnotationAsyncExecutionAspect}.** <p>Provides support for <i>executor qualification</i> on a method-by-method basis.* {@code AsyncExecutionAspectSupport} objects must be constructed with a default {@code* Executor}, but each individual method may further qualify a specific {@code Executor}* bean to be used when executing it, e.g. through an annotation attribute.** @author Chris Beams* @author Juergen Hoeller* @author Stephane Nicoll* @since 3.1.2*/
public abstract class AsyncExecutionAspectSupport implements BeanFactoryAware {/*** The default name of the {@link TaskExecutor} bean to pick up: "taskExecutor".* <p>Note that the initial lookup happens by type; this is just the fallback* in case of multiple executor beans found in the context.* @since 4.2.6*/public static final String DEFAULT_TASK_EXECUTOR_BEAN_NAME = "taskExecutor";protected final Log logger = LogFactory.getLog(getClass());private final Map<Method, AsyncTaskExecutor> executors = new ConcurrentHashMap<>(16);@Nullableprivate volatile Executor defaultExecutor;private AsyncUncaughtExceptionHandler exceptionHandler;@Nullableprivate BeanFactory beanFactory;/*** Create a new instance with a default {@link AsyncUncaughtExceptionHandler}.* @param defaultExecutor the {@code Executor} (typically a Spring {@code AsyncTaskExecutor}* or {@link java.util.concurrent.ExecutorService}) to delegate to, unless a more specific* executor has been requested via a qualifier on the async method, in which case the* executor will be looked up at invocation time against the enclosing bean factory*/public AsyncExecutionAspectSupport(@Nullable Executor defaultExecutor) {this(defaultExecutor, new SimpleAsyncUncaughtExceptionHandler());}/*** Create a new {@link AsyncExecutionAspectSupport} with the given exception handler.* @param defaultExecutor the {@code Executor} (typically a Spring {@code AsyncTaskExecutor}* or {@link java.util.concurrent.ExecutorService}) to delegate to, unless a more specific* executor has been requested via a qualifier on the async method, in which case the* executor will be looked up at invocation time against the enclosing bean factory* @param exceptionHandler the {@link AsyncUncaughtExceptionHandler} to use*/public AsyncExecutionAspectSupport(@Nullable Executor defaultExecutor, AsyncUncaughtExceptionHandler exceptionHandler) {this.defaultExecutor = defaultExecutor;this.exceptionHandler = exceptionHandler;}/*** Supply the executor to be used when 
executing async methods.* @param defaultExecutor the {@code Executor} (typically a Spring {@code AsyncTaskExecutor}* or {@link java.util.concurrent.ExecutorService}) to delegate to, unless a more specific* executor has been requested via a qualifier on the async method, in which case the* executor will be looked up at invocation time against the enclosing bean factory* @see #getExecutorQualifier(Method)* @see #setBeanFactory(BeanFactory)* @see #getDefaultExecutor(BeanFactory)*/public void setExecutor(Executor defaultExecutor) {this.defaultExecutor = defaultExecutor;}/*** Supply the {@link AsyncUncaughtExceptionHandler} to use to handle exceptions* thrown by invoking asynchronous methods with a {@code void} return type.*/public void setExceptionHandler(AsyncUncaughtExceptionHandler exceptionHandler) {this.exceptionHandler = exceptionHandler;}/*** Set the {@link BeanFactory} to be used when looking up executors by qualifier* or when relying on the default executor lookup algorithm.* @see #findQualifiedExecutor(BeanFactory, String)* @see #getDefaultExecutor(BeanFactory)*/@Overridepublic void setBeanFactory(BeanFactory beanFactory) {this.beanFactory = beanFactory;}/*** Determine the specific executor to use when executing the given method.* Should preferably return an {@link AsyncListenableTaskExecutor} implementation.* @return the executor to use (or {@code null}, but just if no default executor is available)*/@Nullableprotected AsyncTaskExecutor determineAsyncExecutor(Method method) {AsyncTaskExecutor executor = this.executors.get(method);if (executor == null) {Executor targetExecutor;String qualifier = getExecutorQualifier(method);if (StringUtils.hasLength(qualifier)) {targetExecutor = findQualifiedExecutor(this.beanFactory, qualifier);}else {System.out.println("使用默认的执行器");targetExecutor = this.defaultExecutor;if (targetExecutor == null) {synchronized (this.executors) {if (this.defaultExecutor == null) {this.defaultExecutor = 
getDefaultExecutor(this.beanFactory);}targetExecutor = this.defaultExecutor;}}}if (targetExecutor == null) {return null;}executor = (targetExecutor instanceof AsyncListenableTaskExecutor ?(AsyncListenableTaskExecutor) targetExecutor : new TaskExecutorAdapter(targetExecutor));this.executors.put(method, executor);}return executor;}/*** Return the qualifier or bean name of the executor to be used when executing the* given async method, typically specified in the form of an annotation attribute.* Returning an empty string or {@code null} indicates that no specific executor has* been specified and that the {@linkplain #setExecutor(Executor) default executor}* should be used.* @param method the method to inspect for executor qualifier metadata* @return the qualifier if specified, otherwise empty String or {@code null}* @see #determineAsyncExecutor(Method)* @see #findQualifiedExecutor(BeanFactory, String)*/@Nullableprotected abstract String getExecutorQualifier(Method method);/*** Retrieve a target executor for the given qualifier.* @param qualifier the qualifier to resolve* @return the target executor, or {@code null} if none available* @since 4.2.6* @see #getExecutorQualifier(Method)*/@Nullableprotected Executor findQualifiedExecutor(@Nullable BeanFactory beanFactory, String qualifier) {if (beanFactory == null) {throw new IllegalStateException("BeanFactory must be set on " + getClass().getSimpleName() +" to access qualified executor '" + qualifier + "'");}return BeanFactoryAnnotationUtils.qualifiedBeanOfType(beanFactory, Executor.class, qualifier);}/*** Retrieve or build a default executor for this advice instance.* An executor returned from here will be cached for further use.* <p>The default implementation searches for a unique {@link TaskExecutor} bean* in the context, or for an {@link Executor} bean named "taskExecutor" otherwise.* If neither of the two is resolvable, this implementation will return {@code null}.* @param beanFactory the BeanFactory to use for a 
default executor lookup* @return the default executor, or {@code null} if none available* @since 4.2.6* @see #findQualifiedExecutor(BeanFactory, String)* @see #DEFAULT_TASK_EXECUTOR_BEAN_NAME*/@Nullableprotected Executor getDefaultExecutor(@Nullable BeanFactory beanFactory) {if (beanFactory != null) {try {// Search for TaskExecutor bean... not plain Executor since that would// match with ScheduledExecutorService as well, which is unusable for// our purposes here. TaskExecutor is more clearly designed for it.return beanFactory.getBean(TaskExecutor.class);}catch (NoUniqueBeanDefinitionException ex) {logger.debug("Could not find unique TaskExecutor bean", ex);try {return beanFactory.getBean(DEFAULT_TASK_EXECUTOR_BEAN_NAME, Executor.class);}catch (NoSuchBeanDefinitionException ex2) {if (logger.isInfoEnabled()) {logger.info("More than one TaskExecutor bean found within the context, and none is named " +"'taskExecutor'. Mark one of them as primary or name it 'taskExecutor' (possibly " +"as an alias) in order to use it for async processing: " + ex.getBeanNamesFound());}}}catch (NoSuchBeanDefinitionException ex) {logger.debug("Could not find default TaskExecutor bean", ex);try {return beanFactory.getBean(DEFAULT_TASK_EXECUTOR_BEAN_NAME, Executor.class);}catch (NoSuchBeanDefinitionException ex2) {logger.info("No task executor bean found for async processing: " +"no bean of type TaskExecutor and no bean named 'taskExecutor' either");}// Giving up -> either using local default executor or none at all...}}return null;}/*** Delegate for actually executing the given task with the chosen executor.* @param task the task to execute* @param executor the chosen executor* @param returnType the declared return type (potentially a {@link Future} variant)* @return the execution result (potentially a corresponding {@link Future} handle)*/@Nullableprotected Object doSubmit(Callable<Object> task, AsyncTaskExecutor executor, Class<?> returnType) {if 
(CompletableFuture.class.isAssignableFrom(returnType)) {return CompletableFuture.supplyAsync(() -> {try {System.out.println("最终调用方法的地方");return task.call();}catch (Throwable ex) {throw new CompletionException(ex);}}, executor);}else if (ListenableFuture.class.isAssignableFrom(returnType)) {System.out.println("最终调用方法的地方");return ((AsyncListenableTaskExecutor) executor).submitListenable(task);}else if (Future.class.isAssignableFrom(returnType)) {System.out.println("最终调用方法的地方");return executor.submit(task);}else {System.out.println("最终调用方法的地方");executor.submit(task);return null;}}/*** Handles a fatal error thrown while asynchronously invoking the specified* {@link Method}.* <p>If the return type of the method is a {@link Future} object, the original* exception can be propagated by just throwing it at the higher level. However,* for all other cases, the exception will not be transmitted back to the client.* In that later case, the current {@link AsyncUncaughtExceptionHandler} will be* used to manage such exception.* @param ex the exception to handle* @param method the method that was invoked* @param params the parameters used to invoke the method*/protected void handleError(Throwable ex, Method method, Object... params) throws Exception {if (Future.class.isAssignableFrom(method.getReturnType())) {ReflectionUtils.rethrowException(ex);}else {// Could not transmit the exception to the caller with default executortry {this.exceptionHandler.handleUncaughtException(ex, method, params);}catch (Throwable ex2) {logger.error("Exception handler for async method '" + method.toGenericString() +"' threw unexpected exception itself", ex2);}}}}

我们主要看 2 个方法:determineAsyncExecutor 和 doSubmit

  • 先看 determineAsyncExecutor
/**
 * Resolve the executor that should run the given async method.
 * <p>A previously resolved executor is served from the per-method cache. Otherwise the
 * method's qualifier decides between a qualified lookup and the default executor, and the
 * outcome is adapted to an {@link AsyncTaskExecutor} and cached.
 * @param method the async method about to be invoked
 * @return the executor to use, or {@code null} if none could be resolved
 */
@Nullable
protected AsyncTaskExecutor determineAsyncExecutor(Method method) {
    AsyncTaskExecutor cached = this.executors.get(method);
    if (cached != null) {
        return cached;
    }

    Executor resolved;
    String qualifier = getExecutorQualifier(method);
    if (!StringUtils.hasLength(qualifier)) {
        System.out.println("使用默认的执行器");
        resolved = this.defaultExecutor;
        if (resolved == null) {
            // Re-check under the lock before running the default lookup.
            synchronized (this.executors) {
                if (this.defaultExecutor == null) {
                    this.defaultExecutor = getDefaultExecutor(this.beanFactory);
                }
                resolved = this.defaultExecutor;
            }
        }
    }
    else {
        resolved = findQualifiedExecutor(this.beanFactory, qualifier);
    }

    if (resolved == null) {
        return null;
    }

    AsyncTaskExecutor adapted;
    if (resolved instanceof AsyncListenableTaskExecutor) {
        adapted = (AsyncListenableTaskExecutor) resolved;
    }
    else {
        adapted = new TaskExecutorAdapter(resolved);
    }
    this.executors.put(method, adapted);
    return adapted;
}

首先,调用 getExecutorQualifier 方法,作用是获取线程池的名字,这是个抽象方法,子类 AnnotationAsyncExecutionInterceptor 中有实现,我们在前面的文章也讲过了,这里再简单看下源码:

/**
 * Return the executor qualifier declared through {@code @Async} for the given method.
 * <p>The annotation on the method itself is consulted first; only when it is absent is
 * the annotation on the declaring class used.
 * @param method the method to inspect
 * @return the {@code value} of the annotation that was found, or {@code null} if neither
 * the method nor its declaring class carries {@code @Async}
 */
@Override
@Nullable
protected String getExecutorQualifier(Method method) {
    // Maintainer's note: changes made here should also be made in
    // AnnotationAsyncExecutionAspect#getExecutorQualifier
    Async async = AnnotatedElementUtils.findMergedAnnotation(method, Async.class);
    if (async == null) {
        async = AnnotatedElementUtils.findMergedAnnotation(method.getDeclaringClass(), Async.class);
    }
    System.out.println("找 @Async 注解。先找方法上的。找不到的话再找类上的。也就是说方法上的@Async注解的优先级要高于类上的。");
    return (async != null ? async.value() : null);
}

然后,调用 findQualifiedExecutor 方法,作用是根据上一步找到的线程池的名字,从容器中找对应的线程池,如下:

/**
 * Look up the executor bean that matches the given qualifier.
 * @param beanFactory the bean factory to search; must not be {@code null} here
 * @param qualifier the qualifier (or bean name) to resolve
 * @return the matching {@link Executor}
 * @throws IllegalStateException if no bean factory is available
 */
@Nullable
protected Executor findQualifiedExecutor(@Nullable BeanFactory beanFactory, String qualifier) {
    if (beanFactory != null) {
        return BeanFactoryAnnotationUtils.qualifiedBeanOfType(beanFactory, Executor.class, qualifier);
    }
    String owner = getClass().getSimpleName();
    throw new IllegalStateException(
            "BeanFactory must be set on " + owner + " to access qualified executor '" + qualifier + "'");
}

接着,如果 @Async 没有指定线程池的名字(即 qualifier 为空),就使用默认的线程池,代码如下,这里用到的是双重检查锁定(double-checked locking):

// First read of the default executor field, taken without holding the lock.
targetExecutor = this.defaultExecutor;
// Still unset: lock on the executors map, re-check the field, and only then call
// getDefaultExecutor(beanFactory). The result is stored back into the field; if that
// result is null the field stays null, so the lookup runs again on the next call.
if (targetExecutor == null) {synchronized (this.executors) {if (this.defaultExecutor == null) {this.defaultExecutor = getDefaultExecutor(this.beanFactory);}targetExecutor = this.defaultExecutor;}
}

重点看下这个 getDefaultExecutor 方法,源码如下:

@Nullable
protected Executor getDefaultExecutor(@Nullable BeanFactory beanFactory) {if (beanFactory != null) {try {// Search for TaskExecutor bean... not plain Executor since that would// match with ScheduledExecutorService as well, which is unusable for// our purposes here. TaskExecutor is more clearly designed for it.return beanFactory.getBean(TaskExecutor.class);}catch (NoUniqueBeanDefinitionException ex) {logger.debug("Could not find unique TaskExecutor bean", ex);try {return beanFactory.getBean(DEFAULT_TASK_EXECUTOR_BEAN_NAME, Executor.class);}catch (NoSuchBeanDefinitionException ex2) {if (logger.isInfoEnabled()) {logger.info("More than one TaskExecutor bean found within the context, and none is named " +"'taskExecutor'. Mark one of them as primary or name it 'taskExecutor' (possibly " +"as an alias) in order to use it for async processing: " + ex.getBeanNamesFound());}}}catch (NoSuchBeanDefinitionException ex) {logger.debug("Could not find default TaskExecutor bean", ex);try {return beanFactory.getBean(DEFAULT_TASK_EXECUTOR_BEAN_NAME, Executor.class);}catch (NoSuchBeanDefinitionException ex2) {logger.info("No task executor bean found for async processing: " +"no bean of type TaskExecutor and no bean named 'taskExecutor' either");}// Giving up -> either using local default executor or none at all...}}return null;
}

代码看着挺长的,其实逻辑很简单:先按类型 TaskExecutor 从容器中查找;如果找不到,或者找到了多个,再按名字查找,也就是下面这一句:

// Name-based lookup: request the bean registered under DEFAULT_TASK_EXECUTOR_BEAN_NAME as an Executor.
beanFactory.getBean(DEFAULT_TASK_EXECUTOR_BEAN_NAME, Executor.class);

再看下默认的线程池的名字 DEFAULT_TASK_EXECUTOR_BEAN_NAME 的定义:

/**
 * The default name of the {@link TaskExecutor} bean to pick up: "taskExecutor".
 * <p>Note that the initial lookup happens by type; this is just the fallback
 * in case of multiple executor beans found in the context.
 * @since 4.2.6
 */
public static final String DEFAULT_TASK_EXECUTOR_BEAN_NAME = "taskExecutor";

看到了吧,默认的用于执行异步任务的线程池的名字就是 taskExecutor
接着往下看:

// Nothing was resolved: return null without caching anything.
if (targetExecutor == null) {return null;
}
// Use the executor as-is when it already is an AsyncListenableTaskExecutor;
// otherwise wrap it in a TaskExecutorAdapter.
executor = (targetExecutor instanceof AsyncListenableTaskExecutor ?(AsyncListenableTaskExecutor) targetExecutor : new TaskExecutorAdapter(targetExecutor));
// Cache the result under this method so later calls skip the lookup.
this.executors.put(method, executor);

如果没找到线程池,就返回 null ;如果找到了,判断类型是不是 AsyncListenableTaskExecutor ,是的话就直接使用,不是的话就用 TaskExecutorAdapter 包一下,最后把结果缓存到 executors 中。这个地方不细讲了。有兴趣可以自己研究下 AsyncListenableTaskExecutor 和 TaskExecutorAdapter

  • 然后就是看 doSubmit 方法了
/**
 * Hand the given task to the chosen executor, shaping the result after the declared return type.
 * @param task the task to execute
 * @param executor the executor that runs the task
 * @param returnType the declared return type of the async method
 * @return a {@link CompletableFuture}, {@link ListenableFuture} or {@link Future} handle
 * matching {@code returnType}, or {@code null} for any other return type
 */
@Nullable
protected Object doSubmit(Callable<Object> task, AsyncTaskExecutor executor, Class<?> returnType) {
    if (CompletableFuture.class.isAssignableFrom(returnType)) {
        return CompletableFuture.supplyAsync(() -> {
            try {
                System.out.println("最终调用方法的地方");
                return task.call();
            }
            catch (Throwable failure) {
                // supplyAsync cannot propagate checked exceptions, so wrap them.
                throw new CompletionException(failure);
            }
        }, executor);
    }

    if (ListenableFuture.class.isAssignableFrom(returnType)) {
        System.out.println("最终调用方法的地方");
        AsyncListenableTaskExecutor listenableExecutor = (AsyncListenableTaskExecutor) executor;
        return listenableExecutor.submitListenable(task);
    }

    if (Future.class.isAssignableFrom(returnType)) {
        System.out.println("最终调用方法的地方");
        return executor.submit(task);
    }

    // Any other return type: submit the task and hand nothing back.
    System.out.println("最终调用方法的地方");
    executor.submit(task);
    return null;
}

这里边对方法声明的返回值类型进行了判断,根据返回值类型的不同,走不同的分支,不管哪个分支,最终都是把任务交给了线程池。
至此,本系列文章结束。

相关文章:

  • 【自动化测试框架】什么是对象层?
  • [密码学基础]密码学常用名词深度解析:从基础概念到实战应用
  • npm 常用操作和配置
  • 国产GPU生态现状评估:从寒武纪到壁仞的编程适配挑战
  • DeepSeek与Napkin:信息可视化领域的创新利器
  • 安徽合肥京东自营代运营如何突围?
  • CSRF 请求伪造Referer 同源置空配合 XSSToken 值校验复用删除
  • 第3章 垃圾收集器与内存分配策略《深入理解Java虚拟机:JVM高级特性与最佳实践(第3版)》
  • FPGA练习———DDS波形发生器
  • Linux419 三次握手四次挥手抓包 wireshark
  • Dubbo(65)如何实现Dubbo的服务文档生成?
  • ThingsBoard3.9.1 MQTT Topic(3)
  • Python实现对目标Word文档进行自动化排版【4万字精讲】(14)
  • 玩转Docker | 使用Docker部署tududi任务管理工具
  • 【深度学习—李宏毅教程笔记】Transformer
  • verilog float mult
  • 详细的PyCharm安装教程
  • Java学习手册:Web 应用架构概述
  • B端APP设计:打破传统限制,为企业开启便捷新通道
  • 电脑 访问 github提示 找不到网页,处理方案
  • 郑州卫健委通报郑飞医院“血液净化”问题:拟撤销该院血液净化技术备案
  • 中央民族乐团团长赵聪已任文旅部艺术司司长
  • 俄乌互指对方未遵守复活节临时停火提议
  • 数智时代出版专业技能人才培养研讨会在沪举行
  • 解除近70家煤电厂有毒物质排放限制,特朗普能重振煤炭吗?
  • 解放日报头版:再出发再创业,浦东以开放拥抱世界