如何用dubbo源码解析export 远程服务

发布时间:2021-11-09 17:12:03 作者:柒染
来源:亿速云 阅读:109

本篇文章给大家分享的是有关如何用dubbo源码解析export 远程服务,小编觉得挺实用的,因此分享给大家学习,希望大家阅读完这篇文章后可以有所收获,话不多说,跟着小编一起来看看吧。

服务注册信息恢复com.alibaba.dubbo.registry.support.FailbackRegistry#recover

@Override    protected void recover() throws Exception {        // 获取服务注册的url集合        Set<URL> recoverRegistered = new HashSet<URL>(getRegistered());        if (!recoverRegistered.isEmpty()) {            if (logger.isInfoEnabled()) {                logger.info("Recover register url " + recoverRegistered);            }            for (URL url : recoverRegistered) {//                保存注册失败的服务注册url                failedRegistered.add(url);            }        }        // 获取订阅的服务url集合        Map<URL, Set<NotifyListener>> recoverSubscribed = new HashMap<URL, Set<NotifyListener>>(getSubscribed());        if (!recoverSubscribed.isEmpty()) {            if (logger.isInfoEnabled()) {                logger.info("Recover subscribe url " + recoverSubscribed.keySet());            }            for (Map.Entry<URL, Set<NotifyListener>> entry : recoverSubscribed.entrySet()) {                URL url = entry.getKey();                for (NotifyListener listener : entry.getValue()) {//                    添加订阅失败的服务url                    addFailedSubscribed(url, listener);                }            }        }    }

进入这个方法com.alibaba.dubbo.registry.support.FailbackRegistry#register服务注册

@Override    public void register(URL url) {//        添加注册服务url=》        super.register(url);        failedRegistered.remove(url);        failedUnregistered.remove(url);        try {            // Sending a registration request to the server side 向服务器端发送注册请求=》ZookeeperRegistry            doRegister(url);        } catch (Exception e) {            Throwable t = e;            // If the startup detection is opened, the Exception is thrown directly. 如果打开启动检测,则直接抛出异常            boolean check = getUrl().getParameter(Constants.CHECK_KEY, true)                    && url.getParameter(Constants.CHECK_KEY, true)                    && !Constants.CONSUMER_PROTOCOL.equals(url.getProtocol());            boolean skipFailback = t instanceof SkipFailbackWrapperException;            if (check || skipFailback) {                if (skipFailback) {                    t = t.getCause();                }                throw new IllegalStateException("Failed to register " + url + " to registry " + getUrl().getAddress() + ", cause: " + t.getMessage(), t);            } else {                logger.error("Failed to register " + url + ", waiting for retry, cause: " + t.getMessage(), t);            }            // Record a failed registration request to a failed list, retry regularly 将失败的注册请求记录到失败的列表中,定期重试            failedRegistered.add(url);        }    }

进入这个方法com.alibaba.dubbo.registry.zookeeper.ZookeeperRegistry#doRegister进行zk服务注册,这里的逻辑就是创建zk临时节点

@Override    protected void doRegister(URL url) {        try {//            服务注册,创建zk节点,如果dynamic配置的是true,创建的就是zk临时节点=》            zkClient.create(toUrlPath(url), url.getParameter(Constants.DYNAMIC_KEY, true));        } catch (Throwable e) {            throw new RpcException("Failed to register " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);        }    }

这里默认创建的是临时节点,这也就是zk注册的服务所在节点挂了之后其他客户端节点本地的服务列表会更新的原因,不会调用到不存在的服务,当然也存在zk临时节点删除,通知其他订阅这个节点的客户端时候出现网络抖动,zk会做处理确保一定能通知到,这种中间处理也能要业务逻辑要做处理了

/dubbo/com.alibaba.dubbo.demo.DemoService/providers/dubbo://172.28.84.147:20880/com.alibaba.dubbo.demo.DemoService?anyhost=true&application=demo-provider&bean.name=com.alibaba.dubbo.demo.DemoService&dubbo=2.0.2&generic=false&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello&pid=76579&side=provider&timestamp=1569898563184 服务注册的zk path是这样的

如果注册失败的话怎么办呢,在创建com.alibaba.dubbo.registry.support.FailbackRegistry#FailbackRegistry对象的时候构造方法逻辑中,重试参数retry.period 默认值是每5秒钟会做重试处理,这里也可以自定义修改

public FailbackRegistry(URL url) {        super(url);//        每5秒中会重试注册失败的服务信息,可以修改这个参数retry.period        this.retryPeriod = url.getParameter(Constants.REGISTRY_RETRY_PERIOD_KEY, Constants.DEFAULT_REGISTRY_RETRY_PERIOD);        this.retryFuture = retryExecutor.scheduleWithFixedDelay(new Runnable() {            @Override            public void run() {                // Check and connect to the registry                try {                    retry();                } catch (Throwable t) { // Defensive fault tolerance                    logger.error("Unexpected error occur at failed retry, cause: " + t.getMessage(), t);                }            }        }, retryPeriod, retryPeriod, TimeUnit.MILLISECONDS);    }

进入这个方法服务注册失败重试逻辑,com.alibaba.dubbo.registry.support.FailbackRegistry#retry

protected void retry() {        if (!failedRegistered.isEmpty()) {            Set<URL> failed = new HashSet<URL>(failedRegistered);            if (failed.size() > 0) {                if (logger.isInfoEnabled()) {                    logger.info("Retry register " + failed);                }                try {                    for (URL url : failed) {                        try {//                            zk服务注册                            doRegister(url);                            failedRegistered.remove(url);                        } catch (Throwable t) { // Ignore all the exceptions and wait for the next retry 这里异常不做处理等待下次重试                            logger.warn("Failed to retry register " + failed + ", waiting for again, cause: " + t.getMessage(), t);                        }                    }                } catch (Throwable t) { // Ignore all the exceptions and wait for the next retry                    logger.warn("Failed to retry register " + failed + ", waiting for again, cause: " + t.getMessage(), t);                }            }        }        if (!failedUnregistered.isEmpty()) {            Set<URL> failed = new HashSet<URL>(failedUnregistered);            if (!failed.isEmpty()) {                if (logger.isInfoEnabled()) {                    logger.info("Retry unregister " + failed);                }                try {                    for (URL url : failed) {                        try {//                            取消服务注册失败重试                            doUnregister(url);                            failedUnregistered.remove(url);                        } catch (Throwable t) { // Ignore all the exceptions and wait for the next retry                            logger.warn("Failed to retry unregister  " + failed + ", waiting for again, cause: " + t.getMessage(), t);                        }                    }                } catch (Throwable t) { // Ignore all the exceptions and wait for the next retry                    logger.warn("Failed to retry unregister  " + failed + ", waiting for again, cause: " + t.getMessage(), t);                }            }        }        if (!failedSubscribed.isEmpty()) {            Map<URL, Set<NotifyListener>> failed = new HashMap<URL, Set<NotifyListener>>(failedSubscribed);            for (Map.Entry<URL, Set<NotifyListener>> entry : new HashMap<URL, Set<NotifyListener>>(failed).entrySet()) {                if (entry.getValue() == null || entry.getValue().size() == 0) {                    failed.remove(entry.getKey());                }            }            if (failed.size() > 0) {                if (logger.isInfoEnabled()) {                    logger.info("Retry subscribe " + failed);                }                try {                    for (Map.Entry<URL, Set<NotifyListener>> entry : failed.entrySet()) {                        URL url = entry.getKey();                        Set<NotifyListener> listeners = entry.getValue();                        for (NotifyListener listener : listeners) {                            try {//                                服务订阅失败的进行重新订阅                                doSubscribe(url, listener);                                listeners.remove(listener);                            } catch (Throwable t) { // Ignore all the exceptions and wait for the next retry                                logger.warn("Failed to retry subscribe " + failed + ", waiting for again, cause: " + t.getMessage(), t);                            }                        }                    }                } catch (Throwable t) { // Ignore all the exceptions and wait for the next retry                    logger.warn("Failed to retry subscribe " + failed + ", waiting for again, cause: " + t.getMessage(), t);                }            }        }        if (!failedUnsubscribed.isEmpty()) {            Map<URL, Set<NotifyListener>> failed = new HashMap<URL, Set<NotifyListener>>(failedUnsubscribed);            for (Map.Entry<URL, Set<NotifyListener>> entry : new HashMap<URL, Set<NotifyListener>>(failed).entrySet()) {                if (entry.getValue() == null || entry.getValue().isEmpty()) {                    failed.remove(entry.getKey());                }            }            if (failed.size() > 0) {                if (logger.isInfoEnabled()) {                    logger.info("Retry unsubscribe " + failed);                }                try {                    for (Map.Entry<URL, Set<NotifyListener>> entry : failed.entrySet()) {                        URL url = entry.getKey();                        Set<NotifyListener> listeners = entry.getValue();                        for (NotifyListener listener : listeners) {                            try {//                                取消订阅                                doUnsubscribe(url, listener);                                listeners.remove(listener);                            } catch (Throwable t) { // Ignore all the exceptions and wait for the next retry                                logger.warn("Failed to retry unsubscribe " + failed + ", waiting for again, cause: " + t.getMessage(), t);                            }                        }                    }                } catch (Throwable t) { // Ignore all the exceptions and wait for the next retry                    logger.warn("Failed to retry unsubscribe " + failed + ", waiting for again, cause: " + t.getMessage(), t);                }            }        }        if (!failedNotified.isEmpty()) {            Map<URL, Map<NotifyListener, List<URL>>> failed = new HashMap<URL, Map<NotifyListener, List<URL>>>(failedNotified);            for (Map.Entry<URL, Map<NotifyListener, List<URL>>> entry : new HashMap<URL, Map<NotifyListener, List<URL>>>(failed).entrySet()) {                if (entry.getValue() == null || entry.getValue().size() == 0) {                    failed.remove(entry.getKey());                }            }            if (failed.size() > 0) {                if (logger.isInfoEnabled()) {                    logger.info("Retry notify " + failed);                }                try {                    for (Map<NotifyListener, List<URL>> values : failed.values()) {                        for (Map.Entry<NotifyListener, List<URL>> entry : values.entrySet()) {                            try {                                NotifyListener listener = entry.getKey();                                List<URL> urls = entry.getValue();                                listener.notify(urls);                                values.remove(listener);                            } catch (Throwable t) { // Ignore all the exceptions and wait for the next retry                                logger.warn("Failed to retry notify " + failed + ", waiting for again, cause: " + t.getMessage(), t);                            }                        }                    }                } catch (Throwable t) { // Ignore all the exceptions and wait for the next retry                    logger.warn("Failed to retry notify " + failed + ", waiting for again, cause: " + t.getMessage(), t);                }            }        }    }

com.alibaba.dubbo.registry.support.FailbackRegistry#failedRegistered对服务注册失败的重新注册

对服务取消注册失败的进行重新取消服务注册com.alibaba.dubbo.registry.support.FailbackRegistry#failedUnregistered,进入这个方法com.alibaba.dubbo.registry.zookeeper.ZookeeperRegistry#doUnregister,这里的操作就是删除删除zk临时节点,删除zk临时节点后其他订阅服务的服务节点会收到zk的监听器重新刷新已经生成的代理invoker对象,客户端在进行负载均衡的时候是直接路由到具体的invoker

 @Override    protected void doUnregister(URL url) {        try {            zkClient.delete(toUrlPath(url));        } catch (Throwable e) {            throw new RpcException("Failed to unregister " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);        }    }

以上就是如何用dubbo源码解析export 远程服务,小编相信有部分知识点可能是我们日常工作会见到或用到的。希望你能通过这篇文章学到更多知识。更多详情敬请关注亿速云行业资讯频道。

推荐阅读:
  1. Dubbo是什么?如何使用Dubbo?
  2. dubbo之dubbo协议使用

免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。

dubbo export

上一篇:python的13个特性分别是什么

下一篇:Django中的unittest应用是什么

相关阅读

您好,登录后才能下订单哦!

密码登录
登录注册
其他方式登录
点击 登录注册 即表示同意《亿速云用户服务条款》