Java并发编程——CompletableFuture类 不念不忘少年蓝@ 2022-09-01 07:59 144阅读 0赞 ## 从Future接口开始 ## java.util.concurrent.Future接口是Java 5添加的类,用来描述一个异步计算的结果。可以使用该接口的isDone()方法检查计算是否完成,或者使用get()阻塞住调用线程,直到计算完成返回结果,也可以使用cancel()方法停止任务的执行。 ExecutorService es = Executors.newFixedThreadPool(10); Future<Integer> f = es.submit(() ->{ // 长时间的异步计算 // …… // 然后返回结果 return 100; }); // while(!f.isDone()) //阻塞式拿结果,拿不到就一直在这一行等待 f.get(); 虽然Future以及相关使用方法提供了异步执行任务的能力,但是对于结果的获取却是很不方便,只能通过阻塞或者轮询的方式得到任务的结果。阻塞的方式显然和我们的异步编程的初衷相违背,轮询的方式又会耗费无谓的CPU资源,而且也不能及时地得到计算结果。 很多框架或库,采用回调的方式实现异步编程。比如Netty,自己扩展了Java的Future接口,提供了addListener等多个扩展方法: ChannelFuture future = bootstrap.connect(new InetSocketAddress(host, port)); future.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { if (future.isSuccess()) { // SUCCESS } else { // FAILURE } } }); Google guava也提供了通用的ListenableFuture、SettableFuture以及辅助类Futures等,方便异步编程。 ExecutorService es = Executors.newFixedThreadPool(10); final String name = "abc"; ListenableFuture<Result> future = service.query(name); future.addListener(new Runnable() { public void run() { // do Something } }, es); 正统的JDK,终于在1.8中增强了自身的功能。 新增加了一个包含50个方法左右的类: CompletableFuture,提供了强大的Future扩展功能,可以帮助我们简化异步编程的复杂性,提供了函数式编程的能力,提供了通过回调的方式处理计算结果的能力,并且提供了转换和组合CompletableFuture的方法。它的灵活性和更强大的功能是Future无法比拟的。 ## CompletableFuture的创建 ## CompletableFuture类实现了CompletionStage和Future接口,所以你还是可以像以前一样通过阻塞或者轮询的方式获得结果,尽管这种方式不推荐使用。 ### 使用默认线程池创建 ### CompletableFuture<String> future = new CompletableFuture<>(); 默认使用ForkJoinPool.commonPool(),这个commonPool是一个会被很多任务共享的线程池,比如同一JVM上的所有CompletableFuture、parallelStream都将共享commonPool,commonPool设计时的目标场景是运行非阻塞的CPU密集型任务,为最大化利用CPU,其线程数默认为CPU数量-1。查看java.util.concurrent.ForkJoinPool\#makeCommonPool这个方法的源码,可以知道commonPool线程池在创建时,使用的并行度、threadFactory、exceptionHandler、阻塞队列等参数。 ### 使用自定义线程池 ### ExecutorService threadPool = new ThreadPoolExecutor( 50, 100, 50L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(50), new ThreadFactoryBuilder().setNameFormat("CustomThreadPool-%d").build(), new ThreadPoolExecutor.DiscardPolicy()); CompletableFuture.runAsync(() -> System.out.println("只是一个线程而已"), threadPool); 以下四个静态方法也可以用来为一段异步执行的代码创建CompletableFuture对象,第二个参数executor用于传入自定义的线程池。 public static CompletableFuture<Void> runAsync(Runnable runnable) public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor) public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor) runAsync方法以Runnable函数式接口类型为参数,所以CompletableFuture的计算结果为空。 supplyAsync方法以Supplier<U>函数式接口类型为参数,CompletableFuture可以返回计算结果,结果类型为U。 ## 使用CompletableFuture ## 自定义类去模拟一个运行时间长的任务。 class MyTask { private final int number; private final int duration; public MyTask(int number, int duration) { this.number = number; this.duration = duration; } public int calculate() { System.out.println(Thread.currentThread().getName()); try { Thread.sleep(duration * 1000); } catch (final InterruptedException e) { throw new RuntimeException(e); } return number; } @Override public String toString() { return "MyTask{" + "number=" + number + ", duration=" + duration + '}'; } } 创建10个任务,每个持续1秒,如何避免长时间等待,快速执行这10个任务呢?最容易想到的是parallelStream,如下面。 @Test public void testParallel() { List<MyTask> taskList = IntStream.range(0, 10) .mapToObj(i -> new MyTask(i, 1)) .collect(toList()); StopWatch stopWatch = new StopWatch(); stopWatch.start(); List<Integer> result = taskList.parallelStream() .map(MyTask::calculate) .collect(toList()); System.out.printf("Processed %d tasks in %d millis\n", taskList.size(), stopWatch.getTime()); System.out.println(result); } 打印结果如下,10个任务总共耗时2015ms,用的线程池是ForkJoinPool.commonPool(),用了7个ForkJoinPool线程和1个主线程。 ForkJoinPool.commonPool-worker-6 ForkJoinPool.commonPool-worker-4 main ForkJoinPool.commonPool-worker-7 ForkJoinPool.commonPool-worker-2 ForkJoinPool.commonPool-worker-3 ForkJoinPool.commonPool-worker-5 ForkJoinPool.commonPool-worker-1 ForkJoinPool.commonPool-worker-3 ForkJoinPool.commonPool-worker-4 Processed 10 tasks in 2015 millis [0, 1, 2, 3, 4, 5, 6, 7, 8, 9] 不使用parallelStream,改用CompletableFuture来执行看效果如何。 @Test public void testCompletableFutureSupplyAsync() { List<MyTask> taskList = IntStream.range(0, 10) .mapToObj(i -> new MyTask(i, 1)) .collect(toList()); StopWatch stopWatch = new StopWatch(); stopWatch.start(); List<Integer> result = taskList.stream() .map(item -> CompletableFuture.supplyAsync(item::calculate)) .collect(toList()).stream().map(CompletableFuture::join) .collect(toList()); //main线程 System.out.printf("Processed task in %d millis\n", stopWatch.getTime()); System.out.println(result); } 获取到CompletableFuture集合后,然后在每个future上调用join方法去等待他们逐一执行完。注意,join方法类似于get方法,唯一的不通点是前者不会抛出任何的受检查异常,join返回计算的结果或者抛出一个unchecked异常(CompletionException),它和get对抛出的异常的处理有些细微的区别,所以在lambda表达式中更方便一些。 输出结果如下,耗时2014ms,从耗时上看没有什么特殊的优势,但这一次用了7个ForkJoinPool线程(其中1、2、3号线程被复用),与parallelStream不同的是,主线程没有被用到。 ForkJoinPool.commonPool-worker-1 ForkJoinPool.commonPool-worker-2 ForkJoinPool.commonPool-worker-3 ForkJoinPool.commonPool-worker-4 ForkJoinPool.commonPool-worker-6 ForkJoinPool.commonPool-worker-5 ForkJoinPool.commonPool-worker-7 ForkJoinPool.commonPool-worker-2 ForkJoinPool.commonPool-worker-3 ForkJoinPool.commonPool-worker-1 Processed task in 2014 millis [0, 1, 2, 3, 4, 5, 6, 7, 8, 9] 使用带有自定义Executor的CompletableFuture private ExecutorService threadPool = new ThreadPoolExecutor( 10, 50, 50L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(50), new ThreadFactoryBuilder().setNameFormat("CustomThreadPool-%d").build(), new ThreadPoolExecutor.DiscardPolicy()); @Test public void testCompletableFutureThreadPool() { List<MyTask> taskList = IntStream.range(0, 10) .mapToObj(i -> new MyTask(i, 1)) .collect(toList()); StopWatch stopWatch = new StopWatch(); stopWatch.start(); List<Integer> result = taskList.stream() .map(item -> CompletableFuture.supplyAsync(item::calculate, threadPool)) .collect(toList()).stream().map(CompletableFuture::join) .collect(toList()); //main线程 System.out.printf("Processed task in %d millis\n", stopWatch.getTime()); System.out.println(result); } CompletableFuture比parallelStream优点之一是你可以指定自定义的Executor去处理他们的任务。以上10个任务是CPU密集型的任务,无I/O处理。自定义一个corePoolSize为10的线程池,供CompletableFuture使用,运行效果如下。 CustomThreadPool-0 CustomThreadPool-1 CustomThreadPool-2 CustomThreadPool-3 CustomThreadPool-4 CustomThreadPool-5 CustomThreadPool-6 CustomThreadPool-7 CustomThreadPool-8 CustomThreadPool-9 Processed task in 1012 millis [0, 1, 2, 3, 4, 5, 6, 7, 8, 9] 改进之后,它花费了1012ms去处理这10个任务。 对一个包含多个任务的集合进行异步并行处理有两种方式:并行流和CompletableFuture。CompletableFuture更加的灵活,我们可以配置线程池的大小确保整体的计算不会因为等待I/O而发生阻塞。《Java并发编程实战》书上给出的建议是:如果要执行的任务是计算密集型的并且没有IO操作,推荐parallelStream,因为实现简单效率也高,各个线程都很忙碌(运行状态),没有必要创建比核数更多的线程。如果要执行的任务涉及到I/O或网络等操作,CompletableFuture灵活性更好,因为大部分线程处于等待状态,需要将它们利用起来,让它们更加忙碌,并且在逻辑中加入异常处理可以更有效的监控是什么原因触发了等待。 CompletableFuture由于回调风格的实现,使用者不必因为等待一个计算完成而阻塞着调用线程,而是告诉CompletableFuture当计算完成的时候请执行某个function。而且我们还可以将这些操作串联起来,或者将CompletableFuture组合起来。 public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn) public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn) public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn, Executor executor) 上面这一组函数的功能是当原来的CompletableFuture计算完后,将结果传递给函数fn,将fn的结果作为新的CompletableFuture计算结果。因此它的功能相当于将CompletableFuture<T>转换成CompletableFuture<U>。 这三个函数的区别和上面介绍的一样,不以Async结尾的方法由原来的线程计算,以Async结尾的方法由默认的线程池ForkJoinPool.commonPool()或者指定的线程池executor运行。Java的CompletableFuture类总是遵循这样的原则。 CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> { return 100; }); CompletableFuture<String> f = future.thenApplyAsync(i -> i * 10) .thenApply(i -> i.toString()); System.out.println(f.get()); //"1000" 就是在CompletableFuture执行完成之后(done)再执行指定所的方法。这些动作并不是马上执行的,也不会阻塞,而是在前一个stage完成后继续执行。 ## 参考文档 ## https://segmentfault.com/a/1190000039721242 Java 8: Definitive guide to CompletableFuture https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/CompletableFuture.html https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/CompletionStage.html
还没有评论,来说两句吧...