激情久久久_欧美视频区_成人av免费_不卡视频一二三区_欧美精品在欧美一区二区少妇_欧美一区二区三区的

服務器之家:專注于服務器技術及軟件下載分享
分類導航

PHP教程|ASP.NET教程|Java教程|ASP教程|編程技術|正則表達式|C/C++|IOS|C#|Swift|Android|VB|R語言|JavaScript|易語言|vb.net|

服務器之家 - 編程語言 - Java教程 - 用了這么久的RabbitMQ異步編程竟然都是錯的!

用了這么久的RabbitMQ異步編程竟然都是錯的!

2020-12-01 23:18JavaEdge Java教程

RabbitMQ雖可將消息落地磁盤,即使MQ異常消息數據也不會丟失,但異步流程在消息發送、傳輸、處理等環節,都可能發生消息丟失。所有MQ都無法確保百分百可用,業務設計都需考慮不可用時異步流程將如何繼續。

優秀的項目都由同步、異步和定時任務三種處理模式相輔相成。當屬異步編程充滿坑點。

1 適用場景

1.1 服務于主流程的分支流程

在注冊流程中,數據寫DB是主流程,但注冊后給用戶發優惠券或歡迎短信是分支流程,時效性也不強。

1.2 用戶無需實時看到結果

比如外賣下單后的配貨、送貨流程完全可異步處理,每個階段處理完成后,再給用戶發推送或短信讓用戶知曉即可。

1.3 MQ

任務的緩沖的分發,流量削峰、服務解耦和消息廣播。

當然了異步處理不僅僅是通過 MQ 來實現,還有其他方式

比如開新線程執行,返回 Future

還有各種異步框架,比如 Vertx,它是通過 callback 的方式實現

2 異步處理之坑

2.1 異步處理需做消息補償以閉環

RabbitMQ雖可將消息落地磁盤,即使MQ異常消息數據也不會丟失,但異步流程在消息發送、傳輸、處理等環節,都可能發生消息丟失。所有MQ都無法確保百分百可用,業務設計都需考慮不可用時異步流程將如何繼續。

因此,對于異步處理流程,必須考慮補償或建立主備雙活流程。

2.1.1 案例

用戶注冊后異步發送歡迎消息。

  • 用戶注冊落DB為同步流程
  • 會員服務收到消息后發送歡迎消息為異步流程

用了這么久的RabbitMQ異步編程竟然都是錯的!

  • 藍線

         MQ異步處理(主線),消息可能丟失(虛線代表異步調用)

  • 綠線

        補償Job定期消息補償(備線),以補償主線丟失的消息

  • 考慮極端的MQ中間件失效場景

        要求備線的處理吞吐能力達到主線性能

代碼示例

UserController 注冊+發送異步消息。注冊方法,一次性注冊10個用戶,用戶注冊消息不能發送出去的概率為50%。

用了這么久的RabbitMQ異步編程竟然都是錯的!

MemberService 會員服務監聽用戶注冊成功的消息,并發送歡迎短信。使用ConcurrentHashMap存放那些發過短信的用戶ID實現冪等,避免相同的用戶補償時重復發短信

用了這么久的RabbitMQ異步編程竟然都是錯的!

對于MQ消費程序,處理邏輯須考慮去重(支持冪等):

  • MQ消息可能會因中間件本身配置錯誤、穩定性等原因出現重復
  • 自動補償重復
  • 比如本例,同一消息可能既走MQ也走補償,肯定會出現重復,而且考慮到高內聚,補償Job本身不會做去重
  • 人工補償重復

出現消息堆積時,異步處理流程必然延遲。若提供補償功能,則在處理遇到延遲時,很可能會先人工補償,過段時間后處理程序又收到消息了,重復處理。

有次MQ故障,MQ中堆積了幾十萬條發放資金消息,導致業務無法及時處理,運營以為程序出錯,就先通過后臺進行人工處理,結果MQ系統恢復后消息又被重復處理一次,造成大量資金重復發放。

異步處理須考慮消息重復可能性,因此處理邏輯須實現冪等,防止重復處理。

接著定義補償Job即備線操作。

定時任務,5秒做一次補償,因Job并不知道哪些用戶注冊的消息可能丟失,所以是全量補償。

  • 補償邏輯

         每5秒補償一次,按順序一次補償5個用戶,下一次補償操作從上一次補償的最后一個用戶ID開始

         補償任務提交到線程池以“異步”處理,提高處理能力

用了這么久的RabbitMQ異步編程竟然都是錯的!

為實現高內聚,主線和備線處理消息,最好使用同一方法。本案例的MemberService監聽到MQ消息和CompensationJob補償,調用的都是welcome。

這里的補償邏輯簡單僅為 demo,實際生產代碼須:

  • 考慮配置補償的頻次、每次處理數量,以及補償線程池大小等參數為合適值,以滿足補償的吞吐量
  • 考慮備線補償數據進行適當延遲
  • 比如,對注冊時間在30s前的用戶再進行補償,以方便和主線MQ實時流程錯開,避免沖突
  • 諸如當前補償到哪個用戶的offset數據,需要落地DB
  • 補償Job本身須高可用,可使用類似xxl-job或ElasticJob等任務系統。

運行程序,執行注冊方法注冊10個用戶,查看日志

用了這么久的RabbitMQ異步編程竟然都是錯的!

可見

  • 共10個用戶,MQ發送成功的用戶有四個:1、5、7、8
  • 補償任務第一次運行,補償了用戶2、3、4,第二次運行補償了用戶6、9,第三次運行補充了用戶10

消息補償閉環的最高標準

能夠達到補償全量數據的吞吐量。即若補償備線足夠完善,即使直接停機MQ,雖會稍微影響處理及時性,但至少確保流程都能正常執行。

小結

實際開發要考慮異步流程丟消息或處理中斷場景。

異步流程需有備線以補償,比如這里的全量補償方式,即便異步流程徹底失效,通過補償也能讓業務繼續進行。

2.2 RabbitMQ廣播、工作隊列模式坑

消息模式是廣播 Or 工作隊列

  • 消息廣播

        同一消息,不同消費者都能分別消費

  • 隊列模式

        不同消費者共享消費同一個隊列的數據,相同消息只能被某一個消費者消費一次。

比如同一用戶的注冊消息

  • 會員服務需監聽以發送歡迎短信
  • 營銷服務需監聽以發送新用戶小禮物

但會員、營銷服務都可能有多實例,業務需求同一用戶的消息,可同時廣播給不同的服務(廣播模式),但對同一服務的不同實例(比如會員服務1和會員服務2),不管哪個實例來處理,處理一次即可(工作隊列模式):

用了這么久的RabbitMQ異步編程竟然都是錯的!

實現代碼時務必確認MQ系統的機制,確保消息的路由按期望。

RocketMQ實現類似功能比較簡單直白:若消費者屬于一個組,那么消息只會由同組的一個消費者消費;若消費者屬不同組,每個組都能消費一遍消息。

而RabbitMQ的消息路由模式采用隊列+交換器,隊列是消息載體,交換器決定消息路由到隊列的方式。

step1:會員服務-監聽用戶服務發出的新用戶注冊消息

若啟動倆會員服務,那么同一用戶的注冊消息應只能被其中一個實例消費。

分別實現RabbitMQ隊列、交換器、綁定三件套。

  • 隊列使用匿名隊列
  • 交換器使用DirectExchange,交換器綁定到匿名隊列的路由Key是空字符串

收到消息之后,打印所在實例使用的端口。

  • 消息發布者、消費者、以及MQ的配置

用了這么久的RabbitMQ異步編程竟然都是錯的!

使用12345和45678兩個端口啟動倆程序實例后,發條消息,輸出的日志,顯示同一會員服務兩個實例都收到了消息:

用了這么久的RabbitMQ異步編程竟然都是錯的!

用了這么久的RabbitMQ異步編程竟然都是錯的!

所以問題在于不明

RabbitMQ直接交換器和隊列的綁定關系

RabbitMQ的直接交換器根據routingKey路由消息。而程序每次啟動都會創建匿名(隨機命名)隊列,所以每個會員服務實例都對應獨立的隊列,以空routingKey綁定到直接交換器。

用戶服務發消息時也設置了空routingKey,所以直接交換器收到消息后,發現匹配倆隊列,于是都轉發消息

用了這么久的RabbitMQ異步編程竟然都是錯的!

修復

對會員服務不要使用匿名隊列,而使用同一隊列。

將上面代碼中的匿名隊列換做普通隊列:

private static final String QUEUE = "newuserQueue";@Beanpublic Queue queue() { return new Queue(QUEUE);}

這樣對同一消息,倆實例中只有一個實例可收到,不同消息被輪詢發給不同實例。

現在的交換器和隊列關系

用了這么久的RabbitMQ異步編程竟然都是錯的!

step2:用戶服務-廣播消息給會員、營銷服務

期望會員、營銷服務都能收到廣播消息,但會員/營銷服務中的每個實例只需收到一次消息。

聲明一個隊列和一個FanoutExchange,然后模擬倆用戶服務和倆營銷服務:

用了這么久的RabbitMQ異步編程竟然都是錯的!

注冊四個用戶。日志發現一條用戶注冊的消息,要么被會員服務收到,要么被營銷服務收到,這不是廣播。可使用的明明是FanoutExchange,為什么沒起效呢?

用了這么久的RabbitMQ異步編程竟然都是錯的!

因為廣播交換器會忽略routingKey,廣播消息到所有綁定的隊列。該案例的倆會員服務和兩個營銷服務都綁定了同一隊列,所以四服務只能收到一次消息:

用了這么久的RabbitMQ異步編程竟然都是錯的!

修復

拆分隊列,會員和營銷兩組服務分別使用一條獨立隊列綁定到廣播交換器

用了這么久的RabbitMQ異步編程竟然都是錯的!

現在的交換器和隊列結構

用了這么久的RabbitMQ異步編程竟然都是錯的!

從日志輸出可以驗證,對每條MQ消息,會員服務和營銷服務分別都會收到一次,一條消息廣播到兩個服務同時,在每一個服務的兩個實例中通過輪詢接收:

用了這么久的RabbitMQ異步編程竟然都是錯的!

異步的消息路由模式一旦配置出錯,輕則可能導致消息重復處理,重則可能導致重要的服務無法接收到消息,最終造成業務邏輯錯誤。

小結

微服務場景下不同服務多個實例監聽消息的情況,一般不同服務需要同時收到相同的消息,而相同服務的多個實例只需要輪詢接收消息。我們需要確認MQ的消息路由配置是否滿足需求,以避免消息重復或漏發問題。

2.3 死信堵塞MQ之坑

始終無法處理的死信消息,可能會引發堵塞MQ。

若線程池的任務隊列無上限,最終可能導致OOM,類似的MQ也要注意任務堆積問題。對于突發流量引起的MQ堆積,問題并不大,適當調整消費者的消費能力應該就可以解決。但在很多時候,消息隊列的堆積堵塞,是因為有大量始終無法處理的消息。

2.3.1 案例

用戶服務在用戶注冊后發出一條消息,會員服務監聽到消息后給用戶派發優惠券,但因用戶并沒有保存成功,會員服務處理消息始終失敗,消息重新進入隊列,然后還是處理失敗。這種在MQ中回蕩的同一條消息,就是死信。

隨著MQ被越來越多的死信填滿,消費者需花費大量時間反復處理死信,導致正常消息的消費受阻,最終MQ可能因數據量過大而崩潰。

定義一個隊列、一個直接交換器,然后把隊列綁定到交換器

用了這么久的RabbitMQ異步編程竟然都是錯的!

sendMessage發送消息到MQ,訪問一次提交一條消息,使用自增標識作為消息內容

用了這么久的RabbitMQ異步編程竟然都是錯的!

收到消息后,直接NPE,模擬處理出錯

用了這么久的RabbitMQ異步編程竟然都是錯的!

調用sendMessage接口發送兩條消息,然后來到RabbitMQ管理臺,可以看到這兩條消息始終在隊列,不斷被重新投遞,導致重新投遞QPS達到1063。

用了這么久的RabbitMQ異步編程竟然都是錯的!

在日志中也可看到大量異常信息。

修復方案

  • 解決死信無限重復進入隊列最簡單方案

        程序處理出錯時,直接拋AmqpRejectAndDontRequeueException,避免消息重新進入隊列

throw new AmqpRejectAndDontRequeueException("error"); 

但更希望對同一消息,能夠先進行幾次重試,解決因為網絡問題導致的偶發消息處理失敗,若依舊失敗,再把消息投遞到專門設置的DLX。對于來自DLX的數據,可能只是記錄日志發送報警,即使出現異常也不會再重復投遞。

邏輯如下

用了這么久的RabbitMQ異步編程竟然都是錯的!

針對該問題,我們來看

Spring AMQP的簡便解決方案

  1. 定義死信交換器、死信隊列。其實都是普通交換器和隊列,只不過專門用于處理死信消息
  2. 通過RetryInterceptorBuilder構建一個RetryOperationsInterceptor以處理失敗時候的重試。策略是最多嘗試5次(重試4次);并且采取指數退避重試,首次重試延遲1秒,第二次2秒,以此類推,最大延遲是10秒;如果第4次重試還是失敗,則使用RepublishMessageRecoverer把消息重新投入一個DLX
  3. 定義死信隊列的處理程序。本案例只記錄日志

代碼

用了這么久的RabbitMQ異步編程竟然都是錯的!

執行程序,發送兩條消息,查看日志:

用了這么久的RabbitMQ異步編程竟然都是錯的!
  • msg2的4次重試間隔分別是1秒、2秒、4秒、8秒,再加上首次的失敗,所以最大嘗試次數是5
  • 4次重試后,RepublishMessageRecoverer把消息發往DLX
  • 死信處理程序輸出了got dead message msg2。

雖然幾乎同時發倆消息,但msg2在msg1四次重試全部結束后才開始處理,因為默認SimpleMessageListenerContainer只有一個消費線程。可通過增加消費線程避免性能問題:

直接設置concurrentConsumers參數為10,來增加到10個工作線程

用了這么久的RabbitMQ異步編程竟然都是錯的!

也可設置maxConcurrentConsumers參數,讓SimpleMessageListenerContainer動態調整消費者線程數。

小結

一般在遇到消息處理失敗的時候,可設置重試。若重試還是不行,可把該消息扔到專門的死信隊列處理,不要讓死信影響到正常消息處理。

延伸 · 閱讀

精彩推薦
主站蜘蛛池模板: 一级电影在线免费观看 | 欧美一区二区三区不卡免费观看 | 日日草日日干 | 狠狠干网站 | 精品国产91久久久久久久妲己 | 成人精品视频网站 | 在线视频 欧美日韩 | 亚洲精品 在线播放 | 国产精品免费大片 | 中国杭州少妇xxxx做受 | 日韩做爰视频免费 | 色啪综合 | 亚洲第一色婷婷 | 99re66热这里只有精品8 | 亚洲不卡 | 久久久一区二区三区精品 | 成年免费在线视频 | 色中色激情影院 | 福利视频亚洲 | 娇妻被各种姿势c到高潮小说 | 欧美激情第一区 | 亚洲精品久久久久久下一站 | 热久久91| av在线免费观看不卡 | 欧美黄色免费视频 | 91香蕉国产亚洲一区二区三区 | 国产98色在线| 欧美aaaaa一级毛片在线 | 91在线看黄 | 在线观看精品视频 | 黄网站在线免费 | 三人弄娇妻高潮3p视频 | 国产精品一品二区三区四区18 | 国产亚洲自拍一区 | 99精品视频网站 | 国产在线观看91精品 | 精品国产观看 | 少妇色诱麻豆色哟哟 | 日本高清在线播放 | 日韩视频一区二区在线观看 | 黄色高清av |