激情久久久_欧美视频区_成人av免费_不卡视频一二三区_欧美精品在欧美一区二区少妇_欧美一区二区三区的

服務器之家:專注于服務器技術(shù)及軟件下載分享
分類導航

PHP教程|ASP.NET教程|Java教程|ASP教程|編程技術(shù)|正則表達式|C/C++|IOS|C#|Swift|Android|VB|R語言|JavaScript|易語言|vb.net|

服務器之家 - 編程語言 - Java教程 - 基于ZooKeeper實現(xiàn)隊列源碼

基于ZooKeeper實現(xiàn)隊列源碼

2021-01-05 11:11MassiveStars Java教程

這篇文章主要介紹了基于ZooKeeper實現(xiàn)隊列源碼的相關(guān)內(nèi)容,包括其實現(xiàn)原理和應用場景,以及對隊列的簡單介紹,具有一定參考價值,需要的朋友可以了解下。

實現(xiàn)原理

先進先出隊列是最常用的隊列,使用Zookeeper實現(xiàn)先進先出隊列就是在特定的目錄下創(chuàng)建PERSISTENT_EQUENTIAL節(jié)點,創(chuàng)建成功時Watcher通知等待的隊列,隊列刪除序列號最小的節(jié)點用以消費。此場景下Zookeeper的znode用于消息存儲,znode存儲的數(shù)據(jù)就是消息隊列中的消息內(nèi)容,SEQUENTIAL序列號就是消息的編號,按序取出即可。由于創(chuàng)建的節(jié)點是持久化的,所以不必擔心隊列消息的丟失問題。

隊列(Queue)

分布式隊列是通用的數(shù)據(jù)結(jié)構(gòu),為了在 Zookeeper 中實現(xiàn)分布式隊列,首先需要指定一個 Znode 節(jié)點作為隊列節(jié)點(queue node), 各個分布式客戶端通過調(diào)用 create() 函數(shù)向隊列中放入數(shù)據(jù),調(diào)用create()時節(jié)點路徑名帶"qn-"結(jié)尾,并設置順序(sequence)節(jié)點標志。 由于設置了節(jié)點的順序標志,新的路徑名具有以下字符串模式:"_path-to-queue-node_/qn-X",X 是唯一自增號。需要從隊列中獲取數(shù)據(jù)/移除數(shù)據(jù)的客戶端首先調(diào)用 getChildren() 函數(shù),有數(shù)據(jù)則獲取(獲取數(shù)據(jù)后可以刪除也可以不刪),沒有則在隊列節(jié)點(queue node)上將 watch 設置為 true,等待觸發(fā)并處理最小序號的節(jié)點(即從序號最小的節(jié)點中取數(shù)據(jù))。

應用場景

Zookeeper隊列不太適合要求高性能的場合,但可以在數(shù)據(jù)量不大的情況下考慮使用。比如已在項目中使用Zookeeper又需要小規(guī)模的隊列應用,這時可以使用Zookeeper實現(xiàn)的隊列;畢竟引進一個消息中間件會增加系統(tǒng)的復雜性和運維的壓力。

詳細代碼

ZookeeperClient工具類

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
package org.massive.common;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import java.io.IOException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
/**
 * Created by Massive on 2016/12/18.
 */
public class ZooKeeperClient {
 private static String connectionString = "localhost:2181";
 private static int sessionTimeout = 10000;
 public static ZooKeeper getInstance() throws IOException, InterruptedException {
 //--------------------------------------------------------------
 // 為避免連接還未完成就執(zhí)行zookeeper的get/create/exists操作引起的(KeeperErrorCode = ConnectionLoss)
 // 這里等Zookeeper的連接完成才返回實例
 //--------------------------------------------------------------
 final CountDownLatch connectedSignal = new CountDownLatch(1);
 ZooKeeper zk = new ZooKeeper(connectionString, sessionTimeout, new Watcher() {
  @Override
  public void process(WatchedEvent event) {
   if (event.getState() == Event.KeeperState.SyncConnected) {
   connectedSignal.countDown();
   } else if (event.getState() == Event.KeeperState.Expired) {
   }
  }
  });
 connectedSignal.await(sessionTimeout, TimeUnit.MILLISECONDS);
 return zk;
 }
 public static int getSessionTimeout() {
 return sessionTimeout;
 }
 public static void setSessionTimeout(int sessionTimeout) {
 ZooKeeperClient.sessionTimeout = sessionTimeout;
 }
}

ZooKeeperQueue

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
package org.massive.queue;
import org.apache.commons.lang3.RandomUtils;
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;
import org.massive.common.ZooKeeperClient;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.util.List;
import java.util.SortedSet;
import java.util.TreeSet;
/**
 * Created by Allen on 2016/12/22.
 */
public class ZooKeeperQueue {
 private ZooKeeper zk;
 private int sessionTimeout;
 private static byte[] ROOT_QUEUE_DATA = {0x12,0x34};
 private static String QUEUE_ROOT = "/QUEUE";
 private String queueName;
 private String queuePath;
 private Object mutex = new Object();
 public ZooKeeperQueue(String queueName) throws IOException, KeeperException, InterruptedException {
 this.queueName = queueName;
 this.queuePath = QUEUE_ROOT + "/" + queueName;
 this.zk = ZooKeeperClient.getInstance();
 this.sessionTimeout = zk.getSessionTimeout();
 //----------------------------------------------------
 // 確保隊列根目錄/QUEUE和當前隊列的目錄的存在
 //----------------------------------------------------
 ensureExists(QUEUE_ROOT);
 ensureExists(queuePath);
 }
 public byte[] consume() throws InterruptedException, KeeperException, UnsupportedEncodingException {
 List<String> nodes = null;
 byte[] returnVal = null;
 Stat stat = null;
 do {
  synchronized (mutex) {
  nodes = zk.getChildren(queuePath, new ProduceWatcher());
  //----------------------------------------------------
  // 如果沒有消息節(jié)點,等待生產(chǎn)者的通知
  //----------------------------------------------------
  if (nodes == null || nodes.size() == 0) {
   mutex.wait();
  } else {
   SortedSet<String> sortedNode = new TreeSet<String>();
   for (String node : nodes) {
   sortedNode.add(queuePath + "/" + node);
   }
   //----------------------------------------------------
   // 消費隊列里序列號最小的消息
   //----------------------------------------------------
   String first = sortedNode.first();
   returnVal = zk.getData(first, false, stat);
   zk.delete(first, -1);
   System.out.print(Thread.currentThread().getName() + " ");
   System.out.print("consume a message from queue:" + first);
   System.out.println(", message data is: " + new String(returnVal,"UTF-8"));
   return returnVal;
  }
  }
 } while (true);
 }
 class ProduceWatcher implements Watcher {
 @Override
 public void process(WatchedEvent event) {
  //----------------------------------------------------
  // 生產(chǎn)一條消息成功后通知一個等待線程
  //----------------------------------------------------
  synchronized (mutex) {
  mutex.notify();
  }
 }
 }
 public void produce(byte[] data) throws KeeperException, InterruptedException, UnsupportedEncodingException {
 //----------------------------------------------------
 // 確保當前隊列目錄存在
 // example: /QUEUE/queueName
 //----------------------------------------------------
 ensureExists(queuePath);
 String node = zk.create(queuePath + "/", data,
  ZooDefs.Ids.OPEN_ACL_UNSAFE,
  CreateMode.PERSISTENT_SEQUENTIAL);
 System.out.print(Thread.currentThread().getName() + " ");
 System.out.print("produce a message to queue:" + node);
 System.out.println(" , message data is: " + new String(data,"UTF-8"));
 }
 public void ensureExists(String path) {
 try {
  Stat stat = zk.exists(path, false);
  if (stat == null) {
  zk.create(path, ROOT_QUEUE_DATA, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
  }
 } catch (KeeperException e) {
  e.printStackTrace();
 } catch (InterruptedException e) {
  e.printStackTrace();
 }
 }
 public static void main(String[] args) throws IOException, InterruptedException, KeeperException {
 String queueName = "test";
 final ZooKeeperQueue queue = new ZooKeeperQueue(queueName);
 for (int i = 0; i < 10; i++) {
  new Thread(new Runnable() {
  @Override
  public void run() {
   try {
   queue.consume();
   System.out.println("--------------------------------------------------------");
   System.out.println();
   } catch (InterruptedException e) {
   e.printStackTrace();
   } catch (KeeperException e) {
   e.printStackTrace();
   } catch (UnsupportedEncodingException e) {
   e.printStackTrace();
   }
  }
  }).start();
 }
 new Thread(new Runnable() {
  @Override
  public void run() {
  for (int i = 0; i < 10; i++) {
   try {
   Thread.sleep(RandomUtils.nextInt(100 * i, 200 * i));
   queue.produce(("massive" + i).getBytes());
   } catch (InterruptedException e) {
   e.printStackTrace();
   } catch (KeeperException e) {
   e.printStackTrace();
   } catch (UnsupportedEncodingException e) {
   e.printStackTrace();
   }
  }
  }
 },"Produce-thread").start();
 }
}

測試

運行main方法,本機器的某次輸出結(jié)果

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
Produce-thread produce a message to queue:/QUEUE/test/0000000000 , message data is: massive0
Thread-8 consume a message from queue:/QUEUE/test/0000000000, message data is: massive0
--------------------------------------------------------
Produce-thread produce a message to queue:/QUEUE/test/0000000001 , message data is: massive1
Thread-6 consume a message from queue:/QUEUE/test/0000000001, message data is: massive1
--------------------------------------------------------
Produce-thread produce a message to queue:/QUEUE/test/0000000002 , message data is: massive2
Thread-3 consume a message from queue:/QUEUE/test/0000000002, message data is: massive2
--------------------------------------------------------
Produce-thread produce a message to queue:/QUEUE/test/0000000003 , message data is: massive3
Thread-0 consume a message from queue:/QUEUE/test/0000000003, message data is: massive3
--------------------------------------------------------
Produce-thread produce a message to queue:/QUEUE/test/0000000004 , message data is: massive4
Thread-5 consume a message from queue:/QUEUE/test/0000000004, message data is: massive4
--------------------------------------------------------
Produce-thread produce a message to queue:/QUEUE/test/0000000005 , message data is: massive5
Thread-2 consume a message from queue:/QUEUE/test/0000000005, message data is: massive5
--------------------------------------------------------
Produce-thread produce a message to queue:/QUEUE/test/0000000006 , message data is: massive6
Thread-4 consume a message from queue:/QUEUE/test/0000000006, message data is: massive6
--------------------------------------------------------
Produce-thread produce a message to queue:/QUEUE/test/0000000007 , message data is: massive7
Thread-9 consume a message from queue:/QUEUE/test/0000000007, message data is: massive7
--------------------------------------------------------
Produce-thread produce a message to queue:/QUEUE/test/0000000008 , message data is: massive8
Thread-7 consume a message from queue:/QUEUE/test/0000000008, message data is: massive8
--------------------------------------------------------
Produce-thread produce a message to queue:/QUEUE/test/0000000009 , message data is: massive9
Thread-1 consume a message from queue:/QUEUE/test/0000000009, message data is: massive9

總結(jié)

以上就是本文有關(guān)于隊列和基于ZooKeeper實現(xiàn)隊列源碼介紹的全部內(nèi)容,希望對大家有所幫助。

感謝朋友們對本站的支持!

原文鏈接:https://www.2cto.com/kf/201612/582914.html

延伸 · 閱讀

精彩推薦
主站蜘蛛池模板: ⅴideo裸体秀hd | 成年人在线免费播放视频 | 国产视频精品在线 | 成人免费一区二区三区 | 香蕉国产在线视频 | 久久久国产精品网站 | 欧美日韩亚洲成人 | 成人在线视频播放 | 精品国产乱码久久久久久久久 | 亚洲伊人色欲综合网 | 伊人在线 | 国产99久久久久久免费看 | 蜜桃一本色道久久综合亚洲精品冫 | 国产精品6区 | 日本a在线观看 | 久久骚 | 国产精品久久久久久久午夜片 | 羞羞视频免费网站男男 | 免费福利在线视频 | 毛片a片免费看 | 国产免费www| 午夜精品视频免费观看 | 中国女警察一级毛片视频 | 成人啪啪18免费网站 | 中文字幕一二区 | 国产91在线免费 | 亚州精品天堂中文字幕 | 亚洲男人天堂 | 国产视频aa| 精品亚洲一区二区三区 | 黄色av免费电影 | 日韩视频在线观看免费视频 | 国产一级二级毛片 | 欧美精品久久久久久久久久 | 午夜久久久精品一区二区三区 | 护士hd欧美free性xxxx | 国产精品1区2区在线观看 | 亚洲一二区视频 | 天堂在线中文资源 | 羞羞的| 欧美日韩在线中文字幕 |