什么是网站建设方案书,注册公司100万意味着什么,内部网站做登陆内部链接,金蝶库存管理软件说在前面
在40岁老架构师 尼恩的读者社区(50)中#xff0c;最近有小伙伴拿到了一线互联网企业如极兔、有赞、希音、百度、网易的面试资格#xff0c;遇到了几个很重要的面试题#xff1a; 如何设计线程池#xff1f; 与之类似的、其他小伙伴遇到过的问题还有#xff1a; …说在前面
在40岁老架构师 尼恩的读者社区(50)中最近有小伙伴拿到了一线互联网企业如极兔、有赞、希音、百度、网易的面试资格遇到了几个很重要的面试题 如何设计线程池 与之类似的、其他小伙伴遇到过的问题还有 请手写一个简单线程池 这个问题很多小伙伴在社群里边反馈都遇到过 线程池的知识既是面试的核心知识又是开发的核心知识。
所以这里尼恩给大家做一下系统化、体系化的线程池梳理使得大家可以充分展示一下大家雄厚的 “技术肌肉”让面试官爱到 “不能自已、口水直流”。
也一并把这个题目以及参考答案收入咱们的 《尼恩Java面试宝典》V62版本供后面的小伙伴参考提升大家的 3高 架构、设计、开发水平。 注本文以 PDF 持续更新最新尼恩 架构笔记、面试题 的PDF文件请从公众号 【技术自由圈】获取。 为什么要使用线程池
多线程编程是在开发过程中非常基础且非常重要的一个环节基本上任何一家软件公司或者项目中都会使用多线程。主要有三个原因
降低资源的消耗。降低线程创建和销毁的资源消耗。提高响应速度线程的创建时间为T1执行时间T2销毁时间T3免去T1和T3的时间提高线程的可管理性
总之线程池是一种常用的并发编程工具它可以帮助我们更好地管理和复用线程资源提高程序的性能和稳定性。
线程池也是 3高架构的基础技术。
JAVA中的线程池组件
在Java中我们可以使用java.util.concurrent包中提供的ThreadPoolExecutor类来创建和使用线程池。
ThreadPoolExecutor 是非常高频非常常用的组件。
对于 ThreadPoolExecutor 的底层原理和源码大家要做到非常深入的掌握。大家一定要深入看ThreadPoolExecutor线程池源码了解其执行过程。
另外看懂线程池执行流程和源码设计有助于提升我们多线程编程技术和解决工作中遇到的问题。
手写线程池的重要性
很多小伙伴给尼恩反馈 说线程池的源码太难看不懂。
怎么办呢
大家可以先易后难。
可以手撸一个简单版的线程池加强一下对执行流程的理解。然后再深入源码去历险记。
或者说如果我们想要更好地理解线程池的工作原理那么自己动手实现一个简单的线程池是一个很好的选择。
接下来我将带领大家一步一步地实现一个简单的线程池。
我们将从最基本的功能开始逐步添加更多的功能和优化最终实现一个完整的线程池。
线程池的实现原理
线程池是一个典型的生产者-消费者模型。下图所示为线程池的实现原理
调用方不断向线程池中提交任务 生产者线程池中有一组线程不断地从队列中取任务消费者线程池管理一个任务队列对 异步任务进行缓冲 缓冲区 要实现一个线程池有几个问题需要考虑
队列设置多长 如果是无界的调用方不断往队列中方任务可能导致内存耗尽。如果是有界的当队列满了之后调用方如何处理线程池中的线程个数是固定的还是动态变化的每次提交新任务是放入队列还是开新线程当没有任务的时候线程是睡眠一小段时间还是进入阻塞如果进入阻塞如何唤醒
针对问题4有3种做法
不使用阻塞队列只使用一般的线程安全的队列也无阻塞/唤醒机制。 当队列为空时线程池中的线程只能睡眠一会儿然后醒来去看队列中有没有新任务到来如此不断轮询。不使用阻塞队列但在队列外部线程池内部实现了阻塞/唤醒机制使用阻塞队列
很显然做法3最完善既避免了线程池内部自己实现阻塞/唤醒机制的麻烦也避免了做法1的睡眠/轮询带来的资源消耗和延迟。
现在来带大家手写一个简单的线程池让大家更加理解线程池的工作原理
手写一个简单线程池
第一步定义线程池接口
首先我们需要定义一个线程池接口用来表示线程池应该具备哪些功能。
一个简单的线程池应该至少具备以下几个功能
添加任务并执行关闭线程池强制关闭线程池
因此我们可以定义一个ThreadPool接口它包含三个方法execute、shutdown和shutdownNow。
import java.util.List;// 线程池接口
public interface ThreadPool {// 提交任务到线程池void execute(Runnable task);// 优雅关闭void shutdown();//立即关闭ListRunnable shutdownNow();
}其中
execute方法用来添加任务并执行shutdown方法用来关闭线程池它会等待已经提交到线程池中的任务都执行完毕后再关闭shutdownNow方法用来强制关闭线程池它会立即停止所有正在执行或等待执行的任务并返回未执行的任务列表。
第二步实现线程的池化管理
接下来我们需要实现一个简单的线程池类它实现了ThreadPool接口并提供了基本的功能。
为了简化代码先实现一个v1版本这是一个基础版本一个简单的实现示例。
在v1版本中我们先不考虑拒绝策略、自动调节线程资源等高级功能。
下面是一个简单的实现示例
首先定义一个工作线程类
// 定义一个工作线程类
public class WorkerThread extends Thread {// 用于从任务队列中取出并执行任务private BlockingQueueRunnable taskQueue;// 构造方法传入任务队列public WorkerThread(BlockingQueueRunnable taskQueue) {this.taskQueue taskQueue;}// 重写run方法Overridepublic void run() {// 循环执行直到线程被中断while (!Thread.currentThread().isInterrupted() !taskQueue.isEmpty()) {try {// 从任务队列中取出一个任务如果队列为空则阻塞等待Runnable task taskQueue.take();// 执行任务task.run();} catch (Exception e) {e.printStackTrace();// 如果线程被中断则退出循环break;}}}
}然后, 基于一个线程池接口实现一个简单的线程池,
// 简单的线程池实现
public class SimpleThreadPool implements ThreadPool {// 线程池初始化时的线程数量private int initialSize;// 任务队列private BlockingQueueRunnable taskQueue;// 用于存放和管理工作线程的集合private ListWorkerThread threads;// 是否已经被shutdown标志private volatile boolean isShutdown false;public SimpleThreadPool(int initialSize) {this.initialSize initialSize;taskQueue new LinkedBlockingQueue();threads new ArrayList(initialSize);// 初始化方法创建一定数量的工作线程并启动它们for (int i 0; i initialSize; i) {WorkerThread workerThread new WorkerThread(taskQueue);workerThread.start();threads.add(workerThread);}}// 实现execute方法用于将任务加入到任务队列并通知工作线程来执行Overridepublic void execute(Runnable task) {if (isShutdown) {throw new IllegalStateException(ThreadPool is shutdown);}taskQueue.offer(task);}// 关闭线程池, 等待所有线程执行完毕Overridepublic void shutdown() {// 修改状态isShutdown true;for (WorkerThread thread : threads) {// 中断线程thread.interrupt();}}Overridepublic ListRunnable shutdownNow() {// 修改状态isShutdown true;// 清空队列ListRunnable remainingTasks new ArrayList();taskQueue.drainTo(remainingTasks);// 中断所有线程for (WorkerThread thread : threads) {thread.interrupt();}// 返回未执行任务集合return remainingTasks;}
}这个版本的线程池实现了基本的添加任务并执行、关闭线程池和强制关闭线程池等功能。
它在构造方法中接收一个初始化线程池大小参数用于初始化任务队列和工作线程集合并创建一定数量的工作线程。
第三步自定义线程池的基本参数
在上一步中我们实现了一个简单的线程池它具备了基本的功能。
但是它存在一个问题任务队列没有指定容量大小是个无界队列其次只指定了初始的线程池大小应该要提供根据不同的应用场景来调整线程池的大小参数以提高性能和资源利用率。
因此线程池实现类需要实现自定义初始大小、最大大小以及核心大小的功能。
初始大小是指线程池初始化时创建的工作线程数量最大大小是指线程池能够容纳的最多的工作线程数量核心大小是指线程池在没有任务时保持存活的工作线程数量。
这三个参数需要在基本的线程池实现类中定义为成员变量并在构造方法中传入并赋值。
同时还需要在execute方法中根据这三个参数来动态地调整工作线程的数量例如
当活跃的工作线程数量小于核心大小时尝试创建并启动一个新的工作线程来执行任务当活跃的工作线程数量大于等于核心大小时将任务加入到任务队列等待空闲的工作线程来执行当任务队列已满时尝试创建并启动一个新的工作线程来执行任务当活跃的工作线程数量达到最大大小时无法再创建新的工作线程。
我们还需要在构造方法里提供一个参数queueSize用于限制队列大小。
下面我们就对类进行改造
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;public class SimpleThreadPool implements ThreadPool {// 线程池初始化时的线程数量private int initialSize;// 线程池最大线程数量private int maxSize;// 线程池核心线程数量private int coreSize;// 队列大小private int queueSize;// 任务队列private BlockingQueueRunnable taskQueue;// 用于存放和管理工作线程的集合private ListWorkerThread threads;// 是否已经被shutdown标志private volatile boolean isShutdown false;public SimpleThreadPool(int initialSize, int maxSize, int coreSiz, int queueSizee) {// 初始化参数this.initialSize initialSize;this.maxSize maxSize;this.coreSize coreSize;taskQueue new LinkedBlockingQueue(queueSize);threads new ArrayList(initialSize);// 初始化方法创建一定数量的工作线程并启动它们for (int i 0; i initialSize; i) {WorkerThread workerThread new WorkerThread();workerThread.start(taskQueue);threads.add(workerThread);}}Overridepublic void execute(Runnable task) {if (isShutdown) {throw new IllegalStateException(ThreadPool is shutdown);}// 当线程数量小于核心线程数时创建新的线程if (threads.size() coreSize) {addWorkerThread(task);} else if (!taskQueue.offer(task)) {// 当队列已满时且线程数量小于最大线程数量时创建新的线程if (threads.size() maxSize) {addWorkerThread(task);} else {throw new IllegalStateException(执行任务失败);}}}// 创建新的线程并执行任务private void addWorkerThread(Runnable task) {WorkerThread workerThread new WorkerThread();workerThread.start(taskQueue);threads.add(workerThread);// 任务放入队列taskQueue.offer(task);}省略其它代码}这一步我们在 SimpleThreadPool里新增了initialSizemaxSize coreSize 三个变量在构造方法里传入对应三个参数同时在execute方法里当有任务进入时先判断当前线程池数量是否满足不同条件进而执行不同的处理逻辑。
第四步设计饱和拒绝策略
这个功能是为了处理当任务队列已满且无法再创建新的工作线程时也是就线程池的工作量饱和时如何处理被拒绝的任务。
不同的场景可能需要不同的拒绝策略例如
直接抛出异常忽略任务阻塞当前线程等等
为了让用户可以自定义拒绝策略需要
定义一个拒绝策略接口声明一个方法用于处理被拒绝的任务。然后需要在基本的线程池实现类中定义一个拒绝策略成员变量并在构造方法中传入并赋值。最后在execute方法中在无法创建新的工作线程时调用拒绝策略来处理该任务。
下面是一个简单的实现示例
我们首先定义了一个RejectedExecutionHandler接口用来表示拒绝策略。用户可以根据需要实现这个接口并在构造线程池时传入自己的拒绝策略。
public interface RejectedExecutionHandler {// 参数r 代表被拒绝的任务executor 代表线程池对象void rejectedExecution(Runnable r, ThreadPool executor);
}我们再实现一个直接抛出异常的拒绝策略
// 直接抛出异常的拒绝策略
public class AbortPolicy implements RejectedExecutionHandler {public void rejectedExecution(Runnable r, ThreadPool executor) {throw new RuntimeException(The thread pool is full and the task queue is full.);}
}我们也可以实现一个丢弃策略
// 忽略任务的拒绝策略
public class DiscardPolicy implements RejectedExecutionHandler {public void rejectedExecution(Runnable r, ThreadPool executor) {// do nothingSystem.out.println(Task rejected: r);}
}接下来我们再优化SimpleThreadPool类
public class SimpleThreadPool implements ThreadPool {// 线程池初始化时的线程数量private int initialSize;// 线程池最大线程数量private int maxSize;// 线程池核心线程数量private int coreSize;// 队列大小private int queueSize;// 任务队列private BlockingQueueRunnable taskQueue;// 用于存放和管理工作线程的集合private ListWorkerThread threads;// 是否已经被shutdown标志private volatile boolean isShutdown false;// 默认的拒绝策略private final static RejectedExecutionHandler DEFAULT_REJECT_HANDLER new AbortPolicy();// 拒绝策略成员变量private final RejectedExecutionHandler rejectHandler;public SimpleThreadPool(int initialSize, int maxSize, int coreSize, int queueSize) {this(initialSize, maxSize, coreSize, queueSize, DEFAULT_REJECT_HANDLER);}public SimpleThreadPool(int initialSize, int maxSize, int coreSize , int queueSize, RejectedExecutionHandler rejectHandler) {System.out.printf(初始化线程池: initialSize: %d, maxSize: %d, coreSize: %d%n, initialSize, maxSize, coreSize);// 初始化参数this.initialSize initialSize;this.maxSize maxSize;this.coreSize coreSize;taskQueue new LinkedBlockingQueue(queueSize);threads new ArrayList(initialSize);this.rejectHandler rejectHandler;// 初始化方法创建一定数量的工作线程并启动它们for (int i 0; i initialSize; i) {WorkerThread workerThread new WorkerThread(taskQueue);workerThread.start();threads.add(workerThread);}}Overridepublic void execute(Runnable task) {System.out.printf(添加任务: %s%n, task.toString());if (isShutdown) {throw new IllegalStateException(ThreadPool is shutdown);}// 当线程数量小于核心线程数时创建新的线程if (threads.size() coreSize) {addWorkerThread(task);System.out.printf(创建新的线程: thread count: %d, number of queues: %d%n, threads.size(), taskQueue.size());} else if (!taskQueue.offer(task)) {// 当队列已满时且线程数量小于最大线程数量时创建新的线程if (threads.size() maxSize) {addWorkerThread(task);System.out.printf(创建新的线程: thread count: %d, number of queues: %d%n, threads.size(), taskQueue.size());} else {//使用拒绝策略rejectHandler.rejectedExecution(task, this);}}}// 省略其它代码
}这个版本的线程池在构造方法中新增了一个handler参数用来表示拒绝策略。当任务队列已满时它会调用handler的rejectedExecution方法来处理被拒绝的任务。
第五步性能优化之自动调节线程资源
到目前为止我们已经实现了一个简单但功能完备的线程池。
但是它还有很多可以优化和扩展的地方。
例如可以添加自动调节线程资源的功能为啥需要 自动调节 线程资源呢
因为线程资源是非常昂贵的。
自动调节 线程资源功能是为了让线程池可以根据任务的变化动态地增加或减少工作线程的数量以提高性能和资源利用率。
为了实现这个功能需要在基本的线程池实现类中定义一个空闲时长成员变量并在构造方法中传入并赋值。
空闲时长是指当工作线程没有任务执行时可以保持存活的时间。
如果超过这个时间还没有新的任务那么工作线程就会自动退出。
同时还需要在工作线程类中定义一个空闲开始时间成员变量并在run方法中更新它。
空闲开始时间是指当工作线程从任务队列中取出一个任务后上一次取出任务的时间。
如果当前时间减去空闲开始时间大于空闲时长那么工作线程就会自动退出。
ok那么我们继续改进线程池,
public class SimpleThreadPool implements ThreadPool {// 省略其它代码// 线程空闲时长private long keepAliveTime;public SimpleThreadPool(int initialSize, int maxSize, int coreSize, int queueSize, long keepAliveTime) {this(initialSize, maxSize, coreSize, queueSize, keepAliveTime, DEFAULT_REJECT_HANDLER);}public SimpleThreadPool(int initialSize, int maxSize, int coreSize, int queueSize, long keepAliveTime, RejectedExecutionHandler rejectHandler) {System.out.printf(初始化线程池: initialSize: %d, maxSize: %d, coreSize: %d%n, initialSize, maxSize, coreSize);// 初始化参数this.initialSize initialSize;this.maxSize maxSize;this.coreSize coreSize;taskQueue new LinkedBlockingQueue(queueSize);threads new ArrayList(initialSize);this.rejectHandler rejectHandler;this.keepAliveTime keepAliveTime;// 初始化方法创建一定数量的工作线程并启动它们for (int i 0; i initialSize; i) {// 传入相关参数到工作线程WorkerThread workerThread new WorkerThread(keepAliveTime, taskQueue, threads);workerThread.start();threads.add(workerThread);}}// 省略其它代码
}然后改造工作线程WorkerThread
// 定义一个工作线程类
public class WorkerThread extends Thread {private ListWorkerThread threads;// 空闲时长private long keepAliveTime;// 用于从任务队列中取出并执行任务private BlockingQueueRunnable taskQueue;// 构造方法传入任务队列public WorkerThread(long keepAliveTime, BlockingQueueRunnable taskQueue, ListWorkerThread threads) {this.keepAliveTime keepAliveTime;this.taskQueue taskQueue;this.threads threads;}// 重写run方法Overridepublic void run() {long lastActiveTime System.currentTimeMillis();// 循环执行直到线程被中断Runnable task;while (!Thread.currentThread().isInterrupted() !taskQueue.isEmpty()) {try {// 从任务队列中取出一个任务如果队列为空则阻塞等待task taskQueue.poll(keepAliveTime, TimeUnit.MILLISECONDS);if (task ! null) {task.run();System.out.printf(WorkerThread %d, current task: %s%n, Thread.currentThread().getId(), task.toString());lastActiveTime System.currentTimeMillis();} else if (System.currentTimeMillis() - lastActiveTime keepAliveTime) {// 从线程池中移除threads.remove(this);System.out.printf(WorkerThread %d exit %n, Thread.currentThread().getId());break;}} catch (Exception e) {// 从线程池中移除threads.remove(this);e.printStackTrace();// 如果线程被中断则退出循环break;}}}
}在WorkerThread类run方法里采用taskQueue.poll方法指定等待时长这里是线程退出的关键。
如果超时未获取到任务则表明当前线程长时间未处理任务可以正常退出并从线程池里移除该线程。
手写一个简单线程池的完整代码
下面我们给出完整的所有代码
拒绝策略相关类
// 拒绝策略接口
public interface RejectedExecutionHandler {// 参数r 代表被拒绝的任务executor 代表线程池对象void rejectedExecution(Runnable r, ThreadPool executor);
}// 忽略任务的拒绝策略
public class DiscardPolicy implements RejectedExecutionHandler {public void rejectedExecution(Runnable r, ThreadPool executor) {// do nothingRunnableWrapper wrapper (RunnableWrapper) r;System.out.println(Task rejected: wrapper.getTaskId());}
}为了通过输出日志清晰的展现线程池中任务的运行流程新增了RunnableWrapper用于记录taskId方便日志监控。
public class RunnableWrapper implements Runnable{private final Integer taskId;public RunnableWrapper(Integer taskId) {this.taskId taskId;}public Integer getTaskId() {return this.taskId;}Overridepublic void run() {System.out.println(Task taskId is running.);try {Thread.sleep(100);} catch (Exception e) {e.printStackTrace();// ignore}System.out.println(Task taskId is completed.);}
}
线程池接口
// 线程池接口
public interface ThreadPool {// 提交任务到线程池void execute(Runnable task);// 优雅关闭void shutdown();//立即关闭ListRunnable shutdownNow();
}工作线程类
// 定义一个工作线程类
public class WorkerThread extends Thread {private ListWorkerThread threads;// 空闲时长private long keepAliveTime;// 用于从任务队列中取出并执行任务private BlockingQueueRunnable taskQueue;// 构造方法传入任务队列public WorkerThread(long keepAliveTime, BlockingQueueRunnable taskQueue, ListWorkerThread threads) {this.keepAliveTime keepAliveTime;this.taskQueue taskQueue;this.threads threads;}// 重写run方法Overridepublic void run() {long lastActiveTime System.currentTimeMillis();// 循环执行直到线程被中断Runnable task;while (!Thread.currentThread().isInterrupted() || !taskQueue.isEmpty()) {try {// 从任务队列中取出一个任务如果队列为空则阻塞等待task taskQueue.poll(keepAliveTime, TimeUnit.MILLISECONDS);RunnableWrapper wrapper (RunnableWrapper) task;if (task ! null) {System.out.printf(WorkerThread %s, poll current task: %s%n, Thread.currentThread().getName(), wrapper.getTaskId());task.run();lastActiveTime System.currentTimeMillis();} else if (System.currentTimeMillis() - lastActiveTime keepAliveTime) {// 从线程池中移除threads.remove(this);System.out.printf(WorkerThread %s exit %n, Thread.currentThread().getName());break;}} catch (Exception e) {// 从线程池中移除System.out.printf(WorkerThread %s occur exception%n, Thread.currentThread().getName());threads.remove(this);e.printStackTrace();// 如果线程被中断则退出循环break;}}}
}简单线程池实现类
public class SimpleThreadPool implements ThreadPool {// 线程池初始化时的线程数量private int initialSize;// 线程池最大线程数量private int maxSize;// 线程池核心线程数量private int coreSize;// 任务队列private BlockingQueueRunnable taskQueue;// 用于存放和管理工作线程的集合private ListWorkerThread threads;// 是否已经被shutdown标志private volatile boolean isShutdown false;// 默认的拒绝策略private final static RejectedExecutionHandler DEFAULT_REJECT_HANDLER new AbortPolicy();// 拒绝策略成员变量private final RejectedExecutionHandler rejectHandler;// 线程空闲时长private long keepAliveTime;public SimpleThreadPool(int initialSize, int maxSize, int coreSize, int queueSize, long keepAliveTime) {this(initialSize, maxSize, coreSize, queueSize, keepAliveTime, DEFAULT_REJECT_HANDLER);}public SimpleThreadPool(int initialSize, int maxSize, int coreSize, int queueSize, long keepAliveTime, RejectedExecutionHandler rejectHandler) {System.out.printf(初始化线程池: initialSize: %d, maxSize: %d, coreSize: %d%n, initialSize, maxSize, coreSize);// 初始化参数this.initialSize initialSize;this.maxSize maxSize;this.coreSize coreSize;taskQueue new LinkedBlockingQueue(queueSize);threads new ArrayList(initialSize);this.rejectHandler rejectHandler;this.keepAliveTime keepAliveTime;// 初始化方法创建一定数量的工作线程并启动它们for (int i 0; i initialSize; i) {WorkerThread workerThread new WorkerThread(keepAliveTime, taskQueue, threads);workerThread.start();threads.add(workerThread);}}Overridepublic void execute(Runnable task) {if (isShutdown) {throw new IllegalStateException(ThreadPool is shutdown);}RunnableWrapper wrapper (RunnableWrapper) task;System.out.printf(put task: %s %n , wrapper.getTaskId());// 当线程数量小于核心线程数时创建新的线程if (threads.size() coreSize) {addWorkerThread(task);System.out.printf(小于核心线程数创建新的线程: thread count: %d, queue remainingCapacity : %d%n, threads.size(), taskQueue.remainingCapacity());} else if (!taskQueue.offer(task)) {// 当队列已满时且线程数量小于最大线程数量时创建新的线程if (threads.size() maxSize) {addWorkerThread(task);System.out.printf(队列已满, 创建新的线程: thread count: %d, queue remainingCapacity : %d%n, threads.size(), taskQueue.remainingCapacity());} else {rejectHandler.rejectedExecution(task, this);}} else {System.out.printf(任务放入队列: thread count: %d, queue remainingCapacity : %d%n, threads.size(), taskQueue.remainingCapacity());}}// 创建新的线程并执行任务private void addWorkerThread(Runnable task) {WorkerThread workerThread new WorkerThread(keepAliveTime, taskQueue, threads);workerThread.start();threads.add(workerThread);// 任务放入队列try {taskQueue.put(task);} catch (InterruptedException e) {throw new RuntimeException(e);}}// 关闭线程池, 等待所有线程执行完毕Overridepublic void shutdown() {System.out.printf(shutdown thread, count: %d, queue remainingCapacity : %d%n, threads.size(), taskQueue.remainingCapacity());// 修改状态isShutdown true;for (WorkerThread thread : threads) {// 中断线程thread.interrupt();}}Overridepublic ListRunnable shutdownNow() {System.out.printf(shutdown thread now, count: %d, queue remainingCapacity : %d%n, threads.size(), taskQueue.remainingCapacity());// 修改状态isShutdown true;// 清空队列ListRunnable remainingTasks new ArrayList();taskQueue.drainTo(remainingTasks);// 中断所有线程for (WorkerThread thread : threads) {thread.interrupt();}// 返回未执行任务集合return remainingTasks;}
}第六步验证线程池
接下来我们编写一个测试用例来验证我们的线程池是否能正常运行。
最后我们编写了一个测试用例SimpleThreadPoolTest
// 定义一个测试用例类
public class SimpleThreadPoolTest {public static void main(String[] args) throws InterruptedException {SimpleThreadPool threadPool new SimpleThreadPool(1, 4, 2, 3, 2000, new DiscardPolicy());for (int i 0; i 10; i) {threadPool.execute(new RunnableWrapper(i));}Thread.sleep(10_000);threadPool.shutdown();}
}这个测试用例创建了一个拥有1个初始线程、最多4个线程、核心线程数为2、队列长度为3空闲线程保留时间为2000毫秒的线程池。
它使用了一个简单的拒绝策略当任务被拒绝时它会打印一条消息。
然后测试用例向线程池中提交了10个简单的任务每个任务都会打印一条消息然后睡眠100毫秒再打印一条消息。
最后测试用例调用了shutdown方法来关闭线程池。
当我们运行这个测试用例时会看到类似下面的输出
初始化线程池: initialSize: 1, maxSize: 4, coreSize: 2
put task: 0
小于核心线程数创建新的线程: thread count: 2, queue remainingCapacity : 2
put task: 1
WorkerThread Thread-1, poll current task: 0
WorkerThread Thread-0, poll current task: 1
Task 1 is running.
任务放入队列: thread count: 2, queue remainingCapacity : 2
put task: 2
Task 0 is running.
任务放入队列: thread count: 2, queue remainingCapacity : 2
put task: 3
任务放入队列: thread count: 2, queue remainingCapacity : 1
put task: 4
任务放入队列: thread count: 2, queue remainingCapacity : 0
put task: 5
WorkerThread Thread-2, poll current task: 2
Task 2 is running.
队列已满, 创建新的线程: thread count: 3, queue remainingCapacity : 0
put task: 6
WorkerThread Thread-3, poll current task: 3
Task 3 is running.
队列已满, 创建新的线程: thread count: 4, queue remainingCapacity : 0
put task: 7
Task rejected: 7
put task: 8
Task rejected: 8
put task: 9
Task rejected: 9
Task 2 is completed.
Task 1 is completed.
WorkerThread Thread-2, poll current task: 4
Task 4 is running.
Task 0 is completed.
Task 3 is completed.
WorkerThread Thread-1, poll current task: 6
WorkerThread Thread-0, poll current task: 5
Task 6 is running.
Task 5 is running.
Task 5 is completed.
Task 6 is completed.
Task 4 is completed.
WorkerThread Thread-3 exit
WorkerThread Thread-1 exit
WorkerThread Thread-0 exit
WorkerThread Thread-2 exit
shutdown thread, count: 0, queue remainingCapacity : 3从输出中可以看出线程池中最多有4个线程在同时运行。当有空闲线程时新提交的任务会被立即执行否则新提交的任务会被添加到任务队列中等待执行。
ok到了这里大家能够帮助大家更好地理解上文中实现的简单线程池。
接下来大家可以去进一步技术深入去研究线程池的源码了。
实操总结
通过实操我们一步一步地实现了一个简单的线程池。
我们从最基本的功能开始逐步添加了拒绝策略、自动调节线程资源等高级功能并对线程池进行了优化。
通过这个过程我们可以更好地理解线程池的工作原理和实现细节。
当然这只是一个简单的示例实际应用中的线程池可能会更加复杂和强大。
上的实操与Java标准库中提供的线程池相比仍然存在一些不足之处。
例如
没有提供足够的构造参数和方法让用户可以更好地控制和监控线程池的行为。没有提供足够多的拒绝策略让用户可以根据不同的场景选择不同的拒绝策略。没有提供定时任务和周期性任务的执行功能。没有提供足够完善的错误处理机制。
Java标准库中提供的线程池实现如ThreadPoolExecutor类则在这些方面都做得更好。
它提供了丰富的构造参数和方法让用户可以更好地控制和监控线程池的行为它提供了多种拒绝策略让用户可以根据不同的场景选择不同的拒绝策略它还提供了ScheduledThreadPoolExecutor类用来执行定时任务和周期性任务它还提供了完善的错误处理机制可以帮助用户更好地处理异常情况。
如果要深入了解Java标准库中提供的线程池实现可以参考清华大学出版社所出版的尼恩 Java高并发三部曲之二 《Java高并发核心编程 卷2 加强版》。
此书广受好评写得非常细致这里不做赘述。
作者介绍
本文1作 Andy资深架构师 《Java 高并发核心编程 加强版》作者之1 。
本文2作 尼恩40岁资深老架构师 《Java 高并发核心编程 加强版 卷1、卷2、卷3》创世作者 著名博主 。 《K8S学习圣经》《Docker学习圣经》等11个PDF 圣经的作者。
参考文献
https://www.cnblogs.com/daoqidelv/p/7043696.html
清华大学出版社《Java高并发核心编程 卷2 加强版》
《队列之王 Disruptor 红宝书》
技术自由的实现路径 PDF
实现你的 架构自由
《吃透8图1模板人人可以做架构》
《10Wqps评论中台如何架构B站是这么做的》
《阿里二面千万级、亿级数据如何性能优化 教科书级 答案来了》
《峰值21WQps、亿级DAU小游戏《羊了个羊》是怎么架构的》
《100亿级订单怎么调度来一个大厂的极品方案》
《2个大厂 100亿级 超大流量 红包 架构方案》
… 更多架构文章正在添加中
实现你的 响应式 自由
《响应式圣经10W字实现Spring响应式编程自由》
这是老版本 《Flux、Mono、Reactor 实战史上最全》
实现你的 spring cloud 自由
《Spring cloud Alibaba 学习圣经》
《分库分表 Sharding-JDBC 底层原理、核心实战史上最全》
《一文搞定SpringBoot、SLF4j、Log4j、Logback、Netty之间混乱关系史上最全》
实现你的 linux 自由
《Linux命令大全2W多字一次实现Linux自由》
实现你的 网络 自由
《TCP协议详解 (史上最全)》
《网络三张表ARP表, MAC表, 路由表实现你的网络自由》
实现你的 分布式锁 自由
《Redis分布式锁图解 - 秒懂 - 史上最全》
《Zookeeper 分布式锁 - 图解 - 秒懂》
实现你的 王者组件 自由
《队列之王 Disruptor 原理、架构、源码 一文穿透》
《缓存之王Caffeine 源码、架构、原理史上最全10W字 超级长文》
《缓存之王Caffeine 的使用史上最全》
《Java Agent 探针、字节码增强 ByteBuddy史上最全》
实现你的 面试题 自由
4000页《尼恩Java面试宝典 》 40个专题
以上尼恩 架构笔记、面试题 的PDF文件更新请到《技术自由圈》公号获取↓↓↓
免费获取11个技术圣经 PDF