星火代码组织和最佳实践

所以,在代码重用,devise模式和最佳实践经常被考虑在面向对象的世界里花费了很多年的时候,我发现自己在代码组织和代码重用方面有点挣扎。

如果我试图以可重用的方式编写代码,它几乎总会带来性能成本,我最终将其重写为任何最适合我的特殊用例。 这个常量“写出对这个特定用例最合适的”也会影响代码的组织,因为当“它全部真的属于一起”时,将代码分解成不同的对象或模块是困难的,因此我最终得到的只有很less的“神”复杂的转化链。 事实上,我经常想,如果我在面向对象的世界工作时,大部分我现在正在写的Spark代码,我会畏缩并将其视为“意大利面代码”。

我上网试图find某种相当于面向对象的世界的最佳实践,但没有太多的运气。 我可以find一些函数式编程的“最佳实践”,但是Spark只是增加了一个额外的层,因为性能在这里是一个很重要的因素。

所以我对你的问题是,有没有你的火花大师发现了一些编写Spark代码的最佳实践,你可以推荐?

编辑

正如在评论中写的,我实际上并没有期待任何人就如何解决这个问题发表一个答案,而是我希望在这个社区中有人遇到了一些Martin Fowlertypes,他们在某处写过som文章或博客文章关于如何解决Spark世界中代码组织的问题。

@DanielDarabosbuild议我可以举一个代码组织和性能相互冲突的例子。 虽然我发现在日常工作中我经常遇到这个问题,但是我发现把它简化成一个很好的小例子有点困难;但是我会尽力的。

在面向对象的世界中,我是单一职责原则的忠实粉丝,所以我会确保我的方法只对一件事负责。 它使得它们可重用,易于testing。 所以,如果我不得不说,计算列表中的一些数字的总和(匹配一些标准),我必须计算相同数字的平均数,我肯定会创build两个方法 – 一个计算总和和一个计算平均值。 喜欢这个:

def main(implicit args: Array[String]): Unit = { val list = List(("DK", 1.2), ("DK", 1.4), ("SE", 1.5)) println("Summed weights for DK = " + summedWeights(list, "DK") println("Averaged weights for DK = " + averagedWeights(list, "DK") } def summedWeights(list: List, country: String): Double = { list.filter(_._1 == country).map(_._2).sum } def averagedWeights(list: List, country: String): Double = { val filteredByCountry = list.filter(_._1 == country) filteredByCountry.map(_._2).sum/ filteredByCountry.length } 

我当然可以继续在斯巴克荣获SRP:

 def main(implicit args: Array[String]): Unit = { val df = List(("DK", 1.2), ("DK", 1.4), ("SE", 1.5)).toDF("country", "weight") println("Summed weights for DK = " + summedWeights(df, "DK") println("Averaged weights for DK = " + averagedWeights(df, "DK") } def avgWeights(df: DataFrame, country: String, sqlContext: SQLContext): Double = { import org.apache.spark.sql.functions._ import sqlContext.implicits._ val countrySpecific = df.filter('country === country) val summedWeight = countrySpecific.agg(avg('weight)) summedWeight.first().getDouble(0) } def summedWeights(df: DataFrame, country: String, sqlContext: SQLContext): Double = { import org.apache.spark.sql.functions._ import sqlContext.implicits._ val countrySpecific = df.filter('country === country) val summedWeight = countrySpecific.agg(sum('weight)) summedWeight.first().getDouble(0) } 

但是,因为我的df可能包含数十亿行,我宁愿不必执行filter两次。 事实上,绩效与EMR成本直接相关,所以我真的不想这样做。 为了克服这个问题,我决定违反SRP,并简单地把这两个函数放在一个,并确保我调用坚持在国家过滤的DataFrame ,如下所示:

 def summedAndAveragedWeights(df: DataFrame, country: String, sqlContext: SQLContext): (Double, Double) = { import org.apache.spark.sql.functions._ import sqlContext.implicits._ val countrySpecific = df.filter('country === country).persist(StorageLevel.MEMORY_AND_DISK_SER) val summedWeights = countrySpecific.agg(sum('weight)).first().getDouble(0) val averagedWeights = summedWeights / countrySpecific.count() (summedWeights, averagedWeights) } 

现在,这个例子当然是对现实生活中遇到的很大的简化。 在这里,我可以简单地通过过滤并坚持df来解决它, 然后将其交给总和和平均函数(这也将是更多的SRP),但在现实生活中可能会有一些中间计算需要一次又一次地进行。 换句话说,这里的filter函数只是一个试图做一个简单的例子,从持久化中受益。 实际上,我认为要persist呼叫是一个关键词。 调用persist会大大加快我的工作速度,但代价是我必须紧密连接依赖于持久DataFrame所有代码 – 即使它们在逻辑上是分开的。

我想你可以在YouTube上订阅Apache Sparkdatabricks频道,更多地了解和了解更多,特别是从别人的经验和教训。

  • Apache Spark
  • databricks
  • 星火技术中心

这里是一些video推荐:

  • SparkUI可视化
  • slide SparkUI可视化

  • 生产中的火花:来自100多个生产用户的经验教训

  • 生产中的星火:来自100多个生产用户的经验教训

  • 适用于企业系统pipe理员的Spark调整

  • 适用于企业系统pipe理员的Spark调整

  • 构build,debugging和debuggingSpark机器学习pipe道 – Joseph Bradley(Databricks)

  • slide构build,debugging和debuggingSpark机器学习pipe道

  • 编写Spark应用程序时排名前5的错误

  • 编写Spark应用程序时出现前5个错误

  • debugging和debuggingApache Spark

  • slide调整和debuggingApache Spark

  • 对Spark内部的深入理解–Aaron Davidson(Databricks)

  • slide深入理解火花内部 – 亚伦·戴维森(Databricks)

我已经发布并在我的github和博客上更新它:

  • githubpost
  • 博客文章

希望这可以帮助你〜