Amazon MWAA 实战分享 – Glue Job 任务调度

Serverless
0
0
[![image.png](https://dev-media.amazoncloud.cn/dfbf0f986f494f65a6ea77a6e8526702_image.png "image.png")](https://summit.awsevents.cn/2023/form.html?source=aHMZ6Q20We4igheElTULyiA9EY0oZ3rM/VD+PZulcC8S8qmXIkr6oo5CBkqLbtp7) ## **服务及场景介绍** ### **Amazon MWAA** Amazon MWAA (Amazon Managed Workflows for Apache Airflow) 是 Apache Airflow 的一项托管服务,让您可以使用当前熟悉的 Apache Airflow 平台来编排您的工作流程。您可以获得更高的可扩展性、可用性和安全性,而无需承担管理底层基础设施的运营负担。 ### **Amazon Glue** Amazon Glue 是一项无服务器数据集成服务,可让使用分析功能的用户轻松发现、准备、移动和集成来自多个来源的数据。您可以将其用于分析、机器学习和应用程序开发。它还包括用于编写、运行任务和实施业务工作流程的额外生产力和数据操作工具。 ### **场景介绍** 本文以典型数仓数据集成业务场景中的“抽数”场景为例,演示如何通过 Amazon MWAA 调度 Amazon Glue Job,抽取业务数据库(Amazon Aurora)数据到数仓(Amazon Redshift)中。 ### **Amazon Glue Job 任务开发** Amazon Glue 默认支持丰富的数据源同时也支持自定义 connector,任务开发方式提供可视化编辑界面以及 Jupyter Notebook 形式的 Spark/Pyspark 脚本开发。由于篇幅有限,本文将不详细展开 Amazon Glue Job 任务开发流程,简要流程如下: - **创建数据源及目标的 connection,详细可参考 Glue-Connections** - **任务逻辑开发**  ○ 通过 Amazon Glue Studio 进行可视化开发 Amazon Glue Studio  ○ 通过 Notebook 开发脚本 Notebook with Glue Studio - **验证任务能正常运行** **作者本次演示的 Amazon Glue Job 样例脚本代码**: ``` import sys from awsglue.trans forms import * from awsglue.utils import getResolvedOptions from pyspark.context import SparkContext from awsglue.context import GlueContext from awsglue.job import Jobfrom awsglue import DynamicFrame args = getResolvedOptions(sys.argv, ["JOB_NAME"]) sc = SparkContext() glueContext = GlueContext(sc) spark = glueContext.spark_session job = Job(glueContext) job.init(args["JOB_NAME"], args) # Script generated for node MySQL table MySQLtable_node1 = glueContext.create_dynamic_frame.from_catalog(     database="demo_dataset", table_name="mysql_redshift_bigdata_test_visit_log_batch", transformation_ctx="MySQLtable_node1", ) # Script generated for node ApplyMapping ApplyMapping_node2 = ApplyMapping.apply(     frame=MySQLtable_node1,     mappings=[         ("pt", "int", "pt", "int"),         ("visit_time", "timestamp", "visit_time", "timestamp"), ("product_id", "string", "product_id", "string"),     ],     transformation_ctx="ApplyMapping_node2", ) # Script generated for node Amazon RedshiftAmazonRedshift_node3 = glueContext.write_dynamic_frame.from_options( frame=ApplyMapping_node2,     connection_type="redshift", connection_options={         "redshiftTmpDir": "s3://aws-glue-assets-xxx-us-west-2/temporary/",         "useConnectionProperties": "true",         "dbtable": "public.bigdata_test_visit_log_batch", "connectionName": "jerry-demo-redshift-connection", "preactions": "CREATE TABLE IF NOT EXISTS public.bigdata_test_visit_log_batch (pt INTEGER, visit_time TIMESTAMP, product_id VARCHAR); TRUNCATE TABLE public.bigdata_test_visit_log_batch;",     }, transformation_ctx="AmazonRedshift_node3", ) ``` ## **Amazon MWAA 环境配置以及 DAG 脚本开发** ### **配置环境** - **创建存放工程文件的 S3 桶** ○ 创建 dag/ 路径 ○ 创建 plugins/ 路径 ○ 创建 requirements/ 路径 ○ 创建 scripts/ 路径 - **创建 Amazon MWAA environment** ○ 选择版本,本次演示选择 5.1 版本 ○ 指定上面创建的 S3 桶,其中包括: \- DAGs 文件路径\ \- Plugins 文件路径(可选) \- Requirements 文件路径(可选) \- 启动脚本文件路径(可选) ○ 网络配置 \- VPC 配置(由于 Amazon MWAA 网络环境要求复杂,建议选择通过模板新建 VPC) \- Web Server UI 公网访问(本次演示选择公网访问打开,则以 IAM User/Role 进行 SSO 登录) \- 安全组(同样建议选择新建安全组) \- 节点规格选择 \- 数据加密(可选) \- 日志监控(本次演示为默认配置,实战中建议按需打开 scheduler 和 worker 日志以及配置日志级别,方便排错) \- Execution Role(建议选择新建,随后根据项目需求再额外添加所需权限。本次演示中为了简便,作者额外添加了 IAM 和 Glue 的 Full Access 权限,实际场景下请按照最小权限原则进行配置) ○ 等待环境部署完成(需要15分钟左右) ○ 通过控制台登录 Airflow UI ![image.png](https://dev-media.amazoncloud.cn/3581118566304e18822bb27ab792654d_image.png "image.png") ![image.png](https://dev-media.amazoncloud.cn/e1c708edba734980abd8bfec2b27c910_image.png "image.png") ### **DAG 脚本开发** Airflow 原生提供非常丰富的 operator,为任务编排提供极大的灵活度,本次演示中,作者引入 GlueJobOperator 来进行对 Glue Job 的调度。 本次演示中的样例 DAG 代码 jerry-demo-mysql-redshift-dag.py 如下: ``` from airflow import DAG from airflow.providers.amazon.aws.operators.glue import GlueJobOperator from airflow.utils.dates import days_ago import os from datetime import datetime as dt DAG_ID = os.path.basename(__file__).replace(".py", "") hour = dt.now().strftime("%Y%m%d%H") with DAG(dag_id=DAG_ID, schedule_interval="0 * * * *", catchup=False, start_date=days_ago(1)) as dag:     submit_glue_job = GlueJobOperator(         task_id="jerry-demo-mysql-redshift-updated", job_name="jerry-demo-mysql-redshift-updated", script_location=f"s3://aws-glue-assets-xxx-us-west-2/scripts/jerry-demo-mysql-redshift.py",         s3_bucket="aws-glue-assets-xxx-us-west-2", iam_role_name="glue_s3_full_access",         create_job_kwargs={             "GlueVersion": "4.0",             "NumberOfWorkers": 2,             "WorkerType": "G.1X",             "Connections":{"Connections":["jerry-demo-aurora","jerry-demo-redshift-connection"]},             "DefaultArguments": {                 "--enable-auto-scaling": "true",                 "--max-num-workers": "10",                 "--enable-metrics": "true",                 "--metrics-sample-rate": "1",                 "--job-bookmark-option": "job-bookmark-disable", "--enable-continuous-cloudwatch-log": "true", "--log-level": "INFO",                 "--enable-glue-datacatalog": "true",                 "--enable-spark-ui": "true", "--enable-job-insights": "true",                 "--TempDir": "s3://aws-glue-assets-xxx-us-west-2/temporary/",                 "--spark-event-logs-path": "s3://aws-glue-assets-xxx-us-west-2/sparkHistoryLogs/"                 }         }     ) submit_glue_job ``` 重要参数说明: dag_id:Airflow 中 DAG ID task_id:Airflow 中 DAG 任务里面单一 task id\ job_name:Glue Job 名称 script_location:Glue Job 脚本存放路径 s3\_bucket:Glue Job 运行日志存放路径 iam_role_name:Glue Job 运行时赋予的角色名称(无需完整 ARN) create_job_kwargs:Glue Job 额外配置参数 将开发好的 DAG 脚本上传至之前定义好的 S3 桶下的 dag/路径之后,随即就能在 Airflow UI 上看到 DAG 任务的展示,然后可以手动触发任务执行。 ![image.png](https://dev-media.amazoncloud.cn/2a0c57cc73d04b24a2ad378616a78de0_image.png "image.png") 任务启动后,可以到 Amazon Glue 控制台 ETL Jobs 页面查看任务已经被调度起来了。 ![image.png](https://dev-media.amazoncloud.cn/0dd1becca37540f2a7551827b2c517ce_image.png "image.png") 至此已简单演示完如何通过 Amazon MWAA 服务统一进行 Glue ETL 任务调度。在后续内容中,作者还会给大家演示如何进行 Cross DAG 的上下游依赖调度,以及如何进行基于任务责任人和任务颗粒度的告警通知。 **2023亚马逊云科技中国峰会即将开启!** **👇👇👇点击下方图片即刻注册👇👇👇** [![image.png](https://dev-media.amazoncloud.cn/c0795290db5d4abeb4be2fc9e315ffea_image.png "image.png")](https://summit.awsevents.cn/2023/form.html?source=aHMZ6Q20We4igheElTULyiA9EY0oZ3rM/VD+PZulcC8S8qmXIkr6oo5CBkqLbtp7?trk=cndc-detail )
目录
亚马逊云科技解决方案 基于行业客户应用场景及技术领域的解决方案
联系亚马逊云科技专家
亚马逊云科技解决方案
基于行业客户应用场景及技术领域的解决方案
联系专家
0
目录
关闭
contact-us