Kinesis Data Analytics Studio 和 Python 交互式开发自定义聚合查询

0
0
{"value":"[Amazon Kinesis](https://aws.amazon.com/cn/kinesis/?trk=cndc-detail) Data Analytics Studio 使客户能够轻松地实时分析流数据,并使用标准 SQL、Python 和 Scala 构建由 Apache Flink 支持的流处理应用程序。 只需在 Amazon 管理控制台中单击几下,客户就可以启动 serverless notebook 来查询数据流并在几秒钟内获得结果。\n\nApache Flink 是一个用于处理数据流的开源框架和引擎。它具有高可用性和可扩展性,为流处理应用程序提供了高吞吐量和低延迟。Kinesis Data Analytics 降低了构建和管理 Apache Flink 应用程序的复杂性。\n\n运行 Apache Flink 工作负载的客户面临着一个艰巨的挑战: 开发他们的分布式流处理应用程序,却无法真正了解他们的应用程序执行的数据处理步骤。Kinesis Data Analytics Studio 将 Apache Zeppelin 笔记本电脑的易用性与 Apache Flink 处理引擎的强大功能相结合,在完全管理的产品中提供高级流式分析功能。此外,它还加快了流处理应用程序的开发和运行,从而不断生成实时见解。\n\n在本文中,我们将向您介绍 Kinesis Data Analytics Studio,并开始使用 Apache Flink(Pyflink)的 Python API 从 [Amazon Kinesis](https://aws.amazon.com/cn/kinesis/?trk=cndc-detail) 数据流交互式查询数据。本文我们使用 Kinesis Data Stream 作为数据源。Kinesis Data Analytics Studio 还与 [Amazon Managed Streaming for Apache Kafka](https://aws.amazon.com/cn/msk/?trk=cndc-detail)(Amazon MSK)、[Amazon Simple Storage Service](https://aws.amazon.com/cn/s3/?trk=cndc-detail)([Amazon S3](https://aws.amazon.com/cn/s3/?trk=cndc-detail))以及 Apache Flink 支持的各种其他数据源兼容。\n### **准备:**\n- Kinesis Data Stream\n- Cloud9\n\n### **创建 Kinesis Data Stream**\n进入 Cloud9, 新建一个 terminal,并执行下列 cli 创建一个名为 teststream 的消息队列:\n```\\n\$ aws kinesis create-stream \\\\\\n--stream-name teststream \\\\\\n--shard-count 1 \\\\\\n--region ap-northeast-1\\n```\n### **创建一个 Kinesis Data Analytics Studio notebook**\n您可以通过以下步骤开始与数据流交互:\n\n- 打开 Amazon 管理控制台并导航至 [Amazon Kinesis](https://aws.amazon.com/cn/kinesis/?trk=cndc-detail)/Data Analytics/Streaming applications\n- 选择主页上的 Studio 选项卡,然后选择 Create Studio Notebook。\n- 选择 Create with custom settings, 输入 Studio 笔记本的名称,并让 Kinesis Data Analytics Studio 为此创建 Amazon IAM 角色。\n- 选择一个 Amazon Glue 数据库来存储 Kinesis Data Analytics Studio 使用的源和目标周围的元数据。\n- 选择创建 Studio notebook。\n\n创建应用程序后,选择 Run 以启动 Apache Flink 应用程序。这将需要几分钟的时间来完成,此后可以点击 Open in Apache Zeppelin 打开。\n\n![image.png](https://dev-media.amazoncloud.cn/05d4009a20df49bd99f3fc8459b97c8f_image.png)\n\n### **在 Cloud9 中创建样本数据**\n在 cloud9 中创建 ticker.py 文件,并复制如下代码到文件内并保存\n```\\nimport datetime\\nimport json\\nimport random\\nimport boto3\\nimport time\\n\\nSTREAM_NAME = \\"teststream\\"\\nprice = 100\\n\\ndef get_data():\\n global price \\n price = price + (random.random()*2-1)*10\\n price = 0 if price < 0 else price\\n return {\\n #'EVENT_TIME': datetime.datetime.now().isoformat(),\\n 'ticker': random.choice(['BTC','ETH','BSC','SOL']),\\n 'price': price,\\n 'event_time': datetime.datetime.now().isoformat()\\n }\\n\\ndef generate(stream_name, kinesis_client):\\n while True:\\n data = get_data() \\n print(data)\\n time.sleep(1)\\n kinesis_client.put_record(\\n StreamName=stream_name,\\n Data=json.dumps(data),\\n PartitionKey=\\"partitionkey\\")\\n\\nif __name__ == '__main__':\\n generate(STREAM_NAME, boto3.client('kinesis'))\\n```\n运行代码,程序会将模拟的数据发送到 Kinesis 中。这里注意,由于 Cloud9 使用临时凭证,所以有可能会出现 token 过期的问题,重新运行即可。\n### **编写 Studio 代码实现自定义聚合**\n首先我们创建一个 source table,用于定义数据的 schema 以及 watermark:\n```\\n%flink.ssql(type=update)\\n\\ncreate table stock_from_flink (\\n ticker varchar(6),\\n price double,\\n event_time TIMESTAMP(3),\\n WATERMARK for event_time AS event_time - INTERVAL '5' SECOND\\n)\\nPARTITIONED BY (ticker)\\nWITH(\\n 'connector' = 'kinesis',\\n 'stream' = 'teststream',\\n 'aws.region' = 'ap-northeast-1',\\n 'scan.stream.initpos' = 'LATEST',\\n 'format' = 'json',\\n 'json.timestamp-format.standard' = 'ISO-8601'\\n)\\n```\n点击右上角的执行按钮,然后添加新的 paragraph 并拷贝自定义聚合函数,这里我们定义计算窗口内的蜡烛图四元组,最大值,最小值,初始值和最新值:\n```\\n%flink.pyflink\\n\\nclass CountAndSumAggregateFunction(AggregateFunction):\\n\\n def get_value(self, accumulator):\\n return Row(accumulator[0], accumulator[1], accumulator[2], accumulator[3])\\n\\n def create_accumulator(self):\\n return Row(-1, 0,-1,0)\\n\\n def accumulate(self, accumulator, row: Row):\\n accumulator[0] = min(accumulator[0],row[1]) if accumulator[0] > 0 else row[1]\\n accumulator[1] = max(accumulator[1],row[1])\\n accumulator[2] = accumulator[2] if accumulator[2] > 0 else row[1]\\n accumulator[3] = row[1]\\n\\n def retract(self, accumulator, row: Row):\\n pass\\n\\n def merge(self, accumulator, accumulators):\\n pass\\n\\n def get_accumulator_type(self):\\n return DataTypes.ROW(\\n [DataTypes.FIELD(\\"minp\\", DataTypes.DOUBLE()),\\n DataTypes.FIELD(\\"maxp\\", DataTypes.DOUBLE()),\\n DataTypes.FIELD(\\"initialp\\", DataTypes.DOUBLE()),\\n DataTypes.FIELD(\\"lastedp\\", DataTypes.DOUBLE())])\\n\\n def get_result_type(self):\\n return DataTypes.ROW(\\n [DataTypes.FIELD(\\"minp\\", DataTypes.DOUBLE()),\\n DataTypes.FIELD(\\"maxp\\", DataTypes.DOUBLE()),\\n DataTypes.FIELD(\\"initialp\\", DataTypes.DOUBLE()),\\n DataTypes.FIELD(\\"lastedp\\", DataTypes.DOUBLE())])\\n\\nfunction = CountAndSumAggregateFunction()\\nagg = udaf(function,\\n result_type=function.get_result_type(),\\n accumulator_type=function.get_accumulator_type(),\\n name=str(function.__class__.__name__))\\n```\n创建新的 paragraph ,粘贴主程序,调用自定义聚合函数,我们定义一个10秒钟的窗口方便数据观察:\n```\\n%flink.pyflink\\n\\ninput_table = st_env.from_path(\\"stock_from_flink\\")\\nnew_table3 = input_table.window(Tumble.over(\\"10.seconds\\").on(\\"event_time\\").alias(\\"ten_seconds_window\\")) \\\\\\n .group_by(\\"ten_seconds_window, ticker\\") \\\\\\n .aggregate(agg.alias(\\"minp\\",\\"maxp\\",\\"initialp\\",\\"lastedp\\")) \\\\\\n .select(\\" ticker as ticker, minp as min_price, maxp as max_price, initialp as initial_price, lastedp as latest_price, ten_seconds_window.end as epoch_time\\")\\n```\n最后一个 paragraph 我们用来展示我们聚合后的结果:\n```\\n%flink.pyflink\\n\\nz.show(new_table3, stream_type=\\"update\\")\\n```\n点击执行后,我们会很快接收到数据并生成窗口:\n\n![image.png](https://dev-media.amazoncloud.cn/779ecf20a4f14d77bb3030a052ad2942_image.png)\n\n我们还可以通过不同的可视化图形来观察数据\n\n![image.png](https://dev-media.amazoncloud.cn/5c17466d837c4446a6e397f0231326bb_image.png)\n\n同时,同传统的应用一样,Studio 提供了 Apache Flink Dashboard, 方便查询程序运行时的状况。\n\n![image.png](https://dev-media.amazoncloud.cn/133e43dc28c54bc9a69496aa1570c9da_image.png)\n\n### **总结:**\nKinesis Data Analytics Studio 使 Apache Flink 应用程序开发的速度大大加快。此外,所有这些都是通过丰富的可视化、可扩展且用户友好的界面来实现的,并能协作开发,灵活选择语言,使得流式工作负载性能强大。用户可以按照本文所述段落,或者选择将他们升级为具有持久状态的针对 Apache Flink 的 Kinesis Data Analytics 应用程序。\n\n### **参考文档:**\n[https://zeppelin.apache.org/docs/0.9.0/interpreter/flink.html](https://zeppelin.apache.org/docs/0.9.0/interpreter/flink.html)\n\n[https://docs.aws.amazon.com/kinesisanalytics/latest/java/how-notebook.html](https://docs.aws.amazon.com/kinesisanalytics/latest/java/how-notebook.html)\n\n### 本篇作者\n![image.png](https://dev-media.amazoncloud.cn/e90f62df62f64e7390734e6cfc7cddd4_image.png)\n\n**孙标**\n亚马逊云科技资深解决方案架构师。拥有多年金融,移动互联网研发及数字货币交易所架构经验。","render":"<p>Amazon Kinesis Data Analytics Studio 使客户能够轻松地实时分析流数据,并使用标准 SQL、Python 和 Scala 构建由 Apache Flink 支持的流处理应用程序。 只需在 Amazon 管理控制台中单击几下,客户就可以启动 serverless notebook 来查询数据流并在几秒钟内获得结果。</p>\n<p>Apache Flink 是一个用于处理数据流的开源框架和引擎。它具有高可用性和可扩展性,为流处理应用程序提供了高吞吐量和低延迟。Kinesis Data Analytics 降低了构建和管理 Apache Flink 应用程序的复杂性。</p>\n<p>运行 Apache Flink 工作负载的客户面临着一个艰巨的挑战: 开发他们的分布式流处理应用程序,却无法真正了解他们的应用程序执行的数据处理步骤。Kinesis Data Analytics Studio 将 Apache Zeppelin 笔记本电脑的易用性与 Apache Flink 处理引擎的强大功能相结合,在完全管理的产品中提供高级流式分析功能。此外,它还加快了流处理应用程序的开发和运行,从而不断生成实时见解。</p>\n<p>在本文中,我们将向您介绍 Kinesis Data Analytics Studio,并开始使用 Apache Flink(Pyflink)的 Python API 从 Amazon Kinesis 数据流交互式查询数据。本文我们使用 Kinesis Data Stream 作为数据源。Kinesis Data Analytics Studio 还与 Amazon Managed Streaming for Apache Kafka(Amazon MSK)、Amazon Simple Storage Service(Amazon S3)以及 Apache Flink 支持的各种其他数据源兼容。</p>\n<h3><a id=\\"_7\\"></a><strong>准备:</strong></h3>\\n<ul>\\n<li>Kinesis Data Stream</li>\n<li>Cloud9</li>\n</ul>\\n<h3><a id=\\"_Kinesis_Data_Stream_11\\"></a><strong>创建 Kinesis Data Stream</strong></h3>\\n<p>进入 Cloud9, 新建一个 terminal,并执行下列 cli 创建一个名为 teststream 的消息队列:</p>\n<pre><code class=\\"lang-\\">\$ aws kinesis create-stream \\\\\\n--stream-name teststream \\\\\\n--shard-count 1 \\\\\\n--region ap-northeast-1\\n</code></pre>\\n<h3><a id=\\"_Kinesis_Data_Analytics_Studio_notebook_19\\"></a><strong>创建一个 Kinesis Data Analytics Studio notebook</strong></h3>\\n<p>您可以通过以下步骤开始与数据流交互:</p>\n<ul>\\n<li>打开 Amazon 管理控制台并导航至 Amazon Kinesis/Data Analytics/Streaming applications</li>\n<li>选择主页上的 Studio 选项卡,然后选择 Create Studio Notebook。</li>\n<li>选择 Create with custom settings, 输入 Studio 笔记本的名称,并让 Kinesis Data Analytics Studio 为此创建 Amazon IAM 角色。</li>\n<li>选择一个 Amazon Glue 数据库来存储 Kinesis Data Analytics Studio 使用的源和目标周围的元数据。</li>\n<li>选择创建 Studio notebook。</li>\n</ul>\\n<p>创建应用程序后,选择 Run 以启动 Apache Flink 应用程序。这将需要几分钟的时间来完成,此后可以点击 Open in Apache Zeppelin 打开。</p>\n<p><img src=\\"https://dev-media.amazoncloud.cn/05d4009a20df49bd99f3fc8459b97c8f_image.png\\" alt=\\"image.png\\" /></p>\n<h3><a id=\\"_Cloud9__32\\"></a><strong>在 Cloud9 中创建样本数据</strong></h3>\\n<p>在 cloud9 中创建 ticker.py 文件,并复制如下代码到文件内并保存</p>\n<pre><code class=\\"lang-\\">import datetime\\nimport json\\nimport random\\nimport boto3\\nimport time\\n\\nSTREAM_NAME = &quot;teststream&quot;\\nprice = 100\\n\\ndef get_data():\\n global price \\n price = price + (random.random()*2-1)*10\\n price = 0 if price &lt; 0 else price\\n return {\\n #'EVENT_TIME': datetime.datetime.now().isoformat(),\\n 'ticker': random.choice(['BTC','ETH','BSC','SOL']),\\n 'price': price,\\n 'event_time': datetime.datetime.now().isoformat()\\n }\\n\\ndef generate(stream_name, kinesis_client):\\n while True:\\n data = get_data() \\n print(data)\\n time.sleep(1)\\n kinesis_client.put_record(\\n StreamName=stream_name,\\n Data=json.dumps(data),\\n PartitionKey=&quot;partitionkey&quot;)\\n\\nif __name__ == '__main__':\\n generate(STREAM_NAME, boto3.client('kinesis'))\\n</code></pre>\\n<p>运行代码,程序会将模拟的数据发送到 Kinesis 中。这里注意,由于 Cloud9 使用临时凭证,所以有可能会出现 token 过期的问题,重新运行即可。</p>\n<h3><a id=\\"_Studio__69\\"></a><strong>编写 Studio 代码实现自定义聚合</strong></h3>\\n<p>首先我们创建一个 source table,用于定义数据的 schema 以及 watermark:</p>\n<pre><code class=\\"lang-\\">%flink.ssql(type=update)\\n\\ncreate table stock_from_flink (\\n ticker varchar(6),\\n price double,\\n event_time TIMESTAMP(3),\\n WATERMARK for event_time AS event_time - INTERVAL '5' SECOND\\n)\\nPARTITIONED BY (ticker)\\nWITH(\\n 'connector' = 'kinesis',\\n 'stream' = 'teststream',\\n 'aws.region' = 'ap-northeast-1',\\n 'scan.stream.initpos' = 'LATEST',\\n 'format' = 'json',\\n 'json.timestamp-format.standard' = 'ISO-8601'\\n)\\n</code></pre>\\n<p>点击右上角的执行按钮,然后添加新的 paragraph 并拷贝自定义聚合函数,这里我们定义计算窗口内的蜡烛图四元组,最大值,最小值,初始值和最新值:</p>\n<pre><code class=\\"lang-\\">%flink.pyflink\\n\\nclass CountAndSumAggregateFunction(AggregateFunction):\\n\\n def get_value(self, accumulator):\\n return Row(accumulator[0], accumulator[1], accumulator[2], accumulator[3])\\n\\n def create_accumulator(self):\\n return Row(-1, 0,-1,0)\\n\\n def accumulate(self, accumulator, row: Row):\\n accumulator[0] = min(accumulator[0],row[1]) if accumulator[0] &gt; 0 else row[1]\\n accumulator[1] = max(accumulator[1],row[1])\\n accumulator[2] = accumulator[2] if accumulator[2] &gt; 0 else row[1]\\n accumulator[3] = row[1]\\n\\n def retract(self, accumulator, row: Row):\\n pass\\n\\n def merge(self, accumulator, accumulators):\\n pass\\n\\n def get_accumulator_type(self):\\n return DataTypes.ROW(\\n [DataTypes.FIELD(&quot;minp&quot;, DataTypes.DOUBLE()),\\n DataTypes.FIELD(&quot;maxp&quot;, DataTypes.DOUBLE()),\\n DataTypes.FIELD(&quot;initialp&quot;, DataTypes.DOUBLE()),\\n DataTypes.FIELD(&quot;lastedp&quot;, DataTypes.DOUBLE())])\\n\\n def get_result_type(self):\\n return DataTypes.ROW(\\n [DataTypes.FIELD(&quot;minp&quot;, DataTypes.DOUBLE()),\\n DataTypes.FIELD(&quot;maxp&quot;, DataTypes.DOUBLE()),\\n DataTypes.FIELD(&quot;initialp&quot;, DataTypes.DOUBLE()),\\n DataTypes.FIELD(&quot;lastedp&quot;, DataTypes.DOUBLE())])\\n\\nfunction = CountAndSumAggregateFunction()\\nagg = udaf(function,\\n result_type=function.get_result_type(),\\n accumulator_type=function.get_accumulator_type(),\\n name=str(function.__class__.__name__))\\n</code></pre>\\n<p>创建新的 paragraph ,粘贴主程序,调用自定义聚合函数,我们定义一个10秒钟的窗口方便数据观察:</p>\n<pre><code class=\\"lang-\\">%flink.pyflink\\n\\ninput_table = st_env.from_path(&quot;stock_from_flink&quot;)\\nnew_table3 = input_table.window(Tumble.over(&quot;10.seconds&quot;).on(&quot;event_time&quot;).alias(&quot;ten_seconds_window&quot;)) \\\\\\n .group_by(&quot;ten_seconds_window, ticker&quot;) \\\\\\n .aggregate(agg.alias(&quot;minp&quot;,&quot;maxp&quot;,&quot;initialp&quot;,&quot;lastedp&quot;)) \\\\\\n .select(&quot; ticker as ticker, minp as min_price, maxp as max_price, initialp as initial_price, lastedp as latest_price, ten_seconds_window.end as epoch_time&quot;)\\n</code></pre>\\n<p>最后一个 paragraph 我们用来展示我们聚合后的结果:</p>\n<pre><code class=\\"lang-\\">%flink.pyflink\\n\\nz.show(new_table3, stream_type=&quot;update&quot;)\\n</code></pre>\\n<p>点击执行后,我们会很快接收到数据并生成窗口:</p>\n<p><img src=\\"https://dev-media.amazoncloud.cn/779ecf20a4f14d77bb3030a052ad2942_image.png\\" alt=\\"image.png\\" /></p>\n<p>我们还可以通过不同的可视化图形来观察数据</p>\n<p><img src=\\"https://dev-media.amazoncloud.cn/5c17466d837c4446a6e397f0231326bb_image.png\\" alt=\\"image.png\\" /></p>\n<p>同时,同传统的应用一样,Studio 提供了 Apache Flink Dashboard, 方便查询程序运行时的状况。</p>\n<p><img src=\\"https://dev-media.amazoncloud.cn/133e43dc28c54bc9a69496aa1570c9da_image.png\\" alt=\\"image.png\\" /></p>\n<h3><a id=\\"_162\\"></a><strong>总结:</strong></h3>\\n<p>Kinesis Data Analytics Studio 使 Apache Flink 应用程序开发的速度大大加快。此外,所有这些都是通过丰富的可视化、可扩展且用户友好的界面来实现的,并能协作开发,灵活选择语言,使得流式工作负载性能强大。用户可以按照本文所述段落,或者选择将他们升级为具有持久状态的针对 Apache Flink 的 Kinesis Data Analytics 应用程序。</p>\n<h3><a id=\\"_165\\"></a><strong>参考文档:</strong></h3>\\n<p><a href=\\"https://zeppelin.apache.org/docs/0.9.0/interpreter/flink.html\\" target=\\"_blank\\">https://zeppelin.apache.org/docs/0.9.0/interpreter/flink.html</a></p>\\n<p><a href=\\"https://docs.aws.amazon.com/kinesisanalytics/latest/java/how-notebook.html\\" target=\\"_blank\\">https://docs.aws.amazon.com/kinesisanalytics/latest/java/how-notebook.html</a></p>\\n<h3><a id=\\"_170\\"></a>本篇作者</h3>\\n<p><img src=\\"https://dev-media.amazoncloud.cn/e90f62df62f64e7390734e6cfc7cddd4_image.png\\" alt=\\"image.png\\" /></p>\n<p><strong>孙标</strong><br />\\n亚马逊云科技资深解决方案架构师。拥有多年金融,移动互联网研发及数字货币交易所架构经验。</p>\n"}
目录
亚马逊云科技解决方案 基于行业客户应用场景及技术领域的解决方案
联系亚马逊云科技专家
亚马逊云科技解决方案
基于行业客户应用场景及技术领域的解决方案
联系专家
0
目录
关闭