日韩无码专区无码一级三级片|91人人爱网站中日韩无码电影|厨房大战丰满熟妇|AV高清无码在线免费观看|另类AV日韩少妇熟女|中文日本大黄一级黄色片|色情在线视频免费|亚洲成人特黄a片|黄片wwwav色图欧美|欧亚乱色一区二区三区

RELATEED CONSULTING
相關(guān)咨詢(xún)
選擇下列產(chǎn)品馬上在線(xiàn)溝通
服務(wù)時(shí)間:8:30-17:00
你可能遇到了下面的問(wèn)題
關(guān)閉右側(cè)工具欄

新聞中心

這里有您想知道的互聯(lián)網(wǎng)營(yíng)銷(xiāo)解決方案
下一代MQ中間件,不來(lái)了解下?

哈嘍,大家好,我是指北君。

創(chuàng)新互聯(lián)公司專(zhuān)注于察哈爾右翼前企業(yè)網(wǎng)站建設(shè),響應(yīng)式網(wǎng)站建設(shè),商城開(kāi)發(fā)。察哈爾右翼前網(wǎng)站建設(shè)公司,為察哈爾右翼前等地區(qū)提供建站服務(wù)。全流程按需求定制網(wǎng)站,專(zhuān)業(yè)設(shè)計(jì),全程項(xiàng)目跟蹤,創(chuàng)新互聯(lián)公司專(zhuān)業(yè)和態(tài)度為您提供的服務(wù)

最近項(xiàng)目中準(zhǔn)備使用消息中間件Apache Pulsar,借著機(jī)會(huì)先做個(gè)簡(jiǎn)單了解吧。

Apache Pulsar

Apache Pulsar是Apache軟件基金會(huì)頂級(jí)項(xiàng)目,是下一代云原生分布式消息流平臺(tái)。

Pulsar 作為下一代云原生分布式消息流平臺(tái),支持多租戶(hù)、持久化存儲(chǔ)、多機(jī)房跨區(qū)域數(shù)據(jù)復(fù)制,具有強(qiáng)一致性、高吞吐以及低延時(shí)的高可擴(kuò)展流數(shù)據(jù)存儲(chǔ)特性, 內(nèi)置諸多其他系統(tǒng)商業(yè)版本才有的特性,是云原生時(shí)代解決實(shí)時(shí)消息流數(shù)據(jù)傳輸、存儲(chǔ)和計(jì)算的最佳解決方案。

Pulsar簡(jiǎn)介

  • 系統(tǒng)架構(gòu)

  • 功能特色
    租戶(hù)和命名空間(namespace)是 Pulsar 支持多租戶(hù)的兩個(gè)核心概念。在租戶(hù)級(jí)別,Pulsar 為特定的租戶(hù)預(yù)留合適的存儲(chǔ)空間、應(yīng)用授權(quán)與認(rèn)證機(jī)制。在命名空間級(jí)別,Pulsar 有一系列的配置策略(policy),包括存儲(chǔ)配額、流控、消息過(guò)期策略和命名空間之間的隔離策略。
    Pulsar 做了隊(duì)列模型和流模型的統(tǒng)一,在 Topic 級(jí)別只需保存一份數(shù)據(jù),同一份數(shù)據(jù)可多次消費(fèi)。以流式、隊(duì)列等方式計(jì)算不同的訂閱模型大大提升了靈活度。
    Pulsar 使用計(jì)算與存儲(chǔ)分離的云原生架構(gòu),數(shù)據(jù)從 Broker 搬離,存在共享存儲(chǔ)內(nèi)部。上層是無(wú)狀態(tài) Broker,復(fù)制消息分發(fā)和服務(wù);下層是持久化的存儲(chǔ)層 Bookie 集群。Pulsar 存儲(chǔ)是分片的,這種構(gòu)架可以避免擴(kuò)容時(shí)受限制,實(shí)現(xiàn)數(shù)據(jù)的獨(dú)立擴(kuò)展和快速恢復(fù)。
    Pulsar 原生支持跨地域復(fù)制,因此 Pulsar 可以跨不同地理位置的數(shù)據(jù)中心復(fù)制數(shù)據(jù)。當(dāng)數(shù)據(jù)中心中斷或網(wǎng)絡(luò)分區(qū)時(shí),在多個(gè)數(shù)據(jù)中心存有消息副本尤為重要,提高可用性。
    Pulsar Functions 是基于 Pulsar 的輕量級(jí)流處理方式。Pulsar Functions 直接部署在 broker 節(jié)點(diǎn)上(或作為 Kubernetes 集群中的容器)。通過(guò) Pulsar Functions,Pulsar 可以直接解決許多流處理任務(wù),簡(jiǎn)化操作。?
  • 支持客戶(hù)端

Java 客戶(hù)端

C++ 客戶(hù)端

.Net/C# 客戶(hù)端

Go 客戶(hù)端

NodeJS 客戶(hù)端

Ruby 客戶(hù)端

Pulsar安裝與部署

目前Pulsar不支持Window,下面通過(guò)Docker進(jìn)行安裝,可以參考官網(wǎng)??https://pulsar.apache.org/docs/next/getting-started-docker/??

同時(shí)可以安裝Pulsar Manager,具體操作可以參考官方文檔 ??https://pulsar.apache.org/docs/next/administration-pulsar-manager/??

其中Pulsar Manager 是一個(gè)網(wǎng)頁(yè)式可視化管理與監(jiān)測(cè)工具,支持多環(huán)境下的動(dòng)態(tài)配置??捎糜诠芾砗捅O(jiān)測(cè)租戶(hù)、命名空間、topic、訂閱、broker、集群等。

  1. window環(huán)境使用docker推薦使用Docker Desktop,和linux一樣可以通過(guò)docker命令管理鏡像、部署容器等操作。

打開(kāi)并啟動(dòng)Docker Desktop后,在終端執(zhí)行命令執(zhí)行

_> docker search pulsar

可以查詢(xún)到pulsar相關(guān)的鏡像

  1. 鏡像下載

這里我們選擇分別下載紅框的兩個(gè)鏡像,執(zhí)行命令

_> docker pull apachepulsar/pulsar _> docker pull apachepulsar/pulsar-manager

  1. 啟動(dòng)
  • 啟動(dòng)Pulsar
docker run -it -p 6650:6650 -p 8080:8080 \
--mount source=pulsardata,target=/pulsar/data \
--mount source=pulsarconf,target=/pulsar/conf \
apachepulsar/pulsar bin/pulsar standalone
  • 啟動(dòng)Pulsar Manager
docker run --name pulsar-manager -dit \
-p 9527:9527 -p 7750:7750 \
-e SPRING_CONFIGURATION_FILE=/pulsar-manager/pulsar-manager/application.properties \
apachepulsar/pulsar-manager

添加用戶(hù):

for /f "tokens=1" %A in ('curl http://localhost:7750/pulsar-manager/csrf-token') do set CSRF_TOKEN=%A
curl -X PUT "X-XSRF-TOKEN: %CSRF_TOKEN%" -H "Cookie: XSRF-TOKEN=%CSRF_TOKEN%;"
-H "Content-Type: application/json" -d "{\"name\": \"admin\", \"password\": \"123456\", \"description\": \"super user admin\", \"email\": \"admin@test.com\"}"
"http://localhost:7750/pulsar-manager/users/superuser"

訪(fǎng)問(wèn):

http://localhost:9527/ 
用戶(hù)名密碼:admin/123456

配置environments:

這里需要保證Pulsar Manager應(yīng)用服務(wù)能夠訪(fǎng)問(wèn)到Pulsar應(yīng)用,由于都是通過(guò)Docker部署,配置Service URL需要使用網(wǎng)絡(luò)IP,不要用localhost。

管理界面:

Pulsar與SpringBoot集成

  • springboot version : 2.3.7.RELEASE
  • pulsar client: 2.10.2
  1. 通過(guò)Properties簡(jiǎn)單定義一些Broker相關(guān)的屬性
@Data
@ConfigurationProperties(prefix = "pulsar")
public class PulsarProperties {

private String cluster;

private String namespace;

private String serverUrl;

private String token;
}
  1. 通過(guò)配置定義了一些常用的組件,比如生產(chǎn)、消費(fèi)工廠(chǎng)
@Configuration
@EnableConfigurationProperties({PulsarProperties.class})
public class PulsarBootstrapConfiguration {

private final PulsarProperties properties;

public PulsarBootstrapConfiguration(PulsarProperties properties) {
this.properties = properties;
}

@Bean(destroyMethod = "close")
public PulsarClient pulsarClient() throws PulsarClientException {
ClientBuilder clientBuilder = PulsarClient.builder().serviceUrl(properties.getServerUrl());
return clientBuilder.build();
}

@Bean
public PulsarProducerFactory pulsarProducerFactory() throws PulsarClientException {
return new PulsarProducerFactory(pulsarClient(), properties);
}

@Bean
public PulsarConsumerFactory pulsarConsumerFactory() throws PulsarClientException {
return new PulsarConsumerFactory(pulsarClient(), properties);
}

}
  1. 啟動(dòng)服務(wù),在服務(wù)啟動(dòng)后,通過(guò)實(shí)現(xiàn)SmartInitializingSingleton接口,完成容器基本啟動(dòng)(不包含Lazy的Bean)后,開(kāi)始對(duì)消費(fèi)者Consumer監(jiān)聽(tīng)
@Slf4j
@SpringBootApplication
public class PulsarApplication implements SmartInitializingSingleton {

@Autowired
private PulsarConsumerFactory consumerFactory;

public static void main(String[] args) {
SpringApplication.run(PulsarApplication.class,args);
}

@Override
public void afterSingletonsInstantiated() {
startConsumerListener();
}

private void startConsumerListener(){
Consumer consumer = createConsumer();
if( consumer != null ){
while (!Thread.currentThread().isInterrupted()){
CompletableFuture> completableFuture = consumer.receiveAsync();
Message message = null;
try {
message = completableFuture.get();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
log.error("錯(cuò)誤",e);
} catch (ExecutionException e) {
log.error("錯(cuò)誤",e);
}

if( message!=null ){
try {
log.info(" 接收消息:{} ", message.getValue() );
consumer.acknowledge(message);
} catch (PulsarClientException e) {
consumer.negativeAcknowledge(message);
throw new RuntimeException(e);
}
}
}
}
}

private Consumer createConsumer() {
try {
return consumerFactory.getConsumer(Constants.TOPIC_DEMO);
} catch (PulsarClientException e) {
log.error("創(chuàng)建consumer出錯(cuò):{}", e.getMessage(),e);
}
return null;
}
}
  1. 消息發(fā)送測(cè)試
@Slf4j
@RunWith(SpringRunner.class)
@SpringBootTest
public class PulsarBootTests {

@Autowired
private PulsarProducerFactory producerFactory;

@Test
public void sendMessage() throws PulsarClientException {
Producer producer = producerFactory.getProducer(Constants.TOPIC_DEMO);

producer.send(" 測(cè)試消息: " + new Date());

producer.close();
}

}
  1. 檢查消息接收情況
2023-02-05 12:05:14.043  INFO 23472 --- [ulsar-timer-6-1] o.a.p.c.impl.ConsumerStatsRecorderImpl   : [TOPIC_DEMO] [sub-TOPIC_DEMO] [7c2b2] Prefetched messages: 0 --- Consume throughput received: 0.02 msgs/s --- 0.00 Mbit/s --- Ack sent rate: 0.02 ack/s --- Failed messages: 0 --- batch messages: 0 ---Failed acks: 0
2023-02-05 12:06:16.425 INFO 23472 --- [ main] com.sucl.pulsar.PulsarApplication : 接收消息: 測(cè)試消息: Sun Feb 05 12:06:16 CST 2023

結(jié)束語(yǔ)

該篇主要通過(guò)官網(wǎng)對(duì)Apache Pulsar做了簡(jiǎn)單的了解與嘗試,同時(shí)基于SpringBoot,以簡(jiǎn)單的示例代碼實(shí)現(xiàn)了消息的發(fā)送與接收,其中各個(gè)組件僅僅使用了默認(rèn)的配置,在生產(chǎn)環(huán)境需要根據(jù)Pulsar的特性以及官方API使其具有擴(kuò)展性與易用性。


本文名稱(chēng):下一代MQ中間件,不來(lái)了解下?
地址分享:http://m.5511xx.com/article/cogcgie.html