您好,登录后才能下订单哦!
这篇文章主要介绍“Dstream的创建方法”,在日常操作中,相信很多人在Dstream的创建方法问题上存在疑惑,小编查阅了各式资料,整理出简单好用的操作方法,希望对大家解答”Dstream的创建方法”的疑惑有所帮助!接下来,请跟着小编一起来学习吧!
测试过程中,可以通过使用ssc.queueStream(queueOfRDDs)来创建DStream,每一个推送到这个队列中的RDD,都会作为一个DStream处理。
案例
object SparkStreaming02_RDDQueue { def main(args: Array[String]): Unit = { //创建配置文件对象 val conf: SparkConf = new SparkConf().setAppName("SparkStreaming02_RDDQueue").setMaster("local[*]") //创建SparkStreaming上下文环境对象 val ssc: StreamingContext = new StreamingContext(conf,Seconds(3)) //创建队列,里面放的是RDD val rddQueue: mutable.Queue[RDD[Int]] = new mutable.Queue[RDD[Int]]() //从队列中采集数据,获取DS val queueDS: InputDStream[Int] = ssc.queueStream(rddQueue,false) //处理采集到的数据 val resDS: DStream[(Int, Int)] = queueDS.map((_,1)).reduceByKey(_+_) //打印结果 resDS.print() //启动采集器 ssc.start() //循环创建RDD,并将创建的RDD放到队列里 for( i <- 1 to 5){ rddQueue.enqueue(ssc.sparkContext.makeRDD(6 to 10)) Thread.sleep(2000) } ssc.awaitTermination() } }
需要继承Receiver,并实现onStart、onStop方法来自定义数据源采集。
用一个案例来说明
/** * Author: Felix * Date: 2020/5/20 * Desc: 通过自定义数据源方式创建DStream * 模拟从指定的网络端口获取数据 */ object SparkStreaming03_CustomerReceiver { def main(args: Array[String]): Unit = { //创建配置文件对象 val conf: SparkConf = new SparkConf().setAppName("SparkStreaming02_RDDQueue").setMaster("local[*]") //创建SparkStreaming上下文环境对象 val ssc: StreamingContext = new StreamingContext(conf,Seconds(3)) //通过自定义数据源创建Dstream val myDS: ReceiverInputDStream[String] = ssc.receiverStream(new MyReceiver("hadoop202",9999)) //扁平化 val flatMapDS: DStream[String] = myDS.flatMap(_.split(" ")) //结构转换 进行计数 val mapDS: DStream[(String, Int)] = flatMapDS.map((_,1)) //聚合 val reduceDS: DStream[(String, Int)] = mapDS.reduceByKey(_+_) //打印输出 reduceDS.print ssc.start() ssc.awaitTermination() } } //Receiver[T] 泛型表示的是 读取的数据类型 class MyReceiver(host: String,port: Int) extends Receiver[String](StorageLevel.MEMORY_ONLY){ private var socket: Socket = _ // 真正的处理接收数据的逻辑 def receive() { try { //创建连接 socket = new Socket(host,port) //根据连接对象获取输入流 val reader: BufferedReader = new BufferedReader(new InputStreamReader(socket.getInputStream,StandardCharsets.UTF_8)) //定义一个变量,用于接收读取到的一行数据 var input:String = null while((input = reader.readLine())!= null){ store(input) } } catch { case e: ConnectException => restart(s"Error connecting to $host:$port", e) return } finally { onStop() } } override def onStart(): Unit = { new Thread("Socket Receiver") { setDaemon(true) override def run() { receive() } }.start() } override def onStop(): Unit = { synchronized { if (socket != null) { socket.close() socket = null } } } }
需求:通过SparkStreaming从Kafka读取数据,并将读取过来的数据做简单计算,最终打印到控制台。
导入依赖
<dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-streaming-kafka-0-8_2.11</artifactId> <version>2.1.1</version> </dependency>
编写代码 0-8Receive模式,offset维护在zk中,程序停止后,继续生产数据,再次启动程序,仍然可以继续消费。可通过get /consumers/bigdata/offsets/主题名/分区号 查看
object Spark04_ReceiverAPI { def main(args: Array[String]): Unit = { //1.创建SparkConf val sparkConf: SparkConf = new SparkConf().setAppName("Spark04_ReceiverAPI").setMaster("local[*]") //2.创建StreamingContext val ssc = new StreamingContext(sparkConf, Seconds(3)) //3.使用ReceiverAPI读取Kafka数据创建DStream val kafkaDStream: ReceiverInputDStream[(String, String)] = KafkaUtils.createStream(ssc, "hadoop202:2181,hadoop203:2181,hadoop204:2181", "bigdata", //v表示的主题的分区数 Map("mybak" -> 2)) //4.计算WordCount并打印 new KafkaProducer[String,String]().send(new ProducerRecord[]()) val lineDStream: DStream[String] = kafkaDStream.map(_._2) val word: DStream[String] = lineDStream.flatMap(_.split(" ")) val wordToOneDStream: DStream[(String, Int)] = word.map((_, 1)) val wordToCountDStream: DStream[(String, Int)] = wordToOneDStream.reduceByKey(_ + _) wordToCountDStream.print() //5.开启任务 ssc.start() ssc.awaitTermination() } }
需求:通过SparkStreaming从Kafka读取数据,并将读取过来的数据做简单计算,最终打印到控制台。
导入依赖
<dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-streaming-kafka-0-8_2.11</artifactId> <version>2.1.1</version> </dependency>
编写代码(自动维护offset1)
offset维护在checkpoint中,但是获取StreamingContext的方式需要改变,目前这种方式会丢失消息
object Spark05_DirectAPI_Auto01 { def main(args: Array[String]): Unit = { //1.创建SparkConf val sparkConf: SparkConf = new SparkConf().setAppName("Spark05_DirectAPI_Auto01").setMaster("local[*]") //2.创建StreamingContext val ssc = new StreamingContext(sparkConf, Seconds(3)) ssc.checkpoint("D:\\dev\\workspace\\my-bak\\spark-bak\\cp") //3.准备Kafka参数 val kafkaParams: Map[String, String] = Map[String, String]( ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "hadoop202:9092,hadoop203:9092,hadoop204:9092", ConsumerConfig.GROUP_ID_CONFIG -> "bigdata" ) //4.使用DirectAPI自动维护offset的方式读取Kafka数据创建DStream val kafkaDStream: InputDStream[(String, String)] = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, Set("mybak")) //5.计算WordCount并打印 kafkaDStream.map(_._2) .flatMap(_.split(" ")) .map((_, 1)) .reduceByKey(_ + _) .print() //6.开启任务 ssc.start() ssc.awaitTermination() } }
编写代码(自动维护offset2)
offset维护在checkpoint中,获取StreamingContext为getActiveOrCreate
这种方式缺点:
checkpoint小文件过多
checkpoint记录最后一次时间戳,再次启动的时候会把间隔时间的周期再执行一次
object Spark06_DirectAPI_Auto02 { def main(args: Array[String]): Unit = { val ssc: StreamingContext = StreamingContext.getActiveOrCreate("D:\\dev\\workspace\\my-bak\\spark-bak\\cp", () => getStreamingContext) ssc.start() ssc.awaitTermination() } def getStreamingContext: StreamingContext = { //1.创建SparkConf val sparkConf: SparkConf = new SparkConf().setAppName("DirectAPI_Auto01").setMaster("local[*]") //2.创建StreamingContext val ssc = new StreamingContext(sparkConf, Seconds(3)) ssc.checkpoint("D:\\dev\\workspace\\my-bak\\spark-bak\\cp") //3.准备Kafka参数 val kafkaParams: Map[String, String] = Map[String, String]( ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "hadoop202:9092,hadoop203:9092,hadoop204:9092", ConsumerConfig.GROUP_ID_CONFIG -> "bigdata" ) //4.使用DirectAPI自动维护offset的方式读取Kafka数据创建DStream val kafkaDStream: InputDStream[(String, String)] = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, Set("mybak")) //5.计算WordCount并打印 kafkaDStream.map(_._2) .flatMap(_.split(" ")) .map((_, 1)) .reduceByKey(_ + _) .print() //6.返回结果 ssc } }
编写代码(手动维护offset)
object Spark07_DirectAPI_Handler { def main(args: Array[String]): Unit = { //1.创建SparkConf val conf: SparkConf = new SparkConf().setAppName("DirectAPI_Handler").setMaster("local[*]") //2.创建StreamingContext val ssc = new StreamingContext(conf, Seconds(3)) //3.创建Kafka参数 val kafkaParams: Map[String, String] = Map[String, String]( ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "hadoop202:9092,hadoop203:9092,hadoop204:9092", ConsumerConfig.GROUP_ID_CONFIG -> "bigdata" ) //4.获取上一次消费的位置信息 val fromOffsets: Map[TopicAndPartition, Long] = Map[TopicAndPartition, Long]( TopicAndPartition("mybak", 0) -> 13L, TopicAndPartition("mybak", 1) -> 10L ) //5.使用DirectAPI手动维护offset的方式消费数据 val kafakDStream: InputDStream[String] = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, String]( ssc, kafkaParams, fromOffsets, (m: MessageAndMetadata[String, String]) => m.message()) //6.定义空集合用于存放数据的offset var offsetRanges = Array.empty[OffsetRange] //7.将当前消费到的offset进行保存 kafakDStream.transform { rdd => offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges rdd }.foreachRDD { rdd => for (o <- offsetRanges) { println(s"${o.fromOffset}-${o.untilOffset}") } } //8.开启任务 ssc.start() ssc.awaitTermination() } }
需求:通过SparkStreaming从Kafka读取数据,并将读取过来的数据做简单计算,最终打印到控制台。
导入依赖,为了避免和0-8冲突,我们新建一个module演示
<dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-core_2.11</artifactId> <version>2.1.1</version> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-streaming_2.11</artifactId> <version>2.1.1</version> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-streaming-kafka-0-10_2.11</artifactId> <version>2.1.1</version> </dependency>
3)编写代码
object Spark01_DirectAPI010 { def main(args: Array[String]): Unit = { //1.创建SparkConf val conf: SparkConf = new SparkConf().setAppName("DirectAPI010").setMaster("local[*]") //2.创建StreamingContext val ssc = new StreamingContext(conf, Seconds(3)) //3.构建Kafka参数 val kafkaParmas: Map[String, Object] = Map[String, Object]( ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "hadoop102:9092,hadoop103:9092,hadoop104:9092", ConsumerConfig.GROUP_ID_CONFIG -> "bigdata191122", ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.StringDeserializer", ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer] ) //4.消费Kafka数据创建流 val kafkaDStream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream[String, String](ssc, LocationStrategies.PreferConsistent, ConsumerStrategies.Subscribe[String, String](Set("test"), kafkaParmas)) //5.计算WordCount并打印 kafkaDStream.map(_.value()) .flatMap(_.split(" ")) .map((_, 1)) .reduceByKey(_ + _) .print() //6.启动任务 ssc.start() ssc.awaitTermination() } }
0-8 ReceiverAPI:
1)专门的Executor读取数据,速度不统一
2)跨机器传输数据,WAL
3)Executor读取数据通过多个线程的方式,想要增加并行度,则需要多个流union
4)offset存储在Zookeeper中
0-8 DirectAPI:
1)Executor读取数据并计算
2)增加Executor个数来增加消费的并行度
3)offset存储
a)CheckPoint(getActiveOrCreate方式创建StreamingContext)
b)手动维护(有事务的存储系统)
c)获取offset必须在第一个调用的算子中:offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
0-10 DirectAPI:
1)Executor读取数据并计算
2)增加Executor个数来增加消费的并行度
3)offset存储
i.a.__consumer_offsets系统主题中
ii.b.手动维护(有事务的存储系统)
到此,关于“Dstream的创建方法”的学习就结束了,希望能够解决大家的疑惑。理论与实践的搭配能更好的帮助大家学习,快去试试吧!若想继续学习更多相关知识,请继续关注亿速云网站,小编会继续努力为大家带来更多实用的文章!
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。