1. 阻塞队列概述
① 什么是阻塞队列
阻塞队列(BlockingQueue)是一个支持两个附加操作的队列,这两个附加的操作支持阻塞的插入和移除方法。
支持阻塞的插入方法:当队列满时,队列会阻塞插入元素的线程,直到队列不满。支持阻塞的移除方法:当队列为空时,获取元素的线程会等待队列变为非空。
阻塞队列常用于生产者和消费者场景,生产者是向队列里添加元素的线程,消费者是从队列里获取元素的线程。阻塞队列就是生产者用来存放元素、消费者用来获取元素的容器。
② 两个附加操作的4种处理方式
在阻塞队列不可用时,这两个附加操作提供了4种处理方式,如下:
方法/处理方式抛出异常返回特殊值一直阻塞超时退出插入方法add(e)offer(e)put(e)offer(time, unit)移除方法remove()poll()take()poll(time, unit)检查方法element()peek()不可用不可用抛出异常: 当队列满时,如果再往队列里插入元素(再执行add(e)操作),会抛出IllegalArgumentException异常。当队列空时,从队列里获取元素(执行element()操作)会抛出NoSuchElementException异常。返回特殊值: 当往队列插入元素时(执行offer(e)操作),会返回元素是否插入成功,成功返回true。如果是移除方法(执行poll()操作或者peek()操作),则是从队列里取出一个元素,如果没有则返回null。一直阻塞: 当队列满时,如果生产者线程往队列里put元素,队列会一直阻塞生产者线程,直到队列可用或者响应中断退出。当队列空时,如果消费者从队列里take元素,队列会阻塞住消费者线程,直到队列不为空。超时退出: 当阻塞队列满时,队列会阻塞生产者线程一段时间,如果超过一定的时间,生产者线程就会退出。当队列为空时,队列会阻塞消费者线程一段时间,如果超过一定的时间,消费者线程会退出。CachedThreadPool中使用的SynchronousQueue中,就是使用的poll(time,unit)方法从队列中取任务。注意:NoSuchElementException异常是由element()方法抛出的,它将peek()方法进行了封装。peek()方法要么返回队列头部元素,要么返回null。当peek()方法返回null时,element()返回NoSuchElementException异常。
public E element() {
E x = peek();
if (x != null)
return x;
else
throw new NoSuchElementException();
}
2. Java中的7种阻塞队列
① ArrayBlockingQueue
ArrayBlockingQueue 是一个用数组实现的有界阻塞队列,此队列按照先进先出(FIFO) 的原则对元素进行排序。构造方法如下:
public ArrayBlockingQueue(int capacity) {}
public ArrayBlockingQueue(int capacity, boolean fair) {}
public ArrayBlockingQueue(int capacity, boolean fair, Collection extends E> c) {}
参数fair用于设置线程是否公平访问队列,默认值为false,即指非公平地访问队列。参数capacity用于设置ArrayBlockingQueue的大小。如果capacity <= 0, 会抛出IllegalArgumentException异常。
什么叫公平访问队列?
所谓公平访问是指阻塞的线程可以按照阻塞的先后顺序访问队列,即先阻塞线程先访问队列。非公平是对先等待的线程是非公平的。当队列可用时,阻塞的线程都可以争夺访问队列的资格,有可能先阻塞的线程最后才访问队列。为了保证公平性,通常会降低吞吐量。因此,参数fair的默认值为false。
访问者的公平性使用ReentrantLock实现:
public ArrayBlockingQueue(int capacity, boolean fair) {
if (capacity <= 0)
throw new IllegalArgumentException();
this.items = new Object[capacity];
lock = new ReentrantLock(fair);
notEmpty = lock.newCondition();
notFull = lock.newCondition();
}
只有一个锁lock,是一种可重入锁ReentrantLock。不管是向队列中添加元素,还是获取元素,都使用先通过lock.lock()加锁,最后在finally语句块中通过lock.unlock()解锁。
② LinkedBlockingQueue
LinkedBlockingQueue是一个用链表实现的有界阻塞队列,此队列按照先进先出(FIFO) 的原则对元素进行排序。此队列的默认和最大容量为Integer.MAX_VALUE,实际上被看做是无界阻塞队列。LinkedBlockingQueue的构造方法如下:
public LinkedBlockingQueue() {}
public LinkedBlockingQueue(int capacity) {}
public LinkedBlockingQueue(Collection extends E> c) {}
无参构造函数创建的LinkedBlockingQueue,capacity为默认值Integer.MAX_VALUE。也可以指定LinkedBlockingQueue的容量,也可以在创建时就向LinkedBlockingQueue中添加元素。
LinkedBlockingQueue有putLock和takeLock两个锁,都是可重入锁ReentrantLock。当往队列中添加元素时,使用putLock;从队列中获取元素时,使用takeLock。都是先进行加锁(lock()),再在finally方法中进行解锁(unlock())。
③ PriorityBlockingQueue
PriorityBlockingQueue是一个支持优先级的无界阻塞队列,默认情况下元素采取自然顺序升序排列。也可以自定义类实现compareTo()方法来指定元素排序规则,或者初始化PriorityBlockingQueue时,指定构造参数Comparator来进行排序。注意: 不能保证同优先级元素的顺序,在ScheduledFutureTask中为了避免这种情况,除了依靠time进行优先级排序,还依靠sequenceNumber进行优先级排序。PriorityBlockingQueue的构造方法如下:
public PriorityBlockingQueue() {}
public PriorityBlockingQueue(int initialCapacity){}
public PriorityBlockingQueue(Collection extends E> c) {}
public PriorityBlockingQueue(int initialCapacity, Comparator super E> comparator) {
使用无参构造函数创建时,默认初始化大小DEFAULT_INITIAL_CAPACITY = 11。好奇?明明是一个无界阻塞队列,为何可以指定大小?原来使用size 记录中的元素个数,当size >= queue.length时,会使用tryGrow()方法进行扩容。
while ((n = size) >= (cap = (array = queue).length))
tryGrow(array, cap);
只有一个锁lock,是一种可重入锁ReentrantLock。不管是向队列中添加元素,还是获取元素,都使用先通过lock.lock()加锁,最后在finally语句块中通过lock.unlock()解锁。
④ DelayQueue
DelayQueue 是一个支持延时获取元素的无界阻塞队列,队列使用 PriorityQueue 来实现。DelayQueue 的构造方法如下:
public DelayQueue() {}
public DelayQueue(Collection extends E> c) {}
DelayQueue的核心属性为PriorityQueue的实例q,用它可以按照某种优先级对元素排序。DelayQueue使用无参构造函数创建对象时,是有默认大小的。这是由实例q决定的,DEFAULT_INITIAL_CAPACITY = 11。当向DelayQueue中添加元素时,实际是向PriorityQueue中添加元素,会调用PriorityQueue的相应方法。当遇到size >= queue.length时,会使用grow(size+1)方法对队列进行扩容。
只有一个锁lock,是一种可重入锁ReentrantLock。不管是向队列中添加元素,还是获取元素,都使用先通过lock.lock()加锁,最后在finally语句块中通过lock.unlock()解锁。队列中的元素必须实现 Delayed 接口,并且实现 compareTo ()方法来指定元素的顺序。为什么要实现Delayed接口?
ScheduledThreadPoolExecutor中的ScheduledFutureTask类,实现了RunnableScheduledFuture接口。RunnableScheduledFuture接口继承了ScheduledFuture接口,而ScheduledFuture接口继承了Delayed接口。所以ScheduledFutureTask类需要实现 Delayed 接口。
ScheduledFutureTask类有如下属性:
ScheduledFutureTask(Runnable r, V result, long ns, long period) {
super(r, result);
this.time = ns; // 任务下一次执行的具体时间
this.period = period; // 任务执行的时间间隔
this.sequenceNumber = sequencer.getAndIncrement(); // 任务在ScheduledThreadPoolExecutor中的序号
}
Delayed 接口只有一个getDelay(unit)方法,ScheduledFutureTask类实现了该方法。虽然传入了时间unit,但是内部直接使用的NANOSECONDS(纳秒)
public long getDelay(TimeUnit unit) {
return unit.convert(time - now(), NANOSECONDS);
}
compareTo()方法的代码如下:
先比较二者是否为同一对象,如果是同一对象直接返回0.接着比较time大小,如果this.time > other.time返回1,小于返回-1,如果相等则继续比较sequenceNumber的大小。如果this.sequenceNumber > other.sequenceNumber返回1,小于返回-1,相等的话,继续比较二者的延迟时间。this.getDelay > other.getDelay返回1,小于返回-1,相等返回0。
public int compareTo(Delayed other) {
if (other == this) // compare zero if same object
return 0;
if (other instanceof ScheduledFutureTask) {
ScheduledFutureTask> x = (ScheduledFutureTask>)other;
long diff = time - x.time;
if (diff < 0)
return -1;
else if (diff > 0)
return 1;
else if (sequenceNumber < x.sequenceNumber)
return -1;
else
return 1;
}
long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS);
return (diff < 0) ? -1 : (diff > 0) ? 1 : 0;
}
在siftUp方法中,将需要较早执行的元素向队列头部移动。如果发现该元素的不是较早执行的(key.compareTo(e) >= 0),直接退出循环。
private void siftUp(int k, RunnableScheduledFuture> key) {
while (k > 0) {
...
if (key.compareTo(e) >= 0)
break;
...
}
...
}
我们可以将 DelayQueue 运用在以下应用场景:
缓存系统的设计: 可以用 DelayQueue 保存缓存元素的有效期,使用一个线程循环查询 DelayQueue,一旦能从 DelayQueue 中获取元素时,表示缓存有效期到了。定时任务调度: 使用 DelayQueue 保存当天将会执行的任务和执行时间,一旦从 DelayQueue 中获取到任务就开始执行,比如 TimerQueue 就是使用 DelayQueue 实现的。
⑤ SynchronousQueue
SynchronousQueue 是一个不存储元素的阻塞队列,有如下特点:
每一个添加元素的操作必须等待另一个线程获取元素的操作,否则不能继续添加元素。反之亦然。SynchronousQueue 可以看成是一个传球手,本身并不存储任何元素。只是负责把生产者线程处理的数据直接传递给消费者线程,非常适合于传递性场景。支持公平访问队列,默认情况下采用非公平访问策略。SynchronousQueue 的吞吐量高于 LinkedBlockingQueue 和 ArrayBlockingQueue。
SynchronousQueue 的构造函数如下:
public SynchronousQueue() {
this(false);
}
public SynchronousQueue(boolean fair) {
transferer = fair ? new TransferQueue
}
使用无参构造函数,创建的 SynchronousQueue 采用默认的非公平访问策略。当有元素可获取时,所有被阻塞的线程都有机会争抢获取元素的资格,可能会导致先阻塞的线程最后才获取到元素。也可以指定是否公平访问队列。如果是true,则transferer属性赋值为TransferQueue对象;否则,transferer属性赋值为TransferStack对象。
注意: transferer属性是一个volatitle变量, SynchronousQueue 没有使用到可重入锁ReentrantQueue。
⑥ LinkedTransferQueue
LinkedTransferQueue 是一个用链表实现的的无界阻塞 TransferQueue 队列。相对于其他阻塞队列,LinkedTransferQueue 多了 transfer()和 tryTransfer()方法。LinkedTransferQueue的构造方法如下:
public LinkedTransferQueue() {}
public LinkedTransferQueue(Collection extends E> c) {}
transfer()方法:
如果有正在等待接收元素的消费者,transfer()方法将生产者产生的元素立刻传输给消费者。如果没有正在等待接收元素的消费者,transfer()方法将生产这产生的元素放到队列尾部,直到有消费者消费了该元素才返回。
tryTransfer()方法:
有两种形式,一种是不带时间参数的tryTransfer(E e)方法,一种是带时间参数的tryTransfer(E e, long timeout, TimeUnit unit)方法。不带时间参数的tryTransfer(e)方法:如果有正在等待接收元素的消费者,则立即将生产者产生的元素传递给消费者;如果没有正在接收获取元素的消费者,则直接返回false。带时间参数的tryTransfer(E e, long timeout, TimeUnit unit)方法:与tryTransfer()方法不同的是,如果没有正在等待接收元素的消费者,它会等待指定的时间再返回。如果超时还没有被消费者消费,则返回false;如果在等待的时间内被消费者消费,则返回true。
⑦ LinkedBlockingDeque
LinkedBlockingDeque 是一个用链表实现的双向有界阻塞队列,可以从队列的两端插入和移除元素。LinkedBlockingDeque可以运用在工作窃取模式中。与LinkedBlockingQueue一样,默认的长度和最大容量为Integer.MAX_VALUE。双向队列由于存在两个出入口,可以减少多线程同时入队的竞争。LinkedBlockingDeque 的构造方法如下:
public LinkedBlockingDeque() {}
public LinkedBlockingDeque(int capacity) {}
public LinkedBlockingDeque(Collection extends E> c) {}
使用默认的无参构造函数,创建的LinkedBlockingDeque的容量为默认值Integer.MAX_VALUE。为了防止LinkedBlockingDeque过度膨胀,可以使用待capacity参数的构造函数,设置容量。
相比其他的阻塞队列,LinkedBlockingDeque 多了 addFirst、addLast、offerFirst、offerLast、peekFirst、peekLast 等方法。以 First 单词结尾的方法,表示插入、获取、移除队列中的第一个元素(队列头部);以 Last 单词结尾方法,表示插入、获取、移除队列中的最后一个元素(队列尾部)。LinkedBlockingDeque 也保留了原始的add、remove等方法,但是默认的方向具有差异:有的默认操作队列头部,有的默认操作队列尾部。所以使用时,最好使用以Last和First结尾的方法,指明操作队列的方向。只有一个锁lock,是一种可重入锁ReentrantLock。不管是向队列中添加元素,还是获取元素,都使用先通过lock.lock()加锁,最后在finally语句块中通过lock.unlock()解锁。
3. 阻塞队列的总结
是否有界?
有界阻塞队列:ArrayBlockingQueue、LinkedBlockingQueue(经常被看做无界)、LinkedBlockingDeque(经常被看做无界)无界阻塞队列:DelayQueue(可以指定大小,因为可以扩容)、PriorityBlockingQueue(可以指定大小,因为可以扩容)不存储元素的阻塞队列:SynchronousQueue
是否可以公平访问队列?
支持公平访问的队列有:ArrayBlockingQueue、SynchronousQueue带有锁的队列
有一个可重入锁的队列:ArrayBlockingQueue、PriorityBlockingQueue、DelayQueue、LinkedBlockingDeque有两个可重入锁的队列:LinkedBlockingQueue不带可重入锁的队列:SynchronousQueue(transfer属性是volatile变量)
4. JDK中阻塞队列的实现原理
JDK中的阻塞队列使用通知模式实现。所谓通知模式,就是当生产者往满的队列里添加元素时会阻塞住生产者,当消费者消费了队列中的一个元素后,会通知生产者当前队列可用。ArrayBlockingQueue 使用了 Condition 来实现,有两种状态:notFull状态和notEmpty状态。
private final Condition notFull;
private final Condition notEmpty;
public ArrayBlockingQueue(int capacity, boolean fair) {
// 省略其他代码
notEmpty = lock.newCondition();
notFull = lock.newCondition();
}
public void put(E e) throws InterruptedException {
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == items.length)
notFull.await();
insert(e);
} finally {
lock.unlock();
}
}
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == 0)
notEmpty.await();
return extract();
} finally {
lock.unlock();
}
}
private void insert(E x) {
items[putIndex] = x;
putIndex = inc(putIndex);
++count;
notEmpty.signal();
}
在notFull.await()方法中,如果队列满会使用LockSupport.park()方法阻塞队列。
5. 常见问题总结
1. 自己如何实现阻塞队列?
问到了就说不会吧。。。。
2. 常见的阻塞队列?
主要是 ArrayBlockingQueue、LinkedBlockingQueue、SynchronousQueue、DelayQueue、PriorityBlockingQueue讲解他们实现、是否有界、特殊性质等