zk中FinalRequestProcessor的作用是什么

发布时间:2021-06-21 14:56:04 作者:Leah
来源:亿速云 阅读:161

这篇文章给大家介绍zk中FinalRequestProcessor的作用是什么,内容非常详细,感兴趣的小伙伴们可以参考借鉴,希望对大家能有所帮助。

是处理器最后一个环节

FinalRequestProcessor implements RequestProcessor

处理器链最后一个环节处理事务和非事务请求最后一个环节

构造器

public FinalRequestProcessor(ZooKeeperServer zks) {
    this.zks = zks;
    this.requestPathMetricsCollector = zks.getRequestPathMetricsCollector();
}

处理命令信息

public void processRequest(Request request) {
    LOG.debug("Processing request:: {}", request);

    // request.addRQRec(">final");
    long traceMask = ZooTrace.CLIENT_REQUEST_TRACE_MASK;
    if (request.type == OpCode.ping) {
        traceMask = ZooTrace.SERVER_PING_TRACE_MASK;
    }
    if (LOG.isTraceEnabled()) {
        ZooTrace.logRequest(LOG, traceMask, 'E', request, "");
    }

    ProcessTxnResult rc = zks.processTxn(request);

    // ZOOKEEPER-558:
    // In some cases the server does not close the connection (e.g., closeconn buffer
    // was not being queued — ZOOKEEPER-558) properly. This happens, for example,
    // when the client closes the connection. The server should still close the session, though.
    // Calling closeSession() after losing the cnxn, results in the client close session response being dropped.
    if (request.type == OpCode.closeSession && connClosedByClient(request)) {
        // We need to check if we can close the session id.
        // Sometimes the corresponding ServerCnxnFactory could be null because
        // we are just playing diffs from the leader.
        if (closeSession(zks.serverCnxnFactory, request.sessionId)
            || closeSession(zks.secureServerCnxnFactory, request.sessionId)) {
            return;
        }
    }

    if (request.getHdr() != null) {
        /*
         * Request header is created only by the leader, so this must be
         * a quorum request. Since we're comparing timestamps across hosts,
         * this metric may be incorrect. However, it's still a very useful
         * metric to track in the happy case. If there is clock drift,
         * the latency can go negative. Note: headers use wall time, not
         * CLOCK_MONOTONIC.
         */
        long propagationLatency = Time.currentWallTime() - request.getHdr().getTime();
        if (propagationLatency >= 0) {
            ServerMetrics.getMetrics().PROPAGATION_LATENCY.add(propagationLatency);
        }
    }

    if (request.cnxn == null) {
        return;
    }
    ServerCnxn cnxn = request.cnxn;

    long lastZxid = zks.getZKDatabase().getDataTreeLastProcessedZxid();

    String lastOp = "NA";
    // Notify ZooKeeperServer that the request has finished so that it can
    // update any request accounting/throttling limits
    zks.decInProcess();
    zks.requestFinished(request);
    Code err = Code.OK;
    Record rsp = null;
    String path = null;
    try {
        if (request.getHdr() != null && request.getHdr().getType() == OpCode.error) {
            /*
             * When local session upgrading is disabled, leader will
             * reject the ephemeral node creation due to session expire.
             * However, if this is the follower that issue the request,
             * it will have the correct error code, so we should use that
             * and report to user
             */
            if (request.getException() != null) {
                throw request.getException();
            } else {
                throw KeeperException.create(KeeperException.Code.get(((ErrorTxn) request.getTxn()).getErr()));
            }
        }

        KeeperException ke = request.getException();
        if (ke instanceof SessionMovedException) {
            throw ke;
        }
        if (ke != null && request.type != OpCode.multi) {
            throw ke;
        }

        LOG.debug("{}", request);

        if (request.isStale()) {
            ServerMetrics.getMetrics().STALE_REPLIES.add(1);
        }

        switch (request.type) {
        case OpCode.ping: {
            lastOp = "PING";
            updateStats(request, lastOp, lastZxid);

            cnxn.sendResponse(new ReplyHeader(-2, lastZxid, 0), null, "response");
            return;
        }
        case OpCode.createSession: {
            lastOp = "SESS";
            updateStats(request, lastOp, lastZxid);

            zks.finishSessionInit(request.cnxn, true);
            return;
        }
        case OpCode.multi: {
            lastOp = "MULT";
            rsp = new MultiResponse();

            for (ProcessTxnResult subTxnResult : rc.multiResult) {

                OpResult subResult;

                switch (subTxnResult.type) {
                case OpCode.check:
                    subResult = new CheckResult();
                    break;
                case OpCode.create:
                    subResult = new CreateResult(subTxnResult.path);
                    break;
                case OpCode.create2:
                case OpCode.createTTL:
                case OpCode.createContainer:
                    subResult = new CreateResult(subTxnResult.path, subTxnResult.stat);
                    break;
                case OpCode.delete:
                case OpCode.deleteContainer:
                    subResult = new DeleteResult();
                    break;
                case OpCode.setData:
                    subResult = new SetDataResult(subTxnResult.stat);
                    break;
                case OpCode.error:
                    subResult = new ErrorResult(subTxnResult.err);
                    if (subTxnResult.err == Code.SESSIONMOVED.intValue()) {
                        throw new SessionMovedException();
                    }
                    break;
                default:
                    throw new IOException("Invalid type of op");
                }

                ((MultiResponse) rsp).add(subResult);
            }

            break;
        }
        case OpCode.multiRead: {
            lastOp = "MLTR";
            MultiOperationRecord multiReadRecord = new MultiOperationRecord();
            ByteBufferInputStream.byteBuffer2Record(request.request, multiReadRecord);
            rsp = new MultiResponse();
            OpResult subResult;
            for (Op readOp : multiReadRecord) {
                try {
                    Record rec;
                    switch (readOp.getType()) {
                    case OpCode.getChildren:
                        rec = handleGetChildrenRequest(readOp.toRequestRecord(), cnxn, request.authInfo);
                        subResult = new GetChildrenResult(((GetChildrenResponse) rec).getChildren());
                        break;
                    case OpCode.getData:
                        rec = handleGetDataRequest(readOp.toRequestRecord(), cnxn, request.authInfo);
                        GetDataResponse gdr = (GetDataResponse) rec;
                        subResult = new GetDataResult(gdr.getData(), gdr.getStat());
                        break;
                    default:
                        throw new IOException("Invalid type of readOp");
                    }
                } catch (KeeperException e) {
                    subResult = new ErrorResult(e.code().intValue());
                }
                ((MultiResponse) rsp).add(subResult);
            }
            break;
        }
        case OpCode.create: {
            lastOp = "CREA";
            rsp = new CreateResponse(rc.path);
            err = Code.get(rc.err);
            requestPathMetricsCollector.registerRequest(request.type, rc.path);
            break;
        }
        case OpCode.create2:
        case OpCode.createTTL:
        case OpCode.createContainer: {
            lastOp = "CREA";
            rsp = new Create2Response(rc.path, rc.stat);
            err = Code.get(rc.err);
            requestPathMetricsCollector.registerRequest(request.type, rc.path);
            break;
        }
        case OpCode.delete:
        case OpCode.deleteContainer: {
            lastOp = "DELE";
            err = Code.get(rc.err);
            requestPathMetricsCollector.registerRequest(request.type, rc.path);
            break;
        }
        case OpCode.setData: {
            lastOp = "SETD";
            rsp = new SetDataResponse(rc.stat);
            err = Code.get(rc.err);
            requestPathMetricsCollector.registerRequest(request.type, rc.path);
            break;
        }
        case OpCode.reconfig: {
            lastOp = "RECO";
            rsp = new GetDataResponse(
                ((QuorumZooKeeperServer) zks).self.getQuorumVerifier().toString().getBytes(),
                rc.stat);
            err = Code.get(rc.err);
            break;
        }
        case OpCode.setACL: {
            lastOp = "SETA";
            rsp = new SetACLResponse(rc.stat);
            err = Code.get(rc.err);
            requestPathMetricsCollector.registerRequest(request.type, rc.path);
            break;
        }
        case OpCode.closeSession: {
            lastOp = "CLOS";
            err = Code.get(rc.err);
            break;
        }
        case OpCode.sync: {
            lastOp = "SYNC";
            SyncRequest syncRequest = new SyncRequest();
            ByteBufferInputStream.byteBuffer2Record(request.request, syncRequest);
            rsp = new SyncResponse(syncRequest.getPath());
            requestPathMetricsCollector.registerRequest(request.type, syncRequest.getPath());
            break;
        }
        case OpCode.check: {
            lastOp = "CHEC";
            rsp = new SetDataResponse(rc.stat);
            err = Code.get(rc.err);
            break;
        }
        case OpCode.exists: {
            lastOp = "EXIS";
            // TODO we need to figure out the security requirement for this!
            ExistsRequest existsRequest = new ExistsRequest();
            ByteBufferInputStream.byteBuffer2Record(request.request, existsRequest);
            path = existsRequest.getPath();
            if (path.indexOf('\0') != -1) {
                throw new KeeperException.BadArgumentsException();
            }
            Stat stat = zks.getZKDatabase().statNode(path, existsRequest.getWatch() ? cnxn : null);
            rsp = new ExistsResponse(stat);
            requestPathMetricsCollector.registerRequest(request.type, path);
            break;
        }
        case OpCode.getData: {
            lastOp = "GETD";
            GetDataRequest getDataRequest = new GetDataRequest();
            ByteBufferInputStream.byteBuffer2Record(request.request, getDataRequest);
            path = getDataRequest.getPath();
            rsp = handleGetDataRequest(getDataRequest, cnxn, request.authInfo);
            requestPathMetricsCollector.registerRequest(request.type, path);
            break;
        }
        case OpCode.setWatches: {
            lastOp = "SETW";
            SetWatches setWatches = new SetWatches();
            // TODO We really should NOT need this!!!!
            request.request.rewind();
            ByteBufferInputStream.byteBuffer2Record(request.request, setWatches);
            long relativeZxid = setWatches.getRelativeZxid();
            zks.getZKDatabase()
               .setWatches(
                   relativeZxid,
                   setWatches.getDataWatches(),
                   setWatches.getExistWatches(),
                   setWatches.getChildWatches(),
                   cnxn);
            break;
        }
        case OpCode.getACL: {
            lastOp = "GETA";
            GetACLRequest getACLRequest = new GetACLRequest();
            ByteBufferInputStream.byteBuffer2Record(request.request, getACLRequest);
            path = getACLRequest.getPath();
            DataNode n = zks.getZKDatabase().getNode(path);
            if (n == null) {
                throw new KeeperException.NoNodeException();
            }
            zks.checkACL(
                request.cnxn,
                zks.getZKDatabase().aclForNode(n),
                ZooDefs.Perms.READ | ZooDefs.Perms.ADMIN, request.authInfo, path,
                null);

            Stat stat = new Stat();
            List<ACL> acl = zks.getZKDatabase().getACL(path, stat);
            requestPathMetricsCollector.registerRequest(request.type, getACLRequest.getPath());

            try {
                zks.checkACL(
                    request.cnxn,
                    zks.getZKDatabase().aclForNode(n),
                    ZooDefs.Perms.ADMIN,
                    request.authInfo,
                    path,
                    null);
                rsp = new GetACLResponse(acl, stat);
            } catch (KeeperException.NoAuthException e) {
                List<ACL> acl1 = new ArrayList<ACL>(acl.size());
                for (ACL a : acl) {
                    if ("digest".equals(a.getId().getScheme())) {
                        Id id = a.getId();
                        Id id1 = new Id(id.getScheme(), id.getId().replaceAll(":.*", ":x"));
                        acl1.add(new ACL(a.getPerms(), id1));
                    } else {
                        acl1.add(a);
                    }
                }
                rsp = new GetACLResponse(acl1, stat);
            }
            break;
        }
        case OpCode.getChildren: {
            lastOp = "GETC";
            GetChildrenRequest getChildrenRequest = new GetChildrenRequest();
            ByteBufferInputStream.byteBuffer2Record(request.request, getChildrenRequest);
            path = getChildrenRequest.getPath();
            rsp = handleGetChildrenRequest(getChildrenRequest, cnxn, request.authInfo);
            requestPathMetricsCollector.registerRequest(request.type, path);
            break;
        }
        case OpCode.getAllChildrenNumber: {
            lastOp = "GETACN";
            GetAllChildrenNumberRequest getAllChildrenNumberRequest = new GetAllChildrenNumberRequest();
            ByteBufferInputStream.byteBuffer2Record(request.request, getAllChildrenNumberRequest);
            path = getAllChildrenNumberRequest.getPath();
            DataNode n = zks.getZKDatabase().getNode(path);
            if (n == null) {
                throw new KeeperException.NoNodeException();
            }
            zks.checkACL(
                request.cnxn,
                zks.getZKDatabase().aclForNode(n),
                ZooDefs.Perms.READ,
                request.authInfo,
                path,
                null);
            int number = zks.getZKDatabase().getAllChildrenNumber(path);
            rsp = new GetAllChildrenNumberResponse(number);
            break;
        }
        case OpCode.getChildren2: {
            lastOp = "GETC";
            GetChildren2Request getChildren2Request = new GetChildren2Request();
            ByteBufferInputStream.byteBuffer2Record(request.request, getChildren2Request);
            Stat stat = new Stat();
            path = getChildren2Request.getPath();
            DataNode n = zks.getZKDatabase().getNode(path);
            if (n == null) {
                throw new KeeperException.NoNodeException();
            }
            zks.checkACL(
                request.cnxn,
                zks.getZKDatabase().aclForNode(n),
                ZooDefs.Perms.READ,
                request.authInfo, path,
                null);
            List<String> children = zks.getZKDatabase()
                                       .getChildren(path, stat, getChildren2Request.getWatch() ? cnxn : null);
            rsp = new GetChildren2Response(children, stat);
            requestPathMetricsCollector.registerRequest(request.type, path);
            break;
        }
        case OpCode.checkWatches: {
            lastOp = "CHKW";
            CheckWatchesRequest checkWatches = new CheckWatchesRequest();
            ByteBufferInputStream.byteBuffer2Record(request.request, checkWatches);
            WatcherType type = WatcherType.fromInt(checkWatches.getType());
            path = checkWatches.getPath();
            boolean containsWatcher = zks.getZKDatabase().containsWatcher(path, type, cnxn);
            if (!containsWatcher) {
                String msg = String.format(Locale.ENGLISH, "%s (type: %s)", path, type);
                throw new KeeperException.NoWatcherException(msg);
            }
            requestPathMetricsCollector.registerRequest(request.type, checkWatches.getPath());
            break;
        }
        case OpCode.removeWatches: {
            lastOp = "REMW";
            RemoveWatchesRequest removeWatches = new RemoveWatchesRequest();
            ByteBufferInputStream.byteBuffer2Record(request.request, removeWatches);
            WatcherType type = WatcherType.fromInt(removeWatches.getType());
            path = removeWatches.getPath();
            boolean removed = zks.getZKDatabase().removeWatch(path, type, cnxn);
            if (!removed) {
                String msg = String.format(Locale.ENGLISH, "%s (type: %s)", path, type);
                throw new KeeperException.NoWatcherException(msg);
            }
            requestPathMetricsCollector.registerRequest(request.type, removeWatches.getPath());
            break;
        }
        case OpCode.getEphemerals: {
            lastOp = "GETE";
            GetEphemeralsRequest getEphemerals = new GetEphemeralsRequest();
            ByteBufferInputStream.byteBuffer2Record(request.request, getEphemerals);
            String prefixPath = getEphemerals.getPrefixPath();
            Set<String> allEphems = zks.getZKDatabase().getDataTree().getEphemerals(request.sessionId);
            List<String> ephemerals = new ArrayList<>();
            if (StringUtils.isBlank(prefixPath) || "/".equals(prefixPath.trim())) {
                ephemerals.addAll(allEphems);
            } else {
                for (String p : allEphems) {
                    if (p.startsWith(prefixPath)) {
                        ephemerals.add(p);
                    }
                }
            }
            rsp = new GetEphemeralsResponse(ephemerals);
            break;
        }
        }
    } catch (SessionMovedException e) {
        // session moved is a connection level error, we need to tear
        // down the connection otw ZOOKEEPER-710 might happen
        // ie client on slow follower starts to renew session, fails
        // before this completes, then tries the fast follower (leader)
        // and is successful, however the initial renew is then
        // successfully fwd/processed by the leader and as a result
        // the client and leader disagree on where the client is most
        // recently attached (and therefore invalid SESSION MOVED generated)
        cnxn.sendCloseSession();
        return;
    } catch (KeeperException e) {
        err = e.code();
    } catch (Exception e) {
        // log at error level as we are returning a marshalling
        // error to the user
        LOG.error("Failed to process " + request, e);
        StringBuilder sb = new StringBuilder();
        ByteBuffer bb = request.request;
        bb.rewind();
        while (bb.hasRemaining()) {
            sb.append(Integer.toHexString(bb.get() & 0xff));
        }
        LOG.error("Dumping request buffer: 0x" + sb.toString());
        err = Code.MARSHALLINGERROR;
    }

    ReplyHeader hdr = new ReplyHeader(request.cxid, lastZxid, err.intValue());

    updateStats(request, lastOp, lastZxid);

    try {
        if (request.type == OpCode.getData && path != null && rsp != null) {
            // Serialized read responses could be cached by the connection object.
            // Cache entries are identified by their path and last modified zxid,
            // so these values are passed along with the response.
            GetDataResponse getDataResponse = (GetDataResponse) rsp;
            Stat stat = null;
            if (getDataResponse.getStat() != null) {
                stat = getDataResponse.getStat();
            }
            cnxn.sendResponse(hdr, rsp, "response", path, stat);
        } else {
            cnxn.sendResponse(hdr, rsp, "response");
        }
        if (request.type == OpCode.closeSession) {
            cnxn.sendCloseSession();
        }
    } catch (IOException e) {
        LOG.error("FIXMSG", e);
    }
}

关于zk中FinalRequestProcessor的作用是什么就分享到这里了,希望以上内容可以对大家有一定的帮助,可以学到更多知识。如果觉得文章不错,可以把它分享出去让更多的人看到。

推荐阅读:
  1. ZK Studio 1.0.2的特性是什么
  2. flink中zk引起的重启怎么解决

免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。

zk

上一篇:如何解决redis分布式锁的问题

下一篇:HashMap的扩容操作有什么作用

相关阅读

您好,登录后才能下订单哦!

密码登录
登录注册
其他方式登录
点击 登录注册 即表示同意《亿速云用户服务条款》