Atomic类和CAS

Atomic类和CAS

线程安全一直编程中重要问题,而阻塞同步(互斥同步)和非阻塞同步一直是实现线程安全的两个重要手段。非阻塞同步是一种乐观策略,CAS(比较并交换Compare-and-Swap)就是这种乐观并发策略的一个实现。java.util.concurrent 包中提供的 atomic 类底层就是使用处理的 CAS 指令,来实现原子操作。


一、悲观和乐观策略

1、悲观锁

Java中常用的 synchronize 和 ReentrantLock就是典型的悲观并发策略,这种策略下无论共享数据是否真的会出现竞争,它都要进行加锁,然后进行资源独占。带来的问题就是更高的性能消耗:用户态核心态转换、维护锁计数器等。

2、乐观锁

随着硬件指令集的发展,我们有了另外一个选择:基于冲突检测的乐观并发策略。乐观并发策略中,先进行操作,如果没有其他线程争用共享数据,那操作就成功了;如果共享数据有争用,产生了冲突,那就再采取其他的补偿措施(最常见的补偿措施就是不断地重试,直到成功为止),这种乐观的并发策略的许多实现都不需要把线程挂起。

通常情况下,乐观并发策略的实现需要依赖处理器提供的原子指令,现代主流处理器很多都提供对应的原子指令来完成 CAS 功能。

二、CAS-Compare-and-Swap

1、CAS操作过程

CAS指令需要有3个操作数,分别是内存位置(在Java中可以简单理解为变量的内存地址,用V表示)、旧的预期值(用A表示)和新值(用B表示)。CAS指令执行时,当且仅当V符合旧预期值A时,处理器用新值B更新V的值,否则它就不执行更新,但是无论是否更新了V的值,都会返回V的旧值,上述的处理过程是一个原子操作。

Java1.5之后,利用 sun.misc.Unsafe 类里面的 compareAndSwapInt() 和compareAndSwapLong() 等方法,就可以实现 CAS 操作。java.util.concurrent 包中提供的 atomic 类底层就是就是调用的 Unsafe 来进行 CAS 操作。

2、CAS的问题

(1)ABA问题

CAS从语义上来说并不是完美的,存在这样的一个逻辑漏洞:如果一个变量V初次读取的时候是A值,并且在准备赋值的时候检查到它仍然为A值,那我们就能说它的值没有被其他线程改变过了吗?如果在这段期间它的值曾经被改成了B,后来又被改回为A,那CAS操作就会误认为它从来没有被改变过。这个漏洞称为CAS操作的”ABA”问题。

大部分情况下ABA问题不会影响程序并发的正确性,如果需要解决ABA问题,改用传统的互斥同步可能会比原子类更高效。

(2)自旋时间过长的CPU消耗

CAS操作更新失败的常用补救措施就是不断尝试直到成功,例如 Java中的 sun.misc.Unsafe类的 CAS 操作,

public final int getAndSetInt(Object var1, long var2, int var4) {
    int var5;
    do {
        var5 = this.getIntVolatile(var1, var2);
    } while(!this.compareAndSwapInt(var1, var2, var5, var4));
    return var5;
}

而不断尝试的过程是会占用CPU资源的。Unsafe类中自旋补救直接应用的是处理机指令 ,其处理速度十分迅速,基本上不用去考虑自旋的CPU消耗问题。但是在很多场景中,这种循环尝试的手段是需要加以控制的。

三、Atomic类

1、AtomicInteger

AtomicInteger 整体比较简单,实际数据保存在volatile修改的成员变量 value 中。

private static final Unsafe unsafe = Unsafe.getUnsafe();
//value的字节偏移
private static final long valueOffset;
static {
    try {
        valueOffset = unsafe.objectFieldOffset
            (AtomicInteger.class.getDeclaredField("value"));
    } catch (Exception ex) { throw new Error(ex); }
}
//实际数据
private volatile int value;

其核心方法也是只是对unsafe的简单包装

public final int getAndSet(int newValue) {
    return unsafe.getAndSetInt(this, valueOffset, newValue);
}

public final boolean compareAndSet(int expect, int update) {
    return unsafe.compareAndSwapInt(this, valueOffset, expect, update);
}

public final boolean weakCompareAndSet(int expect, int update) {
    return unsafe.compareAndSwapInt(this, valueOffset, expect, update);
}

public final int getAndIncrement() {
    return unsafe.getAndAddInt(this, valueOffset, 1);
}

public final int getAndDecrement() {
    return unsafe.getAndAddInt(this, valueOffset, -1);
}

public final int getAndAdd(int delta) {
    return unsafe.getAndAddInt(this, valueOffset, delta);
}

public final int incrementAndGet() {
    return unsafe.getAndAddInt(this, valueOffset, 1) + 1;
}

public final int decrementAndGet() {
    return unsafe.getAndAddInt(this, valueOffset, -1) - 1;
}

public final int addAndGet(int delta) {
    return unsafe.getAndAddInt(this, valueOffset, delta) + delta;
}

sun.misc.Safe 类中 CAS 操作,核心就是native CAS调用的调动和 自旋操作

public final native boolean compareAndSwapObject(Object var1, long var2, Object var4, Object var5);
public final native boolean compareAndSwapInt(Object var1, long var2, int var4, int var5);
public final native boolean compareAndSwapLong(Object var1, long var2, long var4, long var6);

// 失败则自旋,直到成功
public final int getAndAddInt(Object var1, long var2, int var4) {
    int var5;
    do {
        var5 = this.getIntVolatile(var1, var2);
    } while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4));
    return var5;
}

public final long getAndAddLong(Object var1, long var2, long var4) {
    //………………
}

public final int getAndSetInt(Object var1, long var2, int var4) {
    //………………
}

public final long getAndSetLong(Object var1, long var2, long var4) {
  //………………
}

public final Object getAndSetObject(Object var1, long var2, Object var4) 
 //………………
}

AtomicBoolean、AtomicLong、AtomicReference<V>

AtomicBoolean中实际成员变量是一个 int 值,通过对 value == 0 的判断来实现 bool 值。

private volatile int value;

AtomicLong 中的操作则与 AtomicInteger 类似, 只不过调用的是 Unsafe中的 long 变量对应的方法,

/**
 * Atomically increments by one the current value.
 *
 * @return the previous value
 */
public final long getAndIncrement() {
    return unsafe.getAndAddLong(this, valueOffset, 1L);
}

类似的AtomicReference,可以原子的修改对象引用,而底层则用的是 getAndSetObject 方法。


public final Object getAndSetObject(Object var1, long var2, Object var4) {
    Object var5;
    do {
        var5 = this.getObjectVolatile(var1, var2);
    } while(!this.compareAndSwapObject(var1, var2, var5, var4));
    return var5;
}

2、AtomicIntegerArray 和 AtomicLongArray

内部元素可以进行原子操作**的数组**,这两个类本质操作一直,只是数据类型不一样。我们看看 AtomicIntegerArray的 getAndAdd 方法。

/**
 * Atomically adds the given value to the element at index {@code i}.
 *
 * @param i the index
 * @param delta the value to add
 * @return the previous value
 */
public final int getAndAdd(int i, int delta) {
    return unsafe.getAndAddInt(array, checkedByteOffset(i), delta);
}
//计算指定索引数据的偏移量
private long checkedByteOffset(int i) {
    if (i < 0 || i >= array.length)
        throw new IndexOutOfBoundsException("index " + i);
    return byteOffset(i);
}

private static long byteOffset(int i) {
    return ((long) i << shift) + base;
}

指定索引后,计算出索引数据的偏移量,对指定索引的值进行原子的 CAS 操作。

3、对象字段的原子更改 AtomicXXXUpdater

原子的对象字段更改器,可以更改被 volatile 修饰的 int、long和引用数据,分别对应AtomicIntegerFieldUpdater、AtomicLongFieldUpdater、AtomicReferenceFieldUpdater。

原理是通过反射获取对应字段的偏移量,然后通过对的 CAS操作进行原子操作。

4、LongAdder、DoubleAdder

https://www.cnblogs.com/tong-yuan/p/LongAdder.html

LongAdder是java8中新增的原子类,在多线程环境。内部通过 cell 存储实际值,并使用 CAS 进行原子操作。分段锁的思想。通过 sum()方法是把 base 和所有段 cell 的值相加得到结果。

主要属性

// 这三个属性都在Striped64中
// cells数组,存储各个段的值
transient volatile Cell[] cells;
// 最初无竞争时使用的,也算一个特殊的段
transient volatile long base;
// 标记当前是否有线程在创建或扩容cells,或者在创建Cell
// 通过CAS更新该值,相当于是一个锁
transient volatile int cellsBusy;
static final class Cell {
    // 存储元素的值,使用volatile修饰保证可见性
    volatile long value;
    
    Cell(long x) { value = x; }
    // CAS更新value的值
    final boolean cas(long cmp, long val) {
        return UNSAFE.compareAndSwapLong(this, valueOffset, cmp, val);
    }
    //…………
}

使用

static void testLongAdder(final int threadCount, final int times) throws InterruptedException {
        LongAdder longAdder = new LongAdder();
        List<Thread> list = new ArrayList<>();
        for (int i=0;i<threadCount;i++){
            list.add(new Thread(() -> {
                for (int j = 0; j<times; j++){
                    longAdder.add(1);
                }
            }));
        }

        for (Thread thread : list){
            thread.start();
        }

        for (Thread thread : list){
            thread.join();
        }
    }

(1)LongAdder通过base和cells数组来存储值;

(2)不同的线程会hash到不同的cell上去更新,减少了竞争;分段锁思想,减少锁粒度。

(3)LongAdder的性能非常高,最终会达到一种无竞争的状态;

四、参考

深入理解Java虚拟机(第2版) : JVM高级特性与最佳实践

乐观锁的一种实现方式——CAS

【死磕Java并发】—-深入分析CAS

发表评论

电子邮件地址不会被公开。 必填项已用*标注