日韩无码专区无码一级三级片|91人人爱网站中日韩无码电影|厨房大战丰满熟妇|AV高清无码在线免费观看|另类AV日韩少妇熟女|中文日本大黄一级黄色片|色情在线视频免费|亚洲成人特黄a片|黄片wwwav色图欧美|欧亚乱色一区二区三区

RELATEED CONSULTING
相關(guān)咨詢
選擇下列產(chǎn)品馬上在線溝通
服務(wù)時(shí)間:8:30-17:00
你可能遇到了下面的問(wèn)題
關(guān)閉右側(cè)工具欄

新聞中心

這里有您想知道的互聯(lián)網(wǎng)營(yíng)銷解決方案
SparkValue類型的常用算子

Spark RDD常用算子:Value類型

Spark之所以比Hadoop靈活和強(qiáng)大,其中一個(gè)原因是Spark內(nèi)置了許多有用的算子,也就是方法。通過(guò)對(duì)這些方法的組合,編程人員就可以寫(xiě)出自己想要的功能。說(shuō)白了spark編程就是對(duì)spark算子的使用,下面為大家詳細(xì)講解一下SparkValue類型的常用算子

創(chuàng)新互聯(lián)建站是一家專注于成都網(wǎng)站制作、成都網(wǎng)站設(shè)計(jì)、外貿(mào)營(yíng)銷網(wǎng)站建設(shè)與策劃設(shè)計(jì),云陽(yáng)網(wǎng)站建設(shè)哪家好?創(chuàng)新互聯(lián)建站做網(wǎng)站,專注于網(wǎng)站建設(shè)10年,網(wǎng)設(shè)計(jì)領(lǐng)域的專業(yè)建站公司;建站業(yè)務(wù)涵蓋:云陽(yáng)等地區(qū)。云陽(yáng)做網(wǎng)站價(jià)格咨詢:13518219792

map

函數(shù)說(shuō)明:

map() 接收一個(gè)函數(shù),該函數(shù)將RDD中的元素逐條進(jìn)行映射轉(zhuǎn)換,可以是類型的轉(zhuǎn)換,也可以是值的轉(zhuǎn)換,將函數(shù)的返回結(jié)果作為結(jié)果RDD編程。

函數(shù)簽名:

def map[U: ClassTag](f: T => U): RDD[U]

案例演示

   val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
   val sc = new SparkContext(sparkConf)
   //算子 -map
   val rdd = sc.makeRDD(List(1, 2, 3, 4),2)
   val mapRdd1 = rdd.map(
     _*2
   )
   mapRdd1.collect().foreach(println)
   sc.stop()

運(yùn)行結(jié)果

2
4
6
8

mapPartitons

函數(shù)說(shuō)明:

將待處理的數(shù)據(jù)以分區(qū)為單位發(fā)送到待計(jì)算節(jié)點(diǎn)上進(jìn)行處理,mapPartition是對(duì)RDD的每一個(gè)分區(qū)的迭代器進(jìn)行操作,返回的是迭代器。這里的處理可以進(jìn)行任意的處理。

函數(shù)簽名:

def mapPartitions[U: ClassTag](
f: Iterator[T] => Iterator[U],
preservesPartitioning: Boolean = false): RDD[U]

案例演示

 def main(args: Array[String]): Unit = {
   val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
   val sc = new SparkContext(sparkConf)
   //算子 -mapPartitons 計(jì)算每個(gè)分區(qū)的最大數(shù)
   val rdd = sc.makeRDD(List(1, 34, 36,345,2435,2342,62,35, 4),4)
   val mapParRdd = rdd.mapPartitions(
     iter => {
       List(iter.max).iterator
     }
   )
   mapParRdd.foreach(println)
   sc.stop()
 }

運(yùn)行結(jié)果:

62
2435
34
345

mapPartitonsWithIndex

函數(shù)說(shuō)明:

將待處理的數(shù)據(jù)以分區(qū)為單位發(fā)送到計(jì)算節(jié)點(diǎn)上,這里的處理可以進(jìn)行任意的處理,哪怕是過(guò)濾數(shù)據(jù),在處理的同時(shí)可以獲取當(dāng)前分區(qū)的索引值。

函數(shù)簽名:

def mapPartitionsWithIndex[U: ClassTag](
f: (Int, Iterator[T]) => Iterator[U],
preservesPartitioning: Boolean = false): RDD[U]

案例演示:

  1. 將數(shù)據(jù)進(jìn)行扁平化映射并且打印所在的分區(qū)數(shù)
def main(args: Array[String]): Unit = {
   val conf = new SparkConf().setMaster("local[*]").setAppName("rdd")
   val sc = new SparkContext(conf)
   val rdd = sc.makeRDD(List("Hello Spark", "Hello Scala", "Word Count"),2)
   val mapRDD = rdd.flatMap(_.split(" "))
   val mpwiRdd = mapRDD.mapPartitionsWithIndex(
     (index, datas) => {
       datas.map(
         num => {
           (index, num)
         }
       )
     }
   )
   mpwiRdd.collect().foreach(println)
 }

運(yùn)行結(jié)果:

(0,Hello)
(0,Spark)
(1,Hello)
(1,Scala)
(1,Word)
(1,Count)
  1. 將數(shù)據(jù)進(jìn)行扁平化映射只打印所在第一分區(qū)的數(shù)據(jù)
def main(args: Array[String]): Unit = {
   val conf = new SparkConf().setMaster("local[*]").setAppName("rdd")
   val sc = new SparkContext(conf)
   val rdd = sc.makeRDD(List("Hello Spark", "Hello Scala", "Word Count"),2)
   val mapRDD = rdd.flatMap(_.split(" "))
   val mpwiRdd = mapRDD.mapPartitionsWithIndex(
     (index, datas) => {
       if (index==0){
         datas.map(
           num => {
             (index, num)
           }
         )
       }else{
       Nil.iterator
       }
     }
   )
   mpwiRdd.collect().foreach(println)

運(yùn)行結(jié)果:

(0,Hello)
(0,Spark)

flatMap

函數(shù)說(shuō)明:

將數(shù)據(jù)進(jìn)行扁平化之后在做映射處理,所以算子也稱為扁平化映射

函數(shù)簽名:

def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U]

案例演示:

將每個(gè)單詞進(jìn)行扁平化映射

def main(args: Array[String]): Unit = {
 val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
 val sc = new SparkContext(sparkConf)
 //算子 -map
 val rdd = sc.makeRDD(List("Hello Scala","Hello Spark"), 2)
 val FltRdd = rdd.flatMap(
   _.split(" ")
 )
 FltRdd.foreach(println)
 sc.stop()
}

運(yùn)行結(jié)果:

Hello
Scala
Hello
Spark

glom

函數(shù)說(shuō)明:

glom的作用就是將一個(gè)分區(qū)的數(shù)據(jù)合并到一個(gè)array中。

函數(shù)簽名:

def glom(): RDD[Array[T]]

案例演示:

  1. 將不同分區(qū)rdd的元素合并到一個(gè)分區(qū)
 def main(args: Array[String]): Unit = {
   val conf = new SparkConf().setMaster("local[*]").setAppName("rdd")
   val sc = new SparkContext(conf)
   val rdd = sc.makeRDD(List(1, 2, 3, 4, 5, 6, 7, 8, 9),2)
   val glomRdd = rdd.glom()
   glomRdd.collect().foreach(data=>println(data.mkString(",")))
   sc.stop()
 }

運(yùn)行結(jié)果:

1,2,3,4
5,6,7,8,9

groupBy

函數(shù)說(shuō)明:

將數(shù)據(jù)根據(jù)指定的規(guī)則進(jìn)行分組,分區(qū)默認(rèn)不變,單數(shù)數(shù)據(jù)會(huì)被打亂,我們成這樣的操作為shuffer,

函數(shù)簽名:

def groupBy[K](f: T => K)(implicit kt: ClassTag[K]): RDD[(K, Iterable[T])]

案例演示:

  1. 按照奇偶數(shù)進(jìn)行g(shù)roupby分區(qū)
 def main(args: Array[String]): Unit = {
   val conf = new SparkConf().setMaster("local[*]").setAppName("rdd")
   val sc = new SparkContext(conf)
   val rdd = sc.makeRDD(List(1, 2, 3, 4, 5, 6, 7, 8,10),2)
   val groupByRDD = rdd.groupBy(_ % 2 == 0)
   groupByRDD.collect().foreach(println)
   sc.stop()
 }

運(yùn)行結(jié)果:

(false,CompactBuffer(1, 3, 5, 7))
(true,CompactBuffer(2, 4, 6, 8, 10))
  1. 按照單詞的首字母進(jìn)行分組
 def main(args: Array[String]): Unit = {
   val conf = new SparkConf().setMaster("local[*]").setAppName("rdd")
   val sc = new SparkContext(conf)
   val rdd = sc.makeRDD(List("Hello","Tom","Timi","Scala","Spark"))
   val groupByRDD = rdd.groupBy(_.charAt(0))
   groupByRDD.collect().foreach(println)
   sc.stop()
 }

運(yùn)行結(jié)果:

(T,CompactBuffer(Tom, Timi))
(H,CompactBuffer(Hello))
(S,CompactBuffer(Scala, Spark))

filter

函數(shù)說(shuō)明:

filter即過(guò)濾器的意思,所以filter算子的作用就是過(guò)濾的作用。filter將根據(jù)指定的規(guī)則進(jìn)行篩選過(guò)濾,符合條件的數(shù)據(jù)保留,不符合的數(shù)據(jù)丟棄,當(dāng)數(shù)據(jù)進(jìn)行篩選過(guò)濾之后,分區(qū)不變,但分區(qū)內(nèi)的數(shù)據(jù)可能不均衡,生產(chǎn)環(huán)境下,可能會(huì)出現(xiàn)數(shù)據(jù)傾斜。

函數(shù)簽名:

def filter(f: T => Boolean): RDD[T]

案例演示:

  1. 篩選出能被二整除的數(shù)字
 def main(args: Array[String]): Unit = {
   val conf = new SparkConf().setMaster("local[*]").setAppName("rdd")
   val sc = new SparkContext(conf)
   val rdd = sc.makeRDD(List(46,235,246,2346,3276,235,234,6234,6245,246,24,6246,235,26,265))
   val filterRDD = rdd.filter(_ % 2 == 0)
   filterRDD.collect().foreach(println)
   sc.stop()
 }

運(yùn)行結(jié)果:

46
246
2346
3276
234
6234
246
24
6246
26

2.篩選單詞中包含H的

 def main(args: Array[String]): Unit = {
   val conf = new SparkConf().setMaster("local[*]").setAppName("rdd")
   val sc = new SparkContext(conf)
   val rdd = sc.makeRDD(List("Hello","Horber","Hbeer","ersfgH","Scala","Hadoop","Zookeeper"))
   val filterRDD = rdd.filter(_.contains("H"))
   filterRDD.collect().foreach(println)
   sc.stop()
 }

運(yùn)行結(jié)果:

Hello
Horber
Hbeer
ersfgH
Hadoop

網(wǎng)站名稱:SparkValue類型的常用算子
分享地址:http://m.5511xx.com/article/cospdjo.html