隐式传递
隐式传递的应用
- 传递请求流水号,分布式应用中通过链路追踪号来全局检索日志
- 传递用户信息,以便不同系统在处理业务逻辑时可以获取用户层面的一些信息
- 传递凭证信息,以便不同系统可以有选择性地取出一些数据做业务逻辑,比如 Cookie、Token 等
自定义过滤器的四个步骤
- 创建一个自定义的类,并实现 org.apache.dubbo.rpc.Filter 接口
- 在自定义类中的 invoke 方法中实现传递逻辑,提供方过滤器从 invocation 取出 traceId 并设置到 ClientAttachment、MDC 中
- 将自定义的类路径添加到 META-INF/dubbo/org.apache.dubbo.rpc.Filter 文件中,并取个别名
- 将过滤器配置到应用中
服务提供者
ReqNoProviderFilter
package com.doudou.demo.filter;import org.apache.dubbo.common.extension.Activate;
import org.apache.dubbo.rpc.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;import java.util.Map;
import java.util.Objects;
import java.util.UUID;import static org.apache.dubbo.common.constants.CommonConstants.PROVIDER;@Activate(group = PROVIDER, order = -9999)
public class ReqNoProviderFilter implements Filter {private Logger logger = LoggerFactory.getLogger(ReqNoProviderFilter.class);public static final String TRACE_ID = "traceId";@Overridepublic Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {Map<String, Object> objectAttachments = invocation.getObjectAttachments();String traceId = Objects.nonNull(objectAttachments) ? (String) objectAttachments.get(TRACE_ID) : null;logger.info("traceId:{}", traceId);if (traceId == null) {traceId = generateTraceId();logger.info("traceId:{}", traceId);}RpcContext.getServerContext().setAttachment(TRACE_ID, traceId);MDC.put(TRACE_ID, traceId);return invoker.invoke(invocation);}private String generateTraceId() {return UUID.randomUUID().toString().replaceAll("-", "");}
}
META-INF/dubbo/org.apache.dubbo.rpc.Filter
traceId=com.doudou.demo.filter.ReqNoProviderFilter
AsyncOrderProviderStartApplication
package com.doudou.demo;import com.doudou.demo.api.AsyncOrderFacade;
import com.doudou.demo.api.UserQueryFacade;
import com.doudou.demo.api.UserQueryFacade3;
import com.doudou.demo.service.AsyncOrderFacadeImpl;
import com.doudou.demo.service.AsyncUserQueryFacade2Impl;
import com.doudou.demo.service.AsyncUserQueryFacadeImpl;
import org.apache.dubbo.config.ApplicationConfig;
import org.apache.dubbo.config.ProviderConfig;
import org.apache.dubbo.config.RegistryConfig;
import org.apache.dubbo.config.ServiceConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;import java.io.IOException;public class AsyncOrderProviderStartApplication {private static Logger logger = LoggerFactory.getLogger(JavaCodeProviderStartApplication.class);public static void main(String[] args) throws IOException {ApplicationConfig application = new ApplicationConfig();application.setName("dubbo-provider");RegistryConfig registry = new RegistryConfig();registry.setAddress("nacos://127.0.0.1:8848");ProviderConfig providerConfig = new ProviderConfig();providerConfig.setFilter("traceId");ServiceConfig<AsyncOrderFacade> asyncOrder = new ServiceConfig<>();asyncOrder.setApplication(application);asyncOrder.setRegistry(registry);asyncOrder.setProvider(providerConfig);asyncOrder.setInterface(AsyncOrderFacade.class);asyncOrder.setRef(new AsyncOrderFacadeImpl());asyncOrder.export();System.out.println("---------------------------------");System.in.read();}
}
AsyncOrderFacadeImpl
package com.doudou.demo.service;import com.doudou.demo.api.AsyncOrderFacade;
import com.doudou.demo.po.OrderInfo;
import org.apache.dubbo.config.annotation.DubboService;
import org.apache.dubbo.rpc.AsyncContext;
import org.apache.dubbo.rpc.RpcContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;import java.math.BigDecimal;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;@DubboService
public class AsyncOrderFacadeImpl implements AsyncOrderFacade {private Logger logger = LoggerFactory.getLogger(AsyncOrderFacadeImpl.class);ThreadPoolExecutor threadPoolExecutor= new ThreadPoolExecutor(1000, 1000, 10, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1024));@Overridepublic OrderInfo queryOrderById(String id) {AsyncContext asyncContext = RpcContext.startAsync();threadPoolExecutor.submit(() -> {asyncContext.signalContextSwitch();try {TimeUnit.MILLISECONDS.sleep(0);} catch (InterruptedException e) {throw new RuntimeException(e);}asyncContext.write(new OrderInfo("GeekDubbo", "服务方异步方式之RpcContext.startAsync#" + id, new BigDecimal(2000)));logger.info("-------------- end --------------");});return null;}
}
服务消费发
TraceConsumerFilter
package com.doudou.dubbo.demo;import org.apache.dubbo.common.extension.Activate;
import org.apache.dubbo.rpc.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;import java.util.UUID;import static org.apache.dubbo.common.constants.CommonConstants.CONSUMER;@Activate(group = CONSUMER, order = -9999)
public class TraceConsumerFilter implements Filter {private Logger logger = LoggerFactory.getLogger(TraceConsumerFilter.class);public static final String TRACE_ID = "traceId";@Overridepublic Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {String traceId = generateTraceId();logger.info("traceId:{}", traceId);MDC.put(TRACE_ID, traceId);invocation.setAttachment(TRACE_ID, traceId);return invoker.invoke(invocation);}private String generateTraceId() {return UUID.randomUUID().toString().replaceAll("-", "");}
}
META-INF/dubbo/org.apache.dubbo.rpc.Filter
traceId=com.doudou.dubbo.demo.TraceConsumerFilter
AsyncOrderFacadeConsumer
package com.doudou.dubbo.consumer;import com.alibaba.fastjson.JSON;
import com.doudou.demo.api.AsyncOrderFacade;
import com.doudou.demo.api.UserQueryFacade;
import com.doudou.demo.po.OrderInfo;
import org.apache.dubbo.config.*;
import org.apache.dubbo.config.annotation.DubboReference;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;public class AsyncOrderFacadeConsumer {private static Logger logger = LoggerFactory.getLogger(AsyncOrderFacadeConsumer.class);private static ThreadPoolExecutor threadPoolExecutor= new ThreadPoolExecutor(400, 400, 1, TimeUnit.SECONDS, new ArrayBlockingQueue<>(100));public static void main(String[] args) throws InterruptedException {ApplicationConfig applicationConfig = new ApplicationConfig();applicationConfig.setName("dubbo-demo-consumer");RegistryConfig registryConfig = new RegistryConfig();registryConfig.setAddress("nacos://127.0.0.1:8848/nacos");registryConfig.setCheck(false);registryConfig.setRegister(false);registryConfig.setTimeout(2000);ConsumerConfig consumerConfig = new ConsumerConfig();consumerConfig.setFilter("traceId");ReferenceConfig<AsyncOrderFacade> referenceConfig = new ReferenceConfig<>();referenceConfig.setApplication(applicationConfig);referenceConfig.setRegistry(registryConfig);referenceConfig.setConsumer(consumerConfig);referenceConfig.setInterface(AsyncOrderFacade.class);referenceConfig.setTimeout(60000);AsyncOrderFacade asyncOrderFacade = referenceConfig.get();OrderInfo orderInfo = asyncOrderFacade.queryOrderById(String.valueOf(1));logger.info("{} ---------------------------------\t\t\t\t\t\t orderInfo: {}", 1, JSON.toJSONString(orderInfo));}
}