当前位置: 首页 > news >正文

Nacos源码—1.Nacos服务注册发现分析一

大纲

1.客户端如何发起服务注册 + 发送服务心跳

2.服务端如何处理客户端的服务注册请求

3.注册服务—如何实现高并发支撑上百万服务注册

4.内存注册表—如何处理注册表的高并发读写冲突

1.客户端如何发起服务注册 + 发送服务心跳

(1)Nacos客户端项目启动时为什么会自动注册服务

(2)Nacos客户端通过什么方式注册服务

(3)Nacos客户端如何发送服务心跳

(1)Nacos客户端项目启动时为什么会自动注册服务

Nacos客户端就是引入了nacos-discovery + nacos-client依赖的项目。引入spring-cloud-starter-alibaba-nacos-discovery后,才自动注册服务。查看这个依赖包中的spring.factories文件,发现有一些Configuration类。

Spring Boot启动时会扫描spring.factories文件,然后创建里面的配置类。

在spring.pactories文件中,与注册相关的类就是:NacosServiceRegistryAutoConfiguration这个Nacos服务注册自动配置类。

Nacos服务注册自动配置类NacosServiceRegistryAutoConfiguration如下,该配置类创建了三个Bean。

第一个Bean:NacosServiceRegistry

这个Bean在创建时,会传入加载了yml配置文件内容的类NacosDiscoveryProperties。

第二个Bean:NacosRegistration

这个Bean在创建时,会传入加载了yml配置文件内容的类NacosDiscoveryProperties。

第三个Bean:NacosAutoServiceRegistration

这个Bean在创建时,会传入NacosServiceRegistry和NacosRegistration两个Bean。然后该Bean继承了AbstractAutoServiceRegistration抽象类。该抽象类实现了ApplicationListener接口,所以项目启动时便是利用了Spring的监听事件来实现自动注册服务的。因为在Spring容器启动的最后会执行finishRefresh()方法,然后会发布一个事件,该事件会触发调用onApplicationEvent()方法。

调用AbstractAutoServiceRegistration的onApplicationEvent()方法时,首先会调用AbstractAutoServiceRegistration的bind()方法,然后调用AbstractAutoServiceRegistration的start()方法,接着调用AbstractAutoServiceRegistration的register()方法发起注册,也就是调用this.serviceRegistry的register()方法完成服务注册的具体工作。

其中,AbstractAutoServiceRegistration的serviceRegistry属性,是在服务注册自动配置类NacosServiceRegistryAutoConfiguration,创建第三个Bean—NacosAutoServiceRegistration时,通过传入其创建的第一个Bean—NacosServiceRegistry进行赋值的。

@Configuration(proxyBeanMethods = false)
@EnableConfigurationProperties
@ConditionalOnNacosDiscoveryEnabled
@ConditionalOnProperty(value = "spring.cloud.service-registry.auto-registration.enabled", matchIfMissing = true)
@AutoConfigureAfter({ AutoServiceRegistrationConfiguration.class, AutoServiceRegistrationAutoConfiguration.class, NacosDiscoveryAutoConfiguration.class })
public class NacosServiceRegistryAutoConfiguration {@Beanpublic NacosServiceRegistry nacosServiceRegistry(NacosDiscoveryProperties nacosDiscoveryProperties) {//传入NacosDiscoveryProperties作为参数return new NacosServiceRegistry(nacosDiscoveryProperties);}@Bean@ConditionalOnBean(AutoServiceRegistrationProperties.class)public NacosRegistration nacosRegistration(ObjectProvider<List<NacosRegistrationCustomizer>> registrationCustomizers, NacosDiscoveryProperties nacosDiscoveryProperties, ApplicationContext context) {//传入NacosDiscoveryProperties作为参数return new NacosRegistration(registrationCustomizers.getIfAvailable(), nacosDiscoveryProperties, context);}@Bean@ConditionalOnBean(AutoServiceRegistrationProperties.class)public NacosAutoServiceRegistration nacosAutoServiceRegistration(NacosServiceRegistry registry, AutoServiceRegistrationProperties autoServiceRegistrationProperties, NacosRegistration registration) {//传入NacosServiceRegistry和NacosRegistration作为参数return new NacosAutoServiceRegistration(registry, autoServiceRegistrationProperties, registration);}
}@ConfigurationProperties("spring.cloud.nacos.discovery")
public class NacosDiscoveryProperties {//nacos discovery server address.private String serverAddr;//the nacos authentication username.private String username;//the nacos authentication password.private String password;//namespace, separation registry of different environments.private String namespace;//service name to registry.@Value("${spring.cloud.nacos.discovery.service:${spring.application.name:}}")private String service;//cluster name for nacos.private String clusterName = "DEFAULT";//group name for nacos.private String group = "DEFAULT_GROUP";//The ip address your want to register for your service instance, needn't to set it if the auto detect ip works well.private String ip;//The port your want to register for your service instance, needn't to set it if the auto detect port works well.private int port = -1;//Heart beat interval. Time unit: millisecond.private Integer heartBeatInterval;//Heart beat timeout. Time unit: millisecond.private Integer heartBeatTimeout;//If instance is ephemeral.The default value is true.private boolean ephemeral = true;...
}public class NacosAutoServiceRegistration extends AbstractAutoServiceRegistration<Registration> {...private NacosRegistration registration;public NacosAutoServiceRegistration(ServiceRegistry<Registration> serviceRegistry,AutoServiceRegistrationProperties autoServiceRegistrationProperties,NacosRegistration registration) {super(serviceRegistry, autoServiceRegistrationProperties);this.registration = registration;}...
}public abstract class AbstractAutoServiceRegistration<R extends Registration>implements AutoServiceRegistration, ApplicationContextAware, ApplicationListener<WebServerInitializedEvent> {...private final ServiceRegistry<R> serviceRegistry;private AutoServiceRegistrationProperties properties;protected AbstractAutoServiceRegistration(ServiceRegistry<R> serviceRegistry, AutoServiceRegistrationProperties properties) {this.serviceRegistry = serviceRegistry;this.properties = properties;}...@Override@SuppressWarnings("deprecation")public void onApplicationEvent(WebServerInitializedEvent event) {bind(event);}public void bind(WebServerInitializedEvent event) {ApplicationContext context = event.getApplicationContext();if (context instanceof ConfigurableWebServerApplicationContext) {if ("management".equals(((ConfigurableWebServerApplicationContext) context).getServerNamespace())) {return;}}this.port.compareAndSet(0, event.getWebServer().getPort());this.start();}public void start() {if (!isEnabled()) {if (logger.isDebugEnabled()) {logger.debug("Discovery Lifecycle disabled. Not starting");}return;}//only initialize if nonSecurePort is greater than 0 and it isn't already running//because of containerPortInitializer belowif (!this.running.get()) {this.context.publishEvent(new InstancePreRegisteredEvent(this, getRegistration()));//发起注册register();if (shouldRegisterManagement()) {registerManagement();}this.context.publishEvent(new InstanceRegisteredEvent<>(this, getConfiguration()));this.running.compareAndSet(false, true);}}protected void register() {//调用创建NacosAutoServiceRegistration时传入的NacosServiceRegistry实例的register()方法this.serviceRegistry.register(getRegistration());}...
}public class NacosServiceRegistry implements ServiceRegistry<Registration> {private final NacosDiscoveryProperties nacosDiscoveryProperties;public NacosServiceRegistry(NacosDiscoveryProperties nacosDiscoveryProperties) {this.nacosDiscoveryProperties = nacosDiscoveryProperties;}@Overridepublic void register(Registration registration) {if (StringUtils.isEmpty(registration.getServiceId())) {log.warn("No service to register for nacos client...");return;}NamingService namingService = namingService();String serviceId = registration.getServiceId();String group = nacosDiscoveryProperties.getGroup();Instance instance = getNacosInstanceFromRegistration(registration);try {//把当前的服务实例注册到Nacos中namingService.registerInstance(serviceId, group, instance);log.info("nacos registry, {} {} {}:{} register finished", group, serviceId, instance.getIp(), instance.getPort());} catch (Exception e) {log.error("nacos registry, {} register failed...{},", serviceId, registration.toString(), e);//rethrow a RuntimeException if the registration is failed.//issue : https://github.com/alibaba/spring-cloud-alibaba/issues/1132rethrowRuntimeException(e);}}private NamingService namingService() {return nacosServiceManager.getNamingService(nacosDiscoveryProperties.getNacosProperties());}...
}

Nacos客户端项目启动时自动触发服务实例注册的流程总结:Spring监听器调用onApplicationEvent()方法 -> bind()方法 -> start()方法 -> register()方法,最后register()方法会调用serviceRegistry属性的register()方法进行注册。

整个流程具体来说就是:首先通过spring.factories文件,找到一个注册相关的Configuration配置类,这个配置类里面定义了三个Bean对象。创建第三个Bean对象时,需要第一个、第二个Bean对象作为参数传进去。第一个Bean对象里面就有真正进行服务注册的register()方法,并且第一个Bean对象会赋值给第三个Bean对象中的serviceRegistry属性。在第三个Bean对象的父类会实现Spring的监听器方法。所以在Spring容器启动时会发布监听事件,从而触发执行Nacos注册逻辑。

(2)Nacos客户端通过什么方式注册服务

项目启动时是通过NacosServiceRegistry的register()方法发起服务注册的,然后会调用NacosNamingService的registerInstance()方法注册服务实例,接着调用NamingProxy的registerService()方法组装参数发起服务注册请求,接着调用NamingProxy的reqApi()方法向Nacos服务端发起服务注册请求,也就是调用NamingProxy的callServer()方法向Nacos服务端发送注册请求。

在NamingProxy的callServer()方法中,首先会调用NacosRestTemplate的exchangeForm()方法发起HTTP请求,然后会调用this.requestClient()的execute()方法执行HTTP请求的发送,接着会调用DefaultHttpClientRequest的execute()方法处理请求的发送,也就是通过Apache的CloseableHttpClient组件来处理发送HTTP请求。

注意:NacosServiceRegistry是属于nacos-discovery包中的类,NacosNamingService是属于nacos-client包中的类。

public class NacosServiceRegistry implements ServiceRegistry<Registration> {private final NacosDiscoveryProperties nacosDiscoveryProperties;public NacosServiceRegistry(NacosDiscoveryProperties nacosDiscoveryProperties) {this.nacosDiscoveryProperties = nacosDiscoveryProperties;}@Overridepublic void register(Registration registration) {if (StringUtils.isEmpty(registration.getServiceId())) {log.warn("No service to register for nacos client...");return;}NamingService namingService = namingService();//服务名称String serviceId = registration.getServiceId();//服务分组String group = nacosDiscoveryProperties.getGroup();//服务实例,包含了IP、Port等信息Instance instance = getNacosInstanceFromRegistration(registration);try {//调用NacosNamingService.registerInstance()方法把当前的服务实例注册到Nacos中namingService.registerInstance(serviceId, group, instance);log.info("nacos registry, {} {} {}:{} register finished", group, serviceId, instance.getIp(), instance.getPort());} catch (Exception e) {log.error("nacos registry, {} register failed...{},", serviceId, registration.toString(), e);rethrowRuntimeException(e);}}private NamingService namingService() {return nacosServiceManager.getNamingService(nacosDiscoveryProperties.getNacosProperties());}private Instance getNacosInstanceFromRegistration(Registration registration) {Instance instance = new Instance();instance.setIp(registration.getHost());instance.setPort(registration.getPort());instance.setWeight(nacosDiscoveryProperties.getWeight());instance.setClusterName(nacosDiscoveryProperties.getClusterName());instance.setEnabled(nacosDiscoveryProperties.isInstanceEnabled());instance.setMetadata(registration.getMetadata());instance.setEphemeral(nacosDiscoveryProperties.isEphemeral());return instance;}...
}public class NacosNamingService implements NamingService {private BeatReactor beatReactor;private NamingProxy serverProxy;...@Overridepublic void registerInstance(String serviceName, String groupName, Instance instance) throws NacosException {NamingUtils.checkInstanceIsLegal(instance);//获取分组服务名字String groupedServiceName = NamingUtils.getGroupedName(serviceName, groupName);//判断要注册的服务实例是否是临时实例if (instance.isEphemeral()) {//如果是临时实例,则构建心跳信息BeatInfo beatInfo = beatReactor.buildBeatInfo(groupedServiceName, instance);//添加心跳信息beatReactor.addBeatInfo(groupedServiceName, beatInfo);}//接下来调用NamingProxy的注册方法registerService()来注册服务实例serverProxy.registerService(groupedServiceName, groupName, instance);}...
}public class NamingProxy implements Closeable {private final NacosRestTemplate nacosRestTemplate = NamingHttpClientManager.getInstance().getNacosRestTemplate();...//register a instance to service with specified instance properties.//@param serviceName name of service//@param groupName   group of service//@param instance    instance to registerpublic void registerService(String serviceName, String groupName, Instance instance) throws NacosException {NAMING_LOGGER.info("[REGISTER-SERVICE] {} registering service {} with instance: {}", namespaceId, serviceName, instance);//创建一个Map组装注册请求参数final Map<String, String> params = new HashMap<String, String>(16);params.put(CommonParams.NAMESPACE_ID, namespaceId);params.put(CommonParams.SERVICE_NAME, serviceName);params.put(CommonParams.GROUP_NAME, groupName);params.put(CommonParams.CLUSTER_NAME, instance.getClusterName());params.put("ip", instance.getIp());params.put("port", String.valueOf(instance.getPort()));params.put("weight", String.valueOf(instance.getWeight()));params.put("enable", String.valueOf(instance.isEnabled()));params.put("healthy", String.valueOf(instance.isHealthy()));params.put("ephemeral", String.valueOf(instance.isEphemeral()));params.put("metadata", JacksonUtils.toJson(instance.getMetadata()));//下面UtilAndComs常量类拼装的请求url是: /Nacos/v1/ns/instancereqApi(UtilAndComs.nacosUrlInstance, params, HttpMethod.POST);}public String reqApi(String api, Map<String, String> params, String method) throws NacosException {return reqApi(api, params, Collections.EMPTY_MAP, method);}public String reqApi(String api, Map<String, String> params, Map<String, String> body, String method) throws NacosException {return reqApi(api, params, body, getServerList(), method);}//Request api.public String reqApi(String api, Map<String, String> params, Map<String, String> body, List<String> servers, String method) throws NacosException {params.put(CommonParams.NAMESPACE_ID, getNamespaceId());if (CollectionUtils.isEmpty(servers) && StringUtils.isBlank(nacosDomain)) {throw new NacosException(NacosException.INVALID_PARAM, "no server available");}NacosException exception = new NacosException();if (StringUtils.isNotBlank(nacosDomain)) {for (int i = 0; i < maxRetry; i++) {try {return callServer(api, params, body, nacosDomain, method);} catch (NacosException e) {exception = e;if (NAMING_LOGGER.isDebugEnabled()) {NAMING_LOGGER.debug("request {} failed.", nacosDomain, e);}}}} else {Random random = new Random(System.currentTimeMillis());int index = random.nextInt(servers.size());for (int i = 0; i < servers.size(); i++) {String server = servers.get(index);try {return callServer(api, params, body, server, method);} catch (NacosException e) {exception = e;if (NAMING_LOGGER.isDebugEnabled()) {NAMING_LOGGER.debug("request {} failed.", server, e);}}index = (index + 1) % servers.size();}}NAMING_LOGGER.error("request: {} failed, servers: {}, code: {}, msg: {}", api, servers, exception.getErrCode(), exception.getErrMsg());throw new NacosException(exception.getErrCode(), "failed to req API:" + api + " after all servers(" + servers + ") tried: " + exception.getMessage());}//Call server.public String callServer(String api, Map<String, String> params, Map<String, String> body, String curServer, String method) throws NacosException {long start = System.currentTimeMillis();long end = 0;injectSecurityInfo(params);Header header = builderHeader();String url;if (curServer.startsWith(UtilAndComs.HTTPS) || curServer.startsWith(UtilAndComs.HTTP)) {url = curServer + api;} else {if (!IPUtil.containsPort(curServer)) {curServer = curServer + IPUtil.IP_PORT_SPLITER + serverPort;}url = NamingHttpClientManager.getInstance().getPrefix() + curServer + api;}try {//调用NacosRestTemplate.exchangeForm()方法发起HTTP请求HttpRestResult<String> restResult = nacosRestTemplate.exchangeForm(url, header, Query.newInstance().initParams(params), body, method, String.class);end = System.currentTimeMillis();MetricsMonitor.getNamingRequestMonitor(method, url, String.valueOf(restResult.getCode())).observe(end - start);if (restResult.ok()) {return restResult.getData();}if (HttpStatus.SC_NOT_MODIFIED == restResult.getCode()) {return StringUtils.EMPTY;}throw new NacosException(restResult.getCode(), restResult.getMessage());} catch (Exception e) {NAMING_LOGGER.error("[NA] failed to request", e);throw new NacosException(NacosException.SERVER_ERROR, e);}}...
}public class NacosRestTemplate extends AbstractNacosRestTemplate {private final HttpClientRequest requestClient;...//Execute the HTTP method to the given URI template, writing the given request entity to the request, and returns the response as {@link HttpRestResult}.public <T> HttpRestResult<T> exchangeForm(String url, Header header, Query query, Map<String, String> bodyValues, String httpMethod, Type responseType) throws Exception {RequestHttpEntity requestHttpEntity = new RequestHttpEntity(header.setContentType(MediaType.APPLICATION_FORM_URLENCODED), query, bodyValues);return execute(url, httpMethod, requestHttpEntity, responseType);}private <T> HttpRestResult<T> execute(String url, String httpMethod, RequestHttpEntity requestEntity, Type responseType) throws Exception {URI uri = HttpUtils.buildUri(url, requestEntity.getQuery());if (logger.isDebugEnabled()) {logger.debug("HTTP method: {}, url: {}, body: {}", httpMethod, uri, requestEntity.getBody());}ResponseHandler<T> responseHandler = super.selectResponseHandler(responseType);HttpClientResponse response = null;try {response = this.requestClient().execute(uri, httpMethod, requestEntity);return responseHandler.handle(response);} finally {if (response != null) {response.close();}}}private HttpClientRequest requestClient() {if (CollectionUtils.isNotEmpty(interceptors)) {if (logger.isDebugEnabled()) {logger.debug("Execute via interceptors :{}", interceptors);}return new InterceptingHttpClientRequest(requestClient, interceptors.iterator());}return requestClient;}...
}public class DefaultHttpClientRequest implements HttpClientRequest {private final CloseableHttpClient client;public DefaultHttpClientRequest(CloseableHttpClient client) {this.client = client;}@Overridepublic HttpClientResponse execute(URI uri, String httpMethod, RequestHttpEntity requestHttpEntity) throws Exception {HttpRequestBase request = build(uri, httpMethod, requestHttpEntity);//通过Apache的CloseableHttpClient组件执行HTTP请求CloseableHttpResponse response = client.execute(request);return new DefaultClientHttpResponse(response);}...
}

由此可知:Nacos客户端是通过HTTP的方式往Nacos服务端发起服务注册的,Nacos服务端会提供服务注册的API接口给Nacos客户端进行HTTP调用,Nacos官方Open API文档中注册服务实例的接口说明如下:

(3)Nacos客户端如何发送服务心跳

调用NacosNamingService的registerInstance()方法注册服务实例时,在调用NamingProxy的registerService()方法来注册服务实例之前,会根据注册的服务实例是临时实例来构建和添加心跳信息到beatReactor,也就是调用BeatReactor的buildBeatInfo()方法和addBeatInfo()方法。

在BeatReactor的buildBeatInfo()方法中,会通过beatInfo的setPeriod()方法设置心跳间隔时间,默认是5秒。

在BeatReactor的addBeatInfo()方法中,倒数第二行会开启一个延时执行的任务,执行的任务是根据心跳信息BeatInfo封装的BeatTask。该BeatTask任务会交给BeatReactor的ScheduledExecutorService来执行,并通过beatInfo的getPeriod()方法获取延时执行的时间为5秒。

在BeatTask的run()方法中,就会调用NamingProxy的sendBeat()方法发送心跳请求给Nacos服务端,也就是调用NamingProxy的reqApi()方法向Nacos服务端发起心跳请求。如果返回的心跳响应表明服务实例不存在则重新发起服务实例注册请求。无论心跳响应如何,继续根据心跳信息BeatInfo封装一个BeatTask任务,然后将该任务交给线程池ScheduledExecutorService来延时5秒执行。

public class NacosServiceRegistry implements ServiceRegistry<Registration> {private final NacosDiscoveryProperties nacosDiscoveryProperties;public NacosServiceRegistry(NacosDiscoveryProperties nacosDiscoveryProperties) {this.nacosDiscoveryProperties = nacosDiscoveryProperties;}@Overridepublic void register(Registration registration) {if (StringUtils.isEmpty(registration.getServiceId())) {log.warn("No service to register for nacos client...");return;}NamingService namingService = namingService();//服务名称String serviceId = registration.getServiceId();//服务分组String group = nacosDiscoveryProperties.getGroup();//服务实例,包含了IP、Port等信息Instance instance = getNacosInstanceFromRegistration(registration);try {//调用NacosNamingService.registerInstance()方法把当前的服务实例注册到Nacos中namingService.registerInstance(serviceId, group, instance);log.info("nacos registry, {} {} {}:{} register finished", group, serviceId, instance.getIp(), instance.getPort());} catch (Exception e) {log.error("nacos registry, {} register failed...{},", serviceId, registration.toString(), e);rethrowRuntimeException(e);}}private NamingService namingService() {return nacosServiceManager.getNamingService(nacosDiscoveryProperties.getNacosProperties());}private Instance getNacosInstanceFromRegistration(Registration registration) {Instance instance = new Instance();instance.setIp(registration.getHost());instance.setPort(registration.getPort());instance.setWeight(nacosDiscoveryProperties.getWeight());instance.setClusterName(nacosDiscoveryProperties.getClusterName());instance.setEnabled(nacosDiscoveryProperties.isInstanceEnabled());instance.setMetadata(registration.getMetadata());instance.setEphemeral(nacosDiscoveryProperties.isEphemeral());return instance;}...
}public class NacosNamingService implements NamingService {private BeatReactor beatReactor;private NamingProxy serverProxy;...@Overridepublic void registerInstance(String serviceName, String groupName, Instance instance) throws NacosException {NamingUtils.checkInstanceIsLegal(instance);//获取分组服务名字String groupedServiceName = NamingUtils.getGroupedName(serviceName, groupName);//判定要注册的服务实例是否是临时实例if (instance.isEphemeral()) {//如果是临时实例,则构建心跳信息BeatInfo beatInfo = beatReactor.buildBeatInfo(groupedServiceName, instance);//添加心跳信息beatReactor.addBeatInfo(groupedServiceName, beatInfo);}//接下来调用NamingProxy的注册方法registerService()来注册服务实例serverProxy.registerService(groupedServiceName, groupName, instance);}...
}public class BeatReactor implements Closeable {...//Build new beat information.public BeatInfo buildBeatInfo(String groupedServiceName, Instance instance) {BeatInfo beatInfo = new BeatInfo();beatInfo.setServiceName(groupedServiceName);beatInfo.setIp(instance.getIp());beatInfo.setPort(instance.getPort());beatInfo.setCluster(instance.getClusterName());beatInfo.setWeight(instance.getWeight());beatInfo.setMetadata(instance.getMetadata());beatInfo.setScheduled(false);//getInstanceHeartBeatInterval()的返回值是5000beatInfo.setPeriod(instance.getInstanceHeartBeatInterval());return beatInfo;}...
}@JsonInclude(Include.NON_NULL)
public class Instance implements Serializable {...public long getInstanceHeartBeatInterval() {//Constants.DEFAULT_HEART_BEAT_INTERVAL,默认是5000return getMetaDataByKeyWithDefault(PreservedMetadataKeys.HEART_BEAT_INTERVAL, Constants.DEFAULT_HEART_BEAT_INTERVAL);}...
}public class BeatReactor implements Closeable {private final ScheduledExecutorService executorService;private final NamingProxy serverProxy;public final Map<String, BeatInfo> dom2Beat = new ConcurrentHashMap<String, BeatInfo>();public BeatReactor(NamingProxy serverProxy) {this(serverProxy, UtilAndComs.DEFAULT_CLIENT_BEAT_THREAD_COUNT);}public BeatReactor(NamingProxy serverProxy, int threadCount) {this.serverProxy = serverProxy;this.executorService = new ScheduledThreadPoolExecutor(threadCount, new ThreadFactory() {@Overridepublic Thread newThread(Runnable r) {Thread thread = new Thread(r);thread.setDaemon(true);thread.setName("com.alibaba.nacos.naming.beat.sender");return thread;}});}...//Add beat information.public void addBeatInfo(String serviceName, BeatInfo beatInfo) {NAMING_LOGGER.info("[BEAT] adding beat: {} to beat map.", beatInfo);String key = buildKey(serviceName, beatInfo.getIp(), beatInfo.getPort());BeatInfo existBeat = null;if ((existBeat = dom2Beat.remove(key)) != null) {existBeat.setStopped(true);}dom2Beat.put(key, beatInfo);//开启一个延时执行的任务,执行的任务是BeatTaskexecutorService.schedule(new BeatTask(beatInfo), beatInfo.getPeriod(), TimeUnit.MILLISECONDS);MetricsMonitor.getDom2BeatSizeMonitor().set(dom2Beat.size());}...class BeatTask implements Runnable {BeatInfo beatInfo;public BeatTask(BeatInfo beatInfo) {this.beatInfo = beatInfo;}@Overridepublic void run() {//判断是否需要停止if (beatInfo.isStopped()) {return;}//获取下一次执行的时间,同样还是5slong nextTime = beatInfo.getPeriod();try {//调用NamingProxy.sendBeat()方法发送心跳请求给Nacos服务端JsonNode result = serverProxy.sendBeat(beatInfo, BeatReactor.this.lightBeatEnabled);long interval = result.get("clientBeatInterval").asLong();boolean lightBeatEnabled = false;if (result.has(CommonParams.LIGHT_BEAT_ENABLED)) {lightBeatEnabled = result.get(CommonParams.LIGHT_BEAT_ENABLED).asBoolean();}BeatReactor.this.lightBeatEnabled = lightBeatEnabled;if (interval > 0) {nextTime = interval;}//获取Nacos服务端返回的code状态码int code = NamingResponseCode.OK;if (result.has(CommonParams.CODE)) {code = result.get(CommonParams.CODE).asInt();}//如果code = RESOURCE_NOT_FOUND,没有找到资源,那么表示之前注册的信息,已经被Nacos服务端移除了if (code == NamingResponseCode.RESOURCE_NOT_FOUND) {//然后重新组装参数,重新发起注册请求Instance instance = new Instance();instance.setPort(beatInfo.getPort());instance.setIp(beatInfo.getIp());instance.setWeight(beatInfo.getWeight());instance.setMetadata(beatInfo.getMetadata());instance.setClusterName(beatInfo.getCluster());instance.setServiceName(beatInfo.getServiceName());instance.setInstanceId(instance.getInstanceId());instance.setEphemeral(true);try { //调用NamingProxy.registerService()方法发送服务实例注册请求到Nacos服务端serverProxy.registerService(beatInfo.getServiceName(), NamingUtils.getGroupName(beatInfo.getServiceName()), instance);} catch (Exception ignore) {}}} catch (NacosException ex) {NAMING_LOGGER.error("[CLIENT-BEAT] failed to send beat: {}, code: {}, msg: {}", JacksonUtils.toJson(beatInfo), ex.getErrCode(), ex.getErrMsg());}//把beatInfo又重新放入延迟任务当中,并且还是5秒,所以一直是个循环的状态executorService.schedule(new BeatTask(beatInfo), nextTime, TimeUnit.MILLISECONDS);}}
}public class NamingProxy implements Closeable {...//Send beat.public JsonNode sendBeat(BeatInfo beatInfo, boolean lightBeatEnabled) throws NacosException {    if (NAMING_LOGGER.isDebugEnabled()) {NAMING_LOGGER.debug("[BEAT] {} sending beat to server: {}", namespaceId, beatInfo.toString());}Map<String, String> params = new HashMap<String, String>(8);Map<String, String> bodyMap = new HashMap<String, String>(2);if (!lightBeatEnabled) {bodyMap.put("beat", JacksonUtils.toJson(beatInfo));}params.put(CommonParams.NAMESPACE_ID, namespaceId);params.put(CommonParams.SERVICE_NAME, beatInfo.getServiceName());params.put(CommonParams.CLUSTER_NAME, beatInfo.getCluster());params.put("ip", beatInfo.getIp());params.put("port", String.valueOf(beatInfo.getPort()));String result = reqApi(UtilAndComs.nacosUrlBase + "/instance/beat", params, bodyMap, HttpMethod.PUT);return JacksonUtils.toObj(result);}...
}

由此可见,在客户端在发起服务注册期间,会开启一个心跳健康检查的延时任务,这个任务每间隔5s执行一次。任务内容就是通过HTTP请求调用发送Nacos提供的服务实例心跳接口。Nacos官方Open API文档中服务实例心跳接口说明如下:

如下是客户端发起服务注册 + 发送服务心跳的整个流程图:

相关文章:

  • 蓝桥杯2025年第十六届省赛真题-可分解的正整数
  • Docker镜像仓库技术深度解析
  • 【环境配置】Mac电脑安装运行R语言教程 2025年
  • 半监督学习与强化学习的结合:新兴的智能训练模式
  • 【计算机视觉】Bayer Pattern与Demosaic算法详解:从传感器原始数据到彩色图像
  • 《计算机视觉度量:从特征描述到深度学习》—图片多模态CLIP,BLIP2,DINOv2特征提取综述
  • SDK游戏盾、高防IP、高防CDN三者的区别与选型指南
  • Profinet 从站转 EtherNet/IP 从站网关
  • OpenCV计算机视觉实战(2)——环境搭建与OpenCV简介
  • MongoDB的增删改查操作
  • 反向代理、负载均衡与镜像流量:原理剖析、区别对比及 Nginx 配置实践
  • 软件测试实验报告3 | 自动化测试工具的基本操作
  • 使用阿里云 CDN 保护网站真实 IP:完整配置指南
  • 【分布式系统中的“瑞士军刀”_ Zookeeper】三、Zookeeper 在实际项目中的应用场景与案例分析
  • 换张电话卡能改变IP属地吗?一文解读
  • 在 C# .NET 中驾驭 JSON:使用 Newtonsoft.Json 进行解析与 POST 请求实战
  • Java基础361问第16问——枚举为什么导致空指针?
  • 第十三章-PHP MySQL扩展
  • DeepSeek+即梦:AI视频创作从0到1全突破
  • 深度学习任务评估指标
  • 日韩 “打头阵”与美国贸易谈判,汽车、半导体产业忧虑重重
  • 民生访谈|宝妈宝爸、毕业生、骑手……上海如何为不同人群提供就业保障
  • 当隐身13年的北小京决定公开身份 ,专业戏剧评论依然稀缺
  • 专访|伊朗学者:美伊核谈不只是改革派立场,但伊朗不信任美国
  • 商务部:将打造一批国际消费集聚区和入境消费友好商圈
  • 商务部:汽车流通消费改革试点正在加快推进