Build an Apache Iceberg data lake using Amazon Athena, Amazon EMR, and Amazon Glue

海外精选
海外精选的内容汇集了全球优质的亚马逊云科技相关技术内容。同时,内容中提到的“AWS” 是 “Amazon Web Services” 的缩写,在此网站不作为商标展示。
0
0
{"value":"Most businesses store their critical data in a data lake, where you can bring data from various sources to a centralized storage. The data is processed by specialized big data compute engines, such as [Amazon Athena](https://aws.amazon.com/athena) for interactive queries, [Amazon EMR](https://aws.amazon.com/emr/) for [Apache Spark](https://spark.apache.org/) applications, [Amazon SageMaker](https://aws.amazon.com/pm/sagemaker) for machine learning, and [Amazon QuickSight](https://aws.amazon.com/quicksight/) for data visualization.\n\n[Apache Iceberg](https://iceberg.apache.org/) is an open-source table format for data stored in data lakes. It is optimized for data access patterns in [Amazon Simple Storage Service](https://aws.amazon.com/s3/) ([Amazon S3](https://aws.amazon.com/cn/s3/?trk=cndc-detail)) cloud object storage. Iceberg helps data engineers tackle complex challenges in data lakes such as managing continuously evolving datasets while maintaining query performance. Iceberg allows you to do the following:\n\n- Maintain transactional consistency where files can be added, removed, or modified atomically with full read isolation and multiple concurrent writes\n- Implement full schema evolution to process safe table schema updates as the table data evolves\n- Organize tables into flexible partition layouts with partition evolution, enabling updates to partition schemes as queries and data volume changes without relying on physical directories\n- Perform row-level update and delete operations to satisfy new regulatory requirements such as the General Data Protection Regulation (GDPR)\n- Provide versioned tables and support time travel queries to query historical data and verify changes between updates\n- Roll back tables to prior versions to return tables to a known good state in case of any issues\n\nIn 2021, AWS teams contributed the [Apache Iceberg integration with the AWS Glue Data Catalog](https://iceberg.apache.org/docs/latest/aws/) to open source, which enables you to use open-source compute engines like [Apache Spark](https://iceberg.apache.org/docs/latest/getting-started/) with Iceberg on [AWS Glue](https://aws.amazon.com/glue). In 2022, [Amazon Athena announced support of Iceberg](https://aws.amazon.com/about-aws/whats-new/2022/04/amazon-athena-acid-transactions-powered-apache-iceberg/) and [Amazon EMR added support of Iceberg starting with version 6.5.0](https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-iceberg.html).\n\nIn this post, we show you how to use [Amazon EMR](https://aws.amazon.com/cn/emr/?trk=cndc-detail) Spark to create an Iceberg table, load sample books review data, and use Athena to query, perform schema evolution, row-level update and delete, and time travel, all coordinated through the [AWS Glue](https://aws.amazon.com/cn/glue/?trk=cndc-detail) Data Catalog.\n\n#### **Solution overview**\n\n\nWe use the [Amazon Customer Reviews public dataset](https://s3.amazonaws.com/amazon-reviews-pds/readme.html) as our source data. The dataset contains data files in [Apache Parquet](https://parquet.apache.org/) format on [Amazon S3](https://aws.amazon.com/cn/s3/?trk=cndc-detail). We load all the book-related Amazon review data as an Iceberg table to demonstrate the advantages of using the Iceberg table format on top of raw Parquet files. The following diagram illustrates our solution architecture.\n\n![image.png](1)\n\nTo set up and test this solution, we complete the following high-level steps:\n\n1. Create an S3 bucket.\n2. Create an EMR cluster.\n3. Create an EMR notebook.\n4. Configure a Spark session.\n5. Load data into the Iceberg table.\n6. Query the data in Athena.\n7. Perform a row-level update in Athena.\n8. Perform a schema evolution in Athena.\n9. Perform time travel in Athena.\n10. Consume Iceberg data across [Amazon EMR](https://aws.amazon.com/cn/emr/?trk=cndc-detail) and Athena.\n\n#### **Prerequisites**\n\nTo follow along with this walkthrough, you must have the following:\n\n- An [AWS Account](https://portal.aws.amazon.com/billing/signup#/start) with a role that has sufficient access to provision the required resources.\n\n\n#### **Create an S3 bucket**\n\n\nTo create an S3 bucket that holds your Iceberg data, complete the following steps:\n\n1. On the [Amazon S3](https://aws.amazon.com/cn/s3/?trk=cndc-detail) console, choose **Buckets** in the navigation pane.\n2. Choose **Create bucket**.\n3. For **Bucket name**, enter a name (for this post, we enter ```aws-lake-house-iceberg-blog-demo```).Because S3 bucket names are globally unique, choose a different name when you create your bucket.\n4. For **AWS Region**, choose your preferred Region (for this post, we use ```us-east-1```).\n\n![image.png](2)\n\n5. Complete the remaining steps to create your bucket.\n6. If this is the first time that you’re using Athena to run queries, create another globally unique S3 bucket to hold your Athena query output.\n\n\n#### **Create an EMR cluster**\n\n\nNow we’re ready to start an EMR cluster to run Iceberg jobs using Spark.\n\n1. On the [Amazon EMR](https://aws.amazon.com/cn/emr/?trk=cndc-detail) console, choose **Create cluster**.\n2. Choose **Advanced options**.\n3. For **Software Configuration**, choose your [Amazon EMR](https://aws.amazon.com/cn/emr/?trk=cndc-detail) release version.Iceberg requires release 6.5.0 and above.\n4. Select **JupyterEnterpriseGateway** and **Spark** as the software to install.\n5. For **Edit software settings**, select **Enter configuration** and enter ```[{\\"classification\\":\\"iceberg-defaults\\",\\"properties\\":{\\"iceberg.enabled\\":true}}]```.\n6. Leave other settings at their default and choose **Next**.\n\n![image.png](3)\n\n7. You can change the hardware used by the [Amazon EMR](https://aws.amazon.com/cn/emr/?trk=cndc-detail) cluster in this step. In this demo, we use the default setting.\n8. Choose **Next**.\n9. For **Cluster name**, enter ```Iceberg Spark Cluster```.\n10. Leave the remaining settings unchanged and choose **Next**.\n\n![image.png](4)\n\n\n11. You can configure security settings such as [adding an EC2 key pair](https://docs.aws.amazon.com/emr/latest/ManagementGuide/emr-plan-access-ssh.html) to access your EMR cluster locally. In this demo, we use the default setting.\n12. Choose **Create cluster**.\n\nYou’re redirected to the cluster detail page, where you wait for the EMR cluster to transition from ```Starting``` to ```Waiting```.\n\n\n#### **Create an EMR notebook**\n\n\nWhen the cluster is active and in the ```Waiting``` state, we’re ready to run Spark programs in the cluster. For this demo, we use an [EMR notebook](https://docs.aws.amazon.com/emr/latest/ManagementGuide/emr-managed-notebooks.html) to run Spark commands.\n\n1. On the [Amazon EMR](https://aws.amazon.com/cn/emr/?trk=cndc-detail) console, choose **Notebooks** in the navigation pane.\n2. Choose **Create notebook**.\n3. For **Notebook name**, enter a name (for this post, we enter iceberg-spark-notebook).\n4. For **Cluster**, select **Choose an existing cluster** and choose **Iceberg Spark Cluster**.\n5. For **AWS service role**, choose **Create a new role** to create ```EMR_Notebook_DefaultRole``` or choose a different role to access resources in the notebook.\n6. Choose **Create notebook**.\n\n![image.png](5)\n\nYou’re redirected to the notebook detail page.\n\n7. Choose Open in JupyterLab next to your notebook.\n8. Choose to create a new notebook.\n9. Under Notebook, choose Spark.\n\n![image.png](6)\n\n#### **Configure a Spark session**\n\n\nIn your notebook, run the following code:\n\n```\\n%%configure -f\\n{\\n \\"conf\\": {\\n \\"spark.sql.catalog.demo\\": \\"org.apache.iceberg.spark.SparkCatalog\\",\\n \\"spark.sql.catalog.demo.catalog-impl\\": \\"org.apache.iceberg.aws.glue.GlueCatalog\\",\\n \\"spark.sql.catalog.demo.warehouse\\": \\"s3://<your-iceberg-blog-demo-bucket>\\",\\n \\"spark.sql.extensions\\":\\"org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions\\"\\n }\\n}\\n```\n\nThis sets the following Spark session configurations:\n\n- **spark.sql.catalog.demo** – Registers a Spark catalog named ```demo```, which uses the Iceberg Spark catalog plugin\n- **spark.sql.catalog.demo.catalog-impl** – The ```demo``` Spark catalog uses [AWS Glue](https://aws.amazon.com/cn/glue/?trk=cndc-detail) as the physical catalog to store Iceberg database and table information\n- **spark.sql.catalog.demo.warehous**e – The ```demo``` Spark catalog stores all Iceberg metadata and data files under the root path ```s3://<your-iceberg-blog-demo-bucket>```\n- **spark.sql.extensions** – Adds support to Iceberg Spark SQL extensions, which allows you to run Iceberg Spark procedures and some Iceberg-only SQL commands (you use this in a later step)\n\n#### **Load data into the Iceberg table**\n\nIn our Spark session, run the following commands to load data:\n\n```\\n// create a database in AWS Glue named reviews if not exist\\nspark.sql(\\"CREATE DATABASE IF NOT EXISTS demo.reviews\\")\\n\\n// load reviews related to books\\nval book_reviews_location = \\"s3://amazon-reviews-pds/parquet/product_category=Books/*.parquet\\"\\nval book_reviews = spark.read.parquet(book_reviews_location)\\n\\n// write book reviews data to an Iceberg v2 table\\nbook_reviews.writeTo(\\"demo.reviews.book_reviews\\").tableProperty(\\"format-version\\", \\"2\\").createOrReplace()\\n\\n```\n\nIceberg format v2 is needed to support row-level updates and deletes. See [Format Versioning](https://iceberg.apache.org/spec/#format-versioning) for more details.\n\nIt may take up to 15 minutes for the commands to complete. When it’s complete, you should be able to see the table on the [AWS Glue](https://aws.amazon.com/cn/glue/?trk=cndc-detail) console, under the ``reviews`` database, with the ```table_type``` property shown as ```ICEBERG```.\\n\\n![image.png](7)\\n\\nThe table schema is inferred from the source Parquet data files. You can also create the table with a specific schema before loading data using [Spark SQL](https://iceberg.apache.org/docs/latest/spark-ddl/#create-table), [Athena SQL](https://docs.aws.amazon.com/athena/latest/ug/querying-iceberg-creating-tables.html), or Iceberg [Java and Python SDKs](https://iceberg.apache.org/docs/latest/java-api-quickstart/).\\n\\n#### **Query in Athena**\\n\\nNavigate to the Athena console and choose **Query editor**. If this is your first time using the Athena query editor, you need to [configure](https://docs.aws.amazon.com/athena/latest/ug/querying.html#query-results-specify-location-console) to use the S3 bucket you created earlier to store the query results.\\n\\nThe table ```book_reviews``` is available for querying. Run the following query:\\n\\n```\\nSELECT * FROM reviews.book_reviews LIMIT 5;\\n```\\n\\nThe following screenshot shows the first five records from the table being displayed.\\n\\n![image.png](8)\\n\\n#### **Perform a row-level update in Athena**\\n\\n\\nIn the next few steps, let’s focus on a record in the table with review ID ```RZDVOUQG1GBG7```. Currently, it has no total votes when we run the following query:\\n\\n```\\nSELECT total_votes FROM reviews.book_reviews \\nWHERE review_id = 'RZDVOUQG1GBG7'\\n```\\n\\n![image.png](9)\\n\\nLet’s update the total_votes value to 2 using the following query:\\n\\n```\\nUPDATE reviews.book_reviews\\nSET total_votes = 2\\nWHERE review_id = 'RZDVOUQG1GBG7'\\n```\\n\\n![image.png](10)\\n\\nAfter your update command runs successfully, run the below query and note the updated result showing a total of two votes:\\n\\n```\\nSELECT total_votes FROM reviews.book_reviews\\nWHERE review_id = 'RZDVOUQG1GBG7'\\n```\\n\\nAthena enforces ACID transaction guarantee for all the write operations against an Iceberg table. This is done through the Iceberg format’s optimistic locking specification. When concurrent attempts are made to update the same record, a commit conflict occurs. In this scenario, Athena displays a transaction conflict error, as shown in the following screenshot.\\n\\n![image.png](11)\\n\\nDelete queries work in a similar way; see [DELETE](https://docs.aws.amazon.com/athena/latest/ug/querying-iceberg-updating-iceberg-table-data.html#querying-iceberg-delete) for more details.\\n\\n\\n#### **Perform a schema evolution in Athena**\\n\\n\\nSuppose the review suddenly goes viral and gets 10 billion votes:\\n\\n```\\nUPDATE reviews.book_reviews\\nSET total_votes = 10000000000\\nWHERE review_id = 'RZDVOUQG1GBG7'\\n```\\n\\nBased on the AWS Glue table information, the ```total_votes``` is an integer column. If you try to update a value of 10 billion, which is greater than the maximum allowed integer value, you get an error reporting a type mismatch.\\n\\n![image.png](12)\\n\\nIceberg supports most schema evolution features as metadata-only operations, which don’t require a table rewrite. This includes add, drop, rename, reorder column, and promote column types. To solve this issue, you can change the integer column ```total_votes``` to a ```BIGINT``` type by running the following ```DDL```:\\n\\n```\\nALTER TABLE reviews.book_reviews\\nCHANGE COLUMN total_votes total_votes BIGINT;\\n```\\n\\nYou can now update the value successfully:\\n\\n```\\nUPDATE reviews.book_reviews\\nSET total_votes = 10000000000\\nWHERE review_id = 'RZDVOUQG1GBG7'\\n```\\n\\nQuerying the record now gives us the expected result in ```BIGINT```:\\n\\n```\\nSELECT total_votes FROM reviews.book_reviews\\nWHERE review_id = 'RZDVOUQG1GBG7'\\n```\\n\\n#### **Perform time travel in Athena**\\n\\n\\nIn Iceberg, the transaction history is retained, and each transaction commit creates a new version. You can perform time travel to look at a historical version of a table. In Athena, you can use the following syntax to travel to a time that is after when the first version was committed:\\n\\n```\\nSELECT total_votes FROM reviews.book_reviews\\nFOR SYSTEM_TIME AS OF localtimestamp + interval '-20' minute\\nWHERE review_id = 'RZDVOUQG1GBG7'\\n```\\n\\n![image.png](13)\\n\\n#### **Consume Iceberg data across Amazon EMR and Athena**\\n\\nOne of the most important features of a data lake is for different systems to seamlessly work together through the Iceberg open-source protocol. After all the operations are performed in Athena, let’s go back to Amazon EMR and confirm that Amazon EMR Spark can consume the updated data.\\n\\nFirst, run the same Spark SQL and see if you get the same result for the review used in the example:\\n\\n```\\nval select_votes = \\"\\"\\"SELECT total_votes FROM demo.reviews.book_reviews\\nWHERE review_id = 'RZDVOUQG1GBG7'\\"\\"\\"\\n\\nspark.sql(select_votes).show()\\n```\\n\\nSpark shows 10 billion total votes for the review.\\n\\n\\n![image.png](14)\\n\\nCheck the transaction history of the operation in Athena through Spark Iceberg’s history system table:\\n\\n```\\nval select_history = \\"SELECT * FROM demo.reviews.book_reviews.history\\"\\n\\nspark.sql(select_history).show()\\n```\\n\\nThis shows three transactions corresponding to the two updates you ran in Athena.\\n\\n![image.png](15)\\n\\nIceberg offers a variety of Spark procedures to optimize the table. For example, you can run an [expire_snapshot](https://iceberg.apache.org/docs/latest/spark-procedures/#expire_snapshots) sprocedure to remove old snapshots, and free up storage space in Amazon S3:\\n\\n```\\nimport java.util.Calendar\\nimport java.text.SimpleDateFormat\\n\\nval now = Calendar.getInstance().getTime()\\nval form = new SimpleDateFormat(\\"yyyy-MM-dd HH:mm:ss\\")\\nval now_formatted = form.format(now.getTime())\\nval procedure = s\\"\\"\\"CALL demo.system.expire_snapshots(\\n table => 'reviews.book_reviews',\\n older_than => TIMESTAMP '\$now_formatted',\\n retain_last => 1)\\"\\"\\"\\n\\nspark.sql(procedure)\\n```\\n\\nNote that, after running this procedure, time travel can no longer be performed against expired snapshots.\\n\\nExamine the history system table again and notice that it shows you only the most recent snapshot.\\n\\nRunning the following query in Athena results in an error “No table snapshot found before timestamp…” as older snapshots were deleted, and you are no longer able to time travel to the older snapshot:\\n\\n```\\nSELECT total_votes FROM reviews.book_reviews\\nFOR SYSTEM_TIME AS OF localtimestamp + interval '-20' minute\\nWHERE review_id = 'RZDVOUQG1GBG7'\\n```\\n\\n#### **Clean up**\\n\\nTo avoid incurring ongoing costs, complete the following steps to clean up your resources:\\n\\n1. Run the following code in your notebook to drop the AWS Glue table and database:\\n\\n```\\n// DROP the table \\nspark.sql(\\"DROP TABLE demo.reviews.book_reviews\\") \\n// DROP the database \\nspark.sql(\\"DROP DATABASE demo.reviews\\")\\n```\\n\\n2. On the Amazon EMR console, choose **Notebooks** in the navigation pane.\\n3. Select the notebook ```iceberg-spark-notebook``` and choose **Delete**.\\n4. Choose **Clusters** in the navigation pane.\\n5. Select the cluster ```Iceberg Spark Cluster``` and choose **Terminate**.\n6. [Delete the S3 buckets](https://docs.aws.amazon.com/AmazonS3/latest/userguide/delete-bucket.html) and any other resources that you created as part of the prerequisites for this post.\n\n#### **Conclusion**\n\nIn this post, we showed you an example of using [Amazon S3](https://aws.amazon.com/cn/s3/?trk=cndc-detail), [AWS Glue](https://aws.amazon.com/cn/glue/?trk=cndc-detail), [Amazon EMR](https://aws.amazon.com/cn/emr/?trk=cndc-detail), and Athena to build an Iceberg data lake on AWS. An Iceberg table can seamlessly work across two popular compute engines, and you can take advantage of both to design your customized data production and consumption use cases.\n\nWith [AWS Glue](https://aws.amazon.com/cn/glue/?trk=cndc-detail), [Amazon EMR](https://aws.amazon.com/cn/emr/?trk=cndc-detail), and Athena, you can already use many features through AWS integrations, such as [SageMaker Athena integration](https://aws.amazon.com/blogs/machine-learning/run-sql-queries-from-your-sagemaker-notebooks-using-amazon-athena/) for machine learning, or [QuickSight Athena integration](https://aws.amazon.com/blogs/big-data/use-amazon-athena-and-amazon-quicksight-in-a-cross-account-environment/) for dashboard and reporting. [AWS Glue](https://aws.amazon.com/cn/glue/?trk=cndc-detail) also offers the [Iceberg connector](https://aws.amazon.com/marketplace/pp/prodview-iicxofvpqvsio), which you can use to author and run Iceberg data pipelines.\n\nIn addition, Iceberg supports [a variety of other open-source compute engines](https://iceberg.apache.org/multi-engine-support/) that you can choose from. For example, you can use [Apache Flink](https://iceberg.apache.org/docs/latest/flink/) on [Amazon EMR](https://aws.amazon.com/cn/emr/?trk=cndc-detail) for streaming and change data capture (CDC) use cases. The strong transaction guarantee and efficient row-level update, delete, time travel, and schema evolution experience offered by Iceberg offers a sound foundation and infinite possibilities for users to unlock the power of big data.\n\n#### **About the Authors**\n\n![image.png](16)\n\n**Kishore Dhamodaran** is a Senior Solutions Architect at AWS. Kishore helps strategic customers with their cloud enterprise strategy and migration journey, leveraging his years of industry and cloud experience.\n\n![image.png](17)\n\n**Jack Ye** is a software engineer of the Athena Data Lake and Storage team. He is an Apache Iceberg Committer and PMC member.\n\n![image.png](18)\n\n**Mohit Mehta** is a Principal Architect at AWS with expertise in AI/ML and data analytics. He holds 12 AWS certifications and is passionate about helping customers implement cloud enterprise strategies for digital transformation. In his free time, he trains for marathons and plans hikes across major peaks around the world.\n\n![image.png](19)\n\n**Giovanni Matteo Fumarola** is the Engineering Manager of the Athena Data Lake and Storage team. He is an Apache Hadoop Committer and PMC member. He has been focusing in the big data analytics space since 2013.\n\n![image.png](20)\n\n**Jared Keating** is a Senior Cloud Consultant with AWS Professional Services. Jared assists customers with their cloud infrastructure, compliance, and automation requirements, drawing from his 20+ years of IT experience.\n\n","render":"<p>Most businesses store their critical data in a data lake, where you can bring data from various sources to a centralized storage. The data is processed by specialized big data compute engines, such as <a href=\\"https://aws.amazon.com/athena\\" target=\\"_blank\\">Amazon Athena</a> for interactive queries, <a href=\\"https://aws.amazon.com/emr/\\" target=\\"_blank\\">Amazon EMR</a> for <a href=\\"https://spark.apache.org/\\" target=\\"_blank\\">Apache Spark</a> applications, <a href=\\"https://aws.amazon.com/pm/sagemaker\\" target=\\"_blank\\">Amazon SageMaker</a> for machine learning, and <a href=\\"https://aws.amazon.com/quicksight/\\" target=\\"_blank\\">Amazon QuickSight</a> for data visualization.</p>\\n<p><a href=\\"https://iceberg.apache.org/\\" target=\\"_blank\\">Apache Iceberg</a> is an open-source table format for data stored in data lakes. It is optimized for data access patterns in <a href=\\"https://aws.amazon.com/s3/\\" target=\\"_blank\\">Amazon Simple Storage Service</a> ([Amazon S3](https://aws.amazon.com/cn/s3/?trk=cndc-detail)) cloud object storage. Iceberg helps data engineers tackle complex challenges in data lakes such as managing continuously evolving datasets while maintaining query performance. Iceberg allows you to do the following:</p>\\n<ul>\\n<li>Maintain transactional consistency where files can be added, removed, or modified atomically with full read isolation and multiple concurrent writes</li>\n<li>Implement full schema evolution to process safe table schema updates as the table data evolves</li>\n<li>Organize tables into flexible partition layouts with partition evolution, enabling updates to partition schemes as queries and data volume changes without relying on physical directories</li>\n<li>Perform row-level update and delete operations to satisfy new regulatory requirements such as the General Data Protection Regulation (GDPR)</li>\n<li>Provide versioned tables and support time travel queries to query historical data and verify changes between updates</li>\n<li>Roll back tables to prior versions to return tables to a known good state in case of any issues</li>\n</ul>\\n<p>In 2021, AWS teams contributed the <a href=\\"https://iceberg.apache.org/docs/latest/aws/\\" target=\\"_blank\\">Apache Iceberg integration with the AWS Glue Data Catalog</a> to open source, which enables you to use open-source compute engines like <a href=\\"https://iceberg.apache.org/docs/latest/getting-started/\\" target=\\"_blank\\">Apache Spark</a> with Iceberg on <a href=\\"https://aws.amazon.com/glue\\" target=\\"_blank\\">AWS Glue</a>. In 2022, <a href=\\"https://aws.amazon.com/about-aws/whats-new/2022/04/amazon-athena-acid-transactions-powered-apache-iceberg/\\" target=\\"_blank\\">Amazon Athena announced support of Iceberg</a> and <a href=\\"https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-iceberg.html\\" target=\\"_blank\\">Amazon EMR added support of Iceberg starting with version 6.5.0</a>.</p>\\n<p>In this post, we show you how to use Amazon EMR Spark to create an Iceberg table, load sample books review data, and use Athena to query, perform schema evolution, row-level update and delete, and time travel, all coordinated through the AWS Glue Data Catalog.</p>\n<h4><a id=\\"Solution_overview_15\\"></a><strong>Solution overview</strong></h4>\\n<p>We use the <a href=\\"https://s3.amazonaws.com/amazon-reviews-pds/readme.html\\" target=\\"_blank\\">Amazon Customer Reviews public dataset</a> as our source data. The dataset contains data files in <a href=\\"https://parquet.apache.org/\\" target=\\"_blank\\">Apache Parquet</a> format on [Amazon S3](https://aws.amazon.com/cn/s3/?trk=cndc-detail). We load all the book-related Amazon review data as an Iceberg table to demonstrate the advantages of using the Iceberg table format on top of raw Parquet files. The following diagram illustrates our solution architecture.</p>\\n<p><img src=\\"1\\" alt=\\"image.png\\" /></p>\n<p>To set up and test this solution, we complete the following high-level steps:</p>\n<ol>\\n<li>Create an S3 bucket.</li>\n<li>Create an EMR cluster.</li>\n<li>Create an EMR notebook.</li>\n<li>Configure a Spark session.</li>\n<li>Load data into the Iceberg table.</li>\n<li>Query the data in Athena.</li>\n<li>Perform a row-level update in Athena.</li>\n<li>Perform a schema evolution in Athena.</li>\n<li>Perform time travel in Athena.</li>\n<li>Consume Iceberg data across Amazon EMR and Athena.</li>\n</ol>\\n<h4><a id=\\"Prerequisites_35\\"></a><strong>Prerequisites</strong></h4>\\n<p>To follow along with this walkthrough, you must have the following:</p>\n<ul>\\n<li>An <a href=\\"https://portal.aws.amazon.com/billing/signup#/start\\" target=\\"_blank\\">AWS Account</a> with a role that has sufficient access to provision the required resources.</li>\\n</ul>\n<h4><a id=\\"Create_an_S3_bucket_42\\"></a><strong>Create an S3 bucket</strong></h4>\\n<p>To create an S3 bucket that holds your Iceberg data, complete the following steps:</p>\n<ol>\\n<li>On the Amazon S3 console, choose <strong>Buckets</strong> in the navigation pane.</li>\\n<li>Choose <strong>Create bucket</strong>.</li>\\n<li>For <strong>Bucket name</strong>, enter a name (for this post, we enter <code>aws-lake-house-iceberg-blog-demo</code>).Because S3 bucket names are globally unique, choose a different name when you create your bucket.</li>\\n<li>For <strong>AWS Region</strong>, choose your preferred Region (for this post, we use <code>us-east-1</code>).</li>\\n</ol>\n<p><img src=\\"2\\" alt=\\"image.png\\" /></p>\n<ol start=\\"5\\">\\n<li>Complete the remaining steps to create your bucket.</li>\n<li>If this is the first time that you’re using Athena to run queries, create another globally unique S3 bucket to hold your Athena query output.</li>\n</ol>\\n<h4><a id=\\"Create_an_EMR_cluster_58\\"></a><strong>Create an EMR cluster</strong></h4>\\n<p>Now we’re ready to start an EMR cluster to run Iceberg jobs using Spark.</p>\n<ol>\\n<li>On the Amazon EMR console, choose <strong>Create cluster</strong>.</li>\\n<li>Choose <strong>Advanced options</strong>.</li>\\n<li>For <strong>Software Configuration</strong>, choose your [Amazon EMR](https://aws.amazon.com/cn/emr/?trk=cndc-detail) release version.Iceberg requires release 6.5.0 and above.</li>\\n<li>Select <strong>JupyterEnterpriseGateway</strong> and <strong>Spark</strong> as the software to install.</li>\\n<li>For <strong>Edit software settings</strong>, select <strong>Enter configuration</strong> and enter <code>[{&quot;classification&quot;:&quot;iceberg-defaults&quot;,&quot;properties&quot;:{&quot;iceberg.enabled&quot;:true}}]</code>.</li>\\n<li>Leave other settings at their default and choose <strong>Next</strong>.</li>\\n</ol>\n<p><img src=\\"3\\" alt=\\"image.png\\" /></p>\n<ol start=\\"7\\">\\n<li>You can change the hardware used by the Amazon EMR cluster in this step. In this demo, we use the default setting.</li>\n<li>Choose <strong>Next</strong>.</li>\\n<li>For <strong>Cluster name</strong>, enter <code>Iceberg Spark Cluster</code>.</li>\\n<li>Leave the remaining settings unchanged and choose <strong>Next</strong>.</li>\\n</ol>\n<p><img src=\\"4\\" alt=\\"image.png\\" /></p>\n<ol start=\\"11\\">\\n<li>You can configure security settings such as <a href=\\"https://docs.aws.amazon.com/emr/latest/ManagementGuide/emr-plan-access-ssh.html\\" target=\\"_blank\\">adding an EC2 key pair</a> to access your EMR cluster locally. In this demo, we use the default setting.</li>\\n<li>Choose <strong>Create cluster</strong>.</li>\\n</ol>\n<p>You’re redirected to the cluster detail page, where you wait for the EMR cluster to transition from <code>Starting</code> to <code>Waiting</code>.</p>\\n<h4><a id=\\"Create_an_EMR_notebook_86\\"></a><strong>Create an EMR notebook</strong></h4>\\n<p>When the cluster is active and in the <code>Waiting</code> state, we’re ready to run Spark programs in the cluster. For this demo, we use an <a href=\\"https://docs.aws.amazon.com/emr/latest/ManagementGuide/emr-managed-notebooks.html\\" target=\\"_blank\\">EMR notebook</a> to run Spark commands.</p>\\n<ol>\\n<li>On the Amazon EMR console, choose <strong>Notebooks</strong> in the navigation pane.</li>\\n<li>Choose <strong>Create notebook</strong>.</li>\\n<li>For <strong>Notebook name</strong>, enter a name (for this post, we enter iceberg-spark-notebook).</li>\\n<li>For <strong>Cluster</strong>, select <strong>Choose an existing cluster</strong> and choose <strong>Iceberg Spark Cluster</strong>.</li>\\n<li>For <strong>AWS service role</strong>, choose <strong>Create a new role</strong> to create <code>EMR_Notebook_DefaultRole</code> or choose a different role to access resources in the notebook.</li>\\n<li>Choose <strong>Create notebook</strong>.</li>\\n</ol>\n<p><img src=\\"5\\" alt=\\"image.png\\" /></p>\n<p>You’re redirected to the notebook detail page.</p>\n<ol start=\\"7\\">\\n<li>Choose Open in JupyterLab next to your notebook.</li>\n<li>Choose to create a new notebook.</li>\n<li>Under Notebook, choose Spark.</li>\n</ol>\\n<p><img src=\\"6\\" alt=\\"image.png\\" /></p>\n<h4><a id=\\"Configure_a_Spark_session_108\\"></a><strong>Configure a Spark session</strong></h4>\\n<p>In your notebook, run the following code:</p>\n<pre><code class=\\"lang-\\">%%configure -f\\n{\\n &quot;conf&quot;: {\\n &quot;spark.sql.catalog.demo&quot;: &quot;org.apache.iceberg.spark.SparkCatalog&quot;,\\n &quot;spark.sql.catalog.demo.catalog-impl&quot;: &quot;org.apache.iceberg.aws.glue.GlueCatalog&quot;,\\n &quot;spark.sql.catalog.demo.warehouse&quot;: &quot;s3://&lt;your-iceberg-blog-demo-bucket&gt;&quot;,\\n &quot;spark.sql.extensions&quot;:&quot;org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions&quot;\\n }\\n}\\n</code></pre>\\n<p>This sets the following Spark session configurations:</p>\n<ul>\\n<li><strong>spark.sql.catalog.demo</strong> – Registers a Spark catalog named <code>demo</code>, which uses the Iceberg Spark catalog plugin</li>\\n<li><strong>spark.sql.catalog.demo.catalog-impl</strong> – The <code>demo</code> Spark catalog uses [AWS Glue](https://aws.amazon.com/cn/glue/?trk=cndc-detail) as the physical catalog to store Iceberg database and table information</li>\\n<li><strong>spark.sql.catalog.demo.warehous</strong>e – The <code>demo</code> Spark catalog stores all Iceberg metadata and data files under the root path <code>s3://&lt;your-iceberg-blog-demo-bucket&gt;</code></li>\\n<li><strong>spark.sql.extensions</strong> – Adds support to Iceberg Spark SQL extensions, which allows you to run Iceberg Spark procedures and some Iceberg-only SQL commands (you use this in a later step)</li>\\n</ul>\n<h4><a id=\\"Load_data_into_the_Iceberg_table_132\\"></a><strong>Load data into the Iceberg table</strong></h4>\\n<p>In our Spark session, run the following commands to load data:</p>\n<pre><code class=\\"lang-\\">// create a database in AWS Glue named reviews if not exist\\nspark.sql(&quot;CREATE DATABASE IF NOT EXISTS demo.reviews&quot;)\\n\\n// load reviews related to books\\nval book_reviews_location = &quot;s3://amazon-reviews-pds/parquet/product_category=Books/*.parquet&quot;\\nval book_reviews = spark.read.parquet(book_reviews_location)\\n\\n// write book reviews data to an Iceberg v2 table\\nbook_reviews.writeTo(&quot;demo.reviews.book_reviews&quot;).tableProperty(&quot;format-version&quot;, &quot;2&quot;).createOrReplace()\\n\\n</code></pre>\\n<p>Iceberg format v2 is needed to support row-level updates and deletes. See <a href=\\"https://iceberg.apache.org/spec/#format-versioning\\" target=\\"_blank\\">Format Versioning</a> for more details.</p>\\n<p>It may take up to 15 minutes for the commands to complete. When it’s complete, you should be able to see the table on the AWS Glue console, under the <code>reviews</code> database, with the <code>table_type</code> property shown as <code>ICEBERG</code>.</p>\\n<p><img src=\\"7\\" alt=\\"image.png\\" /></p>\n<p>The table schema is inferred from the source Parquet data files. You can also create the table with a specific schema before loading data using <a href=\\"https://iceberg.apache.org/docs/latest/spark-ddl/#create-table\\" target=\\"_blank\\">Spark SQL</a>, <a href=\\"https://docs.aws.amazon.com/athena/latest/ug/querying-iceberg-creating-tables.html\\" target=\\"_blank\\">Athena SQL</a>, or Iceberg <a href=\\"https://iceberg.apache.org/docs/latest/java-api-quickstart/\\" target=\\"_blank\\">Java and Python SDKs</a>.</p>\\n<h4><a id=\\"Query_in_Athena_157\\"></a><strong>Query in Athena</strong></h4>\\n<p>Navigate to the Athena console and choose <strong>Query editor</strong>. If this is your first time using the Athena query editor, you need to <a href=\\"https://docs.aws.amazon.com/athena/latest/ug/querying.html#query-results-specify-location-console\\" target=\\"_blank\\">configure</a> to use the S3 bucket you created earlier to store the query results.</p>\\n<p>The table <code>book_reviews</code> is available for querying. Run the following query:</p>\\n<pre><code class=\\"lang-\\">SELECT * FROM reviews.book_reviews LIMIT 5;\\n</code></pre>\\n<p>The following screenshot shows the first five records from the table being displayed.</p>\n<p><img src=\\"8\\" alt=\\"image.png\\" /></p>\n<h4><a id=\\"Perform_a_rowlevel_update_in_Athena_171\\"></a><strong>Perform a row-level update in Athena</strong></h4>\\n<p>In the next few steps, let’s focus on a record in the table with review ID <code>RZDVOUQG1GBG7</code>. Currently, it has no total votes when we run the following query:</p>\\n<pre><code class=\\"lang-\\">SELECT total_votes FROM reviews.book_reviews \\nWHERE review_id = 'RZDVOUQG1GBG7'\\n</code></pre>\\n<p><img src=\\"9\\" alt=\\"image.png\\" /></p>\n<p>Let’s update the total_votes value to 2 using the following query:</p>\n<pre><code class=\\"lang-\\">UPDATE reviews.book_reviews\\nSET total_votes = 2\\nWHERE review_id = 'RZDVOUQG1GBG7'\\n</code></pre>\\n<p><img src=\\"10\\" alt=\\"image.png\\" /></p>\n<p>After your update command runs successfully, run the below query and note the updated result showing a total of two votes:</p>\n<pre><code class=\\"lang-\\">SELECT total_votes FROM reviews.book_reviews\\nWHERE review_id = 'RZDVOUQG1GBG7'\\n</code></pre>\\n<p>Athena enforces ACID transaction guarantee for all the write operations against an Iceberg table. This is done through the Iceberg format’s optimistic locking specification. When concurrent attempts are made to update the same record, a commit conflict occurs. In this scenario, Athena displays a transaction conflict error, as shown in the following screenshot.</p>\n<p><img src=\\"11\\" alt=\\"image.png\\" /></p>\n<p>Delete queries work in a similar way; see <a href=\\"https://docs.aws.amazon.com/athena/latest/ug/querying-iceberg-updating-iceberg-table-data.html#querying-iceberg-delete\\" target=\\"_blank\\">DELETE</a> for more details.</p>\\n<h4><a id=\\"Perform_a_schema_evolution_in_Athena_207\\"></a><strong>Perform a schema evolution in Athena</strong></h4>\\n<p>Suppose the review suddenly goes viral and gets 10 billion votes:</p>\n<pre><code class=\\"lang-\\">UPDATE reviews.book_reviews\\nSET total_votes = 10000000000\\nWHERE review_id = 'RZDVOUQG1GBG7'\\n</code></pre>\\n<p>Based on the AWS Glue table information, the <code>total_votes</code> is an integer column. If you try to update a value of 10 billion, which is greater than the maximum allowed integer value, you get an error reporting a type mismatch.</p>\\n<p><img src=\\"12\\" alt=\\"image.png\\" /></p>\n<p>Iceberg supports most schema evolution features as metadata-only operations, which don’t require a table rewrite. This includes add, drop, rename, reorder column, and promote column types. To solve this issue, you can change the integer column <code>total_votes</code> to a <code>BIGINT</code> type by running the following <code>DDL</code>:</p>\\n<pre><code class=\\"lang-\\">ALTER TABLE reviews.book_reviews\\nCHANGE COLUMN total_votes total_votes BIGINT;\\n</code></pre>\\n<p>You can now update the value successfully:</p>\n<pre><code class=\\"lang-\\">UPDATE reviews.book_reviews\\nSET total_votes = 10000000000\\nWHERE review_id = 'RZDVOUQG1GBG7'\\n</code></pre>\\n<p>Querying the record now gives us the expected result in <code>BIGINT</code>:</p>\\n<pre><code class=\\"lang-\\">SELECT total_votes FROM reviews.book_reviews\\nWHERE review_id = 'RZDVOUQG1GBG7'\\n</code></pre>\\n<h4><a id=\\"Perform_time_travel_in_Athena_244\\"></a><strong>Perform time travel in Athena</strong></h4>\\n<p>In Iceberg, the transaction history is retained, and each transaction commit creates a new version. You can perform time travel to look at a historical version of a table. In Athena, you can use the following syntax to travel to a time that is after when the first version was committed:</p>\n<pre><code class=\\"lang-\\">SELECT total_votes FROM reviews.book_reviews\\nFOR SYSTEM_TIME AS OF localtimestamp + interval '-20' minute\\nWHERE review_id = 'RZDVOUQG1GBG7'\\n</code></pre>\\n<p><img src=\\"13\\" alt=\\"image.png\\" /></p>\n<h4><a id=\\"Consume_Iceberg_data_across_Amazon_EMR_and_Athena_257\\"></a><strong>Consume Iceberg data across Amazon EMR and Athena</strong></h4>\\n<p>One of the most important features of a data lake is for different systems to seamlessly work together through the Iceberg open-source protocol. After all the operations are performed in Athena, let’s go back to Amazon EMR and confirm that Amazon EMR Spark can consume the updated data.</p>\n<p>First, run the same Spark SQL and see if you get the same result for the review used in the example:</p>\n<pre><code class=\\"lang-\\">val select_votes = &quot;&quot;&quot;SELECT total_votes FROM demo.reviews.book_reviews\\nWHERE review_id = 'RZDVOUQG1GBG7'&quot;&quot;&quot;\\n\\nspark.sql(select_votes).show()\\n</code></pre>\\n<p>Spark shows 10 billion total votes for the review.</p>\n<p><img src=\\"14\\" alt=\\"image.png\\" /></p>\n<p>Check the transaction history of the operation in Athena through Spark Iceberg’s history system table:</p>\n<pre><code class=\\"lang-\\">val select_history = &quot;SELECT * FROM demo.reviews.book_reviews.history&quot;\\n\\nspark.sql(select_history).show()\\n</code></pre>\\n<p>This shows three transactions corresponding to the two updates you ran in Athena.</p>\n<p><img src=\\"15\\" alt=\\"image.png\\" /></p>\n<p>Iceberg offers a variety of Spark procedures to optimize the table. For example, you can run an <a href=\\"https://iceberg.apache.org/docs/latest/spark-procedures/#expire_snapshots\\" target=\\"_blank\\">expire_snapshot</a> sprocedure to remove old snapshots, and free up storage space in [Amazon S3](https://aws.amazon.com/cn/s3/?trk=cndc-detail):</p>\\n<pre><code class=\\"lang-\\">import java.util.Calendar\\nimport java.text.SimpleDateFormat\\n\\nval now = Calendar.getInstance().getTime()\\nval form = new SimpleDateFormat(&quot;yyyy-MM-dd HH:mm:ss&quot;)\\nval now_formatted = form.format(now.getTime())\\nval procedure = s&quot;&quot;&quot;CALL demo.system.expire_snapshots(\\n table =&gt; 'reviews.book_reviews',\\n older_than =&gt; TIMESTAMP '\$now_formatted',\\n retain_last =&gt; 1)&quot;&quot;&quot;\\n\\nspark.sql(procedure)\\n</code></pre>\\n<p>Note that, after running this procedure, time travel can no longer be performed against expired snapshots.</p>\n<p>Examine the history system table again and notice that it shows you only the most recent snapshot.</p>\n<p>Running the following query in Athena results in an error “No table snapshot found before timestamp…” as older snapshots were deleted, and you are no longer able to time travel to the older snapshot:</p>\n<pre><code class=\\"lang-\\">SELECT total_votes FROM reviews.book_reviews\\nFOR SYSTEM_TIME AS OF localtimestamp + interval '-20' minute\\nWHERE review_id = 'RZDVOUQG1GBG7'\\n</code></pre>\\n<h4><a id=\\"Clean_up_316\\"></a><strong>Clean up</strong></h4>\\n<p>To avoid incurring ongoing costs, complete the following steps to clean up your resources:</p>\n<ol>\\n<li>Run the following code in your notebook to drop the AWS Glue table and database:</li>\n</ol>\\n<pre><code class=\\"lang-\\">// DROP the table \\nspark.sql(&quot;DROP TABLE demo.reviews.book_reviews&quot;) \\n// DROP the database \\nspark.sql(&quot;DROP DATABASE demo.reviews&quot;)\\n</code></pre>\\n<ol start=\\"2\\">\\n<li>On the Amazon EMR console, choose <strong>Notebooks</strong> in the navigation pane.</li>\\n<li>Select the notebook <code>iceberg-spark-notebook</code> and choose <strong>Delete</strong>.</li>\\n<li>Choose <strong>Clusters</strong> in the navigation pane.</li>\\n<li>Select the cluster <code>Iceberg Spark Cluster</code> and choose <strong>Terminate</strong>.</li>\\n<li><a href=\\"https://docs.aws.amazon.com/AmazonS3/latest/userguide/delete-bucket.html\\" target=\\"_blank\\">Delete the S3 buckets</a> and any other resources that you created as part of the prerequisites for this post.</li>\\n</ol>\n<h4><a id=\\"Conclusion_335\\"></a><strong>Conclusion</strong></h4>\\n<p>In this post, we showed you an example of using Amazon S3, AWS Glue, Amazon EMR, and Athena to build an Iceberg data lake on AWS. An Iceberg table can seamlessly work across two popular compute engines, and you can take advantage of both to design your customized data production and consumption use cases.</p>\n<p>With AWS Glue, Amazon EMR, and Athena, you can already use many features through AWS integrations, such as <a href=\\"https://aws.amazon.com/blogs/machine-learning/run-sql-queries-from-your-sagemaker-notebooks-using-amazon-athena/\\" target=\\"_blank\\">SageMaker Athena integration</a> for machine learning, or <a href=\\"https://aws.amazon.com/blogs/big-data/use-amazon-athena-and-amazon-quicksight-in-a-cross-account-environment/\\" target=\\"_blank\\">QuickSight Athena integration</a> for dashboard and reporting. [AWS Glue](https://aws.amazon.com/cn/glue/?trk=cndc-detail) also offers the <a href=\\"https://aws.amazon.com/marketplace/pp/prodview-iicxofvpqvsio\\" target=\\"_blank\\">Iceberg connector</a>, which you can use to author and run Iceberg data pipelines.</p>\\n<p>In addition, Iceberg supports <a href=\\"https://iceberg.apache.org/multi-engine-support/\\" target=\\"_blank\\">a variety of other open-source compute engines</a> that you can choose from. For example, you can use <a href=\\"https://iceberg.apache.org/docs/latest/flink/\\" target=\\"_blank\\">Apache Flink</a> on [Amazon EMR](https://aws.amazon.com/cn/emr/?trk=cndc-detail) for streaming and change data capture (CDC) use cases. The strong transaction guarantee and efficient row-level update, delete, time travel, and schema evolution experience offered by Iceberg offers a sound foundation and infinite possibilities for users to unlock the power of big data.</p>\\n<h4><a id=\\"About_the_Authors_343\\"></a><strong>About the Authors</strong></h4>\\n<p><img src=\\"16\\" alt=\\"image.png\\" /></p>\n<p><strong>Kishore Dhamodaran</strong> is a Senior Solutions Architect at AWS. Kishore helps strategic customers with their cloud enterprise strategy and migration journey, leveraging his years of industry and cloud experience.</p>\\n<p><img src=\\"17\\" alt=\\"image.png\\" /></p>\n<p><strong>Jack Ye</strong> is a software engineer of the Athena Data Lake and Storage team. He is an Apache Iceberg Committer and PMC member.</p>\\n<p><img src=\\"18\\" alt=\\"image.png\\" /></p>\n<p><strong>Mohit Mehta</strong> is a Principal Architect at AWS with expertise in AI/ML and data analytics. He holds 12 AWS certifications and is passionate about helping customers implement cloud enterprise strategies for digital transformation. In his free time, he trains for marathons and plans hikes across major peaks around the world.</p>\\n<p><img src=\\"19\\" alt=\\"image.png\\" /></p>\n<p><strong>Giovanni Matteo Fumarola</strong> is the Engineering Manager of the Athena Data Lake and Storage team. He is an Apache Hadoop Committer and PMC member. He has been focusing in the big data analytics space since 2013.</p>\\n<p><img src=\\"20\\" alt=\\"image.png\\" /></p>\n<p><strong>Jared Keating</strong> is a Senior Cloud Consultant with AWS Professional Services. Jared assists customers with their cloud infrastructure, compliance, and automation requirements, drawing from his 20+ years of IT experience.</p>\n"}
目录
亚马逊云科技解决方案 基于行业客户应用场景及技术领域的解决方案
联系亚马逊云科技专家
亚马逊云科技解决方案
基于行业客户应用场景及技术领域的解决方案
联系专家
0
目录
关闭