Archive data from Amazon DocumentDB (with MongoDB compatibility) to Amazon S3

In this post, we show you how to archive older, less frequently accessed document collections stored in [Amazon DocumentDB (with MongoDB compatibility)](https://aws.amazon.com/documentdb/) to [Amazon Simple Storage Service (Amazon S3)](https://aws.amazon.com/s3/). Amazon DocumentDB is a fast, scalable, highly available, and fully managed document database service that supports MongoDB workloads. Amazon S3 provides a highly durable, cost-effective archive destination that you can query with [Amazon Athena](http://aws.amazon.com/athena) using standard SQL. You can use Amazon DocumentDB and Amazon S3 to create a cost-effective JSON storage hierarchy for archival use cases to do the following:

- Support an organization’s compliance requirements in accordance with policies, applicable laws, and regulations for extended retention periods
- Store documents long term at lower cost for infrequent use case requirements
- Dedicate Amazon DocumentDB to operational data while maintaining JSON collections in Amazon S3 for analytical purposes
- Address capacity needs beyond the current maximum 64 TiB database and 32 TiB collection size [limits](https://docs.aws.amazon.com/documentdb/latest/developerguide/limits.html#limits-cluster) in Amazon DocumentDB

In general, older document collections are less frequently accessed and don’t require the higher performance characteristics of Amazon DocumentDB. This makes older documents good candidates for archiving to lower-cost Amazon S3. This post describes a solution using tweet data that stores document updates in Amazon DocumentDB while simultaneously streaming the document changes to Amazon S3. To maintain or reduce collection sizes, a best practice is to use a rolling collections methodology to drop older collections. For more information, refer to [Optimize data archival costs in Amazon DocumentDB using rolling collections](https://aws.amazon.com/blogs/database/optimize-data-archival-costs-in-amazon-documentdb-using-rolling-collections/).

### **Solution overview**

To archive Amazon DocumentDB data to Amazon S3, we use the Amazon DocumentDB [change streams](https://docs.aws.amazon.com/documentdb/latest/developerguide/change_streams.html) feature. Change streams provide a time-ordered sequence of change events as they occur within your cluster’s collections. Applications can use change streams to subscribe to data changes on individual collections or databases.

In this solution, we use [Amazon Web Services Secrets Manager](https://aws.amazon.com/secrets-manager/) to provide secure access to the Amazon DocumentDB credentials, cluster endpoint, and port number. We also use an [Amazon EventBridge](https://aws.amazon.com/eventbridge/) rule running on a schedule to trigger an [Amazon Web Services Lambda](https://aws.amazon.com/lambda/) function that writes the document changes to Amazon S3. EventBridge is a serverless event bus that makes it easy to build event-driven applications at scale using events generated from your applications. Lambda is a serverless, event-driven compute service that lets you run code for virtually any type of application or backend service without provisioning or managing servers.
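If you prefer to create the schedule programmatically rather than through the console, the following sketch shows one way to set up the 1-minute EventBridge rule and register the Lambda function as its target using boto3. The rule name, function ARN, and statement ID are placeholders, not values from this solution.

```python
import boto3

events = boto3.client("events")
lambda_client = boto3.client("lambda")

# Placeholder names and ARN; substitute your own resources.
RULE_NAME = "docdb-change-stream-poller-schedule"
FUNCTION_ARN = "arn:aws:lambda:us-east-1:123456789012:function:docdb-change-stream-poller"

# Scheduled rule that fires every minute.
rule = events.put_rule(
    Name=RULE_NAME,
    ScheduleExpression="rate(1 minute)",
    State="ENABLED",
)

# Allow EventBridge to invoke the Lambda function.
lambda_client.add_permission(
    FunctionName=FUNCTION_ARN,
    StatementId="allow-eventbridge-schedule",
    Action="lambda:InvokeFunction",
    Principal="events.amazonaws.com",
    SourceArn=rule["RuleArn"],
)

# Point the rule at the function.
events.put_targets(
    Rule=RULE_NAME,
    Targets=[{"Id": "docdb-poller", "Arn": FUNCTION_ARN}],
)
```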
The following diagram illustrates the architecture for this solution.

![image.png](https://dev-media.amazoncloud.cn/d99286025f0841d7ad4ad93778698d63_image.png)

### **Write Amazon DocumentDB change streams to Amazon S3**

We use Lambda to poll the change stream events and write the documents to Amazon S3. The Lambda function is available on [GitHub](https://github.com/aws-samples/amazon-documentdb-samples/tree/master/samples/change-streams). Additionally, an Amazon DocumentDB [workshop](https://catalog.us-east-1.prod.workshops.aws/workshops/464d6c17-9faa-4fef-ac9f-dd49610174d3/en-US/change-streams) is available for you to try the solution.

Lambda functions are stateless and have limited runtime durations. Because of those characteristics, the solution requires EventBridge to schedule a Lambda function to run at a defined frequency (1 minute in this example) to ensure continuous polling of the Amazon DocumentDB change stream events. The Lambda function connects to Amazon DocumentDB and watches for changes for a predefined time period of 15 seconds. At the end of each poll cycle, the function writes the last polled resume token to a different collection for subsequent retrieval. A resume token is a change streams feature that uses a token equal to the `_id` field of the last retrieved change event document. In Amazon DocumentDB, each document requires a unique `_id` field that acts as a primary key. The resume token is used as the change stream checkpoint mechanism so that the next Lambda function invocation resumes polling for new documents where the previous invocation left off. Change stream events are ordered as they occur on the cluster and are stored for 3 hours by default after the event has been recorded.

For collections where you intend to archive existing data, before enabling change streams, you may use a utility like [mongoexport](https://docs.aws.amazon.com/documentdb/latest/developerguide/backup_restore-dump_restore_import_export_data.html#backup_restore-dump_restore_import_export_data-mongoexport) to copy your collections to Amazon S3 in JSON format. The mongoexport tool creates a point-in-time snapshot of the data. You can then use the `resumeAfter` change stream option with a resume token recorded when the export completed. The high-level steps are as follows:

1. Export the collections to be archived to Amazon S3 using mongoexport.
2. Record the timestamp and last updated `_id`.
3. Insert a canary document that can be used as a starting point from which change streams watch for document updates (we provide a code block example below).
4. Enable change streams on the collection using either the `startAtOperationTime` or `resumeAfter` option (see the sketch after this list).
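As a reference for step 4, the following sketch shows how change streams might be enabled on a single collection by running the `modifyChangeStreams` admin command through pymongo. The connection string, database name, and collection name are placeholders, not values from this solution.

```python
from pymongo import MongoClient

# Placeholder connection details; use your cluster endpoint, credentials, and CA bundle.
client = MongoClient(
    "mongodb://user:password@mycluster.cluster-xxxxxxxx.us-east-1.docdb.amazonaws.com:27017/"
    "?tls=true&tlsCAFile=global-bundle.pem&replicaSet=rs0&readPreference=primary"
)

# Enable change streams for one database/collection pair.
client["admin"].command({
    "modifyChangeStreams": 1,
    "database": "docdb_archive_demo",  # placeholder database name
    "collection": "tweets",            # placeholder collection name
    "enable": True,
})
```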
If you’re using Amazon DocumentDB version 4.0 or later, you can use the change stream `startAtOperationTime` option and remove the need to insert a canary record (step 3). When using `startAtOperationTime`, the change stream cursor only returns changes that occurred at or after the specified timestamp. For sample code that uses `startAtOperationTime`, refer to [Resuming a Change Stream with startAtOperationTime](https://docs.aws.amazon.com/documentdb/latest/developerguide/change_streams.html#change_streams-startAtOperation).

You can configure the change stream retention period to store changed documents for periods up to 7 days using the [change_stream_log_retention_duration](https://docs.aws.amazon.com/documentdb/latest/developerguide/change_streams.html#change_streams-modifying_log_retention) parameter. When performing the export operation, the change stream retention period must be long enough to ensure storage of all document changes from the time the export began in step 1 until change streams are enabled after completion of the export in step 4.

### **Lambda code walkthrough**

The Lambda Python code example described in this section is available on GitHub. The Lambda function uses environment variables to configure the database to watch for change events, the S3 bucket to archive data to, the Amazon DocumentDB cluster endpoint, and a few other configurable variables, as shown in the following screenshot.

The Lambda handler function in the code establishes a connection to the Amazon DocumentDB cluster using a `PRIMARY` read preference and connects to the database configured in the environment variable `WATCHED_DB_NAME`. Change streams are only supported with connections to the primary instance of an Amazon DocumentDB cluster (at the time of this writing).
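The connection details themselves come from Secrets Manager rather than being hard-coded in the function. The following is a minimal sketch of how the credentials might be retrieved and the connection opened; the secret name, the secret’s key names, and the CA bundle path are assumptions, not the exact values used in the sample code.

```python
import json
import os

import boto3
from pymongo import MongoClient

def get_docdb_client():
    # Fetch credentials and endpoint details from Secrets Manager.
    # DOCDB_SECRET_NAME and the secret's key names are placeholders.
    secrets = boto3.client("secretsmanager")
    secret = json.loads(
        secrets.get_secret_value(SecretId=os.environ["DOCDB_SECRET_NAME"])["SecretString"]
    )

    # Connect to the cluster; change streams require the primary instance.
    return MongoClient(
        host=secret["host"],
        port=int(secret["port"]),
        username=secret["username"],
        password=secret["password"],
        tls=True,
        tlsCAFile="global-bundle.pem",  # CA bundle packaged with the function (assumption)
        replicaSet="rs0",
        readPreference="primary",
        retryWrites=False,
    )

# Usage inside the handler (environment variable name follows the sample code):
# db = get_docdb_client()[os.environ["WATCHED_DB_NAME"]]
```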
The Lambda handler function then retrieves the last processed `_id` to use as a resume token for the next Lambda invocation and stores it in a separate database and collection identified by the `STATE_DB` and `STATE_COLLECTION` environment variables.

![image.png](https://dev-media.amazoncloud.cn/f9634008c939420695f2b346cb8374ff_image.png)

Next, let’s discuss some key Python code blocks.

The following code is the `get_last_processed_id` function, which retrieves the resume token corresponding to the last successfully processed change event (and initializes the state document if one doesn’t exist yet):

```python
def get_last_processed_id():
    last_processed_id = None
    try:
        state_collection = get_state_collection_client()
        if "WATCHED_COLLECTION_NAME" in os.environ:
            state_doc = state_collection.find_one({'currentState': True, 'dbWatched': str(os.environ['WATCHED_DB_NAME']),
                'collectionWatched': str(os.environ['WATCHED_COLLECTION_NAME']), 'db_level': False})
        else:
            state_doc = state_collection.find_one({'currentState': True, 'db_level': True,
                'dbWatched': str(os.environ['WATCHED_DB_NAME'])})

        if state_doc is not None:
            if 'lastProcessed' in state_doc:
                last_processed_id = state_doc['lastProcessed']
        else:
            if "WATCHED_COLLECTION_NAME" in os.environ:
                state_collection.insert_one({'dbWatched': str(os.environ['WATCHED_DB_NAME']),
                    'collectionWatched': str(os.environ['WATCHED_COLLECTION_NAME']), 'currentState': True, 'db_level': False})
            else:
                state_collection.insert_one({'dbWatched': str(os.environ['WATCHED_DB_NAME']), 'currentState': True,
                    'db_level': True})

    except Exception as ex:
        logger.error('Failed to return last processed id: {}'.format(ex))
        raise
    return last_processed_id
```
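The sample code also writes the checkpoint back at the end of each poll cycle, as described earlier. A simplified sketch of what that counterpart update might look like follows; the function name `store_last_processed_id` and the exact filter shape are illustrative rather than copied from the repository.

```python
def store_last_processed_id(resume_token):
    # Persist the resume token so the next invocation can resume where this one stopped.
    state_collection = get_state_collection_client()
    if "WATCHED_COLLECTION_NAME" in os.environ:
        state_filter = {'currentState': True, 'db_level': False,
                        'dbWatched': str(os.environ['WATCHED_DB_NAME']),
                        'collectionWatched': str(os.environ['WATCHED_COLLECTION_NAME'])}
    else:
        state_filter = {'currentState': True, 'db_level': True,
                        'dbWatched': str(os.environ['WATCHED_DB_NAME'])}
    state_collection.update_one(state_filter, {'$set': {'lastProcessed': resume_token}})
```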
The Lambda handler function watches the change stream for any change events and calls the `get_last_processed_id` function:

```python
with watcher.watch(full_document='updateLookup', resume_after=last_processed_id) as change_stream:
```

When the Lambda function is triggered for the first time after change streams are enabled, `last_processed_id` is set to `None`. In that case, a canary record is inserted and then deleted to act as a dummy record from which to start capturing change events:

```python
if last_processed_id is None:
    canary_record = insertCanary()
    deleteCanary()
```

The current invocation streams changes in a loop for 1 minute, or until the configured number of documents to process per invocation is reached:

```python
while change_stream.alive and i < int(os.environ['Documents_per_run']):
    i += 1
    change_event = change_stream.try_next()
```

The `change_event` variable contains an operation type that indicates whether an event corresponds to an insert, update, or delete. All events contain the `_id`; insert and update events include the document body as well. The content of the `change_event` variable is used to create a payload containing the document ID, body, and last updated timestamp. This payload is then written to Amazon S3, into a bucket indicated by the `BUCKET_NAME` environment variable.

```python
if op_type in ['insert', 'update']:
    print('In insert optype')
    doc_body = change_event['fullDocument']
    doc_id = str(doc_body.pop("_id", None))
    readable = datetime.datetime.fromtimestamp(change_event['clusterTime'].time).isoformat()
    doc_body.update({'operation':op_type,'timestamp':str(change_event['clusterTime'].time),'timestampReadable':str(readable)})
    payload = {'_id':doc_id}
    payload.update(doc_body)
    if "BUCKET_NAME" in os.environ:
        put_s3_event(json_util.dumps(payload), str(change_event['ns']['db']), str(change_event['ns']['coll']), op_id)
```

For the delete operation, only the document ID and last updated timestamp are stored in Amazon S3:

```python
if op_type == 'delete':
    doc_id = str(change_event['documentKey']['_id'])
    readable = datetime.datetime.fromtimestamp(change_event['clusterTime'].time).isoformat()
    payload = {'_id':doc_id}
    payload.update({'operation':op_type,'timestamp':str(change_event['clusterTime'].time),'timestampReadable':str(readable)})
    if "BUCKET_NAME" in os.environ:
        put_s3_event(json_util.dumps(payload), str(change_event['ns']['db']), str(change_event['ns']['coll']), op_id)
```
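The `put_s3_event` helper isn’t shown above. The following is a minimal sketch of what it could look like; the object key layout (database/collection/operation ID) is an assumption chosen so related events group under a common prefix, and the actual implementation in the repository may differ.

```python
import os

import boto3

s3_client = boto3.client("s3")

def put_s3_event(payload, db_name, coll_name, op_id):
    # Write one change event as a JSON object, keyed by database, collection, and operation ID.
    key = "{}/{}/{}.json".format(db_name, coll_name, op_id)
    s3_client.put_object(
        Bucket=os.environ["BUCKET_NAME"],
        Key=key,
        Body=payload.encode("utf-8"),
    )
```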
Finally, if you want to identify documents that have been deleted and view document revisions, you can use the document ID to query Amazon S3 using Athena. Visit the workshop [Archiving data with Amazon DocumentDB change streams](https://catalog.us-east-1.prod.workshops.aws/workshops/464d6c17-9faa-4fef-ac9f-dd49610174d3/en-US/change-streams) for more information.

### **Conclusion**

In this post, we provided use cases for archiving Amazon DocumentDB documents to Amazon S3, along with a link to an Amazon DocumentDB workshop for you to try the solution. We also provided a link to the Lambda function that is central to the solution and walked through some of the critical code sections for better understanding.

Do you have follow-up questions or feedback? Leave a comment. To get started with Amazon DocumentDB, refer to the [Developer Guide](https://docs.aws.amazon.com/documentdb/latest/developerguide/get-started-guide.html).

#### **About the authors**

![image.png](https://dev-media.amazoncloud.cn/5e6dad25932c4aee8edb74bc22f85157_image.png)

**Mark Mulligan** is a Senior Database Specialist Solutions Architect at Amazon Web Services. He enjoys helping customers adopt Amazon’s purpose-built databases, both NoSQL and relational, to address business requirements and maximize return on investment. He started his career as a customer, in roles including mainframe systems programmer and UNIX/Linux systems administrator, which gives him a customer’s perspective on requirements in the areas of cost, performance, operational excellence, security, reliability, and sustainability.

![image.png](https://dev-media.amazoncloud.cn/d16e8c1b67054099894ac6a1e98bea80_image.png)

**Karthik Vijayraghavan** is a Senior DocumentDB Specialist Solutions Architect at Amazon Web Services. He has been helping customers modernize their applications using NoSQL databases. He enjoys solving customer problems and is passionate about providing cost-effective solutions that perform at scale. Karthik started his career as a developer building web and REST services with a strong focus on integration with relational databases, so he can relate to customers who are in the process of migrating to NoSQL.