java中的阻塞队列

阻塞队列是指支持阻塞添加和阻塞移除两种操作的队列.
java中提供了7种阻塞队列:
ArrayBlockingQueue:一个由数组结构组成的有界阻塞队列。
LinkedBlockingQueue:一个由链表结构组成的有界阻塞队列。
PriorityBlockingQueue:一个支持优先级排序的无界阻塞队列。
DelayQueue:一个使用优先级队列实现的无界阻塞队列。
SynchronousQueue:一个不存储元素的阻塞队列。
LinkedTransferQueue:一个由链表结构组成的无界阻塞队列。
LinkedBlockingDeque:一个由链表结构组成的双向阻塞队列。

阻塞队列的增删查方法

处理方式/方法 抛出异常 返回值 阻塞 超时退出
插入 add(e) offer(e) put(e) offer(e,time,unit)
移除 remove() poll() take() poll(time,unit)
检查 element() peek()

队列简介

ArrayBlockingQueue

ArrayBlockingQueue是一个由数组结构组成的有界阻塞队列。支持公平队列和非公平队列。会按照线程阻塞的顺序来让线程访问队列。默认是非公平队列,可以通过构造函数设置。

1
2
ArrayBlockingQueue(int capacity);
ArrayBlockingQueue(int capacity, boolean fair);

LinkedBlockingQueue

LinkedBlockingQueue是一个由链表结构组成的有界阻塞队列。默认长度和最大长度都是Integer.MAX_VALUE,可以通过构造函数设置长度。

PriorityBlockingQueue

PrioriityBlockingQueue是一个使用优先级队列实现的无界阻塞队列。队列中元素默认按照自然顺序的升序排序。相等的元素不能保证其顺序。
队列中的元素必须可以比较(即满足下面条件的其中之一)

  1. 构造队列是传递Comparator,作为元素的比较器

    1
    2
    PriorityBlockingQueue(int initialCapacity,
    Comparator<? super E> comparator)
  2. 元素自身实现Comparable接口

DelayQueue

DelayQueue支持延迟获取元素的无界阻塞队列,依据优先级队列PriorityQueue实现。

1
DelayQueue<E extends Delayed>

DelayQueue中的元素必须实现Delayed。

实现Delayed接口

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
class DelayedElement implements Delayed{
private long time;
private String id;
//1.构造函数设置延迟多久运行
public DelayedElement(String id,long time,TimeUnit unit){
this.id=id;
this.time=System.nanoTime()+TimeUnit.NANOSECONDS.convert(time,unit);
}
//2.获取还需要延迟多久
@Override
public long getDelay(TimeUnit unit) {
return unit.convert(time-System.nanoTime(),TimeUnit.NANOSECONDS);
}
//3. 比较,按time升序排序
@Override
public int compareTo(Delayed o) {
long diff = getDelay(TimeUnit.NANOSECONDS) - o.getDelay(TimeUnit.NANOSECONDS);
return (diff < 0) ? -1 : (diff > 0) ? 1 : 0;
}

@Override
public String toString(){
//纳秒转成毫秒输出
return String.format("id:%s,seconds:%d",id,TimeUnit.SECONDS.convert(time,TimeUnit.NANOSECONDS));
}
}

使用

1
2
3
4
5
6
7
8
9
10
11
DelayQueue delayQueue=new DelayQueue();
//延迟5秒执行
delayQueue.add(new DelayedElement("1",5,TimeUnit.SECONDS));
//延迟2秒执行
delayQueue.add(new DelayedElement("2",2,TimeUnit.SECONDS));
//延迟1秒执行
delayQueue.add(new DelayedElement("3",1,TimeUnit.SECONDS));
//阻塞等待
System.out.println(delayQueue.take());
System.out.println(delayQueue.take());
System.out.println(delayQueue.take());

运行结果:
id=3的最先运行,id=2的在id=3运行后1秒运行,id=1的在id=3运行后4秒运行。和预想结果一直。

1
2
3
4

id:3,seconds:78285
id:2,seconds:78286
id:1,seconds:78289

ScheduledThreadPoolExecutor实现延迟执行和固定周期执行依靠的DelayedWorkQueue与DelayQueue类似。ScheduledFutureTask是Delayed的实现。

SynchronousQueue

SynchronousQueue:一个不存储元素的阻塞队列。SynchronousQueue内部没有容器存储数据,每一个put操作,都需要等待一个take操作。否则不能再添加元素。
支持公平和非公平模式。

1
SynchronousQueue(boolean fair)

适用于生产者消费者模式的数据传递。

LinkedTransferQueue

LinkedTransferQueue是一个由链表结构组成的无界阻塞队列。
LinkedTransferQueue实现了一个重要的接口TransferQueue,该接口含有下面几个重要方法:

  1. transfer(E e):若当前存在一个正在等待获取的消费者线程,即立刻移交之;否则,会插入当前元素e到队列部,并且等待进入阻塞状态,到有消费者线程取走该元素。
  2. tryTransfer(E e):若当前存在一个正在等待获取的消费者线程(使用take()或者poll()函数),使用该方法即刻转移/传输对象元素e;若不存在,则返回false,并且不进入队列。这是一个不阻塞的操作。
  3. tryTransfer(E e, long timeout, TimeUnit unit):若当前存在一个正在等待获取的消费者线程,会立即传给它;否则将插入元素e到队列尾部,并且等待被消费者线程获取消费掉;若在指定的时间内元素e无法被消费者线程取,则返回false,同时该元素被移除。
  4. hasWaitingConsumer():判断是否存在消费者线程。
  5. getWaitingConsumerCount():获取所有等待获取元素的消费线程数量。

LinkedBlockingDeque

LinkedBlockingDeque是一个由链表结构组成的双向阻塞队列。可以从队列的两端添加或者移除元素。因为有两个操作入口,也减少多线程入队时的竞争。增加了以First和Last结尾的操作函数,用于从头操作队列或者从尾部操作队列。