并发编程之如何理解Future&FutureTask

发布时间:2021-10-23 09:37:08 作者:iii
来源:亿速云 阅读:157

本篇内容介绍了“并发编程之如何理解Future&FutureTask”的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成!

 前言

Java线程实现方式主要有四种:

其中前两种方式线程执行完后都没有返回值,后两种是带返回值的。

Callable 和 Runnable 接口

Runnable接口

// 实现Runnable接口的类将被Thread执行,表示一个基本的任务 public interface Runnable {     // run方法就是它所有的内容,就是实际执行的任务     public abstract void run(); }

没有返回值

run 方法没有返回值,虽然有一些别的方法也能实现返回值得效果,比如编写日志文件或者修改共享变量等等,但是不仅容易出错,效率也不高。

不能抛出异常

public class RunThrowExceptionDemo {      /**      * 普通方法可以在方法签名中抛出异常      *      * @throws IOException      */     public void normalMethod() throws IOException {         throw new IOException();     }      class RunnableImpl implements Runnable {          /**          * run 方法内无法抛出 checked Exception,除非使用 try catch 进行处理          */         @Override         public void run() {             try {                 throw new IOException();             } catch (IOException e) {                 e.printStackTrace();             }         }     }  }

可以看到普通方法 normalMethod 可以在方法签名上抛出异常,这样上层接口就可以捕获这个异常进行处理,但是实现 Runnable 接口的类,run  方法无法抛出 checked Exception,只能在方法内使用 try catch 进行处理,这样上层就无法得知线程中的异常。

设计导致

其实这两个缺陷主要原因就在于 Runnable 接口设计的 run 方法,这个方法已经规定了 run() 方法的返回类型是  void,而且这个方法没有声明抛出任何异常。所以,当实现并重写这个方法时,我们既不能改返回值类型,也不能更改对于异常抛出的描述,因为在实现方法的时候,语法规定是不允许对这些内容进行修改的。

Runnable 为什么设计成这样?

假设 run() 方法可以返回返回值,或者可以抛出异常,也无济于事,因为我们并没有办法在外层捕获并处理,这是因为调用 run() 方法的类(比如  Thread 类和线程池)是 Java 直接提供的,而不是我们编写的。 所以就算它能有一个返回值,我们也很难把这个返回值利用到,而 Callable  接口就是为了解决这两个问题。

Callable接口

public interface Callable<V> {     //返回接口,或者抛出异常     V call() throws Exception; }

可以看到 Callable 和 Runnable 接口其实比较相似,都只有一个方法,也就是线程任务执行的方法,区别就是 call 方法有返回值,而且声明了  throws Exception。

Callable 和 Runnable 的不同之处

与 Callable 配合的有一个 Future 接口,通过 Future 可以了解任务执行情况,或者取消任务的执行,还可获取任务执行的结果,这些功能都是  Runnable 做不到的,Callable 的功能要比 Runnable 强大。

Future接口

Future的作用

简单来说就是利用线程达到异步的效果,同时还可以获取子线程的返回值。

比如当做一定运算的时候,运算过程可能比较耗时,有时会去查数据库,或是繁重的计算,比如压缩、加密等,在这种情况下,如果我们一直在原地等待方法返回,显然是不明智的,整体程序的运行效率会大大降低。

我们可以把运算的过程放到子线程去执行,再通过 Future  去控制子线程执行的计算过程,最后获取到计算结果。这样一来就可以把整个程序的运行效率提高,是一种异步的思想。

Future的方法

Future 接口一共有5个方法,源代码如下:

并发编程之如何理解Future&FutureTask

public interface Future<V> {    /**    * 尝试取消任务,如果任务已经完成、已取消或其他原因无法取消,则失败。    * 1、如果任务还没开始执行,则该任务不应该运行    * 2、如果任务已经开始执行,由参数mayInterruptIfRunning来决定执行该任务的线程是否应该被中断,这只是终止任务的一种尝试。若mayInterruptIfRunning为true,则会立即中断执行任务的线程并返回true,若mayInterruptIfRunning为false,则会返回true且不会中断任务执行线程。    * 3、调用这个方法后,以后对isDone方法调用都返回true。    * 4、如果这个方法返回true,以后对isCancelled返回true。    */     boolean cancel(boolean mayInterruptIfRunning);     /**     * 判断任务是否被取消了,如果调用了cance()则返回true     */     boolean isCancelled();     /**     * 如果任务完成,则返回ture     * 任务完成包含正常终止、异常、取消任务。在这些情况下都返回true     */     boolean isDone();     /**     * 线程阻塞,直到任务完成,返回结果     * 如果任务被取消,则引发CancellationException     * 如果当前线程被中断,则引发InterruptedException     * 当任务在执行的过程中出现异常,则抛出ExecutionException     */     V get() throws InterruptedException, ExecutionException;     /**     * 线程阻塞一定时间等待任务完成,并返回任务执行结果,如果则超时则抛出TimeoutException     */     V get(long timeout, TimeUnit unit)         throws InterruptedException, ExecutionException, TimeoutException; }

get方法(获取结果)

get 方法最主要的作用就是获取任务执行的结果,该方法在执行时的行为取决于 Callable 任务的状态,可能会发生以下 7 种情况。

参考示例:

package com.niuh.future;  import java.util.Random; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException;  public class FutureDemo {     public static void main(String[] args) {         ExecutorService executorService = Executors.newSingleThreadExecutor();         Future<Integer> future = executorService.submit(new FutureTask());         try {             Integer res = future.get(2000, TimeUnit.MILLISECONDS);             System.out.println("Future线程返回值:" + res);         } catch (InterruptedException e) {             e.printStackTrace();         } catch (ExecutionException e) {             e.printStackTrace();         } catch (TimeoutException e) {             e.printStackTrace();         }     }      static class FutureTask implements Callable<Integer> {          @Override         public Integer call() throws Exception {             Thread.sleep(new Random().nextInt(3000));             return new Random().nextInt(10);         }     } }

isDone方法(判断是否执行完毕)

isDone() 方法,该方法是用来判断当前这个任务是否执行完毕了

package com.niuh.future;  import java.util.Random; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException;  public class FutureIsDoneDemo {     public static void main(String[] args) {         ExecutorService executorService = Executors.newSingleThreadExecutor();         Future<Integer> future = executorService.submit(new FutureTask());         try {             for (int i = 0; i < 3; i++) {                 Thread.sleep(1000);                 System.out.println("线程是否完成:" + future.isDone());             }             Integer res = future.get(2000, TimeUnit.MILLISECONDS);             System.out.println("Future 线程返回值:" + res);         } catch (InterruptedException e) {             e.printStackTrace();         } catch (ExecutionException e) {             e.printStackTrace();         } catch (TimeoutException e) {             e.printStackTrace();         }     }      static class FutureTask implements Callable<Integer> {          @Override         public Integer call() throws Exception {             Thread.sleep(2000);             return new Random().nextInt(10);         }     } }

执行结果:

线程是否完成:false 线程是否完成:false 线程是否完成:true Future 线程返回值:9

可以看到前两次 isDone 方法的返回结果是 false,因为线程任务还没有执行完成,第三次 isDone 方法的返回结果是 ture。

注意:这个方法返回 true 则代表执行完成了,返回 false 则代表还没完成。但返回  true,并不代表这个任务是成功执行的,比如说任务执行到一半抛出了异常。那么在这种情况下,对于这个 isDone 方法而言,它其实也是会返回 true  的,因为对它来说,虽然有异常发生了,但是这个任务在未来也不会再被执行,它确实已经执行完毕了。所以 isDone 方法在返回 true  的时候,不代表这个任务是成功执行的,只代表它执行完毕了。

我们将上面的示例稍作修改再来看下结果,修改 FutureTask 代码如下:

static class FutureTask implements Callable<Integer> {  @Override  public Integer call() throws Exception {   Thread.sleep(2000);   throw new Exception("故意抛出异常");     } }

执行结果:

并发编程之如何理解Future&FutureTask

虽然抛出了异常,但是 isDone 方法的返回结果依然是 ture。

这段代码说明了:

cancel方法(取消任务的执行)

如果不想执行某个任务了,则可以使用 cancel 方法,会有以下三种情况:

参考示例:

package com.niuh.future;  import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException;  public class FutureCancelDemo {      static ExecutorService executorService = Executors.newSingleThreadExecutor();      public static void main(String[] args) {         // 当任务还没有开始执行         // demo1();          // 如果任务已经执行完         // demo2();          // 如果任务正在进行中         demo3();     }      private static void demo1() {         for (int i = 0; i < 1000; i++) {             executorService.submit(new FutureTask());         }          Future<String> future = executorService.submit(new FutureTask());         try {             boolean cancel = future.cancel(false);             System.out.println("Future 任务是否被取消:" + cancel);             String res = future.get(2000, TimeUnit.MILLISECONDS);             System.out.println("Future 线程返回值:" + res);         } catch (InterruptedException e) {             e.printStackTrace();         } catch (ExecutionException e) {             e.printStackTrace();         } catch (TimeoutException e) {             e.printStackTrace();         } finally {             executorService.shutdown();         }     }       private static void demo2() {         Future<String> future = executorService.submit(new FutureTask());         try {             Thread.sleep(1000);             boolean cancel = future.cancel(false);             System.out.println("Future 任务是否被取消:" + cancel);         } catch (InterruptedException e) {             e.printStackTrace();         } finally {             executorService.shutdown();         }     }      private static void demo3() {         Future<String> future = executorService.submit(new FutureInterruptTask());         try {             Thread.sleep(1000);             boolean cancel = future.cancel(true);             System.out.println("Future 任务是否被取消:" + cancel);         } catch (InterruptedException e) {             e.printStackTrace();         } finally {             executorService.shutdown();         }     }       static class FutureTask implements Callable<String> {          @Override         public String call() throws Exception {             return "正常返回";         }     }      static class FutureInterruptTask implements Callable<String> {          @Override         public String call() throws Exception {             while (!Thread.currentThread().isInterrupted()) {                 System.out.println("循环执行");                 Thread.sleep(500);             }             System.out.println("线程被中断");             return "正常返回";         }     } }

这里,我们来分析下第三种情况(任务正在进行中),当我们设置 true 时,线程停止

循环执行 循环执行 Future 任务是否被取消:true

当我们设置 false 时,任务虽然也被取消成功,但是线程依然执行。

循环执行 循环执行 Future 任务是否被取消:true 循环执行 循环执行 循环执行 循环执行 ......

那么如何选择传入 true 还是 false 呢?

需要注意的是,虽然示例中写了 !Thread.currentThread().isInterrupted()  方法来判断中断,但是实际上并不是通过我们的代码来进行中断,而是 Future#cancel(true) 内部调用 t.interrupt  方法修改线程的状态之后,Thread.sleep 会抛出 InterruptedException 异常,线程池中会执行异常的相关逻辑,并退出当前任务。  sleep 和 interrupt 会产生意想不到的效果。

比如我们将 FutureInterruptTask 代码修改为 while(true) 形式,调用 cancel(true)  方法线程还是会被中断。

static class FutureInterruptTask implements Callable<String> {  @Override  public String call() throws Exception {   while (true) {             System.out.println("循环执行");             Thread.sleep(500);   }  } }

isCancelled方法(判断是否被取消)

isCancelled 方法,判断是否被取消,它和 cancel 方法配合使用,比较简单,可以参考上面的示例。

Callable 和 Future 的关系

Callable 接口相比于 Runnable 的一大优势是可以有返回结果,返回结果就可以用 Future 类的 get 方法来获取 。因此,Future  相当于一个存储器,它存储了 Callable 的 call 方法的任务结果。

除此之外,我们还可以通过 Future 的 isDone 方法来判断任务是否已经执行完毕了,还可以通过 cancel  方法取消这个任务,或限时获取任务的结果等,总之 Future 的功能比较丰富。

FutureTask

Future只是一个接口,不能直接用来创建对象,其实现类是FutureTask,JDK1.8修改了FutureTask的实现,JKD1.8不再依赖AQS来实现,而是通过一个volatile变量state以及CAS操作来实现。FutureTask结构如下所示:

并发编程之如何理解Future&FutureTask

我们来看一下 FutureTask 的代码实现:

public class FutureTask implements RunnableFuture {...}

可以看到,它实现了一个接口,这个接口叫作 RunnableFuture。

RunnableFuture接口

我们来看一下 RunnableFuture 接口的代码实现:

public interface RunnableFuture<V> extends Runnable, Future<V> {     /**      * Sets this Future to the result of its computation      * unless it has been cancelled.      */     void run(); }

既然 RunnableFuture 继承了 Runnable 接口和 Future 接口,而 FutureTask 又实现了 RunnableFuture  接口,所以 FutureTask 既可以作为 Runnable 被线程执行,又可以作为 Future 得到 Callable 的返回值。

FutureTask源码分析

成员变量

/*  * 当前任务运行状态  * NEW -> COMPLETING -> NORMAL(正常结束,返回结果)  * NEW -> COMPLETING -> EXCEPTIONAL(返回异常结果)  * NEW -> CANCELLED(任务被取消,无结果)  * NEW -> INTERRUPTING -> INTERRUPTED(任务被打断,无结果)  */ private volatile int state; private static final int NEW          = 0; // 新建 0 private static final int COMPLETING   = 1; // 执行中 1 private static final int NORMAL       = 2; // 正常 2 private static final int EXCEPTIONAL  = 3; // 异常 3 private static final int CANCELLED    = 4; // 取消 4 private static final int INTERRUPTING = 5; // 中断中 5 private static final int INTERRUPTED  = 6; // 被中断 6  /** 将要被执行的任务 */ private Callable<V> callable; /** 存放执行结果,用于get()方法获取结果,也可能用于get()方法抛出异常 */ private Object outcome; // non-volatile, protected by state reads/writes /** 执行任务Callable的线程; */ private volatile Thread runner; /** 栈结构的等待队列,该节点是栈中最顶层的节点 */ private volatile WaitNode waiters;

为了后面更好的分析FutureTask的实现,这里有必要解释下各个状态。

有一点需要注意的是,所有值大于COMPLETING的状态都表示任务已经执行完成(任务正常执行完成,任务执行异常或者任务被取消)。

构造方法

// Callable 构造方法 public FutureTask(Callable<V> callable) {     if (callable == null)         throw new NullPointerException();     this.callable = callable;     this.state = NEW;       // ensure visibility of callable }  // Runnable 构造方法 public FutureTask(Runnable runnable, V result) {     this.callable = Executors.callable(runnable, result);     this.state = NEW;       // ensure visibility of callable }

Runnable的构造器,只有一个目的,就是通过Executors.callable把入参转化为RunnableAdapter,主要是因为Callable的功能比Runnable丰富,Callable有返回值,而Runnable没有。

/** * A callable that runs given task and returns given result */ static final class RunnableAdapter<T> implements Callable<T> {     final Runnable task;     final T result;     RunnableAdapter(Runnable task, T result) {         this.task = task;         this.result = result;     }     public T call() {         task.run();         return result;     } }

这是一个典型的适配模型,我们要把 Runnable 适配成 Callable,首先要实现 Callable 的接口,接着在 Callable 的 call  方法里面调用被适配对象(Runnable)的方法。

内部类

static final class WaitNode {  volatile Thread thread;  volatile WaitNode next;  WaitNode() { thread = Thread.currentThread(); } }

run方法

/**  * run方法可以直接被调用  * 也可以开启新的线程调用  */ public void run() {  // 状态不是任务创建,或者当前任务已经有线程在执行了,直接返回     if (state != NEW ||         !U.compareAndSwapObject(this, RUNNER, null, Thread.currentThread()))         return;     try {         Callable<V> c = callable;         // Callable 不为空,并且已经初始化完成         if (c != null && state == NEW) {             V result;             boolean ran;             try {              //调用执行                 result = c.call();                 ran = true;             } catch (Throwable ex) {                 result = null;                 ran = false;//执行失败                 //通过CAS算法设置返回值(COMPLETING)和状态值(EXCEPTIONAL)                 setException(ex);             }             //执行成功通过CAS(UNSAFE)设置返回值(COMPLETING)和状态值(NORMAL)             if (ran)              //将result赋值给outcome                 set(result);         }     } finally {         // runner must be non-null until state is settled to         // prevent concurrent calls to run()         //将任务runner设置为null,避免发生并发调用run()方法         runner = null;         // state must be re-read after nulling runner to prevent         // leaked interrupts         //须重新读取任务状态,避免不可达(泄漏)的中断         int s = state;         //确保cancle(ture)操作时,运行中的任务能接收到中断指令         if (s >= INTERRUPTING)             handlePossibleCancellationInterrupt(s);     } }
  1. 鸿蒙官方战略合作共建——HarmonyOS技术社区

  2. run方法是没有返回值的,通过给outcome属性赋值(set(result)),get时就能从outcome属性中拿到返回值。

  3. FutureTask 两种构造器,最终都转化成了 Callable,所以在 run 方法执行的时候,只需要执行 Callable 的 call  方法即可,在执行 c.call()代码时,如果入参是 Runnable 的话, 调用路径为 c.call() ->  RunnableAdapter.call() -> Runnable.run(),如果入参是 Callable 的话,直接调用。

setException(Throwable t)方法

//发生异常时,将返回值设置到outcome(=COMPLETING)中,并更新任务状态(EXCEPTIONAL) protected void setException(Throwable t) {  //调用UNSAFE类封装的CAS算法,设置值  if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {      outcome = t;     UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state     //唤醒因等待返回值而阻塞的线程     finishCompletion();     } }

由于篇幅有限,更多源码解析请查看文章扩展链接

Future的使用

FutureTask可用于异步获取执行结果或取消执行任务的场景。通过传入Runnable或者Callable的任务给FutureTask,直接调用其run方法或者放入线程池执行,之后可以在外部通过FutureTask的get方法异步获取执行结果,因此,FutureTask非常适合用于耗时的计算,主线程可以在完成自己的任务后,再去获取结果。另外,FutureTask还可以确保即使调用了多次run方法,它都只会执行一次Runnable或者Callable任务,或者通过cancel取消FutureTask的执行等。

FutureTask执行多任务计算的使用场景

利用FutureTask和ExecutorService,可以用多线程的方式提交计算任务,主线程继续执行其他任务,当主线程需要子线程的计算结果时,在异步获取子线程的执行结果。

//任务正常完成,将返回值设置到outcome(=COMPLETING)中,并更新任务状态(=NORMAL) protected void set(V v) {  if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {   outcome = v;         UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state         finishCompletion();     } }

执行结果:

生成子线程计算任务: 0 生成子线程计算任务: 1 生成子线程计算任务: 2 生成子线程计算任务: 3 生成子线程计算任务: 4 生成子线程计算任务: 5 生成子线程计算任务: 6 生成子线程计算任务: 7 生成子线程计算任务: 8 生成子线程计算任务: 9 所有计算任务提交完毕, 主线程接着干其他事情! 子线程计算任务: 0 执行完成! 子线程计算任务: 1 执行完成! 子线程计算任务: 3 执行完成! 子线程计算任务: 4 执行完成! 子线程计算任务: 2 执行完成! 子线程计算任务: 5 执行完成! 子线程计算任务: 7 执行完成! 子线程计算任务: 9 执行完成! 子线程计算任务: 8 执行完成! 子线程计算任务: 6 执行完成! 多任务计算后的总结果是:990

FutureTask在高并发环境下确保任务只执行一次

在很多高并发的环境下,往往我们只需要某些任务只执行一次。这种使用情景FutureTask的特性恰能胜任。举一个例子,假设有一个带key的连接池,当key存在时,即直接返回key对应的对象;当key不存在时,则创建连接。对于这样的应用场景,通常采用的方法为使用一个Map对象来存储key和连接池对应的对应关系,典型的代码如下面所示:

//移除所有等待线程并发出信号,调用done(),以及将任务callable清空 private void finishCompletion() {     // assert state > COMPLETING;     for (WaitNode q; (q = waiters) != null;) {         if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {             //循环唤醒阻塞线程,直到阻塞队列为空             for (;;) {                 Thread t = q.thread;                 if (t != null) {                     q.thread = null;                     LockSupport.unpark(t);                 }                 WaitNode next = q.next;                 //一直到阻塞队列为空,跳出循环                 if (next == null)                     break;                 q.next = null; // unlink to help gc   方便gc在适当的时候回收                 q = next;             }             break;         }     }      done();      callable = null;        // to reduce footprint }

在上面的例子中,我们通过加锁确保高并发环境下的线程安全,也确保了connection只创建一次,然而却牺牲了性能。改用ConcurrentHash的情况下,几乎可以避免加锁的操作,性能大大提高。

package com.niuh.future;  import java.sql.Connection; import java.sql.DriverManager; import java.sql.SQLException; import java.util.concurrent.ConcurrentHashMap;  /**  * @description: 改用ConcurrentHash的情况下,几乎可以避免加锁的操作,性能大大提高。  * <p>  * 但是在高并发的情况下有可能出现Connection被创建多次的现象。  * 为什么呢?因为创建Connection是一个耗时操作,假设多个线程涌入getConnection方法,都发现key对应的键不存在,  * 于是所有涌入的线程都开始执行conn=createConnection(),只不过最终只有一个线程能将connection插入到map里。  * 但是这样以来,其它线程创建的的connection就没啥价值,浪费系统开销。  */ public class FutureTaskConnection2 {     private static ConcurrentHashMap<String, Connection> connectionPool = new ConcurrentHashMap<>();      public static Connection getConnection(String key) {         Connection connection = connectionPool.get(key);         if (connection == null) {             connection = createConnection();             //根据putIfAbsent的返回值判断是否有线程抢先插入了             Connection returnConnection = connectionPool.putIfAbsent(key, connection);             if (returnConnection != null) {                 connection = returnConnection;             }         } else {             return connection;         }         return connection;     }      private static Connection createConnection() {         try {             return DriverManager.getConnection("");         } catch (SQLException e) {             e.printStackTrace();         }         return null;     }  }

但是在高并发的情况下有可能出现Connection被创建多次的现象。 为什么呢?

因为创建Connection是一个耗时操作,假设多个线程涌入getConnection方法,都发现key对应的键不存在,于是所有涌入的线程都开始执行conn=createConnection(),只不过最终只有一个线程能将connection插入到map里。但是这样以来,其它线程创建的的connection就没啥价值,浪费系统开销。

这时最需要解决的问题就是当key不存在时,创建Connection的动作(conn=createConnection();)能放在connectionPool.putIfAbsent()之后执行,这正是FutureTask发挥作用的时机,基于ConcurrentHashMap和FutureTask的改造代码如下:

package com.niuh.future;  import java.sql.Connection; import java.sql.DriverManager; import java.sql.SQLException; import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutionException; import java.util.concurrent.FutureTask;  /**  * @description: FutureTask在高并发环境下确保任务只执行一次  * 这时最需要解决的问题就是当key不存在时,创建Connection的动作(conn=createConnection();)  * 能放在connectionPool.putIfAbsent()之后执行,这正是FutureTask发挥作用的时机,  * 基于ConcurrentHashMap和FutureTask的改造代码如下:  */ public class FutureTaskConnection3 {     private static ConcurrentHashMap<String, FutureTask<Connection>> connectionPool = new ConcurrentHashMap<String, FutureTask<Connection>>();      public static Connection getConnection(String key) {         FutureTask<Connection> connectionFutureTask = connectionPool.get(key);         try {             if (connectionFutureTask != null) {                 return connectionFutureTask.get();             } else {                 Callable<Connection> callable = new Callable<Connection>() {                     @Override                     public Connection call() throws Exception {                         return createConnection();                     }                 };                 FutureTask<Connection> newTask = new FutureTask<>(callable);                 FutureTask<Connection> returnFt = connectionPool.putIfAbsent(key, newTask);                 if (returnFt == null) {                     connectionFutureTask = newTask;                     newTask.run();                 }                 return connectionFutureTask.get();             }         } catch (ExecutionException e) {             e.printStackTrace();         } catch (InterruptedException e) {             e.printStackTrace();         }         return null;     }      private static Connection createConnection() {         try {             return DriverManager.getConnection("");         } catch (SQLException e) {             e.printStackTrace();         }         return null;     } }

FutureTask任务执行完回调

FutureTask有一个方法 void done()会在每个线程执行完成return结果时回调。  假设现在需要实现每个线程完成任务执行后主动执行后续任务。

private void handlePossibleCancellationInterrupt(int s) {     // It is possible for our interrupter to stall before getting a     // chance to interrupt us.  Let's spin-wait patiently.     //自旋等待cancle(true)结束(中断结束)     if (s == INTERRUPTING)         while (state == INTERRUPTING)              Thread.yield(); // wait out pending interrupt      // assert state == INTERRUPTED;      // We want to clear any interrupt we may have received from     // cancel(true).  However, it is permissible to use interrupts     // as an independent mechanism for a task to communicate with     // its caller, and there is no way to clear only the     // cancellation interrupt.     //     // Thread.interrupted(); }

执行结果:

11:01:37.134 [Thread-0] INFO com.niuh.future.FutureTaskDemo1 - 老板给我来一个月饼 11:01:37.139 [pool-1-thread-1] INFO com.niuh.future.FutureTaskDemo1 - 月饼制作中。。。。 11:01:37.139 [pool-1-thread-2] INFO com.niuh.future.FutureTaskDemo1 - 月饼制作中。。。。 11:01:37.139 [pool-1-thread-3] INFO com.niuh.future.FutureTaskDemo1 - 月饼制作中。。。。 11:01:42.151 [pool-1-thread-2] INFO com.niuh.future.FutureTaskDemo1 -  编号[804]月饼已打包好 11:01:42.151 [pool-1-thread-3] INFO com.niuh.future.FutureTaskDemo1 -  编号[88]月饼已打包好 11:01:42.151 [pool-1-thread-1] INFO com.niuh.future.FutureTaskDemo1 -  编号[166]月饼已打包好

“并发编程之如何理解Future&FutureTask”的内容就介绍到这里了,感谢大家的阅读。如果想了解更多行业相关的知识可以关注亿速云网站,小编将为大家输出更多高质量的实用文章!

推荐阅读:
  1. Python全栈开发之并发编程
  2. 掌握之并发编程-3.锁

免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。

java

上一篇:怎么让Linux机器加入Windows的AD域

下一篇:怎么在Linux/Unix之上绑定ntpd到特定的IP地址

相关阅读

您好,登录后才能下订单哦!

密码登录
登录注册
其他方式登录
点击 登录注册 即表示同意《亿速云用户服务条款》