Zookeeper(后续简称ZK)是一个分布式的,开放源码的分布式应用程序协调服务,通常以集群模式运转,其协调能力可以理解为是基于观察者设计模式来实现的;ZK服务会使用Znode存储使用者的数据,并将这些数据以树形目录的形式来组织管理,支持使用者以观察者的角色指定自己关注哪些节点\数据的变更,当这些变更发生时,ZK会通知其观察者;为满足本篇目标所需,着重介绍以下几个关键特性:
创新互联是专业的凤冈网站建设公司,凤冈接单;提供成都网站建设、网站建设,网页设计,网站设计,建网站,PHP网站建设等专业做网站服务;采用PHP框架,可快速的进行凤冈网站开发网页制作和功能扩展;专业做搜索引擎喜爱的网站,专业的做网站团队,希望更多企业前来合作!
ZooKeeper's Hierarchical Namespace
ZooKeeper Service
监听机制:给某个节点注册监听器,该节点一旦发生变更(例如更新或者删除),监听者就会收到一个Watch Event,可以感知到节点\数据的变更
临时节点:session链接断开临时节点就没了,不能创建子节点(很关键)
ZK的分布式锁正是基于以上特性来实现的,简单来说是:
可能读者是单篇阅读,这里引入上一篇《分布式锁上-初探》中的一些内容,一个分布式锁应具备这样一些功能特点:
基于上文的内容,这里简单总结一下ZK的能力矩阵(其它分布式锁的情况会在后续文章中补充):
能力 |
ZK |
MySql |
Redis原生 |
Redlock |
ETCD |
互斥 |
是 |
||||
安全 |
链接异常,session关闭后锁会自动释放 |
||||
可用性 |
相对还好 |
||||
可重入 |
线程可重入 |
||||
加解锁速度 |
居中 |
||||
阻塞非阻塞 |
都支持 |
||||
公平非公平 |
仅公平锁 |
关于性能不太高的一种说法
因为每次在创建锁和释放锁的过程中,都要动态创建、销毁临时节点来实现锁功能。ZK中创建和删除节点只能通过Leader服务器来执行,然后Leader服务器还需要将数据同步到所有的Follower机器上,这样频繁的网络通信,性能的短板是非常突出的。在高性能,高并发的场景下,不建议使用ZooKeeper的分布式锁。
由于ZooKeeper的高可用特性,在并发量不是太高的场景,也推荐使用ZK的分布式锁。
Zookeeper 客户端框架 Curator 提供的 InterProcessMutex 是分布式锁的一种实现,acquire 方法阻塞|非阻塞获取锁,release 方法释放锁,另外还提供了可撤销、可重入功能。
4.1 接口介绍
// 获取互斥锁
public void acquire() throws Exception;
// 在给定的时间内获取互斥锁
public boolean acquire(long time, TimeUnit unit) throws Exception;
// 释放锁处理
public void release() throws Exception;
// 如果当前线程获取了互斥锁,则返回true
boolean isAcquiredInThisProcess();
4.2 pom依赖
org.apache.logging.log4j
log4j-core
2.8.2
org.apache.zookeeper
zookeeper
3.5.7
org.apache.curator
curator-framework
4.3.0
org.apache.curator
curator-recipes
4.3.0
org.apache.curator
curator-client
4.3.0
4.3 示例
package com.atguigu.case3;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.retry.ExponentialBackoffRetry;
public class CuratorLockTest {
public static void main(String[] args) {
// 创建分布式锁1
InterProcessMutex lock1 = new InterProcessMutex(getCuratorFramework(), "/locks");
// 创建分布式锁2
InterProcessMutex lock2 = new InterProcessMutex(getCuratorFramework(), "/locks");
new Thread(new Runnable() {
@Override
public void run() {
try {
lock1.acquire();
System.out.println("线程1 获取到锁");
lock1.acquire();
System.out.println("线程1 再次获取到锁");
Thread.sleep(5 * 1000);
lock1.release();
System.out.println("线程1 释放锁");
lock1.release();
System.out.println("线程1 再次释放锁");
} catch (Exception e) {
e.printStackTrace();
}
}
}).start();
new Thread(new Runnable() {
@Override
public void run() {
try {
lock2.acquire();
System.out.println("线程2 获取到锁");
lock2.acquire();
System.out.println("线程2 再次获取到锁");
Thread.sleep(5 * 1000);
lock2.release();
System.out.println("线程2 释放锁");
lock2.release();
System.out.println("线程2 再次释放锁");
} catch (Exception e) {
e.printStackTrace();
}
}
}).start();
}
private static CuratorFramework getCuratorFramework() {
ExponentialBackoffRetry policy = new ExponentialBackoffRetry(3000, 3);
CuratorFramework client = CuratorFrameworkFactory.builder().connectString("xxx:2181,xxx:2181,xxx:2181")
.connectionTimeoutMs(2000)
.sessionTimeoutMs(2000)
.retryPolicy(policy).build();
// 启动客户端
client.start();
System.out.println("zookeeper 启动成功");
return client;
}
}
通过这个实例对照第2节内容来理解加解锁的流程,以及如何避免惊群效应。
package com.rock.case2;
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.CountDownLatch;
/**
* zk 分布式锁 v1版本:
* 完成功能 :
* 1. 避免了惊群效应
* 缺失功能:
* 1. 超时控制
* 2. 读写锁
* 3. 重入控制
*/
public class DistributedLock {
private String connectString;
private int sessionTimeout;
private ZooKeeper zk;
private CountDownLatch connectLatch = new CountDownLatch(1);
private CountDownLatch waitLatch = new CountDownLatch(1);
private String waitPath;
private String currentNode;
private String LOCK_ROOT_PATH;
private static String NODE_PREFIX = "w";
public DistributedLock(String connectString, int sessionTimeout, String lockName){
//TODO:数据校验
this.connectString = connectString;
this.sessionTimeout = sessionTimeout;
this.LOCK_ROOT_PATH = lockName;
}
public void init() throws IOException, KeeperException, InterruptedException {
// 建联
zk = new ZooKeeper(connectString, sessionTimeout, watchedEvent -> {
// connectLatch 连接上zk后 释放
if (watchedEvent.getState() == Watcher.Event.KeeperState.SyncConnected) {
connectLatch.countDown();
}
});
connectLatch.await();// 等待zk正常连接后
// 判断锁名称节点是否存在
Stat stat = zk.exists(LOCK_ROOT_PATH, false);
if (stat == null) {
// 创建一下锁名称节点
try {
zk.create(LOCK_ROOT_PATH, LOCK_ROOT_PATH.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
} catch (KeeperException e) {
//并发创建冲突忽略。
if (!e.code().name().equals("NODEEXISTS")) {
throw e;
}
}
}
}
/**
* 待补充功能:
* 1. 超时设置
* 2. 读写区分
* 3. 重入控制
*/
public void zklock() throws KeeperException, InterruptedException {
if (!tryLock()) {
waitLock();
zklock();
}
}
/**
*
*/
private void waitLock() throws KeeperException, InterruptedException {
try {
zk.getData(waitPath, new Watcher() {
@Override
public void process(WatchedEvent watchedEvent){
// waitLatch 需要释放
if (watchedEvent.getType() == Watcher.Event.EventType.NodeDeleted && watchedEvent.getPath().equals(waitPath)) {
waitLatch.countDown();
}
}
}, new Stat());
// 等待监听
waitLatch.await();
} catch (KeeperException.NoNodeException e) {
//如果等待的节点已经被清除了,不等了,再尝试去抢锁
return;
}
}
private boolean tryLock() throws KeeperException, InterruptedException {
currentNode = zk.create(LOCK_ROOT_PATH + "/" + NODE_PREFIX, null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
// 判断创建的节点是否是最小的序号节点,如果是获取到锁;如果不是,监听他序号前一个节点
Listchildren = zk.getChildren(LOCK_ROOT_PATH, false);
// 如果children 只有一个值,那就直接获取锁; 如果有多个节点,需要判断,谁最小
if (children.size() == 1) {
return true;
} else {
String thisNode = currentNode.substring(LOCK_ROOT_PATH.length() + 1);
// 通过w00000000获取该节点在children集合的位置
int index = children.indexOf(thisNode);
if (index == 0) {
//自己就是第一个节点
return true;
}
// 需要监听 他前一个节点变化
waitPath = LOCK_ROOT_PATH + "/" + children.get(index - 1);
}
return false;
}
// 解锁
public void unZkLock(){
// 删除节点
try {
zk.delete(this.currentNode, -1);
} catch (InterruptedException e) {
e.printStackTrace();
} catch (KeeperException e) {
e.printStackTrace();
}
}
}
本文转载自微信公众号「架构染色」,可以通过以下二维码关注。转载本文请联系【架构染色】公众号作者。
分享标题:分布式锁中-基于Zookeeper的实现
标题链接:http://www.shufengxianlan.com/qtweb/news30/40280.html
网站建设、网络推广公司-创新互联,是专注品牌与效果的网站制作,网络营销seo公司;服务项目有等
声明:本网站发布的内容(图片、视频和文字)以用户投稿、用户转载内容为主,如果涉及侵权请尽快告知,我们将会在第一时间删除。文章观点不代表本网站立场,如需处理请联系客服。电话:028-86922220;邮箱:631063699@qq.com。内容未经允许不得转载,或转载时需注明来源: 创新互联