单线程间通信 多线程间通信
单线程间通信
首先实现一个EventQueue,该Queue有如下三种状态: |
---|
·队列满 ——最多可容纳多少个Event,好比一个系统最多同时能够受理多少业务一样; |
·队列 空——当所有的Event都被处理并且没有新的Event被提交的时候,此时队列将是空的状态; |
·有Event但是没有 满——有新的Event被提交,但是此时没有到达队列的上限。 |
public class EventQueue {
private final int max;
static class Event {
}
private final LinkedList<Event> eventQueue = new LinkedList<>();
private final static int DEFAULT_MAX_EVENT = 10;
public EventQueue() {
this(DEFAULT_MAX_EVENT);
}
public EventQueue(int max) {
this.max = max;
}
public void offer(Event event) {
synchronized (eventQueue) {
if (eventQueue.size() >= max) {
try {
console(" the queue is full.");
eventQueue.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
console(" the new event is submitted");
eventQueue.addLast(event);
eventQueue.notify();
}
}
public Event take() {
synchronized (eventQueue) {
if (eventQueue.isEmpty()) {
try {
console(" the queue is empty.");
eventQueue.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
Event event = eventQueue.removeFirst();
this.eventQueue.notify();
console(" the event " + event + " is handled.");
return event;
}
}
private void console(String message) {
System.out.printf("%s:%s\n", currentThread().getName(), message);
}
}
上述代码中,在EventQueue中定义了一个队列,offer方法会提交一个Event至队尾,如果此时队列已经满了,那么提交的线程将会被阻塞 ,这是调用了wait方法的结果。同样take方法会从队头获取数据,如果队列中没有可用数据,那么工作线程就会被阻塞,这也是调用wait方法的直接结果。此外,还可以看到一个notify方法,该方法的作用是唤醒 那些曾经执行monitor的wait方法而进入阻塞的线程。 |
---|
public class EventClient {
public static void main(String[] args) {
final EventQueue eventQueue = new EventQueue();
new Thread(() -> {
for (; ; ) {
eventQueue.offer(new EventQueue.Event());
}
}, "Producer").start();
new Thread(() -> {
for (; ; ) {
eventQueue.take();
try {
TimeUnit.MILLISECONDS.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "Consumer").start();
}
}
Producer线程模拟提交Event的客户端几乎没有任何的延迟,而Consumer线程则用于模拟处理请求的工作线程(上面的EventQueue目前只支持一个线程的Producer和一个线程的Consumer,也就是单线程间的通信 )。 |
---|
Producer: the new event is submitted
Producer: the new event is submitted
Producer: the new event is submitted
Producer: the new event is submitted
Producer: the new event is submitted
Producer: the new event is submitted
Producer: the new event is submitted
Producer: the new event is submitted
Producer: the new event is submitted
Producer: the new event is submitted
Producer: the queue is full.
Consumer: the event thread.pcpattern.single.EventQueue$Event@4e879704 is handled.
Producer: the new event is submitted
Producer: the queue is full.
Consumer: the event thread.pcpattern.single.EventQueue$Event@66e5ec31 is handled.
Producer: the new event is submitted
Producer: the queue is full.
Consumer: the event thread.pcpattern.single.EventQueue$Event@64056de6 is handled.
Producer: the new event is submitted
Producer: the queue is full.
Consumer: the event thread.pcpattern.single.EventQueue$Event@84a769f is handled.
Producer: the new event is submitted
Producer: the queue is full.
Consumer: the event thread.pcpattern.single.EventQueue$Event@7a87ef25 is handled.
Producer: the new event is submitted
Producer: the queue is full.
Consumer: the event thread.pcpattern.single.EventQueue$Event@334a0020 is handled.
Producer: the new event is submitted
Producer: the queue is full.
Consumer: the event thread.pcpattern.single.EventQueue$Event@7cef31f2 is handled.
Producer: the new event is submitted
Producer: the queue is full.
Consumer: the event thread.pcpattern.single.EventQueue$Event@27ca5e7e is handled.
Producer: the new event is submitted
Producer: the queue is full.
...
通过上述的输出日志可以看出,Producer线程很快就提交了10个Event数据,此时队列已经满了,那么它将会执行eventQueue的wait方法进而进入阻塞状态,Consumer线程由于要处理数据,所以会花费大概10毫秒的时间来处理其中的一条数据,然后通知Producer线程可以继续提交数据了,如此循环往复。 |
---|
讲解了两个线程间的通信,只有一个线程对EventQueue进行offer操作,也只有一个线程对EventQueue进行take操作,如果多个线程同时进行take或者offer,那么上面的程序就会出现问题。 |
---|
多线程间通信
生产者消费者
只需要将如上代码的临界值的判断if更改为while ,将notify更改为notifyAll 即可 |
---|
public class EventQueuePC {
private final int max;
static class Event {
}
private final LinkedList<Event> eventQueue = new LinkedList<>();
private final static int DEFAULT_MAX_EVENT = 10;
public EventQueuePC() {
this(DEFAULT_MAX_EVENT);
}
public EventQueuePC(int max) {
this.max = max;
}
private void console(String message) {
System.out.printf("%s:%s\n", currentThread().getName(), message);
}
public void offerSafe(Event event) {
synchronized (eventQueue) {
while (eventQueue.size() >= max) {
try {
console(" the queue is full.");
eventQueue.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
console(" the new event is submitted");
eventQueue.addLast(event);
eventQueue.notifyAll();
}
}
public Event takeSafe() {
synchronized (eventQueue) {
while (eventQueue.isEmpty()) {
try {
console(" the queue is empty.");
eventQueue.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
Event event = eventQueue.removeFirst();
this.eventQueue.notifyAll();
console(" the event " + event + " is handled.");
return event;
}
}
}
-——————————————————————————————————————读书笔记摘自书名:Java高并发编程详解:多线程与架构设计 作者:汪文君
还没有评论,来说两句吧...