<!--StartFragment-->
### **现状**
在实际应用场景中,客户的大数据部门会接到基于不同业务场景的数据分析需求,而大数据部门的痛点常常出现在,这些需求背后的数据往往来自一个个数据孤岛,并没有通过有效的方式打通。数据孤岛的产生可能来源于不同的数据摄入/获取方式,当业务规模不断扩展,业务部分需要不同环节的数据联合起来所产生的分析结果来进一步做业务决策,此时便是数据湖发挥其优势的时刻。数据湖可让组织将所有结构化和非结构化数据存储在一个集中式存储库中,可解决处理海量异构数据的难题。Amazon Glue 是一项完全托管的 ETL(提取、转换和加载)服务,使您能够轻松而经济高效地对数据进行分类、清理和扩充,并在各种数据存储和数据流之间可靠地移动数据。此文记录了两类典型数据类型注入以 [Amazon S3](https://aws.amazon.com/cn/s3/?trk=cndc-detail) 构建的数据湖的过程实战,帮助 Apache Spark 应用程序和 Glue ETL 作业的开发人员、大数据架构师、数据工程师自动扩展在 Amazon Glue 上运行的数据处理作业的最佳实践。
### **概念简介**
#### **Glue Data Catalog**
Amazon Glue Data Catalog 是您在亚马逊云科技云中的持久性技术元数据存储,与 Apache Hive Metastore 兼容。
#### **Glue Crawler**
Amazon Glue 还能让您设置爬网程序,它可以扫描所有类型的存储库中的数据,对其进行分类,从中提取架构信息,并自动在 Amazon Glue Data Catalog 中存储元数据。
#### **Glue ETL Jobs**
Amazon Glue 中有三种类型的作业:Spark、Streaming ETL 和 Python shell,本文以 Spark 任务为例,以 PySpark 为代码语言。Spark 任务会在由 Amazon Glue 托管的 Apache Spark 环境中执行。
#### **DynamicFrame**
DynamicFrame 类似于 Apache Spark SQL DataFrame ,后者是用于将数据组织到行和列中的数据抽象,不同之处在于每条记录都是自描述的,因此初始化时并不需要任何 schema。借助 DynamicFrame,您可以获得架构灵活性和一组专为 DynamicFrame 设计的 Function。
### **解决方案架构总览**
[![](https://s3.cn-north-1.amazonaws.com.cn/awschinablog/using-aws-glue-to-build-a-data-lake-on-amazon-s31.png)](https://s3.cn-north-1.amazonaws.com.cn/awschinablog/using-aws-glue-to-build-a-data-lake-on-amazon-s31.jpg)
### **方案实战**
#### **任务1:在 Glue Job 中通过 Apache Spark DataFrame 将 S3 中的 JSON 格式数据以 Parquet 格式注入 S3 数据湖**
透过开源的 Spark DataFrame read function 直接指定 s3 路径来做转换。源数据是 JSON 格式(含有 Nested JSON)存储在 S3 上,其中 S3 prefix 按年、月、日、小时划分文件夹,业务按分钟级别生成 log.gz 文件,示例: s3://xxxdatalake/json/2022/08/01/01/xxxx.log.gz。
目标数据是按天分区,存储文件格式为 Parquet,存放在 S3 上。
##### **1.1循环读写,Glue ETL Jobs 代码片段示例和解析:**
从源数据的天目录下循环读取每个小时下的多个 log.gz 文件到 DataFrame, 写到路径为‘dt=年-月-日’路径下存成 parquet 格式:
```python
df = spark.read.option("recursiveFileLookup", "true").json(
path='s3://xxxdatalake/json/' + p_year + '/' + p_month + '/'+ p_day + '/',
schema=datalake_json_schema
)
df.repartition(1).write.mode('overwrite').parquet(
path='s3://xxxdatalake/json/dt=' + p_year + '-' + p_month + '-' + p_day + '/'
)
```
```
其中repartition()会重新整理DataFrame使用到的Spark Partition(与前面提到的"按天分区"的分区概念不同),每一个Spark Partition在输出的时候会对应到一个文件,当写入磁盘时,它会在单个目录中创建part files。若使用者发现输出文件单个文件过小,导致单个路径下的文件数量过多,可选择降低repartition的数量。
```
##### **1.2处理 Nested JSON**
Glue ETL Jobs 也可以像 Apache Spark DataFrame 自行定义 schema,来确保 schema 是预期的结构,使用者可根据数据原有格式定义嵌套字段,如’mlbTeam’下含有嵌套字段:
```python
datalake_json_schema = StructType(
fields=[
StructField('mlbId', StringType()),
StructField('mlbName', StringType()),
StructField('brefId', StringType()),
StructField('cbsId', StringType()),
StructField('mlbTeam', StructType([
StructField('nfbcId', StringType()),
StructField('yahooId', StringType())
…
```
##### **1.3 注意输出的 struct 字段的内容**
当数据按照1.1的方式写出成 parquet 格式文件时,后续在使用 Glue Crawler 来进行表结构的爬取,嵌套部分默认会在对应字段下以 struct 的格式呈现,struct 格式下如有特殊字符如$,会在 Athena Query 时出现报错,可按需确认相关字段是否为必要字段来做丢弃或者改写。
##### **1.4 处理 Glue Job ConcurrenRunExceededException**
[![](https://s3.cn-north-1.amazonaws.com.cn/awschinablog/using-aws-glue-to-build-a-data-lake-on-amazon-s32.png)](https://s3.cn-north-1.amazonaws.com.cn/awschinablog/using-aws-glue-to-build-a-data-lake-on-amazon-s32.jpg)
在测试以及使用 Glue Studio 创建的 Job 时,默认会 retry 3次,Maximum Concurrency 为1,当我们看到任务失败的时候常常会立刻对 Job script 做修改后再点击一次运行,此时上次的失败的任务还在重试,便会遇到 ConcurrenRunExceededException。我们可以透过增加 Maximum Concurrency
[![](https://s3.cn-north-1.amazonaws.com.cn/awschinablog/using-aws-glue-to-build-a-data-lake-on-amazon-s33.png)](https://s3.cn-north-1.amazonaws.com.cn/awschinablog/using-aws-glue-to-build-a-data-lake-on-amazon-s33.jpg)
或者将 number of retry 调整成0 来避免此报错:
**![using-aws-glue-to-build-a-data-lake-on-amazon-s34.png](https://dev-media.amazoncloud.cn/f7cafd2186fa4a1ba504e116aab7749b_using-aws-glue-to-build-a-data-lake-on-amazon-s34.png "using-aws-glue-to-build-a-data-lake-on-amazon-s34.png")**
#### **任务2: : Glue Job 结合 Glue Data Catalog 将 S3 中的 JSON 格式数据以 Parquet 格式注入 S3 数据湖**
在日期多层结构的处理上,对比任务1中的循环读取方式,任务2中我们先使用了 Glue Data Catalog 来定义好分区信息,就可以在读取的时候一次读取需要的分区,并且后续的 DataFrame 也会有分区信息。
##### **2.1 使用 Glue Crawler 创建表后在 Glue ETL Job 使用 Spark sql 读取**
对于相同格式的源数据(s3://xxxdatalake/json/2022/08/01/01/xxxx.log.gz)创建 Glue Crawler,针对路径 s3://xxxdatalake/json/ 做爬取后会在 Glue Data Catalog 中产出一张表(名为 json),此时读取到的数据会有分区信息: 年,月,日,小时,分别映射为 partition\_0, partition\_1, partition\_2,partition\_3,接下来从 Glue ETL Jobs 中从 Glue Data Catalog 来读取数据,:
```python
df = spark.sql("select * from `demodatabase`.`json`")
```
当数据量较大时,程序里如果需要一次弄数月资料的话, 可以改成写出到 S3 的时候用 partition by,并以分区信息当作键値。此时分区信息在 DataFrame 中的字段名称是 partition\_0, partition\_1, partition\_2:
```python
df.write.partitionBy("partition_0"," partition_1"," partition_2")
.mode(SaveMode.Append)
.parquet("/data/hive/warehouse/mydbname.db/" + tableName)
```
注意,如果原始路径是 Hive Style Naming Conventions 格式,如 (s3://xxxdatalake/json/year=2022/month=08/day=01/), Crawler 的分区字段就从 partition\_0, partition\_1, partition\_2 变成 year, month, day。
##### **2.2 Glue Crawler 爬虫程序爬出表结构后的 timestamp 字段处理**
Glue Crawler 遇到时间类型的数据时通常会判断为 String 或者数字类型。若是此类格式的 “2018-08-15 22:03:25.296” String 类型,可以在 Athena 做 Query 直接(使用 > 或者<)比较大小,但是无法用更多的时间类型的函数,有时原始数据为 Epoch Time, 则在后续 Query 时,将时间通过 epoch 和 human-readable data 做转换后进行比较大小。
当然,进阶做法是在 Glue Job 中将字段转换成 timestamp 类型以便在后续的 Athena Query 中充分利用时间函数。
#### **任务3: 通过 JDBC 连接将数据注入 S3 数据湖**
##### **3.1 构建网络连通性**
客户侧通过 JDBC 连接获取的数据一般以 [Amazon RDS](https://aws.amazon.com/cn/rds/?trk=cndc-detail) 或者在 [Amazon EC2 ](https://aws.amazon.com/cn/ec2/?trk=cndc-detail)自建关系型数据库或者 MPP 数据库为主。为保证 Glue ETL Job 后续的连通性,此处需要先行在 Amazon Glue Studio – Connectors 处创建 Connection,为后续 Glue Job 绑定 VPC 和安全组资源,以便 JDBC 数据源侧可以配置从安全组允许对应目标的访问。此 Connection 也会用在3.2的爬网程序中。
[![](https://s3.cn-north-1.amazonaws.com.cn/awschinablog/using-aws-glue-to-build-a-data-lake-on-amazon-s35.png)](https://s3.cn-north-1.amazonaws.com.cn/awschinablog/using-aws-glue-to-build-a-data-lake-on-amazon-s35.jpg)[![](https://s3.cn-north-1.amazonaws.com.cn/awschinablog/using-aws-glue-to-build-a-data-lake-on-amazon-s36.png)](https://s3.cn-north-1.amazonaws.com.cn/awschinablog/using-aws-glue-to-build-a-data-lake-on-amazon-s36.jpg)[![](https://s3.cn-north-1.amazonaws.com.cn/awschinablog/using-aws-glue-to-build-a-data-lake-on-amazon-s37.png)](https://s3.cn-north-1.amazonaws.com.cn/awschinablog/using-aws-glue-to-build-a-data-lake-on-amazon-s37.jpg)
此后3.3创建 Glue Job 时,可在 Jobs Details – Connections 处选择上述定义好的 Connections。
[![](https://s3.cn-north-1.amazonaws.com.cn/awschinablog/using-aws-glue-to-build-a-data-lake-on-amazon-s38.png)](https://s3.cn-north-1.amazonaws.com.cn/awschinablog/using-aws-glue-to-build-a-data-lake-on-amazon-s38.jpg)
##### **3.2 Glue Crawler 创建爬网程序获取 JDBC 数据表结构**
选择数据源的位置,勾选3.1设置的 Connection,其他步骤和创建 Glue Crawler 爬网程序的步骤一致。
[![](https://s3.cn-north-1.amazonaws.com.cn/awschinablog/using-aws-glue-to-build-a-data-lake-on-amazon-s39.png)](https://s3.cn-north-1.amazonaws.com.cn/awschinablog/using-aws-glue-to-build-a-data-lake-on-amazon-s39.jpg)
运行 Glue Crawler 爬网程序之后,在定义好的 Glue Data Catalog 的 Table 中会出现 JDBC 数据源的表结构:
[![](https://s3.cn-north-1.amazonaws.com.cn/awschinablog/using-aws-glue-to-build-a-data-lake-on-amazon-s310.png)](https://s3.cn-north-1.amazonaws.com.cn/awschinablog/using-aws-glue-to-build-a-data-lake-on-amazon-s310.jpg)
##### **3.3 Glue ETL Job 将 JDBC 数据转换为 Parquet 数据写入数据湖代码片段和解析**
有别于前两个范例,这个任务展示如何用 Glue DynamicFrame 操作。基于 Glue Data Catalog 中存储的库和表名,创建 DynamicFrame,再创建 TempView:
```python
df1 = glueContext.create_dynamic_frame_from_catalog(
database='default',
table_name='jdbcdwd_mlb_data',
transformation_ctx='df1'
).toDF()
df2 = glueContext.create_dynamic_frame_from_catalog(
database='default',
table_name='jdbcdwd_mlb_data2',
transformation_ctx='df2'
).toDF()
df1.createOrReplaceTempView('default_jdbcdwd_mlb_data')
df2.createOrReplaceTempView('default_jdbcdwd_mlb_data2')
```
创建出来的 TempView,就可以直接使用 SQL 语句进行数据处理了:
```python
sql = '''
select
t1.*,
t2.espn_name,
t2.espn_id,
t2.mlb_name
from
default_jdbcdwd_mlb_data t1
left join default_jdbcdwd_mlb_data2 t2
on
t1.mlb_id = t2.mlb_id;
'''
```
将通过 SQL Query 出来的目标数据以 Parquet 格式写到 S3 中:
```powershell
sql_df = spark.sql(sql)
sql_df.repartition(10).write.mode('overwrite').parquet(
path='s3://xxxglue-datalake/JDBC/'
)
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
job.commit()
```
##### **3.4读取 JDBC 数据慢或者 Out of memory 问题**
在使用 JDBC 读取 [Amazon RDS](https://aws.amazon.com/cn/rds/?trk=cndc-detail) 等数据库来源的时候,最常见的问题是读取数据源太久或者读进来之后马上 OutOfMemory。这两者其实是类似的问题,因为读取的时候平行度不够,单一 Spark task 处理太多数据。可以透过指定平行度处理如下:
```python
df1 = glueContext.create_dynamic_frame_from_catalog(
database='default',
table_name='jdbcdwd_mlb_data',
transformation_ctx='df1',
additional_options = {"hashfield": "id", 'hashpartitions': '20' },
).toDF()
```
将 hashfield 设置为 JDBC 表中用于将数据划分为分区的列的名称,此列可以是任何数据类型。 Amazon Glue 生成非重叠查询,这些查询并行运行以读取此列分区的数据。将 hashpartitions 设置为 JDBC 表的并行读取数。
### **总结**
此文整体采用[无服务器](https://aws.amazon.com/cn/serverless/?trk=cndc-detail)的架构,利用 Amazon Glue 加载并转换应用日志和 JDBC 数据源,并以目标格式写到以 S3 构建的数据湖中,该技术可以有效的打通因为不同摄入/获取数据方式形成的数据孤岛,以数据为基石更好的帮助业务部门做业务决策。
<!--EndFragment-->