Workflow as code+SageMaker, DolphinScheduler的机器学习选股系统新玩法

Python
DevOps
机器学习
Apache
Amazon SageMaker
0
0
![image.png](https://dev-media.amazoncloud.cn/91080a0566374193b15ed01112b1ca68_image.png "image.png") 作者 | 周捷光 Apache DolphinScheduler Committer > 摘要 > > Apache DolphinScheduler 已经在DataOps领域提供了强大的分布式可视化工作流调度能力,最新发布的3.1.0版本新增了[机器学习](https://aws.amazon.com/cn/machine-learning/?trk=cndc-detail)任务调度的能力,逐步开箱即用式地支持主流的MLOps项目/服务商的功能。目前已经有MLflow,DVC,Jupyter,OpenMLDB, SageMaker等任务组件可以让用户低成本,更容易地编排[机器学习](https://aws.amazon.com/cn/machine-learning/?trk=cndc-detail)系统。 > > 这次我们介绍基于 YAML 文件来创建[机器学习](https://aws.amazon.com/cn/machine-learning/?trk=cndc-detail)工作流,并在其中使用DolphinScheduler上调用[Amazon SageMaker](https://aws.amazon.com/cn/sagemaker/?trk=cndc-detail) 来进行股票系统训练与预测。 ## PyDolphinScheduler 3.1.0 PyDolphinScheduler 3.1.0发布后,用户除了可以使用Python脚本定义工作流以外,也可以使用YAML文件定义工作流。 **Python** ![image.png](https://dev-media.amazoncloud.cn/aa2e4489fb5b43de8b787b16b648b64c_image.png "image.png") 执行 python tutorial.py即可提交工作流到DolphinScheduler中。 **YAML** ![image.png](https://dev-media.amazoncloud.cn/0386ca254042415784d0bd09ba43ff71_image.png "image.png") 执行 `pydolphinscheduler yaml -f tutorial.yaml`即可提交工作流到DolphinScheduler中。 对于喜欢用code形式工作流的的用户,可以直接使用以上两种方法来定义管理工作流,也可以直接搭配git来管理工作流,进行CICD等。 ## 系统介绍 前情回顾:[自动更新选股模型,实时监控,基于 Apache DolphinSchedule 打造机器学习智能选股系统](https://mp.weixin.qq.com/s/u15hzHaUJgIf8r_Gw5F1iw) 这次的[机器学习](https://aws.amazon.com/cn/machine-learning/?trk=cndc-detail)选股系统主要有两个新的玩法: 1. 使用pydolphinscheduler来支持workflow as code的方式构建工作流,无需在页面点点点,运行一行命令即可快速创建工作流并运行 2. 下载与处理完股票数据后,会将数据送到[Amazon SageMaker](https://aws.amazon.com/cn/sagemaker/?trk=cndc-detail)中使用SageMaker Pipeline执行数据集处理,训练,评估以及批量推理 ### 系统概况 关于系统前后端更详细的介绍可翻阅之前介绍的[自动更新选股模型,实时监控,基于 Apache DolphinSchedule 打造机器学习智能选股系统](http://mp.weixin.qq.com/s?__biz=MzA4NDYxNTc2NA==&mid=2247510329&idx=1&sn=c12dd1e955d3f8f00e08340ee61c18a9&chksm=9fe69e02a891171469acdb8164a2d16b9b0b9266e4cd07b288e5ed1d6f83ecf6be222fed1c0d&scene=21#wechat_redirect)。 接下来会简要回顾系统,并着重介绍如**何通过配置文件创建工作流与在DS上调用SageMaker。** 系统展示 系统运行原理 - 选股逻辑 计算整个股票市场中符合五日均线高于10日均线信号的股票作为标的池, 对于上述股票池中的每个股票构建以下特征用于训练模型 1. 股价与布林带三条轨道的相对值 2. 股价与多条均线的相对值;多条均线的多个间隔的斜率 3. 当前K线的形态,用talib pattern计算,如是否为三只乌鸦,是否为十字星等,详情可见K线模式识别(https://www.jianshu.com/p/fd5c7f49db33) 备注:你也可以加上任何你认为有用的信息作为特征 - 模型训练 以第二天是否上涨进行二分类,使用SageMaker的Pipeline中的训练节点进行训练,每天构建120天的数据集,其中后7天用于评估。 - 任务调度 系统中所有的任务调度均使用DolphinScheduler完成。 DolphinScheduler可以定时**每天晚上自动更新模型,并上线进行预测**。在当天模型表现不好时,可以调参调特征一键重新训练新的模型并评估上线。 还有其对任务的容错机制,可以保证系统能够稳定运行。 - 前端展示 前端展示使用Observable实现,得益于其Notebook丰富和易用的可视化数据分析特性,构建出监控实时选股系统的效果监控。 *项目地址* 项目中涉及到的工作流任务的实现可以在这里找到 GitHub - DolphinScheduler-MLOps-Stock-Analysis(https://github.com/jieguangzhou/DolphinScheduler-MLOps-Stock-Analysis/tree/sagemaker), 切换至SageMaker 分支。 ``` git clone https://github.com/jieguangzhou/DolphinScheduler-MLOps-Stock-Analysis cd DolphinScheduler-MLOps-Stock-Analysis git checkout sagemaker ``` 由于前端展示方便与之前无异,因此主要介绍选股系统工作流介绍。 ### 选股系统工作流介绍 主要包含三个子工作流: - `prepare_datas` : 每日数据下载,信号计算,特征(在量化交易中称为因子)计算 - `train_model`:生成训练数据,调用SageMaker Pipeline更新模型,对模型进行评估与批量数据推理 - `recommend_stock`:将SageMaker Pipeline推理的数据下载下来后录入数据库进行股票推荐。 ![image.png](https://dev-media.amazoncloud.cn/70c5050d19474f23b51784f732300c14_image.png "image.png") 以下是几个子工作流的介绍: *Prepare datas* 下图所示是数据准备的工作流 `prepare_datas`,该工作流会下载股票数据并进行信号计算和特征计算 - `download_data `: 下载全市场的股票日线数据 - `calc_signals`:进行信号计算(计算每天符合信号条件的股票,如每天5日均线与10日均线金叉的股票) - `calc_features`: 特征(量化交易中称为因子)计算(计算每个股票每天的特征值,如收盘价与5日均线的相对值,股票是否是十字星形态等) ![image.png](https://dev-media.amazoncloud.cn/c683a3398e364ad1bb48a7e5db296963_image.png "image.png") 上面定义了工作流prepare_datas,包含三个任务。 *training_model* 下图为工作流 `training_model `的各个任务的DAG图,该工作流主要执行以下任务 - `prepare_trainging_data` : 准备模型训练数据,并上传至[Amazon S3](https://aws.amazon.com/cn/s3/?trk=cndc-detail)中(后会交给SageMaker进行数据转换成训练集,验证集和测试集) - `prepare_inference_data `: 准备需要预测的股票数据,并上传至[Amazon S3](https://aws.amazon.com/cn/s3/?trk=cndc-detail)中 -` sagemaker`:执行SageMaker Pipeline ![image.png](https://dev-media.amazoncloud.cn/5d6996c1ecf34266ab1a9c29e29e27c1_image.png "image.png") *SageMaker* 其中sagemaker组件中的任务定义如下,JSON数据与AWS SageMaker pipeline启动数据格式一致,包含pipeline的名字和入参等。 ![image.png](https://dev-media.amazoncloud.cn/a0ab5b36a0cb42428fe2fbafe1d33772_image.png "image.png") 该任务中,会启动一个SageMaker Pipeline的执行,并持续跟踪执行的情况,直至其完成。 其他的信息可见DolphinScheduler SageMaker组件文档(https://dolphinscheduler.apache.org/zh-cn/docs/dev/user_doc/guide/task/sagemaker.html) Pipeline主要有4个步骤: - StrockProcess: 处理数据集成为SageMaker内置算法适配的格式,并切分为训练集,验证集和测试集 - StockTrain: 训练模型 - StockEval: 评估模型 - StockInference: 使用模型进行批量股票预测 Pipeline定义的notebook可见: DolphinScheduler-MLOps-Stock-Analysis/pipeline_stock.ipynb at sagemaker(https://github.com/jieguangzhou/DolphinScheduler-MLOps-Stock-Analysis/blob/sagemaker/pipeline_stock.ipynb) 在[Amazon SageMaker](https://aws.amazon.com/cn/sagemaker/?trk=cndc-detail) Studio中可以查看: ![image.png](https://dev-media.amazoncloud.cn/d5a9dcc13c0547a2b25eabcb4524c997_image.png "image.png") *recommend_stoc*k YAML 文件定义如下: ![image.png](https://dev-media.amazoncloud.cn/aa4111879b444d108faaee8629f5adea_image.png "image.png") 该工作流目前只有一个任务,DS中显示如下: ![image.png](https://dev-media.amazoncloud.cn/67bfd9c2938a49109e498ea64b278581_image.png "image.png") *run_system* 现在我们需要将三个工作流合并成一个名为run_system的工作流,即我们展示的 ![image.png](https://dev-media.amazoncloud.cn/bebd453fc4ee4b4a842f5f200cd22df0_image.png "image.png") 定义YAML文件: 其中 - $ENV{STOCK_PROJECT} 语法表示会将环境变量STOCK_PROJECT进行填充 - $WORKFLOW{"prepare_datas.yaml"}语法表示创建 prepare_datas.yaml 中的工作流并引用其名字,填充进来作为子工作流。 ![image.png](https://dev-media.amazoncloud.cn/ec60cfdd409a42ab8ef0c34e59a38c68_image.png "image.png") *创建并执行工作流* 我们创建一个sh文件 pydolphin_init.sh如下: ``` # /bin/bash # init config user=Stock-Analysis password=123456 tenant=\$USER project_name=pydolphin api_address=127.0.0.1 api_port=25333 pydolphinscheduler config --set java_gateway.address \$api_address pydolphinscheduler config --set java_gateway.port \$api_port pydolphinscheduler config --set default.user.name \$user pydolphinscheduler config --set default.user.password \$password pydolphinscheduler config --set default.user.tenant \$tenant pydolphinscheduler config --set default.workflow.user \$user pydolphinscheduler config --set default.workflow.tenant \$tenant pydolphinscheduler config --set default.workflow.project \$project_name pydolphinscheduler config --set default.workflow.queue default # 以上配置为pydolphinscheduler相关配置,包含ds地址,端口配置,以及租户,用户,项目的创建(若不存在) # 以下为创建工作流的命令 # 配置环境变量,将会在yaml中引用 export STOCK_PROJECT=\$(pwd) # 创建工作流 pydolphinscheduler yaml -f pyds/run_system.yaml 我们就可以运行该脚本来启动选股系统项目了。 ``` ### 项目实操 *DolphinScheduler安装与启动* - 安装dolphinscheduler **下载安装包:** apache-dolphinscheduler-3.1.0-bin.tar.gz(https://www.apache.org/dyn/closer.lua/dolphinscheduler/3.1.0/apache-dolphinscheduler-3.1.0-bin.tar.gz) **Standalone启动详情可见**(https://dolphinscheduler.apache.org/zh-cn/docs/latest/user_doc/guide/installation/standalone.html) 解压: ``` tar -zxvf apache-dolphinscheduler-3.1.0-bin.tar.gz cd apache-dolphinscheduler-3.1.0-bin ``` 修改配置`standalone-server/conf/common.properties `中以下字段添加 aws 密钥,用于SageMaker组件的身份验证 ``` # The AWS access key. if resource.storage.type=S3 or use EMR-Task, This configuration is required resource.aws.access.key.id=<YOUR AWS ACCESS KEY> # The AWS secret access key. if resource.storage.type=S3 or use EMR-Task, This configuration is required resource.aws.secret.access.key=<YOUR AWS SECRET KEY> # The AWS Region to use. if resource.storage.type=S3 or use EMR-Task, This configuration is required resource.aws.region=<AWS REGION> ``` - 启动DolphinScheduler ``` bash bin/dolphinscheduler-daemon.sh start standalone-server ``` - 登录 DolphinScheduler 浏览器访问地址 http://localhost:12345/dolphinscheduler/ui 即可登录系统UI。默认的用户名和密码是 **admin/dolphinscheduler123** **DolphinScheduler-MLOps-Stock-Analysis** 现在我们拉取选股系统的代码,并切换至sagemaker分支 ``` git clone https://github.com/jieguangzhou/DolphinScheduler-MLOps-Stock-Analysis cd DolphinScheduler-MLOps-Stock-Analysis git checkout sagemaker ``` 修改一下dmsa/db.py中的mysql配置,来保存信号特征和股票推荐结果 ``` class CONFIG: MYSQL_USER = 'root' MYSQL_PASSWORD = '123456' MYSQL_HOST = 'xxxxxxxxxxxxxxxx' MYSQL_PORT = 3306 MYSQL_DATABASE = 'dolphinscheduler_mlops_stock' ``` 因为需要将数据上传到AWS S3,需要配置一下文件 `~/.aws/config` ``` [default] aws_access_key_id = <YOUR AWS ACCESS KEY> aws_secret_access_key = <YOUR AWS SECRET KEY> region = <YOUR AWS SECRET KEY> ``` **准备Python环境** ``` virtualenv -p /usr/bin/python3 env source env/bin/activate pip install -r requirements.txt ``` **安装PyDolphinScheduler:** `python -m pip install apache-dolphinscheduler`,可以在任意python环境中安装,实际任务执行无需pydolphinscheduler,pydolphinscheduler用于快速使用code的形式提交工作流。 然后执行命令` bash pydolphin_init.sh `即可。 如果想要快速运行,可以在` pyds/prepare_datas.yaml`中的 `python -m dmsa.data.download \${data_path} `后面加上200,表示只用200个股票进行pipeline的执行 ![image.png](https://dev-media.amazoncloud.cn/5da245369fd64c01bdb571dc12c39848_image.png "image.png") ## 总结 本文展示了在DolphinScheduler中使用yaml配置文件构建和执行工作流,并从中调用[Amazon SageMaker](https://aws.amazon.com/cn/sagemaker/?trk=cndc-detail) Pipeline提供[机器学习](https://aws.amazon.com/cn/machine-learning/?trk=cndc-detail)能力,构建[机器学习](https://aws.amazon.com/cn/machine-learning/?trk=cndc-detail)选股系统。希望能给大家带来以下收获:: 1. 了解如何使用DolphinScheduler构建一个选股系统。 2. 了解如何使用DolphinScheduler连接MLOps系统与其上下游任务。 3. 了解DolphinScheduler SageMaker 组件的使用。 4. 了解DolphinScheduler 通过配置文件管理工作流的实践。 打个广告: - 如果你对DolphinScheduler构建[机器学习](https://aws.amazon.com/cn/machine-learning/?trk=cndc-detail)系统感兴趣可以加入社区一起参与交流 - 如果你有DolphinScheduler有构建[机器学习](https://aws.amazon.com/cn/machine-learning/?trk=cndc-detail)系统或者调度[机器学习](https://aws.amazon.com/cn/machine-learning/?trk=cndc-detail)任务经验的同学也欢迎投稿
目录
亚马逊云科技解决方案 基于行业客户应用场景及技术领域的解决方案
联系亚马逊云科技专家
亚马逊云科技解决方案
基于行业客户应用场景及技术领域的解决方案
联系专家
0
目录
关闭