JUC并发—8.并发安全集合一

大纲

1.JDK 1.7的HashMap的死循环与数据丢失

2.ConcurrentHashMap的并发安全

3.ConcurrentHashMap的设计介绍

4.ConcurrentHashMap的put操作流程

5.ConcurrentHashMap的Node数组初始化

6.ConcurrentHashMap对Hash冲突的处理

7.ConcurrentHashMap的并发扩容机制

8.ConcurrentHashMap的分段锁统计元素数据

9.ConcurrentHashMap的查询操作是否涉及锁

10.ConcurrentHashMap中红黑树的使用

1.JDK 1.7的HashMap的死循环与数据丢失

(1)JDK 1.7的HashMap工作原理

(2)JDK 1.7的HashMap并发下导致的环形链表

(3)环形链表引发的死循环与数据丢失

(4)JDK 1.7和JDK 1.8的HashMap对比

(5)并发安全的集合

(1)JDK 1.7的HashMap工作原理

一.Hash算法

put(key, value) => 对key执行Hash算法 => 根据Hash值用类似取模的算法 => 定位数组的某一个元素 => 如果数组元素为空,则将value存放在该数组元素里。

二.Hash冲突

如果两个key的Hash值,经过取模算法定位到数组的同一个位置,此时就会用链表处理这种Hash冲突。

三.数组扩容

如果数组元素达到了:数组大小 * loadFactor(0.75),此时就会数组扩容。扩容时会按照倍数扩容,首先创建一个两倍大小的新数组。然后遍历原来的数组元素,对每个元素的key值进行Hash运算。接着将Hash运算后的Hash值对新数组大小进行取模,定位到新数组位置。

(2)JDK 1.7的HashMap并发下导致的环形链表

多线程并发操作HashMap时,可能会在扩容过程中形成一个环形链表。比如两个线程同时插入一个元素,而此时恰好两个线程同时触发了数组扩容。那么在数组扩容的过程中,就可能会形成一个环形链表。

下面是JDK1.7中HashMap扩容的核心源码。进行数组扩容时,会使用头插法来进行链表迁移。如果并发执行的两个线程同时使用头插法进行链表迁移,那么就有可能形成一个环形链表。

//JDK1.7的HashMap扩容的核心方法
void transfer(Entry[] newTable) {Entry[] src = table;//旧的数组int newCapacity = newTable.length;for (int j = 0; j < src.length; j++) {Entry<K,V> e = src[j];if (e != null) {src[j] = null;do {Entry<K,V> next = e.next;//线程1执行到这里,假设此时的链表为:newTable[i] = <k1,v1> -> <k2,v2>//那么可知:e = <k1,v1>,next = <k2,v2>//恰好此时CPU发生了上下文切换,于是切换到线程2去执行扩容//线程2扩容时处理完链表的这两个节点后,newTable[i]就变成了:<k2,v2> -> <k1,v1>//然后CPU又切换回线程1来执行,由于此时e = <k1,v1>,那么后续代码对e.next赋值后,e就成为环形链表了://也就是e = <k1,v1> -> <k2,v2> -> <k1,v1>,最后e又赋值给newTable[i]int i = indexFor(e.hash, newCapacity);//在新数组的位置//头插法:刚开始newTable[i]为null,后来newTable[i]变为<k1,v1>;//然后当e=<k2,v2>时,这里e设为<k2,v2> -> <k1,v1>,并又赋值给newTable[i]//接着遍历链表当e=<k3,v3>时,这里e设为<k3,v3> -> <k2,v2> -> <k1,v1> ...e.next = newTable[i];newTable[i] = e;e = next;} while (e != null);}}
}

(3)环形链表引发的死循环与数据丢失

一.环形链表导致死循环

假如执行get(k3)时,k3的Hash取模算法定位到环形链表的位置。于是开始遍历环形链表,但由于环形链表里没有k3的值,所以会导致在环形链表中无法找到k3对应的值进行返回。这样就导致了一直在环形链表中进行死循环,无法退出遍历。最后导致CPU飙升,线上系统被这个get操作卡死。

二.环形链表导致丢失数据

上面例子就导致了从出发是无法找到的,因此这条数据就永久丢失了,甚至会被垃圾回收掉。

(4)JDK 1.7和JDK 1.8的HashMap对比

在JDK1.7中,HashMap采用数组 + 链表的数据结构来存储数据。在多个线程并发扩容时,可能会造成环形链表最终导致死循环和数据丢失。

在JDK1.8中,HashMap采用数组 + 链表 + 红黑树的数据结构来存储数据,并且优化了JDK1.7中的数组扩容方案,解决了死循环和数据丢失的问题。但是在并发场景下调用put()方法时,有可能会存在数据覆盖的问题。

(5)并发安全的集合

比如HashTable使用synchronized来保证线程的安全性,比如Collections.synchronizedMap可以把一个线程不安全的Map,通过synchronized的方式,将其变成安全的。

但是这些方法在线程竞争激烈的情况下,效率都比较低。因为它们都是在方法层面上使用了synchronized实现的锁机制,从而导致不管是put操作还是get操作都需要去竞争同一把锁。

ConcurrentHashMap既能保证并发安全,性能也好于HashTable等集合。

2.ConcurrentHashMap的并发安全

(1)如何理解ConcurrentHashMap的并发安全

(2)ConcurrentHashMap在复合操作中的安全问题

(3)ConcurrentMap可解决复合操作的安全问题

(4)ConcurrentMap支持lambda表达式操作

(1)如何理解ConcurrentHashMap的并发安全

只能保证多线程并发执行时,容器中的数据不会被破坏。无法保证涉及多个线程的复合操作的正确性,复合操作会有并发安全问题。

(2)ConcurrentHashMap在复合操作中的安全问题

假设需要通过一个ConcurrentHashMap来记录每个用户的访问次数。如果指定用户已经有访问次数的记录,则进行递增,否则添加新访问记录。

如下代码在多线程并发调用时,会存在并发安全问题。虽然ConcurrentHashMap对于数据操作本身是安全的,但这里是复合操作,也就是"读—修改—写",而这三个操作作为一个整体却不是原子的。所以当多个线程访问同一个用户时,很可能会覆盖相互操作的结果,从而造成该用户的访问记录次数少于实际访问次数。

public class Demo {private static final ConcurrentHashMap<String, Long> USER_ACCESS_COUNT = new ConcurrentHashMap<>(64);public static void main(String[] args) throws InterruptedException {Long accessCount = USER_ACCESS_COUNT.get("张三");if (accessCount == null) {USER_ACCESS_COUNT.put("张三", 1L);} else {USER_ACCESS_COUNT.put("张三", accessCount + 1);}}
}

(3)ConcurrentMap可解决复合操作的安全问题

虽然ConcurrentHashMap是并发安全的,但对于其复合操作需要特别关注。上述复合操作的安全问题的解决方案是,可以对复合操作加锁,也可以使用ConcurrentMap接口来解决复合操作的安全问题。

ConcurrentMap是一个支持并发访问的Map集合,相当于在原本的Map集合上新增了一些方法来扩展Map的功能。

ConcurrentMap接口定义的如下4个方法,都能满足原子性的,可以用在ConcurrentHashMap的复合操作场景中。

//A java.util.Map providing thread safety and atomicity guarantees.
public interface ConcurrentMap<K, V> extends Map<K, V> {...//向ConcurrentHashMap集合插入数据//如果插入数据的key不存在于集合中,则保存当前数据并且返回null//如果key已经存在,则返回存在的key对应的valueV putIfAbsent(K key, V value);//根据key和value来删除ConcurrentHashMap集合中的元素//该删除操作必须保证key和value完全匹配,删除成功则返回true,否则返回falseboolean remove(Object key, Object value);//根据key和oldValue来替换ConcurrentHashMap中已经存在的值,新的值是newValue//该替换操作必须保证key和oldValue玩去匹配,替换成功则返回true,否则返回falseboolean replace(K key, V oldValue, V newValue);//和replace(key, oldValue, newValue)不同之处在于,少了对oldValue的判断//如果替换成功,则返回替换之前的value,否则返回nullV replace(K key, V value);...
}

因此,可以基于ConcurrentMap提供的接口对上述Demo进行改造。将原来ConcurrentHashMap第一次的put()方法替换为putIfAbsent()方法,将原来ConcurrentHashMap修改用的put()方法替换为replace()方法。由于putIfAbsent()方法和replace()方法都能保证原子性,所以并发安全了。同时增加一个while(true)方法以实现一个类似自旋的操作,确保操作成功。

public class KeyUtil {private static final ConcurrentHashMap<String, Long> USER_ACCESS_COUNT = new ConcurrentHashMap<>(64);public static void main(String[] args) throws InterruptedException {while (true) {Long accessCount = USER_ACCESS_COUNT.get("张三");if (accessCount == null) {if (USER_ACCESS_COUNT.putIfAbsent("张三", 1L) == null) {break;}} else {if (USER_ACCESS_COUNT.replace("张三", accessCount, accessCount + 1)) {break;}}}}
}

(4)ConcurrentMap支持lambda表达式操作

一.computeIfAbsent()方法

该方法通过判断传入的key是否存在来对ConcurrentMap进行数据初始化。如果key存在,则不做任何处理。如果key不存在,则调用mappingFunction计算出value值添加到Map。

//如果张三这个用户不存在,则下面代码会初始化张三这个用户的值为1
USER_ACCESS_COUNT.computeIfAbsent("张三", k -> 1L);

二.computeIfPresent()方法

该方法对已经存在的key对应的value值进行修改。如果key不存在,则返回null。如果key存在,则调用mappingFunction计算出value值修改Map。

//如果要对张三这个已经存在的用户的value值进行修改,可以使用如下代码:
USER_ACCESS_COUNT.computeIfPresent("张三", (k,v) -> v + 1);

三.compute()方法

compute()方法是computeIfAbsent()方法和computeIfPresent()方法的结合体。不管key是否存在,都会调用mappingFunction计算出value值。如果key存在,则对value进行修改。如果key不存在,则进行初始化处理。

//如果张三这个用户不存在,则下面代码会初始化张三这个用户的值为1
USER_ACCESS_COUNT.computeIfAbsent("张三", k -> 1L);//如果要对张三这个已经存在的用户的value值进行修改,可以使用如下代码:
USER_ACCESS_COUNT.computeIfPresent("张三", (k,v) -> v + 1);//如果张三这个用户存在,则对其value加1,否则初始化其值为1
USER_ACCESS_COUNT.compute("张三", (k,v) -> (v == null) ? 1L : v + 1);

3.ConcurrentHashMap的设计介绍

(1)JDK1.8相比于JDK1.7的改进

(2)ConcurrentHashMap的设计思想

(3)ConcurrentHashMap的数据结构定义

(1)JDK1.8相比于JDK1.7的改进

一.取消了segment分段设计,直接使用Node数组来保存数据

采用Node数组元素作为锁的粒度,进一步减少并发冲突的范围和概率。

二.引入红黑树设计

红黑树降低了极端情况下查询某个结点数据的时间复杂度,从O(n)降低到了O(logn),提升了查找性能。

(2)ConcurrentHashMap的设计思想

一.通过对数组元素加锁来降低锁的粒度

二.多线程进行并发扩容

三.高低位迁移方法

四.链表转红黑树及红黑树转链表

五.分段锁来实现数据统计

(3)ConcurrentHashMap的数据结构定义

ConcurrentHashMap采用Node数组来存储数据,数组长度默认为16。Node表示数组中的一个具体的数据结点,并且实现了Map.Entry接口。Node的key和val属性,表示实际存储的key和value。Node的hash属性,表示当前key对应的hash值。Node的next属性,表示如果是链表结构,则指向下一个Node结点。

当链表长度大于等于8 + Node数组长度大于64时,链表会转为红黑树,红黑树的存储使用TreeNode来实现。

public class ConcurrentHashMap<K,V> extends AbstractMap<K,V> implements ConcurrentMap<K,V>, Serializable { ...//The default initial table capacity.//Must be a power of 2 (i.e., at least 1) and at most MAXIMUM_CAPACITY.private static final int DEFAULT_CAPACITY = 16;//The array of bins. Lazily initialized upon first insertion.//Size is always a power of two. Accessed directly by iterators.transient volatile Node<K,V>[] table;//用来存储ConcurrentHashMap数据的Node数组//Key-value entry.  //This class is never exported out as a user-mutable Map.Entry //(i.e., one supporting setValue; see MapEntry below), //but can be used for read-only traversals used in bulk tasks.//Subclasses of Node with a negative hash field are special, //and contain null keys and values (but are never exported).  //Otherwise, keys and vals are never null.static class Node<K,V> implements Map.Entry<K,V> {final int hash;//当前key对应的hash值final K key;//实际存储的keyvolatile V val;//实际存储的valuevolatile Node<K,V> next;//如果是链表结构,则指向下一个Node结点Node(int hash, K key, V val, Node<K,V> next) {this.hash = hash;this.key = key;this.val = val;this.next = next;}...}//Nodes for use in TreeBinsstatic final class TreeNode<K,V> extends Node<K,V> {TreeNode<K,V> parent;//red-black tree linksTreeNode<K,V> left;TreeNode<K,V> right;TreeNode<K,V> prev;//needed to unlink next upon deletionboolean red;TreeNode(int hash, K key, V val, Node<K,V> next, TreeNode<K,V> parent) {super(hash, key, val, next);this.parent = parent;}...}...
}

4.ConcurrentHashMap的put操作流程

(1)ConcurrentHashMap的put操作流程

(2)ConcurrentHashMap和HashMap的put操作

(3)为什么ConcurrentHashMap是并发安全的

(1)ConcurrentHashMap的put操作流程

首先通过key的hashCode的高低16位的位与操作来计算key的hash值,让32位的hashCode都参与运算以降低数组大小小于32时哈希冲突的概率。

然后判断Node数组是否为空或者Node数组的长度是否为0。如果为空或者为0,则调用initTable()方法进行初始化。如果不为空,则通过hash & (n - 1)计算当前key在Node数组中的下标位置。并通过tabAt()方法获取该位置的值f,然后判断该位置的值f是否为空。

如果该位置的值f为空,则把当前的key和value封装成Node对象。然后尝试通过casTabAt()方法使用CAS设置该位置的值f为封装好的Node对象。如果CAS设置成功,则退出for循环,否则继续进行下一次for循环。

如果该位置的值f不为空,则判断Node数组是否正处于扩容中。如果是,那么当前线程就调用helpTransfer()方法进行并发扩容。如果不是,那么说明当前的key在Node数组中出现了Hash冲突。于是通过synchronized关键字,对该位置的值f进行Hash冲突处理。其实JUC还可以继续优化,比如先用CAS尝试修改哈希冲突下的链表或红黑树。如果CAS修改失败,那么再通过使用synchronized对该数组元素加锁来进行处理。

最后,会调用addCount()方法统计Node数组中的元素个数。

public class ConcurrentHashMapDemo {public static void main(String[] args) {ConcurrentHashMap<String, String> map = new ConcurrentHashMap<String, String>();map.put("k1", "v1");}
}public class ConcurrentHashMap<K,V> extends AbstractMap<K,V> implements ConcurrentMap<K,V>, Serializable { ...//The array of bins. Lazily initialized upon first insertion.//Size is always a power of two. Accessed directly by iterators.transient volatile Node<K,V>[] table;//Creates a new, empty map with the default initial table size (16).public ConcurrentHashMap() {}//Creates a new, empty map with an initial table size //accommodating the specified number of elements without the need to dynamically resize.//@param initialCapacity The implementation performs internal sizing to accommodate this many elements.public ConcurrentHashMap(int initialCapacity) {if (initialCapacity < 0) {throw new IllegalArgumentException();}int cap = ((initialCapacity >= (MAXIMUM_CAPACITY >>> 1)) ? MAXIMUM_CAPACITY : tableSizeFor(initialCapacity + (initialCapacity >>> 1) + 1));this.sizeCtl = cap;}//Returns a power of two table size for the given desired capacity.private static final int tableSizeFor(int c) {int n = c - 1;n |= n >>> 1;n |= n >>> 2;n |= n >>> 4;n |= n >>> 8;n |= n >>> 16;return (n < 0) ? 1 : (n >= MAXIMUM_CAPACITY) ? MAXIMUM_CAPACITY : n + 1;}static final int spread(int h) {//通过hashCode的高低16位的异或运算来计算hash值,以降低数组大小比32小的时候的哈希冲突概率return (h ^ (h >>> 16)) & HASH_BITS;}//Maps the specified key to the specified value in this table.//Neither the key nor the value can be null.public V put(K key, V value) {return putVal(key, value, false);}//获取Node数组在位置i的元素,通过Unsafe类让数组中的元素具有可见性//虽然table变量使用了volatile修饰,但这只保证了table引用对于所有线程的可见性,还不能保证table数组中的元素的修改对于所有线程是可见的//因此需要通过Unsafe类的getObjectVolatile()来保证table数组中的元素的可见性static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) {return (Node<K,V>)U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE);}//CAS设置Node数组的元素为某个Node对象static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i, Node<K,V> c, Node<K,V> v) {return U.compareAndSwapObject(tab, ((long)i << ASHIFT) + ABASE, c, v);}final V putVal(K key, V value, boolean onlyIfAbsent) {if (key == null || value == null) {throw new NullPointerException();}//通过key的hashCode的高低16位的位与操作来计算hash值int hash = spread(key.hashCode());int binCount = 0;//这是一个没有结束条件的for循环,用来自旋//其中Node数组的引用赋值给了tab变量for (Node<K,V>[] tab = table;;) {Node<K,V> f; int n, i, fh;if (tab == null || (n = tab.length) == 0) {//调用initTable()方法初始化Node数组tab = initTable();} else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {//如果通过CAS设置Node数组位置i的值为key/value封装的Node对象,则退出for循环if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value, null))) {break;// no lock when adding to empty bin}} else if ((fh = f.hash) == MOVED) {//如果发现Node数组正处于扩容中,那么就进行并发扩容tab = helpTransfer(tab, f);} else {V oldVal = null;//处理Hash冲突synchronized (f) {if (tabAt(tab, i) == f) {if (fh >= 0) {//如果是链表binCount = 1;for (Node<K,V> e = f;; ++binCount) {K ek;if (e.hash == hash && ((ek = e.key) == key || (ek != null && key.equals(ek)))) {oldVal = e.val;if (!onlyIfAbsent) {e.val = value;}break;}Node<K,V> pred = e;if ((e = e.next) == null) {pred.next = new Node<K,V>(hash, key, value, null);break;}}} else if (f instanceof TreeBin) {//如果是红黑树Node<K,V> p;binCount = 2;if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key, value)) != null) {oldVal = p.val;if (!onlyIfAbsent) {p.val = value;}}}}}if (binCount != 0) {//如果链表的元素大于等于8if (binCount >= TREEIFY_THRESHOLD) {//TREEIFY_THRESHOLD = 8treeifyBin(tab, i);//链表转红黑树}if (oldVal != null) {return oldVal;}break;}}}//调用addCount()方法统计Node数组元素的个数addCount(1L, binCount);return null;}...
}

(2)ConcurrentHashMap和HashMap的put操作

都是通过key的hashCode的高低16位的异或运算,来降低Hash冲突概率。

都是通过Hash值与数组大小-1的位与运算(取模),来定位key在数组的位置。

但ConcurrentHashMap使用了自旋 + CAS + synchronized来处理put操作,从而保证了多个线程对数组里某个key进行赋值时的效率 + 并发安全性。

public class HashMap<K,V> extends AbstractMap<K,V> implements Map<K,V>, Cloneable, Serializable {static final int TREEIFY_THRESHOLD = 8;//链表转红黑树的阈值...public V put(K key, V value) {return putVal(hash(key), key, value, false, true);}final V putVal(int hash, K key, V value, boolean onlyIfAbsent, boolean evict) {Node<K,V>[] tab; Node<K,V> p; int n, i;if ((tab = table) == null || (n = tab.length) == 0) {n = (tab = resize()).length;}if ((p = tab[i = (n - 1) & hash]) == null) {//如果通过哈希寻址算法定位到的下标为i的数组元素为空(即tab[i]为空)//那么就可以直接将一个新创建的Node对象放到数组的tab[i]这个位置;tab[i] = newNode(hash, key, value, null);} else {Node<K,V> e; K k;if (p.hash == hash && ((k = p.key) == key || (key != null && key.equals(k)))) {//通过哈希寻址算法定位到的数组位置已有Node元素//那么判断是否为相同的key,如果是相同的key则进行value覆盖e = p;} else if (p instanceof TreeNode) {//通过哈希寻址算法定位到的数组位置已有Node元素,而且不是相同的key//那么通过"p instanceof TreeNode)",判断数组的tab[i]元素是否是一颗红黑树e = ((TreeNode<K,V>)p).putTreeVal(this, tab, hash, key, value);} else {...}...}++modCount;//判断数组大小size,是否已经达到了扩容阈值threshold大小if (++size > threshold) {resize();}afterNodeInsertion(evict);return null;}...
}

(3)为什么ConcurrentHashMap是并发安全的

首先在初始化Node数组时,会通过自旋 + CAS去设置sizeCtl的值来获得锁。然后在put()操作时,也会通过自旋 + CAS去设置数组某个位置的值。当出现Hash冲突时,则使用synchronized关键字来修改数组某个位置的值。

5.ConcurrentHashMap的Node数组初始化

(1)调用put()方法时才初始化Node数组

(2)initTable()方法的初始化逻辑

(3)sizeCtl的状态流转

(1)调用put()方法时才初始化Node数组

Node数组的初始化过程是被动的,当调用ConcurrentHashMap.put()方法时,如果发现Node数组还没有被初始化,才会调用initTable()方法完成初始化。

(2)initTable()方法的初始化逻辑

initTable()方法和一般的初始化方法不同,因为需要考虑多线程并发情形。

首先while循环的退出条件是Node数据即table初始化成功,否则一直循环。这其实就使用到了自旋的机制,因为多个线程调用initTable()必然会竞争。而在竞争的情况下如果不采用独占锁机制,就只能通过自旋来不断重试。

然后通过sizeCtl是否小于0来判断当前是否有其他线程正在进行初始化。如果有,则通过Thread.yield()把自己变成就绪状态,释放CPU资源。如果没有,则通过CAS修改sizeCtl变量的值为-1。

如果CAS修改sizeCtl成功,则表示当前线程获取初始化Node数组的锁成功了;

如果CAS修改sizeCtl失败,则表示当前线程获取初始化Node数组的锁失败了;

对于获取锁失败的线程,会继续进入下一次while循环进行重试,这样设计是为了避免出现多个线程同时初始化Node数组。

对于获取锁成功的线程,首先会判断Node数组是否已经初始化完成。如果Node数组已经初始化完成,则退出while循环。如果Node数组还是空,则创建一个Node数组,然后赋值给table变量。并且计算下次扩容的阈值(0.75倍当前数组容量),然后赋值给sizeCtl。

public class ConcurrentHashMap<K,V> extends AbstractMap<K,V> implements ConcurrentMap<K,V>, Serializable { //sizeCtl = -1,表示当前有线程抢占到了初始化Node数组的资格,正在初始化Node数组//sizeCtl < -1,用sizeCtl值的二进制低16位来记录当前参与扩容的线程数量//sizeCtl = 0,表示Node数组未初始化,并且在ConcurrentHashMap构造方法中没有指定初始容量//sizeCtl > 0,如果Node数组已经初始化,那么sizeCtl表示扩容的阈值(初始容量 * 0.75),如果未初始化,则表示数组的初始容量private transient volatile int sizeCtl;private static final long SIZECTL;static {try {U = sun.misc.Unsafe.getUnsafe();//获取UnSafe对象Class<?> k = ConcurrentHashMap.class;SIZECTL = U.objectFieldOffset(k.getDeclaredField("sizeCtl"));//获取sizeCtl变量的偏移量...} catch (Exception e) {throw new Error(e);}}...//初始化Node数组//Initializes table, using the size recorded in sizeCtl.private final Node<K,V>[] initTable() {Node<K,V>[] tab; int sc;//退出while循环的条件是Node数组即table初始化成功while ((tab = table) == null || tab.length == 0) {//判断当前是否有其他线程正在进行初始化if ((sc = sizeCtl) < 0) {//如果有,则通过Thread.yield()把自己变成就绪状态,释放CPU资源Thread.yield();//lost initialization race; just spin} else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {//如果没有线程正在进行初始化,则通过CAS修改sizeCtl变量的值为-1//如果CAS修改成功,则表示当前线程获得了初始化数组的锁//如果CAS修改失败,则表示当前线程获取初始化数组的锁失败try {//再次判断Node数组是否为空,即Node数组是否已经初始化完成//因为执行Thread.yield()让出CPU资源的线程必然会再次执行到这里if ((tab = table) == null || tab.length == 0) {int n = (sc > 0) ? sc : DEFAULT_CAPACITY;@SuppressWarnings("unchecked")//初始化大小为n的Node数组,然后赋值给tab变量和table变量Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];//赋值给ConcurrentHashMap的全局Node数组tabletable = tab = nt;//计算下次扩容的阈值,阈值的计算是当前数组容量的0.75倍sc = n - (n >>> 2);}} finally {//最后将扩容的阈值赋值给sizeCtlsizeCtl = sc;}break;}}return tab;}...
}

(3)sizeCtl的状态流转

一.sizeCtl = -1

表示当前有线程抢占到了初始化Node数组的资格,正在初始化Node数组。

二.sizeCtl < -1

用sizeCtl值的二进制低16位来记录当前参与扩容的线程数量。

三.sizeCtl = 0

表示Node数组未初始化,且创建ConcurrentHashMap时没有指定初始容量。

四.sizeCtl > 0

如果Node数组已经初始化,那么sizeCtl表示扩容的阈值(初始容量 * 0.75)。如果Node数组未初始化,则表示数组的初始容量。

图片

6.ConcurrentHashMap对Hash冲突的处理

(1)Hash冲突的几个解决方案

(2)ConcurrentHashMap对Hash冲突的处理

(3)链表长度大于8时是扩容还是转化为红黑树

(1)Hash冲突的几个解决方案

一.开放寻址法

如果位置i被占用,那么就探查i+1、i+2、i+3的位置。ThreadLocal采用的就是开放寻址法。

二.链式寻址法

Hash表的每个位置都连接一个链表。当发生Hash冲突时,冲突的元素会被加入到这个位置的链表中。ConcurrentHashMap就是基于链式寻址法解决Hash冲突的。

三.再Hash法

提供多个不同的Hash函数,当发生Hash冲突时,使用第二个、第三个等。

(2)ConcurrentHashMap对Hash冲突的处理

首先使用synchronized对当前位置的Node对象f进行加锁。由于这种锁控制在数组的单个数据元素上,所以长度为16的数组理论上就可以支持16个线程并发写入数据。

然后判断当前位置的Node对象f是链表还是红黑树。如果是链表,那么就把当前的key/value封装成Node对象插入到链表的尾部。如果是红黑树,那么就调用TreeBin的putTreeVal()方法往红黑树插入结点。

最后判断链表的长度是否大于等于8,如果链表的长度大于等于8,再调用treeifyBin()方法决定是扩容数组还是将链表转化为红黑树。

public class ConcurrentHashMap<K,V> extends AbstractMap<K,V> implements ConcurrentMap<K,V>, Serializable { ...//The array of bins. Lazily initialized upon first insertion.//Size is always a power of two. Accessed directly by iterators.transient volatile Node<K,V>[] table;//Maps the specified key to the specified value in this table.//Neither the key nor the value can be null.public V put(K key, V value) {return putVal(key, value, false);}//获取Node数组在位置i的元素//虽然table变量使用了volatile修饰,但这只保证了table引用对于所有线程的可见性,还不能保证table数组中的元素的修改对于所有线程是可见的//因此需要通过Unsafe类的getObjectVolatile()来保证table数组中的元素的可见性static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) {return (Node<K,V>)U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE);}//CAS设置Node数组的元素为某个Node对象static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i, Node<K,V> c, Node<K,V> v) {return U.compareAndSwapObject(tab, ((long)i << ASHIFT) + ABASE, c, v);}final V putVal(K key, V value, boolean onlyIfAbsent) {if (key == null || value == null) {throw new NullPointerException();}//通过key的hashCode的高低16位的位与操作来计算hash值int hash = spread(key.hashCode());int binCount = 0;//这是一个没有结束条件的for循环,用来自旋//其中Node数组的引用赋值给了tab变量for (Node<K,V>[] tab = table;;) {Node<K,V> f; int n, i, fh;if (tab == null || (n = tab.length) == 0) {//调用initTable()方法初始化Node数组tab = initTable();} else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {//如果通过CAS设置Node数组位置i的值为key/value封装的Node对象,则退出for循环if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value, null))) {break;// no lock when adding to empty bin}} else if ((fh = f.hash) == MOVED) {//发现Node数组正处于扩容中,那么就进行并发扩容tab = helpTransfer(tab, f);} else {V oldVal = null;//处理Hash冲突synchronized (f) {//使用synchronized对当前数组位置的Node对象f进行加锁if (tabAt(tab, i) == f) {if (fh >= 0) {//如果是链表binCount = 1;//binCount用来记录链表的长度//从链表的头结点开始遍历每个结点for (Node<K,V> e = f;; ++binCount) {K ek;//如果存在相同的key,则修改该key的valueif (e.hash == hash && ((ek = e.key) == key || (ek != null && key.equals(ek)))) {oldVal = e.val;if (!onlyIfAbsent) {e.val = value;}break;}Node<K,V> pred = e;//找到链表的最后一个结点if ((e = e.next) == null) {//把当前的key/value封装成Node对象插入到链表的尾部pred.next = new Node<K,V>(hash, key, value, null);break;}}} else if (f instanceof TreeBin) {//如果是红黑树Node<K,V> p;binCount = 2;//调用TreeBin的putTreeVal()方法往红黑树插入结点if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key, value)) != null) {oldVal = p.val;if (!onlyIfAbsent) {p.val = value;}}}}}//最后判断链表的长度是否大于等于8if (binCount != 0) {//如果链表的长度大于等于8,再调用treeifyBin()方法决定是扩容数组还是转化为红黑树if (binCount >= TREEIFY_THRESHOLD) {//TREEIFY_THRESHOLD = 8treeifyBin(tab, i);//是扩容数组还是转化为红黑树}if (oldVal != null) {return oldVal;}break;}}}//调用addCount()方法统计Node数组元素的个数addCount(1L, binCount);return null;}...
}

(3)链表长度大于8时是扩容还是转化为红黑树

当链表长度 >= 8时ConcurrentHashMap会对链表采用两种方式进行优化。

方式一:对数组进行扩容

当数组长度 <= 64,且链表长度 >= 8时,优先选择对数组进行扩容。

方式二:把链表转化为红黑树

当数组长度 > 64,且链表长度 >= 8时,会将链表转化为红黑树。

treeifyBin()方法的作用是根据相关阈值来决定是扩容还是把链表转为红黑树。

public class ConcurrentHashMap<K,V> extends AbstractMap<K,V> implements ConcurrentMap<K,V>, Serializable { static final int MIN_TREEIFY_CAPACITY = 64;...//Replaces all linked nodes in bin at given index unless table is too small, in which case resizes instead.private final void treeifyBin(Node<K,V>[] tab, int index) {Node<K,V> b; int n, sc;if (tab != null) {//如果当前数组的长度小于64,则调用tryPresize()方法进行数组扩容if ((n = tab.length) < MIN_TREEIFY_CAPACITY) {tryPresize(n << 1);//数组扩容} else if ((b = tabAt(tab, index)) != null && b.hash >= 0) {synchronized (b) {if (tabAt(tab, index) == b) {TreeNode<K,V> hd = null, tl = null;for (Node<K,V> e = b; e != null; e = e.next) {//构建一个TreeNode并插入红黑树中TreeNode<K,V> p = new TreeNode<K,V>(e.hash, e.key, e.val, null, null);if ((p.prev = tl) == null) {hd = p;} else {tl.next = p;}tl = p;}setTabAt(tab, index, new TreeBin<K,V>(hd));}}}}}...
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/21320.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Cython学习笔记1:利用Cython加速Python运行速度

Cython学习笔记1&#xff1a;利用Cython加速Python运行速度 CythonCython 的核心特点&#xff1a;利用Cython加速Python运行速度1. Cython加速Python运行速度原理2. 不使用Cython3. 使用Cython加速&#xff08;1&#xff09;使用pip安装 cython 和 setuptools 库&#xff08;2&…

DApp 开发入门指南

DApp 开发入门指南 &#x1f528; 1. DApp 基础概念 1.1 什么是 DApp&#xff1f; 去中心化应用&#xff08;DApp&#xff09;是基于区块链的应用程序&#xff0c;特点是&#xff1a; 后端运行在区块链网络前端可以是任何框架使用智能合约处理业务逻辑数据存储在区块链上 1…

基于Spring Security 6的OAuth2 系列之二十 - 高级特性--令牌交换(Token Exchange)

之所以想写这一系列&#xff0c;是因为之前工作过程中使用Spring Security OAuth2搭建了网关和授权服务器&#xff0c;但当时基于spring-boot 2.3.x&#xff0c;其默认的Spring Security是5.3.x。之后新项目升级到了spring-boot 3.3.0&#xff0c;结果一看Spring Security也升级…

瑞芯微RV1126部署YOLOv8全流程:环境搭建、pt-onnx-rknn模型转换、C++推理代码、错误解决、优化、交叉编译第三方库

目录 1 环境搭建 2 交叉编译opencv 3 模型训练 4 模型转换 4.1 pt模型转onnx模型 4.2 onnx模型转rknn模型 4.2.1 安装rknn-toolkit 4.2.2 onn转成rknn模型 5 升级npu驱动 6 C++推理源码demo 6.1 原版demo 6.2 增加opencv读取图片的代码 7 交叉编译x264 ffmepg和op…

【开源】编译器,在线操作

目录 1. 思绪思维导图&#xff1a;simple mind map2. Markdown&#xff1a;md-editor-v33. 文档&#xff1a;wangEditor4. 电子表格&#xff1a;Luckysheet5. 幻灯片&#xff1a;PPTist6. 白板&#xff1a;excalidraw7. 流程图&#xff1a;drawio 1. 思绪思维导图&#xff1a;…

跳表(Skip List)详解

一、什么是跳表&#xff1f; 跳表是一种基于有序链表的高效数据结构&#xff0c;通过建立多级索引实现快速查询。它在平均情况下支持O(log n)时间复杂度的搜索、插入和删除操作&#xff0c;性能接近平衡树&#xff0c;但实现更为简单。 二、核心原理 1. 层级结构 底层为完整…

【Quest开发】全身跟踪

软件&#xff1a;Unity 2022.3.51f1c1、vscode、Meta XR All in One SDK V72 硬件&#xff1a;Meta Quest3 最终效果&#xff1a;能像meta的操作室沉浸场景一样根据头盔移动来推断用户姿势&#xff0c;实现走路、蹲下、手势匹配等功能 需要借助UnityMovement这个包 GitHub …

25年2月通信基础知识补充:多普勒频移与多普勒扩展、3GPP TDL信道模型

看文献过程中不断发现有太多不懂的基础知识&#xff0c;故长期更新这类blog不断补充在这过程中学到的知识。由于这些内容与我的研究方向并不一定强相关&#xff0c;故记录不会很深入请见谅。 【通信基础知识补充7】25年2月通信基础知识补充1 一、多普勒频移与多普勒扩展傻傻分不…

栈,优先级队列,map,set

文章目录 栈题目解析代码 优先级队列题解代码 map题解代码 set题解代码 栈 题目解析 1.先把元素push进栈中&#xff0c;如果栈非空并且栈中的元素按顺序和k相等就出栈&#xff0c;直到栈为空或者k ! sk.top() 代码 #include<iostream> #include<stack> #include&l…

Linux探秘坊-------4.进度条小程序

1.缓冲区 #include <stdio.h> int main() {printf("hello bite!");sleep(2);return 0; }执行此代码后&#xff0c;会 先停顿两秒&#xff0c;再打印出hello bite&#xff0c;但是明明打印在sleep前面&#xff0c;为什么会后打印呢&#xff1f; 因为&#xff…

Redis的预备知识

1.Redis的基本全局命令 Redis有多种数据结构&#xff0c;但它们都是键值对的&#xff0c;对于与键来说有一些通用的命令 1.1 KEYS 返回所有满足样式&#xff08;pattern&#xff09;的key 假定当前具有以下value值&#xff1a;hllo&#xff0c;hello&#xff0c;hallo&…

量子计算的威胁,以及企业可以采取的措施

当谷歌、IBM、Honeywell和微软等科技巨头纷纷投身量子计算领域时&#xff0c;一场技术军备竞赛已然拉开帷幕。 量子计算虽能为全球数字经济带来巨大价值&#xff0c;但也有可能对相互关联的系统、设备和数据造成损害。这一潜在影响在全球网络安全领域引起了强烈关注。也正因如…

nlp|微调大语言模型初探索(3),qlora微调deepseek记录

前言 上篇文章记录了使用lora微调llama-1b,微调成功,但是微调llama-8b显存爆炸,这次尝试使用qlora来尝试微调参数体量更大的大语言模型,看看64G显存的极限在哪里。 1.Why QLora? QLoRA 在模型加载阶段通过 4-bit 量化大幅减少了模型权重的显存占用。QLoRA 通过 反量化到 …

【设计模式】【创建型模式】工厂方法模式(Factory Methods)

&#x1f44b;hi&#xff0c;我不是一名外包公司的员工&#xff0c;也不会偷吃茶水间的零食&#xff0c;我的梦想是能写高端CRUD &#x1f525; 2025本人正在沉淀中… 博客更新速度 &#x1f44d; 欢迎点赞、收藏、关注&#xff0c;跟上我的更新节奏 &#x1f3b5; 当你的天空突…

DeepSeek模型快速部署教程-搭建自己的DeepSeek

前言&#xff1a;在人工智能技术飞速发展的今天&#xff0c;深度学习模型已成为推动各行各业智能化转型的核心驱动力。DeepSeek 作为一款领先的 AI 模型&#xff0c;凭借其高效的性能和灵活的部署方式&#xff0c;受到了广泛关注。无论是自然语言处理、图像识别&#xff0c;还是…

Deepseek 与 ChatGPT:AI 浪潮中的双子星较量

引言 在人工智能飞速发展的当下&#xff0c;AI 语言模型成为了人们关注的焦点。Deepseek 与 ChatGPT 作为其中的佼佼者&#xff0c;各自展现出独特的魅力&#xff0c;引领着 AI 技术的发展潮流。今天&#xff0c;就让我们深入探讨这两款模型&#xff0c;看看它们在 AI 领域中是…

QT事件循环

文章目录 主事件循环事件循环事件调度器事件处理投递事件发送事件 事件循环的嵌套线程的事件循环deleteLater与事件循环QEventLoop类QEventLoop应用等待一段时间同步操作模拟模态对话框 参考 本文主要对QT中的事件循环做简单介绍和使用 Qt作为一个跨平台的UI框架&#xff0c;其…

解决DeepSeek服务器繁忙问题的实用指南

目录 简述 1. 关于服务器繁忙 1.1 服务器负载与资源限制 1.2 会话管理与连接机制 1.3 客户端配置与网络问题 2. 关于DeepSeek服务的备用选项 2.1 纳米AI搜索 2.2 硅基流动 2.3 秘塔AI搜索 2.4 字节跳动火山引擎 2.5 百度云千帆 2.6 英伟达NIM 2.7 Groq 2.8 Firew…

进程等待和进程程序替换

进程控制 进程等待进程程序替换 进程等待 如果子进程没有退出 而父进程在进行执行waitpid进行等待&#xff0c;阻塞等待&#xff0c; 进程阻塞了 在等待某种条件发生&#xff08;子进程退出&#xff09; 进程程序替换 1 #include <stdio.h>2 #include <unistd.h>3…

UEFI Spec 学习笔记---6 - Block Translation Table (BTT) Layout

6.1 Block Translation Table (BTT) Background 定义个一个连续地址的非易失性的namespace&#xff0c;就是将一整个namespace 拆分成一个个block&#xff0c;其中的地址保存至BBT&#xff08;块转换表&#xff09;&#xff0c;这样可以防止扇区撕裂&#xff08;由于电源问题导…