自己做的旅游网站简介,自己做的网站不备案不能访问吗,摄影图片素材网站,自动化项目外包平台本篇我们使用mysql实现一个分布式锁。 环境#xff1a;mysql8,navicat,maven,springboot2.3.11,mybatis-plus
分布式锁的功能
1,分布式锁使用者位于不同的机器中#xff0c;锁获取成功之后#xff0c;才可以对共享资源进行操作
2,锁具有重入的功能#xff1a;即一个使用…本篇我们使用mysql实现一个分布式锁。 环境mysql8,navicat,maven,springboot2.3.11,mybatis-plus
分布式锁的功能
1,分布式锁使用者位于不同的机器中锁获取成功之后才可以对共享资源进行操作
2,锁具有重入的功能即一个使用者可以多次获取某个锁
3,获取锁有超时的功能即在指定的时间内去尝试获取锁超过了超时时间如果还未获取成功则返回获取失败
4,能够自动容错比如A机器获取锁lock1之后在释放锁lock1之前A机器挂了导致锁lock1未释放结果会lock1一直被A机器占有着遇到这种情况时分布式锁要能够自动解决可以这么做持有锁的时候可以加个持有超时时间超过了这个时间还未释放的其他机器将有机会获取锁
预备技能乐观锁
通常我们修改表中一条数据过程如下
t1select获取记录R1
t2对R1进行编辑
t3update R1我们来看一下上面的过程存在的问题
如果A、B两个线程同时执行到t1他们俩看到的R1的数据一样然后都对R1进行编辑然后去执行t3最终2个线程都会更新成功后面一个线程会把前面一个线程update的结果给覆盖掉这就是并发修改数据存在的问题。
我们可以在表中新增一个版本号每次更新数据时候将版本号作为条件并且每次更新时候版本号1过程优化一下如下
t1打开事务start transaction
t2select获取记录R1,声明变量vR1.version
t3对R1进行编辑
t4执行更新操作update R1 set version version 1 where user_id#user_id# and version #v#;
t5t4中的update会返回影响的行数我们将其记录在count中然后根据count来判断提交还是回滚if(count1){//提交事务commit;}else{//回滚事务rollback;}上面重点在于步骤t4当多个线程同时执行到t1他们看到的R1是一样的但是当他们执行到t4的时候数据库会对update的这行记录加锁确保并发情况下排队执行所以只有第一个的update会返回1其他的update结果会返回0然后后面会判断count是否为1进而对事务进行提交或者回滚。可以通过count的值知道修改数据是否成功了。
上面这种方式就乐观锁。我们可以通过乐观锁的方式确保数据并发修改过程中的正确性。
使用mysql实现分布式锁 我们创建一个分布式锁表如下 DROP TABLE IF EXISTS t_lock;
create table t_lock(lock_key varchar(32) PRIMARY KEY NOT NULL COMMENT 锁唯一标志,request_id varchar(64) NOT NULL DEFAULT COMMENT 用来标识请求对象的,lock_count INT NOT NULL DEFAULT 0 COMMENT 当前上锁次数,timeout BIGINT NOT NULL DEFAULT 0 COMMENT 锁超时时间,version INT NOT NULL DEFAULT 0 COMMENT 版本号每次更新1
)COMMENT 锁信息表;java代码如下 mapper接口
package com.shiguiwu.springmybatis.mapper;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.shiguiwu.springmybatis.lock.model.LockModel;
import org.springframework.stereotype.Repository;/*** description: 锁mapper* author: stone* date: Created by 2021/5/30 11:12* version: 1.0.0* pakeage: com.shiguiwu.springmybatis.mapper*/
Repository
public interface LockMapper extends BaseMapperLockModel {}锁对象model
package com.shiguiwu.springmybatis.lock.model;import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import com.baomidou.mybatisplus.annotation.Version;
import lombok.Data;/*** description: 锁模型* author: stone* date: Created by 2021/9/10 11:13* version: 1.0.0* pakeage: com.shiguiwu.springmybatis.lock.model*/
Data
TableName(t_lock)
public class LockModel {/*** 锁的唯一值*/TableIdprivate String lockKey;/*** 请求id,同一个线程里请求id一样*/private String requestId;//锁次数private Integer lockCount;//锁超时private Long timeout;//乐观锁版本Versionprivate Integer version;
}锁接口
package com.shiguiwu.springmybatis.lock;/*** description: 锁接口* author: stone* date: Created by 2021/9/10 11:40* version: 1.0.0* pakeage: com.shiguiwu.springmybatis.lock*/
public interface ILockT {/*** 获取分布式锁,支持重入* param lockKey 锁可以* param lockTimeout 持有锁的有效时间防止死锁* param getTimeout 获取锁超时时间* return 是否锁成功*/public boolean lock(String lockKey, long lockTimeout, int getTimeout) throws Exception;/*** 解锁* param lockKey 锁key**/public void unlock(String lockKey);/*** 重置锁对象* param t 锁对象* return 返回锁记录*/public int restLock(T t);}锁的实现代码如下
package com.shiguiwu.springmybatis.lock;import cn.hutool.core.util.StrUtil;
import com.shiguiwu.springmybatis.lock.model.LockModel;
import com.shiguiwu.springmybatis.mapper.LockMapper;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.TimeUnit;/*** description: mysql实现分布式锁* author: stone* date: Created by 2021/9/10 11:09* version: 1.0.0* pakeage: com.shiguiwu.springmybatis.lock*/
Component
Slf4j
public class MysqlLock implements ILockLockModel{static ThreadLocalString requestIds new ThreadLocal();Autowiredprivate LockMapper lockMapper;public String getRequestId() {String requestId requestIds.get();if (StrUtil.isBlank(requestId)) {requestId UUID.randomUUID().toString();requestIds.set(requestId);}log.info(获取到的requestId {}, requestId);return requestId;}/*** 获取锁* param lockKey 锁可以* param lockTimeout 持有锁的有效时间防止死锁* param getTimeout 获取锁超时时间* return*/Overridepublic boolean lock(String lockKey, long lockTimeout, int getTimeout) throws Exception {log.info( lock start {},lockKey);//从local中获取 请求idString requestId this.getRequestId();//获取锁的结果boolean lockResult false;//开始时间long startTime System.currentTimeMillis();while (true) {LockModel lockModel lockMapper.selectById(lockKey);if (Objects.nonNull(lockModel)) {//获取锁对象的请求idString reqId lockModel.getRequestId();//如果是空表示改锁未被占有if (StrUtil.isBlank(reqId)) {//马上占有它//设置请求idlockModel.setRequestId(requestId);//设置锁次数lockModel.setLockCount(1);//设置超时时间防止死锁lockModel.setTimeout(System.currentTimeMillis() lockTimeout);if (lockMapper.updateById(lockModel) 1) {lockResult true;break;}}//如果request_id和表中request_id一样表示锁被当前线程持有者此时需要加重入锁else if (requestId.equals(reqId)) {//可重入锁lockModel.setTimeout(System.currentTimeMillis() lockTimeout);//设置获取初次lockModel.setLockCount(lockModel.getLockCount() 1);if (lockMapper.updateById(lockModel) 1) {lockResult true;break;}}//不为空也不相等说明是其他线程占有else {//锁不是自己的并且已经超时了则重置锁继续重试if (lockModel.getTimeout() System.currentTimeMillis()) {//未超时继续重试this.restLock(lockModel);}//如果未超时休眠100毫秒继续重试else {if (startTime getTimeout System.currentTimeMillis()) {TimeUnit.MILLISECONDS.sleep(100);}else {//防止长时间阻塞break;}}}}//如果是空就插入一个锁,重新尝试获取锁else {lockModel new LockModel();//设置锁keylockModel.setLockKey(lockKey);lockMapper.insert(lockModel);}}log.info( lock end {},lockKey);return lockResult;}/*** 释放锁* param lockKey 锁key*/Overridepublic void unlock(String lockKey) {LockModel lockModel lockMapper.selectById(lockKey);//获取当前线程的请求idString reqId this.getRequestId();//获取锁次数int count 0;//当前线程requestId和库中request_id一致 lock_count0表示可以释放锁if (Objects.nonNull(lockModel) reqId.equals(lockModel.getRequestId()) (count lockModel.getLockCount()) 0) {if (count 1) {//重置锁this.restLock(lockModel);}//重入锁的问题,锁的次数减一else {lockModel.setLockCount(lockModel.getLockCount() - 1);//更新次数lockMapper.updateById(lockModel);}}}/*** 重置锁* param lockModel 锁对象* return 更新条数*/Overridepublic int restLock(LockModel lockModel) {lockModel.setLockCount(0);lockModel.setRequestId();lockModel.setTimeout(0L);return lockMapper.updateById(lockModel);}}上面代码中实现了文章开头列的分布式锁的所有功能大家可以认真研究下获取锁的方法lock释放锁的方法unlock。 测试用例
package com.shiguiwu.springmybatis;import com.shiguiwu.springmybatis.lock.ILock;
import com.shiguiwu.springmybatis.lock.model.LockModel;
import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;/*** description: 锁测试* author: stone* date: Created by 2021/9/10 15:32* version: 1.0.0* pakeage: com.shiguiwu.springmybatis*/
SpringBootTest
Slf4j
public class LockApplicationTests {Autowiredprivate ILockLockModel mysqlLock;测试重复获取和重复释放Testpublic void testRepeat() throws Exception {for (int i 0; i 10; i) {mysqlLock.lock(key1, 10000L, 1000);}for (int i 0; i 10; i) {mysqlLock.unlock(key1);}}// //获取之后不释放超时之后被thread1获取Testpublic void testTimeout() throws Exception {String lockKey key2;mysqlLock.lock(lockKey, 5000L, 1000);Thread thread1 new Thread(() - {try {mysqlLock.lock(lockKey, 5000L, 7000);} catch (Exception e) {e.printStackTrace();} finally {mysqlLock.unlock(lockKey);}}, thread1);thread1.start();thread1.join();}}test1方法测试了重入锁的效果。 test2测试了主线程获取锁之后一直未释放持有锁超时之后被thread1获取到了 留给大家一个问题
上面分布式锁还需要考虑一个问题比如A机会获取了key1的锁并设置持有锁的超时时间为10秒但是获取锁之后执行了一段业务操作业务操作耗时超过10秒了此时机器B去获取锁时可以获取成功的此时会导致A、B两个机器都获取锁成功了都在执行业务操作这种情况应该怎么处理大家可以思考一下然后留言我们一起讨论一下。