1 并发集合
1.1 线程安全集合分类
a.遗留的线程安全集合
- 遗留的线程安全集合如 Hashtable , Vector
b.使用 Collections 装饰的线程安全集合
- 使用 Collections 装饰的线程安全集合,如:
-
- Collections.synchronizedCollection
- Collections.synchronizedList
- Collections.synchronizedMap
- Collections.synchronizedSet
- Collections.synchronizedNavigableMap
- Collections.synchronizedNavigableSet
- Collections.synchronizedSortedMap
- Collections.synchronizedSortedSet
- java.util.concurrent.*
c.JUC下的安全集合: Blocking、CopyOnWrite、Concurrent
重点介绍 java.util.concurrent.* 下的线程安全集合类,可以发现它们有规律,里面包含三类关键词: Blocking、CopyOnWrite、Concurrent
- Blocking类型 大部分实现基于锁,并提供用来阻塞的方法。该类中大部分方法都是在不满足条件时进行阻塞等待。
- CopyOnWrite类型 之类容器修改开销相对较重。采用了一种在修改时拷贝的方式来避免多线程访问读写时的线程安全问题。适用于读多写少的情况 (写的开销相对比较大)。
- Concurrent 类型的容器 (性能较高)
-
- 优点:内部很多操作使用 cas 优化,一般可以提供较高吞吐量
- 缺点:弱一致性
-
-
- 遍历时弱一致性,例如,当利用迭代器遍历时,如果容器发生修改,迭代器仍然可以继续进行遍历,这时内容是旧的
- 求大小弱一致性,size 操作未必是 100% 准确
- 读取弱一致性
-
遍历时如果发生了修改,对于非安全容器来讲,使用 fail-fast 机制也就是让遍历立刻失败,抛出ConcurrentModifificationException,不再继续遍历
1.2 集合对比
三种集合:
- HashMap 是线程不安全的,性能好
- Hashtable 线程安全基于 synchronized,方法被synchronized修饰,虽然能保证线程安全,但性能太差,已经被淘汰
- ConcurrentHashMap 保证了线程安全,综合性能较好,不止线程安全,而且效率高,性能好
注意:修饰的安全集合类,如:SynchronizedMap,本质和Hashtable类一样,都是在每个方法上加了一个sychronized关键字修饰,性能上仍然没什么提升,因此也不推荐。
集合对比:
- Hashtable 继承 Dictionary 类,HashMap、ConcurrentHashMap 继承 AbstractMap,均实现 Map 接口
- Hashtable 底层是数组 + 链表,JDK8 以后 HashMap 和 ConcurrentHashMap 底层是数组 + 链表 + 红黑树
- HashMap 线程非安全,Hashtable 线程安全,Hashtable 的方法都加了 synchronized 关来确保线程同步
- ConcurrentHashMap、Hashtable 不允许 null 值,HashMap 允许 null 值
- ConcurrentHashMap、HashMap 的初始容量为 16,Hashtable 初始容量为11,填充因子默认都是 0.75,两种 Map 扩容是当前容量翻倍:capacity * 2,Hashtable 扩容时是容量翻倍 + 1:capacity*2 + 1
工作步骤:
-
初始化,使用 cas 来保证并发安全,懒惰初始化 table
-
树化,当 table.length < 64 时,先尝试扩容,超过 64 时,并且 bin.length > 8 时,会将链表树化,树化过程会用 synchronized 锁住链表头
说明:锁住某个槽位的对象头,是一种很好的细粒度的加锁方式,类似 MySQL 中的行锁
-
put,如果该 bin 尚未创建,只需要使用 cas 创建 bin;如果已经有了,锁住链表头进行后续 put 操作,元素添加至 bin 的尾部
-
get,无锁操作仅需要保证可见性,扩容过程中 get 操作拿到的是 ForwardingNode 会让 get 操作在新 table 进行搜索
-
扩容,扩容时以 bin 为单位进行,需要对 bin 进行 synchronized,但这时其它竞争线程也不是无事可做,它们会帮助把其它 bin 进行扩容
-
size,元素个数保存在 baseCount 中,并发时的个数变动保存在 CounterCell[] 当中,最后统计数量时累加
2 ConcurrentHashMap
练习demo:单词计数
生成测试数据
static final String ALPHA = "abcedfghijklmnopqrstuvwxyz";static final String PATH = "";public static void main(String[] args) {ArrayList<String> list = new ArrayList<>();int count = 200;int length = ALPHA.length();//将26个字母,每个字母200个,保存到list集合中for (int i = 0; i < length; i++) {char c = ALPHA.charAt(i);for (int j = 0; j < count; j++) {list.add(String.valueOf(c));}}writeToFile(list, count);//调用demo方法......//将字母写入文件中public static void writeToFile(ArrayList<String> list, int count) {//把集合打乱Collections.shuffle(list);//把集合分成26份,每份200个字母,写入文件中for (int i = 0; i < 26; i++) {try (PrintWriter out = new PrintWriter(new OutputStreamWriter(new FileOutputStream(PATH + "tmp/" + (i + 1) + ".txt")))) {String collect = list.subList(i * count, (i + 1) * count).stream().collect(Collectors.joining("\n"));out.print(collect);} catch (IOException e) {}}}
模版代码,模版代码中封装了多线程读取文件的代码
//该方法有两个参数//1. 供给型参数,用来得到一个多线程之间的共同统计单词数量的map集合//2. 两个传参的消费型参数,传入两个参数,一个map一个list。该方法是:遍历list中出现的单词数量,在对应的map中把数量+1private static <V> void demo(Supplier<Map<String, V>> supplier,BiConsumer<Map<String, V>, List<String>> consumer) {Map<String, V> counterMap = supplier.get();ArrayList<Thread> ts = new ArrayList<>();//开启26个线程,每个线程统计一个文件中的单词数量,再把结果加到一个总的map中for (int i = 1; i <= 26; i++) {int idx = i;//开启线程Thread thread = new Thread(() -> {//获取该文件中的字符串集合List<String> words = readFromFile(idx);//调用BiConsumer,统计当前words中的单词数量consumer.accept(counterMap, words);});ts.add(thread);}ts.forEach(Thread::start);ts.forEach(t -> {try {t.join();} catch (InterruptedException e) {e.printStackTrace();}});System.out.println(counterMap);}private static <V> void demo2(Supplier<Map<String, V>> supplier,BiConsumer<Map<String, V>, List<String>> consumer) {Map<String, V> counterMap = supplier.get();ArrayList<Thread> ts = new ArrayList<>();CountDownLatch countDownLatch = new CountDownLatch(26);//开启26个线程,每个线程统计一个文件中的单词数量,再把结果加到一个总的map中for (int i = 1; i <= 26; i++) {int idx = i;//开启线程Thread thread = new Thread(() -> {try {//获取该文件中的字符串集合List<String> words = readFromFile(idx);//调用BiConsumer,统计当前words中的单词数量consumer.accept(counterMap, words);} finally {countDownLatch.countDown();}});thread.start();ts.add(thread);}try {countDownLatch.await();} catch (InterruptedException e) {e.printStackTrace();}System.out.println(counterMap);}//返回文件中所有的字符串public static List<String> readFromFile(int i) {ArrayList<String> list = new ArrayList<>();try {BufferedReader in = new BufferedReader(new InputStreamReader(new FileInputStream(PATH + "tmp/" + i + ".txt")));while (true) {String word = in.readLine();if (word == null) {break;}list.add(word);}} catch (Exception e) {e.printStackTrace();}return list;}
说明一下:这里demo和demo2方法是一样的,只是demo2我想使用一下CountDownLatch
正确输出结果应该为:
正确结果输出应该是每个单词出现 200 次
{a=200, b=200, c=200, d=200, e=200, f=200, g=200, h=200, i=200, j=200, k=200, l=200, m=200, n=200, o=200, p=200, q=200, r=200, s=200, t=200, u=200, v=200, w=200, x=200, y=200, z=200}
错误调用demo方法
demo(// 创建 map 集合// 创建 ConcurrentHashMap 对不对?() -> new HashMap<String, Integer>(),// 进行计数(map, words) -> {for (String word : words) {//这里的getter和setter无法保证原子性Integer counter = map.get(word);int newValue = counter == null ? 1 : counter + 1;map.put(word, newValue);}}
);
运行结果:
{a=198, b=186, c=192, d=200, e=174, f=195, g=193, h=199, i=200, j=198, k=198, l=197, m=197, n=200, o=198, p=197, q=199, r=194, s=199, t=196, u=199, v=197, w=199, x=184, y=195, z=194}
说明:可以看到运行结果是有问题的。原因是put新的value的时候,那三段代码无法保证原子性。
解决方案1
//调用demo方法demo(() -> new ConcurrentHashMap<String, Integer>(), (map, words) -> {for (String word : words) {synchronized (map) {Integer counter = map.get(word);map.put(word, counter == null ? 1 : counter + 1);}}});
说明:这里可以通过加synchronized锁,保证对于map中value修改的时候的原子性。但问题是锁了整个map性能下降。
解决方案2
demo2(() -> new ConcurrentHashMap<String, LongAdder>(),(map, words) -> {for (String word : words) {// 注意不能使用 putIfAbsent,此方法返回的是上一次的 value,首次调用返回 null// 解释一下这段代码:首先computeIfAbsent是判断当前word是否为null,// 如果为null就创建一个新的LongAdder(初始值为 0)// 如果不为null,就根据当前key(就是word)从map中获取对应的value(LongAdder)LongAdder value = map.computeIfAbsent(word, (key) -> {return new LongAdder();});//即使这里设置key-value和value自增不是原子性操作,LongAdder的CAS会获取到最新的值再进行加一的,这一点可以放心// 不需要加锁,性能更好value.increment();//再把这个累加器+1}});
使用computeIfAbsent和LongAdder中的CAS原理,避免锁整个map集合。
解决方案3
demo(() -> new ConcurrentHashMap<String, Integer>(), (map, words) -> {for (String word : words) {//这里根据word从map中找value,如果value为null,初始值就为0//反之,value就是map中根据key获取的结果//最后把a和b(1)相加map.merge(word, 1, (a, b) -> {return a + b;});}});