我在HDFS有一个巨大的文件,具有时间序列数据点(雅虎股票价格)。 我想要find时间序列的移动平均线,我该如何着手编写Apache Spark作业来做到这一点。
我比Python更喜欢Python。 但是,由于Spark本身就是用Scala编写的,所以我期望我的代码在Scala中运行得比Python版本更快,原因很明显。 有了这个假设,我想学习和写一些非常普通的预处理代码的Scala版本的一些1 GB的数据。 数据来自Kaggle的SpringLeaf竞赛。 只是给出了数据的概述(它包含1936年的维度和145232行)。 数据由各种types组成,如int,float,string,boolean。 我正在使用6个核心中的8个进行Spark处理; 这就是为什么我使用minPartitions=6以便每个核心都有可处理的东西。 斯卡拉代码 val input = sc.textFile("train.csv", minPartitions=6) val input2 = input.mapPartitionsWithIndex { (idx, iter) => if (idx == 0) iter.drop(1) else iter } val delim1 = "\001" def separateCols(line: String): Array[String] = { val line2 = line.replaceAll("true", "1") val line3 = line2.replaceAll("false", "0") val vals: Array[String] = […]
我正试图在Windows上设置Apache Spark。 经过一番search,我明白独立模式是我想要的。 我下载哪些二进制文件,以便在Windows中运行Apache spark? 我在火花下载页面看到hadoop和cdh的发行版。 我没有在这个网站的参考。 一步一步的指导,这是高度赞赏。
我如何增加可用于Apache火花执行器节点的内存? 我有一个适合加载到Apache Spark的2 GB文件。 我在一台机器上运行apache spark,所以驱动程序和执行程序在同一台机器上。 机器有8 GB的内存。 当我设置要caching在内存中的文件后,尝试统计文件的行,我得到这些错误: 2014-10-25 22:25:12 WARN CacheManager:71 – Not enough space to cache partition rdd_1_1 in memory! Free memory is 278099801 bytes. 我查看了这里的文档,并在$SPARK_HOME/conf/spark-defaults.conf设置为4g UI显示这个variables是在Spark环境中设置的。 你可以在这里find截图 但是当我去执行选项卡时 ,我单个执行程序的内存限制仍设置为265.4 MB。 我也仍然得到相同的错误。 我尝试了这里提到的各种各样的东西,但是我仍然得到错误,并且没有清楚的想法,我应该改变设置。 我正在从spark-shell交互式地运行我的代码
我正在尝试在JSON文件上创build一个LDA模型。 用JSON文件创build一个spark上下文: import org.apache.spark.sql.SparkSession val sparkSession = SparkSession.builder .master("local") .appName("my-spark-app") .config("spark.some.config.option", "config-value") .getOrCreate() val df = spark.read.json("dbfs:/mnt/JSON6/JSON/sampleDoc.txt") 显示df应该显示DataFrame display(df) 标记文本 import org.apache.spark.ml.feature.RegexTokenizer // Set params for RegexTokenizer val tokenizer = new RegexTokenizer() .setPattern("[\\W_]+") .setMinTokenLength(4) // Filter away tokens with length < 4 .setInputCol("text") .setOutputCol("tokens") // Tokenize document val tokenized_df = tokenizer.transform(df) 这应该显示tokenized_df display(tokenized_df) 获取stopwords %sh […]
据此 Catalyst使用逻辑优化(如谓词下推)。 优化器可以将筛选谓词向下推送到数据源中,使物理执行跳过不相关的数据。 Spark支持将谓词按下到数据源。 此function是否也可用于JDBC? (从检查数据库日志,我可以看到它不是现在的默认行为 – 完整的查询被传递给数据库,即使它后来被火花filter限制) 更多细节 使用PostgreSQL 9.4运行Spark 1.5 代码片段: from pyspark import SQLContext, SparkContext, Row, SparkConf from data_access.data_access_db import REMOTE_CONNECTION sc = SparkContext() sqlContext = SQLContext(sc) url = 'jdbc:postgresql://{host}/{database}?user={user}&password={password}'.format(**REMOTE_CONNECTION) sql = "dummy" df = sqlContext.read.jdbc(url=url, table=sql) df = df.limit(1) df.show() SQL跟踪: < 2015-09-15 07:11:37.718 EDT >LOG: execute <unnamed>: SET extra_float_digits = […]
我试图有效地join两个dataframe,其中一个是大的,第二个是小一点。 有没有办法避免所有这些洗牌? 我不能设置autoBroadCastJoinThreshold ,因为它只支持整数 – 而我试图广播的表略大于整数字节数。 有没有办法强制广播忽略这个variables?
这个问题的目的是logging: 在PySpark中使用JDBC连接读取和写入数据所需的步骤 JDBC源和可能的解决scheme可能存在的问题 只需稍作更改,这些方法就可以与Scala和R等其他支持的语言一起工作。
假设我们有DataFrame df由以下列组成: 名称,姓氏,大小,宽度,长度,称重 现在我们要执行一些操作,例如我们想创build一些包含大小和宽度数据的DataFrame。 val df1 = df.groupBy("surname").agg( sum("size") ) val df2 = df.groupBy("surname").agg( sum("width") ) 正如您可以注意到的那样,其他列(如Length)不会在任何地方使用。 Spark是否足够聪明,可以在混洗阶段之前删除多余的列,或者他们是否随身携带? Wil跑步: val dfBasic = df.select("surname", "size", "width") 之前分组莫名其妙地影响性能?
我有三个stringtypes的数组包含以下信息: groupBy数组:包含我想要按数据分组的列的名称。 聚合数组:包含我要聚合的列的名称。 操作数组:包含我想要执行的聚合操作 我正在尝试使用火花数据框来实现这一点。 Spark数据框提供了一个可以传递Map [String,String](列名和相应的聚合操作)作为input的agg(),但是我想对同一列数据执行不同的聚合操作。 有关如何实现这一目标的任何build议?