在Spark DataFrame中查找每个组的最大行数

我试图使用Spark数据框而不是RDDs,因为它们看起来比RDD更高级,并且倾向于生成更多可读的代码,但是我将非常乐意获得关于手头任务更为惯用的build议。

在一个14节点的Google Dataproc集群中,我有大约6百万个名字被两个不同的系统翻译成ID: sasb 。 每Row包含nameid_said_sb 。 我的目标是产生从id_said_sb的映射,使得对于每个id_sa ,相应的id_sb是与id_sb相连的所有名称中最频繁的id。

我们试着用一个例子来澄清一下。 如果我有以下几行:

 [Row(name='n1', id_sa='a1', id_sb='b1'), Row(name='n2', id_sa='a1', id_sb='b2'), Row(name='n3', id_sa='a1', id_sb='b2'), Row(name='n4', id_sa='a2', id_sb='b2')] 

我的目标是产生从a1b2的映射。 实际上,与a1相关的名称分别是n1n2n3 ,它们分别映射到b1b2b2 ,所以b2是与a1相关联的名称中最频繁的映射。 以同样的方式, a2将被映射到b2 。 假设永远有一个赢家是没有问题的:不需要打破关系。

我希望我可以在我的数据groupBy(df.id_sa)上使用groupBy(df.id_sa) ,但是我不知道下一步该怎么做。 我希望有一个聚合,最后可以产生以下几行:

 [Row(id_sa=a1, max_id_sb=b2), Row(id_sa=a2, max_id_sb=b2)] 

但也许我试图使用错误的工具,我应该回去使用RDD。

使用join (如果join ,会导致组中有多行):

 import pyspark.sql.functions as F from pyspark.sql.functions import count, col cnts = df.groupBy("id_sa", "id_sb").agg(count("*").alias("cnt")).alias("cnts") maxs = cnts.groupBy("id_sa").agg(F.max("cnt").alias("mx")).alias("maxs") cnts.join(maxs, (col("cnt") == col("mx")) & (col("cnts.id_sa") == col("maxs.id_sa")) ).select(col("cnts.id_sa"), col("cnts.id_sb")) 

使用窗口函数(将放弃关系):

 from pyspark.sql.functions import rowNumber from pyspark.sql.window import Window w = Window().partitionBy("id_sa").orderBy(col("cnt").desc()) (cnts .withColumn("rn", rowNumber().over(w)) .where(col("rn") == 1) .select("id_sa", "id_sb")) 

使用structsorting:

 from pyspark.sql.functions import struct (cnts .groupBy("id_sa") .agg(F.max(struct(col("cnt"), col("id_sb"))).alias("max")) .select(col("id_sa"), col("max.id_sb"))) 

另请参阅SPARK DataFrame:select每个组的第一行

我想你可能正在寻找窗口函数: http : //spark.apache.org/docs/latest/api/python/pyspark.sql.html?highlight= window# pyspark.sql.Window

https://databricks.com/blog/2015/07/15/introducing-window-functions-in-spark-sql.html

下面是Scala中的一个例子(我现在没有Hive可用的Spark Shell,所以我无法testing代码,但我认为它应该可以工作):

 case class MyRow(name: String, id_sa: String, id_sb: String) val myDF = sc.parallelize(Array( MyRow("n1", "a1", "b1"), MyRow("n2", "a1", "b2"), MyRow("n3", "a1", "b2"), MyRow("n1", "a2", "b2") )).toDF("name", "id_sa", "id_sb") import org.apache.spark.sql.expressions.Window val windowSpec = Window.partitionBy(myDF("id_sa")).orderBy(myDF("id_sb").desc) myDF.withColumn("max_id_b", first(myDF("id_sb")).over(windowSpec).as("max_id_sb")).filter("id_sb = max_id_sb") 

有可能更有效的方法来实现与Window函数相同的结果,但我希望这指出你在正确的方向。