Feast on Amazon 解决方案

0
0
{"value":"\n\n#### **背景&引言**\n\n\n众所周知,AI 算法模型开发落地有三个主要阶段:数据准备、模型训练、模型部署。目前已经有较多厂商及开源社区推出通用的 AI、MLOps 平台支撑模型训练与部署阶段,但主要偏重于机器学习模型开发,部署,服务层面,自 2019 年后才陆续有各厂商推出数据准备支撑阶段的产品及服务,即特征平台(如 [Amazon Sagemaker feature Store](https://docs.aws.amazon.com/zh_cn/sagemaker/latest/dg/feature-store.html))。\n\n特征平台的主要能力包含:特征注册中心、离线存储&消费、在线存储&消费、离线&在线特征同步,特征版本,尤其特征版本最为重要,实现特征 point-in-time cross join,避免特征穿越造成 train-server skew 的重要功能特性。\n\n各个厂商在特征平台的架构和实现方式方面迥然不同,缺乏跨平台的通用的特征库方案。\n\nFeast (**Fea**ture **St**ore) 是一套开源特征库框架,纯 python 框架,与 Pandas dataframe 无缝集成,对 ML,AI 算法工程师友好,它提供了在线,离线特征库注册,特征库存储,特征数据摄取、训练数据检索、特征版本、离线-在线特征同步等功能;且具有云原生亲和力,可以构建在多个公有云平台上。\n\n本文介绍了 Feast 框架的整体架构及设计思路,并 step by step 详细说明了 Feast on Amazon 集成和使用,包括安装部署离线/在线特征库、使用特征库、特征库同步的方法等。对于使用 Feast 开源框架构建 MLOps 平台的用户,本文可以作为快速构建和开发指南。\n\n\n#### **Feast 整体架构**\n\n\n![image.png](https://dev-media.amazoncloud.cn/e9c40747242947c4818e05e014ee05de_image.png)\n\nFeast的主要功能组件:\n\n- **Feast Repo&Registry**:轻量级的目录级及 Split 文件数据库格式 Repository,用于特征库基础设施及元数据注册\n- **Feast Python SDK/CLI**: 开发构建及使用特征库的主要功能组件\n - **Feast Apply**:命令行工具执行安装部署配置的特征库到底层基础设施,并且注册特征库元数据到 Runtime 运行态\n - **Feast Materialize**:离线-在线特征库版本同步工具\n - **Get Online Features**:在线特征数据提取,调用对应的在线特征库基础设施 API 抽取特征数据,用于模型推断\n - **Get Historical Features**:离线特征数据抽取,调用对应的离线特征库基础设施 API 抽取历史特征数据,用于模型训练或者特征组合\n- **Online Store**: 在线特征库,根据不同云厂商的 nosql 数据库承载,存储特征快照版本数据\n- **Offline Store**:离线特征库,根据不同云厂商数仓承载,存储特征历史版本数据\n\n\n#### **Feast On Amazon 安装部署方案**\n\n\n##### **依赖准备**\n\n\n- Feast on Amazon 使用 Redshift 作为离线特征库,需要 Redshift 集群(如果采用 Spectrum 外部表,还需要 Spectrum 角色及 Glue Catalog 权限)\n- Feast on Amazon 使用 DynamoDB 作为在线特征库,需要 DynamoDB 读写权限\n- 可以用 Terraform 或者 CloudFormation 准备需要的 Redshift,DDB,IAM 角色等\n- 以下使用 Terraform 为例安装部署 Feast 需要的 Redshift,S3,IAM 角色等各种基础设施\n\n1) 安装部署 Terraform\n\n```\nsudo yum install python3-devel\nsudo yum install -y yum-utils\nsudo yum-config-manager —add-repo https://rpm.releases.hashicorp.com/AmazonLinux/hashicorp.repo\nsudo yum -y install terraform\n```\n\n2) 编写 Terraform 配置文件\n\n```\nproject: feast_aws_repo\nregistry: data/registry.db\nprovider: aws\nonline_store:\n type: dynamodb\n region: ap-southeast-1\noffline_store:\n type: redshift\n cluster_id: feast-demo2-redshift-cluster\n region: ap-southeast-1\n database: flinkstreamdb\n user: awsuser\n s3_staging_location: s3://feastdemobucket\n iam_role: arn:aws:iam::**********:role/s3_spectrum_role\n```\n\n3) 构建基础设施\n\n```\ncd infra\nsudo terraform init\nsudo terraform plan -var=\"admin_password=xxxxx\"\nsudo terraform apply -var=\"admin_password=xxxxx\"\n```\n\n4) 如果需要 Spectrum 承载离线特征库,需要在 Redshift 中建立 Spectrum 外部 schema,以便指向Glue Catalog 中的 s3 外部表\n\n```\naws redshift-data execute-statement \\\n —region ap-southeast-1 \\\n —cluster-identifier feast-demo-redshift-cluster \\\n —db-user awsuser \\\n —database dev —sql \"create external schema spectrum from data catalog database 'flinkstreamdb' iam_role \\\n 'arn:aws:iam::**********:role/s3_spectrum_role' create external database if not exists;“\n```\n\n\n##### **Feast 特征库 Repository 准备**\n\n\n1) 依赖安装及升级\n\n```\npip3 install -U numpy==1.21\npip3 install feast[aws]\n```\n\n2) 初始化 repository\n\n```\nfeast init -t xxxxx(repository_name)\nAWS Region (e.g. us-west-2): ap-southeast-1\nRedshift Cluster ID: feast-demo-redshift-cluster\nRedshift Database Name: flinkstreamdb\nRedshift User Name: awsuser\nRedshift S3 Staging Location (s3://*): s3://feastdemobucket\nRedshift IAM Role for S3 (arn:aws:iam::*:role/*): arn:aws:iam::xxxxxx:role/s3_spectrum_role\n```\n\n创建好的特征库的 schema 及骨架示例:\n\n```\n$ tree ./feast_aws_repo/\n./feast_aws_repo/\n├── data\n│ └── registry.db\n├── driver_repo.py\n├── feature_store.yaml\n```\n\n- *.yam l 配置指定 Feast repository 的基础环境资源(s3、Redshift、DDB 等)\n- *.py 配置特征库元数据,特征 view 及 schema 等\n- db 保存基于 *.py 元数据构建后的特征组,特征库对象实例,以便运行态使用\n\n安装部署后的 feature_store.yaml 示例:\n\n```\nproject: feast_aws_repo\nregistry: data/registry.db\nprovider: aws\nonline_store:\n type: dynamodb\n region: ap-southeast-1\noffline_store:\n type: redshift\n cluster_id: feast-demo2-redshift-cluster\n region: ap-southeast-1\n database: flinkstreamdb\n user: awsuser\n s3_staging_location: s3://feastdemobucket\n iam_role: arn:aws:iam::xxxxxxx:role/s3_spectrum_role\n```\n\ndriver_repo 的司机行程特征库元数据示例:\n\n```\nfrom datetime import timedelta\nfrom feast import Entity, Feature, FeatureView, RedshiftSource, ValueType\ndriver = Entity(\n name=\"driver_id\",\n join_key=\"driver_id\",\n value_type=ValueType.INT64,\n)\ndriver_stats_source = RedshiftSource(\n table=\"feast_driver_hourly_stats\",\n event_timestamp_column=\"event_timestamp\",\n created_timestamp_column=\"created\",\n)\n\ndriver_stats_fv = FeatureView(\n name=\"driver_hourly_stats\",\n entities=[\"driver_id\"],\n ttl=timedelta(weeks=52),\n features=[\n Feature(name=\"conv_rate\", dtype=ValueType.FLOAT),\n Feature(name=\"acc_rate\", dtype=ValueType.FLOAT),\n Feature(name=\"avg_daily_trips\", dtype=ValueType.INT64),\n ],\n batch_source=driver_stats_source,\n tags={\"team\": \"driver_performance\"},\n)\n```\n\n部署成功后可以在 Redshift 看到离线特征库的 Spectuam schema 及库表,DDB 中可以看到在线特征库的表\n\nRedshift 离线特征库:\n\n![image.png](https://dev-media.amazoncloud.cn/09ed41c597d64ff38457a91c27ba0fa5_image.png)\n\nDDB 在线特征库:\n\n![image.png](https://dev-media.amazoncloud.cn/e4b743025fbc49d0a2204d362c478a31_image.png)\n\n##### **使用Feast SDK API进行特征库操作**\n\n\n###### **连接特征库**\n\n\n安装部署完成后,在 python 代码中,可以方便的通过加载注册的 repository 路径,来连接到特征库及特征组\n\n在 repository 中注册的特征组,也可以直接 import 实例化\n\n```\nfrom datetime import datetime, timedelta\nimport pandas as pd\nfrom feast import FeatureStore\nfrom driver_repo import driver, driver_stats_fv\nfs = FeatureStore(repo_path=\"./\")\n>>> print(fs)\n<feast.feature_store.FeatureStore object at 0x7f48d47098d0>\n>>> print(driver_stats_fv)\n{\n \"spec\": {\n \"name\": \"driver_hourly_stats\",\n \"entities\": [\n \"driver_id\"\n ],\n \"features\": [\n {\n \"name\": \"conv_rate\",\n \"valueType\": \"FLOAT\"\n },\n {\n \"name\": \"acc_rate\",\n \"valueType\": \"FLOAT\"\n },\n {\n \"name\": \"avg_daily_trips\",\n \"valueType\": \"INT64\"\n }\n ],\n \"tags\": {\n \"team\": \"driver_performance\"\n },\n \"ttl\": \"31449600s\",\n \"batchSource\": {\n \"type\": \"BATCH_REDSHIFT\",\n \"eventTimestampColumn\": \"event_timestamp\",\n \"createdTimestampColumn\": \"created\",\n \"redshiftOptions\": {\n \"table\": \"feast_driver_hourly_stats\"\n },\n \"dataSourceClassType\": \"feast.infra.offline_stores.redshift_source.RedshiftSource\"\n },\n \"online\": true\n },\n \"meta\": {}\n}\n```\n\n\n###### **离线特征数据提取**\n\n\n通过 Feast get_historical_features API,可以抽取离线特征库数据用于离线训练或特征组合\n\n```\nfeatures = [\"driver_hourly_stats:conv_rate\", \"driver_hourly_stats:acc_rate\"]\nentity_df = pd.DataFrame(\n {\n \"event_timestamp\": [\n pd.Timestamp(dt, unit=\"ms\", tz=\"UTC\").round(\"ms\")\n for dt in pd.date_range(\n start=datetime.now() - timedelta(days=3),\n end=datetime.now(),\n periods=3,\n )\n ],\n \"driver_id\": [1001, 1002, 1003],\n }\n )\n training_df = fs.get_historical_features(\n features=features, entity_df=entity_df\n ).to_df()\n```\n\n如上我们抽取特征标识(entity 字段为 driver_id)为 1001,1002,1003, 时间版本为最近 3 天的离线特征库数据\n\n```\n>>> training_df\n event_timestamp driver_id conv_rate acc_rate\n0 2022-07-04 02:33:54.114 1001 0.036082 0.707744\n1 2022-07-05 14:33:54.114 1002 0.522306 0.983233\n2 2022-07-07 02:33:54.114 1003 0.734294 0.034062\n```\n\n\n###### **离线特征组合**\n\n\n多个特征组需要联合并抽取作为模型训练时,get_historical_features 可以指定多个特征 view 的 features,基于 event_timestamp 做 point-in-time 关联,从而得到同一时间版本的离线特征组合的数据\n\n ```\nfeast_features = [\n \"zipcode_features:city\",\n \"zipcode_features:state\",\n \"zipcode_features:location_type\",\n \"zipcode_features:tax_returns_filed\",\n \"zipcode_features:population\",\n \"zipcode_features:total_wages\",\n \"credit_history:credit_card_due\",\n \"credit_history:mortgage_due\",\n \"credit_history:student_loan_due\",\n \"credit_history:vehicle_loan_due\",\n \"credit_history:hard_pulls\",\n \"credit_history:missed_payments_2y\",\n \"credit_history:missed_payments_1y\",\n \"credit_history:missed_payments_6m\",\n \"credit_history:bankruptcies\",\n ]\ntraining_df = self.fs.get_historical_features(\n entity_df=entity_df, features=feast_features\n).to_df()\n```\n\n如上代码示例,在抽取离线特征时,关联了 credit_history 和 zipcode_features 两个离线特征库的相应特征字段,Feast 会在后台拼接 Redshift Sql 关联对应的库表及 event_timestamp 等条件\n\n\n###### **离线特征数据同步在线特征库**\n\n\n通过Feast 提供的 ```materialize cli```,可以将指定时间版本的 Redshift 离线特征数据同步到 DynamoDB 的在线特征库中\n\nmaterialize-incremental cli 会记录该 repository 特征库下每次同步的增量时间版本,因此每次执行会把自上次执行至今的最新数据增量同步到 DynamoDB\n\n```\nCURRENT_TIME=$(date -u +\"%Y-%m-%dT%H:%M:%S\")\nfeast materialize-incremental $CURRENT_TIME\n\nMaterializing 1 feature views to 2022-07-07 08:00:03+00:00 into the sqlite online\nstore.\ndriver_hourly_stats from 2022-07-06 16:25:47+00:00 to 2022-07-07 08:00:03+00:00:\n100%|████████████████████████████████████████████| 5/5 [00:00<00:00, 592.05it/s]\n```\n\n当然也可以使用 ```materialize``` 显式指定开始时间(startdt)和截止时间(enddt), feast 会将指定时间版本的离线特征库数据同步到在线特征库\n\n```\nfeast materialize 2022-07-13T00:00:00 2022-07-19T00:00:00\n\nMaterializing 1 feature views from 2022-07-13 00:00:00+00:00 to 2022-07-19 00:00:00+00:00 into the dynamodb online store.\ndriver_hourly_stats:\n100%|█████████████████████████████████████████████████████████████████| 5/5 [00:00<00:00, 51.18it/s]\n```\n\n\n###### **在线特征查询**\n\n\n```\n>>> online_features = fs.get_online_features(\n features=features, entity_rows=[{\"driver_id\": 1001}, {\"driver_id\": 1002}],\n ).to_dict()\n>>> print(pd.DataFrame.from_dict(online_features))\n acc_rate conv_rate driver_id\n0 0.179407 0.984951 1001\n1 0.023422 0.069323 1002\n```\n\n\n#### **Feast offline store on Spark 方案**\n\n\n上文我们看到的是 Feast 依托 Amazon Redshift 作为离线特征库存储和特征抽取的方案,虽然安装部署简介明快,上手方便,但 Redshift 定位是云服务数据仓库,虽然在 sql 兼容性、扩展性上优秀,但灵活性不足,如:\n\n- 离线特征抽取必须要指定 event_timestamp 版本,无法直接查询最新 snapshot\n- point-in-time 关联查询直接拼接 partition over 分组 sql 并下压,海量数据情况下,多历史版本的特征库 time travel 抽取时会膨胀数倍,存在性能瓶颈\n\nFeast 自0.19版本开始,支持 Spark 作为离线特征库历史数据提取,版本查询,同步在线特征库的计算框架\n\nSpark 作为高性能分布式计算引擎,在海量数据场景下性能优异,且使用 Spark 时,Feast FeatureView 的 DataSource 既可以是指向 Hive 中的表,也可以是指向对象存储上的文件,通过 Hive 表可以兼容诸如 Hudi、iceberg 等多种数据湖架构。\n\n同时,通过 Spark 离线特征库抽取的特征数据,Feast 将其封装为 Spark DataFrame,从而可以方便的加载到 S3分布式存储,因而也避免了 Pandas DataFrame 保存在本地磁盘的存储空间问题。\n\n\n##### **Feast point-in-time correct join Spark 实现**\n\n\npoint-in-time correct join,根据源码来看,使用 pySpark+SparkSQL 实现,因此整体思路和 Redshift 类似:\n\n- 将 entity_df 由 DataFrame 转化为 Spark DataFrame,并注册成临时表\n- 根据用户指定要关联的 features,找到对应的 FeatureView,进而找到底层的 DataSource 和相关的元数据\n- 根据以上信息,即 query_context,通过 jinjia 渲染一个 SparkSQL,并提交给 Spark 集群计算\n- 计算完成的结果就是实现 point-in-time correct join 之后的 training dataset\n\n\n##### **Feast offline store on Amazon EMR安装部署**\n\n\nAmazon EMR 是全托管的 hadoop 大数据集群,提供了良好的弹性伸缩,高可用,存算分离等特性,且通过 EMRFS 原生集成 Amazon S3云存储,用于承载 Feast 的 Spark 离线特征库具有天然的亲和力。\n\n以下详细介绍 Feast Spark 离线特征库在 Amazon EMR 的安装部署步骤及使用方法\n\n\n###### **启动 Amazon EMR 集群**\n\n\nAmazon EMR 的启动方法本文不再赘述,感兴趣的同学可以参阅 Amazon EMR 文档\n\n此处选择 emr 6.5版本,Spark 3.1.2\n\n![image.png](https://dev-media.amazoncloud.cn/014d764b306f4627b4ee11e65c521e7a_image.png)\n\n\n###### **Offline store on EMR 特征库配置**\n\n\n我们在 emr 主节点上可以 feast init 特征库,从而直接利用 Amazon EMR上spark 与 S3的原生集成,通过 emrfs 读写 S3数据湖上各种格式文件,不再需要 hadoop s3开源 lib 的支持\n\nfeast init my_project 后,在该特征库的 yaml 配置文件中,指定 Feast spark 的对应参数即可:\n\n```\nproject: feast_spark_project\nregistry: data/registry.db\nprovider: local\noffline_store:\n type: spark\n spark_conf:\n spark.master: yarn\n spark.ui.enabled: \"true\"\n spark.eventLog.enabled: \"true\"\n spark.sql.catalogImplementation: \"hive\"\n spark.sql.parser.quotedRegexColumnNames: \"true\"\n spark.sql.session.timeZone: \"UTC\"\n```\n\n配置完成后,通过 feast apply cli 同样部署到 EMR spark\n\n注:在 EMR master 节点上 pyspark lib 路径需要在环境变量中设置,以便 feast 找到 spark 的 home 目录及相应配置\n\n```\nsource /etc/spark/conf/spark-env.sh\nexport PYTHONPATH=\"${SPARK_HOME}/python/:$PYTHONPATH\"\nexport PYTHONPATH=\"${SPARK_HOME}/python/lib/py4j-0.10.9-src.zip:$PYTHONPATH\"\n```\n\n###### **Feast on Spark 离线特征库元数据**\n\n```\nfrom feast.infra.offline_stores.contrib.spark_offline_store.spark_source import ( SparkSource,)\n\ndriver_hourly_stats= SparkSource(\n name=\"driver_hourly_stats\",\n query=\"SELECT event_timestamp as ts, created_timestamp as created, conv_rate,conv_rate,conv_rate FROM emr_feature_store.driver_hourly_stats\",\n event_timestamp_column=\"ts\",\n created_timestamp_column=\"created\"\n )\n\n```\n \nFeast 的 sparkSource 提供了 query, table,及原始 raw 文件路径几种初始化方法,本文中使用 query 方式。\n\n需要注意 query 方式中,需要指定 event timestamp field 特征字段以便 Feast 识别作为 point-in-time cross join 时间版本抽取及特征 join 的依据\n\n\n###### **Feast Spark offline store 执行**\n\n\n配置 Spark 作为 Feast offline store 后,通过 Amazon EMR 上 spark history UI,可以清楚的看到其 get_historical_features 方法,底层 Feast 使用 SparkSQL 创建临时视图,拼接 event time join 的 sql,并查询上文中 source 数据湖上 hive 库表等各个步骤的业务逻辑:\n\n![image.png](https://dev-media.amazoncloud.cn/116184a35b2741b8a940beb38d37c77b_image.png)\n\n跟踪 Spark history UI 上,Spark Sql 的各个 query 可以看到,Feast 的 get_historical_features 方法执行时,会构造临时表 entity_dataframe,即用户调用 get_historical_features 方法时,传入的样本列表。再构建 driver_hourly_stats_base,即需要 join 及 point-in-time 查询的即样例特征时序表\n\n```\n== Parsed Logical Plan ==\n'CreateViewStatement [driver_hourly_stats__cleaned], (\n\n WITH driver_hourly_stats__entity_dataframe AS (\n SELECT\n driver_id,\n entity_timestamp,\n driver_hourly_stats__entity_row_unique_id\n FROM entity_dataframe\n GROUP BY\n driver_id,\n entity_timestamp,\n driver_hourly_stats__entity_row_unique_id\n ),\n\ndriver_hourly_stats__base AS (\n SELECT\n subquery.*,\n entity_dataframe.entity_timestamp,\n entity_dataframe.driver_hourly_stats__entity_row_unique_id\n FROM driver_hourly_stats__subquery AS subquery\n INNER JOIN driver_hourly_stats__entity_dataframe AS entity_dataframe\n ON TRUE\n AND subquery.event_timestamp <= entity_dataframe.entity_timestamp\n\n \n AND subquery.event_timestamp >= entity_dataframe.entity_timestamp - 86400 * interval '1' second\n \n\n \n AND subquery.driver_id = entity_dataframe.driver_id\n \n ),\n```\n\n后续的 subquery、dedup 及 cleaned 子查询,会基于以上的两张基础表,进行基于特征标识字段 driver_id 和时序时间戳字段 event_timestamp 的分组排序,剔重等操作,最后 join 样本列表临时表 entity_dataframe,整个流程与 Redshift 上基本一致\n\n```\ndriver_hourly_stats__subquery AS (\n SELECT\n ts as event_timestamp,\n created as created_timestamp,\n driver_id AS driver_id,\n \n conv_rate as conv_rate, \n \n acc_rate as acc_rate\n \n FROM (SELECT driver_id,event_timestamp as ts, created_timestamp as created, conv_rate,acc_rate,avg_daily_trips FROM emr_feature_store.driver_hourly_stats)\n WHERE ts <= '2022-07-25T03:27:05.903000'\n \n AND ts >= '2022-07-21T03:27:05.903000'\n \n ),\n\n driver_hourly_stats__dedup AS (\n SELECT\n driver_hourly_stats__entity_row_unique_id,\n event_timestamp,\n MAX(created_timestamp) as created_timestamp\n FROM driver_hourly_stats__base\n GROUP BY driver_hourly_stats__entity_row_unique_id, event_timestamp\n ),\ndriver_hourly_stats__latest AS (\n SELECT\n event_timestamp,\n created_timestamp,\n driver_hourly_stats__entity_row_unique_id\n FROM\n (\n SELECT *,\n ROW_NUMBER() OVER(\n PARTITION BY driver_hourly_stats__entity_row_unique_id\n ORDER BY event_timestamp DESC,created_timestamp DESC\n ) AS row_number\n FROM driver_hourly_stats__base\n \n INNER JOIN driver_hourly_stats__dedup\n USING (driver_hourly_stats__entity_row_unique_id, event_timestamp, created_timestamp)\n \n )\n WHERE row_number = 1\n )\n```\n\nAPI 结果返回可以 to_df 为 Spark 的 Dataframe,从而实现 remote 存储离线特征库抽取结果数据的操作,这也从另一方面解决了原有 Redshift 离线特征存储,特征抽取只能返回 pandas Dataframe 的劣势,在大数据量离线特征场景下更有优势\n\n\n#### **总结**\n\n\n综上所述,Feast 框架整体架构和在 Amazon 的构建是非常简洁明快的,对构建 MLOps 平台的用户而言,其主要有价值的优势如下:\n\n- 同时提供了离线,在线特征库,离线-在线特征库快照版本同步功能\n- 轻量级,快速部署使用, 代码即配置,feast apply 即可部署到 Amazon\n- 通过 repository 文件系统隔离特征库,方便 MLOps 多租户多 CICD 协同开发\n- API 抽象程度高,贴近 AI/ML 算法工程师业务语言\n\n对于海量离线特征数据抽取时 point-in-time cross join 的版本查询数据膨胀的业界难点,Feast 也可以通过 on EMR Spark 的构建方式,优化解决其性能问题\n\n\n#### **参考资料**\n\n\nAmazon Sagemaker Feature Store: [https://docs.aws.amazon.com/zh_cn/sagemaker/latest/dg/feature-store.html](https://docs.aws.amazon.com/zh_cn/sagemaker/latest/dg/feature-store.html)\n\nFeast官方:[https://docs.feast.dev/getting-started/architecture-and-components/overview](https://docs.feast.dev/getting-started/architecture-and-components/overview)\n\nAmazon EMR集群部署:[https://docs.aws.amazon.com/zh_cn/emr/latest/ManagementGuide/emr-what-is-emr.html](https://docs.aws.amazon.com/zh_cn/emr/latest/ManagementGuide/emr-what-is-emr.html)\n\n\n#### **本篇作者**\n\n\n![image.png](https://dev-media.amazoncloud.cn/251089bc94a3449988ff3e039457bd75_image.png)\n\n\n#### **唐清原**\n\n\nAmazon 数据分析解决方案架构师,负责 Amazon Data Analytic 服务方案架构设计以及性能优化,迁移,治理等 Deep Dive 支持。10+数据领域研发及架构设计经验,历任 Oracle 高级咨询顾问,咪咕文化数据集市高级架构师,澳新银行数据分析领域架构师职务。在大数据,数据湖,智能湖仓,及相关推荐系统 /MLOps 平台等项目有丰富实战经验","render":"<h4><a id=\"_2\"></a><strong>背景&amp;引言</strong></h4>\n<p>众所周知,AI 算法模型开发落地有三个主要阶段:数据准备、模型训练、模型部署。目前已经有较多厂商及开源社区推出通用的 AI、MLOps 平台支撑模型训练与部署阶段,但主要偏重于机器学习模型开发,部署,服务层面,自 2019 年后才陆续有各厂商推出数据准备支撑阶段的产品及服务,即特征平台(如 <a href=\"https://docs.aws.amazon.com/zh_cn/sagemaker/latest/dg/feature-store.html\" target=\"_blank\">Amazon Sagemaker feature Store</a>)。</p>\n<p>特征平台的主要能力包含:特征注册中心、离线存储&amp;消费、在线存储&amp;消费、离线&amp;在线特征同步,特征版本,尤其特征版本最为重要,实现特征 point-in-time cross join,避免特征穿越造成 train-server skew 的重要功能特性。</p>\n<p>各个厂商在特征平台的架构和实现方式方面迥然不同,缺乏跨平台的通用的特征库方案。</p>\n<p>Feast (<strong>Fea</strong>ture <strong>St</strong>ore) 是一套开源特征库框架,纯 python 框架,与 Pandas dataframe 无缝集成,对 ML,AI 算法工程师友好,它提供了在线,离线特征库注册,特征库存储,特征数据摄取、训练数据检索、特征版本、离线-在线特征同步等功能;且具有云原生亲和力,可以构建在多个公有云平台上。</p>\n<p>本文介绍了 Feast 框架的整体架构及设计思路,并 step by step 详细说明了 Feast on Amazon 集成和使用,包括安装部署离线/在线特征库、使用特征库、特征库同步的方法等。对于使用 Feast 开源框架构建 MLOps 平台的用户,本文可以作为快速构建和开发指南。</p>\n<h4><a id=\"Feast__16\"></a><strong>Feast 整体架构</strong></h4>\n<p><img src=\"https://dev-media.amazoncloud.cn/e9c40747242947c4818e05e014ee05de_image.png\" alt=\"image.png\" /></p>\n<p>Feast的主要功能组件:</p>\n<ul>\n<li><strong>Feast Repo&amp;Registry</strong>:轻量级的目录级及 Split 文件数据库格式 Repository,用于特征库基础设施及元数据注册</li>\n<li><strong>Feast Python SDK/CLI</strong>: 开发构建及使用特征库的主要功能组件\n<ul>\n<li><strong>Feast Apply</strong>:命令行工具执行安装部署配置的特征库到底层基础设施,并且注册特征库元数据到 Runtime 运行态</li>\n<li><strong>Feast Materialize</strong>:离线-在线特征库版本同步工具</li>\n<li><strong>Get Online Features</strong>:在线特征数据提取,调用对应的在线特征库基础设施 API 抽取特征数据,用于模型推断</li>\n<li><strong>Get Historical Features</strong>:离线特征数据抽取,调用对应的离线特征库基础设施 API 抽取历史特征数据,用于模型训练或者特征组合</li>\n</ul>\n</li>\n<li><strong>Online Store</strong>: 在线特征库,根据不同云厂商的 nosql 数据库承载,存储特征快照版本数据</li>\n<li><strong>Offline Store</strong>:离线特征库,根据不同云厂商数仓承载,存储特征历史版本数据</li>\n</ul>\n<h4><a id=\"Feast_On_Amazon__33\"></a><strong>Feast On Amazon 安装部署方案</strong></h4>\n<h5><a id=\"_36\"></a><strong>依赖准备</strong></h5>\n<ul>\n<li>Feast on Amazon 使用 Redshift 作为离线特征库,需要 Redshift 集群(如果采用 Spectrum 外部表,还需要 Spectrum 角色及 Glue Catalog 权限)</li>\n<li>Feast on Amazon 使用 DynamoDB 作为在线特征库,需要 DynamoDB 读写权限</li>\n<li>可以用 Terraform 或者 CloudFormation 准备需要的 Redshift,DDB,IAM 角色等</li>\n<li>以下使用 Terraform 为例安装部署 Feast 需要的 Redshift,S3,IAM 角色等各种基础设施</li>\n</ul>\n<ol>\n<li>安装部署 Terraform</li>\n</ol>\n<pre><code class=\"lang-\">sudo yum install python3-devel\nsudo yum install -y yum-utils\nsudo yum-config-manager —add-repo https://rpm.releases.hashicorp.com/AmazonLinux/hashicorp.repo\nsudo yum -y install terraform\n</code></pre>\n<ol start=\"2\">\n<li>编写 Terraform 配置文件</li>\n</ol>\n<pre><code class=\"lang-\">project: feast_aws_repo\nregistry: data/registry.db\nprovider: aws\nonline_store:\n type: dynamodb\n region: ap-southeast-1\noffline_store:\n type: redshift\n cluster_id: feast-demo2-redshift-cluster\n region: ap-southeast-1\n database: flinkstreamdb\n user: awsuser\n s3_staging_location: s3://feastdemobucket\n iam_role: arn:aws:iam::**********:role/s3_spectrum_role\n</code></pre>\n<ol start=\"3\">\n<li>构建基础设施</li>\n</ol>\n<pre><code class=\"lang-\">cd infra\nsudo terraform init\nsudo terraform plan -var=&quot;admin_password=xxxxx&quot;\nsudo terraform apply -var=&quot;admin_password=xxxxx&quot;\n</code></pre>\n<ol start=\"4\">\n<li>如果需要 Spectrum 承载离线特征库,需要在 Redshift 中建立 Spectrum 外部 schema,以便指向Glue Catalog 中的 s3 外部表</li>\n</ol>\n<pre><code class=\"lang-\">aws redshift-data execute-statement \\\n —region ap-southeast-1 \\\n —cluster-identifier feast-demo-redshift-cluster \\\n —db-user awsuser \\\n —database dev —sql &quot;create external schema spectrum from data catalog database 'flinkstreamdb' iam_role \\\n 'arn:aws:iam::**********:role/s3_spectrum_role' create external database if not exists;“\n</code></pre>\n<h5><a id=\"Feast__Repository__93\"></a><strong>Feast 特征库 Repository 准备</strong></h5>\n<ol>\n<li>依赖安装及升级</li>\n</ol>\n<pre><code class=\"lang-\">pip3 install -U numpy==1.21\npip3 install feast[aws]\n</code></pre>\n<ol start=\"2\">\n<li>初始化 repository</li>\n</ol>\n<pre><code class=\"lang-\">feast init -t xxxxx(repository_name)\nAWS Region (e.g. us-west-2): ap-southeast-1\nRedshift Cluster ID: feast-demo-redshift-cluster\nRedshift Database Name: flinkstreamdb\nRedshift User Name: awsuser\nRedshift S3 Staging Location (s3://*): s3://feastdemobucket\nRedshift IAM Role for S3 (arn:aws:iam::*:role/*): arn:aws:iam::xxxxxx:role/s3_spectrum_role\n</code></pre>\n<p>创建好的特征库的 schema 及骨架示例:</p>\n<pre><code class=\"lang-\">$ tree ./feast_aws_repo/\n./feast_aws_repo/\n├── data\n│ └── registry.db\n├── driver_repo.py\n├── feature_store.yaml\n</code></pre>\n<ul>\n<li>*.yam l 配置指定 Feast repository 的基础环境资源(s3、Redshift、DDB 等)</li>\n<li>*.py 配置特征库元数据,特征 view 及 schema 等</li>\n<li>db 保存基于 *.py 元数据构建后的特征组,特征库对象实例,以便运行态使用</li>\n</ul>\n<p>安装部署后的 feature_store.yaml 示例:</p>\n<pre><code class=\"lang-\">project: feast_aws_repo\nregistry: data/registry.db\nprovider: aws\nonline_store:\n type: dynamodb\n region: ap-southeast-1\noffline_store:\n type: redshift\n cluster_id: feast-demo2-redshift-cluster\n region: ap-southeast-1\n database: flinkstreamdb\n user: awsuser\n s3_staging_location: s3://feastdemobucket\n iam_role: arn:aws:iam::xxxxxxx:role/s3_spectrum_role\n</code></pre>\n<p>driver_repo 的司机行程特征库元数据示例:</p>\n<pre><code class=\"lang-\">from datetime import timedelta\nfrom feast import Entity, Feature, FeatureView, RedshiftSource, ValueType\ndriver = Entity(\n name=&quot;driver_id&quot;,\n join_key=&quot;driver_id&quot;,\n value_type=ValueType.INT64,\n)\ndriver_stats_source = RedshiftSource(\n table=&quot;feast_driver_hourly_stats&quot;,\n event_timestamp_column=&quot;event_timestamp&quot;,\n created_timestamp_column=&quot;created&quot;,\n)\n\ndriver_stats_fv = FeatureView(\n name=&quot;driver_hourly_stats&quot;,\n entities=[&quot;driver_id&quot;],\n ttl=timedelta(weeks=52),\n features=[\n Feature(name=&quot;conv_rate&quot;, dtype=ValueType.FLOAT),\n Feature(name=&quot;acc_rate&quot;, dtype=ValueType.FLOAT),\n Feature(name=&quot;avg_daily_trips&quot;, dtype=ValueType.INT64),\n ],\n batch_source=driver_stats_source,\n tags={&quot;team&quot;: &quot;driver_performance&quot;},\n)\n</code></pre>\n<p>部署成功后可以在 Redshift 看到离线特征库的 Spectuam schema 及库表,DDB 中可以看到在线特征库的表</p>\n<p>Redshift 离线特征库:</p>\n<p><img src=\"https://dev-media.amazoncloud.cn/09ed41c597d64ff38457a91c27ba0fa5_image.png\" alt=\"image.png\" /></p>\n<p>DDB 在线特征库:</p>\n<p><img src=\"https://dev-media.amazoncloud.cn/e4b743025fbc49d0a2204d362c478a31_image.png\" alt=\"image.png\" /></p>\n<h5><a id=\"Feast_SDK_API_189\"></a><strong>使用Feast SDK API进行特征库操作</strong></h5>\n<h6><a id=\"_192\"></a><strong>连接特征库</strong></h6>\n<p>安装部署完成后,在 python 代码中,可以方便的通过加载注册的 repository 路径,来连接到特征库及特征组</p>\n<p>在 repository 中注册的特征组,也可以直接 import 实例化</p>\n<pre><code class=\"lang-\">from datetime import datetime, timedelta\nimport pandas as pd\nfrom feast import FeatureStore\nfrom driver_repo import driver, driver_stats_fv\nfs = FeatureStore(repo_path=&quot;./&quot;)\n&gt;&gt;&gt; print(fs)\n&lt;feast.feature_store.FeatureStore object at 0x7f48d47098d0&gt;\n&gt;&gt;&gt; print(driver_stats_fv)\n{\n &quot;spec&quot;: {\n &quot;name&quot;: &quot;driver_hourly_stats&quot;,\n &quot;entities&quot;: [\n &quot;driver_id&quot;\n ],\n &quot;features&quot;: [\n {\n &quot;name&quot;: &quot;conv_rate&quot;,\n &quot;valueType&quot;: &quot;FLOAT&quot;\n },\n {\n &quot;name&quot;: &quot;acc_rate&quot;,\n &quot;valueType&quot;: &quot;FLOAT&quot;\n },\n {\n &quot;name&quot;: &quot;avg_daily_trips&quot;,\n &quot;valueType&quot;: &quot;INT64&quot;\n }\n ],\n &quot;tags&quot;: {\n &quot;team&quot;: &quot;driver_performance&quot;\n },\n &quot;ttl&quot;: &quot;31449600s&quot;,\n &quot;batchSource&quot;: {\n &quot;type&quot;: &quot;BATCH_REDSHIFT&quot;,\n &quot;eventTimestampColumn&quot;: &quot;event_timestamp&quot;,\n &quot;createdTimestampColumn&quot;: &quot;created&quot;,\n &quot;redshiftOptions&quot;: {\n &quot;table&quot;: &quot;feast_driver_hourly_stats&quot;\n },\n &quot;dataSourceClassType&quot;: &quot;feast.infra.offline_stores.redshift_source.RedshiftSource&quot;\n },\n &quot;online&quot;: true\n },\n &quot;meta&quot;: {}\n}\n</code></pre>\n<h6><a id=\"_248\"></a><strong>离线特征数据提取</strong></h6>\n<p>通过 Feast get_historical_features API,可以抽取离线特征库数据用于离线训练或特征组合</p>\n<pre><code class=\"lang-\">features = [&quot;driver_hourly_stats:conv_rate&quot;, &quot;driver_hourly_stats:acc_rate&quot;]\nentity_df = pd.DataFrame(\n {\n &quot;event_timestamp&quot;: [\n pd.Timestamp(dt, unit=&quot;ms&quot;, tz=&quot;UTC&quot;).round(&quot;ms&quot;)\n for dt in pd.date_range(\n start=datetime.now() - timedelta(days=3),\n end=datetime.now(),\n periods=3,\n )\n ],\n &quot;driver_id&quot;: [1001, 1002, 1003],\n }\n )\n training_df = fs.get_historical_features(\n features=features, entity_df=entity_df\n ).to_df()\n</code></pre>\n<p>如上我们抽取特征标识(entity 字段为 driver_id)为 1001,1002,1003, 时间版本为最近 3 天的离线特征库数据</p>\n<pre><code class=\"lang-\">&gt;&gt;&gt; training_df\n event_timestamp driver_id conv_rate acc_rate\n0 2022-07-04 02:33:54.114 1001 0.036082 0.707744\n1 2022-07-05 14:33:54.114 1002 0.522306 0.983233\n2 2022-07-07 02:33:54.114 1003 0.734294 0.034062\n</code></pre>\n<h6><a id=\"_284\"></a><strong>离线特征组合</strong></h6>\n<p>多个特征组需要联合并抽取作为模型训练时,get_historical_features 可以指定多个特征 view 的 features,基于 event_timestamp 做 point-in-time 关联,从而得到同一时间版本的离线特征组合的数据</p>\n<pre><code class=\"lang-\">feast_features = [\n &quot;zipcode_features:city&quot;,\n &quot;zipcode_features:state&quot;,\n &quot;zipcode_features:location_type&quot;,\n &quot;zipcode_features:tax_returns_filed&quot;,\n &quot;zipcode_features:population&quot;,\n &quot;zipcode_features:total_wages&quot;,\n &quot;credit_history:credit_card_due&quot;,\n &quot;credit_history:mortgage_due&quot;,\n &quot;credit_history:student_loan_due&quot;,\n &quot;credit_history:vehicle_loan_due&quot;,\n &quot;credit_history:hard_pulls&quot;,\n &quot;credit_history:missed_payments_2y&quot;,\n &quot;credit_history:missed_payments_1y&quot;,\n &quot;credit_history:missed_payments_6m&quot;,\n &quot;credit_history:bankruptcies&quot;,\n ]\ntraining_df = self.fs.get_historical_features(\n entity_df=entity_df, features=feast_features\n).to_df()\n</code></pre>\n<p>如上代码示例,在抽取离线特征时,关联了 credit_history 和 zipcode_features 两个离线特征库的相应特征字段,Feast 会在后台拼接 Redshift Sql 关联对应的库表及 event_timestamp 等条件</p>\n<h6><a id=\"_315\"></a><strong>离线特征数据同步在线特征库</strong></h6>\n<p>通过Feast 提供的 <code>materialize cli</code>,可以将指定时间版本的 Redshift 离线特征数据同步到 DynamoDB 的在线特征库中</p>\n<p>materialize-incremental cli 会记录该 repository 特征库下每次同步的增量时间版本,因此每次执行会把自上次执行至今的最新数据增量同步到 DynamoDB</p>\n<pre><code class=\"lang-\">CURRENT_TIME=$(date -u +&quot;%Y-%m-%dT%H:%M:%S&quot;)\nfeast materialize-incremental $CURRENT_TIME\n\nMaterializing 1 feature views to 2022-07-07 08:00:03+00:00 into the sqlite online\nstore.\ndriver_hourly_stats from 2022-07-06 16:25:47+00:00 to 2022-07-07 08:00:03+00:00:\n100%|████████████████████████████████████████████| 5/5 [00:00&lt;00:00, 592.05it/s]\n</code></pre>\n<p>当然也可以使用 <code>materialize</code> 显式指定开始时间(startdt)和截止时间(enddt), feast 会将指定时间版本的离线特征库数据同步到在线特征库</p>\n<pre><code class=\"lang-\">feast materialize 2022-07-13T00:00:00 2022-07-19T00:00:00\n\nMaterializing 1 feature views from 2022-07-13 00:00:00+00:00 to 2022-07-19 00:00:00+00:00 into the dynamodb online store.\ndriver_hourly_stats:\n100%|█████████████████████████████████████████████████████████████████| 5/5 [00:00&lt;00:00, 51.18it/s]\n</code></pre>\n<h6><a id=\"_343\"></a><strong>在线特征查询</strong></h6>\n<pre><code class=\"lang-\">&gt;&gt;&gt; online_features = fs.get_online_features(\n features=features, entity_rows=[{&quot;driver_id&quot;: 1001}, {&quot;driver_id&quot;: 1002}],\n ).to_dict()\n&gt;&gt;&gt; print(pd.DataFrame.from_dict(online_features))\n acc_rate conv_rate driver_id\n0 0.179407 0.984951 1001\n1 0.023422 0.069323 1002\n</code></pre>\n<h4><a id=\"Feast_offline_store_on_Spark__357\"></a><strong>Feast offline store on Spark 方案</strong></h4>\n<p>上文我们看到的是 Feast 依托 Amazon Redshift 作为离线特征库存储和特征抽取的方案,虽然安装部署简介明快,上手方便,但 Redshift 定位是云服务数据仓库,虽然在 sql 兼容性、扩展性上优秀,但灵活性不足,如:</p>\n<ul>\n<li>离线特征抽取必须要指定 event_timestamp 版本,无法直接查询最新 snapshot</li>\n<li>point-in-time 关联查询直接拼接 partition over 分组 sql 并下压,海量数据情况下,多历史版本的特征库 time travel 抽取时会膨胀数倍,存在性能瓶颈</li>\n</ul>\n<p>Feast 自0.19版本开始,支持 Spark 作为离线特征库历史数据提取,版本查询,同步在线特征库的计算框架</p>\n<p>Spark 作为高性能分布式计算引擎,在海量数据场景下性能优异,且使用 Spark 时,Feast FeatureView 的 DataSource 既可以是指向 Hive 中的表,也可以是指向对象存储上的文件,通过 Hive 表可以兼容诸如 Hudi、iceberg 等多种数据湖架构。</p>\n<p>同时,通过 Spark 离线特征库抽取的特征数据,Feast 将其封装为 Spark DataFrame,从而可以方便的加载到 S3分布式存储,因而也避免了 Pandas DataFrame 保存在本地磁盘的存储空间问题。</p>\n<h5><a id=\"Feast_pointintime_correct_join_Spark__372\"></a><strong>Feast point-in-time correct join Spark 实现</strong></h5>\n<p>point-in-time correct join,根据源码来看,使用 pySpark+SparkSQL 实现,因此整体思路和 Redshift 类似:</p>\n<ul>\n<li>将 entity_df 由 DataFrame 转化为 Spark DataFrame,并注册成临时表</li>\n<li>根据用户指定要关联的 features,找到对应的 FeatureView,进而找到底层的 DataSource 和相关的元数据</li>\n<li>根据以上信息,即 query_context,通过 jinjia 渲染一个 SparkSQL,并提交给 Spark 集群计算</li>\n<li>计算完成的结果就是实现 point-in-time correct join 之后的 training dataset</li>\n</ul>\n<h5><a id=\"Feast_offline_store_on_Amazon_EMR_383\"></a><strong>Feast offline store on Amazon EMR安装部署</strong></h5>\n<p>Amazon EMR 是全托管的 hadoop 大数据集群,提供了良好的弹性伸缩,高可用,存算分离等特性,且通过 EMRFS 原生集成 Amazon S3云存储,用于承载 Feast 的 Spark 离线特征库具有天然的亲和力。</p>\n<p>以下详细介绍 Feast Spark 离线特征库在 Amazon EMR 的安装部署步骤及使用方法</p>\n<h6><a id=\"_Amazon_EMR__391\"></a><strong>启动 Amazon EMR 集群</strong></h6>\n<p>Amazon EMR 的启动方法本文不再赘述,感兴趣的同学可以参阅 Amazon EMR 文档</p>\n<p>此处选择 emr 6.5版本,Spark 3.1.2</p>\n<p><img src=\"https://dev-media.amazoncloud.cn/014d764b306f4627b4ee11e65c521e7a_image.png\" alt=\"image.png\" /></p>\n<h6><a id=\"Offline_store_on_EMR__401\"></a><strong>Offline store on EMR 特征库配置</strong></h6>\n<p>我们在 emr 主节点上可以 feast init 特征库,从而直接利用 Amazon EMR上spark 与 S3的原生集成,通过 emrfs 读写 S3数据湖上各种格式文件,不再需要 hadoop s3开源 lib 的支持</p>\n<p>feast init my_project 后,在该特征库的 yaml 配置文件中,指定 Feast spark 的对应参数即可:</p>\n<pre><code class=\"lang-\">project: feast_spark_project\nregistry: data/registry.db\nprovider: local\noffline_store:\n type: spark\n spark_conf:\n spark.master: yarn\n spark.ui.enabled: &quot;true&quot;\n spark.eventLog.enabled: &quot;true&quot;\n spark.sql.catalogImplementation: &quot;hive&quot;\n spark.sql.parser.quotedRegexColumnNames: &quot;true&quot;\n spark.sql.session.timeZone: &quot;UTC&quot;\n</code></pre>\n<p>配置完成后,通过 feast apply cli 同样部署到 EMR spark</p>\n<p>注:在 EMR master 节点上 pyspark lib 路径需要在环境变量中设置,以便 feast 找到 spark 的 home 目录及相应配置</p>\n<pre><code class=\"lang-\">source /etc/spark/conf/spark-env.sh\nexport PYTHONPATH=&quot;${SPARK_HOME}/python/:$PYTHONPATH&quot;\nexport PYTHONPATH=&quot;${SPARK_HOME}/python/lib/py4j-0.10.9-src.zip:$PYTHONPATH&quot;\n</code></pre>\n<h6><a id=\"Feast_on_Spark__433\"></a><strong>Feast on Spark 离线特征库元数据</strong></h6>\n<pre><code class=\"lang-\">from feast.infra.offline_stores.contrib.spark_offline_store.spark_source import ( SparkSource,)\n\ndriver_hourly_stats= SparkSource(\n name=&quot;driver_hourly_stats&quot;,\n query=&quot;SELECT event_timestamp as ts, created_timestamp as created, conv_rate,conv_rate,conv_rate FROM emr_feature_store.driver_hourly_stats&quot;,\n event_timestamp_column=&quot;ts&quot;,\n created_timestamp_column=&quot;created&quot;\n )\n\n</code></pre>\n<p>Feast 的 sparkSource 提供了 query, table,及原始 raw 文件路径几种初始化方法,本文中使用 query 方式。</p>\n<p>需要注意 query 方式中,需要指定 event timestamp field 特征字段以便 Feast 识别作为 point-in-time cross join 时间版本抽取及特征 join 的依据</p>\n<h6><a id=\"Feast_Spark_offline_store__452\"></a><strong>Feast Spark offline store 执行</strong></h6>\n<p>配置 Spark 作为 Feast offline store 后,通过 Amazon EMR 上 spark history UI,可以清楚的看到其 get_historical_features 方法,底层 Feast 使用 SparkSQL 创建临时视图,拼接 event time join 的 sql,并查询上文中 source 数据湖上 hive 库表等各个步骤的业务逻辑:</p>\n<p><img src=\"https://dev-media.amazoncloud.cn/116184a35b2741b8a940beb38d37c77b_image.png\" alt=\"image.png\" /></p>\n<p>跟踪 Spark history UI 上,Spark Sql 的各个 query 可以看到,Feast 的 get_historical_features 方法执行时,会构造临时表 entity_dataframe,即用户调用 get_historical_features 方法时,传入的样本列表。再构建 driver_hourly_stats_base,即需要 join 及 point-in-time 查询的即样例特征时序表</p>\n<pre><code class=\"lang-\">== Parsed Logical Plan ==\n'CreateViewStatement [driver_hourly_stats__cleaned], (\n\n WITH driver_hourly_stats__entity_dataframe AS (\n SELECT\n driver_id,\n entity_timestamp,\n driver_hourly_stats__entity_row_unique_id\n FROM entity_dataframe\n GROUP BY\n driver_id,\n entity_timestamp,\n driver_hourly_stats__entity_row_unique_id\n ),\n\ndriver_hourly_stats__base AS (\n SELECT\n subquery.*,\n entity_dataframe.entity_timestamp,\n entity_dataframe.driver_hourly_stats__entity_row_unique_id\n FROM driver_hourly_stats__subquery AS subquery\n INNER JOIN driver_hourly_stats__entity_dataframe AS entity_dataframe\n ON TRUE\n AND subquery.event_timestamp &lt;= entity_dataframe.entity_timestamp\n\n \n AND subquery.event_timestamp &gt;= entity_dataframe.entity_timestamp - 86400 * interval '1' second\n \n\n \n AND subquery.driver_id = entity_dataframe.driver_id\n \n ),\n</code></pre>\n<p>后续的 subquery、dedup 及 cleaned 子查询,会基于以上的两张基础表,进行基于特征标识字段 driver_id 和时序时间戳字段 event_timestamp 的分组排序,剔重等操作,最后 join 样本列表临时表 entity_dataframe,整个流程与 Redshift 上基本一致</p>\n<pre><code class=\"lang-\">driver_hourly_stats__subquery AS (\n SELECT\n ts as event_timestamp,\n created as created_timestamp,\n driver_id AS driver_id,\n \n conv_rate as conv_rate, \n \n acc_rate as acc_rate\n \n FROM (SELECT driver_id,event_timestamp as ts, created_timestamp as created, conv_rate,acc_rate,avg_daily_trips FROM emr_feature_store.driver_hourly_stats)\n WHERE ts &lt;= '2022-07-25T03:27:05.903000'\n \n AND ts &gt;= '2022-07-21T03:27:05.903000'\n \n ),\n\n driver_hourly_stats__dedup AS (\n SELECT\n driver_hourly_stats__entity_row_unique_id,\n event_timestamp,\n MAX(created_timestamp) as created_timestamp\n FROM driver_hourly_stats__base\n GROUP BY driver_hourly_stats__entity_row_unique_id, event_timestamp\n ),\ndriver_hourly_stats__latest AS (\n SELECT\n event_timestamp,\n created_timestamp,\n driver_hourly_stats__entity_row_unique_id\n FROM\n (\n SELECT *,\n ROW_NUMBER() OVER(\n PARTITION BY driver_hourly_stats__entity_row_unique_id\n ORDER BY event_timestamp DESC,created_timestamp DESC\n ) AS row_number\n FROM driver_hourly_stats__base\n \n INNER JOIN driver_hourly_stats__dedup\n USING (driver_hourly_stats__entity_row_unique_id, event_timestamp, created_timestamp)\n \n )\n WHERE row_number = 1\n )\n</code></pre>\n<p>API 结果返回可以 to_df 为 Spark 的 Dataframe,从而实现 remote 存储离线特征库抽取结果数据的操作,这也从另一方面解决了原有 Redshift 离线特征存储,特征抽取只能返回 pandas Dataframe 的劣势,在大数据量离线特征场景下更有优势</p>\n<h4><a id=\"_550\"></a><strong>总结</strong></h4>\n<p>综上所述,Feast 框架整体架构和在 Amazon 的构建是非常简洁明快的,对构建 MLOps 平台的用户而言,其主要有价值的优势如下:</p>\n<ul>\n<li>同时提供了离线,在线特征库,离线-在线特征库快照版本同步功能</li>\n<li>轻量级,快速部署使用, 代码即配置,feast apply 即可部署到 Amazon</li>\n<li>通过 repository 文件系统隔离特征库,方便 MLOps 多租户多 CICD 协同开发</li>\n<li>API 抽象程度高,贴近 AI/ML 算法工程师业务语言</li>\n</ul>\n<p>对于海量离线特征数据抽取时 point-in-time cross join 的版本查询数据膨胀的业界难点,Feast 也可以通过 on EMR Spark 的构建方式,优化解决其性能问题</p>\n<h4><a id=\"_563\"></a><strong>参考资料</strong></h4>\n<p>Amazon Sagemaker Feature Store: <a href=\"https://docs.aws.amazon.com/zh_cn/sagemaker/latest/dg/feature-store.html\" target=\"_blank\">https://docs.aws.amazon.com/zh_cn/sagemaker/latest/dg/feature-store.html</a></p>\n<p>Feast官方:<a href=\"https://docs.feast.dev/getting-started/architecture-and-components/overview\" target=\"_blank\">https://docs.feast.dev/getting-started/architecture-and-components/overview</a></p>\n<p>Amazon EMR集群部署:<a href=\"https://docs.aws.amazon.com/zh_cn/emr/latest/ManagementGuide/emr-what-is-emr.html\" target=\"_blank\">https://docs.aws.amazon.com/zh_cn/emr/latest/ManagementGuide/emr-what-is-emr.html</a></p>\n<h4><a id=\"_573\"></a><strong>本篇作者</strong></h4>\n<p><img src=\"https://dev-media.amazoncloud.cn/251089bc94a3449988ff3e039457bd75_image.png\" alt=\"image.png\" /></p>\n<h4><a id=\"_579\"></a><strong>唐清原</strong></h4>\n<p>Amazon 数据分析解决方案架构师,负责 Amazon Data Analytic 服务方案架构设计以及性能优化,迁移,治理等 Deep Dive 支持。10+数据领域研发及架构设计经验,历任 Oracle 高级咨询顾问,咪咕文化数据集市高级架构师,澳新银行数据分析领域架构师职务。在大数据,数据湖,智能湖仓,及相关推荐系统 /MLOps 平台等项目有丰富实战经验</p>\n"}
目录
亚马逊云科技解决方案 基于行业客户应用场景及技术领域的解决方案
联系亚马逊云科技专家
亚马逊云科技解决方案
基于行业客户应用场景及技术领域的解决方案
联系专家
0
目录
关闭