区块链技术博客
www.b2bchain.cn

Spark常用RDD算子详解!!!

这篇文章主要介绍了Spark常用RDD算子详解!!!的讲解,通过具体代码实例进行17293 讲解,并且分析了Spark常用RDD算子详解!!!的详细步骤与相关技巧,需要的朋友可以参考下https://www.b2bchain.cn/?p=17293

本文实例讲述了2、树莓派设置连接WiFi,开启VNC等等的讲解。分享给大家供大家参考文章查询地址https://www.b2bchain.cn/7039.html。具体如下:

文章目录

  • 1. Transformation转换算子
    • 1.1 Value类型
      • 1.1.1 map()映射
      • 1.1.2 mapPartitions()以分区为单位执行Map
      • 1.1.3 map()和mapPartitions()区别
      • 1.1.4 mapPartitionsWithIndex()带分区号
      • 1.1.5 flatMap()压平
      • 1.1.6 glom()分区转换数组
      • 1.1.7 groupBy()分组
      • 1.1.8 GroupBy之WordCount
      • 1.1.9 filter()过滤
      • 1.1.10 sample()采样
      • 1.1.11 distinct()去重
      • 1.1.12 coalesce()重新分区
      • 1.1.13 repartition()重新分区(执行Shuffle)
      • 1.1.14 coalesce和repartition区别
      • 1.1.15 sortBy()排序
      • 1.1.16 pipe()调用脚本
    • 1.2 双Value类型交互
      • 1.2.1 union()并集
      • 1.2.2 subtract ()差集
      • 1.2.3 intersection()交集
      • 1.2.4 zip()拉链
    • 1.3 Key-Value类型
      • 1.3.1 partitionBy()按照K重新分区
      • 1.3.2 reduceByKey()按照K聚合V
      • 1.3.3 groupByKey()按照K重新分组
      • 1.3.4 reduceByKey和groupByKey区别
      • 1.3.5 aggregateByKey()按照K处理分区内和分区间逻辑
      • 1.3.6 foldByKey()分区内和分区间相同的aggregateByKey()
      • 1.3.7 combineByKey()转换结构后分区内和分区间操作
      • 1.3.8 reduceByKey、aggregateByKey、foldByKey、combineByKey
      • 1.3.9 sortByKey()按照K进行排序
      • 1.3.10 mapValues()只对V进行操作
      • 1.3.11 join()连接 将相同key对应的多个value关联在一起
      • 1.3.12 cogroup() 类似全连接,但是在同一个RDD中对key聚合
  • 2. Action行动算子
    • 2.5.1 reduce()聚合
    • 2.5.2 collect()以数组的形式返回数据集
    • 2.5.3 count()返回RDD中元素个数
    • 2.5.4 first()返回RDD中的第一个元素
    • 2.5.5 take()返回由RDD前n个元素组成的数组
    • 2.5.6 takeOrdered()返回该RDD排序后前n个元素组成的数组
    • 2.5.7 aggregate()案例
    • 2.5.8 fold()案例
    • 2.5.9 countByKey()统计每种key的个数
    • 2.5.10 save相关算子
    • 2.5.11 foreach(f)遍历RDD中每一个元素

1. Transformation转换算子

RDD整体上分为Value类型、双Value类型和Key-Value类型

1.1 Value类型

1.1.1 map()映射

Spark常用RDD算子详解!!!

4)具体实现

import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext}  object ScalaRDD {   def main(args: Array[String]): Unit = {     val conf: SparkConf = new SparkConf().setMaster("local[2]").setAppName("ScalaRdd")     val sc = new SparkContext(conf)     val rdd: RDD[Int] = sc.parallelize(1 to 4,2)     val mapRdd: RDD[Int] = rdd.map(_*2)     mapRdd.collect().foreach(println)     sc.stop()   } } 

1.1.2 mapPartitions()以分区为单位执行Map

Spark常用RDD算子详解!!!

4)具体实现

import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext}  object ScalaRDD {   def main(args: Array[String]): Unit = {     val conf: SparkConf = new SparkConf().setMaster("local[2]").setAppName("ScalaRdd")     val sc = new SparkContext(conf)     val rdd: RDD[Int] = sc.parallelize(1 to 4,2)     val mapRdd: RDD[Int] = rdd.mapPartitions(x=>x.map(_*2))     mapRdd.collect().foreach(println)     sc.stop()   } } 

1.1.3 map()和mapPartitions()区别

Spark常用RDD算子详解!!!

1.1.4 mapPartitionsWithIndex()带分区号

Spark常用RDD算子详解!!!

4)具体实现

import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext}  object ScalaRDD {   def main(args: Array[String]): Unit = {     val conf: SparkConf = new SparkConf().setMaster("local[2]").setAppName("ScalaRdd")     val sc = new SparkContext(conf)     val rdd: RDD[Int] = sc.parallelize(1 to 4,2)     val mapRdd: RDD[(Int, Int)] = rdd.mapPartitionsWithIndex((index,items)=>{items.map((index,_))})     mapRdd.collect().foreach(println)     sc.stop()   } } 
//扩展功能:第二个分区元素*2,其余分区不变 val mapRdd: RDD[(Int, Int)] = rdd.mapPartitionsWithIndex((index,items)=>{items.map(x=>{       if (index==1){         (index,x*2)       }else{         (index,x)       }     })}) 

1.1.5 flatMap()压平

Spark常用RDD算子详解!!!

4)具体实现

object ScalaRDD {   def main(args: Array[String]): Unit = {     val conf: SparkConf = new SparkConf().setMaster("local[2]").setAppName("ScalaRdd")     val sc = new SparkContext(conf)     val listRDD=sc.makeRDD(List(List(1,2),List(3,4),List(5,6),List(7)), 2)     listRDD.flatMap(x=>x).collect.foreach(println)     sc.stop()   } } 

1.1.6 glom()分区转换数组

Spark常用RDD算子详解!!!

4)具体实现

object ScalaRDD {   def main(args: Array[String]): Unit = {     val conf: SparkConf = new SparkConf().setMaster("local[2]").setAppName("ScalaRdd")     val sc = new SparkContext(conf)     val rdd: RDD[Int] = sc.parallelize(1 to 4,2)     val max: RDD[Int] = rdd.glom().map(_.max)     max.collect.foreach(println)     sc.stop()   } } 

1.1.7 groupBy()分组

Spark常用RDD算子详解!!!

4)具体实现

object ScalaRDD {   def main(args: Array[String]): Unit = {     val conf: SparkConf = new SparkConf().setMaster("local[2]").setAppName("ScalaRdd")     val sc = new SparkContext(conf)     val rdd: RDD[Int] = sc.parallelize(1 to 4,2)     val groupRdd: RDD[(Int, Iterable[Int])] = rdd.groupBy(x=>x%2)     groupRdd.collect.foreach(println)     sc.stop()   } } 

按照首字母第一个单词相同分组

val rdd1: RDD[String] = sc.makeRDD(List("hello","hive","hadoop","spark","scala")) 
rdd1.groupBy(x=>x.substring(0,1)).collect().foreach(println) 

groupBy会存在shuffle过程

shuffle:将不同的分区数据进行打乱重组的过程

shuffle一定会落盘。可以在local模式下执行程序,通过4040看效果。

1.1.8 GroupBy之WordCount

Spark常用RDD算子详解!!!

object ScalaRDD {   def main(args: Array[String]): Unit = {     val conf: SparkConf = new SparkConf().setMaster("local[2]").setAppName("ScalaRdd")     val sc = new SparkContext(conf)     val rdd: RDD[String] = sc.makeRDD( List("Hello Scala", "Hello Spark", "Hello World"))     rdd.flatMap(_.split(" ")).map((_,1)).groupBy(_._1).map(x=>(x._1,x._2.size)).collect.foreach(println)     sc.stop()   } } 

扩展复杂版WordCount

val rdd: RDD[(String, Int)] = sc.makeRDD(List(("Hello Scala", 2), ("Hello Spark", 3), ("Hello World", 2))) 
rdd.map(x=>(x._1+" ")*x._2).flatMap(_.split(" ")).map((_,1)).groupBy(_._1).map(x=>(x._1,x._2.size)).foreach(println) 

1.1.9 filter()过滤

Spark常用RDD算子详解!!!

4)具体实现

object ScalaRDD {   def main(args: Array[String]): Unit = {     val conf: SparkConf = new SparkConf().setMaster("local[2]").setAppName("ScalaRdd")     val sc = new SparkContext(conf)     val rdd: RDD[Int] = sc.parallelize(1 to 4,2)     rdd.filter(_%2==0).collect.foreach(println)     sc.stop()   } } 

1.1.10 sample()采样

Spark常用RDD算子详解!!!

4)具体实现

object ScalaRDD {   def main(args: Array[String]): Unit = {     val conf: SparkConf = new SparkConf().setMaster("local[2]").setAppName("ScalaRdd")     val sc = new SparkContext(conf)     val rdd: RDD[Int] = sc.parallelize(1 to 100,2)     //放回抽样结果     rdd.sample(true,0.4,2).collect.foreach(println)     //不放回抽样结果     rdd.sample(false,0.2,4).collect.foreach(println)      sc.stop()   } } 

1.1.11 distinct()去重

Spark常用RDD算子详解!!!

具体实现

object ScalaRDD {   def main(args: Array[String]): Unit = {     val conf: SparkConf = new SparkConf().setMaster("local[2]").setAppName("ScalaRdd")     val sc = new SparkContext(conf)     val rdd: RDD[Int] = sc.makeRDD(List(1,2,1,5,2,9,6,1))     rdd.distinct().collect().foreach(println)     //对rdd采用多个Task去重,提高并发度     rdd.distinct(4).collect().foreach(println)     sc.stop()   } } 

1.1.12 coalesce()重新分区

Coalesce算子包括:配置执行Shuffle和配置不执行Shuffle两种方式。

1、不执行Shuffle方式

Spark常用RDD算子详解!!!

5)具体实现

object ScalaRDD {   def main(args: Array[String]): Unit = {     val conf: SparkConf = new SparkConf().setMaster("local[2]").setAppName("ScalaRdd")     val sc = new SparkContext(conf)     val rdd: RDD[Int] = sc.makeRDD(Array(1,2,3,4),4)     val coalesceRdd: RDD[Int] = rdd.coalesce(4)      //打印查看对应的分区数     val indexRdd: RDD[Int] = coalesceRdd.mapPartitionsWithIndex((index, datas) => {       //打印每个分区数据,并带分区号       datas.foreach(data => {         println(index + ":" + data)       })       //返回分区的数据       datas     })     indexRdd.collect()     sc.stop()   } } 

2、执行Shuffle方式

val rdd: RDD[Int] = sc.makeRDD(Array(1,2,3,4,5,6),3) val coalesceRdd: RDD[Int] = rdd.coalesce(4,true) 

输出结果:

0:2 3:1 0:4 3:3 0:6 3:5 

3、Shuffle原理

Spark常用RDD算子详解!!!

1.1.13 repartition()重新分区(执行Shuffle)

Spark常用RDD算子详解!!!

4)具体实现

object ScalaRDD {   def main(args: Array[String]): Unit = {     val conf: SparkConf = new SparkConf().setMaster("local[2]").setAppName("ScalaRdd")     val sc = new SparkContext(conf)     val rdd: RDD[Int] = sc.makeRDD(Array(1,2,3,4,5,6),3)     val coalesceRdd: RDD[Int] = rdd.coalesce(4,true)      val repartitionRdd: RDD[Int] = rdd.repartition(2)       //打印查看对应的分区数     val indexRdd: RDD[Int] = repartitionRdd.mapPartitionsWithIndex((index, datas) => {       //打印每个分区数据,并带分区号       datas.foreach(data => {         println(index + ":" + data)       })       //返回分区的数据       datas     })     indexRdd.collect()      sc.stop()   } } 

1.1.14 coalesce和repartition区别

1)coalesce重新分区,可以选择是否进行shuffle过程。由参数shuffle: Boolean = false/true决定。

2)repartition实际上是调用的coalesce,进行shuffle。源码如下:

def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {     coalesce(numPartitions, shuffle = true) } 

3)coalesce一般为缩减分区,如果扩大分区,不使用shuffle是没有意义的,repartition扩大分区执行shuffle。

object ScalaRDD {   def main(args: Array[String]): Unit = {     val conf: SparkConf = new SparkConf().setMaster("local[2]").setAppName("ScalaRdd")     val sc = new SparkContext(conf)     val rdd: RDD[Int] = sc.makeRDD(Array(1,2,3,4,5,6,7,8,9,10,11),3)     //合并分区(没有shuffle)     // coalesce一般为缩减分区,如果扩大分区,不使用shuffle是没有意义的     val cRdd: RDD[Int] = rdd.coalesce(4)      //重新分区(有shuffle)     val rRdd: RDD[Int] = rdd.repartition(4)      //打印查看对应的分区数     val indexRdd: RDD[Int] = rRdd.mapPartitionsWithIndex((index, datas) => {       //打印每个分区数据,并带分区号       datas.foreach(data => {         println(index + ":" + data)       })       //返回分区的数据       datas     })     indexRdd.collect()      sc.stop()   } } 

1.1.15 sortBy()排序

Spark常用RDD算子详解!!!

object ScalaRDD {   def main(args: Array[String]): Unit = {     val conf: SparkConf = new SparkConf().setMaster("local[2]").setAppName("ScalaRdd")     val sc = new SparkContext(conf)     val rdd: RDD[Int] = sc.makeRDD(List(2,1,5,3,4,6))     rdd.sortBy(x=>x).collect().foreach(print)      rdd.sortBy(x=>x,false).collect().foreach(print)      sc.stop()   } } 

1.1.16 pipe()调用脚本

Spark常用RDD算子详解!!!

3)需求说明:编写一个脚本,使用管道将脚本作用于RDD上。

(1)编写一个脚本,并增加执行权限

vi pipe.sh 
#!/bin/sh echo "Start" while read LINE; do    echo ">>>"${LINE} done 
chmod 777 pipe.sh 

(2)创建一个只有一个分区的RDD

val rdd = sc.makeRDD (List("hi","Hello","how","are","you"),1) 

(3)将脚本作用该RDD并打印

rdd.pipe("/opt/spark/pipe.sh").collect() 

(4)创建一个有两个分区的RDD

val rdd = sc.makeRDD(List("hi","Hello","how","are","you"),2) 

(5)将脚本作用该RDD并打印

rdd.pipe("/opt/spark/pipe.sh").collect() 

1.2 双Value类型交互

1.2.1 union()并集

Spark常用RDD算子详解!!!

4)具体实现

object ScalaRDD {   def main(args: Array[String]): Unit = {     val conf: SparkConf = new SparkConf().setMaster("local[2]").setAppName("ScalaRdd")     val sc = new SparkContext(conf)     val rdd: RDD[Int] = sc.makeRDD(1 to 4)     val rdd1: RDD[Int] = sc.makeRDD(4 to 8)     rdd.union(rdd1).collect().foreach(println)     sc.stop()   } } 

1.2.2 subtract ()差集

Spark常用RDD算子详解!!!

4)具体实现

object ScalaRDD {   def main(args: Array[String]): Unit = {     val conf: SparkConf = new SparkConf().setMaster("local[2]").setAppName("ScalaRdd")     val sc = new SparkContext(conf)     val rdd: RDD[Int] = sc.makeRDD(1 to 4)     val rdd1: RDD[Int] = sc.makeRDD(4 to 8)     rdd.subtract(rdd1).collect().foreach(println)     sc.stop()   } } 

1.2.3 intersection()交集

Spark常用RDD算子详解!!!

4)具体实现

object ScalaRDD {   def main(args: Array[String]): Unit = {     val conf: SparkConf = new SparkConf().setMaster("local[2]").setAppName("ScalaRdd")     val sc = new SparkContext(conf)     val rdd: RDD[Int] = sc.makeRDD(1 to 4)     val rdd1: RDD[Int] = sc.makeRDD(4 to 8)     rdd.intersection(rdd1).collect().foreach(println)     sc.stop()   } } 

1.2.4 zip()拉链

Spark常用RDD算子详解!!!

3)需求说明:创建两个RDD,并将两个RDD组合到一起形成一个(k,v)RDD

4)代码实现:

object ScalaRDD {   def main(args: Array[String]): Unit = {     val conf: SparkConf = new SparkConf().setMaster("local[2]").setAppName("ScalaRdd")     val sc = new SparkContext(conf)     val rdd1: RDD[Int] = sc.makeRDD(Array(1,2,3,4),3)     val rdd2: RDD[String] = sc.makeRDD(Array("a","b","c","d"),3)     val rdd3: RDD[String] = sc.makeRDD(Array("a","b","c"),3)     val rdd4: RDD[String] = sc.makeRDD(Array("a","b","c"),4)      //元素个数不同,不能拉链     //Can only zip RDDs with same number of elements in each partition     //rdd1.zip(rdd3).collect().foreach(println)      //分区数不同,不能拉链     //Can't zip RDDs with unequal numbers of partitions: List(3, 4)     //rdd1.zip(rdd4).collect().foreach(println)      rdd1.zip(rdd2).collect().foreach(println)     sc.stop()   } } 

1.3 Key-Value类型

1.3.1 partitionBy()按照K重新分区

Spark常用RDD算子详解!!!

4)具体实现

object ScalaRDD {   def main(args: Array[String]): Unit = {     val conf: SparkConf = new SparkConf().setMaster("local[2]").setAppName("ScalaRdd")     val sc = new SparkContext(conf)     val rdd: RDD[(Int, String)] = sc.makeRDD(Array((1,"aaa"),(2,"bbb"),(3,"ccc")),3)     val rdd1: RDD[(Int, String)] = rdd.partitionBy(new HashPartitioner(2))     println(rdd1.partitions.size)     sc.stop()   } } 

5)HashPartitioner源码解读

class HashPartitioner(partitions: Int) extends Partitioner {     require(partitions >= 0, s"Number of partitions ($partitions) cannot be negative.")          def numPartitions: Int = partitions          def getPartition(key: Any): Int = key match {         case null => 0         case _ => Utils.nonNegativeMod(key.hashCode, numPartitions)     }          override def equals(other: Any): Boolean = other match {         case h: HashPartitioner =>             h.numPartitions == numPartitions         case _ =>             false     }          override def hashCode: Int = numPartitions } 

6)自定义分区器

//自定义分区 class MyPartitioner(num:Int) extends Partitioner{   //设置分区数   override def numPartitions: Int = num   //具体分区逻辑   override def getPartition(key: Any): Int = {     if (key.isInstanceOf[Int]){       if (key.asInstanceOf[Int]%2==0){         0       }else{         1       }     }else{       0     }   } } object ScalaRDD {   def main(args: Array[String]): Unit = {     val conf: SparkConf = new SparkConf().setMaster("local[2]").setAppName("ScalaRdd")     val sc = new SparkContext(conf)     val rdd: RDD[(Int, String)] = sc.makeRDD(Array((1,"aaa"),(2,"bbb"),(3,"ccc")),3)     val rdd1: RDD[(Int, String)] = rdd.partitionBy(new MyPartitioner(2))      val indexRdd: RDD[(Int, String)] = rdd1.mapPartitionsWithIndex((index, datas) => {       datas.foreach(data => {         println(index + ":" + data)       })       datas     })     indexRdd.collect()      sc.stop()   } } 

1.3.2 reduceByKey()按照K聚合V

Spark常用RDD算子详解!!!

4)具体实现

object ScalaRDD {   def main(args: Array[String]): Unit = {     val conf: SparkConf = new SparkConf().setMaster("local[2]").setAppName("ScalaRdd")     val sc = new SparkContext(conf)     val rdd: RDD[(String, Int)] = sc.makeRDD(List(("a",1),("b",5),("a",5),("b",2)))     val value: RDD[(String, Int)] = rdd.reduceByKey(_+_)     value.foreach(println)     sc.stop()   } } 

1.3.3 groupByKey()按照K重新分组

Spark常用RDD算子详解!!!

3)需求说明:创建一个pairRDD,将相同key对应值聚合到一个seq中,并计算相同key对应值的相加结果。

4)代码实现:

object ScalaRDD {   def main(args: Array[String]): Unit = {     val conf: SparkConf = new SparkConf().setMaster("local[2]").setAppName("ScalaRdd")     val sc = new SparkContext(conf)     val rdd: RDD[(String, Int)] = sc.makeRDD(List(("a",1),("b",5),("a",5),("b",2)))     val groupRdd: RDD[(String, Iterable[Int])] = rdd.groupByKey()     groupRdd.collect().foreach(println)     groupRdd.map(x=>(x._1,x._2.sum)).collect().foreach(println)     sc.stop()   } } 

1.3.4 reduceByKey和groupByKey区别

1)reduceByKey:按照key进行聚合,在shuffle之前有combine(预聚合)操作,返回结果是RDD[k,v]。

2)groupByKey:按照key进行分组,直接进行shuffle。

3)开发指导:在不影响业务逻辑的前提下,优先选用reduceByKey。求和操作不影响业务逻辑,求平均值影响业务逻辑。

1.3.5 aggregateByKey()按照K处理分区内和分区间逻辑

Spark常用RDD算子详解!!!

4)需求分析

Spark常用RDD算子详解!!!

5)代码实现:

object ScalaRDD {   def main(args: Array[String]): Unit = {     val conf: SparkConf = new SparkConf().setMaster("local[2]").setAppName("ScalaRdd")     val sc = new SparkContext(conf)     val rdd: RDD[(String, Int)] = sc.makeRDD(List(("a", 3), ("a", 2), ("c", 4), ("b", 3), ("c", 6), ("c", 8)), 2)     rdd.aggregateByKey(0)(math.max(_,_),_+_).collect().foreach(println)     sc.stop()   } } 

1.3.6 foldByKey()分区内和分区间相同的aggregateByKey()

Spark常用RDD算子详解!!!

4)代码实现:

object ScalaRDD {   def main(args: Array[String]): Unit = {     val conf: SparkConf = new SparkConf().setMaster("local[2]").setAppName("ScalaRdd")     val sc = new SparkContext(conf)     val rdd: RDD[(String, Int)] = sc.makeRDD(List(("a", 3), ("a", 2), ("c", 4), ("b", 3), ("c", 6), ("c", 8)), 2)     rdd.foldByKey(0)(_+_).collect().foreach(println)     sc.stop()   } } 

1.3.7 combineByKey()转换结构后分区内和分区间操作

Spark常用RDD算子详解!!!

3)需求说明:创建一个pairRDD,根据key计算每种key的均值。(先计算每个key对应值的总和以及key出现的次数,再相除得到结果)

4)需求分析:

Spark常用RDD算子详解!!!

5)代码实现

object ScalaRDD {   def main(args: Array[String]): Unit = {     val conf: SparkConf = new SparkConf().setMaster("local[2]").setAppName("ScalaRdd")     val sc = new SparkContext(conf)     val rdd: RDD[(String, Int)] = sc.makeRDD(List(("a", 88), ("b", 95), ("a", 91), ("b", 93), ("a", 95), ("b", 98)), 2)     val combineRdd: RDD[(String, (Int, Int))] = rdd.combineByKey(       (_, 1),       (acc: (Int, Int), v) => (acc._1 + v, acc._2 + 1),       (acc1: (Int, Int), acc2: (Int, Int)) => (acc1._1 + acc2._1, acc1._2 + acc2._2)     )     combineRdd.map(x=>{       (x._1,x._2._1/x._2._2)     }).collect().foreach(println)     sc.stop()   } } 

1.3.8 reduceByKey、aggregateByKey、foldByKey、combineByKey

Spark常用RDD算子详解!!!

1.3.9 sortByKey()按照K进行排序

Spark常用RDD算子详解!!!

3)需求说明:创建一个pairRDD,按照key的正序和倒序进行排序

4)代码实现:

object ScalaRDD {   def main(args: Array[String]): Unit = {     val conf: SparkConf = new SparkConf().setMaster("local[2]").setAppName("ScalaRdd")     val sc = new SparkContext(conf)     val rdd: RDD[(Int, String)] = sc.makeRDD(Array((3,"aa"),(6,"cc"),(2,"bb"),(1,"dd")))     rdd.sortByKey(true).collect().foreach(println)     rdd.sortByKey(false).collect().foreach(println)     sc.stop()   } } 

1.3.10 mapValues()只对V进行操作

Spark常用RDD算子详解!!!

4)代码实现:

object ScalaRDD {   def main(args: Array[String]): Unit = {     val conf: SparkConf = new SparkConf().setMaster("local[2]").setAppName("ScalaRdd")     val sc = new SparkContext(conf)     val rdd: RDD[(Int, String)] = sc.makeRDD(Array((3,"aa"),(6,"cc"),(2,"bb"),(1,"dd")))     rdd.mapValues(x=>x+"|||").collect().foreach(println)     sc.stop()   } } 

1.3.11 join()连接 将相同key对应的多个value关联在一起

Spark常用RDD算子详解!!!

3)需求说明:创建两个pairRDD,并将key相同的数据聚合到一个元组。

4)代码实现:

object ScalaRDD {   def main(args: Array[String]): Unit = {     val conf: SparkConf = new SparkConf().setMaster("local[2]").setAppName("ScalaRdd")     val sc = new SparkContext(conf)     val rdd: RDD[(Int, String)] = sc.makeRDD(Array((1, "a"), (2, "b"), (3, "c")))     val rdd1: RDD[(Int, Int)] = sc.makeRDD(Array((1, 4), (2, 5), (4, 6)))      rdd.join(rdd1).collect().foreach(println)     sc.stop()   } } 

1.3.12 cogroup() 类似全连接,但是在同一个RDD中对key聚合

操作两个RDD中的KV元素,每个RDD中相同key中的元素分别聚合成一个集合。

Spark常用RDD算子详解!!!

4)代码实现:

object ScalaRDD {   def main(args: Array[String]): Unit = {     val conf: SparkConf = new SparkConf().setMaster("local[2]").setAppName("ScalaRdd")     val sc = new SparkContext(conf)     val rdd: RDD[(Int, String)] = sc.makeRDD(Array((1, "a"), (2, "b"), (3, "c")))     val rdd1: RDD[(Int, Int)] = sc.makeRDD(Array((1, 4), (2, 5), (4, 6)))      rdd.cogroup(rdd1).collect().foreach(println)     sc.stop()   } } 

练习

object ScalaRDD {   def main(args: Array[String]): Unit = {     val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("ScalaRdd")     val sc = new SparkContext(conf)     val rdd: RDD[String] = sc.textFile("in/agent.log")      val rdd1: RDD[Array[String]] = rdd.map(x=>x.split(" "))      val rdd2: RDD[(String, Int)] = rdd1.map(x=>(x(1)+"-"+x(4),1))     rdd2.reduceByKey(_+_).map(x=>{       val strings: Array[String] = x._1.split("-")       (strings(0),(strings(1),x._2))     }).groupByKey().mapValues(x=>{       x.toList.sortBy(-_._2).take(3)     }).collect().foreach(println)     sc.stop()   } } 

2. Action行动算子

行动算子是触发了整个作业的执行。因为转换算子都是懒加载,并不会立即执行。

2.5.1 reduce()聚合

1)函数签名:def reduce(f: (T, T) => T): T

2)功能说明:f函数聚集RDD中的所有元素,先聚合分区内数据,再聚合分区间数据。

Spark常用RDD算子详解!!!

3)需求说明:创建一个RDD,将所有元素聚合得到结果

object ScalaRDD {   def main(args: Array[String]): Unit = {     val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("ScalaRdd")     val sc = new SparkContext(conf)     val rdd: RDD[Int] = sc.makeRDD(List(1,2,3,4))     println(rdd.reduce(_ + _))     sc.stop()   } } 

2.5.2 collect()以数组的形式返回数据集

1)函数签名:def collect(): Array[T]

2)功能说明:在驱动程序中,以数组Array的形式返回数据集的所有元素。

Spark常用RDD算子详解!!!

注意:所有的数据都会被拉取到Driver端,慎用

3)需求说明:创建一个RDD,并将RDD内容收集到Driver端打印

object ScalaRDD {   def main(args: Array[String]): Unit = {     val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("ScalaRdd")     val sc = new SparkContext(conf)     val rdd: RDD[Int] = sc.makeRDD(List(1,2,3,4))     println(rdd.collect().toList)     sc.stop()   } } 

2.5.3 count()返回RDD中元素个数

1)函数签名:def count(): Long

2)功能说明:返回RDD中元素的个数

Spark常用RDD算子详解!!!

3)需求说明:创建一个RDD,统计该RDD的条数

object ScalaRDD {   def main(args: Array[String]): Unit = {     val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("ScalaRdd")     val sc = new SparkContext(conf)     val rdd: RDD[Int] = sc.makeRDD(List(1,2,3,4))     println(rdd.count())     sc.stop()   } } 

2.5.4 first()返回RDD中的第一个元素

1)函数签名: def first(): T

2)功能说明:返回RDD中的第一个元素

Spark常用RDD算子详解!!!

3)需求说明:创建一个RDD,返回该RDD中的第一个元素

object ScalaRDD {   def main(args: Array[String]): Unit = {     val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("ScalaRdd")     val sc = new SparkContext(conf)     val rdd: RDD[Int] = sc.makeRDD(List(1,2,3,4))     println(rdd.first())     sc.stop()   } } 

2.5.5 take()返回由RDD前n个元素组成的数组

1)函数签名: def take(num: Int): Array[T]

2)功能说明:返回一个由RDD的前n个元素组成的数组

Spark常用RDD算子详解!!!

3)需求说明:创建一个RDD,统计该RDD的条数

object ScalaRDD {   def main(args: Array[String]): Unit = {     val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("ScalaRdd")     val sc = new SparkContext(conf)     val rdd: RDD[Int] = sc.makeRDD(List(1,2,3,4))     println(rdd.take(3).toList)     sc.stop()   } } 

2.5.6 takeOrdered()返回该RDD排序后前n个元素组成的数组

1)函数签名: def takeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T]

2)功能说明:返回该RDD排序后的前n个元素组成的数组

Spark常用RDD算子详解!!!

3)需求说明:创建一个RDD,获取该RDD排序后的前2个元素

object ScalaRDD {   def main(args: Array[String]): Unit = {     val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("ScalaRdd")     val sc = new SparkContext(conf)     val rdd: RDD[Int] = sc.makeRDD(List(1,3,2,2,3,4))     println(rdd.takeOrdered(3).toList)     sc.stop()   } } 

2.5.7 aggregate()案例

Spark常用RDD算子详解!!!

3)需求说明:创建一个RDD,将所有元素相加得到结果

object ScalaRDD {   def main(args: Array[String]): Unit = {     val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("ScalaRdd")     val sc = new SparkContext(conf)     val rdd: RDD[Int] = sc.makeRDD(List(1,3,2,2,3,4))     println(rdd.aggregate(0)(_ + _, _ + _))     sc.stop()   } } 

2.5.8 fold()案例

Spark常用RDD算子详解!!!

3)需求说明:创建一个RDD,将所有元素相加得到结果

object ScalaRDD {   def main(args: Array[String]): Unit = {     val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("ScalaRdd")     val sc = new SparkContext(conf)     val rdd: RDD[Int] = sc.makeRDD(List(1,3,2,2,3,4))     println(rdd.fold(0)(_ + _))     sc.stop()   } } 

2.5.9 countByKey()统计每种key的个数

1)函数签名:def countByKey(): Map[K, Long]

2)功能说明:统计每种key的个数

Spark常用RDD算子详解!!!

3)需求说明:创建一个PairRDD,统计每种key的个数

object ScalaRDD {   def main(args: Array[String]): Unit = {     val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("ScalaRdd")     val sc = new SparkContext(conf)     val rdd = sc.makeRDD(List((1, "a"), (1, "a"), (1, "a"), (2, "b"), (3, "c"), (3, "c")))     rdd.countByKey().foreach(println)     sc.stop()   } } 

2.5.10 save相关算子

1)saveAsTextFile(path)保存成Text文件

(1)函数签名

(2)功能说明:将数据集的元素以textfile的形式保存到HDFS文件系统或者其他支持的文件系统,对于每个元素,Spark将会调用toString方法,将它装换为文件中的文本

2)saveAsSequenceFile(path)?保存成Sequencefile文件

(1)函数签名

(2)功能说明:将数据集中的元素以Hadoop Sequencefile的格式保存到指定的目录下,可以使HDFS或者其他Hadoop支持的文件系统。

注意:只有kv类型RDD有该操作,单值的没有

3)saveAsObjectFile(path)?序列化成对象保存到文件

(1)函数签名

(2)功能说明:用于将RDD中的元素序列化成对象,存储到文件中。

2.5.11 foreach(f)遍历RDD中每一个元素

Spark常用RDD算子详解!!!

本文转自互联网,侵权联系删除Spark常用RDD算子详解!!!

赞(0) 打赏
部分文章转自网络,侵权联系删除b2bchain区块链学习技术社区 » Spark常用RDD算子详解!!!
分享到: 更多 (0)

评论 抢沙发

  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址

b2b链

联系我们联系我们