阻塞队列
之前的队列在很多场景下都不能很好地工作,例如
- 大部分场景要求分离向队列放入(生产者)、从队列拿出(消费者)两个角色、它们得由不同的线程来担当,而之前的实现根本没有考虑线程安全问题
- 队列为空,那么在之前的实现里会返回 null,如果就是硬要拿到一个元素呢?只能不断循环尝试
- 队列为满,那么再之前的实现里会返回 false,如果就是硬要塞入一个元素呢?只能不断循环尝试
因此我们需要解决的问题有
- 用锁保证线程安全
- 用条件变量让等待非空线程与等待不满线程进入等待状态,而不是不断循环尝试,让 CPU 空转
有同学对线程安全还没有足够的认识,下面举一个反例,两个线程都要执行入队操作(几乎在同一时刻)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| public class TestThreadUnsafe { private final String[] array = new String[10]; private int tail = 0;
public void offer(String e) { array[tail] = e; tail++; }
@Override public String toString() { return Arrays.toString(array); }
public static void main(String[] args) { TestThreadUnsafe queue = new TestThreadUnsafe(); new Thread(()-> queue.offer("e1"), "t1").start(); new Thread(()-> queue.offer("e2"), "t2").start(); } }
|
执行的时间序列如下,假设初始状态 tail = 0,在执行过程中由于 CPU 在两个线程之间切换,造成了指令交错
线程1 |
线程2 |
说明 |
array[tail]=e1 |
|
线程1 向 tail 位置加入 e1 这个元素,但还没来得及执行 tail++ |
|
array[tail]=e2 |
线程2 向 tail 位置加入 e2 这个元素,覆盖掉了 e1 |
|
tail++ |
tail 自增为1 |
tail++ |
|
tail 自增为2 |
|
|
最后状态 tail 为 2,数组为 [e2, null, null …] |
糟糕的是,由于指令交错的顺序不同,得到的结果不止以上一种,宏观上造成混乱的效果
单锁实现
Java 中要防止代码段交错执行,需要使用锁,有两种选择
- synchronized 代码块,属于关键字级别提供锁保护,功能少
- ReentrantLock 类,功能丰富
以 ReentrantLock 为例
1 2 3 4 5 6 7 8 9 10 11
| ReentrantLock lock = new ReentrantLock();
public void offer(String e) { lock.lockInterruptibly(); try { array[tail] = e; tail++; } finally { lock.unlock(); } }
|
只要两个线程执行上段代码时,锁对象是同一个,就能保证 try 块内的代码的执行不会出现指令交错现象,即执行顺序只可能是下面两种情况之一
线程1 |
线程2 |
说明 |
lock.lockInterruptibly() |
|
t1对锁对象上锁 |
array[tail]=e1 |
|
|
|
lock.lockInterruptibly() |
即使 CPU 切换到线程2,但由于t1已经对该对象上锁,因此线程2卡在这儿进不去 |
tail++ |
|
切换回线程1 执行后续代码 |
lock.unlock() |
|
线程1 解锁 |
|
array[tail]=e2 |
线程2 此时才能获得锁,执行它的代码 |
|
tail++ |
|
- 另一种情况是线程2 先获得锁,线程1 被挡在外面
- 要明白保护的本质,本例中是保护的是 tail 位置读写的安全
事情还没有完,上面的例子是队列还没有放满的情况,考虑下面的代码(这回锁同时保护了 tail 和 size 的读写安全)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
| ReentrantLock lock = new ReentrantLock(); int size = 0;
public void offer(String e) { lock.lockInterruptibly(); try { if(isFull()) { } array[tail] = e; tail++; size++; } finally { lock.unlock(); } }
private boolean isFull() { return size == array.length; }
|
之前是返回 false 表示添加失败,前面分析过想达到这么一种效果:
- 在队列满时,不是立刻返回,而是当前线程进入等待
- 什么时候队列不满了,再唤醒这个等待的线程,从上次的代码处继续向下运行
ReentrantLock 可以配合条件变量来实现,代码进化为
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
| ReentrantLock lock = new ReentrantLock(); Condition tailWaits = lock.newCondition(); int size = 0;
public void offer(String e) { lock.lockInterruptibly(); try { while (isFull()) { tailWaits.await(); } array[tail] = e; tail++; size++; } finally { lock.unlock(); } }
private boolean isFull() { return size == array.length; }
|
- 条件变量底层也是个队列,用来存储这些需要等待的线程,当队列满了,就会将 offer 线程加入条件队列,并暂时释放锁
- 将来我们的队列如果不满了(由 poll 线程那边得知)可以调用 tailWaits.signal() 来唤醒 tailWaits 中首个等待的线程,被唤醒的线程会再次抢到锁,从上次 await 处继续向下运行
思考为何要用 while 而不是 if,设队列容量是 3
操作前 |
offer(4) |
offer(5) |
poll() |
操作后 |
[1 2 3] |
队列满,进入tailWaits 等待 |
|
|
[1 2 3] |
[1 2 3] |
|
|
取走 1,队列不满,唤醒线程 |
[2 3] |
[2 3] |
|
抢先获得锁,发现不满,放入 5 |
|
[2 3 5] |
[2 3 5] |
从上次等待处直接向下执行 |
|
|
[2 3 5 ?] |
关键点:
- 从 tailWaits 中唤醒的线程,会与新来的 offer 的线程争抢锁,谁能抢到是不一定的,如果后者先抢到,就会导致条件又发生变化
- 这种情况称之为虚假唤醒,唤醒后应该重新检查条件,看是不是得重新进入等待
最后的实现代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87
|
public class BlockingQueue1<E> implements BlockingQueue<E> { private final E[] array; private int head = 0; private int tail = 0; private int size = 0;
@SuppressWarnings("all") public BlockingQueue1(int capacity) { array = (E[]) new Object[capacity]; }
ReentrantLock lock = new ReentrantLock(); Condition tailWaits = lock.newCondition(); Condition headWaits = lock.newCondition();
@Override public void offer(E e) throws InterruptedException { lock.lockInterruptibly(); try { while (isFull()) { tailWaits.await(); } array[tail] = e; if (++tail == array.length) { tail = 0; } size++; headWaits.signal(); } finally { lock.unlock(); } }
@Override public void offer(E e, long timeout) throws InterruptedException { lock.lockInterruptibly(); try { long t = TimeUnit.MILLISECONDS.toNanos(timeout); while (isFull()) { if (t <= 0) { return; } t = tailWaits.awaitNanos(t); } array[tail] = e; if (++tail == array.length) { tail = 0; } size++; headWaits.signal(); } finally { lock.unlock(); } }
@Override public E poll() throws InterruptedException { lock.lockInterruptibly(); try { while (isEmpty()) { headWaits.await(); } E e = array[head]; array[head] = null; if (++head == array.length) { head = 0; } size--; tailWaits.signal(); return e; } finally { lock.unlock(); } }
private boolean isEmpty() { return size == 0; }
private boolean isFull() { return size == array.length; } }
|
- public void offer(E e, long timeout) throws InterruptedException 是带超时的版本,可以只等待一段时间,而不是永久等下去,类似的 poll 也可以做带超时的版本,这个留给大家了
注意
- JDK 中 BlockingQueue 接口的方法命名与我的示例有些差异
- 方法 offer(E e) 是非阻塞的实现,阻塞实现方法为 put(E e)
- 方法 poll() 是非阻塞的实现,阻塞实现方法为 take()
双锁实现
单锁的缺点在于:
- 生产和消费几乎是不冲突的,唯一冲突的是生产者和消费者它们有可能同时修改 size
- 冲突的主要是生产者之间:多个 offer 线程修改 tail
- 冲突的还有消费者之间:多个 poll 线程修改 head
如果希望进一步提高性能,可以用两把锁
1 2 3 4 5
| ReentrantLock headLock = new ReentrantLock(); Condition headWaits = headLock.newCondition();
ReentrantLock tailLock = new ReentrantLock(); Condition tailWaits = tailLock.newCondition();
|
先看看 offer 方法的初步实现
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
| @Override public void offer(E e) throws InterruptedException { tailLock.lockInterruptibly(); try { while (isFull()) { tailWaits.await(); } array[tail] = e; if (++tail == array.length) { tail = 0; } size++; } finally { tailLock.unlock(); } }
|
上面代码的缺点是 size 并不受 tailLock 保护,tailLock 与 headLock 是两把不同的锁,并不能实现互斥的效果。因此,size 需要用下面的代码保证原子性
1 2 3 4
| AtomicInteger size = new AtomicInteger(0);
size.getAndIncrement(); size.getAndDecrement();
|
代码修改为
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
| @Override public void offer(E e) throws InterruptedException { tailLock.lockInterruptibly(); try { while (isFull()) { tailWaits.await(); } array[tail] = e; if (++tail == array.length) { tail = 0; } size.getAndIncrement(); } finally { tailLock.unlock(); } }
|
对称地,可以写出 poll 方法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
| @Override public E poll() throws InterruptedException { E e; headLock.lockInterruptibly(); try { while (isEmpty()) { headWaits.await(); } e = array[head]; if (++head == array.length) { head = 0; } size.getAndDecrement(); } finally { headLock.unlock(); } return e; }
|
下面来看一个难题,就是如何通知 headWaits 和 tailWaits 中等待的线程,比如 poll 方法拿走一个元素,通知 tailWaits:我拿走一个,不满了噢,你们可以放了,因此代码改为
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27
| @Override public E poll() throws InterruptedException { E e; headLock.lockInterruptibly(); try { while (isEmpty()) { headWaits.await(); } e = array[head]; if (++head == array.length) { head = 0; } size.getAndDecrement(); tailWaits.signal(); } finally { headLock.unlock(); } return e; }
|
问题在于要使用这些条件变量的 await(), signal() 等方法需要先获得与之关联的锁,上面的代码若直接运行会出现以下错误
1
| java.lang.IllegalMonitorStateException
|
那有同学说,加上锁不就行了吗,于是写出了下面的代码

发现什么问题了?两把锁这么嵌套使用,非常容易出现死锁,如下所示

因此得避免嵌套,两段加锁的代码变成了下面平级的样子

性能还可以进一步提升
-
代码调整后 offer 并没有同时获取 tailLock 和 headLock 两把锁,因此两次加锁之间会有空隙,这个空隙内可能有其它的 offer 线程添加了更多的元素,那么这些线程都要执行 signal(),通知 poll 线程队列非空吗?
- 每次调用 signal() 都需要这些 offer 线程先获得 headLock 锁,成本较高,要想法减少 offer 线程获得 headLock 锁的次数
- 可以加一个条件:当 offer 增加前队列为空,即从 0 变化到不空,才由此 offer 线程来通知 headWaits,其它情况不归它管
-
队列从 0 变化到不空,会唤醒一个等待的 poll 线程,这个线程被唤醒后,肯定能拿到 headLock 锁,因此它具备了唤醒 headWaits 上其它 poll 线程的先决条件。如果检查出此时有其它 offer 线程新增了元素(不空,但不是从0变化而来),那么不妨由此 poll 线程来唤醒其它 poll 线程
这个技巧被称之为级联通知(cascading notifies),类似的原因
- 在 poll 时队列从满变化到不满,才由此 poll 线程来唤醒一个等待的 offer 线程,目的也是为了减少 poll 线程对 tailLock 上锁次数,剩下等待的 offer 线程由这个 offer 线程间接唤醒
最终的代码为
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88
| public class BlockingQueue2<E> implements BlockingQueue<E> {
private final E[] array; private int head = 0; private int tail = 0; private final AtomicInteger size = new AtomicInteger(0); ReentrantLock headLock = new ReentrantLock(); Condition headWaits = headLock.newCondition(); ReentrantLock tailLock = new ReentrantLock(); Condition tailWaits = tailLock.newCondition();
public BlockingQueue2(int capacity) { this.array = (E[]) new Object[capacity]; }
@Override public void offer(E e) throws InterruptedException { int c; tailLock.lockInterruptibly(); try { while (isFull()) { tailWaits.await(); } array[tail] = e; if (++tail == array.length) { tail = 0; } c = size.getAndIncrement(); if (c + 1 < array.length) { tailWaits.signal(); } } finally { tailLock.unlock(); } if (c == 0) { headLock.lock(); try { headWaits.signal(); } finally { headLock.unlock(); } } }
@Override public E poll() throws InterruptedException { E e; int c; headLock.lockInterruptibly(); try { while (isEmpty()) { headWaits.await(); } e = array[head]; if (++head == array.length) { head = 0; } c = size.getAndDecrement(); if (c > 1) { headWaits.signal(); } } finally { headLock.unlock(); } if (c == array.length) { tailLock.lock(); try { tailWaits.signal(); } finally { tailLock.unlock(); } } return e; }
private boolean isEmpty() { return size.get() == 0; }
private boolean isFull() { return size.get() == array.length; }
}
|
双锁实现的非常精巧,据说作者 Doug Lea 花了一年的时间才完善了此段代码
9.基础数据结构-优先级队列
11.基础数据结构-堆