日韩无码专区无码一级三级片|91人人爱网站中日韩无码电影|厨房大战丰满熟妇|AV高清无码在线免费观看|另类AV日韩少妇熟女|中文日本大黄一级黄色片|色情在线视频免费|亚洲成人特黄a片|黄片wwwav色图欧美|欧亚乱色一区二区三区

RELATEED CONSULTING
相關(guān)咨詢
選擇下列產(chǎn)品馬上在線溝通
服務(wù)時(shí)間:8:30-17:00
你可能遇到了下面的問(wèn)題
關(guān)閉右側(cè)工具欄

新聞中心

這里有您想知道的互聯(lián)網(wǎng)營(yíng)銷(xiāo)解決方案
創(chuàng)新互聯(lián)ApacheKafka教程:ApacheKafka與Spark的集成

在本章中,我們將討論如何將Apache Kafka與Spark Streaming API集成。

公司主營(yíng)業(yè)務(wù):網(wǎng)站設(shè)計(jì)制作、成都網(wǎng)站設(shè)計(jì)、移動(dòng)網(wǎng)站開(kāi)發(fā)等業(yè)務(wù)。幫助企業(yè)客戶真正實(shí)現(xiàn)互聯(lián)網(wǎng)宣傳,提高企業(yè)的競(jìng)爭(zhēng)能力。成都創(chuàng)新互聯(lián)公司是一支青春激揚(yáng)、勤奮敬業(yè)、活力青春激揚(yáng)、勤奮敬業(yè)、活力澎湃、和諧高效的團(tuán)隊(duì)。公司秉承以“開(kāi)放、自由、嚴(yán)謹(jǐn)、自律”為核心的企業(yè)文化,感謝他們對(duì)我們的高要求,感謝他們從不同領(lǐng)域給我們帶來(lái)的挑戰(zhàn),讓我們激情的團(tuán)隊(duì)有機(jī)會(huì)用頭腦與智慧不斷的給客戶帶來(lái)驚喜。成都創(chuàng)新互聯(lián)公司推出崆峒免費(fèi)做網(wǎng)站回饋大家。

關(guān)于Spark

Spark Streaming API支持實(shí)時(shí)數(shù)據(jù)流的可擴(kuò)展,高吞吐量,容錯(cuò)流處理。 數(shù)據(jù)可以從諸如Kafka,F(xiàn)lume,Twitter等許多源中提取,并且可以使用復(fù)雜的算法來(lái)處理,例如地圖,縮小,連接和窗口等高級(jí)功能。 最后,處理的數(shù)據(jù)可以推送到文件系統(tǒng),數(shù)據(jù)庫(kù)和活動(dòng)儀表板。 彈性分布式數(shù)據(jù)集(RDD)是Spark的基本數(shù)據(jù)結(jié)構(gòu)。 它是一個(gè)不可變的分布式對(duì)象集合。 RDD中的每個(gè)數(shù)據(jù)集劃分為邏輯分區(qū),可以在集群的不同節(jié)點(diǎn)上計(jì)算。

與Spark集成

Kafka是Spark流式傳輸?shù)臐撛谙鬟f和集成平臺(tái)。 Kafka充當(dāng)實(shí)時(shí)數(shù)據(jù)流的中心樞紐,并使用Spark Streaming中的復(fù)雜算法進(jìn)行處理。 一旦數(shù)據(jù)被處理,Spark Streaming可以將結(jié)果發(fā)布到另一個(gè)Kafka主題或存儲(chǔ)在HDFS,數(shù)據(jù)庫(kù)或儀表板中。 下圖描述了概念流程。

現(xiàn)在,讓我們?cè)敿?xì)了解Kafka-Spark API。

SparkConf API

它表示Spark應(yīng)用程序的配置。 用于將各種Spark參數(shù)設(shè)置為鍵值對(duì)。

SparkConf 類(lèi)有以下方法 -

  • set(string key,string value) - 設(shè)置配置變量。

  • remove(string key) - 從配置中移除密鑰。

  • setAppName(string name) - 設(shè)置應(yīng)用程序的應(yīng)用程序名稱。

  • get(string key) - get key

StreamingContext API

這是Spark功能的主要入口點(diǎn)。 SparkContext表示到Spark集群的連接,可用于在集群上創(chuàng)建RDD,累加器和廣播變量。 簽名的定義如下所示。

public StreamingContext(String master, String appName, Duration batchDuration, 
   String sparkHome, scala.collection.Seq jars, 
   scala.collection.Map environment)
  • - 要連接的群集網(wǎng)址(例如mesos:// host:port,spark:// host:port,local [4])。

  • appName - 作業(yè)的名稱,以顯示在集群Web UI上

  • batchDuration - 流式數(shù)據(jù)將被分成批次的時(shí)間間隔

public StreamingContext(SparkConf conf, Duration batchDuration)

通過(guò)提供新的SparkContext所需的配置創(chuàng)建StreamingContext。

  • conf - Spark參數(shù)

  • batchDuration - 流式數(shù)據(jù)將被分成批次的時(shí)間間隔

KafkaUtils API

KafkaUtils API用于將Kafka集群連接到Spark流。 此API具有如下定義的顯著方法 createStream

public static ReceiverInputDStream> createStream(
   StreamingContext ssc, String zkQuorum, String groupId,
   scala.collection.immutable.Map topics, StorageLevel storageLevel)

上面顯示的方法用于創(chuàng)建從Kafka Brokers提取消息的輸入流。

  • ssc - StreamingContext對(duì)象。

  • zkQuorum - Zookeeper quorum。

  • groupId - 此消費(fèi)者的組ID。

  • 主題 - 返回要消費(fèi)的主題的地圖。

  • storageLevel - 用于存儲(chǔ)接收的對(duì)象的存儲(chǔ)級(jí)別。

KafkaUtils API有另一個(gè)方法createDirectStream,用于創(chuàng)建一個(gè)輸入流,直接從Kafka Brokers拉取消息,而不使用任何接收器。 這個(gè)流可以保證來(lái)自Kafka的每個(gè)消息都包含在轉(zhuǎn)換中一次。

示例應(yīng)用程序在Scala中完成。 要編譯應(yīng)用程序,請(qǐng)下載并安裝 sbt ,scala構(gòu)建工具(類(lèi)似于maven)。 主要應(yīng)用程序代碼如下所示。

import java.util.HashMap

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, Produc-erRecord}
import org.apache.spark.SparkConf
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka._

object KafkaWordCount {
   def main(args: Array[String]) {
      if (args.length < 4) {
         System.err.println("Usage: KafkaWordCount   ")
         System.exit(1)
      }

      val Array(zkQuorum, group, topics, numThreads) = args
      val sparkConf = new SparkConf().setAppName("KafkaWordCount")
      val ssc = new StreamingContext(sparkConf, Seconds(2))
      ssc.checkpoint("checkpoint")

      val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
      val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2)
      val words = lines.flatMap(_.split(" "))
      val wordCounts = words.map(x => (x, 1L))
         .reduceByKeyAndWindow(_ + _, _ - _, Minutes(10), Seconds(2), 2)
      wordCounts.print()

      ssc.start()
      ssc.awaitTermination()
   }
}

構(gòu)建腳本

spark-kafka集成取決于Spark,Spark流和Spark與Kafka的集成jar。 創(chuàng)建一個(gè)新文件 build.sbt ,并指定應(yīng)用程序詳細(xì)信息及其依賴關(guān)系。 在編譯和打包應(yīng)用程序時(shí), sbt 將下載所需的jar。

name := "Spark Kafka Project"
version := "1.0"
scalaVersion := "2.10.5"

libraryDependencies += "org.apache.spark" %% "spark-core" % "1.6.0"
libraryDependencies += "org.apache.spark" %% "spark-streaming" % "1.6.0"
libraryDependencies += "org.apache.spark" %% "spark-streaming-kafka" % "1.6.0"

編譯/包裝

運(yùn)行以下命令以編譯和打包應(yīng)用程序的jar文件。 我們需要將jar文件提交到spark控制臺(tái)以運(yùn)行應(yīng)用程序。

sbt package

提交到Spark

啟動(dòng)Kafka Producer CLI(在上一章中解釋),創(chuàng)建一個(gè)名為 my-first-topic 的新主題,并提供一些樣本消息,如下所示。

Another spark test message

運(yùn)行以下命令將應(yīng)用程序提交到spark控制臺(tái)。

/usr/local/spark/bin/spark-submit --packages org.apache.spark:spark-streaming
-kafka_2.10:1.6.0 --class "KafkaWordCount" --master local[4] target/scala-2.10/spark
-kafka-project_2.10-1.0.jar localhost:2181   

此應(yīng)用程序的示例輸出如下所示。

spark console messages ..
(Test,1)
(spark,1)
(another,1)
(message,1)
spark console message ..

網(wǎng)頁(yè)名稱:創(chuàng)新互聯(lián)ApacheKafka教程:ApacheKafka與Spark的集成
URL地址:http://m.5511xx.com/article/dpdjdhh.html