利用 Amazon Lambda 将 Kinesis Data Stream 数据批量自动写入 Amazon MSK

Serverless
数据分析
数据处理
Amazon Pinpoint
Amazon Lambda
0
0
# 背景 在混合云架构中,用户的一些应用原本运行在自建的数据中心。这些应用程序统一从 Apache Kafka 中拉取实时数据做分析和处理,例如监控系统、大数据分析平台等。由于业务发展需要,用户将这些工作负载部分迁移到了 Amazon 上,或者在 Amazon 上构建新的应用。 由于 Amazon 部分服务仅支持以 [Amazon Kinesis](https://aws.amazon.com/cn/kinesis/?trk=cndc-detail) Data Stream 方式输出日志或数据,例如 [Amazon Pinpoint](https://aws.amazon.com/cn/pinpoint/?trk=cndc-detail),因此 Kinesis Data Stream 数据投递一般使用 Kinesis Firehose 或者自有应用拉取。而 Kinesis Firehose 的数据投递目标并不支持 Kafka,因此需要用户修改应用拉取数据的方式。如果这个应用需要同时运行数据中心和 Amazon 云,用户则需要维护支持2个接口的应用;或者假如这些应用的数量非常多,用户改造和测试它需要大量的人力和时间的时候,我们会希望通过一种无需运维管理的方式,能自动将 Kinesis Data Stream 中的数据导入到 Kafka 的方案。这样所有的应用无需修改,便能平滑迁移到 Amazon 上,同时运维人力成本上也基本不会增加。 Amazon Lambda 是一项[无服务器](https://aws.amazon.com/cn/serverless/?trk=cndc-detail)事件驱动型计算服务。利用 Amazon Lambda 的[无服务器](https://aws.amazon.com/cn/serverless/?trk=cndc-detail)特性,我们可以做到无论需要对接多少个 Kinesis Data Stream 都可以支持。每个 Kinesis Data Stream 需要接收多大的数据流量,我们都能自动扩展,而底层资源无需运维管理。 # 解决方案 ## 架构图 ![1b96ecabce20a1fd5f745fb4efdef7d1.jpg](https://dev-media.amazoncloud.cn/892c9250d6064af1ab963e42d95d17a1_1b96ecabce20a1fd5f745fb4efdef7d1.jpg "1b96ecabce20a1fd5f745fb4efdef7d1.jpg") ## 架构说明 本方案完全采用 Serverless 架构,主要通过 Lambda 实现 Kinesis 和 Amazon MSK 数据相互传输到目的。当 Lambda 执行失败的时候,会触发 [Amazon SNS](https://aws.amazon.com/cn/sns/?trk=cndc-detail)(可替换为 [Amazon SQS](https://aws.amazon.com/cn/sqs/?trk=cndc-detail))以发送通知信息。 ## 一键部署代码 参考 **Github 开源代码**: https://github.com/yourlin/Kinesis2MSK?trk=cndc-detail ## 部署方式 先安装 SAM-CLI,具体步骤参考: **安装 Amazon SAMLinux 上的 CLI – Amazon Serverless Application Model** https://docs.aws.amazon.com/zh_cn/serverless-application-model/latest/developerguide/serverless-sam-cli-install-linux.html?trk=cndc-detail 然后下载代码修改参数后部署。 ```js git clone https://github.com/yourlin/Kinesis2MSK cd Kinesis2MSK sam build sam deploy --guided ``` 左滑查看更多 # 关键参数 主要配置文件 `template.yaml`,MSK 配置在 `kinesis_to_msk/config.py` 中。 ![58be1987eff2b04c692ee89365266a1b.jpg](https://dev-media.amazoncloud.cn/f91bf949a45a482ea7e88e18a99d6c98_58be1987eff2b04c692ee89365266a1b.jpg "58be1987eff2b04c692ee89365266a1b.jpg") ## 环境配置说明 全局配置: ```js Globals: Function: Timeout: 5 MemorySize: 128 ``` 函数超时时间默认为:5秒。 函数内存:128MB。如果加大每次批处理的数据量可适当提高内存量。 **VPC 配置** ```js VpcConfig: SecurityGroupIds: - sg-087f1d9e26d9140ad # 该安全组,是指定的 VPC 内提前创建好的 SubnetIds: - subnet-0eafba9fee295d045 # 提前创建好的子网 1 - subnet-093cb4ccabff0d3d5 # 提前创建好的子网 2 - subnet-08c55a13b324420b8 # 提前创建好的子网 3 ``` 左滑查看更多 注意:除非 MSK 配置为公网可访问,否则必须将 Lambda 配置在和 MSK 同一个 VPC 或者跟 MSK 所在 VPC 已经打通的其他 VPC 内,否则会 Lambda 无法正常连接 MSK,引起连接超时。 安全组和子网推荐使用**预定义**好的,子网至少选择2个,保证其高可用。 **Kinesis 源配置** ```js Events: Kinesis2MSK: Type: Kinesis # More info about API Event Source: https://github.com/awslabs/serverless-application-model/blob/master/versions/2016-10-31.md#api Properties: Stream: arn:aws:kinesis:us-east-1:784675006790:stream/poc-kinesis # Create kinesis stream before run it StartingPosition: TRIM_HORIZON BatchSize: 10 MaximumBatchingWindowInSeconds: 10 Enabled: true ParallelizationFactor: 8 MaximumRetryAttempts: 100 BisectBatchOnFunctionError: true MaximumRecordAgeInSeconds: 604800 DestinationConfig: OnFailure: Type: SNS Destination: arn:aws:sns:us-east-1:784675006790:email # Create SNS Topic before run it TumblingWindowInSeconds: 0 FunctionResponseTypes: - ReportBatchItemFailures ``` 左滑查看更多 详细参数说明参考文档: **将 Amazon Lambda 与 [Amazon Kinesis](https://aws.amazon.com/cn/kinesis/?trk=cndc-detail) 结合使用** https://docs.aws.amazon.com/zh_cn/lambda/latest/dg/with-kinesis.html#services-kinesis-params?trk=cndc-detail 其中 `DestinationConfig` 的 `Destination`,可以设置成 SQS/SNS。示例中采用 SNS。 `Stream` 设置为目标 Kinesis 的 ARN。 # 网络 如果 Lambda 和 MSK 不在同一个 VPC 内(包括跨 region 的情况),2个 VPC 之间需要做 Peering,Lambda 才能访问 MSK。VPC Peering 创建方法参考:**创建 VPC 对等连接** https://docs.aws.amazon.com/zh_cn/vpc/latest/peering/create-vpc-peering-connection.html?trk=cndc-detail # 安全性 ## 安全组设置 **同 region 访问:** MSK 设置入站规则为,允许 9092-9093 端口访问,源为 Lambda 的安全组; Lambda 设置出站规则为,允许 9092-9093 端口访问,目标为 MSK 的安全组。 **跨 region 访问**: MSK 设置入站规则为,允许 9092-9093 端口访问,源为 Lambda 的所在 VPC 的 subnet CDIR。如果 Lambda 部署在多个 subnet 中,需要允许多个 CIDR 访问。 Lambda 设置出站规则为,允许 9092-9093 端口访问,目标为 MSK 的所在 VPC 的 subnet CDIR。 安全组配置方法参考: **使用安全组控制到资源的流量** https://docs.aws.amazon.com/zh_cn/vpc/latest/userguide/VPC_SecurityGroups.html?trk=cndc-detail ## MSK 认证设置 推荐安全策略:MSK 集群使用 IAM Role 进行认证,然后对 Lambda 附加相关的 Policy。参考 **Amazon MSK 如何与 IAM 协同工作:** https://docs.aws.amazon.com/zh_cn/msk/latest/developerguide/security_iam_service-with-iam.html?trk=cndc-detail 其他安全策略:如果 MSK 使用其他认证方式,除了用户名密码 /pem 证书外,访问控制也受到安全组规则设置限制。 # 常见问题 ## 问题一 KafkaTimeoutError: Failed to update metadata after 60.0 secs **原因分析**: 目标 Topic 不存在 **可能的解决方案**: 1.修改 MSK 集群的配置,设置 create.topics.enable=true 2.手动创建目标 Kafka 的 Topic ## 问题二 Lambda 执行超时 **原因分析**: Kafka 访问失败,失败原因可能是因为网络无法连通,也可能是问题一引起的超时。 **可能的解决方案**: 1.检查 Lambda 和 MSK 所在的 VPC,是否为同一 VPC 或者2个 VPC 之间是否已经 Peering。VPC Peering 创建方法参考:**创建 VPC 对等连接** https://docs.aws.amazon.com/zh_cn/vpc/latest/peering/create-vpc-peering-connection.html?trk=cndc-detail 2.检查 Lambda 使用的安全组,确保在出站规则中,允许 9092-9093 端口,目标为 MSK 使用的安全组。 3.检查 MSK 使用的安全组,确保在入站规则中,允许 9092-9093 端口,源包含 Lambda 使用的安全组。 4.如果 MSK 使用用户名密码方式访问,请确保用户名密码正确。 # 其他优化 在数据量足够大的情况下,需要调节 BatchSize,加大 ParallelizationFactor。在调整参数后需要测试实际数据传输的时间,并且调整 Lambda 执行的超时时间。 ### 本篇作者 ![微信图片_20230310104428.jpg](https://dev-media.amazoncloud.cn/5b1778ddf6fa4e079f992f52fc825117_%E5%BE%AE%E4%BF%A1%E5%9B%BE%E7%89%87_20230310104428.jpg "微信图片_20230310104428.jpg") **林业** Amazon 解决方案架构师,负责基于 Amazon 的云计算方案的咨询与架构设计。拥有超过14年研发经验,曾打造千万级用户 APP,多项 Github 开源项目贡献者。在游戏、IoT、智慧城市、汽车、电商等多个领域都拥有丰富的实践经验。 ![微信图片_20230310104433.jpg](https://dev-media.amazoncloud.cn/c848e5b555da4c3b9cc768fce8a1a0d5_%E5%BE%AE%E4%BF%A1%E5%9B%BE%E7%89%87_20230310104433.jpg "微信图片_20230310104433.jpg") **Dora Gui** Amazon 技术客户经理,主要支持游戏,互联网行业客户的架构优化、成本管理、技术咨询等工作,并专注在 IAAS,大数据和容器等方向的技术选型,方案落地和实践。在加入 Amazon 之前,曾就职于 EMC 和微软,腾讯等科技公司,拥有近10年虚拟化与公有云领域的架构优化和技术支持经验。
0
目录
关闭