如何进行Spark Streaming计算模型及监控

发布时间:2021-12-17 11:08:35 作者:柒染
来源:亿速云 阅读:181

如何进行Spark Streaming计算模型及监控,很多新手对此不是很清楚,为了帮助大家解决这个难题,下面小编将为大家详细讲解,有这方面需求的人可以来学习下,希望你能有所收获。

摘要

Spark Streaming是一套优秀的实时计算框架。其良好的可扩展性、高吞吐量以及容错机制能够满足我们很多的场景应用。下面结合我们的应用场景,介结我们在使用Spark Streaming方面的技术架构,并着重讲解Spark  Streaming两种计算模型,无状态和状态计算模型以及该两种模型的注意事项;接着介绍了Spark  Streaming在监控方面所做的一些事情,最后总结了Spark Streaming的优缺点。

一、概述

数据是非常宝贵的资源,对各级企事业单均有非常高的价值。但是数据的爆炸,导致原先单机的数据处理已经无法满足业务的场景需求。因此在此基础上出现了一些优秀的分布式计算框架,诸如Hadoop、Spark等。离线分布式处理框架虽然能够处理非常大量的数据,但是其迟滞性很难满足一些特定的需求场景,比如push反馈、实时推荐、实时用户行为等。为了满足这些场景,使数据处理能够达到实时的响应和反馈,又随之出现了实时计算框架。目前的实时处理框架有Apache  Storm、Apache Flink以及Spark Streaming等。其中Spark  Streaming由于其本身的扩展性、高吞吐量以及容错能力等特性,并且能够和离线各种框架有效结合起来,因而是当下是比较受欢迎的一种流式处理框架。

根据其官方文档介绍,Spark Streaming 有高扩展性、高吞吐量和容错能力强的特点。Spark Streaming  支持的数据输入源很多,例如:Kafka、Flume、Twitter、ZeroMQ 和简单的 TCP 套接字等等。数据输入后可以用 Spark  的高度抽象原语如:map、reduce、join、window 等进行运算。而结果也能保存在很多地方,如 HDFS,数据库等。另外 Spark  Streaming 也能和 MLlib(机器学习)以及 Graphx ***融合。其架构见下图:

如何进行Spark Streaming计算模型及监控

Spark Streaming  其优秀的特点给我们带来很多的应用场景,如网站监控和网络监控、异常监测、网页点击、用户行为、用户迁移等。我们将为大家详细介绍,我们的应用场景中,Spark  Streaming的技术架构、两种状态模型以及Spark Streaming监控等。

二、应用场景

在 Spark Streaming 中,处理数据的单位是一批而不是单条,而数据采集却是逐条进行的,因此 Spark Streaming  系统需要设置间隔使得数据汇总到一定的量后再一并操作,这个间隔就是批处理间隔。批处理间隔是 Spark Streaming 的核心概念和关键参数,它决定了  Spark Streaming 提交作业的频率和数据处理的延迟,同时也影响着数据处理的吞吐量和性能。

如何进行Spark Streaming计算模型及监控

2.1 框架

目前我们Spark  Streaming的业务应用场景包括异常监测、网页点击、用户行为以及用户地图迁徙等场景。按计算模型来看大体可分为无状态的计算模型以及状态计算模型两种。在实际的应用场景中,我们采用Kafka作为实时输入源,Spark  Streaming作为计算引擎处理完数据之后,再持久化到存储中,包括MySQL、HDFS、ElasticSearch以及MongoDB等;同时Spark  Streaming 数据清洗后也会写入Kafka,然后经由Flume持久化到HDFS;接着基于持久化的内容做一些UI的展现。架构见下图:

如何进行Spark Streaming计算模型及监控

2.2 无状态模型

无状态模型只关注当前新生成的DStream数据,所以的计算逻辑均基于该批次的数据进行处理。无状态模型能够很好地适应一些应用场景,比如网站点击实时排行榜、指定batch时间段的用户访问以及点击情况等。该模型由于没有状态,并不需要考虑有状态的情况,只需要根据业务场景保证数据不丢就行。此种情况一般采用Direct方式读取Kafka数据,并采用监听器方式持久化Offsets即可。具体流程如下:

如何进行Spark Streaming计算模型及监控

其上模型框架包含以下几个处理步骤:

受网络、集群等一些因素的影响,实时程序出现长时失败,导致数据出现堆积。此种情况下是丢掉堆积的数据从Kafka largest处消费还是从之前的Kafka  offsets处消费,这个取决具体的业务场景。

2.3 状态模型

有状态模型是指DStreams在指定的时间范围内有依赖关系,具体的时间范围由业务场景来指定,可以是2个及以上的多个batch time  RDD组成。Spark  Streaming提供了updateStateByKey方法来满足此类的业务场景。因涉及状态的问题,所以在实际的计算过程中需要保存计算的状态,Spark  Streaming中通过checkpoint来保存计算的元数据以及计算的进度。该状态模型的应用场景有网站具体模块的累计访问统计、最近N batch time  的网站访问情况以及app新增累计统计等等。具体流程如下:

如何进行Spark Streaming计算模型及监控

上述流程中,每batch time计算时,需要依赖最近2个batch  time内的数据,经过转换及相关统计,最终持久化到MySQL中去。不过为了确保每个计算仅计算2个batch  time内的数据,需要维护数据的状态,清除过期的数据。我们先来看下updateStateByKey的实现,其代码如下:

def updateStateByKey[S: ClassTag](       updateFunc: (Iterator[(K, Seq[V], Option[S])]) => Iterator[(K, S)],       partitioner: Partitioner,       rememberPartitioner: Boolean     ): DStream[(K, S)] = ssc.withScope {      new StateDStream(self, ssc.sc.clean(updateFunc), partitioner, rememberPartitioner, None) }

隐藏了全局状态数据中的key类型,仅对Value提供自定义的方法。

def updateStateByKey[S: ClassTag](       updateFunc: (Seq[V], Option[S]) => Option[S],       partitioner: Partitioner,       initialRDD: RDD[(K, S)]     ): DStream[(K, S)] = ssc.withScope {     val cleanedUpdateF = sparkContext.clean(updateFunc)     val newUpdateFunc = (iterator: Iterator[(K, Seq[V], Option[S])]) => {       iterator.flatMap(t => cleanedUpdateF(t._2, t._3).map(s => (t._1, s)))     }     updateStateByKey(newUpdateFunc, partitioner, true, initialRDD) }

以上两种方法分别给我们提供清理过期数据的思路:

三、Spark Streaming监控

同Spark一样,Spark  Streaming也提供了Jobs、Stages、Storage、Enviorment、Executors以及Streaming的监控,其中Streaming监控页的内容如下图:

如何进行Spark Streaming计算模型及监控

上图是Spark UI中提供一些数据监控,包括实时输入数据、Scheduling  Delay、处理时间以及总延迟的相关监控数据的趋势展现。另外除了提供上述数据监控外,Spark UI还提供了Active Batches以及Completed  Batches相关信息。Active Batches包含当前正在处理的batch信息以及堆积的batch相关信息,而Completed  Batches刚提供每个batch处理的明细数据,具体包括batch time、input size、scheduling delay、processing  Time、Total Delay等,具体信息见下图:

如何进行Spark Streaming计算模型及监控

Spark Streaming能够提供如此优雅的数据监控,是因在对监听器设计模式的使用。如若Spark  UI无法满足你所需的监控需要,用户可以定制个性化监控信息。Spark  Streaming提供了StreamingListener特质,通过继承此方法,就可以定制所需的监控,其代码如下:

@DeveloperApi     trait StreamingListener {        /** Called when a receiver has been started */       def onReceiverStarted(receiverStarted: StreamingListenerReceiverStarted) { }        /** Called when a receiver has reported an error */       def onReceiverError(receiverError: StreamingListenerReceiverError) { }        /** Called when a receiver has been stopped */       def onReceiverStopped(receiverStopped: StreamingListenerReceiverStopped) { }        /** Called when a batch of jobs has been submitted for processing. */       def onBatchSubmitted(batchSubmitted: StreamingListenerBatchSubmitted) { }        /** Called when processing of a batch of jobs has started.  */       def onBatchStarted(batchStarted: StreamingListenerBatchStarted) { }        /** Called when processing of a batch of jobs has completed. */       def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted) { }        /** Called when processing of a job of a batch has started. */       def onOutputOperationStarted(           outputOperationStarted: StreamingListenerOutputOperationStarted) { }        /** Called when processing of a job of a batch has completed. */       def onOutputOperationCompleted(           outputOperationCompleted: StreamingListenerOutputOperationCompleted) { }     }

目前,我们保存Offsets时,采用继承StreamingListener方式,此是一种应用场景。当然也可以监控实时计算程序的堆积情况,并在达到一阈值后发送报警邮件。具体监听器的定制还得依据应用场景而定。

四、Spark Streaming优缺点

Spark Streaming并非是Storm那样,其并非是真正的流式处理框架,而是一次处理一批次数据。也正是这种方式,能够较好地集成Spark  其他计算模块,包括MLlib(机器学习)、Graphx以及Spark  SQL。这给实时计算带来很大的便利,与此带来便利的同时,也牺牲作为流式的实时性等性能。

4.1 优点

4.2 缺点

看完上述内容是否对您有帮助呢?如果还想对相关知识有进一步的了解或阅读更多相关文章,请关注亿速云行业资讯频道,感谢您对亿速云的支持。

推荐阅读:
  1. 从 Spark Streaming 到 Apache Flink : 实时数据流在爱奇艺的演进
  2. 1.spark简介

免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。

spark streaming

上一篇:Ceph基础数据结构有哪些

下一篇:python匿名函数怎么创建

相关阅读

您好,登录后才能下订单哦!

密码登录
登录注册
其他方式登录
点击 登录注册 即表示同意《亿速云用户服务条款》