日韩无码专区无码一级三级片|91人人爱网站中日韩无码电影|厨房大战丰满熟妇|AV高清无码在线免费观看|另类AV日韩少妇熟女|中文日本大黄一级黄色片|色情在线视频免费|亚洲成人特黄a片|黄片wwwav色图欧美|欧亚乱色一区二区三区

RELATEED CONSULTING
相關(guān)咨詢
選擇下列產(chǎn)品馬上在線溝通
服務(wù)時(shí)間:8:30-17:00
你可能遇到了下面的問題
關(guān)閉右側(cè)工具欄

新聞中心

這里有您想知道的互聯(lián)網(wǎng)營(yíng)銷解決方案
SpringBoot整合Kafka實(shí)現(xiàn)數(shù)據(jù)高吞吐

SpringBoot 整合 Kafka 實(shí)現(xiàn)數(shù)據(jù)高吞吐

作者:鴨血粉絲Tang 2022-04-28 07:31:41
云計(jì)算
Kafka 本文主要以SpringBoot技術(shù)框架為背景,結(jié)合實(shí)際業(yè)務(wù)需求,采用 kafka 進(jìn)行數(shù)據(jù)消費(fèi),實(shí)現(xiàn)數(shù)據(jù)量的高吞吐,在下篇文章中,我們會(huì)介紹消費(fèi)失敗的處理流程。

為成都等地區(qū)用戶提供了全套網(wǎng)頁(yè)設(shè)計(jì)制作服務(wù),及成都網(wǎng)站建設(shè)行業(yè)解決方案。主營(yíng)業(yè)務(wù)為成都網(wǎng)站建設(shè)、成都做網(wǎng)站、成都網(wǎng)站設(shè)計(jì),以傳統(tǒng)方式定制建設(shè)網(wǎng)站,并提供域名空間備案等一條龍服務(wù),秉承以專業(yè)、用心的態(tài)度為用戶提供真誠(chéng)的服務(wù)。我們深信只要達(dá)到每一位用戶的要求,就會(huì)得到認(rèn)可,從而選擇與我們長(zhǎng)期合作。這樣,我們也可以走得更遠(yuǎn)!

一、介紹

在上篇文章中,我們?cè)敿?xì)的介紹了 kafka 的架構(gòu)模型,在集群環(huán)境中,kafka 可以通過設(shè)置分區(qū)數(shù)來(lái)加快數(shù)據(jù)的消費(fèi)速度。

光知道理論還不行,我們得真真切切的實(shí)踐起來(lái)才行!

下面,我將結(jié)合生產(chǎn)環(huán)境的真實(shí)案例,以SpringBoot技術(shù)框架為基礎(chǔ),向大家介紹 kafka 的使用以及如何實(shí)現(xiàn)數(shù)據(jù)高吞吐!

二、程序?qū)嵺`

最近,公司大數(shù)據(jù)團(tuán)隊(duì)每天凌晨會(huì)將客戶的訂單數(shù)據(jù)進(jìn)行統(tǒng)計(jì)計(jì)算,然后把業(yè)績(jī)數(shù)據(jù)推送給我們,以便銷售人員每天能看到昨天的業(yè)績(jī)數(shù)據(jù),數(shù)據(jù)的體量大約在 1000 多萬(wàn)條,以下是我對(duì)接的過程!

2.1、添加 kafka 依賴包

本次項(xiàng)目的SpringBoot版本為2.1.5.RELEASE,依賴的 kafka 的版本為2.2.6.RELEASE。

https://back-media.51cto.com/editor?id=707646/h6e90be6-7EV6kJbV

2.2、添加 kafka 配置變量

當(dāng)添加完了依賴包之后,我們只需要在application.properties中添加 kafka 配置變量,基本上就可以正常使用了。

# 指定kafka server的地址,集群配多個(gè),中間,逗號(hào)隔開
spring.kafka.bootstrap-servers=197.168.25.196:9092
#重試次數(shù)
spring.kafka.producer.retries=3
#批量發(fā)送的消息數(shù)量
spring.kafka.producer.batch-size=1000
#32MB的批處理緩沖區(qū)
spring.kafka.producer.buffer-memory=33554432
#默認(rèn)消費(fèi)者組
spring.kafka.consumer.group-id=crm-microservice-newperformance
#最早未被消費(fèi)的offset
spring.kafka.consumer.auto-offset-reset=earliest
#批量一次最大拉取數(shù)據(jù)量
spring.kafka.consumer.max-poll-records=4000
#是否自動(dòng)提交
spring.kafka.consumer.enable-auto-commit=true
#自動(dòng)提交時(shí)間間隔,單位ms
spring.kafka.consumer.auto-commit-interval=1000

2.3、創(chuàng)建一個(gè)消費(fèi)者

@Component
public class BigDataTopicListener {

private static final Logger log = LoggerFactory.getLogger(BigDataTopicListener.class);

/**
* 監(jiān)聽kafka數(shù)據(jù)
* @param consumerRecords
* @param ack
*/
@KafkaListener(topics = {"big_data_topic"})
public void consumer(ConsumerRecord consumerRecord) {
log.info("收到bigData推送的數(shù)據(jù)'{}'", consumerRecord.toString());
//...
//db.save(consumerRecord);//插入或者更新數(shù)據(jù)
}

}

2.4、模擬對(duì)方推送數(shù)據(jù)測(cè)試

@RunWith(SpringRunner.class)
@SpringBootTest
public class KafkaProducerTest {

@Autowired
private KafkaTemplate kafkaTemplate;

@Test
public void testSend(){
for (int i = 0; i < 5000; i++) {
Map map = new LinkedHashMap<>();
map.put("datekey", 20210610);
map.put("userid", i);
map.put("salaryAmount", i);
//向kafka的big_data_topic主題推送數(shù)據(jù)
kafkaTemplate.send("big_data_topic", JSONObject.toJSONString(map));
}
}
}

起初,通過這種單條數(shù)據(jù)消費(fèi)方式,進(jìn)行測(cè)試程序沒太大毛病!

但是,當(dāng)上到生產(chǎn)之后,發(fā)現(xiàn)一個(gè)很大的問題,就是消費(fèi)1000萬(wàn)條數(shù)據(jù),至少需要3個(gè)小時(shí),結(jié)果導(dǎo)致數(shù)據(jù)看板一直沒數(shù)據(jù)。

第二天痛定思痛,決定改成批量消費(fèi)模型,怎么操作呢,請(qǐng)看下面!

2.5、將 kafka 的消費(fèi)模式改成批量消費(fèi)

首先,創(chuàng)建一個(gè)KafkaConfiguration配置類,內(nèi)容如下!

@Configuration
public class KafkaConfiguration {

@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServers;

@Value("${spring.kafka.producer.retries}")
private Integer retries;

@Value("${spring.kafka.producer.batch-size}")
private Integer batchSize;

@Value("${spring.kafka.producer.buffer-memory}")
private Integer bufferMemory;

@Value("${spring.kafka.consumer.group-id}")
private String groupId;

@Value("${spring.kafka.consumer.auto-offset-reset}")
private String autoOffsetReset;

@Value("${spring.kafka.consumer.max-poll-records}")
private Integer maxPollRecords;

@Value("${spring.kafka.consumer.batch.concurrency}")
private Integer batchConcurrency;

@Value("${spring.kafka.consumer.enable-auto-commit}")
private Boolean autoCommit;

@Value("${spring.kafka.consumer.auto-commit-interval}")
private Integer autoCommitInterval;


/**
* 生產(chǎn)者配置信息
*/
@Bean
public Map producerConfigs() {
Map props = new HashMap<>();
props.put(ProducerConfig.ACKS_CONFIG, "0");
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ProducerConfig.RETRIES_CONFIG, retries);
props.put(ProducerConfig.BATCH_SIZE_CONFIG, batchSize);
props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferMemory);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
return props;
}

/**
* 生產(chǎn)者工廠
*/
@Bean
public ProducerFactory producerFactory() {
return new DefaultKafkaProducerFactory<>(producerConfigs());
}

/**
* 生產(chǎn)者模板
*/
@Bean
public KafkaTemplate kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}


/**
* 消費(fèi)者配置信息
*/
@Bean
public Map consumerConfigs() {
Map props = new HashMap<>();
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords);
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, autoCommit);
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 30000);
props.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, 30000);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
return props;
}

/**
* 消費(fèi)者批量工廠
*/
@Bean
public KafkaListenerContainerFactory batchFactory() {
ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(consumerConfigs()));
//設(shè)置并發(fā)量,小于或等于Topic的分區(qū)數(shù)
factory.setConcurrency(batchConcurrency);
factory.getContainerProperties().setPollTimeout(1500);
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
//設(shè)置為批量消費(fèi),每個(gè)批次數(shù)量在Kafka配置參數(shù)中設(shè)置ConsumerConfig.MAX_POLL_RECORDS_CONFIG
factory.setBatchListener(true);
return factory;
}

}

同時(shí),新增一個(gè)spring.kafka.consumer.batch.concurrency變量,用來(lái)設(shè)置并發(fā)數(shù),通過這個(gè)參數(shù)我們可以指定幾個(gè)線程來(lái)實(shí)現(xiàn)消費(fèi)。

在application.properties配置文件中,添加如下變量:

#批消費(fèi)并發(fā)量,小于或等于Topic的分區(qū)數(shù)
spring.kafka.consumer.batch.concurrency = 3

#設(shè)置每次批量拉取的最大數(shù)量為4000
spring.kafka.consumer.max-poll-records=4000

#設(shè)置自動(dòng)提交改成false
spring.kafka.consumer.enable-auto-commit=false

最后,將單個(gè)消費(fèi)方法改成批量消費(fèi)方法模式。

@Component
public class BigDataTopicListener {

private static final Logger log = LoggerFactory.getLogger(BigDataTopicListener.class);

/**
* 監(jiān)聽kafka數(shù)據(jù)(批量消費(fèi))
* @param consumerRecords
* @param ack
*/
@KafkaListener(topics = {"big_data_topic"}, containerFactory = "batchFactory")
public void batchConsumer(List> consumerRecords, Acknowledgment ack) {
long start = System.currentTimeMillis();

//...
//db.batchSave(consumerRecords);//批量插入或者批量更新數(shù)據(jù)

//手動(dòng)提交
ack.acknowledge();
log.info("收到bigData推送的數(shù)據(jù),拉取數(shù)據(jù)量:{},消費(fèi)時(shí)間:{}ms", consumerRecords.size(), (System.currentTimeMillis() - start));
}

}

此時(shí),消費(fèi)性能大大的提升,數(shù)據(jù)處理的非常快,500萬(wàn)條數(shù)據(jù),最多 30 分鐘就全部消費(fèi)完畢了。

本例中的消費(fèi)微服務(wù),生產(chǎn)環(huán)境部署了3臺(tái)服務(wù)器,同時(shí)big_data_topic主題的分區(qū)數(shù)為3,因此并發(fā)數(shù)設(shè)置為3比較合適。

隨著推送的數(shù)據(jù)量不斷增加,如果你覺得消費(fèi)速度還不夠,你可以重新設(shè)置每次批量拉取的最大數(shù)量,活著橫向擴(kuò)展微服務(wù)的集群實(shí)例數(shù)量和 topic 的分區(qū)數(shù),以此來(lái)加快數(shù)據(jù)的消費(fèi)速度。

但是,如果在單臺(tái)機(jī)器中,每次批量拉取的最大數(shù)量過大,大對(duì)象也會(huì)很大,會(huì)造成頻繁的 gc 告警!

因此,在實(shí)際的使用過程中,每次批量拉取的最大數(shù)量并不是越大越好,根據(jù)當(dāng)前服務(wù)器的硬件配置,調(diào)節(jié)到合適的閥值,才是最優(yōu)的選擇!

三、小結(jié)

本文主要以SpringBoot技術(shù)框架為背景,結(jié)合實(shí)際業(yè)務(wù)需求,采用 kafka 進(jìn)行數(shù)據(jù)消費(fèi),實(shí)現(xiàn)數(shù)據(jù)量的高吞吐,在下篇文章中,我們會(huì)介紹消費(fèi)失敗的處理流程。


網(wǎng)站題目:SpringBoot整合Kafka實(shí)現(xiàn)數(shù)據(jù)高吞吐
當(dāng)前URL:http://m.5511xx.com/article/dpeshpe.html