深入了解Spark SQL的Catalyst Optimizer
Spark SQL是Spark最新的、技术上涉及最多的组件之一。它支持SQL查询和新的DataFrame API.Spark SQL的核心是催化剂优化器,它利用了高级编程语言的特性(例如Scala的模式匹配而且但是)以一种新颖的方式构建可扩展的查询优化器。
我们最近发表了纸将出现在SIGMOD 2015(与Davies Liu、Joseph K. Bradley、Xiangrui孟、Tomer Kaftan、Michael J. Franklin和Ali Ghodsi共同撰写)。在这篇博客文章中,我们将重新发布论文中解释Catalyst优化器内部结构的部分,以供更广泛的读者使用。
为了实现Spark SQL,我们基于Scala中的函数式编程结构设计了一个新的可扩展优化器Catalyst。Catalyst的可扩展设计有两个目的。首先,我们希望能够很容易地向Spark SQL中添加新的优化技术和特性,特别是为了解决我们在大数据中遇到的各种问题(例如,半结构化数据和高级分析)。其次,我们希望外部开发人员能够扩展优化器——例如,通过添加特定于数据源的规则,这些规则可以将过滤或聚合推入外部存储系统,或者支持新的数据类型。Catalyst支持基于规则和基于成本的优化。
Catalyst的核心包含一个通用库,用于表示树并应用规则来操作它们。在这个框架之上,我们构建了特定于关系查询处理的库(例如,表达式、逻辑查询计划),以及几组处理查询执行的不同阶段的规则:分析、逻辑优化、物理规划和将部分查询编译为Java字节码的代码生成。对于后者,我们使用另一个Scala特性,但是,这使得在运行时很容易从可组合表达式生成代码。最后,Catalyst提供了几个公共扩展点,包括外部数据源和用户定义类型。
树
Catalyst中的主要数据类型是由节点对象组成的树。每个节点都有一个节点类型和零个或多个子节点。新的节点类型在Scala中定义为TreeNode类的子类。这些对象是不可变的,可以使用函数转换进行操作,这将在下一小节中讨论。
作为一个简单的例子,假设对于一个非常简单的表达式语言,我们有以下三个节点类:
文字(价值:Int)
:常数值属性(名称:字符串):
输入行的属性,例如," x "添加(左:TreeNode,右:TreeNode):
两个表达式的和。
这些类可以用来建立树;例如,表达式的树x + (1 + 2)
,在Scala代码中表示如下:
添加(属性(x),添加(文字(1),文字(2)))
规则
可以使用规则操作树,规则是从一棵树到另一棵树的函数。虽然规则可以在其输入树上运行任意代码(假设该树只是一个Scala对象),但最常见的方法是使用一组模式匹配函数,用特定的结构查找和替换子树。
模式匹配是许多函数式语言的一个特性,它允许从可能嵌套的代数数据类型结构中提取值。在Catalyst中,树提供了一种转换方法,在树的所有节点上递归地应用模式匹配函数,将匹配每个模式的节点转换为结果。例如,我们可以实现一个规则,在常量之间折叠Add操作,如下所示:
树。变换{情况下添加(文字(c1),文字(c2)) = >文字(c1 + c2)}
把这个应用到树上x + (1 + 2)
会生出新的树吗x + 3
.的情况下
关键字在这里是Scala的标准模式匹配语法,可以用来匹配对象的类型,以及为提取的值命名(c1
而且c2
这里)。
传递给transform的模式匹配表达式是一个部分函数,这意味着它只需要匹配所有可能输入树的子集。Catalyst将测试给定规则适用于树的哪些部分,自动跳过并下降到不匹配的子树。这种能力意味着规则只需要对应用给定优化的树进行推理,而不需要对不匹配的树进行推理。因此,当向系统中添加新的操作符类型时,不需要修改规则。
规则(以及Scala模式匹配)可以在同一个转换调用中匹配多个模式,使得一次实现多个转换非常简洁:
树。变换{
case Add(Literal(c1), Literal(c2)) => Literal(c1+c2)
case Add(left, Literal(0)) => left
case Add(Literal(0), right) => right
}
在实践中,规则可能需要执行多次才能完全转换树。Catalyst将规则分组成批,并执行每个批,直到它到达一个固定点,也就是说,直到树在应用其规则后停止变化。将规则运行到固定点意味着每个规则可以简单且自包含,但最终仍然对树具有更大的全局影响。在上面的例子中,重复应用将不断折叠更大的树,例如(x + 0) + (3 + 3)
.另一个例子是,第一批处理可能会分析一个表达式来为所有属性分配类型,而第二批处理可能会使用这些类型来执行常量折叠。在每个批处理之后,开发人员还可以在新树上运行完整性检查(例如,查看所有属性都分配了类型),通常也通过递归匹配编写。
最后,规则条件及其主体可以包含任意的Scala代码。对于优化器来说,这使Catalyst比特定领域的语言更强大,同时对简单的规则保持简洁。
根据我们的经验,不可变树上的函数转换使整个优化器非常容易推理和调试。它们还支持优化器中的并行化,尽管我们还没有利用这一点。
在Spark SQL中使用Catalyst
我们分四个阶段使用Catalyst的通用树转换框架,如下所示:(1)分析逻辑计划以解析引用,(2)逻辑计划优化,(3)物理规划,(4)代码生成(将部分查询编译为Java字节码)。在物理计划阶段,Catalyst可能会生成多个计划,并根据成本对它们进行比较。所有其他阶段都完全基于规则。每个阶段使用不同类型的树节点;Catalyst包括用于表达式、数据类型以及逻辑和物理操作符的节点库。我们现在描述这些阶段中的每一个。
分析
Spark SQL从一个要计算的关系开始,可以从SQL解析器返回的抽象语法树(AST)开始,也可以从使用API构造的DataFrame对象开始。在这两种情况下,关系都可能包含未解析的属性引用或关系:例如,在SQL查询中SELECT col FROM sales
, col的类型,甚至它是否是一个有效的列名,都是不知道的,直到我们查找表sales。如果不知道属性的类型或没有将其与输入表(或别名)匹配,则称为未解析属性。Spark SQL使用Catalyst规则和Catalog对象来解析这些属性,Catalog对象跟踪所有数据源中的表。它首先构建一个具有未绑定属性和数据类型的“未解析逻辑计划”树,然后应用执行以下操作的规则:
- 从目录中按名字查找关系。
- 将命名属性(如col)映射到给定操作符的子操作符的输入。
- 确定哪些属性引用相同的值,以给予它们一个唯一的ID(稍后允许优化表达式,例如
Col = Col
). - 通过表达式传播和强制类型:例如,我们无法知道的返回类型
1 + col
直到我们解析了col,并可能将其子表达式转换为兼容类型。
总的来说,分析器的规则是关于1000行代码.
逻辑优化
逻辑优化阶段对逻辑计划应用标准的基于规则的优化。(基于成本的优化通过使用规则生成多个计划,然后计算它们的成本来执行。)这些规则包括常量折叠、谓词下推、投影修剪、零传播、布尔表达式简化和其他规则。总的来说,我们发现为各种各样的情况添加规则非常简单。例如,当我们将固定精度的DECIMAL类型添加到Spark SQL中时,我们希望以较小的精度优化DECIMAL上的总和和平均值等聚合;它花了12行代码来编写一个规则,在SUM和AVG表达式中找到这样的小数,并将它们转换为未缩放的64位long,在此基础上进行聚合,然后将结果转换回来。下面是这个规则的简化版本,它只优化SUM表达式:
扩展规则[LogicalPlan] {/**长*/中十进制数字的最大个数val MAX_LONG_DIGITS=18def apply(计划:LogicalPlan): LogicalPlan={计划transformalleexpressions {情况下总和(e @ DecimalType。表达式(prec、规模)如果前的+10MakeDecimal (总和(UnscaledValue (e)),前的+10, scale)}}
作为另一个例子,12行规则将简单正则表达式优化为String。startsWith或字符串。包含调用。在规则中使用任意Scala代码的自由使得这类优化(超出了子树结构的模式匹配)易于表达。
总的来说,逻辑优化规则是800行代码.
物理规划
在物理计划阶段,Spark SQL采用一个逻辑计划,并使用与Spark执行引擎匹配的物理操作符生成一个或多个物理计划。然后它使用成本模型选择一个计划。目前,基于成本的优化只用于选择连接算法:对于已知的小关系,Spark SQL使用广播连接,使用Spark中可用的点对点广播功能。然而,该框架支持更广泛地使用基于成本的优化,因为可以使用规则递归地估计整个树的成本。因此,我们打算在未来实现更丰富的基于成本的优化。
物理规划器还执行基于规则的物理优化,例如流水线投影或过滤到一个Spark映射操作中。此外,它可以将操作从逻辑计划推入支持谓词或投影下推的数据源。我们将在后面的部分中描述这些数据源的API。
总的来说,物理规划规则是关于500行代码.
代码生成
查询优化的最后一个阶段涉及生成Java字节码以在每台机器上运行。因为Spark SQL经常在内存中运行数据集我们希望支持代码生成以加快执行速度。尽管如此,代码生成引擎的构建通常很复杂,基本上相当于一个编译器。Catalyst依赖于Scala语言的一个特殊特性,即准引号,以简化代码生成。准引号允许Scala语言中的抽象语法树(ast)的编程式构造,然后可以在运行时将其提供给Scala编译器以生成字节码。我们使用Catalyst将表示SQL表达式的树转换为用于Scala代码的AST,以计算该表达式,然后编译并运行生成的代码。
作为一个简单的例子,考虑第4.2节中介绍的Add、Attribute和Literal树节点,它们允许我们编写诸如(x + y) + 1
.如果没有代码生成,就必须为每一行数据解释这样的表达式,遍历由Add、Attribute和Literal节点组成的树。这会引入大量的分支和虚函数调用,从而降低执行速度。通过代码生成,我们可以编写一个函数来将特定的表达式树转换为Scala AST,如下所示:
def编译(节点:节点): AST=节点匹配{情况下文字(值)= >问“美元价值”情况下属性(名字)= >问“row.get(名字)美元”情况下添加(左,右)= >问"${compile(左)}+ ${compile(右)}"}
弦以问
是准引号,这意味着尽管它们看起来像字符串,但它们在编译时由Scala编译器解析,并表示其中代码的ast。准引号可以有变量或其他ast拼接在其中,用$
符号。例如,文字(1)
将成为Scala AST for 1,而属性(“x”)
就变成了row.get(“x”)
.最后,一棵树似的Add(文字(1)、属性(" x "))
变成Scala表达式的AST,例如1 + row.get(“x”)
.
准引号在编译时进行类型检查,以确保只替换适当的AST或字面量,使它们比字符串连接更有用,并且它们直接导致Scala AST,而不是在运行时运行Scala解析器。此外,它们是高度可组合的,因为每个节点的代码生成规则不需要知道其子节点返回的树是如何构造的。最后,生成的代码由Scala编译器进一步优化,以防Catalyst遗漏表达式级优化。下图显示了准引号可以让我们生成性能类似于手工调优程序的代码。
我们发现在代码生成中使用准引号非常简单,并且我们观察到即使是Spark SQL的新贡献者也可以快速地为新类型的表达式添加规则。准引号也可以很好地实现在本机Java对象上运行的目标:当从这些对象中访问字段时,我们可以通过代码生成对所需字段的直接访问,而不必将对象复制到Spark SQL Row中并使用Row的访问器方法。最后,对于还没有为其生成代码的表达式,将代码生成求值与解释求值相结合是很简单的,因为我们编译的Scala代码可以直接调用表达式解释器。
总的来说,Catalyst的代码生成器大约是700行代码.
这篇博文介绍了Spark SQL的Catalyst优化器的内部结构。它新颖、简单的设计使Spark社区能够快速地原型化、实现和扩展引擎。你可以阅读剩下的部分纸在这里.如果你参加今年的SIGMOD,请来参加我们的会议!
你也可以从下面找到更多关于Spark SQL的信息:
- Spark SQL和DataFrame编程指南来自Apache Spark
- Spark中的数据源APIYin Huai报告
- 为大规模数据科学在Spark中引入dataframe作者:Reynold Xin
- 超越SQL:用DataFrames加速Spark迈克尔·阿姆布鲁斯特