日韩无码专区无码一级三级片|91人人爱网站中日韩无码电影|厨房大战丰满熟妇|AV高清无码在线免费观看|另类AV日韩少妇熟女|中文日本大黄一级黄色片|色情在线视频免费|亚洲成人特黄a片|黄片wwwav色图欧美|欧亚乱色一区二区三区

RELATEED CONSULTING
相關(guān)咨詢(xún)
選擇下列產(chǎn)品馬上在線(xiàn)溝通
服務(wù)時(shí)間:8:30-17:00
你可能遇到了下面的問(wèn)題
關(guān)閉右側(cè)工具欄

新聞中心

這里有您想知道的互聯(lián)網(wǎng)營(yíng)銷(xiāo)解決方案
SpringBoot整合RabbitMQ保證消息的可靠的投遞及消費(fèi)

環(huán)境:SpringBoot2.7.9

10年的揭西網(wǎng)站建設(shè)經(jīng)驗(yàn),針對(duì)設(shè)計(jì)、前端、開(kāi)發(fā)、售后、文案、推廣等六對(duì)一服務(wù),響應(yīng)快,48小時(shí)及時(shí)工作處理。成都全網(wǎng)營(yíng)銷(xiāo)推廣的優(yōu)勢(shì)是能夠根據(jù)用戶(hù)設(shè)備顯示端的尺寸不同,自動(dòng)調(diào)整揭西建站的顯示方式,使網(wǎng)站能夠適用不同顯示終端,在瀏覽器中調(diào)整網(wǎng)站的寬度,無(wú)論在任何一種瀏覽器上瀏覽網(wǎng)站,都能展現(xiàn)優(yōu)雅布局與設(shè)計(jì),從而大程度地提升瀏覽體驗(yàn)。創(chuàng)新互聯(lián)公司從事“揭西網(wǎng)站設(shè)計(jì)”,“揭西網(wǎng)站推廣”以來(lái),每個(gè)客戶(hù)項(xiàng)目都認(rèn)真落實(shí)執(zhí)行。

消息丟失場(chǎng)景

  1. 生產(chǎn)者丟失消息
    生產(chǎn)者發(fā)出的數(shù)據(jù)由于網(wǎng)絡(luò)原因沒(méi)有到底MQ Server丟失
  2. MQ Server丟消息
    由于消息隊(duì)列沒(méi)有持久化或者是消息沒(méi)有持久化,在Server重啟后消息丟失
  3. 消費(fèi)者丟消息
    接收到消息后,業(yè)務(wù)還沒(méi)有處理完成,服務(wù)宕機(jī)(當(dāng)你是自動(dòng)ACK)。

生產(chǎn)者丟失解決方案

  1. 通過(guò)事務(wù)(不推薦)
  2. 確認(rèn)機(jī)制(推薦)

這里只講如何通過(guò)確認(rèn)機(jī)制保證生產(chǎn)者不丟失消息

  • 引入依賴(lài)

org.springframework.boot
spring-boot-starter-amqp

  • 聲明交換機(jī)及隊(duì)列
@Bean
public TopicExchange topicExchange() {
return new TopicExchange("akf.exchange", true, false) ;
}
@Bean
public Queue queue() {
return new Queue("akf.queue", true, false, false) ;
}
@Bean
public Binding binding() {
return BindingBuilder.bind(queue()).to(topicExchange()).with("akf.#") ;
}

  • RabbitMQ配置
spring:
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
virtualHost: test
publisherConfirmType: correlated
publisherReturns: true
template:
mandatory: true

注意:spring.rabbitmq.publisher-confirm-type及spring.rabbitmq.publisher-returns 的配置值。

接下來(lái)是為RabbitTemplate配置對(duì)應(yīng)的Callback,Publisher確認(rèn)回調(diào),Publisher返回回調(diào)。

  1. 確認(rèn)回調(diào)
    當(dāng)消息發(fā)送到了交換機(jī)則ack=true,當(dāng)消息無(wú)法發(fā)送到交換機(jī)則ack=false。
  2. 返回回調(diào)
    當(dāng)消息能夠發(fā)送到交換機(jī),但是不能路由到隊(duì)列則會(huì)調(diào)用該return回調(diào)。

RabbitTemplate是單例的可以通過(guò)兩種方式配置對(duì)應(yīng)的回調(diào)。

  1. 自定義RabbitTemplate。
  2. 通過(guò)AWare接口獲取RabbitTemplate配置。

這里只講通過(guò)AWare接口配置回調(diào)。

  • 配置Callback
@Component
public class ConfigRabbitTemplate implements ApplicationContextAware {

@Override
public void setApplicationContext(ApplicationContext context) throws BeansException {
RabbitTemplate rabbitTemplate = context.getBean(RabbitTemplate.class) ;
rabbitTemplate.setConfirmCallback(new ConfirmCallback() {
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
System.out.println("correlation: " + correlationData) ;
if (ack) {
System.out.println("消息發(fā)送到交換機(jī)") ;
} else {
System.out.println("消息發(fā)送失敗 - " + ", cause" + cause) ;
}
}
});
rabbitTemplate.setReturnsCallback(new ReturnsCallback() {
@Override
public void returnedMessage(ReturnedMessage returned) {
System.out.println(returned.getExchange() + ", " + returned.getRoutingKey() + ", " + returned.getReplyCode() + ", " + returned.getMessage().toString()) ;
}
});
}

}

使用錯(cuò)誤的交換機(jī)和錯(cuò)誤的路由key分別測(cè)試即可以看到上面的輸出信息了。

MQ Server丟消息

在通過(guò)@Bean聲明交換機(jī)和隊(duì)列時(shí)設(shè)置持久性,在消息上設(shè)置持久化。

@Bean
public TopicExchange topicExchange() {
// 這里的第二個(gè)參數(shù)就是設(shè)置是否持久化,如果設(shè)置為false,當(dāng)服務(wù)重啟交換機(jī)將丟失
// 第三個(gè)參數(shù)是否自動(dòng)刪除,當(dāng)不再使用該交換機(jī)時(shí)會(huì)自動(dòng)刪除該交換機(jī)
return new TopicExchange("akf.exchange", true, false) ;
}
@Bean
public Queue queue() {
// 第二個(gè)參數(shù)true設(shè)置隊(duì)列是持久化的,當(dāng)服務(wù)重啟隊(duì)列不會(huì)丟失
return new Queue("akf.queue", true, false, false) ;
}

設(shè)置消息持久化。

Message message = MessageBuilder.withBody("Hello".getBytes())
// 設(shè)置消息投遞模式為持久化的(默認(rèn)不設(shè)置就是持久化的)
.setDeliveryMode(MessageDeliveryMode.PERSISTENT)
.build() ;

消費(fèi)者丟消息

關(guān)閉自動(dòng)應(yīng)答機(jī)制。

默認(rèn)是自動(dòng)應(yīng)答,當(dāng)消息監(jiān)聽(tīng)方法中沒(méi)有異常時(shí)則正常應(yīng)答,當(dāng)發(fā)生異常時(shí),在默認(rèn)情況下會(huì)重新入隊(duì)列(這樣就會(huì)出現(xiàn)死循環(huán))。

spring:
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
virtualHost: test
publisherConfirmType: correlated
publisherReturns: true
listener:
simple:
acknowledgeMode: manual #設(shè)置為手動(dòng)應(yīng)答

消息監(jiān)聽(tīng)。

@RabbitListener(queues = {"akf.queue"})
public void onMessage(Message message, Channel channel) throws Exception {
try {
System.out.println("接收到消息: " + new String(message.getBody()));
// ... 這里處理我們的業(yè)務(wù)代碼
// 當(dāng)消費(fèi)者把消息消費(fèi)成功,再手動(dòng)應(yīng)答RabbitMQ
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
} catch (Exception e) {
// 如果發(fā)生了異常,我們一般的處理是直接扔掉死信隊(duì)列,一般這里出現(xiàn)錯(cuò)誤都是消息有問(wèn)題
// 如果消息出現(xiàn)問(wèn)題,你重試再入隊(duì)列是無(wú)意義的
}
}

消息重試

如果消息消費(fèi)時(shí)出現(xiàn)錯(cuò)誤,你又希望能夠通過(guò)重試來(lái)盡可能的處理掉該消息,Spring也提供了相應(yīng)的重試機(jī)制。

修改配置:

spring:
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
virtualHost: test
publisherConfirmType: correlated
publisherReturns: true
listener:
simple:
acknowledgeMode: auto
concurrency: 1
retry:
# 開(kāi)啟重試
enabled: true
# 延遲1s后開(kāi)始重試
initialInterval: 1000
# 每次消息重試的間隔乘數(shù)
multiplier: 3
# 2次間的重試最大間隔時(shí)間
maxInterval: 20000
maxAttempts: 4 #重試4次,1s, 3s, 9s
stateless: true #如果消息處理中存在事務(wù)則需要將其設(shè)置為false

如果只是做上面的配置,重試指定次數(shù)后消息將會(huì)被丟棄,這是默認(rèn)行為。Spring提供了 MessageRecoverer接口來(lái)決定消息如何處理。默認(rèn)Spring提供如下幾種實(shí)現(xiàn):

  1. ImmediateRequeueMessageRecoverer
  2. RejectAndDontRequeueRecoverer
  3. RepublishMessageRecoverer

我們只需要定義一個(gè)Bean為MessageRecoverer即可,這里我們就用Spring提供的RepublishMessageRecoverer重新發(fā)布消息。

@Bean
public MessageRecoverer messageRecoverer(RabbitTemplate rabbitTemplate) {
return new RepublishMessageRecoverer(rabbitTemplate, "error.exchange", "error") ;
}

這里將消息重新發(fā)布一個(gè)專(zhuān)門(mén)的隊(duì)列(重試指定次數(shù)后)。


分享標(biāo)題:SpringBoot整合RabbitMQ保證消息的可靠的投遞及消費(fèi)
URL網(wǎng)址:http://m.5511xx.com/article/dpihshc.html