东莞营销网站建设优化,微信服务号菜单链接网站怎么做,中企动力z邮箱登录,如何进行百度推广1.绪论
为了解决并发问题#xff0c;我们可以通过加锁的方式来保证数据的一致性#xff0c;比如java中的synchronize关键字或者ReentrantLock#xff0c;但是他们只能在同一jvm进程上加锁。现在的项目基本上都是分布式系统#xff0c;如何对多个java实例进行加锁#xff…1.绪论
为了解决并发问题我们可以通过加锁的方式来保证数据的一致性比如java中的synchronize关键字或者ReentrantLock但是他们只能在同一jvm进程上加锁。现在的项目基本上都是分布式系统如何对多个java实例进行加锁这就需要用到分布式锁。分布式锁可以由多种实现方式本文将要介绍的就是采用redis实现的方式。
2.分布式锁简单实现
2.1 基本原理 redis的分布式锁实现其实是由redis中的setnx命令来实现的当线程1需要获取锁时只需要调用setnx lockkey val如果设置成功便表示加锁成功。线程2到达时同样调用setnx lockkey val方法但是发现redis中已经存在lockkey时便加锁失败。线程1执行完业务逻辑过后便可以调用del key删除对应的key这样便完成解锁。 setnx key val 表示当redis中不存在这个key便设置成功否者设置失败。 3.如果set key成功但是del key失败怎么办
3.1 问题
如果线程1在setnx key val调用成功过后并且执行完业务逻辑宕机了此时相当于key便一直存在redis中这样其他线程便一直不能获取到锁导致死锁。
3.2 解决方案
我们可以在给key设置一个过期时间这样利用redis过期策略进行兜底就算线程1set key成功后宕机了因为有过期时间也能保证在一段时间后这个key会被删除掉。
setnx key val
expire key timeout
可以看出可以使用上面两个命令来对key设置过期时间但是还是有问题就是set key和expire key之间不是一个原子操作这样会导致在加锁的时候会有并发问题。所以我们相当redis中的lua脚本来保证set key和expire key之间是一个原子操作。对于string字符串redis提供了一个符合命令来实现上面两个命令的功能
set key val NX PX timeout
4 如何解决锁误删问题并实现可重入锁功能
4.1 问题描述
4.1.1 锁误删问题 1.线程1获取到锁过后由于设置了可以的超时时间锁过期。
2.线程2来获取锁便能获取成功。
3.线程1执行完成释放锁调用del key删除key成功。
4.线程3加锁set key成功但是此时线程2还在执行。
其实上面本质上就是线程释放了不是自己加的锁。
4.1.2 可重入锁
在java中的reentrantLock和syncronized都有可重入功能即线程在获取到锁过后能够再次获取当前锁并且可冲入次数加1如果释放锁时可重入次数减1。
4.2 解决方案
既然上面线程释放了不是自己加的锁导致锁误删问题我们可以在加锁是将线程id记录到key中这样每次释放锁的时候判断一下是否是本线程程加的锁如果不是便直接返回如果是便释放锁就可以了。
而对于可重入问题我们在记录线程id的时候我们可以记录一下重入次数每次重入的时候重入加1释放锁的时候重入次数减1减为0便删除key就可以了。
针对上面两点我们可以采用hash结构来存储key其中field为线程idvalue为重入次数。
所以加锁变为:
//获取重入次数
local time hget key threadId
//如果未加锁,设置重入次数为1
if(nil time) thenhset key threadId 1
//如果已经加锁设置重入次数自增
else hincr key threadId
end
expire key 过期时间
释放锁为
//获取可重入次数
local time hget key threadId
//如果没有threadId这个feild表示该key已经过期不用删除
if(nil time) then return 0
end
//表示当前只加锁了一次删除锁
if (time 2) thendel key
else
//否者重入次数减1hincr key threadId -1
end
4.3 代码实现
4.3.1 java代码
public class SimpleRedisLock implements ILock {private String name;private StringRedisTemplate stringRedisTemplate;public SimpleRedisLock(String name, StringRedisTemplate stringRedisTemplate) {this.name name;this.stringRedisTemplate stringRedisTemplate;}private static final String KEY_PREFIX lock:;private static final String ID_PREFIX UUID.randomUUID().toString(true) -;private static final DefaultRedisScriptLong UNLOCK_SCRIPT;private static final DefaultRedisScriptLong LOCK_SCRIPT;static {LOCK_SCRIPT new DefaultRedisScript();LOCK_SCRIPT.setLocation(new ClassPathResource(lock.lua));LOCK_SCRIPT.setResultType(Long.class);UNLOCK_SCRIPT new DefaultRedisScript();UNLOCK_SCRIPT.setLocation(new ClassPathResource(unlock.lua));UNLOCK_SCRIPT.setResultType(Long.class);}Overridepublic boolean tryLock(long timeoutSec) {// 获取线程标示String threadId ID_PREFIX Thread.currentThread().getId();// 调用lua脚本Long result stringRedisTemplate.execute(LOCK_SCRIPT,Collections.singletonList(KEY_PREFIX name),ID_PREFIX Thread.currentThread().getId(),timeoutSec);if (result null) {return false;}return 1 result;}Overridepublic void unlock() {// 调用lua脚本stringRedisTemplate.execute(UNLOCK_SCRIPT,Collections.singletonList(KEY_PREFIX name),ID_PREFIX Thread.currentThread().getId());}
} 注意java和lua的交互execute方法
/*** param script lua脚本* param keys 需要操作的key在lua脚本中用KEYS数组来接收从1开始* param args 其他参数在lua脚本中用ARGVS数组来接收从1开始* param T* return*/
public T T execute(RedisScriptT script, ListK keys, Object... args) {return this.scriptExecutor.execute(script, keys, args);
} 4.3.2 加锁lua脚本
--1.获取参数列表
--加锁key在keys数组中
local lockKey KEYS[1]
--线程id
local threadId ARGV[1]
--超时时间
local timeout ARGV[2]--2.获取重入次数
local time redis.call(hget, lockKey, threadId)--3.如果未加锁,设置重入次数为1
if (nil time) thenredis.call(hset, lockKey, threadId, 1)--如果已经加锁设置重入次数自增
elseredis.call(hincr, lockKey, threadId)
end
--4.设置过期时间
redis.call(expire, lockKey, timeout)
return 1
4.3.3 释放锁lua脚本
--1.获取参数列表
--加锁key在keys数组中
local lockKey KEYS[1]
--线程id
local threadId ARGV[1]
--2.获取重入次数
local time redis.call(hget, lockKey, threadId)
--3.如果没有threadId这个feild表示该key已经过期不用删除
if(nil time) thenreturn 1
end
--4.如果重入次数为1表示当前只加锁了一次删除锁
if (time 2) thenredis.call(del, lockKey)
else
--5.否者重入次数减1redis.call(hincr, lockKey,threadId)
end
return 1 lua脚本调用redis命令 redis.call(命令名称, key, 其它参数, ...) 5.未获取锁时进行重试实现
5.1 问题描述
前面实现的分布式锁时当线程1获取锁时线程2尝试获取锁失败便会直接返回失败。我们如果要线程2在获取锁失败后在一段时间内尝试获取锁如果超所该时间才返回失败应该如何实现呢
5.2 问题解决
在线程获取锁失败时我们可以进行自旋直到获取锁成功为止但是这样会消耗资源。所以我们可以通过redis的发布订阅机制当线程获取锁失败过后订阅加锁的key然后阻塞。当其他线程释放锁的时候会给我们发送一个通知唤醒当前线程。 //订阅某个频道 SUBSCRIBE channel [channel ...] 5.3 redisson源码分析 public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {//获取线程等待时间long time unit.toMillis(waitTime);long current System.currentTimeMillis();long threadId Thread.currentThread().getId();//尝试获取锁如果获取失败返回当前key还有多久过期Long ttl tryAcquire(waitTime, leaseTime, unit, threadId);// lock acquiredif (ttl null) {return true;}//等待时间扣减从当前方法进入到执行到这里花费时间time - System.currentTimeMillis() - current;//如果小于0表示已经过期if (time 0) {acquireFailed(waitTime, unit, threadId);return false;}current System.currentTimeMillis();//定于当前keyRFutureRedissonLockEntry subscribeFuture subscribe(threadId);if (!subscribeFuture.await(time, TimeUnit.MILLISECONDS)) {if (!subscribeFuture.cancel(false)) {subscribeFuture.onComplete((res, e) - {if (e null) {unsubscribe(subscribeFuture, threadId);}});}acquireFailed(waitTime, unit, threadId);return false;}try {time - System.currentTimeMillis() - current;if (time 0) {acquireFailed(waitTime, unit, threadId);return false;}//真正的等待逻辑while (true) {long currentTime System.currentTimeMillis();//再次获取锁如果失败得到锁还有多久过期ttl tryAcquire(waitTime, leaseTime, unit, threadId);// lock acquiredif (ttl null) {return true;}time - System.currentTimeMillis() - currentTime;if (time 0) {acquireFailed(waitTime, unit, threadId);return false;}// waiting for messagecurrentTime System.currentTimeMillis();if (ttl 0 ttl time) {//如果key被删除会pulish消息唤醒当前线程subscribeFuture.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);} else {subscribeFuture.getNow().getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);}time - System.currentTimeMillis() - currentTime;if (time 0) {acquireFailed(waitTime, unit, threadId);return false;}}} finally {unsubscribe(subscribeFuture, threadId);}
// return get(tryLockAsync(waitTime, leaseTime, unit));}
6.如何解决锁续期问题
6.1 问题描述
我们在实现分布式锁的时候会设置过期时间超过这个过期时间这个key便会被自动删除。假设超过这个过期时间当前业务逻辑还未执行完成这样其他线程能拿到锁会导致并发问题。
6.2 解决方案
我们可以设置一个线程专门用来监听当前业务逻辑是否完成如果未完成便对key的时间进行续期在redission实现的分布式锁中这个线程被称作watchDog。我们来看看redisson中是如何实现的。
6.3 redisson源码分析 private T RFutureLong tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
//如果设置锁自动释放时间不等于-1走正常逻辑if (leaseTime ! -1) {return tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG);}
//如果锁自动释放时间等于-1开启watchDog并且设置锁自动释放时间为30sRFutureLong ttlRemainingFuture tryLockInnerAsync(waitTime,commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(),TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);ttlRemainingFuture.onComplete((ttlRemaining, e) - {if (e ! null) {return;}// lock acquiredif (ttlRemaining null) {//启动watchDog线程为当前线程进行锁续期scheduleExpirationRenewal(threadId);}});return ttlRemainingFuture;}
加锁的流程:可以看出redission实现的分布式锁的lua逻辑其实和我们上面是差不多的。 T RFutureT tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommandT command) {internalLockLeaseTime unit.toMillis(leaseTime);return evalWriteAsync(getName(), LongCodec.INSTANCE, command,if (redis.call(exists, KEYS[1]) 0) then redis.call(hincrby, KEYS[1], ARGV[2], 1); redis.call(pexpire, KEYS[1], ARGV[1]); return nil; end; if (redis.call(hexists, KEYS[1], ARGV[2]) 1) then redis.call(hincrby, KEYS[1], ARGV[2], 1); redis.call(pexpire, KEYS[1], ARGV[1]); return nil; end; return redis.call(pttl, KEYS[1]);,Collections.singletonList(getName()), internalLockLeaseTime, getLockName(threadId));}
锁续期流程: private void scheduleExpirationRenewal(long threadId) {ExpirationEntry entry new ExpirationEntry();//其实就是将当前的线程id封装成一个entry并且加入到一个Map中然后调用renewExpiration方法ExpirationEntry oldEntry EXPIRATION_RENEWAL_MAP.putIfAbsent(getEntryName(), entry);if (oldEntry ! null) {oldEntry.addThreadId(threadId);} else {entry.addThreadId(threadId);renewExpiration();}} private void renewExpiration() {//从当前map中获取监听的entryExpirationEntry ee EXPIRATION_RENEWAL_MAP.get(getEntryName());if (ee null) {return;}//启动一个定时任务Timeout task commandExecutor.getConnectionManager().newTimeout(new TimerTask() {Overridepublic void run(Timeout timeout) throws Exception {//从map中取出需要续约的线程idExpirationEntry ent EXPIRATION_RENEWAL_MAP.get(getEntryName());if (ent null) {return;}Long threadId ent.getFirstThreadId();if (threadId null) {return;}//将线程id的过期时间重置为30sRFutureBoolean future renewExpirationAsync(threadId);future.onComplete((res, e) - {if (e ! null) {log.error(Cant update lock getName() expiration, e);return;}if (res) {// reschedule itself//递归调用一直监听存活的线程renewExpiration();}});}//时间为10会触发一次定时任务}, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);ee.setTimeout(task);}
简单而言其实就是启动一个定时线程一直扫描存活的key如果未过期便重置其过期时间为30s。可以看出当线程调用了unlock方法便会停止锁续期。
6.4watchDog的几个问题
6.4.1 客户端宕机watchDog是否会一直续期
答案是不会的如果客户端宕机证明当前jvm实例已经挂掉所以执行watchDog的线程自然也挂掉了。
6.4.2 unLock失败watchDog是否会一直续期
答案是不会的在redisson的解锁方法中会用到一个CompletionStage它能保证无论删除key是否抛出异常都能将当前线程id的锁续期任务从EXPIRATION_RENEWAL_MAP中移除。
7.redis主从一致性
7.1 问题描述
如果redis采用单机部署的话redis宕机导致整个服务都不可用。如果redis采用集群部署但是会有主动不同步的问题比如线程1将key加入到主节点但是主节点还未将数据同步到从节点宕机选举从节点为新的主节点这个主节点并没有线程1设置的key导致线程安全问题。\
7.2 联锁
7.2.1 实现原理 其实就是部署多个redis单击实例当加锁的时候向每个redis实例都发送setnx key val请求当所有的redis实例都返回成功才认为成功。
7.3 红锁 联锁是要求所有的锁加锁成功才表示加锁成功而红锁是只需要满足大于n/21个节点加锁成功便成功。