您好,登录后才能下订单哦!
密码登录
            
            
            
            
        登录注册
            
            
            
        点击 登录注册 即表示同意《亿速云用户服务条款》
        实际开发过程中遇到了需要实现选择性聚合的场景,即对于某一个key对应的数据,将满足条件的记录进行聚合,不满足条件的则不进行聚合。
使用spark处理这种计算场景时,想到了使用combineByKey算子,先将输入数据中的value映射成含一个元素的ArrayBuffer(scala中相当于java中的ArrayList),然后在聚合时对满足聚合条件的记录聚合后覆盖这一个ArrayBuffer,不满足条件的待聚合的两条记录都填入ArrayBuffer。最后调用flatMap将ArrayBuffer中的元素分拆。
比如下面的代码实现了对某个字段聚合时,按照时间条件进行选择性的聚合:
val rdd1 = sc.textFile(dayDayDir).union(sc.textFile(thisDayDir))
    .map(line => line.split("\\|"))
    .filter(arr => if(arr.length != 14 || !arr(3).substring(0, 8).equals(lastDay)) false else true)
    .map(arr => (arr(0), arr))
    .reduceByKey( (pure, after) => reduceSession(pure, after))
    .map(tup => (tup._2(13), tup._2))
    .combineByKey( x => ArrayBuffer(x),
    (x:ArrayBuffer[Array[String]],y) => combineMergeValue(x, y),
    (x:ArrayBuffer[Array[String]],y:ArrayBuffer[Array[String]]) => combineMergeCombiners(x, y))
    .flatMap(tup => arrToStr(tup._2))
def combineMergeValue(x:ArrayBuffer[Array[String]], y:Array[String])
                    : ArrayBuffer[Array[String]] = {
    var outList = x.clone()
    var outarr = y.clone()
    var flag = true
    for(i <- 0 until outList.length){
        if(checkTime(outList(i)(3), outList(i)(4), y(3), y(4))) {
            outarr = reduceSession(outList(i), y)
            outList(i) = outarr
            flag = false
        }
    }
    if(flag) {
        outList += y
    }
    outList
}
def combineMergeCombiners(x:ArrayBuffer[Array[String]], y:ArrayBuffer[Array[String]])
                : ArrayBuffer[Array[String]] = {
    var outList = x.clone();
    for(i <- 0 until y.length){
    var outarr = y(i).clone()
    var flag = true
    for(j <- 0 until outList.length){
        if(checkTime(outList(j)(3), outList(j)(4), y(i)(3), y(i)(4))) {
            outarr = reduceSession(outList(j), y(i))
            outList(j) = outarr
            flag = false
        }
    }
    if(flag) {
        outList += y(i)
    }
    }
    outList
}
													免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。