潜入三角洲湖

DML内部:删除,更新,合并

丹尼·李。Databricks的开发者倡导者
Denny Lee是Databricks的开发者倡导者。他是一名实干的分布式系统和数据科学工程师,在为内部部署和云环境开发互联网规模的基础设施、数据平台和预测分析系统方面拥有丰富的经验。bob体育客户端下载他还拥有俄勒冈健康与科学大学(Oregon Health and Sciences University)的生物医学信息学硕士学位,并为企业医疗保健客户构建和实现了强大的数据解决方案。
如来“TD”达斯。在Databricks担任软件工程师
Tathagata“TD”Das是Apache Spark提交者和PMC的成员。他是Spark Streaming背后的首席开发人员,目前正在开发Structured Streaming。此前,他是加州大学伯克利分校AMPLab的研究生,在那里他与Scott Shenker和Ion Stoica一起进行了关于数据中心框架和网络的研究。

系列的细节

本次会议是丹尼·李和三角洲湖团队“深入三角洲湖”系列的一部分。

会议摘要

在之前的Delta Lake Internals系列网络研讨会中,我们描述了Delta Lake事务日志的工作方式。在本节课中,我们将深入讨论在执行删除、更新和合并时,提交、快照隔离以及分区和文件如何更改。

在本次网络研讨会中,您将了解到:

  • 三角洲湖交易日志的快速入门
  • 理解运行DELETE、UPDATE和MERGE时的基本原理
  • 理解执行这些任务时执行的操作

你需要:
注册社区版在这里并使用研讨会报告材料和样本笔记本进行练习。

视频记录

这是潜入Delta Lake系列的第三场讲座我们将讨论Delta Lake中的删除,更新,合并,这些操作是如何运作的。我们会偷偷看一下它们是如何工作的以及如何处理它的性能。所以我自己TD,谢谢你Karen的介绍。简单介绍一下我自己。自2011年以来,我一直在Apache Spark的生态系统工作。那时,我还是加州大学伯克利分校AMPLab的一名研究生,在2012年,我和Madi Zaharia一起开始了Spark项目本身,我们开始研究Spark流,我们一起进行了Spark流。八、九年后的今天,我是Databricks的一名工程师,也是Structured Streaming和Delta Lake的核心开发者。

丹尼,你想说说你自己吗?-非常感谢。我不像TD那么聪明,但我是Databricks的一名开发人员。(笑)我从06年开始为Apache Spark工作,大约在2011年左右,我是Concur的前数据科学工程高级总监,我也曾是微软的一员,是的,我认为这些是这里的关键组成部分。很高兴来到这里,希望你们喜欢今天的课程。

——好的。

三角洲湖-我幻灯片介绍

所以,对于那些完全不熟悉这个系列的人来说,这是这个系列的第三次演讲。为了让大家快点上手,这里有一张幻灯片介绍了三角洲湖是什么。它本质上就像一个开源存储层,将ACID事务引入Spark工作负载。格式是开放的,你可以像创建拼花桌一样存储文件你可以创建Delta Lake表。

但与拼花表不同,在Spark中,元数据处理可能会遇到瓶颈,而Delta Lake的元数据处理比拼花表更具可伸缩性,您可以将整个表进行版本控制,因此您可以及时返回并查询表的早期版本。您将获得完整的模式强制和模式演化属性,这确保您可以在没有损坏的情况下维护更高质量的数据,并且您可以通过允许您查询表历史的gapi,通过事务保证来审计您在表历史中所做的所有操作。Delta Lake的基本工作原理是,它在一个名为Delta log的主题的表中维护一个事务日志它维护所有发生的操作的事务日志,它通过写入这些JSON文件来维护它,它描述了所有发生的操作,所有在每个操作中添加和删除的文件。因此,每个JSON文件本质上都像是表的新版本,这个日志维护在同一个目录中,所有其他数据文件也维护在同一个目录中。因此,数据和元数据的所有信息都位于一个位置,并且该日志在每次操作时都进行原子更新,因此,对于表上的任何操作,都可以获得完整的ACID事务保证。这就像是Delta Lake的两分钟版本。

本系列之前的网络研讨会

因此,如果您想了解更多关于这个事务日志如何工作的细节,等等,请查看本系列的第一部分,YouTube链接在这里。你订阅的YouTube频道里也有。如果你想了解更多关于模式强制的知识我把标题写错了BOB低频彩,关于模式信息模式验证是如何工作的,这是本系列的第二部分。因此,在第三部分中,我将讨论DML操作,更新,删除,合并,它们在底层是如何工作的,以及如何调优性能以获得最佳性能。我将以讨论一些我们观察到的Delta Lake用户使用过的常见设计模式来结束,以及它们如何在使用模式版本时帮助您的用例。

大纲

我们要讲的第一个操作是Update Update是简单的SQL Update。根据谓词更新表集X等于Y。

更新+时空旅行

我们提供的不仅仅是SQL支持,还有Scala和Python API来做这个更新操作。顺便说一句,SQL支持目前仅在数据库Delta Lake中可用,但我们正在努力将其引入开源的Delta Lake中,但它将在第3.0部分发布时推出,它对这些DML操作(如更新、删除、合并)具有实际更快的级别支持。bob下载地址所以,只要3.0部分一出来,我们就会有一个Delta Lake版本,对这些操作提供实际的SQL支持,在那之前,你总是可以使用Scala和Python来做这些操作。

它的工作原理是,Delta Lake维护这些文件,所以它只跟踪文件粒度的数据。

更新-在引擎盖下

假设你有版本12的表,其中有四个文件。现在,假设运行Update。它将在下面做的是,它将对这些数据进行两次扫描,以更新它们。首先,它将执行第一个can(音频模糊)来包含需要根据您提供的谓词更新的数据。那么,假设在这四个文件中,有两个文件有与谓词匹配的数据。在整个文件中,并不是文件中的所有行顺便说一下这些都是拼花文件,这就是Delta将数据存储为拼花文件的方式。并不是parquet文件中的所有行都能匹配数据,因此会有一些行与谓词匹配,一些行与谓词不匹配,你可以看到这里用绿色和红色标记。

现在,为了识别这些文件,它使用谓词,列统计,分区等等,分区修剪,所有Spark提供的东西,它使用这些来缩小它实际需要读取的文件,来实际发现文件是否包含匹配。一旦它找到这些文件,它就会选择它们并再次扫描,我们仍然会重新写入这些文件。所以这两个文件被重写为全新的文件。因为我们不能继续更新文件,所以拼花文件不是为原地更新而设计的。我们必须重写parquet文件中的所有数据作为新文件,其中匹配的文件中的数据实际上得到了更新。不匹配的数据被复制到这些新文件中。被替换的文件基本上是tombstoned的,这意味着在事务日志中,我们添加了这样的信息两个新文件被添加了,被替换的文件被删除了,标记为已删除,它们并没有从目录make中物理删除,这样我们就可以进行时间旅行,我们稍后会讲到,我们仍然可以查询这些文件作为表的前一个版本。这就是所发生的,让我把它发给丹尼他可以在笔记本上演示一下并给出一个更清晰的原理。轮到你了,丹尼,让我停止分享我的屏幕。

丹尼,我们听不见你说什么。-如果有一天我能解除沉默就好了。-是的,会有帮助的。-会有帮助的,很好,所以我想确保你们能清楚地看到我的屏幕,这个笔记本。

——是的。-太好了,太好了。这就是TD所说的。换句话说,我们创建了这个Delta表,我只是浏览和跳过,你会注意到它们实际上是底层文件系统的样子。所以我要现场播放,你可以看看。但基本上,你会注意到这里有一个下划线Delta log,还有这个parquet文件这就是它最初的样子,就像TD说的那样。我将跳过初始日志和元数据,我把它留在那里,这样从笔记本的角度来看,你们可以自己拥有它。现在,这是数据最初的样子,基本上你已经有了一个状态,如果我只是探索数据,这是它的样子。你注意到伊利诺伊州和华盛顿州的值是3.2。好,但现在我想运行一个更新,就像TD说的,让我们继续做这个。 In this case, I’m gonna be rather arbitrary here. And I’m just gonna say, hey, let me look at the number of loans that I wanna go ahead and update okay, in this case, 209. I’m gonna go ahead and in this case, run this Update statement where I’m saying for the state of Washington updates the paid amount to be equal to the funded amount okay. So in other words, I want those values to be updated, okay, just as TD was talking about. Alright, so with a Delta Lake, as TD not talked about, you can run an Update, but then what’s actually happening underneath the covers? Alright, well, let’s go look at the jobs real quick. Alright, so for example, if we have job 1838, let’s go look at that now. Sorry, alright.

好了,它要弹开了。

我们要。抱歉,我要向后滚动,这样你们能看得更清楚。这是我运行过的所有作业,job 1838在这里。我们要读取事务日志,我们要查看DAG可视化。

—进入SQL查询。-哦,对不起,是的,我本来想这么做的,但我忘了,所以我们开始吧。好的,很好,正如你所知,我想从统计数据中放大,我们读了一个文件。它就在这里,实际上在USpark UI中,我们读了一个文件,这里是我们做的所有信息,比如缓存,写大小,花在文件系统上的时间,所有这些不同的东西,但这里是行输出。抱歉,我试着把它关上。这就是一开始的情况。然后让我回到过去,我想是1839年吗?那里去。同样,这是一组可能会受到影响的行中的四行。所有的交换处理以及它实际上要对数据做的所有事情,现在,如果我去,只是简单地看看这个的历史,操作度量实际上就嵌入在这里,所以总行数是14705。 The updated rows, they were 340 rows, in fact they were actually updated because we actually updated all of Washington State, not just the ones that were the values weren’t separate, we updated all of them because of the way I wrote that particular statement. And so the number of rows that were copied was right here. 14365 plus 340 equals the 14705 that you see. So underneath the covers, that’s what we’re doing. We’re actually taking a copy of all the files into a new set of files, okay, and we’re going ahead and making the modification as you see here. So here’s what the result looks like, I just simply said Washington State’s No, and then when I look at it, see here’s the file right here that you see, okay. So this is the file that actually has the new set of data per the updates, that just happened, okay, cool.

TD,让我把它还给你,这样你就可以继续翻下一节了。——正确的。-太好了。

——好的。

更新

现在,您已经了解了如何通过更新数据来创建新版本,您仍然可以返回并查询旧版本,Delta Lake提供了这些方法来查询您在历史中看到的表的以前版本。所以这里如果我创建了版本13,通过更新版本12,你仍然可以使用这个叫做versionAsOf 12的选项查询版本12。

更新-提高性能

现在你可以很容易地比较两个版本的表看看行是如何更新的,如果有不正确的更新,你如何调试等等。这给了你很大的能力,可以回到过去,弄清楚发生了什么是否所有的更新都是正确的,或者如果你看到一些损坏的数据,你可以回到过去,检查哪个版本的数据损坏了,也许可以回滚,通过读取版本,用正确的旧版本再次覆盖表。所以,这又是一个非常强大的工具。现在我们来谈谈如何提高性能?

正如Denny向你们展示的数据上有两个扫描,现在有人问了一个问题关于这是不是两个完整的扫描?简单的答案是,第一个可能是一个完整的扫描,而第二个通常不是。这取决于你用什么谓词。例如,如果您的谓词要更新特定分区中的所有行,那么查找所有需要更新的文件的第一次扫描将只查询该分区,因此不需要进行完整扫描。然而,如果你的表没有分区,它是一个更小的表,因此没有分区,它可能无法根据你的谓词来缩小范围,要查看的文件在这种情况下,我们必须扫描整个表来找到满足谓词的匹配文件。但在那之后,这是第一次,但第二次扫描通常只扫描需要更新的文件。

要回答这个问题,不需要两次完整的扫描,完全取决于你的表格设置。但这里要记住的关键是,在Update子句中添加的谓词越多,Delta Lake就越容易缩小搜索空间,因此第一次扫描就会越快。这样就提高了Update的总运行时间。现在,Databricks Delta Lake平台有一些更多bob体育客户端下载的性能优化,这就是绿洲三角洲和Databricks三角洲湖之间的区别,我们在Databricks三角洲湖有一些更多的性能改进。有一些工具可以做更好的数据跳跃,特别是我们有一个叫做Z-order optimization的东西,它可以优化表格中数据的布局,以一种更有组织的方式,它本质上就像多列排序,但比那好得多。

使用空间填充曲线之类的东西,所以它本质上提高了在文件级别存储列统计的效率,然后还有Bloom过滤器的支持。我们不会详细讨论这些,但要记住的关键是,如果您已经了解了这些操作下面的操作,那么您就可以更容易地推断如何优化这些操作的性能,例如,如果您知道匹配项将出现在哪里,则可以通过添加更多谓词来缩小搜索空间。

与Update类似,还有Delete,其中基于谓词从表中删除的语法也很相似,显然Apache Spark 3.0将提供SQL,但在此之前,也有Scala和Python直接支持。

删除

但有趣的是,

删除+真空

正如Denny所示,更新创建了新文件,但它不替换旧文件,它只将其标记为墓碑,它在事务日志中标记为已删除,这样它就不会自动删除它,这样你就可以进行时间旅行,并再次查询那些旧数据,你将通过指定确切的版本号。但现在对于删除,设置完全相同,任何需要删除的数据都会重新写入新文件,但旧文件将默认保留,这样你就可以在删除之前返回并查询表的版本如果不小心删除了,你可以回滚等等。但在某些情况下,你确实希望数据完全,永久地从磁盘上删除,你不希望任何文件中有旧数据。因此,你必须运行这个额外的操作,叫做真空。这个操作提供了一个特定的Delta Lake,它的作用是,你可以说,你想保留多少个之前的版本或者之前版本的长度是多少。假设你想要保留最近一天写的所有版本,因为你知道你不想去查询任何一天之前的版本那么你会运行vacuum table retain 1小时。

默认情况下,如果你不指定保留时间为7天,这是我们在大多数情况下观察到的情况。

但是Delta会做的是,它会找出目录中存在的所有文件,这些文件是前一天保留的版本中不需要的。

因此,在过去24小时内保留的版本中,任何不需要的文件都将被删除。这包括任何类型的部分文件,损坏的文件,可能已经保留,但实际上没有提交到日志中,因为失败的权利和东西,所以真空会删除所有这些。

使用它,你可以控制你想保留多少以前的版本以及你可以删除什么,最终,你可以从所有的数据文件中剔除年龄数据这样,数据就会被删除。你甚至可以运行Vacuum retain 0 hours,这实际上只会保留表的最新版本如果你真的不关心历史,而且你真的想确保删除之后,所有的数据实际上已经被物理删除,你可以运行Vacuum retention 0,它只会保留最后的版本。但是,重要的是要记住真空零,不要在其他写入正在进行时运行真空零,因为您可能会删除现在正确保留的文件,所以要小心。不管怎样,接下来是归并,这可能是这三个操作中更有趣和最强大的。

合并

Merge有一个标准的SQL语法,它的思想是用源表中的数据合并到目标表中,基于匹配,如果我们匹配源行和目标行的键。如果匹配,则可以使用匹配的源数据更新(音频模糊)。如果没有匹配,也就是说,如果有一个源行与任何目标行都不匹配,那么您可以根据源插入一个新的目标表列,根据源行插入一个新的目标表行。

这是标准SQL语法,显然我们支持,但我们还支持更多扩展语法,这让事情变得更有趣。例如,我们支持附加的子句条件,例如,你已经基于这个条件进行了匹配,但你想在这些匹配之上有额外的条件,那么你可以在这里指定附加的子句条件,不只是匹配条件,而是在更新开始之前需要满足的子句条件。

Merge—扩展语法

类似地,当不匹配和附加子句条件时,则只插入发生的事情。这个有用的地方在于,你实际上可以有多个至少两个匹配的条件。

你也支持删除,而And is equal语法不支持,这里你可以说when matched And子句条件,然后更新,如果匹配,但子句不满足Delete。

这是非常强大的,我们会在后面的例子中展示,在表格中做很多复杂的改变,我们会给你们讲解一些用例这是非常非常有用的。另一个非常有用的特性是对自动展开目标列的星型支持。据我们所知,这是唯一支持这种语法的引擎,当我们介绍它时,它是一种非常流行的语法,它所做的基本上是对于一个有很多列的表,你不需要手写所有你需要更新的列。假设你有很多改变,需要你更新所有的列,如果它匹配,或者如果你想插入所有与源完全相同的列,手写所有的列通常是很乏味的,列的数量在数百个范围内。Update Star本质上是自动展开列来表示Update Set第1列等于source, target中的第1列和第2列等于source列,这是自动展开的,这使得表达和管理这些点更容易。所以我们也支持编程api, Scala和Python。

合并-编程api

再说一次,我们所知道的为数不多的系统中,有一个实际上有编程api来做这类SQL模块操作。我认为,这取决于你是什么样的用户,你是主要的SQL用户还是编程软件工程师,数据工程师之类的人,我更喜欢类型安全,这非常非常有用。因此,在底层,所发生的事情本质上与Update完全相同,数据有两部分,一部分查找具有匹配的文件列表,因此需要更新,第二部分通过将它们重新写入新文件来更新这些文件。

合并-在引擎盖下

但有趣的是,不同于Update和Delete,在这两种情况下,你必须在源和目标之间做一个连接来找到匹配。第一种是在目标和源之间的内部连接中,找到匹配。第二个是目标文件和源文件之间的外部连接,用于对这些文件上的数据执行Update、Delete、Insert操作,甚至有些数据可能被复制,有些数据可能被更新、删除、插入等等。再一次,Denny会更有效地演示,然后交给你Denny。

-谢谢你,先生,这一次,我真的费了很大的劲记住了如何解除自己的静音,所以这通常会有帮助。很好,这就是TD所说的,不要担心,我已经让你们在问答环节提问了,并不是说我们不愿意回答这些问题,只是我们很可能会在会议结束后回答其中的一些问题,一旦我们做了常规的演示部分。好了,这里只是一个快速的GIF动画插入更新削减进程,当你运行一个合并,对,所以通常包括下面,基本上我们所做的就像TD喊你确定插入新行,确定需要更换的行,即更新,确定不会影响的行插入或更新,创建新的临时表,删除原始表,将临时表重命名和删除临时表。现在,如果你在常规的拼花程序下写,对吧,这是非常低效的,而且需要你实际完成所有的写作。我很懒,所以我不想写那么多,这就是为什么TD给我们展示了很棒的匹配语法它极大地简化了这个过程。换句话说,当我这样做时,我可以简单地写一个匹配的东西。让我们举个简单的例子。我要继续看一些数据。举个例子,如果我只看纽约,贷款Id小于30,这是我这里的三行,就可能受到的影响而言,我的贷款Id是11 21 28,每个都有大约1000到1200美元的资金,这是不同的支付金额,州是纽约。现在,让我们创建一个要运行Merge的新表。 Okay, remember the first table is the source table, it’s of the 14705, three rows of the 14705 that potentially can be impacted. Now, here’s this new table that we’re creating, which is listed here, where basically for loan Id 11, for $1,000, we’re gonna go ahead and pay it off, right, this first row that you see here, okay. So that’s why the funded amount is 1000, and the paid amount is 1000, okay. Alright, so then, we’re also gonna add a new loan, okay, and we’re also gonna for the fun of it also add a duplicate loan, okay. In other words, by accident, the source system screwed up, so there was a duplication, alright. So again, as opposed to going ahead and running all of those other statements where I’m gonna do an insertion specifically for loan Id 12, I’m gonna de-duplicate the data myself for 28 and I’m gonna go ahead and up run an update specifically for loan 11, I’m just gonna run a single merge statement, and it’s really simple right here, actually, it’s right here, okay. And so I’m using the Python syntax because unlike TD, who loves type safety, I’m lazy so I don’t like doing it, no, I’m joking. I just happen to like the Python language, that’s all even though the tabs irritate me a lot. Nevertheless, I’m gonna simply do a Merge where basically, it’s where the source table s.loan Id is equal to the target table t.loan Id, okay. Where you basically when it match, you update them all when not match you insert them all, and then implied inside here also is the duplication as well, okay. So same idea, we have a bunch of stages, okay. Now, there’s a couple of one question in terms of that I was asked, which I’m gonna answer right now, which is about the execute plan query cool tool costs, okay. That’s actually what the spark UI is for, right. It actually provides you a lot of that context, I’m gonna look at 1844, using the SQL tab, just as TD had called out, so I’m gonna go look at this, right. When I look here, the query plan is actually before I even show you the DAG here, let me do a quick call log, the logical plan, it’s actually listed right here, so you can actually understand logically what Sparks doing underneath the covers, so you can actually go ahead and see similar to when you’re working with a relational database, you have a logical plan, same concept, all that’s actually placed right here. Now, obviously, some people are gonna turn on us and say, I’d love to understand how it works graphically. And then that goes back up to here here, right, which basically, the timing is here, seven milliseconds for the WholeStageCodegen here. One second for this particular WholeStageCodegen in order for it to do its various processes, okay. So a lot of your query information is actually all in that Spark UI. So you can figure out how to improve the performance by understanding what’s going underneath the covers. So for example, this one implied right away, actually, you know what, I wanna go look at a different one right away, give me… – This is actually (mumble). – Exactly, I wanna talk about the other one first, so I think it’s this one.

-是的,45号,我想应该是那辆。-好了,完美的1845年。这是SortMergeJoin,换句话说,这是两个文件,举个例子,当你打开这个,它马上告诉你,读取一个文件需要17毫秒。好的,这个文件包含了我们正在处理的14705行数据。实际上,它在这里告诉你,它告诉你缓存,文件系统读取和所有这些统计数据,这实际上帮助你理解为什么查询性能是这样的。但它会让你马上知道。所以正如TD解释,文件或分区的数量,对的,你能告诉如果这是运行在几秒或几分钟,对你有一个很长时间或许分区需要发挥作用,因为你需要继续减少文件会需要阅读,因为为了论点,你只是想处理一个国家或一个日期数据,而不是使用这三年的数据作为一个简单的例子,好吧。好了,我把它合上。好了。然后对比这个Scan ExistingRDD。 That was the three rows that we created, remember, we had created this loan updates table? Well, this is what the Scan our ExistingRDD is, again, this is a small idea, but here’s the projection you can see it right here. The loan Id the funded amount, the paid amount, this is the RDD that we explained, right. Then exactly what TD was talking about in terms of the joins that happen, here’s that SortMergeJoin, that actually had to kick off. So first, there was a Sort right, of this information and then now, we’re gonna do SortMergeJoin between the three rows that we originally had from the table that we generated, that we created, versus the 14, similar five rows that came from the parquet file, the single file that we had, in this case, that we would now perform the Join. And then all of the statements oh, sorry, the projections here, right here, right, in terms of how it figures out the logic, that’s actually all shoved inside here, okay. And so, the idea is underneath the covers, then you can tell right away what’s actually happening to the data, okay. In other words, it’s grabbing one file, it’s grabbing the three rows, it’s gonna do a SortMergeJoin, that’s what this example tells us, versus the previous step, 1844, I believe, yes. Right, this is actually doing a broadcast exchange, right, in terms of here’s the amount of data that’s being pushed over, right, that tells you the data size itself actually helps you understand, okay if you’ve got a ton of data that’s being broadcast or exchanged over and over again for this broadcast hash join that’s happening right here, yeah, this tells you that you’re probably transferring or shuffling or moving too much data across the wire, right. So again, can you show shuffle, can you filter can you partition to reduce the sizes that you are working with, okay. And then, ultimately, back to this again, like for example, the output of this is, when I look in New York, there is the loan Id 11, which is the basically the one where we updated it, right, here’s loan Id 12, which basically is the brand new one that we just added, and here’s the other loans that basically were unaffected, right. In other words, we actually put a duplicate 28, but that duplicate never actually entered in there because the merge statement automatically had de-duplicate the data, okay. And then same idea when we looked at the history, right, I can look at the metrics, right. The operation metrics are right within the history, so you know how many rows were copied, in this case of the 1475, 1472, were actually copied, okay, we only looked at one single file in order to make sense of it. The target rows that were updated, three of them were actually updated. And based on the source rows of three, oh, sorry. There we go. Okay, and the number of bytes, so all that information is basically packed inside basically the operation metrics that you see within the history table, and within the spark UI, the SQL tab to basically make sense from the UI perspective, cool. Hey, TD, anything else you think I should be adding or we’re good to go here? – Yeah, I think we are good to go. – Okay, perfect, alright, well, I will stop sharing now. – Awesome.

好吧,让我们谈谈如何提高性能。

合并——提高性能

(低声说话)让我们谈谈如何提高业绩。因此,在开始使用更新时,重要的是要理解在封面下面发生了什么,正如Denny刚刚给你们展示的,有内部连接和外部连接。因此,您必须真正理解内部连接和外部连接之间的瓶颈是什么。如果内部连接(即找到真正正确的文件)是查询中花费时间最多的瓶颈,那么您可以使用某些优化技术。如果是另一个,那么你就有了其他的优化技术。如果内部连接很慢,你需要花很多时间来查找要更新的文件,然后,使用标准的技术来获取更多的谓词来缩小搜索空间,你显然可以调整shuffle分区这将是标准的Spark优化技术,有很多shuffle分区来控制需要为连接执行的shuffle的并行性。你可以调整广播连接,我们得到更多的广播,允许更大的数据被广播,如果源太小,不足以容纳在单个机器内存中。如果它们有时会变慢,因为表中有太多小文件。比如,如果你有一百万个千字节大小的文件,那么读取每个文件的开销要比你压缩Delta表的开销要高得多只有关于如何重写Delta表的布局来压缩它们的文档。但话说回来,您不应该创建像10gb文件这样的超大文件,因为请记住,我们只重写粒度大小的文件,您创建的大文件可能需要重写更多不必要的数据。 So if there is only one row that needs to be updated in a file, it’s cheaper to re-write an entire 100MB file rather than a 10gb file for that one row update. So, you have to tune that, based on your workload requirements. And then India recently has few performance optimizations. The Z-order optimize that I mentioned earlier, helps to sort the data in a certain in smart ways, which allows you to exploit the locality of updates that if your changes are going to be in a certain range of values with only for a particular column, then you can Z-order optimize by that column to get better locality so that less number of files are touched and needs to be updated. So but on the other hand, if you’re outer join, the second scan it is actually re-writing the file to slow then there are different set of techniques. Well, there are common ones, like Adjust shuffle partitions that again control the parallelism, but sometimes what happened is that if you parallelize too much, you can generate too many small files, especially with partition tables, you can generate too many small files for that though the solution, the knob we have provided is that you can actually reduce the number of files by enabling automatic repartition of the data based on the partition column before the write. Now, this is available as Optimized Writes in Databricks Delta Lake. But in the next upcoming release of 0.6.0, which we will release tomorrow or day after tomorrow, well, tomorrow actually.

0.6.0还支持在写入之前在Merge内部自动重新划分数据。如果它是一个完整的外部连接,你看到的外部连接Spark不能做任何形式的广播连接,但不支持外部连接的广播阈值。但从Delta Lake 0.6.0开始,它可能是一个右外部连接,在这种情况下,Spark可以做广播连接,然后如果你观察到在Spark中,你正在做一个右外部连接,我们正在看逻辑和计划,正如Denny在Spark UI中展示的那样,你也可以通过编程获得它。

然后,您可以调整广播阈值,使其广播的数据量大于spark的默认值。它还有助于源表缓存dataframe,因为你要做两个传递源和目标,特别是在整个源,它有助于源表缓存或dataframe,它可以加快第二扫描,但重要的是要记住,不缓存目标三角洲表,因为缓存三角洲表可以导致奇怪的缓存一致性问题,因为如果δ表更新,缓存不,这一切可能会导致各种各样的混乱。一般来说,如果你要更新目标表,不要缓存它。不管怎样,我们的时间有点短,所以我将很快地把常见的设计模式讲完。所以常见的设计模式,Denny在他的模型例子中也展示了一点,就是ETL过程中的重复数据删除,在ETL管道中可以生成重复的数据,你不希望在最终的Delta表中出现重复的数据。

模式1:ETL期间重复数据删除

如果你写一个merge查询,它只在唯一Id时插入你可以重复数据删除,只在不匹配时插入,这意味着你要插入的新行不在表中,根据唯一Id判断,然后插入。这是扩展合并语法的另一个例子你可能根本没有指定when matched子句,你只能指定when not match,然后插入,这是一种重复数据删除。现在你可以进一步优化它…

那么,您将面临的问题是,它可能每次都扫描整个表,以确定惟一Id是否存在。你可以进一步优化这个,如果你知道你的重复数据只会在一个特定的时间段内出现比如如果你只有过去7天内的重复数据而不是更早的,那么你可以指定,注入那个7天的约束作为匹配条件的一部分,因此强制合并只搜索最近7天的数据(听不清)这就是你可以做的优化。

模式2流聚合

另一种常见模式是使用结构化流。你可以很容易地使用结构化流计算聚合。我之前在Spark峰会上有过演讲,仍然是基于结构流的,如果你可以看看,我们想了解更多关于结构流的知识。BOB低频彩但从结构流中得到的本质上是键值聚合,这本质上是真正观察的主要候选者,你可以很容易地做到这一点。结构化流有一个叫做foreachBatch的操作,对于每一批生成的、更新的键值聚合,你可以通过在foreachBatch中调用这个合并操作将数据放到Delta Lake中。

更多的细节在我们的在线文档中。

模式3:GDPR -简单的方法

另一个常见的模式是GDPR,通过Delete和Vacuum很容易做到。从信息表中删除用户,然后再返回到表中,以确保从文件中实际删除了该用户。但是,如果您希望维护用户的整个历史,而不是依赖于表的历史,那么还有更好的方法,您可以显式地存储用户的历史,方法是将用户的所有以前的记录保存在表的最新版本中,使用这种操作进行SCD Type 2操作。

模式3.1:所有用户的GDPR

再说一次,细节和例子都在我们的在线文档中。不是我们要做的。

模式4使用删除应用更改数据

另一个非常常见的模式是变更数据捕获。人们经常希望从OLTP数据库中获取变更数据,并希望在不影响OLTP数据库的情况下对它们执行OLAP操作。他们想把OLTP数据库中的改变应用到Delta Lake格式的表中这对OLAP来说很好。对于这个,合并操作,支持删除,插入,更新,语法都是一样的。因此,根据更改的顺序,您可以很容易地将这些更改从OLTP应用到OLAP,这是一种非常常见的模式。总之,在结束我的演讲之前,让我们谈谈我们几乎每个季度都在积极发布新产品的社区和生态系统。

三角洲湖泊释放

我在连接器生态系统以及非常流行的操作(如Merge等)的性能方面做出了重大改进。

我们将在接下来的几天发布一些东西。在0.7.0中,我们将发布一个Apache Spark 3.0支持,它带来了一个完整的列表,比如SQL DDL支持,SQL DML支持,支持在Hive metastore中定义表等等。这种连接器生态系统正在不断发展。

Delta Lake连接器

有关于如何使用Delta Lake与Amazon Redshift, Athena, presto, snowflake和Hive刚刚发布了几周的在线文档,发布了一个Hive连接器,您可以从高处直接从Hive本地查询Delta表。看看在线文档,我们有一个越来越多的生态系统合作伙伴,他们承诺在他们的平台内支持Delta Lake,还有一个非常庞大的Delta用户列表,并且在快速增长。bob体育客户端下载

Delta Lake合bob体育外网下载作伙伴和供应商

非常感谢。

三角洲湖的使用者

我们现在应该回答问题。-是的,当然,我们还有大约5到8分钟的时间,让我们直接进入问题,我会问你的。好的,完美的。第一个问题,吸尘能提高性能吗?-其实我是在打字回答,但我现在更愿意口头回答-没错,我觉得口头回答在这种情况下更有意义-在大多数情况下,它不会,但在某些情况下,当您使用Delta Lake查询表时,由于事务日志,它不需要列出目录来找出要读取的文件。因此,从数据湖的角度来看,目录中是否有太多文件用于大量版本并不重要。但在某些情况下,我们观察到,在非常非常长的历史中,会有数百万,数千万的文件,存储系统本身经常开始以一种奇怪的方式运行,因为它会减慢所有文件系统的操作,即使是云存储文件系统,比如S3,也是如此因为你的文件系统在bucket中有大量的文件,或者在容器中有大量的文件。这超出了Delta Lake或任何处理引擎的能力,所有文件系统操作都会变慢。所以保存大量的历史记录是不可取的那需要你保存数千万个文件,这会减慢一切,这超出了我们或任何人的控制。

-完美,然后相关,我们会再讨论一下文件系统因为这方面似乎有很多问题。处理了更新和删除之后的文件分布呢,我想问题在于我们添加了新文件就像我们在更新和删除过程中所说的我们创建了墓碑,最终都能说英语。那么最终的分配呢?-所以,是的,可能发生的是,当你重写这些文件时,你可能会得到一个小文件问题,可能导致碎片。但是在这里,您可以使用这些操作用事务保证重写Delta Lake的布局。

在印度,如果是Delta,它是开源的优化操作,你有这个操作,你有这个选项叫做。bob下载地址

使用dataframe权限基本上没有数据更改权限,通过这种权限,你可以说我将通过读取所有数据来重写Delta表的这个分区,然后用更少的文件重新写回来。本质上,就Delta表操作而言,只是将重新分区的文件读入一个数据帧,并将其写回该分区。此外,还支持一个名为Data change的选项,将其设置为false,这意味着它将告诉事务日志没有数据被更改,只有数据被重新排列,这实际上有助于写入操作与任何其他实际数据更改操作不冲突。所有这些都记录在我们的在线文档中,看看吧。-很好,只是为了补充TD的观点,在打开交易日志部分时,我们实际上也深入了一点。让我们快进到我们讨论文件压缩的部分,以及与这次科技讲座相关的笔记本电脑本身。实际上,我们不仅会深入研究它,还会向你们展示就像我们在这里做的一样,我们会在演示中向你们展示它是如何工作的。好了,我想这就回答了那些关于文件大小和其他有趣的东西的问题。我有更有趣的一个在这里,嘿,spark端口的Java, Python和Scala语言,哪一个提供了最好的性能,为什么?-啊,好吧,就Delta Lake操作而言,比如读取和写入,Scala、Java和Python之间绝对没有区别。 In terms of processing, you want to do an addition on top of that, once you have them as dataframes and stuff, that is where there may be slight performance difference between Python and Java. It all depends on let’s say, if you’re doing built in functions, like explode and stuff, then there is absolutely no difference because everything gets boiled down on the code generated on the Java side, even if you’re writing files. But if you’re doing UDFs, that’s where the difference may arise because Python UDF versus Pandas UDF versus Scala, Java UDF, those differences can arise in terms of UDFs, but that is purely on the Spark side of things, whatever place you wanna do in Spark that is independent of Delta. – Perfect, okay.

现在我要转换一下话题,我们已经讨论过语言了。哦,实际上,不,我想留在语言中,就为了这个简短的回答。

我们如何在SQL查询中使用与Scala和Python语法相反的表名,您已经在spark 3.0中暗示了这一点,但我认为它值得我们继续前进,并再次完全放弃。-是的,我可以更详细地解释。因此,Apache Spark,在2.4行,或3.0之前的2X10中,不允许任何数据源,如Delta,真正地自定义写入metastore的内容。对于Delta来说,我们真正需要的是自定义,因为与拼字表或CSV表等不同,Delta不依赖于保持元数据(音频模糊)亚metastore,而是将所有元数据保存在事务日志中。这意味着当您在Delta表上查询或计划查询时,它需要一定程度的自定义,以便它可以指定我不希望所有元数据存储在Hive metastore中,而是希望在事务日志中使用元数据。因为在Spark 3.0之前没有API来进行定制,这就是为什么在Spark 2.4上工作的Oasis Delta Lake不支持Hive亚metastore定义的表,因为我们不能进行定制。但火花3.0中,我们添加了所有必要的API通过与火花社区密切合作,我们添加了所有的API,所以三角洲湖数据源可以定制,因此从蜂巢metastore这样表名称,它可以将其映射到的位置从蜂巢metastore忽略所有其他元数据,刚读的位置,继续阅读其他元数据从事务日志的位置,然后在此基础上计划查询。这个自定义将与Spark 3.0一起出现,时间框架可能是我们只能猜测的,因为Apache进程对于这样一个重要的版本来说有点不可预测。它很可能会在6月到7月之间登陆,Apache 3.0一登陆我们就会发布Delta Lake, Delta很可能是0.7.0。 release on top of that. – Excellent, okay, I think we only had time for probably one more question, so I’m just gonna just leave it at that. The question is how does the connectors with Presto basically work when it comes to trying to read Delta Lake, right? Does it actually read the Delta log itself to figure things out? How does Presto Connect basically? – Very very good question. So there are two part answer to that. One, so the Presto support we have added in open source does not read the log, it rather reads this thing called Manifest files, again, details are present in online docs. What manifest files are, is that it’s basically a bunch of text file that contains the names of the data files, the parquet data files that someone needs to read to give a full snapshot of the Delta table. So, the Delta Lake API’s provide the command for generating these manifest files on Delta table, which can then be read by Presto to figure out what are the actual parquet files that needs to be read to put in the Delta table and then according to query. So Presto has inbuilt support for this manifest file. So it doesn’t read the log.

Databricks和Starburst密切合作,建立了Delta Lake本地连接器,可以读取日志,我认为他们只是在自己的企业平台上重用了它。bob体育客户端下载这实际上是直接读取日志绕过Starburst中的清单文件系统直接读取日志以找出要读取的数据。希望在某个时候它们会打开(听不清)-太好了,嘿,非常感谢,