Sink Amazon Kinesis Data Analytics Apache Flink output to Amazon Keyspaces using Apache Cassandra Connector

海外精选
海外精选的内容汇集了全球优质的亚马逊云科技相关技术内容。同时,内容中提到的“AWS” 是 “Amazon Web Services” 的缩写,在此网站不作为商标展示。
0
0
{"value":"[Amazon Keyspaces](https://aws.amazon.com/keyspaces/) (for Apache Cassandra) is a scalable, highly available, and managed Apache Cassandra–compatible database service. With [Amazon Keyspaces](https://aws.amazon.com/cn/keyspaces/?trk=cndc-detail) you don’t have to provision, patch, or manage servers, and you don’t have to install, maintain, or operate software. [Amazon Keyspaces](https://aws.amazon.com/cn/keyspaces/?trk=cndc-detail) is serverless, so you only pay for the resources that you use. You can use [Amazon Keyspaces](https://aws.amazon.com/cn/keyspaces/?trk=cndc-detail) to store large volumes of data, such as entries in a log file or the message history for a chat application as [Amazon Keyspaces](https://aws.amazon.com/cn/keyspaces/?trk=cndc-detail) tables scale in response to actual application traffic, with virtually unlimited throughput and storage. You can also use [Amazon Keyspaces](https://aws.amazon.com/cn/keyspaces/?trk=cndc-detail) to store information about devices for Internet of Things (IoT) applications or player profiles for games.\n\nA popular use case in the wind energy sector is to protect wind turbines from wind speed. Engineers and analysts often want to see real-time aggregated wind turbine speed data to analyze the current situation out in the field. Furthermore, they need access to historical aggregated wind turbine speed data to build machine learning (ML) models which can help them take preventative actions on wind turbines. Customers often ingest high-velocity IoT data into [Amazon Kinesis Data Streams](https://aws.amazon.com/kinesis/data-streams/) and [use Amazon Kinesis Data Analytics](https://aws.amazon.com/kinesis/data-analytics/) , [AWS Lambda](https://aws.amazon.com/cn/lambda/?trk=cndc-detail),or [Amazon Kinesis](https://aws.amazon.com/cn/kinesis/?trk=cndc-detail) Client Library (KCL) applications to aggregate IoT data in real-time and store it in [Amazon Keyspaces](https://aws.amazon.com/keyspaces/), [Amazon DynamoDB,](https://aws.amazon.com/dynamodb/) or [Amazon Timestream](https://aws.amazon.com/timestream).\n\nIn this post, we demonstrate how to aggregate sensor data using [Amazon Kinesis](https://aws.amazon.com/cn/kinesis/?trk=cndc-detail) Data Analytics and persist aggregated sensor data in to [Amazon Keyspaces](https://aws.amazon.com/cn/keyspaces/?trk=cndc-detail) using Apache Flink’s [Apache Cassandra Connector](https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/datastream/cassandra/).\n\n\n### **Architecture**\n\n\n![image.png](https://dev-media.amazoncloud.cn/fb1db23f4fb442cb8361b6368014122e_image.png)\n\nIn the architecture diagram above, Lambda simulates wind speed sensor data and ingests sensor data into [Amazon Kinesis](https://aws.amazon.com/cn/kinesis/?trk=cndc-detail) Data Stream. [Amazon Kinesis](https://aws.amazon.com/cn/kinesis/?trk=cndc-detail) Data Analytics Apache Flink application reads wind speed sensor data from [Amazon Kinesis](https://aws.amazon.com/cn/kinesis/?trk=cndc-detail) Data Stream in real-time and aggregates wind speed sensor data using a five minutes tumbling window and storing aggregated wind speed sensor data into [Amazon Keyspaces](https://aws.amazon.com/cn/keyspaces/?trk=cndc-detail) table. Aggregated wind speed sensor data stored in [Amazon Keyspaces](https://aws.amazon.com/cn/keyspaces/?trk=cndc-detail) can be used by engineers and analysts to review real-time dashboards or to perform historical analysis on specific wind turbine.\n\n\n### **Deploying resources using [AWS CloudFormation](https://aws.amazon.com/cn/cloudformation/?trk=cndc-detail)**\n\n\nAfter you sign in to your AWS account, launch the [AWS CloudFormation](https://aws.amazon.com/cloudformation/) template by choosing Launch Stack:\n\n![image.png](https://dev-media.amazoncloud.cn/9dcdf76161514c778f8b598c6aa9d323_image.png)\n\nThe CloudFormation template configures the following resources in your account:\n\n- One Lambda function which simulates wind turbine data\n- One [Amazon Kinesis](https://aws.amazon.com/cn/kinesis/?trk=cndc-detail) Data Stream\n- One [Amazon Kinesis](https://aws.amazon.com/cn/kinesis/?trk=cndc-detail) Data Analytics Apache Flink application\n- An [AWS Identity and Access Management](https://aws.amazon.com/cn/iam/?trk=cndc-detail) (IAM) role (service execution role) for [Amazon Kinesis](https://aws.amazon.com/cn/kinesis/?trk=cndc-detail) Data Analytics Apache Flink application\n- One [Amazon Keyspaces](https://aws.amazon.com/cn/keyspaces/?trk=cndc-detail) Table: turbine_aggregated_sensor_data\n\nAfter you complete the setup, sign in to the Kinesis Data Analytics console. On the Kinesis Data Analytics applications page, choose the Streaming applications tab, where you can see the Streaming application in the ready status. Select the Streaming application, choose Run, and wait until the Streaming application is in running status. It can take a couple of minutes for the Streaming application to get into running status.\n\nNow that we have deployed all of the resources using CloudFormation template, let’s review deployed resources and how they function.\n\n\n### **Format of wind speed sensor data**\n\n\nLambda simulates wind turbine speed data every one minute and ingests it into [Amazon Kinesis](https://aws.amazon.com/cn/kinesis/?trk=cndc-detail) Data Stream. Each wind turbine sensor data message consists of two attributes: turbineId and speed.\n\n```\\n{\\n \\"turbineId\\": \\"turbine-0001\\",\\n \\"speed\\": 60\\n}\\n```\n\n\n### **Schema of destination [Amazon Keyspaces](https://aws.amazon.com/cn/keyspaces/?trk=cndc-detail) table**\n\n\nWe’ll store aggregated sensor data in to destination turbine_aggregated_sensor_data [Amazon Keyspaces](https://aws.amazon.com/cn/keyspaces/?trk=cndc-detail) table. turbine_aggregated_sensor_data table has [on-demand capacity mode](https://docs.aws.amazon.com/keyspaces/latest/devguide/ReadWriteCapacityMode.html#ReadWriteCapacityMode.OnDemand) enabled. [Amazon Keyspaces](https://aws.amazon.com/cn/keyspaces/?trk=cndc-detail) (for Apache Cassandra) [on-demand capacity mode ](https://docs.aws.amazon.com/keyspaces/latest/devguide/ReadWriteCapacityMode.html#ReadWriteCapacityMode.OnDemand) is a flexible billing option capable of serving thousands of requests per second without capacity planning. This option offers pay-per-request pricing for read and write requests so that you pay only for what you use. When you choose on-demand mode, [Amazon Keyspaces](https://aws.amazon.com/cn/keyspaces/?trk=cndc-detail) can scale the throughput capacity for your table up to any previously reached traffic level instantly, and then back down when application traffic decreases. If a workload’s traffic level hits a new peak, then the service adapts rapidly to increase throughput capacity for your table.\n\n![image.png](https://dev-media.amazoncloud.cn/6c9c120becc047ffbb19f1bb22f44158_image.png)\n\n![image.png](https://dev-media.amazoncloud.cn/cd0ccdd462134a019e75aa0551acfa90_image.png)\n\n![image.png](https://dev-media.amazoncloud.cn/49d60558abfb455aa43a04414f966199_image.png)\n\n\n### **Apache Flink code to aggregate and persist data in [Amazon Keyspaces](https://aws.amazon.com/cn/keyspaces/?trk=cndc-detail) Table**\n\n\nApache Flink source code used by this post can be found on the [KeyspacesSink ](https://github.com/aws-samples/amazon-kinesis-data-analytics-java-examples/tree/master/KeyspacesSink) section of [Kinesis Data Analytics Java Examples](https://github.com/aws-samples/amazon-kinesis-data-analytics-java-examples) public git repository.\n\nThe following code snippet demonstrates how incoming wind turbine messages are getting aggregated using a five-minute tumbling window and produces a DataStream of TurbineAggregatedRecord records.\n\n```\\nDataStream<TurbineAggregatedRecord> result = input\\n.map(new WindTurbineInputMap())\\n.keyBy(t -> t.turbineId)\\n.window(TumblingProcessingTimeWindows.of(Time.minutes(5)))\\n.reduce(new AggregateReducer())\\n.map(new AggregateMap());\\n```\n\n\nThe following code snippet demonstrates how [Amazon Keyspaces](https://aws.amazon.com/cn/keyspaces/?trk=cndc-detail) table name and column names are annotated on the TurbineAggregatedRecord class.\n\n```\\n@Table(keyspace = \\"sensor_data\\", name = \\"turbine_aggregated_sensor_data\\", readConsistency = \\"LOCAL_QUORUM\\", writeConsistency = \\"LOCAL_QUORUM\\")\\npublic class TurbineAggregatedRecord {\\n\\n@Column(name = \\"turbineid\\")\\n@PartitionKey(0)\\nprivate String turbineid = \\"\\";\\n\\n@Column(name = \\"reported_time\\")\\nprivate long reported_time = 0;\\n\\n@Column(name = \\"max_speed\\")\\nprivate long max_speed = 0;\\n\\n@Column(name = \\"min_speed\\")\\nprivate long min_speed = 0;\\n\\n@Column(name = \\"avg_speed\\")\\nprivate long avg_speed = 0;\\n```\n\nThe following code snippet demonstrates the implementation of[ Apache Cassandra Connector](https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/datastream/cassandra/)to sink aggregated wind speed sensor data [TurbineAggregatedRecord](https://github.com/aws-samples/amazon-kinesis-data-analytics-java-examples/blob/master/KeyspacesSink/src/main/java/com/amazonaws/services/kinesisanalytics/TurbineAggregatedRecord.java) into [Amazon Keyspaces](https://aws.amazon.com/cn/keyspaces/?trk=cndc-detail) table. We’re using [SigV4AuthProvider ](https://docs.aws.amazon.com/keyspaces/latest/devguide/using_java_driver.html)with [Apache Cassandra Connector](https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/datastream/cassandra/). The SigV4 authentication plugin lets you use IAM credentials for users or roles when connecting to [Amazon Keyspaces](https://aws.amazon.com/cn/keyspaces/?trk=cndc-detail). Instead of requiring a user name and password, this plugin signs API requests using access keys.\n\n```\\nCassandraSink.addSink(result)\\n .setClusterBuilder(\\n new ClusterBuilder() {\\n\\n private static final long serialVersionUID = 2793938419775311824L;\\n\\n @Override\\n public Cluster buildCluster(Cluster.Builder builder) {\\n return builder\\n .addContactPoint(\\"cassandra.\\"+ region +\\".amazonaws.com\\")\\n .withPort(9142)\\n .withSSL()\\n .withAuthProvider(new SigV4AuthProvider(region))\\n .withLoadBalancingPolicy(\\n DCAwareRoundRobinPolicy\\n .builder()\\n .withLocalDc(region)\\n .build())\\n .withQueryOptions(queryOptions)\\n .build();\\n }\\n })\\n .setMapperOptions(() -> new Mapper.Option[] {Mapper.Option.saveNullFields(true)})\\n .setDefaultKeyspace(\\"sensor_data\\")\\n .build();\\n```\n\n\n### **Review output in [Amazon Keyspaces](https://aws.amazon.com/cn/keyspaces/?trk=cndc-detail) Table**\n\n\nOnce [Amazon Kinesis](https://aws.amazon.com/cn/kinesis/?trk=cndc-detail) Data Analytics Apache Flink application aggregates wind turbine sensor data and persists aggregated data in [Amazon Keyspaces](https://aws.amazon.com/cn/keyspaces/?trk=cndc-detail) table, we can query and review aggregated data using [Amazon Keyspaces](https://aws.amazon.com/cn/keyspaces/?trk=cndc-detail) CQL editor as illustrated in the following.\n\n```\\nselect * from sensor_data.turbine_aggregated_sensor_data\\n```\n\n![image.png](https://dev-media.amazoncloud.cn/310bc7101ab948b294ef0a62729a9364_image.png)\n\n![image.png](https://dev-media.amazoncloud.cn/596f2f1237a94371abd43a47efec2feb_image.png)\n\n\n### **Clean up**\n\n\nTo avoid incurring future charges, complete the following steps:\n\n1. Empty [Amazon S3](https://aws.amazon.com/cn/s3/?trk=cndc-detail) bucket created by [AWS CloudFormation](https://aws.amazon.com/cn/cloudformation/?trk=cndc-detail) stack.\n2. Delete [AWS CloudFormation](https://aws.amazon.com/cn/cloudformation/?trk=cndc-detail) stack.\n\n\n\n### **Conclusion**\n\n\nAs you’ve learned in this post, you can build [Amazon Kinesis](https://aws.amazon.com/cn/kinesis/?trk=cndc-detail) Data Analytics Apache Flink application to read sensor data from [Amazon Kinesis](https://aws.amazon.com/cn/kinesis/?trk=cndc-detail) Data Streams, perform aggregations, and persist aggregated sensor data in [Amazon Keyspaces](https://aws.amazon.com/cn/keyspaces/?trk=cndc-detail) using [Apache Cassandra Connector](https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/datastream/cassandra/).There are several use cases in IoT and Application development to move data quickly through the analytics pipeline and persist data in [Amazon Keyspaces](https://aws.amazon.com/cn/keyspaces/?trk=cndc-detail).\n\nWe look forward to hearing from you about your experience. If you have questions or suggestions, please leave a comment.\n\n\n### **About the Author**\n\n\n![image.png](https://dev-media.amazoncloud.cn/a28a9f744b4d4aa1924756c37e9fd2b2_image.png)\n\n**Pratik Patel** is a Sr Technical Account Manager and streaming analytics specialist. He works with AWS customers and provides ongoing support and technical guidance to help plan and build solutions using best practices and proactively helps in keeping customer’s AWS environments operationally healthy.","render":"<p><a href=\\"https://aws.amazon.com/keyspaces/\\" target=\\"_blank\\">Amazon Keyspaces</a> (for Apache Cassandra) is a scalable, highly available, and managed Apache Cassandra–compatible database service. With [Amazon Keyspaces](https://aws.amazon.com/cn/keyspaces/?trk=cndc-detail) you don’t have to provision, patch, or manage servers, and you don’t have to install, maintain, or operate software. [Amazon Keyspaces](https://aws.amazon.com/cn/keyspaces/?trk=cndc-detail) is serverless, so you only pay for the resources that you use. You can use [Amazon Keyspaces](https://aws.amazon.com/cn/keyspaces/?trk=cndc-detail) to store large volumes of data, such as entries in a log file or the message history for a chat application as [Amazon Keyspaces](https://aws.amazon.com/cn/keyspaces/?trk=cndc-detail) tables scale in response to actual application traffic, with virtually unlimited throughput and storage. You can also use [Amazon Keyspaces](https://aws.amazon.com/cn/keyspaces/?trk=cndc-detail) to store information about devices for Internet of Things (IoT) applications or player profiles for games.</p>\\n<p>A popular use case in the wind energy sector is to protect wind turbines from wind speed. Engineers and analysts often want to see real-time aggregated wind turbine speed data to analyze the current situation out in the field. Furthermore, they need access to historical aggregated wind turbine speed data to build machine learning (ML) models which can help them take preventative actions on wind turbines. Customers often ingest high-velocity IoT data into <a href=\\"https://aws.amazon.com/kinesis/data-streams/\\" target=\\"_blank\\">Amazon Kinesis Data Streams</a> and <a href=\\"https://aws.amazon.com/kinesis/data-analytics/\\" target=\\"_blank\\">use Amazon Kinesis Data Analytics</a> , [AWS Lambda](https://aws.amazon.com/cn/lambda/?trk=cndc-detail),or [Amazon Kinesis](https://aws.amazon.com/cn/kinesis/?trk=cndc-detail) Client Library (KCL) applications to aggregate IoT data in real-time and store it in <a href=\\"https://aws.amazon.com/keyspaces/\\" target=\\"_blank\\">Amazon Keyspaces</a>, <a href=\\"https://aws.amazon.com/dynamodb/\\" target=\\"_blank\\">Amazon DynamoDB,</a> or <a href=\\"https://aws.amazon.com/timestream\\" target=\\"_blank\\">Amazon Timestream</a>.</p>\\n<p>In this post, we demonstrate how to aggregate sensor data using Amazon Kinesis Data Analytics and persist aggregated sensor data in to Amazon Keyspaces using Apache Flink’s <a href=\\"https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/datastream/cassandra/\\" target=\\"_blank\\">Apache Cassandra Connector</a>.</p>\\n<h3><a id=\\"Architecture_7\\"></a><strong>Architecture</strong></h3>\\n<p><img src=\\"https://dev-media.amazoncloud.cn/fb1db23f4fb442cb8361b6368014122e_image.png\\" alt=\\"image.png\\" /></p>\n<p>In the architecture diagram above, Lambda simulates wind speed sensor data and ingests sensor data into Amazon Kinesis Data Stream. Amazon Kinesis Data Analytics Apache Flink application reads wind speed sensor data from Amazon Kinesis Data Stream in real-time and aggregates wind speed sensor data using a five minutes tumbling window and storing aggregated wind speed sensor data into Amazon Keyspaces table. Aggregated wind speed sensor data stored in Amazon Keyspaces can be used by engineers and analysts to review real-time dashboards or to perform historical analysis on specific wind turbine.</p>\n<h3><a id=\\"Deploying_resources_using_AWS_CloudFormation_15\\"></a><strong>Deploying resources using AWS CloudFormation</strong></h3>\\n<p>After you sign in to your AWS account, launch the <a href=\\"https://aws.amazon.com/cloudformation/\\" target=\\"_blank\\">AWS CloudFormation</a> template by choosing Launch Stack:</p>\\n<p><img src=\\"https://dev-media.amazoncloud.cn/9dcdf76161514c778f8b598c6aa9d323_image.png\\" alt=\\"image.png\\" /></p>\n<p>The CloudFormation template configures the following resources in your account:</p>\n<ul>\\n<li>One Lambda function which simulates wind turbine data</li>\n<li>One Amazon Kinesis Data Stream</li>\n<li>One Amazon Kinesis Data Analytics Apache Flink application</li>\n<li>An AWS Identity and Access Management (IAM) role (service execution role) for Amazon Kinesis Data Analytics Apache Flink application</li>\n<li>One Amazon Keyspaces Table: turbine_aggregated_sensor_data</li>\n</ul>\\n<p>After you complete the setup, sign in to the Kinesis Data Analytics console. On the Kinesis Data Analytics applications page, choose the Streaming applications tab, where you can see the Streaming application in the ready status. Select the Streaming application, choose Run, and wait until the Streaming application is in running status. It can take a couple of minutes for the Streaming application to get into running status.</p>\n<p>Now that we have deployed all of the resources using CloudFormation template, let’s review deployed resources and how they function.</p>\n<h3><a id=\\"Format_of_wind_speed_sensor_data_35\\"></a><strong>Format of wind speed sensor data</strong></h3>\\n<p>Lambda simulates wind turbine speed data every one minute and ingests it into Amazon Kinesis Data Stream. Each wind turbine sensor data message consists of two attributes: turbineId and speed.</p>\n<pre><code class=\\"lang-\\">{\\n &quot;turbineId&quot;: &quot;turbine-0001&quot;,\\n &quot;speed&quot;: 60\\n}\\n</code></pre>\\n<h3><a id=\\"Schema_of_destination_Amazon_Keyspaces_table_48\\"></a><strong>Schema of destination Amazon Keyspaces table</strong></h3>\\n<p>We’ll store aggregated sensor data in to destination turbine_aggregated_sensor_data Amazon Keyspaces table. turbine_aggregated_sensor_data table has <a href=\\"https://docs.aws.amazon.com/keyspaces/latest/devguide/ReadWriteCapacityMode.html#ReadWriteCapacityMode.OnDemand\\" target=\\"_blank\\">on-demand capacity mode</a> enabled. [Amazon Keyspaces](https://aws.amazon.com/cn/keyspaces/?trk=cndc-detail) (for Apache Cassandra) <a href=\\"https://docs.aws.amazon.com/keyspaces/latest/devguide/ReadWriteCapacityMode.html#ReadWriteCapacityMode.OnDemand\\" target=\\"_blank\\">on-demand capacity mode </a> is a flexible billing option capable of serving thousands of requests per second without capacity planning. This option offers pay-per-request pricing for read and write requests so that you pay only for what you use. When you choose on-demand mode, [Amazon Keyspaces](https://aws.amazon.com/cn/keyspaces/?trk=cndc-detail) can scale the throughput capacity for your table up to any previously reached traffic level instantly, and then back down when application traffic decreases. If a workload’s traffic level hits a new peak, then the service adapts rapidly to increase throughput capacity for your table.</p>\\n<p><img src=\\"https://dev-media.amazoncloud.cn/6c9c120becc047ffbb19f1bb22f44158_image.png\\" alt=\\"image.png\\" /></p>\n<p><img src=\\"https://dev-media.amazoncloud.cn/cd0ccdd462134a019e75aa0551acfa90_image.png\\" alt=\\"image.png\\" /></p>\n<p><img src=\\"https://dev-media.amazoncloud.cn/49d60558abfb455aa43a04414f966199_image.png\\" alt=\\"image.png\\" /></p>\n<h3><a id=\\"Apache_Flink_code_to_aggregate_and_persist_data_in_Amazon_Keyspaces_Table_60\\"></a><strong>Apache Flink code to aggregate and persist data in Amazon Keyspaces Table</strong></h3>\\n<p>Apache Flink source code used by this post can be found on the <a href=\\"https://github.com/aws-samples/amazon-kinesis-data-analytics-java-examples/tree/master/KeyspacesSink\\" target=\\"_blank\\">KeyspacesSink </a> section of <a href=\\"https://github.com/aws-samples/amazon-kinesis-data-analytics-java-examples\\" target=\\"_blank\\">Kinesis Data Analytics Java Examples</a> public git repository.</p>\\n<p>The following code snippet demonstrates how incoming wind turbine messages are getting aggregated using a five-minute tumbling window and produces a DataStream of TurbineAggregatedRecord records.</p>\n<pre><code class=\\"lang-\\">DataStream&lt;TurbineAggregatedRecord&gt; result = input\\n.map(new WindTurbineInputMap())\\n.keyBy(t -&gt; t.turbineId)\\n.window(TumblingProcessingTimeWindows.of(Time.minutes(5)))\\n.reduce(new AggregateReducer())\\n.map(new AggregateMap());\\n</code></pre>\\n<p>The following code snippet demonstrates how Amazon Keyspaces table name and column names are annotated on the TurbineAggregatedRecord class.</p>\n<pre><code class=\\"lang-\\">@Table(keyspace = &quot;sensor_data&quot;, name = &quot;turbine_aggregated_sensor_data&quot;, readConsistency = &quot;LOCAL_QUORUM&quot;, writeConsistency = &quot;LOCAL_QUORUM&quot;)\\npublic class TurbineAggregatedRecord {\\n\\n@Column(name = &quot;turbineid&quot;)\\n@PartitionKey(0)\\nprivate String turbineid = &quot;&quot;;\\n\\n@Column(name = &quot;reported_time&quot;)\\nprivate long reported_time = 0;\\n\\n@Column(name = &quot;max_speed&quot;)\\nprivate long max_speed = 0;\\n\\n@Column(name = &quot;min_speed&quot;)\\nprivate long min_speed = 0;\\n\\n@Column(name = &quot;avg_speed&quot;)\\nprivate long avg_speed = 0;\\n</code></pre>\\n<p>The following code snippet demonstrates the implementation of<a href=\\"https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/datastream/cassandra/\\" target=\\"_blank\\"> Apache Cassandra Connector</a>to sink aggregated wind speed sensor data <a href=\\"https://github.com/aws-samples/amazon-kinesis-data-analytics-java-examples/blob/master/KeyspacesSink/src/main/java/com/amazonaws/services/kinesisanalytics/TurbineAggregatedRecord.java\\" target=\\"_blank\\">TurbineAggregatedRecord</a> into [Amazon Keyspaces](https://aws.amazon.com/cn/keyspaces/?trk=cndc-detail) table. We’re using <a href=\\"https://docs.aws.amazon.com/keyspaces/latest/devguide/using_java_driver.html\\" target=\\"_blank\\">SigV4AuthProvider </a>with <a href=\\"https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/datastream/cassandra/\\" target=\\"_blank\\">Apache Cassandra Connector</a>. The SigV4 authentication plugin lets you use IAM credentials for users or roles when connecting to [Amazon Keyspaces](https://aws.amazon.com/cn/keyspaces/?trk=cndc-detail). Instead of requiring a user name and password, this plugin signs API requests using access keys.</p>\\n<pre><code class=\\"lang-\\">CassandraSink.addSink(result)\\n .setClusterBuilder(\\n new ClusterBuilder() {\\n\\n private static final long serialVersionUID = 2793938419775311824L;\\n\\n @Override\\n public Cluster buildCluster(Cluster.Builder builder) {\\n return builder\\n .addContactPoint(&quot;cassandra.&quot;+ region +&quot;.amazonaws.com&quot;)\\n .withPort(9142)\\n .withSSL()\\n .withAuthProvider(new SigV4AuthProvider(region))\\n .withLoadBalancingPolicy(\\n DCAwareRoundRobinPolicy\\n .builder()\\n .withLocalDc(region)\\n .build())\\n .withQueryOptions(queryOptions)\\n .build();\\n }\\n })\\n .setMapperOptions(() -&gt; new Mapper.Option[] {Mapper.Option.saveNullFields(true)})\\n .setDefaultKeyspace(&quot;sensor_data&quot;)\\n .build();\\n</code></pre>\\n<h3><a id=\\"Review_output_in_Amazon_Keyspaces_Table_131\\"></a><strong>Review output in Amazon Keyspaces Table</strong></h3>\\n<p>Once Amazon Kinesis Data Analytics Apache Flink application aggregates wind turbine sensor data and persists aggregated data in Amazon Keyspaces table, we can query and review aggregated data using Amazon Keyspaces CQL editor as illustrated in the following.</p>\n<pre><code class=\\"lang-\\">select * from sensor_data.turbine_aggregated_sensor_data\\n</code></pre>\\n<p><img src=\\"https://dev-media.amazoncloud.cn/310bc7101ab948b294ef0a62729a9364_image.png\\" alt=\\"image.png\\" /></p>\n<p><img src=\\"https://dev-media.amazoncloud.cn/596f2f1237a94371abd43a47efec2feb_image.png\\" alt=\\"image.png\\" /></p>\n<h3><a id=\\"Clean_up_145\\"></a><strong>Clean up</strong></h3>\\n<p>To avoid incurring future charges, complete the following steps:</p>\n<ol>\\n<li>Empty Amazon S3 bucket created by AWS CloudFormation stack.</li>\n<li>Delete AWS CloudFormation stack.</li>\n</ol>\\n<h3><a id=\\"Conclusion_155\\"></a><strong>Conclusion</strong></h3>\\n<p>As you’ve learned in this post, you can build Amazon Kinesis Data Analytics Apache Flink application to read sensor data from Amazon Kinesis Data Streams, perform aggregations, and persist aggregated sensor data in Amazon Keyspaces using <a href=\\"https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/datastream/cassandra/\\" target=\\"_blank\\">Apache Cassandra Connector</a>.There are several use cases in IoT and Application development to move data quickly through the analytics pipeline and persist data in [Amazon Keyspaces](https://aws.amazon.com/cn/keyspaces/?trk=cndc-detail).</p>\\n<p>We look forward to hearing from you about your experience. If you have questions or suggestions, please leave a comment.</p>\n<h3><a id=\\"About_the_Author_163\\"></a><strong>About the Author</strong></h3>\\n<p><img src=\\"https://dev-media.amazoncloud.cn/a28a9f744b4d4aa1924756c37e9fd2b2_image.png\\" alt=\\"image.png\\" /></p>\n<p><strong>Pratik Patel</strong> is a Sr Technical Account Manager and streaming analytics specialist. He works with AWS customers and provides ongoing support and technical guidance to help plan and build solutions using best practices and proactively helps in keeping customer’s AWS environments operationally healthy.</p>\n"}
目录
亚马逊云科技解决方案 基于行业客户应用场景及技术领域的解决方案
联系亚马逊云科技专家
亚马逊云科技解决方案
基于行业客户应用场景及技术领域的解决方案
联系专家
0
目录
关闭