文章目录
- 原子累加器
- cas 锁
- 原理之伪共享
- Unsafe
- Unsafe CAS 操作
原子累加器
累加器性能比较
private static <T> void demo(Supplier<T> adderSupplier, Consumer<T> action) {T adder = adderSupplier.get();long start = System.nanoTime();List<Thread> ts = new ArrayList<>();// 4 个线程,每人累加 50 万
for (int i = 0; i < 40; i++) {ts.add(new Thread(() -> {for (int j = 0; j < 500000; j++) {action.accept(adder);}}));}ts.forEach(t -> t.start());ts.forEach(t -> {try {t.join();}
catch (InterruptedException e) {e.printStackTrace();}});long end = System.nanoTime();System.out.println(adder + " cost:" + (end - start)/1000_000);}
比较 AtomicLong 与 LongAdder
for (int i = 0; i < 5; i++) {demo(() -> new LongAdder(), adder -> adder.increment());}for (int i = 0; i < 5; i++) {demo(() -> new AtomicLong(), adder -> adder.getAndIncrement());}
输出
1000000 cost:43
1000000 cost:9
1000000 cost:7
1000000 cost:7
1000000 cost:7
1000000 cost:31
1000000 cost:27
1000000 cost:28
1000000 cost:24
1000000 cost:22
性能提升的原因很简单,就是在有竞争时,设置多个累加单元,Therad-0 累加 Cell[0],而 Thread-1 累加Cell[1]… 最后将结果汇总。这样它们在累加时操作的不同的 Cell 变量,因此减少了 CAS 重试失败,从而提高性
能。
- 源码之 LongAdder
LongAdder 是并发大师 @author Doug Lea 的作品,设计的非常精巧
LongAdder 类有几个关键域
// 累加单元数组, 懒惰初始化
transient volatile Cell[] cells;// 基础值, 如果没有竞争, 则用 cas 累加这个域
transient volatile long base;// 在 cells 创建或扩容时, 置为 1, 表示加锁
transient volatile int cellsBusy;
cas 锁
// 不要用于实践!!!
public class LockCas {private AtomicInteger state = new AtomicInteger(0);public void lock() {while (true) {if (state.compareAndSet(0, 1)) {break;}}}public void unlock() {log.debug("unlock...");state.set(0);}
}
测试
LockCas lock = new LockCas();new Thread(() -> {log.debug("begin...");lock.lock();try {log.debug("lock...");sleep(1);}
finally {lock.unlock();}}).start();new Thread(() -> {log.debug("begin...");lock.lock();try {log.debug("lock...");}
finally {lock.unlock();}}).start();
输出
18:27:07.198 c.Test42 [Thread-0] - begin...
18:27:07.202 c.Test42 [Thread-0] - lock...
18:27:07.198 c.Test42 [Thread-1] - begin...
18:27:08.204 c.Test42 [Thread-0] - unlock...
18:27:08.204 c.Test42 [Thread-1] - lock...
18:27:08.204 c.Test42 [Thread-1] - unlock...
原理之伪共享
其中 Cell 即为累加单元
// 防止缓存行伪共享
@sun.misc.Contended
static final class Cell {volatile long value;Cell(long x) { value = x; }// 最重要的方法, 用来 cas 方式进行累加, prev 表示旧值, next 表示新值
final boolean cas(long prev, long next) {return UNSAFE.compareAndSwapLong(this, valueOffset, prev, next);}// 省略不重要代码
}
得从缓存说起
缓存与内存的速度比较
从 cpu 到 | 大约需要的时钟周期 |
---|---|
寄存器 | 1 cycle (4GHz 的 CPU 约为0.25ns) |
L1 | 3~4 cycle |
L2 | 10~20 cycle |
L3 | 40~45 cycle |
内存 | 120~240 cycle |
因为 CPU 与 内存的速度差异很大,需要靠预读数据至缓存来提升效率。
而缓存以缓存行为单位,每个缓存行对应着一块内存,一般是 64 byte(8 个 long)
缓存的加入会造成数据副本的产生,即同一份数据会缓存在不同核心的缓存行中
CPU 要保证数据的一致性,如果某个 CPU 核心更改了数据,其它 CPU 核心对应的整个缓存行必须失效
因为 Cell 是数组形式,在内存中是连续存储的,一个 Cell 为 24 字节(16 字节的对象头和 8 字节的 value),因此缓存行可以存下 2 个的 Cell 对象。这样问题来了:
- Core-0 要修改 Cell[0]
- Core-1 要修改 Cell[1]
论谁修改成功,都会导致对方 Core 的缓存行失效,比如 Core-0 中 Cell[0]=6000, Cell[1]=8000 要累加Cell[0]=6001, Cell[1]=8000 ,这时会让 Core-1 的缓存行失效
@sun.misc.Contended 用来解决这个问题,它的原理是在使用此注解的对象或字段的前后各增加 128 字节大小的padding,从而让 CPU 将对象预读至缓存时占用不同的缓存行,这样,不会造成对方缓存行的失效
累加主要调用下面的方法
public void add(long x) {// as 为累加单元数组
// b 为基础值
// x 为累加值
Cell[] as; long b, v; int m; Cell a;// 进入 if 的两个条件
// 1. as 有值, 表示已经发生过竞争, 进入 if
// 2. cas 给 base 累加时失败了, 表示 base 发生了竞争, 进入 ifif ((as = cells) != null || !casBase(b = base, b + x)) {// uncontended 表示 cell 没有竞争
boolean uncontended = true;if (// as 还没有创建
as == null || (m = as.length - 1) < 0 ||// 当前线程对应的 cell 还没有(a
= as[getProbe() & m]) == null ||// cas 给当前线程的 cell 累加失败 uncontended=false ( a 为当前线程的 cell )!(uncontended = a.cas(v = a.value, v + x))) {// 进入 cell 数组创建、cell 创建的流程longAccumulate(x, null, uncontended);}}}
add 流程图
final void longAccumulate(long x, LongBinaryOperator fn,boolean wasUncontended) {int h;// 当前线程还没有对应的 cell, 需要随机生成一个 h 值用来将当前线程绑定到 cellif ((h = getProbe()) == 0) {// 初始化 probeThreadLocalRandom.current();// h 对应新的 probe 值, 用来对应 cellh = getProbe();wasUncontended = true;}// collide 为 true 表示需要扩容
boolean collide = false;
for (;;) {Cell[] as; Cell a; int n; long v;// 已经有了 cellsif ((as = cells) != null && (n = as.length) > 0) {// 还没有 cellif ((a = as[(n - 1) & h]) == null) {// 为 cellsBusy 加锁, 创建 cell, cell 的初始累加值为 x// 成功则 break, 否则继续 continue 循环}// 有竞争, 改变线程对应的 cell 来重试 caselse if (!wasUncontended)wasUncontended = true;// cas 尝试累加, fn 配合 LongAccumulator 不为 null, 配合 LongAdder 为 nullelse if (a.cas(v = a.value, ((fn == null) ? v + x : fn.applyAsLong(v, x))))break;// 如果 cells 长度已经超过了最大长度, 或者已经扩容, 改变线程对应的 cell 来重试 caselse if (n >= NCPU || cells != as)collide = false;// 确保 collide 为 false 进入此分支, 就不会进入下面的 else if 进行扩容了
else if (!collide)collide = true;// 加锁
else if (cellsBusy == 0 && casCellsBusy()) {// 加锁成功, 扩容
continue;}// 改变线程对应的 cellh = advanceProbe(h);}// 还没有 cells, 尝试给 cellsBusy 加锁
else if (cellsBusy == 0 && cells == as && casCellsBusy()) {// 加锁成功, 初始化 cells, 最开始长度为 2, 并填充一个 cell// 成功则 break;}// 上两种情况失败, 尝试给 base 累加
else if (casBase(v = base, ((fn == null) ? v + x : fn.applyAsLong(v, x))))break;}}
longAccumulate 流程图
每个线程刚进入 longAccumulate 时,会尝试对应一个 cell 对象(找到一个坑位)
获取最终结果通过 sum 方法
public long sum() {Cell[] as = cells; Cell a;long sum = base;if (as != null) {for (int i = 0; i < as.length; ++i) {if ((a = as[i]) != null)sum += a.value;}}return sum;}
Unsafe
概述
Unsafe 对象提供了非常底层的,操作内存、线程的方法,Unsafe 对象不能直接调用,只能通过反射获得
public class UnsafeAccessor {static Unsafe unsafe;static {try {
Field theUnsafe = Unsafe.class.getDeclaredField("theUnsafe");theUnsafe.setAccessible(true);unsafe = (Unsafe) theUnsafe.get(null);}
catch (NoSuchFieldException | IllegalAccessException e) {throw new Error(e);}}static Unsafe getUnsafe() {return unsafe;}}
Unsafe CAS 操作
@Dataclass Student {volatile int id;volatile String name;}
Unsafe unsafe = UnsafeAccessor.getUnsafe();Field id = Student.class.getDeclaredField("id");Field name = Student.class.getDeclaredField("name");// 获得成员变量的偏移量
long idOffset = UnsafeAccessor.unsafe.objectFieldOffset(id);long nameOffset = UnsafeAccessor.unsafe.objectFieldOffset(name);Student student = new Student();// 使用 cas 方法替换成员变量的值
UnsafeAccessor.unsafe.compareAndSwapInt(student, idOffset, 0, 20); // 返回 trueUnsafeAccessor.unsafe.compareAndSwapObject(student, nameOffset, null, "张三"); // 返回 trueSystem.out.println(student);
输出
Student(id=20, name=张三)
使用自定义的 AtomicData 实现之前线程安全的原子整数 Account 实现
class AtomicData {private volatile int data;static final Unsafe unsafe;static final long DATA_OFFSET;static {unsafe = UnsafeAccessor.getUnsafe();try {// data 属性在 DataContainer 对象中的偏移量,用于 Unsafe 直接访问该属性
DATA_OFFSET = unsafe.objectFieldOffset(AtomicData.class.getDeclaredField("data"));}
catch (NoSuchFieldException e) {throw new Error(e);}}public AtomicData(int data) {this.data = data;}public void decrease(int amount) {int oldValue;while(true) {// 获取共享变量旧值,可以在这一行加入断点,修改 data 调试来加深理解
oldValue = data;// cas 尝试修改 data 为 旧值 + amount,如果期间旧值被别的线程改了,返回 falseif (unsafe.compareAndSwapInt(this, DATA_OFFSET, oldValue, oldValue - amount)) {return;}}}public int getData() {return data;}}
Account 实现
Account.demo(new Account() {AtomicData atomicData = new AtomicData(10000);@Overridepublic Integer getBalance() {return atomicData.getData();}@Overridepublic void withdraw(Integer amount) {atomicData.decrease(amount);}});