Java Collection 之 Queue

Collection的子类,除了ListMap外,还有一类集合Queue,它是在两端出入的List,所以也可以用数组或链表来实现。队列可分为两类,普通队列和线程安全队列。普通队列包含LinkedListArrayDeque
线程安全队列又分为无界队列有界队列,有界队列常用于类似于生产者消费者的需求。

线程安全队列:

  • ConcurrentLinkedQueue/ConcurrentLinkedDeque:基于链表的无界的并发优化的Queue,实现了依赖于CAS的无锁算法。
  • LinkedBlockingQueue/LinkedBlockingDeque:基于链表的可选定长的并发优化的BlockingQueue。利用链表的特征,分离了takeLock与putLock两把锁,继续用notEmpty、notFull管理队列满或空时的阻塞状态。
  • ArrayBlockingQueue:定长的并发优化的BlockingQueue,基于循环数组实现。有一把公共的读写锁与notFull、notEmpty两个Condition管理队列满或空时的阻塞状态。
  • SynchronousQueueExecutors.newCachedThreadPool()中所使用的队列,只有当队列有由take()方法或者poll(timeout, unit)进入等待状态时,才能由offer()add()把element添加到队列中。

Queue

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public interface Queue<E> extends Collection<E> {
boolean add(E e);
boolean offer(E e);
E remove();
E poll();
E element();
E peek();
}

Queue继承自Collection并添加了6个方法,此6个方法分可为3组:

  • add / offer
  • remove / poll
  • element / peek

前后两个方法为一组,方法的操作基本一样,所不同的是前一个方法会抛出异常,而后面的方法则不会。

AbstractQueue

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
public abstract class AbstractQueue<E> extends AbstractCollection<E> implements Queue<E> {
public boolean add(E e) {
if (offer(e))
return true;
else
throw new IllegalStateException("Queue full");
}
public E remove() {
E x = poll();
if (x != null)
return x;
else
throw new NoSuchElementException();
}
public E element() {
E x = peek();
if (x != null)
return x;
else
throw new NoSuchElementException();
}
public void clear() {
while (poll() != null)
;
}
}

AbstractQueueQueue中的部分方法做了’实现’,addremoveelement方法分别调用了offerpollpeek

BlockingQueue

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public interface BlockingQueue<E> extends Queue<E> {
boolean add(E e);
boolean offer(E e);
void put(E e) throws InterruptedException;
boolean offer(E e, long timeout, TimeUnit unit)
throws InterruptedException;
E take() throws InterruptedException;
E poll(long timeout, TimeUnit unit)
throws InterruptedException;
int remainingCapacity();
boolean remove(Object o);
public boolean contains(Object o);
int drainTo(Collection<? super E> c);
int drainTo(Collection<? super E> c, int maxElements);
}

BlockingQueueQueue的基础上,添加了puttake、带超时的offerpoll方法。

take vs poll vs peek

  • take: Retrieves and removes the head of this queue, waiting if necessary until an element becomes available.
    获取并且删除queue中的第一个元素,如果queue中没有element,则会一直等待直到queue中有可用的元素。
  • poll: Retrieves and removes the head of this queue, or returns null if this queue is empty.
    获取并且删除queue中的第一个元素,如果queue为空则返回null。
  • peek: Retrieves, but does not remove, the head of this queue, or returns null if this queue is empty.
    获取queue中的第一个元素,但并不删除它,如果queue为空则返回null。

LinkedBlockingQueue

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
/ **
* An optionally-bounded {@linkplain BlockingQueue blocking queue} based on
* linked nodes. 有界的先入先出队列
* This queue orders elements FIFO (first-in-first-out).
* The head of the queue is that element that has been on the
* queue the longest time.
* The tail of the queue is that element that has been on the
* queue the shortest time.
* New elements are inserted at the tail of the queue, and the queue retrieval
* operations obtain elements at the head of the queue.
* Linked queues typically have higher throughput than array-based queues but
* less predictable performance in most concurrent applications.
* /
public class LinkedBlockingQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable {
// ...
}

Example:

1
2
3
4
5
6
7
8
9
LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<>(2);
queue.offer("A"); // success
queue.offer("B"); // success
queue.offer("C"); // fail
queue.poll(); // A
queue.poll(); // B
queue.offer("C"); // success
queue.offer("D"); // success
queue.remove(); // C

Executors.newFixedThreadPool(int)Executors.newSingleThreadExecutor()都使用了LinkedBlockingQueue

LinkedBlockingDeque

1
2
3
4
5
6
7
8
/ **
* An optionally-bounded {@linkplain BlockingDeque blocking deque} based on
* linked nodes.
* /
public class LinkedBlockingDeque<E> extends AbstractQueue<E>
implements BlockingDeque<E>, java.io.Serializable {
// ...
}

LinkedBlockingQeque不同,LinkedBlockingDeque同时提供了对队列头和尾的相关操作。

BlockingDeque

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
public interface BlockingDeque<E> extends BlockingQueue<E>, Deque<E> {
void addFirst(E e);
void addLast(E e);
// 以上两个方法直接进行操作,如果操作失败,则抛出异常
boolean offerFirst(E e);
boolean offerLast(E e);
/**
* Inserts the specified element at the front of this deque,
* waiting if necessary for space to become available.
*/
void putFirst(E e) throws InterruptedException; // waiting
void putLast(E e) throws InterruptedException; // waiting
/**
* Inserts the specified element at the front of this deque,
* waiting up to the specified wait time if necessary for space to
* become available.
*/
boolean offerFirst(E e, long timeout, TimeUnit unit)
throws InterruptedException;
boolean offerLast(E e, long timeout, TimeUnit unit)
throws InterruptedException;
/**
* Retrieves and removes the first element of this deque, waiting
* if necessary until an element becomes available.
*/
E takeFirst() throws InterruptedException; // waiting
E takeLast() throws InterruptedException; // waiting
E pollFirst(long timeout, TimeUnit unit) throws InterruptedException; // waiting
E pollLast(long timeout, TimeUnit unit) throws InterruptedException; // waiting
// *** BlockingQueue methods ***
boolean add(E e);
boolean offer(E e);
/**
* <p>This method is equivalent to {@link #putLast(Object) putLast}.
*/
void put(E e) throws InterruptedException;
/**
* Inserts the specified element into the queue represented by this deque
* (in other words, at the tail of this deque), waiting up to the
* specified wait time if necessary for space to become available.
*
* <p>This method is equivalent to {@link #offerLast(Object,long,TimeUnit) offerLast}.
*/
boolean offer(E e, long timeout, TimeUnit unit)
throws InterruptedException;
/**
* Retrieves and removes the head of the queue represented by this deque
*
* <p>This method is equivalent to {@link #removeFirst() removeFirst}.
*/
E remove();
E poll();
E take() throws InterruptedException;
E poll(long timeout, TimeUnit unit)
throws InterruptedException;
E element();
E peek();
/**
* Removes the first occurrence of the specified element from this deque.
* If the deque does not contain the element, it is unchanged.
*/
boolean remove(Object o);
public int size();
// *** Stack methods ***
/**
* Pushes an element onto the stack represented by this deque. In other
* words, inserts the element at the front of this deque unless it would
* violate capacity restrictions.
*
* <p>This method is equivalent to {@link #addFirst(Object) addFirst}.
*/
void push(E e);
}

Example:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
LinkedBlockingDeque<String> deque = new LinkedBlockingDeque<>(2);
deque.offerFirst("A");
deque.add("B");
System.out.println(deque.getFirst()); // A
System.out.println(deque.getLast()); // B
// will remove
System.out.println(deque.poll()); // A
System.out.println(deque.getFirst()); // B
// will remove
System.out.println(deque.takeFirst()); // B
deque.offerLast("D");
System.out.println(deque.remove()); // D
System.out.println(deque.take()); // block waiting