Atomic类和CAS
线程安全一直编程中重要问题,而阻塞同步(互斥同步)和非阻塞同步一直是实现线程安全的两个重要手段。非阻塞同步是一种乐观策略,CAS(比较并交换Compare-and-Swap)就是这种乐观并发策略的一个实现。java.util.concurrent 包中提供的 atomic 类底层就是使用处理的 CAS 指令,来实现原子操作。
一、悲观和乐观策略
1、悲观锁
Java中常用的 synchronize 和 ReentrantLock就是典型的悲观并发策略,这种策略下无论共享数据是否真的会出现竞争,它都要进行加锁,然后进行资源独占。带来的问题就是更高的性能消耗:用户态核心态转换、维护锁计数器等。
随着硬件指令集的发展,我们有了另外一个选择:基于冲突检测的乐观并发策略。乐观并发策略中,先进行操作,如果没有其他线程争用共享数据,那操作就成功了;如果共享数据有争用,产生了冲突,那就再采取其他的补偿措施(最常见的补偿措施就是不断地重试,直到成功为止),这种乐观的并发策略的许多实现都不需要把线程挂起。
通常情况下,乐观并发策略的实现需要依赖处理器提供的原子指令,现代主流处理器很多都提供对应的原子指令来完成 CAS 功能。
二、CAS-Compare-and-Swap
1、CAS操作过程
CAS指令需要有3个操作数,分别是内存位置(在Java中可以简单理解为变量的内存地址,用V表示)、旧的预期值(用A表示)和新值(用B表示)。CAS指令执行时,当且仅当V符合旧预期值A时,处理器用新值B更新V的值,否则它就不执行更新,但是无论是否更新了V的值,都会返回V的旧值,上述的处理过程是一个原子操作。
Java1.5之后,利用 sun.misc.Unsafe 类里面的 compareAndSwapInt() 和compareAndSwapLong() 等方法,就可以实现 CAS 操作。java.util.concurrent 包中提供的 atomic 类底层就是就是调用的 Unsafe 来进行 CAS 操作。
2、CAS的问题
(1)ABA问题
CAS从语义上来说并不是完美的,存在这样的一个逻辑漏洞:如果一个变量V初次读取的时候是A值,并且在准备赋值的时候检查到它仍然为A值,那我们就能说它的值没有被其他线程改变过了吗?如果在这段期间它的值曾经被改成了B,后来又被改回为A,那CAS操作就会误认为它从来没有被改变过。这个漏洞称为CAS操作的”ABA”问题。
大部分情况下ABA问题不会影响程序并发的正确性,如果需要解决ABA问题,改用传统的互斥同步可能会比原子类更高效。
(2)自旋时间过长的CPU消耗
CAS操作更新失败的常用补救措施就是不断尝试直到成功,例如 Java中的 sun.misc.Unsafe类的 CAS 操作,
public final int getAndSetInt(Object var1, long var2, int var4) { int var5; do { var5 = this.getIntVolatile(var1, var2); } while(!this.compareAndSwapInt(var1, var2, var5, var4)); return var5; }
而不断尝试的过程是会占用CPU资源的。Unsafe类中自旋补救直接应用的是处理机指令 ,其处理速度十分迅速,基本上不用去考虑自旋的CPU消耗问题。但是在很多场景中,这种循环尝试的手段是需要加以控制的。
三、Atomic类
1、AtomicInteger
AtomicInteger 整体比较简单,实际数据保存在volatile修改的成员变量 value 中。
private static final Unsafe unsafe = Unsafe.getUnsafe(); //value的字节偏移 private static final long valueOffset; static { try { valueOffset = unsafe.objectFieldOffset (AtomicInteger.class.getDeclaredField("value")); } catch (Exception ex) { throw new Error(ex); } } //实际数据 private volatile int value;
其核心方法也是只是对unsafe的简单包装
public final int getAndSet(int newValue) {
return unsafe.getAndSetInt(this, valueOffset, newValue);
}
public final boolean compareAndSet(int expect, int update) {
return unsafe.compareAndSwapInt(this, valueOffset, expect, update);
}
public final boolean weakCompareAndSet(int expect, int update) {
return unsafe.compareAndSwapInt(this, valueOffset, expect, update);
}
public final int getAndIncrement() {
return unsafe.getAndAddInt(this, valueOffset, 1);
}
public final int getAndDecrement() {
return unsafe.getAndAddInt(this, valueOffset, -1);
}
public final int getAndAdd(int delta) {
return unsafe.getAndAddInt(this, valueOffset, delta);
}
public final int incrementAndGet() {
return unsafe.getAndAddInt(this, valueOffset, 1) + 1;
}
public final int decrementAndGet() {
return unsafe.getAndAddInt(this, valueOffset, -1) - 1;
}
public final int addAndGet(int delta) {
return unsafe.getAndAddInt(this, valueOffset, delta) + delta;
}
sun.misc.Safe 类中 CAS 操作,核心就是native CAS调用的调动和 自旋操作。
public final native boolean compareAndSwapObject(Object var1, long var2, Object var4, Object var5);
public final native boolean compareAndSwapInt(Object var1, long var2, int var4, int var5);
public final native boolean compareAndSwapLong(Object var1, long var2, long var4, long var6);
// 失败则自旋,直到成功
public final int getAndAddInt(Object var1, long var2, int var4) {
int var5;
do {
var5 = this.getIntVolatile(var1, var2);
} while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4));
return var5;
}
public final long getAndAddLong(Object var1, long var2, long var4) {
//………………
}
public final int getAndSetInt(Object var1, long var2, int var4) {
//………………
}
public final long getAndSetLong(Object var1, long var2, long var4) {
//………………
}
public final Object getAndSetObject(Object var1, long var2, Object var4)
//………………
}
AtomicBoolean、AtomicLong、AtomicReference<V>
AtomicBoolean中实际成员变量是一个 int 值,通过对 value == 0 的判断来实现 bool 值。
private volatile int value;
AtomicLong 中的操作则与 AtomicInteger 类似, 只不过调用的是 Unsafe中的 long 变量对应的方法,
/**
* Atomically increments by one the current value.
*
* @return the previous value
*/
public final long getAndIncrement() {
return unsafe.getAndAddLong(this, valueOffset, 1L);
}
类似的AtomicReference,可以原子的修改对象引用,而底层则用的是 getAndSetObject 方法。
public final Object getAndSetObject(Object var1, long var2, Object var4) { Object var5; do { var5 = this.getObjectVolatile(var1, var2); } while(!this.compareAndSwapObject(var1, var2, var5, var4)); return var5; }
2、AtomicIntegerArray 和 AtomicLongArray
内部元素可以进行原子操作**的数组**,这两个类本质操作一直,只是数据类型不一样。我们看看 AtomicIntegerArray的 getAndAdd 方法。
/** * Atomically adds the given value to the element at index {@code i}. * * @param i the index * @param delta the value to add * @return the previous value */ public final int getAndAdd(int i, int delta) { return unsafe.getAndAddInt(array, checkedByteOffset(i), delta); }
//计算指定索引数据的偏移量 private long checkedByteOffset(int i) { if (i < 0 || i >= array.length) throw new IndexOutOfBoundsException("index " + i); return byteOffset(i); } private static long byteOffset(int i) { return ((long) i << shift) + base; }
指定索引后,计算出索引数据的偏移量,对指定索引的值进行原子的 CAS 操作。
3、对象字段的原子更改 AtomicXXXUpdater
原子的对象字段更改器,可以更改被 volatile 修饰的 int、long和引用数据,分别对应AtomicIntegerFieldUpdater、AtomicLongFieldUpdater、AtomicReferenceFieldUpdater。
原理是通过反射获取对应字段的偏移量,然后通过对的 CAS操作进行原子操作。
4、LongAdder、DoubleAdder
https://www.cnblogs.com/tong-yuan/p/LongAdder.html
LongAdder是java8中新增的原子类,在多线程环境。内部通过 cell 存储实际值,并使用 CAS 进行原子操作。分段锁的思想。通过 sum()方法是把 base 和所有段 cell 的值相加得到结果。
主要属性
// 这三个属性都在Striped64中 // cells数组,存储各个段的值 transient volatile Cell[] cells; // 最初无竞争时使用的,也算一个特殊的段 transient volatile long base; // 标记当前是否有线程在创建或扩容cells,或者在创建Cell // 通过CAS更新该值,相当于是一个锁 transient volatile int cellsBusy;
static final class Cell { // 存储元素的值,使用volatile修饰保证可见性 volatile long value; Cell(long x) { value = x; } // CAS更新value的值 final boolean cas(long cmp, long val) { return UNSAFE.compareAndSwapLong(this, valueOffset, cmp, val); } //………… }
使用
static void testLongAdder(final int threadCount, final int times) throws InterruptedException { LongAdder longAdder = new LongAdder(); List<Thread> list = new ArrayList<>(); for (int i=0;i<threadCount;i++){ list.add(new Thread(() -> { for (int j = 0; j<times; j++){ longAdder.add(1); } })); } for (Thread thread : list){ thread.start(); } for (Thread thread : list){ thread.join(); } }
(1)LongAdder通过base和cells数组来存储值;
(2)不同的线程会hash到不同的cell上去更新,减少了竞争;分段锁思想,减少锁粒度。
(3)LongAdder的性能非常高,最终会达到一种无竞争的状态;