Tag: spark

如何在Spark DataFrame中添加一个常量列?

我想在DataFrame添加一个任意值的列(每行都是一样的)。 我在使用withColumn时出现错误,如下所示: dt.withColumn('new_column', 10).head(5) ————————————————————————— AttributeError Traceback (most recent call last) <ipython-input-50-a6d0257ca2be> in <module>() 1 dt = (messages 2 .select(messages.fromuserid, messages.messagetype, floor(messages.datetime/(1000*60*5)).alias("dt"))) —-> 3 dt.withColumn('new_column', 10).head(5) /Users/evanzamir/spark-1.4.1/python/pyspark/sql/dataframe.pyc in withColumn(self, colName, col) 1166 [Row(age=2, name=u'Alice', age2=4), Row(age=5, name=u'Bob', age2=7)] 1167 """ -> 1168 return self.select('*', col.alias(colName)) 1169 1170 @ignore_unicode_prefix AttributeError: 'int' object has no attribute 'alias' […]

如何运行2个函数在单个RDD上使用pyspark并行执行完全独立的转换?

我试图运行2个函数在单个RDD上使用pyspark并行执行完全独立的转换。 有什么办法可以做同样的事情? def doXTransforms(sampleRDD): (X transforms) def doYTransforms(sampleRDD): (Y Transforms) if __name__ == "__main__": sc = SparkContext(appName="parallelTransforms") sqlContext = SQLContext(sc) hive_context = HiveContext(sc) rows_rdd = hive_context.sql("select * from tables.X_table") p1 = Process(target=doXTransforms , args=(rows_rdd,)) p1.start() p2 = Process(target=doYTransforms, args=(rows_rdd,)) p2.start() p1.join() p2.join() sc.stop() 这不起作用,我现在明白这是行不通的。 但是有没有其他方法可以使这项工作? 具体是否有任何python-spark特定的解决scheme?

如何将Vector分割成列 – 使用PySpark

上下文:我有一个DataFrame 2列:单词和vector。 其中“向量”的列types是VectorUDT 。 一个例子: word | vector assert | [435,323,324,212…] 我想得到这个: word | v1 | v2 | v3 | v4 | v5 | v6 …… assert | 435 | 5435| 698| 356|…. 题: 如何使用pyspark为每个维度在多个列中使用向量分隔列? 提前致谢