一、安装

docker run -d --name zookeeper --privileged=true -p 2181:2181 -e TZ="Asia/Shanghai" -v /opt/zookeeper/data:/data -v /opt/zookeeper/conf:/conf -v /opt/zookeeper/logs:/datalog zookeeper:3.5.7

检查服务状态

> docker exec -it zookeeper /bin/bash ./bin/zkServer.sh status

ZooKeeper JMX enabled by default
Using config: /conf/zoo.cfg
Client port found: 2181. Client address: localhost.
Mode: standalone

使用本地客户端进入

docker exec -it zookeeper zkCli.sh
  • 注意关闭防火墙
> systemctl status firewalld.service

● firewalld.service - firewalld - dynamic firewall daemon
   Loaded: loaded (/usr/lib/systemd/system/firewalld.service; disabled; vendor preset: enabled)
   Active: inactive (dead)
     Docs: man:firewalld(1)

二、推荐工具

  • PrettyZoo (https://github.com/vran-dev/PrettyZoo/releases)

三、编程实现

在这里插入图片描述

实现原理: 利用zk的临时顺序节点的特性,会随着后端服务的下线而自动剔除会话中的zk节点。第一步,程序启动成功后,手动注册到zk某个目录下;第二步,编写ZkListener,监听该zk目录下的节点的新增和移除事件;事件处理中,你需要把节点的内容解析出来,然后放到Map集合中,维护在线的节点列表。因为后面的代码很依赖于实时在线的节点列表,所以这里要求实时性,也就是说,后端服务下线时,其他服务节点能够及时地捕捉到,Map集合是动态且实时更新的。

关于业务代码具体处理,在本文不会体现,后期会在websocket集群的实现中讲述到。

1、引入jar包

         <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-recipes</artifactId>
            <version>4.0.1</version>
        </dependency>

2、ZookeeperConfig配置

import lombok.extern.slf4j.Slf4j;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * Spring configuration that exposes a started {@link CuratorFramework} client
 * connected to the ZooKeeper address configured under {@code zk.url}.
 */
@Slf4j
@Configuration
public class ZookeeperConfig {
    /** Base sleep time, in milliseconds, handed to the exponential back-off retry policy. */
    private static final int BASE_SLEEP_TIME_MS = 1000;

    /** Maximum number of retries handed to the exponential back-off retry policy. */
    private static final int MAX_RETRIES = 3;

    @Value("${zk.url}")
    private String zkUrl;

    /**
     * Builds the Curator client and starts it before handing it to the container.
     *
     * @return a started {@link CuratorFramework} instance
     */
    @Bean
    public CuratorFramework curatorFramework() {
        CuratorFramework curator = CuratorFrameworkFactory.newClient(
                zkUrl, new ExponentialBackoffRetry(BASE_SLEEP_TIME_MS, MAX_RETRIES));
        curator.start();
        return curator;
    }
}

application.yml

#ZK服务配置
zk:
  url: 192.168.80.226:2181

3、ZkListener 监听节点的变化(新增和删除)

import com.alibaba.fastjson.JSONObject;
import com.xxx.ws.test.util.ThreadUtils;
import com.xxx.ws.test.zookeeper.dto.ServerNode;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.cache.ChildData;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
import org.springframework.stereotype.Service;

import java.io.IOException;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;

import static com.xhtech.ws.test.constants.Constants.MANAGE_PATH;

/**
 * Watches the children of {@code MANAGE_PATH} and reacts to nodes being added
 * or removed.
 *
 * <p>The watch is started once the bean has been constructed and is closed
 * again when the bean is destroyed.
 */
@Slf4j
@Service
@RequiredArgsConstructor
public class ZkListener {
    private final CuratorFramework curatorFramework;

    /** Cache backing the watch; held as a field so that it can be closed on shutdown. */
    private PathChildrenCache childrenCache;

    /**
     * Subscribes to child-added / child-removed events below {@code MANAGE_PATH}.
     *
     * @throws Exception if the cache cannot be started
     */
    @PostConstruct
    public void init() throws Exception {
        childrenCache = new PathChildrenCache(curatorFramework, MANAGE_PATH, true);
        PathChildrenCacheListener childrenCacheListener = new PathChildrenCacheListener() {
            @Override
            public void childEvent(CuratorFramework client,
                                   PathChildrenCacheEvent event) throws Exception {
                ChildData data = event.getData();
                switch (event.getType()) {
                    case CHILD_ADDED:
                        processAdd(data);
                        break;
                    case CHILD_REMOVED:
                        processRemove(data);
                        break;
                    case CHILD_UPDATED:
                        // Updates to an existing node are intentionally ignored.
                        break;
                    default:
                        log.debug("[PathChildrenCache]节点数据为空, path={}", data == null ? "null" : data.getPath());
                        break;
                }
            }
        };
        childrenCache.getListenable().addListener(childrenCacheListener, ThreadUtils.getExecutor());
        childrenCache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT);
        // Only report success once start() has returned without throwing.
        log.info("Register zk watcher successfully!");
    }

    /**
     * Closes the children cache so that its watchers and internal resources are
     * released when the bean is destroyed.
     */
    @PreDestroy
    public void destroy() {
        if (childrenCache == null) {
            return;
        }
        try {
            childrenCache.close();
        } catch (IOException e) {
            log.warn("Failed to close zk children cache, path={}", MANAGE_PATH, e);
        }
    }

    /**
     * Handles a newly added child node.
     *
     * @param data the child data carried by the event; may be {@code null} or carry no payload
     */
    private void processAdd(ChildData data) {
        ServerNode serverNode = parseNode(data);
        if (serverNode == null) {
            return;
        }

        log.info("监听器--新节点加入:{}", serverNode);
    }

    /**
     * Handles a removed child node.
     *
     * @param data the child data carried by the event; may be {@code null} or carry no payload
     */
    private void processRemove(ChildData data) {
        ServerNode serverNode = parseNode(data);
        if (serverNode == null) {
            return;
        }

        log.info("监听器--节点删除:{}", serverNode);
    }

    /**
     * Decodes the JSON payload of a child node into a {@link ServerNode}.
     *
     * @param data the child data carried by the event
     * @return the decoded node, or {@code null} when the event carries no payload
     */
    private ServerNode parseNode(ChildData data) {
        if (data == null || data.getData() == null || data.getData().length == 0) {
            // Without a payload there is nothing to decode; skip instead of failing with an NPE.
            log.warn("[PathChildrenCache] event without node payload, path={}",
                    data == null ? "null" : data.getPath());
            return null;
        }
        return JSONObject.parseObject(data.getData(), ServerNode.class);
    }

}

4、ZkService 节点的注册

import com.alibaba.fastjson.JSON;
import com.xxx.ws.test.zookeeper.dto.ServerNode;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.curator.framework.CuratorFramework;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.data.Stat;
import org.springframework.stereotype.Component;

import java.nio.charset.StandardCharsets;
import java.util.List;

import static com.xxx.ws.test.constants.Constants.MANAGE_PATH;
import static com.xxx.ws.test.constants.Constants.PATH_PREFIX;

@Slf4j
@Component
@RequiredArgsConstructor
public class ZkService {
    private final CuratorFramework curatorFramework;

    public void registry(ServerNode serverNode) {
        try {
            //判断根节点是否存在
            if (this.checkNodeExists(MANAGE_PATH)) {
                this.createPersistentNode(MANAGE_PATH);
            }
            this.createNode(PATH_PREFIX, serverNode);
        } catch (Exception e) {
            log.error("操作zk出现异常", e);
        }
    }

    public boolean checkNodeExists(String path) throws Exception {
        Stat stat = curatorFramework.checkExists().forPath(path);
        return stat == null ? false : true;
    }

    public String createPersistentNode(String path) throws Exception {
        String pathRegistered = curatorFramework.create()
                .creatingParentsIfNeeded()
                .withProtection()
                .withMode(CreateMode.PERSISTENT)
                .forPath(path);
        return pathRegistered;
    }

    public String createNode(String prefix, ServerNode serverNode) throws Exception {
        byte[] payload = JSON.toJSONString(serverNode).getBytes(StandardCharsets.UTF_8);
        String pathRegistered = curatorFramework.create()
                .creatingParentsIfNeeded()
                .withMode(CreateMode.EPHEMERAL_SEQUENTIAL)
                .forPath(prefix, payload);
        return pathRegistered;
    }

    public List<String> listNodes(String path) throws Exception {
        List<String> nodeList = curatorFramework.getChildren().forPath(path);
        return nodeList;
    }

    public void removeNode(String path) throws Exception {
        curatorFramework.delete().forPath(path);
    }
}

5、ServerNode实体

import lombok.Builder;
import lombok.Data;

// Description of one server instance; it is serialized to JSON as the znode
// payload on registration and parsed back by the listener.
@Data
@Builder
public class ServerNode {
    // MAC address of the worker
    private String mac;

    // IP of the Netty service
    private String host;

    // Port of the Netty service
    private Integer port;

}

  • 常量,定义zk节点的目录与临时节点的前缀。
    // Parent znode whose children are watched by ZkListener and whose existence
    // is checked by ZkService before a server registers.
    public static final String MANAGE_PATH = "/im/nodes";
    // Path prefix handed to ZkService.createNode, which creates the node in
    // EPHEMERAL_SEQUENTIAL mode.
    public static final String PATH_PREFIX = MANAGE_PATH + "/seq-";

测试入口

import cn.hutool.core.date.DateUtil;
import com.xxx.ws.test.zookeeper.ZkService;
import com.xxx.ws.test.zookeeper.dto.ServerNode;
import lombok.RequiredArgsConstructor;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RestController;

/**
 * Test entry point that registers a server node in ZooKeeper on demand.
 */
@RestController
@RequiredArgsConstructor
public class ZookeeperApiController {
    private final ZkService zkService;

    /**
     * Registers a node carrying only the given host and replies with the current time.
     *
     * @param host the host value taken from the request path
     * @return a 200 response whose body is the current date-time string
     */
    @GetMapping("/registry/{host}")
    public ResponseEntity<?> registry(@PathVariable(value = "host") String host) {
        ServerNode node = ServerNode.builder().host(host).build();
        zkService.registry(node);

        String now = DateUtil.now();
        return ResponseEntity.ok(now);
    }
}

四、测试结果

除了主动注册外,随着服务节点的下线,zk会自动删除节点。这里重点看ZkListener的增与删的方法。

这里以两个节点的启动为例。下面看到的日志所在节点是192.168.8.28,另外一个节点是192.168.8.18。

2023-04-18 17:31:14.277  INFO 26793 --- [           main] c.x.w.t.z.registration.ZkListener        : Register zk watcher successfully!

// 因为zk目录下,已有一个节点,故在应用重启的时候,就检测到有新节点。
2023-04-18 17:31:14.465  INFO 26793 --- [pool-4-thread-2] c.x.w.t.z.registration.ZkListener        : 监听器--新节点加入:ServerNode(mac=null, host=192.168.8.18, port=8889)
2023-04-18 17:31:14.962  INFO 26793 --- [           main] c.x.ws.test.WebsocketTestApplication     : Started WebsocketTestApplication in 5.947 seconds (JVM running for 8.067)


// 本节点作为新节点,加入到zk的目录下
2023-04-18 17:31:54.840  INFO 26793 --- [pool-4-thread-4] c.x.w.t.z.registration.ZkListener        : 监听器--新节点加入:ServerNode(mac=null, host=192.168.8.28, port=8889)

// 接下来,测试另一个节点的停和启服务,可以看到,本节点都可以监听得到它的下线和上线。
2023-04-18 17:32:08.055  INFO 26793 --- [pool-4-thread-5] c.x.w.t.z.registration.ZkListener        : 监听器--节点删除:ServerNode(mac=null, host=192.168.8.18, port=8889)
2023-04-18 17:32:52.655  INFO 26793 --- [pool-4-thread-6] c.x.w.t.z.registration.ZkListener        : 监听器--新节点加入:ServerNode(mac=null, host=192.168.8.18, port=8889)

在这里插入图片描述

在这里插入图片描述

Logo

腾讯云面向开发者汇聚海量精品云计算使用和开发经验,营造开放的云计算技术生态圈。

更多推荐