Amazon EMR 上 Apache Hudi 0.9.0 提供的新功能

0
0
{"value":"[Apache Hudi](https://hudi.apache.org/) 是一个开源事务性数据湖框架,极大地简化了增量数据处理和数据管道开发。它面向 [Amazon Simple Storage Service(Amazon S3)](https://aws.amazon.com/s3/) 或 Apache HDFS 上的数据湖,提供了事务支持和记录级的插入、更新和删除功能,从而实现了简化。Apache Hudi 支持与 Apache Spark、Apache Hive、Presto 和 Trino 等开源大数据分析框架集成。此外,Apache Hudi 还允许您以开放格式(例如 Apache Parquet 和 Apache Avro),在 Amazon S3 或 Apache HDFS 中维护数据。\n\n客户使用 Apache Hudi 的常见使用场景如下:\n\n- 简化数据摄入管道,这些管道用于处理来自流式传输和批量数据源的延迟到达或更新记录。\n- 使用更改数据捕获(CDC, Change Data Capture)从事务系统中摄入数据。\n- 实施数据删除管道以遵守数据隐私法规,例如 GDPR(General Data Protection Regulation, 通用数据保护条例)合规性。 遵守 GDPR 是当今现代化数据架构的必要条件,这包括“擦除权”或“被遗忘权”功能,并且可以使用 Apache Hudi 功能来实施,用于替代删除和更新操作。\n\n我们很高兴地宣布,Apache Hudi 0.9.0 已在 [Amazon EMR](https://aws.amazon.com/emr/) 5.34 和 EMR 6.5.0 上提供。这是一个主要[版本](https://hudi.apache.org/releases/release-0.9.0/),其亮点在于包括了 Spark SQL DML 和 DDL 支持,以及其他一些写入器/读取器方面的改进。与 Hudi 0.6.0 相比,我们观察到的查询性能提高了 3 倍,这尤为显著,因此,如果您希望实施事务数据湖来达到极高的更新插入和删除操作速度,或者正在使用旧版本的Hudi,那么这是一个极佳的版本。在这篇文章中,我们将重点介绍 0.9.0 版本提供的以下新功能和改进:\n\n- Spark SQL DML 和 DDL 支持:探索 Spark SQL DML 和 DDL 支持。\n- 性能改进:探索写入器和查询端引入的性能改进和与性能相关的新功能。\n- 其他功能:探索其他有用的功能,例如基于 [Amazon DynamoDB](https://aws.amazon.com/dynamodb/) 的 Optimistic Concurrency Control(OCC)锁定、删除分区操作等。\n\n### **Spark SQL DML 和 DDL 支持**\n\n最令人兴奋的新功能是 Apache Hudi 0.9.0 增加了对使用 Spark SQL 的 [DDL/DML](https://hudi.apache.org/docs/0.9.0/quick-start-guide) 的支持。这使得 Hudi 面向所有人(非工程师、分析师等)提供更好的可访问性和可操作性迈出了重要一步。此外,它还使现有数据集可以轻松地迁移到 Apache Hudi 表中,并且它更接近使用 Spark SQL DML 和 DDL 的低代码范例,因此无需编写 scala/python 代码。\n\n用户现在可以使用 ```CREATE TABLE....USING HUDI``` 和 ```REATE TABLE ..AS SELECT SQL``` 语句创建表,以直接在 [Amazon Glue](https://aws.amazon.com/glue/) 目录中管理表。\n\n然后,用户可以使用 ```INSERT```、```UPDATE```、```MERGE INTO``` 和 ```DELETE SQL``` 语句操作数据。```INSERT OVERWRITE``` 语句可用于针对现有批处理 ETL 管道中的表或分区,覆盖其现有数据。\n\n我们来看一个简单的[示例](https://github.com/aws-samples/emr-studio-notebook-examples/blob/main/examples/query-hudi-0.9.0-dataset-with-spark-sql.ipynb),在该示例中,我们创建了一个 Hudi 表 ```amazon_customer_review_hudi```,模仿了 [Amazon Customer Reviews](https://s3.amazonaws.com/amazon-reviews-pds/readme.html) 公有数据集并执行以下活动:\n\n- 先决条件:创建 Amazon Simple Storage Service(S3)存储桶 ```s3://EXAMPLE-BUCKET``` 和 ```s3://EXAMPLE-BUCKET-1```\n- 创建分区的 Hudi 表 ```amazon_product_review_hudi```\n- 创建源 Hudi 表 ```amazon_customer_review_parquet_merge_source```,其内容将与 ```amazon_product_review_hudi``` 表合并\n- 将数据插入 ```amazon_customer_review_parquet_merge_source``` 和 ```amazon_product_review_hudi``` 中,以及通过读取\n- ```amazon_customer_review_parquet_merge_source``` 中的数据并与 Hudi 表 ```amazon_product_review_hudi``` 合并来执行合并操作\n- 在 ```amazon_customer_review_hudi``` 上对之前插入的记录执行删除操作\n\n### **配置 Spark 会话**\n\n我们通过 [EMR studio](https://aws.amazon.com/emr/features/studio/) 笔记本使用以下脚本,用于配置 Spark 会话,以便结合使用 Apache Hudi DML 和 DDL 支持。以下[示例](https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-hudi-work-with-dataset.html)演示了如何启动交互式 Spark shell、使用 Spark 提交或将 Amazon EMR Notebooks 与 Amazon EMR 上的 Hudi 结合使用。我们建议使用以下 Apache Livy 配置启动 EMR 集群:\n\n```\n[\n {\n \"Classification\": \"livy-conf\",\n \"Properties\": {\n \"livy.file.local-dir-whitelist\": \"/usr/lib/hudi\"\n }\n }\n]\n```\n\n上述配置让您可以在配置 Spark 会话时,直接引用 EMR leader 节点上的本地 ```/usr/lib/hudi/hudi-spark-bundle.jar```。或者,您也可以将 ```/usr/lib/hudi/hudi-spark-bundle.jar``` 复制到 HDFS 位置,并在初始化 Spark 会话时引用该项。以下是从笔记本初始化 Spark 会话的命令:\n\n```\n%%configure -f\n{\n \"conf\" : {\n \"spark.jars\":\"file:///usr/lib/hudi/hudi-spark-bundle.jar\",\n \"spark.serializer\":\"org.apache.spark.serializer.KryoSerializer\",\n \"spark.sql.extensions\":\"org.apache.spark.sql.hudi.HoodieSparkSessionExtension\"\n }\n}\n```\n\n#### **创建表**\n\n我们[创建](https://hudi.apache.org/docs/0.9.0/quick-start-guide#create-table)以下 Apache Hudi 表:```amazon_customer_review_hudi``` 和 ```amazon_customer_review_parquet_merge_source```\n\n```\namazon_customer_review_hudi and amazon_customer_review_parquet_merge_source\n\n%%sql \n\n/****************************\n创建 HUDI 表,该表与包含所选列的 Amazon Customer Reviews 表具有相同架构 \n*****************************/\n\n-- Hudi 0.9.0 配置 https://hudi.apache.org/docs/configurations\n-- Hudi 配置可以在选项块中设置为 hoodie.datasource.hive_sync.assume_date_partitioning = 'false',\n\n\ncreate table if not exists amazon_customer_review_hudi\n ( marketplace string,\n review_id string,\n customer_id string,\n product_title string,\n star_rating int,\n timestamp long ,\n review_date date,\n year string,\n month string ,\n day string\n )\n using hudi\n location 's3://EXAMPLE-BUCKET/my-hudi-dataset/'\n options ( \n type = 'cow', \n primaryKey = 'review_id',\n preCombineField = 'timestamp',\n hoodie.datasource.write.hive_style_partitioning = 'true'\n )\n partitioned by (year,month,day);\n \n\n-- 将位置 's3://EXAMPLE-BUCKET/my-hudi-dataset/' 更改为您已在 AWS 账户中创建的对应 S3 存储桶\n\n%%sql \n/****************************\n创建用作源的 amazon_customer_review_parquet_merge_source,以合并到 amazon_customer_review_hudi。\n该表包含 deleteRecord 列,用于跟踪是否需要删除记录\n*****************************/\n\n\ncreate table if not exists amazon_customer_review_parquet_merge_source \n (\n marketplace string,\n review_id string,\n customer_id string,\n product_title string,\n star_rating int,\n review_date date,\n deleteRecord string\n )\n STORED AS PARQUET\n LOCATION 's3://EXAMPLE-BUCKET-1/toBeMergeData/'\n\n\n-- 将位置 (s3://EXAMPLE-BUCKET-1/toBeMergeData/') 更改为您已在 AWS 账户中创建的对应 S3 存储桶\n```\n\n为了进行比较,如果使用编程方法创建 ```amazon_customer_review_hudi,则 PySpark``` 示例代码如下所示。\n\n```\n# 创建 DataFrame\ninputDF = spark.createDataFrame(\n [\n (\"Italy\", \"11\", \"1111\", \"table\", 5, 1648126827, \"2015/05/02\", \"2015\", \"05\", \"02\"),\n (\"Spain\", \"22\", \"2222\", \"chair\", 5, 1648126827, \"2015/05/02\", \"2015\", \"05\", \"02\") \n ],\n [\"marketplace\", \"review_id\", \"customer_id\", \"product_title\", \"star_rating\", \"timestamp\", \"review_date\", \"year\", \"month\", \"day\" ]\n)\n\n# 输出 inputDF 的架构 \ninputDF.printSchema()\n\n# 在单个 hudiOptions 变量中指定通用的 DataSourceWriteOptions\nhudiOptions = {\n\"hoodie.table.name\": \"amazon_customer_review_hudi\",\n\"hoodie.datasource.write.recordkey.field\": \"review_id\",\n\"hoodie.datasource.write.partitionpath.field\": \"year,month,day\",\n\"hoodie.datasource.write.precombine.field\": \"timestamp\",\n\"hoodie.datasource.write.hive_style_partitioning\": \"true\",\n\"hoodie.datasource.hive_sync.enable\": \"true\",\n\"hoodie.datasource.hive_sync.table\": \" amazon_customer_review_hudi\",\n\"hoodie.datasource.hive_sync.partition_fields\": \"year,month,day\",\n\"hoodie.datasource.hive_sync.partition_extractor_class\": \"org.apache.hudi.hive.MultiPartKeysValueExtractor\"\n}\n\n\n# 创建 Hudi 表并将数据插入位于所指定 S3 位置的 my_hudi_table_1 hudi 表中 \ninputDF.write \\\n .format(\"org.apache.hudi\")\\\n .option(\"hoodie.datasource.write.operation\", \"insert\")\\\n .options(**hudiOptions)\\\n .mode(\"append\")\\\n .save(\"s3://EXAMPLE-BUCKET/my-hudi-dataset/\") \n```\n\n#### **将数据插入 Hudi 表**\n\n我们将记录[插入](https://hudi.apache.org/docs/0.9.0/quick-start-guide#insert-data)表```amazon_customer_review_parquet_merge_source``` 以供合并操作使用。这包括插入一行以刷新插入、更新和删除\n\n```\n%%sql \n\n/****************************\n 将记录插入 amazon_customer_review_parquet_merge_source 以进行删除 \n*****************************/\n\n-- 将 merge as deleteRecord 设置为 yes 后,将从 amazon_customer_review_hudi 中删除记录\n\ninsert into amazon_customer_review_parquet_merge_source\n select\n 'italy',\n '11',\n '1111',\n 'table',\n 5,\n TO_DATE(CAST(UNIX_TIMESTAMP('2015/05/02', 'yyyy/MM/dd') AS TIMESTAMP)) as review_date,\n 'yes' \n \n \n\n%%sql\n/****************************\n 将记录插入 amazon_customer_review_parquet_merge_source 以进行更新\n*****************************/\n\n-- 在合并后,将从 amazon_customer_review_hudi 使用新的 Star rating 和 product_title 更新记录\n\ninsert into amazon_customer_review_parquet_merge_source\n select\n 'spain',\n '22',\n '2222',\n 'Relaxing chair',\n 4,\n TO_DATE(CAST(UNIX_TIMESTAMP('2015/05/02', 'yyyy/MM/dd') AS TIMESTAMP)) as review_date,\n 'no' \n\n\n%%sql\n/****************************\n 将记录插入 amazon_customer_review_parquet_merge_source 以进行插入 \n*****************************/\n\n-- 合并后,记录将插入 amazon_customer_review_hudi \n\ninsert into amazon_customer_review_parquet_merge_source\n select\n 'uk',\n '33',\n '3333',\n 'hanger',\n 3,\n TO_DATE(CAST(UNIX_TIMESTAMP('2015/05/02', 'yyyy/MM/dd') AS TIMESTAMP)) as review_date,\n 'no' \n```\n\n现在,我们将记录插入用作合并操作目标表的 ```amazon_customer_review_hudi``` 表。\n\n```\n%%sql\n\n/****************************\n 在合并后,将记录插入 amazon_customer_review_hudi 表以进行删除 \n*****************************/\n\n-- Spark SQL 日期时间函数 https://spark.apache.org/docs/latest/api/sql/index.html#date_add\n\ninsert into amazon_customer_review_hudi \n select \n 'italy',\n '11',\n '1111',\n 'table',\n 5,\n unix_timestamp(current_timestamp()) as timestamp,\n TO_DATE(CAST(UNIX_TIMESTAMP('2015/05/02', 'yyyy/MM/dd') AS TIMESTAMP)) as review_date,\n date_format(date '2015-05-02', \"yyyy\") as year,\n date_format(date '2015-05-02', \"MM\") as month,\n date_format(date '2015-05-02', \"dd\") as day \n\n\n%%sql\n/****************************\n 在合并后,将记录插入 amazon_customer_review_hudi table 以进行更新 \n*****************************/\n\ninsert into amazon_customer_review_hudi\n select \n 'spain',\n '22',\n '2222',\n 'chair ',\n 5,\n unix_timestamp(current_timestamp()) as timestamp,\n TO_DATE(CAST(UNIX_TIMESTAMP('2015/05/02', 'yyyy/MM/dd') AS TIMESTAMP)) as review_date,\n date_format(date '2015-05-02', \"yyyy\") as year,\n date_format(date '2015-05-02', \"MM\") as month,\n date_format(date '2015-05-02', \"dd\") as day \n```\n\n#### **合并到**\n\n我们来执行从 ```amazon_customer_review_parquet_merge_source``` [合并](https://hudi.apache.org/docs/0.9.0/quick-start-guide#mergeinto)到 ```amazon_customer_review_hudi```。\n\n```\n%%sql \n\n/*************************************\nMergeInto:将源合并到目标 \n**************************************/\n\n-- 源 amazon_customer_review_parquet_merge_source \n-- 目标 amazon_customer_review_hudi\n\nmerge into amazon_customer_review_hudi as target\nusing ( \n select\n marketplace,\n review_id,\n customer_id,\n product_title,\n star_rating,\n review_date,\n deleteRecord,\n date_format(review_date, \"yyyy\") as year,\n date_format(review_date, \"MM\") as month,\n date_format(review_date, \"dd\") as day\n from amazon_customer_review_parquet_merge_source ) source\non target.review_id = source.review_id \nwhen matched and deleteRecord != 'yes' then \n\nupdate set target.timestamp = unix_timestamp(current_timestamp()), \ntarget.star_rating = source.star_rating,\ntarget.product_title = source.product_title\n\nwhen matched and deleteRecord = 'yes' then delete\n\nwhen not matched then insert \n ( target.marketplace,\n target.review_id,\n target.customer_id,\n target.product_title,\n target.star_rating,\n target.timestamp ,\n target.review_date,\n target.year ,\n target.month ,\n target.day\n ) \n values\n (\n source.marketplace,\n source.review_id,\n source.customer_id,\n source.product_title,\n source.star_rating,\n unix_timestamp(current_timestamp()),\n source.review_date,\n source.year ,\n source.month ,\n source.day \n )\n```\n\n#### **注意事项和限制**\n\n- 到目前为止,合并条件只能应用于主键。\n- ```-- 合并条件只能用于主键```\n- ```on target.review_id = source.review_id```\n- 写入时复制(CoW, Copy on Write)表上支持部分更新,但读取时合并(MoR, Merge on Read)表上不支持。\n- 目标表的字段不能是 MoR 表的更新表达式的右侧值:\n- ```-- 更新将导致错误,因为目标列位于表达式的右侧```\n- ```update set target.star_rating = target.star_rating +1 ```\n\n#### **删除记录**\n\n现在我们[删除](https://hudi.apache.org/docs/0.9.0/quick-start-guide#deletes)插入的记录。\n\n```\n%%sql\n\n/*************************************\n从 amazon_customer_review_hudi 表中删除插入的记录 \n**************************************/\nDelete from amazon_customer_review_hudi where review_id == '22'\n\n\n%%sql \n/*************************************\n从 amazon_customer_review_hudi 表中查询已删除的记录 \n**************************************/\nselect * from amazon_customer_review_hudi where review_id == '22'\n```\n\n#### **架构演变**\n\nHudi 支持常见的[模式演变](https://hudi.apache.org/docs/0.9.0/schema_evolution)场景,例如添加可为空的字段或提升字段的数据类型。我们将新列 **ssid**(类型为 int)添加到现有 ```amazon_customer_review_hudi``` 表中,并使用额外的列插入记录。Hudi 允许查询具有更新的表架构的新旧数据和新数据。\n\n```\n%%sql\n\n/*************************************\n将类型为 int 的新列名 ssid 添加到 amazon_customer_review_hudi 表\n**************************************/\n\nALTER TABLE amazon_customer_review_hudi ADD COLUMNS (ssid int)\n\n%%sql\n/*************************************\n将新记录添加到更改后的表 amazon_customer_review_hudi \n**************************************/\ninsert into amazon_customer_review_hudi\n select \n 'germany',\n '55',\n '5555',\n 'car',\n 5,\n unix_timestamp(current_timestamp()) as timestamp,\n TO_DATE(CAST(UNIX_TIMESTAMP('2015/05/02', 'yyyy/MM/dd') AS TIMESTAMP)) as review_date,\n 10 as ssid,\n date_format(date '2015-05-02', \"yyyy\") as year,\n date_format(date '2015-05-02', \"MM\") as month,\n date_format(date '2015-05-02', \"dd\") as day \n\n%%sql \n/*************************************\n将 ssid 类型从 int 提升为 long \n**************************************/\nALTER TABLE amazon_customer_review_hudi CHANGE COLUMN ssid ssid long\n\n\n%%sql \n/*************************************\n从 amazon_customer_review_hudi 表中查询数据\n**************************************/\nselect * from amazon_customer_review_hudi where review_id == '55'\n```\n\n### **Spark 性能改进**\n\n#### **查询端改进**\n\nApache Hudi 表现在已在元存储中注册为 Spark 数据源表。这使得 Hudi 表上的 Spark SQL 查询能够针对在写入时复制的表使用 Spark 的原生 [Parquet Reader](https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala),针对读取时合并的表使用 Hudi 的自定义 [MergeOnReadSnapshotRelation](https://github.com/apache/hudi/blob/release-0.9.0/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala)。因此,它不再依赖于 Spark 中的 Hive 输入格式回退,后者的维护和效率不如 Spark 的原生读取器。这带来了许多优化,例如使用 Spark 的原生 Parquet Reader,以及实施 Hudi 自己的 Spark [FileIndex](https://github.com/apache/hudi/blob/release-0.9.0/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieFileIndex.scala) 实施。文件索引通过优化缓存、支持分区修剪以及通过 Hudi 元数据表(而不是直接从 Amazon S3 列出)列出文件的能力,帮助提高文件列表性能。此外,Hudi 现在支持通过 Spark 数据源进行[时间旅行查询](https://hudi.apache.org/docs/quick-start-guide#time-travel-query),这使您可以查询历史时间点的数据集快照。\n\n其他需要注意的重要事项包括:\n\n- 在通过 Spark SQL 进行查询时,不再需要 ```spark.sql.hive.convertMetastoreParquet=false``` 和 ```mapreduce.input.pathFilter.class=org.apache.hudi.hadoop.HoodieROTablePathFilter``` 这样的配置。\n- 现在,通过数据源 API 查询 Hudi 数据集时,您可以使用非全局查询路径。这使您可以通过基本路径查询表,而不必在查询路径中指定 ```*```。\n\n我们运行了一个源于 3 TB 规模 TPC-DS 基准测试的性能基准测试,以确定对于写入时复制的表,EMR 6.5.0 上的 Hudi 0.9.0 相对于 EMR 6.2.0 上的 Hudi 0.6.0(2021 年初)的查询性能提升。这些查询运行在 5 节点的 c5.9xlarge EMR 集群中。\n\n从几何均值来看,使用 Hudi 0.9.0 的查询比使用 Hudi 0.6.0 的查询要快 **3 倍**。下图比较了对于两个 Amazon EMR/Hudi 版本,在 TPC-DS 3TB 查询数据集中运行全部查询的总累计运行时和运行时几何均值(值越低越好)。\n\n![image.png](https://dev-media.amazoncloud.cn/a16c5034e146404383e43fd7d615c0d1_image.png)\n\n![image.png](https://dev-media.amazoncloud.cn/2b459f238e2d42888c489fcf3136a842_image.png)\n\n从几何均值来看,使用 Hudi 0.9.0 的查询比使用 Hudi 0.6.0 的查询要快 **3 倍**。\n\n#### **写入器方面的改进**\n\n##### **虚拟键支持**\n\nApache Hudi 通过向数据集添加额外的列来维护元数据。这使它能够支持 upsert/delete 操作以及与之相关的各种功能,例如增量查询、压缩等。这些元数据列(即 ```_hoodie_commit_time```、```_hoodie_record_key```、```_hoodie_partition_path```、```_hoodie_file_name``` 和 ```_hoodie_commit_seqno)```让 Hudi 可以唯一地标识记录、记录所在的分区/文件以及更新了记录的最后一次提交。\n\n但是,生成和维护这些元数据列会增加 Hudi 表在磁盘上占用的存储空间。其中一些列,例如 ```_hoodie_record_key``` 和 ```_hoodie_partition_path```,可以从已存储在数据集中的其他数据列构造。Apache Hudi 0.9.0 引入了对虚拟键的支持。这使得用户可以禁用这些元数据列的生成,改为依靠实际的数据列,使用相应的键生成器动态构造记录键/分区路径。这有助于减少存储空间占用,以及缩短摄取入时间。但是,此功能需要注意以下几点:\n\n- 它只可用于仅附加/不可改变数据。它不能用于需要更新插入和删除的使用场景,这些情况下需要 ```_hoodie_record_key``` 和 ```_hoodie_partition_path``` 等列,以使 Bloom 索引正常工作。\n- 不支持增量查询,因为它们需要 ```_hoodie_commit_time``` 筛选在特定时间写入/更新的记录。\n- 启用此功能后,将无法为现有表关闭该功能。\n\n默认情况下,该功能处于关闭状态,可以通过将 [hoodie.populate.meta.fields](https://hudi.apache.org/docs/0.9.0/configurations#hoodiepopulatemetafields) 设置为 false 来启用。我们对公开的 [Amazon Customer Reviews 数据集](https://s3.amazonaws.com/amazon-reviews-pds/readme.html),使用批量插入衡量了写入性能和存储占用空间的改进。以下是我们使用的代码片段:\n\n```\nimport org.apache.hudi.DataSourceWriteOptions\nimport org.apache.hudi.config.HoodieWriteConfig\nimport org.apache.spark.sql.SaveMode\n\nvar srcPath = \"s3://amazon-reviews-pds/parquet/\"\nvar tableName = \"amazon_reviews_table\"\nvar tablePath = \"s3://<bucket>/<prefix>/\" + tableName\n\nval inputDF = spark.read.format(\"parquet\").load(srcPath)\n\ninputDF.write.format(\"hudi\")\n .option(HoodieWriteConfig.TABLE_NAME, tableName)\n .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.BULK_INSERT_OPERATION_OPT_VAL)\n .option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY, DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL)\n .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, \"review_id\")\n .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, \"product_category\") \n .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, \"review_date\")\n .option(\"hoodie.populate.meta.fields\", \"<true/false>\")\n .mode(SaveMode.Overwrite)\n .save(tablePath)\n```\n\n该实验在四节点 c4.2xlarge EMR 集群(一个 leader,三个 core)上运行。我们观察到,启用虚拟键后,写入运行时性能提高 **10.63%**,存储空间占用减少 **8.67%**。下图比较了使用和不使用虚拟键的批量插入运行时和表大小(值越小越好)\n\n![image.png](https://dev-media.amazoncloud.cn/7db45b1a0385472594ac4eb4f894241f_image.png)\n\n![image.png](https://dev-media.amazoncloud.cn/0524cd6db99c4664839ed9521d6491b3_image.png)\n\n#### **基于时间轴服务器的标记机制**\n\nApache Hudi 在写入操作期间,支持自动清理写入的未提交数据。通过生成与每个数据文件对应的标记文件来支持这种清理,利用这种方法可以跟踪感兴趣的数据文件,而不必通过列出所有文件来扫描整个表。尽管相比在整个表中扫描未提交的数据文件,现有标记机制的效率要高得多,但它仍然会对 Amazon S3 数据湖产生性能影响。例如,写入大量标记文件(每个数据文件一个标记),然后在成功提交后将其删除可能会需要相当长的时间,有时大约需要几分钟。此外,同时写入大量数据/标记文件时,可能会达到 Amazon S3 节流限制。\n\nApache Hudi 0.9.0 为这种标记机制引入了基于时间轴服务器的新实施。这提高了整体写入性能,并显著降低达到 Amazon S3 节流限制的可能性,从而提高了处理 Amazon S3 工作负载的效率。新机制将 Hudi 的时间轴服务器组件作为一个中心位置,用于处理(来自所有执行程序的)所有标记创建/删除请求,从而实现了对这些请求的批量处理并减少向 Amazon S3 发出的请求数量。因此,拥有 Amazon S3 数据湖的用户可以利用此功能来提高写操作性能,并避免因标记文件管理而导致的节流。这对于写入大量数据文件(例如 1 万或更多)的场景的影响尤为明显。\n\n这种新的机制默认情况下不启用,可以通过为写操作将 ```hoodie.write.markers.type``` 设置为 ```timeline_server_based``` 来启用。有关该功能的详细信息,请参阅 Apache Hudi 社区的此[博文](https://hudi.apache.org/blog/2021/08/18/improving-marker-mechanism)。\n\n### **其他改进**\n\n#### **基于 DynamoDB 的锁定**\n\nOptimistic Concurrency Control 是 Apache Hudi 0.8.0 引入的主要功能之一,它允许多个并发写入器将数据提取到同一个 Hudi 表中。该功能需要获取锁,为此您可以使用 Zookeeper(原定设置为在 EMR 上)或 HiveMetaStore。但是,这些锁提供程序要求所有写入器与 Zookeeper/Hive 元存储运行在同一个集群上。\n\nAmazon EMR 上的 Apache Hudi 0.9.0 引入了 DynamoDB 作为锁提供程序。这允许跨不同集群运行的多个写入器将数据提取到同一个 Hudi 表中。此功能最初是在 Amazon EMR 上的 Hudi 0.9.0 中添加的,并重新用于开源 Hudi 的 0.10.0 版本中。要对此进行配置,应设置以下属性:\n\n![image.png](https://dev-media.amazoncloud.cn/ec47eb608a444660bd1deb1ee7bf6d7c_image.png)\n\n此外,必须通过以下方式启用 Optimistic Concurrency Control:\n\n```\nhoodie.write.concurrency.mode = optimistic_concurrency_control\nhoodie.cleaner.policy.failed.writes = LAZY\n```\n\n您可以使用 EMR 配置 API 及 ```hudi-defaults``` 分类,在集群级别无缝配置这些属性,以避免为每个作业配置此项。\n\n#### **删除分区**\n\nApache Hudi 0.9.0 为其 Spark 数据源 API 引入了 ```DELETE_PARTITION``` 操作,可用于删除分区。以下是如何利用此操作的 scala 示例:\n\n```\nimport org.apache.hudi.DataSourceWriteOptions\nimport org.apache.hudi.config.HoodieWriteConfig\nimport org.apache.spark.sql.SaveMode\n\nval deletePartitionDF = spark.emptyDataFrame\n\ndeletePartitionDF.write.format(\"hudi\")\n .option(HoodieWriteConfig.TABLE_NAME, \"<table name>\")\n .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.DELETE_PARTITION_OPERATION_OPT_VAL)\n .option(DataSourceWriteOptions.PARTITIONS_TO_DELETE.key(), \"<partition_value1>,<partition_value2>\")\n .option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY, DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL)\n .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, \"<record key(s)>\")\n .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, \"<partition field(s)>\") \n .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, \"<precombine key>\")\n .mode(SaveMode.Append)\n .save(\"<table path>\")\n```\n\n但是,这里有个已知问题:\n\n- 由于存在错误,Hive Sync 与 DELETE_PARTITION 操作一起执行时会失败。在删除分区操作之后执行的任何以后的插入/更新插入/删除操作中,Hive Sync 都将成功。此错误已在 Hudi 0.10.0 版本中修复。\n\n#### **异步集群**\n\nApache Hudi 0.9.0 通过 Spark 结构化流式传输接收器和 Delta Streamer,引入了对异步集群的支持。这样,用户就可以继续将数据提取到数据湖中,同时集群服务继续在后台运行以重新组织数据,用于提高查询性能并优化文件大小。这是通过在 Hudi 0.8.0 中引入的 Optimistic Concurrency Control 实现的。目前,只能为没有在接收任何并发更新的分区安排集群。有关如何开始使用此功能的详细信息,可以在此[博文](https://hudi.apache.org/blog/2021/08/23/async-clustering/)中找到。\n\n### **结论**\n\n在这篇文章中,我们介绍了在 Amazon EMR 版本 5.34 和 6.5.0 及更高版本上提供的 Hudi 0.9.0 一些令人兴奋的新功能。这些新功能使得仅使用 SQL 语句即可构建数据管道,从而更轻松地在 Amazon S3 上构建事务性数据湖。\n\n作为后续步骤,要动手体验 EMR 上的 Hudi 0.9.0,请通过[此处](https://github.com/aws-samples/emr-studio-notebook-examples/blob/main/examples/query-hudi-0.9.0-dataset-with-spark-sql.ipynb),在使用 Amazon EMR 6.5.0 版的 EMR Studio 上试用笔记本,并告诉我们您的反馈。\n\n### **本篇作者**\n\n![image.png](https://dev-media.amazoncloud.cn/fcca2e552dca41adb83b639934ac00fa_image.png)\n\n#### **Kunal Gautam**\nKunal Gautam 是亚马逊云科技的高级大数据架构师。 他在创建自己的初创公司以及与企业合作方面拥有丰富的经验,他具备独特的视角,可以让人员、业务和技术人员协同工作,为客户服务。他热衷于帮助客户踏上数字化转型之旅,使他们能够构建可扩展的数据和高级分析解决方案,从而及时获得洞察并制定关键的业务决策。业余时间,Kunal 喜欢跑马拉松、参加技术聚会和冥想静修。\n\n![image.png](https://dev-media.amazoncloud.cn/e221f116728a4193ae8c2464c7252af8_image.png)\n\n#### **Gabriele Cacciola**\n\nGabriele Cacciola 是一名高级数据架构师,在 Amazon Web Services 的专业服务团队工作。他拥有扎实的创业经验,目前正在帮助欧洲、中东和非洲地区的企业客户实施其想法,使用最新技术进行创新,并构建可扩展的数据和分析解决方案,以制定关键的业务决策。在业余时间,Gabriele 喜欢足球和烹饪。\n\n![image.png](https://dev-media.amazoncloud.cn/146baf446b09424bb3eaebe6884eeada_image.png)\n\n#### **Udit Mehrotra**\nUdit Mehrotra 是 Amazon Web Services 的一名软件开发工程师和 Apache Hudi PMC 成员/提交者。他致力于开发 Amazon EMR 前沿功能,并参与了 Apache Hudi、Apache Spark、Apache Hadoop 和 Apache Hive 等开源项目。闲暇之余,他喜欢弹吉他、旅行、刷剧以及和朋友一起出去玩。","render":"<p><a href=\"https://hudi.apache.org/\" target=\"_blank\">Apache Hudi</a> 是一个开源事务性数据湖框架,极大地简化了增量数据处理和数据管道开发。它面向 <a href=\"https://aws.amazon.com/s3/\" target=\"_blank\">Amazon Simple Storage Service(Amazon S3)</a> 或 Apache HDFS 上的数据湖,提供了事务支持和记录级的插入、更新和删除功能,从而实现了简化。Apache Hudi 支持与 Apache Spark、Apache Hive、Presto 和 Trino 等开源大数据分析框架集成。此外,Apache Hudi 还允许您以开放格式(例如 Apache Parquet 和 Apache Avro),在 Amazon S3 或 Apache HDFS 中维护数据。</p>\n<p>客户使用 Apache Hudi 的常见使用场景如下:</p>\n<ul>\n<li>简化数据摄入管道,这些管道用于处理来自流式传输和批量数据源的延迟到达或更新记录。</li>\n<li>使用更改数据捕获(CDC, Change Data Capture)从事务系统中摄入数据。</li>\n<li>实施数据删除管道以遵守数据隐私法规,例如 GDPR(General Data Protection Regulation, 通用数据保护条例)合规性。 遵守 GDPR 是当今现代化数据架构的必要条件,这包括“擦除权”或“被遗忘权”功能,并且可以使用 Apache Hudi 功能来实施,用于替代删除和更新操作。</li>\n</ul>\n<p>我们很高兴地宣布,Apache Hudi 0.9.0 已在 <a href=\"https://aws.amazon.com/emr/\" target=\"_blank\">Amazon EMR</a> 5.34 和 EMR 6.5.0 上提供。这是一个主要<a href=\"https://hudi.apache.org/releases/release-0.9.0/\" target=\"_blank\">版本</a>,其亮点在于包括了 Spark SQL DML 和 DDL 支持,以及其他一些写入器/读取器方面的改进。与 Hudi 0.6.0 相比,我们观察到的查询性能提高了 3 倍,这尤为显著,因此,如果您希望实施事务数据湖来达到极高的更新插入和删除操作速度,或者正在使用旧版本的Hudi,那么这是一个极佳的版本。在这篇文章中,我们将重点介绍 0.9.0 版本提供的以下新功能和改进:</p>\n<ul>\n<li>Spark SQL DML 和 DDL 支持:探索 Spark SQL DML 和 DDL 支持。</li>\n<li>性能改进:探索写入器和查询端引入的性能改进和与性能相关的新功能。</li>\n<li>其他功能:探索其他有用的功能,例如基于 <a href=\"https://aws.amazon.com/dynamodb/\" target=\"_blank\">Amazon DynamoDB</a> 的 Optimistic Concurrency Control(OCC)锁定、删除分区操作等。</li>\n</ul>\n<h3><a id=\"Spark_SQL_DML__DDL__14\"></a><strong>Spark SQL DML 和 DDL 支持</strong></h3>\n<p>最令人兴奋的新功能是 Apache Hudi 0.9.0 增加了对使用 Spark SQL 的 <a href=\"https://hudi.apache.org/docs/0.9.0/quick-start-guide\" target=\"_blank\">DDL/DML</a> 的支持。这使得 Hudi 面向所有人(非工程师、分析师等)提供更好的可访问性和可操作性迈出了重要一步。此外,它还使现有数据集可以轻松地迁移到 Apache Hudi 表中,并且它更接近使用 Spark SQL DML 和 DDL 的低代码范例,因此无需编写 scala/python 代码。</p>\n<p>用户现在可以使用 <code>CREATE TABLE....USING HUDI</code> 和 <code>REATE TABLE ..AS SELECT SQL</code> 语句创建表,以直接在 <a href=\"https://aws.amazon.com/glue/\" target=\"_blank\">Amazon Glue</a> 目录中管理表。</p>\n<p>然后,用户可以使用 <code>INSERT</code>、<code>UPDATE</code>、<code>MERGE INTO</code> 和 <code>DELETE SQL</code> 语句操作数据。<code>INSERT OVERWRITE</code> 语句可用于针对现有批处理 ETL 管道中的表或分区,覆盖其现有数据。</p>\n<p>我们来看一个简单的<a href=\"https://github.com/aws-samples/emr-studio-notebook-examples/blob/main/examples/query-hudi-0.9.0-dataset-with-spark-sql.ipynb\" target=\"_blank\">示例</a>,在该示例中,我们创建了一个 Hudi 表 <code>amazon_customer_review_hudi</code>,模仿了 <a href=\"https://s3.amazonaws.com/amazon-reviews-pds/readme.html\" target=\"_blank\">Amazon Customer Reviews</a> 公有数据集并执行以下活动:</p>\n<ul>\n<li>先决条件:创建 Amazon Simple Storage Service(S3)存储桶 <code>s3://EXAMPLE-BUCKET</code> 和 <code>s3://EXAMPLE-BUCKET-1</code></li>\n<li>创建分区的 Hudi 表 <code>amazon_product_review_hudi</code></li>\n<li>创建源 Hudi 表 <code>amazon_customer_review_parquet_merge_source</code>,其内容将与 <code>amazon_product_review_hudi</code> 表合并</li>\n<li>将数据插入 <code>amazon_customer_review_parquet_merge_source</code> 和 <code>amazon_product_review_hudi</code> 中,以及通过读取</li>\n<li><code>amazon_customer_review_parquet_merge_source</code> 中的数据并与 Hudi 表 <code>amazon_product_review_hudi</code> 合并来执行合并操作</li>\n<li>在 <code>amazon_customer_review_hudi</code> 上对之前插入的记录执行删除操作</li>\n</ul>\n<h3><a id=\"_Spark__31\"></a><strong>配置 Spark 会话</strong></h3>\n<p>我们通过 <a href=\"https://aws.amazon.com/emr/features/studio/\" target=\"_blank\">EMR studio</a> 笔记本使用以下脚本,用于配置 Spark 会话,以便结合使用 Apache Hudi DML 和 DDL 支持。以下<a href=\"https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-hudi-work-with-dataset.html\" target=\"_blank\">示例</a>演示了如何启动交互式 Spark shell、使用 Spark 提交或将 Amazon EMR Notebooks 与 Amazon EMR 上的 Hudi 结合使用。我们建议使用以下 Apache Livy 配置启动 EMR 集群:</p>\n<pre><code class=\"lang-\">[\n {\n &quot;Classification&quot;: &quot;livy-conf&quot;,\n &quot;Properties&quot;: {\n &quot;livy.file.local-dir-whitelist&quot;: &quot;/usr/lib/hudi&quot;\n }\n }\n]\n</code></pre>\n<p>上述配置让您可以在配置 Spark 会话时,直接引用 EMR leader 节点上的本地 <code>/usr/lib/hudi/hudi-spark-bundle.jar</code>。或者,您也可以将 <code>/usr/lib/hudi/hudi-spark-bundle.jar</code> 复制到 HDFS 位置,并在初始化 Spark 会话时引用该项。以下是从笔记本初始化 Spark 会话的命令:</p>\n<pre><code class=\"lang-\">%%configure -f\n{\n &quot;conf&quot; : {\n &quot;spark.jars&quot;:&quot;file:///usr/lib/hudi/hudi-spark-bundle.jar&quot;,\n &quot;spark.serializer&quot;:&quot;org.apache.spark.serializer.KryoSerializer&quot;,\n &quot;spark.sql.extensions&quot;:&quot;org.apache.spark.sql.hudi.HoodieSparkSessionExtension&quot;\n }\n}\n</code></pre>\n<h4><a id=\"_59\"></a><strong>创建表</strong></h4>\n<p>我们<a href=\"https://hudi.apache.org/docs/0.9.0/quick-start-guide#create-table\" target=\"_blank\">创建</a>以下 Apache Hudi 表:<code>amazon_customer_review_hudi</code> 和 <code>amazon_customer_review_parquet_merge_source</code></p>\n<pre><code class=\"lang-\">amazon_customer_review_hudi and amazon_customer_review_parquet_merge_source\n\n%%sql \n\n/****************************\n创建 HUDI 表,该表与包含所选列的 Amazon Customer Reviews 表具有相同架构 \n*****************************/\n\n-- Hudi 0.9.0 配置 https://hudi.apache.org/docs/configurations\n-- Hudi 配置可以在选项块中设置为 hoodie.datasource.hive_sync.assume_date_partitioning = 'false',\n\n\ncreate table if not exists amazon_customer_review_hudi\n ( marketplace string,\n review_id string,\n customer_id string,\n product_title string,\n star_rating int,\n timestamp long ,\n review_date date,\n year string,\n month string ,\n day string\n )\n using hudi\n location 's3://EXAMPLE-BUCKET/my-hudi-dataset/'\n options ( \n type = 'cow', \n primaryKey = 'review_id',\n preCombineField = 'timestamp',\n hoodie.datasource.write.hive_style_partitioning = 'true'\n )\n partitioned by (year,month,day);\n \n\n-- 将位置 's3://EXAMPLE-BUCKET/my-hudi-dataset/' 更改为您已在 AWS 账户中创建的对应 S3 存储桶\n\n%%sql \n/****************************\n创建用作源的 amazon_customer_review_parquet_merge_source,以合并到 amazon_customer_review_hudi。\n该表包含 deleteRecord 列,用于跟踪是否需要删除记录\n*****************************/\n\n\ncreate table if not exists amazon_customer_review_parquet_merge_source \n (\n marketplace string,\n review_id string,\n customer_id string,\n product_title string,\n star_rating int,\n review_date date,\n deleteRecord string\n )\n STORED AS PARQUET\n LOCATION 's3://EXAMPLE-BUCKET-1/toBeMergeData/'\n\n\n-- 将位置 (s3://EXAMPLE-BUCKET-1/toBeMergeData/') 更改为您已在 AWS 账户中创建的对应 S3 存储桶\n</code></pre>\n<p>为了进行比较,如果使用编程方法创建 <code>amazon_customer_review_hudi,则 PySpark</code> 示例代码如下所示。</p>\n<pre><code class=\"lang-\"># 创建 DataFrame\ninputDF = spark.createDataFrame(\n [\n (&quot;Italy&quot;, &quot;11&quot;, &quot;1111&quot;, &quot;table&quot;, 5, 1648126827, &quot;2015/05/02&quot;, &quot;2015&quot;, &quot;05&quot;, &quot;02&quot;),\n (&quot;Spain&quot;, &quot;22&quot;, &quot;2222&quot;, &quot;chair&quot;, 5, 1648126827, &quot;2015/05/02&quot;, &quot;2015&quot;, &quot;05&quot;, &quot;02&quot;) \n ],\n [&quot;marketplace&quot;, &quot;review_id&quot;, &quot;customer_id&quot;, &quot;product_title&quot;, &quot;star_rating&quot;, &quot;timestamp&quot;, &quot;review_date&quot;, &quot;year&quot;, &quot;month&quot;, &quot;day&quot; ]\n)\n\n# 输出 inputDF 的架构 \ninputDF.printSchema()\n\n# 在单个 hudiOptions 变量中指定通用的 DataSourceWriteOptions\nhudiOptions = {\n&quot;hoodie.table.name&quot;: &quot;amazon_customer_review_hudi&quot;,\n&quot;hoodie.datasource.write.recordkey.field&quot;: &quot;review_id&quot;,\n&quot;hoodie.datasource.write.partitionpath.field&quot;: &quot;year,month,day&quot;,\n&quot;hoodie.datasource.write.precombine.field&quot;: &quot;timestamp&quot;,\n&quot;hoodie.datasource.write.hive_style_partitioning&quot;: &quot;true&quot;,\n&quot;hoodie.datasource.hive_sync.enable&quot;: &quot;true&quot;,\n&quot;hoodie.datasource.hive_sync.table&quot;: &quot; amazon_customer_review_hudi&quot;,\n&quot;hoodie.datasource.hive_sync.partition_fields&quot;: &quot;year,month,day&quot;,\n&quot;hoodie.datasource.hive_sync.partition_extractor_class&quot;: &quot;org.apache.hudi.hive.MultiPartKeysValueExtractor&quot;\n}\n\n\n# 创建 Hudi 表并将数据插入位于所指定 S3 位置的 my_hudi_table_1 hudi 表中 \ninputDF.write \\\n .format(&quot;org.apache.hudi&quot;)\\\n .option(&quot;hoodie.datasource.write.operation&quot;, &quot;insert&quot;)\\\n .options(**hudiOptions)\\\n .mode(&quot;append&quot;)\\\n .save(&quot;s3://EXAMPLE-BUCKET/my-hudi-dataset/&quot;) \n</code></pre>\n<h4><a id=\"_Hudi__163\"></a><strong>将数据插入 Hudi 表</strong></h4>\n<p>我们将记录<a href=\"https://hudi.apache.org/docs/0.9.0/quick-start-guide#insert-data\" target=\"_blank\">插入</a>表<code>amazon_customer_review_parquet_merge_source</code> 以供合并操作使用。这包括插入一行以刷新插入、更新和删除</p>\n<pre><code class=\"lang-\">%%sql \n\n/****************************\n 将记录插入 amazon_customer_review_parquet_merge_source 以进行删除 \n*****************************/\n\n-- 将 merge as deleteRecord 设置为 yes 后,将从 amazon_customer_review_hudi 中删除记录\n\ninsert into amazon_customer_review_parquet_merge_source\n select\n 'italy',\n '11',\n '1111',\n 'table',\n 5,\n TO_DATE(CAST(UNIX_TIMESTAMP('2015/05/02', 'yyyy/MM/dd') AS TIMESTAMP)) as review_date,\n 'yes' \n \n \n\n%%sql\n/****************************\n 将记录插入 amazon_customer_review_parquet_merge_source 以进行更新\n*****************************/\n\n-- 在合并后,将从 amazon_customer_review_hudi 使用新的 Star rating 和 product_title 更新记录\n\ninsert into amazon_customer_review_parquet_merge_source\n select\n 'spain',\n '22',\n '2222',\n 'Relaxing chair',\n 4,\n TO_DATE(CAST(UNIX_TIMESTAMP('2015/05/02', 'yyyy/MM/dd') AS TIMESTAMP)) as review_date,\n 'no' \n\n\n%%sql\n/****************************\n 将记录插入 amazon_customer_review_parquet_merge_source 以进行插入 \n*****************************/\n\n-- 合并后,记录将插入 amazon_customer_review_hudi \n\ninsert into amazon_customer_review_parquet_merge_source\n select\n 'uk',\n '33',\n '3333',\n 'hanger',\n 3,\n TO_DATE(CAST(UNIX_TIMESTAMP('2015/05/02', 'yyyy/MM/dd') AS TIMESTAMP)) as review_date,\n 'no' \n</code></pre>\n<p>现在,我们将记录插入用作合并操作目标表的 <code>amazon_customer_review_hudi</code> 表。</p>\n<pre><code class=\"lang-\">%%sql\n\n/****************************\n 在合并后,将记录插入 amazon_customer_review_hudi 表以进行删除 \n*****************************/\n\n-- Spark SQL 日期时间函数 https://spark.apache.org/docs/latest/api/sql/index.html#date_add\n\ninsert into amazon_customer_review_hudi \n select \n 'italy',\n '11',\n '1111',\n 'table',\n 5,\n unix_timestamp(current_timestamp()) as timestamp,\n TO_DATE(CAST(UNIX_TIMESTAMP('2015/05/02', 'yyyy/MM/dd') AS TIMESTAMP)) as review_date,\n date_format(date '2015-05-02', &quot;yyyy&quot;) as year,\n date_format(date '2015-05-02', &quot;MM&quot;) as month,\n date_format(date '2015-05-02', &quot;dd&quot;) as day \n\n\n%%sql\n/****************************\n 在合并后,将记录插入 amazon_customer_review_hudi table 以进行更新 \n*****************************/\n\ninsert into amazon_customer_review_hudi\n select \n 'spain',\n '22',\n '2222',\n 'chair ',\n 5,\n unix_timestamp(current_timestamp()) as timestamp,\n TO_DATE(CAST(UNIX_TIMESTAMP('2015/05/02', 'yyyy/MM/dd') AS TIMESTAMP)) as review_date,\n date_format(date '2015-05-02', &quot;yyyy&quot;) as year,\n date_format(date '2015-05-02', &quot;MM&quot;) as month,\n date_format(date '2015-05-02', &quot;dd&quot;) as day \n</code></pre>\n<h4><a id=\"_268\"></a><strong>合并到</strong></h4>\n<p>我们来执行从 <code>amazon_customer_review_parquet_merge_source</code> <a href=\"https://hudi.apache.org/docs/0.9.0/quick-start-guide#mergeinto\" target=\"_blank\">合并</a>到 <code>amazon_customer_review_hudi</code>。</p>\n<pre><code class=\"lang-\">%%sql \n\n/*************************************\nMergeInto:将源合并到目标 \n**************************************/\n\n-- 源 amazon_customer_review_parquet_merge_source \n-- 目标 amazon_customer_review_hudi\n\nmerge into amazon_customer_review_hudi as target\nusing ( \n select\n marketplace,\n review_id,\n customer_id,\n product_title,\n star_rating,\n review_date,\n deleteRecord,\n date_format(review_date, &quot;yyyy&quot;) as year,\n date_format(review_date, &quot;MM&quot;) as month,\n date_format(review_date, &quot;dd&quot;) as day\n from amazon_customer_review_parquet_merge_source ) source\non target.review_id = source.review_id \nwhen matched and deleteRecord != 'yes' then \n\nupdate set target.timestamp = unix_timestamp(current_timestamp()), \ntarget.star_rating = source.star_rating,\ntarget.product_title = source.product_title\n\nwhen matched and deleteRecord = 'yes' then delete\n\nwhen not matched then insert \n ( target.marketplace,\n target.review_id,\n target.customer_id,\n target.product_title,\n target.star_rating,\n target.timestamp ,\n target.review_date,\n target.year ,\n target.month ,\n target.day\n ) \n values\n (\n source.marketplace,\n source.review_id,\n source.customer_id,\n source.product_title,\n source.star_rating,\n unix_timestamp(current_timestamp()),\n source.review_date,\n source.year ,\n source.month ,\n source.day \n )\n</code></pre>\n<h4><a id=\"_332\"></a><strong>注意事项和限制</strong></h4>\n<ul>\n<li>到目前为止,合并条件只能应用于主键。</li>\n<li><code>-- 合并条件只能用于主键</code></li>\n<li><code>on target.review_id = source.review_id</code></li>\n<li>写入时复制(CoW, Copy on Write)表上支持部分更新,但读取时合并(MoR, Merge on Read)表上不支持。</li>\n<li>目标表的字段不能是 MoR 表的更新表达式的右侧值:</li>\n<li><code>-- 更新将导致错误,因为目标列位于表达式的右侧</code></li>\n<li><code>update set target.star_rating = target.star_rating +1 </code></li>\n</ul>\n<h4><a id=\"_342\"></a><strong>删除记录</strong></h4>\n<p>现在我们<a href=\"https://hudi.apache.org/docs/0.9.0/quick-start-guide#deletes\" target=\"_blank\">删除</a>插入的记录。</p>\n<pre><code class=\"lang-\">%%sql\n\n/*************************************\n从 amazon_customer_review_hudi 表中删除插入的记录 \n**************************************/\nDelete from amazon_customer_review_hudi where review_id == '22'\n\n\n%%sql \n/*************************************\n从 amazon_customer_review_hudi 表中查询已删除的记录 \n**************************************/\nselect * from amazon_customer_review_hudi where review_id == '22'\n</code></pre>\n<h4><a id=\"_362\"></a><strong>架构演变</strong></h4>\n<p>Hudi 支持常见的<a href=\"https://hudi.apache.org/docs/0.9.0/schema_evolution\" target=\"_blank\">模式演变</a>场景,例如添加可为空的字段或提升字段的数据类型。我们将新列 <strong>ssid</strong>(类型为 int)添加到现有 <code>amazon_customer_review_hudi</code> 表中,并使用额外的列插入记录。Hudi 允许查询具有更新的表架构的新旧数据和新数据。</p>\n<pre><code class=\"lang-\">%%sql\n\n/*************************************\n将类型为 int 的新列名 ssid 添加到 amazon_customer_review_hudi 表\n**************************************/\n\nALTER TABLE amazon_customer_review_hudi ADD COLUMNS (ssid int)\n\n%%sql\n/*************************************\n将新记录添加到更改后的表 amazon_customer_review_hudi \n**************************************/\ninsert into amazon_customer_review_hudi\n select \n 'germany',\n '55',\n '5555',\n 'car',\n 5,\n unix_timestamp(current_timestamp()) as timestamp,\n TO_DATE(CAST(UNIX_TIMESTAMP('2015/05/02', 'yyyy/MM/dd') AS TIMESTAMP)) as review_date,\n 10 as ssid,\n date_format(date '2015-05-02', &quot;yyyy&quot;) as year,\n date_format(date '2015-05-02', &quot;MM&quot;) as month,\n date_format(date '2015-05-02', &quot;dd&quot;) as day \n\n%%sql \n/*************************************\n将 ssid 类型从 int 提升为 long \n**************************************/\nALTER TABLE amazon_customer_review_hudi CHANGE COLUMN ssid ssid long\n\n\n%%sql \n/*************************************\n从 amazon_customer_review_hudi 表中查询数据\n**************************************/\nselect * from amazon_customer_review_hudi where review_id == '55'\n</code></pre>\n<h3><a id=\"Spark__407\"></a><strong>Spark 性能改进</strong></h3>\n<h4><a id=\"_409\"></a><strong>查询端改进</strong></h4>\n<p>Apache Hudi 表现在已在元存储中注册为 Spark 数据源表。这使得 Hudi 表上的 Spark SQL 查询能够针对在写入时复制的表使用 Spark 的原生 <a href=\"https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala\" target=\"_blank\">Parquet Reader</a>,针对读取时合并的表使用 Hudi 的自定义 <a href=\"https://github.com/apache/hudi/blob/release-0.9.0/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala\" target=\"_blank\">MergeOnReadSnapshotRelation</a>。因此,它不再依赖于 Spark 中的 Hive 输入格式回退,后者的维护和效率不如 Spark 的原生读取器。这带来了许多优化,例如使用 Spark 的原生 Parquet Reader,以及实施 Hudi 自己的 Spark <a href=\"https://github.com/apache/hudi/blob/release-0.9.0/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieFileIndex.scala\" target=\"_blank\">FileIndex</a> 实施。文件索引通过优化缓存、支持分区修剪以及通过 Hudi 元数据表(而不是直接从 Amazon S3 列出)列出文件的能力,帮助提高文件列表性能。此外,Hudi 现在支持通过 Spark 数据源进行<a href=\"https://hudi.apache.org/docs/quick-start-guide#time-travel-query\" target=\"_blank\">时间旅行查询</a>,这使您可以查询历史时间点的数据集快照。</p>\n<p>其他需要注意的重要事项包括:</p>\n<ul>\n<li>在通过 Spark SQL 进行查询时,不再需要 <code>spark.sql.hive.convertMetastoreParquet=false</code> 和 <code>mapreduce.input.pathFilter.class=org.apache.hudi.hadoop.HoodieROTablePathFilter</code> 这样的配置。</li>\n<li>现在,通过数据源 API 查询 Hudi 数据集时,您可以使用非全局查询路径。这使您可以通过基本路径查询表,而不必在查询路径中指定 <code>*</code>。</li>\n</ul>\n<p>我们运行了一个源于 3 TB 规模 TPC-DS 基准测试的性能基准测试,以确定对于写入时复制的表,EMR 6.5.0 上的 Hudi 0.9.0 相对于 EMR 6.2.0 上的 Hudi 0.6.0(2021 年初)的查询性能提升。这些查询运行在 5 节点的 c5.9xlarge EMR 集群中。</p>\n<p>从几何均值来看,使用 Hudi 0.9.0 的查询比使用 Hudi 0.6.0 的查询要快 <strong>3 倍</strong>。下图比较了对于两个 Amazon EMR/Hudi 版本,在 TPC-DS 3TB 查询数据集中运行全部查询的总累计运行时和运行时几何均值(值越低越好)。</p>\n<p><img src=\"https://dev-media.amazoncloud.cn/a16c5034e146404383e43fd7d615c0d1_image.png\" alt=\"image.png\" /></p>\n<p><img src=\"https://dev-media.amazoncloud.cn/2b459f238e2d42888c489fcf3136a842_image.png\" alt=\"image.png\" /></p>\n<p>从几何均值来看,使用 Hudi 0.9.0 的查询比使用 Hudi 0.6.0 的查询要快 <strong>3 倍</strong>。</p>\n<h4><a id=\"_428\"></a><strong>写入器方面的改进</strong></h4>\n<h5><a id=\"_430\"></a><strong>虚拟键支持</strong></h5>\n<p>Apache Hudi 通过向数据集添加额外的列来维护元数据。这使它能够支持 upsert/delete 操作以及与之相关的各种功能,例如增量查询、压缩等。这些元数据列(即 <code>_hoodie_commit_time</code>、<code>_hoodie_record_key</code>、<code>_hoodie_partition_path</code>、<code>_hoodie_file_name</code> 和 <code>_hoodie_commit_seqno)</code>让 Hudi 可以唯一地标识记录、记录所在的分区/文件以及更新了记录的最后一次提交。</p>\n<p>但是,生成和维护这些元数据列会增加 Hudi 表在磁盘上占用的存储空间。其中一些列,例如 <code>_hoodie_record_key</code> 和 <code>_hoodie_partition_path</code>,可以从已存储在数据集中的其他数据列构造。Apache Hudi 0.9.0 引入了对虚拟键的支持。这使得用户可以禁用这些元数据列的生成,改为依靠实际的数据列,使用相应的键生成器动态构造记录键/分区路径。这有助于减少存储空间占用,以及缩短摄取入时间。但是,此功能需要注意以下几点:</p>\n<ul>\n<li>它只可用于仅附加/不可改变数据。它不能用于需要更新插入和删除的使用场景,这些情况下需要 <code>_hoodie_record_key</code> 和 <code>_hoodie_partition_path</code> 等列,以使 Bloom 索引正常工作。</li>\n<li>不支持增量查询,因为它们需要 <code>_hoodie_commit_time</code> 筛选在特定时间写入/更新的记录。</li>\n<li>启用此功能后,将无法为现有表关闭该功能。</li>\n</ul>\n<p>默认情况下,该功能处于关闭状态,可以通过将 <a href=\"https://hudi.apache.org/docs/0.9.0/configurations#hoodiepopulatemetafields\" target=\"_blank\">hoodie.populate.meta.fields</a> 设置为 false 来启用。我们对公开的 <a href=\"https://s3.amazonaws.com/amazon-reviews-pds/readme.html\" target=\"_blank\">Amazon Customer Reviews 数据集</a>,使用批量插入衡量了写入性能和存储占用空间的改进。以下是我们使用的代码片段:</p>\n<pre><code class=\"lang-\">import org.apache.hudi.DataSourceWriteOptions\nimport org.apache.hudi.config.HoodieWriteConfig\nimport org.apache.spark.sql.SaveMode\n\nvar srcPath = &quot;s3://amazon-reviews-pds/parquet/&quot;\nvar tableName = &quot;amazon_reviews_table&quot;\nvar tablePath = &quot;s3://&lt;bucket&gt;/&lt;prefix&gt;/&quot; + tableName\n\nval inputDF = spark.read.format(&quot;parquet&quot;).load(srcPath)\n\ninputDF.write.format(&quot;hudi&quot;)\n .option(HoodieWriteConfig.TABLE_NAME, tableName)\n .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.BULK_INSERT_OPERATION_OPT_VAL)\n .option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY, DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL)\n .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, &quot;review_id&quot;)\n .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, &quot;product_category&quot;) \n .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, &quot;review_date&quot;)\n .option(&quot;hoodie.populate.meta.fields&quot;, &quot;&lt;true/false&gt;&quot;)\n .mode(SaveMode.Overwrite)\n .save(tablePath)\n</code></pre>\n<p>该实验在四节点 c4.2xlarge EMR 集群(一个 leader,三个 core)上运行。我们观察到,启用虚拟键后,写入运行时性能提高 <strong>10.63%</strong>,存储空间占用减少 <strong>8.67%</strong>。下图比较了使用和不使用虚拟键的批量插入运行时和表大小(值越小越好)</p>\n<p><img src=\"https://dev-media.amazoncloud.cn/7db45b1a0385472594ac4eb4f894241f_image.png\" alt=\"image.png\" /></p>\n<p><img src=\"https://dev-media.amazoncloud.cn/0524cd6db99c4664839ed9521d6491b3_image.png\" alt=\"image.png\" /></p>\n<h4><a id=\"_471\"></a><strong>基于时间轴服务器的标记机制</strong></h4>\n<p>Apache Hudi 在写入操作期间,支持自动清理写入的未提交数据。通过生成与每个数据文件对应的标记文件来支持这种清理,利用这种方法可以跟踪感兴趣的数据文件,而不必通过列出所有文件来扫描整个表。尽管相比在整个表中扫描未提交的数据文件,现有标记机制的效率要高得多,但它仍然会对 Amazon S3 数据湖产生性能影响。例如,写入大量标记文件(每个数据文件一个标记),然后在成功提交后将其删除可能会需要相当长的时间,有时大约需要几分钟。此外,同时写入大量数据/标记文件时,可能会达到 Amazon S3 节流限制。</p>\n<p>Apache Hudi 0.9.0 为这种标记机制引入了基于时间轴服务器的新实施。这提高了整体写入性能,并显著降低达到 Amazon S3 节流限制的可能性,从而提高了处理 Amazon S3 工作负载的效率。新机制将 Hudi 的时间轴服务器组件作为一个中心位置,用于处理(来自所有执行程序的)所有标记创建/删除请求,从而实现了对这些请求的批量处理并减少向 Amazon S3 发出的请求数量。因此,拥有 Amazon S3 数据湖的用户可以利用此功能来提高写操作性能,并避免因标记文件管理而导致的节流。这对于写入大量数据文件(例如 1 万或更多)的场景的影响尤为明显。</p>\n<p>这种新的机制默认情况下不启用,可以通过为写操作将 <code>hoodie.write.markers.type</code> 设置为 <code>timeline_server_based</code> 来启用。有关该功能的详细信息,请参阅 Apache Hudi 社区的此<a href=\"https://hudi.apache.org/blog/2021/08/18/improving-marker-mechanism\" target=\"_blank\">博文</a>。</p>\n<h3><a id=\"_479\"></a><strong>其他改进</strong></h3>\n<h4><a id=\"_DynamoDB__481\"></a><strong>基于 DynamoDB 的锁定</strong></h4>\n<p>Optimistic Concurrency Control 是 Apache Hudi 0.8.0 引入的主要功能之一,它允许多个并发写入器将数据提取到同一个 Hudi 表中。该功能需要获取锁,为此您可以使用 Zookeeper(原定设置为在 EMR 上)或 HiveMetaStore。但是,这些锁提供程序要求所有写入器与 Zookeeper/Hive 元存储运行在同一个集群上。</p>\n<p>Amazon EMR 上的 Apache Hudi 0.9.0 引入了 DynamoDB 作为锁提供程序。这允许跨不同集群运行的多个写入器将数据提取到同一个 Hudi 表中。此功能最初是在 Amazon EMR 上的 Hudi 0.9.0 中添加的,并重新用于开源 Hudi 的 0.10.0 版本中。要对此进行配置,应设置以下属性:</p>\n<p><img src=\"https://dev-media.amazoncloud.cn/ec47eb608a444660bd1deb1ee7bf6d7c_image.png\" alt=\"image.png\" /></p>\n<p>此外,必须通过以下方式启用 Optimistic Concurrency Control:</p>\n<pre><code class=\"lang-\">hoodie.write.concurrency.mode = optimistic_concurrency_control\nhoodie.cleaner.policy.failed.writes = LAZY\n</code></pre>\n<p>您可以使用 EMR 配置 API 及 <code>hudi-defaults</code> 分类,在集群级别无缝配置这些属性,以避免为每个作业配置此项。</p>\n<h4><a id=\"_498\"></a><strong>删除分区</strong></h4>\n<p>Apache Hudi 0.9.0 为其 Spark 数据源 API 引入了 <code>DELETE_PARTITION</code> 操作,可用于删除分区。以下是如何利用此操作的 scala 示例:</p>\n<pre><code class=\"lang-\">import org.apache.hudi.DataSourceWriteOptions\nimport org.apache.hudi.config.HoodieWriteConfig\nimport org.apache.spark.sql.SaveMode\n\nval deletePartitionDF = spark.emptyDataFrame\n\ndeletePartitionDF.write.format(&quot;hudi&quot;)\n .option(HoodieWriteConfig.TABLE_NAME, &quot;&lt;table name&gt;&quot;)\n .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.DELETE_PARTITION_OPERATION_OPT_VAL)\n .option(DataSourceWriteOptions.PARTITIONS_TO_DELETE.key(), &quot;&lt;partition_value1&gt;,&lt;partition_value2&gt;&quot;)\n .option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY, DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL)\n .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, &quot;&lt;record key(s)&gt;&quot;)\n .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, &quot;&lt;partition field(s)&gt;&quot;) \n .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, &quot;&lt;precombine key&gt;&quot;)\n .mode(SaveMode.Append)\n .save(&quot;&lt;table path&gt;&quot;)\n</code></pre>\n<p>但是,这里有个已知问题:</p>\n<ul>\n<li>由于存在错误,Hive Sync 与 DELETE_PARTITION 操作一起执行时会失败。在删除分区操作之后执行的任何以后的插入/更新插入/删除操作中,Hive Sync 都将成功。此错误已在 Hudi 0.10.0 版本中修复。</li>\n</ul>\n<h4><a id=\"_525\"></a><strong>异步集群</strong></h4>\n<p>Apache Hudi 0.9.0 通过 Spark 结构化流式传输接收器和 Delta Streamer,引入了对异步集群的支持。这样,用户就可以继续将数据提取到数据湖中,同时集群服务继续在后台运行以重新组织数据,用于提高查询性能并优化文件大小。这是通过在 Hudi 0.8.0 中引入的 Optimistic Concurrency Control 实现的。目前,只能为没有在接收任何并发更新的分区安排集群。有关如何开始使用此功能的详细信息,可以在此<a href=\"https://hudi.apache.org/blog/2021/08/23/async-clustering/\" target=\"_blank\">博文</a>中找到。</p>\n<h3><a id=\"_529\"></a><strong>结论</strong></h3>\n<p>在这篇文章中,我们介绍了在 Amazon EMR 版本 5.34 和 6.5.0 及更高版本上提供的 Hudi 0.9.0 一些令人兴奋的新功能。这些新功能使得仅使用 SQL 语句即可构建数据管道,从而更轻松地在 Amazon S3 上构建事务性数据湖。</p>\n<p>作为后续步骤,要动手体验 EMR 上的 Hudi 0.9.0,请通过<a href=\"https://github.com/aws-samples/emr-studio-notebook-examples/blob/main/examples/query-hudi-0.9.0-dataset-with-spark-sql.ipynb\" target=\"_blank\">此处</a>,在使用 Amazon EMR 6.5.0 版的 EMR Studio 上试用笔记本,并告诉我们您的反馈。</p>\n<h3><a id=\"_535\"></a><strong>本篇作者</strong></h3>\n<p><img src=\"https://dev-media.amazoncloud.cn/fcca2e552dca41adb83b639934ac00fa_image.png\" alt=\"image.png\" /></p>\n<h4><a id=\"Kunal_Gautam_539\"></a><strong>Kunal Gautam</strong></h4>\n<p>Kunal Gautam 是亚马逊云科技的高级大数据架构师。 他在创建自己的初创公司以及与企业合作方面拥有丰富的经验,他具备独特的视角,可以让人员、业务和技术人员协同工作,为客户服务。他热衷于帮助客户踏上数字化转型之旅,使他们能够构建可扩展的数据和高级分析解决方案,从而及时获得洞察并制定关键的业务决策。业余时间,Kunal 喜欢跑马拉松、参加技术聚会和冥想静修。</p>\n<p><img src=\"https://dev-media.amazoncloud.cn/e221f116728a4193ae8c2464c7252af8_image.png\" alt=\"image.png\" /></p>\n<h4><a id=\"Gabriele_Cacciola_544\"></a><strong>Gabriele Cacciola</strong></h4>\n<p>Gabriele Cacciola 是一名高级数据架构师,在 Amazon Web Services 的专业服务团队工作。他拥有扎实的创业经验,目前正在帮助欧洲、中东和非洲地区的企业客户实施其想法,使用最新技术进行创新,并构建可扩展的数据和分析解决方案,以制定关键的业务决策。在业余时间,Gabriele 喜欢足球和烹饪。</p>\n<p><img src=\"https://dev-media.amazoncloud.cn/146baf446b09424bb3eaebe6884eeada_image.png\" alt=\"image.png\" /></p>\n<h4><a id=\"Udit_Mehrotra_550\"></a><strong>Udit Mehrotra</strong></h4>\n<p>Udit Mehrotra 是 Amazon Web Services 的一名软件开发工程师和 Apache Hudi PMC 成员/提交者。他致力于开发 Amazon EMR 前沿功能,并参与了 Apache Hudi、Apache Spark、Apache Hadoop 和 Apache Hive 等开源项目。闲暇之余,他喜欢弹吉他、旅行、刷剧以及和朋友一起出去玩。</p>\n"}
目录
亚马逊云科技解决方案 基于行业客户应用场景及技术领域的解决方案
联系亚马逊云科技专家
亚马逊云科技解决方案
基于行业客户应用场景及技术领域的解决方案
联系专家
0
目录
关闭