建设电子网站试卷a,网站如何做服务器授权,广东品牌网站制作公司,广西住房建设厅网站首页码出高效#xff1a;Java开发手册笔记#xff08;线程池及其源码#xff09; 码出高效#xff1a;Java开发手册笔记#xff08;线程池及其源码#xff09; 码出高效#xff1a;Java开发手册笔记#xff08;线程池及其源码#xff09;前言一、线程池的作用线程的生命周…码出高效Java开发手册笔记线程池及其源码 码出高效Java开发手册笔记线程池及其源码 码出高效Java开发手册笔记线程池及其源码前言一、线程池的作用线程的生命周期 二、线程池7大参数execute() 与 submit()Executors 静态工厂方法创建线程池自定义实现线程工厂 ThreadFactory自定义线程池拒绝策略 实现*RejectedExecutionHandler*线程池拒绝策略 三、线程池源码详解总结 前言
线程池 一、线程池的作用
线程使应用能够更加充分合理地协调利用 CPU、内存、网络、 I/O 等系统资源。线程的创建需要开辟虚拟机栈、本地方法栈、程序计数器等线程私有的内存空间。在线程销毁时需要回收这些系统资源。频繁地创建和销毁线程会浪费大量的系统资源增加并发编程风险。另外在服务器负载过大的时候如何让新的线程等待或者友好地拒绝服务这些都是线程自身无法解决的。所以需要通过线程池协调多个线程 并实现类似主次线程隔离、定时执行、周期执行等任务。线程池的作用包括
利用线程池管理并复用线程、控制最大并发数等。实现任务线程队列缓存策略和拒绝机制。实现某些与时间相关的功能如定时执行、周期执行等。隔离线程环境。比如交易服务和搜索服务在同一台服务器上分别开启两个线程池交易线程的资源消耗明显要大因此通过配置独立的线程池 将较慢的交易服务与搜索服务隔离开避免各服务线程相互影响。
线程的生命周期 线程池重要的参数ctl是一个包含两个属性的原子整型一个属性是 workerCount指的是有效线程数另一个是 runState指的是线程的状态线程的生命周期
/*** The workerCount is the number of workers that have been* permitted to start and not permitted to stop. The value may be* transiently different from the actual number of live threads,* for example when a ThreadFactory fails to create a thread when* asked, and when exiting threads are still performing* bookkeeping before terminating. The user-visible pool size is* reported as the current size of the workers set.*/
/*** The runState provides the main lifecycle control, taking on values:** RUNNING: Accept new tasks and process queued tasks* SHUTDOWN: Dont accept new tasks, but process queued tasks* STOP: Dont accept new tasks, dont process queued tasks,* and interrupt in-progress tasks* TIDYING: All tasks have terminated, workerCount is zero,* the thread transitioning to state TIDYING* will run the terminated() hook method* TERMINATED: terminated() has completed*/// runState is stored in the high-order bitsprivate static final int RUNNING -1 COUNT_BITS;private static final int SHUTDOWN 0 COUNT_BITS;private static final int STOP 1 COUNT_BITS;private static final int TIDYING 2 COUNT_BITS;private static final int TERMINATED 3 COUNT_BITS;Thread类中的线程生命周期
public enum State {/*** Thread state for a thread which has not yet started.*/NEW,/*** Thread state for a runnable thread. A thread in the runnable* state is executing in the Java virtual machine but it may* be waiting for other resources from the operating system* such as processor.*/RUNNABLE,/*** Thread state for a thread blocked waiting for a monitor lock.* A thread in the blocked state is waiting for a monitor lock* to enter a synchronized block/method or* reenter a synchronized block/method after calling* {link Object#wait() Object.wait}.*/BLOCKED,/*** Thread state for a waiting thread.* A thread is in the waiting state due to calling one of the* following methods:* ul* li{link Object#wait() Object.wait} with no timeout/li* li{link #join() Thread.join} with no timeout/li* li{link LockSupport#park() LockSupport.park}/li* /ul** pA thread in the waiting state is waiting for another thread to* perform a particular action.** For example, a thread that has called ttObject.wait()/tt* on an object is waiting for another thread to call* ttObject.notify()/tt or ttObject.notifyAll()/tt on* that object. A thread that has called ttThread.join()/tt* is waiting for a specified thread to terminate.*/WAITING,/*** Thread state for a waiting thread with a specified waiting time.* A thread is in the timed waiting state due to calling one of* the following methods with a specified positive waiting time:* ul* li{link #sleep Thread.sleep}/li* li{link Object#wait(long) Object.wait} with timeout/li* li{link #join(long) Thread.join} with timeout/li* li{link LockSupport#parkNanos LockSupport.parkNanos}/li* li{link LockSupport#parkUntil LockSupport.parkUntil}/li* /ul*/TIMED_WAITING,/*** Thread state for a terminated thread.* The thread has completed execution.*/TERMINATED;
}在了解线程池的基本作用后我们学习一下线程池是如何创建线程的。首先从 ThreadPoolExecutor 构造方法讲起 学习如何自定义 ThreadFactory 和RejectedExecutionHandler 并编写一个最简单的线程池示例。然后通过分析ThreadPoolExecutor 的 execute 和 addWorker 两个核心方法学习如何把任务线程加入到线程池中运行。 ThreadPoolExecutor 的构造方法如下
二、线程池7大参数
public ThreadPoolExecutor(int corePoolSize, //第一个参数int maximumPoolSize, //第二个参数long keepAliveTime, //第三个参数TimeUnit unit, //第四个参数BlockingQueueRunnable workQueue, //第五个参数ThreadFactory threadFactory, //第六个参数RejectedExecutionHandler handler) { //第七个参数if (corePoolSize 0 ||// maximumPoolSize必须大于或等于1也要大于或等于corePoolSize 第1处maximumPoolSize 0 ||maximumPoolSize corePoolSize ||keepAliveTime 0)throw new IllegalArgumentException();// 第2处if (workQueue null || threadFactory null || handler null)throw new NullPointerException();this.corePoolSize corePoolSize;this.maximumPoolSize maximumPoolSize;this.workQueue workQueue;this.keepAliveTime unit.toNanos(keepAliveTime);this.threadFactory threadFactory;this.handler handler;
}第 1 个参数corePoolSize 表示常驻核心线程数。如果等于 0 则任务执行完之后 没有任何请求进入时销毁线程池的线程如果大于 0 即使本地任务执行完毕核心线程也不会被销毁。这个值的设置非常关键设置过大会浪费资源设置过小会导致线程频繁地创建或销毁。 第 2 个参数 maximumPoolSize 表示线程池能够容纳同时执行的最大线程数。从上方示例代码中的第 1 处来看必须大于或等于 1。如果待执行的线程数大于此值需要借助第 5 个参数的帮助缓存在队列中。如果 maximumPoolSize 与 corePoolSize相等即是固定大小线程池。 第 3 个参数 keepAliveTime 表示线程池中的线程空闲时间当空闲时间达到 keepAliveTime 值时线程会被销毁直到只剩下 corePoolSize 个线程为止避免浪费内存和句柄资源。在默认情况下当线程池的线程数大于 corePoolSize 时keepAliveTime 才会起作用。但是当 ThreadPoolExecutor 的 allowCoreThreadTimeOut变量设置为 true 时 核心线程超时后也会被回收。 第 4 个参数TimeUnit 表示时间单位。 keepAliveTime 的时间单位通常是TimeUnit.SECONDS 。 第 5 个参数 workQueue 表示缓存队列。当请求的线程数大于 maximumPoolSize时 线程进入 BlockingQueue 阻塞队列。后续示例代码中使用的 LinkedBlockingQueue是单向链表使用锁来控制入队和出队的原子性两个锁分别控制元素的添加和获取是一个生产消费模型队列。 第 6 个参数 threadFactory 表示线程工厂。它用来生产一组相同任务的线程。线程池的命名是通过给这个 factory 增加组名前缀来实现的。在虚拟机栈分析时就可以知道线程任务是由哪个线程工厂产生的。 第 7 个参数 handler 表示执行拒绝策略的对象。当超过第 5 个参数 workQueue的任务缓存区上限的时候就可以通过该策略处理请求这是 种简单的限流保护。像某年双十一没有处理好访问流量过载时的拒绝策略导致内部测试页面被展示出来使用户手足无措。友好的拒绝策略可以是如下三种 ( 1 保存到数据库进行削峰填谷。在空间时再提取出来执行。 ( 2 转向某个提示页由。 ( 3 打印日志。 从代码第 2 处来看 队列、线程工厂、拒绝处理服务都必须有实例对象 但在实际编程中很少有程序员对这三者进行实例化而通过 Executors 这个线程池静态工厂提供默认实现 那么 Exceutors 与 ThreadPoolExecutor 是什么关系呢线程池相关类图如下图所示。
execute() 与 submit()
execute为ThreadPoolExecutor里的方法没有返回值
/*** param 线程任务* throws RejectedExecutionException 如果无法创建任何状态的线程任务* /
void execute (Runnable command );submit为AbstractExecutorService里的方法有返回值
public Future? submit(Runnable task) {if (task null) throw new NullPointerException();RunnableFutureVoid ftask newTaskFor(task, null);execute(ftask);return ftask;
}Executors 静态工厂方法创建线程池 ExecutorService 接口继承了 Executor 接口定义了管理线程任务的方法。ExecutorService 的抽象类 AbstractExecutorService 提供了 submit() 、 invokeAll() 等部分方法的实现 但是核心方法Executor.execute() 并没有在这里实现。因为所有的任务都在这个方法里执行 不同实现会带来不同的执行策略这点在后续的 ThreadPoolExecutor 解析时 会一步步地分析。通过Executors 的静态工厂方法可以创建三个线城池的包装对象。 ForkJoinPool 、 ThreadPooIExecutor、ScheduledThreadPoolExecutor 。 Executors 核心的方法有五个
Executors.newWorkStealingPool: JDK8 引人创建持有足够线程的线程池支持给定的并行度 并通过使用多个队列减少竞争 此构造方法中把 CPU 数量设置为默认的并行度 /*** Creates a work-stealing thread pool using all* {link Runtime#availableProcessors available processors}* as its target parallelism level.* return the newly created thread pool* see #newWorkStealingPool(int)* since 1.8*/public static ExecutorService newWorkStealingPool() {// 返回 ForkJoinPoolJDK7引入对象它也是 AbstractExecutorService 的子类return new ForkJoinPool(Runtime.getRuntime().availableProcessors(),ForkJoinPool.defaultForkJoinWorkerThreadFactory,null, true);}Executors.newCachedThreadPool: maximumPoolSize 最大可以至 Integer.MAX_VALUE 是高度可伸缩的线程池 如果达到这个上限 相信没有任何服务器能够继续工作肯定会抛出 OOM 异常。 keepAliveTime 默认为 60 秒工作线程处于空闲状态 则回收工作线程。如果任务数增加 再次创建出新线程处理任务。Executors.newScheduledThreadPool线程数最大至 Integer.MAX_VALUE与上述相同存在 OOM 风险。它是ScheduledExecutorService 接口家族的实现类支持定时及周期性任务执行。相 比 Timer , ScheduledExecutorService 更安全功能更强大与 newCachedThreadPool 的区别是不回收工作线程。Executors.newSingleThreadExecutor 创建一个单线程的线程池相当于单线程串行执行所有任务保证按任务的提交顺序依次执行。Executors.newFixedThreadPool : 输入的参数即是固定线程数既是核心线程数也是最大线程数 不存在空闲线程所以keepAliveTime 等于 0 /*** Creates a thread pool that reuses a fixed number of threads* operating off a shared unbounded queue. At any point, at most* {code nThreads} threads will be active processing tasks.* If additional tasks are submitted when all threads are active,* they will wait in the queue until a thread is available.* If any thread terminates due to a failure during execution* prior to shutdown, a new one will take its place if needed to* execute subsequent tasks. The threads in the pool will exist* until it is explicitly {link ExecutorService#shutdown shutdown}.** param nThreads the number of threads in the pool* return the newly created thread pool* throws IllegalArgumentException if {code nThreads 0}*/public static ExecutorService newFixedThreadPool(int nThreads) {return new ThreadPoolExecutor(nThreads, nThreads,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueueRunnable());}这里 输入的队列没有指明长度下面介绍 LinkedBlockingQueue 的构造方法 /*** Creates a {code LinkedBlockingQueue} with a capacity of* {link Integer#MAX_VALUE}.*/public LinkedBlockingQueue() {this(Integer.MAX_VALUE);}使用这样的无界队列 如果瞬间请求非常大 会有 OOM 的风险。除newWorkStealingPool 外 其他四个创建方式都存在资源耗尽的风险。
自定义实现线程工厂 ThreadFactory Executors 中默认的线程工厂和拒绝策略过于简单通常对用户不够友好。线程工厂需要做创建前的准备工作对线程池创建的线程必须明确标识就像药品的生产批号一样为线程本身指定有意义的名称和相应的序列号。拒绝策略应该考虑到业务场景返回相应的提示或者友好地跳转。以下为简单的 ThreadFactory 示例
package com.example.demo.test1;import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;/*** Author: Ron* Create: 2023-04-24 15:07*/
public class UserThreadFactory implements ThreadFactory {private final String namePrefix;private final AtomicInteger nextId new AtomicInteger(1);// 定义线程组名称在使用jstack来排查线程问题时可以通过线程组名称来快速定位问题UserThreadFactory(String whatFeatureOfGroup) {namePrefix From UserThreadFactorys whatFeatureOfGroup -Worker-;}Overridepublic Thread newThread(Runnable task) {String name namePrefix nextId.getAndIncrement();Thread thread new Thread(null, task, name, 0);
// System.out.println(thread.getName());return thread;}
}上述示例包括线程工厂和任务执体的定义 通过 newThread 方法快速、统一地创建线程任务强调线程一定要有特定意义的名称方便出错时回溯。 如图 7-5 所示为排查底层公共缓存调用出错时的截图绿色框采用自定义的线程工厂明显比蓝色框默认的线程工厂创建的线程名称拥有更多的额外信息 。 如调用来源、线程的业务含义有助于快速定位到死锁、 StackOverflowError 等问题。
自定义线程池拒绝策略 实现RejectedExecutionHandler 下面再简单地实现一下 RejectedExecutionHandler实现了接口的 rejectedExecution方法打印出当前线程池状态源码如下
package com.example.demo.test1;import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;/*** Author: Ron* Create: 2023-04-24 15:48*/
public class UserRejectHandler implements RejectedExecutionHandler {Overridepublic void rejectedExecution(Runnable task, ThreadPoolExecutor executor) {System.out.println(Task rejected from executor.toString());// 可以将task存入队列等空闲时再去执行}
}线程池拒绝策略
在ThreadPoolExecutor 中提供了四个公开的内部静态类
AbortPolicy 默认丢弃任务并抛出 RejectedExecutionException 异常。DiscardPolicy 丢弃任务但是不抛出异常 这是不推荐的做法。DiscardOldestPolicy 抛弃队列中等待最久的任务 然后把当前任务加入队列中。CallerRunsPolicy 调用任务的 run() 方法绕过线程池直接执行使用调用线程执行任务。 new ThreadPoolExecutor.AbortPolicy(); // 默认队列满了丢弃任务并抛出异常new ThreadPoolExecutor.CallerRunsPolicy(); // 如果添加线城池失败使用调用线程执行任务new ThreadPoolExecutor.DiscardOldestPolicy(); // 将最早进入队列的任务删除之后尝试加入队列new ThreadPoolExecutor.DiscardPolicy(); // 队列满了丢弃任务不抛出异常根据之前实现的线程工厂和拒绝策略线程池的相关代码实现如下
package com.example.demo.test1;import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;/*** Author: Ron* Create: 2023-04-24 15:59*/
public class UserThreadPool {public static void main(String[] args) {// 缓存队列设置固定长度为2为了快速触发 rejectHandlerBlockingQueue queue new LinkedBlockingQueue(2);// 假设外部任务线程的来源由机房1和机房2的混合调用UserThreadFactory f1 new UserThreadFactory(第1机房);UserThreadFactory f2 new UserThreadFactory(第2机房);// 自定义拒绝策略UserRejectHandler handler new UserRejectHandler();// 核心线程为1最大线程为2为了保证触发rejectHandlerThreadPoolExecutor threadPoolFirst new ThreadPoolExecutor(1, 2, 60,TimeUnit.SECONDS, queue, f1, handler);// 利用第二个线程工厂实力创建第二个线程池ThreadPoolExecutor threadPoolSecond new ThreadPoolExecutor(1, 2, 60,TimeUnit.SECONDS, queue, f2, handler);// 创建400个任务线程Runnable task new Task();for (int i 0; i 200; i) {threadPoolFirst.execute(task);threadPoolSecond.execute(task);threadPoolSecond.submit(task);}}}
执行结果如下
当任务被拒绝的时候拒绝策略会打印出当前线程池的大小已经达到了maximumPoolSize2 且队列已满完成的任务数提示已经有 4 个最后一行。
三、线程池源码详解 在 ThreadPoolExecutor 的属性定义中频繁地用位移运算来表示线程池状态 位移运算是改变当前值的一种高效手段 包括左移与右移。下面从属性定义开始阅读ThreadPoolExecutor 的源码。 // Integer共有32位最右边29位表示工作线程数最左边3位表示线程池状态// 注简单地说3个二进制可以表示从0到7共8个不同的数值第1处private static final int COUNT_BITS Integer.SIZE - 3;// 000-11111...111129个1类似于子网掩码用于位的与运算// 得到左边3位还是右边29位private static final int CAPACITY (1 COUNT_BITS) - 1;// runState is stored in the high-order bits// 用左边3位实现5种线程池状态。在左3位之后加入中画线有助于理解// 111-00000...000029个0十进制值-536,870,912// 此状态表示线程池能接受新任务private static final int RUNNING -1 COUNT_BITS;// 000-00000...000029个0十进制值0// 此状态不再接受新任务但可以继续执行队列中的任务private static final int SHUTDOWN 0 COUNT_BITS;// 001-00000...000029个0十进制值536,870,912// 此状态全面拒绝并中断正在处理的任务private static final int STOP 1 COUNT_BITS;// 010-00000...000029个0十进制值1,073,741,824// 此状态表示所有任务已经被终止private static final int TIDYING 2 COUNT_BITS;// 011-00000...000029个0十进制值1,610,612,736// 此状态表示已清理完现场private static final int TERMINATED 3 COUNT_BITS;// Packing and unpacking ctl// 与运算比如 001-00000...000029个0表示67个工作线程// 掩码取反 111-00000...000029个0即得到左边3位001// 表示线程池当前出于STOP状态private static int runStateOf(int c) { return c ~CAPACITY; }// 同理掩码 000-11111...111129个1得到右边29位即工作线程数private static int workerCountOf(int c) { return c CAPACITY; }// 把左边3位与右边29位按或运算合并成一个值private static int ctlOf(int rs, int wc) { return rs | wc; }
第 l 处说明 线程池的状态用高 3 位表示其中包括了符号位。五种状态的 十进制值按从小到大依次排序为 RUNNING SHUTDOWN STOP TIDYING TERMINATED 这样设计的好处是可以通过比较值的大小来确定线程池的状态。例 如程序中经常会出现 isRunning 的判断
private static boolean isRunning(int c) {return c SHUTDOWN;
}我们都知道 Executor 接口有且只有一个方法 execute 通过参数传入待执行线程的对象。下面分析 ThreadPoolExecutor 关于 execute 方法的实现 /*** Executes the given task sometime in the future. The task* may execute in a new thread or in an existing pooled thread.** If the task cannot be submitted for execution, either because this* executor has been shutdown or because its capacity has been reached,* the task is handled by the current {code RejectedExecutionHandler}.** param command the task to execute* throws RejectedExecutionException at discretion of* {code RejectedExecutionHandler}, if the task* cannot be accepted for execution* throws NullPointerException if {code command} is null*/public void execute(Runnable command) {if (command null)throw new NullPointerException();/** Proceed in 3 steps:** 1. If fewer than corePoolSize threads are running, try to* start a new thread with the given command as its first* task. The call to addWorker atomically checks runState and* workerCount, and so prevents false alarms that would add* threads when it shouldnt, by returning false.** 2. If a task can be successfully queued, then we still need* to double-check whether we should have added a thread* (because existing ones died since last checking) or that* the pool shut down since entry into this method. So we* recheck state and if necessary roll back the enqueuing if* stopped, or start a new thread if there are none.** 3. If we cannot queue task, then we try to add a new* thread. If it fails, we know we are shut down or saturated* and so reject the task.*/// 返回包含线程数及线程池状态的Integer类型数值int c ctl.get();// 如果工作线程数小于核心线程数则创建线程任务并执行if (workerCountOf(c) corePoolSize) {// addWorker是另一个极为重要的方法见下一段源码解析第1处if (addWorker(command, true))return;// 如果创建失败防止外部已经在线程池中加入新任务重新获取一下c ctl.get();}// 只有线程池处于RUNNING状态才执行后半句置入队列if (isRunning(c) workQueue.offer(command)) {int recheck ctl.get();// 如果线程池不是RUNNING状态则将刚加入队列的任务移除if (! isRunning(recheck) remove(command))reject(command);// 如果之前的线程已被消费完新建一个线程else if (workerCountOf(recheck) 0)addWorker(null, false);}// 核心池和队列都已满尝试创建一个新线程else if (!addWorker(command, false))// 如果addWorker返回是false即创建失败则唤醒拒绝策略第2处reject(command);}第 1 处 execute 方法在不同的阶段有三次 addWorker 的尝试动作。 第 2 处 发生拒绝的理由有两个 ( 1 线程池状态为非 RUNNING状态 ( 2 等待队列己满。 下面继续分析 addWorker 方法的源码 /*** 根据当前线程池状态检查是否可以添加新的任务线程如果则创建并启动任务* 如果一切正常则返回true。返回false的可能性如下* 1.线程池没有处于RUNNING状态* 2.线程工厂创建新的任务线程失败* param firstTask 外部启动线程池是需要构造的第一个线程他是线程的母体* param core 新增工作线程时的判断指标解释如下* true 表示新增工作线程时需要判断当前RUNNING状态的线程是否少于corePoolSize* false 表示新增工作线程时需要判断当前RUNNING状态的线程是否少于maximumPoolSize* return*/private boolean addWorker(Runnable firstTask, boolean core) {// 不需要任务预定义的语法标签响应下文的continue retry快速退出多层嵌套循环第1处retry:for (;;) {// 参考之前的状态分析如果RUNNING状态则条件为假不执行后面的判断// 如果是STOP及之上的状态或者firstTask初始线程不为空或者工作队列不为空// 都会直接返回创建失败第2处int c ctl.get();int rs runStateOf(c);// Check if queue empty only if necessary.if (rs SHUTDOWN ! (rs SHUTDOWN firstTask null ! workQueue.isEmpty()))return false;for (;;) {// 如果超过最大允许线程数则不能再添加新的线程// 最大线程数不能超过2^29否则影响左边3位的线程池状态值int wc workerCountOf(c);if (wc CAPACITY ||wc (core ? corePoolSize : maximumPoolSize))return false;// 将当前活动线程数1 (第3处)if (compareAndIncrementWorkerCount(c))break retry;// 线程池状态和工作线程数是可变化的需要经常提取这个最新值c ctl.get(); // Re-read ctl// 如果已经关闭则再次从retry标签处进入在第2处再做判断 (第4处)if (runStateOf(c) ! rs)continue retry;// else CAS failed due to workerCount change; retry inner loop// 如果线程还是处于RUNNING状态那就在说明仅仅是第3处失败// 继续循环执行 第5处}}// 开始创建工作线程boolean workerStarted false;boolean workerAdded false;ThreadPoolExecutor.Worker w null;try {// 利用Worker构造方法中的线程池工厂创建线程并封装成工作线程Worker对象w new ThreadPoolExecutor.Worker(firstTask);// 注意这是Worker中的属性对象thread (第6处)final Thread t w.thread;if (t ! null) {// 在进行ThreadPoolExecutor的敏感操作时// 都需要持有主锁避免在添加和启动线程时被干扰final ReentrantLock mainLock this.mainLock;mainLock.lock();try {// Recheck while holding lock.// Back out on ThreadFactory failure or if// shut down before lock acquired.int rs runStateOf(ctl.get());// 当线程池状态为RUNNING或SHUTDOWN// 且firstTask初始线程为空时if (rs SHUTDOWN ||(rs SHUTDOWN firstTask null)) {if (t.isAlive()) // precheck that t is startablethrow new IllegalThreadStateException();workers.add(w);int s workers.size();// 整个线程池在运行期间的最大并发任务个数if (s largestPoolSize)largestPoolSize s;workerAdded true;}} finally {mainLock.unlock();}if (workerAdded) {// 终于看到亲切的start方法// 注意并非线程池的execute的command参数指向的线程t.start();workerStarted true;}}} finally {if (! workerStarted)// 线程启动失败把刚才第3处加上的工作线程计数再减回去addWorkerFailed(w);}return workerStarted;}这段代码晦涩难懂部分地方甚至违反了代码规约但其中蕴含的丰富的编码知识点值得我们去学习下面接序号来依次讲解。 第 1 处 配合循环语旬出现的 label 类似于 goto 作用。 label 定义时必须把标签和冒号的组合语旬紧紧相邻定义在循环体之前否则会编译出错。目的是在实现多重循环时能够快速退出到任何一层。这种做法的出发点似乎非常贴心但是在大型软件项目中 滥用标签行跳转的后果将是灾难性的。示例代码中在 retry 下方有两个无限循环在 workerCount 加 1 成功后直接退出两层循环。 第 2 处这样的表达式不利于代码阅读应该改成
Boolean isNotAllowedToCreateTask runStateLeast(c, SHUTDOWN) (runStateAtLeast(c, STOP)|| firstTask ! null || workQueue.isEmpty());
if (isNotAllowedToCreateTask) {// ...
}第 3 处 与第 1 处的标签呼应 Atomiclnteger 对象的加1操作是原子性的。break retry 表示直接跳出与 retry 相邻的这个循环体。 第 4 处此 continue 跳转至标签处继续执行循环。如果条件为假则说明线程池还处于运行状态即继续在 for 循环内执行。 第 5 处 compareAndlncrementWorkerCount 方法执行失败的概率非常低。即使失败 再次执行时成功的概率也是极高的类似于自旋锁原理。这里的处理逻辑是先加 1创建失败再减1这是轻量处理并发创建线程的方式。如果先创建线程成功再加1,当发现超出限制后再销毁线程那么这样的处理方式明显比前者代价要大。 第 6 处 Worker 对象是工作线程的核心类实现部分源码如下
/*** 它实现Runnable接口并把本对象作为参数输入给run方法中的runWorker(this),* 所以内部属性线程thread在start的时候即会调用runWorker方法*/private final class Worker extends AbstractQueuedSynchronizer implements Runnable {/*** Creates with given first task and thread from ThreadFactory.* param firstTask the first task (null if none)*/Worker(Runnable firstTask) {// 它是AbstractQueuedSynchronizer的方法// 在runWorker方法执行之前禁止线程被中断setState(-1); // inhibit interrupts until runWorkerthis.firstTask firstTask;this.thread getThreadFactory().newThread(this);}// 当thread被start()之后执行runWorker的方法/** Delegates main run loop to outer runWorker */public void run() {runWorker(this);}}总结 线程池的相关源码比较精炼还包括线程池的销毁、任务提取和消费等与线程状态图一样线程池也有自己独立的状态转化流程本节不再展开。总结一下使用线程池要注意如下几点 ( 1 合理设置各类参数应根据实际业务场景来设置合理的工作线程数。 ( 2 线程资源必须通过线城池提供不允许在应用中自行显式创建线程。 ( 3 创建线程或线城池时请指定有意义的线程名称方便出错时回溯。 线程池不允许使用 Executors 而是通过 ThreadPoolExecutor 的方式创建 这样的处理方式能更加明确线程池的运行规则规避资源耗尽的风险。