您好,登录后才能下订单哦!
本篇文章给大家分享的是有关结构化Kafka sql的代码框架是怎样的,小编觉得挺实用的,因此分享给大家学习,希望大家阅读完这篇文章后可以有所收获,话不多说,跟着小编一起来看看吧。
结构化流的典型应用是持续的读取kafka流。实现机制从SparkSession的readStream开始,readStream就是DataStreamReader:
def readStream: DataStreamReader = new DataStreamReader(self)
下面从DataStreamReader开始。可以想象得到,最终肯定是生成一个RDD来持续读取kafka流数据。
例子:
// Create DataFrame representing the stream of input lines from connection to localhost:9999 val lines = spark.readStream .format("socket") .option("host", "localhost") .option("port", 9999) .load()
分两步:找到TableProvider;找到SupportRead然后生成StreamingRelationV2。
最后用StreamingRelationV2来调用Dataset.ofRows返回DataFrame,DataFrame就是Dataset[Row]。
下面首先要看看TableProvider接口和SupportRead接口是啥东东。
TableProvider接口未找到在哪里定义。
先看看kafkaSourceRDD这个类,这是基础类,最基础的来读取kafka数据的RDD,入参包含一个offsetRange,表示读取kafka数据的区间范围。如果是Kafka.lastest则可以表示永久读取kafka。
既然是RDD,那么最重要的方法就是compute方法了,代码不解析了很简单,就是用Kafka的API来读取kafka分区的数据,形成RDD。
KafkaSource顾名思义就是Kafka的读取者。
KafkaSource的父类是Source,最重要的方法是:getOffset和getBatch。
getBatch返回DataFrame,那么getBatch又是怎么返回DataFrame的呢?看代码就知道原来是通过创建KafkaSourceRDD来达到生成DataFrame的目的的。所以可以认为KafkaSource是KafkaSourceRDD的一种封装形式罢了。
The provider class for all Kafka readers and writers。这个类是用来生成各种各样的Kafka的读取者和写入者的,比较重要,先看看这个类的定义:
private[kafka010] class KafkaSourceProvider extends DataSourceRegister
with StreamSourceProvider
with StreamSinkProvider
with RelationProvider
with CreatableRelationProvider
with TableProvider
with Logging
继承了很多的特性或接口。比如:StreamSourceProvider、TableProvider、RelationProvider等等。我们这里就看看和读相关的特性吧,和写相关的不看了(道理差不多)。
(1)createSource
createSource方法返回Source,看代码其实返回的是KafkaSource,KafkaSource前面已经说过了,这里就不涉及了。
(2)createRelation
createRelation返回BaseRelation,实际返回的是KafkaRelation。
KafkaRelation继承BaseRelation,重写父 类的buildScan方法,buildScan方法返回KafkaSourceRDD作为RDD[Row]。
(3)KafkaTable
KafkaTable继承Table并且继承SupportsRead特性,其定义:
class KafkaTable(includeHeaders: Boolean) extends Table with SupportsRead with SupportsWrite
里面辗转反侧看看如何生成ContinuousStream,主要是方法toContinuousStream,返回的ContinuousStream就是KafkaContinuousStream。
(4)KafkaContinuousStream
KafkaContinuousStream继承自ContinuousStream,具体的看代码,最后反正都是调用了Kafka的API来读取数据,所不同的只是外部表现形式的不同罢了。
以上就是结构化Kafka sql的代码框架是怎样的,小编相信有部分知识点可能是我们日常工作会见到或用到的。希望你能通过这篇文章学到更多知识。更多详情敬请关注亿速云行业资讯频道。
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。