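Nacos service discovery combines server-side push with periodic client-side pull. For the heartbeat mechanism, each client sends a heartbeat to the server every 5 s to keep its instance alive; if the server receives no heartbeat for more than 15 s it marks the instance as unhealthy, and after more than 30 s it deletes the instance.

1. Service discovery

Nacos combines server-side push with periodic client-side pull to preserve the high availability of its AP architecture. NacosNamingService is the client-side interface for service discovery.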
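Each Nacos client instance keeps a local cache Map that holds all the service instances it knows about. The instance list is queried through /nacos/v1/ns/instance/list, and the call path is NacosNamingService.getAllInstances() -> getServiceInfo(). The core method is getServiceInfo: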
    public ServiceInfo getServiceInfo(final String serviceName, final String clusters) {
        NAMING_LOGGER.debug("failover-mode: " + failoverReactor.isFailoverSwitch());
        String key = ServiceInfo.getKey(serviceName, clusters);
        if (failoverReactor.isFailoverSwitch()) {
            return failoverReactor.getService(key);
        }

        // Try the local cache first
        ServiceInfo serviceObj = getServiceInfo0(serviceName, clusters);

        if (null == serviceObj) {
            // Nothing cached yet: create a placeholder and refresh from the server right away
            serviceObj = new ServiceInfo(serviceName, clusters);
            serviceInfoMap.put(serviceObj.getKey(), serviceObj);

            updatingMap.put(serviceName, new Object());
            updateServiceNow(serviceName, clusters);
            updatingMap.remove(serviceName);
        } else if (updatingMap.containsKey(serviceName)) {
            if (UPDATE_HOLD_INTERVAL > 0) {
                // hold a moment waiting for update finish
                synchronized (serviceObj) {
                    try {
                        serviceObj.wait(UPDATE_HOLD_INTERVAL);
                    } catch (InterruptedException e) {
                        NAMING_LOGGER.error(
                                "[getServiceInfo] serviceName:" + serviceName + ", clusters:" + clusters, e);
                    }
                }
            }
        }

        // Schedule the periodic pull
        scheduleUpdateIfAbsent(serviceName, clusters);

        // Return whatever is now in the local map
        return serviceInfoMap.get(serviceObj.getKey());
    }

The client pulls the service instances from the server and stores them in the local cache:
    public String queryList(String serviceName, String clusters, int udpPort, boolean healthyOnly)
            throws NacosException {
        final Map<String, String> params = new HashMap<String, String>(8);
        params.put(CommonParams.NAMESPACE_ID, namespaceId);
        params.put(CommonParams.SERVICE_NAME, serviceName);
        params.put("clusters", clusters);
        params.put("udpPort", String.valueOf(udpPort));
        params.put("clientIP", NetUtils.localIP());
        params.put("healthyOnly", String.valueOf(healthyOnly));

        // Call the instance list endpoint
        return reqApi(UtilAndComs.nacosUrlBase + "/instance/list", params, HttpMethod.GET);
    }
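The "look in the local map first, go to the server only on a miss" flow can be sketched in isolation. This is a minimal illustration, not Nacos code: the class InstanceCacheSketch, its method names, and the remoteLookup function (standing in for the HTTP call to /instance/list) are all hypothetical.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

/** Hypothetical client-side cache; it only mimics the shape of the flow above. */
class InstanceCacheSketch {
    // serviceName -> instance addresses, playing the role of the local cache Map
    private final Map<String, List<String>> cache = new ConcurrentHashMap<>();
    // Stand-in for the remote query (assumption: returns the instance list for a service)
    private final Function<String, List<String>> remoteLookup;
    private final AtomicInteger remoteCalls = new AtomicInteger();

    InstanceCacheSketch(Function<String, List<String>> remoteLookup) {
        this.remoteLookup = remoteLookup;
    }

    /** Returns the cached list, querying the remote side only when nothing is cached. */
    List<String> getInstances(String serviceName) {
        return cache.computeIfAbsent(serviceName, name -> {
            remoteCalls.incrementAndGet();
            return remoteLookup.apply(name);
        });
    }

    /** Number of times the remote lookup was actually invoked. */
    int remoteCalls() {
        return remoteCalls.get();
    }
}
```

Calling getInstances twice for the same service name should hit the remote lookup only once; the periodic pull and the server push described in this article are what keep such a cache from going stale.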
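The client keeps its local cache up to date with a scheduled task that periodically pulls the instances from the server. The task is only scheduled once a service call is actually made: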
    public void scheduleUpdateIfAbsent(String serviceName, String clusters) {
        if (futureMap.get(ServiceInfo.getKey(serviceName, clusters)) != null) {
            return;
        }

        synchronized (futureMap) {
            // Check again under the lock so only one task is created per service
            if (futureMap.get(ServiceInfo.getKey(serviceName, clusters)) != null) {
                return;
            }

            ScheduledFuture<?> future = addTask(new UpdateTask(serviceName, clusters));
            futureMap.put(ServiceInfo.getKey(serviceName, clusters), future);
        }
    }
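The point of scheduleUpdateIfAbsent is that each service gets at most one refresh task, no matter how many callers ask for it. The sketch below shows the same idea with ConcurrentHashMap.computeIfAbsent instead of the double-checked synchronized block used above. RefreshSchedulerSketch and its methods are hypothetical names, and the fixed-delay scheduling is an assumption made for illustration.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

/** Hypothetical scheduler that keeps at most one periodic refresh task per key. */
class RefreshSchedulerSketch {
    private final Map<String, ScheduledFuture<?>> futures = new ConcurrentHashMap<>();
    // Daemon thread so a forgotten shutdown() does not keep the JVM alive
    private final ScheduledExecutorService executor =
            Executors.newSingleThreadScheduledExecutor(r -> {
                Thread t = new Thread(r, "refresh-sketch");
                t.setDaemon(true);
                return t;
            });

    /** Schedules the refresh task unless one already exists; returns true if it was created. */
    boolean scheduleIfAbsent(String key, Runnable refresh, long periodMillis) {
        boolean[] created = {false};
        futures.computeIfAbsent(key, k -> {
            created[0] = true;
            return executor.scheduleWithFixedDelay(
                    refresh, periodMillis, periodMillis, TimeUnit.MILLISECONDS);
        });
        return created[0];
    }

    int scheduledCount() {
        return futures.size();
    }

    void shutdown() {
        executor.shutdownNow();
    }
}
```

A second call with the same key returns false and leaves the existing task untouched, which is the behaviour the two null checks on futureMap provide in the original method.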
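2. Heartbeat mechanism

When an instance is registered (NamingService -> register), the client starts a heartbeat task that sends a heartbeat to the server every 5 s:

    executorService.schedule(new BeatTask(beatInfo), beatInfo.getPeriod(), TimeUnit.MILLISECONDS);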
BeatTask is the heartbeat task. It calls the instance/beat endpoint to send the instance heartbeat. The code of BeatTask#run is as follows:

    @Override
    public void run() {
        if (beatInfo.isStopped()) {
            return;
        }
        long nextTime = beatInfo.getPeriod();
        try {
            JsonNode result = serverProxy.sendBeat(beatInfo, BeatReactor.this.lightBeatEnabled);
            long interval = result.get("clientBeatInterval").asLong();
            boolean lightBeatEnabled = false;
            if (result.has(CommonParams.LIGHT_BEAT_ENABLED)) {
                lightBeatEnabled = result.get(CommonParams.LIGHT_BEAT_ENABLED).asBoolean();
            }
            BeatReactor.this.lightBeatEnabled = lightBeatEnabled;
            if (interval > 0) {
                nextTime = interval;
            }
            int code = NamingResponseCode.OK;
            if (result.has(CommonParams.CODE)) {
                code = result.get(CommonParams.CODE).asInt();
            }
            if (code == NamingResponseCode.RESOURCE_NOT_FOUND) {
                // The server no longer knows this instance: register it again
                Instance instance = new Instance();
                instance.setPort(beatInfo.getPort());
                instance.setIp(beatInfo.getIp());
                instance.setWeight(beatInfo.getWeight());
                instance.setMetadata(beatInfo.getMetadata());
                instance.setClusterName(beatInfo.getCluster());
                instance.setServiceName(beatInfo.getServiceName());
                instance.setInstanceId(instance.getInstanceId());
                instance.setEphemeral(true);
                try {
                    serverProxy.registerService(beatInfo.getServiceName(),
                            NamingUtils.getGroupName(beatInfo.getServiceName()), instance);
                } catch (Exception ignore) {
                }
            }
        } catch (NacosException ex) {
            NAMING_LOGGER.warn("[CLIENT-BEAT] failed to send beat: {}, code: {}, msg: {}",
                    JacksonUtils.toJson(beatInfo), ex.getErrCode(), ex.getErrMsg());
        } catch (Exception unknownEx) {
            NAMING_LOGGER.error("[CLIENT-BEAT] failed to send beat: {}, unknown exception msg: {}",
                    JacksonUtils.toJson(beatInfo), unknownEx.getMessage(), unknownEx);
        } finally {
            // Re-schedule the next heartbeat
            executorService.schedule(new BeatTask(beatInfo), nextTime, TimeUnit.MILLISECONDS);
        }
    }

The server updates the last heartbeat time and checks the health of the instances every 5 seconds. In a cluster, the scheduled check only needs to run on one server; it does not need to run on every node. When an instance goes offline, the server notices through the health check task reached from InstanceController. The code entry point is:
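createEmptyService -> createServiceIfAbsent -> putServiceAndInit -> init -> scheduleCheck (scheduled task)

    // Heartbeat time check
    if (System.currentTimeMillis() - instance.getLastBeat() > instance.getInstanceHeartBeatTimeOut()) {

If more than 15 seconds have passed, the instance is marked as unhealthy and its health status is updated.

    if (System.currentTimeMillis() - instance.getLastBeat() > instance.getIpDeleteTimeout()) {

If 30 s have already passed, the machine is removed outright by deleting the instance:

    asyncHttpRequest(url, headers, paramValues, callback, HttpMethod.DELETE);

The core code is as follows: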
    @Override
    public void run() {
        try {
            if (!getDistroMapper().responsible(service.getName())) {
                return;
            }

            if (!getSwitchDomain().isHealthCheckEnabled()) {
                return;
            }

            // All instances of this service held by the server
            List<Instance> instances = service.allIPs(true);

            // first set health status of instances:
            for (Instance instance : instances) {
                // Heartbeat time check
                if (System.currentTimeMillis() - instance.getLastBeat() > instance.getInstanceHeartBeatTimeOut()) {
                    if (!instance.isMarked()) {
                        if (instance.isHealthy()) {
                            // More than 15 s without a heartbeat: mark as unhealthy
                            instance.setHealthy(false);
                            Loggers.EVT_LOG.info(
                                    "{POS} {IP-DISABLED} valid: {}:{}@{}@{}, region: {}, msg: client timeout after {}, last beat: {}",
                                    instance.getIp(), instance.getPort(), instance.getClusterName(),
                                    service.getName(), UtilsAndCommons.LOCALHOST_SITE,
                                    instance.getInstanceHeartBeatTimeOut(), instance.getLastBeat());
                            getPushService().serviceChanged(service);
                            ApplicationUtils.publishEvent(new InstanceHeartbeatTimeoutEvent(this, instance));
                        }
                    }
                }
            }

            if (!getGlobalConfig().isExpireInstance()) {
                return;
            }

            // then remove obsolete instances:
            for (Instance instance : instances) {
                if (instance.isMarked()) {
                    continue;
                }

                if (System.currentTimeMillis() - instance.getLastBeat() > instance.getIpDeleteTimeout()) {
                    // delete instance
                    Loggers.SRV_LOG.info("[AUTO-DELETE-IP] service: {}, ip: {}", service.getName(),
                            JacksonUtils.toJson(instance));
                    deleteIp(instance);
                }
            }
        } catch (Exception e) {
            Loggers.SRV_LOG.warn("Exception while processing client beat time out.", e);
        }
    }
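The two thresholds in the check above can be reduced to a small pure function. This is only a sketch under the 15 s and 30 s values quoted in this article. BeatTimeoutSketch, Status, and classify are hypothetical names, and the real task reads the timeouts from each instance rather than from constants.

```java
/** Hypothetical helper that classifies an instance by how long it has been silent. */
class BeatTimeoutSketch {
    enum Status { HEALTHY, UNHEALTHY, EXPIRED }

    // Assumed values, taken from the 15 s / 30 s figures described in the text
    static final long HEARTBEAT_TIMEOUT_MS = 15_000L;
    static final long DELETE_TIMEOUT_MS = 30_000L;

    /** Mirrors the strict "greater than" comparisons of the health check. */
    static Status classify(long nowMillis, long lastBeatMillis) {
        long silence = nowMillis - lastBeatMillis;
        if (silence > DELETE_TIMEOUT_MS) {
            return Status.EXPIRED;      // candidate for deletion
        }
        if (silence > HEARTBEAT_TIMEOUT_MS) {
            return Status.UNHEALTHY;    // kept, but marked as not healthy
        }
        return Status.HEALTHY;
    }

    public static void main(String[] args) {
        long lastBeat = 0L;
        System.out.println(classify(5_000L, lastBeat));
        System.out.println(classify(20_000L, lastBeat));
        System.out.println(classify(31_000L, lastBeat));
    }
}
```

Because the comparison is strict, an instance whose last heartbeat is exactly 15 s old still counts as healthy in this sketch.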
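3. Service change events

The client only pulls from the server once every 5 seconds. Doesn't that introduce a delay?

For an AP architecture a small delay is acceptable, and Ribbon will retry failed calls. In addition, when the service list on the server changes, the server proactively pushes the change to the clients. The core code entry point is notifier#run -> handle -> onChange.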
onChange updates the registry. The relevant code is:

    getPushService().serviceChanged(this); // publish a "service changed" event to notify the clients
    public void updateIPs(Collection<Instance> instances, boolean ephemeral) {
        Map<String, List<Instance>> ipMap = new HashMap<>(clusterMap.size());
        for (String clusterName : clusterMap.keySet()) {
            ipMap.put(clusterName, new ArrayList<>());
        }

        for (Instance instance : instances) {
            try {
                if (instance == null) {
                    Loggers.SRV_LOG.error("[NACOS-DOM] received malformed ip: null");
                    continue;
                }

                if (StringUtils.isEmpty(instance.getClusterName())) {
                    instance.setClusterName(UtilsAndCommons.DEFAULT_CLUSTER_NAME);
                }

                if (!clusterMap.containsKey(instance.getClusterName())) {
                    Loggers.SRV_LOG.warn(
                            "cluster: {} not found, ip: {}, will create new cluster with default configuration.",
                            instance.getClusterName(), instance.toJson());
                    Cluster cluster = new Cluster(instance.getClusterName(), this);
                    cluster.init();
                    getClusterMap().put(instance.getClusterName(), cluster);
                }

                List<Instance> clusterIPs = ipMap.get(instance.getClusterName());
                if (clusterIPs == null) {
                    clusterIPs = new LinkedList<>();
                    ipMap.put(instance.getClusterName(), clusterIPs);
                }

                clusterIPs.add(instance);
            } catch (Exception e) {
                Loggers.SRV_LOG.error("[NACOS-DOM] failed to process ip: " + instance, e);
            }
        }

        for (Map.Entry<String, List<Instance>> entry : ipMap.entrySet()) {
            //make every ip mine
            List<Instance> entryIPs = entry.getValue();
            // Core registration logic
            clusterMap.get(entry.getKey()).updateIps(entryIPs, ephemeral);
        }

        setLastModifiedMillis(System.currentTimeMillis());
        // Publish the "service changed" event
        getPushService().serviceChanged(this);
        StringBuilder stringBuilder = new StringBuilder();

        for (Instance instance : allIPs()) {
            stringBuilder.append(instance.toIpAddr()).append("_").append(instance.isHealthy()).append(",");
        }

        Loggers.EVT_LOG.info("[IP-UPDATED] namespace: {}, service: {}, ips: {}", getNamespaceId(), getName(),
                stringBuilder.toString());
    }
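A full-text search for the event leads to PushService#onApplicationEvent, which handles it. The key call is:

    udpPush(ackEntry); // the server sends a UDP message to tell the client that the instances changed

The full method is as follows: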
    @Override
    public void onApplicationEvent(ServiceChangeEvent event) {
        Service service = event.getService();
        String serviceName = service.getName();
        String namespaceId = service.getNamespaceId();

        Future future = GlobalExecutor.scheduleUdpSender(() -> {
            try {
                Loggers.PUSH.info(serviceName + " is changed, add it to push queue.");
                ConcurrentMap<String, PushClient> clients = clientMap
                        .get(UtilsAndCommons.assembleFullServiceName(namespaceId, serviceName));
                if (MapUtils.isEmpty(clients)) {
                    return;
                }

                Map<String, Object> cache = new HashMap<>(16);
                long lastRefTime = System.nanoTime();
                for (PushClient client : clients.values()) {
                    if (client.zombie()) {
                        Loggers.PUSH.debug("client is zombie: " + client.toString());
                        clients.remove(client.toString());
                        Loggers.PUSH.debug("client is zombie: " + client.toString());
                        continue;
                    }

                    Receiver.AckEntry ackEntry;
                    Loggers.PUSH.debug("push serviceName: {} to client: {}", serviceName, client.toString());
                    String key = getPushCacheKey(serviceName, client.getIp(), client.getAgent());
                    byte[] compressData = null;
                    Map<String, Object> data = null;
                    if (switchDomain.getDefaultPushCacheMillis() >= 20000 && cache.containsKey(key)) {
                        org.javatuples.Pair pair = (org.javatuples.Pair) cache.get(key);
                        compressData = (byte[]) (pair.getValue0());
                        data = (Map<String, Object>) pair.getValue1();

                        Loggers.PUSH.debug("[PUSH-CACHE] cache hit: {}:{}", serviceName, client.getAddrStr());
                    }

                    if (compressData != null) {
                        ackEntry = prepareAckEntry(client, compressData, data, lastRefTime);
                    } else {
                        ackEntry = prepareAckEntry(client, prepareHostsData(client), lastRefTime);
                        if (ackEntry != null) {
                            cache.put(key, new org.javatuples.Pair<>(ackEntry.origin.getData(), ackEntry.data));
                        }
                    }

                    Loggers.PUSH.info("serviceName: {} changed, schedule push for: {}, agent: {}, key: {}",
                            client.getServiceName(), client.getAddrStr(), client.getAgent(),
                            (ackEntry == null ? null : ackEntry.key));

                    // Send the UDP message to the client
                    udpPush(ackEntry);
                }
            } catch (Exception e) {
                Loggers.PUSH.error("[NACOS-PUSH] failed to push serviceName: {} to client, error: {}", serviceName, e);
            } finally {
                futureMap.remove(UtilsAndCommons.assembleFullServiceName(namespaceId, serviceName));
            }
        }, 1000, TimeUnit.MILLISECONDS);

        futureMap.put(UtilsAndCommons.assembleFullServiceName(namespaceId, serviceName), future);
    }
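The push itself is an ordinary UDP datagram sent to the port the client reported when it queried the instance list. The following loopback sketch shows that exchange with plain java.net sockets. It is not the Nacos implementation: UdpPushSketch, pushAndReceive, the 2 s timeout, and the uncompressed text payload are assumptions, and the real push also involves compression, acknowledgements, and retries that are left out here.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.InetAddress;
import java.nio.charset.StandardCharsets;

/** Hypothetical loopback demo: one socket plays the server, the other the client. */
class UdpPushSketch {
    /** Sends the payload over UDP on the loopback interface and returns what the client received. */
    static String pushAndReceive(String payload) {
        InetAddress loopback = InetAddress.getLoopbackAddress();
        try (DatagramSocket client = new DatagramSocket(0, loopback); // port 0: let the OS pick one
             DatagramSocket server = new DatagramSocket()) {
            client.setSoTimeout(2000); // do not block forever if the datagram is lost

            // "Server" side: push the change notification to the client's UDP port
            byte[] out = payload.getBytes(StandardCharsets.UTF_8);
            server.send(new DatagramPacket(out, out.length, loopback, client.getLocalPort()));

            // "Client" side: receive the datagram (payloads over 1024 bytes would be truncated)
            byte[] buffer = new byte[1024];
            DatagramPacket in = new DatagramPacket(buffer, buffer.length);
            client.receive(in);
            return new String(in.getData(), 0, in.getLength(), StandardCharsets.UTF_8);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(pushAndReceive("service changed"));
    }
}
```

UDP gives no delivery guarantee, which is one reason the periodic pull described earlier remains useful as a fallback.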
The client-side code for receiving the UDP message is as follows:

    public ServiceInfo getServiceInfo(final String serviceName, final String clusters) {
        NAMING_LOGGER.debug("failover-mode: " + failoverReactor.isFailoverSwitch());
        String key = ServiceInfo.getKey(serviceName, clusters);
        if (failoverReactor.isFailoverSwitch()) {
            return failoverReactor.getService(key);
        }

        // Try the local cache first
        ServiceInfo serviceObj = getServiceInfo0(serviceName, clusters);

        if (null == serviceObj) {
            // Nothing cached yet: refresh the service list
            serviceObj = new ServiceInfo(serviceName, clusters);
            serviceInfoMap.put(serviceObj.getKey(), serviceObj);

            updatingMap.put(serviceName, new Object());
            updateServiceNow(serviceName, clusters); // UDP port