日韩无码专区无码一级三级片|91人人爱网站中日韩无码电影|厨房大战丰满熟妇|AV高清无码在线免费观看|另类AV日韩少妇熟女|中文日本大黄一级黄色片|色情在线视频免费|亚洲成人特黄a片|黄片wwwav色图欧美|欧亚乱色一区二区三区

RELATEED CONSULTING
相關(guān)咨詢
選擇下列產(chǎn)品馬上在線溝通
服務(wù)時(shí)間:8:30-17:00
你可能遇到了下面的問(wèn)題
關(guān)閉右側(cè)工具欄

新聞中心

這里有您想知道的互聯(lián)網(wǎng)營(yíng)銷解決方案
使用Kafka和Druid了解Spark流

使用Kafka和Druid了解Spark流

作者:聞數(shù)起舞 2020-05-14 10:26:27

大數(shù)據(jù)

Kafka

Spark 在本博文中,我將分享通過(guò)將Spark Streaming,Kafka和Apache Druid結(jié)合在一起以構(gòu)建實(shí)時(shí)分析儀表板,以確保精確的數(shù)據(jù)表示而獲得的知識(shí)。

成都創(chuàng)新互聯(lián)公司服務(wù)項(xiàng)目包括潛江網(wǎng)站建設(shè)、潛江網(wǎng)站制作、潛江網(wǎng)頁(yè)制作以及潛江網(wǎng)絡(luò)營(yíng)銷策劃等。多年來(lái),我們專注于互聯(lián)網(wǎng)行業(yè),利用自身積累的技術(shù)優(yōu)勢(shì)、行業(yè)經(jīng)驗(yàn)、深度合作伙伴關(guān)系等,向廣大中小型企業(yè)、政府機(jī)構(gòu)等提供互聯(lián)網(wǎng)行業(yè)的解決方案,潛江網(wǎng)站推廣取得了明顯的社會(huì)效益與經(jīng)濟(jì)效益。目前,我們服務(wù)的客戶以成都為中心已經(jīng)輻射到潛江省份的部分城市,未來(lái)相信會(huì)繼續(xù)擴(kuò)大服務(wù)區(qū)域并繼續(xù)獲得客戶的支持與信任!

作為一名數(shù)據(jù)工程師,我正在研究大數(shù)據(jù)技術(shù),例如Spark Streaming,Kafka和Apache Druid。 他們都有自己的教程和RTFM頁(yè)面。 但是,將這些技術(shù)大規(guī)模地組合在一起時(shí),您會(huì)發(fā)現(xiàn)自己正在尋找涵蓋更復(fù)雜的生產(chǎn)用例的解決方案。 在本博文中,我將分享通過(guò)將Spark Streaming,Kafka和Apache Druid結(jié)合在一起以構(gòu)建實(shí)時(shí)分析儀表板,以確保精確的數(shù)據(jù)表示而獲得的知識(shí)。

在開始之前……關(guān)于實(shí)時(shí)分析的幾句話

實(shí)時(shí)分析是大數(shù)據(jù)技術(shù)的新趨勢(shì),通常具有顯著的業(yè)務(wù)影響。 在分析新鮮數(shù)據(jù)時(shí),見解更加精確。 例如,為數(shù)據(jù)分析師,BI和客戶經(jīng)理團(tuán)隊(duì)提供實(shí)時(shí)分析儀表板可以幫助這些團(tuán)隊(duì)做出快速?zèng)Q策。 大規(guī)模實(shí)時(shí)分析的常用架構(gòu)基于Spark Streaming和Kafka。 這兩種技術(shù)都具有很好的可擴(kuò)展性。 它們?cè)谌杭线\(yùn)行,并在許多計(jì)算機(jī)之間分配負(fù)載。 Spark作業(yè)的輸出可以到達(dá)許多不同的目的地,這取決于特定的用例和體系結(jié)構(gòu)。 我們的目標(biāo)是提供顯示實(shí)時(shí)事件的可視工具。 為此,我們選擇了Apache Druid數(shù)據(jù)庫(kù)。

Apache Druid中的數(shù)據(jù)可視化

Druid是高性能的實(shí)時(shí)分析數(shù)據(jù)庫(kù)。 它的好處之一是能夠使用來(lái)自Kafka主題的實(shí)時(shí)數(shù)據(jù),并使用Pivot模塊在其上構(gòu)建強(qiáng)大的可視化效果。 它的可視化功能可以運(yùn)行各種臨時(shí)的"切片和切塊"查詢,并快速獲得可視化結(jié)果。 這對(duì)于分析各種用例非常有用,例如特定運(yùn)動(dòng)在某些國(guó)家的表現(xiàn)。 實(shí)時(shí)檢索數(shù)據(jù),延遲1-2分鐘。

架構(gòu)

因此,我們決定基于Kafka事件和Apache Druid構(gòu)建實(shí)時(shí)分析系統(tǒng)。 我們已經(jīng)在Kafka主題中進(jìn)行過(guò)活動(dòng)。 但是我們不能將它們直接攝取到德魯伊中。 我們需要為每個(gè)事件添加更多維度。 我們需要用更多的數(shù)據(jù)豐富每個(gè)事件,以便在德魯伊中方便地查看它。 關(guān)于規(guī)模,我們每分鐘要處理數(shù)十萬(wàn)個(gè)事件,因此我們需要使用能夠支持這些數(shù)字的技術(shù)。 我們決定使用Spark Streaming作業(yè)豐富原始的Kafka事件。

圖1.實(shí)時(shí)分析架構(gòu)

Spark Streaming作業(yè)永遠(yuǎn)運(yùn)行? 并不是的。

Spark Streaming作業(yè)的想法是它始終在運(yùn)行。 這項(xiàng)工作永遠(yuǎn)都不應(yīng)停止。 它不斷讀取來(lái)自Kafka主題的事件,對(duì)其進(jìn)行處理,并將輸出寫入另一個(gè)Kafka主題。 但是,這是一個(gè)樂觀的看法。 在現(xiàn)實(shí)生活中,事情更加復(fù)雜。 Spark群集中存在驅(qū)動(dòng)程序故障,在這種情況下,作業(yè)將重新啟動(dòng)。 有時(shí)新版本的spark應(yīng)用程序已部署到生產(chǎn)中。 在這種情況下會(huì)發(fā)生什么? 重新啟動(dòng)的作業(yè)如何讀取Kafka主題并處理事件? 在深入研究這些細(xì)節(jié)之前,此圖顯示了重新啟動(dòng)Spark Streaming作業(yè)時(shí)在Druid中看到的內(nèi)容:

圖2.作業(yè)重新啟動(dòng)時(shí)數(shù)據(jù)丟失

絕對(duì)是數(shù)據(jù)丟失!

我們要解決什么問(wèn)題?

我們正在處理Spark Streaming應(yīng)用程序,該應(yīng)用程序從一個(gè)Kafka主題讀取事件,并將事件寫入另一個(gè)Kafka主題。 這些事件稍后將在Druid中顯示。 我們的目標(biāo)是在重新啟動(dòng)Spark Streaming應(yīng)用程序期間實(shí)現(xiàn)平滑的數(shù)據(jù)可視化。 換句話說(shuō),我們需要確保在Spark Streaming作業(yè)重啟期間不會(huì)丟失或重復(fù)任何事件。

都是關(guān)于補(bǔ)償

為了理解為什么作業(yè)重新啟動(dòng)時(shí)會(huì)丟失數(shù)據(jù),我們需要熟悉Kafka體系結(jié)構(gòu)中的一些術(shù)語(yǔ)。 您可以在這里找到Kafka的官方文檔。 簡(jiǎn)而言之:Kafka中的事件存儲(chǔ)在主題中; 每個(gè)主題都分為多個(gè)分區(qū)。 分區(qū)中的每個(gè)記錄都有一個(gè)偏移量-一個(gè)連續(xù)的數(shù)字,它定義了記錄的順序。 當(dāng)應(yīng)用程序使用該主題時(shí),它可以通過(guò)多種方式處理偏移量。 默認(rèn)行為始終是從最新的偏移量讀取。 另一個(gè)選擇是提交偏移量,即持久保留偏移量,以便作業(yè)可以在重新啟動(dòng)時(shí)讀取已提交的偏移量并從此處繼續(xù)。 讓我們看一下解決方案的步驟,并在每個(gè)步驟中加深對(duì)Kafka膠印管理的了解。

步驟#1-自動(dòng)提交偏移量

默認(rèn)行為始終是從最新的偏移量讀取。 這將不起作用,因?yàn)橹匦聠?dòng)作業(yè)時(shí),該主題中有新事件。 如果作業(yè)從最新讀取,它將丟失重新啟動(dòng)期間添加的所有消息,如圖2所示。Spark Streaming中有一個(gè)" enable.auto.commit"參數(shù)。 默認(rèn)情況下,其值為false。 圖3顯示了將其值更改為true,運(yùn)行Spark應(yīng)用程序并重新啟動(dòng)后的行為。

圖3.作業(yè)重啟的數(shù)據(jù)峰值

我們可以看到,使用Kafka自動(dòng)提交功能會(huì)產(chǎn)生新的效果。 沒有"數(shù)據(jù)丟失",但是現(xiàn)在我們看到重復(fù)的事件。 沒有真正的事件"爆發(fā)"。 實(shí)際發(fā)生的情況是自動(dòng)提交機(jī)制"不時(shí)"提交偏移量。 輸出主題中有許多未提交的消息。 重新啟動(dòng)后,作業(yè)將使用最新提交的偏移量中的消息,并再次處理其中一些事件。 這就是為什么在輸出中會(huì)出現(xiàn)大量事件的原因。

顯然,將這些重復(fù)項(xiàng)合并到我們的可視化中可能會(huì)誤導(dǎo)業(yè)務(wù)消費(fèi)者此數(shù)據(jù),并影響他們的決策和對(duì)系統(tǒng)的信任。

步驟#2:手動(dòng)提交Kafka偏移

因此,我們不能依靠Kafka自動(dòng)提交功能。 我們需要自己進(jìn)行卡夫卡補(bǔ)償。 為了做到這一點(diǎn),讓我們看看Spark Streaming如何使用Kafka主題中的數(shù)據(jù)。 Spark Streaming使用稱為離散流或DStream的體系結(jié)構(gòu)。 DStream由一系列連續(xù)的RDD(彈性分布式數(shù)據(jù)集)表示,這是Spark的主要抽象之一。 大多數(shù)Spark Streaming作業(yè)如下所示:

  
 
 
 
  1. dstream.foreachRDD { rdd => rdd.foreach { record => process(record)} } 

在我們的案例中,處理記錄意味著將記錄寫入輸出Kafka主題。 因此,為了提交Kafka偏移量,我們需要執(zhí)行以下操作:

  
 
 
 
  1. dstream.foreachRDD { rdd => val offsetRanges =  
  2. rdd.asInstanceOf[HasOffsetRanges].offsetRanges rdd.foreach { record  
  3. => process(record)}  
  4. stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges) } 

這是一種簡(jiǎn)單明了的方法,在我們深入討論之前,讓我們看一下大局。 假設(shè)我們正確處理了偏移量。 即,在每次RDD處理之后都保存偏移量。 當(dāng)我們停止工作時(shí)會(huì)怎樣? 該作業(yè)在RDD的處理過(guò)程中停止。 微批處理的部分將寫入輸出Kafka主題,并且不會(huì)提交。 一旦作業(yè)再次運(yùn)行,它將第二次處理某些消息,并且重復(fù)消息的峰值將(與以前一樣)出現(xiàn)在Druid中:

圖4.作業(yè)重新啟動(dòng)時(shí)的數(shù)據(jù)峰值

正常關(guān)機(jī)

事實(shí)證明,有一種方法可以確保在RDD處理期間不會(huì)殺死作業(yè)。這稱為"正常關(guān)機(jī)"。有幾篇博客文章描述了如何優(yōu)雅地殺死Spark應(yīng)用程序,但是其中大多數(shù)與舊版本的Spark有關(guān),并且有很多限制。我們一直在尋找一種適用于任何規(guī)模且不依賴于特定Spark版本或操作系統(tǒng)的"安全"解決方案。要啟用正常關(guān)機(jī),應(yīng)使用以下參數(shù)創(chuàng)建Spark上下文:spark.streaming.stopGracefullyOnShutdown = true。這指示Spark在JVM關(guān)閉時(shí)(而不是立即)正常關(guān)閉StreamingContext。另外,我們需要一種機(jī)制來(lái)有意地停止工作,例如在部署新版本時(shí)。我們已經(jīng)通過(guò)簡(jiǎn)單地檢查是否存在指示作業(yè)關(guān)閉的HDFS文件來(lái)實(shí)現(xiàn)該機(jī)制的第一個(gè)版本。當(dāng)文件顯示在HDFS中時(shí),流上下文將使用以下參數(shù)停止:ssc.stop(stopSparkContext = true,stopGracefully = true)

在這種情況下,只有在完成所有接收到的數(shù)據(jù)處理之后,Spark應(yīng)用程序才會(huì)正常停止。 這正是我們所需要的。

步驟#3:Kafka commitAsync

讓我們回顧一下到目前為止的情況。 我們有意在每個(gè)RDD處理中提交Kafka偏移量(使用Kafka commitAsync API),并使用Spark正常關(guān)機(jī)。 顯然,還有另一個(gè)警告。 深入研究Kafka API和Kafka commitAsync()源代碼的文檔,我了解到commitAsync()僅將offsetRanges放入隊(duì)列中,實(shí)際上僅在下一個(gè)foreachRDD循環(huán)中進(jìn)行處理。 即使Spark作業(yè)正常停止并完成了所有RDD的處理,實(shí)際上也不會(huì)提交最后一個(gè)RDD的偏移量。 為解決此問(wèn)題,我們實(shí)現(xiàn)了一個(gè)代碼,該代碼可同步保留Kafka偏移量,并且不依賴于Kafka commitAsync()。 然后,對(duì)于每個(gè)RDD,我們將提交的偏移量存儲(chǔ)在HDFS文件中。 當(dāng)作業(yè)再次開始運(yùn)行時(shí),它將從HDFS加載偏移文件,并從這些偏移開始使用Kafka主題。

在這里,它有效!

僅僅是正常關(guān)機(jī)和Kafka偏移量同步存儲(chǔ)的組合,才為我們提供了理想的結(jié)果。 重新啟動(dòng)期間沒有數(shù)據(jù)丟失,沒有數(shù)據(jù)高峰:

圖5.重新啟動(dòng)Spark作業(yè)期間沒有峰值數(shù)據(jù)丟失

結(jié)論

解決Spark Streaming和Kafka之間的集成問(wèn)題是構(gòu)建實(shí)時(shí)分析儀表板的重要里程碑。 我們找到了可以確保穩(wěn)定的數(shù)據(jù)流的解決方案,而不會(huì)在Spark Streaming作業(yè)重啟期間丟失事件或重復(fù)。 現(xiàn)在,我們獲得了在Druid中可視化的可信賴數(shù)據(jù)。 因此,我們將更多類型的事件(Kafka主題)添加到了Druid中,并建立了實(shí)時(shí)儀表板。 這些儀表板為各種團(tuán)隊(duì)提供了見解,例如BI,產(chǎn)品和客戶支持。 我們的下一個(gè)目標(biāo)是利用Druid的更多功能,例如新的分析功能和警報(bào)。


分享名稱:使用Kafka和Druid了解Spark流
本文網(wǎng)址:http://m.5511xx.com/article/cdidsgh.html