日韩无码专区无码一级三级片|91人人爱网站中日韩无码电影|厨房大战丰满熟妇|AV高清无码在线免费观看|另类AV日韩少妇熟女|中文日本大黄一级黄色片|色情在线视频免费|亚洲成人特黄a片|黄片wwwav色图欧美|欧亚乱色一区二区三区

RELATEED CONSULTING
相關(guān)咨詢
選擇下列產(chǎn)品馬上在線溝通
服務(wù)時(shí)間:8:30-17:00
你可能遇到了下面的問題
關(guān)閉右側(cè)工具欄

新聞中心

這里有您想知道的互聯(lián)網(wǎng)營銷解決方案
解析SparkStreaming和Kafka集成的兩種方式

解析SparkStreaming和Kafka集成的兩種方式

作者:開源大數(shù)據(jù)EMR 2020-02-21 17:33:17
大數(shù)據(jù)
Kafka
Spark Spark Streaming是基于微批處理的流式計(jì)算引擎,通常是利用Spark Core或者Spark Core與Spark Sql一起來處理數(shù)據(jù)。在企業(yè)實(shí)時(shí)處理架構(gòu)中,通常將Spark Streaming和Kafka集成作為整個(gè)大數(shù)據(jù)處理架構(gòu)的核心環(huán)節(jié)之一。

Spark Streaming是基于微批處理的流式計(jì)算引擎,通常是利用Spark Core或者Spark Core與Spark Sql一起來處理數(shù)據(jù)。在企業(yè)實(shí)時(shí)處理架構(gòu)中,通常將Spark Streaming和Kafka集成作為整個(gè)大數(shù)據(jù)處理架構(gòu)的核心環(huán)節(jié)之一。

針對(duì)不同的Spark、Kafka版本,集成處理數(shù)據(jù)的方式分為兩種:Receiver based Approach和Direct Approach,不同集成版本處理方式的支持,可參考下圖:

Receiver based Approach

基于receiver的方式是使用kafka消費(fèi)者高階API實(shí)現(xiàn)的。

對(duì)于所有的receiver,它通過kafka接收的數(shù)據(jù)會(huì)被存儲(chǔ)于spark的executors上,底層是寫入BlockManager中,默認(rèn)200ms生成一個(gè)block(通過配置參數(shù)spark.streaming.blockInterval決定)。然后由spark streaming提交的job構(gòu)建BlockRdd,最終以spark core任務(wù)的形式運(yùn)行。

關(guān)于receiver方式,有以下幾點(diǎn)需要注意:

  • receiver作為一個(gè)常駐線程調(diào)度到executor上運(yùn)行,占用一個(gè)cpu
  • receiver個(gè)數(shù)由KafkaUtils.createStream調(diào)用次數(shù)決定,一次一個(gè)receiver
  • kafka中的topic分區(qū)并不能關(guān)聯(lián)產(chǎn)生在spark streaming中的rdd分區(qū)
  • 增加在KafkaUtils.createStream()中的指定的topic分區(qū)數(shù),僅僅增加了單個(gè)receiver消費(fèi)的topic的線程數(shù),它不會(huì)增加處理數(shù)據(jù)中的并行的spark的數(shù)量【topicMap[topic,num_threads]map的value對(duì)應(yīng)的數(shù)值是每個(gè)topic對(duì)應(yīng)的消費(fèi)線程數(shù)】
  • receiver默認(rèn)200ms生成一個(gè)block,建議根據(jù)數(shù)據(jù)量大小調(diào)整block生成周期。
  • receiver接收的數(shù)據(jù)會(huì)放入到BlockManager,每個(gè)executor都會(huì)有一個(gè)BlockManager實(shí)例,由于數(shù)據(jù)本地性,那些存在receiver的executor會(huì)被調(diào)度執(zhí)行更多的task,就會(huì)導(dǎo)致某些executor比較空閑

建議通過參數(shù)spark.locality.wait調(diào)整數(shù)據(jù)本地性。該參數(shù)設(shè)置的不合理,比如設(shè)置為10而任務(wù)2s就處理結(jié)束,就會(huì)導(dǎo)致越來越多的任務(wù)調(diào)度到數(shù)據(jù)存在的executor上執(zhí)行,導(dǎo)致任務(wù)執(zhí)行緩慢甚至失敗(要和數(shù)據(jù)傾斜區(qū)分開)

多個(gè)kafka輸入的DStreams可以使用不同的groups、topics創(chuàng)建,使用多個(gè)receivers接收處理數(shù)據(jù)

兩種receiver可靠的receiver:

  • 可靠的receiver在接收到數(shù)據(jù)并通過復(fù)制機(jī)制存儲(chǔ)在spark中時(shí)準(zhǔn)確的向可靠的數(shù)據(jù)源發(fā)送ack確認(rèn)不可靠的receiver:
  • 不可靠的receiver不會(huì)向數(shù)據(jù)源發(fā)送數(shù)據(jù)已接收確認(rèn)。 這適用于用于不支持ack的數(shù)據(jù)源當(dāng)然,我們也可以自定義receiver。
  • receiver處理數(shù)據(jù)可靠性默認(rèn)情況下,receiver是可能丟失數(shù)據(jù)的。
  • 可以通過設(shè)置spark.streaming.receiver.writeAheadLog.enable為true開啟預(yù)寫日志機(jī)制,將數(shù)據(jù)先寫入一個(gè)可靠地分布式文件系統(tǒng)如hdfs,確保數(shù)據(jù)不丟失,但會(huì)失去一定性能

限制消費(fèi)者消費(fèi)的最大速率涉及三個(gè)參數(shù):

  • spark.streaming.backpressure.enabled:默認(rèn)是false,設(shè)置為true,就開啟了背壓機(jī)制;
  • spark.streaming.backpressure.initialRate:默認(rèn)沒設(shè)置初始消費(fèi)速率,第一次啟動(dòng)時(shí)每個(gè)receiver接收數(shù)據(jù)的最大值;
  • spark.streaming.receiver.maxRate:默認(rèn)值沒設(shè)置,每個(gè)receiver接收數(shù)據(jù)的最大速率(每秒記錄數(shù))。每個(gè)流每秒最多將消費(fèi)此數(shù)量的記錄,將此配置設(shè)置為0或負(fù)數(shù)將不會(huì)對(duì)最大速率進(jìn)行限制

在產(chǎn)生job時(shí),會(huì)將當(dāng)前job有效范圍內(nèi)的所有block組成一個(gè)BlockRDD,一個(gè)block對(duì)應(yīng)一個(gè)分區(qū)

kafka082版本消費(fèi)者高階API中,有分組的概念,建議使消費(fèi)者組內(nèi)的線程數(shù)(消費(fèi)者個(gè)數(shù))和kafka分區(qū)數(shù)保持一致。如果多于分區(qū)數(shù),會(huì)有部分消費(fèi)者處于空閑狀態(tài)

Direct Approach

direct approach是spark streaming不使用receiver集成kafka的方式,一般在企業(yè)生產(chǎn)環(huán)境中使用較多。相較于receiver,有以下特點(diǎn):

1.不使用receiver

不需要?jiǎng)?chuàng)建多個(gè)kafka streams并聚合它們

減少不必要的CPU占用

減少了receiver接收數(shù)據(jù)寫入BlockManager,然后運(yùn)行時(shí)再通過blockId、網(wǎng)絡(luò)傳輸、磁盤讀取等來獲取數(shù)據(jù)的整個(gè)過程,提升了效率

無需wal,進(jìn)一步減少磁盤IO操作

2.direct方式生的rdd是KafkaRDD,它的分區(qū)數(shù)與kafka分區(qū)數(shù)保持一致一樣多的rdd分區(qū)來消費(fèi),更方便我們對(duì)并行度進(jìn)行控制

注意:在shuffle或者repartition操作后生成的rdd,這種對(duì)應(yīng)關(guān)系會(huì)失效

3.可以手動(dòng)維護(hù)offset,實(shí)現(xiàn)exactly once語義

4.數(shù)據(jù)本地性問題。在KafkaRDD在compute函數(shù)中,使用SimpleConsumer根據(jù)指定的topic、分區(qū)、offset去讀取kafka數(shù)據(jù)。

但在010版本后,又存在假如kafka和spark處于同一集群存在數(shù)據(jù)本地性的問題

5.限制消費(fèi)者消費(fèi)的最大速率

spark.streaming.kafka.maxRatePerPartition:從每個(gè)kafka分區(qū)讀取數(shù)據(jù)的最大速率(每秒記錄數(shù))。這是針對(duì)每個(gè)分區(qū)進(jìn)行限速,需要事先知道kafka分區(qū)數(shù),來評(píng)估系統(tǒng)的吞吐量。


新聞名稱:解析SparkStreaming和Kafka集成的兩種方式
文章源于:http://m.5511xx.com/article/ccddgeh.html