【并发编程】FutureTask 源码分析

ゝ一世哀愁。 2022-05-13 11:42 370阅读 0赞

前言

Github:https://github.com/yihonglei/jdk-source-code-reading(java-concurrent)

一 FutureTask实例

  1. package com.jpeony.concurrent.thread;
  2. import java.util.concurrent.Callable;
  3. import java.util.concurrent.ExecutionException;
  4. import java.util.concurrent.FutureTask;
  5. /**
  6. * 实现Callable接口,获取线程执行返回值
  7. * @author yihonglei
  8. */
  9. public class MyCallable implements Callable<String> {
  10. /**
  11. * 实现Callable中的call方法
  12. */
  13. public String call() throws Exception {
  14. return "Test Callable";
  15. }
  16. public static void main(String[] args) {
  17. /** 根据MyCallable创建FutureTask对象 */
  18. FutureTask<String> futureTask = new FutureTask<>(new MyCallable());
  19. try {
  20. /** 启动线程 */
  21. new Thread(futureTask).start();
  22. /** 获取线程执行返回值 */
  23. String s = futureTask.get();
  24. /** 打印返回值 */
  25. System.out.println(s);
  26. } catch (InterruptedException e) {
  27. e.printStackTrace();
  28. } catch (ExecutionException e) {
  29. e.printStackTrace();
  30. }
  31. }
  32. }

运行结果:

70

成功拿到了线程执行的返回值。

二 FutureTask源码分析

以下从源码角度分析拿到返回值的全过程,首先需要简单了解下Callable和FutureTask的结构。

Callable是一个函数式接口,源码如下:

  1. package java.util.concurrent;
  2. @FunctionalInterface
  3. public interface Callable<V> {
  4. V call() throws Exception;
  5. }

该接口有一个call方法,返回任意类型值。

FutureTask实现RunnableFuture接口,而RunnableFuture继承了Runnable, Future,源码如下:

  1. package java.util.concurrent;
  2. import java.util.concurrent.locks.LockSupport;
  3. public class FutureTask<V> implements RunnableFuture<V> {
  4. ......
  5. }
  6. package java.util.concurrent;
  7. public interface RunnableFuture<V> extends Runnable, Future<V> {
  8. /**
  9. * Sets this Future to the result of its computation
  10. * unless it has been cancelled.
  11. */
  12. void run();
  13. }

所以FutureTask具有Runnable和Future功能,因此,在上面的Demo中,以下代码具有Runable特性:

  1. /** 根据MyCallable创建FutureTask对象 */
  2. FutureTask<String> futureTask = new FutureTask<>(new MyCallable());

创建线程对象,通过start()方法启动线程:

  1. /** 启动线程 */
  2. new Thread(futureTask).start();

start()方法源码如下:

  1. public synchronized void start() {
  2. if (threadStatus != 0)
  3. throw new IllegalThreadStateException();
  4. group.add(this);
  5. boolean started = false;
  6. try {
  7. start0();
  8. started = true;
  9. } finally {
  10. try {
  11. if (!started) {
  12. group.threadStartFailed(this);
  13. }
  14. } catch (Throwable ignore) {
  15. /* do nothing. If start0 threw a Throwable then
  16. it will be passed up the call stack */
  17. }
  18. }
  19. }
  20. // 本地方法
  21. private native void start0();

start()方法最后会调用本地方法,由JVM通知操作系统,创建线程,最后线程通过JVM访问到Runnable中的run()方法。

而FutureTask实现了Runnable的run()方法,看下FutureTask中的run()方法源码:

  1. public void run() {
  2. if (state != NEW ||
  3. !UNSAFE.compareAndSwapObject(this, runnerOffset,
  4. null, Thread.currentThread()))
  5. return;
  6. try {
  7. /**
  8. 这里的callable就是我们创建FutureTask的时候传进来的MyCallable对象,
  9. 该对象实现了Callable接口的call()方法。
  10. */
  11. Callable<V> c = callable;
  12. if (c != null && state == NEW) {
  13. V result;
  14. boolean ran;
  15. try {
  16. /**
  17. 调用Callable的call方法,即调用实现类MyCallable的call()方法,
  18. 执行完会拿到MyCallable的call()方法的返回值“Test Callable”。
  19. */
  20. result = c.call();
  21. ran = true;
  22. } catch (Throwable ex) {
  23. result = null;
  24. ran = false;
  25. setException(ex);
  26. }
  27. if (ran)
  28. /** 将返回值传入到set方法中,这里是能获取线程执行返回值的关键 */
  29. set(result);
  30. }
  31. } finally {
  32. // runner must be non-null until state is settled to
  33. // prevent concurrent calls to run()
  34. runner = null;
  35. // state must be re-read after nulling runner to prevent
  36. // leaked interrupts
  37. int s = state;
  38. if (s >= INTERRUPTING)
  39. handlePossibleCancellationInterrupt(s);
  40. }
  41. }

从run()方法源码可以知道,MyCallabel执行call()方法的返回值被传入到了一个set()方法中,能拿到线程返回值最关键的

就是这个FutureTask的set()方法源码

  1. protected void set(V v) {
  2. if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
  3. /**
  4. 将MyCallable执行call()方法的返回值传进来赋值给了outcome,
  5. 这个outcome是FutureTask的一个成员变量。
  6. 该变量用于存储线程执行返回值或异常堆栈,通过对应的get()方法获取值。
  7. private Object outcome;
  8. */
  9. outcome = v;
  10. UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
  11. finishCompletion();
  12. }
  13. }

到这里,得出一个结论就是,MyCallable执行的call()方法结果通过FutureTask的set()方法存到了成员变量outcome中,

通过我们熟悉的get方法就可以获取到outcome对应赋的值。

在Demo中获取返回值的代码:

  1. /** 获取线程执行返回值 */
  2. String s = futureTask.get();

FutureTask中get()方法的源码:

  1. public V get() throws InterruptedException, ExecutionException {
  2. int s = state;
  3. if (s <= COMPLETING)
  4. s = awaitDone(false, 0L);
  5. /** 调用report方法 */
  6. return report(s);
  7. }
  8. private V report(int s) throws ExecutionException {
  9. /** outcome赋值给Object x */
  10. Object x = outcome;
  11. if (s == NORMAL)
  12. /** 返回outcome的值,也就是线程执行run()方法时通过set()方法放进去的MyCallable的call()执行的返回值 */
  13. return (V)x;
  14. if (s >= CANCELLED)
  15. throw new CancellationException();
  16. throw new ExecutionException((Throwable)x);
  17. }

get()方法调用report()方法,report()方法会将outcome赋值并返回,set方法成功拿到返回的outcome,

也就是MyCallable()的call()方法执行结果。到这里,我们大概理解了FutureTask.get()能拿到线程执行返回值的本质原理,

也就基于FutureTask的成员变量outcome(Object)进行的set赋值和get取值的过程。

三 总结

1)FutureTask通过MyCallable创建;

2)new Thread()创建线程,通过start()方法启动线程;

3)执行FutureTask中的run()方法;

4)run()方法中调用了MyCallable中的call()方法,拿到返回值set到FutureTask的outcome(Object)成员变量中;

5)FutureTask通过get方法获取outcome(Object)对象值,从而成功拿到线程执行的返回值;

其实,本质上就是一个基于FutureTask成员变量outcome进行的set和get的过程,饶了一圈而已。

发表评论

表情:
评论列表 (有 0 条评论,370人围观)

还没有评论,来说两句吧...

相关阅读

    相关 FutureTask分析

    1. 可以使用FutureTask来创建一个线程,用来异步执行任务,并且可以保证并发环境下只执行一次(run方法中,通过CAS设置状态,runner指向当前线程来保证),并且

    相关 FutureTask分析

    在之前的章节中,我们提到可以通过`ExecutorService`中定义的submit相关方法向线程池中提交一个任务(`Callable`、`Runnable`),并且获...