{"value":"Masoud Koleini 2021年11月16日 \n\n### **介绍**\n\nApacheSpark 是一个通用框架,用于在机器集群上进行分布式计算。数据可以存储在分布式数据库(如 ApacheHadoop 分布式文件系统)中,也可以通过流模块收集。然后,数据可以由运行在一组 worker 上的 Spark 执行器进行处理。Spark 可以使用自己的集群管理器独立运行,也可以在 ApacheMesos、Hadoop Thread 或 Kubernetes 上运行。\n\nAmazon Graviton2 处理器使用64位 Arm Neoverse 内核。与基于x86的实例相比,由 Graviron2处理器提供支持的 Amazon EC2实例提供了更好的性价比。Amazon 最近的一份报告显示,与 Graviton2工人一起在EKS上运行 Spark 时,性能提高了15%,价格降低了30%。本博客解释了在 Kubernetes 集群上使用 Spark Streaming 和基于 Graviton2的 worker 实例实现两个案例研究。共享技术细节和最佳实践,以帮助用户克服在实现类似应用程序时遇到的问题。\n\n### **Kubernetes:一个容器编排系统**\n\nKubernetes 是一个流行的开源框架,用于部署、扩展和管理容器化应用程序。Kubernetes 集群由以下部分组成:\n\n控制平面:通过调度应用程序、保留其所需状态以及管理其扩展和更新来管理集群。这是通过接收提交的用户请求的 API 服务器来处理的。此外,集群节点通过与控制平面通信来同步其状态。\n\n工作节点:由在每个工作节点上运行的名为 Kubelet 的代理管理的主机吊舱(应用程序)。Kubelet 处理与 API 服务器的控制平面通信,并通过 CRI(容器运行时接口)根据所需状态管理容器生命周期。\n\nKubernetes 附带了一个名为 kubectl 的便捷工具,这是一个与 API 服务器通信的命令行接口实用程序。kubectl 命令行参数允许用户直接描述他们想要部署的资源。此外,用户可以在 yaml 文件中表达所有需求,并通过 kubectl 运行该文件进行部署。\n\n![image.png](1)\n\n图1。使用 Docker 作为容器运行时的 Kubernetes 群集\n\nKubernetes 是 Spark 支持的集群管理器之一。Spark 在 Kubernetes 集群上运行作业的方式是首先在一个工作节点上创建一个驱动程序吊舱。然后,驱动程序 po d根据请求的资源生成执行程序p od。作业完成后,所有 executor pod 终止,驱动程序 pod 移动到完成状态,允许用户访问日志以进行调试。\n\nSpark 二进制文件附带一个名为 Spark submit 的工具。它可以与多个集群管理器对话以运行作业。传递给 spark submit 的参数定义了群集管理器的类型、其 API 服务器的地址、执行器的数量、驱动程序和执行器资源、容器映像地址、应用程序 JAR 文件以及其他功能参数。在接下来的部分中,我们将使用 spark submit 在 Kubernetes 集群上部署 spark 应用程序。\n\n\n\n### **亚马逊弹性 Kubernetes 服务**\n\nAmazon EKS 是 Amazon 提供的一项托管 Kubernetes 服务。本节介绍如何使用 Terraform 以编程方式创建基于 Graviton2的 EKS 集群。我们建议读者在继续阅读本文档的其余部分之前对 Terraform 有所了解。\n\n### **使用 Terraform 的 EKS 群集资源调配**\n\nHashiCorp 提供了关于如何在 Amazon 上提供 EKS 集群的文档。Terraform 脚本在 github 存储库中可用。但是,它可能需要一些调整才能在具有不同配置的 Amazon 帐户上运行。\n\n当前工作组参数默认为启动x86实例。运行基于 Gravion2的实例需要进行一些更改:\n\n确定要运行集群的 EKS 支持的 Kubernetes 版本。您可以在 eks-cluster.tf 中设置版本。\n\n确定 Kubernetes 版本的最佳 Amazon Linux AMI\n\n使用以下信息更新工作组参数(以下是使用内存优化实例发布博客时Kubernetes 1.21.2的示例)\n```\n\nname = \"eks-nodes-aarch64\"\n ami_id = \"ami-08899b9102b960c9c\"\n instance_type = \"r6g.4xlarge\"\n```\n### **Spark 驱动服务帐户**\n\nSpark 驱动程序应具有在工作实例上生成执行器的正确权限,同时在其中一个 Kubernetes 节点上作为 pod 运行。用户可以为 Spark 创建 Kubernetes 服务帐户和 ClusterRoleBinding,可以在运行 Spark 作业时将其分配给驱动程序。使用 Spark 的文档创建帐户并使用 kubectl 进行绑定。\n\n### **微调集群**\n\n要在保持资源优化的同时管理成本,微调集群至关重要。以下是一些调整和最佳实践,您可以遵循这些调整和最佳实践来充分利用 Kubernetes 群集:\n\n标记工作组节点,并在具有特定标签的实例集上运行 Spark 驱动程序和执行器。此外,监视和度量服务器最好在单独的机器上运行,而不是与执行者共享资源。\n\n动态生成工作节点,并在作业完成时销毁它们。这可能会增加工作流的复杂性,并需要正确的节点配置才能加入集群。但它不允许空闲实例在集群内运行,从而节省了大量资金。\n\n在生产环境中运行之前,应根据具体情况计算每个 Spark 作业所需的资源。为作业分配太多的资源将导致更高的成本,而资源不足可能导致作业在运行数小时后失败。\n\n经过良好调优的 Spark 作业应该在 UI 中显示很少的失败任务。请参阅 Tuning Spark 文档和 Amazon 最佳实践,以了解有关如何优化群集的指导原则。我们遵循类似的原则来配置集群中的资源、每个执行器的 VCPU 以及执行器内存计算。\n\n### **案例研究1:最流行的 hashtag**\n\n本案例研究解释了如何在 EKS 集群中使用 Spark Streaming 部署 Twitter 分析作业。本节演示如何在 Scala 中编写作业,将其打包为 JAR 文件,创建包含应用程序的 Spark docker 映像,并在 EKS 集群中部署容器。\n\n我们演示了部署两种常用的 tweet 分析场景:查找最流行的 hashtag 和 tweet 的情绪分析。\n\n### **要求**\n\nTwitter 开发者帐户:实现流媒体应用程序的第一步是创建一个Twitter 开发者帐户。您必须首先使用开发者门户创建项目和应用程序。然后,创建 API 密钥、API 密钥、访问令牌和访问令牌密钥,以验证应用程序并读取推文。\n\n请注意,Twitter 对应用程序应用了速率限制,以提供可靠的服务。因此,一个执行器就足以运行两个流式用例。\n\n### **使用 Spark 流处理推文**\n\nSpark Streaming 是 Spark 中的一个 API,用于可靠地处理高通量数据流,如 Kafka、Amazon Kinesis、HDFS/S3、Flume 和 Twitter。它将输入流拆分为小批量,并通过 Spark 引擎运行它们,从而创建一个处理数据的批量流。数据流的高级抽象称为 Dstream(离散化流),它在内部被映射为一系列 RDD(弹性分布式数据集)。\n\nSpark 在 Spark SQL 之上附带了另一个流 API,称为结构化流。它允许数据以数据集/数据帧(RDD 之上的 API)的形式呈现,并允许在流数据上优化 Spark SQL 引擎处理。但是,它不包括 Twitter 连接器,并且在通过窗口处理聚合数据时有一些限制。因此,为了简单起见,我们在本案例研究中使用 Spark Streaming API。\n\n下图显示了实时 Twitter 分析应用程序中的组件如何交互:\n\n![image.png](2)\n\n图2。实时 Twitter 分析应用程序中的组件\n\nSpark Streaming API 以窗口方式从 Twitter API 读取推特流。Spark 引擎按窗口运行作业,并将结果发送到 Kinesis 流,该流由Kinesis 流的订户使用。\n\n### **Amazon 组件和权限**\n\n本案例研究需要以下 Amazon 组件和权限:\n\n1.将输出流写入 Kinesis 数据流的权限\n\n2.执行器写入输出流的写入权限\n\n3.用于检查点的 S3 bucket\n\n\n4.出于检查点目的对 S3进行读/写的权限\n\n如果使用 HashiCorp Terraform 脚本创建 EKS 集群,请采取以下步骤添加所需资源:\n\n首先,将工作人员角色名称添加到 eks 模块:\n```\nworkers_role_name = \"eksWorkerRole\"\n```\n然后,在同一文件夹中创建一个文件 twitter.tf,添加以下内容,并运行 terraform apply 在 Amazon 环境中创建所需的组件:\n```\nresource \"aws_kinesis_stream\" \"spark-analysis-stream\" {\n name = \"spark_analysis_stream\"\n shard_count = 1\n\n tags = {\n // required tags\n }\n}\n\nresource \"aws_iam_role_policy_attachment\" \"spark-kinesis-access\" {\n role = \"eksWorkerRole\"\n policy_arn = \"arn:aws:iam::aws:policy/AmazonKinesisFullAccess\"\n}\n\nresource \"aws_s3_bucket\" \" spark-streaming-checkpoints\" {\n bucket = \"spark-streaming-checkpoints\"\n}\n\nresource \"aws_iam_role_policy_attachment\" \"s3-full-access\" {\n role = \"eksWorkerRole\"\n policy_arn = \"arn:aws:iam::aws:policy/AmazonS3FullAccess\"\n}\n\n```\n确保 terraform 在没有抛出任何错误的情况下完成请求。\n\n### **Spark 实施**\n\n要求:读者应熟悉 Scala 编程和 sbt 项目。使用 IDE(如 IntelliJ)或命令行创建 sbt 项目。\n\n创建 Scala sbt 项目,并更新 build.sbt 以将以下依赖项添加到项目中(使用 Scala 版本2.12.12进行测试):\n```\n\nlibraryDependencies ++= Seq(\n \"org.apache.spark\" %% \"spark-core\" % \"3.1.2\",\n \"org.apache.spark\" %% \"spark-sql\" % \"3.1.2\",\n \"org.apache.spark\" %% \"spark-streaming\" % \"3.1.2\",\n \"org.apache.spark\" %% \"spark-streaming-kinesis-asl\" % \"3.1.2\",\n \"org.apache.bahir\" %% \"spark-streaming-twitter\" % \"2.4.0\",\n \"org.twitter4j\" % \"twitter4j-core\" % \"4.0.4\",\n \"org.twitter4j\" % \"twitter4j-stream\" % \"4.0.4\",\n \"edu.stanford.nlp\" % \"stanford-corenlp\" % \"4.2.1\",\n \"edu.stanford.nlp\" % \"stanford-corenlp\" % \"4.2.1\" classifier \"models\",\n \"software.amazon.awssdk\" % \"kinesis\" % \"2.17.31\",\n \"org.apache.hadoop\" % \"hadoop-common\" % \"3.2.0\",\n \"org.apache.hadoop\" % \"hadoop-aws\" % \"3.2.0\",\n)\n```\n使用下面的示例 scala 代码在 bigdata 包(src/Main/scala/bigdata)中创建 Main.scala。代码在1800秒大小的滑动窗口上,每10秒移动一次,找到5个最流行的英文推文标签。它将结果作为 JSON 对象写入 Kinesis 流,供订阅者使用。\n```\npackage bigdata\n\nimport java.nio.charset.StandardCharsets.UTF_8\n\n// AWS imports\nimport software.amazon.awssdk.auth.credentials.ProfileCredentialsProvider\nimport software.amazon.awssdk.core.SdkBytes\nimport software.amazon.awssdk.regions.Region\nimport software.amazon.awssdk.services.kinesis.KinesisAsyncClient\nimport software.amazon.awssdk.services.kinesis.model.PutRecordsRequest;\nimport software.amazon.awssdk.services.kinesis.model.PutRecordsRequestEntry;\n\n// Log imports\nimport org.apache.log4j._\nimport org.apache.spark.streaming._\nimport org.apache.spark.streaming.twitter._\n\n// Twitter API imports\nimport twitter4j.auth.OAuthAuthorization\nimport twitter4j.conf.ConfigurationBuilder\n\n// Spark imports\nimport org.apache.spark._\n\nobject Main extends App {\n\n // Create output JSON to send to output\n def createJSON(tuples: List[(String, Int)]): String = tuples match {\n case Nil => \"{}\"\n case (x, y) :: Nil => \"\\\"\" + x + \"\\\": \" + y.toString\n case (x, y) :: tail => \"\\\"\" + x + \"\\\": \" + y.toString + \", \" + createJSON(tail)\n }\n\n def createRecordEntry(record: String, partition_key: String, stream_name: String): PutRecordsRequest = {\n val recordEntry = PutRecordsRequestEntry\n .builder()\n .data(SdkBytes.fromString(record, UTF_8))\n .partitionKey(partition_key)\n .build()\n\n val putRecordsRequest = PutRecordsRequest\n .builder()\n .streamName(stream_name)\n .records(recordEntry)\n .build();\n\n putRecordsRequest\n }\n\n // Set the log level to only print errors\n Logger.getLogger(\"org\").setLevel(Level.ERROR)\n\n // Read Twitter credentials from the environment variables\n System.setProperty(\"twitter4j.oauth.consumerKey\", sys.env(\"TWITTER_CONSUMER_KEY\"))\n System.setProperty(\"twitter4j.oauth.consumerSecret\", sys.env(\"TWITTER_CONSUMER_SECRET\"))\n System.setProperty(\"twitter4j.oauth.accessToken\", sys.env(\"TWITTER_ACCESS_TOKEN\"))\n System.setProperty(\"twitter4j.oauth.accessTokenSecret\", sys.env(\"TWITTER_TOKEN_SECRET\"))\n\n // Kinesis streams parameters\n val kinesisStream = \"spark_analysis_stream\"\n val partitionKey = \"data\"\n val kinesisRegion = \"us-east-1\"\n\n // Sliding window parameters\n val slideIntervalSec = 10\n val windowLengthSec = 1800\n\n // Directory to output top hashtags\n val outputDirectory = \"./twitter\"\n\n val slideInterval = Seconds(slideIntervalSec)\n val windowLength = Seconds(windowLengthSec)\n\n // set AWS kinesis client\n val kinesisClient = KinesisAsyncClient\n .builder()\n .region(Region.of(kinesisRegion))\n .build()\n\n // Setup the SparkConfig and StreamingContext\n val conf = new SparkConf().setAppName(\"twitterAnalysis\")\n val ssc = new StreamingContext(conf, Seconds(1))\n\n val auth = Some(new OAuthAuthorization(new ConfigurationBuilder().build()))\n val twitterStream = TwitterUtils.createStream(ssc, auth)\n\n // Analyse English tweets only\n val tweet = twitterStream.filter(_.getLang == \"en\").map(_.getText)\n\n // Finding the topmost popular hashtags\n val hashTagStream = tweet.flatMap(_.split(\" \")).filter(_.startsWith(\"#\")).map(w => (w, 1))\n val windowedHashTagCount = hashTagStream.reduceByKeyAndWindow((x: Int, y: Int) => (x + y), (x: Int, y: Int) => (x - y), windowLength, slideInterval)\n val orderedHashTags = windowedHashTagCount.transform(rdd => {\n val list = rdd.sortBy(_._2, false).take(5)\n kinesisClient.putRecords(createRecordEntry(\"{\" + createJSON(list.toList) + \"}\", partitionKey, kinesisStream))\n rdd\n })\n orderedHashTags.print()\n\n ssc.checkpoint(\"s3://spark-streaming-checkpoints/checkpoints\")\n ssc.start()\n ssc.awaitTermination()\n}\n```\n### **在本地测试程序**\n\n您可以在本地运行该程序,以确认结果是否符合预期。但是,您必须对上面的 Scala 程序进行轻微修改(并在创建包之前将其还原)\n\n1.将 Spark 配置中的 master 设置为本地运行\n```\nval conf = new SparkConf().setMaster(\"local[*]\").setAppName(\"twitterAnalysis\")\n```\n2.向 Kinesis 客户端添加 Amazon 凭据(profileName 是您要使用的 Amazon 配置文件)\n```\n\nval kinesisClient = KinesisAsyncClient\n.builder()\n.region(Region.of(kinesisRegion))\n .credentialsProvider(ProfileCredentialsProvider.create(profileName))\n .build()\n```\n3.将 S3上的检查点地址替换为本地目录,如/tmp\n\n现在应该可以在本地机器上运行基准测试了。使用以下 Python 脚本从 Kinesis 流中读取最流行的 hashtags:\n```\nimport boto3\nimport json\nfrom datetime import datetime\nimport calendar\nimport random\nimport time\n\nmy_stream_name = 'spark_analysis_stream'\n\nkinesis_client = boto3.client('kinesis', region_name='us-east-1')\n\nresponse = kinesis_client.describe_stream(StreamName=my_stream_name)\n\nmy_shard_id = response['StreamDescription']['Shards'][0]['ShardId']\n\nshard_iterator = kinesis_client.get_shard_iterator(StreamName=my_stream_name,\n ShardId=my_shard_id,\n ShardIteratorType='LATEST')\n\nmy_shard_iterator = shard_iterator['ShardIterator']\n\nrecord_response = kinesis_client.get_records(ShardIterator=my_shard_iterator)\n\nrecord_num = 0\nwhile 'NextShardIterator' in record_response:\n record_response = kinesis_client.get_records(ShardIterator=record_response['NextShardIterator'])\n\n for record in record_response['Records']:\n print(record_num, record['Data'].decode('utf-8'))\n record_num = record_num + 1\n```\n假设上述脚本存储为 kinesis_read.py,则运行此命令以获取5条最流行的推文:\n```\nAWS_PROFILE=<AWS_PROFILE_NAME> python ./kinesis_read.py\n```\n### **打包代码和依赖项**\n\n由于您要构建一个在 Gravion2机器上运行的容器,因此需要在基于 Amazon Gravion2的实例上构建该容器。\n\n首先,使用 sbt assembly 创建一个包含包及其依赖项的 JAR 文件,不包括特定于 Spark 的 JAR 文件(IntelliJ 用户可以按照本文对其应用程序进行打包)。接下来,下载Spark 3.1.2并将应用程序 JAR 文件复制到'jars'目录中。\n\n接下来,构建 Spark 容器映像并将其推送到 AmazonElasticContainerRegistry,以便工作节点可以拉取它来创建 Spark 容器。\n\n### **提交 Spark**\n\n现在,是将作业提交到 EKS 集群的时候了。应用程序由一个执行器提交,并使用默认配置。这不是一个需要大量资源的计算,因为进入流的 tweet 数量是有限的。在运行 spark submit 之前,请更换 spark submit 命令中的以下参数:\n\n1.KUBERNETES_主地址:配置文件中的 KUBERNETES 主地址(默认配置文件位置为~/.kube/config)\n\n2.DOCKER_IMAGE_ADDRESS:推送到 Amazon ECR 的 Spark DOCKER 映像的地址\n\n3.Twitter API keys\n\n\n假设应用程序 JAR 文件名为bigdata-assembly-0.1.JAR,在替换所需参数后,在下载的 Spark 文件夹中运行以下命令。请注意,RDD 复制至少需要两个执行器来保持容错性:\n```\nbin/spark-submit \\\n --class bigdata.Main \\\n --master k8s://<KUBERNETES_MASTER_ADDRESS> \\\n --deploy-mode cluster \\\n --conf spark.executor.instances=2 \\\n --conf <DOCKER_IMAGE_ADDRESS> \\\n --conf spark.kubernetes.driver.pod.name=\"twitter\" \\\n --conf spark.kubernetes.namespace=default \\\n --conf spark.kubernetes.authenticate.driver.serviceAccountName=spark \\\n --conf spark.kubernetes.driverEnv.TWITTER_CONSUMER_KEY=<TWITTER_CONSUMER_KEY> \\\n --conf spark.kubernetes.driverEnv.TWITTER_CONSUMER_SECRET=<TWITTER_CONSUMER_SECRET> \\\n --conf spark.kubernetes.driverEnv.TWITTER_ACCESS_TOKEN=<TWITTER_ACCESS_TOKEN> \\\n --conf spark.kubernetes.driverEnv.TWITTER_TOKEN_SECRET=<TWITTER_TOKEN_SECRET> \\\n --conf spark.executorEnv.TWITTER_CONSUMER_KEY=<TWITTER_CONSUMER_SECRET> \\\n --conf spark.executorEnv.TWITTER_CONSUMER_SECRET=<TWITTER_CONSUMER_SECRET> \\\n --conf spark.executorEnv.TWITTER_ACCESS_TOKEN=<TWITTER_ACCESS_TOKEN> \\\n --conf spark.executorEnv.TWITTER_TOKEN_SECRET=<TWITTER_TOKEN_SECRET> \\\n --name sparkTwitter \\\n local:///opt/spark/jars/bigdata-assembly-0.1.jar\n```\n它应该在没有错误中断的情况下开始运行。\n\n运行以下命令以确保驱动程序和执行器吊舱正常工作:\n```\n\n$ kubectl get pods\nNAME READY STATUS RESTARTS AGE\nspark-twitter 1/1 Running 0 6m1s\ntwitteranalysis-c435bf7c50edfbbe-exec-1 1/1 Running 0 5m54s\n```\nPython 脚本的输出(在上一节中介绍)类似于以下内容:\n```\n$ AWS_PROFILE=default python ./kinesis_read.py\n \n0 {\"#AdBor\": 3, \"#TAMED_DASHED\": 2, \"#etsy\": 2, \"#KingdomHearts\": 2, \"#GOT7\": 2}\n1 {\"#etsy\": 3, \"#MACLovesLisa\": 3, \"#Airforce_Result_Do\": 3, \"#sora\": 3, \"#AdBor\": 3}\n2 {\"#Airforce_Result_Do\": 4, \"#sora\": 4, \"#etsy\": 3, \"#MACLovesLisa\": 3, \"#kingdom\": 3}\n3 {\"#sora\": 5, \"#MACLovesLisa\": 4, \"#Airforce_Result_Do\": 4, \"#etsy\": 3, \"#kingdom\": 3}\n```\n### **案例研究2:推特的情绪分析**\n\n另一个有趣的例子是使用已经训练过的模型对 Spark Streaming connector 接收到的推文进行情感分析。这一次,不是寻找最流行的 hashtag,而是计算滑动窗口上不同情绪的数量并发送到 Kinesis 流。\n\n该存储库使用 StanfordNLP 库实现了 Twitter 情绪分析。只需将此 scala 文件添加到包的目录下,并替换文件中的包名。核心依赖项已经添加到 sbt 文件中,因此它们应该已经被提取到依赖项目录中。\n\n更改主 Scala 对象很简单。此代码段替换 Main.scala 中特定于哈希标记的计算代码:\n```\n\n// transform sentiments into string\n import bigdata.SentimentAnalysisUtils._\n\n def sentimentToString(sentiment: SENTIMENT_TYPE): String = {\n sentiment match {\n case NOT_UNDERSTOOD => \"NOT_UNDERSTOOD\"\n case VERY_NEGATIVE => \"VERY_NEGATIVE\"\n case NEGATIVE => \"NEGATIVE\"\n case NEUTRAL => \"NEUTRAL\"\n case POSITIVE => \"POSITIVE\"\n case VERY_POSITIVE => \"VERY_POSITIVE\"\n }\n }\n\n val sentiments = tweet.map(x => (SentimentAnalysisUtils.detectSentiment(x), 1))\n val windowedSentimentCount = sentiments.reduceByKeyAndWindow((x: Int, y: Int) => (x + y), (x: Int, y: Int) => (x - y), windowLength, slideInterval)\n val sentimentsCount = windowedSentimentCount.transform( rdd => {\n val list = rdd.sortBy(_._2, false).take(5).map(x => (sentimentToString(x._1), x._2))\n\n kinesisClient.putRecords(createRecordEntry(\"{\" + createJSON(list.toList) + \"}\", partitionKey, kinesisStream))\n rdd\n })\n\n sentimentsCount.print()\n```\n按照与前面案例研究相同的步骤构建 JAR 文件、创建 Spark 容器和提交作业。由于情感分析由于更高的计算要求而使用更多的资源,因此在本案例研究中使用了5个工作节点。以下是在群集上运行作业的 spark submit 命令:\n\n```\nbin/spark-submit \\\n --class bigdata.Main \\\n --master k8s://<KUBERNETES_MASTER_ADDRESS> \\\n --deploy-mode cluster \\\n --conf spark.executor.instances=14 \\\n --conf <DOCKER_IMAGE_ADDRESS> \\\n --conf spark.kubernetes.driver.pod.name=\"twitter\" \\\n --conf spark.kubernetes.namespace=default \\\n --conf spark.kubernetes.authenticate.driver.serviceAccountName=spark \\\n --conf spark.kubernetes.driverEnv.TWITTER_CONSUMER_KEY=<TWITTER_CONSUMER_KEY> \\\n --conf spark.kubernetes.driverEnv.TWITTER_CONSUMER_SECRET=<TWITTER_CONSUMER_SECRET> \\\n --conf spark.kubernetes.driverEnv.TWITTER_ACCESS_TOKEN=<TWITTER_ACCESS_TOKEN> \\\n --conf spark.kubernetes.driverEnv.TWITTER_TOKEN_SECRET=<TWITTER_TOKEN_SECRET> \\\n --conf spark.executorEnv.TWITTER_CONSUMER_KEY=<TWITTER_CONSUMER_SECRET> \\\n --conf spark.executorEnv.TWITTER_CONSUMER_SECRET=<TWITTER_CONSUMER_SECRET> \\\n --conf spark.executorEnv.TWITTER_ACCESS_TOKEN=<TWITTER_ACCESS_TOKEN> \\\n --conf spark.executorEnv.TWITTER_TOKEN_SECRET=<TWITTER_TOKEN_SECRET> \\\n --conf spark.executor.cores=5 \\\n --conf spark.driver.cores=5 \\\n --conf spark.driver.memory=34g \\\n --conf spark.executor.memory=34g \\\n --conf spark.memory.fraction=0.8 \\\n --name sparkTwitter \\\n local:///opt/spark/jars/bigdata-assembly-0.1.jar\n```\n运行同一个 Python 脚本,通过从 Kinesis 流读取数据,显示如下输出:\n```\n$ AWS_PROFILE=default python ./kinesis_read.py\n \n0 {\"NEUTRAL\": 552, \"NEGATIVE\": 179, \"POSITIVE\": 70, \"NOT_UNDERSTOOD\": 1}\n1 {\"NEUTRAL\": 1563, \"NEGATIVE\": 537, \"POSITIVE\": 200, \"VERY_POSITIVE\": 2, \"NOT_UNDERSTOOD\": 2}\n2 {\"NEUTRAL\": 3982, \"NEGATIVE\": 1356, \"POSITIVE\": 536, \"NOT_UNDERSTOOD\": 4, \"VERY_POSITIVE\": 3}\n```\n在前面的代码中,我们只使用数字特征进行训练和推理。读者可以在输入列中添加分类功能,如ocean_邻近性,并验证结果。\n\n### **结论**\n\n在这个博客中,我们回顾了两个实时 Twitter 分析案例研究,使用 EKS 上的 Spark Streaming 和基于 Graviron2的实例:查找最流行的 hashtag 和对推特的情绪分析。与基于 x86的实例相比,基于 Amazon Gravion2的实例提供更低的每小时价格,同时提供有竞争力的性能。通常,Spark 作业可能需要在集群中的大量工作节点上运行。因此,在存在大量数据集和需要大量资源的计算的情况下,降低每个实例的成本可以显著降低运行 Spark 作业的成本。\n\n有关采用基于 Arm 处理器的客户案例,请访问 Amazon Graviton 页面。有关在 Arm Neoverse 平台上运行的软件工作负载的任何查询,请随时联系sw-ecosystem@arm.com.\n[https://aws.amazon.com/ec2/graviton/](https://aws.amazon.com/ec2/graviton/)\n\n[阅读原文](https://community.arm.com/arm-community-blogs/b/infrastructure-solutions-blog/posts/spark-on-aws-graviton2-real-time-analysis-using-spark-streaming)","render":"<p>Masoud Koleini 2021年11月16日</p>\n<h3><a id=\"_2\"></a><strong>介绍</strong></h3>\n<p>ApacheSpark 是一个通用框架,用于在机器集群上进行分布式计算。数据可以存储在分布式数据库(如 ApacheHadoop 分布式文件系统)中,也可以通过流模块收集。然后,数据可以由运行在一组 worker 上的 Spark 执行器进行处理。Spark 可以使用自己的集群管理器独立运行,也可以在 ApacheMesos、Hadoop Thread 或 Kubernetes 上运行。</p>\n<p>Amazon Graviton2 处理器使用64位 Arm Neoverse 内核。与基于x86的实例相比,由 Graviron2处理器提供支持的 Amazon EC2实例提供了更好的性价比。Amazon 最近的一份报告显示,与 Graviton2工人一起在EKS上运行 Spark 时,性能提高了15%,价格降低了30%。本博客解释了在 Kubernetes 集群上使用 Spark Streaming 和基于 Graviton2的 worker 实例实现两个案例研究。共享技术细节和最佳实践,以帮助用户克服在实现类似应用程序时遇到的问题。</p>\n<h3><a id=\"Kubernetes_8\"></a><strong>Kubernetes:一个容器编排系统</strong></h3>\n<p>Kubernetes 是一个流行的开源框架,用于部署、扩展和管理容器化应用程序。Kubernetes 集群由以下部分组成:</p>\n<p>控制平面:通过调度应用程序、保留其所需状态以及管理其扩展和更新来管理集群。这是通过接收提交的用户请求的 API 服务器来处理的。此外,集群节点通过与控制平面通信来同步其状态。</p>\n<p>工作节点:由在每个工作节点上运行的名为 Kubelet 的代理管理的主机吊舱(应用程序)。Kubelet 处理与 API 服务器的控制平面通信,并通过 CRI(容器运行时接口)根据所需状态管理容器生命周期。</p>\n<p>Kubernetes 附带了一个名为 kubectl 的便捷工具,这是一个与 API 服务器通信的命令行接口实用程序。kubectl 命令行参数允许用户直接描述他们想要部署的资源。此外,用户可以在 yaml 文件中表达所有需求,并通过 kubectl 运行该文件进行部署。</p>\n<p><img src=\"1\" alt=\"image.png\" /></p>\n<p>图1。使用 Docker 作为容器运行时的 Kubernetes 群集</p>\n<p>Kubernetes 是 Spark 支持的集群管理器之一。Spark 在 Kubernetes 集群上运行作业的方式是首先在一个工作节点上创建一个驱动程序吊舱。然后,驱动程序 po d根据请求的资源生成执行程序p od。作业完成后,所有 executor pod 终止,驱动程序 pod 移动到完成状态,允许用户访问日志以进行调试。</p>\n<p>Spark 二进制文件附带一个名为 Spark submit 的工具。它可以与多个集群管理器对话以运行作业。传递给 spark submit 的参数定义了群集管理器的类型、其 API 服务器的地址、执行器的数量、驱动程序和执行器资源、容器映像地址、应用程序 JAR 文件以及其他功能参数。在接下来的部分中,我们将使用 spark submit 在 Kubernetes 集群上部署 spark 应用程序。</p>\n<h3><a id=\"_Kubernetes__28\"></a><strong>亚马逊弹性 Kubernetes 服务</strong></h3>\n<p>Amazon EKS 是 Amazon 提供的一项托管 Kubernetes 服务。本节介绍如何使用 Terraform 以编程方式创建基于 Graviton2的 EKS 集群。我们建议读者在继续阅读本文档的其余部分之前对 Terraform 有所了解。</p>\n<h3><a id=\"_Terraform__EKS__32\"></a><strong>使用 Terraform 的 EKS 群集资源调配</strong></h3>\n<p>HashiCorp 提供了关于如何在 Amazon 上提供 EKS 集群的文档。Terraform 脚本在 github 存储库中可用。但是,它可能需要一些调整才能在具有不同配置的 Amazon 帐户上运行。</p>\n<p>当前工作组参数默认为启动x86实例。运行基于 Gravion2的实例需要进行一些更改:</p>\n<p>确定要运行集群的 EKS 支持的 Kubernetes 版本。您可以在 eks-cluster.tf 中设置版本。</p>\n<p>确定 Kubernetes 版本的最佳 Amazon Linux AMI</p>\n<p>使用以下信息更新工作组参数(以下是使用内存优化实例发布博客时Kubernetes 1.21.2的示例)</p>\n<pre><code class=\"lang-\">\nname = "eks-nodes-aarch64"\n ami_id = "ami-08899b9102b960c9c"\n instance_type = "r6g.4xlarge"\n</code></pre>\n<h3><a id=\"Spark__49\"></a><strong>Spark 驱动服务帐户</strong></h3>\n<p>Spark 驱动程序应具有在工作实例上生成执行器的正确权限,同时在其中一个 Kubernetes 节点上作为 pod 运行。用户可以为 Spark 创建 Kubernetes 服务帐户和 ClusterRoleBinding,可以在运行 Spark 作业时将其分配给驱动程序。使用 Spark 的文档创建帐户并使用 kubectl 进行绑定。</p>\n<h3><a id=\"_53\"></a><strong>微调集群</strong></h3>\n<p>要在保持资源优化的同时管理成本,微调集群至关重要。以下是一些调整和最佳实践,您可以遵循这些调整和最佳实践来充分利用 Kubernetes 群集:</p>\n<p>标记工作组节点,并在具有特定标签的实例集上运行 Spark 驱动程序和执行器。此外,监视和度量服务器最好在单独的机器上运行,而不是与执行者共享资源。</p>\n<p>动态生成工作节点,并在作业完成时销毁它们。这可能会增加工作流的复杂性,并需要正确的节点配置才能加入集群。但它不允许空闲实例在集群内运行,从而节省了大量资金。</p>\n<p>在生产环境中运行之前,应根据具体情况计算每个 Spark 作业所需的资源。为作业分配太多的资源将导致更高的成本,而资源不足可能导致作业在运行数小时后失败。</p>\n<p>经过良好调优的 Spark 作业应该在 UI 中显示很少的失败任务。请参阅 Tuning Spark 文档和 Amazon 最佳实践,以了解有关如何优化群集的指导原则。我们遵循类似的原则来配置集群中的资源、每个执行器的 VCPU 以及执行器内存计算。</p>\n<h3><a id=\"1_hashtag_65\"></a><strong>案例研究1:最流行的 hashtag</strong></h3>\n<p>本案例研究解释了如何在 EKS 集群中使用 Spark Streaming 部署 Twitter 分析作业。本节演示如何在 Scala 中编写作业,将其打包为 JAR 文件,创建包含应用程序的 Spark docker 映像,并在 EKS 集群中部署容器。</p>\n<p>我们演示了部署两种常用的 tweet 分析场景:查找最流行的 hashtag 和 tweet 的情绪分析。</p>\n<h3><a id=\"_71\"></a><strong>要求</strong></h3>\n<p>Twitter 开发者帐户:实现流媒体应用程序的第一步是创建一个Twitter 开发者帐户。您必须首先使用开发者门户创建项目和应用程序。然后,创建 API 密钥、API 密钥、访问令牌和访问令牌密钥,以验证应用程序并读取推文。</p>\n<p>请注意,Twitter 对应用程序应用了速率限制,以提供可靠的服务。因此,一个执行器就足以运行两个流式用例。</p>\n<h3><a id=\"_Spark__77\"></a><strong>使用 Spark 流处理推文</strong></h3>\n<p>Spark Streaming 是 Spark 中的一个 API,用于可靠地处理高通量数据流,如 Kafka、Amazon Kinesis、HDFS/S3、Flume 和 Twitter。它将输入流拆分为小批量,并通过 Spark 引擎运行它们,从而创建一个处理数据的批量流。数据流的高级抽象称为 Dstream(离散化流),它在内部被映射为一系列 RDD(弹性分布式数据集)。</p>\n<p>Spark 在 Spark SQL 之上附带了另一个流 API,称为结构化流。它允许数据以数据集/数据帧(RDD 之上的 API)的形式呈现,并允许在流数据上优化 Spark SQL 引擎处理。但是,它不包括 Twitter 连接器,并且在通过窗口处理聚合数据时有一些限制。因此,为了简单起见,我们在本案例研究中使用 Spark Streaming API。</p>\n<p>下图显示了实时 Twitter 分析应用程序中的组件如何交互:</p>\n<p><img src=\"2\" alt=\"image.png\" /></p>\n<p>图2。实时 Twitter 分析应用程序中的组件</p>\n<p>Spark Streaming API 以窗口方式从 Twitter API 读取推特流。Spark 引擎按窗口运行作业,并将结果发送到 Kinesis 流,该流由Kinesis 流的订户使用。</p>\n<h3><a id=\"Amazon__91\"></a><strong>Amazon 组件和权限</strong></h3>\n<p>本案例研究需要以下 Amazon 组件和权限:</p>\n<p>1.将输出流写入 Kinesis 数据流的权限</p>\n<p>2.执行器写入输出流的写入权限</p>\n<p>3.用于检查点的 S3 bucket</p>\n<p>4.出于检查点目的对 S3进行读/写的权限</p>\n<p>如果使用 HashiCorp Terraform 脚本创建 EKS 集群,请采取以下步骤添加所需资源:</p>\n<p>首先,将工作人员角色名称添加到 eks 模块:</p>\n<pre><code class=\"lang-\">workers_role_name = "eksWorkerRole"\n</code></pre>\n<p>然后,在同一文件夹中创建一个文件 twitter.tf,添加以下内容,并运行 terraform apply 在 Amazon 环境中创建所需的组件:</p>\n<pre><code class=\"lang-\">resource "aws_kinesis_stream" "spark-analysis-stream" {\n name = "spark_analysis_stream"\n shard_count = 1\n\n tags = {\n // required tags\n }\n}\n\nresource "aws_iam_role_policy_attachment" "spark-kinesis-access" {\n role = "eksWorkerRole"\n policy_arn = "arn:aws:iam::aws:policy/AmazonKinesisFullAccess"\n}\n\nresource "aws_s3_bucket" " spark-streaming-checkpoints" {\n bucket = "spark-streaming-checkpoints"\n}\n\nresource "aws_iam_role_policy_attachment" "s3-full-access" {\n role = "eksWorkerRole"\n policy_arn = "arn:aws:iam::aws:policy/AmazonS3FullAccess"\n}\n\n</code></pre>\n<p>确保 terraform 在没有抛出任何错误的情况下完成请求。</p>\n<h3><a id=\"Spark__138\"></a><strong>Spark 实施</strong></h3>\n<p>要求:读者应熟悉 Scala 编程和 sbt 项目。使用 IDE(如 IntelliJ)或命令行创建 sbt 项目。</p>\n<p>创建 Scala sbt 项目,并更新 build.sbt 以将以下依赖项添加到项目中(使用 Scala 版本2.12.12进行测试):</p>\n<pre><code class=\"lang-\">\nlibraryDependencies ++= Seq(\n "org.apache.spark" %% "spark-core" % "3.1.2",\n "org.apache.spark" %% "spark-sql" % "3.1.2",\n "org.apache.spark" %% "spark-streaming" % "3.1.2",\n "org.apache.spark" %% "spark-streaming-kinesis-asl" % "3.1.2",\n "org.apache.bahir" %% "spark-streaming-twitter" % "2.4.0",\n "org.twitter4j" % "twitter4j-core" % "4.0.4",\n "org.twitter4j" % "twitter4j-stream" % "4.0.4",\n "edu.stanford.nlp" % "stanford-corenlp" % "4.2.1",\n "edu.stanford.nlp" % "stanford-corenlp" % "4.2.1" classifier "models",\n "software.amazon.awssdk" % "kinesis" % "2.17.31",\n "org.apache.hadoop" % "hadoop-common" % "3.2.0",\n "org.apache.hadoop" % "hadoop-aws" % "3.2.0",\n)\n</code></pre>\n<p>使用下面的示例 scala 代码在 bigdata 包(src/Main/scala/bigdata)中创建 Main.scala。代码在1800秒大小的滑动窗口上,每10秒移动一次,找到5个最流行的英文推文标签。它将结果作为 JSON 对象写入 Kinesis 流,供订阅者使用。</p>\n<pre><code class=\"lang-\">package bigdata\n\nimport java.nio.charset.StandardCharsets.UTF_8\n\n// AWS imports\nimport software.amazon.awssdk.auth.credentials.ProfileCredentialsProvider\nimport software.amazon.awssdk.core.SdkBytes\nimport software.amazon.awssdk.regions.Region\nimport software.amazon.awssdk.services.kinesis.KinesisAsyncClient\nimport software.amazon.awssdk.services.kinesis.model.PutRecordsRequest;\nimport software.amazon.awssdk.services.kinesis.model.PutRecordsRequestEntry;\n\n// Log imports\nimport org.apache.log4j._\nimport org.apache.spark.streaming._\nimport org.apache.spark.streaming.twitter._\n\n// Twitter API imports\nimport twitter4j.auth.OAuthAuthorization\nimport twitter4j.conf.ConfigurationBuilder\n\n// Spark imports\nimport org.apache.spark._\n\nobject Main extends App {\n\n // Create output JSON to send to output\n def createJSON(tuples: List[(String, Int)]): String = tuples match {\n case Nil => "{}"\n case (x, y) :: Nil => "\\"" + x + "\\": " + y.toString\n case (x, y) :: tail => "\\"" + x + "\\": " + y.toString + ", " + createJSON(tail)\n }\n\n def createRecordEntry(record: String, partition_key: String, stream_name: String): PutRecordsRequest = {\n val recordEntry = PutRecordsRequestEntry\n .builder()\n .data(SdkBytes.fromString(record, UTF_8))\n .partitionKey(partition_key)\n .build()\n\n val putRecordsRequest = PutRecordsRequest\n .builder()\n .streamName(stream_name)\n .records(recordEntry)\n .build();\n\n putRecordsRequest\n }\n\n // Set the log level to only print errors\n Logger.getLogger("org").setLevel(Level.ERROR)\n\n // Read Twitter credentials from the environment variables\n System.setProperty("twitter4j.oauth.consumerKey", sys.env("TWITTER_CONSUMER_KEY"))\n System.setProperty("twitter4j.oauth.consumerSecret", sys.env("TWITTER_CONSUMER_SECRET"))\n System.setProperty("twitter4j.oauth.accessToken", sys.env("TWITTER_ACCESS_TOKEN"))\n System.setProperty("twitter4j.oauth.accessTokenSecret", sys.env("TWITTER_TOKEN_SECRET"))\n\n // Kinesis streams parameters\n val kinesisStream = "spark_analysis_stream"\n val partitionKey = "data"\n val kinesisRegion = "us-east-1"\n\n // Sliding window parameters\n val slideIntervalSec = 10\n val windowLengthSec = 1800\n\n // Directory to output top hashtags\n val outputDirectory = "./twitter"\n\n val slideInterval = Seconds(slideIntervalSec)\n val windowLength = Seconds(windowLengthSec)\n\n // set AWS kinesis client\n val kinesisClient = KinesisAsyncClient\n .builder()\n .region(Region.of(kinesisRegion))\n .build()\n\n // Setup the SparkConfig and StreamingContext\n val conf = new SparkConf().setAppName("twitterAnalysis")\n val ssc = new StreamingContext(conf, Seconds(1))\n\n val auth = Some(new OAuthAuthorization(new ConfigurationBuilder().build()))\n val twitterStream = TwitterUtils.createStream(ssc, auth)\n\n // Analyse English tweets only\n val tweet = twitterStream.filter(_.getLang == "en").map(_.getText)\n\n // Finding the topmost popular hashtags\n val hashTagStream = tweet.flatMap(_.split(" ")).filter(_.startsWith("#")).map(w => (w, 1))\n val windowedHashTagCount = hashTagStream.reduceByKeyAndWindow((x: Int, y: Int) => (x + y), (x: Int, y: Int) => (x - y), windowLength, slideInterval)\n val orderedHashTags = windowedHashTagCount.transform(rdd => {\n val list = rdd.sortBy(_._2, false).take(5)\n kinesisClient.putRecords(createRecordEntry("{" + createJSON(list.toList) + "}", partitionKey, kinesisStream))\n rdd\n })\n orderedHashTags.print()\n\n ssc.checkpoint("s3://spark-streaming-checkpoints/checkpoints")\n ssc.start()\n ssc.awaitTermination()\n}\n</code></pre>\n<h3><a id=\"_266\"></a><strong>在本地测试程序</strong></h3>\n<p>您可以在本地运行该程序,以确认结果是否符合预期。但是,您必须对上面的 Scala 程序进行轻微修改(并在创建包之前将其还原)</p>\n<p>1.将 Spark 配置中的 master 设置为本地运行</p>\n<pre><code class=\"lang-\">val conf = new SparkConf().setMaster("local[*]").setAppName("twitterAnalysis")\n</code></pre>\n<p>2.向 Kinesis 客户端添加 Amazon 凭据(profileName 是您要使用的 Amazon 配置文件)</p>\n<pre><code class=\"lang-\">\nval kinesisClient = KinesisAsyncClient\n.builder()\n.region(Region.of(kinesisRegion))\n .credentialsProvider(ProfileCredentialsProvider.create(profileName))\n .build()\n</code></pre>\n<p>3.将 S3上的检查点地址替换为本地目录,如/tmp</p>\n<p>现在应该可以在本地机器上运行基准测试了。使用以下 Python 脚本从 Kinesis 流中读取最流行的 hashtags:</p>\n<pre><code class=\"lang-\">import boto3\nimport json\nfrom datetime import datetime\nimport calendar\nimport random\nimport time\n\nmy_stream_name = 'spark_analysis_stream'\n\nkinesis_client = boto3.client('kinesis', region_name='us-east-1')\n\nresponse = kinesis_client.describe_stream(StreamName=my_stream_name)\n\nmy_shard_id = response['StreamDescription']['Shards'][0]['ShardId']\n\nshard_iterator = kinesis_client.get_shard_iterator(StreamName=my_stream_name,\n ShardId=my_shard_id,\n ShardIteratorType='LATEST')\n\nmy_shard_iterator = shard_iterator['ShardIterator']\n\nrecord_response = kinesis_client.get_records(ShardIterator=my_shard_iterator)\n\nrecord_num = 0\nwhile 'NextShardIterator' in record_response:\n record_response = kinesis_client.get_records(ShardIterator=record_response['NextShardIterator'])\n\n for record in record_response['Records']:\n print(record_num, record['Data'].decode('utf-8'))\n record_num = record_num + 1\n</code></pre>\n<p>假设上述脚本存储为 kinesis_read.py,则运行此命令以获取5条最流行的推文:</p>\n<pre><code class=\"lang-\">AWS_PROFILE=<AWS_PROFILE_NAME> python ./kinesis_read.py\n</code></pre>\n<h3><a id=\"_322\"></a><strong>打包代码和依赖项</strong></h3>\n<p>由于您要构建一个在 Gravion2机器上运行的容器,因此需要在基于 Amazon Gravion2的实例上构建该容器。</p>\n<p>首先,使用 sbt assembly 创建一个包含包及其依赖项的 JAR 文件,不包括特定于 Spark 的 JAR 文件(IntelliJ 用户可以按照本文对其应用程序进行打包)。接下来,下载Spark 3.1.2并将应用程序 JAR 文件复制到’jars’目录中。</p>\n<p>接下来,构建 Spark 容器映像并将其推送到 AmazonElasticContainerRegistry,以便工作节点可以拉取它来创建 Spark 容器。</p>\n<h3><a id=\"_Spark_330\"></a><strong>提交 Spark</strong></h3>\n<p>现在,是将作业提交到 EKS 集群的时候了。应用程序由一个执行器提交,并使用默认配置。这不是一个需要大量资源的计算,因为进入流的 tweet 数量是有限的。在运行 spark submit 之前,请更换 spark submit 命令中的以下参数:</p>\n<p>1.KUBERNETES_主地址:配置文件中的 KUBERNETES 主地址(默认配置文件位置为~/.kube/config)</p>\n<p>2.DOCKER_IMAGE_ADDRESS:推送到 Amazon ECR 的 Spark DOCKER 映像的地址</p>\n<p>3.Twitter API keys</p>\n<p>假设应用程序 JAR 文件名为bigdata-assembly-0.1.JAR,在替换所需参数后,在下载的 Spark 文件夹中运行以下命令。请注意,RDD 复制至少需要两个执行器来保持容错性:</p>\n<pre><code class=\"lang-\">bin/spark-submit \\\n --class bigdata.Main \\\n --master k8s://<KUBERNETES_MASTER_ADDRESS> \\\n --deploy-mode cluster \\\n --conf spark.executor.instances=2 \\\n --conf <DOCKER_IMAGE_ADDRESS> \\\n --conf spark.kubernetes.driver.pod.name="twitter" \\\n --conf spark.kubernetes.namespace=default \\\n --conf spark.kubernetes.authenticate.driver.serviceAccountName=spark \\\n --conf spark.kubernetes.driverEnv.TWITTER_CONSUMER_KEY=<TWITTER_CONSUMER_KEY> \\\n --conf spark.kubernetes.driverEnv.TWITTER_CONSUMER_SECRET=<TWITTER_CONSUMER_SECRET> \\\n --conf spark.kubernetes.driverEnv.TWITTER_ACCESS_TOKEN=<TWITTER_ACCESS_TOKEN> \\\n --conf spark.kubernetes.driverEnv.TWITTER_TOKEN_SECRET=<TWITTER_TOKEN_SECRET> \\\n --conf spark.executorEnv.TWITTER_CONSUMER_KEY=<TWITTER_CONSUMER_SECRET> \\\n --conf spark.executorEnv.TWITTER_CONSUMER_SECRET=<TWITTER_CONSUMER_SECRET> \\\n --conf spark.executorEnv.TWITTER_ACCESS_TOKEN=<TWITTER_ACCESS_TOKEN> \\\n --conf spark.executorEnv.TWITTER_TOKEN_SECRET=<TWITTER_TOKEN_SECRET> \\\n --name sparkTwitter \\\n local:///opt/spark/jars/bigdata-assembly-0.1.jar\n</code></pre>\n<p>它应该在没有错误中断的情况下开始运行。</p>\n<p>运行以下命令以确保驱动程序和执行器吊舱正常工作:</p>\n<pre><code class=\"lang-\">\n$ kubectl get pods\nNAME READY STATUS RESTARTS AGE\nspark-twitter 1/1 Running 0 6m1s\ntwitteranalysis-c435bf7c50edfbbe-exec-1 1/1 Running 0 5m54s\n</code></pre>\n<p>Python 脚本的输出(在上一节中介绍)类似于以下内容:</p>\n<pre><code class=\"lang-\">$ AWS_PROFILE=default python ./kinesis_read.py\n \n0 {"#AdBor": 3, "#TAMED_DASHED": 2, "#etsy": 2, "#KingdomHearts": 2, "#GOT7": 2}\n1 {"#etsy": 3, "#MACLovesLisa": 3, "#Airforce_Result_Do": 3, "#sora": 3, "#AdBor": 3}\n2 {"#Airforce_Result_Do": 4, "#sora": 4, "#etsy": 3, "#MACLovesLisa": 3, "#kingdom": 3}\n3 {"#sora": 5, "#MACLovesLisa": 4, "#Airforce_Result_Do": 4, "#etsy": 3, "#kingdom": 3}\n</code></pre>\n<h3><a id=\"2_382\"></a><strong>案例研究2:推特的情绪分析</strong></h3>\n<p>另一个有趣的例子是使用已经训练过的模型对 Spark Streaming connector 接收到的推文进行情感分析。这一次,不是寻找最流行的 hashtag,而是计算滑动窗口上不同情绪的数量并发送到 Kinesis 流。</p>\n<p>该存储库使用 StanfordNLP 库实现了 Twitter 情绪分析。只需将此 scala 文件添加到包的目录下,并替换文件中的包名。核心依赖项已经添加到 sbt 文件中,因此它们应该已经被提取到依赖项目录中。</p>\n<p>更改主 Scala 对象很简单。此代码段替换 Main.scala 中特定于哈希标记的计算代码:</p>\n<pre><code class=\"lang-\">\n// transform sentiments into string\n import bigdata.SentimentAnalysisUtils._\n\n def sentimentToString(sentiment: SENTIMENT_TYPE): String = {\n sentiment match {\n case NOT_UNDERSTOOD => "NOT_UNDERSTOOD"\n case VERY_NEGATIVE => "VERY_NEGATIVE"\n case NEGATIVE => "NEGATIVE"\n case NEUTRAL => "NEUTRAL"\n case POSITIVE => "POSITIVE"\n case VERY_POSITIVE => "VERY_POSITIVE"\n }\n }\n\n val sentiments = tweet.map(x => (SentimentAnalysisUtils.detectSentiment(x), 1))\n val windowedSentimentCount = sentiments.reduceByKeyAndWindow((x: Int, y: Int) => (x + y), (x: Int, y: Int) => (x - y), windowLength, slideInterval)\n val sentimentsCount = windowedSentimentCount.transform( rdd => {\n val list = rdd.sortBy(_._2, false).take(5).map(x => (sentimentToString(x._1), x._2))\n\n kinesisClient.putRecords(createRecordEntry("{" + createJSON(list.toList) + "}", partitionKey, kinesisStream))\n rdd\n })\n\n sentimentsCount.print()\n</code></pre>\n<p>按照与前面案例研究相同的步骤构建 JAR 文件、创建 Spark 容器和提交作业。由于情感分析由于更高的计算要求而使用更多的资源,因此在本案例研究中使用了5个工作节点。以下是在群集上运行作业的 spark submit 命令:</p>\n<pre><code class=\"lang-\">bin/spark-submit \\\n --class bigdata.Main \\\n --master k8s://<KUBERNETES_MASTER_ADDRESS> \\\n --deploy-mode cluster \\\n --conf spark.executor.instances=14 \\\n --conf <DOCKER_IMAGE_ADDRESS> \\\n --conf spark.kubernetes.driver.pod.name="twitter" \\\n --conf spark.kubernetes.namespace=default \\\n --conf spark.kubernetes.authenticate.driver.serviceAccountName=spark \\\n --conf spark.kubernetes.driverEnv.TWITTER_CONSUMER_KEY=<TWITTER_CONSUMER_KEY> \\\n --conf spark.kubernetes.driverEnv.TWITTER_CONSUMER_SECRET=<TWITTER_CONSUMER_SECRET> \\\n --conf spark.kubernetes.driverEnv.TWITTER_ACCESS_TOKEN=<TWITTER_ACCESS_TOKEN> \\\n --conf spark.kubernetes.driverEnv.TWITTER_TOKEN_SECRET=<TWITTER_TOKEN_SECRET> \\\n --conf spark.executorEnv.TWITTER_CONSUMER_KEY=<TWITTER_CONSUMER_SECRET> \\\n --conf spark.executorEnv.TWITTER_CONSUMER_SECRET=<TWITTER_CONSUMER_SECRET> \\\n --conf spark.executorEnv.TWITTER_ACCESS_TOKEN=<TWITTER_ACCESS_TOKEN> \\\n --conf spark.executorEnv.TWITTER_TOKEN_SECRET=<TWITTER_TOKEN_SECRET> \\\n --conf spark.executor.cores=5 \\\n --conf spark.driver.cores=5 \\\n --conf spark.driver.memory=34g \\\n --conf spark.executor.memory=34g \\\n --conf spark.memory.fraction=0.8 \\\n --name sparkTwitter \\\n local:///opt/spark/jars/bigdata-assembly-0.1.jar\n</code></pre>\n<p>运行同一个 Python 脚本,通过从 Kinesis 流读取数据,显示如下输出:</p>\n<pre><code class=\"lang-\">$ AWS_PROFILE=default python ./kinesis_read.py\n \n0 {"NEUTRAL": 552, "NEGATIVE": 179, "POSITIVE": 70, "NOT_UNDERSTOOD": 1}\n1 {"NEUTRAL": 1563, "NEGATIVE": 537, "POSITIVE": 200, "VERY_POSITIVE": 2, "NOT_UNDERSTOOD": 2}\n2 {"NEUTRAL": 3982, "NEGATIVE": 1356, "POSITIVE": 536, "NOT_UNDERSTOOD": 4, "VERY_POSITIVE": 3}\n</code></pre>\n<p>在前面的代码中,我们只使用数字特征进行训练和推理。读者可以在输入列中添加分类功能,如ocean_邻近性,并验证结果。</p>\n<h3><a id=\"_454\"></a><strong>结论</strong></h3>\n<p>在这个博客中,我们回顾了两个实时 Twitter 分析案例研究,使用 EKS 上的 Spark Streaming 和基于 Graviron2的实例:查找最流行的 hashtag 和对推特的情绪分析。与基于 x86的实例相比,基于 Amazon Gravion2的实例提供更低的每小时价格,同时提供有竞争力的性能。通常,Spark 作业可能需要在集群中的大量工作节点上运行。因此,在存在大量数据集和需要大量资源的计算的情况下,降低每个实例的成本可以显著降低运行 Spark 作业的成本。</p>\n<p>有关采用基于 Arm 处理器的客户案例,请访问 Amazon Graviton 页面。有关在 Arm Neoverse 平台上运行的软件工作负载的任何查询,请随时联系sw-ecosystem@arm.com.<br />\n<a href=\"https://aws.amazon.com/ec2/graviton/\" target=\"_blank\">https://aws.amazon.com/ec2/graviton/</a></p>\n<p><a href=\"https://community.arm.com/arm-community-blogs/b/infrastructure-solutions-blog/posts/spark-on-aws-graviton2-real-time-analysis-using-spark-streaming\" target=\"_blank\">阅读原文</a></p>\n"}