日韩无码专区无码一级三级片|91人人爱网站中日韩无码电影|厨房大战丰满熟妇|AV高清无码在线免费观看|另类AV日韩少妇熟女|中文日本大黄一级黄色片|色情在线视频免费|亚洲成人特黄a片|黄片wwwav色图欧美|欧亚乱色一区二区三区

RELATEED CONSULTING
相關(guān)咨詢
選擇下列產(chǎn)品馬上在線溝通
服務(wù)時間:8:30-17:00
你可能遇到了下面的問題
關(guān)閉右側(cè)工具欄

新聞中心

這里有您想知道的互聯(lián)網(wǎng)營銷解決方案
RocketMQ如何保證消息的可靠性投遞?

介紹

要想保證消息的可靠型投遞,無非保證如下3個階段的正常執(zhí)行即可。

  1. 生產(chǎn)者將消息成功投遞到broker
  2. broker將投遞過程的消息持久化下來
  3. 消費者能從broker消費到消息

發(fā)送端消息重試

producer向broker發(fā)送消息后,沒有收到broker的ack時,rocketmq會自動重試。重試的次數(shù)可以設(shè)置,默認為2次

 
 
 
 
  1. DefaultMQProducer producer = new DefaultMQProducer(RPODUCER_GROUP_NAME); 
  2. // 同步發(fā)送設(shè)置重試次數(shù)為5次 
  3. producer.setRetryTimesWhenSendFailed(5); 
  4. // 異步發(fā)送設(shè)置重試次數(shù)為5次 
  5. producer.setRetryTimesWhenSendAsyncFailed(5); 

消息持久化

我們先來了解一下消息的存儲流程,這個知識對后面分析消費端消息重試非常重要。

和消息相關(guān)的文件有如下幾種

  1. CommitLog:存儲消息的元數(shù)據(jù)
  2. ConsumerQueue:存儲消息在CommitLog的索引
  3. IndexFile:可以通過Message Key,時間區(qū)間快速查找到消息

整個消息的存儲流程如下

  1. Producer將消息順序?qū)懙紺ommitLog中
  2. 有一個線程根據(jù)消息的隊列信息,寫入到相關(guān)的ConsumerQueue中(minOffset為寫入的初始位置,consumerOffset為當前消費到的位置,maxOffset為ConsumerQueue最新寫入的位置)和IndexFile
  3. Consumer從ConsumerQueue的consumerOffset讀取到當前應(yīng)該消費的消息在CommitLog中的偏移量,到CommitLog中找到對應(yīng)的消息,消費成功后移動consumerOffset

刷盤機制

「異步刷盤」:消息被寫入內(nèi)存的PAGECACHE,返回寫成功狀態(tài),當內(nèi)存里的消息量積累到一定程度時,統(tǒng)一觸發(fā)寫磁盤操作,快速寫入 。吞吐量高,當磁盤損壞時,會丟失消息

「同步刷盤」:消息寫入內(nèi)存的PAGECACHE后,立刻通知刷盤線程刷盤,然后等待刷盤完成,刷盤線程執(zhí)行完成后喚醒等待的線程,給應(yīng)用返回消息寫成功的狀態(tài)。吞吐量低,但不會造成消息丟失

主從復制

如果一個broker有master和slave時,就需要將master上的消息復制到slave上,復制的方式有兩種

  1. 「同步復制」:master和slave均寫成功,才返回客戶端成功。maste掛了以后可以保證數(shù)據(jù)不丟失,但是同步復制會增加數(shù)據(jù)寫入延遲,降低吞吐量
  2. 「異步復制」:master寫成功,返回客戶端成功。擁有較低的延遲和較高的吞吐量,但是當master出現(xiàn)故障后,有可能造成數(shù)據(jù)丟失

消費端消息重試

順序消息的重試

對于順序消息,當消費者消費消息失敗后,消息隊列RocketMQ版會自動不斷地進行消息重試(每次間隔時間為1秒),這時,應(yīng)用會出現(xiàn)消息消費被阻塞的情況。所以一定要做好監(jiān)控,避免阻塞現(xiàn)象的發(fā)生

「順序消息消費失敗后不會消費下一條消息而是不斷重試這條消息,應(yīng)該是考慮到如果跨過這條消息消費后面的消息會對業(yè)務(wù)邏輯產(chǎn)生影響」

「順序消息暫時僅支持集群消費模式,不支持廣播消費模式」

無序消息的重試

對于無序消息(普通、定時、延時、事務(wù)消息),當消費者消費消息失敗時,您可以通過設(shè)置返回狀態(tài)達到消息重試的結(jié)果。

「無序消息的重試只針對集群消費方式生效;廣播方式不提供失敗重試特性,即消費失敗后,失敗消息不再重試,繼續(xù)消費新的消息」

「消費時候后,重試的配置方式有如下三種」

  1. 返回Action.ReconsumeLater(推薦)
  2. 返回Null
  3. 拋出異常
 
 
 
 
  1. public class MessageListenerImpl implements MessageListener { 
  2.  
  3.     @Override 
  4.     public Action consume(Message message, ConsumeContext context) { 
  5.         //消息處理邏輯拋出異常,消息將重試。 
  6.         doConsumeMessage(message); 
  7.         //方式1:返回Action.ReconsumeLater,消息將重試。 
  8.         return Action.ReconsumeLater; 
  9.         //方式2:返回null,消息將重試。 
  10.         return null; 
  11.         //方式3:直接拋出異常,消息將重試。 
  12.         throw new RuntimeException("Consumer Message exception"); 
  13.     } 

「消費失敗后,無需重試的配置方式」

集群消費方式下,消息失敗后期望消息不重試,需要捕獲消費邏輯中可能拋出的異常,最終返回Action.CommitMessage,此后這條消息將不會再重試。

 
 
 
 
  1. public class MessageListenerImpl implements MessageListener { 
  2.  
  3.     @Override 
  4.     public Action consume(Message message, ConsumeContext context) { 
  5.         try { 
  6.             doConsumeMessage(message); 
  7.         } catch (Throwable e) { 
  8.             //捕獲消費邏輯中的所有異常,并返回Action.CommitMessage; 
  9.             return Action.CommitMessage; 
  10.         } 
  11.         //消息處理正常,直接返回Action.CommitMessage; 
  12.         return Action.CommitMessage; 
  13.     } 

「消息重試次數(shù)」

「RocketMQ默認允許每條消息最多重試16次,每次消費失敗發(fā)送一條延時消息到重試隊列,同一條消息失敗一次將延時等級提高一次,然后再放到重試隊列。重試16次后如果還沒有消費成功,則將消息放到死信隊列中?!?/p>

「注意:重試隊列和死信隊列都是按照Consumer Group劃分的」

重試隊列topic名字:%RETRY% + consumerGroup

死信隊列topic名字:%DLQ% + consumerGroup

「為什么重試隊列和死信隊列要按照Consumer Group來進行劃分?」

「因為在RocketMQ的時候使用一定要保持訂閱關(guān)系一致。即一個Consumer Group訂閱的topic和tag要完全一致,不然可能會導致消費邏輯混亂,消息丟失」

如下任意一種情況都表現(xiàn)為訂閱關(guān)系不一致

  • 相同ConsumerGroup下的Consumer實例訂閱了不同的Topic。
  • 相同ConsumerGroup下的Consumer實例訂閱了相同的Topic,但訂閱的Tag不一致。

我們可以通過控制臺查看各種類型的主題

消息每次重試的間隔時間如下

第幾次重試 與上次重試的間隔時間 第幾次重試 與上次重試的間隔時間

第幾次重試 與上次重試的間隔時間 第幾次重試 與上次重試的間隔時間
110 秒97 分鐘
230 秒108 分鐘
31 分鐘119 分鐘
42 分鐘1210 分鐘
53 分鐘1320 分鐘
64 分鐘1430 分鐘
75 分鐘151 小時
86 分鐘162 小時

「前面說到RocketMQ的消息重試是通過往重試隊列發(fā)送定時消息來實現(xiàn)的。」 RocketMQ支持18個級別的定時延時,每個級別定時消息的延時時間如下。

 
 
 
 
  1. // MessageStoreConfig.java 
  2. private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h"; 

消息重試只是把定時消息的前2個級別去掉,每次發(fā)送下一個級別的定時消息

我們可以設(shè)置消費端消息重試次數(shù)

  1. 最大重試次數(shù)小于等于16次,則重試時間間隔同上表描述。
  2. 最大重試次數(shù)大于16次,超過16次的重試時間間隔均為每次2小時。
 
 
 
 
  1. Properties properties = new Properties(); 
  2. // 配置對應(yīng)Group ID的最大消息重試次數(shù)為20次,最大重試次數(shù)為字符串類型。 
  3. properties.put(PropertyKeyConst.MaxReconsumeTimes,"20"); 
  4. Consumer consumer =ONSFactory.createConsumer(properties); 

「那么重試隊列中的消息是如何被消費的?」

消息消費者在啟動的時候,會訂閱正常的topic和重試隊列的topic

定時消息的實現(xiàn)邏輯也比較簡單,可以歸納為如下幾步

1.發(fā)送延時消息

1.1 替換topic為SCHEDULE_TOPIC_XXXX,queueId為消息延遲等級(如果不替換topic直接發(fā)到對應(yīng)的consumeQueue中,則消息會被立馬消費)

1.2 將消息原來的topic,queueId放到消息擴展屬性中

1.3 將消息應(yīng)該執(zhí)行的時間放到tagsCode中

將消息順序?qū)懙紺ommitLog中

將消息對應(yīng)的信息分發(fā)到對應(yīng)的ConsumerQueue中(topic為SCHEDULE_TOPIC_XXXX總共有18個queue,對應(yīng)18個延遲級別)

定時任務(wù)不斷判斷消息是否到達投遞時間,沒有到達則后續(xù)執(zhí)行投遞

如果到達投遞時間,則從commitLog中拉取消息的內(nèi)容,重新設(shè)置消息topic,queueId為原來的(原來的topic,queueId在消息擴展屬性中),然后將消息投遞到commitLog中,此時消息就會被分發(fā)到對應(yīng)的隊列中,然后被消費。

本文轉(zhuǎn)載自微信公眾號「Java識堂」,可以通過以下二維碼關(guān)注。轉(zhuǎn)載本文請聯(lián)系Java識堂公眾號。


網(wǎng)頁題目:RocketMQ如何保證消息的可靠性投遞?
網(wǎng)頁網(wǎng)址:http://m.5511xx.com/article/cccshdh.html