更新spark中的dataframe列

看看新的spark数据框api,目前还不清楚是否可以修改dataframe列。

我将如何去改变数据框的行xy中的值?

pandas这将是df.ix[x,y] = new_value

编辑:合并下面说的,你不能修改现有的数据框,因为它是不可变的,但你可以返回一个新的数据框与所需的修改。

如果您只是想根据条件replace列中的值,如np.where

 from pyspark.sql import functions as F update_func = (F.when(F.col('update_col') == replace_val, new_value) .otherwise(F.col('update_col'))) df = df.withColumn('new_column_name', update_func) 

如果要对列执行一些操作并创build一个添加到数据框的新列:

 import pyspark.sql.functions as F import pyspark.sql.types as T def my_func(col): do stuff to column here return transformed_value # if we assume that my_func returns a string my_udf = F.UserDefinedFunction(my_func, T.StringType()) df = df.withColumn('new_column_name', my_udf('update_col')) 

如果您希望新列与旧列名称相同,则可以添加附加步骤:

 df = df.drop('update_col').withColumnRenamed('new_column_name', 'update_col') 

虽然您不能修改列,但您可以在列上操作并返回反映该更改的新DataFrame。 为此,您首先创build一个UserDefinedFunction实现要应用的操作,然后将该functionselect性地应用于目标列。 在Python中:

 from pyspark.sql.functions import UserDefinedFunction from pyspark.sql.types import StringType name = 'target_column' udf = UserDefinedFunction(lambda x: 'new_value', StringType()) new_df = old_df.select(*[udf(column).alias(name) if column == name else column for column in old_df.columns]) 

new_df现在与new_df具有相同的模式(假设old_df.target_column的types也是StringType ),但是target_column列中的所有值都将是new_value

通常在更新列时,我们希望将旧值映射到新值。 这里有一个方法可以在没有UDF的情况下做到这一点:

 # update df[update_col], mapping old_value --> new_value from pyspark.sql import functions as F df = df.withColumn(update_col, F.when(df[update_col]==old_value,new_value). otherwise(df[update_col])). 

DataFrames基于RDD。 RDD是不可变的结构,不允许现场更新元素。 要更改值,您需要通过使用类似SQL的DSL或RDD操作(如map转换原始数据框来创build新的DataFrame。

强烈推荐的幻灯片: 在Spark中引入DataFrames用于大型数据科学 。

就像maasg说的,你可以从应用到旧的DataFrame的地图结果创build一个新的DataFrame。 具有两行的给定DataFrame df的示例:

 val newDf = sqlContext.createDataFrame(df.map(row => Row(row.getInt(0) + SOMETHING, applySomeDef(row.getAs[Double]("y")), df.schema) 

请注意,如果列的types发生更改,则需要为其提供正确的模式而不是df.schema 。 查看org.apache.spark.sql.Row的api获取可用的方法: https : //spark.apache.org/docs/latest/api/java/org/apache/spark/sql/Row.html

[更新]或者在Scala中使用UDF:

 import org.apache.spark.sql.functions._ val toLong = udf[Long, String] (_.toLong) val modifiedDf = df.withColumn("modifiedColumnName", toLong(df("columnName"))).drop("columnName") 

如果列名需要保持不变,则可以将其重命名为:

 modifiedDf.withColumnRenamed("modifiedColumnName", "columnName")