编排与Apache气流砖工作

这篇文章展示了一个示例的编排与Apache数据管道气流砖工作。您还将了解如何设置气流与砖的集成。编制管理工作复杂任务之间的依赖关系。

工作编制的数据管道

开发和部署一个数据处理管道通常需要管理复杂任务之间的依赖关系。例如,管道可能读取数据从源,清理数据,将清洗数据,转换后的数据写入一个目标。您需要测试、进度和解决数据管道时实施。

工作流系统应对这些挑战,允许您定义任务之间的依赖关系,安排当管道运行和监控工作流。Apache气流是一个开源的bob下载地址解决方案来管理和调度的数据管道。气流代表数据管道有向无环图(无进取心的人)的操作。你定义一个工作流在Python文件和气流管理调度和执行。气流砖连接允许您利用优化的火花引擎提供的砖与气流的调度功能。

需求

  • 气流和砖之间的集成是1.9.0后来气流中可用版本。本文中的示例与气流2.1.0版测试。

  • 气流需要Python 3.6、3.7或3.8。这篇文章中的示例是用Python 3.8做了试验。

安装气流砖集成

安装气流砖集成,打开终端并运行下列命令。一定要用您的用户名和电子邮件在最后一行:

mkdir气流cd气流pipenv——python38 pipenv壳出口AIRFLOW_HOME=$ (松材线虫病)pipenv安装apache-airflow= =2.1.0 pipenv安装apache-airflow-providers-databricks mkdir熟练的技艺气流db init气流用户创建用户名admin——firstname < firstname >——lastname < lastname >——角色管理——电子邮件your.com

当你复制和运行脚本,您执行这些步骤:

  1. 创建一个目录命名气流和更改到该目录。

  2. 使用pipenv创建和产卵Python虚拟环境。砖建议使用Python的虚拟环境隔离包版本和代码依赖环境。这种隔离有助于减少意外包版本不匹配和代码依赖碰撞。

  3. 初始化一个环境变量命名AIRFLOW_HOME设置的路径气流目录中。

  4. 安装气流,气流砖提供者包。

  5. 创建一个气流/无进取心的人目录中。气流使用熟练的技艺目录存储DAG定义。

  6. 初始化一个气流SQLite数据库用来跟踪的元数据。在生产气流部署,您将配置气流与一个标准的数据库。气流的SQLite数据库和默认配置部署中初始化气流目录中。

  7. 创建管理员用户对气流。

安装临时演员例如,芹菜,s3,密码运行:

pip安装“apache-airflow(砖、芹菜、s3、密码)”

启动气流web服务器和调度器

气流web服务器需要查看气流UI。启动web服务器,打开终端并运行下列命令:

气流网络服务器

调度器是气流组件,日程安排熟练的技艺。要运行它,打开一个新的终端和运行以下命令:

pipenv壳出口AIRFLOW_HOME=$ (松材线虫病)气流调度器

气流安装进行测试

验证气流安装,您可以运行一个例子与气流无进取心的人包括:

  1. 在一个浏览器窗口,打开http://localhost: 8080 / home。气流熟练的技艺屏幕上出现了。

  2. 单击暂停/ Unpause DAG切换到unpause装饰边的一个例子,例如,example_python_operator

  3. 点击触发DAG的例子开始按钮。

  4. 单击DAG名称查看详细信息,包括DAG的运行状态。

气流运营商的砖

气流砖集成提供了两个不同的操作触发的工作:

砖气流操作符写作业运行气流日志每一个页面的URLpolling_period_seconds(默认是30秒)。有关更多信息,请参见apache-airflow-providers-databricks包在气流的网站页面。

运行一个砖气流的工作

下面的例子演示了如何创建一个简单的气流部署在本地机器上运行和部署一个例子DAG触发运行在砖。对于本例,您:

  1. 创建一个新的笔记本和添加代码来打印一个问候根据配置参数。

  2. 创建一个砖工作运行笔记本的一个任务。

  3. 配置一个气流联系砖工作区。

  4. 创建一个气流DAG触发笔记本工作。你在一个Python脚本使用定义DAGDatabricksRunNowOperator

  5. 利用气流UI触发DAG并查看运行状态。

创建一个笔记本

这个例子使用一个笔记本,其中包含两个单元:

  • 第一个单元格包含一个砖实用程序的文本小部件定义一个变量命名问候设置为默认值世界

  • 第二个单元格打印的值问候变量的前缀你好

创建笔记本:

  1. 去你的砖着陆页面并选择创建空白笔记本,或点击新图标”src=在侧边栏并选择笔记本。的创建笔记本对话框出现了。

  2. 创建笔记本对话框中,给你的笔记本一个名字,例如你好气流。集默认的语言Python。离开集群设置为默认值。您将配置集群创建任务时使用这个笔记本。

  3. 点击创建

  4. 复制下面的Python代码粘贴到第一个单元格的笔记本。

    dbutils小部件文本(“问候”,“世界”,“问候”)问候=dbutils小部件得到(“问候”)
  5. 添加一个新的细胞低于第一个单元格和下面的Python代码复制并粘贴到新的细胞:

    打印(“你好{}格式(问候))

创建一个工作

  1. 点击工作图标”src=工作流在侧边栏。

  2. 点击创建工作按钮”src=

    任务选项卡显示了创建任务对话框。

    创建第一个任务对话框”src=
  3. 取代添加一个名称为你的工作…对你的工作名称。

  4. 任务名称字段中,输入任务的名称,例如,greeting-task

  5. 类型下拉,选择笔记本

  6. 使用文件浏览器来找到您创建的笔记本,点击笔记本名称,点击确认

  7. 点击添加参数。在关键字段中,输入问候。在价值字段中,输入气流用户

  8. 点击创建任务

运行作业

立即运行任务,点击现在运行按钮”src=在右上角。您还可以通过单击运行工作运行选项卡并单击现在运行活跃的运行表。

查看运行细节

  1. 单击运行选项卡并单击查看详细信息活跃的运行表或完成运行60天(过去)表。

  2. 复制工作ID价值。这个值需要触发工作从气流。

    查看示例的工作结果”src=

创建一个砖个人访问令牌的气流

请注意

作为一个安全最佳实践进行身份验证时使用自动化工具,系统、脚本和应用程序,砖建议您使用OAuth令牌或个人访问令牌属于服务主体而不是用户工作区。为服务主体,创建令牌服务主体的管理令牌

气流连接使用砖砖个人访问令牌(PAT)。看到个人访问令牌为创建一个帕特的说明。

配置一个砖连接

砖的气流安装包含一个默认的连接。更新连接连接到您的工作空间中使用上面创建的个人访问令牌:

  1. 在一个浏览器窗口,打开http://localhost: 8080 /联系/清单/

  2. 康涅狄格州ID,定位databricks_default并单击编辑记录按钮。

  3. 取代的价值主机场的工作区实例名你的砖部署。

  4. 额外的字段中,输入下列值:

    {“令牌”:“PERSONAL_ACCESS_TOKEN”}

    取代PERSONAL_ACCESS_TOKEN与你的砖个人访问令牌。

创建一个新的气流DAG

你在一个Python文件定义一个气流DAG。创建一个DAG触发的例子笔记本工作:

  1. 在文本编辑器中或者IDE,创建一个新文件命名databricks_dag.py用下面的内容:

    气流进口DAGairflow.providers.databricks.operators.databricks进口DatabricksRunNowOperatorairflow.utils.dates进口days_agodefault_args={“主人”:“气流”}DAG(“databricks_dag”,start_date=days_ago(2),schedule_interval=没有一个,default_args=default_args)作为dag:opr_run_now=DatabricksRunNowOperator(task_id=“run_now”,databricks_conn_id=“databricks_default”,job_id=JOB_ID)

    取代JOB_ID早些时候与工作ID的值保存。

  2. 保存文件气流/无进取心的人目录中。气流自动读取和安装DAG文件存储在气流/无进取心的人

安装和验证DAG的气流

触发并验证的DAG气流界面:

  1. 在一个浏览器窗口,打开http://localhost: 8080 / home。气流熟练的技艺屏幕上出现了。

  2. 定位databricks_dag并单击暂停/ Unpause DAG切换unpause DAG。

  3. 触发DAG点击开始按钮。

  4. 点击运行运行列视图状态和运行的细节。