日韩无码专区无码一级三级片|91人人爱网站中日韩无码电影|厨房大战丰满熟妇|AV高清无码在线免费观看|另类AV日韩少妇熟女|中文日本大黄一级黄色片|色情在线视频免费|亚洲成人特黄a片|黄片wwwav色图欧美|欧亚乱色一区二区三区

RELATEED CONSULTING
相關(guān)咨詢(xún)
選擇下列產(chǎn)品馬上在線(xiàn)溝通
服務(wù)時(shí)間:8:30-17:00
你可能遇到了下面的問(wèn)題
關(guān)閉右側(cè)工具欄

新聞中心

這里有您想知道的互聯(lián)網(wǎng)營(yíng)銷(xiāo)解決方案
RocketMQ控制臺(tái)消費(fèi)者堆棧信息展示優(yōu)化分析

背景介紹

專(zhuān)有云企業(yè)版v_3_12,消息隊(duì)列RocketMQ控制臺(tái)->Group管理,查看Group ID下單個(gè)消費(fèi)端堆棧信息,期望只展示與該Group ID相關(guān)的堆棧信息,在以下場(chǎng)景與期望不符。

成都創(chuàng)新互聯(lián)公司專(zhuān)注于鳳城企業(yè)網(wǎng)站建設(shè),響應(yīng)式網(wǎng)站,成都商城網(wǎng)站開(kāi)發(fā)。鳳城網(wǎng)站建設(shè)公司,為鳳城等地區(qū)提供建站服務(wù)。全流程定制設(shè)計(jì),專(zhuān)業(yè)設(shè)計(jì),全程項(xiàng)目跟蹤,成都創(chuàng)新互聯(lián)公司專(zhuān)業(yè)和態(tài)度為您提供的服務(wù)

場(chǎng)景介紹

在同一個(gè)程序中創(chuàng)建兩個(gè)不同Group ID的消費(fèi)端實(shí)例,在控制臺(tái)中查看一個(gè)Group ID下單個(gè)消費(fèi)端堆棧信息,堆棧信息中包含了兩個(gè)Group ID消費(fèi)端的堆棧信息,給排查問(wèn)題造成了困擾。

示例代碼

pom


com.aliyun.openservices
ons-client
1.8.8.3.Final

code

import com.aliyun.openservices.ons.api.Action;
import com.aliyun.openservices.ons.api.PropertyKeyConst;
import com.aliyun.openservices.ons.api.batch.BatchMessageListener;
import com.aliyun.openservices.ons.api.bean.BatchConsumerBean;
import com.aliyun.openservices.ons.api.bean.Subscription;

import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

public class Main {
public static void main(String[] args){
String nameSrvAddr = "xxx";
String accessKey = "xxx";
String secretKey = "xxx";
String groupId1 = "Goup_ID_1";
String topic1 = "xxx_1";
String tag1 = "xxx_1";
BatchMessageListener batchMessageListener1 = (messages, context) -> Action.CommitMessage;
BatchConsumerBean batchConsumerBean1 = batchConsumerBean(nameSrvAddr,accessKey,secretKey,
groupId1,topic1,tag1,batchMessageListener1);
batchConsumerBean1.start();

String groupId2 = "Goup_ID_2";
String topic2 = "xxx_2";
String tag2 = "xxx_2";
BatchMessageListener batchMessageListener2 = (messages, context) -> Action.CommitMessage;
BatchConsumerBean batchConsumerBean2 = batchConsumerBean(nameSrvAddr,accessKey,secretKey,
groupId2,topic2,tag2,batchMessageListener2);
batchConsumerBean2.start();
}

private static BatchConsumerBean batchConsumerBean(String nameSrvAddr,String accessKey,String secretKey,String groupId,String topic,String tag,BatchMessageListener batchMessageListener){
BatchConsumerBean batchConsumerBean = new BatchConsumerBean();
Properties properties = new Properties();
properties.put(PropertyKeyConst.NAMESRV_ADDR,nameSrvAddr);
properties.put(PropertyKeyConst.AccessKey,accessKey);
properties.put(PropertyKeyConst.SecretKey,secretKey);
properties.put(PropertyKeyConst.GROUP_ID,groupId);
batchConsumerBean.setProperties(properties);

Subscription subscription = new Subscription();
subscription.setTopic(topic);
subscription.setExpression(tag);
Map subscriptionTable = new HashMap<>();
subscriptionTable.put(subscription,batchMessageListener);

batchConsumerBean.setSubscriptionTable(subscriptionTable);
return batchConsumerBean;
}
}

分析過(guò)程

首先分析示例代碼中與BatchConsumerBean相關(guān)聯(lián)的對(duì)象,然后分析控制臺(tái)展示消費(fèi)端堆棧信息的流程,最后分析下不同版本的RocketMQ Client SDK對(duì)消費(fèi)端消費(fèi)線(xiàn)程命名方式的變化。

BatchConsumerBean

示例代碼中創(chuàng)建了兩個(gè)BatchConsumerBean實(shí)例,與BatchConsumerBean實(shí)例相關(guān)聯(lián)的對(duì)象如下:

與BatchConsumerBean關(guān)聯(lián)的對(duì)象

從上圖看,BatchConsumerBean實(shí)例是比較重的,所以上面的示例代碼可以?xún)?yōu)化為只創(chuàng)建一個(gè)BatchConsumerBean實(shí)例,與該問(wèn)題不太相關(guān),暫時(shí)忽略;
上圖中與該問(wèn)題直接相關(guān)的是ClientRemotingProcessor、MQClientInstance、DefaultMQPushConsumerImpl、ConsumerStatsManager,下面繼續(xù)分析。

堆棧信息展示流程

下面描述的是在瀏覽器請(qǐng)求一個(gè)Group ID單個(gè)消費(fèi)端堆棧信息的流程。

堆棧信息展示流程

瀏覽器請(qǐng)求控制臺(tái)應(yīng)用

當(dāng)在控制臺(tái)單機(jī)某個(gè)消費(fèi)端堆棧信息的時(shí)候,瀏覽器會(huì)向控制臺(tái)應(yīng)用發(fā)起http請(qǐng)求,主要請(qǐng)求參數(shù)是:
GroupID,ClientId,其中每個(gè)MQClientInstance實(shí)例對(duì)應(yīng)一個(gè)ClientId。

控制臺(tái)應(yīng)用請(qǐng)求Broker

控制臺(tái)應(yīng)用收到瀏覽器請(qǐng)求后,主要進(jìn)行以下操作:

String topic = MixAll.RETRY_GROUP_TOPIC_PREFIX + consumerGroup;
TopicRouteData topicRouteData = this.examineTopicRouteInfo(topic);
List brokerDatas = topicRouteData.getBrokerDatas();
if (brokerDatas != null) {
for (BrokerData brokerData : brokerDatas) {
String addr = brokerData.selectBrokerAddr();
if (addr != null) {
return this.mqClientInstance.getMQClientAPIImpl().getConsumerRunningInfo(addr, consumerGroup, clientId, jstack,timeoutMillis * 3);
}
}
}
  1. 根據(jù)%RETRY% + GroupIID查找對(duì)應(yīng)的TopicRouteData
  2. 從TopicRouteData中選擇一個(gè)Broker的地址發(fā)送getConsumerRunningInfo請(qǐng)求

Broker請(qǐng)求Consumer

Broker收到請(qǐng)求后,主要進(jìn)行以下操作:

ClientChannelInfo clientChannelInfo = this.brokerController.getConsumerManager().findChannel(consumerGroup, clientId);
RemotingCommand newRequest = RemotingCommand.createRequestCommand(requestCode, null);
newRequest.setExtFields(request.getExtFields());
newRequest.setBody(request.getBody());
return this.brokerController.getBroker2Client().callClient(clientChannelInfo.getChannel(), newRequest);
  1. AdminBrokerProcessor響應(yīng)查詢(xún)請(qǐng)求
  2. 根據(jù)GroupID和ClientId找到對(duì)應(yīng)Consumer實(shí)例的channel socket
  3. 通過(guò)channel socket發(fā)送請(qǐng)求到Consumer實(shí)例

Consumer處理邏輯

Consumer收到請(qǐng)求后,主要進(jìn)行以下操作:

ConsumerRunningInfo consumerRunningInfo = this.mqClientFactory.consumerRunningInfo(requestHeader.getConsumerGroup());
if (requestHeader.isJstackEnable()) {
Map map = Thread.getAllStackTraces();
String jstack = UtilAll.jstack(map);
consumerRunningInfo.setJstack(jstack);
}
  1. 通過(guò)MQClientInstance實(shí)例請(qǐng)求Consumer實(shí)例的consumerRunningInfo方法獲取Consumer運(yùn)行信息,如:pullRT、pullTPS、consumeRT、consumeOKTPS、consumeFailedTPS等信息
  2. 獲取JVM所有線(xiàn)程棧信息
  3. 將獲取到的ConsumerRunningInfo返回給Broker。

其中第2步【獲取JVM所有線(xiàn)程棧信息】就是我們需要查看的堆棧信息,目前控制臺(tái)主要展示了以ConsumeMessageThread__開(kāi)頭的線(xiàn)程和RebalanceService線(xiàn)程,這塊期望只展示與該消費(fèi)端相關(guān)的ConsumeMessageThread__線(xiàn)程和Rebalance線(xiàn)程,不期望將不相關(guān)的消費(fèi)端線(xiàn)程也展示出來(lái)。

ConsumeMessageThread線(xiàn)程的命名

在當(dāng)前版本中處理業(yè)務(wù)的消費(fèi)者線(xiàn)程名的形式是:ConsumeMessageThread_數(shù)字,
ConsumeMessageConcurrentlyService類(lèi)中相關(guān)代碼如下:

//該線(xiàn)程池用于處理業(yè)務(wù)邏輯
this.consumeExecutor = new ThreadPoolExecutor(
this.defaultMQPushConsumer.getConsumeThreadMin(),
this.defaultMQPushConsumer.getConsumeThreadMax(),
1000 * 60,
TimeUnit.MILLISECONDS,
this.consumeRequestQueue,
new ThreadFactoryImpl("ConsumeMessageThread_"));

新版本中線(xiàn)程的命名中增加了GroupId,相關(guān)代碼如下:

String consumeThreadPrefix = null;
if (consumerGroup.length() > 100) {
consumeThreadPrefix = new StringBuilder("ConsumeMessageThread_").append(consumerGroup, 0, 100).append("_").toString();
} else {
consumeThreadPrefix = new StringBuilder("ConsumeMessageThread_").append(consumerGroup).append("_").toString();
}
this.consumeExecutor = new ThreadPoolExecutor(
this.defaultMQPushConsumer.getConsumeThreadMin(),
this.defaultMQPushConsumer.getConsumeThreadMax(),
1000 * 60,
TimeUnit.MILLISECONDS,
this.consumeRequestQueue,
new ThreadFactoryImpl(consumeThreadPrefix));

線(xiàn)程名形式為:ConsumeMessageThread_GroupId__數(shù)字,從一定程度對(duì)以上問(wèn)題進(jìn)行了優(yōu)化。

總結(jié)

  1. ONS SDK對(duì)RocketMQ Client進(jìn)行了封裝,更加方便業(yè)務(wù)的使用,Consumer對(duì)象比較重,需要根據(jù)業(yè)務(wù)采用合理的初始化方式
  2. ConsumerStatsManager記錄了消費(fèi)端的一些統(tǒng)計(jì)信息
  3. ConsumeMessageConcurrentlyService對(duì)消費(fèi)端線(xiàn)程命名進(jìn)行了優(yōu)化?

本文題目:RocketMQ控制臺(tái)消費(fèi)者堆棧信息展示優(yōu)化分析
本文鏈接:http://m.5511xx.com/article/cdegoho.html