基于数组的阻塞队列 ArrayBlockingQueue 原理
ArrayBlockingQueue 是 Java 并发包 java.util.concurrent 中一个重要的有界阻塞队列实现,它基于一个数组结构构建,广泛用于生产者-消费者模型等多线程环境。
一、基本概念简介
- 类名:
java.util.concurrent.ArrayBlockingQueue<E> - 结构:基于数组的循环队列(固定大小,有界)
- 特性:
- 线程安全(使用重入锁 ReentrantLock)
- 支持阻塞的
put()/take()方法 - 支持公平性策略(FIFO,可选公平锁)
二、核心构造与参数
public ArrayBlockingQueue(int capacity, boolean fair)
| 参数 | 含义 |
|---|---|
| capacity | 队列容量(固定大小) |
| fair | 是否使用公平锁(可选) |
默认是非公平的锁,性能更高,公平模式下线程按先到先服务(FIFO)排队。
三、底层核心组成
源码类图结构:
ArrayBlockingQueue<E>
├── Object[] items // 底层数据存储数组(循环使用)
├── int count // 当前队列元素数量
├── int putIndex // 写入索引
├── int takeIndex // 读取索引
├── ReentrantLock lock // 全局重入锁(保护所有操作)
├── Condition notEmpty // 消费者等待条件
├── Condition notFull // 生产者等待条件
使用 循环数组 + 双 Condition + 锁机制 保证线程安全。
四、核心方法原理分析
1. put(E e):阻塞式入队
public void put(E e) throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == items.length)
notFull.await(); // 队列满了,等待
enqueue(e);
} finally {
lock.unlock();
}
}
enqueue() 逻辑:
void enqueue(E e) {
items[putIndex] = e;
putIndex = (putIndex + 1) % items.length;
count++;
notEmpty.signal(); // 通知等待的消费者线程
}
若队列已满,则阻塞等待
notFull.await(),直到有空位。
2. take():阻塞式出队
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == 0)
notEmpty.await(); // 队列空,等待生产者
return dequeue();
} finally {
lock.unlock();
}
}
dequeue() 逻辑:
E dequeue() {
E e = items[takeIndex];
items[takeIndex] = null;
takeIndex = (takeIndex + 1) % items.length;
count--;
notFull.signal(); // 通知等待的生产者线程
return e;
}
若队列为空,则阻塞等待
notEmpty.await(),直到有数据可取。
五、ArrayBlockingQueue 的核心特性总结
| 特性 | 说明 |
|---|---|
| 有界阻塞 | 构造时必须指定容量,满时阻塞生产者,空时阻塞消费者 |
| 循环数组结构 | 使用固定大小的数组 + 循环指针 putIndex 和 takeIndex 管理队列位置 |
| 线程安全 | 所有访问受 ReentrantLock 保证 |
| 条件变量 | 使用两个 Condition 分别唤醒生产者(notFull)和消费者(notEmpty) |
| 公平性可选 | 可选公平锁模式,默认非公平(性能更高) |
六、典型使用场景(多线程编程)
- 生产者-消费者模型(线程池、任务调度)
- 限流(容量限制)
- 控制资源竞争
- 日志异步处理
代码示例:
BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(10);
Runnable producer = () -> {
try {
for (int i = 0; i < 100; i++) {
queue.put(i); // 如果队列满则阻塞
System.out.println("Produced: " + i);
}
} catch (InterruptedException e) {}
};
Runnable consumer = () -> {
try {
while (true) {
Integer val = queue.take(); // 如果队列空则阻塞
System.out.println("Consumed: " + val);
}
} catch (InterruptedException e) {}
};
new Thread(producer).start();
new Thread(consumer).start();
七、和其他队列对比(面试常问)
| 队列类型 | 底层结构 | 是否有界 | 是否阻塞 | 是否线程安全 | 特点 |
|---|---|---|---|---|---|
ArrayBlockingQueue | 数组(循环) | ✅ 是 | ✅ 是 | ✅ 是 | 固定容量,性能高,结构简单 |
LinkedBlockingQueue | 链表 | ✅/❌ 是 | ✅ 是 | ✅ 是 | 支持无界,适合高并发场景 |
SynchronousQueue | 无存储 | ❌ 否 | ✅ 是 | ✅ 是 | 每次 put 必须等 take |
PriorityBlockingQueue | 堆 | ✅ 是 | ✅ 是 | ✅ 是 | 元素按优先级自动排序 |
八、权威参考文档
- 📘 JDK 官方 API 文档
- 📘 OpenJDK 源码 ArrayBlockingQueue.java
- 📘 书籍:《Java 并发编程实战》(Java Concurrency in Practice)
九、总结(面试/项目)
| 维度 | 要点总结 |
|---|---|
| 数据结构 | 循环数组(固定大小) |
| 线程安全 | ReentrantLock + Condition 实现 |
| 阻塞原理 | 满时阻塞生产者,空时阻塞消费者 |
| 公平性 | 构造时可设定是否公平(影响性能) |
| 应用场景 | 多线程通信、生产者消费者、限流 |
更多详细内容请关注其他相关文章!