谷粒商城项目:异步的相关概念

矫情吗;* 2023-01-10 10:24 209阅读 0赞

1.初始化线程的4种方式

1.1 继承Thread类

  1. public class ThreadTest {
  2. public static void main(String[] args) {
  3. Thread01 thread01 = new Thread01();
  4. thread01.start();
  5. System.out.println("main执行完毕");
  6. }
  7. public static class Thread01 extends Thread{
  8. @Override
  9. public void run() {
  10. System.out.println("异步线程id :" + Thread.currentThread().getId());
  11. }
  12. }
  13. }

1.2 实现Runable接口

  1. public class ThreadTest {
  2. public static void main(String[] args) {
  3. Thread02 thread02 = new Thread02();
  4. new Thread(thread02).start();
  5. System.out.println("main执行完毕");
  6. }
  7. public static class Thread02 implements Runnable{
  8. @Override
  9. public void run() {
  10. System.out.println("异步线程id :" + Thread.currentThread().getId());
  11. }
  12. }
  13. }

1.3 实现Callable接口 + FutureTask

  1. public class ThreadTest {
  2. public static void main(String[] args) throws ExecutionException, InterruptedException {
  3. FutureTask<Integer> futureTask = new FutureTask<>(new Thread03());
  4. new Thread(futureTask).start();
  5. Integer integer = futureTask.get();
  6. System.out.println("返回线程id:" + integer);
  7. System.out.println("main执行完毕");
  8. }
  9. public static class Thread03 implements Callable<Integer> {
  10. @Override
  11. public Integer call() throws Exception {
  12. System.out.println("异步线程id :" + Thread.currentThread().getId());
  13. return Math.toIntExact(Thread.currentThread().getId());
  14. }
  15. }
  16. }

1.4线程池

  1. public class ThreadTest {
  2. //创建一个线程池
  3. public static ExecutorService executorService = Executors.newFixedThreadPool(10);
  4. public static void main(String[] args) throws ExecutionException, InterruptedException {
  5. executorService.execute(new Thread02());
  6. System.out.println("main执行完毕");
  7. }
  8. public static class Thread02 implements Runnable{
  9. @Override
  10. public void run() {
  11. System.out.println("异步线程id :" + Thread.currentThread().getId());
  12. }
  13. }
  14. }

1.5 区别

继承Thread类,实现Runable接口没有返回值
实现Callable接口 + FutureTask可以获取返回值
线程池可以控制资源

2.线程池

2.1 线程池的创建

  1. 使用 Executors 工具类(Executors.newFixedThreadPool(10))
  2. 直接创建(new ThreadPoolExecutor)

2.2 线程池参数

  1. corePoolSize:核心线程数。线程池创建好就准备就绪的线程数量,即使空闲也不会释放,除非自主设置
  2. maximumPoolSize:最大线程数量。并发情况下控制资源
  3. keepAliveTime:存活时间。如果线程的空闲时间大于该时间,则释放 maximumPoolSize - corePoolSize 数量的线程
  4. unit:时间单位
  5. BlockingQueue < Runnable > workQueue:阻塞队列。如果任务有很多,就会将目前多的任务放在队列里,线程空闲则会去队列里取新的任务
  6. threadFactory:线程的创建工厂
  7. RejectedExecutionHandler handler:拒绝策略。如果队列满了,则按照配置的拒绝策略进行执行

2.3 线程池工作流程

  1. 线程池创建,准备好 core 数量的核心线程,准备接受任务
  2. core 满了,就会将再进来的任务放在阻塞队列里。空闲的core 会自己去阻塞队列里获取任务执行
  3. 阻塞队列满了,就会直接开新线程执行,最大只能有max指定数量的线程
  4. max满了就会用设置的RejectedExecutionHandler拒绝任务
  5. max都执行完成后,有空闲线程,在指定的keepAliveTime时间后,释放maximumPoolSize - corePoolSize 数量的线程
例子:

一个线程池,core = 7,max = 20,queue = 50,假设进入100并发,是怎样分配?

7个请求会立即执行
50个请求会进入队列
线程池再开13个线程处理请求
剩下的30个请求根据配置的拒绝策略进行执行

2.4 线程池种类

  1. Executors.newFixedThreadPool(n):固定大小,当core = max时,所有线程都不可回收
  2. Executors.newCachedThreadPool():core是0,所有线程都可以进行回收
  3. Executors.newSingleThreadScheduledExecutor():单线程的线程池,线程从队列里获取任务,挨个执行
  4. Executors.newScheduledThreadPool(int corePoolSize):定时任务的线程池

3. ComplerableFuture异步编排

3.1 ComplerableFuture类

CompletableFuture类实现了CompletionStage和Future接口。
在这里插入图片描述

Future是Java 5添加的类,用来描述一个异步计算的结果,但是获取一个结果时方法较少,要么通过轮询isDone,确认完成后,调用get()获取值,要么调用get()设置一个超时时间。但是这个get()方法会阻塞住调用线程,这种阻塞的方式显然和我们的异步编程的初衷相违背。

为了解决这个问题,JDK吸收了guava的设计思想,加入了Future的诸多扩展功能形成了CompletableFuture。

3.2 创建异步对象

CompletableFuture类提供了四个静态方法:

  1. public static CompletableFuture<Void> runAsync(Runnable runnable)
  2. public static CompletableFuture<Void> runAsync(Runnable runnable,Executor executor)
  3. public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
  4. public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,Executor executor)
区别:
  • runXxx:是没有返回结果
  • supplyXxx:可以获取返回结果
  • Executor executor:表示可以传入自定义的线程池,否则使用默认的
示例:
  1. public class ThreadTest {
  2. //创建一个线程池
  3. public static ExecutorService executorService = Executors.newFixedThreadPool(10);
  4. public static void main(String[] args) throws ExecutionException, InterruptedException {
  5. CompletableFuture.runAsync(() -> {
  6. System.out.println("异步线程id :" + Thread.currentThread().getId());
  7. },executorService);
  8. System.out.println("main执行完毕");
  9. }
  10. public class ThreadTest {
  11. //创建一个线程池
  12. public static ExecutorService executorService = Executors.newFixedThreadPool(10);
  13. public static void main(String[] args) throws ExecutionException, InterruptedException {
  14. CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
  15. System.out.println("异步线程id :" + Thread.currentThread().getId());
  16. return 400;
  17. }, executorService);
  18. Integer integer = future.get();
  19. System.out.println(integer);
  20. System.out.println("main执行完毕");
  21. }
  22. }

3.3 计算完成时回调方法

  1. public CompletableFuture<T> whenComplete( BiConsumer<? super T, ? super Throwable> action)
  2. public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action)
  3. public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action, Executor executor)
  4. public CompletableFuture<T> exceptionally(Function<Throwable, ? extends T> fn)
区别:
  • whenComplete 可以处理正常和异常的计算结果
  • exceptionally 处理异常情况
  • whenComplete:是执行当前任务的线程执行完当前的任务后,继续执行whenComplete的任务
  • whenCompleteAsync:是执行把whenCompleteAsync这个任务继续提交给线程池来进行执行

总结:方法不以Async结尾,意味着 Action 使用相同的线程执行,而以Async结尾则使用其他线程执行(如果是相同的线程池,可能会分配的是同一个线程)

示例:
  1. public class ThreadTest {
  2. //创建一个线程池
  3. public static ExecutorService executorService = Executors.newFixedThreadPool(10);
  4. public static void main(String[] args) throws ExecutionException, InterruptedException {
  5. CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
  6. System.out.println("...任务一...");
  7. System.out.println("异步线程id :" + Thread.currentThread().getId());
  8. return 400;
  9. }, executorService).whenComplete((result,excption) -> {
  10. System.out.println("异步任务调用完毕,结果是:" + result);
  11. });
  12. System.out.println("main执行完毕");
  13. }
  14. }

3.4 handle方法

和Complete一样,可以对结果做最后的处理,可改变返回值

  1. public <U> CompletableFuture<U> handle(BiFunction<? super T, Throwable, ? extends U> fn)
  2. public <U> CompletableFuture<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn)
  3. public <U> CompletableFuture<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn, Executor executor)
示例:
  1. public class ThreadTest {
  2. //创建一个线程池
  3. public static ExecutorService executorService = Executors.newFixedThreadPool(10);
  4. public static void main(String[] args) throws ExecutionException, InterruptedException {
  5. CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
  6. System.out.println("...任务一...");
  7. System.out.println("异步线程id :" + Thread.currentThread().getId());
  8. return 400;
  9. }, executorService).handle((result,thr) -> {
  10. if (result == 400){
  11. return result * 2;
  12. }
  13. return result;
  14. });
  15. System.out.println(future.get());
  16. System.out.println("main执行完毕");
  17. }
  18. }

3.5 线程串行话方法

  1. //thenApply方法:当一个线程依赖另一个线程时,获取上一个任务返回的结果,并返回当前任务的返回值
  2. public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn)
  3. public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn)
  4. public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn, Executor executor)
  5. //thenAccept:消费处理结果。接收任务的处理结果,并消费处理,无返回结果
  6. public CompletableFuture<Void> thenAccept(Consumer<? super T> action)
  7. public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action)
  8. public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action,Executor executor)
  9. //thenRun:只要上面的任务执行完毕,就开始执行thenRun,只是处理完任务后,执行thenRun的后续操作
  10. public CompletableFuture<Void> thenRun(Runnable action)
  11. public CompletableFuture<Void> thenRunAsync(Runnable action)
  12. public CompletableFuture<Void> thenRunAsync(Runnable action,Executor executor)
区别:
  • thenRun:不能获取到上一步的执行结果,无返回值
  • thenAccept:能获取到上一步的执行结果,无返回值
  • thenApply:能获取到上一步的执行结果,有返回值
示例:
  1. public class ThreadTest {
  2. //创建一个线程池
  3. public static ExecutorService executorService = Executors.newFixedThreadPool(10);
  4. public static void main(String[] args) throws ExecutionException, InterruptedException {
  5. CompletableFuture.supplyAsync(() -> {
  6. System.out.println("...任务一...");
  7. System.out.println("异步线程id :" + Thread.currentThread().getId());
  8. return 400;
  9. }, executorService).thenRunAsync(() -> {
  10. System.out.println("...任务二...");
  11. System.out.println("异步线程id :" + Thread.currentThread().getId());
  12. },executorService);
  13. //System.out.println(future.get());
  14. System.out.println("main执行完毕");
  15. }
  16. }
  17. public class ThreadTest {
  18. //创建一个线程池
  19. public static ExecutorService executorService = Executors.newFixedThreadPool(10);
  20. public static void main(String[] args) throws ExecutionException, InterruptedException {
  21. CompletableFuture.supplyAsync(() -> {
  22. System.out.println("...任务一...");
  23. System.out.println("异步线程id :" + Thread.currentThread().getId());
  24. return 400;
  25. }, executorService).thenAcceptAsync((result) -> {
  26. System.out.println("...任务二...");
  27. System.out.println("任务一的运行结果:" + result);
  28. System.out.println("异步线程id :" + Thread.currentThread().getId());
  29. },executorService);
  30. //System.out.println(future.get());
  31. System.out.println("main执行完毕");
  32. }
  33. }
  34. public class ThreadTest {
  35. //创建一个线程池
  36. public static ExecutorService executorService = Executors.newFixedThreadPool(10);
  37. public static void main(String[] args) throws ExecutionException, InterruptedException {
  38. CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
  39. System.out.println("...任务一...");
  40. System.out.println("异步线程id :" + Thread.currentThread().getId());
  41. return 400;
  42. }, executorService).thenApplyAsync((result) -> {
  43. System.out.println("...任务二...");
  44. System.out.println("任务一的运行结果:" + result);
  45. System.out.println("异步线程id :" + Thread.currentThread().getId());
  46. return 800;
  47. }, executorService);
  48. System.out.println(future.get());//800
  49. System.out.println("main执行完毕");
  50. }
  51. }

3.6 两任务组合,且都完成

  1. //thenCombine:组合两个future,获取两个future的返回结果,并返回当前任务的返回值
  2. public <U,V> CompletableFuture<V> thenCombine(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn)
  3. public <U,V> CompletableFuture<V> thenCombineAsync(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn)
  4. public <U,V> CompletableFuture<V> thenCombineAsync(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn, Executor executor)
  5. //thenAcceptBoth:组合两个future,获取两个future的返回结果,然后处理任务,没有返回值
  6. public <U> CompletableFuture<Void> thenAcceptBoth(CompletionStage<? extends U> other,BiConsumer<? super T, ? super U> action)
  7. public <U> CompletableFuture<Void> thenAcceptBothAsync(CompletionStage<? extends U> other,BiConsumer<? super T, ? super U> action)
  8. public <U> CompletableFuture<Void> thenAcceptBothAsync(CompletionStage<? extends U> other,BiConsumer<? super T, ? super U> action, Executor executor)
  9. //runAfterBoth:组合两个future,不需要获取两个future的返回结果,只需要两个future处理完任务后处理该任务
  10. public CompletableFuture<Void> runAfterBoth(CompletionStage<?> other,Runnable action)
  11. public CompletableFuture<Void> runAfterBothAsync(CompletionStage<?> other,Runnable action)
  12. public CompletableFuture<Void> runAfterBothAsync(CompletionStage<?> other,Runnable action,Executor executor)
示例:
  1. public class ThreadTest {
  2. //创建一个线程池
  3. public static ExecutorService executorService = Executors.newFixedThreadPool(10);
  4. public static void main(String[] args) throws ExecutionException, InterruptedException {
  5. CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {
  6. System.out.println("...任务一...");
  7. System.out.println("异步线程id :" + Thread.currentThread().getId());
  8. return 401;
  9. }, executorService);
  10. CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> {
  11. System.out.println("...任务二...");
  12. System.out.println("异步线程id :" + Thread.currentThread().getId());
  13. return 402;
  14. }, executorService);
  15. future1.thenAcceptBothAsync(future2,(result01,result02) -> {
  16. System.out.println("future1:" + result01);
  17. System.out.println("future2:" + result02);
  18. System.out.println("...任务三...");
  19. System.out.println("异步线程id :" + Thread.currentThread().getId());
  20. },executorService);
  21. //System.out.println(future.get());//800
  22. System.out.println("main执行完毕");
  23. }
  24. }
  25. public class ThreadTest {
  26. //创建一个线程池
  27. public static ExecutorService executorService = Executors.newFixedThreadPool(10);
  28. public static void main(String[] args) throws ExecutionException, InterruptedException {
  29. CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {
  30. System.out.println("...任务一...");
  31. System.out.println("异步线程id :" + Thread.currentThread().getId());
  32. return 401;
  33. }, executorService);
  34. CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> {
  35. System.out.println("...任务二...");
  36. System.out.println("异步线程id :" + Thread.currentThread().getId());
  37. return 402;
  38. }, executorService);
  39. CompletableFuture<Integer> future = future1.thenCombineAsync(future2, (result01, result02) -> {
  40. System.out.println("future1:" + result01);
  41. System.out.println("future2:" + result02);
  42. System.out.println("...任务三...");
  43. System.out.println("异步线程id :" + Thread.currentThread().getId());
  44. return result01 + result02;
  45. }, executorService);
  46. System.out.println(future.get());
  47. System.out.println("main执行完毕");
  48. }
  49. }
  50. public class ThreadTest {
  51. //创建一个线程池
  52. public static ExecutorService executorService = Executors.newFixedThreadPool(10);
  53. public static void main(String[] args) throws ExecutionException, InterruptedException {
  54. CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {
  55. System.out.println("...任务一...");
  56. System.out.println("异步线程id :" + Thread.currentThread().getId());
  57. return 401;
  58. }, executorService);
  59. CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> {
  60. System.out.println("...任务二...");
  61. System.out.println("异步线程id :" + Thread.currentThread().getId());
  62. return 402;
  63. }, executorService);
  64. future1.runAfterBothAsync(future2, () -> {
  65. System.out.println("...任务三...");
  66. System.out.println("异步线程id :" + Thread.currentThread().getId());
  67. }, executorService);
  68. System.out.println("main执行完毕");
  69. }
  70. }

3.7 两任务组合,只需一个完成

  1. // applyToEitherAsync:感知结果,自己有返回值
  2. public <U> CompletableFuture<U> applyToEither(CompletionStage<? extends T> other, Function<? super T, U> fn)
  3. public <U> CompletableFuture<U> applyToEitherAsync(CompletionStage<? extends T> other, Function<? super T, U> fn)
  4. public <U> CompletableFuture<U> applyToEitherAsync(CompletionStage<? extends T> other, Function<? super T, U> fn,Executor executor)
  5. //acceptEitherAsync:感知结果,自己没有返回值
  6. public CompletableFuture<Void> acceptEither(CompletionStage<? extends T> other, Consumer<? super T> action)
  7. public CompletableFuture<Void> acceptEitherAsync(CompletionStage<? extends T> other, Consumer<? super T> action)
  8. public CompletableFuture<Void> acceptEitherAsync(CompletionStage<? extends T> other, Consumer<? super T> action, Executor executor)
  9. //runAfterEitherAsync:不感知结果,自己没有返回值
  10. public CompletableFuture<Void> runAfterEither(CompletionStage<?> other, Runnable action)
  11. public CompletableFuture<Void> runAfterEitherAsync(CompletionStage<?> other,Runnable action)
  12. public CompletableFuture<Void> runAfterEitherAsync(CompletionStage<?> other, Runnable action, Executor executor)

3.8 多任务组合

  1. //等待所有任务完成
  2. public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs)
  3. //只要有一个任务完成
  4. public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs)
示例:
  1. public class ThreadTest {
  2. //创建一个线程池
  3. public static ExecutorService executorService = Executors.newFixedThreadPool(10);
  4. public static void main(String[] args) throws ExecutionException, InterruptedException {
  5. CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {
  6. System.out.println("...任务一...");
  7. System.out.println("异步线程id :" + Thread.currentThread().getId());
  8. return 401;
  9. }, executorService);
  10. CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> {
  11. System.out.println("...任务二...");
  12. System.out.println("异步线程id :" + Thread.currentThread().getId());
  13. return 402;
  14. }, executorService);
  15. //进行任务组合
  16. CompletableFuture<Void> allOf = CompletableFuture.allOf(future1, future2);
  17. //等待组合任务全部完成
  18. allOf.get();
  19. System.out.println("main执行完毕");
  20. }
  21. }

发表评论

表情:
评论列表 (有 0 条评论,209人围观)

还没有评论,来说两句吧...

相关阅读