使用DynamoDB 简化MWAA(Airflow)任务调度开发

Amazon Managed Workflows for Apache Airflow
0
1
## 1 背景介绍 我们看到越来越多的商业应用对于数据处理任务的调度提出需求,无论是处理原始数据清洗,数据仓库的分层任务,亦或是复杂大规模计算的场景。任务管理编排服务被更多业务场景所提及,Amazon Managed Workflows for Apache Airflow 是一项适用于 Apache Air flow 的托管式编排服务,让您能够在云中大规模设置和操作数据管道。 Apache Airflow 是一种开源工具,用于以编程方式编写、安排和监控称为工作流的流程和任务序列。借助 Amazon MWAA,您可以使用 Apache Airflow 和 Python 来创建工作流程,而无需管理底层基础设施以实现可扩展性、可用性和安全性。Amazon MWAA 会自动扩展其工作流程执行能力以满足您的需求,Amazon MWAA 集成了AWS安全服务,可帮助您快速安全地访问数据。 MWAA 服务有着诸多优点,如灵活创建不同版本的Airflow集群,可以自动扩展组件,通过设置在您的环境中运行的最小和最大工作线程数,自动扩展Worker。集成启用基于角色的身份验证和授权等等。但对于Airflow本身,在进行任务编排及开发需要编写大量的Python脚本,用户有着一定的学习和使用成本。我们也看到,其中大部分任务的代码几乎都有通用性。本文提供一种使用serverless 数据库 DynamoDB来管理Airflow 任务,并利用Airflow scheduler自动生成DAG的方式,使用户仅需编写通用任务插件,通过DynamoDB 表格管理任务及作业参数,包含依赖关系,任务具体执行自定义参数等,自动生成Airflow DAG 并自定义时间运行调度任务。 ## 2. 设计实现思路 ### 2.1 任务组织 在实践中,常见的,一个具体的数据处理任务称之为Task,例如一次Redshift SQL执行,一次Spark 批任务处理或一次Python脚本的运行。本方案实现的功能,会提供一个基础的Task模版类,并提供Redshift SQL 任务示例代码。帮助用户编写通用Task方法。在本方案中,一个Job DAG 由多个Task构成,并且可以组织其Task 依赖关系。为了兼容更加复杂的场景,添加了自定义Scheduler DAG,它由多个Job组成,在Scheduler DAG统一设置定时触发策略,作为作业执行的主DAG,按照Job之间依赖依次触发具体job执行。在这样的设计里,业务可以实现诸如先并发处理多个分散业务模块的数据处理,再进行汇总任务的处理。设计图示如下。 ![image.png](https://dev-media.amazoncloud.cn/4ad5357d8c024361925e647e4cbd7d0c_image.png "image.png") ### 2.2 架构实现 结合DynamoDB Serverless数据库支持任意规模数据的快速读取特性,将Task,Job,Scheduler在DynamoDB Table中记录管理。编写统一的DAG 文件部署到MWAA S3桶中,Airflow定时扫描数据库里新的任务记录,自动生成DAG。通过自定义定时触发运行Task,如Redshift、Athena、EMR等任务,并在Task里可以配置监控告警,发送到日志及邮件等告警。 ![image.png](https://dev-media.amazoncloud.cn/d7faaa69efc44f7da3d47b637840fb5c_image.png "image.png") ## 3 方案部署指南 源码链接:https://github.com/SEZ9/Airflow-dynamic-dags ### 3.1 创建DynamoDB表 创建三张DynamoDB表格分别管理Scheduler、Job,Task,命名如下 mwaa_scheduler,mwaa_job,mwaa_task。其中表字段及说明如下。 表名:mwaa_scheduler 分区键:schedule_id 属性字段: job_list 字段类型:字符串;解释:关联的job id,示例:1,2 schedule_description 字段类型:字符串;解释:调度器的描述信息 schedule_job_configs 字段类型:字符串;解释:记录调度器的额外信息 schedule_job_dependencies 字段类型:字符串;解释:job间的依赖关系,示例:[{"job_id":"1","pid":"2"}] schedule_name 字段类型:字符串;解释:调度器的名称,必须为英文 schedule_params 字段类型:字符串;解释:调度器配置参数,示例:{"schedule_interval":"0 0 12 * * "} status 字段类型 数字;解释:调度器状态 1为启用 0 为禁用 表名:mwaa_job 分区键:job_type 排序字段:job_id 属性字段: job_name 字段类型:字符串;解释:作业名称 job_params 字段类型:字符串;解释:作业的额外参数 job_task_configs 字段类型:字符串;解释:关联task的额外参数 job_task_dependencies 字段类型:字符串;解释:Task间的依赖关系,示例:[{"task_id":"1","pid":"2"}] status 字段类型 数字;解释:作业状态 1为启用 0 为禁用 表名:mwaa_task 分区键:task_id 属性字段: task_name 字段类型:字符串;解释:任务名称 task_params 字段类型:字符串;解释:任务参数,示例:{"sql":""} task_type 字段类型:字符串;解释:任务类型 ### 3.2 修改代码配置文件 在代码路径中找到config/conf.py文件修改公共参数,如数据表名及区域。每个环境的有关任务的执行参数,如Redshift 的地址及端口用户名密码登。以Redshift Task示例。 ![image.png](https://dev-media.amazoncloud.cn/704e380d62c449beba8841f024c17b51_image.png "image.png") ``` class ProductionConfig(BaseConfig): REDSHIFT_REGION = 'us-east-1' Scheduler_DYNAMODB_TABLE = 'mwaa_scheduler' JOB_DYNAMODB_TABLE = 'mwaa_job' TASK_DYNAMODB_TABLE = 'mwaa_task' REDSHIFT_HOST = 'xxxx' REDSHIFT_PORT = 5439 REDSHIFT_USER = 'xxxx' RDSHIFT_PWD = 'xxxx’` ``` ### 3.3 创建MWAA集群,并部署DAG 代码 创建集群参考文档: https://docs.aws.amazon.com/zh_cn/mwaa/latest/userguide/get-started.html 部署后点击进入Airflow UI,编辑环境参数,指明环境是生产还是测试,区分不同环境的参数获取。 ![image.png](https://dev-media.amazoncloud.cn/b025710c6a694ee5ba473cd2488feb76_image.png "image.png") ### 3.4 在DynamoDB表格中编辑任务 在DynamoDB创建Task,通过DynamoDB console 进入表格添加项目填写对应任务参数 ![image.png](https://dev-media.amazoncloud.cn/d54f8016d3e84f8d8fadd3c01d39f0b8_image.png "image.png") 在DynamoDB创建Job,通过DynamoDB console 进入表格添加项目填写对应任务参数 ![image.png](https://dev-media.amazoncloud.cn/f678fece15024f0fb8953b276ef1abf5_image.png "image.png") 在DynamoDB创建Scheduler,通过DynamoDB console 进入表格添加项目填写对应任务参数 ![image.png](https://dev-media.amazoncloud.cn/06cbce6ca63048ae9f5941d77cccd54f_image.png "image.png") 等待2分钟左右,可以在Airflow UI 看到自动生成的DAG ![image.png](https://dev-media.amazoncloud.cn/745306c6f809466397373121e0f7f9cc_image.png "image.png") 查看Scheduler 的Job依赖 ![image.png](https://dev-media.amazoncloud.cn/feaa665fe55543c1b76ae022d77b56cd_image.png "image.png") 查看Job的Task 依赖 ![image.png](https://dev-media.amazoncloud.cn/1fe4b8a66b594c2cb95d4d0787eb326b_image.png "image.png") 等待预定的运行时间或手动触发schedule DAG 执行任务 ![image.png](https://dev-media.amazoncloud.cn/21ea1d3a2f0d44c28868a1fa28a5c145_image.png "image.png") ### 3.5 自定义开发Task 模版 本方案提供了一个基础的BaseTaskFactory 类,在此基础上,用户可以自定义实现不同Task类型的实现。 首先在task_factory 中创建一个task 实现类并继承BaseTaskFactory,参考示例代码编写task执行逻辑。 ![image.png](https://dev-media.amazoncloud.cn/7049d9cf08c74e3da3da821f46a4b79d_image.png "image.png") 接着在config/conf.py 中编写任务类型与Task类映射关系 ``` TASK_FACTORY_MAPPING_DICT = { "REDSHIFT-SQL": "BasicRedshiftTaskFactory" } ``` 最后,通过在DynamoDB 中task管理表中指定Task类型即可实现对于task的调用。 ## 4 总结 本文介绍了如何使用DynamoDB,利用MWAA Airflow动态DAG,简化MWAA开发工作,用户仅需实现通用Task的逻辑编写,通过DynamoDB表格数据管理任务执行及互相依赖。并且提供了Redshift调用的实现逻辑参考,将大大简化用户在使用Airflow进行任务开发的工作,提升生产效率。 ## 参考链接 MWAA 官方文档 https://docs.aws.amazon.com/zh_cn/mwaa/latest/userguide/what-is-mwaa.html Airflow Dynamic DAGs https://airflow.apache.org/docs/apache-airflow/stable/core-concepts/dags.html#dynamic-dags Airflow Architecture Overview https://airflow.apache.org/docs/apache-airflow/stable/core-concepts/overview.html Flink on Zeppelin 作业管理系统实践 https://mp.weixin.qq.com/s/438sJdt44Cj1Yj0TF0je1Q ## 作者简介 **Qixin(Taylor) Li** 某大型上市证券交易所高级数据工程师,毕业于UCLA,曾任职于某电商独角兽企业。有着丰富的数据分析及工程经验,熟悉数据仓库建设、数据平台设计,擅长任务调度系统、数据处理工具的开发。 **张鑫** AWS解决方案架构师,负责基于AWS云平台的解决方案咨询和设计,在系统架构、数仓和实时离线计算领域有丰富的研发和架构实践经验。
1
目录
关闭
contact-us