日韩无码专区无码一级三级片|91人人爱网站中日韩无码电影|厨房大战丰满熟妇|AV高清无码在线免费观看|另类AV日韩少妇熟女|中文日本大黄一级黄色片|色情在线视频免费|亚洲成人特黄a片|黄片wwwav色图欧美|欧亚乱色一区二区三区

RELATEED CONSULTING
相關(guān)咨詢
選擇下列產(chǎn)品馬上在線溝通
服務(wù)時(shí)間:8:30-17:00
你可能遇到了下面的問(wèn)題
關(guān)閉右側(cè)工具欄

新聞中心

這里有您想知道的互聯(lián)網(wǎng)營(yíng)銷解決方案
RabbitMQ高可用之如何確保消息成功消費(fèi)
  • 1. 兩種消費(fèi)思路
  • 2. 確保消費(fèi)成功兩種思路
  • 3. 消息拒絕
  • 4. 消息確認(rèn)
    • 4.1 自動(dòng)確認(rèn)
    • 4.2 手動(dòng)確認(rèn)
  • 5. 冪等性問(wèn)題
  • 6. 小結(jié)

前面一篇文章松哥和大家聊了 MQ 高可用之如何確保消息成功發(fā)送,各種配置齊上陣,最終確保了消息的成功發(fā)送,甚至在一些極端情況下還可能發(fā)生同一條消息重復(fù)發(fā)送的情況,不管怎么樣,消息總算發(fā)送出去了,如果小伙伴們還沒(méi)看過(guò)上篇文章,建議先看看,再來(lái)學(xué)習(xí)本文:

創(chuàng)新互聯(lián)公司主要業(yè)務(wù)有網(wǎng)站營(yíng)銷策劃、成都做網(wǎng)站、網(wǎng)站建設(shè)、微信公眾號(hào)開(kāi)發(fā)、小程序設(shè)計(jì)、H5場(chǎng)景定制、程序開(kāi)發(fā)等業(yè)務(wù)。一次合作終身朋友,是我們奉行的宗旨;我們不僅僅把客戶當(dāng)客戶,還把客戶視為我們的合作伙伴,在開(kāi)展業(yè)務(wù)的過(guò)程中,公司還積累了豐富的行業(yè)經(jīng)驗(yàn)、成都全網(wǎng)營(yíng)銷推廣資源和合作伙伴關(guān)系資源,并逐漸建立起規(guī)范的客戶服務(wù)和保障體系。 

四種策略確保 RabbitMQ 消息發(fā)送可靠性!你用哪種?

今天我們就來(lái)聊一聊消息消費(fèi)的問(wèn)題,看看如何確保消息消費(fèi)成功,并且確保冪等性。

1. 兩種消費(fèi)思路

RabbitMQ 的消息消費(fèi),整體上來(lái)說(shuō)有兩種不同的思路:

  • 推(push):MQ 主動(dòng)將消息推送給消費(fèi)者,這種方式需要消費(fèi)者設(shè)置一個(gè)緩沖區(qū)去緩存消息,對(duì)于消費(fèi)者而言,內(nèi)存中總是有一堆需要處理的消息,所以這種方式的效率比較高,這也是目前大多數(shù)應(yīng)用采用的消費(fèi)方式。
  • 拉(pull):消費(fèi)者主動(dòng)從 MQ 拉取消息,這種方式效率并不是很高,不過(guò)有的時(shí)候如果服務(wù)端需要批量拉取消息,倒是可以采用這種方式。

兩種方式我都舉個(gè)例子看下。

先來(lái)看推(push):

這種方式大家比較常見(jiàn),就是通過(guò) @RabbitListener 注解去標(biāo)記消費(fèi)者,如下:

 
 
 
 
  1. @Component
  2. public class ConsumerDemo {
  3.     @RabbitListener(queues = RabbitConfig.JAVABOY_QUEUE_NAME)
  4.     public void handle(String msg) {
  5.         System.out.println("msg = " + msg);
  6.     }
  7. }

當(dāng)監(jiān)聽(tīng)的隊(duì)列中有消息時(shí),就會(huì)觸發(fā)該方法。

再來(lái)看拉(pull):

 
 
 
 
  1. @Test
  2. public void test01() throws UnsupportedEncodingException {
  3.     Object o = rabbitTemplate.receiveAndConvert(RabbitConfig.JAVABOY_QUEUE_NAME);
  4.     System.out.println("o = " + new String(((byte[]) o),"UTF-8"));
  5. }

調(diào)用 receiveAndConvert 方法,方法參數(shù)為隊(duì)列名稱,方法執(zhí)行完成后,會(huì)從 MQ 上拉取一條消息下來(lái),如果該方法返回值為 null,表示該隊(duì)列上沒(méi)有消息了。receiveAndConvert 方法有一個(gè)重載方法,可以在重載方法中傳入一個(gè)等待超時(shí)時(shí)間,例如 3 秒。此時(shí),假設(shè)隊(duì)列中沒(méi)有消息了,則 receiveAndConvert 方法會(huì)阻塞 3 秒,3 秒內(nèi)如果隊(duì)列中有了新消息就返回,3 秒后如果隊(duì)列中還是沒(méi)有新消息,就返回 null,這個(gè)等待超時(shí)時(shí)間要是不設(shè)置的話,默認(rèn)為 0。

這是消息兩種不同的消費(fèi)模式。

如果需要從消息隊(duì)列中持續(xù)獲得消息,就可以使用推模式;如果只是單純的消費(fèi)一條消息,則使用拉模式即可。切忌將拉模式放到一個(gè)死循環(huán)中,變相的訂閱消息,這會(huì)嚴(yán)重影響 RabbitMQ 的性能。

2. 確保消費(fèi)成功兩種思路

在上篇文章中,我們想盡辦法確保消息能夠發(fā)送成功,對(duì)于消息消費(fèi)成功,其實(shí)官方提供了相關(guān)的機(jī)制,我們一起來(lái)看下。

為了保證消息能夠可靠的到達(dá)消息消費(fèi)者,RabbitMQ 中提供了消息消費(fèi)確認(rèn)機(jī)制。當(dāng)消費(fèi)者去消費(fèi)消息的時(shí)候,可以通過(guò)指定 autoAck 參數(shù)來(lái)表示消息消費(fèi)的確認(rèn)方式。

當(dāng) autoAck 為 false 的時(shí)候,此時(shí)即使消費(fèi)者已經(jīng)收到消息了,RabbitMQ 也不會(huì)立馬將消息移除,而是等待消費(fèi)者顯式的回復(fù)確認(rèn)信號(hào)后,才會(huì)將消息打上刪除標(biāo)記,然后再刪除。

當(dāng) autoAck 為 true 的時(shí)候,此時(shí)消息消費(fèi)者就會(huì)自動(dòng)把發(fā)送出去的消息設(shè)置為確認(rèn),然后將消息移除(從內(nèi)存或者磁盤中),即使這些消息并沒(méi)有到達(dá)消費(fèi)者。

我們來(lái)看一張圖:

如上圖所示,在 RabbitMQ 的 web 管理頁(yè)面:

  • Ready 表示待消費(fèi)的消息數(shù)量。
  • Unacked 表示已經(jīng)發(fā)送給消費(fèi)者但是還沒(méi)收到消費(fèi)者 ack 的消息數(shù)量。

這是我們可以從 UI 層面觀察消息的消費(fèi)情況確認(rèn)情況。

當(dāng)我們將 autoAck 設(shè)置為 false 的時(shí)候,對(duì)于 RabbitMQ 而言,消費(fèi)分成了兩個(gè)部分:

  • 待消費(fèi)的消息
  • 已經(jīng)投遞給消費(fèi)者,但是還沒(méi)有被消費(fèi)者確認(rèn)的消息

換句話說(shuō),當(dāng)設(shè)置 autoAck 為 false 的時(shí)候,消費(fèi)者就變得非常從容了,它將有足夠的時(shí)間去處理這條消息,當(dāng)消息正常處理完成后,再手動(dòng) ack,此時(shí) RabbitMQ 才會(huì)認(rèn)為這條消息消費(fèi)成功了。如果 RabbitMQ 一直沒(méi)有收到客戶端的反饋,并且此時(shí)客戶端也已經(jīng)斷開(kāi)連接了,那么 RabbitMQ 就會(huì)將剛剛的消息重新放回隊(duì)列中,等待下一次被消費(fèi)。

綜上所述,確保消息被成功消費(fèi),無(wú)非就是手動(dòng) Ack 或者自動(dòng) Ack,無(wú)他。當(dāng)然,無(wú)論這兩種中的哪一種,最終都有可能導(dǎo)致消息被重復(fù)消費(fèi),所以一般來(lái)說(shuō)我們還需要在處理消息時(shí),解決冪等性問(wèn)題。

3. 消息拒絕

當(dāng)客戶端收到消息時(shí),可以選擇消費(fèi)這條消息,也可以選擇拒絕這條消息。我們來(lái)看下拒絕的方式:

 
 
 
 
  1. @Component
  2. public class ConsumerDemo {
  3.     @RabbitListener(queues = RabbitConfig.JAVABOY_QUEUE_NAME)
  4.     public void handle(Channel channel, Message message) {
  5.         //獲取消息編號(hào)
  6.         long deliveryTag = message.getMessageProperties().getDeliveryTag();
  7.         try {
  8.             //拒絕消息
  9.             channel.basicReject(deliveryTag, true);
  10.         } catch (IOException e) {
  11.             e.printStackTrace();
  12.         }
  13.     }
  14. }

消費(fèi)者收到消息之后,可以選擇拒絕消費(fèi)該條消息,拒絕的步驟分兩步:

  1. 獲取消息編號(hào) deliveryTag。
  2. 調(diào)用 basicReject 方法拒絕消息。

調(diào)用 basicReject 方法時(shí),第二個(gè)參數(shù)是 requeue,即是否重新入隊(duì)。如果第二個(gè)參數(shù)為 true,則這條被拒絕的消息會(huì)重新進(jìn)入到消息隊(duì)列中,等待下一次被消費(fèi);如果第二個(gè)參數(shù)為 false,則這條被拒絕的消息就會(huì)被丟掉,不會(huì)有新的消費(fèi)者去消費(fèi)它了。

需要注意的是,basicReject 方法一次只能拒絕一條消息。

4. 消息確認(rèn)

消息確認(rèn)分為自動(dòng)確認(rèn)和手動(dòng)確認(rèn),我們分別來(lái)看。

4.1 自動(dòng)確認(rèn)

先來(lái)看看自動(dòng)確認(rèn),在 Spring Boot 中,默認(rèn)情況下,消息消費(fèi)就是自動(dòng)確認(rèn)的。

我們來(lái)看如下一個(gè)消息消費(fèi)方法:

 
 
 
 
  1. @Component
  2. public class ConsumerDemo {
  3.     @RabbitListener(queues = RabbitConfig.JAVABOY_QUEUE_NAME)
  4.     public void handle2(String msg) {
  5.         System.out.println("msg = " + msg);
  6.         int i = 1 / 0;
  7.     }

通過(guò) @Componet 注解將當(dāng)前類注入到 Spring 容器中,然后通過(guò) @RabbitListener 注解來(lái)標(biāo)記一個(gè)消息消費(fèi)方法,默認(rèn)情況下,消息消費(fèi)方法自帶事務(wù),即如果該方法在執(zhí)行過(guò)程中拋出異常,那么被消費(fèi)的消息會(huì)重新回到隊(duì)列中等待下一次被消費(fèi),如果該方法正常執(zhí)行完沒(méi)有拋出異常,則這條消息就算是被消費(fèi)了。

4.2 手動(dòng)確認(rèn)

手動(dòng)確認(rèn)我又把它分為兩種:推模式手動(dòng)確認(rèn)與拉模式手動(dòng)確認(rèn)。

4.2.1 推模式手動(dòng)確認(rèn)

要開(kāi)啟手動(dòng)確認(rèn),需要我們首先關(guān)閉自動(dòng)確認(rèn),關(guān)閉方式如下:

 
 
 
 
  1. spring.rabbitmq.listener.simple.acknowledge-mode=manual

這個(gè)配置表示將消息的確認(rèn)模式改為手動(dòng)確認(rèn)。

接下來(lái)我們來(lái)看下消費(fèi)者中的代碼:

 
 
 
 
  1. @RabbitListener(queues = RabbitConfig.JAVABOY_QUEUE_NAME)
  2. public void handle3(Message message,Channel channel) {
  3.     long deliveryTag = message.getMessageProperties().getDeliveryTag();
  4.     try {
  5.         //消息消費(fèi)的代碼寫(xiě)到這里
  6.         String s = new String(message.getBody());
  7.         System.out.println("s = " + s);
  8.         //消費(fèi)完成后,手動(dòng) ack
  9.         channel.basicAck(deliveryTag, false);
  10.     } catch (Exception e) {
  11.         //手動(dòng) nack
  12.         try {
  13.             channel.basicNack(deliveryTag, false, true);
  14.         } catch (IOException ex) {
  15.             ex.printStackTrace();
  16.         }
  17.     }
  18. }

將消費(fèi)者要做的事情放到一個(gè) try..catch 代碼塊中。

如果消息正常消費(fèi)成功,則執(zhí)行 basicAck 完成確認(rèn)。

如果消息消費(fèi)失敗,則執(zhí)行 basicNack 方法,告訴 RabbitMQ 消息消費(fèi)失敗。

這里涉及到兩個(gè)方法:

  • basicAck:這個(gè)是手動(dòng)確認(rèn)消息已經(jīng)成功消費(fèi),該方法有兩個(gè)參數(shù):第一個(gè)參數(shù)表示消息的 id;第二個(gè)參數(shù) multiple 如果為 false,表示僅確認(rèn)當(dāng)前消息消費(fèi)成功,如果為 true,則表示當(dāng)前消息之前所有未被當(dāng)前消費(fèi)者確認(rèn)的消息都消費(fèi)成功。
  • basicNack:這個(gè)是告訴 RabbitMQ 當(dāng)前消息未被成功消費(fèi),該方法有三個(gè)參數(shù):第一個(gè)參數(shù)表示消息的 id;第二個(gè)參數(shù) multiple 如果為 false,表示僅拒絕當(dāng)前消息的消費(fèi),如果為 true,則表示拒絕當(dāng)前消息之前所有未被當(dāng)前消費(fèi)者確認(rèn)的消息;第三個(gè)參數(shù) requeue 含義和前面所說(shuō)的一樣,被拒絕的消息是否重新入隊(duì)。

當(dāng) basicNack 中最后一個(gè)參數(shù)設(shè)置為 false 的時(shí)候,還涉及到一個(gè)死信隊(duì)列的問(wèn)題,這個(gè)松哥以后再專門寫(xiě)文章和大家細(xì)聊。

4.2.2 拉模式手動(dòng)確認(rèn)

拉模式手動(dòng) ack 比較麻煩一些,在 Spring 中封裝的 RabbitTemplate 中并未找到對(duì)應(yīng)的方法,所以我們得用原生的辦法,如下:

 
 
 
 
  1. public void receive2() {
  2.     Channel channel = rabbitTemplate.getConnectionFactory().createConnection().createChannel(true);
  3.     long deliveryTag = 0L;
  4.     try {
  5.         GetResponse getResponse = channel.basicGet(RabbitConfig.JAVABOY_QUEUE_NAME, false);
  6.         deliveryTag = getResponse.getEnvelope().getDeliveryTag();
  7.         System.out.println("o = " + new String((getResponse.getBody()), "UTF-8"));
  8.         channel.basicAck(deliveryTag, false);
  9.     } catch (IOException e) {
  10.         try {
  11.             channel.basicNack(deliveryTag, false, true);
  12.         } catch (IOException ex) {
  13.             ex.printStackTrace();
  14.         }
  15.     }
  16. }

這里涉及到的 basicAck 和 basicNack 方法跟前面的一樣,我就不再贅述。

5. 冪等性問(wèn)題

最后我們?cè)賮?lái)說(shuō)說(shuō)消息的冪等性問(wèn)題。

大家設(shè)想下面一個(gè)場(chǎng)景:

消費(fèi)者在消費(fèi)完一條消息后,向 RabbitMQ 發(fā)送一個(gè) ack 確認(rèn),此時(shí)由于網(wǎng)絡(luò)斷開(kāi)或者其他原因?qū)е?RabbitMQ 并沒(méi)有收到這個(gè) ack,那么此時(shí) RabbitMQ 并不會(huì)將該條消息刪除,當(dāng)重新建立起連接后,消費(fèi)者還是會(huì)再次收到該條消息,這就造成了消息的重復(fù)消費(fèi)。同時(shí),由于類似的原因,消息在發(fā)送的時(shí)候,同一條消息也可能會(huì)發(fā)送兩次(參見(jiàn)四種策略確保 RabbitMQ 消息發(fā)送可靠性!你用哪種?)。種種原因?qū)е挛覀冊(cè)谙M(fèi)消息時(shí),一定要處理好冪等性問(wèn)題。

冪等性問(wèn)題的處理倒也不難,基本上都是從業(yè)務(wù)上來(lái)處理,我來(lái)大概說(shuō)說(shuō)思路。

采用 Redis,在消費(fèi)者消費(fèi)消息之前,現(xiàn)將消息的 id 放到 Redis 中,存儲(chǔ)方式如下:

  • id-0(正在執(zhí)行業(yè)務(wù))
  • id-1(執(zhí)行業(yè)務(wù)成功)

如果 ack 失敗,在 RabbitMQ 將消息交給其他的消費(fèi)者時(shí),先執(zhí)行 setnx,如果 key 已經(jīng)存在(說(shuō)明之前有人消費(fèi)過(guò)該消息),獲取他的值,如果是 0,當(dāng)前消費(fèi)者就什么都不做,如果是 1,直接 ack。

極端情況:第一個(gè)消費(fèi)者在執(zhí)行業(yè)務(wù)時(shí),出現(xiàn)了死鎖,在 setnx 的基礎(chǔ)上,再給 key 設(shè)置一個(gè)生存時(shí)間。生產(chǎn)者,發(fā)送消息時(shí),指定 messageId。

當(dāng)然這只是一個(gè)簡(jiǎn)單思路供大家參考。

松哥在 vhr 項(xiàng)目中也處理了消息冪等性問(wèn)題,感興趣的小伙伴可以查看 vhr 源碼(https://github.com/lenve/vhr),代碼在 mailserver 中。

6. 小結(jié)

好啦,今天就和小伙伴們聊了下 RabbitMQ 中和消息消費(fèi)相關(guān)的幾個(gè)話題,感興趣的小伙伴可以實(shí)踐下哦~

本文轉(zhuǎn)載自微信公眾號(hào)「江南一點(diǎn)雨」,可以通過(guò)以下二維碼關(guān)注。轉(zhuǎn)載本文請(qǐng)聯(lián)系江南一點(diǎn)雨公眾號(hào)。


當(dāng)前標(biāo)題:RabbitMQ高可用之如何確保消息成功消費(fèi)
當(dāng)前URL:http://m.5511xx.com/article/dhshcjc.html