新聞中心
SpringBoot 整合 Kafka 實(shí)現(xiàn)數(shù)據(jù)高吞吐
作者:鴨血粉絲Tang 2022-04-28 07:31:41
云計(jì)算
Kafka 本文主要以SpringBoot技術(shù)框架為背景,結(jié)合實(shí)際業(yè)務(wù)需求,采用 kafka 進(jìn)行數(shù)據(jù)消費(fèi),實(shí)現(xiàn)數(shù)據(jù)量的高吞吐,在下篇文章中,我們會(huì)介紹消費(fèi)失敗的處理流程。

為成都等地區(qū)用戶提供了全套網(wǎng)頁(yè)設(shè)計(jì)制作服務(wù),及成都網(wǎng)站建設(shè)行業(yè)解決方案。主營(yíng)業(yè)務(wù)為成都網(wǎng)站建設(shè)、成都做網(wǎng)站、成都網(wǎng)站設(shè)計(jì),以傳統(tǒng)方式定制建設(shè)網(wǎng)站,并提供域名空間備案等一條龍服務(wù),秉承以專業(yè)、用心的態(tài)度為用戶提供真誠(chéng)的服務(wù)。我們深信只要達(dá)到每一位用戶的要求,就會(huì)得到認(rèn)可,從而選擇與我們長(zhǎng)期合作。這樣,我們也可以走得更遠(yuǎn)!
一、介紹
在上篇文章中,我們?cè)敿?xì)的介紹了 kafka 的架構(gòu)模型,在集群環(huán)境中,kafka 可以通過設(shè)置分區(qū)數(shù)來(lái)加快數(shù)據(jù)的消費(fèi)速度。
光知道理論還不行,我們得真真切切的實(shí)踐起來(lái)才行!
下面,我將結(jié)合生產(chǎn)環(huán)境的真實(shí)案例,以SpringBoot技術(shù)框架為基礎(chǔ),向大家介紹 kafka 的使用以及如何實(shí)現(xiàn)數(shù)據(jù)高吞吐!
二、程序?qū)嵺`
最近,公司大數(shù)據(jù)團(tuán)隊(duì)每天凌晨會(huì)將客戶的訂單數(shù)據(jù)進(jìn)行統(tǒng)計(jì)計(jì)算,然后把業(yè)績(jī)數(shù)據(jù)推送給我們,以便銷售人員每天能看到昨天的業(yè)績(jī)數(shù)據(jù),數(shù)據(jù)的體量大約在 1000 多萬(wàn)條,以下是我對(duì)接的過程!
2.1、添加 kafka 依賴包
本次項(xiàng)目的SpringBoot版本為2.1.5.RELEASE,依賴的 kafka 的版本為2.2.6.RELEASE。
https://back-media.51cto.com/editor?id=707646/h6e90be6-7EV6kJbV
2.2、添加 kafka 配置變量
當(dāng)添加完了依賴包之后,我們只需要在application.properties中添加 kafka 配置變量,基本上就可以正常使用了。
# 指定kafka server的地址,集群配多個(gè),中間,逗號(hào)隔開
spring.kafka.bootstrap-servers=197.168.25.196:9092
#重試次數(shù)
spring.kafka.producer.retries=3
#批量發(fā)送的消息數(shù)量
spring.kafka.producer.batch-size=1000
#32MB的批處理緩沖區(qū)
spring.kafka.producer.buffer-memory=33554432
#默認(rèn)消費(fèi)者組
spring.kafka.consumer.group-id=crm-microservice-newperformance
#最早未被消費(fèi)的offset
spring.kafka.consumer.auto-offset-reset=earliest
#批量一次最大拉取數(shù)據(jù)量
spring.kafka.consumer.max-poll-records=4000
#是否自動(dòng)提交
spring.kafka.consumer.enable-auto-commit=true
#自動(dòng)提交時(shí)間間隔,單位ms
spring.kafka.consumer.auto-commit-interval=1000
2.3、創(chuàng)建一個(gè)消費(fèi)者
@Component
public class BigDataTopicListener {
private static final Logger log = LoggerFactory.getLogger(BigDataTopicListener.class);
/**
* 監(jiān)聽kafka數(shù)據(jù)
* @param consumerRecords
* @param ack
*/
@KafkaListener(topics = {"big_data_topic"})
public void consumer(ConsumerRecord, ?> consumerRecord) {
log.info("收到bigData推送的數(shù)據(jù)'{}'", consumerRecord.toString());
//...
//db.save(consumerRecord);//插入或者更新數(shù)據(jù)
}
}
2.4、模擬對(duì)方推送數(shù)據(jù)測(cè)試
@RunWith(SpringRunner.class)
@SpringBootTest
public class KafkaProducerTest {
@Autowired
private KafkaTemplatekafkaTemplate;
@Test
public void testSend(){
for (int i = 0; i < 5000; i++) {
Mapmap = new LinkedHashMap<>();
map.put("datekey", 20210610);
map.put("userid", i);
map.put("salaryAmount", i);
//向kafka的big_data_topic主題推送數(shù)據(jù)
kafkaTemplate.send("big_data_topic", JSONObject.toJSONString(map));
}
}
}
起初,通過這種單條數(shù)據(jù)消費(fèi)方式,進(jìn)行測(cè)試程序沒太大毛病!
但是,當(dāng)上到生產(chǎn)之后,發(fā)現(xiàn)一個(gè)很大的問題,就是消費(fèi)1000萬(wàn)條數(shù)據(jù),至少需要3個(gè)小時(shí),結(jié)果導(dǎo)致數(shù)據(jù)看板一直沒數(shù)據(jù)。
第二天痛定思痛,決定改成批量消費(fèi)模型,怎么操作呢,請(qǐng)看下面!
2.5、將 kafka 的消費(fèi)模式改成批量消費(fèi)
首先,創(chuàng)建一個(gè)KafkaConfiguration配置類,內(nèi)容如下!
@Configuration
public class KafkaConfiguration {
@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServers;
@Value("${spring.kafka.producer.retries}")
private Integer retries;
@Value("${spring.kafka.producer.batch-size}")
private Integer batchSize;
@Value("${spring.kafka.producer.buffer-memory}")
private Integer bufferMemory;
@Value("${spring.kafka.consumer.group-id}")
private String groupId;
@Value("${spring.kafka.consumer.auto-offset-reset}")
private String autoOffsetReset;
@Value("${spring.kafka.consumer.max-poll-records}")
private Integer maxPollRecords;
@Value("${spring.kafka.consumer.batch.concurrency}")
private Integer batchConcurrency;
@Value("${spring.kafka.consumer.enable-auto-commit}")
private Boolean autoCommit;
@Value("${spring.kafka.consumer.auto-commit-interval}")
private Integer autoCommitInterval;
/**
* 生產(chǎn)者配置信息
*/
@Bean
public MapproducerConfigs() {
Mapprops = new HashMap<>();
props.put(ProducerConfig.ACKS_CONFIG, "0");
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ProducerConfig.RETRIES_CONFIG, retries);
props.put(ProducerConfig.BATCH_SIZE_CONFIG, batchSize);
props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferMemory);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
return props;
}
/**
* 生產(chǎn)者工廠
*/
@Bean
public ProducerFactoryproducerFactory() {
return new DefaultKafkaProducerFactory<>(producerConfigs());
}
/**
* 生產(chǎn)者模板
*/
@Bean
public KafkaTemplatekafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
/**
* 消費(fèi)者配置信息
*/
@Bean
public MapconsumerConfigs() {
Mapprops = new HashMap<>();
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords);
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, autoCommit);
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 30000);
props.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, 30000);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
return props;
}
/**
* 消費(fèi)者批量工廠
*/
@Bean
public KafkaListenerContainerFactory> batchFactory() {
ConcurrentKafkaListenerContainerFactoryfactory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(consumerConfigs()));
//設(shè)置并發(fā)量,小于或等于Topic的分區(qū)數(shù)
factory.setConcurrency(batchConcurrency);
factory.getContainerProperties().setPollTimeout(1500);
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
//設(shè)置為批量消費(fèi),每個(gè)批次數(shù)量在Kafka配置參數(shù)中設(shè)置ConsumerConfig.MAX_POLL_RECORDS_CONFIG
factory.setBatchListener(true);
return factory;
}
}
同時(shí),新增一個(gè)spring.kafka.consumer.batch.concurrency變量,用來(lái)設(shè)置并發(fā)數(shù),通過這個(gè)參數(shù)我們可以指定幾個(gè)線程來(lái)實(shí)現(xiàn)消費(fèi)。
在application.properties配置文件中,添加如下變量:
#批消費(fèi)并發(fā)量,小于或等于Topic的分區(qū)數(shù)
spring.kafka.consumer.batch.concurrency = 3
#設(shè)置每次批量拉取的最大數(shù)量為4000
spring.kafka.consumer.max-poll-records=4000
#設(shè)置自動(dòng)提交改成false
spring.kafka.consumer.enable-auto-commit=false
最后,將單個(gè)消費(fèi)方法改成批量消費(fèi)方法模式。
@Component
public class BigDataTopicListener {
private static final Logger log = LoggerFactory.getLogger(BigDataTopicListener.class);
/**
* 監(jiān)聽kafka數(shù)據(jù)(批量消費(fèi))
* @param consumerRecords
* @param ack
*/
@KafkaListener(topics = {"big_data_topic"}, containerFactory = "batchFactory")
public void batchConsumer(List> consumerRecords, Acknowledgment ack) {
long start = System.currentTimeMillis();
//...
//db.batchSave(consumerRecords);//批量插入或者批量更新數(shù)據(jù)
//手動(dòng)提交
ack.acknowledge();
log.info("收到bigData推送的數(shù)據(jù),拉取數(shù)據(jù)量:{},消費(fèi)時(shí)間:{}ms", consumerRecords.size(), (System.currentTimeMillis() - start));
}
}
此時(shí),消費(fèi)性能大大的提升,數(shù)據(jù)處理的非常快,500萬(wàn)條數(shù)據(jù),最多 30 分鐘就全部消費(fèi)完畢了。
本例中的消費(fèi)微服務(wù),生產(chǎn)環(huán)境部署了3臺(tái)服務(wù)器,同時(shí)big_data_topic主題的分區(qū)數(shù)為3,因此并發(fā)數(shù)設(shè)置為3比較合適。
隨著推送的數(shù)據(jù)量不斷增加,如果你覺得消費(fèi)速度還不夠,你可以重新設(shè)置每次批量拉取的最大數(shù)量,活著橫向擴(kuò)展微服務(wù)的集群實(shí)例數(shù)量和 topic 的分區(qū)數(shù),以此來(lái)加快數(shù)據(jù)的消費(fèi)速度。
但是,如果在單臺(tái)機(jī)器中,每次批量拉取的最大數(shù)量過大,大對(duì)象也會(huì)很大,會(huì)造成頻繁的 gc 告警!
因此,在實(shí)際的使用過程中,每次批量拉取的最大數(shù)量并不是越大越好,根據(jù)當(dāng)前服務(wù)器的硬件配置,調(diào)節(jié)到合適的閥值,才是最優(yōu)的選擇!
三、小結(jié)
本文主要以SpringBoot技術(shù)框架為背景,結(jié)合實(shí)際業(yè)務(wù)需求,采用 kafka 進(jìn)行數(shù)據(jù)消費(fèi),實(shí)現(xiàn)數(shù)據(jù)量的高吞吐,在下篇文章中,我們會(huì)介紹消費(fèi)失敗的處理流程。
網(wǎng)站題目:SpringBoot整合Kafka實(shí)現(xiàn)數(shù)據(jù)高吞吐
當(dāng)前URL:http://m.5511xx.com/article/dpeshpe.html


咨詢
建站咨詢
