Implement a CDC-based UPSERT in a data lake using Apache Iceberg and AWS Glue

{"value":"As the implementation of data lakes and modern data architecture increases, customers’ expectations around its features also increase, which include ACID transaction, UPSERT, time travel, schema evolution, auto compaction, and many more. By default, [Amazon Simple Storage Service](https://aws.amazon.com/cn/s3/?trk=cndc-detail) ([Amazon S3](https://aws.amazon.com/cn/s3/?trk=cndc-detail)) objects are immutable, which means you can’t update records in your data lake because it supports append-only transactions. But there are use cases where you might be receiving incremental updates with change data capture (CDC) from your source systems, and you might need to update existing data in [Amazon S3](https://aws.amazon.com/cn/s3/?trk=cndc-detail) to have a golden copy. Previously, you had to overwrite the complete S3 object or folders, but with the evolution of frameworks such as [Apache Hudi](https://hudi.apache.org/), [Apache Iceberg](https://iceberg.apache.org/), [Delta Lake](https://delta.io/), and [governed tables](https://docs.aws.amazon.com/lake-formation/latest/dg/governed-tables.html) in [AWS Lake Formation](https://aws.amazon.com/lake-formation/), you can get database-like UPSERT features in [Amazon S3](https://aws.amazon.com/cn/s3/?trk=cndc-detail).\n\nApache Hudi integration is already supported with AWS analytics services, and recently [AWS Glue](https://aws.amazon.com/glue/), [Amazon EMR](https://aws.amazon.com/emr/), and [Amazon Athena](https://aws.amazon.com/athena/) announced support for Apache Iceberg. Apache Iceberg is an open table format originally developed at Netflix, which got open-sourced as an Apache project in 2018 and graduated from incubator mid-2020. It’s designed to support ACID transactions and UPSERT on petabyte-scale data lakes, and is getting popular because of its flexible SQL syntax for CDC-based MERGE, full schema evolution, and hidden partitioning features.\n\nIn this post, we walk you through a solution to implement CDC-based UPSERT or MERGE in an S3 data lake using Apache Iceberg and [AWS Glue](https://aws.amazon.com/cn/glue/?trk=cndc-detail).\n\n#### **Configure Apache Iceberg with [AWS Glue](https://aws.amazon.com/cn/glue/?trk=cndc-detail)**\nYou can integrate Apache Iceberg JARs into [AWS Glue](https://aws.amazon.com/cn/glue/?trk=cndc-detail) through its [AWS Marketplace connector](https://aws.amazon.com/marketplace/pp/prodview-iicxofvpqvsio). The connector supports [AWS Glue](https://aws.amazon.com/cn/glue/?trk=cndc-detail) versions 1.0, 2.0, and 3.0, and is free to use. Configuring this connector is as easy as clicking few buttons on the user interface.\n\nThe following steps guide you through the setup process:\n1. Navigate to the [AWS Marketplace connector page](https://aws.amazon.com/marketplace/pp/prodview-iicxofvpqvsio).\n2. 
#### **Solution overview**
Let's assume you have a relational database that holds product inventory data, and you want to move it into an S3 data lake on a continuous basis, so that your downstream applications or consumers can use it for analytics. After the initial data movement to Amazon S3, you receive incremental updates from the source database as CSV files using AWS DMS or equivalent tools, where each record has an additional column to represent an insert, update, or delete operation. While processing the incremental CDC data, one of the primary requirements is to merge the CDC data in the data lake and provide the capability to query previous versions of the data.

To solve this use case, we present the following simple architecture that integrates Amazon S3 for the data lake, AWS Glue with the Apache Iceberg connector for ETL (extract, transform, and load), and Athena for querying the data using standard SQL. Athena helps in querying the latest product inventory data from the Iceberg table's latest snapshot, and Iceberg's time travel feature helps in identifying a product's price at any previous date.

The following diagram illustrates the solution architecture.

![image.png](https://dev-media.amazoncloud.cn/de7d3b70de9847ae9b991739a51102f8_image.png)

The solution workflow consists of the following steps:
- Data ingestion:
    - Steps 1.1 and 1.2 use [AWS Database Migration Service (AWS DMS)](https://aws.amazon.com/dms/), which connects to the source database and moves incremental data (CDC) to Amazon S3 in CSV format.
    - Steps 1.3 and 1.4 consist of the AWS Glue PySpark job, which reads incremental data from the S3 input bucket, performs deduplication of the records, and then invokes Apache Iceberg's MERGE statement to merge the data into the Iceberg table in the target UPSERT S3 bucket.
- Data access:
    - Steps 2.1 and 2.2 represent Athena integration to query data from the Iceberg table using standard SQL and to validate the time travel feature of Iceberg.
- Data Catalog:
    - The AWS Glue Data Catalog is treated as a centralized catalog, which is used by AWS Glue and Athena. An AWS Glue crawler is integrated on top of the S3 buckets to automatically detect the schema.

We have referenced AWS DMS as part of the architecture, but while showcasing the solution steps, we assume that the AWS DMS output is already available in Amazon S3, and focus on processing the data using AWS Glue and Apache Iceberg.

To demo the implementation steps, we use sample product inventory data that has the following attributes:
- **op** – Represents the operation on the source record, with the value `I` for inserts, `U` for updates, and `D` for deletes. You need to make sure this attribute is included in your CDC incremental data before it gets written to Amazon S3. AWS DMS enables you to include this attribute, but if you're using other mechanisms to move data, make sure you capture this attribute, so that your ETL logic can take appropriate action while merging it.
- **product_id** – This is the primary key column in the source database's products table.
- **category** – This column represents the product's category, such as `Electronics` or `Cosmetics`.
- **product_name** – This is the name of the product.
- **quantity_available** – This is the quantity available in the inventory for a product. When we showcase the incremental data for UPSERT or MERGE, we reduce the quantity available for the product to demonstrate the functionality.
- **last_update_time** – This is the time when the product record was updated in the source database.

If you're using AWS DMS to move data from your relational database to Amazon S3, then by default AWS DMS includes the `op` attribute for incremental CDC data, but it's not included by default for the initial load. If you're using CSV as your target file format, you can set `IncludeOpForFullLoad` to `true` in the S3 target endpoint settings of AWS DMS to have the `op` attribute included in your initial full load file. To learn more about the Amazon S3 settings in AWS DMS, refer to [S3Settings](https://docs.aws.amazon.com/dms/latest/APIReference/API_S3Settings.html).
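For illustration, the following Boto3 sketch shows how an S3 target endpoint for AWS DMS could be created with `IncludeOpForFullLoad` enabled so that the initial full load file also carries the `op` column. The endpoint identifier, role ARN, and bucket values are placeholders, and this is only a sketch of the relevant `S3Settings` keys, not a complete AWS DMS setup.

```
import boto3

dms = boto3.client("dms", region_name="us-east-1")

# Placeholder identifiers; replace with your own role, bucket, and naming convention
response = dms.create_endpoint(
    EndpointIdentifier="products-s3-target",
    EndpointType="target",
    EngineName="s3",
    S3Settings={
        "ServiceAccessRoleArn": "arn:aws:iam::<account-id>:role/<dms-s3-access-role>",
        "BucketName": "glue-iceberg-demo",
        "BucketFolder": "raw-csv-input",
        "CsvDelimiter": ",",
        "IncludeOpForFullLoad": True,  # include the op column in the initial full load file
    },
)
print(response["Endpoint"]["EndpointArn"])
```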
#### **Prerequisites**
Before getting started on the implementation, make sure you have the required permissions to perform the following in your AWS account:
- Create [AWS Identity and Access Management](http://aws.amazon.com/iam) (IAM) roles as needed
- Read from and write to an S3 bucket
- Create and run AWS Glue crawlers and jobs
- Manage databases, tables, and workgroups, and run queries in Athena

To implement the solution, we create AWS resources such as an S3 bucket and an AWS Glue job, and integrate the Iceberg code for processing. Before we run the AWS Glue job, we have to upload the sample CSV files to the input bucket and process them with the AWS Glue PySpark code to produce the output.

For this post, we use the `us-east-1` Region, but you can implement the solution in your preferred Region if the AWS services included in the architecture are available there.

#### **Create an S3 bucket for input and output**
Now let's dive into the implementation steps. To create an S3 bucket, complete the following steps:
1. On the Amazon S3 console, choose **Buckets** in the navigation pane.
2. Choose **Create bucket**.
3. Specify the bucket name as `glue-iceberg-demo`, and leave the remaining fields as default. S3 bucket names are globally unique, so you may get an error saying the bucket name already exists. In that case, provide a unique name and use that same name throughout the rest of the implementation steps. Formatting the bucket name as `<Bucket-Name>-${AWS_ACCOUNT_ID}-${AWS_REGION_CODE}` can help you get a unique name.
4. Choose **Create bucket**.
5. On the bucket details page, choose **Create folder**.
6. Create two subfolders: `raw-csv-input` and `iceberg-output`.

![image.png](https://dev-media.amazoncloud.cn/72d71cde2f944395892c7e03bc6e57f4_image.png)

7. Upload the [LOAD00000001.csv](https://aws-blogs-artifacts-public.s3.amazonaws.com/artifacts/BDB-2164/LOAD00000001.csv) file into the `raw-csv-input` folder of the bucket.

![image.png](https://dev-media.amazoncloud.cn/8a728c6899834e82b6f9b426f57c3e35_image.png)

The following screenshot provides a sample of the input dataset.

![image.png](https://dev-media.amazoncloud.cn/1b97e5dd708a47968027d7003be0ef46_image.png)
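If you prefer to create the bucket and folders from code rather than the console, the following Boto3 sketch mirrors the steps above. It assumes the `us-east-1` Region, a locally downloaded copy of `LOAD00000001.csv`, and a bucket name made unique with your account ID and Region.

```
import boto3

region = "us-east-1"
account_id = boto3.client("sts").get_caller_identity()["Account"]
bucket_name = f"glue-iceberg-demo-{account_id}-{region}"  # globally unique bucket name

s3 = boto3.client("s3", region_name=region)

# us-east-1 needs no LocationConstraint; other Regions require CreateBucketConfiguration
s3.create_bucket(Bucket=bucket_name)

# S3 "folders" are just key prefixes; zero-byte objects make them visible in the console
for prefix in ("raw-csv-input/", "iceberg-output/"):
    s3.put_object(Bucket=bucket_name, Key=prefix)

# Upload the initial full load file downloaded from the link above
s3.upload_file("LOAD00000001.csv", bucket_name, "raw-csv-input/LOAD00000001.csv")
```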
#### **Create input and output tables using Athena**
To create the input and output tables in the AWS Glue Data Catalog, open the Athena console and run the following queries in sequence:

```
-- Create database for the demo
CREATE DATABASE iceberg_demo;

-- Create external table on the input CSV files. Replace the S3 path with your bucket name
CREATE EXTERNAL TABLE iceberg_demo.raw_csv_input(
  op string,
  product_id bigint,
  category string,
  product_name string,
  quantity_available bigint,
  last_update_time string)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
STORED AS INPUTFORMAT 'org.apache.hadoop.mapred.TextInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
LOCATION 's3://glue-iceberg-demo/raw-csv-input/'
TBLPROPERTIES (
  'areColumnsQuoted'='false',
  'classification'='csv',
  'columnsOrdered'='true',
  'compressionType'='none',
  'delimiter'=',',
  'typeOfData'='file');

-- Create output Iceberg table with partitioning. Replace the S3 bucket name with your bucket name
CREATE TABLE iceberg_demo.iceberg_output (
  product_id bigint,
  category string,
  product_name string,
  quantity_available bigint,
  last_update_time timestamp)
PARTITIONED BY (category, bucket(16, product_id))
LOCATION 's3://glue-iceberg-demo/iceberg-output/'
TBLPROPERTIES (
  'table_type'='ICEBERG',
  'format'='parquet',
  'write_target_data_file_size_bytes'='536870912'
);

-- Validate the input data
SELECT * FROM iceberg_demo.raw_csv_input;
```

Alternatively, you can run an AWS Glue crawler on top of the input data to create the input table.
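You can also submit these statements programmatically. The following Boto3 sketch runs each DDL statement as a separate Athena query; the query result location is a placeholder, and the table statements are assumed to be pasted in from the block above with your bucket name substituted.

```
import boto3

athena = boto3.client("athena", region_name="us-east-1")

ddl_statements = [
    "CREATE DATABASE iceberg_demo",
    # Paste the CREATE EXTERNAL TABLE and CREATE TABLE statements from above here,
    # with the S3 locations updated to your bucket name.
]

for statement in ddl_statements:
    response = athena.start_query_execution(
        QueryString=statement,
        ResultConfiguration={
            # Placeholder query result location; any S3 prefix you own works
            "OutputLocation": "s3://<your-athena-query-results-bucket>/iceberg-demo/"
        },
        WorkGroup="primary",
    )
    print(response["QueryExecutionId"])
```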
Next, let's create the AWS Glue PySpark job to process the input data.

#### **Create the AWS Glue job**
Complete the following steps to create an AWS Glue job:
1. On the AWS Glue console, choose **Jobs** in the navigation pane.
2. Choose **Create job**.
3. Select **Spark script editor**.
4. For **Options**, select **Create a new script with boilerplate code**.
5. Choose **Create**.

![image.png](https://dev-media.amazoncloud.cn/af0e000f6abf4e73acaf94a7ef736f56_image.png)

6. Replace the script with the following script:
```
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

from pyspark.sql.functions import *
from awsglue.dynamicframe import DynamicFrame

from pyspark.sql.window import Window
from pyspark.sql.functions import rank, max

from pyspark.conf import SparkConf

args = getResolvedOptions(sys.argv, ['JOB_NAME', 'iceberg_job_catalog_warehouse'])
conf = SparkConf()

## Please make sure to pass the runtime argument --iceberg_job_catalog_warehouse with the S3 warehouse path as its value
conf.set("spark.sql.catalog.job_catalog.warehouse", args['iceberg_job_catalog_warehouse'])
conf.set("spark.sql.catalog.job_catalog", "org.apache.iceberg.spark.SparkCatalog")
conf.set("spark.sql.catalog.job_catalog.catalog-impl", "org.apache.iceberg.aws.glue.GlueCatalog")
conf.set("spark.sql.catalog.job_catalog.io-impl", "org.apache.iceberg.aws.s3.S3FileIO")
conf.set("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")
conf.set("spark.sql.iceberg.handle-timestamp-without-timezone", "true")

sc = SparkContext(conf=conf)
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args["JOB_NAME"], args)

## Read the input table
IncrementalInputDyF = glueContext.create_dynamic_frame.from_catalog(database="iceberg_demo", table_name="raw_csv_input", transformation_ctx="IncrementalInputDyF")
IncrementalInputDF = IncrementalInputDyF.toDF()

if not IncrementalInputDF.rdd.isEmpty():
    ## Apply deduplication logic on the input data to pick up the latest record per key, based on timestamp and operation
    IDWindowDF = Window.partitionBy(IncrementalInputDF.product_id).orderBy(IncrementalInputDF.last_update_time).rangeBetween(-sys.maxsize, sys.maxsize)

    # Add a column that captures the latest timestamp per product_id
    inputDFWithTS = IncrementalInputDF.withColumn("max_op_date", max(IncrementalInputDF.last_update_time).over(IDWindowDF))

    # Keep the latest insert records and the latest update/delete records, then union both to get the deduplicated output
    NewInsertsDF = inputDFWithTS.filter("last_update_time=max_op_date").filter("op='I'")
    UpdateDeleteDf = inputDFWithTS.filter("last_update_time=max_op_date").filter("op IN ('U','D')")
    finalInputDF = NewInsertsDF.unionAll(UpdateDeleteDf)

    # Register the deduplicated input as a temporary table to use in Iceberg Spark SQL statements
    finalInputDF.createOrReplaceTempView("incremental_input_data")
    finalInputDF.show()

    ## Perform the merge operation on the incremental input data with MERGE INTO. This section uses Spark SQL to showcase Iceberg's expressive SQL approach to MERGE
    IcebergMergeOutputDF = spark.sql("""
    MERGE INTO job_catalog.iceberg_demo.iceberg_output t
    USING (SELECT op, product_id, category, product_name, quantity_available, to_timestamp(last_update_time) AS last_update_time FROM incremental_input_data) s
    ON t.product_id = s.product_id
    WHEN MATCHED AND s.op = 'D' THEN DELETE
    WHEN MATCHED THEN UPDATE SET t.quantity_available = s.quantity_available, t.last_update_time = s.last_update_time
    WHEN NOT MATCHED THEN INSERT (product_id, category, product_name, quantity_available, last_update_time) VALUES (s.product_id, s.category, s.product_name, s.quantity_available, s.last_update_time)
    """)

    job.commit()
```
7. On the **Job details** tab, specify the job name.
8. For **IAM Role**, assign an IAM role that has the required permissions to run an AWS Glue job and to read from and write to the S3 bucket.
9. For **Glue version**, choose **Glue 3.0**.
10. For **Language**, choose **Python 3**.

![image.png](https://dev-media.amazoncloud.cn/674d9e83b7ce44a4a354cc235e657c61_image.png)

11. Make sure **Job bookmark** has the default value of **Enable**.
12. Under **Connections**, choose the Iceberg connector.

![image.png](https://dev-media.amazoncloud.cn/a7908d4e0ebd416e964b4dd891b08c35_image.png)

13. Under **Job parameters**, specify **Key** as `--iceberg_job_catalog_warehouse` and **Value** as your S3 warehouse path (for example, `s3://<bucket-name>/<iceberg-warehouse-path>`).

![image.png](https://dev-media.amazoncloud.cn/652d87776d764e9a8307f3a8fdafe102_image.png)

14. Choose **Save** and then **Run**, which should write the input data to the Iceberg table with a MERGE statement.

Because the target table is empty in the first run, the Iceberg MERGE statement runs an INSERT statement for all records.
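If you manage jobs as code, the following Boto3 sketch creates an equivalent job definition with the Glue 3.0 runtime, the Iceberg connector connection, job bookmarks, and the `--iceberg_job_catalog_warehouse` parameter. The job name, IAM role, script location, and connection name are placeholders based on the setup above.

```
import boto3

glue = boto3.client("glue", region_name="us-east-1")

response = glue.create_job(
    Name="iceberg-cdc-upsert-job",  # placeholder job name
    Role="GlueIcebergDemoRole",     # IAM role with AWS Glue and S3 permissions
    GlueVersion="3.0",
    Command={
        "Name": "glueetl",
        "ScriptLocation": "s3://<your-script-bucket>/scripts/iceberg-cdc-upsert-job.py",
        "PythonVersion": "3",
    },
    # Name of the connection created when you activated the Iceberg connector
    Connections={"Connections": ["Iceberg Connector for Glue 3.0"]},
    DefaultArguments={
        "--job-bookmark-option": "job-bookmark-enable",
        "--iceberg_job_catalog_warehouse": "s3://<bucket-name>/<iceberg-warehouse-path>/",
    },
    WorkerType="G.1X",
    NumberOfWorkers=2,
)
print(response["Name"])
```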
#### **Query the Iceberg table using Athena**
After you have successfully run the AWS Glue job, you can validate the output in Athena with the following SQL query:
```
SELECT * FROM iceberg_demo.iceberg_output LIMIT 10;
```
The output of the query should match the input, with one difference: the Iceberg output table doesn't have the `op` column.

![image.png](https://dev-media.amazoncloud.cn/6e243e72807d463e81b67e55269a7580_image.png)

#### **Upload incremental (CDC) data for further processing**
After we process the initial full load file, let's upload the following two incremental files, which include insert, update, and delete records for a few products.

The following is a snapshot of the first incremental file ([20220302-1134010000.csv](https://aws-blogs-artifacts-public.s3.amazonaws.com/artifacts/BDB-2164/20220302-1134010000.csv)).

![image.png](https://dev-media.amazoncloud.cn/6e531c63241e47d7942fd4d64b37f2d4_image.png)

The following is a snapshot of the second incremental file ([20220302-1135010000.csv](https://aws-blogs-artifacts-public.s3.amazonaws.com/artifacts/BDB-2164/20220302-1135010000.csv)), which shows that record 102 has another update transaction before the next ETL job run.

![image.png](https://dev-media.amazoncloud.cn/7f3ef3c81fe0438ba10caff8ce2544d7_image.png)

After you upload both incremental files, you should see them in the S3 bucket.

![image.png](https://dev-media.amazoncloud.cn/ed93f55892c04b60bce00b772487fa02_image.png)

#### **Run the AWS Glue job again to process incremental files**
Because we enabled bookmarks on the AWS Glue job, the next run picks up only the two new incremental files and performs a merge operation on the Iceberg table.

To run the job again, complete the following steps:
- On the AWS Glue console, choose **Jobs** in the navigation pane.
- Select the job and choose **Run**.

As explained earlier, the PySpark script deduplicates the input data before merging it into the target Iceberg table, which means it only picks up the latest record for product 102.

For this post, we run the job manually, but you can configure your AWS Glue jobs to run as part of an [AWS Glue workflow](https://docs.aws.amazon.com/glue/latest/dg/workflows_overview.html) or via [AWS Step Functions](http://aws.amazon.com/step-functions) (for more information, see [Manage AWS Glue Jobs with Step Functions](https://docs.aws.amazon.com/step-functions/latest/dg/connect-glue.html)).

#### **Query the Iceberg table using Athena, after incremental data processing**
After incremental data processing is complete, you can run the same SELECT statement again and validate that the quantity value is updated for record 102 and that product record 103 is deleted.

The following screenshot shows the output.

![image.png](https://dev-media.amazoncloud.cn/eaf7d4517a5c4424a1c897b99fba3bcb_image.png)

#### **Query the previous version of data with Iceberg's time travel feature**
You can run the following SQL query in Athena, which uses Iceberg's FOR SYSTEM_TIME AS OF clause to query a previous version of the data:
```
SELECT * FROM iceberg_demo.iceberg_output FOR SYSTEM_TIME AS OF TIMESTAMP '2022-03-23 18:56:00';
```
The following screenshot shows the output. As you can see, the quantity value of product ID 102 is 30, which was the value during the initial load.

![image.png](https://dev-media.amazoncloud.cn/64628ed14cd049bcb013720af6a55d56_image.png)

Note that you have to change the AS OF TIMESTAMP value based on your runtime.
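Time travel is also available from Spark inside the AWS Glue job. The following sketch assumes the `spark` session and `job_catalog` catalog configured in the PySpark script earlier; Iceberg's Spark reads accept either a millisecond timestamp or a snapshot ID.

```
# Read the table as it existed at a point in time (milliseconds since epoch).
# 1648061760000 corresponds to 2022-03-23 18:56:00 UTC; adjust it to your runtime.
previous_df = (
    spark.read.format("iceberg")
    .option("as-of-timestamp", "1648061760000")
    .load("job_catalog.iceberg_demo.iceberg_output")
)
previous_df.show()

# Alternatively, pin the read to a specific snapshot ID from the table history
# previous_df = (
#     spark.read.format("iceberg")
#     .option("snapshot-id", 1234567890123456789)  # hypothetical snapshot ID
#     .load("job_catalog.iceberg_demo.iceberg_output")
# )
```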
This concludes the implementation steps.

#### **Considerations**
The following are a few considerations you should keep in mind while integrating Apache Iceberg with AWS Glue:

- Athena support for Iceberg became generally available recently, so make sure you review the [considerations and limitations](https://docs.aws.amazon.com/athena/latest/ug/querying-iceberg.html) of using this feature.
- AWS Glue provides DynamicFrame APIs to read from different source systems and write to different targets. For this post, we used Spark DataFrames instead of AWS Glue DynamicFrames because Iceberg's MERGE statements aren't supported with the AWS Glue DynamicFrame APIs. To learn more about the AWS integration, refer to [Iceberg AWS Integrations](https://iceberg.apache.org/docs/latest/aws/).

#### **Conclusion**
This post explains how you can use the Apache Iceberg framework with AWS Glue to implement UPSERT on an S3 data lake. It provides an overview of Apache Iceberg, its features and integration approaches, and explains how you can implement it through a step-by-step guide.

I hope this gives you a great starting point for using Apache Iceberg with AWS analytics services and that you can build on top of it to implement your solution.

#### **Appendix: AWS Glue DynamicFrame sample code to interact with Iceberg tables**
- The following code sample demonstrates how you can use the DynamicFrame API to read from an Iceberg table:

```
IcebergDyF = (
    glueContext.create_dynamic_frame.from_options(
        connection_type="marketplace.spark",
        connection_options={
            "path": "job_catalog.iceberg_demo.iceberg_output",
            "connectionName": "Iceberg Connector for Glue 3.0",
        },
        transformation_ctx="IcebergDyF",
    )
)

## Optionally, convert to a Spark DataFrame if you plan to use Iceberg's SQL-based MERGE statements
InputIcebergDF = IcebergDyF.toDF()
```

- The following sample code shows how you can use the DynamicFrame API to write to an Iceberg table in append-only mode:

```
## Use the following two lines to convert a Spark DataFrame to a DynamicFrame, if you plan to use the DynamicFrame API to write to the final target
from awsglue.dynamicframe import DynamicFrame
finalDyF = DynamicFrame.fromDF(InputIcebergDF, glueContext, "finalDyF")

WriteIceberg = glueContext.write_dynamic_frame.from_options(
    frame=finalDyF,
    connection_type="marketplace.spark",
    connection_options={
        "path": "job_catalog.iceberg_demo.iceberg_output",
        "connectionName": "Iceberg Connector for Glue 3.0",
    },
    format="parquet",
    transformation_ctx="WriteIcebergDyF",
)
```

**About the Author**

![image.png](https://dev-media.amazoncloud.cn/d4462a5643334b6db3bd39559ee60b37_image.png)

**Sakti Mishra** is a Principal Data Lab Solution Architect at AWS, where he helps customers modernize their data architecture and define their end-to-end data strategy, including data security, accessibility, governance, and more. He is also the author of the book [Simplify Big Data Analytics with Amazon EMR](https://www.amazon.com/Simplify-Big-Data-Analytics-Amazon-ebook/dp/B09RZYSZPL/). Outside of work, Sakti enjoys learning new technologies, watching movies, and visiting places with family.