AQS 如何支持 Condition 的等待/唤醒?
在 Java 的并发框架中,AQS(AbstractQueuedSynchronizer)通过内部机制提供了对 Condition 的强大支持,用于实现线程间的等待 / 通知协调。
这是 Java 中构建 可中断、可限时、可多条件等待队列的基础。
一、什么是 Condition?
Condition 是 Lock 接口的一个子系统,用来替代 Object.wait()/notify(),并由 AQS 提供底层支持。
Lock lock = new ReentrantLock();
Condition condition = lock.newCondition();
每一个
Condition都维护着一个独立的等待队列,用于存放调用await()后阻塞的线程节点。
二、AQS 中 Condition 的关键机制
AQS 中,Condition 是通过一个嵌套类实现的:
public class AbstractQueuedSynchronizer {
public class ConditionObject implements Condition {
...
}
}
AQS 核心队列结构:
同步队列(Sync Queue) 条件队列(Condition Queue)
+------+ +------+
| head | --> Node1 | NodeX | --> NodeY --> ...
+------+ +------+
三、等待与唤醒过程简述
1. await() 等待流程(ConditionObject.await)
调用 condition.await() 会:
- 当前线程封装成 Node 加入 condition queue;
- 释放锁(
ReentrantLock.unlock()); - 阻塞当前线程(park);
- 等待被
signal()唤醒后,重新尝试竞争锁。
源码逻辑(简化):
public final void await() throws InterruptedException {
if (Thread.interrupted()) throw new InterruptedException();
Node node = addConditionWaiter(); // 加入 Condition 队列
int savedState = fullyRelease(node); // 释放锁
boolean interrupted = false;
while (!isOnSyncQueue(node)) { // 直到进入同步队列
LockSupport.park(this); // 阻塞线程
if (Thread.interrupted()) interrupted = true;
}
// 被唤醒后再次加入同步队列并竞争锁
acquireQueued(node, savedState);
if (interrupted) throw new InterruptedException();
}
2. signal() 唤醒流程(ConditionObject.signal)
signal() 会将 Condition Queue 中最前面的节点转移到 Sync Queue 中,等待重新获取锁。
源码逻辑(简化):
public final void signal() {
if (!isHeldExclusively()) throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignal(first); // 转移到同步队列
}
private void doSignal(Node first) {
do {
if ((firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
first.nextWaiter = null;
} while (!transferForSignal(first) && (first = firstWaiter) != null);
}
transferForSignal(Node node) 会将该 Node 的状态修改并添加到 AQS 的同步队列(Sync Queue)尾部。
四、整个流程时序图
线程A: Condition Queue Sync Queue
lock.lock()
└──→ condition.await()──┐
↓
[A - Node] ←─────┐
↓ │
lock.unlock() → park │
│
线程B: │
└──→ lock.lock()
└──→ condition.signal() ──────────┐
↓ ↓
[A - Node] → 入 Sync Queue
↓
unpark(Thread A)
↓
Thread A 重新竞争锁
五、Condition vs Object.wait()
| 特性 | Condition | Object.wait() |
|---|---|---|
| 多条件队列 | ✅ 支持多个 Condition | ❌ 只能有一个 wait set |
| 精细通知 | ✅ 可以 signal() 或 signalAll() | ❌ 只能 notify/notifyAll |
| 锁类型 | 配合 Lock(如 ReentrantLock)使用 | 配合 synchronized 使用 |
| 可中断/限时 | ✅ await() 可设置超时、中断 | 部分支持 |
六、实战案例示例
class BoundedBuffer<T> {
private final Queue<T> queue = new LinkedList<>();
private final int capacity;
private final Lock lock = new ReentrantLock();
private final Condition notFull = lock.newCondition();
private final Condition notEmpty = lock.newCondition();
public BoundedBuffer(int size) {
this.capacity = size;
}
public void put(T item) throws InterruptedException {
lock.lock();
try {
while (queue.size() == capacity) {
notFull.await(); // 等待非满
}
queue.add(item);
notEmpty.signal(); // 通知消费者
} finally {
lock.unlock();
}
}
public T take() throws InterruptedException {
lock.lock();
try {
while (queue.isEmpty()) {
notEmpty.await(); // 等待非空
}
T item = queue.remove();
notFull.signal(); // 通知生产者
return item;
} finally {
lock.unlock();
}
}
}
七、权威资料推荐
- OpenJDK – AbstractQueuedSynchronizer 源码
- Java Concurrency in Practice(Java 并发编程实战)—— 第 14 章
- 深入理解 Java 虚拟机 — 锁机制部分
- 官方文档:Java Condition 接口
总结一图流(记忆口诀)
await 是“入队阻塞 + 释放锁”, signal 是“移出 + 加入 sync 队列”, 唤醒后还要抢锁,才能真的继续跑。
更多详细内容请关注其他相关文章!