【总结】Spark优化(1)-多Job并发执行

发布时间:2020-09-20 07:42:12 作者:巧克力黒
来源:网络 阅读:18337

Spark程序中一个Job的触发是通过一个Action算子,比如count(), saveAsTextFile()等

在这次Spark优化测试中,从Hive中读取数据,将其另外保存四份,其中两个Job采用串行方式,另外两个Job采用并行方式。将任务提交到Yarn中执行。能够明显看出串行与兵线处理的性能。


每个Job执行时间:

JobID开始时间结束时间耗时
Job 016:59:4517:00:3449s
Job 117:00:3417:01:1339s
Job 217:01:1517:01:55
40s
Job 317:01:1617:02:1256s

四个Job都是自执行相同操作,Job0,Job1一组采用串行方式,Job2,Job3采用并行方式。

Job0,Job1串行方式耗时等于两个Job耗时之和 49s+39s=88s

Job2,Job3并行方式耗时等于最先开始和最后结束时间只差17:02:12-17:01:15=57s


【总结】Spark优化(1)-多Job并发执行

代码:

package com.cn.ctripotb;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.hive.HiveContext;


import java.util.*;
import java.util.concurrent.Callable;
import java.util.concurrent.Executors;

/**
 * Created by Administrator on 2016/9/12.
 */
public class HotelTest {
    static ResourceBundle rb = ResourceBundle.getBundle("filepath");
    public static void main(String[] args) {
        SparkConf conf = new SparkConf()
                .setAppName("MultiJobWithThread")
                .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");

        JavaSparkContext sc = new JavaSparkContext(conf);
        HiveContext hiveContext = new HiveContext(sc.sc());  //测试真实数据时要把这里放开

        final DataFrame df = getHotelInfo(hiveContext);
        //没有多线程处理的情况,连续执行两个Action操作,生成两个Job
        df.rdd().saveAsTextFile(rb.getString("hdfspath") + "/file1",com.hadoop.compression.lzo.LzopCodec.class);
        df.rdd().saveAsTextFile(rb.getString("hdfspath") + "/file2",com.hadoop.compression.lzo.LzopCodec.class);

        //用Executor实现多线程方式处理Job
        java.util.concurrent.ExecutorService executorService = Executors.newFixedThreadPool(2);
        executorService.submit(new Callable<Void>() {
            @Override
            public Void call(){
                df.rdd().saveAsTextFile(rb.getString("hdfspath") + "/file3",com.hadoop.compression.lzo.LzopCodec.class);
                return null;
            }
        });
        executorService.submit(new Callable<Void>() {
            @Override
            public Void call(){
                df.rdd().saveAsTextFile(rb.getString("hdfspath") + "/file4",com.hadoop.compression.lzo.LzopCodec.class);
                return null;
            }
        });

        executorService.shutdown();
    }
    public static DataFrame getHotelInfo(HiveContext hiveContext){
        String sql = "select * from common.dict_hotel_ol";
        return  hiveContext.sql(sql);
    }
}


推荐阅读:
  1. Hbase万亿级存储性能优化总结
  2. hbase学习笔记1——脚本简单总结

免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。

优化 thread spark

上一篇:Restful API中的错误处理方法

下一篇:Apache Flink 官方文档--概览

相关阅读

您好,登录后才能下订单哦!

密码登录
登录注册
其他方式登录
点击 登录注册 即表示同意《亿速云用户服务条款》