### **Services and Scenario Overview**
- **Amazon MWAA**
Amazon MWAA ([Amazon Managed Workflows for Apache Airflow](https://aws.amazon.com/cn/managed-workflows-for-apache-airflow/?trk=cndc-detail)) is a managed service for Apache Airflow that lets you orchestrate your workflows on the Apache Airflow platform you already know, with greater scalability, availability, and security and without the operational burden of managing the underlying infrastructure.
- **Amazon Glue**
Amazon Glue is a [serverless](https://aws.amazon.com/cn/serverless/?trk=cndc-detail) data integration service that makes it easy for analytics users to discover, prepare, move, and integrate data from multiple sources. You can use it for analytics, [machine learning](https://aws.amazon.com/cn/machine-learning/?trk=cndc-detail), and application development. It also includes additional productivity and data-operations tooling for authoring and running jobs and implementing business workflows.
- **[Amazon Redshift](https://aws.amazon.com/cn/redshift/?trk=cndc-detail)**
[Amazon Redshift](https://aws.amazon.com/cn/redshift/?trk=cndc-detail) is a fully managed, petabyte-scale cloud data warehouse service. [Amazon Redshift](https://aws.amazon.com/cn/redshift/?trk=cndc-detail) Serverless lets you access and analyze data without having to configure a provisioned data warehouse. Resources are provisioned automatically, and data warehouse capacity scales intelligently to deliver fast performance even for the most demanding and unpredictable workloads.
- **Amazon MSK**
[Amazon Managed Streaming for Apache Kafka](https://aws.amazon.com/cn/msk/?trk=cndc-detail) (Amazon MSK) is an Amazon Web Services streaming data service that manages Apache Kafka infrastructure and operations, making it easy for developers and DevOps managers to run Apache Kafka applications and Kafka Connect connectors on Amazon Web Services without having to become experts in operating Apache Kafka. Amazon MSK operates, maintains, and scales Apache Kafka clusters, provides enterprise-grade security features out of the box, and comes with built-in Amazon Web Services integrations that accelerate the development of streaming data applications.
- **Scenario**
This article uses a typical data warehouse data integration (extract and load) business scenario to demonstrate how to orchestrate upstream/downstream dependencies across DAG files with Amazon MWAA.
**Upstream DAG task**: extracts data from the business database ([Amazon Aurora](https://aws.amazon.com/cn/rds/aurora/?trk=cndc-detail)) into the data warehouse ([Amazon Redshift](https://aws.amazon.com/cn/redshift/?trk=cndc-detail)).
**Downstream DAG task**: pushes data to Amazon MSK and depends on the upstream task.
**Data integration jobs**: Amazon Glue Jobs
### **Sample Amazon Glue Job Scripts Used in This Demo**
**Upstream data integration Glue Job script**
```python
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue import DynamicFrame
args = getResolvedOptions(sys.argv, ["JOB_NAME"])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args["JOB_NAME"], args)
# Script generated for node MySQL table
MySQLtable_node1 = glueContext.create_dynamic_frame.from_catalog(
    database="demo_dataset",
    table_name="mysql_redshift_bigdata_test_visit_log_batch",
    transformation_ctx="MySQLtable_node1",
)
# Script generated for node ApplyMapping
ApplyMapping_node2 = ApplyMapping.apply(
    frame=MySQLtable_node1,
    mappings=[
        ("pt", "int", "pt", "int"),
        ("visit_time", "timestamp", "visit_time", "timestamp"),
        ("product_id", "string", "product_id", "string"),
    ],
    transformation_ctx="ApplyMapping_node2",
)
# Script generated for node Amazon Redshift
AmazonRedshift_node3 = glueContext.write_dynamic_frame.from_options(
    frame=ApplyMapping_node2,
    connection_type="redshift",
    connection_options={
        "redshiftTmpDir": "s3://aws-glue-assets-xxx-us-west-2/temporary/",
        "useConnectionProperties": "true",
        "dbtable": "public.bigdata_test_visit_log_batch",
        "connectionName": "jerry-demo-redshift-connection",
        "preactions": "CREATE TABLE IF NOT EXISTS public.bigdata_test_visit_log_batch (pt INTEGER, visit_time TIMESTAMP, product_id VARCHAR); TRUNCATE TABLE public.bigdata_test_visit_log_batch;",
    },
    transformation_ctx="AmazonRedshift_node3",
)
job.commit()
```
**Downstream data integration Glue Job script**
(converts the data to a DataFrame and writes it to Amazon MSK)
```python
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from pyspark.sql.functions import col, struct, to_json
from awsglue.context import GlueContext
from awsglue.job import Job

args = getResolvedOptions(sys.argv, ["JOB_NAME"])
sc = SparkContext.getOrCreate()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args["JOB_NAME"], args)

redshift_connection_options = {
    # JDBC URL of the Amazon Redshift workgroup, available from the Amazon Redshift console
    "url": "jdbc:redshift://jerry-demo-redshift.xxxxxx.us-west-2.redshift.amazonaws.com:5439/dev",
    "dbtable": "public.bigdata_test_visit_log_batch",
    "redshiftTmpDir": "s3://aws-glue-assets-xxx-us-west-2/temporary/",
    "aws_iam_role": "arn:aws:iam::xxx:role/jerry_redshift_role",
    "user": "YourUserName",
    "password": "YourPassword"
}

# In the following, glueContext is the Glue context for the ETL job.
# Load the table from Amazon Redshift
dyf = glueContext.create_dynamic_frame_from_options("redshift", redshift_connection_options)
df = dyf.toDF()
# df.show()

# Serialize each row as a JSON string and write it to the MSK topic
df_withjson = df.withColumn("value", to_json(struct(col("*"))))
df_withjson.selectExpr("CAST(value AS STRING)") \
    .write \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "b-2.jerry-demo-msk-cluste.pr2zz2.c10.kafka.us-west-2.amazonaws.com:9092,b-3.jerry-demo-msk-cluste.pr2zz2.c10.kafka.us-west-2.amazonaws.com:9092,b-1.jerry-demo-msk-cluste.pr2zz2.c10.kafka.us-west-2.amazonaws.com:9092") \
    .option("topic", "redshift-msk") \
    .save()

job.commit()
```
**Key parameters:**
**url**: the [Amazon Redshift](https://aws.amazon.com/cn/redshift/?trk=cndc-detail) JDBC connection string
**dbtable**: the table to access (in schema.table form)
**aws_iam_role**: the IAM role used by [Amazon Redshift](https://aws.amazon.com/cn/redshift/?trk=cndc-detail) (it must grant Redshift sufficient permissions to access the data source, such as S3)
**user/password**: the [Amazon Redshift](https://aws.amazon.com/cn/redshift/?trk=cndc-detail) user name and password (in production, IAM-based authentication can be used instead)
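As an alternative to embedding the user name and password, the read side can reuse the Glue catalog connection that the upstream script already writes through, so credentials are resolved from the connection definition rather than from the script. A minimal sketch, assuming `jerry-demo-redshift-connection` holds valid Redshift credentials and that the read path honors the same `useConnectionProperties`/`connectionName` options shown on the write path above; `glueContext` is the same object created earlier in the job:
```python
# Sketch only: resolve Redshift credentials from the Glue catalog connection
# instead of hard-coding user/password in the script.
redshift_connection_options = {
    "useConnectionProperties": "true",
    "connectionName": "jerry-demo-redshift-connection",
    "dbtable": "public.bigdata_test_visit_log_batch",
    "redshiftTmpDir": "s3://aws-glue-assets-xxx-us-west-2/temporary/",
}

dyf = glueContext.create_dynamic_frame_from_options("redshift", redshift_connection_options)
```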
### **Sample Amazon MWAA DAG Scripts**
- **Upstream DAG script – jerry-demo-mysql-redshift-dag.py**
```python
from airflow import DAG
from airflow.providers.amazon.aws.operators.glue import GlueJobOperator
from airflow.utils.dates import days_ago
import os
from datetime import datetime as dt
DAG_ID = os.path.basename(__file__).replace(".py", "")
hour = dt.now().strftime("%Y%m%d%H")
with DAG(dag_id=DAG_ID, schedule_interval="0 * * * *", catchup=False, start_date=days_ago(1)) as dag:
    submit_glue_job = GlueJobOperator(
        task_id="jerry-demo-mysql-redshift-updated",
        job_name="jerry-demo-mysql-redshift-updated",
        script_location="s3://aws-glue-assets-xxx-us-west-2/scripts/jerry-demo-mysql-redshift.py",
        s3_bucket="aws-glue-assets-xxx-us-west-2",
        iam_role_name="glue_s3_full_access",
        create_job_kwargs={
            "GlueVersion": "4.0",
            "NumberOfWorkers": 2,
            "WorkerType": "G.1X",
            "Connections": {"Connections": ["jerry-demo-aurora", "jerry-demo-redshift-connection"]},
            "DefaultArguments": {
                "--enable-auto-scaling": "true",
                "--max-num-workers": "10",
                "--enable-metrics": "true",
                "--metrics-sample-rate": "1",
                "--job-bookmark-option": "job-bookmark-disable",
                "--enable-continuous-cloudwatch-log": "true",
                "--log-level": "INFO",
                "--enable-glue-datacatalog": "true",
                "--enable-spark-ui": "true",
                "--enable-job-insights": "true",
                "--TempDir": "s3://aws-glue-assets-xxx-us-west-2/temporary/",
                "--spark-event-logs-path": "s3://aws-glue-assets-xxx-us-west-2/sparkHistoryLogs/"
            }
        }
    )

    submit_glue_job
```
- **Downstream DAG script (depends on the upstream DAG) – jerry-demo-redshift-msk-dag-sensor.py**
The script uses ExternalTaskSensor from Airflow's sensors as the dependency operator and configures the upstream DAG ID as the dependency.
```python
from airflow import DAG
from airflow.providers.amazon.aws.operators.glue import GlueJobOperator
from airflow.utils.dates import days_ago
from airflow.utils import timezone
from airflow.sensors.external_task_sensor import ExternalTaskSensor
import os
from datetime import datetime as dt
DAG_ID = os.path.basename(__file__).replace(".py", "")
hour = dt.now().strftime("%Y%m%d%H")
with DAG(dag_id=DAG_ID, schedule_interval="0 * * * *", catchup=False, start_date=days_ago(1)) as dag:
    submit_glue_job = GlueJobOperator(
        task_id="jerry-demo-redshift-msk-updated",
        job_name="jerry-demo-redshift-msk-updated",
        script_location="s3://aws-glue-assets-xxx-us-west-2/scripts/jerry-demo-redshift-msk-updated.py",
        s3_bucket="aws-glue-assets-xxx-us-west-2",
        iam_role_name="glue_s3_full_access",
        create_job_kwargs={
            "GlueVersion": "4.0",
            "NumberOfWorkers": 2,
            "WorkerType": "G.1X",
            "Connections": {"Connections": ["jerry-demo-redshift-connection"]},
            "DefaultArguments": {
                "--enable-auto-scaling": "true",
                "--max-num-workers": "10",
                "--enable-metrics": "true",
                "--metrics-sample-rate": "1",
                "--job-bookmark-option": "job-bookmark-disable",
                "--enable-continuous-cloudwatch-log": "true",
                "--log-level": "INFO",
                "--enable-glue-datacatalog": "true",
                "--enable-spark-ui": "true",
                "--enable-job-insights": "true",
                "--TempDir": "s3://aws-glue-assets-xxx-us-west-2/temporary/",
                "--spark-event-logs-path": "s3://aws-glue-assets-xxx-us-west-2/sparkHistoryLogs/"
            }
        }
    )

    # Wait for the external (upstream) DAG to finish
    wait_for_external_task = ExternalTaskSensor(
        task_id="wait_for_external_dag",
        external_dag_id="jerry-demo-mysql-redshift-dag",
        external_task_id=None,
        check_existence=True,
        # Offset between this run's logical date and the external task's execution
        # execution_delta=timedelta(minutes=40),
        # allowed_states=["success"],
        # failed_states=["failed", "skipped"],
        mode="reschedule",
        timeout=180
    )

    wait_for_external_task >> submit_glue_job
```
**Key parameters:**
**dag_id**: the DAG ID in Airflow
**task_id**: the ID of an individual task within the Airflow DAG
**job_name**: the Glue Job name
**script_location**: the S3 path where the Glue Job script is stored
**s3_bucket**: the S3 bucket where Glue Job run logs are stored
**iam_role_name**: the name of the IAM role the Glue Job runs with (the role name only, not the full ARN)
**create_job_kwargs**: additional Glue Job configuration parameters
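A note on the sensor: ExternalTaskSensor matches the upstream DAG run by its logical (execution) date. Both DAGs here share the `0 * * * *` schedule, so no offset is needed; if the two schedules differed, the commented-out `execution_delta` would be used to point at the correct upstream run. A minimal sketch, assuming a hypothetical downstream schedule that runs 40 minutes after the upstream one (the DAG ID and schedule below are illustrative only):
```python
from datetime import timedelta

from airflow import DAG
from airflow.sensors.external_task_sensor import ExternalTaskSensor
from airflow.utils.dates import days_ago

# Hypothetical: downstream runs at minute 40, upstream at minute 0, so the
# matching upstream run's logical date is 40 minutes earlier than this DAG's.
with DAG(dag_id="jerry-demo-redshift-msk-dag-sensor-offset", schedule_interval="40 * * * *",
         catchup=False, start_date=days_ago(1)) as dag:
    wait_for_external_task = ExternalTaskSensor(
        task_id="wait_for_external_dag",
        external_dag_id="jerry-demo-mysql-redshift-dag",
        external_task_id=None,                  # wait for the whole upstream DAG run
        execution_delta=timedelta(minutes=40),  # look 40 minutes back for the upstream run
        check_existence=True,
        mode="reschedule",
        timeout=180,
    )
```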
After the finished DAG scripts are uploaded to the dag/ path of the S3 bucket that Amazon MWAA watches, the DAGs appear in the Airflow UI shortly afterwards and can then be triggered manually.
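For reference, a minimal sketch of uploading the two DAG files with boto3; the bucket name and DAGs prefix are placeholders and should be replaced with the bucket and path your Amazon MWAA environment is configured to watch:
```python
import boto3

# Placeholders: use the S3 bucket and DAGs prefix configured for your MWAA environment.
MWAA_DAG_BUCKET = "your-mwaa-bucket"
DAG_PREFIX = "dag"

s3 = boto3.client("s3")
for dag_file in (
    "jerry-demo-mysql-redshift-dag.py",
    "jerry-demo-redshift-msk-dag-sensor.py",
):
    s3.upload_file(dag_file, MWAA_DAG_BUCKET, f"{DAG_PREFIX}/{dag_file}")
```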
![image.png](https://dev-media.amazoncloud.cn/a51836264e434f08b13f78ccf0854b1a_image.png "image.png")
After both DAGs are started, on the Airflow UI – DAG – [downstream DAG ID] page you can see the task "wait_for_external_dag" in the "up_for_reschedule" state, which means it is polling the upstream DAG run.
![image.png](https://dev-media.amazoncloud.cn/1105eef44ab743cf8ae53591fe22dfa6_image.png "image.png")
The task log shows the polling process:
![image.png](https://dev-media.amazoncloud.cn/b57778b511c4491297bf956bcdd3509d_image.png "image.png")
![image.png](https://dev-media.amazoncloud.cn/6676d40a31ce4fd0aa76cdf17768647c_image.png "image.png")
This completes a simple walk-through of cross-DAG scheduling with Amazon MWAA. A follow-up post will cover alert notifications based on task owner and task granularity; stay tuned!