Tag: spark dataframe

在Spark DataFrame中从单个列派生多个列

我有一个巨大的可parsing元数据的DF作为一个数据框中的单个string列,让我们称之为DFA,ColmnA。 我想通过函数ClassXYZ = Func1(ColmnA)将此列ColmnA分成多个列。 这个函数返回一个具有多个variables的ClassXYZ类,每个variables现在都必须映射到新的Column,比如ColmnA1,ColmnA2等。 我如何通过调用这个Func1一次来从1个Dataframe到另外一个列进行这样的转换,而不必重复它来创build所有的列。 如果我每次都要调用这个巨大的函数来添加一个新的列,那么这很容易解决,但这是我想避免的。 请告知工作或伪代码。 谢谢 桑杰

更新spark中的dataframe列

看看新的spark数据框api,目前还不清楚是否可以修改dataframe列。 我将如何去改变数据框的行x列y中的值? 在pandas这将是df.ix[x,y] = new_value 编辑:合并下面说的,你不能修改现有的数据框,因为它是不可变的,但你可以返回一个新的数据框与所需的修改。 如果您只是想根据条件replace列中的值,如np.where : from pyspark.sql import functions as F update_func = (F.when(F.col('update_col') == replace_val, new_value) .otherwise(F.col('update_col'))) df = df.withColumn('new_column_name', update_func) 如果要对列执行一些操作并创build一个添加到数据框的新列: import pyspark.sql.functions as F import pyspark.sql.types as T def my_func(col): do stuff to column here return transformed_value # if we assume that my_func returns a string my_udf = F.UserDefinedFunction(my_func, […]

在Spark Scala中重命名数据框的列名

我试图转换Spark-scala中所有的DataFrame的Headers / ColumnNames。 截至目前我拿出下面的代码,只取代一个单一的名字。 请帮忙。 for( i <- 0 to origCols.length – 1){df.withColumnRenamed(df.columns(i),df.columns(i).toLowerCase);}

在Apache Spark中将Dataframe的列值提取为List

我想将一个数据框的string列转换为列表。 我可以从Dataframe APIfind的是RDD,所以我试着先将它转换回RDD,然后将toArray函数应用于RDD。 在这种情况下,长度和SQL工作得很好。 不过,我从RDD获得的结果在每个元素周围都有方括号[A00001] 。 我想知道是否有一个适当的方式将列转换为列表或删除方括号的方法。 任何build议,将不胜感激。 谢谢!

如何添加一个新的列到Spark DataFrame(使用PySpark)?

我有一个Spark DataFrame(使用PySpark 1.5.1),并想添加一个新的列。 我已经尝试了以下没有任何成功: type(randomed_hours) # => list # Create in Python and transform to RDD new_col = pd.DataFrame(randomed_hours, columns=['new_col']) spark_new_col = sqlContext.createDataFrame(new_col) my_df_spark.withColumn("hours", spark_new_col["new_col"]) 还有一个错误使用这个: my_df_spark.withColumn("hours", sc.parallelize(randomed_hours)) 那么如何使用PySpark将新的列(基于Python向量)添加到现有的DataFrame?

如何select每个组的第一行?

我有一个DataFrame生成如下: df.groupBy($"Hour", $"Category") .agg(sum($"value") as "TotalValue") .sort($"Hour".asc, $"TotalValue".desc)) 结果如下所示: +—-+——–+———-+ |Hour|Category|TotalValue| +—-+——–+———-+ | 0| cat26| 30.9| | 0| cat13| 22.1| | 0| cat95| 19.6| | 0| cat105| 1.3| | 1| cat67| 28.5| | 1| cat4| 26.8| | 1| cat13| 12.6| | 1| cat23| 5.3| | 2| cat56| 39.6| | 2| cat40| 29.7| | 2| cat187| […]

如何将rdd对象转换为spark中的dataframe

如何将RDD( org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] )转换为Dataframe org.apache.spark.sql.DataFrame 。 我使用.rdd将dataframe转换为rdd。 处理完之后,我希望它回到数据框中。 我怎样才能做到这一点 ?

DataFrame(Spark 2.0中的DataSet )和Spark中的RDD之间的区别

我只是想知道在Apache Spark中RDD和DataFrame (Spark 2.0.0 DataFrame是Dataset[Row]types别名)之间的区别是什么? 你可以转换一个到另一个?

如何将每个DStream保存/插入到永久表中

我一直在面对“Spark Streaming”关于将输出Dstream插入永久性 SQL表的问题。 我想插入每个输出DStream(来自单个批处理,火花进程)到一个独特的表。 我一直在使用Spark版本1.6.2的Python。 在我的代码的这一部分,我有一个Dstream的一个或多个RDD,我想永久插入/存储到SQL表中,而不会丢失每个处理的批处理结果。 rr = feature_and_label.join(result_zipped)\ .map(lambda x: (x[1][0][0], x[1][1]) ) 这里的每个Dstream都被表示为这个元组的forms:( 4.0,0 )。 我不能使用SparkSQL,因为Spark对待'table'的方式就是像临时表一样 ,因此在每个批处理中都会丢失结果。 这是输出的一个例子: 时间:2016-09-23 00:57:00 (0.0,2) 时间:2016-09-23 00:57:01 (4.0,0) 时间:2016-09-23 00:57:02 (4.0,0) … 如上所示,每个批次仅由一个Dstream生成 。 正如我之前所说,我想将这些结果永久存储在某个地方保存的表中,并可能在稍后查询。 所以我的问题是:有没有办法做到这一点? 我很感激有人能帮我一把,但是特别告诉我这是否可能。 谢谢。

DataFrame连接优化 – 广播散列连接

我试图有效地join两个dataframe,其中一个是大的,第二个是小一点。 有没有办法避免所有这些洗牌? 我不能设置autoBroadCastJoinThreshold ,因为它只支持整数 – 而我试图广播的表略大于整数字节数。 有没有办法强制广播忽略这个variables?