Amazon Graviton2上的 Spark:使用 MLlib 的机器学习

机器学习
Amazon Elastic Kubernetes Service (EKS)
0
0
{"value":"Masoud Koleini 2021年11月22日\n\nApacheSpark 是一个在机器集群上运行大数据计算的框架。Spark 的一个重要用例是机器学习的大数据分析。训练数据集的大小可以是 TB。这样的数据集不适合单个机器的内存,必须分布在要处理的机器集群上。\n\n机器学习管道有多个阶段,包括数据提取、转换和训练。根据转换过程和中间数据大小,所需内存可能与原始数据的大小不同。另外,不同的机器学习算法有不同的资源需求。因此,在将其部署到生产环境之前,用户应该评估其 ML 管道以分配适当的集群资源。\n\nAmazon Graviton2处理器使用64位 Arm Neoverse 内核。与基于 x86的实例相比,由 Graviron2处理器支持的实例提供了更好的性价比。这个博客展示了如何在 AmazonEKS 上运行的 Spark 集群上使用基于 Graviton2的实例来训练一个和 K-means 集群模型。\n\n### **Spark 机器学习库**\n\nSpark 的机器学习库称为 MLlib。它以分布式方式实现ML算法、数据转换和管道。它允许用户保存经过训练的模型,并在预测阶段重新加载它们。新的 MLlib 库(也称为 Spark ML)基于 Spark 的 Dataframe API,允许 Spark 在数据管道上应用优化。这个博客演示了两个 ML 案例研究(决策树回归和 K-均值聚类),使用 EKS 集群上运行的大型数据集和 Graviton2实例。\n\n### **要求**\n\n对于本文中的案例研究,读者必须遵循我们之前 Twitter 分析博客中的指导原则。本博客展示了如何设置 EKS 集群、微调集群以及创建 Spark Scala 项目。请注意,您可能必须根据正在实施的案例研究更改作为工作节点运行的 Graviton2计算机的数量\n\n\n\n### **案例研究1:决策树回归-房价**\n\n决策树是监督学习中常用的方法之一。决策树用于解决回归和分类问题。决策树的内部节点表示关于输入特征的问题,分支表示决策,叶子表示结果。\n\nSpark 使用 Random Forest 和 Gradient-Boosted Trees 作为两种流行的树集成算法,以减少决策树可能的过度拟合。在 Spark 上运行决策树的一个重要好处是,实现可以将训练有效地分布到多个节点上。\n\n\n\n#### **加州房价数据集**\n\n本案例研究中使用的数据集包含不同地理位置的加利福尼亚房价、房产年龄、卧室数量以及其他一些特征。我们利用这个数据集训练了一个决策树回归模型,这样我们就可以预测市场上其他房屋的价格。\n\n原始数据集的大小很小(1.2MB)。因此,本案例研究将复制相同的数据,以创建大小为55.7GB 的数据集(分解为多个 CSV 文件)。90%的数据用于训练,其余10%通过模型进行预测。数据集存储在 AmazonS3 bucket 中。\n\n\n\n#### **简图**\n\n此图显示了 ML 案例研究中的不同组件是如何相互作用的。\n\n![image.png](1)\n\n#### **Amazon 组件和权限**\n\n在运行 Spark 作业之前,应创建已上载数据集的 bucket。ML 管道还应该具有对 Amazon S3存储桶的读写访问权限。\n\n要使用 Terraform 部署所需资源,请在 EKS Terraform 文件夹中创建一个名为 ml.tf 的文件(从前面的案例研究中删除 twitter.tf)。在文件中放置以下内容。这将创建一个名为 spark ml demo 的 S3 bucket,并为角色添加正确的权限:\n```\n\nresource \"aws_s3_bucket\" \"spark-ml-demo\" {\n bucket = \"spark-ml-demo\"\n}\n\nresource \"aws_iam_role_policy_attachment\" \"s3-full-access\" {\n role = \"eksWorkerRole\"\n policy_arn = \"arn:aws:iam::aws:policy/AmazonS3FullAccess\"\n}\n```\n运行 terraform apply 来创建资源,然后将数据集上传到 spark ml demo bucket 中。\n\n\n#### **Spark 实施**\n\n与 Twitter 类似,读者需要创建 sbt 项目,并将以下依赖项添加到 build.sbt 文件中:\n```\n\nlibraryDependencies ++= Seq(\n \"org.apache.spark\" %% \"spark-core\" % \"3.1.2\",\n \"org.apache.spark\" %% \"spark-sql\" % \"3.1.2\",\n \"org.apache.spark\" %% \"spark-mllib\" % \"3.1.2\",\n \"software.amazon.awssdk\" % \"s3\" % \"2.17.31\",\n \"org.apache.hadoop\" % \"hadoop-common\" % \"3.2.0\",\n \"org.apache.hadoop\" % \"hadoop-aws\" % \"3.2.0\",\n)\n```\n下面的代码将 spark ml demo S3 bucket 中的数据集(包含_1.csv,…)读取到数据帧中(假设数据集在 bucket 中被分解为多个 csv 文件)。输入被分成训练和测试数据集。训练数据集被矢量化并传递给决策树回归器拟合函数,以创建一个模型,该模型是 Spark ML 上下文中的转换器。\n\n代码中的下一步将介绍 ML 推理。它使用相同的向量汇编程序对测试数据集进行向量化,以进行训练数据转换。预测数据帧是通过在矢量化测试数据集上调用模型的转换函数生成的。结果将写回 Predicts 文件夹下的同一个 S3 bucket。\n\n请注意,在培训中仅使用特征的子集。\n```\n\npackage sparkml\n\nimport org.apache.spark.sql._\nimport org.apache.spark.ml.regression.DecisionTreeRegressor\nimport org.apache.spark.ml.feature.VectorAssembler\nimport org.apache.log4j._\n\nobject RealEstate {\n // this is for California dataset:\n // https://developers.google.com/machine-learning/crash-course/california-housing-data-description\n\n case class Property(longitude: Double, latitude: Double, housing_median_age: Double, total_rooms: Double,\n total_bedrooms: Double, population: Double, households: Double, median_income: Double,\n median_house_value: Double, ocean_proximity: String)\n\n def main(array: Array[String]) {\n\n // set the log level\n Logger.getLogger(\"org\").setLevel(Level.ERROR)\n\n val spark = SparkSession\n .builder()\n .appName(\"realEstate\")\n .getOrCreate()\n\n import spark.implicits._\n\n val realEstate = spark\n .read\n .option(\"header\", \"true\")\n .option(\"inferSchema\", \"true\")\n .csv(\"s3a://spark-ml-demo/housing_*.csv\")\n .as[Property]\n\n val Array(trainingData, testData) = realEstate.randomSplit(Array(0.9, 0.1))\n\n val assembler = new VectorAssembler()\n .setInputCols(Array(\"housing_median_age\", \"total_rooms\", \"total_bedrooms\", \"population\", \"households\", \"median_income\"))\n .setOutputCol(\"features\")\n .setHandleInvalid(\"skip\")\n\n val trainDf = assembler\n .transform(trainingData)\n .select(\"features\", \"median_house_value\")\n\n val dt = new DecisionTreeRegressor()\n .setLabelCol(\"median_house_value\")\n .setFeaturesCol(\"features\")\n\n val model = dt.fit(trainDf)\n\n val testDF = assembler\n .transform(testData)\n .select(\"features\", \"median_house_value\")\n\n val predictions = model.transform(testDF)\n .select(\"median_house_value\", \"prediction\")\n\n predictions.write.csv(\"s3a://spark-ml-demo/predictions\")\n\n spark.stop()\n }\n}\n```\n在上面的代码中,我们只使用数字特征进行训练和推理。读者可以在输入列中添加分类功能,如 ocean_邻近性,并验证结果。\n\n#### **在本地测试程序**\n\n将原始数据集存储在计算机上,并对 Scala 脚本应用以下修改:\n\n1.将 Spark 配置中的 master 设置为本地运行\n```\n\nval spark = SparkSession\n.builder()\n.appName(\"realEstate\")\n.master(\"local[*]\")\n.getOrCreate()\n```\n2.将 S3数据集位置替换为计算机上 csv 文件的地址\n\n3.将 S3上的输出地址替换为本地目录,如/tmp\n\n现在,在本地运行代码将创建一个包含预测的 CSV 文件。在打包软件之前,请确保已还原更改。\n#### **打包代码和依赖项**\n\n遵循 Twitter 分析案例研究中的指导原则打包代码和依赖项。\n\n#### **提交 Spark**\n这个过程类似于 Twitter 分析案例研究,除了这里,我们正在处理更大的数据集和更详细的资源规划。\n\n本案例研究在7个内存优化的 r6g.4X 大型实例上运行。这些实例具有16个 vCPU 和128GiB 的内存。spark submit 命令在 EKS 集群上运行作业,假设应用程序JAR文件的名称为 spark_ml.JAR(用相应的信息替换KUBERNETES_MASTER_地址和 DOCKER_IMAGE_\t地址)。分配给驱动程序和每个执行器的 VCPU 数量设置为5,这允许3个 POD 在一台机器上运行。相应地计算执行器的数量和内存分配。\n```\nbin/spark-submit \\\n --class sparkml.RealEstate \\\n --master k8s://<KUBERNETES_MASTER_ADDRESS> \\\n --deploy-mode cluster \\\n --conf <DOCKER_IMAGE_ADDRESS> \\\n --conf spark.kubernetes.authenticate.driver.serviceAccountName=spark \\\n --conf spark.kubernetes.driver.pod.name=\"spark-ml\" \\\n --conf spark.kubernetes.namespace=default \\\n --conf spark.executor.instances=20 \\\n --conf spark.driver.cores=5 \\\n --conf spark.executor.cores=5 \\\n --conf spark.executor.memory=34g \\\n --conf spark.driver.memory=34g \\\n --conf spark.memory.fraction=0.8 \\\n --name sparkML \\\n local:///opt/spark/jars/spark_ml.jar\n```\n要监视应用程序在 Kubernetes 群集上是否无错误运行,可以使用 Spark UI 监视 Spark 作业的不同统计信息。通过提交以下命令运行 Spark UI,并在浏览器(localhost:4040)上打开该 UI:\n```\nkubectl port-forward spark-ml 4040:4040\n```\n#### **预测结果**\n\n案例研究运行6分钟,并将预测结果写入多个 CSV 文件上的 S3存储桶中。CSV 文件有两列:第一列是实际房屋中值,第二列是预测值(您可以修改机器学习脚本以在 CSV 文件中包含其他相关功能)。\n```\n\n95800,132367.88804380846\n113800,190347.262329421\n113800,190347.262329421\n67500,104197.72693265839\n67500,104197.72693265839\n65800,104197.72693265839\n68600,104197.72693265839\n71100,190347.262329421\n53500,104197.72693265839\n71300,158820.92941131844\n59200,104197.72693265839\n59200,104197.72693265839\n```\n### **案例研究2:K-均值聚类:Uber 驱动程序位置**\n\n本节介绍如何在 Amazon EKS 上运行的 Spark 群集上运行 K-means 群集训练和推理。\n\n#### **K-均值聚类算法**\n\nK-means 是无监督学习领域中最流行的聚类算法。假设k为预期的簇数,该算法随机初始化称为簇质心的k个簇中心,并迭代:\n\n1.根据数据点到质心的欧几里德距离,将数据点指定给簇\n\n2.将簇质心移动到每个簇中数据点的平均值\n\n迭代将继续,直到收敛。Spark 使用 K-means 算法的一种变体,可以在集群上并行化。\n\n#### **Uber 驱动程序位置数据集**\n\nK-means 案例研究使用 Uber 司机位置数据集,其中包括司机在不同日期和时间的纬度和经度。CSV 格式的数据集大小为209.7MB。因此,数据被复制以在 Amazon S3上创建大小为195.5GB 的数据集(分解为多个 CSV 文件)。它将90%的数据用于培训,其余用于测试。\n\n图、需求和依赖关系与前面的决策树示例相同。\n\n\n#### **Spark 实施**\n\n以下是 Spark 上 K-means 集群的实现,其中K设置为16。它将结果存储回 uber predictions 文件夹下的 S3 bucket 中。\n```\npackage spark_ml\n\nimport org.apache.spark.sql._\nimport org.apache.spark.ml.feature.VectorAssembler\nimport org.apache.log4j._\nimport org.apache.spark.ml.clustering.KMeans\n\nobject Uber {\n\n def main(array: Array[String]) {\n\n // set the log level\n Logger.getLogger(\"org\").setLevel(Level.ERROR)\n\n val spark = SparkSession\n .builder()\n .appName(\"uber\")\n .getOrCreate()\n\n val uber = spark\n .read\n .option(\"header\", \"true\")\n .option(\"inferSchema\", \"true\")\n .csv(\"s3a://spark-ml-demo/uber-raw-data*.csv\")\n\n val Array(trainingData, testData) = uber.select(\"Lat\", \"Lon\").randomSplit(Array(0.90, 0.1))\n\n val assembler = new VectorAssembler()\n .setInputCols(Array(\"Lat\", \"Lon\"))\n .setOutputCol(\"features\")\n .setHandleInvalid(\"skip\")\n\n val trainDf = assembler\n .transform(trainingData)\n\n val dt = new KMeans()\n .setK(16)\n .setFeaturesCol(\"features\")\n .setPredictionCol(\"prediction\")\n\n val model = dt.fit(trainDf)\n model.clusterCenters.foreach(println)\n\n val testDF = assembler\n .transform(testData)\n\n val predictions = model.transform(testDF)\n .select(\"Lat\", \"Lon\", \"prediction\")\n\n predictions.write.csv(\"s3a://spark-ml-demo/uber-predictions\")\n\n spark.stop()\n }\n}\n```\n#### **在本地测试程序**\n\n使用本地存储的 Uber 驱动程序位置数据集在您的计算机上运行代码。遵循与决策树回归相同的步骤。\n\n#### **打包代码和依赖项**\n\n遵循与 Twitter 分析案例研究中相同的准则打包代码和依赖项。\n\n#### **提交 Spark**\n\nK-means 案例研究在9个内存优化的 r6g.4X 大型实例上运行。因此,spark submit 参数中的执行器数量会相应更改:\n```\nbin/spark-submit \\\n --class sparkml.Uber \\\n --master k8s://<KUBERNETES_MASTER_ADDRESS> \\\n --deploy-mode cluster \\\n --conf <DOCKER_IMAGE_ADDRESS> \\\n --conf spark.kubernetes.authenticate.driver.serviceAccountName=spark \\\n --conf spark.kubernetes.driver.pod.name=\"spark-ml\" \\\n --conf spark.kubernetes.namespace=default \\\n --conf spark.executor.instances=26 \\\n --conf spark.driver.cores=5 \\\n --conf spark.executor.cores=5 \\\n --conf spark.executor.memory=34g \\\n --conf spark.driver.memory=34g \\\n --conf spark.memory.fraction=0.8 \\\n --name sparkML \\\n local:///opt/spark/jars/spark_ml.jar\n```\n#### **聚类结果**\n\n该案例研究在34分钟内完成,覆盖了 Gravion2工人集群。当 Spark 作业成功终止时,结果应位于 uber prediction 文件夹中,并分解为多个 CSV 文件。CSV 文件中有三列:前两列是纬度和经度,第三列是簇编号。您还可以通过在创建模型的行之后调用 model.clusterCenters.foreach(println)来打印 cluster centroids。\n```\n\n40.8971,-73.8652,4 \n40.8972,-73.8653,4 \n40.8972,-73.8653,4 \n40.8973,-74.4901,15 \n40.8973,-73.8635,4 \n40.8977,-73.947,4 \n40.8977,-73.9469,4 \n40.8978,-73.9139,4 \n40.8981,-73.9717,3 \n40.8982,-74.1262,0 \n40.8982,-73.8673,4 \n40.8985,-73.901,4 \n40.8985,-73.901,4 \n40.8985,-73.9009,4 \n```\n### **结论**\n\n在这个博客中,我们演示了两个 ML 案例研究:决策树回归和 K-均值聚类,使用大量数据集,并在 EKS Kubernetes 集群上运行 Graviton2实例。Spark 的 MLlib 帮助行业以分布式方式在 TB 数据上运行机器学习管道(转换、存储、学习和预测)。当数据大小变大并且ML算法需要更多资源时,工作人员的数量会增加。Amazon 的研究另外,较低的每小时 Gravion2实例为用户提供了高达30%的成本降低。当数十个或数百个工作节点长时间运行以执行 ML 计算时,这将为用户带来巨大的好处。\n\n有关采用基于 Arm 处理器的客户案例,请访问 Amazon Graviton 页面。有关在 Arm Neoverse 平台上运行的软件工作负载的任何查询,请随时联系 sw-ecosystem@arm.com.\n\n[阅读原文](https://community.arm.com/arm-community-blogs/b/infrastructure-solutions-blog/posts/spark-with-machine-learning-on-aws-graviton2)","render":"<p>Masoud Koleini 2021年11月22日</p>\n<p>ApacheSpark 是一个在机器集群上运行大数据计算的框架。Spark 的一个重要用例是机器学习的大数据分析。训练数据集的大小可以是 TB。这样的数据集不适合单个机器的内存,必须分布在要处理的机器集群上。</p>\n<p>机器学习管道有多个阶段,包括数据提取、转换和训练。根据转换过程和中间数据大小,所需内存可能与原始数据的大小不同。另外,不同的机器学习算法有不同的资源需求。因此,在将其部署到生产环境之前,用户应该评估其 ML 管道以分配适当的集群资源。</p>\n<p>Amazon Graviton2处理器使用64位 Arm Neoverse 内核。与基于 x86的实例相比,由 Graviron2处理器支持的实例提供了更好的性价比。这个博客展示了如何在 AmazonEKS 上运行的 Spark 集群上使用基于 Graviton2的实例来训练一个和 K-means 集群模型。</p>\n<h3><a id=\"Spark__8\"></a><strong>Spark 机器学习库</strong></h3>\n<p>Spark 的机器学习库称为 MLlib。它以分布式方式实现ML算法、数据转换和管道。它允许用户保存经过训练的模型,并在预测阶段重新加载它们。新的 MLlib 库(也称为 Spark ML)基于 Spark 的 Dataframe API,允许 Spark 在数据管道上应用优化。这个博客演示了两个 ML 案例研究(决策树回归和 K-均值聚类),使用 EKS 集群上运行的大型数据集和 Graviton2实例。</p>\n<h3><a id=\"_12\"></a><strong>要求</strong></h3>\n<p>对于本文中的案例研究,读者必须遵循我们之前 Twitter 分析博客中的指导原则。本博客展示了如何设置 EKS 集群、微调集群以及创建 Spark Scala 项目。请注意,您可能必须根据正在实施的案例研究更改作为工作节点运行的 Graviton2计算机的数量</p>\n<h3><a id=\"1_18\"></a><strong>案例研究1:决策树回归-房价</strong></h3>\n<p>决策树是监督学习中常用的方法之一。决策树用于解决回归和分类问题。决策树的内部节点表示关于输入特征的问题,分支表示决策,叶子表示结果。</p>\n<p>Spark 使用 Random Forest 和 Gradient-Boosted Trees 作为两种流行的树集成算法,以减少决策树可能的过度拟合。在 Spark 上运行决策树的一个重要好处是,实现可以将训练有效地分布到多个节点上。</p>\n<h4><a id=\"_26\"></a><strong>加州房价数据集</strong></h4>\n<p>本案例研究中使用的数据集包含不同地理位置的加利福尼亚房价、房产年龄、卧室数量以及其他一些特征。我们利用这个数据集训练了一个决策树回归模型,这样我们就可以预测市场上其他房屋的价格。</p>\n<p>原始数据集的大小很小(1.2MB)。因此,本案例研究将复制相同的数据,以创建大小为55.7GB 的数据集(分解为多个 CSV 文件)。90%的数据用于训练,其余10%通过模型进行预测。数据集存储在 AmazonS3 bucket 中。</p>\n<h4><a id=\"_34\"></a><strong>简图</strong></h4>\n<p>此图显示了 ML 案例研究中的不同组件是如何相互作用的。</p>\n<p><img src=\"1\" alt=\"image.png\" /></p>\n<h4><a id=\"Amazon__40\"></a><strong>Amazon 组件和权限</strong></h4>\n<p>在运行 Spark 作业之前,应创建已上载数据集的 bucket。ML 管道还应该具有对 Amazon S3存储桶的读写访问权限。</p>\n<p>要使用 Terraform 部署所需资源,请在 EKS Terraform 文件夹中创建一个名为 ml.tf 的文件(从前面的案例研究中删除 twitter.tf)。在文件中放置以下内容。这将创建一个名为 spark ml demo 的 S3 bucket,并为角色添加正确的权限:</p>\n<pre><code class=\"lang-\">\nresource &quot;aws_s3_bucket&quot; &quot;spark-ml-demo&quot; {\n bucket = &quot;spark-ml-demo&quot;\n}\n\nresource &quot;aws_iam_role_policy_attachment&quot; &quot;s3-full-access&quot; {\n role = &quot;eksWorkerRole&quot;\n policy_arn = &quot;arn:aws:iam::aws:policy/AmazonS3FullAccess&quot;\n}\n</code></pre>\n<p>运行 terraform apply 来创建资源,然后将数据集上传到 spark ml demo bucket 中。</p>\n<h4><a id=\"Spark__59\"></a><strong>Spark 实施</strong></h4>\n<p>与 Twitter 类似,读者需要创建 sbt 项目,并将以下依赖项添加到 build.sbt 文件中:</p>\n<pre><code class=\"lang-\">\nlibraryDependencies ++= Seq(\n &quot;org.apache.spark&quot; %% &quot;spark-core&quot; % &quot;3.1.2&quot;,\n &quot;org.apache.spark&quot; %% &quot;spark-sql&quot; % &quot;3.1.2&quot;,\n &quot;org.apache.spark&quot; %% &quot;spark-mllib&quot; % &quot;3.1.2&quot;,\n &quot;software.amazon.awssdk&quot; % &quot;s3&quot; % &quot;2.17.31&quot;,\n &quot;org.apache.hadoop&quot; % &quot;hadoop-common&quot; % &quot;3.2.0&quot;,\n &quot;org.apache.hadoop&quot; % &quot;hadoop-aws&quot; % &quot;3.2.0&quot;,\n)\n</code></pre>\n<p>下面的代码将 spark ml demo S3 bucket 中的数据集(包含_1.csv,…)读取到数据帧中(假设数据集在 bucket 中被分解为多个 csv 文件)。输入被分成训练和测试数据集。训练数据集被矢量化并传递给决策树回归器拟合函数,以创建一个模型,该模型是 Spark ML 上下文中的转换器。</p>\n<p>代码中的下一步将介绍 ML 推理。它使用相同的向量汇编程序对测试数据集进行向量化,以进行训练数据转换。预测数据帧是通过在矢量化测试数据集上调用模型的转换函数生成的。结果将写回 Predicts 文件夹下的同一个 S3 bucket。</p>\n<p>请注意,在培训中仅使用特征的子集。</p>\n<pre><code class=\"lang-\">\npackage sparkml\n\nimport org.apache.spark.sql._\nimport org.apache.spark.ml.regression.DecisionTreeRegressor\nimport org.apache.spark.ml.feature.VectorAssembler\nimport org.apache.log4j._\n\nobject RealEstate {\n // this is for California dataset:\n // https://developers.google.com/machine-learning/crash-course/california-housing-data-description\n\n case class Property(longitude: Double, latitude: Double, housing_median_age: Double, total_rooms: Double,\n total_bedrooms: Double, population: Double, households: Double, median_income: Double,\n median_house_value: Double, ocean_proximity: String)\n\n def main(array: Array[String]) {\n\n // set the log level\n Logger.getLogger(&quot;org&quot;).setLevel(Level.ERROR)\n\n val spark = SparkSession\n .builder()\n .appName(&quot;realEstate&quot;)\n .getOrCreate()\n\n import spark.implicits._\n\n val realEstate = spark\n .read\n .option(&quot;header&quot;, &quot;true&quot;)\n .option(&quot;inferSchema&quot;, &quot;true&quot;)\n .csv(&quot;s3a://spark-ml-demo/housing_*.csv&quot;)\n .as[Property]\n\n val Array(trainingData, testData) = realEstate.randomSplit(Array(0.9, 0.1))\n\n val assembler = new VectorAssembler()\n .setInputCols(Array(&quot;housing_median_age&quot;, &quot;total_rooms&quot;, &quot;total_bedrooms&quot;, &quot;population&quot;, &quot;households&quot;, &quot;median_income&quot;))\n .setOutputCol(&quot;features&quot;)\n .setHandleInvalid(&quot;skip&quot;)\n\n val trainDf = assembler\n .transform(trainingData)\n .select(&quot;features&quot;, &quot;median_house_value&quot;)\n\n val dt = new DecisionTreeRegressor()\n .setLabelCol(&quot;median_house_value&quot;)\n .setFeaturesCol(&quot;features&quot;)\n\n val model = dt.fit(trainDf)\n\n val testDF = assembler\n .transform(testData)\n .select(&quot;features&quot;, &quot;median_house_value&quot;)\n\n val predictions = model.transform(testDF)\n .select(&quot;median_house_value&quot;, &quot;prediction&quot;)\n\n predictions.write.csv(&quot;s3a://spark-ml-demo/predictions&quot;)\n\n spark.stop()\n }\n}\n</code></pre>\n<p>在上面的代码中,我们只使用数字特征进行训练和推理。读者可以在输入列中添加分类功能,如 ocean_邻近性,并验证结果。</p>\n<h4><a id=\"_146\"></a><strong>在本地测试程序</strong></h4>\n<p>将原始数据集存储在计算机上,并对 Scala 脚本应用以下修改:</p>\n<p>1.将 Spark 配置中的 master 设置为本地运行</p>\n<pre><code class=\"lang-\">\nval spark = SparkSession\n.builder()\n.appName(&quot;realEstate&quot;)\n.master(&quot;local[*]&quot;)\n.getOrCreate()\n</code></pre>\n<p>2.将 S3数据集位置替换为计算机上 csv 文件的地址</p>\n<p>3.将 S3上的输出地址替换为本地目录,如/tmp</p>\n<p>现在,在本地运行代码将创建一个包含预测的 CSV 文件。在打包软件之前,请确保已还原更改。</p>\n<h4><a id=\"_164\"></a><strong>打包代码和依赖项</strong></h4>\n<p>遵循 Twitter 分析案例研究中的指导原则打包代码和依赖项。</p>\n<h4><a id=\"_Spark_168\"></a><strong>提交 Spark</strong></h4>\n<p>这个过程类似于 Twitter 分析案例研究,除了这里,我们正在处理更大的数据集和更详细的资源规划。</p>\n<p>本案例研究在7个内存优化的 r6g.4X 大型实例上运行。这些实例具有16个 vCPU 和128GiB 的内存。spark submit 命令在 EKS 集群上运行作业,假设应用程序JAR文件的名称为 spark_ml.JAR(用相应的信息替换KUBERNETES_MASTER_地址和 DOCKER_IMAGE_\t地址)。分配给驱动程序和每个执行器的 VCPU 数量设置为5,这允许3个 POD 在一台机器上运行。相应地计算执行器的数量和内存分配。</p>\n<pre><code class=\"lang-\">bin/spark-submit \\\n --class sparkml.RealEstate \\\n --master k8s://&lt;KUBERNETES_MASTER_ADDRESS&gt; \\\n --deploy-mode cluster \\\n --conf &lt;DOCKER_IMAGE_ADDRESS&gt; \\\n --conf spark.kubernetes.authenticate.driver.serviceAccountName=spark \\\n --conf spark.kubernetes.driver.pod.name=&quot;spark-ml&quot; \\\n --conf spark.kubernetes.namespace=default \\\n --conf spark.executor.instances=20 \\\n --conf spark.driver.cores=5 \\\n --conf spark.executor.cores=5 \\\n --conf spark.executor.memory=34g \\\n --conf spark.driver.memory=34g \\\n --conf spark.memory.fraction=0.8 \\\n --name sparkML \\\n local:///opt/spark/jars/spark_ml.jar\n</code></pre>\n<p>要监视应用程序在 Kubernetes 群集上是否无错误运行,可以使用 Spark UI 监视 Spark 作业的不同统计信息。通过提交以下命令运行 Spark UI,并在浏览器(localhost:4040)上打开该 UI:</p>\n<pre><code class=\"lang-\">kubectl port-forward spark-ml 4040:4040\n</code></pre>\n<h4><a id=\"_194\"></a><strong>预测结果</strong></h4>\n<p>案例研究运行6分钟,并将预测结果写入多个 CSV 文件上的 S3存储桶中。CSV 文件有两列:第一列是实际房屋中值,第二列是预测值(您可以修改机器学习脚本以在 CSV 文件中包含其他相关功能)。</p>\n<pre><code class=\"lang-\">\n95800,132367.88804380846\n113800,190347.262329421\n113800,190347.262329421\n67500,104197.72693265839\n67500,104197.72693265839\n65800,104197.72693265839\n68600,104197.72693265839\n71100,190347.262329421\n53500,104197.72693265839\n71300,158820.92941131844\n59200,104197.72693265839\n59200,104197.72693265839\n</code></pre>\n<h3><a id=\"2KUber__212\"></a><strong>案例研究2:K-均值聚类:Uber 驱动程序位置</strong></h3>\n<p>本节介绍如何在 Amazon EKS 上运行的 Spark 群集上运行 K-means 群集训练和推理。</p>\n<h4><a id=\"K_216\"></a><strong>K-均值聚类算法</strong></h4>\n<p>K-means 是无监督学习领域中最流行的聚类算法。假设k为预期的簇数,该算法随机初始化称为簇质心的k个簇中心,并迭代:</p>\n<p>1.根据数据点到质心的欧几里德距离,将数据点指定给簇</p>\n<p>2.将簇质心移动到每个簇中数据点的平均值</p>\n<p>迭代将继续,直到收敛。Spark 使用 K-means 算法的一种变体,可以在集群上并行化。</p>\n<h4><a id=\"Uber__226\"></a><strong>Uber 驱动程序位置数据集</strong></h4>\n<p>K-means 案例研究使用 Uber 司机位置数据集,其中包括司机在不同日期和时间的纬度和经度。CSV 格式的数据集大小为209.7MB。因此,数据被复制以在 Amazon S3上创建大小为195.5GB 的数据集(分解为多个 CSV 文件)。它将90%的数据用于培训,其余用于测试。</p>\n<p>图、需求和依赖关系与前面的决策树示例相同。</p>\n<h4><a id=\"Spark__233\"></a><strong>Spark 实施</strong></h4>\n<p>以下是 Spark 上 K-means 集群的实现,其中K设置为16。它将结果存储回 uber predictions 文件夹下的 S3 bucket 中。</p>\n<pre><code class=\"lang-\">package spark_ml\n\nimport org.apache.spark.sql._\nimport org.apache.spark.ml.feature.VectorAssembler\nimport org.apache.log4j._\nimport org.apache.spark.ml.clustering.KMeans\n\nobject Uber {\n\n def main(array: Array[String]) {\n\n // set the log level\n Logger.getLogger(&quot;org&quot;).setLevel(Level.ERROR)\n\n val spark = SparkSession\n .builder()\n .appName(&quot;uber&quot;)\n .getOrCreate()\n\n val uber = spark\n .read\n .option(&quot;header&quot;, &quot;true&quot;)\n .option(&quot;inferSchema&quot;, &quot;true&quot;)\n .csv(&quot;s3a://spark-ml-demo/uber-raw-data*.csv&quot;)\n\n val Array(trainingData, testData) = uber.select(&quot;Lat&quot;, &quot;Lon&quot;).randomSplit(Array(0.90, 0.1))\n\n val assembler = new VectorAssembler()\n .setInputCols(Array(&quot;Lat&quot;, &quot;Lon&quot;))\n .setOutputCol(&quot;features&quot;)\n .setHandleInvalid(&quot;skip&quot;)\n\n val trainDf = assembler\n .transform(trainingData)\n\n val dt = new KMeans()\n .setK(16)\n .setFeaturesCol(&quot;features&quot;)\n .setPredictionCol(&quot;prediction&quot;)\n\n val model = dt.fit(trainDf)\n model.clusterCenters.foreach(println)\n\n val testDF = assembler\n .transform(testData)\n\n val predictions = model.transform(testDF)\n .select(&quot;Lat&quot;, &quot;Lon&quot;, &quot;prediction&quot;)\n\n predictions.write.csv(&quot;s3a://spark-ml-demo/uber-predictions&quot;)\n\n spark.stop()\n }\n}\n</code></pre>\n<h4><a id=\"_292\"></a><strong>在本地测试程序</strong></h4>\n<p>使用本地存储的 Uber 驱动程序位置数据集在您的计算机上运行代码。遵循与决策树回归相同的步骤。</p>\n<h4><a id=\"_296\"></a><strong>打包代码和依赖项</strong></h4>\n<p>遵循与 Twitter 分析案例研究中相同的准则打包代码和依赖项。</p>\n<h4><a id=\"_Spark_300\"></a><strong>提交 Spark</strong></h4>\n<p>K-means 案例研究在9个内存优化的 r6g.4X 大型实例上运行。因此,spark submit 参数中的执行器数量会相应更改:</p>\n<pre><code class=\"lang-\">bin/spark-submit \\\n --class sparkml.Uber \\\n --master k8s://&lt;KUBERNETES_MASTER_ADDRESS&gt; \\\n --deploy-mode cluster \\\n --conf &lt;DOCKER_IMAGE_ADDRESS&gt; \\\n --conf spark.kubernetes.authenticate.driver.serviceAccountName=spark \\\n --conf spark.kubernetes.driver.pod.name=&quot;spark-ml&quot; \\\n --conf spark.kubernetes.namespace=default \\\n --conf spark.executor.instances=26 \\\n --conf spark.driver.cores=5 \\\n --conf spark.executor.cores=5 \\\n --conf spark.executor.memory=34g \\\n --conf spark.driver.memory=34g \\\n --conf spark.memory.fraction=0.8 \\\n --name sparkML \\\n local:///opt/spark/jars/spark_ml.jar\n</code></pre>\n<h4><a id=\"_321\"></a><strong>聚类结果</strong></h4>\n<p>该案例研究在34分钟内完成,覆盖了 Gravion2工人集群。当 Spark 作业成功终止时,结果应位于 uber prediction 文件夹中,并分解为多个 CSV 文件。CSV 文件中有三列:前两列是纬度和经度,第三列是簇编号。您还可以通过在创建模型的行之后调用 model.clusterCenters.foreach(println)来打印 cluster centroids。</p>\n<pre><code class=\"lang-\">\n40.8971,-73.8652,4 \n40.8972,-73.8653,4 \n40.8972,-73.8653,4 \n40.8973,-74.4901,15 \n40.8973,-73.8635,4 \n40.8977,-73.947,4 \n40.8977,-73.9469,4 \n40.8978,-73.9139,4 \n40.8981,-73.9717,3 \n40.8982,-74.1262,0 \n40.8982,-73.8673,4 \n40.8985,-73.901,4 \n40.8985,-73.901,4 \n40.8985,-73.9009,4 \n</code></pre>\n<h3><a id=\"_341\"></a><strong>结论</strong></h3>\n<p>在这个博客中,我们演示了两个 ML 案例研究:决策树回归和 K-均值聚类,使用大量数据集,并在 EKS Kubernetes 集群上运行 Graviton2实例。Spark 的 MLlib 帮助行业以分布式方式在 TB 数据上运行机器学习管道(转换、存储、学习和预测)。当数据大小变大并且ML算法需要更多资源时,工作人员的数量会增加。Amazon 的研究另外,较低的每小时 Gravion2实例为用户提供了高达30%的成本降低。当数十个或数百个工作节点长时间运行以执行 ML 计算时,这将为用户带来巨大的好处。</p>\n<p>有关采用基于 Arm 处理器的客户案例,请访问 Amazon Graviton 页面。有关在 Arm Neoverse 平台上运行的软件工作负载的任何查询,请随时联系 sw-ecosystem@arm.com.</p>\n<p><a href=\"https://community.arm.com/arm-community-blogs/b/infrastructure-solutions-blog/posts/spark-with-machine-learning-on-aws-graviton2\" target=\"_blank\">阅读原文</a></p>\n"}
目录
亚马逊云科技解决方案 基于行业客户应用场景及技术领域的解决方案
联系亚马逊云科技专家
亚马逊云科技解决方案
基于行业客户应用场景及技术领域的解决方案
联系专家
0
目录
关闭