RDD的 map和mapPartitions方法有什么mapPartitions ? mapPartitions map行为像map或像mapPartitions ? 谢谢。 (编辑)即两者之间有什么区别(语义上或执行上) def map[A, B](rdd: RDD[A], fn: (A => B)) (implicit a: Manifest[A], b: Manifest[B]): RDD[B] = { rdd.mapPartitions({ iter: Iterator[A] => for (i <- iter) yield fn(i) }, preservesPartitioning = true) } 和: def map[A, B](rdd: RDD[A], fn: (A => B)) (implicit a: Manifest[A], b: Manifest[B]): RDD[B] = […]
我一直在面对“Spark Streaming”关于将输出Dstream插入永久性 SQL表的问题。 我想插入每个输出DStream(来自单个批处理,火花进程)到一个独特的表。 我一直在使用Spark版本1.6.2的Python。 在我的代码的这一部分,我有一个Dstream的一个或多个RDD,我想永久插入/存储到SQL表中,而不会丢失每个处理的批处理结果。 rr = feature_and_label.join(result_zipped)\ .map(lambda x: (x[1][0][0], x[1][1]) ) 这里的每个Dstream都被表示为这个元组的forms:( 4.0,0 )。 我不能使用SparkSQL,因为Spark对待'table'的方式就是像临时表一样 ,因此在每个批处理中都会丢失结果。 这是输出的一个例子: 时间:2016-09-23 00:57:00 (0.0,2) 时间:2016-09-23 00:57:01 (4.0,0) 时间:2016-09-23 00:57:02 (4.0,0) … 如上所示,每个批次仅由一个Dstream生成 。 正如我之前所说,我想将这些结果永久存储在某个地方保存的表中,并可能在稍后查询。 所以我的问题是:有没有办法做到这一点? 我很感激有人能帮我一把,但是特别告诉我这是否可能。 谢谢。
使用Scala,我怎样才能将dataFrame分割成多个dataFrame(无论是数组还是集合)具有相同的列值。 例如,我想拆分下面的DataFrame: ID Rate State 1 24 AL 2 35 MN 3 46 FL 4 34 AL 5 78 MN 6 99 FL 至: 数据集1 ID Rate State 1 24 AL 4 34 AL 数据集2 ID Rate State 2 35 MN 5 78 MN 数据集3 ID Rate State 3 46 FL 6 99 FL
如何处理分类数据 spark-ml 而不 spark-mllib ? 认为文档不是很清楚,看起来像RandomForestClassifier , LogisticRegression这样的分类器有一个featuresCol参数,它指定DataFrame列的名称,以及一个labelCol参数,它指定了标签类的列名在DataFrame 。 很显然,我想在预测中使用多个特征,所以我尝试使用VectorAssembler将所有特征放在featuresCol下的单个vector中。 然而, VectorAssembler只接受数字types,布尔types和向量types(根据Spark网站),所以我不能把string放入我的特征向量中。 我应该如何继续?
我有这样的RDD: 1 2 3 4 5 6 7 8 9 这是一个matrix。 现在我想转置这样的RDD: 1 4 7 2 5 8 3 6 9 我该怎么做?
我在线学习了一些教程,但是他们不能在OS X El Capitan(10.11)上使用Spark 1.5.1 , 基本上我已经运行这个命令下载apache-spark brew update brew install scala brew install apache-spark 更新.bash_profile # For a ipython notebook and pyspark integration if which pyspark > /dev/null; then export SPARK_HOME="/usr/local/Cellar/apache-spark/1.5.1/libexec/" export PYSPARK_SUBMIT_ARGS="–master local[2]" fi 跑 ipython profile create pyspark 创build了一个以这种方式configuration的启动文件~/.ipython/profile_pyspark/startup/00-pyspark-setup.py # Configure the necessary Spark environment import os import sys # Spark home […]
比方说,我们的团队已经selectPython作为用Spark开发的参考语言。 但后来出于性能方面的原因,我们想开发特定的Scala或Java特定的库,以便将它们与我们的Python代码(类似于具有Scala或Java框架的Python存根)进行映射。 难道你不觉得是否有可能通过一些Scala或Java用户定义函数来接口新的自定义Python方法?
我有最新版本的R – 3.2.1。 现在我想在R上安装SparkR。执行完后: > install.packages("SparkR") 我回来了: Installing package into '/home/user/R/x86_64-pc-linux-gnu-library/3.2' (as 'lib' is unspecified) Warning in install.packages : package 'SparkR' is not available (for R version 3.2.1) 我也在我的机器上安装了Spark Spark 1.4.0 我怎么能解决这个问题(实际上我使用RStudio或只是从terminal)
这是可能的,什么是最有效的整洁的方法添加一列到数据框? 更具体地说,列可以用作现有数据框的行ID。 在简单的情况下,从文件中读取,而不是标记它,我可以想到下面的东西(在斯卡拉),但它完成与错误(在第3行),反正看起来不是最好的路线可能: var dataDF = sc.textFile("path/file").toDF() val rowDF = sc.parallelize(1 to DataDF.count().toInt).toDF("ID") dataDF = dataDF.withColumn("ID", rowDF("ID"))
根据学习星火 请记住,重新分区您的数据是一个相当昂贵的操作。 Spark还有一个名为coalesce()的repartition()的优化版本,它允许避免数据移动,但只有在减lessRDD分区的数量的时候。 我得到的一个区别是,重新分区()分区的数量可以增加/减less,但与coalesce()分区的数量只能减less。 如果分区分散在多台机器上,并且运行coalesce(),它如何避免数据移动?