前言

Java并发编程(六)——三种方式实现生产者消费者模式 一节中我们使用阻塞队列(BlockingQueue)来实现生产者消费者模式,但是我们对阻塞队列内部的原理并没有进行深入的探讨,本节我们来学习一下Java中阻塞队列的相关知识。

主要并发队列

Java中队列的接口是Queue,其实现类如下图所示:

分为阻塞队列和非阻塞队列两种。

何为阻塞队列

阻塞队列,也就是BlockingQueue,是一个接口,它的源码定义如下:

1
2
3
4
5
public interface BlockingQueue<E> extends Queue<E> {
void put(E e) throws InterruptedException;
boolean offer(E e);
.....
}

BlockingQueue继承了Queue接口,是队列的一种,是Java5中引入的。

BlockingQueue 是线程安全的,我们在很多场景下都可以利用线程安全的队列来优雅地解决我们业务自身的线程安全问题。比如说,使用生产者/消费者模式的时候,我们生产者只需要往队列里添加元素,而消费者只需要从队列里取出它们就可以了,如图所示:

在图中,左侧有三个生产者线程,它会把生产出来的结果放到中间的阻塞队列中,而右侧的三个消费者也会从阻塞队列中取出它所需要的内容并进行处理。因为阻塞队列是线程安全的,所以生产者和消费者都可以是多线程的,不会发生线程安全问题。

阻塞队列的两个好处:

  1. 因为阻塞队列本身是线程安全的,队列可以安全地从一个线程向另外一个线程传递数据,所以我们的生产者/消费者直接使用线程安全的队列就可以,而不需要自己去考虑更多的线程安全问题。这也就意味着,考虑锁等线程安全问题的重任从“你”转移到了“队列”上,降低了我们开发的难度和工作量。

  2. 队列它还能起到一个隔离的作用。比如说我们开发一个银行转账的程序,那么生产者线程不需要关心具体的转账逻辑,只需要把转账任务,如账户和金额等信息放到队列中就可以,而不需要去关心银行这个类如何实现具体的转账业务。而作为银行这个类来讲,它会去从队列里取出来将要执行的具体的任务,再去通过自己的各种方法来完成本次转账。这样就实现了具体任务与执行任务类之间的解耦,任务被放在了阻塞队列中,而负责放任务的线程是无法直接访问到我们银行具体实现转账操作的对象的,实现了隔离,提高了安全性。

阻塞队列的常用方法

我们把 BlockingQueue 中最常用的和添加、删除相关的 8 个方法列出来,并且把它们分为三组,每组方法都和添加、移除元素相关。

这三组方法由于功能很类似,所以比较容易混淆。它们的区别仅在于特殊情况:当队列满了无法添加元素,或者是队列空了无法移除元素时,不同组的方法对于这种特殊情况会有不同的处理方式:

抛出异常:add、remove、element

返回结果但不抛出异常:offer、poll、peek

阻塞:put、take

总结如下:

常见的阻塞队列

阻塞队列的典型例子就是BlockingQueue的实现类,有6种主要的实现类:

ArrayBlockingQueue、LinkedBlockingQueue、SynchronousQueue、DelayQueue、PriorityBlockingQueue 和 LinkedTransferQueue。

ArrayBlockingQueue

有界队列,内部是使用数组存储元素,利用 ReentrantLock 实现线程安全

构造函数中有两个参数,一个capacity表示数组容量,需要在创建的时候指定,之后不能扩容。另一个参数指定是否公平。正如 ReentrantLock 一样,如果 ArrayBlockingQueue 被设置为非公平的,那么就存在插队的可能;如果设置为公平的,那么等待了最长时间的线程会被优先处理,其他线程不允许插队,不过这样的公平策略同时会带来一定的性能损耗,因为非公平的吞吐量通常会高于公平的情况。

在此学习下系统吞吐量的概念:

吞吐量是指在单位时间内央处理器(CPU)从存储设备读取->处理->存储信息的量。系统吞吐量主要是 cpu性能、时钟频率等有关。

LinkedBlockingQueue

内部用链表实现的 BlockingQueue,如果我们不指定它的初始容量,那么它容量默认就为整型的最大值 Integer.MAX_VALUE,由于这个数非常大,我们通常不可能放入这么多的数据,所以 LinkedBlockingQueue 也被称作无界队列,代表它几乎没有界限。

SynchronousQueue

​ SynchronousQueue 最大的不同之处在于,它的容量为 0,所以没有一个地方来暂存元素,导致每次取数据都要先阻塞,直到有数据被放入;同理,每次放数据的时候也会阻塞,直到有消费者来取。如下图所示:

SynchronousQueue 不需要去持有元素,它所做的就是直接传递(direct handoff)。由于每当需要传递的时候,SynchronousQueue 会把元素直接从生产者传给消费者,在此期间并不需要做存储,所以如果运用得当,它的效率是很高的。

由于SynchronousQueue容量为0,所以她有几个方法比较特殊,举例如下:

isEmpty()方法返回始终为true,size()方法始终返回为0。

peek()方法返回始终为null。

PriorityBlockingQueue

前面我们所说的 ArrayBlockingQueue 和 LinkedBlockingQueue 都是采用先进先出的顺序进行排序,可是如果有的时候我们需要自定义排序怎么办呢?这时就需要使用 PriorityBlockingQueue。

PriorityBlockingQueue 是一个支持优先级的无界阻塞队列,可以通过自定义类实现 compareTo() 方法来指定元素排序规则,或者初始化时通过构造器参数 Comparator 来指定排序规则。同时,插入队列的对象必须是可比较大小的,也就是 Comparable 的,否则会抛出 ClassCastException 异常。

它的 take 方法在队列为空的时候会阻塞,但是正因为它是无界队列,而且会自动扩容,所以它的队列永远不会满,所以它的 put 方法永远不会阻塞,添加操作始终都会成功,也正因为如此,它的成员变量里只有一个 Condition:

1
2
3
4
 /**
* Condition for blocking when empty
*/
private final Condition notEmpty;

这和之前的 ArrayBlockingQueue 拥有两个 Condition(分别是 notEmpty 和 notFull)形成了鲜明的对比,我们的 PriorityBlockingQueue 不需要 notFull,因为它永远都不会满,真是“有空间就可以任性”。

DelayQueue

DelayQueue 这个队列比较特殊,具有“延迟”的功能。我们可以设定让队列中的任务延迟多久之后执行,比如 10 秒钟之后执行,这在例如“30 分钟后未付款自动取消订单”等需要延迟执行的场景中被大量使用。

它是无界队列,放入的元素必须实现 Delayed 接口,而 Delayed 接口又继承了 Comparable 接口,所以自然就拥有了比较和排序的能力,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
public interface Delayed extends Comparable<Delayed> {

/**
* Returns the remaining delay associated with this object, in the
* given time unit.
*
* @param unit the time unit
* @return the remaining delay; zero or negative values indicate
* that the delay has already elapsed
*/
long getDelay(TimeUnit unit);
}

可以看出这个 Delayed 接口继承自 Comparable,里面有一个需要实现的方法,就是 getDelay。这里的 getDelay 方法返回的是“还剩下多长的延迟时间才会被执行”,如果返回 0 或者负数则代表任务已过期。

元素会根据延迟时间的长短被放到队列的不同位置,越靠近队列头代表越早过期。

DelayQueue 内部使用了 PriorityQueue 的能力来进行排序,而不是自己从头编写,offer()、take()等方法内部都调用了PriorityQueue 的对应方法来进行实现,避免了重复造轮子。

总结

Java中阻塞队列的典型例子就是BlockingQueue的实现类,常用的有

ArrayBlockingQueue、LinkedBlockingQueue、SynchronousQueue、DelayQueue、PriorityBlockingQueue五种,它们各自有不同的特点,比如内存结构、排序方式等区别。