Java高并发之阻塞队列(什么是阻塞队列、4对操作、7种阻塞队列、实现原理)

Java高并发之阻塞队列(什么是阻塞队列、4对操作、7种阻塞队列、实现原理)

1. 阻塞队列概述

① 什么是阻塞队列

阻塞队列(BlockingQueue)是一个支持两个附加操作的队列,这两个附加的操作支持阻塞的插入和移除方法。

支持阻塞的插入方法:当队列满时,队列会阻塞插入元素的线程,直到队列不满。支持阻塞的移除方法:当队列为空时,获取元素的线程会等待队列变为非空。

阻塞队列常用于生产者和消费者场景,生产者是向队列里添加元素的线程,消费者是从队列里获取元素的线程。阻塞队列就是生产者用来存放元素、消费者用来获取元素的容器。

② 两个附加操作的4种处理方式

在阻塞队列不可用时,这两个附加操作提供了4种处理方式,如下:

方法/处理方式抛出异常返回特殊值一直阻塞超时退出插入方法add(e)offer(e)put(e)offer(time, unit)移除方法remove()poll()take()poll(time, unit)检查方法element()peek()不可用不可用抛出异常: 当队列满时,如果再往队列里插入元素(再执行add(e)操作),会抛出IllegalArgumentException异常。当队列空时,从队列里获取元素(执行element()操作)会抛出NoSuchElementException异常。返回特殊值: 当往队列插入元素时(执行offer(e)操作),会返回元素是否插入成功,成功返回true。如果是移除方法(执行poll()操作或者peek()操作),则是从队列里取出一个元素,如果没有则返回null。一直阻塞: 当队列满时,如果生产者线程往队列里put元素,队列会一直阻塞生产者线程,直到队列可用或者响应中断退出。当队列空时,如果消费者从队列里take元素,队列会阻塞住消费者线程,直到队列不为空。超时退出: 当阻塞队列满时,队列会阻塞生产者线程一段时间,如果超过一定的时间,生产者线程就会退出。当队列为空时,队列会阻塞消费者线程一段时间,如果超过一定的时间,消费者线程会退出。CachedThreadPool中使用的SynchronousQueue中,就是使用的poll(time,unit)方法从队列中取任务。注意:NoSuchElementException异常是由element()方法抛出的,它将peek()方法进行了封装。peek()方法要么返回队列头部元素,要么返回null。当peek()方法返回null时,element()返回NoSuchElementException异常。

public E element() {

E x = peek();

if (x != null)

return x;

else

throw new NoSuchElementException();

}

2. Java中的7种阻塞队列

① ArrayBlockingQueue

ArrayBlockingQueue 是一个用数组实现的有界阻塞队列,此队列按照先进先出(FIFO) 的原则对元素进行排序。构造方法如下:

public ArrayBlockingQueue(int capacity) {}

public ArrayBlockingQueue(int capacity, boolean fair) {}

public ArrayBlockingQueue(int capacity, boolean fair, Collection c) {}

参数fair用于设置线程是否公平访问队列,默认值为false,即指非公平地访问队列。参数capacity用于设置ArrayBlockingQueue的大小。如果capacity <= 0, 会抛出IllegalArgumentException异常。

什么叫公平访问队列?

所谓公平访问是指阻塞的线程可以按照阻塞的先后顺序访问队列,即先阻塞线程先访问队列。非公平是对先等待的线程是非公平的。当队列可用时,阻塞的线程都可以争夺访问队列的资格,有可能先阻塞的线程最后才访问队列。为了保证公平性,通常会降低吞吐量。因此,参数fair的默认值为false。

访问者的公平性使用ReentrantLock实现:

public ArrayBlockingQueue(int capacity, boolean fair) {

if (capacity <= 0)

throw new IllegalArgumentException();

this.items = new Object[capacity];

lock = new ReentrantLock(fair);

notEmpty = lock.newCondition();

notFull = lock.newCondition();

}

只有一个锁lock,是一种可重入锁ReentrantLock。不管是向队列中添加元素,还是获取元素,都使用先通过lock.lock()加锁,最后在finally语句块中通过lock.unlock()解锁。

② LinkedBlockingQueue

LinkedBlockingQueue是一个用链表实现的有界阻塞队列,此队列按照先进先出(FIFO) 的原则对元素进行排序。此队列的默认和最大容量为Integer.MAX_VALUE,实际上被看做是无界阻塞队列。LinkedBlockingQueue的构造方法如下:

public LinkedBlockingQueue() {}

public LinkedBlockingQueue(int capacity) {}

public LinkedBlockingQueue(Collection c) {}

无参构造函数创建的LinkedBlockingQueue,capacity为默认值Integer.MAX_VALUE。也可以指定LinkedBlockingQueue的容量,也可以在创建时就向LinkedBlockingQueue中添加元素。

LinkedBlockingQueue有putLock和takeLock两个锁,都是可重入锁ReentrantLock。当往队列中添加元素时,使用putLock;从队列中获取元素时,使用takeLock。都是先进行加锁(lock()),再在finally方法中进行解锁(unlock())。

③ PriorityBlockingQueue

PriorityBlockingQueue是一个支持优先级的无界阻塞队列,默认情况下元素采取自然顺序升序排列。也可以自定义类实现compareTo()方法来指定元素排序规则,或者初始化PriorityBlockingQueue时,指定构造参数Comparator来进行排序。注意: 不能保证同优先级元素的顺序,在ScheduledFutureTask中为了避免这种情况,除了依靠time进行优先级排序,还依靠sequenceNumber进行优先级排序。PriorityBlockingQueue的构造方法如下:

public PriorityBlockingQueue() {}

public PriorityBlockingQueue(int initialCapacity){}

public PriorityBlockingQueue(Collection c) {}

public PriorityBlockingQueue(int initialCapacity, Comparator comparator) {

使用无参构造函数创建时,默认初始化大小DEFAULT_INITIAL_CAPACITY = 11。好奇?明明是一个无界阻塞队列,为何可以指定大小?原来使用size 记录中的元素个数,当size >= queue.length时,会使用tryGrow()方法进行扩容。

while ((n = size) >= (cap = (array = queue).length))

tryGrow(array, cap);

只有一个锁lock,是一种可重入锁ReentrantLock。不管是向队列中添加元素,还是获取元素,都使用先通过lock.lock()加锁,最后在finally语句块中通过lock.unlock()解锁。

④ DelayQueue

DelayQueue 是一个支持延时获取元素的无界阻塞队列,队列使用 PriorityQueue 来实现。DelayQueue 的构造方法如下:

public DelayQueue() {}

public DelayQueue(Collection c) {}

DelayQueue的核心属性为PriorityQueue的实例q,用它可以按照某种优先级对元素排序。DelayQueue使用无参构造函数创建对象时,是有默认大小的。这是由实例q决定的,DEFAULT_INITIAL_CAPACITY = 11。当向DelayQueue中添加元素时,实际是向PriorityQueue中添加元素,会调用PriorityQueue的相应方法。当遇到size >= queue.length时,会使用grow(size+1)方法对队列进行扩容。

只有一个锁lock,是一种可重入锁ReentrantLock。不管是向队列中添加元素,还是获取元素,都使用先通过lock.lock()加锁,最后在finally语句块中通过lock.unlock()解锁。队列中的元素必须实现 Delayed 接口,并且实现 compareTo ()方法来指定元素的顺序。为什么要实现Delayed接口?

ScheduledThreadPoolExecutor中的ScheduledFutureTask类,实现了RunnableScheduledFuture接口。RunnableScheduledFuture接口继承了ScheduledFuture接口,而ScheduledFuture接口继承了Delayed接口。所以ScheduledFutureTask类需要实现 Delayed 接口。

ScheduledFutureTask类有如下属性:

ScheduledFutureTask(Runnable r, V result, long ns, long period) {

super(r, result);

this.time = ns; // 任务下一次执行的具体时间

this.period = period; // 任务执行的时间间隔

this.sequenceNumber = sequencer.getAndIncrement(); // 任务在ScheduledThreadPoolExecutor中的序号

}

Delayed 接口只有一个getDelay(unit)方法,ScheduledFutureTask类实现了该方法。虽然传入了时间unit,但是内部直接使用的NANOSECONDS(纳秒)

public long getDelay(TimeUnit unit) {

return unit.convert(time - now(), NANOSECONDS);

}

compareTo()方法的代码如下:

先比较二者是否为同一对象,如果是同一对象直接返回0.接着比较time大小,如果this.time > other.time返回1,小于返回-1,如果相等则继续比较sequenceNumber的大小。如果this.sequenceNumber > other.sequenceNumber返回1,小于返回-1,相等的话,继续比较二者的延迟时间。this.getDelay > other.getDelay返回1,小于返回-1,相等返回0。

public int compareTo(Delayed other) {

if (other == this) // compare zero if same object

return 0;

if (other instanceof ScheduledFutureTask) {

ScheduledFutureTask x = (ScheduledFutureTask)other;

long diff = time - x.time;

if (diff < 0)

return -1;

else if (diff > 0)

return 1;

else if (sequenceNumber < x.sequenceNumber)

return -1;

else

return 1;

}

long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS);

return (diff < 0) ? -1 : (diff > 0) ? 1 : 0;

}

在siftUp方法中,将需要较早执行的元素向队列头部移动。如果发现该元素的不是较早执行的(key.compareTo(e) >= 0),直接退出循环。

private void siftUp(int k, RunnableScheduledFuture key) {

while (k > 0) {

...

if (key.compareTo(e) >= 0)

break;

...

}

...

}

我们可以将 DelayQueue 运用在以下应用场景:

缓存系统的设计: 可以用 DelayQueue 保存缓存元素的有效期,使用一个线程循环查询 DelayQueue,一旦能从 DelayQueue 中获取元素时,表示缓存有效期到了。定时任务调度: 使用 DelayQueue 保存当天将会执行的任务和执行时间,一旦从 DelayQueue 中获取到任务就开始执行,比如 TimerQueue 就是使用 DelayQueue 实现的。

⑤ SynchronousQueue

SynchronousQueue 是一个不存储元素的阻塞队列,有如下特点:

每一个添加元素的操作必须等待另一个线程获取元素的操作,否则不能继续添加元素。反之亦然。SynchronousQueue 可以看成是一个传球手,本身并不存储任何元素。只是负责把生产者线程处理的数据直接传递给消费者线程,非常适合于传递性场景。支持公平访问队列,默认情况下采用非公平访问策略。SynchronousQueue 的吞吐量高于 LinkedBlockingQueue 和 ArrayBlockingQueue。

SynchronousQueue 的构造函数如下:

public SynchronousQueue() {

this(false);

}

public SynchronousQueue(boolean fair) {

transferer = fair ? new TransferQueue() : new TransferStack();

}

使用无参构造函数,创建的 SynchronousQueue 采用默认的非公平访问策略。当有元素可获取时,所有被阻塞的线程都有机会争抢获取元素的资格,可能会导致先阻塞的线程最后才获取到元素。也可以指定是否公平访问队列。如果是true,则transferer属性赋值为TransferQueue对象;否则,transferer属性赋值为TransferStack对象。

注意: transferer属性是一个volatitle变量, SynchronousQueue 没有使用到可重入锁ReentrantQueue。

⑥ LinkedTransferQueue

LinkedTransferQueue 是一个用链表实现的的无界阻塞 TransferQueue 队列。相对于其他阻塞队列,LinkedTransferQueue 多了 transfer()和 tryTransfer()方法。LinkedTransferQueue的构造方法如下:

public LinkedTransferQueue() {}

public LinkedTransferQueue(Collection c) {}

transfer()方法:

如果有正在等待接收元素的消费者,transfer()方法将生产者产生的元素立刻传输给消费者。如果没有正在等待接收元素的消费者,transfer()方法将生产这产生的元素放到队列尾部,直到有消费者消费了该元素才返回。

tryTransfer()方法:

有两种形式,一种是不带时间参数的tryTransfer(E e)方法,一种是带时间参数的tryTransfer(E e, long timeout, TimeUnit unit)方法。不带时间参数的tryTransfer(e)方法:如果有正在等待接收元素的消费者,则立即将生产者产生的元素传递给消费者;如果没有正在接收获取元素的消费者,则直接返回false。带时间参数的tryTransfer(E e, long timeout, TimeUnit unit)方法:与tryTransfer()方法不同的是,如果没有正在等待接收元素的消费者,它会等待指定的时间再返回。如果超时还没有被消费者消费,则返回false;如果在等待的时间内被消费者消费,则返回true。

⑦ LinkedBlockingDeque

LinkedBlockingDeque 是一个用链表实现的双向有界阻塞队列,可以从队列的两端插入和移除元素。LinkedBlockingDeque可以运用在工作窃取模式中。与LinkedBlockingQueue一样,默认的长度和最大容量为Integer.MAX_VALUE。双向队列由于存在两个出入口,可以减少多线程同时入队的竞争。LinkedBlockingDeque 的构造方法如下:

public LinkedBlockingDeque() {}

public LinkedBlockingDeque(int capacity) {}

public LinkedBlockingDeque(Collection c) {}

使用默认的无参构造函数,创建的LinkedBlockingDeque的容量为默认值Integer.MAX_VALUE。为了防止LinkedBlockingDeque过度膨胀,可以使用待capacity参数的构造函数,设置容量。

相比其他的阻塞队列,LinkedBlockingDeque 多了 addFirst、addLast、offerFirst、offerLast、peekFirst、peekLast 等方法。以 First 单词结尾的方法,表示插入、获取、移除队列中的第一个元素(队列头部);以 Last 单词结尾方法,表示插入、获取、移除队列中的最后一个元素(队列尾部)。LinkedBlockingDeque 也保留了原始的add、remove等方法,但是默认的方向具有差异:有的默认操作队列头部,有的默认操作队列尾部。所以使用时,最好使用以Last和First结尾的方法,指明操作队列的方向。只有一个锁lock,是一种可重入锁ReentrantLock。不管是向队列中添加元素,还是获取元素,都使用先通过lock.lock()加锁,最后在finally语句块中通过lock.unlock()解锁。

3. 阻塞队列的总结

是否有界?

有界阻塞队列:ArrayBlockingQueue、LinkedBlockingQueue(经常被看做无界)、LinkedBlockingDeque(经常被看做无界)无界阻塞队列:DelayQueue(可以指定大小,因为可以扩容)、PriorityBlockingQueue(可以指定大小,因为可以扩容)不存储元素的阻塞队列:SynchronousQueue

是否可以公平访问队列?

支持公平访问的队列有:ArrayBlockingQueue、SynchronousQueue带有锁的队列

有一个可重入锁的队列:ArrayBlockingQueue、PriorityBlockingQueue、DelayQueue、LinkedBlockingDeque有两个可重入锁的队列:LinkedBlockingQueue不带可重入锁的队列:SynchronousQueue(transfer属性是volatile变量)

4. JDK中阻塞队列的实现原理

JDK中的阻塞队列使用通知模式实现。所谓通知模式,就是当生产者往满的队列里添加元素时会阻塞住生产者,当消费者消费了队列中的一个元素后,会通知生产者当前队列可用。ArrayBlockingQueue 使用了 Condition 来实现,有两种状态:notFull状态和notEmpty状态。

private final Condition notFull;

private final Condition notEmpty;

public ArrayBlockingQueue(int capacity, boolean fair) {

// 省略其他代码

notEmpty = lock.newCondition();

notFull = lock.newCondition();

}

public void put(E e) throws InterruptedException {

checkNotNull(e);

final ReentrantLock lock = this.lock;

lock.lockInterruptibly();

try {

while (count == items.length)

notFull.await();

insert(e);

} finally {

lock.unlock();

}

}

public E take() throws InterruptedException {

final ReentrantLock lock = this.lock;

lock.lockInterruptibly();

try {

while (count == 0)

notEmpty.await();

return extract();

} finally {

lock.unlock();

}

}

private void insert(E x) {

items[putIndex] = x;

putIndex = inc(putIndex);

++count;

notEmpty.signal();

}

在notFull.await()方法中,如果队列满会使用LockSupport.park()方法阻塞队列。

5. 常见问题总结

1. 自己如何实现阻塞队列?

问到了就说不会吧。。。。

2. 常见的阻塞队列?

主要是 ArrayBlockingQueue、LinkedBlockingQueue、SynchronousQueue、DelayQueue、PriorityBlockingQueue讲解他们实现、是否有界、特殊性质等

风雨相关

松下家庭影院报价
网上365平台被黑提款

松下家庭影院报价

🌀 10-11 💧 阅读 3730
皮米 (pm)到米 (m)转换器
网上365平台被黑提款

皮米 (pm)到米 (m)转换器

🌀 08-05 💧 阅读 7060
逆天的美!8岁小女孩被誉“全球最美”,她爸辞职专门当她保镖
365体育娱乐手机平台

逆天的美!8岁小女孩被誉“全球最美”,她爸辞职专门当她保镖

🌀 07-25 💧 阅读 9549
新浪金融借钱靠谱吗?全面解析贷款理财安全与风险
网上365平台被黑提款

新浪金融借钱靠谱吗?全面解析贷款理财安全与风险

🌀 10-22 💧 阅读 6694