谷粒商城项目:异步的相关概念
1.初始化线程的4种方式
1.1 继承Thread类
public class ThreadTest {
public static void main(String[] args) {
Thread01 thread01 = new Thread01();
thread01.start();
System.out.println("main执行完毕");
}
public static class Thread01 extends Thread{
@Override
public void run() {
System.out.println("异步线程id :" + Thread.currentThread().getId());
}
}
}
1.2 实现Runable接口
public class ThreadTest {
public static void main(String[] args) {
Thread02 thread02 = new Thread02();
new Thread(thread02).start();
System.out.println("main执行完毕");
}
public static class Thread02 implements Runnable{
@Override
public void run() {
System.out.println("异步线程id :" + Thread.currentThread().getId());
}
}
}
1.3 实现Callable接口 + FutureTask
public class ThreadTest {
public static void main(String[] args) throws ExecutionException, InterruptedException {
FutureTask<Integer> futureTask = new FutureTask<>(new Thread03());
new Thread(futureTask).start();
Integer integer = futureTask.get();
System.out.println("返回线程id:" + integer);
System.out.println("main执行完毕");
}
public static class Thread03 implements Callable<Integer> {
@Override
public Integer call() throws Exception {
System.out.println("异步线程id :" + Thread.currentThread().getId());
return Math.toIntExact(Thread.currentThread().getId());
}
}
}
1.4线程池
public class ThreadTest {
//创建一个线程池
public static ExecutorService executorService = Executors.newFixedThreadPool(10);
public static void main(String[] args) throws ExecutionException, InterruptedException {
executorService.execute(new Thread02());
System.out.println("main执行完毕");
}
public static class Thread02 implements Runnable{
@Override
public void run() {
System.out.println("异步线程id :" + Thread.currentThread().getId());
}
}
}
1.5 区别
继承Thread类,实现Runable接口没有返回值
实现Callable接口 + FutureTask可以获取返回值
线程池可以控制资源
2.线程池
2.1 线程池的创建
- 使用 Executors 工具类(Executors.newFixedThreadPool(10))
- 直接创建(new ThreadPoolExecutor)
2.2 线程池参数
- corePoolSize:核心线程数。线程池创建好就准备就绪的线程数量,即使空闲也不会释放,除非自主设置
- maximumPoolSize:最大线程数量。并发情况下控制资源
- keepAliveTime:存活时间。如果线程的空闲时间大于该时间,则释放 maximumPoolSize - corePoolSize 数量的线程
- unit:时间单位
- BlockingQueue < Runnable > workQueue:阻塞队列。如果任务有很多,就会将目前多的任务放在队列里,线程空闲则会去队列里取新的任务
- threadFactory:线程的创建工厂
- RejectedExecutionHandler handler:拒绝策略。如果队列满了,则按照配置的拒绝策略进行执行
2.3 线程池工作流程
- 线程池创建,准备好 core 数量的核心线程,准备接受任务
- core 满了,就会将再进来的任务放在阻塞队列里。空闲的core 会自己去阻塞队列里获取任务执行
- 阻塞队列满了,就会直接开新线程执行,最大只能有max指定数量的线程
- max满了就会用设置的RejectedExecutionHandler拒绝任务
- max都执行完成后,有空闲线程,在指定的keepAliveTime时间后,释放maximumPoolSize - corePoolSize 数量的线程
例子:
一个线程池,core = 7,max = 20,queue = 50,假设进入100并发,是怎样分配?
7个请求会立即执行
50个请求会进入队列
线程池再开13个线程处理请求
剩下的30个请求根据配置的拒绝策略进行执行
2.4 线程池种类
- Executors.newFixedThreadPool(n):固定大小,当core = max时,所有线程都不可回收
- Executors.newCachedThreadPool():core是0,所有线程都可以进行回收
- Executors.newSingleThreadScheduledExecutor():单线程的线程池,线程从队列里获取任务,挨个执行
- Executors.newScheduledThreadPool(int corePoolSize):定时任务的线程池
3. ComplerableFuture异步编排
3.1 ComplerableFuture类
CompletableFuture类实现了CompletionStage和Future接口。
Future是Java 5添加的类,用来描述一个异步计算的结果,但是获取一个结果时方法较少,要么通过轮询isDone,确认完成后,调用get()获取值,要么调用get()设置一个超时时间。但是这个get()方法会阻塞住调用线程,这种阻塞的方式显然和我们的异步编程的初衷相违背。
为了解决这个问题,JDK吸收了guava的设计思想,加入了Future的诸多扩展功能形成了CompletableFuture。
3.2 创建异步对象
CompletableFuture类提供了四个静态方法:
public static CompletableFuture<Void> runAsync(Runnable runnable)
public static CompletableFuture<Void> runAsync(Runnable runnable,Executor executor)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,Executor executor)
区别:
- runXxx:是没有返回结果
- supplyXxx:可以获取返回结果
- Executor executor:表示可以传入自定义的线程池,否则使用默认的
示例:
public class ThreadTest {
//创建一个线程池
public static ExecutorService executorService = Executors.newFixedThreadPool(10);
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture.runAsync(() -> {
System.out.println("异步线程id :" + Thread.currentThread().getId());
},executorService);
System.out.println("main执行完毕");
}
public class ThreadTest {
//创建一个线程池
public static ExecutorService executorService = Executors.newFixedThreadPool(10);
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
System.out.println("异步线程id :" + Thread.currentThread().getId());
return 400;
}, executorService);
Integer integer = future.get();
System.out.println(integer);
System.out.println("main执行完毕");
}
}
3.3 计算完成时回调方法
public CompletableFuture<T> whenComplete( BiConsumer<? super T, ? super Throwable> action)
public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action)
public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action, Executor executor)
public CompletableFuture<T> exceptionally(Function<Throwable, ? extends T> fn)
区别:
- whenComplete 可以处理正常和异常的计算结果
- exceptionally 处理异常情况
- whenComplete:是执行当前任务的线程执行完当前的任务后,继续执行whenComplete的任务
- whenCompleteAsync:是执行把whenCompleteAsync这个任务继续提交给线程池来进行执行
总结:方法不以Async结尾,意味着 Action 使用相同的线程执行,而以Async结尾则使用其他线程执行(如果是相同的线程池,可能会分配的是同一个线程)
示例:
public class ThreadTest {
//创建一个线程池
public static ExecutorService executorService = Executors.newFixedThreadPool(10);
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
System.out.println("...任务一...");
System.out.println("异步线程id :" + Thread.currentThread().getId());
return 400;
}, executorService).whenComplete((result,excption) -> {
System.out.println("异步任务调用完毕,结果是:" + result);
});
System.out.println("main执行完毕");
}
}
3.4 handle方法
和Complete一样,可以对结果做最后的处理,可改变返回值
public <U> CompletableFuture<U> handle(BiFunction<? super T, Throwable, ? extends U> fn)
public <U> CompletableFuture<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn)
public <U> CompletableFuture<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn, Executor executor)
示例:
public class ThreadTest {
//创建一个线程池
public static ExecutorService executorService = Executors.newFixedThreadPool(10);
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
System.out.println("...任务一...");
System.out.println("异步线程id :" + Thread.currentThread().getId());
return 400;
}, executorService).handle((result,thr) -> {
if (result == 400){
return result * 2;
}
return result;
});
System.out.println(future.get());
System.out.println("main执行完毕");
}
}
3.5 线程串行话方法
//thenApply方法:当一个线程依赖另一个线程时,获取上一个任务返回的结果,并返回当前任务的返回值
public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn)
public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn)
public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn, Executor executor)
//thenAccept:消费处理结果。接收任务的处理结果,并消费处理,无返回结果
public CompletableFuture<Void> thenAccept(Consumer<? super T> action)
public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action)
public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action,Executor executor)
//thenRun:只要上面的任务执行完毕,就开始执行thenRun,只是处理完任务后,执行thenRun的后续操作
public CompletableFuture<Void> thenRun(Runnable action)
public CompletableFuture<Void> thenRunAsync(Runnable action)
public CompletableFuture<Void> thenRunAsync(Runnable action,Executor executor)
区别:
- thenRun:不能获取到上一步的执行结果,无返回值
- thenAccept:能获取到上一步的执行结果,无返回值
- thenApply:能获取到上一步的执行结果,有返回值
示例:
public class ThreadTest {
//创建一个线程池
public static ExecutorService executorService = Executors.newFixedThreadPool(10);
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture.supplyAsync(() -> {
System.out.println("...任务一...");
System.out.println("异步线程id :" + Thread.currentThread().getId());
return 400;
}, executorService).thenRunAsync(() -> {
System.out.println("...任务二...");
System.out.println("异步线程id :" + Thread.currentThread().getId());
},executorService);
//System.out.println(future.get());
System.out.println("main执行完毕");
}
}
public class ThreadTest {
//创建一个线程池
public static ExecutorService executorService = Executors.newFixedThreadPool(10);
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture.supplyAsync(() -> {
System.out.println("...任务一...");
System.out.println("异步线程id :" + Thread.currentThread().getId());
return 400;
}, executorService).thenAcceptAsync((result) -> {
System.out.println("...任务二...");
System.out.println("任务一的运行结果:" + result);
System.out.println("异步线程id :" + Thread.currentThread().getId());
},executorService);
//System.out.println(future.get());
System.out.println("main执行完毕");
}
}
public class ThreadTest {
//创建一个线程池
public static ExecutorService executorService = Executors.newFixedThreadPool(10);
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
System.out.println("...任务一...");
System.out.println("异步线程id :" + Thread.currentThread().getId());
return 400;
}, executorService).thenApplyAsync((result) -> {
System.out.println("...任务二...");
System.out.println("任务一的运行结果:" + result);
System.out.println("异步线程id :" + Thread.currentThread().getId());
return 800;
}, executorService);
System.out.println(future.get());//800
System.out.println("main执行完毕");
}
}
3.6 两任务组合,且都完成
//thenCombine:组合两个future,获取两个future的返回结果,并返回当前任务的返回值
public <U,V> CompletableFuture<V> thenCombine(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn)
public <U,V> CompletableFuture<V> thenCombineAsync(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn)
public <U,V> CompletableFuture<V> thenCombineAsync(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn, Executor executor)
//thenAcceptBoth:组合两个future,获取两个future的返回结果,然后处理任务,没有返回值
public <U> CompletableFuture<Void> thenAcceptBoth(CompletionStage<? extends U> other,BiConsumer<? super T, ? super U> action)
public <U> CompletableFuture<Void> thenAcceptBothAsync(CompletionStage<? extends U> other,BiConsumer<? super T, ? super U> action)
public <U> CompletableFuture<Void> thenAcceptBothAsync(CompletionStage<? extends U> other,BiConsumer<? super T, ? super U> action, Executor executor)
//runAfterBoth:组合两个future,不需要获取两个future的返回结果,只需要两个future处理完任务后处理该任务
public CompletableFuture<Void> runAfterBoth(CompletionStage<?> other,Runnable action)
public CompletableFuture<Void> runAfterBothAsync(CompletionStage<?> other,Runnable action)
public CompletableFuture<Void> runAfterBothAsync(CompletionStage<?> other,Runnable action,Executor executor)
示例:
public class ThreadTest {
//创建一个线程池
public static ExecutorService executorService = Executors.newFixedThreadPool(10);
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {
System.out.println("...任务一...");
System.out.println("异步线程id :" + Thread.currentThread().getId());
return 401;
}, executorService);
CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> {
System.out.println("...任务二...");
System.out.println("异步线程id :" + Thread.currentThread().getId());
return 402;
}, executorService);
future1.thenAcceptBothAsync(future2,(result01,result02) -> {
System.out.println("future1:" + result01);
System.out.println("future2:" + result02);
System.out.println("...任务三...");
System.out.println("异步线程id :" + Thread.currentThread().getId());
},executorService);
//System.out.println(future.get());//800
System.out.println("main执行完毕");
}
}
public class ThreadTest {
//创建一个线程池
public static ExecutorService executorService = Executors.newFixedThreadPool(10);
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {
System.out.println("...任务一...");
System.out.println("异步线程id :" + Thread.currentThread().getId());
return 401;
}, executorService);
CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> {
System.out.println("...任务二...");
System.out.println("异步线程id :" + Thread.currentThread().getId());
return 402;
}, executorService);
CompletableFuture<Integer> future = future1.thenCombineAsync(future2, (result01, result02) -> {
System.out.println("future1:" + result01);
System.out.println("future2:" + result02);
System.out.println("...任务三...");
System.out.println("异步线程id :" + Thread.currentThread().getId());
return result01 + result02;
}, executorService);
System.out.println(future.get());
System.out.println("main执行完毕");
}
}
public class ThreadTest {
//创建一个线程池
public static ExecutorService executorService = Executors.newFixedThreadPool(10);
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {
System.out.println("...任务一...");
System.out.println("异步线程id :" + Thread.currentThread().getId());
return 401;
}, executorService);
CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> {
System.out.println("...任务二...");
System.out.println("异步线程id :" + Thread.currentThread().getId());
return 402;
}, executorService);
future1.runAfterBothAsync(future2, () -> {
System.out.println("...任务三...");
System.out.println("异步线程id :" + Thread.currentThread().getId());
}, executorService);
System.out.println("main执行完毕");
}
}
3.7 两任务组合,只需一个完成
// applyToEitherAsync:感知结果,自己有返回值
public <U> CompletableFuture<U> applyToEither(CompletionStage<? extends T> other, Function<? super T, U> fn)
public <U> CompletableFuture<U> applyToEitherAsync(CompletionStage<? extends T> other, Function<? super T, U> fn)
public <U> CompletableFuture<U> applyToEitherAsync(CompletionStage<? extends T> other, Function<? super T, U> fn,Executor executor)
//acceptEitherAsync:感知结果,自己没有返回值
public CompletableFuture<Void> acceptEither(CompletionStage<? extends T> other, Consumer<? super T> action)
public CompletableFuture<Void> acceptEitherAsync(CompletionStage<? extends T> other, Consumer<? super T> action)
public CompletableFuture<Void> acceptEitherAsync(CompletionStage<? extends T> other, Consumer<? super T> action, Executor executor)
//runAfterEitherAsync:不感知结果,自己没有返回值
public CompletableFuture<Void> runAfterEither(CompletionStage<?> other, Runnable action)
public CompletableFuture<Void> runAfterEitherAsync(CompletionStage<?> other,Runnable action)
public CompletableFuture<Void> runAfterEitherAsync(CompletionStage<?> other, Runnable action, Executor executor)
3.8 多任务组合
//等待所有任务完成
public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs)
//只要有一个任务完成
public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs)
示例:
public class ThreadTest {
//创建一个线程池
public static ExecutorService executorService = Executors.newFixedThreadPool(10);
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {
System.out.println("...任务一...");
System.out.println("异步线程id :" + Thread.currentThread().getId());
return 401;
}, executorService);
CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> {
System.out.println("...任务二...");
System.out.println("异步线程id :" + Thread.currentThread().getId());
return 402;
}, executorService);
//进行任务组合
CompletableFuture<Void> allOf = CompletableFuture.allOf(future1, future2);
//等待组合任务全部完成
allOf.get();
System.out.println("main执行完毕");
}
}
还没有评论,来说两句吧...