当前位置: 首页 > news >正文

dubbo 隐式传递

隐式传递

隐式传递的应用

  • 传递请求流水号,分布式应用中通过链路追踪号来全局检索日志
  • 传递用户信息,以便不同系统在处理业务逻辑时可以获取用户层面的一些信息
  • 传递凭证信息,以便不同系统可以有选择性地取出一些数据做业务逻辑,比如 Cookie、Token 等

自定义过滤器的四个步骤

  1. 创建一个自定义的类,并实现 org.apache.dubbo.rpc.Filter 接口
  2. 在自定义类中的 invoke 方法中实现传递逻辑,提供方过滤器从 invocation 取出 traceId 并设置到 ClientAttachment、MDC 中
  3. 将自定义的类路径添加到 META-INF/dubbo/org.apache.dubbo.rpc.Filter 文件中,并取个别名
  4. 将过滤器配置到应用中

服务提供者

ReqNoProviderFilter

package com.doudou.demo.filter;import org.apache.dubbo.common.extension.Activate;
import org.apache.dubbo.rpc.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;import java.util.Map;
import java.util.Objects;
import java.util.UUID;import static org.apache.dubbo.common.constants.CommonConstants.PROVIDER;@Activate(group = PROVIDER, order = -9999)
public class ReqNoProviderFilter implements Filter {private Logger logger = LoggerFactory.getLogger(ReqNoProviderFilter.class);public static final String TRACE_ID = "traceId";@Overridepublic Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {// 获取当前 RPC 调用中传递的 ‌隐式参数集合‌(附加信息),返回类型为键值对的 Map,其中值可以是任意对象(Object)Map<String, Object> objectAttachments = invocation.getObjectAttachments();// 获取调用方传来的TRACE_IDString traceId = Objects.nonNull(objectAttachments) ? (String) objectAttachments.get(TRACE_ID) : null;logger.info("traceId:{}", traceId);// 调用方未设置TRACE_ID时,初始化新的if (traceId == null) {traceId = generateTraceId();logger.info("traceId:{}", traceId);}// 将TRACE_ID设置到上下文中// 获取服务端特有的响应上下文,用于回传数据(dubbo3.x中废弃,使用getServiceContext())]// 服务端响应附件元数据:服务端处理完成后,通过setAttachment回传签名,加密令牌等数据RpcContext.getServerContext().setAttachment(TRACE_ID, traceId);// 将序列号设置到日志打印器中,方便在日志中打印MDC.put(TRACE_ID, traceId);// 继续后续过滤器的调用return invoker.invoke(invocation);}/*** 生成一个唯一字符串** @return*/private String generateTraceId() {return UUID.randomUUID().toString().replaceAll("-", "");}
}

META-INF/dubbo/org.apache.dubbo.rpc.Filter

traceId=com.doudou.demo.filter.ReqNoProviderFilter

AsyncOrderProviderStartApplication

package com.doudou.demo;import com.doudou.demo.api.AsyncOrderFacade;
import com.doudou.demo.api.UserQueryFacade;
import com.doudou.demo.api.UserQueryFacade3;
import com.doudou.demo.service.AsyncOrderFacadeImpl;
import com.doudou.demo.service.AsyncUserQueryFacade2Impl;
import com.doudou.demo.service.AsyncUserQueryFacadeImpl;
import org.apache.dubbo.config.ApplicationConfig;
import org.apache.dubbo.config.ProviderConfig;
import org.apache.dubbo.config.RegistryConfig;
import org.apache.dubbo.config.ServiceConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;import java.io.IOException;public class AsyncOrderProviderStartApplication {private static Logger logger = LoggerFactory.getLogger(JavaCodeProviderStartApplication.class);public static void main(String[] args) throws IOException {ApplicationConfig application = new ApplicationConfig();application.setName("dubbo-provider");RegistryConfig registry = new RegistryConfig();registry.setAddress("nacos://127.0.0.1:8848");ProviderConfig providerConfig = new ProviderConfig();providerConfig.setFilter("traceId");ServiceConfig<AsyncOrderFacade> asyncOrder = new ServiceConfig<>();asyncOrder.setApplication(application);asyncOrder.setRegistry(registry);asyncOrder.setProvider(providerConfig);asyncOrder.setInterface(AsyncOrderFacade.class);asyncOrder.setRef(new AsyncOrderFacadeImpl());asyncOrder.export();System.out.println("---------------------------------");System.in.read();}
}

AsyncOrderFacadeImpl

package com.doudou.demo.service;import com.doudou.demo.api.AsyncOrderFacade;
import com.doudou.demo.po.OrderInfo;
import org.apache.dubbo.config.annotation.DubboService;
import org.apache.dubbo.rpc.AsyncContext;
import org.apache.dubbo.rpc.RpcContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;import java.math.BigDecimal;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;@DubboService
public class AsyncOrderFacadeImpl implements AsyncOrderFacade {private Logger logger = LoggerFactory.getLogger(AsyncOrderFacadeImpl.class);// 构建线程池ThreadPoolExecutor threadPoolExecutor= new ThreadPoolExecutor(1000, 1000, 10, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1024));@Overridepublic OrderInfo queryOrderById(String id) {// 开启异步化操作模式,标识异步化模式开启AsyncContext asyncContext = RpcContext.startAsync();threadPoolExecutor.submit(() -> {// 同步queryOrderById方法所在的线程上下文信息到当前的子线程中asyncContext.signalContextSwitch();try {TimeUnit.MILLISECONDS.sleep(0);} catch (InterruptedException e) {throw new RuntimeException(e);}// 使用asyncContext将结果返回asyncContext.write(new OrderInfo("GeekDubbo", "服务方异步方式之RpcContext.startAsync#" + id, new BigDecimal(2000)));logger.info("-------------- end --------------");});return null;}
}

服务消费发

TraceConsumerFilter

package com.doudou.dubbo.demo;import org.apache.dubbo.common.extension.Activate;
import org.apache.dubbo.rpc.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;import java.util.UUID;import static org.apache.dubbo.common.constants.CommonConstants.CONSUMER;@Activate(group = CONSUMER, order = -9999)
public class TraceConsumerFilter implements Filter {private Logger logger = LoggerFactory.getLogger(TraceConsumerFilter.class);public static final String TRACE_ID = "traceId";@Overridepublic Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {String traceId = generateTraceId();logger.info("traceId:{}", traceId);MDC.put(TRACE_ID, traceId);invocation.setAttachment(TRACE_ID, traceId);return invoker.invoke(invocation);}/*** 生成一个唯一字符串*/private String generateTraceId() {return UUID.randomUUID().toString().replaceAll("-", "");}
}

META-INF/dubbo/org.apache.dubbo.rpc.Filter

traceId=com.doudou.dubbo.demo.TraceConsumerFilter

AsyncOrderFacadeConsumer

package com.doudou.dubbo.consumer;import com.alibaba.fastjson.JSON;
import com.doudou.demo.api.AsyncOrderFacade;
import com.doudou.demo.api.UserQueryFacade;
import com.doudou.demo.po.OrderInfo;
import org.apache.dubbo.config.*;
import org.apache.dubbo.config.annotation.DubboReference;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;public class AsyncOrderFacadeConsumer {private static Logger logger = LoggerFactory.getLogger(AsyncOrderFacadeConsumer.class);private static ThreadPoolExecutor threadPoolExecutor= new ThreadPoolExecutor(400, 400, 1, TimeUnit.SECONDS, new ArrayBlockingQueue<>(100));public static void main(String[] args) throws InterruptedException {ApplicationConfig applicationConfig = new ApplicationConfig();applicationConfig.setName("dubbo-demo-consumer");RegistryConfig registryConfig = new RegistryConfig();registryConfig.setAddress("nacos://127.0.0.1:8848/nacos");registryConfig.setCheck(false);registryConfig.setRegister(false);registryConfig.setTimeout(2000);ConsumerConfig consumerConfig = new ConsumerConfig();consumerConfig.setFilter("traceId");ReferenceConfig<AsyncOrderFacade> referenceConfig = new ReferenceConfig<>();referenceConfig.setApplication(applicationConfig);referenceConfig.setRegistry(registryConfig);referenceConfig.setConsumer(consumerConfig);referenceConfig.setInterface(AsyncOrderFacade.class);referenceConfig.setTimeout(60000);AsyncOrderFacade asyncOrderFacade = referenceConfig.get();OrderInfo orderInfo = asyncOrderFacade.queryOrderById(String.valueOf(1));logger.info("{} ---------------------------------\t\t\t\t\t\t orderInfo: {}", 1, JSON.toJSONString(orderInfo));}
}

相关文章:

  • Python项目实践:控制台银行系统与词频统计工具开发指南
  • 【project】--模拟搭建一个中小型校园网的网络平台
  • SpringBoot 常用注解通俗解释
  • 何恺明团队又发新作!!-用于物理推理的去噪哈密顿网络
  • Linux基础命令总结
  • Set的学习
  • 论文如何降低AIGC?(完整指南版)
  • 【Linux系统篇】:信号的生命周期---从触发到保存与捕捉的底层逻辑
  • 长途骑行装备攻略:VELO维乐 Angel Revo坐垫伴我畅享旅途
  • arcpy列表函数的应用
  • ClickHouse查询执行与优化
  • Linux基础篇、第4章_03系统磁盘高级管理LVM 逻辑卷管理器
  • 腾讯二面:TCC分布式事务 | 图解TCC|用Go语言实现一个TCC
  • java中的Selector详解
  • 高中数学联赛模拟试题精选第18套几何题
  • 前端职业发展:如何规划前端工程师的成长路径?
  • 二叉树层序遍历
  • React-Hook
  • Java基础第五章、面向对象程序设计
  • AIGC赋能智慧医疗:从影像诊断到个性化治疗的革命性突破
  • 上海市市管干部任职前公示:赵亮拟为地区区长人选
  • 屋顶上的阳光与火光:战争如何改变了加沙的能源格局
  • 天津外国语大学原校长修刚突发疾病去世,享年68岁
  • 演员孙俪:中年人没有脆弱的时间,学习胡曼黎不内耗
  • 文旅部副部长饶权出任国家文物局局长
  • 中国人民银行行长潘功胜会见世界银行行长彭安杰