Amazon EMR Hudi 性能调优——Clustering

0
0
{"value":"随着数据体量的日益增长,人们对 Hudi 的查询性能也提出更多要求,除了 Parquet 存储格式本来的性能优势之外,还希望 Hudi 能够提供更多的性能优化的技术途径,尤其当对 Hudi 表进行高并发的写入,产生了大量的小文件之后,又需要使用 Presto/Trino 对 Hudi 表进行高吞吐的即席查询的场景里。怎样处理这些小文件,即把原本是写优化的Hudi 表,让它也能支持读优化,就成了使用 Hudi 的用户需要解决的问题。\n\n本文将通过一个实际的例子,使用 Clustering 技术,对Hudi表的数据文件进行重组和重写,从而提升Hudi表的SQL查询性能。\n\n#### **1.Hudi**\n\nHudi将数据仓库和数据库的核心功能直接引入数据湖。Hudi 提供了表、事务、高效的升级/删除、高级索引、流式摄取服务、数据集群(Clustering)、压缩优化和并发,同时将数据保持为开源文件格式,即可以把 Hudi 表的数据,保存在 HDFS,[Amazon S3](https://aws.amazon.com/cn/s3/?trk=cndc-detail) 等文件系统。\n\nHudi 之所以能快速流行起来,为多数开发用户接受,除了它可以轻松地在任何云平台上使用,并且可以通过任何流行的查询引擎(包括 Apache Spark、Flink、Presto、Trino、Hive 等)来访问Hudi的数据,更为难能可贵的,是Hudi的设计者考虑了尽可能多的业务场景和实际需求。\n\n从实际的业务场景出发,对数据湖平台对需求,首先可以先分为两大类:读偏好和写偏好,所以 Apache Hudi 提供了两种类型的表:\n\n- Copy On Write 表:简称 COW,这类Hudi表使用列文件格式(例如Parquet)存储数据,如果有数据写入,则会对整个 Parquet 文件进行复制,适合读偏好的操作\n- Merge On Read 表:简称 MOR,这类 Hudi 表使用列文件格式(例如Parquet)和行文件格式(例如 Avro)共同存储数据。数据更新时,写到行文件中,然后进行压缩,以同步或异步方式生成列文件,适合写偏好的操作\n\n再细分下来,Hudi 对两种类型的表,提供了不同的查询类型:\n\n- Snapshot Queries:快照查询,查询数据的最新快照,即全部的数据\n- Incremental Queries:增量查询,可以查询指定时间范围内的新增或修改的数据\n- Read Optimized Queries:读取优化查询,对MOR表来说,仅查询Parquet 文件中的数据\n\n以上三种查询类型,读优化查询只能用于MOR表(其实用于 COW 也没什么意义,本来COW就只有 Parquet 文件保存数据),另外两种查询模式,可以用于 COW 表和 MOR 表。\n\n不仅于此, Hudi 还在索引、事务管理、并发、压缩等方面,使用到了很多先进的理念和技术,这也给那些想对 Hudi 表进行性能调优的用户,提供了广阔的空间和更多的手段,例如 Index,Metadata Table,Clustering 等,本文将介绍 Clustering 这一技术。\n\n#### **2.Hudi Clustering**\n\n在数据湖/数据仓库中,关键的权衡之一是写入速度和查询性能之间的权衡。数据写入通常倾向于使用小文件来提高并行性,并使数据尽快可用于查询。但是,如果有很多小文件,查询性能会很差。此外,在写入过程中,数据通常根据到达时间写入同一位置的文件。然而,当频繁查询的数据位于同一位置时,查询引擎的性能会更好。\n\n这就对 Hudi 的数据重组提出了要求,即数据写入时使用小文件,数据查询时使用大文件。\n\n##### 2.1 设定 Hudi 表的 Clustering 参数\n\n在文档 [RFC-19] 中, 作者创建了一个Hudi表,并设定了 Clustering的参数,然后启动了异步 Clustering Job,并对结果进行了对比。请注意,该文档创建 Hudi 表的时候,调用 getQuickstartWriteConfigs 方法来设定参数hoodie.upsert.shuffle.parallelism 为 2,对数据量较大的测试,这显然是不够的。\n\n我们来看一个不同的例子,首先生成一套 TPC-DS 测试数据,它具体包括24个表,以及用于性能测试的99个 SQL 查询语句,生成数据的具体步骤请参考:\n\n[https://aws.amazon.com/cn/blogs/china/improve-the-cost-effectiveness-of-emr-by-using-amazon-graviton2/](https://aws.amazon.com/cn/blogs/china/improve-the-cost-effectiveness-of-emr-by-using-amazon-graviton2/)\n\n创建一个 [Amazon EMR](https://aws.amazon.com/cn/emr/?trk=cndc-detail) 集群,版本6.5.0,硬件配置如下:\n\n![image.png](https://dev-media.amazoncloud.cn/9fbd7afbb4cd4388aee15847e5d3f881_image.png)\n\n使用该集群生成一套100G的TPC-DS数据,大概需要30分钟。\n\n[Amazon EMR](https://aws.amazon.com/cn/emr/?trk=cndc-detail) 提供了 Hudi 组件,接下来用生成的 TPC-DS 数据,来生成一个 Hudi 表,我们选取表 store_sales,脚本如下:\n\n```\\nspark-shell --master yarn \\\\\\n--deploy-mode client \\\\\\n--conf \\"spark.serializer=org.apache.spark.serializer.KryoSerializer\\" \\\\\\n--conf \\"spark.sql.hive.convertMetastoreParquet=false\\" \\\\\\n--packages org.apache.hudi:hudi-spark3-bundle_2.12:0.10.0\\n\\n\\n import org.apache.hudi.QuickstartUtils._\\n import org.apache.spark.sql.SaveMode._\\n import org.apache.hudi.DataSourceReadOptions._\\n import org.apache.hudi.DataSourceWriteOptions._\\n import org.apache.hudi.config.HoodieWriteConfig._\\n import java.util.Date\\n\\n val tableName = \\"store_sales\\"\\n val basePath = \\"s3://dalei-demo/hudi/tpcds_hudi_cluster/store_sales\\"\\n val partitionKey = \\"ss_sold_date_sk\\"\\n\\n val df = spark.read.format(\\"parquet\\").\\n load(s\\"s3://dalei-demo/tpcds/data10g/store_sales\\").\\n filter(\\"ss_sold_time_sk is not null and ss_item_sk is not null and ss_sold_date_sk is not null and ss_customer_sk is not null\\").\\n withColumn(\\"ts\\", lit((new Date()).getTime)).\\n repartition(1000)\\n\\n df.write.format(\\"org.apache.hudi\\").\\n option(TABLE_NAME, tableName).\\n option(\\"hoodie.datasource.write.precombine.field\\", \\"ts\\").\\n option(\\"hoodie.datasource.write.recordkey.field\\", \\"ss_sold_time_sk, ss_item_sk\\").\\n option(\\"hoodie.datasource.write.partitionpath.field\\", partitionKey).\\n option(\\"hoodie.upsert.shuffle.parallelism\\", \\"1000\\").\\n option(\\"hoodie.datasource.write.table.type\\", \\"MERGE_ON_READ\\").\\n option(\\"hoodie.datasource.write.operation\\", \\"upsert\\").\\n option(\\"hoodie.parquet.max.file.size\\", \\"10485760\\").\\n option(\\"hoodie.datasource.write.hive_style_partitioning\\", \\"true\\").\\n option(\\"hoodie.datasource.write.keygenerator.class\\", \\"org.apache.hudi.keygen.ComplexKeyGenerator\\").\\n option(\\"hoodie.datasource.hive_sync.enable\\", \\"true\\").\\n option(\\"hoodie.datasource.hive_sync.mode\\", \\"hms\\").\\n option(\\"hoodie.datasource.hive_sync.database\\", \\"tpcds_hudi_cluster\\").\\n option(\\"hoodie.datasource.hive_sync.table\\", tableName).\\n option(\\"hoodie.datasource.hive_sync.partition_fields\\", partitionKey).\\n option(\\"hoodie.parquet.small.file.limit\\", \\"0\\").\\n option(\\"hoodie.clustering.inline\\", \\"true\\").\\n option(\\"hoodie.clustering.inline.max.commits\\", \\"2\\").\\n option(\\"hoodie.clustering.plan.strategy.max.num.groups\\", \\"10000\\").\\n option(\\"hoodie.clustering.plan.strategy.target.file.max.bytes\\", \\"1073741824\\").\\n option(\\"hoodie.clustering.plan.strategy.small.file.limit\\", \\"629145600\\").\\n option(\\"hoodie.clustering.plan.strategy.sort.columns\\", \\"ss_customer_sk\\").\\n mode(Append).\\n save(basePath);\\n```\n\n解释一下上面代码中用到的主要参数:\n\n- hoodie.upsert.shuffle.parallelism: upsert 时 shuffle 的并发数\n- hoodie.parquet.max.file.size: 指定 Parquet 文件大小,为了对比 Clustering 前后的效果,我们希望能生成大量的小文件,所以这里限制了文件不能过大\n- hoodie.datasource.write.keygenerator.class: 如果是复合主键,需要指定该参数的值为 org.apache.hudi.keygen.ComplexKeyGenerator\n- hoodie.datasource.hive_sync.*: 这些参数是为了将 Hudi 表的 Schema 信息同步到 Hive MetaStore\n- hoodie.parquet.small.file.limit: 如果小于指定的值,将被看作是小文件,Upsert 时会将小文件替换成大文件(所谓的“扩展”),而不是新生成一个文件,将该值设为 0,即关闭了小文件限制,这样每次写入数据都会生成新的文件\n- hoodie.clustering.inline: 启用同步的 Clustering, 即 Commit 数量一旦达到,马上执行 Clustering\n- hoodie.clustering.inline.max.commits: 多少次 Commit 之后,就开始执行Clustering\n- hoodie.clustering.plan.strategy.max.num.groups: Clustering 将产生多少个File Group,默认30个\n- hoodie.clustering.plan.strategy.target.file.max.bytes: Clustering 后文件大小的限制\n- hoodie.clustering.plan.strategy.small.file.limit: 小于该值的文件,会被 Clustering\n- hoodie.clustering.plan.strategy.sort.columns: Clustering 时,使用该字段排序\n\n参数可以使用定义在 org.apache.hudi.DataSourceWriteOptions 里的常量(例如TABLE_NAME),也可以直接使用字符串(例如” hoodie.datasource.write.table.name”),效果都是一样的。\n\n##### 2.2 触发 Clustering\n\n之前的操作只是创建了Hudi表和配置了 Clustering,由于 Commit 次数不到2次(请注意看配置参数,之前的 Upsert 是1次 Commit),所以还没有触发 Clustering. 大家可以先把Commit理解为一次 Upsert操作。\n\n我们再模拟一次 Commit 操作,对 store_sales 表的一个分区的某个字段做修改,然后再Upsert到表里,代码如下:\n\n```\\nval df1 = spark.read.format(\\"hudi\\").option(\\"hoodie.datasource.query.type\\", \\"read_optimized\\").\\n load(\\"s3://dalei-demo/hudi/tpcds_hudi_cluster/store_sales\\").\\n filter(\\"ss_sold_date_sk=2450816\\").\\n drop(col(\\"_hoodie_commit_seqno\\")).drop(col(\\"_hoodie_commit_time\\")).\\n drop(col(\\"_hoodie_record_key\\")).drop(col(\\"_hoodie_partition_path\\")).\\n drop(col(\\"_hoodie_file_name\\"))\\n\\n val df2 = df1.withColumn(\\"ss_ext_tax\\", col(\\"ss_ext_tax\\") + lit(1.0))\\n\\n\\n df2.write.format(\\"org.apache.hudi\\").\\n option(TABLE_NAME, tableName).\\n option(\\"hoodie.datasource.write.precombine.field\\", \\"ts\\").\\n option(\\"hoodie.datasource.write.recordkey.field\\", \\"ss_sold_time_sk, ss_item_sk\\").\\n option(\\"hoodie.datasource.write.partitionpath.field\\", partitionKey).\\n option(\\"hoodie.upsert.shuffle.parallelism\\", \\"1000\\").\\n option(\\"hoodie.datasource.write.table.type\\", \\"MERGE_ON_READ\\").\\n option(\\"hoodie.datasource.write.operation\\", \\"upsert\\").\\n option(\\"hoodie.parquet.max.file.size\\", \\"10485760\\").\\n option(\\"hoodie.datasource.write.hive_style_partitioning\\", \\"true\\").\\n option(\\"hoodie.datasource.write.keygenerator.class\\", \\"org.apache.hudi.keygen.ComplexKeyGenerator\\").\\n option(\\"hoodie.datasource.hive_sync.enable\\", \\"true\\").\\n option(\\"hoodie.datasource.hive_sync.mode\\", \\"hms\\").\\n option(\\"hoodie.datasource.hive_sync.database\\", \\"tpcds_hudi_cluster\\").\\n option(\\"hoodie.datasource.hive_sync.table\\", tableName).\\n option(\\"hoodie.datasource.hive_sync.partition_fields\\", partitionKey).\\n option(\\"hoodie.parquet.small.file.limit\\", \\"0\\").\\n option(\\"hoodie.clustering.inline\\", \\"true\\").\\n option(\\"hoodie.clustering.inline.max.commits\\", \\"2\\").\\n option(\\"hoodie.clustering.plan.strategy.max.num.groups\\", \\"10000\\").\\n option(\\"hoodie.clustering.plan.strategy.target.file.max.bytes\\", \\"1073741824\\").\\n option(\\"hoodie.clustering.plan.strategy.small.file.limit\\", \\"629145600\\").\\n option(\\"hoodie.clustering.plan.strategy.sort.columns\\", \\"ss_customer_sk\\").\\n mode(Append).\\n save(basePath);\\n```\n\n代码执行完后,Commit 次数达到2次,Clustering 已经做后台执行了。\n\n**2.3 解释 Clustering 操作过程**\n\n在解释 Clustering 之前,先介绍一下 Hudi 表的操作文件的构成。\n\n##### 2.3.1 Hudi 表的操作文件\n\n以前面生成的 store_sales 表为例,在.hoodie 目录下,包含了该表的操作记录,如下图:\n\n![image.png](https://dev-media.amazoncloud.cn/970925a0ef4e460da9b35f1b210c37ef_image.png)\n\n图1: Hudi 表的操作文件\n\nHudi 操作的文件名,通常有三个部分组成:\n\n- Instant Time: 操作的时间,一个17位的时间戳(8位日期 + 9位时间,精确到毫秒)\n- Instant Action: 操作的类型,前端执行 Upsert 时,会产生的操作类型是deltacommit; Clustering会产生的操作类型是 replacecommit\n- Instant State: 操作的状态,requested 表示请求,inflight 表示正在进行,状态为空表示已经执行完成\n\n可以把 Clustering 请求文件20220701161238291.replacecommit.requested 文件下载下来,因为它是 Avro 格式的,使用 avro-tools 来查看一下它的内容:\n\n```\\n[ec2-user@cm ~]\$ aws s3 cp s3://dalei-demo/hudi/tpcds_hudi_cluster/store_sales/.hoodie/20220701161238291.replacecommit.requested ./\\n\\n[ec2-user@cm ~]\$ wget http://archive.apache.org/dist/avro/avro-1.9.2/java/avro-tools-1.9.2.jar\\n\\n[ec2-user@cm ~]\$ java -jar avro-tools-1.9.2.jar tojson 20220701161238291.replacecommit.requested >> 20220701161238291.replacecommit.requested.json\\n```\n\n可以使用浏览器将文件打开,如下图:\n\n![image.png](https://dev-media.amazoncloud.cn/fe6587e282a448878c607fedf44b7de6_image.png)\n\n图2: Clustering 请求文件\n\n上图中的inputGroups就是File Group,slices就是File Slice,还有File ID, 这三个概念在2.3.2里会介绍,这个文件就是发起了一个Clustering操作的请求,要把这些文件作为输入,生成更大的文件来替换它们,生成的文件也会在2.3.2里介绍。\n\n20220701161238291.replacecommit.inflight 文件的大小为0,说明 Clustering 已经立即完成了,再来看20220701161238291.replacecommit 文件,它是 json 格式的文件,可以直接打开,内容如下:\n\n```\\n{\\n \\"partitionToWriteStats\\" : {\\n \\"ss_sold_date_sk=2451080\\" : [ {\\n \\"fileId\\" : \\"91377ca5-48a9-491a-9c82-56a1ba4ba2e3-0\\",\\n \\"path\\" : \\"ss_sold_date_sk=2451080/91377ca5-48a9-491a-9c82-56a1ba4ba2e3-0_263-1967-116065_20220701161238291.parquet\\",\\n \\"prevCommit\\" : \\"null\\",\\n \\"numWrites\\" : 191119,\\n \\"numDeletes\\" : 0,\\n \\"numUpdateWrites\\" : 0,\\n \\"numInserts\\" : 191119,\\n \\"totalWriteBytes\\" : 11033199,\\n \\"totalWriteErrors\\" : 0,\\n \\"tempPath\\" : null,\\n \\"partitionPath\\" : \\"ss_sold_date_sk=2451080\\",\\n \\"totalLogRecords\\" : 0,\\n \\"totalLogFilesCompacted\\" : 0,\\n \\"totalLogSizeCompacted\\" : 0,\\n \\"totalUpdatedRecordsCompacted\\" : 0,\\n \\"totalLogBlocks\\" : 0,\\n \\"totalCorruptLogBlock\\" : 0,\\n \\"totalRollbackBlocks\\" : 0,\\n \\"fileSizeInBytes\\" : 11033199,\\n \\"minEventTime\\" : null,\\n \\"maxEventTime\\" : null\\n } ],\\n ......\\n },\\n \\"compacted\\" : false,\\n \\"extraMetadata\\" : {\\n \\"schema\\" : \\"{\\\\\\"type\\\\\\":\\\\\\"record\\\\\\",\\\\\\"name\\\\\\":\\\\\\"store_sales_record\\\\\\",\\\\\\"namespace\\\\\\":\\\\\\"hoodie.store_sales\\\\\\",\\\\\\"fields\\\\\\":[{\\\\\\"name\\\\\\":\\\\\\"ss_sold_time_sk\\\\\\",\\\\\\"type\\\\\\":[\\\\\\"null\\\\\\",\\\\\\"int\\\\\\"],\\\\\\"default\\\\\\":null},{\\\\\\"name\\\\\\":\\\\\\"ss_item_sk\\\\\\",\\\\\\"type\\\\\\":[\\\\\\"null\\\\\\",\\\\\\"int\\\\\\"],\\\\\\"default\\\\\\":null},{\\\\\\"name\\\\\\":\\\\\\"ss_customer_sk\\\\\\",\\\\\\"type\\\\\\":[\\\\\\"null\\\\\\",\\\\\\"int\\\\\\"],\\\\\\"default\\\\\\":null},{\\\\\\"name\\\\\\":\\\\\\"ss_cdemo_sk\\\\\\",\\\\\\"type\\\\\\":[\\\\\\"null\\\\\\",\\\\\\"int\\\\\\"],\\\\\\"default\\\\\\":null},{\\\\\\"name\\\\\\":\\\\\\"ss_hdemo_sk\\\\\\",\\\\\\"type\\\\\\":[\\\\\\"null\\\\\\",\\\\\\"int\\\\\\"],\\\\\\"default\\\\\\":null},{\\\\\\"name\\\\\\":\\\\\\"ss_addr_sk\\\\\\",\\\\\\"type\\\\\\":[\\\\\\"null\\\\\\",\\\\\\"int\\\\\\"],\\\\\\"default\\\\\\":null},{\\\\\\"name\\\\\\":\\\\\\"ss_store_sk\\\\\\",\\\\\\"type\\\\\\":[\\\\\\"null\\\\\\",\\\\\\"int\\\\\\"],\\\\\\"default\\\\\\":null},{\\\\\\"name\\\\\\":\\\\\\"ss_promo_sk\\\\\\",\\\\\\"type\\\\\\":[\\\\\\"null\\\\\\",\\\\\\"int\\\\\\"],\\\\\\"default\\\\\\":null},{\\\\\\"name\\\\\\":\\\\\\"ss_ticket_number\\\\\\",\\\\\\"type\\\\\\":[\\\\\\"null\\\\\\",\\\\\\"long\\\\\\"],\\\\\\"default\\\\\\":null},{\\\\\\"name\\\\\\":\\\\\\"ss_quantity\\\\\\",\\\\\\"type\\\\\\":[\\\\\\"null\\\\\\",\\\\\\"int\\\\\\"],\\\\\\"default\\\\\\":null},{\\\\\\"name\\\\\\":\\\\\\"ss_wholesale_cost\\\\\\",\\\\\\"type\\\\\\":[\\\\\\"null\\\\\\",\\\\\\"double\\\\\\"],\\\\\\"default\\\\\\":null},{\\\\\\"name\\\\\\":\\\\\\"ss_list_price\\\\\\",\\\\\\"type\\\\\\":[\\\\\\"null\\\\\\",\\\\\\"double\\\\\\"],\\\\\\"default\\\\\\":null},{\\\\\\"name\\\\\\":\\\\\\"ss_sales_price\\\\\\",\\\\\\"type\\\\\\":[\\\\\\"null\\\\\\",\\\\\\"double\\\\\\"],\\\\\\"default\\\\\\":null},{\\\\\\"name\\\\\\":\\\\\\"ss_ext_discount_amt\\\\\\",\\\\\\"type\\\\\\":[\\\\\\"null\\\\\\",\\\\\\"double\\\\\\"],\\\\\\"default\\\\\\":null},{\\\\\\"name\\\\\\":\\\\\\"ss_ext_sales_price\\\\\\",\\\\\\"type\\\\\\":[\\\\\\"null\\\\\\",\\\\\\"double\\\\\\"],\\\\\\"default\\\\\\":null},{\\\\\\"name\\\\\\":\\\\\\"ss_ext_wholesale_cost\\\\\\",\\\\\\"type\\\\\\":[\\\\\\"null\\\\\\",\\\\\\"double\\\\\\"],\\\\\\"default\\\\\\":null},{\\\\\\"name\\\\\\":\\\\\\"ss_ext_list_price\\\\\\",\\\\\\"type\\\\\\":[\\\\\\"null\\\\\\",\\\\\\"double\\\\\\"],\\\\\\"default\\\\\\":null},{\\\\\\"name\\\\\\":\\\\\\"ss_ext_tax\\\\\\",\\\\\\"type\\\\\\":[\\\\\\"null\\\\\\",\\\\\\"double\\\\\\"],\\\\\\"default\\\\\\":null},{\\\\\\"name\\\\\\":\\\\\\"ss_coupon_amt\\\\\\",\\\\\\"type\\\\\\":[\\\\\\"null\\\\\\",\\\\\\"double\\\\\\"],\\\\\\"default\\\\\\":null},{\\\\\\"name\\\\\\":\\\\\\"ss_net_paid\\\\\\",\\\\\\"type\\\\\\":[\\\\\\"null\\\\\\",\\\\\\"double\\\\\\"],\\\\\\"default\\\\\\":null},{\\\\\\"name\\\\\\":\\\\\\"ss_net_paid_inc_tax\\\\\\",\\\\\\"type\\\\\\":[\\\\\\"null\\\\\\",\\\\\\"double\\\\\\"],\\\\\\"default\\\\\\":null},{\\\\\\"name\\\\\\":\\\\\\"ss_net_profit\\\\\\",\\\\\\"type\\\\\\":[\\\\\\"null\\\\\\",\\\\\\"double\\\\\\"],\\\\\\"default\\\\\\":null},{\\\\\\"name\\\\\\":\\\\\\"ts\\\\\\",\\\\\\"type\\\\\\":\\\\\\"long\\\\\\"},{\\\\\\"name\\\\\\":\\\\\\"ss_sold_date_sk\\\\\\",\\\\\\"type\\\\\\":[\\\\\\"null\\\\\\",\\\\\\"int\\\\\\"],\\\\\\"default\\\\\\":null}]}\\"\\n },\\n \\"operationType\\" : \\"CLUSTER\\",\\n \\"partitionToReplaceFileIds\\" : {\\n \\"ss_sold_date_sk=2451080\\" : [ \\"2e2bec06-78fb-4059-ad89-2914f63dd1c0-0\\", \\"63fc2a2d-73e6-4261-ab30-ff44912e1696-0\\", \\"fc5fd42e-0f3f-434c-aa56-ca43c36c659d-0\\", \\"41299b3d-0be9-4338-bbad-6feeb41d4975-0\\", \\"c23873a1-03a3-424a-aa9c-044b40f1659f-0\\", \\"8af23590-4b8c-4b44-946e-0fdd73747e19-0\\", \\"7d740b43-83ca-48ca-a9dc-6b8e19fce6f0-0\\", \\"bc90dfd5-7323-4786-832c-4a6516332adf-0\\", \\"67abd081-dfcc-45d9-8f29-50a4fb71108c-0\\", \\"80bffa2b-df05-4c9f-9766-84a700403a89-0\\", \\"cbba9f2a-32cd-4c73-a38b-570cbb5501e4-0\\", \\"ea59e1a4-1f97-40e8-baae-3bedc5752095-0\\", \\"55cffcb6-5410-4c2a-a61d-01300be50171-0\\", \\"601b74b3-663d-4ef8-bf5e-158f135f81ea-0\\", \\"c46e8539-418e-482d-936e-a79464d869ac-0\\", \\"3dbe1997-bfc2-41a7-ac12-f302d3013c87-0\\", \\"acf9be44-71a3-436f-b595-c0f322f34172-0\\", \\"d7bbe517-87c7-482c-b885-a16164062b81-0\\", \\"f1060ef7-ba7c-4b8e-abc3-c409cd6af7d4-0\\" ],\\n ......\\n },\\n \\"writePartitionPaths\\" : [ \\"ss_sold_date_sk=2451080\\", ......],\\n \\"fileIdAndRelativePaths\\" : {\\n \\"742c6044-4f76-4d04-993c-d4255235d484-0\\" : \\"ss_sold_date_sk=2451329/742c6044-4f76-4d04-993c-d4255235d484-0_511-1967-116236_20220701161238291.parquet\\",\\n \\"20dafb58-8ae7-41d6-a02d-2b529bcdcc83-0\\" : \\"ss_sold_date_sk=2452226/20dafb58-8ae7-41d6-a02d-2b529bcdcc83-0_1407-1967-116870_20220701161238291.parquet\\",\\n ......\\n },\\n \\"totalRecordsDeleted\\" : 0,\\n \\"totalLogRecordsCompacted\\" : 0,\\n \\"totalLogFilesCompacted\\" : 0,\\n \\"totalCompactedRecordsUpdated\\" : 0,\\n \\"totalLogFilesSize\\" : 0,\\n \\"totalScanTime\\" : 0,\\n \\"totalCreateTime\\" : 151847,\\n \\"totalUpsertTime\\" : 0,\\n \\"minAndMaxEventTime\\" : {\\n \\"Optional.empty\\" : {\\n \\"val\\" : null,\\n \\"present\\" : false\\n }\\n }\\n}\\n```\n\n上面省略了大量重复的内容,主要的信息如下:\n\n- partitionToWriteStats: 列出将要被 Clustering 的分区,以及将要被 Clustering 的文件的信息\n- extraMetadata: Hudi 表的 Schema\n- operationType: 表示操作类型是 Clustering\n- partitionToReplaceFileIds: 列出将要被 Clustering 的分区和文件ID\n- fileIdAndRelativePaths: Clustering 产生的新的文件,请注意,文件名的时间戳\n\n##### 2.3.2 Hudi 表的数据文件\n\n接下来介绍一下 Hudi 表的数据文件的构成,以MOR类型的表为例,如下图:\n\n![image.png](https://dev-media.amazoncloud.cn/ba558b5e714f459da94fd9b6895893ce_image.png)\n\n图3: MOR 表的文件结构\n\n可以看出,文件的层级和包含关系是:Partition -> File Group -> File Slice -> Parquet + Log, 其中:\n\n- Partition: 分区,大家都比较熟悉了,有的表也可以没有Partition\n- File Group: 用于控制文件版本,同一个 File Group 里有唯一的File ID\n- File Slice: 用于组织文件数据,同一个 File Slice 里,不但File ID要相同,Instant Time 也要相同\n- Parquet文件是列存格式文件,Log 文件是行存文件格式,缺省值为Apache Avro, 它记录的是对同一个 File Slice 里的 Parquet 文件的修改。\n\n来看一个 File Group 的例子:\n\n![image.png](https://dev-media.amazoncloud.cn/c7e0499414624263aef2956f3d7f1c16_image.png)\n\n图4: FileGroup的例子\n\n在图4中,第一个文件和第二个文件的 File ID 相同,说明是同一个File Group,但是 Instant Time 不同,说明不是同一个 File Slice, 使用读优化的方式查询数据的时候,会读取 Instant Time更大的Parquet文件。\n\n来看 store_sales 表的数据文件,如下图:\n\n![image.png](https://dev-media.amazoncloud.cn/9c67673c0fb04e3fbbb4617b9e05e9ba_image.png)\n\n图5: store_sales表的数据文件\n\n在图5中,标记的Log文件和Parquet文件,具有相同的 File ID 和Instant Time,表示这个 Log 文件是在 Parquet 文件基础上做的Upsert 后产生的,它们属于同一个File Slice,如果是Snapshot查询,需要把同一个 File Slice 的 Log 文件和 Parquet 的数据一起读出来。\n\n在图5中,“68c14d48-cba6-4f82-a4b5-48fadf1282f6-0_0-1967-115358_20220701161238291.parquet”文件就是 clustering 生成的文件,可以把它下载下来,使用 parquet-tool 看看它的数据,如下:\n\n```\\n[ec2-user@cm ~]\$ wget http://logservice-resource.oss-cn-shanghai.aliyuncs.com/tools/parquet-tools-1.6.0rc3-SNAPSHOT.jar\\n\\n[ec2-user@cm ~]\$ aws s3 cp s3://dalei-demo/hudi/tpcds_hudi_cluster/store_sales/ss_sold_date_sk=2450816/68c14d48-cba6-4f82-a4b5-48fadf1282f6-0_0-1967-115358_20220701161238291.parquet ./\\n\\n[ec2-user@cm ~]\$ java -jar ./parquet-tools-1.6.0rc3-SNAPSHOT.jar head -n 10 68c14d48-cba6-4f82-a4b5-48fadf1282f6-0_0-1967-115358_20220701161238291.parquet\\n```\n\n上面的命令显示了10条 Clustering 后的 Parquet 文件的数据,注意观察Sort Column (ss_customer_sk) 的值,已经是排序的了。\n\n对比Clustering前后的文件,可以看出,原来用10个1M左右的Parquet文件保存的数据,Clustering 后只有1个5.1M的 Parquet 文件。至于保存相同条数的数据,文件的总容量为什么会下降这么多,请参考Parquet的相关知识:[https://parquet.apache.org/](https://parquet.apache.org/) .\n\n##### 2.3.3 多分区表的Clustering\n\n默认情况下, Hudi出于对工作负载的考虑,使用 hoodie.clustering.plan.strategy.max.num.groups 这个参数 (默认值是30),规定了Clustering只会创建30个File Group. (根据文件大小的设定,目前每个分区只需创建1个File Group)\n\n如果分区比较多的话,可以通过hoodie.clustering.plan.partition.filter.mode 参数,来规划Clustering的分区范围,具体可以参考:[https://hudi.apache.org/docs/configurations/#Clustering-Configs .](https://hudi.apache.org/docs/configurations/#Clustering-Configs .)\n\n#### **3.使用 Trino 来查询数据**\n\n##### **3.1 准备其它的表**\n\nstore_sales 表的数据就准备好了,类似的,我们也可以生成customer_address,customer,date_dim,item 这4个表,它们都是测试查询要用到的表。这4个表都是维度表,变化不会很频繁,所以都生成 COW 表,生成 customer_address 表的代码如下:\n\n```\\n val tableName = \\"customer_address\\"\\n val basePath = \\"s3://dalei-demo/hudi/tpcds_hudi_cluster/customer_address\\"\\n\\n val df = spark.read.format(\\"parquet\\").\\n load(s\\"s3://dalei-demo/tpcds/data10g/customer_address\\").\\n filter(\\"ca_address_sk is not null\\")\\n\\n df.write.format(\\"org.apache.hudi\\").\\n option(TABLE_NAME, tableName).\\n option(\\"hoodie.datasource.write.precombine.field\\", \\"ca_address_id\\").\\n option(\\"hoodie.datasource.write.recordkey.field\\", \\"ca_address_sk\\").\\n option(\\"hoodie.upsert.shuffle.parallelism\\", \\"100\\").\\n option(\\"hoodie.datasource.write.table.type\\", \\"COPY_ON_WRITE\\").\\n option(\\"hoodie.datasource.write.operation\\", \\"upsert\\").\\n option(\\"hoodie.parquet.max.file.size\\", \\"10485760\\").\\n option(\\"hoodie.datasource.hive_sync.enable\\", \\"true\\").\\n option(\\"hoodie.datasource.hive_sync.mode\\", \\"hms\\").\\n option(\\"hoodie.datasource.hive_sync.database\\", \\"tpcds_hudi_cluster\\").\\n option(\\"hoodie.datasource.hive_sync.table\\", tableName).\\n option(\\"hoodie.parquet.small.file.limit\\", \\"0\\").\\n option(\\"hoodie.clustering.inline\\", \\"true\\").\\n option(\\"hoodie.clustering.inline.max.commits\\", \\"2\\").\\n option(\\"hoodie.clustering.plan.strategy.target.file.max.bytes\\", \\"1073741824\\").\\n option(\\"hoodie.clustering.plan.strategy.small.file.limit\\", \\"629145600\\").\\n option(\\"hoodie.clustering.plan.strategy.sort.columns\\", \\"\\").\\n mode(Append).\\n save(basePath);\\n```\n\n触发 Clustering 的代码如下:\n\n```\\nval df1 = spark.read.format(\\"hudi\\").option(\\"hoodie.datasource.query.type\\", \\"read_optimized\\").\\n load(\\"s3://dalei-demo/hudi/tpcds_hudi_cluster/customer_address\\")\\n val df2 = df1.withColumn(\\"ca_gmt_offset\\", col(\\"ca_gmt_offset\\") + lit(1.1))\\n\\n df2.write.format(\\"org.apache.hudi\\").\\n option(TABLE_NAME, tableName).\\n option(\\"hoodie.datasource.write.precombine.field\\", \\"ca_address_id\\").\\n option(\\"hoodie.datasource.write.recordkey.field\\", \\"ca_address_sk\\").\\n option(\\"hoodie.upsert.shuffle.parallelism\\", \\"100\\").\\n option(\\"hoodie.datasource.write.table.type\\", \\"COPY_ON_WRITE\\").\\n option(\\"hoodie.datasource.write.operation\\", \\"upsert\\").\\n option(\\"hoodie.parquet.max.file.size\\", \\"10485760\\").\\n option(\\"hoodie.datasource.hive_sync.enable\\", \\"true\\").\\n option(\\"hoodie.datasource.hive_sync.mode\\", \\"hms\\").\\n option(\\"hoodie.datasource.hive_sync.database\\", \\"tpcds_hudi_cluster\\").\\n option(\\"hoodie.datasource.hive_sync.table\\", tableName).\\n option(\\"hoodie.parquet.small.file.limit\\", \\"0\\").\\n option(\\"hoodie.clustering.inline\\", \\"true\\").\\n option(\\"hoodie.clustering.inline.max.commits\\", \\"2\\").\\n option(\\"hoodie.clustering.plan.strategy.target.file.max.bytes\\", \\"1073741824\\").\\n option(\\"hoodie.clustering.plan.strategy.small.file.limit\\", \\"629145600\\").\\n option(\\"hoodie.clustering.plan.strategy.sort.columns\\", \\"\\").\\n mode(Append).\\n save(basePath);\\n```\n\n其余三个表的生成语句,跟 customer_address 表类似,大家可以试着生成一下。\n\n为了对比,我们还要生成一组同命名的没有使用 Clustering 的表,可以把这两组表,分别放到不同的Hive Database里,例如tpcds_hudi_cluster 和 pcds_hudi_nocluster. 生成没有Clustering 表的脚本,跟生成 Clustering 表的脚本类似,只要把Clustering 相关的参数拿掉就可以了。\n\n##### **3.2 查询**\n\n[Amazon EMR](https://aws.amazon.com/cn/emr/?trk=cndc-detail) 6.5.0里提供了 Trino360, 我们使用它来测试Hudi表的SQL查询性能。启动命令如下:\n/usr/lib/trino/bin/trino-cli-360-executable –server localhost:8889 –catalog hive –schema tpcds_hudi_cluster\n\n如果按照2.1中生成了 TPC-DS 测试数据,就会看到一起生成的用于测试的查询语句,我们用 q6.sql 来测试,脚本如下:\n\n```\\n--q6.sql--\\n\\nSELECT state, cnt FROM (\\n SELECT a.ca_state state, count(*) cnt\\n FROM\\n customer_address a, customer c, store_sales_ro s, date_dim d, item i\\n WHERE a.ca_address_sk = c.c_current_addr_sk\\n AND c.c_customer_sk = s.ss_customer_sk\\n AND s.ss_sold_date_sk = d.d_date_sk\\n AND s.ss_item_sk = i.i_item_sk\\n AND d.d_month_seq =\\n (SELECT distinct (d_month_seq) FROM date_dim\\n WHERE d_year = 2001 AND d_moy = 1)\\n AND i.i_current_price > 1.2 *\\n (SELECT avg(j.i_current_price) FROM item j\\n WHERE j.i_category = i.i_category)\\n GROUP BY a.ca_state\\n) x\\nWHERE cnt >= 10\\nORDER BY cnt LIMIT 100\\n```\n\n对没有使用Clustering的Hudi表的查询如下:\n\n![image.png](https://dev-media.amazoncloud.cn/11cb4b4c7896459cb4388311391a922c_image.png)\n\n图6: 没有Clustering的Hudi表的查询\n\n对使用Clustering的Hudi表的查询如下:\n\n![image.png](https://dev-media.amazoncloud.cn/6cc2141693394933b618c7da6e5e736c_image.png)\n\n图7: 使用Clustering的Hudi表查询\n\n 可以看出:使用了 Clustering 的 Hudi 表,对比没有使用Clustering 的 Hudi 表,查询性能提升了35.4% , 读取记录数相同,读取文件容量大幅下降了。\n\n#### **4.对于使用Clustering的一些建议**\n\n##### 4.1 对Upsert的影响\n\n当执行 Clustering 的时候,对 File Group 实现的是 Snapshot Isolation 级别的隔离,所以对他们的修改是不允许对,也就是说,如果有 Upsert 和 Compaction 操作(MOR表),都要等 Clustering 结束之后\n\n##### 4.2 对负载的考虑\n\n如果需要 Clustering 的表数据量比较大,分区比较多的话,做一次Clustering 也会产生大量的负载,所以对 Clustering 的范围,Hudi提供了多种选择。对既要高并发写,又要高吞吐读对表,可以在高并发写的波谷时段,例如夜间进行 Clustering \n\n##### 4.3 同步 or 异步\n\nudi提供了两种 Clustering 方式,同步和异步。在对Hudi表高并发写的时候,不建议使用同步 Clustering,可以参考 [RFC-19] 中的方法,使用命令进行异步 Clustering.\n\n##### 4.4 要不要选择Sort Column\n\n如果有些字段经常要用来做 Join,而且该字段的值又可以保证非空的情况下,是可以放到 Sort Column 里的,如果 Clustering 之后还有多个文件,Sort Column 有助于确认每个文件里该字段的范围,可以避免过多的文件读取,提升 Join 操作的性能。原理上有点类似Hive Clustering,请参考:https://www.upgrad.com/blog/bucketing-in-hive/ .\n\n有兴趣的朋友可以对比一下,选不选择Sort Column在Join查询性能上的差别。\n\n##### 4.5 Clustering 等同于大文件吗?\n\n有人会说,Clustering 就是把小文件合并成大文件,那创建Hudi表的时候,我直接选择大文件不就可以了吗?如果只是考虑读性能,这么做确实可以。但 Clustering 提供了更多的选择,对于那种有时候高并发写(适合小文件),有时候高吞吐读(适合大文件)的表,就很适合用Clustering 了。\n\n##### 4.6 增量查询\n\n目前 Hudi 版本0.10下,Clustering 对增量查询的支持不是很好,Clustering 后的数据,都会被认为是“新”数据,也会出现在增量查询的结果中,然后这并不是我们期望的,因为本来没有对数据做任何改变,只是从小文件重写到大文件中,就被认为是增量数据了。所以依赖增量查询的表,不推荐使用 Clustering.\n\n##### 4.7 什么时间指定Clustering?\n\n可以在任何需要 Clustering 的时候,指定 Clustering 的相关配置,不是只能在创建 Hudi 表的时候指定 Clustering. 也就是说,对于任意的 Hudi 表,如果发现产生了大量的小文件,如果其他条件符合(没有高并发写、不依赖增量查询等),可以随时指定 Clustering.\n\n#### 参考文档\n\n[https://aws.amazon.com/cn/blogs/china/improve-the-cost-effectiveness-of-emr-by-using-amazon-graviton2/](https://aws.amazon.com/cn/blogs/china/improve-the-cost-effectiveness-of-emr-by-using-amazon-graviton2/)\n\n[https://hudi.apache.org/docs/clustering/#clustering-architecture](https://hudi.apache.org/docs/clustering/#clustering-architecture)\n\n[https://cwiki.apache.org/confluence/display/HUDI/RFC+-+19+Clustering+data+for+freshness+and+query+performance](https://cwiki.apache.org/confluence/display/HUDI/RFC+-+19+Clustering+data+for+freshness+and+query+performance)\n\n[https://parquet.apache.org/](https://parquet.apache.org/)\n\n[https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-hudi.html](https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-hudi.html)\n\n[https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-presto.html](https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-presto.html)\n\n[https://www.upgrad.com/blog/bucketing-in-hive/](https://www.upgrad.com/blog/bucketing-in-hive/)\n\n#### 本篇作者\n\n![image.png](https://dev-media.amazoncloud.cn/ff8adbd3ca3c4a5bac7db1674181d541_image.png)\n\n#### **Dalei Xu**\n\n Amazon 数据分析产品技术专家,负责 Amazon 数据分析的解决方案的咨询和架构设计。多年从事一线开发,在数据开发、架构设计、性能优化和组件管理方面积累了丰富的经验,希望能将 Amazon 优秀的服务组件,推广给更多的企业用户,实现与客户的双赢和共同成长。","render":"<p>随着数据体量的日益增长,人们对 Hudi 的查询性能也提出更多要求,除了 Parquet 存储格式本来的性能优势之外,还希望 Hudi 能够提供更多的性能优化的技术途径,尤其当对 Hudi 表进行高并发的写入,产生了大量的小文件之后,又需要使用 Presto/Trino 对 Hudi 表进行高吞吐的即席查询的场景里。怎样处理这些小文件,即把原本是写优化的Hudi 表,让它也能支持读优化,就成了使用 Hudi 的用户需要解决的问题。</p>\n<p>本文将通过一个实际的例子,使用 Clustering 技术,对Hudi表的数据文件进行重组和重写,从而提升Hudi表的SQL查询性能。</p>\n<h4><a id=\\"1Hudi_4\\"></a><strong>1.Hudi</strong></h4>\\n<p>Hudi将数据仓库和数据库的核心功能直接引入数据湖。Hudi 提供了表、事务、高效的升级/删除、高级索引、流式摄取服务、数据集群(Clustering)、压缩优化和并发,同时将数据保持为开源文件格式,即可以把 Hudi 表的数据,保存在 HDFS,Amazon S3 等文件系统。</p>\n<p>Hudi 之所以能快速流行起来,为多数开发用户接受,除了它可以轻松地在任何云平台上使用,并且可以通过任何流行的查询引擎(包括 Apache Spark、Flink、Presto、Trino、Hive 等)来访问Hudi的数据,更为难能可贵的,是Hudi的设计者考虑了尽可能多的业务场景和实际需求。</p>\n<p>从实际的业务场景出发,对数据湖平台对需求,首先可以先分为两大类:读偏好和写偏好,所以 Apache Hudi 提供了两种类型的表:</p>\n<ul>\\n<li>Copy On Write 表:简称 COW,这类Hudi表使用列文件格式(例如Parquet)存储数据,如果有数据写入,则会对整个 Parquet 文件进行复制,适合读偏好的操作</li>\n<li>Merge On Read 表:简称 MOR,这类 Hudi 表使用列文件格式(例如Parquet)和行文件格式(例如 Avro)共同存储数据。数据更新时,写到行文件中,然后进行压缩,以同步或异步方式生成列文件,适合写偏好的操作</li>\n</ul>\\n<p>再细分下来,Hudi 对两种类型的表,提供了不同的查询类型:</p>\n<ul>\\n<li>Snapshot Queries:快照查询,查询数据的最新快照,即全部的数据</li>\n<li>Incremental Queries:增量查询,可以查询指定时间范围内的新增或修改的数据</li>\n<li>Read Optimized Queries:读取优化查询,对MOR表来说,仅查询Parquet 文件中的数据</li>\n</ul>\\n<p>以上三种查询类型,读优化查询只能用于MOR表(其实用于 COW 也没什么意义,本来COW就只有 Parquet 文件保存数据),另外两种查询模式,可以用于 COW 表和 MOR 表。</p>\n<p>不仅于此, Hudi 还在索引、事务管理、并发、压缩等方面,使用到了很多先进的理念和技术,这也给那些想对 Hudi 表进行性能调优的用户,提供了广阔的空间和更多的手段,例如 Index,Metadata Table,Clustering 等,本文将介绍 Clustering 这一技术。</p>\n<h4><a id=\\"2Hudi_Clustering_25\\"></a><strong>2.Hudi Clustering</strong></h4>\\n<p>在数据湖/数据仓库中,关键的权衡之一是写入速度和查询性能之间的权衡。数据写入通常倾向于使用小文件来提高并行性,并使数据尽快可用于查询。但是,如果有很多小文件,查询性能会很差。此外,在写入过程中,数据通常根据到达时间写入同一位置的文件。然而,当频繁查询的数据位于同一位置时,查询引擎的性能会更好。</p>\n<p>这就对 Hudi 的数据重组提出了要求,即数据写入时使用小文件,数据查询时使用大文件。</p>\n<h5><a id=\\"21__Hudi__Clustering__31\\"></a>2.1 设定 Hudi 表的 Clustering 参数</h5>\\n<p>在文档 [RFC-19] 中, 作者创建了一个Hudi表,并设定了 Clustering的参数,然后启动了异步 Clustering Job,并对结果进行了对比。请注意,该文档创建 Hudi 表的时候,调用 getQuickstartWriteConfigs 方法来设定参数hoodie.upsert.shuffle.parallelism 为 2,对数据量较大的测试,这显然是不够的。</p>\n<p>我们来看一个不同的例子,首先生成一套 TPC-DS 测试数据,它具体包括24个表,以及用于性能测试的99个 SQL 查询语句,生成数据的具体步骤请参考:</p>\n<p><a href=\\"https://aws.amazon.com/cn/blogs/china/improve-the-cost-effectiveness-of-emr-by-using-amazon-graviton2/\\" target=\\"_blank\\">https://aws.amazon.com/cn/blogs/china/improve-the-cost-effectiveness-of-emr-by-using-amazon-graviton2/</a></p>\\n<p>创建一个 Amazon EMR 集群,版本6.5.0,硬件配置如下:</p>\n<p><img src=\\"https://dev-media.amazoncloud.cn/9fbd7afbb4cd4388aee15847e5d3f881_image.png\\" alt=\\"image.png\\" /></p>\n<p>使用该集群生成一套100G的TPC-DS数据,大概需要30分钟。</p>\n<p>Amazon EMR 提供了 Hudi 组件,接下来用生成的 TPC-DS 数据,来生成一个 Hudi 表,我们选取表 store_sales,脚本如下:</p>\n<pre><code class=\\"lang-\\">spark-shell --master yarn \\\\\\n--deploy-mode client \\\\\\n--conf &quot;spark.serializer=org.apache.spark.serializer.KryoSerializer&quot; \\\\\\n--conf &quot;spark.sql.hive.convertMetastoreParquet=false&quot; \\\\\\n--packages org.apache.hudi:hudi-spark3-bundle_2.12:0.10.0\\n\\n\\n import org.apache.hudi.QuickstartUtils._\\n import org.apache.spark.sql.SaveMode._\\n import org.apache.hudi.DataSourceReadOptions._\\n import org.apache.hudi.DataSourceWriteOptions._\\n import org.apache.hudi.config.HoodieWriteConfig._\\n import java.util.Date\\n\\n val tableName = &quot;store_sales&quot;\\n val basePath = &quot;s3://dalei-demo/hudi/tpcds_hudi_cluster/store_sales&quot;\\n val partitionKey = &quot;ss_sold_date_sk&quot;\\n\\n val df = spark.read.format(&quot;parquet&quot;).\\n load(s&quot;s3://dalei-demo/tpcds/data10g/store_sales&quot;).\\n filter(&quot;ss_sold_time_sk is not null and ss_item_sk is not null and ss_sold_date_sk is not null and ss_customer_sk is not null&quot;).\\n withColumn(&quot;ts&quot;, lit((new Date()).getTime)).\\n repartition(1000)\\n\\n df.write.format(&quot;org.apache.hudi&quot;).\\n option(TABLE_NAME, tableName).\\n option(&quot;hoodie.datasource.write.precombine.field&quot;, &quot;ts&quot;).\\n option(&quot;hoodie.datasource.write.recordkey.field&quot;, &quot;ss_sold_time_sk, ss_item_sk&quot;).\\n option(&quot;hoodie.datasource.write.partitionpath.field&quot;, partitionKey).\\n option(&quot;hoodie.upsert.shuffle.parallelism&quot;, &quot;1000&quot;).\\n option(&quot;hoodie.datasource.write.table.type&quot;, &quot;MERGE_ON_READ&quot;).\\n option(&quot;hoodie.datasource.write.operation&quot;, &quot;upsert&quot;).\\n option(&quot;hoodie.parquet.max.file.size&quot;, &quot;10485760&quot;).\\n option(&quot;hoodie.datasource.write.hive_style_partitioning&quot;, &quot;true&quot;).\\n option(&quot;hoodie.datasource.write.keygenerator.class&quot;, &quot;org.apache.hudi.keygen.ComplexKeyGenerator&quot;).\\n option(&quot;hoodie.datasource.hive_sync.enable&quot;, &quot;true&quot;).\\n option(&quot;hoodie.datasource.hive_sync.mode&quot;, &quot;hms&quot;).\\n option(&quot;hoodie.datasource.hive_sync.database&quot;, &quot;tpcds_hudi_cluster&quot;).\\n option(&quot;hoodie.datasource.hive_sync.table&quot;, tableName).\\n option(&quot;hoodie.datasource.hive_sync.partition_fields&quot;, partitionKey).\\n option(&quot;hoodie.parquet.small.file.limit&quot;, &quot;0&quot;).\\n option(&quot;hoodie.clustering.inline&quot;, &quot;true&quot;).\\n option(&quot;hoodie.clustering.inline.max.commits&quot;, &quot;2&quot;).\\n option(&quot;hoodie.clustering.plan.strategy.max.num.groups&quot;, &quot;10000&quot;).\\n option(&quot;hoodie.clustering.plan.strategy.target.file.max.bytes&quot;, &quot;1073741824&quot;).\\n option(&quot;hoodie.clustering.plan.strategy.small.file.limit&quot;, &quot;629145600&quot;).\\n option(&quot;hoodie.clustering.plan.strategy.sort.columns&quot;, &quot;ss_customer_sk&quot;).\\n mode(Append).\\n save(basePath);\\n</code></pre>\\n<p>解释一下上面代码中用到的主要参数:</p>\n<ul>\\n<li>hoodie.upsert.shuffle.parallelism: upsert 时 shuffle 的并发数</li>\n<li>hoodie.parquet.max.file.size: 指定 Parquet 文件大小,为了对比 Clustering 前后的效果,我们希望能生成大量的小文件,所以这里限制了文件不能过大</li>\n<li>hoodie.datasource.write.keygenerator.class: 如果是复合主键,需要指定该参数的值为 org.apache.hudi.keygen.ComplexKeyGenerator</li>\n<li>hoodie.datasource.hive_sync.*: 这些参数是为了将 Hudi 表的 Schema 信息同步到 Hive MetaStore</li>\n<li>hoodie.parquet.small.file.limit: 如果小于指定的值,将被看作是小文件,Upsert 时会将小文件替换成大文件(所谓的“扩展”),而不是新生成一个文件,将该值设为 0,即关闭了小文件限制,这样每次写入数据都会生成新的文件</li>\n<li>hoodie.clustering.inline: 启用同步的 Clustering, 即 Commit 数量一旦达到,马上执行 Clustering</li>\n<li>hoodie.clustering.inline.max.commits: 多少次 Commit 之后,就开始执行Clustering</li>\n<li>hoodie.clustering.plan.strategy.max.num.groups: Clustering 将产生多少个File Group,默认30个</li>\n<li>hoodie.clustering.plan.strategy.target.file.max.bytes: Clustering 后文件大小的限制</li>\n<li>hoodie.clustering.plan.strategy.small.file.limit: 小于该值的文件,会被 Clustering</li>\n<li>hoodie.clustering.plan.strategy.sort.columns: Clustering 时,使用该字段排序</li>\n</ul>\\n<p>参数可以使用定义在 org.apache.hudi.DataSourceWriteOptions 里的常量(例如TABLE_NAME),也可以直接使用字符串(例如” hoodie.datasource.write.table.name”),效果都是一样的。</p>\n<h5><a id=\\"22__Clustering_115\\"></a>2.2 触发 Clustering</h5>\\n<p>之前的操作只是创建了Hudi表和配置了 Clustering,由于 Commit 次数不到2次(请注意看配置参数,之前的 Upsert 是1次 Commit),所以还没有触发 Clustering. 大家可以先把Commit理解为一次 Upsert操作。</p>\n<p>我们再模拟一次 Commit 操作,对 store_sales 表的一个分区的某个字段做修改,然后再Upsert到表里,代码如下:</p>\n<pre><code class=\\"lang-\\">val df1 = spark.read.format(&quot;hudi&quot;).option(&quot;hoodie.datasource.query.type&quot;, &quot;read_optimized&quot;).\\n load(&quot;s3://dalei-demo/hudi/tpcds_hudi_cluster/store_sales&quot;).\\n filter(&quot;ss_sold_date_sk=2450816&quot;).\\n drop(col(&quot;_hoodie_commit_seqno&quot;)).drop(col(&quot;_hoodie_commit_time&quot;)).\\n drop(col(&quot;_hoodie_record_key&quot;)).drop(col(&quot;_hoodie_partition_path&quot;)).\\n drop(col(&quot;_hoodie_file_name&quot;))\\n\\n val df2 = df1.withColumn(&quot;ss_ext_tax&quot;, col(&quot;ss_ext_tax&quot;) + lit(1.0))\\n\\n\\n df2.write.format(&quot;org.apache.hudi&quot;).\\n option(TABLE_NAME, tableName).\\n option(&quot;hoodie.datasource.write.precombine.field&quot;, &quot;ts&quot;).\\n option(&quot;hoodie.datasource.write.recordkey.field&quot;, &quot;ss_sold_time_sk, ss_item_sk&quot;).\\n option(&quot;hoodie.datasource.write.partitionpath.field&quot;, partitionKey).\\n option(&quot;hoodie.upsert.shuffle.parallelism&quot;, &quot;1000&quot;).\\n option(&quot;hoodie.datasource.write.table.type&quot;, &quot;MERGE_ON_READ&quot;).\\n option(&quot;hoodie.datasource.write.operation&quot;, &quot;upsert&quot;).\\n option(&quot;hoodie.parquet.max.file.size&quot;, &quot;10485760&quot;).\\n option(&quot;hoodie.datasource.write.hive_style_partitioning&quot;, &quot;true&quot;).\\n option(&quot;hoodie.datasource.write.keygenerator.class&quot;, &quot;org.apache.hudi.keygen.ComplexKeyGenerator&quot;).\\n option(&quot;hoodie.datasource.hive_sync.enable&quot;, &quot;true&quot;).\\n option(&quot;hoodie.datasource.hive_sync.mode&quot;, &quot;hms&quot;).\\n option(&quot;hoodie.datasource.hive_sync.database&quot;, &quot;tpcds_hudi_cluster&quot;).\\n option(&quot;hoodie.datasource.hive_sync.table&quot;, tableName).\\n option(&quot;hoodie.datasource.hive_sync.partition_fields&quot;, partitionKey).\\n option(&quot;hoodie.parquet.small.file.limit&quot;, &quot;0&quot;).\\n option(&quot;hoodie.clustering.inline&quot;, &quot;true&quot;).\\n option(&quot;hoodie.clustering.inline.max.commits&quot;, &quot;2&quot;).\\n option(&quot;hoodie.clustering.plan.strategy.max.num.groups&quot;, &quot;10000&quot;).\\n option(&quot;hoodie.clustering.plan.strategy.target.file.max.bytes&quot;, &quot;1073741824&quot;).\\n option(&quot;hoodie.clustering.plan.strategy.small.file.limit&quot;, &quot;629145600&quot;).\\n option(&quot;hoodie.clustering.plan.strategy.sort.columns&quot;, &quot;ss_customer_sk&quot;).\\n mode(Append).\\n save(basePath);\\n</code></pre>\\n<p>代码执行完后,Commit 次数达到2次,Clustering 已经做后台执行了。</p>\n<p><strong>2.3 解释 Clustering 操作过程</strong></p>\\n<p>在解释 Clustering 之前,先介绍一下 Hudi 表的操作文件的构成。</p>\n<h5><a id=\\"231_Hudi__165\\"></a>2.3.1 Hudi 表的操作文件</h5>\\n<p>以前面生成的 store_sales 表为例,在.hoodie 目录下,包含了该表的操作记录,如下图:</p>\n<p><img src=\\"https://dev-media.amazoncloud.cn/970925a0ef4e460da9b35f1b210c37ef_image.png\\" alt=\\"image.png\\" /></p>\n<p>图1: Hudi 表的操作文件</p>\n<p>Hudi 操作的文件名,通常有三个部分组成:</p>\n<ul>\\n<li>Instant Time: 操作的时间,一个17位的时间戳(8位日期 + 9位时间,精确到毫秒)</li>\n<li>Instant Action: 操作的类型,前端执行 Upsert 时,会产生的操作类型是deltacommit; Clustering会产生的操作类型是 replacecommit</li>\n<li>Instant State: 操作的状态,requested 表示请求,inflight 表示正在进行,状态为空表示已经执行完成</li>\n</ul>\\n<p>可以把 Clustering 请求文件20220701161238291.replacecommit.requested 文件下载下来,因为它是 Avro 格式的,使用 avro-tools 来查看一下它的内容:</p>\n<pre><code class=\\"lang-\\">[ec2-user@cm ~]\$ aws s3 cp s3://dalei-demo/hudi/tpcds_hudi_cluster/store_sales/.hoodie/20220701161238291.replacecommit.requested ./\\n\\n[ec2-user@cm ~]\$ wget http://archive.apache.org/dist/avro/avro-1.9.2/java/avro-tools-1.9.2.jar\\n\\n[ec2-user@cm ~]\$ java -jar avro-tools-1.9.2.jar tojson 20220701161238291.replacecommit.requested &gt;&gt; 20220701161238291.replacecommit.requested.json\\n</code></pre>\\n<p>可以使用浏览器将文件打开,如下图:</p>\n<p><img src=\\"https://dev-media.amazoncloud.cn/fe6587e282a448878c607fedf44b7de6_image.png\\" alt=\\"image.png\\" /></p>\n<p>图2: Clustering 请求文件</p>\n<p>上图中的inputGroups就是File Group,slices就是File Slice,还有File ID, 这三个概念在2.3.2里会介绍,这个文件就是发起了一个Clustering操作的请求,要把这些文件作为输入,生成更大的文件来替换它们,生成的文件也会在2.3.2里介绍。</p>\n<p>20220701161238291.replacecommit.inflight 文件的大小为0,说明 Clustering 已经立即完成了,再来看20220701161238291.replacecommit 文件,它是 json 格式的文件,可以直接打开,内容如下:</p>\n<pre><code class=\\"lang-\\">{\\n &quot;partitionToWriteStats&quot; : {\\n &quot;ss_sold_date_sk=2451080&quot; : [ {\\n &quot;fileId&quot; : &quot;91377ca5-48a9-491a-9c82-56a1ba4ba2e3-0&quot;,\\n &quot;path&quot; : &quot;ss_sold_date_sk=2451080/91377ca5-48a9-491a-9c82-56a1ba4ba2e3-0_263-1967-116065_20220701161238291.parquet&quot;,\\n &quot;prevCommit&quot; : &quot;null&quot;,\\n &quot;numWrites&quot; : 191119,\\n &quot;numDeletes&quot; : 0,\\n &quot;numUpdateWrites&quot; : 0,\\n &quot;numInserts&quot; : 191119,\\n &quot;totalWriteBytes&quot; : 11033199,\\n &quot;totalWriteErrors&quot; : 0,\\n &quot;tempPath&quot; : null,\\n &quot;partitionPath&quot; : &quot;ss_sold_date_sk=2451080&quot;,\\n &quot;totalLogRecords&quot; : 0,\\n &quot;totalLogFilesCompacted&quot; : 0,\\n &quot;totalLogSizeCompacted&quot; : 0,\\n &quot;totalUpdatedRecordsCompacted&quot; : 0,\\n &quot;totalLogBlocks&quot; : 0,\\n &quot;totalCorruptLogBlock&quot; : 0,\\n &quot;totalRollbackBlocks&quot; : 0,\\n &quot;fileSizeInBytes&quot; : 11033199,\\n &quot;minEventTime&quot; : null,\\n &quot;maxEventTime&quot; : null\\n } ],\\n ......\\n },\\n &quot;compacted&quot; : false,\\n &quot;extraMetadata&quot; : {\\n &quot;schema&quot; : &quot;{\\\\&quot;type\\\\&quot;:\\\\&quot;record\\\\&quot;,\\\\&quot;name\\\\&quot;:\\\\&quot;store_sales_record\\\\&quot;,\\\\&quot;namespace\\\\&quot;:\\\\&quot;hoodie.store_sales\\\\&quot;,\\\\&quot;fields\\\\&quot;:[{\\\\&quot;name\\\\&quot;:\\\\&quot;ss_sold_time_sk\\\\&quot;,\\\\&quot;type\\\\&quot;:[\\\\&quot;null\\\\&quot;,\\\\&quot;int\\\\&quot;],\\\\&quot;default\\\\&quot;:null},{\\\\&quot;name\\\\&quot;:\\\\&quot;ss_item_sk\\\\&quot;,\\\\&quot;type\\\\&quot;:[\\\\&quot;null\\\\&quot;,\\\\&quot;int\\\\&quot;],\\\\&quot;default\\\\&quot;:null},{\\\\&quot;name\\\\&quot;:\\\\&quot;ss_customer_sk\\\\&quot;,\\\\&quot;type\\\\&quot;:[\\\\&quot;null\\\\&quot;,\\\\&quot;int\\\\&quot;],\\\\&quot;default\\\\&quot;:null},{\\\\&quot;name\\\\&quot;:\\\\&quot;ss_cdemo_sk\\\\&quot;,\\\\&quot;type\\\\&quot;:[\\\\&quot;null\\\\&quot;,\\\\&quot;int\\\\&quot;],\\\\&quot;default\\\\&quot;:null},{\\\\&quot;name\\\\&quot;:\\\\&quot;ss_hdemo_sk\\\\&quot;,\\\\&quot;type\\\\&quot;:[\\\\&quot;null\\\\&quot;,\\\\&quot;int\\\\&quot;],\\\\&quot;default\\\\&quot;:null},{\\\\&quot;name\\\\&quot;:\\\\&quot;ss_addr_sk\\\\&quot;,\\\\&quot;type\\\\&quot;:[\\\\&quot;null\\\\&quot;,\\\\&quot;int\\\\&quot;],\\\\&quot;default\\\\&quot;:null},{\\\\&quot;name\\\\&quot;:\\\\&quot;ss_store_sk\\\\&quot;,\\\\&quot;type\\\\&quot;:[\\\\&quot;null\\\\&quot;,\\\\&quot;int\\\\&quot;],\\\\&quot;default\\\\&quot;:null},{\\\\&quot;name\\\\&quot;:\\\\&quot;ss_promo_sk\\\\&quot;,\\\\&quot;type\\\\&quot;:[\\\\&quot;null\\\\&quot;,\\\\&quot;int\\\\&quot;],\\\\&quot;default\\\\&quot;:null},{\\\\&quot;name\\\\&quot;:\\\\&quot;ss_ticket_number\\\\&quot;,\\\\&quot;type\\\\&quot;:[\\\\&quot;null\\\\&quot;,\\\\&quot;long\\\\&quot;],\\\\&quot;default\\\\&quot;:null},{\\\\&quot;name\\\\&quot;:\\\\&quot;ss_quantity\\\\&quot;,\\\\&quot;type\\\\&quot;:[\\\\&quot;null\\\\&quot;,\\\\&quot;int\\\\&quot;],\\\\&quot;default\\\\&quot;:null},{\\\\&quot;name\\\\&quot;:\\\\&quot;ss_wholesale_cost\\\\&quot;,\\\\&quot;type\\\\&quot;:[\\\\&quot;null\\\\&quot;,\\\\&quot;double\\\\&quot;],\\\\&quot;default\\\\&quot;:null},{\\\\&quot;name\\\\&quot;:\\\\&quot;ss_list_price\\\\&quot;,\\\\&quot;type\\\\&quot;:[\\\\&quot;null\\\\&quot;,\\\\&quot;double\\\\&quot;],\\\\&quot;default\\\\&quot;:null},{\\\\&quot;name\\\\&quot;:\\\\&quot;ss_sales_price\\\\&quot;,\\\\&quot;type\\\\&quot;:[\\\\&quot;null\\\\&quot;,\\\\&quot;double\\\\&quot;],\\\\&quot;default\\\\&quot;:null},{\\\\&quot;name\\\\&quot;:\\\\&quot;ss_ext_discount_amt\\\\&quot;,\\\\&quot;type\\\\&quot;:[\\\\&quot;null\\\\&quot;,\\\\&quot;double\\\\&quot;],\\\\&quot;default\\\\&quot;:null},{\\\\&quot;name\\\\&quot;:\\\\&quot;ss_ext_sales_price\\\\&quot;,\\\\&quot;type\\\\&quot;:[\\\\&quot;null\\\\&quot;,\\\\&quot;double\\\\&quot;],\\\\&quot;default\\\\&quot;:null},{\\\\&quot;name\\\\&quot;:\\\\&quot;ss_ext_wholesale_cost\\\\&quot;,\\\\&quot;type\\\\&quot;:[\\\\&quot;null\\\\&quot;,\\\\&quot;double\\\\&quot;],\\\\&quot;default\\\\&quot;:null},{\\\\&quot;name\\\\&quot;:\\\\&quot;ss_ext_list_price\\\\&quot;,\\\\&quot;type\\\\&quot;:[\\\\&quot;null\\\\&quot;,\\\\&quot;double\\\\&quot;],\\\\&quot;default\\\\&quot;:null},{\\\\&quot;name\\\\&quot;:\\\\&quot;ss_ext_tax\\\\&quot;,\\\\&quot;type\\\\&quot;:[\\\\&quot;null\\\\&quot;,\\\\&quot;double\\\\&quot;],\\\\&quot;default\\\\&quot;:null},{\\\\&quot;name\\\\&quot;:\\\\&quot;ss_coupon_amt\\\\&quot;,\\\\&quot;type\\\\&quot;:[\\\\&quot;null\\\\&quot;,\\\\&quot;double\\\\&quot;],\\\\&quot;default\\\\&quot;:null},{\\\\&quot;name\\\\&quot;:\\\\&quot;ss_net_paid\\\\&quot;,\\\\&quot;type\\\\&quot;:[\\\\&quot;null\\\\&quot;,\\\\&quot;double\\\\&quot;],\\\\&quot;default\\\\&quot;:null},{\\\\&quot;name\\\\&quot;:\\\\&quot;ss_net_paid_inc_tax\\\\&quot;,\\\\&quot;type\\\\&quot;:[\\\\&quot;null\\\\&quot;,\\\\&quot;double\\\\&quot;],\\\\&quot;default\\\\&quot;:null},{\\\\&quot;name\\\\&quot;:\\\\&quot;ss_net_profit\\\\&quot;,\\\\&quot;type\\\\&quot;:[\\\\&quot;null\\\\&quot;,\\\\&quot;double\\\\&quot;],\\\\&quot;default\\\\&quot;:null},{\\\\&quot;name\\\\&quot;:\\\\&quot;ts\\\\&quot;,\\\\&quot;type\\\\&quot;:\\\\&quot;long\\\\&quot;},{\\\\&quot;name\\\\&quot;:\\\\&quot;ss_sold_date_sk\\\\&quot;,\\\\&quot;type\\\\&quot;:[\\\\&quot;null\\\\&quot;,\\\\&quot;int\\\\&quot;],\\\\&quot;default\\\\&quot;:null}]}&quot;\\n },\\n &quot;operationType&quot; : &quot;CLUSTER&quot;,\\n &quot;partitionToReplaceFileIds&quot; : {\\n &quot;ss_sold_date_sk=2451080&quot; : [ &quot;2e2bec06-78fb-4059-ad89-2914f63dd1c0-0&quot;, &quot;63fc2a2d-73e6-4261-ab30-ff44912e1696-0&quot;, &quot;fc5fd42e-0f3f-434c-aa56-ca43c36c659d-0&quot;, &quot;41299b3d-0be9-4338-bbad-6feeb41d4975-0&quot;, &quot;c23873a1-03a3-424a-aa9c-044b40f1659f-0&quot;, &quot;8af23590-4b8c-4b44-946e-0fdd73747e19-0&quot;, &quot;7d740b43-83ca-48ca-a9dc-6b8e19fce6f0-0&quot;, &quot;bc90dfd5-7323-4786-832c-4a6516332adf-0&quot;, &quot;67abd081-dfcc-45d9-8f29-50a4fb71108c-0&quot;, &quot;80bffa2b-df05-4c9f-9766-84a700403a89-0&quot;, &quot;cbba9f2a-32cd-4c73-a38b-570cbb5501e4-0&quot;, &quot;ea59e1a4-1f97-40e8-baae-3bedc5752095-0&quot;, &quot;55cffcb6-5410-4c2a-a61d-01300be50171-0&quot;, &quot;601b74b3-663d-4ef8-bf5e-158f135f81ea-0&quot;, &quot;c46e8539-418e-482d-936e-a79464d869ac-0&quot;, &quot;3dbe1997-bfc2-41a7-ac12-f302d3013c87-0&quot;, &quot;acf9be44-71a3-436f-b595-c0f322f34172-0&quot;, &quot;d7bbe517-87c7-482c-b885-a16164062b81-0&quot;, &quot;f1060ef7-ba7c-4b8e-abc3-c409cd6af7d4-0&quot; ],\\n ......\\n },\\n &quot;writePartitionPaths&quot; : [ &quot;ss_sold_date_sk=2451080&quot;, ......],\\n &quot;fileIdAndRelativePaths&quot; : {\\n &quot;742c6044-4f76-4d04-993c-d4255235d484-0&quot; : &quot;ss_sold_date_sk=2451329/742c6044-4f76-4d04-993c-d4255235d484-0_511-1967-116236_20220701161238291.parquet&quot;,\\n &quot;20dafb58-8ae7-41d6-a02d-2b529bcdcc83-0&quot; : &quot;ss_sold_date_sk=2452226/20dafb58-8ae7-41d6-a02d-2b529bcdcc83-0_1407-1967-116870_20220701161238291.parquet&quot;,\\n ......\\n },\\n &quot;totalRecordsDeleted&quot; : 0,\\n &quot;totalLogRecordsCompacted&quot; : 0,\\n &quot;totalLogFilesCompacted&quot; : 0,\\n &quot;totalCompactedRecordsUpdated&quot; : 0,\\n &quot;totalLogFilesSize&quot; : 0,\\n &quot;totalScanTime&quot; : 0,\\n &quot;totalCreateTime&quot; : 151847,\\n &quot;totalUpsertTime&quot; : 0,\\n &quot;minAndMaxEventTime&quot; : {\\n &quot;Optional.empty&quot; : {\\n &quot;val&quot; : null,\\n &quot;present&quot; : false\\n }\\n }\\n}\\n</code></pre>\\n<p>上面省略了大量重复的内容,主要的信息如下:</p>\n<ul>\\n<li>partitionToWriteStats: 列出将要被 Clustering 的分区,以及将要被 Clustering 的文件的信息</li>\n<li>extraMetadata: Hudi 表的 Schema</li>\n<li>operationType: 表示操作类型是 Clustering</li>\n<li>partitionToReplaceFileIds: 列出将要被 Clustering 的分区和文件ID</li>\n<li>fileIdAndRelativePaths: Clustering 产生的新的文件,请注意,文件名的时间戳</li>\n</ul>\\n<h5><a id=\\"232_Hudi__267\\"></a>2.3.2 Hudi 表的数据文件</h5>\\n<p>接下来介绍一下 Hudi 表的数据文件的构成,以MOR类型的表为例,如下图:</p>\n<p><img src=\\"https://dev-media.amazoncloud.cn/ba558b5e714f459da94fd9b6895893ce_image.png\\" alt=\\"image.png\\" /></p>\n<p>图3: MOR 表的文件结构</p>\n<p>可以看出,文件的层级和包含关系是:Partition -&gt; File Group -&gt; File Slice -&gt; Parquet + Log, 其中:</p>\n<ul>\\n<li>Partition: 分区,大家都比较熟悉了,有的表也可以没有Partition</li>\n<li>File Group: 用于控制文件版本,同一个 File Group 里有唯一的File ID</li>\n<li>File Slice: 用于组织文件数据,同一个 File Slice 里,不但File ID要相同,Instant Time 也要相同</li>\n<li>Parquet文件是列存格式文件,Log 文件是行存文件格式,缺省值为Apache Avro, 它记录的是对同一个 File Slice 里的 Parquet 文件的修改。</li>\n</ul>\\n<p>来看一个 File Group 的例子:</p>\n<p><img src=\\"https://dev-media.amazoncloud.cn/c7e0499414624263aef2956f3d7f1c16_image.png\\" alt=\\"image.png\\" /></p>\n<p>图4: FileGroup的例子</p>\n<p>在图4中,第一个文件和第二个文件的 File ID 相同,说明是同一个File Group,但是 Instant Time 不同,说明不是同一个 File Slice, 使用读优化的方式查询数据的时候,会读取 Instant Time更大的Parquet文件。</p>\n<p>来看 store_sales 表的数据文件,如下图:</p>\n<p><img src=\\"https://dev-media.amazoncloud.cn/9c67673c0fb04e3fbbb4617b9e05e9ba_image.png\\" alt=\\"image.png\\" /></p>\n<p>图5: store_sales表的数据文件</p>\n<p>在图5中,标记的Log文件和Parquet文件,具有相同的 File ID 和Instant Time,表示这个 Log 文件是在 Parquet 文件基础上做的Upsert 后产生的,它们属于同一个File Slice,如果是Snapshot查询,需要把同一个 File Slice 的 Log 文件和 Parquet 的数据一起读出来。</p>\n<p>在图5中,“68c14d48-cba6-4f82-a4b5-48fadf1282f6-0_0-1967-115358_20220701161238291.parquet”文件就是 clustering 生成的文件,可以把它下载下来,使用 parquet-tool 看看它的数据,如下:</p>\n<pre><code class=\\"lang-\\">[ec2-user@cm ~]\$ wget http://logservice-resource.oss-cn-shanghai.aliyuncs.com/tools/parquet-tools-1.6.0rc3-SNAPSHOT.jar\\n\\n[ec2-user@cm ~]\$ aws s3 cp s3://dalei-demo/hudi/tpcds_hudi_cluster/store_sales/ss_sold_date_sk=2450816/68c14d48-cba6-4f82-a4b5-48fadf1282f6-0_0-1967-115358_20220701161238291.parquet ./\\n\\n[ec2-user@cm ~]\$ java -jar ./parquet-tools-1.6.0rc3-SNAPSHOT.jar head -n 10 68c14d48-cba6-4f82-a4b5-48fadf1282f6-0_0-1967-115358_20220701161238291.parquet\\n</code></pre>\\n<p>上面的命令显示了10条 Clustering 后的 Parquet 文件的数据,注意观察Sort Column (ss_customer_sk) 的值,已经是排序的了。</p>\n<p>对比Clustering前后的文件,可以看出,原来用10个1M左右的Parquet文件保存的数据,Clustering 后只有1个5.1M的 Parquet 文件。至于保存相同条数的数据,文件的总容量为什么会下降这么多,请参考Parquet的相关知识:<a href=\\"https://parquet.apache.org/\\" target=\\"_blank\\">https://parquet.apache.org/</a> .</p>\\n<h5><a id=\\"233_Clustering_312\\"></a>2.3.3 多分区表的Clustering</h5>\\n<p>默认情况下, Hudi出于对工作负载的考虑,使用 hoodie.clustering.plan.strategy.max.num.groups 这个参数 (默认值是30),规定了Clustering只会创建30个File Group. (根据文件大小的设定,目前每个分区只需创建1个File Group)</p>\n<p>如果分区比较多的话,可以通过hoodie.clustering.plan.partition.filter.mode 参数,来规划Clustering的分区范围,具体可以参考:[https://hudi.apache.org/docs/configurations/#Clustering-Configs .](https://hudi.apache.org/docs/configurations/#Clustering-Configs .)</p>\n<h4><a id=\\"3_Trino__318\\"></a><strong>3.使用 Trino 来查询数据</strong></h4>\\n<h5><a id=\\"31__320\\"></a><strong>3.1 准备其它的表</strong></h5>\\n<p>store_sales 表的数据就准备好了,类似的,我们也可以生成customer_address,customer,date_dim,item 这4个表,它们都是测试查询要用到的表。这4个表都是维度表,变化不会很频繁,所以都生成 COW 表,生成 customer_address 表的代码如下:</p>\n<pre><code class=\\"lang-\\"> val tableName = &quot;customer_address&quot;\\n val basePath = &quot;s3://dalei-demo/hudi/tpcds_hudi_cluster/customer_address&quot;\\n\\n val df = spark.read.format(&quot;parquet&quot;).\\n load(s&quot;s3://dalei-demo/tpcds/data10g/customer_address&quot;).\\n filter(&quot;ca_address_sk is not null&quot;)\\n\\n df.write.format(&quot;org.apache.hudi&quot;).\\n option(TABLE_NAME, tableName).\\n option(&quot;hoodie.datasource.write.precombine.field&quot;, &quot;ca_address_id&quot;).\\n option(&quot;hoodie.datasource.write.recordkey.field&quot;, &quot;ca_address_sk&quot;).\\n option(&quot;hoodie.upsert.shuffle.parallelism&quot;, &quot;100&quot;).\\n option(&quot;hoodie.datasource.write.table.type&quot;, &quot;COPY_ON_WRITE&quot;).\\n option(&quot;hoodie.datasource.write.operation&quot;, &quot;upsert&quot;).\\n option(&quot;hoodie.parquet.max.file.size&quot;, &quot;10485760&quot;).\\n option(&quot;hoodie.datasource.hive_sync.enable&quot;, &quot;true&quot;).\\n option(&quot;hoodie.datasource.hive_sync.mode&quot;, &quot;hms&quot;).\\n option(&quot;hoodie.datasource.hive_sync.database&quot;, &quot;tpcds_hudi_cluster&quot;).\\n option(&quot;hoodie.datasource.hive_sync.table&quot;, tableName).\\n option(&quot;hoodie.parquet.small.file.limit&quot;, &quot;0&quot;).\\n option(&quot;hoodie.clustering.inline&quot;, &quot;true&quot;).\\n option(&quot;hoodie.clustering.inline.max.commits&quot;, &quot;2&quot;).\\n option(&quot;hoodie.clustering.plan.strategy.target.file.max.bytes&quot;, &quot;1073741824&quot;).\\n option(&quot;hoodie.clustering.plan.strategy.small.file.limit&quot;, &quot;629145600&quot;).\\n option(&quot;hoodie.clustering.plan.strategy.sort.columns&quot;, &quot;&quot;).\\n mode(Append).\\n save(basePath);\\n</code></pre>\\n<p>触发 Clustering 的代码如下:</p>\n<pre><code class=\\"lang-\\">val df1 = spark.read.format(&quot;hudi&quot;).option(&quot;hoodie.datasource.query.type&quot;, &quot;read_optimized&quot;).\\n load(&quot;s3://dalei-demo/hudi/tpcds_hudi_cluster/customer_address&quot;)\\n val df2 = df1.withColumn(&quot;ca_gmt_offset&quot;, col(&quot;ca_gmt_offset&quot;) + lit(1.1))\\n\\n df2.write.format(&quot;org.apache.hudi&quot;).\\n option(TABLE_NAME, tableName).\\n option(&quot;hoodie.datasource.write.precombine.field&quot;, &quot;ca_address_id&quot;).\\n option(&quot;hoodie.datasource.write.recordkey.field&quot;, &quot;ca_address_sk&quot;).\\n option(&quot;hoodie.upsert.shuffle.parallelism&quot;, &quot;100&quot;).\\n option(&quot;hoodie.datasource.write.table.type&quot;, &quot;COPY_ON_WRITE&quot;).\\n option(&quot;hoodie.datasource.write.operation&quot;, &quot;upsert&quot;).\\n option(&quot;hoodie.parquet.max.file.size&quot;, &quot;10485760&quot;).\\n option(&quot;hoodie.datasource.hive_sync.enable&quot;, &quot;true&quot;).\\n option(&quot;hoodie.datasource.hive_sync.mode&quot;, &quot;hms&quot;).\\n option(&quot;hoodie.datasource.hive_sync.database&quot;, &quot;tpcds_hudi_cluster&quot;).\\n option(&quot;hoodie.datasource.hive_sync.table&quot;, tableName).\\n option(&quot;hoodie.parquet.small.file.limit&quot;, &quot;0&quot;).\\n option(&quot;hoodie.clustering.inline&quot;, &quot;true&quot;).\\n option(&quot;hoodie.clustering.inline.max.commits&quot;, &quot;2&quot;).\\n option(&quot;hoodie.clustering.plan.strategy.target.file.max.bytes&quot;, &quot;1073741824&quot;).\\n option(&quot;hoodie.clustering.plan.strategy.small.file.limit&quot;, &quot;629145600&quot;).\\n option(&quot;hoodie.clustering.plan.strategy.sort.columns&quot;, &quot;&quot;).\\n mode(Append).\\n save(basePath);\\n</code></pre>\\n<p>其余三个表的生成语句,跟 customer_address 表类似,大家可以试着生成一下。</p>\n<p>为了对比,我们还要生成一组同命名的没有使用 Clustering 的表,可以把这两组表,分别放到不同的Hive Database里,例如tpcds_hudi_cluster 和 pcds_hudi_nocluster. 生成没有Clustering 表的脚本,跟生成 Clustering 表的脚本类似,只要把Clustering 相关的参数拿掉就可以了。</p>\n<h5><a id=\\"32__387\\"></a><strong>3.2 查询</strong></h5>\\n<p>Amazon EMR 6.5.0里提供了 Trino360, 我们使用它来测试Hudi表的SQL查询性能。启动命令如下:<br />\\n/usr/lib/trino/bin/trino-cli-360-executable –server localhost:8889 –catalog hive –schema tpcds_hudi_cluster</p>\n<p>如果按照2.1中生成了 TPC-DS 测试数据,就会看到一起生成的用于测试的查询语句,我们用 q6.sql 来测试,脚本如下:</p>\n<pre><code class=\\"lang-\\">--q6.sql--\\n\\nSELECT state, cnt FROM (\\n SELECT a.ca_state state, count(*) cnt\\n FROM\\n customer_address a, customer c, store_sales_ro s, date_dim d, item i\\n WHERE a.ca_address_sk = c.c_current_addr_sk\\n AND c.c_customer_sk = s.ss_customer_sk\\n AND s.ss_sold_date_sk = d.d_date_sk\\n AND s.ss_item_sk = i.i_item_sk\\n AND d.d_month_seq =\\n (SELECT distinct (d_month_seq) FROM date_dim\\n WHERE d_year = 2001 AND d_moy = 1)\\n AND i.i_current_price &gt; 1.2 *\\n (SELECT avg(j.i_current_price) FROM item j\\n WHERE j.i_category = i.i_category)\\n GROUP BY a.ca_state\\n) x\\nWHERE cnt &gt;= 10\\nORDER BY cnt LIMIT 100\\n</code></pre>\\n<p>对没有使用Clustering的Hudi表的查询如下:</p>\n<p><img src=\\"https://dev-media.amazoncloud.cn/11cb4b4c7896459cb4388311391a922c_image.png\\" alt=\\"image.png\\" /></p>\n<p>图6: 没有Clustering的Hudi表的查询</p>\n<p>对使用Clustering的Hudi表的查询如下:</p>\n<p><img src=\\"https://dev-media.amazoncloud.cn/6cc2141693394933b618c7da6e5e736c_image.png\\" alt=\\"image.png\\" /></p>\n<p>图7: 使用Clustering的Hudi表查询</p>\n<p>可以看出:使用了 Clustering 的 Hudi 表,对比没有使用Clustering 的 Hudi 表,查询性能提升了35.4% , 读取记录数相同,读取文件容量大幅下降了。</p>\n<h4><a id=\\"4Clustering_431\\"></a><strong>4.对于使用Clustering的一些建议</strong></h4>\\n<h5><a id=\\"41_Upsert_433\\"></a>4.1 对Upsert的影响</h5>\\n<p>当执行 Clustering 的时候,对 File Group 实现的是 Snapshot Isolation 级别的隔离,所以对他们的修改是不允许对,也就是说,如果有 Upsert 和 Compaction 操作(MOR表),都要等 Clustering 结束之后</p>\n<h5><a id=\\"42__437\\"></a>4.2 对负载的考虑</h5>\\n<p>如果需要 Clustering 的表数据量比较大,分区比较多的话,做一次Clustering 也会产生大量的负载,所以对 Clustering 的范围,Hudi提供了多种选择。对既要高并发写,又要高吞吐读对表,可以在高并发写的波谷时段,例如夜间进行 Clustering</p>\n<h5><a id=\\"43__or__441\\"></a>4.3 同步 or 异步</h5>\\n<p>udi提供了两种 Clustering 方式,同步和异步。在对Hudi表高并发写的时候,不建议使用同步 Clustering,可以参考 [RFC-19] 中的方法,使用命令进行异步 Clustering.</p>\n<h5><a id=\\"44_Sort_Column_445\\"></a>4.4 要不要选择Sort Column</h5>\\n<p>如果有些字段经常要用来做 Join,而且该字段的值又可以保证非空的情况下,是可以放到 Sort Column 里的,如果 Clustering 之后还有多个文件,Sort Column 有助于确认每个文件里该字段的范围,可以避免过多的文件读取,提升 Join 操作的性能。原理上有点类似Hive Clustering,请参考:https://www.upgrad.com/blog/bucketing-in-hive/ .</p>\n<p>有兴趣的朋友可以对比一下,选不选择Sort Column在Join查询性能上的差别。</p>\n<h5><a id=\\"45_Clustering__451\\"></a>4.5 Clustering 等同于大文件吗?</h5>\\n<p>有人会说,Clustering 就是把小文件合并成大文件,那创建Hudi表的时候,我直接选择大文件不就可以了吗?如果只是考虑读性能,这么做确实可以。但 Clustering 提供了更多的选择,对于那种有时候高并发写(适合小文件),有时候高吞吐读(适合大文件)的表,就很适合用Clustering 了。</p>\n<h5><a id=\\"46__455\\"></a>4.6 增量查询</h5>\\n<p>目前 Hudi 版本0.10下,Clustering 对增量查询的支持不是很好,Clustering 后的数据,都会被认为是“新”数据,也会出现在增量查询的结果中,然后这并不是我们期望的,因为本来没有对数据做任何改变,只是从小文件重写到大文件中,就被认为是增量数据了。所以依赖增量查询的表,不推荐使用 Clustering.</p>\n<h5><a id=\\"47_Clustering_459\\"></a>4.7 什么时间指定Clustering?</h5>\\n<p>可以在任何需要 Clustering 的时候,指定 Clustering 的相关配置,不是只能在创建 Hudi 表的时候指定 Clustering. 也就是说,对于任意的 Hudi 表,如果发现产生了大量的小文件,如果其他条件符合(没有高并发写、不依赖增量查询等),可以随时指定 Clustering.</p>\n<h4><a id=\\"_463\\"></a>参考文档</h4>\\n<p><a href=\\"https://aws.amazon.com/cn/blogs/china/improve-the-cost-effectiveness-of-emr-by-using-amazon-graviton2/\\" target=\\"_blank\\">https://aws.amazon.com/cn/blogs/china/improve-the-cost-effectiveness-of-emr-by-using-amazon-graviton2/</a></p>\\n<p><a href=\\"https://hudi.apache.org/docs/clustering/#clustering-architecture\\" target=\\"_blank\\">https://hudi.apache.org/docs/clustering/#clustering-architecture</a></p>\\n<p><a href=\\"https://cwiki.apache.org/confluence/display/HUDI/RFC+-+19+Clustering+data+for+freshness+and+query+performance\\" target=\\"_blank\\">https://cwiki.apache.org/confluence/display/HUDI/RFC±+19+Clustering+data+for+freshness+and+query+performance</a></p>\\n<p><a href=\\"https://parquet.apache.org/\\" target=\\"_blank\\">https://parquet.apache.org/</a></p>\\n<p><a href=\\"https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-hudi.html\\" target=\\"_blank\\">https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-hudi.html</a></p>\\n<p><a href=\\"https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-presto.html\\" target=\\"_blank\\">https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-presto.html</a></p>\\n<p><a href=\\"https://www.upgrad.com/blog/bucketing-in-hive/\\" target=\\"_blank\\">https://www.upgrad.com/blog/bucketing-in-hive/</a></p>\\n<h4><a id=\\"_479\\"></a>本篇作者</h4>\\n<p><img src=\\"https://dev-media.amazoncloud.cn/ff8adbd3ca3c4a5bac7db1674181d541_image.png\\" alt=\\"image.png\\" /></p>\n<h4><a id=\\"Dalei_Xu_483\\"></a><strong>Dalei Xu</strong></h4>\\n<p>Amazon 数据分析产品技术专家,负责 Amazon 数据分析的解决方案的咨询和架构设计。多年从事一线开发,在数据开发、架构设计、性能优化和组件管理方面积累了丰富的经验,希望能将 Amazon 优秀的服务组件,推广给更多的企业用户,实现与客户的双赢和共同成长。</p>\n"}
目录
亚马逊云科技解决方案 基于行业客户应用场景及技术领域的解决方案
联系亚马逊云科技专家
亚马逊云科技解决方案
基于行业客户应用场景及技术领域的解决方案
联系专家
0
目录
关闭