用字母做logo的网站,外贸网络推广高手何在,智慧团建网站登陆,人力资源公司简介模板一、什么是分布式锁#xff1a;
1、什么是分布式锁#xff1a;
分布式锁#xff0c;即分布式系统中的锁。在单体应用中我们通过锁解决的是控制共享资源访问的问题#xff0c;而分布式锁#xff0c;就是解决了分布式系统中控制共享资源访问的问题。与单体应用不同的是
1、什么是分布式锁
分布式锁即分布式系统中的锁。在单体应用中我们通过锁解决的是控制共享资源访问的问题而分布式锁就是解决了分布式系统中控制共享资源访问的问题。与单体应用不同的是分布式系统中竞争共享资源的最小粒度从线程升级成了进程。
2、分布式锁应该具备哪些条件
在分布式系统环境下一个方法在同一时间只能被一个机器的一个线程执行高可用的获取锁与释放锁高性能的获取锁与释放锁具备可重入特性可理解为重新进入由多于一个任务并发使用而不必担心数据错误具备锁失效机制即自动解锁防止死锁具备非阻塞锁特性即没有获取到锁将直接返回获取锁失败
3、分布式锁的实现方式 基于数据库实现分布式锁基于Zookeeper实现分布式锁基于reids实现分布式锁 这篇文章就简单介绍下这几种分布式锁的实现重点讲解的是基于redis的分布式锁。
二、基于数据库的分布式锁
基于数据库的锁实现也有两种方式一是基于数据库表的增删另一种是基于数据库排他锁。
1、基于数据库表的增删
基于数据库表增删是最简单的方式首先创建一张锁的表主要包含下列字段类的全路径名方法名时间戳等字段。
具体的使用方式当需要锁住某个方法时往该表中插入一条相关的记录。类的全路径名方法名是有唯一性约束的如果有多个请求同时提交到数据库的话数据库会保证只有一个操作可以成功那么我们就认为操作成功的那个线程获得了该方法的锁可以执行方法体内容。执行完毕之后需要delete该记录。
这里只是简单介绍一下对于上述方案可以进行优化如应用主从数据库数据之间双向同步一旦挂掉快速切换到备库上做一个定时任务每隔一定时间把数据库中的超时数据清理一遍使用while循环直到insert成功再返回成功记录当前获得锁的机器的主机信息和线程信息下次再获取锁的时候先查询数据库如果当前机器的主机信息和线程信息在数据库可以查到的话直接把锁分配给他就可以了实现可重入锁
2、基于数据库排他锁
基于MySql的InnoDB引擎可以使用以下方法来实现加锁操作
public void lock(){ connection.setAutoCommit(false) int count 0; while(count 4){ try{ select * from lock where lock_namexxx for update; if(结果不为空){ //代表获取到锁 return; } }catch(Exception e){ } //为空或者抛异常的话都表示没有获取到锁 sleep(1000); count; } throw new LockException();}在查询语句后面增加for update数据库会在查询过程中给数据库表增加排他锁。获得排它锁的线程即可获得分布式锁当获得锁之后可以执行方法的业务逻辑执行完方法之后释放锁connection.commit()。当某条记录被加上排他锁之后其他线程无法获取排他锁并被阻塞。
3、基于数据库锁的优缺点
上面两种方式都是依赖数据库表一种是通过表中的记录判断当前是否有锁存在另外一种是通过数据库的排他锁来实现分布式锁。
优点是直接借助数据库简单容易理解。缺点是操作数据库需要一定的开销性能问题需要考虑。
三、基于Zookeeper的分布式锁
基于zookeeper临时有序节点可以实现的分布式锁。每个客户端对某个方法加锁时在zookeeper上的与该方法对应的指定节点的目录下生成一个唯一的瞬时有序节点。 判断是否获取锁的方式很简单只需要判断有序节点中序号最小的一个。 当释放锁的时候只需将这个瞬时节点删除即可。同时其可以避免服务宕机导致的锁无法释放而产生的死锁问题。 第三方库有 CuratorCurator提供的InterProcessMutex是分布式锁的实现
Zookeeper实现的分布式锁存在两个个缺点
1性能上可能并没有缓存服务那么高因为每次在创建锁和释放锁的过程中都要动态创建、销毁瞬时节点来实现锁功能。ZK中创建和删除节点只能通过Leader服务器来执行然后将数据同步到所有的Follower机器上。2zookeeper的并发安全问题因为可能存在网络抖动客户端和ZK集群的session连接断了zk集群以为客户端挂了就会删除临时节点这时候其他客户端就可以获取到分布式锁了。
四、基于redis的分布式锁
redis命令说明
1setnx命令set if not exists当且仅当 key 不存在时将 key 的值设为 value。若给定的 key 已经存在则 SETNX 不做任何动作。
返回1说明该进程获得锁将 key 的值设为 value返回0说明其他进程已经获得了锁进程不能进入临界区。
命令格式setnx lock.key lock.value
2get命令获取key的值如果存在则返回如果不存在则返回nil
命令格式get lock.key
3getset命令该方法是原子的对key设置newValue这个值并且返回key原来的旧值。
命令格式getset lock.key newValue
4del命令删除redis中指定的key
命令格式del lock.key
方案一基于set命令的分布式锁
1、加锁使用setnx进行加锁当该指令返回1时说明成功获得锁
2、解锁当得到锁的线程执行完任务之后使用del命令释放锁以便其他线程可以继续执行setnx命令来获得锁 1存在的问题假设线程获取了锁之后在执行任务的过程中挂掉来不及显示地执行del命令释放锁那么竞争该锁的线程都会执行不了产生死锁的情况。 2解决方案设置锁超时时间 3、设置锁超时时间setnx 的 key 必须设置一个超时时间以保证即使没有被显式释放这把锁也要在一定时间后自动释放。可以使用expire命令设置锁超时时间 1存在问题 setnx 和 expire 不是原子性的操作假设某个线程执行setnx 命令成功获得了锁但是还没来得及执行expire 命令服务器就挂掉了这样一来这把锁就没有设置过期时间了变成了死锁别的线程再也没有办法获得锁了。 2解决方案redis的set命令支持在获取锁的同时设置key的过期时间 4、使用set命令加锁并设置锁过期时间
命令格式set lock.key lock.value nx ex
详情参考redis使用文档
http://doc.redisfans.com/string/set.html
1存在问题
① 假如线程A成功得到了锁并且设置的超时时间是 30 秒。如果某些原因导致线程 A 执行的很慢过了 30 秒都没执行完这时候锁过期自动释放线程 B 得到了锁。
② 随后线程A执行完任务接着执行del指令来释放锁。但这时候线程 B 还没执行完线程A实际上删除的是线程B加的锁。
2解决方案
可以在 del 释放锁之前做一个判断验证当前的锁是不是自己加的锁。在加锁的时候把当前的线程 ID 当做value并在删除之前验证 key 对应的 value 是不是自己线程的 ID。但是这样做其实隐含了一个新的问题get操作、判断和释放锁是两个独立操作不是原子性。对于非原子性的问题我们可以使用Lua脚本来确保操作的原子性
5、锁续期这种机制类似于redisson的看门狗机制文章后面会详细说明
虽然步骤4避免了线程A误删掉key的情况但是同一时间有 AB 两个线程在访问代码块仍然是不完美的。怎么办呢我们可以让获得锁的线程开启一个守护线程用来给快要过期的锁“续期”。 ① 假设线程A执行了29 秒后还没执行完这时候守护线程会执行 expire 指令为这把锁续期 20 秒。守护线程从第 29 秒开始执行每 20 秒执行一次。 ② 情况一当线程A执行完任务会显式关掉守护线程。 ③ 情况二如果服务器忽然断电由于线程 A 和守护线程在同一个进程守护线程也会停下。这把锁到了超时的时候没人给它续命也就自动释放了。 方案二基于setnx、get、getset的分布式锁
1、实现原理
1setnx(lockkey, 当前时间过期超时时间) 如果返回1则获取锁成功如果返回0则没有获取到锁转向步骤(2)
2get(lockkey)获取值oldExpireTime 并将这个value值与当前的系统时间进行比较如果小于当前系统时间则认为这个锁已经超时可以允许别的请求重新获取转向步骤(3)
3计算新的过期时间 newExpireTime当前时间锁超时时间然后getset(lockkey, newExpireTime) 会返回当前lockkey的值currentExpireTime
4判断 currentExpireTime 与 oldExpireTime 是否相等如果相等说明当前getset设置成功获取到了锁。如果不相等说明这个锁又被别的请求获取走了那么当前请求可以直接返回失败或者继续重试。
5在获取到锁之后当前线程可以开始自己的业务处理当处理完毕后比较自己的处理时间和对于锁设置的超时时间如果小于锁设置的超时时间则直接执行del命令释放锁释放锁之前需要判断持有锁的线程是不是当前线程如果大于锁设置的超时时间则不需要再锁进行处理。
2、代码实现
1获取锁的实现方式
public boolean lock(long acquireTimeout, TimeUnit timeUnit) throws InterruptedException { acquireTimeout timeUnit.toMillis(acquireTimeout); long acquireTime acquireTimeout System.currentTimeMillis(); //使用J.U.C的ReentrantLock threadLock.tryLock(acquireTimeout, timeUnit); try { //循环尝试 while (true) { //调用tryLock boolean hasLock tryLock(); if (hasLock) { //获取锁成功 return true; } else if (acquireTime System.currentTimeMillis()) { break; } Thread.sleep(sleepTime); } } finally { if (threadLock.isHeldByCurrentThread()) { threadLock.unlock(); } } return false;} public boolean tryLock() { long currentTime System.currentTimeMillis(); String expires String.valueOf(timeout currentTime); //设置互斥量 if (redisHelper.setNx(mutex, expires) 0) { //获取锁设置超时时间 setLockStatus(expires); return true; } else { String currentLockTime redisUtil.get(mutex); //检查锁是否超时 if (Objects.nonNull(currentLockTime) Long.parseLong(currentLockTime) currentTime) { //获取旧的锁时间并设置互斥量 String oldLockTime redisHelper.getSet(mutex, expires); //旧值与当前时间比较 if (Objects.nonNull(oldLockTime) Objects.equals(oldLockTime, currentLockTime)) { //获取锁设置超时时间 setLockStatus(expires); return true; } } return false; }}tryLock方法中主要逻辑如下lock调用tryLock方法参数为获取的超时时间与单位线程在超时时间内获取锁操作将自旋在那里直到该自旋锁的保持者释放了锁。
2释放锁的实现方式
public boolean unlock() { //只有锁的持有线程才能解锁 if (lockHolder Thread.currentThread()) { //判断锁是否超时没有超时才将互斥量删除 if (lockExpiresTime System.currentTimeMillis()) { redisHelper.del(mutex); logger.info(删除互斥量[{}], mutex); } lockHolder null; logger.info(释放[{}]锁成功, mutex); return true; } else { throw new IllegalMonitorStateException(没有获取到锁的线程无法执行解锁操作); }}存在问题
1这个锁的核心是基于System.currentTimeMillis()如果多台服务器时间不一致那么问题就出现了但是这个bug完全可以从服务器运维层面规避的而且如果服务器时间不一样的话只要和时间相关的逻辑都是会出问题的
2如果前一个锁超时的时候刚好有多台服务器去请求获取锁那么就会出现同时执行redis.getset()而导致出现过期时间覆盖问题不过这种情况并不会对正确结果造成影响
3存在多个线程同时持有锁的情况如果线程A执行任务的时间超过锁的过期时间这时另一个线程就可以获得这个锁了造成多个线程同时持有锁的情况。类似于方案一可以使用“锁续期”的方式来解决。
前两种redis分布式锁的存在的问题
前面两种redis分布式锁的实现方式如果从“高可用”的层面来看仍然是有所欠缺也就是说当 redis 是单点的情况下当发生故障时则整个业务的分布式锁都将无法使用。
为了提高可用性我们可以使用主从模式或者哨兵模式但在这种情况下仍然存在问题在主从模式或者哨兵模式下正常情况下如果加锁成功了那么master节点会异步复制给对应的slave节点。但是如果在这个过程中发生master节点宕机主备切换slave节点从变为了 master节点而锁还没从旧master节点同步过来这就发生了锁丢失会导致多个客户端可以同时持有同一把锁的问题。来看个图来想下这个过程 那么如何避免这种情况呢redis 官方给出了基于多个 redis 集群部署的高可用分布式锁解决方案RedLock在方案三我们就来详细介绍一下。备注如果master节点宕机期间可以容忍多个客户端同时持有锁那么就不需要redLock
方案三基于RedLock的分布式锁
redLock的官方文档地址
https://redis.io/topics/distlock
Redlock算法是Redis的作者 Antirez 在单Redis节点基础上引入的高可用模式。Redlock的加锁要结合单节点分布式锁算法共同实现因为它是RedLock的基础
1、加锁实现原理
现在假设有5个Redis主节点(大于3的奇数个)这样基本保证他们不会同时都宕掉获取锁和释放锁的过程中客户端会执行以下操作
1获取当前Unix时间以毫秒为单位并设置超时时间TTL TTL 要大于 正常业务执行的时间 获取所有redis服务消耗时间 时钟漂移 2依次尝试从5个实例使用相同的key和具有唯一性的value获取锁当向Redis请求获取锁时客户端应该设置一个网络连接和响应超时时间这个超时时间应该小于锁的失效时间TTL这样可以避免客户端死等。比如TTL为5s设置获取锁最多用1s所以如果一秒内无法获取锁就放弃获取这个锁从而尝试获取下个锁
3客户端 获取所有能获取的锁后的时间 减去 第(1)步的时间就得到锁的获取时间。锁的获取时间要小于锁失效时间TTL并且至少从半数以上的Redis节点取到锁才算获取成功锁
4如果成功获得锁key的真正有效时间 TTL - 锁的获取时间 - 时钟漂移。比如TTL 是5s,获取所有锁用了2s则真正锁有效时间为3s
5如果因为某些原因获取锁失败没有在半数以上实例取到锁或者取锁时间已经超过了有效时间客户端应该在所有的Redis实例上进行解锁无论Redis实例是否加锁成功因为可能服务端响应消息丢失了但是实际成功了。 设想这样一种情况客户端发给某个Redis节点的获取锁的请求成功到达了该Redis节点这个节点也成功执行了SET操作但是它返回给客户端的响应包却丢失了。这在客户端看来获取锁的请求由于超时而失败了但在Redis这边看来加锁已经成功了。因此释放锁的时候客户端也应该对当时获取锁失败的那些Redis节点同样发起请求。实际上这种情况在异步通信模型中是有可能发生的客户端向服务器通信是正常的但反方向却是有问题的。 6失败重试当client不能获取锁时应该在随机时间后重试获取锁同时重试获取锁要有一定次数限制 在随机时间后进行重试主要是防止过多的客户端同时尝试去获取锁导致彼此都获取锁失败的问题。 算法示意图如下 2、RedLock性能及崩溃恢复的相关解决方法
由于N个Redis节点中的大多数能正常工作就能保证Redlock正常工作因此理论上它的可用性更高。前面我们说的主从架构下存在的安全性问题在RedLock中已经不存在了但如果有节点发生崩溃重启还是会对锁的安全性有影响的具体的影响程度跟Redis持久化配置有关
1如果redis没有持久化功能在clientA获取锁成功后所有redis重启clientB能够再次获取到锁这样违法了锁的排他互斥性
2如果启动AOF永久化存储事情会好些 举例当我们重启redis后由于redis过期机制是按照unix时间戳走的所以在重启后然后会按照规定的时间过期不影响业务但是由于AOF同步到磁盘的方式默认是每秒一次如果在一秒内断电会导致数据丢失立即重启会造成锁互斥性失效但如果同步磁盘方式使用Always(每一个写命令都同步到硬盘)造成性能急剧下降所以在锁完全有效性和性能方面要有所取舍
3为了有效解决既保证锁完全有效性 和 性能高效问题antirez又提出了“延迟重启”的概念redis同步到磁盘方式保持默认的每秒1次在redis崩溃单机后无论是一个还是所有先不立即重启它而是等待TTL时间后再重启这样的话这个节点在重启前所参与的锁都会过期它在重启后就不会对现有的锁造成影响缺点是在TTL时间内服务相当于暂停状态
3、Redisson中RedLock的实现
在JAVA的redisson包已经实现了对RedLock的封装主要是通过 redisClient 与 lua 脚本实现的之所以使用 lua 脚本是为了实现加解锁校验与执行的事务性。
1唯一ID的生成
分布式事务锁中为了能够让作为中心节点的存储节点获取锁的持有者从而避免锁被非持有者误解锁每个发起请求的 client 节点都必须具有全局唯一的 id。通常我们是使用 UUID 来作为这个唯一 idredisson 也是这样实现的在此基础上redisson 还加入了 threadid 避免了多个线程反复获取 UUID 的性能损耗
protected final UUID id UUID.randomUUID();String getLockName(long threadId) { return id : threadId;}2加锁逻辑
redisson 加锁的核心代码非常容易理解通过传入 TTL 与唯一 id实现一段时间的加锁请求。下面是可重入锁的实现逻辑
T RFutureT tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommandT command) { internalLockLeaseTime unit.toMillis(leaseTime); // 获取锁时向5个redis实例发送的命令 return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command, // 校验分布式锁的KEY是否已存在如果不存在那么执行hset命令hset REDLOCK_KEY uuidthreadId 1并通过pexpire设置失效时间也是锁的租约时间 if (redis.call(exists, KEYS[1]) 0) then redis.call(hset, KEYS[1], ARGV[2], 1); redis.call(pexpire, KEYS[1], ARGV[1]); return nil; end; // 如果分布式锁的KEY已存在则校验唯一 id如果唯一 id 匹配表示是当前线程持有的锁那么重入次数加1并且设置失效时间 if (redis.call(hexists, KEYS[1], ARGV[2]) 1) then redis.call(hincrby, KEYS[1], ARGV[2], 1); redis.call(pexpire, KEYS[1], ARGV[1]); return nil; end; // 获取分布式锁的KEY的失效时间毫秒数 return redis.call(pttl, KEYS[1]);, // KEYS[1] 对应分布式锁的 keyARGV[1] 对应 TTLARGV[2] 对应唯一 id Collections.ObjectsingletonList(getName()), internalLockLeaseTime, getLockName(threadId));}3释放锁逻辑
protected RFutureBoolean unlockInnerAsync(long threadId) { // 向5个redis实例都执行如下命令 return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, // 如果分布式锁 KEY 不存在那么向 channel 发布一条消息 if (redis.call(exists, KEYS[1]) 0) then redis.call(publish, KEYS[2], ARGV[1]); return 1; end; // 如果分布式锁存在但是唯一 id 不匹配表示锁已经被占用 if (redis.call(hexists, KEYS[1], ARGV[3]) 0) then return nil; end; // 如果就是当前线程占有分布式锁那么将重入次数减 1 local counter redis.call(hincrby, KEYS[1], ARGV[3], -1); // 重入次数减1后的值如果大于0表示分布式锁有重入过那么只设置失效时间不删除 if (counter 0) then redis.call(pexpire, KEYS[1], ARGV[2]); return 0; else // 重入次数减1后的值如果为0则删除锁并发布解锁消息 redis.call(del, KEYS[1]); redis.call(publish, KEYS[2], ARGV[1]); return 1; end; return nil;, // KEYS[1] 表示锁的 keyKEYS[2] 表示 channel nameARGV[1] 表示解锁消息ARGV[2] 表示 TTLARGV[3] 表示唯一 id Arrays.ObjectasList(getName(), getChannelName()), LockPubSub.unlockMessage, internalLockLeaseTime, getLockName(threadId));}4redisson中RedLock的使用
Config config new Config();config.useSentinelServers() .addSentinelAddress(127.0.0.1:6369,127.0.0.1:6379, 127.0.0.1:6389) .setMasterName(masterName) .setPassword(password).setDatabase(0); RedissonClient redissonClient Redisson.create(config);RLock redLock redissonClient.getLock(REDLOCK_KEY); try { // 尝试加锁最多等待500ms上锁以后10s自动解锁 boolean isLock redLock.tryLock(500, 10000, TimeUnit.MILLISECONDS); if (isLock) { //获取锁成功执行对应的业务逻辑 }} catch (Exception e) { e.printStackTrace();} finally { redLock.unlock();}可以看到redisson 包的实现中通过 lua 脚本校验了解锁时的 client 身份所以我们无需再在 finally 中去判断是否加锁成功也无需做额外的身份校验可以说已经达到开箱即用的程度了。
同样基于RedLock实现的分布式锁也存在 client 获取锁之后在 TTL 时间内没有完成业务逻辑的处理而此时锁会被自动释放造成多个线程同时持有锁的问题。而Redisson 在实现的过程中自然也考虑到了这一问题redisson 提供了一个“看门狗”的特性当锁即将过期还没有释放时不断的延长锁key的生存时间。具体实现原理会在方案四进行介绍
方案四基于Redisson看门狗的分布式锁
前面说了如果某些原因导致持有锁的线程在锁过期时间内还没执行完任务而锁因为还没超时被自动释放了那么就会导致多个线程同时持有锁的现象出现而为了解决这个问题可以进行“锁续期”。其实在JAVA的Redisson包中有一个看门狗机制已经帮我们实现了这个功能。
1、redisson原理
redisson在获取锁之后会维护一个看门狗线程当锁即将过期还没有释放时不断的延长锁key的生存时间 2、加锁机制
线程去获取锁获取成功执行lua脚本保存数据到redis数据库。
线程去获取锁获取失败一直通过while循环尝试获取锁获取成功后执行lua脚本保存数据到redis数据库。
3、watch dog自动延期机制 看门狗启动后对整体性能也会有一定影响默认情况下看门狗线程是不启动的。如果使用redisson进行加锁的同时设置了锁的过期时间也会导致看门狗机制失效。 redisson在获取锁之后会维护一个看门狗线程在每一个锁设置的过期时间的1/3处如果线程还没执行完任务则不断延长锁的有效期。看门狗的检查锁超时时间默认是30秒可以通过 lockWactchdogTimeout 参数来改变。
加锁的时间默认是30秒如果加锁的业务没有执行完那么每隔 30 ÷ 3 10秒就会进行一次续期把锁重置成30秒保证解锁前锁不会自动失效。
那万一业务的机器宕机了呢如果宕机了那看门狗线程就执行不了了就续不了期那自然30秒之后锁就解开了呗。
4、redisson分布式锁的关键点 a. 对key不设置过期时间由Redisson在加锁成功后给维护一个watchdog看门狗watchdog负责定时监听并处理在锁没有被释放且快要过期的时候自动对锁进行续期保证解锁前锁不会自动失效 b. 通过Lua脚本实现了加锁和解锁的原子操作 c. 通过记录获取锁的客户端id每次加锁时判断是否是当前客户端已经获得锁实现了可重入锁。 5、Redisson的使用
在方案三中我们已经演示了基于Redisson的RedLock的使用案例其实 Redisson 也封装 可重入锁Reentrant Lock、公平锁Fair Lock、联锁MultiLock、红锁RedLock、读写锁ReadWriteLock、 信号量Semaphore、可过期性信号量PermitExpirableSemaphore、 闭锁CountDownLatch等具体使用说明可以参考官方文档Redisson的分布式锁和同步器
附redLock的官方文档翻译