# Best practices to optimize cost and performance for Amazon Web Services Glue streaming ETL jobs

{"value":"[Amazon Web Services Glue](https://aws.amazon.com/glue) streaming extract, transform, and load (ETL) jobs allow you to process and enrich vast amounts of incoming data from systems such as [Amazon Kinesis Data Streams](https://aws.amazon.com/kinesis/data-streams/), [Amazon Managed Streaming for Apache Kafka](https://aws.amazon.com/msk/)(Amazon MSK), or any other [Apache Kafka](https://kafka.apache.org/) cluster. It uses the [Spark Structured Streaming](https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html) framework to perform data processing in near-real time.\n\nThis post covers use cases where data needs to be efficiently processed, delivered, and possibly actioned in a limited amount of time. This can cover a wide range of cases, such as log processing and alarming, continuous data ingestion and enrichment, data validation, internet of things, machine learning (ML), and more.\n\nWe discuss the following topics:\n\n- Development tools that help you code faster using our newly launched [Amazon Web Services Glue Studio notebooks](Amazon Web Services Glue Studio notebooks)\n- How to monitor and tune your streaming jobs\n- Best practices for sizing and scaling your Amazon Web Services Glue cluster, using our newly launched features like **auto scaling** and the small worker type **G 0.25X**\n\n### Development tools\n[Amazon Web Services Glue Studio notebooks](https://docs.aws.amazon.com/glue/latest/ug/notebooks-chapter.html) can speed up the development of your streaming job by allowing data engineers to work using an interactive notebook and test code changes to get quick feedback—from business logic coding to testing configuration changes—as part of tuning.\n\nBefore you run any code in the notebook (which would start the session), you need to set some important configurations.\n\nThe magic ```%streaming``` creates the session cluster using the same runtime as Amazon Web Services Glue streaming jobs. This way, you interactively develop and test your code using the same runtime that you use later in the production job.\n\nAdditionally, configure Spark UI logs, which will be very useful for monitoring and tuning the job.\n\nSee the following configuration:\n```\\n%streaming\\n%%configure\\n{\\n\\"--enable-spark-ui\\": \\"true\\",\\n\\"--spark-event-logs-path\\": \\"s3://your_bucket/sparkui/\\"\\n}\\n```\nFor additional configuration options such as version or number of workers, refer to [Configuring Amazon Web Services Glue interactive sessions for Jupyter and Amazon Web Services Glue Studio notebooks.](https://docs.aws.amazon.com/glue/latest/dg/interactive-sessions-magics.html)\n\nTo visualize the Spark UI logs, you need a Spark history server. 
To visualize the Spark UI logs, you need a Spark history server. If you don’t have one already, refer to [Launching the Spark History Server](https://docs.aws.amazon.com/glue/latest/dg/interactive-sessions-magics.html) for deployment instructions.

Structured Streaming is based on streaming DataFrames, which represent micro-batches of messages.
The following code is an example of creating a stream DataFrame using [Amazon Kinesis](https://aws.amazon.com/kinesis/) as the source:
```
kinesis_options = {
    "streamARN": "arn:aws:kinesis:us-east-2:777788889999:stream/fromOptionsStream",
    "startingPosition": "TRIM_HORIZON",
    "inferSchema": "true",
    "classification": "json"
}
kinesisDF = glueContext.create_data_frame_from_options(
    connection_type="kinesis",
    connection_options=kinesis_options
)
```
The Amazon Web Services Glue API helps you create the DataFrame by doing schema detection and auto decompression, [depending on the format](https://docs.aws.amazon.com/glue/latest/dg/add-job-streaming.html). You can also build it yourself using the Spark API directly:
```
kinesisDF = spark.readStream.format("kinesis").options(**kinesis_options).load()
```
After you run any code cell, it triggers the startup of the session, and the application soon appears in the history server as an incomplete app (at the bottom of the page there is a link to display incomplete apps) named GlueReplApp, because it’s a session cluster. For a regular job, it’s listed with the job name given when it was created.

![image.png](https://awsdevweb.s3.cn-north-1.amazonaws.com.cn/4955c46371904a219afdc85146118976_image.png)

From the notebook, you can [take a sample of the streaming data](https://docs.aws.amazon.com/glue/latest/dg/interactive-sessions-streaming.html#w51aac26c19c35b7). This can help development and give an indication of the type and size of the streaming messages, which might impact performance.

### Monitor the cluster with Structured Streaming
The best way to monitor and tune your Amazon Web Services Glue streaming job is using the Spark UI; it gives you the overall streaming job trends on the **Structured Streaming** tab and the details of each individual micro-batch processing job.

#### Overall view of the streaming job
On the **Structured Streaming** tab, you can see a summary of the streams running in the cluster, as in the following example.

![image.png](https://awsdevweb.s3.cn-north-1.amazonaws.com.cn/6dc12bae0ad544fe87968dc5bcb8c6af_image.png)

Normally there is just one streaming query, representing a streaming ETL. If you start multiple queries in parallel, it’s good to give each one a recognizable name by calling ```queryName()``` if you use the ```writeStream``` API directly on the DataFrame.
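For example, the following is a minimal sketch of naming the query when writing the stream yourself; the sink format, paths, and query name are placeholders:
```
query = (kinesisDF
    .writeStream
    .queryName("orders_enrichment")  # name shown on the Structured Streaming tab
    .format("parquet")
    .option("path", "s3://your_bucket/output/")
    .option("checkpointLocation", "s3://your_bucket/checkpoints/orders/")
    .trigger(processingTime="60 seconds")
    .start())
```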
After a good number of batches are complete (such as 10), enough for the averages to stabilize, you can use the **Avg Input/sec** column to monitor how many events or messages the job is processing. This can be confusing because the column to the right, **Avg Process/sec**, is similar but often has a higher number. The difference is that the process rate tells us how efficient our code is, whereas the average input tells us how many messages the cluster is reading and processing.

The important thing to note is that if the two values are similar, the job is working at maximum capacity. It’s making the best use of the hardware, but it likely won’t be able to cope with an increase in volume without causing delays.

In the last column is the latest batch number. Because they’re numbered incrementally from zero, this tells us how many batches the query has processed so far.

When you choose the link in the “Run ID” column of a streaming query, you can review the details with graphs and histograms, as in the following example.

![image.png](https://awsdevweb.s3.cn-north-1.amazonaws.com.cn/4af3e7271c4a43ceb062fa7cb45cd843_image.png)

The first two rows correspond to the data that is used to calculate the averages shown on the summary page.

For **Input Rate**, each data point is calculated by dividing the number of events read for the batch by the time passed between the current batch start and the previous batch start. In a healthy system that is able to keep up, this is equal to the configured trigger interval (in the ```GlueContext.forEachBatch()``` API, this is set using the option ```windowSize```).

Because it uses the current batch rows with the previous batch latency, this graph is often unstable in the first batches until the **Batch Duration** (the last line graph) stabilizes.

In this example, when it stabilizes, it gets completely flat. This means that either the influx of messages is constant or the job is hitting the per-batch limit that was set (we discuss how to set this later in the post).

Be careful: if you set a limit per batch that is constantly hit, you could be silently building a backlog, even though everything looks good in the job metrics. To monitor this, have a latency metric measuring the difference between the timestamp at which a message is created and the time it’s processed.

**Process Rate** is calculated by dividing the number of messages in a batch by the time it took to process that batch. For instance, if the batch contains 1,000 messages and the trigger interval is 10 seconds but the batch only needed 5 seconds to process, the process rate would be 1000/5 = 200 msg/sec, while the input rate for that batch (assuming the previous batch also ran within the interval) is 1000/10 = 100 msg/sec.

This metric is useful to measure how efficiently our code processes the batch, and therefore it can get higher than the input rate (this doesn’t mean it’s processing more messages, just using less time). As mentioned earlier, if both metrics get close, it means the batch duration is close to the interval and therefore additional traffic is likely to start causing batch trigger delays (because the previous batch is still running) and increase latency.

Later in this post, we show how auto scaling can help prevent this situation.

**Input Rows** shows the number of messages read for each batch, like input rate, but using volume instead of rate.

It’s important to note that if the batch processes the data multiple times (for example, writing to multiple destinations), the messages are counted multiple times. If the rates are greater than expected, this could be the reason. In general, to avoid reading messages multiple times, you should cache the batch while processing it, which is the default when you use the ```GlueContext.forEachBatch()``` API.

The last two rows tell us how long it takes to process each batch and how that time is spent. It’s normal to see the first batches take much longer until the system warms up and stabilizes.
The important thing to look for is that the durations are roughly stable and well under the configured trigger interval. If that’s not the case, the next batch gets delayed and could start a compounding delay by building a backlog or increasing the batch size (if the limit allows taking the extra messages pending).
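The end-to-end latency metric mentioned earlier is a good way to detect such a silent backlog. The following is a rough sketch of computing it inside the batch function; it assumes each record carries an ```event_timestamp``` column set by the producer, and publishing the value to your monitoring system is left out:
```
from pyspark.sql import functions as F

def process_batch(data_frame, batch_id):
    if data_frame.count() > 0:
        # Worst-case lag between event creation and processing in this batch
        max_lag = data_frame.select(
            F.max(F.unix_timestamp(F.current_timestamp())
                  - F.unix_timestamp("event_timestamp")).alias("max_lag_seconds")
        ).collect()[0]["max_lag_seconds"]
        print(f"batch {batch_id}: max end-to-end lag of {max_lag} seconds")
    # ... rest of the batch processing ...
```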
In **Operation Duration**, the majority of the time should be spent on addBatch (the mustard color), which is the actual work. The rest is mostly fixed overhead, so the smaller the batch, the larger the percentage of time it takes. This represents the trade-off between smaller batches with lower latency and bigger batches that are more compute efficient.

Also, it’s normal for the first batch to spend significant time in latestOffset (the brown bar), locating the point at which it needs to start processing when there is no checkpoint.

The following query statistics show another example.

![image.png](https://awsdevweb.s3.cn-north-1.amazonaws.com.cn/8f2f446016b2459087338cd130d71bab_image.png)

In this case, the input has some variation (meaning it’s not hitting the batch limit). Also, the process rate is roughly the same as the input rate. This tells us the system is at max capacity and struggling to keep up. By comparing the input rows and input rate, we can guess that the interval configured is just 3 seconds and the batch duration is barely able to meet that latency.

Finally, in **Operation Duration**, you can observe that because the batches are so frequent, a significant amount of time (proportionally speaking) is spent saving the checkpoint (the dark green bar).

With this information, we can probably improve the stability of the job by increasing the trigger interval to 5 seconds or more. This way, it checkpoints less often and has more time to process data, which might be enough to get the batch duration consistently under the interval. The trade-off is that the latency between when a message is published and when it’s processed is longer.

#### Monitor individual batch processing
On the **Jobs** tab, you can see how long each batch is taking and dig into the different steps the processing involves to understand how the time is spent. You can also check if there are tasks that succeed after retry. If this happens continuously, it can silently hurt performance.

For instance, the following screenshot shows the batches on the **Jobs** tab of the Spark UI of our streaming job.

![image.png](https://awsdevweb.s3.cn-north-1.amazonaws.com.cn/f880b50d3f8d452685f54c7c1e4cc7a3_image.png)

Each batch is considered a job by Spark (don’t confuse the job ID with the batch number; they only match if there is no other action). The job group is the streaming query ID (this is important only when running multiple queries).

The streaming job in this example has a single stage with 100 partitions. Both batches processed them successfully, so the stage is marked as succeeded and all the tasks completed (100/100 in the progress bar).

However, there is a difference in the first batch: there were 20 task failures. You know all the failed tasks succeeded in the retries, otherwise the stage would have been marked as failed.
For the stage to fail, the same task would have to fail four times (or as configured by ```spark.task.maxFailures```).

If the stage fails, the batch fails as well and possibly the whole job; if the job was started using ```GlueContext.forEachBatch()```, it has a number of retries as per the ```batchMaxRetries``` parameter (three by default).

These failures are important because they have two effects:

- They can silently cause delays in the batch processing, depending on how long it took to fail and retry.
- They can cause records to be sent multiple times if the failure is in the last stage of the batch, depending on the type of output. If the output is files, in general it won’t cause duplicates. However, if the destination is [Amazon DynamoDB](https://aws.amazon.com/dynamodb/), JDBC, [Amazon OpenSearch Service](https://aws.amazon.com/opensearch-service/), or another output that uses batching, it’s possible that some part of the output has already been sent. If you can’t tolerate any duplicates, the destination system should handle this (for example, by being idempotent).

Choosing the description link takes you to the **Stages** tab for that job. Here you can dig into the failure: What is the exception? Is it always in the same executor? Does it succeed on the first retry or did it take multiple?

Ideally, you want to identify these failures and solve them. For example, maybe the destination system is throttling us because it doesn’t have enough provisioned capacity, or a larger timeout is needed. Otherwise, you should at least monitor it and decide if it is systemic or sporadic.

### Sizing and scaling
Defining how to split the data is a key element in any distributed system to run and scale efficiently. The design decisions on the messaging system will have a strong influence on how the streaming job will perform and scale, and thereby affect the job parallelism.

In the case of Amazon Web Services Glue Streaming, this division of work is based on Apache Spark partitions, which define how to split the work so it can be processed in parallel. Each time the job reads a batch from the source, it divides the incoming data into Spark partitions.

For Apache Kafka, each topic partition becomes a Spark partition; similarly, for Kinesis, each stream shard becomes a Spark partition. To simplify, I’ll refer to this parallelism level as the number of partitions, meaning Spark partitions that will be determined by the input Kafka partitions or Kinesis shards on a one-to-one basis.

The goal is to have enough parallelism and capacity to process each batch of data in less time than the configured batch interval and therefore be able to keep up. For instance, with a batch interval of 60 seconds, the job lets 60 seconds of data build up and then processes that data. If that work takes more than 60 seconds, the next batch waits until the previous batch is complete before starting a new batch with the data that has built up since the previous batch started.

It’s a good practice to limit the amount of data to process in a single batch, instead of just taking everything that has been added since the last one. This helps make the job more stable and predictable during peak times. It allows you to test that the job can handle the expected volume of data without issues (for example, memory or throttling).

To do so, specify a limit when defining the source stream DataFrame:

- For Kinesis, specify the limit using ```kinesis.executor.maxFetchRecordsPerShard```, and revise this number if the number of shards changes substantially. You might need to increase ```kinesis.executor.maxFetchTimeInMs``` as well, in order to allow more time to read the batch and make sure it’s not truncated.
- For Kafka, set ```maxOffsetsPerTrigger```, which divides that allowance equally between the number of partitions.

The following is an example of setting this config for Kafka (for Kinesis, it’s equivalent but using the Kinesis properties, as shown after this example):

```
kafka_properties = {
    "kafka.bootstrap.servers": "bootstrapserver1:9092",
    "subscribe": "mytopic",
    "startingOffsets": "latest",
    "maxOffsetsPerTrigger": "5000000"
}
# Pass the properties as options when creating the DataFrame
kafkaDF = spark.readStream.format("kafka").options(**kafka_properties).load()
```
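For reference, a Kinesis version of the same limit might look like the following sketch; the limit values are placeholders to adjust to your shard count and record size, and it assumes these properties are passed along with the rest of the Kinesis connection options:
```
kinesis_options = {
    "streamARN": "arn:aws:kinesis:us-east-2:777788889999:stream/fromOptionsStream",
    "startingPosition": "TRIM_HORIZON",
    "inferSchema": "true",
    "classification": "json",
    # Cap how many records each shard contributes to a single batch
    "kinesis.executor.maxFetchRecordsPerShard": "500000",
    # Allow enough time to read the capped batch so it isn't truncated
    "kinesis.executor.maxFetchTimeInMs": "10000"
}
kinesisDF = glueContext.create_data_frame_from_options(
    connection_type="kinesis",
    connection_options=kinesis_options
)
```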
#### Initial benchmark
If the events can be processed individually (no interdependency such as grouping), you can get a rough estimation of how many messages a single Spark core can handle by running with a single partition source (one Kafka partition or a one-shard Kinesis stream) with data preloaded into it, and running batches with a limit and the minimum interval (1 second). This simulates a stress test with no downtime between batches.

For these repeated tests, clear the checkpoint directory, use a different one (for example, make it dynamic using the timestamp in the path), or just disable the checkpointing (if using the Spark API directly), so you can reuse the same data.
Leave a few batches to run (at least 10) to give time for the system and the metrics to stabilize.

Start with a small limit (using the limit configuration properties explained in the previous section) and do multiple reruns, increasing the value. Record the batch duration for that limit and the throughput input rate (because it’s a stress test, the process rate should be similar).

In general, larger batches tend to be more efficient up to a point. This is because the fixed overhead taken by each batch to checkpoint, plan, and coordinate the nodes is more significant if the batches are smaller and therefore more frequent.

Then pick your reference initial settings based on the requirements:

- If a latency SLA is required, use the largest batch size whose batch duration is less than half the latency SLA. This is because in the worst case, a message that is stored just after a batch is triggered has to wait at least the interval and then the processing time (which should be less than the interval). When the system is keeping up, the latency in this worst case would be close to twice the interval, so aim for the batch duration to be less than half the target latency.
- If throughput is the priority over latency, just pick the batch size that provides a higher average process rate and define an interval that allows some buffer over the observed batch duration.

Now you have an idea of how many messages per core the ETL can handle and the resulting latency. These numbers are idealistic because the system won’t scale perfectly linearly when you add more partitions and nodes. You can divide the total number of messages per second to process by the messages per core obtained to get the minimum number of Spark partitions needed (each core handles one partition in parallel).
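For example, a quick back-of-the-envelope estimate with made-up numbers:
```
import math

benchmark_msgs_per_core = 1200       # measured in the single-partition stress test
expected_peak_msgs_per_sec = 30000   # target volume the job must absorb

min_spark_partitions = math.ceil(expected_peak_msgs_per_sec / benchmark_msgs_per_core)
print(min_spark_partitions)  # 25 -> at least 25 Kafka partitions or Kinesis shards, and 25 Spark cores
```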
With this number of estimated Spark cores, calculate the number of nodes needed depending on the type and version, as summarized in the following table.

![image.png](https://awsdevweb.s3.cn-north-1.amazonaws.com.cn/ca132361ec5a422186ee94a087079ba6_image.png)

Using the newer version 3 is preferable because it includes more optimizations and features like auto scaling (which we discuss later). Regarding size, unless the job has some operation that is heavy on memory, it’s preferable to use the smaller instances so there aren’t so many cores competing for memory, disk, and network shared resources.

Spark cores are equivalent to threads; therefore, you can have more (or fewer) than the actual cores available in the instance. This doesn’t mean that having more Spark cores is necessarily going to be faster if they’re not backed by physical cores; it just means you have more parallelism competing for the same CPU.

#### Sizing the cluster when you control the input message system
This is the ideal case because you can optimize the performance and the efficiency as needed.

With the benchmark information you just gathered, you can define your initial Amazon Web Services Glue cluster size and configure Kafka or Kinesis with the estimated number of partitions or shards, plus some buffer. Test this baseline setup and adjust as needed until the job can comfortably meet the total volume and required latency.

For instance, if we have determined that we need 32 cores to be well within the latency requirement for the volume of data to process, we can create an Amazon Web Services Glue 3.0 cluster with 9 G.1X nodes (a driver and 8 workers with 4 cores each = 32) that reads from a Kinesis data stream with 32 shards.

Imagine that the volume of data in that stream doubles and we want to keep the latency requirements. To do so, we double the number of workers (16 + 1 driver = 17) and the number of shards on the stream (now 64). Remember this is just a reference and needs to be validated; in practice you might need more or fewer nodes depending on the cluster size, whether the destination system can keep up, the complexity of the transformations, or other parameters.
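As an illustrative sketch, the initial 9-node job in this example could be created through the Amazon Web Services Glue API, shown here with boto3; the job name, script location, and role are placeholders:
```
import boto3

glue = boto3.client("glue")
glue.create_job(
    Name="my-streaming-job",
    Role="MyGlueServiceRole",  # placeholder IAM role
    Command={
        "Name": "gluestreaming",  # streaming job type
        "ScriptLocation": "s3://your_bucket/scripts/streaming_job.py",
        "PythonVersion": "3",
    },
    GlueVersion="3.0",
    WorkerType="G.1X",
    NumberOfWorkers=9,  # a driver plus 8 workers with 4 cores each = 32 cores
)
```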
#### Sizing the cluster when you don’t control the message system configuration
In this case, your options for tuning are much more limited.

Check if a cluster with the same number of Spark cores as existing partitions (determined by the message system) is able to keep up with the expected volume of data and latency, plus some allowance for peak times.

If that’s not the case, adding more nodes alone won’t help. You need to repartition the incoming data inside Amazon Web Services Glue. This operation adds an overhead to redistribute the data internally, but it’s the only way the job can scale out in this scenario.

Let’s illustrate with an example. Imagine we have a Kinesis data stream with one shard that we don’t control, and there isn’t enough volume to justify asking the owner to increase the shards. In the cluster, significant computing for each message is needed; for each message, it runs heuristics and other ML techniques to take action depending on the calculations. After running some benchmarks, the calculations can be done promptly for the expected volume of messages using 8 cores working in parallel. By default, because there is only one shard, only one core will process all the messages sequentially.

To solve this scenario, we can provision an Amazon Web Services Glue 3.0 cluster with 3 G.1X nodes to have 8 worker cores available. In the code, a repartition of the batch distributes the messages randomly (as evenly as possible) between those cores:

```
def batch_function(data_frame, batch_id):
    # Repartition so the UDF is called in parallel for each partition
    data_frame.repartition(8).foreach(process_event_udf)

glueContext.forEachBatch(frame=streaming_df, batch_function=batch_function)
```

If the messaging system resizes the number of partitions or shards, the job picks up this change on the next batch. You might need to adjust the cluster capacity accordingly with the new data volume.

The streaming job is able to process more partitions than there are Spark cores available, but this might cause inefficiencies because the additional partitions are queued and don’t start being processed until others finish. This might result in many nodes being idle while the remaining partitions finish and the next batch can be triggered.

#### When the messages have processing interdependencies
If the messages to be processed depend on other messages (either in the same or previous batches), that’s likely to be a limiting factor on the scalability. In that case, it might help to analyze a batch (a job in the Spark UI) to see where the time is spent and if there are imbalances, by checking the task duration percentiles on the **Stages** tab (you can also reach this page by choosing a stage on the **Jobs** tab).

### Auto scaling
Up to now, you have seen sizing methods to handle a stable stream of data with the occasional peak.
However, for variable incoming volumes of data, this isn’t cost-effective because you need to size for the worst-case scenario or accept higher latency at peak times.

This is where Amazon Web Services Glue Streaming 3.0 auto scaling comes in. You can enable it for the job and define the maximum number of workers you want to allow (for example, using the number you have determined is needed for peak times).

The runtime monitors the trend of time spent on batch processing and compares it with the configured interval. Based on that, it makes a decision to increase or decrease the number of workers as needed, being more aggressive as the batch times get near or go over the allowed interval time.

The following screenshot is an example of a streaming job with auto scaling enabled.

![image.png](https://awsdevweb.s3.cn-north-1.amazonaws.com.cn/a5ecd567f72441cea9c0b9d244b709cb_image.png)

#### Splitting workloads
You have seen how to scale a single job by adding nodes and partitioning the data as needed, which is enough in most cases. As the cluster grows, there is still a single driver and the nodes have to wait for the others to complete the batch before they can take additional work. If you reach a point where increasing the cluster size is no longer effective, you might want to consider splitting the workload between separate jobs.

In the case of Kinesis, you need to divide the data into multiple streams, but for Apache Kafka, you can divide a topic into multiple jobs by assigning partitions to each one. To do so, instead of the usual ```subscribe``` or ```subscribePattern``` where the topics are listed, use the ```assign``` property to assign, using JSON, a subset of the topic partitions that the job will handle (for example, ```{"topic1": [0,1,2]}```). At the time of this writing, it’s not possible to specify a range, so you need to list all the partitions, for instance building that list dynamically in the code.
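A minimal sketch of what the source options might look like for one of those jobs; the broker, topic, and partition numbers are placeholders:
```
kafka_properties = {
    "kafka.bootstrap.servers": "bootstrapserver1:9092",
    # Instead of "subscribe", pin this job to a subset of the topic partitions
    "assign": '{"topic1": [0, 1, 2, 3]}',
    "startingOffsets": "latest",
    "maxOffsetsPerTrigger": "5000000"
}
kafkaDF = spark.readStream.format("kafka").options(**kafka_properties).load()
```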
#### Sizing down
For low volumes of traffic, Amazon Web Services Glue Streaming has a special type of small node: G 0.25X, which provides two cores and 4 GB RAM for a quarter of the cost of a DPU, so it’s very cost-effective. However, even with that frugal capacity, if you have many small streams, having a small cluster for each one is still not practical.

For such situations, there are currently a few options:

- Configure the stream DataFrame to feed from multiple Kafka topics or Kinesis streams. Then in the DataFrame, use the columns ```topic``` and ```streamName```, for Kafka and Kinesis sources respectively, to determine how to handle the data (for example, different transformations or destinations). Make sure the DataFrame is cached, so you don’t read the data multiple times.
- If you have a mix of Kafka and Kinesis sources, you can define a DataFrame for each, join them, and process as needed using the columns mentioned in the previous point.
- The preceding two cases require all the sources to have the same batch interval and link their processing (for example, a busier stream can delay a slower one). To have independent stream processing inside the same cluster, you can trigger the processing of separate streams’ DataFrames using separate threads. Each stream is monitored separately in the Spark UI, but you’re responsible for starting and managing those threads and handling errors.

### Settings
In this post, we showed some config settings that impact performance. The following table summarizes the ones we discussed and other important config properties to use when creating the input stream DataFrame.

![image.png](https://awsdevweb.s3.cn-north-1.amazonaws.com.cn/19e9a62b9dca475889dff9b85032087d_image.png)

### Further optimizations
In general, it’s worth doing some compression on the messages to save on transfer time (at the expense of some CPU, depending on the compression type used).

If the producer compresses the messages individually, Amazon Web Services Glue can detect it and decompress automatically in most cases, depending on the format and type. For more information, refer to [Adding Streaming ETL Jobs in Amazon Web Services Glue](https://docs.aws.amazon.com/glue/latest/dg/add-job-streaming.html).

If using Kafka, you have the option to compress the topic. This way, the compression is more effective because it’s done in batches, end-to-end, and it’s transparent to the producer and consumer.

By default, the ```GlueContext.forEachBatch``` function caches the incoming data. This is helpful if the data needs to be sent to multiple sinks (for example, as [Amazon S3](https://aws.amazon.com/s3/) files and also to update a DynamoDB table) because otherwise the job would read the data multiple times from the source. But it can be detrimental to performance if the volume of data is big and there is only one output.

To disable this option, set ```persistDataFrame``` to ```false```:

```
glueContext.forEachBatch(
    frame=myStreamDataFrame,
    batch_function=processBatch,
    options={
        "windowSize": "30 seconds",
        "checkpointLocation": myCheckpointPath,
        "persistDataFrame": "false"
    }
)
```

In streaming jobs, it’s common to have to join streaming data with another DataFrame to do enrichment (for example, lookups). In that case, you want to avoid any shuffle if possible, because it splits stages and causes data to be moved between nodes.

When the DataFrame you’re joining to is small enough to fit in memory, consider using a broadcast join. However, bear in mind it will be distributed to the nodes on every batch, so it might not be worth it if the batches are too small.
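For instance, the following is a sketch of a broadcast join inside the batch function; the reference table, join key, and path are hypothetical:
```
from pyspark.sql.functions import broadcast

# Small reference table used to enrich every micro-batch
products_df = spark.read.parquet("s3://your_bucket/reference/products/")

def process_batch(data_frame, batch_id):
    # Broadcast the small side so the join avoids shuffling the streaming data
    enriched = data_frame.join(broadcast(products_df), on="product_id", how="left")
    # ... write enriched to the destination ...
```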
If you need to shuffle, consider enabling the [Kryo serializer](https://spark.apache.org/docs/latest/tuning.html#data-serialization) (if using custom serializable classes, you need to register them first to use it).

As in any Amazon Web Services Glue job, avoid using custom ```udf()``` if you can do the same with the provided API, like Spark SQL. User-defined functions (UDFs) prevent the runtime engine from performing many optimizations (the UDF code is a black box for the engine), and in the case of Python, they force the movement of data between processes.

Avoid generating too many small files (especially columnar formats like Parquet or ORC, which have overhead per file). To do so, it might be a good idea to coalesce the micro-batch DataFrame before writing the output. If you’re writing partitioned data to [Amazon S3](https://aws.amazon.com/s3/), repartitioning based on the partition columns can significantly reduce the number of output files created.

### Conclusion
In this post, you saw how to approach sizing and tuning an Amazon Web Services Glue streaming job in different scenarios, including planning considerations, configuration, monitoring, tips, and pitfalls.

You can now use these techniques to monitor and improve your existing streaming jobs or use them when designing and building new ones.

**About the author**

![image.png](https://dev-media.amazoncloud.cn/f2b7bd68a6d4436cacae3a34fc6033f4_image.png)

**Gonzalo Herreros** is a Senior Big Data Architect on the Amazon Web Services Glue team.
This can cover a wide range of cases, such as log processing and alarming, continuous data ingestion and enrichment, data validation, internet of things, machine learning (ML), and more.</p>\n<p>We discuss the following topics:</p>\n<ul>\\n<li>Development tools that help you code faster using our newly launched [Amazon Web Services Glue Studio notebooks](Amazon Web Services Glue Studio notebooks)</li>\n<li>How to monitor and tune your streaming jobs</li>\n<li>Best practices for sizing and scaling your Amazon Web Services Glue cluster, using our newly launched features like <strong>auto scaling</strong> and the small worker type <strong>G 0.25X</strong></li>\\n</ul>\n<h3><a id=\\"Development_tools_10\\"></a>Development tools</h3>\\n<p><a href=\\"https://docs.aws.amazon.com/glue/latest/ug/notebooks-chapter.html\\" target=\\"_blank\\">Amazon Web Services Glue Studio notebooks</a> can speed up the development of your streaming job by allowing data engineers to work using an interactive notebook and test code changes to get quick feedback—from business logic coding to testing configuration changes—as part of tuning.</p>\\n<p>Before you run any code in the notebook (which would start the session), you need to set some important configurations.</p>\n<p>The magic <code>%streaming</code> creates the session cluster using the same runtime as Amazon Web Services Glue streaming jobs. This way, you interactively develop and test your code using the same runtime that you use later in the production job.</p>\\n<p>Additionally, configure Spark UI logs, which will be very useful for monitoring and tuning the job.</p>\n<p>See the following configuration:</p>\n<pre><code class=\\"lang-\\">%streaming\\n%%configure\\n{\\n&quot;--enable-spark-ui&quot;: &quot;true&quot;,\\n&quot;--spark-event-logs-path&quot;: &quot;s3://your_bucket/sparkui/&quot;\\n}\\n</code></pre>\\n<p>For additional configuration options such as version or number of workers, refer to <a href=\\"https://docs.aws.amazon.com/glue/latest/dg/interactive-sessions-magics.html\\" target=\\"_blank\\">Configuring Amazon Web Services Glue interactive sessions for Jupyter and Amazon Web Services Glue Studio notebooks.</a></p>\\n<p>To visualize the Spark UI logs, you need a Spark history server. 
If you don’t have one already, refer to <a href=\\"https://docs.aws.amazon.com/glue/latest/dg/interactive-sessions-magics.html\\" target=\\"_blank\\">Launching the Spark History Server</a> for deployment instructions.</p>\\n<p>Structured Streaming is based on streaming DataFrames, which represent micro-batches of messages.<br />\\nThe following code is an example of creating a stream DataFrame using Amazon Kinesis as the source:</p>\n<pre><code class=\\"lang-\\">kinesis_options = {\\n &quot;streamARN&quot;: &quot;arn:aws:kinesis:us-east-2:777788889999:stream/fromOptionsStream&quot;,\\n &quot;startingPosition&quot;: &quot;TRIM_HORIZON&quot;,\\n &quot;inferSchema&quot;: &quot;true&quot;,\\n &quot;classification&quot;: &quot;json&quot;\\n}\\nkinesisDF = glueContext.create_data_frame_from_options(\\n connection_type=&quot;kinesis&quot;,\\n connection_options=kinesis_options\\n)\\n</code></pre>\\n<p>The Amazon Web Services Glue API helps you create the DataFrame by doing schema detection and auto decompression, <a href=\\"https://docs.aws.amazon.com/glue/latest/dg/add-job-streaming.html\\" target=\\"_blank\\">depending on the format.</a> You can also build it yourself using the Spark API directly:</p>\\n<pre><code class=\\"lang-\\">kinesisDF = spark.readStream.format(&quot;kinesis&quot;).options(**kinesis_options).load()\\n</code></pre>\\n<p>After your run any code cell, it triggers the startup of the session, and the application soon appears in the history server as an incomplete app (at the bottom of the page there is a link to display incomplete apps) named GlueReplApp, because it’s a session cluster. For a regular job, it’s listed with the job name given when it was created.</p>\n<p><img src=\\"https://awsdevweb.s3.cn-north-1.amazonaws.com.cn/4955c46371904a219afdc85146118976_image.png\\" alt=\\"image.png\\" /></p>\n<p>From the notebook, you can <a href=\\"https://docs.aws.amazon.com/glue/latest/dg/interactive-sessions-streaming.html#w51aac26c19c35b7\\" target=\\"_blank\\">take a sample of the streaming data</a>. This can help development and give an indication of the type and size of the streaming messages, which might impact performance.</p>\\n<h3><a id=\\"Monitor_the_cluster_with_Structured_Streaming_56\\"></a>Monitor the cluster with Structured Streaming</h3>\\n<p>The best way to monitor and tune your Amazon Web Services Glue streaming job is using the Spark UI; it gives you the overall streaming job trends on the <strong>Structured Streaming</strong> tab and the details of each individual micro-batch processing job.</p>\\n<h4><a id=\\"Overall_view_of_the_streaming_job_59\\"></a>Overall view of the streaming job</h4>\\n<p>On the <strong>Structured Streaming</strong> tab, you can see a summary of the streams running in the cluster, as in the following example.</p>\\n<p><img src=\\"https://awsdevweb.s3.cn-north-1.amazonaws.com.cn/6dc12bae0ad544fe87968dc5bcb8c6af_image.png\\" alt=\\"image.png\\" /></p>\n<p>Normally there is just one streaming query, representing a streaming ETL. If you start multiple in parallel, it’s good if you give it a recognizable name, calling <code>queryName()</code> if you use the <code>writeStream</code> API directly on the DataFrame.</p>\\n<p>After a good number of batches are complete (such as 10), enough for the averages to stabilize, you can use <strong>Avg Input/sec</strong> column to monitor how many events or messages the job is processing. 
This can be confusing because the column to the right, <strong>Avg Process/sec</strong>, is similar but often has a higher number. The difference is that this process time tells us how efficient our code is, whereas the average input tells us how many messages the cluster is reading and processing.</p>\\n<p>The important thing to note is that if the two values are similar, it means the job is working at maximum capacity. It’s making the best use of the hardware but it likely won’t be able to cope with an increase in volume without causing delays.</p>\n<p>In the last column is the latest batch number. Because they’re numbered incrementally from zero, this tells us how many batches the query has processed so far.</p>\n<p>When you choose the link in the “Run ID” column of a streaming query, you can review the details with graphs and histograms, as in the following example.</p>\n<p><img src=\\"https://awsdevweb.s3.cn-north-1.amazonaws.com.cn/4af3e7271c4a43ceb062fa7cb45cd843_image.png\\" alt=\\"image.png\\" /></p>\n<p>The first two rows correspond to the data that is used to calculate the averages shown on the summary page.</p>\n<p>For <strong>Input Rate</strong>, each data point is calculated by dividing the number of events read for the batch by the time passed between the current batch start and the previous batch start. In a healthy system that is able to keep up, this is equal to the configured trigger interval (in the <code>GlueContext.forEachBatch()</code> API, this is set using the option <code>windowSize</code>).</p>\\n<p>Because it uses the current batch rows with the previous batch latency, this graph is often unstable in the first batches until the <strong>Batch Duration</strong> (the last line graph) stabilizes.</p>\\n<p>In this example, when it stabilizes, it gets completely flat. This means that either the influx of messages is constant or the job is hitting the limit per batch set (we discuss how to do this later in the post).</p>\n<p>Be careful if you set a limit per batch that is constantly hit, you could be silently building a backlog, but everything could look good in the job metrics. To monitor this, have a metric of latency measuring the difference between the message timestamp when it gets created and the time it’s processed.</p>\n<p><strong>Process Rate</strong> is calculated by dividing the number of messages in a batch by the time it took to process that batch. For instance, if the batch contains 1,000 messages, and the trigger interval is 10 seconds but the batch only needed 5 seconds to process it, the process rate would be 1000/5 = 200 msg/sec. while the input rate for that batch (assuming the previous batch also ran within the interval) is 1000/10 = 100 msg/sec.</p>\\n<p>This metric is useful to measure how efficient our code processing the batch is, and therefore it can get higher than the input rate (this doesn’t mean it’s processing more messages, just using less time). 
As mentioned earlier, if both metrics get close, it means the batch duration is close to the interval and therefore additional traffic is likely to start causing batch trigger delays (because the previous batch is still running) and increase latency.</p>\n<p>Later in this post, we show how auto scaling can help prevent this situation.</p>\n<p><strong>Input Rows</strong> shows the number of messages read for each batch, like input rate, but using volume instead of rate.</p>\\n<p>It’s important to note that if the batch processes the data multiple times (for example, writing to multiple destinations), the messages are counted multiple times. If the rates are greater than the expected, this could be the reason. In general, to avoid reading messages multiple times, you should cache the batch while processing it, which is the default when you use the <code>GlueContext.forEachBatch()</code> API.</p>\\n<p>The last two rows tell us how long it takes to process each batch and how is that time spent. It’s normal to see the first batches take much longer until the system warms up and stabilizes.<br />\\nThe important thing to look for is that the durations are roughly stable and well under the configured trigger interval. If that’s not the case, the next batch gets delayed and could start a compounding delay by building a backlog or increasing batch size (if the limit allows taking the extra messages pending).</p>\n<p>In <strong>Operation Duration</strong>, the majority of time should be spent on addBatch (the mustard color), which is the actual work. The rest are fixed overhead, therefore the smaller the batch process, the more percentage of time that will take. This represents the trade-off between small batches with lower latency or bigger batches but more computing efficient.</p>\\n<p>Also, it’s normal for the first batch to spend significant time in the latestOffset (the brown bar), locating the point at which it needs to start processing when there is no checkpoint.</p>\n<p>The following query statistics show another example.</p>\n<p><img src=\\"https://awsdevweb.s3.cn-north-1.amazonaws.com.cn/8f2f446016b2459087338cd130d71bab_image.png\\" alt=\\"image.png\\" /></p>\n<p>In this case, the input has some variation (meaning it’s not hitting the batch limit). Also, the process rate is roughly the same as the input rate. This tells us the system is at max capacity and struggling to keep up. By comparing the input rows and input rate, we can guess that the interval configured is just 3 seconds and the batch duration is barely able to meet that latency.</p>\n<p>Finally, in <strong>Operation Duration</strong>, you can observe that because the batches are so frequent, a significant amount of time (proportionally speaking) is spent saving the checkpoint (the dark green bar).</p>\\n<p>With this information, we can probably improve the stability of the job by increasing the trigger interval to 5 seconds or more. This way, it checkpoints less often and has more time to process data, which might be enough to get batch duration consistently under the interval. The trade-off is that the latency between when a message is published and when it’s processed is longer.</p>\n<h4><a id=\\"Monitor_individual_batch_processing_113\\"></a>Monitor individual batch processing</h4>\\n<p>On the <strong>Jobs</strong> tab, you can see how long each batch is taking and dig into the different steps the processing involves to understand how the time is spent. You can also check if there are tasks that succeed after retry. 
If this happens continuously, it can silently hurt performance.</p>\\n<p>For instance, the following screenshot shows the batches on the <strong>Jobs</strong> tab of the Spark UI of our streaming job.</p>\\n<p><img src=\\"https://awsdevweb.s3.cn-north-1.amazonaws.com.cn/f880b50d3f8d452685f54c7c1e4cc7a3_image.png\\" alt=\\"image.png\\" /></p>\n<p>Each batch is considered a job by Spark (don’t confuse the job ID with the batch number; they only match if there is no other action). The job group is the streaming query ID (this is important only when running multiple queries).</p>\n<p>The streaming job in this example has a single stage with 100 partitions. Both batches processed them successfully, so the stage is marked as succeeded and all the tasks completed (100/100 in the progress bar).</p>\n<p>However, there is a difference in the first batch: there were 20 task failures. You know all the failed tasks succeeded in the retries, otherwise the stage would have been marked as failed. For the stage to fail, the same task would have to fail four times (or as configured by <code>spark.task.maxFailures</code>).</p>\\n<p>If the stage fails, the batch fails as well and possibly the whole job; if the job was started by using <code>GlueContext.forEachBatch()</code>, it has a number of retries as per the <code>batchMaxRetries</code> parameter (three by default).</p>\\n<p>These failures are important because they have two effects:</p>\n<ul>\\n<li>They can silently cause delays in the batch processing, depending on how long it took to fail and retry.</li>\n<li>They can cause records to be sent multiple times if the failure is in the last stage of the batch, depending on the type of output. If the output is files, in general it won’t cause duplicates. However, if the destination is <a href=\\"https://aws.amazon.com/dynamodb/\\" target=\\"_blank\\">Amazon DynamoDB</a>, JDBC, <a href=\\"https://aws.amazon.com/opensearch-service/\\" target=\\"_blank\\">Amazon OpenSearch Service</a>, or another output that uses batching, it’s possible that some part of the output has already been sent. If you can’t tolerate any duplicates, the destination system should handle this (for example, being idempotent).</li>\\n</ul>\n<p>Choosing the description link takes you to the <strong>Stages</strong> tab for that job. Here you can dig into the failure: What is the exception? Is it always in the same executor? Does it succeed on the first retry or took multiple?</p>\\n<p>Ideally, you want to identify these failures and solve them. For example, maybe the destination system is throttling us because doesn’t have enough provisioned capacity, or a larger timeout is needed. Otherwise, you should at least monitor it and decide if it is systemic or sporadic.</p>\n<h3><a id=\\"Sizing_and_scaling_139\\"></a>Sizing and scaling</h3>\\n<p>Defining how to split the data is a key element in any distributed system to run and scale efficiently. The design decisions on the messaging system will have a strong influence on how the streaming job will perform and scale, and thereby affect the job parallelism.</p>\n<p>In the case of Amazon Web Services Glue Streaming, this division of work is based on Apache Spark partitions, which define how to split the work so it can be processed in parallel. Each time the job reads a batch from the source, it divides the incoming data into Spark partitions.</p>\n<p>For Apache Kafka, each topic partition becomes a Spark partition; similarly, for Kinesis, each stream shard becomes a Spark partition. 
To simplify, I’ll refer to this parallelism level as number of partitions, meaning Spark partitions that will be determined by the input Kafka partitions or Kinesis shards on a one-to-one basis.</p>\n<p>The goal is to have enough parallelism and capacity to process each batch of data in less time than the configured batch interval and therefore be able to keep up. For instance, with a batch interval of 60 seconds, the job lets 60 seconds of data build up and then processes that data. If that work takes more than 60 seconds, the next batch waits until the previous batch is complete before starting a new batch with the data that has built up since the previous batch started.</p>\n<p>It’s a good practice to limit the amount of data to process in a single batch, instead of just taking everything that has been added since the last one. This helps make the job more stable and predictable during peak times. It allows you to test that the job can handle volume of data without issues (for example, memory or throttling).</p>\n<p>To do so, specify a limit when defining the source stream DataFrame:</p>\n<ul>\\n<li>For Kinesis, specify the limit using <code>kinesis.executor.maxFetchRecordsPerShard</code>, and revise this number if the number of shards changes substantially. You might need to increase <code>kinesis.executor.maxFetchTimeInMs</code> as well, in order to allow more time to read the batch and make sure it’s not truncated.</li>\\n<li>For Kafka, set <code>maxOffsetsPerTrigger</code>, which divides that allowance equally between the number of partitions.<br />\\nThe following is an example of setting this config for Kafka (for Kinesis, it’s equivalent but using Kinesis properties):</li>\n</ul>\\n<pre><code class=\\"lang-\\">kafka_properties= {\\n &quot;kafka.bootstrap.servers&quot;: &quot;bootstrapserver1:9092&quot;,\\n &quot;subscribe&quot;: &quot;mytopic&quot;,\\n &quot;startingOffsets&quot;: &quot;latest&quot;,\\n &quot;maxOffsetsPerTrigger&quot;: &quot;5000000&quot;\\n}\\n# Pass the properties as options when creating the DataFrame\\nspark.spark.readStream.format(&quot;kafka&quot;).options(**kafka_properties).load()\\n</code></pre>\\n<h4><a id=\\"Initial_benchmark_167\\"></a>Initial benchmark</h4>\\n<p>If the events can be processed individually (no interdependency such as grouping), you can get a rough estimation of how many messages a single Spark core can handle by running with a single partition source (one Kafka partition or one Kinesis shard stream) with data preloaded into it and run batches with a limit and the minimum interval (1 second). This simulates a stress test with no downtime between batches.</p>\n<p>For these repeated tests, clear the checkpoint directory, use a different one (for example, make it dynamic using the timestamp in the path), or just disable the checkpointing (if using the Spark API directly), so you can reuse the same data.<br />\\nLeave a few batches to run (at least 10) to give time for the system and the metrics to stabilize.</p>\n<p>Start with a small limit (using the limit configuration properties explained in the previous section) and do multiple reruns, increasing the value. Record the batch duration for that limit and the throughput input rate (because it’s a stress test, the process rate should be similar).</p>\n<p>In general, larger batches tend to be more efficient up to a point. 
This is because the fixed overhead taken for each to checkpoint, plan, and coordinate the nodes is more significant if the batches are smaller and therefore more frequent.</p>\n<p>Then pick your reference initial settings based on the requirements:</p>\n<ul>\\n<li>If a goal SLA is required, use the largest batch size whose batch duration is less than half the latency SLA. This is because in the worst case, a message that is stored just after a batch is triggered has to wait at least the interval and then the processing time (which should be less than the interval). When the system is keeping up, the latency in this worst case would be close to twice the interval, so aim for the batch duration to be less than half the target latency.</li>\n<li>In the case where the throughput is the priority over latency, just pick the batch size that provides a higher average process rate and define an interval that allows some buffer over the observed batch duration.</li>\n</ul>\\n<p>Now you have an idea of the number of messages per core our ETL can handle and the latency. These numbers are idealistic because the system won’t scale perfectly linearly when you add more partitions and nodes. You can use the messages per core obtained to divide the total number of messages per second to process and get the minimum number of Spark partitions needed (each core handles one partition in parallel).</p>\n<p>With this number of estimated Spark cores, calculate the number of nodes needed depending on the type and version, as summarized in the following table.</p>\n<p><img src=\\"https://awsdevweb.s3.cn-north-1.amazonaws.com.cn/ca132361ec5a422186ee94a087079ba6_image.png\\" alt=\\"image.png\\" /></p>\n<p>Using the newer version 3 is preferable because it includes more optimizations and features like auto scaling (which we discuss later). Regarding size, unless the job has some operation that is heavy on memory, it’s preferable to use the smaller instances so there aren’t so many cores competing for memory, disk, and network shared resources.</p>\n<p>Spark cores are equivalent to threads; therefore, you can have more (or less) than the actual cores available in the instance. This doesn’t mean that having more Spark cores is going to necessarily be faster if they’re not backed by physical cores, it just means you have more parallelism competing for the same CPU.</p>\n<h4><a id=\\"Sizing_the_cluster_when_you_control_the_input_message_system_193\\"></a>Sizing the cluster when you control the input message system</h4>\\n<p>This is the ideal case because you can optimize the performance and the efficiency as needed.</p>\n<p>With the benchmark information you just gathered, you can define your initial Amazon Web Services Glue cluster size and configure Kafka or Kinesis with the number of partitions or topics estimated, plus some buffer. Test this baseline setup and adjust as needed until the job can comfortably meet the total volume and required latency.</p>\n<p>For instance, if we have determined that we need 32 cores to be well within the latency requirement for the volume of data to process, then we can create an Amazon Web Services Glue 3.0 cluster with 9 G.1X nodes (a driver and 8 workers with 4 cores = 32) which reads from a Kinesis data stream with 32 shards.</p>\n<p>Imagine that the volume of data in that stream doubles and we want to keep the latency requirements. To do so, we double the number of workers (16 + 1 driver = 17) and the number of shards on the stream (now 64). 
Remember this is just a reference and needs to be validated; in practice you might need more or less nodes depending on the cluster size, if the destination system can keep up, complexity of transformations, or other parameters.</p>\n<h4><a id=\\"Sizing_the_cluster_when_you_dont_control_the_message_system_configuration_202\\"></a>Sizing the cluster when you don’t control the message system configuration</h4>\\n<p>In this case, your options for tuning are much more limited.</p>\n<p>Check if a cluster with the same number of Spark cores as existing partitions (determined by the message system) is able to keep up with the expected volume of data and latency, plus some allowance for peak times.</p>\n<p>If that’s not the case, adding more nodes alone won’t help. You need to repartition the incoming data inside Amazon Web Services Glue. This operation adds an overhead to redistribute the data internally, but it’s the only way the job can scale out in this scenario.</p>\n<p>Let’s illustrate with an example. Imagine we have a Kinesis data stream with one shard that we don’t control, and there isn’t enough volume to justify asking the owner to increase the shards. In the cluster, significant computing for each message is needed; for each message, it runs heuristics and other ML techniques to take action depending on the calculations. After running some benchmarks, the calculations can be done promptly for the expected volume of messages using 8 cores working in parallel. By default, because there is only one shard, only one core will process all the messages sequentially.</p>\n<p>To solve this scenario, we can provision an Amazon Web Services Glue 3.0 cluster with 3 G 1X nodes to have 8 worker cores available. In the code repartition, the batch distributes the messages randomly (as evenly as possible) between them:</p>\n<pre><code class=\\"lang-\\">def batch_function(data_frame, batch_id):\\n # Repartition so the udf is called in parallel for each partition\\n data_frame.repartition(8).foreach(process_event_udf)\\n\\nglueContext.forEachBatch(frame=streaming_df, batch_function=batch_function)\\n</code></pre>\\n<p>If the messaging system resizes the number of partitions or shards, the job picks up this change on the next batch. You might need to adjust the cluster capacity accordingly with the new data volume.</p>\n<p>The streaming job is able to process more partitions than Spark cores are available, but might cause inefficiencies because the additional partitions will be queued and won’t start being processed until others finish. This might result in many nodes being idle while the remaining partitions finish and the next batch can be triggered.</p>\n<h4><a id=\\"When_the_messages_have_processing_interdependencies_225\\"></a>When the messages have processing interdependencies</h4>\\n<p>If the messages to be processed depend on other messages (either in the same or previous batches), that’s likely to be a limiting factor on the scalability. 
<h4><a id=\\"Splitting_workloads_240\\"></a>Splitting workloads</h4>\\n<p>You have seen how to scale a single job by adding nodes and partitioning the data as needed, which is enough in most cases. As the cluster grows, there is still a single driver, and the nodes have to wait for the others to complete the batch before they can take additional work. If it reaches a point where increasing the cluster size is no longer effective, you might want to consider splitting the workload between separate jobs.</p>\n<p>In the case of Kinesis, you need to divide the data into multiple streams, but for Apache Kafka, you can divide a topic into multiple jobs by assigning partitions to each one. To do so, instead of the usual <code>subscribe</code> or <code>subscribePattern</code> options where the topics are listed, use the <code>assign</code> option to specify, as JSON, the subset of topic partitions that the job will handle (for example, <code>{&quot;topic1&quot;: [0,1,2]}</code>). At the time of this writing, it isn’t possible to specify a range, so you need to list all the partitions, for instance building that list dynamically in the code.</p>
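<p>The following is a minimal sketch of that approach using the Spark Kafka source directly; the topic name, bootstrap servers, and partition numbers are hypothetical, and the partition list is built dynamically as suggested above.</p>\n<pre><code class=\\"lang-\\">import json\\n\\n# Hypothetical values: this job handles partitions 0-7 of 'topic1'\\ntopic = 'topic1'\\npartitions_for_this_job = list(range(0, 8))\\n\\nkafka_options = {\\n    'kafka.bootstrap.servers': 'broker1:9092,broker2:9092',\\n    'assign': json.dumps({topic: partitions_for_this_job}),\\n    'startingOffsets': 'latest'\\n}\\n\\n# Each job in the split reads only its assigned partitions\\nkafkaDF = spark.readStream.format('kafka').options(**kafka_options).load()\\n</code></pre>\\n<p>A second job would be configured the same way with a different partition list, so together the jobs cover the whole topic.</p>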
<h4><a id=\\"Sizing_down_245\\"></a>Sizing down</h4>\\n<p>For low volumes of traffic, Amazon Web Services Glue Streaming has a special type of small node: G 0.25X, which provides two cores and 4 GB RAM for a quarter of the cost of a DPU, so it’s very cost-effective. However, even with that frugal capacity, if you have many small streams, having a small cluster for each one is still not practical.</p>\n<p>For such situations, there are currently a few options:</p>\n<ul>\\n<li>Configure the stream DataFrame to feed from multiple Kafka topics or Kinesis streams. Then in the DataFrame, use the columns <code>topic</code> and <code>streamName</code>, for Kafka and Kinesis sources respectively, to determine how to handle the data (for example, different transformations or destinations). Make sure the DataFrame is cached, so you don’t read the data multiple times.</li>\\n<li>If you have a mix of Kafka and Kinesis sources, you can define a DataFrame for each, combine them, and process them as needed using the columns mentioned in the previous point.</li>\n<li>The preceding two cases require all the sources to have the same batch interval and link their processing (for example, a busier stream can delay a slower one). To have independent stream processing inside the same cluster, you can trigger the processing of the separate streams’ DataFrames using separate threads. Each stream is monitored separately in the Spark UI, but you’re responsible for starting and managing those threads and handling errors.</li>\n</ul>\\n<h3><a id=\\"Settings_255\\"></a>Settings</h3>\\n<p>In this post, we showed some configuration settings that impact performance. The following table summarizes the ones we discussed and other important configuration properties to use when creating the input stream DataFrame.</p>\n<p><img src=\\"https://awsdevweb.s3.cn-north-1.amazonaws.com.cn/19e9a62b9dca475889dff9b85032087d_image.png\\" alt=\\"image.png\\" /></p>\n<h3><a id=\\"Further_optimizations_260\\"></a>Further optimizations</h3>\\n<p>In general, it’s worth applying some compression to the messages to save on transfer time (at the expense of some CPU, depending on the compression type used).</p>\n<p>If the producer compresses the messages individually, Amazon Web Services Glue can detect it and decompress automatically in most cases, depending on the format and type. For more information, refer to <a href=\\"https://docs.aws.amazon.com/glue/latest/dg/add-job-streaming.html\\" target=\\"_blank\\">Adding Streaming ETL Jobs in Amazon Web Services Glue</a>.</p>\\n<p>If using Kafka, you have the option to compress the topic. This way, the compression is more effective because it’s done in batches, end-to-end, and it’s transparent to the producer and consumer.</p>\n<p>By default, the <code>GlueContext.forEachBatch</code> function caches the incoming data. This is helpful if the data needs to be sent to multiple sinks (for example, as <a href=\\"https://aws.amazon.com/cn/s3/?trk=cndc-detail\\" target=\\"_blank\\">Amazon S3</a> files and also to update a DynamoDB table), because otherwise the job would read the data multiple times from the source. But it can be detrimental to performance if the volume of data is big and there is only one output.</p>\\n<p>To disable this option, set <code>persistDataFrame</code> to <code>false</code>:</p>\\n<pre><code class=\\"lang-\\">glueContext.forEachBatch(\\n frame=myStreamDataFrame,\\n batch_function=processBatch,\\n options={\\n &quot;windowSize&quot;: &quot;30 seconds&quot;,\\n &quot;checkpointLocation&quot;: myCheckpointPath,\\n &quot;persistDataFrame&quot;: &quot;false&quot;\\n }\\n)\\n</code></pre>\\n<p>In streaming jobs, it’s common to have to join the streaming data with another DataFrame to do enrichment (for example, lookups). In that case, you want to avoid shuffles if possible, because they split stages and cause data to be moved between nodes.</p>\n<p>When the DataFrame you’re joining with is small enough to fit in memory, consider using a broadcast join. However, bear in mind that it will be distributed to the nodes on every batch, so it might not be worth it if the batches are too small.</p>
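<p>As an illustration, the following is a minimal sketch of a broadcast join inside the batch function; the lookup table location, join key, and output path are hypothetical placeholders, and <code>streaming_df</code> and <code>myCheckpointPath</code> stand for the stream DataFrame and checkpoint location from the earlier examples.</p>\n<pre><code class=\\"lang-\\">from pyspark.sql.functions import broadcast\\n\\n# Hypothetical small lookup table used to enrich the stream\\nlookup_df = spark.read.parquet('s3://your_bucket/reference/customers/')\\n\\ndef enrich_batch(data_frame, batch_id):\\n    # Broadcasting the small side avoids shuffling the streaming micro-batch\\n    enriched = data_frame.join(broadcast(lookup_df), 'customer_id', 'left')\\n    # Coalesce to limit the number of small files written per micro-batch\\n    enriched.coalesce(4).write.mode('append').parquet('s3://your_bucket/output/')\\n\\nglueContext.forEachBatch(\\n    frame=streaming_df,\\n    batch_function=enrich_batch,\\n    options={'windowSize': '30 seconds', 'checkpointLocation': myCheckpointPath}\\n)\\n</code></pre>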
<p>If you need to shuffle, consider enabling the <a href=\\"https://spark.apache.org/docs/latest/tuning.html#data-serialization\\" target=\\"_blank\\">Kryo serializer</a> (if you use custom serializable classes, you need to register them first).</p>\\n<p>As in any Amazon Web Services Glue job, avoid using custom <code>udf()</code> functions if you can achieve the same result with the provided APIs, such as Spark SQL functions. User-defined functions (UDFs) prevent the runtime engine from performing many optimizations (the UDF code is a black box for the engine), and in the case of Python, they force the movement of data between processes.</p>\\n<p>Avoid generating too many small files (especially columnar formats like Parquet or ORC, which have a per-file overhead). To do so, it might be a good idea to coalesce the micro-batch DataFrame before writing the output. If you’re writing partitioned data to Amazon S3, repartitioning based on the partition columns can significantly reduce the number of output files created.</p>\n<h3><a id=\\"Conclusion_293\\"></a>Conclusion</h3>\\n<p>In this post, you saw how to approach sizing and tuning an Amazon Web Services Glue streaming job in different scenarios, including planning considerations, configuration, monitoring, tips, and pitfalls.</p>\n<p>You can now use these techniques to monitor and improve your existing streaming jobs or apply them when designing and building new ones.</p>\n<p><strong>About the author</strong></p>\\n<p><img src=\\"https://dev-media.amazoncloud.cn/f2b7bd68a6d4436cacae3a34fc6033f4_image.png\\" alt=\\"image.png\\" /></p>\n<p><strong>Gonzalo Herreros</strong> is a Senior Big Data Architect on the Amazon Web Services Glue team.</p>\n"}