当前位置: 首页 > news >正文

dubbo 异步化实践

@DubboService
public class AsyncOrderFacadeImpl implements AsyncOrderFacade {private Logger logger = LoggerFactory.getLogger(AsyncOrderFacadeImpl.class);// 构建线程池ThreadPoolExecutor threadPoolExecutor= new ThreadPoolExecutor(1000, 1000, 10, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1024));@Overridepublic OrderInfo queryOrderById(String id) {// 开启异步化操作模式,标识异步化模式开启AsyncContext asyncContext = RpcContext.startAsync();threadPoolExecutor.submit(() -> {// 同步queryOrderById方法所在的线程上下文信息到当前的子线程中asyncContext.signalContextSwitch();try {TimeUnit.MILLISECONDS.sleep(50);} catch (InterruptedException e) {throw new RuntimeException(e);}// 使用asyncContext将结果返回asyncContext.write(new OrderInfo("GeekDubbo", "服务方异步方式之RpcContext.startAsync#" + id, new BigDecimal(2000)));logger.info("-------------- end --------------");});return null;}
}

org.apache.dubbo.rpc.RpcContext#startAsync

public static AsyncContext startAsync() throws IllegalStateException {// 获取上下文对象RpcContext currentContext = getContext();// AsyncContext 是Dubbo服务提供者端实现的异步处理的核心接口,用于将同步调用转为异步执行if (currentContext.asyncContext == null) {// 获取Dubbo上下文currentContext.asyncContext = new AsyncContextImpl();}// 创建CompletableFuture实例currentContext.asyncContext.start();return currentContext.asyncContext;
}

org.apache.dubbo.rpc.AsyncContextImpl

/*** 初始化上下文实例*/
public AsyncContextImpl() {// 获取当前线程绑定的RpcContext实例,包含请求和响应双向信息// 跨服务参数透传:消费端设置隐式参数,提供端通过getAttachment获取// 异步调用管理:通过getCompletableFuture()处理异步结果(消费端)或延迟写入响应(提供端)this.storedContext = RpcContext.getContext();// 获取服务端特有的响应上下文,用于回传数据(dubbo3.x中废弃,使用getServiceContext())]// 服务端响应附件元数据:服务端处理完成后,通过setAttachment回传签名,加密令牌等数据this.storedServerContext = RpcContext.getServerContext();
}

org.apache.dubbo.rpc.AsyncContextImpl#start

public void start() {if (this.started.compareAndSet(false, true)) {// 创建CompletableFuture实例// 该对象用于非阻塞式任务编排,支持链式调用和组合操作,简化多线程开发复杂度this.future = new CompletableFuture<>();}
}

org.apache.dubbo.rpc.AsyncContextImpl#signalContextSwitch

/*** 将主线程中的上下文对象同步到当前线程上下文中*/
@Override
public void signalContextSwitch() {RpcContext.restoreContext(storedContext);RpcContext.restoreServerContext(storedServerContext);// Restore any other contexts in here if necessary.
}

org.apache.dubbo.rpc.AsyncContextImpl#write

 @Overridepublic void write(Object value) {if (isAsyncStarted() && stop()) {if (value instanceof Throwable) {Throwable bizExe = (Throwable) value;future.completeExceptionally(bizExe);} else {// 将结果写回future.complete(value);}} else {throw new IllegalStateException("The async response has probably been wrote back by another thread, or the asyncContext has been closed.");}}

相关文章:

  • Python类和对象四(十三)
  • 【springboot知识】配置方式实现SpringCloudGateway相关功能
  • 通过Golang实现快速实现MCP Server
  • Go 语言中的实时交互式编程环境
  • 量子跃迁:Vue组件安全工程的基因重组与生态免疫(完全体终局篇)
  • 正则表达式 工作案例
  • docker 常用配置
  • python 画折线统计图
  • Linux下的I/O复用技术之epoll
  • 模型 隐含前提
  • MyBatis缓存配置的完整示例,包含一级缓存、二级缓存、自定义缓存策略等核心场景,并附详细注释和总结表格
  • Python部署Docker报错:curl: (56) Recv failure: Connection reset by peer
  • 强化学习:高级策略梯度理论与优化方法
  • leetcode110 平衡二叉树
  • 在QML中获取当前时间、IP和位置(基于网络请求)
  • Simple-BEV论文解析
  • module.noParse(跳过指定文件的依赖解析)
  • [贪心_8] 跳跃游戏 | 单调递增的数字 | 坏了的计算器
  • GitOps进化:深入探讨 Argo CD 及其对持续部署的影响
  • 青少年编程与数学 02-018 C++数据结构与算法 12课题、递归
  • 经济日报金观平:充分发挥增量政策的经济牵引力
  • 四川省人大常委会原党组成员、副主任宋朝华接受审查调查
  • 外交部:中方在黄海暂定海域建造渔业养殖设施,同中韩海域划界无关
  • 联手华为猛攻主流市场,上汽集团总裁:上汽不做生态孤岛
  • 神二十明日发射,长二F火箭推进剂加注工作已完成
  • 宝马董事长:继续倡导自由贸易和开放市场,坚信全球性挑战需要多协作而非对立,将引入DeepSeek