zookeeper(12)源码分析-请求处理链(2)

发布时间:2020-06-30 11:05:29 作者:shayang88
来源:网络 阅读:193

SyncRequestProcessor,该处理器将请求存入磁盘,其将请求批量的存入磁盘以提高效率,请求在写入磁盘之前是不会被转发到下个处理器的。

类的核心属性

SyncRequestProcessor维护了ZooKeeperServer实例,其用于获取ZooKeeper的数据库和其他信息;维护了一个处理请求的队列,其用于存放请求;维护了一个处理快照的线程,用于处理快照;维护了一个running标识,标识SyncRequestProcessor是否在运行;同时还维护了一个等待被刷新到磁盘的请求队列。

// Zookeeper服务器
    private final ZooKeeperServer zks;
    // 请求队列
    private final LinkedBlockingQueue<Request> queuedRequests =
        new LinkedBlockingQueue<Request>();
    // 下个处理器
    private final RequestProcessor nextProcessor;
    // 快照处理线程
    private Thread snapInProcess = null;
    // 是否在运行中
    volatile private boolean running;

    /**
     * Transactions that have been written and are waiting to be flushed to
     * disk. Basically this is the list of SyncItems whose callbacks will be
     * invoked after flush returns successfully.
     */
    // 等待被刷新到磁盘的请求队列
    private final LinkedList<Request> toFlush = new LinkedList<Request>();
    // 随机数生成器
    private final Random r = new Random();
    /**
     * The number of log entries to log before starting a snapshot
     */
    // 快照个数
    private static int snapCount = ZooKeeperServer.getSnapCount();
    // 结束请求标识
    private final Request requestOfDeath = Request.requestOfDeath;

构造函数

构造函数首先会调用父类的构造函数,然后根据构造函数参数给类的属性赋值,其中会确定下个处理器,并会设置该处理器正在运行的标识。

public SyncRequestProcessor(ZooKeeperServer zks,
            RequestProcessor nextProcessor) {
        super("SyncThread:" + zks.getServerId(), zks
                .getZooKeeperServerListener());
        this.zks = zks;
        this.nextProcessor = nextProcessor;
        running = true;
    }

核心方法

1、run

@Override
    public void run() {
        try {
            // 写日志数量初始化为0
            int logCount = 0;

            // we do this in an attempt to ensure that not all of the servers
            // in the ensemble take a snapshot at the same time
            // 防止集群中所有机器在同一时刻进行数据快照,对是否进行数据快照增加随机因素
            int randRoll = r.nextInt(snapCount/2);
            while (true) {
                Request si = null;
                // 没有需要刷新到磁盘的请求
                if (toFlush.isEmpty()) {
                    // 从请求队列中取出一个请求,若queuedRequests队列为空会阻塞
                    si = queuedRequests.take();
                } else {
                    // 从请求队列中取出一个请求,若queuedRequests队列为空,则返回空,不会阻塞
                    si = queuedRequests.poll();
                    // 取出的请求为空
                    if (si == null) {
                        // 刷新数据磁盘
                        flush(toFlush);
                        continue;
                    }
                }
                // 在关闭处理器之后,会添加requestOfDeath请求到queuedRequests队列,表示关闭后不再处理请求
                if (si == requestOfDeath) {
                    break;
                }
                // 请求不为空,处理请求
                if (si != null) {
                    // track the number of records written to the log
                    // 将写请求添加至事务日志文件 FileTxnSnapLog.append(si)
                    if (zks.getZKDatabase().append(si)) {
                        // 日志写入,logCount加1
                        logCount++;
                                                //确定是否需要进行数据快照
                        if (logCount > (snapCount / 2 + randRoll)) {
                            randRoll = r.nextInt(snapCount/2);
                            // roll the log
                            // 滚动日志,从当前日志文件滚到下一个日志文件,不是回滚
                            zks.getZKDatabase().rollLog();
                            // take a snapshot
                            if (snapInProcess != null && snapInProcess.isAlive()) {  // 正在进行快照
                                LOG.warn("Too busy to snap, skipping");
                            } else {
                                // 创建线程来处理快照
                                snapInProcess = new ZooKeeperThread("Snapshot Thread") {
                                        public void run() {
                                            try {
                                                // 进行快照
                                                zks.takeSnapshot();
                                            } catch(Exception e) {
                                                LOG.warn("Unexpected exception", e);
                                            }
                                        }
                                    };
                                // 开始快照线程处理
                                snapInProcess.start();
                            }
                            // 重置为0
                            logCount = 0;
                        }
                    } else if (toFlush.isEmpty()) {// 读请求会走到这里,查看此时toFlush是否为空,如果为空,说明近段时间读多写少,直接响应
                        // optimization for read heavy workloads
                        // iff this is a read, and there are no pending
                        // flushes (writes), then just pass this to the next
                        // processor
                        if (nextProcessor != null) {
                            // 下个处理器开始处理请求
                            nextProcessor.proce***equest(si);
                            // 处理器是Flushable的,刷新数据到磁盘
                            if (nextProcessor instanceof Flushable) {
                                ((Flushable)nextProcessor).flush();
                            }
                        }
                        continue;
                    }
                    // 将请求添加至被刷新至磁盘队列
                    toFlush.add(si);
                    if (toFlush.size() > 1000) {// 队列大小大于1000,直接刷新到磁盘
                        flush(toFlush);
                    }
                }
            }
        } catch (Throwable t) {
            handleException(this.getName(), t);
        } finally{
            running = false;
        }
        LOG.info("SyncRequestProcessor exited!");
    }

2、flush

flush将toFlush队列中的请求刷新到磁盘中。

 private void flush(LinkedList<Request> toFlush)
        throws IOException, RequestProcessorException
    {
        if (toFlush.isEmpty())
            return;
        // 提交事务至ZK数据库
        zks.getZKDatabase().commit();
        while (!toFlush.isEmpty()) {
            // 从队列移除请求
            Request i = toFlush.remove();
            // 下个处理器开始处理请求
            if (nextProcessor != null) {
                nextProcessor.proce***equest(i);
            }
        }
        if (nextProcessor != null && nextProcessor instanceof Flushable) {
            ((Flushable)nextProcessor).flush();
        }
    }

3、shutdown

函数用于关闭SyncRequestProcessor处理器,其首先会在queuedRequests队列中添加一个结束请求requestOfDeath,然后再判断SyncRequestProcessor是否还在运行,若是,则会等待其结束;之后判断toFlush队列是否为空,若不为空,则刷新到磁盘中

public void shutdown() {
        LOG.info("Shutting down");
        // 添加结束请求请求至队列
        queuedRequests.add(requestOfDeath);
        try {
            // 还在运行
            if(running){
                this.join();// 等待该线程终止
            }
            if (!toFlush.isEmpty()) {// 队列不为空,刷新到磁盘
                flush(toFlush);
            }
        } catch(InterruptedException e) {
            LOG.warn("Interrupted while wating for " + this + " to finish");
        } catch (IOException e) {
            LOG.warn("Got IO exception during shutdown");
        } catch (RequestProcessorException e) {
            LOG.warn("Got request processor exception during shutdown");
        }
        if (nextProcessor != null) {
            nextProcessor.shutdown();
        }
    }
推荐阅读:
  1. Giraph源码分析(一)— 启动ZooKeeper服务
  2. zookeeper(15)源码分析-服务器(2)

免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。

syncrequestprocesso zookeepe zo

上一篇:HTTPS和HTTP的区别:

下一篇:idea搭建python的方法步骤

相关阅读

您好,登录后才能下订单哦!

密码登录
登录注册
其他方式登录
点击 登录注册 即表示同意《亿速云用户服务条款》