怎么实现SparkStreaming转化操作

发布时间:2021-12-17 10:47:48 作者:柒染
来源:亿速云 阅读:187

怎么实现SparkStreaming转化操作,相信很多没有经验的人对此束手无策,为此本文总结了问题出现的原因和解决方法,通过这篇文章希望你能解决这个问题。

DStream的转化操作分为无状态有状态 两种

无状态转化


无状态转化操作的实质就说把简单的RDD转化操作应用到每个批次上,也就是转化DStream的每一个RDD

Transform算子

Transform 允许 DStream 上执行任意的 RDD-to-RDD 函数。即使这些函数并没有在 DStream 的 API 中暴露出来,通过该函数可以方便的扩展 Spark API。该函数每一批次调度一次。其实也 就是对 DStream 中的 RDD 应用转换。

def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setAppName("transform").setMaster("local[*]")
    val sc: StreamingContext = new StreamingContext(conf, Seconds(3))
    val lines = sc.socketTextStream("localhost", 9999)

    // transform方法可以将底层RDD获取到后进行操作
    // 1. DStream功能不完善
    // 2. 需要代码周期性的执行

    // Code : Driver端
    val newDS: DStream[String] = lines.transform(
      rdd => {
        // Code : Driver端,(周期性执行)
        rdd.map(
          str => {
            // Code : Executor端
            str
          }
        )
      }
    )
    // Code : Driver端
    val newDS1: DStream[String] = lines.map(
      data => {
        // Code : Executor端
        data
      }
    )
    sc.start()
    sc.awaitTermination()
  }

join算子

两个流之间的 join 需要两个流的批次大小一致,这样才能做到同时触发计算。计算过程就是对当前批次的两个流中各自的 RDD 进行 join,与两个 RDD 的 join 效果相同。

def main(args: Array[String]): Unit = {

    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkStreaming")
    val ssc = new StreamingContext(sparkConf, Seconds(5))

    val data9999 = ssc.socketTextStream("localhost", 9999)
    val data8888 = ssc.socketTextStream("localhost", 8888)

    val map9999: DStream[(String, Int)] = data9999.map((_,9))
    val map8888: DStream[(String, Int)] = data8888.map((_,8))

    // 所谓的DStream的Join操作,其实就是两个RDD的join
    val joinDS: DStream[(String, (Int, Int))] = map9999.join(map8888)

    joinDS.print()

    ssc.start()
    ssc.awaitTermination()
}

有状态转化


有状态转化操作是跨时间区间跟踪数据的操作,也就是说,一些先前批次的数据也被用来在新的批次中用于计算结果。有状态转换的主要的两种类型:

有状态转化操作需要在StreamingContext中打开检查点机制来提高容错

updateStateByKey

  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("updateStateByKey")
    val sc: StreamingContext = new StreamingContext(conf, Seconds(4))
    sc.checkpoint("cp")
    val ds: ReceiverInputDStream[String] = sc.socketTextStream("localhost", 9999)

    val value: DStream[(String, Int)] = ds.map(((_: String), 1))


    // updateStateByKey:根据key对数据的状态进行更新
    // 传递的参数中含有两个值
    // 第一个值表示相同的key的value数据的集合
    // 第二个值表示缓存区key对应的计算值
    val state: DStream[(String, Int)] = value.updateStateByKey((seq: Seq[Int], option: Option[Int]) => {
      val newCount: Int = option.getOrElse(0) + seq.sum
      Option(newCount)
    })

    state.print()

    sc.start()
    sc.awaitTermination()

  }

窗口

所有基于窗口的函数都需要两个参数,分别对应窗口时长滑动步长,并且两者都必须是SparkStreaming的批次间隔的整数倍。
窗口时长控制的是每次用来计算的批次的个数
滑动步长用于控制对新的DStream进行计算的间隔

怎么实现SparkStreaming转化操作

window操作

基于window进行窗口内元素计数操作

def main(args: Array[String]): Unit = {

    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkStreaming")
    val ssc = new StreamingContext(sparkConf, Seconds(3))

    val lines = ssc.socketTextStream("localhost", 9999)
    val wordToOne = lines.map((_,1))

    val windowDS: DStream[(String, Int)] = wordToOne.window(Seconds(6), Seconds(6))

    val wordToCount = windowDS.reduceByKey(_+_)

    wordToCount.print()

    ssc.start()
    ssc.awaitTermination()
  }
reduce操作

有逆操作规约是一种更高效的规约操作,通过只考虑新进入窗口的元素和离开窗口的元素,让spark增量计算归约的结果,其在代码上的体现就是reduceFuncinvReduceFunc

怎么实现SparkStreaming转化操作

普通归约操作

  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkStreaming")
    val ssc = new StreamingContext(sparkConf, Seconds(3))
    ssc.checkpoint("cp")

    val lines = ssc.socketTextStream("localhost", 9999)

    lines.reduceByWindow(
      (x: String, y: String) => {
        x + "-" + y
      },
      Seconds(9), Seconds(3)
    ).print()

    ssc.start()
    ssc.awaitTermination()
  }

有逆归约操作

  def main(args: Array[String]): Unit = {

    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkStreaming")
    val ssc = new StreamingContext(sparkConf, Seconds(3))
    ssc.checkpoint("cp")

    val lines = ssc.socketTextStream("localhost", 9999)
    val wordToOne = lines.map((_,1))

    /**
     * 基于窗口进行有逆归约:通过控制窗口流出和进入的元素来提高性能
     */
    val windowDS: DStream[(String, Int)] =
    wordToOne.reduceByKeyAndWindow(
      (x:Int, y:Int) => { x + y},
      (x:Int, y:Int) => {x - y},
      Seconds(9), Seconds(3))

    windowDS.print()

    ssc.start()
    ssc.awaitTermination()
  }
count操作
  def main(args: Array[String]): Unit = {

    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkStreaming")
    val ssc = new StreamingContext(sparkConf, Seconds(3))
    ssc.checkpoint("cp")

    val lines = ssc.socketTextStream("localhost", 9999)

    /**
     * 统计窗口中输入数据的个数
     * 比如 3s内输入了10条数据,则打印10
     */
    val countByWindow: DStream[Long] = lines.countByWindow(
      Seconds(9), Seconds(3)
    )
    countByWindow.print()

    /**
     * 统计窗口中每个值的个数
     * 比如 3s内输入了1个3 2个4 3个5,则打印(3,1)(2,4)(3,5)
     */
    val countByValueAndWindow: DStream[(String, Long)] = lines.countByValueAndWindow(
      Seconds(9), Seconds(3)
    )
    countByValueAndWindow.print()

    ssc.start()
    ssc.awaitTermination()
  }

看完上述内容,你们掌握怎么实现SparkStreaming转化操作的方法了吗?如果还想学到更多技能或想了解更多相关内容,欢迎关注亿速云行业资讯频道,感谢各位的阅读!

推荐阅读:
  1. SparkStreaming整合kafka的补充
  2. SparkStreaming整合kafka

免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。

sparkstreaming

上一篇:ceph-rest-api怎么用

下一篇:python匿名函数怎么创建

相关阅读

您好,登录后才能下订单哦!

密码登录
登录注册
其他方式登录
点击 登录注册 即表示同意《亿速云用户服务条款》