PySpark DataFrames – 枚举的方式不转换为pandas?
我有一个非常大的名为df的pyspark.sql.dataframe.DataFrame 。 我需要一些枚举logging的方法,从而能够访问具有特定索引的logging。 (或select索引范围的logging组)
pandas,我可以做
indexes=[2,3,6,7] df[indexes]
在这里我想要类似的东西(并且不把数据框转换成pandas)
我能find的最接近的是:
-
通过以下方式枚举原始数据框中的所有对象:
indexes=np.arange(df.count()) df_indexed=df.withColumn('index', indexes)
- 使用where()函数search我需要的值。
问题:
- 为什么它不工作,如何使它工作? 如何将一行添加到数据框?
-
稍后会做出如下的工作:
indexes=[2,3,6,7] df1.where("index in indexes").collect()
-
任何更快,更简单的方法来处理它?
这是行不通的,因为:
-
withColumn
的第二个参数应该是一个Column
而不是一个集合。np.array
不会在这里工作 - 当您将
"index in indexes"
作为SQLexpression式传递到indexes
超出作用域且不作为有效标识符parsing的位置时
PySpark> = 1.4.0
您可以使用相应的窗口函数添加行号并使用 Column.isin
方法进行查询,或者使用适当格式的查询string:
from pyspark.sql.functions import col, rowNumber from pyspark.sql.window import Window w = Window.orderBy() indexed = df.withColumn("index", rowNumber().over(w)) # Using DSL indexed.where(col("index").isin(set(indexes))) # Using SQL expression indexed.where("index in ({0})".format(",".join(str(x) for x in indexes)))
它看起来像没有PARTITION BY
子句调用的窗口函数将所有数据移动到单个分区,所以上面可能不是最好的解决scheme。
任何更快,更简单的方法来处理它?
不是真的。 Spark DataFrames不支持随机行访问。
如果使用HashPartitioner
数据进行分区, PairedRDD
可以使用lookup
方法进行访问。 还有索引rdd项目,支持高效查找。
编辑 :
独立PySpark版本,你可以尝试这样的事情:
from pyspark.sql import Row from pyspark.sql.types import StructType, StructField, LongType row = Row("char") row_with_index = Row("char", "index") df = sc.parallelize(row(chr(x)) for x in range(97, 112)).toDF() df.show(5) ## +----+ ## |char| ## +----+ ## | a| ## | b| ## | c| ## | d| ## | e| ## +----+ ## only showing top 5 rows # This part is not tested but should work and save some work later schema = StructType( df.schema.fields[:] + [StructField("index", LongType(), False)]) indexed = (df.rdd # Extract rdd .zipWithIndex() # Add index .map(lambda ri: row_with_index(*list(ri[0]) + [ri[1]])) # Map to rows .toDF(schema)) # It will work without schema but will be more expensive # inSet in Spark < 1.3 indexed.where(col("index").isin(indexes))
如果你想保证不会碰撞但不需要.over(partitionBy())
的数字范围,那么你可以使用monotonicallyIncreasingId()
。
from pyspark.sql.functions import monotonicallyIncreasingId df.select(monotonicallyIncreasingId().alias("rowId"),"*")
请注意,这些值并不是特别的“整齐”。 每个分区都有一个数值范围,输出不会连续。 例如0, 1, 2, 8589934592, 8589934593, 8589934594
。
这是2015年4月28日添加到Spark在这里: https : //github.com/apache/spark/commit/d94cd1a733d5715792e6c4eac87f0d5c81aebbe2
你当然可以添加一个索引数组,确实是你select的一个数组:在Scala中,首先我们需要创build一个索引数组:
val index_array=(1 to df.count.toInt).toArray index_array: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
你现在可以将这个列添加到你的DF。 首先,为此,您需要打开DF并将其作为数组获取,然后使用index_array压缩它,然后将新数组转换回RDD。 最后一步是把它作为DF:
final_df = sc.parallelize((df.collect.map( x=>(x(0),x(1))) zip index_array).map( x=>(x._1._1.toString,x._1._2.toString,x._2))). toDF("column_name")
之后索引会更清晰。