PySpark DataFrames – 枚举的方式不转换为pandas?

我有一个非常大的名为df的pyspark.sql.dataframe.DataFrame 。 我需要一些枚举logging的方法,从而能够访问具有特定索引的logging。 (或select索引范围的logging组)

pandas,我可以做

indexes=[2,3,6,7] df[indexes] 

在这里我想要类似的东西(并且不把数据框转换成pandas)

我能find的最接近的是:

  • 通过以下方式枚举原始数据框中的所有对象:

     indexes=np.arange(df.count()) df_indexed=df.withColumn('index', indexes) 
    • 使用where()函数search我需要的值。

问题:

  1. 为什么它不工作,如何使它工作? 如何将一行添加到数据框?
  2. 稍后会做出如下的工作:

      indexes=[2,3,6,7] df1.where("index in indexes").collect() 
  3. 任何更快,更简单的方法来处理它?

这是行不通的,因为:

  1. withColumn的第二个参数应该是一个Column而不是一个集合。 np.array不会在这里工作
  2. 当您将"index in indexes"作为SQLexpression式传递到indexes超出作用域且不作为有效标识符parsing的位置时

PySpark> = 1.4.0

您可以使用相应的窗口函数添加行号并使用Column.isin方法进行查询,或者使用适当格式的查询string:

 from pyspark.sql.functions import col, rowNumber from pyspark.sql.window import Window w = Window.orderBy() indexed = df.withColumn("index", rowNumber().over(w)) # Using DSL indexed.where(col("index").isin(set(indexes))) # Using SQL expression indexed.where("index in ({0})".format(",".join(str(x) for x in indexes))) 

它看起来像没有PARTITION BY子句调用的窗口函数将所有数据移动到单个分区,所以上面可能不是最好的解决scheme。

任何更快,更简单的方法来处理它?

不是真的。 Spark DataFrames不支持随机行访问。

如果使用HashPartitioner数据进行分区, PairedRDD可以使用lookup方法进行访问。 还有索引rdd项目,支持高效查找。

编辑

独立PySpark版本,你可以尝试这样的事情:

 from pyspark.sql import Row from pyspark.sql.types import StructType, StructField, LongType row = Row("char") row_with_index = Row("char", "index") df = sc.parallelize(row(chr(x)) for x in range(97, 112)).toDF() df.show(5) ## +----+ ## |char| ## +----+ ## | a| ## | b| ## | c| ## | d| ## | e| ## +----+ ## only showing top 5 rows # This part is not tested but should work and save some work later schema = StructType( df.schema.fields[:] + [StructField("index", LongType(), False)]) indexed = (df.rdd # Extract rdd .zipWithIndex() # Add index .map(lambda ri: row_with_index(*list(ri[0]) + [ri[1]])) # Map to rows .toDF(schema)) # It will work without schema but will be more expensive # inSet in Spark < 1.3 indexed.where(col("index").isin(indexes)) 

如果你想保证不会碰撞但不需要.over(partitionBy())的数字范围,那么你可以使用monotonicallyIncreasingId()

 from pyspark.sql.functions import monotonicallyIncreasingId df.select(monotonicallyIncreasingId().alias("rowId"),"*") 

请注意,这些值并不是特别的“整齐”。 每个分区都有一个数值范围,输出不会连续。 例如0, 1, 2, 8589934592, 8589934593, 8589934594

这是2015年4月28日添加到Spark在这里: https : //github.com/apache/spark/commit/d94cd1a733d5715792e6c4eac87f0d5c81aebbe2

你当然可以添加一个索引数组,确实是你select的一个数组:在Scala中,首先我们需要创build一个索引数组:

 val index_array=(1 to df.count.toInt).toArray index_array: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10) 

你现在可以将这个列添加到你的DF。 首先,为此,您需要打开DF并将其作为数组获取,然后使用index_array压缩它,然后将新数组转换回RDD。 最后一步是把它作为DF:

 final_df = sc.parallelize((df.collect.map( x=>(x(0),x(1))) zip index_array).map( x=>(x._1._1.toString,x._1._2.toString,x._2))). toDF("column_name") 

之后索引会更清晰。