Fork/Join框架怎么用

发布时间:2021-12-04 09:48:18 作者:小新
来源:亿速云 阅读:140

这篇文章给大家分享的是有关Fork/Join框架怎么用的内容。小编觉得挺实用的,因此分享给大家做个参考,一起跟随小编过来看看吧。

fork/join框架是ExecutorService接口的一种具体实现,会将任务分发给线程池中的工作线程,更好地利用多处理器带来的好处,提供程序性能。它是为那些能够被递归地拆解成子任务的工作类型量身设计的。

fork/join框架的独特之处在与它使用工作窃取(work-stealing)算法。完成自己的工作而处于空闲的工作线程能够从其他仍然处于忙碌(busy)状态的工作线程处窃取等待执行的任务。

fork/join框架的核心是ForkJoinPool类,它是对AbstractExecutorService类的扩展。ForkJoinPool实现了工作偷取算法,并可以执行ForkJoinTask任务。

使用fork/join框架的第一步是编写执行一部分工作的代码,类似的伪代码如下:

if (当前这个任务工作量足够小)

  直接完成这个任务

else

  把当前任务分解成两个部分
  调用这两个部分并等待结果

此代包装在ForkJoinTask的子类中。不过,通常是RecursiveTask(会返回一个结果),或RecursiveAction。

fork分解出新任务,join汇集任务结果,其大致过程如下:

Fork/Join框架怎么用

Fork/Join实际用例

fork/join的核心思想就是分而治之,将一个大任务拆分成一个一个的小任务。fork/join的使用需要定义一个任务类去实现RecursiveTask或RecursiveAction,重写compute()方法,在compute()方法中定义任务拆分的逻辑,然后借助ForkJoinPool提交任务去执行,fork/join框架会根据compute()方法中定义的拆分逻辑对任务进行具体的拆分,如果有返回值,可以借助ForkJoinTask获取返回值。假设现在有很多网络请求需要并发的去执行,然后汇总结果,使用fork/join的代码实现如下:

public class ForkJoinTest {

  // 测试数据  
  static ArrayList<String> urls =
      new ArrayList<String>() {
        {
          add("http://www.sina.com");
          add("http://www.baidu.com");
          add("http://www.sina.com");
          add("http://www.baidu.com");
          add("http://www.sina.com");
          add("http://www.baidu.com");
          add("http://www.sina.com");
          add("http://www.baidu.com");
          add("http://www.sina.com");
          add("http://www.baidu.com");
          add("http://www.sina.com");
          add("http://www.baidu.com");
          add("http://www.sina.com");
          add("http://www.baidu.com");
          add("http://www.sina.com");
          add("http://www.baidu.com");
          add("http://www.sina.com");
          add("http://www.baidu.com");
          add("http://www.sina.com");
          add("http://www.baidu.com");
          add("http://www.sina.com");
          add("http://www.baidu.com");
          add("http://www.sina.com");
        }
      };

  // 核心ForkJoinPool
  static ForkJoinPool firkJoinPool =
      new ForkJoinPool(3, ForkJoinPool.defaultForkJoinWorkerThreadFactory, null, true);
  public static void main(String args[]) throws ExecutionException, InterruptedException {
    Job job = new Job(urls, 0, urls.size());
    ForkJoinTask<String> forkJoinTask = firkJoinPool.submit(job);
    String result = forkJoinTask.get();
    System.out.println(result);
  }
  public static String doRequest(String url) {
    // 模拟网络请求
    return "Kody ... test ... " + url + "\n";
  }

  // 自定义任务  
  static class Job extends RecursiveTask<String> {

    List<String> urls;
    int start;

    int end;

    public Job(List<String> urls, int start, int end) {
      this.urls = urls;
      this.start = start;
      this.end = end;

    }

    /** 核心任务的计算,任务拆分逻辑实现 */
    @Override
    protected String compute() {

      // 计算任务的大小      
      int count = end - start;

      // 如果任务拆分的足够小,则执行任务      
      if (count <= 5) {
        // 直接执行        
        String result = "";
        for (int i = start; i < end; i++) {
          String response = doRequest(urls.get(i));
          result += response;
        }
        return result;

      } else { // 如果任务较大,继续拆分任务

        int x = (start + end) / 2;
        Job job1 = new Job(urls, start, x);
        job1.fork();
        Job job2 = new Job(urls, x, end);
        job2.fork();

        // 汇总fork出的子任务结果        
        return String.join("", job1.join(), job2.join());

      }
    }
  }
}

感谢各位的阅读!关于“Fork/Join框架怎么用”这篇文章就分享到这里了,希望以上内容可以对大家有一定的帮助,让大家可以学到更多知识,如果觉得文章不错,可以把它分享出去让更多的人看到吧!

推荐阅读:
  1. Flink如何与Hadoop和Spark区别开来
  2. 在Flink中如何实现实时数据流处理

免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。

上一篇:Python有哪些基础知识是必学的

下一篇:网页里段落的html标签是哪些

相关阅读

您好,登录后才能下订单哦!

密码登录
登录注册
其他方式登录
点击 登录注册 即表示同意《亿速云用户服务条款》