Version: Next
阻塞队列
- 阻塞
- 写入:如果队列满了,就阻塞,等待队列中的东西被取走
- 读取:如果队列为空,就阻塞,等待队列中有东西
- 队列
- 先进先出 FIFO
什么场景需要使用阻塞队列?
- 多线程并发
- 线程池
学会使用队列添加元素和删除元素
Interface BlockingDeque
- 继承自
Queue
接口Queue
接口继承自Collection
接口,与List
和Set
接口同级- 实现类:
ArrayBlockingQueue
DelayQueue
LinkedBlockingDeque
LinkedBlockingQueue
LinkedTransferQueue
PriorityBlockingQueue
SynchronousQueue
——同步队列
四组API
方式 | 抛出异常 | 有返回值(不报错) | 阻塞等待 | 超时等待 |
---|---|---|---|---|
添加 | add() | offer() | put() | offer(obj, 超时时间, 时间单位) |
移除 | remove() | poll() | take() | poll(超时时间, 时间单位) |
判断队列首 | element() | peek() | - | - |
1. 抛出异常
- 定义一个
ArrayBlockingQueue
,初始容量为3
- 存
4
个元素,则会抛出Queue full
的异常- 添加
add()
- 移除
remove()
- 队首
element()
@Test
public void test1() {
// 定义队列大小
ArrayBlockingQueue arrayBlockingQueue = new ArrayBlockingQueue<>(3);
boolean result1 = arrayBlockingQueue.add("A");
boolean result2 = arrayBlockingQueue.add("B");
boolean result3 = arrayBlockingQueue.add("C");
boolean result4 = arrayBlockingQueue.add("D");
// 抛出异常
System.out.println(result1);
System.out.println(result2);
System.out.println(result3);
System.out.println(result4);
}
java.lang.IllegalStateException: Queue full
- 再测试取元素
remove()
,没有参数,直接弹出队列尾的元素,并返回- 如果移除
4
个元素,就会抛出异常
@Test
public void test1() {
// 定义队列大小
ArrayBlockingQueue arrayBlockingQueue = new ArrayBlockingQueue<>(3);
boolean result1 = arrayBlockingQueue.add("A");
boolean result2 = arrayBlockingQueue.add("B");
boolean result3 = arrayBlockingQueue.add("C");
// 抛出异常
System.out.println(result1);
System.out.println(result2);
System.out.println(result3);
System.out.println("===========");
// 移除元素
Object remove1 = arrayBlockingQueue.remove();
Object remove2 = arrayBlockingQueue.remove();
Object remove3 = arrayBlockingQueue.remove();
Object remove4 = arrayBlockingQueue.remove();
System.out.println(remove1);
System.out.println(remove2);
System.out.println(remove3);
System.out.println(remove4);
}
java.util.NoSuchElementException
2. 不会抛出异常
不会抛出异常,而是反映在返回值上
- 添加
offer()
- 移除
poll()
- 队首
peek()
/***
* 不会抛出异常
*/
@Test
public void test2() {
ArrayBlockingQueue<Object> arrayBlockingQueue = new ArrayBlockingQueue<>(3);
boolean result1 = arrayBlockingQueue.offer("A");
boolean result2 = arrayBlockingQueue.offer("B");
boolean result3 = arrayBlockingQueue.offer("C");
boolean result4 = arrayBlockingQueue.offer("D");
System.out.println(result1);
System.out.println(result2);
System.out.println(result3);
System.out.println(result4);
System.out.println("=============");
Object poll1 = arrayBlockingQueue.poll();
Object poll2 = arrayBlockingQueue.poll();
Object poll3 = arrayBlockingQueue.poll();
Object poll4 = arrayBlockingQueue.poll();
System.out.println(poll1);
System.out.println(poll2);
System.out.println(poll3);
System.out.println(poll4);
}
true
true
true
false
=============
A
B
C
null
3. 阻塞等待
队列满了、或者为空,对应的读写操作就阻塞,开始等待
- 一直阻塞等待
- 阻塞等待一端时间
put()
take()
@Test
public void test3() throws InterruptedException {
ArrayBlockingQueue<Object> arrayBlockingQueue = new ArrayBlockingQueue<>(3);
arrayBlockingQueue.put("A");
arrayBlockingQueue.put("B");
arrayBlockingQueue.put("C");
Object take1 = arrayBlockingQueue.take();
Object take2 = arrayBlockingQueue.take();
Object take3 = arrayBlockingQueue.take();
Object take4 = arrayBlockingQueue.take();
}
程序不会报错,也不会执行完毕,而是进入阻塞等待的状态
4. 超时等待
等待超过指定时间,则返回false或null
offer(obj, 时间, 时间单位)
poll(时间,时间单位)
@Test
public void test4() throws InterruptedException {
ArrayBlockingQueue<Object> arrayBlockingQueue = new ArrayBlockingQueue<>(3);
arrayBlockingQueue.offer("A");
arrayBlockingQueue.offer("B");
arrayBlockingQueue.offer("C");
arrayBlockingQueue.offer("D",2,TimeUnit.SECONDS);
System.out.println(arrayBlockingQueue.poll());
System.out.println(arrayBlockingQueue.poll());
System.out.println(arrayBlockingQueue.poll());
System.out.println(arrayBlockingQueue.poll(2,TimeUnit.SECONDS));
}
Class SynchronousQueue
同步队列
- 容量为1,添加一个元素,必须等待取出来之后,才能再往里面放
put()
take()
public static void main(String[] args) {
BlockingQueue<String> blockingQueue = new SynchronousQueue<>();
new Thread(() -> {
try {
System.out.println(Thread.currentThread().getName() + " put 1");
blockingQueue.put("1");
System.out.println(Thread.currentThread().getName() + " put 2");
blockingQueue.put("2");
System.out.println(Thread.currentThread().getName() + " put 3");
blockingQueue.put("3");
} catch (InterruptedException e) {
e.printStackTrace();
}
}, "T1").start();
new Thread(() -> {
try {
TimeUnit.SECONDS.sleep(3);
System.out.println(Thread.currentThread().getName() + " | " + blockingQueue.take());
TimeUnit.SECONDS.sleep(3);
System.out.println(Thread.currentThread().getName() + " | " + blockingQueue.take());
TimeUnit.SECONDS.sleep(3);
System.out.println(Thread.currentThread().getName() + " | " + blockingQueue.take());
} catch (InterruptedException e) {
e.printStackTrace();
}
}, "T2").start();
}
T1 put 1
T2 | 1
T1 put 2
T2 | 2
T1 put 3
T2 | 3