如何定义DataFrame的分区?

我已经开始在Spark 1.4.0中使用Spark SQL和DataFrame。 我想要在Scala中的DataFrames上定义一个自定义分区,但没有看到如何做到这一点。

我正在处理的其中一个数据表包含一个交易清单,按照以下示例,帐户silimar。

Account Date Type Amount 1001 2014-04-01 Purchase 100.00 1001 2014-04-01 Purchase 50.00 1001 2014-04-05 Purchase 70.00 1001 2014-04-01 Payment -150.00 1002 2014-04-01 Purchase 80.00 1002 2014-04-02 Purchase 22.00 1002 2014-04-04 Payment -120.00 1002 2014-04-04 Purchase 60.00 1003 2014-04-02 Purchase 210.00 1003 2014-04-03 Purchase 15.00 

至less在最初,大部分计算将发生在账户内的交易之间。 所以我想分区的数据,以便一个帐户的所有交易在同一个Spark分区。

但我没有看到一个方法来定义这个。 DataFrame类有一个名为“repartition(Int)”的方法,您可以在其中指定要创build的分区数。 但是我没有看到任何可用于为DataFrame定义定制分区的方法,例如可以为RDD指定的方法。

源数据存储在Parquet中。 我确实看到,在写一个DataFrame到Parquet时,你可以指定一个列进行分区,所以大概我可以告诉Parquet通过“Account”列对数据进行分区。 但是可能有数百万个帐户,如果我正确理解Parquet,它会为每个帐户创build一个不同的目录,这听起来不是一个合理的解决scheme。

有没有办法让Spark分区这个数据框,以便一个帐户的所有数据在同一个分区?

Spark> = 1.6

在Spark> = 1.6中,可以使用按列分区进行查询和caching。 参见:使用repartition方法的SPARK-11410和SPARK-4849 :

 val df = Seq( ("A", 1), ("B", 2), ("A", 3), ("C", 1) ).toDF("k", "v") val partitioned = df.repartition($"k") partitioned.explain // scala> df.repartition($"k").explain(true) // == Parsed Logical Plan == // 'RepartitionByExpression ['k], None // +- Project [_1#5 AS k#7,_2#6 AS v#8] // +- LogicalRDD [_1#5,_2#6], MapPartitionsRDD[3] at rddToDataFrameHolder at <console>:27 // // == Analyzed Logical Plan == // k: string, v: int // RepartitionByExpression [k#7], None // +- Project [_1#5 AS k#7,_2#6 AS v#8] // +- LogicalRDD [_1#5,_2#6], MapPartitionsRDD[3] at rddToDataFrameHolder at <console>:27 // // == Optimized Logical Plan == // RepartitionByExpression [k#7], None // +- Project [_1#5 AS k#7,_2#6 AS v#8] // +- LogicalRDD [_1#5,_2#6], MapPartitionsRDD[3] at rddToDataFrameHolder at <console>:27 // // == Physical Plan == // TungstenExchange hashpartitioning(k#7,200), None // +- Project [_1#5 AS k#7,_2#6 AS v#8] // +- Scan PhysicalRDD[_1#5,_2#6] 

RDDs不同,Spark Dataset (包括Dataset[Row]又名DataFrame )目前无法使用定制分区程序。 通常可以通过创build一个人工分区列来解决这个问题,但是它不会给你相同的灵活性。

Spark <1.6.0

你可以做的一件事就是在创buildDataFrame之前对input数据进行预分区

 import org.apache.spark.sql.types._ import org.apache.spark.sql.Row import org.apache.spark.HashPartitioner val schema = StructType(Seq( StructField("x", StringType, false), StructField("y", LongType, false), StructField("z", DoubleType, false) )) val rdd = sc.parallelize(Seq( Row("foo", 1L, 0.5), Row("bar", 0L, 0.0), Row("??", -1L, 2.0), Row("foo", -1L, 0.0), Row("??", 3L, 0.6), Row("bar", -3L, 0.99) )) val partitioner = new HashPartitioner(5) val partitioned = rdd.map(r => (r.getString(0), r)) .partitionBy(partitioner) .values val df = sqlContext.createDataFrame(partitioned, schema) 

由于从RDD创buildDataFrame只需要一个简单的映射阶段,现有的分区布局应该保留*:

 assert(df.rdd.partitions == partitioned.partitions) 

用同样的方法可以重新分区现有的DataFrame

 sqlContext.createDataFrame( df.rdd.map(r => (r.getInt(1), r)).partitionBy(partitioner).values, df.schema ) 

所以看起来不是不可能的。 问题依然存在,如果它是有道理的。 我会认为大部分时间没有:

  1. 重新分区是一个昂贵的过程。 在典型的情况下,大部分数据必须被序列化,洗牌和反序列化。 另一方面,从预分区数据中受益的操作的数量相对较小,并且如果内部API没有被devise为利用该属性,则进一步受限。

    • 在某些情况下join,但需要内部支持,
    • 窗口函数与匹配的分区程序调用。 同上,仅限于单个窗口的定义。 它已经在内部进行了分区,所以预分区可能是多余的,
    • 使用GROUP BY简单聚合 – 可以减less临时缓冲区**的内存占用量,但总体成本要高得多。 或多或less等同于groupByKey.mapValues(_.reduce) (当前行为)vs reduceByKey (预分区)。 不太可能在实践中有用。
    • SqlContext.cacheTable数据压缩。 由于它看起来像使用运行长度编码,应用OrderedRDDFunctions.repartitionAndSortWithinPartitions可以提高压缩率。
  2. 性能高度依赖于密钥的分配。 如果偏斜,将会导致资源利用不理想。 在最坏的情况下,根本无法完成这项工作。

  3. 使用高级声明式API的一个重点是将自己与低级别的实现细节隔离开来。 正如@dwysakowicz和@RomiKuntsman所提到的,优化是Catalyst Optimizer的一个工作。 这是一个非常复杂的野兽,我真的怀疑你可以很容易地改善,不深入内部深入。

使用JDBC源进行分区

JDBC数据源支持predicates参数 。 它可以使用如下:

 sqlContext.read.jdbc(url, table, Array("foo = 1", "foo = 3"), props) 

它为每个谓词创build一个JDBC分区。 请记住,如果使用单个谓词创build的集合不是不相交的,则会在结果表中看到重复项。

DataFrameWriter partitionBy方法

Spark DataFrameWriter提供了partitionBy方法,可用于在写入时“分区”数据。 它使用提供的一组列来分隔写入数据

 val df = Seq( ("foo", 1.0), ("bar", 2.0), ("foo", 1.5), ("bar", 2.6) ).toDF("k", "v") df.write.partitionBy("k").json("/tmp/foo.json") 

这使谓词下推阅读基于关键的查询:

 val df1 = sqlContext.read.schema(df.schema).json("/tmp/foo.json") df1.where($"k" === "bar") 

但它不等同于DataFrame.repartition 。 特别是像下面这样的聚合:

 val cnts = df1.groupBy($"k").sum() 

仍然需要TungstenExchange

 cnts.explain // == Physical Plan == // TungstenAggregate(key=[k#90], functions=[(sum(v#91),mode=Final,isDistinct=false)], output=[k#90,sum(v)#93]) // +- TungstenExchange hashpartitioning(k#90,200), None // +- TungstenAggregate(key=[k#90], functions=[(sum(v#91),mode=Partial,isDistinct=false)], output=[k#90,sum#99]) // +- Scan JSONRelation[k#90,v#91] InputPaths: file:/tmp/foo.json 

bucketBy方法 (Spark> = 2.0):

bucketBybucketBy具有类似的应用程序,但仅适用于表( saveAsTable )。 截至今天(Spark 2.1.0),看起来好像没有对bucketed表应用任何执行计划优化。


*通过分区布局我的意思是只有一个数据分布。 partitioned RDD不再有分区器。 **假设没有早期预测。 如果聚合仅涵盖列的一小部分,则可能没有任何收益。

使用由以下方法返回的DataFrame:

 yourDF.orderBy(account) 

没有明确的方法在DataFrame上使用partitionBy,只能在PairRDD上使用,但是在对DataFrame进行sorting时,它将使用LogicalPlan中的数据,这将有助于在每个Account上进行计算。

我只是偶然遇到同样的问题,我想通过帐户分区的数据框。 我假设,当你说“想要分区的数据,以便一个帐户的所有交易在同一个Spark分区”,你想它的规模和性能,但你的代码不依赖于它(如使用mapPartitions()等),对不对?

在Spark <1.6如果你创build一个HiveContext ,而不是普通的旧的SqlContext你可以使用HiveQL DISTRIBUTE BY colX... (确保N个reducer中的每一个都获得非重叠的x范围)& CLUSTER BY colX...分配和sorting)例如;

 df.registerTempTable("partitionMe") hiveCtx.sql("select * from partitionMe DISTRIBUTE BY accountId SORT BY accountId, date") 

不知道如何适应与火花DF api。 正常的SqlContext不支持这些关键字(注意你不需要有一个configuration单元存储使用HiveContext)

编辑:火花1.6+现在在原生的DataFrame API中有这个

我能够使用RDD来做到这一点。 但是我不知道这对你是否可以接受。 一旦将DF作为RDD提供,您可以应用repartitionAndSortWithinPartitions来执行数据的自定义重新分区。

这是我用过的一个样本:

 class DatePartitioner(partitions: Int) extends Partitioner { override def getPartition(key: Any): Int = { val start_time: Long = key.asInstanceOf[Long] Objects.hash(Array(start_time)) % partitions } override def numPartitions: Int = partitions } myRDD .repartitionAndSortWithinPartitions(new DatePartitioner(24)) .map { v => v._2 } .toDF() .write.mode(SaveMode.Overwrite) 

所以,以某种答案开始:) – 你不能

我不是专家,但据我了解DataFrames,他们不等于rdd和DataFrame没有这样的事情分区。

一般DataFrame的想法是提供另一个抽象层次来处理这样的问题本身。 DataFrame上的查询被转换为逻辑计划,并进一步转换为RDD上的操作。 你build议的分区可能会自动应用或至less应该是。

如果您不相信SparkSQL会提供某种最优化的工作,那么您可以按照注释中的build议将DataFrame始终转换为RDD [Row]。