网站建设单页/seo软件推荐
Java学习+面试指南:https://javaxiaobear.cn
ZooKeeper应用的开发主要通过Java客户端API去连接和操作ZooKeeper集群。可供选择的Java客户端API有:
-
ZooKeeper官方的Java客户端API。
-
第三方的Java客户端API,比如Curator。
ZooKeeper官方的客户端API提供了基本的操作。例如,创建会话、创建节点、读取节点、更新数据、删除节点和检查节点是否存在等。不过,对于实际开发来说,ZooKeeper官方API有一些不足之处,具体如下:
-
ZooKeeper的Watcher监测是一次性的,每次触发之后都需要重新进行注册。
-
会话超时之后没有实现重连机制。
-
异常处理烦琐,ZooKeeper提供了很多异常,对于开发人员来说可能根本不知道应该如何处理这些抛出的异常。
-
仅提供了简单的byte[]数组类型的接口,没有提供Java POJO级别的序列化数据处理接口。
-
创建节点时如果抛出异常,需要自行检查节点是否存在。
-
无法实现级联删除。
总之,ZooKeeper官方API功能比较简单,在实际开发过程中比较笨重,一般不推荐使用。
1、Zookeeper 原生Java客户端使用
1、搭建服务
1、创建一个maven项目,引入zookeeper client依赖
<!-- zookeeper client -->
<dependency><groupId>org.apache.zookeeper</groupId><artifactId>zookeeper</artifactId><version>3.8.3</version>
</dependency>
注意:保持与服务端版本一致,不然会有很多兼容性的问题
2、学习API
ZooKeeper原生客户端主要使用org.apache.zookeeper.ZooKeeper这个类来使用ZooKeeper服务。
ZooKeeper常用构造器
ZooKeeper (connectString, sessionTimeout, watcher)
-
connectString:使用逗号分隔的列表,每个ZooKeeper节点是一个host.port对,host 是机器名或者IP地址,port是ZooKeeper节点对客户端提供服务的端口号。客户端会任意选取connectString 中的一个节点建立连接。
-
sessionTimeout : session timeout时间。
-
watcher:用于接收到来自ZooKeeper集群的事件。
使用 zookeeper 原生 API,连接zookeeper集群
3、代码连接客户端
public class ZkClientDemo {private static final String CONNECT_STR = "localhost:2181";/*** 连接地址*/private final static String CLUSTER_CONNECT_STR="ip:2181";public static void main(String[] args) throws Exception {final CountDownLatch countDownLatch=new CountDownLatch(1);ZooKeeper zooKeeper = new ZooKeeper(CLUSTER_CONNECT_STR, 4000, watchedEvent -> {if(Watcher.Event.KeeperState.SyncConnected == watchedEvent.getState()){//如果收到了服务端的响应事件,连接成功countDownLatch.countDown();System.out.println("连接建立");}});System.out.println("连接中");countDownLatch.await();//CONNECTEDSystem.out.println(zooKeeper.getState());Stat stat = zooKeeper.exists("/zk",false);if(null ==stat){//创建持久节点zooKeeper.create("/zk","javaxiaobear".getBytes(),ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);}//持久监听zooKeeper.addWatch("/zk",new Watcher() {@Overridepublic void process(WatchedEvent event) {System.out.println(event);}},AddWatchMode.PERSISTENT);Thread.sleep(Integer.MAX_VALUE);}
}
2、Zookeeper主要方法
-
create(path, data, acl,createMode): 创建一个给定路径的 znode,并在 znode 保存 data[]的 数据,createMode指定 znode 的类型。
-
delete(path, version):如果给定 path 上的 znode 的版本和给定的 version 匹配, 删除 znode。
-
exists(path, watch):判断给定 path 上的 znode 是否存在,并在 znode 设置一个 watch。
-
getData(path, watch):返回给定 path 上的 znode 数据,并在 znode 设置一个 watch。
-
setData(path, data, version):如果给定 path 上的 znode 的版本和给定的 version 匹配,设置 znode 数据。
-
getChildren(path, watch):返回给定 path 上的 znode 的孩子 znode 名字,并在 znode 设置一个 watch。
-
sync(path):把客户端 session 连接节点和 leader 节点进行同步。
方法特点:
-
所有获取 znode 数据的 API 都可以设置一个 watch 用来监控 znode 的变化。
-
所有更新 znode 数据的 API 都有两个版本: 无条件更新版本和条件更新版本。如果 version 为 -1,更新为无条件更新。否则只有给定的 version 和 znode 当前的 version 一样,才会进行更新,这样的更新是条件更新。
-
所有的方法都有同步和异步两个版本。同步版本的方法发送请求给 ZooKeeper 并等待服务器的响 应。异步版本把请求放入客户端的请求队列,然后马上返回。异步版本通过 callback 来接受来 自服务端的响应。
1、连接Linux服务器
ZooKeeper 类通过其构造函数提供连接功能。构造函数的签名如下:
ZooKeeper(String connectionString, int sessionTimeout, Watcher watcher)
- connectionString:ZooKeeper集群主机,可以为集群,也可为单个。
- sessionTimeout:会话超时(以毫秒为单位)。
- watcher:一个实现“Watcher”接口的对象。ZooKeeper集群通过Watcher对象返回连接状态。
- 让我们创建ZooKeeperConnection类并添加一个方法connect。该连接方法创建一个ZooKeeper对象,所连接到的ZooKeeper集群,然后返回该对象。在这里,CountDownLatch用于停止(等待)主进程,直到客户端与ZooKeeper集群连接为止。 ZooKeeper集群通过Watcher回调返回连接状态。客户端与ZooKeeper集群连接后,将调用Watcher回调,并且Watcher回调调用CountDownLatch的countDown方法释放锁,并在主进程中等待。
代码实现如下:
public class ZkClientDemo {private static final String CONNECT_STR = "localhost:2181";/*** 连接地址*/private final static String CLUSTER_CONNECT_STR="ip:2181";public static void main(String[] args) throws Exception {final CountDownLatch countDownLatch=new CountDownLatch(1);ZooKeeper zooKeeper = new ZooKeeper(CLUSTER_CONNECT_STR, 4000, watchedEvent -> {System.out.println(watchedEvent);if(Watcher.Event.KeeperState.SyncConnected == watchedEvent.getState()){//如果收到了服务端的响应事件,连接成功countDownLatch.countDown();System.out.println("连接建立");}});System.out.println("连接中");countDownLatch.await();//CONNECTEDSystem.out.println(zooKeeper.getState());}
}
**注意:**如果连接,请检查一下是否开启了防火墙,服务器2181端口是否放开了
2、创建节点
create基础的方法用法如下
create(String path, byte[] data, List<ACL> acl, CreateMode createMode)
- path:Znode路径。例如:/javaxiaobear,/javaxiaobear/good
- data:数据存储在指定的Znode点路径
- acl:要创建的节点的访问控制列表。ZooKeeper API提供了一个静态接口ZooDefs.Ids,以获取一些基本的ACL列表。例如,ZooDefs.Ids.OPEN_ACL_UNSAFE返回打开的znode的acl列表。
- createMode:节点的类型,临时的,序列的或两者兼而有之。这是一个枚举。
代码实现如下:
/*** 创建节点* @param parentPath 父节点路径* @param nodeName 节点名称* @param data 数据*/
public void createNode(String parentPath, String nodeName, String data) throws Exception{zooKeeper.create(parentPath + "/" + nodeName ,data.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}
3、判断节点是否存在
ZooKeeper类提供了exists方法来检查znode的存在。如果指定的znode存在,它将返回znode的元数据。exists方法的用法如下:
exists(String path, boolean watcher)
参数:
- path:节点路径
- watcher:布尔值,指定是否要观看特定Z序节点或不看
代码实现如下:
/*** 判断某个节点是否存在* @param path 节点路径* @throws Exception 异常*/
public Stat existsNode(String path) throws Exception{return zooKeeper.exists(path, true);
}
4、获取节点数据
ZooKeeper类提供了getData方法来获取附加到指定znode中的数据及其状态。getData方法的用法如下:
getData(String path, Watcher watcher, Stat stat)
参数:
- path:Znode路径。
- watcher:类型为Watcher的回调函数。当指定的znode的数据发生更改时,ZooKeeper集群将通过Watcher回调进行通知。这是一次性通知。
- stat:返回Znode点的元数据。
代码实现如下:
/*** 获取节点数据* @param path 路径* @return* @throws Exception*/
public String getData(String path) throws Exception{byte[] data = zooKeeper.getData(path, false, null);return new String(data);
}
5、修改节点
ZooKeeper类提供setData方法来修改附加到指定znode中的数据。setData方法的用法如下:
setData(String path, byte[] data, int version)
参数:
- path:Znode路径
- data:数据存储在指定的Znode点的路径。
- version:znode的当前版本。每当更改数据时,ZooKeeper都会更新znode的版本号。
代码实现如下:
/*** 设置节点数据* @param path 路径* @param data 数据* @throws Exception*/
public void setData(String path, String data) throws Exception{zooKeeper.setData(path, data.getBytes(), zooKeeper.exists(path, true).getVersion());
}
6、获取子节点数据
ZooKeeper类提供了getChildren方法来获取特定znode的所有子节点。getChildren方法的用法如下:
getChildren(String path, Watcher watcher)
参数:
- path:Znode路径。
- watcher:类型为Watcher的回调函数。当指定的znode的数据发生更改时,ZooKeeper集群将通过Watcher回调进行通知。这是一次性通知。
代码实现如下:
/*** 获取路径下的子节点* @param path 路径* @throws Exception*/public void getChildren(String path) throws Exception{//首先判断节点是否存在Stat exists = zooKeeper.exists(path, true);if (null != exists){List<String> children = zooKeeper.getChildren(path, false);for (String child : children) {System.out.println(child);}}}
7、删除节点
ZooKeeper类提供了delete方法来删除指定的znode。delete方法的用法如下:
delete(String path, int version)
参数:
- path:Znode路径。
- version:znode的当前版本。
代码实现如下:
/*** 删除节点* @param path* @throws Exception*/
public void deleteNode(String path) throws Exception{zooKeeper.delete(path, -1);
}
8、监听节点
/*** 监听节点* @param path 路径* @throws Exception*/
public void addWatch(String path) throws Exception{zooKeeper.addWatch(path, new Watcher() {@Overridepublic void process(WatchedEvent watchedEvent) {System.out.println(watchedEvent.getState());}}, AddWatchMode.PERSISTENT);
}
9、全部代码实现
public class ZkBaseOperations {private static final String ZOOKEEPER_ADDRESS = "ip:2181";private static final int SESSION_TIMEOUT = 3000;private ZooKeeper zooKeeper;public ZkBaseOperations() {try {CountDownLatch connectionLatch = new CountDownLatch(1);zooKeeper = new ZooKeeper(ZOOKEEPER_ADDRESS, SESSION_TIMEOUT, event -> {if (event.getState() == Watcher.Event.KeeperState.SyncConnected) {connectionLatch.countDown();System.out.println("连接成功!");}});} catch (Exception e) {e.printStackTrace();}}/*** 关闭连接*/public void close() {if (zooKeeper != null) {try {zooKeeper.close();} catch (InterruptedException e) {e.printStackTrace();}}}/*** 创建节点* @param parentPath 父节点路径* @param nodeName 节点名称* @param data 数据*/public void createNode(String parentPath, String nodeName, String data) throws Exception{zooKeeper.create(parentPath + "/" + nodeName ,data.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);}/*** 判断某个节点是否存在* @param path 节点路径* @throws Exception 异常*/public Stat existsNode(String path) throws Exception{return zooKeeper.exists(path, true);}/*** 获取节点数据* @param path 路径* @return* @throws Exception*/public String getData(String path) throws Exception{byte[] data = zooKeeper.getData(path, false, null);return new String(data);}/*** 设置节点数据* @param path 路径* @param data 数据* @throws Exception*/public void setData(String path, String data) throws Exception{zooKeeper.setData(path, data.getBytes(), zooKeeper.exists(path, true).getVersion());}/*** 获取路径下的子节点* @param path 路径* @throws Exception*/public void getChildren(String path) throws Exception{//首先判断节点是否存在Stat exists = zooKeeper.exists(path, true);if (null != exists){List<String> children = zooKeeper.getChildren(path, false);for (String child : children) {System.out.println(child);}}}/*** 删除节点* @param path* @throws Exception*/public void deleteNode(String path) throws Exception{zooKeeper.delete(path, -1);}public static void main(String[] args) {ZkBaseOperations zookeeper = new ZkBaseOperations();try {//创建节点zookeeper.createNode("/javaxiaobear", "666","javaxiaobear is a good website");//判断阶段是否存在Stat stat = zookeeper.existsNode("/javaxiaobear/666");if(stat != null) {System.out.println("javaxiaobear Node exists and the node version is " +stat.getVersion());} else {System.out.println("Node does not exists");}//获取节点数据String data = zookeeper.getData("/javaxiaobear/666");System.out.println("获取节点的数据为:" + data);//设置节点数据zookeeper.setData("/javaxiaobear", "666");//获取子节点zookeeper.getChildren("/javaxiaobear");//删除节点zookeeper.deleteNode("/javaxiaobear");zookeeper.close();} catch (Exception e) {throw new RuntimeException(e);}}
}
2、zkClient开源客户端
ZkClient是Github上⼀个开源的zookeeper客户端,在Zookeeper原⽣API接⼝之上进⾏了包装,是⼀个更易⽤的Zookeeper客户端,同时,zkClient在内部还实现了诸如Session超时重连、Watcher反复注册等功能。
1、如何使用
**添加依赖:**在pom.xml⽂件中添加如下内容
<dependency><groupId>com.101tec</groupId><artifactId>zkclient</artifactId><version>0.11</version>
</dependency>
2、创建会话
使⽤ZkClient可以轻松的创建会话,连接到服务端。
public class CreateSession {/*** 连接地址*/private final static String CLUSTER_CONNECT_STR="ip:2181";public static void main(String[] args) {ZkClient zkClient = new ZkClient(CLUSTER_CONNECT_STR);System.out.println("zkClient 连接成功");}
}
运⾏结果:zkClient 连接成功。结果表明已经成功创建会话。
3、创建节点
ZkClient提供了递归创建节点的接⼝,即其帮助开发者先完成⽗节点的创建,再创建⼦节点
public class Create_Node_Sample {public static void main(String[] args) {ZkClient zkClient = new ZkClient("ip:2181");//createParents的值设置为true,可以递归创建节点zkClient.createPersistent("/javaxiaobear",true);System.out.println("创建节点成功");}
}
运⾏结果:success create znode.
结果表明已经成功创建了节点,值得注意的是,在原⽣态接⼝中是⽆法创建成功的(⽗节点不存在),但是通过ZkClient通过设置createParents参数为true可以递归的先创建⽗节点,再创建⼦节点
4、删除节点
ZkClient提供了递归删除节点的接⼝,即其帮助开发者先删除所有⼦节点(存在),再删除⽗节点。
public class DeleteNode {public static void main(String[] args) {ZkClient zkClient = new ZkClient("ip:2181");zkClient.deleteRecursive("/javaxiaobear");System.out.println("删除节点成功");}
}
运⾏结果: 删除节点成功
结果表明ZkClient可直接删除带⼦节点的⽗节点,因为其底层先删除其所有⼦节点,然后再删除⽗节点
5、获取⼦节点
public class Get_Children_Sample {public static void main(String[] args) throws Exception {ZkClient zkClient = new ZkClient("127.0.0.1:2181", 5000);List<String> children = zkClient.getChildren("/javaxiaobear");System.out.println(children);//注册监听事件zkClient.subscribeChildChanges(path, new IZkChildListener() {public void handleChildChange(String parentPath, List<String>currentChilds) throws Exception {System.out.println(parentPath + " 's child changed,currentChilds:" + currentChilds);}});zkClient.createPersistent("/javaxiaobear");Thread.sleep(1000);zkClient.createPersistent("/javaxiaobear/c1");Thread.sleep(1000);zkClient.delete("/javaxiaobear/c1");Thread.sleep(1000);zkClient.delete(path);Thread.sleep(Integer.MAX_VALUE);}
}
运⾏结果:
/zk-book 's child changed, currentChilds:[]
/zk-book 's child changed, currentChilds:[c1]
/zk-book 's child changed, currentChilds:[]
/zk-book 's child changed, currentChilds:null
结果表明:客户端可以对⼀个不存在的节点进⾏⼦节点变更的监听。⼀旦客户端对⼀个节点注册了⼦节点列表变更监听之后,那么当该节点的⼦节点列表发⽣变更时,服务端都会通知客户端,并将最新的⼦节点列表发送给客户端,该节点本身的创建或删除也会通知到客户端。
6、获取数据(节点是否存在、更新、删除)
public class Get_Data_Sample {public static void main(String[] args) throws InterruptedException {String path = "/javaxiaobear-zk";ZkClient zkClient = new ZkClient("127.0.0.1:2181");//判断节点是否存在boolean exists = zkClient.exists(path);if (!exists){zkClient.createEphemeral(path, "123");}//注册监听zkClient.subscribeDataChanges(path, new IZkDataListener() {public void handleDataChange(String path, Object data) throwsException {System.out.println(path+"该节点内容被更新,更新后的内容"+data);}public void handleDataDeleted(String s) throws Exception {System.out.println(s+" 该节点被删除");}});//获取节点内容Object o = zkClient.readData(path);System.out.println(o);//更新zkClient.writeData(path,"4567");Thread.sleep(1000);//删除zkClient.delete(path);Thread.sleep(1000);}
}
运⾏结果:
123
/javaxiaobear-zk该节点内容被更新,更新后的内容4567
/javaxiaobear-zk 该节点被删除
3、Curator开源客户端使用
Curator是Netflix公司开源的一套ZooKeeper客户端框架,和ZkClient一样它解决了非常底层的细节开发工作,包括连接、重连、反复注册Watcher的问题以及NodeExistsException异常等。
Curator是Apache基金会的顶级项目之一,Curator具有更加完善的文档,另外还提供了一套易用性和可读性更强的Fluent风格的客户端API框架。
Curator还为ZooKeeper客户端框架提供了一些比较普遍的、开箱即用的、分布式开发用的解决方案,例如Recipe、共享锁服务、Master选举机制和分布式计算器等,帮助开发者避免了“重复造轮子”的无效开发工作。
Guava is to Java that Curator to ZooKeeper
在实际的开发场景中,使用Curator客户端就足以应付日常的ZooKeeper集群操作的需求。
官网:https://curator.apache.org/
1、如何使用
引入依赖
Curator 包含了几个包:
-
curator-framework是对ZooKeeper的底层API的一些封装。
-
curator-client提供了一些客户端的操作,例如重试策略等。
-
curator-recipes封装了一些高级特性,如:Cache事件监听、选举、分布式锁、分布式计数器、分布式Barrier等。
<!-- zookeeper client -->
<dependency><groupId>org.apache.zookeeper</groupId><artifactId>zookeeper</artifactId><version>3.8.0</version>
</dependency><!--curator-->
<dependency><groupId>org.apache.curator</groupId><artifactId>curator-recipes</artifactId><version>5.1.0</version><exclusions><exclusion><groupId>org.apache.zookeeper</groupId><artifactId>zookeeper</artifactId></exclusion></exclusions>
</dependency>
2、常用API
1、创建一个客户端实例
在使用curator-framework包操作ZooKeeper前,首先要创建一个客户端实例。这是一个CuratorFramework类型的对象,有两种方法:
-
使用工厂类CuratorFrameworkFactory的静态newClient()方法。
public static CuratorFramework newClient(String connectString,RetryPolicy retryPolicy) public static CuratorFramework newClient(String connectString, int sessionTimeoutMs, int connectionTimeoutMs, RetryPolicy retryPolicy)
- connectString:指的是需要连接的zookeeperAddress ip
- retryPolicy:指的是连接zk时使用哪种重试策略
- sessionTimeoutMs:指的是会话超时时间
- connectionTimeoutMs:指的是连接超时时间
代码实现如下:
public class CuratorDemo {private final static String CLUSTER_CONNECT_STR="ip:2181";public static void main(String[] args) throws Exception {// 重试策略RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3); // //创建客户端实例CuratorFramework client = CuratorFrameworkFactory.newClient(CLUSTER_CONNECT_STR, retryPolicy);//启动客户端client.start();System.out.println("zookeeper初始化连接成功:" + client);//关闭连接client.close();} }
-
使用工厂类CuratorFrameworkFactory的静态builder构造者方法。
public class CuratorDemo {private final static String CLUSTER_CONNECT_STR="ip:2181";public static void main(String[] args) throws Exception {//重试策略RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);CuratorFramework client = CuratorFrameworkFactory.builder().connectString(CLUSTER_CONNECT_STR).sessionTimeoutMs(5000) // 会话超时时间.connectionTimeoutMs(5000) // 连接超时时间.retryPolicy(retryPolicy).namespace("base") // 包含隔离名称.build();//启动客户端client.start();System.out.println("zookeeper初始化连接成功:" + client);//关闭连接client.close();} }
-
connectionString:服务器地址列表,在指定服务器地址列表的时候可以是一个地址,也可以是多个地址。如果是多个地址,那么每个服务器地址列表用逗号分隔, 如 host1:port1,host2:port2,host3;port3 。
-
retryPolicy:重试策略,当客户端异常退出或者与服务端失去连接的时候,可以通过设置客户端重新连接 ZooKeeper 服务端。而 Curator 提供了 一次重试、多次重试等不同种类的实现方式。在 Curator 内部,可以通过判断服务器返回的 keeperException 的状态代码来判断是否进行重试处理,如果返回的是 OK 表示一切操作都没有问题,而 SYSTEMERROR 表示系统或服务端错误。
策略名称 描述 ExponentialBackoffRetry 重试一组次数,重试之间的睡眠时间增加 RetryNTimes 重试最大次数 RetryOneTime 只重试一次 RetryUntilElapsed 在给定的时间结束之前重试 - 超时时间:Curator 客户端创建过程中,有两个超时时间的设置。一个是 sessionTimeoutMs 会话超时时间,用来设置该条会话在 ZooKeeper 服务端的失效时间。另一个是 connectionTimeoutMs 客户端创建会话的超时时间,用来限制客户端发起一个会话连接到接收 ZooKeeper 服务端应答的时间。sessionTimeoutMs 作用在服务端,而 connectionTimeoutMs 作用在客户端。
-
2、创建节点
主要方法有以下:
public T forPath(String path) //创建节点,并赋值内容public T forPath(String path, byte[] data)//判断节点是否存在,节点存在了,创建时仍然会报错public ExistsBuilder checkExists()
代码实现如下:
/*** 创建节点* @param nodePath 路径* @param data 数据*/
public void createNode(String nodePath, String data) throws Exception{if (StringUtils.isEmpty(nodePath)) {System.out.println("节点【" + nodePath + "】不能为空");}//1、对节点是否存在进行判断,否则会报错:【NodeExistsException: KeeperErrorCode = NodeExists for /root】Stat exists = client.checkExists().forPath(nodePath);if (null != exists) {System.out.println("节点【" + nodePath + "】已存在,不能新增");}//2、创建节点 永久节点client.create().forPath(nodePath);// 2.1 手动指定节点的类型client.create().withMode(CreateMode.PERSISTENT).forPath(nodePath);//2.2 如果父节点不存在可创建当前的父节点String node = client.create().creatingParentsIfNeeded().forPath(nodePath);System.out.println(node);//创建节点,并为当前节点赋值内容if (!StringUtils.isBlank(data)) {//2.3、创建永久节点,并为当前节点赋值内容client.create().forPath(nodePath, data.getBytes());//2.4、创建永久有序节点client.create().withMode(CreateMode.PERSISTENT_SEQUENTIAL).forPath(nodePath, data.getBytes());//2.5、创建临时节点client.create().withMode(CreateMode.EPHEMERAL).forPath(nodePath, data.getBytes());}//2.6、创建临时有序节点client.create().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(nodePath, data.getBytes());
}
在 Curator 中,可以使用 create 函数创建数据节点,并通过 withMode 函数指定节点类型(持久化节点,临时节点,顺序节点,临时顺序节点,持久化顺序节点等),默认是持久化节点,之后调用 forPath 函数来指定节点的路径和数据信息。
3、获取节点数据
主要的方法如下:
//获取某个节点数据
client.getData().forPath(nodePath)
//读取zookeeper的数据,并放到Stat中
client.getData().storingStatIn(stat1).forPath(nodePath)
参数说明:
path:指定要读取的节点
stat:指定数据节点的节点状态信息
4、设置修改节点
我们通过客户端实例的 setData() 方法更新 ZooKeeper 服务上的数据节点,在setData 方法的后边,通过 forPath 函数来指定更新的数据节点路径以及要更新的数据。
/*** 设置修改节点数据* @param nodePath 节点路径* @param data 数据*/
public void setData(String nodePath, String data) throws Exception{//更新节点Stat stat = client.setData().forPath(nodePath, data.getBytes());// 指定版本号,更新节点,更新的时候如果指定数据版本的话,那么需要和zookeeper中当前数据的版本要一致,-1表示匹配任何版本Stat stat1 = client.setData().withVersion(-1).forPath(nodePath, data.getBytes());//异步设置某个节点数据Stat stat2 = client.setData().inBackground().forPath(nodePath, data.getBytes());System.out.println(stat1.toString());
}
5、获取某个节点的子节点
/*** 获取某个节点的子节点* @param nodePath* @throws Exception*/
public void getChildren(String nodePath) throws Exception{List<String> list = client.getChildren().forPath(nodePath);for (String s : list) {System.out.println(s);}
}
6、删除节点
/*** 删除节点* @param nodePath 节点路径* @throws Exception*/
public void delete(String nodePath) throws Exception{client.delete().forPath(nodePath);//删除节点,即使出现网络故障,zookeeper也可以保证删除该节点client.delete().guaranteed().forPath(nodePath);//级联删除节点(如果当前节点有子节点,子节点也可以一同删除)client.delete().deletingChildrenIfNeeded().forPath(nodePath);
}
- guaranteed:该函数的功能如字面意思一样,主要起到一个保障删除成功的作用,其底层工作方式是:只要该客户端的会话有效,就会在后台持续发起删除请求,直到该数据节点在 ZooKeeper 服务端被删除。
- deletingChildrenIfNeeded:指定了该函数后,系统在删除该数据节点的时候会以递归的方式直接删除其子节点,以及子节点的子节点。
3、功能接口实现
1、异步接口
Curator 引入了BackgroundCallback 接口,用来处理服务器端返回来的信息,这个处理过程是在异步线程中调用,默认在 EventThread 中调用,也可以自定义线程池。
public interface BackgroundCallback
{/*** Called when the async background operation completes** @param client the client* @param event operation result details* @throws Exception errors*/public void processResult(CuratorFramework client, CuratorEvent event) throws Exception;
}
如上接口,主要参数为 client 客户端, 和 服务端事件 event ,inBackground 异步处理默认在EventThread中执行
制定线程池异步处理
/*** 指定线程池 获取节点数据* @param nodePath* @throws Exception*/
public void testExecutorService(String nodePath) throws Exception{ExecutorService executorService = Executors.newSingleThreadExecutor();byte[] data = client.getData().inBackground((o1, o2) -> {}, executorService).forPath(nodePath);System.out.println(new String(data));
}
2、Curator 监听器
API方法如下:
/*** Receives notifications about errors and background events*/
public interface CuratorListener
{/*** Called when a background task has completed or a watch has triggered** @param client client* @param event the event* @throws Exception any errors*/public void eventReceived(CuratorFramework client, CuratorEvent event) throws Exception;
}
针对 background 通知和错误通知。使用此监听器之后,调用inBackground 方法会异步获得监听
Curator Caches
Curator 引入了 Cache 来实现对 Zookeeper 服务端事件监听,Cache 事件监听可以理解为一个本地缓存视图与远程 Zookeeper 视图的对比过程。Cache 提供了反复注册的功能。Cache 分为两类注册类型:节点监听和子节点监听。
node cache:
NodeCache 对某一个节点进行监听
public NodeCache(CuratorFramework client, String path);
可以通过注册监听器来实现,对当前节点数据变化的处理
public void addListener(NodeCacheListener listener)
代码实现:
public class CuratorNodeCacheTest {private static final String ZOOKEEPER_ADDRESS = "ip:2181";private static final int SESSION_TIMEOUT = 3000;public CuratorFramework client;public CuratorNodeCacheTest() {// 重试策略RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);client = CuratorFrameworkFactory.newClient(ZOOKEEPER_ADDRESS, retryPolicy);client.start();}/**** @param nodePath 路径*/public void nodeCache(String nodePath) throws Exception {NodeCache nodeCache = new NodeCache(client, nodePath);//监听当前节点路径nodeCache.getListenable().addListener(new NodeCacheListener() {@Overridepublic void nodeChanged() throws Exception {//查询节点数据byte[] data = client.getData().forPath(nodePath);System.out.println(new String(data));}});nodeCache.start();}
}
path cache:
PathChildrenCache 会对子节点进行监听,但是不会对二级子节点进行监听,
public PathChildrenCache(CuratorFramework client,String path,boolean cacheData)
可以通过注册监听器来实现,对当前节点的子节点数据变化的处理
public void addListener(PathChildrenCacheListener listener)
/*** * @param nodePath 节点路径* @throws Exception*/
public void pathCache(String nodePath) throws Exception{PathChildrenCache pathChildrenCache = new PathChildrenCache(client, nodePath, true);pathChildrenCache.getListenable().addListener(new PathChildrenCacheListener() {@Overridepublic void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {System.out.println(event);}});// 如果设置为true则在首次启动时就会缓存节点内容到Cache中pathChildrenCache.start(true);
}
tree cache:
TreeCache 使用一个内部类TreeNode来维护这个一个树结构。并将这个树结构与ZK节点进行了映射。所以TreeCache 可以监听当前节点下所有节点的事件。
public TreeCache(CuratorFramework client, String path, boolean cacheData)
可以通过注册监听器来实现,对当前节点的子节点,及递归子节点数据变化的处理
public void addListener(TreeCacheListener listener)
public void treeCache(String nodePath) throws Exception{TreeCache treeCache = new TreeCache(client, nodePath);treeCache.getListenable().addListener(new TreeCacheListener() {@Overridepublic void childEvent(CuratorFramework client, TreeCacheEvent event) throws Exception {System.out.println(event);}});treeCache.start();
}
相关文章:

Zookeeper整合Java实战,不同客户端使用汇总
Java学习面试指南:https://javaxiaobear.cn ZooKeeper应用的开发主要通过Java客户端API去连接和操作ZooKeeper集群。可供选择的Java客户端API有: ZooKeeper官方的Java客户端API。 第三方的Java客户端API,比如Curator。 ZooKeeper官方的客户…...

【python】Ubuntu下安装spyder及matplotlib中文显示
一、查看Ubuntu版本 $ lsb_release -a No LSB modules are available. Distributor ID: Ubuntu Description: Ubuntu 22.04.3 LTS Release: 22.04 Codename: jammy尝试用cat /etc/debian_version命令,竟然可以显示出来Debian的版本。 $ cat /etc/debian_version …...

《运维人员的未来:IT界的“万金油“如何继续闪耀光芒》
文章目录 每日一句正能量前言35岁被称为运维半衰期,究竟为何?如何顺利过渡半衰期运维的职业发展路径后记 每日一句正能量 凡事顺其自然,遇事处于泰然,得意之时淡然,失意之时坦然,艰辛曲折必然,历…...

ip addr和ifconfig
ip addr可以显示更多信息,包括为启动的网络驱动如wlan,而ifocnfig只显示在线的驱动。若wlan是down的,则ip addr会显示信息,ifconfig不会显示信息。 ip addr: ifconfig:...

Crow:Middlewares 庖丁解牛7 after_handlers_call_helper
Crow:Middlewares 庖丁解牛6 middleware_call_helper-CSDN博客 介绍了对插件before_handle的调用 当完成了detail::middleware_call_helper的调用后,如果没有在before_handle中设置req被终止处理,也就是 if (!res.completed_) {need_to_call_after_handlers_ = true;handler…...

ts相关笔记(extends、infer、Pick、Omit)
最近刷了本ts小册,对一些知识点做下笔记。 extends extends 是一个关键字,用于对类型参数做一些约束。 A extends B 意味着 A 是 B 的子类型,比如下面是成立的 ‘abc’ extends string599 extends number 看下面例子: type …...

8.21 PowerBI系列之DAX函数专题-帕累托分析
需求 实现 1 按商品小类累积 var rollup_sales calculate(//计算当前累计销售额 [销售额], filter(allselected(order_2[产品小类]),sum(order_2[订单金额])<[销售额]) ) //按小类累积金额,filter内的销售额为选中的各小类的销售额 //金额从大到小累积,用&l…...

结构体-2-测试排名
22-结构体-2-测试排名 [命题人 : 外部导入] 时间限制 : 1.000 sec 内存限制 : 128 MB 题目描述 为了提升同学们的编程能力,老师们会在平时进行C语言的上机测试,了解班上同学的学习情况,对于一些测试成绩较差的同学,老师会进行督促…...

LeetCode刷题---快乐数
解题思路 该题的解题思路为使用哈希表来存储每次平方的和的结果,看是否有重复的数,如果存在第n次的平方和的数和第i次(i<n)平方和的数想等,那么它就不是一个快乐数。否则,则为快乐数。 代码实现: public boolean i…...

web前端游戏项目-辨色大比拼【附源码】
web前端游戏项目-辨色大比拼【附源码】 《辨色大比拼》是一个旨在测试和提升玩家颜色识别能力的在线游戏。在游戏中,玩家将通过辨识颜色来解谜并推进游戏进程。辨色大比拼也是一个寓教于乐的游戏,它不仅提供了一个有趣的辨色挑战,还能帮助玩…...

MongoDB操作_数据库_集合
.......................................................................................................................................................... 三、MongoDB操作 3.1 数据库操作 一个mongodb中可以建立多个数据库。 MongoDB的默认数据库为"test…...

50个免费的 AI 工具,提升工作效率(附网址)
上次我们已经介绍了20个精选的提高工作效率的免费AI工具,但如果你觉得这些AI工具还不过瘾的话,想进一步成为职场中最了解AI的人,本文将汇总介绍免费最新的50个AI工具。 DeepSwap DeepSwap 是一个基于 AI 的工具,适用于想要制作令人…...

g++ strip debug
strip(1) command_--strip-debug-CSDN博客 strip main.outll main.out -rwxr-xr-x 1 root root 6272 Mar 22 16:14 main.outfile main.out main.out: ELF 64-bit LSB executable, x86-64, version 1 (SYSV), dynamically linked (uses shared libs), for GNU/Linux 2.6.32, Bu…...

微服务实战系列之Dubbo(上)
前言 随着一年一度冬至的到来,2023的步伐也将远去。而博主的系列文章,也将从今天起,越来越聚焦如何构建微服务“内核”上。前序系列文章几乎囊括了微服务的方方面面,无论使用什么框架、组件或工具,皆可拿来用之。 那么…...

一篇讲透:箭头函数、普通函数有什么区别
前言 📫 大家好,我是南木元元,热衷分享有趣实用的文章,希望大家多多支持,一起进步! 🍅 个人主页:南木元元 目录 什么是箭头函数 箭头函数和普通函数的区别 更简洁的语法 箭头函数…...

第40节: Vue3 注册生命周期钩子
在UniApp中使用Vue3框架时,你可以注册生命周期钩子来执行特定的逻辑。以下是一个示例,演示了如何在UniApp中使用Vue3框架注册生命周期钩子: <template> <view> <p>{{ message }}</p> </view> </templ…...

docker给容器分配固定ip
1.为 Docker 容器设置一个固定的 IP 地址 要为 Docker 容器设置一个固定的 IP 地址,有几种常见的方法: 使用自定义网络和静态 IP 地址: 你可以创建一个自定义的 Docker 网络,并在这个网络上为容器分配静态 IP 地址。首先&#x…...

Hadoop入门学习笔记——二、在虚拟机里部署HDFS集群
视频课程地址:https://www.bilibili.com/video/BV1WY4y197g7 课程资料链接:https://pan.baidu.com/s/15KpnWeKpvExpKmOC8xjmtQ?pwd5ay8 Hadoop入门学习笔记(汇总) 目录 二、在虚拟机里部署HDFS集群2.1. 部署node1虚拟机2.2. 部…...

Spring之国际化:i18n
学习的最大理由是想摆脱平庸,早一天就多一份人生的精彩;迟一天就多一天平庸的困扰。各位小伙伴,如果您: 想系统/深入学习某技术知识点… 一个人摸索学习很难坚持,想组团高效学习… 想写博客但无从下手,急需…...

Java读取类路径下的JSON文件并转换为实体列表
使用 Jackson 库来读取类路径下的 JSON 文件并将其转换为对应实体列表。 在实际开发中可能在本地环境中需要调用别人的接口,别人的接口如果还没开发好或者本地环境不支持外部接口调用的时候,可以读取json文件来造数据,方便调试。 以Student…...
复分析——第1章——复分析准备知识(E.M. Stein R. Shakarchi)
第一章 复分析准备知识 (Preliminaries to Complex Analysis) The sweeping development of mathematics during the last two centuries is due in large part to the introduction of complex numbers; paradoxically, this is based on the seemingly absurd no…...

C++ 继承方式
C++ 继承方式 实验介绍 本章节将学习权限关键字的使用,并将一一举例验证 public、protected、private 的使用,学完本小节实验后将彻底掌握权限关键字的使用。 知识点 权限关键字使用位置继承中的权限关键字public 继承protected 继承private 继承权限关键字使用位置 示例…...

华为云Windows Server服务器下,Node使用pm2-logrotate分割pm2日志,解决pm2日志内存占用过高的问题。
一、简介 PM2 是一个守护进程管理器,它将帮助您管理和保持您的应用程序在线。PM2 入门很简单,它以简单直观的 CLI 形式提供,可通过 NPM 安装。官网地址:https://pm2.keymetrics.io/ 二、问题:pm2日志内存占用过高&am…...

web3风险投资公司之Electric Capital
文章目录 什么是 Electric CapitalElectric团队 Electric Capital 开发者报告参考 什么是 Electric Capital 官网:https://www.electriccapital.com/ 官方github:https://github.com/electric-capital Electric Capital 是一家投资于加密货币、区块链企…...

为什么员工都非常抵触「绩效考核」,该怎么办呢?
员工抵制绩效考核的原因可能有很多,其中一些常见的原因包括: 考核方式不公正:如果考核方式不够客观、公正,或者与员工的实际工作情况不符,员工就会对绩效考核产生不信任感,从而产生抵触情绪。 工作压力增大…...

[node]Node.js 模块系统
[node]模块系统 Node.js中的模块系统模块的使用模块的导入模块的导出导出多个值导出默认值导出可传参的函数 文件查找策略从文件模块缓存中加载从原生模块加载从文件加载 Node.js中的模块系统 为了让Node.js的文件可以相互调用,Node.js提供了一个简单的模块系统。 …...

【数据结构】什么是二叉树?
🦄个人主页:修修修也 🎏所属专栏:数据结构 ⚙️操作环境:Visual Studio 2022 目录 📌二叉树的定义 📌二叉树的特点 📌特殊二叉树 📌二叉树的性质 📌二叉树的存储结构 📌二叉树…...

C#教程(四):多态
1、介绍 1.1 什么是多态 在C#中,多态性(Polymorphism)是面向对象编程中的一个重要概念,它允许不同类的对象对同一消息做出响应,即同一个方法可以在不同的对象上产生不同的行为。C#中的多态性可以通过以下几种方式实现…...

电力系统风储联合一次调频MATLAB仿真模型
微❤关注“电气仔推送”获得资料(专享优惠) 简介: 同一电力系统在不同风电渗透率下遭受同一负荷扰动时,其频率变化规律所示: (1)随着电力系统中风电渗透率的不断提高,风电零惯性响…...