Tag: apache spark

在Spark DataFrame中查找每个组的最大行数

我试图使用Spark数据框而不是RDDs,因为它们看起来比RDD更高级,并且倾向于生成更多可读的代码,但是我将非常乐意获得关于手头任务更为惯用的build议。 在一个14节点的Google Dataproc集群中,我有大约6百万个名字被两个不同的系统翻译成ID: sa和sb 。 每Row包含name , id_sa和id_sb 。 我的目标是产生从id_sa到id_sb的映射,使得对于每个id_sa ,相应的id_sb是与id_sb相连的所有名称中最频繁的id。 我们试着用一个例子来澄清一下。 如果我有以下几行: [Row(name='n1', id_sa='a1', id_sb='b1'), Row(name='n2', id_sa='a1', id_sb='b2'), Row(name='n3', id_sa='a1', id_sb='b2'), Row(name='n4', id_sa='a2', id_sb='b2')] 我的目标是产生从a1到b2的映射。 实际上,与a1相关的名称分别是n1 , n2和n3 ,它们分别映射到b1 , b2和b2 ,所以b2是与a1相关联的名称中最频繁的映射。 以同样的方式, a2将被映射到b2 。 假设永远有一个赢家是没有问题的:不需要打破关系。 我希望我可以在我的数据groupBy(df.id_sa)上使用groupBy(df.id_sa) ,但是我不知道下一步该怎么做。 我希望有一个聚合,最后可以产生以下几行: [Row(id_sa=a1, max_id_sb=b2), Row(id_sa=a2, max_id_sb=b2)] 但也许我试图使用错误的工具,我应该回去使用RDD。

如何在Spark SQL中定义和使用用户定义的聚合函数?

我知道如何在Spark SQL中编写UDF: def belowThreshold(power: Int): Boolean = { return power < -40 } sqlContext.udf.register("belowThreshold", belowThreshold _) 我可以做类似的定义一个聚合函数吗? 这是怎么做的? 对于上下文,我想运行下面的SQL查询: val aggDF = sqlContext.sql("""SELECT span, belowThreshold(opticalReceivePower), timestamp FROM ifDF WHERE opticalReceivePower IS NOT null GROUP BY span, timestamp ORDER BY span""") 它应该返回类似的东西 Row(span1, false, T0) 我想要聚合函数来告诉我,如果在由span和timestamp定义的组中的opticalReceivePower有任何值在阈值以下。 我是否需要以不同的方式将我的UDAF写入上面粘贴的UDF?

SPARK SQLreplace为mysql GROUP_CONCAT聚合函数

我有一个两个stringtypes的列表(用户名,朋友),并为每个用户名,我想收集一行中的所有朋友,连接为string('username1','friends1,friends2,friends3')。 我知道MySql是通过GROUP_CONCAT做到的,有没有办法用SPARK SQL来做到这一点? 谢谢

Apache Spark中的案例类相等

为什么Spark中的模式匹配与Scala中的模式匹配不一样? 看下面的例子…函数f()尝试模式匹配的类,它在Scala REPL中工作,但在Spark中失败,并导致所有“???”。 f2()是一种解决方法,它使用.isInstanceOf()在Spark中获得所需的结果,但是我明白在Scala中是不好的forms。 任何帮助模式匹配在这种情况下在火花正确的方式将不胜感激。 abstract class a extends Serializable {val a: Int} case class b(a: Int) extends a case class bNull(a: Int=0) extends a val x: List[a] = List(b(0), b(1), bNull()) val xRdd = sc.parallelize(x) 尝试在Scala REPL中工作的模式匹配,但在Spark中失败 def f(x: a) = x match { case b(n) => "b" case bNull(n) => "bnull" case _ […]

尝试将dataframe行映射到更新的行时发生编码器错误

当我尝试在我的代码中做同样的事情,如下所述 dataframe.map(row => { val row1 = row.getAs[String](1) val make = if (row1.toLowerCase == "tesla") "S" else row1 Row(row(0),make,row(2)) }) 我从这里采取上述参考: 斯卡拉:我怎样才能在Dataframs中使用scalareplace值但是我得到编码器错误 无法find存储在数据集中的types的编码器。 原始types(Int,S tring等)和产品types(case类)通过导入spark.im plicits._支持序列化其他types将在未来版本中添加。 注意:我正在使用spark 2.0!

如何使用Spark DataFrame查询JSON数据列?

我有一个卡桑德拉表为简单起见如下所示: key: text jsonData: text blobData: blob 我可以使用spark和spark-cassandra连接器为此创build一个基本数据框: val df = sqlContext.read .format("org.apache.spark.sql.cassandra") .options(Map("table" -> "mytable", "keyspace" -> "ks1")) .load() 尽pipe我将JSON数据扩展到其底层结构,但我仍在苦苦挣扎。 我最终希望能够根据jsonstring中的属性进行过滤并返回blob数据。 像jsonData.foo =“bar”,并返回blobData。 目前这是可能的吗?

如何从Spark的CSV文件中跳过标题?

假设我给三个文件path来读取一个Spark上下文,并且每个文件在第一行都有一个模式。 我们怎样才能从头文件中跳过模式行? val rdd=sc.textFile("file1,file2,file3") 现在,我们如何跳过这个rdd的标题行?

我如何将RDD分成两个或更多的RDD?

我正在寻找一种将RDD分成两个或更多RDD的方法。 我见过的最接近的是斯卡拉星火:分解成几个RDD? 这仍然是一个单一的RDD。 如果你对SAS很熟悉,像这样: data work.split1, work.split2; set work.preSplit; if (condition1) output work.split1 else if (condition2) output work.split2 run; 这导致了两个不同的数据集。 这将不得不立即坚持得到我想要的结果…

如何使用Sparkfind中位数和分位数

我怎样才能find使用分布式方法,IPython和Spark整数RDD的中位数? RDD大约有70万个元素,因此太大而无法收集和find中位数。 这个问题类似于这个问题。 但是,问题的答案是使用我不知道的Scala。 我如何用Apache Spark计算确切的中位数? 使用Scala的思考答案,我试图用Python编写一个类似的答案。 我知道我首先要对RDD进行sorting。 我不知道怎么。 我看到sortBy (通过给定的keyfunc对此RDD进行sorting)和sortByKey (对此RDDsorting,假定它由(键,值)对组成)方法。 我认为这两个使用键值,我的RDD只有整数元素。 首先,我正在考虑做myrdd.sortBy(lambda x: x) ? 接下来我会findrdd( rdd.count() )的长度。 最后,我想在rdd的中心find元素或2个元素。 我也需要这个方法的帮助。 编辑: 我有一个想法。 也许我可以索引我的RDD ,然后key = index和value = element。 然后我可以尝试按价值sorting? 我不知道这是否可能,因为只有一个sortByKey方法。

Spark java.lang.OutOfMemoryError:Java堆空间

我的集群:1个主站,11个从站,每个节点有6GB的内存。 我的设置: spark.executor.memory=4g, Dspark.akka.frameSize=512 这是问题: 首先 ,我从HDFS读取一些数据(2.19 GB)到RDD: val imageBundleRDD = sc.newAPIHadoopFile(…) 其次 ,在RDD上做一些事情: val res = imageBundleRDD.map(data => { val desPoints = threeDReconstruction(data._2, bg) (data._1, desPoints) }) 最后输出到HDFS: res.saveAsNewAPIHadoopFile(…) 当我运行我的程序时,显示: ….. 14/01/15 21:42:27 INFO cluster.ClusterTaskSetManager: Starting task 1.0:24 as TID 33 on executor 9: Salve7.Hadoop (NODE_LOCAL) 14/01/15 21:42:27 INFO cluster.ClusterTaskSetManager: Serialized task 1.0:24 as […]