> 文章作者:Bobilli Balwanth、Norbert Funke、Renuka Uttarala
由于不断变化的需求和现代化基础设施的动态性质,为大型应用程序规划容量可能会非常困难。例如,传统的反应式方法依赖于某些 DevOps 指标(如 CPU 和内存)的静态阈值,而这些指标在这样的环境中并不足以解决问题。在这篇文章中,我们展示了如何使用 [Amazon SageMaker](https://aws.amazon.com/cn/sagemaker/?trk=cndc-detail) 内置算法,对存储在 [Amazon Timestream](https://aws.amazon.com/cn/timestream/?trk=cndc-detail) 中的汇总 DevOps 数据(CPU、内存、每秒交易量)进行预测分析。这样可以实现主动式容量规划,防止潜在的业务中断。通过这种方法,您可以使用 [Amazon SageMaker](https://aws.amazon.com/cn/sagemaker/?trk=cndc-detail),对存储在 [Amazon Timestream](https://aws.amazon.com/cn/timestream/?trk=cndc-detail) 中的任何时间序列数据运行[机器学习](https://aws.amazon.com/cn/machine-learning/?trk=cndc-detail)。
[Amazon Timestream](https://aws.amazon.com/cn/timestream/?trk=cndc-detail) 是一种快速、可扩展且[无服务器](https://aws.amazon.com/cn/serverless/?trk=cndc-detail)的时间序列数据库服务,可轻松存储和分析每天数万亿个事件。[Amazon Timestream](https://aws.amazon.com/cn/timestream/?trk=cndc-detail) 会自动纵向扩展或缩减以调整容量和性能,因此您无需管理底层基础设施。
[Amazon SageMaker](https://aws.amazon.com/cn/sagemaker/?trk=cndc-detail) 是一项完全托管式[机器学习](https://aws.amazon.com/cn/machine-learning/?trk=cndc-detail)(ML)服务。借助 [Amazon SageMaker](https://aws.amazon.com/cn/sagemaker/?trk=cndc-detail),数据科学家和开发人员可以轻松快速地构建和训练[机器学习](https://aws.amazon.com/cn/machine-learning/?trk=cndc-detail)模型,然后直接将其部署到生产就绪型托管环境中。它提供了集成的 Jupyter 创作 Notebook 实例,可快速访问数据来源以进行探索和分析,因此您无需管理服务器。它还提供了常见的[机器学习](https://aws.amazon.com/cn/machine-learning/?trk=cndc-detail)算法,这些算法经过优化,可高效地运行在分布式环境中的超大量数据上。
### **解决方案概览**
DevOps 团队可以使用 Timestream 存储指标、日志和其它时间序列数据。然后,您可以查询这些数据,深入了解系统的行为。Timestream 能够以低延迟处理大量传入数据,这使团队能够执行实时分析。DevOps 团队可以实时分析性能指标和其它运营数据,以便快速制定决策。
以下参考架构展示了如何将 Timestream 用于 DevOps 应用场景。
![image.png](https://dev-media.amazoncloud.cn/a6b94eaca73a4c5695114c410187a4ab_image.png "image.png")
解决方案包含以下关键组件:
* 遥测数据,来自云端和本地运行的应用程序和服务器,使用开源收集代理(如 Prometheus 和 Telegraf)摄入。
* 数据也可以摄取自流式服务,例如 [Amazon Managed Streaming for Apache Kafka](https://aws.amazon.com/cn/msk/?trk=cndc-detail) (Amazon MSK) 和 [Amazon Kinesis](https://aws.amazon.com/cn/kinesis/?trk=cndc-detail),使用[适用于 Apache Flink 的亚马逊托管服务](https://aws.amazon.com/cn/managed-service-apache-flink/?trk=cndc-detail)摄入。
* 摄取数据后,您可以使用可视化工具来分析数据和生成控制面板,这些工具包括 Grafana 和 [Amazon QuickSight](https://aws.amazon.com/cn/quicksight/?trk=cndc-detail)(有关详细信息,请参阅 [Amazon QuickSight](https://aws.amazon.com/cn/quicksight/?trk=cndc-detail):
https\://docs.aws.amazon.com/timestream/latest/developerguide/Quicksight.html?trk=cndc-detail), 以及其它使用 JBDC 和 ODBC 驱动程序的工具。
* [Amazon SageMaker](https://aws.amazon.com/cn/sagemaker/?trk=cndc-detail) 用于运行预测分析。有关更多详细信息,请参阅 [Amazon SageMaker](https://aws.amazon.com/cn/sagemaker/?trk=cndc-detail):
https\://docs.aws.amazon.com/timestream/latest/developerguide/Sagemaker.html?trk=cndc-detail
### **先决条件**
要理解这篇文章,您应该熟悉 Timestream、[Amazon SageMaker](https://aws.amazon.com/cn/sagemaker/?trk=cndc-detail)、[Amazon Simple Storage Service](https://aws.amazon.com/cn/s3/?trk=cndc-detail) ([Amazon S3](https://aws.amazon.com/cn/s3/?trk=cndc-detail))、Amazon Identity and Access Management (IAM) 和 Python 的关键概念。这篇文章还包括一个动手试验室,使用 Amazon CloudFormation 模板和 Jupyter Notebook 预置,并与相关 Amazon 服务交互。这需要一个具有必要 IAM 权限的 Amazon 账户。
### **启动动手实验室**
完成以下步骤以启动动手实验室:
1. 启动 CloudFormation 模板:
https\://console.aws.amazon.com/cloudformation/home?#/stacks/create/review?templateURL=https\://aws-blogs-artifacts-public.s3.amazonaws.com/artifacts/DBBLOG-3596/predictive_analytics.yaml?trk=cndc-detail
**注意**:此解决方案创建的亚马逊云科技资源会在账户中产生费用,请务必在完成后删除堆栈。
2. 提供堆栈名称,将所有其它选项保留为默认值。此堆栈创建 Timestream 数据库和表,并提取汇总 DevOps 数据示例。它还会创建一个 [Amazon SageMaker](https://aws.amazon.com/cn/sagemaker/?trk=cndc-detail) Notebook 实例和 [Amazon S3](https://aws.amazon.com/cn/s3/?trk=cndc-detail) 存储桶。
3. 堆栈完成后,记下 Notebook 实例和 S3 存储桶的名称,这些信息在 Amazon CloudFormation 控制台上堆栈的输出选项卡中列出。
我们使用 [Amazon SageMaker](https://aws.amazon.com/cn/sagemaker/?trk=cndc-detail) Notebook 实例来准备来自 Timestream 的数据、训练[机器学习](https://aws.amazon.com/cn/machine-learning/?trk=cndc-detail)模型和运行预测。
4. 要访问 Notebook 实例,请导航到 [Amazon SageMaker](https://aws.amazon.com/cn/sagemaker/?trk=cndc-detail) 控制台,然后在导航窗格中选择 **Notebook 实例**。
5. 打开由 CloudFormation 堆栈创建的实例。
6. 当 Notebook 的状态为**正在使用**时,选择**打开 Jupyter**。
以下示例显示了一个名为 TimeseriesDataAnalysis 的 Notebook 实例。
![image.png](https://dev-media.amazoncloud.cn/8be66a897fe44524b3b084cc853f6092_image.png "image.png")
<!--StartFragment-->
7. 选择
timestream_predictive_analysis.ipynb 并将其标记为可信。
![image.png](https://dev-media.amazoncloud.cn/9477d0f0b90342c79b63b58417d2937f_image.png "image.png")
![image.png](https://dev-media.amazoncloud.cn/7c7c7e5f65fb419fa25e8263fe4f8fcf_image.png "image.png")
### **准备数据用于分析**
现在,您可以运行 Notebook 中的单元格,来开始分析数据并准备数据用于训练。请完成以下步骤:
1. 以下代码设置 [Amazon SageMaker](https://aws.amazon.com/cn/sagemaker/?trk=cndc-detail) 会话并创建 [Amazon S3](https://aws.amazon.com/cn/s3/?trk=cndc-detail) 和 Timestream 客户端。它还安装 [Amazon SageMaker](https://aws.amazon.com/cn/sagemaker/?trk=cndc-detail) Data Wrangler 库,该库将 Pandas 库的功能扩展到亚马逊云科技,连接 DataFrames 与亚马逊云科技数据和分析服务,从而为 Timestream 和许多其它亚马逊云科技服务提供快速集成。
```
import time
import numpy as np
import pandas as pd
import json
import matplotlib.pyplot as plt
import boto3
import sagemaker
from sagemaker import get_execution_role
from IPython import display
%pip install awswrangler
import awswrangler as wr
np.random.seed(1)
# 设置 Sagemaker 会话
prefix = "sagemaker/DEMO-deepar"
sagemaker_session = sagemaker.Session()
role = get_execution_role()
bucket = sagemaker_session.default_bucket()
# 设置 S3 存储桶路径来上传训练数据集
s3_data_path = f"{bucket}/{prefix}/data"
s3_output_path = f"{bucket}/{prefix}/output"
print(s3_data_path)
print(s3_output_path)
# 设置 S3 客户端
s3_client = boto3.client('s3')
# Timestream 配置。
DB_NAME = "Demo_Predictive_Analysis" # <--- 指定在 Amazon Timestream 中创建的数据库
TABLE_NAME = "Telemetry_Aggregated_Data" # <--- 指定在 Amazon Timestream 中创建的表
timestream_client = boto3.client('timestream-query')
```
2. 在此步骤结束时,记下 S3 存储桶路径的输出。
分析完成后,您可以删除这些存储桶
3. 从 Timestream 查询数据:
```
query = """
SELECT *
FROM "Demo_Predictive_Analysis"."Telemetry_Aggregated_Data"
"""
result = wr.timestream.query(sql=query,pagination_config={'PageSize': 1000})
display.display(result)
```
![image.png](https://dev-media.amazoncloud.cn/8dee8d2a6d954dedb66f2b92e31e3d89_image.png "image.png")
4. 可视化时间序列数据:
```
labels = ['cpu', 'memory', 'tps']
cpu_series = pd.Series(data = result['cpu_avg'].values, index = pd.to_datetime(result['time']))
memory_series = pd.Series(data = result['memory_avg'].values, index = pd.to_datetime(result['time']))
tps_series = pd.Series(data = result['tps_avg'].values, index = pd.to_datetime(result['time']))
## 收集列表中的所有序列
time_series = []
time_series.append(cpu_series)
time_series.append(memory_series)
time_series.append(tps_series)
for k in range(len(time_series)):
print(f'-------------------------------------------\\n\\tGraph {labels[k]}')
time_series[k].plot(label = labels[k])
plt.legend(loc='lower right')
plt.show()
```
以下是 CPU 使用率图。
![image.png](https://dev-media.amazoncloud.cn/841814380f37420da8746cbd70cc184d_image.png "image.png")
以下是内存使用情况图。
![image.png](https://dev-media.amazoncloud.cn/ce11bc5867d24b3082881ca7400ab173_image.png "image.png")
以下是每秒事务数(TPS,Transactions Per Second)图。
<!--StartFragment-->
![image.png](https://dev-media.amazoncloud.cn/2a2510d1bfc5406ea7e3afb7fc29efa5_image.png "image.png")
解决方案使用 [Amazon SageMaker](https://aws.amazon.com/cn/sagemaker/?trk=cndc-detail) DeepAR 预测算法,之所以选择该算法,是因为它使用循环神经网络(RNN,Recurrent Neural Network)来高效地预测一维时间序列数据。DeepAR 因其适应不同时间序列模式的能力而脱颖而出,这些特性使其成为一种多功能且强大的选择。它采用有监督学习方法,使用已标注的历史数据进行训练,并利用 RNN 架构的优势来捕获顺序数据中的时间依赖关系。
5. 使用以下 DeepAR 超参数来初始化[机器学习](https://aws.amazon.com/cn/machine-learning/?trk=cndc-detail)实例:
```
freq = "H" ## 时间,以小时为单位
prediction_length = 48
context_length = 72
data_length = 400
num_ts = 2
period = 24
hyperparameters = {
"time_freq": freq,
"context_length": str(context_length),
"prediction_length": str(prediction_length),
"num_cells": "40",
"num_layers": "3",
"likelihood": "gaussian",
"epochs": "20",
"mini_batch_size": "32",
"learning_rate": "0.001",
"dropout_rate": "0.05",
"early_stopping_patience": "10",
}
```
查看之前的图表,您会发现所有三个指标的模式看起来都相似。因此,我们只使用 CPU 指标进行训练。但是,我们可以使用训练后的模型来预测 CPU 之外的其它指标。如果数据模式不同,那么我们必须分别训练每个数据集并相应进行预测。
我们有大约 16 天的 24 小时周期内的数据。我们使用前 14 天的数据,在 3 天(72 小时)的上下文窗口中训练模型,并使用最后 2 天(48 小时)来测试我们的预测。
6. 训练数据是数据的前面部分,截止到最近 2 天(48 小时):
```
time_series_training = []
for ts in time_series:
time_series_training.append(ts[:-prediction_length])
time_series[0].plot(label="test", title = "cpu")
time_series_training[0].plot(label="train", ls=":")
plt.legend()
plt.show()
```
下图显示了数据并将其与测试数据叠加显示。
![image.png](https://dev-media.amazoncloud.cn/ea7eaec4f8844c17b7af5f797decc829_image.png "image.png")
7. 下一步根据 DeepAR 输入格式对数据进行格式化,以便将数据用于训练模型。然后,该步骤将数据集保存到 [Amazon S3](https://aws.amazon.com/cn/s3/?trk=cndc-detail)。
```
def series_to_obj(ts, cat=None):
obj = {"start": str(ts.index[0]), "target": list(ts)}
if cat is not None:
obj["cat"] = cat
return obj
def series_to_jsonline(ts, cat=None):
return json.dumps(series_to_obj(ts, cat))
encoding = "utf-8"
FILE_TRAIN = "train.json"
FILE_TEST = "test.json"
with open(FILE_TRAIN, "wb") as f:
for ts in time_series_training:
f.write(series_to_jsonline(ts).encode(encoding))
f.write("\\n".encode(encoding))
with open(FILE_TEST, "wb") as f:
for ts in time_series:
f.write(series_to_jsonline(ts).encode(encoding))
f.write("\\n".encode(encoding))
s3 = boto3.client("s3")
s3.upload_file(FILE_TRAIN, bucket, prefix + "/data/train/" + FILE_TRAIN)
s3.upload_file(FILE_TEST, bucket, prefix + "/data/test/" + FILE_TRAIN)
```
您可以导航到 [Amazon S3](https://aws.amazon.com/cn/s3/?trk=cndc-detail) 控制台,然后查看先前创建的存储桶(例如 s3://sagemaker-\<region>-\<account_number>/sagemaker/DEMO-deepar/data)来验证文件 test.json 和 train.json。
### **使用 DeepAR 预测算法训练模型**
此步骤使用通用估计器训练模型。它使用包含 DeepAR 算法的 SageMaker 镜像启动[机器学习](https://aws.amazon.com/cn/machine-learning/?trk=cndc-detail)实例(实例类型为 ml.c4.xlarge):
```
image_uri = sagemaker.image_uris.retrieve("forecasting-deepar", boto3.Session().region_name)
estimator = sagemaker.estimator.Estimator(
sagemaker_session=sagemaker_session,
image_uri=image_uri,
role=role,
instance_count=1,
instance_type="ml.c4.xlarge",
base_job_name="DEMO-deepar",
output_path=f"s3://{s3_output_path}",
)
estimator.set_hyperparameters(**hyperparameters)
data_channels = {"train": f"s3://{s3_data_path}/train/", "test": f"s3://{s3_data_path}/test/"}
estimator.fit(inputs=data_channels)
```
等待模型训练完成(大约 5 分钟),然后再运行预测。
当训练作业完成后,您将看到以下响应。
![image.png](https://dev-media.amazoncloud.cn/0f7f4cd019e1472c8286a48573851344_image.png "image.png")
### **生成预测见解**
当模型训练阶段成功完成后,下一步就是通过部署端点来启动预测实例。
1. 使用以下代码部署端点:
```
job_name = estimator.latest_training_job.name
endpoint_name = sagemaker_session.endpoint_from_job(
job_name=job_name,
initial_instance_count=1,
instance_type="ml.m4.xlarge",
image_uri=image_uri,
role=role,
)
```
启动实例可能需要一段时间。最初,输出中只显示一个连字符(–)。等到状态行以感叹号(!)结尾。
![image.png](https://dev-media.amazoncloud.cn/4efa3cdf98664010b7b471230f5fda57_image.png "image.png")
2. 使用以下帮助程序类来运行预测:
```
class DeepARPredictor(sagemaker.predictor.RealTimePredictor):
def set_prediction_parameters(self, freq, prediction_length):
"""设置时间频率和预测长度参数。**必须** 调用此方法
然后才能使用“predict”。
Parameters:
freq -- 表示时间频率的字符串
prediction_length -- 整数,预测的时间点数量
返回值:无。
"""
self.freq = freq
self.prediction_length = prediction_length
def predict(
self,
ts,
cat=None,
encoding="utf-8",
num_samples=100,
quantiles=["0.1", "0.5", "0.9"],
content_type="application/json",
):
"""请求对“ts”中列出的时间序列进行预测,每个时间序列带有(可选)
对应的类别,在“cat”中列出。
Parameters:
ts -- “Pandas.Series”对象列表,要预测的时间序列
cat -- 整数列表(默认值:无)
encoding -- 字符串,用于请求的编码(默认值:“utf-8”)
num_samples -- 整数,预测时要计算的样本数(默认值:100)
quantiles -- 指定要计算的分位数的字符串列表(默认值:["0.1"、“0.5"、“0.9"])
返回值:“pandas.DataFrame”对象的列表,每个对象中包含预测
"""
prediction_times = [x.index[-1] + pd.Timedelta(1, unit=self.freq) for x in ts]
req = self.__encode_request(ts, cat, encoding, num_samples, quantiles)
res = super(DeepARPredictor, self).predict(req, initial_args={"ContentType": content_type})
return self.__decode_response(res, prediction_times, encoding)
def __encode_request(self, ts, cat, encoding, num_samples, quantiles):
instances = [series_to_obj(ts[k], cat[k] if cat else None) for k in range(len(ts))]
configuration = {
"num_samples": num_samples,
"output_types": ["quantiles"],
"quantiles": quantiles,
}
http_request_data = {"instances": instances, "configuration": configuration}
return json.dumps(http_request_data).encode(encoding)
def __decode_response(self, response, prediction_times, encoding): response_data = json.loads(response.decode(encoding))
list_of_df = []
for k in range(len(prediction_times)):
prediction_index = pd.date_range(
start=prediction_times[k], freq=self.freq, periods=self.prediction_length
)
list_of_df.append(
pd.DataFrame(
data=response_data["predictions"][k]["quantiles"], index=prediction_index
)
)
return list_of_df
predictor = DeepARPredictor(endpoint_name=endpoint_name, sagemaker_session=sagemaker_session)
predictor.set_prediction_parameters(freq, prediction_length)
list_of_df = predictor.predict(time_series_training[:3], content_type="application/json")
actual_data = time_series[:3]
```
3. 最后,您可以将结果可视化:
```
for k in range(len(list_of_df)):
print(f'-------------------------------------------\\n\\tPrediction {labels[k]}')
plt.figure(figsize=(12, 6))
actual_data[k][-prediction_length - context_length :].plot(label=f'target - {labels[k]}')
p10 = list_of_df[k]["0.1"]
p90 = list_of_df[k]["0.9"]
plt.fill_between(p10.index, p10, p90, color="y", alpha=0.5, label="80% confidence interval")
list_of_df[k]["0.5"].plot(label="prediction median")
plt.legend()
plt.show()
```
下图显示了我们的 CPU 预测。
![image.png](https://dev-media.amazoncloud.cn/d1fc2a55c4d142a5a732576e0238614d_image.png "image.png")
下图显示了我们的内存预测。
![image.png](https://dev-media.amazoncloud.cn/59a1711ee0404dce9856f38ab6f5d2c2_image.png "image.png")
下图显示了我们的 TPS 预测。
![image.png](https://dev-media.amazoncloud.cn/2dd935faf88f4bedbf371f02cb6ad14b_image.png "image.png")
4. 删除端点
```
sagemaker_session.delete_endpoint(endpoint_name)
```
我们的预测结果与测试数据非常吻合,可以使用这些预测数据来规划容量。您可以按照本文中的步骤,无缝地扩展此解决方案,用于预测存储在 Timestream 中的其它时间序列数据。如果用户希望在现实场景中对一系列时间序列数据集进行准确预测,可以使用这种灵活且适用的解决方案。
### **Timestream 中的汇总**
通常,最佳做法是在训练模型之前,以较低的频率汇总时间序列数据。使用原始数据会使模型运行缓慢且导致准确性降低。
使用 Timestream 计划查询功能,您可以汇总数据并将数据存储在不同的 Timestream 表中。您可以为业务报告使用计划查询,汇总应用程序中的最终用户活动,因此您可以训练[机器学习](https://aws.amazon.com/cn/machine-learning/?trk=cndc-detail)模型来提供个性化的数据。您还可以使用计划查询提供警报,检测异常、网络入侵或欺诈活动,这样就可以立即采取补救措施。以下是 SQL 查询示例,该查询可以作为计划查询运行,以在 1 小时的时间间隔内汇总/向上采样数据:
```
select
microservice_name,
region,
'aggregate_host_metric' as measure_name,
bin(time, 1h) as time,
round(avg(memory),2) as memory_avg,
round(avg(tps),2) as tps_avg,
round(avg(cpu),2) as cpu_avg
from “Demo”.”source_metrics”
group by microservice_name, region, bin(time, 1h)
```
### **清理**
为避免产生费用,请使用亚马逊云科技管理控制台删除您在运行本练习时创建的资源:http\://aws.amazon.com/console?trk=cndc-detail
1. 删除在 CloudFormation 堆栈之外创建的 [Amazon SageMaker](https://aws.amazon.com/cn/sagemaker/?trk=cndc-detail) 资源和 [Amazon S3](https://aws.amazon.com/cn/s3/?trk=cndc-detail) 存储桶。
2. 清空创建的 [Amazon S3](https://aws.amazon.com/cn/s3/?trk=cndc-detail) 存储桶,这样在删除堆栈时就不会遇到问题。
3. 删除为此解决方案创建的 CloudFormation 堆栈。
### **结论**
在这篇文章中,我们向您演示了如何使用 [Amazon SageMaker](https://aws.amazon.com/cn/sagemaker/?trk=cndc-detail) DeepAR 算法,对存储在 Timestream 中的 DevOps 时间序列数据运行预测分析,用于改善容量规划。通过结合 [Amazon SageMaker](https://aws.amazon.com/cn/sagemaker/?trk=cndc-detail) 和 Timestream 的功能,您可以对时间序列数据集进行预测并获得宝贵的见解。
有关时间流聚合的更多信息,请参阅 Queries with aggregate functions(使用聚合函数进行查询):
https\://docs.aws.amazon.com/timestream/latest/developerguide/sample-queries.iot-scenarios.html?trk=cndc-detail
有关高级时间序列分析函数,请参阅 Time-series functions(时间序列函数):
https\://docs.aws.amazon.com/timestream/latest/developerguide/timeseries-specific-constructs.functions.html?trk=cndc-detail
要了解有关使用 DeepAR 算法的更多信息,请参阅使用 DeepAR 算法的最佳实践:
https\://docs.aws.amazon.com/sagemaker/latest/dg/deepar.html#deepar_best_practices?trk=cndc-detail