New for Amazon Redshift – General Availability of Streaming Ingestion for Kinesis Data Streams and Managed Streaming for Apache Kafka

{"value":"Ten years ago, just a few months after I joined AWS, [Amazon Redshift](https://aws.amazon.com/redshift/) was launched. Over the years, many features have been added to improve performance and make it easier to use. [Amazon Redshift](https://aws.amazon.com/cn/redshift/?trk=cndc-detail) now allows you to analyze structured and semi-structured data across data warehouses, operational databases, and data lakes. More recently, [Amazon Redshift Serverless became generally available](https://aws.amazon.com/blogs/aws/amazon-redshift-serverless-now-generally-available-with-new-capabilities/) to make it easier to run and scale analytics without having to manage your data warehouse infrastructure.\n\nTo process data as quickly as possible from real-time applications, customers are adopting streaming engines like [Amazon Kinesis](https://aws.amazon.com/kinesis/) and [Amazon Managed Streaming for Apache Kafka](https://aws.amazon.com/msk/). Previously, to load streaming data into your [Amazon Redshift](https://aws.amazon.com/cn/redshift/?trk=cndc-detail) database, you’d have to configure a process to stage data in [Amazon Simple Storage Service (Amazon S3)](https://aws.amazon.com/s3/) before loading. Doing so would introduce a latency of one minute or more, depending on the volume of data.\n\nToday, I am happy to share the general availability of [Amazon Redshift Streaming Ingestion](https://aws.amazon.com/redshift/redshift-streaming-ingestion/). With this new capability, [Amazon Redshift](https://aws.amazon.com/cn/redshift/?trk=cndc-detail) can natively ingest hundreds of megabytes of data per second from [Amazon Kinesis](https://aws.amazon.com/cn/kinesis/?trk=cndc-detail) Data Streams and Amazon MSK into an [Amazon Redshift](https://aws.amazon.com/cn/redshift/?trk=cndc-detail) materialized view and query it in seconds.\n\n![image.png](https://dev-media.amazoncloud.cn/9a8aad945d2044ed8a400c072361bbe1_image.png)\n\nStreaming ingestion benefits from the ability to optimize query performance with materialized views and allows the use of [Amazon Redshift](https://aws.amazon.com/cn/redshift/?trk=cndc-detail) more efficiently for operational analytics and as the data source for real-time dashboards. Another interesting use case for streaming ingestion is analyzing real-time data from gamers to optimize their gaming experience. This new integration also makes it easier to implement analytics for IoT devices, clickstream analysis, application monitoring, fraud detection, and live leaderboards.\n\nLet’s see how this works in practice.\n\n### ++Configuring [Amazon Redshift](https://aws.amazon.com/cn/redshift/?trk=cndc-detail) Streaming Ingestion++\nApart from managing permissions, [Amazon Redshift](https://aws.amazon.com/cn/redshift/?trk=cndc-detail) streaming ingestion can be configured entirely with SQL within [Amazon Redshift](https://aws.amazon.com/cn/redshift/?trk=cndc-detail). This is especially useful for business users who lack access to the [AWS Management Console](https://console.aws.amazon.com/) or the expertise to configure integrations between AWS services.\n\nYou can set up streaming ingestion in three steps:\n\n\nYou can set up streaming ingestion in three steps:\n\n- Create or update an [AWS Identity and Access Management (IAM)](https://aws.amazon.com/iam/) role to allow access to the streaming platform you use (Kinesis Data Streams or Amazon MSK). 
- Create or update an [AWS Identity and Access Management (IAM)](https://aws.amazon.com/iam/) role to allow access to the streaming platform you use (Kinesis Data Streams or Amazon MSK). Note that the IAM role should have a trust policy that allows Amazon Redshift to assume the role.
- Create an external schema to connect to the streaming service.
- Create a materialized view that references the streaming object (Kinesis data stream or Kafka topic) in the external schema.

After that, you can query the materialized view to use the data from the stream in your analytics workloads. Streaming ingestion works with Amazon Redshift provisioned clusters and with the new serverless option. To maximize simplicity, I am going to use Amazon Redshift Serverless in this walkthrough.

To prepare my environment, I need a Kinesis data stream. In the Kinesis console, I choose **Data streams** in the navigation pane and then **Create data stream**. For the **Data stream name**, I use `my-input-stream` and then leave all other options set to their default values. After a few seconds, the Kinesis data stream is ready. Note that by default I am using [on-demand capacity mode](https://docs.aws.amazon.com/streams/latest/dev/how-do-i-size-a-stream.html#ondemandmode). In a development or test environment, you can choose [provisioned capacity mode](https://docs.aws.amazon.com/streams/latest/dev/how-do-i-size-a-stream.html#provisionedmode) with one [shard](https://docs.aws.amazon.com/streams/latest/dev/key-concepts.html#shard) to optimize costs.

Now, I create an IAM role to give Amazon Redshift access to the `my-input-stream` Kinesis data stream. In the [IAM console](https://console.aws.amazon.com/iam), I create a role with this policy:

```json
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "kinesis:DescribeStreamSummary",
                "kinesis:GetShardIterator",
                "kinesis:GetRecords",
                "kinesis:DescribeStream"
            ],
            "Resource": "arn:aws:kinesis:*:123412341234:stream/my-input-stream"
        },
        {
            "Effect": "Allow",
            "Action": [
                "kinesis:ListStreams",
                "kinesis:ListShards"
            ],
            "Resource": "*"
        }
    ]
}
```

To allow Amazon Redshift to assume the role, I use the following trust policy:

```json
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Principal": {
                "Service": "redshift.amazonaws.com"
            },
            "Action": "sts:AssumeRole"
        }
    ]
}
```

In the [Amazon Redshift console](https://console.aws.amazon.com/redshift), I choose **Redshift serverless** from the navigation pane and create a new workgroup and namespace, [similar to what I did in this blog post](https://aws.amazon.com/blogs/aws/amazon-redshift-serverless-now-generally-available-with-new-capabilities/). When I create the namespace, in the **Permissions** section, I choose **Associate IAM roles** from the dropdown menu. Then, I select the role I just created. Note that the role is visible in this selection only if the trust policy allows Amazon Redshift to assume it. After that, I complete the creation of the namespace using the default options. After a few minutes, the serverless database is ready for use.
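If you prefer to script this environment setup instead of clicking through the consoles, the same resources can be created with boto3, the AWS SDK for Python used later in this post for the data generator. This is a minimal sketch under a few assumptions: the role name `redshift-streaming-ingestion` and inline policy name are illustrative, and `123412341234` is the placeholder account ID from the policies above.

```python
import json

import boto3

STREAM_NAME = "my-input-stream"
ROLE_NAME = "redshift-streaming-ingestion"  # assumed name, matching the ARN used later

kinesis = boto3.client("kinesis")
iam = boto3.client("iam")

# Create the data stream in on-demand capacity mode (the console default above).
kinesis.create_stream(
    StreamName=STREAM_NAME,
    StreamModeDetails={"StreamMode": "ON_DEMAND"},
)

# Trust policy that lets Amazon Redshift assume the role.
trust_policy = {
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Principal": {"Service": "redshift.amazonaws.com"},
            "Action": "sts:AssumeRole",
        }
    ],
}

iam.create_role(
    RoleName=ROLE_NAME,
    AssumeRolePolicyDocument=json.dumps(trust_policy),
)

# Permissions policy scoped to the stream (same statements as the policy above).
permissions_policy = {
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "kinesis:DescribeStreamSummary",
                "kinesis:GetShardIterator",
                "kinesis:GetRecords",
                "kinesis:DescribeStream",
            ],
            "Resource": "arn:aws:kinesis:*:123412341234:stream/my-input-stream",
        },
        {
            "Effect": "Allow",
            "Action": ["kinesis:ListStreams", "kinesis:ListShards"],
            "Resource": "*",
        },
    ],
}

iam.put_role_policy(
    RoleName=ROLE_NAME,
    PolicyName="redshift-streaming-ingestion-policy",  # assumed name
    PolicyDocument=json.dumps(permissions_policy),
)
```

The role association can also be scripted: the `redshift-serverless` API’s `CreateNamespace` operation accepts a list of IAM role ARNs when you create the namespace.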
In the Amazon Redshift console, I choose **Query editor v2** in the navigation pane. I connect to the new serverless database by choosing it from the list of resources. Now, I can use SQL to configure streaming ingestion. First, I create an external schema that maps to the streaming service. Because I am going to use simulated IoT data as an example, I call the external schema `sensors`.

```sql
CREATE EXTERNAL SCHEMA sensors
FROM KINESIS
IAM_ROLE 'arn:aws:iam::123412341234:role/redshift-streaming-ingestion';
```

To access the data in the stream, I create a materialized view that selects data from the stream. In general, materialized views contain a precomputed result set based on the result of a query. In this case, the query is reading from the stream, and Amazon Redshift is the consumer of the stream.

Because streaming data is going to be ingested as JSON data, I have two options:

1. Leave all the JSON data in a single column and use Amazon Redshift capabilities to query semi-structured data.
2. Extract JSON properties into their own separate columns.

Let’s see the pros and cons of both options.

The `approximate_arrival_timestamp`, `partition_key`, `shard_id`, and `sequence_number` columns in the `SELECT` statement are provided by Kinesis Data Streams. The record from the stream is in the `kinesis_data` column. The `refresh_time` column is provided by Amazon Redshift.

To leave the JSON data in a single column of the `sensor_data` materialized view, I use the [JSON_PARSE](https://docs.aws.amazon.com/redshift/latest/dg/JSON_PARSE.html) function:

```sql
CREATE MATERIALIZED VIEW sensor_data AUTO REFRESH YES AS
    SELECT approximate_arrival_timestamp,
           partition_key,
           shard_id,
           sequence_number,
           refresh_time,
           JSON_PARSE(kinesis_data) as payload
      FROM sensors."my-input-stream";
```

Because I used the `AUTO REFRESH YES` parameter, the content of the materialized view is automatically refreshed when there is new data in the stream.

To extract the JSON properties into separate columns of the `sensor_data_extract` materialized view, I use the [JSON_EXTRACT_PATH_TEXT](https://docs.aws.amazon.com/redshift/latest/dg/JSON_EXTRACT_PATH_TEXT.html) function:

```sql
CREATE MATERIALIZED VIEW sensor_data_extract AUTO REFRESH YES AS
    SELECT approximate_arrival_timestamp,
           partition_key,
           shard_id,
           sequence_number,
           refresh_time,
           JSON_EXTRACT_PATH_TEXT(FROM_VARBYTE(kinesis_data, 'utf-8'),'sensor_id')::VARCHAR(8) as sensor_id,
           JSON_EXTRACT_PATH_TEXT(FROM_VARBYTE(kinesis_data, 'utf-8'),'current_temperature')::DECIMAL(10,2) as current_temperature,
           JSON_EXTRACT_PATH_TEXT(FROM_VARBYTE(kinesis_data, 'utf-8'),'status')::VARCHAR(8) as status,
           JSON_EXTRACT_PATH_TEXT(FROM_VARBYTE(kinesis_data, 'utf-8'),'event_time')::CHARACTER(26) as event_time
      FROM sensors."my-input-stream";
```
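Auto refresh is the simplest option, but refreshes can also be triggered on demand, for example if a view is created with `AUTO REFRESH NO`. This is a minimal sketch of a manual refresh run through the [Redshift Data API](https://docs.aws.amazon.com/redshift-data/latest/APIReference/Welcome.html) with boto3; the workgroup name `my-workgroup` and the `dev` database are assumptions to adapt to your own setup.

```python
import time

import boto3

WORKGROUP = "my-workgroup"  # assumed workgroup name
DATABASE = "dev"            # assumed database name

client = boto3.client("redshift-data")

# Trigger a manual refresh of the streaming materialized view.
stmt = client.execute_statement(
    WorkgroupName=WORKGROUP,
    Database=DATABASE,
    Sql="REFRESH MATERIALIZED VIEW sensor_data;",
)

# The Data API is asynchronous: poll until the statement reaches a terminal state.
while True:
    status = client.describe_statement(Id=stmt["Id"])["Status"]
    if status in ("FINISHED", "FAILED", "ABORTED"):
        break
    time.sleep(1)

print(f"Refresh {status.lower()}")
```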
### Loading Data into the Kinesis Data Stream

To put data in the `my-input-stream` Kinesis data stream, I use the following `random_data_generator.py` Python script simulating data from IoT sensors:

```python
import datetime
import json
import random
import boto3

STREAM_NAME = "my-input-stream"


def get_random_data():
    current_temperature = round(10 + random.random() * 170, 2)
    if current_temperature > 160:
        status = "ERROR"
    elif current_temperature > 140 or random.randrange(1, 100) > 80:
        status = random.choice(["WARNING","ERROR"])
    else:
        status = "OK"
    return {
        'sensor_id': random.randrange(1, 100),
        'current_temperature': current_temperature,
        'status': status,
        'event_time': datetime.datetime.now().isoformat()
    }


def send_data(stream_name, kinesis_client):
    while True:
        data = get_random_data()
        partition_key = str(data["sensor_id"])
        print(data)
        kinesis_client.put_record(
            StreamName=stream_name,
            Data=json.dumps(data),
            PartitionKey=partition_key)


if __name__ == '__main__':
    kinesis_client = boto3.client('kinesis')
    send_data(STREAM_NAME, kinesis_client)
```

I start the script and see the records that are being put in the stream. They use JSON syntax and contain random data.

```
$ python3 random_data_generator.py
```

```
{'sensor_id': 66, 'current_temperature': 69.67, 'status': 'OK', 'event_time': '2022-11-20T18:31:30.693395'}
{'sensor_id': 45, 'current_temperature': 122.57, 'status': 'OK', 'event_time': '2022-11-20T18:31:31.486649'}
{'sensor_id': 15, 'current_temperature': 101.64, 'status': 'OK', 'event_time': '2022-11-20T18:31:31.671593'}
...
```

### Querying Streaming Data from Amazon Redshift

To compare the two materialized views, I select the first ten rows from each of them:

- In the `sensor_data` materialized view, the JSON data in the stream is in the `payload` column. I can use Amazon Redshift [JSON functions](https://docs.aws.amazon.com/redshift/latest/dg/json-functions.html) to access data stored in JSON format.

![image.png](https://dev-media.amazoncloud.cn/715c1113c14444fdba9b707f4d45d90f_image.png)

- In the `sensor_data_extract` materialized view, the JSON data in the stream has been extracted into different columns: `sensor_id`, `current_temperature`, `status`, and `event_time`.

![image.png](https://dev-media.amazoncloud.cn/ca0ce35ef10c407da69c55a24595819f_image.png)
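The queries behind those screenshots are plain `SELECT` statements, and they can also be run programmatically. This is a minimal sketch using the Redshift Data API with the same assumed workgroup and database names as in the refresh example above; note that on `sensor_data` the `payload` SUPER column can be navigated with dot notation.

```python
import time

import boto3

WORKGROUP = "my-workgroup"  # assumed workgroup name
DATABASE = "dev"            # assumed database name

client = boto3.client("redshift-data")


def run_query(sql):
    """Run a statement through the Data API and return its result rows."""
    stmt = client.execute_statement(
        WorkgroupName=WORKGROUP, Database=DATABASE, Sql=sql
    )
    # The Data API is asynchronous: poll until the statement completes.
    while True:
        desc = client.describe_statement(Id=stmt["Id"])
        if desc["Status"] == "FINISHED":
            return client.get_statement_result(Id=stmt["Id"])["Records"]
        if desc["Status"] in ("FAILED", "ABORTED"):
            raise RuntimeError(desc.get("Error", desc["Status"]))
        time.sleep(1)


# Single-column view: navigate the payload SUPER column with dot notation.
print(run_query(
    "SELECT payload.sensor_id, payload.status FROM sensor_data LIMIT 10;"
))

# Extracted view: the JSON properties are already typed columns.
print(run_query(
    "SELECT sensor_id, current_temperature, status, event_time "
    "FROM sensor_data_extract LIMIT 10;"
))
```

The trade-off mirrors the two materialized-view options: the single-column view keeps the full payload available for flexible semi-structured queries, while the extracted view gives you typed columns that are simpler to filter and aggregate.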
Now I can use the data in these views in my analytics workloads together with the data in my data warehouse, my operational databases, and my data lake. I can use the data in these views together with [Redshift ML](https://aws.amazon.com/redshift/features/redshift-ml/) to train a machine learning model or use predictive analytics. Because materialized views support incremental updates, the data in these views can be efficiently used as a data source for dashboards, for example, [using Amazon Redshift as a data source](https://docs.aws.amazon.com/grafana/latest/userguide/AWS-RedShift.html) for [Amazon Managed Grafana](https://aws.amazon.com/grafana/).

### Availability and Pricing

Amazon Redshift streaming ingestion for Kinesis Data Streams and Managed Streaming for Apache Kafka is generally available today in all commercial [AWS Regions](https://aws.amazon.com/about-aws/global-infrastructure/regions_az/).

There are no additional costs for using Amazon Redshift streaming ingestion. For more information, see [Amazon Redshift pricing](https://aws.amazon.com/redshift/pricing/).

It’s never been easier to use low-latency streaming data in your data warehouse and in your data lake. Let us know what you build with this new capability!

— [Danilo](https://twitter.com/danilop)

![image.png](https://dev-media.amazoncloud.cn/4865915be39448759eb9e6e749112328_image.png)

### **[Danilo Poccia](https://aws.amazon.com/blogs/aws/author/danilop/)**

Danilo works with startups and companies of any size to support their innovation. In his role as Chief Evangelist (EMEA) at Amazon Web Services, he leverages his experience to help people bring their ideas to life, focusing on serverless architectures and event-driven programming, and on the technical and business impact of machine learning and edge computing. He is the author of AWS Lambda in Action from Manning.