{“细胞”:[{“cell_type”:“减价”、“元数据”:{},“源”:[" #简化数据可靠性与三角洲湖和湖Python, SQL工具,和就地迁移\ n”,“\ n”,“我们兴奋地宣布释放的三角洲湖0.4.0介绍了Python api用于操纵和管理三角洲表中的数据。这个版本的主要特点是:\ n”、“\ n”、“* * * Python api DML和实用操作* * ((# 89)(https://github.com/delta-io/delta/issues/89))——你现在可以使用Python api来更新/删除/合并三角洲湖表中的数据和实用程序运行操作(即。、真空、历史)。这些都是很好的构建复杂工作负载在Python中,例如,[缓慢变化维度(SCD)] (https://docs.delta.io/0.4.0/delta-update.html # slowly-changing-data-scd-type-2-operation-into-delta-tables)操作,合并(更改数据)(https://docs.delta.io/0.4.0/delta-update.html write-change-data-into-a-delta-table)复制,和[插入从流媒体查询)(https://docs.delta.io/0.4.0/delta-update.html # upsert-from-streaming-queries-using-foreachbatch)。看到(文档)(https://docs.delta.io/0.4.0/delta-update.html)为更多的细节。\ n”、“\ n”、“* **Convert-to-Delta** ([#78](https://github.com/delta-io/delta/issues/78)) - You can now convert a Parquet table in place to a Delta Lake table without rewriting any of the data. This is great for converting very large Parquet tables which would be costly to rewrite as a Delta table. Furthermore, this process is reversible - you can convert a Parquet table to Delta Lake table, operate on it (e.g., delete or merge), and easily convert it back to a Parquet table. See the [documentation](https://docs.delta.io/0.4.0/delta-utility.html#convert-to-delta) for more details.\n", "\n", "* **SQL for utility operations** - You can now use SQL to run utility operations vacuum and history. See the [documentation](https://docs.delta.io/0.4.0/delta-utility.html#enable-sql-commands-within-apache-spark) for more details on how to configure Spark to execute these Delta-specific SQL commands.\n", "\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Data Preparation\n", "Configure locations for the source file and where the Delta Lake Table will be stored" ] }, { "cell_type": "code", "execution_count": 1, "metadata": {}, "outputs": [], "source": [ "import pandas as pd" ] }, { "cell_type": "code", "execution_count": 2, "metadata": {}, "outputs": [], "source": [ "tripdelaysFilePath = \"/usr/local/Cellar/spark/data/departuredelays.csv\"\n", "pathToEventsTable = \"/usr/local/Cellar/spark/spark-2.4.3-bin-hadoop2.7/departureDelays.delta\"" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Create `departureDelays` DataFrame" ] }, { "cell_type": "code", "execution_count": 3, "metadata": {}, "outputs": [], "source": [ "departureDelays = spark.read.option(\"header\", \"true\").option(\"inferSchema\", \"true\").csv(tripdelaysFilePath)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Save table as Delta Lake (update `pathToEventsTable` to match the following location" ] }, { "cell_type": "code", "execution_count": 4, "metadata": {}, "outputs": [], "source": [ "departureDelays.write.format(\"delta\").mode(\"overwrite\").save(\"departureDelays.delta\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Load Delta Lake table" ] }, { "cell_type": "code", "execution_count": 5, "metadata": {}, "outputs": [], "source": [ "delays_delta = spark.read.format(\"delta\").load(\"departureDelays.delta\")\n", "delays_delta.createOrReplaceTempView(\"delays_delta\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Get count of rows" ] }, { "cell_type": "code", "execution_count": 6, "metadata": {}, "outputs": [ { "data": { "text/html": [ "
\ n”、“\ n”、“ \ n”、“ \ n”、“ \ n”、“\ n”、“\ n”、“\ n”、“\ n”、“ \ n”、“ \ n”、“\ n”、“\ n”、“\ n”、“\ n”、“
数(1)
01698年
\ n”、“
“,”文本/普通”:[“计数(1)\ n”, 1698“0”)},“execution_count”: 6,“元数据”:{},“output_type”:“execute_result”}],“源”:["火花。sql (\“select count(1)从delays_delta起源=‘海’和目的地= anglo american \”) .toPandas ()] }, { "cell_type": "markdown", "metadata": {}, "source": [ "**Review File System**: Note there are four files initially created as part of the table creation." ] }, { "cell_type": "code", "execution_count": 7, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "\u001b[34m_delta_log\u001b[m\u001b[m/\r\n", "part-00000-091cff02-ca22-473d-bb22-13164be0846b-c000.snappy.parquet\r\n", "part-00001-e61a8edb-210a-4692-a45e-388caf74dd62-c000.snappy.parquet\r\n", "part-00002-9c17ccd7-7c13-429e-b26c-eabad1e1d58a-c000.snappy.parquet\r\n", "part-00003-bf58ae04-4900-425c-bdd4-c29aa8c454e4-c000.snappy.parquet\r\n" ] } ], "source": [ "%ls $pathToEventsTable" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Deletes\n", "With Delta Lake, you can delete data with the Python API" ] }, { "cell_type": "code", "execution_count": 8, "metadata": {}, "outputs": [], "source": [ "from delta.tables import *\n", "from pyspark.sql.functions import *\n", "deltaTable = DeltaTable.forPath(spark, pathToEventsTable)\n", "deltaTable.delete(\"delay < 0\") " ] }, { "cell_type": "code", "execution_count": 9, "metadata": {}, "outputs": [ { "data": { "text/html": [ "
\ n”、“\ n”、“ \ n”、“ \ n”、“ \ n”、“\ n”、“\ n”、“\ n”、“\ n”、“ \ n”、“ \ n”、“\ n”、“\ n”、“\ n”、“\ n”、“
数(1)
0837年
\ n”、“
“,”文本/普通”:[“计数(1)\ n”, 837“0”)},“execution_count”: 9,“元数据”:{},“output_type”:“execute_result}],“源”(“#得到行数\ n”、“火花。sql (\“select count(1)从delays_delta起源=‘海’和目的地= anglo american \”) .toPandas ()] }, { "cell_type": "markdown", "metadata": {}, "source": [ "**Review File System**: Note that while we deleted early (and on-time) flights, there are now eight files (instead of the four files initially created as part of the table creation)." ] }, { "cell_type": "code", "execution_count": 10, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "\u001b[34m_delta_log\u001b[m\u001b[m/\r\n", "part-00000-091cff02-ca22-473d-bb22-13164be0846b-c000.snappy.parquet\r\n", "part-00000-fb071d3b-9d25-4faf-a04f-26d35716315b-c000.snappy.parquet\r\n", "part-00001-8db427ad-edd5-4c99-8578-01da4b4ea921-c000.snappy.parquet\r\n", "part-00001-e61a8edb-210a-4692-a45e-388caf74dd62-c000.snappy.parquet\r\n", "part-00002-8c19b041-8e4b-4c38-be54-3d32bce76f83-c000.snappy.parquet\r\n", "part-00002-9c17ccd7-7c13-429e-b26c-eabad1e1d58a-c000.snappy.parquet\r\n", "part-00003-bf58ae04-4900-425c-bdd4-c29aa8c454e4-c000.snappy.parquet\r\n", "part-00003-c08cd37e-659b-49c8-9ab1-5a39862dddc9-c000.snappy.parquet\r\n" ] } ], "source": [ "%ls $pathToEventsTable" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Updates\n", "Update flights originating from Detroit (DTW) to now be from Seattle (SEA)" ] }, { "cell_type": "code", "execution_count": 11, "metadata": {}, "outputs": [], "source": [ "deltaTable.update(\"origin = 'DTW'\", { \"origin\": \"'SEA'\" } ) " ] }, { "cell_type": "code", "execution_count": 12, "metadata": {}, "outputs": [ { "data": { "text/html": [ "
\ n”、“\ n”、“ \ n”、“ \ n”、“ \ n”、“\ n”、“\ n”、“\ n”、“\ n”、“ \ n”、“ \ n”、“\ n”、“\ n”、“\ n”、“\ n”、“
数(1)
0986年
\ n”、“
“,”文本/普通”:[“计数(1)\ n”, 986“0”)},“execution_count”: 12,“元数据”:{},“output_type”:“execute_result”}],“源”:["火花。sql (\“select count(1)从delays_delta起源=‘海’和目的地= anglo american \”) .toPandas ()] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### View History\n", "View the table history (note the create table, insert, and update operations)" ] }, { "cell_type": "code", "execution_count": 13, "metadata": {}, "outputs": [ { "data": { "text/html": [ "
\ n”、“\ n”、“ \ n”、“ \ n”、“ \ n”、“\ n”、“\ n”、“\ n”、“\ n”、“\ n”、“\ n”、“\ n”、“\ n”、“\ n”、“\ n”、“\ n”、“\ n”、“\ n”、“\ n”、“\ n”、“ \ n”、“ \ n”、“\ n”、“\ n”、“\ n”、“\ n”、“\ n”、“\ n”、“\ n”、“\ n”、“\ n”、“\ n”、“\ n”、“\ n”、“\ n”、“\ n”、“ \ n”、“\ n”、“\ n”、“\ n”、“\ n”、“\ n”、“\ n”、“\ n”、“\ n”、“\ n”、“\ n”、“\ n”、“\ n”、“\ n”、“\ n”、“ \ n”、“\ n”、“\ n”、“\ n”、“\ n”、“\ n”、“\ n”、“\ n”、“\ n”、“\ n”、“\ n”、“\ n”、“\ n”、“\ n”、“\ n”、“\ n”、“
版本时间戳用户标识用户名操作operationParameters工作笔记本clusterIdreadVersionisolationLevelisBlindAppend
022019-10-02 17:56:46没有一个没有一个更新{“谓语”:“(起源# 767 = DTW)”}没有一个没有一个没有一个1.0没有一个
112019-10-02 17:56:38没有一个没有一个删除{“谓语”:“\[”(“延迟”< 0)\]}没有一个没有一个没有一个0.0没有一个
202019-10-02 17:56:25没有一个没有一个{“模式”:“覆盖”,“partitionBy”:“[]”}没有一个没有一个没有一个没有一个
\ n”、“
“,”文本/普通”:[”版本时间戳userId操作用户名\ \ \ n”、“0 2 2019-10-02 17:56:46都没有更新\ n”,“1 1 2019-10-02 17:56:38都没有删除\ n”,“2 0 2019-10-02 17:56:25都没有写\ n”,“\ n”、“operationParameters工作笔记本clusterId \ \ \ n”、“0{“谓语”:“(起源# 767 = DTW)”}没有没有没有\ n”,“1{“谓语”:[\”(“延迟”< 0)\]}没有没有没有\ n”,“2{“模式”:“覆盖”,“partitionBy”:“[]”}没有没有没有\ n”、“\ n”、“readVersion isolationLevel isBlindAppend \ n”、“0 1.0没有假\ n”,“1 0.0没有假\ n”,“2南没有假”)},”execution_count”: 13日,“元数据”:{},“output_type”:“execute_result}],“源”:[" deltaTable.history () .toPandas ()] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Calculate counts for each version of the table" ] }, { "cell_type": "code", "execution_count": 14, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "SEA -> SFO Counts: Create Table: 1698, Delete: 837, Update: 986\n" ] } ], "source": [ "dfv0 = spark.read.format(\"delta\").option(\"versionAsOf\", 0).load(\"departureDelays.delta\")\n", "dfv1 = spark.read.format(\"delta\").option(\"versionAsOf\", 1).load(\"departureDelays.delta\")\n", "dfv2 = spark.read.format(\"delta\").option(\"versionAsOf\", 2).load(\"departureDelays.delta\")\n", "\n", "cnt0 = dfv0.where(\"origin = 'SEA'\").where(\"destination = 'SFO'\").count()\n", "cnt1 = dfv1.where(\"origin = 'SEA'\").where(\"destination = 'SFO'\").count()\n", "cnt2 = dfv2.where(\"origin = 'SEA'\").where(\"destination = 'SFO'\").count()\n", "\n", "print(\"SEA -> SFO Counts: Create Table: %s, Delete: %s, Update: %s\" % (cnt0, cnt1, cnt2))" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "**Review File System**: Note the number of files based on the preceding operations." ] }, { "cell_type": "code", "execution_count": 15, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "\u001b[34m_delta_log\u001b[m\u001b[m/\r\n", "part-00000-091cff02-ca22-473d-bb22-13164be0846b-c000.snappy.parquet\r\n", "part-00000-fa942ead-0f90-4a99-8949-510d00a77597-c000.snappy.parquet\r\n", "part-00000-fb071d3b-9d25-4faf-a04f-26d35716315b-c000.snappy.parquet\r\n", "part-00001-8db427ad-edd5-4c99-8578-01da4b4ea921-c000.snappy.parquet\r\n", "part-00001-e2639517-7fea-4bc0-ba74-3af982025112-c000.snappy.parquet\r\n", "part-00001-e61a8edb-210a-4692-a45e-388caf74dd62-c000.snappy.parquet\r\n", "part-00002-8c19b041-8e4b-4c38-be54-3d32bce76f83-c000.snappy.parquet\r\n", "part-00002-9c17ccd7-7c13-429e-b26c-eabad1e1d58a-c000.snappy.parquet\r\n", "part-00002-c85d41c2-0ea1-451a-9bbf-52e7bf87d569-c000.snappy.parquet\r\n", "part-00003-bf58ae04-4900-425c-bdd4-c29aa8c454e4-c000.snappy.parquet\r\n", "part-00003-c08cd37e-659b-49c8-9ab1-5a39862dddc9-c000.snappy.parquet\r\n" ] } ], "source": [ "%ls $pathToEventsTable" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Vacuum\n", "Remove older data (by default 7 days) " ] }, { "cell_type": "code", "execution_count": 16, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "DataFrame[]" ] }, "execution_count": 16, "metadata": {}, "output_type": "execute_result" } ], "source": [ "deltaTable.vacuum(0)" ] }, { "cell_type": "code", "execution_count": 17, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "\u001b[34m_delta_log\u001b[m\u001b[m/\r\n", "part-00000-fa942ead-0f90-4a99-8949-510d00a77597-c000.snappy.parquet\r\n", "part-00001-e2639517-7fea-4bc0-ba74-3af982025112-c000.snappy.parquet\r\n", "part-00002-c85d41c2-0ea1-451a-9bbf-52e7bf87d569-c000.snappy.parquet\r\n", "part-00003-c08cd37e-659b-49c8-9ab1-5a39862dddc9-c000.snappy.parquet\r\n" ] } ], "source": [ "%ls $pathToEventsTable" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "And let's not forget, Delta Lake 0.4.0 also includes `MERGE` in the Python API!" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Merge\n", "Let's merge another table with the `departureDelays` table with [data deduplication](https://docs.delta.io/0.4.0/delta-update.html#data-deduplication-when-writing-into-delta-tables). Let's start by viewing data that will be impacted by the merge." ] }, { "cell_type": "code", "execution_count": 18, "metadata": {}, "outputs": [ { "data": { "text/html": [ "
\ n”、“\ n”、“ \ n”、“ \ n”、“ \ n”、“\ n”、“\ n”、“\ n”、“\ n”、“\ n”、“\ n”、“\ n”、“\ n”、“ \ n”、“ \ n”、“\ n”、“\ n”、“\ n”、“\ n”、“\ n”、“\ n”、“\ n”、“ \ n”、“\ n”、“\ n”、“\ n”、“\ n”、“\ n”、“\ n”、“\ n”、“ \ n”、“\ n”、“\ n”、“\ n”、“\ n”、“\ n”、“\ n”、“\ n”、“ \ n”、“\ n”、“\ n”、“\ n”、“\ n”、“\ n”、“\ n”、“\ n”、“\ n”、“
日期延迟距离起源目的地
010105210590年旧金山
1101071031日590年旧金山
210107305590年旧金山
31010955104年590年旧金山
\ n”、“
“,”文本/普通”:[起源“日期推迟距离目的地\ n”,“0 1010521 0 590海SFO \ n”,“1 590 31 1010710海SFO \ n”,“2 1010730 5 590海SFO \ n”,“3 590 104 1010955海SFO”]},“execution_count”: 18岁的“元数据”:{},“output_type”:“execute_result”}],“源”:["火花。sql (\“select * from delays_delta起源=‘海’和目的地=日期和“旧金山”像“1010%”按日期顺序限制10 \”).toPandas ()] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Next, let's create our `merge_table` which contains three rows:\n", "* 1010710: this row is a duplicate\n", "* 1010521: this row will be updated with a new delay value\n", "* 1010822: this is a new row" ] }, { "cell_type": "code", "execution_count": 19, "metadata": {}, "outputs": [ { "data": { "text/html": [ "
\ n”、“\ n”、“ \ n”、“ \ n”、“ \ n”、“\ n”、“\ n”、“\ n”、“\ n”、“\ n”、“\ n”、“\ n”、“\ n”、“ \ n”、“ \ n”、“\ n”、“\ n”、“\ n”、“\ n”、“\ n”、“\ n”、“\ n”、“ \ n”、“\ n”、“\ n”、“\ n”、“\ n”、“\ n”、“\ n”、“\ n”、“ \ n”、“\ n”、“\ n”、“\ n”、“\ n”、“\ n”、“\ n”、“\ n”、“\ n”、“
日期延迟距离起源目的地
0101052110590年旧金山
1101071031日590年旧金山
2101083231日590年旧金山
\ n”、“
“,”文本/普通”:[起源“日期推迟距离目的地\ n”,“0 1010521 10 590海SFO \ n”,“1 590 31 1010710海SFO \ n”,“2 1010832 31 590海SFO”]},“execution_count”: 19日,“元数据”:{},“output_type”:“execute_result}],“源”:["项=[(1010521,590,“海”,“旧金山”),(590年1010710,31日,“海”,“旧金山”),(590年1010832,31日,“海”,“旧金山”)]\ n”、“关口=[‘日期’,‘延迟’,‘距离’,‘源’,‘目的地’]\ n”、“merge_table =火花。createDataFrame(项目、关口)\ n”、“merge_table.toPandas ()] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Let's run our merge statement that will handle the duplicates, updates, and add a new row" ] }, { "cell_type": "code", "execution_count": 20, "metadata": {}, "outputs": [], "source": [ "deltaTable.alias(\"flights\") \\\n", " .merge(merge_table.alias(\"updates\"),\"flights.date = updates.date\") \\\n", " .whenMatchedUpdate(set = { \"delay\" : \"updates.delay\" } ) \\\n", " .whenNotMatchedInsertAll() \\\n", " .execute()" ] }, { "cell_type": "code", "execution_count": 21, "metadata": {}, "outputs": [ { "data": { "text/html": [ "
\ n”、“\ n”、“ \ n”、“ \ n”、“ \ n”、“\ n”、“\ n”、“\ n”、“\ n”、“\ n”、“\ n”、“\ n”、“\ n”、“ \ n”、“ \ n”、“\ n”、“\ n”、“\ n”、“\ n”、“\ n”、“\ n”、“\ n”、“ \ n”、“\ n”、“\ n”、“\ n”、“\ n”、“\ n”、“\ n”、“\ n”、“ \ n”、“\ n”、“\ n”、“\ n”、“\ n”、“\ n”、“\ n”、“\ n”、“ \ n”、“\ n”、“\ n”、“\ n”、“\ n”、“\ n”、“\ n”、“\ n”、“ \ n”、“\ n”、“\ n”、“\ n”、“\ n”、“\ n”、“\ n”、“\ n”、“\ n”、“
日期延迟距离起源目的地
0101052110590年旧金山
1101071031日590年旧金山
210107305590年旧金山
3101083231日590年旧金山
41010955104年590年旧金山
\ n”、“
“,”文本/普通”:[起源“日期推迟距离目的地\ n”,“0 1010521 10 590海SFO \ n”,“1 590 31 1010710海SFO \ n”,“2 1010730 5 590海SFO \ n”,“3 590 31 1010832海SFO \ n”,“4 1010955 104 590海SFO”]},“execution_count”: 21岁的“元数据”:{},“output_type”:“execute_result”}],“源”:["火花。sql (\“select * from delays_delta起源=‘海’和目的地=日期和“旧金山”像“1010%”按日期顺序限制10 \”).toPandas ()] }, { "cell_type": "markdown", "metadata": {}, "source": [ "As noted in the previous cells, notice the following:\n", "* There is only one row for the date `1010710` as `merge` automatically takes care of **data deduplication**\n", "* The row for the date `1010521` has the `delay` value **updated** from 0 to 10.\n", "* The row for the date `1010821` has been added as this date did not exist, hence it was **inserted**." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [] } ], "metadata": { "kernelspec": { "display_name": "Python 3", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.6.8" } }, "nbformat": 4, "nbformat_minor": 2 }