转载
  • 发表于 2019年09月12日
  • 浏览 (416)
  • 评论 (0)

JDK源码分析-FutureTask

概述

FutureTask 是一个可取消的、异步执行任务的类, 它的继承结构如下:

JDK源码分析-FutureTask

它实现了 RunnableFuture 接口,而该接口又继承了 Runnable 接口和 Future 接口,因此 FutureTask 也具有这两个接口所定义的特征。 Future Task 的主要功能:

1. 异步执行任务,并且任务只执行一次;

2. 监控任务是否完成、 取消任务

3. 获取任务执行结果。

下面分析其代码实现。

代码分析

分析 FutureTask 的代码之前,先看下它实现的接口。 RunnableFuture 接口定义如下:


 

public interface RunnableFuture<V> extends Runnable, Future<V> {

/**

* Sets this Future to the result of its computation

* unless it has been cancelled.

*/

void run();

}

R unnableFuture  接口继承了 Runnable 接口和 Future 接口,而  Runnable 接口只有一个 run 方法 ,这里不再赘述。下面分析 Future 接口。

Future 接口

Future 接口方法定义如下:

JDK源码分析-FutureTask

主要方法分析:


 

/*

* 尝试取消执行任务。若任务已完成、已取消,或者由于其他某些原因无法取消,则尝试失败。

* 若成功,且调用该方法时任务未启动,则此任务不会再运行;

* 若任务已启动,则根据参数 mayInterruptIfRunning 决定是否中断该任务。

*/

boolean cancel(boolean mayInterruptIfRunning);


// 若该任务正常结束之前被取消,则返回 true

boolean isCancelled();


/*

* 若该任务已完成,则返回 true

* 这里的“完成”,可能是由于正常终止、异常,或者取消,这些情况都返回 true

*/

boolean isDone();


// 等待计算完成(如果需要),然后获取结果

V get() throws InterruptedException, ExecutionException;


// 如果需要,最多等待计算完成的给定时间,然后检索其结果(如果可用)

// PS: 该方法与前者的区别在于加了超时等待

V get(long timeout, TimeUnit unit)

throws InterruptedException, ExecutionException, TimeoutException;

FutureTask 代码分析

任务的状态变量:


 

// 任务的状态

private volatile int state;

private static final int NEW = 0;

private static final int COMPLETING = 1;

private static final int NORMAL = 2;

private static final int EXCEPTIONAL = 3;

private static final int CANCELLED = 4;

private static final int INTERRUPTING = 5;

private static final int INTERRUPTED = 6;

其中 state 表示任务的状态,总共有 7 种,它们之间的状态转换可能有以下 4 种情况:

1. 任务执行正常:NEW -> COMPLETING -> NORMAL

2. 任务执行异常:NEW -> COMPLETING -> EXCEPTIONAL

3. 任务取消:NEW -> CANCELLED

4. 任务中断:NEW -> INTERRUPTING -> INTERRUPTED

示意图:

JDK源码分析-FutureTask

在分析其他成员变量之前,先看一个 内部嵌套类 WaitNode:


 

static final class WaitNode {

volatile Thread thread;

volatile WaitNode next;

WaitNode() { thread = Thread.currentThread(); }

}

代码比较简 单,就是对 Thread 的封装,可以理解为单链表的节点。

其他成员变量:


 

/** The underlying callable; nulled out after running */

// 提交的任务

private Callable<V> callable;


/** The result to return or exception to throw from get() */

// get() 方法返回的结果(或者异常)

private Object outcome; // non-volatile, protected by state reads/writes


/** The thread running the callable; CASed during run() */

// 执行任务的线程

private volatile Thread runner;


/** Treiber stack of waiting threads */

// 等待线程的 Treiber 栈

private volatile WaitNode waiters;

其中 waiters 是一个 Treiber 栈,简单来说,就是由单链表组成的线程安全的栈,如图所示:

JDK源码分析-FutureTask

构造器


 

// 创建一个 FutureTask 对象,在运行时将执行给定的 Callable

public FutureTask(Callable<V> callable) {

if (callable == null)

throw new NullPointerException();

this.callable = callable;

this.state = NEW; // ensure visibility of callable

}


// 创建一个 FutureTask,在运行时执行给定的 Runnable,

// 并安排 get 将在成功完成时返回给定的结果

public FutureTask(Runnable runnable, V result) {

this.callable = Executors.callable(runnable, result);

this.state = NEW; // ensure visibility of callable

}

这两个构造器分别传入 Callable 对象和 Runnable 对象(适配为 Callable 对象),然后将其状态初始化为 NEW。

run: 执行任务


 

public void run() {

// 使用 CAS 进行并发控制,防止任务被执行多次

if (state != NEW ||

!UNSAFE.compareAndSwapObject(this, runnerOffset,

null, Thread.currentThread()))

return;

try {

Callable<V> c = callable;

if (c != null && state == NEW) {

V result;

boolean ran;

try {

// 调用 Callable 的 call 方法执行任务

result = c.call();

ran = true;

} catch (Throwable ex) {

// 异常处理

result = null;

ran = false;

setException(ex);

}

// 正常处理

if (ran)

set(result);

}

} finally {

// runner must be non-null until state is settled to

// prevent concurrent calls to run()

runner = null;

// state must be re-read after nulling runner to prevent

// leaked interrupts

int s = state;

// 线程被中断

if (s >= INTERRUPTING)

handlePossibleCancellationInterrupt(s);

}

}

set & setException: 更新状态值,唤醒栈中等待的线程


 

protected void set(V v) {

// CAS 将 state 修改为 COMPLETING,该状态是一个中间状态

if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {

outcome = v; // 输出结果赋值

// 将 state 更新为 NORMAL

UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state

finishCompletion();

}

}


protected void setException(Throwable t) {

// CAS 将 state 修改为 COMPLETING,该状态是一个中间状态

if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {

outcome = t; // 输出结果赋值

// 将 state 更新为 EXCEPTIONAL

UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state

finishCompletion();

}

}

这两个方法的操作类似,都是更新 state 的值并给返回结果 outcome 赋值,然后执行结束操作   finishCompletion 方法


 

private void finishCompletion() {

// assert state > COMPLETING;

for (WaitNode q; (q = waiters) != null;) {

// 将 waiters 置空

if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {

for (;;) {

Thread t = q.thread;

if (t != null) {

q.thread = null;

// 唤醒 WaitNode 封装的线程

LockSupport.unpark(t);

}

WaitNode next = q.next;

if (next == null)

break;

q.next = null; // unlink to help gc

q = next;

}

break;

}

}


done();

callable = null; // to reduce footprint

}

finishCompletion  方法的作用就是唤醒栈中所有等待的线程,并清空栈。 其中的 done 方法实现为空:

protected void done() { }

子类可以重写该方法实现回调功能。

get: 获取执行结果


 

// 获取执行结果(阻塞式)

public V get() throws InterruptedException, ExecutionException {

int s = state;

// 若任务未执行完,则等待它执行完成

if (s <= COMPLETING)

// 任务未完成

s = awaitDone(false, 0L);

// 封装返回结果

return report(s);

}


// 获取执行结果(有超时等待)

public V get(long timeout, TimeUnit unit)

throws InterruptedException, ExecutionException, TimeoutException {

if (unit == null)

throw new NullPointerException();

int s = state;

if (s <= COMPLETING &&

(s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)

throw new TimeoutException();

return report(s);

}

这两个方法都是获取任务执行的结果,原理也基本一样,区别在于后者有超时等待(超时会抛出 TimeoutException 异常)。

awaitDone: 等待任务执行完成


 

// Awaits completion or aborts on interrupt or timeout.

private int awaitDone(boolean timed, long nanos)

throws InterruptedException {

final long deadline = timed ? System.nanoTime() + nanos : 0L;

WaitNode q = null;

boolean queued = false;

for (;;) {

// 响应线程中断

if (Thread.interrupted()) {

removeWaiter(q);

throw new InterruptedException();

}

int s = state;

// s > COMPLETING 表示任务已执行完成(包括正常执行、异常等状态)

// 则返回对应的状态值

if (s > COMPLETING) {

if (q != null)

q.thread = null;

return s;

}

// s == COMPLETING 是一个中间状态,表示任务尚未完成

// 这里让出 CPU 时间片

else if (s == COMPLETING) // cannot time out yet

Thread.yield();

// 执行到这里,表示 s == NEW,将当前线程封装为一个 WaitNode 节点

else if (q == null)

q = new WaitNode();

// 这里表示 q 并未入栈,CAS 方式将当 WaitNode 入栈

else if (!queued)

queued = UNSAFE.compareAndSwapObject(this, waitersOffset,

q.next = waiters, q);

// 有超时的情况

else if (timed) {

nanos = deadline - System.nanoTime();

if (nanos <= 0L) {

removeWaiter(q);

return state;

}

LockSupport.parkNanos(this, nanos);

}

// 将当前线程挂起

else

LockSupport.park(this);

}

}

该方法的主要判断步骤如下:

1. 若线程被中断,则响应中断;

2. 若任务已完成,则返回状态值;

3. 若任务正在执行,则让出 CPU 时间片;

4. 若任务未执行,则将当前线程封装为 WaitNode 节点;

5. 若 WaitNode 未入栈,则执行入栈;

6. 若已入栈,则将线程挂起。

以上步骤是循环执行的,其实该方法的主要作用就是:当任务执行完成时,返回状态值;否则将当前线程挂起。

removeWaiter: 移除栈中的节点


 

private void removeWaiter(WaitNode node) {

if (node != null) {

node.thread = null;

retry:

for (;;) { // restart on removeWaiter race

for (WaitNode pred = null, q = waiters, s; q != null; q = s) {

s = q.next;

if (q.thread != null)

pred = q;

else if (pred != null) {

pred.next = s;

if (pred.thread == null) // check for race

continue retry;

}

else if (!UNSAFE.compareAndSwapObject(this, waitersOffset, q, s))

continue retry;

}

break;

}

}

}

report 方法:封装返回结果


 

private V report(int s) throws ExecutionException {

Object x = outcome; // 输出结果赋值

// 正常结束

if (s == NORMAL)

return (V)x;

// 取消

if (s >= CANCELLED)

throw new CancellationException();

// 执行异常

throw new ExecutionException((Throwable)x);

}

该方法就是对返回结果的包装,无论是正常结束或是抛出异常。

cancel: 取消任务


 

public boolean cancel(boolean mayInterruptIfRunning) {

if (!(state == NEW &&

UNSAFE.compareAndSwapInt(this, stateOffset, NEW,

mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))

return false;

try { // in case call to interrupt throws exception

if (mayInterruptIfRunning) {

try {

// 若允许中断,则尝试中断线程

Thread t = runner;

if (t != null)

t.interrupt();

} finally { // final state

UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);

}

}

} finally {

finishCompletion();

}

return true;

}

场景举例

FutureTask 适合多线程执行一些耗时的操作,然后获取执行结果。下面结合线程池简单分析其用法, 示例代码如下(仅供参考):


 

public class FutureTaskTest {

public static void main(String[] args) throws Exception {

ExecutorService executorService = Executors.newFixedThreadPool(5);

List<FutureTask<Integer>> taskList = new ArrayList<>();

for (int i = 0; i < 10; i++) {

int finalI = i;

FutureTask<Integer> futureTask = new FutureTask<>(() -> {

// 模拟耗时任务

TimeUnit.SECONDS.sleep(finalI * 2);

System.out.println(Thread.currentThread().getName() + " 计算中……");

return finalI * finalI;

});

taskList.add(futureTask);

executorService.submit(futureTask); // 提交到线程池

}


System.out.println("任务全部提交,主线程做其他操作");

// 获取执行结果

for (FutureTask<Integer> futureTask : taskList) {

Integer result = futureTask.get();

System.out.println("result-->" + result);

}

// 关闭线程池

executorService.shutdown();

}

}

小结

FutureTask 是一个封装任务(Runnable 或 Callable)的类,可以异步执行任务,并获取执行结果,适用于耗时操作场景。

参考链接:

http://www.hchstudio.cn/article/2017/2b8f/

https://segmentfault.com/a/1190000016572591

https://www.jianshu.com/p/43dab9b7c25b

JDK源码分析-FutureTask

原文  http://mp.weixin.qq.com/s?__biz=MzU4NzYyMDE4MQ==&mid=2247484035&idx=1&sn=4a5dab20cff4c2fee08138776588a518
正文到此结束

热门推荐

  • openfire数据库安装指南
    浏览(13,429) 评论(0)
  • Caffe 深度学习框架上手教程
    浏览(10,230) 评论(0)
  • ReactiveCocoa入门教程:第一部分
    浏览(11,060) 评论(0)
  • 开源HIDS-OSSEC使用实例:监测CC攻击
    浏览(10,938) 评论(0)
  • Decorators in ES7
    浏览(15,485) 评论(4)
  • 用Electron(Atom编辑器的兄弟项目)开发桌面应用
    浏览(28,497) 评论(0)
  • Windows下JetBrains CLion中文输出乱码的解决方法
    浏览(11,959) 评论(1)
  • 同步-@synchronized, NSLock, pthread, OSSpinLock性能比较
    浏览(10,945) 评论(0)
  • Seaweedfs之Volume读请求重定向
    浏览(25,220) 评论(3)
  • HTML、CSS及JavaScript : 有Promise,不会搞大肚子
    浏览(13,093) 评论(0)

相关文章

  • 高并发编程:HashMap 深入解析
  • elasticsearch连接客户端
  • Spring IO Platform Cairo-SR8 发布
  • Mybatis通用Mapper的实现
  • jdk和jre的区别,你真的懂吗?
  • java in a Nutshell <2>
  • Java 线程基础
  • spring5源码分析系列(四)——IOC容器的初始化(二)
  • 关于 JPA 连表查询和 redis 序列化遇到的小问题
  • Dresdon二次开发
阿里云首购8折
Loading...

深圳SEO优化公司虎门电子网站优化好吗南城五金网站优化哪个好青浦区推广网站优化价格网站seo优化今日价格高端网站优化公司电话网站优化排名工具静安区网站优化德州网站优化简历娄底网站优化咨询佳木斯优化网站收费seo网站关键词优化兴化百度网站优化软件滁州搜狗网站优化巩义网站推广优化哪家售后好滁州企业网站优化系统湘潭网站优化怎么做郴州网站页面优化宜人贷网站升级优化公告黄山网站优化如何选鹤壁网站排名优化找哪家天津网站优化推广案例岳阳营销型网站优化平台从化企业网站推广优化费用商丘网站优化排名多少钱优化招聘网站萧山优化网站哪家好兰州网站整站优化案例九江网站优化方式盐城规模大的珠宝行业网站优化珠海服务好的电商网站优化歼20紧急升空逼退外机英媒称团队夜以继日筹划王妃复出草木蔓发 春山在望成都发生巨响 当地回应60岁老人炒菠菜未焯水致肾病恶化男子涉嫌走私被判11年却一天牢没坐劳斯莱斯右转逼停直行车网传落水者说“没让你救”系谣言广东通报13岁男孩性侵女童不予立案贵州小伙回应在美国卖三蹦子火了淀粉肠小王子日销售额涨超10倍有个姐真把千机伞做出来了近3万元金手镯仅含足金十克呼北高速交通事故已致14人死亡杨洋拄拐现身医院国产伟哥去年销售近13亿男子给前妻转账 现任妻子起诉要回新基金只募集到26元还是员工自购男孩疑遭霸凌 家长讨说法被踢出群充个话费竟沦为间接洗钱工具新的一天从800个哈欠开始单亲妈妈陷入热恋 14岁儿子报警#春分立蛋大挑战#中国投资客涌入日本东京买房两大学生合买彩票中奖一人不认账新加坡主帅:唯一目标击败中国队月嫂回应掌掴婴儿是在赶虫子19岁小伙救下5人后溺亡 多方发声清明节放假3天调休1天张家界的山上“长”满了韩国人?开封王婆为何火了主播靠辱骂母亲走红被批捕封号代拍被何赛飞拿着魔杖追着打阿根廷将发行1万与2万面值的纸币库克现身上海为江西彩礼“减负”的“试婚人”因自嘲式简历走红的教授更新简介殡仪馆花卉高于市场价3倍还重复用网友称在豆瓣酱里吃出老鼠头315晚会后胖东来又人满为患了网友建议重庆地铁不准乘客携带菜筐特朗普谈“凯特王妃P图照”罗斯否认插足凯特王妃婚姻青海通报栏杆断裂小学生跌落住进ICU恒大被罚41.75亿到底怎么缴湖南一县政协主席疑涉刑案被控制茶百道就改标签日期致歉王树国3次鞠躬告别西交大师生张立群任西安交通大学校长杨倩无缘巴黎奥运

深圳SEO优化公司 XML地图 TXT地图 虚拟主机 SEO 网站制作 网站优化