Version: Next

阻塞队列

  • 阻塞
    • 写入:如果队列满了,就阻塞,等待队列中的东西被取走
    • 读取:如果队列为空,就阻塞,等待队列中有东西
  • 队列
    • 先进先出 FIFO

什么场景需要使用阻塞队列?

  • 多线程并发
  • 线程池

学会使用队列添加元素和删除元素

Interface BlockingDeque

  • 继承自Queue接口
  • Queue接口继承自Collection接口,与ListSet接口同级
  • 实现类:
    • ArrayBlockingQueue
    • DelayQueue
    • LinkedBlockingDeque
    • LinkedBlockingQueue
    • LinkedTransferQueue
    • PriorityBlockingQueue
    • SynchronousQueue——同步队列

四组API

方式抛出异常有返回值(不报错)阻塞等待超时等待
添加add()offer()put()offer(obj, 超时时间, 时间单位)
移除remove()poll()take()poll(超时时间, 时间单位)
判断队列首element()peek()--

1. 抛出异常

  • 定义一个ArrayBlockingQueue,初始容量为3
  • 4个元素,则会抛出Queue full的异常
  • 添加add()
  • 移除remove()
  • 队首element()
@Test
public void test1() {
// 定义队列大小
ArrayBlockingQueue arrayBlockingQueue = new ArrayBlockingQueue<>(3);
boolean result1 = arrayBlockingQueue.add("A");
boolean result2 = arrayBlockingQueue.add("B");
boolean result3 = arrayBlockingQueue.add("C");
boolean result4 = arrayBlockingQueue.add("D");
// 抛出异常
System.out.println(result1);
System.out.println(result2);
System.out.println(result3);
System.out.println(result4);
}
java.lang.IllegalStateException: Queue full
  • 再测试元素
  • remove(),没有参数,直接弹出队列尾的元素,并返回
  • 如果移除4个元素,就会抛出异常
@Test
public void test1() {
// 定义队列大小
ArrayBlockingQueue arrayBlockingQueue = new ArrayBlockingQueue<>(3);
boolean result1 = arrayBlockingQueue.add("A");
boolean result2 = arrayBlockingQueue.add("B");
boolean result3 = arrayBlockingQueue.add("C");
// 抛出异常
System.out.println(result1);
System.out.println(result2);
System.out.println(result3);
System.out.println("===========");
// 移除元素
Object remove1 = arrayBlockingQueue.remove();
Object remove2 = arrayBlockingQueue.remove();
Object remove3 = arrayBlockingQueue.remove();
Object remove4 = arrayBlockingQueue.remove();
System.out.println(remove1);
System.out.println(remove2);
System.out.println(remove3);
System.out.println(remove4);
}
java.util.NoSuchElementException

2. 不会抛出异常

不会抛出异常,而是反映在返回值上

  • 添加offer()
  • 移除poll()
  • 队首peek()
/***
* 不会抛出异常
*/
@Test
public void test2() {
ArrayBlockingQueue<Object> arrayBlockingQueue = new ArrayBlockingQueue<>(3);
boolean result1 = arrayBlockingQueue.offer("A");
boolean result2 = arrayBlockingQueue.offer("B");
boolean result3 = arrayBlockingQueue.offer("C");
boolean result4 = arrayBlockingQueue.offer("D");
System.out.println(result1);
System.out.println(result2);
System.out.println(result3);
System.out.println(result4);
System.out.println("=============");
Object poll1 = arrayBlockingQueue.poll();
Object poll2 = arrayBlockingQueue.poll();
Object poll3 = arrayBlockingQueue.poll();
Object poll4 = arrayBlockingQueue.poll();
System.out.println(poll1);
System.out.println(poll2);
System.out.println(poll3);
System.out.println(poll4);
}
true
true
true
false
=============
A
B
C
null

3. 阻塞等待

队列满了、或者为空,对应的读写操作就阻塞,开始等待

  • 一直阻塞等待
  • 阻塞等待一端时间
  • put()
  • take()
@Test
public void test3() throws InterruptedException {
ArrayBlockingQueue<Object> arrayBlockingQueue = new ArrayBlockingQueue<>(3);
arrayBlockingQueue.put("A");
arrayBlockingQueue.put("B");
arrayBlockingQueue.put("C");
Object take1 = arrayBlockingQueue.take();
Object take2 = arrayBlockingQueue.take();
Object take3 = arrayBlockingQueue.take();
Object take4 = arrayBlockingQueue.take();
}

程序不会报错,也不会执行完毕,而是进入阻塞等待的状态

4. 超时等待

等待超过指定时间,则返回false或null

  • offer(obj, 时间, 时间单位)
  • poll(时间,时间单位)
@Test
public void test4() throws InterruptedException {
ArrayBlockingQueue<Object> arrayBlockingQueue = new ArrayBlockingQueue<>(3);
arrayBlockingQueue.offer("A");
arrayBlockingQueue.offer("B");
arrayBlockingQueue.offer("C");
arrayBlockingQueue.offer("D",2,TimeUnit.SECONDS);
System.out.println(arrayBlockingQueue.poll());
System.out.println(arrayBlockingQueue.poll());
System.out.println(arrayBlockingQueue.poll());
System.out.println(arrayBlockingQueue.poll(2,TimeUnit.SECONDS));
}

Class SynchronousQueue

同步队列

  • 容量为1,添加一个元素,必须等待取出来之后,才能再往里面放
  • put()
  • take()
public static void main(String[] args) {
BlockingQueue<String> blockingQueue = new SynchronousQueue<>();
new Thread(() -> {
try {
System.out.println(Thread.currentThread().getName() + " put 1");
blockingQueue.put("1");
System.out.println(Thread.currentThread().getName() + " put 2");
blockingQueue.put("2");
System.out.println(Thread.currentThread().getName() + " put 3");
blockingQueue.put("3");
} catch (InterruptedException e) {
e.printStackTrace();
}
}, "T1").start();
new Thread(() -> {
try {
TimeUnit.SECONDS.sleep(3);
System.out.println(Thread.currentThread().getName() + " | " + blockingQueue.take());
TimeUnit.SECONDS.sleep(3);
System.out.println(Thread.currentThread().getName() + " | " + blockingQueue.take());
TimeUnit.SECONDS.sleep(3);
System.out.println(Thread.currentThread().getName() + " | " + blockingQueue.take());
} catch (InterruptedException e) {
e.printStackTrace();
}
}, "T2").start();
}
T1 put 1
T2 | 1
T1 put 2
T2 | 2
T1 put 3
T2 | 3