《Apache Tez: 用于建模和构建数据处理应用程序的统一框架》 论文

2022-09-11

《Apache Tez: A Unifying Framework for Modeling and Building Data Processing Applications》 论文本地文件(PDF, 1.8 MB)

作者:Bikas Saha, Hitesh Shah, Siddharth Seth, Gopal Vijayaraghavanh, Arun Murthyh, Carlo Curino

Hortonworks, Microsoft

摘要

Hadoop 的广泛成功已经引导出了一个以 YARN 资源管理层为基础的,快速发展且多样的应用引擎生态系统。 开源实现的 MapReduce 框架正慢慢地被一系列专门用于特定垂直领域的引擎所取代。 这导致了越来越多的分散和重复的工作,每一个新的垂直引擎都需要从头重新实现基本功能(例如:容错性、安全性、处理掉队任务等)。

在本文中,我们介绍了 Apache Tez,这是一个开源的框架,旨在建立数据流驱动的工作。 Tez 提供了一个脚手架和库组件,可用于迅速建立可扩展和高效的以数据流为中心的引擎。 我们设计的核心是促进组件的重复使用,而不妨碍在性能导向的数据平面上可定制性。 事实上,这是与上一代系统(如 Dryad,MapReduce)相比的关键区别,也是与新兴系统(如 Spark)的关键区别,后者提供并规定了一个固定的数据平面实现。 此外,Tez 还提供了原生支持来构建运行时优化,例如 Hive 的动态分区修剪。

Tez 已经部署在雅虎、微软 Azure、LinkedIn 和众多 Hortonworks 客户的网站上,而且越来越多的引擎正在与之集成。 这证实了我们的直觉,即大多数流行的垂直引擎可以利用一套核心的构建块。 我们用定量的实验证据来补充现实世界的采用情况,在流行的基准测试(TPC-DS,TPC-H)和生产工作负载中,基于 Tez 的 Hive、Pig、Spark 和 Cascading 的实现要优于其原始基于 YARN 的实现。

1. 引言

大规模数据分析曾经是大型网络公司专用的特殊技术,如今已成为大多数现代组织可用和不可缺少的。 这一更广泛的用户群促进了对这一领域的兴趣,并导致了一个蓬勃发展的大数据产业。 在本文中,我们使用 Hadoop 生态系统的视角来描述整个行业的趋势,因为这为介绍我们的 Apache Tez 系统提供了理想的背景。 我们将把与相关项目,如 Dryad、Nephele、Hyracks [24, 14, 15] 等,更广泛的比较推迟到第 8 节, 这些项目不可否认地成为了 Tez 设计的灵感来源,有时甚至是蓝图。

Hadoop,最初被设计为一个单一用途的系统(运行 MapReduce 作业以建立网络索引),现在已经演变成一个万能的数据分析系统。 这个工作的第一阶段是在 MapReduce [21] 上提出了更高层次的抽象,其中的例子有 Hive [30]、Pig [28] 和 Cascading [4]。 这加快了 Hadoop 的应用,但也导致了效率低下和性能不佳 [29]。 这些限制和对更多灵活性和效率的压力导致了 Hadoop 被重新调整为一个通用的、类似操作系统的资源管理层,即 YARN [32],以及一个允许任意执行引擎的应用框架层。 这使得不同的应用程序可以共享一个集群,并使 MapReduce 仅为 Hadoop 生态系统中的另一个应用。 重要的例子有摆脱了 MapReduce 模型(和 runtime)的 Spark [38],Impala [25] 和 Flink [5]。 这加快了创新,但也导致了一个效率较低的生态系统,相同的通用功能在不同的框架中被重复创造,例如,MapReduce 和 Spark 独立开发了实现延迟调度的机制 [36]。

在本文中,我们介绍了 Tez,这个项目接受了向 YARN 的架构转变,并进一步推动了它,提出了一个可重用的、灵活的和可扩展的脚手架。 该脚手架可以支持面向数据流的框架,同时避免复制功能。 Tez 的 API 允许框架使用最少的代码对数据流程图的逻辑和物理语义进行清晰的建模。 需要澄清的是,Tez 是一个建立基于数据流的 runtime/引擎的库,而不是一个引擎本身。 例如,Hadoop 的 Hive runtime 引擎已经在 0.13 版本中被重写以使用 Tez。

Tez 的主要贡献如下:

  1. 允许用户将计算建模为一个 DAG(有向无环图),类似于 Dryad/Nephele/Hyracks。它的创新之处在于对经典的顶点和边缘概念做了更精细的分解,为数据平面提供了更大的控制和扩展性。
  2. 提供 API 来动态发展(更细粒度的)DAG 定义。这使得复杂的运行时查询操作成为可能,例如根据在线信息修剪数据分区。
  3. 为最先进的功能提供可扩展和高效的实现,例如,与 YARN 兼容的安全性、数据本地化的感知、资源再利用、容错和推测。
  4. 为框架编写者和研究人员提供了快速创新的机会,并通过可插拔的实验支持来创造现实世界的影响,并提供一个开放源码的社区来了解该项目并做出贡献。

Tez与许多其他的“统一框架”提案不同之处在于 1)经过验证的灵活性和动态适应性,2)对操作问题的关注(生产准备就绪),以及 3)由社区驱动的将 Tez 嵌入到多个现有的特定领域的引擎中。

这一点从 Tez 对 MapReduce、Hive、Pig、Spark、Flink、Cascading 和 Scalding 的支持,以及它在雅虎、微软 Azure、LinkedIn 以及其他一些使用 Hortonworks 数据平台的组织的生产数据处理集群的采用中得以验证。 除了讨论 Tez 的广泛实际应用外,我们展示了其支持 Hive、Pig 和 Spark 在运行标准的基准测试的能力,如 TPC-H 和 TPC-DS,以及在雅虎公司的生产工作负载。

本文的其余部分组织如下:第 2 节提供了更多的历史背景,以及设计 Tez 的理由。 而第 3 节则介绍了 Tez 的架构。第 4 节讨论了 Tez 的实现,并强调了对效率和生产的实用性考虑。 第 5 节和第 6 节致力于证明它的实际意义,介绍了真实的应用,以及广泛的实验评估。 最后,我们在第 7 和第 8 节中讨论了未来和相关的工作,并在第 9 节中得出结论。

2. 背景和理由

为了理解设计 Tez 的动机和原理,我们必须先提供一些术语的背景,以及 Hadoop 中分布式计算的历史。 对这个历史和动机的观点不感兴趣的读者,请直接跳到第 3 节,在那里我们将深入探讨 Tez 架构的技术问题。

术语。到目前为止,我们一直使用图的术语,来吸引读者的直觉。现在我们更精确地介绍我们的术语:

  • DAG:有向无环图,代表数据处理工作流程的结构。数据沿着边的方向流动。
  • 顶点(Vertex):代表处理流程上的一个逻辑步骤。一个处理步骤通过应用应用程序提供的代码来转换数据,过滤或修改数据。
  • 逻辑 DAG:逻辑 DAG 是由一组顶点组成的,其中每个顶点代表计算的一个特定步骤。
  • 任务(Task):代表顶点中的一个工作单元。在分布式处理中,由单个顶点代表的逻辑工作在物理上被执行为一组任务(tasks),这些任务可能在集群的多个机器上运行。每个任务都是顶点的实例化,它处理该顶点的输入数据的一个子集(或分区)。
  • 物理 DAG:物理 DAG 包括一组任务,它们由逻辑 DAG 的顶点扩展而成。
  • 边(Edge):代表生产者和消费者之间的数据移动。逻辑 DAG 顶点之间的边表示它们之间的逻辑数据依赖关系。物理 DAG 中任务之间的边代表任务之间的数据传输。

这适用于不同的步骤可以被分割成更小的部分,从而可以被并行处理的问题。通常情况下,分区与分布式的分片数据相一致,并试图将处理过程与分片数据放在一起,从而减少计算的成本 [21]。

Hadoop 1 和 Hadoop 2(YARN)。Hadoop 从一个 MapReduce 是其上唯一的执行引擎的单体软件栈起步 [32]。所有的数据处理的行为需要将逻辑转换成一个 MapReduce job 或者一系列的 MapReduce jobs 上。 MapReduce 也负责集群资源的管理和分配。Hadoop 2 是当前一代的 Hadoop,它创造出一个名为 YARN 的通用型的资源管理层来承接分离出的这些职责 [32]。 这个从 Hadoop 核心平台上分离出来的应用允许除了 MapReduce 之外的多种类型的应用跑在 Hadoop 集群上。之前的很多领域相关应用都依赖于 MapReduce 来执行它们的逻辑,比如 Apache Hive 用作 SQL-like 数据处理,Apache Pig 用作 ETL 脚本,Cascading 用作 Java 写的数据处理应用。现在这些应用都可以更定制化的实现它们的逻辑并且原生地跑在 YARN 上。

Evolution of Hadoop

虽然定制化能带来性能上的优势,但有很大的可能来创建一套通用的构件块,这些应用可以使用这些构件块以实现在 YARN 上的自定义代码。 我们试图通过 Tez 来抓住这个机会,这一点接下来会讨论。对流行的 Hadoop 生态系统应用(诸如 Apache Hive、Pig、Spark 等)的分析表明,有一些通用的功能是它们都需要的。 这些功能包括从 YARN 协商资源,以运行应用程序的任务,处理集群内的安全问题,从硬件故障中恢复,发布度量和统计数据等。 这其中有很多是高度专业化、很难的基础设施,每个人从头开始构建时都必须复制这些基础设施。 一个共同的实现使编写应用程序更容易,因为它消除了应用程序编写者的负担,让他们专注于其应用程序的独特逻辑。

此后,除非另有说明,当我们提到 Hadoop 时,我们指的是以 YARN 作为底层资源分配层的 Hadoop 2 计算栈;而提到 Hadoop 生态系统则意味着包括在 YARN 上运行的开源和商业项目,如 Apache Hive、Pig 等。

提供这些共同特征的努力需要创建一个框架来表达和优化这些工作负载的模型。 然后这个模型可以通过一个共享的底层库在 YARN 应用框架上应用和执行。 以下是对这样一个共享库的合理要求,我们之前已经通过与 MapReduce 的比较强调了这一点,一个充当共享底层的通用引擎。

表达能力。MapReduce 有一个简单的建模 API 来描述计算, 它通过要求所有的应用算法转化为 map 和 reduce 函数来描述计算的情况。 正如其他学者所观察到的 [24, 14, 15],这太有约束性了,而面向 DAG 的模型可以更自然地捕获更广泛的组合。因此,我们将 Tez 的中心模型也定义为围绕 DAG 的执行。 此外,MapReduce 还为运行在 map/reduce 步骤中的逻辑提供了内置的语义,并强加了一个数据在 map 和 reduce 步骤之间的 sortedpartitioned 移动。 这些内置的语义,在一些核心用例中是合理的,在许多其他情况下可能是纯粹的开销,甚至在某些情况下是不需要的。 需要注意的是,这里需要一个 API 来描述任意 DAG 的结构,而不会添加一些不相关的语义到 DAG 结构中。

数据平面的可定制性。一旦分布式计算的结构被定义,就可以有在该结构中执行的实际逻辑的各种实现。这些不同之处可能是算法上的,例如,不同的数据分区方式,或与使用不同的硬件有关,例如,如果可以的话使用远程内存访问(RDMA)。 如果在 MapReduce 中,引擎的内置语义使得这样的定制化很难,因为它们侵入了引擎本身的实现。其次,在集群上执行 MapReduce 作业的单体结构使得插入独立的的实现很困难。 这就促使由数据转换和数据移动定义出的数据平面需要完全可定制。 还需要可以对任务执行的不同方面进行建模,以允许个别的执行的不同方面,例如读取输入、处理数据等等,可以很容易地进行定制。 我们采访了 Hadoop 社区的几位成员,证实了对现有引擎的改进(例如,改变 MapReduce 中的 shuffle 行为)是很有必要的。

虽然其他框架已经支持了 DAG 中的更一般概念,但是它们也有与 MapReduce 一样的局限性,内置的语义和数据平面的实现。而 Tez 能提供更低层次的抽象,这使得这样的语义和特殊实现能基于基础的共享脚手架来实现。

后绑定的 runtime 优化。应用从性能上考虑它们的数据处理逻辑时需要做出后绑定的决定。算法应该能基于被读取到的数据动态地变化,比如 join 的策略和扫描的机制。如果应用更好地理解它地数据和环境,就能更好地调整分区数和工作分工。Hadoop 集群的使用和负载特征上可以非常动态。用户和 jobs 持续地进出集群,它们的资源利用率却各不相同。这使得一个应用很难基于当前的集群状态去确定它的运行特征。我们把 Tez 设计成可以在 runtime 时更新核心抽象以使后绑定和在线决策更容易实现。

这就是我们对建立 Tez 的历史背景和合理性的概述。我们现在转向 Tez 的高层次架构,并且提供对关键构建块的一些看法。

3. 架构

Apache Tez 被设计和开发成专注于解决上面讨论过的这些问题,简而言之,1)底层模型的表达,2)数据平面的可定制性,3)促进 runtime 优化。与其建立一个通用计算引擎,我们意识到需要 Tez 去提供一个统一的框架来创建专门的引擎,以便为他们的特定需求定制数据处理。Tez 解决了通用但却困难的问题,编排和在 Hadoop 上执行分布式数据处理应用,使得应用可以专注于提供特定的语义和优化。应用层和 Tez 库层的关注点有着清晰的分界。Apache Tez 提供了集群资源的协调,容错性,资源弹性,安全性,内置的性能优化以及一个可随时使用的共享组件库。应用程序提供自定义应用逻辑,自定义数据平面和专门的优化。

这就带来了三个主要的好处:1)摊销开发成本(Hive 和 Pig 在 6 个月内用 Tez 库完全重写了它们的引擎),2)改进的性能(我们将在第 6 节展示用 Tez 有最高到 10 倍的性能改进),3)让未来多个引擎的管道更高效,因为它们共享了一个底层库。

Tez 由一组定义数据处理的核心 API 和一个在集群上启动的编排框架。应用程序应该实现这些 API 为编排框架提供执行环境。把 Tez 当作一个用于创建表示数据流结构的脚手架的库是很有用的,应用程序把自己的逻辑(比如算子)和数据传输代码(比如从远程机器的磁盘上读入数据)注入其中。这个设计即是战术性的也是战略性的。长期来看,这使得应用程序保持对 Tez 的未知,短期来看,这让现有的应用程序如 Hive 或 Pig 在不大幅改变它们的核心操作管道下能用上 Tez。我们开始描述 DAG API 和 Runtime API。这些是应用程序主要面对的接口,用来描述应用的 DAG 结构和执行时的代码。接着我们解释由 VertexManagers 和 DataSourceInitializers 组成的基于事件的控制平面去支持运行时对 DAG 的优化。最后,在第 4 节我们描述在 Hadoop 集群上执行所有代码的基于 YARN 的编排框架。特别地,我们专注于这个实现的性能和生产准备方面。

3.1 DAG API

Tez 的 DAG API 暴露给 runtime engine builders,作为一种以简洁的模式表达它们计算结构的方式。我们关注的是一类可以自然地表现为 DAGs 的数据处理应用,数据从数据源向数据汇,在中间的节点做数据转换。Tez 关注于无环图,而且通过假定在顶点上的确定性计算,在边上的数据流向,我们能基于容错重算,类似于 Dryad,进一步的解释在第 4.3 节。作为 DAG 的模型计算并不新鲜,但迄今为止,大多数系统在支持更高级别引擎的背景下一般都会设计 DAG APIs。Tez 被设计成把数据流程图建模作为主要关注点的系统。用众所周知的顶点和边的概念,DAG API 能给出一个关于计算结构的清晰和简洁的描述。

顶点(Vertex)。在 DAG API 中的一个顶点表示数据转换,是数据处理中的一步。这是作用到数据上的核心应用逻辑所在。因此一个顶点必须配置用户提供的 processor 类,并在其中定义每个 task 都要执行的逻辑。DAG 图中的一个“顶点”经常并行执行一些(可能是大量的)tasks。顶点的定义中控制了这个并行度。并行度通常由待处理的分布在集群上的数据决定或者是由需要切分成更小单元的大运算决定。一个顶点上 task 的并行度可以在 DAG 定义过程中静态地定义,但通常是在运行时动态地决定。

边(Edge)。DAG 图中的一条边表示从生产者顶点到消费者顶点的数据移动的逻辑和物理观点。

  • 连接模式(Connection Pattern):从逻辑概念上看,一条边就是在生产者和消费者任务顶点的连接模式和它们的调度依赖关系。这使得编排框架能正确地把数据从生产者任务的输出传递到消费者任务的输入。这个路由表通过一个可拔插的 EdgeManagerPlugin API 接口配置实现。图 3 展示了 3 种常见的连接模式(one-to-one, broadcast, scatter-gather),能用来表达大多数的 DAG 连接,也是这个项目内置。对一些场景需要自定义路由,应用也可以通过自己实现自己的路由(在 5.2 节我们会给一个具体的例子)。
  • 传输机制(Transport Mechanism):从物理概念上看,一条边就是实现移动数据的存储和传输机制。这可能是本地磁盘,或者本地/远程内存等等。一条边表示的实际的数据传输操作通过一对这条边指定互相兼容的 input 和 output 类执行的。兼容性基于使用相同的数据格式和物理传输机制。比如,两个操作都基于键值对格式和在磁盘上的操作,或者都基于字节流并使用内存。Tez 内置常用的 input 和 output 使用场景会在 4.1 节介绍。

Tez 用顶点的并行度和边的性质在执行过程中把逻辑 DAG 扩展成真实物理 task 执行的 DAG,如下图 2 所示。

Figure 2: Expansion of the logical vertex DAG to a physical task DAG based on vertex parallelism and edge definition

Figure 3: Edge properties: define movement of data between producers and consumers

数据源和汇。DAG 能通过 DAG API 来定义顶点和连接它们的边来定义。一般地,数据流会从一些数据源读取初始输入,并写入最终结果到一些数据汇中。数据源可以与一个 DataSourceInitializer 相关联,它在执行时能决定从初始输入的优化读取模式。比如,在 MapReduce 的术语中,这对应着 split 计算,一个 split 对应着被一个 map 任务读取的分布式数据的一个分片。对 map 任务的初始 split 计算能通过一个初始器完成,需要考虑数据的分布,数据的本地化,可获取的计算容量来决定 split 的数量以及对每个 split 的最优大小。类似的,数据汇可以和一个 DataSinkCommitter 相结合,它可以在运行时提交最终的输出。提交的定义随着输出类型的不同而不同,但是会保证只完成一次,而且通常会让外部观察者在成功输出结束后可以感知到。

这种 DAG 的组装方式允许可拔插和可重复使用的组件。输入输出的通用共享库能被不同的应用重复使用,因此只需要提供顶点的处理逻辑。反之,相同的 DAG 结构在不同的硬件环境中通过替换边上的不同的输入输出能以更优化的方式执行。Tez 自带一个适用于 Hadoop 平台内置的数据服务的输入输出库,即适用于 HDFS 和 YARN Shuffle Service。这使得 Hadoop 生态体系中的应用(比如 Hive 和 Pig)通过实现自定义处理器能很快地利用 Tez。图 4 展示了用 API 来描述逻辑 DAG 的一个浓缩例子。

Figure 4: Essence of the Tez API shown via pseudo-code for the canonical WordCount example

3.2 Runtime API

DAP API 定义了数据处理的脚手架结构。Runtime API 被用来将实际应用的代码注入脚手架中。具体来说,Runtime API 定义要实现的接口去创建上述 DAG 中指定的 processor,input 和 output 类。

Inputs, Processor, Oouputs。一个顶点是 DAG 图中一个转换步骤的逻辑表示。实际的转换是通过在集群的机器上运行该顶点的任务来完成的。Tez 定义每个 task 由一组 inputs,一个 processor,以及一组 outputs(IPO)组成。处理器是由该任务的顶点定义的。输入的定义是由该顶点的输入边上游的输出类定义的。输出由该顶点传出的边的输入类定义的。这使得处理器对处理过程有一个逻辑性观点,因此保留了在 MapReduce 中流行的简化编程模型。输入和输出隐藏了细节信息,比如数据传输,数据分区和/或分布式分片数据的汇聚。Runtime API 是一个瘦包装器(thin wrapper),用于实例化以及与输入,处理器和输出进行交互。在 IPO 对象创建后,它们就等着被配置了。

IPO 配置。框架通过在创建 DAG 时指定一个不透明的二进制有效载荷来配置 IPOs。这种二进制有效载荷的配置方式是一个常见的主题,用于配置 Tez 中任何应用特定实体。这允许应用使用它们选择的任何机制去实例化它们的代码。这样做不仅能做简单的配置,也能做代码注入(如 5.4 节的例子)。配置后,处理器被装备上所有输入和输出,并被要求执行。此后,由处理器,输入和输出来互相操作来完成任务。框架通过一个 context 对象和它们交互,发送和接收有关完成,更新进度,报告错误等事件。

数据平面不可知。Tez 没有指定任何数据格式,而且事实上在 DAG 执行时也不是数据平面的一部分。实际的数据传输由输入和输出完成,Tez 只是路由生产者和消费者之间的连接信息。当一个生产者任务输出产生数据,它接着发送相关元数据,比如数据的读取 URL 和大小,通过 Tez 给到消费者任务的输入。Tez 使用连接生产者和消费者的边所编码的连接模式来路由这些元数据。因此 Tez 在数据平面上增加的开销最小。这也使得 Tez 数据格式不可知。输入,处理器和输出能选择适用于应用的它们自己数据格式(比如 bytes,records 或者 key-value pairs 等等),在 5.5 节中所举例子。

这种新颖的基于 IPO 的任务组合方式允许关注点的分离,并使系统具备可拔插性。相同的 DAG 结构能基于环境相关的 IOs 进行实例化。比如,不同云环境能插入对它们存储子系统优化后的 IOs。我们在下节将会看到,如何在执行过程中动态地配置 IPOs,以及进一步地运行时定制。

3.3 基于事件的控制面板

Tez 编排框架地开放架构需要解耦控制平面以使各类实体互相交换控制信息。为了达到这样的效果 Tez 有一个基于事件的控制平面,同时也暴露给应用。软件组件生成事件路由给接收者。根据设计,这是一个异步,非阻塞,基于推送的通信方法。事件用作所有的通信,无论是框架发给框架,应用程序到框架,反之亦然。如图 5 中展示的,一个 DataEvent 由一个生产者任务的输出产生,包含输出元数据(比如 URL),以便给消费者任务去读取数据。这事件由框架接收,通过边指定的连接信息路由给消费者任务的输入。如果一个任务的输入在读取数据时发生错误,它会发送一个 ErrorEvent 给框架。基于这个错误事件,Tez 能重新执行生产者任务以重新生成数据。其他事件能用作发送统计数据,进度等等。基于事件的通信也提供了灵活性,可以在不改变交互模型或 APIs 的情况下增加更多的实体和通信渠道。Tez 只对事件进行路由。每个事件都有一个不透明的二进制有效载荷由发送方和接收方解释,以交换控制元数据。事件在每次任务心跳时流向和流出协调器。事件传输延迟取决于心跳延迟和协调器的处理延迟。这些延迟随着 job 的大小成比例地增加,因为它们依赖并发连接的数量和协调器能支持地事件负载。如果控制平面事件位于数据平面关键路径,那么它们会对应用延迟产生负面影响,但是如果它们只用作数据平面的设置,那么 Tez 就不会对低延迟的应用在数据平面上引入任何额外的延迟。

Figure 5: Events used to route metadata between application IOs and error notifications from applications to the framework

3.4 VertexManager:动态适应执行

如前所述,数据处理集群在计算能力和数据分布(数据存储在物理节点上的位置)方面存在差异,应用程序可以考虑这些差异来计划它们的工作。数据依赖性的行动,比如基于样本的范围分区或者分区裁剪的优化,都需要有在执行时改变 DAG 的能力。不可能对所有这些当前和未来的图的重新配置进行静态编码,Tez 也不能独自完成(因为它需要太多的领域知识)。因此 Tez 需要允许应用在执行时做出这样的决定并与 Tez 协调以动态调整 DAG 和它的执行。这都是通过 VertexManager 抽象完成的,类似于 [24] 。

Runtime Graph Re-configuration。构建 DAG 的时候,每个顶点都会和一个 VertexManager 关联。VertexManager 负责在 DAG 执行期间对顶点进行重新配置。编排框架包含各种状态机来控制顶点,任务等的生命周期,顶点状态机设计来在状态转换时与 VertexManager 交互。VertexManager 被提供一个上下文对象,上下文对象通知它关于状态的变化,比如任务的完成等。VertexManager 能使用上下文对象来改变它的顶点状态。在其他方面,VertexManager 还控制着顶点并行度,输入,处理器和输出的有效载荷的配置,边的属性和任务的调度。和其他实体一样,VertexManager 提供 API 接口给应用去实现定制化的顶点执行。使用相同的 API,Tez 自带了一些内置的 VertexManager。如果一个 VertexManager 没有在 DAG 中指定,Tez 会从内置的基于顶点特性的实现中挑选一个。

自动分区数估算。作为一个运行时优化的例子,我们给一个众所周知的问题提供一个方案,即 MarReduce 任务在 reduce 阶段决定正确的任务数的问题。这个数量一般取决于从 mappers 到 reducers 参与 shuffle 的数据大小,而且只能在执行时准确地获取。Shuffle 是一个术语,用来描述在调用 reduce 操作之前的跨网络的读取和分区化的输入的聚合。在 Tez 中 ShuffleVertexManager 能用作控制读取混洗后数据的顶点。生成等待被混洗数据的任务通过 VertexManager 事件发送数据统计信息给 ShuffleVertexManager。如下图 6 中展示的,ShuffleVertexManager 收集这些统计信息来计算总数据大小,启发式地估算 reducer 的数量,以使每个 reducer 读取到想要地数据大小。因为 reducer 的数量本质上代表了分区数,这个方案能更广泛地用于估算运行时地分区数(比如参与到 distibuted join 操作的分区)。

Figure 6: DAG reconfiguration by VertexManager to apply partition cardinality estimated at runtime

调度优化。VertexManager 同样也能控制他们顶点上的任务调度。通常情况下 tasks 应该在它们的输入数据准备好后才启动。但是如果一些任务能有意义地处理部分输入数据,那它们能提前启动并使用可用的计算资源。上面提到的混洗操作就是一个部分输入数据能被 tasks 主动读取的例子。跨网络的数据传输代价大,早点启动能通过与产生剩余的输入数据的任务的完成时间重叠来隐藏延迟。错序调度可能导致调度死锁,在一个资源耗尽的集群中,一个提前调度的任务能阻塞它的一个输入任务,因为它占用了集群上的资源。Tez 有内置的死锁检测和主动抢占地来处理这种情况。它会用 DAG 的依赖关系来检测执行失序的任务,并抢占它们来解决死锁。

3.5 数据源初始化器

在 Tez 的设计中,我们把数据源作为一等实体来建模。在 DAG 中第一步通常设计从数据源(如分布式文件系统)中读取初始数据,一般来说这也是资源消耗最大一步。因此,在这一步中一个好的决定能大幅改善性能,反之亦然。DAG 中的一个数据源和一个 DataSourceInitializer 相关联,框架会在读取数据源的顶点执行任务前唤起初始化器。这初始化器有机会使用运行时的准确信息来决定如何优化读取输入。如 VertexManager 一样,初始化器也能与其他实体发送和接收事件。它也能通过框架的上下文对象获取到集群信息。基于这些和其他信息源,初始化器能配置任务输入或者通知顶点管理器关于顶点的新配置(如处理输入所需的最佳并行度)。

我们展示一个 Hive 动态分区修剪的使用例子。经常的一个场景是读取数据源,然后再根据某个键做关联。如果关联键空间是已知的,我们就可以只读取数据中和关联有关的子集部分。有时这种元数据只能在运行时在 DAG 的一个不同的子图中通过查看数据获取到。Hive 使用 InputInitializer 事件从其他节点的任务发送这些元数据到数据源的初始化器上。初始化器用这些元数据来决定需要读取的数据中相关子集。这基于关联的选择能大幅改善性能。

上述讨论有了关于 Tez 架构和特性的一个大体描述。更多关于 API 语法和用户自定义实体的细节都可以在项目网站的 API 文档中找到。

4. 实现和使用性考虑

我们现在转向描述上节提到的架构如何在 YARN 中实现,并讨论更多效率的细节以及 Tez 的生产方面。从工程的角度上看,这是我们花费最多精力的地方,也是使得 Tez 做数据处理引擎的一个有用的构件的地方。

4.1 YARN 中的实现

Apache Tez 项目包含 3 个主要部分:

  • API 库:提供 DAG 和 Runtime APIs 和构建应用的客户端侧库;
  • 编排框架(Orchestration framework):这作为 YARN Application Master(AM)的实现在 Hadoop 集群中通过 YARN 去执行 DAG;
  • Runtime 库:提供各种输入和输出的实现,能用一种补充。

典型的 Tez 应用生命周期。一个基于 Tez 的应用是用表示应用逻辑的 DAG 构建 API 库写成的。通常情况下,高阶应用(如 Apache Pig)通过将自己原生语义编码成 Tez DAGs 以实现构建 DAGs 的目的。因为 Tez 是设计成在 Hadoop 集群上执行的,所以我们把输入和输出的实现分为两类,可靠数据存储用所有 Hadoop 集群上现成的标准存储服务 HDFS,临时数据存储采用 YARN Shuffle Service。只使用这些服务的应用只需要实现它们的处理器来启动和运行。一般地,应用创建一个通用的处理器主体,它能配置成执行 DAG 的依赖操作。Tez 的输入和输出采用键值对数据格式,以便于在键值对主宰的 Hadoop 生态系统项目(如 Apache Hive, Pig 等)中方便使用,并且也能扩展到其他数据格式。然后使用 Tez 客户端库将 DAG 提交到 YARN 集群上。YARN 启动 Tez Application Master(AM,每个应用的控制器)去编排 DAG 的执行。AM 执行的 DAG 通常是描述数据流图的逻辑 DAG。AM 扩展这个图以纳入每个顶点的任务并行性。它通过 DAG 图中输入的初始化器和顶点管理器来实现。然后 AM 从 YARN 处请求资源执行来自不同顶点的任务。YARN 的响应是提供容器(containers)。一个容器是集群节点上的资源分配基本单位。AM 在容器中启动任务,路由控制事件。任务通常按依赖的顺序执行,当所有任务执行完成,DAG 也就执行完成。AM 记录追踪和元数据信息,用于监控和调试。

通过利用现有的 YARN 和 MapReduce 上的库和服务,我们能在其上快速构建,这些是几个人年的生产代码,用以实现了安全和高容量的网络数据混洗,并且和 YARN 这种已验证的资源共享的,多租户模型集成。因此基于 Tez 构建的应用不用更多的付出也能从中得利。

4.2 执行效率

编排框架的 YARN 实现一直关注着执行效率和性能,并吸收着这些年来从各种分布式数据处理系统中的想法。

位置感知调度(Locality Aware Scheduling)。对大规模数据处理来说,将处理过程放在靠近数据位置的地方是很重要的 [21,32]。Tez 尝试在靠近输入的地方去执行任务。位置可能在 DAG 创建时静态地指定,但是一般都是在运行时决定的。从初始数据源读取数据的任务通常从它们的数据源得到位置信息,后续的任务则从上游任务和连接边中推断出数据位置。例如,具有分散收集输入的任务没有特定的位置,但更倾向于在靠近输入的大块分片处执行。1-1 的边指定了它们源任务和目标任务之间严格的位置关系。因为在一个繁忙的集群里无法保证能获取到准确的位置信息,框架自动放宽位置性的范围从节点到机架,依此类推,延迟调度 [36] 用于在每次放宽前增加一个等待周期。

猜测(Speculation)。大集群可能有异构的硬件,不同的负载和硬件的老化。这会导致硬件引起的任务减速。这样慢的任务被称为散兵游勇(stragglers),启动这种任务的克隆经常被用来减轻它们对延迟的影响 [21]。Tez 监控任务进度并试图检测在相同节点上比其他任务执行慢得多的 straggler 任务。一旦检测到这样的任务,就会猜测性地和原始任务并行启动一个任务,并与之竞争完成。如果猜测性地尝试先完成了那么就算是成功地改进了完成时间。

容器复用(Container Reuse)。回顾一下,Tez AM 在 YARN 分配给它的容器里执行任务。当一个任务结束,AM 有一个选择去归还容器给 YARN 并请求另一个有不同能力或位置的容器。然而,每个容器分配周期都有与 YARN 协调资源的相关开销,以及启动容器进程的开销。通过复用容器去执行其他能匹配资源和位置的等待任务能把这种开销最小化。当没有能匹配上的任务,Tez AM 释放空闲容器回 YARN,以换取不同能力的新资源。在 Java 的世界中,这个复用有额外的好处,它给了 JVM 优化器更长的时间去观察和优化热点代码路径,从而进一步提高性能 [1]。

会话(Session)。会话把容器复用的概念往前更推进一步。一个 Tez AM 能在会话模式下执行,它能运行客户端提交的一系列 DAGs。这允许来源多个 DAGs 的任务能复用容器,从而进一步得到效率和性能提升。此外,会话能提前预热,它能在首个 DAG 准备执行前就先向 AM 申请启动容器。这些预热的容器能执行一些预先确定的代码,以使 JVM 优化启动。这就把容器复用的好处扩展到了提交给会话的第一个 DAG 上。比如,Apache Hive 和 Pig 使用会话模式在一个会话中执行多个下钻查询以获得性能提升。Tez 会话也可以高效地执行迭代过程。每次迭代都能表示成一个新的 DAG,并提交给共享会话,以便使用预热的会话资源进行高效执行。图 7 展示一个 Tez 会话的轨迹,其容器在多个 DAGs 的任务中共享使用。

Figure 7: Execution trace of 2 DAGs executed in the same Tez Session. Containers are re-used by tasks within a DAG and across DAGs. Cross DAG re-use happens only in session mode.

共享对象注册(Shared Object Registry)。Tez 扩展容器复用的好处到应用上,它为某一任务填充并需要在后续任务中被重复使用的对象提供一个内存缓存。缓存内对象的生命周期能被限制到一个顶点,一个 DAG,或者是会话上,这些都由框架控制。它能用作避免重新计算结果。比如,在 Apache Hive 中的 map join(在 Hive 的术语里的叫法,broadcast join)对小表一边填充 hash table。一旦一个 hash table 在一个 join 任务中被构建出来,其他 join 任务都不需要去重新计算这个结果,从而改善了性能。

4.3 生产环境准备情况

如 Tez 这样对性能和效率很重要的框架来说,我们也不能忽视标准的能力,这些能力是一个生产就绪的和大规模数据处理的可靠框架所要求的能力。大量的实体使用如 Apache Hive,Pig 和其他商用软件如 Cascading 去执行任务的关键操作。如果他们要自信地使用 Apache Tez,那么像容错,安全性和多租户就变成了一个必要的要求。幸运的是,Tez 已经建立在经过验证和测试的平台(如 Hadoop YARN 和 MapReduce)之上,并利用它们的优势来实现这些能力。Tez 中集成 YARN 所实现的专业代码能被使用 Tez 的高阶引擎所使用。

多租户(Multi-Tenancy)。数据处理集群正在变得越来越大,从资本开支的角度看,在多个应用和用户中间共享资本开支变得很关键 [32]。在构建应用时要考虑到这样的共享和协作行为。Tez 中基于离散任务的处理模型很好地适应这样的协作行为。短期存在的任务允许资源被 Tez 应用周期性地释放,从而被分配给其他用户和应用,这样的行为很适合集群资源分配策略。将资源从不需要的应用转移到需要的地方也提高了资源使用率。

这是有效部署服务守护进程的引擎的缺点。一般情况下,服务的守护进程需要提前分配大量资源,这些资源无法被其他应用共享。为了刚好的利用率,这些守护进程尝试同时运行多个“任务”,但是当系统没有足够的负载时这就没有用了,还带来了并发任务互相影响的可能。在 Tez 中,因为每个任务在它自己的容器进程中运行,资源分配的粒度更细。这改进了利用率(通过减少空闲资源的分配)也提供了基于进程的资源隔离(对 CPU/内存 等)。这也给 Tez 应用提供了资源弹性,当集群空闲时能提升使用的资源以加速执行时间,当资源紧缺时能优雅地适当降低性能完成工作。需要说明的一点是基于守护进程设计的讨论是在短暂的数据处理工作的特定背景下进行的。在很多情况下,比如数据存储,网络服务,PAAS 应用等,由基于守护进程的引擎所提供的一个长时共享服务是合适的。

安全性(Security)。随着现代数据处理集群中存储的数据种类和数量的增加,安全问题成为真正的关切。Hadoop 有内置的 Kerberos 和基于令牌的认证和访问控制 [32],而 Tez 原生集成了 Hadoop 安全框架以提供应用程序的安全访问。除此以外。Tez 提供的输入和输出支持通过网络读取数据的加密。因为储存的各种数据和来自多个用户的并发访问,安全问题确实令人担忧。在数据平面之外,减少了 Tez 对应用程序的威胁面的贡献。Tez 与应用程序之间的唯一互动是通过 Tez 的事件路由的控制元数据。这种元数据是以不透明的二进制有效载荷的形式呈现给 Tez 的,因此可以由应用程序通过加密或其他必要的技术来保护。在控制平面,安全 Hadoop 提供了应用程序访问存储或计算资源的基于 Kerberos 和令牌的认证,而 Tez 则与 Hadoop 暴露的安全 API 集成。Tez 有一些内置的输入和输出库,用于 HDFS 和本地存储。在一个安全 Hadoop 集群中,这些库使用基于 HDFS 令牌的认证来访问数据。在一个安全的集群中,本地数据被写在操作系统上用户的安全内容中,并通过 YARN Shuffle 服务提供的安全 SSL 通道进行读取。

安全的另一个方面是在同一台机器上运行但属于不同用户的任务之间的隔离。YARN 提供了通过在应用程序用户的安全上下文中运行容器,在容器之间提供这种安全隔离。由于其细粒度、短暂的任务模型,Tez 可以利用这种容器的安全性,在应用程序的容器中为单个用户运行任务,从而保证用户级别的隔离。当使用部署服务守护进程的应用引擎时,这就更难实现了。守护进程需要在同一个守护进程中运行来自不同用户的任务,这样使安全隔离变得困难甚至是不可能。为了解决这个问题,需要启动多个服务守护进程的实例(每个用户一个),如上所述这可能会降低资源利用率。我们相信,Tez 的细粒度、短暂的任务模型使其更适合于安全和多用户的 YARN 集群。

容错性(Fault Tolerance)。故障是商品硬件集群中的一个常态。故障可能发生在计算节点或网络上。Tez 提供了强大的故障容错能力,使用任务重新执行作为从错误中恢复的手段。当一个任务由于机器错误而失败时,它将在不同的机器上重新执行。基于任务重新执行的容错取决于确定性的和无副作用的任务执行。无副作用允许一个任务可以被多次执行。确定性保证了如果相同的代码在相同的输入数据上被执行,那么它将在每次执行中产生相同的输出数据。这些使系统能够安全地重新执行任务,以便从故障和数据丢失中恢复。由于输出是相同的,已经完成的消费者任务的输出不需要重新执行。这这限制了需要重新执行的数量,降低了故障的成本。

由于 Tez 不在数据平面上,它暴露了一个 InputReadError 事件,任务输入可以用它来通知 Tez 关于中间数据的丢失。使用 DAG 依赖性信息,Tez 可以确定哪个任务输出产生了丢失的数据,并重新执行该任务以重新生成数据。可能发生的情况是,重新执行的任务也报告了一个输入错误。这将导致 Tez 在 DAG 依赖关系中再升一步,以此类推,直到它找到稳定的中间数据。边 API 允许指定中间数据的弹性,这样 Tez 就可以被告知一个给定的边数据已经被可靠地存储,从而创造了一个级联故障的障碍。Tez 内置的输入/输出库利用了继承自 MapReduce 的启发式方法,当混洗大量数据时用来缓解和恢复网络错误和级联故障。例如,在报告错误事件之前,临时的网络错误会被重试。部分取用的数据被缓存,消费者任务保持活力,直到剩余的缺失数据被重新生成。Tez AM 周期性检查其状态。如果运行 Tez AM 的节点出现了故障,那么 YARN 将在另一个节点上重新启动 AM,AM 可以从检查点数据中恢复其状态。

Tez 与 YARN 紧密结合处理计划内和计划外的集群中断。它监听关于机器丢失或退役的通知,并积极主动地重新执行在这些机器上完成的任务。这就减少了这些任务输出的消费者会失败。Tez 还能理解 YARN 采取的行动,如抢占容器进行资源再平衡或终止表现不佳的容器,并对这些行动做出适当的反应。

限制(Limitations)。目前 Tez 的实现是基于 Java 的,因此,我们现在只限于基于 JVM 的应用。基于 Tez 的 MapReduce 实现已经成功执行了使用 MapReduce 的分叉方法执行非 JVM 用户代码。然而,一个更原生的 Tez 将需要编写 IPOs 和执行器的非 Java APIs 来支持它们。目前正在进行的工作是为 DAG API 提供一个可移植的基于文本的表示方法,以使非 Java 编译器能够以 Tez 为目标。Tez 只能在基于 YARN 的 Hadoop 集群中使用,因为目前的调度实现是为 YARN 编写的。我们最近的工作是让开发者具备调试能力,这就抽象出了对集群的依赖。这项工作的扩展可以使 Tez 利用其他系统来执行。目前的故障容错模型依赖于这样的假设,即中间任务输出被定位到任务运行的机器上。因此中间数据丢失导致任务在不同的机器上重新执行。可能不是所有的 IOs 都是如此。例如,如果数据是直接通过网络进行流式传输。此外,这种网络流可能会导致连接的流式子图的崩溃,这就需要对容错模型进行扩展以处理这种相关故障。

5. 各种框架和接受度

在本节中,我们将概述已经更新或已经做出原型的使用 Tez 框架在 YARN 上运行的项目。这些项目代表了相当多的应用类型,并有助于显示了 Tez APIs 在建模和构建高性能数据处理应用方面的适用性。这些项目代表了多种多样的应用类型,有助于展示 Tez APIs 在建模和构建高性能数据处理应用方面的适用性。

5.1 Apache MapReduce

MapReduce 是一种简单而强大的可扩展数据处理手段,可以说是开创了廉价的超大规模的数据处理时代。它的核心是一个简单的 2 顶点连接图。在 Tez 中,它可以用一个 map 顶点,一个 reduce 顶点以及一条散点收集边连接来表示。Tez 有内置的 MapProcessor 和 ReduceProcessor 来代表 Map 和 Reduce 顶点,并提供 MapReduce 接口功能。因此,MapReduce 可以很容易地被写成基于 Tez 的应用程序,事实上,Tez 项目内置了 MapReduce 的实现。任何基于 MapReduce 的应用程序都可以使用 Tez 版本的 MapReduce 来执行,只需简单地改变 YARN 集群上的 MapReduce 配置。

5.2 Apache Hive

Apache Hive 是 Hadoop 生态系统中最流行的基于 SQL 的声明式查询引擎之一。它将 HiveQL(一种类似 SQL 的方言)编写的查询翻译成 MapReduce 任务,并在 Hadoop 集群上运行。像其他 SQL 引擎一样,Hive 将查询翻译成优化查询树。通常情况下,由于 MapReduce 的表达能力限制,这些翻译到 MapReduce 的效率很低。这些树可以使用 Tez DAG API 直接翻译为 DAGs。因此,它们可以在 Tez 中有效地表示出来。此外,Hive 使用自定义的边(通过 Tez API)来执行到目前为止很难做到的复杂的关联。例如,在一个名为动态分区哈希关联(Dynamically Partitioned Hash Join)的关联变体中,Hive 使用一个自定义的顶点管理器来确定哪些数据分片的子集要相互连接,并创建一个自定义边,将适当的分片路由到它们的消费任务。虽然这些查询计划的改进提供了算法上的性能提升,但 Hive 从执行效率(第 4.2 节)中获益,以获得显著的性能优势。这种整合已经由 Apache Hive 社区实现,Hive 0.13 是 Hive 的第一个集成 Tez 的发行版本 [7]。更多基于 Tez 的优化(如动态分区修剪)已经在 Hive 0.14 中发布,并计划在未来的版本中进行更多的工作

5.3 Apache Pig

Apache Pig 提供了一种程序化的脚本语言(名为 PigLatin),它被设计用来编写复杂的批处理 ETL 管线。PigLatin 的程序化性质允许创建复杂的 DAG,顶点有多个输出。在 MapReduce 中,应用程序只能有一个输出,因此不得不用创造性的变通方法,如对数据进行标记或编写副作用输出。通过 Tez APIs 可以明确地对多个输出进行建模,使得 Pig 中的计划和执行代码简洁可维护。Pig 支持带有数据倾斜检测的关联,这是在早期通过运行另一个 MapReduce 作业来读取和采样数据,然后在客户端机器上创建直方图,最后再运行一个作业,使用直方图来读取和划分数据。这个复杂的作业流程在使用 Tez 的情况下可以被表示为任何 Pig DAG 的子图。样本被收集在一个计算直方图的直方图顶点中。直方图数据通过一个事件被发送至一个自定义的顶点管理器,该管理器重新配置分区顶点,以便执行最佳分区。来自 Yahoo、Netflix、LinkedIn、Twitter 和 Hortonworks 的 Pig 开发者共同实现了这个集成。Pig 0.14 是 Pig 的第一个除了 MapReduce 之外还支持基于 Tez 的发行版本 [8] 。

5.4 Apache Spark

Apache Spark [38] 是一个新的计算引擎,为分布式数据处理提供了一个优雅的集成 Scala API 语言。它专注于机器学习,但其 API 也适用于混合的工作负载和流水线处理。数据分布元数据是在一个叫做分布式弹性数据集(RDD)的概念 [37] 的语言层捕获,这个元数据在编译过程中被用来构建一个执行分布式计算的任务 DAG。Spark 有自己执行这些任务的处理引擎服务。我们能够将编译后的 Spark DAG 编码为 Tez DAG,并在未运行 Spark 引擎的 YARN 集群中成功运行。用户定义 Spark 代码被序列化为 Tez 处理器的有效载荷,并被注入到一个通用的 Spark 处理器中,该处理器反序列化并执行用户代码。这使得未经修改的 Spark 程序可以使用 Spark 自己的运行时操作符在 YARN 上运行。Apache Hive 和 Pig 已经被设计成可以翻译成 MapReduce,翻译成 Tez 是一个进化的步骤。在 YARN 上使用 Tez 对 Spark 这样的新引擎进行建模,这有力地证明了 Tez 框架的通用性和建模能力。Tez 会话也使 Spark 机器学习的迭代能够有效运行,每个迭代的 DAGs 提交给共享的 Tez 会话。这项工作是一个实验性原型,不是 Spark 项目的一部分 [22]。

Apache Flink [5] 是 Apache 社区的一个新项目,发源于柏林大学数据管理社区的 Stratosphere 研究项目。它是一个并行处理引擎,提供 Java 和 Scala 的编程 APIs,和一个为 APIs 和它自己执行引擎的基于成本的优化器。Flink 是另一个新平台的例子,它可以使用 Tez 框架与 YARN 集成,而不是作为一个独立的服务运行。这种整合把优化后的 DAG 翻译成 Tez DAG。虽然 Apache Hive 和 Pig 工作在键值数据格式上,并且可以使用内置的 Tez 输入和输出,但 Flink 将中间数据保存为自定义的二进制数据。这种格式可以用来执行诸如 group by 等操作,而没有太多的反序列化开销。可插拔和可组合的 Tez 任务模型允许 Flink 将其运行时操作符和二进制格式纳入 Tez 任务中,从而使未经修改的程序能够在 YARN 上使用 Flink 的原生运行时模型。这项工作目前正在 Apache Flink 开发者社区中进行 [6] 。

5.6 商业上的接受

Tez 的可定制性和以性能为重点的设计,使其迅速被商业机构采用。Concurrent Inc. 支持一个用 Java 写成的开源语言集成 API,称为 Cascading,用于分布式数据处理。Cascading 已被更新为使用 Tez 框架在 YARN 上运行,并取得了良好的性能结果。Scalding 是 Cascading 上的一种 Scala 方言,通过 Cascading 自动获得 Tez 的好处。与Tez集成的 Cascading 3.0 目前以开发者预览版的形式提供 [3]。Datameer 提供了一个可视化分析平台,使用 Tez 在 YARN 上运行优化的分析查询。它还使用 Tez 会话来维护一个查询服务池,以便在一个安全的、多租户的环境中实现快速响应。Datameer 5.0 是第一个使用 Tez 的版本 [33]。Syncsort 的第 8 版 Hadoop 产品,DMX-h,配备了一个智能执行层,以实现对 Mapreduce 以外的执行框架的透明定位。在这之后,他们正在整合 Tez 作为其支持的执行框架之一 [11]。

5.7 部署

Apache Tez 已经被部署在多个组织和在各种集群配置上。最突出的是,Yahoo! 已经在多个集群上部署了 Tez,范围从 100 多个节点到 1000 多个节点不等,用 Tez 运行 Hive 和 Pig [35]。LinkedIn 已经完成了他们所有的 Hadoop 集群迁移到 YARN,并使用 Tez 运行 Hive 和 Pig。微软 Azure 已经部署了带有 Tez 的 Hive 作为其云 Hadoop 产品的一部分 [10]。Hortonworks 自 2014 年 4 月以来已经提供了 Tez 作为其 Hadoop 发行版的一部分,其后迅速接受 Tez。在本报告发表时,近 100 个 Hortonworks 的客户已经探索了 Tez 的功能。

我们在这一节中描述的 Tez 不断被采用,为项目的成功提供了一个定性的衡量标准。在下一节中,我们将讨论由于这些应用与 Tez 集成而获得的实验结果。

6. 实验结果

我们在本节中专门介绍了几个实验,展示 YARN 上基于 Tez 的 Hive、Pig 和 Spark 的实现是如何优于它们在 YARN 上的原始实现的。这些实验来自于标准基准和生产工作负载。

6.1 Hive 0.14 性能测试

Hive 利用了 Tez 的各种特性,如广播边缘、运行时重新配置和自定义顶点管理器,来更好地整体实现用户处理目标的执行。与 Hive 0.14 基于成本的优化器相结合,Tez 可以实现执行繁忙的关联计划,可以利用中间的广播连接。Tez 的可插拔任务模型允许 Hive 在处理过程中使用自定义的矢量运算符。自定义边被用来执行高效的 Hive sort-merge-bucket joins。在图 8 中,我们展示了这些优化对 TPC-DS 派生的 Hive 负载的影响,该负载以 30 TB 的规模在 20 个节点(每个节点有 16 核、256 Gb 内存和 6 个 4 Tb 驱动器)的集群上运行。基于 Tez 的实现大大超过了传统的基于 MapReduce 的实现。

Figure 8: Hive: TPC-DS derived workload (30TB scale)

6.2 Yahoo Hive 规模测试

我们在图 9 中展示了 Hive 在 Tez 上的对比测试,该测试采用了 TPC-H 衍生的 Hive 工作负载 [35],负载以 10 TB 规模在 350 个节点(每个节点有 16 核、24 Gb 内存和 6 个 2 Tb 驱动器)的研究集群上测试。这是在 2014 年 Hadoop 峰会(圣何塞)上发表的。这表明,基于 Tez 的实施方案在大型集群上优于 MapReduce 的实现。

Figure 9: Hive: TPC-H derived workload at Yahoo (10TB scale)

6.3 Yahoo Pig 生产测试

在 Yahoo!,Pig on Tez 在大型生产 ETL pig 作业上进行了测试,这些作业的运行时长从几分钟到几小时。为了测试规模和功能的不同方面,所运行的 pig 脚本具有不同的特性,如 TB 级的输入,100 K 以上的任务,有 20 到 50 个顶点的复杂的 DAGs,并做了各种操作的组合,如 group by、union、distinct、join、order by 等。测试是在不同的生产集群上进行的,这些集群中的数据已经在正常运行,平均利用率为 60%-70%。该集群有 4,200 台服务器,46 PB HDFS 存储和 90 TB 总内存。大多数数据节点都有 12/24G 内存,2 个 Xeon 2.40GHz,6 个 2TB SATA,使用Hadoop 2.5,RHEL 6.5,JDK 1.7。与 MapReduce 相比,在保持所有配置(内存,shuffle 配置等)与 MapReduce 相同的情况下有 1.5 到 2 倍的性能提升。图 10 显示了这些测试的结果。

Figure 10: Pig workloads at Yahoo

6.4 Pig K-均值迭代测试

正如第 4.2 节所述,Tez 的会话和容器复用功能对快速迭代的工作负载有利,这需要连续的 DAGs 在同一数据集上执行。在图 11 中,我们展示了 K-means 迭代式 PIG 脚本 [20] 的性能改进。对一个 10,000 行的输入数据集在单个节点上进行 10、50 和 100 次迭代。这是在 2014 年 Hadoop 峰会(圣何塞)上发表的。

Figure 11: Pig iteration workload (k-means)

6.5 YARN 上的 Spark 多租户测试

正如第 4.3 节所解释的,Tez 的基于短暂任务的模型更适合于多租户和资源共享。这表现在图 12 中,通过比较 YARN 上基于服务和基于 Tez 的 Spark 实现证明了这一点。基于 Tez 的实现释放了闲置资源被分配给其他需要这些资源的作业,从而如图 13 所示,加速了它们的速度,而基于服务的实现则在服务的生命周期内保留资源。这个实验中,我们有 5 个用户的并发测试 [22],在 20 个节点的集群上,沿着 L SHIPDATE 列对 TPC-H lineitem 数据集进行分区。测试是在对应于数据仓库 100 GB、200 GB、500 GB 和 1 TB 规模的数据集上进行的。用来运行这个工作负载的集群与 Hive 0.14 基准测试一样,每个节点有 16 核、256 Gb 内存和 6 个 4 Tb 驱动器。

Figure 12: Sharing a cluster across 5 concurrent Spark jobs with Tez (identical x-axis time scales)

Figure 13: Spark Multi-Tenancy on YARN Latency Results

7. 开源和未来的工作

Apache Tez 是在 Apache 软件基金会下开发的一个开源项目。它是一个社区驱动的项目,贡献者来自微软、雅虎、Hortonworks、LinkedIn 以及其他个人爱好者。该项目的源代码可在 http://git.apache.org/tez.git,项目网站在 [2]。

Tez 的开放架构和其基础可定制性设计使它成为一个实验和创新的平台。我们相信,目前建立在 Tez 上的用例只是漫长旅程中的最初步骤。各个领域对改善和利用 Tez 有相当大的兴趣。渐进式查询,允许复杂的大型查询被部分执行,并随着查询的进行而逐步优化。Apache Hive [30] 和 Apache Calcite [17] 正在合作进行物化视图的研究,以提高普通子查询速度。我们希望与正在添加到 HDFS [12] 中的内存存储功能进行深度整合,以便 Tez 应用能够从内存计算中受益。Tez 目前支持 Java 应用,将其扩展到支持其他语言将扩大使用 Tez 构建的应用程序的范围。另一个感兴趣的领域是用于调试故障和性能瓶颈的工具。越来越多的地理分布和法律/隐私要求使得跨数据中心的工作执行变得非常重要 [34]。改进 Tez 编排和 API 来构建工作模型可能有助于有效地执行它们。以上只是 Tez 在学术界和商业界发展或使用的众多可能性中的一小部分。许多运行时的优化也在进行中。例如,自动选择最佳的数据传输机制,如为分配到同机器上的任务选择内存数据,或者为超长任务的输出使用一个可靠的存储器,输出就能得到保障不至于丢失。一个战术性的想法是创建工具使整个 MapReduce 工作流能够被缝合到一个单一的 Tez DAG 中。这将使遗留的 MapReduce 工作流程能够轻松地使用 MapReduce im

8. 相关工作

Apache Tez 很幸运的可以从 Dryad、Hyracks 和 Nephele [24, 15, 14] 等类似系统的开发和经验中学习。所有这些系统都有一个共同的概念,就是将数据处理建模为 DAGs,顶点代表应用逻辑,边或通道代表数据传输。Tez 通过增加输入、处理器和输出的概念来正式定义执行 DAG 的任务,使其更加细化和关注点的明确分离,并允许可插拔的任务组成。所有这些系统都在不同程度上参与了数据平面,并定义了某种形式的数据格式,应用程序可以从基本定义衍生出自定义数据格式。它们都定义了磁盘上、网络上和内存中的通信通道。另一方面,Tez 没有定义任何数据格式,也不是数据格式的一部分。在类似的情况下,Hyracks 为执行定义了一个运算符模型,使其能够更好地理解数据流以进行调度。Tez 将处理器视为黑盒子,因此应用逻辑可以完全从框架中解耦。Nephele 为云环境进行了优化,它可以弹性地增加或减少资源并选择适当的虚拟机。Tez 也可以通过在 YARN 中获取和释放资源来实现资源的弹性。Dryad 和 Tez 共享动态图重新配置的顶点管理器的概念。Tez 将这一概念向前推进了一步,正式确定了一个 API,允许在不了解框架内部的情况下编写管理器,还定义了一个基于事件的通信机制,使任务中的应用代码与顶点管理器中的应用程序代码进行通信,以便来启动重新配置。此外,Tez 还增加了一个输入初始化器的概念,对主要数据源进行正式建模,并在读取数据时应用运行时优化。当任务的所有输入都准备好被消耗时,Dryad 才会安排任务,以防止调度死锁的发生。Tez 出于性能原因可以错序执行,并有内置的抢占功能来解决调度死锁。总之,Tez 与这些系统的不同之处在于它的建模能力和设计目标是作为一个库来构建引擎,而不是自己作为一个引擎。MapReduce 当然是 Hadoop 生态系统中现有的引擎。Tez 涵盖了 MapReduce APIs,因此可以用 Tez 编写一个功能齐全的 MapReduce 应用程序。

Dremel [26] 是一个基于多级执行树的大型数据集交互式分析的处理框架,它对聚合查询进行了优化,并激发了 Presto [9] 和 Apache Drill [23] 等系统。这些,以及其他 SQL 查询引擎,如 Impala [25] 或 Apache Tajo [18] 等,与 Tez 不同的是,它们是为特定的处理领域而优化的引擎,而 Tez 是一个建立数据处理应用程序的库。Spark [38] 是一个新的通用数据处理引擎。它暴露了一个基于弹性分布式数据集(RDD)[37] 的计算模型,最终在内存存储和计算引擎中执行。同样的区别在于 Tez 它是一个库,而不是一个通用的引擎。Tez 并不提供任何存储服务,但应用程序可以使用现有的内存存储,例如 HDFS 内存存储 [12],以获得内存计算的优势。Spark 的概念是使用 RDDs 作为隐式地捕捉处理步骤之间的血缘依赖关系的手段,同样可以通过使用 Tez APIs 定义 DAG 来显式地捕获相同的依赖关系。

一个要与之比较的重要的系统类别是构建 YARN 应用的框架。在这个领域中,最相关的两个方面是 Apache REEF [19] 和 Apache Twill [31]。这些系统相比 Tez 专注于更广泛的应用类别(包括服务),因此提供了一个较低级别的 API。Tez 专注于支持数据流驱动的应用,因此有意识地选择提供一个结构化的基于 DAG 的控制流。

9. 结论

今天,Hadoop 是一个蓬勃发展的大规模数据处理的生态系统,它拥有一套不断增长的应用框架,为处理数据提供了不同的抽象。我们认识到这是非常有价值的,但我们也强调了一些实质性关于碎片化和重复工作的担忧,因为每个应用框架反复解决类似的基本问题。

为了解决这个问题,我们提出了 Apache Tez,一个开源的框架,旨在建立数据流驱动的处理引擎。Tez 提供了一个脚手架和库,以促进以 DAG 为中心的数据处理应用的设计和实现,并且着重于重复使用,同时平衡性能关键的数据平面的可定制性。Tez 有意识地努力实现动态优化,如分区修剪。除了这些关键的架构选择,Tez 与其他统一框架不同的是,它有一个相当大的开源社区,正在推动 Tez 成为构建以 DAG 为导向的数据处理引擎的框架首选。到今天为止。 最受欢迎的项目(Hive、Pig 和 Cascading)已经与 Tez 集成。我们通过实验证明,这些系统基于 Tez 的化身提供了实质性的性能优势,超出了利用通用功能的定性论证。

我们认为,我们所推动的标准化可以促进更快的创新,并使整合工作得以进行,否则会很麻烦(例如,由多个应用框架组成的管道)。Tez 的可定制性和开源社区使其成为研究的理想场所,因为新的想法可以被测试、整合,并以最小的代价获得现实世界的影响。

10. 参考链接

[1] Understanding just-in-time compilation and optimization. https://docs.oracle.com/cd/E15289_01/doc.40/e15058/underst_jit.htm

[2] Apache tez project website, 2014. http://tez.apache.org

[3] Cascading integration with tez project, 2014. http://www.cascading.org/2014/09/17/cascading-3-0-wip-now-supports-apache-tez

[4] Cascading: Project website, 2014. http://www.cascading.org

[5] Flink: apache incubator project (formerly stratosphere), 2014. http://flink.incubator.apache.org/

[6] Flink integration with tez project jira, 2014. http://issues.apache.org/jira/browse/FLINK-972

[7] Hive integration with tez project jira, 2014. http://issues.apache.org/jira/browse/HIVE-4660

[8] Pig integration with tez project jira, 2014. http://issues.apache.org/jira/browse/PIG-3446

[9] Presto: Distributed sql query engine for big data, 2014. http:/prestodb.io

[10] What’s new in the hadoop cluster versions provided by hdinsight?, 2014. http://azure.microsoft.com/en-us/documentation/articles/hdinsight-component-versioning/

[11] New syncsort big data software removes major barriers to mainstream apache hadoop adoption, 2015. http://www.syncsort.com/en/About/News-Center/Press-Release/New-Syncsort-Big-Data-Software

[12] A. Agarwal. https://issues.apache.org/jira/browse/hdfs-5851, 2014. https://issues.apache.org/jira/browse/HDFS-5851

[13] S. Babu. Towards automatic optimization of mapreduce programs. In Proceedings of the 1st ACM Symposium on Cloud Computing, SoCC’10, pages 137–142, New York, NY, USA, 2010. ACM.

[14] D. Battré, S. Ewen, F. Hueske, O. Kao, V. Markl, and D. Warneke. Nephele/PACTs: A programming model and execution framework for web-scale analytical processing. In SOCC, 2010.

[15] V. Borkar, M. Carey, R. Grover, N. Onose, and R. Vernica. Hyracks: A flexible and extensible foundation for data-intensive computing. In ICDE, 2011.

[16] D. Borthakur. Hdfs architecture guide. HADOOP APACHE PROJECT http://hadoop. apache. org/common/docs/current/hdfsdesign. pdf, 2008.

[17] Calcite developer community (formerly Optiq). Calcite: Apache incubator project, 2014. https://github.com/apache/incubator-calcite

[18] H. Choi, J. Son, H. Yang, H. Ryu, B. Lim, S. Kim, and Y. D. Chung. Tajo: A distributed data warehouse system on large clusters. In Data Engineering (ICDE), 2013 IEEE 29th International Conference on,pages 1320–1323. IEEE, 2013.

[19] B.-G. Chun, T. Condie, C. Curino, C. Douglas, S. Matusevych, B. Myers, S. Narayanamurthy, R. Ramakrishnan, S. Rao, J. Rosen, et al. Reef: Retainable evaluator execution framework. Proceedings of the VLDB Endowment, 6(12):1370–1373, 2013.

[20] Daniel Dai and Rohini Palaniswamy. Pig on Tez: Low latency ETL, 2014. http://www.slideshare.net/Hadoop_Summit/pig-on-tez-low-latency-etl-with-big-data/25

[21] J. Dean and S. Ghemawat. MapReduce: simplified data processing on large clusters. Commun. ACM, 51, 2008.

[22] Gopal Vijayaraghavan and Oleg Zhurakousky. Improving Spark for Data Pipelines with Native YARN Integration, 2014. http://bit.ly/1IkZVvP

[23] M. Hausenblas and J. Nadeau. Apache drill: interactive ad-hoc analysis at scale. Big Data, 1(2):100–104, 2013.

[24] M. Isard, M. Budiu, Y. Yu, A. Birrell, and D. Fetterly. Dryad: distributed data-parallel programs from sequential building blocks. SIGOPS, 2007.

[25] M. Kornacker, A. Behm, V. Bittorf, and et al. Impala: A modern, open-source sql engine for hadoop. In CIDR, 2015.

[26] S. Melnik, A. Gubarev, J. J. Long, G. Romer, S. Shivakumar, M. Tolton, and T. Vassilakis. Dremel: Interactive analysis of web-scale datasets. Proc. VLDB Endow., 3(1-2):330–339, Sept. 2010.

[27] A. C. Murthy, C. Douglas, M. Konar, O. O’Malley, S. Radia, S. Agarwal, and V. KV. Architecture of next generation apache hadoop mapreduce framework. Technical report, Tech. rep., Apache Hadoop, 2011.

[28] C. Olston, B. Reed, U. Srivastava, R. Kumar, and A. Tomkins. Pig latin: a not-so-foreign language for data processing. In Proceedings of the 2008 ACM SIGMOD international conference on Management of data, SIGMOD ’08, pages 1099–1110, New York, NY, USA, 2008. ACM.

[29] M. Stonebraker, D. Abadi, D. J. DeWitt, S. Madden, E. Paulson, A. Pavlo, and A. Rasin. Mapreduce and parallel dbmss: Friends or foes? Commun. ACM, 53(1):64–71, Jan. 2010.

[30] A. Thusoo, J. S. Sarma, N. Jain, Z. Shao, P. Chakka, S. Anthony, H. Liu, P. Wyckoff, and R. Murthy. Hive – a warehousing solution over a map-reduce framework. In PVLDB, 2009.

[31] Twill developer community. Twill: Apache incubator project, 2014. http://twill.incubator.apache.org/

[32] V. K. Vavilapalli, A. C. Murthy, C. Douglas, S. Agarwal, M. Konar, R. Evans, T. Graves, J. Lowe, H. Shah, S. Seth, B. Saha, C. Curino, O. O’Malley, S. Radia, B. Reed, and E. Baldeschwieler. Apache hadoop yarn: Yet another resource negotiator. In SOCC, 2013.

[33] P. Voss. The challenge to choosing the “right” execution engine, 2014. http://bit.ly/1wXP9Wp

[34] A. Vulimiri, C. Curino, B. Godfrey, K. Karanasos, and G. Varghese. Wanalytics: Analytics for a geo-distributed data-intensive world. In CIDR, 2015.

[35] Yahoo Hadoop Platforms Team. Yahoo Betting on Apache Hive, Tez, and YARN, 2014. http://yahoodevelopers.tumblr.com/post/85930551108/yahoo-betting-on-apache-hive-tez-and-yarn

[36] M. Zaharia, D. Borthakur, J. Sen Sarma, K. Elmeleegy, S. Shenker, and I. Stoica. Delay scheduling: a simple technique for achieving locality and fairness in cluster scheduling. In Proceedings of the 5th European conference on Computer systems, pages 265–278. ACM, 2010.

[37] M. Zaharia, M. Chowdhury, T. Das, A. Dave, J. Ma, M. McCauley, M. J. Franklin, S. Shenker, and I. Stoica. Resilient distributed datasets: A fault-tolerant abstraction for in-memory cluster computing. In Proceedings of the 9th USENIX conference on Networked Systems Design and Implementation, pages 2–2. USENIX Association, 2012.

[38] M. Zaharia, M. Chowdhury, M. J. Franklin, S. Shenker, and I. Stoica. Spark: Cluster computing with working sets. In Proceedings of the 2Nd USENIX Conference on Hot Topics in Cloud Computing, HotCloud’10, pages 10–10, Berkeley, CA, USA, 2010. USENIX Association.1369