做网站公司不给源代码,网站设计中下拉列表怎么做,凡科网建站系统源码,题库网站开发ConcurrentHashMap实现原理 目录 ConcurrentHashMap实现原理核心源码解读#xff08;1#xff09;数据结构: 采用数组链表/红黑树#xff08;2#xff09;初始化#xff08;3#xff09;并发扩容#xff08;4#xff09;put 操作流程#xff08;5#xff09;计数 siz…ConcurrentHashMap实现原理 目录 ConcurrentHashMap实现原理核心源码解读1数据结构: 采用数组链表/红黑树2初始化3并发扩容4put 操作流程5计数 size6get 操作7remove 操作8遍历操作 基于源码总结一些使用注意事项 主要特点 使用Node数组作为桶数组每个桶可能是一个链表或者红黑树。通过CAS和synchronized实现线程安全每个桶的头节点作为锁减小锁的粒度。扩容时支持多线程协同工作分片迁移数据。volatile变量保证内存可见性get操作无需加锁。使用计数器如baseCount和CounterCell来高效统计元素数量。
核心源码解读
1数据结构: 采用数组链表/红黑树
链表长度 ≥8 且数组长度 ≥64 时链表转为红黑树TREEIFY_THRESHOLD红黑树节点数 ≤6 时退化为链表UNTREEIFY_THRESHOLD)
提前看看几个使用较多的方法
// 计算 Node hash 哈希值将原始哈希码 key.hashCode() 的高低位混合减少哈希冲突
static final int spread(int h) {return (h ^ (h 16)) HASH_BITS; // 高位参与运算HASH_BITS 屏蔽负数标记
}// 哈希值定位数组索引使用 tabAt 方法保证内存可见性
static final K,V NodeK,V tabAt(NodeK,V[] tab, int i) {// 通过 Unsafe.getObjectVolatile 直接读取内存中的最新值无需加锁return (NodeK,V) U.getObjectVolatile(tab, ((long)i ASHIFT) ABASE);
}// 通过 CAS比较并交换操作原子性地更新哈希表中指定位置的节点。
static final K,V boolean casTabAt(NodeK,V[] tab, int i,NodeK,V c, NodeK,V v) {return U.compareAndSwapObject(tab, ((long)i ASHIFT) ABASE, c, v);
}特殊Node hash值hash 小于0表示扩容节点/红黑树
// (1) 扩容节点ForwardingNode若遇到 ForwardingNode哈希值 MOVED -1
static final int MOVED -1; // hash for forwarding nodes
// (2) 红黑树TreeBin若节点为 TreeBin哈希值 TREEBIN -2
static final int TREEBIN -2; // hash for roots of treessizeCtl 状态管理状态控制变量用于管理哈希表的初始化、扩容状态及扩容触发阈值
sizeCtl 0表示扩容阈值元素总数 sizeCtl 时触发扩容sizeCtl -1表示哈希表正在初始化sizeCtl -1表示正在扩容存储扩容标识resizeStamp低16位存储当前参与扩容的线程数
/*** The number of bits used for generation stamp in sizeCtl.
**/
private static int RESIZE_STAMP_BITS 16;/*** The bit shift for recording size stamp in sizeCtl.*/
private static final int RESIZE_STAMP_SHIFT 32 - RESIZE_STAMP_BITS;// 高16位存储标识
static final int resizeStamp(int n) {return Integer.numberOfLeadingZeros(n) | (1 (RESIZE_STAMP_BITS - 1));
}// 低16位存储协助线程数扩容时示例
U.compareAndSwapInt(this, SIZECTL, sc, (rs RESIZE_STAMP_SHIFT) 2)2初始化
延迟初始化首次调用 put 时通过 initTable() 初始化数组利用 sizeCtl 变量volatile控制状态负数表示正在初始化或扩容正数为扩容阈值。
// 默认构造器do nothing
public ConcurrentHashMap() {
}final V putVal(K key, V value, boolean onlyIfAbsent) {if (key null || value null) throw new NullPointerException();int hash spread(key.hashCode());int binCount 0;for (NodeK,V[] tab table;;) {NodeK,V f; int n, i, fh;if (tab null || (n tab.length) 0)// 初始化数组tab initTable();...
}private final NodeK,V[] initTable() {NodeK,V[] tab; int sc;while ((tab table) null || tab.length 0) {// sizeCtl小于0等待其他线程完成初始化if ((sc sizeCtl) 0)Thread.yield(); else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) { // CAS 竞争初始化权 try {if ((tab table) null || tab.length 0) {int n (sc 0) ? sc : DEFAULT_CAPACITY;// 创建 Node 数组NodeK,V[] nt (NodeK,V[])new Node?,?[n];table tab nt;sc n - (n 2); // 更新阈值}} finally {sizeCtl sc;}break;}}return tab;
}3并发扩容
通过 transfer() 方法迁移数据多线程协作处理不同桶区间。扩容期间查询操作通过 ForwardingNode find() 转发到新数组进行查询。
主要扩容场景
1addCount()更新元素总数时发现元素总数超过扩容阈值sizeCtl
2树化前单个哈希桶的链表长度 8但数组长度 64, 优先扩容
3插入元素时遇到 ForwardingNode协助扩容、数据迁移****
…
// putVal 会触发扩容 / 协助扩容
final V putVal(K key, V value, boolean onlyIfAbsent) {if (key null || value null) throw new NullPointerException();int hash spread(key.hashCode());int binCount 0;for (NodeK,V[] tab table;;) {NodeK,V f; int n, i, fh;if (tab null || (n tab.length) 0)...else if ((f tabAt(tab, i (n - 1) hash)) null) {...}else if ((fh f.hash) MOVED)// MOVED 表示当前正在扩容当前线程协助迁移数据并扩容//3插入元素时遇到 ForwardingNode协助扩容、数据迁移tab helpTransfer(tab, f);else {V oldVal null;synchronized (f) {... put逻辑 ...}if (binCount ! 0) {if (binCount TREEIFY_THRESHOLD)// 2树化前单个哈希桶的链表长度 8但数组长度 64, 优先扩容treeifyBin(tab, i);...}} }// 增/减数量// 1addCount()更新元素总数时发现元素总数超过扩容阈值addCount(1L, binCount);return null;
}/**(1) addCount()更新元素总数时发现元素总数超过扩容阈值开始扩容param check : 是否需要检查扩容通常插入操作check0删除操作可能为-1
**/
private final void addCount(long x, int check) {CounterCell[] as; long b, s;if ((as counterCells) ! null ||... 计数逻辑 ...}// 检查并触发扩容if (check 0) {NodeK,V[] tab, nt; int n, sc;while (s (long)(sc sizeCtl) //元素总数超过阈值sizeCtl,在上次扩容时确定 2n*0.75(tab table) ! null (n tab.length) MAXIMUM_CAPACITY) { // 未达最大容量// 生成扩容标识int rs resizeStamp(n) RESIZE_STAMP_SHIFT;// sizeCtl小于0已有其他线程在扩容if (sc 0) {// 检查扩容是否已完成或协助线程数已达上限避免过度竞争if (sc rs MAX_RESIZERS || // 协助线程数是否超限sc rs 1 || // 扩容已完成线程数归零(nt nextTable) null || transferIndex 0)break;// 尝试通过 CAS 增加协助线程数sizeCtl 1if (U.compareAndSwapInt(this, SIZECTL, sc, sc 1))transfer(tab, nt);}// 当前无扩容尝试发起新扩容SIZECTL 低十六位存储扩容线程数初始设置为2 扩容线程数1 1后续扩容完成则是011对应上面 sc rs 1 判断扩容是否已完成else if (U.compareAndSwapInt(this, SIZECTL, sc, rs 2))transfer(tab, null);...}}
}/**2树化前单个哈希桶的链表长度 8但数组长度 64, 优先扩容
**/
private final void treeifyBin(NodeK,V[] tab, int index) {NodeK,V b; int n, sc;if (tab ! null) {// 单个哈希桶的链表长度 8但数组长度 64, 优先扩容if ((n tab.length) MIN_TREEIFY_CAPACITY)tryPresize(n 1);else if ((b tabAt(tab, index)) ! null b.hash 0) {... 树化 ...}}
}// 尝试扩容
private final void tryPresize(int size) {// 计算目标容量确保是2的幂int c (size (MAXIMUM_CAPACITY 1)) ? MAXIMUM_CAPACITY :tableSizeFor(size (size 1) 1);int sc;while ((sc sizeCtl) 0) {NodeK,V[] tab table; int n;// 哈希表未初始化时先进行初始化if (tab null || (n tab.length) 0) {...}else if (c sc || n MAXIMUM_CAPACITY)break; // 目标容量 当前阈值 或 已达最大容量直接退出else if (tab table) { // 确认当前 table 未被其他线程替换int rs resizeStamp(n); // 生成扩容唯一标识与容量相关if (sc 0) { // sizeCtl小于0 已有其他线程在扩容NodeK,V[] nt; // 新表// 校验扩容是否可参与// 1. 扩容标识是否匹配防止不同容量扩容冲突// 2. 协助线程数是否超限sc rs 1// 3. nextTable 是否还在// 4. 是否还有待迁移的桶transferIndex 0if ((sc RESIZE_STAMP_SHIFT) ! rs || // 扩容标识是否匹配防止不同容量扩容冲突sc rs 1 || // 扩容已完成线程数归零sc rs MAX_RESIZERS || // 协助线程数是否超限(nt nextTable) null || // nextTable 是否还在transferIndex 0) // 是否还有待迁移的桶transferIndex 0break;// CAS操作 增加协助线程数成功后参与迁移if (U.compareAndSwapInt(this, SIZECTL, sc, sc 1))transfer(tab, nt);}// 当前无扩容尝试发起新扩容SIZECTL 低十六位存储扩容线程数初始设置为2 扩容线程数1 1后续扩容完成则是011对应上面 sc rs 1 判断扩容是否已完成else if (U.compareAndSwapInt(this, SIZECTL, sc,(rs RESIZE_STAMP_SHIFT) 2))// 启动迁移nextTable 由 transfer 初始化transfer(tab, null); }}
}/** 插入元素时遇到 ForwardingNode协助扩容、数据迁移
**/
final NodeK,V[] helpTransfer(NodeK,V[] tab, NodeK,V f) {NodeK,V[] nextTab; int sc;// 检查到当前表非空且当前节点为 ForwardingNode当前为迁移标记节点且 ForwardingNode 中提取新表 nextTable 不为空开始协助扩容迁移if (tab ! null (f instanceof ForwardingNode) (nextTab ((ForwardingNodeK,V)f).nextTable) ! null) {// 传入tab长度生成唯一标识当前扩容阶段的戳记避免不同扩容操作混淆用于区分不同扩容阶段int rs resizeStamp(tab.length) RESIZE_STAMP_SHIFT;// 新/旧数组未被替换扩容中sizeCtl 为负数while (nextTab nextTable table tab (sc sizeCtl) 0) {// 检查扩容是否已完成或协助线程数已达上限避免过度竞争if (sc rs MAX_RESIZERS || sc rs 1 ||transferIndex 0)break;// 尝试通过 CAS 增加协助线程数sizeCtl 1if (U.compareAndSwapInt(this, SIZECTL, sc, sc 1)) {transfer(tab, nextTab); // 调用数据迁移方法break;}}return nextTab;}// 若不符合上述if判断表示没有扩容或者扩容已完成返回当前 table 即可return table;
}核心扩容方法
// 核心扩容逻辑扩容、迁移数据
private final void transfer(NodeK,V[] tab, NodeK,V[] nextTab) {int n tab.length, stride;// static final int NCPU Runtime.getRuntime().availableProcessors();// 计算每个线程处理的桶区间大小 stride最小为 MIN_TRANSFER_STRIDE16if ((stride (NCPU 1) ? (n 3) / NCPU : n) MIN_TRANSFER_STRIDE)stride MIN_TRANSFER_STRIDE; // subdivide range// 初始化新数组仅由第一个发起扩容的线程执行 if (nextTab null) { // initiatingtry {SuppressWarnings(unchecked)// 扩容为原来两倍NodeK,V[] nt (NodeK,V[])new Node?,?[n 1];nextTab nt;} catch (Throwable ex) { // try to cope with OOMEsizeCtl Integer.MAX_VALUE;return;}nextTable nextTab;// 迁移起始位置从最后一个桶开始transferIndex n;}// 新数组大小int nextn nextTab.length;// 创建 ForwardingNode 扩容ForwardingNodeK,V fwd new ForwardingNodeK,V(nextTab);boolean advance true; // 标记是否继续分配任务boolean finishing false; // 标记是否迁移完成// 开始分配任务并迁移数据for (int i 0, bound 0;;) {NodeK,V f; int fh;// ------------------------ 不同线程竞争桶区间分配任务 ------------------------while (advance) {int nextIndex, nextBound;// 当前区间未处理完 或 迁移已完成, 退出循环if (--i bound || finishing)advance false;// transferIndex 0 无剩余任务后续退出迁移else if ((nextIndex transferIndex) 0) { // nextIndex 从最后一个桶开始上述赋值transferIndex n// 标记i-1i -1;advance false;}// CAS竞争任务区间transferIndex从nextIndex更新为nextBoundelse if (U.compareAndSwapInt(this, TRANSFERINDEX, nextIndex,// 每个线程处理的桶区间大小 stride往前推进nextBound (nextIndex stride ?nextIndex - stride : 0))) {// 竞争区间 [bound, i] 处理这个区间的扩容迁移任务bound nextBound;i nextIndex - 1;advance false;}}// ------------------------ 迁移完成检查 ------------------------if (i 0 || i n || i n nextn) {int sc;// 最终完成, 更新全局变量if (finishing) { nextTable null; // 设置 nextTable 为空table nextTab; // 替换为新数组sizeCtl (n 1) - (n 1); // 新阈值2n * 0.75return;}// CAS减少协助线程数if (U.compareAndSwapInt(this, SIZECTL, sc sizeCtl, sc - 1)) {// 若自己是最后一个线程触发最终检查if ((sc - 2) ! resizeStamp(n) RESIZE_STAMP_SHIFT)return;// 设置 finishing为truefinishing advance true;i n; // recheck before commit}}// 对于空桶标记为已迁移else if ((f tabAt(tab, i)) null)advance casTabAt(tab, i, null, fwd);// 跳过已迁移节点else if ((fh f.hash) MOVED)advance true; // already processedelse {// ------------------------ 迁移 ------------------------synchronized (f) {// 锁定当前节点避免并发修改if (tabAt(tab, i) f) {// 二次校验防止并发修改NodeK,V ln, hn;// fh 0 处理链表节点if (fh 0) { // 通过 runBit 和 lastRun 快速分割链表避免逐个节点重新散列。// 这里为什么是 fh n? 详见下述解释int runBit fh n; //计算散列位0或n,判断低位ln还是扩容后的高位hnNodeK,V lastRun f;// 遍历链表找到最后一段连续相同散列位的节点主要目的是直接复用 lastRun 之后的节点减少新建节点开销for (NodeK,V p f.next; p ! null; p p.next) {int b p.hash n;if (b ! runBit) {runBit b;lastRun p;}}// lastRun 是低位元素if (runBit 0) {ln lastRun;hn null;}// lastRun 是高位元素else {hn lastRun;ln null;}// 迁移 lastRun 之前的节点到 扩容后的高位/原低位for (NodeK,V p f; p ! lastRun; p p.next) {int ph p.hash; K pk p.key; V pv p.val;// 低位迁移if ((ph n) 0)ln new NodeK,V(ph, pk, pv, ln);elsehn new NodeK,V(ph, pk, pv, hn);}setTabAt(nextTab, i, ln); // 原位isetTabAt(nextTab, i n, hn); // 偏移insetTabAt(tab, i, fwd); // 标记旧桶为已迁移advance true;}// 处理树节点逻辑类似需考虑树化或链表化else if (f instanceof TreeBin) {...}}}}}
}runBit 为什么是通过 int runBit fh n 来作为切割依据?
容量是2的幂时计算key的桶位置是用位操作即通过 hash (n-1) 确定的
假设原容量是n16二进制是10000n-11111。此时hash 1111得到的是0到15的位置。
扩容后的容量是32二进制是100000n-111111。新的位置是hash 11111也就是0到31。
原来的位置是 hash 1111而新的位置可能是原来的位置或者原来的位置16例如如果hash的第5位是1的话如10010则这个必然在高位。所以新的位置其实是原位置或者原位置加n。这时候只需要判断hash的某一位是否为1就能确定节点应该放在原位还是高位。具体来说这个位就是n对应的二进制位。
比如n16时二进制是10000所以检查hash的第5位是否为1如果是则新位置是原位置16否则保持原位。
ForwardingNode 内部类表示迁移节点可通过 nextTable访问新数组
static final class ForwardingNodeK,V extends NodeK,V {final NodeK,V[] nextTable; // 指向新表的引用// 需要指定扩容后的新数组 nextTableForwardingNode(NodeK,V[] tab) {super(MOVED, null, null, null);this.nextTable tab;}// 迁移节点查找访问新数组 nextTableNodeK,V find(int h, Object k) {// loop to avoid arbitrarily deep recursion on forwarding nodesouter: for (NodeK,V[] tab nextTable;;) {NodeK,V e; int n;if (k null || tab null || (n tab.length) 0 ||(e tabAt(tab, (n - 1) h)) null)return null;for (;;) {int eh; K ek;if ((eh e.hash) h ((ek e.key) k || (ek ! null k.equals(ek))))return e;if (eh 0) {if (e instanceof ForwardingNode) {tab ((ForwardingNodeK,V)e).nextTable;continue outer;}elsereturn e.find(h, k);}if ((e e.next) null)return null;}}}
}4put 操作流程
主要流程
1通过 spread 通过原始hash code 计算哈希 hash让高位16位也参与计算确保哈希均匀分布
2根据Hash查找对应的桶位置 (n - 1) hash, 若没有冲突直接插入新节点new NodeK,V(hash, key, value, null);
3-1若当前桶位置发生Hash冲突且fh 0表示为链表遍历链表插入/更新
3-2若当前桶位置发生Hash冲突且Node为红黑树调用红黑树插入方法
4判断链表长度是否大于 TREEIFY_THRESHOLD 8执行扩容或者链表树化
5统计元素总数并且检查是否超过阈值需要扩容
final V putVal(K key, V value, boolean onlyIfAbsent) {if (key null || value null) throw new NullPointerException();// (1) 计算哈希, 确保哈希均匀分布int hash spread(key.hashCode());int binCount 0;for (NodeK,V[] tab table;;) {NodeK,V f; int n, i, fh;if (tab null || (n tab.length) 0)...// (2) 定位桶位(n - 1) hash 确定数组索引else if ((f tabAt(tab, i (n - 1) hash)) null) {// CAS 插入若桶为空通过 casTabAt 原子插入新节点if (casTabAt(tab, i, null,new NodeK,V(hash, key, value, null)))break;}else if ((fh f.hash) MOVED)...else {V oldVal null;// (3) 同步锁处理冲突// 若桶非空使用 synchronized 锁定头节点遍历链表/红黑树插入或更新值synchronized (f) {if (tabAt(tab, i) f) {if (fh 0) { // (3-1) 链表处理binCount 1;// 遍历链表插入/更新for (NodeK,V e f;; binCount) {K ek;// 查找到了key直接设置返回if (e.hash hash ((ek e.key) key ||(ek ! null key.equals(ek)))) {oldVal e.val;if (!onlyIfAbsent)e.val value;break;}// 哈希冲突 - 继续遍历链表查找NodeK,V pred e;if ((e e.next) null) {pred.next new NodeK,V(hash, key,value, null);break;}}}else if (f instanceof TreeBin) { // (3-2) 调用红黑树插入方法NodeK,V p;binCount 2;if ((p ((TreeBinK,V)f).putTreeVal(hash, key,value)) ! null) {oldVal p.val;if (!onlyIfAbsent)p.val value;}}}}if (binCount ! 0) {if (binCount TREEIFY_THRESHOLD)// 树化treeifyBin(tab, i);if (oldVal ! null)// put 返回旧值return oldVal;break;}}}// 增/减总数统计addCount(1L, binCount);return null;
}private final void addCount(long x, int check) {CounterCell[] as; long b, s;if ((as counterCells) ! null ||// 尝试无竞争更新 baseCount!U.compareAndSwapLong(this, BASECOUNT, b baseCount, s b x)) {// 若 counterCells 已初始化或 CAS更新 baseCount 失败, 都说明已存在竞争CounterCell a; long v; int m;boolean uncontended true;if (as null || (m as.length - 1) 0 || // CounterCell数组未初始化(a as[ThreadLocalRandom.getProbe() m]) null || // 当前线程的Cell未分配ThreadLocalRandom.getProbe()获取当前线程的哈希码用于在 CounterCell 数组中选择一个槽位减少不同线程竞争同一Cell的概率!(uncontended U.compareAndSwapLong(a, CELLVALUE, v a.value, v x))) {// CAS更新Cell值失败// 初始化Cell、扩容Cell数组等fullAddCount(x, uncontended);return;}if (check 1)return;s sumCount();}if (check 0) {... 检查并扩容逻辑 ...s sumCount();...}
}baseCount 是基础的计数器变量但在高并发下频繁 CAS 更新会导致性能问题可能导致U.compareAndSwapLong(this, BASECOUNT, b baseCount, s b x) 频繁失败因此引入 CounterCell 数组分散线程竞争。
例如在jdk8的时候是有引入一个类Striped64其中LongAdder和DoubleAdder就是对这个类的实现。这两个方法都是为解决高并发场景而生的。是AtomicLong的加强版AtomicLong在高并发场景性能会比LongAdder差。但是LongAdder的空间复杂度会高点
ConcurrentHashMap 高并发下更新元素计数 的核心方法 fullAddCount 借鉴了 LongAdder 的分段计数思想避免所有线程竞争同一变量分散到不同 CounterCell 槽位减少 CAS 冲突。主要流程
1未初始化counterCells数组cas加锁初始化数组并插入新的 CounterCellx
2-1已初始化若线程probe对应槽位上为空cas加锁插入新的 CounterCellx**
2-2已初始化若线程probe对应槽位上不为空cas加锁更新 CounterCellx 计数
3若2-2更新失败表示存在冲突 collidetrue翻倍扩容数组最大容量为 NCPU与CPU核心数对齐
4兜底策略 - 当 CounterCell 初始化或扩容失败时回退到无锁更新 baseCount
// 设计借鉴了 LongAdder 的分段计数思想通过分散竞争来优化性能
private final void fullAddCount(long x, boolean wasUncontended) {int h; // 当前线程的 probe哈希值用于定位 CounterCell 数组的槽位减少竞争// probe未初始化强制初始化if ((h ThreadLocalRandom.getProbe()) 0) {ThreadLocalRandom.localInit(); // force initializationh ThreadLocalRandom.getProbe();wasUncontended true;}boolean collide false; // 标记槽位是否冲突是否需要扩容for (;;) {CounterCell[] as; CounterCell a; int n; long v;// counterCells数组已初始化if ((as counterCells) ! null (n as.length) 0) {// (1) CounterCell数组对应位置槽位为空尝试创建新Cell(初始值为x)并插入数组if ((a as[(n - 1) h]) null) { // 当前槽位为空if (cellsBusy 0) { // cellsBusy 0 无其他线程在修改数组// 创建CounterCellCounterCell r new CounterCell(x); if (cellsBusy 0 U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) { // CAS加锁 CELLSBUSYboolean created false;try { // Recheck under lockCounterCell[] rs; int m, j;if ((rs counterCells) ! null (m rs.length) 0 rs[j (m - 1) h] null) {rs[j] r; // 插入新CounterCell created true;}} finally {cellsBusy 0; // 释放锁 CELLSBUSY}if (created)break;continue; // Slot is now non-empty}}collide false;}else if (!wasUncontended) // CAS already known to failwasUncontended true; // Continue after rehashelse if (U.compareAndSwapLong(a, CELLVALUE, v a.value, v x)) // CAS更新Cell值xbreak;else if (counterCells ! as || n NCPU) // 数组已扩容或达到数量上限collide false; // 无需继续扩容 else if (!collide)collide true; // 上面cas更新 CELLVALUE 失败标记冲突下次循环可能触发扩容else if (cellsBusy 0 U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) { // CAS加锁扩容try {if (counterCells as) {// 数组未被其他线程修改CounterCell[] rs new CounterCell[n 1];// 容量翻倍for (int i 0; i n; i) // 逐个复制旧元素rs[i] as[i];counterCells rs;// 更新新的CounterCell数组}} finally {cellsBusy 0;// 释放锁 CELLSBUSY}collide false;// 重置冲突标志continue; // Retry with expanded table}h ThreadLocalRandom.advanceProbe(h); // 更新线程哈希值 probe减少后续冲突}// counterCells数组未初始化初始化CounterCell并插入else if (cellsBusy 0 counterCells as U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {// cas加锁boolean init false;try { // Initialize tableif (counterCells as) {CounterCell[] rs new CounterCell[2];// 初始容量为2rs[h 1] new CounterCell(x);counterCells rs;init true;}} finally {cellsBusy 0;}if (init)break;}// 兜底策略当 CounterCell 初始化或扩容失败时回退到无锁更新 baseCountelse if (U.compareAndSwapLong(this, BASECOUNT, v baseCount, v x))break; // Fall back on using base}
}5计数 size
将全局计数baseCount和分片计数CounterCell 数组结合。
/**返回 int 类型最大值为 Integer.MAX_VALUE
**/
public int size() {long n sumCount();return ((n 0L) ? 0 :(n (long)Integer.MAX_VALUE) ? Integer.MAX_VALUE :(int)n);
}/**ConcurrentHashMap-only methods支持返回 long 类型推荐优先使用
**/
public long mappingCount() {long n sumCount();return (n 0L) ? 0L : n; // ignore transient negative values
}/*** 遍历 CounterCell 数组累加所有单元格的值到 baseCount得到当前总元素数 s
**/
final long sumCount() {CounterCell[] as counterCells; CounterCell a;long sum baseCount;if (as ! null) {for (int i 0; i as.length; i) {if ((a as[i]) ! null)sum a.value;}}return sum;
}6get 操作
无锁读取依赖 volatile 修饰的 Node.val 和 Node.next 保证可见性扩容兼容性若遇到 ForwardingNode通过其 find() 方法在新数组中查找数据
public V get(Object key) {NodeK,V[] tab; NodeK,V e, p; int n, eh; K ek;int h spread(key.hashCode());if ((tab table) ! null (n tab.length) 0 (e tabAt(tab, (n - 1) h)) ! null) {if ((eh e.hash) h) {// 头节点匹配直接返回if ((ek e.key) key || (ek ! null key.equals(ek)))// 无锁读取return e.val;}// 处理特殊节点如红黑树或 ForwardingNodeelse if (eh 0)// 调用红黑树或扩容节点的查找逻辑return (p e.find(h, key)) ! null ? p.val : null;// 头节点不匹配链表读取while ((e e.next) ! null) {if (e.hash h ((ek e.key) key || (ek ! null key.equals(ek))))// 命中链表中的某个节点return e.val;}}return null;
}7remove 操作
主要流程
1通过 spread 通过原始hash code 计算哈希 hash让高位16位也参与计算确保哈希均匀分布
2根据Hash查找对应的桶位置 (n - 1) hash, 若没有返回 null
3若当前桶位置正在扩容协助迁移数据helpTransfer后重试
4加锁处理链表/树匹配键值删除节点
5更新元素总数-1
public V remove(Object key) {return replaceNode(key, null, null);
}/*** Implementation for the four public remove/replace methods:* Replaces node value with v, conditional upon match of cv if* non-null. If resulting value is null, delete.* cv: 非 null 时只有旧值匹配 cv 时才操作*/
final V replaceNode(Object key, V value, Object cv) {int hash spread(key.hashCode());for (NodeK,V[] tab table;;) {NodeK,V f; int n, i, fh;// 若表未初始化或桶为空直接退出if (tab null || (n tab.length) 0 ||(f tabAt(tab, i (n - 1) hash)) null)break;// 若桶正在扩容MOVED状态协助扩容后重试else if ((fh f.hash) MOVED)tab helpTransfer(tab, f);else {V oldVal null;boolean validated false;// 加锁处理链表或树synchronized (f) {if (tabAt(tab, i) f) {// 链表处理if (fh 0) {validated true;for (NodeK,V e f, pred null;;) {K ek;// 匹配键值if (e.hash hash ((ek e.key) key ||(ek ! null key.equals(ek)))) {V ev e.val;// cv若非空判断cv是否匹配if (cv null || cv ev ||(ev ! null cv.equals(ev))) {oldVal ev;if (value ! null)e.val value;// 替换值else if (pred ! null)pred.next e.next;// 删除中间节点elsesetTabAt(tab, i, e.next);// 删除头节点}break;}pred e;if ((e e.next) null)break;}}// 树处理逻辑else if (f instanceof TreeBin) {validated true;TreeBinK,V t (TreeBinK,V)f;TreeNodeK,V r, p;if ((r t.root) ! null (p r.findTreeNode(hash, key, null)) ! null) {V pv p.val;if (cv null || cv pv ||(pv ! null cv.equals(pv))) {oldVal pv;if (value ! null)p.val value;else if (t.removeTreeNode(p))setTabAt(tab, i, untreeify(t.first));}}}}}// 操作处理完成后if (validated) {if (oldVal ! null) {if (value null)addCount(-1L, -1); // 若是删除更新元素计数-1return oldVal;}break;}}}return null;
}8遍历操作
以遍历键值对 entrySet 举例keySet() 与 values() 同理
public SetMap.EntryK,V entrySet() {EntrySetViewK,V es;return (es entrySet) ! null ? es : (entrySet new EntrySetViewK,V(this));
}EntrySetView 的迭代器EntryIterator是弱一致性的允许遍历过程中其他线程并发修改数据不会抛出 ConcurrentModificationException。
弱一致性体现Traverser 在初始化或恢复状态时保存的是遍历开始时的哈希表引用tab。即使其他线程触发了扩容生成新表更新了table迭代器仍可能继续遍历旧表的部分数据直到遇到 ForwardingNode 才会切换到新表。这意味着遍历结果可能是新旧表的混合视图。
static final class EntrySetViewK,V extends CollectionViewK,V,Map.EntryK,Vimplements SetMap.EntryK,V, java.io.Serializable {private static final long serialVersionUID 2249069246763182397L;// 初始化会保存ConcurrentHashMap的引用EntrySetView(ConcurrentHashMapK,V map) { super(map); }.../*** return an iterator over the entries of the backing map*/public IteratorMap.EntryK,V iterator() {ConcurrentHashMapK,V m map;NodeK,V[] t;int f (t m.table) null ? 0 : t.length;return new EntryIteratorK,V(t, f, 0, f, m);}...
}迭代器实现
// entry迭代器继承自 EntryIterator - BaseIterator - Traverser
static final class EntryIteratorK,V extends BaseIteratorK,Vimplements IteratorMap.EntryK,V {EntryIterator(NodeK,V[] tab, int index, int size, int limit,ConcurrentHashMapK,V map) {super(tab, index, size, limit, map);}/**next 实现**/public final Map.EntryK,V next() {NodeK,V p;if ((p next) null)throw new NoSuchElementException();K k p.key;V v p.val;lastReturned p;advance();return new MapEntryK,V(k, v, map);}
}// 进入父类 Traverser, Traverser 是用于 并发安全遍历哈希表 的核心内部类实现了推进数组遍历的方法 advance
static class TraverserK,V {NodeK,V[] tab; // 当前遍历的哈希表可能在遍历过程中切换到扩容后的新表NodeK,V next; // 当前遍历到的节点用于链式推进。TableStackK,V stack, spare; // stack 保存遍历状态的栈用于处理 ForwardingNode 跳转到新表的情况int index; // 当前遍历的桶索引动态调整可能跨新旧表int baseIndex; // 初始表的起始索引int baseLimit; // 初始表的结束索引final int baseSize; // 初始表的容量Traverser(NodeK,V[] tab, int size, int index, int limit) {this.tab tab;this.baseSize size;this.baseIndex this.index index;this.baseLimit limit;this.next null;}/*** 推进遍历1若当前节点是链表或树节点直接移动到下一个节点。2若遇到 ForwardingNode切换到新表并保存旧表状态到栈。3恢复状态当新表遍历到边界时弹出栈顶状态恢复旧表的遍历。4跨段遍历通过 index baseSize 实现逻辑分片遍历避免遗漏旧表数据。*/final NodeK,V advance() {NodeK,V e;if ((e next) ! null)e e.next; // 移动到链表/树的下一个节点for (;;) {NodeK,V[] t; int i, n; // must use locals in checksif (e ! null)return next e; // 更新next为当前节点并返回// 遍历完成或表为空,返回nulif (baseIndex baseLimit || (t tab) null ||(n t.length) (i index) || i 0)return next null;// hash小于0处理特殊节点ForwardingNode / TreeBinif ((e tabAt(t, i)) ! null e.hash 0) {if (e instanceof ForwardingNode) {tab ((ForwardingNodeK,V)e).nextTable;// tab 切换到新表e null;pushState(t, i, n); // 保存当前表的状态到栈continue;// 重新循环处理新表}else if (e instanceof TreeBin)e ((TreeBinK,V)e).first;// 获取树的第一个节点elsee null;}if (stack ! null)recoverState(n); // 弹出栈顶状态恢复旧表遍历else if ((index i baseSize) n)index baseIndex; // visit upper slots if present}}/*** 遇到 ForwardingNode 时将当前表的遍历状态表引用、索引、容量压入栈以便后续恢复*/private void pushState(NodeK,V[] t, int i, int n) {TableStackK,V s spare; // reuse if possibleif (s ! null)spare s.next;elses new TableStackK,V();s.tab t;// 旧数组s.length n;// 旧数组长度s.index i;// 当前遍历的索引s.next stack;stack s;// 更新栈顶}/*** 当新表遍历完成后从栈中弹出旧表状态恢复索引和表引用继续遍历旧表的剩余部分** param n length of current table*/private void recoverState(int n) {TableStackK,V s; int len;while ((s stack) ! null (index (len s.length)) n) {n len;index s.index;tab s.tab;s.tab null;TableStackK,V next s.next;s.next spare; // save for reusestack next;spare s;}if (s null (index baseSize) n)index baseIndex;}
}基于源码总结一些使用注意事项
1ConcurrentHashMap 明确禁止 null 键和值使用时会直接抛出 NullPointerException
2size()返回 int可能溢出当键值对超过 Integer.MAX_VALUE 时推荐优先使用mappingCount()。注意计数均为近似值高并发情况下不保证绝对精确
3根据场景调整好初始容量和负载因子避免频繁扩容触发 transfer 方法重组数据、扩容期间会产生更多CPU时间片占用以及内存占用
4ConcurrentHashMap 是弱一致性的迭代器**** 迭代器不保证实时反映所有并发修改并发修改也不会抛出异常因此最终迭代的结果可能包含了新旧数据两部分。