我怎样才能将一个数据框分成SCALA和SPARK中具有相同列值的数据框?

使用Scala,我怎样才能将dataFrame分割成多个dataFrame(无论是数组还是集合)具有相同的列值。 例如,我想拆分下面的DataFrame:

ID Rate State 1 24 AL 2 35 MN 3 46 FL 4 34 AL 5 78 MN 6 99 FL 

至:

数据集1

 ID Rate State 1 24 AL 4 34 AL 

数据集2

 ID Rate State 2 35 MN 5 78 MN 

数据集3

 ID Rate State 3 46 FL 6 99 FL 

您可以收集独特的状态值,并简单地映射结果数组:

 val states = df.select("State").distinct.collect.flatMap(_.toSeq) val byStateArray = states.map(state => df.where($"State" <=> state)) 

或者映射:

 val byStateMap = states .map(state => (state -> df.where($"State" <=> state))) .toMap 

在Python中同样的事情:

 from itertools import chain from pyspark.sql.functions import col states = chain(*df.select("state").distinct().collect()) # PySpark 2.3 and later # In 2.2 and before col("state") == state) # should give the same outcome, ignoring NULLs # if NULLs are important # (lit(state).isNull() & col("state").isNull()) | (col("state") == state) df_by_state = {state: df.where(col("state").eqNullSafe(state)) for state in states} 

这里显而易见的问题是它需要对每个级别进行全面的数据扫描,所以这是一个昂贵的操作。 如果您正在寻找一种方法来分割输出,请参阅如何将RDD拆分为两个或更多RDD?

特别是您可以编写由感兴趣的列分区的Dataset

 val path: String = ??? df.write.partitionBy("State").parquet(path) 

并在需要时回读:

 // Depend on partition prunning for { state <- states } yield spark.read.parquet(path).where($"State" === state) // or explicitly read the partition for { state <- states } yield spark.read.parquet(s"$path/State=$state") 

根据数据的大小,input的拆分,存储和持久化级别的数量可能比多个filter更快或更慢。

如果将数据框设置为临时表,则非常简单(如果spark版本为2)。

 df1.createOrReplaceTempView("df1") 

现在你可以做这个查询

 var df2 = spark.sql("select * from df1 where state = 'FL'") var df3 = spark.sql("select * from df1 where state = 'MN'") var df4 = spark.sql("select * from df1 where state = 'AL'") 

现在你得到了df2,df3,df4。 如果你想把它们作为列表,你可以使用,

 df2.collect() df3.collect() 

甚至映射/过滤function。 请参考https://spark.apache.org/docs/latest/sql-programming-guide.html#datasets-and-dataframes