网站优化软件破解版,佛山新网站建设渠道,网站制作安全防范方式,铁岭手机网站建设如何通过自定义线程池提升系统稳定性
背景
在高并发系统中#xff0c;线程池管理至关重要。默认线程池可能导致#xff1a;
资源浪费#xff08;创建过多线程导致 OOM#xff09;任务堆积#xff08;队列满后任务被拒绝#xff09;任务丢失#xff08;默认拒绝策略丢…如何通过自定义线程池提升系统稳定性
背景
在高并发系统中线程池管理至关重要。默认线程池可能导致
资源浪费创建过多线程导致 OOM任务堆积队列满后任务被拒绝任务丢失默认拒绝策略丢弃任务 为了防止这些问题我们使用 Spring Boot 自定义线程池并优化 异常处理 和 拒绝策略。
线程池方案设计
在 ExecutorConfig 类中我们定义了两个线程池
myExecutor用于普通任务采用CallerRunsPolicy 避免任务丢失。oneExecutor用于信号计算任务单线程模式具有 自定义异常处理 和 阻塞式拒绝策略。
代码解析
线程池 myExecutor通用任务池
Bean(name myExecutor)
public Executor myExecutor() {ThreadPoolTaskExecutor executor new ThreadPoolTaskExecutor();executor.setCorePoolSize(threadProperties.getCorePoolSize());executor.setMaxPoolSize(threadProperties.getMaxPoolSize());executor.setQueueCapacity(threadProperties.getQueueCapacity());executor.setThreadNamePrefix(signal-executor-);executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());executor.initialize();return executor;
}设计要点 CallerRunsPolicy线程池满了主线程执行任务防止丢失但可能影响性能。 线程池 oneExecutor单线程计算池
Bean(name oneExecutor)
public Executor oneExecutor() {ThreadFactory threadFactory new BasicThreadFactory.Builder().uncaughtExceptionHandler(new MyThreadException()).namingPattern(one-thread-%s).build();ThreadPoolTaskExecutor executor new ThreadPoolTaskExecutor();executor.setCorePoolSize(1);executor.setMaxPoolSize(1);executor.setQueueCapacity(1);executor.setThreadFactory(threadFactory);executor.setThreadGroup(new ThreadGroup(1));executor.setRejectedExecutionHandler(new CustomRejectedExecutionHandler());executor.initialize();return executor;
}设计要点 单线程池保证任务顺序执行如果无须那就按照当前的服务节点配置来设置参数 自定义异常处理防止线程因异常崩溃 自定义拒绝策略任务队列满时阻塞等待 自定义异常处理
class MyThreadException implements Thread.UncaughtExceptionHandler {Overridepublic void uncaughtException(Thread t, Throwable e) {log.error(异常: {}线程: {}, ExceptionUtils.getStackTrace(e), t.getName());}
}作用防止线程因未捕获异常直接终止提升系统稳定性。当然这个是处理线程池中子任务处理业务逻辑的时候发生业务异常的处理方式除此之外还有其他的解决方案 异常处理
afterExecute() 处理异常可扩展 用于处理执行过程中抛出的异常uncaughtExceptionHandler 处理未捕获异常默认 JVM 打印堆栈 用于处理线程未捕获的异常RejectedExecutionHandler 处理任务拒绝处理任务被拒绝的情况。 处理顺序 当任务执行时如果任务抛出异常它会首先被 afterExecute() 捕获并且你可以在这里进行进一步的处理。如果任务中的异常没有被 afterExecute() 捕获或处理且是未捕获异常它会交由 uncaughtExceptionHandler 进行处理。RejectedExecutionHandler 是处理线程池拒绝接受新任务的情况这通常和任务执行过程中的异常无关主要处理线程池饱和时的情况。 注意beforeExecute() 在任务开始执行前调用通常用于准备工作 异常处理上beforeExecute() 不会直接处理任务执行过程中的异常但可以捕获并处理自己内部的异常 相关源码分析 public void execute(Runnable command) { if (command null) throw new NullPointerException(); int c ctl.get(); // 1️⃣ 线程池当前线程数 corePoolSize则尝试新增核心线程执行任务 if (workerCountOf© corePoolSize) { if (addWorker(command, true)) return; c ctl.get(); } // 2️⃣ 线程池已满尝试加入工作队列 if (isRunning© workQueue.offer(command)) { int recheck ctl.get(); if (!isRunning(recheck) remove(command)) reject(command); // 任务队列中的任务被拒绝 else if (workerCountOf(recheck) 0) addWorker(null, false); // 防止线程池为空确保有线程执行任务 } // 3️⃣ 线程池满且队列满尝试新增非核心线程 else if (!addWorker(command, false)) reject(command); // 线程池已满拒绝任务 } final void runWorker(Worker w) { Thread wt Thread.currentThread(); Runnable task w.firstTask; w.firstTask null; w.unlock(); // 允许中断 boolean completedAbruptly true; try { while (task ! null || (task getTask()) ! null) { w.lock(); // 1️⃣ 执行任务 beforeExecute(wt, task); Throwable thrown null; try { task.run(); // ⚠ 任务执行点 } catch (RuntimeException x) { thrown x; throw x; } catch (Error x) { thrown x; throw x; } catch (Throwable x) { thrown x; throw new Error(x); } finally { afterExecute(task, thrown); // 2️⃣ 任务执行后的扩展方法 } task null; w.completedTasks; } completedAbruptly false; } finally { processWorkerExit(w, completedAbruptly); // 3️⃣ 任务异常退出删除该线程 } } 自定义拒绝策略-重新放回队列中
public class CustomRejectedExecutionHandler implements RejectedExecutionHandler {Overridepublic void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {try {if (!executor.isShutdown()) {log.info(队列已满阻塞等待...);executor.getQueue().put(r);log.info(任务已加入队列);}} catch (Exception e) {log.error(拒绝策略异常, e);}}
}作用 默认拒绝策略丢弃任务而此策略会阻塞等待确保任务不丢失。 适用于任务量较大但不能丢失任务的场景如消息队列处理 自定义拒绝策略-主线程执行 /*** 自定义线程池防止使用默认线程池导致内存溢出** param* return* author bu.junjie* date 2021/11/10 10:00*/Bean(name myExecutor)public Executor myExecutor() {ThreadPoolTaskExecutor executor new ThreadPoolTaskExecutor();executor.setCorePoolSize(threadProperties.getCorePoolSize());executor.setMaxPoolSize(threadProperties.getMaxPoolSize());executor.setQueueCapacity(threadProperties.getQueueCapacity());executor.setThreadNamePrefix(signal-executor-);// 使用此策略如果添加到线程池失败那么主线程会自己去执行该任务不会等待线程池中的线程去执行阻塞主线程executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());executor.initialize();return executor;}适用场景
✅ 高并发请求如 HTTP 任务 ✅ 后台数据处理如日志分析、批量计算 ✅ 长时间任务如大文件处理、消息队列消费
总结
自定义线程池 防止资源浪费提升吞吐量。异常处理 避免线程因未捕获异常而终止。优化拒绝策略 防止任务丢失提高系统可靠性。
线程池优化是高并发系统的关键希望本篇博客能帮助你更好地理解和应用线程池
完整代码示例
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.concurrent.BasicThreadFactory;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;import javax.annotation.Resource;
import java.util.concurrent.Executor;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;/*** 线程池配置参数** version 1.0.0* createTime 2025-11-09 14:01*/
Configuration
EnableAsync
Slf4j
public class ExecutorConfig {Resourceprivate ThreadProperties threadProperties;/*** 自定义线程池防止使用默认线程池导致内存溢出** param* return* author bu.junjie* date 2021/11/10 10:00*/Bean(name myExecutor)public Executor myExecutor() {ThreadPoolTaskExecutor executor new ThreadPoolTaskExecutor();executor.setCorePoolSize(threadProperties.getCorePoolSize());executor.setMaxPoolSize(threadProperties.getMaxPoolSize());executor.setQueueCapacity(threadProperties.getQueueCapacity());executor.setThreadNamePrefix(signal-executor-);// 使用此策略如果添加到线程池失败那么主线程会自己去执行该任务不会等待线程池中的线程去执行阻塞主线程executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());executor.initialize();return executor;}/*** 信号计算时的线程池(1号线程池)** param* return* author bu.junjie* date 2022/1/5 13:01*/Bean(name oneExecutor)public Executor oneExecutor() {ThreadFactory threadFactory new BasicThreadFactory.Builder().uncaughtExceptionHandler(new MyThreadException()).namingPattern(one-thread-%s).build();ThreadPoolTaskExecutor executor new ThreadPoolTaskExecutor();executor.setCorePoolSize(1);executor.setMaxPoolSize(1);executor.setThreadFactory(threadFactory);executor.setQueueCapacity(1);executor.setThreadGroup(new ThreadGroup(1));executor.setRejectedExecutionHandler(new CustomRejectedExecutionHandler());executor.initialize();return executor;}class MyThreadException implements Thread.UncaughtExceptionHandler {/*** Method invoked when the given thread terminates due to the* given uncaught exception.* pAny exception thrown by this method will be ignored by the* Java Virtual Machine.** param t the thread* param e the exception*/Overridepublic void uncaughtException(Thread t, Throwable e) {log.error(MyThreadException is exception【{}】,Thread 【{}】, ExceptionUtils.getStackTrace(e), t.getName());}}/*** 拒绝策略优化** param* author bu.junjie* date 2022/1/8 14:06* return*/public class CustomRejectedExecutionHandler implements RejectedExecutionHandler {Overridepublic void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {try {// 核心改造点由blockingqueue的offer改成put阻塞方法if (!executor.isShutdown()) {long start System.currentTimeMillis();log.info(当前阻塞队列已满开始请求存放队列束);executor.getQueue().put(r);log.info(存放阻塞队列成功,阻塞时间time 【{}】, System.currentTimeMillis() - start);}} catch (Exception e) {e.printStackTrace();}}}}思考
为什么拒绝策略要重新抛出异常
我们会发现默认的四种拒绝策略在处理完业务逻辑之后还会重新抛出异常就算你是自定义的拒绝策略也需要重新抛出异常为什么呢不抛出会怎么样
如果不抛出异常调用方业务代码无法感知任务被拒绝可能导致任务丢失或业务逻辑异常。
场景分析
当线程池队列满了时会触发 rejectedExecution 方法。如果我们只是记录日志而不抛出异常
主线程会继续执行但任务并未真正执行业务方无法感知到这个问题。可能导致数据丢失尤其是在关键业务如支付、订单、消息处理场景中。
重新抛出异常的好处
✅ 保证调用方可以感知任务拒绝决定是否降级处理、重试或报警。 ✅ 防止静默丢失任务保证业务的可靠性。 ✅ 与 Spring 线程池默认行为保持一致防止意外吞掉异常。
代码示例
❌ 错误示例未抛出异常可能导致任务丢失
public class CustomRejectedExecutionHandler implements RejectedExecutionHandler {Overridepublic void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {try {if (!executor.isShutdown()) {log.warn(队列已满任务阻塞等待...);executor.getQueue().put(r); // 可能抛出异常log.info(任务已放入队列);}} catch (InterruptedException e) {Thread.currentThread().interrupt(); // 仅恢复中断状态但未通知调用方}}
} 问题 调用方不会收到异常以为任务已经成功执行但其实可能丢失了。 例如在支付系统中如果订单更新任务丢失可能导致订单状态未更新。 ✅ 正确示例重新抛出异常保证调用方可感知
public class CustomRejectedExecutionHandler implements RejectedExecutionHandler {Overridepublic void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {try {if (!executor.isShutdown()) {log.warn(队列已满阻塞等待...);executor.getQueue().put(r);log.info(任务成功进入队列);return; // 任务成功加入队列后不需要抛异常}} catch (InterruptedException e) {Thread.currentThread().interrupt(); // 恢复线程中断状态throw new RejectedExecutionException(任务提交被中断, e);} catch (Exception e) {log.error(任务拒绝策略发生异常, e);throw new RejectedExecutionException(自定义拒绝策略异常, e);}}
}改进点 任务成功放入队列时不会抛异常避免不必要的错误。 如果 put() 失败抛出 RejectedExecutionException让业务方感知。 捕获 InterruptedException 并恢复中断状态避免影响后续任务。 其实这个原因和为什么需要恢复线程中断一样的逻辑也是为了让调用方感知到 业务方如何处理异常
如果 rejectedExecution 抛出 RejectedExecutionException业务代码可以捕获异常并进行降级例如
try {executor.execute(task);
} catch (RejectedExecutionException e) {log.error(线程池已满任务执行失败进行降级处理, e);// 业务降级策略例如saveToDatabaseForLaterProcessing(task);
}降级方案如果线程池拒绝任务可以存入 数据库、MQ 或 重试队列避免任务丢失。 结论 必须重新抛出异常否则
任务可能悄悄丢失业务方无法感知。可能影响数据一致性如支付、订单、日志处理。业务代码无法主动补救重试、降级等。
最佳实践
成功放入队列 → 不抛异常任务无法处理 → 抛出 RejectedExecutionException让调用方感知 这样可以既保证任务不丢失又确保调用方有能力处理拒绝任务
自定义拒绝策略put方法
其实默认拒绝策略是offer方法是非阻塞的也就是只要队列中的任务只要有那就去创建子线程直至触发拒绝策略 ✅ 正确示例
public class CustomRejectedExecutionHandler implements RejectedExecutionHandler {Overridepublic void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {try {System.out.println(队列已满阻塞等待...);executor.getQueue().put(r); // 阻塞等待队列有空位System.out.println(任务重新加入队列 r.toString());} catch (InterruptedException e) {Thread.currentThread().interrupt();throw new RejectedExecutionException(任务提交失败线程被中断, e);}}
}