6.spark core之键值对操作

发布时间:2020-07-12 23:33:29 作者:菲立思教育
来源:网络 阅读:368

  键值对RDD(pair RDD)是spark中许多操作所需要的常见数据类型,通常用来进行聚合计算。

创建Pair RDD

  spark有多种方式可以创建pair RDD。比如:很多存储键值对的数据格式在读取时直接返回pair RDD;通过map()算子将普通的RDD转为pair RDD。

scala

# 使用第一个单词作为键创建一个pair RDD
val pairs = lines.map(x => (x.split(" ")(0), x))

java

# 使用第一个单词作为键创建一个pair RDD
# jdk1.8后也支持lambda表达式方式
PairFunction<String, String, String> keyData = new PairFunction<String, String, String>() {
  public Tuple2<String, String> call(String x) {
    return new Tuple2(x.split(" ")[0], x);
  }
};
JavaPairRDD<String, String> pairs = lines.mapToPair(keyData);

python

# 使用第一个单词作为键创建一个pair RDD
pairs = lines.map(lambda x: (x.split(" ")[0], x))

  从一个内存中的数据集创建pair RDD时,scala和python只需要对这个二元组集合调用SparkContext的parallelize()方法即可;而java需要使用SparkContext.parallelizePairs()方法。

pair RDD转化操作

转化操作总览

针对单个Pair RDD的转化操作
函数名 作用 示例
reduceByKey(func) 合并具有相同键的值 rdd.reduceByKey((x, y) => x + y)
groupByKey() 对具有相同键的值进行分组 rdd.groupByKey()
combineByKey(createCombiner,mergeValue,mergeCombiners,partitioner) 使用不同的返回类型合并具有相同键的值 rdd.combineByKey(v => (v, 1), (acc: (Int, Int), v) => (acc._1 + v, acc._2 + 1), (acc1: (Int, Int), acc2: (Int, Int)) => (acc1._1 + acc2._1, acc1._2 + acc2._2))
mapValues(func) 对pair RDD中的每个值应用一个函数而不改变键 rdd.mapValues(x => x + 1)
flatMapValues(func) 对pair RDD中的每个值应用一个返回迭代器的函数,生成对应原键的键值对记录 rdd.flatMapValues(x => (x to 5))
keys() 返回一个仅包含键的RDD rdd.keys
values() 返回一个仅包含值得RDD rdd.values
sortByKey() 返回一个根据键排序的RDD rdd.sortByKey()
针对两个Pair RDD的转化操作
函数名 作用 示例
subtractByKey 删除RDD中键与other RDD中键相同的元素 rdd.subtractByKey(other)
join 对两个RDD进行内连接 rdd.join(other)
leftOuterJoin 对两个RDD进行连接操作,确保第二个RDD的键必须存在(左外连接) rdd.leftOuterJoin(other)
rightOuterJoin 对两个RDD进行连接操作,确保第一个RDD的键必须存在(右外连接) rdd.rightOuterJoin(other)
cogroup 将两个RDD中拥有相同键的数据分组在一起 rdd.cogroup(other)

聚合

scala
rdd.mapValues(x => (x, 1)).reduceByKey((x, y) => (x._1 + y._1, x._2 + y._2))
python
rdd.mapValues(lambda x: (x, 1)).reduceByKey(lambda x, y: (x[0] + y[0], x[1] + y[1]))
scala
val input = sc.textFile("s3://...")
val words = input.flatMap(x => x.split(" "))
val result = words.map(x => (x, 1)).reduceByKey((x, y) => x + y)
java
JavaRDD<String> input = sc.textFile("s3://...");
JavaRDD<String> words = input.flatMap(new FlatMapFunction<String, String>() {
   public Iterable<String> call(String x) {
        return Arrays.asList(x.split(" "));
   }
});
JavaPairRDD<String, Integer> result = words.mapToPair(new PairFunction<String, String, Integer>() {
  public Tuple2<String, Integer> call(String x) {
    return new Tuple2(x, 1);
  }
}).reduceByKey(
    new Function2<Integer, Integer, Integer>() {
        public Integer call(Integer a, Integer b) {
            return a + b;
        }
    }
)
python
rdd = sc.textFile("s3://...")
words = rdd.flatMap(lambda x: x.split(" "))
result = words.map(lambda x: (x, 1)).reduceByKey(lambda x, y: x + y)
scala
val result = rdd.combineByKey(v => (v, 1), (acc: (Int, Int), v) => (acc._1 + v, acc._2 + 1), (acc1: (Int, Int), acc2: (Int, Int)) => (acc1._1 + acc2._1, acc1._2 + acc2._2)).map{case (key, value) => (key, value._1 / value._2.toFloat)}
java
public static class AvgCount implements Serializable {
    public int total_;
    public int num_;
    public AvgCount(int total, int num) {
        total_ = total;
        num_ = num;
    }
    public float avg() {
        return total_/(float)num_;
    }
}

Function<Integer, AvgCount> createAcc = new Function<Integer, AvgCount>() {
    public AvgCount call(Integer x) {
        return new AvgCount(x, 1);
    }
};

Function2<AvgCount, Integer, AvgCount> addAndCount = new Function2<AvgCount, Integer, AvgCount>() {
    public AvgCount call(AvgCount a, Integer x) {
        a.total_ += x;
        a.num_ += 1;
        return a;
    }
};

Function2<AvgCount, AvgCount, AvgCount> combine = new Function2<AvgCount, AvgCount, AvgCount>() {
    public AvgCount call(AvgCount a, AvgCount b) {
        a.total_ += b.total_;
        a.num_ += b.num_;
        return a;
    }
};

AvgCount initial = new AvgCount(0, 0);
JavaPairRDD<String, AvgCount> avgCounts = input.combineByKey(createAcc, addAndCount, combine);
Map<String, AvgCount> countMap = avgCounts.collectAsMap();
for (Entry<String, AvgCount> entry : countMap.entrySet()) {
    System.out.println(entry.getKey() + ":" + entry.getValue().avg());
}
python
sumCount = input.combineByKey((lambda x: (x, 1)), (lambda x, y: (x[0] + y, x[1] + 1)), (lambda x, y: (x[0] + y[0], x[1] + y[1])))
sumCount.map(lambda key, xy: (key, xy[0]/xy[1])).collectAsMap()

分组

  对于单个RDD数据进行分组时,使用groupByKey()。如果先使用groupByKey(),再使用reduce()或fold()时,可能使用一种根据键进行聚合的函数更高效。比如,rdd.reduceByKey(func)与rdd.groupByKey().mapValues(value => value.reduce(func))等价,但前者更高效,因为避免了为每个键存放值列表的步骤。

  对多个共享同一个键的RDD进行分组时,使用cogroup()。cogroup方法会得到结果RDD类型为[(K, (Iterable[V], Iterable[W]))]。

连接

  将一组有键的数据与另一组有键的数据连接使用是对键值对数据执行的常用操作。连接方式主要有:内连接、左外连接、右外连接。

val storeAddress = sc.parallelize(Seq((Store("Ritual"), "1026 Valencia St"), (Store("Philz"), "748 Van Ness Ave"), (Store("Philz"), "3101 24th St"), (Store("Starbucks"), "Seattle")))
val storeRating = sc.parallelize(Seq(Store("Ritual"), 4.9), (Store("Philz"), 4.8)))
# 内连接
storeAddress.join(storeRating)
#左外连接
storeAddress.leftOuterJoin(storeRating)
#右外连接
storeAddress.rightOuterJoin(storeRating)

排序

  将数据排序输出是很常见的场景。sortByKey()函数接收一个叫做ascending的参数,表示是否让结果升序排序(默认true)。有时,也可以提供自定义比较函数。比如,以字符串顺序对整数进行自定义排序。

scala
implicit val sortIntegersByString = new Ordering[Int] {
    override def compare(a: Int, b: Int) = a.toString.compare(b.toString)
}
rdd.sortByKey()
java
class IntegerComparator implements Comparator<Integer> {
    public int compare(Integer a, Integer b) {
        return String.valueOf(a).compareTo(String.valueOf(b))
    }
}
rdd.sortByKey(new IntegerComparator());
python
rdd.sortByKey(ascending=True, numPartitions=None, keyfunc=lambda x: str(x))

Pair RDD行动操作

  和转化操作一样,所有基础RDD支持的行动操作也都在pair RDD上可用。另外,Pair RDD提供了一些额外的行动操作。

函数 作用 示例
countByKey 对每个键对应的元素分别计数 rdd.countByKey()
collectAsMap 将结果以映射表的形式返回 rdd.collectAsMap()
lookup(key) 返回指定键对应的所有值 rdd.lookup(3)

忠于技术,热爱分享。欢迎关注公众号:java大数据编程,了解更多技术内容。

6.spark core之键值对操作

推荐阅读:
  1. 【全集】IDEA入门到实战
  2. 【问题】spark运行python写的mapreduce任务,hadoop平台报错,java.net.ConnectException: 连接超时

免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。

spark pair rdd 聚合

上一篇:Django 模板语法取值

下一篇:TCP 的三次握手,四次挥手和重要的细节—干货满满,建议细读

相关阅读

您好,登录后才能下订单哦!

密码登录
登录注册
其他方式登录
点击 登录注册 即表示同意《亿速云用户服务条款》