Apache Spark is a general-purpose framework for distributed computing over a cluster of machines. Data can be stored in a distributed storage system such as the Hadoop Distributed File System (HDFS) or collected through streaming modules. The data can then be processed by Spark executors running on a set of workers. Spark can run standalone using its own cluster manager, or on top of Apache Mesos, Hadoop YARN, or Kubernetes.
AWS Graviton2 processors use 64-bit Arm Neoverse cores. Amazon EC2 instances powered by the Graviton2 processor provide better price-performance compared to x86-based instances. A recent report by AWS has shown up to 15% improved performance and up to 30% lower price when running Spark on EKS with Graviton2 workers. This blog explains the implementation of two case studies using Spark Streaming on a Kubernetes cluster with Graviton2-based worker instances. Technical details and best practices are shared to help users overcome problems faced when implementing similar applications.
Kubernetes is a popular open-source framework for deploying, scaling, and managing containerized applications. A Kubernetes cluster consists of a control plane, which runs components such as the API server, scheduler, and etcd key-value store, and a set of worker nodes, each running a kubelet agent and a container runtime to host the application pods.
Kubernetes comes with a handy tool called kubectl, a command-line interface for communicating with the API server. kubectl command-line parameters allow users to directly describe the resources they would like to deploy. Alternatively, users can express all the requirements in a YAML file and apply it through kubectl for deployment.
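As a simple illustration of both styles (the nginx image and the deployment.yaml manifest below are placeholders, not part of the case studies that follow):

# Imperative style: describe the resource directly on the command line
kubectl create deployment nginx --image=nginx

# Declarative style: express the resources in a YAML manifest and apply it
kubectl apply -f deployment.yaml

# Check the resulting pods
kubectl get pods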
Figure 1. Kubernetes cluster using Docker as container runtime
Kubernetes is one of the cluster managers supported by Spark. The way Spark runs a job on a Kubernetes cluster is to first create a driver pod on one of the worker nodes. Then the driver pod spawns executor pods based on the resources requested. When the job is complete, all executor pods terminate and the driver pod moves to a completed state, allowing users to access logs for debugging purposes.
Spark binaries come with a tool called spark-submit, which can talk to multiple cluster managers to run a job. The parameters passed to spark-submit define the type of cluster manager, the address of its API server, the number of executors, driver and executor resources, the container image address, the application JAR file, and other functional parameters. We will use spark-submit in the next sections to deploy Spark applications on a Kubernetes cluster.
Amazon EKS is a managed Kubernetes service offered by AWS. This section explains how to programmatically create a Graviton2-based EKS cluster using Terraform. We recommend readers become somewhat familiar with Terraform before proceeding with the rest of the document.
HashiCorp provides documentation on how to provision an EKS cluster on AWS, and the corresponding Terraform scripts are available in a GitHub repository. However, they might need a few tweaks to run on AWS accounts with different configurations.
The worker group parameters in those scripts default to launching x86 instances. Running Graviton2-based instances requires a few changes; typically, the worker group instance types must be switched to a Graviton2 family (for example, m6g or c6g), and the worker AMI must be the Arm64 variant of the EKS-optimized image.
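Once the scripts are adjusted, provisioning follows the standard Terraform workflow. A minimal sketch, assuming the repository has been cloned and AWS credentials are configured locally:

# Download the providers and modules referenced by the configuration
terraform init

# Preview the resources that will be created
terraform plan

# Create the EKS cluster and its supporting resources
terraform apply

# Point kubectl at the new cluster (region and cluster name are placeholders)
aws eks update-kubeconfig --region us-east-1 --name <CLUSTER_NAME>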
The Spark driver should have the right permissions to spawn executors on worker instances while running as a pod on one of the Kubernetes nodes. Users can create a Kubernetes service account and ClusterRoleBinding for Spark, which can be assigned to the driver when running a Spark job. Use Spark's documentation to create the account and binding using kubectl.
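Following Spark's documentation, a minimal setup looks like the following. The service account name spark matches the serviceAccountName used in the spark-submit commands later in this blog:

# Create a service account for the Spark driver
kubectl create serviceaccount spark

# Allow the driver to create and manage executor pods in the default namespace
kubectl create clusterrolebinding spark-role --clusterrole=edit --serviceaccount=default:spark --namespace=default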
To manage your costs while keeping resources optimized, fine-tuning the cluster is of critical importance. The following tunings and best practices can help you make the best of your Kubernetes cluster.
A well-tuned Spark job should show very few failed tasks in the Spark UI. Refer to the Tuning Spark documentation and AWS best practices for guidelines on how to tune your cluster. We follow similar principles to configure the resources in the cluster, the number of vCPUs per executor, and the executor memory.
This case study explains how to deploy a Twitter analysis job using Spark Streaming inside an EKS cluster: how to write the job in Scala, package it as a JAR file, create a Spark Docker image containing the application, and deploy the container inside the EKS cluster.
We demonstrate deploying two commonly used scenarios for analyzing tweets: finding the most popular hashtags, and sentiment analysis of the tweets.
Twitter developer account: The first step towards implementing the streaming application is to create a Twitter developer account. You must first create a project and an App using the developer portal. Then, create an API Key, API Secret, Access Token, and Access Token Secret to authenticate your application and read the tweets.
Note that Twitter applies rate limits to applications in order to provide a reliable service. As a result, a single executor is sufficient to run both streaming use cases.
Spark Streaming is an API within Spark for reliable, high-throughput stream processing of data sources such as Kafka, AWS Kinesis, HDFS/S3, Flume, and Twitter. It splits the input stream into mini-batches and runs them through the Spark engine, producing a stream of batches of processed data. The high-level abstraction of the data stream is called a DStream (discretized stream), which is internally mapped to a sequence of RDDs (Resilient Distributed Datasets).
Spark comes with another streaming API built on top of Spark SQL, called Structured Streaming. It presents data as Datasets/DataFrames (APIs on top of RDDs) and allows the optimized Spark SQL engine to process the streaming data. However, it does not include a Twitter connector and has some restrictions when processing aggregated data over a window. Therefore, for the sake of simplicity, we use the Spark Streaming API for this case study.
The following diagram represents how components in the real-time Twitter analysis application interact:
Figure 2. Components in real-time Twitter analysis application
The Spark Streaming API reads the stream of tweets from the Twitter API in a windowed manner. The Spark engine runs the job per window and sends the result to a Kinesis stream, which is consumed by the subscribers of the Kinesis stream.
The following AWS components and permissions are required for this case study: a Kinesis data stream to publish the analysis results, an S3 bucket for Spark Streaming checkpoints, and IAM permissions that allow the EKS worker role to access both.
If the HashiCorp Terraform script is used to create the EKS cluster, take the following steps to add the required resources:
First, add the worker IAM role name to the eks module so that it matches the role referenced in the next step (eksWorkerRole).
Then, create a file twitter.tf inside the same folder, add the following content, and run terraform apply to create the required components inside the AWS environment:
resource "aws_kinesis_stream" "spark-analysis-stream" { name = "spark_analysis_stream" shard_count = 1 tags = { // required tags } } resource "aws_iam_role_policy_attachment" "spark-kinesis-access" { role = "eksWorkerRole" policy_arn = "arn:aws:iam::aws:policy/AmazonKinesisFullAccess" } resource "aws_s3_bucket" " spark-streaming-checkpoints" { bucket = "spark-streaming-checkpoints" } resource "aws_iam_role_policy_attachment" "s3-full-access" { role = "eksWorkerRole" policy_arn = "arn:aws:iam::aws:policy/AmazonS3FullAccess" }
Make sure that Terraform completes the run without throwing any errors.
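Optionally, verify the new resources from the command line (assuming the AWS CLI is configured for the same account and region):

# The Kinesis stream should be reported as ACTIVE
aws kinesis describe-stream-summary --stream-name spark_analysis_stream

# The checkpoint bucket should exist (and be empty at this point)
aws s3 ls s3://spark-streaming-checkpoints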
Requirement: readers should have some familiarity with Scala programming and sbt projects.
Create a Scala sbt project using an IDE such as IntelliJ, or from the command line, and update build.sbt to add the following dependencies (tested with Scala version 2.12.12):
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % "3.1.2",
  "org.apache.spark" %% "spark-sql" % "3.1.2",
  "org.apache.spark" %% "spark-streaming" % "3.1.2",
  "org.apache.spark" %% "spark-streaming-kinesis-asl" % "3.1.2",
  "org.apache.bahir" %% "spark-streaming-twitter" % "2.4.0",
  "org.twitter4j" % "twitter4j-core" % "4.0.4",
  "org.twitter4j" % "twitter4j-stream" % "4.0.4",
  "edu.stanford.nlp" % "stanford-corenlp" % "4.2.1",
  "edu.stanford.nlp" % "stanford-corenlp" % "4.2.1" classifier "models",
  "software.amazon.awssdk" % "kinesis" % "2.17.31",
  "org.apache.hadoop" % "hadoop-common" % "3.2.0",
  "org.apache.hadoop" % "hadoop-aws" % "3.2.0"
)
Create Main.scala inside the bigdata package (src/main/scala/bigdata) with the sample Scala code below. The code finds the 5 most popular hashtags among tweets in English, over a sliding window of 1800 seconds that slides every 10 seconds. It writes the result as a JSON object into a Kinesis stream, which is made available to subscribers for consumption.
package bigdata

import java.nio.charset.StandardCharsets.UTF_8

// AWS imports
import software.amazon.awssdk.auth.credentials.ProfileCredentialsProvider
import software.amazon.awssdk.core.SdkBytes
import software.amazon.awssdk.regions.Region
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient
import software.amazon.awssdk.services.kinesis.model.PutRecordsRequest
import software.amazon.awssdk.services.kinesis.model.PutRecordsRequestEntry

// Log imports
import org.apache.log4j._
import org.apache.spark.streaming._
import org.apache.spark.streaming.twitter._

// Twitter API imports
import twitter4j.auth.OAuthAuthorization
import twitter4j.conf.ConfigurationBuilder

// Spark imports
import org.apache.spark._

object Main extends App {

  // Create output JSON to send to output
  def createJSON(tuples: List[(String, Int)]): String = tuples match {
    case Nil => "{}"
    case (x, y) :: Nil => "\"" + x + "\": " + y.toString
    case (x, y) :: tail => "\"" + x + "\": " + y.toString + ", " + createJSON(tail)
  }

  def createRecordEntry(record: String, partition_key: String, stream_name: String): PutRecordsRequest = {
    val recordEntry = PutRecordsRequestEntry
      .builder()
      .data(SdkBytes.fromString(record, UTF_8))
      .partitionKey(partition_key)
      .build()

    val putRecordsRequest = PutRecordsRequest
      .builder()
      .streamName(stream_name)
      .records(recordEntry)
      .build()

    putRecordsRequest
  }

  // Set the log level to only print errors
  Logger.getLogger("org").setLevel(Level.ERROR)

  // Read Twitter credentials from the environment variables
  System.setProperty("twitter4j.oauth.consumerKey", sys.env("TWITTER_CONSUMER_KEY"))
  System.setProperty("twitter4j.oauth.consumerSecret", sys.env("TWITTER_CONSUMER_SECRET"))
  System.setProperty("twitter4j.oauth.accessToken", sys.env("TWITTER_ACCESS_TOKEN"))
  System.setProperty("twitter4j.oauth.accessTokenSecret", sys.env("TWITTER_TOKEN_SECRET"))

  // Kinesis streams parameters
  val kinesisStream = "spark_analysis_stream"
  val partitionKey = "data"
  val kinesisRegion = "us-east-1"

  // Sliding window parameters
  val slideIntervalSec = 10
  val windowLengthSec = 1800

  // Directory to output top hashtags
  val outputDirectory = "./twitter"

  val slideInterval = Seconds(slideIntervalSec)
  val windowLength = Seconds(windowLengthSec)

  // set AWS kinesis client
  val kinesisClient = KinesisAsyncClient
    .builder()
    .region(Region.of(kinesisRegion))
    .build()

  // Setup the SparkConfig and StreamingContext
  val conf = new SparkConf().setAppName("twitterAnalysis")
  val ssc = new StreamingContext(conf, Seconds(1))

  val auth = Some(new OAuthAuthorization(new ConfigurationBuilder().build()))
  val twitterStream = TwitterUtils.createStream(ssc, auth)

  // Analyse English tweets only
  val tweet = twitterStream.filter(_.getLang == "en").map(_.getText)

  // Finding the topmost popular hashtags
  val hashTagStream = tweet.flatMap(_.split(" ")).filter(_.startsWith("#")).map(w => (w, 1))
  val windowedHashTagCount = hashTagStream.reduceByKeyAndWindow(
    (x: Int, y: Int) => (x + y),
    (x: Int, y: Int) => (x - y),
    windowLength,
    slideInterval)

  val orderedHashTags = windowedHashTagCount.transform(rdd => {
    val list = rdd.sortBy(_._2, false).take(5)
    kinesisClient.putRecords(createRecordEntry("{" + createJSON(list.toList) + "}", partitionKey, kinesisStream))
    rdd
  })
  orderedHashTags.print()

  ssc.checkpoint("s3://spark-streaming-checkpoints/checkpoints")
  ssc.start()
  ssc.awaitTermination()
}
You can run the program locally to confirm that the result is what you expect. However, you must apply the following minor modifications to the above Scala program (and revert them before creating the package):
1. Set the master in your Spark config to run locally (for example, local[*]).
2. Add AWS credentials to the Kinesis client (profileName is the AWS profile you would like to use).
3. Replace the S3 checkpointing address with a local directory, such as /tmp.
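With these modifications in place, a local run might look like the following sketch, assuming sbt is installed and the Twitter credentials are exported as the environment variables read by the application:

# Credentials created in the Twitter developer portal (placeholders)
export TWITTER_CONSUMER_KEY=<API_KEY>
export TWITTER_CONSUMER_SECRET=<API_SECRET>
export TWITTER_ACCESS_TOKEN=<ACCESS_TOKEN>
export TWITTER_TOKEN_SECRET=<ACCESS_TOKEN_SECRET>

# Compile and run the streaming job locally
sbt run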
Now it should be possible to run the application on the local machine. Use the following Python script to read the most popular hashtags from the Kinesis stream:
import boto3
import json
from datetime import datetime
import calendar
import random
import time

my_stream_name = 'spark_analysis_stream'

kinesis_client = boto3.client('kinesis', region_name='us-east-1')

response = kinesis_client.describe_stream(StreamName=my_stream_name)
my_shard_id = response['StreamDescription']['Shards'][0]['ShardId']

shard_iterator = kinesis_client.get_shard_iterator(StreamName=my_stream_name,
                                                   ShardId=my_shard_id,
                                                   ShardIteratorType='LATEST')
my_shard_iterator = shard_iterator['ShardIterator']

record_response = kinesis_client.get_records(ShardIterator=my_shard_iterator)

record_num = 0
while 'NextShardIterator' in record_response:
    record_response = kinesis_client.get_records(ShardIterator=record_response['NextShardIterator'])
    for record in record_response['Records']:
        print(record_num, record['Data'].decode('utf-8'))
        record_num = record_num + 1
Assuming the above script is stored as kinesis_read.py, run it to print the 5 most popular hashtags as they are published to the stream.
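A minimal invocation, assuming Python 3 with the boto3 package installed and AWS credentials configured in the environment, is:

python3 kinesis_read.py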
Since the container is going to run on Graviton2 machines, you must build the container image on an AWS Graviton2-based instance so that the image is built for the Arm64 architecture.
First, create a JAR file containing your package and its dependencies using sbt assembly, excluding the Spark-specific JAR files (IntelliJ users can follow this article to package their application). Next, download Spark 3.1.2 and copy the application JAR file into the `jars` directory.
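The packaging steps might look like the following sketch; the Spark download URL and the assembly JAR path are assumptions based on the versions used in this example:

# Build the fat JAR (assumes the sbt-assembly plugin is enabled in project/plugins.sbt)
sbt assembly

# Download and extract Spark 3.1.2
wget https://archive.apache.org/dist/spark/spark-3.1.2/spark-3.1.2-bin-hadoop3.2.tgz
tar -xzf spark-3.1.2-bin-hadoop3.2.tgz

# Copy the application JAR into Spark's jars directory
cp target/scala-2.12/bigdata-assembly-0.1.jar spark-3.1.2-bin-hadoop3.2/jars/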
Next, build the Spark container image and push it to Amazon Elastic Container Registry (ECR) so that worker nodes can pull it to create Spark containers.
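Spark's distribution includes a docker-image-tool.sh helper that can be used for this step. A sketch of building and pushing the image is shown below; the account ID and region are placeholders, and an ECR repository named spark is assumed to exist:

# Authenticate Docker with your ECR registry
aws ecr get-login-password --region us-east-1 | docker login --username AWS --password-stdin <AWS_ACCOUNT_ID>.dkr.ecr.us-east-1.amazonaws.com

# Build and push the Spark image from the Spark folder prepared above
# (the resulting image address, <registry>/spark:v3.1.2, is what <DOCKER_IMAGE_ADDRESS> refers to below)
./bin/docker-image-tool.sh -r <AWS_ACCOUNT_ID>.dkr.ecr.us-east-1.amazonaws.com -t v3.1.2 build
./bin/docker-image-tool.sh -r <AWS_ACCOUNT_ID>.dkr.ecr.us-east-1.amazonaws.com -t v3.1.2 push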
Now, it is time to submit the job to the EKS cluster. The application is submitted with default configurations and modest resources, since the number of tweets entering the stream is limited and the computation is not resource-hungry. Before running spark-submit, replace the following placeholders in the spark-submit command: the Kubernetes API server address (<KUBERNETES_MASTER_ADDRESS>), the container image address (<DOCKER_IMAGE_ADDRESS>), and the four Twitter credential values.
Assuming that the application JAR file is named bigdata-assembly-0.1.jar, run the following inside the downloaded Spark folder after replacing the required parameters. Please note that at least two executors are needed for RDD replication to preserve fault tolerance:
bin/spark-submit \
  --class bigdata.Main \
  --master k8s://<KUBERNETES_MASTER_ADDRESS> \
  --deploy-mode cluster \
  --conf spark.executor.instances=2 \
  --conf spark.kubernetes.container.image=<DOCKER_IMAGE_ADDRESS> \
  --conf spark.kubernetes.driver.pod.name="twitter" \
  --conf spark.kubernetes.namespace=default \
  --conf spark.kubernetes.authenticate.driver.serviceAccountName=spark \
  --conf spark.kubernetes.driverEnv.TWITTER_CONSUMER_KEY=<TWITTER_CONSUMER_KEY> \
  --conf spark.kubernetes.driverEnv.TWITTER_CONSUMER_SECRET=<TWITTER_CONSUMER_SECRET> \
  --conf spark.kubernetes.driverEnv.TWITTER_ACCESS_TOKEN=<TWITTER_ACCESS_TOKEN> \
  --conf spark.kubernetes.driverEnv.TWITTER_TOKEN_SECRET=<TWITTER_TOKEN_SECRET> \
  --conf spark.executorEnv.TWITTER_CONSUMER_KEY=<TWITTER_CONSUMER_KEY> \
  --conf spark.executorEnv.TWITTER_CONSUMER_SECRET=<TWITTER_CONSUMER_SECRET> \
  --conf spark.executorEnv.TWITTER_ACCESS_TOKEN=<TWITTER_ACCESS_TOKEN> \
  --conf spark.executorEnv.TWITTER_TOKEN_SECRET=<TWITTER_TOKEN_SECRET> \
  --name sparkTwitter \
  local:///opt/spark/jars/bigdata-assembly-0.1.jar
The job should start running without being interrupted by errors.
Run the following commands to make sure the driver and executor pods are operating properly:
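For example (the driver pod name twitter comes from the spark.kubernetes.driver.pod.name setting in the spark-submit command above):

# List the driver and executor pods in the default namespace
kubectl get pods

# Follow the driver logs to watch the windowed hashtag counts
kubectl logs -f twitter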
The Python script's output (presented in the previous section) is similar to the following:
Another interesting example is sentiment analysis of the tweets received by the Spark Streaming connector, using a pre-trained model. This time, instead of finding the most popular hashtags, the number of tweets expressing each sentiment over a sliding window is counted and sent to the Kinesis stream.
This repository has an implementation of Twitter sentiment analysis using the Stanford CoreNLP library. Simply add this Scala file under your package's directory and replace the package name in the file. The required dependencies are already listed in the sbt file, so they should already have been fetched.
Changing the main Scala object is simple. This code snippet replaces the hashtag-specific computational code in Main.scala:
// transform sentiments into string
import bigdata.SentimentAnalysisUtils._

def sentimentToString(sentiment: SENTIMENT_TYPE): String = {
  sentiment match {
    case NOT_UNDERSTOOD => "NOT_UNDERSTOOD"
    case VERY_NEGATIVE => "VERY_NEGATIVE"
    case NEGATIVE => "NEGATIVE"
    case NEUTRAL => "NEUTRAL"
    case POSITIVE => "POSITIVE"
    case VERY_POSITIVE => "VERY_POSITIVE"
  }
}

val sentiments = tweet.map(x => (SentimentAnalysisUtils.detectSentiment(x), 1))
val windowedSentimentCount = sentiments.reduceByKeyAndWindow(
  (x: Int, y: Int) => (x + y),
  (x: Int, y: Int) => (x - y),
  windowLength,
  slideInterval)

val sentimentsCount = windowedSentimentCount.transform(rdd => {
  val list = rdd.sortBy(_._2, false).take(5).map(x => (sentimentToString(x._1), x._2))
  kinesisClient.putRecords(createRecordEntry("{" + createJSON(list.toList) + "}", partitionKey, kinesisStream))
  rdd
})

sentimentsCount.print()
Follow the same steps as in the previous case study to build the JAR file, create the Spark container, and submit the job. Since sentiment analysis has higher computation requirements, 5 worker nodes are used in this case study. The following is the spark-submit command to run the job on the cluster:
bin/spark-submit \
  --class bigdata.Main \
  --master k8s://<KUBERNETES_MASTER_ADDRESS> \
  --deploy-mode cluster \
  --conf spark.executor.instances=14 \
  --conf spark.kubernetes.container.image=<DOCKER_IMAGE_ADDRESS> \
  --conf spark.kubernetes.driver.pod.name="twitter" \
  --conf spark.kubernetes.namespace=default \
  --conf spark.kubernetes.authenticate.driver.serviceAccountName=spark \
  --conf spark.kubernetes.driverEnv.TWITTER_CONSUMER_KEY=<TWITTER_CONSUMER_KEY> \
  --conf spark.kubernetes.driverEnv.TWITTER_CONSUMER_SECRET=<TWITTER_CONSUMER_SECRET> \
  --conf spark.kubernetes.driverEnv.TWITTER_ACCESS_TOKEN=<TWITTER_ACCESS_TOKEN> \
  --conf spark.kubernetes.driverEnv.TWITTER_TOKEN_SECRET=<TWITTER_TOKEN_SECRET> \
  --conf spark.executorEnv.TWITTER_CONSUMER_KEY=<TWITTER_CONSUMER_KEY> \
  --conf spark.executorEnv.TWITTER_CONSUMER_SECRET=<TWITTER_CONSUMER_SECRET> \
  --conf spark.executorEnv.TWITTER_ACCESS_TOKEN=<TWITTER_ACCESS_TOKEN> \
  --conf spark.executorEnv.TWITTER_TOKEN_SECRET=<TWITTER_TOKEN_SECRET> \
  --conf spark.executor.cores=5 \
  --conf spark.driver.cores=5 \
  --conf spark.driver.memory=34g \
  --conf spark.executor.memory=34g \
  --conf spark.memory.fraction=0.8 \
  --name sparkTwitter \
  local:///opt/spark/jars/bigdata-assembly-0.1.jar
Running the same Python script, which reads the data from the Kinesis stream, shows output like the following:
In this blog, we reviewed two real-time Twitter analysis case studies using Spark Streaming on EKS with Graviton2-based instances: finding the most popular hashtags, and sentiment analysis of the tweets. AWS Graviton2-based instances offer a lower price per hour than x86-based instances while delivering competitive performance. Spark jobs often run on a large number of worker nodes in a cluster, so a lower cost per instance can significantly reduce the cost of running Spark jobs on large datasets and resource-hungry computations.
Visit the AWS Graviton page for customer stories on adoption of Arm-based processors. For any queries related to your software workloads running on Arm Neoverse platforms, feel free to reach out to us at sw-ecosystem@arm.com.
[CTAToken URL = "https://aws.amazon.com/ec2/graviton/" target="_blank" text="Explore AWS Graviton" class ="green"]