编排与Apache气流砖工作
这篇文章展示了一个示例的编排与Apache数据管道气流砖工作。您还将了解如何设置气流与砖的集成。编制管理工作复杂任务之间的依赖关系。
工作编制的数据管道
开发和部署一个数据处理管道通常需要管理复杂任务之间的依赖关系。例如,管道可能读取数据从源,清理数据,将清洗数据,转换后的数据写入一个目标。您需要测试、进度和解决数据管道时实施。
工作流系统应对这些挑战,允许您定义任务之间的依赖关系,安排当管道运行和监控工作流。Apache气流是一个开源的bob下载地址解决方案来管理和调度的数据管道。气流代表数据管道有向无环图(无进取心的人)的操作。你定义一个工作流在Python文件和气流管理调度和执行。气流砖连接允许您利用优化的火花引擎提供的砖与气流的调度功能。
安装气流砖集成
安装气流砖集成,打开终端并运行下列命令。一定要用您的用户名和电子邮件在最后一行:
mkdir气流cd气流pipenv——python38 pipenv壳出口AIRFLOW_HOME=$ (松材线虫病)pipenv安装apache-airflow= =2.1.0 pipenv安装apache-airflow-providers-databricks mkdir熟练的技艺气流db init气流用户创建用户名admin——firstname < firstname >——lastname < lastname >——角色管理——电子邮件your.com
当你复制和运行脚本,您执行这些步骤:
创建一个目录命名
气流
和更改到该目录。使用
pipenv
创建和产卵Python虚拟环境。砖建议使用Python的虚拟环境隔离包版本和代码依赖环境。这种隔离有助于减少意外包版本不匹配和代码依赖碰撞。初始化一个环境变量命名
AIRFLOW_HOME
设置的路径气流
目录中。安装气流,气流砖提供者包。
创建一个
气流/无进取心的人
目录中。气流使用熟练的技艺
目录存储DAG定义。初始化一个气流SQLite数据库用来跟踪的元数据。在生产气流部署,您将配置气流与一个标准的数据库。气流的SQLite数据库和默认配置部署中初始化
气流
目录中。创建管理员用户对气流。
安装临时演员例如,芹菜
,s3
,密码
运行:
pip安装“apache-airflow(砖、芹菜、s3、密码)”
启动气流web服务器和调度器
气流web服务器需要查看气流UI。启动web服务器,打开终端并运行下列命令:
气流网络服务器
调度器是气流组件,日程安排熟练的技艺。要运行它,打开一个新的终端和运行以下命令:
pipenv壳出口AIRFLOW_HOME=$ (松材线虫病)气流调度器
气流安装进行测试
验证气流安装,您可以运行一个例子与气流无进取心的人包括:
在一个浏览器窗口,打开http://localhost: 8080 / home。气流熟练的技艺屏幕上出现了。
单击暂停/ Unpause DAG切换到unpause装饰边的一个例子,例如,
example_python_operator
。点击触发DAG的例子开始按钮。
单击DAG名称查看详细信息,包括DAG的运行状态。
气流运营商的砖
气流砖集成提供了两个不同的操作触发的工作:
的DatabricksRunNowOperator需要一个现有的砖的工作和使用引发新工作运行(
帖子/工作/运行
)API请求触发运行。砖推荐使用DatabricksRunNowOperator
因为它可以减少重复工作的定义和工作运行触发这个操作符很容易找到的工作界面。的DatabricksSubmitRunOperator不需要工作存在在砖和使用创建并触发一次运行(
帖子/ /运行/提交工作
)API请求提交作业规范和触发运行。
砖气流操作符写作业运行气流日志每一个页面的URLpolling_period_seconds
(默认是30秒)。有关更多信息,请参见apache-airflow-providers-databricks包在气流的网站页面。
运行一个砖气流的工作
下面的例子演示了如何创建一个简单的气流部署在本地机器上运行和部署一个例子DAG触发运行在砖。对于本例,您:
创建一个新的笔记本和添加代码来打印一个问候根据配置参数。
创建一个砖工作运行笔记本的一个任务。
配置一个气流联系砖工作区。
创建一个气流DAG触发笔记本工作。你在一个Python脚本使用定义DAG
DatabricksRunNowOperator
。利用气流UI触发DAG并查看运行状态。
创建一个笔记本
这个例子使用一个笔记本,其中包含两个单元:
第一个单元格包含一个砖实用程序的文本小部件定义一个变量命名
问候
设置为默认值世界
。第二个单元格打印的值
问候
变量的前缀你好
。
创建笔记本:
去你的砖着陆页面并选择创建空白笔记本,或点击新在侧边栏并选择笔记本。的创建笔记本对话框出现了。
在创建笔记本对话框中,给你的笔记本一个名字,例如你好气流。集默认的语言来Python。离开集群设置为默认值。您将配置集群创建任务时使用这个笔记本。
点击创建。
复制下面的Python代码粘贴到第一个单元格的笔记本。
dbutils。小部件。文本(“问候”,“世界”,“问候”)问候=dbutils。小部件。得到(“问候”)
添加一个新的细胞低于第一个单元格和下面的Python代码复制并粘贴到新的细胞:
打印(“你好{}”。格式(问候))
创建一个工作
点击工作流在侧边栏。
点击。
的任务选项卡显示了创建任务对话框。
取代添加一个名称为你的工作…对你的工作名称。
在任务名称字段中,输入任务的名称,例如,greeting-task。
在类型下拉,选择笔记本。
使用文件浏览器来找到您创建的笔记本,点击笔记本名称,点击确认。
点击添加下参数。在关键字段中,输入
问候
。在价值字段中,输入气流用户
。点击创建任务。
创建一个砖个人访问令牌的气流
气流连接使用砖砖个人访问令牌(PAT)。看到个人访问令牌为创建一个帕特的说明。
配置一个砖连接
砖的气流安装包含一个默认的连接。更新连接连接到您的工作空间中使用上面创建的个人访问令牌:
在一个浏览器窗口,打开http://localhost: 8080 /联系/清单/。
下康涅狄格州ID,定位databricks_default并单击编辑记录按钮。
取代的价值主机场的工作区实例名你的砖部署。
在额外的字段中,输入下列值:
{“令牌”:“PERSONAL_ACCESS_TOKEN”}
取代
PERSONAL_ACCESS_TOKEN
与你的砖个人访问令牌。
创建一个新的气流DAG
你在一个Python文件定义一个气流DAG。创建一个DAG触发的例子笔记本工作:
在文本编辑器中或者IDE,创建一个新文件命名
databricks_dag.py
用下面的内容:从气流进口DAG从airflow.providers.databricks.operators.databricks进口DatabricksRunNowOperator从airflow.utils.dates进口days_agodefault_args={“主人”:“气流”}与DAG(“databricks_dag”,start_date=days_ago(2),schedule_interval=没有一个,default_args=default_args)作为dag:opr_run_now=DatabricksRunNowOperator(task_id=“run_now”,databricks_conn_id=“databricks_default”,job_id=JOB_ID)
取代
JOB_ID
早些时候与工作ID的值保存。保存文件
气流/无进取心的人
目录中。气流自动读取和安装DAG文件存储在气流/无进取心的人
。
安装和验证DAG的气流
触发并验证的DAG气流界面:
在一个浏览器窗口,打开http://localhost: 8080 / home。气流熟练的技艺屏幕上出现了。
定位
databricks_dag
并单击暂停/ Unpause DAG切换unpause DAG。触发DAG点击开始按钮。
点击运行运行列视图状态和运行的细节。