Tag: rdd

HashPartitioner如何工作?

我阅读了HashPartitioner的文档。 不幸的是,除了API调用之外,没有什么可解释的。 我假设HashPartitioner根据密钥的哈希来分配分布式集合。 例如,如果我的数据是 (1,1), (1,2), (1,3), (2,1), (2,2), (2,3) 所以分区器会把这个分区放到不同的分区中,同一个分区中的键也是一样的。 但是我不明白构造函数参数的意义 new HashPartitoner(numPartitions) //What does numPartitions do? 对于上面的数据集,如果我做了,结果会有什么不同 new HashPartitoner(1) new HashPartitoner(2) new HashPartitoner(10) 那么HashPartitioner究竟如何工作呢?

哪些操作保留RDD订单?

RDD有一个有意义的 (与存储模型施加的一些随机顺序相反),如果它是由sortBy()处理的,如本答复所述 。 现在,哪些操作保持这个顺序? 例如,它保证 (在a.sortBy() ) a.map(f).zip(a) === a.map(x => (f(x),x)) 怎么样 a.filter(f).map(g) === a.map(x => (x,g(x))).filter(f(_._1)).map(_._2) 关于什么 a.filter(f).flatMap(g) === a.flatMap(x => g(x).map((x,_))).filter(f(_._1)).map(_._2) 在这里,“平等” ===被理解为“function等同”,即无法通过用户级别的操作(即不读取日志&c)区分结果。

DAG如何在RDD中工作?

Spark研究论文已经在经典的Hadoop MapReduce上规定了一种新的分布式编程模型,声称在许多情况下,特别是在机器学习上,这种简化和巨大的性能提升。 然而,在本文中,用有向无环图揭示Resilient Distributed Datasets internal mechanics的材料似乎是缺乏的。 通过调查源代码是否应该更好地学习?

在Apache Spark Web UI中,“跳过舞台”是指什么?

从我的Spark UI。 这是什么意思跳过?

如何将rdd对象转换为spark中的dataframe

如何将RDD( org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] )转换为Dataframe org.apache.spark.sql.DataFrame 。 我使用.rdd将dataframe转换为rdd。 处理完之后,我希望它回到数据框中。 我怎样才能做到这一点 ?

(为什么)我们需要调用caching还是坚持RDD

当从文本文件或集合(或从另一个RDD)创build弹性分布式数据集(RDD)时,是否需要显式调用“cache”或“persist”以将RDD数据存储到内存中? 或者默认情况下RDD数据是以分布的方式存储在内存中的? val textFile = sc.textFile("/user/emp.txt") 根据我的理解,在上面的步骤之后,textFile是一个RDD,并且可以在所有/某些节点的内存中使用。 如果是这样,为什么我们需要在textFile RDD上调用“cache”或“persist”呢?

DataFrame(Spark 2.0中的DataSet )和Spark中的RDD之间的区别

我只是想知道在Apache Spark中RDD和DataFrame (Spark 2.0.0 DataFrame是Dataset[Row]types别名)之间的区别是什么? 你可以转换一个到另一个?

caching和持久性有什么区别?

就RDD持久性而言,spark cache()和persist()在spark中有什么区别?

PySpark DataFrames – 枚举的方式不转换为pandas?

我有一个非常大的名为df的pyspark.sql.dataframe.DataFrame 。 我需要一些枚举logging的方法,从而能够访问具有特定索引的logging。 (或select索引范围的logging组) pandas,我可以做 indexes=[2,3,6,7] df[indexes] 在这里我想要类似的东西(并且不把数据框转换成pandas) 我能find的最接近的是: 通过以下方式枚举原始数据框中的所有对象: indexes=np.arange(df.count()) df_indexed=df.withColumn('index', indexes) 使用where()函数search我需要的值。 问题: 为什么它不工作,如何使它工作? 如何将一行添加到数据框? 稍后会做出如下的工作: indexes=[2,3,6,7] df1.where("index in indexes").collect() 任何更快,更简单的方法来处理它?

如何使用spark从hbase读取

下面的代码将从hbase中读取,然后将其转换为json结构并转换为schemaRDD,但问题是我using List来存储jsonstring,然后传递给javaRDD,对于大约100 GB的数据,master将会加载内存中的数据。 从hbase加载数据然后执行操作,然后转换为JavaRDD的正确方法是什么? package hbase_reader; import java.io.IOException; import java.io.Serializable; import java.util.ArrayList; import java.util.List; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.rdd.RDD; import org.apache.spark.sql.api.java.JavaSQLContext; import org.apache.spark.sql.api.java.JavaSchemaRDD; import org.apache.commons.cli.ParseException; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.mapreduce.TableInputFormat; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.io.Text; import org.apache.spark.SparkConf; import scala.Function1; import scala.Tuple2; import […]