说说你理解的sparkRdd的 partion(☆☆☆☆☆)
Spark中提供了通用的接口来抽象每个Rdd,这些接口包括
- 分区信息
- 依赖关系
- 函数,基于父Rdd的计算方法
- 划分策略和数据位置的元数据
举个🌰
一个HDFS文件的RDD将文件的每个文件块表示为一个分区,并且知道每个文件块的位置,对RDD 进行map之后分区将具有相同的划分
RDD分区的作用
一个HDFS文件的RDD将文件的每个文件块表示为一个分区,并且知道每个文件块的位置信息。这些对应着数据块的分区分布到集群的节点中,因此,分区的多少涉及对这个RDD进行并行计算的粒度。首先,分区是一个逻辑概念, 变换前后的新旧分区在物理上可能是同一块内存或者是存储。需要注意的是,如果没有指定分区数将使用默认值,而默认值是该程序所分配到CPU核数,如果是从HDFS文件创建,默认为文件的数据块数。
移动计算而不移动数据
在Spark形成任务有向无环图时,会尽可能地把计算分配到靠近数据的位置,减少数据的网络传输。当RDD分区被缓存, 则计算应该被发送到缓存分区所在的节点进行,另外,RDD的血统也会影响子RDD的位置,回溯RDD的血统,直到找到具有首选位置属性的父RDD,并据此决定子RDD的位置。
RDD分区函数
分区的划分对于shuffle类操作很关键,决定了该操作的父RDD和子RDD的依赖类型。比如之前提到的join操作,如果是协同划分的话,两个父RDD之间, 父RDD与子RDD之间能形成一致的分区安排。即同一个Key保证被映射到同一个分区,这样就是窄依赖。而如果不是协同划分,就会形成宽依赖。所谓的协同划分就是指定分区划分器以产生前后一致的分区安排。
Spark提供两种划分器,HashPartitioner (哈希分区划分器),(RangePartitioner) 范围分区划分器. 需要注意的是分区划分器只存在于PairRDD中,普通非(K,V)类型的Partitioner为None.
在以下程序中,首先构造一个MappedRDD,其partitioner的值为none,然后对RDD进行groupByKey操作group_rdd变量,对于groupByKey操作而言,这里创建了新的HashPartitioner对象。
1 | scala> var part=sc.textFile("file:/hadoop/spark/README.md") |
使用以下分区函数进行分区
coalesce(numpartitions:Int,shuffle:Boolean=false):RDD[T]repartition(numPartitions:Int):RDD[T]coalesce和repartition都是对RDD进行重新分区。coalesce操作使用HashPartitioner进行重分区,第一个参数为重分区的数目,第二个为是否shuffle,默认情况为false。repartition操作是coalesce函数第二个参数为true的实现。如果分区的数目大于原来的分区数,那么必须指定shuffle参数为true,否则分区数不变。
glom():RDD[Array[T]]glom操作是RDD中的每一个分区所有类型为T的数据转变成元素类型为T的数组[Array[T]]
mapPartitionsmapPartitions操作和map类似,只不过映射的参数由RDD中的每一个元素变成了RDD中每一个分区的迭代器,mapPartitionsWithIndex作用类似于mapPartitions,只是输入参数多了一个分区索引。
1 | scala> var rdd1=sc.makeRDD(1 to 5,2) |
partitionBy(partitioner:Partitioner):RDD[(K,V)]
partitionBy操作根据partitioner函数生成新的ShuffleRDD,将原RDD重新分区
1 | scala> var rdd1=sc.makeRDD(Array((1,"A"),(2,"B"),(3,"C"),(4,"D")),2) |
分区数的计算
通过scala 集合方式parallelize生成rdd,如,
val rdd = sc.parallelize(1 to 10)这种方式下,如果在parallelize操作时没有指定分区数,则rdd的分区数 =sc.defaultParallelism通过textFile方式生成的rdd,如
val rdd = sc.textFile(“path/file”)
有两种情况:
从本地文件file:///生成的rdd,操作时如果没有指定分区数,则默认分区数规则为:(按照官网的描述,本地file的分片规则,应该按照hdfs的block大小划分,但实测的结果是固定按照32M来分片,可能是bug,不过不影响使用因为spark能用所有hadoop接口支持的存储系统,所以spark textFile使用hadoop接口访问本地文件时和访问hdfs还是有区别的)rdd的分区数 =
max(本地file的分片数, sc.defaultMinPartitions)从hdfs分布式文件系统hdfs://生成的rdd,操作时如果没有指定分区数,则默认分区数规则为:rdd的分区数 =
max(hdfs文件的block数目, sc.defaultMinPartitions)
spark.default.parallelism的默认值(前提是配置文件spark-default.conf中没有显示的配置,如果配置了,则spark.default.parallelism = 配置的值)
参考
https://blog.csdn.net/justlpf/article/details/80107582
https://blog.csdn.net/weixin_37353303/article/details/86575171