开始
加载和管理数据
处理数据
政府
引用和资源
2023年8月2日更新
给我们反馈
这篇文章展示了一个示例的编排与Apache数据管道气流砖工作。您还将了解如何设置气流与砖的集成。编制管理工作复杂任务之间的依赖关系。
开发和部署一个数据处理管道通常需要管理复杂任务之间的依赖关系。例如,管道可能读取数据从源,清理数据,将清洗数据,转换后的数据写入一个目标。您需要测试、进度和解决数据管道时实施。
工作流系统应对这些挑战,允许您定义任务之间的依赖关系,安排当管道运行和监控工作流。Apache气流是一个开源的bob下载地址解决方案来管理和调度的数据管道。气流代表数据管道有向无环图(无进取心的人)的操作。你定义一个工作流在Python文件和气流管理调度和执行。气流砖连接允许您利用优化的火花引擎提供的砖与气流的调度功能。
气流和砖之间的集成是1.9.0后来气流中可用版本。本文中的示例与气流2.1.0版测试。
气流需要Python 3.6、3.7或3.8。这篇文章中的示例是用Python 3.8做了试验。
安装气流砖集成,打开终端并运行下列命令。一定要用您的用户名和电子邮件在最后一行:
mkdir气流cd气流pipenv——python38 pipenv壳出口AIRFLOW_HOME=$ (松材线虫病)pipenv安装apache-airflow= =2.1.0 pipenv安装apache-airflow-providers-databricks mkdir熟练的技艺气流db init气流用户创建用户名admin——firstname < firstname >——lastname < lastname >——角色管理——电子邮件your.com
当你复制和运行脚本,您执行这些步骤:
创建一个目录命名气流和更改到该目录。
气流
使用pipenv创建和产卵Python虚拟环境。砖建议使用Python的虚拟环境隔离包版本和代码依赖环境。这种隔离有助于减少意外包版本不匹配和代码依赖碰撞。
pipenv
初始化一个环境变量命名AIRFLOW_HOME设置的路径气流目录中。
AIRFLOW_HOME
安装气流,气流砖提供者包。
创建一个气流/无进取心的人目录中。气流使用熟练的技艺目录存储DAG定义。
气流/无进取心的人
熟练的技艺
初始化一个气流SQLite数据库用来跟踪的元数据。在生产气流部署,您将配置气流与一个标准的数据库。气流的SQLite数据库和默认配置部署中初始化气流目录中。
创建管理员用户对气流。
安装临时演员例如,芹菜和密码运行:
芹菜
密码
pip安装“apache-airflow(砖、芹菜、密码)”
气流web服务器需要查看气流UI。启动web服务器,打开终端并运行下列命令:
气流网络服务器
调度器是气流组件,日程安排熟练的技艺。要运行它,打开一个新的终端和运行以下命令:
pipenv壳出口AIRFLOW_HOME=$ (松材线虫病)气流调度器
验证气流安装,您可以运行一个例子与气流无进取心的人包括:
在一个浏览器窗口,打开http://localhost: 8080 / home。气流熟练的技艺屏幕上出现了。
单击暂停/ Unpause DAG切换到unpause装饰边的一个例子,例如,example_python_operator。
example_python_operator
点击触发DAG的例子开始按钮。
单击DAG名称查看详细信息,包括DAG的运行状态。
气流砖集成提供了两个不同的操作触发的工作:
的DatabricksRunNowOperator需要一个现有的砖的工作和使用引发新工作运行(帖子/工作/运行)API请求触发运行。砖推荐使用DatabricksRunNowOperator因为它可以减少重复工作的定义和工作运行触发这个操作符很容易找到的工作界面。
帖子/工作/运行
DatabricksRunNowOperator
的DatabricksSubmitRunOperator不需要工作存在在砖和使用创建并触发一次运行(帖子/ /运行/提交工作)API请求提交作业规范和触发运行。
帖子/ /运行/提交工作
砖气流操作符写作业运行气流日志每一个页面的URLpolling_period_seconds(默认是30秒)。有关更多信息,请参见apache-airflow-providers-databricks包在气流的网站页面。
polling_period_seconds
下面的例子演示了如何创建一个简单的气流部署在本地机器上运行和部署一个例子DAG触发运行在砖。对于本例,您:
创建一个新的笔记本和添加代码来打印一个问候根据配置参数。
创建一个砖工作运行笔记本的一个任务。
配置一个气流联系砖工作区。
创建一个气流DAG触发笔记本工作。你在一个Python脚本使用定义DAGDatabricksRunNowOperator。
利用气流UI触发DAG并查看运行状态。
这个例子使用一个笔记本,其中包含两个单元:
第一个单元格包含一个砖实用程序的文本小部件定义一个变量命名问候设置为默认值世界。
问候
世界
第二个单元格打印的值问候变量的前缀你好。
你好
创建笔记本:
去你的砖着陆页面并选择创建空白笔记本,或点击新在侧边栏并选择笔记本。的创建笔记本对话框出现了。
在创建笔记本对话框中,给你的笔记本一个名字,例如你好气流。集默认的语言来Python。离开集群设置为默认值。您将配置集群创建任务时使用这个笔记本。
点击创建。
复制下面的Python代码粘贴到第一个单元格的笔记本。
dbutils。小部件。文本(“问候”,“世界”,“问候”)问候=dbutils。小部件。得到(“问候”)
添加一个新的细胞低于第一个单元格和下面的Python代码复制并粘贴到新的细胞:
打印(“你好{}”。格式(问候))
点击工作流在侧边栏。
点击。
的任务选项卡显示了创建任务对话框。
取代添加一个名称为你的工作…对你的工作名称。
在任务名称字段中,输入任务的名称,例如,greeting-task。
在类型下拉,选择笔记本。
使用文件浏览器来找到您创建的笔记本,点击笔记本名称,点击确认。
点击添加下参数。在关键字段中,输入问候。在价值字段中,输入气流用户。
气流用户
点击创建任务。
立即运行任务,点击在右上角。您还可以通过单击运行工作运行选项卡并单击现在运行在活跃的运行表。
单击运行选项卡并单击查看详细信息在活跃的运行表或完成运行60天(过去)表。
复制工作ID价值。这个值需要触发工作从气流。
请注意
安全最佳实践,当您使用自动化工具进行身份验证,系统中,脚本和应用程序,砖属于建议您使用个人访问令牌服务主体而不是用户工作区。为服务主体,创建令牌服务主体的管理令牌。
气流连接使用砖砖个人访问令牌(PAT)。看到个人访问令牌为创建一个帕特的说明。
砖的气流安装包含一个默认的连接。更新连接连接到您的工作空间中使用上面创建的个人访问令牌:
在一个浏览器窗口,打开http://localhost: 8080 /联系/清单/。
下康涅狄格州ID,定位databricks_default并单击编辑记录按钮。
取代的价值主机场的工作区实例名你的砖部署。
在额外的字段中,输入下列值:
{“令牌”:“PERSONAL_ACCESS_TOKEN”}
取代PERSONAL_ACCESS_TOKEN与你的砖个人访问令牌。
PERSONAL_ACCESS_TOKEN
你在一个Python文件定义一个气流DAG。创建一个DAG触发的例子笔记本工作:
在文本编辑器中或者IDE,创建一个新文件命名databricks_dag.py用下面的内容:
databricks_dag.py
从气流进口DAG从airflow.providers.databricks.operators.databricks进口DatabricksRunNowOperator从airflow.utils.dates进口days_agodefault_args={“主人”:“气流”}与DAG(“databricks_dag”,start_date=days_ago(2),schedule_interval=没有一个,default_args=default_args)作为dag:opr_run_now=DatabricksRunNowOperator(task_id=“run_now”,databricks_conn_id=“databricks_default”,job_id=JOB_ID)
取代JOB_ID早些时候与工作ID的值保存。
JOB_ID
保存文件气流/无进取心的人目录中。气流自动读取和安装DAG文件存储在气流/无进取心的人。
触发并验证的DAG气流界面:
定位databricks_dag并单击暂停/ Unpause DAG切换unpause DAG。
databricks_dag
触发DAG点击开始按钮。
点击运行运行列视图状态和运行的细节。