Tag: rdd

Apache Spark:map vs mapPartitions?

RDD的 map和mapPartitions方法有什么mapPartitions ? mapPartitions map行为像map或像mapPartitions ? 谢谢。 (编辑)即两者之间有什么区别(语义上或执行上) def map[A, B](rdd: RDD[A], fn: (A => B)) (implicit a: Manifest[A], b: Manifest[B]): RDD[B] = { rdd.mapPartitions({ iter: Iterator[A] => for (i <- iter) yield fn(i) }, preservesPartitioning = true) } 和: def map[A, B](rdd: RDD[A], fn: (A => B)) (implicit a: Manifest[A], b: Manifest[B]): RDD[B] = […]

如何在Spark中转置RDD

我有这样的RDD: 1 2 3 4 5 6 7 8 9 这是一个matrix。 现在我想转置这样的RDD: 1 4 7 2 5 8 3 6 9 我该怎么做?

Spark – 重新分区()vs coalesce()

根据学习星火 请记住,重新分区您的数据是一个相当昂贵的操作。 Spark还有一个名为coalesce()的repartition()的优化版本,它允许避免数据移动,但只有在减lessRDD分区的数量的时候。 我得到的一个区别是,重新分区()分区的数量可以增加/减less,但与coalesce()分区的数量只能减less。 如果分区分散在多台机器上,并且运行coalesce(),它如何避免数据移动?

斯卡拉vs Python的Spark性能

我比Python更喜欢Python。 但是,由于Spark本身就是用Scala编写的,所以我期望我的代码在Scala中运行得比Python版本更快,原因很明显。 有了这个假设,我想学习和写一些非常普通的预处理代码的Scala版本的一些1 GB的数据。 数据来自Kaggle的SpringLeaf竞赛。 只是给出了数据的概述(它包含1936年的维度和145232行)。 数据由各种types组成,如int,float,string,boolean。 我正在使用6个核心中的8个进行Spark处理; 这就是为什么我使用minPartitions=6以便每个核心都有可处理的东西。 斯卡拉代码 val input = sc.textFile("train.csv", minPartitions=6) val input2 = input.mapPartitionsWithIndex { (idx, iter) => if (idx == 0) iter.drop(1) else iter } val delim1 = "\001" def separateCols(line: String): Array[String] = { val line2 = line.replaceAll("true", "1") val line3 = line2.replaceAll("false", "0") val vals: Array[String] = […]

Apache Spark中的案例类相等

为什么Spark中的模式匹配与Scala中的模式匹配不一样? 看下面的例子…函数f()尝试模式匹配的类,它在Scala REPL中工作,但在Spark中失败,并导致所有“???”。 f2()是一种解决方法,它使用.isInstanceOf()在Spark中获得所需的结果,但是我明白在Scala中是不好的forms。 任何帮助模式匹配在这种情况下在火花正确的方式将不胜感激。 abstract class a extends Serializable {val a: Int} case class b(a: Int) extends a case class bNull(a: Int=0) extends a val x: List[a] = List(b(0), b(1), bNull()) val xRdd = sc.parallelize(x) 尝试在Scala REPL中工作的模式匹配,但在Spark中失败 def f(x: a) = x match { case b(n) => "b" case bNull(n) => "bnull" case _ […]

我如何将RDD分成两个或更多的RDD?

我正在寻找一种将RDD分成两个或更多RDD的方法。 我见过的最接近的是斯卡拉星火:分解成几个RDD? 这仍然是一个单一的RDD。 如果你对SAS很熟悉,像这样: data work.split1, work.split2; set work.preSplit; if (condition1) output work.split1 else if (condition2) output work.split2 run; 这导致了两个不同的数据集。 这将不得不立即坚持得到我想要的结果…

如何使用Sparkfind中位数和分位数

我怎样才能find使用分布式方法,IPython和Spark整数RDD的中位数? RDD大约有70万个元素,因此太大而无法收集和find中位数。 这个问题类似于这个问题。 但是,问题的答案是使用我不知道的Scala。 我如何用Apache Spark计算确切的中位数? 使用Scala的思考答案,我试图用Python编写一个类似的答案。 我知道我首先要对RDD进行sorting。 我不知道怎么。 我看到sortBy (通过给定的keyfunc对此RDD进行sorting)和sortByKey (对此RDDsorting,假定它由(键,值)对组成)方法。 我认为这两个使用键值,我的RDD只有整数元素。 首先,我正在考虑做myrdd.sortBy(lambda x: x) ? 接下来我会findrdd( rdd.count() )的长度。 最后,我想在rdd的中心find元素或2个元素。 我也需要这个方法的帮助。 编辑: 我有一个想法。 也许我可以索引我的RDD ,然后key = index和value = element。 然后我可以尝试按价值sorting? 我不知道这是否可能,因为只有一个sortByKey方法。