累加器什么时候真的可靠?

我想使用累加器来收集关于我在Spark作业上操作的数据的一些统计信息。 理想情况下,当工作计算所需的转换时,我会这样做,但是由于Spark会在不同情况下重新计算任务,所以累加器不会反映真实的度量标准。 这里是文档如何描述这个:

对于仅在动作内执行的累加器更新,Spark保证每个任务对累加器的更新只应用一次,即重新启动的任务不会更新值。 在转换中,用户应该意识到,如果任务或作业阶段被重新执行,每个任务的更新可能会被应用多次。

这是令人困惑的,因为大多数操作不允许运行自定义代码(可以使用累加器),它们大多数都是从以前的转换(懒惰)中获得结果。 该文档还显示了这一点:

val acc = sc.accumulator(0) data.map(x => acc += x; f(x)) // Here, acc is still 0 because no actions have cause the `map` to be computed. 

但是,如果我们在最后添加data.count() ,这将保证是正确的(没有重复)或不? 很明显, acc并不是用于“仅在内部动作”,因为地图是一种转换。 所以它不应该得到保证。

另一方面,关于相关Jira票的讨论是关于“结果任务”而不是“行动”。 例如在这里和这里 。 这似乎表明,结果确实将被保证是正确的,因为我们正在使用acc之前和行动,因此应该计算为一个阶段。

我猜测这个“结果任务”的概念与所涉及的操作types有关,是最后一个包含操作的操作,就像在这个例子中,它显示了几个操作分成几个阶段(洋红,从这里拍摄的图像):

将几个操作分成多个紫色阶段的工作

因此,假设在该链末端的count()动作将是同一个最后阶段的一部分,并且我将保证在最后一个映射上使用的累加器不会包含任何重复项?

澄清这个问题将是伟大的! 谢谢。

回答“什么时候累加器真的可靠?

答:当他们在一个行动操作中。

根据“操作任务”中的文档,即使存在任何重新启动的任务,它也只会更新一次“累加器”。

对于仅在动作内执行的累加器更新,Spark保证每个任务对累加器的更新只应用一次,即重新启动的任务不会更新值。 在转换中,用户应该意识到,如果任务或作业阶段被重新执行,每个任务的更新可能会被应用多次。

和行动允许运行自定义代码。

例如

 val accNotEmpty = sc.accumulator(0) ip.foreach(x=>{ if(x!=""){ accNotEmpty += 1 } }) 

但是,为什么地图+行动即, 结果对于累加器操作, 任务操作不可靠

  1. 由于代码中的某些exception,任务失败。 Spark将尝试4次(默认的尝试次数)。如果每次任务失败,它将会发出exception。如果偶然成功,则Spark将继续,只是更新成功状态的累加器值,失败状态累加器值将被忽略。
    判决:正确处理
  2. 阶段失败:如果一个执行者节点崩溃,没有用户的错误,但硬件故障 – 如果节点在混洗阶段下降。因为随机输出存储在本地,如果一个节点下降,那个shuffle输出消失。回到生成shuffle输出的阶段,看看哪些任务需要重新运行,然后在一个仍然活着的节点上执行它们。当我们重新生成丢失的shuffle输出后,生成map输出的阶段执行了一些它的任务是多次的。火花计数来自所有的累加器更新。
    结论:在结果Task.Accumulator中不处理将给出错误的输出。
  3. 如果任务运行缓慢,则Spark可以在另一个节点上启动该任务的推测副本。
    结论:没有处理。累赘会给错误的输出。
  4. 被caching的RDD是巨大的,不能驻留在内存中。所以无论何时使用RDD,它将重新运行Map操作来获取RDD,并且再次累加器将被它更新。
    结论:没有处理。累赘会给错误的输出。

所以相同的函数可能会在同一个数据上运行多次,所以Spark不能保证累加器因Map操作而被更新。

所以最好在Spark中使用Accumulator in Action操作。

要了解更多关于Accumulator及其问题,请参阅此博客文章 – 由Imran Rashid。

当任务成功完成时,累加器更新被发送回驱动程序。 所以,当你确定每个任务将被执行一次,每个任务按照你的预期执行时,你的累加器结果保证是正确的。

我更喜欢依靠reduceaggregate来代替累加器,因为列举任何可以执行的任务是相当困难的。

  • 一个动作启动任务。
  • 如果一个行动取决于一个早期阶段,并且该阶段的结果没有(完全)被caching,那么来自早期阶段的任务将开始。
  • 当检测到less量慢速任务时,推测执行开始重复任务。

也就是说,有很多简单的情况下,累加器可以完全信任。

 val acc = sc.accumulator(0) val rdd = sc.parallelize(1 to 10, 2) val accumulating = rdd.map { x => acc += 1; x } accumulating.count assert(acc == 10) 

这将保证是正确的(没有重复)?

是的,如果投机执行被禁用。 mapcount将是一个单独的阶段,就像你说的那样,一个任务不可能被成功执行多次。

但累加器更新是一个副作用。 所以在思考如何执行代码时必须非常小心。 考虑这个,而不是accumulating.count

 // Same setup as before. accumulating.mapPartitions(p => Iterator(p.next)).collect assert(acc == 2) 

这也将为每个分区创build一个任务,每个任务将保证执行一次。 但是map的代码不会在所有元素上执行,只是每个分区中的第一个元素。

累加器就像一个全局variables。 如果您共享一个可以递增累加器的RDD引用,那么其他代码(其他线程)也可以使其增加。

 // Same setup as before. val x = new X(accumulating) // We don't know what X does. // It may trigger the calculation // any number of times. accumulating.count assert(acc >= 10) 

我认为Matei在提到的文件中回答了这个问题:

正如在https://github.com/apache/spark/pull/2524上所讨论的,在一般情况下(非结果阶段内的累加器更新)很难提供良好的语义,原因如下:;

  • RDD可以被计算为多个阶段的一部分。 例如,如果您更新MappedRDD内的累加器,然后将其拖动,那可能是一个阶段。 但是,如果你再次在MappedRDD上调用map(),并对其结果进行洗牌,则会得到第二个阶段,即地图正在stream水线化。 你想计算这个累加器更新两次吗?

  • 如果洗牌文件被周期性清理程序删除,或者由于节点故障而丢失,那么可能会重新提交整个阶段,所以跟踪RDD的任何内容都需要长时间执行(只要RDD在用户程序中可引用),这将是相当复杂的实施。

所以我现在要把这个标记为“不会修复”,除了SPARK-3628的结果部分。