This report focuses on how to tune a Spark application to run on a cluster of instances. We define the relevant cluster and Spark parameters, and explain how to configure them given a specific set of resources. We use the K-Means machine learning algorithm as a case study to analyze and tune the parameters to achieve the required performance while making optimal use of the available resources.
When this report was written, Graviton3 was only available in compute-optimized instances (C7g), so we used Graviton2-based clusters because more instance types (M6g, R6g, C6g) were available to match different requirements. On February 13, after this report was written, AWS introduced general-purpose M7g and memory-optimized R7g EC2 instances based on Graviton3.
This report is an in-depth guide. If you prefer a shorter summary of Spark performance on AWS Graviton2, please see our previous blog.
[CTAToken URL = "https://armkeil.blob.core.windows.net/developer/Files/pdf/white-paper/spark-on-aws-graviton2.pdf" target="_blank" text="Download Whitepaper" class ="green"]
We assume the reader already has basic familiarity with Spark concepts and components, some experience with Spark coding, and an understanding of resource usage and execution-time analysis through the Spark Web UI.
This article uses the HiBench suite for Spark performance analysis. HiBench is designed for benchmarking big data frameworks such as Hadoop, Spark, and streaming data engines. It can run different patterns of workloads, including micro benchmarks such as sort, word count, and enhanced DFSIO. It can also run SQL benchmarks such as scan, join, and aggregation, as well as machine learning benchmarks such as K-Means clustering, Gradient Boosted Trees, and many more. The benchmarks can run on different input data sizes, from tiny to bigdata.
HiBench workloads run in two phases: a prepare phase that generates the input data for the benchmark, and a run phase that executes the workload itself.
We set up the cluster in AWS in the following way:
Different workloads may require different tuning based on the nature of the computation: data analysis problems may be compute-intensive or memory-intensive. For example, the K-Means clustering algorithm is compute-intensive, while Word Count is more memory-intensive. For this report, we explore tuning parameters to run K-Means clustering efficiently on AWS instances.
We divide Spark tuning into two separate categories: tuning at the cluster/workload level, and tuning at the application (code) level.
Tuning at the cluster/workload level involves choosing runtime parameters for a Spark computation given a specific number of instances with fixed resources (CPU cores and memory). On HiBench, for example, parameters such as the number of executors, the cores per executor, and the executor and driver memory can be set to run the workloads.
Assume there are W worker nodes running on the cluster, and each worker node has M gigabytes of memory and C CPU (vCPU) cores. The following explains how to calculate the parameters above:
There are multiple ways to choose the number of executors: for example, a single large executor per node that uses all of its cores, or several smaller executors per node with a few cores each.
In principle, the two approaches work in a similar manner when the total resources and parallelism (number of parallel tasks) are the same. However, there are some practical differences that make using a higher number of executors more suitable.
The downside of using a higher number of executors is that broadcast data and cached data are replicated on each executor. Therefore, if there are E executors on a node, the broadcast data and cache are replicated E times on that node.
To reduce the negative effect of data shuffling between executors, try running the executors on a few large instances instead of many small instances.
The following are suggestions for calculating the different Yarn/Spark parameters. Start by fixing the number of cores per executor (five is a common best practice, and is what we use later in this report); calculating the rest is then straightforward.
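As a hedged illustration (the reserve amounts and the overhead factor below are common rules of thumb, not the report's prescription), the following Scala sketch derives the remaining values from W, M, and C:

```scala
// Sketch only: derive cluster-level Spark parameters from W worker nodes,
// M gigabytes of memory per node, and C vCPUs per node. The heuristics
// (reserve ~1 vCPU and ~1 GB per node for the OS and Hadoop daemons,
// 5 cores per executor, ~10% memory overhead) are common rules of thumb.
object SparkSizingSketch {
  def main(args: Array[String]): Unit = {
    val workerNodes     = 1   // W: for example, one m6gd.16xlarge in pseudo-distributed mode
    val memoryPerNodeGb = 256 // M
    val vcpusPerNode    = 64  // C

    val coresPerExecutor = 5                                     // spark.executor.cores
    val executorsPerNode = (vcpusPerNode - 1) / coresPerExecutor // reserve 1 vCPU per node
    val numExecutors     = executorsPerNode * workerNodes        // spark.executor.instances
    val usableMemoryGb   = memoryPerNodeGb - 1                   // reserve ~1 GB per node
    val executorMemoryGb =                                       // spark.executor.memory ceiling,
      math.floor(usableMemoryGb.toDouble / executorsPerNode / 1.1).toInt // leaving ~10% overhead

    println(s"executors            = $numExecutors")
    println(s"cores per executor   = $coresPerExecutor")
    println(s"memory per executor <= ${executorMemoryGb}g")
  }
}
```

With W = 1, C = 64, and M = 256 (the m6gd.16xlarge used later in this report), the sketch gives 12 executors of 5 cores each and a memory ceiling of roughly 19 GB per executor; the report conservatively sets 16 GB.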
The downside of this approach is that users must be aware of the input data size after deserialization, and the data size differs at each shuffle stage. So, one practical approach is to increase the number of partitions until the performance starts to drop.
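As a hedged illustration (the partition count of 240 and the input path are purely illustrative), the number of partitions can be adjusted either through configuration or by explicitly repartitioning a DataFrame:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("partition-tuning-sketch").getOrCreate()

// Partitions used after shuffles in the DataFrame/SQL API
// (spark.default.parallelism plays a similar role for the RDD API).
spark.conf.set("spark.sql.shuffle.partitions", "240")

// Or repartition a specific DataFrame explicitly.
val df = spark.read.parquet("/path/to/input") // hypothetical path
val repartitioned = df.repartition(240)
```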
The parameters that start with yarn.nodemanager refer to node-level settings, while the ones that start with yarn.scheduler apply to single containers (executors) running on the nodes.
Figure 1 shows how these parameters define the memory allocation for the cluster.
Figure 1. Spark memory allocation parameters on Yarn cluster
Spark parameters can be set inside the spark-defaults.conf file in the Spark folder. For HiBench, Spark parameters are set inside conf/spark.conf in the HiBench folder. The main configuration parameters are the number of executors, the cores and memory per executor, and the driver cores and memory.
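As a hedged sketch, the same parameters can be set programmatically when building the SparkSession; the values below mirror the configuration used later in this report (12 executors with 5 cores and 16 GB each, and the same memory for the driver), and the same keys can equivalently be written as key-value pairs in spark-defaults.conf or HiBench's conf/spark.conf:

```scala
import org.apache.spark.sql.SparkSession

// Sketch of the executor/driver settings discussed in this report.
val spark = SparkSession.builder()
  .appName("kmeans-tuning-sketch")
  .config("spark.executor.instances", "12") // number of executors
  .config("spark.executor.cores", "5")      // cores per executor
  .config("spark.executor.memory", "16g")   // memory per executor
  .config("spark.driver.cores", "1")        // driver cores (default)
  .config("spark.driver.memory", "16g")     // driver memory, same as executors
  .getOrCreate()
```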
Similar parameters exist for the driver (for example, spark.driver.memory and spark.driver.cores).
Yarn-specific parameters, such as yarn.nodemanager.resource.memory-mb and yarn.scheduler.maximum-allocation-mb, are defined in core-site.xml (more commonly in yarn-site.xml).
Spark can be fine-tuned depending on the application that is running. It is up to the developer to tune memory, garbage collection, and serialization based on the code, data structures, and other parameters. See the Tuning Spark documentation [1] for more details.
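As a hedged illustration of such application-level knobs, the following sketch enables Kryo serialization and passes a GC option to the executor JVMs; the specific choices are examples, not recommendations for every workload:

```scala
import org.apache.spark.sql.SparkSession

// Illustrative application-level tuning: serialization and garbage collection.
val spark = SparkSession.builder()
  .appName("app-level-tuning-sketch")
  // Kryo is typically faster and more compact than Java serialization.
  .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  // Example GC option for the executor JVMs; profile before adopting.
  .config("spark.executor.extraJavaOptions", "-XX:+UseG1GC")
  .getOrCreate()
```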
Monitoring the resources used to run a computation on a Spark cluster is highly important, as it helps to find out whether the resources are under-utilized or have become a bottleneck.
It is possible to analyze Spark performance and metrics using the Web UI. For deeper analysis, the data from one or more runs can be collected and viewed later using Spark's history server.
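For the history server to have data to display, event logging must be enabled; a minimal sketch, assuming a local log directory (an HDFS or S3 location works as well):

```scala
import org.apache.spark.sql.SparkSession

// Sketch: write event logs so the history server can replay the Web UI later.
// The directory below is a placeholder; point it at HDFS or S3 on a real
// cluster, and set spark.history.fs.logDirectory to the same location.
val spark = SparkSession.builder()
  .appName("history-server-sketch")
  .config("spark.eventLog.enabled", "true")
  .config("spark.eventLog.dir", "file:///tmp/spark-events") // hypothetical path
  .getOrCreate()
```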
Metrics such as memory, disk, and CPU utilization can also be collected directly from the machine (especially when running a pseudo-distributed cluster on a single machine) using tools such as the System Activity Report (SAR).
Spark's machine learning library is called MLlib. It implements ML algorithms, data transformations, and pipelines in a distributed fashion, and allows users to save trained models and load them back in the prediction phase. The newer API (also known as Spark ML) is based on Spark's DataFrame API and applies optimizations to the data pipeline. This article demonstrates K-Means clustering benchmarking as a case study for Spark resource allocation and tuning analysis.
K-Means is an unsupervised clustering algorithm. Given K as the number of clusters, the algorithm first allocates K (semi-)random points (centroids), and then iteratively refines their values until no further refinement is possible or the maximum number of iterations is reached.
The Spark implementation of K-Means runs each iteration over the partitions independently and collects the results of each iteration back to the driver for centroid refinement. All iterations run over the same input data, so the data is cached in the memory of each executor for faster computation.
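A minimal sketch of running K-Means through the DataFrame-based API, assuming a hypothetical input whose numeric columns are assembled into a "features" vector column (the path, column handling, k, and iteration count are illustrative, not HiBench's defaults); the explicit cache() mirrors the in-memory caching described above:

```scala
import org.apache.spark.ml.clustering.KMeans
import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("kmeans-sketch").getOrCreate()

// Hypothetical input: numeric columns assembled into a single "features" vector.
val raw = spark.read.parquet("/path/to/samples") // hypothetical path
val assembler = new VectorAssembler()
  .setInputCols(raw.columns)                  // assumes every column is numeric
  .setOutputCol("features")
val data = assembler.transform(raw).cache()   // every iteration reuses the same data

// k and maxIter are illustrative values.
val kmeans = new KMeans()
  .setK(10)
  .setMaxIter(20)
  .setFeaturesCol("features")
  .setSeed(1L)
val model = kmeans.fit(data)

model.clusterCenters.foreach(println) // final centroids
```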
Our benchmarks run on an m6gd.16xlarge EC2 instance, which has 64 vCPUs and 256GB of memory. Considering 5 cores per executor (scheduler maximum CPU allocation), the number of executors is set to 12, consuming 60 vCPUs. For K-Means, we assign only 1 vCPU for the driver (default) but set the driver memory to be the same as the executors. The memory per executor is set to 16GB (analysis of the metrics shows that this value can be increased). Yarn's scheduler and node manager parameters are set accordingly. For instance, the scheduler's maximum memory allocation is defined to be 18022MB (1.1 * 16GB), and the node manager resources are set to 218264MB (12 executors plus a safeguard).
The HiBench K-Means benchmark default values are:
We run the K-Means algorithm on HiBench for three workloads of different sizes: large, huge, and gigantic.
K-Means is a compute-intensive algorithm. The following is the CPU usage of the K-Means algorithm running on large, huge, and gigantic data sizes of HiBench:
Figure 2. CPU usage for large, huge, and gigantic workloads
The total CPU usage across all the executors reaches 93%, which corresponds to the number of cores assigned to executors (60 of the 64 vCPUs). The gigantic workload therefore uses most of the available processing capacity.
Figure 3 shows the memory usage of all the three workloads.
Figure 3. Memory usage for large, huge, and gigantic workloads
The memory usage for all the workloads is below the maximum amount set in the configuration. The two graphs (CPU and memory usage) show that one tuning possibility is to use compute-optimized instances to run the computation. These instances are cheaper than general purpose (M) and memory-optimized (R) instances of the same size.
It is also important to note that the total data size for the gigantic workload is only 37.1GB. Even when considering the memory used during the centroid computation, using close to 200GB of memory looks excessive for K-Means. The reason is the in-memory caching that the HiBench K-Means benchmark enforces, which changes the way we tune the cluster to run this code.
In previous sections, the best practices suggest assigning 5 cores per executor, calculating the number of executors to use, and assigning memory to each executor based on the available memory on the instance. However, when a Spark program uses in-memory caching, the cache is replicated on every executor. So, if you run E executors on the same instance, the cache consumes E times more memory than it would with a single executor.
The following chart compares the memory usage of the gigantic workload for 12 executors and for 6 larger executors, with all the executors running on the same instance (pseudo-distributed configuration).
Figure 4. Memory usage comparison for gigantic workloads for 12 and 6 executors running on the same machine
The second configuration clearly consumes less memory because the cache is replicated fewer times. So, when caching a large chunk of data, using fewer, larger executors can help decrease memory usage. Analysis shows that, except for the initial data pre-processing phase, the processing times of the K-Means stages do not differ between the two configurations.
The K-Means implementation requires shuffling only a small amount of data (shuffling is an expensive operation in distributed computing). Stages are separated by collecting the per-partition centroid calculations back to the driver for the final centroid computation, and this collected data is very small. Therefore, distributing the workload over a cluster of instances does not have a considerable impact on performance.
Tuning Spark computations is application-specific and depends on parameters such as data storage, caching, and shuffling. For caching and storage, it is possible to use disk storage when memory becomes a bottleneck (although this might considerably degrade performance on AWS instances where SSD read/write throughput is throttled).
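As a hedged sketch of that option, the following persists a dataset with a storage level that spills to local disk when memory fills up (the input path is hypothetical):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.storage.StorageLevel

val spark = SparkSession.builder().appName("persist-sketch").getOrCreate()
val data = spark.read.parquet("/path/to/samples") // hypothetical path

// MEMORY_AND_DISK keeps partitions in memory while they fit and spills the
// remainder to local disk instead of recomputing them.
val cached = data.persist(StorageLevel.MEMORY_AND_DISK)
```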
For applications like K-Means that are CPU-intensive and do not involve much shuffling, most of the tuning concerns memory management. Using fewer, larger executors, caching on both disk and memory based on availability, and using compute-optimized instances are options to consider for performance and cost optimization.
References
[1] Tuning Spark, https://spark.apache.org/docs/latest/tuning.html
[2] HiBench suite, https://github.com/Intel-bigdata/HiBench
[3] Hadoop: Setting up a Single Node Cluster, https://hadoop.apache.org/docs/stable/hadoop-project-dist/hadoop-common/SingleCluster.html
[4] Decoding Memory in Spark, https://medium.com/walmartglobaltech/decoding-memory-in-spark-parameters-that-are-often-confused-c11be7488a24