wordpress标签库,安阳企业网站优化外包,厦门软件园多客宝网站开发,网站开发顶岗报告上文我们说了服务启动的时候从远程Nacos服务端拉取配置#xff0c;这节我们来说下Nacos服务端配置的变动怎么实时通知到客户端#xff0c;首先需要注册监听器。
注册监听器
NacosContextRefresher类会监听应用启动发布的ApplicationReadyEvent事件#xff0c;然后进行配置…
上文我们说了服务启动的时候从远程Nacos服务端拉取配置这节我们来说下Nacos服务端配置的变动怎么实时通知到客户端首先需要注册监听器。
注册监听器
NacosContextRefresher类会监听应用启动发布的ApplicationReadyEvent事件然后进行配置监听器的注册。
com.alibaba.cloud.nacos.refresh.NacosContextRefresher#onApplicationEvent
public void onApplicationEvent(ApplicationReadyEvent event) {// many Spring contextif (this.ready.compareAndSet(false, true)) {this.registerNacosListenersForApplications();}
}registerNacosListenersForApplications()方法里会进行判断如果自动刷新机制是开启的则进行监听器注册。上文我们说到了会将拉到的配置缓存到NacosPropertySourceRepository中 这儿就从缓存中获取所有的配置然后循环进行监听器注册如果配置文件中配置refresh字段为 false则不注册监听器。 com.alibaba.cloud.nacos.refresh.NacosContextRefresher#registerNacosListenersForApplications
private void registerNacosListenersForApplications() {if (isRefreshEnabled()) {for (NacosPropertySource propertySource : NacosPropertySourceRepository.getAll()) {if (!propertySource.isRefreshable()) {continue;}String dataId propertySource.getDataId();registerNacosListener(propertySource.getGroup(), dataId);}}
}我们可以看到监听器是以dataIdgroupIdnamespace为维度进行注册的后续配置更新时会回调此监听器。
监听器的逻辑主要就三步
REFRESH_COUNT在之前的loadNacosPropertySource()方法有用到往NacosRefreshHistory#records中添加一条刷新记录发布一个RefreshEvent事件该事件是SpringCloud提供的主要就是用来做环境变更刷新用的
com.alibaba.cloud.nacos.refresh.NacosContextRefresher#registerNacosListener
private void registerNacosListener(final String groupKey, final String dataKey) {String key NacosPropertySourceRepository.getMapKey(dataKey, groupKey);Listener listener listenerMap.computeIfAbsent(key,lst - new AbstractSharedListener() {Overridepublic void innerReceive(String dataId, String group,String configInfo) {refreshCountIncrement();nacosRefreshHistory.addRefreshRecord(dataId, group, configInfo);// 发布RefreshEvent事件// todo feature: support single refresh for listeningapplicationContext.publishEvent(new RefreshEvent(this, null, Refresh Nacos config));if (log.isDebugEnabled()) {log.debug(String.format(Refresh Nacos config group%s,dataId%s,configInfo%s,group, dataId, configInfo));}}});try {configService.addListener(dataKey, groupKey, listener);}catch (NacosException e) {log.warn(String.format(register fail for nacos listener ,dataId[%s],group[%s], dataKey,groupKey), e);}
}监听器的注册操作又委托到了ConfigService。 com.alibaba.nacos.client.config.NacosConfigService#addListener
public void addListener(String dataId, String group, Listener listener) throws NacosException {worker.addTenantListeners(dataId, group, Arrays.asList(listener));
}监听器的注册在ClientWorker中处理这块会创建一个CacheData对象该对象主要就是用来管理监听器的也是非常重要的一个类。 com.alibaba.nacos.client.config.impl.ClientWorker#addTenantListeners
public void addTenantListeners(String dataId, String group, List? extends Listener listeners)throws NacosException {group blank2defaultGroup(group);String tenant agent.getTenant();CacheData cache addCacheDataIfAbsent(dataId, group, tenant);for (Listener listener : listeners) {cache.addListener(listener);}
}CacheData中中药字段如下
// 可对配置进行拦截处理可用于配置加密解密
private final ConfigFilterChainManager configFilterChainManager;public final String dataId;public final String group;public final String tenant;// 关注此配置的监听器
private final CopyOnWriteArrayListManagerListenerWrap listeners;// 用于比较此配置是否变更
private volatile String md5;/*** whether use local config.*/
private volatile boolean isUseLocalConfig false;/*** last modify time.*/
private volatile long localConfigLastModified;// 配置的内容
private volatile String content;addCacheDataIfAbsent()方法中会将刚才创建的CacheData缓存到ClientWorker中的一个Map中后续会用到。
com.alibaba.nacos.client.config.impl.ClientWorker#addCacheDataIfAbsent
public CacheData addCacheDataIfAbsent(String dataId, String group, String tenant) throws NacosException {String key GroupKey.getKeyTenant(dataId, group, tenant);CacheData cacheData cacheMap.get(key);if (cacheData ! null) {return cacheData;}cacheData new CacheData(configFilterChainManager, agent.getName(), dataId, group, tenant);// multiple listeners on the same dataidgroup and race conditionCacheData lastCacheData cacheMap.putIfAbsent(key, cacheData);if (lastCacheData null) {//fix issue # 1317if (enableRemoteSyncConfig) {ConfigResponse response getServerConfig(dataId, group, tenant, 3000L);cacheData.setContent(response.getContent());}int taskId cacheMap.size() / (int) ParamUtil.getPerTaskConfigSize();cacheData.setTaskId(taskId);lastCacheData cacheData;}// reset so that server not hang this checklastCacheData.setInitializing(true);LOGGER.info([{}] [subscribe] {}, agent.getName(), key);MetricsMonitor.getListenConfigCountMonitor().set(cacheMap.size());return lastCacheData;
}至此在服务启动后向每一个需要支持热更新的配置都注册了一个监听器用来监听远程配置的变动以及做相应的处理。
获取更新的配置
ClientWorker是在ConfigService的构造方法中创建的。
ClientWorker的构造函数里会去创建两个线程池executor会每隔10ms进行一次配置变更的检查executorService主要是用来处理长轮询请求的。
com.alibaba.nacos.client.config.impl.ClientWorker#ClientWorker
public ClientWorker(final HttpAgent agent, final ConfigFilterChainManager configFilterChainManager,final Properties properties) {this.agent agent;this.configFilterChainManager configFilterChainManager;// Initialize the timeout parameterinit(properties);this.executor Executors.newScheduledThreadPool(1, new ThreadFactory() {Overridepublic Thread newThread(Runnable r) {Thread t new Thread(r);t.setName(com.alibaba.nacos.client.Worker. agent.getName());t.setDaemon(true);return t;}});this.executorService Executors.newScheduledThreadPool(Runtime.getRuntime().availableProcessors(), new ThreadFactory() {Overridepublic Thread newThread(Runnable r) {Thread t new Thread(r);t.setName(com.alibaba.nacos.client.Worker.longPolling. agent.getName());t.setDaemon(true);return t;}});this.executor.scheduleWithFixedDelay(new Runnable() {Overridepublic void run() {try {// 检查配置信息checkConfigInfo();} catch (Throwable e) {LOGGER.error([ agent.getName() ] [sub-check] rotate check error, e);}}}, 1L, 10L, TimeUnit.MILLISECONDS);
}checkConfigInfo()负责提交长轮询任务。 com.alibaba.nacos.client.config.impl.ClientWorker#checkConfigInfo
public void checkConfigInfo() {// Dispatch tasks.int listenerSize cacheMap.size();// Round up the longingTaskCount.int longingTaskCount (int) Math.ceil(listenerSize / ParamUtil.getPerTaskConfigSize());if (longingTaskCount currentLongingTaskCount) {for (int i (int) currentLongingTaskCount; i longingTaskCount; i) {// The task list is no order.So it maybe has issues when changing.executorService.execute(new LongPollingRunnable(i));}currentLongingTaskCount longingTaskCount;}
}长轮询任务的执行过程。 com.alibaba.nacos.client.config.impl.ClientWorker.LongPollingRunnable#run
public void run() {ListCacheData cacheDatas new ArrayListCacheData();ListString inInitializingCacheList new ArrayListString();try {// check failover configfor (CacheData cacheData : cacheMap.values()) {if (cacheData.getTaskId() taskId) {cacheDatas.add(cacheData);try {checkLocalConfig(cacheData);if (cacheData.isUseLocalConfigInfo()) {cacheData.checkListenerMd5();}} catch (Exception e) {LOGGER.error(get local config info error, e);}}}// check server config// 校验配置ListString changedGroupKeys checkUpdateDataIds(cacheDatas, inInitializingCacheList);if (!CollectionUtils.isEmpty(changedGroupKeys)) {LOGGER.info(get changedGroupKeys: changedGroupKeys);}for (String groupKey : changedGroupKeys) {String[] key GroupKey.parseKey(groupKey);String dataId key[0];String group key[1];String tenant null;if (key.length 3) {tenant key[2];}try {// 根据dataId从服务端查询最新的配置ConfigResponse response getServerConfig(dataId, group, tenant, 3000L);CacheData cache cacheMap.get(GroupKey.getKeyTenant(dataId, group, tenant));cache.setContent(response.getContent());cache.setEncryptedDataKey(response.getEncryptedDataKey());if (null ! response.getConfigType()) {cache.setType(response.getConfigType());}LOGGER.info([{}] [data-received] dataId{}, group{}, tenant{}, md5{}, content{}, type{},agent.getName(), dataId, group, tenant, cache.getMd5(),ContentUtils.truncateContent(response.getContent()), response.getConfigType());} catch (NacosException ioe) {String message String.format([%s] [get-update] get changed config exception. dataId%s, group%s, tenant%s,agent.getName(), dataId, group, tenant);LOGGER.error(message, ioe);}}for (CacheData cacheData : cacheDatas) {if (!cacheData.isInitializing() || inInitializingCacheList.contains(GroupKey.getKeyTenant(cacheData.dataId, cacheData.group, cacheData.tenant))) {// 校验md5是否变化有变化就发通知cacheData.checkListenerMd5();cacheData.setInitializing(false);}}inInitializingCacheList.clear();executorService.execute(this);} catch (Throwable e) {// If the rotation training task is abnormal, the next execution time of the task will be punishedLOGGER.error(longPolling error : , e);executorService.schedule(this, taskPenaltyTime, TimeUnit.MILLISECONDS);}
}checkUpdateDataIds()该方法中会将所有的dataId按定义格式拼接出一个字符串构造一个长轮询请求发给服务端Long-Pulling-Timeout 超时时间默认30s如果服务端没有配置变更则会保持该请求直到超时有配置变更则直接返回有变更的dataId列表。 com.alibaba.nacos.client.config.impl.ClientWorker#checkUpdateDataIds
ListString checkUpdateDataIds(ListCacheData cacheDatas, ListString inInitializingCacheList) throws Exception {StringBuilder sb new StringBuilder();for (CacheData cacheData : cacheDatas) {if (!cacheData.isUseLocalConfigInfo()) {sb.append(cacheData.dataId).append(WORD_SEPARATOR);sb.append(cacheData.group).append(WORD_SEPARATOR);if (StringUtils.isBlank(cacheData.tenant)) {sb.append(cacheData.getMd5()).append(LINE_SEPARATOR);} else {sb.append(cacheData.getMd5()).append(WORD_SEPARATOR);sb.append(cacheData.getTenant()).append(LINE_SEPARATOR);}if (cacheData.isInitializing()) {// It updates when cacheData occurs in cacheMap by first time.// 添加要初始化的cacheDatainInitializingCacheList.add(GroupKey.getKeyTenant(cacheData.dataId, cacheData.group, cacheData.tenant));}}}boolean isInitializingCacheList !inInitializingCacheList.isEmpty();// 检验服务器端的配置return checkUpdateConfigStr(sb.toString(), isInitializingCacheList);
}checkUpdateConfigStr()会发起HTTP接口/v1/cs/configs/listener的调用。 com.alibaba.nacos.client.config.impl.ClientWorker#checkUpdateConfigStr
ListString checkUpdateConfigStr(String probeUpdateString, boolean isInitializingCacheList) throws Exception {MapString, String params new HashMapString, String(2);params.put(Constants.PROBE_MODIFY_REQUEST, probeUpdateString);MapString, String headers new HashMapString, String(2);// 使用长轮询headers.put(Long-Pulling-Timeout, timeout);// told server do not hang me up if new initializing cacheData added inif (isInitializingCacheList) {headers.put(Long-Pulling-Timeout-No-Hangup, true);}if (StringUtils.isBlank(probeUpdateString)) {return Collections.emptyList();}try {// In order to prevent the server from handling the delay of the clients long task,// increase the clients read timeout to avoid this problem.long readTimeoutMs timeout (long) Math.round(timeout 1);// 调用远程的监听接口HttpRestResultString result agent.httpPost(Constants.CONFIG_CONTROLLER_PATH /listener, headers, params, agent.getEncode(),readTimeoutMs);if (result.ok()) {setHealthServer(true);return parseUpdateDataIdResponse(result.getData());} else {setHealthServer(false);LOGGER.error([{}] [check-update] get changed dataId error, code: {}, agent.getName(),result.getCode());}} catch (Exception e) {setHealthServer(false);LOGGER.error([ agent.getName() ] [check-update] get changed dataId exception, e);throw e;}return Collections.emptyList();
}checkListenerMd5()主要就是判断两个md5是不是相同不同则调用safeNotifyListener()处理。 com.alibaba.nacos.client.config.impl.CacheData#checkListenerMd5
void checkListenerMd5() {for (ManagerListenerWrap wrap : listeners) {if (!md5.equals(wrap.lastCallMd5)) {// 配置有变化通知监听器safeNotifyListener(dataId, group, content, type, md5, encryptedDataKey, wrap);}}
}safeNotifyListener()方法主要就是调用监听器的receiveConfigInfo()方法然后更新监听器包装器中的lastContent、lastCallMd5字段。 com.alibaba.nacos.client.config.impl.CacheData#safeNotifyListener
private void safeNotifyListener(final String dataId, final String group, final String content, final String type,final String md5, final String encryptedDataKey, final ManagerListenerWrap listenerWrap) {final Listener listener listenerWrap.listener;Runnable job new Runnable() {Overridepublic void run() {ClassLoader myClassLoader Thread.currentThread().getContextClassLoader();ClassLoader appClassLoader listener.getClass().getClassLoader();try {if (listener instanceof AbstractSharedListener) {AbstractSharedListener adapter (AbstractSharedListener) listener;adapter.fillContext(dataId, group);LOGGER.info([{}] [notify-context] dataId{}, group{}, md5{}, name, dataId, group, md5);}// 执行回调之前先将线程classloader设置为具体webapp的classloader以免回调方法中调用spi接口是出现异常或错用多应用部署才会有该问题。Thread.currentThread().setContextClassLoader(appClassLoader);ConfigResponse cr new ConfigResponse();cr.setDataId(dataId);cr.setGroup(group);cr.setContent(content);cr.setEncryptedDataKey(encryptedDataKey);configFilterChainManager.doFilter(null, cr);String contentTmp cr.getContent();/*** see com.alibaba.cloud.nacos.refresh.NacosContextRefresher#registerNacosListener(java.lang.String, java.lang.String)*/listener.receiveConfigInfo(contentTmp);// compare lastContent and contentif (listener instanceof AbstractConfigChangeListener) {Map data ConfigChangeHandler.getInstance().parseChangeData(listenerWrap.lastContent, content, type);ConfigChangeEvent event new ConfigChangeEvent(data);((AbstractConfigChangeListener) listener).receiveConfigChange(event);listenerWrap.lastContent content;}listenerWrap.lastCallMd5 md5;LOGGER.info([{}] [notify-ok] dataId{}, group{}, md5{}, listener{} , name, dataId, group, md5,listener);} catch (NacosException ex) {LOGGER.error([{}] [notify-error] dataId{}, group{}, md5{}, listener{} errCode{} errMsg{},name, dataId, group, md5, listener, ex.getErrCode(), ex.getErrMsg());} catch (Throwable t) {LOGGER.error([{}] [notify-error] dataId{}, group{}, md5{}, listener{} tx{}, name, dataId,group, md5, listener, t.getCause());} finally {Thread.currentThread().setContextClassLoader(myClassLoader);}}};final long startNotify System.currentTimeMillis();try {if (null ! listener.getExecutor()) {listener.getExecutor().execute(job);} else {job.run();}} catch (Throwable t) {LOGGER.error([{}] [notify-error] dataId{}, group{}, md5{}, listener{} throwable{}, name, dataId,group, md5, listener, t.getCause());}final long finishNotify System.currentTimeMillis();LOGGER.info([{}] [notify-listener] time cost{}ms in ClientWorker, dataId{}, group{}, md5{}, listener{} ,name, (finishNotify - startNotify), dataId, group, md5, listener);
}监听器要执行的方法我们上面也已经讲过了主要就是发布RefreshEvent事件。至此Nacos的处理流程已经结束了RefreshEvent事件主要由 SpringCloud相关类来处理。
RefreshEvent事件处理
RefreshEvent事件会由RefreshEventListener来处理。
org.springframework.cloud.endpoint.event.RefreshEventListener#onApplicationEvent
public void onApplicationEvent(ApplicationEvent event) {if (event instanceof ApplicationReadyEvent) {handle((ApplicationReadyEvent) event);}else if (event instanceof RefreshEvent) {handle((RefreshEvent) event);}
}委托给ContextRefresher来刷新容器中的配置。 org.springframework.cloud.endpoint.event.RefreshEventListener#handle(org.springframework.cloud.endpoint.event.RefreshEvent)
public void handle(RefreshEvent event) {if (this.ready.get()) { // dont handle events before app is readylog.debug(Event received event.getEventDesc());SetString keys this.refresh.refresh();log.info(Refresh keys changed: keys);}
}org.springframework.cloud.context.refresh.ContextRefresher#refresh
public synchronized SetString refresh() {SetString keys refreshEnvironment();this.scope.refreshAll();return keys;
}refreshEnvironment()会去刷新Spring环境变量实际上是交给addConfigFilesToEnvironment()方法去做的刷新具体刷新思想就是重新创建一个新的Spring容器然后将这个新容器中的环境信息设置到原有的Spring环境中。拿到所有变化的配置项后发布一个环境变化的 EnvironmentChangeEvent事件。
org.springframework.cloud.context.refresh.ContextRefresher#refreshEnvironment
public synchronized SetString refreshEnvironment() {MapString, Object before extract(this.context.getEnvironment().getPropertySources());addConfigFilesToEnvironment();SetString keys changes(before,extract(this.context.getEnvironment().getPropertySources())).keySet();this.context.publishEvent(new EnvironmentChangeEvent(this.context, keys));return keys;
}/* For testing. */ ConfigurableApplicationContext addConfigFilesToEnvironment() {ConfigurableApplicationContext capture null;try {StandardEnvironment environment copyEnvironment(this.context.getEnvironment());SpringApplicationBuilder builder new SpringApplicationBuilder(Empty.class).bannerMode(Mode.OFF).web(WebApplicationType.NONE).environment(environment);// Just the listeners that affect the environment (e.g. excluding logging// listener because it has side effects)builder.application().setListeners(Arrays.asList(new BootstrapApplicationListener(),new ConfigFileApplicationListener()));capture builder.run();if (environment.getPropertySources().contains(REFRESH_ARGS_PROPERTY_SOURCE)) {environment.getPropertySources().remove(REFRESH_ARGS_PROPERTY_SOURCE);}MutablePropertySources target this.context.getEnvironment().getPropertySources();String targetName null;for (PropertySource? source : environment.getPropertySources()) {String name source.getName();if (target.contains(name)) {targetName name;}if (!this.standardSources.contains(name)) {if (target.contains(name)) {target.replace(name, source);}else {if (targetName ! null) {target.addAfter(targetName, source);// update targetName to preserve orderingtargetName name;}else {// targetName was null so we are at the start of the listtarget.addFirst(source);targetName name;}}}}}finally {ConfigurableApplicationContext closeable capture;while (closeable ! null) {try {closeable.close();}catch (Exception e) {// Ignore;}if (closeable.getParent() instanceof ConfigurableApplicationContext) {closeable (ConfigurableApplicationContext) closeable.getParent();}else {break;}}}return capture;
}org.springframework.cloud.context.scope.refresh.RefreshScope#refreshAll
public void refreshAll() {super.destroy();this.context.publishEvent(new RefreshScopeRefreshedEvent());
}Value注解的属性要实现热更新就需要配合RefreshScope注解被RefreshScope注解的对象的作用域为RefreshScope这种对象不是存在Spring容器的一级缓存中而是存在GenericScope对象的cache属性中当配置变更时会清空缓存在cache属性的对象这样Bean下次使用时就会被重新创建从而从Environment中获取最新的配置。 org.springframework.cloud.context.scope.GenericScope#destroy()
public void destroy() {ListThrowable errors new ArrayListThrowable();// 清空缓存CollectionBeanLifecycleWrapper wrappers this.cache.clear();for (BeanLifecycleWrapper wrapper : wrappers) {try {Lock lock this.locks.get(wrapper.getName()).writeLock();lock.lock();try {wrapper.destroy();}finally {lock.unlock();}}catch (RuntimeException e) {errors.add(e);}}if (!errors.isEmpty()) {throw wrapIfNecessary(errors.get(0));}this.errors.clear();
}