当前位置: 首页 > news >正文

站长工具综合查询2020wordpress网站会员太多

站长工具综合查询2020,wordpress网站会员太多,什么是网络营销概念,沈阳seo关键词排名优质博文#xff1a;IT-BLOG-CN 一、CAP 基础 ‌Nacos作为注册中心同时支持CP和AP模式。‌ Nacos通过不同的协议和机制来实现这两种模式#xff0c;以满足不同的需求场景。 在Nacos中#xff0c;默认情况下使用的是AP模式#xff0c;通过Distro协议来实现。AP模式主要关…优质博文IT-BLOG-CN 一、CAP 基础 ‌Nacos作为注册中心同时支持CP和AP模式。‌ Nacos通过不同的协议和机制来实现这两种模式以满足不同的需求场景。 在Nacos中默认情况下使用的是AP模式通过Distro协议来实现。AP模式主要关注高可用性在网络分区时仍然保持服务但可能会允许短暂的数据不一致。 此外Nacos也支持CP模式通过Raft协议来实现。CP模式在网络分区时牺牲可用性以保证数据一致性适用于对数据准确性要求高的场景如金融行业。 具体实现上Nacos通过客户端设置spring.cloud.nacos.discovery.ephemeral的值为false来启用CP模式默认为true则为AP模式。此外Nacos的gRPC通信端口和集群节点之间的通信端口也有所不同分别用于AP和CP模式的实现‌。 二、Nacos AP 实现原理 Distro协议。Distro是阿里巴巴开源的一个动态服务发现、配置管理和服务管理平台。目前流行的Nacos服务管理框架就采用了Distro协议。Distro协议被定位为临时数据的一致性协议 该类型协议 不需要把数据存储到磁盘或者数据库因为临时数据通常和服务器保持一个session会话该会话只要存在数据就不会丢失。 Distro协议保证写必须永远是成功的即使可能会发生网络分区。当网络恢复时把各数据分片的数据进行合并。 Distro协议具有以下特点 【1】数据分片Sharding    ☑️ 将数据根据某种规则如哈希分片每个节点负责一部分数据的存储和管理。    ☑️ 这种分片策略可以有效地分散负载避免单点瓶颈。 【2】数据复制Replication    ☑️ 在每个节点上维护一份完整的数据副本确保数据在节点之间的一致性。    ☑️ 使用异步复制的方式在数据更新时将更新信息广播给其他节点。 【3】一致性管理    ☑️ 采用最终一致性模型确保数据在一定时间内达到一致。    ☑️ 使用版本号或时间戳来管理数据的更新确保数据的最新状态能够传播到所有节点。 【4】心跳检测和故障转移    ☑️ 定期进行心跳检测确保节点的健康状态。    ☑️ 在节点故障时其他节点能够快速接管其数据和职责确保系统的高可用性。 Distro协议服务端节点发现使用寻址机制来实现服务端节点的管理。在Nacos中寻址模式有三种 在Nacos服务启动的时候ServerMemberManager这个类专门对集群节点进行管理的这个类在init方法中就会对集群进行初始化 protected void init() throws NacosException {Loggers.CORE.info(Nacos-related cluster resource initialization);// 得到当前nacos服务的端口号this.port EnvUtil.getProperty(server.port, Integer.class, 8848);// 得到当前nacos服务的地址this.localAddress InetUtils.getSelfIP() : port;// 解析地址得到当前nacos服务所对应的集群节点对象this.self MemberUtil.singleParse(this.localAddress);// 给当前nacos服务设置一个版本号this.self.setExtendVal(MemberMetaDataConstants.VERSION, VersionUtils.version);// 把自己放到serverList中serverList.put(self.getAddress(), self);// 该方法做了两件事// 1.注册了一个集群节点信息变更事件// 2.注册了订阅IPChangeEvent事件的事件订阅者registerClusterEvent();// 初始化节点地址寻址模式// 在这里就可以通过配置的节点地址去初始化整个nacos集群节点集合了initAndStartLookup();if (serverList.isEmpty()) {throw new NacosException(NacosException.SERVER_ERROR, cannot get serverlist, so exit.);}Loggers.CORE.info(The cluster resource is initialized); }注册了订阅IPChangeEvent事件的事件订阅者registerClusterEvent该方法做了两件事 【1】注册了一个集群节点信息变更事件。 【2】注册了订阅IPChangeEvent事件的事件订阅者。 private void registerClusterEvent() {// 注册一个集群节点信息变更事件NotifyCenter.registerToPublisher(MembersChangeEvent.class,EnvUtil.getProperty(nacos.member-change-event.queue.size, Integer.class, 128));// 注册一个事件订阅者订阅的事件类型是IPChangeEventNotifyCenter.registerSubscriber(new SubscriberInetUtils.IPChangeEvent() {Overridepublic void onEvent(InetUtils.IPChangeEvent event) {String newAddress event.getNewIP() : port;ServerMemberManager.this.localAddress newAddress;EnvUtil.setLocalAddress(localAddress);Member self ServerMemberManager.this.self;self.setIp(event.getNewIP());String oldAddress event.getOldIP() : port;ServerMemberManager.this.serverList.remove(oldAddress);ServerMemberManager.this.serverList.put(newAddress, self);ServerMemberManager.this.memberAddressInfos.remove(oldAddress);ServerMemberManager.this.memberAddressInfos.add(newAddress);}Overridepublic Class? extends Event subscribeType() {return InetUtils.IPChangeEvent.class;}}); }通过上述方法分析可知注册一个MembersChangeEvent事件而对应的事件订阅者是ServerListManager。同时该方法还会注册一个IPChangeEvent事件的事件订阅者IPChangeEvent这个事件就是当前节点IP发生变更之后发布的该事件发布之后会被这个注册的订阅者所捕获该订阅者做的事情也很简单就是对集群节点集合中对应当前节点的ip进行更新就行了。 这一行代码就是初始化集群的关键 创建nacos集群节点寻址器 private void initAndStartLookup() throws NacosException {this.lookup LookupFactory.createLookUp(this);this.lookup.start(); }先是通过LookupFactory创建一个节点寻址器然后调用start方法启动这个节点寻址器。 通过上图可知Nacos配置集群节点地址的时候有两种方式读取本地配置文件 和 通过配置服务器 /*** 创建nacos集群节点寻址器** param memberManager {link ServerMemberManager}* return {link MemberLookup}* throws NacosException NacosException*/ public static MemberLookup createLookUp(ServerMemberManager memberManager) throws NacosException {// 条件成立当前nacos节点是集群模式if (!EnvUtil.getStandaloneMode()) {// 从配置环境中获取nacos集群节点的寻址方式String lookupType EnvUtil.getProperty(LOOKUP_MODE_TYPE);LookupType type chooseLookup(lookupType);// 得到对应的节点寻址器FileConfigMemberLookup / AddressServerMemberLookupLOOK_UP find(type);currentLookupType type;}// 条件成立当前nacos节点是单机模式else {LOOK_UP new StandaloneMemberLookup();}LOOK_UP.injectMemberManager(memberManager);Loggers.CLUSTER.info(Current addressing mode selection : {}, LOOK_UP.getClass().getSimpleName());return LOOK_UP; }创建nacos集群节点寻址器(参考流程图) /*** 创建nacos集群节点寻址器** param memberManager {link ServerMemberManager}* return {link MemberLookup}* throws NacosException NacosException*/ public static MemberLookup createLookUp(ServerMemberManager memberManager) throws NacosException {// 条件成立当前nacos节点是集群模式if (!EnvUtil.getStandaloneMode()) {// 从配置环境中获取nacos集群节点的寻址方式String lookupType EnvUtil.getProperty(LOOKUP_MODE_TYPE);LookupType type chooseLookup(lookupType);// 得到对应的节点寻址器FileConfigMemberLookup / AddressServerMemberLookupLOOK_UP find(type);currentLookupType type;}// 条件成立当前nacos节点是单机模式else {LOOK_UP new StandaloneMemberLookup();}LOOK_UP.injectMemberManager(memberManager);Loggers.CLUSTER.info(Current addressing mode selection : {}, LOOK_UP.getClass().getSimpleName());return LOOK_UP; }怎么确定是使用FileConfigMemberLookup还是AddressServerMemberLookup主要看是否配置了lookupType如果没有配置lookupType就按照默认的先看是否配置了集群配置文件。 private static LookupType chooseLookup(String lookupType) {if (StringUtils.isNotBlank(lookupType)) {LookupType type LookupType.sourceOf(lookupType);if (Objects.nonNull(type)) {return type;}}// 代码来到这里说明没有配置lookupType此时会默认去寻找user.home/nacos/conf/cluster.conf文件File file new File(EnvUtil.getClusterConfFilePath());// 条件成立集群配置文件存在或者环境变量配置了集群节点地址if (file.exists() || StringUtils.isNotBlank(EnvUtil.getMemberList())) {// 返回文件寻址模式return LookupType.FILE_CONFIG;}// 返回服务器寻址模式return LookupType.ADDRESS_SERVER; }确定了对应的节点寻址器FileConfigMemberLookup / AddressServerMemberLookup后获取对应的节点地址。如果当前节点是集群模式那么会去判断${user.home}/nacos/conf/cluster.conf这个文件是否存在或者环境变量中是否配置了集群节点地址如果两者有一个成立就是文件寻址模式反之是服务器寻址模式。 private static MemberLookup find(LookupType type) {// 条件成立集群配置方式是文件配置的方式if (LookupType.FILE_CONFIG.equals(type)) {// 创建一个FileConfigMemberLookup对象并返回LOOK_UP new FileConfigMemberLookup();return LOOK_UP;}// 条件成立集群配置方式是通过服务器获取节点地址的方式if (LookupType.ADDRESS_SERVER.equals(type)) {LOOK_UP new AddressServerMemberLookup();return LOOK_UP;}// unpossible to run herethrow new IllegalArgumentException(); }如何初始化集群 【1】单机模式 StandaloneMemberLookup 【2】文件模式 FileConfigMemberLookup利用监控cluster.conf文件的变动实现节点的管理。核心代码如下 public class FileConfigMemberLookup extends AbstractMemberLookup {/*** 文件监听回调*/private FileWatcher watcher new FileWatcher() {/*** 当对应的目录或者文件发生变更的时候会回调该方法* param event {link FileChangeEvent}*/Overridepublic void onChange(FileChangeEvent event) {readClusterConfFromDisk();}Overridepublic boolean interest(String context) {return StringUtils.contains(context, cluster.conf);}};Overridepublic void start() throws NacosException {if (start.compareAndSet(false, true)) {// 从文件中读取集群节点地址readClusterConfFromDisk();try {// 使用notify机制监控文件的变化并自动触发读取cluster.confWatchFileCenter.registerWatcher(EnvUtil.getConfPath(), watcher);} catch (Throwable e) {Loggers.CLUSTER.error(An exception occurred in the launch file monitor : {}, e.getMessage());}}}Overridepublic void destroy() throws NacosException {WatchFileCenter.deregisterWatcher(EnvUtil.getConfPath(), watcher);}/*** 从cluster.conf文件中读取集群节点地址*/private void readClusterConfFromDisk() {CollectionMember tmpMembers new ArrayList();try {// 获取到cluster.conf文件中配置的节点地址列表ListString tmp EnvUtil.readClusterConf();// 把这些节点地址分别转换成对应的集群节点对象tmpMembers MemberUtil.readServerConf(tmp);} catch (Throwable e) {Loggers.CLUSTER.error(nacos-XXXX [serverlist] failed to get serverlist from disk!, error : {}, e.getMessage());}afterLookup(tmpMembers);} }在start方法中先调用readClusterConfFromDisk方法这个方法会读取${user.home}/nacos/conf/cluster.conf这个文件中配置的节点地址读取到之后把这些节点地址转化为对应的Member对象一个Member对象就代表一个节点接着会调用父类AbstractMemberLookup的afterLookup方法。 public void afterLookup(CollectionMember members) {this.memberManager.memberChange(members); }调用的是集群节点管理器的menberChange方法同时把上面从cluster.conf文件中读取到的Member节点集合作为方法参数 /*** 当nacos节点启动 或者 每次配置的集群节点地址发生改变的时候就会调用到该方法* param members 当前最新的集群节点地址* return 返回true表示集群节点数量发生改变了反之表示没改变*/ synchronized boolean memberChange(CollectionMember members) {if (members null || members.isEmpty()) {return false;}// 配置的集群节点地址是否包含当前nacos节点boolean isContainSelfIp members.stream().anyMatch(ipPortTmp - Objects.equals(localAddress, ipPortTmp.getAddress()));if (isContainSelfIp) {isInIpList true;} else {isInIpList false;members.add(this.self);Loggers.CLUSTER.warn([serverlist] self ip {} not in serverlist {}, self, members);}// If the number of old and new clusters is different, the cluster information// must have changed; if the number of clusters is the same, then compare whether// there is a difference; if there is a difference, then the cluster node changes// are involved and all recipients need to be notified of the node change eventboolean hasChange members.size() ! serverList.size();ConcurrentSkipListMapString, Member tmpMap new ConcurrentSkipListMap();SetString tmpAddressInfo new ConcurrentHashSet();for (Member member : members) {final String address member.getAddress();if (!serverList.containsKey(address)) {hasChange true;// 如果cluster.conf或address-server中的cluster信息被更改而对应的nacos-server还没有启动则成员的状态应该设置为DOWN// 如果相应的nacos-server已经启动则在几秒钟后检测到该成员的状态将被设置为UPmember.setState(NodeState.DOWN);} else {//fix issue # 4925member.setState(serverList.get(address).getState());}// Ensure that the node is created only oncetmpMap.put(address, member);if (NodeState.UP.equals(member.getState())) {tmpAddressInfo.add(address);}}// 更新serverList为最新的集群节点集合serverList tmpMap;// 更新memberAddressInfos为最新的集群节点地址memberAddressInfos tmpAddressInfo;// 获取更新之后集群所有的节点对象CollectionMember finalMembers allMembers();Loggers.CLUSTER.warn([serverlist] updated to : {}, finalMembers);// Persist the current cluster node information to cluster.conf// important need to put the event publication into a synchronized block to ensure// that the event publication is sequential// 条件成立1.集群中有节点增加了// 2.集群中有节点下线了// 3.手动增加或者删除了节点配置地址信息if (hasChange) {// 把最新的节点写入到配置中cluster.conf或者address-serverMemberUtil.syncToFile(finalMembers);// 发布一个MembersChangeEvent事件Event event MembersChangeEvent.builder().members(finalMembers).build();NotifyCenter.publishEvent(event);}return hasChange; }如果是集群节点的数量发生改变的话就会发布一个MembersChangeEvent事件而这个事件对应的订阅者是ServerListManager这个类在这个类中也保存了整个nacos集群所有的节点集合在回调它的订阅方法时很简单就是把这个集合属性重新赋值代码如下 /*** 当集群中的其他节点发生变化的时候会当前nacos节点就会发布一个MembersChangeEvent事件然后会调用该方法更新最新的集群信息集合* param event MembersChangeEvent*/ Override public void onEvent(MembersChangeEvent event) {// 把最新的集群节点集合重新赋值到serversthis.servers new ArrayList(event.getMembers()); }读取配置文件的过程已经分析完后对cluster.conf文件注册一个监听器当cluster.conf文件发生变更时就会触发FileWatcher中的onChange回调方法onChange回调方法会再次调用上面说的readClusterConfFromDisk方法重新读取一遍cluster.conf文件中的节点地址。 // 使用notify机制监控文件的变化并自动触发读取cluster.conf WatchFileCenter.registerWatcher(EnvUtil.getConfPath(), watcher);public static synchronized boolean registerWatcher(final String paths, FileWatcher watcher) throws NacosException {checkState();if (NOW_WATCH_JOB_CNT MAX_WATCH_FILE_JOB) {return false;}WatchDirJob job MANAGER.get(paths);if (job null) {job new WatchDirJob(paths);job.start();MANAGER.put(paths, job);NOW_WATCH_JOB_CNT;}job.addSubscribe(watcher);return true; }/*** 文件监听回调*/ private FileWatcher watcher new FileWatcher() {/*** 当对应的目录或者文件发生变更的时候会回调该方法* param event {link FileChangeEvent}*/Overridepublic void onChange(FileChangeEvent event) {readClusterConfFromDisk();}Overridepublic boolean interest(String context) {return StringUtils.contains(context, cluster.conf);} };【3】服务器模式 AddressServerMemberLookup使用地址服务器存储节点信息服务端节点定时拉取信息进行管理 核心代码 使用地址服务器存储节点信息会创建AddressServerMemberLookup服务端定时拉取信息进行管理 public class AddressServerMemberLookup extends AbstractMemberLookup {private final GenericTypeRestResultString genericType new GenericTypeRestResultString() {};public String domainName;public String addressPort;public String addressUrl;public String envIdUrl;public String addressServerUrl;private volatile boolean isAddressServerHealth true;private int addressServerFailCount 0;private int maxFailCount 12;private final NacosRestTemplate restTemplate HttpClientBeanHolder.getNacosRestTemplate(Loggers.CORE);private volatile boolean shutdown false;Overridepublic void start() throws NacosException {if (start.compareAndSet(false, true)) {this.maxFailCount Integer.parseInt(EnvUtil.getProperty(maxHealthCheckFailCount, 12));initAddressSys();run();}}/**** 获取服务器地址*/private void initAddressSys() {String envDomainName System.getenv(address_server_domain);if (StringUtils.isBlank(envDomainName)) {domainName EnvUtil.getProperty(address.server.domain, jmenv.tbsite.net);} else {domainName envDomainName;}String envAddressPort System.getenv(address_server_port);if (StringUtils.isBlank(envAddressPort)) {addressPort EnvUtil.getProperty(address.server.port, 8080);} else {addressPort envAddressPort;}String envAddressUrl System.getenv(address_server_url);if (StringUtils.isBlank(envAddressUrl)) {addressUrl EnvUtil.getProperty(address.server.url, EnvUtil.getContextPath() / serverlist);} else {addressUrl envAddressUrl;}addressServerUrl http:// domainName : addressPort addressUrl;envIdUrl http:// domainName : addressPort /env;Loggers.CORE.info(ServerListService address-server port: addressPort);Loggers.CORE.info(ADDRESS_SERVER_URL: addressServerUrl);}SuppressWarnings(PMD.UndefineMagicConstantRule)private void run() throws NacosException {// With the address server, you need to perform a synchronous member node pull at startup// Repeat three times, successfully jump outboolean success false;Throwable ex null;int maxRetry EnvUtil.getProperty(nacos.core.address-server.retry, Integer.class, 5);for (int i 0; i maxRetry; i) {try {//拉取集群节点信息syncFromAddressUrl();success true;break;} catch (Throwable e) {ex e;Loggers.CLUSTER.error([serverlist] exception, error : {}, ExceptionUtil.getAllExceptionMsg(ex));}}if (!success) {throw new NacosException(NacosException.SERVER_ERROR, ex);}//创建定时任务GlobalExecutor.scheduleByCommon(new AddressServerSyncTask(), 5_000L);}Overridepublic void destroy() throws NacosException {shutdown true;}Overridepublic MapString, Object info() {MapString, Object info new HashMap(4);info.put(addressServerHealth, isAddressServerHealth);info.put(addressServerUrl, addressServerUrl);info.put(envIdUrl, envIdUrl);info.put(addressServerFailCount, addressServerFailCount);return info;}private void syncFromAddressUrl() throws Exception {RestResultString result restTemplate.get(addressServerUrl, Header.EMPTY, Query.EMPTY, genericType.getType());if (result.ok()) {isAddressServerHealth true;Reader reader new StringReader(result.getData());try {afterLookup(MemberUtil.readServerConf(EnvUtil.analyzeClusterConf(reader)));} catch (Throwable e) {Loggers.CLUSTER.error([serverlist] exception for analyzeClusterConf, error : {},ExceptionUtil.getAllExceptionMsg(e));}addressServerFailCount 0;} else {addressServerFailCount;if (addressServerFailCount maxFailCount) {isAddressServerHealth false;}Loggers.CLUSTER.error([serverlist] failed to get serverlist, error code {}, result.getCode());}}// 定时任务class AddressServerSyncTask implements Runnable {Overridepublic void run() {if (shutdown) {return;}try {//拉取服务列表syncFromAddressUrl();} catch (Throwable ex) {addressServerFailCount;if (addressServerFailCount maxFailCount) {isAddressServerHealth false;}Loggers.CLUSTER.error([serverlist] exception, error : {}, ExceptionUtil.getAllExceptionMsg(ex));} finally {GlobalExecutor.scheduleByCommon(this, 5_000L);}}} }初始全量同步 Distro协议节点启动时会从其他节点全量同步数据。在Nacos中整体流程如下 【1】启动一个定时任务线程DistroLoadDataTask加载数据调用load()方法加载数据。 /**** 数据加载过程*/ Override public void run() {try {//加载数据load();if (!checkCompleted()) {GlobalExecutor.submitLoadDataTask(this, distroConfig.getLoadDataRetryDelayMillis());} else {loadCallback.onSuccess();Loggers.DISTRO.info([DISTRO-INIT] load snapshot data success);}} catch (Exception e) {loadCallback.onFailed(e);Loggers.DISTRO.error([DISTRO-INIT] load snapshot data failed. , e);} }/**** 加载数据并同步* throws Exception*/ private void load() throws Exception {while (memberManager.allMembersWithoutSelf().isEmpty()) {Loggers.DISTRO.info([DISTRO-INIT] waiting server list init...);TimeUnit.SECONDS.sleep(1);}while (distroComponentHolder.getDataStorageTypes().isEmpty()) {Loggers.DISTRO.info([DISTRO-INIT] waiting distro data storage register...);TimeUnit.SECONDS.sleep(1);}//同步数据for (String each : distroComponentHolder.getDataStorageTypes()) {if (!loadCompletedMap.containsKey(each) || !loadCompletedMap.get(each)) {//从远程机器上同步所有数据loadCompletedMap.put(each, loadAllDataSnapshotFromRemote(each));}} }【2】调用loadAllDataSnapshotFromRemote()方法从远程机器同步所有的数据。 /**** 从远程机器上同步所有数据*/ private boolean loadAllDataSnapshotFromRemote(String resourceType) {DistroTransportAgent transportAgent distroComponentHolder.findTransportAgent(resourceType);DistroDataProcessor dataProcessor distroComponentHolder.findDataProcessor(resourceType);if (null transportAgent || null dataProcessor) {Loggers.DISTRO.warn([DISTRO-INIT] Cant find component for type {}, transportAgent: {}, dataProcessor: {},resourceType, transportAgent, dataProcessor);return false;}//遍历集群成员节点,不包括自己for (Member each : memberManager.allMembersWithoutSelf()) {try {Loggers.DISTRO.info([DISTRO-INIT] load snapshot {} from {}, resourceType, each.getAddress());//从远程节点加载数据调用http请求接口: distro/datums;DistroData distroData transportAgent.getDatumSnapshot(each.getAddress());//处理数据boolean result dataProcessor.processSnapshot(distroData);Loggers.DISTRO.info([DISTRO-INIT] load snapshot {} from {} result: {}, resourceType, each.getAddress(),result);if (result) {return true;}} catch (Exception e) {Loggers.DISTRO.error([DISTRO-INIT] load snapshot {} from {} failed., resourceType, each.getAddress(), e);}}return false; }【3】从namingProxy代理获取所有的数据data。  ● 构造http请求调用httpGet方法从指定的server获取数据。  ● 从获取的结果result中获取数据bytes。 /**** 从namingProxy代理获取所有的数据data从获取的结果result中获取数据bytes* param targetServer target server.* return*/ Override public DistroData getDatumSnapshot(String targetServer) {try {//从namingProxy代理获取所有的数据data从获取的结果result中获取数据bytesbyte[] allDatum NamingProxy.getAllData(targetServer);//将数据封装成DistroDatareturn new DistroData(new DistroKey(snapshot, KeyBuilder.INSTANCE_LIST_KEY_PREFIX), allDatum);} catch (Exception e) {throw new DistroException(String.format(Get snapshot from %s failed., targetServer), e);} }/*** Get all datum from target server.* NamingProxy.getAllData* 执行HttpGet请求并获取返回数据* param server target server address* return all datum byte array* throws Exception exception*/ public static byte[] getAllData(String server) throws Exception {//参数封装MapString, String params new HashMap(8);//组装URL并执行HttpGet请求获取结果集RestResultString result HttpClient.httpGet(http:// server EnvUtil.getContextPath() UtilsAndCommons.NACOS_NAMING_CONTEXT ALL_DATA_GET_URL,new ArrayList(), params);//返回数据if (result.ok()) {return result.getData().getBytes();}throw new IOException(failed to req API: http:// server EnvUtil.getContextPath() UtilsAndCommons.NACOS_NAMING_CONTEXT ALL_DATA_GET_URL . code: result.getCode() msg: result.getMessage()); }【4】处理数据同步到本地processData。  ● 从data反序列化出datumMap。  ● 把数据存储到dataStore也就是本地缓存dataMap。  ● 监听器不包括key就创建一个空的service并且绑定监听器。 /*** 数据处理并更新本地缓存* param data* return* throws Exception*/ private boolean processData(byte[] data) throws Exception {if (data.length 0) {//从data反序列化出datumMapMapString, DatumInstances datumMap serializer.deserializeMap(data, Instances.class);// 把数据存储到dataStore也就是本地缓存dataMapfor (Map.EntryString, DatumInstances entry : datumMap.entrySet()) {dataStore.put(entry.getKey(), entry.getValue());//监听器不包括key就创建一个空的service并且绑定监听器if (!listeners.containsKey(entry.getKey())) {// pretty sure the service not exist:if (switchDomain.isDefaultInstanceEphemeral()) {// create empty service//创建一个空的serviceLoggers.DISTRO.info(creating service {}, entry.getKey());Service service new Service();String serviceName KeyBuilder.getServiceName(entry.getKey());String namespaceId KeyBuilder.getNamespace(entry.getKey());service.setName(serviceName);service.setNamespaceId(namespaceId);service.setGroupName(Constants.DEFAULT_GROUP);// now validate the service. if failed, exception will be thrownservice.setLastModifiedMillis(System.currentTimeMillis());service.recalculateChecksum();// The Listener corresponding to the key value must not be empty// 与键值对应的监听器不能为空,这里的监听器类型是 ServiceManagerRecordListener listener listeners.get(KeyBuilder.SERVICE_META_KEY_PREFIX).peek();if (Objects.isNull(listener)) {return false;}//为空的绑定监听器listener.onChange(KeyBuilder.buildServiceMetaKey(namespaceId, serviceName), service);}}}//循环所有datumMapfor (Map.EntryString, DatumInstances entry : datumMap.entrySet()) {if (!listeners.containsKey(entry.getKey())) {// Should not happen:Loggers.DISTRO.warn(listener of {} not found., entry.getKey());continue;}try {//执行监听器的onChange监听方法for (RecordListener listener : listeners.get(entry.getKey())) {listener.onChange(entry.getKey(), entry.getValue().value);}} catch (Exception e) {Loggers.DISTRO.error([NACOS-DISTRO] error while execute listener of key: {}, entry.getKey(), e);continue;}// Update data store if listener executed successfully:// 监听器listener执行成功后就更新dataStoredataStore.put(entry.getKey(), entry.getValue());}}return true; }【5】监听器listener执行成功后就更新data store。 增量同步: 新增数据使用异步广播同步服务注册的InstanceController.register()就是数据入口它会调用ServiceManager.registerInstance()执行数据同步的时候调用addInstance(),在该方法中会执行DistroConsistencyServiceImpl.put()该方法是增量同步的入口会调用distroProtocol.sync()方法代码如下 /**** 数据保存* param key key of data, this key should be globally unique* param value value of data* throws NacosException*/ Override public void put(String key, Record value) throws NacosException {//将数据存入到dataStore中onPut(key, value);//使用distroProtocol同步数据distroProtocol.sync(new DistroKey(key, KeyBuilder.INSTANCE_LIST_KEY_PREFIX),DataOperation.CHANGE,globalConfig.getTaskDispatchPeriod() / 2); }【1】DistroProtocol使用sync()方法接收增量数据 public void sync(DistroKey distroKey, DataOperation action, long delay) {//向除了自己外的所有节点广播for (Member each : memberManager.allMembersWithoutSelf()) {DistroKey distroKeyWithTarget new DistroKey(distroKey.getResourceKey(), distroKey.getResourceType(),each.getAddress());DistroDelayTask distroDelayTask new DistroDelayTask(distroKeyWithTarget, action, delay);//从distroTaskEngineHolder获取延时执行引擎并将distroDelayTask任务添加进来//执行延时任务发布distroTaskEngineHolder.getDelayTaskExecuteEngine().addTask(distroKeyWithTarget, distroDelayTask);if (Loggers.DISTRO.isDebugEnabled()) {Loggers.DISTRO.debug([DISTRO-SCHEDULE] {} to {}, distroKey, each.getAddress());}} }【2】向其他节点发布广播任务调用distroTaskEngineHolder发布延迟任务 /**** 构造函数指定任务处理器* param distroComponentHolder*/ public DistroTaskEngineHolder(DistroComponentHolder distroComponentHolder) {DistroDelayTaskProcessor defaultDelayTaskProcessor new DistroDelayTaskProcessor(this, distroComponentHolder);//指定任务处理器defaultDelayTaskProcessordelayTaskExecuteEngine.setDefaultTaskProcessor(defaultDelayTaskProcessor); }【3】调用DistroDelayTaskProcessor.process()方法进行任务投递将延迟任务转换为异步变更任务 /**** 任务处理过程* param task task.* return*/ Override public boolean process(NacosTask task) {if (!(task instanceof DistroDelayTask)) {return true;}DistroDelayTask distroDelayTask (DistroDelayTask) task;DistroKey distroKey distroDelayTask.getDistroKey();if (DataOperation.CHANGE.equals(distroDelayTask.getAction())) {//将延迟任务变更成异步任务异步任务对象是一个线程DistroSyncChangeTask syncChangeTask new DistroSyncChangeTask(distroKey, distroComponentHolder);//将任务添加到NacosExecuteTaskExecuteEngine中并执行distroTaskEngineHolder.getExecuteWorkersManager().addTask(distroKey, syncChangeTask);return true;}return false; }【4】执行变更任务DistroSyncChangeTask.run()方法向指定节点发送消息 /**** 执行数据同步*/ Override public void run() {Loggers.DISTRO.info([DISTRO-START] {}, toString());try {//获取本地缓存数据String type getDistroKey().getResourceType();DistroData distroData distroComponentHolder.findDataStorage(type).getDistroData(getDistroKey());distroData.setType(DataOperation.CHANGE);//向其他节点同步数据boolean result distroComponentHolder.findTransportAgent(type).syncData(distroData, getDistroKey().getTargetServer());if (!result) {handleFailedTask();}Loggers.DISTRO.info([DISTRO-END] {} result: {}, toString(), result);} catch (Exception e) {Loggers.DISTRO.warn([DISTRO] Sync data change failed., e);handleFailedTask();} }● 调用DistroHttpAgent.syncData()方法发送数据  ● 调用NamingProxy.syncData()方法发送数据 /**** 向其他节点同步数据* param data data* param targetServer target server* return*/ Override public boolean syncData(DistroData data, String targetServer) {if (!memberManager.hasMember(targetServer)) {return true;}//获取数据字节数组byte[] dataContent data.getContent();//通过Http协议同步数据return NamingProxy.syncData(dataContent, data.getDistroKey().getTargetServer()); }【5】异常任务调用handleFailedTask()方法进行处理  ● 调用DistroFailedTaskHandler处理失败任务  ● 调用DistroHttpCombinedKeyTaskFailedHandler将失败任务重新投递成延迟任务 private void handleFailedTask() {String type getDistroKey().getResourceType();DistroFailedTaskHandler failedTaskHandler distroComponentHolder.findFailedTaskHandler(type);if (null failedTaskHandler) {return;}// 重试失败任务failedTaskHandler.retry(getDistroKey(), ApplyAction.CHANGE); }
http://www.w-s-a.com/news/38691/

相关文章:

  • 深圳企业网站制作公司单位注册wordpress发送邮件
  • 兰州专业网站建设团队wordpress 拉取点击数
  • 基于php房产网站开发ppt模板免费下载第一ppt
  • 网站盈利模式分析怎么做山东营销网站建设联系方式
  • 二级网站建设 知乎我的个人主页模板
  • wordpress小说网站模板下载地址百度优化服务
  • 云南网页设计制作seo计费系统源码
  • 屏蔽ip网站吗行业外贸网站建设
  • 河北城乡建设学校网站常州网站建设公司平台
  • 合肥网站建设市场分析网站收录后怎么做排名
  • 湖南企业网站建设如何推广手机网站
  • 网站建设项目经历网站推广服务 商务服务
  • 加强网站的建设福州seo排名外包
  • 做婚庆找什么网站有专门为个人网站做推广的吗
  • 网站搭建要求模板学编程需要英语基础吗
  • 网上如何建网站卖量具净水机企业网站源码
  • 网站推广 软件规划设计公司年终总结
  • 视频网站开发方法微站网建站系统
  • 渐变网站网页界面设计的宗旨是什么
  • 网站排名提升工具免费韶关做网站公司
  • 做网站一个月可以赚多少钱东营市建设工程招标网
  • 网站开发工具阿里云怎么做网站
  • 用html做静态网站成都专业logo设计公司
  • 哪里有免费建站平台233小游戏网页版在线玩
  • 为什么我的网站做不起来网站能搜索到
  • 方又圆网站建设信息流广告二级代理
  • 公司管理网站首页网站后台编辑框不显示
  • aspnet网站开发模板备案 网站建设方案书
  • 营销软件网站wordpress优秀的破解主题
  • 卧龙区网站建设国内漂亮网站欣赏