区块链技术博客
www.b2bchain.cn

spark RDD算子(六)之键值对聚合操作reduceByKey,foldByKey,排序操作sortByKey

这篇文章主要介绍了spark RDD算子(六)之键值对聚合操作reduceByKey,foldByKey,排序操作sortByKey的讲解,通过具体代码实例进行16796 讲解,并且分析了spark RDD算子(六)之键值对聚合操作reduceByKey,foldByKey,排序操作sortByKey的详细步骤与相关技巧,需要的朋友可以参考下https://www.b2bchain.cn/?p=16796

本文实例讲述了2、树莓派设置连接WiFi,开启VNC等等的讲解。分享给大家供大家参考文章查询地址https://www.b2bchain.cn/7039.html。具体如下:

章节目录

  • 一、reduceByKey
    • scala版本
    • Java版本
  • 二、foldByKey
    • scala版本
  • 三、sortByKey
    • scala版本
    • Java版本

一、reduceByKey

函数定义

def reduceByKey(func: (V, V) => V): RDD[(K, V)] def reduceByKey(func: (V, V) => V, numPartitions: Int): RDD[(K, V)]  def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)] 

接收一个函数,按照相同的key进行reduce操作,类似于scala的reduce的操作

scala版本

// reduceByKey val rdd1 = sc.parallelize(List((1,2),(1,3),(4,6),(4,8),(5,2),(1,7))) val reduceByKeyRDD = rdd1.reduceByKey((x,y) => {println("one:"+x,"two:"+y);x+y} ) reduceByKeyRDD.collect.foreach(println)  //单词计数 val lines = sc.textFile("in/word.txt") lines.flatMap(x=>x.split(" ")).map((_,1)).reduceByKey(_+_).collect.foreach(println) 

spark RDD算子(六)之键值对聚合操作reduceByKey,foldByKey,排序操作sortByKey

Java版本

public static void main(String[] args) {     SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("ReduceByKeyJava");     JavaSparkContext sc = new JavaSparkContext(conf);      JavaRDD<String> rdd1 = sc.textFile("in/sample.txt");      PairFlatMapFunction<String, String, Integer> pairFlatMapFunction = new PairFlatMapFunction<String, String, Integer>() {         @Override         public Iterator<Tuple2<String, Integer>> call(String s) throws Exception {             String[] split = s.split(" ");             ArrayList<Tuple2<String, Integer>> list = new ArrayList<>();             for (String str :                     split) {                 Tuple2<String, Integer> tup2 = new Tuple2<String, Integer>(str, 1);                 list.add(tup2);             }             return list.iterator();         }     };      Function2<Integer, Integer, Integer> function2 = new Function2<Integer, Integer, Integer>() {         @Override         public Integer call(Integer v1, Integer v2) throws Exception {             return v1 + v2;         }     };      JavaPairRDD<String, Integer> stringIntegerJavaPairRDD =             rdd1.flatMapToPair(pairFlatMapFunction).reduceByKey(function2);      List<Tuple2<String, Integer>> collect = stringIntegerJavaPairRDD.collect();     for (Tuple2<String, Integer> tuple2 :             collect) {         System.out.println(tuple2);     } } 

spark RDD算子(六)之键值对聚合操作reduceByKey,foldByKey,排序操作sortByKey

二、foldByKey

def foldByKey(zeroValue: V)(func: (V, V) => V): RDD[(K, V)]  def foldByKey(zeroValue: V, numPartitions: Int)(func: (V, V) => V): RDD[(K, V)]  def foldByKey(zeroValue: V, partitioner: Partitioner)(func: (V, V) => V): RDD[(K, V)] 

该函数用于RDD[K,V]根据K将V做折叠、合并处理,其中的参数zeroValue表示先根据映射函数将zeroValue应用于V,进行初始化V,再将映射函数应用于初始化后的V

与reduce不同的是 foldByKey开始折叠的第一个元素不是集合中的第一个元素,而是传入的一个元素

scala版本

object FoldByKeyScala {   def main(args: Array[String]): Unit = {     val conf = new SparkConf().setMaster("local[*]").setAppName("foldByKeyScala")     val sc = new SparkContext(conf)      val rdd1 = sc.parallelize(List(("A",2),("A",3),("B",5),("B",8)))     val foldByKeyRDD = rdd1.foldByKey(10)((x,y)=>{println("one:"+x+" two:"+y);x+y})     foldByKeyRDD.collect.foreach(println)   } } 

spark RDD算子(六)之键值对聚合操作reduceByKey,foldByKey,排序操作sortByKey

三、sortByKey

函数定义

def sortByKey(ascending : scala.Boolean = { /* compiled code */ }, numPartitions : scala.Int = { /* compiled code */ }) : org.apache.spark.rdd.RDD[scala.Tuple2[K, V]] = { /* compiled code */ } 

SortByKey用于对pairRDD按照key进行排序,第一个参数可以设置true或者false,默认是true

scala版本

scala> val rdd = sc.parallelize(Array((3, 4),(1, 2),(4,4),(2,5), (6,5), (5, 6))); rdd: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[0] at parallelize at < console>:24  // sortByKey不是Action操作,只能算是转换操作 scala> rdd.sortByKey() res0: org.apache.spark.rdd.RDD[(Int, Int)] = ShuffledRDD[1] at sortByKey at <console>:26  //看看sortByKey后是什么类型 scala> rdd.sortByKey().collect() res1: Array[(Int, Int)] = Array((1,2), (2,5), (3,4), (4,4), (5,6), (6,5))  //降序排序 scala> rdd.sortByKey(false).collect() res2: Array[(Int, Int)] = Array((6,5), (5,6), (4,4), (3,4), (2,5), (1,2)) 

spark RDD算子(六)之键值对聚合操作reduceByKey,foldByKey,排序操作sortByKey

Java版本

public class SortByKeyJava {     public static void main(String[] args) {         SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("sortByKeyJava");         JavaSparkContext sc = new JavaSparkContext(conf);          List<Tuple2<Integer, String>> list = new ArrayList<>();         list.add(new Tuple2<>(5,"study"));         list.add(new Tuple2<>(2,"java"));         list.add(new Tuple2<>(4,"spark"));         list.add(new Tuple2<>(3,"kb09"));         list.add(new Tuple2<>(1,"hello"));          JavaRDD<Tuple2<Integer, String>> rdd1 = sc.parallelize(list);          PairFunction<Tuple2<Integer, String>, Integer, String> pairFunction = new PairFunction<Tuple2<Integer, String>, Integer,  String>() {             @Override             public Tuple2<Integer, String> call(Tuple2<Integer, String> tup2) throws Exception {                 System.out.println(tup2._1 + " " + tup2._2);                 return tup2;             }         };         JavaPairRDD<Integer, String> integerStringJavaPairRDD = rdd1.mapToPair(pairFunction);         // false:降序排序         JavaPairRDD<Integer, String> sortByKeyRDD = integerStringJavaPairRDD.sortByKey(false);         List<Tuple2<Integer, String>> collect = sortByKeyRDD.collect();         for (Tuple2 tup2 :                 collect) {             System.out.println(tup2);         }     } } 

spark RDD算子(六)之键值对聚合操作reduceByKey,foldByKey,排序操作sortByKey

本文转自互联网,侵权联系删除spark RDD算子(六)之键值对聚合操作reduceByKey,foldByKey,排序操作sortByKey

赞(0) 打赏
部分文章转自网络,侵权联系删除b2bchain区块链学习技术社区 » spark RDD算子(六)之键值对聚合操作reduceByKey,foldByKey,排序操作sortByKey
分享到: 更多 (0)

评论 抢沙发

  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址

b2b链

联系我们联系我们