并发编程系列之FutureTask源码学习笔记

﹏ヽ暗。殇╰゛Y 2022-09-09 02:23 264阅读 0赞

并发编程系列之FutureTask源码学习笔记

1、什么是FutureTask类?

在上一章节的学习中,我们知道了Future类的基本用法,知道了Future其实就是为了监控线程任务执行的,接着本博客继续学习FutureTask。然后什么是FutureTask类?

Future是1.5版本引入的异步编程的顶层抽象接口,FutureTask则是Future的基础实现类。同时FutureTask还实现了Runnable接口,所以FutureTask也可以作为一个独立的Runnable任务

2、使用FutureTask封装Callable任务

线程中是不能直接传入Callable任务的,所以需要借助FutureTask,FutureTask可以用来封装Callable任务,下面给出一个例子:

  1. package com.example.concurrent.future;
  2. import java.util.Random;
  3. import java.util.concurrent.*;
  4. /** * <pre> * FutureTask例子 * </pre> * <p> * <pre> * @author nicky.ma * 修改记录 * 修改后版本: 修改人: 修改日期: 2021/08/28 18:04 修改内容: * </pre> */
  5. public class FutureTaskExample {
  6. public static void main(String[] args) throws ExecutionException, InterruptedException {
  7. FutureTask futureTask = new FutureTask(new CallableTask());
  8. Thread t = new Thread(futureTask);
  9. t.start();
  10. System.out.println(futureTask.get());
  11. }
  12. static class CallableTask implements Callable<Integer> {
  13. @Override
  14. public Integer call() throws Exception{
  15. Thread.sleep(1000L);
  16. return new Random().nextInt();
  17. }
  18. }
  19. }

3、FutureTask UML类图

翻下FutureTask的源码,可以看出实现了RunnableFuture接口

  1. public class FutureTask<V> implements RunnableFuture<V> {
  2. // ...
  3. }

RunnableFuture接口是怎么样的?可以看出其实是继承了Runnable,Future

  1. public interface RunnableFuture<V> extends Runnable, Future<V> {
  2. /** * Sets this Future to the result of its computation * unless it has been cancelled. */
  3. void run();
  4. }

在idea里画出FutureTask的uml类图:
在这里插入图片描述
所以,可以说FutureTask本质就是一个Runnable任务

4、FutureTask源码学习

  • FutureTask类属性

    public class FutureTask implements RunnableFuture {

    1. // 状态:存在以下7中状态
    2. private volatile int state;
    3. // 新建
    4. private static final int NEW = 0;
    5. // 任务完成中
    6. private static final int COMPLETING = 1;
    7. // 任务正常完成
    8. private static final int NORMAL = 2;
    9. // 任务异常
    10. private static final int EXCEPTIONAL = 3;
    11. // 任务取消
    12. private static final int CANCELLED = 4;
    13. // 任务中断中
    14. private static final int INTERRUPTING = 5;
    15. // 任务已中断
    16. private static final int INTERRUPTED = 6;
    17. // 支持结果返回的Callable任务
    18. private Callable<V> callable;
    19. // 任务执行结果:包含正常和异常的结果,通过get方法获取
    20. private Object outcome;
    21. // 任务执行线程
    22. private volatile Thread runner;
    23. // 栈结构的等待队列,该节点是栈中的最顶层节点
    24. private volatile WaitNode waiters;

    }

  • 构造方法

    // 传入callable任务
    public FutureTask(Callable callable) {

    1. if (callable == null)
    2. throw new NullPointerException();
    3. this.callable = callable;
    4. this.state = NEW; // ensure visibility of callable

    }

    // 传入runnable任务、结果变量result
    public FutureTask(Runnable runnable, V result) {

    1. this.callable = Executors.callable(runnable, result);
    2. this.state = NEW; // ensure visibility of callable

    }

  • 是一个Runnable任务,run方法实现

    public void run() {

    1. // 两种情况直接返回
    2. // 1:状态不是NEW,说明已经执行过,获取已经取消任务,直接返回
    3. // 2:状态是NEW,将当前执行线程保存在runner字段(runnerOffset)中,如果赋值失败,直接返回
    4. if (state != NEW ||
    5. !UNSAFE.compareAndSwapObject(this, runnerOffset,
    6. null, Thread.currentThread()))
    7. return;
    8. try {
    9. Callable<V> c = callable;
    10. if (c != null && state == NEW) {
    11. V result;
    12. boolean ran;
    13. try {
    14. // 执行了给如的Callable任务
    15. result = c.call();
    16. ran = true;
    17. } catch (Throwable ex) {
    18. result = null;
    19. ran = false;
    20. // 异常的情况,设置异常
    21. setException(ex);
    22. }
    23. if (ran)
    24. // 任务正常执行,设置结果
    25. set(result);
    26. }
    27. } finally {
    28. // runner must be non-null until state is settled to
    29. // prevent concurrent calls to run()
    30. runner = null;
    31. // state must be re-read after nulling runner to prevent
    32. // leaked interrupts
    33. int s = state;
    34. // 任务被中断,执行中断处理
    35. if (s >= INTERRUPTING)
    36. handlePossibleCancellationInterrupt(s);
    37. }

    }

setException方法:

  1. protected void setException(Throwable t) {
  2. // CAS,将状态由NEW改为COMPLETING(中间状态)
  3. if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
  4. // 返回结果
  5. outcome = t;
  6. // 将状态改为EXCEPTIONAL
  7. UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state
  8. finishCompletion();
  9. }
  10. }
  • get获取执行结果

    public V get() throws InterruptedException, ExecutionException {

    1. int s = state;
    2. // 任务还没完成,调用awaitDonw
    3. if (s <= COMPLETING)
    4. s = awaitDone(false, 0L);
    5. // 返回结果
    6. return report(s);
    7. }

get超时的方法

  1. public V get(long timeout, TimeUnit unit)
  2. throws InterruptedException, ExecutionException, TimeoutException {
  3. // unit是时间单位,必须传
  4. if (unit == null)
  5. throw new NullPointerException();
  6. int s = state;
  7. // 超过阻塞时间timeout,抛出TimeoutException
  8. if (s <= COMPLETING &&
  9. (s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)
  10. throw new TimeoutException();
  11. return report(s);
  12. }

重点看下awaitDone方法:

  1. private int awaitDone(boolean timed, long nanos)
  2. throws InterruptedException {
  3. // 计算截止时间
  4. final long deadline = timed ? System.nanoTime() + nanos : 0L;
  5. WaitNode q = null;
  6. //
  7. boolean queued = false;
  8. // 无限循环,判断条件是否符合
  9. for (;;) {
  10. // 1、线程是否被中断,是的情况,移除节点,同时抛出InterruptedException
  11. if (Thread.interrupted()) {
  12. removeWaiter(q);
  13. throw new InterruptedException();
  14. }
  15. // 2、获取当前状态,如果状态大于COMPLETING
  16. // 说明任务完成了,有可能正常执行完成,也有可能是取消了任务
  17. int s = state;
  18. if (s > COMPLETING) {
  19. if (q != null)
  20. // thread置为null 等待JVM gc
  21. q.thread = null;
  22. //返回结果
  23. return s;
  24. }
  25. //3、如果状态处于中间状态COMPLETING
  26. //表示任务已经结束但是任务执行线程还没来得及给outcome赋值
  27. else if (s == COMPLETING) // cannot time out yet
  28. // 这种情况线程yield让出执行权,给其它线程先执行
  29. Thread.yield();
  30. // 4、如果等待节点为空,则构造一个等待节点
  31. else if (q == null)
  32. q = new WaitNode();
  33. // 5、如果还没有入队列,则把当前节点加入waiters首节点并替换原来waiters
  34. else if (!queued)
  35. queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
  36. q.next = waiters, q);
  37. else if (timed) {
  38. nanos = deadline - System.nanoTime();
  39. //如果需要等待特定时间,则先计算要等待的时间
  40. // 如果已经超时,则删除对应节点并返回对应的状态
  41. if (nanos <= 0L) {
  42. removeWaiter(q);
  43. return state;
  44. }
  45. // 阻塞等待特定时间
  46. LockSupport.parkNanos(this, nanos);
  47. }
  48. else
  49. // 让线程等待,阻塞当前线程
  50. LockSupport.park(this);
  51. }
  52. }
  • cancel取消任务

    public boolean cancel(boolean mayInterruptIfRunning) {

    1. // 如果任务已经结束,则直接返回false

    if (!(state == NEW &&

    1. UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
    2. mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
    3. return false;

    try { // in case call to interrupt throws exception

    1. // 需要中断任务的情况
    2. if (mayInterruptIfRunning) {
    3. try {
    4. Thread t = runner;
    5. // 调用线程的interrupt来停止线程
    6. if (t != null)
    7. t.interrupt();
    8. } finally { // final state
    9. // 修改状态为INTERRUPTED
    10. UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
    11. }
    12. }

    } finally {

    1. finishCompletion();

    }
    return true;
    }

finishCompletion方法:

  1. private void finishCompletion() {
  2. // assert state > COMPLETING;
  3. for (WaitNode q; (q = waiters) != null;) {
  4. if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
  5. // 无限循环,遍历waiters列表,唤醒节点中的线程,然后将Callable置为null
  6. for (;;) {
  7. Thread t = q.thread;
  8. if (t != null) {
  9. q.thread = null;
  10. // 唤醒线程
  11. LockSupport.unpark(t);
  12. }
  13. WaitNode next = q.next;
  14. if (next == null)
  15. break;
  16. // 置为null,让JVM gc
  17. q.next = null; // unlink to help gc
  18. q = next;
  19. }
  20. break;
  21. }
  22. }
  23. done();
  24. callable = null; // to reduce footprint
  25. }
  • https://juejin.cn/post/6844904181824749582#heading-11
  • https://www.cnblogs.com/wade-luffy/p/7073827.html

发表评论

表情:
评论列表 (有 0 条评论,264人围观)

还没有评论,来说两句吧...

相关阅读

    相关 FutureTask分析

    1. 可以使用FutureTask来创建一个线程,用来异步执行任务,并且可以保证并发环境下只执行一次(run方法中,通过CAS设置状态,runner指向当前线程来保证),并且

    相关 FutureTask分析

    在之前的章节中,我们提到可以通过`ExecutorService`中定义的submit相关方法向线程池中提交一个任务(`Callable`、`Runnable`),并且获...