Nacos 注册中心 - 健康检查机制源码
目录
1. 健康检查介绍
2. 客户端健康检查
2.1 临时实例的健康检查
2.2 永久实例的健康检查
3. 服务端健康检查
3.1 临时实例的健康检查
3.2 永久实例服务端健康检查
1. 健康检查介绍
当一个服务实例注册到 Nacos 中后,其他服务就可以从 Nacos 中查询出该服务实例信息,就可以调用使用了。
然而服务提供者如果此时挂掉了,此时其他服务拿到信息后就会调用不通,所以Nacos中的服务信息应该有一个更新机制(即删除掉挂掉的服务)
那么服务注册信息应该如何维护呢,那就是判断某个服务实例是否有问题,如果检测到服务实例出现问题了就将他剔除掉。
那么如何判断 服务实例 是否有问题呢?这就是健康检查要做的事情,即检查服务实例的健康状态。不健康则剔除下线。
下面看看客户端和服务端为了实现健康检查功能都各自做了哪些事情。
2. 客户端健康检查
2.1 临时实例的健康检查
从 Nacos 2.x 开始,临时实例的服务注册由原来的 HTTP 更换为了 GRPC 长连接方式。Nacos Client 和 Nacos Server 之间建立的 RPC 长连接,服务注册、服务取消注册等接口都是通过 GRPC 消息与服务端通信的。
GRPC 长连接是一直存在的,只要连接一直存在就代表Nacos Client 和 Nacos Server 之间的连接是通的,Nacos Client 则一直在线。如果Nacos Client 由于网络问题等其他问题挂掉了,那么这条长连接也会断开连接。
那么服务实例如何算健康呢,就是长连接一直存在没有断那就算健康的。
如果连接断掉了,那么该客户端上注册的全部服务实例都是不健康的了。
GRPC 长连接如果想一直保证连接状态,就需要定时发送心跳包,以确保连接处于活动的状态。否则一段时间不操作的话就会自动断开连接。
接下来看看 NacosClient 如何开启 RPC 长连接的
NacosClient 操作注册中心的 API 是通过 NamingService 进行的
在 NacosNamingService 的构造器中调用了 init 初始化方法:init 方法最后
进入最后一行代码:
再看最后 new NamingGrpcClientProxy 的源码:
public class NamingGrpcClientProxy {public NamingGrpcClientProxy(String namespaceId, SecurityProxy securityProxy, ServerListFactory serverListFactory,NacosClientProperties properties, ServiceInfoHolder serviceInfoHolder) {// 省略部分代码// 创建 RPC Clientthis.rpcClient = RpcClientFactory.createClient(uuid, ConnectionType.GRPC, labels);// 启动 RPC Clientstart(serverListFactory, serviceInfoHolder);}private void start(ServerListFactory serverListFactory, ServiceInfoHolder serviceInfoHolder) throws NacosException {rpcClient.serverListFactory(serverListFactory);rpcClient.registerConnectionListener(redoService);rpcClient.registerServerRequestHandler(new NamingPushRequestHandler(serviceInfoHolder));// 启动rpcClient.start();}
看看 RpcClient.start 做了什么
public abstract class RpcClient {protected BlockingQueue<ConnectionEvent> eventLinkedBlockingQueue = new LinkedBlockingQueue<>();public final void start() {// 省略部分代码// connection event consumer.clientEventExecutor.submit(() -> {while (!clientEventExecutor.isTerminated() && !clientEventExecutor.isShutdown()) {ConnectionEvent take = eventLinkedBlockingQueue.take();if (take.isConnected()) {notifyConnected();} else if (take.isDisConnected()) {notifyDisConnected();}}});Connection connectToServer = null;// 状态设置为启动中rpcClientStatus.set(RpcClientStatus.STARTING);int startUpRetryTimes = rpcClientConfig.retryTimes();while (startUpRetryTimes > 0 && connectToServer == null) {startUpRetryTimes--;ServerInfo serverInfo = nextRpcServer();// 建立连接connectToServer = connectToServer(serverInfo);}this.currentConnection = connectToServer;// 状态设置为 运行中rpcClientStatus.set(RpcClientStatus.RUNNING);eventLinkedBlockingQueue.offer(new ConnectionEvent(ConnectionEvent.CONNECTED));// 省略部分代码}}
省略了一些代码,这次只关注核心的两个点
1. 找到下一个 RPC Server 建立连接
因为 Nacos 支持集群部署,此时的 RPC Server List 其实就是这些集群节点。也就是找到集群下一个节点建议连接,如果连接失败就走到下一轮循环再获取到下一个节点继续连接(再重试次数内循环)
2. eventLinkedBlockingQueue 队列中加入一项
eventLinkedBlockingQueue 队列里存的是 ConnectionEvent,ConnectionEvent 代表一个连接事件,事件有 已连接事件、断开连接事件。
public class ConnectionEvent {public static final int CONNECTED = 1;public static final int DISCONNECTED = 0;int eventType;public ConnectionEvent(int eventType) {this.eventType = eventType;}public boolean isConnected() {return eventType == CONNECTED;}public boolean isDisConnected() {return eventType == DISCONNECTED;}
}
可见上面的源码,连接建立成功后,就会往队列压入一个 已连接事件 CONNECTED
队列事件的消费者在哪里呢?
便是在 start 方法的最开头定义的,while 循环不断从队列中获取到数据然后根据事件类型,进行各自的通知。
public final void start() {// 省略部分代码// connection event consumer.clientEventExecutor.submit(() -> {while (!clientEventExecutor.isTerminated() && !clientEventExecutor.isShutdown()) {ConnectionEvent take = eventLinkedBlockingQueue.take();if (take.isConnected()) {// 通知连接notifyConnected();} else if (take.isDisConnected()) {// 通知断开连接notifyDisConnected();}}});}
notifyConnected 和 notifyDisConnected 实现差不多,所以这里只看一个的实现源码。
protected List<ConnectionEventListener> connectionEventListeners = new ArrayList<>();
protected void notifyConnected() {// 省略部分源码if (connectionEventListeners.isEmpty()) {return;}// 循环全部监听器 一个个回调for (ConnectionEventListener connectionEventListener : connectionEventListeners) { connectionEventListener.onConnected(); }
}
connectionEventListeners 里的数据是什么时候加入的呢?
那就是在 NamingGrpcClientProxy.start 方法
public class NamingGrpcRedoService implements ConnectionEventListener {private volatile boolean connected = false;@Overridepublic void onConnected() {// 建立连接,改变连接状态connected = true;}@Overridepublic void onDisConnect() {connected = false;// 将 redoService 上的全部缓存数据一改synchronized (registeredInstances) {registeredInstances.values().forEach(instanceRedoData -> instanceRedoData.setRegistered(false));}synchronized (subscribes) {subscribes.values().forEach(subscriberRedoData -> subscriberRedoData.setRegistered(false));}}
当GRPC 长连接断开后就会进入 onDisConnect 事件回调中,这里改变了setRegistered 状态
上篇说过,redoService的作用 Nacos 注册中心 - 服务注册源码
此时会走入服务卸载流程。
2.2 永久实例的健康检查
永久实例客户端只负责提交一个请求即完成了全部操作。
健康检查工作由 服务端做。
3. 服务端健康检查
在收到客户端建立连接事件回调后,会调用 init 方法
public class IpPortBasedClient extends AbstractClient {public void init() {if (ephemeral) {beatCheckTask = new ClientBeatCheckTaskV2(this);HealthCheckReactor.scheduleCheck(beatCheckTask);} else {healthCheckTaskV2 = new HealthCheckTaskV2(this);HealthCheckReactor.scheduleCheck(healthCheckTaskV2);}}
}
如果当前是临时实例:使用 ClientBeatCheckTaskV2 处理健康检查
如果当前是永久实例:使用 HealthCheckTaskV2处理健康检查
然后将任务放到线程池中执行定时执行
3.1 临时实例的健康检查
看看 ClientBeatCheckTaskV2 如何实现:
public class ClientBeatCheckTaskV2 extends AbstractExecuteTask implements BeatCheckTask, NacosHealthCheckTask {// 省略部分代码private final IpPortBasedClient client;private final InstanceBeatCheckTaskInterceptorChain interceptorChain;// 执行健康检查@Overridepublic void doHealthCheck() {// 拿到当前客户端上注册的全部服务Collection<Service> services = client.getAllPublishedService();for (Service each : services) {HealthCheckInstancePublishInfo instance = (HealthCheckInstancePublishInfo) client.getInstancePublishInfo(each);// 将全部服务用拦截器链一个个执行interceptorChain.doInterceptor(new InstanceBeatCheckTask(client, each, instance));} }@Overridepublic void run() {doHealthCheck();}
}
看看拦截器链如何实现
这里的拦截器链是一个典型的责任链模式
public abstract class AbstractNamingInterceptorChain<T extends Interceptable>implements NacosNamingInterceptorChain<T> {@Overridepublic void doInterceptor(T object) {for (NacosNamingInterceptor<T> each : interceptors) {if (!each.isInterceptType(object.getClass())) {continue;}// 当前是责任节点,直接由该责任节点处理if (each.intercept(object)) {object.afterIntercept();// 不往后执行了return;}}// 如果没有责任节点执行,就调用 passInterceptobject.passIntercept();}
}
首先第一个问题:拦截器都是些什么呢?
可见具体的有三个实现类
这三个类代表者三个地方的判断,判断是否开启了 健康心跳检查功能? 如果没开,那就被拦截了呀,就走不到后面的心跳检查代码了
ServiceEnableBeatCheckInterceptor
从 Service 的元数据上判断
public class ServiceEnableBeatCheckInterceptor extends AbstractBeatCheckInterceptor {@Overridepublic boolean intercept(InstanceBeatCheckTask object) {NamingMetadataManager metadataManager = ApplicationUtils.getBean(NamingMetadataManager.class);// 获取当前 Service 的元数据Optional<ServiceMetadata> metadata = metadataManager.getServiceMetadata(object.getService());// 如果元数据存在,并且其数据 enableClientBeat 配置了if (metadata.isPresent() && metadata.get().getExtendData().containsKey(UtilsAndCommons.ENABLE_CLIENT_BEAT)) {// 直接取 enableClientBeat 值return Boolean.parseBoolean(metadata.get().getExtendData().get(UtilsAndCommons.ENABLE_CLIENT_BEAT));}return false;}}
InstanceBeatCheckResponsibleInterceptor
并不是一个客户端要负责集群中全部节点的心跳处理的,而是只负责自己注册的。
public class InstanceBeatCheckResponsibleInterceptor extends AbstractBeatCheckInterceptor {@Overridepublic boolean intercept(InstanceBeatCheckTask object) {// 是否是当前责任节点return !ApplicationUtils.getBean(DistroMapper.class).responsible(object.getClient().getResponsibleId());}}
InstanceEnableBeatCheckInterceptor
这个就是实例级别的健康检查判断
public class InstanceEnableBeatCheckInterceptor extends AbstractBeatCheckInterceptor {@Overridepublic boolean intercept(InstanceBeatCheckTask object) {NamingMetadataManager metadataManager = ApplicationUtils.getBean(NamingMetadataManager.class);HealthCheckInstancePublishInfo instance = object.getInstancePublishInfo();// 获取到实例上的元数据Optional<InstanceMetadata> metadata = metadataManager.getInstanceMetadata(object.getService(), instance.getMetadataId());// 从元数据上取if (metadata.isPresent() && metadata.get().getExtendData().containsKey(UtilsAndCommons.ENABLE_CLIENT_BEAT)) {// 元数据存在取该值return ConvertUtils.toBoolean(metadata.get().getExtendData().get(UtilsAndCommons.ENABLE_CLIENT_BEAT).toString());}// 从 extendDatum 中取数据if (instance.getExtendDatum().containsKey(UtilsAndCommons.ENABLE_CLIENT_BEAT)) {return ConvertUtils.toBoolean(instance.getExtendDatum().get(UtilsAndCommons.ENABLE_CLIENT_BEAT).toString());}return false;}}
如果都没被上面三个拦截器拦截掉,那就代表 当前实例是 开启了 健康检查,所以后面就要开始进行 检查操作 了
检查操作由 object.passIntercept(); 做
object.passIntercept(); 是什么呢?
就是刚才的 开始拦截方法最后一行
@Override
public void doInterceptor(T object) {for (NacosNamingInterceptor<T> each : interceptors) {if (!each.isInterceptType(object.getClass())) {continue;}// 拦截器是否拦截?if (each.intercept(object)) {object.afterIntercept();// 拦截了,直接返回return;}}// 未被拦截到object.passIntercept();
}
object 是什么?
就是之前传过来的 InstanceBeatCheckTask
接下来看看 InstanceBeatCheckTask
public class InstanceBeatCheckTask implements Interceptable {// 全部检查项目private static final List<InstanceBeatChecker> CHECKERS = new LinkedList<>();private final IpPortBasedClient client;private final Service service;private final HealthCheckInstancePublishInfo instancePublishInfo;static {// 添加检查项目CHECKERS.add(new UnhealthyInstanceChecker());CHECKERS.add(new ExpiredInstanceChecker());// SPI 机制添加CHECKERS.addAll(NacosServiceLoader.load(InstanceBeatChecker.class));}@Overridepublic void passIntercept() {// 遍历全部检查项目for (InstanceBeatChecker each : CHECKERS) {// 开始检查each.doCheck(client, service, instancePublishInfo);}}
}
全部检查项目 都是什么呢?
下面分别介绍
UnhealthyInstanceChecker
不健康实例检查器
public class UnhealthyInstanceChecker implements InstanceBeatChecker {// 开始做检查@Overridepublic void doCheck(Client client, Service service, HealthCheckInstancePublishInfo instance) {if (instance.isHealthy() && isUnhealthy(service, instance)) {// 当前实例不健康了 -> 改变健康状态为 不健康changeHealthyStatus(client, service, instance);}}// 判断实例是否健康private boolean isUnhealthy(Service service, HealthCheckInstancePublishInfo instance) {// 获取超时时间 默认 15 秒;可通过配置更改。long beatTimeout = getTimeout(service, instance);// 当前时间距离上一次发送心跳包时间 超过了 规定的超时时间 则返回 true,代表节点不健康了return System.currentTimeMillis() - instance.getLastHeartBeatTime() > beatTimeout;}// 改变健康状态private void changeHealthyStatus(Client client, Service service, HealthCheckInstancePublishInfo instance) {instance.setHealthy(false);// 省略部分代码}}
ExpiredInstanceChecker
过期实例检查器
public class ExpiredInstanceChecker implements InstanceBeatChecker {@Overridepublic void doCheck(Client client, Service service, HealthCheckInstancePublishInfo instance) {boolean expireInstance = ApplicationUtils.getBean(GlobalConfig.class).isExpireInstance();if (expireInstance && isExpireInstance(service, instance)) {// 如果实例过期了,则直接剔除实例deleteIp(client, service, instance);}}private boolean isExpireInstance(Service service, HealthCheckInstancePublishInfo instance) {// 获取超时时间 默认 30 秒;可通过配置更改。long deleteTimeout = getTimeout(service, instance);// 当前时间距离上一次发送心跳包时间 超过了 规定的超时时间 则返回 true,代表节点过期了,需要进行节点剔除操作return System.currentTimeMillis() - instance.getLastHeartBeatTime() > deleteTimeout;}/*** 服务直接剔除掉*/private void deleteIp(Client client, Service service, InstancePublishInfo instance) {client.removeServiceInstance(service);// 客户端下线NotifyCenter.publishEvent(new ClientOperationEvent.ClientDeregisterServiceEvent(service, client.getClientId()));// 元数据改变NotifyCenter.publishEvent(new MetadataEvent.InstanceMetadataEvent(service, instance.getMetadataId(), true));// 注销实例NotifyCenter.publishEvent(new DeregisterInstanceTraceEvent(System.currentTimeMillis(), "",false, DeregisterInstanceReason.HEARTBEAT_EXPIRE, service.getNamespace(), service.getGroup(),service.getName(), instance.getIp(), instance.getPort()));}
3.2 永久实例服务端健康检查
永久实例的健康检查是服务端主动探测方式,服务端定时外部请求客户端来看是否健康。
public class IpPortBasedClient extends AbstractClient {public void init() {if (ephemeral) {beatCheckTask = new ClientBeatCheckTaskV2(this);HealthCheckReactor.scheduleCheck(beatCheckTask);} else {healthCheckTaskV2 = new HealthCheckTaskV2(this);HealthCheckReactor.scheduleCheck(healthCheckTaskV2);}}
}
入口类是 HealthCheckTaskV2
public class HealthCheckTaskV2 extends AbstractExecuteTask implements NacosHealthCheckTask {// 省略部分代码private final IpPortBasedClient client;@Overridepublic void doHealthCheck() {// 获取到当前客户端上注册的全部节点for (Service each : client.getAllPublishedService()) {// 如果开启了健康检查if (switchDomain.isHealthCheckEnabled(each.getGroupedServiceName())) {// 拿到实例注册信息InstancePublishInfo instancePublishInfo = client.getInstancePublishInfo(each);// 拿到集群元数据ClusterMetadata metadata = getClusterMetadata(each, instancePublishInfo);// 调用 HealthCheckProcessorV2Delegate.process()ApplicationUtils.getBean(HealthCheckProcessorV2Delegate.class).process(this, each, metadata);
}}}
@Overridepublic void run() {doHealthCheck();}}
最终调用了 HealthCheckProcessorV2Delegate.process 方法
看看如何实现
HealthCheckProcessorV2Delegate 这个类就是一个委托类
public class HealthCheckProcessorV2Delegate implements HealthCheckProcessorV2 {// 类型,健康检查实现类private final Map<String, HealthCheckProcessorV2> healthCheckProcessorMap = new HashMap<>();
@Autowiredpublic void addProcessor(Collection<HealthCheckProcessorV2> processors) {healthCheckProcessorMap.putAll(processors.stream().filter(processor -> processor.getType() != null).collect(Collectors.toMap(HealthCheckProcessorV2::getType, processor -> processor)));}@Overridepublic void process(HealthCheckTaskV2 task, Service service, ClusterMetadata metadata) {// 从元数据中获取到当前的健康检查类型 (HTTP、MySQL、TCP、None)String type = metadata.getHealthyCheckType();// 根据类型找到具体的 健康检查类HealthCheckProcessorV2 processor = healthCheckProcessorMap.get(type);if (processor == null) {// 找不到 就使用 None 健康检查processor = healthCheckProcessorMap.get(NoneHealthCheckProcessor.TYPE);}// 开始进行健康检查processor.process(task, service, metadata);}
}
健康检查有如下几类,还可通过 SPI 方式扩展
下面一个一个介绍
NoneHealthCheckProcessor
None 代表不做健康检查,所以这个类的Process 为空实现
public class NoneHealthCheckProcessor implements HealthCheckProcessorV2 {public static final String TYPE = HealthCheckType.NONE.name();@Overridepublic void process(HealthCheckTaskV2 task, Service service, ClusterMetadata metadata) {}@Overridepublic String getType() {return TYPE;}
}
TcpHealthCheckProcessor
TCP 健康检查,用于通过 TCP 方式检查是否健康,本质上是通过建立 Socket 连接,发送 Socket 信息实现
public class TcpHealthCheckProcessor implements HealthCheckProcessorV2, Runnable {@Overridepublic void process(HealthCheckTaskV2 task, Service service, ClusterMetadata metadata) {// 省略}// 省略部分代码private class TaskProcessor implements Callable<Void> {
@Overridepublic Void call() {// 发送 Socket 请求SocketChannel channel = null;HealthCheckInstancePublishInfo instance = beat.getInstance();BeatKey beatKey = keyMap.get(beat.toString());if (beatKey != null && beatKey.key.isValid()) {if (System.currentTimeMillis() - beatKey.birthTime < TCP_KEEP_ALIVE_MILLIS) {instance.finishCheck();return null;}beatKey.key.cancel();beatKey.key.channel().close();}channel = SocketChannel.open();channel.configureBlocking(false);// only by setting this can we make the socket close event asynchronouschannel.socket().setSoLinger(false, -1);channel.socket().setReuseAddress(true);channel.socket().setKeepAlive(true);channel.socket().setTcpNoDelay(true);ClusterMetadata cluster = beat.getMetadata();int port = cluster.isUseInstancePortForCheck() ? instance.getPort() : cluster.getHealthyCheckPort();channel.connect(new InetSocketAddress(instance.getIp(), port));SelectionKey key = channel.register(selector, SelectionKey.OP_CONNECT | SelectionKey.OP_READ);key.attach(beat);keyMap.put(beat.toString(), new BeatKey(key));beat.setStartTime(System.currentTimeMillis());GlobalExecutor.scheduleTcpSuperSenseTask(new TimeOutTask(key), CONNECT_TIMEOUT_MS, TimeUnit.MILLISECONDS);return null;}}
}
MysqlHealthCheckProcessor
本质是 发送一个 sql。(sql从配置中获取),整个过程没报异常就算健康
public class MysqlHealthCheckProcessor implements HealthCheckProcessorV2 {public static final String TYPE = HealthCheckType.MYSQL.name();private final HealthCheckCommonV2 healthCheckCommon;private final SwitchDomain switchDomain;public static final int CONNECT_TIMEOUT_MS = 500;private static final String CHECK_MYSQL_MASTER_SQL = "show global variables where variable_name='read_only'";private static final String MYSQL_SLAVE_READONLY = "ON";private static final ConcurrentMap<String, Connection> CONNECTION_POOL = new ConcurrentHashMap<String, Connection>();public MysqlHealthCheckProcessor(HealthCheckCommonV2 healthCheckCommon, SwitchDomain switchDomain) {this.healthCheckCommon = healthCheckCommon;this.switchDomain = switchDomain;}@Overridepublic String getType() {return TYPE;}@Overridepublic void process(HealthCheckTaskV2 task, Service service, ClusterMetadata metadata) {// 省略}private class MysqlCheckTask implements Runnable {@Overridepublic void run() {Statement statement = null;ResultSet resultSet = null;String clusterName = instance.getCluster();String key =service.getGroupedServiceName() + ":" + clusterName + ":" + instance.getIp() + ":" + instance.getPort();Connection connection = CONNECTION_POOL.get(key);Mysql config = (Mysql) metadata.getHealthChecker();if (connection == null || connection.isClosed()) {String url = "jdbc:mysql://" + instance.getIp() + ":" + instance.getPort() + "?connectTimeout="+ CONNECT_TIMEOUT_MS + "&socketTimeout=" + CONNECT_TIMEOUT_MS + "&loginTimeout=" + 1;connection = DriverManager.getConnection(url, config.getUser(), config.getPwd());CONNECTION_POOL.put(key, connection);}statement = connection.createStatement();statement.setQueryTimeout(1);resultSet = statement.executeQuery(config.getCmd());int resultColumnIndex = 2;if (CHECK_MYSQL_MASTER_SQL.equals(config.getCmd())) {resultSet.next();if (MYSQL_SLAVE_READONLY.equals(resultSet.getString(resultColumnIndex))) {throw new IllegalStateException("current node is slave!");}}healthCheckCommon.checkOk(task, service, "mysql:+ok");healthCheckCommon.reEvaluateCheckRT(System.currentTimeMillis() - startTime, task,switchDomain.getMysqlHealthParams());
}
}
HttpHealthCheckProcessor
本质是发送一个 HTTP 请求,返回状态码 200 就算健康
public class HttpHealthCheckProcessor implements HealthCheckProcessorV2 {public static final String TYPE = HealthCheckType.HTTP.name();private static final NacosAsyncRestTemplate ASYNC_REST_TEMPLATE = HttpClientManager.getProcessorNacosAsyncRestTemplate();private final HealthCheckCommonV2 healthCheckCommon;
@Overridepublic void process(HealthCheckTaskV2 task, Service service, ClusterMetadata metadata) {HealthCheckInstancePublishInfo instance = (HealthCheckInstancePublishInfo) task.getClient().getInstancePublishInfo(service);if (null == instance) {return;}try {if (!instance.tryStartCheck()) {SRV_LOG.warn("http check started before last one finished, service: {} : {} : {}:{}",service.getGroupedServiceName(), instance.getCluster(), instance.getIp(), instance.getPort());healthCheckCommon.reEvaluateCheckRT(task.getCheckRtNormalized() * 2, task, switchDomain.getHttpHealthParams());return;}Http healthChecker = (Http) metadata.getHealthChecker();int ckPort = metadata.isUseInstancePortForCheck() ? instance.getPort() : metadata.getHealthyCheckPort();URL host = new URL(HTTP_PREFIX + instance.getIp() + ":" + ckPort);URL target = new URL(host, healthChecker.getPath());Map<String, String> customHeaders = healthChecker.getCustomHeaders();Header header = Header.newInstance();header.addAll(customHeaders);// 发送 HTTP 请求ASYNC_REST_TEMPLATE.get(target.toString(), header, Query.EMPTY, String.class,new HttpHealthCheckCallback(instance, task, service));MetricsMonitor.getHttpHealthCheckMonitor().incrementAndGet();} catch (Throwable e) {instance.setCheckRt(switchDomain.getHttpHealthParams().getMax());healthCheckCommon.checkFail(task, service, "http:error:" + e.getMessage());healthCheckCommon.reEvaluateCheckRT(switchDomain.getHttpHealthParams().getMax(), task,switchDomain.getHttpHealthParams());}}@Overridepublic String getType() {return TYPE;}private class HttpHealthCheckCallback implements Callback<String> {private final HealthCheckTaskV2 task;private final Service service;private final HealthCheckInstancePublishInfo instance;private long startTime = System.currentTimeMillis();public HttpHealthCheckCallback(HealthCheckInstancePublishInfo instance, HealthCheckTaskV2 task,Service service) {this.instance = instance;this.task = task;this.service = service;}@Overridepublic void onReceive(RestResult<String> result) {instance.setCheckRt(System.currentTimeMillis() - startTime);int httpCode = result.getCode();if (HttpURLConnection.HTTP_OK == httpCode) {healthCheckCommon.checkOk(task, service, "http:" + httpCode);healthCheckCommon.reEvaluateCheckRT(System.currentTimeMillis() - startTime, task,switchDomain.getHttpHealthParams());} else if (HttpURLConnection.HTTP_UNAVAILABLE == httpCode|| HttpURLConnection.HTTP_MOVED_TEMP == httpCode) {// server is busy, need verification laterhealthCheckCommon.checkFail(task, service, "http:" + httpCode);healthCheckCommon.reEvaluateCheckRT(task.getCheckRtNormalized() * 2, task, switchDomain.getHttpHealthParams());} else {//probably means the state files has been removed by administratorhealthCheckCommon.checkFailNow(task, service, "http:" + httpCode);healthCheckCommon.reEvaluateCheckRT(switchDomain.getHttpHealthParams().getMax(), task,switchDomain.getHttpHealthParams());}}@Overridepublic void onError(Throwable throwable) {Throwable cause = throwable;instance.setCheckRt(System.currentTimeMillis() - startTime);int maxStackDepth = 50;for (int deepth = 0; deepth < maxStackDepth && cause != null; deepth++) {if (HttpUtils.isTimeoutException(cause)) {healthCheckCommon.checkFail(task, service, "http:" + cause.getMessage());healthCheckCommon.reEvaluateCheckRT(task.getCheckRtNormalized() * 2, task,switchDomain.getHttpHealthParams());return;}cause = cause.getCause();}if (throwable instanceof ConnectException) {healthCheckCommon.checkFailNow(task, service, "http:unable2connect:" + throwable.getMessage());} else {healthCheckCommon.checkFail(task, service, "http:error:" + throwable.getMessage());}healthCheckCommon.reEvaluateCheckRT(switchDomain.getHttpHealthParams().getMax(), task,switchDomain.getHttpHealthParams());}@Overridepublic void onCancel() {}}
}
可见,不同方式的健康检查差异还是挺大的,那么如果将检查结果告知 Nacos 呢,那就是调用
// 健康检查结果:健康
healthCheckCommon.checkOk(task, service, "");// 健康检查结果:不健康
healthCheckCommon.checkFail(task, service, "");
健康检查成功
检查成功主要做的事情就是 重置失败次数、结束检查
public void checkOk(HealthCheckTaskV2 task, Service service, String msg) {try {HealthCheckInstancePublishInfo instance = (HealthCheckInstancePublishInfo) task.getClient().getInstancePublishInfo(service);if (instance == null) {// 实例不存在,不做处理return;}if (!instance.isHealthy()) {// 如果实例不健康的,将状态改为 健康// 代码省略}} finally {// 重置失败次数instance.resetFailCount();// 结束检查instance.finishCheck();}
}
健康检查失
public void checkFail(HealthCheckTaskV2 task, Service service, String msg) {
HealthCheckInstancePublishInfo instance = (HealthCheckInstancePublishInfo) task.getClient().getInstancePublishInfo(service);if (instance == null) {return;}try {if (instance.isHealthy()) {// 如果实例是健康的,将状态改为 不健康// 代码省略}} finally {// 重置健康次数instance.resetOkCount();// 结束检查instance.finishCheck();}
}
相关文章:

Nacos 注册中心 - 健康检查机制源码
目录 1. 健康检查介绍 2. 客户端健康检查 2.1 临时实例的健康检查 2.2 永久实例的健康检查 3. 服务端健康检查 3.1 临时实例的健康检查 3.2 永久实例服务端健康检查 1. 健康检查介绍 当一个服务实例注册到 Nacos 中后,其他服务就可以从 Nacos 中查询出该服务…...

Transformer在计算机视觉中的应用-VIT、TNT模型
上期介绍了Transformer的结构、特点和作用等方面的知识,回头看下来这一模型并不难,依旧是传统机器翻译模型中常见的seq2seq网络,里面加入了注意力机制,QKV矩阵的运算使得计算并行。 当然,最大的重点不是矩阵运算&…...

快速入门Zookeeper技术.黑马教程
快速入门Zookeeper技术.黑马教程一、初识 Zookeeper二、ZooKeeper 安装与配置三、ZooKeeper 命令操作1.Zookeeper 数据模型2.Zookeeper 服务端常用命令3.Zookeeper 客户端常用命令四、ZooKeeper JavaAPI 操作五、ZooKeeper JavaAPI 操作1.Curator 介绍2.Curator API 常用操作2.…...
网易C++实习一面
说下C11新特性 auto有没有效率上的问题?为什么?发生在什么时候? 说下单例模式 什么时候需要加锁,什么时候不需要加锁? 像printf这样的函数,自己本身不修改数据,但是其他人会修改数据&#x…...

进程和线程的区别和联系
进程和线程的区别和联系1. 认识线程2. 进程和线程的关系3. 进程和线程的区别4. 线程共享了进程哪些资源1. 上下文切换2. 线程共享了进程哪些资源1.代码区2. 数据区3. 堆区1. 认识线程 线程是进程的一个实体,它被包含在进程中,一个进程至少包含一个线程,一个进程也可以包含多个…...

Java学习笔记——集合
目录集合与数组的对比集合体系结构Collection——常见成员方法Collection——迭代器基本使用Collection——迭代器原理分析Collection——迭代器删除方法增强for——基本格式增强for——注意点Collection——练习集合与数组的对比 package top.xxxx.www.CollectionDemo;import …...

差分运放公式推导-运算放大器
不知道大家有没遇到这种情况,在计算电路的时候,有时候会突然的忘记一些公式啊啥的,需要回去翻看笔记或者查资料,知其然而不知其所以然。今天跟大家一起来一起推导一遍差分运放的计算过程。 计算过程其实归根结底还是根据运放的虚…...
金丹二层 —— 字符串长度求解的四种方法
前言: 1.CSDN由于我的排版不怎么好看,我的有道云笔记比较美观,请移步有道云笔记 2.修炼必备 1)入门必备:VS2019社区版,下载地址:Visual Studio 较旧的下载 - 2019、2017、2015 和以前的版本 (m…...

深入剖析Linux——进程信号
致前行的人: 要努力,但不着急,繁花锦簇,硕果累累都需要过程! 目录 1.信号概念 1.1生活角度的信号 2. 技术应用角度的信号 3.Linux操作系统中查看信号 4.常用信号发送 4.1通过键盘发送信号 4.2调用系统函数发送信号 4.3…...

API-Server的监听器Controller的List分页失效
前言 最近做项目,还是K8S的插件监听器(理论上插件都是通过API-server通信),官方的不同写法居然都能出现争议,争议点就是对API-Server的请求的耗时,说是会影响API-Server。实际上通过源码分析两着有差别&am…...
jupyter notebook 进阶使用:nbextensions,终极避坑
jupyter notebook 进阶使用:nbextensions,终极避坑吐槽安装 jupyter_contrib_nbextensions1. Install the python package(安装python包)方法一,PIP:方法二,Conda(推荐)&…...
C 语言编程 — Doxygen + Graphviz 静态项目分析
目录 文章目录目录安装配置解析Project related configuration optionsBuild related configuration optionsConfiguration options related to warning and progress messagesConfiguration options related to the input filesConfiguration options related to source brows…...

Mybatis报BindingException:Invalid bound statement (not found)异常
一、前言 本文的mybatis是与springboot整合时出现的异常,若使用的不是基于springboot,解决思路也大体一样的。 二、从整合mybatis的三个步骤排查问题 但在这之前,我们先要知道整合mybatis的三个重要的工作,如此才能排查&#x…...

HttpRunner3.x(1)-框架介绍
HttpRunner 是一款面向 HTTP(S) 协议的通用测试框架,只需编写维护一份 YAML/JSON 脚本,即可实现自动化测试、性能测试、线上监控、持续集成等多种测试需求。主要特征继承的所有强大功能requests ,只需以人工方式获得乐趣即可处理HTTP…...

pytest学习和使用20-pytes如何进行分布式测试?(pytest-xdist)
20-pytes如何进行分布式测试?(pytest-xdist)1 什么是分布式测试?2 为什么要进行分布式测试?2.1 场景1:自动化测试场景2.2 场景2:性能测试场景3 分布式测试有什么特点?4 分布式测试关…...

三、Python 操作 MongoDB ----非 ODM
文章目录一、连接器的安装和配置二、新增文档三、查询文档四、更新文档五、删除文档一、连接器的安装和配置 pymongo: MongoDB 官方提供的 Python 工具包。官方文档: https://pymongo.readthedocs.io/en/stable/ pip安装,命令如下࿱…...
求最大公约数和最小公倍数---辗转相除法(欧几里得算法)
目录 一.GCD和LCM 1.最大公约数 2.最小公倍数 二.暴力求解 1.最大公约数 2.最小公倍数 三.辗转相除法 1.最大公约数 2.最小公倍数 一.GCD和LCM 1.最大公约数 最大公约数(Greatest Common Divisor,简称GCD)指的是两个或多个整数共有…...

音视频开发_获取媒体文件的详细信息
一、前言 做音视频开发过程中,经常需要获取媒体文件的详细信息。 比如:获取视频文件的总时间、帧率、尺寸、码率等等信息。 获取音频文件的的总时间、帧率、码率,声道等信息。 这篇文章贴出2个我封装好的函数,直接调用就能获取媒体信息返回,copy过去就能使用,非常方便。…...

Springboot集成Swagger
一、Swagger简介注意点! 在正式发布的时候要关闭swagger(出于安全考虑,而且节省内存空间)之前开发的时候,前端只用管理静态页面, http请求到后端, 模板引擎JSP,故后端是主力如今是前…...

Vue全新一代状态管理库 Pinia【一篇通】
文章目录前言1. Pinia 是什么?1.1 为什么取名叫 Pinia?1.2. 为什么要使用 Pinia ?2. 安装 Pinia2.1.创建 Store2.1.1. Option 类型 Store2.1.2 Setup 函数类型 Store2.1.3 模板中使用3. State 的使用事项(Option Store )3.1 读取 State3.2 …...
React 第五十五节 Router 中 useAsyncError的使用详解
前言 useAsyncError 是 React Router v6.4 引入的一个钩子,用于处理异步操作(如数据加载)中的错误。下面我将详细解释其用途并提供代码示例。 一、useAsyncError 用途 处理异步错误:捕获在 loader 或 action 中发生的异步错误替…...
多模态商品数据接口:融合图像、语音与文字的下一代商品详情体验
一、多模态商品数据接口的技术架构 (一)多模态数据融合引擎 跨模态语义对齐 通过Transformer架构实现图像、语音、文字的语义关联。例如,当用户上传一张“蓝色连衣裙”的图片时,接口可自动提取图像中的颜色(RGB值&…...
根据万维钢·精英日课6的内容,使用AI(2025)可以参考以下方法:
根据万维钢精英日课6的内容,使用AI(2025)可以参考以下方法: 四个洞见 模型已经比人聪明:以ChatGPT o3为代表的AI非常强大,能运用高级理论解释道理、引用最新学术论文,生成对顶尖科学家都有用的…...

AI书签管理工具开发全记录(十九):嵌入资源处理
1.前言 📝 在上一篇文章中,我们完成了书签的导入导出功能。本篇文章我们研究如何处理嵌入资源,方便后续将资源打包到一个可执行文件中。 2.embed介绍 🎯 Go 1.16 引入了革命性的 embed 包,彻底改变了静态资源管理的…...
python报错No module named ‘tensorflow.keras‘
是由于不同版本的tensorflow下的keras所在的路径不同,结合所安装的tensorflow的目录结构修改from语句即可。 原语句: from tensorflow.keras.layers import Conv1D, MaxPooling1D, LSTM, Dense 修改后: from tensorflow.python.keras.lay…...

20个超级好用的 CSS 动画库
分享 20 个最佳 CSS 动画库。 它们中的大多数将生成纯 CSS 代码,而不需要任何外部库。 1.Animate.css 一个开箱即用型的跨浏览器动画库,可供你在项目中使用。 2.Magic Animations CSS3 一组简单的动画,可以包含在你的网页或应用项目中。 3.An…...
【无标题】路径问题的革命性重构:基于二维拓扑收缩色动力学模型的零点隧穿理论
路径问题的革命性重构:基于二维拓扑收缩色动力学模型的零点隧穿理论 一、传统路径模型的根本缺陷 在经典正方形路径问题中(图1): mermaid graph LR A((A)) --- B((B)) B --- C((C)) C --- D((D)) D --- A A -.- C[无直接路径] B -…...

MFC 抛体运动模拟:常见问题解决与界面美化
在 MFC 中开发抛体运动模拟程序时,我们常遇到 轨迹残留、无效刷新、视觉单调、物理逻辑瑕疵 等问题。本文将针对这些痛点,详细解析原因并提供解决方案,同时兼顾界面美化,让模拟效果更专业、更高效。 问题一:历史轨迹与小球残影残留 现象 小球运动后,历史位置的 “残影”…...
探索Selenium:自动化测试的神奇钥匙
目录 一、Selenium 是什么1.1 定义与概念1.2 发展历程1.3 功能概述 二、Selenium 工作原理剖析2.1 架构组成2.2 工作流程2.3 通信机制 三、Selenium 的优势3.1 跨浏览器与平台支持3.2 丰富的语言支持3.3 强大的社区支持 四、Selenium 的应用场景4.1 Web 应用自动化测试4.2 数据…...

ui框架-文件列表展示
ui框架-文件列表展示 介绍 UI框架的文件列表展示组件,可以展示文件夹,支持列表展示和图标展示模式。组件提供了丰富的功能和可配置选项,适用于文件管理、文件上传等场景。 功能特性 支持列表模式和网格模式的切换展示支持文件和文件夹的层…...