Hadoop中RPC机制分析Server端

发布时间:2021-12-09 16:05:19 作者:iii
来源:亿速云 阅读:149

这篇文章主要介绍“Hadoop中RPC机制分析Server端”,在日常操作中,相信很多人在Hadoop中RPC机制分析Server端问题上存在疑惑,小编查阅了各式资料,整理出简单好用的操作方法,希望对大家解答”Hadoop中RPC机制分析Server端”的疑惑有所帮助!接下来,请跟着小编一起来学习吧!

1. Server.Listener

RPC Client 端的 RPC 请求发送到 Server 端后, 首先由 Server.Listener 接收

Server.Listener 类继承自 Thread 类, 监听了 OP_READ 和 OP_ACCEPT 事件

Server.Listener 接收 RPC 请求, 在 Server.Listener.doRead() 方法中读取数据, 在 doRead() 方法中又调用了Server.Connection.readAndProcess() 方法, 

最后会调用 Server.Connection.processRpcRequest() 方法, 源码如下:

private void processRpcRequest(RpcRequestHeaderProto header,
        DataInputStream dis) throws WrappedRpcServerException,
        InterruptedException {
      ...
      Writable rpcRequest;
      // 从成员变量dis中反序列化出Client端发送来的RPC请求( WritableRpcEngine.Invocation对象 )
      try { //Read the rpc request
        rpcRequest = ReflectionUtils.newInstance(rpcRequestClass, conf);
        rpcRequest.readFields(dis);
      } catch (Throwable t) { // includes runtime exception from newInstance
        ...
      }
      // 构造Server端Server.Call实例对象
      Call call = new Call(header.getCallId(), header.getRetryCount(),
          rpcRequest, this, ProtoUtil.convert(header.getRpcKind()), header
              .getClientId().toByteArray());
      // 将Server.Call实例对象放入调用队列中
      callQueue.put(call);              // queue the call; maybe blocked here
      incRpcCount();  // Increment the rpc count
    }

调用队列 callQueue 是 Server 的成员变量, Server.Listener 和 Server.Handler 是典型的生产者, 消费者模型, 

Server.Listener( 生产者 )的doRead()方法最终调用Server.Connection.processRpcRequest() 方法, 

而Server.Handler( 消费者 )处理RPC请求

2. Server.Handler 继承 Thread 类, 其主要工作是处理 callQueue 中的调用, 都在 run() 方法中完成. 在 run() 的主循环中, 每次处理一个从 callQueue 中出队的请求, Server.call() 是一个抽象方法, 实际是调用了 RPC.Server.call()方法, 最后通过 WritableRPCEngine.call() 方法完成 Server 端方法调用

/** Handles queued calls . */
  private class Handler extends Thread {
    ...
    @Override
    public void run() {
      ...
      ByteArrayOutputStream buf = 
        new ByteArrayOutputStream(INITIAL_RESP_BUF_SIZE);
      while (running) {
          ...
          final Call call = callQueue.take();    // 获取一个RPC调用请求
          ...
          Writable value = null;

          value = call.connection.user.doAs(new PrivilegedExceptionAction<Writable>() {
                     @Override
                     public Writable run() throws Exception {
                       // 调用RPC.Server.call()方法
                       // call.rpcKind : RPC调用请求的类型, 一般为Writable
                       // call.connection.protocolName : RPC协议接口的类名
                       // call.rpcRequest : Invocation实例对象, 包括方法名, 参数列表, 参数列表的Class对象数组
                       // call.timestamp : 调用时间戳
                       return call(call.rpcKind, call.connection.protocolName, 
                                   call.rpcRequest, call.timestamp);
                     }
                   });
      }
      ...
    }
}

RPC.Server.call() 方法如下:

@Override
public Writable call(RPC.RpcKind rpcKind, String protocol,
	Writable rpcRequest, long receiveTime) throws Exception {
  return getRpcInvoker(rpcKind).call(this, protocol, rpcRequest,
	  receiveTime);
}

最后通过 WritableRPCEngine.call() 方法完成 Server 端方法调用, 代码如下:

@Override
public Writable call(org.apache.hadoop.ipc.RPC.Server server,
	  String protocolName, Writable rpcRequest, long receivedTime)
	  throws IOException, RPC.VersionMismatch {

	Invocation call = (Invocation)rpcRequest;	// 将RPC请求强制转成WritableRpcEngine.Invocation对象
	...

	long clientVersion = call.getProtocolVersion();
	final String protoName;
	ProtoClassProtoImpl protocolImpl;	// Server端RPC协议接口的实现类的实例对象
	...
	// Invoke the protocol method
    try {
	  ...
	  // 获取RPC请求中调用的方法对象Method
	  Method method = 
		  protocolImpl.protocolClass.getMethod(call.getMethodName(),
		  call.getParameterClasses());
	  method.setAccessible(true);
	  ...
	  // 在Server端RPC协议接口的实现类的实例对象 protocolImpl 上调用具体的方法
	  Object value = 
		  method.invoke(protocolImpl.protocolImpl, call.getParameters());
	  ...
	  // 调用正常结束, 返回调用结果
	  return new ObjectWritable(method.getReturnType(), value);

	} catch (InvocationTargetException e) {	// 调用出现异常, 用IOException包装异常, 最后抛出该异常
	  Throwable target = e.getTargetException();
	  if (target instanceof IOException) {
		throw (IOException)target;
	  } else {
		IOException ioe = new IOException(target.toString());
		ioe.setStackTrace(target.getStackTrace());
		throw ioe;
	  }
	} catch (Throwable e) {
	  ...
	}
  }
}

在 WritableRpcEngine.call() 方法中, 传入的 rpcRequest 会被强制转换成 WritableRpcEngine.Invocation 类型的对象 call , 并通过 call 这个对象包含的方法名(getMethodName()方法)和参数列表的 Class对象数组(getParameterClasses())获取 Method 对象, 最终通过 Method 对象的invoke() 方法, 调用实现类的实例对象 protocolImpl 上的方法, 完成 Hadoop 的远程过程调用

好了, 现在 Server 端的具体方法已经被调用了, 调用结果分两种情况:

    1) 调用正常结束, 则将方法的返回值和调用结果封装成一个 ObjectWritable 类型的对象, 并返回

    2) 调用出现异常, 抛出 IOException 类型的异常

3. Server.Responder

这个类的功能: 发送 Hadoop 远程过程调用的应答给 Client 端, Server.Responder 类继承自 Thread 类, 监听了 OP_WRITE 事件, 即通道可写.  具体细节写不下去了

到此,关于“Hadoop中RPC机制分析Server端”的学习就结束了,希望能够解决大家的疑惑。理论与实践的搭配能更好的帮助大家学习,快去试试吧!若想继续学习更多相关知识,请继续关注亿速云网站,小编会继续努力为大家带来更多实用的文章!

推荐阅读:
  1. HBase客户端Rpc的重试机制以及客户端参数优化。
  2. Hadoop的RPC

免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。

hadoop rpc server

上一篇:css3动画类型有几种

下一篇:css中如何隐藏dom元素

相关阅读

您好,登录后才能下订单哦!

密码登录
登录注册
其他方式登录
点击 登录注册 即表示同意《亿速云用户服务条款》