激情久久久_欧美视频区_成人av免费_不卡视频一二三区_欧美精品在欧美一区二区少妇_欧美一区二区三区的

服務器之家:專注于服務器技術及軟件下載分享
分類導航

PHP教程|ASP.NET教程|Java教程|ASP教程|編程技術|正則表達式|C/C++|IOS|C#|Swift|Android|VB|R語言|JavaScript|易語言|vb.net|

服務器之家 - 編程語言 - Java教程 - Netty + ZooKeeper 實現簡單的服務注冊與發現

Netty + ZooKeeper 實現簡單的服務注冊與發現

2019-07-08 12:51fengzhizi715 Java教程

服務注冊和發現一直是分布式的核心組件。本文介紹了借助 ZooKeeper 做注冊中心,如何實現一個簡單的服務注冊和發現。,需要的朋友可以參考下

一. 背景

最近的一個項目:我們的系統接收到上游系統的派單任務后,會推送到指定的門店的相關設備,并進行相應的業務處理。

二. Netty 的使用

在接收到派單任務之后,通過 Netty 推送到指定門店相關的設備。在我們的系統中 Netty 實現了消息推送、長連接以及心跳機制。

Netty + ZooKeeper 實現簡單的服務注冊與發現

2.1 Netty Server 端:

每個 Netty 服務端通過 ConcurrentHashMap 保存了客戶端的 clientId 以及它連接的 SocketChannel。

服務器端向客戶端發送消息時,只要獲取 clientId 對應的 SocketChannel,往 SocketChannel 里寫入相應的 message 即可。

EventLoopGroup boss = new NioEventLoopGroup(1);
  EventLoopGroup worker = new NioEventLoopGroup();
  ServerBootstrap bootstrap = new ServerBootstrap();
  bootstrap.group(boss, worker)
    .channel(NioServerSocketChannel.class)
    .option(ChannelOption.SO_BACKLOG, 128)
    .option(ChannelOption.TCP_NODELAY, true)
    .childOption(ChannelOption.SO_KEEPALIVE, true)
    .childHandler(new ChannelInitializer() {
     @Override
     protected void initChannel(Channel channel) throws Exception {
      ChannelPipeline p = channel.pipeline();
      p.addLast(new MessageEncoder());
      p.addLast(new MessageDecoder());
      p.addLast(new PushServerHandler());
     }
    });
  ChannelFuture future = bootstrap.bind(host,port).sync();
  if (future.isSuccess()) {
   logger.info("server start...");
  }

2.2 Netty Client 端:

客戶端用于接收服務端的消息,隨即進行業務處理??蛻舳诉€有心跳機制,它通過 IdleEvent 事件定時向服務端放送 Ping 消息以此來檢測 SocketChannel 是否中斷。

public PushClientBootstrap(String host, int port) throws InterruptedException {
  this.host = host;
  this.port = port;
  start(host,port);
 }
 private void start(String host, int port) throws InterruptedException {
  bootstrap = new Bootstrap();
  bootstrap.channel(NioSocketChannel.class)
    .option(ChannelOption.SO_KEEPALIVE, true)
    .group(workGroup)
    .remoteAddress(host, port)
    .handler(new ChannelInitializer(){
     @Override
     protected void initChannel(Channel channel) throws Exception {
      ChannelPipeline p = channel.pipeline();
      p.addLast(new IdleStateHandler(20, 10, 0)); // IdleStateHandler 用于檢測心跳
      p.addLast(new MessageDecoder());
      p.addLast(new MessageEncoder());
      p.addLast(new PushClientHandler());
     }
    });
  doConnect(port, host);
 }
 /**
  * 建立連接,并且可以實現自動重連.
  * @param port port.
  * @param host host.
  * @throws InterruptedException InterruptedException.
  */
 private void doConnect(int port, String host) throws InterruptedException {
  if (socketChannel != null && socketChannel.isActive()) {
   return;
  }
  final int portConnect = port;
  final String hostConnect = host;
  ChannelFuture future = bootstrap.connect(host, port);
  future.addListener(new ChannelFutureListener() {
   @Override
   public void operationComplete(ChannelFuture futureListener) throws Exception {
    if (futureListener.isSuccess()) {
     socketChannel = (SocketChannel) futureListener.channel();
     logger.info("Connect to server successfully!");
    } else {
     logger.info("Failed to connect to server, try connect after 10s");
     futureListener.channel().eventLoop().schedule(new Runnable() {
      @Override
      public void run() {
       try {
        doConnect(portConnect, hostConnect);
       } catch (InterruptedException e) {
        e.printStackTrace();
       }
      }
     }, 10, TimeUnit.SECONDS);
    }
   }
  }).sync();
 }

三. 借助 ZooKeeper 實現簡單的服務注冊與發現

3.1 服務注冊

服務注冊本質上是為了解耦服務提供者和服務消費者。服務注冊是一個高可用強一致性的服務發現存儲倉庫,主要用來存儲服務的api和地址對應關系。為了高可用,服務注冊中心一般為一個集群,并且能夠保證分布式一致性。目前常用的有 ZooKeeper、Etcd 等等。

在我們項目中采用了 ZooKeeper 實現服務注冊。

public class ServiceRegistry {
 private static final Logger logger = LoggerFactory.getLogger(ServiceRegistry.class);
 private CountDownLatch latch = new CountDownLatch(1);
 private String registryAddress;
 public ServiceRegistry(String registryAddress) {
  this.registryAddress = registryAddress;
 }
 public void register(String data) {
  if (data != null) {
   ZooKeeper zk = connectServer();
   if (zk != null) {
    createNode(zk, data);
   }
  }
 }
 /**
  * 連接 zookeeper 服務器
  * @return
  */
 private ZooKeeper connectServer() {
  ZooKeeper zk = null;
  try {
   zk = new ZooKeeper(registryAddress, Constants.ZK_SESSION_TIMEOUT, new Watcher() {
    @Override
    public void process(WatchedEvent event) {
     if (event.getState() == Event.KeeperState.SyncConnected) {
      latch.countDown();
     }
    }
   });
   latch.await();
  } catch (IOException | InterruptedException e) {
   logger.error("", e);
  }
  return zk;
 }
 /**
  * 創建節點
  * @param zk
  * @param data
  */
 private void createNode(ZooKeeper zk, String data) {
  try {
   byte[] bytes = data.getBytes();
   String path = zk.create(Constants.ZK_DATA_PATH, bytes, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
   logger.debug("create zookeeper node ({} => {})", path, data);
  } catch (KeeperException | InterruptedException e) {
   logger.error("", e);
  }
 }
}

有了服務注冊,在 Netty 服務端啟動之后,將 Netty 服務端的 ip 和 port 注冊到 ZooKeeper。

EventLoopGroup boss = new NioEventLoopGroup(1);
  EventLoopGroup worker = new NioEventLoopGroup();
  ServerBootstrap bootstrap = new ServerBootstrap();
  bootstrap.group(boss, worker)
    .channel(NioServerSocketChannel.class)
    .option(ChannelOption.SO_BACKLOG, 128)
    .option(ChannelOption.TCP_NODELAY, true)
    .childOption(ChannelOption.SO_KEEPALIVE, true)
    .childHandler(new ChannelInitializer() {
     @Override
     protected void initChannel(Channel channel) throws Exception {
      ChannelPipeline p = channel.pipeline();
      p.addLast(new MessageEncoder());
      p.addLast(new MessageDecoder());
      p.addLast(new PushServerHandler());
     }
    });
  ChannelFuture future = bootstrap.bind(host,port).sync();
  if (future.isSuccess()) {
   logger.info("server start...");
  }
  if (serviceRegistry != null) {
   serviceRegistry.register(host + ":" + port);
  }

3.2 服務發現

這里我們采用的是客戶端的服務發現,即服務發現機制由客戶端實現。

客戶端在和服務端建立連接之前,通過查詢注冊中心的方式來獲取服務端的地址。如果存在有多個 Netty 服務端的話,可以做服務的負載均衡。在我們的項目中只采用了簡單的隨機法進行負載。

public class ServiceDiscovery {
 private static final Logger logger = LoggerFactory.getLogger(ServiceDiscovery.class);
 private CountDownLatch latch = new CountDownLatch(1);
 private volatile List<String> serviceAddressList = new ArrayList<>();
 private String registryAddress; // 注冊中心的地址
 public ServiceDiscovery(String registryAddress) {
  this.registryAddress = registryAddress;
  ZooKeeper zk = connectServer();
  if (zk != null) {
   watchNode(zk);
  }
 }
 /**
  * 通過服務發現,獲取服務提供方的地址
  * @return
  */
 public String discover() {
  String data = null;
  int size = serviceAddressList.size();
  if (size > 0) {
   if (size == 1) { //只有一個服務提供方
    data = serviceAddressList.get(0);
    logger.info("unique service address : {}", data);
   } else {   //使用隨機分配法。簡單的負載均衡法
    data = serviceAddressList.get(ThreadLocalRandom.current().nextInt(size));
    logger.info("choose an address : {}", data);
   }
  }
  return data;
 }
 /**
  * 連接 zookeeper
  * @return
  */
 private ZooKeeper connectServer() {
  ZooKeeper zk = null;
  try {
   zk = new ZooKeeper(registryAddress, Constants.ZK_SESSION_TIMEOUT, new Watcher() {
    @Override
    public void process(WatchedEvent event) {
     if (event.getState() == Watcher.Event.KeeperState.SyncConnected) {
      latch.countDown();
     }
    }
   });
   latch.await();
  } catch (IOException | InterruptedException e) {
   logger.error("", e);
  }
  return zk;
 }
 /**
  * 獲取服務地址列表
  * @param zk
  */
 private void watchNode(final ZooKeeper zk) {
  try {
   //獲取子節點列表
   List<String> nodeList = zk.getChildren(Constants.ZK_REGISTRY_PATH, new Watcher() {
    @Override
    public void process(WatchedEvent event) {
     if (event.getType() == Event.EventType.NodeChildrenChanged) {
      //發生子節點變化時再次調用此方法更新服務地址
      watchNode(zk);
     }
    }
   });
   List<String> dataList = new ArrayList<>();
   for (String node : nodeList) {
    byte[] bytes = zk.getData(Constants.ZK_REGISTRY_PATH + "/" + node, false, null);
    dataList.add(new String(bytes));
   }
   logger.debug("node data: {}", dataList);
   this.serviceAddressList = dataList;
  } catch (KeeperException | InterruptedException e) {
   logger.error("", e);
  }
 }
}

Netty 客戶端啟動之后,通過服務發現獲取 Netty 服務端的 ip 和 port。

/**
  * 支持通過服務發現來獲取 Socket 服務端的 host、port
  * @param discoveryAddress
  * @throws InterruptedException
  */
 public PushClientBootstrap(String discoveryAddress) throws InterruptedException {

  serviceDiscovery = new ServiceDiscovery(discoveryAddress);
  serverAddress = serviceDiscovery.discover();

  if (serverAddress!=null) {
   String[] array = serverAddress.split(":");
   if (array!=null && array.length==2) {

    String host = array[0];
    int port = Integer.parseInt(array[1]);

    start(host,port);
   }
  }
 }

四. 總結

服務注冊和發現一直是分布式的核心組件。本文介紹了借助 ZooKeeper 做注冊中心,如何實現一個簡單的服務注冊和發現。其實,注冊中心的選擇有很多,例如 Etcd、Eureka 等等。選擇符合我們業務需求的才是最重要的。

以上所述是小編給大家介紹的Netty + ZooKeeper 實現簡單的服務注冊與發現,希望對大家有所幫助,如果大家有任何疑問請給我留言,小編會及時回復大家的。在此也非常感謝大家對服務器之家網站的支持!
如果你覺得本文對你有幫助,歡迎轉載,煩請注明出處,謝謝!

延伸 · 閱讀

精彩推薦
  • Java教程Java之Springcloud Feign組件詳解

    Java之Springcloud Feign組件詳解

    這篇文章主要介紹了Java之Springcloud Feign組件詳解,本篇文章通過簡要的案例,講解了該項技術的了解與使用,以下就是詳細內容,需要的朋友可以參考下...

    深情以改10322021-11-12
  • Java教程淺談Java(SpringBoot)基于zookeeper的分布式鎖實現

    淺談Java(SpringBoot)基于zookeeper的分布式鎖實現

    這篇文章主要介紹了Java(SpringBoot)基于zookeeper的分布式鎖實現,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的...

    LJY_SUPER5742021-07-21
  • Java教程java 中鎖的性能提高辦法

    java 中鎖的性能提高辦法

    這篇文章主要介紹了java 中鎖的性能提高辦法的相關資料,需要的朋友可以參考下...

    Java之家3092020-08-13
  • Java教程JAVA中通過自定義注解進行數據驗證的方法

    JAVA中通過自定義注解進行數據驗證的方法

    java 自定義注解驗證可自己添加所需要的注解,下面這篇文章主要給大家介紹了關于JAVA中通過自定義注解進行數據驗證的相關資料,文中通過示例代碼介紹...

    Decouple6362021-05-25
  • Java教程JavaWeb 實現驗證碼功能(demo)

    JavaWeb 實現驗證碼功能(demo)

    在 WEB-APP 中一般應用于:登錄、注冊、買某票、秒殺等場景,大家都接觸過這個驗證碼操作,今天小編通過實例代碼給大家講解javaweb實現驗證碼功能,需要...

    java教程網12832020-08-05
  • Java教程SpringBoot引入Thymeleaf的實現方法

    SpringBoot引入Thymeleaf的實現方法

    這篇文章主要介紹了SpringBoot引入Thymeleaf的實現方法,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下...

    Bobby6472021-07-28
  • Java教程springboot ehcache 配置使用方法代碼詳解

    springboot ehcache 配置使用方法代碼詳解

    EhCache是一個比較成熟的Java緩存框架,Springboot對ehcache的使用非常支持,所以在Springboot中只需做些配置就可使用,且使用方式也簡易,今天給大家分享spri...

    m1719309529412912021-09-16
  • Java教程Java list.remove( )方法注意事項

    Java list.remove( )方法注意事項

    這篇文章主要介紹了Java list.remove( )方法注意事項,非常簡單易懂,需要的朋友可以參考下...

    妖久9552021-05-25
主站蜘蛛池模板: 国产精品久久久久久久久粉嫩 | 色淫湿视频 | 国产精品美女久久久久久网站 | 成人毛片视频在线观看 | 思思久而久而蕉人 | 日本网站在线播放 | 一级黄色毛片播放 | 欧美一级黄视频 | 国产精品亚洲欧美一级在线 | 圆产精品久久久久久久久久久 | 成人福利电影在线观看 | 国产免费视频在线 | 中文字幕在线永久视频 | 强伦女教师视频 | 国产精品一区二区三区在线播放 | 免费黄网站在线播放 | 国产亚洲自拍一区 | 91网站免费在线观看 | 国产精品久久久久久久久久10秀 | 国产精品免费成人 | 精品国产一区在线观看 | 一区二区三区毛片 | 草莓视频久久 | 手机av免费电影 | 久久精品视频一区二区 | 肉文女配h| 电视剧全部免费观看 | 国产午夜精品一区二区三区视频 | 欧美爱爱一区二区 | 久久久国产视频 | 国产免费久久久久 | 久久久久久久久成人 | 亚洲午夜视频在线 | 成人在线视频一区 | 国产乱色精品成人免费视频 | 又黄又爽免费无遮挡在线观看 | 亚洲日韩精品欧美一区二区 | 欧美日韩国产成人在线观看 | 久久另类视频 | 天天干天天透 | 欧美18—19sex性护士中国 |