您好,登录后才能下订单哦!
小编给大家分享一下Spark sql流式处理的示例分析,相信大部分人都还不怎么了解,因此分享这篇文章给大家参考一下,希望大家阅读完这篇文章后大有收获,下面让我们一起去了解一下吧!
Spark sql支持流式处理,流式处理有Source,Sink。Source定义了流的源头,Sink定义了流的目的地,流的执行是从Sink开始触发的。
Dataset的writeStream定义了流的目的地并触发流的真正执行,所以分析就从writeStream开始。
writeStream = new DataStreamWriter[T](this)
DataStreamWriter
DataStreamWriter的作用是将入参的dataset写入到外部存储,比如kafka,database,txt等。
主要触发方法是start方法,返回一个StreamingQuery对象,代码:
def start(): StreamingQuery = { if (source == "memory") { assertNotPartitioned("memory") val (sink, resultDf) = trigger match { case _: ContinuousTrigger => val s = new MemorySinkV2() val r = Dataset.ofRows(df.sparkSession, new MemoryPlanV2(s, df.schema.toAttributes)) (s, r) case _ => val s = new MemorySink(df.schema, outputMode) val r = Dataset.ofRows(df.sparkSession, new MemoryPlan(s)) (s, r) } val chkpointLoc = extraOptions.get("checkpointLocation") val recoverFromChkpoint = outputMode == OutputMode.Complete() val query = df.sparkSession.sessionState.streamingQueryManager.startQuery( extraOptions.get("queryName"), chkpointLoc, df, extraOptions.toMap, sink, outputMode, useTempCheckpointLocation = true, recoverFromCheckpointLocation = recoverFromChkpoint, trigger = trigger) resultDf.createOrReplaceTempView(query.name) query } else if (source == "foreach") { assertNotPartitioned("foreach") val sink = new ForeachSink[T](foreachWriter)(ds.exprEnc) df.sparkSession.sessionState.streamingQueryManager.startQuery( extraOptions.get("queryName"), extraOptions.get("checkpointLocation"), df, extraOptions.toMap, sink, outputMode, useTempCheckpointLocation = true, trigger = trigger) } else { val ds = DataSource.lookupDataSource(source, df.sparkSession.sessionState.conf) val disabledSources = df.sparkSession.sqlContext.conf.disabledV2StreamingWriters.split(",") val sink = ds.newInstance() match { case w: StreamWriteSupport if !disabledSources.contains(w.getClass.getCanonicalName) => w case _ => val ds = DataSource( df.sparkSession, className = source, options = extraOptions.toMap, partitionColumns = normalizedParCols.getOrElse(Nil)) ds.createSink(outputMode) } df.sparkSession.sessionState.streamingQueryManager.startQuery( extraOptions.get("queryName"), extraOptions.get("checkpointLocation"), df, extraOptions.toMap, sink, outputMode, useTempCheckpointLocation = source == "console", recoverFromCheckpointLocation = true, trigger = trigger) } }
我们这里看最后一个条件分支的代码,ds是对应的DataSource,sink有时候就是ds。最后通过streamingQueryManager的startQuery启动流的计算,返回计算中的StreamingQuery对象。
streamingQueryManager的startQuery方法里主要调用createQuery方法创建StreamingQueryWrapper对象,这是个私有方法:
private def createQuery( userSpecifiedName: Option[String], userSpecifiedCheckpointLocation: Option[String], df: DataFrame, extraOptions: Map[String, String], sink: BaseStreamingSink, outputMode: OutputMode, useTempCheckpointLocation: Boolean, recoverFromCheckpointLocation: Boolean, trigger: Trigger, triggerClock: Clock): StreamingQueryWrapper = { var deleteCheckpointOnStop = false val checkpointLocation = userSpecifiedCheckpointLocation.map { userSpecified => new Path(userSpecified).toUri.toString }.orElse { df.sparkSession.sessionState.conf.checkpointLocation.map { location => new Path(location, userSpecifiedName.getOrElse(UUID.randomUUID().toString)).toUri.toString } }.getOrElse { if (useTempCheckpointLocation) { // Delete the temp checkpoint when a query is being stopped without errors. deleteCheckpointOnStop = true Utils.createTempDir(namePrefix = s"temporary").getCanonicalPath } else { throw new AnalysisException( "checkpointLocation must be specified either " + """through option("checkpointLocation", ...) or """ + s"""SparkSession.conf.set("${SQLConf.CHECKPOINT_LOCATION.key}", ...)""") } } // If offsets have already been created, we trying to resume a query. if (!recoverFromCheckpointLocation) { val checkpointPath = new Path(checkpointLocation, "offsets") val fs = checkpointPath.getFileSystem(df.sparkSession.sessionState.newHadoopConf()) if (fs.exists(checkpointPath)) { throw new AnalysisException( s"This query does not support recovering from checkpoint location. " + s"Delete $checkpointPath to start over.") } } val analyzedPlan = df.queryExecution.analyzed df.queryExecution.assertAnalyzed() if (sparkSession.sessionState.conf.isUnsupportedOperationCheckEnabled) { UnsupportedOperationChecker.checkForStreaming(analyzedPlan, outputMode) } if (sparkSession.sessionState.conf.adaptiveExecutionEnabled) { logWarning(s"${SQLConf.ADAPTIVE_EXECUTION_ENABLED.key} " + "is not supported in streaming DataFrames/Datasets and will be disabled.") } (sink, trigger) match { case (v2Sink: StreamWriteSupport, trigger: ContinuousTrigger) => UnsupportedOperationChecker.checkForContinuous(analyzedPlan, outputMode) new StreamingQueryWrapper(new ContinuousExecution( sparkSession, userSpecifiedName.orNull, checkpointLocation, analyzedPlan, v2Sink, trigger, triggerClock, outputMode, extraOptions, deleteCheckpointOnStop)) case _ => new StreamingQueryWrapper(new MicroBatchExecution( sparkSession, userSpecifiedName.orNull, checkpointLocation, analyzedPlan, sink, trigger, triggerClock, outputMode, extraOptions, deleteCheckpointOnStop)) } }
它根据是否连续流操作还是微批处理操作分成ContinuousExecution和MicroBatchExecution,他们都是StreamExecution的子类,StreamExecution是流处理的抽象类。稍后会分析StreamExecution的类结构。
ContinuousExecution和MicroBatchExecution两者的代码结构和功能其实是很类似的,我们先拿ContinuousExecution举例吧。
ContinuousExecution
首先ContinuousExecution是没有结束的,是没有结束的流,当暂时流没有数据时,ContinuousExecution会阻塞线程等待新数据的到来,这是通过awaitEpoch方法来控制的。
其实,commit方法在每条数据处理完后被触发,commit方法将当前处理完成的偏移量(offset)写到commitLog中。
再看logicalPlan,在ContinuousExecution中入参的逻辑计划是StreamingRelationV2类型,会被转换成ContinuousExecutionRelation类型的LogicalPlan:
analyzedPlan.transform {
case r @ StreamingRelationV2(
source: ContinuousReadSupport, _, extraReaderOptions, output, _) =>
toExecutionRelationMap.getOrElseUpdate(r, {
ContinuousExecutionRelation(source, extraReaderOptions, output)(sparkSession)
})
}
还有addOffset方法,在每次读取完offset之后会将当前的读取offset写入到offsetLog中,以便下次恢复时知道从哪里开始。addOffset和commit两个方法一起保证了Exactly-once语义的执行。
以上是“Spark sql流式处理的示例分析”这篇文章的所有内容,感谢各位的阅读!相信大家都有了一定的了解,希望分享的内容对大家有所帮助,如果还想学习更多知识,欢迎关注亿速云行业资讯频道!
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。