徐州网站建站,小学生制作ppt的软件,建设行业年度峰会网站,app推广活动策划方案之前工作中一直使用redis来实现分布式锁#xff0c;但是最近项目使用了云弹性#xff0c;机器会涉及到扩缩容#xff0c;涉及到优雅停机的问题#xff0c;普通的redis分布锁#xff0c;一般使用时会设置锁的时间#xff0c;但是如果在加锁期间 JVM异常重启等发生会导致分… 之前工作中一直使用redis来实现分布式锁但是最近项目使用了云弹性机器会涉及到扩缩容涉及到优雅停机的问题普通的redis分布锁一般使用时会设置锁的时间但是如果在加锁期间 JVM异常重启等发生会导致分布式锁得不到及时释放即使机器重启还是获取不到分布式锁。因此决定使用一下Redisson来解决这个问题。
基于redis实现的分布式锁
加锁代码如下
public boolean tryGlobalLock(String key, Integer expireSeconds) {Long resultLong new ExecutorLong() {Overridepublic Long executor(String key, JedisCluster jedisCluster) {String status jedisCluster.set(key, GLOBAL_LOCK_VALUE, SetParams.setParams().nx().ex(expireSeconds null ? DEFAULT_LOCK_EXPIRE_TIME : expireSeconds));if (OK.equalsIgnoreCase(status)) {// 第一次设置,设置成功return 1L;} else {// 已经存在这个keyreturn 0L;}}}.run(key);return resultLong 1L;
}
一般使用流程如下 // 尝试获取分布式锁// 如果获取失败 则直接返回// 如果获取成功// 执行业务逻辑// 业务逻辑执行成功 要释放锁// 业务逻辑执行失败 要释放锁
如果在执行业务逻辑过程中 机器重启 优雅停机处理不合理 则会导致分布式锁不能及时释放机器重启后分布式锁仍获取不到需要等待锁过期失效。
基于redisson实现的分布式锁
引入依赖 dependencygroupIdorg.redisson/groupIdartifactIdredisson/artifactIdversion3.17.5/version/dependency
锁配置
Configuration
Slf4j
public class RedissonConfig {private String nodesString ;private String password ;Beanpublic Redisson redisson() {// 这里连接串是使用 逗号拼接的所以手动分隔一下String[] nodeArray nodesString.split(,);Config config new Config();// 使用redis集群配置ClusterServersConfig clusterServersConfig config.useClusterServers();for (String node : nodeArray) {clusterServersConfig.addNodeAddress(redis://node);}try {clusterServersConfig.setPassword(password);} catch (Exception exception) {log.error(init redisson fail ,exception);}return (Redisson) Redisson.create(config);}
}
redisson分布式锁的使用很简单 Autowiredprivate Redisson redisson;// 获取锁对象RLock lock redisson.getLock(lockName);logger.info(try get lock start key {} currentThread {}, messageManagerVO.getMsgType(), Thread.currentThread().getName());try {// 在指定时间范围内尝试加锁boolean flag lock.tryLock(tryGetSeconds, TimeUnit.SECONDS);logger.info(try get lock end key {} flag {} currentThread {}, messageManagerVO.getMsgType(), flag, Thread.currentThread().getName());if (flag) {// 模拟事务处理逻辑Thread.sleep(doBizSeconds * 1000 * 60);// 释放锁lock.unlock();logger.info(try release lock end key {}, currentThread {}, messageManagerVO.getMsgType(), Thread.currentThread().getName());}} catch (InterruptedException e) {logger.info(RedissonService tryGlobalLock exception, e);}
getLock方法获取锁对象
tryLock方法尝试加锁 不需要配置锁过期时间 没有执行unlock方法之前 锁会自动续约 如果线程中断 则锁会自动释放
unlock 释放锁
加锁是以当前线程来加锁的一但当前线程获取到 则其他线程不能获取锁。
redisson源码简读
加锁逻辑
tryLock方法依次进入
public boolean tryLock(long waitTime, TimeUnit unit) throws InterruptedException {return this.tryLock(waitTime, -1L, unit);
}
首先查看正常获取锁的逻辑 long time unit.toMillis(waitTime);long current System.currentTimeMillis();long threadId Thread.currentThread().getId();Long ttl this.tryAcquire(waitTime, leaseTime, unit, threadId);if (ttl null) {return true;}
核心方法 tryAcquireAsync private T RFutureLong tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {RFuture ttlRemainingFuture;if (leaseTime 0L) {ttlRemainingFuture this.tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG);} else {ttlRemainingFuture this.tryLockInnerAsync(waitTime, this.internalLockLeaseTime, TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);}CompletionStageLong f ttlRemainingFuture.thenApply((ttlRemaining) - {if (ttlRemaining null) {if (leaseTime 0L) {this.internalLockLeaseTime unit.toMillis(leaseTime);} else {this.scheduleExpirationRenewal(threadId);}}return ttlRemaining;});return new CompletableFutureWrapper(f);}
其中方法 T RFutureT tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommandT command) {return this.evalWriteAsync(this.getRawName(), LongCodec.INSTANCE, command, if (redis.call(exists, KEYS[1]) 0) then redis.call(hincrby, KEYS[1], ARGV[2], 1); redis.call(pexpire, KEYS[1], ARGV[1]); return nil; end; if (redis.call(hexists, KEYS[1], ARGV[2]) 1) then redis.call(hincrby, KEYS[1], ARGV[2], 1); redis.call(pexpire, KEYS[1], ARGV[1]); return nil; end; return redis.call(pttl, KEYS[1]);, Collections.singletonList(this.getRawName()), new Object[]{unit.toMillis(leaseTime), this.getLockName(threadId)});}
可以看到实际上是异步执行一个redis lua脚本(Lua脚本是redis已经内置的一种轻量小巧语言其执行是通过redis的eval /evalsha 命令来运行把操作封装成一个Lua脚本如论如何都是一次执行的原子操作)
if (redis.call(exists, KEYS[1]) 0) then redis.call(hincrby, KEYS[1], ARGV[2], 1); redis.call(pexpire, KEYS[1], ARGV[1]); return nil; end;
if (redis.call(hexists, KEYS[1], ARGV[2]) 1) then redis.call(hincrby, KEYS[1], ARGV[2], 1); redis.call(pexpire, KEYS[1], ARGV[1]); return nil; end;
return redis.call(pttl, KEYS[1]);
其中脚本中涉及redis基本命令如下
EXISTS 命令用于检查给定 key 是否存在 若 key 存在返回 1 否则返回 0
Hincrby 命令用于为哈希表中的字段值加上指定增量值如果哈希表的 key 不存在一个新的哈希表被创建并执行 HINCRBY 命令。如果指定的字段不存在那么在执行命令前字段的值被初始化为 0
PEXPIRE 命令以毫秒为单位设置 key 的生存时间 设置成功返回 1 key 不存在或设置失败返回 0
Hexists 命令用于查看哈希表的指定字段是否存在 如果哈希表含有给定字段返回 1 。 如果哈希表不含有给定字段或 key 不存在返回 0
Pttl 命令以毫秒为单位返回 key 的剩余过期时间 当 key 不存在时返回 -2 。 当 key 存在但没有设置剩余生存时间时返回 -1 。 否则以毫秒为单位返回 key 的剩余生存时间
参数含义如下
KEYS保存分布式锁的名称
ARGV[1] 对应KEYS过期时间 默认为30s
ARGV[2] 对应线程ID
// 如果第一次加锁 则key不存在 则创建key hashmap 并将线程ID 放入map中 设置为1 设置过期时间
if (redis.call(exists, KEYS[1]) 0) then redis.call(hincrby, KEYS[1], ARGV[2], 1); redis.call(pexpire, KEYS[1], ARGV[1]); return nil; end;
// 如果key已经存在 并且map中含有线程ID 则将线程ID加一 实现可重入锁 设置过期时间
if (redis.call(hexists, KEYS[1], ARGV[2]) 1) then redis.call(hincrby, KEYS[1], ARGV[2], 1); redis.call(pexpire, KEYS[1], ARGV[1]); return nil; end;
// 返回锁的剩余时间
return redis.call(pttl, KEYS[1]);
释放锁逻辑
unlock方法 public void unlock() {try {this.get(this.unlockAsync(Thread.currentThread().getId()));} catch (RedisException var2) {if (var2.getCause() instanceof IllegalMonitorStateException) {throw (IllegalMonitorStateException)var2.getCause();} else {throw var2;}}} public RFutureVoid unlockAsync(long threadId) {// 释放锁RFutureBoolean future this.unlockInnerAsync(threadId);CompletionStageVoid f future.handle((opStatus, e) - {// 取消锁的续约逻辑this.cancelExpirationRenewal(threadId);if (e ! null) {throw new CompletionException(e);} else if (opStatus null) {IllegalMonitorStateException cause new IllegalMonitorStateException(attempt to unlock lock, not locked by current thread by node id: this.id thread-id: threadId);throw new CompletionException(cause);} else {return null;}});return new CompletableFutureWrapper(f);}
主要包括释放锁和取消锁续约
释放锁执行lua脚本
protected RFutureBoolean unlockInnerAsync(long threadId) {return this.evalWriteAsync(this.getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, if (redis.call(hexists, KEYS[1], ARGV[3]) 0) then return nil;end; local counter redis.call(hincrby, KEYS[1], ARGV[3], -1); if (counter 0) then redis.call(pexpire, KEYS[1], ARGV[2]); return 0; else redis.call(del, KEYS[1]); redis.call(publish, KEYS[2], ARGV[1]); return 1; end; return nil;, Arrays.asList(this.getRawName(), this.getChannelName()), new Object[]{LockPubSub.UNLOCK_MESSAGE, this.internalLockLeaseTime, this.getLockName(threadId)});}
// 如果线程ID在map中不存在 则直接返回nil
if (redis.call(hexists, KEYS[1], ARGV[3]) 0) then return nil;end;
// 如果线程ID在map中存在 则减一 返回当前对应的value值counter
local counter redis.call(hincrby, KEYS[1], ARGV[3], -1);
// 如果counter大于0 表示可重入锁没有全部释放完 则续约
if (counter 0) then redis.call(pexpire, KEYS[1], ARGV[2]); return 0;
else 如果 counter0 表示锁已经不存在 则直接删除keyredis.call(del, KEYS[1]); redis.call(publish, KEYS[2], ARGV[1]); return 1; end;
return nil;
锁续约逻辑
redisson中数据结构(map)如下
lockName: 过期时间 线程ID 线程重入次数
由加锁逻辑可知 默认锁的过期时间为30s 后续会不断进行续约 保证锁不会释放
tryAcquireAsync方法中加锁之后会进行锁的续约
private T RFutureLong tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {RFuture ttlRemainingFuture;if (leaseTime 0L) {ttlRemainingFuture this.tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG);} else {ttlRemainingFuture this.tryLockInnerAsync(waitTime, this.internalLockLeaseTime, TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);}CompletionStageLong f ttlRemainingFuture.thenApply((ttlRemaining) - {if (ttlRemaining null) {if (leaseTime 0L) {this.internalLockLeaseTime unit.toMillis(leaseTime);} else {this.scheduleExpirationRenewal(threadId);}}return ttlRemaining;});return new CompletableFutureWrapper(f);}
进入方法scheduleExpirationRenewal protected void scheduleExpirationRenewal(long threadId) {ExpirationEntry entry new ExpirationEntry();ExpirationEntry oldEntry (ExpirationEntry)EXPIRATION_RENEWAL_MAP.putIfAbsent(this.getEntryName(), entry);if (oldEntry ! null) {oldEntry.addThreadId(threadId);} else {entry.addThreadId(threadId);try {this.renewExpiration();} finally {if (Thread.currentThread().isInterrupted()) {this.cancelExpirationRenewal(threadId);}}}}
继续进入renewExpiration方法
private void renewExpiration() {ExpirationEntry ee (ExpirationEntry)EXPIRATION_RENEWAL_MAP.get(this.getEntryName());if (ee ! null) {Timeout task this.commandExecutor.getConnectionManager().newTimeout(new TimerTask() {public void run(Timeout timeout) throws Exception {ExpirationEntry ent (ExpirationEntry)RedissonBaseLock.EXPIRATION_RENEWAL_MAP.get(RedissonBaseLock.this.getEntryName());if (ent ! null) {Long threadId ent.getFirstThreadId();if (threadId ! null) {CompletionStageBoolean future RedissonBaseLock.this.renewExpirationAsync(threadId);future.whenComplete((res, e) - {if (e ! null) {RedissonBaseLock.log.error(Cant update lock RedissonBaseLock.this.getRawName() expiration, e);RedissonBaseLock.EXPIRATION_RENEWAL_MAP.remove(RedissonBaseLock.this.getEntryName());} else {if (res) {RedissonBaseLock.this.renewExpiration();} else {RedissonBaseLock.this.cancelExpirationRenewal((Long)null);}}});}}}}, this.internalLockLeaseTime / 3L, TimeUnit.MILLISECONDS);ee.setTimeout(task);}}
锁续约的方法renewExpirationAsync
protected CompletionStageBoolean renewExpirationAsync(long threadId) {return this.evalWriteAsync(this.getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, if (redis.call(hexists, KEYS[1], ARGV[2]) 1) then redis.call(pexpire, KEYS[1], ARGV[1]); return 1; end; return 0;, Collections.singletonList(this.getRawName()), this.internalLockLeaseTime, this.getLockName(threadId));}
LUA脚本
// 如果缓存中map含有当前线程ID 则重置缓存失效时间 默认30s
if (redis.call(hexists, KEYS[1], ARGV[2]) 1) then redis.call(pexpire, KEYS[1], ARGV[1]);return 1; end;
return 0;
取消锁续约逻辑
protected void cancelExpirationRenewal(Long threadId) {ExpirationEntry task (ExpirationEntry)EXPIRATION_RENEWAL_MAP.get(this.getEntryName());if (task ! null) {if (threadId ! null) {task.removeThreadId(threadId);}if (threadId null || task.hasNoThreads()) {Timeout timeout task.getTimeout();if (timeout ! null) {timeout.cancel();}EXPIRATION_RENEWAL_MAP.remove(this.getEntryName());}}}
redisson围绕map 线程ID 重入次数 数据结构来通过lua脚本原子执行来保证分布式锁。其功能很强大 可以实现其他公平锁 读写锁等功能后面可以深入了解一下。
参考资料
最强分布式锁工具Redisson
https://github.com/redisson/redisson