您好,登录后才能下订单哦!
本篇文章给大家分享的是有关Spark广播变量分析以及如何动态更新广播变量,小编觉得挺实用的,因此分享给大家学习,希望大家阅读完这篇文章后可以有所收获,话不多说,跟着小编一起来看看吧。
广播变量存储目前基于Spark实现的BlockManager分布式存储系统,Spark中的shuffle数据、加载HDFS数据时切分过来的block块都存储在BlockManager中,不是今天的讨论点,这里先不做详述了。
广播变量的创建方式和获取
//创建广播变量
val broadcastVar = sparkSession.sparkContext.broadcast(Array(1, 2, 3))
//获取广播变量
broadcastVar.value1.首先调用val broadcastVar = sparkSession.sparkContext.broadcast(Array(1, 2, 3))
val bc = env.broadcastManager.newBroadcast[T](value, isLocal)
3.通过广播工厂的newBroadcast方法进行创建
broadcastFactory.newBroadcast[T](value_, isLocal, nextBroadcastId.getAndIncrement())
在调用BroadcastManager的newBroadcast方法时已完成对广播工厂的初始化(initialize方法),我们只需看BroadcastFactory的实现TorrentBroadcastFactory中对TorrentBroadcast的实例化过程:
new TorrentBroadcast[T](value_, id)
conf.getSizeAsKb("spark.broadcast.blockSize", "4m").toInt * 1024广播变量初始化过程
从driver端或者其他的executor中读取,将读取的对象存储到本地,并存于缓存中
new ReferenceMap(AbstractReferenceMap.HARD, AbstractReferenceMap.WEAK)
Spark两种广播变量对比
TorrentBroadcast在executor端存储一个对象的同时会将获取的block存储于BlockManager,并向driver端的BlockManager汇报block的存储信息。
总之就是HttpBroadcast导致获取广播变量的请求集中于driver端,容易引起driver端单点故障,网络IO过高影响性能等问题,而TorrentBroadcast获取广播变量的请求服务即可以请求到driver端也可以在executor,避免了上述问题,当然这只是主要的优化点。
既然无法更新,那么只能动态生成,应用场景有实时风控中根据业务情况调整规则库、实时日志ETL服务中获取最新的日志格式以及字段变更等。
@volatile private var instance: Broadcast[Array[Int]] = null
//获取广播变量单例对象
def getInstance(sc: SparkContext, ctime: Long): Broadcast[Array[Int]] = {
  if (instance == null) {
    synchronized {
      if (instance == null) {
        instance = sc.broadcast(fetchLastestData())
      }
    }
  }
  instance
}
//加载要广播的数据,并更新广播变量
def updateBroadCastVar(sc: SparkContext, blocking: Boolean = false): Unit = {
  if (instance != null) {
    //删除缓存在executors上的广播副本,并可选择是否在删除完成后进行block等待
    //底层可选择是否将driver端的广播副本也删除
    instance.unpersist(blocking)
    
    instance = sc.broadcast(fetchLastestData())
  }
}
def fetchLastestData() = {
  //动态获取需要更新的数据
  //这里是伪代码
  Array(1, 2, 3)
}val dataFormat = FastDateFormat.getInstance("yyyy-MM-dd HH:mm:ss")
...
...
stream.foreachRDD { rdd =>
  val current_time = dataFormat.format(new Date())
  val new_time = current_time.substring(14, 16).toLong
  //每10分钟更新一次
  if (new_time % 10 == 0) {
    updateBroadCastVar(rdd.sparkContext, true)
  }
  rdd.foreachPartition { records =>
    instance.value
    ...
  }
}Spark流式程序中为何使用单例模式
1.广播变量是只读的,使用单例模式可以减少Spark流式程序中每次job生成执行,频繁创建广播变量带来的开销
2.广播变量单例模式也需要做同步处理。在FIFO调度模式下,基本不会发生并发问题。但是如果你改变了调度模式,如采用公平调度模式,同时设置Spark流式程序并行执行的job数大于1,如设置参数spark.streaming.concurrentJobs=4,则必须加上同步代码
3.在多个输出流共享广播变量的情况下,同时配置了公平调度模式,也会产生并发问题。建议在foreachRDD或者transform中使用局部变量进行广播,避免在公平调度模式下不同job之间产生影响。
除了广播变量,累加器也是一样。在Spark流式组件如Spark Streaming底层,每个输出流都会产生一个job,形成一个job集合提交到线程池里并发执行。
以上就是Spark广播变量分析以及如何动态更新广播变量,小编相信有部分知识点可能是我们日常工作会见到或用到的。希望你能通过这篇文章学到更多知识。更多详情敬请关注亿速云行业资讯频道。
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。