日韩无码专区无码一级三级片|91人人爱网站中日韩无码电影|厨房大战丰满熟妇|AV高清无码在线免费观看|另类AV日韩少妇熟女|中文日本大黄一级黄色片|色情在线视频免费|亚洲成人特黄a片|黄片wwwav色图欧美|欧亚乱色一区二区三区

RELATEED CONSULTING
相關(guān)咨詢
選擇下列產(chǎn)品馬上在線溝通
服務(wù)時間:8:30-17:00
你可能遇到了下面的問題
關(guān)閉右側(cè)工具欄

新聞中心

這里有您想知道的互聯(lián)網(wǎng)營銷解決方案
RabbitMQ客戶端源碼系列-Channel

前言

續(xù)上次分享 RabbitMQ 客戶端源碼系列 - Connection ,繼續(xù)分享Channel相關(guān)的源碼分析 (com.rabbitmq:amqp-client:4.8.3)。

創(chuàng)新互聯(lián)專注于多倫企業(yè)網(wǎng)站建設(shè),成都響應(yīng)式網(wǎng)站建設(shè),成都商城網(wǎng)站開發(fā)。多倫網(wǎng)站建設(shè)公司,為多倫等地區(qū)提供建站服務(wù)。全流程定制網(wǎng)站建設(shè),專業(yè)設(shè)計,全程項目跟蹤,創(chuàng)新互聯(lián)專業(yè)和態(tài)度為您提供的服務(wù)

友情提醒:本次分享適合的人群,需要對 RabbitMQ 有一定的了解

Channels

https://www.rabbitmq.com/channels.html。

RabbitMQ client Demo

基于上次 Java Client Connecting to RabbitMQ Demo 針對 RabbitMQ Channel 繼續(xù)深入分析。

ConnectionFactory factory = new ConnectionFactory();
// "guest"/"guest" by default, limited to localhost connections
factory.setUsername(userName);
factory.setPassword(password);
factory.setVirtualHost(virtualHost);
factory.setHost(hostName);
factory.setPort(portNumber);

Connection conn = factory.newConnection();
//本次重點分析內(nèi)容
Channel channel = connection.createChannel();

byte[] messageBodyBytes = "Hello, world!".getBytes();
channel.basicPublish(EXCHANGE_NAME, routingKey, MessageProperties.PERSISTENT_TEXT_PLAIN, messageBodyBytes);
channel.close();
connection.close();

AMQP 協(xié)議交互 -- Channel

可以看到簡單地調(diào)用了 Channel channel = connection.createChannel(); 方法創(chuàng)建Channel,以及可以看到 Channel 相應(yīng)的 AMQP 協(xié)議交互:「客戶端發(fā)送 Channel.Open,broker 接收到后返回 Channel.OpenOk (客戶端 創(chuàng)建通道;broker 收到并創(chuàng)建通道完成)」。

整個 AMQP 協(xié)議的交互流程(172.30.0.74 為客戶端即本機 ip;192.168.17.160 為 RabbitMQ Broker 的 ip)

RabbitMQ client 緩存模式為 Channel

本次分析 RabbitMQ client 采用緩存模式為 Channel:一個 Connection 對應(yīng)多個 Channel(默認情況下 2048個 channel,其中一個是特殊 channel0)

  • 「Connection」:主要用于AMQP協(xié)議解析,信道復(fù)用。
  • 「Channel」:路由、安全性、協(xié)調(diào)。
  • 「Queue」:內(nèi)存中的消息,永久隊列索引(在 channel 與 queue 之間還有一個 exchange作為交換機,此處就不展開說了)。

RabbitMQ client CacheMode為 Channel模式

channel 源碼分析

上面簡單地介紹 AMQP 協(xié)議交互流程中 Channel 連接、Connection 與 Channel的關(guān)系。

開始本次主要介紹 Channel 以及涉及到 Connection 相關(guān)的源碼,從connection.createChannel開始深入分析。

/** Public API - {@inheritDoc} */
@Override
public Channel createChannel() throws IOException {
// 確認 connection 為打開的狀態(tài)
ensureIsOpen();
// 管理channel
ChannelManager cm = _channelManager;
if (cm == null) return null;
// 創(chuàng)建 channel 核心的方法
Channel channel = cm.createChannel(this);
// 用于暴露指標
metricsCollector.newChannel(channel);
return channel;
}

可以看到 channel 由 connection 調(diào)用并管理:

  • ensureIsOpen() -- 確認 connection 為打開的狀態(tài),邏輯比較簡單判斷 shutdownCause 為空即可(connection關(guān)閉的話,shutdownCause同時會附帶指示關(guān)閉的情況)。
  • channelManager -- 統(tǒng)一由 connection 進行初始化及管理,在之前connection與broker 創(chuàng)建連接交互(Connection.Tune --> Connection.TuneOk)中初始化完成,默認 ChannelMax 為 2047 (2048 - 1,這個1對應(yīng)的特殊的 channel0 )。

重點看下 channelManager.createChannel(this) 邏輯。

public ChannelN createChannel(AMQConnection connection) throws IOException {
ChannelN ch;
// 該 monitor 主要監(jiān)控 _channelMap 和 channelNumberAllocator
synchronized (this.monitor) {
// 獲取 channel 分配的編號
int channelNumber = channelNumberAllocator.allocate();
if (channelNumber == -1) {
return null;
} else {
// 新增新的 channel
ch = addNewChannel(connection, channelNumber);
}
}
// 將新增的 channel 打開
ch.open(); // now that it's been safely added
return ch;
}

channelManager 管理著 channel 的創(chuàng)建連接釋放等:

  • synchronized (this.monitor) -- 首先獲取 channelManager 的 monitor 鎖,防止多線程并發(fā)操作。
  • channelNumberAllocator.allocate -- 獲取范圍內(nèi)未被分配的 channelNumber,返回 -1 則認為不可再分配新的 channel,內(nèi)部主要的邏輯由 BitSet 實現(xiàn)的(感興趣的可以了解下)。

后續(xù)重點分析 addNewChannel 和 open 邏輯。

private ChannelN addNewChannel(AMQConnection connection, int channelNumber) {
// 判重
if (_channelMap.containsKey(channelNumber)) {
// That number's already allocated! Can't do it
// This should never happen unless something has gone
// badly wrong with our implementation.
throw new IllegalStateException("We have attempted to "
+ "create a channel with a number that is already in "
+ "use. This should never happen. "
+ "Please report this as a bug.");
}
// 構(gòu)建
ChannelN ch = instantiateChannel(connection, channelNumber, this.workService);
// 放入 _channelMap 統(tǒng)一管理
_channelMap.put(ch.getChannelNumber(), ch);
return ch;
}
public ChannelN(AMQConnection connection, int channelNumber,
ConsumerWorkService workService, MetricsCollector metricsCollector) {
// AMQChannel 構(gòu)造函數(shù)
super(connection, channelNumber);
// 構(gòu)建 消費分配器
this.dispatcher = new ConsumerDispatcher(connection, this, workService);
this.metricsCollector = metricsCollector;
}

這塊邏輯比較簡單,執(zhí)行 instantiateChannel 構(gòu)建和初始化 channel,主要涉及到 連接、channel編號、超時時間、dispatcher等等,每一個 channel 都擁有一個 dispatcher,但是 「連接和線程池」 是與同一個 connection 共享。

最終獲取到新創(chuàng)建的 channel,進行打開 ch.open()。

public void open() throws IOException {
// 對rabbitmq broker 發(fā)送Channel.Open,并等待broker返回 Channel.OpenOk
exnWrappingRpc(new Channel.Open(UNSPECIFIED_OUT_OF_BAND));
}


public AMQCommand exnWrappingRpc(Method m)
throws IOException
{
try {
// 針對該方法進行rpc調(diào)用
return privateRpc(m);
} catch (AlreadyClosedException ace) {
// Do not wrap it since it means that connection/channel
// was closed in some action in the past
throw ace;
} catch (ShutdownSignalException ex) {
throw wrap(ex);
}
}


private AMQCommand privateRpc(Method m)
throws IOException, ShutdownSignalException
{
// 用于 rpc調(diào)用過程中 阻塞等待
SimpleBlockingRpcContinuation k = new SimpleBlockingRpcContinuation(m);
rpc(m, k);

// 不超時等待
if(_rpcTimeout == NO_RPC_TIMEOUT) {
return k.getReply();
} else {
try {
// 超時等待
return k.getReply(_rpcTimeout);
} catch (TimeoutException e) {
throw wrapTimeoutException(m, e);
}
}
}

打開新的 channel 邏輯比較簡單:主要是和 rabbitmq broker 進行 rpc 調(diào)用:「客戶端發(fā)送 Channel.Open,broker 接收到后返回 Channel.OpenOk,這個過程完成后 創(chuàng)建通道完成,也就可以進行后續(xù)的 channel使用」。

最后

本次分享 RabbitMQ Client 與 RabbitMQ Broker 根據(jù) AMQP 協(xié)議交互流程中 根據(jù) channel 源碼進行分析,其中還有很多 channel 源碼細節(jié)感興趣的讀者可以進行深入了解。


當前名稱:RabbitMQ客戶端源碼系列-Channel
本文URL:http://m.5511xx.com/article/cdogcpi.html