多线程学习--线程通信
线程彼此通信会涉及到等待/通知,其最经典的案例就是“生产者/消费者模式”。原理都是基于wait/notify的。
一生产一消费:
//消费者
class Consumer {
private String lock;
public Consumer(String lock) {
super();
this.lock = lock;
}
public void getValue() {
try {
synchronized (lock) {
if (ValueObject.value.equals("")) {
lock.wait();
}
System.out.println("get的值是" + ValueObject.value);
ValueObject.value = "";
lock.notify();
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
//生产者
class Producer {
private String lock;
public Producer(String lock) {
super();
this.lock = lock;
}
public void setValue() {
try {
synchronized (lock) {
if (!ValueObject.value.equals("")) {
lock.wait();
}
String value = System.currentTimeMillis() + "_" + System.nanoTime();
System.out.println("set的值是" + value);
ValueObject.value = value;
lock.notify();
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public class test {
public static void main(String[] args) {
String lock = new String("");
Producer producer = new Producer(lock);
Consumer consumer = new Consumer(lock);
new Thread(()->{
while (true) {
producer.setValue();
}
},"生产者").start();
new Thread(()->{
while (true) {
consumer.getValue();
}
},"消费者").start();
}
}
结果:
set的值是1544676746771_186857371485669
get的值是1544676746771_186857371485669
set的值是1544676746771_186857371495584
get的值是1544676746771_186857371495584
set的值是1544676746771_186857371515716
get的值是1544676746771_186857371515716
。。。。。。。。。。。。。。。。。。。。。
假死:无法不保证唤醒的是非当前线程
class ValueObject {
public static String value = "";
}
class Producer {
private String lock;
public Producer(String lock) {
super();
this.lock = lock;
}
public void setValue() {
try {
synchronized (lock) {
while (!ValueObject.value.equals("")) {
System.out.println("生产者 " + Thread.currentThread().getName() + " WAITING了★");
lock.wait();
}
System.out.println("生产者 " + Thread.currentThread().getName() + " RUNNABLE了");
String value = System.currentTimeMillis() + "_" + System.nanoTime();
ValueObject.value = value;
lock.notify();
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
class Consumer {
private String lock;
public Consumer(String lock) {
super();
this.lock = lock;
}
public void getValue() {
try {
synchronized (lock) {
while (ValueObject.value.equals("")) {
System.out.println("消费者 " + Thread.currentThread().getName() + " WAITING了☆");
lock.wait();
}
System.out.println("消费者 " + Thread.currentThread().getName() + " RUNNABLE了");
ValueObject.value = "";
lock.notify();
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
class ThreadP extends Thread {
private Producer p;
public ThreadP(Producer p) {
super();
this.p = p;
}
@Override
public void run() {
while (true) {
p.setValue();
}
}
}
class ThreadC extends Thread {
private Consumer consumer;
public ThreadC(Consumer consumer) {
super();
this.consumer = consumer;
}
@Override
public void run() {
while (true) {
consumer.getValue();
}
}
}
public class Run8_allWait {
public static void main(String[] args) throws InterruptedException {
String lock = new String("");
Producer producer = new Producer(lock);
Consumer consumer = new Consumer(lock);
ThreadP[] pThread = new ThreadP[2];
ThreadC[] cThread = new ThreadC[2];
for (int i = 0; i < 2; i++) {
pThread[i] = new ThreadP(producer);
pThread[i].setName("生产者" + (i + 1));
cThread[i] = new ThreadC(consumer);
cThread[i].setName("消费者" + (i + 1));
pThread[i].start();
cThread[i].start();
}
Thread.sleep(5000);
//获取当前活动的线程
Thread[] threads = new Thread[Thread.currentThread().getThreadGroup().activeCount()];
Thread.currentThread().getThreadGroup().enumerate(threads);
for (int i = 0; i < threads.length; i++) {
System.out.println(threads[i].getName() + " " + threads[i].getState());
}
}
}
结果:
生产者 生产者1 RUNNABLE了
生产者 生产者1 WAITING了★
生产者 生产者2 WAITING了★
消费者 消费者1 RUNNABLE了
消费者 消费者1 WAITING了☆
生产者 生产者1 RUNNABLE了
生产者 生产者1 WAITING了★
生产者 生产者2 WAITING了★
消费者 消费者2 RUNNABLE了
消费者 消费者2 WAITING了☆
消费者 消费者1 WAITING了☆
main RUNNABLE
Monitor Ctrl-Break RUNNABLE
生产者1 WAITING
消费者1 WAITING
生产者2 WAITING
消费者2 WAITING
分析:jps/jstack pid 分析线程问题 jdk自带工具
jsp:
jstack 7444:
线程根据等待和唤醒进行通信成功,但是无法不保证唤醒的是非当前线程,比如“生产者”唤醒 “生产者” / “消费者” 唤醒 “消费者”,从而导致了程序最后呈“假死状态”。
解决方法:notify()换成notifyAll().
一生产与多消费:
class MyStack {
private List list = new ArrayList();
synchronized public void push() {
try {
if (list.size() == 1) {
System.out.println("push操作中的: " + Thread.currentThread().getName() + " 线程呈wait状态");
this.wait();
}
list.add(Math.random());
this.notify();
System.out.println("push = " + list.size());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
synchronized public String pop() {
String returnValue = "";
try {
if (list.size() == 0) {
System.out.println("pop操作中的: " + Thread.currentThread().getName() + " 线程呈wait状态");
this.wait();
}
returnValue = list.get(0) + " " + Thread.currentThread().getName();
list.remove(0);
this.notify();
System.out.println("pop = " + list.size() + " Mystack的pop方法中 线程" + Thread.currentThread().getName());
} catch (InterruptedException e) {
e.printStackTrace();
}
return returnValue;
}
}
class Producer {
private MyStack myStack;
public Producer(MyStack myStack) {
super();
this.myStack = myStack;
}
public void pushService() {
myStack.push();
}
}
class Consumer {
private MyStack myStack;
public Consumer(MyStack myStack) {
super();
this.myStack = myStack;
}
public void popService() {
System.out.println("pop = " + myStack.pop() + " Consumer的popService方法中打印pop返回值");
}
}
class P_Thread extends Thread {
private Producer p;
public P_Thread(Producer p) {
super();
this.p = p;
}
@Override
public void run() {
while (true) {
p.pushService();
}
}
}
class C_Thread extends Thread {
private Consumer c;
public C_Thread(Consumer c) {
super();
this.c = c;
}
@Override
public void run() {
while (true) {
c.popService();
}
}
}
public class Run9_oneP_manyC {
public static void main(String[] args) {
MyStack myStack = new MyStack();
Producer p = new Producer(myStack);
Consumer c1 = new Consumer(myStack);
Consumer c2 = new Consumer(myStack);
Consumer c3 = new Consumer(myStack);
Consumer c4 = new Consumer(myStack);
Consumer c5 = new Consumer(myStack);
P_Thread p_thread = new P_Thread(p);
p_thread.start();
C_Thread c_thread1 = new C_Thread(c1);
C_Thread c_thread2 = new C_Thread(c2);
C_Thread c_thread3 = new C_Thread(c3);
C_Thread c_thread4 = new C_Thread(c4);
C_Thread c_thread5 = new C_Thread(c5);
c_thread1.start();
c_thread2.start();
c_thread3.start();
c_thread4.start();
c_thread5.start();
}
}
结果:
"C:\Program Files\Java\jdk1.8.0_181\bin\java.exe" "-javaagent:C:\Program Files\JetBrains\IntelliJ IDEA 2018.2.4\lib\idea_rt.jar=51167:C:\Program Files\JetBrains\IntelliJ IDEA 2018.2.4\bin" -Dfile.encoding=UTF-8 -classpath "C:\Program Files\Java\jdk1.8.0_181\jre\lib\charsets.jar;C:\Program Files\Java\jdk1.8.0_181\jre\lib\deploy.jar;C:\Program Files\Java\jdk1.8.0_181\jre\lib\ext\access-bridge-64.jar;C:\Program Files\Java\jdk1.8.0_181\jre\lib\ext\cldrdata.jar;C:\Program Files\Java\jdk1.8.0_181\jre\lib\ext\dnsns.jar;C:\Program Files\Java\jdk1.8.0_181\jre\lib\ext\jaccess.jar;C:\Program Files\Java\jdk1.8.0_181\jre\lib\ext\jfxrt.jar;C:\Program Files\Java\jdk1.8.0_181\jre\lib\ext\localedata.jar;C:\Program Files\Java\jdk1.8.0_181\jre\lib\ext\nashorn.jar;C:\Program Files\Java\jdk1.8.0_181\jre\lib\ext\sunec.jar;C:\Program Files\Java\jdk1.8.0_181\jre\lib\ext\sunjce_provider.jar;C:\Program Files\Java\jdk1.8.0_181\jre\lib\ext\sunmscapi.jar;C:\Program Files\Java\jdk1.8.0_181\jre\lib\ext\sunpkcs11.jar;C:\Program Files\Java\jdk1.8.0_181\jre\lib\ext\zipfs.jar;C:\Program Files\Java\jdk1.8.0_181\jre\lib\javaws.jar;C:\Program Files\Java\jdk1.8.0_181\jre\lib\jce.jar;C:\Program Files\Java\jdk1.8.0_181\jre\lib\jfr.jar;C:\Program Files\Java\jdk1.8.0_181\jre\lib\jfxswt.jar;C:\Program Files\Java\jdk1.8.0_181\jre\lib\jsse.jar;C:\Program Files\Java\jdk1.8.0_181\jre\lib\management-agent.jar;C:\Program Files\Java\jdk1.8.0_181\jre\lib\plugin.jar;C:\Program Files\Java\jdk1.8.0_181\jre\lib\resources.jar;C:\Program Files\Java\jdk1.8.0_181\jre\lib\rt.jar;C:\Users\zhw\IdeaProjects\java-learning\java-multithread\target\classes" com.brianway.learning.java.multithread.communication.example9.Run9_oneP_manyC
Exception in thread "Thread-1" Exception in thread "Thread-2" java.lang.IndexOutOfBoundsException: Index: 0, Size: 0
push = 1
at java.util.ArrayList.rangeCheck(ArrayList.java:657)
push操作中的: Thread-0 线程呈wait状态
pop = 0 Mystack的pop方法中 线程Thread-3
at java.util.ArrayList.get(ArrayList.java:433)
pop = 0.859192167402362 Thread-3 Consumer的popService方法中打印pop返回值
pop操作中的: Thread-2 线程呈wait状态
pop操作中的: Thread-1 线程呈wait状态
at com.brianway.learning.java.multithread.communication.example9.MyStack.pop(Run9_oneP_manyC.java:41)
pop操作中的: Thread-3 线程呈wait状态
push = 1
at com.brianway.learning.java.multithread.communication.example9.Consumer.popService(Run9_oneP_manyC.java:73)
push操作中的: Thread-0 线程呈wait状态
pop = 0 Mystack的pop方法中 线程Thread-5
at com.brianway.learning.java.multithread.communication.example9.C_Thread.run(C_Thread.java:17)
pop = 0.381657285316201 Thread-5 Consumer的popService方法中打印pop返回值
pop操作中的: Thread-4 线程呈wait状态
java.lang.IndexOutOfBoundsException: Index: 0, Size: 0
pop操作中的: Thread-5 线程呈wait状态
at java.util.ArrayList.rangeCheck(ArrayList.java:657)
at java.util.ArrayList.get(ArrayList.java:433)
at com.brianway.learning.java.multithread.communication.example9.MyStack.pop(Run9_oneP_manyC.java:41)
at com.brianway.learning.java.multithread.communication.example9.Consumer.popService(Run9_oneP_manyC.java:73)
at com.brianway.learning.java.multithread.communication.example9.C_Thread.run(C_Thread.java:17)
原因:
if判断存在弊端,条件发生改变时没有得到及时的响应,多个呈wait状态的线程被唤醒,继而执行list.remove(0)出现异常java.lang.IndexOutOfBoundsException.
解决办法:
if 改为while, while判断解决条件发生改变时没有得到及时的响应,多个呈wait状态的线程被唤醒的问题,但会出现新的问题:假死,同时需要将notify()换成notifyAll()
class MyStack {
private List list = new ArrayList();
synchronized public void push() {
try {
while (list.size() == 1) {
System.out.println("push操作中的: " + Thread.currentThread().getName() + " 线程呈wait状态");
this.wait();
}
list.add(Math.random());
this.notifyAll();
System.out.println("push = " + list.size());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
synchronized public String pop() {
String returnValue = "";
try {
while (list.size() == 0) {
System.out.println("pop操作中的: " + Thread.currentThread().getName() + " 线程呈wait状态");
this.wait();
}
returnValue = list.get(0) + " " + Thread.currentThread().getName();
list.remove(0);
this.notifyAll();
System.out.println("pop = " + list.size() + " Mystack的pop方法中 线程" + Thread.currentThread().getName());
} catch (InterruptedException e) {
e.printStackTrace();
}
return returnValue;
}
}
多生产与一消费:notifyAll()代替notify(),将一直运行下去
class MyStack {
private List list = new ArrayList();
synchronized public void push() {
try {
while (list.size() == 1) {
System.out.println("push操作中的: " + Thread.currentThread().getName() + " 线程呈wait状态");
this.wait();
}
list.add(Math.random());
this.notifyAll();
System.out.println("push = " + list.size());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
synchronized public String pop() {
String returnValue = "";
try {
while (list.size() == 0) {
System.out.println("pop操作中的: " + Thread.currentThread().getName() + " 线程呈wait状态");
this.wait();
}
returnValue = list.get(0) + " " + Thread.currentThread().getName();
list.remove(0);
this.notifyAll();
System.out.println("pop = " + list.size() + " Mystack的pop方法中 线程" + Thread.currentThread().getName());
} catch (InterruptedException e) {
e.printStackTrace();
}
return returnValue;
}
}
class Producer {
private MyStack myStack;
public Producer(MyStack myStack) {
super();
this.myStack = myStack;
}
public void pushService() {
myStack.push();
}
}
class Consumer {
private MyStack myStack;
public Consumer(MyStack myStack) {
super();
this.myStack = myStack;
}
public void popService() {
System.out.println("pop = " + myStack.pop() + " Consumer的popService方法中打印pop返回值");
}
}
class C_Thread extends Thread {
private Consumer c;
public C_Thread(Consumer c) {
super();
this.c = c;
}
@Override
public void run() {
while (true) {
c.popService();
}
}
}
class P_Thread extends Thread {
private Producer p;
public P_Thread(Producer p) {
super();
this.p = p;
}
@Override
public void run() {
while (true) {
p.pushService();
}
}
}
public class test{
public static void main(String[] args) {
MyStack myStack = new MyStack();
int pNum = 6;
Producer[] producers = new Producer[pNum];
P_Thread[] p_threads = new P_Thread[pNum];
for (int i = 0; i < pNum; i++) {
producers[i] = new Producer(myStack);
}
for (int i = 0; i < pNum; i++) {
p_threads[i] = new P_Thread(producers[i]);
p_threads[i].start();
}
Consumer c = new Consumer(myStack);
C_Thread c_thread = new C_Thread(c);
c_thread.start();
}
}
多生产与多消费: 一直运行下去
class MyStack {
private List list = new ArrayList();
synchronized public void push() {
try {
while (list.size() == 1) {
System.out.println("push操作中的: " + Thread.currentThread().getName() + " 线程呈wait状态");
this.wait();
}
list.add(Math.random());
this.notifyAll();
System.out.println("push = " + list.size());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
synchronized public String pop() {
String returnValue = "";
try {
while (list.size() == 0) {
System.out.println("pop操作中的: " + Thread.currentThread().getName() + " 线程呈wait状态");
this.wait();
}
returnValue = list.get(0) + " " + Thread.currentThread().getName();
list.remove(0);
this.notifyAll();
System.out.println("pop = " + list.size() + " Mystack的pop方法中 线程" + Thread.currentThread().getName());
} catch (InterruptedException e) {
e.printStackTrace();
}
return returnValue;
}
}
class Producer {
private MyStack myStack;
public Producer(MyStack myStack) {
super();
this.myStack = myStack;
}
public void pushService() {
myStack.push();
}
}
class Consumer {
private MyStack myStack;
public Consumer(MyStack myStack) {
super();
this.myStack = myStack;
}
public void popService() {
System.out.println("pop = " + myStack.pop() + " Consumer的popService方法中打印pop返回值");
}
}
class C_Thread extends Thread {
private Consumer c;
public C_Thread(Consumer c) {
super();
this.c = c;
}
@Override
public void run() {
while (true) {
c.popService();
}
}
}
class P_Thread extends Thread {
private Producer p;
public P_Thread(Producer p) {
super();
this.p = p;
}
@Override
public void run() {
while (true) {
p.pushService();
}
}
}
public class test{
public static void main(String[] args) {
MyStack myStack = new MyStack();
int pNum = 6;
Producer[] producers = new Producer[pNum];
P_Thread[] p_threads = new P_Thread[pNum];
for (int i = 0; i < pNum; i++) {
producers[i] = new Producer(myStack);
}
for (int i = 0; i < pNum; i++) {
p_threads[i] = new P_Thread(producers[i]);
p_threads[i].start();
}
int cNum = 8;
Consumer[] consumers = new Consumer[cNum];
C_Thread[] c_threads = new C_Thread[cNum];
for (int i = 0; i < cNum; i++) {
consumers[i] = new Consumer(myStack);
}
for (int i = 0; i < cNum; i++) {
c_threads[i] = new C_Thread(consumers[i]);
c_threads[i].start();
}
}
}
还没有评论,来说两句吧...