java多线程如何通过CompletableFuture组装异步计算单元

发布时间:2023-05-12 10:29:55 作者:zzz
来源:亿速云 阅读:148

Java多线程如何通过CompletableFuture组装异步计算单元

目录

  1. 引言
  2. CompletableFuture简介
  3. 创建CompletableFuture
  4. 异步任务的执行
  5. 任务结果的获取
  6. 任务结果的组合
  7. 异常处理
  8. 任务结果的转换
  9. 任务结果的消费
  10. 任务结果的合并
  11. 任务结果的等待
  12. 任务结果的取消
  13. 任务结果的超时处理
  14. 任务结果的依赖
  15. 任务结果的并行执行
  16. 任务结果的串行执行
  17. 任务结果的回调
  18. 任务结果的链式调用
  19. 任务结果的并行流
  20. 任务结果的CompletableFuture与Future的区别
  21. 任务结果的CompletableFuture与RxJava的区别
  22. 任务结果的CompletableFuture与Reactor的区别
  23. 任务结果的CompletableFuture与Akka的区别
  24. 任务结果的CompletableFuture与Vert.x的区别
  25. 任务结果的CompletableFuture与Spring的区别
  26. 任务结果的CompletableFuture的局限性
  27. 总结

引言

在现代软件开发中,异步编程已经成为一种常见的编程范式。Java作为一种广泛使用的编程语言,提供了多种方式来实现异步编程。其中,CompletableFuture是Java 8引入的一个强大的工具,用于处理异步计算和任务编排。本文将详细介绍如何使用CompletableFuture来组装异步计算单元,并探讨其在实际开发中的应用。

CompletableFuture简介

CompletableFuturejava.util.concurrent包中的一个类,它实现了Future接口,并提供了更丰富的功能来处理异步任务。与传统的Future相比,CompletableFuture不仅支持任务的异步执行,还支持任务的组合、异常处理、回调等高级功能。

创建CompletableFuture

1. 使用CompletableFuture.supplyAsync()

supplyAsync()方法用于创建一个异步任务,并返回一个CompletableFuture对象。该方法接受一个Supplier函数式接口作为参数,表示要执行的任务。

CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
    // 模拟一个耗时任务
    try {
        Thread.sleep(1000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    return "Hello, CompletableFuture!";
});

2. 使用CompletableFuture.runAsync()

runAsync()方法用于创建一个异步任务,但不返回任何结果。该方法接受一个Runnable函数式接口作为参数,表示要执行的任务。

CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
    // 模拟一个耗时任务
    try {
        Thread.sleep(1000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    System.out.println("Task completed.");
});

异步任务的执行

1. 使用默认的线程池

默认情况下,CompletableFuture使用ForkJoinPool.commonPool()作为线程池来执行异步任务。这个线程池是一个全局共享的线程池,适用于大多数场景。

CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
    // 模拟一个耗时任务
    try {
        Thread.sleep(1000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    return "Hello, CompletableFuture!";
});

2. 使用自定义的线程池

如果需要使用自定义的线程池,可以将Executor作为第二个参数传递给supplyAsync()runAsync()方法。

Executor executor = Executors.newFixedThreadPool(10);
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
    // 模拟一个耗时任务
    try {
        Thread.sleep(1000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    return "Hello, CompletableFuture!";
}, executor);

任务结果的获取

1. 使用get()方法

get()方法用于获取异步任务的结果。该方法会阻塞当前线程,直到任务完成并返回结果。

CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
    // 模拟一个耗时任务
    try {
        Thread.sleep(1000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    return "Hello, CompletableFuture!";
});

String result = future.get();
System.out.println(result);

2. 使用join()方法

join()方法与get()方法类似,但它不会抛出InterruptedExceptionExecutionException异常,而是直接抛出CompletionException异常。

CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
    // 模拟一个耗时任务
    try {
        Thread.sleep(1000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    return "Hello, CompletableFuture!";
});

String result = future.join();
System.out.println(result);

任务结果的组合

1. 使用thenApply()

thenApply()方法用于在任务完成后对结果进行转换。该方法接受一个Function函数式接口作为参数,表示对结果进行转换的逻辑。

CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
    // 模拟一个耗时任务
    try {
        Thread.sleep(1000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    return "Hello, CompletableFuture!";
});

CompletableFuture<String> transformedFuture = future.thenApply(result -> result.toUpperCase());
String result = transformedFuture.join();
System.out.println(result);

2. 使用thenCompose()

thenCompose()方法用于在任务完成后执行另一个异步任务,并将两个任务的结果组合在一起。该方法接受一个Function函数式接口作为参数,表示要执行的另一个异步任务。

CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
    // 模拟一个耗时任务
    try {
        Thread.sleep(1000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    return "Hello, CompletableFuture!";
});

CompletableFuture<String> composedFuture = future.thenCompose(result -> CompletableFuture.supplyAsync(() -> result + " How are you?"));
String result = composedFuture.join();
System.out.println(result);

异常处理

1. 使用exceptionally()

exceptionally()方法用于处理任务执行过程中发生的异常。该方法接受一个Function函数式接口作为参数,表示异常处理逻辑。

CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
    // 模拟一个耗时任务
    try {
        Thread.sleep(1000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    throw new RuntimeException("Task failed!");
});

CompletableFuture<String> handledFuture = future.exceptionally(ex -> "Error: " + ex.getMessage());
String result = handledFuture.join();
System.out.println(result);

2. 使用handle()

handle()方法用于在任务完成后处理结果或异常。该方法接受一个BiFunction函数式接口作为参数,表示处理逻辑。

CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
    // 模拟一个耗时任务
    try {
        Thread.sleep(1000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    throw new RuntimeException("Task failed!");
});

CompletableFuture<String> handledFuture = future.handle((result, ex) -> {
    if (ex != null) {
        return "Error: " + ex.getMessage();
    } else {
        return result;
    }
});
String result = handledFuture.join();
System.out.println(result);

任务结果的转换

1. 使用thenApply()

thenApply()方法用于在任务完成后对结果进行转换。该方法接受一个Function函数式接口作为参数,表示对结果进行转换的逻辑。

CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
    // 模拟一个耗时任务
    try {
        Thread.sleep(1000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    return "Hello, CompletableFuture!";
});

CompletableFuture<String> transformedFuture = future.thenApply(result -> result.toUpperCase());
String result = transformedFuture.join();
System.out.println(result);

2. 使用thenCompose()

thenCompose()方法用于在任务完成后执行另一个异步任务,并将两个任务的结果组合在一起。该方法接受一个Function函数式接口作为参数,表示要执行的另一个异步任务。

CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
    // 模拟一个耗时任务
    try {
        Thread.sleep(1000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    return "Hello, CompletableFuture!";
});

CompletableFuture<String> composedFuture = future.thenCompose(result -> CompletableFuture.supplyAsync(() -> result + " How are you?"));
String result = composedFuture.join();
System.out.println(result);

任务结果的消费

1. 使用thenAccept()

thenAccept()方法用于在任务完成后消费结果。该方法接受一个Consumer函数式接口作为参数,表示消费结果的逻辑。

CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
    // 模拟一个耗时任务
    try {
        Thread.sleep(1000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    return "Hello, CompletableFuture!";
});

future.thenAccept(result -> System.out.println("Result: " + result));

2. 使用thenRun()

thenRun()方法用于在任务完成后执行一个操作,但不消费结果。该方法接受一个Runnable函数式接口作为参数,表示要执行的操作。

CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
    // 模拟一个耗时任务
    try {
        Thread.sleep(1000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    return "Hello, CompletableFuture!";
});

future.thenRun(() -> System.out.println("Task completed."));

任务结果的合并

1. 使用thenCombine()

thenCombine()方法用于将两个CompletableFuture的结果合并。该方法接受另一个CompletableFuture和一个BiFunction函数式接口作为参数,表示合并逻辑。

CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
    // 模拟一个耗时任务
    try {
        Thread.sleep(1000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    return "Hello";
});

CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
    // 模拟一个耗时任务
    try {
        Thread.sleep(1000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    return "CompletableFuture!";
});

CompletableFuture<String> combinedFuture = future1.thenCombine(future2, (result1, result2) -> result1 + " " + result2);
String result = combinedFuture.join();
System.out.println(result);

2. 使用thenAcceptBoth()

thenAcceptBoth()方法用于在两个CompletableFuture完成后消费它们的结果。该方法接受另一个CompletableFuture和一个BiConsumer函数式接口作为参数,表示消费逻辑。

CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
    // 模拟一个耗时任务
    try {
        Thread.sleep(1000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    return "Hello";
});

CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
    // 模拟一个耗时任务
    try {
        Thread.sleep(1000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    return "CompletableFuture!";
});

future1.thenAcceptBoth(future2, (result1, result2) -> System.out.println(result1 + " " + result2));

任务结果的等待

1. 使用allOf()

allOf()方法用于等待多个CompletableFuture全部完成。该方法接受一个CompletableFuture数组作为参数,并返回一个新的CompletableFuture,该CompletableFuture在所有任务完成后完成。

CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
    // 模拟一个耗时任务
    try {
        Thread.sleep(1000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    return "Hello";
});

CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
    // 模拟一个耗时任务
    try {
        Thread.sleep(1000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    return "CompletableFuture!";
});

CompletableFuture<Void> allFutures = CompletableFuture.allOf(future1, future2);
allFutures.join();
System.out.println("All tasks completed.");

2. 使用anyOf()

anyOf()方法用于等待多个CompletableFuture中的任意一个完成。该方法接受一个CompletableFuture数组作为参数,并返回一个新的CompletableFuture,该CompletableFuture在任意一个任务完成后完成。

CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
    // 模拟一个耗时任务
    try {
        Thread.sleep(1000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    return "Hello";
});

CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
    // 模拟一个耗时任务
    try {
        Thread.sleep(1000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    return "CompletableFuture!";
});

CompletableFuture<Object> anyFuture = CompletableFuture.anyOf(future1, future2);
Object result = anyFuture.join();
System.out.println("First completed task result: " + result);

任务结果的取消

1. 使用cancel()

cancel()方法用于取消一个CompletableFuture的执行。该方法接受一个boolean参数,表示是否中断正在执行的任务。

CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
    // 模拟一个耗时任务
    try {
        Thread.sleep(1000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    return "Hello, CompletableFuture!";
});

future.cancel(true);
System.out.println("Task cancelled.");

任务结果的超时处理

1. 使用orTimeout()

orTimeout()方法用于设置一个超时时间,如果任务在指定时间内未完成,则抛出TimeoutException异常。该方法接受一个long参数,表示超时时间(以毫秒为单位)。

CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
    // 模拟一个耗时任务
    try {
        Thread.sleep(2000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    return "Hello, CompletableFuture!";
});

future.orTimeout(1000, TimeUnit.MILLISECONDS).exceptionally(ex -> {
    System.out.println("Task timed out: " + ex.getMessage());
    return null;
}).join();

2. 使用completeOnTimeout()

completeOnTimeout()方法用于设置一个超时时间,如果任务在指定时间内未完成,则使用默认值完成任务。该方法接受一个T参数,表示默认值,以及一个long参数,表示超时时间(以毫秒为单位)。

CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
    // 模拟一个耗时任务
    try {
        Thread.sleep(2000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    return "Hello, CompletableFuture!";
});

future.completeOnTimeout("Timeout", 1000, TimeUnit.MILLISECONDS).join();
System.out.println("Task completed with: " + future.join());

任务结果的依赖

1. 使用thenCompose()

thenCompose()方法用于在任务完成后执行另一个异步任务,并将两个任务的结果组合在一起。该方法接受一个Function函数式接口作为参数,表示要执行的另一个异步任务。

CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
    // 模拟一个耗时任务
    try {
        Thread.sleep(1000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    return "Hello, CompletableFuture!";
});

CompletableFuture<String> composedFuture = future.thenCompose(result -> CompletableFuture.supplyAsync(() -> result + " How are you?"));
String result = composedFuture.join();
System.out.println(result);

2. 使用thenCombine()

thenCombine()方法用于将两个CompletableFuture的结果合并。该方法接受另一个CompletableFuture和一个BiFunction函数式接口作为参数,表示合并逻辑。

”`java CompletableFuture future1 = CompletableFuture.supplyAsync(() -> { // 模拟一个耗时任务 try { Thread

推荐阅读:
  1. java怎么校验json的格式是否符合要求
  2. Java中StringBuffer和StringBuilder怎么使用

免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。

java completablefuture

上一篇:java如何遍历数组中的每一个元素

下一篇:java内存泄漏如何检查

相关阅读

您好,登录后才能下订单哦!

密码登录
登录注册
其他方式登录
点击 登录注册 即表示同意《亿速云用户服务条款》