新聞中心
大家好,我是君哥。

你所需要的網(wǎng)站建設(shè)服務(wù),我們均能行業(yè)靠前的水平為你提供.標準是產(chǎn)品質(zhì)量的保證,主要從事成都網(wǎng)站建設(shè)、成都網(wǎng)站設(shè)計、企業(yè)網(wǎng)站建設(shè)、手機網(wǎng)站制作、網(wǎng)頁設(shè)計、成都品牌網(wǎng)站建設(shè)、網(wǎng)頁制作、做網(wǎng)站、建網(wǎng)站。創(chuàng)新互聯(lián)擁有實力堅強的技術(shù)研發(fā)團隊及素養(yǎng)的視覺設(shè)計專才。
RocketMQ 選擇了自己寫 NameServer 做注冊中心而沒有選擇 Zookeeper,這是為什么呢?
首先看一下 RocketMQ 的架構(gòu),如下圖:
RocketMQ 的 Broker 注冊到 NameServer 集群,而生產(chǎn)者和消費者則需要從 NameServer 拉取消息。
1 NameServer
1.1 Broker 注冊
Broker 啟動時,會向 NameServer 發(fā)送注冊消息,相關(guān)的 UML 類圖如下:
我們看一下 BrokerOuterAPI 的 registerBrokerAll 方法,代碼如下:
//BrokerOuterAPI.java
public ListregisterBrokerAll(
//省略參數(shù)
final boolean compressed) {
final ListregisterBrokerResultList = new CopyOnWriteArrayList<>();
ListnameServerAddressList = this.remotingClient.getNameServerAddressList();
if (nameServerAddressList != null && nameServerAddressList.size() > 0) {
//省略 requestHeader 封裝
final CountDownLatch countDownLatch = new CountDownLatch(nameServerAddressList.size());
for (final String namesrvAddr : nameServerAddressList) {
brokerOuterExecutor.execute(new Runnable() {
@Override
public void run() {
try {
RegisterBrokerResult result = registerBroker(namesrvAddr, oneway, timeoutMills, requestHeader, body);
if (result != null) {
registerBrokerResultList.add(result);
}
} catch (Exception e) {
} finally {
countDownLatch.countDown();
}
}
});
}
try {
countDownLatch.await(timeoutMills, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
}
}
return registerBrokerResultList;
}
可以看到,當 Broker 啟動時,會向所有的 NameServer 發(fā)送注冊消息,NameServer 端的注冊內(nèi)容如下:
從上圖中看出,需要在 NameServer 上保存的數(shù)據(jù)其實是很少的。
注意:
- Broker 向 NameServer 注冊時,會注冊到所有的 NameServer 服務(wù)器, NameServer 并不是分布式存儲,NameServer 集群是去中心化的。
- NameServer 會有定時任務(wù)(每 10s 一次)檢查 Broker 是否下線了,判斷依據(jù)是 120s 內(nèi)有沒有收到心跳,如果沒有收到,則關(guān)閉 channel,把 Broker 信息從本地緩存移除。代碼見 RouteInfoManager 類 scanNotActiveBroker 方法。
- Broker 啟動時,同時會啟動定時任務(wù),每 30s 向 NameServer 發(fā)送注冊消息,NameServer 收到注冊消息后更新心跳時間(BrokerLiveInfo.lastUpdateTimestamp)。
下面是 Broker 對 NameServer 的兩個請求碼:
- 注冊:RequestCode.REGISTER_BROKER
- 心跳:RequestCode.QUERY_DATA_VERSION?
1.2 新建 Topic
創(chuàng)建 Topic 時,Broker 會向 NameServer 發(fā)送注冊消息。代碼如下:
//BrokerController 類
public synchronized void registerIncrementBrokerData(TopicConfig topicConfig, DataVersion dataVersion) {
TopicConfig registerTopicConfig = topicConfig;
//省略
ConcurrentMaptopicConfigTable = new ConcurrentHashMap ();
topicConfigTable.put(topicConfig.getTopicName(), registerTopicConfig);
TopicConfigSerializeWrapper topicConfigSerializeWrapper = new TopicConfigSerializeWrapper();
topicConfigSerializeWrapper.setDataVersion(dataVersion);
topicConfigSerializeWrapper.setTopicConfigTable(topicConfigTable);
doRegisterBrokerAll(true, false, topicConfigSerializeWrapper);
}
最終調(diào)用了上一節(jié)的 registerBrokerAll 的方法。NameServer 收到注冊消息后更新本地保存的數(shù)據(jù),所保存的數(shù)據(jù)并沒有增加新數(shù)據(jù)。
1.3 客戶端
對于生產(chǎn)者和消費者,在發(fā)送和拉取消息時,首先會從本地緩存獲取 Topic 路由信息,如果獲取失敗,則需要從 NameServer 進行獲取。下面是獲取 Topic 路由信息的 UML 類圖:
看一下更新 Topic 路由信息的核心代碼:
public boolean updateTopicRouteInfoFromNameServer(final String topic, boolean isDefault,
DefaultMQProducer defaultMQProducer) {
try {
if (this.lockNamesrv.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
try {
TopicRouteData topicRouteData;
if (isDefault && defaultMQProducer != null) {
//根據(jù)默認 Topic 來取
topicRouteData = this.mQClientAPIImpl.getDefaultTopicRouteInfoFromNameServer(defaultMQProducer.getCreateTopicKey(),
clientConfig.getMqClientApiTimeout());
//省略部分邏輯
} else {
topicRouteData = this.mQClientAPIImpl.getTopicRouteInfoFromNameServer(topic, clientConfig.getMqClientApiTimeout());
}
if (topicRouteData != null) {
//判斷路由信息是否發(fā)送變化
TopicRouteData old = this.topicRouteTable.get(topic);
boolean changed = topicRouteDataIsChange(old, topicRouteData);
if (!changed) {
changed = this.isNeedUpdateTopicRouteInfo(topic);
} else {}
if (changed) {
TopicRouteData cloneTopicRouteData = topicRouteData.cloneTopicRouteData();
for (BrokerData bd : topicRouteData.getBrokerDatas()) {
this.brokerAddrTable.put(bd.getBrokerName(), bd.getBrokerAddrs());
}
// Update Pub info
if (!producerTable.isEmpty()) {
//更新生產(chǎn)者緩存
}
// Update sub info
if (!consumerTable.isEmpty()) {
//更新消費者緩存
}
log.info("topicRouteTable.put. Topic = {}, TopicRouteData[{}]", topic, cloneTopicRouteData);
this.topicRouteTable.put(topic, cloneTopicRouteData);
return true;
}
} else {
log.warn("updateTopicRouteInfoFromNameServer, getTopicRouteInfoFromNameServer return null, Topic: {}. [{}]", topic, this.clientId);
}
} catch (MQClientException e) {
} catch (RemotingException e) {
} finally {
this.lockNamesrv.unlock();
}
} else {}
} catch (InterruptedException e) {
}
return false;
}
注意:客戶端會有定時任務(wù),默認每隔 30s 向 NameServer 拉取 Topic 路由信息來刷新本地緩存。
2.放棄 Zookeeper
RocketMQ 設(shè)計之初也是使用 Zookeeper 做注冊中心的,這參考了 Kafka 的設(shè)計。Zookeeper 是一個非常成熟的注冊中心,還有支持主節(jié)點選舉、強一致等特性。使用 Zookeeper 的架構(gòu)如下:
2.1 輕量級
從上面的分析中可以看到,RocketMQ 需要保存的數(shù)據(jù)非常少,完全不必引入 Zookeeper 這種重量級的注冊中心。
2.2 一致性
NameServer 集群各節(jié)點是對等的,相互之間并不會進行通信,這樣確實會有短暫不一致。Broker 啟動時會跟所有的 NameServer 建立長鏈接,發(fā)送注冊信息。注冊成功后,每 30s 會向 NameServer 發(fā)送心跳,NameServer 收到心跳后更新 Broker 的 lastUpdateTimestamp。
Zookeeper 使用 ZAB 協(xié)議來保證節(jié)點之間數(shù)據(jù)的強一致性,這要求在每一個寫請求都需要在節(jié)點上寫事務(wù)日志,同時需要將內(nèi)存數(shù)據(jù)持久化到磁盤以保證一致性和持久性。對于 RocketMQ 這種元數(shù)據(jù)非常少的簡單場景,有點小題大做了。
放棄強一致而選擇可用性也是 RocketMQ 放棄 Zookeeper 的選擇,這也讓 NameServer 的設(shè)計更加簡單。
2.3 并發(fā)注冊
NameServer 處理 Broker 注冊的時候,考慮到多個 Broker 并發(fā)注冊的問題,保存路由信息時采用了 ReadWriteLock 中的寫鎖,代碼如下:
public RegisterBrokerResult registerBroker(
//省略參數(shù)
final Channel channel) {
RegisterBrokerResult result = new RegisterBrokerResult();
try {
try {
this.lock.writeLock().lockInterruptibly();
SetbrokerNames = this.clusterAddrTable.get(clusterName);
if (null == brokerNames) {
brokerNames = new HashSet();
this.clusterAddrTable.put(clusterName, brokerNames);
}
BrokerData brokerData = this.brokerAddrTable.get(brokerName);
if (null == brokerData) {
registerFirst = true;
brokerData = new BrokerData(clusterName, brokerName, new HashMap());
this.brokerAddrTable.put(brokerName, brokerData);
}
BrokerLiveInfo prevBrokerLiveInfo = this.brokerLiveTable.put(brokerAddr,
new BrokerLiveInfo(
System.currentTimeMillis(),
topicConfigWrapper.getDataVersion(),
channel,
haServerAddr));
} finally {
this.lock.writeLock().unlock();
}
} catch (Exception e) {
}
return result;
}
2.4 Broker 上下線
如果有新的 Broker 加入時,NameServer 并不會主動向客戶端推送新的 Broker 信息,而是需要客戶端的定時任務(wù)(30s 一次)去主動拉取,這樣客戶端保存的路由信息跟 NameServer 會有短暫的不一致。
同樣,Broker 掉線后,NameServer 會用定時任務(wù)(10s 一次)檢測 Broker 最后更新時間是否超過 120s,如果超過就把 Broker 路由信息刪除。在客戶端,同樣需要定時任務(wù)(30s 一次)去主動拉取,客戶端保存的路由信息跟 NameServer 也會有短暫的不一致。
2.5 擴展性
從上面分析看到,NameServer 集群各節(jié)點是對等的,當集群有壓力時,橫向擴展非常容易。而 Zookeeper 在寫擴展方面非常不靈活。
2.6 Broker 主從集群
在 Broker 主從集群中,RocketMQ 實現(xiàn)了基于 raft 協(xié)議的 DLedger 算法,可以基于 DLedger 進行日志復(fù)制。如果 Master 節(jié)點發(fā)生故障,可以基于 DLedger 自動進行主從切換。這可以完全不依賴于 Zookeeper 的實現(xiàn)。
2.7 運維
如果引入 Zookeeper,運維人員必須要具備運維 Zookeeper 的能力,這又增加了運維的復(fù)雜性。
3.總結(jié)
對于注冊中心,RocketMQ 集群需要保存的元數(shù)據(jù)非常少,完全沒有必要引入 Zookeeper 這種重量級的注冊中心。
RocketMQ 實現(xiàn)了基于 raft 協(xié)議的 DLedger 算法,可以保證 Broker 集群高可用,不用依賴 Zookeeper。
NameServer 是 RocketMQ 內(nèi)部組件,實現(xiàn)簡單,易于擴展,不用考慮運維復(fù)雜性。
分享題目:五張圖告訴你RocketMQ為什么不使用Zookeeper做注冊中心
網(wǎng)站地址:http://m.5511xx.com/article/dphjccc.html


咨詢
建站咨詢
