可重入锁、ReentrantLock、AQS、Condition

6月 13th, 2016 2,725 留下评论 阅读评论

本文简单介绍可重入锁的概念,简单介绍java中内置的synchronized锁java.util.concurrent.locks.ReentrantLock,然后大致介绍下ReentrantLock的代码实现。ReentrantLock和AQS代码逻辑真心复杂和难懂,尤其是AQS中Node的ws值中的SIGNAL值的用法和线程park/unpark的用法,持续几周看了好几天终于算是给弄明白了!

可重入锁

可重入锁,这是一个理论范畴的概念,具体可自己查询。意思就是,在当前线程已经获取锁的情况下,可以再度获取锁而不会引起死锁。最好理解的就是java代码中最常见的synchronized锁,它就是一个可重入锁,因为同为synchronized修饰的方法A和B,在A中call B,是没有问题的,从A到B的过程就是锁的重入。

所以很好理解,可重入锁需要在内部维持一个进入次数N和占有锁的线程对象S,当N == 0时表示锁闲置,获取锁时set N = 1并set S = currentThread,每次进入锁N++,每次退出锁N–,直到N == 0锁被释放并set S = null,当然前提所有操作要保证currentThread == S才行。

synchronized锁和ReentrantLock

两者都是可重入锁

区别在于:

synchronized ReentrantLock
语法内置,自动加锁和释放锁得到保证 手动lock和unlock,unlock需在finally中,否则临界区代码异常会引起死锁
非公平锁 提供默认的非公平锁和公平锁实现
加锁API丰富,lock、tryLock、lockInterruptibly、tryLock(timeout)
利用object或者class作为锁本身来wait和notify 提供Condition机制来await和signal,更灵活和好理解
使用难度低,效率偏低 使用难度较高,效率更高

虽然效率上Reentrantlock偏高,但如果不熟悉可能会引起更多问题,在大多数情况下理应优先使用synchronized,只有在需要利用ReentrantLock的公平机制、其他lock API的情况下,或者对于效率有着非常严格要求的情况下,才推荐使用ReentrantLock。

ReentrantLock与AbstractQueuedSynchronizer

ReentrantLock是AbstractQueuedSynchronizer(AQS)的一个实现(不是子类),绝大部分的实现都在于AQS之内,AQS仅仅暴露了几个API供子类实现,而ReentrantLock仅仅实现了其中两个:

  • protected boolean tryAcquire(int arg):尝试加锁
  • protected boolean tryRelease(int arg):尝试解锁

AQS内部数据结构

  • 静态内部类Node,作为一个双向list的节点,存储有线程对象和一个状态值,以及另外一个变量表明是否为独占
  • Node head, tail 节点,维护Node的双向list,此为等待加锁的线程节点Sync列表
  • int state值,供子类使用,可表示不同意义;在Reentrantlock中表示重入锁次数
  • 内部类ConditionObject,每个ConditionObject内部维护一个Node队列,是为等待此condition的线程队列

AQS中Node的waitStatus(以下简称ws值)状态值

  • 0,正常状态,不停地尝试去获取锁
  • 1,CANCELLED,表示此节点已被Cancel,退出竞争锁
  • -1,SIGNAL,表示当前node在release时后面的节点需要被唤醒
  • -2,CONDITION,表示当前node在某一个ConditionObject的等待队列中,不在Sync队列中
  • -3,PROPAGATE

AQS中Node的nextWaiter节点

nextWaiter == Node.EXCLUSIVE == null,表明为独占

nextWaiter == Node.SHARED== new Node(),表明为共享

ReentrantLock中的Sync、NonfairSync、FairSync

Reentrantlock中的Sync才是AQS的直接子类,nonfairTryAcquire()方法实现了非公平锁的实现,并实现了tryRelease()方法,基本就是通过CAS原理,通过AQS的state值和ownerThread来实现,很好理解。

NonfairSync又是Sync的子类,tryAcquire()直接调用nonfairTryAcquire()。

FairSync也是是Sync的子类,单独实现tryAcquire(),比nonfairTryAcquire()多了一个isFirst(current)的判断,就保证了公平性。后面会具体讲。

AQS中对线程的park和unpark

利用unsafe实现的,对线程挂起和唤醒的操作。

有2个地方会引起线程park

  1. ConditionObject.await方法,使得当前线程节点进入condition队列
  2. AQS.acquireQueued的循环中,当前线程节点的prev节点的ws值为SIGNAL

有2个地方会引起线程unpark

  1. ConditionObject的doSignal方法,使得线程节点从condition队列移除,添加到Sync队列,若prev节点被Cancel了或者设置prev.ws为SIGNAL失败,则立即激活本节点线程
  2. 当前线程节点的prev节点线程抢占锁并release,且prev.ws == SIGNAL,唤醒next节点

ReentrantLock的代码实现

此部分内容,不考虑cancelAcquire操作、其他lock操作和线程中断等复杂情况,以最基本的逻辑讲述。

非公平锁的lock

  1. 新的线程,在NonfairSync.lock中直接尝试CAS加锁,成功则返回
  2. 若失败,call AQS.acquire(1),1表示进入次数加1
  3. AQS.acquire:!tryAcquire(n) && acquireQueued(addWaiter(Node.EXCLUSIVE), n)
  4. 调用子类的tryAcquire再试一遍加锁:即Sync.nonfairTryAcquire(),通过CAS尝试加锁或者计算重进入,成功则返回
  5. 若失败,回到AQS.acquire,执行acquireQueued(addWaiter(Node.EXCLUSIVE), n)
  6. AQS.addWaiter,新建Node节点(独占),通过AQS.enq方法无限循环保证(阻塞式地)添加到Sync队列中
  7. AQS.acquireQueued,无限循环开始:判断节点是否为Sync队列中的first有效节点并call tryAcquire尝试加锁,若成功,移除原head,设置新head = first,返回
  8. 循环中:若失败,call AQS.shouldParkAfterFailedAcquire判断node是否需要被park,然后park当前线程
  9. 循环结束,回到步骤7
  • 第1、4步中可以看到,新的线程尝试加锁可以无视Sync队列中等待的其他线程直接尝试加锁(而且代码更简单效率更高,如下图中的黄色快速通道所示),此为非公平性原理
AQS中的同步队列与加锁通道

AQS中的同步队列与加锁通道

  • 第7步中得知,在Sync队列中等待的线程,是严格按照先后顺序获取锁的。所以是非严格的非公平性。
  • 第8步中shouldParkAfterFailedAcquire的原理,若node.prev的ws为SIGNAL,返回true; 否则,尝试set node.prev.ws = SIGNAL并返回false(在下一次的循环中,就可能会返回true了)

公平锁的lock

和非公平锁的区别就在于

  1. 没有第1步,直接进入AQS.acquire(1)
  2. FairSync.tryAcquire中,需要额外判断Sync队列为空或当前线程是Sync队列中的first才尝试CAS加锁,成功则返回
  3. 从第5步开始完全一致

取消非公平锁的第1步,调整第4步,就保证了严格按照Sync队列来获得锁,即为公平性。

unlock

公平锁和非公平锁的unlock操作是一样的,通过调用AQS.release(1)实现

  1. call Sync.tryRelease,就是通过CAS将state – 1,若最后state > 0,锁还未释放,返回
  2. 若最后state == 0(锁被释放),若head节点(上一个加锁成功的节点)的ws值 != 0(就是为SIGNAL的情况下), call AQS.unparkSuccessor(head)
  3. AQS.unparkSuccessor(node):先CAS设置node.ws = 0
  4. 在node节点后找到第一个没有被cancel(ws值 <= 0)的节点,尝试将其unpark(这里要unpark的next线程节点,不一定真的被park了,就是保证next节点的线程可以work来抢锁)

AQS中Condition的实现

condition被新建时,就在内部维护了一个队列

condition.await

  1. addConditionWaiter(),将当前线程新建为Node,设置ws值为CONDITION,添加到condition的队列中
  2. call AQS.fullyRelease,将当前线程获得的锁完全释放(即无论重入几次,state之间减为0)
  3. 无限循环开始:当node.ws == CONDITION或node不在Sync队列中,park线程
  4. 等待condition被其他线程signal或signalAll了,节点ws值被修改且被移回Sync队列中(循环条件被打破),但此时线程仍在park中
  5. 在Sync队列中,node.prev得到锁并释放后,node节点线程被unpark
  6. 跳出循环,call acquireQueued(),进入正常锁竞争逻辑

condition.signal和condition.signalAll

将condition的队列中的first节点或者全部节点,调用condition.doSignal(),然后就是调用AQS.transferForSignal()

  1. 首先将本节点的ws从CONDITION设为0
  2. call AQS.enq,将节点加入到Sync队列中
  3. 判断node.prev.ws值为CANCELLED,或者设置node.prev.ws = SIGNAL失败,直接将本节点线程unpark

经过这3步,就可以打破await中的循环条件,然后当node.prev得到锁并释放后,node节点线程被unpark

Categories: Java 标签:,
  1. 还没有评论呢。