【源码】【Java并发】【AQS】从ReentrantLock、Semaphore、CutDownLunch、CyclicBarrier看AQS源码
👋hi,我不是一名外包公司的员工,也不会偷吃茶水间的零食,我的梦想是能写高端CRUD
🔥 2025本人正在沉淀中… 博客更新速度++
👍 欢迎点赞、收藏、关注,跟上我的更新节奏
📚欢迎订阅专栏,专栏名《在2B工作中寻求并发是否搞错了什么》
前言
经过上一篇的学习,我们知道了。AQS的基本原理和使用。
【Java并发】【AQS】适合初学者体质的AQS入门
主播觉得,AQS的原理,就是通过这2个队列的协助,实现核心功能,同步队列(CLH队列)和条件队列(Condition队列)。
同步队列(CLH队列)
- 作用:管理需要获取锁的线程。当多个线程竞争共享资源时,未获取到锁的线程会被封装成节点,按FIFO顺序加入阻塞队列,等待唤醒后重新尝试获取锁。
- 解决的问题:
实现锁的公平性和线程排队机制。通过CLH队列,AQS可以按顺序唤醒线程(如公平锁),避免线程无休止自旋竞争资源,减少CPU开销。
条件队列(Condition队列)
- 作用:管理等待特定条件的线程。当线程调用
Condition.await()
时,会释放锁并进入条件队列;当其他线程调用Condition.signal()
时,条件队列中的线程会被转移到阻塞队列,重新参与锁竞争。 - 解决的问题:
实现线程间的精细化协作(如生产者-消费者模式)。例如:
-
- 生产者线程在队列满时,通过条件队列挂起,而非占用锁空等。
- 消费者线程消费数据后,通过
signal()
唤醒生产者,解耦等待条件与锁竞争。
下面,主播会通过ReentrantLock、Semaphore、CutDownLunch、CyclicBarrier的源码的角度来分析。它们都是基于AQS的实现。带大家看看AQS到底有啥?
以ReentrantLock的角度看AQS独占实现
ReentrantLock的简单使用
简单看下ReentrantLock怎么使用的独占锁。
public class SimpleLockDemo {static ReentrantLock lock = new ReentrantLock(); // 1. 创建锁对象static int count = 0; // 共享资源public static void main(String[] args) throws InterruptedException {Runnable task = () -> {lock.lock(); // 2. 加锁try {count++; // 3. 操作共享资源} finally {lock.unlock(); // 4. 解锁(必须执行)}};Thread t1 = new Thread(task);Thread t2 = new Thread(task);t1.start();t2.start();t1.join();t2.join();System.out.println("结果: " + count); // 输出 2}
}
ReentrantLock独占锁源码分析
这里多说下ReentrantLock
,公平锁和非公平锁吧!
其实非公平就是多了一步,setExclusiveOwnerThread
将当前线程所有者改为当前线程。
这个exclusiveOwnerThread
字段,是AQS继承AbstractOwnableSynchronizer
来的字段。
state
字段是AQS定义的。在ReentrantLock
中,这个state
,0
就是没有线程获锁,1
就是有线程获取到锁。
// 非公平锁 ReentrantLock.NonfairSync
final void lock() {// 尝试获取锁,将state由0改1if (compareAndSetState(0, 1))// 把锁的线程的所有者exclusiveOwnerThread字段,设置为当前线程setExclusiveOwnerThread(Thread.currentThread()); elseacquire(1); // 抢锁失败,放入阻塞队列
}// AbstractOwnableSynchronizer#setExclusiveOwnerThread
protected final void setExclusiveOwnerThread(Thread thread) {exclusiveOwnerThread = thread;
}// 公平锁 ReentrantLock.FairSync
final void lock() {acquire(1);
}
下面是AQS的独占锁具体逻辑:
首先是执行子类(ReentrantLock
)的实现:
tryAcquire
方法,尝试再获取锁1次。addWaiter
将当前线程封装为Node,加入CLH队列。acquireQueued
将线程挂起。
public final void acquire(int arg) {if (!tryAcquire(arg) && // 1.tryAcquire调用子类实现acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) // AQS实现,加入同步队列等待selfInterrupt(); // 线程中断复位
}
tryAcquire
tryAcquire
方法,由于子类具体实现,下面是公平锁的实现源码:
// 不公平的实现
protected final boolean tryAcquire(int acquires) {return nonfairTryAcquire(acquires);
}// nonfairTryAcquire方法
final boolean nonfairTryAcquire(int acquires) {// 获取当前线程final Thread current = Thread.currentThread();// 获取当前状态,ReentrantLock中 0是锁没有被抢占,1是已经被其他线程抢占了int c = getState();// 如果锁没有被抢占if (c == 0) {// cas尝试抢占,state由0改到1if (compareAndSetState(0, acquires)) {// 把锁的线程的所有者exclusiveOwnerThread字段,设置为当前线程setExclusiveOwnerThread(current);return true;}}// 可重入锁的情况,如果持有锁的线程为当前线程else if (current == getExclusiveOwnerThread()) { // 线程重入数量+1,ReentrantLock独占,这里的acquires就是1int nextc = c + acquires;if (nextc < 0) // overflowthrow new Error("Maximum lock count exceeded");// 修改state的状态setState(nextc);return true;}return false;
}
公平锁的实现,和非公平不一样的地方在hasQueuedPredecessors
方法这里,hasQueuedPredecessors
方法的作用是判断当前线程是否排队等待获取锁
。
🍪这里我们不展开说,主播把大家当初学者来看,现在你并不知道同步队列的结构,主播会在下面的Semaphore
源码分析的时候,再说这个东西。你现在只需要知道这个方法是用来判断当前线程是否排队等待获取锁
。
protected final boolean tryAcquire(int acquires) {final Thread current = Thread.currentThread();int c = getState();if (c == 0) {// 当前线程不需要等待获取锁 且 cas获取锁成功if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) {// 把锁的线程的所有者exclusiveOwnerThread字段,设置为当前线程setExclusiveOwnerThread(current);return true;
...
addWaiter
创建一个独占的Node,并将它放入同步队列中。
// 独占模式这里是mode是Node.EXCLUSIVE。值为null
private Node addWaiter(Node mode) {// 创建1个独占模式的 Node。在同步队列中nextWaiter字段用来区分是独占还是共享模式。// waitStatus初始值就是0,Node.EXCLUSIVE值为nullNode node = new Node(Thread.currentThread(), mode);// pred赋值为当前同步队列的tailNode pred = tail;// 如果当前同步队列有tail(就是已经构建过同步队列了)if (pred != null) {// 当前要加入同步队列Node的前序,指向同步队列的尾部node.prev = pred;// cas将同步队列的tail赋值为当前要加入的Nodeif (compareAndSetTail(pred, node)) {// 同步队列的tail的下一个Node赋值为当前要加入的Nodepred.next = node;return node;}}// 没有构建过同步队列,node入队enq(node);return node;
}Node(Thread thread, Node mode) { // Used by addWaiterthis.nextWaiter = mode;this.thread = thread;
}
enq方法
private Node enq(final Node node) {// cas修改for (;;) {// 临时Node赋值为tail节点Node t = tail;// 当前同步队列没有tail节点(说明没有初始化过)if (t == null) { // cas将head节点设置为新创建的节点(注意,这里是new的Node,不是入参的Node)if (compareAndSetHead(new Node()))tail = head; // 将tail赋值为head} else {// ====== 下面是同步队列初始化过的逻辑 ========// 要加入同步队列的node的prev设置为tail节点node.prev = t;// cas将tail节点,由t设置为要加入同步队列的nodeif (compareAndSetTail(t, node)) {// tail节点的下一个节点赋值为当前要加入node的节点t.next = node;return t;}}}
}
acquireQueued
final boolean acquireQueued(final Node node, int arg) {boolean failed = true;try {boolean interrupted = false;for (;;) {// 获取当前node的前一个nodefinal Node p = node.predecessor();// 前一个node为头节点 且 再次尝试获取1次成功if (p == head && tryAcquire(arg)) {// 当前要加入的节点设置为头节点setHead(node);// 前一个node的next设置为null(方便当前node的前驱被gc回收)p.next = null; failed = false;return interrupted;}// 抢占失败,判断是否要将线程挂起if (shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt())interrupted = true;}} finally {if (failed)cancelAcquire(node); // 因为抢占异常,将等待状态设置为CANCELLED}
}// setHead方法
private void setHead(Node node) {// 同步队列头节点设置为当前要加入的nodehead = node;// 当前要加入的节点的线程,设置为nullnode.thread = null;// 当前要加入的节点的前驱,设置为nullnode.prev = null;
}
下面是判断是否要将线程挂起shouldParkAfterFailedAcquire
,和具体挂起线程parkAndCheckInterrupt
的代码方法。
// 判断是否要将线程挂起
// 入参:
// - pred:当前要加入的CLH队列节点的前驱节点,下面会称为前驱节点
// - node:当前要加入的CLH队列的节点,下面会称为当前节点
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {// 前驱节点的等待状态int ws = pred.waitStatus;// 判断当前节点,是否为Node.SIGNAL(值为-1)// Node.SIGNAL表示,pred节点释放后,会通知node,当前线程可以安心的挂起。if (ws == Node.SIGNAL)return true;// 当前节点的状态为CANCELLED(值为1),说明前驱节点已因超时/中断被取消if (ws > 0) {// 前驱节点向前找,将当前节点的前驱设置为,不为CANCELLED状态的节点。do {node.prev = pred = pred.prev;} while (pred.waitStatus > 0);// 前驱节点的后继节点设置为当前节点。pred.next = node;} else {// cas将前驱节点的等待状态设置为Node.SIGNAL。compareAndSetWaitStatus(pred, ws, Node.SIGNAL);}return false;
}// 中断线程
private final boolean parkAndCheckInterrupt() {// 将当前线程挂起LockSupport.park(this);// 唤醒后执行下面的代码return Thread.interrupted();
}
什么情况下,节点的等待状态会变成CANCELLED呢?
- 线程被中断:当线程在
acquire
过程中被中断(调用Thread.interrupt()
),会触发cancelAcquire
方法将节点状态设为CANCELLED
- 超时未获锁:在
doAcquireNanos
等带超时的获取方法中,若超时仍未获得锁,会通过cancelAcquire
标记为取消 - 节点失效处理:在
shouldParkAfterFailedAcquire
中,若发现前驱节点是CANCELLED
状态,会主动跳过这些失效节点
ReentrantLock释放资源分析
ReentrantLock释放资源的入口unlock方法,调用
// ReentrantLock#unlock
public void unlock() {sync.release(1);
}// AQS#release
public final boolean release(int arg) {// 调用ReentrantLock.Sync子类实现的tryRelease方法if (tryRelease(arg)) {Node h = head;if (h != null && h.waitStatus != 0)unparkSuccessor(h);return true;}return false;
}
因为这篇是AQS源码阅读,这里我们简单看ReentrantLock如何tryRelease方法的
// ReentrantLock.Sync#tryRelease
// 入参 releases = 1
protected final boolean tryRelease(int releases) {// 这c,不一定是0,因为锁是可重入的,每次重入state+1int c = getState() - releases;// 只有持有锁的线程,才能释放锁if (Thread.currentThread() != getExclusiveOwnerThread())throw new IllegalMonitorStateException();// free表示是否是否成功,只有c=0的时候才算释放成功boolean free = false;if (c == 0) {free = true;setExclusiveOwnerThread(null);}setState(c);return free;
}
所以有lock
一定要有对应的unlock
来减少state数量,不然就线程安全问题了💀。下面是可重入锁,但是没有unlock释放锁,导致线程获取不到锁。
public static void main(String[] args) {ReentrantLock lock = new ReentrantLock();for (int i = 0; i < 5; i++) {new Thread(() -> {lock.lock();try {System.out.println(Thread.currentThread().getName() + ",获取到锁了。");Thread.sleep(1000);lock.lock();System.out.println("重入获取锁");// 缺少unlock} catch (Exception e) {throw new RuntimeException(e);} finally {lock.unlock();}}).start();}
}// 输出结果
Thread-0,获取到锁了。
重入获取锁
好了,主播好像又说了一堆别的内容,现在继续来说说AQS那块是怎么实现释放资源的吧 !
省流版就是:
- 获取同步队列中的头节点。
- 如果有头节点且等待状态不为0,就唤醒头结点后续的节点。
public final boolean release(int arg) {if (tryRelease(arg)) {// 获取同步队列的head节点Node h = head;// 同步队列的头节点不为空且头节节点等待状态不为0if (h != null && h.waitStatus != 0)unparkSuccessor(h); // 唤醒head节点return true;}return false;
}
unparkSuccessor做了什么呢?省流来咯:
- CAS将头节点的等待状态改为0。
- 唤醒同步队列中最先进入不为CANCELLED的节点。
// 这里的入参node是头节点,为了方便理解下面都说头节点。
private void unparkSuccessor(Node node) {int ws = node.waitStatus;// 将头节点的等待状态设置0if (ws < 0)compareAndSetWaitStatus(node, ws, 0);// 获取头节点的后继节点Node s = node.next;// 头节点没有后继节点 或 头节点的等待状态大于0(CANCELLED)// 那就从尾节点向前,不断找不最前面,不为CANCELLED的节点,并赋值给sif (s == null || s.waitStatus > 0) {s = null;for (Node t = tail; t != null && t != node; t = t.prev)if (t.waitStatus <= 0)s = t;}// s不空就唤醒s节点的线程if (s != null)LockSupport.unpark(s.thread);
}
以为Semaphore的角度看AQS共享锁实现
Semaphore的简单使用
Semaphore可以用来控制资源并发访问数量,可以用来做限流,下面的代码例子,我们限制每次只能由2个线程来获取共享资源。
public static void main(String[] args) {Semaphore semaphore = new Semaphore(2);for (int i = 0; i < 5; i++) {new Thread(() -> {try {// 访问共享资源semaphore.acquire();// 模拟执行业务时间2sThread.sleep(2000);System.out.println("Thread " + Thread.currentThread().getName() + " 获取到共享资源");} catch (InterruptedException e) {throw new RuntimeException(e);} finally {semaphore.release();}}).start();}
}
输出结果
Thread Thread-0 获取到共享资源
Thread Thread-1 获取到共享资源
Thread Thread-2 获取到共享资源
Thread Thread-4 获取到共享资源
Thread Thread-3 获取到共享资源
Semaphore获取共享锁的源码分析
Semaphore也是有分为公平和非公平的说法的。
// Semaphore#acquire
public void acquire() throws InterruptedException {sync.acquireSharedInterruptibly(1);
}// AbstractQueuedSynchronizer#acquireSharedInterruptibly
public final void acquireSharedInterruptibly(int arg)throws InterruptedException {if (Thread.interrupted())throw new InterruptedException();if (tryAcquireShared(arg) < 0) // 调用子类实现doAcquireSharedInterruptibly(arg);
}tryAcquireShared,让我们看看Semaphore子类是怎么实现的,Semaphore也有非公平和公平的说法。public Semaphore(int permits, boolean fair) {sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}
公平和非公平的区别在,公平的会先判断,当前线程是否需要排队。
我们先来看看公平的实现:
// Semaphore.NonfairSync#tryAcquireShared
protected int tryAcquireShared(int acquires) {for (;;) {// 判断当前线程是否需要排队。if (hasQueuedPredecessors())return -1;// state在这里表示共享资源的可占有数量int available = getState();// 减去本次想要占有的数量int remaining = available - acquires;if (remaining < 0 ||compareAndSetState(available, remaining))return remaining; // 返回的remaining要是小于0就是抢共享资源失败的意思}
}
之前主播在ReentrantLock说的,这里要详细说说这个hasQueuedPredecessors
方法
现在来具体说下这个hasQueuedPredecessors
方法,判断当前线程是否需要排队。
public final boolean hasQueuedPredecessors() {Node t = tail;Node h = head;Node s;// 具体下面的判断return h != t &&((s = h.next) == null || s.thread != Thread.currentThread());
}
步骤1(判断头节点和尾节点是否不同?): h != t
目的是为了快速判断当前同步队列是否为空,如果头节点和尾节点相同,说明同步队列为空,如下图所示,所以当前线程是不需要排队的。
步骤2(头节点的下一个节点 s 是否存在?): (s = h.next) == null
在并发场景中,可能有其他线程正在入队(比如刚设置完 tail,但还未更新 head.next),导致 h.next 暂时为 null。如下图所示。这时,认为存在并发竞争,保守判定为需要排队
步骤3(队列中第一个有效等待线程(s.thread)是否是当前线程?) :s.thread != Thread.currentThread()
如果是当前线程 ,说明自己是队列中的第一个等待者 ,不用排队(返回 false)。
外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传
如果不是 ,其他线程更早排队(下图thread0更早) , 必须排队(返回 true)。
非公平的其实很公平的差不多,少了个判断是需要排队。
// Semaphore.FairSync#tryAcquireShared
protected int tryAcquireShared(int acquires) {return nonfairTryAcquireShared(acquires);
}// Semaphore.Sync#nonfairTryAcquireShared
final int nonfairTryAcquireShared(int acquires) {for (;;) {// state在这里表示共享资源的可占有数量int available = getState();// 减去本次想要占有的数量int remaining = available - acquires;if (remaining < 0 ||compareAndSetState(available, remaining))return remaining; // 返回的remaining要是小于0就是抢共享资源失败的意思}
}
好了,现在回到我们AQS的获取共享资源的代码里来吧!
// AbstractQueuedSynchronizer#acquireSharedInterruptibly
public final void acquireSharedInterruptibly(int arg)throws InterruptedException {if (Thread.interrupted())throw new InterruptedException();if (tryAcquireShared(arg) < 0) doAcquireSharedInterruptibly(arg); // AQS的具体获取共享资源
}
doAcquireSharedInterruptibly 方法
// AQS#doAcquireSharedInterruptibly
private void doAcquireSharedInterruptibly(int arg)throws InterruptedException {// 创建一个共享的Node,加入同步队列中(这里的addWaiter和ReentrantLock是一样的流程)// static final Node SHARED = new Node();final Node node = addWaiter(Node.SHARED);boolean failed = true;try {for (;;) {final Node p = node.predecessor();// 如果当前节点的前驱节点是头节点if (p == head) {// 尝试获取锁,返回的r是,后面方法的入参propagate,意思是还有多个共享资源可以占有int r = tryAcquireShared(arg);if (r >= 0) {setHeadAndPropagate(node, r); // 将当前节点设置为头节点,并试试唤醒后继节点p.next = null; failed = false;return;}}// 获取锁失败。找到等待状态为Node.SIGNAL的前继节点 + 挂起线程if (shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt())throw new InterruptedException();}} finally {if (failed)cancelAcquire(node);}
}
setHeadAndPropagate方法,做了2件事:
- 设置当前节点为头节点
- 尝试唤醒后继节点
private void setHeadAndPropagate(Node node, int propagate) {// 备份head节点Node h = head;// 将当前节点设置为头节点setHead(node);// 尝试唤醒后继节点// 1.propagate > 0: 表示当前有剩余资源(如Semaphore的许可),可以继续唤醒后续线程。// 2.h == null: h 是旧的头部节点,若为 null 说明队列异常(实际极少发生)// 3.h.waitStatus < 0:表示旧头部节点处于需唤醒后续节点的状态(如 SIGNAL)。// 4.(h = head) == null || h.waitStatus < 0:重新获取下head的值再试一次if (propagate > 0 || h == null || h.waitStatus < 0 ||(h = head) == null || h.waitStatus < 0) {Node s = node.next;// 当前唤醒节点的后继节点为null或者是共享类型的Node,就唤醒后继节点。if (s == null || s.isShared())doReleaseShared(); // 具体的释放资源逻辑,下面会说}
}// setHead方法
private void setHead(Node node) {head = node;node.thread = null;node.prev = null;
}
Semaphore释放共享锁的源码分析
当我们调用release
方法的时候,会释放共享资源。
// Semaphore#release
public void release() {sync.releaseShared(1);
}// AbstractQueuedSynchronizer#releaseShared
public final boolean releaseShared(int arg) {// 子类实现尝试释放资源if (tryReleaseShared(arg)) {doReleaseShared();return true;}return false;
}
让我先看看子类Semaphore
是怎么实现释放资源的:
简单来说,就是CAS的,修改state。
protected final boolean tryReleaseShared(int releases) {for (;;) {int current = getState();int next = current + releases;if (next < current) // overflowthrow new Error("Maximum permit count exceeded");if (compareAndSetState(current, next))return true;}
}
好了,我们看看AQS那部分是怎么做的释放共享资源,跟上主播的节奏,来到doReleaseShared
方法:
private void doReleaseShared() {for (;;) {Node h = head;// 当前同步队列不为空,不为刚刚初始化完。if (h != null && h != tail) {int ws = h.waitStatus;// 头节点的状态为Node.SIGNAL,就唤醒这个线程。并将当前头节点的waitState改为0。if (ws == Node.SIGNAL) {if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))continue; unparkSuccessor(h);}// 处理头节点处于0状态的情况,确保在并发释放时唤醒信号能正确传播。// 你一定会好奇这个ws == 0是怎么来的?// 线程A将头节点状态从SIGNAL改为0并唤醒线程B。// 线程B获取资源后成为新头节点,此时线程C进入doReleaseShared(),发现新头节点状态为0else if (ws == 0 &&!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))continue; }// 如果head节点发生改变,说明存在竞争就需要重新判断唤醒。没有变的话就结束。if (h == head) break;}
}
以CutDownLunch的角度看AQS共享锁的实现
CutDownLunch的简单使用
让我简单看看CountDownLatch的使用,通过一个计数器实现线程等待,适用于“主线程等待子线程完成任务”或“多线程同时启动”等场景。
public static void main(String[] args) throws InterruptedException {CountDownLatch latch = new CountDownLatch(3); // 初始化计数器为3// 创建并启动3个子线程for (int i = 0; i < 3; i++) {new Thread(() -> {try {System.out.println("子线程执行任务...");Thread.sleep(1000);latch.countDown(); // 任务完成后计数器减1} catch (InterruptedException e) {throw new RuntimeException(e);}}).start();}latch.await(); // 主线程等待所有子线程完成任务System.out.println("所有子线程已完成任务,主线程继续执行");
}
输出结果
子线程执行任务...
子线程执行任务...
子线程执行任务...
所有子线程已完成任务,主线程继续执行
CutDownLunch获取共享锁源码分析
让我们从CountDownLatch的await方法开始看起:
// CountDownLatch#await()
public void await() throws InterruptedException {sync.acquireSharedInterruptibly(1);
}
AQS怎么获取共享资源的?
// AbstractQueuedSynchronizer#acquireSharedInterruptibly
public final void acquireSharedInterruptibly(int arg)throws InterruptedException {if (Thread.interrupted())throw new InterruptedException();if (tryAcquireShared(arg) < 0) // 调用子类(CutDownLunch)tryAcquireShared实现doAcquireSharedInterruptibly(arg); // 获取锁失败执行
}
让我们简单看看CutDownLunch
的tryAcquireShared
方法
- 当
state == 0
:返回 1,表示倒计时已完成,线程可以直接通过。 - 当
state > 0
:返回 -1,表示倒计时未完成,线程需阻塞等待。
// CountDownLatch.Sync#tryAcquireShared
protected int tryAcquireShared(int acquires) {return (getState() == 0) ? 1 : -1;
}
我们看看AQS是怎么做的,其实如果上面你看了Semaphore
的共享资源获取实现,你就会惊奇的发现,好像差不多哈。
// AQS#doAcquireSharedInterruptibly
private void doAcquireSharedInterruptibly(int arg)throws InterruptedException {final Node node = addWaiter(Node.SHARED);boolean failed = true;try {for (;;) {final Node p = node.predecessor();if (p == head) {// 这里返回是1或者-1// - 返回1,线程可以直接通过。// - 返回-1,表示倒计时未完成,线程需阻塞等待。int r = tryAcquireShared(arg);if (r >= 0) {setHeadAndPropagate(node, r); // 尝试唤醒后继节点p.next = null; failed = false;return;}}if (shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt())throw new InterruptedException();}} finally {if (failed)cancelAcquire(node);}
}
setHeadAndPropagate
// 入参解释下
// node:当前新增的节点 propagate:这里是1
private void setHeadAndPropagate(Node node, int propagate) {Node h = head; setHead(node); // 将当前节点设置为头节点// 重点在这里// CountDownLatch中:propagate 表示是否已完全释放(即 state 是否减到 0)if (propagate > 0 || h == null || h.waitStatus < 0 ||(h = head) == null || h.waitStatus < 0) {Node s = node.next;if (s == null || s.isShared())doReleaseShared(); // }
}
CutDownLunch释放共享锁源码分析
我们看看countDown是怎么释放共享资源的。
// CountDownLatch#countDown
public void countDown() {sync.releaseShared(1);
}
CountDownLatch#countDown中调用AQS的releaseShared方法
public final boolean releaseShared(int arg) {// 子类实现tryReleaseShared方法获取共享资源if (tryReleaseShared(arg)) {doReleaseShared();return true;}return false;
}
doReleaseShared
是AQS的实现,我们在上面的Semaphore
讲过了。
这里具体看看子类CountDownLatch
是怎么重写tryReleaseShared
方法的。
protected boolean tryReleaseShared(int releases) {for (;;) {int c = getState();if (c == 0)return false;// 将state数量-1,state减到0就表示可以放行所有被await()阻塞的线程。int nextc = c-1;if (compareAndSetState(c, nextc))return nextc == 0;}
}
以CyclicBarrier的角度看AQS条件队列
CyclicBarrier的简单使用
CyclicBarrier
(循环屏障)用于让一组线程互相等待,直到所有线程都到达某个屏障点后,再一起继续执行。
public static void main(String[] args) {int threadCount = 3;// 创建 CyclicBarrier,指定等待的线程数和到达屏障后的回调动作CyclicBarrier barrier = new CyclicBarrier(threadCount, () -> {System.out.println("所有线程已到达屏障,开始下一阶段");});for (int i = 0; i < threadCount; i++) {new Thread(() -> {try {System.out.println(Thread.currentThread().getName() + " 开始第一阶段任务");Thread.sleep(1000);barrier.await(); // 等待其他线程到达屏障System.out.println(Thread.currentThread().getName() + " 开始第二阶段任务");Thread.sleep(1000);barrier.await(); // 再次等待System.out.println(Thread.currentThread().getName() + " 完成所有任务");} catch (Exception e) {e.printStackTrace();}}, "线程" + (i + 1)).start();}
}
CyclicBarrier循环屏障源码分析
await方法
// CyclicBarrier#await
public int await() throws InterruptedException, BrokenBarrierException {try {return dowait(false, 0L);} catch (TimeoutException toe) {throw new Error(toe);}
}
dowait方法
// CyclicBarrier#dowait
private int dowait(boolean timed, long nanos)throws InterruptedException, BrokenBarrierException,TimeoutException {final ReentrantLock lock = this.lock;// 这里加锁是为了使用Condition的await(),具体原因会在ReentrantLock源码解读中说。lock.lock();try {// generation代的概念,线程数量达到构造CyclicBarrier传的parties数量,可以执行构造时传的任务,这就是1代。final Generation g = generation;// 标记屏障是否被破坏,如果屏障被破坏,其他等待线程需要立即感知到这一状态,而不是无限等待。if (g.broken)throw new BrokenBarrierException();// 出现线程被中断,主动破坏屏障(breakBarrier()),唤醒所有等待线程,避免它们无限等待if (Thread.interrupted()) {breakBarrier();throw new InterruptedException();}// 需要打破屏障的线程数量--// 数量为0,就是打破屏障,执行构造CyclicBarrier传的任务。int index = --count;if (index == 0) { boolean ranAction = false;try {final Runnable command = barrierCommand;if (command != null)command.run(); // 执行任务ranAction = true; nextGeneration(); // 复原,准备下一代的数据return 0;} finally {if (!ranAction)breakBarrier();}}// 如果没有达到打破屏障的线程数,那就挂起这个线程for (;;) {try {// 是否启用超时机制if (!timed)trip.await(); // 释放锁 + 挂起线程else if (nanos > 0L)nanos = trip.awaitNanos(nanos);} catch (InterruptedException ie) {if (g == generation && ! g.broken) {breakBarrier();throw ie;} else {Thread.currentThread().interrupt();}}if (g.broken)throw new BrokenBarrierException();if (g != generation)return index;if (timed && nanos <= 0L) {breakBarrier();throw new TimeoutException();}}} finally {lock.unlock(); // 释放锁}
}
await方法,这里会涉及到线程的挂起和锁的释放。
public final void await() throws InterruptedException {if (Thread.interrupted())throw new InterruptedException();Node node = addConditionWaiter(); // 加入到条件队列中int savedState = fullyRelease(node); // 释放当前线程持有的锁int interruptMode = 0;// 如果不在同步队列中,那么就挂起这个线程。while (!isOnSyncQueue(node)) {LockSupport.park(this); // 挂起线程if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)break;}if (acquireQueued(node, savedState) && interruptMode != THROW_IE)interruptMode = REINTERRUPT;if (node.nextWaiter != null) unlinkCancelledWaiters();if (interruptMode != 0)reportInterruptAfterWait(interruptMode);
}
addConditionWaiter方法,创建Node.CONDITION
的Node,添加到条件队列里面。
下图模拟了条件队列的可能的添加新Node的情况。
private Node addConditionWaiter() {Node t = lastWaiter;// 有最后一个节点,且等待状态不为Node.CONDITIONif (t != null && t.waitStatus != Node.CONDITION) {unlinkCancelledWaiters();t = lastWaiter;}Node node = new Node(Thread.currentThread(), Node.CONDITION);if (t == null)firstWaiter = node;elset.nextWaiter = node;lastWaiter = node;return node;
}
条件队列转同步队列
比如,我们在await
的线程数量,达到可以打破屏障的时候,我们会执行nextGeneration
方法。这时候就会涉及到条件队列转同步队列。
int index = --count;
if (index == 0) { boolean ranAction = false;try {final Runnable command = barrierCommand;if (command != null)command.run(); ranAction = true; nextGeneration(); // 唤醒所有线程,从条件队列进入到同步队列
nextGeneration
方法,会为下一代准备数据。
// CyclicBarrier#nextGeneration
private void nextGeneration() {// 重点在这里,唤醒线程trip.signalAll();count = parties; // 复原打破屏障的数量 generation = new Generation(); // 创建新的一代
}// AbstractQueuedSynchronizer#signalAll
public final void signalAll() {if (!isHeldExclusively())throw new IllegalMonitorStateException();Node first = firstWaiter;if (first != null)doSignalAll(first); // 重点在这里
}
doSignalAll,这里会逐步的将每个节点都放入,放入到同步队列种。
// AbstractQueuedSynchronizer#doSignalAll
private void doSignalAll(Node first) {lastWaiter = firstWaiter = null;do {Node next = first.nextWaiter;first.nextWaiter = null;transferForSignal(first);first = next;} while (first != null);
}
具体转换同步队列
// AbstractQueuedSynchronizer#transferForSignal
final boolean transferForSignal(Node node) {// CAS将等待状态修改从-2到0if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))return false;// 加入同步队列,enq具体做啥之前说过了(‾◡◝)Node p = enq(node);int ws = p.waitStatus;// 等待状态为cancel 或者 cas修改等待状态Node.SIGNAL失败。就唤醒node的线程。if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))LockSupport.unpark(node.thread);return true;
}// AbstractQueuedSynchronizer#enq
private Node enq(final Node node) {for (;;) {Node t = tail;if (t == null) {if (compareAndSetHead(new Node()))tail = head;} else {node.prev = t;if (compareAndSetTail(t, node)) {t.next = node;return t;}}}
}
后话
怎么样?有毅力的你,能看到这里,你真的非常的厉害了,给你一个👍
通过对ReentrantLock独占锁的分析,聪明的你一定明白了Node是什么?Node.SIGNAL的意思,AQS的同步队列是什么样子的,怎么加入同步队列的?
通过Semaphore、CutDownLunch共享锁的分析,聪明的你一定明白了Node.PROPAGATE是干嘛的,它们是怎么基于AQS实现共享模式的?nextWaite是用来区分独占和共享模式的字段。
通过CyclicBarrier的分析,我们知道了条件队列,和AQS实现的条件队列转同步队列。
最后的最后
其实还有很多内容还是可以补充的,也欢迎各位大佬指出我的不足🙇♂️🙇♂️🙇♂️