實現(xiàn)原理
先進先出隊列是最常用的隊列,使用Zookeeper實現(xiàn)先進先出隊列就是在特定的目錄下創(chuàng)建PERSISTENT_EQUENTIAL節(jié)點,創(chuàng)建成功時Watcher通知等待的隊列,隊列刪除序列號最小的節(jié)點用以消費。此場景下Zookeeper的znode用于消息存儲,znode存儲的數(shù)據(jù)就是消息隊列中的消息內(nèi)容,SEQUENTIAL序列號就是消息的編號,按序取出即可。由于創(chuàng)建的節(jié)點是持久化的,所以不必擔心隊列消息的丟失問題。
隊列(Queue)
分布式隊列是通用的數(shù)據(jù)結(jié)構(gòu),為了在 Zookeeper 中實現(xiàn)分布式隊列,首先需要指定一個 Znode 節(jié)點作為隊列節(jié)點(queue node), 各個分布式客戶端通過調(diào)用 create() 函數(shù)向隊列中放入數(shù)據(jù),調(diào)用create()時節(jié)點路徑名帶"qn-"結(jié)尾,并設置順序(sequence)節(jié)點標志。 由于設置了節(jié)點的順序標志,新的路徑名具有以下字符串模式:"_path-to-queue-node_/qn-X",X 是唯一自增號。需要從隊列中獲取數(shù)據(jù)/移除數(shù)據(jù)的客戶端首先調(diào)用 getChildren() 函數(shù),有數(shù)據(jù)則獲取(獲取數(shù)據(jù)后可以刪除也可以不刪),沒有則在隊列節(jié)點(queue node)上將 watch 設置為 true,等待觸發(fā)并處理最小序號的節(jié)點(即從序號最小的節(jié)點中取數(shù)據(jù))。
應用場景
Zookeeper隊列不太適合要求高性能的場合,但可以在數(shù)據(jù)量不大的情況下考慮使用。比如已在項目中使用Zookeeper又需要小規(guī)模的隊列應用,這時可以使用Zookeeper實現(xiàn)的隊列;畢竟引進一個消息中間件會增加系統(tǒng)的復雜性和運維的壓力。
詳細代碼
ZookeeperClient工具類
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
|
package org.massive.common; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.ZooKeeper; import java.io.IOException; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; /** * Created by Massive on 2016/12/18. */ public class ZooKeeperClient { private static String connectionString = "localhost:2181" ; private static int sessionTimeout = 10000 ; public static ZooKeeper getInstance() throws IOException, InterruptedException { //-------------------------------------------------------------- // 為避免連接還未完成就執(zhí)行zookeeper的get/create/exists操作引起的(KeeperErrorCode = ConnectionLoss) // 這里等Zookeeper的連接完成才返回實例 //-------------------------------------------------------------- final CountDownLatch connectedSignal = new CountDownLatch( 1 ); ZooKeeper zk = new ZooKeeper(connectionString, sessionTimeout, new Watcher() { @Override public void process(WatchedEvent event) { if (event.getState() == Event.KeeperState.SyncConnected) { connectedSignal.countDown(); } else if (event.getState() == Event.KeeperState.Expired) { } } }); connectedSignal.await(sessionTimeout, TimeUnit.MILLISECONDS); return zk; } public static int getSessionTimeout() { return sessionTimeout; } public static void setSessionTimeout( int sessionTimeout) { ZooKeeperClient.sessionTimeout = sessionTimeout; } } |
ZooKeeperQueue
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
|
package org.massive.queue; import org.apache.commons.lang3.RandomUtils; import org.apache.zookeeper.*; import org.apache.zookeeper.data.Stat; import org.massive.common.ZooKeeperClient; import java.io.IOException; import java.io.UnsupportedEncodingException; import java.util.List; import java.util.SortedSet; import java.util.TreeSet; /** * Created by Allen on 2016/12/22. */ public class ZooKeeperQueue { private ZooKeeper zk; private int sessionTimeout; private static byte [] ROOT_QUEUE_DATA = { 0x12 , 0x34 }; private static String QUEUE_ROOT = "/QUEUE" ; private String queueName; private String queuePath; private Object mutex = new Object(); public ZooKeeperQueue(String queueName) throws IOException, KeeperException, InterruptedException { this .queueName = queueName; this .queuePath = QUEUE_ROOT + "/" + queueName; this .zk = ZooKeeperClient.getInstance(); this .sessionTimeout = zk.getSessionTimeout(); //---------------------------------------------------- // 確保隊列根目錄/QUEUE和當前隊列的目錄的存在 //---------------------------------------------------- ensureExists(QUEUE_ROOT); ensureExists(queuePath); } public byte [] consume() throws InterruptedException, KeeperException, UnsupportedEncodingException { List<String> nodes = null ; byte [] returnVal = null ; Stat stat = null ; do { synchronized (mutex) { nodes = zk.getChildren(queuePath, new ProduceWatcher()); //---------------------------------------------------- // 如果沒有消息節(jié)點,等待生產(chǎn)者的通知 //---------------------------------------------------- if (nodes == null || nodes.size() == 0 ) { mutex.wait(); } else { SortedSet<String> sortedNode = new TreeSet<String>(); for (String node : nodes) { sortedNode.add(queuePath + "/" + node); } //---------------------------------------------------- // 消費隊列里序列號最小的消息 //---------------------------------------------------- String first = sortedNode.first(); returnVal = zk.getData(first, false , stat); zk.delete(first, - 1 ); System.out.print(Thread.currentThread().getName() + " " ); System.out.print( "consume a message from queue:" + first); System.out.println( ", message data is: " + new String(returnVal, "UTF-8" )); return returnVal; } } } while ( true ); } class ProduceWatcher implements Watcher { @Override public void process(WatchedEvent event) { //---------------------------------------------------- // 生產(chǎn)一條消息成功后通知一個等待線程 //---------------------------------------------------- synchronized (mutex) { mutex.notify(); } } } public void produce( byte [] data) throws KeeperException, InterruptedException, UnsupportedEncodingException { //---------------------------------------------------- // 確保當前隊列目錄存在 // example: /QUEUE/queueName //---------------------------------------------------- ensureExists(queuePath); String node = zk.create(queuePath + "/" , data, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL); System.out.print(Thread.currentThread().getName() + " " ); System.out.print( "produce a message to queue:" + node); System.out.println( " , message data is: " + new String(data, "UTF-8" )); } public void ensureExists(String path) { try { Stat stat = zk.exists(path, false ); if (stat == null ) { zk.create(path, ROOT_QUEUE_DATA, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); } } catch (KeeperException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } } public static void main(String[] args) throws IOException, InterruptedException, KeeperException { String queueName = "test" ; final ZooKeeperQueue queue = new ZooKeeperQueue(queueName); for ( int i = 0 ; i < 10 ; i++) { new Thread( new Runnable() { @Override public void run() { try { queue.consume(); System.out.println( "--------------------------------------------------------" ); System.out.println(); } catch (InterruptedException e) { e.printStackTrace(); } catch (KeeperException e) { e.printStackTrace(); } catch (UnsupportedEncodingException e) { e.printStackTrace(); } } }).start(); } new Thread( new Runnable() { @Override public void run() { for ( int i = 0 ; i < 10 ; i++) { try { Thread.sleep(RandomUtils.nextInt( 100 * i, 200 * i)); queue.produce(( "massive" + i).getBytes()); } catch (InterruptedException e) { e.printStackTrace(); } catch (KeeperException e) { e.printStackTrace(); } catch (UnsupportedEncodingException e) { e.printStackTrace(); } } } }, "Produce-thread" ).start(); } } |
測試
運行main方法,本機器的某次輸出結(jié)果
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
|
Produce-thread produce a message to queue:/QUEUE/test/ 0000000000 , message data is: massive0 Thread- 8 consume a message from queue:/QUEUE/test/ 0000000000 , message data is: massive0 -------------------------------------------------------- Produce-thread produce a message to queue:/QUEUE/test/ 0000000001 , message data is: massive1 Thread- 6 consume a message from queue:/QUEUE/test/ 0000000001 , message data is: massive1 -------------------------------------------------------- Produce-thread produce a message to queue:/QUEUE/test/ 0000000002 , message data is: massive2 Thread- 3 consume a message from queue:/QUEUE/test/ 0000000002 , message data is: massive2 -------------------------------------------------------- Produce-thread produce a message to queue:/QUEUE/test/ 0000000003 , message data is: massive3 Thread- 0 consume a message from queue:/QUEUE/test/ 0000000003 , message data is: massive3 -------------------------------------------------------- Produce-thread produce a message to queue:/QUEUE/test/ 0000000004 , message data is: massive4 Thread- 5 consume a message from queue:/QUEUE/test/ 0000000004 , message data is: massive4 -------------------------------------------------------- Produce-thread produce a message to queue:/QUEUE/test/ 0000000005 , message data is: massive5 Thread- 2 consume a message from queue:/QUEUE/test/ 0000000005 , message data is: massive5 -------------------------------------------------------- Produce-thread produce a message to queue:/QUEUE/test/ 0000000006 , message data is: massive6 Thread- 4 consume a message from queue:/QUEUE/test/ 0000000006 , message data is: massive6 -------------------------------------------------------- Produce-thread produce a message to queue:/QUEUE/test/ 0000000007 , message data is: massive7 Thread- 9 consume a message from queue:/QUEUE/test/ 0000000007 , message data is: massive7 -------------------------------------------------------- Produce-thread produce a message to queue:/QUEUE/test/ 0000000008 , message data is: massive8 Thread- 7 consume a message from queue:/QUEUE/test/ 0000000008 , message data is: massive8 -------------------------------------------------------- Produce-thread produce a message to queue:/QUEUE/test/ 0000000009 , message data is: massive9 Thread- 1 consume a message from queue:/QUEUE/test/ 0000000009 , message data is: massive9 |
總結(jié)
以上就是本文有關(guān)于隊列和基于ZooKeeper實現(xiàn)隊列源碼介紹的全部內(nèi)容,希望對大家有所幫助。
感謝朋友們對本站的支持!
原文鏈接:https://www.2cto.com/kf/201612/582914.html