在PySpark中编码和组装多个特征

我有一个Python类,用来加载和处理Spark中的一些数据。 在我需要做的各种事情中,我将生成一个由Spark数据框中各个列派生的虚拟variables列表。 我的问题是,我不知道如何正确定义一个用户定义函数来完成我所需要的。

我现在有一种方法,当映射到基础dataframeRDD时,解决了一半的问题(请记住,这是一个更大的data_processor类中的方法):

 def build_feature_arr(self,table): # this dict has keys for all the columns for which I need dummy coding categories = {'gender':['1','2'], ..} # there are actually two differnt dataframes that I need to do this for, this just specifies which I'm looking at, and grabs the relevant features from a config file if table == 'users': iter_over = self.config.dyadic_features_to_include elif table == 'activty': iter_over = self.config.user_features_to_include def _build_feature_arr(row): result = [] row = row.asDict() for col in iter_over: column_value = str(row[col]).lower() cats = categories[col] result += [1 if column_value and cat==column_value else 0 for cat in cats] return result return _build_feature_arr 

实质上,对于指定的数据框,它将为指定列获取分类variables值,并返回这些新的虚拟variables值的列表。 这意味着下面的代码:

 data = data_processor(init_args) result = data.user_data.rdd.map(self.build_feature_arr('users')) 

返回类似于:

 In [39]: result.take(10) Out[39]: [[1, 0, 0, 0, 1, 0], [1, 0, 0, 1, 0, 0], [1, 0, 0, 0, 0, 0], [1, 0, 1, 0, 0, 0], [1, 0, 0, 1, 0, 0], [1, 0, 0, 1, 0, 0], [0, 1, 1, 0, 0, 0], [1, 0, 1, 1, 0, 0], [1, 0, 0, 1, 0, 0], [1, 0, 0, 0, 0, 1]] 

这正是我想要生成我想要的虚拟variables列表,但这里是我的问题:我怎样才能(a)使一个类似的function,我可以在一个Spark SQL查询中使用的UDF(或其他方式,我想),或者(b)从上述映射中获取RDD,并将其作为新列添加到user_data数据框中?

无论哪种方式,我需要做的是生成一个新的数据框,其中包含来自user_data的列,以及一个新的列(让我们称之为feature_array )包含上述function的输出(或function相当的东西)。

那么,你可以写一个UDF,但你为什么? 已经有相当多的工具来处理这类任务:

 from pyspark.sql import Row from pyspark.ml.linalg import DenseVector row = Row("gender", "foo", "bar") df = sc.parallelize([ row("0", 3.0, DenseVector([0, 2.1, 1.0])), row("1", 1.0, DenseVector([0, 1.1, 1.0])), row("1", -1.0, DenseVector([0, 3.4, 0.0])), row("0", -3.0, DenseVector([0, 4.1, 0.0])) ]).toDF() 

首先是StringIndexer

 from pyspark.ml.feature import StringIndexer indexer = StringIndexer(inputCol="gender", outputCol="gender_numeric").fit(df) indexed_df = indexer.transform(df) indexed_df.drop("bar").show() ## +------+----+--------------+ ## |gender| foo|gender_numeric| ## +------+----+--------------+ ## | 0| 3.0| 0.0| ## | 1| 1.0| 1.0| ## | 1|-1.0| 1.0| ## | 0|-3.0| 0.0| ## +------+----+--------------+ 

Next OneHotEncoder

 from pyspark.ml.feature import OneHotEncoder encoder = OneHotEncoder(inputCol="gender_numeric", outputCol="gender_vector") encoded_df = encoder.transform(indexed_df) encoded_df.drop("bar").show() ## +------+----+--------------+-------------+ ## |gender| foo|gender_numeric|gender_vector| ## +------+----+--------------+-------------+ ## | 0| 3.0| 0.0|(1,[0],[1.0])| ## | 1| 1.0| 1.0| (1,[],[])| ## | 1|-1.0| 1.0| (1,[],[])| ## | 0|-3.0| 0.0|(1,[0],[1.0])| ## +------+----+--------------+-------------+ 

VectorAssembler

 from pyspark.ml.feature import VectorAssembler assembler = VectorAssembler( inputCols=["gender_vector", "bar", "foo"], outputCol="features") encoded_df_with_indexed_bar = (vector_indexer .fit(encoded_df) .transform(encoded_df)) final_df = assembler.transform(encoded_df) 

如果bar包含分类variables,则可以使用VectorIndexer来设置所需的元数据:

 from pyspark.ml.feature import VectorIndexer vector_indexer = VectorIndexer(inputCol="bar", outputCol="bar_indexed") 

但情况并非如此。

最后你可以使用pipe道来包装所有的东西:

 from pyspark.ml import Pipeline pipeline = Pipeline(stages=[indexer, encoder, vector_indexer, assembler]) model = pipeline.fit(df) transformed = model.transform(df) 

可以说,它比从头开始编写所有东西都要健壮和干净。 有一些注意事项,尤其是当你需要不同的数据集之间的一致的编码。 您可以在StringIndexerVectorIndexer的官方文档中阅读更多内容。

另一种获得可比输出的方法是RFormula ,其中 :

RFormula产生一个向量列的特征和一个双列或串列标签。 就像在R中使用公式进行线性回归时一样,stringinput列将被热门编码,而数字列将被转换为双精度。 如果标签列是stringtypes的,则首先将其转换为StringIndexer两倍。 如果DataFrame中不存在标签列,则将使用公式中指定的响应variables创build输出标签列。

 from pyspark.ml.feature import RFormula rf = RFormula(formula="~ gender + bar + foo - 1") final_df_rf = rf.fit(df).transform(df) 

正如你可以看到它更简洁,但更难编写,不允许太多的定制。 尽pipe如此,像这样一个简单的pipe道的结果将是相同的:

 final_df_rf.select("features").show(4, False) ## +----------------------+ ## |features | ## +----------------------+ ## |[1.0,0.0,2.1,1.0,3.0] | ## |[0.0,0.0,1.1,1.0,1.0] | ## |(5,[2,4],[3.4,-1.0]) | ## |[1.0,0.0,4.1,0.0,-3.0]| ## +----------------------+ final_df.select("features").show(4, False) ## +----------------------+ ## |features | ## +----------------------+ ## |[1.0,0.0,2.1,1.0,3.0] | ## |[0.0,0.0,1.1,1.0,1.0] | ## |(5,[2,4],[3.4,-1.0]) | ## |[1.0,0.0,4.1,0.0,-3.0]| ## +----------------------+ 

关于你的问题:

做一个类似的function,我可以在一个Spark SQL查询中使用的UDF(或者其他方式,我想)

这只是一个像其他任何UDF。 确保你使用支持的types,除此之外,一切都应该工作得很好。

从上述映射中获取RDD并将其作为新列添加到user_data数据框?

 from pyspark.ml.linalg import VectorUDT from pyspark.sql.types import StructType, StructField schema = StructType([StructField("features", VectorUDT(), True)]) row = Row("features") result.map(lambda x: row(DenseVector(x))).toDF(schema) 

注意

对于Spark 1.x, pyspark.ml.linalg使用pyspark.mllib.linalgreplacepyspark.mllib.linalg