zookeeper(14)源码分析-服务器(1)

发布时间:2020-07-22 00:07:07 作者:shayang88
来源:网络 阅读:286

ZooKeeperServer,为所有服务器的父类。
QuorumZooKeeperServer,其是所有参与选举的服务器的父类,是抽象类,其继承了ZooKeeperServer类。
LeaderZooKeeperServer,Leader服务器,继承了QuorumZooKeeperServer类,也会继承ZooKeeperServer中的很多方法。
LearnerZooKeeper,其是Learner服务器的父类,为抽象类,也继承了QuorumZooKeeperServer类。
FollowerZooKeeperServer,Follower服务器,继承了LearnerZooKeeper。
ObserverZooKeeperServer,Observer服务器,继承了LearnerZooKeeper。
ReadOnlyZooKeeperServer,只读服务器,不提供写服务,继承QuorumZooKeeperServer。

ZooKeeperServer

1、类的继承关系

public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {}

ZooKeeperServer是ZooKeeper中所有服务器的父类,其实现了Session.Expirer和ServerStats.Provider接口,SessionExpirer中定义了expire方法(表示会话过期)和getServerId方法(表示获取服务器ID),而Provider则主要定义了获取服务器某些数据的方法。

2、类属性

protected static final Logger LOG;

    static {
        LOG = LoggerFactory.getLogger(ZooKeeperServer.class);

        Environment.logEnv("Server environment:", LOG);
    }
    //jmx服务
    protected ZooKeeperServerBean jmxServerBean;
    protected DataTreeBean jmxDataTreeBean;
    // 默认心跳频率
    public static final int DEFAULT_TICK_TIME = 3000;
    protected int tickTime = DEFAULT_TICK_TIME;
    // 最小会话过期时间
    /** value of -1 indicates unset, use default */
    protected int minSessionTimeout = -1;
    // 最大会话过期时间
    /** value of -1 indicates unset, use default */
    protected int maxSessionTimeout = -1;
    protected SessionTracker sessionTracker;
    // 事务日志快照
    private FileTxnSnapLog txnLogFactory = null;
    // Zookeeper内存数据库
    private ZKDatabase zkDb;
    private final AtomicLong hzxid = new AtomicLong(0);
    public final static Exception ok = new Exception("No prob");
    // 请求处理器
    protected RequestProcessor firstProcessor;
    protected volatile State state = State.INITIAL;

    protected enum State {
        INITIAL, RUNNING, SHUTDOWN, ERROR
    }

    /**
     * This is the secret that we use to generate passwords. For the moment,
     * it's more of a checksum that's used in reconnection, which carries no
     * security weight, and is treated internally as if it carries no
     * security weight.
     */
    static final private long superSecret = 0XB3415C00L;

    private final AtomicInteger requestsInProcess = new AtomicInteger(0);
    // 未处理的ChangeRecord
    final Deque<ChangeRecord> outstandingChanges = new ArrayDeque<>();
    // this data structure must be accessed under the outstandingChanges lock
    // 记录path对应的ChangeRecord
    final HashMap<String, ChangeRecord> outstandingChangesForPath =
        new HashMap<String, ChangeRecord>();

    protected ServerCnxnFactory serverCnxnFactory;
    protected ServerCnxnFactory secureServerCnxnFactory;
    // 服务器统计数据
    private final ServerStats serverStats;
    private final ZooKeeperServerListener listener;
    private ZooKeeperServerShutdownHandler zkShutdownHandler;
    private volatile int createSessionTrackerServerId = 1;

3、核心函数

3.1 loadData

该函数用于加载数据,其首先会判断内存库是否已经加载设置zxid,之后会调用killSession函数删除过期的会话

if(zkDb.isInitialized()){ // 内存数据库已被初始化
            // 设置为最后处理的Zxid
            setZxid(zkDb.getDataTreeLastProcessedZxid());
        }
        else {
            // 未被初始化,则加载数据库
            setZxid(zkDb.loadDataBase());
        }

        // Clean up dead sessions
        LinkedList<Long> deadSessions = new LinkedList<Long>();
        for (Long session : zkDb.getSessions()) {// 遍历所有的会话
            if (zkDb.getSessionWithTimeOuts().get(session) == null) {
                deadSessions.add(session);
            }
        }

        for (long session : deadSessions) { // 删除过期的会话
            // XXX: Is lastProcessedZxid really the best thing to use?
            killSession(session, zkDb.getDataTreeLastProcessedZxid());
        }

        // Make a clean snapshot
        //初始化一个快照
        takeSnapshot();

3.2、submitRequest

提交请求,处理器进行处理

public void submitRequest(Request si) {
        if (firstProcessor == null) {// 第一个处理器为空
            synchronized (this) {
                try {
                    // Since all requests are passed to the request
                    // processor it should wait for setting up the request
                    // processor chain. The state will be updated to RUNNING
                    // after the setup.
                    //服务器调用链还未初始化完成
                    while (state == State.INITIAL) {
                        wait(1000);
                    }
                } catch (InterruptedException e) {
                    LOG.warn("Unexpected interruption", e);
                }
                if (firstProcessor == null || state != State.RUNNING) {
                    throw new RuntimeException("Not started");
                }
            }
        }
        try {
            touch(si.cnxn);
            // 是否为合法的请求
            boolean validpacket = Request.isValid(si.type);
            if (validpacket) {
                //调用链第一处理器开始处理
                firstProcessor.proce***equest(si);
                if (si.cnxn != null) {
                    incInProcess();
                }
            } else {
                LOG.warn("Received packet at server of unknown type " + si.type);
                new UnimplementedRequestProcessor().proce***equest(si);
            }
        } catch (MissingSessionException e) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("Dropping request: " + e.getMessage());
            }
        } catch (RequestProcessorException e) {
            LOG.error("Unable to process request:" + e.getMessage(), e);
        }
    }

LeaderZooKeeperServer介绍

1、类属性

// 提交请求处理器
CommitProcessor commitProcessor;
//处理链请求第一个处理处理器
PrepRequestProcessor prepRequestProcessor;

2、构造方法

LeaderZooKeeperServer(FileTxnSnapLog logFactory, QuorumPeer self, ZKDatabase zkDb) throws IOException {
        super(logFactory, self.tickTime, self.minSessionTimeout, self.maxSessionTimeout, zkDb, self);
    }

直接调用父类QuorumZooKeeperServer的构造函数,然后再调用ZooKeeperServer的构造函数,逐级构造。

3、核心函数

3.1、setupRequestProcessors

 @Override
    protected void setupRequestProcessors() {
        //创建FinalRequestProcessor,处理链最后一个处理器
        RequestProcessor finalProcessor = new FinalRequestProcessor(this);
        //创建ToBeAppliedRequestProcessor
        RequestProcessor toBeAppliedProcessor = new Leader.ToBeAppliedRequestProcessor(finalProcessor, getLeader());
        // 创建CommitProcessor,提交处理器
        commitProcessor = new CommitProcessor(toBeAppliedProcessor,
                Long.toString(getServerId()), false,
                getZooKeeperServerListener());
        // 启动CommitProcessor
        commitProcessor.start();
        // 创建ProposalRequestProcessor
        ProposalRequestProcessor proposalProcessor = new ProposalRequestProcessor(this,
                commitProcessor);
        // 初始化ProposalProcessor
        proposalProcessor.initialize();
        //创建PrepRequestProcessor,作为以第一个处理链处理器
        prepRequestProcessor = new PrepRequestProcessor(this, proposalProcessor);
        prepRequestProcessor.start();
        // firstProcessor为PrepRequestProcessor
        firstProcessor = new LeaderRequestProcessor(this, prepRequestProcessor);

        setupContainerManager();
    }

setupRequestProcessors函数表示创建处理链,可以看到其处理链的顺序为PrepRequestProcessor -> ProposalRequestProcessor -> CommitProcessor -> Leader.ToBeAppliedRequestProcessor -> FinalRequestProcessor。

推荐阅读:
  1. hadoop+hbase+zookeeper完全分布安装(1)
  2. Giraph源码分析(一)— 启动ZooKeeper服务

免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。

zookeeperserver zookeepe zo

上一篇:2019 Vue开发指南:你都需要学点啥?

下一篇:在AD中设置漫游配置文件与文件夹重定向

相关阅读

您好,登录后才能下订单哦!

密码登录
登录注册
其他方式登录
点击 登录注册 即表示同意《亿速云用户服务条款》