Using Change Streams to Implement Table-Level Disaster Recovery Replication for Amazon DocumentDB

### **Preface**
[Amazon DocumentDB](https://aws.amazon.com/cn/documentdb/?trk=cndc-detail) (with MongoDB compatibility) is a fully managed document database service that makes it easy to scale JSON workloads: it scales compute and storage independently and supports millions of document read requests per second; it automates hardware provisioning, patching, setup, and other database management tasks; it delivers 99.999999999% durability through automatic replication, continuous backup, and strict network isolation; and it works with existing MongoDB drivers and tools, compatible with the Apache 2.0 open-source MongoDB 3.6 and 4.0 APIs. Given these strengths, more and more enterprises have adopted, or are about to adopt, DocumentDB to manage their JSON document databases.

Many industries must guarantee data and business continuity and therefore need disaster recovery (DR) for critical workloads and data. In June 2021, Amazon Web Services launched Global Clusters for [Amazon DocumentDB](https://aws.amazon.com/cn/documentdb/?trk=cndc-detail) (with MongoDB compatibility). A [Global Cluster](https://aws.amazon.com/cn/documentdb/global-clusters/) provides disaster recovery in the event of a region-wide outage and enables low-latency global reads by letting applications read from the nearest [Amazon DocumentDB](https://aws.amazon.com/cn/documentdb/?trk=cndc-detail) cluster. With this feature, customers can replicate the DocumentDB cluster in the Region where their business runs to another Region and easily achieve cross-region disaster recovery at the data layer. However, because Global Clusters replicate at the storage layer, as of this writing they unfortunately support only cluster-level synchronization and replication; database-level or collection-level disaster recovery is not yet supported.

Amazon Web Services also offers Amazon Database Migration Service (DMS), which can synchronize data at the database or collection level and replicate it across Regions with low latency and a low RPO to meet DR needs. For the data-protection requirements of DR scenarios, however, DMS does not yet support filtering out delete operations.

In this post, we describe an end-to-end solution that uses [Amazon Managed Streaming for Apache Kafka](https://aws.amazon.com/cn/msk/?trk=cndc-detail) (MSK) as message middleware to buffer DocumentDB change stream events, replicate a database across Regions, and intercept delete operations. In this example, us-east-1 (N. Virginia) is the primary Region and already hosts the primary DocumentDB cluster, and us-west-2 (Oregon) is the DR Region and already hosts the DR DocumentDB cluster. Python is used as the programming language; you can also implement the business logic in other mainstream languages such as Java or Node.js, but Ruby is not supported because of driver limitations. Please also use [Amazon DocumentDB](https://aws.amazon.com/cn/documentdb/?trk=cndc-detail) v4.0 or later. The reference architecture is shown in the diagram below:

![image.png](https://dev-media.amazoncloud.cn/2721c70607d441e2894a9ca52e123382_image.png)

##### **Environment setup on the stream-capture host in the primary region**
**1. Configure OS environment variables on the stream-capture host in the primary region**
Code:

Set the environment variables below, replacing the placeholder values with your actual values. This post uses bar.foo as the collection watched by the change stream; you can substitute your own database and collection.
```
##Set the environment variables, replacing the placeholder values with your actual values.
##This post uses bar.foo as the watched collection; substitute your own DB and collection if needed.
echo -e "USERNAME=\"Your Primary MongoDB User\"\nexport USERNAME\nPASSWORD=\"Your Primary MongoDB password\"\nexport PASSWORD\nmongo_host=\"Primary MongoDB Cluster URI\"\nexport mongo_host\nstate_tbl=\"YOUR STATE COLLECTION\"\nexport state_tbl\nstate_db=\"YOUR STATE DB\"\nexport state_db\nwatched_db_name=\"bar\"\nexport watched_db_name\nwatched_tbl_name=\"foo\"\nexport watched_tbl_name\nevents_remain=1\nexport events_remain\nDocuments_per_run=100000\nexport Documents_per_run\nkfk_host=\"YOUR KFK URI\"\nexport kfk_host\nkfk_topic=\"changevents\"\nexport kfk_topic\nbucket_name=\"YOUR S3 BUCKET\"\nexport bucket_name\nS3_prefix=\"\"\nexport S3_prefix" >> ~/.bash_profile
##Apply the environment variables
source ~/.bash_profile
```
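Both programs below read their configuration from `os.environ`, so a missing variable is the most common setup failure. The short check below is an optional addition, not part of the original walkthrough; run it with the Python 3 environment from step 2 once the variables above have been sourced. The variable names are exactly the ones exported above.
```
#!/bin/env python
"""Optional sanity check: confirm the capture host exports the variables the capture program expects."""
import os

REQUIRED = [
    "USERNAME", "PASSWORD", "mongo_host",   # primary DocumentDB connection
    "state_db", "state_tbl",                # resume-token bookkeeping collection
    "watched_db_name",                      # watched database (watched_tbl_name is optional)
    "kfk_host", "kfk_topic",                # MSK producer settings
    "bucket_name",                          # S3 bucket for intercepted delete events
    "Documents_per_run", "events_remain",
]

missing = [name for name in REQUIRED if name not in os.environ]
if missing:
    raise SystemExit("Missing environment variables: " + ", ".join(missing))
print("All required environment variables are set.")
```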
**2. Install pymongo and boto3 on the stream-capture host in the primary region**
Follow [Create a Python 3 virtual environment with the Boto 3 library on Amazon Linux 2](https://aws.amazon.com/cn/premiumsupport/knowledge-center/ec2-linux-python3-boto3/#:~:text=Install%20Python%203%20for%20Amazon%20Linux%202%201.,be%20required%20to%20create%20the%20Python%203%20environment.) to install and configure python3 and boto3; those steps are not repeated here.
```
##Install pymongo
sudo pip install pymongo
##Install the kafka-python client library used to publish change events to MSK
sudo pip install kafka-python
```
**3. Install the MongoDB client and TLS certificate on the stream-capture host in the primary region**
```
##Download the SSL certificate to /tmp
wget -P /tmp https://s3.amazonaws.com/rds-downloads/rds-combined-ca-bundle.pem

##Configure the MongoDB YUM repo
echo -e "[mongodb-org-5.0]\nname=MongoDB Repository\nbaseurl=https://repo.mongodb.org/yum/amazon/2/mongodb-org/5.0/x86_64/\ngpgcheck=1\nenabled=1\ngpgkey=https://www.mongodb.org/static/pgp/server-5.0.asc" | sudo tee /etc/yum.repos.d/mongodb-org-5.0.repo
##Install the MongoDB client
sudo yum install -y mongodb-org-shell
```
##### **Create an MSK topic to receive the change stream events**
Follow step 3, "Create a topic", of the [MSK getting-started guide](https://docs.aws.amazon.com/zh_cn/msk/latest/developerguide/create-topic.html) to create the MSK topic; the procedure is not repeated here. In step 12 of that guide, replace `--topic MSKTutorialTopic` with `--topic changevents`, then run step 12.

You should see the following message:
```
Created topic changevents.
```
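If you would rather verify the topic from Python than from the Kafka CLI, the optional sketch below lists the topics visible on the MSK bootstrap brokers using the same kafka-python library the capture program uses. It assumes the `kfk_host` and `kfk_topic` variables exported in step 1.
```
#!/bin/env python
"""Optional check: confirm the changevents topic exists on the MSK cluster."""
import os
from kafka import KafkaConsumer

topic = os.environ.get("kfk_topic", "changevents")

# Connect with the same bootstrap servers the capture program will use
consumer = KafkaConsumer(bootstrap_servers=os.environ["kfk_host"])
topics = consumer.topics()   # set of topic names known to the cluster
consumer.close()

if topic in topics:
    print("Topic found: " + topic)
else:
    raise SystemExit("Topic " + topic + " not found; create it before starting the capture program.")
```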
##### **Enable [Amazon DocumentDB](https://aws.amazon.com/cn/documentdb/?trk=cndc-detail) change streams**

**1. Log in to the primary DocumentDB cluster with the mongo shell**
```
mongo --host $mongo_host:27017 --ssl --sslCAFile /tmp/rds-combined-ca-bundle.pem --username $USERNAME --password $PASSWORD
```
**2. Enable the change stream on bar.foo**
```
db.adminCommand({modifyChangeStreams: 1, database: "bar", collection: "foo", enable: true});
```
**3. Confirm that the command succeeded**
```
{ "ok" : 1 }
```
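The same command can also be issued from Python, for example as part of an automation script. The sketch below is an optional alternative to the mongo shell step, not part of the original walkthrough; it uses pymongo with the same TLS options as the capture program.
```
#!/bin/env python
"""Optional sketch: enable the bar.foo change stream through pymongo instead of the mongo shell."""
import os
from pymongo import MongoClient

client = MongoClient(os.environ["mongo_host"], ssl=True, retryWrites=False,
                     ssl_ca_certs="/tmp/rds-combined-ca-bundle.pem",
                     username=os.environ["USERNAME"], password=os.environ["PASSWORD"])

# Equivalent to db.adminCommand({modifyChangeStreams: 1, database: "bar", collection: "foo", enable: true})
result = client.admin.command("modifyChangeStreams", 1,
                              database="bar", collection="foo", enable=True)
print(result)   # expect {'ok': 1.0, ...}
```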
### **Change-stream capture program in the primary region**
Deploy the following Python program on the stream-capture host in the primary region. It reads change events from the watched collection, publishes insert and update events to MSK, and archives delete events to S3.
```
#!/bin/env python

import json
import logging
import os
import time
import datetime

import boto3
from pymongo import MongoClient
from pymongo.errors import OperationFailure
from kafka import KafkaProducer

db_client = None
kafka_client = None
s3_client = None

logging.basicConfig(level=logging.ERROR)

# The error code returned when data for the requested resume token has been deleted
err_code_136 = 136


def get_db_client():

    """Return a client connected to the primary DocumentDB cluster."""

    # Use a global variable if CX has interest in Lambda function instead of long-running python
    global db_client

    if db_client is None:
        logging.debug('Creating a new DB client.')

        try:
            username = os.environ['USERNAME']
            password = os.environ['PASSWORD']
            cluster_uri = os.environ['mongo_host']
            db_client = MongoClient(cluster_uri, ssl=True, retryWrites=False,
                                    ssl_ca_certs='/tmp/rds-combined-ca-bundle.pem')
            # Make an attempt to connect
            db_client.admin.command('ismaster')
            db_client["admin"].authenticate(name=username, password=password)
            logging.debug('Successfully created a new DB client.')
        except Exception as err:
            logging.error('Failed to create a new DB client: {}'.format(err))
            raise

    return db_client


def get_state_tbl_client():

    """Return a client for the collection in which we store the processing state."""

    try:
        db_client = get_db_client()
        state_db_name = os.environ['state_db']
        state_tbl_name = os.environ['state_tbl']
        state_tbl = db_client[state_db_name][state_tbl_name]
    except Exception as err:
        logging.error('Failed to create new state collection client: {}'.format(err))
        raise

    return state_tbl


def get_last_position():

    """Return the resume token of the last processed change event, if one was saved."""

    last_position = None
    logging.debug('Locate the last position.')
    try:
        state_tbl = get_state_tbl_client()
        if "watched_tbl_name" in os.environ:
            position_point = state_tbl.find_one({'currentposition': True, 'db_level': False,
                                                 'watched_db': str(os.environ['watched_db_name']),
                                                 'watched_tbl': str(os.environ['watched_tbl_name'])})
        else:
            position_point = state_tbl.find_one({'currentposition': True, 'db_level': True,
                                                 'watched_db': str(os.environ['watched_db_name'])})

        if position_point is not None:
            if 'lastProcessed' in position_point:
                last_position = position_point['lastProcessed']
        else:
            # First run: create the state document for this watched db/collection
            if "watched_tbl_name" in os.environ:
                state_tbl.insert_one({'watched_db': str(os.environ['watched_db_name']),
                                      'watched_tbl': str(os.environ['watched_tbl_name']),
                                      'currentposition': True, 'db_level': False})
            else:
                state_tbl.insert_one({'watched_db': str(os.environ['watched_db_name']),
                                      'currentposition': True, 'db_level': True})

    except Exception as err:
        logging.error('Failed to locate the last processed id: {}'.format(err))
        raise

    return last_position


def save_last_position(resume_token):

    """Save the resume token of the last successfully processed change event."""

    logging.debug('Saving last processed id.')
    try:
        state_tbl = get_state_tbl_client()
        if "watched_tbl_name" in os.environ:
            state_tbl.update_one({'watched_db': str(os.environ['watched_db_name']),
                                  'watched_tbl': str(os.environ['watched_tbl_name'])},
                                 {'$set': {'lastProcessed': resume_token}})
        else:
            state_tbl.update_one({'watched_db': str(os.environ['watched_db_name']), 'db_level': True},
                                 {'$set': {'lastProcessed': resume_token}})

    except Exception as err:
        logging.error('Failed to save last processed id: {}'.format(err))
        raise


def conn_kfk_producer():

    """Return a Kafka producer that serializes keys and values as JSON."""

    # Use a global variable if CX has interest in Lambda function instead of long-running python
    global kafka_client

    if kafka_client is None:
        logging.debug('Creating a new Kafka producer.')

        try:
            kafka_client = KafkaProducer(
                bootstrap_servers=os.environ['kfk_host'],
                key_serializer=lambda key: json.dumps(key).encode('utf-8'),
                value_serializer=lambda value: json.dumps(value).encode('utf-8'),
                retries=3)
        except Exception as err:
            logging.error('Failed to create a new Kafka producer: {}'.format(err))
            raise

    return kafka_client


def produce_msg(producer_instance, topic_name, key, value):

    """Produce one change event to MSK."""

    try:
        producer_instance.send(topic_name, key=key, value=value)
        producer_instance.flush()
    except Exception as err:
        logging.error('Error in publishing message: {}'.format(err))
        raise


def write_S3(event, database, collection, doc_id):

    """Archive one intercepted delete event to S3."""

    global s3_client

    if s3_client is None:
        s3_client = boto3.resource('s3')

    try:
        logging.debug('Publishing message to S3.')
        key = database + '/' + collection + '/' + datetime.datetime.now().strftime('%Y/%m/%d/') + doc_id
        if "S3_prefix" in os.environ and os.environ['S3_prefix']:
            key = str(os.environ['S3_prefix']) + '/' + key
        s3_client.Object(os.environ['bucket_name'], key).put(Body=event)

    except Exception as err:
        logging.error('Error in publishing message to S3: {}'.format(err))
        raise


def main(event, context):

    """Read change events from DocumentDB and push them to MSK and S3."""

    events_processed = 0
    watcher = None
    kafka_client = None

    try:
        # Kafka producer set up
        if "kfk_host" in os.environ:
            kafka_client = conn_kfk_producer()
            logging.debug('Kafka client set up.')
        kfk_topic = os.environ['kfk_topic']

        # DocumentDB watched collection set up
        db_client = get_db_client()
        watched_db = os.environ['watched_db_name']
        if "watched_tbl_name" in os.environ:
            watched_tbl = os.environ['watched_tbl_name']
            watcher = db_client[watched_db][watched_tbl]
        else:
            watcher = db_client[watched_db]
        logging.debug('Watching table {}'.format(watcher))

        # DocumentDB sync set up
        state_sync_count = int(os.environ['events_remain'])
        last_position = get_last_position()
        logging.debug("last_position: {}".format(last_position))

        with watcher.watch(full_document='updateLookup', resume_after=last_position) as change_stream:
            i = 0

            while change_stream.alive and i < int(os.environ['Documents_per_run']):

                i += 1
                change_event = change_stream.try_next()
                logging.debug('Event: {}'.format(change_event))

                if change_event is None:
                    time.sleep(0.5)
                    continue

                op_type = change_event['operationType']
                op_id = change_event['_id']['_data']
                readable = datetime.datetime.fromtimestamp(change_event['clusterTime'].time).isoformat()

                if op_type == 'insert':
                    doc_body = change_event['fullDocument']
                    doc_id = str(doc_body.pop("_id", None))
                    insert_body = dict(doc_body)
                    doc_body.update({'operation': op_type,
                                     'timestamp': str(change_event['clusterTime'].time),
                                     'timestampReadable': str(readable)})
                    doc_body.update({'insert_body': json.dumps(insert_body, default=str)})
                    doc_body.update({'db': str(change_event['ns']['db']),
                                     'coll': str(change_event['ns']['coll'])})
                    payload = {'_id': doc_id}
                    payload.update(doc_body)
                    # Publish the insert event to MSK
                    produce_msg(kafka_client, kfk_topic, op_id, payload)

                if op_type == 'update':
                    doc_id = str(change_event['documentKey']['_id'])
                    doc_body = {'operation': op_type,
                                'timestamp': str(change_event['clusterTime'].time),
                                'timestampReadable': str(readable)}
                    doc_body.update({'updateDescription': json.dumps(change_event['updateDescription'], default=str)})
                    doc_body.update({'db': str(change_event['ns']['db']),
                                     'coll': str(change_event['ns']['coll'])})
                    payload = {'_id': doc_id}
                    payload.update(doc_body)
                    # Publish the update event to MSK
                    produce_msg(kafka_client, kfk_topic, op_id, payload)

                if op_type == 'delete':
                    doc_id = str(change_event['documentKey']['_id'])
                    doc_body = {'operation': op_type,
                                'timestamp': str(change_event['clusterTime'].time),
                                'timestampReadable': str(readable)}
                    doc_body.update({'db': str(change_event['ns']['db']),
                                     'coll': str(change_event['ns']['coll'])})
                    payload = {'_id': doc_id}
                    payload.update(doc_body)
                    # Delete events are not replayed to MSK; archive them to S3 instead
                    if "bucket_name" in os.environ:
                        write_S3(json.dumps(payload), payload['db'], payload['coll'], doc_id)

                logging.debug('Processed event ID {}'.format(op_id))

                events_processed += 1

    except OperationFailure as of:
        if of.code == err_code_136:
            # Data for the last processed ID has been deleted from the change stream.
            # Clear the saved position so the next invocation starts from the most
            # recently available data.
            save_last_position(None)
        raise

    except Exception as err:
        logging.error('Position point lost: {}'.format(err))
        raise

    else:

        if events_processed > 0:
            save_last_position(change_stream.resume_token)
            logging.debug('Synced token {} to state collection'.format(change_stream.resume_token))
            return {
                'statusCode': 200,
                'description': 'Success',
                'detail': json.dumps(str(events_processed) + ' records processed successfully.')
            }
        else:
            return {
                'statusCode': 201,
                'description': 'Success',
                'detail': json.dumps('No records to process.')
            }

    finally:

        # Close the Kafka producer
        if kafka_client is not None:
            kafka_client.close()
```
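The entry point keeps the Lambda-style `main(event, context)` signature, but in this walkthrough the program runs on the stream-capture EC2 host. A minimal long-running wrapper such as the sketch below keeps invoking it so the change stream is polled continuously; the module name `stream_capture` is an assumed file name for illustration, not something defined above.
```
#!/bin/env python
"""Minimal runner for the capture program, assuming it was saved as stream_capture.py."""
import logging
import time

import stream_capture   # hypothetical module name for the capture program above

logging.basicConfig(level=logging.ERROR)

while True:
    try:
        result = stream_capture.main(None, None)   # event/context are only used by Lambda
        logging.debug(result)
    except Exception as err:
        # Keep the loop alive and retry after a short pause
        logging.error('Capture run failed, retrying: {}'.format(err))
    time.sleep(1)
```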
### **Environment setup on the stream-apply host in the DR region**
As on the stream-capture host, install python3, pymongo, and kafka-python on this host and download the rds-combined-ca-bundle.pem certificate to /tmp. Then set the environment variables, replacing the placeholder values with your actual values.

Code:
```
##Set the environment variables, replacing the placeholder values with your actual values
echo -e "DR_USERNAME=\"Your DR MongoDB User\"\nexport DR_USERNAME\nDR_PASSWORD=\"Your DR MongoDB Password\"\nexport DR_PASSWORD\nDR_mongo_host=\"Your DR MongoDB cluster URI\"\nexport DR_mongo_host\nkfk_host=\"YOUR KFK URI\"\nexport kfk_host\nkfk_topic=\"changevents\"\nexport kfk_topic\nDocuments_per_run=100000\nexport Documents_per_run" >> ~/.bash_profile
##Apply the environment variables
source ~/.bash_profile
```
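For reference, every record the capture program publishes to the `changevents` topic is a JSON document whose fields are set in its insert/update/delete branches. The sketch below shows the shape of an insert event for the document `{"x": 1}` and how the apply program in the next section rebuilds the document from it; the values are illustrative (the ObjectId is reused from the verification section), not captured output.
```
#!/bin/env python
"""Illustrative only: the shape of an insert event as consumed from the changevents topic."""
import json
from bson import ObjectId

# Field names come from the capture program; the values are made up for illustration
example_event = {
    "_id": "9416e4a253875177a816b3d6",            # stringified _id of the source document
    "operation": "insert",
    "timestamp": "1658233222",
    "timestampReadable": "2022-07-19T20:20:22",
    "db": "bar",
    "coll": "foo",
    "insert_body": json.dumps({"x": 1}),          # original fields, serialized as a JSON string
}

# The apply program rebuilds the document to insert into the DR cluster like this:
doc = {"_id": ObjectId(example_event["_id"])}
doc.update(json.loads(example_event["insert_body"]))
print(doc)   # {'_id': ObjectId('9416e4a253875177a816b3d6'), 'x': 1}
```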
### **Change-stream apply program in the DR region**
Deploy and run the following Python code on the stream-apply host. It consumes change events from MSK and applies insert and update operations to the DR DocumentDB cluster; delete events are never replayed, which is what protects the DR copy from accidental deletions in the primary region.

Python Code:
```
#!/bin/env python

import json
import logging
import os
import time

from bson import ObjectId
from pymongo import MongoClient
from kafka import KafkaConsumer

db_client = None
kafka_client = None

# ERROR level for deployment
logging.basicConfig(level=logging.ERROR)


def get_db_client():

    """Return a client connected to the DR DocumentDB cluster."""

    global db_client

    if db_client is None:
        logging.debug('Creating a new DB client.')

        try:
            username = os.environ['DR_USERNAME']
            password = os.environ['DR_PASSWORD']
            cluster_uri = os.environ['DR_mongo_host']
            db_client = MongoClient(cluster_uri, ssl=True, retryWrites=False,
                                    ssl_ca_certs='/tmp/rds-combined-ca-bundle.pem')
            # Make an attempt to connect
            db_client.admin.command('ismaster')
            db_client["admin"].authenticate(name=username, password=password)
            logging.debug('Successfully created a new DB client.')
        except Exception as err:
            logging.error('Failed to create a new DB client: {}'.format(err))
            raise

    return db_client


def conn_kfk_consumer():

    """Return a Kafka consumer subscribed to the change-events topic."""

    global kafka_client

    if kafka_client is None:
        logging.debug('Creating a new Kafka consumer.')

        try:
            kafka_client = KafkaConsumer(
                os.environ['kfk_topic'],
                bootstrap_servers=os.environ['kfk_host'],
                auto_offset_reset='latest',
                group_id='docdb',
                key_deserializer=lambda key: json.loads(key.decode('utf-8')),
                value_deserializer=lambda value: json.loads(value.decode('utf-8')))
        except Exception as err:
            logging.error('Failed to create a new Kafka consumer: {}'.format(err))
            raise

    return kafka_client


def poll_msg(consumer):

    """Poll a batch of DocumentDB change events from MSK."""

    try:
        # Returns {TopicPartition: [ConsumerRecord, ...]}; empty dict when nothing is available
        return consumer.poll(timeout_ms=500, max_records=100)
    except Exception as err:
        logging.error('Error in polling message: {}'.format(err))
        raise


def apply2mongodb(message, db_client):

    """Apply one insert/update event to the DR cluster. Delete events are never replayed."""

    event_body = message.value
    op_type = event_body['operation']
    coll_client = db_client[event_body['db']][event_body['coll']]

    if op_type == 'insert':
        insert_body = json.loads(event_body['insert_body'])
        payload = {'_id': ObjectId(event_body['_id'])}
        payload.update(insert_body)
        coll_client.insert_one(payload)

    if op_type == 'update':
        update_body = json.loads(event_body['updateDescription'])['updatedFields']
        update_set = {"$set": update_body}
        payload = {'_id': ObjectId(event_body['_id'])}
        coll_client.update_one(payload, update_set)


def main(event, context):

    """Consume change events from MSK and apply them to the DR DocumentDB cluster."""

    events_processed = 0
    kafka_client = None

    try:
        # Kafka consumer and DR DocumentDB client set up
        kafka_client = conn_kfk_consumer()
        db_client = get_db_client()

        i = 0
        while i < int(os.environ['Documents_per_run']):
            records = poll_msg(kafka_client)
            if not records:
                time.sleep(0.5)
                continue
            for messages in records.values():
                for message in messages:
                    apply2mongodb(message, db_client)
                    events_processed += 1
                    i += 1

    except Exception as err:
        logging.error('Failed to apply change events: {}'.format(err))
        raise

    else:

        if events_processed > 0:
            logging.debug('{} events have been processed successfully'.format(events_processed))
            return {
                'statusCode': 200,
                'description': 'Success',
                'detail': json.dumps(str(events_processed) + ' events processed successfully.')
            }
        else:
            return {
                'statusCode': 201,
                'description': 'Success',
                'detail': json.dumps('No records to process.')
            }

    finally:

        # Close the Kafka consumer
        if kafka_client is not None:
            kafka_client.close()
```
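To keep an eye on how far the DR region lags behind the primary, which is effectively the RPO of this pipeline, you can compare the offsets committed by the `docdb` consumer group with the end of the topic. The sketch below is an optional monitoring aid built on the same kafka-python package; it assumes the consumer group name used in the apply program and that offsets are auto-committed.
```
#!/bin/env python
"""Optional monitoring sketch: report how far the docdb consumer group lags behind the topic head."""
import os
from kafka import KafkaConsumer, TopicPartition
from kafka.admin import KafkaAdminClient

topic = os.environ.get('kfk_topic', 'changevents')
brokers = os.environ['kfk_host']

admin = KafkaAdminClient(bootstrap_servers=brokers)
committed = admin.list_consumer_group_offsets('docdb')   # {TopicPartition: OffsetAndMetadata}

# A throwaway consumer (no group id) is used only to read the current end offsets
probe = KafkaConsumer(bootstrap_servers=brokers)
partitions = [TopicPartition(topic, p) for p in (probe.partitions_for_topic(topic) or [])]
end_offsets = probe.end_offsets(partitions)

for tp in partitions:
    applied = committed[tp].offset if tp in committed else 0
    print('{}-{}: lag = {}'.format(tp.topic, tp.partition, end_offsets[tp] - applied))

probe.close()
admin.close()
```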
### **Verifying the results**
**1. Log in to the DocumentDB clusters in the primary region and the DR region**
Primary region:
```
mongo --host $mongo_host:27017 --ssl --sslCAFile /tmp/rds-combined-ca-bundle.pem --username $USERNAME --password $PASSWORD
```
DR region:
```
mongo --host $DR_mongo_host:27017 --ssl --sslCAFile /tmp/rds-combined-ca-bundle.pem --username $DR_USERNAME --password $DR_PASSWORD
```
**2. Insert data in the primary region**
```
use bar;
db.foo.insertOne({"x":1});
```
**3. Observe in the DR region**
```
use bar;
db.foo.find();
##Result
{"_id":ObjectId("9416e4a253875177a816b3d6"),"x":1}
```
**4. Update the data in the primary region**
```
db.foo.updateOne({"x":1},{$set:{"x":2}});
```
**5. Observe in the DR region**
```
db.foo.find();
##Result
{"_id":ObjectId("9416e4a253875177a816b3d6"),"x":2}
```
**6. In the primary region, insert y=1 into exa, a collection that is not watched**
```
db.exa.insertOne({"y":1});
```
**7. List the collections in the primary region; the new collection exa has appeared**
```
show tables;
exa
foo
```
**8. List the collections in the DR region; exa does not appear, because exa is not in our watched collection, so its change stream is not captured**
```
show tables;
foo
```
**9. Delete the x record from foo in the primary region**
```
db.foo.deleteOne({"x":2});

##Observe the result: the foo collection in the primary-region DocumentDB is now empty
db.foo.find();
##The result is empty
```
**10. Verify the contents of foo in the DR region**
```
db.foo.find();
##Result
{"_id":ObjectId("9416e4a253875177a816b3d6"),"x":2}
##The delete operation was intercepted
```
**11. Download the file from S3 and open it; its content is**
```
{"_id":"ObjectId(9416e4a253875177a816b3d6)", "operation":"delete", "timestamp":1658233222,"timestampReadable":"2022-07-19 20:20:22", "db":"bar","coll":"foo"}
##This verifies that the delete command was intercepted and saved to S3.
```
### **Summary**
In this post, we used MSK to asynchronously store the insert/update change streams from DocumentDB, while change events of the delete type were intercepted and stored in S3 for later review. If you need to analyze the delete events further, you can bring in Amazon Glue and [Amazon Athena](https://aws.amazon.com/cn/athena/?trk=cndc-detail) to run ad-hoc queries against the log files stored in S3. The change stream events in MSK are applied to the DocumentDB cluster in the DR region, so data there only grows and is never removed, which protects against data loss caused by accidental operations on the primary region's database and against costly, time-consuming recovery procedures.

### **References**
Create a Python 3 virtual environment with the Boto 3 library on Amazon Linux 2

[https://aws.amazon.com/cn/premiumsupport/knowledge-center/ec2-linux-python3-boto3/#:~:text=Install%20Python%203%20for%20Amazon%20Linux%202%201.,be%20required%20to%20create%20the%20Python%203%20environment](https://aws.amazon.com/cn/premiumsupport/knowledge-center/ec2-linux-python3-boto3/#:~:text=Install%20Python%203%20for%20Amazon%20Linux%202%201.,be%20required%20to%20create%20the%20Python%203%20environment)

Create an MSK topic

[https://docs.aws.amazon.com/zh_cn/msk/latest/developerguide/create-topic.html](https://docs.aws.amazon.com/zh_cn/msk/latest/developerguide/create-topic.html)
### Authors

![image.png](https://dev-media.amazoncloud.cn/ce300775d8da49a588085c229be6fe02_image.png)

**Fu Xiaoming**
Solutions Architect at Amazon Web Services, responsible for consulting on and designing cloud computing solutions, with a focus on databases and edge computing. Before joining Amazon Web Services, Fu Xiaoming designed internet-brokerage architectures in the IT department of a financial-services company and has extensive experience with distributed systems, high concurrency, and middleware.

![image.png](https://dev-media.amazoncloud.cn/82fe36177e8c43f29b9f1d231698e320_image.png)

**Liu Bingbing**
Database Solutions Architect at Amazon Web Services, responsible for consulting on and designing Amazon-based database solutions, with a focus on big data. Before joining Amazon, Liu Bingbing worked at Oracle for many years and has extensive experience in database cloud planning, design, operations and tuning, DR solutions, big data and data warehousing, and enterprise applications.