新聞中心
前言
續(xù)上次分享 RabbitMQ 客戶端源碼系列 - Connection ,繼續(xù)分享Channel相關(guān)的源碼分析 (com.rabbitmq:amqp-client:4.8.3)。

創(chuàng)新互聯(lián)專注于多倫企業(yè)網(wǎng)站建設(shè),成都響應(yīng)式網(wǎng)站建設(shè),成都商城網(wǎng)站開發(fā)。多倫網(wǎng)站建設(shè)公司,為多倫等地區(qū)提供建站服務(wù)。全流程定制網(wǎng)站建設(shè),專業(yè)設(shè)計,全程項目跟蹤,創(chuàng)新互聯(lián)專業(yè)和態(tài)度為您提供的服務(wù)
友情提醒:本次分享適合的人群,需要對 RabbitMQ 有一定的了解
Channels
https://www.rabbitmq.com/channels.html。
RabbitMQ client Demo
基于上次 Java Client Connecting to RabbitMQ Demo 針對 RabbitMQ Channel 繼續(xù)深入分析。
ConnectionFactory factory = new ConnectionFactory();
// "guest"/"guest" by default, limited to localhost connections
factory.setUsername(userName);
factory.setPassword(password);
factory.setVirtualHost(virtualHost);
factory.setHost(hostName);
factory.setPort(portNumber);
Connection conn = factory.newConnection();
//本次重點分析內(nèi)容
Channel channel = connection.createChannel();
byte[] messageBodyBytes = "Hello, world!".getBytes();
channel.basicPublish(EXCHANGE_NAME, routingKey, MessageProperties.PERSISTENT_TEXT_PLAIN, messageBodyBytes);
channel.close();
connection.close();
AMQP 協(xié)議交互 -- Channel
可以看到簡單地調(diào)用了 Channel channel = connection.createChannel(); 方法創(chuàng)建Channel,以及可以看到 Channel 相應(yīng)的 AMQP 協(xié)議交互:「客戶端發(fā)送 Channel.Open,broker 接收到后返回 Channel.OpenOk (客戶端 創(chuàng)建通道;broker 收到并創(chuàng)建通道完成)」。
整個 AMQP 協(xié)議的交互流程(172.30.0.74 為客戶端即本機 ip;192.168.17.160 為 RabbitMQ Broker 的 ip)
RabbitMQ client 緩存模式為 Channel
本次分析 RabbitMQ client 采用緩存模式為 Channel:一個 Connection 對應(yīng)多個 Channel(默認情況下 2048個 channel,其中一個是特殊 channel0)
- 「Connection」:主要用于AMQP協(xié)議解析,信道復(fù)用。
- 「Channel」:路由、安全性、協(xié)調(diào)。
- 「Queue」:內(nèi)存中的消息,永久隊列索引(在 channel 與 queue 之間還有一個 exchange作為交換機,此處就不展開說了)。
RabbitMQ client CacheMode為 Channel模式
channel 源碼分析
上面簡單地介紹 AMQP 協(xié)議交互流程中 Channel 連接、Connection 與 Channel的關(guān)系。
開始本次主要介紹 Channel 以及涉及到 Connection 相關(guān)的源碼,從connection.createChannel開始深入分析。
/** Public API - {@inheritDoc} */
@Override
public Channel createChannel() throws IOException {
// 確認 connection 為打開的狀態(tài)
ensureIsOpen();
// 管理channel
ChannelManager cm = _channelManager;
if (cm == null) return null;
// 創(chuàng)建 channel 核心的方法
Channel channel = cm.createChannel(this);
// 用于暴露指標
metricsCollector.newChannel(channel);
return channel;
}可以看到 channel 由 connection 調(diào)用并管理:
- ensureIsOpen() -- 確認 connection 為打開的狀態(tài),邏輯比較簡單判斷 shutdownCause 為空即可(connection關(guān)閉的話,shutdownCause同時會附帶指示關(guān)閉的情況)。
- channelManager -- 統(tǒng)一由 connection 進行初始化及管理,在之前connection與broker 創(chuàng)建連接交互(Connection.Tune --> Connection.TuneOk)中初始化完成,默認 ChannelMax 為 2047 (2048 - 1,這個1對應(yīng)的特殊的 channel0 )。
重點看下 channelManager.createChannel(this) 邏輯。
public ChannelN createChannel(AMQConnection connection) throws IOException {
ChannelN ch;
// 該 monitor 主要監(jiān)控 _channelMap 和 channelNumberAllocator
synchronized (this.monitor) {
// 獲取 channel 分配的編號
int channelNumber = channelNumberAllocator.allocate();
if (channelNumber == -1) {
return null;
} else {
// 新增新的 channel
ch = addNewChannel(connection, channelNumber);
}
}
// 將新增的 channel 打開
ch.open(); // now that it's been safely added
return ch;
}channelManager 管理著 channel 的創(chuàng)建連接釋放等:
- synchronized (this.monitor) -- 首先獲取 channelManager 的 monitor 鎖,防止多線程并發(fā)操作。
- channelNumberAllocator.allocate -- 獲取范圍內(nèi)未被分配的 channelNumber,返回 -1 則認為不可再分配新的 channel,內(nèi)部主要的邏輯由 BitSet 實現(xiàn)的(感興趣的可以了解下)。
后續(xù)重點分析 addNewChannel 和 open 邏輯。
private ChannelN addNewChannel(AMQConnection connection, int channelNumber) {
// 判重
if (_channelMap.containsKey(channelNumber)) {
// That number's already allocated! Can't do it
// This should never happen unless something has gone
// badly wrong with our implementation.
throw new IllegalStateException("We have attempted to "
+ "create a channel with a number that is already in "
+ "use. This should never happen. "
+ "Please report this as a bug.");
}
// 構(gòu)建
ChannelN ch = instantiateChannel(connection, channelNumber, this.workService);
// 放入 _channelMap 統(tǒng)一管理
_channelMap.put(ch.getChannelNumber(), ch);
return ch;
}
public ChannelN(AMQConnection connection, int channelNumber,
ConsumerWorkService workService, MetricsCollector metricsCollector) {
// AMQChannel 構(gòu)造函數(shù)
super(connection, channelNumber);
// 構(gòu)建 消費分配器
this.dispatcher = new ConsumerDispatcher(connection, this, workService);
this.metricsCollector = metricsCollector;
}這塊邏輯比較簡單,執(zhí)行 instantiateChannel 構(gòu)建和初始化 channel,主要涉及到 連接、channel編號、超時時間、dispatcher等等,每一個 channel 都擁有一個 dispatcher,但是 「連接和線程池」 是與同一個 connection 共享。
最終獲取到新創(chuàng)建的 channel,進行打開 ch.open()。
public void open() throws IOException {
// 對rabbitmq broker 發(fā)送Channel.Open,并等待broker返回 Channel.OpenOk
exnWrappingRpc(new Channel.Open(UNSPECIFIED_OUT_OF_BAND));
}
public AMQCommand exnWrappingRpc(Method m)
throws IOException
{
try {
// 針對該方法進行rpc調(diào)用
return privateRpc(m);
} catch (AlreadyClosedException ace) {
// Do not wrap it since it means that connection/channel
// was closed in some action in the past
throw ace;
} catch (ShutdownSignalException ex) {
throw wrap(ex);
}
}
private AMQCommand privateRpc(Method m)
throws IOException, ShutdownSignalException
{
// 用于 rpc調(diào)用過程中 阻塞等待
SimpleBlockingRpcContinuation k = new SimpleBlockingRpcContinuation(m);
rpc(m, k);
// 不超時等待
if(_rpcTimeout == NO_RPC_TIMEOUT) {
return k.getReply();
} else {
try {
// 超時等待
return k.getReply(_rpcTimeout);
} catch (TimeoutException e) {
throw wrapTimeoutException(m, e);
}
}
}打開新的 channel 邏輯比較簡單:主要是和 rabbitmq broker 進行 rpc 調(diào)用:「客戶端發(fā)送 Channel.Open,broker 接收到后返回 Channel.OpenOk,這個過程完成后 創(chuàng)建通道完成,也就可以進行后續(xù)的 channel使用」。
最后
本次分享 RabbitMQ Client 與 RabbitMQ Broker 根據(jù) AMQP 協(xié)議交互流程中 根據(jù) channel 源碼進行分析,其中還有很多 channel 源碼細節(jié)感興趣的讀者可以進行深入了解。
當前名稱:RabbitMQ客戶端源碼系列-Channel
本文URL:http://m.5511xx.com/article/cdogcpi.html


咨詢
建站咨詢
