Version: Next

线程通信

生产者消费者模式

image-20200626135714353

synchronized版

生产者消费者模式并不是一种设计模式

  • 假设仓库中只能存放一件产品,生产者将生产出来的产品放入仓库,消费者将仓库中产品买走
  • 如果仓库中没有产品,则生产者将产品放入仓库,否则停止生产并等待,直到仓库中的产品被消费者买走为止
  • 如果仓库中有产品,则消费者可以将产品买走,否则停止消费并等待,直到仓库中再次放入产品为止

$$ Producer(生产者) → 数据缓冲区 → Consumer(消费者) $$

这是一个线程同步问题,生产者和消费者共享同一个资源,并且生产者和消费者之间相互依赖,互为条件

  • 对于生产者,没有生产产品之前,要通知消费者等待。而生产了产品之后,又需要马上通知消费者消费
  • 对于消费者,在消费之后,要通知生产者已经结束消费,需要生产新的产品以供消费
  • 在生产者消费者问题中,仅有synchronized是不够的的
    • synchronized可组织并发更新同一个资源,实现了同步
    • synchronized不能用来实现不同线程之间的消息传递(通信)

Java提供了几个方法来解决线程之间的通信问题

方法名作用
wait()表示线程会一直等待,直到其他线程通知,与sleep不同,会释放锁
wait(long timeout)指定等待的毫秒数
notify()唤醒一个处于等待状态的线程
notifyAll()唤醒同一个对象上所有调用wait()方法的线程,优先级高的线程优先调度
caution
  • 均是Object类的方法,都只能在同步方法或者同步代码块中使用,否则会抛出异常IllegalMonitorStateException
  • wait()语句总是写在循环中,防止在很多个一起访问时,出现虚假唤醒现象

解决方式1——管程法

并发协作模型”生产者/消费者模式“ —— 管程法

  • 生产者:负责生产数据的模块(可能是方法、对象、线程、进程)
  • 消费者:负责处理数据的模块(可能是方法、对象、线程、进程)
  • 缓冲区:消费者不能直接使用生产者的数据,他们之间有个"缓冲区"

生产者将生产好的数据放入缓冲区,消费者从缓冲区拿走数据

// 测试生产者消费者模型 -> 利用缓冲区解决
// 管程法
// 生产者 消费者 产品 缓冲区
public class Demo01 {
public static void main(String[] args) {
SynContainer container = new SynContainer();
new Producer(container).start();
new Consumer(container).start();
}
}
//生产者
class Producer extends Thread {
SynContainer container;
public Producer(SynContainer container) {
this.container = container;
}
@Override
public void run() {
for (int id = 0; id < 100; id++) {
container.push(new Chicken(id));
System.out.println("生产了第" + id + "只鸡");
}
}
}
//消费者
class Consumer extends Thread {
SynContainer container;
public Consumer(SynContainer container) {
this.container = container;
}
@Override
public void run() {
for (int i = 0; i < 100; i++) {
container.pop();
System.out.println("消费了第" + i + "只鸡");
}
}
}
//产品
class Chicken {
int id;
public Chicken() {
}
public Chicken(int id) {
this.id = id;
}
}
//缓冲区
class SynContainer {
//容器容量
Chicken[] chickens = new Chicken[10];
int count = 0;
//生产者放入产品
public synchronized void push(Chicken chicken) {
//如果容器满了,等待消费者消费
while (count >= chickens.length) {
//通知消费者消费
try {
this.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
//如果为空,放入生产好的产品
chickens[count] = chicken;
count++;
//可以通知消费者消费了
this.notifyAll();
}
//消费者消费产品
public synchronized Chicken pop() {
//判断能否消费
while (count <= 0) {
//等待生产,消费者等待
try {
this.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
//如果可以消费,就消费
count--;
Chicken chicken = chickens[count];
//通知生产者
this.notifyAll();
return chicken;
}
}

解决方式2——信号灯法

并发协作模型“生产者/消费者模式”——信号灯法

  • 通过标志位解决
//生产着消费者模式——测试信号灯法,通过标志位解决
public class Demo02 {
public static void main(String[] args) {
ChickenFactory factory = new ChickenFactory();
new Producer(factory).start();
new Concumer(factory).start();
}
}
//生产者
class Producer extends Thread {
ChickenFactory factory;
public Producer(ChickenFactory factory) {
this.factory = factory;
}
@Override
public void run() {
for (int i = 0; i < 20; i++) {
this.factory.product(new Chicken(i + ""));
}
}
}
//消费者
class Concumer extends Thread {
ChickenFactory factory;
public Concumer(ChickenFactory factory) {
this.factory = factory;
}
@Override
public void run() {
for (int i = 0; i < 20; i++) {
this.factory.buy();
}
}
}
//产品
class Chicken {
String name;
public Chicken() {
}
public Chicken(String name) {
this.name = name;
}
@Override
public String toString() {
return "Chicken{" +
"name=" + name +
'}';
}
}
//产品工厂
class ChickenFactory {
Chicken chicken;
boolean flag = true; // 真 进行生产,消费者等待 | 假 进行消费,生产者等待
//生产
public synchronized void product(Chicken chicken) {
while (!flag) {
//生产者等待
try {
this.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println("生产了鸡 -> " + chicken);
this.chicken = chicken;
this.flag = !this.flag;
//通知消费者消费
this.notifyAll();
}
//消费
public synchronized void buy() {
while (flag) {
//消费者等待
try {
this.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println("消费了鸡 -> " + chicken);
//通知生产新的鸡
this.flag = !this.flag;
this.notifyAll();
}
}
生产了鸡 -> Chicken{name=0}
消费了鸡 -> Chicken{name=0}
生产了鸡 -> Chicken{name=1}
消费了鸡 -> Chicken{name=1}
生产了鸡 -> Chicken{name=2}
消费了鸡 -> Chicken{name=2}
生产了鸡 -> Chicken{name=3}
消费了鸡 -> Chicken{name=3}
生产了鸡 -> Chicken{name=4}
消费了鸡 -> Chicken{name=4}
生产了鸡 -> Chicken{name=5}
消费了鸡 -> Chicken{name=5}
生产了鸡 -> Chicken{name=6}
消费了鸡 -> Chicken{name=6}
生产了鸡 -> Chicken{name=7}
消费了鸡 -> Chicken{name=7}
生产了鸡 -> Chicken{name=8}
消费了鸡 -> Chicken{name=8}
生产了鸡 -> Chicken{name=9}
消费了鸡 -> Chicken{name=9}
生产了鸡 -> Chicken{name=10}
消费了鸡 -> Chicken{name=10}
生产了鸡 -> Chicken{name=11}
消费了鸡 -> Chicken{name=11}
生产了鸡 -> Chicken{name=12}
消费了鸡 -> Chicken{name=12}
生产了鸡 -> Chicken{name=13}
消费了鸡 -> Chicken{name=13}
生产了鸡 -> Chicken{name=14}
消费了鸡 -> Chicken{name=14}
生产了鸡 -> Chicken{name=15}
消费了鸡 -> Chicken{name=15}
生产了鸡 -> Chicken{name=16}
消费了鸡 -> Chicken{name=16}
生产了鸡 -> Chicken{name=17}
消费了鸡 -> Chicken{name=17}
生产了鸡 -> Chicken{name=18}
消费了鸡 -> Chicken{name=18}
生产了鸡 -> Chicken{name=19}
消费了鸡 -> Chicken{name=19}

Lock版 (JUC)——精准唤醒线程

通过Lock找到Condition

  • 本质上和synchronized、wait()、notify()没区别
  • 这种写法的优势是,可以精准唤醒线程
public class LockVersion {
public static void main(String[] args) {
SynContainer synContainer = new SynContainer();
Producer2 producer1 = new Producer2(synContainer);
Producer2 producer2 = new Producer2(synContainer);
Consumer2 consumer1 = new Consumer2(synContainer);
Consumer2 consumer2 = new Consumer2(synContainer);
producer1.setName("producer1");
producer2.setName("producer2");
consumer1.setName("consumer1");
consumer2.setName("consumer2");
producer1.start();
producer2.start();
consumer1.start();
consumer2.start();
}
}
//生产者
class Producer2 extends Thread {
SynContainer container;
public Producer2(SynContainer container) {
this.container = container;
}
@Override
public void run() {
for (int id = 0; id < 100; id++) {
try {
container.push(new Chicken2(id));
System.out.println(Thread.currentThread().getName() + "生产了第" + id + "只鸡");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
//消费者
class Consumer2 extends Thread {
SynContainer container;
public Consumer2(SynContainer container) {
this.container = container;
}
@Override
public void run() {
for (int i = 0; i < 100; i++) {
try {
container.pop();
System.out.println(Thread.currentThread().getName() + "消费了第" + i + "只鸡");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
//产品
class Chicken2 {
int id;
public Chicken2() {
}
public Chicken2(int id) {
this.id = id;
}
}
//缓冲区
class SynContainer {
//容器容量
Chicken2[] chickens = new Chicken2[10];
int count = 0;
//用Lock锁而不是synchronized
Lock lock = new ReentrantLock();
Condition condition = lock.newCondition(); //通过condition接口调用await()和singalAll()
//生产者放入产品
public void push(Chicken2 chicken) throws InterruptedException {
lock.lock();
try {
//如果容器满了,等待消费者消费
while (count >= chickens.length) {
//通知消费者消费
condition.await();
}
//如果为空,放入生产好的产品
chickens[count] = chicken;
count++;
//可以通知消费者消费了
condition.signalAll();
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
//消费者消费产品
public void pop() throws InterruptedException {
lock.lock();
try {
//判断能否消费
while (count <= 0) {
//等待生产,消费者等待
condition.await();
}
//如果可以消费,就消费
count--;
//通知生产者
condition.signalAll();
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
}

精准唤醒线程

  • 每一个condition都是一个监视器
  • 可以设置多个condition监视器对象,从而精准唤醒线程
  • 业务、判断、执行、通知
public class Demo04LockAccurate {
public static void main(String[] args) {
Data data = new Data();
//假设有三个线程,操作同一个资源类,希望他们顺序执行 A->B B->C C->A
new Thread(() -> {
for (int i = 0; i < 10; i++) {
data.printA();
}
}, "A").start();
new Thread(() -> {
for (int i = 0; i < 10; i++) {
data.printB();
}
}, "B").start();
new Thread(() -> {
for (int i = 0; i < 10; i++) {
data.printC();
}
}, "C").start();
}
}
class Data { //资源类
private Lock lock = new ReentrantLock();
private int state = 1;
private Condition conditionA = lock.newCondition();
private Condition conditionB = lock.newCondition();
private Condition conditionC = lock.newCondition();
public void printA() {
lock.lock();
try { // 业务、判断、执行、通知
while (state != 1) {
// 不是1就等待
conditionA.await();
}
//走到这里,说明是1,1就是跑A自己的代码
System.out.println(Thread.currentThread().getName() + " is running");
//执行完了,唤醒,唤醒指定的线程
state = 2;
conditionB.signal();
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
public void printB() {
lock.lock();
try {
while (state != 2) {
conditionB.await();
}
System.out.println(Thread.currentThread().getName() + " is running");
state = 3;
conditionC.signal();
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
public void printC() {
lock.lock();
try {
while (state != 3) {
conditionC.await();
}
System.out.println(Thread.currentThread().getName() + " is running");
state = 1;
conditionA.signal();
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
}
A is running
B is running
C is running
A is running
B is running
C is running
A is running
B is running
C is running
A is running
B is running
C is running

线程池

  • 背景:经常创建和销毁、使用量特别大的资源,比如并发情况下的线程,对性能影响很大

  • 思路:提前创建好多个线程,放入线程池中,使用时直接获取,使用完放回池中。 可以避免频繁创建销毁、实现重复利用

  • 好处

    • 提高响应速度,减少了创建新线程的时间
    • 降低资源消耗,重复利用线程池中的线程,不需要每次都创建
    • 便于线程管理
      • corePoolSize:核心池大小
      • maximumPoolSize:最大线程数
      • keepAliveTime:线程没有任务时最多保持多超时间然后终止

使用线程池

  • JDK 5.0起提供了线程池相关API:ExecutorServiceExecutors
  • ExecutorService:真正的线程池接口,常见子类ThreadPoolExecutor
    • void execute(Runnable command):执行任务/命令,没有返回值,一般用来执行Runnable
    • <T> Future<T> submit(Callable<T> tast):执行任务,有返回值,一般用来执行Callable
    • Void shutdown():关闭连接池
  • Executors:工具类、线程池的工厂类,用于创建并返回不同类型的线程池
public class Demo01ThreadPool {
public static void main(String[] args) {
//1.创建服务,创建线程池
// 参数为线程池大小
ExecutorService executorService = Executors.newFixedThreadPool(10);
//接受一个实现Runnable接口的对象,进行执行
executorService.execute(new MyThread());
executorService.execute(new MyThread());
executorService.execute(new MyThread());
executorService.execute(new MyThread());
//2.关闭连接
executorService.shutdown();
}
}
class MyThread implements Runnable {
@Override
public void run() {
for (int i = 0; i < 20; i++) {
System.out.println(Thread.currentThread().getName() + " | " + i);
}
}
}
pool-1-thread-2 | 0
pool-1-thread-2 | 1
pool-1-thread-2 | 2
pool-1-thread-2 | 3
pool-1-thread-2 | 4
pool-1-thread-2 | 5
pool-1-thread-2 | 6
pool-1-thread-2 | 7
pool-1-thread-2 | 8
pool-1-thread-2 | 9
pool-1-thread-2 | 10
pool-1-thread-2 | 11
pool-1-thread-2 | 12
pool-1-thread-2 | 13
pool-1-thread-2 | 14
pool-1-thread-2 | 15
pool-1-thread-2 | 16
pool-1-thread-2 | 17
pool-1-thread-2 | 18
pool-1-thread-2 | 19
pool-1-thread-4 | 0
pool-1-thread-3 | 0
pool-1-thread-3 | 1
pool-1-thread-3 | 2
pool-1-thread-1 | 0
pool-1-thread-1 | 1
pool-1-thread-1 | 2
pool-1-thread-1 | 3
pool-1-thread-1 | 4
pool-1-thread-1 | 5
pool-1-thread-1 | 6
pool-1-thread-1 | 7
pool-1-thread-3 | 3
pool-1-thread-4 | 1
pool-1-thread-3 | 4
pool-1-thread-1 | 8
pool-1-thread-3 | 5
pool-1-thread-3 | 6
pool-1-thread-3 | 7
pool-1-thread-3 | 8
pool-1-thread-3 | 9
pool-1-thread-4 | 2
pool-1-thread-3 | 10
pool-1-thread-1 | 9
pool-1-thread-3 | 11
pool-1-thread-3 | 12
pool-1-thread-3 | 13
pool-1-thread-3 | 14
pool-1-thread-4 | 3
pool-1-thread-3 | 15
pool-1-thread-3 | 16
pool-1-thread-3 | 17
pool-1-thread-3 | 18
pool-1-thread-1 | 10
pool-1-thread-3 | 19
pool-1-thread-4 | 4
pool-1-thread-1 | 11
pool-1-thread-4 | 5
pool-1-thread-1 | 12
pool-1-thread-4 | 6
pool-1-thread-1 | 13
pool-1-thread-4 | 7
pool-1-thread-1 | 14
pool-1-thread-4 | 8
pool-1-thread-1 | 15
pool-1-thread-4 | 9
pool-1-thread-1 | 16
pool-1-thread-4 | 10
pool-1-thread-4 | 11
pool-1-thread-4 | 12
pool-1-thread-4 | 13
pool-1-thread-4 | 14
pool-1-thread-4 | 15
pool-1-thread-4 | 16
pool-1-thread-4 | 17
pool-1-thread-4 | 18
pool-1-thread-1 | 17
pool-1-thread-1 | 18
pool-1-thread-1 | 19
pool-1-thread-4 | 19

三个线程轮流打印

public class TestSync {
public static void main(String[] args) {
AtomicInteger state = new AtomicInteger(0);
Runnable runnable = new PrintThread(state);
Thread t1 = new Thread(runnable, "A");
Thread t2 = new Thread(runnable, "B");
Thread t3 = new Thread(runnable, "C");
t1.start();
t2.start();
t3.start();
}
}
class PrintThread implements Runnable {
private AtomicInteger state;
public PrintThread(AtomicInteger state) {
this.state = state;
}
@Override
public void run() {
print();
}
private void print() {
for (int i = 0; i < 10; i++) {
synchronized (state) {
if (state.get() % 3 == 0) {
System.out.println("A");
state.incrementAndGet();
} else if (state.get() % 3 == 1) {
System.out.println("B");
state.incrementAndGet();
} else if (state.get() % 3 == 2) {
System.out.println("C");
state.incrementAndGet();
}
}
}
}
}