找人做网站协议,html电影网页制作模板,乐清营销网站,有限公司怎么注册公司为什么需要缓存变更通知机制#xff1f;如果我们使用的是本地缓存或者多级缓存#xff08;本地缓存远程缓存#xff09;#xff0c;当其中一个节点的本地缓存变更之后#xff0c;为了保证缓存尽量的一致性#xff0c;此时其他节点的本地缓存也需要去变更#xff0c;这时… 为什么需要缓存变更通知机制如果我们使用的是本地缓存或者多级缓存本地缓存远程缓存当其中一个节点的本地缓存变更之后为了保证缓存尽量的一致性此时其他节点的本地缓存也需要去变更这时候常用的做法就是需要接入一个通知机制只要有节点的本地缓存变更了那么这时候就广播出一个缓存变更消息其他节点会去订阅这个消息最终消费这个消息的时候把本地缓存的数据进行更新或删除。当然通知机制的实现有很多比如可以利用redis的发布订阅机制也可以接入mq这种消息中间件作为消息通知不过相比之下mq就稍微重了点所以JetCache则是使用redis的发布订阅机制去实现缓存变更通知机制 缓存监视器CacheMonitor
FunctionalInterface
public interface CacheMonitor {void afterOperation(CacheEvent event);}
如果我们想要监控缓存的每一次操作或者在缓存的每一次操作的时候进行一些埋点扩展那么JetCache提供了CacheMonitor接口可以让我们用户自己去进行一些自定义的实现当然在JetCache内部也有默认的一些缓存监视器的实现
DefaultCacheMonitor
该monitor主要是监控缓存的每一次操作比如getputremove等基于监控这些操作之下就可以采集到各种操作指标例如缓存命中次数获取缓存次数加载数据次数等等详细的采集的指标如下
protected String cacheName;
protected long statStartTime;
protected long statEndTime;protected long getCount;
protected long getHitCount;
protected long getMissCount;
protected long getFailCount;
protected long getExpireCount;
protected long getTimeSum;
protected long minGetTime Long.MAX_VALUE;
protected long maxGetTime 0;protected long putCount;
protected long putSuccessCount;
protected long putFailCount;
protected long putTimeSum;
protected long minPutTime Long.MAX_VALUE;
protected long maxPutTime 0;protected long removeCount;
protected long removeSuccessCount;
protected long removeFailCount;
protected long removeTimeSum;
protected long minRemoveTime Long.MAX_VALUE;
protected long maxRemoveTime 0;protected long loadCount;
protected long loadSuccessCount;
protected long loadFailCount;
protected long loadTimeSum;
protected long minLoadTime Long.MAX_VALUE;
protected long maxLoadTime 0;
CacheNotifyMonitor
该monitor看名字大概就知道它的作用是用来监控缓存变更之后发送通知的监控的缓存操作有4种分别是putputAllremoveremoveAll这4个操作都是会让缓存产生变更的操作代码如下
Override
public void afterOperation(CacheEvent event) {if (this.broadcastManager null) {return;}AbstractCache absCache CacheUtil.getAbstractCache(event.getCache());if (absCache.isClosed()) {return;}AbstractEmbeddedCache localCache getLocalCache(absCache);if (localCache null) {return;}// put事件if (event instanceof CachePutEvent) {CacheMessage m new CacheMessage();m.setArea(area);m.setCacheName(cacheName);m.setSourceId(sourceId);CachePutEvent e (CachePutEvent) event;m.setType(CacheMessage.TYPE_PUT);m.setKeys(new Object[]{convertKey(e.getKey(), localCache)});broadcastManager.publish(m);}// remove事件else if (event instanceof CacheRemoveEvent) {CacheMessage m new CacheMessage();m.setArea(area);m.setCacheName(cacheName);m.setSourceId(sourceId);CacheRemoveEvent e (CacheRemoveEvent) event;m.setType(CacheMessage.TYPE_REMOVE);m.setKeys(new Object[]{convertKey(e.getKey(), localCache)});broadcastManager.publish(m);}// putAll事件else if (event instanceof CachePutAllEvent) {CacheMessage m new CacheMessage();m.setArea(area);m.setCacheName(cacheName);m.setSourceId(sourceId);CachePutAllEvent e (CachePutAllEvent) event;m.setType(CacheMessage.TYPE_PUT_ALL);if (e.getMap() ! null) {m.setKeys(e.getMap().keySet().stream().map(k - convertKey(k, localCache)).toArray());}broadcastManager.publish(m);}// removeAll事件else if (event instanceof CacheRemoveAllEvent) {CacheMessage m new CacheMessage();m.setArea(area);m.setCacheName(cacheName);m.setSourceId(sourceId);CacheRemoveAllEvent e (CacheRemoveAllEvent) event;m.setType(CacheMessage.TYPE_REMOVE_ALL);if (e.getKeys() ! null) {m.setKeys(e.getKeys().stream().map(k - convertKey(k, localCache)).toArray());}broadcastManager.publish(m);}
}
缓存变更消息发布器/订阅器BroadcastManager
上面我们知道了JetCache中会通过CacheNotifyMonitor来监控缓存的变更但是发送缓存变更的消息通知则是交给BroadcastManager去做的。BroadcastManager是一个抽象类提供了两个抽象方法
/*** 发布缓存变更消息* param cacheMessage cacheMessage*/
public abstract CacheResult publish(CacheMessage cacheMessage);/*** 订阅缓存变更消息*/
public abstract void startSubscribe();
publish方法是发布广播缓存变更消息的方法传入的CacheMessage参数就是缓存变更消息而startSubscribe方法则是订阅缓存变更消息的这两个方法具体由子类进行实现目前JetCache中主要提供了4种实现这4种实现都是基于redis的订阅发布机制实现的只是使用的redis客户端不一样的区别
LettuceBroadcastManagerRedisBroadcastManagerRedissonBroadcastManagerSpringDataBroadcastManager
我们这里可以以RedissonBroadcastManager为例去看它是怎样实现缓存变更消息通知机制的
public class RedissonBroadcastManager extends BroadcastManager {private static final Logger logger LoggerFactory.getLogger(RedissonBroadcastManager.class);private final RedissonCacheConfig?, ? config;private final String channel;private final RedissonClient client;private volatile int subscribeId;private final ReentrantLock reentrantLock new ReentrantLock();public RedissonBroadcastManager(final CacheManager cacheManager, final RedissonCacheConfig?, ? config) {super(cacheManager);checkConfig(config);this.config config;this.channel config.getBroadcastChannel();this.client config.getRedissonClient();}Overridepublic void startSubscribe() {reentrantLock.lock();try {if (this.subscribeId 0 Objects.nonNull(this.channel) !this.channel.isEmpty()) {this.subscribeId this.client.getTopic(this.channel).addListener(byte[].class, (channel, msg) - processNotification(msg, this.config.getValueDecoder()));}}finally {reentrantLock.unlock();}}Overridepublic void close() {reentrantLock.lock();try {final int id;if ((id this.subscribeId) 0 Objects.nonNull(this.channel)) {this.subscribeId 0;try {this.client.getTopic(this.channel).removeListener(id);} catch (Throwable e) {logger.warn(unsubscribe {} fail, this.channel, e);}}}finally {reentrantLock.unlock();}}Overridepublic CacheResult publish(final CacheMessage cacheMessage) {try {if (Objects.nonNull(this.channel) Objects.nonNull(cacheMessage)) {final byte[] msg this.config.getValueEncoder().apply(cacheMessage);this.client.getTopic(this.channel).publish(msg);return CacheResult.SUCCESS_WITHOUT_MSG;}return CacheResult.FAIL_WITHOUT_MSG;} catch (Throwable e) {SquashedLogger.getLogger(logger).error(jetcache publish error, e);return new CacheResult(e);}}
}
首先当调用Cache实例的putputAllremoveremoveAll这些方法的时候就会去触CacheNotifyMonitorCacheNotifyMonitor这时候就会去创建一个缓存变更信息然后交给RedissonBroadcastManager去调用publish方法在publish方法就会使用redisson的发布api去广播出这条缓存变更消息当然订阅也是使用redisson的订阅api当监听到有缓存变更消息的时候这时候就会去回调processNotification方法该方法是父类BroadcastManager的方法
protected void processNotification(byte[] message, Functionbyte[], Object decoder) {try {if (message null) {logger.error(notify message is null);return;}Object value decoder.apply(message);if (value null) {logger.error(notify message is null);return;}if (value instanceof CacheMessage) {processCacheMessage((CacheMessage) value);} else {logger.error(the message is not instance of CacheMessage, class{}, value.getClass());}} catch (Throwable e) {SquashedLogger.getLogger(logger).error(receive cache notify error, e);}
}private void processCacheMessage(CacheMessage cacheMessage) {// 条件成立说明这个缓存消息是自己发送的这时候不用处理if (sourceId.equals(cacheMessage.getSourceId())) {return;}// 根据area和cacheName从缓存管理器中获取到对应的Cache实例Cache cache cacheManager.getCache(cacheMessage.getArea(), cacheMessage.getCacheName());if (cache null) {logger.warn(Cache instance not exists: {},{}, cacheMessage.getArea(), cacheMessage.getCacheName());return;}// 如果这个Cache实例不是一个多级缓存的Cache实例那么就直接return因为这里主要是针对多级缓存的情况下进行处理的Cache absCache CacheUtil.getAbstractCache(cache);if (!(absCache instanceof MultiLevelCache)) {logger.warn(Cache instance is not MultiLevelCache: {},{}, cacheMessage.getArea(), cacheMessage.getCacheName());return;}// 获取到多级缓存中所有的Cache实例Cache[] caches ((MultiLevelCache) absCache).caches();// 获取到缓存有变动的keySetObject keys Stream.of(cacheMessage.getKeys()).collect(Collectors.toSet());// 遍历所有的Cache实例for (Cache c : caches) {Cache localCache CacheUtil.getAbstractCache(c);// 只针对本地缓存的Cache实例把缓存有变动的key从本地缓存中移除if (localCache instanceof AbstractEmbeddedCache) {((AbstractEmbeddedCache) localCache).__removeAll(keys);} else {break;}}
}
最终会调用到processCacheMessage方法需要注意的是如果是自己发送的消息则不需要再去消费所以会使用一个sourceId去进行过滤并且还会去判断当前的Cache实例是否是一个多级缓存实例如果是多级缓存那么就把这个多级缓存种的所有本地缓存都取出来然后最后根据缓存变更消息里面的key把对应的本地缓存都remove掉可能有人问为什么这里对本地缓存进行remove而不是update呢因为如果使用update的话还需要考虑并发的场景比如这个key进行了两次更新先后会发布两次缓存变更消息但是订阅者去消费这两个消息的时候并不能去保证它们的先后顺序此时就会有可能造成更新顺序的问题但是使用remove就可能完全避免这种并发问题了
总结
在JetCache中提供了对于缓存操作的后置扩展接口也叫做缓存监控器我们可以自己去实现自己的缓存监控器来对缓存操作之后做一些自定义的功能默认地JetCache提供了两个缓存监视器的实现一个是采集缓存操作的一些指标信息DefaultCacheMonitor一个是在缓存变更之后发送缓存变更通知的CacheNotifyMonitor其中CacheNotifyMonitor在监控到缓存变更之后会去使用BroadcastManager去进行消息的发布订阅BroadcastManager是发布者也是订阅者在订阅消费的时会根据缓存变更消息里面的keys去把当前Cache实例的本地缓存进行remove也就是依赖于这样的一个缓存变更消息通知机制就可以保证当使用多级缓存的时候多节点间的本地缓存尽量达成一致