{"value":"Amazon Kinesis Data Analytics Studio 使客户能够轻松地实时分析流数据,并使用标准 SQL、Python 和 Scala 构建由 Apache Flink 支持的流处理应用程序。 只需在 Amazon 管理控制台中单击几下,客户就可以启动 serverless notebook 来查询数据流并在几秒钟内获得结果。\n\nApache Flink 是一个用于处理数据流的开源框架和引擎。它具有高可用性和可扩展性,为流处理应用程序提供了高吞吐量和低延迟。Kinesis Data Analytics 降低了构建和管理 Apache Flink 应用程序的复杂性。\n\n运行 Apache Flink 工作负载的客户面临着一个艰巨的挑战: 开发他们的分布式流处理应用程序,却无法真正了解他们的应用程序执行的数据处理步骤。Kinesis Data Analytics Studio 将 Apache Zeppelin 笔记本电脑的易用性与 Apache Flink 处理引擎的强大功能相结合,在完全管理的产品中提供高级流式分析功能。此外,它还加快了流处理应用程序的开发和运行,从而不断生成实时见解。\n\n在本文中,我们将向您介绍 Kinesis Data Analytics Studio,并开始使用 Apache Flink(Pyflink)的 Python API 从 Amazon Kinesis 数据流交互式查询数据。本文我们使用 Kinesis Data Stream 作为数据源。Kinesis Data Analytics Studio 还与 Amazon Managed Streaming for Apache Kafka(Amazon MSK)、Amazon Simple Storage Service(Amazon S3)以及 Apache Flink 支持的各种其他数据源兼容。\n### **准备:**\n- Kinesis Data Stream\n- Cloud9\n\n### **创建 Kinesis Data Stream**\n进入 Cloud9, 新建一个 terminal,并执行下列 cli 创建一个名为 teststream 的消息队列:\n```\n$ aws kinesis create-stream \\\n--stream-name teststream \\\n--shard-count 1 \\\n--region ap-northeast-1\n```\n### **创建一个 Kinesis Data Analytics Studio notebook**\n您可以通过以下步骤开始与数据流交互:\n\n- 打开 Amazon 管理控制台并导航至 Amazon Kinesis/Data Analytics/Streaming applications\n- 选择主页上的 Studio 选项卡,然后选择 Create Studio Notebook。\n- 选择 Create with custom settings, 输入 Studio 笔记本的名称,并让 Kinesis Data Analytics Studio 为此创建 Amazon IAM 角色。\n- 选择一个 Amazon Glue 数据库来存储 Kinesis Data Analytics Studio 使用的源和目标周围的元数据。\n- 选择创建 Studio notebook。\n\n创建应用程序后,选择 Run 以启动 Apache Flink 应用程序。这将需要几分钟的时间来完成,此后可以点击 Open in Apache Zeppelin 打开。\n\n![image.png](https://dev-media.amazoncloud.cn/05d4009a20df49bd99f3fc8459b97c8f_image.png)\n\n### **在 Cloud9 中创建样本数据**\n在 cloud9 中创建 ticker.py 文件,并复制如下代码到文件内并保存\n```\nimport datetime\nimport json\nimport random\nimport boto3\nimport time\n\nSTREAM_NAME = \"teststream\"\nprice = 100\n\ndef get_data():\n global price \n price = price + (random.random()*2-1)*10\n price = 0 if price < 0 else price\n return {\n #'EVENT_TIME': datetime.datetime.now().isoformat(),\n 'ticker': random.choice(['BTC','ETH','BSC','SOL']),\n 'price': price,\n 'event_time': datetime.datetime.now().isoformat()\n }\n\ndef generate(stream_name, kinesis_client):\n while True:\n data = get_data() \n print(data)\n time.sleep(1)\n kinesis_client.put_record(\n StreamName=stream_name,\n Data=json.dumps(data),\n PartitionKey=\"partitionkey\")\n\nif __name__ == '__main__':\n generate(STREAM_NAME, boto3.client('kinesis'))\n```\n运行代码,程序会将模拟的数据发送到 Kinesis 中。这里注意,由于 Cloud9 使用临时凭证,所以有可能会出现 token 过期的问题,重新运行即可。\n### **编写 Studio 代码实现自定义聚合**\n首先我们创建一个 source table,用于定义数据的 schema 以及 watermark:\n```\n%flink.ssql(type=update)\n\ncreate table stock_from_flink (\n ticker varchar(6),\n price double,\n event_time TIMESTAMP(3),\n WATERMARK for event_time AS event_time - INTERVAL '5' SECOND\n)\nPARTITIONED BY (ticker)\nWITH(\n 'connector' = 'kinesis',\n 'stream' = 'teststream',\n 'aws.region' = 'ap-northeast-1',\n 'scan.stream.initpos' = 'LATEST',\n 'format' = 'json',\n 'json.timestamp-format.standard' = 'ISO-8601'\n)\n```\n点击右上角的执行按钮,然后添加新的 paragraph 并拷贝自定义聚合函数,这里我们定义计算窗口内的蜡烛图四元组,最大值,最小值,初始值和最新值:\n```\n%flink.pyflink\n\nclass CountAndSumAggregateFunction(AggregateFunction):\n\n def get_value(self, accumulator):\n return Row(accumulator[0], accumulator[1], accumulator[2], accumulator[3])\n\n def create_accumulator(self):\n return Row(-1, 0,-1,0)\n\n def accumulate(self, accumulator, row: Row):\n accumulator[0] = min(accumulator[0],row[1]) if accumulator[0] > 0 else row[1]\n accumulator[1] = max(accumulator[1],row[1])\n accumulator[2] = accumulator[2] if accumulator[2] > 0 else row[1]\n accumulator[3] = row[1]\n\n def retract(self, accumulator, row: Row):\n pass\n\n def merge(self, accumulator, accumulators):\n pass\n\n def get_accumulator_type(self):\n return DataTypes.ROW(\n [DataTypes.FIELD(\"minp\", DataTypes.DOUBLE()),\n DataTypes.FIELD(\"maxp\", DataTypes.DOUBLE()),\n DataTypes.FIELD(\"initialp\", DataTypes.DOUBLE()),\n DataTypes.FIELD(\"lastedp\", DataTypes.DOUBLE())])\n\n def get_result_type(self):\n return DataTypes.ROW(\n [DataTypes.FIELD(\"minp\", DataTypes.DOUBLE()),\n DataTypes.FIELD(\"maxp\", DataTypes.DOUBLE()),\n DataTypes.FIELD(\"initialp\", DataTypes.DOUBLE()),\n DataTypes.FIELD(\"lastedp\", DataTypes.DOUBLE())])\n\nfunction = CountAndSumAggregateFunction()\nagg = udaf(function,\n result_type=function.get_result_type(),\n accumulator_type=function.get_accumulator_type(),\n name=str(function.__class__.__name__))\n```\n创建新的 paragraph ,粘贴主程序,调用自定义聚合函数,我们定义一个10秒钟的窗口方便数据观察:\n```\n%flink.pyflink\n\ninput_table = st_env.from_path(\"stock_from_flink\")\nnew_table3 = input_table.window(Tumble.over(\"10.seconds\").on(\"event_time\").alias(\"ten_seconds_window\")) \\\n .group_by(\"ten_seconds_window, ticker\") \\\n .aggregate(agg.alias(\"minp\",\"maxp\",\"initialp\",\"lastedp\")) \\\n .select(\" ticker as ticker, minp as min_price, maxp as max_price, initialp as initial_price, lastedp as latest_price, ten_seconds_window.end as epoch_time\")\n```\n最后一个 paragraph 我们用来展示我们聚合后的结果:\n```\n%flink.pyflink\n\nz.show(new_table3, stream_type=\"update\")\n```\n点击执行后,我们会很快接收到数据并生成窗口:\n\n![image.png](https://dev-media.amazoncloud.cn/779ecf20a4f14d77bb3030a052ad2942_image.png)\n\n我们还可以通过不同的可视化图形来观察数据\n\n![image.png](https://dev-media.amazoncloud.cn/5c17466d837c4446a6e397f0231326bb_image.png)\n\n同时,同传统的应用一样,Studio 提供了 Apache Flink Dashboard, 方便查询程序运行时的状况。\n\n![image.png](https://dev-media.amazoncloud.cn/133e43dc28c54bc9a69496aa1570c9da_image.png)\n\n### **总结:**\nKinesis Data Analytics Studio 使 Apache Flink 应用程序开发的速度大大加快。此外,所有这些都是通过丰富的可视化、可扩展且用户友好的界面来实现的,并能协作开发,灵活选择语言,使得流式工作负载性能强大。用户可以按照本文所述段落,或者选择将他们升级为具有持久状态的针对 Apache Flink 的 Kinesis Data Analytics 应用程序。\n\n### **参考文档:**\n[https://zeppelin.apache.org/docs/0.9.0/interpreter/flink.html](https://zeppelin.apache.org/docs/0.9.0/interpreter/flink.html)\n\n[https://docs.aws.amazon.com/kinesisanalytics/latest/java/how-notebook.html](https://docs.aws.amazon.com/kinesisanalytics/latest/java/how-notebook.html)\n\n### 本篇作者\n![image.png](https://dev-media.amazoncloud.cn/e90f62df62f64e7390734e6cfc7cddd4_image.png)\n\n**孙标**\n亚马逊云科技资深解决方案架构师。拥有多年金融,移动互联网研发及数字货币交易所架构经验。","render":"<p>Amazon Kinesis Data Analytics Studio 使客户能够轻松地实时分析流数据,并使用标准 SQL、Python 和 Scala 构建由 Apache Flink 支持的流处理应用程序。 只需在 Amazon 管理控制台中单击几下,客户就可以启动 serverless notebook 来查询数据流并在几秒钟内获得结果。</p>\n<p>Apache Flink 是一个用于处理数据流的开源框架和引擎。它具有高可用性和可扩展性,为流处理应用程序提供了高吞吐量和低延迟。Kinesis Data Analytics 降低了构建和管理 Apache Flink 应用程序的复杂性。</p>\n<p>运行 Apache Flink 工作负载的客户面临着一个艰巨的挑战: 开发他们的分布式流处理应用程序,却无法真正了解他们的应用程序执行的数据处理步骤。Kinesis Data Analytics Studio 将 Apache Zeppelin 笔记本电脑的易用性与 Apache Flink 处理引擎的强大功能相结合,在完全管理的产品中提供高级流式分析功能。此外,它还加快了流处理应用程序的开发和运行,从而不断生成实时见解。</p>\n<p>在本文中,我们将向您介绍 Kinesis Data Analytics Studio,并开始使用 Apache Flink(Pyflink)的 Python API 从 Amazon Kinesis 数据流交互式查询数据。本文我们使用 Kinesis Data Stream 作为数据源。Kinesis Data Analytics Studio 还与 Amazon Managed Streaming for Apache Kafka(Amazon MSK)、Amazon Simple Storage Service(Amazon S3)以及 Apache Flink 支持的各种其他数据源兼容。</p>\n<h3><a id=\"_7\"></a><strong>准备:</strong></h3>\n<ul>\n<li>Kinesis Data Stream</li>\n<li>Cloud9</li>\n</ul>\n<h3><a id=\"_Kinesis_Data_Stream_11\"></a><strong>创建 Kinesis Data Stream</strong></h3>\n<p>进入 Cloud9, 新建一个 terminal,并执行下列 cli 创建一个名为 teststream 的消息队列:</p>\n<pre><code class=\"lang-\">$ aws kinesis create-stream \\\n--stream-name teststream \\\n--shard-count 1 \\\n--region ap-northeast-1\n</code></pre>\n<h3><a id=\"_Kinesis_Data_Analytics_Studio_notebook_19\"></a><strong>创建一个 Kinesis Data Analytics Studio notebook</strong></h3>\n<p>您可以通过以下步骤开始与数据流交互:</p>\n<ul>\n<li>打开 Amazon 管理控制台并导航至 Amazon Kinesis/Data Analytics/Streaming applications</li>\n<li>选择主页上的 Studio 选项卡,然后选择 Create Studio Notebook。</li>\n<li>选择 Create with custom settings, 输入 Studio 笔记本的名称,并让 Kinesis Data Analytics Studio 为此创建 Amazon IAM 角色。</li>\n<li>选择一个 Amazon Glue 数据库来存储 Kinesis Data Analytics Studio 使用的源和目标周围的元数据。</li>\n<li>选择创建 Studio notebook。</li>\n</ul>\n<p>创建应用程序后,选择 Run 以启动 Apache Flink 应用程序。这将需要几分钟的时间来完成,此后可以点击 Open in Apache Zeppelin 打开。</p>\n<p><img src=\"https://dev-media.amazoncloud.cn/05d4009a20df49bd99f3fc8459b97c8f_image.png\" alt=\"image.png\" /></p>\n<h3><a id=\"_Cloud9__32\"></a><strong>在 Cloud9 中创建样本数据</strong></h3>\n<p>在 cloud9 中创建 ticker.py 文件,并复制如下代码到文件内并保存</p>\n<pre><code class=\"lang-\">import datetime\nimport json\nimport random\nimport boto3\nimport time\n\nSTREAM_NAME = "teststream"\nprice = 100\n\ndef get_data():\n global price \n price = price + (random.random()*2-1)*10\n price = 0 if price < 0 else price\n return {\n #'EVENT_TIME': datetime.datetime.now().isoformat(),\n 'ticker': random.choice(['BTC','ETH','BSC','SOL']),\n 'price': price,\n 'event_time': datetime.datetime.now().isoformat()\n }\n\ndef generate(stream_name, kinesis_client):\n while True:\n data = get_data() \n print(data)\n time.sleep(1)\n kinesis_client.put_record(\n StreamName=stream_name,\n Data=json.dumps(data),\n PartitionKey="partitionkey")\n\nif __name__ == '__main__':\n generate(STREAM_NAME, boto3.client('kinesis'))\n</code></pre>\n<p>运行代码,程序会将模拟的数据发送到 Kinesis 中。这里注意,由于 Cloud9 使用临时凭证,所以有可能会出现 token 过期的问题,重新运行即可。</p>\n<h3><a id=\"_Studio__69\"></a><strong>编写 Studio 代码实现自定义聚合</strong></h3>\n<p>首先我们创建一个 source table,用于定义数据的 schema 以及 watermark:</p>\n<pre><code class=\"lang-\">%flink.ssql(type=update)\n\ncreate table stock_from_flink (\n ticker varchar(6),\n price double,\n event_time TIMESTAMP(3),\n WATERMARK for event_time AS event_time - INTERVAL '5' SECOND\n)\nPARTITIONED BY (ticker)\nWITH(\n 'connector' = 'kinesis',\n 'stream' = 'teststream',\n 'aws.region' = 'ap-northeast-1',\n 'scan.stream.initpos' = 'LATEST',\n 'format' = 'json',\n 'json.timestamp-format.standard' = 'ISO-8601'\n)\n</code></pre>\n<p>点击右上角的执行按钮,然后添加新的 paragraph 并拷贝自定义聚合函数,这里我们定义计算窗口内的蜡烛图四元组,最大值,最小值,初始值和最新值:</p>\n<pre><code class=\"lang-\">%flink.pyflink\n\nclass CountAndSumAggregateFunction(AggregateFunction):\n\n def get_value(self, accumulator):\n return Row(accumulator[0], accumulator[1], accumulator[2], accumulator[3])\n\n def create_accumulator(self):\n return Row(-1, 0,-1,0)\n\n def accumulate(self, accumulator, row: Row):\n accumulator[0] = min(accumulator[0],row[1]) if accumulator[0] > 0 else row[1]\n accumulator[1] = max(accumulator[1],row[1])\n accumulator[2] = accumulator[2] if accumulator[2] > 0 else row[1]\n accumulator[3] = row[1]\n\n def retract(self, accumulator, row: Row):\n pass\n\n def merge(self, accumulator, accumulators):\n pass\n\n def get_accumulator_type(self):\n return DataTypes.ROW(\n [DataTypes.FIELD("minp", DataTypes.DOUBLE()),\n DataTypes.FIELD("maxp", DataTypes.DOUBLE()),\n DataTypes.FIELD("initialp", DataTypes.DOUBLE()),\n DataTypes.FIELD("lastedp", DataTypes.DOUBLE())])\n\n def get_result_type(self):\n return DataTypes.ROW(\n [DataTypes.FIELD("minp", DataTypes.DOUBLE()),\n DataTypes.FIELD("maxp", DataTypes.DOUBLE()),\n DataTypes.FIELD("initialp", DataTypes.DOUBLE()),\n DataTypes.FIELD("lastedp", DataTypes.DOUBLE())])\n\nfunction = CountAndSumAggregateFunction()\nagg = udaf(function,\n result_type=function.get_result_type(),\n accumulator_type=function.get_accumulator_type(),\n name=str(function.__class__.__name__))\n</code></pre>\n<p>创建新的 paragraph ,粘贴主程序,调用自定义聚合函数,我们定义一个10秒钟的窗口方便数据观察:</p>\n<pre><code class=\"lang-\">%flink.pyflink\n\ninput_table = st_env.from_path("stock_from_flink")\nnew_table3 = input_table.window(Tumble.over("10.seconds").on("event_time").alias("ten_seconds_window")) \\\n .group_by("ten_seconds_window, ticker") \\\n .aggregate(agg.alias("minp","maxp","initialp","lastedp")) \\\n .select(" ticker as ticker, minp as min_price, maxp as max_price, initialp as initial_price, lastedp as latest_price, ten_seconds_window.end as epoch_time")\n</code></pre>\n<p>最后一个 paragraph 我们用来展示我们聚合后的结果:</p>\n<pre><code class=\"lang-\">%flink.pyflink\n\nz.show(new_table3, stream_type="update")\n</code></pre>\n<p>点击执行后,我们会很快接收到数据并生成窗口:</p>\n<p><img src=\"https://dev-media.amazoncloud.cn/779ecf20a4f14d77bb3030a052ad2942_image.png\" alt=\"image.png\" /></p>\n<p>我们还可以通过不同的可视化图形来观察数据</p>\n<p><img src=\"https://dev-media.amazoncloud.cn/5c17466d837c4446a6e397f0231326bb_image.png\" alt=\"image.png\" /></p>\n<p>同时,同传统的应用一样,Studio 提供了 Apache Flink Dashboard, 方便查询程序运行时的状况。</p>\n<p><img src=\"https://dev-media.amazoncloud.cn/133e43dc28c54bc9a69496aa1570c9da_image.png\" alt=\"image.png\" /></p>\n<h3><a id=\"_162\"></a><strong>总结:</strong></h3>\n<p>Kinesis Data Analytics Studio 使 Apache Flink 应用程序开发的速度大大加快。此外,所有这些都是通过丰富的可视化、可扩展且用户友好的界面来实现的,并能协作开发,灵活选择语言,使得流式工作负载性能强大。用户可以按照本文所述段落,或者选择将他们升级为具有持久状态的针对 Apache Flink 的 Kinesis Data Analytics 应用程序。</p>\n<h3><a id=\"_165\"></a><strong>参考文档:</strong></h3>\n<p><a href=\"https://zeppelin.apache.org/docs/0.9.0/interpreter/flink.html\" target=\"_blank\">https://zeppelin.apache.org/docs/0.9.0/interpreter/flink.html</a></p>\n<p><a href=\"https://docs.aws.amazon.com/kinesisanalytics/latest/java/how-notebook.html\" target=\"_blank\">https://docs.aws.amazon.com/kinesisanalytics/latest/java/how-notebook.html</a></p>\n<h3><a id=\"_170\"></a>本篇作者</h3>\n<p><img src=\"https://dev-media.amazoncloud.cn/e90f62df62f64e7390734e6cfc7cddd4_image.png\" alt=\"image.png\" /></p>\n<p><strong>孙标</strong><br />\n亚马逊云科技资深解决方案架构师。拥有多年金融,移动互联网研发及数字货币交易所架构经验。</p>\n"}