zookeeper(9)源码分析-事件监听Watcher(2)

发布时间:2020-07-22 17:51:29 作者:shayang88
来源:网络 阅读:258

接着上一篇文章,继续分析和Watcher相关的类的源码。

ClientWatchManager

public Set<Watcher> materialize(Watcher.Event.KeeperState state,
        Watcher.Event.EventType type, String path);

该接口,只有一个方法,需要实现。该方法表示事件发生时,返回需要被通知的Watcher集合,可能为空集合。

ZKWatchManager

1、ZKWatchManager是ZooKeeper的内部类,实现了ClientWatchManager。
2、ZKWatchManager定义了三个Map键值对,键为节点路径,值为Watcher。分别对应数据变化的Watcher、节点是否存在的Watcher、子节点变化的Watcher。

static class ZKWatchManager implements ClientWatchManager {
        //数据变化的watchers
        private final Map<String, Set<Watcher>> dataWatches =
            new HashMap<String, Set<Watcher>>();
        //节点存在与否的watchers
        private final Map<String, Set<Watcher>> existWatches =
            new HashMap<String, Set<Watcher>>();
        //子节点变化的watchers
        private final Map<String, Set<Watcher>> childWatches =
            new HashMap<String, Set<Watcher>>();

3、materialize方法

该方法在事件发生后,返回需要被通知的Watcher集合。在该方法中,首先会根据EventType类型确定相应的事件类型,然后根据事件类型的不同做出相应的操作,如针对None类型,即无任何事件,则首先会从三个键值对中删除clientPath对应的Watcher,然后将剩余的Watcher集合添加至结果集合;针对NodeDataChanged和NodeCreated事件而言,其会从dataWatches和existWatches中删除clientPath对应的Watcher,然后将剩余的Watcher集合添加至结果集合。

@Override
        public Set<Watcher> materialize(Watcher.Event.KeeperState state,
                                        Watcher.Event.EventType type,
                                        String clientPath)
        {
            //返回结果集合
            Set<Watcher> result = new HashSet<Watcher>();

            switch (type) {//事件类型
            case None://无类型
                //添加默认watcher
                result.add(defaultWatcher);
                //根据disableAutoWatchReset和Zookeeper的状态是否为同步连接判断是否需要清空
                boolean clear = disableAutoWatchReset && state != Watcher.Event.KeeperState.SyncConnected;
                //针对3个不同的watcherMap进行操作
                synchronized(dataWatches) {
                    for(Set<Watcher> ws: dataWatches.values()) {
                        // 添加至结果集合
                        result.addAll(ws);
                    }
                    if (clear) { // 是否需要清空
                        dataWatches.clear();
                    }
                }

                synchronized(existWatches) {
                    for(Set<Watcher> ws: existWatches.values()) {
                        result.addAll(ws);
                    }
                    if (clear) {
                        existWatches.clear();
                    }
                }

                synchronized(childWatches) {
                    for(Set<Watcher> ws: childWatches.values()) {
                        result.addAll(ws);
                    }
                    if (clear) {
                        childWatches.clear();
                    }
                }

                return result;
            case NodeDataChanged:// 节点数据变化
            case NodeCreated:// 创建节点
                synchronized (dataWatches) {
                    //移除clientPath对应的Watcher后全部添加至结果集合
                    addTo(dataWatches.remove(clientPath), result);
                }
                synchronized (existWatches) {
                    //移除clientPath对应的Watcher后全部添加至结果集合
                    addTo(existWatches.remove(clientPath), result);
                }
                break;
            case NodeChildrenChanged: // 节点子节点变化
                synchronized (childWatches) {
                    // 移除clientPath对应的Watcher后全部添加至结果集合
                    addTo(childWatches.remove(clientPath), result);
                }
                break;
            case NodeDeleted:// 删除节点
                synchronized (dataWatches) {
                    // 移除clientPath对应的Watcher后全部添加至结果集合
                    addTo(dataWatches.remove(clientPath), result);
                }
                // XXX This shouldn't be needed, but just in case
                synchronized (existWatches) {
                    Set<Watcher> list = existWatches.remove(clientPath);
                    if (list != null) {
                        addTo(list, result);
                        LOG.warn("We are triggering an exists watch for delete! Shouldn't happen!");
                    }
                }
                synchronized (childWatches) {
                    //移除clientPath对应的Watcher后全部添加至结果集合
                    addTo(childWatches.remove(clientPath), result);
                }
                break;
            default:
                String msg = "Unhandled watch event type " + type
                    + " with state " + state + " on path " + clientPath;
                LOG.error(msg);
                throw new RuntimeException(msg);
            }

            return result;
        }
    }
推荐阅读:
  1. Giraph源码分析(一)— 启动ZooKeeper服务
  2. zookeeper(15)源码分析-服务器(2)

免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。

zkwatchmanager he tc

上一篇:Quartz2D的简单使用概述(一)

下一篇:vmware导入ova文件时遇到的问题

相关阅读

您好,登录后才能下订单哦!

密码登录
登录注册
其他方式登录
点击 登录注册 即表示同意《亿速云用户服务条款》