激情久久久_欧美视频区_成人av免费_不卡视频一二三区_欧美精品在欧美一区二区少妇_欧美一区二区三区的

服務(wù)器之家:專注于服務(wù)器技術(shù)及軟件下載分享
分類導(dǎo)航

PHP教程|ASP.NET教程|Java教程|ASP教程|編程技術(shù)|正則表達(dá)式|C/C++|IOS|C#|Swift|Android|VB|R語言|JavaScript|易語言|vb.net|

服務(wù)器之家 - 編程語言 - Java教程 - SpringBoot整合分布式消息平臺Pulsar

SpringBoot整合分布式消息平臺Pulsar

2022-01-10 23:00君哥聊技術(shù)朱晉君 Java教程

從 SpringBoot 整合 Java 客戶端使用來看,Pulsar 的 api 是非常友好的,使用起來方便簡潔。Consumer 的使用需要考慮多一些,需要考慮到批量、異步以及訂閱類型。

SpringBoot整合分布式消息平臺Pulsar

大家好,我是君哥。

作為優(yōu)秀的消息流平臺,Pulsar 的使用越來越多,這篇文章講解 Pulsar 的 Java 客戶端。

部署 Pulsar

Pulsar 的部署方式主要有 3 種,本地安裝二進(jìn)制文件、docker 部署、在 Kubernetes 上部署。

本文采用 docker 部署一個單節(jié)點的 Pulsar 集群。實驗環(huán)境是 2 核 CPU 和 4G 內(nèi)存。

部署命令如下:

  1. docker run -it -p 6650:6650  -p 8080:8080 --mount source=pulsardata,target=/pulsar/data --mount source=pulsarconf,target=/pulsar/conf apachepulsar/pulsar:2.9.1 bin/pulsar standalone 

安裝過程可能會出現(xiàn)下面的錯誤:

  1. unknown flag: --mount 
  2. See 'docker run --help'

這是因為 docker 版本低,不支持 mount 參數(shù),把 docker 版本升級到 17.06 以上就可以了。

部署過程中可能會因為網(wǎng)絡(luò)的原因失敗,多試幾次就可以成功了。如果看到下面的日志,就說明啟動成功了。

  1. 2022-01-08T22:27:58,726+0000 [main] INFO  org.apache.pulsar.broker.PulsarService - messaging service is ready, bootstrap service port = 8080, broker url= pulsar://localhost:6650, cluster=standalone 

本地單節(jié)點集群啟動后,會創(chuàng)建一個 namespace,名字叫 public/default

Pulsar 客戶端

目前 Pulsar 支持多種語言的客戶端,包括:

Java 客戶端Go 客戶端Python 客戶端C++ 客戶端Node.js 客戶端WebSocket 客戶端C# 客戶端

SpringBoot 配置

使用 SpringBoot 整合 Pulsar 客戶端,首先引入 Pulsar 客戶端依賴,代碼如下:

  1. <dependency> 
  2.     <groupId>org.apache.pulsar</groupId> 
  3.     <artifactId>pulsar-client</artifactId> 
  4.     <version>2.9.1</version> 
  5. </dependency> 

然后在 properties 文件中添加配置:

  1. # Pulsar 地址 
  2. pulsar.url=pulsar://192.168.59.155:6650 
  3. # topic 
  4. pulsar.topic=testTopic 
  5. # consumer group 
  6. pulsar.subscription=topicGroup 

創(chuàng)建 Client

創(chuàng)建客戶端非常簡單,代碼如下:

  1. client = PulsarClient.builder() 
  2.                 .serviceUrl(url) 
  3.                 .build(); 

上面的 url 就是 properties 文件中定義的 pulsar.url 。

創(chuàng)建 Client 時,即使集群沒有啟成功,程序也不會報錯,因為這時還沒有真正地去連接集群。

創(chuàng)建 Producer

  1. producer = client.newProducer() 
  2.                 .topic(topic) 
  3.                 .compressionType(CompressionType.LZ4) 
  4.                 .sendTimeout(0, TimeUnit.SECONDS) 
  5.                 .enableBatching(true
  6.                 .batchingMaxPublishDelay(10, TimeUnit.MILLISECONDS) 
  7.                 .batchingMaxMessages(1000) 
  8.                 .maxPendingMessages(1000) 
  9.                 .blockIfQueueFull(true
  10.                 .roundRobinRouterBatchingPartitionSwitchFrequency(10) 
  11.                 .batcherBuilder(BatcherBuilder.DEFAULT
  12.                 .create(); 

創(chuàng)建 Producer,會真正的連接集群,這時如果集群有問題,就會報連接錯誤。

下面解釋一下創(chuàng)建 Producer 的參數(shù):

topic:Producer 要寫入的 topic。

compressionType:壓縮策略,目前支持 4 種策略 (NONE、LZ4、ZLIB、ZSTD),從 Pulsar2.3 開始,只有 Consumer 的版本在 2.3 以上,這個策略才會生效。

sendTimeout:超時時間,如果 Producer 在超時時間為收到 ACK,會進(jìn)行重新發(fā)送。

enableBatching:是否開啟消息批量處理,這里默認(rèn) true,這個參數(shù)只有在異步發(fā)送 (sendAsync) 時才能生效,選擇同步發(fā)送會失效。

batchingMaxPublishDelay:批量發(fā)送消息的時間段,這里定義的是 10ms,需要注意的是,設(shè)置了批量時間,就不會受消息數(shù)量的影響。批量發(fā)送會把要發(fā)送的批量消息放在一個網(wǎng)絡(luò)包里發(fā)送出去,減少網(wǎng)絡(luò) IO 次數(shù),大大提高網(wǎng)卡的發(fā)送效率。

batchingMaxMessages:批量發(fā)送消息的最大數(shù)量。

maxPendingMessages:等待從 broker 接收 ACK 的消息隊列最大長度。如果這個隊列滿了,producer 所有的 sendAsync 和 send 都會失敗,除非設(shè)置了 blockIfQueueFull 值是 true。

blockIfQueueFull:Producer 發(fā)送消息時會把消息先放入本地 Queue 緩存,如果緩存滿了,就會阻塞消息發(fā)送。

roundRobinRouterBatchingPartition-SwitchFrequency:如果發(fā)送消息時沒有指定 key,那默認(rèn)采用 round robin 的方式發(fā)送消息,使用 round robin 的方式,切換 partition 的周期是 (frequency * batchingMaxPublishDelay)。

創(chuàng)建 Consumer

Pulsar 的消費模型如下圖:

SpringBoot整合分布式消息平臺Pulsar

從圖中可以看到,Consumer 要綁定一個 subscription 才能進(jìn)行消費。

  1. consumer = client.newConsumer() 
  2.         .topic(topic) 
  3.         .subscriptionName(subscription) 
  4.         .subscriptionType(SubscriptionType.Shared) 
  5.         .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest) 
  6.         .negativeAckRedeliveryDelay(60, TimeUnit.SECONDS) 
  7.         .receiverQueueSize(1000) 
  8.         .subscribe(); 

下面解釋一下創(chuàng)建 Consumer 的參數(shù):

topic:Consumer 要訂閱的 topic。

subscriptionName:consumer 要關(guān)聯(lián)的 subscription 名字。

subscriptionType:訂閱類型,Pulsar 支持四種類型訂閱:

Exclusive:獨占模式,同一個 Topic 只能有一個消費者,如果多個消費者,就會出錯。Failover:災(zāi)備模式,同一個 Topic 可以有多個消費者,但是只能有一個消費者消費,其他消費者作為故障轉(zhuǎn)移備用,如果當(dāng)前消費者出了故障,就從備用消費者中選擇一個進(jìn)行消費。如下圖:

SpringBoot整合分布式消息平臺Pulsar

Shared:共享模式,同一個 Topic 可以由多個消費者訂閱和消費。消息通過 round robin 輪詢機(jī)制分發(fā)給不同的消費者,并且每個消息僅會被分發(fā)給一個消費者。當(dāng)消費者斷開,如果發(fā)送給它消息沒有被消費,這些消息會被重新分發(fā)給其它存活的消費者。如下圖:

SpringBoot整合分布式消息平臺Pulsar

Key_Shared:消息和消費者都會綁定一個key,消息只會發(fā)送給綁定同一個key的消費者。如果有新消費者建立連接或者有消費者斷開連接,就需要更新一些消息的 key。跟 Shared 模式相比,Key_Shared 的好處是既可以讓消費者并發(fā)地消費消息,又能保證同一Key下的消息順序。如下圖:

SpringBoot整合分布式消息平臺Pulsar

subscriptionInitialPosition:創(chuàng)建新的 subscription 時從哪里開始消費,有兩個選項:

Latest:從最新的消息開始消費Earliest:從最早的消息開始消費

negativeAckRedeliveryDelay:消費失敗后間隔多久 broker 重新發(fā)送。

receiverQueueSize:在調(diào)用 receive 方法之前,最多能累積多少條消息。可以設(shè)置為 0,這樣每次只從 broker 拉取一條消息。在 Shared 模式下,receiverQueueSize 設(shè)置為 0,可以防止批量消息多發(fā)給一個 Consumer 而導(dǎo)致其他 Consumer 空閑。

Consumer 接收消息有四種方式:同步單條、同步批量、異步單條和異步批量,代碼如下:

  1. Message message = consumer.receive() 
  2. CompletableFuture<Message> message = consumer.receiveAsync(); 
  3. Messages message = consumer.batchReceive(); 
  4. CompletableFuture<Messages> message = consumer.batchReceiveAsync(); 

對于批量接收,也可以設(shè)置批量接收的策略,代碼如下:

  1. consumer = client.newConsumer() 
  2.     .topic(topic) 
  3.     .subscriptionName(subscription) 
  4.         .batchReceivePolicy(BatchReceivePolicy.builder() 
  5.         .maxNumMessages(100) 
  6.         .maxNumBytes(1024 * 1024) 
  7.         .timeout(200, TimeUnit.MILLISECONDS) 
  8.         .build()) 
  9.     .subscribe(); 

代碼中的參數(shù)說明如下:

maxNumMessages:批量接收的最大消息數(shù)量。maxNumBytes:批量接收消息的大小,這里是 1MB。

測試

首先編寫 Producer 發(fā)送消息的代碼,如下:

  1. public void sendMsg(String key, String data) { 
  2.     CompletableFuture<MessageId> future = producer.newMessage() 
  3.         .key(key
  4.         .value(data.getBytes()).sendAsync(); 
  5.     future.handle((v, ex) -> { 
  6.         if (ex == null) { 
  7.             logger.info("發(fā)送消息成功, key:{}, msg: {}"key, data); 
  8.         } else { 
  9.             logger.error("發(fā)送消息失敗, key:{}, msg: {}"key, data); 
  10.         } 
  11.         return null
  12.     }); 
  13.     future.join(); 
  14.     logger.info("發(fā)送消息完成, key:{}, msg: {}"key, data); 

然后編寫一個 Consumer 消費消息的代碼,如下:

  1. public void start() throws Exception{ 
  2.     while (true) { 
  3.         Message message = consumer.receive(); 
  4.         String key = message.getKey(); 
  5.         String data = new String(message.getData()); 
  6.         String topic = message.getTopicName(); 
  7.         if (StringUtils.isNotEmpty(data)) { 
  8.             try{ 
  9.                 logger.info("收到消息, topic:{}, key:{}, data:{}", topic, key, data); 
  10.             }catch(Exception e){ 
  11.                 logger.error("接收消息異常,topic:{}, key:{}, data:{}", topic, key, data, e); 
  12.             } 
  13.         } 
  14.         consumer.acknowledge(message); 
  15.     } 

最后編寫一個 Controller 類,調(diào)用 Producer 發(fā)送消息,代碼如下:

  1. @RequestMapping("/send"
  2. @ResponseBody 
  3. public String send(@RequestParam String key, @RequestParam String data) { 
  4.     logger.info("收到消息發(fā)送請求, key:{}, value:{}"key, data); 
  5.     pulsarProducer.sendMsg(key, data); 
  6.     return "success"

調(diào)用 Producer 發(fā)送一條消息,key=key1,data=data1,具體操作為在瀏覽器中輸入下面的 url 后回車:

  1. http://192.168.157.1:8083/pulsar/send?key=key1&data=data1 

可以看到控制臺輸出下面日志:

  1. 2022-01-08 22:42:33,199 [pulsar-client-io-6-1] [INFO] boot.pulsar.PulsarProducer - 發(fā)送消息成功, key:key1, msg: data1 
  2. 2022-01-08 22:42:33,200 [http-nio-8083-exec-1] [INFO] boot.pulsar.PulsarProducer - 發(fā)送消息完成, key:key1, msg: data1 
  3. 2022-01-08 22:42:33,232 [Thread-22] [INFO] boot.pulsar.PulsarConsumer - 收到消息, topic:persistent://public/default/testTopic, key:key1, data:data1 
  4. 2022-01-08 22:43:14,498 [pulsar-timer-5-1] [INFO] org.apache.pulsar.client.impl.ConsumerStatsRecorderImpl - [testTopic] [topicGroup] [7def6] Prefetched messages: 0 --- Consume throughput received: 0.02 msgs/s --- 0.00 Mbit/s --- Ack sent rate: 0.02 ack/s --- Failed messages: 0 --- batch messages: 0 ---Failed acks: 0 
  5. 2022-01-08 22:43:14,961 [pulsar-timer-9-1] [INFO] org.apache.pulsar.client.impl.ProducerStatsRecorderImpl - [testTopic] [standalone-9-0] Pending messages: 0 --- Publish throughput: 0.02 msg/s --- 0.00 Mbit/s --- Latency: med: 69.000 ms - 95pct: 69.000 ms - 99pct: 69.000 ms - 99.9pct: 69.000 ms - max: 69.000 ms --- Ack received rate: 0.02 ack/s --- Failed messages: 0 

從日志中看到,這里使用的 namespace 就是創(chuàng)建集群時生成的public/default。

總結(jié)

從 SpringBoot 整合 Java 客戶端使用來看,Pulsar 的 api 是非常友好的,使用起來方便簡潔。Consumer 的使用需要考慮多一些,需要考慮到批量、異步以及訂閱類型。

原文鏈接:https://mp.weixin.qq.com/s/4w0eucDNcrYrsiDXHzLwuQ

延伸 · 閱讀

精彩推薦
  • Java教程Java實現(xiàn)搶紅包功能

    Java實現(xiàn)搶紅包功能

    這篇文章主要為大家詳細(xì)介紹了Java實現(xiàn)搶紅包功能,采用多線程模擬多人同時搶紅包,文中示例代碼介紹的非常詳細(xì),具有一定的參考價值,感興趣的小伙...

    littleschemer13532021-05-16
  • Java教程xml與Java對象的轉(zhuǎn)換詳解

    xml與Java對象的轉(zhuǎn)換詳解

    這篇文章主要介紹了xml與Java對象的轉(zhuǎn)換詳解的相關(guān)資料,需要的朋友可以參考下...

    Java教程網(wǎng)2942020-09-17
  • Java教程Java BufferWriter寫文件寫不進(jìn)去或缺失數(shù)據(jù)的解決

    Java BufferWriter寫文件寫不進(jìn)去或缺失數(shù)據(jù)的解決

    這篇文章主要介紹了Java BufferWriter寫文件寫不進(jìn)去或缺失數(shù)據(jù)的解決方案,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望...

    spcoder14552021-10-18
  • Java教程小米推送Java代碼

    小米推送Java代碼

    今天小編就為大家分享一篇關(guān)于小米推送Java代碼,小編覺得內(nèi)容挺不錯的,現(xiàn)在分享給大家,具有很好的參考價值,需要的朋友一起跟隨小編來看看吧...

    富貴穩(wěn)中求8032021-07-12
  • Java教程20個非常實用的Java程序代碼片段

    20個非常實用的Java程序代碼片段

    這篇文章主要為大家分享了20個非常實用的Java程序片段,對java開發(fā)項目有所幫助,感興趣的小伙伴們可以參考一下 ...

    lijiao5352020-04-06
  • Java教程Java8中Stream使用的一個注意事項

    Java8中Stream使用的一個注意事項

    最近在工作中發(fā)現(xiàn)了對于集合操作轉(zhuǎn)換的神器,java8新特性 stream,但在使用中遇到了一個非常重要的注意點,所以這篇文章主要給大家介紹了關(guān)于Java8中S...

    阿杜7482021-02-04
  • Java教程Java使用SAX解析xml的示例

    Java使用SAX解析xml的示例

    這篇文章主要介紹了Java使用SAX解析xml的示例,幫助大家更好的理解和學(xué)習(xí)使用Java,感興趣的朋友可以了解下...

    大行者10067412021-08-30
  • Java教程升級IDEA后Lombok不能使用的解決方法

    升級IDEA后Lombok不能使用的解決方法

    最近看到提示IDEA提示升級,尋思已經(jīng)有好久沒有升過級了。升級完畢重啟之后,突然發(fā)現(xiàn)好多錯誤,本文就來介紹一下如何解決,感興趣的可以了解一下...

    程序猿DD9332021-10-08
Weibo Article 1 Weibo Article 2 Weibo Article 3 Weibo Article 4 Weibo Article 5 Weibo Article 6 Weibo Article 7 Weibo Article 8 Weibo Article 9 Weibo Article 10 Weibo Article 11 Weibo Article 12 Weibo Article 13 Weibo Article 14 Weibo Article 15 Weibo Article 16 Weibo Article 17 Weibo Article 18 Weibo Article 19 Weibo Article 20 Weibo Article 21 Weibo Article 22 Weibo Article 23 Weibo Article 24 Weibo Article 25
主站蜘蛛池模板: 91精品片| 视频国产一区二区 | 一级做a爱片性色毛片高清 国产精品色在线网站 | 成人不卡免费视频 | 久久精品视频3 | 黄色高清免费 | 成人午夜激情视频 | 欧美精品18videos性欧美 | 失禁高潮抽搐喷水h | 在线播放黄色网址 | 欧美91看片特黄aaaa | 久久里面有精品 | 99精品视频网站 | 亚洲第一页视频 | 久草在线新视觉 | 欧美一级黄色网 | 亚洲情av | 成人在线视频免费播放 | 黄视频网站免费观看 | 黄色三级三级三级 | 成人免费毛片在线观看 | 亚洲精品欧美二区三区中文字幕 | 九九热国产视频 | 黄色av网站在线观看 | 国产成人精品区一区二区不卡 | 国产91丝袜在线熟 | 精品国产一区在线 | 狠狠久久伊人中文字幕 | 91精品国产综合久久婷婷香蕉 | 一级看片免费视频 | 在线 日本 制服 中文 欧美 | 成年人免费视频大全 | 色综合久久久久综合99 | 黄污污网站 | 毛片免费视频网站 | 精品一区二区中文字幕 | 日本一区二区三区视频在线 | 永久在线观看电影 | 亚洲精品一区二区三区在线看 | 久草欧美 | 国内精品久久久久久久星辰影视 |