线程间协作-等待唤醒机制

线程间协作

多线程开发中,线程往往都不是孤立的。一个线程往往需要多线程协作完成其待执行的任务。等待唤醒机制就是用来协调线程间的协作。例如:街边的小吃店都是生产一份等销售完再生产,这是典型的生产者消费者模式。下面用代码实现这个场景。

等待唤醒机制的好处:
节省cpu。线程间通讯也可以通过轮询的方式来检查条件进行协作,但是会消耗大量cpu。用生产者/消费者模式举例。在生产者生产的时候,消费者并不需要执行。如果不用等待唤醒机制,消费者只能轮询监控生存者是否完成生产,消耗cpu。通过等待唤醒机制,可以在生产这生产时,消费者进入等待状态,不消耗cpu。待生存者生产完成后再唤醒消费者。

wait/notify

内部锁是通过wait/notify/notifyAll这三个方法实现等待唤醒。wait方法会是一个线程进入等待状态,notify会随机唤醒一个等待状态的线程,notifyAll会唤醒所有等待的线程。

1
2
3
4
5
6
7
8
9
10
void notify() 
唤醒在此对象监视器上等待的单个线程。
void notifyAll()
唤醒在此对象监视器上等待的所有线程。
void wait()
在其他线程调用此对象的 notify() 方法或 notifyAll() 方法前,导致当前线程等待。
void wait(long timeout)
在其他线程调用此对象的 notify() 方法或 notifyAll() 方法,或者超过指定的时间量前,导致当前线程等待。
void wait(long timeout, int nanos)
在其他线程调用此对象的 notify() 方法或 notifyAll() 方法,或者其他某个线程中断当前线程,或者已超过某个实际时间量前,导致当前线程等待。

注意:

  1. wait/notify/notifyAll只能在内部锁作用范围内调用。
  2. wait/notify/notifyAll都是通过该内部锁的锁对象调用。
  3. wait会释放锁。
生存者消费者模式代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
package Demo1;

public class Product {

private int id;

private boolean flag;
public int getId() {
return id;
}

public void setId(int id) {
this.id = id;
}

public synchronized void product() throws InterruptedException {
while (id<100) {

if (flag) {
wait();
}
//用id的变化模拟生产一个物品
id++;
System.out.println("product............id:" + id);
//物品生产好后flag变成true并唤醒消费者
flag = true;
notify();
}
}

public synchronized void comsume() throws InterruptedException {
while (id<100) {
if (!flag) {
wait();
}
System.out.println("comsume............id:" + id);
flag = false;
notify();
}
}


}

public class Consumer implements Runnable {

private Product p;
Consumer(Product p){
this.p=p;
}
public void run(){
try {
p.comsume();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}


public class Producer implements Runnable {

private Product p;
Producer(Product p){
this.p=p;
}
public void run() {
try {
p.product();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}

public class Main {
public static void main(String[] args) throws InterruptedException {

Product product = new Product();
Producer producer = new Producer(product);
Consumer consumer = new Consumer(product);
Thread thread = new Thread(producer);
thread.start();
new Thread(consumer).start();
thread.join();
}
}

结果如下,生产一个产品,再消费一个产品。

1
2
3
4
5
6
7
8
9
10
product............id:1
comsume............id:1
product............id:2
comsume............id:2
product............id:3
comsume............id:3
product............id:4
comsume............id:4
product............id:5
comsume............id:5

wait/notify 存在的问题
  1. 过早唤醒。因为notify唤醒具有随机性。在不确保一定能唤醒想要唤醒的线程时,必须使用notifyAll。notifyAll会将所有线程唤醒,有些线程过早被唤醒,浪费资源。例如:
    如果个多生产者多消费者模式,由于notify唤醒线程的随机性,有可能唤醒的都是生产者或消费者线程。这时需要notifyAll来实现唤醒。notifyAll存在的弊端就是会把所有等待线程唤醒,无论是生产者还是消费者。这个问题可以通过显示锁的Condition来解决。

显示锁的等待唤醒机制 await/signal

jdk1.5加入的Lock可以通过创建多个Condition对象来实现分组的等待和唤醒。Condition对象只会唤醒用该Condition对象调用等待的对象。

Condition的方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
void await() 
造成当前线程在接到信号或被中断之前一直处于等待状态。
boolean await(long time, TimeUnit unit)
造成当前线程在接到信号、被中断或到达指定等待时间之前一直处于等待状态。
long awaitNanos(long nanosTimeout)
造成当前线程在接到信号、被中断或到达指定等待时间之前一直处于等待状态。
void awaitUninterruptibly()
造成当前线程在接到信号之前一直处于等待状态。
boolean awaitUntil(Date deadline)
造成当前线程在接到信号、被中断或到达指定最后期限之前一直处于等待状态。
void signal()
唤醒一个等待线程。
void signalAll()
唤醒所有等待线程。
Condtion版本生存者/消费者模式

以下Product2类,其他类和上面的一致。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
package Demo2;

import Demo1.Product;

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class Product2 {

private int id;
private boolean flag;
private final Lock lock = new ReentrantLock();
private final Condition pro = lock.newCondition();
private final Condition coms = lock.newCondition();
private int size;

public int getId() {
return id;
}

public void setId(int id) {
this.id = id;
}

public void product() throws InterruptedException {
try {
lock.lock();
while (id < 100) {
if (flag) {
//已生产一个产品,使用生产者的Condtion对象使生产者线程等待
pro.await();
}
System.out.println(String.format("product............id:%d" , ++id));
flag = true;
//使用消费者的Condition是消费者线程唤醒
coms.signalAll();
}
} finally {
lock.unlock();
}
}

public void comsume() throws InterruptedException {
try {
lock.lock();
while (id < 100) {
if (!flag) {
coms.await();
}
System.out.println(String.format("consumer............id:%d" , id));
flag = false;
pro.signalAll();
}
}finally {
lock.unlock();
}
}
}