dubbo 异步化实践
@DubboService
public class AsyncOrderFacadeImpl implements AsyncOrderFacade {private Logger logger = LoggerFactory.getLogger(AsyncOrderFacadeImpl.class);// 构建线程池ThreadPoolExecutor threadPoolExecutor= new ThreadPoolExecutor(1000, 1000, 10, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1024));@Overridepublic OrderInfo queryOrderById(String id) {// 开启异步化操作模式,标识异步化模式开启AsyncContext asyncContext = RpcContext.startAsync();threadPoolExecutor.submit(() -> {// 同步queryOrderById方法所在的线程上下文信息到当前的子线程中asyncContext.signalContextSwitch();try {TimeUnit.MILLISECONDS.sleep(50);} catch (InterruptedException e) {throw new RuntimeException(e);}// 使用asyncContext将结果返回asyncContext.write(new OrderInfo("GeekDubbo", "服务方异步方式之RpcContext.startAsync#" + id, new BigDecimal(2000)));logger.info("-------------- end --------------");});return null;}
}
org.apache.dubbo.rpc.RpcContext#startAsync
public static AsyncContext startAsync() throws IllegalStateException {// 获取上下文对象RpcContext currentContext = getContext();// AsyncContext 是Dubbo服务提供者端实现的异步处理的核心接口,用于将同步调用转为异步执行if (currentContext.asyncContext == null) {// 获取Dubbo上下文currentContext.asyncContext = new AsyncContextImpl();}// 创建CompletableFuture实例currentContext.asyncContext.start();return currentContext.asyncContext;
}
org.apache.dubbo.rpc.AsyncContextImpl
/*** 初始化上下文实例*/
public AsyncContextImpl() {// 获取当前线程绑定的RpcContext实例,包含请求和响应双向信息// 跨服务参数透传:消费端设置隐式参数,提供端通过getAttachment获取// 异步调用管理:通过getCompletableFuture()处理异步结果(消费端)或延迟写入响应(提供端)this.storedContext = RpcContext.getContext();// 获取服务端特有的响应上下文,用于回传数据(dubbo3.x中废弃,使用getServiceContext())]// 服务端响应附件元数据:服务端处理完成后,通过setAttachment回传签名,加密令牌等数据this.storedServerContext = RpcContext.getServerContext();
}
org.apache.dubbo.rpc.AsyncContextImpl#start
public void start() {if (this.started.compareAndSet(false, true)) {// 创建CompletableFuture实例// 该对象用于非阻塞式任务编排,支持链式调用和组合操作,简化多线程开发复杂度this.future = new CompletableFuture<>();}
}
org.apache.dubbo.rpc.AsyncContextImpl#signalContextSwitch
/*** 将主线程中的上下文对象同步到当前线程上下文中*/
@Override
public void signalContextSwitch() {RpcContext.restoreContext(storedContext);RpcContext.restoreServerContext(storedServerContext);// Restore any other contexts in here if necessary.
}
org.apache.dubbo.rpc.AsyncContextImpl#write
@Overridepublic void write(Object value) {if (isAsyncStarted() && stop()) {if (value instanceof Throwable) {Throwable bizExe = (Throwable) value;future.completeExceptionally(bizExe);} else {// 将结果写回future.complete(value);}} else {throw new IllegalStateException("The async response has probably been wrote back by another thread, or the asyncContext has been closed.");}}