zk的应用主要是针对三类:
在紫云等地区,都构建了全面的区域性战略布局,加强发展的系统性、市场前瞻性、产品创新能力,以专注、极致的服务理念,为客户提供网站制作、做网站 网站设计制作定制网站建设,公司网站建设,企业网站建设,品牌网站建设,成都全网营销,成都外贸网站建设公司,紫云网站建设费用合理。
在学Java API之前,我们先来了解一下zookeeper的常用命令。
连接zookeeper server。
[root@jt2 bin]# sh zkCli.sh -server 127.0.0.1:2181
获取帮助help。
连接远程节点。
connect 192.168.8.75:2181
关闭连接。
close
显示集群。
[zk: localhost:2181(CONNECTED) 0] config
server.0=jt2:2888:3888:participant
server.1=jt3:2888:3888:participant
server.2=jt4:2888:3888:participant
version=0
创建一个znode。
命令语法:create [-s] [-e] [-c] [-t ttl] path [data] [acl]
-s:创建的是带序列号的节点,序列号用0填充节点路径。
-e:创建的是临时节点。
-c:创建的是容器节点
path:znode的路径,ZooKeeper中没有相对路径,所有路径都必须以’/'开头。
data:znode携带的数据。
acl:这个节点的ACL。
#创建一个永久节点
[zk: localhost:2181(CONNECTED) 2] create /zkBase
Created /zkBase
#创建一个临时节点
[zk: localhost:2181(CONNECTED) 3] create -e /ephemeral_node
Created /ephemeral_node
删除znode节点。
#删除节点前要求节点目录为空,不存在子节点
[zk: localhost:2181(CONNECTED) 34] delete /config
Node not empty: /config
[zk: localhost:2181(CONNECTED) 35] delete /config/topics/test
[zk: localhost:2181(CONNECTED) 27] delete /ephemeral_node
#如果要删除整个节点及子节点可以使用deleteall
[zk: 192.168.0.143:2181(CONNECTED) 36] deleteall /config
显示一个节点的状态。
[zk: localhost:2181(CONNECTED) 11] stat /test
cZxid = 0x180000000e
ctime = Thu Jul 28 03:25:08 CST 2022
mZxid = 0x180000000e
mtime = Thu Jul 28 03:25:08 CST 2022
pZxid = 0x180000000e
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 0
numChildren = 0
查看路径子节点。
命令语法:ls [-s] [-w] [-R] path。
获取指定路径下的数据。
[zk: localhost:2181(CONNECTED) 16] get /zookeeper/config
server.0=jt2:2888:3888:participant
server.1=jt3:2888:3888:participant
server.2=jt4:2888:3888:participant
version=0
设置或者更新路径数据。
[zk: localhost:2181(CONNECTED) 19] set /test/hehe "haha"
[zk: localhost:2181(CONNECTED) 20] get /test/hehe
haha
设置ACL。
ACL权限 |
ACL 简写 |
允许的操作 |
CREATE |
c |
创建子节点 |
READ |
r |
获取节点的数据和它的子节点 |
WRITE |
w |
设置节点的数据 |
DELETE |
d |
删除子节点 (仅下一级节点) |
ADMIN |
a |
设置 ACL 权限 |
ZooKeeper内置了一些权限控制方案,可以用以下方案为每个节点设置权限:
方案 |
描述 |
world |
只有一个用户:anyone,代表所有人(默认) |
ip |
使用IP地址认证 |
auth |
使用已添加认证的用户认证 |
digest |
使用“用户名:密码”方式认证 |
[zk: localhost:2181(CONNECTED) 21] getAcl /test
'world,'anyone
: cdrwa
[zk: localhost:2181(CONNECTED) 22] create /mynode1 hello
Created /mynode1
[zk: localhost:2181(CONNECTED) 23] addauth digest admin:admin
[zk: localhost:2181(CONNECTED) 24] setAcl /mynode1 auth:admin:cdrwa
[zk: localhost:2181(CONNECTED) 25] getAcl /mynode1
'digest,'admin:x1nq8J5GOJVPY6zgzhtTtA9izLc=
: cdrwa
同步数据集群间数据。
[zk: localhost:2181(CONNECTED) 26] sync /
Sync is OK
查看命令执行历史。
[zk: localhost:2181(CONNECTED) 27] history
17 - help
18 - getAllChildrenNumber /zookeeper
19 - set /test/hehe "haha"
20 - get /test/hehe
21 - getAcl /test
22 - create /mynode1 hello
23 - addauth digest admin:admin
24 - setAcl /mynode1 auth:admin:cdrwa
25 - getAcl /mynode1
26 - sync /
27 - history
退出客户端。
[zk: localhost:2181(CONNECTED) 28] quit
WATCHER::
WatchedEvent state:Closed type:None path:null
2022-07-28 03:33:49,307 [myid:] - INFO [main:ZooKeeper@1422] - Session: 0xcebb0001 closed
2022-07-28 03:33:49,308 [myid:] - INFO [main-EventThread:ClientCnxn$EventThread@524] - EventThread shut down for session: 0xcebb0001
zookeeper客户端和服务器会话的建立是一个异步的过程,也就是说在程序中,程序方法在处理完客户端初始化后立即返回(即程序继续往下执行代码,这样,在大多数情况下并没有真正的构建好一个可用会话,在会话的生命周期处于“CONNECTED”时才算真正的建立完毕,所以需要使用到多线程中的一个工具类CountDownLatch)。
(一共有4个构造方法,根据参数不同)。
Zookeeper(String connectString,int sessionTimeout,Watcher watcher)
Zookeeper(String connectString,int sessionTimeout,Watcher watcher,boolean canBeReadOnly)
Zookeeper(String connectString,int sessionTimeout,Watcher watcher,long sessionId,byte[] sessionPasswd)
Zookeeper(String connectString,int sessionTimeout,Watcher watcher,long sessionId,byte[] sessionPasswd,boolean canBeReadOnly)
参数说明:
注意,整个创建会话的过程是异步的,构造方法会在初始化连接后即返回,并不代表真正建立好了一个会话,此时会话处于"CONNECTING"状态。当会话真正创建起来后,服务器会发送事件通知给客户端,只有客户端获取到这个通知后,会话才真正建立。
public class ZkConnect implements Watcher {
private static final Logger log = LoggerFactory.getLogger(ZkConnect.class);
//public static final String zkServerPath = "192.168.8.74:2181,192.168.8.75:2181,192.168.8.76:2181";
public static final String zkServerPath = "127.0.0.1:2181";
public static final Integer timeout = 5000;
public static CountDownLatch countDownLatch = new CountDownLatch(1);
/**
* 客户端与zkServer连接是一个异步的过程,当连接成功后,客户端会收到一个watch通知
* 参数:
* connectString: 连接服务器的ip字符串
* sessionTimeout: 超时时间,心跳收不到了,就超时
* watcher: 通知事件,如果有对应的事件触发,则会收到一个通知;如果不需要,就设置为null
* canBeReadOnly: 可读,当这个物理机节点断开后,还是可以读到数据的,只是不能写;此时数据被读取到的可能
* 是旧数据,此处建议设置为false
* sessionId: 会话id
* sessionPasswd: 会话密码,当会话丢失后,可以依据sessionId和sessionPasswd重新获取会话
*/
public static void main(String[] args) throws Exception {
ZooKeeper zk = new ZooKeeper(zkServerPath, timeout, new ZkConnect());
log.warn("客户端开始连接zookeeper服务器。连接状态: {}", zk.getState());
countDownLatch.await(); // 如果不停顿一段时间, 会收不到watch通知
log.warn("连接状态: {}", zk.getState());
}
@Override
public void process(WatchedEvent event) {
log.warn("接收到watch通知: {}", event);
countDownLatch.countDown();
}
}
public class ZkReconnect implements Watcher {
private static final Logger log = LogManager.getLogger(ZkReconnect.class);
public static final String zkServerPath = "127.0.0.1:2181";
public static final Integer timeout = 5000;
public static CountDownLatch countDownLatch1 = new CountDownLatch(1);
public static CountDownLatch countDownLatch2 = new CountDownLatch(2);
public static void main(String[] args) throws Exception {
ZooKeeper zk = new ZooKeeper(zkServerPath, timeout, new ZkReconnect());
long sessionId = zk.getSessionId();
byte[] sessionPasswd = zk.getSessionPasswd();
log.warn("客户端开始连接zookeeper服务器。连接状态: {}", zk.getState());
countDownLatch1.await(); // 如果不停顿一段时间, 会收不到watch通知
log.warn("连接状态: {}", zk.getState());
Thread.sleep(1000);
log.warn("开始会话重连...");
ZooKeeper zkSession = new ZooKeeper(zkServerPath, timeout, new ZkReconnect(), sessionId, sessionPasswd);
log.warn("重新连接, 状态: {}", zk.getState());
countDownLatch2.await();
log.warn("重新连接, 状态: {}", zk.getState());
}
@Override
public void process(WatchedEvent event) {
log.warn("接收到watch通知: {}", event);
countDownLatch1.countDown();
countDownLatch2.countDown();
}
}
提供了两套创建节点的方法,同步和异步创建节点方式。
String create(final String path,byte data[],Listacl,CreateMode createMode);//同步方式创建
void create(final String path,byte data[],Listacl,CreateMode createMode,StringCallback cb,Object ctx);//异步方式创建
path:节点路径(名称):/nodeName。不允许递归创建节点,在父节点不存在的情况下,不允许创建子节点。
data[]:节点内容:要求类型是字节数组,也就是说不支持序列话方式,如果需要实现序列化,可使用java相关序列化框架,如Hessian,Kryo。
acl:节点权限:使用Ids.OPEN_ACL_UNSAFE开放权限即可。
createMode:节点类型:创建节点的类型,CreateMode.*,提供了如下所示的四种节点类型:
异步方式(在同步方法参数的基础上增加两个参数):
cb:回调方法:注册一个异步回调方法,要实现。
AsynCallBack.StringCallBack接口,重写processResult(int rc, String path, Object ctx, String name)方法,当节点创建完成后执行此方法。
ctx:传递给回调方法的参数,一般为上下文(Context)信息。
public class ZkNodeCreate implements Watcher {
private ZooKeeper zooKeeper = null;
private static final Logger log = LoggerFactory.getLogger(ZkNodeCreate.class);
private static final String zkServerPath = "127.0.0.1:2181";
private static final Integer timeout = 5000;
public ZkNodeCreate() {}
public ZkNodeCreate(String connectString) {
try {
zooKeeper = new ZooKeeper(connectString, timeout, new ZkNodeCreate());
} catch (Exception e) {
e.printStackTrace();
if (zooKeeper != null) {
try {
zooKeeper.close();
} catch (Exception e1) {
e1.printStackTrace();
}
}
}
}
public static void main(String[] args) throws InterruptedException {
ZkNodeCreate zkNodeOperator = new ZkNodeCreate(zkServerPath);
zkNodeOperator.createZKNode("/testnode", "testnode".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE);
new CountDownLatch(1).await();
}
/**
* 同步或异步创建节点,都不支持子节点的递归创建,异步有一个callback函数
* 参数:
* path: 创建的路径
* data: 存储的数据
* acl: 控制权限策略. Ids.OPEN_ACL_UNSAFE --> world:anyone:cdrwa
* Ids.CREATOR_ALL_ACL --> auth:user:password:cdrwa
* createMode: 节点类型,是一个枚举
* PERSISTENT 持久节点
* PERSISTENT_SEQUENTIAL 持久顺序节点
* EPHEMERAL 临时节点
* EPHEMERAL_SEQUENTIAL 临时顺序节点
*
* @param path
* @param data
* @param acls
*/
private void createZKNode(String path, byte[] data, ArrayListacls) {
String result = "";
try {
// 同步创建
//result = zooKeeper.create(path, data, acls, CreateMode.EPHEMERAL);
//log.warn("同步创建临时节点: {} 成功。", result);
// 异步创建
String ctx = "{'create':'success'}";
zooKeeper.create(path, data, acls, CreateMode.EPHEMERAL, new CreateNodeCallBack(), ctx);
Thread.sleep(5000);
log.warn("异步创建临时节点: {} 成功。", result);
} catch (Exception e) {
e.printStackTrace();
}
}
@Override
public void process(WatchedEvent event) {
log.warn("客户端连接接收到watch通知: {}", event);
}
public ZooKeeper getZooKeeper() {
return zooKeeper;
}
public void setZooKeeper(ZooKeeper zooKeeper) {
this.zooKeeper = zooKeeper;
}
private static class CreateNodeCallBack implements AsyncCallback.StringCallback {
@Override
public void processResult(int rc, String path, Object ctx, String name) {
log.warn("异步创建节点:{}, ctx: {}", path, (String)ctx);
}
}
}
修改节点数据。
public class ZkNodeUpdate implements Watcher {
private当前标题:Zookeeper系列—Zookeeper应用及常用命令
标题链接:http://www.shufengxianlan.com/qtweb/news4/69354.html网站建设、网络推广公司-创新互联,是专注品牌与效果的网站制作,网络营销seo公司;服务项目有等
声明:本网站发布的内容(图片、视频和文字)以用户投稿、用户转载内容为主,如果涉及侵权请尽快告知,我们将会在第一时间删除。文章观点不代表本网站立场,如需处理请联系客服。电话:028-86922220;邮箱:631063699@qq.com。内容未经允许不得转载,或转载时需注明来源: 创新互联