Apache Spark is a framework for running big data computations over a cluster of machines. One important Spark use case is big data analytics with machine learning. Training datasets can be terabytes in size; such datasets do not fit into a single machine’s memory and must be distributed over a cluster of machines to be processed.
A machine learning pipeline has multiple stages, including data extraction, transformation, and training. Depending on the transformation process and intermediate data size, the required memory may be different from the size of the original data. In addition, different machine learning algorithms have different resource requirements. Therefore, users should evaluate their ML pipeline to assign appropriate cluster resources before deploying it into a production environment.
AWS Graviton2 processors use 64-bit Arm Neoverse cores. Instances powered by the Graviton2 processor provide better price-performance compared to x86-based instances. This blog shows how to train a decision tree regression model and a K-means clustering model on a Spark cluster running on Amazon EKS with Graviton2-based instances.
Spark’s machine learning library is called MLlib. It implements ML algorithms, data transformations, and pipelines in a distributed fashion. It also allows users to save trained models and load them back in the prediction phase. The newer MLlib API (also known as Spark ML) is based on Spark’s DataFrame API, allowing Spark to apply optimizations to the data pipeline. This blog demonstrates two ML case studies (decision tree regression and K-means clustering) using large sets of data running on an EKS cluster with Graviton2 instances.
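For example, persisting a trained model and loading it back in a separate prediction job follows the pattern sketched below. This is a minimal sketch rather than code from the case studies: the S3 path is a hypothetical placeholder, and trainDf/testDf stand for vectorized training and test dataframes.

import org.apache.spark.ml.regression.{DecisionTreeRegressor, DecisionTreeRegressionModel}

// Train and persist the model (trainDf: a vectorized training dataframe).
val model = new DecisionTreeRegressor()
  .setLabelCol("median_house_value")
  .setFeaturesCol("features")
  .fit(trainDf)

model.write.overwrite().save("s3a://spark-ml-demo/models/housing-dt")

// Later, in the prediction phase, load the model back and score new data.
val reloaded = DecisionTreeRegressionModel.load("s3a://spark-ml-demo/models/housing-dt")
val scored = reloaded.transform(testDf)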
For the case studies in this document, readers must follow the guidelines in our previous Twitter Analysis blog. That blog shows how to set up an EKS cluster, fine-tune the cluster, and create a Spark Scala project. Note that you may need to change the number of Graviton2 machines running as worker nodes based on the case study you are implementing.
Decision trees are one of the popular methods in supervised learning. Decision trees are used to solve regression and classification problems. The decision tree’s internal nodes express a question on an input feature, branches represent the decisions, and leaves are the outcomes.
Spark uses Random Forest and Gradient-Boosted Trees as two popular tree ensemble algorithms to reduce possible overfitting of decision trees. One important benefit of running decision trees on Spark is that the implementation efficiently distributes the training over multiple nodes.
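Switching from a single decision tree to one of these ensembles is mostly a matter of swapping the estimator. The following is a minimal sketch using RandomForestRegressor; the trainDf name and column names are placeholders borrowed from the housing example later in this blog.

import org.apache.spark.ml.regression.RandomForestRegressor

// A random forest trains many trees on bootstrapped samples and averages them,
// which reduces the variance (overfitting) of a single deep tree.
val rf = new RandomForestRegressor()
  .setLabelCol("median_house_value")
  .setFeaturesCol("features")
  .setNumTrees(50)              // number of trees in the ensemble

val rfModel = rf.fit(trainDf)   // trainDf: a vectorized training dataframe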
The dataset used in this case study contains California housing prices for different geographical locations, age of the property, number of bedrooms, and several other features. We train a decision tree regression model using this dataset, so we can predict the price of other houses in the market.
The size of the original dataset is small (1.2MB). Therefore, the same data is replicated for this case study to create a dataset of size 55.7GB (broken into multiple CSV files). 90% of the data is used for training, and the remaining 10% is run through the model for prediction. The dataset is stored inside an Amazon S3 bucket.
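The blog does not prescribe a particular way to replicate the data. One possible approach is sketched below, assuming the original file is available locally as /tmp/housing.csv and that AWS credentials and the hadoop-aws connector are configured; the replication factor, paths, and output location are illustrative, and the resulting part files would need to be renamed (or the read pattern in the training job adjusted) to match the housing_*.csv naming used later.

import org.apache.spark.sql.SparkSession

// Hypothetical helper job: replicate the small CSV into a much larger dataset.
val spark = SparkSession.builder().appName("replicateHousing").getOrCreate()

val original = spark.read
  .option("header", "true")
  .csv("/tmp/housing.csv")            // assumed local copy of the 1.2MB file

val factor = 46000                    // illustrative: 1.2MB x 46,000 is roughly 55GB
val replicated = original
  .crossJoin(spark.range(factor))     // one copy of every row per id in 0..factor-1
  .drop("id")

replicated
  .repartition(64)                    // write the result as multiple CSV part files
  .write
  .option("header", "true")
  .csv("s3a://spark-ml-demo/replicated-housing")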
This diagram shows how the different components in the ML case study interact.
Figure 1. Components in decision tree ML application
An S3 bucket containing the dataset must be created before running the Spark job, and the ML pipeline must have read and write access to that bucket.
To deploy the required resources using Terraform, create a file called ml.tf inside the EKS Terraform folder (remove twitter.tf from the previous case study) and place the following content in the file. This creates an S3 bucket called spark-ml-demo and adds the right permissions to the worker role:
resource "aws_s3_bucket" "spark-ml-demo" { bucket = "spark-ml-demo" } resource "aws_iam_role_policy_attachment" "s3-full-access" { role = "eksWorkerRole" policy_arn = "arn:aws:iam::aws:policy/AmazonS3FullAccess" }
Run terraform apply to create the resources, then upload the dataset into the spark-ml-demo bucket.
Similar to the Twitter analysis case study, readers are required to create an sbt project and add the following dependencies to the build.sbt file:
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % "3.1.2",
  "org.apache.spark" %% "spark-sql" % "3.1.2",
  "org.apache.spark" %% "spark-mllib" % "3.1.2",
  "software.amazon.awssdk" % "s3" % "2.17.31",
  "org.apache.hadoop" % "hadoop-common" % "3.2.0",
  "org.apache.hadoop" % "hadoop-aws" % "3.2.0",
)
The following code reads the dataset (housing_1.csv, …) from the spark-ml-demo S3 bucket into a dataframe, assuming that the dataset is broken into multiple CSV files inside the bucket. The input is split into training and test datasets. The training dataset is vectorized and passed to the decision tree regressor’s fit function to create a model, which is a transformer in the Spark ML context.
The next step in the code covers ML inference. It vectorizes the test dataset using the same vector assembler that was used for the training data. The prediction dataframe is generated by calling the model’s transform function on the vectorized test dataset. The result is written back to the same S3 bucket under the predictions folder.
Note that only a subset of features is used in training.
package sparkml

import org.apache.spark.sql._
import org.apache.spark.ml.regression.DecisionTreeRegressor
import org.apache.spark.ml.feature.VectorAssembler
import org.apache.log4j._

object RealEstate {

  // This is for the California housing dataset:
  // https://developers.google.com/machine-learning/crash-course/california-housing-data-description
  case class Property(longitude: Double, latitude: Double, housing_median_age: Double,
                      total_rooms: Double, total_bedrooms: Double, population: Double,
                      households: Double, median_income: Double, median_house_value: Double,
                      ocean_proximity: String)

  def main(array: Array[String]) {

    // set the log level
    Logger.getLogger("org").setLevel(Level.ERROR)

    val spark = SparkSession
      .builder()
      .appName("realEstate")
      .getOrCreate()

    import spark.implicits._

    // read the replicated CSV files from the S3 bucket into a typed dataset
    val realEstate = spark
      .read
      .option("header", "true")
      .option("inferSchema", "true")
      .csv("s3a://spark-ml-demo/housing_*.csv")
      .as[Property]

    // 90% for training, 10% for prediction
    val Array(trainingData, testData) = realEstate.randomSplit(Array(0.9, 0.1))

    // assemble the numerical input columns into a single feature vector
    val assembler = new VectorAssembler()
      .setInputCols(Array("housing_median_age", "total_rooms", "total_bedrooms",
        "population", "households", "median_income"))
      .setOutputCol("features")
      .setHandleInvalid("skip")

    val trainDf = assembler
      .transform(trainingData)
      .select("features", "median_house_value")

    val dt = new DecisionTreeRegressor()
      .setLabelCol("median_house_value")
      .setFeaturesCol("features")

    // train the decision tree regression model
    val model = dt.fit(trainDf)

    // vectorize the test data with the same assembler and run inference
    val testDF = assembler
      .transform(testData)
      .select("features", "median_house_value")

    val predictions = model.transform(testDF)
      .select("median_house_value", "prediction")

    predictions.write.csv("s3a://spark-ml-demo/predictions")

    spark.stop()
  }
}
In the above code, we only used numerical features for training and inference. It is left to the reader to add categorical features such as ocean_proximity to the input columns and to verify the results.
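One way to approach that, sketched here as an assumption rather than part of the original case study, is to index and one-hot encode ocean_proximity before handing it to the VectorAssembler:

import org.apache.spark.ml.feature.{StringIndexer, OneHotEncoder, VectorAssembler}

// Map the string category to an index, then to a one-hot vector.
val indexer = new StringIndexer()
  .setInputCol("ocean_proximity")
  .setOutputCol("ocean_proximity_idx")
  .setHandleInvalid("skip")

val encoder = new OneHotEncoder()
  .setInputCols(Array("ocean_proximity_idx"))
  .setOutputCols(Array("ocean_proximity_vec"))

// Include the encoded column alongside the numerical features.
val assemblerWithCat = new VectorAssembler()
  .setInputCols(Array("housing_median_age", "total_rooms", "total_bedrooms",
    "population", "households", "median_income", "ocean_proximity_vec"))
  .setOutputCol("features")
  .setHandleInvalid("skip")

// Fit the indexer and encoder on the training data, then reuse them on the test data.
val indexerModel = indexer.fit(trainingData)
val encoderModel = encoder.fit(indexerModel.transform(trainingData))
val trainDfWithCat = assemblerWithCat
  .transform(encoderModel.transform(indexerModel.transform(trainingData)))
  .select("features", "median_house_value")

A Spark ML Pipeline would express the same chain more compactly, but the explicit version above keeps each transformation step visible.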
Store the original dataset on your machine and apply the following modifications to the Scala script:
val spark = SparkSession.builder().appName("realEstate").master("local[*]").getOrCreate()
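In addition to setting the master to local[*], the s3a:// input and output paths in the script need to point at files on your machine while testing locally. A minimal sketch, with hypothetical local paths:

// Hypothetical local paths for testing; revert to the s3a:// paths before packaging.
val realEstate = spark
  .read
  .option("header", "true")
  .option("inferSchema", "true")
  .csv("/tmp/housing/housing_*.csv")
  .as[Property]

// ... training and inference stay the same ...

predictions.write.csv("/tmp/housing/predictions")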
Now, running the code locally creates a CSV file containing the predictions. Make sure you revert these changes before packaging your software.
Follow the guidelines in Twitter analysis case study for packaging the code and dependencies.
The process is similar to the Twitter analysis case study, except that here we are working with a much larger dataset and more detailed resource planning.
This case study runs on 7 memory-optimized r6g.4xlarge instances. Each instance comes with 16 vCPUs and 128GiB of memory. The following spark-submit command runs the job on the EKS cluster, assuming that the name of the application JAR file is spark_ml.jar (replace KUBERNETES_MASTER_ADDRESS and DOCKER_IMAGE_ADDRESS with the corresponding information). The number of vCPUs allocated to the driver and each executor is set to 5, which allows 3 pods to run on a single machine. The number of executors and the memory allocation are calculated accordingly: 7 nodes x 3 pods per node gives 21 pods, leaving 20 executors alongside the driver, and each pod gets roughly a third of a node's 128GiB of memory as 34g of heap plus overhead.
bin/spark-submit \
  --class sparkml.RealEstate \
  --master k8s://<KUBERNETES_MASTER_ADDRESS> \
  --deploy-mode cluster \
  --conf spark.kubernetes.container.image=<DOCKER_IMAGE_ADDRESS> \
  --conf spark.kubernetes.authenticate.driver.serviceAccountName=spark \
  --conf spark.kubernetes.driver.pod.name="spark-ml" \
  --conf spark.kubernetes.namespace=default \
  --conf spark.executor.instances=20 \
  --conf spark.driver.cores=5 \
  --conf spark.executor.cores=5 \
  --conf spark.executor.memory=34g \
  --conf spark.driver.memory=34g \
  --conf spark.memory.fraction=0.8 \
  --name sparkML \
  local:///opt/spark/jars/spark_ml.jar
To check that your application is running error free on the Kubernetes cluster, you can use the Spark UI to monitor different statistics of the Spark job. Access the Spark UI by running the following port-forward command and opening localhost:4040 in your browser:
kubectl port-forward spark-ml 4040:4040
The case study runs for 6 minutes and writes the prediction result into the S3 bucket, broken into multiple CSV files. The CSV files have two columns: the first column is the actual median house value, and the second one is the predicted value (you can modify the machine learning script to include other relevant features in the CSV file).
95800,132367.88804380846
113800,190347.262329421
113800,190347.262329421
67500,104197.72693265839
67500,104197.72693265839
65800,104197.72693265839
68600,104197.72693265839
71100,190347.262329421
53500,104197.72693265839
71300,158820.92941131844
59200,104197.72693265839
59200,104197.72693265839
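Beyond eyeballing these rows, you can add a quantitative check to the script. The following is a small sketch using Spark's RegressionEvaluator on the predictions dataframe from the code above; the choice of the RMSE metric is just an example.

import org.apache.spark.ml.evaluation.RegressionEvaluator

// Root-mean-square error between the actual and predicted house values.
val evaluator = new RegressionEvaluator()
  .setLabelCol("median_house_value")
  .setPredictionCol("prediction")
  .setMetricName("rmse")

val rmse = evaluator.evaluate(predictions)
println(s"RMSE on the test data: $rmse")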
This section shows how to run K-means clustering training and inference on a Spark cluster running on Amazon EKS.
K-means is one of the most popular clustering algorithms in unsupervised learning. Given k as the expected number of clusters, the algorithm randomly initializes k cluster centers called cluster centroids, and then iterates over two steps: it assigns every data point to its nearest centroid, and it recomputes each centroid as the mean of the points assigned to it.
The iteration continues until the assignments converge. Spark implements a variant of the K-means algorithm (using k-means|| initialization) that can be parallelized over the cluster.
The K-means case study uses a dataset of Uber driver locations, which includes the latitude and longitude of the drivers at different dates and times. The size of the original dataset in CSV format is 209.7MB, so the data is replicated to create a dataset of size 195.5GB on Amazon S3 (broken into multiple CSV files). 90% of the data is used for training, and the rest for testing.
The diagram, requirements, and dependencies are the same as in the previous decision tree example.
The following is the implementation of K-means clustering on Spark, where k is set to 16. It stores the result back into an S3 bucket under the uber-predictions folder.
package sparkml

import org.apache.spark.sql._
import org.apache.spark.ml.feature.VectorAssembler
import org.apache.log4j._
import org.apache.spark.ml.clustering.KMeans

object Uber {

  def main(array: Array[String]) {

    // set the log level
    Logger.getLogger("org").setLevel(Level.ERROR)

    val spark = SparkSession
      .builder()
      .appName("uber")
      .getOrCreate()

    // read the replicated Uber location CSV files from the S3 bucket
    val uber = spark
      .read
      .option("header", "true")
      .option("inferSchema", "true")
      .csv("s3a://spark-ml-demo/uber-raw-data*.csv")

    // keep only the coordinates and split 90/10 into training and test data
    val Array(trainingData, testData) = uber.select("Lat", "Lon").randomSplit(Array(0.90, 0.1))

    val assembler = new VectorAssembler()
      .setInputCols(Array("Lat", "Lon"))
      .setOutputCol("features")
      .setHandleInvalid("skip")

    val trainDf = assembler
      .transform(trainingData)

    val dt = new KMeans()
      .setK(16)
      .setFeaturesCol("features")
      .setPredictionCol("prediction")

    // train the K-means model and print the 16 cluster centroids
    val model = dt.fit(trainDf)
    model.clusterCenters.foreach(println)

    val testDF = assembler
      .transform(testData)

    // assign each test point to a cluster and write the result back to S3
    val predictions = model.transform(testDF)
      .select("Lat", "Lon", "prediction")

    predictions.write.csv("s3a://spark-ml-demo/uber-predictions")

    spark.stop()
  }
}
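The choice of k = 16 is taken as given in this case study. If you want to sanity-check it, Spark's ClusteringEvaluator computes a silhouette score that you can compare across different values of k. The following is a minimal sketch, assuming the model and testDF from the code above:

import org.apache.spark.ml.evaluation.ClusteringEvaluator

// Silhouette score: values closer to 1.0 mean points sit well inside their clusters.
val evaluator = new ClusteringEvaluator()
  .setFeaturesCol("features")
  .setPredictionCol("prediction")

val silhouette = evaluator.evaluate(model.transform(testDF))
println(s"Silhouette with squared Euclidean distance: $silhouette")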
To run the code on your machine, use a locally stored copy of the Uber driver location dataset and follow the same procedure as for the decision tree regression example.
Follow the same guidelines as in the Twitter analysis case study for packaging the code and dependencies.
The K-means case study runs on 9 memory-optimized r6g.4xlarge instances, so the number of executors in the spark-submit parameters is changed accordingly (9 nodes x 3 pods per node gives 27 pods, leaving 26 executors alongside the driver):
bin/spark-submit \
  --class sparkml.Uber \
  --master k8s://<KUBERNETES_MASTER_ADDRESS> \
  --deploy-mode cluster \
  --conf spark.kubernetes.container.image=<DOCKER_IMAGE_ADDRESS> \
  --conf spark.kubernetes.authenticate.driver.serviceAccountName=spark \
  --conf spark.kubernetes.driver.pod.name="spark-ml" \
  --conf spark.kubernetes.namespace=default \
  --conf spark.executor.instances=26 \
  --conf spark.driver.cores=5 \
  --conf spark.executor.cores=5 \
  --conf spark.executor.memory=34g \
  --conf spark.driver.memory=34g \
  --conf spark.memory.fraction=0.8 \
  --name sparkML \
  local:///opt/spark/jars/spark_ml.jar
The case study completes in 34 minutes over the Graviton2 cluster of workers. When the Spark job terminates successfully, the results should be in the uber-predictions folder, broken into multiple CSV files. There are three columns in the CSV files: the first two columns are the latitude and longitude, and the third column is the cluster number. The cluster centroids are printed to the driver log by the model.clusterCenters.foreach(println) call placed after the line where the model is created.
40.8971,-73.8652,4
40.8972,-73.8653,4
40.8972,-73.8653,4
40.8973,-74.4901,15
40.8973,-73.8635,4
40.8977,-73.947,4
40.8977,-73.9469,4
40.8978,-73.9139,4
40.8981,-73.9717,3
40.8982,-74.1262,0
40.8982,-73.8673,4
40.8985,-73.901,4
40.8985,-73.901,4
40.8985,-73.9009,4
In this blog, we demonstrated two ML case studies, decision tree regression and K-means clustering, using large sets of data running on an EKS Kubernetes cluster with Graviton2 instances. Spark's MLlib helps industries run machine learning pipelines (transformation, storage, learning, and prediction) on terabytes of data in a distributed way. The number of worker nodes can be scaled up as the data grows or as the ML algorithm requires more resources. Studies by AWS show that Graviton2-based instances provide better price-performance than comparable x86-based instances; in addition, the lower hourly rates of Graviton2 instances provide users with up to 30% lower costs. This benefits users hugely when tens or hundreds of worker nodes are performing ML computations for a long time.
Visit the AWS Graviton page for customer stories on adoption of Arm-based processors. For any queries related to your software workloads running on Arm Neoverse platforms, feel free to reach out to us at sw-ecosystem@arm.com.