前言

java并发编程(十七)——深入剖析乐观锁-悲观锁 一文中我们在学习乐观锁时提到了CAS,本文来深入研究下CAS的原理及应用场景。

CAS简介

CAS的英文全称是 Compare-And-Swap,中文叫做“比较并交换”,它是一种思想、一种算法。

在多线程的情况下,各个代码的执行顺序是不能确定的,所以为了保证并发安全,我们可以使用互斥锁。而 CAS 的特点是避免使用互斥锁,当多个线程同时使用 CAS 更新同一个变量时,只有其中一个线程能够操作成功,而其他线程都会更新失败。不过和同步互斥锁不同的是,更新失败的线程并不会被阻塞,而是被告知这次由于竞争而导致的操作失败,但还可以再次尝试。

CAS 被广泛应用在并发编程领域中,以实现那些不会被打断的数据交换操作,从而就实现了无锁的线程安全。

CAS的思想

在大多数处理器的指令中,都会实现 CAS 相关的指令,这一条指令就可以完成“比较并交换”的操作,也正是由于这是一条(而不是多条)CPU 指令,所以 CAS 相关的指令是具备原子性的,这个组合操作在执行期间不会被打断,这样就能保证并发安全。由于这个原子性是由 CPU 保证的,所以无需我们程序员来操心。

CAS 有三个操作数:内存值 V、预期值 A、要修改的值 B。CAS 最核心的思路就是,仅当预期值 A 和当前的内存值 V 相同时,才将内存值修改为 B。

我们对此展开描述一下:CAS 会提前假定当前内存值 V 应该等于值 A,而值 A 往往是之前读取到当时的内存值 V。在执行 CAS 时,如果发现当前的内存值 V 恰好是值 A 的话,那 CAS 就会把内存值 V 改成值 B,而值 B 往往是在拿到值 A 后,在值 A 的基础上经过计算而得到的。如果执行 CAS 时发现此时内存值 V 不等于值 A,则说明在刚才计算 B 的期间内,内存值已经被其他线程修改过了,那么本次 CAS 就不应该再修改了,可以避免多人同时修改导致出错。这就是 CAS 的主要思路和流程。

JDK 正是利用了这些 CAS 指令,可以实现并发的数据结构,比如 AtomicInteger 等原子类。

举例

假设有两个线程,分别使用两个 CPU,它们都想利用 CAS 来改变右边的变量的值。我们先来看线程 1,它使用 CPU 1,假设它先执行,它期望当前的值是 100,并且想将其改成 150。在执行的时候,它会去检查当前的值是不是 100,发现真的是 100,所以可以改动成功,而当改完之后,右边的值就会从 100 变成 150:

如上图所示,假设现在才刚刚轮到线程 2 所使用的 CPU 2 来执行,它想要把这个值从 100 改成 200,所以它也希望当前值是 100,可实际上当前值是 150,所以它会发现当前值不是自己期望的值,所以并不会真正的去继续把 100 改成 200,也就是说整个操作都是没有效果的,此次没有修改成功,CAS 操作失败:

当然,接下来线程 2 还可以有其他的操作,这需要根据业务需求来决定,比如重试、报错或者干脆跳过执行。举一个例子,在秒杀场景下,多个线程同时执行秒杀,只要有一个执行成功就够了,剩下的线程当发现自己 CAS 失败了,其实说明兄弟线程执行成功了,也就没有必要继续执行了,这就是跳过操作。所以业务逻辑不同,就会有不同的处理方法,但无论后续怎么处理,之前的那一次 CAS 操作是已经失败了的。

CAS深入

接下来我们把 CAS 拆开,看看它内部究竟做了哪些事情。CAS 的等价语义的代码,如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
/**

* 描述: 模拟CAS操作,等价代码

*/

public class SimulatedCAS {

private int value;

public synchronized int compareAndSwap(int expectedValue, int newValue) {

int oldValue = value;

if (oldValue == expectedValue) {
value = newValue;
}
return oldValue;
}
}

在这段代码中有一个 compareAndSwap 方法,在这个方法里有两个入参,第 1 个入参期望值 expectedValue,第 2 个入参是 newValue,它就是我们计算好的新的值,我们希望把这个新的值去更新到变量上去。

你一定注意到了, compareAndSwap 方法是被 synchronized 修饰的,我们用同步方法为 CAS 的等价代码保证了原子性。

接下来我们来看在 compareAndSwap 方法里都做了哪些事情。需要先拿到变量的当前值,所以代码里用就会用 int oldValue = value 把变量的当前值拿到。然后就是 compare,也就是“比较”,所以此时会用 if (oldValue == expectedValue) 把当前值和期望值进行比较,如果它们是相等的话,那就意味着现在的值正好就是我们所期望的值,满足条件,说明此时可以进行 swap,也就是交换,所以就把 value 的值修改成 newValue,最后再返回 oldValue,完成了整个 CAS 过程。

CAS 最核心的思想就在上面这个流程中体现了,可以看出,compare 指的就是 if 里的比较,比较 oldValue 是否等于 expectedValue;同样,swap 实际上就是把 value 改成 newValue,并且返回 oldValue。所以这整个 compareAndSwap 方法就还原了 CAS 的语义,也象征了 CAS 指令在背后所做的工作。

案例演示

上代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
public class DebugCAS implements Runnable {

private volatile int value;

public synchronized int compareAndSwap(int expectedValue, int newValue) {

int oldValue = value;
if (oldValue == expectedValue) {
value = newValue;
System.out.println("线程"+Thread.currentThread().getName()+"执行成功");
}

return oldValue;
}



public static void main(String[] args) throws InterruptedException {

DebugCAS r = new DebugCAS();
r.value = 100;

Thread t1 = new Thread(r,"Thread 1");
Thread t2 = new Thread(r,"Thread 2");

t1.start();
t2.start();

t1.join();
t2.join();

System.out.println(r.value);
}

@Override
public void run() {
compareAndSwap(100, 150);
}
}

这里的 compareAndSwap 方法就是刚才所讲过的 CAS 的等价语义的代码,然后我们在此基础上加了一行代码,如果执行成功的话,它会打印出是哪个线程执行成功。

而在我们的 main() 方法里面,首先把 DebugCAS 类实例化出来,并把 value 的值修改为 100,这样它的初始值就为 100,接着我们新建两个线程 Thread t1 和 Thread t2,把它们启动起来,并且主线程等待两个线程执行完毕之后,去打印出最后 value 的值。

新建的这两个线程都做了什么内容呢?在 run() 方法里面可以看到,就是执行 compareAndSwap 方法,并且期望的值是 100,希望改成的值是 150,那么当两个线程都去执行 run() 方法的时候,可以预见到的是,只会有一个线程执行成功,另外一个线程不会打印出“执行成功”这句话,因为当它执行的时候会发现,当时的值已经被修改过了,不是 100 了。

执行结果:

可以看到,Thread 1 执行成功,且最终的结果是 150。在这里,打印“Thread 1 执行成功”这句话的概率比打印“Thread 2 执行成功”这句话的概率要大得多,因为 Thread 1 是先 start 的。

打断点Debug运行:

可以看到,此时程序已经停留在打断点的地方了,停留的是 Thread 1(在 Debugger 里可以显示出来当前线程的名字和状态),而 Thread 2 此时的状态是 Monitor (对应 Java 线程的 Blocked 状态),其含义是没有拿到这把锁 synchronized,正在外面等待这把锁。

现在 Thread 1 进到 compareAndSwap 方法里了,我们可以很清楚地看到,oldValue 值是 100,而 expectedValue 的值也是 100,所以它们是相等的。

继续让代码单步运行,因为满足 if 判断条件,所以可以进到 if 语句中,所以接下来会把 value 改成 newValue,而 newValue 的值正是 150。

在修改完成后,还会打印出“线程Thread 1执行成功”这句话。

接下来我们按下左侧的执行按钮,就轮到 Thread 2 了,此时情景就不同了:

以看到,oldValue 拿到的值是 150,因为 value 的值已经被 Thread 1 修改过了,所以,150 与 Thread 2 所期望的 expectedValue 的值 100 是不相等的,从而会跳过整个 if 语句,也就不能打印出“Thread 2 执行成功”这句话,最后会返回 oldValue,其实对这个值没有做任何的修改。

到这里,两个线程就执行完毕了。在控制台,只打印出 Thread 1 执行成功,而没有打印出 Thread 2 执行成功。其中的原因,我们通过 Debug 的方式已经知晓了。

CAS的应用场景

并发容器

Doug Lea 大神在 JUC 包中大量使用了 CAS 技术,该技术既能保证安全性,又不需要使用互斥锁,能大大提升工具类的性能。下面我将通过两个例子来展示 CAS 在并发容器中的使用情况。

案例一:ConcurrentHashMap

有一个醒目的方法,它就是 “casTabAt”,这个方法名就带有 “CAS”,可以猜测它一定是和 CAS 密不可分了,casTabAt方法的实现如下:

1
2
3
4
5
6
7
    // Unsafe mechanics
private static final Unsafe U = Unsafe.getUnsafe();

static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i,
Node<K,V> c, Node<K,V> v) {
return U.compareAndSetObject(tab, ((long)i << ASHIFT) + ABASE, c, v);
}

其中Unsafe类是jdk.internal.misc包下的。Unsafe类包含compareAndSetObject、compareAndSetInt、compareAndExchangeInt等和 CAS 密切相关的 native 层的方法,其底层正是利用 CPU 对 CAS 指令的支持实现的。

上面介绍的 casTabAt 方法,不仅被用在了 ConcurrentHashMap 的 putVal 方法中,还被用在了 merge、compute、computeIfAbsent、transfer 等重要的方法中,所以 ConcurrentHashMap 对于 CAS 的应用是比较广泛的。

案例二:ConcurrentLinkedQueue

非阻塞并发队列 ConcurrentLinkedQueue 的 offer 方法里也有 CAS 的身影,offer 方法的代码如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public boolean offer(E e) {
final Node<E> newNode = new Node<E>(Objects.requireNonNull(e));

for (Node<E> t = tail, p = t;;) {
Node<E> q = p.next;
if (q == null) {
// p is last node
if (NEXT.compareAndSet(p, null, newNode)) {
if (p != t)
TAIL.weakCompareAndSet(this, t, newNode);
return true;
}
// Lost CAS race to another thread; re-read next
}
else if (p == q)

p = (t != (t = tail)) ? t : head;
else
// Check for tail updates after two hops.
p = (p != t && t != (t = tail)) ? t : q;
}
}

可以看到这个方法里面又是个死循环,只有一个return语句可以中断此循环。里面的NEXT.compareAndSet(p, null, newNode)方法也是使用了CAS的思想进行不断的重试,直到某一次满足条件时return退出。

数据库

在我们的数据库中,也存在对乐观锁和 CAS 思想的应用。在更新数据时,我们可以利用 version 字段在数据库中实现乐观锁和 CAS 操作,而在获取和修改数据时都不需要加悲观锁。

具体思路如下:当我们获取完数据,并计算完毕,准备更新数据时,会检查现在的版本号与之前获取数据时的版本号是否一致,如果一致就说明在计算期间数据没有被更新过,可以直接更新本次数据;如果版本号不一致,则说明计算期间已经有其他线程修改过这个数据了,那就可以选择重新获取数据,重新计算,然后再次尝试更新数据。

假设取出数据的时候 version 版本为 1,相应的 SQL 语句示例如下所示:

1
UPDATE student SET name = ‘小王’,version = 2  WHERE  id = 10  AND version = 1 

这样一来就可以用 CAS 的思想去实现本次的更新操作,它会先去比较 version 是不是最开始获取到的 1,如果和初始值相同才去进行 name 字段的修改,同时也要把 version 的值加一。

原子类

在原子类中,例如 AtomicInteger,也使用了 CAS,AtomicInteger 的 getAndAdd 方法,该方法代码如下所示:

getAndAddInt方法源码如下:

1
2
3
4
5
6
7
    public final int getAndAddInt(Object var1 , long offset, int delta) {
int var5;
do {
var5 = getIntVolatile(var1, offset);
} while (!weakCompareAndSetInt(var1, offset, var5, var5 + delta));
return var5;
}

在这里,我们看到上述方法中有对 var5 的赋值,调用了 unsafe 的 getIntVolatile(var1, offset) 方法,这是一个 native 方法,作用是获取变量 var1 中偏移量 offset处的值。这里传入 var1 的是 AtomicInteger 对象的引用,而 offset就是 AtomicInteger 里面所存储的数值(也就是 value)的偏移量 valueOffset,所以此时得到的 var5 实际上代表当前时刻下的原子类中存储的数值。

接下来重点来了,我们看到有一个weakCompareAndSetInt方法,这里会传入多个参数,分别是 var1、offset、 var5、var5 + delta,其实它们代表 object、offset、expectedValue 和 newValue。

  1. 第一个参数 object 就是将要修改的对象,传入的是 this,也就是 atomicInteger 这个对象本身;
  2. 第二个参数是 offset,也就是偏移量,借助它就可以获取到 value 的数值;
  3. 第三个参数 expectedValue,代表“期望值”,传入的是刚才获取到的 var5;
  4. 而最后一个参数 newValue 是希望修改为的新值 ,等于之前取到的数值 var5 再加上 delta,而 delta就是我们之前所传入的 delta,delta 就是我们希望原子类所改变的数值,比如可以传入 +1,也可以传入 -1。

所以 compareAndSwapInt 方法的作用就是,判断如果现在原子类里 value 的值和之前获取到的 var5 相等的话,那么就把计算出来的 var5 + delta给更新上去,所以说这行代码就实现了 CAS 的过程。

一旦 CAS 操作成功,就会退出这个 while 循环,但是也有可能操作失败。如果操作失败就意味着在获取到 var5 之后,并且在 CAS 操作之前,value 的数值已经发生变化了,证明有其他线程修改过这个变量。

这样一来,就会再次执行循环体里面的代码,重新获取 var5 的值,也就是获取最新的原子变量的数值,并且再次利用 CAS 去尝试更新,直到更新成功为止,所以这是一个死循环。

我们总结一下,Unsafe 的 getAndAddInt 方法是通过循环 + CAS 的方式来实现的,在此过程中,它会通过 compareAndSwapInt 方法来尝试更新 value 的值,如果更新失败就重新获取,然后再次尝试更新,直到更新成功。

CAS的缺点

CAS 是有很多优点的,比如可以避免加互斥锁,可以提高程序的运行效率。但是同样 CAS 也有非常明显的缺点。所以我们在使用 CAS 的时候应该同时考虑到它的优缺点,合理地进行技术选型。

下面我们就来看一下 CAS 有哪几个主要的缺点。

缺点一——ABA 问题

首先,CAS 最大的缺点就是 ABA 问题。

决定 CAS 是否进行 swap 的判断标准是“当前的值和预期的值是否一致”,如果一致,就认为在此期间这个数值没有发生过变动,这在大多数情况下是没有问题的。

但是在有的业务场景下,我们想确切知道从上一次看到这个值以来到现在,这个值是否发生过变化。例如,这个值假设从 A 变成了 B,再由 B 变回了 A,此时,我们不仅认为它发生了变化,并且会认为它变化了两次。

在这种场景下,我们使用 CAS,就看不到这两次的变化,因为仅判断“当前的值和预期的值是否一致”就是不够的了。CAS 检查的并不是值有没有发生过变化,而是去比较这当前的值和预期值是不是相等,如果变量的值从旧值 A 变成了新值 B 再变回旧值 A,由于最开始的值 A 和现在的值 A 是相等的,所以 CAS 会认为变量的值在此期间没有发生过变化。所以,CAS 并不能检测出在此期间值是不是被修改过,它只能检查出现在的值和最初的值是不是一样。

我们举一个例子:假设第一个线程拿到的初始值是 100,然后进行计算,在计算的过程中,有第二个线程把初始值改为了 200,然后紧接着又有第三个线程把 200 改回了 100。等到第一个线程计算完毕去执行 CAS 的时候,它会比较当前的值是不是等于最开始拿到的初始值 100,此时会发现确实是等于 100,所以线程一就认为在此期间值没有被修改过,就理所当然的把这个 100 改成刚刚计算出来的新值,但实际上,在此过程中已经有其他线程把这个值修改过了,这样就会发生 ABA 问题

如果发生了 ABA 问题,那么线程一就根本无法知晓在计算过程中是否有其他线程把这个值修改过,由于第一个线程发现当前值和预期值是相等的,所以就会认为在此期间没有线程修改过变量的值,所以它接下来的一些操作逻辑,是按照在此期间这个值没被修改过”的逻辑去处理的,比如它可能会打印日志:“本次修改十分顺利”,但是它本应触发其他的逻辑,比如当它发现了在此期间有其他线程修改过这个值,其实本应该打印的是“本次修改过程受到了干扰”。

那么如何解决这个问题呢?添加一个版本号就可以解决。

我们在变量值自身之外,再添加一个版本号,那么这个值的变化路径就从 A→B→A 变成了 1A→2B→3A,这样一来,就可以通过对比版本号来判断值是否变化过,这比我们直接去对比两个值是否一致要更靠谱,所以通过这样的思路就可以解决 ABA 的问题了。

在 atomic 包中提供了 AtomicStampedReference 这个类,它是专门用来解决 ABA 问题的,解决思路正是利用版本号,AtomicStampedReference 会维护一种类似 <Object,int> 的数据结构,其中的 int 就是用于计数的,也就是版本号,它可以对这个对象和 int 版本号同时进行原子更新,从而也就解决了 ABA 问题。因为我们去判断它是否被修改过,不再是以值是否发生变化为标准,而是以版本号是否变化为标准,即使值一样,它们的版本号也是不同的。

以上就是对 CAS 的第一个缺点—— ABA 问题的介绍。

缺点二——自旋时间过长

CAS 的第二个缺点就是自旋时间过长。

由于单次 CAS 不一定能执行成功,所以 CAS 往往是配合着循环来实现的,有的时候甚至是死循环,不停地进行重试,直到线程竞争不激烈的时候,才能修改成功。

可是如果我们的应用场景本身就是高并发的场景,就有可能导致 CAS 一直都操作不成功,这样的话,循环时间就会越来越长。而且在此期间,CPU 资源也是一直在被消耗的,这会对性能产生很大的影响。所以这就要求我们,要根据实际情况来选择是否使用 CAS,在高并发的场景下,通常 CAS 的效率是不高的。

缺点三——范围不能灵活控制

CAS 的第三个缺点就是不能灵活控制线程安全的范围。

通常我们去执行 CAS 的时候,是针对某一个,而不是多个共享变量的,这个变量可能是 Integer 类型,也有可能是 Long 类型、对象类型等等,但是我们不能针对多个共享变量同时进行 CAS 操作,因为这多个变量之间是独立的,简单的把原子操作组合到一起,并不具备原子性。因此如果我们想对多个对象同时进行 CAS 操作并想保证线程安全的话,是比较困难的。

有一个解决方案,那就是利用一个新的类,来整合刚才这一组共享变量,这个新的类中的多个成员变量就是刚才的那多个共享变量,然后再利用 atomic 包中的 AtomicReference 来把这个新对象整体进行 CAS 操作,这样就可以保证线程安全。

相比之下,如果我们使用其他的线程安全技术,那么调整线程安全的范围就可能变得非常容易,比如我们用 synchronized 关键字时,如果想把更多的代码加锁,那么只需要把更多的代码放到同步代码块里面就可以了。

总结

  1. 我们学习了 CAS的原理,主要是比较并交换,以及CAS的一些使用举例;

  2. 我们了解了CAS的使用场景,以及在jdk源码比如并发包(例如ConcurrentHashMap)中的应用场景;

  3. 我们剖析了CAS的一些缺点,主要有ABA问题、自旋时间过长、范围不能灵活控制三个缺点。