日韩无码专区无码一级三级片|91人人爱网站中日韩无码电影|厨房大战丰满熟妇|AV高清无码在线免费观看|另类AV日韩少妇熟女|中文日本大黄一级黄色片|色情在线视频免费|亚洲成人特黄a片|黄片wwwav色图欧美|欧亚乱色一区二区三区

RELATEED CONSULTING
相關(guān)咨詢
選擇下列產(chǎn)品馬上在線溝通
服務(wù)時間:8:30-17:00
你可能遇到了下面的問題
關(guān)閉右側(cè)工具欄

新聞中心

這里有您想知道的互聯(lián)網(wǎng)營銷解決方案
警惕!這八個場景下RocketMQ會發(fā)生流量控制

大家好,我是君哥。

在使用 RocketMQ 的過程中,有時候我們會看到下面的日志:

[TIMEOUT_CLEAN_QUEUE]broker busy, start flow control for a while, period in queue: 206ms, size of queue: 5

這是因為 RocketMQ 觸發(fā)了流量控制。今天我們來聊一聊哪些場景下 RocketMQ 會觸發(fā)流量控制。

如上圖,生產(chǎn)者把消息寫入 Broker,Consumer 從 Broker 拉取消息。Broker 是 RocketMQ 的核心 ,觸發(fā)流量控制主要就是為了防止 Broker 壓力過大而宕機。

一、 Broker 流控

1、 broker busy

RockerMQ 默認(rèn)采用異步刷盤策略,Producer 把消息發(fā)送到 Broker 后,Broker 會先把消息寫入 Page Cache,刷盤線程定時地把數(shù)據(jù)從 Page Cache 刷到磁盤上,如下圖:

那 broker busy 是怎么導(dǎo)致的呢?

Broker 默認(rèn)是開啟快速失敗的,處理邏輯類是 BrokerFastFailure,這個類中有一個定時任務(wù)用來清理過期的請求,每 10 ms 執(zhí)行一次,代碼如下:

public void start() {
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
if (brokerController.getBrokerConfig().isBrokerFastFailureEnable()) {
cleanExpiredRequest();
}
}
}, 1000, 10, TimeUnit.MILLISECONDS);
}

(1)Page Cache 繁忙

清理過期請求之前首先會判斷 Page Cache 是否繁忙,如果繁忙,就會給 Producer 返回一個系統(tǒng)繁忙的狀態(tài)碼(code=2,remark="[PCBUSY_CLEAN_QUEUE]broker busy, start flow control for a while, period in queue: %sms, size of queue: %d"),也就是本文開頭的異常日志。那怎么判斷 Page Cache 繁忙呢?Broker 收到一條消息后會追加到 Page Cache 或者內(nèi)存映射文件,這個過程首先獲取一個 CommitLog 寫入鎖,如果持有鎖的時間大于 osPageCacheBusyTimeOutMills(默認(rèn) 1s,可以配置),就認(rèn)為 Page Cache 繁忙。具體代碼見 DefaultMessageStore 類 isOSPageCacheBusy 方法。

(2)清理過期請求

清理過期請求時,如果請求線程的創(chuàng)建時間到當(dāng)前系統(tǒng)時間間隔大于 waitTimeMillsInSendQueue(默認(rèn) 200ms,可以配置)就會清理這個請求,然后給 Producer 返回一個系統(tǒng)繁忙的狀態(tài)碼(code=2,remark="[TIMEOUT_CLEAN_QUEUE]broker busy, start flow control for a while, period in queue: %sms, size of queue: %d")。

system busy

這個異常在 NettyRemotingAbstract#processRequestCommand 方法。

拒絕請求

如果 NettyRequestProcessor 拒絕了請求,就會給 Producer 返回一個系統(tǒng)繁忙的狀態(tài)碼(code=2,remark="[REJECTREQUEST]system busy, start flow control for a while")。那什么情況下請求會被拒絕呢?看下面這段代碼:

//SendMessageProcessor類
public boolean rejectRequest() {
return this.brokerController.getMessageStore().isOSPageCacheBusy() ||
this.brokerController.getMessageStore().isTransientStorePoolDeficient();
}

從代碼中可以看到,請求被拒絕的情況有兩種可能,一個是 Page Cache 繁忙,另一個是 TransientStorePoolDeficient。

跟蹤 isTransientStorePoolDeficient 方法,發(fā)現(xiàn)判斷依據(jù)是在開啟 transientStorePoolEnable 配置的情況下,是否還有可用的 ByteBuffer。

注意:在開啟 transientStorePoolEnable 的情況下,寫入消息時會先寫入堆外內(nèi)存(DirectByteBuffer),然后刷入 Page Cache,最后刷入磁盤。而讀取消息是從 Page Cache,這樣可以實現(xiàn)讀寫分離,避免讀寫都在 Page Cache 帶來的問題。如下圖:

線程池拒絕

Broker 收到請求后,會把處理邏輯封裝成到 Runnable 中,由線程池來提交執(zhí)行,如果線程池滿了就會拒絕請求(這里線程池中隊列的大小默認(rèn)是 10000,可以通過參數(shù) sendThreadPoolQueueCapacity 進行配置),線程池拒絕后會拋出異常 RejectedExecutionException,程序捕獲到異常后,會判斷是不是單向請求(OnewayRPC),如果不是,就會給 Producer 返回一個系統(tǒng)繁忙的狀態(tài)碼(code=2,remark="[OVERLOAD]system busy, start flow control for a while")。

判斷 OnewayRPC 的代碼如下,flag = 2 或者 3 時是單向請求:

public boolean isOnewayRPC() {
int bits = 1 << RPC_ONEWAY;
return (this.flag & bits) == bits;
}

(3) 消息重試

Broker 發(fā)生流量控制的情況下,返回給 Producer 系統(tǒng)繁忙的狀態(tài)碼(code=2),Producer 收到這個狀態(tài)碼是不會進行重試的。下面是會進行重試的響應(yīng)碼:

//DefaultMQProducer類
private final Set retryResponseCodes = new CopyOnWriteArraySet(Arrays.asList(
ResponseCode.TOPIC_NOT_EXIST,
ResponseCode.SERVICE_NOT_AVAILABLE,
ResponseCode.SYSTEM_ERROR,
ResponseCode.NO_PERMISSION,
ResponseCode.NO_BUYER_ID,
ResponseCode.NOT_IN_CURRENT_UNIT
));

二、 Consumer 流控

DefaultMQPushConsumerImpl 類中有 Consumer 流控的邏輯 。

1、 緩存消息數(shù)量超過閾值

ProcessQueue 保存的消息數(shù)量超過閾值(默認(rèn) 1000,可以配置),源碼如下:

if (cachedMessageCount > this.defaultMQPushConsumer.getPullThresholdForQueue()) {
this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
if ((queueFlowControlTimes++ % 1000) == 0) {
log.warn(
"the cached message count exceeds the threshold {}, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, pullRequest={}, flowControlTimes={}",
this.defaultMQPushConsumer.getPullThresholdForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, pullRequest, queueFlowControlTimes);
}
return;
}

2、緩存消息大小超過閾值

ProcessQueue 保存的消息大小超過閾值(默認(rèn) 100M,可以配置),源碼如下:

if (cachedMessageSizeInMiB > this.defaultMQPushConsumer.getPullThresholdSizeForQueue()) {
this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
if ((queueFlowControlTimes++ % 1000) == 0) {
log.warn(
"the cached message size exceeds the threshold {} MiB, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, pullRequest={}, flowControlTimes={}",
this.defaultMQPushConsumer.getPullThresholdSizeForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, pullRequest, queueFlowControlTimes);
}
return;
}

3、 緩存消息跨度超過閾值

對于非順序消費的場景,ProcessQueue 中保存的最后一條和第一條消息偏移量之差超過閾值(默認(rèn) 2000,可以配置)。源代碼如下:

if (!this.consumeOrderly) {
if (processQueue.getMaxSpan() > this.defaultMQPushConsumer.getConsumeConcurrentlyMaxSpan()) {
this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
if ((queueMaxSpanFlowControlTimes++ % 1000) == 0) {
log.warn(
"the queue's messages, span too long, so do flow control, minOffset={}, maxOffset={}, maxSpan={}, pullRequest={}, flowControlTimes={}",
processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), processQueue.getMaxSpan(),
pullRequest, queueMaxSpanFlowControlTimes);
}
return;
}
}

4、獲取鎖失敗

對于順序消費的情況,ProcessQueue 加鎖失敗,也會延遲拉取,這個延遲時間默認(rèn)是 3s,可以配置。

三、總結(jié)

本文介紹了 RocketMQ 發(fā)生流量控制的 8 個場景,其中 Broker 4 個場景,Consumer 4 個場景。Broker 的流量控制,本質(zhì)是對 Producer 的流量控制,最好的解決方法就是給 Broker 擴容,增加 Broker 寫入能力。而對于 Consumer 端的流量控制,需要解決 Consumer 端消費慢的問題,比如有第三方接口響應(yīng)慢或者有慢 SQL。

在使用的時候,根據(jù)打印的日志可以分析具體是哪種情況的流量控制,并采用相應(yīng)的措施。


分享題目:警惕!這八個場景下RocketMQ會發(fā)生流量控制
當(dāng)前地址:http://m.5511xx.com/article/dhoidge.html