一、分布式锁

1.1 思路

1)获取锁时创建一个有序临时节点

2)判断创建出来的节点的需要是否为最小的一个,若是则代表已获取到锁,若不是则获取失败

3)获取锁失败后,获取所有节点,找到上一个节点并监听

4)上一个节点删除后,会自动通知下一个节点获取到锁

5)若持有锁的程序异常挂掉后节点也会被删除,保证了不会死锁

6)释放锁时删除节点即可

1.2 实现

public class ZookeeperLock {

    ZooKeeper zooKeeper = null;
    private String connectString = "127.0.0.1:2181";
    private String rootPath = "/locks";
    private String lockPath = "/seq";
    // 当前创建的节点路径
    private String currentPath = null;
    // 要监听的上一个节点路径
    private String prePath = null;
    CountDownLatch preNodeCountDownLatch = null;

    public ZookeeperLock() throws IOException, InterruptedException, KeeperException {
        // 建立连接
        // 使用countDownLatch确保连接成功再进行其他操作
        CountDownLatch countDownLatch = new CountDownLatch(1);
        preNodeCountDownLatch = new CountDownLatch(1);
        zooKeeper = new ZooKeeper(connectString, 2000, watchedEvent -> {
            System.out.println("zookeeper watcher path:" + watchedEvent.getPath()
                    + " state:" + watchedEvent.getState().name()
                    + " type:" + watchedEvent.getType().name());
            // 监听连接状态
            if (countDownLatch.getCount() > 0l && watchedEvent.getState() == Watcher.Event.KeeperState.SyncConnected) {
                System.out.println("连接zookeeper服务成功!");
                countDownLatch.countDown();
            }
            // 监听上一个节点删除
            if (preNodeCountDownLatch.getCount() > 0l && watchedEvent.getPath().equals(prePath) && watchedEvent.getType() == Watcher.Event.EventType.NodeDeleted) {
                System.out.println("上一个节点被删除!");
                preNodeCountDownLatch.countDown();
            }
        });
        countDownLatch.await();

        // 创建根节点
        Stat stat = zooKeeper.exists(rootPath, false);
        if (stat == null) {
            try {
                zooKeeper.create(rootPath, "this is a distribute lock;".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            } catch (KeeperException exception) {
                // 可能这个过程中已经被创建出来了
                stat = zooKeeper.exists(rootPath, false);
                if (stat == null) {
                    throw exception;
                }
            }
        }
    }

    public void lock() {
        try {
            // 创建节点
            currentPath = zooKeeper.create(rootPath + lockPath, null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
            // 获取所有子节点
            List<String> children = zooKeeper.getChildren(rootPath, false);
            if (children.size() == 1) {// 只有一个节点则代表获取到锁
                return;
            } else {
                //
                Collections.sort(children);
                String currentNodeName = currentPath.substring(rootPath.length() + 1);
                int index = children.indexOf(currentNodeName);
                if (index < 0) {
                    // 节点未获取到,可能被人为干掉了,此时当前实例就不能获得锁了
                    throw new NullPointerException("获取锁失败!");
                } else if (index == 0) {
                    // 当前节点排序最小,直接获得锁
                    return;
                } else {
                    // 监听上一个节点的变化
                    prePath = rootPath + "/" + children.get(index - 1);
                    System.out.println("上一个节点是" + prePath);
                    try {
                        zooKeeper.getData(prePath, true, null);
                    } catch (KeeperException exception) {
                        // 上一个节点可能被释放了,所以获取异常时需要判断节点存不存在了,不存在则直接获取锁
                        Stat stat = zooKeeper.exists(prePath, false);
                        if (stat == null) {
                            return;
                        } else {
                            throw exception;
                        }
                    }
                    preNodeCountDownLatch.await();
                    return;
                }
            }
        } catch (InterruptedException | KeeperException e) {
            e.printStackTrace();
        }
    }

    public void unlock() throws KeeperException, InterruptedException {
        // 删除节点
        zooKeeper.delete(currentPath, -1);
    }
}

1.3 测试

通过并发计数来测试锁:

public class LockTest {

    public static void main(String[] args) throws Exception {
        CountDownLatch countDownLatch = new CountDownLatch(3);
        // zookeeper同一个Ip默认限制最大连接为60,要测更复杂的情况需要修改配置
        NumTest numTest = new NumTest();
        for (int i = 0; i < 3; i++) {
            Thread thread = new Thread(new SumRunnable(numTest, countDownLatch));
            thread.start();
        }
        countDownLatch.await();
        System.out.println(numTest.getNum());

    }

    static class NumTest {
        private Integer num;

        public NumTest() {
            this.num = 0;
        }

        public void add() throws Exception {
            ZookeeperLock zookeeperLock = new ZookeeperLock();
            zookeeperLock.lock();
//            synchronized (this){
            this.num++;
//            }

            zookeeperLock.unlock();
        }

        public Integer getNum() {
            return num;
        }

        public void setNum(Integer num) {
            this.num = num;
        }
    }

    static class SumRunnable implements Runnable {

        NumTest numTest;
        CountDownLatch countDownLatch;

        public SumRunnable(NumTest numTest, CountDownLatch countDownLatch) {
            this.numTest = numTest;
            this.countDownLatch = countDownLatch;
        }

        @SneakyThrows
        @Override
        public void run() {
            for (int i = 0; i < 20; i++) {
                numTest.add();
            }
            System.out.println("执行完成 " + Thread.currentThread().getName());
            countDownLatch.countDown();
        }
    }
}

二、服务注册以及发现

2.1 思路

1)服务启动时用自己的服务名称创建一个有序临时节点,其值为自己的调用路径

2)服务异常终止后这个节点会自动删掉

3)消费者启动后监听服务路径

2.2 实现

服务提供者:

public class DistributeServer {
    ZooKeeper zooKeeper = null;
    private String connectString = "127.0.0.1:2181";
    private String serverRootPath = "/servers";
    private String serverName = "/test-server";
    private String serverUrl = "127.0.0.1:8080";

    public static void main(String[] args) throws IOException, KeeperException, InterruptedException {
        DistributeServer distributeServer = new DistributeServer();
        // 获取连接
        distributeServer.getConnect();
        // 注册服务
        distributeServer.register();
        TimeUnit.SECONDS.sleep(Integer.MAX_VALUE);
    }

    /**
     * 注册服务
     *
     * @throws KeeperException
     * @throws InterruptedException
     */
    private void register() throws KeeperException, InterruptedException {
        // 利用临时的顺序节点存储服务信息,服务断开后会自动删除服务
        zooKeeper.create(serverRootPath + serverName, serverUrl.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
    }

    private void getConnect() throws IOException {
        zooKeeper = new ZooKeeper(connectString, 2000, new Watcher() {
            @Override
            public void process(WatchedEvent watchedEvent) {

            }
        });

    }
}

消费者:

public class DistributeClient {
    ZooKeeper zooKeeper = null;
    private String connectString = "127.0.0.1:2181";
    private String serverRootPath = "/servers";
    private Map<String, Set<String>> serverUrlsMap = null;

    public static void main(String[] args) throws IOException, KeeperException, InterruptedException {
        DistributeClient distributeClient = new DistributeClient();
        // 获取连接
        distributeClient.getConnect();
        TimeUnit.SECONDS.sleep(Integer.MAX_VALUE);
    }

    /**
     * 获取所有服务
     *
     * @throws KeeperException
     * @throws InterruptedException
     */
    private void listServers() throws KeeperException, InterruptedException {
        System.out.println("---------监听所有服务开始---------");
        serverUrlsMap = new HashMap<>();
        List<String> children = zooKeeper.getChildren(serverRootPath, true);
        for (String node : children) {
            String serverName = node.substring(0, node.length() - 10);
            byte[] data = zooKeeper.getData(serverRootPath + "/" + node, false, null);
            Set<String> urls = serverUrlsMap.get(serverName);
            if (urls == null) {
                urls = new HashSet<>();
            }
            urls.add(Strings.fromByteArray(data));
            serverUrlsMap.put(serverName, urls);
        }
        System.out.println("所有服务:" + serverUrlsMap.toString());

        System.out.println("---------监听所有服务结束---------");
    }

    /**
     * 获取连接
     *
     * @throws IOException
     */
    private void getConnect() throws IOException {
        // 通过watch监听服务
        zooKeeper = new ZooKeeper(connectString, 2000, new Watcher() {
            @SneakyThrows
            @Override
            public void process(WatchedEvent watchedEvent) {
                listServers();
            }
        });
    }
}