Tag: apache spark sql

在Spark DataFrame中从单个列派生多个列

我有一个巨大的可parsing元数据的DF作为一个数据框中的单个string列,让我们称之为DFA,ColmnA。 我想通过函数ClassXYZ = Func1(ColmnA)将此列ColmnA分成多个列。 这个函数返回一个具有多个variables的ClassXYZ类,每个variables现在都必须映射到新的Column,比如ColmnA1,ColmnA2等。 我如何通过调用这个Func1一次来从1个Dataframe到另外一个列进行这样的转换,而不必重复它来创build所有的列。 如果我每次都要调用这个巨大的函数来添加一个新的列,那么这很容易解决,但这是我想避免的。 请告知工作或伪代码。 谢谢 桑杰

更新spark中的dataframe列

看看新的spark数据框api,目前还不清楚是否可以修改dataframe列。 我将如何去改变数据框的行x列y中的值? 在pandas这将是df.ix[x,y] = new_value 编辑:合并下面说的,你不能修改现有的数据框,因为它是不可变的,但你可以返回一个新的数据框与所需的修改。 如果您只是想根据条件replace列中的值,如np.where : from pyspark.sql import functions as F update_func = (F.when(F.col('update_col') == replace_val, new_value) .otherwise(F.col('update_col'))) df = df.withColumn('new_column_name', update_func) 如果要对列执行一些操作并创build一个添加到数据框的新列: import pyspark.sql.functions as F import pyspark.sql.types as T def my_func(col): do stuff to column here return transformed_value # if we assume that my_func returns a string my_udf = F.UserDefinedFunction(my_func, […]

在Apache Spark中将Dataframe的列值提取为List

我想将一个数据框的string列转换为列表。 我可以从Dataframe APIfind的是RDD,所以我试着先将它转换回RDD,然后将toArray函数应用于RDD。 在这种情况下,长度和SQL工作得很好。 不过,我从RDD获得的结果在每个元素周围都有方括号[A00001] 。 我想知道是否有一个适当的方式将列转换为列表或删除方括号的方法。 任何build议,将不胜感激。 谢谢!

如何添加一个新的列到Spark DataFrame(使用PySpark)?

我有一个Spark DataFrame(使用PySpark 1.5.1),并想添加一个新的列。 我已经尝试了以下没有任何成功: type(randomed_hours) # => list # Create in Python and transform to RDD new_col = pd.DataFrame(randomed_hours, columns=['new_col']) spark_new_col = sqlContext.createDataFrame(new_col) my_df_spark.withColumn("hours", spark_new_col["new_col"]) 还有一个错误使用这个: my_df_spark.withColumn("hours", sc.parallelize(randomed_hours)) 那么如何使用PySpark将新的列(基于Python向量)添加到现有的DataFrame?

如何在Spark SQL中按降序排列?

我试过df.orderBy("col1").show(10)但它按升序sorting。 df.sort("col1").show(10)也按降序排列。 我查看了stackoverflow,我发现的答案都已经过时或被引用到RDD 。 我想在spark中使用本地数据框。

如何select每个组的第一行?

我有一个DataFrame生成如下: df.groupBy($"Hour", $"Category") .agg(sum($"value") as "TotalValue") .sort($"Hour".asc, $"TotalValue".desc)) 结果如下所示: +—-+——–+———-+ |Hour|Category|TotalValue| +—-+——–+———-+ | 0| cat26| 30.9| | 0| cat13| 22.1| | 0| cat95| 19.6| | 0| cat105| 1.3| | 1| cat67| 28.5| | 1| cat4| 26.8| | 1| cat13| 12.6| | 1| cat23| 5.3| | 2| cat56| 39.6| | 2| cat40| 29.7| | 2| cat187| […]

使用Spark 1.4.0和Tachyon 0.6.4使用OFF_HEAP存储时出错

我试图坚持我的RDD使用堆存储在火花1.4.0和tachyon 0.6.4这样做: val a = sqlContext.parquetFile("a1.parquet") a.persist(org.apache.spark.storage.StorageLevel.OFF_HEAP) a.count() 之后我得到以下例外。 任何想法呢? 15/06/16 10:14:53 INFO : Tachyon client (version 0.6.4) is trying to connect master @ localhost/127.0.0.1:19998 15/06/16 10:14:53 INFO : User registered at the master localhost/127.0.0.1:19998 got UserId 3 15/06/16 10:14:53 INFO TachyonBlockManager: Created tachyon directory at /tmp_spark_tachyon/spark-6b2512ab-7bb8-47ca-b6e2-8023d3d7f7dc/driver/spark-tachyon-20150616101453-ded3 15/06/16 10:14:53 INFO BlockManagerInfo: Added rdd_10_3 on ExternalBlockStore […]

如何定义DataFrame的分区?

我已经开始在Spark 1.4.0中使用Spark SQL和DataFrame。 我想要在Scala中的DataFrames上定义一个自定义分区,但没有看到如何做到这一点。 我正在处理的其中一个数据表包含一个交易清单,按照以下示例,帐户silimar。 Account Date Type Amount 1001 2014-04-01 Purchase 100.00 1001 2014-04-01 Purchase 50.00 1001 2014-04-05 Purchase 70.00 1001 2014-04-01 Payment -150.00 1002 2014-04-01 Purchase 80.00 1002 2014-04-02 Purchase 22.00 1002 2014-04-04 Payment -120.00 1002 2014-04-04 Purchase 60.00 1003 2014-04-02 Purchase 210.00 1003 2014-04-03 Purchase 15.00 至less在最初,大部分计算将发生在账户内的交易之间。 所以我想分区的数据,以便一个帐户的所有交易在同一个Spark分区。 但我没有看到一个方法来定义这个。 DataFrame类有一个名为“repartition(Int)”的方法,您可以在其中指定要创build的分区数。 但是我没有看到任何可用于为DataFrame定义定制分区的方法,例如可以为RDD指定的方法。 源数据存储在Parquet中。 […]

如何更改Spark SQL的DataFrame中的列types?

假设我正在做这样的事情: val df = sqlContext.load("com.databricks.spark.csv", Map("path" -> "cars.csv", "header" -> "true")) df.printSchema() root |– year: string (nullable = true) |– make: string (nullable = true) |– model: string (nullable = true) |– comment: string (nullable = true) |– blank: string (nullable = true) df.show() year make model comment blank 2012 Tesla S No comment 1997 […]

如何将COGROUP用于大型数据集

我有两个rdd's val tab_a: RDD[(String, String)]和val tab_b: RDD[(String, String)]我正在使用cogroup的数据集,如: val tab_c = tab_a.cogroup(tab_b).collect.toArray val updated = tab_c.map { x => { //somecode } } 我正在使用tab_c cogrouped的值映射函数,它适用于小数据集,但在大数据集的情况下,它会抛出Out Of Memory exception 。 我已经尝试将最终值转换为RDD,但没有运气相同的错误 val newcos = spark.sparkContext.parallelize(tab_c) 1.如何将Cogroup用于大型数据集? 我们能坚持这个价值吗? 码 val source_primary_key = source.map(rec => (rec.split(",")(0), rec)) source_primary_key.persist(StorageLevel.DISK_ONLY) val destination_primary_key = destination.map(rec => (rec.split(",")(0), rec)) destination_primary_key.persist(StorageLevel.DISK_ONLY) val cos […]