新聞中心
這里有您想知道的互聯(lián)網(wǎng)營(yíng)銷解決方案
RocketMQ5.x的生產(chǎn)者,應(yīng)該怎么配?
在RocketMQ 5.x中,生產(chǎn)者的配置主要包括以下幾個(gè)方面:

1、創(chuàng)建生產(chǎn)者實(shí)例
2、設(shè)置生產(chǎn)者屬性
3、發(fā)送消息
4、關(guān)閉生產(chǎn)者
下面詳細(xì)介紹每個(gè)方面的配置方法。
創(chuàng)建生產(chǎn)者實(shí)例
在RocketMQ中,生產(chǎn)者實(shí)例是通過(guò)DefaultMQProducer類創(chuàng)建的,首先需要引入RocketMQ的依賴,然后創(chuàng)建一個(gè)DefaultMQProducer實(shí)例。
import org.apache.rocketmq.client.producer.DefaultMQProducer;
public class RocketMQProducer {
public static void main(String[] args) throws Exception {
// 創(chuàng)建一個(gè)名為"producerGroup"的生產(chǎn)者組
DefaultMQProducer producer = new DefaultMQProducer("producerGroup");
// 設(shè)置NameServer地址
producer.setNamesrvAddr("127.0.0.1:9876");
// 啟動(dòng)生產(chǎn)者實(shí)例
producer.start();
}
}
設(shè)置生產(chǎn)者屬性
在創(chuàng)建完生產(chǎn)者實(shí)例后,可以通過(guò)setProperty方法設(shè)置生產(chǎn)者的屬性,以下是一些常用的屬性設(shè)置:
| 屬性名 | 默認(rèn)值 | 描述 |
sendMessageTimeout | 3000 | 發(fā)送消息的超時(shí)時(shí)間,單位為毫秒 |
retryTimesWhenSendFailed | 2 | 發(fā)送失敗時(shí)的重試次數(shù) |
maxReconsumeTimes | 18 | 消息最大重試次數(shù) |
topicPublishInfoBatchMaxSize | 1000 | 批量發(fā)送主題消息的最大數(shù)量 |
compressMsgBodyOverHowmuch | 1024 * 4 | 壓縮消息體的大小閾值,超過(guò)該值則進(jìn)行壓縮 |
batchSize | 16 | 批量發(fā)送消息的大小閾值,超過(guò)該值則進(jìn)行批量發(fā)送 |
sendMessageThreadPoolNums | 128 | 發(fā)送消息的線程池?cái)?shù)量 |
pullMessageThreadPoolNums | 128 | 拉取消息的線程池?cái)?shù)量 |
checkFrequency | 60000 | 檢查Broker是否可用的頻率,單位為毫秒 |
storePathRootDir | null | 消息存儲(chǔ)路徑的根目錄 |
storePathCommitLog | null | commitLog存儲(chǔ)路徑 |
mappedFileSizeCommitLog | 1GB | commitLog文件的大小閾值,超過(guò)該值則新建一個(gè)文件 |
flushIntervalCommitLog | 500ms | commitLog刷新到磁盤的時(shí)間間隔,單位為毫秒 |
cleanResourceInterval | 1000 * 60 * 60 | 清理資源的時(shí)間間隔,單位為毫秒 |
deleteWhen | 04 | commitLog文件刪除的時(shí)間點(diǎn),取值為小時(shí)(024)和分鐘(059)的組合,例如04表示凌晨4點(diǎn)刪除文件 |
fileReservedTime | 72 | commitLog文件保留的時(shí)間,單位為小時(shí),超過(guò)該時(shí)間則刪除文件 |
maxTransferBytesOnMessageInMemory | 1 | 消息內(nèi)存中傳輸?shù)淖畲笞止?jié)數(shù),超過(guò)該值則將消息寫入磁盤隊(duì)列中等待傳輸 |
maxTransferCountOnMessageInMemory | 1 | 消息內(nèi)存中傳輸?shù)淖畲蟠螖?shù),超過(guò)該值則將消息寫入磁盤隊(duì)列中等待傳輸 |
enableMsgTrace | false | 是否開(kāi)啟消息軌跡追蹤功能,開(kāi)啟后會(huì)記錄消息的發(fā)送和接收情況,用于排查問(wèn)題 |
msgTraceTopic | null | 消息軌跡追蹤的主題名稱,如果為null則使用生產(chǎn)者組名作為主題名稱 |
msgTraceConsumerGroup | null | 消息軌跡追蹤的消費(fèi)者組名稱,如果為null則使用生產(chǎn)者組名作為消費(fèi)者組名稱 |
flushConsumerQueueLaterally | false | 是否啟用側(cè)邊隊(duì)列刷盤機(jī)制,當(dāng)消費(fèi)者消費(fèi)速度過(guò)快時(shí),可以將部分消息暫時(shí)存儲(chǔ)在側(cè)邊隊(duì)列中,待消費(fèi)者慢下來(lái)后再進(jìn)行消費(fèi),提高系統(tǒng)吞吐量 |
maxReconsumeTimeDifference | 1800000000L | 兩次重試之間的最大時(shí)間差,超過(guò)該時(shí)間差則重新投遞未被消費(fèi)的消息,單位為毫秒,默認(rèn)值為1表示禁用該功能 |
maxDelayTime | 1L | 如果設(shè)置了延遲消息,則該參數(shù)表示延遲的最大時(shí)間,單位為毫秒,默認(rèn)值為1表示禁用該功能 |
maxOffsetDelta | 1L | 如果設(shè)置了延遲消息或者定時(shí)消息,則該參數(shù)表示允許的最大offset變化量,單位為毫秒,默認(rèn)值為1表示禁用該功能 |
traceDispatcherListenerStackSize | 1L | traceDispatcherListener的堆棧大小,默認(rèn)值為1表示使用JVM默認(rèn)值,建議設(shè)置為32K或更大以減少OOM異常的發(fā)生概率,注意:此參數(shù)僅在開(kāi)啟traceDispatcherEnable時(shí)生效。 |
| traceDispatcherEnable
網(wǎng)站欄目:RocketMQ5.x的生產(chǎn)者,應(yīng)該怎么配?
網(wǎng)站地址:http://m.5511xx.com/article/dhsogic.html


咨詢
建站咨詢
