火花dataframe同一列上的多个聚合操作

我有三个stringtypes的数组包含以下信息:

  • groupBy数组:包含我想要按数据分组的列的名称。
  • 聚合数组:包含我要聚合的列的名称。
  • 操作数组:包含我想要执行的聚合操作

我正在尝试使用火花数据框来实现这一点。 Spark数据框提供了一个可以传递Map [String,String](列名和相应的聚合操作)作为input的agg(),但是我想对同一列数据执行不同的聚合操作。 有关如何实现这一目标的任何build议?

斯卡拉

例如,您可以使用从名称到函数的定义mappingmapping函数列表:

 import org.apache.spark.sql.functions.{col, min, max, mean} import org.apache.spark.sql.Column val df = Seq((1L, 3.0), (1L, 3.0), (2L, -5.0)).toDF("k", "v") val mapping: Map[String, Column => Column] = Map( "min" -> min, "max" -> max, "mean" -> avg) val groupBy = Seq("k") val aggregate = Seq("v") val operations = Seq("min", "max", "mean") val exprs = aggregate.flatMap(c => operations .map(f => mapping(f)(col(c)))) df.groupBy(groupBy.map(col): _*).agg(exprs.head, exprs.tail: _*).show // +---+------+------+------+ // | k|min(v)|max(v)|avg(v)| // +---+------+------+------+ // | 1| 3.0| 3.0| 3.0| // | 2| -5.0| -5.0| -5.0| // +---+------+------+------+ 

要么

 df.groupBy(groupBy.head, groupBy.tail: _*).agg(exprs.head, exprs.tail: _*).show 

不幸的是,在内部使用的parsing器SQLContext没有公开暴露,但是您可以随时尝试构build普通的SQL查询:

 df.registerTempTable("df") val groupExprs = groupBy.mkString(",") val aggExprs = aggregate.flatMap(c => operations.map( f => s"$f($c) AS ${c}_${f}") ).mkString(",") sqlContext.sql(s"SELECT $groupExprs, $aggExprs FROM df GROUP BY $groupExprs") 

Python

 from pyspark.sql.functions import mean, sum, max, col df = sc.parallelize([(1, 3.0), (1, 3.0), (2, -5.0)]).toDF(["k", "v"]) groupBy = ["k"] aggregate = ["v"] funs = [mean, sum, max] exprs = [f(col(c)) for f in funs for c in aggregate] # or equivalent df.groupby(groupBy).agg(*exprs) df.groupby(*groupBy).agg(*exprs)