接口ExecutorService的方法使用总结 た 入场券 2022-02-24 12:50 203阅读 0赞 ### 1.在ThreadPoolExecutor中使用ExecutorService中的方法 ### 方法invokeAny() 和 invokeAll() 具有阻塞特性。 方法invokeAny()取得第一个完成任务的结果值,当第一个任务执行完成后,会调用interrupt()方法将其他任务中断,所以在这些任务中可以结合if(Thread.currentThread().isInterrupted() == true) 代码来决定任务是否继续运行。 方法invokeAll() 等全部线程任务执行完毕后,取得全部完成任务的结果值。 ### 2.方法invokeAny(Collection tasks) 的使用与interrupt ### invokeAny() 是取得第一个完成任务的结果值,在这个过程中出现两种情况: **2.1 无if(Thread.currentThread().isInterrupted())代码:已经获得第一个运行的结果值后,其他线程继续运行。** **2.2 有if(Thread.currentThread().isInterrupted())代码:已经获得第一个运行的结果值后,其他线程如果使用throw new InterruptedExeception()代码则这些线程中断,虽然throw抛出了异常,但在main线程中并不能捕获异常。如果想捕获异常,则需要在Callable中使用try-catch显式进行捕获。** package mycallable; import java.util.concurrent.Callable; public class MyCallableA implements Callable<String> { public String call() throws Exception { System.out.println("MyCallableA begin " + System.currentTimeMillis()); for (int i = 0; i < 123456; i++) { Math.random(); Math.random(); Math.random(); System.out.println("MyCallableA " + (i + 1)); } System.out.println("MyCallableA end " + System.currentTimeMillis()); return "returnA"; } } package mycallable; import java.util.concurrent.Callable; public class MyCallableB1 implements Callable<String> { public String call() throws Exception { System.out.println("MyCallableB begin " + System.currentTimeMillis()); for (int i = 0; i < 223456; i++) { Math.random(); Math.random(); Math.random(); System.out.println("MyCallableB " + (i + 1)); } System.out.println("MyCallableB end " + System.currentTimeMillis()); return "returnB"; } } package mycallable; import java.util.concurrent.Callable; public class MyCallableB2 implements Callable<String> { public String call() throws Exception { System.out.println("MyCallableB begin " + System.currentTimeMillis()); for (int i = 0; i < 223456; i++) { if (Thread.currentThread().isInterrupted() == false) { Math.random(); Math.random(); Math.random(); System.out.println("MyCallableB " + (i + 1)); } else { System.out.println("***********抛出异常中断了"); throw new InterruptedException(); } } System.out.println("MyCallableB end " + System.currentTimeMillis()); return "returnB"; } } package test.run; import java.util.ArrayList; import java.util.List; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import mycallable.MyCallableA; import mycallable.MyCallableB1; public class Run1 { public static void main(String[] args) { try { List list = new ArrayList(); list.add(new MyCallableA()); list.add(new MyCallableB1()); ExecutorService executor = Executors.newCachedThreadPool(); // invokeAny只取得最先完成任务的结果值 String getValueA = executor.invokeAny(list); System.out.println("============" + getValueA); System.out.println("ZZZZZZZZZZZZZZZZ"); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } } } package test.run; import java.util.ArrayList; import java.util.List; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import mycallable.MyCallableA; import mycallable.MyCallableB2; public class Run2 { public static void main(String[] args) { try { List list = new ArrayList(); list.add(new MyCallableA()); list.add(new MyCallableB2()); ExecutorService executor = Executors.newCachedThreadPool(); // invokeAny只取得最先完成任务的结果值 String getValueA = executor.invokeAny(list); System.out.println("============" + getValueA); System.out.println("ZZZZZZZZZZZZZZZZ"); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } } } **2.3 run1执行体中,虽然方法invokeAny()已经取得returnA的值,但线程B还在继续运行中,直到运行完毕。** **2.4 run2执行体中,线程A执行完毕后,线程池将线程B设置为中断interrupte状态,而线程B可以自定义对中断interrupte状态进行处理,也就是可以决定是否使用Thread.currentThread().isInterrupted()结合throw new InterruptedExeception()的代码。如果使用Thread.currentThread().isInterrupted()结合throw new InterruptedExeception()的代码说明对线程B进行中断的意图更加明确。但invokeAny只取得最先完成任务的结果值。** ### 3.方法invokeAny()与执行快/慢的任务异常 ### 在快的任务优先执行完毕后,执行慢的任务出现异常时,默认情况下不会在控制台输出异常信息。如果显式使用try-catch语句块则可以自定义捕获异常 例子中该种情况成功取得returnA字符串,线程B中断了,但抛出的空指针异常却没有在控制台输出。如果想要在Callable中捕获异常信息, 则需要显式的添加try-catch语句块.同理快的情况。 ### 4.方法invokeAny(CollectionTasks,timeout,timeUnit) 超时的测试 ### 方法<T> T 方法invokeAny(Collection<? extends Callable<T>> tasks,long timeout,TimeUnit unit)的主要作用就是在指定时间内取得第一个先执行完任务的结果值。 package mycallable; import java.util.concurrent.Callable; public class MyCallableA implements Callable<String> { public String call() throws Exception { System.out.println("MyCallableA begin " + System.currentTimeMillis()); for (int i = 0; i < 1234567; i++) { Math.random(); Math.random(); Math.random(); Math.random(); Math.random(); Math.random(); Math.random(); Math.random(); Math.random(); Math.random(); System.out.println("MyCallableA i=" + (i + 1)); } System.out.println("MyCallableA end " + System.currentTimeMillis()); return "MyCallableAValue"; } } package test.run; import java.util.ArrayList; import java.util.List; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import mycallable.MyCallableA; public class Run { public static void main(String[] args) { try { MyCallableA a = new MyCallableA(); List callableList = new ArrayList(); callableList.add(a); ExecutorService service = Executors.newCachedThreadPool(); String getValue = service.invokeAny(callableList, 1, TimeUnit.SECONDS); System.out.println("=============" + getValue); System.out.println("zzzzzzzzzzzzzzzz"); } catch (InterruptedException e) { System.out.println("进入catch InterruptedException"); e.printStackTrace(); } catch (ExecutionException e) { System.out.println("进入catch ExecutionException"); e.printStackTrace(); } catch (TimeoutException e) { System.out.println("进入catch TimeoutException 超时了"); e.printStackTrace(); } } } 在出现超时异常时,可以将if(Thread.currentThread().isInterrupted() == true)判断和throw new InterruptedExeception()结合以使线程中断执行。 ### 5.方法invokeAll(Collection tasks)全正确(方法具有阻塞性) ### 方法方法invokeAll()会返回所有任务的执行结果,并且此方法执行的效果也是阻塞执行的,要把所有的结果都取回时再继续向下运行。 package extthread; import java.util.concurrent.Callable; public class CallableA implements Callable<String> { @Override public String call() throws Exception { System.out.println(Thread.currentThread().getName() + " begin " + System.currentTimeMillis()); Thread.sleep(5000); System.out.println(Thread.currentThread().getName() + " end " + System.currentTimeMillis()); return "returnA"; } } package extthread; import java.util.concurrent.Callable; public class CallableB implements Callable<String> { @Override public String call() throws Exception { System.out.println(Thread.currentThread().getName() + " begin " + System.currentTimeMillis()); Thread.sleep(8000); System.out.println(Thread.currentThread().getName() + " end " + System.currentTimeMillis()); return "returnB"; } } package test; import java.util.ArrayList; import java.util.List; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; import extthread.CallableA; import extthread.CallableB; public class Run { public static void main(String[] args) { try { CallableA callableA = new CallableA(); CallableB callableB = new CallableB(); List<Callable<String>> list = new ArrayList<Callable<String>>(); list.add(callableA); list.add(callableB); ExecutorService service = Executors.newCachedThreadPool(); System.out.println("invokeAll begin " + System.currentTimeMillis()); List<Future<String>> listFuture = service.invokeAll(list); System.out.println("invokeAll end " + System.currentTimeMillis()); for (int i = 0; i < listFuture.size(); i++) { System.out.println("返回的结果=" + listFuture.get(i).get() + " " + System.currentTimeMillis()); } } catch (InterruptedException e) { e.printStackTrace(); System.out.println("mainA"); } catch (ExecutionException e) { e.printStackTrace(); System.out.println("mainB"); } } } ### 6.方法invokeAll(Collection tasks,long timeout,TimeUnit timeUnit)对异常的测试 ### ## 总结:接口ExecutorService中的方法都以便携的方式去创建线程池,使用两个主要的方法invokeAny()和invokeAll()来取得第一个首先执行完任务的结果值,以及全部任务的结果值。 ##
还没有评论,来说两句吧...