日韩无码专区无码一级三级片|91人人爱网站中日韩无码电影|厨房大战丰满熟妇|AV高清无码在线免费观看|另类AV日韩少妇熟女|中文日本大黄一级黄色片|色情在线视频免费|亚洲成人特黄a片|黄片wwwav色图欧美|欧亚乱色一区二区三区

RELATEED CONSULTING
相關(guān)咨詢
選擇下列產(chǎn)品馬上在線溝通
服務(wù)時(shí)間:8:30-17:00
你可能遇到了下面的問題
關(guān)閉右側(cè)工具欄

新聞中心

這里有您想知道的互聯(lián)網(wǎng)營(yíng)銷解決方案
消息服務(wù):項(xiàng)目整合RocketMQ

在《??SpringCloud Alibaba實(shí)戰(zhàn)??》專欄前面的文章中,我們實(shí)現(xiàn)了用戶微服務(wù)、商品微服務(wù)和訂單微服務(wù)之間的遠(yuǎn)程調(diào)用,并且實(shí)現(xiàn)了服務(wù)調(diào)用的負(fù)載均衡。也基于阿里開源的Sentinel實(shí)現(xiàn)了服務(wù)的限流與容錯(cuò),并詳細(xì)介紹了Sentinel的核心技術(shù)與配置規(guī)則。

成都創(chuàng)新互聯(lián)長(zhǎng)期為數(shù)千家客戶提供的網(wǎng)站建設(shè)服務(wù),團(tuán)隊(duì)從業(yè)經(jīng)驗(yàn)10年,關(guān)注不同地域、不同群體,并針對(duì)不同對(duì)象提供差異化的產(chǎn)品和服務(wù);打造開放共贏平臺(tái),與合作伙伴共同營(yíng)造健康的互聯(lián)網(wǎng)生態(tài)環(huán)境。為吉陽(yáng)企業(yè)提供專業(yè)的做網(wǎng)站、成都網(wǎng)站建設(shè),吉陽(yáng)網(wǎng)站改版等技術(shù)服務(wù)。擁有10多年豐富建站經(jīng)驗(yàn)和眾多成功案例,為您定制開發(fā)。

簡(jiǎn)單介紹了服務(wù)網(wǎng)關(guān),并對(duì)SpringCloud Gateway的核心架構(gòu)進(jìn)行了簡(jiǎn)要說明,也在項(xiàng)目中整合了SpringCloud Gateway網(wǎng)關(guān)實(shí)現(xiàn)了通過網(wǎng)關(guān)訪問后端微服務(wù)。

同時(shí),也基于SpringCloud Gateway整合Sentinel實(shí)現(xiàn)了網(wǎng)關(guān)的限流功能,詳細(xì)介紹了SpringCloud Gateway網(wǎng)關(guān)的核心技術(shù)。在鏈路追蹤章節(jié),我們開始簡(jiǎn)單介紹了分布式鏈路追蹤技術(shù)與解決方案,隨后在項(xiàng)目中整合Sleuth實(shí)現(xiàn)了鏈路追蹤,并使用Sleuth整合ZipKin實(shí)現(xiàn)了分布式鏈路追蹤的可視化 。

在消息服務(wù)章節(jié),我們介紹了MQ的使用場(chǎng)景,引入MQ后的注意事項(xiàng)以及MQ的選型對(duì)比。接下來,我們就在項(xiàng)目中整合RocketMQ。

本章總覽?

RocketMQ環(huán)境準(zhǔn)備?

RocketMQ是阿里開源的消息中間件,目前是Apache下的頂級(jí)項(xiàng)目。正式在項(xiàng)目中接入RocketMQ之前,我們需要搭建RocketMQ的環(huán)境。這里呢,我把搭建RocketMQ的基礎(chǔ)環(huán)境分為兩個(gè)部分:搭建RocketMQ環(huán)境和搭建RocketMQ控制臺(tái)。

「注意:冰河這里都是先下載RocketMQ的源碼和RocketMQ控制臺(tái)的源碼,然后對(duì)源碼進(jìn)行編譯后,再搭建的。目的也是讓小伙伴們能夠跟著冰河實(shí)現(xiàn)手動(dòng)編譯RocketMQ的源碼,另外,編譯RocketMQ源碼和控制臺(tái)源碼需要JDK1.8+Maven?!?/p>

源碼編譯安裝RocketMQ

(1)到鏈接https://github.com/apache/rocketmq/releases/tag/rocketmq-all-4.9.3下載RocketMQ 4.9.3版本的源碼。下載并解壓后的源碼如下所示。

(2)打開cmd命令行,進(jìn)入RocketMQ的解壓目錄,我這里是E:\Application\RocketMQ\rocketmq-rocketmq-all-4.9.3目錄,然后在cmd命令行輸入如下命令開始編譯打包。

mvn clean install -Dmaven.test.skip=true -Prelease-all

編譯過程如下所示。

編譯打包成功后,如下圖所示。

(3)編譯成功后,會(huì)在RocketMQ解壓目錄下的distribution目錄下的target目錄下生成RocketMQ的安裝包,在我電腦上的目錄就是:E:\Application\RocketMQ\rocketmq-rocketmq-all-4.9.3\distribution\target。如下所示。

這樣,我們就自己下載RocketMQ的源碼,并打包成功了。

注意:這里,為了方便,我還是將RocketMQ部署到我本機(jī)Windows操作系統(tǒng)上,小伙伴們也可以將之前的Nacos、Sentinel和這次的RocketMQ都部署在Linux操作系統(tǒng)上,部署方式幾乎與在Windows操作系統(tǒng)一樣,這里,冰河就不再贅述了?!?/p>

(4)將編譯出的安裝包,解壓到電腦的某個(gè)目錄下,例如我解壓后的目錄為:E:\Application\microservices\RocketMQ\rocketmq-4.9.3。

(5)在RocketMQ的解壓目錄下的conf目錄下修改broker.conf文件,修改后的文件內(nèi)容如下所示。

brokerClusterName = DefaultCluster
brokerName = broker-a
brokerId = 0
deleteWhen = 04
fileReservedTime = 48
brokerRole = ASYNC_MASTER
flushDiskType = ASYNC_FLUSH
# 自動(dòng)創(chuàng)建Topic
autoCreateTopicEnable=true
# nameServ地址
namesrvAddr=127.0.0.1:9876
# 存儲(chǔ)路徑
storePathRootDir=E:/RocketMQ/data/rocketmq/dataDir
# commitLog路徑
storePathCommitLog=E:/RocketMQ/data/rocketmq/dataDir/commitlog
# 消息隊(duì)列存儲(chǔ)路徑
storePathConsumeQueue=E:/RocketMQ/data/rocketmq/dataDir/consumequeue
# 消息索引存儲(chǔ)路徑
storePathIndex=E:/RocketMQ/data/rocketmq/dataDir/index
# checkpoint文件路徑
storeCheckpoint=E:/RocketMQ/data/rocketmq/dataDir/checkpoint
# abort文件存儲(chǔ)路徑
abortFile=E:/RocketMQ/data/rocketmq/dataDir/abort

小伙伴們可以根據(jù)自己的實(shí)際情況,自行修改上述文件中配置的目錄地址。

(6)非常重要的一步,在啟動(dòng)RocketMQ之前,需要配置下ROCKETMQ_HOME環(huán)境變量,否則在啟動(dòng)RocketMQ的時(shí)候,會(huì)提示如下錯(cuò)誤信息。

E:\Application\microservices\RocketMQ\rocketmq-4.9.3\bin>mqnamesrv.cmd
Please set the ROCKETMQ_HOME variable in your environment!

「提示:設(shè)置ROCKETMQ_HOME環(huán)境變量?!?/strong>?

接下來,就在系統(tǒng)環(huán)境變量中,設(shè)置下ROCKETMQ_HOME的環(huán)境變量,如下所示。

(7)配置完RocketMQ的環(huán)境變量后,打開cmd命令行,進(jìn)入RocketMQ的bin目錄,例如,我電腦的目錄是:E:\Application\microservices\RocketMQ\rocketmq-4.9.3\bin。執(zhí)行??mqnamesrv.cmd??命令啟動(dòng)NameServer,如下所示。

打印出如下信息,說明RocketMQ的NameServer啟動(dòng)成功了。

The Name Server boot success. serializeType=JSON

(8)重新打開一個(gè)cmd命令行,進(jìn)入RocketMQ的bin目錄,輸入??mqbroker.cmd -n localhost:9876??命令啟動(dòng)RocketMQ的Broker服務(wù),如下所示。

打印出如下信息,說明RocketMQ的Broker服務(wù)啟動(dòng)成功了。

boot success. serializeType=JSON and name server is localhost:9876

測(cè)試RocketMQ環(huán)境

RocketMQ內(nèi)置了大量的測(cè)試案例,并且這些測(cè)試案例可以通過RocketMQ的bin目錄下的tools.cmd命令進(jìn)行測(cè)試。接下來,我們就使用RocketMQ自帶的tools.cmd命令測(cè)試RocketMQ的環(huán)境。

(1)啟動(dòng)生產(chǎn)者程序向RocketMQ發(fā)送消息。

重新打開cmd命令行,進(jìn)入RocketMQ的bin目錄,在命令行輸入如下命令調(diào)用RocketMQ自帶的生產(chǎn)者程序向RocketMQ發(fā)送消息。

set NAMESRV_ADDR=localhost:9876
tools.cmd org.apache.rocketmq.example.quickstart.Producer

可以看到,執(zhí)行完上述兩條命令后,生產(chǎn)者程序開始向RocketMQ發(fā)送消息。

(2)啟動(dòng)消費(fèi)者程序消費(fèi)RocketMQ中的消息。

重新打開cmd命令行,進(jìn)入RocketMQ的bin目錄,在命令行輸入如下命令調(diào)用RocketMQ自帶的消費(fèi)者程序消費(fèi)RocketMQ中的消息。

set NAMESRV_ADDR=localhost:9876
tools.cmd org.apache.rocketmq.example.quickstart.Consumer

可以看到,執(zhí)行完上述兩條命令后,消費(fèi)者程序開始消費(fèi)RocketMQ中的消息。

說明我們使用源碼編譯搭建RocketMQ環(huán)境成功了。

源碼編譯RocketMQ控制臺(tái)

這里需要注意的是:RocketMQ控制臺(tái)本質(zhì)上是一個(gè)SpringBoot程序,啟動(dòng)后默認(rèn)監(jiān)聽的端口是8080。RocketMQ的新版控制臺(tái)已經(jīng)從RocketMQ的rocketmq-externals項(xiàng)目中分離出來了。也就是說,新版的RocketMQ控制臺(tái)已經(jīng)從https://github.com/apache/rocketmq-externals鏈接所示的項(xiàng)目中分離出來,新版控制臺(tái)的鏈接地址為:https://github.com/apache/rocketmq-dashboard。

(1)從鏈接https://github.com/apache/rocketmq-dashboard下載新版的RocketMQ控制臺(tái)源碼。下載后解壓。

(2)進(jìn)入到RocketMQ控制臺(tái)源碼解壓目錄的src/main/resources目錄下,編輯application.yml文件,修改??namesrvAddrs??地址,去掉多余的namesrvAddrs地址。

application.yml文件中原來的配置如下所示。

rocketmq:
config:
# if this value is empty,use env value rocketmq.config.namesrvAddr NAMESRV_ADDR | now, default localhost:9876
# configure multiple namesrv addresses to manage multiple different clusters
namesrvAddrs:
- 127.0.0.1:9876
- 127.0.0.2:9876

將127.0.0.2:9876刪除或者注釋掉,如下所示。

rocketmq:
config:
# if this value is empty,use env value rocketmq.config.namesrvAddr NAMESRV_ADDR | now, default localhost:9876
# configure multiple namesrv addresses to manage multiple different clusters
namesrvAddrs:
- 127.0.0.1:9876
# - 127.0.0.2:9876

RocketMQ控制臺(tái)啟動(dòng)時(shí)默認(rèn)監(jiān)聽的端口是8080,由于我們項(xiàng)目中訂單微服務(wù)監(jiān)聽的端口也是8080,所以,將RocketMQ控制臺(tái)監(jiān)聽的端口修改為10003,修改前的配置如下所示。

server:
port: 8080

修改后的配置如下所示。

server:
port: 10003

(3)修改完application.yml文件后,打開cmd命令行,進(jìn)入RocketMQ控制臺(tái)源碼的根目錄,輸入如下Maven命令開始編譯RocketMQ控制臺(tái)的源碼。

mvn clean install -Dmaven.test.skip=true

編譯過程如下所示。

(4)編譯完成后,會(huì)在RocketMQ控制臺(tái)源碼的根目錄下生成target目錄,如下所示。

進(jìn)入target目錄后,可以看到生成了rocketmq-dashboard-1.0.1-SNAPSHOT.jar文件,如下所示。

這個(gè)jar文件就是RocketMQ控制臺(tái)的運(yùn)行文件。

(5)重新打開cmd命令行,進(jìn)入rocketmq-dashboard-1.0.1-SNAPSHOT.jar文件所在的命令,在命令行直接輸入如下命令啟動(dòng)RocketMQ控制臺(tái)程序。

java -jar rocketmq-dashboard-1.0.1-SNAPSHOT.jar

驗(yàn)證RocketMQ控制臺(tái)

在瀏覽器中輸入??http://localhost:10003??后,出現(xiàn)如下畫面說明RocketMQ啟動(dòng)成功。

界面默認(rèn)是英文,我們也可以點(diǎn)擊右上角的??changeLanguage??切換語(yǔ)言,切換成中文顯示,如下所示。

選擇主題菜單想后如下所示。

可以看到目前RocketMQ中存在一個(gè)名稱為TopicTest的主題,點(diǎn)擊TopicTest主題的狀態(tài)按鈕,如下所示。

會(huì)顯示TopicTest主題的消息隊(duì)列信息,如下所示。

可以看到,正確顯示出了TopicTest主題的消息隊(duì)列信息,說明RocketMQ控制臺(tái)啟動(dòng)成功了。

編碼測(cè)試RocketMQ?

我們使用RocketMQ自帶的生產(chǎn)者和消費(fèi)者程序?qū)崿F(xiàn)了消息的生成與消費(fèi),為了讓小伙伴們能夠更加直觀的感受到消息中間件在項(xiàng)目中的作用,接下來,我們自己編碼測(cè)試下RocketMQ。

導(dǎo)入RocketMQ依賴

在用戶微服務(wù)shop-user的pom.xml中,添加RocketMQ相關(guān)的依賴,如下所示。


org.apache.rocketmq
rocketmq-spring-boot-starter
2.0.3


org.apache.rocketmq
rocketmq-client
4.5.2

編寫生產(chǎn)者代碼

在用戶微服務(wù)的sec/test/java目錄下新建??io.binghe.shop.rocketmq.test??包,在包下創(chuàng)建RocketMQProducer類,作為RocketMQ的生產(chǎn)者,代碼如下所示。

/**
* @author binghe
* @version 1.0.0
* @description RocketMQ生產(chǎn)者
*/
public class RocketMQProducer {

public static void main(String[] args) throws Exception {
//創(chuàng)建消息生產(chǎn)者
DefaultMQProducer producer = new DefaultMQProducer("bingheProducerGroup");
//設(shè)置NameServer地址
producer.setNamesrvAddr("127.0.0.1:9876");
//啟動(dòng)生產(chǎn)者
producer.start();
//構(gòu)建消息對(duì)象
Message message = new Message("bingheTopic", "bingheTag", "Hello RocketMQ".getBytes());
System.out.println("生產(chǎn)者發(fā)出的消息為:" + JSONObject.toJSONString(message));
//發(fā)送消息并接收結(jié)果
SendResult sendResult = producer.send(message);
//打印結(jié)果信息
System.out.println("生產(chǎn)者收到的發(fā)送結(jié)果信息為:" + JSONObject.toJSONString(sendResult));
//關(guān)閉生產(chǎn)者
producer.shutdown();
}
}

生產(chǎn)者的代碼比較簡(jiǎn)單,這里就不再贅述了。

編寫消費(fèi)者代碼

在??io.binghe.shop.rocketmq.test??包下新建RocketMQConsumer類,作為RocketMQ的消費(fèi)者,代碼如下所示。

/**
* @author binghe
* @version 1.0.0
* @description RocketMQ消費(fèi)者
*/
public class RocketMQConsumer {
public static void main(String[] args) throws Exception {
try{
//創(chuàng)建消息消費(fèi)者
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("bingheConsumerGroup");
//設(shè)置NameServer地址
consumer.setNamesrvAddr("127.0.0.1:9876");
//訂閱bingheTopic主題
consumer.subscribe("bingheTopic", "*");
//設(shè)置消息監(jiān)聽,當(dāng)收到消息時(shí)RocketMQ會(huì)回調(diào)消息監(jiān)聽
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List list,
ConsumeConcurrentlyContext consumeConcurrentlyContext) {
//打印消息消費(fèi)者收到的RocketMQ消息
System.out.println("消費(fèi)者收到的消息為:" + list);
//返回消息消費(fèi)成功的標(biāo)識(shí)
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
//啟動(dòng)消費(fèi)者
consumer.start();
System.out.println("消費(fèi)者啟動(dòng)成功");
}catch (Exception e){
e.printStackTrace();
}
}
}

測(cè)試消息的生產(chǎn)與消費(fèi)

(1)為了便于觀察,這里我們先啟動(dòng)消費(fèi)者程序RocketMQConsumer,啟動(dòng)RocketMQConsumer后會(huì)在IDEA的控制臺(tái)打印如下信息。

消費(fèi)者啟動(dòng)成功

說明消費(fèi)者啟動(dòng)成功了。

(2)運(yùn)行生產(chǎn)者程序RocketMQProducer,運(yùn)行后RocketMQProducer程序控制臺(tái)會(huì)輸出如下信息。

生產(chǎn)者發(fā)出的消息為:{"body":"SGVsbG8gUm9ja2V0TVE=","delayTimeLevel":0,"flag":0,"properties":{"WAIT":"true","TAGS":"bingheTag"},"tags":"bingheTag","topic":"bingheTopic","waitStoreMsgOK":true}
生產(chǎn)者收到的發(fā)送結(jié)果信息為:{"messageQueue":{"brokerName":"DESKTOP-PSKC7T1","queueId":1,"topic":"bingheTopic"},"msgId":"C0A8006F538418B4AAC25B9EDDAC0000","offsetMsgId":"C0A8B80100002A9F0000000000036B16","queueOffset":2,"regionId":"DefaultRegion","sendStatus":"SEND_OK","traceOn":true}

說明生產(chǎn)者程序RocketMQProducer成功將消息發(fā)送到RocketMQ。

(3)接下來,再看下消費(fèi)者程序RocketMQConsumer的控制臺(tái),如下所示。

消費(fèi)者收到的消息為:[MessageExt [queueId=1, storeSize=206, queueOffset=2, sysFlag=0, bornTimestamp=1652871538093, bornHost=/192.168.184.1:52915, storeTimestamp=1652871538099, storeHost=/192.168.184.1:10911, msgId=C0A8B80100002A9F0000000000036B16, commitLogOffset=224022, bodyCRC=1774740973, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='bingheTopic', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=3, CONSUME_START_TIME=1652871538103, UNIQ_KEY=C0A8006F538418B4AAC25B9EDDAC0000, CLUSTER=DefaultCluster, WAIT=true, TAGS=bingheTag}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81], transactionId='null'}]]

說明生成者發(fā)送到RocketMQ的消息,被消費(fèi)者成功消費(fèi)到了。

項(xiàng)目整合RocketMQ?

我們?cè)陧?xiàng)目中模擬一個(gè)用戶成功下單后,為用戶發(fā)送通知,通知用戶下單成功的邏輯,具體的流程就是下單成功后將訂單的信息發(fā)送到RocketMQ,然后用戶微服務(wù)訂閱RocketMQ的消息,接收到消息后進(jìn)行打印。

用戶微服務(wù)整合RocketMQ

(1)編碼測(cè)試RocketMQ時(shí),導(dǎo)入了RocketMQ的依賴,這里就不用再次導(dǎo)入了。

(2)在用戶微服務(wù)shop-user的application.yml文件中添加如下RocketMQ的配置。

rocketmq:
name-server: 127.0.0.1:9876

(3)在用戶微服務(wù)shop-user中創(chuàng)建??io.binghe.shop.user.rocketmq??包,在包下創(chuàng)建RocketConsumeListener,實(shí)現(xiàn)org.apache.rocketmq.spring.core.RocketMQListener接口,具體代碼如下所示。

/**
* @author binghe
* @version 1.0.0
* @description 監(jiān)聽消費(fèi)
*/
@Slf4j
@Component
@RocketMQMessageListener(consumerGroup = "user-group", topic = "order-topic")
public class RocketConsumeListener implements RocketMQListener {
@Override
public void onMessage(Order order) {
log.info("用戶微服務(wù)收到了訂單信息:{}", JSONObject.toJSONString(order));
}
}

其中,RocketConsumeListener類上的@RocketMQMessageListener注解,表示當(dāng)前類是一個(gè)RocketMQ的消費(fèi)者,在@RocketMQMessageListener注解中配置了消費(fèi)者組為user-group,主題為order-topic。

至此,用戶微服務(wù)整合RocketMQ完畢。

訂單微服務(wù)整合RocketMQ

(1)在訂單微服務(wù)shop-order的pom.xml文件中添加RocketMQ的依賴,如下所示。


org.apache.rocketmq
rocketmq-spring-boot-starter
2.0.3



org.apache.rocketmq
rocketmq-client
4.5.2

(2)在訂單微服務(wù)shop-order的application.yml文件中添加如下配置。

rocketmq:
name-server: 127.0.0.1:9876
producer:
group: order-group

(3)將??io.binghe.shop.order.service.impl.OrderServiceV6Impl???類,復(fù)制一份成??io.binghe.shop.order.service.impl.OrderServiceV7Impl???類,接下來,在??io.binghe.shop.order.service.impl.OrderServiceV7Impl??類中操作。

將??io.binghe.shop.order.service.impl.OrderServiceV7Impl??類上的@Service注解中的名稱修改為orderServiceV7,如下所示。

@Slf4j
@Service("orderServiceV7")
public class OrderServiceV7Impl implements OrderService {
//省略具體代碼
}

(4)在??io.binghe.shop.order.service.impl.OrderServiceV7Impl??類中,注入RocketMQTemplate對(duì)象,如下所示。

@Autowired
private RocketMQTemplate rocketMQTemplate;

(5)在??io.binghe.shop.order.service.impl.OrderServiceV7Impl#saveOrder()??方法中,提交訂單成功后將訂單信息寫入RocketMQ,如下所示。

@Override
@Transactional(rollbackFor = Exception.class)
public void saveOrder(OrderParams orderParams) {
//省略上面所有代碼
rocketMQTemplate.convertAndSend("order-topic", order);
}

(6)在??io.binghe.shop.order.controller.OrderController??中,將注入的OrderService的名稱修改成orderServiceV7,如下所示。

@Autowired
@Qualifier(value = "orderServiceV7")
private OrderService orderService;

「注意:訂單微服務(wù)shop-order中,修改后的代碼見源碼工程,冰河在這里不再粘貼完整的源代碼?!?/p>

測(cè)試項(xiàng)目整合的RocketMQ

(1)分別啟動(dòng)Nacos,Sentinel,ZipKin和RocketMQ。

(2)分別啟動(dòng)用戶微服務(wù)、商品微服務(wù)、訂單微服務(wù)和網(wǎng)關(guān)服務(wù)。

(3)在瀏覽器中輸入??localhost:10001/server-order/order/submit_order?userId=1001&productId=1001&count=1??,如下所示。

(4)查看用戶微服務(wù)shop-user的控制臺(tái),發(fā)現(xiàn)會(huì)輸出訂單的信息,如下所示。

2022-05-18 20:37:26.440  INFO [server-user,,,] 18064 --- [MessageThread_1] i.b.s
                                                本文題目:消息服務(wù):項(xiàng)目整合RocketMQ                                                
鏈接URL:http://m.5511xx.com/article/cccihed.html