怎么分析一个网站,设计工作室官网,达州seo,济南手机网站建设报价分布式锁的应用场景与分布式锁实现#xff08;一#xff09;#xff1a;传统锁处理并发及传统锁的问题
基于Redis实现分布式锁 所有代码已同步到GitCode#xff1a;https://gitcode.net/ruozhuliufeng/distributed-project.git 基本实现
借助Redis中的命令setnx(key一传统锁处理并发及传统锁的问题
基于Redis实现分布式锁 所有代码已同步到GitCodehttps://gitcode.net/ruozhuliufeng/distributed-project.git 基本实现
借助Redis中的命令setnx(keyvalue),key不存在就新增存在就什么都不做。同时有多个客户端发送setnx命令只有一个客户端可以成功返回1(true)其他客户端返回0(false)。
多个客户端同时获取锁(setnx)获取成功执行业务逻辑执行完成释放锁(del)其他客户端等待重试
改造StockService方法 /*** 减库存*/Overridepublic void checkAndLock() {// 加锁setnxBoolean lock this.redisTemplate.opsForValue().setIfAbsent(lock, 1);// 重试递归调用if (!lock){try {Thread.sleep(50);this.deduct();} catch (InterruptedException e) {e.printStackTrace();}} else {try {// 1. 查询库存信息String stock redisTemplate.opsForValue().get(stock).toString();// 2. 判断库存是否充足if (stock ! null stock.length() ! 0) {Integer st Integer.valueOf(stock);if (st 0) {// 3.扣减库存redisTemplate.opsForValue().set(stock, String.valueOf(--st));}}} finally {// 解锁this.redisTemplate.delete(lock);}}} 其中加锁也可以使用循环
// 加锁获取锁失败重试
while (!this.redisTemplate.opsForValue().setIfAbsent(lock, 1)){try {Thread.sleep(40);} catch (InterruptedException e) {e.printStackTrace();}
} 解锁 this.redisTemplate.delete(lock); 重置缓存值使用Jmeter进行压力测试并获取库存余量0 防死锁 代码均已上传至GitCode可根据提交信息获取文件的更改内容 ]
问题setnx刚刚获取到锁当前服务器宕机导致del释放锁无法执行进而导致锁无法释放死锁
解决给锁设置过期时间自动释放锁
设置过期时间的两种方式
通过expire设置过期时间缺乏原子性如果setnx与expire之间出现异常锁也无法释放通过set指令设置过期时间set key value ex 3 nx(既达到setnx的效果又设置了过期时间)
防误删
问题可能会释放其他服务器的锁
场景如果业务逻辑的执行时间是7s。执行流程如下
index1业务逻辑没执行完3秒后锁被自动释放index2业务获取到锁执行业务逻辑3秒后锁被自动释放index3业务获取到锁执行业务逻辑index1业务逻辑执行完成开始调用del释放锁这时释放的是index3的锁导致index3的业务只执行1s就被别人释放最终等于没锁的情况仍会导致超卖现象发生
解决setnx获取到锁时设置一个指定的唯一值(例如uuid)释放前获取这个值判断是否是自己的锁。 实现如下 问题删除操作缺乏原子性
场景
index1执行删除时查询到的lock值确实和uuid相同index1执行删除前lock刚好过期时间已到被redis自动释放index2获取到了lockindex1执行了删除此时会把index2的lock删除
解决方案没有一个命令可以同时做到判断删除所以只能通过其他方式实现(lua脚本)
Redis中的lua脚本 lua脚本可以一次性发送多个指令给redis由于Redis是单线程的执行指令遵守one-by-one规则 现实问题
Redis采用单线程架构可以保证单个命令的原子性但是无法保证一组命令在高并发场景下的原子性。例如 在串行场景下A和B的值肯定都是3
在并发场景下A和B的值可能在0-6之间。
极限情况下1 则A的结果是0B的结果是3
极限情况下2 则A和B的结果都是6。
如果Redis客户端通过lua脚本把3个命令一次性发送给redis服务器那么这三个指令就不会被其他客户端指令打断。Redis也保证脚本会以原子性atomic的方式执行当某个脚本正在运行的时候不会有其他脚本或Redis命令被执行。这和使用MULTI/EXEC包围的事务很类似。
但是MULTI/EXEC方法来使用事务功能将一组命令打包执行无法进行业务逻辑的操作。这期间有某一条命令执行报错例如给字符串自增其他的命令还是会执行并不会回滚。
lua介绍
lua是一种轻量小巧的脚本语言用标准C语言编写并以源代码形式开放其设计目的是为了嵌入应用程序中从而为应用程序提供灵活的扩展和定制功能。
设计目的
其设计目的是为了嵌入应用程序中从而为应用程序提供灵活的扩展和定制功能。
lua特性
轻量级它用标准C语言编写并以源代码形式开放编译后仅仅一百余K可以很方便的嵌入到别的程序里。可扩展Lua提供了非常易于使用的扩展接口和机制由于宿主语言通常是C或C提供这些功能Lua可以使用它们就像是本来就内置的功能一样。其他特性 支持面向过程procedure-oriented编程和函数式变成functional programming自动内存管理只提供了一种通用类型的表(table)用它可以实现数组、哈希表、集合、对象语言内置模式匹配闭包(closure)函数也可以看做一个值提供多线程协同进程并非操作系统所支持的线程支持通过闭包和table可以很方便的支持面向对象变成所需要的一些关键机制比如数据抽象、虚函数、继承和重载等。
lua基本语言
这里不做深究感兴趣可以到官方教程或菜鸟教程这里以Redis中可能会用到的部分语法作介绍。
变量
a 5 -- 全局变量
local b 10 -- 局部变量redis只支持局部变量
a,b 10,2*x -- 等价于 a 10; b 2*x流程控制
if(布尔表达式 1)
then -- [ 在布尔表达式 1 为true时执行改语句块 ]
elseif(布尔表达式 2)
then -- [ 在布尔表达式 2 为true时执行改语句块 ]
else -- [ 在以上表达式都不为true时执行改语句块 ]
endRedis执行lua脚本-EVAL指令
在Redis中需要通过eval命令执行lua脚本。
格式
EVAL script numkeys key [key ...] arg [arg ...]
script: lua脚本字符串这段lua脚本不需要也不应该定义函数
numkeys: lua脚本中keys数组的大小
key [key ...]: KEYS数组中的元素
arg [arg ...]: ARGV数组中的元素案例1基本案例
EVAL return 10 0
# 输出10案例2动态传参
EVAL return {KEYS[1],KEYS[2],ARGV[1],ARGV[2]} 5 10 20 30 40 50 60 70 80 90
# 输出10 20 60 70
# 下标从1开始
EVAL if KEYS[1] ARGV[1] then return 1 else return 0 end 1 10 20
# 输出0
EVAL if KEYS[1] ARGV[1] then return 1 else return 0 end 1 20 10
# 输出1 传入了两个参数 10 和 20KEYS的长度是1所以KEYS中有一个元素10剩下的一个20就是ARGV数组中的元素
redis.call()中的redis是redis中提供的lua脚本类库仅在redis环境中使用该类库。
案例3执行redis类库方法
set a 10 -- 设置一个a 值为10
EVAL return redis.call(get,a) 0
# 通过return把call方法返回给redis客户端打印10 注意**脚本里使用的所有键都应该由KEYS数组来传递。**但并不是强制性的代价是这样写出的脚本不能被Redis集群所兼容。
案例4给Redis类库方法动态传参
EVAL return redis.call(set,KEYS[1],ARGV[1]) 1 b 20 以上案例基本可以应付Redis分布式锁所需要的脚本知识了。
案例5pcall函数的使用了解
-- 当call()在执行命令的过程中发生错误时脚本会停止运行并返回一个脚本错误输出错误信息
EVAL return redis.call(sets, KEYS[1], ARGV[1]),redis.call(set, KEYS[2], ARGV[2]) 2 c d 20 30
- pcall函数不影响后续指令的执行
EVAL return redis.pcall(sets,KEYS[1],ARGV[1]),redis.pcall(set ,KEYS[2],ARGV[2]) 2 c d 20 30 注意set方法写成了sets肯定会报错。 使用lua保证删除原子性
删除lua脚本
if redis.call(get,KEYS[1]) ARGV[1] then return redis.call(del,KEYS[1]) else return 0 end 更新代码
package tech.msop.distributed.lock.service.impl;import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.baomidou.mybatisplus.core.conditions.update.UpdateWrapper;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import lombok.extern.slf4j.Slf4j;
import org.jetbrains.annotations.NotNull;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Scope;
import org.springframework.context.annotation.ScopedProxyMode;
import org.springframework.dao.DataAccessException;
import org.springframework.data.redis.core.RedisOperations;
import org.springframework.data.redis.core.SessionCallback;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.core.script.DefaultRedisScript;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.util.CollectionUtils;
import org.springframework.util.StringUtils;
import tech.msop.distributed.lock.constants.StockConstant;
import tech.msop.distributed.lock.entity.StockEntity;
import tech.msop.distributed.lock.mapper.StockMapper;
import tech.msop.distributed.lock.service.IStockService;import java.util.Arrays;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;/*** 库存服务实现类 br/*/
Service
Slf4j
public class StockServiceImpl extends ServiceImplStockMapper, StockEntityimplements IStockService {Autowiredprivate StringRedisTemplate redisTemplate;/*** 减库存*/Overridepublic void checkAndLock() {String uuid UUID.randomUUID().toString();// 加锁 setnxwhile (!redisTemplate.opsForValue().setIfAbsent(lock, uuid,3, TimeUnit.SECONDS)) {// 重试循环try {Thread.sleep(30);} catch (InterruptedException e) {throw new RuntimeException(e);}}try {// 1. 查询库存信息String stock redisTemplate.opsForValue().get(stock);// 2. 判断库存是否充足if (stock ! null stock.length() ! 0) {Integer st Integer.valueOf(stock);if (st 0) {// 3.更新到数据库redisTemplate.opsForValue().set(stock, String.valueOf(--st));}}} finally {String script if redis.call(get,KEYS[1]) ARGV[1] then return redis.call(del,KEYS[1]) else return 0 end;redisTemplate.execute(new DefaultRedisScript(script, Boolean.class), Arrays.asList(lock),uuid);}}
}
进行压力测试并查询库存余量0 可重入锁
以上加锁命令存在一个问题由于加锁命令使用了SETNX一旦键存在就无法再设置成功这就导致后续同一线程内继续加锁将会加锁失败。当一个线程执行一段代码成功获取锁之后继续执行又遇到加锁的子任务代码可重入性就保证线程能继续执行而不可重入就是需要等待锁释放之后再次获取锁成功才能继续往下执行。
用一段Java代码解释可重入
public synchronized void a(){b();
}
public synchronized void b(){// pass
} 假设X线程在a方法获取锁之后继续执行b方法如果此时不可重入线程就必须等待锁释放再次争抢锁。
锁明明是X线程拥有却还需要等待自己释放锁然后再去抢锁这看起来就很奇怪我释放我自己。
而可重入性就可以解决这个尴尬的问题当线程拥有锁之后往后再遇到加锁方法直接将加锁次数加1然后在执行方法逻辑。退出加锁方法之后加锁此处再减1当加锁次数为0时锁才被真正的释放。
可以看到可重入锁的最大特性就是计数计算加锁的次数。所以当可重入锁需要在分布式环境实现时我们也就需要统计加锁次数。
通过阅读JDK中的可重入锁源码可知可重入锁ReentrantLock的加锁流程ReentrantLock.lock) — NonfairSync.lock() — AQS.acquire(1) — NonfairSync.tryAcquire(1) — Sync.nonfairTryAcquire(1)
CAS获取锁如果没有线程占用锁 (state 0)加锁成功并记录当前线程为有锁线程(两次)如果state的值不为0说明锁已经占用则判断当前线程是否为有锁线程若是则重入state 1若否加锁失败入队等待
可重入锁的解锁流程Reentrant.unlock() — AQS.release(1) — SyncRelease(1)
判断当前线程是否为有锁线程不是则抛出异常对state的值减1之后判断state的值是否为0为0则解锁成功返回true如果减1后的值不为0返回false
确定解决方案Redishash
加锁脚本 参照ReentrantLock中的非公平可重入锁实现分布式可重入锁hash lua脚本 Redis提供了Hash哈希表这种可以存储键值对数据结构。所以我们可以使用Redis Hash存储锁的重入次数然后利用lua脚本判断逻辑。通过JDK的可重入锁分析继续分析Redis中的加锁流程
判断锁是否存在exists不存在则直接获取锁 hset key field value如果锁存在则判断是否是自己的锁(hexists)如果是自己的锁则重入hincrby key field increment否则重试递归/循环锁lua脚本
if (redis.call(exists, KEYS[1]) 0 or redis.call(hexists, KEYS[1], ARGV[1]) 1)
thenredis.call(hincrby, KEYS[1], ARGV[1], 1);redis.call(expire, KEYS[1], ARGV[2]);return 1;
elsereturn 0;
end-- key: lock
-- arg: uuid 30 假设值为KEYS[lock]ARGV[uuid,expire]
如果锁不存在或者这是自己的锁就通过hincrby不存在就新增并加1存在就加1获取锁或者锁此处加1
解锁脚本
分析Redis的解锁流程
判断自己的锁是否存在(hexists)不存在则返回nil如果自己的锁存在则减1hincrby - 1)判断减一后的值是否为0为0则返回1不为0返回0解锁lua脚本
-- 判断hash set 可重入key的值是否等于0
-- 如果为 nil 代表 自己的锁不存在在尝试解其他线程的锁解锁失败
-- 如果为 0 代表 可重入次数被减1
-- 如果为 1 代表 该可重入 key 解锁成功
if(redis.call(hexists, KEYS[1], ARGV[1]) 0) then return nil;
elseif(redis.call(hincrby, KEYS[1], ARGV[1], -1) 0) then return 0;
else redis.call(del, KEYS[1]); return 1;
end-- key: lock
-- arg: uuid代码实现
由于后续会有基于Zookeeper和基于MySQL实现的分布式锁我们可以通过工厂类获取不同类型的分布式锁。
DistributedLockClient工厂类具体实现
Component
public class DistributedLockClient {Autowiredprivate StringRedisTemplate redisTemplate;private String uuid;public DistributedLockClient() {this.uuid UUID.randomUUID().toString();}public DistributedRedisLock getRedisLock(String lockName){return new DistributedRedisLock(redisTemplate, lockName, uuid);}
} DistributedRedisLock实现如下
public class DistributedRedisLock implements Lock {private StringRedisTemplate redisTemplate;private String lockName;private String uuid;private long expire 30;public DistributedRedisLock(StringRedisTemplate redisTemplate, String lockName, String uuid) {this.redisTemplate redisTemplate;this.lockName lockName;this.uuid uuid;}Overridepublic void lock() {this.tryLock();}Overridepublic void lockInterruptibly() throws InterruptedException {}Overridepublic boolean tryLock() {try {return this.tryLock(-1L, TimeUnit.SECONDS);} catch (InterruptedException e) {e.printStackTrace();}return false;}/*** 加锁方法* param time* param unit* return* throws InterruptedException*/Overridepublic boolean tryLock(long time, TimeUnit unit) throws InterruptedException {if (time ! -1){this.expire unit.toSeconds(time);}String script if redis.call(exists, KEYS[1]) 0 or redis.call(hexists, KEYS[1], ARGV[1]) 1 then redis.call(hincrby, KEYS[1], ARGV[1], 1) redis.call(expire, KEYS[1], ARGV[2]) return 1 else return 0 end;while (!this.redisTemplate.execute(new DefaultRedisScript(script, Boolean.class), Arrays.asList(lockName), getId(), String.valueOf(expire))){Thread.sleep(50);}return true;}/*** 解锁方法*/Overridepublic void unlock() {String script if redis.call(hexists, KEYS[1], ARGV[1]) 0 then return nil elseif redis.call(hincrby, KEYS[1], ARGV[1], -1) 0 then return redis.call(del, KEYS[1]) else return 0 end;Long flag this.redisTemplate.execute(new DefaultRedisScript(script, Long.class), Arrays.asList(lockName), getId());if (flag null){throw new IllegalMonitorStateException(this lock doesnt belong to you!);}}Overridepublic Condition newCondition() {return null;}/*** 给线程拼接唯一标识* return*/String getId(){return uuid : Thread.currentThread().getId();}
}使用及测试
在业务代码中使用
package tech.msop.distributed.lock.service.impl;import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Service;
import tech.msop.distributed.lock.constants.StockConstant;
import tech.msop.distributed.lock.entity.StockEntity;
import tech.msop.distributed.lock.lock.DistributedLockClient;
import tech.msop.distributed.lock.lock.DistributedRedisLock;
import tech.msop.distributed.lock.mapper.StockMapper;
import tech.msop.distributed.lock.service.IStockService;/*** 库存服务实现类 br/*/
Service
Slf4j
public class StockServiceImpl extends ServiceImplStockMapper, StockEntityimplements IStockService {Autowiredprivate StringRedisTemplate redisTemplate;Autowiredprivate DistributedLockClient distributedLockClient;/*** 减库存*/Overridepublic void checkAndLock() {DistributedRedisLock redisLock this.distributedLockClient.getRedisLock(lock);redisLock.lock();try {// 1. 查询库存信息String stock redisTemplate.opsForValue().get(stock).toString();// 2. 判断库存是否充足if (stock ! null stock.length() ! 0) {Integer st Integer.valueOf(stock);if (st 0) {// 3.扣减库存redisTemplate.opsForValue().set(stock, String.valueOf(--st));}}} finally {redisLock.unlock();}}
}
使用Jmet测试并查询库存余量0 测试可重入性 自动续期
借助Timer定时器lua脚本实现自动续期
if (redis.call(hexists,KEYS[1],ARGV[1]) 1)
then return redis.call(expire,KEYS[1],ARGV[2])
else return 0
end-- key: lock
-- arg: uuid 30 修改Redis分布式锁实现锁自动续期
package tech.msop.distributed.lock.lock;import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.core.script.DefaultRedisScript;import java.util.Arrays;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;/*** 基于Redis实现分布式锁*/
public class DistributedRedisLock implements Lock {private final StringRedisTemplate redisTemplate;private final String lockName;private final String uuid;private long expire 30;public DistributedRedisLock(StringRedisTemplate redisTemplate, String lockName, String uuid) {this.redisTemplate redisTemplate;this.lockName lockName;this.uuid uuid : Thread.currentThread().getId();}Overridepublic void lock() {this.tryLock();}Overridepublic void lockInterruptibly() throws InterruptedException {}Overridepublic boolean tryLock() {try {return this.tryLock(-1L, TimeUnit.SECONDS);} catch (InterruptedException e) {e.printStackTrace();}return false;}/*** 加锁方法** param time 超时时间* param unit 时间单位* return 加锁是否成功* throws InterruptedException 异常*/Overridepublic boolean tryLock(long time, TimeUnit unit) throws InterruptedException {if (time ! -1) {this.expire unit.toSeconds(time);}String script if redis.call(exists, KEYS[1]) 0 or redis.call(hexists, KEYS[1], ARGV[1]) 1 then redis.call(hincrby, KEYS[1], ARGV[1], 1) redis.call(expire, KEYS[1], ARGV[2]) return 1 else return 0 end;while (!this.redisTemplate.execute(new DefaultRedisScript(script, Boolean.class), Arrays.asList(lockName), uuid, String.valueOf(expire))) {Thread.sleep(50);}// 在加锁成功返回之前开启定时器自动续期renewExpire();return true;}/*** 解锁方法*/Overridepublic void unlock() {String script if redis.call(hexists, KEYS[1], ARGV[1]) 0 then return nil elseif redis.call(hincrby, KEYS[1], ARGV[1], -1) 0 then return redis.call(del, KEYS[1]) else return 0 end;Long flag this.redisTemplate.execute(new DefaultRedisScript(script, Long.class), Arrays.asList(lockName), uuid);if (flag null) {throw new IllegalMonitorStateException(this lock doesnt belong to you!);}}Overridepublic Condition newCondition() {return null;}/*** 给线程拼接唯一标识** return 唯一标识*/
// String getId() {
// return uuid : Thread.currentThread().getId();
// }private void renewExpire(){String script if redis.call(hexists, KEYS[1], ARGV[1]) 1 then return redis.call(expire, KEYS[1], ARGV[2]) else return 0 end;new Timer().schedule(new TimerTask() {Overridepublic void run() {if (redisTemplate.execute(new DefaultRedisScript(script,Boolean.class),Arrays.asList(lockName),uuid,String.valueOf(expire))){renewExpire();}}},this.expire * 1000 /3);}}
在tryLock方法中使用: 构造方法作如下修改 解锁方法作如下修改 手写分布式锁小结
特征 独占排他使用 setnx 防止死锁设置锁的过期时间 如果Redis客户端程序从Redis服务中获取到锁突然宕机无法执行后续操作并释放锁则其他程序也无法获取到锁并导致服务阻塞解决设置锁的过期时间即使未手动释放锁也会在一定时间后自动过期释放不可重入需要可重入 原子性 获取锁和设置过期时间必须具有原子性set key value ex 3 nx判断和释放锁之间也需要原子性借助lua脚本实现 防误删解铃还须系铃人 先判断是否是自己的锁再删除 可重入性hash(key field value) lua脚本 自动续期Timer定时器 lua脚本 程序执行时间过长超过锁的过期时间为了防止锁机制失效需要判断程序是否执行完成未完成则需要续期锁的过期时间 在集群情况下导致锁机制失效 客户端程序C1从主服务器中获取锁从服务器还没来得及同步数据主服务器宕机于是从服务器升级为主服务器客户端程序C2就从新主服务器中获取到锁导致锁机制失效
锁操作
加锁
setnx独占排他、死锁、不可重入、原子性set k v ex 30 nx独占排他、死锁 不可重入hash lua脚本可重入锁 判断锁是否被占用(exists)如果没有被占用则直接获取锁(hset/hincrby)并设置过期时间(expire)如果锁被占用则判断是否当前线程占用的(hexists)如果是则重入(hincrby)并重置过期时间(expire)否则获取锁失败在代码中重试 Time定时器 lua脚本实现锁的自动续期 判断锁是否是自己的锁hexists1如果是自己的锁则执行expire重置过期时间
解锁
del可能导致误删先判断在删除同时保证原子性lua脚本hash lua脚本可重入 判断当前线程的锁是否存在不存在则返回nil将来在代码中排除异常存在则直接减1hincrby - 1判断减1后的值是否为0为0则释放锁(del)并返回1不为0则返回0
获取锁重试
递归获取锁循环获取锁
红锁算法
Redis集群状态下的问题
客户端A从master获取到锁在master将锁同步到slave之前master宕机了slave节点被晋级到master节点客户端B取得了同一个资源被客户端A已经获取到的另一个锁
安全失效
解决集群下锁失效参照redis官方网站针对redlock文档https://redis.io/topics/distlock
在算法的分布式版本中我们假设有N个Redis服务器。这些节点是完全独立的因此我们不使用复制或任何其他隐式协调系统。在前面已经描述了如何在单个实例中安全地获取和释放锁在分布式锁算法中将使用相同的方法在单个实例中获取和释放锁。 将N设置为5是一个合理的值因此需要在不同的计算机或虚拟机上运行5个Redis主从服务器确保它们以独立的方式发生故障。
为了获取锁客户端执行以下操作
1、客户端以毫秒为单位获取当前时间的时间戳作为起始时间。2、客户端尝试在所有N个实例中顺序使用相同的键名、相同的随机值来获取锁定。每个实例尝试获取锁都需要时间客户端应该设置一个远小于总锁定时间的超时时间。例如如果自动释放锁时间为10秒则尝试获取锁的超时时间可能在5到50毫秒之间。这样可以防止客户端长时间与处于故障状态的Redis节点进行通信如果某个示例不可用尽快尝试与下一个实例进行通信。3、客户端获取当前时间 减去 步骤1 中获取的起始时间来计算获取锁所花费的时间。当且仅当客户端能够在大多数实例至少3个中获取锁时并且获取锁所花费的总时间小于锁有效时间则认为已获取锁。4、如果获取了锁则将锁有效时间减去获取锁花费的时间如步骤3中所计算5、如果客户端由于某种原因无法锁定N / 2 1个实例或有效时间为负而未能获得该锁它将尝试解锁所有实例即使没有锁定成功的实例。
每台计算机都有一个本地时钟我们通常可以依靠不同的计算机产生很小的时钟漂移。只有在拥有锁的客户端将在锁的有效时间内如步骤3中获得的减去一段时间仅几毫秒的情况下终止工作才能保证这一点。以补偿进程之间的时钟漂移。
当客户端无法获取锁时它应该在随机延迟后重试以避免同时获取同一资源的多个客户端之间不同步这可能会导致脑裂的情况没人胜。同样客户端在大多数Redis实例中尝试获取锁的速度越快出现裂脑情况以及需要重试的窗口就越小因此在理想情况下客户端应尝试将SET命令发送到N个实例同时使用多路复用。
值得强调的是对于未能获得大多数锁的客户端尽快释放部分获得的锁有多么重要这样就不必等待锁定期满才能再次获得锁但是如果发生了网络分区并且客户端不再能够与Redis实例进行通信则在等待密钥到期时需要付出可用性损失。
Redisson中的分布式锁 Redisson是一个在Redis的基础上实现的Java驻内存数据网格In-Memory Data Grid。它不仅提供了一系列的分布式的Java常用对象还提供了许多分布式服务。其中包括BitSet、Set、Multimap、SortedSet、Map、List、Queue、BlockingQueue、Semaphore、Lock、AtomicLong、CountDownLatch、Publish/Subscribe、Bloom filter、Remote Service、Spring cache、Executor Service、Live Object Service、Scheduler Service。Redisson提供了使用Redis的最简单和最便捷的方法。Redisson的宗旨是促进使用者对Redis的关注分离(Separation of Concern)从而让使用者能够将精力更集中地放在处理业务逻辑上。 官方文档地址https://github.com/redisson/redisson/wiki
可重入锁Reentrant Lock
基于Redis的Redisson分布式可重入锁RLock Java对象实现了java.util.concurrent.locks.Lock接口。
众所周知如果负责存储这个分布式锁的Redisson节点宕机以后而且这个锁正好处于锁住的状态时这个锁会出现锁死的状态。为了避免这种情况的发生Redisson内部提供了一个监控锁的看门狗它的作用是在Redisson示例被关闭前不断的延长锁的有效期。默认情况下看门狗检查锁的超时时间是30秒钟也可以通过吸怪Config.lockWatchdogTimeout来另行指定。
RLock对象完全符合Java的Lock规范。也就是说只有锁的进程才能解锁其他进程则会抛出IllegalMonitorStateException错误。
另外Redisson还通过加锁的方法提供了leaseTime的参数来指定加锁的时间。超过这个时间锁便自动解开了。
RLock lock redisson.getLock(anyLock);
// 最常见的使用方法
lock.lock();// 加锁以后10秒钟自动解锁
// 无需调用unlock方法手动解锁
lock.lock(10, TimeUnit.SECONDS);// 尝试加锁最多等待100秒上锁以后10秒自动解锁
boolean res lock.tryLock(100, 10, TimeUnit.SECONDS);
if (res) {try {...} finally {lock.unlock();}
}1、引入Redisson依赖
!-- Redisson --
dependencygroupIdorg.redisson/groupIdartifactIdredisson/artifactIdversion3.21.3/version
/dependency2、添加配置
package tech.msop.distributed.lock.config;import org.redisson.Redisson;
import org.redisson.api.RedissonClient;
import org.redisson.config.Config;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;/*** Redisson 配置*/
Configuration
public class RedissonConfig {/*** Redisson 客户端配置** return Redisson客户端*/Beanpublic RedissonClient redissonClient() {// 初始化配置对象Config config new Config();// 单机Redis服务config.useSingleServer().setAddress(redis://127.0.0.1:6379) // redis服务地址必须 redis://ip:port
// .setDatabase(0) // 指定Redis数据库编号
// .setUsername() // redis 用户名
// .setPassword()// redis 密码
// .setConnectionMinimumIdleSize(10)// 连接池最小空闲连接数
// .setConnectionPoolSize(50) // 连接池最大线程数
// .setIdleConnectionTimeout(60000) // 线程超时时间
// .setConnectTimeout() // 客户端获取redis 链接的超时时间
// .setTimeout()// 响应超时时间;return Redisson.create(config);}
}
3、代码中使用
package tech.msop.distributed.lock.service.impl;import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import lombok.extern.slf4j.Slf4j;
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Service;
import tech.msop.distributed.lock.constants.StockConstant;
import tech.msop.distributed.lock.entity.StockEntity;
import tech.msop.distributed.lock.lock.DistributedLockClient;
import tech.msop.distributed.lock.lock.DistributedRedisLock;
import tech.msop.distributed.lock.mapper.StockMapper;
import tech.msop.distributed.lock.service.IStockService;/*** 库存服务实现类 br/*/
Service
Slf4j
public class StockServiceImpl extends ServiceImplStockMapper, StockEntityimplements IStockService {Autowiredprivate StringRedisTemplate redisTemplate;Autowiredprivate DistributedLockClient distributedLockClient;Autowiredprivate RedissonClient redissonClient;/*** 减库存*/Overridepublic void checkAndLock() {RLock lock redissonClient.getLock(lock);lock.lock();try {// 1. 查询库存信息String stock redisTemplate.opsForValue().get(stock).toString();// 2. 判断库存是否充足if (stock ! null stock.length() ! 0) {Integer st Integer.valueOf(stock);if (st 0) {// 3.扣减库存redisTemplate.opsForValue().set(stock, String.valueOf(--st));}}} finally {lock.unlock();}}
}
4、使用jmeter进行压力测试并获取库存余量0
公平锁Fair Lock
基于Redis的Redisson分布式可重入公平锁也是实现了java.util.concurrent.locks.Lock接口的一种RLock对象。同时还提供了异步(Async)、反射式(Reactive)和RxJava2标准的接口。它保证了当多个Redisson客户端线程同时请求加锁时优先分配给先发出请求的线程。所有请求线程会在一个队列中排队当某个线程出现宕机时Redisson会在等待5秒后继续下一个线程也就是说如果前面5个线程都处于等待状态那么后面的线程会等待至少25秒。
RLock fairLock redissonClient.getFairLock(anyLock);
// 最常见的使用方法
fairLock.lock();// 10秒钟以后自动解锁
// 无需调用unlock方法手动解锁
fairLock.lock(10, TimeUnit.SECONDS);// 尝试加锁最多等待100秒上锁以后10秒自动解锁
boolean res fairLock.tryLock(100, 10, TimeUnit.SECONDS);
fairLock.unlock();联锁MultiLock(了解)
基于Redis的Redisson分布式联锁RedissonMultiLock对象可以将多个RLock对象关联为一个联锁每个RLock对象实例可以来自于不同的Redisson实例。
RLock lock1 redissonInstance1.getLock(lock1);
RLock lock2 redissonInstance2.getLock(lock2);
RLock lock3 redissonInstance3.getLock(lock3);RedissonMultiLock lock new RedissonMultiLock(lock1, lock2, lock3);
// 同时加锁lock1 lock2 lock3
// 所有的锁都上锁成功才算成功。
lock.lock();
...
lock.unlock();红锁RedLock(了解)
基于Redis的Redisson红锁RedissonRedLock对象实现了Redlock介绍的加锁算法。该对象也可以用来将多个RLock对象关联为一个红锁每个RLock对象实例可以来自于不同的Redisson实例。
RLock lock1 redissonInstance1.getLock(lock1);
RLock lock2 redissonInstance2.getLock(lock2);
RLock lock3 redissonInstance3.getLock(lock3);RedissonRedLock lock new RedissonRedLock(lock1, lock2, lock3);
// 同时加锁lock1 lock2 lock3
// 红锁在大部分节点上加锁成功就算成功。
lock.lock();
...
lock.unlock();读写锁ReadWriteLock
基于Redis的Redisson分布式可重入读写锁RReadWriteLock Java对象实现了java.util.concurrent.locks.ReadWriteLock接口。其中读锁和写锁都继承了RLock接口。
分布式可重入读写锁允许同时有多个读锁和一个写锁处于加锁状态。
RReadWriteLock rwlock redisson.getReadWriteLock(anyRWLock);
// 最常见的使用方法
rwlock.readLock().lock();
// 或
rwlock.writeLock().lock();// 10秒钟以后自动解锁
// 无需调用unlock方法手动解锁
rwlock.readLock().lock(10, TimeUnit.SECONDS);
// 或
rwlock.writeLock().lock(10, TimeUnit.SECONDS);// 尝试加锁最多等待100秒上锁以后10秒自动解锁
boolean res rwlock.readLock().tryLock(100, 10, TimeUnit.SECONDS);
// 或
boolean res rwlock.writeLock().tryLock(100, 10, TimeUnit.SECONDS);
...
lock.unlock(); 添加StockController方法
GetMapping(test/read)
public String testRead(){String msg stockService.testRead();return 测试读;
}GetMapping(test/write)
public String testWrite(){String msg stockService.testWrite();return 测试写;
} 添加StockService方法
public String testRead() {RReadWriteLock rwLock this.redissonClient.getReadWriteLock(rwLock);rwLock.readLock().lock(10, TimeUnit.SECONDS);System.out.println(测试读锁。。。。);// rwLock.readLock().unlock();return null;
}public String testWrite() {RReadWriteLock rwLock this.redissonClient.getReadWriteLock(rwLock);rwLock.writeLock().lock(10, TimeUnit.SECONDS);System.out.println(测试写锁。。。。);// rwLock.writeLock().unlock();return null;
}打开开两个浏览器窗口测试
同时访问写一个写完之后等待一会儿约10s另一个写开始同时访问读不用等待先写后读读要等待约10s写完成先读后写写要等待约10s读完成
信号量Semaphore
基于Redis的Redisson的分布式信号量SemaphoreJava对象RSemaphore采用了与java.util.concurrent.Semaphore相似的接口和用法。同时还提供了异步Async、反射式Reactive和RxJava2标准的接口。
RSemaphore semaphore redisson.getSemaphore(semaphore);
semaphore.trySetPermits(3);
semaphore.acquire();
semaphore.release();在StockController添加方法
GetMapping(test/semaphore)
public String testSemaphore(){this.stockService.testSemaphore();return 测试信号量;
}在StockService添加方法
public void testSemaphore() {RSemaphore semaphore this.redissonClient.getSemaphore(semaphore);semaphore.trySetPermits(3);try {semaphore.acquire();TimeUnit.SECONDS.sleep(5);System.out.println(System.currentTimeMillis());semaphore.release();} catch (InterruptedException e) {e.printStackTrace();}
} 添加测试用例并发10次循环一次 控制台效果
控制台1
1606960790234
1606960800337
1606960800443
1606960805248控制台2
1606960790328
1606960795332
1606960800245控制台3
1606960790433
1606960795238
1606960795437由此可知
1606960790秒有3次请求进来每个控制台各1次
1606960795秒有3次请求进来控制台2有1次控制台3有2次
1606960800秒有3次请求进来控制台1有2次控制台2有1次
1606960805秒有1次请求进来控制台1有1次
闭锁CountDownLatch
基于Redisson的Redisson分布式闭锁CountDownLatchJava对象RCountDownLatch采用了与java.util.concurrent.CountDownLatch相似的接口和用法。
RCountDownLatch latch redisson.getCountDownLatch(anyCountDownLatch);
latch.trySetCount(1);
latch.await();// 在其他线程或其他JVM里
RCountDownLatch latch redisson.getCountDownLatch(anyCountDownLatch);
latch.countDown(); 需要两个方法一个等待一个计数countDown
给StockController添加测试方法
GetMapping(test/latch)
public String testLatch(){stockService.testLatch();return 班长锁门。。。;
}GetMapping(test/countdown)
public String testCountDown(){stockService.testCountDown();return 出来了一位同学;
} 给StockService添加测试方法
public void testLatch() {RCountDownLatch cdl this.redissonClient.getCountDownLatch(cdl);cdl.trySetCount(6);try {cdl.await();} catch (InterruptedException e) {e.printStackTrace();}
}public void testCountDown() {RCountDownLatch cdl this.redissonClient.getCountDownLatch(cdl);cdl.countDown();
} 重启测试打开两个页面当第二个请求执行6次之后第一个请求才会执行。
相关文章: