SPARK SQLreplace为mysql GROUP_CONCAT聚合函数
我有一个两个stringtypes的列表(用户名,朋友),并为每个用户名,我想收集一行中的所有朋友,连接为string('username1','friends1,friends2,friends3')。 我知道MySql是通过GROUP_CONCAT做到的,有没有办法用SPARK SQL来做到这一点?
谢谢
在继续之前:这个操作是又一个groupByKey
。 虽然它有多个合法的应用程序,但相对昂贵,所以一定要在需要的时候使用它。
不完全简洁或有效的解决scheme,但您可以使用Spark 1.5.0中引入的UserDefinedAggregateFunction
:
object GroupConcat extends UserDefinedAggregateFunction { def inputSchema = new StructType().add("x", StringType) def bufferSchema = new StructType().add("buff", ArrayType(StringType)) def dataType = StringType def deterministic = true def initialize(buffer: MutableAggregationBuffer) = { buffer.update(0, ArrayBuffer.empty[String]) } def update(buffer: MutableAggregationBuffer, input: Row) = { if (!input.isNullAt(0)) buffer.update(0, buffer.getSeq[String](0) :+ input.getString(0)) } def merge(buffer1: MutableAggregationBuffer, buffer2: Row) = { buffer1.update(0, buffer1.getSeq[String](0) ++ buffer2.getSeq[String](0)) } def evaluate(buffer: Row) = UTF8String.fromString( buffer.getSeq[String](0).mkString(",")) }
用法示例:
val df = sc.parallelize(Seq( ("username1", "friend1"), ("username1", "friend2"), ("username2", "friend1"), ("username2", "friend3") )).toDF("username", "friend") df.groupBy($"username").agg(GroupConcat($"friend")).show ## +---------+---------------+ ## | username| friends| ## +---------+---------------+ ## |username1|friend1,friend2| ## |username2|friend1,friend3| ## +---------+---------------+
您也可以创build一个Python包装器,如Spark所示:如何使用Scala或Java用户定义的函数映射Python?
实际上,提取RDD, groupByKey
, mkString
和重buildDataFrame可能会更快。
通过将collect_list
函数(Spark> = 1.6.0)与concat_ws
结合起来,您可以获得类似的效果:
import org.apache.spark.sql.functions.{collect_list, udf, lit} df.groupBy($"username") .agg(concat_ws(",", collect_list($"friend")).alias("friends"))
你可以尝试collect_list函数
sqlContext.sql("select A, collect_list(B), collect_list(C) from Table1 group by A
或者你可以注册一个UDF的东西
sqlContext.udf.register("myzip",(a:Long,b:Long)=>(a+","+b))
你可以在查询中使用这个函数
sqlConttext.sql("select A,collect_list(myzip(B,C)) from tbl group by A")
用pyspark <1.6来做到这一点,不幸的是,它不支持用户定义的聚合函数:
byUsername = df.rdd.reduceByKey(lambda x, y: x + ", " + y)
如果你想再次使它成为一个数据框:
sqlContext.createDataFrame(byUsername, ["username", "friends"])
从1.6开始,您可以使用collect_list然后join创build的列表:
from pyspark.sql import functions as F from pyspark.sql.types import StringType join_ = F.udf(lambda x: ", ".join(x), StringType()) df.groupBy("username").agg(join_(F.collect_list("friend").alias("friends"))
语言 :斯卡拉Spark版本 :1.5.2
我有同样的问题,也试图解决它使用udfs
但不幸的是,由于types不一致,这导致了代码后面更多的问题。 我能够解决这个问题,首先将DF
转换为RDD
然后按照所需的方式对数据进行分组和操作,然后将RDD
转换回DF
,如下所示:
val df = sc .parallelize(Seq( ("username1", "friend1"), ("username1", "friend2"), ("username2", "friend1"), ("username2", "friend3"))) .toDF("username", "friend") +---------+-------+ | username| friend| +---------+-------+ |username1|friend1| |username1|friend2| |username2|friend1| |username2|friend3| +---------+-------+ val dfGRPD = df.map(Row => (Row(0), Row(1))) .groupByKey() .map{ case(username:String, groupOfFriends:Iterable[String]) => (username, groupOfFriends.mkString(","))} .toDF("username", "groupOfFriends") +---------+---------------+ | username| groupOfFriends| +---------+---------------+ |username1|friend2,friend1| |username2|friend3,friend1| +---------+---------------+