@EnableAsync+@Async源码学习笔记之六
接上文,我们本文分析 AsyncExecutionAspectSupport
的源码:
package org.springframework.aop.interceptor;import java.lang.reflect.Method;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.Future;import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;import org.springframework.beans.factory.BeanFactory;
import org.springframework.beans.factory.BeanFactoryAware;
import org.springframework.beans.factory.NoSuchBeanDefinitionException;
import org.springframework.beans.factory.NoUniqueBeanDefinitionException;
import org.springframework.beans.factory.annotation.BeanFactoryAnnotationUtils;
import org.springframework.core.task.AsyncListenableTaskExecutor;
import org.springframework.core.task.AsyncTaskExecutor;
import org.springframework.core.task.TaskExecutor;
import org.springframework.core.task.support.TaskExecutorAdapter;
import org.springframework.lang.Nullable;
import org.springframework.util.ReflectionUtils;
import org.springframework.util.StringUtils;
import org.springframework.util.concurrent.ListenableFuture;/*** Base class for asynchronous method execution aspects, such as* {@code org.springframework.scheduling.annotation.AnnotationAsyncExecutionInterceptor}* or {@code org.springframework.scheduling.aspectj.AnnotationAsyncExecutionAspect}.** <p>Provides support for <i>executor qualification</i> on a method-by-method basis.* {@code AsyncExecutionAspectSupport} objects must be constructed with a default {@code* Executor}, but each individual method may further qualify a specific {@code Executor}* bean to be used when executing it, e.g. through an annotation attribute.** @author Chris Beams* @author Juergen Hoeller* @author Stephane Nicoll* @since 3.1.2*/
public abstract class AsyncExecutionAspectSupport implements BeanFactoryAware {/*** The default name of the {@link TaskExecutor} bean to pick up: "taskExecutor".* <p>Note that the initial lookup happens by type; this is just the fallback* in case of multiple executor beans found in the context.* @since 4.2.6*/public static final String DEFAULT_TASK_EXECUTOR_BEAN_NAME = "taskExecutor";protected final Log logger = LogFactory.getLog(getClass());private final Map<Method, AsyncTaskExecutor> executors = new ConcurrentHashMap<>(16);@Nullableprivate volatile Executor defaultExecutor;private AsyncUncaughtExceptionHandler exceptionHandler;@Nullableprivate BeanFactory beanFactory;/*** Create a new instance with a default {@link AsyncUncaughtExceptionHandler}.* @param defaultExecutor the {@code Executor} (typically a Spring {@code AsyncTaskExecutor}* or {@link java.util.concurrent.ExecutorService}) to delegate to, unless a more specific* executor has been requested via a qualifier on the async method, in which case the* executor will be looked up at invocation time against the enclosing bean factory*/public AsyncExecutionAspectSupport(@Nullable Executor defaultExecutor) {this(defaultExecutor, new SimpleAsyncUncaughtExceptionHandler());}/*** Create a new {@link AsyncExecutionAspectSupport} with the given exception handler.* @param defaultExecutor the {@code Executor} (typically a Spring {@code AsyncTaskExecutor}* or {@link java.util.concurrent.ExecutorService}) to delegate to, unless a more specific* executor has been requested via a qualifier on the async method, in which case the* executor will be looked up at invocation time against the enclosing bean factory* @param exceptionHandler the {@link AsyncUncaughtExceptionHandler} to use*/public AsyncExecutionAspectSupport(@Nullable Executor defaultExecutor, AsyncUncaughtExceptionHandler exceptionHandler) {this.defaultExecutor = defaultExecutor;this.exceptionHandler = exceptionHandler;}/*** Supply the executor to be used when executing async methods.* @param defaultExecutor the {@code Executor} (typically a Spring {@code AsyncTaskExecutor}* or {@link java.util.concurrent.ExecutorService}) to delegate to, unless a more specific* executor has been requested via a qualifier on the async method, in which case the* executor will be looked up at invocation time against the enclosing bean factory* @see #getExecutorQualifier(Method)* @see #setBeanFactory(BeanFactory)* @see #getDefaultExecutor(BeanFactory)*/public void setExecutor(Executor defaultExecutor) {this.defaultExecutor = defaultExecutor;}/*** Supply the {@link AsyncUncaughtExceptionHandler} to use to handle exceptions* thrown by invoking asynchronous methods with a {@code void} return type.*/public void setExceptionHandler(AsyncUncaughtExceptionHandler exceptionHandler) {this.exceptionHandler = exceptionHandler;}/*** Set the {@link BeanFactory} to be used when looking up executors by qualifier* or when relying on the default executor lookup algorithm.* @see #findQualifiedExecutor(BeanFactory, String)* @see #getDefaultExecutor(BeanFactory)*/@Overridepublic void setBeanFactory(BeanFactory beanFactory) {this.beanFactory = beanFactory;}/*** Determine the specific executor to use when executing the given method.* Should preferably return an {@link AsyncListenableTaskExecutor} implementation.* @return the executor to use (or {@code null}, but just if no default executor is available)*/@Nullableprotected AsyncTaskExecutor determineAsyncExecutor(Method method) {AsyncTaskExecutor executor = this.executors.get(method);if (executor == null) {Executor targetExecutor;String qualifier = getExecutorQualifier(method);if (StringUtils.hasLength(qualifier)) {targetExecutor = findQualifiedExecutor(this.beanFactory, qualifier);}else {System.out.println("使用默认的执行器");targetExecutor = this.defaultExecutor;if (targetExecutor == null) {synchronized (this.executors) {if (this.defaultExecutor == null) {this.defaultExecutor = getDefaultExecutor(this.beanFactory);}targetExecutor = this.defaultExecutor;}}}if (targetExecutor == null) {return null;}executor = (targetExecutor instanceof AsyncListenableTaskExecutor ?(AsyncListenableTaskExecutor) targetExecutor : new TaskExecutorAdapter(targetExecutor));this.executors.put(method, executor);}return executor;}/*** Return the qualifier or bean name of the executor to be used when executing the* given async method, typically specified in the form of an annotation attribute.* Returning an empty string or {@code null} indicates that no specific executor has* been specified and that the {@linkplain #setExecutor(Executor) default executor}* should be used.* @param method the method to inspect for executor qualifier metadata* @return the qualifier if specified, otherwise empty String or {@code null}* @see #determineAsyncExecutor(Method)* @see #findQualifiedExecutor(BeanFactory, String)*/@Nullableprotected abstract String getExecutorQualifier(Method method);/*** Retrieve a target executor for the given qualifier.* @param qualifier the qualifier to resolve* @return the target executor, or {@code null} if none available* @since 4.2.6* @see #getExecutorQualifier(Method)*/@Nullableprotected Executor findQualifiedExecutor(@Nullable BeanFactory beanFactory, String qualifier) {if (beanFactory == null) {throw new IllegalStateException("BeanFactory must be set on " + getClass().getSimpleName() +" to access qualified executor '" + qualifier + "'");}return BeanFactoryAnnotationUtils.qualifiedBeanOfType(beanFactory, Executor.class, qualifier);}/*** Retrieve or build a default executor for this advice instance.* An executor returned from here will be cached for further use.* <p>The default implementation searches for a unique {@link TaskExecutor} bean* in the context, or for an {@link Executor} bean named "taskExecutor" otherwise.* If neither of the two is resolvable, this implementation will return {@code null}.* @param beanFactory the BeanFactory to use for a default executor lookup* @return the default executor, or {@code null} if none available* @since 4.2.6* @see #findQualifiedExecutor(BeanFactory, String)* @see #DEFAULT_TASK_EXECUTOR_BEAN_NAME*/@Nullableprotected Executor getDefaultExecutor(@Nullable BeanFactory beanFactory) {if (beanFactory != null) {try {// Search for TaskExecutor bean... not plain Executor since that would// match with ScheduledExecutorService as well, which is unusable for// our purposes here. TaskExecutor is more clearly designed for it.return beanFactory.getBean(TaskExecutor.class);}catch (NoUniqueBeanDefinitionException ex) {logger.debug("Could not find unique TaskExecutor bean", ex);try {return beanFactory.getBean(DEFAULT_TASK_EXECUTOR_BEAN_NAME, Executor.class);}catch (NoSuchBeanDefinitionException ex2) {if (logger.isInfoEnabled()) {logger.info("More than one TaskExecutor bean found within the context, and none is named " +"'taskExecutor'. Mark one of them as primary or name it 'taskExecutor' (possibly " +"as an alias) in order to use it for async processing: " + ex.getBeanNamesFound());}}}catch (NoSuchBeanDefinitionException ex) {logger.debug("Could not find default TaskExecutor bean", ex);try {return beanFactory.getBean(DEFAULT_TASK_EXECUTOR_BEAN_NAME, Executor.class);}catch (NoSuchBeanDefinitionException ex2) {logger.info("No task executor bean found for async processing: " +"no bean of type TaskExecutor and no bean named 'taskExecutor' either");}// Giving up -> either using local default executor or none at all...}}return null;}/*** Delegate for actually executing the given task with the chosen executor.* @param task the task to execute* @param executor the chosen executor* @param returnType the declared return type (potentially a {@link Future} variant)* @return the execution result (potentially a corresponding {@link Future} handle)*/@Nullableprotected Object doSubmit(Callable<Object> task, AsyncTaskExecutor executor, Class<?> returnType) {if (CompletableFuture.class.isAssignableFrom(returnType)) {return CompletableFuture.supplyAsync(() -> {try {System.out.println("最终调用方法的地方");return task.call();}catch (Throwable ex) {throw new CompletionException(ex);}}, executor);}else if (ListenableFuture.class.isAssignableFrom(returnType)) {System.out.println("最终调用方法的地方");return ((AsyncListenableTaskExecutor) executor).submitListenable(task);}else if (Future.class.isAssignableFrom(returnType)) {System.out.println("最终调用方法的地方");return executor.submit(task);}else {System.out.println("最终调用方法的地方");executor.submit(task);return null;}}/*** Handles a fatal error thrown while asynchronously invoking the specified* {@link Method}.* <p>If the return type of the method is a {@link Future} object, the original* exception can be propagated by just throwing it at the higher level. However,* for all other cases, the exception will not be transmitted back to the client.* In that later case, the current {@link AsyncUncaughtExceptionHandler} will be* used to manage such exception.* @param ex the exception to handle* @param method the method that was invoked* @param params the parameters used to invoke the method*/protected void handleError(Throwable ex, Method method, Object... params) throws Exception {if (Future.class.isAssignableFrom(method.getReturnType())) {ReflectionUtils.rethrowException(ex);}else {// Could not transmit the exception to the caller with default executortry {this.exceptionHandler.handleUncaughtException(ex, method, params);}catch (Throwable ex2) {logger.error("Exception handler for async method '" + method.toGenericString() +"' threw unexpected exception itself", ex2);}}}}
我们主要看2个方法:determineAsyncExecutor
和 doSubmit
。
- 先看
determineAsyncExecutor
@Nullable
protected AsyncTaskExecutor determineAsyncExecutor(Method method) {AsyncTaskExecutor executor = this.executors.get(method);if (executor == null) {Executor targetExecutor;String qualifier = getExecutorQualifier(method);if (StringUtils.hasLength(qualifier)) {targetExecutor = findQualifiedExecutor(this.beanFactory, qualifier);}else {System.out.println("使用默认的执行器");targetExecutor = this.defaultExecutor;if (targetExecutor == null) {synchronized (this.executors) {if (this.defaultExecutor == null) {this.defaultExecutor = getDefaultExecutor(this.beanFactory);}targetExecutor = this.defaultExecutor;}}}if (targetExecutor == null) {return null;}executor = (targetExecutor instanceof AsyncListenableTaskExecutor ?(AsyncListenableTaskExecutor) targetExecutor : new TaskExecutorAdapter(targetExecutor));this.executors.put(method, executor);}return executor;
}
首先,调用 getExecutorQualifier
方法,作用是获取线程池的名字,这是个抽象方法,子类 AnnotationAsyncExecutionInterceptor
中有实现,我们在前面的文章也讲过了,这里再简单看下源码:
@Override
@Nullable
protected String getExecutorQualifier(Method method) {// Maintainer's note: changes made here should also be made in// AnnotationAsyncExecutionAspect#getExecutorQualifierAsync async = AnnotatedElementUtils.findMergedAnnotation(method, Async.class);if (async == null) {async = AnnotatedElementUtils.findMergedAnnotation(method.getDeclaringClass(), Async.class);}System.out.println("找 @Async 注解。先找方法上的。找不到的话再找类上的。也就是说方法上的@Async注解的优先级要高于类上的。");return (async != null ? async.value() : null);
}
然后,调用 findQualifiedExecutor
方法,作用是根据上一步找到的线程池的名字,从容器中找对应的线程池,如下:
@Nullable
protected Executor findQualifiedExecutor(@Nullable BeanFactory beanFactory, String qualifier) {if (beanFactory == null) {throw new IllegalStateException("BeanFactory must be set on " + getClass().getSimpleName() +" to access qualified executor '" + qualifier + "'");}return BeanFactoryAnnotationUtils.qualifiedBeanOfType(beanFactory, Executor.class, qualifier);
}
接着,如果没找到线程池的话,使用默认的线程池,代码如下,这里用到的是双重加锁检查
targetExecutor = this.defaultExecutor;
if (targetExecutor == null) {synchronized (this.executors) {if (this.defaultExecutor == null) {this.defaultExecutor = getDefaultExecutor(this.beanFactory);}targetExecutor = this.defaultExecutor;}
}
重点看下这个 getDefaultExecutor
方法,源码如下:
@Nullable
protected Executor getDefaultExecutor(@Nullable BeanFactory beanFactory) {if (beanFactory != null) {try {// Search for TaskExecutor bean... not plain Executor since that would// match with ScheduledExecutorService as well, which is unusable for// our purposes here. TaskExecutor is more clearly designed for it.return beanFactory.getBean(TaskExecutor.class);}catch (NoUniqueBeanDefinitionException ex) {logger.debug("Could not find unique TaskExecutor bean", ex);try {return beanFactory.getBean(DEFAULT_TASK_EXECUTOR_BEAN_NAME, Executor.class);}catch (NoSuchBeanDefinitionException ex2) {if (logger.isInfoEnabled()) {logger.info("More than one TaskExecutor bean found within the context, and none is named " +"'taskExecutor'. Mark one of them as primary or name it 'taskExecutor' (possibly " +"as an alias) in order to use it for async processing: " + ex.getBeanNamesFound());}}}catch (NoSuchBeanDefinitionException ex) {logger.debug("Could not find default TaskExecutor bean", ex);try {return beanFactory.getBean(DEFAULT_TASK_EXECUTOR_BEAN_NAME, Executor.class);}catch (NoSuchBeanDefinitionException ex2) {logger.info("No task executor bean found for async processing: " +"no bean of type TaskExecutor and no bean named 'taskExecutor' either");}// Giving up -> either using local default executor or none at all...}}return null;
}
代码看着挺长的,其实不然,关键就是下面这一句:
beanFactory.getBean(DEFAULT_TASK_EXECUTOR_BEAN_NAME, Executor.class);
再看下默认的线程池的名字 DEFAULT_TASK_EXECUTOR_BEAN_NAME
的定义:
/*** The default name of the {@link TaskExecutor} bean to pick up: "taskExecutor".* <p>Note that the initial lookup happens by type; this is just the fallback* in case of multiple executor beans found in the context.* @since 4.2.6*/
public static final String DEFAULT_TASK_EXECUTOR_BEAN_NAME = "taskExecutor";
看到了吧,默认的用于执行异步任务的线程池的名字就是 taskExecutor
。
接着往下看:
if (targetExecutor == null) {return null;
}
executor = (targetExecutor instanceof AsyncListenableTaskExecutor ?(AsyncListenableTaskExecutor) targetExecutor : new TaskExecutorAdapter(targetExecutor));
this.executors.put(method, executor);
如果没找到线程池,就返回 null ;如果找到了,判断类型是不是 AsyncListenableTaskExecutor
,是的话就直接返回,不是的话就用 TaskExecutorAdapter
包一下。这个地方不细讲了。有兴趣可以自己研究下 AsyncListenableTaskExecutor
和 TaskExecutorAdapter
。
- 然后就是看
doSubmit
方法了
@Nullable
protected Object doSubmit(Callable<Object> task, AsyncTaskExecutor executor, Class<?> returnType) {if (CompletableFuture.class.isAssignableFrom(returnType)) {return CompletableFuture.supplyAsync(() -> {try {System.out.println("最终调用方法的地方");return task.call();}catch (Throwable ex) {throw new CompletionException(ex);}}, executor);}else if (ListenableFuture.class.isAssignableFrom(returnType)) {System.out.println("最终调用方法的地方");return ((AsyncListenableTaskExecutor) executor).submitListenable(task);}else if (Future.class.isAssignableFrom(returnType)) {System.out.println("最终调用方法的地方");return executor.submit(task);}else {System.out.println("最终调用方法的地方");executor.submit(task);return null;}
}
这里边对返回值进行了判断,根据返回值的不同,走不同的分支,不管哪个分支,最终都是把任务交给了线程池。
至此,本系列文章结束。