您好,登录后才能下订单哦!
重点来了!!!如果要不断的更新每个key的state,就一定会涉及到状态的保存和容错,这个时候就需要开启checkpoint机制和功能,需要说明的是checkpoint可以保存一切可以存储在文件系统上的内容,例如:程序未处理的数据及已经拥有的状态。
补充说明:关于流式处理对历史状态进行保存和更新具有重大实用意义,例如进行广告(投放广告和运营广告效果评估的价值意义,热点随时追踪、热力图)
简单的来说,如果我们需要进行wordcount,每个batchInterval都会计算出新的一批数据,这批数据如何更新到以前计算的结果上?updateStateByKey就能实现此功能。
函数定义如下:
def updateStateByKey[S: ClassTag]( updateFunc: (Seq[V], Option[S]) => Option[S] ): DStream[(K, S)] = ssc.withScope { updateStateByKey(updateFunc, defaultPartitioner()) }
updateStateByKey 需要传入一个函数,该函数有两个参数Seq[V]表示最新一次reduce的值的序列,Option[s]表示的是key对应的以前的值。返回的时一个key的最新值。
下面我们用实例演示:
package com.dt.spark.streaming import org.apache.spark.SparkConf import org.apache.spark.streaming.{Seconds, StreamingContext} /** * Created by Administrator on 2016/5/3. */ object UpdateStateByKeyDemo { def main(args: Array[String]) { val conf = new SparkConf().setAppName("UpdateStateByKeyDemo") val ssc = new StreamingContext(conf,Seconds(20)) //要使用updateStateByKey方法,必须设置Checkpoint。 ssc.checkpoint("/checkpoint/") val socketLines = ssc.socketTextStream("spark-master",9999) socketLines.flatMap(_.split(",")).map(word=>(word,1)) .updateStateByKey( (currValues:Seq[Int],preValue:Option[Int]) =>{ val currValue = currValues.sum Some(currValue + preValue.getOrElse(0)) }).print() ssc.start() ssc.awaitTermination() ssc.stop() } }
打包上传至spark集群。
打开nc,发送测试数据
root@spark-master:~# nc -lk 9999 hadoop,spark,scala,hive hadoop,Hbase,spark
运行spark 程序
root@spark-master:~# /usr/local/spark/spark-1.6.0-bin-hadoop2.6/bin/spark-submit --class com.dt.spark.streaming.UpdateStateByKeyDemo --master spark://spark-master:7077 ./spark.jar
查看运行结果:
------------------------------------------- Time: 1462282180000 ms ------------------------------------------- (scala,1) (hive,1) (spark,2) (hadoop,2) (Hbase,1)
我们在nc中再输入一些数据
root@spark-master:~# nc -lk 9999 hadoop,spark,scala,hive hadoop,Hbase,spark hadoop,spark,scala,hive hadoop,Hbase,spark
再次查看结果:
------------------------------------------- Time: 1462282200000 ms ------------------------------------------- (scala,2) (hive,2) (spark,4) (hadoop,4) (Hbase,2)
可见,它将我们两次统计结果合并了。
备注:
1、DT大数据梦工厂微信公众号DT_Spark
2、IMF晚8点大数据实战YY直播频道号:68917580
3、新浪微博: http://www.weibo.com/ilovepains
亿速云「云服务器」,即开即用、新一代英特尔至强铂金CPU、三副本存储NVMe SSD云盘,价格低至29元/月。点击查看>>
开发者交流群:
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。