一、分布式锁
1.1 思路
1)获取锁时创建一个有序临时节点
2)判断创建出来的节点的需要是否为最小的一个,若是则代表已获取到锁,若不是则获取失败
3)获取锁失败后,获取所有节点,找到上一个节点并监听
4)上一个节点删除后,会自动通知下一个节点获取到锁
5)若持有锁的程序异常挂掉后节点也会被删除,保证了不会死锁
6)释放锁时删除节点即可
1.2 实现
public class ZookeeperLock {
ZooKeeper zooKeeper = null;
private String connectString = "127.0.0.1:2181";
private String rootPath = "/locks";
private String lockPath = "/seq";
// 当前创建的节点路径
private String currentPath = null;
// 要监听的上一个节点路径
private String prePath = null;
CountDownLatch preNodeCountDownLatch = null;
public ZookeeperLock() throws IOException, InterruptedException, KeeperException {
// 建立连接
// 使用countDownLatch确保连接成功再进行其他操作
CountDownLatch countDownLatch = new CountDownLatch(1);
preNodeCountDownLatch = new CountDownLatch(1);
zooKeeper = new ZooKeeper(connectString, 2000, watchedEvent -> {
System.out.println("zookeeper watcher path:" + watchedEvent.getPath()
+ " state:" + watchedEvent.getState().name()
+ " type:" + watchedEvent.getType().name());
// 监听连接状态
if (countDownLatch.getCount() > 0l && watchedEvent.getState() == Watcher.Event.KeeperState.SyncConnected) {
System.out.println("连接zookeeper服务成功!");
countDownLatch.countDown();
}
// 监听上一个节点删除
if (preNodeCountDownLatch.getCount() > 0l && watchedEvent.getPath().equals(prePath) && watchedEvent.getType() == Watcher.Event.EventType.NodeDeleted) {
System.out.println("上一个节点被删除!");
preNodeCountDownLatch.countDown();
}
});
countDownLatch.await();
// 创建根节点
Stat stat = zooKeeper.exists(rootPath, false);
if (stat == null) {
try {
zooKeeper.create(rootPath, "this is a distribute lock;".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
} catch (KeeperException exception) {
// 可能这个过程中已经被创建出来了
stat = zooKeeper.exists(rootPath, false);
if (stat == null) {
throw exception;
}
}
}
}
public void lock() {
try {
// 创建节点
currentPath = zooKeeper.create(rootPath + lockPath, null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
// 获取所有子节点
List<String> children = zooKeeper.getChildren(rootPath, false);
if (children.size() == 1) {// 只有一个节点则代表获取到锁
return;
} else {
//
Collections.sort(children);
String currentNodeName = currentPath.substring(rootPath.length() + 1);
int index = children.indexOf(currentNodeName);
if (index < 0) {
// 节点未获取到,可能被人为干掉了,此时当前实例就不能获得锁了
throw new NullPointerException("获取锁失败!");
} else if (index == 0) {
// 当前节点排序最小,直接获得锁
return;
} else {
// 监听上一个节点的变化
prePath = rootPath + "/" + children.get(index - 1);
System.out.println("上一个节点是" + prePath);
try {
zooKeeper.getData(prePath, true, null);
} catch (KeeperException exception) {
// 上一个节点可能被释放了,所以获取异常时需要判断节点存不存在了,不存在则直接获取锁
Stat stat = zooKeeper.exists(prePath, false);
if (stat == null) {
return;
} else {
throw exception;
}
}
preNodeCountDownLatch.await();
return;
}
}
} catch (InterruptedException | KeeperException e) {
e.printStackTrace();
}
}
public void unlock() throws KeeperException, InterruptedException {
// 删除节点
zooKeeper.delete(currentPath, -1);
}
}
1.3 测试
通过并发计数来测试锁:
public class LockTest {
public static void main(String[] args) throws Exception {
CountDownLatch countDownLatch = new CountDownLatch(3);
// zookeeper同一个Ip默认限制最大连接为60,要测更复杂的情况需要修改配置
NumTest numTest = new NumTest();
for (int i = 0; i < 3; i++) {
Thread thread = new Thread(new SumRunnable(numTest, countDownLatch));
thread.start();
}
countDownLatch.await();
System.out.println(numTest.getNum());
}
static class NumTest {
private Integer num;
public NumTest() {
this.num = 0;
}
public void add() throws Exception {
ZookeeperLock zookeeperLock = new ZookeeperLock();
zookeeperLock.lock();
// synchronized (this){
this.num++;
// }
zookeeperLock.unlock();
}
public Integer getNum() {
return num;
}
public void setNum(Integer num) {
this.num = num;
}
}
static class SumRunnable implements Runnable {
NumTest numTest;
CountDownLatch countDownLatch;
public SumRunnable(NumTest numTest, CountDownLatch countDownLatch) {
this.numTest = numTest;
this.countDownLatch = countDownLatch;
}
@SneakyThrows
@Override
public void run() {
for (int i = 0; i < 20; i++) {
numTest.add();
}
System.out.println("执行完成 " + Thread.currentThread().getName());
countDownLatch.countDown();
}
}
}
二、服务注册以及发现
2.1 思路
1)服务启动时用自己的服务名称创建一个有序临时节点,其值为自己的调用路径
2)服务异常终止后这个节点会自动删掉
3)消费者启动后监听服务路径
2.2 实现
服务提供者:
public class DistributeServer {
ZooKeeper zooKeeper = null;
private String connectString = "127.0.0.1:2181";
private String serverRootPath = "/servers";
private String serverName = "/test-server";
private String serverUrl = "127.0.0.1:8080";
public static void main(String[] args) throws IOException, KeeperException, InterruptedException {
DistributeServer distributeServer = new DistributeServer();
// 获取连接
distributeServer.getConnect();
// 注册服务
distributeServer.register();
TimeUnit.SECONDS.sleep(Integer.MAX_VALUE);
}
/**
* 注册服务
*
* @throws KeeperException
* @throws InterruptedException
*/
private void register() throws KeeperException, InterruptedException {
// 利用临时的顺序节点存储服务信息,服务断开后会自动删除服务
zooKeeper.create(serverRootPath + serverName, serverUrl.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
}
private void getConnect() throws IOException {
zooKeeper = new ZooKeeper(connectString, 2000, new Watcher() {
@Override
public void process(WatchedEvent watchedEvent) {
}
});
}
}
消费者:
public class DistributeClient {
ZooKeeper zooKeeper = null;
private String connectString = "127.0.0.1:2181";
private String serverRootPath = "/servers";
private Map<String, Set<String>> serverUrlsMap = null;
public static void main(String[] args) throws IOException, KeeperException, InterruptedException {
DistributeClient distributeClient = new DistributeClient();
// 获取连接
distributeClient.getConnect();
TimeUnit.SECONDS.sleep(Integer.MAX_VALUE);
}
/**
* 获取所有服务
*
* @throws KeeperException
* @throws InterruptedException
*/
private void listServers() throws KeeperException, InterruptedException {
System.out.println("---------监听所有服务开始---------");
serverUrlsMap = new HashMap<>();
List<String> children = zooKeeper.getChildren(serverRootPath, true);
for (String node : children) {
String serverName = node.substring(0, node.length() - 10);
byte[] data = zooKeeper.getData(serverRootPath + "/" + node, false, null);
Set<String> urls = serverUrlsMap.get(serverName);
if (urls == null) {
urls = new HashSet<>();
}
urls.add(Strings.fromByteArray(data));
serverUrlsMap.put(serverName, urls);
}
System.out.println("所有服务:" + serverUrlsMap.toString());
System.out.println("---------监听所有服务结束---------");
}
/**
* 获取连接
*
* @throws IOException
*/
private void getConnect() throws IOException {
// 通过watch监听服务
zooKeeper = new ZooKeeper(connectString, 2000, new Watcher() {
@SneakyThrows
@Override
public void process(WatchedEvent watchedEvent) {
listServers();
}
});
}
}