如何将C编译模块(例如,python-Levenshtein)发送到spark集群中的每个节点? 我知道我可以使用独立的python脚本(下面的示例代码)在spark中发布python文件: from pyspark import SparkContext sc = SparkContext("local", "App Name", pyFiles=['MyFile.py', 'MyOtherFile.py']) 但是在没有“.py”的情况下,我该如何运送模块?
我有两个rdd's val tab_a: RDD[(String, String)]和val tab_b: RDD[(String, String)]我正在使用cogroup的数据集,如: val tab_c = tab_a.cogroup(tab_b).collect.toArray val updated = tab_c.map { x => { //somecode } } 我正在使用tab_c cogrouped的值映射函数,它适用于小数据集,但在大数据集的情况下,它会抛出Out Of Memory exception 。 我已经尝试将最终值转换为RDD,但没有运气相同的错误 val newcos = spark.sparkContext.parallelize(tab_c) 1.如何将Cogroup用于大型数据集? 我们能坚持这个价值吗? 码 val source_primary_key = source.map(rec => (rec.split(",")(0), rec)) source_primary_key.persist(StorageLevel.DISK_ONLY) val destination_primary_key = destination.map(rec => (rec.split(",")(0), rec)) destination_primary_key.persist(StorageLevel.DISK_ONLY) val cos […]
我试图运行2个函数在单个RDD上使用pyspark并行执行完全独立的转换。 有什么办法可以做同样的事情? def doXTransforms(sampleRDD): (X transforms) def doYTransforms(sampleRDD): (Y Transforms) if __name__ == "__main__": sc = SparkContext(appName="parallelTransforms") sqlContext = SQLContext(sc) hive_context = HiveContext(sc) rows_rdd = hive_context.sql("select * from tables.X_table") p1 = Process(target=doXTransforms , args=(rows_rdd,)) p1.start() p2 = Process(target=doYTransforms, args=(rows_rdd,)) p2.start() p1.join() p2.join() sc.stop() 这不起作用,我现在明白这是行不通的。 但是有没有其他方法可以使这项工作? 具体是否有任何python-spark特定的解决scheme?
我有一个非常大的名为df的pyspark.sql.dataframe.DataFrame 。 我需要一些枚举logging的方法,从而能够访问具有特定索引的logging。 (或select索引范围的logging组) pandas,我可以做 indexes=[2,3,6,7] df[indexes] 在这里我想要类似的东西(并且不把数据框转换成pandas) 我能find的最接近的是: 通过以下方式枚举原始数据框中的所有对象: indexes=np.arange(df.count()) df_indexed=df.withColumn('index', indexes) 使用where()函数search我需要的值。 问题: 为什么它不工作,如何使它工作? 如何将一行添加到数据框? 稍后会做出如下的工作: indexes=[2,3,6,7] df1.where("index in indexes").collect() 任何更快,更简单的方法来处理它?
我们正在使用与Spark 1.3.1接口的PySpark库。 我们有两个数据框: documents_df := {document_id, document_text}和keywords_df := {keyword} 。 我们希望join两个数据框,并使用keyword_df.keyword出现在document_df.document_textstring中的条件返回带有{document_id, keyword}对的结果数据框。 例如,在PostgreSQL中,我们可以使用以下forms的ON子句来实现: document_df.document_text ilike '%' || keyword_df.keyword || '%' 然而,在PySpark中,我无法获得任何forms的连接语法。 有没有人做过这样的事情? 亲切的问候, 将
我正在评估用于生产基于ML的应用程序的工具,我们的一个选项是Spark MLlib,但是我有一些关于如何在培训之后为模型提供服务的问题? 例如,在Azure ML中,一旦训练完成,该模型将作为一种可从任何应用程序中使用的Web服务公开,这与Amazon ML也是类似的情况。 你如何在Apache Spark中部署ML模型?
上下文:我有一个DataFrame 2列:单词和vector。 其中“向量”的列types是VectorUDT 。 一个例子: word | vector assert | [435,323,324,212…] 我想得到这个: word | v1 | v2 | v3 | v4 | v5 | v6 …… assert | 435 | 5435| 698| 356|…. 题: 如何使用pyspark为每个维度在多个列中使用向量分隔列? 提前致谢
我来自pandas的背景,习惯于从CSV文件中读取数据到数据框中,然后使用简单的命令简单地将列名更改为有用的东西: df.columns = new_column_name_list 但是,在使用sqlContext创build的pyspark数据框中不起作用。 我可以想出的唯一解决scheme就是: df = sqlContext.read.format("com.databricks.spark.csv").options(header='false', inferschema='true', delimiter='\t').load("data.txt") oldSchema = df.schema for i,k in enumerate(oldSchema.fields): k.name = new_column_name_list[i] df = sqlContext.read.format("com.databricks.spark.csv").options(header='false', delimiter='\t').load("data.txt", schema=oldSchema) 这基本上定义了两次variables,首先推断模式,然后重命名列名,然后再次使用更新的模式加载数据框。 有没有比pandas更好更高效的方法来做到这一点? 我的火花版本是1.5.0
下面的代码将从hbase中读取,然后将其转换为json结构并转换为schemaRDD,但问题是我using List来存储jsonstring,然后传递给javaRDD,对于大约100 GB的数据,master将会加载内存中的数据。 从hbase加载数据然后执行操作,然后转换为JavaRDD的正确方法是什么? package hbase_reader; import java.io.IOException; import java.io.Serializable; import java.util.ArrayList; import java.util.List; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.rdd.RDD; import org.apache.spark.sql.api.java.JavaSQLContext; import org.apache.spark.sql.api.java.JavaSchemaRDD; import org.apache.commons.cli.ParseException; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.mapreduce.TableInputFormat; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.io.Text; import org.apache.spark.SparkConf; import scala.Function1; import scala.Tuple2; import […]
这是别人在另一个论坛上的问题的副本,从来没有回答,所以我想我会在这里再问一次,因为我有同样的问题。 (见http://geekple.com/blogs/feeds/Xgzu7/posts/351703064084736 ) 我已经正确地在我的机器上安装了Spark,并且能够在使用./bin/pyspark作为我的python解释器时,运行带有pyspark模块的python程序而不会出错。 但是,当我尝试运行常规Python shell时,当我尝试导入pyspark模块时,出现此错误: from pyspark import SparkContext 它说 "No module named pyspark". 我该如何解决这个问题? 是否有一个环境variables,我需要设置指向Python的pyspark标题/库/等? 如果我的火花装置是/ spark /,我需要包括哪些pysparkpath? 或者pyspark程序只能从pyspark解释器运行?