1 简介
AbstractQueuedSynchronizer(以下简称AQS)是一个提供了锁和同步功能的抽象类,它为许多Java的并发类提供了同步方式:例如CountDownLatch、ReentrantLock、ReentrantReadWriteLock等。
作为一个没有抽象方法的抽象类,AQS在使用时需要被继承,而且其文档建议通过一个non-public的内部类来继承AQS。因此其定位在于实现一个帮助类,为包含它的类提供同步和锁的相关功能。
本文仅作AQS的简述,以流程图的方式展示其内部实现,并不深究代码及其无锁编程的原理。有兴趣的同学可以仔细对照源码并参考结尾的论文。
在了解怎么使用它之前,我们先看一下它提供了哪些功能以及怎么实现的。
2 提供的功能
2.1 排它锁
AQS提供的与排它锁相关的public方法如下:
1 | /* |
我们选取acquire和release两个方法,来简单阐述一下其内部实现(其他两个acquire方法原理相仿,只是添加了一些功能)。本文只讨论常规流程,具体细节请参照源码。
上图展示了acquire方法的内部流程。为了方便理解,我们首先需要了解上图提到的两个概念:
1.tryAcquire
首先是tryAcquire方法,图中可见该方法在acquire的调用初始和每次唤醒的检测中都会被调用。它定义在AQS类中,其完整声明如下:
1 | protected boolean tryAcquire(int arg); |
该方法内需要查询当前条件是否可以拿到锁,如果可以则获取锁。如果tryAcquire返回true,则说明条件满足并且成功的拿到了锁。
至于用什么条件才可以拿到锁,用什么表示锁的状态,则需要由AQS的使用者来定义。因此AQS并没有实现该方法的具体逻辑,而是需要使用者的子类覆盖这个方法。其具体使用细节后面会提到。
2.同步队列
AQS将等待获取锁的所有线程放入了一个FIFO的双向链表里,该队列是CLH队列的一种变种:即每个节点都有一个表示状态的属性。队列节点的代码和具体属性值如下:
1 | static final class Node { |
1 | //因为超时或者中断,节点会被设置为取消状态。取消状态的节点会被移除队列。 |
对于同步队列,有以下几个特点:
- 每个结点都是由前一个结点唤醒
- 当一个结点发现其前驱结点是head并且tryAcquire()成功,该节点会成为head并继续执行
- 当一个结点的状态为SIGNAL时,表示其后面的结点需要被唤醒
了解了上述两个概念,基本可以了解acquire获取互斥锁的基本流程了:首先会通过tryAcquire()快速判断是否能获取锁(此时有可能刚好抢占到用户的条件,这也是它的非公平之处),若获取失败则加入到同步队列末尾。在同步队列中,通常首节点是已经拿到锁的线程,每当其释放锁,都会唤醒后面的等待节点并离开队列。被唤醒的节点再次授权尝试tryAcquire(),成功后即可拿到锁。
上面简单提到了释放锁release(),我们来看一下release()方法的内部流程。
类似于tryAcquire()方法,释放锁也需要AQS的子类来实现tryRelease()方法,实现方式见后文。整个释放锁的流程也相对简单,只需要在tryRelease()之后唤醒下一个等待的线程即可。
以上就是AQS对于排它锁的实现。此处再介绍一下AQS对休眠和唤醒线程的方式,其对于线程的休眠和唤醒采用的是LockSupport类,而LockSupport类调用的是Unsafe类的方法,如下:
1 | public native void unpark(Thread jthread); |
LockSupport.park/unpark相比于wait/notify的优点在于unpark可以先于park调用,unpark可以对唤醒的线程做标记,当遇到park调用时直接拿到标记而不用等待。
注意,LockSupport.park也是可能被虚假唤醒的,并且LockSupport.park遇到Thread.interrupt()是会立刻返回的,但这个方法不会抛出InterruptedException。(但是例如上层的ConditonObject会替他抛出)
其在HotSpot VM的实现,就是标记位+互斥锁+条件变量。在HotSpot VM中,每个Java线程都持有一个互斥锁(pthread_mutex_t)和条件变量(pthread_cond_t),因此对于线程的park就是对线程的条件变量调用pthread_cond_wait,unpark就是对目标线程的条件变量调用pthread_cond_signal。再配合上条件变量所必需的互斥锁(pthread_mutex_lock)即可。
2.2 共享锁
AQS提供的与共享锁相关的public方法与排它锁很像,如下:
1 | /* |
还是以acquireShared和releaseShared为例,解释一下其内部流程。如下图所示。
类似于tryAcquire(),共享锁的获取和释放也需要用户自己实现tryAcquireShared()和tryReleaseShared()方法,方法的返回值表示了是否成功获取/释放共享锁。由于共享锁的获取和释放也是一对一的(即使释放失败也是正常情况),因此通常会在其实现里使用计数器来控制释放共享锁的条件。
获取共享锁的流程在排它锁的基础上,增加了唤醒下一个节点的过程。具体来说,对于已放入同步队列的线程,如果位于队列第二位并且tryAcquireShared成功,就会将当前线程设为首节点,然后判断下个节点是否是共享锁(Node会对共享锁置标记位Node.SHARED),若是则唤醒该节点的线程。这个被唤醒的线程会重复执行上述流程,以此逐一唤醒所有等待共享锁的线程。
一个AQS可以同时实现排它锁和共享锁,他们顺序的排列在同一个同步队列中。不同之处在于共享锁的节点被唤醒时,会继续唤醒后续的共享锁节点,若后续节点是排它锁则停止唤醒。
2.3 条件变量
除了锁,AQS也提供了条件变量的功能。AQS有一个内部类ConditionObject,它是Condition接口的一个实现类。由于条件变量一定要和互斥锁配合使用,ConditionObject需要配合其外部的AQS类提供的排它锁(acquire/release)。Condition接口提供了如下的功能:
1 | /* |
可以看到,其作用与Object原生的wait()/notify()/notifyAll()很相似,但是增加了更多的功能。下面以awaitUninterruptibly()、signal()为例,阐述一下其内部实现。
如上图,await的步骤基本分为 条件队列增加节点→释放锁→休眠→拿回锁,signal的动作基本为条件队列首节点迁移至同步队列末尾。
条件变量使用了条件队列和同步队列两个队列,他们的节点结构相同,但有着不同的分工。条件队列用于保存正在等待的条件变量,同步队列用于保存等待被唤醒的线程。
我们知道,无论是Object的wait(),还是操作系统提供的pthread_cond_wait,其内部实现都是释放互斥锁-休眠-获取互斥锁,其中释放-休眠两步需要保证原子性(若中间发生了线程切换,则可能出现signal唤醒时没有休眠的线程)。由于使用的LockSupport支持park/unpark的乱序(见2.1所述),正好避免了上述问题。
3 如何基于AQS实现自己的锁
再次需要明确的是AQS是一个抽象类,其作用也是供开发者基于其公有方法二次开发。对于开发者,AQS提供了一个需要使用的变量和五个需要实现的方法。
首先,需要锁的地方一定有资源的竞争,因此AQS提供了一个私有的int变量state,用来抽象地代表锁竞争的资源(例如1表示已被占用,0表示未被占用)。开发者应该使用以下三个相关的protected方法对其操作。
1 | private volatile int state; |
尽管AQS的实现逻辑没有使用到state变量,但是在使用条件变量时,会默认将state变量作为release(int arg)的传入参数,而他们又会将参数值传递给用户实现的tryRelease(int arg)方法。因此在实现try系列方法时,往往需要修改state变量值。
为了扩展使用AQS,使其成为一个同步器类,开发者需要在继承AQS的基础上,按需实现(覆盖)以下五个protected方法:
1 | protected boolean tryAcquire(int arg); |
3.1 tryAcquire & tryRelease
这两个方法是实现排它锁必须覆盖的方法,其传入的int参数就是acquire(int)/release(int)的传入参数,其返回值就是是否成功获取/释放锁。
一个简单的不可重入的排它锁的实现如下,其传入参数int可由开发者自己定义。tryAcquire需要实现的是查询并获取竞争资源,通常由state的CAS操作完成。tryRelease需要实现的是释放竞争资源,通常只需使state变量自减或清零即可。二者的实现内都不应该出现阻塞和等待的情况。开发者通过对state变量的原子操作,实现了竞争资源的获取与释放的逻辑,至于竞争者之间的等待和调度,则由上文所述的AQS内部实现。
1 | //This is a mutex |
3.2 tryAcquireShared & tryReleaseShared
与上述两个方法很类似,这两个方法是实现共享锁必须覆盖的方法。其中tryAcquireShared的返回值为int可视作剩余的共享资源的个数(负数则表示acquire失败),传入参数由开发者定义,可视作想要获取/释放的资源个数。
这种共享锁可以用来实现读锁、信号量或者同步计数器。下面以一个信号量的实现为例:tryAcquireShared使用了CAS+自旋的方式,因为信号量的数量不止一个,CAS失败并不意味着无法获取信号量。tryReleaseShared同样使用了自旋,以防止高并发下释放信号量失败。
1 | //This is a semaphore |
3.1 isHeldExclusively
该方法是实现条件变量必须覆盖的方法,其返回值表示了当前线程是否独占了条件变量对应的互斥锁。常见的实现方式如下,其中set/getExclusiveOwnerThread源自AbstractOwnableSynchronizer类,该类是AQS的父类。
1 | protected boolean isHeldExclusively() { |
4 性能
与synchronized同步块相比,AQS的衍生类通常具有更好的性能和更多的功能,但是从业务代码的可读性和维护性考虑,仍然推荐使用synchronized同步块。
在无竞争的条件下,二者性能相差不大。在充分竞争的条件下,AQS的非公平互斥锁 明显优于 synchronized同步块 明显优于 AQS的公平互斥锁。具体的性能测试数据见参考文档的原作者论文。
参考文档:
原作者论文,有兴趣深入的建议阅读:
Lea Doug. The java.util.concurrent synchronizer framework
官方文档:
AbstractQueuedSynchronizer (Java Platform SE 7 ) - Oracle Help Center
网络资源:
AbstractQueuedSynchronizer的介绍和原理分析
Java的LockSupport.park()实现分析