在Apache Spark 1.3中向数据框添加一列
这是可能的,什么是最有效的整洁的方法添加一列到数据框?
更具体地说,列可以用作现有数据框的行ID。
在简单的情况下,从文件中读取,而不是标记它,我可以想到下面的东西(在斯卡拉),但它完成与错误(在第3行),反正看起来不是最好的路线可能:
var dataDF = sc.textFile("path/file").toDF() val rowDF = sc.parallelize(1 to DataDF.count().toInt).toDF("ID") dataDF = dataDF.withColumn("ID", rowDF("ID"))
自从我发布这个问题已经有一段时间了,看起来其他一些人也希望得到答案。 以下是我发现的。
因此,最初的任务是在任何给定的数据框中附加一个带有行标识符(基本上,序列1 to numRows
)的列,因此可以跟踪行顺序/在线状态(例如,当您抽样时)。 这可以通过以下方面来实现:
sqlContext.textFile(file). zipWithIndex(). map(case(d, i)=>i.toString + delimiter + d). map(_.split(delimiter)). map(s=>Row.fromSeq(s.toSeq))
关于将任何列附加到任何dataframe的一般情况:
Spark API中的“最接近”这个function是withColumn
和withColumnRenamed
。 根据Scala文档 ,前者通过添加一列来返回新的DataFrame 。 在我看来,这是一个有点混乱和不完整的定义。 这两个函数都可以在this
dataframe上运行,也就是给出两个dataframedf1
和df2
,列col
:
val df = df1.withColumn("newCol", df1("col") + 1) // -- OK val df = df1.withColumn("newCol", df2("col") + 1) // -- FAIL
因此,除非您可以设法将现有数据withColumnRenamed
的列withColumnRenamed
为所需的形状,否则无法使用withColumn
或withColumnRenamed
来附加任意列(独立或其他数据框)。
正如上面所说的,解决方法可能是使用zipWithIndex
– 虽然可能,但这样做相当麻烦 – 将上面的唯一键与zipWithIndex
到数据框或列都可能工作。 虽然效率是…
很显然,在数据框中添加一列对于分布式环境来说并不是一个简单的function,因此可能根本就没有一个非常高效的整洁的方法。 但是我认为即使有性能警告,仍然有这个核心function是非常重要的。
不知道它是否在火花1.3工作,但在火花1.5我使用列:
import sqlContext.implicits._ import org.apache.spark.sql.functions._ df.withColumn("newName",lit("newValue"))
当我需要使用与数据框的现有列不相关的值时,我使用它
这与@ NehaM的答案类似,但更简单
我从上面的答案帮助。 但是,如果我们想要更改DataFrame
并且Spark 1.6
当前API几乎不同,我觉得它不完整。 zipWithIndex()
返回一个包含每行和相应索引的(Row, Long)
Tuple
。 我们可以根据我们的需要使用它来创build新的Row
。
val rdd = df.rdd.zipWithIndex() .map(indexedRow => Row.fromSeq(indexedRow._2.toString +: indexedRow._1.toSeq)) val newstructure = StructType(Seq(StructField("Row number", StringType, true)).++(df.schema.fields)) sqlContext.createDataFrame(rdd, newstructure ).show
我希望这会有所帮助。
您可以使用下面的Window函数的 row_number来获取数据框中每个行的不同的id。
df.withColumn("ID", row_number() over Window.orderBy("any column name in the dataframe"))
你也可以像使用monotonically_increasing_id
一样使用
df.withColumn("ID", monotonically_increasing_id())
还有其他一些方法 。