更新spark中的dataframe列
看看新的spark数据框api,目前还不清楚是否可以修改dataframe列。
我将如何去改变数据框的行x
列y
中的值?
在pandas
这将是df.ix[x,y] = new_value
编辑:合并下面说的,你不能修改现有的数据框,因为它是不可变的,但你可以返回一个新的数据框与所需的修改。
如果您只是想根据条件replace列中的值,如np.where
:
from pyspark.sql import functions as F update_func = (F.when(F.col('update_col') == replace_val, new_value) .otherwise(F.col('update_col'))) df = df.withColumn('new_column_name', update_func)
如果要对列执行一些操作并创build一个添加到数据框的新列:
import pyspark.sql.functions as F import pyspark.sql.types as T def my_func(col): do stuff to column here return transformed_value # if we assume that my_func returns a string my_udf = F.UserDefinedFunction(my_func, T.StringType()) df = df.withColumn('new_column_name', my_udf('update_col'))
如果您希望新列与旧列名称相同,则可以添加附加步骤:
df = df.drop('update_col').withColumnRenamed('new_column_name', 'update_col')
虽然您不能修改列,但您可以在列上操作并返回反映该更改的新DataFrame。 为此,您首先创build一个UserDefinedFunction
实现要应用的操作,然后将该functionselect性地应用于目标列。 在Python中:
from pyspark.sql.functions import UserDefinedFunction from pyspark.sql.types import StringType name = 'target_column' udf = UserDefinedFunction(lambda x: 'new_value', StringType()) new_df = old_df.select(*[udf(column).alias(name) if column == name else column for column in old_df.columns])
new_df
现在与new_df
具有相同的模式(假设old_df.target_column
的types也是StringType
),但是target_column
列中的所有值都将是new_value
。
通常在更新列时,我们希望将旧值映射到新值。 这里有一个方法可以在没有UDF的情况下做到这一点:
# update df[update_col], mapping old_value --> new_value from pyspark.sql import functions as F df = df.withColumn(update_col, F.when(df[update_col]==old_value,new_value). otherwise(df[update_col])).
DataFrames
基于RDD。 RDD是不可变的结构,不允许现场更新元素。 要更改值,您需要通过使用类似SQL的DSL或RDD操作(如map
转换原始数据框来创build新的DataFrame。
强烈推荐的幻灯片: 在Spark中引入DataFrames用于大型数据科学 。
就像maasg说的,你可以从应用到旧的DataFrame的地图结果创build一个新的DataFrame。 具有两行的给定DataFrame df
的示例:
val newDf = sqlContext.createDataFrame(df.map(row => Row(row.getInt(0) + SOMETHING, applySomeDef(row.getAs[Double]("y")), df.schema)
请注意,如果列的types发生更改,则需要为其提供正确的模式而不是df.schema
。 查看org.apache.spark.sql.Row
的api获取可用的方法: https : //spark.apache.org/docs/latest/api/java/org/apache/spark/sql/Row.html
[更新]或者在Scala中使用UDF:
import org.apache.spark.sql.functions._ val toLong = udf[Long, String] (_.toLong) val modifiedDf = df.withColumn("modifiedColumnName", toLong(df("columnName"))).drop("columnName")
如果列名需要保持不变,则可以将其重命名为:
modifiedDf.withColumnRenamed("modifiedColumnName", "columnName")