Java异步编程中如何进行FutureTask源码分析

发布时间:2021-10-12 10:35:33 作者:柒染
来源:亿速云 阅读:129

Java异步编程中如何进行FutureTask源码分析

引言

在Java并发编程中,FutureTask是一个非常重要的类,它实现了Future接口和Runnable接口,可以用来表示一个异步计算的结果。FutureTask可以用于包装CallableRunnable对象,并且可以通过ExecutorService提交给线程池执行。本文将深入分析FutureTask的源码,探讨其内部实现机制,以及如何在Java异步编程中使用它。

1. FutureTask概述

1.1 Future接口

Future接口表示一个异步计算的结果。它提供了检查计算是否完成的方法,以及获取计算结果的方法。如果计算尚未完成,get方法将会阻塞,直到计算完成。

public interface Future<V> {
    boolean cancel(boolean mayInterruptIfRunning);
    boolean isCancelled();
    boolean isDone();
    V get() throws InterruptedException, ExecutionException;
    V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
}

1.2 RunnableFuture接口

RunnableFuture接口继承了RunnableFuture接口,表示一个可以运行的FutureFutureTask实现了RunnableFuture接口。

public interface RunnableFuture<V> extends Runnable, Future<V> {
    void run();
}

1.3 FutureTask类

FutureTask类实现了RunnableFuture接口,可以用来包装CallableRunnable对象,并且可以通过ExecutorService提交给线程池执行。

public class FutureTask<V> implements RunnableFuture<V> {
    // 内部状态
    private volatile int state;
    private static final int NEW          = 0;
    private static final int COMPLETING   = 1;
    private static final int NORMAL       = 2;
    private static final int EXCEPTIONAL  = 3;
    private static final int CANCELLED    = 4;
    private static final int INTERRUPTING = 5;
    private static final int INTERRUPTED  = 6;

    // 内部任务
    private Callable<V> callable;
    private Object outcome; // 结果或异常
    private volatile Thread runner;
    private volatile WaitNode waiters;

    // 构造方法
    public FutureTask(Callable<V> callable) {
        if (callable == null)
            throw new NullPointerException();
        this.callable = callable;
        this.state = NEW; // 初始状态为NEW
    }

    public FutureTask(Runnable runnable, V result) {
        this.callable = Executors.callable(runnable, result);
        this.state = NEW; // 初始状态为NEW
    }

    // 其他方法...
}

2. FutureTask的状态机

FutureTask内部使用一个状态机来管理任务的执行状态。状态机的状态包括:

状态转换图如下:

NEW -> COMPLETING -> NORMAL
NEW -> COMPLETING -> EXCEPTIONAL
NEW -> CANCELLED
NEW -> INTERRUPTING -> INTERRUPTED

3. FutureTask的核心方法

3.1 run方法

run方法是Runnable接口的实现,用于执行任务。run方法的主要逻辑如下:

  1. 检查任务状态,如果状态不是NEW,则直接返回。
  2. 设置当前线程为执行线程。
  3. 调用Callablecall方法执行任务。
  4. 根据任务执行结果设置状态和结果。
public void run() {
    if (state != NEW ||
        !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                     null, Thread.currentThread()))
        return;
    try {
        Callable<V> c = callable;
        if (c != null && state == NEW) {
            V result;
            boolean ran;
            try {
                result = c.call();
                ran = true;
            } catch (Throwable ex) {
                result = null;
                ran = false;
                setException(ex);
            }
            if (ran)
                set(result);
        }
    } finally {
        runner = null;
        int s = state;
        if (s >= INTERRUPTING)
            handlePossibleCancellationInterrupt(s);
    }
}

3.2 set方法

set方法用于设置任务的结果,并将状态从COMPLETING转换为NORMAL

protected void set(V v) {
    if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
        outcome = v;
        UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
        finishCompletion();
    }
}

3.3 setException方法

setException方法用于设置任务的异常结果,并将状态从COMPLETING转换为EXCEPTIONAL

protected void setException(Throwable t) {
    if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
        outcome = t;
        UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state
        finishCompletion();
    }
}

3.4 finishCompletion方法

finishCompletion方法用于唤醒所有等待任务完成的线程。

private void finishCompletion() {
    for (WaitNode q; (q = waiters) != null;) {
        if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
            for (;;) {
                Thread t = q.thread;
                if (t != null) {
                    q.thread = null;
                    LockSupport.unpark(t);
                }
                WaitNode next = q.next;
                if (next == null)
                    break;
                q.next = null; // unlink to help gc
                q = next;
            }
            break;
        }
    }
    done();
    callable = null;        // to reduce footprint
}

3.5 get方法

get方法用于获取任务的结果。如果任务尚未完成,get方法将会阻塞,直到任务完成。

public V get() throws InterruptedException, ExecutionException {
    int s = state;
    if (s <= COMPLETING)
        s = awaitDone(false, 0L);
    return report(s);
}

3.6 awaitDone方法

awaitDone方法用于等待任务完成。如果任务尚未完成,当前线程将会被阻塞。

private int awaitDone(boolean timed, long nanos)
    throws InterruptedException {
    final long deadline = timed ? System.nanoTime() + nanos : 0L;
    WaitNode q = null;
    boolean queued = false;
    for (;;) {
        if (Thread.interrupted()) {
            removeWaiter(q);
            throw new InterruptedException();
        }

        int s = state;
        if (s > COMPLETING) {
            if (q != null)
                q.thread = null;
            return s;
        }
        else if (s == COMPLETING) // cannot time out yet
            Thread.yield();
        else if (q == null)
            q = new WaitNode();
        else if (!queued)
            queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                 q.next = waiters, q);
        else if (timed) {
            nanos = deadline - System.nanoTime();
            if (nanos <= 0L) {
                removeWaiter(q);
                return state;
            }
            LockSupport.parkNanos(this, nanos);
        }
        else
            LockSupport.park(this);
    }
}

3.7 report方法

report方法用于根据任务的状态返回结果或抛出异常。

private V report(int s) throws ExecutionException {
    Object x = outcome;
    if (s == NORMAL)
        return (V)x;
    if (s >= CANCELLED)
        throw new CancellationException();
    throw new ExecutionException((Throwable)x);
}

4. FutureTask的使用示例

4.1 使用FutureTask执行Callable任务

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;

public class FutureTaskExample {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        Callable<Integer> task = () -> {
            Thread.sleep(1000);
            return 42;
        };

        FutureTask<Integer> futureTask = new FutureTask<>(task);
        Thread thread = new Thread(futureTask);
        thread.start();

        System.out.println("Waiting for result...");
        int result = futureTask.get();
        System.out.println("Result: " + result);
    }
}

4.2 使用FutureTask执行Runnable任务

import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;

public class FutureTaskExample {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        Runnable task = () -> {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        };

        FutureTask<Void> futureTask = new FutureTask<>(task, null);
        Thread thread = new Thread(futureTask);
        thread.start();

        System.out.println("Waiting for task to complete...");
        futureTask.get();
        System.out.println("Task completed.");
    }
}

5. 总结

FutureTask是Java并发编程中一个非常重要的类,它实现了Future接口和Runnable接口,可以用来表示一个异步计算的结果。通过深入分析FutureTask的源码,我们可以更好地理解其内部实现机制,以及如何在Java异步编程中使用它。FutureTask的状态机、核心方法(如runsetget等)以及使用示例都为我们提供了丰富的知识,帮助我们更好地掌握Java并发编程的技巧。

在实际开发中,FutureTask可以用于包装CallableRunnable对象,并且可以通过ExecutorService提交给线程池执行。通过合理地使用FutureTask,我们可以实现高效的异步编程,提升程序的并发性能。

推荐阅读:
  1. java中Callable、FutureTask和Future接口的介绍
  2. Java FutureTask类的用法

免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。

java futuretask

上一篇:如何解决VBS中字符串连接的性能问题

下一篇:vbs如何合并多个excel文件

相关阅读

您好,登录后才能下订单哦!

密码登录
登录注册
其他方式登录
点击 登录注册 即表示同意《亿速云用户服务条款》