将 Amazon DocumentDB(与 MongoDB 兼容)数据归档到 Amazon S3

大数据
MongoDB
Amazon Simple Storage Service (S3)
Amazon DocumentDB
云存储服务
0
0
<!--StartFragment--> 在本文中,我们将向您展示如何将存储在 [Amazon DocumentDB(与 MongoDB 兼容)](https://aws.amazon.com/documentdb/?trk=cndc-detail)中的较旧、访问频率较低的文档集合归档到 [Amazon Simple Storage Service (Amazon S3)](https://aws.amazon.com/s3/?trk=cndc-detail)。[Amazon DocumentDB](https://aws.amazon.com/cn/documentdb/?trk=cndc-detail) 是一种快速、可扩展、高度可用且完全托管式文档数据库服务,它支持 MongoDB 工作负载。[Amazon S3](https://aws.amazon.com/cn/s3/?trk=cndc-detail) 提供高度耐用、经济高效的归档目标,便于您通过使用标准 SQL 的 [Amazon Athena](http://aws.amazon.com/athena?trk=cndc-detail) 进行查询。您可以使用 [Amazon DocumentDB](https://aws.amazon.com/cn/documentdb/?trk=cndc-detail) 和 [Amazon S3](https://aws.amazon.com/cn/s3/?trk=cndc-detail) 针对归档使用案例创建经济高效的 JSON 存储层次结构,以实现以下目标: * 根据政策、适用的法律法规,支持企业对延长保留期的合规性要求 * 以较低的成本长期存储文档,以满足不常见的使用案例要求 * 将 [Amazon DocumentDB](https://aws.amazon.com/cn/documentdb/?trk=cndc-detail) 专用于运营数据,同时在 [Amazon S3](https://aws.amazon.com/cn/s3/?trk=cndc-detail) 中维护 JSON 集合以用于分析目的 * 满足 [Amazon DocumentDB](https://aws.amazon.com/cn/documentdb/?trk=cndc-detail) 中当前最大 64 TiB 数据库和 32 TiB 集合大小[限制](https://docs.aws.amazon.com/documentdb/latest/developerguide/limits.html#limits-cluster?trk=cndc-detail)以外的容量需求 通常,较旧的文档集合访问频率较低,且不需要 [Amazon DocumentDB](https://aws.amazon.com/cn/documentdb/?trk=cndc-detail) 更高的性能特性。因此,较旧的文档很适合归档到成本较低的 [Amazon S3](https://aws.amazon.com/cn/s3/?trk=cndc-detail) 中。本文介绍了一种使用 tweet 数据的解决方案,它将文档更新存储在 [Amazon DocumentDB](https://aws.amazon.com/cn/documentdb/?trk=cndc-detail) 中,同时将文档更改流式传输到 [Amazon S3](https://aws.amazon.com/cn/s3/?trk=cndc-detail)。为了保持或减少集合大小,最佳做法是使用滚动集合方法删除较早的集合。相关详情,请参阅[使用滚动集合优化 Amazon DocumentDB 中的数据归档成本](https://aws.amazon.com/blogs/database/optimize-data-archival-costs-in-amazon-documentdb-using-rolling-collections/?trk=cndc-detail)。 ### **解决方案概述** 要将 [Amazon DocumentDB](https://aws.amazon.com/cn/documentdb/?trk=cndc-detail) 数据归档到 [Amazon S3](https://aws.amazon.com/cn/s3/?trk=cndc-detail),我们使用 [Amazon DocumentDB](https://aws.amazon.com/cn/documentdb/?trk=cndc-detail) [更改流](https://docs.aws.amazon.com/documentdb/latest/developerguide/change_streams.html?trk=cndc-detail)功能。更改流提供集群集合中发生的更改事件序列(按时间顺序排列)。应用程序可以使用更改流订阅单个集合或数据库的数据更改。 在该解决方案中,我们使用 [Amazon Secrets Manager](https://aws.amazon.com/secrets-manager/?trk=cndc-detail),提供对 [Amazon DocumentDB](https://aws.amazon.com/cn/documentdb/?trk=cndc-detail) 凭据、集群端点和端口号的安全访问。我们还使用按计划运行的 [Amazon EventBridge](https://aws.amazon.com/eventbridge/?trk=cndc-detail) 规则来触发 [Amazon Lambda](https://aws.amazon.com/lambda/?trk=cndc-detail) 函数,以将文档更改写入 [Amazon S3](https://aws.amazon.com/cn/s3/?trk=cndc-detail)。EventBridge 是一种[无服务器](https://aws.amazon.com/cn/serverless/?trk=cndc-detail)事件总线,使用从应用程序生成的事件,可以轻松地大规模构建事件驱动型应用程序。Lambda 是一种[无服务器](https://aws.amazon.com/cn/serverless/?trk=cndc-detail)事件驱动型计算服务,让您能够为几乎任何类型的应用程序或后端服务运行代码,而无需预置或管理服务器。下图展示了该解决方案架构。 ![DBBLOG-2289-image001.png](https://dev-media.amazoncloud.cn/c1c185dc85514e65a76b11713c0fa86f_DBBLOG-2289-image001.png "DBBLOG-2289-image001.png") ### **将 Amazon DocumentDB 更改流写入 Amazon S3** 我们使用 Lambda 轮询更改流事件并将文档写入 [Amazon S3](https://aws.amazon.com/cn/s3/?trk=cndc-detail)。[GitHub](https://github.com/aws-samples/amazon-documentdb-samples/tree/master/samples/change-streams?trk=cndc-detail) 上提供 Lambda 函数。此外,[Amazon DocumentDB](https://aws.amazon.com/cn/documentdb/?trk=cndc-detail) [研讨会](https://catalog.us-east-1.prod.workshops.aws/workshops/464d6c17-9faa-4fef-ac9f-dd49610174d3/en-US/change-streams?trk=cndc-detail)可供您试用该解决方案。 Lambda 函数为无状态函数,运行时持续时间有限。由于这些特性,该解决方案要求 EventBridge 安排 Lambda 函数以定义的频率(在本例中为 1 分钟)运行,以确保对 [Amazon DocumentDB](https://aws.amazon.com/cn/documentdb/?trk=cndc-detail) 更改流事件进行持续轮询。Lambda 函数连接到 [Amazon DocumentDB](https://aws.amazon.com/cn/documentdb/?trk=cndc-detail),并在预定义的 15 秒时间段内监视是否发生更改。在每个轮询周期结束时,该函数将最后轮询的恢复令牌写入不同的集合,以供后续检索。恢复令牌是一种更改流功能,它使用的令牌等于上次检索的更改事件文档的 \_id 字段。在 [Amazon DocumentDB](https://aws.amazon.com/cn/documentdb/?trk=cndc-detail) 中,每个文档都需要一个唯一的 \_id 字段作为主键。恢复令牌用作下一次 Lambda 函数调用的更改流检查点机制,以便从上一个函数停止的地方恢复新文档的轮询活动。更改流事件在集群上发生时按顺序排列,默认情况下会在事件记录后存储 3 小时。 对于要归档现有数据的集合,在启用更改流之前,您可以使用 [mongoexport](https://docs.aws.amazon.com/documentdb/latest/developerguide/backup_restore-dump_restore_import_export_data.html#backup_restore-dump_restore_import_export_data-mongoexport?trk=cndc-detail) 等实用程序,将您的集合以 JSON 格式复制到 [Amazon S3](https://aws.amazon.com/cn/s3/?trk=cndc-detail)。mongoexport 工具可创建数据的时间点快照。然后,您可以使用 resumeAfter 更改流选项,并在导出完成时记录恢复令牌。大致步骤如下所示: 1. 使用 mongoexport 将要归档的集合导出到 [Amazon S3](https://aws.amazon.com/cn/s3/?trk=cndc-detail)。 2. 记录时间戳和上次更新的 \_id。 3. 插入 canary 文档,该文档可用作更改流监视文档更新的起点(我们在下面提供了代码块示例)。 4. 使用 startAtOperationTime 或 resumeAfter 命令在集合上启用更改流。 如果使用的是 [Amazon DocumentDB](https://aws.amazon.com/cn/documentdb/?trk=cndc-detail) 4.0+ 版本,则您可以使用更改流的 startAtOperationTime 命令,无需插入 canary 记录(第 3 步)。使用 startAtOperationTime 时,更改流游标仅返回在指定时间戳或之后发生的更改。有关使用 startAtOperationTime 命令的示例代码,请参阅[通过 startAtOperationTime 恢复更改流](https://docs.aws.amazon.com/documentdb/latest/developerguide/change_streams.html#change_streams-startAtOperation?trk=cndc-detail)。 您可以使用 [change_stream_log_retention_duration](https://docs.aws.amazon.com/documentdb/latest/developerguide/change_streams.html#change_streams-modifying_log_retention?trk=cndc-detail) 参数,将更改流保留期配置为将更改的文档存储长达 7 天。执行导出操作时,更改流保留期必须足够长,以确保存储从第 1 步中开始导出到第 4 步中完成导出后启用更改流的所有文档更改。 ### **Lambda 代码演示** 本部分中介绍的 Lambda Python 代码示例可在 [GitHub](https://github.com/aws-samples/amazon-documentdb-samples/blob/master/samples/change-streams/app/lambda_function.py?trk=cndc-detail) 上找到。Lambda 函数使用环境变量配置数据库以监视更改事件、用于归档数据的 S3 存储桶、[Amazon DocumentDB](https://aws.amazon.com/cn/documentdb/?trk=cndc-detail) 集群端点以及其他一些可配置的变量,如以下屏幕截图所示。 代码中的 Lambda 处理程序函数使用 PRIMARY 读取首选项建立与 [Amazon DocumentDB](https://aws.amazon.com/cn/documentdb/?trk=cndc-detail) 集群的连接,并连接到在环境变量 WATCHED_DB_NAME 中配置的数据库。只有连接到 [Amazon DocumentDB](https://aws.amazon.com/cn/documentdb/?trk=cndc-detail) 集群的主实例(在撰写本文时)才支持更改流。然后,Lambda 处理程序函数检索最后处理过的 \_id,以用作下一次 Lambda 调用的恢复令牌,并将其存储在由 STATE_DB 和 STATE_COLLECTION 环境变量识别的单独数据库和集合中。 ![DBBLOG-2289-image004.png](https://dev-media.amazoncloud.cn/21331650336a48beaf9ead675484ddab_DBBLOG-2289-image004.png "DBBLOG-2289-image004.png") 接下来,我们来讨论一些关键的 Python 代码块。 以下代码是 get_last_processed_id 函数,它存储与上次成功处理的变更事件相对应的恢复令牌: ```bash def get_last_processed_id(): last_processed_id = None try: state_collection = get_state_collection_client() if "WATCHED_COLLECTION_NAME" in os.environ: state_doc = state_collection.find_one({'currentState': True, 'dbWatched': str(os.environ['WATCHED_DB_NAME']), 'collectionWatched': str(os.environ['WATCHED_COLLECTION_NAME']), 'db_level': False}) else: state_doc = state_collection.find_one({'currentState': True, 'db_level': True, 'dbWatched': str(os.environ['WATCHED_DB_NAME'])}) if state_doc is not None: if 'lastProcessed' in state_doc: last_processed_id = state_doc['lastProcessed'] else: if "WATCHED_COLLECTION_NAME" in os.environ: state_collection.insert_one({'dbWatched': str(os.environ['WATCHED_DB_NAME']), 'collectionWatched': str(os.environ['WATCHED_COLLECTION_NAME']), 'currentState': True, 'db_level': False}) else: state_collection.insert_one({'dbWatched': str(os.environ['WATCHED_DB_NAME']), 'currentState': True, 'db_level': True}) except Exception as ex: logger.error('Failed to return last processed id: {}'.format(ex)) raise return last_processed_id ``` Lambda 处理程序函数监视更改流中是否存在任何更改事件,并调用 get_last_processed_id 函数: ```bash with watcher.watch(full_document='updateLookup', resume_after=last_processed_id) as change_stream: ``` 启用更改流后首次触发 Lambda 函数时,将 last_processed_id 设置为 None。要激活更改流并开始捕获更改事件,需要插入并删除一条 canary 记录,以用作虚拟记录来开始捕获更改事件: ```bash if last_processed_id is None: canary_record = insertCanary() deleteCanary() ``` 更改在当前调用的循环中流式传输 1 分钟,或者直到每次调用要处理的文档数量达到为止: ```bash while change_stream.alive and i < int(os.environ['Documents_per_run']): i += 1 change_event = change_stream.try_next() ``` change_event 变量包含一个操作类型,用于指示事件是否对应于插入、更新或删除事件。所有事件都包含 \_id。插入和更新事件也包括文档正文。change_event 变量的内容用于创建包含文档 ID、正文和上次更新时间戳的负载。然后,此负载将写入由 BUCKET_NAME 环境变量指示的 [Amazon S3](https://aws.amazon.com/cn/s3/?trk=cndc-detail) 存储桶中。 ```bash if op_type in ['insert', 'update']: print('In insert optype') doc_body = change_event['fullDocument'] doc_id = str(doc_body.pop("_id", None)) readable = datetime.datetime.fromtimestamp(change_event['clusterTime'].time).isoformat() doc_body.update({'operation':op_type,'timestamp':str(change_event['clusterTime'].time),'timestampReadable':str(readable)}) payload = {'_id':doc_id} payload.update(doc_body) if "BUCKET_NAME" in os.environ: put_s3_event(json_util.dumps(payload), str(change_event['ns']['db']), str(change_event['ns']['coll']),op_id) ``` 对于删除操作,文档 ID 和上次更新的时间戳存储在 [Amazon S3](https://aws.amazon.com/cn/s3/?trk=cndc-detail) 中: ```bash if op_type == 'delete': doc_id = str(change_event['documentKey']['_id']) readable = datetime.datetime.fromtimestamp(change_event['clusterTime'].time).isoformat() payload = {'_id':doc_id} payload.update({'operation':op_type,'timestamp':str(change_event['clusterTime'].time),'timestampReadable':str(readable)}) if "BUCKET_NAME" in os.environ: put_s3_event(json_util.dumps(payload), str(change_event['ns']['db']), str(change_event['ns']['coll']),op_id) ``` 最后,如果您想要识别已删除的文档并查看文档修订,可以使用文档 ID 通过 Athena 查询 [Amazon S3](https://aws.amazon.com/cn/s3/?trk=cndc-detail)。若要了解详情,请访问研讨会[通过 Amazon DocumentDB 更改流归档数据](https://catalog.us-east-1.prod.workshops.aws/workshops/464d6c17-9faa-4fef-ac9f-dd49610174d3/en-US/change-streams?trk=cndc-detail)。 ### **结论** 在本文中,我们提供了将 [Amazon DocumentDB](https://aws.amazon.com/cn/documentdb/?trk=cndc-detail) 文档归档到 [Amazon S3](https://aws.amazon.com/cn/s3/?trk=cndc-detail) 的使用案例,以及指向 [Amazon DocumentDB](https://aws.amazon.com/cn/documentdb/?trk=cndc-detail) 研讨会的链接,供您试用该解决方案。我们还提供了指向解决方案核心的 Lambda 函数的链接,并介绍了一些关键代码部分,便于您更好地理解。 您有需要跟进的问题或反馈吗? 发表评论。要开始使用 [Amazon DocumentDB](https://aws.amazon.com/cn/documentdb/?trk=cndc-detail),请参阅[开发人员指南](https://docs.aws.amazon.com/documentdb/latest/developerguide/get-started-guide.html?trk=cndc-detail)。 <!--EndFragment-->
目录
亚马逊云科技解决方案 基于行业客户应用场景及技术领域的解决方案
联系亚马逊云科技专家
亚马逊云科技解决方案
基于行业客户应用场景及技术领域的解决方案
联系专家
0
目录
关闭