Spark on AWS Graviton2: Machine Learning with MLlib

Masoud Koleini
November 22, 2021
10 minute read time.

Apache Spark is a framework for running big data computations over a cluster of machines. One important Spark use case is big data analytics with machine learning. Training datasets can reach terabytes in size. Such datasets do not fit into a single machine’s memory and must be distributed over a cluster of machines to be processed.

A machine learning pipeline has multiple stages, including data extraction, transformation, and training. Depending on the transformation process and intermediate data size, the required memory may be different from the size of the original data. In addition, different machine learning algorithms have different resource requirements. Therefore, users should evaluate their ML pipeline to assign appropriate cluster resources before deploying it into a production environment. 

AWS Graviton2 processors use 64-bit Arm Neoverse cores. Instances powered by the Graviton2 processor provide better price-performance compared to x86-based instances. This blog shows how to train a decision tree regression model and a K-means clustering model on a Spark cluster running on Amazon EKS with Graviton2-based instances.

Spark machine learning library

Spark’s machine learning library is called MLlib. It implements ML algorithms, data transformations, and pipelines in a distributed fashion. It allows users to save trained models and load them back in the prediction phase. The new MLlib library (also known as Spark ML) is based on Spark’s DataFrame API, allowing Spark to apply optimizations on the data pipeline. This blog demonstrates two ML case studies (Decision Tree Regression and K-means clustering) using large sets of data running on an EKS cluster with Graviton2 instances.
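As a rough illustration of saving and loading a model, the sketch below persists a trained decision tree regression model and reads it back. The object name, the helper names, and any path you pass in are assumptions made for this example, not part of the original case study. The sketch assumes an active SparkSession and a storage location the job can write to.

```scala
import org.apache.spark.ml.regression.DecisionTreeRegressionModel

object ModelPersistenceSketch {

    // Writes the trained model to the given path (hypothetical location),
    // replacing any model already stored there.
    def saveModel(model: DecisionTreeRegressionModel, path: String): Unit =
        model.write.overwrite().save(path)

    // Loads a previously saved model so it can be used for prediction.
    def loadModel(path: String): DecisionTreeRegressionModel =
        DecisionTreeRegressionModel.load(path)
}
```

A prediction job could then call loadModel and apply transform on vectorized input instead of training again.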

Requirements

For the case studies in this document, readers must follow the guidelines in our previous Twitter Analysis blog. That blog shows how to set up an EKS cluster, fine-tune the cluster, and create a Spark Scala project. Note that you may need to change the number of Graviton2 machines running as worker nodes based on the case study you are implementing.

Case study 1: Decision Tree Regression - housing prices

Decision trees are one of the popular methods in supervised learning. They are used to solve regression and classification problems. A decision tree’s internal nodes each express a question about an input feature, branches represent the decisions, and leaves are the outcomes.
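To make the structure concrete, here is a minimal plain-Scala sketch of a regression tree and how a prediction walks from the root to a leaf. It is not Spark's implementation. The tree, its thresholds, and the feature order (median income, then housing median age) are made up purely for illustration.

```scala
import scala.annotation.tailrec

object DecisionTreeSketch {

    sealed trait Node
    // Internal node: asks "is features(featureIndex) <= threshold?"
    final case class Split(featureIndex: Int, threshold: Double, left: Node, right: Node) extends Node
    // Leaf: the outcome (here, a predicted value)
    final case class Leaf(prediction: Double) extends Node

    // Follows the branch chosen by each question until a leaf is reached.
    @tailrec
    def predict(node: Node, features: Vector[Double]): Double = node match {
        case Leaf(value) => value
        case Split(index, threshold, left, right) =>
            if (features(index) <= threshold) predict(left, features)
            else predict(right, features)
    }

    // Hypothetical tree over (median_income, housing_median_age)
    val tree: Node =
        Split(0, 3.0,
            Leaf(100000.0),
            Split(1, 20.0, Leaf(250000.0), Leaf(180000.0)))
}
```

For example, DecisionTreeSketch.predict(DecisionTreeSketch.tree, Vector(2.5, 30.0)) answers "yes" to the first question and stops at the left leaf.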

Spark also provides Random Forest and Gradient-Boosted Trees, two popular tree ensemble algorithms that reduce the overfitting that single decision trees are prone to. One important benefit of running decision trees on Spark is that the implementation efficiently distributes the training over multiple nodes.

California housing prices dataset

The dataset used in this case study contains California housing prices for different geographical locations, age of the property, number of bedrooms, and several other features. We train a decision tree regression model using this dataset, so we can predict the price of other houses in the market.

The size of the original dataset is small (1.2MB). Therefore, the same data is replicated for this case study to create a dataset of size 55.7GB (broken into multiple CSV files). 90% of the data is used for training, and the remaining 10% is run through the model for prediction. The dataset is stored inside an Amazon S3 bucket.

Diagram

This diagram shows how the different components in the ML case study interact.

Figure 1. Components in decision tree ML application 

AWS components and permissions

An S3 bucket containing the uploaded dataset must exist before running the Spark job. The ML pipeline must also have read and write access to that bucket.

To deploy the required resources using Terraform, create a file called ml.tf inside the EKS terraform folder (remove twitter.tf from the previous case study). Place the following content in the file. This creates an S3 bucket called spark-ml-demo and adds the right permissions to the role:

resource "aws_s3_bucket" "spark-ml-demo" {
  bucket = "spark-ml-demo"
}

resource "aws_iam_role_policy_attachment" "s3-full-access" {
  role       = "eksWorkerRole"
  policy_arn = "arn:aws:iam::aws:policy/AmazonS3FullAccess"
}

Run terraform apply to create the resources, then upload the dataset into the spark-ml-demo bucket.

Spark job implementation

Similar to the Twitter analysis case study, readers are required to create an sbt project and add the following dependencies to the build.sbt file:

libraryDependencies ++= Seq(
    "org.apache.spark" %% "spark-core" % "3.1.2",
    "org.apache.spark" %% "spark-sql" % "3.1.2",
    "org.apache.spark" %% "spark-mllib" % "3.1.2",
    "software.amazon.awssdk" % "s3" % "2.17.31",
    "org.apache.hadoop" % "hadoop-common" % "3.2.0",
    "org.apache.hadoop" % "hadoop-aws" % "3.2.0"
)

The following code reads the dataset (housing_1.csv, …) from the spark-ml-demo S3 bucket (assuming that the dataset is broken into multiple CSV files inside the bucket) into a dataframe. The input is split into training and test datasets. The training dataset is vectorized and passed to the fit function of the decision tree regressor to create a model, which is a transformer in the Spark ML context.

The next step in the code covers ML inference. It vectorizes the test dataset using the same vector assembler that transformed the training data. The prediction dataframe is generated by calling the transform function of the model on the vectorized test dataset. The result is written back to the same S3 bucket under the predictions folder.

Note that only a subset of features is used in training.

package sparkml

import org.apache.spark.sql._
import org.apache.spark.ml.regression.DecisionTreeRegressor
import org.apache.spark.ml.feature.VectorAssembler
import org.apache.log4j._

object RealEstate {
    // this is for California dataset:
    //   https://developers.google.com/machine-learning/crash-course/california-housing-data-description

    case class Property(longitude: Double, latitude: Double, housing_median_age: Double, total_rooms: Double,
                        total_bedrooms: Double, population: Double, households: Double, median_income: Double,
                        median_house_value: Double, ocean_proximity: String)

    def main(args: Array[String]): Unit = {

        // set the log level
        Logger.getLogger("org").setLevel(Level.ERROR)

        val spark = SparkSession
            .builder()
            .appName("realEstate")
            .getOrCreate()

        import spark.implicits._

        val realEstate = spark
            .read
            .option("header", "true")
            .option("inferSchema", "true")
            .csv("s3a://spark-ml-demo/housing_*.csv")
            .as[Property]

        val Array(trainingData, testData) = realEstate.randomSplit(Array(0.9, 0.1))

        val assembler = new VectorAssembler()
            .setInputCols(Array("housing_median_age", "total_rooms", "total_bedrooms", "population", "households", "median_income"))
            .setOutputCol("features")
            .setHandleInvalid("skip")

        val trainDf = assembler
            .transform(trainingData)
            .select("features", "median_house_value")

        val dt = new DecisionTreeRegressor()
            .setLabelCol("median_house_value")
            .setFeaturesCol("features")

        val model = dt.fit(trainDf)

        val testDF = assembler
            .transform(testData)
            .select("features", "median_house_value")

        val predictions = model.transform(testDF)
            .select("median_house_value", "prediction")

        predictions.write.csv("s3a://spark-ml-demo/predictions")

        spark.stop()
    }
}

In the above code, we only used numerical features for training and inference. It is left to the readers to add categorical features like ocean_proximity to the input columns and to verify the results.
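One possible way to add ocean_proximity is sketched below. It indexes the string column with StringIndexer and appends the index to the assembler's input columns, chaining the stages in a Pipeline. The object name, the function name, and the ocean_proximity_index column name are assumptions made for this example. Using a StringIndexer (rather than, say, one-hot encoding) is one design choice among several. Spark's tree algorithms should be able to treat the indexed column as categorical through the column metadata that StringIndexer attaches.

```scala
import org.apache.spark.ml.{Pipeline, PipelineModel}
import org.apache.spark.ml.feature.{StringIndexer, VectorAssembler}
import org.apache.spark.ml.regression.DecisionTreeRegressor
import org.apache.spark.sql.Dataset

object CategoricalFeatureSketch {

    def fitWithOceanProximity(trainingData: Dataset[_]): PipelineModel = {
        // Map each ocean_proximity string to a numeric index.
        // "keep" puts unseen or null values into an extra bucket.
        val indexer = new StringIndexer()
            .setInputCol("ocean_proximity")
            .setOutputCol("ocean_proximity_index") // hypothetical column name
            .setHandleInvalid("keep")

        // Same numerical features as before, plus the indexed category.
        val assembler = new VectorAssembler()
            .setInputCols(Array("housing_median_age", "total_rooms", "total_bedrooms",
                "population", "households", "median_income", "ocean_proximity_index"))
            .setOutputCol("features")
            .setHandleInvalid("skip")

        val dt = new DecisionTreeRegressor()
            .setLabelCol("median_house_value")
            .setFeaturesCol("features")

        // Fitting the pipeline fits the indexer and the tree in order.
        new Pipeline()
            .setStages(Array(indexer, assembler, dt))
            .fit(trainingData)
    }
}
```

The returned PipelineModel applies all three stages in its transform function, so the test dataset can be passed to it directly without a separate assembler call.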

Testing the program locally

Store the original dataset on your machine and apply the following modifications to the Scala script:

  1. Set master in your Spark config to run locally:
     val spark = SparkSession
         .builder()
         .appName("realEstate")
         .master("local[*]")
         .getOrCreate()
  2. Replace the S3 dataset location with the path of the CSV file on your machine
  3. Replace the output address on S3 with a local directory, such as one under /tmp

Running the code locally now writes the predictions as CSV files to that directory. Revert these changes before packaging your software.

Packaging the code and dependencies

Follow the guidelines in Twitter analysis case study for packaging the code and dependencies.

Submitting a Spark job

The process is like the Twitter analysis case study, except that here we work with a much larger dataset and need more detailed resource planning.

This case study runs on 7 memory-optimized r6g.4xlarge instances, each with 16 vCPUs and 128GiB of memory. The following spark-submit command runs the job on the EKS cluster, assuming that the application JAR file is named spark_ml.jar (replace KUBERNETES_MASTER_ADDRESS and DOCKER_IMAGE_ADDRESS with the corresponding information). The number of vCPUs allocated to the driver and to each executor is set to 5, which allows 3 pods to run on a single machine. With 7 machines, that gives 21 pods: one for the driver and 20 for the executors. The memory allocation is calculated accordingly.

bin/spark-submit \
  --class sparkml.RealEstate \
  --master k8s://<KUBERNETES_MASTER_ADDRESS> \
  --deploy-mode cluster \
  --conf spark.kubernetes.container.image=<DOCKER_IMAGE_ADDRESS> \
  --conf spark.kubernetes.authenticate.driver.serviceAccountName=spark \
  --conf spark.kubernetes.driver.pod.name="spark-ml" \
  --conf spark.kubernetes.namespace=default \
  --conf spark.executor.instances=20 \
  --conf spark.driver.cores=5 \
  --conf spark.executor.cores=5 \
  --conf spark.executor.memory=34g \
  --conf spark.driver.memory=34g \
  --conf spark.memory.fraction=0.8 \
  --name sparkML \
  local:///opt/spark/jars/spark_ml.jar
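The sizing in the command above can be reproduced with a little arithmetic. The sketch below is only an illustration. The object and function names are made up, and the 10% memory overhead factor is an assumption based on Spark's default overhead for JVM jobs, not a value stated in this case study.

```scala
object ExecutorPlanSketch {

    final case class Plan(podsPerNode: Int, executors: Int, memoryPerNodeGiB: Double)

    def plan(nodes: Int, vcpusPerNode: Int, coresPerPod: Int,
             podMemoryGiB: Double, overheadFactor: Double = 0.10): Plan = {
        // Integer division: 16 vCPUs / 5 cores per pod = 3 pods per machine
        val podsPerNode = vcpusPerNode / coresPerPod
        // One pod in the cluster is the driver; the rest are executors
        val executors = nodes * podsPerNode - 1
        // Pod memory plus the assumed overhead, summed over a machine's pods
        val memoryPerNodeGiB = podsPerNode * podMemoryGiB * (1 + overheadFactor)
        Plan(podsPerNode, executors, memoryPerNodeGiB)
    }

    def main(args: Array[String]): Unit = {
        // 7 r6g.4xlarge machines, 5 cores and 34g per pod: 20 executors
        println(plan(nodes = 7, vcpusPerNode = 16, coresPerPod = 5, podMemoryGiB = 34.0))
    }
}
```

Under these assumptions, three 34g pods need roughly 112GiB per machine, which leaves some headroom within the 128GiB of an r6g.4xlarge for the operating system and Kubernetes components.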

To check whether your application is running without errors on the Kubernetes cluster, you can use the Spark UI, which shows different statistics of the Spark job. Forward the UI port with the following command and open the UI in your browser (localhost:4040):

kubectl port-forward spark-ml 4040:4040

Prediction results

The case study runs for 6 minutes and writes the prediction results into the S3 bucket, split across multiple CSV files. The CSV files have two columns: the first column is the actual median house value, and the second is the predicted value (you can modify the machine learning script to include other relevant features in the CSV file).

95800,132367.88804380846
113800,190347.262329421
113800,190347.262329421
67500,104197.72693265839
67500,104197.72693265839
65800,104197.72693265839
68600,104197.72693265839
71100,190347.262329421
53500,104197.72693265839
71300,158820.92941131844
59200,104197.72693265839
59200,104197.72693265839
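To get a feel for how far the predictions are from the actual values, the two columns can be reduced to a single error figure. The plain-Scala sketch below parses rows in the format shown above and computes the root mean squared error (RMSE). The object and function names are made up for this example. On the full output, Spark's own RegressionEvaluator would be a more natural choice than this single-machine version.

```scala
object PredictionErrorSketch {

    // Parses one "actual,predicted" line as written by the regression job.
    def parse(line: String): (Double, Double) = {
        val Array(actual, predicted) = line.split(",").map(_.trim.toDouble)
        (actual, predicted)
    }

    // Root mean squared error over (actual, predicted) pairs.
    def rmse(rows: Seq[(Double, Double)]): Double = {
        require(rows.nonEmpty, "need at least one row")
        val squaredErrors = rows.map { case (actual, predicted) =>
            (actual - predicted) * (actual - predicted)
        }
        math.sqrt(squaredErrors.sum / rows.size)
    }

    def main(args: Array[String]): Unit = {
        // A few rows copied from the sample output above
        val sample = Seq(
            "95800,132367.88804380846",
            "113800,190347.262329421",
            "67500,104197.72693265839")
        println(f"RMSE over sample rows: ${rmse(sample.map(parse))}%.2f")
    }
}
```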

Case study 2: K-means clustering - Uber driver locations

This section shows how to run K-means clustering training and inference on a Spark cluster running on Amazon EKS.

K-means clustering algorithm

K-means is one of the most popular clustering algorithms in unsupervised learning. Given k as the expected number of clusters, the algorithm randomly initializes k cluster centers called cluster centroids, and iteratively:

  1. Assigns data points to a cluster depending on their Euclidean distance to the centroids
  2. Moves cluster centroids to the mean of the data points in each cluster

The iteration continues until the centroids converge. Spark uses a variant of the K-means algorithm that can be parallelized over the cluster.
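The two steps above can be sketched in plain Scala for two-dimensional points. This is a single-machine illustration of the basic algorithm only, not the parallel variant Spark uses. The object name, the iteration cap, and the fixed seed are assumptions made for the example.

```scala
import scala.util.Random

object KMeansSketch {

    type Point = (Double, Double)

    def squaredDistance(a: Point, b: Point): Double = {
        val dx = a._1 - b._1
        val dy = a._2 - b._2
        dx * dx + dy * dy
    }

    // Step 1: index of the nearest centroid (squared Euclidean distance
    // gives the same ordering as Euclidean distance).
    def closest(p: Point, centroids: IndexedSeq[Point]): Int =
        centroids.indices.minBy(i => squaredDistance(p, centroids(i)))

    // Step 2 helper: mean of the points in one cluster.
    def mean(points: Seq[Point]): Point =
        (points.map(_._1).sum / points.size, points.map(_._2).sum / points.size)

    def fit(points: IndexedSeq[Point], k: Int,
            maxIterations: Int = 20, seed: Long = 42L): IndexedSeq[Point] = {
        require(k > 0 && points.size >= k, "need at least k points")
        // Random initialization: pick k of the input points as centroids
        var centroids: IndexedSeq[Point] = new Random(seed).shuffle(points).take(k)
        var converged = false
        var iteration = 0
        while (!converged && iteration < maxIterations) {
            val clusters = points.groupBy(p => closest(p, centroids))
            // An empty cluster keeps its previous centroid
            val updated = centroids.indices.map { i =>
                clusters.get(i).map(mean).getOrElse(centroids(i))
            }
            converged = updated == centroids
            centroids = updated
            iteration += 1
        }
        centroids
    }
}
```

Calling KMeansSketch.closest on a new point with the fitted centroids gives its cluster number, which mirrors what the prediction column holds in the Spark job.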

Uber driver location dataset

The K-means case study uses the Uber driver locations dataset, which includes the latitude and longitude of drivers at different dates and times. The dataset is 209.7MB in CSV format, so the data is replicated to create a 195.5GB dataset on Amazon S3 (split into multiple CSV files). 90% of the data is used for training, and the rest for testing.

The diagram, requirements, and dependencies are the same as in the previous example for decision trees.

Spark job implementation

The following is the implementation of K-means clustering on Spark, where k is set to 16. It stores the result back into an S3 bucket under the uber-predictions folder.

package sparkml

import org.apache.spark.sql._
import org.apache.spark.ml.feature.VectorAssembler
import org.apache.log4j._
import org.apache.spark.ml.clustering.KMeans

object Uber {

        def main(args: Array[String]): Unit = {

            // set the log level
            Logger.getLogger("org").setLevel(Level.ERROR)

            val spark = SparkSession
                .builder()
                .appName("uber")
                .getOrCreate()

            val uber = spark
                .read
                .option("header", "true")
                .option("inferSchema", "true")
                .csv("s3a://spark-ml-demo/uber-raw-data*.csv")

            val Array(trainingData, testData) = uber.select("Lat", "Lon").randomSplit(Array(0.90, 0.1))

            val assembler = new VectorAssembler()
                .setInputCols(Array("Lat", "Lon"))
                .setOutputCol("features")
                .setHandleInvalid("skip")

            val trainDf = assembler
                .transform(trainingData)

            val dt = new KMeans()
                .setK(16)
                .setFeaturesCol("features")
                .setPredictionCol("prediction")

            val model = dt.fit(trainDf)
            model.clusterCenters.foreach(println)

            val testDF = assembler
                .transform(testData)

            val predictions = model.transform(testDF)
                .select("Lat", "Lon", "prediction")

            predictions.write.csv("s3a://spark-ml-demo/uber-predictions")

            spark.stop()
        }
}

Testing the program locally

Use locally stored Uber driver location datasets to run the code on your machine. Follow the same procedures as for the decision tree regression.

Packaging the code and dependencies

Follow the same guidelines as in the Twitter analysis case study for packaging the code and dependencies.

Submitting a Spark job

The K-means case study runs on 9 memory-optimized r6g.4xlarge instances. Therefore, the number of executors in the spark-submit parameters is changed accordingly:

bin/spark-submit \
  --class sparkml.Uber \
  --master k8s://<KUBERNETES_MASTER_ADDRESS> \
  --deploy-mode cluster \
  --conf spark.kubernetes.container.image=<DOCKER_IMAGE_ADDRESS> \
  --conf spark.kubernetes.authenticate.driver.serviceAccountName=spark \
  --conf spark.kubernetes.driver.pod.name="spark-ml" \
  --conf spark.kubernetes.namespace=default \
  --conf spark.executor.instances=26 \
  --conf spark.driver.cores=5 \
  --conf spark.executor.cores=5 \
  --conf spark.executor.memory=34g \
  --conf spark.driver.memory=34g \
  --conf spark.memory.fraction=0.8 \
  --name sparkML \
  local:///opt/spark/jars/spark_ml.jar

Clustering results

The case study completes in 34 minutes on the Graviton2 cluster of workers. When the Spark job terminates successfully, the results should be in the uber-predictions folder, split into multiple CSV files. Each CSV file has three columns: the first two are the latitude and longitude, and the third is the cluster number. The code also prints the cluster centroids by calling model.clusterCenters.foreach(println) right after the model is created; in cluster mode, this output appears in the driver pod's log.

40.8971,-73.8652,4
40.8972,-73.8653,4
40.8972,-73.8653,4
40.8973,-74.4901,15
40.8973,-73.8635,4
40.8977,-73.947,4
40.8977,-73.9469,4
40.8978,-73.9139,4
40.8981,-73.9717,3
40.8982,-74.1262,0
40.8982,-73.8673,4
40.8985,-73.901,4
40.8985,-73.901,4
40.8985,-73.9009,4

Conclusion

In this blog, we demonstrated two ML case studies, decision tree regression and K-means clustering, using large sets of data and running on an EKS Kubernetes cluster with Graviton2 instances. Spark’s MLlib helps industries to run machine learning pipelines (transformation, storage, learning, and prediction) on terabytes of data in a distributed way. The number of workers increases when the data size becomes larger and the ML algorithm requires more resources. The lower hourly rates of Graviton2 instances provide users with up to 30% lower costs. This benefits users hugely when tens or hundreds of worker nodes operate for a long time to perform ML computations.

Visit the AWS Graviton page for customer stories on adoption of Arm-based processors. For any queries related to your software workloads running on Arm Neoverse platforms, feel free to reach out to us at sw-ecosystem@arm.com.  
