贵阳汽车网站建设,网站的页面结构,设立网站 费用,wordpress用户上传提示#xff1a;文章写完后#xff0c;目录可以自动生成#xff0c;如何生成可参考右边的帮助文档 文章目录 四、共享模型之内存4.1 Java 内存模型4.2 可见性退不出的循环解决方法可见性 vs 原子性模式之 Balking1.定义2.实现 4.3 有序性原理之指令级并行1. 名词2.鱼罐头的故… 提示文章写完后目录可以自动生成如何生成可参考右边的帮助文档 文章目录 四、共享模型之内存4.1 Java 内存模型4.2 可见性退不出的循环解决方法可见性 vs 原子性模式之 Balking1.定义2.实现 4.3 有序性原理之指令级并行1. 名词2.鱼罐头的故事3.指令重排序优化4.支持流水线的处理器 诡异的结果解决方法原理之 volatile1.如何保证可见性2.如何保证有序性3.double-checked locking 问题4.double-checked locking 解决 happens-before 4.4 单例模式1.饿汉单例2. 枚举单例3. 懒汉单例4. DCL 懒汉单例5. 静态内部类懒汉单例 五、共享模型之无锁5.1 问题提出解决思路-锁解决思路-无锁 5.2 CAS 与 volatilevolatile为什么无锁效率高CAS 的特点 5.3 原子整数5.4 原子引用不安全实现安全实现-使用锁安全实现-使用 CASABA 问题及解决ABA问题AtomicStampedReferenceAtomicMarkableReference 5.5 原子数组不安全的数组安全的数组 5.6 字段更新器5.7 原子累加器累加器性能比较cas 锁原理之伪共享LongAdder源码 5.8 Unsafe概述Unsafe CAS 操作 六、共享模型之不可变6.1 日期转换的问题问题提出思路 - 同步锁思路 - 不可变 6.2 不可变设计final 的使用保护性拷贝模式之享元1.定义2.体现2.1 包装类2.2 String 串池2.3 BigDecimal BigInteger 3. DIY 原理之 final1. 设置 final 变量的原理 6.3 无状态 七、共享模型之工具--线程池7.1 自定义线程池7.2 ThreadPoolExecutor1线程池状态2构造方法3newFixedThreadPool4newCachedThreadPool5newSingleThreadExecutor6提交任务7关闭线程池8任务调度线程池9正确处理执行任务异常10 Tomcat 线程池 7.3 Fork/Join1概念2使用 模式之工作线程1. 定义2.饥饿3.创建多少线程池合适3.1 CPU 密集型运算3.2 I/O 密集型运算 四、共享模型之内存
4.1 Java 内存模型
JMM 即 Java Memory Model它定义了主存、工作内存抽象概念底层对应着 CPU 寄存器、缓存、硬件内存、 CPU 指令优化等。
JMM 体现在以下几个方面
原子性 - 保证指令不会受到线程上下文切换的影响可见性 - 保证指令不会受 cpu 缓存的影响有序性 - 保证指令不会受 cpu 指令并行优化的影响
4.2 可见性
退不出的循环
main 线程对 run 变量的修改对于 t 线程不可见导致了 t 线程无法停止
static boolean run true;
public static void main(String[] args) throws InterruptedException {Thread t new Thread(() - {while (run) {// ....}});t.start();sleep(1);run false; // 线程t不会如预想的停下来
}分析
初始状态 t 线程刚开始从主内存读取了 run 的值到工作内存因为 t 线程要频繁从主内存中读取 run 的值JIT 编译器会将 run 的值缓存至自己工作内存中的高速缓存中 减少对主存中 run 的访问提高效率1 秒之后main 线程修改了 run 的值并同步至主存而 t 是从自己工作内存中的高速缓存中读取这个变量 的值结果永远是旧值 解决方法
volatile易变关键字
可以用来修饰成员变量和静态成员变量避免线程从自己的工作缓存中查找变量的值必须到主存中获取 它的值线程操作 volatile 变量都是直接操作主存
可见性 vs 原子性
前面例子体现的实际就是可见性它保证的是在多个线程之间一个线程对 volatile 变量的修改对另一个线程可 见 不能保证原子性仅用在一个写线程多个读线程的情况 上例从字节码理解是这样的
getstatic run // 线程 t 获取 run true
getstatic run // 线程 t 获取 run true
getstatic run // 线程 t 获取 run true
getstatic run // 线程 t 获取 run true
putstatic run // 线程 main 修改 run 为 false 仅此一次
getstatic run // 线程 t 获取 run false 比较一下之前我们将线程安全时举的例子两个线程一个 i 一个 i-- 只能保证看到最新值不能解决指令交错
// 假设i的初始值为0
getstatic i // 线程2-获取静态变量i的值 线程内i0
getstatic i // 线程1-获取静态变量i的值 线程内i0
iconst_1 // 线程1-准备常量1
iadd // 线程1-自增 线程内i1
putstatic i // 线程1-将修改后的值存入静态变量i 静态变量i1
iconst_1 // 线程2-准备常量1
isub // 线程2-自减 线程内i-1
putstatic i // 线程2-将修改后的值存入静态变量i 静态变量i-1注意 synchronized 语句块既可以保证代码块的原子性也同时保证代码块内变量的可见性。但缺点是 synchronized 是属于重量级操作性能相对更低 如果在前面示例的死循环中加入 System.out.println() 会发现即使不加 volatile 修饰符线程 t 也能正确看到 对 run 变量的修改了因为println()方法里面加了synchronized 模式之 Balking
1.定义
Balking 犹豫模式用在一个线程发现另一个线程或本线程已经做了某一件相同的事那么本线程就无需再做 了直接结束返回
2.实现
例如
public class MonitorService {// 用来表示是否已经有线程已经在执行启动了private volatile boolean starting;public void start() {log.info(尝试启动监控线程...);synchronized(this) {if (starting) {return;}starting true;}// 真正启动监控线程...}
}它还经常用来实现线程安全的单例
public final class Singleton {private Singleton() {}private static Singleton INSTANCE null;public static synchronized Singleton getInstance() {if (INSTANCE ! null) {return INSTANCE;}INSTANCE new Singleton();return INSTANCE;}
}4.3 有序性
JVM 会在不影响正确性的前提下可以调整语句的执行顺序代码如下
static int i;
static int j;
// 在某个线程内执行如下赋值操作
i ...;
j ...;可以看到至于是先执行 i 还是 先执行 j 对最终的结果不会产生影响。所以上面代码真正执行时既可以是
i ...;
j ...;也可以是
j ...;
i ...;这种特性称之为『指令重排』多线程下『指令重排』会影响正确性。为什么要有重排指令这项优化呢从 CPU 执行指令的原理来理解
原理之指令级并行
1. 名词
Clock Cycle Time
主频的概念大家接触的比较多而 CPU 的 Clock Cycle Time时钟周期时间等于主频的倒数意思是 CPU 能 够识别的最小时间单位比如说 4G 主频的 CPU 的 Clock Cycle Time 就是 0.25 ns作为对比我们墙上挂钟的 Cycle Time 是 1s
例如运行一条加法指令一般需要一个时钟周期时间
CPI
有的指令需要更多的时钟周期时间所以引出了 CPI Cycles Per Instruction指令平均时钟周期数
IPC
IPCInstruction Per Clock Cycle 即 CPI 的倒数表示每个时钟周期能够运行的指令数
CPU 执行时间
程序的 CPU 执行时间可以用下面的公式来表示
程序 CPU 执行时间 指令数 * CPI * Clock Cycle Time2.鱼罐头的故事
加工一条鱼需要 50 分钟只能一条鱼、一条鱼顺序加工… 可以将每个鱼罐头的加工流程细分为 5 个步骤
去鳞清洗 10分钟蒸煮沥水 10分钟加注汤料 10分钟杀菌出锅 10分钟真空封罐 10分钟 即使只有一个工人最理想的情况是他能够在 10 分钟内同时做好这 5 件事因为对第一条鱼的真空装罐不会 影响对第二条鱼的杀菌出锅…
3.指令重排序优化
现代处理器会设计为一个时钟周期完成一条执行时间最长的 CPU 指令。为什么这么做呢可以想到指令还可以再划分成一个个更小的阶段例如每条指令都可以分为 取指令 - 指令译码 - 执行指令 - 内存访问 - 数据写回这 5 个阶段 https://blog.51cto.com/u_12855/7031517
在不改变程序结果的前提下这些指令的各个阶段可以通过重排序和组合来实现指令级并行这一技术在 80’s 中 叶到 90’s 中叶占据了计算架构的重要地位。
指令重排的前提是重排指令不能影响结果例如
// 可以重排的例子
int a 10; // 指令1
int b 20; // 指令2
System.out.println( a b );
// 不能重排的例子
int a 10; // 指令1
int b a - 5; // 指令24.支持流水线的处理器
现代 CPU 支持多级指令流水线例如支持同时执行 取指令 - 指令译码 - 执行指令 - 内存访问 - 数据写回 的处理 器就可以称之为五级指令流水线。这时 CPU 可以在一个时钟周期内同时运行五条指令的不同阶段相当于一 条执行时间最长的复杂指令IPC 1本质上流水线技术并不能缩短单条指令的执行时间但它变相地提高了 指令地吞吐率。 诡异的结果
int num 0;
boolean ready false;
// 线程1 执行此方法
public void actor1(I_Result r) {if (ready) {r.r1 num num;} else {r.r1 1;}}// 线程2 执行此方法
public void actor2(I_Result r) {num 2;ready true;
}情况1线程1 先执行这时 ready false所以进入 else 分支结果为 1
情况2线程2 先执行 num 2但没来得及执行 ready true线程1 执行还是进入 else 分支结果为1
情况3线程2 执行到 ready true线程1 执行这回进入 if 分支结果为 4因为 num 已经执行过了
情况4线程2 执行 ready true切换到线程1进入 if 分支相加为 0再切回线程2 执行 num 2这种现象叫做指令重排是 JIT 编译器在运行时的一些优化
解决方法
volatile 修饰的变量可以禁用指令重排
int num 0;
volatile boolean ready false;
// 线程1 执行此方法
public void actor1(I_Result r) {if (ready) {r.r1 num num;} else {r.r1 1;}}// 线程2 执行此方法
public void actor2(I_Result r) {num 2;ready true;
}原理之 volatile
volatile 的底层实现原理是内存屏障Memory BarrierMemory Fence
对 volatile 变量的写指令后会加入写屏障对 volatile 变量的读指令前会加入读屏障
1.如何保证可见性
写屏障sfence保证在该屏障之前的对共享变量的改动都同步到主存当中
public void actor2(I_Result r) {num 2;ready true; // ready 是 volatile 赋值带写屏障// 写屏障
} 而读屏障lfence保证在该屏障之后对共享变量的读取加载的是主存中最新数据
public void actor1(I_Result r) {// 读屏障// ready 是 volatile 读取值带读屏障if (ready) {r.r1 num num;} else {r.r1 1;}
}2.如何保证有序性 写屏障会确保指令重排序时不会将写屏障之前的代码排在写屏障之后 读屏障会确保指令重排序时不会将读屏障之后的代码排在读屏障之前
不能解决指令交错 写屏障仅仅是保证之后的读能够读到最新的结果但不能保证读跑到它前面去有并发问题 而有序性的保证也只是保证了本线程内相关代码不被重排序 #mermaid-svg-KHDfxHf3iQJt5Jdx {font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:16px;fill:#333;}#mermaid-svg-KHDfxHf3iQJt5Jdx .error-icon{fill:#552222;}#mermaid-svg-KHDfxHf3iQJt5Jdx .error-text{fill:#552222;stroke:#552222;}#mermaid-svg-KHDfxHf3iQJt5Jdx .edge-thickness-normal{stroke-width:2px;}#mermaid-svg-KHDfxHf3iQJt5Jdx .edge-thickness-thick{stroke-width:3.5px;}#mermaid-svg-KHDfxHf3iQJt5Jdx .edge-pattern-solid{stroke-dasharray:0;}#mermaid-svg-KHDfxHf3iQJt5Jdx .edge-pattern-dashed{stroke-dasharray:3;}#mermaid-svg-KHDfxHf3iQJt5Jdx .edge-pattern-dotted{stroke-dasharray:2;}#mermaid-svg-KHDfxHf3iQJt5Jdx .marker{fill:#333333;stroke:#333333;}#mermaid-svg-KHDfxHf3iQJt5Jdx .marker.cross{stroke:#333333;}#mermaid-svg-KHDfxHf3iQJt5Jdx svg{font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:16px;}#mermaid-svg-KHDfxHf3iQJt5Jdx .actor{stroke:hsl(259.6261682243, 59.7765363128%, 87.9019607843%);fill:#ECECFF;}#mermaid-svg-KHDfxHf3iQJt5Jdx text.actortspan{fill:black;stroke:none;}#mermaid-svg-KHDfxHf3iQJt5Jdx .actor-line{stroke:grey;}#mermaid-svg-KHDfxHf3iQJt5Jdx .messageLine0{stroke-width:1.5;stroke-dasharray:none;stroke:#333;}#mermaid-svg-KHDfxHf3iQJt5Jdx .messageLine1{stroke-width:1.5;stroke-dasharray:2,2;stroke:#333;}#mermaid-svg-KHDfxHf3iQJt5Jdx #arrowhead path{fill:#333;stroke:#333;}#mermaid-svg-KHDfxHf3iQJt5Jdx .sequenceNumber{fill:white;}#mermaid-svg-KHDfxHf3iQJt5Jdx #sequencenumber{fill:#333;}#mermaid-svg-KHDfxHf3iQJt5Jdx #crosshead path{fill:#333;stroke:#333;}#mermaid-svg-KHDfxHf3iQJt5Jdx .messageText{fill:#333;stroke:#333;}#mermaid-svg-KHDfxHf3iQJt5Jdx .labelBox{stroke:hsl(259.6261682243, 59.7765363128%, 87.9019607843%);fill:#ECECFF;}#mermaid-svg-KHDfxHf3iQJt5Jdx .labelText,#mermaid-svg-KHDfxHf3iQJt5Jdx .labelTexttspan{fill:black;stroke:none;}#mermaid-svg-KHDfxHf3iQJt5Jdx .loopText,#mermaid-svg-KHDfxHf3iQJt5Jdx .loopTexttspan{fill:black;stroke:none;}#mermaid-svg-KHDfxHf3iQJt5Jdx .loopLine{stroke-width:2px;stroke-dasharray:2,2;stroke:hsl(259.6261682243, 59.7765363128%, 87.9019607843%);fill:hsl(259.6261682243, 59.7765363128%, 87.9019607843%);}#mermaid-svg-KHDfxHf3iQJt5Jdx .note{stroke:#aaaa33;fill:#fff5ad;}#mermaid-svg-KHDfxHf3iQJt5Jdx .noteText,#mermaid-svg-KHDfxHf3iQJt5Jdx .noteTexttspan{fill:black;stroke:none;}#mermaid-svg-KHDfxHf3iQJt5Jdx .activation0{fill:#f4f4f4;stroke:#666;}#mermaid-svg-KHDfxHf3iQJt5Jdx .activation1{fill:#f4f4f4;stroke:#666;}#mermaid-svg-KHDfxHf3iQJt5Jdx .activation2{fill:#f4f4f4;stroke:#666;}#mermaid-svg-KHDfxHf3iQJt5Jdx .actorPopupMenu{position:absolute;}#mermaid-svg-KHDfxHf3iQJt5Jdx .actorPopupMenuPanel{position:absolute;fill:#ECECFF;box-shadow:0px 8px 16px 0px rgba(0,0,0,0.2);filter:drop-shadow(3px 5px 2px rgb(0 0 0 / 0.4));}#mermaid-svg-KHDfxHf3iQJt5Jdx .actor-man line{stroke:hsl(259.6261682243, 59.7765363128%, 87.9019607843%);fill:#ECECFF;}#mermaid-svg-KHDfxHf3iQJt5Jdx .actor-man circle,#mermaid-svg-KHDfxHf3iQJt5Jdx line{stroke:hsl(259.6261682243, 59.7765363128%, 87.9019607843%);fill:#ECECFF;stroke-width:2px;}#mermaid-svg-KHDfxHf3iQJt5Jdx :root{--mermaid-font-family:"trebuchet ms",verdana,arial,sans-serif;} t1线程 volatile i0 t2线程 读取i0 读取i0 i1 写入i1 i1 写入i1 t1线程 volatile i0 t2线程
3.double-checked locking 问题
以著名的 double-checked locking 单例模式为例
public final class Singleton {private Singleton() {}private static Singleton INSTANCE null;public static Singleton getInstance() {if (INSTANCE null) { // t2// 首次访问会同步而之后的使用没有 synchronizedsynchronized(Singleton.class) {if (INSTANCE null) { // t1INSTANCE new Singleton();}}}return INSTANCE;}
}以上的实现特点是
懒惰实例化首次使用 getInstance() 才使用 synchronized 加锁后续使用时无需加锁有隐含的但很关键的一点第一个 if 使用了 INSTANCE 变量是在同步块之外
但在多线程环境下上面的代码是有问题的getInstance 方法对应的字节码为 其中
17 表示创建对象将对象引用入栈 // new Singleton20 表示复制一份对象引用 // 引用地址21 表示利用一个对象引用调用构造方法24 表示利用一个对象引用赋值给 static INSTANCE
也许 jvm 会优化为先执行 24再执行 21。如果两个线程 t1t2 按如下时间序列执行 #mermaid-svg-Gyfo4YQVDaX0Jvr8 {font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:16px;fill:#333;}#mermaid-svg-Gyfo4YQVDaX0Jvr8 .error-icon{fill:#552222;}#mermaid-svg-Gyfo4YQVDaX0Jvr8 .error-text{fill:#552222;stroke:#552222;}#mermaid-svg-Gyfo4YQVDaX0Jvr8 .edge-thickness-normal{stroke-width:2px;}#mermaid-svg-Gyfo4YQVDaX0Jvr8 .edge-thickness-thick{stroke-width:3.5px;}#mermaid-svg-Gyfo4YQVDaX0Jvr8 .edge-pattern-solid{stroke-dasharray:0;}#mermaid-svg-Gyfo4YQVDaX0Jvr8 .edge-pattern-dashed{stroke-dasharray:3;}#mermaid-svg-Gyfo4YQVDaX0Jvr8 .edge-pattern-dotted{stroke-dasharray:2;}#mermaid-svg-Gyfo4YQVDaX0Jvr8 .marker{fill:#333333;stroke:#333333;}#mermaid-svg-Gyfo4YQVDaX0Jvr8 .marker.cross{stroke:#333333;}#mermaid-svg-Gyfo4YQVDaX0Jvr8 svg{font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:16px;}#mermaid-svg-Gyfo4YQVDaX0Jvr8 .actor{stroke:hsl(259.6261682243, 59.7765363128%, 87.9019607843%);fill:#ECECFF;}#mermaid-svg-Gyfo4YQVDaX0Jvr8 text.actortspan{fill:black;stroke:none;}#mermaid-svg-Gyfo4YQVDaX0Jvr8 .actor-line{stroke:grey;}#mermaid-svg-Gyfo4YQVDaX0Jvr8 .messageLine0{stroke-width:1.5;stroke-dasharray:none;stroke:#333;}#mermaid-svg-Gyfo4YQVDaX0Jvr8 .messageLine1{stroke-width:1.5;stroke-dasharray:2,2;stroke:#333;}#mermaid-svg-Gyfo4YQVDaX0Jvr8 #arrowhead path{fill:#333;stroke:#333;}#mermaid-svg-Gyfo4YQVDaX0Jvr8 .sequenceNumber{fill:white;}#mermaid-svg-Gyfo4YQVDaX0Jvr8 #sequencenumber{fill:#333;}#mermaid-svg-Gyfo4YQVDaX0Jvr8 #crosshead path{fill:#333;stroke:#333;}#mermaid-svg-Gyfo4YQVDaX0Jvr8 .messageText{fill:#333;stroke:#333;}#mermaid-svg-Gyfo4YQVDaX0Jvr8 .labelBox{stroke:hsl(259.6261682243, 59.7765363128%, 87.9019607843%);fill:#ECECFF;}#mermaid-svg-Gyfo4YQVDaX0Jvr8 .labelText,#mermaid-svg-Gyfo4YQVDaX0Jvr8 .labelTexttspan{fill:black;stroke:none;}#mermaid-svg-Gyfo4YQVDaX0Jvr8 .loopText,#mermaid-svg-Gyfo4YQVDaX0Jvr8 .loopTexttspan{fill:black;stroke:none;}#mermaid-svg-Gyfo4YQVDaX0Jvr8 .loopLine{stroke-width:2px;stroke-dasharray:2,2;stroke:hsl(259.6261682243, 59.7765363128%, 87.9019607843%);fill:hsl(259.6261682243, 59.7765363128%, 87.9019607843%);}#mermaid-svg-Gyfo4YQVDaX0Jvr8 .note{stroke:#aaaa33;fill:#fff5ad;}#mermaid-svg-Gyfo4YQVDaX0Jvr8 .noteText,#mermaid-svg-Gyfo4YQVDaX0Jvr8 .noteTexttspan{fill:black;stroke:none;}#mermaid-svg-Gyfo4YQVDaX0Jvr8 .activation0{fill:#f4f4f4;stroke:#666;}#mermaid-svg-Gyfo4YQVDaX0Jvr8 .activation1{fill:#f4f4f4;stroke:#666;}#mermaid-svg-Gyfo4YQVDaX0Jvr8 .activation2{fill:#f4f4f4;stroke:#666;}#mermaid-svg-Gyfo4YQVDaX0Jvr8 .actorPopupMenu{position:absolute;}#mermaid-svg-Gyfo4YQVDaX0Jvr8 .actorPopupMenuPanel{position:absolute;fill:#ECECFF;box-shadow:0px 8px 16px 0px rgba(0,0,0,0.2);filter:drop-shadow(3px 5px 2px rgb(0 0 0 / 0.4));}#mermaid-svg-Gyfo4YQVDaX0Jvr8 .actor-man line{stroke:hsl(259.6261682243, 59.7765363128%, 87.9019607843%);fill:#ECECFF;}#mermaid-svg-Gyfo4YQVDaX0Jvr8 .actor-man circle,#mermaid-svg-Gyfo4YQVDaX0Jvr8 line{stroke:hsl(259.6261682243, 59.7765363128%, 87.9019607843%);fill:#ECECFF;stroke-width:2px;}#mermaid-svg-Gyfo4YQVDaX0Jvr8 :root{--mermaid-font-family:"trebuchet ms",verdana,arial,sans-serif;} t1 INSTANCE t2 17new 20dup 24putstatic(给INSTANCE赋值) 0getstatic(获取INSTANCE引用) 3ifnonnull 37(判断不为空跳转37行) 37getstatic(获取INSTAMCE引用) 40areturn(返回) 使用对象 21invokespecial(调用构造方法) t1 INSTANCE t2 关键在于 0: getstatic 这行代码在 monitor 控制之外可以越过 monitor 读取 INSTANCE 变量的值
这时 t1 还未完全将构造方法执行完毕如果在构造方法中要执行很多初始化操作那么 t2 拿到的是将是一个未初 始化完毕的单例
对 INSTANCE 使用 volatile 修饰即可可以禁用指令重排但要注意在 JDK 5 以上的版本的 volatile 才会真正有效
4.double-checked locking 解决
public final class Singleton {private Singleton() {}private static volatile Singleton INSTANCE null;public static Singleton getInstance() {// 实例没创建才会进入内部的 synchronized代码块if (INSTANCE null) {synchronized(Singleton.class) { // t2// 也许有其它线程已经创建实例所以再判断一次if (INSTANCE null) { // t1INSTANCE new Singleton();}}}return INSTANCE;}
}读写 volatile 变量时会加入内存屏障保证可见性和有序性。即刚刚调用构造方法不可能出现在赋值后面它在putstatic前加了一个写屏障禁止重排序 #mermaid-svg-ijpzo3uT6JJkGBcK {font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:16px;fill:#333;}#mermaid-svg-ijpzo3uT6JJkGBcK .error-icon{fill:#552222;}#mermaid-svg-ijpzo3uT6JJkGBcK .error-text{fill:#552222;stroke:#552222;}#mermaid-svg-ijpzo3uT6JJkGBcK .edge-thickness-normal{stroke-width:2px;}#mermaid-svg-ijpzo3uT6JJkGBcK .edge-thickness-thick{stroke-width:3.5px;}#mermaid-svg-ijpzo3uT6JJkGBcK .edge-pattern-solid{stroke-dasharray:0;}#mermaid-svg-ijpzo3uT6JJkGBcK .edge-pattern-dashed{stroke-dasharray:3;}#mermaid-svg-ijpzo3uT6JJkGBcK .edge-pattern-dotted{stroke-dasharray:2;}#mermaid-svg-ijpzo3uT6JJkGBcK .marker{fill:#333333;stroke:#333333;}#mermaid-svg-ijpzo3uT6JJkGBcK .marker.cross{stroke:#333333;}#mermaid-svg-ijpzo3uT6JJkGBcK svg{font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:16px;}#mermaid-svg-ijpzo3uT6JJkGBcK .actor{stroke:hsl(259.6261682243, 59.7765363128%, 87.9019607843%);fill:#ECECFF;}#mermaid-svg-ijpzo3uT6JJkGBcK text.actortspan{fill:black;stroke:none;}#mermaid-svg-ijpzo3uT6JJkGBcK .actor-line{stroke:grey;}#mermaid-svg-ijpzo3uT6JJkGBcK .messageLine0{stroke-width:1.5;stroke-dasharray:none;stroke:#333;}#mermaid-svg-ijpzo3uT6JJkGBcK .messageLine1{stroke-width:1.5;stroke-dasharray:2,2;stroke:#333;}#mermaid-svg-ijpzo3uT6JJkGBcK #arrowhead path{fill:#333;stroke:#333;}#mermaid-svg-ijpzo3uT6JJkGBcK .sequenceNumber{fill:white;}#mermaid-svg-ijpzo3uT6JJkGBcK #sequencenumber{fill:#333;}#mermaid-svg-ijpzo3uT6JJkGBcK #crosshead path{fill:#333;stroke:#333;}#mermaid-svg-ijpzo3uT6JJkGBcK .messageText{fill:#333;stroke:#333;}#mermaid-svg-ijpzo3uT6JJkGBcK .labelBox{stroke:hsl(259.6261682243, 59.7765363128%, 87.9019607843%);fill:#ECECFF;}#mermaid-svg-ijpzo3uT6JJkGBcK .labelText,#mermaid-svg-ijpzo3uT6JJkGBcK .labelTexttspan{fill:black;stroke:none;}#mermaid-svg-ijpzo3uT6JJkGBcK .loopText,#mermaid-svg-ijpzo3uT6JJkGBcK .loopTexttspan{fill:black;stroke:none;}#mermaid-svg-ijpzo3uT6JJkGBcK .loopLine{stroke-width:2px;stroke-dasharray:2,2;stroke:hsl(259.6261682243, 59.7765363128%, 87.9019607843%);fill:hsl(259.6261682243, 59.7765363128%, 87.9019607843%);}#mermaid-svg-ijpzo3uT6JJkGBcK .note{stroke:#aaaa33;fill:#fff5ad;}#mermaid-svg-ijpzo3uT6JJkGBcK .noteText,#mermaid-svg-ijpzo3uT6JJkGBcK .noteTexttspan{fill:black;stroke:none;}#mermaid-svg-ijpzo3uT6JJkGBcK .activation0{fill:#f4f4f4;stroke:#666;}#mermaid-svg-ijpzo3uT6JJkGBcK .activation1{fill:#f4f4f4;stroke:#666;}#mermaid-svg-ijpzo3uT6JJkGBcK .activation2{fill:#f4f4f4;stroke:#666;}#mermaid-svg-ijpzo3uT6JJkGBcK .actorPopupMenu{position:absolute;}#mermaid-svg-ijpzo3uT6JJkGBcK .actorPopupMenuPanel{position:absolute;fill:#ECECFF;box-shadow:0px 8px 16px 0px rgba(0,0,0,0.2);filter:drop-shadow(3px 5px 2px rgb(0 0 0 / 0.4));}#mermaid-svg-ijpzo3uT6JJkGBcK .actor-man line{stroke:hsl(259.6261682243, 59.7765363128%, 87.9019607843%);fill:#ECECFF;}#mermaid-svg-ijpzo3uT6JJkGBcK .actor-man circle,#mermaid-svg-ijpzo3uT6JJkGBcK line{stroke:hsl(259.6261682243, 59.7765363128%, 87.9019607843%);fill:#ECECFF;stroke-width:2px;}#mermaid-svg-ijpzo3uT6JJkGBcK :root{--mermaid-font-family:"trebuchet ms",verdana,arial,sans-serif;} t1 t2 INSTANCE 17new 20dup 21invokespecial(调用构造方法) 0getstatic(获取INSTANCE引用) 24putstatic(给INSTANCE赋值) 3ifnonnull 37(判断不为空跳转37行) 37getstatic(获取INSTAMCE引用) 40areturn(返回) 使用对象 t1 t2 INSTANCE happens-before
happens-before 规定了对共享变量的写操作对其它线程的读操作可见它是可见性与有序性的一套规则总结抛 开以下 happens-before 规则JMM 并不能保证一个线程对共享变量的写对于其它线程对该共享变量的读可见
线程解锁 m 之前对变量的写对 m 加锁的其它线程对该变量的读可见
static int x;
static Object m new Object();
new Thread(() - {synchronized(m) {x 10;}
}, t1).start();
new Thread(() - {synchronized(m) {System.out.println(x);}
}, t2).start();线程对 volatile 变量的写其它线程对该变量的读可见
volatile static int x;
new Thread(() - {x 10;
}, t1).start();
new Thread(() - {System.out.println(x);
}, t2).start();线程 start 前对变量的写线程开始后对该变量的读可见
static int x;
x 10;
new Thread(() - {System.out.println(x);
}, t2).start();线程结束前对变量的写其它线程得知它结束后对该变量的读可见比如其它线程调用 t1.isAlive() 或 t1.join()等待 它结束
static int x;
Thread t1 new Thread(() - {x 10;
}, t1);
t1.start();
t1.join();
System.out.println(x);线程 t1 打断 t2interrupt前对变量的写 t2 被打断后对该变量的读可见通过 t2.interrupted 或 t2.isInterrupted
static int x;
public static void main(String[] args) {Thread t2 new Thread(() - {while (true) {if (Thread.currentThread().isInterrupted()) {System.out.println(x);break;}}}, t2);t2.start();new Thread(() - {sleep(1);x 10;t2.interrupt();}, t1).start();while (!t2.isInterrupted()) {Thread.yield();}System.out.println(x);
}对变量默认值0falsenull的写对其它线程对该变量的读可见具有传递性如果 x hb- y 并且 y hb- z 那么有 x hb- z 配合 volatile 的防指令重排有下面的例子
volatile static int x;
static int y;
new Thread(() - {y 10;x 20;
}, t1).start();
new Thread(() - {// x20 对 t2 可见, 同时 y10 也对 t2 可见System.out.println(x);
}, t2).start();变量都是指成员变量或静态成员变量 4.4 单例模式
单例模式有很多实现方法饿汉、懒汉、静态内部类、枚举类以下示例中仅考虑创建时的安全不考虑单例对象自己属性的线程安全 饿汉式类加载就会导致该单实例对象被创建 懒汉式类加载不会导致该单实例对象被创建而是首次使用该对象时才会创建 1.饿汉单例
// 问题1为什么加 final--防止子类覆盖破坏父类的方法导致线程不安全
// 问题2如果实现了序列化接口, 还要做什么来防止反序列化破坏单例
public final class Singleton implements Serializable {// 问题3为什么设置为私有? 是否能防止反射创建新的实例?--防止外部调用生成对象不能防止反射创建private Singleton() {}// 问题4这样初始化是否能保证单例对象创建时的线程安全?--可以由jvm类加载保证private static final Singleton INSTANCE new Singleton();// 问题5为什么提供静态方法而不是直接将 INSTANCE 设置为 public, 说出你知道的理由--可额外做些处 // 理更好的封装性public static Singleton getInstance() {return INSTANCE;}// 问题2解决反序列化返回原对象public Object readResolve() {return INSTANCE;}
}2. 枚举单例
// 问题1枚举单例是如何限制实例个数的--本质上是静态成员变量public final static Singleton INSTANCE
// 问题2枚举单例在创建时是否有并发问题--不会类加载机制
// 问题3枚举单例能否被反射破坏单例--不能
// 问题4枚举单例能否被反序列化破坏单例--不能抽象父类Enum实现了序列化接口
// 问题5枚举单例属于懒汉式还是饿汉式--饿汉式
// 问题6枚举单例如果希望加入一些单例创建时的初始化逻辑该如何做--加一个构造方法里面写逻辑
enum Singleton {INSTANCE;
}3. 懒汉单例
public final class Singleton {private Singleton() {}private static Singleton INSTANCE null;// 分析这里的线程安全, 并说明有什么缺点--安全锁范围太大性能太差public static synchronized Singleton getInstance() {if (INSTANCE ! null) {return INSTANCE;}INSTANCE new Singleton();return INSTANCE;}
}4. DCL 懒汉单例
public final class Singleton {private Singleton() {}// 问题1解释为什么要加 volatile ?--禁止指令重排防止其他线程拿到未调用构造方法的单例对象// 分配内存空间--调用构造方法--赋值对象引用private static volatile Singleton INSTANCE null;// 问题2对比实现3, 说出这样做的意义--锁范围缩小性能比3好public static Singleton getInstance() {if (INSTANCE ! null) {return INSTANCE;}synchronized(Singleton.class) {// 问题3为什么还要在这里加为空判断, 之前不是判断过了吗-防止其他线程重复创建对象if (INSTANCE ! null) { // t2return INSTANCE;}INSTANCE new Singleton();return INSTANCE;}}
}5. 静态内部类懒汉单例
public final class Singleton {private Singleton() {}// 问题1属于懒汉式还是饿汉式--懒汉式第一次用到该类才去执行类加载操作private static class LazyHolder {static final Singleton INSTANCE new Singleton();}// 问题2在创建时是否有并发问题--没有jvm保证线程安全public static Singleton getInstance() {return LazyHolder.INSTANCE;}
}五、共享模型之无锁
5.1 问题提出
有如下需求保证 account.withdraw 取款方法的线程安全
interface Account {// 获取余额Integer getBalance();// 取款void withdraw(Integer amount);/*** 方法内会启动 1000 个线程每个线程做 -10 元 的操作* 如果初始余额为 10000 那么正确的结果应当是 0*/static void demo(Account account) {List Thread ts new ArrayList ();long start System.nanoTime();for (int i 0; i 1000; i) {ts.add(new Thread(() - {account.withdraw(10);}));}ts.forEach(Thread::start);ts.forEach(t - {try {t.join();} catch (InterruptedException e) {e.printStackTrace();}});long end System.nanoTime();System.out.println(account.getBalance() cost: (end - start) / 1000 _000 ms);}
}原有实现并不是线程安全的
class AccountUnsafe implements Account {private Integer balance;public AccountUnsafe(Integer balance) {this.balance balance;}Overridepublic Integer getBalance() {return balance;}Overridepublic void withdraw(Integer amount) {balance - amount;}
}执行测试代码
public static void main(String[] args) {Account.demo(new AccountUnsafe(10000));
}某次执行结果
330 cost: 306 ms 解决思路-锁
首先想到的是给 Account 对象加锁
class AccountUnsafe implements Account {private Integer balance;public AccountUnsafe(Integer balance) {this.balance balance;}Overridepublic synchronized Integer getBalance() {return balance;}Overridepublic synchronized void withdraw(Integer amount) {balance - amount;}
}结果为
0 cost: 399 ms 解决思路-无锁
class AccountSafe implements Account {private AtomicInteger balance;public AccountSafe(Integer balance) {this.balance new AtomicInteger(balance);}Overridepublic Integer getBalance() {return balance.get();}Overridepublic void withdraw(Integer amount) {while (true) {int prev balance.get();int next prev - amount;if (balance.compareAndSet(prev, next)) {break;}}// 可以简化为下面的方法// balance.addAndGet(-1 * amount);}
}执行测试代码
public static void main(String[] args) {Account.demo(new AccountSafe(10000));
}结果为
0 cost: 302 ms 5.2 CAS 与 volatile
AtomicInteger的方法内部并没有用锁来保护共享变量的线程安全。那么它是如何实现的呢
public void withdraw(Integer amount) {// 需要不断尝试直到成功为止while (true) {// 比如拿到了旧值 1000int prev balance.get();// 在这个基础上 1000-10 990int next prev - amount;/*compareAndSet 正是做这个检查在 set 前先比较 prev 与当前值- 不一致了next 作废返回 false 表示失败比如别的线程已经做了减法当前值已经被减成了 990那么本线程的这次 990 就作废了进入 while 下次循环重试- 一致以 next 设置为新值返回 true 表示成功跳出循环*/if (balance.compareAndSet(prev, next)) {break;}}
}其中的关键是 compareAndSet它的简称就是 CAS 也有 Compare And Swap 的说法它必须是原子操作。 #mermaid-svg-0E7qkNjdUZR9njxJ {font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:16px;fill:#333;}#mermaid-svg-0E7qkNjdUZR9njxJ .error-icon{fill:#552222;}#mermaid-svg-0E7qkNjdUZR9njxJ .error-text{fill:#552222;stroke:#552222;}#mermaid-svg-0E7qkNjdUZR9njxJ .edge-thickness-normal{stroke-width:2px;}#mermaid-svg-0E7qkNjdUZR9njxJ .edge-thickness-thick{stroke-width:3.5px;}#mermaid-svg-0E7qkNjdUZR9njxJ .edge-pattern-solid{stroke-dasharray:0;}#mermaid-svg-0E7qkNjdUZR9njxJ .edge-pattern-dashed{stroke-dasharray:3;}#mermaid-svg-0E7qkNjdUZR9njxJ .edge-pattern-dotted{stroke-dasharray:2;}#mermaid-svg-0E7qkNjdUZR9njxJ .marker{fill:#333333;stroke:#333333;}#mermaid-svg-0E7qkNjdUZR9njxJ .marker.cross{stroke:#333333;}#mermaid-svg-0E7qkNjdUZR9njxJ svg{font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:16px;}#mermaid-svg-0E7qkNjdUZR9njxJ .actor{stroke:hsl(259.6261682243, 59.7765363128%, 87.9019607843%);fill:#ECECFF;}#mermaid-svg-0E7qkNjdUZR9njxJ text.actortspan{fill:black;stroke:none;}#mermaid-svg-0E7qkNjdUZR9njxJ .actor-line{stroke:grey;}#mermaid-svg-0E7qkNjdUZR9njxJ .messageLine0{stroke-width:1.5;stroke-dasharray:none;stroke:#333;}#mermaid-svg-0E7qkNjdUZR9njxJ .messageLine1{stroke-width:1.5;stroke-dasharray:2,2;stroke:#333;}#mermaid-svg-0E7qkNjdUZR9njxJ #arrowhead path{fill:#333;stroke:#333;}#mermaid-svg-0E7qkNjdUZR9njxJ .sequenceNumber{fill:white;}#mermaid-svg-0E7qkNjdUZR9njxJ #sequencenumber{fill:#333;}#mermaid-svg-0E7qkNjdUZR9njxJ #crosshead path{fill:#333;stroke:#333;}#mermaid-svg-0E7qkNjdUZR9njxJ .messageText{fill:#333;stroke:#333;}#mermaid-svg-0E7qkNjdUZR9njxJ .labelBox{stroke:hsl(259.6261682243, 59.7765363128%, 87.9019607843%);fill:#ECECFF;}#mermaid-svg-0E7qkNjdUZR9njxJ .labelText,#mermaid-svg-0E7qkNjdUZR9njxJ .labelTexttspan{fill:black;stroke:none;}#mermaid-svg-0E7qkNjdUZR9njxJ .loopText,#mermaid-svg-0E7qkNjdUZR9njxJ .loopTexttspan{fill:black;stroke:none;}#mermaid-svg-0E7qkNjdUZR9njxJ .loopLine{stroke-width:2px;stroke-dasharray:2,2;stroke:hsl(259.6261682243, 59.7765363128%, 87.9019607843%);fill:hsl(259.6261682243, 59.7765363128%, 87.9019607843%);}#mermaid-svg-0E7qkNjdUZR9njxJ .note{stroke:#aaaa33;fill:#fff5ad;}#mermaid-svg-0E7qkNjdUZR9njxJ .noteText,#mermaid-svg-0E7qkNjdUZR9njxJ .noteTexttspan{fill:black;stroke:none;}#mermaid-svg-0E7qkNjdUZR9njxJ .activation0{fill:#f4f4f4;stroke:#666;}#mermaid-svg-0E7qkNjdUZR9njxJ .activation1{fill:#f4f4f4;stroke:#666;}#mermaid-svg-0E7qkNjdUZR9njxJ .activation2{fill:#f4f4f4;stroke:#666;}#mermaid-svg-0E7qkNjdUZR9njxJ .actorPopupMenu{position:absolute;}#mermaid-svg-0E7qkNjdUZR9njxJ .actorPopupMenuPanel{position:absolute;fill:#ECECFF;box-shadow:0px 8px 16px 0px rgba(0,0,0,0.2);filter:drop-shadow(3px 5px 2px rgb(0 0 0 / 0.4));}#mermaid-svg-0E7qkNjdUZR9njxJ .actor-man line{stroke:hsl(259.6261682243, 59.7765363128%, 87.9019607843%);fill:#ECECFF;}#mermaid-svg-0E7qkNjdUZR9njxJ .actor-man circle,#mermaid-svg-0E7qkNjdUZR9njxJ line{stroke:hsl(259.6261682243, 59.7765363128%, 87.9019607843%);fill:#ECECFF;stroke-width:2px;}#mermaid-svg-0E7qkNjdUZR9njxJ :root{--mermaid-font-family:"trebuchet ms",verdana,arial,sans-serif;} 线程1 Acount对象 线程2 获取余额100 减10 90 已经修改为90了 cas(100, 90) cas失败再次获取余额90 减10 80 已经修改为80了 cas(90, 80) cas失败再次获取余额80 减10 70 cas(80, 70)cas成功 线程1 Acount对象 线程2 注意 其实 CAS 的底层是 lock cmpxchg 指令X86 架构在单核 CPU 和多核 CPU 下都能够保证【比较-交 换】的原子性 在多核状态下某个核执行到带 lock 的指令时CPU会让总线锁住当这个核把此指令执行完毕再开启总线。这个过程中不会被线程的调度机制所打断保证了多个线程对内存操作的准确性是原子的。 volatile
获取共享变量时为了保证该变量的可见性需要使用 volatile 修饰。
它可以用来修饰成员变量和静态成员变量可以避免线程从自己的工作缓存中查找变量的值必须到主存中获取 它的值线程操作 volatile 变量都是直接操作主存。即一个线程对 volatile 变量的修改对另一个线程可见。 注意 volatile 仅仅保证了共享变量的可见性让其它线程能够看到最新值但不能解决指令交错问题不能保证原 子性 CAS 必须借助 volatile 才能读取到共享变量的最新值来实现【比较并交换】的效果
为什么无锁效率高
无锁情况下即使重试失败线程始终在高速运行没有停歇而 synchronized 会让线程在没有获得锁的时 候发生上下文切换进入阻塞。打个比喻线程就好像高速跑道上的赛车高速运行时速度超快一旦发生上下文切换就好比赛车要减速、熄火被唤醒又得重新打火、启动、加速… 恢复到高速运行代价比较大但无锁情况下因为线程要保持运行需要额外 CPU 的支持CPU 在这里就好比高速跑道没有额外的跑道线程想高速运行也无从谈起虽然不会进入阻塞但由于没有分到时间片仍然会进入可运行状态还 是会导致上下文切换。
CAS 的特点
结合 CAS 和 volatile 可以实现无锁并发适用于线程数少、多核 CPU 的场景下。
CAS 是基于乐观锁的思想最乐观的估计不怕别的线程来修改共享变量就算改了也没关系我吃亏点再 重试呗。synchronized 是基于悲观锁的思想最悲观的估计得防着其它线程来修改共享变量我上了锁你们都别想改我改完了解开锁你们才有机会。CAS 体现的是无锁并发、无阻塞并发 因为没有使用 synchronized所以线程不会陷入阻塞这是效率提升的因素之一但如果竞争激烈可以想到重试必然频繁发生反而效率会受影响
5.3 原子整数
AtomicBooleanAtomicIntegerAtomicLong
以 AtomicInteger 为例
AtomicInteger i new AtomicInteger(0);// 获取并自增i 0, 结果 i 1, 返回 0类似于 i
System.out.println(i.getAndIncrement());// 自增并获取i 1, 结果 i 2, 返回 2类似于 i
System.out.println(i.incrementAndGet());// 自减并获取i 2, 结果 i 1, 返回 1类似于 --i
System.out.println(i.decrementAndGet());// 获取并自减i 1, 结果 i 0, 返回 1类似于 i--
System.out.println(i.getAndDecrement());// 获取并加值i 0, 结果 i 5, 返回 0
System.out.println(i.getAndAdd(5));// 加值并获取i 5, 结果 i 0, 返回 0
System.out.println(i.addAndGet(-5));// 获取并更新i 0, p 为 i 的当前值, 结果 i -2, 返回 0
// 其中函数中的操作能保证原子但函数需要无副作用
System.out.println(i.getAndUpdate(p - p - 2));// 更新并获取i -2, p 为 i 的当前值, 结果 i 0, 返回 0
// 其中函数中的操作能保证原子但函数需要无副作用
System.out.println(i.updateAndGet(p - p 2));// 获取并计算i 0, p 为 i 的当前值, x 为参数1, 结果 i 10, 返回 0
// 其中函数中的操作能保证原子但函数需要无副作用
// getAndUpdate 如果在 lambda 中引用了外部的局部变量要保证该局部变量是 final 的
// getAndAccumulate 可以通过 参数1 来引用外部的局部变量但因为其不在 lambda 中因此不必是 final
System.out.println(i.getAndAccumulate(10, (p, x) - p x));// 计算并获取i 10, p 为 i 的当前值, x 为参数1, 结果 i 0, 返回 0
// 其中函数中的操作能保证原子但函数需要无副作用
System.out.println(i.accumulateAndGet(-10, (p, x) - p x5.4 原子引用
AtomicReferenceAtomicMarkableReferenceAtomicStampedReference
有如下方法
public interface DecimalAccount {// 获取余额BigDecimal getBalance();// 取款void withdraw(BigDecimal amount);/*** 方法内会启动 1000 个线程每个线程做 -10 元 的操作* 如果初始余额为 10000 那么正确的结果应当是 0*/static void demo(DecimalAccount account) {ListThread ts new ArrayList();for (int i 0; i 1000; i) {ts.add(new Thread(() - {account.withdraw(BigDecimal.TEN);}));}ts.forEach(Thread::start);ts.forEach(t - {try {t.join();} catch (InterruptedException e) {e.printStackTrace();}});System.out.println(account.getBalance());}
}试着提供不同的 DecimalAccount 实现实现安全的取款操作
不安全实现
class DecimalAccountUnsafe implements DecimalAccount {BigDecimal balance;public DecimalAccountUnsafe(BigDecimal balance) {this.balance balance;}Overridepublic BigDecimal getBalance() {return balance;}Overridepublic void withdraw(BigDecimal amount) {BigDecimal balance this.getBalance();this.balance balance.subtract(amount);}
}安全实现-使用锁
class DecimalAccountSafeLock implements DecimalAccount {private final Object lock new Object();BigDecimal balance;public DecimalAccountSafeLock(BigDecimal balance) {this.balance balance;}Overridepublic BigDecimal getBalance() {return balance;}Overridepublic void withdraw(BigDecimal amount) {synchronized (lock) {BigDecimal balance this.getBalance();this.balance balance.subtract(amount);}}
}安全实现-使用 CAS
class DecimalAccountSafeCas implements DecimalAccount {AtomicReferenceBigDecimal ref;public DecimalAccountSafeCas(BigDecimal balance) {ref new AtomicReference(balance);}Overridepublic BigDecimal getBalance() {return ref.get();}Overridepublic void withdraw(BigDecimal amount) {while (true) {BigDecimal prev ref.get();BigDecimal next prev.subtract(amount);if (ref.compareAndSet(prev, next)) {break;}}}
}测试代码
DecimalAccount.demo(new DecimalAccountUnsafe(new BigDecimal(10000)));
DecimalAccount.demo(new DecimalAccountSafeLock(new BigDecimal(10000)));
DecimalAccount.demo(new DecimalAccountSafeCas(new BigDecimal(10000)));运行结果
4310 cost: 425 ms
0 cost: 285 ms
0 cost: 274 msABA 问题及解决
ABA问题
Slf4j
public class ReferenceTest {static AtomicReferenceString ref new AtomicReference(A);public static void main(String[] args) throws InterruptedException {log.debug(main start...);// 获取值 A// 这个共享变量被它线程修改过String prev ref.get();other();sleep(1000);// 尝试改为 Clog.debug(change A-C {}, ref.compareAndSet(prev, C));}private static void other() {new Thread(() - {log.debug(change A-B {}, ref.compareAndSet(ref.get(), B));}, t1).start();sleep(500);new Thread(() - {log.debug(change B-A {}, ref.compareAndSet(ref.get(), A));}, t2).start();}private static void sleep(int millis) {try {Thread.sleep(millis);} catch (InterruptedException e) {e.printStackTrace();}}
}输出
21:14:58.245 [main] DEBUG com.zhou.cas.Test - main start...
21:14:58.316 [t1] DEBUG com.zhou.cas.Test - change A-B true
21:14:58.823 [t2] DEBUG com.zhou.cas.Test - change B-A true
21:14:59.834 [main] DEBUG com.zhou.cas.Test - change A-C true主线程仅能判断出共享变量的值与最初值 A 是否相同不能感知到这种从 A 改为 B 又 改回 A 的情况如果主线程 希望
只要有其它线程【动过了】共享变量那么自己的 cas 就算失败这时仅比较值是不够的需要再加一个版本号
AtomicStampedReference
static AtomicStampedReferenceString ref new AtomicStampedReference(A, 0);public static void main(String[] args) throws InterruptedException {log.debug(main start...);// 获取值 AString prev ref.getReference();// 获取版本号int stamp ref.getStamp();log.debug(版本 {}, stamp);// 如果中间有其它线程干扰发生了 ABA 现象other();sleep(1000);// 尝试改为 Clog.debug(change A-C {}, ref.compareAndSet(prev, C, stamp, stamp 1));}private static void other() {new Thread(() - {log.debug(change A-B {}, ref.compareAndSet(ref.getReference(), B,ref.getStamp(), ref.getStamp() 1));log.debug(更新版本为 {}, ref.getStamp());}, t1).start();sleep(500);new Thread(() - {log.debug(change B-A {}, ref.compareAndSet(ref.getReference(), A,ref.getStamp(), ref.getStamp() 1));log.debug(更新版本为 {}, ref.getStamp());}, t2).start();}输出
21:20:45.912 [main] DEBUG com.zhou.cas.StampedTest - main start...
21:20:45.912 [main] DEBUG com.zhou.cas.StampedTest - 版本 0
21:20:45.974 [t1] DEBUG com.zhou.cas.StampedTest - change A-B true
21:20:45.974 [t1] DEBUG com.zhou.cas.StampedTest - 更新版本为 1
21:20:46.474 [t2] DEBUG com.zhou.cas.StampedTest - change B-A true
21:20:46.474 [t2] DEBUG com.zhou.cas.StampedTest - 更新版本为 2
21:20:47.475 [main] DEBUG com.zhou.cas.StampedTest - change A-C falseAtomicStampedReference 可以给原子引用加上版本号追踪原子引用整个的变化过程如 A - B - A - C 通过AtomicStampedReference我们可以知道引用变量中途被更改了几次。
但是有时候并不关心引用变量更改了几次只是关心是否更改过所以就有了 AtomicMarkableReference
AtomicMarkableReference
class GarbageBag {String desc;public GarbageBag(String desc) {this.desc desc;}public void setDesc(String desc) {this.desc desc;}Overridepublic String toString() {return super.toString() desc;}
}Slf4j
public class MarkableReferenceTest {public static void main(String[] args) throws InterruptedException {GarbageBag bag new GarbageBag(装满了垃圾);// 参数2 mark 可以看作一个标记表示垃圾袋满了AtomicMarkableReferenceGarbageBag ref new AtomicMarkableReference(bag, true);log.debug(主线程 start...);GarbageBag prev ref.getReference();log.debug(prev.toString());new Thread(() - {log.debug(打扫卫生的线程 start...);bag.setDesc(空垃圾袋);while (!ref.compareAndSet(bag, bag, true, false)) {}log.debug(bag.toString());}).start();Thread.sleep(1000);log.debug(主线程想换一只新垃圾袋);boolean success ref.compareAndSet(prev, new GarbageBag(空垃圾袋), true, false);log.debug(换了么 success);log.debug(ref.getReference().toString());}
}输出此时主线程就知道有其他线程更换了垃圾袋自己不用再去更换了
21:28:52.283 [main] DEBUG com.zhou.cas.MarkableReferenceTest - 主线程 start...
21:28:52.285 [main] DEBUG com.zhou.cas.MarkableReferenceTest - com.zhou.cas.GarbageBag2e0fa5d3 装满了垃圾
21:28:52.333 [Thread-0] DEBUG com.zhou.cas.MarkableReferenceTest - 打扫卫生的线程 start...
21:28:52.333 [Thread-0] DEBUG com.zhou.cas.MarkableReferenceTest - com.zhou.cas.GarbageBag2e0fa5d3 空垃圾袋
21:28:53.334 [main] DEBUG com.zhou.cas.MarkableReferenceTest - 主线程想换一只新垃圾袋
21:28:53.334 [main] DEBUG com.zhou.cas.MarkableReferenceTest - 换了么false
21:28:53.334 [main] DEBUG com.zhou.cas.MarkableReferenceTest - com.zhou.cas.GarbageBag2e0fa5d3 空垃圾袋可以注释掉打扫卫生线程代码再观察输出
5.5 原子数组
AtomicIntegerArrayAtomicLongArrayAtomicReferenceArray
有如下方法
/*** 参数1提供数组、可以是线程不安全数组或线程安全数组* 参数2获取数组长度的方法* 参数3自增方法回传 array, index* 参数4打印数组的方法*/
// supplier 提供者 无中生有 ()-结果
// function 函数 一个参数一个结果 (参数)-结果 , BiFunction (参数1,参数2)-结果
// consumer 消费者 一个参数没结果 (参数)-void, BiConsumer (参数1,参数2)-private static T void demo(SupplierT arraySupplier,FunctionT, Integer lengthFun,BiConsumerT, Integer putConsumer,ConsumerT printConsumer) {ListThread ts new ArrayList();T array arraySupplier.get();int length lengthFun.apply(array);for (int i 0; i length; i) {// 每个线程对数组作 10000 次操作ts.add(new Thread(() - {for (int j 0; j 10000; j) {putConsumer.accept(array, j % length);}}));}ts.forEach(t - t.start()); // 启动所有线程ts.forEach(t - {try {t.join();} catch (InterruptedException e) {e.printStackTrace();}}); // 等所有线程结束printConsumer.accept(array);}不安全的数组
demo(()-new int[10],(array)-array.length,(array, index) - array[index],array- System.out.println(Arrays.toString(array))
);结果
[9870, 9862, 9774, 9697, 9683, 9678, 9679, 9668, 9680, 9698] 安全的数组
demo(()- new AtomicIntegerArray(10),(array) - array.length(),(array, index) - array.getAndIncrement(index),array - System.out.println(array)
);结果
[10000, 10000, 10000, 10000, 10000, 10000, 10000, 10000, 10000, 10000] 5.6 字段更新器
AtomicReferenceFieldUpdater // 域 字段AtomicIntegerFieldUpdaterAtomicLongFieldUpdater
利用字段更新器可以针对对象的某个域Field进行原子操作只能配合 volatile 修饰的字段使用否则会出现异常
Exception in thread main java.lang.IllegalArgumentException: Must be volatile typepublic class ReferenceFieldTest {private volatile int field;public static void main(String[] args) {AtomicIntegerFieldUpdater fieldUpdater AtomicIntegerFieldUpdater.newUpdater(ReferenceFieldTest.class, field);ReferenceFieldTest test new ReferenceFieldTest();fieldUpdater.compareAndSet(test, 0, 10);// 修改成功 field 10System.out.println(test.field);// 修改成功 field 20fieldUpdater.compareAndSet(test, 10, 20);System.out.println(test.field);// 修改失败 field 20fieldUpdater.compareAndSet(test, 10, 30);System.out.println(test.field);}
}结果
10
20
205.7 原子累加器
累加器性能比较
private static T void demo(SupplierT adderSupplier, ConsumerT action) {T adder adderSupplier.get();long start System.nanoTime();ListThread ts new ArrayList();// 2 个线程每人累加 50 万for (int i 0; i 4; i) {ts.add(new Thread(() - {for (int j 0; j 500000; j) {action.accept(adder);}}));}ts.forEach(t - t.start());ts.forEach(t - {try {t.join();} catch (InterruptedException e) {e.printStackTrace();}});long end System.nanoTime();System.out.println(adder cost: (end - start) / 1000_000);}比较 AtomicLong 与 LongAdder
for (int i 0; i 5; i) {demo(() - new LongAdder(), adder - adder.increment());
}
for (int i 0; i 5; i) {demo(() - new AtomicLong(), adder - adder.getAndIncrement());
}输出
1000000 cost:43
1000000 cost:9
1000000 cost:7
1000000 cost:7
1000000 cost:71000000 cost:31
1000000 cost:27
1000000 cost:28
1000000 cost:24
1000000 cost:22性能提升的原因很简单就是在有竞争时设置多个累加单元Therad-0 累加 Cell[0]而 Thread-1 累加 Cell[1]… 最后将结果汇总。这样它们在累加时操作的不同的 Cell 变量因此减少了 CAS 重试失败从而提高性 能。
cas 锁
Slf4j
// 不要用于实践
public class LockCas {private AtomicInteger state new AtomicInteger(0);public void lock() {while (true) {if (state.compareAndSet(0, 1)) {break;}}}public void unlock() {log.debug(unlock...);state.set(0);}public static void main(String[] args) {LockCas lock new LockCas();new Thread(() - {log.debug(begin...);lock.lock();try {log.debug(lock...);try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}} finally {lock.unlock();}}).start();new Thread(() - {log.debug(begin...);lock.lock();try {log.debug(lock...);} finally {lock.unlock();}}).start();}
}输出
22:03:11.771 [Thread-0] DEBUG com.zhou.cas.LockCas - begin...
22:03:11.771 [Thread-1] DEBUG com.zhou.cas.LockCas - begin...
22:03:11.771 [Thread-0] DEBUG com.zhou.cas.LockCas - lock...
22:03:12.771 [Thread-0] DEBUG com.zhou.cas.LockCas - unlock...
22:03:12.771 [Thread-1] DEBUG com.zhou.cas.LockCas - lock...
22:03:12.771 [Thread-1] DEBUG com.zhou.cas.LockCas - unlock...原理之伪共享
LongAdder中的Cell 即为累加单元源码如下
// 防止缓存行伪共享
sun.misc.Contended
static final class Cell {volatile long value;Cell(long x) {value x;}// 最重要的方法, 用来 cas 方式进行累加, prev 表示旧值, next 表示新值final boolean cas(long prev, long next) {return UNSAFE.compareAndSwapLong(this, valueOffset, prev, next);}// 省略不重要代码
}缓存与内存的速度比较 每个CPU核心都有三级缓存其中第三级缓存各核心共享CPU核心和它们之间的访问速度如下
从 cpu 到大约需要的时钟周期寄存器1 cycle (4GHz 的 CPU 约为0.25ns)L13~4 cycleL210~20 cycleL340~45 cycle内存120~240 cycle
因为 CPU 与 内存的速度差异很大需要靠预读数据至缓存来提升效率。
而缓存以缓存行为单位每个缓存行对应着一块内存一般是 64 byte8 个 long
缓存的加入会造成数据副本的产生即同一份数据会缓存在不同核心的缓存行中
CPU 要保证数据的一致性如果某个 CPU 核心更改了数据其它 CPU 核心对应的整个缓存行必须失效 因为 Cell 是数组形式在内存中是连续存储的一个 Cell 为 24 字节16 字节的对象头和 8 字节的 value因为数组是连续存储的因此缓存行可以存下 2 个的 Cell 对象。这样问题来了
Core-0 要修改 Cell[0]Core-1 要修改 Cell[1]
无论谁修改成功都会导致对方 Core 的缓存行失效比如 Core-0 中 Cell[0]6000, Cell[1]8000 要累加 Cell[0]6001, Cell[1]8000 这时会让 Core-1 的缓存行失效
sun.misc.Contended 就是用来解决这个问题对某字段或对象加上该注解则表示该字段或对象会单独占用一个缓存行是通过在对象或字段的前后添加一定的padding实现的这样不会造成对方缓存行的失效提升效率
在 Java 8 之前是通过代码里手动添加属性的方式解决的如
class LongWithPadding {long value;// 一个 long 占 8 个字节再添加 7 个 long 属性就会变成 64 个字节刚好是一个缓存行大小long p0, p1, p2, p3, p4, p5, p6;
}LongAdder源码
LongAdder 类有几个关键域
// 累加单元数组, 懒惰初始化
transient volatile Cell[] cells;// 基础值, 如果没有竞争, 则用 cas 累加这个域
transient volatile long base;// 在 cells 创建或扩容时, 置为 1, 表示加锁
transient volatile int cellsBusy;LongAdder的累加主要调用下面的方法
public void add(long x) {// as 为累加单元数组// b 为基础值// x 为累加值Cell[] as; long b, v; int m; Cell a// 进入 if 的两个条件// 1. as 有值, 表示已经发生过竞争, 进入 if// 2. cas 给 base 累加时失败了, 表示 base 发生了竞争, 进入 if if ((as cells) ! null || !casBase(b base, b x)) {// uncontended 表示 cell 没有竞争boolean uncontended true;if ( // as 还没有创建as null || (m as.length - 1) 0 ||// 当前线程对应的 cell 还没有(a as[getProbe() m]) null ||// cas 给当前线程的 cell 累加失败 uncontendedfalse ( a 为当前线程的 cell )!(uncontended a.cas(v a.value, v x)))// 进入 cell 数组创建、cell 创建的流程longAccumulate(x, null, uncontended);}}add 流程图 final void longAccumulate(long x, LongBinaryOperator fn,boolean wasUncontended) {int h;// 当前线程还没有对应的 cell, 需要随机生成一个 h 值用来将当前线程绑定到 cellif ((h getProbe()) 0) {// 初始化 probeThreadLocalRandom.current();// h 对应新的 probe 值, 用来对应 cellh getProbe();wasUncontended true;}// collide 为 true 表示需要扩容boolean collide false;for (;;) {Cell[] as; Cell a; int n; long v;// 已经有了 cellsif ((as cells) ! null (n as.length) 0) {// 分支一还没有 cellif ((a as[(n - 1) h]) null) {// 为 cellsBusy 加锁, 创建 cell, cell 的初始累加值为 x// 成功则 break, 否则继续 continue 循环}// 分支三// 有竞争, 改变线程对应的 cell 来重试 caselse if (!wasUncontended)wasUncontended true;// cas 尝试累加, fn 配合 LongAccumulator 不为 null, 配合 LongAdder 为 nullelse if (a.cas(v a.value, ((fn null) ? v x : fn.applyAsLong(v, x))))break;// 如果 cells 长度已经超过了最大长度, 或者已经扩容, 改变线程对应的 cell 来重试 caselse if (n NCPU || cells ! as)collide false;// 确保 collide 为 false 进入此分支, 就不会进入下面的 else if 进行扩容了else if (!collide)//没到达cpu数量的情况不会立刻扩容而是将collide标记为true然后先试下一下换个 //cell能不能解决问题还不能解决才真正扩容也就是会再给一次机会因为collide标 //为true下次循环就不会进入这个分支了collide true;// 加锁else if (cellsBusy 0 casCellsBusy()) {// 加锁成功, 扩容continue;}// 改变线程对应的 cellh advanceProbe(h);}// 分支二还没有 cells, 尝试给 cellsBusy 加锁else if (cellsBusy 0 cells as casCellsBusy()) {// 加锁成功, 初始化 cells, 最开始长度为 2, 并填充一个 cell// 成功则 break;}// 分支三上两种情况失败, 尝试给 base 累加else if (casBase(v base, ((fn null) ? v x : fn.applyAsLong(v, x))))break;}
}longAccumulate 流程图
分支二还没有 cells, 尝试给 cellsBusy 加锁 分支一还没有 cell 每个线程刚进入 longAccumulate 时会尝试对应一个 cell 对象找到一个坑位
注意
此图有误当没有超过CPU上限时会先改变线程对应的cell而不是立马走加锁扩容操作如果下次循环还没累加成功才会加锁走扩容操作
分支三 获取最终结果通过 sum 方法
public long sum() {Cell[] as cells; Cell a;long sum base;if (as ! null) {for (int i 0; i as.length; i) {if ((a as[i]) ! null)sum a.value;}}return sum;}5.8 Unsafe
概述
Unsafe 对象提供了非常底层的操作内存、线程的方法Unsafe 对象不能直接调用只能通过反射获得
public class UnsafeAccessor {static Unsafe unsafe;static {try {Field theUnsafe Unsafe.class.getDeclaredField(theUnsafe);theUnsafe.setAccessible(true);unsafe (Unsafe) theUnsafe.get(null);} catch (NoSuchFieldException | IllegalAccessException e) {throw new Error(e);}}static Unsafe getUnsafe() {return unsafe;}
}Unsafe CAS 操作
Data
class Student {volatile int id;volatile String name;public static void main(String[] args) throws NoSuchFieldException {Unsafe unsafe UnsafeAccessor.getUnsafe();Field id Student.class.getDeclaredField(id);Field name Student.class.getDeclaredField(name);// 获得成员变量的偏移量long idOffset unsafe.objectFieldOffset(id);long nameOffset unsafe.objectFieldOffset(name);Student student new Student();// 使用cas方法替换成员变量的值unsafe.compareAndSwapInt(student, idOffset, 0, 20); // 返回trueunsafe.compareAndSwapObject(student, nameOffset, null, 张三); // 返回trueSystem.out.println(student);}
}输出
Student(id20, name张三)使用自定义的 AtomicData 实现之前线程安全的原子整数 Account 实现
class AtomicData {private volatile int data;static final Unsafe unsafe;static final long DATA_OFFSET;static {unsafe UnsafeAccessor.getUnsafe();try {// data 属性在 DataContainer 对象中的偏移量用于 Unsafe 直接访问该属性DATA_OFFSET unsafe.objectFieldOffset(AtomicData.class.getDeclaredField(data));} catch (NoSuchFieldException e) {throw new Error(e);}}public AtomicData(int data) {this.data data;}public void decrease(int amount) {int oldValue;while (true) {// 获取共享变量旧值可以在这一行加入断点修改 data 调试来加深理解oldValue data;// cas 尝试修改 data 为 旧值 amount如果期间旧值被别的线程改了返回 falseif (unsafe.compareAndSwapInt(this, DATA_OFFSET, oldValue, oldValue - amount)) {return;}}}public int getData() {return data;}
}Account实现
Account.demo(new Account() {AtomicData atomicData new AtomicData(10000);Overridepublic Integer getBalance() {return atomicData.getData();}Overridepublic void withdraw(Integer amount) {atomicData.decrease(amount);}
});六、共享模型之不可变
6.1 日期转换的问题
问题提出
下面的代码在运行时由于 SimpleDateFormat 不是线程安全的
SimpleDateFormat sdf new SimpleDateFormat(yyyy-MM-dd);
for (int i 0; i 10; i) {new Thread(() - {try {log.debug({}, sdf.parse(1951-04-21));} catch (Exception e) {log.error({}, e);}}).start();
}有很大几率出现 java.lang.NumberFormatException 或者出现不正确的日期解析结果例如
19:10:40.859 [Thread-2] c.TestDateParse - {}
java.lang.NumberFormatException: For input string: at java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)at java.lang.Long.parseLong(Long.java:601)at java.lang.Long.parseLong(Long.java:631)at java.text.DigitList.getLong(DigitList.java:195)at java.text.DecimalFormat.parse(DecimalFormat.java:2084)at java.text.SimpleDateFormat.subParse(SimpleDateFormat.java:2162)at java.text.SimpleDateFormat.parse(SimpleDateFormat.java:1514)at java.text.DateFormat.parse(DateFormat.java:364)at cn.itcast.n7.TestDateParse.lambda$test1$0(TestDateParse.java:18)at java.lang.Thread.run(Thread.java:748)
19:10:40.859 [Thread-1] c.TestDateParse - {}
java.lang.NumberFormatException: empty Stringat sun.misc.FloatingDecimal.readJavaFormatString(FloatingDecimal.java:1842)at sun.misc.FloatingDecimal.parseDouble(FloatingDecimal.java:110)at java.lang.Double.parseDouble(Double.java:538)at java.text.DigitList.getDouble(DigitList.java:169)at java.text.DecimalFormat.parse(DecimalFormat.java:2089)at java.text.SimpleDateFormat.subParse(SimpleDateFormat.java:2162)at java.text.SimpleDateFormat.parse(SimpleDateFormat.java:1514)at java.text.DateFormat.parse(DateFormat.java:364)at cn.itcast.n7.TestDateParse.lambda$test1$0(TestDateParse.java:18)at java.lang.Thread.run(Thread.java:748)
19:10:40.857 [Thread-8] c.TestDateParse - Sat Apr 21 00:00:00 CST 1951
19:10:40.857 [Thread-9] c.TestDateParse - Sat Apr 21 00:00:00 CST 1951
19:10:40.857 [Thread-6] c.TestDateParse - Sat Apr 21 00:00:00 CST 1951
19:10:40.857 [Thread-4] c.TestDateParse - Sat Apr 21 00:00:00 CST 1951
19:10:40.857 [Thread-5] c.TestDateParse - Mon Apr 21 00:00:00 CST 178960645
19:10:40.857 [Thread-0] c.TestDateParse - Sat Apr 21 00:00:00 CST 1951
19:10:40.857 [Thread-7] c.TestDateParse - Sat Apr 21 00:00:00 CST 1951
19:10:40.857 [Thread-3] c.TestDateParse - Sat Apr 21 00:00:00 CST 1951思路 - 同步锁
这样虽能解决问题但带来的是性能上的损失并不算很好
SimpleDateFormat sdf new SimpleDateFormat(yyyy-MM-dd);
for (int i 0; i 50; i) {new Thread(() - {synchronized(sdf) {try {log.debug({}, sdf.parse(1951-04-21));} catch (Exception e) {log.error({}, e);}}}).start();
}思路 - 不可变
如果一个对象在不能够修改其内部状态属性那么它就是线程安全的因为不存在并发修改这样的对象在 Java 中有很多例如在 Java 8 后提供了一个新的日期格式化类
DateTimeFormatter dtf DateTimeFormatter.ofPattern(yyyy-MM-dd);
for (int i 0; i 10; i) {new Thread(() - {LocalDate date dtf.parse(2018-10-01, LocalDate::from);log.debug({}, date);}).start();
}不可变对象实际是另一种避免竞争的方式。
6.2 不可变设计
以String类为例说明下不可变设计的要素
public final class String
implements java.io.Serializable, Comparable String , CharSequence {/** The value is used for character storage. */private final char value[];/** Cache the hash code for the string */private int hash; // Default to 0// ...}final 的使用
发现该类、类中所有属性都是 final 的
属性用 final 修饰保证了该属性是只读的不能修改类用 final 修饰保证了该类中的方法不能被覆盖防止子类无意间破坏不可变性
保护性拷贝
使用字符串时也有一些跟修改相关的方法比如 substring 等它们是如何实现不可变的就以 substring 为例
public String substring(int beginIndex) {if (beginIndex 0) {throw new StringIndexOutOfBoundsException(beginIndex);}int subLen value.length - beginIndex;if (subLen 0) {throw new StringIndexOutOfBoundsException(subLen);}return (beginIndex 0) ? this : new String(value, beginIndex, subLen);
}发现其内部是调用 String 的构造方法创建了一个新字符串再进入这个构造看看是否对 final char[] value 做出 了修改
public String(char value[], int offset, int count) {if (offset 0) {throw new StringIndexOutOfBoundsException(offset);}if (count 0) {if (count 0) {throw new StringIndexOutOfBoundsException(count);}if (offset value.length) {this.value .value;return;}}if (offset value.length - count) {throw new StringIndexOutOfBoundsException(offset count);}this.value Arrays.copyOfRange(value, offset, offset count);
}结果发现也没有构造新字符串对象时会生成新的 char[] value对内容进行复制 。这种通过创建副本对象来避 免共享的手段称之为【保护性拷贝defensive copy】
再来看下String的其中一个构造方法
public String(char value[]) {this.value Arrays.copyOf(value, value.length);
}为什么传进来的char数组还要进行拷贝赋值因为这个char数组是外部传进来的所以外部有可能会对这个数组的值进行修改从而会影响到String对象所以需要将这个数组进行拷贝赋值
模式之享元
1.定义
享元Flyweight)模式的定义
运用共享技术来有效地支持大量细粒度对象的复用。它通过共享已经存在的对象来大幅度减少需要创建的对象数量、避免大量相似类的开销从而提高系统资源的利用率。 享元模式的主要优点是
相同对象只要保存一份这降低了系统中对象的数量从而降低了系统中细粒度对象给内存带来的压力。 其主要缺点是
①为了使对象可以共享需要将一些不能共享的状态外部化这将增加程序的复杂性
②读取享元模式的外部状态会使得运行时间稍微变长。
2.体现
2.1 包装类
在JDK中 BooleanByteShortIntegerLongCharacter 等包装类提供了 valueOf 方法例如 Long 的 valueOf 会缓存 -128~127 之间的 Long 对象在这个范围之间会重用对象大于这个范围才会新建 Long 对 象
public static Long valueOf(long l) {final int offset 128;if (l -128 l 127) { // will cachereturn LongCache.cache[(int) l offset];}return new Long(l);
}注意 Byte, Short, Long 缓存的范围都是 -128~127Character 缓存的范围是 0~127Integer的默认范围是 -128~127 最小值不能变但最大值可以通过调整虚拟机参数 -Djava.lang.Integer.IntegerCache.high 来改变 Boolean 缓存了 TRUE 和 FALSE 2.2 String 串池
2.3 BigDecimal BigInteger
3. DIY
例如一个线上商城应用QPS 达到数千如果每次都重新创建和关闭数据库连接性能会受到极大影响。 这时 预先创建好一批连接放入连接池。一次请求到达后从连接池获取连接使用完毕后再还回连接池这样既节约 了连接的创建和关闭时间也实现了连接的重用不至于让庞大的连接数压垮数据库。
Slf4j
class Pool {// 1. 连接池大小private final int poolSize;// 2. 连接对象数组private Connection[] connections;// 3. 连接状态数组 0 表示空闲 1 表示繁忙private AtomicIntegerArray states;// 4. 构造方法初始化public Pool(int poolSize) {this.poolSize poolSize;this.connections new Connection[poolSize];this.states new AtomicIntegerArray(new int[poolSize]);for (int i 0; i poolSize; i) {connections[i] new MockConnection(连接 (i 1));}}// 5. 借连接public Connection borrow() {while (true) {for (int i 0; i poolSize; i) {// 获取空闲连接if (states.get(i) 0) {if (states.compareAndSet(i, 0, 1)) {log.debug(borrow {}, connections[i]);return connections[i];}}}// 如果没有空闲连接当前线程进入等待synchronized (this) {try {log.debug(wait...);this.wait();} catch (InterruptedException e) {e.printStackTrace();}}}}// 6. 归还连接public void free(Connection conn) {for (int i 0; i poolSize; i) {if (connections[i] conn) {states.set(i, 0);synchronized (this) {log.debug(free {}, conn);// 唤醒等待连接的线程this.notifyAll();}break;}}}
}class MockConnection implements Connection {private String name;public MockConnection(String name) {this.name name;}// 实现略
}使用连接池
Pool pool new Pool(2);for (int i 0; i 5; i) {new Thread(() - {Connection conn pool.borrow();try {Thread.sleep(new Random().nextInt(1000));} catch (InterruptedException e) {e.printStackTrace();}pool.free(conn);}).start();}以上实现没有考虑
连接的动态增长与收缩连接保活可用性检测等待超时处理分布式 hash
对于关系型数据库有比较成熟的连接池实现例如c3p0, druid等对于更通用的对象池可以考虑使用apache commons pool例如redis连接池可以参考jedis中关于连接池的实现
原理之 final
1. 设置 final 变量的原理
public class TestFinal {final int a 20;
}字节码
0: aload_0
1: invokespecial #1 // Method java/lang/Object.init:()V
4: aload_0
5: bipush 20
7: putfield #2 // Field a:I-- 写屏障
10: return
发现 final 变量的赋值也会通过 putfield 指令来完成同样在这条指令之后也会加入写屏障保证在其它线程读到 它的值时不会出现为 0 的情况
6.3 无状态
在 web 阶段学习时设计 Servlet 时为了保证其线程安全都会有这样的建议不要为 Servlet 设置成员变量这 种没有任何成员变量的类是线程安全的 因为成员变量保存的数据也可以称为状态信息因此没有成员变量就称之为【无状态】 七、共享模型之工具–线程池
7.1 自定义线程池 步骤1自定义拒绝策略接口
拒绝策略(策略模式)当任务队列满时如何处理新增的任务
// 拒绝策略
FunctionalInterface
interface RejectPolicyT {void reject(BlockingQueueT queue, T task);
}步骤2自定义任务队列
供线程存取任务使用
Slf4j
public class BlockingQueueT {// 1. 任务队列private DequeT queue new ArrayDeque();// 2. 锁private ReentrantLock lock new ReentrantLock();// 3. 生产者条件变量private Condition fullWaitSet lock.newCondition();// 4. 消费者条件变量private Condition emptyWaitSet lock.newCondition();// 5. 容量private int capacity;public BlockingQueue(int capacity) {this.capacity capacity;}// 带超时阻塞获取public T poll(long timeout, TimeUnit unit) {lock.lock();try {// 将 timeout 统一转换为纳秒long nanos unit.toNanos(timeout);// 防止虚假唤醒while (queue.isEmpty()) {try {if (nanos 0) {return null;}// 返回值是剩余时间nanos emptyWaitSet.awaitNanos(nanos);} catch (InterruptedException e) {e.printStackTrace();}}T t queue.removeFirst();fullWaitSet.signal();return t;} finally {lock.unlock();}}// 阻塞获取public T take() {lock.lock();try {while (queue.isEmpty()) {try {emptyWaitSet.await();} catch (InterruptedException e) {e.printStackTrace();}}T t queue.removeFirst();fullWaitSet.signal();return t;} finally {lock.unlock();}}// 阻塞添加public void put(T task) {lock.lock();try {while (queue.size() capacity) {try {log.debug(等待加入任务队列 {} ..., task);fullWaitSet.await();} catch (InterruptedException e) {e.printStackTrace();}}log.debug(加入任务队列 {}, task);queue.addLast(task);emptyWaitSet.signal();} finally {lock.unlock();}}// 带超时时间阻塞添加public boolean offer(T task, long timeout, TimeUnit timeUnit) {lock.lock();try {long nanos timeUnit.toNanos(timeout);while (queue.size() capacity) {try {if (nanos 0) {return false;}log.debug(等待加入任务队列 {} ..., task);nanos fullWaitSet.awaitNanos(nanos);} catch (InterruptedException e) {e.printStackTrace();}}log.debug(加入任务队列 {}, task);queue.addLast(task);emptyWaitSet.signal();return true;} finally {lock.unlock();}}public int size() {lock.lock();try {return queue.size();} finally {lock.unlock();}}public void tryPut(RejectPolicyT rejectPolicy, T task) {lock.lock();try {// 判断队列是否满if (queue.size() capacity) {rejectPolicy.reject(this, task);} else { // 有空闲log.debug(加入任务队列 {}, task);queue.addLast(task);emptyWaitSet.signal();}} finally {lock.unlock();}}
}步骤3自定义线程池
管理线程提交任务
Slf4j
public class ThreadPool {// 任务队列private BlockingQueueRunnable taskQueue;// 线程集合private HashSetWorker workers new HashSet();// 核心线程数private int coreSize;// 获取任务时的超时时间private long timeout;// 获取任务时的超时时间单位private TimeUnit timeUnit;// 拒绝策略private RejectPolicyRunnable rejectPolicy;// 执行任务public void execute(Runnable task) {// 当任务数没有超过 coreSize 时直接交给 worker 对象执行// 如果任务数超过 coreSize 时加入任务队列暂存synchronized (workers) {if (workers.size() coreSize) {Worker worker new Worker(task);log.debug(新增 worker{}, {}, worker, task);workers.add(worker);worker.start();} else {
// taskQueue.put(task);// 当任务队列满时可以自定义实现以下拒绝策略处理// 1) 死等// 2) 带超时等待// 3) 让调用者放弃任务执行// 4) 让调用者抛出异常// 5) 让调用者自己执行任务taskQueue.tryPut(rejectPolicy, task);}}}public ThreadPool(int coreSize, long timeout, TimeUnit timeUnit, int queueCapcity,RejectPolicyRunnable rejectPolicy) {this.coreSize coreSize;this.timeout timeout;this.timeUnit timeUnit;this.taskQueue new BlockingQueue(queueCapcity);this.rejectPolicy rejectPolicy;}// 工作线程class Worker extends Thread {private Runnable task;public Worker(Runnable task) {this.task task;}Overridepublic void run() {// 执行任务// 1) 当 task 不为空执行任务// 2) 当 task 执行完毕再接着从任务队列获取任务并执行
// while(task ! null || (task taskQueue.take()) ! null) {while (task ! null || (task taskQueue.poll(timeout, timeUnit)) ! null) {try {log.debug(正在执行...{}, task);task.run();} catch (Exception e) {e.printStackTrace();} finally {// 如果不置为null则会重复执行当前任务task null;}}// 所有任务执行完成移除当前线程synchronized (workers) {log.debug(worker 被移除{}, this);workers.remove(this);}}}public static void main(String[] args) {ThreadPool threadPool new ThreadPool(1,1000, TimeUnit.MILLISECONDS, 1, (queue, task) - {// 1. 死等
// queue.put(task);// 2) 带超时等待
// queue.offer(task, 1500, TimeUnit.MILLISECONDS);// 3) 让调用者放弃任务执行(什么都不用处理)
// log.debug(放弃{}, task);// 4) 让调用者抛出异常
// throw new RuntimeException(任务执行失败 task);// 5) 让调用者自己执行任务(主线程会执行这个任务)task.run();});for (int i 0; i 4; i) {int j i;threadPool.execute(() - {try {Thread.sleep(1000L);} catch (InterruptedException e) {e.printStackTrace();}log.debug({}, j);});}}
}7.2 ThreadPoolExecutor
1线程池状态
用 int 的高 3 位来表示线程池状态低 29 位表示线程数量
状态名高 3 位接收新任 务处理阻塞队列任 务说明RUNNING111YYSHUTDOWN000NY不会接收新任务但会处理阻塞队列剩余任务STOP001NN会中断正在执行的任务并抛弃阻塞队列任务TIDYING010--任务全执行完毕活动线程为 0 即将进入终结TERMINATED011--终结状态
这些信息存储在一个原子变量 ctl 中目的是将线程池状态与线程个数合二为一用一次 cas 原子操作就可以进行赋值
private final AtomicInteger ctl new AtomicInteger(ctlOf(RUNNING, 0));// rs 为高 3 位代表线程池状态 wc 为低 29 位代表线程个数ctl 是通过按位或合并它们
private static int ctlOf(int rs, int wc) { return rs | wc; }// 一次cas操作c 为旧值 ctlOf 返回结果为新值
ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c))));2构造方法
// 七大参数 根据这个构造方法JDK Executors类中提供了众多工厂方法来创建各种用途的线程池
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueueRunnable workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler)corePoolSize 核心线程数目 (最多保留的线程数)maximumPoolSize 最大线程数目keepAliveTime 生存时间 - 针对救急线程unit 时间单位 - 针对救急线程workQueue 阻塞队列存放任务threadFactory 线程工厂 - 可以为线程创建时起名字handler 拒绝策略
工作方式
线程池中刚开始没有线程当一个任务提交给线程池后线程池会创建一个新线程来执行任务当线程数达到corePoolSize且没有线程空闲时这时再加入任务新加的任务会被加入workQueue队列排 队直到有空闲的线程如果队列选择了有界队列那么任务超过了队列大小时会创建 maximumPoolSize - corePoolSize 数目的线 程来救急如果线程到达maximumPoolSize且任务队列已经满了仍然有新任务这时会执行拒绝策略。拒绝策略 jdk 提供了 4 种实现其它著名框架也提供了实现 AbortPolicy 让调用者抛出 RejectedExecutionException 异常这是默认策略CallerRunsPolicy 让调用者运行任务DiscardPolicy 放弃本次任务DiscardOldestPolicy 放弃队列中最早的任务本任务取而代之Dubbo 的实现在抛出 RejectedExecutionException 异常之前会记录日志并 dump 线程栈信息方 便定位问题Netty 的实现是创建一个新线程来执行任务ActiveMQ 的实现带超时等待60s尝试放入队列PinPoint 的实现它使用了一个拒绝策略链会逐一尝试策略链中每种拒绝策略 当高峰过去后超过corePoolSize 的救急线程如果一段时间没有任务做需要结束节省资源这个时间由keepAliveTime 和 unit 来控制。
3newFixedThreadPool
public static ExecutorService newFixedThreadPool(int nThreads) {return new ThreadPoolExecutor(nThreads, nThreads,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueueRunnable());}特点
核心线程数 最大线程数没有救急线程被创建因此也无需超时时间阻塞队列是无界的可以放(2^31) - 1数量(Integer.MAX_VALUE)的任务适用于任务量已知相对耗时的任务
4newCachedThreadPool
public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {return new ThreadPoolExecutor(0, Integer.MAX_VALUE,60L, TimeUnit.SECONDS,new SynchronousQueueRunnable(),threadFactory);}希望多个任务排队执行。线程数固定为 1任务数多于 1 时会放入无界队列排队。任务执行完毕这唯一的线程 也不会被释放。 核心线程数是 0 最大线程数是 Integer.MAX_VALUE救急线程的空闲生存时间是 60s意味着 全部都是救急线程60s 后可以回收救急线程可以无限创建 队列采用了 SynchronousQueue 实现特点是没有容量没有线程来取是放不进去的 线程数会根据任务量不断增长没有上限当任务执行完毕空闲 1分钟后释放线程。 适合任务数比较密集但每个任务执行时间较短的情况如果任务时间太长会导致创建大量线程
5newSingleThreadExecutor
public static ExecutorService newSingleThreadExecutor() {return new FinalizableDelegatedExecutorService(new ThreadPoolExecutor(1, 1,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueueRunnable()));}**使用场景 **
希望多个任务排队执行。线程数固定为 1任务数多于 1 时会放入无界队列排队。任务执行完毕线程也不会被释放。
区别
自己创建一个单线程串行执行任务如果任务执行失败而终止那么没有任何补救措施而线程池还会新建一 个线程保证池的正常工作Executors.newSingleThreadExecutor() 线程个数始终为1不能修改 FinalizableDelegatedExecutorService 应用的是装饰器模式只对外暴露了 ExecutorService 接口即使强转也不能调用 ThreadPoolExecutor 中特有的方法 Executors.newFixedThreadPool(1) 初始时为1以后还可以修改 对外暴露的是 ThreadPoolExecutor 对象可以强转后调用 setCorePoolSize 等方法进行修改
6提交任务
// 执行任务
void execute(Runnable command);// 提交任务 task用返回值 Future 获得任务执行结果
T FutureT submit(CallableT task);// 提交 tasks 中所有任务
T ListFutureT invokeAll(Collection? extends CallableT tasks)throws InterruptedException;// 提交 tasks 中所有任务带超时时间
T ListFutureT invokeAll(Collection? extends CallableT tasks,long timeout, TimeUnit unit) throws InterruptedException;// 提交 tasks 中所有任务哪个任务先成功执行完毕返回此任务执行结果其它任务取消
T T invokeAny(Collection? extends CallableT tasks)throws InterruptedException, ExecutionException;// 提交 tasks 中所有任务哪个任务先成功执行完毕返回此任务执行结果其它任务取消带超时时间
T T invokeAny(Collection? extends CallableT tasks,long timeout, TimeUnit unit)throws InterruptedException, ExecutionException, TimeoutException;7关闭线程池
shutdown
/*
线程池状态变为 SHUTDOWN
- 不会接收新任务
- 但已提交任务会执行完
- 此方法不会阻塞调用线程的执行
*/
void shutdown();public void shutdown() {final ReentrantLock mainLock this.mainLock;mainLock.lock();try {checkShutdownAccess();// 修改线程池状态advanceRunState(SHUTDOWN);// 仅会打断空闲线程interruptIdleWorkers();onShutdown(); // 扩展点 ScheduledThreadPoolExecutor} finally {mainLock.unlock();}// 尝试终结(没有运行的线程可以立刻终结如果还有运行的线程也不会等(指的是调用shutdown方法的线程不会等待任务执行完毕))tryTerminate();
}shutdownNow
/*
线程池状态变为 STOP
- 不会接收新任务
- 会将队列中的任务返回
- 并用 interrupt 的方式中断正在执行的任务
*/
ListRunnable shutdownNow();public List Runnable shutdownNow() {List Runnable tasks;final ReentrantLock mainLock this.mainLock;mainLock.lock();try {checkShutdownAccess();// 修改线程池状态advanceRunState(STOP);// 打断所有线程interruptWorkers();// 获取队列中剩余任务tasks drainQueue();} finally {mainLock.unlock();}// 尝试终结tryTerminate();return tasks;
}其它方法
// 不在 RUNNING 状态的线程池此方法就返回 true
boolean isShutdown();// 线程池状态是否是 TERMINATED
boolean isTerminated();// 调用 shutdown 后由于调用线程并不会等待所有任务运行结束因此如果它想在线程池 TERMINATED 后做些事
// 情可以利用此方法等待
boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException;8任务调度线程池
可以使用 java.util.Timer来实现定时功能
Timer 的优点在于简单易用但由于所有任务都是由同一个线程来调度因此所有任务都是串行执行的同一时间只能有一个任务在执行前一个任务的延迟或异常都将会影响到之后的任务
public static void main(String[] args) {Timer timer new Timer();TimerTask task1 new TimerTask() {Overridepublic void run() {log.debug(task 1);sleep(2);}};TimerTask task2 new TimerTask() {Overridepublic void run() {log.debug(task 2);}};// 使用 timer 添加两个任务希望它们都在 1s 后执行// 但由于 timer 内只有一个线程来顺序执行队列中的任务因此『任务1』的延时影响了『任务2』的执行timer.schedule(task1, 1000);timer.schedule(task2, 1000);
}输出
20:46:09.444 c.TestTimer [main] - start...
20:46:10.447 c.TestTimer [Timer-0] - task 1
20:46:12.448 c.TestTimer [Timer-0] - task 2使用 ScheduledExecutorService 改写
ScheduledExecutorService executor Executors.newScheduledThreadPool(2);
// 添加两个任务希望它们都在 1s 后执行
executor.schedule(() - {System.out.println(任务1执行时间 new Date());try {Thread.sleep(2000);} catch (InterruptedException e) {}
}, 1000, TimeUnit.MILLISECONDS);
executor.schedule(() - {System.out.println(任务2执行时间 new Date());
}, 1000, TimeUnit.MILLISECONDS);输出
任务1执行时间Thu Jan 03 12:45:17 CST 2019
任务2执行时间Thu Jan 03 12:45:17 CST 2019 scheduleAtFixedRate 例子
ScheduledExecutorService pool Executors.newScheduledThreadPool(1);
log.debug(start...);
// 每隔一秒执行一次
pool.scheduleAtFixedRate(() - {log.debug(running...);
}, 1, 1, TimeUnit.SECONDS);输出
21:45:43.167 c.TestTimer [main] - start...
21:45:44.215 c.TestTimer [pool-1-thread-1] - running...
21:45:45.215 c.TestTimer [pool-1-thread-1] - running...
21:45:46.215 c.TestTimer [pool-1-thread-1] - running...
21:45:47.215 c.TestTimer [pool-1-thread-1] - running...scheduleAtFixedRate 例子任务执行时间超过了间隔时间
ScheduledExecutorService pool Executors.newScheduledThreadPool(1);
log.debug(start...);
pool.scheduleAtFixedRate(() - {log.debug(running...);sleep(2);
}, 1, 1, TimeUnit.SECONDS);输出一开始延时 1s接下来由于任务执行时间 间隔时间间隔被『撑』到了 2s
21:44:30.311 c.TestTimer [main] - start...
21:44:31.360 c.TestTimer [pool-1-thread-1] - running...
21:44:33.361 c.TestTimer [pool-1-thread-1] - running...
21:44:35.362 c.TestTimer [pool-1-thread-1] - running...
21:44:37.362 c.TestTimer [pool-1-thread-1] - running..scheduleWithFixedDelay 例子
ScheduledExecutorService pool Executors.newScheduledThreadPool(1);
log.debug(start...);
pool.scheduleWithFixedDelay(()- {log.debug(running...);sleep(2);
}, 1, 1, TimeUnit.SECONDS);输出一开始延时 1sscheduleWithFixedDelay 的间隔是上一个任务结束才开始加上延时时间所以间隔都是 3s
21:40:55.078 c.TestTimer [main] - start...
21:40:56.140 c.TestTimer [pool-1-thread-1] - running...
21:40:59.143 c.TestTimer [pool-1-thread-1] - running...
21:41:02.145 c.TestTimer [pool-1-thread-1] - running...
21:41:05.147 c.TestTimer [pool-1-thread-1] - running...整个线程池表现为线程数固定任务数多于线程数时会放入无界队列排队。任务执行完毕这些线 程也不会被释放。用来执行延迟或反复执行的任务 9正确处理执行任务异常
方法1主动捉异常
ExecutorService pool Executors.newFixedThreadPool(1);
pool.submit(() - {try {log.debug(task1);int i 1 / 0;} catch (Exception e) {log.error(error:, e);}
});输出
21:59:04.558 c.TestTimer [pool-1-thread-1] - task1
21:59:04.562 c.TestTimer [pool-1-thread-1] - error:
java.lang.ArithmeticException: / by zeroat cn.itcast.n8.TestTimer.lambda$main$0(TestTimer.java:28)at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)at java.util.concurrent.FutureTask.run(FutureTask.java:266)at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)at java.lang.Thread.run(Thread.java:748) 方法2使用 Future
ExecutorService pool Executors.newFixedThreadPool(1);
Future Boolean f pool.submit(() - {log.debug(task1);int i 1 / 0;return true;
});
// 当有异常时Future的get方法会返回异常信息当没异常时会返回任务的结果
log.debug(result:{}, f.get());输出
21:54:58.208 c.TestTimer [pool-1-thread-1] - task1
Exception in thread main java.util.concurrent.ExecutionException:
java.lang.ArithmeticException: / by zeroat java.util.concurrent.FutureTask.report(FutureTask.java:122)at java.util.concurrent.FutureTask.get(FutureTask.java:192)at cn.itcast.n8.TestTimer.main(TestTimer.java:31)
Caused by: java.lang.ArithmeticException: / by zeroat cn.itcast.n8.TestTimer.lambda$main$0(TestTimer.java:28)at java.util.concurrent.FutureTask.run(FutureTask.java:266)at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)at java.lang.Thread.run(Thread.java:748)10 Tomcat 线程池 #mermaid-svg-BPt4V8bdIERpNJlU {font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:16px;fill:#333;}#mermaid-svg-BPt4V8bdIERpNJlU .error-icon{fill:#552222;}#mermaid-svg-BPt4V8bdIERpNJlU .error-text{fill:#552222;stroke:#552222;}#mermaid-svg-BPt4V8bdIERpNJlU .edge-thickness-normal{stroke-width:2px;}#mermaid-svg-BPt4V8bdIERpNJlU .edge-thickness-thick{stroke-width:3.5px;}#mermaid-svg-BPt4V8bdIERpNJlU .edge-pattern-solid{stroke-dasharray:0;}#mermaid-svg-BPt4V8bdIERpNJlU .edge-pattern-dashed{stroke-dasharray:3;}#mermaid-svg-BPt4V8bdIERpNJlU .edge-pattern-dotted{stroke-dasharray:2;}#mermaid-svg-BPt4V8bdIERpNJlU .marker{fill:#333333;stroke:#333333;}#mermaid-svg-BPt4V8bdIERpNJlU .marker.cross{stroke:#333333;}#mermaid-svg-BPt4V8bdIERpNJlU svg{font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:16px;}#mermaid-svg-BPt4V8bdIERpNJlU .label{font-family:"trebuchet ms",verdana,arial,sans-serif;color:#333;}#mermaid-svg-BPt4V8bdIERpNJlU .cluster-label text{fill:#333;}#mermaid-svg-BPt4V8bdIERpNJlU .cluster-label span{color:#333;}#mermaid-svg-BPt4V8bdIERpNJlU .label text,#mermaid-svg-BPt4V8bdIERpNJlU span{fill:#333;color:#333;}#mermaid-svg-BPt4V8bdIERpNJlU .node rect,#mermaid-svg-BPt4V8bdIERpNJlU .node circle,#mermaid-svg-BPt4V8bdIERpNJlU .node ellipse,#mermaid-svg-BPt4V8bdIERpNJlU .node polygon,#mermaid-svg-BPt4V8bdIERpNJlU .node path{fill:#ECECFF;stroke:#9370DB;stroke-width:1px;}#mermaid-svg-BPt4V8bdIERpNJlU .node .label{text-align:center;}#mermaid-svg-BPt4V8bdIERpNJlU .node.clickable{cursor:pointer;}#mermaid-svg-BPt4V8bdIERpNJlU .arrowheadPath{fill:#333333;}#mermaid-svg-BPt4V8bdIERpNJlU .edgePath .path{stroke:#333333;stroke-width:2.0px;}#mermaid-svg-BPt4V8bdIERpNJlU .flowchart-link{stroke:#333333;fill:none;}#mermaid-svg-BPt4V8bdIERpNJlU .edgeLabel{background-color:#e8e8e8;text-align:center;}#mermaid-svg-BPt4V8bdIERpNJlU .edgeLabel rect{opacity:0.5;background-color:#e8e8e8;fill:#e8e8e8;}#mermaid-svg-BPt4V8bdIERpNJlU .cluster rect{fill:#ffffde;stroke:#aaaa33;stroke-width:1px;}#mermaid-svg-BPt4V8bdIERpNJlU .cluster text{fill:#333;}#mermaid-svg-BPt4V8bdIERpNJlU .cluster span{color:#333;}#mermaid-svg-BPt4V8bdIERpNJlU div.mermaidTooltip{position:absolute;text-align:center;max-width:200px;padding:2px;font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:12px;background:hsl(80, 100%, 96.2745098039%);border:1px solid #aaaa33;border-radius:2px;pointer-events:none;z-index:100;}#mermaid-svg-BPt4V8bdIERpNJlU :root{--mermaid-font-family:"trebuchet ms",verdana,arial,sans-serif;} Connector(NIO EndPoint) Executor 有读 有读 socketProcessor socketProcessor worker1 worker1 Acceptor LimitLatch SocketChannel 1 SocketChannel 2 Poller LimitLatch 用来限流可以控制最大连接个数类似 JUC中的 SemaphoreAcceptor 只负责【接收新的 socket 连接】Poller 只负责监听 socket channel 是否有【可读的 I/O 事件】,IO多路复用一旦可读封装一个任务对象socketProcessor提交给 Executor 线程池处理Executor 线程池中的工作线程最终负责【处理请求】
Tomcat 线程池扩展了 ThreadPoolExecutor行为稍有不同
如果总线程数达到 maximumPoolSize 这时不会立刻抛 RejectedExecutionException 异常而是再次尝试将任务放入队列如果还失败才抛出 RejectedExecutionException 异常
源码 tomcat-7.0.42
public void execute(Runnable command, long timeout, TimeUnit unit) {submittedCount.incrementAndGet();try {// 调用原线程池的执行方法super.execute(command);} catch (// 当线程数量和任务队列都满时捕获默认拒绝策略抛出的异常RejectedExecutionException rx) {if (super.getQueue() instanceof TaskQueue) {final TaskQueue queue (TaskQueue) super.getQueue();try {// 再次尝试放入任务队列if (!queue.force(command, timeout, unit)) {submittedCount.decrementAndGet();throw new RejectedExecutionException(Queue capacity is full.);}} catch (InterruptedException x) {submittedCount.decrementAndGet();Thread.interrupted();throw new RejectedExecutionException(x);}} else {submittedCount.decrementAndGet();throw rx;}}
}TaskQueue.java
public boolean force(Runnable o, long timeout, TimeUnit unit) throws InterruptedException {if (parent.isShutdown())throw new RejectedExecutionException(Executor not running, cant force a command into the queue);// 放入任务队列带时间return super.offer(o, timeout, unit); //forces the item onto the queue, to be used if the task is rejected
}Connector 配置
配置项默认值说明acceptorThreadCount1acceptor 线程数量pollerThreadCount1poller 线程数量一个线程即可IO多路复用minSpareThreads10核心线程数即 corePoolSizemaxThreads200最大线程数即 maximumPoolSizeexecutor-Executor 名称用来引用下面的 Executor如果配置了覆盖上面的属性
Executor 线程配置
配置项默认值说明threadPriority5线程优先级daemontrue是否守护线程minSpareThreads25核心线程数即 corePoolSizemaxThreads200最大线程数即 maximumPoolSizemaxIdleTime60000线程生存时间单位是毫秒默认值即 1 分钟maxQueueSizeInteger.MAX_VALUE队列长度prestartminSpareThreadsfalse核心线程是否在服务器启动时启动
执行流程
如果按以前线程池的步骤当线程池中的线程数量等于核心线程数时则会将后续任务添加到任务队列中但是Tomcat线程池的任务队列大小为Integer.MAX_VALUE在实际情况中救急线程根本无法创建出来救急导致并发降低影响请求响应时间所以tomcat做了一定的改进如下图 #mermaid-svg-QrBEAJg7uuhBj316 {font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:16px;fill:#333;}#mermaid-svg-QrBEAJg7uuhBj316 .error-icon{fill:#552222;}#mermaid-svg-QrBEAJg7uuhBj316 .error-text{fill:#552222;stroke:#552222;}#mermaid-svg-QrBEAJg7uuhBj316 .edge-thickness-normal{stroke-width:2px;}#mermaid-svg-QrBEAJg7uuhBj316 .edge-thickness-thick{stroke-width:3.5px;}#mermaid-svg-QrBEAJg7uuhBj316 .edge-pattern-solid{stroke-dasharray:0;}#mermaid-svg-QrBEAJg7uuhBj316 .edge-pattern-dashed{stroke-dasharray:3;}#mermaid-svg-QrBEAJg7uuhBj316 .edge-pattern-dotted{stroke-dasharray:2;}#mermaid-svg-QrBEAJg7uuhBj316 .marker{fill:#333333;stroke:#333333;}#mermaid-svg-QrBEAJg7uuhBj316 .marker.cross{stroke:#333333;}#mermaid-svg-QrBEAJg7uuhBj316 svg{font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:16px;}#mermaid-svg-QrBEAJg7uuhBj316 .label{font-family:"trebuchet ms",verdana,arial,sans-serif;color:#333;}#mermaid-svg-QrBEAJg7uuhBj316 .cluster-label text{fill:#333;}#mermaid-svg-QrBEAJg7uuhBj316 .cluster-label span{color:#333;}#mermaid-svg-QrBEAJg7uuhBj316 .label text,#mermaid-svg-QrBEAJg7uuhBj316 span{fill:#333;color:#333;}#mermaid-svg-QrBEAJg7uuhBj316 .node rect,#mermaid-svg-QrBEAJg7uuhBj316 .node circle,#mermaid-svg-QrBEAJg7uuhBj316 .node ellipse,#mermaid-svg-QrBEAJg7uuhBj316 .node polygon,#mermaid-svg-QrBEAJg7uuhBj316 .node path{fill:#ECECFF;stroke:#9370DB;stroke-width:1px;}#mermaid-svg-QrBEAJg7uuhBj316 .node .label{text-align:center;}#mermaid-svg-QrBEAJg7uuhBj316 .node.clickable{cursor:pointer;}#mermaid-svg-QrBEAJg7uuhBj316 .arrowheadPath{fill:#333333;}#mermaid-svg-QrBEAJg7uuhBj316 .edgePath .path{stroke:#333333;stroke-width:2.0px;}#mermaid-svg-QrBEAJg7uuhBj316 .flowchart-link{stroke:#333333;fill:none;}#mermaid-svg-QrBEAJg7uuhBj316 .edgeLabel{background-color:#e8e8e8;text-align:center;}#mermaid-svg-QrBEAJg7uuhBj316 .edgeLabel rect{opacity:0.5;background-color:#e8e8e8;fill:#e8e8e8;}#mermaid-svg-QrBEAJg7uuhBj316 .cluster rect{fill:#ffffde;stroke:#aaaa33;stroke-width:1px;}#mermaid-svg-QrBEAJg7uuhBj316 .cluster text{fill:#333;}#mermaid-svg-QrBEAJg7uuhBj316 .cluster span{color:#333;}#mermaid-svg-QrBEAJg7uuhBj316 div.mermaidTooltip{position:absolute;text-align:center;max-width:200px;padding:2px;font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:12px;background:hsl(80, 100%, 96.2745098039%);border:1px solid #aaaa33;border-radius:2px;pointer-events:none;z-index:100;}#mermaid-svg-QrBEAJg7uuhBj316 :root{--mermaid-font-family:"trebuchet ms",verdana,arial,sans-serif;} 是 否 否 是 添加新任务 提交任务 核心线程 加入队列 提交任务 最大线程 创建救急线程 如果当前执行的任务数小于核心线程数则把任务加到队列等待空闲核心线程执行
否则判断当前执行的任务数是否小于最大线程数
如果小于最大线程数则直接创建救急线程执行任务解决了刚刚的问题必须加满队列才创建救急线程
否则加入队列中等待线程执行
7.3 Fork/Join
1概念
Fork/Join 是 JDK 1.7 加入的新的线程池实现体现的是一种分治思想适用于能够进行任务拆分的 cpu 密集型运算 默认会创建与 cpu 核心数大小相同的线程池
任务拆分是将一个大任务拆分为算法上相同的小任务直至不能拆分可以直接求解。跟递归相关的一些计算如归并排序、斐波那契数列、都可以用分治思想进行求解
Fork/Join 在分治的基础上加入了多线程可以把每个任务的分解和合并交给不同的线程来完成进一步提升了运 算效率
2使用
提交给 Fork/Join 线程池的任务需要继承
RecursiveTask有返回值RecursiveAction没有返回值
例如下 面定义了一个对 1~n 之间的整数求和的任务
class AddTask1 extends RecursiveTask Integer {int n;public AddTask1(int n) {this.n n;}Overridepublic String toString() {return { n };}Overrideprotected Integer compute() {// 如果 n 已经为 1可以求得结果了if (n 1) {log.debug(join() {}, n);return n;}// 将任务进行拆分(fork)AddTask1 t1 new AddTask1(n - 1);t1.fork();log.debug(fork() {} {}, n, t1);// 合并(join)结果int result n t1.join();log.debug(join() {} {} {}, n, t1, result);return result;}public static void main(String[] args) {ForkJoinPool pool new ForkJoinPool(4);System.out.println(pool.invoke(new AddTask1(5)));}}结果当前任务依赖下一个任务
[ForkJoinPool-1-worker-0] - fork() 2 {1}
[ForkJoinPool-1-worker-1] - fork() 5 {4}
[ForkJoinPool-1-worker-0] - join() 1
[ForkJoinPool-1-worker-0] - join() 2 {1} 3
[ForkJoinPool-1-worker-2] - fork() 4 {3}
[ForkJoinPool-1-worker-3] - fork() 3 {2}
[ForkJoinPool-1-worker-3] - join() 3 {2} 6
[ForkJoinPool-1-worker-2] - join() 4 {3} 10
[ForkJoinPool-1-worker-1] - join() 5 {4} 15
15改进
class AddTask3 extends RecursiveTask Integer {int begin;int end;public AddTask3(int begin, int end) {this.begin begin;this.end end;}Overridepublic String toString() {return { begin , end };}Overrideprotected Integer compute() {// 5, 5if (begin end) {log.debug(join() {}, begin);return begin;}// 4, 5if (end - begin 1) {log.debug(join() {} {} {}, begin, end, end begin);return end begin;}// 1 5int mid (end begin) / 2; // 3AddTask3 t1 new AddTask3(begin, mid); // 1,3t1.fork();AddTask3 t2 new AddTask3(mid 1, end); // 4,5t2.fork();log.debug(fork() {} {} ?, t1, t2);int result t1.join() t2.join();log.debug(join() {} {} {}, t1, t2, result);return result;}public static void main(String[] args) {ForkJoinPool pool new ForkJoinPool(4);System.out.println(pool.invoke(new AddTask3(1, 10)));}}结果每次拆分两个任务分给两个线程执行因为只有4个线程所以t0执行了两个任务
[ForkJoinPool-1-worker-0] - join() 1 2 3
[ForkJoinPool-1-worker-3] - join() 4 5 9
[ForkJoinPool-1-worker-0] - join() 3
[ForkJoinPool-1-worker-1] - fork() {1,3} {4,5} ?
[ForkJoinPool-1-worker-2] - fork() {1,2} {3,3} ?
[ForkJoinPool-1-worker-2] - join() {1,2} {3,3} 6
[ForkJoinPool-1-worker-1] - join() {1,3} {4,5} 15
15 #mermaid-svg-CPNkGKbHXmVDvmTP {font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:16px;fill:#333;}#mermaid-svg-CPNkGKbHXmVDvmTP .error-icon{fill:#552222;}#mermaid-svg-CPNkGKbHXmVDvmTP .error-text{fill:#552222;stroke:#552222;}#mermaid-svg-CPNkGKbHXmVDvmTP .edge-thickness-normal{stroke-width:2px;}#mermaid-svg-CPNkGKbHXmVDvmTP .edge-thickness-thick{stroke-width:3.5px;}#mermaid-svg-CPNkGKbHXmVDvmTP .edge-pattern-solid{stroke-dasharray:0;}#mermaid-svg-CPNkGKbHXmVDvmTP .edge-pattern-dashed{stroke-dasharray:3;}#mermaid-svg-CPNkGKbHXmVDvmTP .edge-pattern-dotted{stroke-dasharray:2;}#mermaid-svg-CPNkGKbHXmVDvmTP .marker{fill:#333333;stroke:#333333;}#mermaid-svg-CPNkGKbHXmVDvmTP .marker.cross{stroke:#333333;}#mermaid-svg-CPNkGKbHXmVDvmTP svg{font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:16px;}#mermaid-svg-CPNkGKbHXmVDvmTP .label{font-family:"trebuchet ms",verdana,arial,sans-serif;color:#333;}#mermaid-svg-CPNkGKbHXmVDvmTP .cluster-label text{fill:#333;}#mermaid-svg-CPNkGKbHXmVDvmTP .cluster-label span{color:#333;}#mermaid-svg-CPNkGKbHXmVDvmTP .label text,#mermaid-svg-CPNkGKbHXmVDvmTP span{fill:#333;color:#333;}#mermaid-svg-CPNkGKbHXmVDvmTP .node rect,#mermaid-svg-CPNkGKbHXmVDvmTP .node circle,#mermaid-svg-CPNkGKbHXmVDvmTP .node ellipse,#mermaid-svg-CPNkGKbHXmVDvmTP .node polygon,#mermaid-svg-CPNkGKbHXmVDvmTP .node path{fill:#ECECFF;stroke:#9370DB;stroke-width:1px;}#mermaid-svg-CPNkGKbHXmVDvmTP .node .label{text-align:center;}#mermaid-svg-CPNkGKbHXmVDvmTP .node.clickable{cursor:pointer;}#mermaid-svg-CPNkGKbHXmVDvmTP .arrowheadPath{fill:#333333;}#mermaid-svg-CPNkGKbHXmVDvmTP .edgePath .path{stroke:#333333;stroke-width:2.0px;}#mermaid-svg-CPNkGKbHXmVDvmTP .flowchart-link{stroke:#333333;fill:none;}#mermaid-svg-CPNkGKbHXmVDvmTP .edgeLabel{background-color:#e8e8e8;text-align:center;}#mermaid-svg-CPNkGKbHXmVDvmTP .edgeLabel rect{opacity:0.5;background-color:#e8e8e8;fill:#e8e8e8;}#mermaid-svg-CPNkGKbHXmVDvmTP .cluster rect{fill:#ffffde;stroke:#aaaa33;stroke-width:1px;}#mermaid-svg-CPNkGKbHXmVDvmTP .cluster text{fill:#333;}#mermaid-svg-CPNkGKbHXmVDvmTP .cluster span{color:#333;}#mermaid-svg-CPNkGKbHXmVDvmTP div.mermaidTooltip{position:absolute;text-align:center;max-width:200px;padding:2px;font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:12px;background:hsl(80, 100%, 96.2745098039%);border:1px solid #aaaa33;border-radius:2px;pointer-events:none;z-index:100;}#mermaid-svg-CPNkGKbHXmVDvmTP :root{--mermaid-font-family:"trebuchet ms",verdana,arial,sans-serif;} {4,5} {1,3} 9 {3,3} 3 {1,2} 3 6 t1{1,3}{4,5} t2{1,2}{3,3} t3 t0 模式之工作线程
1. 定义
让有限的工作线程Worker Thread来轮流异步处理无限多的任务。可以将其归类为分工模式它的典型实现 就是线程池也体现了经典设计模式中的享元模式。
注意不同任务类型应该使用不同的线程池这样能够避免饥饿并能提升效率
例如如果一个餐馆的工人既要招呼客人任务类型A又要到后厨做菜任务类型B显然效率不咋地分成 服务员线程池A与厨师线程池B更为合理
2.饥饿
固定大小线程池会有饥饿现象例如
两个工人是同一个线程池中的两个线程他们要做的事情是为客人点餐和到后厨做菜这是两个阶段的工作 客人点餐必须先点完餐等菜做好了上菜在此期间处理点餐的工人必须等待后厨做菜没啥说的做就是了 比如工人A 处理了点餐任务接下来它要等着 工人B 把菜做好然后上菜他俩也配合的蛮好但现在同时来了两个客人这个时候工人A 和工人B 都去处理点餐了两个工人都在等待厨师做菜可这时没人做菜了出现了饥饿现象
public class TestDeadLock {static final List String MENU Arrays.asList(宫保鸡丁, 辣子鸡丁, 烤鸡翅);static Random RANDOM new Random();// 随机做一个菜static String cooking() {return MENU.get(RANDOM.nextInt(MENU.size()));}public static void main(String[] args) {// 固定两个核心线程ExecutorService executorService Executors.newFixedThreadPool(2);executorService.execute(() - {log.debug(处理点餐...);// 由另一个线程做菜Future String f executorService.submit(() - {log.debug(做菜);return cooking();});try {// 当前线程阻塞等待另一个线程做菜完毕log.debug(上菜: {}, f.get());} catch (InterruptedException | ExecutionException e) {e.printStackTrace();}});/*executorService.execute(() - {log.debug(处理点餐...);Future String f executorService.submit(() - {log.debug(做菜);return cooking();});try {log.debug(上菜: {}, f.get());} catch (InterruptedException | ExecutionException e) {e.printStackTrace();}});*/}
}输出
17:21:27.883 c.TestDeadLock [pool-1-thread-1] - 处理点餐...
17:21:27.891 c.TestDeadLock [pool-1-thread-2] - 做菜
17:21:27.891 c.TestDeadLock [pool-1-thread-1] - 上菜: 烤鸡翅当注释取消后可能的输出两个线程都在等待其他线程做菜但是线程数只有两个它们一直阻塞等待
17:08:41.339 c.TestDeadLock [pool-1-thread-2] - 处理点餐...
17:08:41.339 c.TestDeadLock [pool-1-thread-1] - 处理点餐... 解决方法可以增加线程池的大小不过不是根本解决方案还是前面提到的不同的任务类型采用不同的线程 池例如
public class TestDeadLock {static final List String MENU Arrays.asList(宫保鸡丁, 辣子鸡丁, 烤鸡翅);static Random RANDOM new Random();static String cooking() {return MENU.get(RANDOM.nextInt(MENU.size()));}public static void main(String[] args) {ExecutorService waiterPool Executors.newFixedThreadPool(1);ExecutorService cookPool Executors.newFixedThreadPool(1);waiterPool.execute(() - {log.debug(处理点餐...);Future String f cookPool.submit(() - {log.debug(做菜);return cooking();});try {log.debug(上菜: {}, f.get());} catch (InterruptedException | ExecutionException e) {e.printStackTrace();}});waiterPool.execute(() - {log.debug(处理点餐...);Future String f cookPool.submit(() - {log.debug(做菜);return cooking();});try {log.debug(上菜: {}, f.get());} catch (InterruptedException | ExecutionException e) {e.printStackTrace();}});}
}输出
17:25:14.626 c.TestDeadLock [pool-1-thread-1] - 处理点餐...
17:25:14.630 c.TestDeadLock [pool-2-thread-1] - 做菜
17:25:14.631 c.TestDeadLock [pool-1-thread-1] - 上菜: 烤鸡翅
17:25:14.632 c.TestDeadLock [pool-1-thread-1] - 处理点餐...
17:25:14.632 c.TestDeadLock [pool-2-thread-1] - 做菜
17:25:14.632 c.TestDeadLock [pool-1-thread-1] - 上菜: 辣子鸡丁3.创建多少线程池合适
过小会导致程序不能充分地利用系统资源、容易导致饥饿过大会导致更多的线程上下文切换占用更多内存
3.1 CPU 密集型运算
通常采用 cpu 核数 1 能够实现最优的 CPU 利用率1 是保证当线程由于页缺失故障操作系统或其它原因 导致暂停时额外的这个线程就能顶上去保证 CPU 时钟周期不被浪费
3.2 I/O 密集型运算
CPU 不总是处于繁忙状态例如当你执行业务计算时这时候会使用 CPU 资源但当你执行 I/O 操作时、远程 RPC 调用时包括进行数据库操作时这时候 CPU处于空闲状态你可以利用多线程提高它的利用率。
经验公式如下
线程数 核数 * 期望 CPU 利用率 * 总时间(CPU计算时间等待时间) / CPU 计算时间
例如 4 核 CPU 计算时间是 50% 其它等待时间是 50%期望 cpu 被 100% 利用套用公式
4 * 100% * 100% / 50% 8
例如 4 核 CPU 计算时间是 10% 其它等待时间是 90%期望 cpu 被 100% 利用套用公式
4 * 100% * 100% / 10% 40