Feast on Amazon 解决方案

0
0
{"value":"\n\n#### **背景&引言**\n\n\n众所周知,AI 算法模型开发落地有三个主要阶段:数据准备、模型训练、模型部署。目前已经有较多厂商及开源社区推出通用的 AI、MLOps 平台支撑模型训练与部署阶段,但主要偏重于[机器学习](https://aws.amazon.com/cn/machine-learning/?trk=cndc-detail)模型开发,部署,服务层面,自 2019 年后才陆续有各厂商推出数据准备支撑阶段的产品及服务,即特征平台(如 [Amazon Sagemaker feature Store](https://docs.aws.amazon.com/zh_cn/sagemaker/latest/dg/feature-store.html))。\n\n特征平台的主要能力包含:特征注册中心、离线存储&消费、在线存储&消费、离线&在线特征同步,特征版本,尤其特征版本最为重要,实现特征 point-in-time cross join,避免特征穿越造成 train-server skew 的重要功能特性。\n\n各个厂商在特征平台的架构和实现方式方面迥然不同,缺乏跨平台的通用的特征库方案。\n\nFeast (**Fea**ture **St**ore) 是一套开源特征库框架,纯 python 框架,与 Pandas dataframe 无缝集成,对 ML,AI 算法工程师友好,它提供了在线,离线特征库注册,特征库存储,特征数据摄取、训练数据检索、特征版本、离线-在线特征同步等功能;且具有云原生亲和力,可以构建在多个公有云平台上。\n\n本文介绍了 Feast 框架的整体架构及设计思路,并 step by step 详细说明了 Feast on Amazon 集成和使用,包括安装部署离线/在线特征库、使用特征库、特征库同步的方法等。对于使用 Feast 开源框架构建 MLOps 平台的用户,本文可以作为快速构建和开发指南。\n\n\n#### **Feast 整体架构**\n\n\n![image.png](https://dev-media.amazoncloud.cn/e9c40747242947c4818e05e014ee05de_image.png)\n\nFeast的主要功能组件:\n\n- **Feast Repo&Registry**:轻量级的目录级及 Split 文件数据库格式 Repository,用于特征库基础设施及元数据注册\n- **Feast Python SDK/CLI**: 开发构建及使用特征库的主要功能组件\n - **Feast Apply**:命令行工具执行安装部署配置的特征库到底层基础设施,并且注册特征库元数据到 Runtime 运行态\n - **Feast Materialize**:离线-在线特征库版本同步工具\n - **Get Online Features**:在线特征数据提取,调用对应的在线特征库基础设施 API 抽取特征数据,用于模型推断\n - **Get Historical Features**:离线特征数据抽取,调用对应的离线特征库基础设施 API 抽取历史特征数据,用于模型训练或者特征组合\n- **Online Store**: 在线特征库,根据不同云厂商的 nosql 数据库承载,存储特征快照版本数据\n- **Offline Store**:离线特征库,根据不同云厂商数仓承载,存储特征历史版本数据\n\n\n#### **Feast On Amazon 安装部署方案**\n\n\n##### **依赖准备**\n\n\n- Feast on Amazon 使用 Redshift 作为离线特征库,需要 Redshift 集群(如果采用 Spectrum 外部表,还需要 Spectrum 角色及 Glue Catalog 权限)\n- Feast on Amazon 使用 DynamoDB 作为在线特征库,需要 DynamoDB 读写权限\n- 可以用 Terraform 或者 CloudFormation 准备需要的 Redshift,DDB,IAM 角色等\n- 以下使用 Terraform 为例安装部署 Feast 需要的 Redshift,S3,IAM 角色等各种基础设施\n\n1) 安装部署 Terraform\n\n```\\nsudo yum install python3-devel\\nsudo yum install -y yum-utils\\nsudo yum-config-manager —add-repo https://rpm.releases.hashicorp.com/AmazonLinux/hashicorp.repo\\nsudo yum -y install terraform\\n```\n\n2) 编写 Terraform 配置文件\n\n```\\nproject: feast_aws_repo\\nregistry: data/registry.db\\nprovider: aws\\nonline_store:\\n type: dynamodb\\n region: ap-southeast-1\\noffline_store:\\n type: redshift\\n cluster_id: feast-demo2-redshift-cluster\\n region: ap-southeast-1\\n database: flinkstreamdb\\n user: awsuser\\n s3_staging_location: s3://feastdemobucket\\n iam_role: arn:aws:iam::**********:role/s3_spectrum_role\\n```\n\n3) 构建基础设施\n\n```\\ncd infra\\nsudo terraform init\\nsudo terraform plan -var=\\"admin_password=xxxxx\\"\\nsudo terraform apply -var=\\"admin_password=xxxxx\\"\\n```\n\n4) 如果需要 Spectrum 承载离线特征库,需要在 Redshift 中建立 Spectrum 外部 schema,以便指向Glue Catalog 中的 s3 外部表\n\n```\\naws redshift-data execute-statement \\\\\\n —region ap-southeast-1 \\\\\\n —cluster-identifier feast-demo-redshift-cluster \\\\\\n —db-user awsuser \\\\\\n —database dev —sql \\"create external schema spectrum from data catalog database 'flinkstreamdb' iam_role \\\\\\n 'arn:aws:iam::**********:role/s3_spectrum_role' create external database if not exists;“\\n```\n\n\n##### **Feast 特征库 Repository 准备**\n\n\n1) 依赖安装及升级\n\n```\\npip3 install -U numpy==1.21\\npip3 install feast[aws]\\n```\n\n2) 初始化 repository\n\n```\\nfeast init -t xxxxx(repository_name)\\nAWS Region (e.g. us-west-2): ap-southeast-1\\nRedshift Cluster ID: feast-demo-redshift-cluster\\nRedshift Database Name: flinkstreamdb\\nRedshift User Name: awsuser\\nRedshift S3 Staging Location (s3://*): s3://feastdemobucket\\nRedshift IAM Role for S3 (arn:aws:iam::*:role/*): arn:aws:iam::xxxxxx:role/s3_spectrum_role\\n```\n\n创建好的特征库的 schema 及骨架示例:\n\n```\\n\$ tree ./feast_aws_repo/\\n./feast_aws_repo/\\n├── data\\n│ └── registry.db\\n├── driver_repo.py\\n├── feature_store.yaml\\n```\n\n- *.yam l 配置指定 Feast repository 的基础环境资源(s3、Redshift、DDB 等)\n- *.py 配置特征库元数据,特征 view 及 schema 等\n- db 保存基于 *.py 元数据构建后的特征组,特征库对象实例,以便运行态使用\n\n安装部署后的 feature_store.yaml 示例:\n\n```\\nproject: feast_aws_repo\\nregistry: data/registry.db\\nprovider: aws\\nonline_store:\\n type: dynamodb\\n region: ap-southeast-1\\noffline_store:\\n type: redshift\\n cluster_id: feast-demo2-redshift-cluster\\n region: ap-southeast-1\\n database: flinkstreamdb\\n user: awsuser\\n s3_staging_location: s3://feastdemobucket\\n iam_role: arn:aws:iam::xxxxxxx:role/s3_spectrum_role\\n```\n\ndriver_repo 的司机行程特征库元数据示例:\n\n```\\nfrom datetime import timedelta\\nfrom feast import Entity, Feature, FeatureView, RedshiftSource, ValueType\\ndriver = Entity(\\n name=\\"driver_id\\",\\n join_key=\\"driver_id\\",\\n value_type=ValueType.INT64,\\n)\\ndriver_stats_source = RedshiftSource(\\n table=\\"feast_driver_hourly_stats\\",\\n event_timestamp_column=\\"event_timestamp\\",\\n created_timestamp_column=\\"created\\",\\n)\\n\\ndriver_stats_fv = FeatureView(\\n name=\\"driver_hourly_stats\\",\\n entities=[\\"driver_id\\"],\\n ttl=timedelta(weeks=52),\\n features=[\\n Feature(name=\\"conv_rate\\", dtype=ValueType.FLOAT),\\n Feature(name=\\"acc_rate\\", dtype=ValueType.FLOAT),\\n Feature(name=\\"avg_daily_trips\\", dtype=ValueType.INT64),\\n ],\\n batch_source=driver_stats_source,\\n tags={\\"team\\": \\"driver_performance\\"},\\n)\\n```\n\n部署成功后可以在 Redshift 看到离线特征库的 Spectuam schema 及库表,DDB 中可以看到在线特征库的表\n\nRedshift 离线特征库:\n\n![image.png](https://dev-media.amazoncloud.cn/09ed41c597d64ff38457a91c27ba0fa5_image.png)\n\nDDB 在线特征库:\n\n![image.png](https://dev-media.amazoncloud.cn/e4b743025fbc49d0a2204d362c478a31_image.png)\n\n##### **使用Feast SDK API进行特征库操作**\n\n\n###### **连接特征库**\n\n\n安装部署完成后,在 python 代码中,可以方便的通过加载注册的 repository 路径,来连接到特征库及特征组\n\n在 repository 中注册的特征组,也可以直接 import 实例化\n\n```\\nfrom datetime import datetime, timedelta\\nimport pandas as pd\\nfrom feast import FeatureStore\\nfrom driver_repo import driver, driver_stats_fv\\nfs = FeatureStore(repo_path=\\"./\\")\\n>>> print(fs)\\n<feast.feature_store.FeatureStore object at 0x7f48d47098d0>\\n>>> print(driver_stats_fv)\\n{\\n \\"spec\\": {\\n \\"name\\": \\"driver_hourly_stats\\",\\n \\"entities\\": [\\n \\"driver_id\\"\\n ],\\n \\"features\\": [\\n {\\n \\"name\\": \\"conv_rate\\",\\n \\"valueType\\": \\"FLOAT\\"\\n },\\n {\\n \\"name\\": \\"acc_rate\\",\\n \\"valueType\\": \\"FLOAT\\"\\n },\\n {\\n \\"name\\": \\"avg_daily_trips\\",\\n \\"valueType\\": \\"INT64\\"\\n }\\n ],\\n \\"tags\\": {\\n \\"team\\": \\"driver_performance\\"\\n },\\n \\"ttl\\": \\"31449600s\\",\\n \\"batchSource\\": {\\n \\"type\\": \\"BATCH_REDSHIFT\\",\\n \\"eventTimestampColumn\\": \\"event_timestamp\\",\\n \\"createdTimestampColumn\\": \\"created\\",\\n \\"redshiftOptions\\": {\\n \\"table\\": \\"feast_driver_hourly_stats\\"\\n },\\n \\"dataSourceClassType\\": \\"feast.infra.offline_stores.redshift_source.RedshiftSource\\"\\n },\\n \\"online\\": true\\n },\\n \\"meta\\": {}\\n}\\n```\n\n\n###### **离线特征数据提取**\n\n\n通过 Feast get_historical_features API,可以抽取离线特征库数据用于离线训练或特征组合\n\n```\\nfeatures = [\\"driver_hourly_stats:conv_rate\\", \\"driver_hourly_stats:acc_rate\\"]\\nentity_df = pd.DataFrame(\\n {\\n \\"event_timestamp\\": [\\n pd.Timestamp(dt, unit=\\"ms\\", tz=\\"UTC\\").round(\\"ms\\")\\n for dt in pd.date_range(\\n start=datetime.now() - timedelta(days=3),\\n end=datetime.now(),\\n periods=3,\\n )\\n ],\\n \\"driver_id\\": [1001, 1002, 1003],\\n }\\n )\\n training_df = fs.get_historical_features(\\n features=features, entity_df=entity_df\\n ).to_df()\\n```\n\n如上我们抽取特征标识(entity 字段为 driver_id)为 1001,1002,1003, 时间版本为最近 3 天的离线特征库数据\n\n```\\n>>> training_df\\n event_timestamp driver_id conv_rate acc_rate\\n0 2022-07-04 02:33:54.114 1001 0.036082 0.707744\\n1 2022-07-05 14:33:54.114 1002 0.522306 0.983233\\n2 2022-07-07 02:33:54.114 1003 0.734294 0.034062\\n```\n\n\n###### **离线特征组合**\n\n\n多个特征组需要联合并抽取作为模型训练时,get_historical_features 可以指定多个特征 view 的 features,基于 event_timestamp 做 point-in-time 关联,从而得到同一时间版本的离线特征组合的数据\n\n ```\\nfeast_features = [\\n \\"zipcode_features:city\\",\\n \\"zipcode_features:state\\",\\n \\"zipcode_features:location_type\\",\\n \\"zipcode_features:tax_returns_filed\\",\\n \\"zipcode_features:population\\",\\n \\"zipcode_features:total_wages\\",\\n \\"credit_history:credit_card_due\\",\\n \\"credit_history:mortgage_due\\",\\n \\"credit_history:student_loan_due\\",\\n \\"credit_history:vehicle_loan_due\\",\\n \\"credit_history:hard_pulls\\",\\n \\"credit_history:missed_payments_2y\\",\\n \\"credit_history:missed_payments_1y\\",\\n \\"credit_history:missed_payments_6m\\",\\n \\"credit_history:bankruptcies\\",\\n ]\\ntraining_df = self.fs.get_historical_features(\\n entity_df=entity_df, features=feast_features\\n).to_df()\\n```\n\n如上代码示例,在抽取离线特征时,关联了 credit_history 和 zipcode_features 两个离线特征库的相应特征字段,Feast 会在后台拼接 Redshift Sql 关联对应的库表及 event_timestamp 等条件\n\n\n###### **离线特征数据同步在线特征库**\n\n\n通过Feast 提供的 ```materialize cli```,可以将指定时间版本的 Redshift 离线特征数据同步到 DynamoDB 的在线特征库中\n\nmaterialize-incremental cli 会记录该 repository 特征库下每次同步的增量时间版本,因此每次执行会把自上次执行至今的最新数据增量同步到 DynamoDB\n\n```\\nCURRENT_TIME=\$(date -u +\\"%Y-%m-%dT%H:%M:%S\\")\\nfeast materialize-incremental \$CURRENT_TIME\\n\\nMaterializing 1 feature views to 2022-07-07 08:00:03+00:00 into the sqlite online\\nstore.\\ndriver_hourly_stats from 2022-07-06 16:25:47+00:00 to 2022-07-07 08:00:03+00:00:\\n100%|████████████████████████████████████████████| 5/5 [00:00<00:00, 592.05it/s]\\n```\n\n当然也可以使用 ```materialize``` 显式指定开始时间(startdt)和截止时间(enddt), feast 会将指定时间版本的离线特征库数据同步到在线特征库\n\n```\\nfeast materialize 2022-07-13T00:00:00 2022-07-19T00:00:00\\n\\nMaterializing 1 feature views from 2022-07-13 00:00:00+00:00 to 2022-07-19 00:00:00+00:00 into the dynamodb online store.\\ndriver_hourly_stats:\\n100%|█████████████████████████████████████████████████████████████████| 5/5 [00:00<00:00, 51.18it/s]\\n```\n\n\n###### **在线特征查询**\n\n\n```\\n>>> online_features = fs.get_online_features(\\n features=features, entity_rows=[{\\"driver_id\\": 1001}, {\\"driver_id\\": 1002}],\\n ).to_dict()\\n>>> print(pd.DataFrame.from_dict(online_features))\\n acc_rate conv_rate driver_id\\n0 0.179407 0.984951 1001\\n1 0.023422 0.069323 1002\\n```\n\n\n#### **Feast offline store on Spark 方案**\n\n\n上文我们看到的是 Feast 依托 [Amazon Redshift](https://aws.amazon.com/cn/redshift/?trk=cndc-detail) 作为离线特征库存储和特征抽取的方案,虽然安装部署简介明快,上手方便,但 Redshift 定位是云服务数据仓库,虽然在 sql 兼容性、扩展性上优秀,但灵活性不足,如:\n\n- 离线特征抽取必须要指定 event_timestamp 版本,无法直接查询最新 snapshot\n- point-in-time 关联查询直接拼接 partition over 分组 sql 并下压,海量数据情况下,多历史版本的特征库 time travel 抽取时会膨胀数倍,存在性能瓶颈\n\nFeast 自0.19版本开始,支持 Spark 作为离线特征库历史数据提取,版本查询,同步在线特征库的计算框架\n\nSpark 作为高性能分布式计算引擎,在海量数据场景下性能优异,且使用 Spark 时,Feast FeatureView 的 DataSource 既可以是指向 Hive 中的表,也可以是指向对象存储上的文件,通过 Hive 表可以兼容诸如 Hudi、iceberg 等多种数据湖架构。\n\n同时,通过 Spark 离线特征库抽取的特征数据,Feast 将其封装为 Spark DataFrame,从而可以方便的加载到 S3分布式存储,因而也避免了 Pandas DataFrame 保存在本地磁盘的存储空间问题。\n\n\n##### **Feast point-in-time correct join Spark 实现**\n\n\npoint-in-time correct join,根据源码来看,使用 pySpark+SparkSQL 实现,因此整体思路和 Redshift 类似:\n\n- 将 entity_df 由 DataFrame 转化为 Spark DataFrame,并注册成临时表\n- 根据用户指定要关联的 features,找到对应的 FeatureView,进而找到底层的 DataSource 和相关的元数据\n- 根据以上信息,即 query_context,通过 jinjia 渲染一个 SparkSQL,并提交给 Spark 集群计算\n- 计算完成的结果就是实现 point-in-time correct join 之后的 training dataset\n\n\n##### **Feast offline store on [Amazon EMR](https://aws.amazon.com/cn/emr/?trk=cndc-detail)安装部署**\n\n\n[Amazon EMR](https://aws.amazon.com/cn/emr/?trk=cndc-detail) 是全托管的 hadoop 大数据集群,提供了良好的弹性伸缩,高可用,存算分离等特性,且通过 EMRFS 原生集成 [Amazon S3](https://aws.amazon.com/cn/s3/?trk=cndc-detail)云存储,用于承载 Feast 的 Spark 离线特征库具有天然的亲和力。\n\n以下详细介绍 Feast Spark 离线特征库在 [Amazon EMR](https://aws.amazon.com/cn/emr/?trk=cndc-detail) 的安装部署步骤及使用方法\n\n\n###### **启动 [Amazon EMR](https://aws.amazon.com/cn/emr/?trk=cndc-detail) 集群**\n\n\n[Amazon EMR](https://aws.amazon.com/cn/emr/?trk=cndc-detail) 的启动方法本文不再赘述,感兴趣的同学可以参阅 [Amazon EMR](https://aws.amazon.com/cn/emr/?trk=cndc-detail) 文档\n\n此处选择 emr 6.5版本,Spark 3.1.2\n\n![image.png](https://dev-media.amazoncloud.cn/014d764b306f4627b4ee11e65c521e7a_image.png)\n\n\n###### **Offline store on EMR 特征库配置**\n\n\n我们在 emr 主节点上可以 feast init 特征库,从而直接利用 [Amazon EMR](https://aws.amazon.com/cn/emr/?trk=cndc-detail)上spark 与 S3的原生集成,通过 emrfs 读写 S3数据湖上各种格式文件,不再需要 hadoop s3开源 lib 的支持\n\nfeast init my_project 后,在该特征库的 yaml 配置文件中,指定 Feast spark 的对应参数即可:\n\n```\\nproject: feast_spark_project\\nregistry: data/registry.db\\nprovider: local\\noffline_store:\\n type: spark\\n spark_conf:\\n spark.master: yarn\\n spark.ui.enabled: \\"true\\"\\n spark.eventLog.enabled: \\"true\\"\\n spark.sql.catalogImplementation: \\"hive\\"\\n spark.sql.parser.quotedRegexColumnNames: \\"true\\"\\n spark.sql.session.timeZone: \\"UTC\\"\\n```\n\n配置完成后,通过 feast apply cli 同样部署到 EMR spark\n\n注:在 EMR master 节点上 pyspark lib 路径需要在环境变量中设置,以便 feast 找到 spark 的 home 目录及相应配置\n\n```\\nsource /etc/spark/conf/spark-env.sh\\nexport PYTHONPATH=\\"\${SPARK_HOME}/python/:\$PYTHONPATH\\"\\nexport PYTHONPATH=\\"\${SPARK_HOME}/python/lib/py4j-0.10.9-src.zip:\$PYTHONPATH\\"\\n```\n\n###### **Feast on Spark 离线特征库元数据**\n\n```\\nfrom feast.infra.offline_stores.contrib.spark_offline_store.spark_source import ( SparkSource,)\\n\\ndriver_hourly_stats= SparkSource(\\n name=\\"driver_hourly_stats\\",\\n query=\\"SELECT event_timestamp as ts, created_timestamp as created, conv_rate,conv_rate,conv_rate FROM emr_feature_store.driver_hourly_stats\\",\\n event_timestamp_column=\\"ts\\",\\n created_timestamp_column=\\"created\\"\\n )\\n\\n```\n \nFeast 的 sparkSource 提供了 query, table,及原始 raw 文件路径几种初始化方法,本文中使用 query 方式。\n\n需要注意 query 方式中,需要指定 event timestamp field 特征字段以便 Feast 识别作为 point-in-time cross join 时间版本抽取及特征 join 的依据\n\n\n###### **Feast Spark offline store 执行**\n\n\n配置 Spark 作为 Feast offline store 后,通过 [Amazon EMR](https://aws.amazon.com/cn/emr/?trk=cndc-detail) 上 spark history UI,可以清楚的看到其 get_historical_features 方法,底层 Feast 使用 SparkSQL 创建临时视图,拼接 event time join 的 sql,并查询上文中 source 数据湖上 hive 库表等各个步骤的业务逻辑:\n\n![image.png](https://dev-media.amazoncloud.cn/116184a35b2741b8a940beb38d37c77b_image.png)\n\n跟踪 Spark history UI 上,Spark Sql 的各个 query 可以看到,Feast 的 get_historical_features 方法执行时,会构造临时表 entity_dataframe,即用户调用 get_historical_features 方法时,传入的样本列表。再构建 driver_hourly_stats_base,即需要 join 及 point-in-time 查询的即样例特征时序表\n\n```\\n== Parsed Logical Plan ==\\n'CreateViewStatement [driver_hourly_stats__cleaned], (\\n\\n WITH driver_hourly_stats__entity_dataframe AS (\\n SELECT\\n driver_id,\\n entity_timestamp,\\n driver_hourly_stats__entity_row_unique_id\\n FROM entity_dataframe\\n GROUP BY\\n driver_id,\\n entity_timestamp,\\n driver_hourly_stats__entity_row_unique_id\\n ),\\n\\ndriver_hourly_stats__base AS (\\n SELECT\\n subquery.*,\\n entity_dataframe.entity_timestamp,\\n entity_dataframe.driver_hourly_stats__entity_row_unique_id\\n FROM driver_hourly_stats__subquery AS subquery\\n INNER JOIN driver_hourly_stats__entity_dataframe AS entity_dataframe\\n ON TRUE\\n AND subquery.event_timestamp <= entity_dataframe.entity_timestamp\\n\\n \\n AND subquery.event_timestamp >= entity_dataframe.entity_timestamp - 86400 * interval '1' second\\n \\n\\n \\n AND subquery.driver_id = entity_dataframe.driver_id\\n \\n ),\\n```\n\n后续的 subquery、dedup 及 cleaned 子查询,会基于以上的两张基础表,进行基于特征标识字段 driver_id 和时序时间戳字段 event_timestamp 的分组排序,剔重等操作,最后 join 样本列表临时表 entity_dataframe,整个流程与 Redshift 上基本一致\n\n```\\ndriver_hourly_stats__subquery AS (\\n SELECT\\n ts as event_timestamp,\\n created as created_timestamp,\\n driver_id AS driver_id,\\n \\n conv_rate as conv_rate, \\n \\n acc_rate as acc_rate\\n \\n FROM (SELECT driver_id,event_timestamp as ts, created_timestamp as created, conv_rate,acc_rate,avg_daily_trips FROM emr_feature_store.driver_hourly_stats)\\n WHERE ts <= '2022-07-25T03:27:05.903000'\\n \\n AND ts >= '2022-07-21T03:27:05.903000'\\n \\n ),\\n\\n driver_hourly_stats__dedup AS (\\n SELECT\\n driver_hourly_stats__entity_row_unique_id,\\n event_timestamp,\\n MAX(created_timestamp) as created_timestamp\\n FROM driver_hourly_stats__base\\n GROUP BY driver_hourly_stats__entity_row_unique_id, event_timestamp\\n ),\\ndriver_hourly_stats__latest AS (\\n SELECT\\n event_timestamp,\\n created_timestamp,\\n driver_hourly_stats__entity_row_unique_id\\n FROM\\n (\\n SELECT *,\\n ROW_NUMBER() OVER(\\n PARTITION BY driver_hourly_stats__entity_row_unique_id\\n ORDER BY event_timestamp DESC,created_timestamp DESC\\n ) AS row_number\\n FROM driver_hourly_stats__base\\n \\n INNER JOIN driver_hourly_stats__dedup\\n USING (driver_hourly_stats__entity_row_unique_id, event_timestamp, created_timestamp)\\n \\n )\\n WHERE row_number = 1\\n )\\n```\n\nAPI 结果返回可以 to_df 为 Spark 的 Dataframe,从而实现 remote 存储离线特征库抽取结果数据的操作,这也从另一方面解决了原有 Redshift 离线特征存储,特征抽取只能返回 pandas Dataframe 的劣势,在大数据量离线特征场景下更有优势\n\n\n#### **总结**\n\n\n综上所述,Feast 框架整体架构和在 Amazon 的构建是非常简洁明快的,对构建 MLOps 平台的用户而言,其主要有价值的优势如下:\n\n- 同时提供了离线,在线特征库,离线-在线特征库快照版本同步功能\n- 轻量级,快速部署使用, 代码即配置,feast apply 即可部署到 Amazon\n- 通过 repository 文件系统隔离特征库,方便 MLOps 多租户多 CICD 协同开发\n- API 抽象程度高,贴近 AI/ML 算法工程师业务语言\n\n对于海量离线特征数据抽取时 point-in-time cross join 的版本查询数据膨胀的业界难点,Feast 也可以通过 on EMR Spark 的构建方式,优化解决其性能问题\n\n\n#### **参考资料**\n\n\nAmazon Sagemaker Feature Store: [https://docs.aws.amazon.com/zh_cn/sagemaker/latest/dg/feature-store.html](https://docs.aws.amazon.com/zh_cn/sagemaker/latest/dg/feature-store.html)\n\nFeast官方:[https://docs.feast.dev/getting-started/architecture-and-components/overview](https://docs.feast.dev/getting-started/architecture-and-components/overview)\n\n[Amazon EMR](https://aws.amazon.com/cn/emr/?trk=cndc-detail)集群部署:[https://docs.aws.amazon.com/zh_cn/emr/latest/ManagementGuide/emr-what-is-emr.html](https://docs.aws.amazon.com/zh_cn/emr/latest/ManagementGuide/emr-what-is-emr.html)\n\n\n#### **本篇作者**\n\n\n![image.png](https://dev-media.amazoncloud.cn/251089bc94a3449988ff3e039457bd75_image.png)\n\n\n#### **唐清原**\n\n\nAmazon 数据分析解决方案架构师,负责 Amazon Data Analytic 服务方案架构设计以及性能优化,迁移,治理等 Deep Dive 支持。10+数据领域研发及架构设计经验,历任 Oracle 高级咨询顾问,咪咕文化数据集市高级架构师,澳新银行数据分析领域架构师职务。在大数据,数据湖,智能湖仓,及相关推荐系统 /MLOps 平台等项目有丰富实战经验","render":"<h4><a id=\\"_2\\"></a><strong>背景&amp;引言</strong></h4>\\n<p>众所周知,AI 算法模型开发落地有三个主要阶段:数据准备、模型训练、模型部署。目前已经有较多厂商及开源社区推出通用的 AI、MLOps 平台支撑模型训练与部署阶段,但主要偏重于机器学习模型开发,部署,服务层面,自 2019 年后才陆续有各厂商推出数据准备支撑阶段的产品及服务,即特征平台(如 <a href=\\"https://docs.aws.amazon.com/zh_cn/sagemaker/latest/dg/feature-store.html\\" target=\\"_blank\\">Amazon Sagemaker feature Store</a>)。</p>\\n<p>特征平台的主要能力包含:特征注册中心、离线存储&amp;消费、在线存储&amp;消费、离线&amp;在线特征同步,特征版本,尤其特征版本最为重要,实现特征 point-in-time cross join,避免特征穿越造成 train-server skew 的重要功能特性。</p>\n<p>各个厂商在特征平台的架构和实现方式方面迥然不同,缺乏跨平台的通用的特征库方案。</p>\n<p>Feast (<strong>Fea</strong>ture <strong>St</strong>ore) 是一套开源特征库框架,纯 python 框架,与 Pandas dataframe 无缝集成,对 ML,AI 算法工程师友好,它提供了在线,离线特征库注册,特征库存储,特征数据摄取、训练数据检索、特征版本、离线-在线特征同步等功能;且具有云原生亲和力,可以构建在多个公有云平台上。</p>\\n<p>本文介绍了 Feast 框架的整体架构及设计思路,并 step by step 详细说明了 Feast on Amazon 集成和使用,包括安装部署离线/在线特征库、使用特征库、特征库同步的方法等。对于使用 Feast 开源框架构建 MLOps 平台的用户,本文可以作为快速构建和开发指南。</p>\n<h4><a id=\\"Feast__16\\"></a><strong>Feast 整体架构</strong></h4>\\n<p><img src=\\"https://dev-media.amazoncloud.cn/e9c40747242947c4818e05e014ee05de_image.png\\" alt=\\"image.png\\" /></p>\n<p>Feast的主要功能组件:</p>\n<ul>\\n<li><strong>Feast Repo&amp;Registry</strong>:轻量级的目录级及 Split 文件数据库格式 Repository,用于特征库基础设施及元数据注册</li>\\n<li><strong>Feast Python SDK/CLI</strong>: 开发构建及使用特征库的主要功能组件\n<ul>\\n<li><strong>Feast Apply</strong>:命令行工具执行安装部署配置的特征库到底层基础设施,并且注册特征库元数据到 Runtime 运行态</li>\\n<li><strong>Feast Materialize</strong>:离线-在线特征库版本同步工具</li>\\n<li><strong>Get Online Features</strong>:在线特征数据提取,调用对应的在线特征库基础设施 API 抽取特征数据,用于模型推断</li>\\n<li><strong>Get Historical Features</strong>:离线特征数据抽取,调用对应的离线特征库基础设施 API 抽取历史特征数据,用于模型训练或者特征组合</li>\\n</ul>\n</li>\\n<li><strong>Online Store</strong>: 在线特征库,根据不同云厂商的 nosql 数据库承载,存储特征快照版本数据</li>\\n<li><strong>Offline Store</strong>:离线特征库,根据不同云厂商数仓承载,存储特征历史版本数据</li>\\n</ul>\n<h4><a id=\\"Feast_On_Amazon__33\\"></a><strong>Feast On Amazon 安装部署方案</strong></h4>\\n<h5><a id=\\"_36\\"></a><strong>依赖准备</strong></h5>\\n<ul>\\n<li>Feast on Amazon 使用 Redshift 作为离线特征库,需要 Redshift 集群(如果采用 Spectrum 外部表,还需要 Spectrum 角色及 Glue Catalog 权限)</li>\n<li>Feast on Amazon 使用 DynamoDB 作为在线特征库,需要 DynamoDB 读写权限</li>\n<li>可以用 Terraform 或者 CloudFormation 准备需要的 Redshift,DDB,IAM 角色等</li>\n<li>以下使用 Terraform 为例安装部署 Feast 需要的 Redshift,S3,IAM 角色等各种基础设施</li>\n</ul>\\n<ol>\\n<li>安装部署 Terraform</li>\n</ol>\\n<pre><code class=\\"lang-\\">sudo yum install python3-devel\\nsudo yum install -y yum-utils\\nsudo yum-config-manager —add-repo https://rpm.releases.hashicorp.com/AmazonLinux/hashicorp.repo\\nsudo yum -y install terraform\\n</code></pre>\\n<ol start=\\"2\\">\\n<li>编写 Terraform 配置文件</li>\n</ol>\\n<pre><code class=\\"lang-\\">project: feast_aws_repo\\nregistry: data/registry.db\\nprovider: aws\\nonline_store:\\n type: dynamodb\\n region: ap-southeast-1\\noffline_store:\\n type: redshift\\n cluster_id: feast-demo2-redshift-cluster\\n region: ap-southeast-1\\n database: flinkstreamdb\\n user: awsuser\\n s3_staging_location: s3://feastdemobucket\\n iam_role: arn:aws:iam::**********:role/s3_spectrum_role\\n</code></pre>\\n<ol start=\\"3\\">\\n<li>构建基础设施</li>\n</ol>\\n<pre><code class=\\"lang-\\">cd infra\\nsudo terraform init\\nsudo terraform plan -var=&quot;admin_password=xxxxx&quot;\\nsudo terraform apply -var=&quot;admin_password=xxxxx&quot;\\n</code></pre>\\n<ol start=\\"4\\">\\n<li>如果需要 Spectrum 承载离线特征库,需要在 Redshift 中建立 Spectrum 外部 schema,以便指向Glue Catalog 中的 s3 外部表</li>\n</ol>\\n<pre><code class=\\"lang-\\">aws redshift-data execute-statement \\\\\\n —region ap-southeast-1 \\\\\\n —cluster-identifier feast-demo-redshift-cluster \\\\\\n —db-user awsuser \\\\\\n —database dev —sql &quot;create external schema spectrum from data catalog database 'flinkstreamdb' iam_role \\\\\\n 'arn:aws:iam::**********:role/s3_spectrum_role' create external database if not exists;“\\n</code></pre>\\n<h5><a id=\\"Feast__Repository__93\\"></a><strong>Feast 特征库 Repository 准备</strong></h5>\\n<ol>\\n<li>依赖安装及升级</li>\n</ol>\\n<pre><code class=\\"lang-\\">pip3 install -U numpy==1.21\\npip3 install feast[aws]\\n</code></pre>\\n<ol start=\\"2\\">\\n<li>初始化 repository</li>\n</ol>\\n<pre><code class=\\"lang-\\">feast init -t xxxxx(repository_name)\\nAWS Region (e.g. us-west-2): ap-southeast-1\\nRedshift Cluster ID: feast-demo-redshift-cluster\\nRedshift Database Name: flinkstreamdb\\nRedshift User Name: awsuser\\nRedshift S3 Staging Location (s3://*): s3://feastdemobucket\\nRedshift IAM Role for S3 (arn:aws:iam::*:role/*): arn:aws:iam::xxxxxx:role/s3_spectrum_role\\n</code></pre>\\n<p>创建好的特征库的 schema 及骨架示例:</p>\n<pre><code class=\\"lang-\\">\$ tree ./feast_aws_repo/\\n./feast_aws_repo/\\n├── data\\n│ └── registry.db\\n├── driver_repo.py\\n├── feature_store.yaml\\n</code></pre>\\n<ul>\\n<li>*.yam l 配置指定 Feast repository 的基础环境资源(s3、Redshift、DDB 等)</li>\n<li>*.py 配置特征库元数据,特征 view 及 schema 等</li>\n<li>db 保存基于 *.py 元数据构建后的特征组,特征库对象实例,以便运行态使用</li>\n</ul>\\n<p>安装部署后的 feature_store.yaml 示例:</p>\n<pre><code class=\\"lang-\\">project: feast_aws_repo\\nregistry: data/registry.db\\nprovider: aws\\nonline_store:\\n type: dynamodb\\n region: ap-southeast-1\\noffline_store:\\n type: redshift\\n cluster_id: feast-demo2-redshift-cluster\\n region: ap-southeast-1\\n database: flinkstreamdb\\n user: awsuser\\n s3_staging_location: s3://feastdemobucket\\n iam_role: arn:aws:iam::xxxxxxx:role/s3_spectrum_role\\n</code></pre>\\n<p>driver_repo 的司机行程特征库元数据示例:</p>\n<pre><code class=\\"lang-\\">from datetime import timedelta\\nfrom feast import Entity, Feature, FeatureView, RedshiftSource, ValueType\\ndriver = Entity(\\n name=&quot;driver_id&quot;,\\n join_key=&quot;driver_id&quot;,\\n value_type=ValueType.INT64,\\n)\\ndriver_stats_source = RedshiftSource(\\n table=&quot;feast_driver_hourly_stats&quot;,\\n event_timestamp_column=&quot;event_timestamp&quot;,\\n created_timestamp_column=&quot;created&quot;,\\n)\\n\\ndriver_stats_fv = FeatureView(\\n name=&quot;driver_hourly_stats&quot;,\\n entities=[&quot;driver_id&quot;],\\n ttl=timedelta(weeks=52),\\n features=[\\n Feature(name=&quot;conv_rate&quot;, dtype=ValueType.FLOAT),\\n Feature(name=&quot;acc_rate&quot;, dtype=ValueType.FLOAT),\\n Feature(name=&quot;avg_daily_trips&quot;, dtype=ValueType.INT64),\\n ],\\n batch_source=driver_stats_source,\\n tags={&quot;team&quot;: &quot;driver_performance&quot;},\\n)\\n</code></pre>\\n<p>部署成功后可以在 Redshift 看到离线特征库的 Spectuam schema 及库表,DDB 中可以看到在线特征库的表</p>\n<p>Redshift 离线特征库:</p>\n<p><img src=\\"https://dev-media.amazoncloud.cn/09ed41c597d64ff38457a91c27ba0fa5_image.png\\" alt=\\"image.png\\" /></p>\n<p>DDB 在线特征库:</p>\n<p><img src=\\"https://dev-media.amazoncloud.cn/e4b743025fbc49d0a2204d362c478a31_image.png\\" alt=\\"image.png\\" /></p>\n<h5><a id=\\"Feast_SDK_API_189\\"></a><strong>使用Feast SDK API进行特征库操作</strong></h5>\\n<h6><a id=\\"_192\\"></a><strong>连接特征库</strong></h6>\\n<p>安装部署完成后,在 python 代码中,可以方便的通过加载注册的 repository 路径,来连接到特征库及特征组</p>\n<p>在 repository 中注册的特征组,也可以直接 import 实例化</p>\n<pre><code class=\\"lang-\\">from datetime import datetime, timedelta\\nimport pandas as pd\\nfrom feast import FeatureStore\\nfrom driver_repo import driver, driver_stats_fv\\nfs = FeatureStore(repo_path=&quot;./&quot;)\\n&gt;&gt;&gt; print(fs)\\n&lt;feast.feature_store.FeatureStore object at 0x7f48d47098d0&gt;\\n&gt;&gt;&gt; print(driver_stats_fv)\\n{\\n &quot;spec&quot;: {\\n &quot;name&quot;: &quot;driver_hourly_stats&quot;,\\n &quot;entities&quot;: [\\n &quot;driver_id&quot;\\n ],\\n &quot;features&quot;: [\\n {\\n &quot;name&quot;: &quot;conv_rate&quot;,\\n &quot;valueType&quot;: &quot;FLOAT&quot;\\n },\\n {\\n &quot;name&quot;: &quot;acc_rate&quot;,\\n &quot;valueType&quot;: &quot;FLOAT&quot;\\n },\\n {\\n &quot;name&quot;: &quot;avg_daily_trips&quot;,\\n &quot;valueType&quot;: &quot;INT64&quot;\\n }\\n ],\\n &quot;tags&quot;: {\\n &quot;team&quot;: &quot;driver_performance&quot;\\n },\\n &quot;ttl&quot;: &quot;31449600s&quot;,\\n &quot;batchSource&quot;: {\\n &quot;type&quot;: &quot;BATCH_REDSHIFT&quot;,\\n &quot;eventTimestampColumn&quot;: &quot;event_timestamp&quot;,\\n &quot;createdTimestampColumn&quot;: &quot;created&quot;,\\n &quot;redshiftOptions&quot;: {\\n &quot;table&quot;: &quot;feast_driver_hourly_stats&quot;\\n },\\n &quot;dataSourceClassType&quot;: &quot;feast.infra.offline_stores.redshift_source.RedshiftSource&quot;\\n },\\n &quot;online&quot;: true\\n },\\n &quot;meta&quot;: {}\\n}\\n</code></pre>\\n<h6><a id=\\"_248\\"></a><strong>离线特征数据提取</strong></h6>\\n<p>通过 Feast get_historical_features API,可以抽取离线特征库数据用于离线训练或特征组合</p>\n<pre><code class=\\"lang-\\">features = [&quot;driver_hourly_stats:conv_rate&quot;, &quot;driver_hourly_stats:acc_rate&quot;]\\nentity_df = pd.DataFrame(\\n {\\n &quot;event_timestamp&quot;: [\\n pd.Timestamp(dt, unit=&quot;ms&quot;, tz=&quot;UTC&quot;).round(&quot;ms&quot;)\\n for dt in pd.date_range(\\n start=datetime.now() - timedelta(days=3),\\n end=datetime.now(),\\n periods=3,\\n )\\n ],\\n &quot;driver_id&quot;: [1001, 1002, 1003],\\n }\\n )\\n training_df = fs.get_historical_features(\\n features=features, entity_df=entity_df\\n ).to_df()\\n</code></pre>\\n<p>如上我们抽取特征标识(entity 字段为 driver_id)为 1001,1002,1003, 时间版本为最近 3 天的离线特征库数据</p>\n<pre><code class=\\"lang-\\">&gt;&gt;&gt; training_df\\n event_timestamp driver_id conv_rate acc_rate\\n0 2022-07-04 02:33:54.114 1001 0.036082 0.707744\\n1 2022-07-05 14:33:54.114 1002 0.522306 0.983233\\n2 2022-07-07 02:33:54.114 1003 0.734294 0.034062\\n</code></pre>\\n<h6><a id=\\"_284\\"></a><strong>离线特征组合</strong></h6>\\n<p>多个特征组需要联合并抽取作为模型训练时,get_historical_features 可以指定多个特征 view 的 features,基于 event_timestamp 做 point-in-time 关联,从而得到同一时间版本的离线特征组合的数据</p>\n<pre><code class=\\"lang-\\">feast_features = [\\n &quot;zipcode_features:city&quot;,\\n &quot;zipcode_features:state&quot;,\\n &quot;zipcode_features:location_type&quot;,\\n &quot;zipcode_features:tax_returns_filed&quot;,\\n &quot;zipcode_features:population&quot;,\\n &quot;zipcode_features:total_wages&quot;,\\n &quot;credit_history:credit_card_due&quot;,\\n &quot;credit_history:mortgage_due&quot;,\\n &quot;credit_history:student_loan_due&quot;,\\n &quot;credit_history:vehicle_loan_due&quot;,\\n &quot;credit_history:hard_pulls&quot;,\\n &quot;credit_history:missed_payments_2y&quot;,\\n &quot;credit_history:missed_payments_1y&quot;,\\n &quot;credit_history:missed_payments_6m&quot;,\\n &quot;credit_history:bankruptcies&quot;,\\n ]\\ntraining_df = self.fs.get_historical_features(\\n entity_df=entity_df, features=feast_features\\n).to_df()\\n</code></pre>\\n<p>如上代码示例,在抽取离线特征时,关联了 credit_history 和 zipcode_features 两个离线特征库的相应特征字段,Feast 会在后台拼接 Redshift Sql 关联对应的库表及 event_timestamp 等条件</p>\n<h6><a id=\\"_315\\"></a><strong>离线特征数据同步在线特征库</strong></h6>\\n<p>通过Feast 提供的 <code>materialize cli</code>,可以将指定时间版本的 Redshift 离线特征数据同步到 DynamoDB 的在线特征库中</p>\\n<p>materialize-incremental cli 会记录该 repository 特征库下每次同步的增量时间版本,因此每次执行会把自上次执行至今的最新数据增量同步到 DynamoDB</p>\n<pre><code class=\\"lang-\\">CURRENT_TIME=\$(date -u +&quot;%Y-%m-%dT%H:%M:%S&quot;)\\nfeast materialize-incremental \$CURRENT_TIME\\n\\nMaterializing 1 feature views to 2022-07-07 08:00:03+00:00 into the sqlite online\\nstore.\\ndriver_hourly_stats from 2022-07-06 16:25:47+00:00 to 2022-07-07 08:00:03+00:00:\\n100%|████████████████████████████████████████████| 5/5 [00:00&lt;00:00, 592.05it/s]\\n</code></pre>\\n<p>当然也可以使用 <code>materialize</code> 显式指定开始时间(startdt)和截止时间(enddt), feast 会将指定时间版本的离线特征库数据同步到在线特征库</p>\\n<pre><code class=\\"lang-\\">feast materialize 2022-07-13T00:00:00 2022-07-19T00:00:00\\n\\nMaterializing 1 feature views from 2022-07-13 00:00:00+00:00 to 2022-07-19 00:00:00+00:00 into the dynamodb online store.\\ndriver_hourly_stats:\\n100%|█████████████████████████████████████████████████████████████████| 5/5 [00:00&lt;00:00, 51.18it/s]\\n</code></pre>\\n<h6><a id=\\"_343\\"></a><strong>在线特征查询</strong></h6>\\n<pre><code class=\\"lang-\\">&gt;&gt;&gt; online_features = fs.get_online_features(\\n features=features, entity_rows=[{&quot;driver_id&quot;: 1001}, {&quot;driver_id&quot;: 1002}],\\n ).to_dict()\\n&gt;&gt;&gt; print(pd.DataFrame.from_dict(online_features))\\n acc_rate conv_rate driver_id\\n0 0.179407 0.984951 1001\\n1 0.023422 0.069323 1002\\n</code></pre>\\n<h4><a id=\\"Feast_offline_store_on_Spark__357\\"></a><strong>Feast offline store on Spark 方案</strong></h4>\\n<p>上文我们看到的是 Feast 依托 Amazon Redshift 作为离线特征库存储和特征抽取的方案,虽然安装部署简介明快,上手方便,但 Redshift 定位是云服务数据仓库,虽然在 sql 兼容性、扩展性上优秀,但灵活性不足,如:</p>\n<ul>\\n<li>离线特征抽取必须要指定 event_timestamp 版本,无法直接查询最新 snapshot</li>\n<li>point-in-time 关联查询直接拼接 partition over 分组 sql 并下压,海量数据情况下,多历史版本的特征库 time travel 抽取时会膨胀数倍,存在性能瓶颈</li>\n</ul>\\n<p>Feast 自0.19版本开始,支持 Spark 作为离线特征库历史数据提取,版本查询,同步在线特征库的计算框架</p>\n<p>Spark 作为高性能分布式计算引擎,在海量数据场景下性能优异,且使用 Spark 时,Feast FeatureView 的 DataSource 既可以是指向 Hive 中的表,也可以是指向对象存储上的文件,通过 Hive 表可以兼容诸如 Hudi、iceberg 等多种数据湖架构。</p>\n<p>同时,通过 Spark 离线特征库抽取的特征数据,Feast 将其封装为 Spark DataFrame,从而可以方便的加载到 S3分布式存储,因而也避免了 Pandas DataFrame 保存在本地磁盘的存储空间问题。</p>\n<h5><a id=\\"Feast_pointintime_correct_join_Spark__372\\"></a><strong>Feast point-in-time correct join Spark 实现</strong></h5>\\n<p>point-in-time correct join,根据源码来看,使用 pySpark+SparkSQL 实现,因此整体思路和 Redshift 类似:</p>\n<ul>\\n<li>将 entity_df 由 DataFrame 转化为 Spark DataFrame,并注册成临时表</li>\n<li>根据用户指定要关联的 features,找到对应的 FeatureView,进而找到底层的 DataSource 和相关的元数据</li>\n<li>根据以上信息,即 query_context,通过 jinjia 渲染一个 SparkSQL,并提交给 Spark 集群计算</li>\n<li>计算完成的结果就是实现 point-in-time correct join 之后的 training dataset</li>\n</ul>\\n<h5><a id=\\"Feast_offline_store_on_Amazon_EMR_383\\"></a><strong>Feast offline store on Amazon EMR安装部署</strong></h5>\\n<p>Amazon EMR 是全托管的 hadoop 大数据集群,提供了良好的弹性伸缩,高可用,存算分离等特性,且通过 EMRFS 原生集成 Amazon S3云存储,用于承载 Feast 的 Spark 离线特征库具有天然的亲和力。</p>\n<p>以下详细介绍 Feast Spark 离线特征库在 Amazon EMR 的安装部署步骤及使用方法</p>\n<h6><a id=\\"_Amazon_EMR__391\\"></a><strong>启动 Amazon EMR 集群</strong></h6>\\n<p>Amazon EMR 的启动方法本文不再赘述,感兴趣的同学可以参阅 Amazon EMR 文档</p>\n<p>此处选择 emr 6.5版本,Spark 3.1.2</p>\n<p><img src=\\"https://dev-media.amazoncloud.cn/014d764b306f4627b4ee11e65c521e7a_image.png\\" alt=\\"image.png\\" /></p>\n<h6><a id=\\"Offline_store_on_EMR__401\\"></a><strong>Offline store on EMR 特征库配置</strong></h6>\\n<p>我们在 emr 主节点上可以 feast init 特征库,从而直接利用 Amazon EMR上spark 与 S3的原生集成,通过 emrfs 读写 S3数据湖上各种格式文件,不再需要 hadoop s3开源 lib 的支持</p>\n<p>feast init my_project 后,在该特征库的 yaml 配置文件中,指定 Feast spark 的对应参数即可:</p>\n<pre><code class=\\"lang-\\">project: feast_spark_project\\nregistry: data/registry.db\\nprovider: local\\noffline_store:\\n type: spark\\n spark_conf:\\n spark.master: yarn\\n spark.ui.enabled: &quot;true&quot;\\n spark.eventLog.enabled: &quot;true&quot;\\n spark.sql.catalogImplementation: &quot;hive&quot;\\n spark.sql.parser.quotedRegexColumnNames: &quot;true&quot;\\n spark.sql.session.timeZone: &quot;UTC&quot;\\n</code></pre>\\n<p>配置完成后,通过 feast apply cli 同样部署到 EMR spark</p>\n<p>注:在 EMR master 节点上 pyspark lib 路径需要在环境变量中设置,以便 feast 找到 spark 的 home 目录及相应配置</p>\n<pre><code class=\\"lang-\\">source /etc/spark/conf/spark-env.sh\\nexport PYTHONPATH=&quot;\${SPARK_HOME}/python/:\$PYTHONPATH&quot;\\nexport PYTHONPATH=&quot;\${SPARK_HOME}/python/lib/py4j-0.10.9-src.zip:\$PYTHONPATH&quot;\\n</code></pre>\\n<h6><a id=\\"Feast_on_Spark__433\\"></a><strong>Feast on Spark 离线特征库元数据</strong></h6>\\n<pre><code class=\\"lang-\\">from feast.infra.offline_stores.contrib.spark_offline_store.spark_source import ( SparkSource,)\\n\\ndriver_hourly_stats= SparkSource(\\n name=&quot;driver_hourly_stats&quot;,\\n query=&quot;SELECT event_timestamp as ts, created_timestamp as created, conv_rate,conv_rate,conv_rate FROM emr_feature_store.driver_hourly_stats&quot;,\\n event_timestamp_column=&quot;ts&quot;,\\n created_timestamp_column=&quot;created&quot;\\n )\\n\\n</code></pre>\\n<p>Feast 的 sparkSource 提供了 query, table,及原始 raw 文件路径几种初始化方法,本文中使用 query 方式。</p>\n<p>需要注意 query 方式中,需要指定 event timestamp field 特征字段以便 Feast 识别作为 point-in-time cross join 时间版本抽取及特征 join 的依据</p>\n<h6><a id=\\"Feast_Spark_offline_store__452\\"></a><strong>Feast Spark offline store 执行</strong></h6>\\n<p>配置 Spark 作为 Feast offline store 后,通过 Amazon EMR 上 spark history UI,可以清楚的看到其 get_historical_features 方法,底层 Feast 使用 SparkSQL 创建临时视图,拼接 event time join 的 sql,并查询上文中 source 数据湖上 hive 库表等各个步骤的业务逻辑:</p>\n<p><img src=\\"https://dev-media.amazoncloud.cn/116184a35b2741b8a940beb38d37c77b_image.png\\" alt=\\"image.png\\" /></p>\n<p>跟踪 Spark history UI 上,Spark Sql 的各个 query 可以看到,Feast 的 get_historical_features 方法执行时,会构造临时表 entity_dataframe,即用户调用 get_historical_features 方法时,传入的样本列表。再构建 driver_hourly_stats_base,即需要 join 及 point-in-time 查询的即样例特征时序表</p>\n<pre><code class=\\"lang-\\">== Parsed Logical Plan ==\\n'CreateViewStatement [driver_hourly_stats__cleaned], (\\n\\n WITH driver_hourly_stats__entity_dataframe AS (\\n SELECT\\n driver_id,\\n entity_timestamp,\\n driver_hourly_stats__entity_row_unique_id\\n FROM entity_dataframe\\n GROUP BY\\n driver_id,\\n entity_timestamp,\\n driver_hourly_stats__entity_row_unique_id\\n ),\\n\\ndriver_hourly_stats__base AS (\\n SELECT\\n subquery.*,\\n entity_dataframe.entity_timestamp,\\n entity_dataframe.driver_hourly_stats__entity_row_unique_id\\n FROM driver_hourly_stats__subquery AS subquery\\n INNER JOIN driver_hourly_stats__entity_dataframe AS entity_dataframe\\n ON TRUE\\n AND subquery.event_timestamp &lt;= entity_dataframe.entity_timestamp\\n\\n \\n AND subquery.event_timestamp &gt;= entity_dataframe.entity_timestamp - 86400 * interval '1' second\\n \\n\\n \\n AND subquery.driver_id = entity_dataframe.driver_id\\n \\n ),\\n</code></pre>\\n<p>后续的 subquery、dedup 及 cleaned 子查询,会基于以上的两张基础表,进行基于特征标识字段 driver_id 和时序时间戳字段 event_timestamp 的分组排序,剔重等操作,最后 join 样本列表临时表 entity_dataframe,整个流程与 Redshift 上基本一致</p>\n<pre><code class=\\"lang-\\">driver_hourly_stats__subquery AS (\\n SELECT\\n ts as event_timestamp,\\n created as created_timestamp,\\n driver_id AS driver_id,\\n \\n conv_rate as conv_rate, \\n \\n acc_rate as acc_rate\\n \\n FROM (SELECT driver_id,event_timestamp as ts, created_timestamp as created, conv_rate,acc_rate,avg_daily_trips FROM emr_feature_store.driver_hourly_stats)\\n WHERE ts &lt;= '2022-07-25T03:27:05.903000'\\n \\n AND ts &gt;= '2022-07-21T03:27:05.903000'\\n \\n ),\\n\\n driver_hourly_stats__dedup AS (\\n SELECT\\n driver_hourly_stats__entity_row_unique_id,\\n event_timestamp,\\n MAX(created_timestamp) as created_timestamp\\n FROM driver_hourly_stats__base\\n GROUP BY driver_hourly_stats__entity_row_unique_id, event_timestamp\\n ),\\ndriver_hourly_stats__latest AS (\\n SELECT\\n event_timestamp,\\n created_timestamp,\\n driver_hourly_stats__entity_row_unique_id\\n FROM\\n (\\n SELECT *,\\n ROW_NUMBER() OVER(\\n PARTITION BY driver_hourly_stats__entity_row_unique_id\\n ORDER BY event_timestamp DESC,created_timestamp DESC\\n ) AS row_number\\n FROM driver_hourly_stats__base\\n \\n INNER JOIN driver_hourly_stats__dedup\\n USING (driver_hourly_stats__entity_row_unique_id, event_timestamp, created_timestamp)\\n \\n )\\n WHERE row_number = 1\\n )\\n</code></pre>\\n<p>API 结果返回可以 to_df 为 Spark 的 Dataframe,从而实现 remote 存储离线特征库抽取结果数据的操作,这也从另一方面解决了原有 Redshift 离线特征存储,特征抽取只能返回 pandas Dataframe 的劣势,在大数据量离线特征场景下更有优势</p>\n<h4><a id=\\"_550\\"></a><strong>总结</strong></h4>\\n<p>综上所述,Feast 框架整体架构和在 Amazon 的构建是非常简洁明快的,对构建 MLOps 平台的用户而言,其主要有价值的优势如下:</p>\n<ul>\\n<li>同时提供了离线,在线特征库,离线-在线特征库快照版本同步功能</li>\n<li>轻量级,快速部署使用, 代码即配置,feast apply 即可部署到 Amazon</li>\n<li>通过 repository 文件系统隔离特征库,方便 MLOps 多租户多 CICD 协同开发</li>\n<li>API 抽象程度高,贴近 AI/ML 算法工程师业务语言</li>\n</ul>\\n<p>对于海量离线特征数据抽取时 point-in-time cross join 的版本查询数据膨胀的业界难点,Feast 也可以通过 on EMR Spark 的构建方式,优化解决其性能问题</p>\n<h4><a id=\\"_563\\"></a><strong>参考资料</strong></h4>\\n<p>Amazon Sagemaker Feature Store: <a href=\\"https://docs.aws.amazon.com/zh_cn/sagemaker/latest/dg/feature-store.html\\" target=\\"_blank\\">https://docs.aws.amazon.com/zh_cn/sagemaker/latest/dg/feature-store.html</a></p>\\n<p>Feast官方:<a href=\\"https://docs.feast.dev/getting-started/architecture-and-components/overview\\" target=\\"_blank\\">https://docs.feast.dev/getting-started/architecture-and-components/overview</a></p>\\n<p>Amazon EMR集群部署:<a href=\\"https://docs.aws.amazon.com/zh_cn/emr/latest/ManagementGuide/emr-what-is-emr.html\\" target=\\"_blank\\">https://docs.aws.amazon.com/zh_cn/emr/latest/ManagementGuide/emr-what-is-emr.html</a></p>\\n<h4><a id=\\"_573\\"></a><strong>本篇作者</strong></h4>\\n<p><img src=\\"https://dev-media.amazoncloud.cn/251089bc94a3449988ff3e039457bd75_image.png\\" alt=\\"image.png\\" /></p>\n<h4><a id=\\"_579\\"></a><strong>唐清原</strong></h4>\\n<p>Amazon 数据分析解决方案架构师,负责 Amazon Data Analytic 服务方案架构设计以及性能优化,迁移,治理等 Deep Dive 支持。10+数据领域研发及架构设计经验,历任 Oracle 高级咨询顾问,咪咕文化数据集市高级架构师,澳新银行数据分析领域架构师职务。在大数据,数据湖,智能湖仓,及相关推荐系统 /MLOps 平台等项目有丰富实战经验</p>\n"}
目录
亚马逊云科技解决方案 基于行业客户应用场景及技术领域的解决方案
联系亚马逊云科技专家
亚马逊云科技解决方案
基于行业客户应用场景及技术领域的解决方案
联系专家
0
目录
关闭