新聞中心
延時消息是指發(fā)送到 RocketMQ 后不會馬上被消費者拉取到,而是等待固定的時間,才能被消費者拉取到。

成都創(chuàng)新互聯(lián)公司企業(yè)建站,10年網(wǎng)站建設(shè)經(jīng)驗,專注于網(wǎng)站建設(shè)技術(shù),精于網(wǎng)頁設(shè)計,有多年建站和網(wǎng)站代運營經(jīng)驗,設(shè)計師為客戶打造網(wǎng)絡(luò)企業(yè)風格,提供周到的建站售前咨詢和貼心的售后服務(wù)。對于成都網(wǎng)站制作、網(wǎng)站設(shè)計中不同領(lǐng)域進行深入了解和探索,創(chuàng)新互聯(lián)在網(wǎng)站建設(shè)中充分了解客戶行業(yè)的需求,以靈動的思維在網(wǎng)頁中充分展現(xiàn),通過對客戶行業(yè)精準市場調(diào)研,為客戶提供的解決方案。
延時消息的使用場景很多,比如電商場景下關(guān)閉超時未支付的訂單,某些場景下需要在固定時間后發(fā)送提示消息。
1.生產(chǎn)者
首先看一個生產(chǎn)者發(fā)送延時消息的官方示例代碼:
public static void main(String[] args) throws Exception {
// Instantiate a producer to send scheduled messages
DefaultMQProducer producer = new DefaultMQProducer("ExampleProducerGroup");
// Launch producer
producer.start();
int totalMessagesToSend = 100;
for (int i = 0; i < totalMessagesToSend; i++) {
Message message = new Message("TestTopic", ("Hello scheduled message " + i).getBytes());
// This message will be delivered to consumer 10 seconds later.
message.setDelayTimeLevel(3);
// Send the message
producer.send(message);
}
// Shutdown producer after use.
producer.shutdown();
}從上面的代碼可以看到,跟普通消息不一樣的是,消息設(shè)置 setDelayTimeLevel 屬性值,這里設(shè)置為 3,這里最終將 3 這個延時級別復(fù)制給了 DELAY 屬性。
關(guān)于延時級別,可以看下面這個定義:
//MessageStoreConfig類
private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";
這里延時級別有 18 個,上面的示例代碼中延遲級別是 3,消息會延遲 10s 后消費者才能拉取。
2.Broker 處理
2.1 寫入消息
Broker 收到消息后,會將消息寫入 CommitLog。在寫入時,會判斷消息 DELAY 屬性是否大于 0。代碼如下:
//CommitLog 類
if (msg.getDelayTimeLevel() > 0) {
if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {
msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());
}
topic = TopicValidator.RMQ_SYS_SCHEDULE_TOPIC;
int queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());
// Backup real topic, queueId
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));
msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));
msg.setTopic(topic);
msg.setQueueId(queueId);
}
從上面的代碼可以看到,CommitLog 寫入時并沒有直接寫入,而是把 Topic 改為 SCHEDULE_TOPIC_XXXX,把 queueId 改為延時級別減 1。因為延時級別有 18 個,所以這里有 18 個隊列。如下圖:
2.2 調(diào)度消息
延時消息寫入后,會有一個調(diào)度任務(wù)不停地拉取這些延時消息,這個邏輯在類 ScheduleMessageService。這個類的初始化代碼如下:
public void start() {
if (started.compareAndSet(false, true)) {
this.load();
this.deliverExecutorService = new ScheduledThreadPoolExecutor(this.maxDelayLevel, new ThreadFactoryImpl("ScheduleMessageTimerThread_"));
//省略部分邏輯
for (Map.Entry entry : this.delayLevelTable.entrySet()) {
Integer level = entry.getKey();
Long timeDelay = entry.getValue();
Long offset = this.offsetTable.get(level);
if (null == offset) {
offset = 0L;
}
if (timeDelay != null) {
//省略部分邏輯
this.deliverExecutorService.schedule(new DeliverDelayedMessageTimerTask(level, offset), FIRST_DELAY_TIME, TimeUnit.MILLISECONDS);
}
}
//省略持久化的邏輯
}
} 上面的 load() 方法會加載一個 delayLevelTable(ConcurrentHashMap類型),key 保存延時級別(從 1 開始),value 保存延時時間(單位是 ms)。
load() 方法結(jié)束后,創(chuàng)建了一個有 18 個核心線程的定時線程池,然后遍歷 delayLevelTable,創(chuàng)建 18 個任務(wù)(DeliverDelayedMessageTimerTask)進行每個延時級別的任務(wù)調(diào)度。任務(wù)調(diào)度的代碼邏輯如下:
public void executeOnTimeup() {
ConsumeQueue cq =
ScheduleMessageService.this.defaultMessageStore.findConsumeQueue(TopicValidator.RMQ_SYS_SCHEDULE_TOPIC,
delayLevel2QueueId(delayLevel));
if (cq == null) {
this.scheduleNextTimerTask(this.offset, DELAY_FOR_A_WHILE);
return;
}
SelectMappedBufferResult bufferCQ = cq.getIndexBuffer(this.offset);
if (bufferCQ == null) {
//省略部分邏輯
this.scheduleNextTimerTask(resetOffset, DELAY_FOR_A_WHILE);
return;
}
long nextOffset = this.offset;
try {
int i = 0;
ConsumeQueueExt.CqExtUnit cqExtUnit = new ConsumeQueueExt.CqExtUnit();
for (; i < bufferCQ.getSize() && isStarted(); i += ConsumeQueue.CQ_STORE_UNIT_SIZE) {
long offsetPy = bufferCQ.getByteBuffer().getLong();
int sizePy = bufferCQ.getByteBuffer().getInt();
long tagsCode = bufferCQ.getByteBuffer().getLong();
//省略部分邏輯
long now = System.currentTimeMillis();
long deliverTimestamp = this.correctDeliverTimestamp(now, tagsCode);
nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);
long countdown = deliverTimestamp - now;
if (countdown > 0) {
this.scheduleNextTimerTask(nextOffset, DELAY_FOR_A_WHILE);
return;
}
MessageExt msgExt = ScheduleMessageService.this.defaultMessageStore.lookMessageByOffset(offsetPy, sizePy);
if (msgExt == null) {
continue;
}
MessageExtBrokerInner msgInner = ScheduleMessageService.this.messageTimeup(msgExt);
//事務(wù)消息判斷省略
boolean deliverSuc;
//只保留同步
deliverSuc = this.syncDeliver(msgInner, msgExt.getMsgId(), nextOffset, offsetPy, sizePy);
if (!deliverSuc) {
this.scheduleNextTimerTask(nextOffset, DELAY_FOR_A_WHILE);
return;
}
}
nextOffset = this.offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);
} catch (Exception e) {
log.error("ScheduleMessageService, messageTimeup execute error, offset = {}", nextOffset, e);
} finally {
bufferCQ.release();
}
this.scheduleNextTimerTask(nextOffset, DELAY_FOR_A_WHILE);
}這段代碼可以參考下面的流程圖來進行理解:
上面有一個修正投遞時間的函數(shù),這個函數(shù)的意義是如果已經(jīng)過了投遞時間,那么立即投遞。代碼如下:
private long correctDeliverTimestamp(final long now, final long deliverTimestamp) {
long result = deliverTimestamp;
long maxTimestamp = now + ScheduleMessageService.this.delayLevelTable.get(this.delayLevel);
if (deliverTimestamp > maxTimestamp) {
result = now;
}
return result;
}注意:消息從 CommitLog 轉(zhuǎn)發(fā)到 ConsumeQueue 時,會判斷是否是延時消息(Topic = SCHEDULE_TOPIC_XXXX 并且延時級別大于 0),如果是延時消息,就會修改 tagsCode 值為消息投遞的時間戳,而 tagsCode 原值是 tag 的 HashCode。代碼如下:
//CommitLog類checkMessageAndReturnSize方法
if (delayLevel > 0) {
tagsCode = this.defaultMessageStore.getScheduleMessageService().computeDeliverTimestamp(delayLevel,
storeTimestamp);
}
如下圖:
而 ScheduleMessageService 調(diào)度線程將消息從 ConsumeQueue 重新投遞到原始隊列中時,會把 tagsCode 再次修改為 tag 的 HashCode,代碼如下:
//類MessageExtBrokerInner,這個方法被 messageTimeup 方法調(diào)用。
public static long tagsString2tagsCode(final TopicFilterType filter, final String tags) {
if (null == tags || tags.length() == 0) { return 0; }
return tags.hashCode();
}
如下圖:
2.3 一個問題
如果有一個業(yè)務(wù)場景,要求延時消息 3 小時才能消費,而 RocketMQ 的延時消息最大延時級別只支持延時 2 小時,怎么處理?
這里提供兩個思路供大家參考:
在 Broker 上修改 messageDelayLevel 的默認配置;
在客戶端緩存 msgId,先設(shè)置延時級別是 18(2h),當客戶端拉取到消息后首先判斷有沒有緩存,如果有緩存則再次發(fā)送延時消息,這次延時級別是 17(1h),如果沒有緩存則進行消費。
3 總結(jié)
經(jīng)過上面的講解,延時消息的處理流程如下:
最后,延時消息的延時時間并不精確,這個時間是 Broker 調(diào)度線程把消息重新投遞到原始的 MessageQueue 的時間,如果發(fā)生消息積壓或者 RocketMQ 客戶端發(fā)生流量管控,客戶端拉取到消息后進行處理的時間可能會超出預(yù)設(shè)的延時時間。?
網(wǎng)站題目:五張圖帶你理解 RocketMQ 延時消息機制
瀏覽路徑:http://m.5511xx.com/article/dpgshod.html


咨詢
建站咨詢
