跳到主要内容
工程的博客

使用Python api对Delta Lake表进行简单、可靠的upsert和delete

Delta Lake 0.4.0包括Python api和Parquet到Delta Lake表的就地转换
通过如来佛Das而且丹尼李

10月3日 工程的博客

分享这篇文章

试试这个Jupyter笔记本

我们很高兴地宣布三角洲湖0.4.0引入了Python api来操作和管理Delta表中的数据。这个版本的主要特性是:

  • 用于DML和实用程序操作的Python api(# 89你现在可以使用Python api来更新/删除/合并Delta Lake表中的数据,并对它们运行实用程序操作(即,真空,历史)。这对于在Python中构建复杂的工作负载非常有用,例如,慢变维(SCD)操作,合并更改数据对于复制,和来自流查询的Upserts.看到文档欲知详情。
  • Convert-to-Delta(# 78) -您现在可以在不重写任何数据的情况下将Parquet表转换为Delta Lake表。这对于转换非常大的Parquet表非常有用,重写为Delta表的成本很高。此外,这个过程是可逆的-您可以将Parquet表转换为Delta Lake表,对其进行操作(例如,删除或合并),并轻松地将其转换回Parquet表。看到文档欲知详情。
  • 实用程序操作的SQL-您现在可以使用SQL来运行实用程序操作真空和历史。看到文档有关如何配置Spark以执行这些特定于delta的SQL命令的详细信息。

有关更多资料,请参阅Delta Lake 0.4.0发布说明而且三角洲湖泊文件>表删除、更新和合并

https://www.youtube.com/watch?v=R4f6SKOetB4

在本博客中,我们将演示Apache Spark™2.4.3如何在Delta Lake 0.4.0中使用Python和新的Python api来实现准时飞行性能。我们将展示如何插入和删除数据,使用时间旅行查询旧版本的数据,以及如何清理旧版本的数据。

如何开始使用三角洲湖

Delta Lake包与——包选择。在我们的示例中,我们还将演示在Apache Spark中VACUUM文件和执行Delta Lake SQL命令的能力。由于这是一个简短的演示,我们还将启用以下配置:

  • spark.databricks.delta.retentionDurationCheck.enabled = false使我们真空文件短于默认保留期限7天。注意,这只需要SQL命令VACUUM。
  • spark.sql.extensions = io.delta.sql.DeltaSparkSessionExtension在Apache Spark中启用Delta Lake SQL命令;对于Python或Scala API调用,这不是必需的。
#使用Spark包./箱子. /pyspark——package io.delta:delta-core_20.4.0——设计“spark.databricks.delta.retentionDurationCheck.enabled = false”——设计“spark.sql.extensions = io.delta.sql.DeltaSparkSessionExtension”

加载并保存Delta Lake数据

本场景将使用RITA BTS航班起飞统计数据生成的准点率航班性能或起飞延误数据集;该数据的一些实际示例包括基于d3.js交叉滤波器的2014年航班离港性能而且使用GraphFrames for Apache Spark™准时飞行性能.这个数据集可以从这里本地下载github的位置.在pyspark,首先读取数据集。

#位置变量tripdelaysFilePath =“/根/数据/ departuredelays.csv”pathToEventsTable =“/根/ deltalake / departureDelays.delta”#读取航班延误数据departuredelayed = spark。读\.option (“头”“真正的”) \.option (“inferSchema”“真正的”) \. csv (tripdelaysFilePath)

接下来,让我们保存departureDelays数据集到Delta Lake表。通过将该表保存到Delta Lake存储,我们将能够利用它的特性,包括ACID事务、统一批处理和流处理以及时间旅行。

将航班延误数据保存为Delta Lake格式departureDelays \.write \格式(“δ”) \.mode (“覆盖”) \.save (“departureDelays.delta”

注意,这种方法类似于通常保存Parquet数据的方式;而不是指定格式(“铺”),您现在将指定格式(“δ”).如果查看底层文件系统,您将注意到为departureDelays三角洲湖表。

/ departuredelay .delta$ ls -l.._delta_log部分- 00000 df6f69ea e6aa - 424 b - bc0e f3674c4f1906 c000.snappy.parquet- 00001 - 711 -一部分bcce3 fe9e - 466 e - a22c - 8256 f8b54930 c000.snappy.parquet- 00002 - 778 -一部分ba97d - 89 - b8 - 4942 a495 - 5 - f6238830b68 c000.snappy.parquet部分- 00003 - 1 - a791c4a - 6 - f11 - 49 - a8 - 8837 - 8093 - a3220581 c000.snappy.parquet
注意,_delta_log是包含Delta Lake事务日志的文件夹。有关更多信息,请参阅潜入Delta Lake:解包事务日志。

现在,让我们重新加载数据,但这一次我们的DataFrame将由Delta Lake支持。

以Delta Lake格式加载航班延误数据Delays_delta = spark \.read \格式(“δ”) \.load (“departureDelays.delta”#创建临时视图delays_delta.createOrReplaceTempView (“delays_delta”西雅图和旧金山之间有多少航班spark.sql ("select count(1) from delays_delta where origin = 'SEA' and destination = 'SFO'"),告诉()

最后,让我们确定从西雅图到旧金山的航班数量;在这个数据集中,有1698个航班。

就地转换至三角洲湖

如果您有现有的Parquet表,则可以将表就地转换为Delta Lake,因此不需要重写表。如果需要转换表,可以执行以下命令。

三角洲。表格s import转换无隔断拼花表格路径“/道路/ /表”deltaTableDeltaTable。convertToDelta(火花,“parquet. /道路/ /表”)转换分区的拼花表格路径“/道路/ /表”而且分区通过整数命名“部分”partitionedDeltaTableDeltaTable。convertToDelta(火花,“拼花。' /path/to/table ' ", "part int")

有关更多信息,包括如何在Scala和SQL中进行这种转换,请参阅转换为三角洲湖

删除我们的飞行数据

要从传统的数据湖表中删除数据,您需要:

  1. 从表中选择所有数据,不包括要删除的行
  2. 根据前面的查询创建一个新表
  3. 删除原表
  4. 将新表重命名为下游依赖项的原始表名。

使用Delta Lake,我们可以通过运行DELETE语句来简化这个过程,而不是执行所有这些步骤。为了显示这一点,让我们删除所有早到或准时到达的航班。延迟)。

/ departure时延。delta$ ls -l _delta_log part-00000-a2a19ba4-17e9-4931-9bbf-3c9d4997780b-c000.snappy. snappy. snappy. snappy. snappy. snappy. snappy. snappy. snappy。拼花一部分- 00000 df6f69ea e6aa - 424 b - bc0e f3674c4f1906 c000.snappy。拼花一部分- 00001 - 711 - bcce3 fe9e - 466 e - a22c - 8256 f8b54930 c000.snappy。拼花一部分- 00001 - a0423a18 - 62 eb - 46 - b3 - a82f ca9aac1f1e93 c000.snappy。拼花一部分- 00002 - 778 - ba97d - 89 - b8 - 4942 a495 - 5 - f6238830b68 c000.snappy。拼花一部分- 00002 - bfaa0a2a - 0 - a31 4 -沛富aa63 - 162402 - f802cc c000.snappy。拼花一部分- 00003 - 1 - a791c4a - 6 - f11 - 49 - a8 - 8837 - 8093 - a3220581 c000.snappy。拼花一部分- 00003 - b0247e1d f5ce - 4 - b45 - 91 - cd - 16413 - c784a66 c000.snappy.parquet

在传统的数据湖中,删除通过重写除要删除的值之外的整个表来执行。有了三角洲湖,删除而是通过有选择地写入新版本的文件来执行,其中包含的数据只能被删除标志着之前的文件已被删除。这是因为Delta Lake使用多版本并发控制在表上执行原子操作:例如,当一个用户正在删除数据时,另一个用户可能正在查询表的以前版本。这种多版本的模型也使我们能够穿越回过去(即回到过去)。时间旅行)和查询以前的版本,我们将在后面看到。

更新航班资料

要更新传统数据湖表中的数据,您需要:

  1. 从表中选择所有数据,不包括要修改的行
  2. 修改需要更新/更改的行
  3. 合并这两个表以创建一个新表
  4. 删除原表
  5. 将新表重命名为下游依赖项的原始表名。

使用Delta Lake,我们可以通过运行UPDATE语句来简化这个过程,而不是执行所有这些步骤。为了说明这一点,让我们更新从底特律到西雅图的所有航班。

#更新所有航班始发底特律现在开始西雅图deltaTable。更新("origin = 'DTW'", { "origin": "'SEA'" } )#多少次航班之间的西雅图而且旧金山spark.sql ("select count(1) from delays_delta where origin = 'SEA' and destination = 'SFO'").显示()

底特律航班现在被标记为西雅图航班,我们现在有986个航班从西雅图飞往旧金山。如果要列出文件系统departureDelays文件夹(即。美元. ./ departureDelays / ls - l),您将注意到现在有11个文件(而不是删除文件后的8个文件和创建表后的4个文件)。

合并我们的飞行数据

在使用数据湖时,一个常见的场景是不断地将数据追加到表中。这通常会导致重复的数据(不希望再次插入到表中的行)、需要插入的新行以及需要更新的一些行。对于Delta Lake,所有这些都可以通过使用合并操作(类似于SQL merge语句)来实现。

让我们从一个示例数据集开始,您希望使用以下查询更新、插入或重复数据删除该数据集。

#哪些航班之间的而且旧金山这些日期spark.sql ("select * from delays_delta where origin = 'SEA' and destination = 'SFO' and date like '1010%' limit 10").显示()

此查询的输出如下表所示。注意,这个博客添加了颜色编码,以清楚地识别哪些行被重复数据删除(蓝色)、更新(黄色)和插入(绿色)。

接下来,让我们生成我们自己的merge_table其中包含我们将使用以下代码段插入、更新或删除重复的数据。

物品= [(101071031590“海”“旧金山”), (101052110590“海”“旧金山”), (101082231590“海”“旧金山”)]Cols = [“日期”“延迟”“距离”“起源”“目的地”Merge_table = spark。createDataFrame(项目、峡路)merge_table.toPandas ()

在上面的表(merge_table)中,有三行具有唯一的日期值:

  1. 1010521:这一行需要更新航班带有新延迟值的表(黄色)
  2. 1010710:表示该行为a行重复的(蓝色)
  3. 1010822:这是一个新行插入(绿色)

有了三角洲湖,这可以很容易地通过一个合并语句,如下面的代码片段所示。

合并merge_table航班deltaTable.alias \(“飞行”)合并(merge_table.alias(“更新”)、“航班。Date = updates.date") \.whenMatchedUpdate ({"delay": "updates.delay"}) \.whenNotMatchedInsertAll () \执行()#哪些航班之间的而且旧金山这些日期spark.sql ("select * from delays_delta where origin = 'SEA' and destination = 'SFO' and date like '1010%' limit 10").显示()

重复删除、更新和插入这三个操作都可以用一条语句有效地完成。

查看表历史

如前所述,在我们的每个事务(删除、更新)之后,在文件系统中创建了更多的文件。这是因为对于每个事务,都有不同版本的Delta Lake表。可以通过使用DeltaTable.history ()方法,如下所示。

deltaTable.history()。显示()+-------+-------------------+------+--------+---------+--------------------+----+--------+---------+-----------+--------------+-------------+|版本|时间戳|用户标识|用户名|操作|operationParameters|工作|笔记本|clusterId|readVersion|isolationLevel|isBlindAppend|+-------+-------------------+------+--------+---------+--------------------+----+--------+---------+-----------+--------------+-------------+|2|2019-09年-29年154122|||更新|(谓语->(或…||||1||||1|2019-09年-29年154045|||删除|(谓语->["(…|零|零|零|0|零|假|| 0|2019-09-29 15:40:14| null| null| WRITE|[mode -> overwrite…]|null| null| null| null| null| false|+-------+-------------------+------+--------+---------+--------------------+----+--------+---------+-----------+--------------+-------------+
注意,您也可以使用SQL: spark执行相同的任务。sql("DESCRIBE HISTORY '" + pathToEventsTable + "'").show()

可以看到,对于每个操作(创建表、删除表和更新表),有三行表示表的不同版本(下面是一个简化版本,以帮助更容易阅读):

版本 时间戳 操作 operationParameters
2 2019-09-29 15:41:22 更新 [predicate ->(或…
1 2019-09-29 15:40:45 删除 [predicate ->["(…
0 2019-09-29 15:40:14 [mode -> overwrite…]

与表历史一起回到过去

使用Time Travel,您可以查看Delta Lake表的版本或时间戳。有关更多信息,请参阅Delta Lake文档>使用时间旅行读取旧版本的数据.如果要查看历史数据,请指定版本时间戳选择;在下面的代码片段中,我们将指定版本选项

#加载数据框架每一个版本dfv0spark.read.format(“δ”).option(“versionAsOf”,0) .load(“departureDelays.delta”)dfv1spark.read.format(“δ”).option(“versionAsOf”,1) .load(“departureDelays.delta”)dfv2spark.read.format(“δ”).option(“versionAsOf”,2) .load(“departureDelays.delta”)#计算SEASFO航班计数每一个版本历史cnt0dfv0。where("origin = 'SEA'")。在哪里("destination = 'SFO'")。()cnt1dfv1。在哪里("origin = 'SEA'").在哪里("destination = 'SFO'")。()cnt2dfv2。在哪里("origin = 'SEA'").在哪里("destination = 'SFO'")。()#打印价值print("SEA -> SFO计数:创建表:%s,删除:%s,更新:%s"(cnt0, cnt1, cnt2))
              # #输出->SFO计数:创建表格1698删除837更新:986

无论是治理、风险管理和遵从性(GRC)还是回滚错误,Delta Lake表都包含元数据(例如,记录这些操作符发生删除的事实)和数据(例如,实际删除的行)。但是,我们如何出于遵从性或大小的原因删除数据文件呢?

用吸尘器清理旧桌子

Delta Lake vacuum方法将默认删除超过7天的所有行(和文件)(参考:三角洲湖真空).如果您要查看文件系统,您将注意到表中有11个文件。

/ departuredelay .delta$ ls -l_delta_log部分- 00000 - 5 - e52736b - 0 - e63 - 48 - f3 - 8 d56 - 50 - f7cfa0494d c000.snappy.parquet- 00000 - 69 -一部分eb53d5 - 34 - b4 - 408 f - a7e4 - 86 - e000428c37 c000.snappy.parquet部分- 00000 f8edaf04 - 712 e - 4 - ac4 - 8 b42 - 368 - d0bbdb95b c000.snappy.parquet部分- 00001 - 20893 -速度- 9 - d4f - 4 - c1f b619 - 3 - e6ea1fdd05f c000.snappy.parquet部分- 00001 - 9 - b68b9f6 bad3 - 434 f - 9498 - f92dc4f503e3 c000.snappy.parquet部分- 00001 d4823d2e - 8 f9d - 42 - e3 - 918 d - 4060969 - e5844 c000.snappy.parquet部分- 00002 - 24 - da7f4e 7 - e8d - 40 - d1 - b664 - 95 - bf93ffeadb c000.snappy.parquet部分- 00002 - 3027786 - c - 20 - a9 4 - b19 - 868 d - dc7586c275d4 c000.snappy.parquet部分- 00002 f2609f27 - 3478 4 - bf9 aeb7 - 2 - c78a05e6ec1 c000.snappy.parquet部分- 00003 - 850436 - a6 c4dd - 4535 a1c0 - 5 - dc0f01d3d55 c000.snappy.parquet部分- 00003 b9292122 - 99 a7 - 4223 - aaa9 - 8646 - c281f199 c000.snappy.parquet

要删除所有文件,以便只保留数据的当前快照,您将为vacuum方法指定一个小值(而不是默认的7天保留)。

#删除0小时以上的所有文件。deltaTable.vacuum (0
注意,您可以通过SQL语法执行相同的任务:#移除0小时以上的所有文件。sql(“VACUUM”+ pathToEventsTable +“RETAIN 0 HOURS”)

一旦真空完成,当您检查文件系统时,您将注意到由于历史数据已被删除,文件减少了。

/ departuredelay .delta$ ls -l_delta_log部分- 00000 f8edaf04 - 712 e - 4 - ac4 - 8 b42 - 368 - d0bbdb95b c000.snappy.parquet部分- 00001 - 9 - b68b9f6 bad3 - 434 f - 9498 - f92dc4f503e3 c000.snappy.parquet部分- 00002 - 24 - da7f4e 7 - e8d - 40 - d1 - b664 - 95 - bf93ffeadb c000.snappy.parquet部分- 00003 b9292122 - 99 a7 - 4223 - aaa9 - 8646 - c281f199 c000.snappy.parquet
注意,运行真空后,时间旅行回到比保留期更早的版本的能力将丧失。

接下来是什么

今天就在Apache Spark 2.4.3(或更高版本)实例上尝试前面的代码片段来尝试Delta Lake。通过使用Delta Lake,您可以使数据湖更加可靠(无论是创建新的数据湖还是迁移现有的数据湖)。要了解BOB低频彩更多,请参阅https://delta.io/加入三角洲湖社区松弛而且谷歌集团.您可以跟踪所有即将发布的版本和计划的功能github的里程碑

接下来,我们也很高兴有Spark AI欧洲峰会10月15日至17日。在峰会上,我们将有一个三角洲湖专门的训练课

学分

我们要感谢以下贡献者对Delta Lake 0.4.0的更新,文档更改和贡献:Andreas Neumann, Burak Yavuz, Jose Torres, Jules Damji, Jungtaek Lim, Liwen Sun, Michael Armbrust, Mukul Murthy, Pranav Anand, Rahul Mahadev, Shixiong Zhu, Tathagata Das, Terry Kim, Wenchen Fan, Wesley Hoffman, yishang Lu, Yucai Yu, lys0716。

免费试用Databricks

相关的帖子

看到所有工程的博客的帖子