`
superlxw1234
  • 浏览: 542144 次
  • 性别: Icon_minigender_1
  • 来自: 西安
博客专栏
Bd1c0a0c-379a-31a8-a3b1-e6401e2f1523
Hive入门
浏览量:43167
社区版块
存档分类
最新评论

Spark算子:统计RDD分区中的元素及数量

阅读更多

关键字:Spark算子、Spark RDD分区、Spark RDD分区元素数量

 

 

Spark RDD是被分区的,在生成RDD时候,一般可以指定分区的数量,如果不指定分区数量,当RDD从集合创建时候,则默认为该程序所分配到的资源的CPU核数,如果是从HDFS文件创建,默认为文件的Block数。

 

可以利用RDD的mapPartitionsWithIndex方法来统计每个分区中的元素及数量。

 

关于mapPartitionsWithIndex的介绍可以参考 mapPartitionsWithIndex的介绍

 

http://lxw1234.com/archives/2015/07/348.htm

 

 

具体看例子:

 

 

//创建一个RDD,默认分区15个,因为我的spark-shell指定了一共使用15个CPU资源
//–total-executor-cores 15

 

 

scala> var rdd1 = sc.makeRDD(1 to 50)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[17] at makeRDD at :21
 
scala> rdd1.partitions.size
res15: Int = 15

 

 

//统计rdd1每个分区中元素数量

rdd1.mapPartitionsWithIndex{
        (partIdx,iter) => {
          var part_map = scala.collection.mutable.Map[String,Int]()
            while(iter.hasNext){
              var part_name = "part_" + partIdx;
              if(part_map.contains(part_name)) {
                var ele_cnt = part_map(part_name)
                part_map(part_name) = ele_cnt + 1
              } else {
                part_map(part_name) = 1
              }
              iter.next()
            }
            part_map.iterator
           
        }
      }.collect

res16: Array[(String, Int)] = Array((part_0,3), (part_1,3), (part_2,4), (part_3,3), 
(part_4,3), (part_5,4), (part_6,3), (part_7,3), (part_8,4), (part_9,3), (part_10,3), 
(part_11,4), (part_12,3), (part_13,3), (part_14,4))

//从part_0到part_14,每个分区中的元素数量

 

//统计rdd1每个分区中有哪些元素

 

rdd1.mapPartitionsWithIndex{
        (partIdx,iter) => {
          var part_map = scala.collection.mutable.Map[String,List[Int]]()
            while(iter.hasNext){
              var part_name = "part_" + partIdx;
              var elem = iter.next()
              if(part_map.contains(part_name)) {
                var elems = part_map(part_name)
                elems ::= elem
                part_map(part_name) = elems
              } else {
                part_map(part_name) = List[Int]{elem}
              }
            }
            part_map.iterator
           
        }
      }.collect
res17: Array[(String, List[Int])] = Array((part_0,List(3, 2, 1)), (part_1,List(6, 5, 4)), 
(part_2,List(10, 9, 8, 7)), (part_3,List(13, 12, 11)), (part_4,List(16, 15, 14)), 
(part_5,List(20, 19, 18, 17)), (part_6,List(23, 22, 21)), (part_7,List(26, 25, 24)), 
(part_8,List(30, 29, 28, 27)), (part_9,List(33, 32, 31)), (part_10,List(36, 35, 34)), 
(part_11,List(40, 39, 38, 37)), (part_12,List(43, 42, 41)), (part_13,List(46, 45, 44)), 
(part_14,List(50, 49, 48, 47)))
//从part_0到part14,每个分区中包含的元素

 

//从HDFS文件创建的RDD,包含65个分区,因为该文件由65个Block

scala> var rdd2 = sc.textFile("/logs/2015-07-05/lxw1234.com.log")
rdd2: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[21] at textFile at :21
 
scala> rdd2.partitions.size
res18: Int = 65

 

//rdd2每个分区的元素数量

 

scala> rdd2.mapPartitionsWithIndex{
     |         (partIdx,iter) => {
     |           var part_map = scala.collection.mutable.Map[String,Int]()
     |             while(iter.hasNext){
     |               var part_name = "part_" + partIdx;
     |               if(part_map.contains(part_name)) {
     |                 var ele_cnt = part_map(part_name)
     |                 part_map(part_name) = ele_cnt + 1
     |               } else {
     |                 part_map(part_name) = 1
     |               }
     |               iter.next()
     |             }
     |             part_map.iterator
     |            
     |         }
     |       }.collect
res19: Array[(String, Int)] = Array((part_0,202496), (part_1,225503), (part_2,214375), 
(part_3,215909), (part_4,208941), (part_5,205379), (part_6,207894), (part_7,209496), 
(part_8,213806), (part_9,216962), (part_10,216091), (part_11,215820), (part_12,217043), 
(part_13,216556), (part_14,218702), (part_15,218625), (part_16,218519), (part_17,221056), 
(part_18,221250), (part_19,222092), (part_20,222339), (part_21,222779), (part_22,223578), 
(part_23,222869), (part_24,221543), (part_25,219671), (part_26,222871), (part_27,223200), 
(part_28,223282), (part_29,228212), (part_30,223978), (part_31,223024), (part_32,222889), 
(part_33,222106), (part_34,221563), (part_35,219208), (part_36,216928), (part_37,216733), 
(part_38,217214), (part_39,219978), (part_40,218155), (part_41,219880), (part_42,215833...

 

更多关于Spark算子的介绍,可参考 Spark算子

http://lxw1234.com/archives/tag/spark%E7%AE%97%E5%AD%90

 

1
1
分享到:
评论

相关推荐

    Java Spark算子:sample

    import org.apache.spark.SparkConf; import org.apache.spark.api.java.... * 对RDD中的数据进行随机采样,会有误差。 * 第一个参数:boolean类型,表示产生的样本是否可以重复:false不重复,也就是不放回的取;t

    sparkStreaming实战学习资料

    Spark中的(弹性分布式数据集)简称RDD: Spark中的Transformation操作之Value数据类型的算子: Spark中的Transformation操作之Key-Value数据类型的算子: Spark中的Action操作: Transformation->map算子: ...

    Spark常用的算子以及Scala函数总结.pdf

    Spark RDD 算子说明,分别讲述了Transformation和Action这两类的算子。

    Java Spark算子:distinct

    import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaRDD; import org.apache.... * 去除RDD的重复元素。 * */ public class DistinctDemo { public static void main(String[] args) { S

    Spark1.4.1 RDD算子详解

    结合代码详细描述RDD算子的执行流程,并配上执行流程图

    Spark算子.pdf

    Spark对于大数据行业的实时处理数据来说,有着举足轻重的位置,特此学习整理了RDD 算子的各个含义,希望各位读者能够喜欢。谢谢

    Spark机器学习视频第4课.SparkRDD原理剖析

    课时5:Spark2sql从mysql中导入 课时6:Spark1.6.2sql与mysql数据交互 课时7:SparkSQL java操作mysql数据 课时8:Spark统计用户的收藏转换率 课时9:Spark梳理用户的收藏以及订单转换率 课时10:最终获取用户的...

    Spark学习--RDD编码

    RDD:弹性分布式数据集(ResilientDistributed Dataset),是Spark对数据的核心抽象。RDD其实是分布式的元素集合。当Spark对数据操作和转换时,会自动将RDD中的数据分发到集群,并将操作并行化执行。 Spark中的RDD是一...

    sparkRDD函数大全

    spark rdd函数大全。spark rdd操作为core操作,虽然后续版本主要以dataset来操作,但是rdd操作也是不可忽略的一部分。

    spark: RDD与DataFrame之间的相互转换方法

    今天小编就为大家分享一篇spark: RDD与DataFrame之间的相互转换方法,具有很好的参考价值,希望对大家有所帮助。一起跟随小编过来看看吧

    大数据spark学习之rdd概述

    RDD(Resilient Distributed Dataset)叫做弹性分布式数据集,是Spark中最基本的数据抽象,它代表一个不可变、可分区、里面的元素可并行计算的集合。在 Spark 中,对数据的所有操作不外乎创建 RDD、转化已有RDD 以及...

    2023-Spark-实验十三:Spark RDD 求员工工资总额

    求员工工资总额样例数据

    Spark机器学习第1课.Spark介绍

    课时5:Spark2sql从mysql中导入 课时6:Spark1.6.2sql与mysql数据交互 课时7:SparkSQL java操作mysql数据 课时8:Spark统计用户的收藏转换率 课时9:Spark梳理用户的收藏以及订单转换率 课时10:最终获取用户的...

    Spark机器学习视频第10课.最终获取用户的收藏以及订单转换率

    课时5:Spark2sql从mysql中导入 课时6:Spark1.6.2sql与mysql数据交互 课时7:SparkSQL java操作mysql数据 课时8:Spark统计用户的收藏转换率 课时9:Spark梳理用户的收藏以及订单转换率 课时10:最终获取用户的...

    hbase-rdd:Spark RDD从HBase读取,写入和删除

    hbase-rdd:Spark RDD从HBase读取,写入和删除

    Spark机器学习视频第2课.Spark2集群安装

    课时5:Spark2sql从mysql中导入 课时6:Spark1.6.2sql与mysql数据交互 课时7:SparkSQL java操作mysql数据 课时8:Spark统计用户的收藏转换率 课时9:Spark梳理用户的收藏以及订单转换率 课时10:最终获取用户的...

    Spark思维导图之Spark RDD.png

    Spark思维导图之Spark RDD.png

    spark-textFile构建RDD的分区及compute计算策略

    spark-textFile构建RDD的分区及compute计算策略

    大数据学习(二十一)spark高级算子

    这次主要分享一下spark RDD的常用的算子。 然后利用这些算子进行一些实战的操作。 高级算子 mapPartitionsWithIndex 源码: def mapPartitionsWithIndex[U: ClassTag]( f:(Int, Iterator[T]) => Iterator[U], 第一...

    Spark RDD弹性分布式数据集

    RDD可以让用户显式地将数据存储到磁盘和内存中,并且还能控制数据的分区。每个RDD都具有五大特征,具体如下。 它是集群节点上的不可改变的、已分区的集合对象; 通过并行转换的方式来创建如(map、filter、join等)...

Global site tag (gtag.js) - Google Analytics