阶段如何分解成Spark中的任务?

我们假设以下每个时间点只有一个Spark作业正在运行。

我到目前为止

以下是我理解Spark中发生的事情:

  1. 当创buildSparkContext ,每个工作节点启动一个执行器。 执行程序是独立的进程(JVM),连接到驱动程序。 每个执行者都有驱动程序的jar。 退出司机,closures执行者。 每个执行者可以容纳一些分区。
  2. 执行作业时,根据谱系图创build执行计划。
  3. 执行工作被分成几个阶段,在阶段中包含尽可能多的邻居(在谱系图中)转换和行动,但没有洗牌。 因此阶段是通过洗牌分开的。

图片1

我明白那个

  • 任务是通过序列化函数对象从驱动程序发送给执行程序的命令。
  • 执行程序(使用驱动程序jar)反序列化命令(任务)并在分区上执行它。

问题(S)

我如何将舞台分成这些任务?

特别:

  1. 任务是由转换和行动决定的,还是可以是多个转换/行动?
  2. 任务是由分区确定的(例如,每个分区每个阶段一个任务)。
  3. 任务是由节点确定的(例如,每个节点每个阶段有一个任务)?

我认为(只有部分答案,即使是正确的)

在https://0x0fff.com/spark-architecture-shuffle中 ,洗牌是用图像解释的

在这里输入图像说明

我得到的是这个规则的印象

每个阶段被分成#个分区数量的任务,不考虑节点的数量

对于我的第一个图像,我会说我会有3个地图任务和3个减less任务。

对于来自0x0fff的图片,我会说有8个地图任务和3个减less任务(假设只有三个橙色和三个深绿色的文件)。

在任何情况下打开问题

那是对的吗? 但即使这是正确的,我上面的问题也没有得到回答,因为它仍然是开放的,多个操作(例如多个映射)是在一个任务内还是在每个操作中被分成一个任务。

别人怎么说

什么是Spark的任务? Spark工作人员如何执行jar文件? 以及Apache Spark调度程序如何将文件分解为任务? 是相似的,但我不觉得我的问题在那里得到了明确的答复。

你在这里有一个非常好的轮廓。 回答你的问题

  • 每个stage每个数据分区需要启动一个单独的task 。 请考虑每个分区可能驻留在不同的物理位置 – 例如HDFS中的块或本地文件系统的目录/卷。

请注意, Stage的提交由DAG Scheduler驱动。 这意味着不相互依赖的阶段可以提交给集群并行执行:这最大化了集群的并行化能力。 因此,如果我们的数据stream中的操作可以同时发生,我们将期望看到多个阶段启动。

我们可以看到,在下面的玩具示例中,我们执行以下types的操作:

  • 加载两个数据源
  • 分别在两个数据源上执行一些映射操作
  • join他们
  • 对结果执行一些映射和过滤操作
  • 保存结果

那么我们最终会有多less个阶段呢?

  • 每个阶段用于并行加载两个数据源= 2个阶段
  • 第三阶段代表依赖于其他两个阶段的联合
  • 注意:所有对连接数据进行的后续操作都可能在同一阶段执行,因为它们必须按顺序进行。 启动额外的阶段是没有好处的,因为直到先前的操作完成才能开始工作。

这是玩具程序

 val sfi = sc.textFile("/data/blah/input").map{ x => val xi = x.toInt; (xi,xi*xi) } val sp = sc.parallelize{ (0 until 1000).map{ x => (x,x * x+1) }} val spj = sfi.join(sp) val sm = spj.mapPartitions{ iter => iter.map{ case (k,(v1,v2)) => (k, v1+v2) }} val sf = sm.filter{ case (k,v) => v % 10 == 0 } sf.saveAsTextFile("/data/blah/out") 

这里是结果的DAG

在这里输入图像说明

现在:有多less任务 ? 任务的数量应该等于

Stage * #Partitions in the stage Stage总和)

在我的情况下, #Partitions in the stage#Partitions in the stage等于我的集群机器上number of processors

如果我理解正确,有2(相关)的东西让你感到困惑:

1)什么决定了任务的内容?

2)什么决定了要执行的任务数量?

Spark的引擎将连续rdds上的简单操作“粘”在一起,例如:

 rdd1 = sc.textFile( ... ) rdd2 = rdd1.filter( ... ) rdd3 = rdd2.map( ... ) rdd3RowCount = rdd3.count 

所以当rdd3被(懒惰地)计算出来时,spark会为rdd1的每个分区生成一个任务,每个任务都会执行filter和每行的map来生成rdd3。

任务的数量由分区数决定。 每个RDD都有一个定义数量的分区。 对于从HDFS读取的源RDD(例如,使用sc.textFile(…)),分区数是由input格式生成的分割数。 RDD上的某些操作可能导致RDD具有不同数量的分区:

 rdd2 = rdd1.repartition( 1000 ) will result in rdd2 having 1000 partitions ( regardless of how many partitions rdd1 had ). 

另一个例子是join:

 rdd3 = rdd1.join( rdd2 , numPartitions = 1000 ) will result in rdd3 having 1000 partitions ( regardless of partitions number of rdd1 and rdd2 ). 

(大多数)改变分区数量的操作涉及洗牌,当我们做例如:

 rdd2 = rdd1.repartition( 1000 ) 

实际情况是rdd1的每个分区上的任务需要产生一个可以在下一个阶段读取的结束输出,以便使rdd2具有完全1000个分区(它们是如何实现的? 散列或sorting )。 这方面的任务有时被称为“地图(侧面)任务”。 稍后在rdd2上运行的任务将作用于一个分区(rdd2!),并且必须弄清楚如何读取/组合与该分区相关的地图侧输出。 这方面的任务有时被称为“减less(侧面)任务”。

这两个问题是相关的:一个阶段中的任务数量是分区的数量(连续rdds“共同粘在一起”),一个rdd的分区数量可以在不同阶段间变化(通过指定分区的数量洗牌造成的操作)。

一旦阶段执行开始,其任务可以占用任务槽。 并发任务槽的数量是numExecutors * ExecutorCores。 一般来说,这些可以被来自不同的,非依赖的阶段的任务占用。

这可能会帮助你更好地理解不同的部分:

  • 阶段:是一个任务的集合。 相同的进程针对不同的数据子集(分区)运行。
  • 任务:表示分布式数据集的分区上的一个工作单元。 因此,在每个阶段,任务数量=分区数量,或者如您所说的“每个分区每个阶段一个任务”。
  • 每个执行器在一个纱线容器上运行,每个容器驻留在一个节点上。
  • 每个阶段使用多个执行器,每个执行器分配多个核心。
  • 每个vcore一次只能执行一个任务
  • 所以在任何阶段,可以同时执行多个任务。 运行的任务数量=正在使用的核心数量。