《弹性分布式数据集:集群内存计算的一种容错抽象》 论文

2022-07-02

《Resilient Distributed Datasets: A Fault-Tolerant Abstraction for In-Memory Cluster Computing》 论文本地文件(PDF, 866 KB)

作者:Matei Zaharia, Mosharaf Chowdhury, Tathagata Das, Ankur Dave, Justin Ma, Murphy McCauley, Michael J. Franklin, Scott Shenker, Ion Stoica

University of California, Berkeley


总结:Spark 是一个实现了 RDD 的系统。

RDD 有下面几个特点

  • 分布式内存模型(集群上的内存存储,vs HDFS 是集群上的文件系统,vs MapReduce 两个作业间的数据重用是通过外部存储转存);
  • 粗粒度的转换(基于分区的转换,而不是数据行,vs 其他内存抽象,键值对存储,数据库等是细粒度的);
  • 容错(记录数据集的转换关系,失败的分区可以通过血缘关系重新计算出);
  • 通用的计算框架(提供一系列通用高效的编程接口,可以表达其他很多集群编程模型,vs 迭代图计算的 Pregel 或者提供迭代 MapReduce 接口 HaLoop 等);

摘要

我们提出了弹性分布式数据集(Resilient Distributed Datasets,RDDs),这是一个分布式内存抽象,让程序员以容错的方式在大型集群上进行内存计算。RDDs 是由目前计算框架处理效率低下的两类应用所激发的:迭代算法和交互式数据挖掘工具。在这两种情况下,将数据保存在内存中可以提高一个数量级的性能。 为了有效地实现容错,RDDs 提供了一种限制性的共享内存形式,基于粗粒度的转换而不是细粒度的共享状态更新。然而,我们表明,RDD 的表现力足以捕捉到广泛的计算类别,包括最近用于迭代工作的专门编程模型(如 Pregel),以及这些模型没有捕捉到的新应用。我们在一个叫做 Spark 的系统中实现了RDDs,我们通过各种用户应用和基准来评估它。

1 引言

像 MapReduce 和 Dryad 这样的集群计算框架已经被广泛用于大规模数据分析。这些系统让用户使用一组高级运算符编写并行计算,而不必担心工作分配和容错问题。

尽管目前的框架为访问集群的计算资源提供了许多抽象,但它们缺乏利用分布式内存的抽象。这使得它们对于一类重要的新兴应用来说效率低下:那些在多个计算中重复使用中间结果的应用。数据重用在许多迭代机器学习和图形算法中很常见,包括 PageRank、K-means 聚类和 Logistic 回归。另一个引人注目的用例是交互式数据挖掘,用户在同一数据子集上运行多个特定的查询。不幸的是,在目前的大多数框架中,在计算之间重复使用数据的唯一方法(例如,在两个 MapReduce 作业之间)是将其写入一个外部稳定的存储系统,例如,一个分布式文件系统。由于数据复制、磁盘 I/O 和序列化,这产生了大量的开销,这些开销可能会支配应用程序的执行时间。

认识到这个问题,研究人员为一些需要数据重用的应用开发了专门的框架。例如,Pregel 是一个用于迭代图计算的系统,将中间数据保存在内存中,而 HaLoop 提供了一个迭代 MapReduce 接口。然而,这些框架只支持特定的计算模式(例如,循环一系列的 MapReduce 步骤),并为这些模式隐式地执行数据共享。它们没有为更普遍的重用提供抽象,例如,让用户将几个数据集加载到内存中并在它们之间运行临时查询。

在本文中,我们提出了一个新的抽象,称为弹性分布式数据集(RDDs),能够在广泛的应用中实现高效的数据重用。RDDs 是一种容错的并行数据结构,让用户明确地将中间结果保存在内存中,控制它们的分区以优化数据放置,并使用丰富的运算符集来操作它们。

设计 RDD 的主要挑战是定义一个能够有效提供容错的编程接口。现有的集群内存存储抽象,如分布式共享内存、键值存储、数据库和 Piccolo,提供了一个基于细粒度更新易变状态(如表中的单元)的接口。通过这种接口,提供容错的唯一方法是在机器间复制数据或在机器间记录更新。这两种方法对于数据密集型工作负载来说都很昂贵,因为它们需要在集群网络上复制大量数据,而集群网络的带宽远远低于 RAM 的带宽,而且它们会产生大量的存储开销。

与这些系统相比,RDDs 提供了一个基于粗粒度转换的接口(如映射、过滤和连接),对许多数据项进行相同的操作。这使得它们能够通过记录用于构建数据集的转换(其脉络)而不是实际数据,有效地提供容错。如果 RDD 的一个分区丢失,RDD 有足够的信息来了解它是如何从其他 RDD 中衍生出来的,以便重新计算该分区。因此,丢失的数据可以被恢复,通常是相当快的,而不需要昂贵的复制。

尽管基于粗粒度转换的接口起初看起来很有限,但 RDDs 很适合许多并行应用,因为这些应用自然地将相同的操作应用于多个数据项。事实上,我们表明,RDDs 可以有效地表达迄今为止作为独立系统提出的许多集群编程模型,包括 MapReduce、DryadLINQ、SQL、Pregel 和 HaLoop,以及这些系统没有捕捉到的新应用,如交互式数据挖掘。我们认为,RDD 能够满足以前只能通过引入新框架来满足的计算需求,这是 RDD 抽象能力的最可信的证据。

我们在一个名为 Spark 的系统中实现了 RDDs,该系统正被用于加州大学伯克利分校和一些公司的研究和生产应用。Spark 提供了一个类似于 Scala 编程语言中的 DryadLINQ 的便捷的语言集成编程接口。此外,Spark 可以交互式地使用 Scala 解释器来查询大数据集。 我们认为,Spark 是第一个允许通用编程语言以交互速度用于集群上的内存数据挖掘的系统。

我们通过微观基准和用户应用的测量来评估 RDDs 和 Spark。 我们表明,Spark 在迭代应用方面比 Hadoop 快 20 倍,将一个真实世界的数据分析报告的速度提高了 40 倍,并且可以用交互方式扫描一个 1 TB 的数据集,延迟时间为 5-7 秒。更根本的是,为了说明 RDD 的通用性,我们在 Spark 之上实现了 Pregel 和 HaLoop 编程模型,包括它们采用的放置优化,作为相对较小的库(每一个库有 200 行代码)。

本文首先概述了 RDDs(第 2 节)和 Spark(第 3 节)。然后我们讨论了 RDDs 的内部表示(第 4 节)、我们的实现(第 5 节)和实验结果(第 6 节)。最后,我们讨论了 RDDs 如何捕捉现有的几个集群编程模型(第 7 节),调查相关工作(第 8 节),并得出结论。

2 RDDs

本节提供了 RDDs 的概述。我们首先定义了 RDDs(第 2.1 节),并介绍了 Spark 中的编程接口(第 2.2 节)。然后,我们将 RDDs 与细粒度共享内存抽象进行比较(第 2.3 节)。最后,我们讨论了 RDD 模型的局限性(第 2.4 节)。

2.1 RDD 抽象

从形式上看,RDD 是一个只读的、分区的记录集合。RDDs 只能通过对(1)稳定存储中的数据或(2)其他 RDDs 的确定性操作来创建。我们将这些操作称为转换,以区别于对 RDD 的其他操作。转换的例子包括 map、filter 和 join。

RDDs 不需要在任何时候都被物化。相反,一个 RDD 拥有足够的信息,知道它是如何从其他数据集衍生出来的(它的血缘),可以从稳定存储的数据中计算出它的分区。这是一个强大的属性:从本质上讲,一个程序不能引用一个它在失败后不能重建的 RDD。

最后,用户可以控制 RDDs 的另外两个方面:持久性和分区。用户可以指出他们将重复使用哪些 RDDs,并为它们选择存储策略(例如内存存储)。他们还可以要求根据每条记录中的一个键,在不同的机器上对 RDD 的元素进行分区。这对放置优化很有用,比如确保两个将被连接在一起的数据集以相同的方式进行哈希分区。

2.2 Spark 编程接口

Spark 通过与 DryadLINQ 和 FlumeJava 类似的语言集成 API 来展示 RDD,其中每个数据集被表示为一个对象,并使用这些对象的方法来调用转换。

程序员首先通过对稳定存储中的数据进行转换(如 map 和 filter)来定义一个或多个 RDDs。然后,他们可以在操作中使用这些 RDDs,即向应用程序返回一个值或向存储系统输出数据的操作。动作的例子包括 count(返回数据集中的元素数量),collection(返回元素本身),以及 save(将数据集输出到存储系统)。像 DryadLINQ 一样,Spark 在行动中第一次使用 RDD 时,会懒洋洋地计算 RDD,这样它就可以进行管道转换。

此外,程序员可以调用一个持久化方法来表明他们想在未来的操作中重复使用哪些 RDDs。Spark 默认将持久化的 RDDs 保存在内存中,但如果没有足够的 RAM,它可以将它们溢出到磁盘上。用户还可以通过持久化标志(flags to persist)请求其他持久化策略,比如只把 RDD 存储在磁盘上,或者在不同的机器上进行复制。最后,用户可以在每个 RDD 上设置一个持久化优先级,以指定哪些内存中的数据应该先溢出到磁盘上。

2.2.1 例子:终端日志挖掘

假设一个网络服务遇到了错误,一个操作员想在 Hadoop 文件系统(HDFS)中搜索数千兆字节的日志以找到原因。使用 Spark,操作者可以将日志中的错误信息加载到一组节点的 RAM 中,并以交互方式进行查询。 她会首先输入以下 Scala 代码:

lines = spark.textFile("hdfs://...")
errors = lines.filter(_.startsWith("ERROR"))
errors.persist()

第 1 行定义了一个由 HDFS 文件支持的 RDD(作为文本行的集合),而第 2 行则从中导出了一个过滤的 RDD。然后,第 3 行要求含 errors 的内容在内存中持续存在,以便它可以在不同的查询中共享。请注意,过滤器的参数是 Scala 语法的一个闭包。

在这一点上,没有在集群上进行任何工作。然而,用户现在可以在行动中使用 RDD,例如,计算消息的数量。

errors.count()

用户能用 RDD 做进一步的转换,并使用它们的结果,如下面代码:

// Count errors mentioning MySQL:
errors.filter(_.contains("MySQL")).count()
// Return the time fields of errors mentioning
// HDFS as an array (assuming time is field
// number 3 in a tab-separated format):
errors.filter(_.contains("HDFS"))
.map(_.split('\t')(3))
.collect()

图 1

在包含 error 的第一个动作运行后,Spark 将把 error 的分区存储在内存中,大大加快了对它的后续计算。请注意,基础 RDD 的行并没有被加载到 RAM 中。这是可取的,因为 error 可能只是数据的一小部分(小到可以装入内存)。

最后,为了说明我们的模型是如何实现容错的,我们在图 1 中展示了第三个查询中的 RDD 的血缘关系。在这个查询中,我们从 error 开始,对行进行过滤的结果,并在运行收集之前进一步应用过滤和映射。Spark 调度器将对后两种转换进行流水线处理,并向持有缓存的错误分区的节点发送一组任务来计算它们。此外,如果一个错误分区丢失了,Spark 会通过只对相应的行的分区应用过滤器来重建它。

2.3 RDD 模型的优势

表 1

为了了解 RDD 作为分布式内存抽象的好处,我们将其与表 1 中的分布式共享内存(DSM)进行比较。在 DSM 系统中,应用程序对全局地址空间中的任意位置进行读取和写入。请注意,在这个定义下,我们不仅包括传统的共享内存系统,还包括其他应用程序对共享状态进行细粒度写入的系统,包括提供共享 DHT 的 Piccolo,以及分布式数据库。 DSM 是一个非常通用的抽象,但这种通用性使它更难在商业集群上以高效和容错的方式实现。

RDDs 和 DSM 的主要区别在于,RDDs 只能通过粗粒度的转换来创建(“写入”),而 DSM 允许对每个内存位置进行读和写。特别是,RDDs 不需要招致检查点的开销,因为它们可以使用线程恢复。此外,在故障发生时,只有 RDD 的丢失分区需要重新计算,而且它们可以在不同的节点上并行地重新计算,而不必回滚整个程序。

RDD 的第二个好处是,其不可改变的性质让系统可以通过运行慢速任务的备份来缓解慢速节点(散兵游勇),就像 MapReduce 中那样。 备份任务在 DSM 中很难实现,因为任务的两个副本会访问相同的内存位置并干扰对方的更新。

最后,RDD 比 DSM 还有两个好处。 首先,在对 RDDs 进行批量操作时,运行时可以根据数据位置安排任务,以提高性能。 其次,当没有足够的内存来存储 RDD 时,只要 RDD 只被用于基于扫描的操作,就会优雅地降低性能。不适合 RAM 的分区可以存储在磁盘上,并提供与当前数据并行系统类似的性能。

2.4 不适合 RDD 的场景

正如介绍中所讨论的,RDD 最适合于对数据集的所有元素进行相同操作的批量应用。在这些情况下,RDDs 可以有效地将每个转换记为血缘图中的一个步骤,并且可以恢复丢失的分区,而不必记录大量的数据。RDDs 不太适合于对共享状态进行异步细粒度更新的应用,例如网络应用的存储系统或增量网络爬虫。 对于这些应用,使用执行传统更新记录和数据检查点的系统更为有效,如数据库、RAMCloud、Percolator 和 Piccolo。我们的目标是为批量分析提供一个高效的编程模型,并将这些异步应用留给专门的系统。

3 Spark 编程接口

Spark 通过与 Scala 中的 DryadLINQ 类似的语言集成 API 来提供 RDD 抽象,Scala 是 Java 虚拟机的静态类型的函数式编程语言。我们选择 Scala 是因为它兼具简洁性(便于交互式使用)和效率(由于静态类型)。然而,关于 RDD 抽象的任何内容都不需要使用函数式语言。

图 2

要使用 Spark,开发者要编写一个连接到工作者集群的驱动程序,如图 2 所示。 驱动程序定义了一个或多个 RDDs,并对其进行调用操作。驱动器上的 Spark 代码也跟踪 RDDs 的血缘关系。Worker 是长期存在的进程,可以将 RDD 分区存储在 RAM 中,跨越操作。

正如我们在第 2.2.1 节的日志挖掘示例中所展示的,用户通过传递闭包(函数字面量)为 RDD 操作提供参数,如 map。 Scala 将每个闭包表示为一个 Java 对象,这些对象可以被序列化并加载到另一个节点上,以便在网络上传递闭包。Scala 还将闭包中绑定的任何变量保存为 Java 对象中的字段。例如,我们可以编写类似 var x = 5; rdd.map(_ + x) 这样的代码来给 RDD 的每个元素加 5。

RDD 本身是静态类型的对象,由一个元素类型作为参数。 例如,RDD[Int] 是一个整数的RDD。然而,由于 Scala 支持类型推理,我们的大多数例子都省略了类型。

虽然我们在 Scala 中公开 RDD 的办法在概念上很简单,但我们不得不利用反射来解决 Scala 的闭包对象的问题。我们还需要做更多的工作来使 Spark 可以从 Scala 解释器中使用,这一点我们将在第 5.2 节中讨论。尽管如此,我们并没有修改 Scala 的编译器。

3.1 Spark 中的 RDD 算子

表 2

表 2 列出了 Spark 中可用的主要 RDD 转换和操作。我们给出了每个操作的签名,在方括号中显示了类型参数。回顾一下,转换是定义一个新的 RDD 的惰性操作,而行动则是启动一个计算,向程序返回一个值或向外部存储写入数据。

请注意,有些操作,如 join,只对键值对的 RDD 有效。另外,我们选择的函数名称与 Scala 和其他函数式语言中的其他 API 相匹配;例如,map 是一种一对一的映射,而 flatMap 将每个输入值映射到一个或多个输出(类似于 MapReduce 中的 map)。

除了这些操作符之外,用户还可以要求一个 RDD 来持久化。此外,用户可以获得一个 RDD 的分区顺序,它由一个 Partitioner 类来表示,并根据它对另一个数据集进行分区。像 groupByKey、reduceByKey 和 sort 这样的操作会自动产生一个哈希或范围分区的 RDD。

3.2 应用举例

许多机器学习算法在本质上是迭代的,因为它们运行迭代的优化程序,如梯度下降,以最大化一个函数。因此,通过将数据保存在内存中,它们的运行速度会快很多。

3.2.1 逻辑回归

作为一个例子,下面的程序实现了逻辑回归,这是一种常见的分类算法,搜索一个最能将两组点(例如,垃圾邮件和非垃圾邮件)分开的超平面 w。该算法使用梯度下降法:它从一个随机值 w 开始,在每次迭代中,它在数据上对 w 的函数进行求和,使 w 向一个能改善它的方向移动。

val points = spark.textFile(...)
.map(parsePoint).persist()
var w = // random initial vector
for (i <- 1 to ITERATIONS) {
val gradient = points.map{ p =>
p.x * (1/(1+exp(-p.y*(w dot p.x)))-1)*p.y
}.reduce((a,b) => a+b)
w -= gradient
}

我们首先定义了一个名为 point 的持久性 RDD,它是对文本文件进行地图转换的结果,将每一行文本解析为一个 Point 对象。然后,我们在 point 上反复运行 map 和 reduce,通过对当前 w 的函数求和来计算每一步的梯度。

3.2.2 PageRank

一个更复杂的数据共享模式出现在 PageRank 中。该算法通过将链接到它的文件的贡献加起来,迭代更新每个文件的排名。在每次迭代中,每个文档都会向其邻居发送 r/n 的贡献,其中 r 是其排名,n 是其邻居的数量。然后,它将自己的排名更新为 α/N + (1 - α)∑ci,其中总和是它收到的贡献,N 是文档的总数。我们可以把 PageRank 在 Spark 中写成如下。

// Load graph as an RDD of (URL, outlinks) pairs
val links = spark.textFile(...).map(...).persist()
var ranks = // RDD of (URL, rank) pairs
for (i <- 1 to ITERATIONS) {
// Build an RDD of (targetURL, float) pairs
// with the contributions sent by each page
val contribs = links.join(ranks).flatMap {
(url, (links, rank)) =>
links.map(dest => (dest, rank/links.size))
}
// Sum contributions by URL and get new ranks
ranks = contribs.reduceByKey((x,y) => x+y)
.mapValues(sum => a/N + (1-a)*sum)
}

图 3

这个程序导致了图 3 中的 RDD 血缘图。在每个迭代中,我们根据前一个迭代中的 contribs 和 rank 以及静态链接数据集创建一个新的 rank 数据集。这个图的一个有趣的特点是,它随着迭代次数的增加而变长。因此,在一个有许多迭代的工作中,可能有必要可靠地复制一些版本的行列,以减少故障恢复时间。用户可以调用带有 RELIABLE 标志的 persist 来做这件事。然而,请注意,链接数据集不需要被复制,因为它的分区可以通过在输入文件的块上重新运行 map 来有效地重建。这个数据集通常会比 rank 大得多,因为每个文件都有很多链接,但只有一个数字作为它的 rank,所以使用 lineage 恢复它比 checkpoint 程序的整个内存状态的系统更节省时间。

最后,我们可以通过控制 RDDs 的分区来优化 PageRank 中的通信。如果我们为链接指定一个分区(例如,按 URL 跨节点对链接列表进行散列分区),我们可以以同样的方式对行列进行分区,并确保链接和行列之间的连接操作不需要通信(因为每个 URL 的行列将和其链接列表在同一台机器上)。 我们还可以编写一个自定义的 Partitioner 类,将相互链接的页面分组在一起(例如,按域名划分 URL)。这两种优化都可以通过在我们定义链接时调用 partitionBy 来表达。

在这个初始调用之后,links 和 rank 之间的连接操作将自动汇总每个 URL 对其链接列表所在机器的贡献,在那里计算其新的 rank,并将其与链接连接起来。这种跨迭代的一致划分是 Pregel 等专业框架的主要优化之一。RDDs 让用户直接表达这个目标。

4 表示 RDDs

提供 RDDs 作为一种抽象的挑战之一是为它们选择一种表示方法,以跟踪各种转换的脉络。理想情况下,实现 RDDs 的系统应该提供尽可能丰富的转换操作符集(例如表 2 中的操作符),并让用户以任意的方式对其进行组合。我们为 RDDs 提出了一个简单的基于图形的表示法,以促进这些目标的实现。我们在 Spark 中使用这种表示法来支持广泛的转换,而不需要为每一种转换在调度器中添加特殊逻辑,这大大简化了系统设计。

简而言之,我们建议通过一个通用接口来表示每个 RDD,该接口暴露了五项信息:一组分区,这是数据集的原子块;一组对父 RDD 的依赖关系;一个基于父 RDD 计算数据集的函数;以及关于其分区方案和数据放置的元数据。例如,代表 HDFS 文件的 RDD 对文件的每个块都有一个分区,并知道每个块在哪个机器上。同时,这个 RDD 上的 map 结果具有相同的分区,但在计算其元素时,将 map 函数应用于父方的数据。我们在表 3 中总结了这个接口。

表 3

设计这个接口时最有趣的问题是如何表示 RDD 之间的依赖关系。 我们发现将依赖关系分为两种类型既充分又有用:窄依赖关系,即父 RDD 的每个分区最多只能被子 RDD 的一个分区使用;宽依赖关系,即多个子分区可能依赖它。例如,map 会导致窄依赖,而 join 会导致宽依赖关系(除非父分区是散列分区)。 图 4 显示了其他的例子。

图 4

这种区分出于两个原因是有用的。首先,窄依赖关系允许在一个集群节点上进行流水线式的执行,它可以计算所有的父分区。例如,我们可以在逐个元素的基础上应用一个 map,然后是一个过滤器。相比之下,宽依赖要求所有父分区的数据都是可用的,并使用类似 MapReduce 的操作在各节点上进行洗牌。其次,节点故障后的恢复在狭义的依赖关系中更为有效,因为只有丢失的父分区需要重新计算,而且它们可以在不同的节点上并行地重新计算。相反,在具有宽依赖关系的血缘图中,一个失败的节点可能会导致 RDD 的所有祖先的一些分区丢失,需要完全重新执行。

这种 RDD 的通用接口使得在 Spark 中用不到 20 行的代码实现大多数转换成为可能。事实上,即使是 Spark 的新用户也在不了解调度器细节的情况下实现了新的转换(例如,抽样和各种类型的连接)。我们在下面简述一些 RDD 的实现。

HDFS files:我们样本中的输入 RDDs 已经是 HDFS 中的文件。对于这些 RDDs,partitions 为文件的每个块返回一个分区(块的偏移量存储在每个 Partition 对象中),preferredLocations 给出块所在的节点,iterator 读取块。

map:在任何 RDD 上调用 map 会返回一个 MappedRDD 对象。该对象具有与其父对象相同的分区和首选位置,但在其迭代器方法中对父对象的记录应用了传递给 map 的函数。

union:在两个 RDD 上调用 union,会返回一个 RDD,其分区是父方的分区的结合。 每个子分区都是通过对相应父分区的狭义依赖来计算的。

sample:抽样与映射类似,只是 RDD 为每个分区存储一个随机数生成器种子,以确定地抽出父记录。

join:连接两个 RDD 可能导致两个窄依赖关系(如果它们都使用相同的分区器进行哈希/范围分区)、两个宽依赖关系或混合关系(如果一个父类有分区器,一个没有)。在任何一种情况下,输出的 RDD 都有一个分区器(要么是从父代继承的分区器,要么是默认的哈希分区器)。

5 实现

我们用大约 14000 行的 Scala 实现了 Spark。该系统在 Mesos 集群管理器上运行,允许它与 Hadoop、MPI 和其他应用程序共享资源。每个 Spark 程序都作为一个单独的 Mesos 应用运行,有自己的驱动(主控)和 Worker,这些应用之间的资源共享由 Mesos 处理。

Spark 可以使用 Hadoop 现有的输入插件 API 从任何 Hadoop 输入源(如 HDFS 或 HBase)读取数据,并在未经修改的 Scala 版本上运行。

我们现在勾勒出系统中几个技术上有趣的部分:我们的作业调度器(第 5.1 节),我们的Spark解释器允许交互式使用(第 5.2 节),内存管理(第 5.3 节),以及对检查点的支持(第 5.4 节)。

5.1 Job 调度

Spark 的调度器使用我们的 RDDs 表示法,在第 4 节中有所描述。

图 5

总的来说,我们的调度器与 Dryad 类似,但它还考虑到了内存中哪些持久性 RDD 的分区是可用的。每当用户在一个 RDD 上运行一个动作(如计数或保存),调度器就会检查该 RDD 的血缘图以建立一个要执行的阶段的 DAG,如图 5 所示。每个阶段都包含尽可能多的具有狭窄依赖关系的流水线转换。阶段的边界是宽依赖关系所需的洗牌操作,或任何已经计算过的分区,这些分区可以绕过父 RDD 的计算。然后,调度器启动任务来计算每个阶段的缺失分区,直到计算出目标 RDD。

我们的调度器使用延迟调度,根据数据位置将任务分配给机器。如果一个任务需要处理的分区在某个节点的内存中是可用的,我们就把它送到该节点。否则,如果一个任务处理一个包含 RDD 的分区,而该分区提供了首选位置(例如,HDFS 文件),我们就把它发送到这些分区。

对于宽依赖关系(即 shuffle 依赖关系),我们目前在持有父分区的节点上具体化中间记录,以简化故障恢复,就像 MapReduce 具体化 map 输出。

如果一个任务失败了,只要它的阶段的父节点仍然可用,我们就在另一个节点上重新运行它。如果某些阶段变得不可用(例如,因为洗牌的“map 侧”的输出丢失了),我们重新提交任务,并行计算丢失的分区。我们还不能容忍调度器故障,尽管复制 RDD 血缘图是很简单的。

最后,尽管目前 Spark 中的所有计算都是根据驱动程序中调用的动作运行的,但我们也在尝试让集群上的任务(例如 map)调用查找操作,该操作提供了对按键进行哈希分区的 RDDs 元素的随机访问。在这种情况下,任务需要告诉调度器计算所需的分区,如果它是缺失的。

5.2 Interpreter 集成

Scala 包括一个类似于 Ruby 和 Python 的交互式外壳。鉴于内存数据的低延迟性,我们希望让用户从解释器中交互式地运行 Spark,以查询大数据集。

Scala 解释器通常通过为用户输入的每一行编译一个类,将其加载到 JVM 中,并对其调用一个函数。这个类包括一个单子对象,包含该行的变量或函数,并在初始化方法中运行该行的代码。例如,如果用户输入 var x = 5,然后是 println(x),解释器定义了一个包含 x 的名为 Line1 的类,并使第二行编译为 println(Line1.getInstance().x)

我们对 Spark 中的解释器做了两个改动:

  1. 类的运送:为了让工作节点获取每一行创建的类的字节码,我们让解释器通过 HTTP 提供这些类。
  2. 修改后代码的生成:通常情况下,为每行代码创建的单子对象是通过其对应的类上的静态方法来访问。 这意味着当我们序列化一个引用前一行定义的变量的闭包时,比如上面的例子中的 Line1.x,Java 不会通过对象图来追踪包裹着 x 的 Line1 实例,因此,工作节点不会收到 x。

图 6

图 6 显示了在我们的修改之后,解释器是如何将用户输入的一组行翻译成 Java 对象的。

我们发现 Spark 解释器在处理作为我们研究的一部分而获得的大型痕迹和探索存储在 HDFS 中的数据集方面非常有用。我们还计划用它来交互地运行更高级别的查询语言,如 SQL。

5.3 内存管理

Spark 为持久性 RDDs 的存储提供了三种选择:作为反序列化 Java 对象的内存存储,作为序列化数据的内存存储,以及磁盘存储。第一个选项提供了最快的性能,因为 Java 虚拟机可以直接访问每个 RDD 元素。第二种方案让用户在空间有限的情况下选择比 Java 对象图更节省内存的表示方法,但代价是性能较低。第三种方案对于那些太大而无法保存在 RAM 中,但每次使用都要重新计算的 RDD 是很有用的。

为了管理有限的可用内存,我们在 RDDs 层面使用 LRU 驱逐策略。当一个新的 RDD 分区被计算出来,但没有足够的空间来存储它时,我们从最近访问次数最少的 RDD 中驱逐一个分区,除非这个分区与有新分区的 RDD 相同。在这种情况下,我们将旧分区保留在内存中,以防止同一 RDD 的分区循环进出。这一点很重要,因为大多数操作将在整个 RDD 上运行任务,所以很可能将来会需要已经在内存中的分区。我们发现这个默认策略在目前所有的应用中都运行良好,但我们也通过每个 RDD 的“持久性优先级”给用户提供了进一步的控制。

最后,集群上的每个 Spark 实例目前都有自己的独立内存空间。在未来的工作中,我们计划研究通过一个统一的内存管理器在 Spark 的各个实例之间共享 RDD。

5.4 支持 Checkpointing

尽管在故障后总是可以使用血缘来恢复 RDD,但对于血缘较长的 RDD 来说,这种恢复可能很费时间。因此,存档一些 RDDs 到稳定的存储是有帮助的。

一般来说,存档点对于具有包含宽依赖关系的长血缘图的 RDD 是有用的,例如我们 PageRank 例子中的等级数据集(第 3.2.2 节)。在这些情况下,集群中的一个节点故障可能会导致每个父 RDD 的一些数据片断丢失,需要重新进行全面计算。相反,对于对稳定存储中的数据依赖性较小的 RDD,如我们的逻辑回归例子中的点(第 3.2.1 节)和 PageRank 中的 links 列表,存档点可能永远不值得。如果一个节点发生故障,这些 RDDs 中丢失的分区可以在其他节点上并行地重新计算,其成本只是复制整个 RDD 的一小部分。

Spark 目前提供了一个用于存档点的 API(一个用于持久化的 REPLIATE 标志),但将存档点的数据决定权留给了用户。然而,我们也在研究如何进行自动检查点。因为我们的调度器知道每个数据集的大小以及第一次计算的时间,它应该能够选择一组最佳的 RDD 来存档点,以最小化系统恢复时间。

最后,请注意,RDDs 的只读性使其比一般的共享内存更容易存档。由于一致性不是一个问题,RDDs 可以在后台写出,而不需要程序暂停或分布式快照方案。

6 评估

我们通过在 Amazon EC2 上的一系列实验以及用户应用的基准,对 Spark 和 RDDs 进行评估。总的来说,我们的结果显示如下:

  • 在迭代式机器学习和图形应用中,Spark 的性能比 Hadoop 高 20 倍。速度的提升来自于通过将数据以 Java 对象的形式存储在内存中,避免了 I/O 和反序列化的成本。
  • 由我们的用户编写的应用程序性能和规模都很好。特别是,我们使用 Spark 将一个在 Hadoop 上运行的分析报告的速度提高了 40 倍。
  • 当节点发生故障时,Spark 可以通过只重建丢失的 RDD 分区来快速恢复。
  • Spark 可以用来交互式地查询一个 1 TB 的数据集,延迟为 5-7 秒。

我们首先介绍了针对 Hadoop 的迭代机器学习应用(第 6.1 节)和 PageRank(第 6.2 节)的基准测试。然后,我们评估了 Spark 的故障恢复(第 6.3 节)和数据集不适合内存时的行为(第 6.4 节)。最后,我们讨论了用户应用(第 6.5 节)和交互式数据挖掘(第 6.6 节)的结果。

除非另有说明,我们的测试使用了具有 4 个内核和 15 GB 内存的 m1.xlarge EC2 节点。我们使用 HDFS 进行存储,有 256 MB 的块。在每次测试之前,我们清除了操作系统的缓冲缓存,以准确测量 IO 成本。

6.1 迭代性机器学习应用

我们实现了两个迭代机器学习应用,逻辑回归和 K-means,以比较以下系统的性能。

  • Hadoop:Hadoop 0.20.2 稳定版。
  • HadoopBinMem:一个 Hadoop 部署,在第一个迭代中将输入数据转换为低开销的二进制格式,以消除后面的文本解析,并将其存储在内存 HDFS 实例中。
  • Spark:我们对 RDDs 的实现。

我们使用 25-100 台机器在 100 GB 的数据集上运行了两种算法,进行了 10 次迭代。这两个应用程序之间的关键区别是它们在每一字节的数据上进行的计算量。k-means 的迭代时间由计算主导,而逻辑回归的计算密集度较低,因此对反序列化和 I/O 的时间更加敏感。

由于典型的学习算法需要几十次迭代才能收敛,我们分别报告了第一次迭代和后续迭代的时间。我们发现,通过 RDDs 共享数据大大加快了未来的迭代。

图 7

第一次迭代 所有三个系统在第一次迭代中都从HDFS读取文本输入。如图 7 中的光条所示,Spark 在整个实验中比 Hadoop 快一些。这种差异是由于 Hadoop 的心跳协议在其主站和工作者之间的信号开销造成的。HadoopBinMem 是最慢的,因为它运行了一个额外的 MapReduce 作业,将数据转换为二进制,并不得不通过网络将这些数据写入一个复制的内存 HDFS 实例。

图 8

后续迭代 图 7 还显示了后续迭代的平均运行时间,而图 8 显示了这些时间是如何随集群大小而变化的。对于逻辑回归,Spark 在 100 台机器上比 Hadoop 和 HadoopBinMem 分别快 25.3 倍和 20.7 倍。对于计算量更大的 k-means 应用,Spark 仍然实现了 1.9 倍到 3.2 倍的速度提升。

了解速度的提升 我们惊讶地发现,Spark 的表现甚至超过了带有二进制数据内存存储的 Hadoop(HadoopBinMem),差距达到了 20 倍。在 HadoopBinMem 中,我们使用了 Hadoop 的标准二进制格式(SequenceFile)和 256 MB 的块大小,并且我们强迫 HDFS 的数据目录在内存文件系统中。然而,由于几个因素,Hadoop 的运行速度仍然较慢。

  1. Hadoop 软件栈的最小开销。
  2. HDFS 在服务数据时的开销,以及
  3. 将二进制记录转换为可用的内存 Java 对象的反序列化成本。

我们依次调查了这些因素中的每一个。为了衡量(1),我们运行了无操作的 Hadoop 作业,发现这些作业在完成作业设置、启动任务和清理等最低要求时,至少会产生 25 秒的开销。 关于(2),我们发现 HDFS 进行了多次内存拷贝和校验,以服务于每个块。

最后,为了测量(3),我们在一台机器上进行了微测试,在 256 MB 的不同格式的输入上运行逻辑回归的计算。特别是,我们比较了处理来自 HDFS(HDFS 堆栈的开销会体现出来)和内存中的本地文件(内核可以非常有效地将数据传递给程序)的文本和二进制输入的时间。

图 9

我们在图 9 中展示了这些测试的结果。内存中的 HDFS 和本地文件之间的差异显示,通过 HDFS 的读取引入了 2 秒钟的开销,即使数据在本地机器的内存中。文本和二进制输入之间的差异表明,解析开销为 7 秒。最后,即使从内存文件中读取,将预先解析的二进制数据转换为 Java 对象也需要 3 秒,这仍然几乎与逻辑回归本身一样昂贵。通过将 RDD 元素直接作为 Java 对象存储在内存中,Spark 避免了所有这些开销。

6.2 PageRank

图 10

我们使用 54 GB 的维基百科转储来比较 Spark 与 Hadoop 在 PageRank 方面的性能。我们对 PageRank 算法进行了 10 次迭代,以处理大约 400 万篇文章的链接图。图 10 显示,在 30 个节点上,仅内存存储就为 Spark 提供了比 Hadoop 高出 2.4 倍的速度。 此外,如第 3.2.2 节所述,控制 RDDs 的分区,使其在不同的迭代中保持一致,将速度提高到 7.4 倍。结果也几乎线性地扩展到 60 个节点。

我们还评估了使用我们在 Spark 上的 Pregel 实现编写的 PageRank 版本,我们在第 7.1 节中描述了这一版本。迭代时间与图 10 中的相似,但长了大约 4 秒,因为 Pregel 在每次迭代时都会运行一个额外的操作,让顶点“投票”决定是否完成工作。

6.3 失败恢复

图 11

我们评估了在 k-means 应用中出现节点故障后,使用血缘重构 RDD 分区的成本。图 11 比较了一个 75 个节点的集群在正常运行情况下的 10 次 k-means 迭代的运行时间,以及一个节点在第 6 次迭代开始时发生故障的情况。在没有任何故障的情况下,每个迭代由 400 个任务组成,工作在 100 GB 的数据上。

直到第 5 次迭代结束,迭代时间约为 58 秒。在第 6 次迭代中,其中一台机器被杀,导致运行在该机器上的任务和存储在那里的 RDD 分区丢失。Spark 在其他机器上重新并行运行这些任务,它们重新读取相应的输入数据,并通过线程重建 RDD,这使得迭代时间增加到 80 秒。一旦丢失的 RDD 分区被重建,迭代时间回落到 58 秒。

请注意,在基于存档点的故障恢复机制下,恢复可能需要重新运行至少几个迭代,这取决于存档点的频率。此外,系统需要在网络上复制应用程序的 100 GB 工作集(文本输入数据转换为二进制),要么消耗两倍于 Spark 的内存在 RAM 中进行复制,要么不得不等待将 100 GB 写入磁盘。相比之下,我们例子中的 RDDs 的血缘图的大小都小于 10KB。

6.4 内存不足时的表现

图 12

到目前为止,我们确保集群中的每台机器都有足够的内存来存储整个迭代过程中的所有 RDDs。一个自然的问题是,如果没有足够的内存来存储作业的数据,Spark 如何运行。在这个实验中,我们将 Spark 配置为不使用超过一定比例的内存来存储每台机器上的 RDDs。我们在图 12 中展示了逻辑回归的各种存储空间量的结果。我们看到,随着空间的减少,性能会优雅地下降。

6.5 Spark 构建的用户应用

内存分析 Conviva 公司,一家视频发行公司,使用 Spark 来加速一些以前在 Hadoop 上运行的数据分析报告。例如,一个报告作为一系列的 Hive 查询运行,计算客户的各种统计数据。这些查询都是针对相同的数据子集(与客户提供的过滤器相匹配的记录),但在不同的分组字段上执行聚合(平均数、百分比和 COUNT DISTINCT),需要单独的 MapReduce 作业。通过在 Spark 中实现查询,并将它们之间共享的数据子集一次性加载到一个 RDD 中,该公司能够将报告的速度提高 40 倍。一份关于 200 GB 压缩数据的报告,在 Hadoop 集群上需要 20 个小时,现在只用两台 Spark 机器就能在 30 分钟内运行。此外,Spark 程序只需要 96 GB 的内存,因为它只在 RDD 中存储与客户的过滤器相匹配的行和列,而不是整个解压文件。

图 13

流量建模 伯克利移动千年项目的研究人员并行化了一种学习算法,用于从零星的汽车 GPS 测量数据中推断出道路交通拥堵情况。源数据是一个大都市地区的 10,000 个链接的道路网络,以及 600,000 个装有 GPS 的汽车的点对点旅行时间样本(每个路径的旅行时间可能包括多个道路链接)。使用一个交通模型,该系统可以估计出穿越单个道路链接所需的时间。研究人员使用期望最大化(EM)算法训练这个模型,该算法迭代地重复了两个地图和 reduceByKey 步骤。如图 13(a)所示,该应用从 20 到 80 个节点,每个节点有 4 个核心,几乎是线性扩展。

推特垃圾邮件分类 伯克利的 Monarch 项目使用 Spark 来识别 Twitter 信息中的垃圾链接。他们在 Spark 上面实现了一个逻辑回归分类器,与第 6.1 节中的例子类似,但他们使用了一个分布式的 reduceByKey 来并行求和梯度向量。在图 13(b)中,我们展示了在 50 GB 的数据子集上训练分类器的缩放结果。25 万个 URL 和 107 个与每个 URL 的网络和内容属性相关的特征/维度。由于每次迭代的固定通信成本较高,所以扩展性没有那么接近线性。

6.6 交互式数据挖掘

为了证明 Spark 在交互式查询大数据集方面的能力,我们用它来分析 1 TB 的维基百科页面浏览日志(2 年的数据)。在这个实验中,我们使用了 100 个 m2.4xlarge EC2 实例,每个实例有 8 个内核和 68 GB 的内存。我们运行查询,以找到(1)所有页面的总浏览量,(2)标题完全匹配给定单词的页面,以及(3)标题部分匹配单词的页面。 每个查询都扫描了整个输入数据。

图 14

图 14 显示了在全部数据集和一半及十分之一的数据上查询的响应时间。即使是 1 TB 的数据,在 Spark 上查询也需要 5-7 秒。这比使用磁盘上的数据要快一个数量级以上;例如,从磁盘上查询 1 TB 的文件需要 170 秒。这说明,RDDs 使 Spark 成为交互式数据挖掘的强大工具。

7 讨论

尽管 RDDs 由于其不可改变的性质和粗粒度的转换,似乎提供了一个有限的编程界面,但我们发现它们适用于广泛的应用类别。特别是,RDD 可以表达数量惊人的集群编程模型,这些模型迄今为止被作为单独的框架提出,使用户能够在一个程序中组成这些模型(例如,运行 MapReduce 操作来构建一个图,然后在其上运行 Pregel),并在它们之间共享数据。在这一节中,我们讨论了 RDDs 可以表达哪些编程模型,以及它们为什么如此广泛地适用(第 7.1 节)。此外,我们还讨论了我们所追求的 RDDs 中的行文信息的另一个好处,即为跨这些模型的调试提供便利(第 7.2 节)。

7.1 表达现有的编程模型

RDDs 可以有效地表达迄今已被单独提出的一些集群编程模型。所谓 “高效”,我们的意思是,不仅 RDD 可以用来产生与这些模型中编写的程序相同的输出,而且 RDD 还可以捕捉这些框架所执行的优化,例如将特定数据保留在内存中,对其进行分区以最小化通信,并有效地从故障中恢复。可使用 RDDs 表达的模型包括。

MapReduce 这个模型可以用 Spark 中的 flatMap 和 groupByKey 操作来表达,如果有一个组合器,则可以用 reduceByKey 来表达。

DryadLINQ DryadLINQ 系统在更通用的 Dryad 运行时上提供了比 MapReduce 更广泛的操作,但这些都是直接对应于 Spark 中可用的 RDD 转换的批量操作(map、groupByKey、join 等)。

SQL 像 DryadLINQ 表达式一样,SQL 查询在记录集上形成数据并行操作。

Pregel 谷歌的 Pregel 是一个专门用于迭代图应用的模型,初看起来与其他系统中的面向集合的编程模型有很大不同。在 Pregel 中,一个程序以一系列联合的“超级步骤”的形式运行。在每个超级步骤中,图中的每个顶点都运行一个用户函数,该函数可以更新与顶点相关的状态,改变图的拓扑结构,并向其他顶点发送消息,以便在下一个超级步骤中使用。 这个模型可以表达许多图的算法,包括最短路径、双点匹配和 PageRank。

让我们用 RDD 实现这个模型的关键观察点是,Pregel 在每次迭代中对所有顶点应用相同的用户函数。因此,我们可以将每次迭代的顶点状态存储在一个 RDD 中,并进行批量转换(flatMap)来应用这个函数,生成一个信息的 RDD。然后,我们可以将这个 RDD 与顶点状态连接起来,以执行消息的改变。同样重要的是,RDD 允许我们像 Pregel 那样将顶点状态保存在内存中,通过控制它们的分区来最小化通信,并在故障时支持部分恢复。我们将 Pregel 作为 Spark 上面的一个 200 行库来实现,更多细节请读者参考。

Iterative MapReduce 最近提出的几个系统,包括 HaLoop 和 Twister,提供了一个迭代 MapReduce 模型,用户给系统一系列的 MapReduce 作业来循环。这些系统在不同的迭代中保持数据分区的一致性,Twister 也可以将数据保存在内存中。这两种优化都很容易用 RDD 来表达,我们能够用 Spark 将 HaLoop 实现为一个 200 行的库。

Batched Stream Processing 研究人员最近提出了几个增量处理系统,用于定期用新数据更新结果的应用。例如,一个应用程序每 15 分钟更新一次关于广告点击的统计数据,应该能够将前 15 分钟窗口的中间状态与新日志的数据结合起来。这些系统执行与 Dryad 类似的批量操作,但在分布式文件系统中存储应用状态。将中间状态放在 RDDs 中会加快它们的处理速度。

解释 RDDs 的表达能力 为什么 RDD 能够表达这些不同的编程模型?原因是,在许多并行应用中,对 RDDs 的限制影响不大。特别是,尽管 RDDs 只能通过批量转换来创建,但许多并行程序自然会对许多记录应用相同的操作,从而使它们易于表达。同样地,RDDs 的不变性也不是一个障碍,因为人们可以创建多个 RDDs 来表示同一个数据集的版本。事实上,今天的许多 MapReduce 应用是在不允许更新文件的文件系统上运行的,比如 HDFS。

最后一个问题是,为什么以前的框架没有提供同样水平的通用性。我们认为,这是因为这些系统探索了 MapReduce 和 Dryad 不能很好处理的具体问题,如迭代,而没有观察到这些问题的共同原因是缺乏数据共享的抽象。

7.2 利用 RDDs 进行调试

虽然我们最初设计的 RDDs 是可确定地重新计算以实现容错,但这一特性也有利于调试。特别是,通过记录作业期间创建的 RDDs 的血缘,我们可以(1)以后重建这些 RDDs,并让用户交互式地查询它们;(2)通过重新计算它所依赖的 RDD 分区,在单进程调试器中重新运行作业的任何任务。与一般分布式系统的传统重放调试器不同,它必须捕捉或推断多个节点上的事件顺序,这种方法几乎没有增加任何记录开销,因为只需要记录 RDD 血缘关系。

8 相关工作

集群编程模型 集群编程模型的相关工作可分为几类。首先,数据流模型,如 MapReduce、Dryad 和 Ciel,支持处理数据的丰富运算符集,但通过稳定的存储系统共享数据。 RDDs 代表了比稳定存储更有效的数据共享抽象,因为它们避免了数据复制、I/O 和序列化的成本。

其次,几个数据流系统的高级编程接口,包括 DryadLINQ 和 FlumeJava,提供了语言集成的 API,用户通过 map 和 join 等操作符来操作 “并行集合”。然而,在这些系统中,并行集合代表了磁盘上的文件或用于表达查询计划的短暂数据集。尽管这些系统会在同一查询中跨运算符进行数据输送(例如,一个 map 接着另一个 map),但它们不能在不同的查询中有效地分享数据。 我们将 Spark 的 API 建立在并行集合模型的基础上,因为它很方便,并不要求语言集成接口的新颖性,但通过提供RDD作为这个接口背后的存储抽象,我们允许它支持更广泛的应用类别。

第三类系统为需要数据共享的特定类别的应用提供高级接口。 例如,Pregel 支持迭代图应用,而 Twister 和 HaLoop 是迭代的 MapReduce 运行。然而,这些框架为它们支持的计算模式隐含地执行了数据共享,并没有提供一个通用的抽象,用户可以用它来在她选择的操作中共享她选择的数据。例如,用户不能使用 Pregel 或 Twister 将一个数据集加载到内存中,然后决定对其运行什么查询。RDD 明确地提供了一个分布式存储抽象,因此可以支持这些专门系统所不具备的应用,如交互式数据挖掘。

最后,一些系统暴露了共享的可变状态,允许用户在内存中进行计算。 例如,Piccolo 让用户运行读取和更新分布式哈希表单元的并行函数。分布式共享内存(DSM)系统和 RAMCloud 等键值存储提供了一个类似的模型。RDDs 在两个方面与这些系统不同。 首先,RDDs 提供了一个基于 map、sort 和 join 等操作符的高级编程接口,而 Piccolo 和 DSM 的接口只是对表单元的读取和更新。其次,Piccolo 和 DSM 系统通过存档点和回滚实现恢复,这在许多应用中比 RDD 的基于行的策略更昂贵。最后,正如第 2.3 节所讨论的,RDDs 还提供了比 DSM 更多的优势,如减少散兵游勇。

缓存系统 Nectar 可以通过程序分析识别共同的子表达式,在 DryadLINQ 作业中重复使用中间结果。这种能力将引人注目地添加到基于 RDD 的系统中。 然而,Nectar 不提供内存缓存(它将数据放在一个分布式文件系统中),也不允许用户明确控制哪些数据集要持久化以及如何划分它们。Ciel 和 FlumeJava 同样可以缓存任务结果,但不提供内存缓存或明确控制哪些数据被缓存。

Ananthanarayanan 等人提出在分布式文件系统中加入内存缓存,以利用数据访问的时间和空间定位性。虽然这种解决方案提供了对已经在文件系统中的数据的更快访问,但它并不像 RDDs 那样是一种在应用中共享中间结果的有效手段,因为它仍然需要应用在各阶段之间将这些结果写入文件系统。

血缘关系 捕捉数据的脉络或来源信息长期以来一直是科学计算和数据库的一个研究课题,其应用包括解释结果、允许他人复制、以及在工作流程中发现错误或数据集丢失时重新计算数据。我们请读者参考和对这项工作的调查。 RDDs 提供了一个并行编程模型,在这个模型中,细粒度的血缘不容易被捕获,因此它可以被用于故障恢复。

我们基于血缘关系的恢复机制也类似于 MapReduce 和 Dryad 中计算(作业)内使用的恢复机制,后者追踪任务 DAG 之间的依赖关系。然而,在这些系统中,作业结束后,行式信息就会丢失,这就需要使用一个复制的存储系统来共享计算中的数据。相比之下,RDDs 应用行式在计算中有效地保存内存数据,而没有复制和磁盘 I/O 的成本。

关系型数据库 RDDs 在概念上类似于数据库中的视图,而持久的 RDDs 类似于物化的视图。然而,与 DSM 系统一样,数据库通常允许对所有记录进行细粒度的读写访问,需要对操作和数据进行记录以实现容错,并需要额外的开销来维持一致性。而 RDDs 的粗粒度转换模型则不需要这些开销。

9 结论

我们提出了弹性分布式数据集(RDDs),这是一个高效、通用和容错的抽象,用于在集群应用中共享数据。RDDs 可以表达广泛的并行应用,包括许多已经提出的用于迭代计算的专门编程模型,以及这些模型没有捕获的新应用。与现有的集群存储抽象不同的是,RDDs 提供了一个基于粗粒度转换的 API,可以让它们使用线程有效地恢复数据。我们已经在一个名为 Spark 的系统中实现了 RDDs,该系统在迭代应用中的性能比 Hadoop 高 20 倍,并且可以交互式地用于查询数百千兆字节的数据。

我们在 spark-project.org 上开源了 Spark,作为可扩展数据分析和系统研究的载体。

感谢

我们感谢第一批 Spark 用户,包括 Tim Hunter、Lester Mackey、Dilip Joseph 和 Jibin Zhan,他们在实际应用中尝试了我们的系统,提供了许多好的建议,并在此过程中发现了一些研究上的挑战。我们也感谢我们的牧羊人 Ed Nightingale 和我们的审稿人的反馈。这项研究得到了伯克利 AMP 实验室赞助商谷歌、SAP、亚马逊网络服务、云时代、华为、IBM、英特尔、微软、NEC、NetApp 和 VMWare、DARPA(合同#FA8650-11-C-7136)、谷歌博士奖学金以及加拿大自然科学和工程研究委员会的部分支持。