[![image.png](https://dev-media.amazoncloud.cn/dfbf0f986f494f65a6ea77a6e8526702_image.png "image.png")](https://summit.awsevents.cn/2023/form.html?source=aHMZ6Q20We4igheElTULyiA9EY0oZ3rM/VD+PZulcC8S8qmXIkr6oo5CBkqLbtp7)
## **服务及场景介绍**
### **Amazon MWAA**
Amazon MWAA ([Amazon Managed Workflows for Apache Airflow](https://aws.amazon.com/cn/managed-workflows-for-apache-airflow/?trk=cndc-detail)) 是 Apache Airflow 的一项托管服务,让您可以使用当前熟悉的 Apache Airflow 平台来编排您的工作流程。您可以获得更高的可扩展性、可用性和安全性,而无需承担管理底层基础设施的运营负担。
### **Amazon Glue**
Amazon Glue 是一项[无服务器](https://aws.amazon.com/cn/serverless/?trk=cndc-detail)数据集成服务,可让使用分析功能的用户轻松发现、准备、移动和集成来自多个来源的数据。您可以将其用于分析、[机器学习](https://aws.amazon.com/cn/machine-learning/?trk=cndc-detail)和应用程序开发。它还包括用于编写、运行任务和实施业务工作流程的额外生产力和数据操作工具。
### **场景介绍**
本文以典型数仓数据集成业务场景中的“抽数”场景为例,演示如何通过 Amazon MWAA 调度 Amazon Glue Job,抽取业务数据库([Amazon Aurora](https://aws.amazon.com/cn/rds/aurora/?trk=cndc-detail))数据到数仓([Amazon Redshift](https://aws.amazon.com/cn/redshift/?trk=cndc-detail))中。
### **Amazon Glue Job 任务开发**
Amazon Glue 默认支持丰富的数据源同时也支持自定义 connector,任务开发方式提供可视化编辑界面以及 Jupyter Notebook 形式的 Spark/Pyspark 脚本开发。由于篇幅有限,本文将不详细展开 Amazon Glue Job 任务开发流程,简要流程如下:
- **创建数据源及目标的 connection,详细可参考 Glue-Connections**
- **任务逻辑开发**
○ 通过 Amazon Glue Studio 进行可视化开发 Amazon Glue Studio
○ 通过 Notebook 开发脚本 Notebook with Glue Studio
- **验证任务能正常运行**
**作者本次演示的 Amazon Glue Job 样例脚本代码**:
```
import sys
from awsglue.trans
forms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Jobfrom awsglue import DynamicFrame
args = getResolvedOptions(sys.argv, ["JOB_NAME"])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args["JOB_NAME"], args)
# Script generated for node MySQL table
MySQLtable_node1 = glueContext.create_dynamic_frame.from_catalog(
database="demo_dataset",
table_name="mysql_redshift_bigdata_test_visit_log_batch",
transformation_ctx="MySQLtable_node1",
)
# Script generated for node ApplyMapping
ApplyMapping_node2 = ApplyMapping.apply(
frame=MySQLtable_node1,
mappings=[
("pt", "int", "pt", "int"),
("visit_time", "timestamp", "visit_time", "timestamp"),
("product_id", "string", "product_id", "string"),
],
transformation_ctx="ApplyMapping_node2",
)
# Script generated for node
Amazon RedshiftAmazonRedshift_node3 = glueContext.write_dynamic_frame.from_options(
frame=ApplyMapping_node2,
connection_type="redshift",
connection_options={
"redshiftTmpDir": "s3://aws-glue-assets-xxx-us-west-2/temporary/",
"useConnectionProperties": "true",
"dbtable": "public.bigdata_test_visit_log_batch",
"connectionName": "jerry-demo-redshift-connection",
"preactions": "CREATE TABLE IF NOT EXISTS public.bigdata_test_visit_log_batch (pt INTEGER, visit_time TIMESTAMP, product_id VARCHAR); TRUNCATE TABLE public.bigdata_test_visit_log_batch;",
},
transformation_ctx="AmazonRedshift_node3",
)
```
## **Amazon MWAA 环境配置以及 DAG 脚本开发**
### **配置环境**
- **创建存放工程文件的 S3 桶**
○ 创建 dag/ 路径
○ 创建 plugins/ 路径
○ 创建 requirements/ 路径
○ 创建 scripts/ 路径
- **创建 Amazon MWAA environment**
○ 选择版本,本次演示选择 5.1 版本
○ 指定上面创建的 S3 桶,其中包括:
\- DAGs 文件路径\
\- Plugins 文件路径(可选)
\- Requirements 文件路径(可选)
\- 启动脚本文件路径(可选)
○ 网络配置
\- VPC 配置(由于 Amazon MWAA 网络环境要求复杂,建议选择通过模板新建 VPC)
\- Web Server UI 公网访问(本次演示选择公网访问打开,则以 IAM User/Role 进行 SSO 登录)
\- 安全组(同样建议选择新建安全组)
\- 节点规格选择
\- 数据加密(可选)
\- 日志监控(本次演示为默认配置,实战中建议按需打开 scheduler 和 worker 日志以及配置日志级别,方便排错)
\- Execution Role(建议选择新建,随后根据项目需求再额外添加所需权限。本次演示中为了简便,作者额外添加了 IAM 和 Glue 的 Full Access 权限,实际场景下请按照最小权限原则进行配置)
○ 等待环境部署完成(需要15分钟左右)
○ 通过控制台登录 Airflow UI
![image.png](https://dev-media.amazoncloud.cn/3581118566304e18822bb27ab792654d_image.png "image.png")
![image.png](https://dev-media.amazoncloud.cn/e1c708edba734980abd8bfec2b27c910_image.png "image.png")
### **DAG 脚本开发**
Airflow 原生提供非常丰富的 operator,为任务编排提供极大的灵活度,本次演示中,作者引入 GlueJobOperator 来进行对 Glue Job 的调度。
本次演示中的样例 DAG 代码 jerry-demo-mysql-redshift-dag.py 如下:
```
from airflow import DAG
from airflow.providers.amazon.aws.operators.glue import GlueJobOperator
from airflow.utils.dates import days_ago
import os
from datetime import datetime as dt
DAG_ID = os.path.basename(__file__).replace(".py", "")
hour = dt.now().strftime("%Y%m%d%H")
with DAG(dag_id=DAG_ID, schedule_interval="0 * * * *", catchup=False, start_date=days_ago(1)) as dag:
submit_glue_job = GlueJobOperator(
task_id="jerry-demo-mysql-redshift-updated",
job_name="jerry-demo-mysql-redshift-updated",
script_location=f"s3://aws-glue-assets-xxx-us-west-2/scripts/jerry-demo-mysql-redshift.py",
s3_bucket="aws-glue-assets-xxx-us-west-2",
iam_role_name="glue_s3_full_access",
create_job_kwargs={
"GlueVersion": "4.0",
"NumberOfWorkers": 2,
"WorkerType": "G.1X",
"Connections":{"Connections":["jerry-demo-aurora","jerry-demo-redshift-connection"]},
"DefaultArguments": {
"--enable-auto-scaling": "true",
"--max-num-workers": "10",
"--enable-metrics": "true",
"--metrics-sample-rate": "1",
"--job-bookmark-option": "job-bookmark-disable",
"--enable-continuous-cloudwatch-log": "true",
"--log-level": "INFO",
"--enable-glue-datacatalog": "true",
"--enable-spark-ui": "true",
"--enable-job-insights": "true",
"--TempDir": "s3://aws-glue-assets-xxx-us-west-2/temporary/",
"--spark-event-logs-path": "s3://aws-glue-assets-xxx-us-west-2/sparkHistoryLogs/"
}
}
)
submit_glue_job
```
重要参数说明:
dag_id:Airflow 中 DAG ID
task_id:Airflow 中 DAG 任务里面单一 task id\
job_name:Glue Job 名称
script_location:Glue Job 脚本存放路径
s3\_bucket:Glue Job 运行日志存放路径
iam_role_name:Glue Job 运行时赋予的角色名称(无需完整 ARN)
create_job_kwargs:Glue Job 额外配置参数
将开发好的 DAG 脚本上传至之前定义好的 S3 桶下的 dag/路径之后,随即就能在 Airflow UI 上看到 DAG 任务的展示,然后可以手动触发任务执行。
![image.png](https://dev-media.amazoncloud.cn/2a0c57cc73d04b24a2ad378616a78de0_image.png "image.png")
任务启动后,可以到 Amazon Glue 控制台 ETL Jobs 页面查看任务已经被调度起来了。
![image.png](https://dev-media.amazoncloud.cn/0dd1becca37540f2a7551827b2c517ce_image.png "image.png")
至此已简单演示完如何通过 Amazon MWAA 服务统一进行 Glue ETL 任务调度。在后续内容中,作者还会给大家演示如何进行 Cross DAG 的上下游依赖调度,以及如何进行基于任务责任人和任务颗粒度的告警通知。
**2023亚马逊云科技中国峰会即将开启!**
**👇👇👇点击下方图片即刻注册👇👇👇**
[![image.png](https://dev-media.amazoncloud.cn/c0795290db5d4abeb4be2fc9e315ffea_image.png "image.png")](https://summit.awsevents.cn/2023/form.html?source=aHMZ6Q20We4igheElTULyiA9EY0oZ3rM/VD+PZulcC8S8qmXIkr6oo5CBkqLbtp7?trk=cndc-detail )