在Spark DataFrame中从单个列派生多个列

我有一个巨大的可parsing元数据的DF作为一个数据框中的单个string列,让我们称之为DFA,ColmnA。

我想通过函数ClassXYZ = Func1(ColmnA)将此列ColmnA分成多个列。 这个函数返回一个具有多个variables的ClassXYZ类,每个variables现在都必须映射到新的Column,比如ColmnA1,ColmnA2等。

我如何通过调用这个Func1一次来从1个Dataframe到另外一个列进行这样的转换,而不必重复它来创build所有的列。

如果我每次都要调用这个巨大的函数来添加一个新的列,那么这很容易解决,但这是我想避免的。

请告知工作或伪代码。

谢谢

桑杰

一般来说,你想要的不是直接可能的。 UDF当时只能返回一列。 有两种不同的方法可以克服这个限制:

  1. 返回一个复杂types的列。 最一般的解决scheme是一个StructType但你也可以考虑ArrayTypeMapType

     import org.apache.spark.sql.functions.udf val df = Seq( (1L, 3.0, "a"), (2L, -1.0, "b"), (3L, 0.0, "c") ).toDF("x", "y", "z") case class Foobar(foo: Double, bar: Double) val foobarUdf = udf((x: Long, y: Double, z: String) => Foobar(x * y, z.head.toInt * y)) val df1 = df.withColumn("foobar", foobarUdf($"x", $"y", $"z")) df1.show // +---+----+---+------------+ // | x| y| z| foobar| // +---+----+---+------------+ // | 1| 3.0| a| [3.0,291.0]| // | 2|-1.0| b|[-2.0,-98.0]| // | 3| 0.0| c| [0.0,0.0]| // +---+----+---+------------+ df1.printSchema // root // |-- x: long (nullable = false) // |-- y: double (nullable = false) // |-- z: string (nullable = true) // |-- foobar: struct (nullable = true) // | |-- foo: double (nullable = false) // | |-- bar: double (nullable = false) 

    以后可以很容易地将其扁平化,但通常没有必要这样做。

  2. 切换到RDD,重塑和重buildDF:

     import org.apache.spark.sql.types._ import org.apache.spark.sql.Row def foobarFunc(x: Long, y: Double, z: String): Seq[Any] = Seq(x * y, z.head.toInt * y) val schema = StructType(df.schema.fields ++ Array(StructField("foo", DoubleType), StructField("bar", DoubleType))) val rows = df.rdd.map(r => Row.fromSeq( r.toSeq ++ foobarFunc(r.getAs[Long]("x"), r.getAs[Double]("y"), r.getAs[String]("z")))) val df2 = sqlContext.createDataFrame(rows, schema) df2.show // +---+----+---+----+-----+ // | x| y| z| foo| bar| // +---+----+---+----+-----+ // | 1| 3.0| a| 3.0|291.0| // | 2|-1.0| b|-2.0|-98.0| // | 3| 0.0| c| 0.0| 0.0| // +---+----+---+----+-----+ 

假设你的函数之后会有一系列元素,举例如下:

 val df = sc.parallelize(List(("Mike,1986,Toronto", 30), ("Andre,1980,Ottawa", 36), ("jill,1989,London", 27))).toDF("infoComb", "age") df.show +------------------+---+ | infoComb|age| +------------------+---+ |Mike,1986,Toronto| 30| | Andre,1980,Ottawa| 36| | jill,1989,London| 27| +------------------+---+ 

现在你可以用这个infoComb做什么,你可以开始拆分string,并获得更多的列:

 df.select(expr("(split(infoComb, ','))[0]").cast("string").as("name"), expr("(split(infoComb, ','))[1]").cast("integer").as("yearOfBorn"), expr("(split(infoComb, ','))[2]").cast("string").as("city"), $"age").show +-----+----------+-------+---+ | name|yearOfBorn| city|age| +-----+----------+-------+---+ |Mike| 1986|Toronto| 30| |Andre| 1980| Ottawa| 36| | jill| 1989| London| 27| +-----+----------+-------+---+ 

希望这可以帮助。

如果结果列的长度与原始列的长度相同,则可以使用withColumn函数创build全新的列,并应用udf 。 在此之后,您可以删除您的原始列,例如:

  val newDf = myDf.withColumn("newCol1", myFun(myDf("originalColumn"))) .withColumn("newCol2", myFun2(myDf("originalColumn")) .drop(myDf("originalColumn")) 

myFun是这样定义的udf:

  def myFun= udf( (originalColumnContent : String) => { // do something with your original column content and return a new one } ) 

我select创build一个函数来压扁一列,然后与udf同时调用它。

首先定义这个:

 implicit class DfOperations(df: DataFrame) { def flattenColumn(col: String) = { def addColumns(df: DataFrame, cols: Array[String]): DataFrame = { if (cols.isEmpty) df else addColumns( df.withColumn(col + "_" + cols.head, df(col + "." + cols.head)), cols.tail ) } val field = df.select(col).schema.fields(0) val newCols = field.dataType.asInstanceOf[StructType].fields.map(x => x.name) addColumns(df, newCols).drop(col) } def withColumnMany(colName: String, col: Column) = { df.withColumn(colName, col).flattenColumn(colName) } } 

那么用法很简单:

 case class MyClass(a: Int, b: Int) val df = sc.parallelize(Seq( (0), (1) )).toDF("x") val f = udf((x: Int) => MyClass(x*2,x*3)) df.withColumnMany("test", f($"x")).show() // +---+------+------+ // | x|test_a|test_b| // +---+------+------+ // | 0| 0| 0| // | 1| 2| 3| // +---+------+------+ 

这可以通过使用枢轴function轻松实现

 df4.groupBy("year").pivot("course").sum("earnings").collect()