ES Study Notes: How the health API Is Implemented

Published: 2020-10-11 13:32:34 Author: sbp810050504
Source: web Views: 931

The health API reports the health of an ES cluster. It is used as follows:

curl 'http://localhost:9200/_cluster/health' 

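The core field in the health API response is status, which takes one of three values: green, yellow, or red. They correspond to three cluster states: all primary and replica shards are assigned; all primary shards are assigned but some replica shards are not; and at least one primary shard is not assigned.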

In other words, what the health API mainly cares about is the high availability of the data.

Now for the implementation of the health API:
The request is routed to the master node, which then reads the routing_table from the clusterState.

Based on the routing_table, the status of each shard of an index is determined:

    public ClusterShardHealth(int shardId, final IndexShardRoutingTable shardRoutingTable) {
        this.shardId = shardId;
        for (ShardRouting shardRouting : shardRoutingTable) {
            if (shardRouting.active()) {
                activeShards++;
                if (shardRouting.relocating()) {
                    // the shard is relocating, the one it is relocating to will be in initializing state, so we don't count it
                    relocatingShards++;
                }
                if (shardRouting.primary()) {
                    primaryActive = true;
                }
            } else if (shardRouting.initializing()) {
                initializingShards++;
            } else if (shardRouting.unassigned()) {
                unassignedShards++;
            }
        }
        if (primaryActive) {
            if (activeShards == shardRoutingTable.size()) {
                status = ClusterHealthStatus.GREEN;
            } else {
                status = ClusterHealthStatus.YELLOW;
            }
        } else {
            status = ClusterHealthStatus.RED;
        }
    }
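
To make the shard-level rule concrete, here is a minimal, self-contained Java sketch. It does not use Elasticsearch's classes: `ShardHealthSketch`, `Copy`, and `shardStatus` are hypothetical names introduced only for illustration, and the model leaves out the relocating, initializing, and unassigned counters.

```java
import java.util.Arrays;
import java.util.List;

// Simplified stand-in for the shard-level rule; not Elasticsearch's real classes.
class ShardHealthSketch {
    enum Status { GREEN, YELLOW, RED }

    // Hypothetical minimal model of one shard copy (a primary or a replica).
    static final class Copy {
        final boolean primary;
        final boolean active;

        Copy(boolean primary, boolean active) {
            this.primary = primary;
            this.active = active;
        }
    }

    // RED if the primary is not active, GREEN if every copy is active, YELLOW otherwise.
    static Status shardStatus(List<Copy> copies) {
        boolean primaryActive = false;
        int activeCopies = 0;
        for (Copy copy : copies) {
            if (copy.active) {
                activeCopies++;
                if (copy.primary) {
                    primaryActive = true;
                }
            }
        }
        if (!primaryActive) {
            return Status.RED;
        }
        return activeCopies == copies.size() ? Status.GREEN : Status.YELLOW;
    }

    public static void main(String[] args) {
        // Primary and replica both active.
        System.out.println(shardStatus(Arrays.asList(new Copy(true, true), new Copy(false, true))));
        // Primary active, replica not active.
        System.out.println(shardStatus(Arrays.asList(new Copy(true, true), new Copy(false, false))));
        // Primary not active, even though a replica is active.
        System.out.println(shardStatus(Arrays.asList(new Copy(true, false), new Copy(false, true))));
    }
}
```

Under these assumptions the three calls print GREEN, YELLOW, and RED in that order. Note that the last case is RED even though a replica copy is active: the rule only looks at whether the primary is active.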

Based on the shards, the status of the index is determined:

    public ClusterIndexHealth(IndexMetaData indexMetaData, IndexRoutingTable indexRoutingTable) {
        this.index = indexMetaData.getIndex();
        this.numberOfShards = indexMetaData.getNumberOfShards();
        this.numberOfReplicas = indexMetaData.getNumberOfReplicas();
        this.validationFailures = indexRoutingTable.validate(indexMetaData);

        for (IndexShardRoutingTable shardRoutingTable : indexRoutingTable) {
            int shardId = shardRoutingTable.shardId().id();
            shards.put(shardId, new ClusterShardHealth(shardId, shardRoutingTable));
        }

        // update the index status
        status = ClusterHealthStatus.GREEN;

        for (ClusterShardHealth shardHealth : shards.values()) {
            if (shardHealth.isPrimaryActive()) {
                activePrimaryShards++;
            }
            activeShards += shardHealth.getActiveShards();
            relocatingShards += shardHealth.getRelocatingShards();
            initializingShards += shardHealth.getInitializingShards();
            unassignedShards += shardHealth.getUnassignedShards();

            if (shardHealth.getStatus() == ClusterHealthStatus.RED) {
                status = ClusterHealthStatus.RED;
            } else if (shardHealth.getStatus() == ClusterHealthStatus.YELLOW && status != ClusterHealthStatus.RED) {
                // do not override an existing red
                status = ClusterHealthStatus.YELLOW;
            }
        }
        if (!validationFailures.isEmpty()) {
            status = ClusterHealthStatus.RED;
        } else if (shards.isEmpty()) { // might be since none has been created yet (two phase index creation)
            status = ClusterHealthStatus.RED;
        }
    }
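
The roll-up in this constructor amounts to "the worst shard status wins", with two overrides that force RED. A minimal sketch of that idea follows, assuming a simplified model: `IndexHealthSketch`, `worst`, and `indexStatus` are hypothetical names, and the validation failures are reduced to a single boolean.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Simplified stand-in for the index-level roll-up; not Elasticsearch's real classes.
class IndexHealthSketch {
    // Declared from best to worst so that ordinal() can be used for comparison.
    enum Status { GREEN, YELLOW, RED }

    // Returns whichever of the two statuses is worse.
    static Status worst(Status a, Status b) {
        return a.ordinal() >= b.ordinal() ? a : b;
    }

    // Validation failures or an index without shards force RED;
    // otherwise the index takes the worst status among its shards.
    static Status indexStatus(List<Status> shardStatuses, boolean hasValidationFailures) {
        if (hasValidationFailures || shardStatuses.isEmpty()) {
            return Status.RED;
        }
        Status status = Status.GREEN;
        for (Status shardStatus : shardStatuses) {
            status = worst(status, shardStatus);
        }
        return status;
    }

    public static void main(String[] args) {
        System.out.println(indexStatus(Arrays.asList(Status.GREEN, Status.YELLOW, Status.GREEN), false));
        System.out.println(indexStatus(Arrays.asList(Status.GREEN, Status.RED, Status.YELLOW), false));
        System.out.println(indexStatus(Collections.<Status>emptyList(), false));
    }
}
```

With these inputs the sketch prints YELLOW, RED, and RED. The last line mirrors the `shards.isEmpty()` branch above: an index whose shards have not been created yet is reported as RED.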

Based on the indices, the status of the cluster is determined:

    public ClusterStateHealth(ClusterState clusterState, String[] concreteIndices) {
        RoutingTableValidation validation = clusterState.routingTable().validate(clusterState.metaData());
        validationFailures = validation.failures();
        numberOfNodes = clusterState.nodes().size();
        numberOfDataNodes = clusterState.nodes().dataNodes().size();

        for (String index : concreteIndices) {
            IndexRoutingTable indexRoutingTable = clusterState.routingTable().index(index);
            IndexMetaData indexMetaData = clusterState.metaData().index(index);
            if (indexRoutingTable == null) {
                continue;
            }

            ClusterIndexHealth indexHealth = new ClusterIndexHealth(indexMetaData, indexRoutingTable);

            indices.put(indexHealth.getIndex(), indexHealth);
        }

        status = ClusterHealthStatus.GREEN;

        for (ClusterIndexHealth indexHealth : indices.values()) {
            activePrimaryShards += indexHealth.getActivePrimaryShards();
            activeShards += indexHealth.getActiveShards();
            relocatingShards += indexHealth.getRelocatingShards();
            initializingShards += indexHealth.getInitializingShards();
            unassignedShards += indexHealth.getUnassignedShards();
            if (indexHealth.getStatus() == ClusterHealthStatus.RED) {
                status = ClusterHealthStatus.RED;
            } else if (indexHealth.getStatus() == ClusterHealthStatus.YELLOW && status != ClusterHealthStatus.RED) {
                status = ClusterHealthStatus.YELLOW;
            }
        }

        if (!validationFailures.isEmpty()) {
            status = ClusterHealthStatus.RED;
        } else if (clusterState.blocks().hasGlobalBlock(RestStatus.SERVICE_UNAVAILABLE)) {
            status = ClusterHealthStatus.RED;
        }

        // shortcut on green
        if (status.equals(ClusterHealthStatus.GREEN)) {
            this.activeShardsPercent = 100;
        } else {
            List<ShardRouting> shardRoutings = clusterState.getRoutingTable().allShards();
            int activeShardCount = 0;
            int totalShardCount = 0;
            for (ShardRouting shardRouting : shardRoutings) {
                if (shardRouting.active()) activeShardCount++;
                totalShardCount++;
            }
            this.activeShardsPercent = (((double) activeShardCount) / totalShardCount) * 100;
        }
    }
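
The last part of this constructor computes `activeShardsPercent`. The arithmetic can be sketched in isolation as below; `ActiveShardsPercentSketch` and its method are hypothetical names, and the cluster status is reduced to a boolean `green` flag for illustration.

```java
// Simplified stand-in for the percentage computation; not Elasticsearch's real classes.
class ActiveShardsPercentSketch {
    // A green cluster short-circuits to 100; otherwise the result is
    // the share of active shard copies among all shard copies, in percent.
    static double activeShardsPercent(boolean green, int activeShardCount, int totalShardCount) {
        if (green) {
            return 100;
        }
        // The cast to double avoids integer division, which would truncate to 0.
        return (((double) activeShardCount) / totalShardCount) * 100;
    }

    public static void main(String[] args) {
        // 3 of 4 shard copies are active in a non-green cluster.
        System.out.println(activeShardsPercent(false, 3, 4));
        // Green cluster: the counts are not consulted.
        System.out.println(activeShardsPercent(true, 4, 4));
    }
}
```

The sketch prints 75.0 and 100.0. One detail worth noticing in the formula as written: if the status is not green and the total shard count is 0, the floating-point division `0.0 / 0` evaluates to NaN in Java rather than throwing an exception.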

To understand the health API, you need to understand clusterState. Fortunately, all of this is read-only information, so it is not hard to follow.

