ZooKeeper命令及JavaAPI操作代码分析

发布时间:2023-03-14 16:30:47 作者:iii
来源:亿速云 阅读:91

本文小编为大家详细介绍“ZooKeeper命令及JavaAPI操作代码分析”,内容详细,步骤清晰,细节处理妥当,希望这篇“ZooKeeper命令及JavaAPI操作代码分析”文章能帮助大家解决疑惑,下面跟着小编的思路慢慢深入,一起来学习新知识吧。

ZooKeeper数据模型

ZooKeeper服务端常用命令

ZooKeeper客户端命令

使用Curator API操作Zookeeper

建立连接

@Test
public void testConnect() {
    //重试策略
    ExponentialBackoffRetry retry = new ExponentialBackoffRetry(3000, 10);
    //第一种方式
    CuratorFramework client = CuratorFrameworkFactory.newClient("192.168.130.120:2181", 60 * 1000, 15 * 1000, retry);

    //第二种方式
    CuratorFramework client1 = CuratorFrameworkFactory.builder().connectString("192.168.130.120:2181")
        .sessionTimeoutMs(60 * 1000)
        .connectionTimeoutMs(15 * 1000)
        .retryPolicy(retry).namespace("hrbu").build();
    //开启连接
    client.start();
}

参数解读

sessionTimeoutMs – session timeout (会话超时时间)
connectionTimeoutMs – connection timeout (连接超时时间)
retryPolicy – retry policy to use (重试策略)

ZooKeeper命令及JavaAPI操作代码分析

会话超时时间和连接超时时间有默认值。

第二种链式编程的方式可以指定一个工作空间,在此客户端下的所有操作都会将此工作空间作为根目录。

注意

如果使用的是云服务器需要将指定端口打开

firewall-cmd --zone=public --add-port=2181/tcp --permanent 开放端口

firewall-cmd --zone=public --list-ports 查看已经开放的端口

systemctl restart firewalld 重启防火墙生效

最后别忘了在服务器的安全组里面添加端口,将2181端口打开

添加节点

@Test
public void testCreate1() throws Exception {
    //基本创建
    CreateBuilder createBuilder = client.create();
    //创建时不指定数据,会将当前客户端ip存到里面
    createBuilder.forPath("/app1");
    //指定数据
    createBuilder.forPath("/app2", "hello".getBytes());
}

@Test
public void testCreate2() throws Exception {
    CreateBuilder createBuilder = client.create();

    //设置节点类型,默认的类型是持久化
    //CreateMode是枚举类型
    createBuilder.withMode(CreateMode.EPHEMERAL).forPath("/app3");
}

@Test
public void testCreate3() throws Exception {
    CreateBuilder createBuilder = client.create();
    //创建多级节点,如果父节点不存在,则创建父节点。
    createBuilder.creatingParentContainersIfNeeded().forPath("/app4/app4_1");
}

查询节点

@Test
public void testGet() throws Exception {
    //查询数据
    byte[] bytes = client.getData().forPath("/app1");
    System.out.println(new String(bytes));

    //查询子节点
    List<String> strings = client.getChildren().forPath("/app4");
    strings.forEach(System.out::println);

    //查询节点状态信息
    Stat stat = new Stat();
    client.getData().storingStatIn(stat).forPath("/app1");
    System.out.println(stat);
}

修改节点

@Test
public void testSet() throws Exception {
    //修改数据
    client.setData().forPath("/app1","hrbu".getBytes());

    //根据版本修改
    int version  = 0;
    Stat stat = new Stat();
    client.getData().storingStatIn(stat).forPath("/app1");
    version = stat.getVersion();

    client.setData().withVersion(version).forPath("/app1", "HRBU".getBytes());
}

删除节点

@Test
public void testDelete() throws Exception {
    //删除单个节点
    client.delete().forPath("/app4/app4_1");

    //删除带有子节点的节点
    client.delete().deletingChildrenIfNeeded().forPath("/app4");

    //强制删除
    client.delete().guaranteed().forPath("/app4");

    //回调
    client.delete().guaranteed().inBackground(new BackgroundCallback() {
        @Override
        public void processResult(CuratorFramework client, CuratorEvent event) throws Exception {
            System.out.println("执行删除操作");
        }
    }).forPath("/app4");

}

Watch事件监听

NodeCache

@Test
public void testNodeCache() throws Exception {
    //NodeCache:指定一个节点注册监听器

    //创建NodeCache对象
    final NodeCache nodeCache = new NodeCache(client, "/app1");
    //注册监听
    nodeCache.getListenable().addListener(new NodeCacheListener() {
        @Override
        public void nodeChanged() throws Exception {
            System.out.println("app1节点发生变化");

            //获取修改节点后的数据
            byte[] data = nodeCache.getCurrentData().getData();
            System.out.println("变化后的节点:"+new String(data));
        }
    });
    //开启监听,如果为true,则开启则开启监听,加载缓冲数据
    nodeCache.start(true);
}

PathChildrenCache

@Test
public void testPathChildrenCache() throws Exception {
    //PathChildrenCache:监听某个节点的所有子节点
    //创建监听对象
    PathChildrenCache pathChildrenCache = new PathChildrenCache(client, "/hrbu", true);

    //绑定监听器
    pathChildrenCache.getListenable().addListener(new PathChildrenCacheListener() {
        @Override
        public void childEvent(CuratorFramework curatorFramework, PathChildrenCacheEvent pathChildrenCacheEvent) throws Exception {
            System.out.println("子节点发生变化");
            System.out.println(pathChildrenCacheEvent);
            //监听子节点的数据变更,并且得到变更后的数据

            //获取类型
            PathChildrenCacheEvent.Type type = pathChildrenCacheEvent.getType();

            //判断类型
            if (type.equals(PathChildrenCacheEvent.Type.CHILD_UPDATED)) {
                //获取数据
                byte[] data = pathChildrenCacheEvent.getData().getData();
                System.out.println(new String(data));
            }
        }
    });

    //开启
    pathChildrenCache.start();
}

TreeCache

@Test
public void testTreeCache() throws Exception {
    //创建监听器
    TreeCache treeCache = new TreeCache(client, "/");

    //注册监听
    treeCache.getListenable().addListener(new TreeCacheListener() {
        @Override
        public void childEvent(CuratorFramework curatorFramework, TreeCacheEvent treeCacheEvent) throws Exception {
            System.out.println("节点发生变化");
            System.out.println(treeCacheEvent);
        }
    });

    //开启
    treeCache.start();

}

分布式锁实现

概述

Zookeeper分布式锁原理

Curator实现分布式锁API

在Curator中有五种锁方案:

package com.hrbu.curator;

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.retry.ExponentialBackoffRetry;

import java.util.concurrent.TimeUnit;

public class Ticket12306 implements Runnable{

    private int tickets = 10;//数据库的票数

    private InterProcessMutex lock ;


    public Ticket12306(){
        //重试策略
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(3000, 10);
        CuratorFramework client = CuratorFrameworkFactory.builder()
                .connectString("8.130.32.75:2181")
                .sessionTimeoutMs(60 * 1000)
                .connectionTimeoutMs(15 * 1000)
                .retryPolicy(retryPolicy)
                .build();
        //开启连接
        client.start();
        lock = new InterProcessMutex(client,"/lock");
    }

    @Override
    public void run() {

        while(true){
            //获取锁
            try {
                lock.acquire(3, TimeUnit.SECONDS);
                if(tickets > 0){
                    System.out.println(Thread.currentThread()+":"+tickets);
                    Thread.sleep(100);
                    tickets--;
                }
            } catch (Exception e) {
                e.printStackTrace();
            }finally {
                //释放锁
                try {
                    lock.release();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    }
}
package com.hrbu.curator;
public class LockTest {
    public static void main(String[] args) {
        Ticket12306 ticket12306 = new Ticket12306();

        //创建客户端
        Thread t1 = new Thread(ticket12306,"携程");
        Thread t2 = new Thread(ticket12306,"飞猪");

        t1.start();
        t2.start();
    }
}

读到这里,这篇“ZooKeeper命令及JavaAPI操作代码分析”文章已经介绍完毕,想要掌握这篇文章的知识点还需要大家自己动手实践使用过才能领会,如果想了解更多相关内容的文章,欢迎关注亿速云行业资讯频道。

推荐阅读:
  1. zookeeper清理日志
  2. (第7篇)灵活易用易维护的hadoop数据仓库工具——Hive

免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。

zookeeper javaapi

上一篇:Spring populateBean属性赋值和自动注入的方法是什么

下一篇:python线程池ThreadPoolExecutor怎么传单个参数和多个参数

相关阅读

您好,登录后才能下订单哦!

密码登录
登录注册
其他方式登录
点击 登录注册 即表示同意《亿速云用户服务条款》