Nacos服务发现的方式
1.客户端获取 1.1:先是故障转移机制判断是否去本地文件中读取信息，读到则返回 1.2:再去本地服务列表读取信息(本地缓存)，没读到则创建一个空的服务，然后立刻去nacos中读取更新 1.3:读到了就返回，同时开启定时…Nacos服务发现的方式
1.客户端获取 1.1:先由故障转移机制判断是否去本地文件中读取信息，读到则返回。 1.2:再去本地服务列表读取信息(本地缓存)，没读到则创建一个空的服务，然后立刻去nacos中读取更新。 1.3:读到了就返回，同时开启定时更新，定时向服务端同步信息（正常情况下每6s一次，异常时间隔逐步拉长，最长60s一次，详见下文UpdateTask）。 2.服务端通过GRPC推送：建立长连接，当服务发生变更的时候，往订阅了该服务的客户端推送事件。
SpringBoot自动注入
项目启动的时候，会通过自动注入的机制将NacosDiscoveryClientConfiguration注入。当注入NacosDiscoveryClientConfiguration的时候，会将NacosDiscoveryClient一起注册为Bean。NacosDiscoveryClient实现了SpringCloud的DiscoveryClient接口，重点是getInstances和getServices方法，而且二者都是委托给NacosServiceDiscovery实现的。
获取实例信息
NacosDiscoveryClient private NacosServiceDiscovery serviceDiscovery;public NacosDiscoveryClient(NacosServiceDiscovery nacosServiceDiscovery) {this.serviceDiscovery nacosServiceDiscovery;}Overridepublic ListServiceInstance getInstances(String serviceId) {try {return serviceDiscovery.getInstances(serviceId);}catch (Exception e) {throw new RuntimeException(Can not get hosts from nacos server. serviceId: serviceId, e);}}public ListServiceInstance getInstances(String serviceId) throws NacosException {String group discoveryProperties.getGroup();ListInstance instances namingService().selectInstances(serviceId, group,true);return hostToServiceInstanceList(instances, serviceId);}NacosServiceDiscovery public ListServiceInstance getInstances(String serviceId) throws NacosException {//获取分组String group discoveryProperties.getGroup();//查询服务下的实例ListInstance instances namingService().selectInstances(serviceId, group,true);//填充返回的实例信息数据return hostToServiceInstanceList(instances, serviceId);}Overridepublic ListInstance selectInstances(String serviceName, String groupName, boolean healthy) throws NacosException {return selectInstances(serviceName, groupName, healthy, true);}Overridepublic ListInstance selectInstances(String serviceName, String groupName, boolean healthy, boolean subscribe)throws NacosException {return selectInstances(serviceName, groupName, new ArrayListString(), healthy, subscribe);}Overridepublic ListInstance selectInstances(String serviceName, String groupName, ListString clusters, boolean healthy,boolean subscribe) throws NacosException {ServiceInfo serviceInfo;String clusterString StringUtils.join(clusters, ,);// 默认是订阅的if (subscribe) {//从缓存中获取实例信息serviceInfo serviceInfoHolder.getServiceInfo(serviceName, groupName, clusterString);//如果获取不到则从服务端拉取if (null serviceInfo) {serviceInfo clientProxy.subscribe(serviceName, groupName, clusterString);}} else {// 如果未订阅服务信息则直接从服务器进行查询serviceInfo clientProxy.queryInstancesOfService(serviceName, groupName, clusterString, 0, 
false);}//获取服务中的实例信息return selectInstances(serviceInfo, healthy);}从缓存中拿数据 public ServiceInfo getServiceInfo(final String serviceName, final String groupName, final String clusters) {NAMING_LOGGER.debug(failover-mode: {}, failoverReactor.isFailoverSwitch());String groupedServiceName NamingUtils.getGroupedName(serviceName, groupName);String key ServiceInfo.getKey(groupedServiceName, clusters);if (failoverReactor.isFailoverSwitch()) {return failoverReactor.getService(key);}return serviceInfoMap.get(key);}获取服务的实例信息 private ListInstance selectInstances(ServiceInfo serviceInfo, boolean healthy) {ListInstance list;if (serviceInfo null || CollectionUtils.isEmpty(list serviceInfo.getHosts())) {return new ArrayListInstance();}IteratorInstance iterator list.iterator();while (iterator.hasNext()) {Instance instance iterator.next();// 保留 健康、启用、权重大于0 的实例if (healthy ! instance.isHealthy() || !instance.isEnabled() || instance.getWeight() 0) {iterator.remove();}}return list;}GRPC请求拉取服务实例信息 Overridepublic ServiceInfo subscribe(String serviceName, String groupName, String clusters) throws NacosException {NAMING_LOGGER.info([SUBSCRIBE-SERVICE] service:{}, group:{}, clusters:{} , serviceName, groupName, clusters);String serviceNameWithGroup NamingUtils.getGroupedName(serviceName, groupName);String serviceKey ServiceInfo.getKey(serviceNameWithGroup, clusters);//定时同步服务端serviceInfoserviceInfoUpdateService.scheduleUpdateIfAbsent(serviceName, groupName, clusters);//获取ServiceInfo 信息ServiceInfo result serviceInfoHolder.getServiceInfoMap().get(serviceKey);//如果没有则从服务端拿if (null result || !isSubscribed(serviceName, groupName, clusters)) {//GRPC请求result grpcClientProxy.subscribe(serviceName, groupName, clusters);}//填充进Map中 这里可以看服务注册最后那部分代码最后也是调用serviceInfoHolder保存的serviceInfoHolder.processServiceInfo(result);return result;}定时同步服务端ServiceInfo public void scheduleUpdateIfAbsent(String serviceName, String groupName, String clusters) {String serviceKey 
ServiceInfo.getKey(NamingUtils.getGroupedName(serviceName, groupName), clusters);if (futureMap.get(serviceKey) ! null) {return;}synchronized (futureMap) {if (futureMap.get(serviceKey) ! null) {return;}//构建任务放到ScheduledFuture执行ScheduledFuture? future addTask(new UpdateTask(serviceName, groupName, clusters));futureMap.put(serviceKey, future);}}缓存订阅信息 public void cacheSubscriberForRedo(String serviceName, String groupName, String cluster) {//拿服务当keyString key ServiceInfo.getKey(NamingUtils.getGroupedName(serviceName, groupName), cluster);//构建需要缓存的订阅信息SubscriberRedoData redoData SubscriberRedoData.build(serviceName, groupName, cluster);//缓存订阅信息synchronized (subscribes) {subscribes.put(key, redoData);}}执行订阅 public ServiceInfo doSubscribe(String serviceName, String groupName, String clusters) throws NacosException {//构建订阅请求SubscribeServiceRequest request new SubscribeServiceRequest(namespaceId, groupName, serviceName, clusters,true);//执行订阅SubscribeServiceResponse response requestToServer(request, SubscribeServiceResponse.class);//设置已订阅redoService.subscriberRegistered(serviceName, groupName, clusters);return response.getServiceInfo();}设置订阅信息已订阅 public void subscriberRegistered(String serviceName, String groupName, String cluster) {String key ServiceInfo.getKey(NamingUtils.getGroupedName(serviceName, groupName), cluster);synchronized (subscribes) {SubscriberRedoData redoData subscribes.get(key);// 标记订阅数据已订阅if (null ! redoData) {redoData.setRegistered(true);}}}Nacos订阅机制
Nacos的订阅机制如果用一句话来描述就是Nacos客户端通过一个定时任务每6秒从注册中心获取实例列表当发现实例发生变化时发布变更事件订阅者进行业务处理。该更新实例的更新实例该更新本地缓存的更新本地缓存。 UpdateTask public class UpdateTask implements Runnable {public void run() {long delayTime DEFAULT_DELAY;try {//校验订阅任务是否不对 如果不对就不处理 if (!changeNotifier.isSubscribed(groupName, serviceName, clusters) !futureMap.containsKey(serviceKey)) {NAMING_LOGGER.info(update task is stopped, service:{}, clusters:{}, groupedServiceName, clusters);isCancel true;return;}//从缓存中拿 Service信息ServiceInfo serviceObj serviceInfoHolder.getServiceInfoMap().get(serviceKey);//如果拿不到则去服务端拉取if (serviceObj null) {serviceObj namingClientProxy.queryInstancesOfService(serviceName, groupName, clusters, 0, false); //然后再填充进缓存 serviceInfoHolder.processServiceInfo(serviceObj);//更新下事件lastRefTime serviceObj.getLastRefTime();return;}// 过期服务服务的最新更新时间小于等于缓存刷新时间从注册中心重新查询if (serviceObj.getLastRefTime() lastRefTime) {//服务过期了重新查serviceObj namingClientProxy.queryInstancesOfService(serviceName, groupName, clusters, 0, false); //在缓存进去serviceInfoHolder.processServiceInfo(serviceObj);}// 刷新更新时间lastRefTime serviceObj.getLastRefTime();if (CollectionUtils.isEmpty(serviceObj.getHosts())) {incFailCount();return;}// 下次更新缓存时间设置默认为6秒// TODO multiple time can be configured.delayTime serviceObj.getCacheMillis() * DEFAULT_UPDATE_CACHE_TIME_MULTIPLE;// 重置失败数量为0resetFailCount();} catch (Throwable e) {incFailCount();NAMING_LOGGER.warn([NA] failed to update serviceName: {}, groupedServiceName, e);} finally {// 下次调度刷新时间下次执行的时间与failCount有关// failCount0则下次调度时间为6秒最长为1分钟// 即当无异常情况下缓存实例的刷新时间是6秒if (!isCancel) {executor.schedule(this, Math.min(delayTime failCount, DEFAULT_DELAY * 60),TimeUnit.MILLISECONDS);}}}}实例变更事件处理
监听事件的注册
在NacosNamingService的subscribe方法中通过如下方式进行了监听事件的注册 Overridepublic void subscribe(String serviceName, String groupName, ListString clusters, EventListener listener)throws NacosException {if (null listener) {return;}String clusterString StringUtils.join(clusters, ,);changeNotifier.registerListener(groupName, serviceName, clusterString, listener);clientProxy.subscribe(serviceName, groupName, clusterString);}这里的changeNotifier.registerListener便是进行具体的事件注册逻辑 public void registerListener(String groupName, String serviceName, String clusters, EventListener listener) {String key ServiceInfo.getKey(NamingUtils.getGroupedName(serviceName, groupName), clusters);ConcurrentHashSetEventListener eventListeners listenerMap.get(key);//这里用到了双重检查锁机制if (eventListeners null) {synchronized (lock) {eventListeners listenerMap.get(key);if (eventListeners null) {eventListeners new ConcurrentHashSetEventListener();listenerMap.put(key, eventListeners);}}}eventListeners.add(listener);}可以看出事件的注册便是将EventListener存储在InstancesChangeNotifier的listenerMap属性当中了。
这里的数据结构为Map：key为分组服务名与集群信息的拼接（由ServiceInfo.getKey生成），value为监听器（EventListener）的集合。
监听服务变更事件
因为UpdateTask中假如没有从缓存中拿到服务信息，则会通过grpc协议从服务端拉取，然后会执行serviceInfoHolder.processServiceInfo方法缓存服务信息；当实例发生变化的话，这个方法最终会发送一个InstancesChangeEvent事件。所以这里会监听InstancesChangeEvent事件并进行处理。
InstancesChangeNotifier
public class InstancesChangeNotifier extends SubscriberInstancesChangeEvent {private final MapString, ConcurrentHashSetEventListener listenerMap new ConcurrentHashMapString, ConcurrentHashSetEventListener();Overridepublic void onEvent(InstancesChangeEvent event) {String key ServiceInfo.getKey(NamingUtils.getGroupedName(event.getServiceName(), event.getGroupName()), event.getClusters());ConcurrentHashSetEventListener eventListeners listenerMap.get(key);if (CollectionUtils.isEmpty(eventListeners)) {return;}for (final EventListener listener : eventListeners) {//[] final com.alibaba.nacos.api.naming.listener.Event namingEvent transferToNamingEvent(event);final com.alibaba.nacos.api.naming.listener.Event namingEvent new NamingEvent(instancesChangeEvent.getServiceName(), instancesChangeEvent.getGroupName(),instancesChangeEvent.getClusters(), instancesChangeEvent.getHosts());// 最终调度执行listener.onEvent(namingEvent)只在NacosWatch#start找到了有效的EventListener见下文if (listener instanceof AbstractEventListener ((AbstractEventListener) listener).getExecutor() ! null) {((AbstractEventListener) listener).getExecutor().execute(() - listener.onEvent(namingEvent));} else {listener.onEvent(namingEvent);}}}
}
}
public class NacosWatch implements ApplicationEventPublisherAware, SmartLifecycle, DisposableBean {private MapString, EventListener listenerMap new ConcurrentHashMap(16);private final AtomicBoolean running new AtomicBoolean(false);public void start() {if (this.running.compareAndSet(false, true)) {EventListener eventListener (EventListener)this.listenerMap.computeIfAbsent(this.buildKey(), (event) - {return new EventListener() {public void onEvent(Event event) {if (event instanceof NamingEvent) {List instances ((NamingEvent)event).getInstances();//[] Optional instanceOptional NacosWatch.this.selectCurrentInstance(instances);// 按IP和端口选择第一个instance作为当前的instanceOptional instanceOptional instances.stream().filter((instance) - {return this.properties.getIp().equals(instance.getIp()) this.properties.getPort() instance.getPort();}).findFirst()instanceOptional.ifPresent((currentInstance) - {//[] NacosWatch.this.resetIfNeeded(currentInstance);// 重新设置properties的metadataif (!this.properties.getMetadata().equals(instance.getMetadata())) {this.properties.setMetadata(instance.getMetadata());}});}}};});}}获取服务信息
NacosDiscoveryClient Overridepublic ListString getServices() {try {return serviceDiscovery.getServices();}catch (Exception e) {log.error(get service name from nacos server fail,, e);return Collections.emptyList();}}public ListString getServices() throws NacosException {//获取分组String group discoveryProperties.getGroup();//获取服务信息ListViewString services namingService().getServicesOfServer(1,Integer.MAX_VALUE, group);return services.getData();}Overridepublic ListViewString getServicesOfServer(int pageNo, int pageSize, String groupName) throws NacosException {return getServicesOfServer(pageNo, pageSize, groupName, null);}Overridepublic ListViewString getServicesOfServer(int pageNo, int pageSize, String groupName, AbstractSelector selector)throws NacosException {return clientProxy.getServiceList(pageNo, pageSize, groupName, selector);}GRPC拉取信息 Overridepublic ListViewString getServiceList(int pageNo, int pageSize, String groupName, AbstractSelector selector)throws NacosException {//构建请求ServiceListRequest request new ServiceListRequest(namespaceId, groupName, pageNo, pageSize);if (selector ! null) {if (SelectorType.valueOf(selector.getType()) SelectorType.label) {request.setSelector(JacksonUtils.toJson(selector));}}//采用GRPC协议拉取信息ServiceListResponse response requestToServer(request, ServiceListResponse.class);ListViewString result new ListViewString();result.setCount(response.getCount());result.setData(response.getServiceNames());return result;}服务端处理GRPC请求
接收获取服务的请求
ServiceListRequestHandler OverrideSecured(action ActionTypes.READ)public ServiceListResponse handle(ServiceListRequest request, RequestMeta meta) throws NacosException {//根据命名空间获取这个命名空间下的所有服务信息 erviceManager.getInstance().getSingletons这个方法服务注册的时候里有CollectionService serviceSet ServiceManager.getInstance().getSingletons(request.getNamespace());//构建返回信息ServiceListResponse result ServiceListResponse.buildSuccessResponse(0, new LinkedList());//服务信息不等于空填充返回信息if (!serviceSet.isEmpty()) {CollectionString serviceNameSet selectServiceWithGroupName(serviceSet, request.getGroupName());// TODO select service by selectorListString serviceNameList ServiceUtil.pageServiceName(request.getPageNo(), request.getPageSize(), serviceNameSet);result.setCount(serviceNameSet.size());result.setServiceNames(serviceNameList);}return result;}订阅服务请求
SubscribeServiceRequestHandler OverrideSecured(action ActionTypes.READ)public SubscribeServiceResponse handle(SubscribeServiceRequest request, RequestMeta meta) throws NacosException {//命名空间String namespaceId request.getNamespace();//服务名称String serviceName request.getServiceName();//分组名称String groupName request.getGroupName();String app request.getHeader(app, unknown);String groupedServiceName NamingUtils.getGroupedName(serviceName, groupName);//构建服务信息Service service Service.newService(namespaceId, groupName, serviceName, true);//组装订阅请求Subscriber subscriber new Subscriber(meta.getClientIp(), meta.getClientVersion(), app, meta.getClientIp(),namespaceId, groupedServiceName, 0, request.getClusters());//获取健康的实例 ServiceInfo serviceInfo ServiceUtil.selectInstancesWithHealthyProtection(serviceStorage.getData(service), //服务元数据信息 metadataManager.getServiceMetadata(service).orElse(null), subscriber);//是否订阅 if (request.isSubscribe()) {clientOperationService.subscribeService(service, subscriber, meta.getConnectionId());} else {clientOperationService.unsubscribeService(service, subscriber, meta.getConnectionId());}//构建返回数据return new SubscribeServiceResponse(ResponseCode.SUCCESS.getCode(), success, serviceInfo);}发送订阅事件 后续事件监听可参考服务事件处理的那篇文章 Overridepublic void subscribeService(Service service, Subscriber subscriber, String clientId) {Service singleton ServiceManager.getInstance().getSingletonIfExist(service).orElse(service);Client client clientManager.getClient(clientId);if (!clientIsLegal(client, clientId)) {return;}client.addServiceSubscriber(singleton, subscriber);client.setLastUpdatedTime();NotifyCenter.publishEvent(new ClientOperationEvent.ClientSubscribeServiceEvent(singleton, clientId));}发送取消订阅事件 后续事件监听可参考服务事件处理的那篇文章 Overridepublic void unsubscribeService(Service service, Subscriber subscriber, String clientId) {Service singleton ServiceManager.getInstance().getSingletonIfExist(service).orElse(service);Client client clientManager.getClient(clientId);if 
(!clientIsLegal(client, clientId)) {return;}client.removeServiceSubscriber(singleton);client.setLastUpdatedTime();NotifyCenter.publishEvent(new ClientOperationEvent.ClientUnsubscribeServiceEvent(singleton, clientId));}服务查询请求
ServiceQueryRequestHandler OverrideSecured(action ActionTypes.READ)public QueryServiceResponse handle(ServiceQueryRequest request, RequestMeta meta) throws NacosException {//获取命名空间String namespaceId request.getNamespace();//分组明String groupName request.getGroupName();//服务名String serviceName request.getServiceName();//创建服务信息Service service Service.newService(namespaceId, groupName, serviceName);//集群String cluster null request.getCluster() ? : request.getCluster();boolean healthyOnly request.isHealthyOnly();//获取服务信息ServiceInfo result serviceStorage.getData(service);//获取服务元数据信息ServiceMetadata serviceMetadata metadataManager.getServiceMetadata(service).orElse(null);// 获取有保护机制的健康实例result ServiceUtil.selectInstancesWithHealthyProtection(result, serviceMetadata, cluster, healthyOnly, true,meta.getClientIp());//构建返回信息return QueryServiceResponse.buildSuccessResponse(result);}public ServiceInfo getData(Service service) {return serviceDataIndexes.containsKey(service) ? serviceDataIndexes.get(service) : getPushData(service);}public OptionalServiceMetadata getServiceMetadata(Service service) {return Optional.ofNullable(serviceMetadataMap.get(service));}