HashPartitioner如何工作?

我阅读了HashPartitioner的文档。 不幸的是,除了API调用之外,没有什么可解释的。 我假设HashPartitioner根据密钥的哈希来分配分布式集合。 例如,如果我的数据是

 (1,1), (1,2), (1,3), (2,1), (2,2), (2,3) 

所以分区器会把这个分区放到不同的分区中,同一个分区中的键也是一样的。 但是我不明白构造函数参数的意义

 new HashPartitoner(numPartitions) //What does numPartitions do? 

对于上面的数据集,如果我做了,结果会有什么不同

 new HashPartitoner(1) new HashPartitoner(2) new HashPartitoner(10) 

那么HashPartitioner究竟如何工作呢?

那么,让我们让你的数据集更有趣:

 val rdd = sc.parallelize(for { x <- 1 to 3 y <- 1 to 2 } yield (x, None), 8) 

我们有六个要素:

 rdd.count 
 Long = 6 

没有分区:

 rdd.partitioner 
 Option[org.apache.spark.Partitioner] = None 

和八个分区:

 rdd.partitions.length 
 Int = 8 

现在让我们定义小助手来计算每个分区的元素数量:

 def countByPartition(rdd: RDD[(Int, None.type)]) = { rdd.mapPartitions(iter => Iterator(iter.length)) } 

由于我们没有partitioner,所以我们的数据集在分区之间是均匀分布的( Spark中的Default Partitioning Scheme ):

 countByPartition(rdd).collect() 
 Array[Int] = Array(0, 1, 1, 1, 0, 1, 1, 1) 

inital分布

现在让我们重新分区我们的数据集:

 import org.apache.spark.HashPartitioner val rddOneP = rdd.partitionBy(new HashPartitioner(1)) 

由于传递给HashPartitioner参数定义了一个分区的分区数量:

 rddOneP.partitions.length 
 Int = 1 

由于我们只有一个分区,它包含所有元素:

 countByPartition(rddOneP).collect 
 Array[Int] = Array(6) 

散列分割-1

请注意,shuffle之后的值的顺序是非确定性的。

同样的方法,如果我们使用HashPartitioner(2)

 val rddTwoP = rdd.partitionBy(new HashPartitioner(2)) 

我们会得到2个分区:

 rddTwoP.partitions.length 
 Int = 2 

由于rdd被关键数据分割,所以不会再统一分配:

 countByPartition(rddTwoP).collect() 
 Array[Int] = Array(2, 4) 

因为有三个键和只有两个不同的hashCode mod numPartitions值,这里没有什么意外的:

 (1 to 3).map((k: Int) => (k, k.hashCode, k.hashCode % 2)) 
 scala.collection.immutable.IndexedSeq[(Int, Int, Int)] = Vector((1,1,1), (2,2,0), (3,3,1)) 

只是为了确认以上:

 rddTwoP.mapPartitions(iter => Iterator(iter.map(_._1).toSet)).collect() 
 Array[scala.collection.immutable.Set[Int]] = Array(Set(2), Set(1, 3)) 

散列分割-2-

最后用HashPartitioner(7)得到七个分区,三个非空以及两个元素:

 val rddSevenP = rdd.partitionBy(new HashPartitioner(7)) rddSevenP.partitions.length 
 Int = 7 
 countByPartition(rddTenP).collect() 
 Array[Int] = Array(0, 2, 2, 2, 0, 0, 0) 

散列分割-7-

摘要和注释

  • HashPartitioner采用一个定义分区数量的参数
  • 值使用hash分配给分区。 hash函数可能因语言而异(Scala RDD可能使用hashCodeDataSets使用MurmurHash 3,PySpark, portable_hash )。

    在这种简单的情况下,key是一个小整数,你可以假设hash是一个标识( i = hash(i) )。

    Scala API使用nonNegativeMod根据计算出的散列来确定分区,

  • 如果密钥分配不统一,那么当组件的一部分空闲时,可能会导致密钥分配不均

  • 密钥必须是可散列的。 你可以检查我的答案列表作为PySpark的reduceByKey的关键阅读PySpark的具体问题。 HashPartitioner文档强调了另一个可能的问题:

    Java数组具有基于数组身份而不是其内容的哈希码,所以尝试使用HashPartitioner对RDD [Array [ ]]或RDD [(Array [ ],_)]进行分区将产生意外或不正确的结果。

  • 在Python 3中,你必须确保哈希是一致的。 请看exception:string散列的随机性应该通过pyspark中的PYTHONHASHSEED平均值来禁用?

  • 散列分区器既不是内在的也不是完全的。 可以将多个键分配给单个分区,并且一些分区可以保持为空。

  • 请注意,当与基于REPL定义的案例类( Apache Spark中的案例类相等 )结合使用时,基于散列的方法在Scala中不起作用。

  • HashPartitioner (或任何其他Partitioner )混洗数据。 除非在多个操作之间重复使用分区,否则不会减less要混洗的数据量。

RDD是分布式的,这意味着它被分割成若干部分。 每个分区都可能位于不同的机器上。 带分隔numPartitions分区器numPartitions在放置哪个分区对(key, value)

  1. 创build完全numPartitions分区。
  2. 分区中的位置(key, value) ,分区号为Hash(key) % numPartitions

HashPartitioner.getPartition方法将一个作为参数,并返回该键所属的分区的索引 。 分区器必须知道有效索引是什么,所以它返回正确范围内的数字。 分区数量通过numPartitions构造函数参数指定。

该实现大致返回key.hashCode() % numPartitions 。 有关更多详细信息,请参阅Partitioner.scala 。