Java8异步编程-CompletableFuture 谁践踏了优雅 2021-11-23 11:16 424阅读 0赞 ## 总结 ## **创建无返回异步任务** runAsync **创建有返回异步任务** supplyAsync **异步任务正常执行完或者抛出异常时** whenComplete whenCompleteAsync **异步任务抛出异常时** exceptionally **异步任务串行化,前一个有返回异步任务正常执行完,返回值作为下一个有参有返回异步任务的参数** thenApply thenApplyAysnc **异步任务串行化,前一个有返回异步任务正常执行完或者抛出异常时,返回值作为下一个有参有返回异步任务的参数** handle handleAsync **异步任务串行化,前一个有返回异步任务正常执行完,返回值作为下一个有参无返回异步任务的参数** thenAccept thenAcceptAsync **异步任务串行化,前一个异步任务正常执行完,执行下一个无参无返回的异步任务** thenRun thenRunAsync **整合异步任务,两个异步任务都执行完,把两个异步任务的结果放到一块处理, 有参有返回** thenCombine thenCombineAsync **整合异步任务,两个异步任务都执行完,把两个异步任务的结果放到一块处理, 有参无返回** thenAcceptBoth thenAcceptBothAsync **整合异步任务,哪个返回结果快就使用哪个结果,有参有返回** applyToEither applyToEitherAsync **整合异步任务,哪个返回结果快就使用哪个结果,有参无返回** acceptEither acceptEitherAsync **两个异步任务,任何一个执行完成了都会执行下一步操作,无参无返回** runAfterEither runAfterEitherAsync **两个异步任务,都完成了才会执行下一步操作,无参无返回** runAfterBoth runAfterBothAsync **所有的异步任务都完成** allOf **任意一个异步任务完成** anyOf **获取异步任务结果** get join 区别:get会抛异常, join不会抛异常 ## 创建无返回值的异步任务 ## public static CompletableFuture<Void> runAsync(Runnable runnable) public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor) @FunctionalInterface public interface Runnable { public abstract void run(); } 创建一个无返回值的异步任务, 第一个方法使用默认线程池ForkJoinPool\#commonPool() 第二个方法使用自定义线程池 eg @Test public void testRunAsync() { CompletableFuture<Void> future = CompletableFuture.runAsync(() -> System.out.println("执行异步任务")); future.join(); } ## 创建有返回值的异步任务 ## public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor) @FunctionalInterface public interface Supplier<T> { T get(); } 创建一个有返回值的异步任务 第一个方法使用默认线程池ForkJoinPool\#commonPool() 第二个方法使用自定义线程池 eg @Test public void testSupplyAsync() { CompletableFuture<Boolean> future = CompletableFuture.supplyAsync(() -> { System.out.println("执行异步任务"); return true }); boolean result = future.join(); System.out.println(result); } ## 异步任务正常执行完成或者发生异常时 ## public CompletableFuture<T> whenComplete(BiConsumer<? super T, ? super Throwable> action) public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action) public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action, Executor executor) @FunctionalInterface public interface BiConsumer<T, U> { void accept(T t, U u); } 异步任务正常执行完成或者发生异常时 第一个方法,使用执行异步任务的线程继续执行 第二个方法,使用默认线程池中ForkJoinPool\#commonPool()的线程执行 第三个方法,使用自定义线程池中的线程执行 eg ***可以通过判断第二个参数是否为空,来判断是否发生异常,不过不推荐,推荐使用exceptionally处理异常*** @Test public void testSupplyAsync() { CompletableFuture<Boolean> future = CompletableFuture.supplyAsync(() -> { System.out.println("执行异步任务"); if (true) { throw new NullPointerException("测试抛出空指针"); } return true; }).whenCompleteAsync((b, e) -> { System.out.println("异步任务执行完成"); if (Objects.nonNull(e)) { System.out.println("异步任务发生异常:" + e.getMessage()); } }); boolean result = future.join(); System.out.println(result); } ***注意: 这个地方虽然抛出的是NullPointerException, 但是在whenCompleteAsync方法中捕获时,异常已经向上转型成了Throwable,所以这个地方无法调用抛出异常的私有方法*** ## 异步任务抛异常时 ## public CompletableFuture<T> exceptionally(Function<Throwable, ? extends T> fn) @FunctionalInterface public interface Function<T, R> { R apply(T t); } 异步任务抛出异常时执行 eg @Test public void testSupplyAsync() { CompletableFuture<Boolean> future = CompletableFuture.supplyAsync(() -> { System.out.println("执行异步任务"); if (true) { throw new NullPointerException("测试抛出空指针"); } return true; }).whenCompleteAsync((b, e) -> { System.out.println("异步任务执行完成"); if (Objects.nonNull(e)) { System.out.println("异步任务发生异常:" + e.getMessage()); } }).exceptionally(e -> { System.out.println(e.getMessage()); return false; }); boolean result = future.join(); System.out.println(result); } ## 异步任务串行化, 前一个有返回值的异步任务正常执行完, 返回值作为下一个有返回值的异步任务的参数 ## public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn) public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn) public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn, Executor executor) @FunctionalInterface public interface Function<T, R> { R apply(T t); } 异步任务串行化, 前一个有返回值的异步任务正常执行完, 返回值作为下一个有返回值的异步任务的参数 第一个方法,使用执行异步任务的线程继续执行 第二个方法,使用默认线程池中ForkJoinPool\#commonPool()的线程执行 第三个方法,使用自定义线程池中的线程执行 eg @Test public void testWhenCompele() { CompletableFuture<Boolean> future1 = CompletableFuture.supplyAsync(() -> { System.out.println("异步线程1开始执行"); try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("异步线程1执行结束"); return "线程1"; }).thenApply(str -> { System.out.println(str); return false; }); System.out.println(future1.join()); } ## 异步任务串行化, 前一个有返回值的异步任务正常执行完, 返回值作为下一个无返回值的异步任务的参数 ## public CompletableFuture<Void> thenAccept(Consumer<? super T> action) public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action) public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action, Executor executor) @FunctionalInterface public interface Consumer<T> { void accept(T t); } 异步任务串行化, 前一个有返回值的异步任务正常执行完, 返回值作为下一个无返回值的异步任务的参数 第一个方法,使用执行异步任务的线程继续执行 第二个方法,使用默认线程池中ForkJoinPool\#commonPool()的线程执行 第三个方法,使用自定义线程池中的线程执行 eg @Test public void testWhenCompele() { CompletableFuture<Void> future1 = CompletableFuture.supplyAsync(() -> { System.out.println("异步线程1开始执行"); try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("异步线程1执行结束"); return "线程1"; }).thenAccept(str -> System.out.println(str)); System.out.println(future1.join()); } ## 异步任务串行化, 前一个有返回值的异步任务正常执行完或者发生异常时, 返回值作为下一个有返回值的异步任务的参数 ## public <U> CompletableFuture<U> handle(BiFunction<? super T, Throwable, ? extends U> fn) public <U> CompletableFuture<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn) public <U> CompletableFuture<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn, Executor executor) @FunctionalInterface public interface BiFunction<T, U, R> { R apply(T t, U u); } 异步任务串行化, 前一个有返回值的异步任务正常执行完或者发生异常时, 返回值作为下一个有返回值的异步任务的参数 第一个方法,使用执行异步任务的线程继续执行 第二个方法,使用默认线程池中ForkJoinPool\#commonPool()的线程执行 第三个方法,使用自定义线程池中的线程执行 eg @Test public void testWhenCompele() { CompletableFuture<Boolean> future1 = CompletableFuture.supplyAsync(() -> { System.out.println("异步线程1开始执行"); try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("异步线程1执行结束"); return "线程1"; }).handle((str, e) -> { System.out.println(str); if (Objects.nonNull(e)) { System.out.println(e.getMessage()); } return false; }); System.out.println(future1.join()); } ## 异步任务串行化, 前一个有返回值的异步任务正常执行完, 继续执行下一个无参数无返回值异步任务 ## ## 常见API ## **runAsync** 创建无返回值的异步任务, 类似于execute方法 public static CompletableFuture<Void> runAsync(Runnable runnable); public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor); **supplyAsync** 创建有返回值的异步任务, 类似于submit方法 public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier); public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor); 注意: 没有指定Executor的方法会使用ForkJoinPool.commonPool() 作为它的线程池执行异步代码 在 JVM 的后台,使用通用的 fork/join 池,该池是所有并行流共享的。 默认情况,fork/join 池会为每个处理器分配一个线程。假设你有一台16核的机器,这样你就只能创建16个线程。 ForkJoinPool 最适合的是计算密集型的任务 对 CPU 密集型的任务来说,这样是有意义的,因为你的机器确实只能执行16个线程。但是真实情况下,不是所有的任务都是 CPU 密集型的。 **whenComplete** 异步任务执行完成时的回调方法 public CompletableFuture<T> whenComplete(BiConsumer<? super T, ? super Throwable> action); public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action); public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action, Executor executor); whenComplete和whenCompleteAsync方法的区别在于:前者是由上面的线程继续执行,而后者是将whenCompleteAsync的任务继续交给线程池去做决定。 **exceptionally** 抛出异常时执行 public CompletableFuture<T> exceptionally (Function<Throwable, ? extends T> fn);
还没有评论,来说两句吧...