可重入锁、ReentrantLock、AQS、Condition

六月 13th, 2016 1,357 留下评论 阅读评论

本文简单介绍可重入锁的概念,简单介绍java中内置的synchronized锁java.util.concurrent.locks.ReentrantLock,然后大致介绍下ReentrantLock的代码实现。ReentrantLock和AQS代码逻辑真心复杂和难懂,尤其是AQS中Node的ws值中的SIGNAL值的用法和线程park/unpark的用法,持续几周看了好几天终于算是给弄明白了!

可重入锁

可重入锁,这是一个理论范畴的概念,具体可自己查询。意思就是,在当前线程已经获取锁的情况下,可以再度获取锁而不会引起死锁。最好理解的就是java代码中最常见的synchronized锁,它就是一个可重入锁,因为同为synchronized修饰的方法A和B,在A中call B,是没有问题的,从A到B的过程就是锁的重入。

所以很好理解,可重入锁需要在内部维持一个进入次数N和占有锁的线程对象S,当N == 0时表示锁闲置,获取锁时set N = 1并set S = currentThread,每次进入锁N++,每次退出锁N–,直到N == 0锁被释放并set S = null,当然前提所有操作要保证currentThread == S才行。

synchronized锁和ReentrantLock

两者都是可重入锁

区别在于:

synchronized ReentrantLock
语法内置,自动加锁和释放锁得到保证 手动lock和unlock,unlock需在finally中,否则临界区代码异常会引起死锁
非公平锁 提供默认的非公平锁和公平锁实现
加锁API丰富,lock、tryLock、lockInterruptibly、tryLock(timeout)
利用object或者class作为锁本身来wait和notify 提供Condition机制来await和signal,更灵活和好理解
使用难度低,效率偏低 使用难度较高,效率更高

虽然效率上Reentrantlock偏高,但如果不熟悉可能会引起更多问题,在大多数情况下理应优先使用synchronized,只有在需要利用ReentrantLock的公平机制、其他lock API的情况下,或者对于效率有着非常严格要求的情况下,才推荐使用ReentrantLock。

ReentrantLock与AbstractQueuedSynchronizer

ReentrantLock是AbstractQueuedSynchronizer(AQS)的一个实现(不是子类),绝大部分的实现都在于AQS之内,AQS仅仅暴露了几个API供子类实现,而ReentrantLock仅仅实现了其中两个:

  • protected boolean tryAcquire(int arg):尝试加锁
  • protected boolean tryRelease(int arg):尝试解锁

AQS内部数据结构

  • 静态内部类Node,作为一个双向list的节点,存储有线程对象和一个状态值,以及另外一个变量表明是否为独占
  • Node head, tail 节点,维护Node的双向list,此为等待加锁的线程节点Sync列表
  • int state值,供子类使用,可表示不同意义;在Reentrantlock中表示重入锁次数
  • 内部类ConditionObject,每个ConditionObject内部维护一个Node队列,是为等待此condition的线程队列

AQS中Node的waitStatus(以下简称ws值)状态值

  • 0,正常状态,不停地尝试去获取锁
  • 1,CANCELLED,表示此节点已被Cancel,退出竞争锁
  • -1,SIGNAL,表示当前node在release时后面的节点需要被唤醒
  • -2,CONDITION,表示当前node在某一个ConditionObject的等待队列中,不在Sync队列中
  • -3,PROPAGATE

AQS中Node的nextWaiter节点

nextWaiter == Node.EXCLUSIVE == null,表明为独占

nextWaiter == Node.SHARED== new Node(),表明为共享

ReentrantLock中的Sync、NonfairSync、FairSync

Reentrantlock中的Sync才是AQS的直接子类,nonfairTryAcquire()方法实现了非公平锁的实现,并实现了tryRelease()方法,基本就是通过CAS原理,通过AQS的state值和ownerThread来实现,很好理解。

NonfairSync又是Sync的子类,tryAcquire()直接调用nonfairTryAcquire()。

FairSync也是是Sync的子类,单独实现tryAcquire(),比nonfairTryAcquire()多了一个isFirst(current)的判断,就保证了公平性。后面会具体讲。

AQS中对线程的park和unpark

利用unsafe实现的,对线程挂起和唤醒的操作。

有2个地方会引起线程park

  1. ConditionObject.await方法,使得当前线程节点进入condition队列
  2. AQS.acquireQueued的循环中,当前线程节点的prev节点的ws值为SIGNAL

有2个地方会引起线程unpark

  1. ConditionObject的doSignal方法,使得线程节点从condition队列移除,添加到Sync队列,若prev节点被Cancel了或者设置prev.ws为SIGNAL失败,则立即激活本节点线程
  2. 当前线程节点的prev节点线程抢占锁并release,且prev.ws == SIGNAL,唤醒next节点

ReentrantLock的代码实现

此部分内容,不考虑cancelAcquire操作、其他lock操作和线程中断等复杂情况,以最基本的逻辑讲述。

非公平锁的lock

  1. 新的线程,在NonfairSync.lock中直接尝试CAS加锁,成功则返回
  2. 若失败,call AQS.acquire(1),1表示进入次数加1
  3. AQS.acquire:!tryAcquire(n) && acquireQueued(addWaiter(Node.EXCLUSIVE), n)
  4. 调用子类的tryAcquire再试一遍加锁:即Sync.nonfairTryAcquire(),通过CAS尝试加锁或者计算重进入,成功则返回
  5. 若失败,回到AQS.acquire,执行acquireQueued(addWaiter(Node.EXCLUSIVE), n)
  6. AQS.addWaiter,新建Node节点(独占),通过AQS.enq方法无限循环保证(阻塞式地)添加到Sync队列中
  7. AQS.acquireQueued,无限循环开始:判断节点是否为Sync队列中的first有效节点并call tryAcquire尝试加锁,若成功,移除原head,设置新head = first,返回
  8. 循环中:若失败,call AQS.shouldParkAfterFailedAcquire判断node是否需要被park,然后park当前线程
  9. 循环结束,回到步骤7
  • 第1、4步中可以看到,新的线程尝试加锁可以无视Sync队列中等待的其他线程直接尝试加锁(而且代码更简单效率更高,如下图中的黄色快速通道所示),此为非公平性原理
AQS中的同步队列与加锁通道

AQS中的同步队列与加锁通道

  • 第7步中得知,在Sync队列中等待的线程,是严格按照先后顺序获取锁的。所以是非严格的非公平性。
  • 第8步中shouldParkAfterFailedAcquire的原理,若node.prev的ws为SIGNAL,返回true; 否则,尝试set node.prev.ws = SIGNAL并返回false(在下一次的循环中,就可能会返回true了)

公平锁的lock

和非公平锁的区别就在于

  1. 没有第1步,直接进入AQS.acquire(1)
  2. FairSync.tryAcquire中,需要额外判断Sync队列为空或当前线程是Sync队列中的first才尝试CAS加锁,成功则返回
  3. 从第5步开始完全一致

取消非公平锁的第1步,调整第4步,就保证了严格按照Sync队列来获得锁,即为公平性。

unlock

公平锁和非公平锁的unlock操作是一样的,通过调用AQS.release(1)实现

  1. call Sync.tryRelease,就是通过CAS将state – 1,若最后state > 0,锁还未释放,返回
  2. 若最后state == 0(锁被释放),若head节点(上一个加锁成功的节点)的ws值 != 0(就是为SIGNAL的情况下), call AQS.unparkSuccessor(head)
  3. AQS.unparkSuccessor(node):先CAS设置node.ws = 0
  4. 在node节点后找到第一个没有被cancel(ws值 <= 0)的节点,尝试将其unpark(这里要unpark的next线程节点,不一定真的被park了,就是保证next节点的线程可以work来抢锁)

AQS中Condition的实现

condition被新建时,就在内部维护了一个队列

condition.await

  1. addConditionWaiter(),将当前线程新建为Node,设置ws值为CONDITION,添加到condition的队列中
  2. call AQS.fullyRelease,将当前线程获得的锁完全释放(即无论重入几次,state之间减为0)
  3. 无限循环开始:当node.ws == CONDITION或node不在Sync队列中,park线程
  4. 等待condition被其他线程signal或signalAll了,节点ws值被修改且被移回Sync队列中(循环条件被打破),但此时线程仍在park中
  5. 在Sync队列中,node.prev得到锁并释放后,node节点线程被unpark
  6. 跳出循环,call acquireQueued(),进入正常锁竞争逻辑

condition.signal和condition.signalAll

将condition的队列中的first节点或者全部节点,调用condition.doSignal(),然后就是调用AQS.transferForSignal()

  1. 首先将本节点的ws从CONDITION设为0
  2. call AQS.enq,将节点加入到Sync队列中
  3. 判断node.prev.ws值为CANCELLED,或者设置node.prev.ws = SIGNAL失败,直接将本节点线程unpark

经过这3步,就可以打破await中的循环条件,然后当node.prev得到锁并释放后,node节点线程被unpark

Categories: Java 标签:,
  1. 四月 1st, 2019 03:59

    Propecia Apotek Cialis Generico Acquisto In Italia Zithromax For Prostatitis cialis 40 mg Allergy Amoxicillin Doxycycline Propecia Overdose Hair Loss Best Buy Isotretinoin Skin Health No Doctor London

  2. 三月 15th, 2019 21:12

    Generic Cialis Professional Cephalexin 11 Pound Puppy buy generic cialis online Generic Viagra Lowest Prices Keflex Erythromycin Prostate Dog Cephalexin 500mg Recall

icon_wink.gif icon_neutral.gif icon_mad.gif icon_twisted.gif icon_smile.gif icon_eek.gif icon_sad.gif icon_rolleyes.gif icon_razz.gif icon_redface.gif icon_surprised.gif icon_mrgreen.gif icon_lol.gif icon_idea.gif icon_biggrin.gif icon_evil.gif icon_cry.gif icon_cool.gif icon_arrow.gif icon_confused.gif icon_question.gif icon_exclaim.gif