Zookeeper系列—Zookeeper应用及常用命令

第1章 常用命令

zk的应用主要是针对三类:

在紫云等地区,都构建了全面的区域性战略布局,加强发展的系统性、市场前瞻性、产品创新能力,以专注、极致的服务理念,为客户提供网站制作、做网站 网站设计制作定制网站建设,公司网站建设,企业网站建设,品牌网站建设,成都全网营销,成都外贸网站建设公司,紫云网站建设费用合理。

  • java原生zk客户端的API操作(不用去学这部分内容,会增加太多的学习成本,了解一下就好了)。
  • zkClient的使用,它是对Zookeeper原生API的封装。
  • Apache Curator,也是对Zookeeper API 的封装(本文讲的应用针对这部分内容)。

在学Java API之前,我们先来了解一下zookeeper的常用命令。

连接zookeeper server。

[root@jt2 bin]# sh zkCli.sh -server 127.0.0.1:2181

获取帮助help。

连接远程节点。

connect 192.168.8.75:2181

关闭连接。

close

显示集群。

[zk: localhost:2181(CONNECTED) 0] config
server.0=jt2:2888:3888:participant
server.1=jt3:2888:3888:participant
server.2=jt4:2888:3888:participant
version=0

创建一个znode。

命令语法:create [-s] [-e] [-c] [-t ttl] path [data] [acl]

-s:创建的是带序列号的节点,序列号用0填充节点路径。
-e:创建的是临时节点。
-c:创建的是容器节点
path:znode的路径,ZooKeeper中没有相对路径,所有路径都必须以’/'开头。
data:znode携带的数据。
acl:这个节点的ACL。
#创建一个永久节点
[zk: localhost:2181(CONNECTED) 2] create /zkBase
Created /zkBase
#创建一个临时节点
[zk: localhost:2181(CONNECTED) 3] create -e /ephemeral_node
Created /ephemeral_node

删除znode节点。

#删除节点前要求节点目录为空,不存在子节点
[zk: localhost:2181(CONNECTED) 34] delete /config
Node not empty: /config
[zk: localhost:2181(CONNECTED) 35] delete /config/topics/test
[zk: localhost:2181(CONNECTED) 27] delete /ephemeral_node
#如果要删除整个节点及子节点可以使用deleteall
[zk: 192.168.0.143:2181(CONNECTED) 36] deleteall /config

显示一个节点的状态。

[zk: localhost:2181(CONNECTED) 11] stat /test
cZxid = 0x180000000e
ctime = Thu Jul 28 03:25:08 CST 2022
mZxid = 0x180000000e
mtime = Thu Jul 28 03:25:08 CST 2022
pZxid = 0x180000000e
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 0
numChildren = 0

查看路径子节点。

命令语法:ls [-s] [-w] [-R] path。

  • -s 同时显示stat信息。
  • -w 只显示子节点信息,默认选项。
  • -R 递归显示。

获取指定路径下的数据。

[zk: localhost:2181(CONNECTED) 16] get /zookeeper/config
server.0=jt2:2888:3888:participant
server.1=jt3:2888:3888:participant
server.2=jt4:2888:3888:participant
version=0

设置或者更新路径数据。

[zk: localhost:2181(CONNECTED) 19] set /test/hehe "haha"
[zk: localhost:2181(CONNECTED) 20] get /test/hehe
haha

设置ACL。

ACL权限

ACL 简写

允许的操作

CREATE

c

创建子节点

READ

r

获取节点的数据和它的子节点

WRITE

w

设置节点的数据

DELETE

d

删除子节点 (仅下一级节点)

ADMIN

a

设置 ACL 权限

ZooKeeper内置了一些权限控制方案,可以用以下方案为每个节点设置权限:

方案

描述

world

只有一个用户:anyone,代表所有人(默认)

ip

使用IP地址认证

auth

使用已添加认证的用户认证

digest

使用“用户名:密码”方式认证

[zk: localhost:2181(CONNECTED) 21] getAcl /test
'world,'anyone
: cdrwa
[zk: localhost:2181(CONNECTED) 22] create /mynode1 hello
Created /mynode1
[zk: localhost:2181(CONNECTED) 23] addauth digest admin:admin
[zk: localhost:2181(CONNECTED) 24] setAcl /mynode1 auth:admin:cdrwa
[zk: localhost:2181(CONNECTED) 25] getAcl /mynode1
'digest,'admin:x1nq8J5GOJVPY6zgzhtTtA9izLc=
: cdrwa

同步数据集群间数据。

[zk: localhost:2181(CONNECTED) 26] sync /
Sync is OK

查看命令执行历史。

[zk: localhost:2181(CONNECTED) 27] history
17 - help
18 - getAllChildrenNumber /zookeeper
19 - set /test/hehe "haha"
20 - get /test/hehe
21 - getAcl /test
22 - create /mynode1 hello
23 - addauth digest admin:admin
24 - setAcl /mynode1 auth:admin:cdrwa
25 - getAcl /mynode1
26 - sync /
27 - history

退出客户端。

[zk: localhost:2181(CONNECTED) 28] quit
WATCHER::
WatchedEvent state:Closed type:None path:null
2022-07-28 03:33:49,307 [myid:] - INFO [main:ZooKeeper@1422] - Session: 0xcebb0001 closed
2022-07-28 03:33:49,308 [myid:] - INFO [main-EventThread:ClientCnxn$EventThread@524] - EventThread shut down for session: 0xcebb0001

第2章 Java API使用

zookeeper客户端和服务器会话的建立是一个异步的过程,也就是说在程序中,程序方法在处理完客户端初始化后立即返回(即程序继续往下执行代码,这样,在大多数情况下并没有真正的构建好一个可用会话,在会话的生命周期处于“CONNECTED”时才算真正的建立完毕,所以需要使用到多线程中的一个工具类CountDownLatch)。

1、创建会话

(一共有4个构造方法,根据参数不同)。

Zookeeper(String connectString,int sessionTimeout,Watcher watcher)
Zookeeper(String connectString,int sessionTimeout,Watcher watcher,boolean canBeReadOnly)
Zookeeper(String connectString,int sessionTimeout,Watcher watcher,long sessionId,byte[] sessionPasswd)
Zookeeper(String connectString,int sessionTimeout,Watcher watcher,long sessionId,byte[] sessionPasswd,boolean canBeReadOnly)

参数说明:

  • connectString :host:port指定的服务器列表,多个host:port之间用英文逗号分隔。还可以可选择地指定一个基路径,如果指定了一个基路径,则所有后续操作基于这个及路径进行。
  • sessionTimeOut:会话超时时间。以毫秒为单位。客户端和服务器端之间的连接通过心跳包进行维系,如果心跳包超过这个指定时间则认为会话超时失效。
  • watcher:指定默认观察者。如果为null表示不需要观察者。
  • canBeReadOnly :是否支持只读服务。只当一个服务器失去过半连接后不能再进行写入操作时,是否继续支持读取操作。
  • sessionId、SessionPassword:会话编号 会话密码(通过两个确定唯一一台客户端),用来实现会话恢复(重复回话)。

注意,整个创建会话的过程是异步的,构造方法会在初始化连接后即返回,并不代表真正建立好了一个会话,此时会话处于"CONNECTING"状态。当会话真正创建起来后,服务器会发送事件通知给客户端,只有客户端获取到这个通知后,会话才真正建立。

代码演示:

public class ZkConnect implements Watcher {
private static final Logger log = LoggerFactory.getLogger(ZkConnect.class);
//public static final String zkServerPath = "192.168.8.74:2181,192.168.8.75:2181,192.168.8.76:2181";
public static final String zkServerPath = "127.0.0.1:2181";
public static final Integer timeout = 5000;
public static CountDownLatch countDownLatch = new CountDownLatch(1);
/**
* 客户端与zkServer连接是一个异步的过程,当连接成功后,客户端会收到一个watch通知
* 参数:
* connectString: 连接服务器的ip字符串
* sessionTimeout: 超时时间,心跳收不到了,就超时
* watcher: 通知事件,如果有对应的事件触发,则会收到一个通知;如果不需要,就设置为null
* canBeReadOnly: 可读,当这个物理机节点断开后,还是可以读到数据的,只是不能写;此时数据被读取到的可能
* 是旧数据,此处建议设置为false
* sessionId: 会话id
* sessionPasswd: 会话密码,当会话丢失后,可以依据sessionId和sessionPasswd重新获取会话
*/
public static void main(String[] args) throws Exception {
ZooKeeper zk = new ZooKeeper(zkServerPath, timeout, new ZkConnect());
log.warn("客户端开始连接zookeeper服务器。连接状态: {}", zk.getState());
countDownLatch.await(); // 如果不停顿一段时间, 会收不到watch通知
log.warn("连接状态: {}", zk.getState());
}
@Override
public void process(WatchedEvent event) {
log.warn("接收到watch通知: {}", event);
countDownLatch.countDown();
}
}
public class ZkReconnect implements Watcher {
private static final Logger log = LogManager.getLogger(ZkReconnect.class);
public static final String zkServerPath = "127.0.0.1:2181";
public static final Integer timeout = 5000;
public static CountDownLatch countDownLatch1 = new CountDownLatch(1);
public static CountDownLatch countDownLatch2 = new CountDownLatch(2);
public static void main(String[] args) throws Exception {
ZooKeeper zk = new ZooKeeper(zkServerPath, timeout, new ZkReconnect());
long sessionId = zk.getSessionId();
byte[] sessionPasswd = zk.getSessionPasswd();
log.warn("客户端开始连接zookeeper服务器。连接状态: {}", zk.getState());
countDownLatch1.await(); // 如果不停顿一段时间, 会收不到watch通知
log.warn("连接状态: {}", zk.getState());
Thread.sleep(1000);
log.warn("开始会话重连...");
ZooKeeper zkSession = new ZooKeeper(zkServerPath, timeout, new ZkReconnect(), sessionId, sessionPasswd);
log.warn("重新连接, 状态: {}", zk.getState());
countDownLatch2.await();
log.warn("重新连接, 状态: {}", zk.getState());
}
@Override
public void process(WatchedEvent event) {
log.warn("接收到watch通知: {}", event);
countDownLatch1.countDown();
countDownLatch2.countDown();
}
}

2、创建节点

提供了两套创建节点的方法,同步和异步创建节点方式。

String create(final String path,byte data[],List acl,CreateMode createMode);//同步方式创建
void create(final String path,byte data[],List acl,CreateMode createMode,StringCallback cb,Object ctx);//异步方式创建

同步方式:

path:节点路径(名称):/nodeName。不允许递归创建节点,在父节点不存在的情况下,不允许创建子节点。

data[]:节点内容:要求类型是字节数组,也就是说不支持序列话方式,如果需要实现序列化,可使用java相关序列化框架,如Hessian,Kryo。

acl:节点权限:使用Ids.OPEN_ACL_UNSAFE开放权限即可。

createMode:节点类型:创建节点的类型,CreateMode.*,提供了如下所示的四种节点类型:

  • PERSISTENT(持久节点)。
  • PERSISTENT_SEQUENTIAL(持久顺序节点)。
  • EPHEMERAL(临时节点,本次会话有效)。
  • EPHEMERAL_SEQUENTIAL(临时顺序节点,本次会话有效)。

异步方式(在同步方法参数的基础上增加两个参数):

cb:回调方法:注册一个异步回调方法,要实现。
AsynCallBack.StringCallBack接口,重写processResult(int rc, String path, Object ctx, String name)方法,当节点创建完成后执行此方法。

  • rc:服务端响应码,0表示调用成功、-4表示端口连接、-110表示指定节点存在、-112表示会话已过期。
  • path:接口调用时传入的数据节点的路径参数。
  • ctx:调用接口传入的ctx值。
  • name:实际在服务端创建的节点的名称。

ctx:传递给回调方法的参数,一般为上下文(Context)信息。

代码演示

public class ZkNodeCreate implements Watcher {
private ZooKeeper zooKeeper = null;
private static final Logger log = LoggerFactory.getLogger(ZkNodeCreate.class);
private static final String zkServerPath = "127.0.0.1:2181";
private static final Integer timeout = 5000;
public ZkNodeCreate() {}
public ZkNodeCreate(String connectString) {
try {
zooKeeper = new ZooKeeper(connectString, timeout, new ZkNodeCreate());
} catch (Exception e) {
e.printStackTrace();
if (zooKeeper != null) {
try {
zooKeeper.close();
} catch (Exception e1) {
e1.printStackTrace();
}
}
}
}
public static void main(String[] args) throws InterruptedException {
ZkNodeCreate zkNodeOperator = new ZkNodeCreate(zkServerPath);
zkNodeOperator.createZKNode("/testnode", "testnode".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE);
new CountDownLatch(1).await();
}
/**
* 同步或异步创建节点,都不支持子节点的递归创建,异步有一个callback函数
* 参数:
* path: 创建的路径
* data: 存储的数据
* acl: 控制权限策略. Ids.OPEN_ACL_UNSAFE --> world:anyone:cdrwa
* Ids.CREATOR_ALL_ACL --> auth:user:password:cdrwa
* createMode: 节点类型,是一个枚举
* PERSISTENT 持久节点
* PERSISTENT_SEQUENTIAL 持久顺序节点
* EPHEMERAL 临时节点
* EPHEMERAL_SEQUENTIAL 临时顺序节点
*
* @param path
* @param data
* @param acls
*/
private void createZKNode(String path, byte[] data, ArrayList acls) {
String result = "";
try {
// 同步创建
//result = zooKeeper.create(path, data, acls, CreateMode.EPHEMERAL);
//log.warn("同步创建临时节点: {} 成功。", result);

// 异步创建
String ctx = "{'create':'success'}";
zooKeeper.create(path, data, acls, CreateMode.EPHEMERAL, new CreateNodeCallBack(), ctx);
Thread.sleep(5000);
log.warn("异步创建临时节点: {} 成功。", result);

} catch (Exception e) {
e.printStackTrace();
}
}
@Override
public void process(WatchedEvent event) {
log.warn("客户端连接接收到watch通知: {}", event);
}
public ZooKeeper getZooKeeper() {
return zooKeeper;
}
public void setZooKeeper(ZooKeeper zooKeeper) {
this.zooKeeper = zooKeeper;
}
private static class CreateNodeCallBack implements AsyncCallback.StringCallback {
@Override
public void processResult(int rc, String path, Object ctx, String name) {
log.warn("异步创建节点:{}, ctx: {}", path, (String)ctx);
}
}
}

3、节点操作

修改节点数据。

public class ZkNodeUpdate implements Watcher {
private 当前标题:Zookeeper系列—Zookeeper应用及常用命令
标题链接:http://www.shufengxianlan.com/qtweb/news4/69354.html

网站建设、网络推广公司-创新互联,是专注品牌与效果的网站制作,网络营销seo公司;服务项目有等

广告

声明:本网站发布的内容(图片、视频和文字)以用户投稿、用户转载内容为主,如果涉及侵权请尽快告知,我们将会在第一时间删除。文章观点不代表本网站立场,如需处理请联系客服。电话:028-86922220;邮箱:631063699@qq.com。内容未经允许不得转载,或转载时需注明来源: 创新互联