Stream change data to Amazon Kinesis Data Streams with AWS DMS

{"value":"In this post, we discuss how to use [AWS Database Migration Service](https://aws.amazon.com/dms/) (AWS DMS) native change data capture (CDC) capabilities to stream changes into [Amazon Kinesis Data Streams](https://aws.amazon.com/kinesis/data-streams/).\n\nAWS DMS is a cloud service that makes it easy to migrate relational databases, data warehouses, NoSQL databases, and other types of data stores. You can use AWS DMS to migrate your data into the AWS Cloud or between combinations of cloud and on-premises setups. AWS DMS also helps you replicate ongoing changes to keep sources and targets in sync.\n\nCDC refers to the process of identifying and capturing changes made to data in a database and then delivering those changes in real time to a downstream system. Capturing every change from transactions in a source database and moving them to the target in real time keeps the systems synchronized, and helps with real-time analytics use cases and zero-downtime database migrations.\n\nKinesis Data Streams is a fully managed streaming data service. You can continuously add various types of data such as clickstreams, application logs, and social media to a Kinesis stream from hundreds of thousands of sources. Within seconds, the data will be available for your [Kinesis applications](https://www.amazonaws.cn/en/kinesis/data-streams/faqs/#kinesisapp) to read and process from the stream.\n\nAWS DMS can do both replication and migration. Kinesis Data Streams is most valuable in the replication use case because it lets you react to replicated data changes in other integrated AWS systems.\n\nThis post is an update to the post [Use the AWS Database Migration Service to Stream Change Data to Amazon Kinesis Data Streams](https://aws.amazon.com/blogs/database/use-the-aws-database-migration-service-to-stream-change-data-to-amazon-kinesis-data-streams/). This new post includes steps required to configure AWS DMS and Kinesis Data Streams for a CDC use case. 
With Kinesis Data Streams as a target for AWS DMS, we make it easier for you to stream, analyze, and store CDC data. AWS DMS uses best practices to automatically collect changes from a data store and stream them to Kinesis Data Streams.\n\nWith the addition of Kinesis Data Streams as a target, we’re helping customers build data lakes and perform real-time processing on change data from your data stores. You can use AWS DMS in your data integration pipelines to replicate data in near-real time directly into Kinesis Data Streams. With this approach, you can build a decoupled and eventually consistent view of your database without having to build applications on top of a database, which is expensive. You can refer to the AWS whitepaper [AWS Cloud Data Ingestion Patterns and Practices](https://d1.awsstatic.com/whitepapers/aws-cloud-data-ingestion-patterns-practices.pdf) for more details on data ingestion patterns.\n\n\n#### **AWS DMS sources for real-time change data**\n\n\nThe following diagram illustrates that AWS DMS can use many of the [most popular database engines as a source](https://docs.aws.amazon.com/dms/latest/userguide/CHAP_Task.CDC.html) for data replication to a Kinesis Data Streams target. 
The database source can be a self-managed engine running on an [Amazon Elastic Compute Cloud](https://aws.amazon.com/ec2/) (Amazon EC2) instance or an on-premises database, or it can be on [Amazon Relational Database Service](https://aws.amazon.com/rds/) (Amazon RDS), [Amazon Aurora](https://aws.amazon.com/rds/aurora/), or [Amazon DocumentDB (with MongoDB compatibility)](https://aws.amazon.com/documentdb/).\n\nKinesis Data Streams can collect, process, and store data streams at any scale in real time and write to [AWS Glue](https://aws.amazon.com/glue/?whats-new-cards.sort-by=item.additionalFields.postDateTime&whats-new-cards.sort-order=desc), which is a serverless data integration service that makes it easy to discover, prepare, and combine data for analytics, machine learning, and application development. You can use [Amazon EMR](https://aws.amazon.com/emr/) for big data processing, Amazon Kinesis Data Analytics to process and analyze streaming data, [Amazon Kinesis Data Firehose](https://aws.amazon.com/kinesis/data-firehose/) to run ETL (extract, transform, and load) jobs on streaming data, and [AWS Lambda](https://aws.amazon.com/lambda/) as serverless compute for further processing, transformation, and delivery of data for consumption.\n\nYou can store the data in a data warehouse like [Amazon Redshift](https://aws.amazon.com/redshift/), which is a cloud-scale data warehouse, and in an [Amazon Simple Storage Service](http://aws.amazon.com/s3) (Amazon S3) data lake for consumption. 
You can use Kinesis Data Firehose to capture the data streams and load the data into S3 buckets for further analytics.\n\nOnce the data is available in Kinesis Data Streams targets (as shown in the following diagram), you can visualize it using [Amazon QuickSight](https://aws.amazon.com/quicksight/); run ad hoc queries using [Amazon Athena](https://aws.amazon.com/athena/?whats-new-cards.sort-by=item.additionalFields.postDateTime&whats-new-cards.sort-order=desc); access, process, and analyze it using an [Amazon SageMaker notebook instance](https://docs.aws.amazon.com/sagemaker/latest/dg/nbi.html); and efficiently query and retrieve structured and semi-structured data from files in Amazon S3 without having to load the data into Amazon Redshift tables using [Amazon Redshift Spectrum](https://docs.aws.amazon.com/redshift/latest/dg/c-using-spectrum.html).\n\n![image.png](1)\n\n#### **Solution overview**\n\n\nIn this post, we describe how to use AWS DMS to load data from a database to Kinesis Data Streams in real time. We use a SQL Server database as an example, but other databases like Oracle, Microsoft Azure SQL, PostgreSQL, MySQL, SAP ASE, MongoDB, Amazon DocumentDB, and IBM DB2 also support this configuration.\n\nYou can use AWS DMS to capture data changes on the database and then send this data to Kinesis Data Streams. After the changes are ingested into Kinesis Data Streams, they can be consumed by different services like Lambda, Kinesis Data Analytics, Kinesis Data Firehose, and custom consumers using the Kinesis Client Library (KCL) or the AWS SDK.\n\nThe following are some use cases that can use AWS DMS and Kinesis Data Streams:\n\n- **Triggering real-time event-driven applications** – This use case integrates Lambda and [Amazon Simple Notification Service](http://aws.amazon.com/sns) (Amazon SNS).\n- **Simplifying and decoupling applications** – For example, moving from monolith to microservices. 
This solution integrates Lambda and [Amazon API Gateway](https://aws.amazon.com/api-gateway).\n- **Cache invalidation, and updating or rebuilding indexes** – Integrates [Amazon OpenSearch Service](https://aws.amazon.com/opensearch-service/) (successor to Amazon Elasticsearch Service) and [Amazon DynamoDB](https://aws.amazon.com/dynamodb/).\n- **Data integration across multiple heterogeneous systems** – This solution sends data to DynamoDB or another data store.\n- **Aggregating data and pushing it to downstream systems** – This solution uses Kinesis Data Analytics to analyze and integrate different sources and load the results in another data store.\n\nTo facilitate the understanding of the integration between AWS DMS, Kinesis Data Streams, and Kinesis Data Firehose, we have defined a business case that you can solve. In this use case, you are the data engineer of an energy company. This company uses [Amazon Relational Database Service](http://aws.amazon.com/rds) (Amazon RDS) to store their end customer information, billing information, and also electric meter and gas usage data. Amazon RDS is their core transaction data store.\n\nYou run a batch job weekly to collect all the transactional data and send it to the data lake for reporting, forecasting, and even sending billing information to customers. You also have a trigger-based system to send emails and SMS periodically to the customer about their electricity usage and monthly billing information.\n\nBecause the company has millions of customers, processing massive amounts of data every day and sending emails or SMS was slowing down the core transactional system. Additionally, running weekly batch jobs for analytics wasn’t giving accurate, up-to-date results for the forecasting you want to do on customer gas and electricity usage. 
Initially, your team considered rebuilding the entire platform to avoid these issues, but the core application is complex in design and has been running in production for many years, so rebuilding it would take years and cost millions.\n\nSo, you took a new approach. Instead of running batch jobs on the core transactional database, you started capturing data changes with AWS DMS and sending that data to Kinesis Data Streams. Then you use Lambda to listen to a particular data stream and generate emails or SMS using Amazon SNS to send to the customer (for example, sending monthly billing information or notifying when their electricity or gas usage is higher than normal). You also use Kinesis Data Firehose to send all transaction data to the data lake, so your company can run forecasting immediately and accurately.\n\nThe following diagram illustrates the architecture.\n\n![image.png](2)\n\nIn the following steps, you configure your database to replicate changes to Kinesis Data Streams, using AWS DMS. Additionally, you configure Kinesis Data Firehose to load data from Kinesis Data Streams to Amazon S3.\n\nIt’s simple to set up Kinesis Data Streams as a change data target in AWS DMS and start streaming data. For more information, see [Using Amazon Kinesis Data Streams as a target for AWS Database Migration Service](https://docs.aws.amazon.com/dms/latest/userguide/CHAP_Target.Kinesis.html).\n\nTo get started, you first create a Kinesis data stream in Kinesis Data Streams, then an [AWS Identity and Access Management](http://aws.amazon.com/iam) (IAM) role with minimal access as described in [Prerequisites for using a Kinesis data stream as a target for AWS Database Migration Service](https://docs.aws.amazon.com/dms/latest/userguide/CHAP_Target.Kinesis.html#CHAP_Target.Kinesis.Prerequisites). 
After you define your IAM policy and role, you set up your [source and target endpoints](https://docs.aws.amazon.com/dms/latest/userguide/CHAP_Endpoints.html) and [replication instance](https://docs.aws.amazon.com/dms/latest/userguide/CHAP_ReplicationInstance.html) in AWS DMS. Your source is the database that you want to move data from, and the target is the data store that you’re moving data to. In our case, the source database is a SQL Server database on Amazon RDS, and the target is the Kinesis data stream. The replication instance processes the migration tasks and requires access to the source and target endpoints inside your VPC.\n\nA Kinesis delivery stream (created in Kinesis Data Firehose) is used to load the records from the database to the data lake hosted on Amazon S3. Kinesis Data Firehose can also load data to Amazon Redshift, Amazon OpenSearch Service, an HTTP endpoint, Datadog, Dynatrace, LogicMonitor, MongoDB Cloud, New Relic, Splunk, and Sumo Logic.\n\n\n#### **Configure the source database**\n\n\nFor testing purposes, we use the database ```democustomer```, which is hosted on a SQL Server on Amazon RDS. 
Use the following command and script to create the database and table, and insert 10 records:\n\n```\ncreate database democustomer\n\nuse democustomer\n\ncreate table invoices (\n\tinvoice_id INT,\n\tcustomer_id INT,\n\tbilling_date DATE,\n\tdue_date DATE,\n\tbalance INT,\n\tmonthly_kwh_use INT,\n\ttotal_amount_due VARCHAR(50)\n);\ninsert into invoices (invoice_id, customer_id, billing_date, due_date, balance, monthly_kwh_use, total_amount_due) values (1, 1219578, '4/15/2022', '4/30/2022', 25, 6, 28);\ninsert into invoices (invoice_id, customer_id, billing_date, due_date, balance, monthly_kwh_use, total_amount_due) values (2, 1365142, '4/15/2022', '4/28/2022', null, 41, 20.5);\ninsert into invoices (invoice_id, customer_id, billing_date, due_date, balance, monthly_kwh_use, total_amount_due) values (3, 1368834, '4/15/2022', '5/5/2022', null, 31, 15.5);\ninsert into invoices (invoice_id, customer_id, billing_date, due_date, balance, monthly_kwh_use, total_amount_due) values (4, 1226431, '4/15/2022', '4/28/2022', null, 47, 23.5);\ninsert into invoices (invoice_id, customer_id, billing_date, due_date, balance, monthly_kwh_use, total_amount_due) values (5, 1499194, '4/15/2022', '5/1/2022', null, 39, 19.5);\ninsert into invoices (invoice_id, customer_id, billing_date, due_date, balance, monthly_kwh_use, total_amount_due) values (6, 1221240, '4/15/2022', '5/2/2022', null, 38, 19);\ninsert into invoices (invoice_id, customer_id, billing_date, due_date, balance, monthly_kwh_use, total_amount_due) values (7, 1235442, '4/15/2022', '4/27/2022', null, 50, 25);\ninsert into invoices (invoice_id, customer_id, billing_date, due_date, balance, monthly_kwh_use, total_amount_due) values (8, 1306894, '4/15/2022', '5/2/2022', null, 16, 8);\ninsert into invoices (invoice_id, customer_id, billing_date, due_date, balance, monthly_kwh_use, total_amount_due) values (9, 1343570, '4/15/2022', '5/3/2022', null, 39, 19.5);\ninsert into invoices (invoice_id, customer_id, billing_date, due_date, 
balance, monthly_kwh_use, total_amount_due) values (10, 1465198, '4/15/2022', '5/4/2022', null, 47, 23.5);\n```\n\nTo capture the new records added to the table, enable MS-CDC (Microsoft Change Data Capture) using the following commands at the database level (replace ```SchemaName``` and ```TableName```). This is required if ongoing replication is configured on the migration task in AWS DMS.\n\n```\nEXEC msdb.dbo.rds_cdc_enable_db 'democustomer';\nGO\nEXECUTE sys.sp_cdc_enable_table @source_schema = N'SchemaName', @source_name =N'TableName', @role_name = NULL;\nGO\nEXEC sys.sp_cdc_change_job @job_type = 'capture' ,@pollinginterval = 3599;\nGO\n```\n\nYou can use ongoing replication (CDC) for a self-managed SQL Server database on premises or on [Amazon Elastic Compute Cloud](http://aws.amazon.com/ec2) (Amazon EC2), or a cloud database such as Amazon RDS or an Azure SQL managed instance. SQL Server must be configured for full backups, and you must perform a backup before beginning to replicate data.\n\nFor more information, see [Using a Microsoft SQL Server database as a source for AWS DMS](https://docs.aws.amazon.com/dms/latest/userguide/CHAP_Source.SQLServer.html#CHAP_Source.SQLServer.Configuration).\n\n\n#### **Configure the Kinesis data stream**\n\n\nNext, we configure our Kinesis data stream. For full instructions, see [Creating a Stream via the AWS Management Console](https://docs.aws.amazon.com/streams/latest/dev/how-do-i-create-a-stream.html). Complete the following steps:\n\n1. On the Kinesis Data Streams console, choose **Create data stream**.\n2. For **Data stream name**, enter a name.\n3. For **Capacity mode**, select **On-demand**. When you choose on-demand capacity mode, Kinesis Data Streams instantly accommodates your workloads as they ramp up or down. For more information, refer to [Choosing the Data Stream Capacity Mode](https://docs.aws.amazon.com/streams/latest/dev/how-do-i-size-a-stream.html).\n\n![image.png](3)\n\n4. 
Choose **Create data stream**.\n5. When the data stream is active, copy the ARN.\n\n![image.png](4)\n\n\n#### **Configure the IAM policy and role**\n\n\nNext, you configure your IAM policy and role.\n\n1. On the IAM console, choose **Policies** in the navigation pane.\n2. Choose **Create policy**.\n3. Select **JSON** and use the following policy as a template, replacing the data stream ARN:\n\n```\n{\n \"Version\": \"2012-10-17\",\n \"Statement\": [\n {\n \"Effect\": \"Allow\",\n \"Action\": [\n \"kinesis:PutRecord\",\n \"kinesis:PutRecords\",\n \"kinesis:DescribeStream\"\n ],\n \"Resource\": \"<streamArn>\"\n }\n ]\n}\n\n```\n\n4. In the navigation pane, choose **Roles**.\n5. Choose **Create role**.\n6. Select **AWS DMS**, then choose **Next: Permissions**.\n7. Select the policy you created.\n8. Assign a role name and then choose **Create role**.\n\n\n#### **Configure the Kinesis delivery stream**\n\n\nWe use a Kinesis delivery stream to load the information from the Kinesis data stream to Amazon S3. To configure the delivery stream, complete the following steps:\n\n1. On the Kinesis console, choose **Delivery streams**.\n2. Choose **Create delivery stream**.\n3. For **Source**, choose **Amazon Kinesis Data Streams**.\n4. For **Destination**, choose **Amazon S3**.\n5. For **Kinesis data stream**, enter the ARN of the data stream.\n\n![image.png](5)\n\n6. For **Delivery stream name**, enter a name.\n\n![image.png](6)\n\n7. Leave the transform and convert options at their defaults.\n\n![image.png](7)\n\n8. Provide the destination bucket and specify the bucket prefixes for the events and errors.\n\n![image.png](8)\n\n9. Under **Buffer hints, compression and encryption**, change the buffer size to 1 MB and buffer interval to 60 seconds.\n\n![image.png](9)\n\n10. 
Leave the other configurations at their defaults.\n\n\n#### **Configure AWS DMS**\n\n\nWe use an AWS DMS instance to connect to the SQL Server database and then replicate the table and future transactions to a Kinesis data stream. In this section, we create a replication instance, source endpoint, target endpoint, and migration task. For more information about endpoints, refer to [Creating source and target endpoints](https://docs.aws.amazon.com/dms/latest/userguide/CHAP_Endpoints.Creating.html).\n\n1. [Create a replication instance](https://docs.aws.amazon.com/dms/latest/userguide/CHAP_ReplicationInstance.Creating.html) in a VPC with connectivity to the SQL Server database and associate a security group that allows access to the database.\n2. On the AWS DMS console, choose **Endpoints** in the navigation pane.\n3. Choose **Create endpoint**.\n4. Select **Source endpoint**.\n\n![image.png](10)\n\n5. For **Endpoint identifier**, enter a label for the endpoint.\n6. For **Source engine**, choose **Microsoft SQL Server**.\n7. For **Access to endpoint database**, select **Provide access information manually**.\n8. Enter the endpoint database information.\n\n![image.png](11)\n\n9. Test the connectivity to the source endpoint.\nNow we create the target endpoint.\n10. On the AWS DMS console, choose **Endpoints** in the navigation pane.\n11. Choose **Create endpoint**.\n12. Select **Target endpoint**.\n\n![image.png](12)\n\n13. For **Endpoint identifier**, enter a label for the endpoint.\n14. For **Target engine**, choose **Amazon Kinesis**.\n15. Provide the AWS DMS service role ARN and the data stream ARN.\n\n![image.png](13)\n\n16. Test the connectivity to the target endpoint.\n\n![image.png](14)\n\nThe final step is to create a database migration task. This task replicates the existing data from the SQL Server table to the data stream and replicates the ongoing changes. 
For more information, see [Creating a task](https://docs.aws.amazon.com/dms/latest/userguide/CHAP_Tasks.Creating.html).\n\n17. On the AWS DMS console, choose **Database migration tasks**.\n18. Choose **Create task**.\n19. For **Task identifier**, enter a name for your task.\n20. For **Replication instance**, choose your instance.\n21. Choose the source and target database endpoints you created.\n22. For **Migration type**, choose **Migrate existing data and replicate ongoing changes**.\n\n![image.png](15)\n\n23. In **Task settings**, use the default settings.\n24. In **Table mappings**, add a new selection rule and specify the schema and table name of the SQL Server database. In this case, our schema name is ```dbo``` and the table name is ```invoices```.\n25. For **Action**, choose **Include**.\n\n![image.png](16)\n\nWhen the task is ready, the migration starts.\n\n![image.png](17)\n\nAfter the data has been loaded, the table statistics are updated and you can see the 10 records created initially.\n\n![image.png](18)\n\nAs the Kinesis delivery stream reads the data from Kinesis Data Streams and loads it in Amazon S3, the records are available in the bucket you defined previously.\n\n![image.png](19)\n\nTo check that AWS DMS ongoing replication and CDC are working, use [this script](https://aws-blogs-artifacts-public.s3.amazonaws.com/artifacts/BDB-1732/invoices.sql) to add 1,000 records to the table.\n\nYou can see 1,000 inserts on the **Table statistics** tab for the database migration task.\n\n![image.png](20)\n\nAfter about 1 minute, you can see the records in the S3 bucket.\n\n![image.png](21)\n\nAt this point the replication has been activated, and a Lambda function can start consuming the data streams to send emails or SMS to the customers through Amazon SNS. 
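As a rough illustration of that Lambda step, the following Python sketch decodes the records a function receives from a Kinesis event source and picks out newly inserted invoices with unusually high usage. It assumes the AWS DMS message layout of a `data` object plus a `metadata` object. The function name `find_high_usage_invoices`, the 45 kWh threshold, and the alert wording are hypothetical, and the Amazon SNS publish call is left out so the logic can be run locally.

```python
import base64
import json

# Hypothetical threshold for this sketch; not a value taken from the post.
HIGH_USAGE_KWH = 45


def find_high_usage_invoices(event, threshold=HIGH_USAGE_KWH):
    """Return one alert string per inserted invoice above the threshold."""
    alerts = []
    for record in event.get("Records", []):
        # Lambda delivers each Kinesis payload base64-encoded.
        payload = base64.b64decode(record["kinesis"]["data"])
        message = json.loads(payload)
        metadata = message.get("metadata", {})
        # Keep only row-level inserts on the invoices table.
        if metadata.get("record-type") != "data":
            continue
        if metadata.get("operation") != "insert":
            continue
        if metadata.get("table-name") != "invoices":
            continue
        row = message.get("data", {})
        usage = row.get("monthly_kwh_use")
        if usage is not None and usage > threshold:
            alerts.append(
                f"Customer {row['customer_id']}: {usage} kWh on invoice {row['invoice_id']}"
            )
    return alerts


def lambda_handler(event, context):
    alerts = find_high_usage_invoices(event)
    # A real function would publish each alert to an SNS topic at this point.
    return {"alerts": alerts}
```

Keeping the filtering in a plain function, separate from the handler, makes it possible to test the decoding logic with a hand-built event before wiring up the Kinesis trigger.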
For more information, refer to [Using AWS Lambda with Amazon Kinesis](https://docs.aws.amazon.com/lambda/latest/dg/with-kinesis-example.html).\n\n#### **Conclusion**\n\nWith Kinesis Data Streams as an AWS DMS target, you now have a powerful way to stream change data from a database directly into a Kinesis data stream. You can use this method to stream change data from any sources supported by AWS DMS to perform real-time data processing. Happy streaming!\n\nIf you have any questions or suggestions, please leave a comment.\n\n#### **About the Authors**\n\n\n![image.png](22)\n\n**Luis Eduardo Torres** is a Solutions Architect at AWS based in Bogotá, Colombia. He helps companies to build their business using the AWS cloud platform. He has a great interest in Analytics and has been leading the Analytics track of AWS Podcast in Spanish.\n\n![image.png](23)\n\n**Sukhomoy Basak** is a Solutions Architect at Amazon Web Services, with a passion for Data and Analytics solutions. Sukhomoy works with enterprise customers to help them architect, build, and scale applications to achieve their business outcomes.\n\n","render":"<p>In this post, we discuss how to use <a href=\"https://aws.amazon.com/dms/\" target=\"_blank\">AWS Database Migration Service</a> (AWS DMS) native change data capture (CDC) capabilities to stream changes into <a href=\"https://aws.amazon.com/kinesis/data-streams/\" target=\"_blank\">Amazon Kinesis Data Streams</a>.</p>\n<p>AWS DMS is a cloud service that makes it easy to migrate relational databases, data warehouses, NoSQL databases, and other types of data stores. You can use AWS DMS to migrate your data into the AWS Cloud or between combinations of cloud and on-premises setups. AWS DMS also helps you replicate ongoing changes to keep sources and targets in sync.</p>\n<p>CDC refers to the process of identifying and capturing changes made to data in a database and then delivering those changes in real time to a downstream system. 
Capturing every change from transactions in a source database and moving them to the target in real time keeps the systems synchronized, and helps with real-time analytics use cases and zero-downtime database migrations.</p>\n<p>Kinesis Data Streams is a fully managed streaming data service. You can continuously add various types of data such as clickstreams, application logs, and social media to a Kinesis stream from hundreds of thousands of sources. Within seconds, the data will be available for your <a href=\"https://www.amazonaws.cn/en/kinesis/data-streams/faqs/#kinesisapp\" target=\"_blank\">Kinesis applications</a> to read and process from the stream.</p>\n<p>AWS DMS can do both replication and migration. Kinesis Data Streams is most valuable in the replication use case because it lets you react to replicated data changes in other integrated AWS systems.</p>\n<p>This post is an update to the post <a href=\"https://aws.amazon.com/blogs/database/use-the-aws-database-migration-service-to-stream-change-data-to-amazon-kinesis-data-streams/\" target=\"_blank\">Use the AWS Database Migration Service to Stream Change Data to Amazon Kinesis Data Streams</a>. This new post includes steps required to configure AWS DMS and Kinesis Data Streams for a CDC use case. With Kinesis Data Streams as a target for AWS DMS, we make it easier for you to stream, analyze, and store CDC data. AWS DMS uses best practices to automatically collect changes from a data store and stream them to Kinesis Data Streams.</p>\n<p>With the addition of Kinesis Data Streams as a target, we’re helping customers build data lakes and perform real-time processing on change data from your data stores. You can use AWS DMS in your data integration pipelines to replicate data in near-real time directly into Kinesis Data Streams. With this approach, you can build a decoupled and eventually consistent view of your database without having to build applications on top of a database, which is expensive. 
You can refer to the AWS whitepaper <a href=\"https://d1.awsstatic.com/whitepapers/aws-cloud-data-ingestion-patterns-practices.pdf\" target=\"_blank\">AWS Cloud Data Ingestion Patterns and Practices</a> for more details on data ingestion patterns.</p>\n<h4><a id=\"AWS_DMS_sources_for_realtime_change_data_15\"></a><strong>AWS DMS sources for real-time change data</strong></h4>\n<p>The following diagram illustrates that AWS DMS can use many of the <a href=\"https://docs.aws.amazon.com/dms/latest/userguide/CHAP_Task.CDC.html\" target=\"_blank\">most popular database engines as a source</a> for data replication to a Kinesis Data Streams target. The database source can be a self-managed engine running on an <a href=\"https://aws.amazon.com/ec2/\" target=\"_blank\">Amazon Elastic Compute Cloud</a> (Amazon EC2) instance or an on-premises database, or it can be on <a href=\"https://aws.amazon.com/rds/\" target=\"_blank\">Amazon Relational Database Service</a> (Amazon RDS), <a href=\"https://aws.amazon.com/rds/aurora/\" target=\"_blank\">Amazon Aurora</a>, or <a href=\"https://aws.amazon.com/documentdb/\" target=\"_blank\">Amazon DocumentDB (with MongoDB compatibility)</a>.</p>\n<p>Kinesis Data Streams can collect, process, and store data streams at any scale in real time and write to <a href=\"https://aws.amazon.com/glue/?whats-new-cards.sort-by=item.additionalFields.postDateTime&amp;whats-new-cards.sort-order=desc\" target=\"_blank\">AWS Glue</a>, which is a serverless data integration service that makes it easy to discover, prepare, and combine data for analytics, machine learning, and application development. 
You can use <a href=\"https://aws.amazon.com/emr/\" target=\"_blank\">Amazon EMR</a> for big data processing, Amazon Kinesis Data Analytics to process and analyze streaming data, <a href=\"https://aws.amazon.com/kinesis/data-firehose/\" target=\"_blank\">Amazon Kinesis Data Firehose</a> to run ETL (extract, transform, and load) jobs on streaming data, and <a href=\"https://aws.amazon.com/lambda/\" target=\"_blank\">AWS Lambda</a> as serverless compute for further processing, transformation, and delivery of data for consumption.</p>\n<p>You can store the data in a data warehouse like <a href=\"https://aws.amazon.com/redshift/\" target=\"_blank\">Amazon Redshift</a>, which is a cloud-scale data warehouse, and in an <a href=\"http://aws.amazon.com/s3\" target=\"_blank\">Amazon Simple Storage Service</a> (Amazon S3) data lake for consumption. You can use Kinesis Data Firehose to capture the data streams and load the data into S3 buckets for further analytics.</p>\n<p>Once the data is available in Kinesis Data Streams targets (as shown in the following diagram), you can visualize it using <a href=\"https://aws.amazon.com/quicksight/\" target=\"_blank\">Amazon QuickSight</a>; run ad hoc queries using <a href=\"https://aws.amazon.com/athena/?whats-new-cards.sort-by=item.additionalFields.postDateTime&amp;whats-new-cards.sort-order=desc\" target=\"_blank\">Amazon Athena</a>; access, process, and analyze it using an <a href=\"https://docs.aws.amazon.com/sagemaker/latest/dg/nbi.html\" target=\"_blank\">Amazon SageMaker notebook instance</a>; and efficiently query and retrieve structured and semi-structured data from files in Amazon S3 without having to load the data into Amazon Redshift tables using <a href=\"https://docs.aws.amazon.com/redshift/latest/dg/c-using-spectrum.html\" target=\"_blank\">Amazon Redshift Spectrum</a>.</p>\n<p><img src=\"1\" alt=\"image.png\" /></p>\n<h4><a id=\"Solution_overview_28\"></a><strong>Solution overview</strong></h4>\n<p>In this post, we 
describe how to use AWS DMS to load data from a database to Kinesis Data Streams in real time. We use a SQL Server database as an example, but other databases like Oracle, Microsoft Azure SQL, PostgreSQL, MySQL, SAP ASE, MongoDB, Amazon DocumentDB, and IBM DB2 also support this configuration.</p>\n<p>You can use AWS DMS to capture data changes on the database and then send this data to Kinesis Data Streams. After the changes are ingested into Kinesis Data Streams, they can be consumed by different services like Lambda, Kinesis Data Analytics, Kinesis Data Firehose, and custom consumers using the Kinesis Client Library (KCL) or the AWS SDK.</p>\n<p>The following are some use cases that can use AWS DMS and Kinesis Data Streams:</p>\n<ul>\n<li><strong>Triggering real-time event-driven applications</strong> – This use case integrates Lambda and <a href=\"http://aws.amazon.com/sns\" target=\"_blank\">Amazon Simple Notification Service</a> (Amazon SNS).</li>\n<li><strong>Simplifying and decoupling applications</strong> – For example, moving from monolith to microservices. 
This solution integrates Lambda and <a href=\"https://aws.amazon.com/api-gateway\" target=\"_blank\">Amazon API Gateway</a>.</li>\n<li><strong>Cache invalidation, and updating or rebuilding indexes</strong> – Integrates <a href=\"https://aws.amazon.com/opensearch-service/\" target=\"_blank\">Amazon OpenSearch Service</a> (successor to Amazon Elasticsearch Service) and <a href=\"https://aws.amazon.com/dynamodb/\" target=\"_blank\">Amazon DynamoDB</a>.</li>\n<li><strong>Data integration across multiple heterogeneous systems</strong> – This solution sends data to DynamoDB or another data store.</li>\n<li><strong>Aggregating data and pushing it to downstream systems</strong> – This solution uses Kinesis Data Analytics to analyze and integrate different sources and load the results in another data store.</li>\n</ul>\n<p>To facilitate the understanding of the integration between AWS DMS, Kinesis Data Streams, and Kinesis Data Firehose, we have defined a business case that you can solve. In this use case, you are the data engineer of an energy company. This company uses <a href=\"http://aws.amazon.com/rds\" target=\"_blank\">Amazon Relational Database Service</a> (Amazon RDS) to store their end customer information, billing information, and also electric meter and gas usage data. Amazon RDS is their core transaction data store.</p>\n<p>You run a batch job weekly to collect all the transactional data and send it to the data lake for reporting, forecasting, and even sending billing information to customers. You also have a trigger-based system to send emails and SMS periodically to the customer about their electricity usage and monthly billing information.</p>\n<p>Because the company has millions of customers, processing massive amounts of data every day and sending emails or SMS was slowing down the core transactional system. 
Additionally, running weekly batch jobs for analytics wasn’t giving accurate, up-to-date results for the forecasting you want to do on customer gas and electricity usage. Initially, your team considered rebuilding the entire platform to avoid these issues, but the core application is complex in design and has been running in production for many years, so rebuilding it would take years and cost millions.</p>\n<p>So, you took a new approach. Instead of running batch jobs on the core transactional database, you started capturing data changes with AWS DMS and sending that data to Kinesis Data Streams. Then you use Lambda to listen to a particular data stream and generate emails or SMS using Amazon SNS to send to the customer (for example, sending monthly billing information or notifying when their electricity or gas usage is higher than normal). You also use Kinesis Data Firehose to send all transaction data to the data lake, so your company can run forecasting immediately and accurately.</p>\n<p>The following diagram illustrates the architecture.</p>\n<p><img src=\"2\" alt=\"image.png\" /></p>\n<p>In the following steps, you configure your database to replicate changes to Kinesis Data Streams, using AWS DMS. Additionally, you configure Kinesis Data Firehose to load data from Kinesis Data Streams to Amazon S3.</p>\n<p>It’s simple to set up Kinesis Data Streams as a change data target in AWS DMS and start streaming data. 
For more information, see <a href=\"https://docs.aws.amazon.com/dms/latest/userguide/CHAP_Target.Kinesis.html\" target=\"_blank\">Using Amazon Kinesis Data Streams as a target for AWS Database Migration Service</a>.</p>\n<p>To get started, you first create a Kinesis data stream in Kinesis Data Streams, then an <a href=\"http://aws.amazon.com/iam\" target=\"_blank\">AWS Identity and Access Management</a> (IAM) role with minimal access as described in <a href=\"https://docs.aws.amazon.com/dms/latest/userguide/CHAP_Target.Kinesis.html#CHAP_Target.Kinesis.Prerequisites\" target=\"_blank\">Prerequisites for using a Kinesis data stream as a target for AWS Database Migration Service</a>. After you define your IAM policy and role, you set up your <a href=\"https://docs.aws.amazon.com/dms/latest/userguide/CHAP_Endpoints.html\" target=\"_blank\">source and target endpoints</a> and <a href=\"https://docs.aws.amazon.com/dms/latest/userguide/CHAP_ReplicationInstance.html\" target=\"_blank\">replication instance</a> in AWS DMS. Your source is the database that you want to move data from, and the target is the data store that you’re moving data to. In our case, the source database is a SQL Server database on Amazon RDS, and the target is the Kinesis data stream. The replication instance processes the migration tasks and requires access to the source and target endpoints inside your VPC.</p>\n<p>A Kinesis delivery stream (created in Kinesis Data Firehose) is used to load the records from the database to the data lake hosted on Amazon S3. Kinesis Data Firehose can also load data into Amazon Redshift, Amazon OpenSearch Service, an HTTP endpoint, Datadog, Dynatrace, LogicMonitor, MongoDB Cloud, New Relic, Splunk, and Sumo Logic.</p>\n<h4><a id=\"Configure_the_source_database_64\"></a><strong>Configure the source database</strong></h4>\n<p>For testing purposes, we use the database <code>democustomer</code>, which is hosted on a SQL Server instance on Amazon RDS. 
Use the following command and script to create the database and table, and insert 10 records:</p>\n<pre><code class=\"lang-\">create database democustomer\n\nuse democustomer\n\ncreate table invoices (\n\tinvoice_id INT,\n\tcustomer_id INT,\n\tbilling_date DATE,\n\tdue_date DATE,\n\tbalance INT,\n\tmonthly_kwh_use INT,\n\ttotal_amount_due VARCHAR(50)\n);\ninsert into invoices (invoice_id, customer_id, billing_date, due_date, balance, monthly_kwh_use, total_amount_due) values (1, 1219578, '4/15/2022', '4/30/2022', 25, 6, 28);\ninsert into invoices (invoice_id, customer_id, billing_date, due_date, balance, monthly_kwh_use, total_amount_due) values (2, 1365142, '4/15/2022', '4/28/2022', null, 41, 20.5);\ninsert into invoices (invoice_id, customer_id, billing_date, due_date, balance, monthly_kwh_use, total_amount_due) values (3, 1368834, '4/15/2022', '5/5/2022', null, 31, 15.5);\ninsert into invoices (invoice_id, customer_id, billing_date, due_date, balance, monthly_kwh_use, total_amount_due) values (4, 1226431, '4/15/2022', '4/28/2022', null, 47, 23.5);\ninsert into invoices (invoice_id, customer_id, billing_date, due_date, balance, monthly_kwh_use, total_amount_due) values (5, 1499194, '4/15/2022', '5/1/2022', null, 39, 19.5);\ninsert into invoices (invoice_id, customer_id, billing_date, due_date, balance, monthly_kwh_use, total_amount_due) values (6, 1221240, '4/15/2022', '5/2/2022', null, 38, 19);\ninsert into invoices (invoice_id, customer_id, billing_date, due_date, balance, monthly_kwh_use, total_amount_due) values (7, 1235442, '4/15/2022', '4/27/2022', null, 50, 25);\ninsert into invoices (invoice_id, customer_id, billing_date, due_date, balance, monthly_kwh_use, total_amount_due) values (8, 1306894, '4/15/2022', '5/2/2022', null, 16, 8);\ninsert into invoices (invoice_id, customer_id, billing_date, due_date, balance, monthly_kwh_use, total_amount_due) values (9, 1343570, '4/15/2022', '5/3/2022', null, 39, 19.5);\ninsert into invoices (invoice_id, customer_id, 
billing_date, due_date, balance, monthly_kwh_use, total_amount_due) values (10, 1465198, '4/15/2022', '5/4/2022', null, 47, 23.5);\n</code></pre>\n<p>To capture the new records added to the table, enable MS-CDC (Microsoft Change Data Capture) at the database level and on the table using the following commands (replace <code>SchemaName</code> and <code>TableName</code>). This is required if ongoing replication is configured on the migration task in AWS DMS.</p>\n<pre><code class=\"lang-\">EXEC msdb.dbo.rds_cdc_enable_db 'democustomer';\nGO\nEXECUTE sys.sp_cdc_enable_table @source_schema = N'SchemaName', @source_name =N'TableName', @role_name = NULL;\nGO\nEXEC sys.sp_cdc_change_job @job_type = 'capture' ,@pollinginterval = 3599;\nGO\n</code></pre>\n<p>You can use ongoing replication (CDC) for a self-managed SQL Server database on premises or on <a href=\"http://aws.amazon.com/ec2\" target=\"_blank\">Amazon Elastic Compute Cloud</a> (Amazon EC2), or a cloud database such as Amazon RDS or Azure SQL Managed Instance. SQL Server must be configured for full backups, and you must perform a backup before beginning to replicate data.</p>\n<p>For more information, see <a href=\"https://docs.aws.amazon.com/dms/latest/userguide/CHAP_Source.SQLServer.html#CHAP_Source.SQLServer.Configuration\" target=\"_blank\">Using a Microsoft SQL Server database as a source for AWS DMS</a>.</p>\n<h4><a id=\"Configure_the_Kinesis_data_stream_111\"></a><strong>Configure the Kinesis data stream</strong></h4>\n<p>Next, we configure our Kinesis data stream. For full instructions, see <a href=\"https://docs.aws.amazon.com/streams/latest/dev/how-do-i-create-a-stream.html\" target=\"_blank\">Creating a Stream via the AWS Management Console</a>. 
Complete the following steps:</p>\n<ol>\n<li>On the Kinesis Data Streams console, choose <strong>Create data stream</strong>.</li>\n<li>For <strong>Data stream name</strong>, enter a name.</li>\n<li>For <strong>Capacity mode</strong>, select <strong>On-demand</strong>. When you choose on-demand capacity mode, Kinesis Data Streams instantly accommodates your workloads as they ramp up or down. For more information, refer to <a href=\"https://docs.aws.amazon.com/streams/latest/dev/how-do-i-size-a-stream.html\" target=\"_blank\">Choosing the Data Stream Capacity Mode</a>.</li>\n</ol>\n<p><img src=\"3\" alt=\"image.png\" /></p>\n<ol start=\"4\">\n<li>Choose <strong>Create data stream</strong>.</li>\n<li>When the data stream is active, copy the ARN.</li>\n</ol>\n<p><img src=\"4\" alt=\"image.png\" /></p>\n<h4><a id=\"Configure_the_IAM_policy_and_role_128\"></a><strong>Configure the IAM policy and role</strong></h4>\n<p>Next, you configure your IAM policy and role.</p>\n<ol>\n<li>On the IAM console, choose <strong>Policies</strong> in the navigation pane.</li>\n<li>Choose <strong>Create policy</strong>.</li>\n<li>Select <strong>JSON</strong> and use the following policy as a template, replacing the data stream ARN:</li>\n</ol>\n<pre><code class=\"lang-\">{\n &quot;Version&quot;: &quot;2012-10-17&quot;,\n &quot;Statement&quot;: [\n {\n &quot;Effect&quot;: &quot;Allow&quot;,\n &quot;Action&quot;: [\n &quot;kinesis:PutRecord&quot;,\n &quot;kinesis:PutRecords&quot;,\n &quot;kinesis:DescribeStream&quot;\n ],\n &quot;Resource&quot;: &quot;&lt;streamArn&gt;&quot;\n }\n ]\n}\n\n</code></pre>\n<ol start=\"4\">\n<li>In the navigation pane, choose <strong>Roles</strong>.</li>\n<li>Choose <strong>Create role</strong>.</li>\n<li>Select <strong>AWS DMS</strong>, then choose <strong>Next: Permissions</strong>.</li>\n<li>Select the policy you created.</li>\n<li>Assign a role name and then choose <strong>Create role</strong>.</li>\n</ol>\n<h4><a 
id=\"Configure_the_Kinesis_delivery_stream_162\"></a><strong>Configure the Kinesis delivery stream</strong></h4>\n<p>We use a Kinesis delivery stream to load the information from the Kinesis data stream to Amazon S3. To configure the delivery stream, complete the following steps:</p>\n<ol>\n<li>On the Kinesis console, choose <strong>Delivery streams</strong>.</li>\n<li>Choose <strong>Create delivery stream</strong>.</li>\n<li>For <strong>Source</strong>, choose <strong>Amazon Kinesis Data Streams</strong>.</li>\n<li>For <strong>Destination</strong>, choose <strong>Amazon S3</strong>.</li>\n<li>For <strong>Kinesis data stream</strong>, enter the ARN of the data stream.</li>\n</ol>\n<p><img src=\"5\" alt=\"image.png\" /></p>\n<ol start=\"6\">\n<li>For <strong>Delivery stream name</strong>, enter a name.</li>\n</ol>\n<p><img src=\"6\" alt=\"image.png\" /></p>\n<ol start=\"7\">\n<li>Leave the transform and convert options at their defaults.</li>\n</ol>\n<p><img src=\"7\" alt=\"image.png\" /></p>\n<ol start=\"8\">\n<li>Provide the destination bucket and specify the bucket prefixes for the events and errors.</li>\n</ol>\n<p><img src=\"8\" alt=\"image.png\" /></p>\n<ol start=\"9\">\n<li>Under <strong>Buffer hints, compression and encryption</strong>, change the buffer size to 1 MB and buffer interval to 60 seconds.</li>\n</ol>\n<p><img src=\"9\" alt=\"image.png\" /></p>\n<ol start=\"10\">\n<li>Leave the other configurations at their defaults.</li>\n</ol>\n<h4><a id=\"Configure_AWS_DMS_194\"></a><strong>Configure AWS DMS</strong></h4>\n<p>We use an AWS DMS instance to connect to the SQL Server database and then replicate the table and future transactions to a Kinesis data stream. In this section, we create a replication instance, source endpoint, target endpoint, and migration task. 
For more information about endpoints, refer to <a href=\"https://docs.aws.amazon.com/dms/latest/userguide/CHAP_Endpoints.Creating.html\" target=\"_blank\">Creating source and target endpoints</a>.</p>\n<ol>\n<li><a href=\"https://docs.aws.amazon.com/dms/latest/userguide/CHAP_ReplicationInstance.Creating.html\" target=\"_blank\">Create a replication instance</a> in a VPC with connectivity to the SQL Server database and associate a security group that allows access to the database.</li>\n<li>On the AWS DMS console, choose <strong>Endpoints</strong> in the navigation pane.</li>\n<li>Choose <strong>Create endpoint</strong>.</li>\n<li>Select <strong>Source endpoint</strong>.</li>\n</ol>\n<p><img src=\"10\" alt=\"image.png\" /></p>\n<ol start=\"5\">\n<li>For <strong>Endpoint identifier</strong>, enter a label for the endpoint.</li>\n<li>For <strong>Source engine</strong>, choose <strong>Microsoft SQL Server</strong>.</li>\n<li>For <strong>Access to endpoint database</strong>, select <strong>Provide access information manually</strong>.</li>\n<li>Enter the endpoint database information.</li>\n</ol>\n<p><img src=\"11\" alt=\"image.png\" /></p>\n<ol start=\"9\">\n<li>Test the connectivity to the source endpoint.<br />\nNow we create the target endpoint.</li>\n<li>On the AWS DMS console, choose <strong>Endpoints</strong> in the navigation pane.</li>\n<li>Choose <strong>Create endpoint</strong>.</li>\n<li>Select <strong>Target endpoint</strong>.</li>\n</ol>\n<p><img src=\"12\" alt=\"image.png\" /></p>\n<ol start=\"13\">\n<li>For <strong>Endpoint identifier</strong>, enter a label for the endpoint.</li>\n<li>For <strong>Target engine</strong>, choose <strong>Amazon Kinesis</strong>.</li>\n<li>Provide the AWS DMS service role ARN and the data stream ARN.</li>\n</ol>\n<p><img src=\"13\" alt=\"image.png\" /></p>\n<ol start=\"16\">\n<li>Test the connectivity to the target endpoint.</li>\n</ol>\n<p><img src=\"14\" alt=\"image.png\" /></p>\n<p>The final step is to 
create a database migration task. This task replicates the existing data from the SQL Server table to the data stream and replicates the ongoing changes. For more information, see <a href=\"https://docs.aws.amazon.com/dms/latest/userguide/CHAP_Tasks.Creating.html\" target=\"_blank\">Creating a task</a>.</p>\n<ol start=\"17\">\n<li>On the AWS DMS console, choose <strong>Database migration tasks</strong>.</li>\n<li>Choose <strong>Create task</strong>.</li>\n<li>For <strong>Task identifier</strong>, enter a name for your task.</li>\n<li>For <strong>Replication instance</strong>, choose your instance.</li>\n<li>Choose the source and target database endpoints you created.</li>\n<li>For <strong>Migration type</strong>, choose <strong>Migrate existing data and replicate ongoing changes</strong>.</li>\n</ol>\n<p><img src=\"15\" alt=\"image.png\" /></p>\n<ol start=\"23\">\n<li>In <strong>Task settings</strong>, use the default settings.</li>\n<li>In <strong>Table mappings</strong>, add a new selection rule and specify the schema and table name of the SQL Server database. 
In this case, our schema name is <code>dbo</code> and the table name is <code>invoices</code>.</li>\n<li>For <strong>Action</strong>, choose <strong>Include</strong>.</li>\n</ol>\n<p><img src=\"16\" alt=\"image.png\" /></p>\n<p>When the task is ready, the migration starts.</p>\n<p><img src=\"17\" alt=\"image.png\" /></p>\n<p>After the data has been loaded, the table statistics are updated and you can see the 10 records created initially.</p>\n<p><img src=\"18\" alt=\"image.png\" /></p>\n<p>As the Kinesis delivery stream reads the data from Kinesis Data Streams and loads it into Amazon S3, the records become available in the bucket you defined previously.</p>\n<p><img src=\"19\" alt=\"image.png\" /></p>\n<p>To check that AWS DMS ongoing replication and CDC are working, use <a href=\"https://aws-blogs-artifacts-public.s3.amazonaws.com/artifacts/BDB-1732/invoices.sql\" target=\"_blank\">this script</a> to add 1,000 records to the table.</p>\n<p>You can see 1,000 inserts on the <strong>Table statistics</strong> tab for the database migration task.</p>\n<p><img src=\"20\" alt=\"image.png\" /></p>\n<p>After about 1 minute, you can see the records in the S3 bucket.</p>\n<p><img src=\"21\" alt=\"image.png\" /></p>\n<p>At this point, replication is active, and a Lambda function can start consuming the data stream to send emails or SMS messages to customers through Amazon SNS. For more information, refer to <a href=\"https://docs.aws.amazon.com/lambda/latest/dg/with-kinesis-example.html\" target=\"_blank\">Using AWS Lambda with Amazon Kinesis</a>.</p>\n<h4><a id=\"Conclusion_272\"></a><strong>Conclusion</strong></h4>\n<p>With Kinesis Data Streams as an AWS DMS target, you now have a powerful way to stream change data from a database directly into a Kinesis data stream. You can use this method to stream change data from any source supported by AWS DMS to perform real-time data processing. 
Happy streaming!</p>\n<p>If you have any questions or suggestions, please leave a comment.</p>\n<h4><a id=\"About_the_Authors_278\"></a><strong>About the Authors</strong></h4>\n<p><img src=\"22\" alt=\"image.png\" /></p>\n<p><strong>Luis Eduardo Torres</strong> is a Solutions Architect at AWS based in Bogotá, Colombia. He helps companies to build their business using the AWS cloud platform. He has a great interest in Analytics and has been leading the Analytics track of AWS Podcast in Spanish.</p>\n<p><img src=\"23\" alt=\"image.png\" /></p>\n<p><strong>Sukhomoy Basak</strong> is a Solutions Architect at Amazon Web Services, with a passion for Data and Analytics solutions. Sukhomoy works with enterprise customers to help them architect, build, and scale applications to achieve their business outcomes.</p>\n"}