您好,登录后才能下订单哦!
# Java中Future和FutureTask怎么用
## 一、引言
### 1.1 异步编程的重要性
在现代软件开发中,异步编程已成为提高系统性能和响应能力的关键手段。传统的同步阻塞式编程模型往往无法充分利用多核CPU资源,而异步编程通过将耗时的操作放到后台线程执行,主线程可以继续处理其他任务,显著提升了程序的吞吐量和用户体验。
### 1.2 Java并发编程的演进
Java从早期的Thread/Runnable到JUC(Java Util Concurrent)包的引入,再到Java 8的CompletableFuture,提供了越来越强大的异步编程工具。其中,Future接口及其实现类FutureTask作为Java 5引入的基础异步机制,至今仍在许多场景中发挥着重要作用。
### 1.3 本文内容概览
本文将深入解析Future和FutureTask的原理、使用方法、典型应用场景以及最佳实践,帮助开发者掌握这一基础但强大的并发工具。
## 二、Future接口详解
### 2.1 Future的核心概念
Future表示异步计算的结果,提供了检查计算是否完成的方法,以及获取计算结果的方法。其核心思想是"异步任务的句柄"——提交任务后立即返回Future对象,后续可以通过它获取结果。
```java
public interface Future<V> {
boolean cancel(boolean mayInterruptIfRunning);
boolean isCancelled();
boolean isDone();
V get() throws InterruptedException, ExecutionException;
V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}
isDone()
: 判断任务是否完成(包括正常完成、异常结束和取消)isCancelled()
: 判断任务是否被取消get()
: 阻塞直到任务完成并返回结果get(long timeout, TimeUnit unit)
: 带超时的结果获取cancel(boolean mayInterruptIfRunning)
: 尝试取消任务ExecutorService executor = Executors.newFixedThreadPool(3);
Future<Integer> future = executor.submit(() -> {
TimeUnit.SECONDS.sleep(2);
return 42;
});
// 非阻塞检查
System.out.println("任务是否完成: " + future.isDone());
try {
// 阻塞获取结果
Integer result = future.get(3, TimeUnit.SECONDS);
System.out.println("计算结果: " + result);
} catch (TimeoutException e) {
System.err.println("计算超时");
} finally {
executor.shutdown();
}
FutureTask是Future接口的经典实现,同时实现了Runnable接口,因此既可以作为任务被线程执行,又可以作为Future获取任务结果。
public class FutureTask<V> implements RunnableFuture<V> {
// 状态机实现
private volatile int state;
private static final int NEW = 0;
private static final int COMPLETING = 1;
private static final int NORMAL = 2;
// 其他状态...
// 实际任务
private Callable<V> callable;
// 结果或异常
private Object outcome;
// 运行线程
private volatile Thread runner;
// 等待线程的Treiber栈
private volatile WaitNode waiters;
}
FutureTask使用状态机模型管理任务生命周期: - NEW -> COMPLETING -> NORMAL (成功完成) - NEW -> COMPLETING -> EXCEPTIONAL (执行异常) - NEW -> CANCELLED (被取消) - NEW -> INTERRUPTING -> INTERRUPTED (被中断)
使用volatile变量保证状态变化的可见性,结果通过outcome字段存储,在状态变为完成态后才能被读取。
// 使用Callable构造
FutureTask<Integer> futureTask1 = new FutureTask<>(() -> {
// 长时间计算
return 123;
});
// 使用Runnable和结果值构造
FutureTask<String> futureTask2 = new FutureTask<>(() -> {
System.out.println("任务执行");
}, "预设结果");
// 创建FutureTask
FutureTask<String> futureTask = new FutureTask<>(() -> {
Thread.sleep(1000);
return "任务完成于: " + System.currentTimeMillis();
});
// 使用线程执行
new Thread(futureTask).start();
// 获取结果
try {
String result = futureTask.get();
System.out.println(result);
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
ExecutorService executor = Executors.newCachedThreadPool();
List<FutureTask<Integer>> tasks = new ArrayList<>();
for (int i = 0; i < 10; i++) {
final int taskId = i;
FutureTask<Integer> task = new FutureTask<>(() -> {
Thread.sleep((long) (Math.random() * 1000));
return taskId * taskId;
});
executor.submit(task);
tasks.add(task);
}
// 获取所有任务结果
tasks.forEach(ft -> {
try {
System.out.println("任务结果: " + ft.get());
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
});
executor.shutdown();
FutureTask<BigInteger> primeTask = new FutureTask<>(() -> {
BigInteger p = BigInteger.ONE;
while (!Thread.currentThread().isInterrupted()) {
p = p.nextProbablePrime();
System.out.println("计算素数: " + p);
Thread.sleep(300);
}
return p;
});
Thread computeThread = new Thread(primeTask);
computeThread.start();
try {
Thread.sleep(2000);
// 尝试取消任务
boolean cancelled = primeTask.cancel(true);
System.out.println("任务取消结果: " + cancelled);
} catch (InterruptedException e) {
e.printStackTrace();
}
// 第一个任务:获取用户ID
FutureTask<String> userIdTask = new FutureTask<>(() -> {
Thread.sleep(500);
return "user123";
});
// 第二个任务:根据用户ID获取信息
FutureTask<UserInfo> userInfoTask = new FutureTask<>(() -> {
String userId = userIdTask.get(); // 依赖第一个任务
return userService.getUserInfo(userId);
});
ExecutorService executor = Executors.newFixedThreadPool(2);
executor.submit(userIdTask);
executor.submit(userInfoTask);
UserInfo user = userInfoTask.get(); // 最终结果
FutureTask<String> riskyTask = new FutureTask<>(() -> {
Thread.sleep(2000); // 模拟耗时操作
return "敏感数据";
});
new Thread(riskyTask).start();
try {
// 设置500ms超时
String result = riskyTask.get(500, TimeUnit.MILLISECONDS);
System.out.println("获取结果: " + result);
} catch (TimeoutException e) {
System.err.println("操作超时,执行熔断");
riskyTask.cancel(true);
// 返回降级结果
System.out.println("返回默认值");
}
ExecutorService executor = Executors.newFixedThreadPool(5);
List<FutureTask<Integer>> tasks = IntStream.range(0, 20)
.mapToObj(i -> new FutureTask<>(() -> {
Thread.sleep((long) (Math.random() * 1000));
return i * i;
}))
.collect(Collectors.toList());
// 提交所有任务
tasks.forEach(executor::submit);
// 获取完成的任务结果
while (!tasks.isEmpty()) {
Iterator<FutureTask<Integer>> iterator = tasks.iterator();
while (iterator.hasNext()) {
FutureTask<Integer> task = iterator.next();
if (task.isDone()) {
try {
System.out.println("完成结果: " + task.get());
iterator.remove();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
}
Thread.sleep(200);
}
executor.shutdown();
public void run() {
if (state != NEW ||
!RUNNER.compareAndSet(this, null, Thread.currentThread()))
return;
try {
Callable<V> c = callable;
if (c != null && state == NEW) {
V result;
boolean ran;
try {
result = c.call();
ran = true;
} catch (Throwable ex) {
result = null;
ran = false;
setException(ex);
}
if (ran)
set(result);
}
} finally {
runner = null;
int s = state;
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
}
使用LockSupport.park()实现线程等待,通过Treiber栈管理等待线程。
特性 | Future/FutureTask | CompletableFuture |
---|---|---|
完成通知 | 需主动轮询 | 支持回调通知 |
组合操作 | 不支持 | 支持thenApply等组合操作 |
异常处理 | 需手动处理 | 内置异常处理机制 |
手动完成 | 不支持 | 支持complete()方法 |
Java版本 | 5+ | 8+ |
内存泄漏示例:
ExecutorService executor = Executors.newFixedThreadPool(2);
List<Future<?>> futures = new ArrayList<>();
for (int i = 0; i < 100_000; i++) {
futures.add(executor.submit(() -> {
try {
Thread.sleep(Long.MAX_VALUE); // 模拟长任务
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}));
}
// 忘记取消未完成的任务会导致Future和关联对象无法回收
正确做法:
ExecutorService executor = Executors.newFixedThreadPool(2);
try {
List<Future<?>> futures = new ArrayList<>();
// 提交任务...
} finally {
// 取消所有未完成任务
futures.forEach(f -> f.cancel(true));
executor.shutdownNow();
}
class LoggingFutureTask<V> extends FutureTask<V> {
public LoggingFutureTask(Callable<V> callable) {
super(callable);
}
@Override
protected void done() {
try {
System.out.println("任务完成,结果: " + get());
} catch (InterruptedException | ExecutionException e) {
System.out.println("任务异常: " + e.getCause());
}
}
}
适合以下场景: - 单次异步计算需求 - 需要手动取消的长时间操作 - 简单的异步结果依赖 - 兼容老版本Java的系统
import java.util.concurrent.*;
import java.util.stream.*;
public class FutureExample {
static class UserInfo {
final String userId;
UserInfo(String id) { this.userId = id; }
@Override public String toString() { return "UserInfo["+userId+"]"; }
}
static class UserService {
UserInfo getUserInfo(String userId) throws InterruptedException {
Thread.sleep(800);
return new UserInfo(userId);
}
}
public static void main(String[] args) {
// 初始化服务
UserService userService = new UserService();
ExecutorService executor = Executors.newFixedThreadPool(3);
try {
// 示例1:基本使用
FutureTask<Integer> squareTask = new FutureTask<>(() -> {
Thread.sleep(300);
int x = 5;
return x * x;
});
executor.execute(squareTask);
System.out.println("5的平方: " + squareTask.get());
// 示例2:任务链
FutureTask<String> userIdTask = new FutureTask<>(() -> {
Thread.sleep(500);
return "user_"+System.currentTimeMillis()%1000;
});
FutureTask<UserInfo> userInfoTask = new FutureTask<>(() -> {
String userId = userIdTask.get();
return userService.getUserInfo(userId);
});
executor.execute(userIdTask);
executor.execute(userInfoTask);
System.out.println("用户信息: " + userInfoTask.get());
// 示例3:批量任务
List<FutureTask<Integer>> tasks = IntStream.range(1, 6)
.mapToObj(i -> new FutureTask<>(() -> {
int sleepTime = i * 200;
Thread.sleep(sleepTime);
return i * 10;
}))
.collect(Collectors.toList());
tasks.forEach(executor::execute);
for (FutureTask<Integer> task : tasks) {
System.out.println("批量任务结果: " + task.get());
}
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
} finally {
executor.shutdown();
}
}
}
通过本文的系统学习,读者应该能够掌握Future和FutureTask的核心概念、使用方法和实现原理,并能在实际项目中正确应用这些并发工具。对于更复杂的异步编程需求,建议进一步学习Java 8引入的CompletableFuture等高级特性。 “`
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。