可重入锁、ReentrantLock、AQS、Condition
本文简单介绍可重入锁的概念,简单介绍java中内置的synchronized锁
和java.util.concurrent.locks.ReentrantLock
,然后大致介绍下ReentrantLock的代码实现。ReentrantLock和AQS代码逻辑真心复杂和难懂,尤其是AQS中Node的ws值中的SIGNAL值的用法和线程park/unpark的用法,持续几周看了好几天终于算是给弄明白了!
可重入锁
可重入锁,这是一个理论范畴的概念,具体可自己查询。意思就是,在当前线程已经获取锁的情况下,可以再度获取锁而不会引起死锁。最好理解的就是java代码中最常见的synchronized锁,它就是一个可重入锁,因为同为synchronized修饰的方法A和B,在A中call B,是没有问题的,从A到B的过程就是锁的重入。
所以很好理解,可重入锁需要在内部维持一个进入次数N和占有锁的线程对象S,当N == 0时表示锁闲置,获取锁时set N = 1并set S = currentThread,每次进入锁N++,每次退出锁N–,直到N == 0锁被释放并set S = null,当然前提所有操作要保证currentThread == S才行。
synchronized锁和ReentrantLock
两者都是可重入锁
区别在于:
synchronized | ReentrantLock |
语法内置,自动加锁和释放锁得到保证 | 手动lock和unlock,unlock需在finally中,否则临界区代码异常会引起死锁 |
非公平锁 | 提供默认的非公平锁和公平锁实现 |
加锁API丰富,lock、tryLock、lockInterruptibly、tryLock(timeout) | |
利用object或者class作为锁本身来wait和notify | 提供Condition机制来await和signal,更灵活和好理解 |
使用难度低,效率偏低 | 使用难度较高,效率更高 |
虽然效率上Reentrantlock偏高,但如果不熟悉可能会引起更多问题,在大多数情况下理应优先使用synchronized,只有在需要利用ReentrantLock的公平机制、其他lock API的情况下,或者对于效率有着非常严格要求的情况下,才推荐使用ReentrantLock。
ReentrantLock与AbstractQueuedSynchronizer
ReentrantLock是AbstractQueuedSynchronizer(AQS)的一个实现(不是子类),绝大部分的实现都在于AQS之内,AQS仅仅暴露了几个API供子类实现,而ReentrantLock仅仅实现了其中两个:
- protected boolean tryAcquire(int arg):尝试加锁
- protected boolean tryRelease(int arg):尝试解锁
AQS内部数据结构
- 静态内部类Node,作为一个双向list的节点,存储有线程对象和一个状态值,以及另外一个变量表明是否为独占
- Node head, tail 节点,维护Node的双向list,此为等待加锁的线程节点Sync列表
- int state值,供子类使用,可表示不同意义;在Reentrantlock中表示重入锁次数
- 内部类ConditionObject,每个ConditionObject内部维护一个Node队列,是为等待此condition的线程队列
AQS中Node的waitStatus(以下简称ws值)状态值
- 0,正常状态,不停地尝试去获取锁
- 1,CANCELLED,表示此节点已被Cancel,退出竞争锁
- -1,SIGNAL,表示当前node在release时后面的节点需要被唤醒
- -2,CONDITION,表示当前node在某一个ConditionObject的等待队列中,不在Sync队列中
- -3,PROPAGATE
AQS中Node的nextWaiter节点
nextWaiter == Node.EXCLUSIVE == null,表明为独占
nextWaiter == Node.SHARED== new Node(),表明为共享
ReentrantLock中的Sync、NonfairSync、FairSync
Reentrantlock中的Sync才是AQS的直接子类,nonfairTryAcquire()方法实现了非公平锁的实现,并实现了tryRelease()方法,基本就是通过CAS原理,通过AQS的state值和ownerThread来实现,很好理解。
NonfairSync又是Sync的子类,tryAcquire()直接调用nonfairTryAcquire()。
FairSync也是是Sync的子类,单独实现tryAcquire(),比nonfairTryAcquire()多了一个isFirst(current)的判断,就保证了公平性。后面会具体讲。
AQS中对线程的park和unpark
利用unsafe实现的,对线程挂起和唤醒的操作。
有2个地方会引起线程park
- ConditionObject.await方法,使得当前线程节点进入condition队列
- AQS.acquireQueued的循环中,当前线程节点的prev节点的ws值为SIGNAL
有2个地方会引起线程unpark
- ConditionObject的doSignal方法,使得线程节点从condition队列移除,添加到Sync队列,若prev节点被Cancel了或者设置prev.ws为SIGNAL失败,则立即激活本节点线程
- 当前线程节点的prev节点线程抢占锁并release,且prev.ws == SIGNAL,唤醒next节点
ReentrantLock的代码实现
此部分内容,不考虑cancelAcquire操作、其他lock操作和线程中断等复杂情况,以最基本的逻辑讲述。
非公平锁的lock
- 新的线程,在NonfairSync.lock中直接尝试CAS加锁,成功则返回
- 若失败,call AQS.acquire(1),1表示进入次数加1
- AQS.acquire:!tryAcquire(n) && acquireQueued(addWaiter(Node.EXCLUSIVE), n)
- 调用子类的tryAcquire再试一遍加锁:即Sync.nonfairTryAcquire(),通过CAS尝试加锁或者计算重进入,成功则返回
- 若失败,回到AQS.acquire,执行acquireQueued(addWaiter(Node.EXCLUSIVE), n)
- AQS.addWaiter,新建Node节点(独占),通过AQS.enq方法无限循环保证(阻塞式地)添加到Sync队列中
- AQS.acquireQueued,无限循环开始:判断节点是否为Sync队列中的first有效节点并call tryAcquire尝试加锁,若成功,移除原head,设置新head = first,返回
- 循环中:若失败,call AQS.shouldParkAfterFailedAcquire判断node是否需要被park,然后park当前线程
- 循环结束,回到步骤7
- 第1、4步中可以看到,新的线程尝试加锁可以无视Sync队列中等待的其他线程直接尝试加锁(而且代码更简单效率更高,如下图中的黄色快速通道所示),此为非公平性原理
- 第7步中得知,在Sync队列中等待的线程,是严格按照先后顺序获取锁的。所以是非严格的非公平性。
- 第8步中shouldParkAfterFailedAcquire的原理,若node.prev的ws为SIGNAL,返回true; 否则,尝试set node.prev.ws = SIGNAL并返回false(在下一次的循环中,就可能会返回true了)
公平锁的lock
和非公平锁的区别就在于
- 没有第1步,直接进入AQS.acquire(1)
- FairSync.tryAcquire中,需要额外判断Sync队列为空或当前线程是Sync队列中的first才尝试CAS加锁,成功则返回
- 从第5步开始完全一致
取消非公平锁的第1步,调整第4步,就保证了严格按照Sync队列来获得锁,即为公平性。
unlock
公平锁和非公平锁的unlock操作是一样的,通过调用AQS.release(1)实现
- call Sync.tryRelease,就是通过CAS将state – 1,若最后state > 0,锁还未释放,返回
- 若最后state == 0(锁被释放),若head节点(上一个加锁成功的节点)的ws值 != 0(就是为SIGNAL的情况下), call AQS.unparkSuccessor(head)
- AQS.unparkSuccessor(node):先CAS设置node.ws = 0
- 在node节点后找到第一个没有被cancel(ws值 <= 0)的节点,尝试将其unpark(这里要unpark的next线程节点,不一定真的被park了,就是保证next节点的线程可以work来抢锁)
AQS中Condition的实现
condition被新建时,就在内部维护了一个队列
condition.await
- addConditionWaiter(),将当前线程新建为Node,设置ws值为CONDITION,添加到condition的队列中
- call AQS.fullyRelease,将当前线程获得的锁完全释放(即无论重入几次,state之间减为0)
- 无限循环开始:当node.ws == CONDITION或node不在Sync队列中,park线程
- 等待condition被其他线程signal或signalAll了,节点ws值被修改且被移回Sync队列中(循环条件被打破),但此时线程仍在park中
- 在Sync队列中,node.prev得到锁并释放后,node节点线程被unpark
- 跳出循环,call acquireQueued(),进入正常锁竞争逻辑
condition.signal和condition.signalAll
将condition的队列中的first节点或者全部节点,调用condition.doSignal(),然后就是调用AQS.transferForSignal()
- 首先将本节点的ws从CONDITION设为0
- call AQS.enq,将节点加入到Sync队列中
- 判断node.prev.ws值为CANCELLED,或者设置node.prev.ws = SIGNAL失败,直接将本节点线程unpark
经过这3步,就可以打破await中的循环条件,然后当node.prev得到锁并释放后,node节点线程被unpark