广州市政府网站集约化建设方案/老司机们用的关键词有哪些
上文说了Nacos配置中心客户端的源码流程,这篇介绍下Nacos配置中心服务端的源码。
服务端的启动
先来看服务启动时干了啥?
init()方法上面有@PostConstruct,该方法会在ExternalDumpService实例化后执行。
com.alibaba.nacos.config.server.service.dump.ExternalDumpService#init
@PostConstruct
@Override
protected void init() throws Throwable {dumpOperate(processor, dumpAllProcessor, dumpAllBetaProcessor, dumpAllTagProcessor);
}
dumpOperate()主要干了两件事:
- dumpConfigInfo(),这个方法里面也是调用的DumpAllTask
- 提交DumpAllTask的定时任务
com.alibaba.nacos.config.server.service.dump.DumpService#dumpOperate
protected void dumpOperate(DumpProcessor processor, DumpAllProcessor dumpAllProcessor,DumpAllBetaProcessor dumpAllBetaProcessor, DumpAllTagProcessor dumpAllTagProcessor) throws NacosException {String dumpFileContext = "CONFIG_DUMP_TO_FILE";TimerContext.start(dumpFileContext);try {LogUtil.DEFAULT_LOG.warn("DumpService start");Runnable dumpAll = () -> dumpAllTaskMgr.addTask(DumpAllTask.TASK_ID, new DumpAllTask());... ...try {// 转存配置// 执行一次DumpAllTaskdumpConfigInfo(dumpAllProcessor);... ...} catch (Exception e) {LogUtil.FATAL_LOG.error("Nacos Server did not start because dumpservice bean construction failure :\n" + e.toString());throw new NacosException(NacosException.SERVER_ERROR,"Nacos Server did not start because dumpservice bean construction failure :\n" + e.getMessage(),e);}if (!EnvUtil.getStandaloneMode()) {Runnable heartbeat = () -> {String heartBeatTime = TimeUtils.getCurrentTime().toString();// write disktry {DiskUtil.saveHeartBeatToDisk(heartBeatTime);} catch (IOException e) {LogUtil.FATAL_LOG.error("save heartbeat fail" + e.getMessage());}};ConfigExecutor.scheduleConfigTask(heartbeat, 0, 10, TimeUnit.SECONDS);long initialDelay = new Random().nextInt(INITIAL_DELAY_IN_MINUTE) + 10;LogUtil.DEFAULT_LOG.warn("initialDelay:{}", initialDelay);// 6个小时执行一次DumpAllTaskConfigExecutor.scheduleConfigTask(dumpAll, initialDelay, DUMP_ALL_INTERVAL_IN_MINUTE, TimeUnit.MINUTES);ConfigExecutor.scheduleConfigTask(dumpAllBeta, initialDelay, DUMP_ALL_INTERVAL_IN_MINUTE, TimeUnit.MINUTES);ConfigExecutor.scheduleConfigTask(dumpAllTag, initialDelay, DUMP_ALL_INTERVAL_IN_MINUTE, TimeUnit.MINUTES);}ConfigExecutor.scheduleConfigTask(clearConfigHistory, 10, 10, TimeUnit.MINUTES);} finally {TimerContext.end(dumpFileContext, LogUtil.DUMP_LOG);}}
dumpConfigInfo()里面还是执行了DumpAllTask。
com.alibaba.nacos.config.server.service.dump.DumpService#dumpConfigInfo
private void dumpConfigInfo(DumpAllProcessor dumpAllProcessor) throws IOException {int timeStep = 6;Boolean isAllDump = true;// initial dump allFileInputStream fis = null;Timestamp heartheatLastStamp = null;try {... ...if (isAllDump) {LogUtil.DEFAULT_LOG.info("start clear all config-info.");DiskUtil.clearAll();// 执行DumpAllTaskdumpAllProcessor.process(new DumpAllTask());} else {... ...}} catch (IOException e) {LogUtil.FATAL_LOG.error("dump config fail" + e.getMessage());throw e;} finally {if (null != fis) {try {fis.close();} catch (IOException e) {LogUtil.DEFAULT_LOG.warn("close file failed");}}}
}
process()会分页查询出数据库的所有配置,然后一个一个调用ConfigCacheService.dump()。
com.alibaba.nacos.config.server.service.dump.processor.DumpAllProcessor#process
public boolean process(NacosTask task) {long currentMaxId = persistService.findConfigMaxId();long lastMaxId = 0;while (lastMaxId < currentMaxId) {// 分页查询出数据库的所有配置Page<ConfigInfoWrapper> page = persistService.findAllConfigInfoFragment(lastMaxId, PAGE_SIZE);if (page != null && page.getPageItems() != null && !page.getPageItems().isEmpty()) {for (ConfigInfoWrapper cf : page.getPageItems()) {long id = cf.getId();lastMaxId = id > lastMaxId ? id : lastMaxId;if (cf.getDataId().equals(AggrWhitelist.AGGRIDS_METADATA)) {AggrWhitelist.load(cf.getContent());}if (cf.getDataId().equals(ClientIpWhiteList.CLIENT_IP_WHITELIST_METADATA)) {ClientIpWhiteList.load(cf.getContent());}if (cf.getDataId().equals(SwitchService.SWITCH_META_DATAID)) {SwitchService.load(cf.getContent());}// dump为文件boolean result = ConfigCacheService.dump(cf.getDataId(), cf.getGroup(), cf.getTenant(), cf.getContent(), cf.getLastModified(),cf.getType());final String content = cf.getContent();final String md5 = MD5Utils.md5Hex(content, Constants.ENCODE);LogUtil.DUMP_LOG.info("[dump-all-ok] {}, {}, length={}, md5={}",GroupKey2.getKey(cf.getDataId(), cf.getGroup()), cf.getLastModified(), content.length(),md5);}DEFAULT_LOG.info("[all-dump] {} / {}", lastMaxId, currentMaxId);} else {lastMaxId += PAGE_SIZE;}}return true;
}
dump()就是将数据库的配置,保存到本地,一个配置对应一个文件,这样客户端来查询配置,直接查的本地文件,而不是查数据库。
com.alibaba.nacos.config.server.service.ConfigCacheService#dump
public static boolean dump(String dataId, String group, String tenant, String content, long lastModifiedTs,String type) {String groupKey = GroupKey2.getKey(dataId, group, tenant);CacheItem ci = makeSure(groupKey);ci.setType(type);final int lockResult = tryWriteLock(groupKey);assert (lockResult != 0);if (lockResult < 0) {DUMP_LOG.warn("[dump-error] write lock failed. {}", groupKey);return false;}try {final String md5 = MD5Utils.md5Hex(content, Constants.ENCODE);if (md5.equals(ConfigCacheService.getContentMd5(groupKey))) {DUMP_LOG.warn("[dump-ignore] ignore to save cache file. groupKey={}, md5={}, lastModifiedOld={}, "+ "lastModifiedNew={}", groupKey, md5, ConfigCacheService.getLastModifiedTs(groupKey),lastModifiedTs);} else if (!PropertyUtil.isDirectRead()) {// 写入磁盘DiskUtil.saveToDisk(dataId, group, tenant, content);}// 更新md5,发布LocalDataChangeEvent事件updateMd5(groupKey, md5, lastModifiedTs);return true;} catch (IOException ioe) {DUMP_LOG.error("[dump-exception] save disk error. " + groupKey + ", " + ioe.toString(), ioe);if (ioe.getMessage() != null) {String errMsg = ioe.getMessage();if (NO_SPACE_CN.equals(errMsg) || NO_SPACE_EN.equals(errMsg) || errMsg.contains(DISK_QUATA_CN) || errMsg.contains(DISK_QUATA_EN)) {// Protect from disk full.FATAL_LOG.error("磁盘满自杀退出", ioe);System.exit(0);}}return false;} finally {releaseWriteLock(groupKey);}
}
服务启动过程中主要就是将数据库的配置全部保存到本地。
客户端来查询配置
客户端启动时会调用/v1/cs/configs
来查询配置。
com.alibaba.nacos.config.server.controller.ConfigController#getConfig
@GetMapping
@Secured(action = ActionTypes.READ, parser = ConfigResourceParser.class)
public void getConfig(HttpServletRequest request, HttpServletResponse response,@RequestParam("dataId") String dataId, @RequestParam("group") String group,@RequestParam(value = "tenant", required = false, defaultValue = StringUtils.EMPTY) String tenant,@RequestParam(value = "tag", required = false) String tag)throws IOException, ServletException, NacosException {// 读取配置的入口// check tenantParamUtils.checkTenant(tenant);tenant = NamespaceUtil.processNamespaceParameter(tenant);// check paramsParamUtils.checkParam(dataId, group, "datumId", "content");ParamUtils.checkParam(tag);final String clientIp = RequestUtil.getRemoteIp(request);inner.doGetConfig(request, response, dataId, group, tenant, tag, clientIp);
}
doGetConfig()直接找到文件,使用jdk的零拷贝传输直接将文件输入流转response输出流中。
com.alibaba.nacos.config.server.controller.ConfigServletInner#doGetConfig
public String doGetConfig(HttpServletRequest request, HttpServletResponse response, String dataId, String group,String tenant, String tag, String clientIp) throws IOException, ServletException {final String groupKey = GroupKey2.getKey(dataId, group, tenant);String autoTag = request.getHeader("Vipserver-Tag");String requestIpApp = RequestUtil.getAppName(request);int lockResult = tryConfigReadLock(groupKey);final String requestIp = RequestUtil.getRemoteIp(request);boolean isBeta = false;if (lockResult > 0) {// LockResult > 0 means cacheItem is not null and other thread can`t delete this cacheItemFileInputStream fis = null;try {String md5 = Constants.NULL;long lastModified = 0L;CacheItem cacheItem = ConfigCacheService.getContentCache(groupKey);if (cacheItem.isBeta() && cacheItem.getIps4Beta().contains(clientIp)) {isBeta = true;}final String configType =(null != cacheItem.getType()) ? cacheItem.getType() : FileTypeEnum.TEXT.getFileType();response.setHeader("Config-Type", configType);FileTypeEnum fileTypeEnum = FileTypeEnum.getFileTypeEnumByFileExtensionOrFileType(configType);String contentTypeHeader = fileTypeEnum.getContentType();response.setHeader(HttpHeaderConsts.CONTENT_TYPE, contentTypeHeader);File file = null;ConfigInfoBase configInfoBase = null;PrintWriter out = null;if (isBeta) {md5 = cacheItem.getMd54Beta();lastModified = cacheItem.getLastModifiedTs4Beta();if (PropertyUtil.isDirectRead()) {configInfoBase = persistService.findConfigInfo4Beta(dataId, group, tenant);} else {file = DiskUtil.targetBetaFile(dataId, group, tenant);}response.setHeader("isBeta", "true");} else {if (StringUtils.isBlank(tag)) {if (isUseTag(cacheItem, autoTag)) {... ...} else {md5 = cacheItem.getMd5();lastModified = cacheItem.getLastModifiedTs();if (PropertyUtil.isDirectRead()) {// 单节点模式,直接读取数据库configInfoBase = persistService.findConfigInfo(dataId, group, tenant);} else {// 集群模式,读取磁盘文件file = DiskUtil.targetFile(dataId, group, tenant);}... ...}} else {... ...}}response.setHeader(Constants.CONTENT_MD5, md5);// Disable cache.response.setHeader("Pragma", "no-cache");response.setDateHeader("Expires", 0);response.setHeader("Cache-Control", "no-cache,no-store");if (PropertyUtil.isDirectRead()) {response.setDateHeader("Last-Modified", lastModified);} else {fis = new FileInputStream(file);response.setDateHeader("Last-Modified", file.lastModified());}if (PropertyUtil.isDirectRead()) {out = response.getWriter();out.print(configInfoBase.getContent());out.flush();out.close();} else {// 零拷贝fis.getChannel().transferTo(0L, fis.getChannel().size(), Channels.newChannel(response.getOutputStream()));}LogUtil.PULL_CHECK_LOG.warn("{}|{}|{}|{}", groupKey, requestIp, md5, TimeUtils.getCurrentTimeStr());final long delayed = System.currentTimeMillis() - lastModified;// TODO distinguish pull-get && push-get/*Otherwise, delayed cannot be used as the basis of push delay directly,because the delayed value of active get requests is very large.*/ConfigTraceService.logPullEvent(dataId, group, tenant, requestIpApp, lastModified,ConfigTraceService.PULL_EVENT_OK, delayed, requestIp);} finally {releaseConfigReadLock(groupKey);IoUtils.closeQuietly(fis);}} else if (lockResult == 0) {
... ...} else {
... ...}return HttpServletResponse.SC_OK + "";
}
客户端长轮询监听配置
客户端启动成功后,会调用Http接口/v1/cs/configs/listener
长轮询来监听配置的变更。
com.alibaba.nacos.config.server.controller.ConfigController#listener
@PostMapping("/listener")
@Secured(action = ActionTypes.READ, parser = ConfigResourceParser.class)
public void listener(HttpServletRequest request, HttpServletResponse response)throws ServletException, IOException {// 监听配置更新的入口request.setAttribute("org.apache.catalina.ASYNC_SUPPORTED", true);String probeModify = request.getParameter("Listening-Configs");if (StringUtils.isBlank(probeModify)) {throw new IllegalArgumentException("invalid probeModify");}probeModify = URLDecoder.decode(probeModify, Constants.ENCODE);Map<String, String> clientMd5Map;try {clientMd5Map = MD5Util.getClientMd5Map(probeModify);} catch (Throwable e) {throw new IllegalArgumentException("invalid probeModify");}// 长轮询// do long-pollinginner.doPollingConfig(request, response, clientMd5Map, probeModify.length());
}
doPollingConfig()会判断是否支持长轮询,依据是header是否包含Long-Pulling-Timeout
属性。
com.alibaba.nacos.config.server.controller.ConfigServletInner#doPollingConfig
public String doPollingConfig(HttpServletRequest request, HttpServletResponse response,Map<String, String> clientMd5Map, int probeRequestSize) throws IOException {// Long polling.if (LongPollingService.isSupportLongPolling(request)) {// 支持长轮询longPollingService.addLongPollingClient(request, response, clientMd5Map, probeRequestSize);return HttpServletResponse.SC_OK + "";}
... ...
}
addLongPollingClient()会将客户端保存起来,方便后面有配置变更时找到客户端并进行响应。
com.alibaba.nacos.config.server.service.LongPollingService#addLongPollingClient
public void addLongPollingClient(HttpServletRequest req, HttpServletResponse rsp, Map<String, String> clientMd5Map,int probeRequestSize) {String str = req.getHeader(LongPollingService.LONG_POLLING_HEADER);String noHangUpFlag = req.getHeader(LongPollingService.LONG_POLLING_NO_HANG_UP_HEADER);String appName = req.getHeader(RequestUtil.CLIENT_APPNAME_HEADER);String tag = req.getHeader("Vipserver-Tag");int delayTime = SwitchService.getSwitchInteger(SwitchService.FIXED_DELAY_TIME, 500);// Add delay time for LoadBalance, and one response is returned 500 ms in advance to avoid client timeout.long timeout = Math.max(10000, Long.parseLong(str) - delayTime);if (isFixedPolling()) {timeout = Math.max(10000, getFixedPollingInterval());// Do nothing but set fix polling timeout.} else {long start = System.currentTimeMillis();// 校验md5List<String> changedGroups = MD5Util.compareMd5(req, rsp, clientMd5Map);if (changedGroups.size() > 0) {// 如果有变更立马返回generateResponse(req, rsp, changedGroups);LogUtil.CLIENT_LOG.info("{}|{}|{}|{}|{}|{}|{}", System.currentTimeMillis() - start, "instant",RequestUtil.getRemoteIp(req), "polling", clientMd5Map.size(), probeRequestSize,changedGroups.size());return;} else if (noHangUpFlag != null && noHangUpFlag.equalsIgnoreCase(TRUE_STR)) {// 如果是初始化请求,直接返回,不挂起LogUtil.CLIENT_LOG.info("{}|{}|{}|{}|{}|{}|{}", System.currentTimeMillis() - start, "nohangup",RequestUtil.getRemoteIp(req), "polling", clientMd5Map.size(), probeRequestSize,changedGroups.size());return;}}String ip = RequestUtil.getRemoteIp(req);// Must be called by http thread, or send response.final AsyncContext asyncContext = req.startAsync();// AsyncContext.setTimeout() is incorrect, Control by oneselfasyncContext.setTimeout(0L);// 如果md5是一样的,异步执行ClientLongPollingConfigExecutor.executeLongPolling(new ClientLongPolling(asyncContext, clientMd5Map, ip, probeRequestSize, timeout, appName, tag));
}
ClientLongPolling()会启动一个延时30执行的任务,如果30s内配置没有变更,任务就会执行,对客户端进行响应,如果30s内配置发生了变更,此任务就会被取消。
com.alibaba.nacos.config.server.service.LongPollingService.ClientLongPolling#run
public void run() {// 延时30s执行asyncTimeoutFuture = ConfigExecutor.scheduleLongPolling(new Runnable() {@Overridepublic void run() {try {getRetainIps().put(ClientLongPolling.this.ip, System.currentTimeMillis());// Delete subsciber's relations.allSubs.remove(ClientLongPolling.this);if (isFixedPolling()) {... ...} else {LogUtil.CLIENT_LOG.info("{}|{}|{}|{}|{}|{}", (System.currentTimeMillis() - createTime), "timeout",RequestUtil.getRemoteIp((HttpServletRequest) asyncContext.getRequest()),"polling", clientMd5Map.size(), probeRequestSize);// 超时直接返回sendResponse(null);}} catch (Throwable t) {LogUtil.DEFAULT_LOG.error("long polling error:" + t.getMessage(), t.getCause());}}}, timeoutTime, TimeUnit.MILLISECONDS);// 将客户端端缓存至队列中allSubs.add(this);
}
sendResponse()对客户端进行响应,如果配置有变更,就会取消上面创建的任务。
com.alibaba.nacos.config.server.service.LongPollingService.ClientLongPolling#sendResponse
void sendResponse(List<String> changedGroups) {// Cancel time out task.if (null != asyncTimeoutFuture) {// 取消任务asyncTimeoutFuture.cancel(false);}generateResponse(changedGroups);
}
generateResponse()会将变更配置的dataId和group新信息返回给客户端,并不会返回具体的配置内容,内容会由客户端来查询。
com.alibaba.nacos.config.server.service.LongPollingService.ClientLongPolling#generateResponse
void generateResponse(List<String> changedGroups) {if (null == changedGroups) {// Tell web container to send http response.asyncContext.complete();return;}HttpServletResponse response = (HttpServletResponse) asyncContext.getResponse();try {// 封装更新的配置,返回客户端final String respString = MD5Util.compareMd5ResultString(changedGroups);// Disable cache.response.setHeader("Pragma", "no-cache");response.setDateHeader("Expires", 0);response.setHeader("Cache-Control", "no-cache,no-store");response.setStatus(HttpServletResponse.SC_OK);response.getWriter().println(respString);asyncContext.complete();} catch (Exception ex) {PULL_LOG.error(ex.toString(), ex);asyncContext.complete();}
}
配置变更通知客户端
当在Nacos管理后台修改了配置后,会调用/v1/cs/configs
来更新配置。
publishConfig()会将配置保存到数据库中,并发布ConfigDataChangeEvent事件。
com.alibaba.nacos.config.server.controller.ConfigController#publishConfig
@PostMapping
@Secured(action = ActionTypes.WRITE, parser = ConfigResourceParser.class)
public Boolean publishConfig(HttpServletRequest request, HttpServletResponse response,@RequestParam(value = "dataId") String dataId, @RequestParam(value = "group") String group,@RequestParam(value = "tenant", required = false, defaultValue = StringUtils.EMPTY) String tenant,@RequestParam(value = "content") String content, @RequestParam(value = "tag", required = false) String tag,@RequestParam(value = "appName", required = false) String appName,@RequestParam(value = "src_user", required = false) String srcUser,@RequestParam(value = "config_tags", required = false) String configTags,@RequestParam(value = "desc", required = false) String desc,@RequestParam(value = "use", required = false) String use,@RequestParam(value = "effect", required = false) String effect,@RequestParam(value = "type", required = false) String type,@RequestParam(value = "schema", required = false) String schema) throws NacosException {// 修改配置入口final String srcIp = RequestUtil.getRemoteIp(request);final String requestIpApp = RequestUtil.getAppName(request);srcUser = RequestUtil.getSrcUserName(request);//check typeif (!ConfigType.isValidType(type)) {type = ConfigType.getDefaultType().getType();}// check tenantParamUtils.checkTenant(tenant);ParamUtils.checkParam(dataId, group, "datumId", content);ParamUtils.checkParam(tag);Map<String, Object> configAdvanceInfo = new HashMap<String, Object>(10);MapUtils.putIfValNoNull(configAdvanceInfo, "config_tags", configTags);MapUtils.putIfValNoNull(configAdvanceInfo, "desc", desc);MapUtils.putIfValNoNull(configAdvanceInfo, "use", use);MapUtils.putIfValNoNull(configAdvanceInfo, "effect", effect);MapUtils.putIfValNoNull(configAdvanceInfo, "type", type);MapUtils.putIfValNoNull(configAdvanceInfo, "schema", schema);ParamUtils.checkParam(configAdvanceInfo);if (AggrWhitelist.isAggrDataId(dataId)) {LOGGER.warn("[aggr-conflict] {} attempt to publish single data, {}, {}", RequestUtil.getRemoteIp(request),dataId, group);throw new NacosException(NacosException.NO_RIGHT, "dataId:" + dataId + " is aggr");}final Timestamp time = TimeUtils.getCurrentTime();String betaIps = request.getHeader("betaIps");ConfigInfo configInfo = new ConfigInfo(dataId, group, tenant, appName, content);configInfo.setType(type);if (StringUtils.isBlank(betaIps)) {if (StringUtils.isBlank(tag)) {// 插入数据库persistService.insertOrUpdate(srcIp, srcUser, configInfo, time, configAdvanceInfo, true);// 发布ConfigDataChangeEvent事件/*** AsyncNotifyService监听了ConfigDataChangeEvent事件* @see AsyncNotifyService#AsyncNotifyService(com.alibaba.nacos.core.cluster.ServerMemberManager)*/ConfigChangePublisher.notifyConfigChange(new ConfigDataChangeEvent(false, dataId, group, tenant, time.getTime()));} else {persistService.insertOrUpdateTag(configInfo, tag, srcIp, srcUser, time, true);ConfigChangePublisher.notifyConfigChange(new ConfigDataChangeEvent(false, dataId, group, tenant, tag, time.getTime()));}} else {// beta publishpersistService.insertOrUpdateBeta(configInfo, betaIps, srcIp, srcUser, time, true);ConfigChangePublisher.notifyConfigChange(new ConfigDataChangeEvent(true, dataId, group, tenant, time.getTime()));}ConfigTraceService.logPersistenceEvent(dataId, group, tenant, requestIpApp, time.getTime(), InetUtils.getSelfIP(),ConfigTraceService.PERSISTENCE_EVENT_PUB, content);return true;
}
AsyncNotifyService监听了ConfigDataChangeEvent事件,然后提交了AsyncTask任务来对Nacos集群中的节点进行通知配置的变化。
com.alibaba.nacos.config.server.service.notify.AsyncNotifyService#AsyncNotifyService
public AsyncNotifyService(ServerMemberManager memberManager) {this.memberManager = memberManager;// Register ConfigDataChangeEvent to NotifyCenter.NotifyCenter.registerToPublisher(ConfigDataChangeEvent.class, NotifyCenter.ringBufferSize);// Register A Subscriber to subscribe ConfigDataChangeEvent.NotifyCenter.registerSubscriber(new Subscriber() {@Overridepublic void onEvent(Event event) {// Generate ConfigDataChangeEvent concurrentlyif (event instanceof ConfigDataChangeEvent) {// 监听ConfigDataChangeEvent事件ConfigDataChangeEvent evt = (ConfigDataChangeEvent) event;long dumpTs = evt.lastModifiedTs;String dataId = evt.dataId;String group = evt.group;String tenant = evt.tenant;String tag = evt.tag;Collection<Member> ipList = memberManager.allMembers();// In fact, any type of queue here can be// 遍历集群中的所有节点,封装NotifySingleTaskQueue<NotifySingleTask> queue = new LinkedList<NotifySingleTask>();for (Member member : ipList) {queue.add(new NotifySingleTask(dataId, group, tenant, tag, dumpTs, member.getAddress(),evt.isBeta));}// 提交AsyncTask任务,AsyncTask中包含了NotifySingleTask/*** @see AsyncTask#run()*/ConfigExecutor.executeAsyncNotify(new AsyncTask(nacosAsyncRestTemplate, queue));}}@Overridepublic Class<? extends Event> subscribeType() {return ConfigDataChangeEvent.class;}});
}
AsyncTask.run()会调用Nacos集群中的所有节点(包含自己)的Http接口/v1/cs/communication/dataChange来通知配置的变化。
com.alibaba.nacos.config.server.service.notify.AsyncNotifyService.AsyncTask#run
@Override
public void run() {executeAsyncInvoke();
}private void executeAsyncInvoke() {// 遍历所有的NotifySingleTask任务while (!queue.isEmpty()) {NotifySingleTask task = queue.poll();String targetIp = task.getTargetIP();if (memberManager.hasMember(targetIp)) {// start the health check and there are ips that are not monitored, put them directly in the notification queue, otherwise notifyboolean unHealthNeedDelay = memberManager.isUnHealth(targetIp);if (unHealthNeedDelay) {// target ip is unhealthy, then put it in the notification listConfigTraceService.logNotifyEvent(task.getDataId(), task.getGroup(), task.getTenant(), null,task.getLastModified(), InetUtils.getSelfIP(), ConfigTraceService.NOTIFY_EVENT_UNHEALTH,0, task.target);// get delay time and set fail count to the taskasyncTaskExecute(task);} else {Header header = Header.newInstance();header.addParam(NotifyService.NOTIFY_HEADER_LAST_MODIFIED, String.valueOf(task.getLastModified()));header.addParam(NotifyService.NOTIFY_HEADER_OP_HANDLE_IP, InetUtils.getSelfIP());if (task.isBeta) {header.addParam("isBeta", "true");}AuthHeaderUtil.addIdentityToHeader(header);// 调用/v1/cs/communication/dataChange接口/*** @see CommunicationController#notifyConfigInfo(javax.servlet.http.HttpServletRequest, java.lang.String, java.lang.String, java.lang.String, java.lang.String)*/restTemplate.get(task.url, header, Query.EMPTY, String.class, new AsyncNotifyCallBack(task));}}}
}
notifyConfigInfo()主要负责将变化的配置从数据库中查询出来,然后更新本地的文件。
com.alibaba.nacos.config.server.controller.CommunicationController#notifyConfigInfo
@GetMapping("/dataChange")
public Boolean notifyConfigInfo(HttpServletRequest request, @RequestParam("dataId") String dataId,@RequestParam("group") String group,@RequestParam(value = "tenant", required = false, defaultValue = StringUtils.EMPTY) String tenant,@RequestParam(value = "tag", required = false) String tag) {// 通知配置数据变更的入口dataId = dataId.trim();group = group.trim();String lastModified = request.getHeader(NotifyService.NOTIFY_HEADER_LAST_MODIFIED);long lastModifiedTs = StringUtils.isEmpty(lastModified) ? -1 : Long.parseLong(lastModified);String handleIp = request.getHeader(NotifyService.NOTIFY_HEADER_OP_HANDLE_IP);String isBetaStr = request.getHeader("isBeta");if (StringUtils.isNotBlank(isBetaStr) && trueStr.equals(isBetaStr)) {dumpService.dump(dataId, group, tenant, lastModifiedTs, handleIp, true);} else {// 转存数据dumpService.dump(dataId, group, tenant, tag, lastModifiedTs, handleIp);}return true;
}
dump()操作又提交了一个DumpTask任务。
com.alibaba.nacos.config.server.service.dump.DumpService#dump(java.lang.String, java.lang.String, java.lang.String, java.lang.String, long, java.lang.String)
public void dump(String dataId, String group, String tenant, String tag, long lastModified, String handleIp) {dump(dataId, group, tenant, tag, lastModified, handleIp, false);
}public void dump(String dataId, String group, String tenant, String tag, long lastModified, String handleIp,boolean isBeta) {String groupKey = GroupKey2.getKey(dataId, group, tenant);String taskKey = String.join("+", dataId, group, tenant, String.valueOf(isBeta), tag);// 添加DumpTask任务/*** @see DumpProcessor#process(com.alibaba.nacos.common.task.NacosTask)*/dumpTaskMgr.addTask(taskKey, new DumpTask(groupKey, tag, lastModified, handleIp, isBeta));DUMP_LOG.info("[dump-task] add task. groupKey={}, taskKey={}", groupKey, taskKey);
}
process()会将变化的配置从数据库中查询出来,交于DumpConfigHandler.configDump()处理配置。
com.alibaba.nacos.config.server.service.dump.processor.DumpProcessor#process
public boolean process(NacosTask task) {// 处理DumpTaskfinal PersistService persistService = dumpService.getPersistService();DumpTask dumpTask = (DumpTask) task;String[] pair = GroupKey2.parseKey(dumpTask.getGroupKey());String dataId = pair[0];String group = pair[1];String tenant = pair[2];long lastModified = dumpTask.getLastModified();String handleIp = dumpTask.getHandleIp();boolean isBeta = dumpTask.isBeta();String tag = dumpTask.getTag();ConfigDumpEvent.ConfigDumpEventBuilder build = ConfigDumpEvent.builder().namespaceId(tenant).dataId(dataId).group(group).isBeta(isBeta).tag(tag).lastModifiedTs(lastModified).handleIp(handleIp);if (isBeta) {
。。。。。。} else {if (StringUtils.isBlank(tag)) {// 从数据库查询配置数据ConfigInfo cf = persistService.findConfigInfo(dataId, group, tenant);build.remove(Objects.isNull(cf));build.content(Objects.isNull(cf) ? null : cf.getContent());build.type(Objects.isNull(cf) ? null : cf.getType());// 转存配置数据return DumpConfigHandler.configDump(build.build());} else {
。。。。。。}}
}
configDump()又调用了ConfigCacheService.dump(),这个方法在服务端启动时保存所有的配置文件时也使用了。
com.alibaba.nacos.config.server.service.dump.DumpConfigHandler#configDump
public static boolean configDump(ConfigDumpEvent event) {final String dataId = event.getDataId();final String group = event.getGroup();final String namespaceId = event.getNamespaceId();final String content = event.getContent();final String type = event.getType();final long lastModified = event.getLastModifiedTs();if (event.isBeta()) {。。。。。。}if (StringUtils.isBlank(event.getTag())) {if (dataId.equals(AggrWhitelist.AGGRIDS_METADATA)) {AggrWhitelist.load(content);}if (dataId.equals(ClientIpWhiteList.CLIENT_IP_WHITELIST_METADATA)) {ClientIpWhiteList.load(content);}if (dataId.equals(SwitchService.SWITCH_META_DATAID)) {SwitchService.load(content);}boolean result;if (!event.isRemove()) {// dump数据result = ConfigCacheService.dump(dataId, group, namespaceId, content, lastModified, type);if (result) {ConfigTraceService.logDumpEvent(dataId, group, namespaceId, null, lastModified, event.getHandleIp(),ConfigTraceService.DUMP_EVENT_OK, System.currentTimeMillis() - lastModified,content.length());}} else {。。。。。。}return result;} else {。。。。。。}}
dump()会将新的配置写入磁盘文件,更新md5,然后发布LocalDataChangeEvent事件。
com.alibaba.nacos.config.server.service.ConfigCacheService#dump
public static boolean dump(String dataId, String group, String tenant, String content, long lastModifiedTs,String type) {String groupKey = GroupKey2.getKey(dataId, group, tenant);CacheItem ci = makeSure(groupKey);ci.setType(type);final int lockResult = tryWriteLock(groupKey);assert (lockResult != 0);if (lockResult < 0) {DUMP_LOG.warn("[dump-error] write lock failed. {}", groupKey);return false;}try {final String md5 = MD5Utils.md5Hex(content, Constants.ENCODE);if (md5.equals(ConfigCacheService.getContentMd5(groupKey))) {DUMP_LOG.warn("[dump-ignore] ignore to save cache file. groupKey={}, md5={}, lastModifiedOld={}, "+ "lastModifiedNew={}", groupKey, md5, ConfigCacheService.getLastModifiedTs(groupKey),lastModifiedTs);} else if (!PropertyUtil.isDirectRead()) {// 写入磁盘DiskUtil.saveToDisk(dataId, group, tenant, content);}// 更新md5,发布LocalDataChangeEvent事件updateMd5(groupKey, md5, lastModifiedTs);return true;} catch (IOException ioe) {DUMP_LOG.error("[dump-exception] save disk error. " + groupKey + ", " + ioe.toString(), ioe);if (ioe.getMessage() != null) {String errMsg = ioe.getMessage();if (NO_SPACE_CN.equals(errMsg) || NO_SPACE_EN.equals(errMsg) || errMsg.contains(DISK_QUATA_CN) || errMsg.contains(DISK_QUATA_EN)) {// Protect from disk full.FATAL_LOG.error("磁盘满自杀退出", ioe);System.exit(0);}}return false;} finally {releaseWriteLock(groupKey);}
}
updateMd5()会更新md5,然后发布LocalDataChangeEvent事件。
com.alibaba.nacos.config.server.service.ConfigCacheService#updateMd5
public static void updateMd5(String groupKey, String md5, long lastModifiedTs) {CacheItem cache = makeSure(groupKey);if (cache.md5 == null || !cache.md5.equals(md5)) {cache.md5 = md5;cache.lastModifiedTs = lastModifiedTs;// 发布LocalDataChangeEvent事件/*** LongPollingService监听了LocalDataChangeEvent事件* @see LongPollingService#LongPollingService()*/NotifyCenter.publishEvent(new LocalDataChangeEvent(groupKey));}
}
LongPollingService会监听LocalDataChangeEvent事件,然后提交DataChangeTask。
com.alibaba.nacos.config.server.service.LongPollingService#LongPollingService
public LongPollingService() {allSubs = new ConcurrentLinkedQueue<ClientLongPolling>();ConfigExecutor.scheduleLongPolling(new StatTask(), 0L, 10L, TimeUnit.SECONDS);// Register LocalDataChangeEvent to NotifyCenter.NotifyCenter.registerToPublisher(LocalDataChangeEvent.class, NotifyCenter.ringBufferSize);// Register A Subscriber to subscribe LocalDataChangeEvent.NotifyCenter.registerSubscriber(new Subscriber() {@Overridepublic void onEvent(Event event) {if (isFixedPolling()) {// Ignore.} else {// 监听LocalDataChangeEvent事件if (event instanceof LocalDataChangeEvent) {LocalDataChangeEvent evt = (LocalDataChangeEvent) event;// 提交DataChangeTask任务/*** @see DataChangeTask#run()*/ConfigExecutor.executeLongPolling(new DataChangeTask(evt.groupKey, evt.isBeta, evt.betaIps));}}}@Overridepublic Class<? extends Event> subscribeType() {return LocalDataChangeEvent.class;}});}
DataChangeTask会找到监听这个配置的客户端,然后进行通知。
com.alibaba.nacos.config.server.service.LongPollingService.DataChangeTask#run
public void run() {try {ConfigCacheService.getContentBetaMd5(groupKey);for (Iterator<ClientLongPolling> iter = allSubs.iterator(); iter.hasNext(); ) {ClientLongPolling clientSub = iter.next();// 找到监听这个配置的客户端if (clientSub.clientMd5Map.containsKey(groupKey)) {// If published tag is not in the beta list, then it skipped.if (isBeta && !CollectionUtils.contains(betaIps, clientSub.ip)) {continue;}// If published tag is not in the tag list, then it skipped.if (StringUtils.isNotBlank(tag) && !tag.equals(clientSub.tag)) {continue;}getRetainIps().put(clientSub.ip, System.currentTimeMillis());iter.remove(); // Delete subscribers' relationships.LogUtil.CLIENT_LOG.info("{}|{}|{}|{}|{}|{}|{}", (System.currentTimeMillis() - changeTime), "in-advance",RequestUtil.getRemoteIp((HttpServletRequest) clientSub.asyncContext.getRequest()),"polling", clientSub.clientMd5Map.size(), clientSub.probeRequestSize, groupKey);// 通知客户端配置更新了clientSub.sendResponse(Arrays.asList(groupKey));}}} catch (Throwable t) {LogUtil.DEFAULT_LOG.error("data change error: {}", ExceptionUtil.getStackTrace(t));}
}
相关文章:

【Nacos】Nacos配置中心服务端源码分析
上文说了Nacos配置中心客户端的源码流程,这篇介绍下Nacos配置中心服务端的源码。 服务端的启动 先来看服务启动时干了啥? init()方法上面有PostConstruct,该方法会在ExternalDumpService实例化后执行。 com.alibaba.nacos.config.server.s…...

第十五章 栅格数据重分类、栅格计算器、插值分析
文章目录第十五章 栅格数据分析第一章 栅格数据重分类第一节 栅格数据重分类第二节 栅格重分类的使用第三节 重分类的使用中的空值使用第四节 重分类的案例:分类统计面积第五节 坡度矢量分级图生成第二章 栅格计算器第一节 栅格计算器介绍第二节 栅格计算器使用第三…...

CS5260测试版|CS5260demoboard|typec转VGA参考PCB原理图
CS5260测试版|CS5260demoboard|typec转VGA参考PCB原理图 CS5260是一款高度集成的TYPEC转VGA转换方案芯片。 CS5260输出端接口:外接高清VGA设备如:显示器投影机电视带高清的设备,广泛应用于 笔记本Macbook Air 12寸USB3.1输出端对外接高清VGA设备如:显示器投影机电视…...

winform开发心得
最近一直在从事winform的开发,每次都是需要从网上查找资料才能对应具体风格要求,现在总结一下。 ui方面可以使用CSkin对应的一套ui,使用步骤 1.在窗口界面,工具箱空白处点击右键,弹出菜单有个”选择项“,点…...

学习周报-2023-0210
文章目录一 在SUSE11sp3系统中将openssh从6升级到8一 需求二 系统环境三 部署流程1.上传编译安装的软件包2.安装 gcc编译软件3.安装依赖zlib4.安装依赖openssl5.安装openssh二 在CentOS-6.9配置apache服务(3)---虚拟主机配置一 定义二 系统环境三 基于域…...

百度富文本UE的问题集合
百度富文本编辑能上传视频成功但是在浏览器不能播放、显示的问题百度富文本视频封面空白问题百度富文本编辑器UMEditor 添加视频无法删除百度富文本编辑器结果存数据库取出来到js赋值报错怎么让浏览器重新加载修改过的JS文件,而不是沿用缓存里的百度富文本编辑能上传…...

在Linux上安装node-v14.17.3和npm-6.14.13
记录:374场景:在CentOS 7.9操作系统上,安装node-v14.17.3-linux-x64环境。包括node-v14.17.3和npm-6.14.13。node命令应用和npm命令应用。版本:JDK 1.8 node v14.17.3 npm 6.14.13官网地址:https://nodejs.org/下载地址…...

机器学习框架sklearn之特征降维
目录特征降维概念特征选择过滤式①低方差特征过滤②相关系数③主成分分析特征降维 0维 标量 1维 向量 2维 矩阵 概念 降维是指在某些限定条件下,降低随机变量(特征)个数,得到一组“不相关”主变量的过程 注:正是…...

java实现二叉树(一文带你详细了解二叉树的)
🎇🎇🎇作者: 小鱼不会骑车 🎆🎆🎆专栏: 《数据结构》 🎓🎓🎓个人简介: 一名专科大一在读的小比特,努力学习编程是我唯一…...

学弟学妹少走弯路,超完整算法刷题路线出炉
大家好,我是帅地。 本篇文章主要讲解下面三个事: 1、自己学习算法的一些经历 2、大家学习算法存在的一些普遍问题 3、给大家规划的算法刷题路线 一、算法学习往事 记得当初学了 C 语言就开始刷题了,刷题倒不是面试,而是为了…...

Windows截取gif动态图的软件 ScreenToGif 的安装、使用教程
一、概述 👉GIF(Graphics Interchange Format),又称图形交换格式,是一种公用的图像文件格式标准,于1987年由Compu Serve公司成功研发并推出。 👉GIF用于以超文本标志语言方式显示索引彩色图像&a…...

C++程序设计——多态:虚函数、抽象类、虚函数表
注:以下示例均是在VS2019环境下 一、多态的概念 通俗来讲,多态就是多种形态,当不同的对象去完成某个行为时,会产生出不同的状态。即不同继承关系的类对象,去调用同一函数时,产生不同的行为。 比如”叫“这…...

OpenMMLab AI实战营 第6课 语义分割与MMSegmentation
第6课 语义分割与MMSegmentation 1. 语义分割简介 任务:将图像按照物体的类别分割成不同的区域,等价于对每个像素进行分类应用 无人驾驶人像分割智能遥感医疗影像分析 语义分割 vs 实例分割 vs 全景分割 语义分割:仅考虑像素的类别…...

产业互联网是对互联网的衍生和进化,也是一次重塑和再造
互联网并不仅仅只是充当撮合和中介的角色,它应当具备更多的功能和意义。只有这样,它的发展才能够真正全面和完善。产业互联网的衍生和出现,正是在互联网进化的基础之上出现的。这是我们看到之所以会有那么多的互联网玩家投身到产业互联网的浪…...

Shell脚本之——Hadoop3单机版安装
目录 1.解压 2.文件重命名 3.配置环境变量 4.hadoop-env.sh 5.core-site.xml 6. hdfs-site.xml 7. mapred-site.xml 8.yarn-site.xml 9.完整脚本代码(注意修改主机名) 10.重启环境变量 11.初始化 12.启动服务 13.jps查询节点 1.解压 tar -zxf /opt/install/hadoo…...

代码随想录NO39 |0-1背包问题理论基础 416.分割等和子集
0-1背包问题理论基础 分割等和子集1. 0-1背包问题理论基础(二维数组实现)2. 0-1背包问题理论基础 二(一维数组实现)1. 0-1背包问题理论基础(二维数组实现) 背包问题一般分为这几种: 0-1背包问题:有n件物品和一个最多能背重量为w…...

FITC-PEG-FA,荧光素-聚乙二醇-叶酸,FA-PEG-FITC,实验室科研试剂,提供质量检测
FITC-PEG-FA,荧光素-聚乙二醇-叶酸 中文名称:荧光素-聚乙二醇-叶酸 英文名称:FITC-PEG-FA 英文别名:Fluorescein-PEG-Folic Acid 性状:基于不同的分子量,呈白色/类白色固体,或粘稠液体。 溶…...

简洁易懂:源码+实战讲解Redisson并发锁及看门狗自动续期
1 缘起 有一次同事问Redisson存储的键是否为hash? 我当时,没有看Redisson的相关源码,只知道应用, 所以没有办法回答,于是开始看看Redisson实现的源码, 顺便写了一个单机Redisson测试, 发现Redi…...

TCP 三次握手和四次挥手
✏️作者:银河罐头 📋系列专栏:JavaEE 🌲“种一棵树最好的时间是十年前,其次是现在” 目录TCP 建立连接(三次握手)为啥不能是 4 次?为啥不能是 2 次?三次握手的意义:TCP 断开连接(四…...

JavaWeb复习
JavaWeb复习一.概述1.概念2.B/S和C/S 架构二.HTTP通信协议概述1.概念2.HTTP1.0 与 HTTP1.1 版本3.HTTP 协议组成4.常见状态码5.GET 与 POST 请求方式三.Tomcat1.Web服务器介绍2.安装(Windows)3.Tomcat目录结构4.server.xml部分配置解释四.Servlet1.概念2…...

P14 PyTorch AutoGrad
前言:激活函数与loss的梯度PyTorch 提供了Auto Grad 功能,这里系统讲解一下torch.autograd.grad系统的工作原理,了解graph 结构目录:1: require_grad False2: require_grad True3: 多层bakcward 原理4: in…...

前端报表如何实现无预览打印解决方案或静默打印
在前端开发中,除了将数据呈现后,我们往往需要为用户提供,打印,导出等能力,导出是为了存档或是二次分析,而打印则因为很多单据需要打印出来作为主要的单据来进行下一环节的票据支撑, 而前端打印可…...

Operating System Course 2 - My OS
Computer Startup process上一篇:http://t.csdn.cn/XfUKt 讲到这个启动设备的第一个扇区:引导扇区。那么引导扇区的代码长什么样子?这里得看引导扇区代码源文件bootsect.s(.s后缀文件为用汇编语言编写的源代码文件)。另…...

离散数学 课时一 命题逻辑的基本概念
1 命题 1、命题:可以判断其真值的陈述句 2、真值:真或者假(1或者0) 3、真命题:真值为真的命题 4、假命题:真值为假的命题 5、原子命题:不可以再被分解成更简单的命题 6、复合命题:由原子命题通过联结词联结…...

Word文档带有权限密码怎么办?
Word文档的权限密码指的是什么?其实这是Word文档的保护方法之一,具体指Word文档的编辑、修改受到了限制,需要输入密码才能进行。 设置了权限密码的Word文档还是可以直接打开,只有当需要编辑或者修改内容的时候,才会发…...

C++多态
1. 多态的概念1.1 概念多态的概念:通俗来说,就是多种形态,具体点就是去完成某个行为,当不同的对象去完成时会产生出不同的状态举个例子:比如买票这个行为,当普通人买票时,是全价买票;…...

访问学者如何申请美国J1签证?
一、申请美国J1签证的步骤: 第一步:填写I901表。 填写I901表会收取SERVIS费用180美元,可以用VISA/Master卡直接网上支付。填完后打印收据单或者存成PDF后续再打印,记下I901收据编号。 第二步:DS-160表填写。 填写DS-…...

使用gitlab ci/cd来发布一个.net 项目
gitlab runner的安装和基本使用:https://bear-coding.blog.csdn.net/article/details/120591711安装并给项目配置完gitlab runner后再操作后面步骤。实现目标:master分支代码有变更的时候自动构建build。当开发人员在gitlab上给项目打一个tag标签分支的时候自动触发…...

笔试题-2023-蔚来-数字芯片设计【纯净题目版】
回到首页:2023 数字IC设计秋招复盘——数十家公司笔试题、面试实录 推荐内容:数字IC设计学习比较实用的资料推荐 题目背景 笔试时间:2022.08.24应聘岗位:校招-芯片逻辑综合工程师-智能硬件笔试时长:90min笔试平台:nowcoder牛客网题目类型:不定项选择题(15道)、填空题…...

ThreadLocal 详解
ThreadLocal简介JDK源码对ThreadLocal类的注释如下:ThreadLocal提供线程局部变量,使得每个线程都有自己的、独立初始化的变量副本ThreadLocal实例通常是类中的private static字段,用于将状态与线程相关联,如用户ID、事务ID只要线程…...