Spark Barrier Execution Mode 屏障执行模式
2021-05-08
Coding
Spark
👋 ‍️‍️阅读
❤️ 喜欢
💬 评论

Spark Barrier Execution Mode 屏障执行模式

屏障执行模式(Barrier Execution Mode)是Spark 2.4版本引入的新的执行模式。 旨在支持让Spark平台支持更多样化的工作类型,例如调度ML(机器学习)/DL(深度学习)训练任务。 本文将带你了解什么是屏障执行模式,为什么Spark需要,以及如何使用。

执行模式

MapReduce

MapReduce是大多数流行框架的执行模式,通过划分Map和Reduce阶段(Stage),可以灵活的处理各种作业(Job)。

在MapReduce中

  • Job是Stage的集合,每个Stage都是Map或Reduce,阶段之间可能会进行Shuffle
  • Stage是Task的集合,Task之间没有依赖。这种结构被称为Shared Nothing,有很强的伸缩性。
  • 基于上述的SN结构,任何Task失败后可以单独进行重试而不影响其他Task
  • Map Task的数量由数据量决定,而Reduce Task的数量由开发者决定

Barrier

虽然MapReduce已经实践了很多年,但是在近几年流行的深度学习领域却无能为力。 神经网络中的任务很难通过MapReduce模型进行抽象,一般通过MPI或其他模型。 为了能够支持调度深度学习任务,Spark需要在MapReduce之外引入新的执行模型,即Barrier。

在Barrier中

  • Job是Stage的集合,Stage之间可能会进行Shuffle
  • Stage是Task的集合,所有Task同时运行,并且会存在依赖关系。在MPI中,Task会相互通信。这是与MapReduce的主要区别。
  • 由于上述的依赖关系,当一个Task失败,所有Task都需要重试。
  • Task数量始终由开发者指定,开发者需要保证有足够的资源来执行

Spark RDDBarrier

Barrier执行模式已经在Spark 2.4得到了支持。 新的RDDBarrier类型被引入,我们可以通过它来处理Barrier任务。

创建

调用RDD.barrier方法,你就可以获得一个RDDBarrier对象。 RDDBarrier对象只有一个方法mapPartitions,它和通常的RDD.mapPartitions类似,接收分区数据并返回结果集。

df = spark.range(0, 10, numPartitions=4)
barrier_rdd = df.rdd.barrier()

屏障

屏障的功能通过BarrierTaskContext来提供。

带屏障的mapPartitions操作一般形如:

def map_rows(rows):
    context = BarrierTaskContext.get() # ①
    # do something
    context.barrier() # ②
    # do something
    context.barrier() # ③
    # do somthing
    return ...
  1. 获得当前屏障任务上下文
  2. 创建一个屏障,只有当该Stage的所有任务全都到达屏障,才会继续往下执行。
  3. 另一个屏障,作用同上。需要注意的是,每个任务中屏障的数量必须相同,否则会一直等到屏障直到超时。

下面一段代码可以帮助你理解屏障的作用

df = spark.range(0, 10, numPartitions=4)

def no_barrier(rows):
    print(1)
    print(2)
    return rows

df.rdd.mapPartitions(no_barrier).collect() # ①

def with_barrier(rows):
    context = BarrierTaskContext.get()
    print(1)
    context.barrier()
    print(2)
    return rows

df.rdd.barrier().mapPartitions(with_barrier).collect() # ②
  1. 第一种普通模式的mapPartitions,输出结果为12121212(有小概率会看到不同的结果)
  2. 第二种屏障模式的mapPartitions,输出结果一定11112222

并行资源

前文讲到,Barrier模式的任务数量由开发者控制,而且所有任务必须同时进行。 所以如果资源不足,则会一直等待资源(对于同样的情况,MapReduce不会等待)。

spark: SparkSession = SparkSession.builder.master('local[1]').getOrCreate()

df = spark.range(0, 10, numPartitions=4)
df.rdd.barrier().mapPartitions(lambda e:e).collect()

执行上面的测试代码,你会得到如下警告:

21/05/08 14:57:58 WARN DAGScheduler: Barrier stage in job 0 requires 4 slots, but only 1 are available. Will retry up to 40 more times
21/05/08 14:58:13 WARN DAGScheduler: Barrier stage in job 0 requires 4 slots, but only 1 are available. Will retry up to 39 more times

Spark调度器抱怨没有足够的资源执行任务。所以我们在使用Barrier时需要注意使用正确的分片数量。

MPI

有了以上基础,我们就可以很容易的在Spark中调度MPI作业。

def run_mpi_task(rows):
    context = BarrierTaskContext.get()
    if (context.partitionId() == 0) {
    addrs = context.getTaskInfos().map(_.address)
    ... # 启动MPI任务
    }
    context.barrier() # 等待任务完成
}

集成

归根结底,屏障执行模式是为了让Spark更快更便捷地拥抱ML/DL。 为此,各种集成框架和工具应运而生,以快速支持现有的ML/DL代码。

Horovod

Horovod是一个分布式DL训练框架,支持TensorFlow, Keras, PyTorch, and Apache MXNet。 该项目最初由Uber发起,后由LF AI基金会维护。

from tensorflow import keras
import tensorflow as tf
import horovod.spark.keras as hvd

model = keras.models.Sequential()
    .add(keras.layers.Dense(8, input_dim=2))
    .add(keras.layers.Activation('tanh'))
    .add(keras.layers.Dense(1))
    .add(keras.layers.Activation('sigmoid'))

optimizer = keras.optimizers.SGD(lr=0.1)
loss = 'binary_crossentropy'

keras_estimator = hvd.KerasEstimator(model, optimizer, loss)

keras_model = keras_estimator.fit(train_df)
predict_df = keras_model.transform(test_df)

更多细节,参阅这里

Tensorflow

如果你使用的是Tensorflow,其官方已于2020年提供了spark-tensorflow-distributor,利用Barrier执行模式直接在Spark上调度。

更多细节,参阅这里

总结

值得一提的是,除了屏障执行模式,Spark还在3.0版本中提供了GPU支持。 有了这两件利器,Spark可以更加紧密地拥抱AI时代。

Reference


Copyright © 2020-2022 Dean Xu. All Rights reserved.