Build a predictive maintenance solution with Amazon Kinesis, Amazon Glue, and Amazon SageMaker

海外精选
海外精选的内容汇集了全球优质的亚马逊云科技相关技术内容。同时,内容中提到的“AWS” 是 “Amazon Web Services” 的缩写,在此网站不作为商标展示。
0
0
{"value":"Organizations are increasingly building and using machine learning (ML)-powered solutions for a variety of use cases and problems, including predictive maintenance of machine parts, product recommendations based on customer preferences, credit profiling, content moderation, fraud detection, and more. In many of these scenarios, the effectiveness and benefits derived from these ML-powered solutions can be further enhanced when they can process and derive insights from data events in near-real time.\n\nAlthough the business value and benefits of near-real-time ML-powered solutions are well established, the architecture required to implement these solutions at scale with optimum reliability and performance is complicated. This post describes how you can combine [Amazon Kinesis](https://aws.amazon.com/kinesis/), [AWS Glue](https://aws.amazon.com/glue/), and [Amazon SageMaker](https://aws.amazon.com/sagemaker/) to build a near-real-time feature engineering and inference solution for predictive maintenance.\n\n#### **Use case overview**\nWe focus on a predictive maintenance use case where sensors deployed in the field (such as industrial equipment or network devices), need to replaced or rectified before they become faulty and cause downtime. Downtime can be expensive for businesses and can lead to poor customer experience. Predictive maintenance powered by an ML model can also help in augmenting the regular schedule-based maintenance cycles by informing when a machine part in good condition should not be replaced, therefore avoiding unnecessary cost.\n\nIn this post, we focus on applying machine learning to a synthetic dataset containing machine failures due to features such as air temperature, process temperature, rotation speed, torque, and tool wear. The dataset used is sourced from the [UCI Data Repository](https://archive.ics.uci.edu/ml/datasets/AI4I+2020+Predictive+Maintenance+Dataset).\n\nMachine failure consists of five independent failure modes:\n- Tool Wear Failure (TWF)\n- Heat Dissipation Failure (HDF)\n- Power Failure (PWF)\n- Over-strain Failure (OSF)\n- Random Failure (RNF)\n\nThe machine failure label indicates whether the machine has failed for a particular data point if any of the preceding failure modes are true. If at least one of the failure modes is true, the process fails and the machine failure label is set to 1. The objective for the ML model is to identify machine failures correctly, so a downstream predictive maintenance action can be initiated.\n\n#### **Solution overview**\nFor our predictive maintenance use case, we assume that device sensors stream various measurements and readings about machine parts. Our solution then takes a slice of streaming data each time (micro-batch), and performs processing and feature engineering to create features. The created features are then used to generate inferences from a trained and deployed ML model in near-real time. The generated inferences can be further processed and consumed by downstream applications, to take appropriate actions and initiate maintenance activity.\n\nThe following diagram shows the architecture of our overall solution.\n\n![image.png](https://dev-media.amazoncloud.cn/0115f0c5913a4743928f1c98e811b019_image.png)\n\nThe solution broadly consists of the following sections, which are explained in detail later in this post:\n\n- **Streaming data source and ingestion** – We use [Amazon Kinesis](https://aws.amazon.com/cn/kinesis/?trk=cndc-detail) Data Streams to collect streaming data from the field sensors at scale and make it available for further processing.\n- **Near-real-time feature engineering** – We use [AWS Glue](https://aws.amazon.com/cn/glue/?trk=cndc-detail) streaming jobs to read data from a Kinesis data stream and perform data processing and feature engineering, before storing the derived features in [Amazon Simple Storage Service](http://aws.amazon.com/s3) ([Amazon S3](https://aws.amazon.com/cn/s3/?trk=cndc-detail)). [Amazon S3](https://aws.amazon.com/cn/s3/?trk=cndc-detail) provides a reliable and cost-effective option to store large volumes of data.\n- **Model training and deployment** – We use the AI4I predictive maintenance dataset from the UCI Data Repository to train an ML model based on the XGBoost algorithm using SageMaker. We then deploy the trained model to a SageMaker asynchronous inference endpoint.\n- **Near-real-time ML inference** – After the features are available in [Amazon S3](https://aws.amazon.com/cn/s3/?trk=cndc-detail), we need to generate inferences from the deployed model in near-real time. SageMaker asynchronous inference endpoints are well suited for this requirement because they support larger payload sizes (up to 1 GB) and can generate inferences within minutes (up to a maximum of 15 minutes). We use S3 event notifications to run an [AWS Lambda](http://aws.amazon.com/lambda) function to invoke a SageMaker asynchronous inference endpoint. SageMaker asynchronous inference endpoints accept S3 locations as input, generate inferences from the deployed model, and write these inferences back to [Amazon S3](https://aws.amazon.com/cn/s3/?trk=cndc-detail) in near-real time.\n\nThe source code for this solution is located on [GitHub](https://github.com/aws-samples/amazon-sagemaker-predictive-maintenance). The solution has been tested and should be run in us-east-1.\n\nWe use an [AWS CloudFormation](http://aws.amazon.com/cloudformation) template, deployed using [AWS Serverless Application Model](https://aws.amazon.com/serverless/sam/) (AWS SAM), and SageMaker notebooks to deploy the solution.\n\n#### **Prerequisites**\nTo get started, as a prerequisite, you must have the [SAM CLI](https://docs.aws.amazon.com/serverless-application-model/latest/developerguide/serverless-sam-cli-install.html), [Python 3](https://realpython.com/installing-python/), and [PIP](https://pip.pypa.io/en/stable/installing/) installed. You must also have the [AWS Command Line Interface](https://docs.aws.amazon.com/cli/latest/userguide/cli-configure-files.html) (AWS CLI) configured properly.\n\n#### **Deploy the solution**\nYou can use [AWS CloudShell](https://aws.amazon.com/cloudshell/) to run these steps. CloudShell is a browser-based shell that is pre-authenticated with your console credentials and includes pre-installed common development and operations tools (such as AWS SAM, AWS CLI, and Python). Therefore, no local installation or configuration is required.\n- We begin by creating an S3 bucket where we store the script for our [AWS Glue](https://aws.amazon.com/cn/glue/?trk=cndc-detail) streaming job. Run the following command in your terminal to create a new bucket:\n```\\naws s3api create-bucket --bucket sample-script-bucket-\$RANDOM --region us-east-1\\n```\n- Note down the name of the bucket created.\n\n![image.png](https://dev-media.amazoncloud.cn/127d26b0a9a74c7ebbca7a9e3239fafb_image.png)\n\n- Next, we clone the code repository locally, which contains the CloudFormation template to deploy the stack. Run the following command in your terminal:\n```\\ngit clone https://github.com/aws-samples/amazon-sagemaker-predictive-maintenance\\n```\n- Navigate to the sam-template directory:\n\n```cd amazon-sagemaker-predictive-maintenance/sam-template\\n```\n![image.png](https://dev-media.amazoncloud.cn/a5e0593c243445cda6e427809006606a_image.png)\n\n- Run the following command to copy the [AWS Glue](https://aws.amazon.com/cn/glue/?trk=cndc-detail) job script (from glue_streaming/app.py) to the S3 bucket you created:\n```\\naws s3 cp glue_streaming/app.py s3://sample-script-bucket-30232/glue_streaming/app.py\\n```\n![image.png](https://dev-media.amazoncloud.cn/ec22ad0f818a411cb318e0c89a10132d_image.png)\n\n- You can now go ahead with the build and deployment of the solution, through the CloudFormation template via AWS SAM. Run the following command:\n```\\nsam build\\n```\n![image.png](https://dev-media.amazoncloud.cn/d6f1c820feae4adbb69290c0abc91a56_image.png)\n\n```\\nsam deploy --guided\\n```\n- Provide arguments for the deployment such as the stack name, preferred AWS Region (```us-east-1```), and ```GlueScriptsBucket```.\nMake sure you provide the same S3 bucket that you created earlier for the [AWS Glue](https://aws.amazon.com/cn/glue/?trk=cndc-detail) script S3 bucket (parameter ```GlueScriptsBucket``` in the following screenshot).\n\n![image.png](https://dev-media.amazoncloud.cn/0088d8ba89df456eb78bf33c02282e0c_image.png)\n\nAfter you provide the required arguments, AWS SAM starts the stack deployment. The following screenshot shows the resources created.\n\n![image.png](https://dev-media.amazoncloud.cn/e970e76f17704b68955711d9b9999e5d_image.png)\n\nAfter the stack is deployed successfully, you should see the following message.\n\n![image.png](https://dev-media.amazoncloud.cn/ddd71ec35c314338b9a197c282ba996b_image.png)\n\n- On the [AWS CloudFormation](https://aws.amazon.com/cn/cloudformation/?trk=cndc-detail) console, open the stack (for this post, ```nrt-streaming-inference```) that was provided when deploying the CloudFormation template.\n- On the **Resources** tab, note the SageMaker notebook instance ID.\n\n![image.png](https://dev-media.amazoncloud.cn/994fa9ae733643a489a54be64faf9042_image.png)\n\n- On the SageMaker console, open this instance.\n\n![image.png](https://dev-media.amazoncloud.cn/df4a0ae1e5214cde9c5bbf65c6b90fe9_image.png)\n\nThe SageMaker notebook instance already has the required notebooks pre-loaded.\n\nNavigate to the notebooks folder and open and follow the instructions within the notebooks (```Data_Pre-Processing.ipynb``` and ```ModelTraining-Evaluation-and-Deployment.ipynb```) to explore the dataset, perform preprocessing and feature engineering, and train and deploy the model to a SageMaker asynchronous inference endpoint.\n\n![image.png](https://dev-media.amazoncloud.cn/f08bbb4161a34c6491badfaeb6692a3e_image.png)\n\n#### **Streaming data source and ingestion**\nKinesis Data Streams is a serverless, scalable, and durable real-time data streaming service that you can use to collect and process large streams of data records in real time. Kinesis Data Streams enables capturing, processing, and storing data streams from a variety of sources, such as IT infrastructure log data, application logs, social media, market data feeds, web clickstream data, IoT devices and sensors, and more. You can provision a Kinesis data stream in on-demand mode or provisioned mode depending on the throughput and scaling requirements. For more information, see [Choosing the Data Stream Capacity Mode](https://docs.aws.amazon.com/streams/latest/dev/how-do-i-size-a-stream.html).\n\nFor our use case, we assume that various sensors are sending measurements such as temperature, rotation speed, torque, and tool wear to a data stream. Kinesis Data Streams acts as a funnel to collect and ingest data streams.\n\nWe use the [Amazon Kinesis Data Generator](https://awslabs.github.io/amazon-kinesis-data-generator/web/producer.html) (KDG) later in this post to generate and send data to a Kinesis data stream, simulating data being generated by sensors. The data from the data stream sensor-data-stream is ingested and processed using an [AWS Glue](https://aws.amazon.com/cn/glue/?trk=cndc-detail) streaming job, which we discuss next.\n\n#### **Near-real-time feature engineering**\n[AWS Glue streaming jobs](https://docs.aws.amazon.com/glue/latest/dg/add-job-streaming.html) provide a convenient way to process streaming data at scale, without the need to manage the compute environment. [AWS Glue](https://aws.amazon.com/cn/glue/?trk=cndc-detail) allows you to perform extract, transform, and load (ETL) operations on streaming data using continuously running jobs. [AWS Glue](https://aws.amazon.com/cn/glue/?trk=cndc-detail) streaming ETL is built on the Apache Spark Structured Streaming engine, and can ingest streams from Kinesis, Apache Kafka, and [Amazon Managed Streaming for Apache Kafka](https://aws.amazon.com/msk/) (Amazon MSK).\n\nThe streaming ETL job can use both [AWS Glue](https://aws.amazon.com/cn/glue/?trk=cndc-detail) built-in transforms and transforms that are native to Apache Spark Structured Streaming. You can also use the Spark ML and [MLLib](https://spark.apache.org/mllib/) libraries in [AWS Glue](https://aws.amazon.com/cn/glue/?trk=cndc-detail) jobs for easier feature processing using readily available helper libraries.\n\nIf the schema of the streaming data source is pre-determined, you can specify it in an AWS Data Catalog table. If the schema definition can’t be determined beforehand, you can enable schema detection in the streaming ETL job. The job then automatically determines the schema from the incoming data. Additionally, you can use the [AWS Glue Schema Registry](https://docs.aws.amazon.com/glue/latest/dg/schema-registry.html) to allow central discovery, control, and evolution of data stream schemas. You can further integrate the Schema Registry with the Data Catalog to optionally use schemas stored in the Schema Registry when creating or updating [AWS Glue](https://aws.amazon.com/cn/glue/?trk=cndc-detail) tables or partitions in the Data Catalog.\n\nFor this post, we create an [AWS Glue](https://aws.amazon.com/cn/glue/?trk=cndc-detail) Data Catalog table (```sensor-stream```) with our Kinesis data stream as the source and define the schema for our sensor data.\n\nWe create an [AWS Glue](https://aws.amazon.com/cn/glue/?trk=cndc-detail) dynamic dataframe from the Data Catalog table to read the streaming data from Kinesis. We also specify the following options:\n\n- A window size of 60 seconds, so that the [AWS Glue](https://aws.amazon.com/cn/glue/?trk=cndc-detail) job reads and processes data in 60-second windows\n- The starting position ```TRIM_HORIZON```, to allow reading from the oldest records in the Kinesis data stream\n\nWe also use Spark MLlib’s [StringIndexer](https://spark.apache.org/docs/latest/ml-features#stringindexer) feature transformer to encode the string column type into label indexes. This transformation is implemented using [Spark ML Pipelines](https://spark.apache.org/docs/latest/ml-pipeline.html). Spark ML Pipelines provide a uniform set of high-level APIs for ML algorithms to make it easier to combine multiple algorithms into a single pipeline or workflow.\n\nWe use the foreachBatch API to invoke a function named processBatch, which in turn processes the data referenced by this dataframe. See the following code:\n```\\n# Read from Kinesis Data Stream\\nsourceStreamData = glueContext.create_data_frame.from_catalog(database = \\"sensordb\\", table_name = \\"sensor-stream\\", transformation_ctx = \\"sourceStreamData\\", additional_options = {\\"startingPosition\\": \\"TRIM_HORIZON\\"})\\ntype_indexer = StringIndexer(inputCol=\\"type\\", outputCol=\\"type_enc\\", stringOrderType=\\"alphabetAsc\\")\\npipeline = Pipeline(stages=[type_indexer])\\nglueContext.forEachBatch(frame = sourceStreamData, batch_function = processBatch, options = {\\"windowSize\\": \\"60 seconds\\", \\"checkpointLocation\\": checkpoint_location})\\n```\nThe function processBatch performs the specified transformations and partitions the data in [Amazon S3](https://aws.amazon.com/cn/s3/?trk=cndc-detail) based on year, month, day, and batch ID.\n\nWe also re-partition the [AWS Glue](https://aws.amazon.com/cn/glue/?trk=cndc-detail) partitions into a single partition, to avoid having too many small files in [Amazon S3](https://aws.amazon.com/cn/s3/?trk=cndc-detail). Having several small files can impede read performance, because it amplifies the overhead related to seeking, opening, and reading each file. We finally write the features to generate inferences into a prefix (features) within the S3 bucket. See the following code:\n```\\n# Function that gets called to perform processing, feature engineering and writes to S3 for every micro batch of streaming data from Kinesis.\\ndef processBatch(data_frame, batchId):\\ntransformer = pipeline.fit(data_frame)\\nnow = datetime.datetime.now()\\nyear = now.year\\nmonth = now.month\\nday = now.day\\nhour = now.hour\\nminute = now.minute\\nif (data_frame.count() > 0):\\ndata_frame = transformer.transform(data_frame)\\ndata_frame = data_frame.drop(\\"type\\")\\ndata_frame = DynamicFrame.fromDF(data_frame, glueContext, \\"from_data_frame\\")\\ndata_frame.printSchema()\\n# Write output features to S3\\ns3prefix = \\"features\\" + \\"/year=\\" + \\"{:0>4}\\".format(str(year)) + \\"/month=\\" + \\"{:0>2}\\".format(str(month)) + \\"/day=\\" + \\"{:0>2}\\".format(str(day)) + \\"/hour=\\" + \\"{:0>2}\\".format(str(hour)) + \\"/min=\\" + \\"{:0>2}\\".format(str(minute)) + \\"/batchid=\\" + str(batchId)\\ns3path = \\"s3://\\" + out_bucket_name + \\"/\\" + s3prefix + \\"/\\"\\nprint(\\"-------write start time------------\\")\\nprint(str(datetime.datetime.now()))\\ndata_frame = data_frame.toDF().repartition(1)\\ndata_frame.write.mode(\\"overwrite\\").option(\\"header\\",False).csv(s3path)\\nprint(\\"-------write end time------------\\")\\nprint(str(datetime.datetime.now()))\\n```\n#### **Model training and deployment**\nSageMaker is a fully managed and integrated ML service that enables data scientists and ML engineers to quickly and easily build, train, and deploy ML models.\n\nWithin the Data_Pre-Processing.ipynb notebook, we first import the AI4I Predictive Maintenance dataset from the UCI Data Repository and perform exploratory data analysis (EDA). We also perform feature engineering to make our features more useful for training the model.\n\nFor example, within the dataset, we have a feature named type, which represents the product’s quality type as L (low), M (medium), or H (high). Because this is categorical feature, we need to encode it before training our model. We use Scikit-Learn’s LabelEncoder to achieve this:\n```\\nfrom sklearn.preprocessing import LabelEncoder\\ntype_encoder = LabelEncoder()\\ntype_encoder.fit(origdf['type'])\\ntype_values = type_encoder.transform(origdf['type'])\\n```\nAfter the features are processed and the curated train and test datasets are generated, we’re ready to train an ML model to predict whether the machine failed or not based on system readings. We train a XGBoost model, using the SageMaker built-in algorithm. [XGBoost](https://xgboost.readthedocs.io/en/stable/) can provide good results for multiple types of ML problems, including classification, even when training samples are limited.\n\n[SageMaker training jobs](https://docs.aws.amazon.com/sagemaker/latest/dg/how-it-works-training.html) provide a powerful and flexible way to train ML models on SageMaker. SageMaker manages the underlying compute infrastructure and provides [multiple options](https://docs.aws.amazon.com/sagemaker/latest/dg/algorithms-choose.html#algorithms-choose-implementation) to choose from, for diverse model training requirements, based on the use case.\n```\\nxgb = sagemaker.estimator.Estimator(container,\\nrole,\\ninstance_count=1,\\ninstance_type='ml.c4.4xlarge',\\noutput_path=xgb_upload_location,\\nsagemaker_session=sagemaker_session)\\nxgb.set_hyperparameters(max_depth=5,\\neta=0.2,\\ngamma=4,\\nmin_child_weight=6,\\nsubsample=0.8,\\nsilent=0,\\nobjective='binary:hinge',\\nnum_round=100)\\n\\nxgb.fit({'train': s3_train_channel, 'validation': s3_valid_channel})\\n```\nWhen the model training is complete and the model evaluation is satisfactory based on the business requirements, we can begin model deployment. We first create an endpoint configuration with the AsyncInferenceConfig object option and using the model trained earlier:\n```\\nendpoint_config_name = resource_name.format(\\"EndpointConfig\\")\\ncreate_endpoint_config_response = sm_client.create_endpoint_config(\\nEndpointConfigName=endpoint_config_name,\\nProductionVariants=[\\n{\\n\\"VariantName\\": \\"variant1\\",\\n\\"ModelName\\": model_name,\\n\\"InstanceType\\": \\"ml.m5.xlarge\\",\\n\\"InitialInstanceCount\\": 1,\\n}\\n],\\nAsyncInferenceConfig={\\n\\"OutputConfig\\": {\\n\\"S3OutputPath\\": f\\"s3://{bucket}/{prefix}/output\\",\\n#Specify Amazon SNS topics\\n\\"NotificationConfig\\": {\\n\\"SuccessTopic\\": \\"arn:aws:sns:<region>:<account-id>:<success-sns-topic>\\",\\n\\"ErrorTopic\\": \\"arn:aws:sns:<region>:<account-id>:<error-sns-topic>\\",\\n}},\\n\\"ClientConfig\\": {\\"MaxConcurrentInvocationsPerInstance\\": 4},\\n},)\\n```\nWe then create a SageMaker asynchronous inference endpoint, using the endpoint configuration we created. After it’s provisioned, we can start invoking the endpoint to generate inferences asynchronously.\n```\\nendpoint_name = resource_name.format(\\"Endpoint\\")\\ncreate_endpoint_response = sm_client.create_endpoint(\\nEndpointName=endpoint_name, EndpointConfigName=endpoint_config_name)\\n```\n#### **Near-real-time inference**\nSageMaker [asynchronous inference](https://docs.aws.amazon.com/sagemaker/latest/dg/async-inference.html) endpoints provide the ability to queue incoming inference requests and process them asynchronously in near-real time. This is ideal for applications that have inference requests with larger payload sizes (up to 1 GB), may require longer processing times (up to 15 minutes), and have near-real-time latency requirements. Asynchronous inference also enables you to save on costs by auto scaling the instance count to zero when there are no requests to process, so you only pay when your endpoint is processing requests.\n\nYou can create a SageMaker asynchronous inference endpoint similar to how you create a real-time inference endpoint and additionally specify the ```AsyncInferenceConfig``` object, while creating your endpoint configuration with the EndpointConfig field in the CreateEndpointConfig API. The following diagram shows the inference workflow and how an asynchronous inference endpoint generates an inference.\n\n![image.png](https://dev-media.amazoncloud.cn/fcf53ab579104a3b8b1d3f84a17e7bd1_image.png)\n\n\nTo invoke the asynchronous inference endpoint, the request payload should be stored in [Amazon S3](https://aws.amazon.com/cn/s3/?trk=cndc-detail) and reference to this payload needs to be provided as part of the InvokeEndpointAsync request. Upon invocation, SageMaker queues the request for processing and returns an identifier and output location as a response. Upon processing, SageMaker places the result in the [Amazon S3](https://aws.amazon.com/cn/s3/?trk=cndc-detail) location. You can optionally choose to receive success or error notifications with [Amazon Simple Notification Service](http://aws.amazon.com/sns) ([Amazon SNS](https://aws.amazon.com/cn/sns/?trk=cndc-detail)).\n\n#### **Test the end-to-end solution**\nTo test the solution, complete the following steps:\n- On the [AWS CloudFormation](https://aws.amazon.com/cn/cloudformation/?trk=cndc-detail) console, open the stack you created earlier (```nrt-streaming-inference```).\n- On the **Outputs** tab, copy the name of the S3 bucket (```EventsBucket```).\n\nThis is the S3 bucket to which our [AWS Glue](https://aws.amazon.com/cn/glue/?trk=cndc-detail) streaming job writes features after reading and processing from the Kinesis data stream.\n\n![image.png](https://dev-media.amazoncloud.cn/fa9012ca7073418188087b86375c07ae_image.png)\n\nNext, we set up event notifications for this S3 bucket.\n\n- On the [Amazon S3](https://aws.amazon.com/cn/s3/?trk=cndc-detail) console, navigate to the bucket ```EventsBucket```.\n- On the**Properties** tab, in the **Event notifications** section, choose **Create event notification**.\n\n![image.png](https://dev-media.amazoncloud.cn/c6e4f31e0011417881fd4da33a7d47f9_image.png)\n\n![image.png](https://dev-media.amazoncloud.cn/d7156625b1d4495e8b58933a0742d6d9_image.png)\n\n- For **Event name**, enter ```invoke-endpoint-lambda```.\n- For **Prefix**, enter ```features/```.\n- For **Suffix**, enter ```.csv```.\n- For **Event types**, select **All object create events**.\n\n![image.png](https://dev-media.amazoncloud.cn/7fa38733b831486b965da332d9fb95b5_image.png)\n\n![image.png](https://dev-media.amazoncloud.cn/c184d6439dc246ae8229d6fac63a2efb_image.png)\n\n- For **Destination**, select **Lambda function**.\n- For **Lambda function**, and choose the function ```invoke-endpoint-asynch```.\n- Choose **Save changes**.\n\n![image.png](https://dev-media.amazoncloud.cn/7b0ac84e4c994e2082e6d0e44faba9b4_image.png)\n\n- On the [AWS Glue](https://aws.amazon.com/cn/glue/?trk=cndc-detail) console, open the job ```GlueStreaming-Kinesis-S3```.\n- Choose **Run job**.\n\n![image.png](https://dev-media.amazoncloud.cn/f7407eae16d64b8e80420b09bfbb79a8_image.png)\n\nNext we use the Kinesis Data Generator (KDG) to simulate sensors sending data to our Kinesis data stream. If this is your first time using the KDG, refer to [Overview](https://awslabs.github.io/amazon-kinesis-data-generator/web/help.html) for the initial setup. The KDG provides a CloudFormation template to create the user and assign just enough permissions to use the KDG for sending events to Kinesis. Run the [CloudFormation template](https://aws-kdg-tools.s3.us-west-2.amazonaws.com/cognito-setup.json) within the AWS account that you’re using to build the solution in this post. After the KDG is set up, log in and access the KDG to send test events to our Kinesis data stream.\n\n- Use the Region in which you created the Kinesis data stream (us-east-1).\n- On the drop-down menu, choose the data stream ```sensor-data-stream```.\n- In the **Records per second** section, select **Constant** and enter 100.\n- Unselect **Compress Records**.\n- For **Record template**, use the following template:\n\n```\\n{\\n\\"air_temperature\\": {{random.number({\\"min\\":295,\\"max\\":305, \\"precision\\":0.01})}},\\n\\"process_temperature\\": {{random.number({\\"min\\":305,\\"max\\":315, \\"precision\\":0.01})}},\\n\\"rotational_speed\\": {{random.number({\\"min\\":1150,\\"max\\":2900})}},\\n\\"torque\\": {{random.number({\\"min\\":3,\\"max\\":80, \\"precision\\":0.01})}},\\n\\"tool_wear\\": {{random.number({\\"min\\":0,\\"max\\":250})}},\\n\\"type\\": \\"{{random.arrayElement([\\"L\\",\\"M\\",\\"H\\"])}}\\"\\n}\\n```\n- Click **Send data** to start sending data to the Kinesis data stream.\n\n![image.png](https://dev-media.amazoncloud.cn/c13379f92b704a19a1cc0a75bc3734ee_image.png)\n\nThe [AWS Glue](https://aws.amazon.com/cn/glue/?trk=cndc-detail) streaming job reads and extracts a micro-batch of data (representing sensor readings) from the Kinesis data stream based on the window size provided. The streaming job then processes and performs feature engineering on this micro-batch before partitioning and writing it to the prefix features within the S3 bucket.\n\nAs new features created by the [AWS Glue](https://aws.amazon.com/cn/glue/?trk=cndc-detail) streaming job are written to the S3 bucket, a Lambda function (```invoke-endpoint-asynch```) is triggered, which invokes a SageMaker asynchronous inference endpoint by sending an invocation request to get inferences from our deployed ML model. The asynchronous inference endpoint queues the request for asynchronous invocation. When the processing is complete, SageMaker stores the inference results in the [Amazon S3](https://aws.amazon.com/cn/s3/?trk=cndc-detail) location (```S3OutputPath```) that was specified during the asynchronous inference endpoint configuration.\n\nFor our use case, the inference results indicate if a machine part is likely to fail or not, based on the sensor readings.\n\n![image.png](https://dev-media.amazoncloud.cn/9e5b40a8b647438ba523bc528a5a381a_image.png)\n\nSageMaker also sends a success or error notification with [Amazon SNS](https://aws.amazon.com/cn/sns/?trk=cndc-detail). For example, if you set up an [email subscription](https://docs.aws.amazon.com/sns/latest/dg/sns-email-notifications.html#create-subscribe-endpoint-to-topic-console) for the success and error SNS topics (specified within the asynchronous SageMaker inference endpoint configuration), an email can be sent every time an inference request is processed. The following screenshot shows a sample email from the SNS success topic.\n\n![image.png](https://dev-media.amazoncloud.cn/577b48e3239b49b8ba02f736eca7635f_image.png)\n\nFor real-world applications, you can integrate SNS notifications with other services such as [Amazon Simple Queue Service](https://aws.amazon.com/sqs/) ([Amazon SQS](https://aws.amazon.com/cn/sqs/?trk=cndc-detail)) and Lambda for additional postprocessing of the generated inferences or integration with other downstream applications, based on your requirements. For example, for our predictive maintenance use case, you can invoke a Lambda function based on an SNS notification to read the generated inference from [Amazon S3](https://aws.amazon.com/cn/s3/?trk=cndc-detail), further process it (such as aggregation or filtering), and initiate workflows such as sending work orders for equipment repair to technicians.\n\n#### **Clean up**\nWhen you’re done testing the stack, delete the resources (especially the Kinesis data stream, Glue streaming job, and SNS topics) to avoid unexpected charges.\n\nRun the following code to delete your stack:\n```\\nsam delete nrt-streaming-inference\\n```\nAlso delete the resources such as SageMaker endpoints by following the cleanup section in the ModelTraining-Evaluation-and-Deployment notebook.\n\n#### **Conclusion**\nIn this post, we used a predictive maintenance use case to demonstrate how to use various services such as Kinesis, [AWS Glue](https://aws.amazon.com/cn/glue/?trk=cndc-detail), and SageMaker to build a near-real-time inference pipeline. We encourage you to try this solution and let us know what you think.\n\nIf you have any questions, share them in the comments.\n\n##### **About the authors**\n\n![image.png](https://dev-media.amazoncloud.cn/85ae57bdd192434297b2db00dd04f3b5_image.png)\n\n**Rahul Sharma** is a Solutions Architect at AWS Data Lab, helping AWS customers design and build AI/ML solutions. Prior to joining AWS, Rahul has spent several years in the finance and insurance sector, helping customers build data and analytical platforms.\n\n![image.png](https://dev-media.amazoncloud.cn/35ed5eb18e9f4b7bbfeafd5c1f39d741_image.png)\n\n**Pat Reilly** is an Architect in the AWS Data Lab, where he helps customers design and build data workloads to support their business. Prior to AWS, Pat consulted at an AWS Partner, building AWS data workloads across a variety of industries.","render":"<p>Organizations are increasingly building and using machine learning (ML)-powered solutions for a variety of use cases and problems, including predictive maintenance of machine parts, product recommendations based on customer preferences, credit profiling, content moderation, fraud detection, and more. In many of these scenarios, the effectiveness and benefits derived from these ML-powered solutions can be further enhanced when they can process and derive insights from data events in near-real time.</p>\n<p>Although the business value and benefits of near-real-time ML-powered solutions are well established, the architecture required to implement these solutions at scale with optimum reliability and performance is complicated. This post describes how you can combine <a href=\\"https://aws.amazon.com/kinesis/\\" target=\\"_blank\\">Amazon Kinesis</a>, <a href=\\"https://aws.amazon.com/glue/\\" target=\\"_blank\\">AWS Glue</a>, and <a href=\\"https://aws.amazon.com/sagemaker/\\" target=\\"_blank\\">Amazon SageMaker</a> to build a near-real-time feature engineering and inference solution for predictive maintenance.</p>\\n<h4><a id=\\"Use_case_overview_4\\"></a><strong>Use case overview</strong></h4>\\n<p>We focus on a predictive maintenance use case where sensors deployed in the field (such as industrial equipment or network devices), need to replaced or rectified before they become faulty and cause downtime. Downtime can be expensive for businesses and can lead to poor customer experience. Predictive maintenance powered by an ML model can also help in augmenting the regular schedule-based maintenance cycles by informing when a machine part in good condition should not be replaced, therefore avoiding unnecessary cost.</p>\n<p>In this post, we focus on applying machine learning to a synthetic dataset containing machine failures due to features such as air temperature, process temperature, rotation speed, torque, and tool wear. The dataset used is sourced from the <a href=\\"https://archive.ics.uci.edu/ml/datasets/AI4I+2020+Predictive+Maintenance+Dataset\\" target=\\"_blank\\">UCI Data Repository</a>.</p>\\n<p>Machine failure consists of five independent failure modes:</p>\n<ul>\\n<li>Tool Wear Failure (TWF)</li>\n<li>Heat Dissipation Failure (HDF)</li>\n<li>Power Failure (PWF)</li>\n<li>Over-strain Failure (OSF)</li>\n<li>Random Failure (RNF)</li>\n</ul>\\n<p>The machine failure label indicates whether the machine has failed for a particular data point if any of the preceding failure modes are true. If at least one of the failure modes is true, the process fails and the machine failure label is set to 1. The objective for the ML model is to identify machine failures correctly, so a downstream predictive maintenance action can be initiated.</p>\n<h4><a id=\\"Solution_overview_18\\"></a><strong>Solution overview</strong></h4>\\n<p>For our predictive maintenance use case, we assume that device sensors stream various measurements and readings about machine parts. Our solution then takes a slice of streaming data each time (micro-batch), and performs processing and feature engineering to create features. The created features are then used to generate inferences from a trained and deployed ML model in near-real time. The generated inferences can be further processed and consumed by downstream applications, to take appropriate actions and initiate maintenance activity.</p>\n<p>The following diagram shows the architecture of our overall solution.</p>\n<p><img src=\\"https://dev-media.amazoncloud.cn/0115f0c5913a4743928f1c98e811b019_image.png\\" alt=\\"image.png\\" /></p>\n<p>The solution broadly consists of the following sections, which are explained in detail later in this post:</p>\n<ul>\\n<li><strong>Streaming data source and ingestion</strong> – We use [Amazon Kinesis](https://aws.amazon.com/cn/kinesis/?trk=cndc-detail) Data Streams to collect streaming data from the field sensors at scale and make it available for further processing.</li>\\n<li><strong>Near-real-time feature engineering</strong> – We use [AWS Glue](https://aws.amazon.com/cn/glue/?trk=cndc-detail) streaming jobs to read data from a Kinesis data stream and perform data processing and feature engineering, before storing the derived features in <a href=\\"http://aws.amazon.com/s3\\" target=\\"_blank\\">Amazon Simple Storage Service</a> ([Amazon S3](https://aws.amazon.com/cn/s3/?trk=cndc-detail)). [Amazon S3](https://aws.amazon.com/cn/s3/?trk=cndc-detail) provides a reliable and cost-effective option to store large volumes of data.</li>\\n<li><strong>Model training and deployment</strong> – We use the AI4I predictive maintenance dataset from the UCI Data Repository to train an ML model based on the XGBoost algorithm using SageMaker. We then deploy the trained model to a SageMaker asynchronous inference endpoint.</li>\\n<li><strong>Near-real-time ML inference</strong> – After the features are available in [Amazon S3](https://aws.amazon.com/cn/s3/?trk=cndc-detail), we need to generate inferences from the deployed model in near-real time. SageMaker asynchronous inference endpoints are well suited for this requirement because they support larger payload sizes (up to 1 GB) and can generate inferences within minutes (up to a maximum of 15 minutes). We use S3 event notifications to run an <a href=\\"http://aws.amazon.com/lambda\\" target=\\"_blank\\">AWS Lambda</a> function to invoke a SageMaker asynchronous inference endpoint. SageMaker asynchronous inference endpoints accept S3 locations as input, generate inferences from the deployed model, and write these inferences back to [Amazon S3](https://aws.amazon.com/cn/s3/?trk=cndc-detail) in near-real time.</li>\\n</ul>\n<p>The source code for this solution is located on <a href=\\"https://github.com/aws-samples/amazon-sagemaker-predictive-maintenance\\" target=\\"_blank\\">GitHub</a>. The solution has been tested and should be run in us-east-1.</p>\\n<p>We use an <a href=\\"http://aws.amazon.com/cloudformation\\" target=\\"_blank\\">AWS CloudFormation</a> template, deployed using <a href=\\"https://aws.amazon.com/serverless/sam/\\" target=\\"_blank\\">AWS Serverless Application Model</a> (AWS SAM), and SageMaker notebooks to deploy the solution.</p>\\n<h4><a id=\\"Prerequisites_36\\"></a><strong>Prerequisites</strong></h4>\\n<p>To get started, as a prerequisite, you must have the <a href=\\"https://docs.aws.amazon.com/serverless-application-model/latest/developerguide/serverless-sam-cli-install.html\\" target=\\"_blank\\">SAM CLI</a>, <a href=\\"https://realpython.com/installing-python/\\" target=\\"_blank\\">Python 3</a>, and <a href=\\"https://pip.pypa.io/en/stable/installing/\\" target=\\"_blank\\">PIP</a> installed. You must also have the <a href=\\"https://docs.aws.amazon.com/cli/latest/userguide/cli-configure-files.html\\" target=\\"_blank\\">AWS Command Line Interface</a> (AWS CLI) configured properly.</p>\\n<h4><a id=\\"Deploy_the_solution_39\\"></a><strong>Deploy the solution</strong></h4>\\n<p>You can use <a href=\\"https://aws.amazon.com/cloudshell/\\" target=\\"_blank\\">AWS CloudShell</a> to run these steps. CloudShell is a browser-based shell that is pre-authenticated with your console credentials and includes pre-installed common development and operations tools (such as AWS SAM, AWS CLI, and Python). Therefore, no local installation or configuration is required.</p>\\n<ul>\\n<li>We begin by creating an S3 bucket where we store the script for our AWS Glue streaming job. Run the following command in your terminal to create a new bucket:</li>\n</ul>\\n<pre><code class=\\"lang-\\">aws s3api create-bucket --bucket sample-script-bucket-\$RANDOM --region us-east-1\\n</code></pre>\\n<ul>\\n<li>Note down the name of the bucket created.</li>\n</ul>\\n<p><img src=\\"https://dev-media.amazoncloud.cn/127d26b0a9a74c7ebbca7a9e3239fafb_image.png\\" alt=\\"image.png\\" /></p>\n<ul>\\n<li>Next, we clone the code repository locally, which contains the CloudFormation template to deploy the stack. Run the following command in your terminal:</li>\n</ul>\\n<pre><code class=\\"lang-\\">git clone https://github.com/aws-samples/amazon-sagemaker-predictive-maintenance\\n</code></pre>\\n<ul>\\n<li>Navigate to the sam-template directory:</li>\n</ul>\\n<pre><code class=\\"lang-cd\\"></code></pre>\\n<p><img src=\\"https://dev-media.amazoncloud.cn/a5e0593c243445cda6e427809006606a_image.png\\" alt=\\"image.png\\" /></p>\n<ul>\\n<li>Run the following command to copy the AWS Glue job script (from glue_streaming/app.py) to the S3 bucket you created:</li>\n</ul>\\n<pre><code class=\\"lang-\\">aws s3 cp glue_streaming/app.py s3://sample-script-bucket-30232/glue_streaming/app.py\\n</code></pre>\\n<p><img src=\\"https://dev-media.amazoncloud.cn/ec22ad0f818a411cb318e0c89a10132d_image.png\\" alt=\\"image.png\\" /></p>\n<ul>\\n<li>You can now go ahead with the build and deployment of the solution, through the CloudFormation template via AWS SAM. Run the following command:</li>\n</ul>\\n<pre><code class=\\"lang-\\">sam build\\n</code></pre>\\n<p><img src=\\"https://dev-media.amazoncloud.cn/d6f1c820feae4adbb69290c0abc91a56_image.png\\" alt=\\"image.png\\" /></p>\n<pre><code class=\\"lang-\\">sam deploy --guided\\n</code></pre>\\n<ul>\\n<li>Provide arguments for the deployment such as the stack name, preferred AWS Region (<code>us-east-1</code>), and <code>GlueScriptsBucket</code>.<br />\\nMake sure you provide the same S3 bucket that you created earlier for the AWS Glue script S3 bucket (parameter <code>GlueScriptsBucket</code> in the following screenshot).</li>\\n</ul>\n<p><img src=\\"https://dev-media.amazoncloud.cn/0088d8ba89df456eb78bf33c02282e0c_image.png\\" alt=\\"image.png\\" /></p>\n<p>After you provide the required arguments, AWS SAM starts the stack deployment. The following screenshot shows the resources created.</p>\n<p><img src=\\"https://dev-media.amazoncloud.cn/e970e76f17704b68955711d9b9999e5d_image.png\\" alt=\\"image.png\\" /></p>\n<p>After the stack is deployed successfully, you should see the following message.</p>\n<p><img src=\\"https://dev-media.amazoncloud.cn/ddd71ec35c314338b9a197c282ba996b_image.png\\" alt=\\"image.png\\" /></p>\n<ul>\\n<li>On the AWS CloudFormation console, open the stack (for this post, <code>nrt-streaming-inference</code>) that was provided when deploying the CloudFormation template.</li>\\n<li>On the <strong>Resources</strong> tab, note the SageMaker notebook instance ID.</li>\\n</ul>\n<p><img src=\\"https://dev-media.amazoncloud.cn/994fa9ae733643a489a54be64faf9042_image.png\\" alt=\\"image.png\\" /></p>\n<ul>\\n<li>On the SageMaker console, open this instance.</li>\n</ul>\\n<p><img src=\\"https://dev-media.amazoncloud.cn/df4a0ae1e5214cde9c5bbf65c6b90fe9_image.png\\" alt=\\"image.png\\" /></p>\n<p>The SageMaker notebook instance already has the required notebooks pre-loaded.</p>\n<p>Navigate to the notebooks folder and open and follow the instructions within the notebooks (<code>Data_Pre-Processing.ipynb</code> and <code>ModelTraining-Evaluation-and-Deployment.ipynb</code>) to explore the dataset, perform preprocessing and feature engineering, and train and deploy the model to a SageMaker asynchronous inference endpoint.</p>\\n<p><img src=\\"https://dev-media.amazoncloud.cn/f08bbb4161a34c6491badfaeb6692a3e_image.png\\" alt=\\"image.png\\" /></p>\n<h4><a id=\\"Streaming_data_source_and_ingestion_102\\"></a><strong>Streaming data source and ingestion</strong></h4>\\n<p>Kinesis Data Streams is a serverless, scalable, and durable real-time data streaming service that you can use to collect and process large streams of data records in real time. Kinesis Data Streams enables capturing, processing, and storing data streams from a variety of sources, such as IT infrastructure log data, application logs, social media, market data feeds, web clickstream data, IoT devices and sensors, and more. You can provision a Kinesis data stream in on-demand mode or provisioned mode depending on the throughput and scaling requirements. For more information, see <a href=\\"https://docs.aws.amazon.com/streams/latest/dev/how-do-i-size-a-stream.html\\" target=\\"_blank\\">Choosing the Data Stream Capacity Mode</a>.</p>\\n<p>For our use case, we assume that various sensors are sending measurements such as temperature, rotation speed, torque, and tool wear to a data stream. Kinesis Data Streams acts as a funnel to collect and ingest data streams.</p>\n<p>We use the <a href=\\"https://awslabs.github.io/amazon-kinesis-data-generator/web/producer.html\\" target=\\"_blank\\">Amazon Kinesis Data Generator</a> (KDG) later in this post to generate and send data to a Kinesis data stream, simulating data being generated by sensors. The data from the data stream sensor-data-stream is ingested and processed using an [AWS Glue](https://aws.amazon.com/cn/glue/?trk=cndc-detail) streaming job, which we discuss next.</p>\\n<h4><a id=\\"Nearrealtime_feature_engineering_109\\"></a><strong>Near-real-time feature engineering</strong></h4>\\n<p><a href=\\"https://docs.aws.amazon.com/glue/latest/dg/add-job-streaming.html\\" target=\\"_blank\\">AWS Glue streaming jobs</a> provide a convenient way to process streaming data at scale, without the need to manage the compute environment. [AWS Glue](https://aws.amazon.com/cn/glue/?trk=cndc-detail) allows you to perform extract, transform, and load (ETL) operations on streaming data using continuously running jobs. [AWS Glue](https://aws.amazon.com/cn/glue/?trk=cndc-detail) streaming ETL is built on the Apache Spark Structured Streaming engine, and can ingest streams from Kinesis, Apache Kafka, and <a href=\\"https://aws.amazon.com/msk/\\" target=\\"_blank\\">Amazon Managed Streaming for Apache Kafka</a> (Amazon MSK).</p>\\n<p>The streaming ETL job can use both AWS Glue built-in transforms and transforms that are native to Apache Spark Structured Streaming. You can also use the Spark ML and <a href=\\"https://spark.apache.org/mllib/\\" target=\\"_blank\\">MLLib</a> libraries in [AWS Glue](https://aws.amazon.com/cn/glue/?trk=cndc-detail) jobs for easier feature processing using readily available helper libraries.</p>\\n<p>If the schema of the streaming data source is pre-determined, you can specify it in an AWS Data Catalog table. If the schema definition can’t be determined beforehand, you can enable schema detection in the streaming ETL job. The job then automatically determines the schema from the incoming data. Additionally, you can use the <a href=\\"https://docs.aws.amazon.com/glue/latest/dg/schema-registry.html\\" target=\\"_blank\\">AWS Glue Schema Registry</a> to allow central discovery, control, and evolution of data stream schemas. You can further integrate the Schema Registry with the Data Catalog to optionally use schemas stored in the Schema Registry when creating or updating [AWS Glue](https://aws.amazon.com/cn/glue/?trk=cndc-detail) tables or partitions in the Data Catalog.</p>\\n<p>For this post, we create an AWS Glue Data Catalog table (<code>sensor-stream</code>) with our Kinesis data stream as the source and define the schema for our sensor data.</p>\\n<p>We create an AWS Glue dynamic dataframe from the Data Catalog table to read the streaming data from Kinesis. We also specify the following options:</p>\n<ul>\\n<li>A window size of 60 seconds, so that the AWS Glue job reads and processes data in 60-second windows</li>\n<li>The starting position <code>TRIM_HORIZON</code>, to allow reading from the oldest records in the Kinesis data stream</li>\\n</ul>\n<p>We also use Spark MLlib’s <a href=\\"https://spark.apache.org/docs/latest/ml-features#stringindexer\\" target=\\"_blank\\">StringIndexer</a> feature transformer to encode the string column type into label indexes. This transformation is implemented using <a href=\\"https://spark.apache.org/docs/latest/ml-pipeline.html\\" target=\\"_blank\\">Spark ML Pipelines</a>. Spark ML Pipelines provide a uniform set of high-level APIs for ML algorithms to make it easier to combine multiple algorithms into a single pipeline or workflow.</p>\\n<p>We use the foreachBatch API to invoke a function named processBatch, which in turn processes the data referenced by this dataframe. See the following code:</p>\n<pre><code class=\\"lang-\\"># Read from Kinesis Data Stream\\nsourceStreamData = glueContext.create_data_frame.from_catalog(database = &quot;sensordb&quot;, table_name = &quot;sensor-stream&quot;, transformation_ctx = &quot;sourceStreamData&quot;, additional_options = {&quot;startingPosition&quot;: &quot;TRIM_HORIZON&quot;})\\ntype_indexer = StringIndexer(inputCol=&quot;type&quot;, outputCol=&quot;type_enc&quot;, stringOrderType=&quot;alphabetAsc&quot;)\\npipeline = Pipeline(stages=[type_indexer])\\nglueContext.forEachBatch(frame = sourceStreamData, batch_function = processBatch, options = {&quot;windowSize&quot;: &quot;60 seconds&quot;, &quot;checkpointLocation&quot;: checkpoint_location})\\n</code></pre>\\n<p>The function processBatch performs the specified transformations and partitions the data in Amazon S3 based on year, month, day, and batch ID.</p>\n<p>We also re-partition the AWS Glue partitions into a single partition, to avoid having too many small files in Amazon S3. Having several small files can impede read performance, because it amplifies the overhead related to seeking, opening, and reading each file. We finally write the features to generate inferences into a prefix (features) within the S3 bucket. See the following code:</p>\n<pre><code class=\\"lang-\\"># Function that gets called to perform processing, feature engineering and writes to S3 for every micro batch of streaming data from Kinesis.\\ndef processBatch(data_frame, batchId):\\ntransformer = pipeline.fit(data_frame)\\nnow = datetime.datetime.now()\\nyear = now.year\\nmonth = now.month\\nday = now.day\\nhour = now.hour\\nminute = now.minute\\nif (data_frame.count() &gt; 0):\\ndata_frame = transformer.transform(data_frame)\\ndata_frame = data_frame.drop(&quot;type&quot;)\\ndata_frame = DynamicFrame.fromDF(data_frame, glueContext, &quot;from_data_frame&quot;)\\ndata_frame.printSchema()\\n# Write output features to S3\\ns3prefix = &quot;features&quot; + &quot;/year=&quot; + &quot;{:0&gt;4}&quot;.format(str(year)) + &quot;/month=&quot; + &quot;{:0&gt;2}&quot;.format(str(month)) + &quot;/day=&quot; + &quot;{:0&gt;2}&quot;.format(str(day)) + &quot;/hour=&quot; + &quot;{:0&gt;2}&quot;.format(str(hour)) + &quot;/min=&quot; + &quot;{:0&gt;2}&quot;.format(str(minute)) + &quot;/batchid=&quot; + str(batchId)\\ns3path = &quot;s3://&quot; + out_bucket_name + &quot;/&quot; + s3prefix + &quot;/&quot;\\nprint(&quot;-------write start time------------&quot;)\\nprint(str(datetime.datetime.now()))\\ndata_frame = data_frame.toDF().repartition(1)\\ndata_frame.write.mode(&quot;overwrite&quot;).option(&quot;header&quot;,False).csv(s3path)\\nprint(&quot;-------write end time------------&quot;)\\nprint(str(datetime.datetime.now()))\\n</code></pre>\\n<h4><a id=\\"Model_training_and_deployment_161\\"></a><strong>Model training and deployment</strong></h4>\\n<p>SageMaker is a fully managed and integrated ML service that enables data scientists and ML engineers to quickly and easily build, train, and deploy ML models.</p>\n<p>Within the Data_Pre-Processing.ipynb notebook, we first import the AI4I Predictive Maintenance dataset from the UCI Data Repository and perform exploratory data analysis (EDA). We also perform feature engineering to make our features more useful for training the model.</p>\n<p>For example, within the dataset, we have a feature named type, which represents the product’s quality type as L (low), M (medium), or H (high). Because this is categorical feature, we need to encode it before training our model. We use Scikit-Learn’s LabelEncoder to achieve this:</p>\n<pre><code class=\\"lang-\\">from sklearn.preprocessing import LabelEncoder\\ntype_encoder = LabelEncoder()\\ntype_encoder.fit(origdf['type'])\\ntype_values = type_encoder.transform(origdf['type'])\\n</code></pre>\\n<p>After the features are processed and the curated train and test datasets are generated, we’re ready to train an ML model to predict whether the machine failed or not based on system readings. We train a XGBoost model, using the SageMaker built-in algorithm. <a href=\\"https://xgboost.readthedocs.io/en/stable/\\" target=\\"_blank\\">XGBoost</a> can provide good results for multiple types of ML problems, including classification, even when training samples are limited.</p>\\n<p><a href=\\"https://docs.aws.amazon.com/sagemaker/latest/dg/how-it-works-training.html\\" target=\\"_blank\\">SageMaker training jobs</a> provide a powerful and flexible way to train ML models on SageMaker. SageMaker manages the underlying compute infrastructure and provides <a href=\\"https://docs.aws.amazon.com/sagemaker/latest/dg/algorithms-choose.html#algorithms-choose-implementation\\" target=\\"_blank\\">multiple options</a> to choose from, for diverse model training requirements, based on the use case.</p>\\n<pre><code class=\\"lang-\\">xgb = sagemaker.estimator.Estimator(container,\\nrole,\\ninstance_count=1,\\ninstance_type='ml.c4.4xlarge',\\noutput_path=xgb_upload_location,\\nsagemaker_session=sagemaker_session)\\nxgb.set_hyperparameters(max_depth=5,\\neta=0.2,\\ngamma=4,\\nmin_child_weight=6,\\nsubsample=0.8,\\nsilent=0,\\nobjective='binary:hinge',\\nnum_round=100)\\n\\nxgb.fit({'train': s3_train_channel, 'validation': s3_valid_channel})\\n</code></pre>\\n<p>When the model training is complete and the model evaluation is satisfactory based on the business requirements, we can begin model deployment. We first create an endpoint configuration with the AsyncInferenceConfig object option and using the model trained earlier:</p>\n<pre><code class=\\"lang-\\">endpoint_config_name = resource_name.format(&quot;EndpointConfig&quot;)\\ncreate_endpoint_config_response = sm_client.create_endpoint_config(\\nEndpointConfigName=endpoint_config_name,\\nProductionVariants=[\\n{\\n&quot;VariantName&quot;: &quot;variant1&quot;,\\n&quot;ModelName&quot;: model_name,\\n&quot;InstanceType&quot;: &quot;ml.m5.xlarge&quot;,\\n&quot;InitialInstanceCount&quot;: 1,\\n}\\n],\\nAsyncInferenceConfig={\\n&quot;OutputConfig&quot;: {\\n&quot;S3OutputPath&quot;: f&quot;s3://{bucket}/{prefix}/output&quot;,\\n#Specify Amazon SNS topics\\n&quot;NotificationConfig&quot;: {\\n&quot;SuccessTopic&quot;: &quot;arn:aws:sns:&lt;region&gt;:&lt;account-id&gt;:&lt;success-sns-topic&gt;&quot;,\\n&quot;ErrorTopic&quot;: &quot;arn:aws:sns:&lt;region&gt;:&lt;account-id&gt;:&lt;error-sns-topic&gt;&quot;,\\n}},\\n&quot;ClientConfig&quot;: {&quot;MaxConcurrentInvocationsPerInstance&quot;: 4},\\n},)\\n</code></pre>\\n<p>We then create a SageMaker asynchronous inference endpoint, using the endpoint configuration we created. After it’s provisioned, we can start invoking the endpoint to generate inferences asynchronously.</p>\n<pre><code class=\\"lang-\\">endpoint_name = resource_name.format(&quot;Endpoint&quot;)\\ncreate_endpoint_response = sm_client.create_endpoint(\\nEndpointName=endpoint_name, EndpointConfigName=endpoint_config_name)\\n</code></pre>\\n<h4><a id=\\"Nearrealtime_inference_224\\"></a><strong>Near-real-time inference</strong></h4>\\n<p>SageMaker <a href=\\"https://docs.aws.amazon.com/sagemaker/latest/dg/async-inference.html\\" target=\\"_blank\\">asynchronous inference</a> endpoints provide the ability to queue incoming inference requests and process them asynchronously in near-real time. This is ideal for applications that have inference requests with larger payload sizes (up to 1 GB), may require longer processing times (up to 15 minutes), and have near-real-time latency requirements. Asynchronous inference also enables you to save on costs by auto scaling the instance count to zero when there are no requests to process, so you only pay when your endpoint is processing requests.</p>\\n<p>You can create a SageMaker asynchronous inference endpoint similar to how you create a real-time inference endpoint and additionally specify the <code>AsyncInferenceConfig</code> object, while creating your endpoint configuration with the EndpointConfig field in the CreateEndpointConfig API. The following diagram shows the inference workflow and how an asynchronous inference endpoint generates an inference.</p>\\n<p><img src=\\"https://dev-media.amazoncloud.cn/fcf53ab579104a3b8b1d3f84a17e7bd1_image.png\\" alt=\\"image.png\\" /></p>\n<p>To invoke the asynchronous inference endpoint, the request payload should be stored in Amazon S3 and reference to this payload needs to be provided as part of the InvokeEndpointAsync request. Upon invocation, SageMaker queues the request for processing and returns an identifier and output location as a response. Upon processing, SageMaker places the result in the Amazon S3 location. You can optionally choose to receive success or error notifications with <a href=\\"http://aws.amazon.com/sns\\" target=\\"_blank\\">Amazon Simple Notification Service</a> ([Amazon SNS](https://aws.amazon.com/cn/sns/?trk=cndc-detail)).</p>\\n<h4><a id=\\"Test_the_endtoend_solution_234\\"></a><strong>Test the end-to-end solution</strong></h4>\\n<p>To test the solution, complete the following steps:</p>\n<ul>\\n<li>On the AWS CloudFormation console, open the stack you created earlier (<code>nrt-streaming-inference</code>).</li>\\n<li>On the <strong>Outputs</strong> tab, copy the name of the S3 bucket (<code>EventsBucket</code>).</li>\\n</ul>\n<p>This is the S3 bucket to which our AWS Glue streaming job writes features after reading and processing from the Kinesis data stream.</p>\n<p><img src=\\"https://dev-media.amazoncloud.cn/fa9012ca7073418188087b86375c07ae_image.png\\" alt=\\"image.png\\" /></p>\n<p>Next, we set up event notifications for this S3 bucket.</p>\n<ul>\\n<li>On the Amazon S3 console, navigate to the bucket <code>EventsBucket</code>.</li>\\n<li>On the<strong>Properties</strong> tab, in the <strong>Event notifications</strong> section, choose <strong>Create event notification</strong>.</li>\\n</ul>\n<p><img src=\\"https://dev-media.amazoncloud.cn/c6e4f31e0011417881fd4da33a7d47f9_image.png\\" alt=\\"image.png\\" /></p>\n<p><img src=\\"https://dev-media.amazoncloud.cn/d7156625b1d4495e8b58933a0742d6d9_image.png\\" alt=\\"image.png\\" /></p>\n<ul>\\n<li>For <strong>Event name</strong>, enter <code>invoke-endpoint-lambda</code>.</li>\\n<li>For <strong>Prefix</strong>, enter <code>features/</code>.</li>\\n<li>For <strong>Suffix</strong>, enter <code>.csv</code>.</li>\\n<li>For <strong>Event types</strong>, select <strong>All object create events</strong>.</li>\\n</ul>\n<p><img src=\\"https://dev-media.amazoncloud.cn/7fa38733b831486b965da332d9fb95b5_image.png\\" alt=\\"image.png\\" /></p>\n<p><img src=\\"https://dev-media.amazoncloud.cn/c184d6439dc246ae8229d6fac63a2efb_image.png\\" alt=\\"image.png\\" /></p>\n<ul>\\n<li>For <strong>Destination</strong>, select <strong>Lambda function</strong>.</li>\\n<li>For <strong>Lambda function</strong>, and choose the function <code>invoke-endpoint-asynch</code>.</li>\\n<li>Choose <strong>Save changes</strong>.</li>\\n</ul>\n<p><img src=\\"https://dev-media.amazoncloud.cn/7b0ac84e4c994e2082e6d0e44faba9b4_image.png\\" alt=\\"image.png\\" /></p>\n<ul>\\n<li>On the AWS Glue console, open the job <code>GlueStreaming-Kinesis-S3</code>.</li>\\n<li>Choose <strong>Run job</strong>.</li>\\n</ul>\n<p><img src=\\"https://dev-media.amazoncloud.cn/f7407eae16d64b8e80420b09bfbb79a8_image.png\\" alt=\\"image.png\\" /></p>\n<p>Next we use the Kinesis Data Generator (KDG) to simulate sensors sending data to our Kinesis data stream. If this is your first time using the KDG, refer to <a href=\\"https://awslabs.github.io/amazon-kinesis-data-generator/web/help.html\\" target=\\"_blank\\">Overview</a> for the initial setup. The KDG provides a CloudFormation template to create the user and assign just enough permissions to use the KDG for sending events to Kinesis. Run the <a href=\\"https://aws-kdg-tools.s3.us-west-2.amazonaws.com/cognito-setup.json\\" target=\\"_blank\\">CloudFormation template</a> within the AWS account that you’re using to build the solution in this post. After the KDG is set up, log in and access the KDG to send test events to our Kinesis data stream.</p>\\n<ul>\\n<li>Use the Region in which you created the Kinesis data stream (us-east-1).</li>\n<li>On the drop-down menu, choose the data stream <code>sensor-data-stream</code>.</li>\\n<li>In the <strong>Records per second</strong> section, select <strong>Constant</strong> and enter 100.</li>\\n<li>Unselect <strong>Compress Records</strong>.</li>\\n<li>For <strong>Record template</strong>, use the following template:</li>\\n</ul>\n<pre><code class=\\"lang-\\">{\\n&quot;air_temperature&quot;: {{random.number({&quot;min&quot;:295,&quot;max&quot;:305, &quot;precision&quot;:0.01})}},\\n&quot;process_temperature&quot;: {{random.number({&quot;min&quot;:305,&quot;max&quot;:315, &quot;precision&quot;:0.01})}},\\n&quot;rotational_speed&quot;: {{random.number({&quot;min&quot;:1150,&quot;max&quot;:2900})}},\\n&quot;torque&quot;: {{random.number({&quot;min&quot;:3,&quot;max&quot;:80, &quot;precision&quot;:0.01})}},\\n&quot;tool_wear&quot;: {{random.number({&quot;min&quot;:0,&quot;max&quot;:250})}},\\n&quot;type&quot;: &quot;{{random.arrayElement([&quot;L&quot;,&quot;M&quot;,&quot;H&quot;])}}&quot;\\n}\\n</code></pre>\\n<ul>\\n<li>Click <strong>Send data</strong> to start sending data to the Kinesis data stream.</li>\\n</ul>\n<p><img src=\\"https://dev-media.amazoncloud.cn/c13379f92b704a19a1cc0a75bc3734ee_image.png\\" alt=\\"image.png\\" /></p>\n<p>The AWS Glue streaming job reads and extracts a micro-batch of data (representing sensor readings) from the Kinesis data stream based on the window size provided. The streaming job then processes and performs feature engineering on this micro-batch before partitioning and writing it to the prefix features within the S3 bucket.</p>\n<p>As new features created by the AWS Glue streaming job are written to the S3 bucket, a Lambda function (<code>invoke-endpoint-asynch</code>) is triggered, which invokes a SageMaker asynchronous inference endpoint by sending an invocation request to get inferences from our deployed ML model. The asynchronous inference endpoint queues the request for asynchronous invocation. When the processing is complete, SageMaker stores the inference results in the [Amazon S3](https://aws.amazon.com/cn/s3/?trk=cndc-detail) location (<code>S3OutputPath</code>) that was specified during the asynchronous inference endpoint configuration.</p>\\n<p>For our use case, the inference results indicate if a machine part is likely to fail or not, based on the sensor readings.</p>\n<p><img src=\\"https://dev-media.amazoncloud.cn/9e5b40a8b647438ba523bc528a5a381a_image.png\\" alt=\\"image.png\\" /></p>\n<p>SageMaker also sends a success or error notification with Amazon SNS. For example, if you set up an <a href=\\"https://docs.aws.amazon.com/sns/latest/dg/sns-email-notifications.html#create-subscribe-endpoint-to-topic-console\\" target=\\"_blank\\">email subscription</a> for the success and error SNS topics (specified within the asynchronous SageMaker inference endpoint configuration), an email can be sent every time an inference request is processed. The following screenshot shows a sample email from the SNS success topic.</p>\\n<p><img src=\\"https://dev-media.amazoncloud.cn/577b48e3239b49b8ba02f736eca7635f_image.png\\" alt=\\"image.png\\" /></p>\n<p>For real-world applications, you can integrate SNS notifications with other services such as <a href=\\"https://aws.amazon.com/sqs/\\" target=\\"_blank\\">Amazon Simple Queue Service</a> ([Amazon SQS](https://aws.amazon.com/cn/sqs/?trk=cndc-detail)) and Lambda for additional postprocessing of the generated inferences or integration with other downstream applications, based on your requirements. For example, for our predictive maintenance use case, you can invoke a Lambda function based on an SNS notification to read the generated inference from [Amazon S3](https://aws.amazon.com/cn/s3/?trk=cndc-detail), further process it (such as aggregation or filtering), and initiate workflows such as sending work orders for equipment repair to technicians.</p>\\n<h4><a id=\\"Clean_up_308\\"></a><strong>Clean up</strong></h4>\\n<p>When you’re done testing the stack, delete the resources (especially the Kinesis data stream, Glue streaming job, and SNS topics) to avoid unexpected charges.</p>\n<p>Run the following code to delete your stack:</p>\n<pre><code class=\\"lang-\\">sam delete nrt-streaming-inference\\n</code></pre>\\n<p>Also delete the resources such as SageMaker endpoints by following the cleanup section in the ModelTraining-Evaluation-and-Deployment notebook.</p>\n<h4><a id=\\"Conclusion_317\\"></a><strong>Conclusion</strong></h4>\\n<p>In this post, we used a predictive maintenance use case to demonstrate how to use various services such as Kinesis, AWS Glue, and SageMaker to build a near-real-time inference pipeline. We encourage you to try this solution and let us know what you think.</p>\n<p>If you have any questions, share them in the comments.</p>\n<h5><a id=\\"About_the_authors_322\\"></a><strong>About the authors</strong></h5>\\n<p><img src=\\"https://dev-media.amazoncloud.cn/85ae57bdd192434297b2db00dd04f3b5_image.png\\" alt=\\"image.png\\" /></p>\n<p><strong>Rahul Sharma</strong> is a Solutions Architect at AWS Data Lab, helping AWS customers design and build AI/ML solutions. Prior to joining AWS, Rahul has spent several years in the finance and insurance sector, helping customers build data and analytical platforms.</p>\\n<p><img src=\\"https://dev-media.amazoncloud.cn/35ed5eb18e9f4b7bbfeafd5c1f39d741_image.png\\" alt=\\"image.png\\" /></p>\n<p><strong>Pat Reilly</strong> is an Architect in the AWS Data Lab, where he helps customers design and build data workloads to support their business. Prior to AWS, Pat consulted at an AWS Partner, building AWS data workloads across a variety of industries.</p>\n"}
目录
亚马逊云科技解决方案 基于行业客户应用场景及技术领域的解决方案
联系亚马逊云科技专家
亚马逊云科技解决方案
基于行业客户应用场景及技术领域的解决方案
联系专家
0
目录
关闭