Metadata is an integral part of data management and governance. The [AWS Glue](https://aws.amazon.com/glue) Data Catalog provides a uniform repository to store and share metadata. The main purpose of the Data Catalog is to provide a central metadata store where disparate systems can store, discover, and use that metadata to query and process the data.

Another important aspect of data governance is serving and managing the relationship between data stores and external clients, which are the producers and consumers of data. As the data evolves, especially in streaming use cases, we need a central framework that provides a contract between producers and consumers to enable schema evolution and improved governance. The [AWS Glue Schema Registry](https://docs.aws.amazon.com/glue/latest/dg/schema-registry.html) provides a centralized framework to help manage and enforce schemas on data streaming applications using convenient integrations with Apache Kafka and [Amazon Managed Streaming for Apache Kafka](https://aws.amazon.com/msk/) (Amazon MSK), [Amazon Kinesis Data Streams](https://aws.amazon.com/kinesis/data-streams/), Apache Flink and [Amazon Kinesis Data Analytics for Apache Flink](https://aws.amazon.com/kinesis/data-analytics/), and [AWS Lambda](https://aws.amazon.com/lambda/).

In this post, we demonstrate how to integrate Schema Registry with the Data Catalog to enable efficient schema enforcement in streaming analytics use cases.

### **Stream analytics on AWS**

There are many scenarios where customers want to run stream analytics on AWS while managing schema evolution effectively. The end-to-end stream analytics life cycle involves many different applications for data production, processing, analytics, routing, and consumption, and it can be hard to manage changes across all of them. Adding or removing a data field across different stream analytics applications can lead to data quality issues or downstream application failures if the change is not managed appropriately.

For example, a large grocery store may want to send order information to its backend systems using Kinesis Data Streams. While sending the order information, it may want to run some transformations or analytics on the data. The orders may be routed to different targets depending on the order type, and the stream may be integrated with many backend applications that expect order data in a specific format. However, the order schema can change for many reasons, such as new business requirements, technical changes, or source system upgrades.

These changes are inevitable, but customers want a mechanism to manage them effectively while running their stream analytics workloads.
To support stream analytics use cases on AWS and enforce schemas and governance, you can use the AWS Glue Schema Registry along with the AWS streaming analytics services.

You can use [Amazon Kinesis Data Firehose](https://aws.amazon.com/kinesis/data-firehose/) [data transformation](https://docs.aws.amazon.com/firehose/latest/dev/data-transformation.html) to ingest data from Kinesis Data Streams, run a simple data transformation on a batch of records via a Lambda function, and deliver the transformed records to destinations such as [Amazon Simple Storage Service](http://aws.amazon.com/s3) (Amazon S3), [Amazon Redshift](http://aws.amazon.com/redshift), [Amazon OpenSearch Service](https://aws.amazon.com/opensearch-service/), Splunk, Datadog, New Relic, Dynatrace, Sumo Logic, LogicMonitor, MongoDB, and HTTP endpoints. The Lambda function transforms the current batch of records with no information or state from previous batches.

Lambda also provides a stream analytics capability for [Amazon Kinesis Data Streams](https://aws.amazon.com/kinesis/data-streams/) and [Amazon DynamoDB](https://aws.amazon.com/dynamodb/) streams, which enables data aggregation and state management across multiple function invocations. This capability uses a tumbling window, which is a fixed-size, non-overlapping time interval of up to 15 minutes. When you apply a tumbling window to a stream, records in the stream are grouped by window and sent to the processing Lambda function. The function returns a state value that is passed to the next invocation for the same window.

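To make the tumbling window behavior concrete, the following is a minimal Python sketch of a Lambda handler for a Kinesis Data Streams event source configured with a tumbling window. The event fields (`Records`, `state`, `isFinalInvokeForWindow`) follow the Lambda tumbling window event structure; the aggregation itself (a simple record count) is an illustrative assumption, not part of the original walkthrough.

```
import base64
import json

def lambda_handler(event, context):
    # State handed over from the previous invocation in this window (empty on the first one).
    state = event.get("state") or {}
    count = state.get("record_count", 0)

    # Aggregate the batch of records delivered in this invocation.
    for record in event.get("Records", []):
        payload = json.loads(base64.b64decode(record["kinesis"]["data"]))
        count += 1  # a real aggregation could sum fields of the payload instead

    if event.get("isFinalInvokeForWindow"):
        # The window has closed: emit the aggregate result.
        print(f"Window aggregate: {count} records")
        return {"state": {}}

    # Return the updated state so the next invocation in this window continues the aggregation.
    return {"state": {"record_count": count}}
```
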
Kinesis Data Analytics provides SQL-based stream analytics against streaming data. The service also enables you to use an Apache Flink application to process stream data. Data can be ingested from Kinesis Data Streams and Kinesis Data Firehose, and the supported destinations are Kinesis Data Firehose (delivering to Amazon S3, Amazon Redshift, Amazon OpenSearch Service, and Splunk), Lambda, and Kinesis Data Streams.

Finally, you can use the [AWS Glue](https://aws.amazon.com/glue/) [streaming extract, transform, and load](https://docs.aws.amazon.com/glue/latest/dg/add-job-streaming.html) (ETL) capability as a serverless method to consume data from Kinesis Data Streams and Apache Kafka or Amazon MSK. The job aggregates, transforms, and enriches the data using Spark Structured Streaming, then continuously loads the results into Amazon S3-based data lakes, data warehouses, DynamoDB, JDBC targets, and more.

Managing stream metadata and schema evolution is becoming more important for stream analytics use cases. To enable this on AWS, the Data Catalog and Schema Registry allow you to centrally control and discover schemas. Before the [release of schema referencing in the Data Catalog](https://aws.amazon.com/about-aws/whats-new/2021/12/aws-glue-streaming-etl-jobs-schema-registry/), you had to manage schema evolution separately in the Data Catalog and in Schema Registry, which usually led to inconsistencies between the two. With the new Data Catalog and Schema Registry integration, you can now reference schemas stored in the schema registry when creating or updating AWS Glue tables in the Data Catalog. This helps avoid inconsistencies between the schema registry and the Data Catalog and enables end-to-end data quality enforcement.

In this post, we walk you through a streaming ETL example in AWS Glue to showcase how this integration can help. The example includes reading streaming data from Kinesis Data Streams, discovering the schema with Schema Registry, using the Data Catalog to store the metadata, and writing the results to Amazon S3 as a sink.

### **Solution overview**

The following high-level architecture diagram shows the components used to integrate Schema Registry and the Data Catalog to run streaming ETL jobs. In this architecture, Schema Registry helps centrally track and evolve Kinesis Data Streams schemas.

At a high level, we use the [Amazon Kinesis Data Generator](https://aws.amazon.com/blogs/big-data/test-your-streaming-data-solution-with-the-new-amazon-kinesis-data-generator/) (KDG) to stream data to a Kinesis data stream, use AWS Glue to run streaming ETL, and use [Amazon Athena](http://aws.amazon.com/athena) to query the data.

![image.png](https://dev-media.amazoncloud.cn/dc68ee4bdfd84136b01154bdce38f5f0_image.png)

In the following sections, we walk you through the steps to build this architecture.

### **Create a Kinesis data stream**

To set up a Kinesis data stream, complete the following steps:

1. On the Kinesis console, choose **Data streams**.
2. Choose **Create data stream**.
3. Give the stream a name, such as ```ventilator_gsr_stream```.
4. Complete the stream creation.

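If you prefer to create the stream with the AWS SDK instead of the console, the following is a minimal boto3 sketch. The Region and shard count are assumptions for this walkthrough; only the stream name comes from the steps above.

```
import boto3

kinesis = boto3.client("kinesis", region_name="us-east-1")  # assumed Region

# Create the data stream used in this walkthrough; one shard is enough for the sample traffic.
kinesis.create_stream(StreamName="ventilator_gsr_stream", ShardCount=1)

# Wait until the stream becomes ACTIVE before sending data to it.
kinesis.get_waiter("stream_exists").wait(StreamName="ventilator_gsr_stream")
```
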
### **Configure Kinesis Data Generator to generate sample data**

You can use the KDG with the ventilator template available in the [GitHub repo](https://github.com/aws-samples/aws-glue-streaming-etl-blog) to generate sample data. The following screenshot shows the template on the KDG console.

![image.png](https://dev-media.amazoncloud.cn/472622f282894627a145f5453d92d94f_image.png)

### **Add a new AWS Glue schema registry**

To add a new schema registry, complete the following steps:

1. On the AWS Glue console, under **Data catalog** in the navigation pane, choose **Schema registries**.
2. Choose **Add registry**.
3. For **Registry name**, enter a name (for example, ```MyDemoSchemaReg```).
4. For **Description**, enter an optional description for the registry.
5. Choose **Add registry**.

### **Add a schema to the schema registry**

To add a new schema, complete the following steps:

1. On the AWS Glue console, under **Schema registries** in the navigation pane, choose **Schemas**.
2. Choose **Add schema**.
3. Provide the schema name (```ventilatorstream_schema_gsr```) and attach the schema to the registry defined in the previous step.
4. AWS Glue schemas currently support Avro or JSON formats; for this post, select **JSON**.
5. Use the default **Compatibility mode** and provide any tags required by your tagging strategy.

Compatibility modes allow you to control how schemas can or cannot evolve over time. These modes form the contract between the applications producing and consuming data. When a new version of a schema is submitted to the registry, the compatibility rule applied to the schema name is used to determine whether the new version can be accepted. For more information on the different compatibility modes, refer to [Schema Versioning and Compatibility](https://docs.aws.amazon.com/glue/latest/dg/schema-registry.html#schema-registry-compatibility).

6. Enter the following sample JSON:

```
{
  "$id": "https://example.com/person.schema.json",
  "$schema": "http://json-schema.org/draft-07/schema#",
  "title": "Ventilator",
  "type": "object",
  "properties": {
    "ventilatorid": {
      "type": "integer",
      "description": "Ventilator ID"
    },
    "eventtime": {
      "type": "string",
      "description": "Time of the event."
    },
    "serialnumber": {
      "description": "Serial number of the device.",
      "type": "string",
      "minimum": 0
    },
    "pressurecontrol": {
      "description": "Pressure control of the device.",
      "type": "integer",
      "minimum": 0
    },
    "o2stats": {
      "description": "O2 status.",
      "type": "integer",
      "minimum": 0
    },
    "minutevolume": {
      "description": "Minute volume.",
      "type": "integer",
      "minimum": 0
    },
    "manufacturer": {
      "description": "Manufacturer of the device.",
      "type": "string",
      "minimum": 0
    }
  }
}
```

7. Choose **Create schema and version**.

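The registry and schema can also be created with the AWS SDK. The following boto3 sketch assumes the sample JSON above has been saved locally as `ventilator_schema.json`; the registry and schema names match the console steps, and the compatibility mode is set explicitly to `BACKWARD`, the default used in this walkthrough.

```
import boto3

glue = boto3.client("glue", region_name="us-east-1")  # assumed Region

# Create the registry that will hold the ventilator schema.
glue.create_registry(
    RegistryName="MyDemoSchemaReg",
    Description="Registry for the ventilator streaming demo",
)

# Read the JSON schema definition (assumed to be saved locally) and register its first version.
with open("ventilator_schema.json") as f:
    schema_definition = f.read()

glue.create_schema(
    RegistryId={"RegistryName": "MyDemoSchemaReg"},
    SchemaName="ventilatorstream_schema_gsr",
    DataFormat="JSON",
    Compatibility="BACKWARD",
    SchemaDefinition=schema_definition,
)
```

Later schema changes can be submitted with `register_schema_version`, and the registry accepts or rejects each new version based on the compatibility mode configured for the schema.
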
### **Create a new Data Catalog table**

To add a new table in the Data Catalog, complete the following steps:

1. On the AWS Glue console, under **Data Catalog** in the navigation pane, choose **Tables**.
2. Choose **Add table**.
3. Select **Add tables from existing schema**.
4. Enter the table name and choose the database.
5. Select **Kinesis** as the source type and choose a data stream in your own account.
6. Choose the respective Region and choose the stream ```ventilator_gsr_stream```.
7. Choose the ```MyDemoSchemaReg``` registry created earlier and the schema (```ventilatorstream_schema_gsr```) with its respective version.

You should be able to preview the schema.

8. Choose **Next** and then choose **Finish** to create your table.

![image.png](https://dev-media.amazoncloud.cn/16344de6e97842108816a998eed3b2ac_image.png)

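For reference, a table that points at both the Kinesis stream and the registered schema can also be created through the Glue API. The following boto3 sketch is an outline only: the database name ```kinesislab``` and table name ```ventilator_gsr_new``` match the job script later in this post, but the stream ARN and the Kinesis table parameters are illustrative assumptions. The important part is ```SchemaReference```, which links the catalog table to a schema version in Schema Registry instead of duplicating the column definitions in the table itself.

```
import boto3

glue = boto3.client("glue", region_name="us-east-1")  # assumed Region

glue.create_table(
    DatabaseName="kinesislab",  # database referenced by the streaming job below
    TableInput={
        "Name": "ventilator_gsr_new",
        "TableType": "EXTERNAL_TABLE",
        "Parameters": {
            # Illustrative Kinesis source properties; adjust to your account, Region, and stream.
            "classification": "json",
            "typeOfData": "kinesis",
            "streamARN": "arn:aws:kinesis:us-east-1:111122223333:stream/ventilator_gsr_stream",
        },
        "StorageDescriptor": {
            "Location": "ventilator_gsr_stream",
            # Reference the schema version registered earlier instead of listing columns here.
            "SchemaReference": {
                "SchemaId": {
                    "RegistryName": "MyDemoSchemaReg",
                    "SchemaName": "ventilatorstream_schema_gsr",
                },
                "SchemaVersionNumber": 1,
            },
        },
    },
)
```
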
### **Create the AWS Glue job**

To create your AWS Glue job, complete the following steps:

1. On the AWS Glue Studio console, choose **Jobs** in the navigation pane.
2. Select **Visual with a source and target**.
3. Under **Source**, select **Amazon Kinesis**, and under **Target**, select **Amazon S3**.
4. Choose **Create**.
5. Choose **Data source**.
6. Configure the job properties such as the name, [AWS Identity and Access Management](http://aws.amazon.com/iam) (IAM) role, type, and AWS Glue version.

For the IAM role, specify a role that is used for authorization to the resources used to run the job and access data stores. Because streaming jobs connect to sources and sinks, make sure the IAM role has permissions to read from Kinesis Data Streams and write to Amazon S3.

7. For **This job runs**, select **A new script authored by you**.
8. Under **Advanced properties**, keep **Job bookmark** disabled.
9. For **Log filtering**, select **Standard filter** and **Spark UI**.
10. Under **Monitoring options**, enable **Job metrics** and **Continuous logging** with **Standard filter**.
11. Enable the Spark UI and provide the S3 bucket path to store the Spark event logs.
12. For **Job parameters**, enter the following key-value pairs:
- **--output_path** – The S3 path where the final aggregations are persisted
- **--aws_region** – The Region where you run the job
13. Leave **Connections** empty and choose **Save job and edit script**.
14. Use the following code for the AWS Glue job (update the values for ```database```, ```table_name```, and ```checkpointLocation```):

```
import sys
import datetime
import boto3
import base64
from pyspark.sql import DataFrame, Row
from pyspark.context import SparkContext
from pyspark.sql.types import *
from pyspark.sql.functions import *
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue import DynamicFrame

args = getResolvedOptions(sys.argv, ['JOB_NAME', 'aws_region', 'output_path'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

# S3 sink locations
aws_region = args['aws_region']
output_path = args['output_path']

s3_target = output_path + "ventilator_metrics"
checkpoint_location = output_path + "cp/"
temp_path = output_path + "temp/"


def processBatch(data_frame, batchId):
    # Partition the output by the ingestion date and hour of the current micro-batch
    now = datetime.datetime.now()
    year = now.year
    month = now.month
    day = now.day
    hour = now.hour
    minute = now.minute
    if data_frame.count() > 0:
        dynamic_frame = DynamicFrame.fromDF(data_frame, glueContext, "from_data_frame")
        apply_mapping = ApplyMapping.apply(
            frame=dynamic_frame,
            mappings=[
                ("ventilatorid", "long", "ventilatorid", "long"),
                ("eventtime", "string", "eventtime", "timestamp"),
                ("serialnumber", "string", "serialnumber", "string"),
                ("pressurecontrol", "long", "pressurecontrol", "long"),
                ("o2stats", "long", "o2stats", "long"),
                ("minutevolume", "long", "minutevolume", "long"),
                ("manufacturer", "string", "manufacturer", "string")],
            transformation_ctx="apply_mapping")

        dynamic_frame.printSchema()

        # Write to the S3 sink
        s3path = s3_target + "/ingest_year=" + "{:0>4}".format(str(year)) + "/ingest_month=" + "{:0>2}".format(str(month)) + "/ingest_day=" + "{:0>2}".format(str(day)) + "/ingest_hour=" + "{:0>2}".format(str(hour)) + "/"
        s3sink = glueContext.write_dynamic_frame.from_options(
            frame=apply_mapping,
            connection_type="s3",
            connection_options={"path": s3path},
            format="parquet",
            transformation_ctx="s3sink")


# Read from the Kinesis data stream through the Data Catalog table
sourceData = glueContext.create_data_frame.from_catalog(
    database="kinesislab",
    table_name="ventilator_gsr_new",
    transformation_ctx="datasource0",
    additional_options={"startingPosition": "TRIM_HORIZON", "inferSchema": "true"})

sourceData.printSchema()

glueContext.forEachBatch(
    frame=sourceData,
    batch_function=processBatch,
    options={"windowSize": "100 seconds", "checkpointLocation": "s3://<bucket name>/ventilator_gsr/checkpoint/"})
job.commit()
```

Our AWS Glue job is ready to read the data from the Kinesis data stream and send it to Amazon S3 in Parquet format.

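After the script is saved, you can start the job from the console or programmatically. The following boto3 sketch shows how the two job parameters defined earlier are passed at run time; the job name and the S3 path are placeholder assumptions for this walkthrough.

```
import boto3

glue = boto3.client("glue", region_name="us-east-1")  # assumed Region

# Start the streaming ETL job, passing the job parameters defined in the job configuration.
response = glue.start_job_run(
    JobName="ventilator-streaming-etl",  # assumed job name
    Arguments={
        "--output_path": "s3://<bucket name>/ventilator_gsr/",  # S3 prefix for the aggregations
        "--aws_region": "us-east-1",
    },
)
print("Started job run:", response["JobRunId"])
```
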
### **Query the data using Athena**

The processed streaming data is written to Amazon S3 in Parquet format. Run an [AWS Glue crawler](https://docs.aws.amazon.com/glue/latest/dg/console-crawlers.html) on the Amazon S3 location where the streaming data is written; the crawler updates the Data Catalog. You can then run queries in Athena to start deriving relevant insights from the data.

![image.png](https://dev-media.amazoncloud.cn/75e75fe220bd4bf5aa10d5b9379db55d_image.png)

### **Clean up**

It's always a good practice to clean up the resources created as part of this post to avoid undue cost. To clean up your resources, delete the AWS Glue database, tables, crawlers, jobs, service role, and S3 buckets.

Additionally, be sure to clean up all other AWS resources that you created using [AWS CloudFormation](http://aws.amazon.com/cloudformation). You can delete these resources on the AWS CloudFormation console by deleting the stack used for the Kinesis Data Generator.

### **Conclusion**

This post demonstrated the importance of centrally managing metadata and schema evolution in stream analytics use cases. It also described how the integration of the Data Catalog and Schema Registry can help you achieve this on AWS. We used a streaming ETL example in AWS Glue to showcase how this integration helps enforce end-to-end data quality.

To learn more and get started, check out the [AWS Glue Data Catalog](https://docs.aws.amazon.com/glue/latest/dg/populate-data-catalog.html) and [AWS Glue Schema Registry](https://docs.aws.amazon.com/glue/latest/dg/schema-registry.html).

#### **About the Authors**

![image.png](https://dev-media.amazoncloud.cn/02461eaafb804cf381b5fca1d007a2fc_image.png)

**Dr. Sam Mokhtari** is a Senior Solutions Architect at AWS. His main area of depth is data and analytics, and he has published more than 30 influential articles in this field. He is also a respected data and analytics advisor, and has led several large-scale implementation projects across different industries, including energy, health, telecom, and transport.

![image.png](https://dev-media.amazoncloud.cn/51482cc6d720455ebf0910c66ba10d35_image.png)

**Amar Surjit** is a Sr. Solutions Architect based in the UK who has been working in IT for over 20 years, designing and implementing global solutions for enterprise customers. He is passionate about streaming technologies and enjoys working with customers globally to design and build streaming architectures and drive value by analyzing their streaming data.