# 背景
在混合云架构中,用户的一些应用原本运行在自建的数据中心。这些应用程序统一从 Apache Kafka 中拉取实时数据做分析和处理,例如监控系统、大数据分析平台等。由于业务发展需要,用户将这些工作负载部分迁移到了 Amazon 上,或者在 Amazon 上构建新的应用。
由于 Amazon 部分服务仅支持以 [Amazon Kinesis](https://aws.amazon.com/cn/kinesis/?trk=cndc-detail) Data Stream 方式输出日志或数据,例如 [Amazon Pinpoint](https://aws.amazon.com/cn/pinpoint/?trk=cndc-detail),因此 Kinesis Data Stream 数据投递一般使用 Kinesis Firehose 或者自有应用拉取。而 Kinesis Firehose 的数据投递目标并不支持 Kafka,因此需要用户修改应用拉取数据的方式。如果这个应用需要同时运行数据中心和 Amazon 云,用户则需要维护支持2个接口的应用;或者假如这些应用的数量非常多,用户改造和测试它需要大量的人力和时间的时候,我们会希望通过一种无需运维管理的方式,能自动将 Kinesis Data Stream 中的数据导入到 Kafka 的方案。这样所有的应用无需修改,便能平滑迁移到 Amazon 上,同时运维人力成本上也基本不会增加。
Amazon Lambda 是一项[无服务器](https://aws.amazon.com/cn/serverless/?trk=cndc-detail)事件驱动型计算服务。利用 Amazon Lambda 的[无服务器](https://aws.amazon.com/cn/serverless/?trk=cndc-detail)特性,我们可以做到无论需要对接多少个 Kinesis Data Stream 都可以支持。每个 Kinesis Data Stream 需要接收多大的数据流量,我们都能自动扩展,而底层资源无需运维管理。
# 解决方案
## 架构图
![1b96ecabce20a1fd5f745fb4efdef7d1.jpg](https://dev-media.amazoncloud.cn/892c9250d6064af1ab963e42d95d17a1_1b96ecabce20a1fd5f745fb4efdef7d1.jpg "1b96ecabce20a1fd5f745fb4efdef7d1.jpg")
## 架构说明
本方案完全采用 Serverless 架构,主要通过 Lambda 实现 Kinesis 和 Amazon MSK 数据相互传输到目的。当 Lambda 执行失败的时候,会触发 [Amazon SNS](https://aws.amazon.com/cn/sns/?trk=cndc-detail)(可替换为 [Amazon SQS](https://aws.amazon.com/cn/sqs/?trk=cndc-detail))以发送通知信息。
## 一键部署代码
参考 **Github 开源代码**:
https://github.com/yourlin/Kinesis2MSK?trk=cndc-detail
## 部署方式
先安装 SAM-CLI,具体步骤参考:
**安装 Amazon SAMLinux 上的 CLI – Amazon Serverless Application Model**
https://docs.aws.amazon.com/zh_cn/serverless-application-model/latest/developerguide/serverless-sam-cli-install-linux.html?trk=cndc-detail
然后下载代码修改参数后部署。
```js
git clone https://github.com/yourlin/Kinesis2MSK
cd Kinesis2MSK
sam build
sam deploy --guided
```
左滑查看更多
# 关键参数
主要配置文件 `template.yaml`,MSK 配置在 `kinesis_to_msk/config.py` 中。
![58be1987eff2b04c692ee89365266a1b.jpg](https://dev-media.amazoncloud.cn/f91bf949a45a482ea7e88e18a99d6c98_58be1987eff2b04c692ee89365266a1b.jpg "58be1987eff2b04c692ee89365266a1b.jpg")
## 环境配置说明
全局配置:
```js
Globals:
Function:
Timeout: 5
MemorySize: 128
```
函数超时时间默认为:5秒。
函数内存:128MB。如果加大每次批处理的数据量可适当提高内存量。
**VPC 配置**
```js
VpcConfig:
SecurityGroupIds:
- sg-087f1d9e26d9140ad # 该安全组,是指定的 VPC 内提前创建好的
SubnetIds:
- subnet-0eafba9fee295d045 # 提前创建好的子网 1
- subnet-093cb4ccabff0d3d5 # 提前创建好的子网 2
- subnet-08c55a13b324420b8 # 提前创建好的子网 3
```
左滑查看更多
注意:除非 MSK 配置为公网可访问,否则必须将 Lambda 配置在和 MSK 同一个 VPC 或者跟 MSK 所在 VPC 已经打通的其他 VPC 内,否则会 Lambda 无法正常连接 MSK,引起连接超时。
安全组和子网推荐使用**预定义**好的,子网至少选择2个,保证其高可用。
**Kinesis 源配置**
```js
Events:
Kinesis2MSK:
Type: Kinesis # More info about API Event Source: https://github.com/awslabs/serverless-application-model/blob/master/versions/2016-10-31.md#api
Properties:
Stream: arn:aws:kinesis:us-east-1:784675006790:stream/poc-kinesis # Create kinesis stream before run it
StartingPosition: TRIM_HORIZON
BatchSize: 10
MaximumBatchingWindowInSeconds: 10
Enabled: true
ParallelizationFactor: 8
MaximumRetryAttempts: 100
BisectBatchOnFunctionError: true
MaximumRecordAgeInSeconds: 604800
DestinationConfig:
OnFailure:
Type: SNS
Destination: arn:aws:sns:us-east-1:784675006790:email # Create SNS Topic before run it
TumblingWindowInSeconds: 0
FunctionResponseTypes:
- ReportBatchItemFailures
```
左滑查看更多
详细参数说明参考文档:
**将 Amazon Lambda 与 [Amazon Kinesis](https://aws.amazon.com/cn/kinesis/?trk=cndc-detail) 结合使用**
https://docs.aws.amazon.com/zh_cn/lambda/latest/dg/with-kinesis.html#services-kinesis-params?trk=cndc-detail
其中 `DestinationConfig` 的 `Destination`,可以设置成 SQS/SNS。示例中采用 SNS。
`Stream` 设置为目标 Kinesis 的 ARN。
# 网络
如果 Lambda 和 MSK 不在同一个 VPC 内(包括跨 region 的情况),2个 VPC 之间需要做 Peering,Lambda 才能访问 MSK。VPC Peering 创建方法参考:**创建 VPC 对等连接**
https://docs.aws.amazon.com/zh_cn/vpc/latest/peering/create-vpc-peering-connection.html?trk=cndc-detail
# 安全性
## 安全组设置
**同 region 访问:**
MSK 设置入站规则为,允许 9092-9093 端口访问,源为 Lambda 的安全组;
Lambda 设置出站规则为,允许 9092-9093 端口访问,目标为 MSK 的安全组。
**跨 region 访问**:
MSK 设置入站规则为,允许 9092-9093 端口访问,源为 Lambda 的所在 VPC 的 subnet CDIR。如果 Lambda 部署在多个 subnet 中,需要允许多个 CIDR 访问。
Lambda 设置出站规则为,允许 9092-9093 端口访问,目标为 MSK 的所在 VPC 的 subnet CDIR。
安全组配置方法参考:
**使用安全组控制到资源的流量**
https://docs.aws.amazon.com/zh_cn/vpc/latest/userguide/VPC_SecurityGroups.html?trk=cndc-detail
## MSK 认证设置
推荐安全策略:MSK 集群使用 IAM Role 进行认证,然后对 Lambda 附加相关的 Policy。参考 **Amazon MSK 如何与 IAM 协同工作:**
https://docs.aws.amazon.com/zh_cn/msk/latest/developerguide/security_iam_service-with-iam.html?trk=cndc-detail
其他安全策略:如果 MSK 使用其他认证方式,除了用户名密码 /pem 证书外,访问控制也受到安全组规则设置限制。
# 常见问题
## 问题一
KafkaTimeoutError: Failed to update metadata after 60.0 secs
**原因分析**:
目标 Topic 不存在
**可能的解决方案**:
1.修改 MSK 集群的配置,设置 create.topics.enable=true
2.手动创建目标 Kafka 的 Topic
## 问题二
Lambda 执行超时
**原因分析**:
Kafka 访问失败,失败原因可能是因为网络无法连通,也可能是问题一引起的超时。
**可能的解决方案**:
1.检查 Lambda 和 MSK 所在的 VPC,是否为同一 VPC 或者2个 VPC 之间是否已经 Peering。VPC Peering 创建方法参考:**创建 VPC 对等连接**
https://docs.aws.amazon.com/zh_cn/vpc/latest/peering/create-vpc-peering-connection.html?trk=cndc-detail
2.检查 Lambda 使用的安全组,确保在出站规则中,允许 9092-9093 端口,目标为 MSK 使用的安全组。
3.检查 MSK 使用的安全组,确保在入站规则中,允许 9092-9093 端口,源包含 Lambda 使用的安全组。
4.如果 MSK 使用用户名密码方式访问,请确保用户名密码正确。
# 其他优化
在数据量足够大的情况下,需要调节 BatchSize,加大 ParallelizationFactor。在调整参数后需要测试实际数据传输的时间,并且调整 Lambda 执行的超时时间。
### 本篇作者
![微信图片_20230310104428.jpg](https://dev-media.amazoncloud.cn/5b1778ddf6fa4e079f992f52fc825117_%E5%BE%AE%E4%BF%A1%E5%9B%BE%E7%89%87_20230310104428.jpg "微信图片_20230310104428.jpg")
**林业**
Amazon 解决方案架构师,负责基于 Amazon 的云计算方案的咨询与架构设计。拥有超过14年研发经验,曾打造千万级用户 APP,多项 Github 开源项目贡献者。在游戏、IoT、智慧城市、汽车、电商等多个领域都拥有丰富的实践经验。
![微信图片_20230310104433.jpg](https://dev-media.amazoncloud.cn/c848e5b555da4c3b9cc768fce8a1a0d5_%E5%BE%AE%E4%BF%A1%E5%9B%BE%E7%89%87_20230310104433.jpg "微信图片_20230310104433.jpg")
**Dora Gui**
Amazon 技术客户经理,主要支持游戏,互联网行业客户的架构优化、成本管理、技术咨询等工作,并专注在 IAAS,大数据和容器等方向的技术选型,方案落地和实践。在加入 Amazon 之前,曾就职于 EMC 和微软,腾讯等科技公司,拥有近10年虚拟化与公有云领域的架构优化和技术支持经验。