如何进行Spark底层通信RPC源码分析

发布时间:2021-12-16 21:49:53 作者:柒染
来源:亿速云 阅读:159

本篇文章给大家分享的是有关如何进行Spark底层通信RPC源码分析,小编觉得挺实用的,因此分享给大家学习,希望大家阅读完这篇文章后可以有所收获,话不多说,跟着小编一起来看看吧。

RPC通信:无论是hadoop2.x的Rpc通信方式还是Spark2.x的Rpc通信方式,简单通俗的来说就是两个进程之间的远程通信,比如java 一个A项目里面有一个class A,里面有一个washA方法一个B项目里面有一个Class B类,里面有一个方法是washB,B项目通过代理模式以及java的反射机制调用到A项目里面的washA,这种情况下就可以理解为是一个简单的Rpc通信方式。

Spark2.x

Spark2.x使用基于RPC的通信方式,去除了1.x的Akka的实现方式,只保留了netty的实现方式,Spark2.x Rpc提供了上层抽象(RpcEndpoint、RpcEnv、RpcEndPointRef),具体的实现方式只要实现了定义的抽象就可以完成Rpc通信,Spark2.x之后目前版本只保留了Netty(NettyRpcEnv、NettyRpcEndpointRef)的实现,定义抽象最大的好处相信开发的朋友都很清楚,以后不管提供了什么方式的实现只要实现了RPCEndpoint,RpcEnv,RpcEndpointRef就可以完成的通信功能。比如自己写一个自己版本的Rpc通信实现。

Spark2.x的Rpc通信方式主要包括一下几个重要方面

RpcEndpoint:消息通信体,主要是用来接收消息、处理消息,实现了RpcEndPoint接口就是一个消息通信体(Master、Work),RpcEndpoint 需要向RpcEnv注册

RpcEnv:Rpc通信的上下文环境,消息发送过来首先经过RpcEnv然后路由给对应的RpcEndPoint,得到RpcEndPoint

RpcEndPointRef:RpcEndPoint的引用如果要想某个RpcEndPoint发送消息,首先要通过RpcEnv得到RpcEndPoint的引用

RpcEndPoint 接口 里面的定义如下

val rpcEnv : RpcEnv //得到RpcEnv对象

final def self: RpcEndpointRef = {//返回一个RpcEnpointRef这个方法通常用来自己给自己发送消息

    rpcEnv.endpointRef(this)

  }

def receive: PartialFunction[Any, Unit]//处理RpcEndPointRef.send 或者RpcEndPointRef.reply方法,该方法不需要进行响应信息

def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit]//处理RpcEndPointref.ask发送的消息,处理完之后需要给调用ask的通信端响应消息(reply)

def onError(cause: Throwable)//处理消息失败的时候会调用此方法

def onConnected(remoteAddress: RpcAddress)//远程连接的当前节点的时候触发

def onDisconnected(remoteAddress: RpcAddress)//远程连接断开时候触发

def onNetworkError(cause: Throwable, remoteAddress: RpcAddress)//远程连接发生网络异常时触发

def onStop()//停止RpcEndPoint

def onStart()//启动RpcEndPoint,这里不仅仅是网络上说的启动RpcEndPoint处理任何消息,onStart方法里面很多情况下可以写自己的RpcEndPoint的一些实现比如启动端口,或者创建目录

但是RpcEndPoint只有在onStart方法做一些处理之后 才可以接受RpcEndPointRef发送的消息

private[spark] trait ThreadSafeRpcEndpoint extends RpcEndpoint//因为receive是并发操作如果要现成安全就是用threadSafeRpcEndPoint

RpcEndPoint的生命周期 构造--> onStart--> receive -->onStop,注意onStart的方法是在调用setRpcEndPoint注册之后就会执行任何RpcEndPoint的onStart方法都是在注册之后执行的

原因后面的源码的提到

RpcEndpointRef:抽象类

  def address: RpcAddress //根据主机名端口返回一个RppAddress

def name: String//name 一个字符串 暂时不知道干嘛的

def send(message: Any): Unit//向RpcEndPoint发送一个消息 不需要返回结果

 def ask[T: ClassTag](message: Any, timeout: RpcTimeout): Future[T]

  def ask[T: ClassTag](message: Any): Future[T] = ask(message, defaultAskTimeout) //向RpcEndPoint发送消息并得到返回结果

def askWithRetry[T: ClassTag](message: Any): T = askWithRetry(message, defaultAskTimeout)// 想RpcEndPoint发送消息并在一定时间内返回结果 失败的时候并且进行一定次数的重试

RpcEnv

  private[rpc] def endpointRef(endpoint: RpcEndpoint): RpcEndpointRef//传入RpcEndPoint得到RpcEndPointref对象

  def address: RpcAddress//根据主机名端口返回一个RppAddress

def setupEndpoint(name: String, endpoint: RpcEndpoint): RpcEndpointRef//注册RpcEndPoint返回对应的RpcEndPointRef

def asyncSetupEndpointRefByURI(uri: String): Future[RpcEndpointRef]//通过uri一步获取 RpcEndPointRef

 def stop(endpoint: RpcEndpointRef): Unit//停止RpcEndPoint根据RpcEndPointRef

  def shutdown(): Unit//关闭RpcEndPoint

 def awaitTermination(): Unit//等待RpcEndPoint退出

object RpcEnv

 def create(

      name: String,

      host: String,

      port: Int,

      conf: SparkConf,

      securityManager: SecurityManager,

      clientMode: Boolean = false): RpcEnv = {

    val config = RpcEnvConfig(conf, name, host, port, securityManager, clientMode)

    new NettyRpcEnvFactory().create(config)

  }

//通过RpcEnvFactory.create创建RpcEnv环境

RpcEnvConfig

private[spark] case class RpcEnvConfig(

    conf: SparkConf,

    name: String,

    host: String,

    port: Int,

    securityManager: SecurityManager,

    clientMode: Boolean)

case类 里面包括SparkConf,name,host,port等

NettyRpcEnv NettyRpcEnv通过NettyRpcEnvFactory的create方法创建

 val nettyEnv =

      new NettyRpcEnv(sparkConf, javaSerializerInstance, config.host, config.securityManager)// 创建nettyEnv

 private val dispatcher: Dispatcher = new Dispatcher(this)

Dispatcher负责RPC消息的路由,它能够将消息路由到对应的RpcEndpoint进行处理,同时存放RpcEndPoint与RpcEndPointRef的映射

NettyStreamManager 负责提供文件服务(文件、JAR文件、目录)

TransportContext负责管理网路传输上下文信息:创建MessageEncoder、MessageDecoder、TransportClientFactory、TransportServer

NettyRpcHandler负责处理网络IO事件,接收RPC调用请求,并通过Dispatcher派发消息

这里说一下Dispatcher 该类主要负责Rpc消息路由 里面有一个内部累EndPointData 但是有一个现成安全的Inbox这里面存放的时候收到的消息,非常重要后面会做具体分析

private class EndpointData(

      val name: String,

      val endpoint: RpcEndpoint,

      val ref: NettyRpcEndpointRef) {

    val inbox = new Inbox(ref, endpoint)

  }

  private val endpoints = new ConcurrentHashMap[String, EndpointData]//存放name->对应的EndPoint的信息

  private val endpointRefs = new ConcurrentHashMap[RpcEndpoint, RpcEndpointRef]//存放RpcEndpoint, RpcEndpointRef的映射关系

  private val receivers = new LinkedBlockingQueue[EndpointData]//队列下面会有一个现成不断的从里面取出来处理

 def registerRpcEndpoint(name: String, endpoint: RpcEndpoint): NettyRpcEndpointRef = {

    val addr = RpcEndpointAddress(nettyEnv.address, name)

    val endpointRef = new NettyRpcEndpointRef(nettyEnv.conf, addr, nettyEnv)

    synchronized {

      if (stopped) {

        throw new IllegalStateException("RpcEnv has been stopped")

      }

      if (endpoints.putIfAbsent(name, new EndpointData(name, endpoint, endpointRef)) != null) {

        throw new IllegalArgumentException(s"There is already an RpcEndpoint called $name")

      }

      val data = endpoints.get(name)

      endpointRefs.put(data.endpoint, data.ref)

      receivers.offer(data)  // for the OnStart message

    }

    endpointRef

  }

//注册RpcEndPoint在这里面发生 同时将data put到receivers 

在NettyRpcEndPoint里面有一个threadpool

private val threadpool: ThreadPoolExecutor = {

    val numThreads = nettyEnv.conf.getInt("spark.rpc.netty.dispatcher.numThreads",

      math.max(2, Runtime.getRuntime.availableProcessors()))

    val pool = ThreadUtils.newDaemonFixedThreadPool(numThreads, "dispatcher-event-loop")

    for (i <- 0 until numThreads) {

      pool.execute(new MessageLoop)

    }

    pool

  }

MessageLoop 是一个实现了Runnable的类,里面的run方法里面不断从receivers取出来进行处理

重要代码  data.inbox.process(Dispatcher.this)

这个里面有一个非常重要的点就是 什么时候调用onStart的方法因为receivers里面存放的是EndPoint的信息 同时创建EndPointData对象

进入Inbox里面看一下

  inbox =>  // Give this an alias so we can use it more clearly in closures.

  @GuardedBy("this")

  protected val messages = new java.util.LinkedList[InboxMessage]()

 inbox.synchronized {

    messages.add(OnStart)

  }

创建这个类的时候会有一个messagelinkedList的list集合 在创建这个结合之后 就会将onStart方法添加到里面,并且是现成安全的

然后process 方法里面会不断的拿到集合的数据来进行对应的操作

 case OnStart =>

            endpoint.onStart()

            if (!endpoint.isInstanceOf[ThreadSafeRpcEndpoint]) {

              inbox.synchronized {

                if (!stopped) {

                  enableConcurrent = true

                }

              }

            }

这个时候就会调用onStart方法

这个时候相当于RpcEndPoint可以接受消息并且处理了

Spark Rpc通信方式 分为本地消息和远程消息,本地消息相当于调用的方法直接存放到Index(中文收件箱),远程消息需要走NettyRpcHandler

以上就是如何进行Spark底层通信RPC源码分析,小编相信有部分知识点可能是我们日常工作会见到或用到的。希望你能通过这篇文章学到更多知识。更多详情敬请关注亿速云行业资讯频道。

推荐阅读:
  1. Giraph源码分析(三)—— 消息通信
  2. 怎么进行ActionInvoker源码分析

免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。

spark rpc

上一篇:怎么分析spark中的shuffle模块

下一篇:python匿名函数怎么创建

相关阅读

您好,登录后才能下订单哦!

密码登录
登录注册
其他方式登录
点击 登录注册 即表示同意《亿速云用户服务条款》