Tag: apache spark

有效计数与Apache Spark不同

在一些网站(比如100个网站)上,有1亿个用户点击了1000亿次。 点击stream在大数据集中可用。 使用Apache Spark的抽象,每个网站统计不同访问者的最有效方法是什么?

火花杀死运行应用程序

我有一个正在运行的Spark应用程序,它占用了我的其他应用程序不会被分配任何资源的所有核心。 我做了一些快速的研究,人们build议使用YARN kill或/ bin / spark-class来终止命令。 但是,我使用的CDH版本和/ bin / spark-class根本不存在,YARN kill应用程序也不起作用。 任何人都可以用这个吗?

HashPartitioner如何工作?

我阅读了HashPartitioner的文档。 不幸的是,除了API调用之外,没有什么可解释的。 我假设HashPartitioner根据密钥的哈希来分配分布式集合。 例如,如果我的数据是 (1,1), (1,2), (1,3), (2,1), (2,2), (2,3) 所以分区器会把这个分区放到不同的分区中,同一个分区中的键也是一样的。 但是我不明白构造函数参数的意义 new HashPartitoner(numPartitions) //What does numPartitions do? 对于上面的数据集,如果我做了,结果会有什么不同 new HashPartitoner(1) new HashPartitoner(2) new HashPartitoner(10) 那么HashPartitioner究竟如何工作呢?

使用spark-csv编写单个CSV文件

我正在使用https://github.com/databricks/spark-csv ,我想写一个单一的CSV,但不能够,它正在做一个文件夹。 需要一个带path和文件名参数的scala函数,并写入该CSV文件。

Spark:如何从Spark壳运行Spark文件

我正在使用CDH 5.2。 我能够使用spark-shell来运行命令。 我如何运行包含spark命令的文件(file.spark)。 其次,有没有办法在没有sbt的情况下运行/编译CDH 5.2中的scala程序? 提前致谢,

Apache Spark:如何在Python中使用pyspark 3

我从GH开发大师那里构build了Spark 1.4,构build得很好。 但是当我做一个bin/pyspark我得到了Python 2.7.9版本。 我怎样才能改变这个?

如何在Spark DataFrame中添加一个常量列?

我想在DataFrame添加一个任意值的列(每行都是一样的)。 我在使用withColumn时出现错误,如下所示: dt.withColumn('new_column', 10).head(5) ————————————————————————— AttributeError Traceback (most recent call last) <ipython-input-50-a6d0257ca2be> in <module>() 1 dt = (messages 2 .select(messages.fromuserid, messages.messagetype, floor(messages.datetime/(1000*60*5)).alias("dt"))) —-> 3 dt.withColumn('new_column', 10).head(5) /Users/evanzamir/spark-1.4.1/python/pyspark/sql/dataframe.pyc in withColumn(self, colName, col) 1166 [Row(age=2, name=u'Alice', age2=4), Row(age=5, name=u'Bob', age2=7)] 1167 """ -> 1168 return self.select('*', col.alias(colName)) 1169 1170 @ignore_unicode_prefix AttributeError: 'int' object has no attribute 'alias' […]

如何将数据框转换回正常的RDD在pyspark?

我需要使用 (rdd.)partitionBy(npartitions, custom_partitioner) 方法在DataFrame上不可用。 所有的DataFrame方法只涉及DataFrame结果。 那么如何从DataFrame数据创build一个RDD呢? 注意:这是从1.2.0更改(在1.3.0)。 从@dpangmao的答案更新 :方法是.rdd。 我有兴趣了解是否(a)它是公开的和(b)什么是性能影响。 那么(a)是肯定的,(b) – 你可以看到这里有很大的性能影响:必须通过调用mapPartitions来创build一个新的RDD: 在dataframe.py (注意文件名也改变了(是sql.py): @property def rdd(self): """ Return the content of the :class:`DataFrame` as an :class:`RDD` of :class:`Row` s. """ if not hasattr(self, '_lazy_rdd'): jrdd = self._jdf.javaToPython() rdd = RDD(jrdd, self.sql_ctx._sc, BatchedSerializer(PickleSerializer())) schema = self.schema def applySchema(it): cls = _create_cls(schema) return itertools.imap(cls, it) […]

哪些操作保留RDD订单?

RDD有一个有意义的 (与存储模型施加的一些随机顺序相反),如果它是由sortBy()处理的,如本答复所述 。 现在,哪些操作保持这个顺序? 例如,它保证 (在a.sortBy() ) a.map(f).zip(a) === a.map(x => (f(x),x)) 怎么样 a.filter(f).map(g) === a.map(x => (x,g(x))).filter(f(_._1)).map(_._2) 关于什么 a.filter(f).flatMap(g) === a.flatMap(x => g(x).map((x,_))).filter(f(_._1)).map(_._2) 在这里,“平等” ===被理解为“function等同”,即无法通过用户级别的操作(即不读取日志&c)区分结果。

如何运行一个spark java程序

我写了一个Java程序的火花。 但是如何从unix命令行运行和编译它。 编译运行时是否需要包含任何jar?