大佬教程收集整理的这篇文章主要介绍了Zookeeper(2)-分布式锁的基础实现,大佬教程大佬觉得挺不错的,现在分享给大家,也给大家做个参考。
在进行分布式锁操作之前,我们得知道什么是分布式锁。在单体应用中,使用 Java API 自带的 Lock 或者是 synchronize 就可以解决多线程带来的并发问题。但是在集群环境
中,上述的方法并不能解决服务与服务之间的并发问题。
分布式锁一般用在分布式系统或者多个应用中,用来控制同一任务是否执行或者任务的执行顺序。在项目中,部署了多个tomcat应用,在执行定时任务时就会遇到同一任务可能执行多次的情况,我们可以借助分布式锁,保证在同一时间只有一个tomcat应用执行了定时任务
使用 Zookeeper 创建临时顺序节点,判断自己是不是当前节点下的最小节点
,是的话就是获取到了锁,直接执行业务代码。不是的话,便对前一个节点进行监听。获取到锁,执行完业务代码后,delete 节点释放当前锁,然后下面的节点接收到通知。
public class DiStributedLock {
private final String connectString = "192.168.3.33:2181";
private final int sessionTimeout = 2000;
private final ZooKeeper zooKeeper;
private final String rootNode = "locks";
private final String subNode = "seq-";
private CountDownLatch connectLatch = new CountDownLatch(1);
private CountDownLatch waitLatch = new CountDownLatch(1);
private String waitPath;
private String currentNode;
public DiStributedLock() throws IOException, KeeperException, InterruptedException {
// 获取连接
zooKeeper = new ZooKeeper(connectString, sessionTimeout, new Watcher() {
@Override
public void process(WatchedEvent event) {
// 连接建立时, 打开 latch, 唤醒 wait 在该 latch 上的线程
if (event.getState() == Event.KeeperState.SyncConnected) {
connectLatch.countDown();
}
// 发生了 waitPath 的删除事件
if (event.getType() == Event.EventType.Nodedeleted && event.getPath().equals(waitPath)) {
waitLatch.countDown();
}
}
});
connectLatch.await();
// 判断节点/locks 是否存在
Stat stat = zooKeeper.exists("/" + rootNode, falsE);
// 如果根节点不存在则创建永久根节点
if (stat == null) {
System.out.println("根节点不存在!");
zooKeeper.create("/" + rootNode, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}
}
// 获取锁
public void zkLock() throws KeeperException, InterruptedException {
// 在根节点下创建临时顺序节点,返回值为创建的节点路径
currentNode = zooKeeper.create("/" + rootNode + "/" + subNode, null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
// 获取所有的节点
List <String> children = zooKeeper.getChildren("/" + rootNode, falsE);
// 列表中只有一个节点,就直接获取到锁
if (children.size() == 0) {
return;
} else {
// 对节点进行排序
Collections.sort(children);
//当前节点名称
String thisnode = currentNode.subString(("/" + rootNode + "/").length());
// 获取当前节点在数组中的位置
int indexOf = children.indexOf(thisnodE);
if (indexOf == -1) {
System.out.println("数据异常");
} else if (indexOf == 0) {
// index == 0 说明 thisnode 在列表中最小,当前 client 获取锁
return;
} else {
// 获得排名比 currentNode 前 1 位的节点
this.waitPath = "/" + rootNode + "/" + children.get(indexOf - 1);
// 在 waitPath 上注册监听器, 当 waitPath 被删除时,zookeeper 会回调监听器的 process 方法
zooKeeper.getData(waitPath, true, new Stat());
waitLatch.await();
return;
}
}
}
// 释放锁
public void unZkLock() {
try {
zooKeeper.delete(this.currentNode, -1);
} catch (InterruptedException | KeeperException E) {
e.printStackTrace();
}
}
}
public class DiStributedLockTest {
public static void main(String[] args) throws InterruptedException, IOException, KeeperException {
// 创建分布式锁 1
final DiStributedLock lock1 = new DiStributedLock();
// 创建分布式锁 2
final DiStributedLock lock2 = new DiStributedLock();
new Thread(new Runnable() {
@Override
public void run() {
try {
lock1.zkLock();
System.out.println("线程 1 获取锁");
Thread.sleep(5 * 1000);
lock1.unZkLock();
System.out.println("线程 1 释放锁");
} catch (Exception E) {
e.printStackTrace();
}
}
}).start();
new Thread(new Runnable() {
@Override
public void run() {
try {
lock2.zkLock();
System.out.println("线程 2 获取锁");
Thread.sleep(5 * 1000);
lock2.unZkLock();
System.out.println("线程 2 释放锁");
} catch (Exception E) {
e.printStackTrace();
}
}
}).start();
}
}
线程 1 获取到锁了!
线程 1 再次获取到锁了!
休息一下!
线程 1 释放锁了!
线程 1 释放锁了!
线程 2 获取到锁了!
线程 2 再次获取到锁了!
休息一下!
线程 2 释放锁了!
线程 2 释放锁了!
在上面获取 Zookeeper 连接的代码中自定义 ZKClientConfig
配置信息,将 ENABLE_CLIENT_SASL_KEY
改成 false。
ZKClientConfig config = new ZKClientConfig();
config.setProperty(ZKClientConfig.ENABLE_CLIENT_SASL_KEY, "false");
// 获取连接
zooKeeper = new ZooKeeper(connectString, sessionTimeout, new Watcher() {
@Override
public void process(WatchedEvent event) {
// 连接建立时, 打开 latch, 唤醒 wait 在该 latch 上的线程
if (event.getState() == Event.KeeperState.SyncConnected) {
connectLatch.countDown();
}
// 发生了 waitPath 的删除事件
if (event.getType() == Event.EventType.Nodedeleted && event.getPath().equals(waitPath)) {
waitLatch.countDown();
}
}
}, config);
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>4.3.0</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>4.3.0</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-client</artifactId>
<version>4.3.0</version>
</dependency>
public class CuratorLockTest {
// 测试代码
public static void main(String[] args) throws Exception {
// 创建分布式锁1
InterProcessMutex locks1 = new InterProcessMutex(getCuratorFramework(), "/locks");
// 创建分布式锁2
InterProcessMutex locks2 = new InterProcessMutex(getCuratorFramework(), "/locks");
new Thread(new Runnable() {
@Override
public void run() {
try {
// 获取到锁
locks1.acquire();
System.out.println("线程 1 获取到锁了!");
locks1.acquire();
System.out.println("线程 1 再次获取到锁了!");
System.out.println("休息一下!");
Thread.sleep(5 * 1000);
locks1.release();
System.out.println("线程 1 释放锁了!");
locks1.release();
System.out.println("线程 1 释放锁了!");
} catch (Exception E) {
e.printStackTrace();
}
}
}).start();
new Thread(new Runnable() {
@Override
public void run() {
try {
// 获取到锁
locks2.acquire();
System.out.println("线程 2 获取到锁了!");
locks2.acquire();
System.out.println("线程 2 再次获取到锁了!");
System.out.println("休息一下!");
Thread.sleep(5 * 1000);
locks2.release();
System.out.println("线程 2 释放锁了!");
locks2.release();
System.out.println("线程 2 释放锁了!");
} catch (Exception E) {
e.printStackTrace();
}
}
}).start();
}
// 创建连接
private static CuratorFramework getCuratorFramework() throws Exception {
ExponentialBACkoffRetry BACkoffRetry = new ExponentialBACkoffRetry(3000, 3);
DefaultZookeeperFactory zookeeperFactory = new DefaultZookeeperFactory();
CuratorFramework client = CuratorFrameworkFactory.builder().connectString("192.168.3.33:2181")
.sessionTimeoutms(2000).retryPolicy(BACkoffRetry)..build();
client.start();
System.out.println("客户端启动成功!");
return client;
}
}
线程 2 获取到锁了!
线程 2 再次获取到锁了!
休息一下!
线程 2 释放锁了!
线程 2 释放锁了!
线程 1 获取到锁了!
线程 1 再次获取到锁了!
休息一下!
线程 1 释放锁了!
线程 1 释放锁了!
使用 Curator 出现这个问题的方案还是和上面原生的是一样,因为其本质还是通过 Zookeeper 的客户端代码去进行一个连接。
public class DefaultZookeeperFactory implements ZookeeperFactory {
@Override
public ZooKeeper newZooKeeper(String connectString, int sessionTimeout, Watcher watcher, Boolean b) throws Exception {
// 自定义 ZKClientConfig 配置
ZKClientConfig config = new ZKClientConfig();
config.setProperty(ZKClientConfig.ENABLE_CLIENT_SASL_KEY, "false");
return new ZooKeeper(connectString, sessionTimeout, watcher, b, config);
}
}
CuratorFramework client = CuratorFrameworkFactory.builder().connectString("192.168.3.33:2181")
.sessionTimeoutms(2000).retryPolicy(BACkoffRetry).
zookeeperFactory(zookeeperFactory).build();
以上是大佬教程为你收集整理的Zookeeper(2)-分布式锁的基础实现全部内容,希望文章能够帮你解决Zookeeper(2)-分布式锁的基础实现所遇到的程序开发问题。
如果觉得大佬教程网站内容还不错,欢迎将大佬教程推荐给程序员好友。
本图文内容来源于网友网络收集整理提供,作为学习参考使用,版权属于原作者。
如您有任何意见或建议可联系处理。小编QQ:384754419,请注明来意。