![640.gif](https://dev-media.amazoncloud.cn/c6e6fc9e90c54d70adeff32bab423f66_640.gif "640.gif")
# **服务及场景介绍**
### **Amazon MWAA**
Amazon MWAA ([Amazon Managed Workflows for Apache Airflow](https://aws.amazon.com/cn/managed-workflows-for-apache-airflow/?trk=cndc-detail)) 是 Apache Airflow 的一项托管服务,让您可以使用当前熟悉的 Apache Airflow 平台来编排您的工作流程。您可以获得更高的可扩展性、可用性和安全性,而无需承担管理底层基础设施的运营负担。
### **Amazon SNS**
[Amazon Simple Notification Service](https://aws.amazon.com/cn/sns/?trk=cndc-detail) ([Amazon SNS](https://aws.amazon.com/cn/sns/?trk=cndc-detail)) 是一项托管服务,提供从发布者向订阅者(也称为创建者和使用者)的消息传输。发布者通过将消息发送至主题与订阅者进行异步交流,主题是一个逻辑访问点和通信渠道。客户端可以订阅 SNS 主题并使用受支持的终端节点类型接收已发布的消息,例如 [Amazon Kinesis](https://aws.amazon.com/cn/kinesis/?trk=cndc-detail) Data Firehose、[Amazon SQS](https://aws.amazon.com/cn/sqs/?trk=cndc-detail)、Amazon Lambda、HTTP、电子邮件、移动推送通知和移动短信(SMS)。
### **Amazon Lambda**
Amazon Lambda 是一项[无服务器](https://aws.amazon.com/cn/serverless/?trk=cndc-detail)事件驱动型计算服务,该服务使您可以运行几乎任何类型的应用程序或后端服务的代码,而无需预置或管理服务器。您可以从 200 多个亚马逊云科技服务和软件即服务(SaaS)应用程序中触发 Lambda,且只需按您的使用量付费。
### **场景介绍**
本文将演示在常规的数仓调度任务监控中,如何实现基于 Airflow Task 颗粒度的监控(Task Owner,DAG ID,Task ID,执行时间),同时将告警信息通知到第三方消息工具(本文以飞书为示例)。
### **方案架构**
![image.png](https://dev-media.amazoncloud.cn/497c260371ca4c8fa34162d3d81e58c9_image.png "image.png")
方案包含两个部分实现(细节在下文中说明):
* 竖线左方为 Amazon MWAA Task 监控,通过自定义回调函数将告警信息推送到 [Amazon SNS](https://aws.amazon.com/cn/sns/?trk=cndc-detail)
* 竖线右方为现有的 Feish Notifier 一键部署方案,负责对消息进行自定义格式化以及通过 Webhook API 推送到第三方客户端
# **部署流程**
### **飞书通知链路部署**
* 飞书提前创建好 Webhook API,并记录下来供后续使用(本文将忽略创建步骤)
* Console 控制台搜索栏上搜索关键字“sar”
![image.png](https://dev-media.amazoncloud.cn/421570f54caf4ddaab38156d6d6c22e1_image.png "image.png")
* 点击左方“Available Application”,并搜索“feishu”关键字
![image.png](https://dev-media.amazoncloud.cn/e91b52adab9e4f29be144adc2f0ec6f6_image.png "image.png")
* 点击“Feish-Notifier”,将创建好的飞书 Webhook URL 输入后开始部署所需资源
![image.png](https://dev-media.amazoncloud.cn/84af1f7f38084e6491dfc4e4f8c1a814_image.png "image.png")
* 部署完成后从“Resources”中复制 [Amazon SNS](https://aws.amazon.com/cn/sns/?trk=cndc-detail) 完整 ARN 供后续使用
![image.png](https://dev-media.amazoncloud.cn/b9476225dec841319ce5f93781df4bf7_image.png "image.png")
至此已完成飞书通知链路部署。
### **Amazon MWAA Task 监控配置**
* Task 监控配置。Airflow DAG 配置参数中,“on_failure_callback”参数的作用是 Task 执行失败后可以执行此参数指定的自定义回调函数,我们就可以通过此参数来进行基于任务颗粒度的监控以及非常灵活地进行自定义告警处理。
```
default_args = {
"owner": "JerryWONG",
"depends_on_past": False,
#"retries": 1,
#"retry_delay": timedelta(minutes=5),
"on_failure_callback": on_failure_callback
}
```
* 回调函数样例。此样例中核心处理逻辑是从回调上下文(context)里面将任务对象(TaskInstance)取出,并将对应的 owner、task_id,dag_id,execution_date 的值获取到,作为消息内容发送到上述步骤建立的 [Amazon SNS](https://aws.amazon.com/cn/sns/?trk=cndc-detail) 通道,从而推送到飞书客户端。
```
def on_failure_callback(context):
# Initiate TaskInstance object
ti = context.get('ti')
# Get Task attributes
owner = ti.task.owner
execution_date = ti.execution_date
task_id = ti.task_id
dag_id = ti.task.dag_id
op = SnsPublishOperator(
task_id='dag_failure',
target_arn=sns_topic_arn,
subject="DAG FAILED",
message_attributes={"MessageStructure": 'string'},
message=f"MWAA(Airflow) task has failed, please check the task details:\\n\\n- Owner: {owner},\\n- Execution_Date: {execution_date},\\n- Task_ID: {task_id},\\n- DAG_ID: {dag_id}"
)
op.execute(context)
```
* 完整 DAG 代码样例
```
from airflow import DAG
from airflow.providers.amazon.aws.operators.glue import GlueJobOperator
from airflow.providers.amazon.aws.operators.sns import SnsPublishOperator
from airflow.utils.dates import days_ago
from airflow.models import TaskInstance
from airflow.sensors.external_task import ExternalTaskSensor
import os
from datetime import datetime as dt, timedelta
# Define AWS SNS Topic ARN
# sns_topic_arn = os.getenv('SNS_TOPIC_ARN')
sns_topic_arn = 'arn:aws:sns:us-west-2:xxx:serverlessrepo-jerry-demo-Feishu-Notifier-ServerlessNotifierSNSTopic-XTDxij194UZp'
# Task failure callback function
# Call AWS SNS API to send notification
def on_failure_callback(context):
# Initiate TaskInstance object
ti = context.get('ti')
# Get Task attributes
owner = ti.task.owner
execution_date = ti.execution_date
task_id = ti.task_id
dag_id = ti.task.dag_id
op = SnsPublishOperator(
task_id='dag_failure',
target_arn=sns_topic_arn,
subject="DAG FAILED",
message_attributes={"MessageStructure": 'string'},
message=f"MWAA(Airflow) task has failed, please check the task details:\\n\\n- Owner: {owner},\\n- Execution_Date: {execution_date},\\n- Task_ID: {task_id},\\n- DAG_ID: {dag_id}"
)
op.execute(context)
# Default settings applied to all tasks
default_args = {
"owner": "JerryWONG",
"depends_on_past": False,
#"retries": 1,
#"retry_delay": timedelta(minutes=5),
"on_failure_callback": on_failure_callback
}
DAG_ID = os.path.basename(__file__).replace(".py", "")
hour = dt.now().strftime("%Y%m%d%H")
with DAG(
dag_id=DAG_ID,
schedule_interval="0 * * * *",
catchup=False,
start_date=days_ago(1),
default_args=default_args,
) as dag:
submit_glue_job = GlueJobOperator(
task_id="jerry-demo-redshift-msk-updated",
job_name="jerry-demo-redshift-msk-updated",
script_location=f"s3://aws-glue-assets-xxx-us-west-2/scripts/jerry-demo-redshift-msk.py",
s3_bucket="aws-glue-assets-xxx-us-west-2",
iam_role_name="glue_s3_full_access",
create_job_kwargs={
"GlueVersion": "4.0",
"NumberOfWorkers": 2,
"WorkerType": "G.1X",
"Connections": {"Connections": ["jerry-demo-redshift-connection"]},
"DefaultArguments": {
"--enable-auto-scaling": "true",
"--max-num-workers": "10",
"--enable-metrics": "true",
"--metrics-sample-rate": "1",
"--job-bookmark-option": "job-bookmark-disable",
"--enable-continuous-cloudwatch-log": "true",
"--log-level": "INFO",
"--enable-glue-datacatalog": "true",
"--enable-spark-ui": "true",
"--enable-job-insights": "true",
"--TempDir": "s3://aws-glue-assets-xxx-us-west-2/temporary/",
"--spark-event-logs-path": "s3://aws-glue-assets-xxx-us-west-2/sparkHistoryLogs/"
}
}
)
# wait for external dag to finish
wait_for_external_task = ExternalTaskSensor(
task_id="wait_for_external_dag",
external_dag_id="jerry-demo-mysql-redshift-dag",
external_task_id=None,
check_existence=True,
# @与执行的external任务的时间差
# execution_delta=timedelta(minutes=40),
# allowed_states=["success"],
# failed_states=["failed", "skipped"],
mode="reschedule",
timeout=60
)
wait_for_external_task >> submit_glue_job
```
最后作者特意运行一个配置错误导致 Task 运行失败的 DAG 来验证效果。任务失败时,马上能在飞书客户端收到自定义告警通知。
![image.png](https://dev-media.amazoncloud.cn/4df023ed5ccb4837bc53f2372c02e34b_image.png "image.png")
至此已完整演示如何实现 Amazon MWAA 基于任务责任人和任务颗粒度的告警通知方案。
##### **本篇作者**
![image.png](https://dev-media.amazoncloud.cn/98b102f5979648dfa02785eb2695ded7_image.png "image.png")
**黄家曦**
*亚马逊云科技解决方案架构师,负责基于亚马逊云科技云计算方案咨询和设计。曾就职于思科、中国电信,在网络以及运营商骨干网有丰富经验,热衷于大数据领域,熟悉 EMR、Kinesis、Athena、Glue 等数据分析服务与方案设计。*