zookeeper(13)源码分析-请求处理链(3)

发布时间:2020-07-18 10:39:23 作者:shayang88
来源:网络 阅读:245

FinalRequestProcessor是请求处理链中最后的一个处理器。

public class FinalRequestProcessor implements RequestProcessor {
    ZooKeeperServer zks;
}

FinalRequestProcessor只实现了RequestProcessor接口,需要实现process Request方法和shutdown方法。

核心属性为zks,表示Zookeeper服务器,可以通过zks访问到Zookeeper内存数据库。

我们看一下核心方法process Request代码:

同步代码块

synchronized (zks.outstandingChanges) {
            // Need to process local session requests
            // 当前节点,处理请求,若为事务性请求,则提交到ZooKeeper内存数据库中。
            // 对于processTxn函数而言,其最终会调用DataTree的processTxn
            rc = zks.processTxn(request);

            // request.hdr is set for write requests, which are the only ones
            // that add to outstandingChanges.
            //只有写请求才会有消息头
            if (request.getHdr() != null) {
                TxnHeader hdr = request.getHdr();
                Record txn = request.getTxn();
                long zxid = hdr.getZxid();
                //当outstandingChanges不为空且其首元素的zxid小于等于请求的zxid时,
                // 就会一直从outstandingChanges中取出首元素,并且对outstandingChangesForPath做相应的操作
                while (!zks.outstandingChanges.isEmpty()
                       && zks.outstandingChanges.peek().zxid <= zxid) {
                    ChangeRecord cr = zks.outstandingChanges.remove();
                    if (cr.zxid < zxid) {
                        LOG.warn("Zxid outstanding " + cr.zxid
                                 + " is less than current " + zxid);
                    }
                    if (zks.outstandingChangesForPath.get(cr.path) == cr) {
                        zks.outstandingChangesForPath.remove(cr.path);
                    }
                }
            }

            // do not add non quorum packets to the queue.
            //判断是否为事务性请求则是通过调用isQuorum函数
            //只将quorum包(事务性请求)添加进队列
            //addCommittedProposal函数将请求添加至ZKDatabase的committedLog结构中
            if (request.isQuorum()) {
                zks.getZKDatabase().addCommittedProposal(request);
            }
        }

如果请求是ping

根据请求的创建时间来更新Zookeeper服务器的延迟,updateLatency函数中会记录最大延迟、最小延迟、总的延迟和延迟次数。
然后更新响应中的状态,如请求创建到响应该请求总共花费的时间、最后的操作类型等。然后设置响应后返回

case OpCode.ping: {
                //更新延迟
                zks.serverStats().updateLatency(request.createTime);

                lastOp = "PING";
                // 更新响应的状态
                cnxn.updateStatsForResponse(request.cxid, request.zxid, lastOp,
                        request.createTime, Time.currentElapsedTime());
                // 设置响应
                cnxn.sendResponse(new ReplyHeader(-2,
                        zks.getZKDatabase().getDataTreeLastProcessedZxid(), 0), null, "response");
                return;
            }

其他请求与此类似,
最后会根据其他请求再次更新服务器的延迟,设置响应的状态等

// 获取最后处理的zxid
long lastZxid = zks.getZKDatabase().getDataTreeLastProcessedZxid();
// 响应头
        ReplyHeader hdr =
            new ReplyHeader(request.cxid, lastZxid, err.intValue());
// 更新服务器延迟
        zks.serverStats().updateLatency(request.createTime);
                // 更新状态
        cnxn.updateStatsForResponse(request.cxid, lastZxid, lastOp,
                    request.createTime, Time.currentElapsedTime());

最后使用sendResponse函数将响应发送给请求方。

try {
            //返回相应
            cnxn.sendResponse(hdr, rsp, "response");
            if (request.type == OpCode.closeSession) {
                //关闭会话
                cnxn.sendCloseSession();
            }
        } catch (IOException e) {
            LOG.error("FIXMSG",e);
        }
推荐阅读:
  1. Giraph源码分析(一)— 启动ZooKeeper服务
  2. zookeeper(15)源码分析-服务器(2)

免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。

finalrequestprocess 13 zookeepe

上一篇:ES6中Generator基本使用方法的实例解析

下一篇:golang mail 实现ssl发送邮件

相关阅读

您好,登录后才能下订单哦!

密码登录
登录注册
其他方式登录
点击 登录注册 即表示同意《亿速云用户服务条款》