目录
  1. 1 简介
  2. 2 提供的功能
    1. 2.1 排它锁
    2. 2.2 共享锁
    3. 2.3 条件变量
  3. 3 如何基于AQS实现自己的锁
    1. 3.1 tryAcquire & tryRelease
    2. 3.2 tryAcquireShared & tryReleaseShared
    3. 3.1 isHeldExclusively
  4. 4 性能
  5. 参考文档:

1 简介

  AbstractQueuedSynchronizer(以下简称AQS)是一个提供了锁和同步功能的抽象类,它为许多Java的并发类提供了同步方式:例如CountDownLatch、ReentrantLock、ReentrantReadWriteLock等。
  作为一个没有抽象方法的抽象类,AQS在使用时需要被继承,而且其文档建议通过一个non-public的内部类来继承AQS。因此其定位在于实现一个帮助类,为包含它的类提供同步和锁的相关功能。
  本文仅作AQS的简述,以流程图的方式展示其内部实现,并不深究代码及其无锁编程的原理。有兴趣的同学可以仔细对照源码并参考结尾的论文。
  在了解怎么使用它之前,我们先看一下它提供了哪些功能以及怎么实现的。

2 提供的功能

2.1 排它锁

  AQS提供的与排它锁相关的public方法如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
/* 
* 获取排它锁,若失败则等待
* 等待时会忽略线程的interrupt
*/
public final void acquire(int arg);

/*
* 获取排它锁,若失败则等待
* 若等待时被interrupt则抛出InterruptedException
*/
public final void acquireInterruptibly(int arg);

/*
* 获取排它锁,若失败则等待
* 若等待时被interrupt则抛出InterruptedException
* 若等待时间超过nanosTimeout,则返回false
*/
public final boolean tryAcquireNanos(int arg, long nanosTimeout);

/*
* 释放排它锁
* 返回值代表是否释放成功
*/
public final boolean release(int arg);

  我们选取acquire和release两个方法,来简单阐述一下其内部实现(其他两个acquire方法原理相仿,只是添加了一些功能)。本文只讨论常规流程,具体细节请参照源码。
image
  上图展示了acquire方法的内部流程。为了方便理解,我们首先需要了解上图提到的两个概念:

1.tryAcquire

  首先是tryAcquire方法,图中可见该方法在acquire的调用初始和每次唤醒的检测中都会被调用。它定义在AQS类中,其完整声明如下:

1
protected boolean tryAcquire(int arg);

  该方法内需要查询当前条件是否可以拿到锁,如果可以则获取锁。如果tryAcquire返回true,则说明条件满足并且成功的拿到了锁。
  至于用什么条件才可以拿到锁,用什么表示锁的状态,则需要由AQS的使用者来定义。因此AQS并没有实现该方法的具体逻辑,而是需要使用者的子类覆盖这个方法。其具体使用细节后面会提到。

2.同步队列

  AQS将等待获取锁的所有线程放入了一个FIFO的双向链表里,该队列是CLH队列的一种变种:即每个节点都有一个表示状态的属性。队列节点的代码和具体属性值如下:

1
2
3
4
5
6
7
8
static final class Node {
volatile int waitStatus; //节点的状态
volatile Node prev;//前向指针
volatile Node next;//后向指针
volatile Thread thread;//节点关联的线程
}
private transient volatile Node head;//队列的head
private transient volatile Node tail;//队列的tail
1
2
3
4
5
6
7
8
9
//因为超时或者中断,节点会被设置为取消状态。取消状态的节点会被移除队列。
static final int CANCELLED = 1;
//意味着后继节点的线程被挂起。当前节点将会在release时唤醒后继节点。
static final int SIGNAL = -1;
//当前节点的线程正在等待一个condition。说明当前队列是条件队列而不是同步队列。
static final int CONDITION = -2;
//后续结点会传播唤醒的操作,在共享模式下起作用。
static final int PROPAGATE = -3;
//除上述,队列会初始化一个status为0的节点

  对于同步队列,有以下几个特点:

  了解了上述两个概念,基本可以了解acquire获取互斥锁的基本流程了:首先会通过tryAcquire()快速判断是否能获取锁(此时有可能刚好抢占到用户的条件,这也是它的非公平之处),若获取失败则加入到同步队列末尾。在同步队列中,通常首节点是已经拿到锁的线程,每当其释放锁,都会唤醒后面的等待节点并离开队列。被唤醒的节点再次授权尝试tryAcquire(),成功后即可拿到锁。
  上面简单提到了释放锁release(),我们来看一下release()方法的内部流程。
image
  类似于tryAcquire()方法,释放锁也需要AQS的子类来实现tryRelease()方法,实现方式见后文。整个释放锁的流程也相对简单,只需要在tryRelease()之后唤醒下一个等待的线程即可。
  以上就是AQS对于排它锁的实现。此处再介绍一下AQS对休眠和唤醒线程的方式,其对于线程的休眠和唤醒采用的是LockSupport类,而LockSupport类调用的是Unsafe类的方法,如下:

1
2
public native void unpark(Thread jthread);  
public native void park(boolean isAbsolute, long time);

  LockSupport.park/unpark相比于wait/notify的优点在于unpark可以先于park调用,unpark可以对唤醒的线程做标记,当遇到park调用时直接拿到标记而不用等待。
  注意,LockSupport.park也是可能被虚假唤醒的,并且LockSupport.park遇到Thread.interrupt()是会立刻返回的,但这个方法不会抛出InterruptedException。(但是例如上层的ConditonObject会替他抛出)
  其在HotSpot VM的实现,就是标记位+互斥锁+条件变量。在HotSpot VM中,每个Java线程都持有一个互斥锁(pthread_mutex_t)和条件变量(pthread_cond_t),因此对于线程的park就是对线程的条件变量调用pthread_cond_wait,unpark就是对目标线程的条件变量调用pthread_cond_signal。再配合上条件变量所必需的互斥锁(pthread_mutex_lock)即可。

2.2 共享锁

  AQS提供的与共享锁相关的public方法与排它锁很像,如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
/* 
* 获取共享锁,若失败则等待
* 等待时会忽略线程的interrupt
*/
public final void acquireShared(int arg);

/*
* 获取共享锁,若失败则等待
* 若等待时被interrupt则抛出InterruptedException
*/
public final void acquireSharedInterruptibly(int arg);

/*
* 获取共享锁,若失败则等待
* 若等待时被interrupt则抛出InterruptedException
* 若等待时间超过nanosTimeout,则返回false
*/
public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout);

/*
* 释放共享锁,可能唤醒多个线程
* 返回值代表是否释放成功,失败代表其他的共享锁尚未释放
*/
public final boolean releaseShared(int arg);

  还是以acquireShared和releaseShared为例,解释一下其内部流程。如下图所示。
  类似于tryAcquire(),共享锁的获取和释放也需要用户自己实现tryAcquireShared()和tryReleaseShared()方法,方法的返回值表示了是否成功获取/释放共享锁。由于共享锁的获取和释放也是一对一的(即使释放失败也是正常情况),因此通常会在其实现里使用计数器来控制释放共享锁的条件。
image

  获取共享锁的流程在排它锁的基础上,增加了唤醒下一个节点的过程。具体来说,对于已放入同步队列的线程,如果位于队列第二位并且tryAcquireShared成功,就会将当前线程设为首节点,然后判断下个节点是否是共享锁(Node会对共享锁置标记位Node.SHARED),若是则唤醒该节点的线程。这个被唤醒的线程会重复执行上述流程,以此逐一唤醒所有等待共享锁的线程。
  一个AQS可以同时实现排它锁和共享锁,他们顺序的排列在同一个同步队列中。不同之处在于共享锁的节点被唤醒时,会继续唤醒后续的共享锁节点,若后续节点是排它锁则停止唤醒。

2.3 条件变量

  除了锁,AQS也提供了条件变量的功能。AQS有一个内部类ConditionObject,它是Condition接口的一个实现类。由于条件变量一定要和互斥锁配合使用,ConditionObject需要配合其外部的AQS类提供的排它锁(acquire/release)。Condition接口提供了如下的功能:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
/* 
* 使当前线程等待,直到以下4种情况任意一个发生:
* 1.另一个线程调用该对象的signal(),当前线程恰好是被选中的唤醒线程
* 2.另一个线程调用该对象的signalAll()
* 3.另一个线程interrupt当前线程(此时会抛出InterruptedException)
* 4.虚假唤醒(源自操作系统,发生概率低)
* ConditionObject要求调用时该线程已经拿到了其外部AQS类的排它锁(acquire成功)
*/
void await() throws InterruptedException;

/*
* 与await()相同,但是不会被interrupt唤醒
*/
void awaitUninterruptibly();

/*
* 与await()相同,增加了超时时间,超过超时时间也会停止等待
* 三个方法功能相似,其返回值代表剩余的超时时间,或是否超时
*/
long awaitNanos(long nanosTimeout) throws InterruptedException;
boolean await(long time, TimeUnit unit) throws InterruptedException;
boolean awaitUntil(Date deadline) throws InterruptedException;

/*
* 唤醒一个正在等待该条件变量对象的线程
* ConditionObject会选择等待时间最长的线程来唤醒
* ConditionObject要求调用时该线程已经拿到了其外部AQS类的排它锁(acquire成功)
*/
void signal();

/*
* 唤醒所有正在等待该条件变量对象的线程
* ConditionObject要求调用时该线程已经拿到了其外部AQS类的排它锁(acquire成功)
*/
void signalAll();

  可以看到,其作用与Object原生的wait()/notify()/notifyAll()很相似,但是增加了更多的功能。下面以awaitUninterruptibly()、signal()为例,阐述一下其内部实现。
image
  如上图,await的步骤基本分为 条件队列增加节点→释放锁→休眠→拿回锁,signal的动作基本为条件队列首节点迁移至同步队列末尾。
  条件变量使用了条件队列和同步队列两个队列,他们的节点结构相同,但有着不同的分工。条件队列用于保存正在等待的条件变量,同步队列用于保存等待被唤醒的线程。
  我们知道,无论是Object的wait(),还是操作系统提供的pthread_cond_wait,其内部实现都是释放互斥锁-休眠-获取互斥锁,其中释放-休眠两步需要保证原子性(若中间发生了线程切换,则可能出现signal唤醒时没有休眠的线程)。由于使用的LockSupport支持park/unpark的乱序(见2.1所述),正好避免了上述问题。

3 如何基于AQS实现自己的锁

  再次需要明确的是AQS是一个抽象类,其作用也是供开发者基于其公有方法二次开发。对于开发者,AQS提供了一个需要使用的变量和五个需要实现的方法。
  首先,需要锁的地方一定有资源的竞争,因此AQS提供了一个私有的int变量state,用来抽象地代表锁竞争的资源(例如1表示已被占用,0表示未被占用)。开发者应该使用以下三个相关的protected方法对其操作。

1
2
3
4
private volatile int state;
protected final int getState();
protected final void setState(int newState);
protected final boolean compareAndSetState(int expect, int update);

  尽管AQS的实现逻辑没有使用到state变量,但是在使用条件变量时,会默认将state变量作为release(int arg)的传入参数,而他们又会将参数值传递给用户实现的tryRelease(int arg)方法。因此在实现try系列方法时,往往需要修改state变量值。
  为了扩展使用AQS,使其成为一个同步器类,开发者需要在继承AQS的基础上,按需实现(覆盖)以下五个protected方法:

1
2
3
4
5
protected boolean tryAcquire(int arg);
protected int tryAcquireShared(int arg);
protected boolean tryRelease(int arg);
protected boolean tryReleaseShared(int arg);
protected boolean isHeldExclusively();

3.1 tryAcquire & tryRelease

  这两个方法是实现排它锁必须覆盖的方法,其传入的int参数就是acquire(int)/release(int)的传入参数,其返回值就是是否成功获取/释放锁。
  一个简单的不可重入的排它锁的实现如下,其传入参数int可由开发者自己定义。tryAcquire需要实现的是查询并获取竞争资源,通常由state的CAS操作完成。tryRelease需要实现的是释放竞争资源,通常只需使state变量自减或清零即可。二者的实现内都不应该出现阻塞和等待的情况。开发者通过对state变量的原子操作,实现了竞争资源的获取与释放的逻辑,至于竞争者之间的等待和调度,则由上文所述的AQS内部实现。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
//This is a mutex
protected boolean tryAcquire(int acquires) {
assert acquires == 1; // 定义传入参数只能是1
if (compareAndSetState(0, 1)) {
return true;
}
return false;
}

protected boolean tryRelease(int releases) {
assert releases == 1; // 定义传入参数只能是1
if (getState() == 0) throw new IllegalMonitorStateException();
setState(0);
return true;
}

3.2 tryAcquireShared & tryReleaseShared

  与上述两个方法很类似,这两个方法是实现共享锁必须覆盖的方法。其中tryAcquireShared的返回值为int可视作剩余的共享资源的个数(负数则表示acquire失败),传入参数由开发者定义,可视作想要获取/释放的资源个数。
  这种共享锁可以用来实现读锁、信号量或者同步计数器。下面以一个信号量的实现为例:tryAcquireShared使用了CAS+自旋的方式,因为信号量的数量不止一个,CAS失败并不意味着无法获取信号量。tryReleaseShared同样使用了自旋,以防止高并发下释放信号量失败。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
//This is a semaphore
protected int tryAcquireShared(int acquires) {//non-fair
for (;;) {
int available = getState();
int remaining = available - acquires;
if (remaining < 0 || compareAndSetState(available, remaining))
return remaining;
}
}

protected boolean tryReleaseShared(int releases) {
for (;;) {
int current = getState();
int next = current + releases;
if (compareAndSetState(current, next))
return true;
}
}

3.1 isHeldExclusively

  该方法是实现条件变量必须覆盖的方法,其返回值表示了当前线程是否独占了条件变量对应的互斥锁。常见的实现方式如下,其中set/getExclusiveOwnerThread源自AbstractOwnableSynchronizer类,该类是AQS的父类。

1
2
3
4
5
6
7
8
9
protected boolean isHeldExclusively() {
return getExclusiveOwnerThread() == Thread.currentThread();
}

protected boolean tryAcquire(int acquires) {
......
setExclusiveOwnerThread(Thread.currentThread());
return true;
}

4 性能

  与synchronized同步块相比,AQS的衍生类通常具有更好的性能和更多的功能,但是从业务代码的可读性和维护性考虑,仍然推荐使用synchronized同步块。
  在无竞争的条件下,二者性能相差不大。在充分竞争的条件下,AQS的非公平互斥锁 明显优于 synchronized同步块 明显优于 AQS的公平互斥锁。具体的性能测试数据见参考文档的原作者论文。

参考文档:

原作者论文,有兴趣深入的建议阅读:
Lea Doug. The java.util.concurrent synchronizer framework
官方文档:
AbstractQueuedSynchronizer (Java Platform SE 7 ) - Oracle Help Center
网络资源:
AbstractQueuedSynchronizer的介绍和原理分析
Java的LockSupport.park()实现分析