做网站怎么跟别人讲价,物流商 网站建设方案,望京做网站公司,郑州做网站大量网站被关#x1f44f;作者简介#xff1a;大家好#xff0c;我是爱吃芝士的土豆倪#xff0c;24届校招生Java选手#xff0c;很高兴认识大家#x1f4d5;系列专栏#xff1a;Spring源码、JUC源码#x1f525;如果感觉博主的文章还不错的话#xff0c;请#x1f44d;三连支持作者简介大家好我是爱吃芝士的土豆倪24届校招生Java选手很高兴认识大家系列专栏Spring源码、JUC源码如果感觉博主的文章还不错的话请三连支持一下博主哦博主正在努力完成2023计划中源码溯源一探究竟联系方式nhs19990716加我进群大家一起学习一起进步一起对抗互联网寒冬 文章目录 常见思路8.锁粒度避免过粗synchronizedredis分布式锁非原子操作忘了释放锁释放了别人的锁自旋锁锁重入问题加锁解锁 锁竞争问题读写锁锁分段 锁超时问题主从复制的问题 数据库分布式锁基于数据库表的增删基于数据库排他锁 9.切换存储方式文件中转暂存数据10.优化程序结构逻辑结构日志 11.压缩传输内容12.线程池设计线程池默认使用无界队列任务过多导致OOM线程池创建线程过多导致OOM共享线程池次要逻辑拖垮主要逻辑线程池拒绝策略的坑使用不当导致阻塞Spring内部线程池的坑使用线程池时没有自定义命名线程池参数设置不合理线程池异常处理的坑线程池使用完毕后忘记关闭ThreadLocal与线程池搭配线程复用导致信息错乱 13.机器问题 GC、线程打满、太多IO资源没关闭等等GC线程资源提升服务器硬件关于JVM调优部分的内容将会在后续专门的出一些文章因为目前笔者对这方面理解还不够所以暂不多做赘述 14.调用链路优化跨地域调用单元化架构不同的用户路由到不同的集群单元微服务拆分过细会导致Rpc调用较多提前过滤减少无效调用拆分接口 相关优秀博客 常见思路
8.锁粒度避免过粗
synchronized
在高并发场景为了防止超卖等情况我们经常需要加锁来保护共享资源。但是如果加锁的粒度过粗是很影响接口性能的。
什么是加锁粒度呢
其实就是就是你要锁住的范围是多大。比如你在家上卫生间你只要锁住卫生间就可以了吧不需要将整个家都锁起来不让家人进门吧卫生间就是你的加锁粒度。
不管你是synchronized加锁还是redis分布式锁只需要在共享临界资源加锁即可不涉及共享资源的就不必要加锁。这就好像你上卫生间不用把整个家都锁住锁住卫生间门就可以了。
比如在业务代码中有一个ArrayList因为涉及到多线程操作所以需要加锁操作假设刚好又有一段比较耗时的操作代码中的slowNotShare方法不涉及线程安全问题。反例加锁就是一锅端全锁住:
//不涉及共享资源的慢方法
private void slowNotShare() {try {TimeUnit.MILLISECONDS.sleep(100);} catch (InterruptedException e) {}
}//错误的加锁方法
public int wrong() {long beginTime System.currentTimeMillis();IntStream.rangeClosed(1, 10000).parallel().forEach(i - {//加锁粒度太粗了slowNotShare其实不涉及共享资源synchronized (this) {slowNotShare();data.add(i);}});log.info(cosume time:{}, System.currentTimeMillis() - beginTime);return data.size();
}正例
public int right() {long beginTime System.currentTimeMillis();IntStream.rangeClosed(1, 10000).parallel().forEach(i - {slowNotShare();//可以不加锁//只对List这部分加锁synchronized (data) {data.add(i);}});log.info(cosume time:{}, System.currentTimeMillis() - beginTime);return data.size();
}对于锁的更细致使用来说在java中提供了synchronized关键字给我们的代码加锁。
通常有两种写法在方法上加锁 和 在代码块上加锁。
先看看如何在方法上加锁
public synchronized doSave(String fileUrl) {mkdir();uploadFile(fileUrl);sendMessage(fileUrl);
}这里加锁的目的是为了防止并发的情况下创建了相同的目录第二次会创建失败影响业务功能。
但这种直接在方法上加锁锁的粒度有点粗。因为doSave方法中的上传文件和发消息方法是不需要加锁的。只有创建目录方法才需要加锁。
我们都知道文件上传操作是非常耗时的如果将整个方法加锁那么需要等到整个方法执行完之后才能释放锁。显然这会导致该方法的性能很差变得得不偿失。
这时我们可以改成在代码块上加锁了具体代码如下
public void doSave(String path,String fileUrl) {synchronized(this) {if(!exists(path)) {mkdir(path);}}uploadFile(fileUrl);sendMessage(fileUrl);
}这样改造之后锁的粒度一下子变小了只有并发创建目录功能才加了锁。而创建目录是一个非常快的操作即使加锁对接口的性能影响也不大。
最重要的是其他的上传文件和发送消息功能任然可以并发执行。
当然这种做在单机版的服务中是没有问题的。但现在部署的生产环境为了保证服务的稳定性一般情况下同一个服务会被部署在多个节点中。
同时它也带来了新的问题synchronized只能保证一个节点加锁是有效的但如果有多个节点如何加锁呢?
这就需要使用分布式锁了。目前主流的分布式锁包括redis分布式锁 和 数据库分布式锁。
redis分布式锁
在分布式系统中由于redis分布式锁相对于更简单和高效成为了分布式锁的首先被我们用到了很多实际业务场景当中。
public void doSave(String path,String fileUrl) {try {String result jedis.set(lockKey, requestId, NX, PX, expireTime);if (OK.equals(result)) {if(!exists(path)) {mkdir(path);uploadFile(fileUrl);sendMessage(fileUrl);}return true;}} finally{unlock(lockKey,requestId);} return false;
}跟之前使用synchronized关键字加锁时一样这里锁的范围也太大了换句话说就是锁的粒度太粗这样会导致整个方法的执行效率很低。
其实只有创建目录的时候才需要加分布式锁其余代码根本不用加锁。
于是我们需要优化一下代码
public void doSave(String path,String fileUrl) {if(this.tryLock()) {mkdir(path);}uploadFile(fileUrl);sendMessage(fileUrl);
}private boolean tryLock() {try {String result jedis.set(lockKey, requestId, NX, PX, expireTime);if (OK.equals(result)) {return true;}} finally{unlock(lockKey,requestId);} return false;
}上面代码将加锁的范围缩小了只有创建目录时才加了锁。这样看似简单的优化之后接口性能能提升很多。
但是Redis锁也存在着一些弊端情况如下共有八条总结性建议
非原子操作
使用redis的分布式锁我们首先想到的可能是setNx命令。
if (jedis.setnx(lockKey, val) 1) {jedis.expire(lockKey, timeout);
}这段代码确实可以加锁成功但你有没有发现什么问题
加锁操作和后面的设置超时时间是分开的并非原子操作。
假如加锁成功但是设置超时时间失败了该lockKey就变成永不失效。假如在高并发场景中有大量的lockKey加锁成功了但不会失效有可能直接导致redis内存空间不足。
忘了释放锁
上面说到使用setNx命令加锁操作和设置超时时间是分开的并非原子操作。
而在redis中还有set命令该命令可以指定多个参数。
String result jedis.set(lockKey, requestId, NX, PX, expireTime);
if (OK.equals(result)) {return true;
}
return false;其中
lockKey锁的标识requestId请求idNX只在键不存在时才对键进行设置操作。PX设置键的过期时间为 millisecond 毫秒。expireTime过期时间
set命令是原子操作加锁和设置超时时间一个命令就能轻松搞定。
使用set命令加锁表面上看起来没有问题。但如果仔细想想加锁之后每次都要达到了超时时间才释放锁会不会有点不合理加锁后如果不及时释放锁会有很多问题。
分布式锁更合理的用法是
手动加锁业务操作手动释放锁如果手动释放锁失败了则达到超时时间redis会自动释放锁。 那么问题来了如何释放锁呢
try{String result jedis.set(lockKey, requestId, NX, PX, expireTime);if (OK.equals(result)) {return true;}return false;
} finally {unlock(lockKey);
} 只在finally中释放锁就够了吗
释放了别人的锁
在多线程场景中可能会出现释放了别人的锁的情况。
假如线程A和线程B都使用lockKey加锁。线程A加锁成功了但是由于业务功能耗时时间很长超过了设置的超时时间。这时候redis会自动释放lockKey锁。此时线程B就能给lockKey加锁成功了接下来执行它的业务操作。恰好这个时候线程A执行完了业务功能释放了锁lockKey。这不就出问题了线程B的锁被线程A释放了。
那么如何解决这个问题呢
不知道你们注意到没在使用set命令加锁时除了使用lockKey锁标识还多设置了一个参数requestId为什么要需要记录requestId呢
答requestId是在释放锁的时候用的。
if (jedis.get(lockKey).equals(requestId)) {jedis.del(lockKey);return true;
}
return false;在释放锁的时候先获取到该锁的值之前设置值就是requestId然后判断跟之前设置的值是否相同如果相同才允许删除锁返回成功。如果不同则直接返回失败。
换句话说就是自己只能释放自己加的锁不允许释放别人加的锁。
当然在这里也需要保证 判断 和 删除的原子性问题
自旋锁
上面的加锁方法看起来好像没有问题但如果你仔细想想如果有1万的请求同时去竞争那把锁可能只有一个请求是成功的其余的9999个请求都会失败。
此外还有一种场景
比如有两个线程同时上传文件到sftp上传文件前先要创建目录。假设两个线程需要创建的目录名都是当天的日期比如20210920如果不做如何控制这样直接并发的创建第二个线程会失败。
有同学会说这还不容易加一个redis分布式锁就能解决问题了此外再判断一下如果目录已经存在就不创建只有目录不存在才需要创建。
try {String result jedis.set(lockKey, requestId, NX, PX, expireTime);if (OK.equals(result)) {if(!exists(path)) {mkdir(path);}return true;}
} finally{unlock(lockKey,requestId);
}
return false;只是加redis分布式锁是不够的因为第二个请求如果加锁失败了接下来是返回失败呢还是返回成功呢 显然肯定是不能返回失败的如果返回失败了这个问题还是没有被解决。如果文件还没有上传成功直接返回成功会有更大的问题。头疼到底该如何解决呢
答使用自旋锁
try {Long start System.currentTimeMillis();while(true) {String result jedis.set(lockKey, requestId, NX, PX, expireTime);if (OK.equals(result)) {if(!exists(path)) {mkdir(path);}return true;}long time System.currentTimeMillis() - start;if (timetimeout) {return false;}try {Thread.sleep(50);} catch (InterruptedException e) {e.printStackTrace();}}
} finally{unlock(lockKey,requestId);
}
return false;在规定的时间比如500毫秒内自旋不断尝试加锁说白了就是在死循环中不断尝试加锁如果成功则直接返回。如果失败则休眠50毫秒再发起新一轮的尝试。如果到了超时时间还未加锁成功则直接返回失败。
锁重入问题
我们都知道redis分布式锁是互斥的。如果我们对某个key加锁了如果该key对应的锁还没失效再用相同key去加锁大概率会失败。
假设在某个请求中需要获取一颗满足条件的菜单树或者分类树。我们以菜单为例这就需要在接口中从根节点开始递归遍历出所有满足条件的子节点然后组装成一颗菜单树。
需要注意的是菜单不是一成不变的在后台系统中运营同学可以动态添加、修改和删除菜单。为了保证在并发的情况下每次都可能获取最新的数据这里可以加redis分布式锁。
加redis分布式锁的思路是对的。但接下来问题来了在递归方法中递归遍历多次每次都是加的同一把锁。递归第一层当然是可以加锁成功的但递归第二层、第三层…第N层不就会加锁失败了
递归方法中加锁的伪代码如下
private int expireTime 1000;public void fun(int level,String lockKey,String requestId){try{String result jedis.set(lockKey, requestId, NX, PX, expireTime);if (OK.equals(result)) {if(level10){this.fun(level,lockKey,requestId);} else {return;}}return;} finally {unlock(lockKey,requestId);}
}如果你直接这么用看起来好像没有问题。但最终执行程序之后发现等待你的结果只有一个出现异常。
因为从根节点开始第一层递归加锁成功还没释放说就直接进入第二层递归。因为requestId作为key的锁已经存在所以第二层递归大概率会加锁失败然后返回到第一层。第一层接下来正常释放锁然后整个递归方法直接返回了。
那么这个问题该如何解决呢
答使用可重入锁。
我们以redisson框架为例它的内部实现了可重入锁的功能。
伪代码如下
private int expireTime 1000;public void run(String lockKey) {RLock lock redisson.getLock(lockKey);this.fun(lock,1);
}public void fun(RLock lock,int level){try{lock.lock(5, TimeUnit.SECONDS);if(level10){this.fun(lock,level);} else {return;}} finally {lock.unlock();}
}接下来聊聊redisson可重入锁的实现原理。
加锁
先判断如果锁名不存在则加锁。然后判断判断如果锁名和requestId值都存在则使用hincrby命令给该锁名和requestId值计数每次都加1。注意一下这里就是重入锁的关键锁重入一次就加1。如果锁名存在但值不是requestId则返回过期时间。
解锁
先判断如果锁名和requestId值不存在则时间返回。如果锁名和requestId值存在则重入锁减1。如果减1后重入锁的value值还大于0说明还有引用则重试设置过期时间。如果减1后重入锁的value值还等于0则可以删除锁然后发消息通知等待线程抢锁。
锁竞争问题
如果有大量写入的场景使用普通的redis分布式锁是没有问题的。
但如果有些业务场景写入的操作比较少反而有大量读取的操作。直接使用普通的redis分布式锁性能会不会不太好
我们都知道锁的粒度越粗多个线程抢锁时竞争就越激烈造成多个线程锁等待的时间也就越长性能也就越差。
所以提升redis分布式锁性能的第一步就是要把锁的粒度变细。
读写锁
众所周知加锁的目的是为了保证在并发环境中读写数据的安全性即不会出现数据错误或者不一致的情况。
但在绝大多数实际业务场景中一般是读数据的频率远远大于写数据。而线程间的并发读操作是并不涉及并发安全问题我们没有必要给读操作加互斥锁只要保证读写、写写并发操作上锁是互斥的就行这样可以提升系统的性能。
我们以redisson框架为例它内部已经实现了读写锁的功能。
读锁的伪代码如下
RReadWriteLock readWriteLock redisson.getReadWriteLock(readWriteLock);
RLock rLock readWriteLock.readLock();
try {rLock.lock();//业务操作
} catch (Exception e) {log.error(e);
} finally {rLock.unlock();
}写锁的伪代码如下
RReadWriteLock readWriteLock redisson.getReadWriteLock(readWriteLock);
RLock rLock readWriteLock.writeLock();
try {rLock.lock();//业务操作
} catch (InterruptedException e) {log.error(e);
} finally {rLock.unlock();
}将读锁和写锁分开最大的好处是提升读操作的性能因为读和读之间是共享的不存在互斥性。而我们的实际业务场景中绝大多数数据操作都是读操作。所以如果提升了读操作的性能也就会提升整个锁的性能。
下面总结一个读写锁的特点
读与读是共享的不互斥读与写互斥写与写互斥
锁分段
此外为了减小锁的粒度比较常见的做法是将大锁分段。
在java中ConcurrentHashMap就是将数据分为16段每一段都有单独的锁并且处于不同锁段的数据互不干扰以此来提升锁的性能。
class ConcurrentHashMapK, V {// 初始化数组存放 Segmentprivate Segment[] segments;public ConcurrentHashMap(int initialCapacity) {segments new Segment[16]; // 初始化为 16 个 Segmentfor (int i 0; i segments.length; i) {segments[i] new Segment();}}// 获取 Segmentprivate Segment segmentFor(int hash) {return segments[(segments.length - 1) hash];}// 获取值public V get(K key) {int hash hash(key);return segmentFor(hash).get(key, hash);}// 存入值public void put(K key, V value) {int hash hash(key);segmentFor(hash).put(key, value, hash);}// Segment 类class Segment {// 使用 ReentrantLock 作为锁private final ReentrantLock lock new ReentrantLock();// 存放键值对private MapK, V map new HashMap();public V get(K key, int hash) {lock.lock();try {// 获取值return map.get(key);} finally {lock.unlock();}}public void put(K key, V value, int hash) {lock.lock();try {// 存入值map.put(key, value);} finally {lock.unlock();}}}
}放在实际业务场景中我们可以这样做
比如在秒杀扣库存的场景中现在的库存中有2000个商品用户可以秒杀。为了防止出现超卖的情况通常情况下可以对库存加锁。如果有1W的用户竞争同一把锁显然系统吞吐量会非常低。
为了提升系统性能我们可以将库存分段比如分为100段这样每段就有20个商品可以参与秒杀。
在秒杀的过程中先把用户id获取hash值然后除以100取模。模为1的用户访问第1段库存模为2的用户访问第2段库存模为3的用户访问第3段库存后面以此类推到最后模为100的用户访问第100段库存。 如此一来在多线程环境中可以大大的减少锁的冲突。以前多个线程只能同时竞争1把锁尤其在秒杀的场景中竞争太激烈了简直可以用惨绝人寰来形容其后果是导致绝大数线程在锁等待。现在多个线程同时竞争100把锁等待的线程变少了从而系统吞吐量也就提升了。
锁超时问题
前面提到过如果线程A加锁成功了但是由于业务功能耗时时间很长超过了设置的超时时间这时候redis会自动释放线程A加的锁。
通常我们加锁的目的是为了防止访问临界资源时出现数据异常的情况。比如线程A在修改数据C的值线程B也在修改数据C的值如果不做控制在并发情况下数据C的值会出问题。
为了保证某个方法或者段代码的互斥性即如果线程A执行了某段代码是不允许其他线程在某一时刻同时执行的我们可以用synchronized关键字加锁。
但这种锁有很大的局限性只能保证单个节点的互斥性。如果需要在多个节点中保持互斥性就需要用redis分布式锁。
假设线程A加redis分布式锁的代码包含代码1和代码2两段代码。 由于该线程要执行的业务操作非常耗时程序在执行完代码1的时已经到了设置的超时时间redis自动释放了锁。而代码2还没来得及执行。 此时代码2相当于裸奔的状态无法保证互斥性。假如它里面访问了临界资源并且其他线程也访问了该资源可能就会出现数据异常的情况。PS我说的访问临界资源不单单指读取还包含写入
那么如何解决这个问题呢
答如果达到了超时时间但业务代码还没执行完需要给锁自动续期。
我们可以使用TimerTask类来实现自动续期的功能
Timer timer new Timer();
timer.schedule(new TimerTask() {Overridepublic void run(Timeout timeout) throws Exception {//自动续期逻辑}
}, 10000, TimeUnit.MILLISECONDS);获取锁之后自动开启一个定时任务每隔10秒钟自动刷新一次过期时间。这种机制在redisson框架中有个比较霸气的名字watch dog即传说中的看门狗。
需要注意的地方是在实现自动续期功能时还需要设置一个总的过期时间可以跟redisson保持一致设置成30秒。如果业务代码到了这个总的过期时间还没有执行完就不再自动续期了。
自动续期的功能是获取锁之后开启一个定时任务每隔10秒判断一下锁是否存在如果存在则刷新过期时间。如果续期3次也就是30秒之后业务方法还是没有执行完就不再续期了。主从复制的问题
如果redis存在多个实例。比如做了主从或者使用了哨兵模式基于redis的分布式锁的功能就会出现问题。
假设redis现在用的主从模式1个master节点3个slave节点。master节点负责写数据slave节点负责读数据。 本来是和谐共处相安无事的。redis加锁操作都在master上进行加锁成功后再异步同步给所有的slave。
突然有一天master节点由于某些不可逆的原因挂掉了。
这样需要找一个slave升级为新的master节点假如slave1被选举出来了。 果有个锁A比较悲催刚加锁成功master就挂了还没来得及同步到slave1。
这样会导致新master节点中的锁A丢失了。后面如果有新的线程使用锁A加锁依然可以成功分布式锁失效了。
那么如果解决这个问题呢
答redisson框架为了解决这个问题提供了一个专门的类RedissonRedLock使用了Redlock算法。
RedissonRedLock解决问题的思路如下
需要搭建几套相互独立的redis环境假如我们在这里搭建了3套。每套环境都有一个redisson node节点。多个redisson node节点组成了RedissonRedLock。环境包含单机、主从、哨兵和集群模式可以是一种或者多种混合。
在这里我们以主从为例架构图如下 RedissonRedLock加锁过程如下
循环向所有的redisson node节点加锁假设节点数为N例子中N等于5。如果在N个节点当中有N/2 1个节点加锁成功了那么整个RedissonRedLock加锁是成功的。如果在N个节点当中小于N/2 1个节点加锁成功那么整个RedissonRedLock加锁是失败的。如果中途发现各个节点加锁的总耗时大于等于设置的最大等待时间则直接返回失败。
从上面可以看出使用Redlock算法确实能解决多实例场景中假如master节点挂了导致分布式锁失效的问题。
但也引出了一些新问题比如
需要额外搭建多套环境申请更多的资源需要评估一下经费是否充足。如果有N个redisson node节点需要加锁N次最少也需要加锁N/21次才知道redlock加锁是否成功。显然增加了额外的时间成本有点得不偿失。
数据库分布式锁
基于数据库表的增删
基于数据库表增删是最简单的方式首先创建一张锁的表主要包含下列字段方法名时间戳等字段。
具体使用的方法为当需要锁住某个方法时往该表中插入一条相关的记录。需要注意的是方法名有唯一性约束。如果有多个请求同时提交到数据库的话数据库会保证只有一个操作可以成功那么我们就可以认为操作成功的那个线程获得了该方法的锁可以执行方法体内容。执行完毕需要删除该记录。
基于数据库排他锁
我们还可以通过数据库的排他锁来实现分布式锁。基于 Mysql 的 InnoDB 引擎可以使用以下方法来实现加锁操作
public void lock(){connection.setAutoCommit(false)int count 0;while(count 4){try{select * from lock where lock_namexxx for update;if(结果不为空){//代表获取到锁return;}}catch(Exception e){}//为空或者抛异常的话都表示没有获取到锁sleep(1000);count;}throw new LockException();
}在查询语句后面增加 for update数据库会在查询过程中给数据库表增加排他锁。当某条记录被加上排他锁之后其他线程无法再在该行记录上增加排他锁。其他没有获取到锁的就会阻塞在上述 select 语句上可能的结果有 2 种在超时之前获取到了锁在超时之前仍未获取到锁。
获得排它锁的线程即可获得分布式锁当获取到锁之后可以执行业务逻辑执行完业务之后释放锁。
9.切换存储方式文件中转暂存数据
如果数据太大落地数据库实在是慢的话就可以考虑先用文件的方式暂存。先保存文件再异步下载文件慢慢保存到数据库。
比如说一个转账接口如果是并发开启10个并发度每个批次1000笔转账明细数据数据库插入会特别耗时大概6秒左右这个跟我们公司的数据库同步机制有关并发情况下因为优先保证同步所以并行的插入变成串行啦就很耗时。
数据库同步机制可能导致并行的插入变成串行的原因有很多下面列举了一些可能的情况
锁竞争当多个事务同时尝试向相同的数据页或数据行插入数据时数据库系统可能会使用锁来确保数据的一致性。如果同步机制导致大量的锁竞争那么并行插入操作可能会被迫等待其他事务释放锁从而导致串行化。同步点阻塞某些数据库同步机制可能会引入同步点要求所有的写操作都必须在这些同步点进行同步这样就会导致并行的写操作变成串行化。冲突检测与重试在数据库同步的过程中可能会发生数据冲突系统需要检测并解决这些冲突。这种检测和解决过程可能会导致并行插入变成串行化因为某些操作需要等待其他操作完成后才能执行。数据复制延迟如果数据库采用了主从复制或者集群复制的机制数据同步可能会引入一定的延迟。在这种情况下并行的插入操作可能会因为数据尚未完全同步而变成串行化。
优化前1000笔明细转账数据先落地DB数据库返回处理中给用户再异步转账。如图 记得当时压测的时候高并发情况这1000笔明细入库耗时都比较大。所以我转换了一下思路把批量的明细转账记录保存的文件服务器然后记录一笔转账总记录到数据库即可。接着异步再把明细下载下来进行转账和明细入库。最后优化后性能提升了十几倍。
优化后流程图如下 如果你的接口耗时瓶颈就在数据库插入操作这里用来批量操作等还是效果还不理想就可以考虑用文件或者MQ等暂存。有时候批量数据放到文件会比插入数据库效率更高。
10.优化程序结构
逻辑结构
优化程序逻辑、程序代码是可以节省耗时的。比如你的程序创建多不必要的对象、或者程序逻辑混乱多次重复查数据库、又或者你的实现逻辑算法不是最高效的等等。
我举个简单的例子复杂的逻辑条件有时候调整一下顺序就能让你的程序更加高效。
假设业务需求是这样如果用户是会员第一次登陆时需要发一条感谢短信。如果没有经过思考代码直接这样写了
if(isUserVip isFirstLogin){sendSmsMsg();
}假设有5个请求过来isUserVip判断通过的有3个请求isFirstLogin通过的只有1个请求。那么以上代码isUserVip执行的次数为5次isFirstLogin执行的次数也是3次如下 如果调整一下isUserVip和isFirstLogin的顺序
if(isFirstLogin isUserVip ){sendMsg();
}isFirstLogin执行的次数是5次isUserVip执行的次数是1次 程序是不是变得更高效了呢
日志
在高并发的查询场景下打印日志可能导致接口性能下降的问题。
在排查问题时顺手打印了日志并且带上线。高峰期时发现接口的 tp99 耗时大幅增加同时 CPU 负载和垃圾回收频率也明显增加磁盘负载也增加很多。日志删除后系统回归正常。
特别是在日志中包含了大数组或大对象时更要谨慎避免打印这些日志。
不打日志无法有效排查问题。怎么办呢
为了有效地排查问题建议引入白名单机制。具体做法是在打印日志之前先判断用户是否在白名单中如果不在则不打印日志如果在则打印日志。通过将公司内的产品、开发和测试人员等相关同事加入到白名单中有利于及时发现线上问题。当用户提出投诉时也可以将相关用户添加到白名单并要求他们重新操作以复现问题。
这种方法既满足了问题排查的需求又避免了给线上环境增加压力。在测试环境中可以完全开放日志打印功能
11.压缩传输内容
压缩传输内容传输报文变得更小因此传输会更快啦。10M带宽传输10k的报文一般比传输1M的会快呀。
打个比喻一匹千里马它驮着100斤的货跑得快还是驮着10斤的货物跑得快呢
再举个视频网站的例子
如果不对视频做任何压缩编码因为带宽又是有限的。巨大的数据量在网络传输的耗时会比编码压缩后慢好多倍。
压缩文本数据可以有效地减少该数据所需的存储空间从而提高数据库和缓存的空间利用率。然而压缩和解压缩的过程会增加CPU的负载因此需要仔细考虑是否有必要进行数据压缩。此外还需要评估压缩后数据的效果即压缩对数据的影响如何。
比如说使用GZIP压缩算法的cpu负载和耗时都是比较高的。使用压缩非但不能起到降低接口耗时的效果可能导致接口耗时增加要谨慎使用。除此之外还有其他压缩算法在压缩时间和压缩率上有所权衡。可以选择适合的自己的压缩算法。
12.线程池设计
我们使用线程池就是让任务并行处理更高效地完成任务。但是有时候如果线程池设计不合理接口执行效率则不太理想。
一般我们需要关注线程池的这几个参数核心线程、最大线程数量、阻塞队列。
如果核心线程过小则达不到很好的并行效果。如果阻塞队列不合理不仅仅是阻塞的问题甚至可能会OOM如果线程池不区分业务隔离有可能核心业务被边缘业务拖垮。
下面是线程池设计建议总结
线程池默认使用无界队列任务过多导致OOM
JDK开发者提供了线程池的实现类我们基于Executors组件就可以快速创建一个线程池。日常工作中一些小伙伴为了开发效率反手就用Executors新建个线程池。写出类似以下的代码
public class NewFixedTest {public static void main(String[] args) {ExecutorService executor Executors.newFixedThreadPool(10);for (int i 0; i Integer.MAX_VALUE; i) {executor.execute(() - {try {Thread.sleep(10000);} catch (InterruptedException e) {//do nothing}});}}
}使用newFixedThreadPool创建的线程池是会有坑的它默认是无界的阻塞队列如果任务过多会导致OOM问题。运行一下以上代码出现了OOM。
Exception in thread main java.lang.OutOfMemoryError: GC overhead limit exceededat java.util.concurrent.LinkedBlockingQueue.offer(LinkedBlockingQueue.java:416)at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1371)at com.example.dto.NewFixedTest.main(NewFixedTest.java:14)这是因为newFixedThreadPool使用了无界的阻塞队列的LinkedBlockingQueue如果线程获取一个任务后任务的执行时间比较长(比如上面demo代码设置了10秒)会导致队列的任务越积越多导致机器内存使用不停飙升 最终出现OOM。
看下newFixedThreadPool的相关源码是可以看到一个无界的阻塞队列的如下
//阻塞队列是LinkedBlockingQueue并且是使用的是无参构造函数
public static ExecutorService newFixedThreadPool(int nThreads) {return new ThreadPoolExecutor(nThreads, nThreads,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueueRunnable());
}//无参构造函数默认最大容量是Integer.MAX_VALUE相当于无界的阻塞队列的了
public LinkedBlockingQueue() {this(Integer.MAX_VALUE);
}因此工作中建议大家自定义线程池并使用指定长度的阻塞队列。
线程池创建线程过多导致OOM
有些小伙伴说既然Executors组件创建出的线程池newFixedThreadPool使用的是无界队列可能会导致OOM。那么Executors组件还可以创建别的线程池如newCachedThreadPool我们用它也不行嘛
我们可以看下newCachedThreadPool的构造函数
public static ExecutorService newCachedThreadPool() {return new ThreadPoolExecutor(0, Integer.MAX_VALUE,60L, TimeUnit.SECONDS,new SynchronousQueueRunnable());}它的最大线程数是Integer.MAX_VALUE。大家应该意识到使用它可能会引发什么问题了吧。没错如果创建了大量的线程也有可能引发OOM
所以我们使用线程池的时候还要当心线程创建过多导致OOM问题。大家尽量不要使用newCachedThreadPool并且如果自定义线程池时要注意一下最大线程数。
共享线程池次要逻辑拖垮主要逻辑
要避免所有的业务逻辑共享一个线程池。比如你用线程池A来做登录异步通知又用线程池A来做对账。如下图 如果对账任务checkBillService响应时间过慢会占据大量的线程池资源可能直接导致没有足够的线程资源去执行loginNotifyService的任务最后影响登录。就这样因为一个次要服务影响到重要的登录接口显然这是绝对不允许的。因此我们不能将所有的业务一锅炖都共享一个线程池因为这样做风险太高了犹如所有鸡蛋放到一个篮子里。应当做线程池隔离 线程池拒绝策略的坑使用不当导致阻塞
我们知道线程池主要有四种拒绝策略如下
AbortPolicy: 丢弃任务并抛出RejectedExecutionException异常。(默认拒绝策略)DiscardPolicy丢弃任务但是不抛出异常。DiscardOldestPolicy丢弃队列最前面的任务然后重新尝试执行任务。CallerRunsPolicy由调用方线程处理该任务。
如果线程池拒绝策略设置不合理就容易有坑。我们把拒绝策略设置为DiscardPolicy或DiscardOldestPolicy并且在被拒绝的任务Future对象调用get()方法,那么调用线程会一直被阻塞。
温馨提示日常开发中使用 Future.get() 时尽量使用带超时时间的因为它是阻塞的。
future.get(1, TimeUnit.SECONDS);Spring内部线程池的坑
工作中个别开发者为了快速开发喜欢直接用spring的Async来执行异步任务。
Async
public void testAsync() throws InterruptedException {System.out.println(处理异步任务);TimeUnit.SECONDS.sleep(new Random().nextInt(100));
}Spring内部线程池其实是SimpleAsyncTaskExecutor这玩意有点坑它不会复用线程的它的设计初衷就是执行大量的短时间的任务。
也就是说来了一个请求就会新建一个线程大家使用spring的Async时要避开这个坑自己再定义一个线程池。正例如下
Bean(name threadPoolTaskExecutor)
public Executor threadPoolTaskExecutor() {ThreadPoolTaskExecutor executornew ThreadPoolTaskExecutor();executor.setCorePoolSize(5);executor.setMaxPoolSize(10);executor.setThreadNamePrefix(tianluo-%d);// 其他参数设置return new ThreadPoolTaskExecutor();
}使用线程池时没有自定义命名
使用线程池时如果没有给线程池一个有意义的名称将不好排查回溯问题。这不算一个坑吧只能说给以后排查埋坑
public class ThreadTest {public static void main(String[] args) throws Exception {ThreadPoolExecutor executorOne new ThreadPoolExecutor(5, 5, 1, TimeUnit.MINUTES, new ArrayBlockingQueueRunnable(20));executorOne.execute(()-{throw new NullPointerException();});}
}运行结果
Exception in thread pool-1-thread-1 java.lang.NullPointerExceptionat com.example.dto.ThreadTest.lambda$main$0(ThreadTest.java:17)at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)at java.lang.Thread.run(Thread.java:748)可以发现默认打印的线程池名字是pool-1-thread-1如果排查问题起来并不友好。因此建议大家给自己线程池自定义个容易识别的名字。其实用CustomizableThreadFactory即可正例如下
public class ThreadTest {public static void main(String[] args) throws Exception {ThreadPoolExecutor executorOne new ThreadPoolExecutor(5, 5, 1,TimeUnit.MINUTES, new ArrayBlockingQueueRunnable(20),new CustomizableThreadFactory(Tianluo-Thread-pool));executorOne.execute(()-{throw new NullPointerException();});}
}线程池参数设置不合理
线程池最容易出坑的地方就是线程参数设置不合理。比如核心线程设置多少合理最大线程池设置多少合理等等。当然这块不是乱设置的需要结合具体业务。
比如线程池如何调优如何确认最佳线程数
最佳线程数目 线程等待时间线程CPU时间/线程CPU时间 * CPU数目我们的服务器CPU核数为8核一个任务线程cpu耗时为20ms线程等待网络IO、磁盘IO耗时80ms那最佳线程数目( 80 20 )/20 * 8 40。也就是设置 40个线程数最佳。
线程池异常处理的坑
public class ThreadTest {public static void main(String[] args) throws Exception {ThreadPoolExecutor executorOne new ThreadPoolExecutor(5, 5, 1,TimeUnit.MINUTES, new ArrayBlockingQueueRunnable(20),new CustomizableThreadFactory(Tianluo-Thread-pool));for (int i 0; i 5; i) {executorOne.submit(()-{System.out.println(current thread name Thread.currentThread().getName());Object object null;System.out.print(result## object.toString());});}}
}按道理运行这块代码应该抛空指针异常才是的对吧。但是运行结果却是这样的;
current thread nameTianluo-Thread-pool1
current thread nameTianluo-Thread-pool2
current thread nameTianluo-Thread-pool3
current thread nameTianluo-Thread-pool4
current thread nameTianluo-Thread-pool5这是因为使用submit提交任务不会把异常直接这样抛出来。最好就是try...catch捕获
public class ThreadTest {public static void main(String[] args) throws Exception {ThreadPoolExecutor executorOne new ThreadPoolExecutor(5, 5, 1,TimeUnit.MINUTES, new ArrayBlockingQueueRunnable(20),new CustomizableThreadFactory(Tianluo-Thread-pool));for (int i 0; i 5; i) {executorOne.submit(()-{System.out.println(current thread name Thread.currentThread().getName());try {Object object null;System.out.print(result## object.toString());}catch (Exception e){System.out.println(异常了e);}});}}
}也可以使用Future.get来获取异常。
线程池使用完毕后忘记关闭
如果线程池使用完忘记关闭的话有可能会导致内存泄露问题。所以大家使用完线程池后记得关闭一下。同时线程池最好也设计成单例模式给它一个好的命名以方便排查问题。
public class ThreadTest {public static void main(String[] args) throws Exception {ThreadPoolExecutor executorOne new ThreadPoolExecutor(5, 5, 1,TimeUnit.MINUTES, new ArrayBlockingQueueRunnable(20), new CustomizableThreadFactory(Tianluo-Thread-pool));executorOne.execute(() - {});//关闭线程池executorOne.shutdown();}
}ThreadLocal与线程池搭配线程复用导致信息错乱
使用ThreadLocal缓存信息如果配合线程池一起有可能出现信息错乱的情况。先看下一下例子
private static final ThreadLocalInteger currentUser ThreadLocal.withInitial(() - null);GetMapping(wrong)
public Map wrong(RequestParam(userId) Integer userId) {//设置用户信息之前先查询一次ThreadLocal中的用户信息String before Thread.currentThread().getName() : currentUser.get();//设置用户信息到ThreadLocalcurrentUser.set(userId);//设置用户信息之后再查询一次ThreadLocal中的用户信息String after Thread.currentThread().getName() : currentUser.get();//汇总输出两次查询结果Map result new HashMap();result.put(before, before);result.put(after, after);return result;
}按理说每次获取的before应该都是null但是呢程序运行在 Tomcat 中执行程序的线程是Tomcat的工作线程而Tomcat的工作线程是基于线程池的。
线程池会重用固定的几个线程一旦线程重用那么很可能首次从 ThreadLocal 获取的值是之前其他用户的请求遗留的值。这时ThreadLocal 中的用户信息就是其他用户的信息。
把tomcat的工作线程设置为1
server.tomcat.max-threads1用户1请求过来会有以下结果符合预期 用户2请求过来会有以下结果「不符合预期」 因此使用类似 ThreadLocal 工具来存放一些数据时需要特别注意在代码运行完后显式地去清空设置的数据正例如下
GetMapping(right)
public Map right(RequestParam(userId) Integer userId) {String before Thread.currentThread().getName() : currentUser.get();currentUser.set(userId);try {String after Thread.currentThread().getName() : currentUser.get();Map result new HashMap();result.put(before, before);result.put(after, after);return result;} finally {//在finally代码块中删除ThreadLocal中的数据确保数据不串currentUser.remove();}
}13.机器问题 GC、线程打满、太多IO资源没关闭等等
有时候我们的接口慢就是机器处理问题。主要有fullGC、线程打满、太多IO资源没关闭等等。
GC
比如说要导出60W的excel的时候卡死了接着收到了监控告警。排查得出代码是Apache POI生成的excel导出excel数据量很大时当时JVM内存吃紧会直接Full GC了。
无论是Young GC还是Full GC在进行垃圾回收时都会暂停所有的业务线程。因此需要关注垃圾回收的频率以确保对业务的影响尽可能小。
一般情况下通过调整堆大小和新生代大小可以解决大部分垃圾回收问题。其中新生代是用于存放新创建的对象的区域。对于Young GC的频率增加的情况一般是系统的请求量大量增长导致。但如果young gc增长非常多就需要考虑是否需要增加新生代的大小。
因为如果新生代过小很容易被打满。这导致本可以被Young GC掉的对象被晋升Promotion到老年代过早地进入老年代。这样一来不仅Young GC频繁触发Full GC也会频繁触发。
线程
如果线程打满了也会导致接口都在等待了。所以。如果是高并发场景我们需要接入限流把多余的请求拒绝掉。
资源
如果IO资源没关闭也会导致耗时增加。这个大家可以看下平时你的电脑一直打开很多很多文件是不是会觉得很卡。
提升服务器硬件
如果cpu负载较高 可以考虑提高每个实例cpu数量提高实例个数。同时关注网络IO负载如果机器流量较大网卡带宽可能成为瓶颈。
高峰期和低峰期如果机器负载相差较大可以考虑设置弹性伸缩策略高峰期之前自动扩容低峰期自动缩容最大程度提高资源利用率。
关于JVM调优部分的内容将会在后续专门的出一些文章因为目前笔者对这方面理解还不够所以暂不多做赘述
14.调用链路优化
在评估接口性能时我们需要首先找出最耗时的部分并优化它这样优化效果才会立竿见影。
跨地域调用
假如说北京到上海的跨地域调用需要耗费大约30毫秒的时间这个耗时是相当高的所以我们应该特别关注调用链路上是否存在跨地域调用的情况。这些跨地域调用包括Rpc调用、Http调用、数据库调用、缓存调用以及MQ调用等等。在整理调用链路的时候我们还应该标注出跨地域调用的次数例如跨地域调用数据库可能会出现多次在链路上我们需要明确标记。我们可以考虑通过降低调用次数来提高性能因此在设计优化方案时我们应该特别关注如何减少跨地域调用的次数。
举个例子在某种情况下假设上游服务在上海而我们的服务在北京和上海都有部署但是数据库和缓存的主节点都在北京这时候就无法避免跨地域调用。那么我们该如何进行优化呢考虑到我们的服务会更频繁地访问数据库和缓存如果让我们上海节点的服务去访问北京的数据库和缓存那么跨地域调用的次数就会非常多。因此我们应该让上游服务去访问我们在北京的节点这样只会有1次跨地域调用而我们的服务在访问数据库和缓存时就无需进行跨地域调用。
单元化架构不同的用户路由到不同的集群单元
如果主数据库位于北京那么南方的用户每次写请求就只能通过跨地域访问来完成吗实际上并非如此。数据库的主库不仅可以存在于一个地域而是可以在多个地域上部署主数据库。将每个用户归属于最近的地域该用户的请求都会被路由到所在地域的数据库。这样的部署不仅提升了系统性能还提高了系统的容灾等级即使单个机房发生故障也不会影响全网的用户。
这个思想类似于CDN内容分发网络它能够将用户请求路由到最近的节点。事实上由于用户的存储数据已经在该地域的数据库中用户的请求极少需要切换到其他地域。
为了实现这一点我们需要一个用户路由服务来提供用户所在地域的查询并且能够提供高并发的访问。
除了数据库之外其他的存储中间件如MQ、Redis等以及Rpc框架都需要具备单元化架构能力。
微服务拆分过细会导致Rpc调用较多
微服务拆分过细会导致更多的RPC调用一次简单的请求可能就涉及四五个服务当访问量非常高时多出来的三五次Rpc调用会导致接口耗时增加很多。
每个服务都需要处理网络IO序列化反序列化服务的GC 也会导致耗时增加这样算下来一个大服务的性能往往优于5个微服务。
当然服务过于臃肿会降低开发维护效率也不利于技术升级。微服务过多也有问题例如增加整体链路耗时、基础架构升级工作量变大、单个需求代码变更的服务更多等弊端。需要你权衡开发效率、线上性能、领域划分等多方面因素。
提前过滤减少无效调用
在某些活动匹配的业务场景里相当多的请求实际上是不满足条件的如果能尽早的过滤掉这些请求就能避免很多无效查询。例如用户匹配某个活动时会有非常多的过滤条件如果该活动的特点是仅少量用户可参加那么可首先使用人群先过滤掉大部分不符合条件的用户。
拆分接口
前面提到如果Http接口功能过于庞大核心数据和非核心数据杂糅在一起耗时高和耗时低的数据耦合在一起。为了优化请求的耗时可以通过拆分接口将核心数据和非核心数据分别处理从而提高接口的性能。
而在Rpc接口方面也可以使用类似的思路进行优化。当上游需要调用多个Rpc接口时可以并行地调用这些接口。优先返回核心数据如果处理非核心数据或者耗时高的数据超时则直接降级只返回核心数据。这种方式可以提高接口的响应速度和效率减少不必要的等待时间。
相关优秀博客
后端接口性能优化分析-问题发现问题定义-CSDN博客
后端接口性能优化分析-多线程优化-CSDN博客
后端接口性能优化分析-数据库优化-CSDN博客