做响应式网站所用的代码,下载浙江平安建设信息系统网站,营销型电子商务网站特点,怎么仿别人的网站文章目录 技术背景介绍代码实现数据库表结构尝试获取锁续约阻塞式获取锁解锁检查锁是否过期或者释放 使用示例优化方案 项目代码
技术背景介绍
一般分布式锁使用最方便的就是使用redis实现#xff0c;因为他自带超时过期机制、发布订阅模式、高吞吐高性能的优势#xff0c;… 文章目录 技术背景介绍代码实现数据库表结构尝试获取锁续约阻塞式获取锁解锁检查锁是否过期或者释放 使用示例优化方案 项目代码
技术背景介绍
一般分布式锁使用最方便的就是使用redis实现因为他自带超时过期机制、发布订阅模式、高吞吐高性能的优势但是有些项目里只有mysql数据库很多数据库都是没有数据超时过期机制和发布订阅模式的当然也不是所有的这里我只针对mysql数据库作为基础组件。
代码实现
数据库表结构
DROP TABLE IF EXISTS distributed_lock;
CREATE TABLE distributed_lock (id int(11) NOT NULL AUTO_INCREMENT COMMENT id,lock_name varchar(255) NOT NULL COMMENT 锁名,machine_id varchar(255) DEFAULT NULL COMMENT 服务器id,expire_time datetime DEFAULT NULL COMMENT 过期时间服务里会有一个看门狗续期如果过期了就说明服务挂了解锁会设置为空,is_locked tinyint(4) NOT NULL DEFAULT 0 COMMENT 当前是否锁定状态,state int(11) NOT NULL DEFAULT 0 COMMENT 锁标记位 类似次数,thread_id varchar(255) DEFAULT NULL COMMENT 当前获得锁的线程id,gmt_create datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 创建时间,gmt_modified datetime DEFAULT NULL ON UPDATE CURRENT_TIMESTAMP COMMENT 修改时间,is_deleted tinyint(4) NOT NULL DEFAULT 0 COMMENT 是否删除,PRIMARY KEY (id) USING BTREE,UNIQUE KEY idx_lock_name (lock_name) USING BTREE
) ENGINEInnoDB DEFAULT CHARSETutf8mb4;尝试获取锁
使用乐观锁模式更新锁记录。如果获取失败则加入订阅列表中等待被唤醒或者到达超时时间自动唤醒待获取到锁后再从订阅列表中移除。他的具体等待时间取决于用户输入的等待时间和锁超时过期的时间这里使用JUC的Semaphore来实现等待功能。
public boolean tryLock(String lockName, Long waitTime, Long leaseTime, TimeUnit timeUnit) {long startTime System.currentTimeMillis();String threadId getCurrentThreadId();Long ttl tryAcquire(lockName, leaseTime, timeUnit);// lock acquiredif (ttl null) {return true;}long time timeUnit.toMillis(waitTime);if (waitTime ! -1 System.currentTimeMillis() - startTime time) {//没有获取到锁也没到等待时长执行订阅释放锁的任务LockEntry lockEntry subscribe(lockName, threadId, () - {});try {while (true) {ttl tryAcquire(lockName, leaseTime, timeUnit);// lock acquiredif (ttl null) {return true;}long remainTtl time - System.currentTimeMillis() startTime;if (remainTtl 0) {return false;}// waiting for messagelockEntry.getLatch().tryAcquire(ttl 0 ttl remainTtl ? ttl : remainTtl, TimeUnit.MILLISECONDS);}} catch (InterruptedException e) {log.error(thread interrupted, e);throw new RuntimeException(e);} finally {unsubscribe(lockEntry, lockName);}} else {return false;}}private Long tryAcquire(String lockName, long leaseTime, TimeUnit unit) {String currentThreadId getCurrentThreadId();//设定了自动释放锁的时间if (leaseTime ! -1) {return tryLockInner(leaseTime, unit, lockName, currentThreadId);}//没有设置自动过期时间就需要在获取到之后使用看门狗续期Long remainTtl tryLockInner(internalLockLeaseTime, TimeUnit.MILLISECONDS, lockName, currentThreadId);// lock acquiredif (remainTtl null) {scheduleExpirationRenewal(lockName, currentThreadId);}return remainTtl;}/*** 加锁成功返回null否则返回锁的过期时间** param leaseTime* param unit* param lockName* param threadId* return*/private Long tryLockInner(long leaseTime, TimeUnit unit, String lockName, String threadId) {long internalLockLeaseTime unit.toMillis(leaseTime);//查询是否存在锁LockObject existLock lockRepository.queryLock(lockName);LockObject lockObject new LockObject();lockObject.setLockName(lockName);lockObject.setThreadId(threadId);lockObject.setMachineId(machineId);lockObject.setIsLocked(true);lockObject.setExpireTime(new Date(System.currentTimeMillis() internalLockLeaseTime));if (existLock null) {//保存锁lockObject.setState(1);try {lockRepository.save(lockObject);} catch (Exception e) {//抛出数据重复异常说明被其他线程锁定了//返回需要等待的时间log.error(lock other thread occupy, e);return reCheckTtl(leaseTime, unit, lockName, threadId);}} else {//存在的锁会判断是否是当前线程的如果是也允许加锁成功支持可重入//如果正好其他锁释放了那也会抢锁具体是否公平由各数据库的内部锁决定int updateNum lockRepository.reentrantLock(lockObject);if (updateNum 0) {//返回需要等待的时间return reCheckTtl(leaseTime, unit, lockName, threadId);}}//加锁成功return null;}private Long reCheckTtl(long leaseTime, TimeUnit unit, String lockName, String threadId) {Long ttl queryLockTtl(lockName);if (ttl null) {//如果返回null那就是获取锁的时候失败了但是执行查询锁的过期时间的时候释放了//就需要重新执行上锁逻辑return tryLockInner(leaseTime, unit, lockName, threadId);} else {return ttl;}}/*** 获取锁的释放时间单位毫秒* 如果锁不存在 或者 未上锁 或者 已过期 则返回null** param lockName* return*/private Long queryLockTtl(String lockName) {LockObject lockObject lockRepository.queryLock(lockName);if (lockObject ! null lockObject.getExpireTime() ! null) {long intervalTime lockObject.getExpireTime().getTime() - System.currentTimeMillis();if (intervalTime 0) {return intervalTime;}}return null;}update idupdateReentrantLockupdate distributed_locksetis_locked true,machine_id #{machineId,jdbcTypeVARCHAR},thread_id #{threadId,jdbcTypeVARCHAR},state if(expire_time lt; NOW(), 1, state 1),expire_time #{expireTime,jdbcTypeTIMESTAMP}/setwhere is_deleted 0and lock_name #{lockName,jdbcTypeVARCHAR}and (expire_time lt; NOW()or is_locked falseor (machine_id #{machineId,jdbcTypeVARCHAR}and thread_id #{threadId,jdbcTypeVARCHAR}))/update续约
如果锁没有设置过期时间那么就需要设置自动续期使用过期和续期的目的也是为了防止服务宕机导致锁无法释放的问题。如果续期失败说明锁已经释放了那么会自动停止锁的续约任务。
private void scheduleExpirationRenewal(String lockName, String threadId) {ExpirationEntry entry new ExpirationEntry(lockName, threadId);ExpirationEntry oldEntry expirationRenewalMap.putIfAbsent(expirationRenewalKey(lockName, threadId), entry);if (oldEntry ! null) {oldEntry.addCount();} else {//只对第一次获取锁的线程续约后面的属于重入renewExpiration(lockName, threadId);}}private void renewExpiration(String lockName, String threadId) {String keyName expirationRenewalKey(lockName, threadId);ExpirationEntry ee expirationRenewalMap.get(keyName);if (ee null) {return;}//获取到锁后过1/3时间开启续约任务scheduledExecutor.schedule(() - {ExpirationEntry ent expirationRenewalMap.get(keyName);if (ent null) {return;}boolean renewResult renewExpirationLock(lockName, ent.getThreadId());if (!renewResult) {//更新失败说明锁被释放了log.error(Cant update lock lockName expiration);expirationRenewalMap.remove(keyName);return;}// reschedule itselfrenewExpiration(lockName, threadId);}, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);}private void cancelExpirationRenewal(String lockName, String threadId) {String keyName expirationRenewalKey(lockName, threadId);ExpirationEntry task expirationRenewalMap.get(keyName);if (task null) {return;}Integer count task.reduceCount();if (count 0) {expirationRenewalMap.remove(keyName);}}private String expirationRenewalKey(String lockName, String threadId) {return lockName _ threadId;}/*** 续期** param lockName* param threadId*/private boolean renewExpirationLock(String lockName, String threadId) {LockObject lockObject new LockObject();lockObject.setLockName(lockName);lockObject.setThreadId(threadId);lockObject.setMachineId(machineId);lockObject.setExpireTime(new Date(System.currentTimeMillis() internalLockLeaseTime));int updateNum lockRepository.renewExpirationLock(lockObject);return updateNum ! 0;}update idupdateRenewExpirationLockupdate distributed_lockset expire_time #{expireTime,jdbcTypeTIMESTAMP}where is_deleted 0and is_locked trueand lock_name #{lockName,jdbcTypeVARCHAR}and machine_id #{machineId,jdbcTypeVARCHAR}and thread_id #{threadId,jdbcTypeVARCHAR}and expire_time gt; NOW()/update阻塞式获取锁
阻塞式获取锁和非阻塞的区别就是等待锁释放的过程没有获取到锁的线程会一直等待下去。
public void lock(String lockName, long leaseTime, TimeUnit unit) {LockEntry lockEntry null;try {while (true) {// 尝试获取锁Long ttl tryAcquire(lockName, leaseTime, unit);if (ttl null) {// 成功获取到锁直接退出break;}// 未获取到锁订阅锁释放通知如果还没订阅if (lockEntry null) {lockEntry subscribe(lockName, getCurrentThreadId(), () - {});}// 等待锁释放通知直到TTL时间结束try {lockEntry.getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);} catch (InterruptedException e) {// 恢复线程的中断状态Thread.currentThread().interrupt();throw new RuntimeException(Thread was interrupted while waiting for the lock, e);}}} finally {// 确保在退出时释放锁并取消订阅if (lockEntry ! null) {unsubscribe(lockEntry, getCurrentThreadId());}}}解锁
获取锁的线程释放锁的时候state会减1直到减到0锁才会真正的释放。这里需要移除锁续约的任务并且唤醒等待当前锁的线程
public void unlock(String lockName) {if (releaseLock(lockName)) {//释放锁成功后去除看门狗的续期//如果解锁失败比如自己获取到锁过期了然后又去释放锁因为他没有续约任务所以不需要移除cancelExpirationRenewal(lockName, getCurrentThreadId());//发送锁释放的通知// 这里只处理本机维护的等待锁的线程其他的机器数据库没法主动发出通知需要轮训或者由获取锁的线程下次获取锁时自行处理LockEntry lockEntry subscribeMap.get(lockName);//要判空因为如果没有阻塞中的线程那么lockEntry会为空if (lockEntry ! null) {Semaphore semaphore lockEntry.getLatch();if (semaphore.hasQueuedThreads()) {semaphore.release();}}}}update idupdateReleaseLockupdate distributed_locksetstate state - 1,expire_time if(state0, null, expire_time),is_locked if(state0, false, true),machine_id if(state0, null, machine_id),thread_id if(state0, null, thread_id),/setwhere is_deleted 0and lock_name #{lockName,jdbcTypeVARCHAR}and machine_id #{machineId,jdbcTypeVARCHAR}and thread_id #{threadId,jdbcTypeVARCHAR}and expire_time gt; NOW()and is_locked true/update检查锁是否过期或者释放
因为mysql数据库没有发布订阅的功能所以这里采用了定时查询的模式检查锁的状态。如果检测到锁释放了会发起唤醒等待锁线程的通知让等待的线程重新尝试获取锁。
public void process() {scheduledExecutor.scheduleAtFixedRate(() - {//执行本机订阅这把锁的检查任务ListString needCheckLockNameList subscribeMap.entrySet().stream().filter(entry - entry.getValue().getCounter().get() ! 0).map(entry - entry.getKey()).collect(Collectors.toList());//查询已经过期或者释放的锁ListString lockNameList lockRepository.queryAllowObtainLockList(needCheckLockNameList);//执行对应锁的唤醒操作lockNameList.forEach(lockName - {LockEntry lockEntry subscribeMap.get(lockName);if (lockEntry ! null) {//这里最多多唤醒一次无非就是让等待线程多抢占一次没什么关系这种场景发生在tryAcquire正好过期定时任务正好运行//多一次判断可以大幅度减少冲突时多释放的信号Semaphore semaphore lockEntry.getLatch();if (semaphore.hasQueuedThreads()) {semaphore.release();log.info(定时任务发起唤醒等待锁的通知);}}});}, 0, 1, TimeUnit.SECONDS);}select idqueryAllowObtainLockList resultTypejava.lang.Stringselect lock_namefrom distributed_lockwhere is_deleted 0and lock_name inforeach collectionlist itemlockName open( close) separator,#{lockName,jdbcTypeVARCHAR}/foreachand (is_locked falseor expire_time lt; NOW())/select使用示例
public static void main(String[] args) {// 第一个Spring容器加载配置类 Config1ApplicationContext context1 new AnnotationConfigApplicationContext(MybatisPlusConfig.class);// 第二个Spring容器加载配置类 Config2ApplicationContext context2 new AnnotationConfigApplicationContext(MybatisPlusConfig.class);DatabaseDistributedLock server1 context1.getBean(DatabaseDistributedLock.class);DatabaseDistributedLock server2 context2.getBean(DatabaseDistributedLock.class);server1.lock(test);new Thread(() - {ThreadUtil.sleep(1, TimeUnit.SECONDS);if (server2.tryLock(test, 17L, TimeUnit.SECONDS)) {System.out.println(我执行了1);ThreadUtil.sleep(5, TimeUnit.SECONDS);server2.unlock(test);}}).start();new Thread(() - {ThreadUtil.sleep(2, TimeUnit.SECONDS);if (server1.tryLock(test, 17L, TimeUnit.SECONDS)) {System.out.println(我执行了2);ThreadUtil.sleep(5, TimeUnit.SECONDS);server1.unlock(test);}}).start();System.out.println(我获取到了锁);ThreadUtil.sleep(15, TimeUnit.SECONDS);server1.unlock(test);ThreadUtil.sleep(100, TimeUnit.SECONDS);}优化方案
订阅通知如果有消息队列的话可以借助用来实现发布订阅锁通知