连接apache spark数据框中的列

我们如何在数据框中连接2列? 有没有在火花SQL中的任何function,我们可以用来连接一个DF表中的2列。

使用原始的SQL你可以使用CONCAT

  • 在Python中

     df = sqlContext.createDataFrame([("foo", 1), ("bar", 2)], ("k", "v")) df.registerTempTable("df") sqlContext.sql("SELECT CONCAT(k, ' ', v) FROM df") 
  • 在斯卡拉

     import sqlContext.implicits._ val df = sc.parallelize(Seq(("foo", 1), ("bar", 2))).toDF("k", "v") df.registerTempTable("df") sqlContext.sql("SELECT CONCAT(k, ' ', v) FROM df") 

自Spark 1.5.0以来,您可以使用带有DataFrame API的concat函数:

  • 在Python中:

     from pyspark.sql.functions import concat, col, lit df.select(concat(col("k"), lit(" "), col("v"))) 
  • 在Scala中:

     import org.apache.spark.sql.functions.{concat, lit} df.select(concat($"k", lit(" "), $"v")) 

还有一个concat_ws函数,它将string分隔符作为第一个参数。

如果你想使用DF,你可以使用udf来添加一个基于现有列的新列。

 val sqlContext = new SQLContext(sc) case class MyDf(col1: String, col2: String) //here is our dataframe val df = sqlContext.createDataFrame(sc.parallelize( Array(MyDf("A", "B"), MyDf("C", "D"), MyDf("E", "F")) )) //Define a udf to concatenate two passed in string values val getConcatenated = udf( (first: String, second: String) => { first + " " + second } ) //use withColumn method to add a new column called newColName df.withColumn("newColName", getConcatenated($"col1", $"col2")).select("newColName", "col1", "col2").show() 

以下是如何做自定义命名

 import pyspark from pyspark.sql import functions as sf sc = pyspark.SparkContext() sqlc = pyspark.SQLContext(sc) df = sqlc.createDataFrame([('row11','row12'), ('row21','row22')], ['colname1', 'colname2']) df.show() 

给,

 +--------+--------+ |colname1|colname2| +--------+--------+ | row11| row12| | row21| row22| +--------+--------+ 

通过连接创build新列:

 df = df.withColumn('joined_column', sf.concat(sf.col('colname1'),sf.lit('_'), sf.col('colname2'))) df.show() +--------+--------+-------------+ |colname1|colname2|joined_column| +--------+--------+-------------+ | row11| row12| row11_row12| | row21| row22| row21_row22| +--------+--------+-------------+ 

这是另一种做这个pyspark的方法:

 #import concat and lit functions from pyspark.sql.functions from pyspark.sql.functions import concat, lit #Create your data frame countryDF = sqlContext.createDataFrame([('Ethiopia',), ('Kenya',), ('Uganda',), ('Rwanda',)], ['East Africa']) #Use select, concat, and lit functions to do the concatenation personDF = countryDF.select(concat(countryDF['East Africa'], lit('n')).alias('East African')) #Show the new data frame personDF.show() ----------RESULT------------------------- 84 +------------+ |East African| +------------+ | Ethiopian| | Kenyan| | Ugandan| | Rwandan| +------------+ 

当您不知道数据框中列的编号或名称时,build议您提供一个build议。

 val dfResults = dfSource.select(concat_ws(",",dfSource.columns.map(c => col(c)): _*)) 

pySpark使用sqlContext的另一种方法…

 #Suppose we have a dataframe: df = sqlContext.createDataFrame([('row1_1','row1_2')], ['colname1', 'colname2']) # Now we can concatenate columns and assign the new column a name df = df.select(concat(df.colname1, df.colname2).alias('joined_colname'))