Master Spark fundamentals & optimizations 

Streamhub’s all-in-one notes on how to optimize & scale Spark jobs for something more than a POC

At Streamhub, we deal with many forms of data: users' per-minute player viewership data, users' commerce data, users' subscription information, metadata feeds, front-loaded metadata, user panels, and several other third-party datasets. It is easy to imagine the amount of work required to unify these forms of data from various sources, ingest them into our ecosystem, and make them richer, standardized and useful to our clients. At one point, we were running hundreds of jobs daily, crunching terabytes of data to support our extensive use cases. All this heavy lifting is done by Apache Spark running on AWS Elastic MapReduce (EMR) in YARN mode.

In the last two years, we have learned a great deal about optimizing Spark jobs on EMR. Writing your first Spark job might take a day or two, but learning to tune Spark takes months, if not years. This article started as internal team notes, which I am now publishing in the hope that more people can benefit from them. It covers almost everything you should know to run your Spark jobs efficiently (on EMR/YARN, but general aspects as well): performance optimizations, resource allocation, cost reduction, important concepts, common mistakes in coding and configuring jobs, common exceptions, and some must-reads and must-watches about Spark we have found over time. Right, literally everything!

This article will touch upon the following topics: 

EMR Instance-types: Understand your workload and use specialized instances
EMR Purchasing options: choose the right option to optimize cost for your use-case: Spot, Spot Fleet or Spot Block
Resource utilization: Configure your Spark cluster to fully use your resources
— Static resource allocation
— — Understanding Spark’s memory usage is important
— Dynamic resource allocation
GC Tuning
Bad code and related problems
— Avoid shuffle
— Some preferred methods: ReduceByKey over GroupByKey
— Avoid serialization of the whole object
— Operations reordering
— Improve Joins
— — Using Broadcast variable (Joining very large datasets with a relatively small dataset)
— — Using filters pre-join (Joining very large datasets with a mid-sized dataset)
Degree of parallelism
— Repartition
— Coalesce
More on partitioners
Choice of serializer
Dedicated Spark local directories
Writing to databases
Data storage formats: Parquet, ORC, Databricks Delta & compression options
Caching intermediate datasets
Use higher-level APIs: Dataframes/Datasets

This article was originally published on Medium, in case you find that format easier to read: https://medium.com/@sambodhi_72782/spark-tuning-manual-47b98ccb2b2c

Reducing your processing cost matters for another reason too: the lower the cost, the better the resources you can afford for the same spend, and the less effort you have to put into improving performance.

Instance-types: Understand your workload and use specialised instances

  • ML workloads are mostly CPU-intensive; ETL workloads are mostly I/O-intensive.
  • Spark is very memory-intensive, so we use memory-optimized instance types such as the r4 or the newer r5 family.

Purchasing options: choose the right option to optimize cost for your use-case: Spot, Spot Fleet or Spot Block

  • Spot: Spot instances can help reduce your EC2 costs by 40–80%.
  • Spot Fleet: The advantage of using a Spot Fleet is that, instead of specifying the instance types you want, you can specify your compute and memory capacity requirements. You can also specify multiple subnets/Availability Zones. AWS then provides the available instances that fulfill those requirements, which greatly increases your chance of getting Spot Instances.
  • Spot Block: Spot Instances with a specified duration are designed not to be interrupted and run continuously for the duration you select. AWS charges more for Spot Blocks, but still much less than for on-demand instances. You can also specify how long to keep looking for Spot Instances before switching to on-demand instances.

Good reads:

https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/spot-requests.html#fixed-duration-spot-instances

https://aws.amazon.com/blogs/aws/new-ec2-spot-blocks-for-defined-duration-workloads

A common problem we noticed with Spot capacity was nodes being blacklisted from the cluster, or the cluster resizing unexpectedly. It is more common in long-running jobs: when the cluster tries to reacquire instances, tasks or jobs fail, and after a couple of retries this can lead to blacklisting. You can see blacklisted nodes in the Spark UI, and you can check the EMR console to see whether the problem was caused by instances being unavailable.

Resource utilisation: Configure your Spark cluster to fully use your resources

Static resource allocation

To fully utilize your resources when you configure your cluster, make sure the following inequality holds:

M = spark.executor.memory + spark.yarn.executor.memoryOverhead (by default max(384 MB, 0.1 * spark.executor.memory)) < container memory

Here, container memory is the amount of physical memory that can be allocated per container. According to the Cloudera documentation, when running Spark on YARN, each Spark executor runs as a YARN container, and multiple executors (and therefore containers) can run on one instance. M cannot be less than yarn.scheduler.minimum-allocation-mb or more than yarn.scheduler.maximum-allocation-mb, and the sum of M over all executors/containers on a single host cannot be more than yarn.nodemanager.resource.memory-mb.

A related exception: INFO Client: Verifying our application has not requested more than the maximum memory capacity of the cluster, followed by java.lang.IllegalArgumentException: Required executor memory (17408), overhead (1740 MB), is above the max threshold (12288 MB) of this cluster! Please check the values of 'yarn.scheduler.maximum-allocation-mb' and/or 'yarn.nodemanager.resource.memory-mb'.

You are more likely to see this error on EMR releases before 4.1.0. Why? Before that release, certain instance types had yarn.scheduler.maximum-allocation-mb set to a lower value than yarn.nodemanager.resource.memory-mb, so configurations like maximizeResourceAllocation, which try to use all the available memory, could hit the max threshold. With release 4.1.0, AWS made yarn.scheduler.maximum-allocation-mb equal to yarn.nodemanager.resource.memory-mb, which fixes this problem. You can check the values of these configuration parameters for different instance types in the EMR documentation.

This is easier to understand with an example:

  • Let's say we are using r4.4xlarge instances with 16 vCPUs and 130 GB of RAM, of which 116,736 MB is available to YARN containers (yarn.nodemanager.resource.memory-mb).
  • Leave one core for the OS (16 - 1 = 15 cores left) and set the number of executor cores to 3.
  • Calculate the number of executors: executors per instance = (virtual cores per instance - 1) / executor cores. With 3 cores per executor, we can have (16 - 1) / 3 = 5 executors per instance. So if our cluster has 4 instances, num-executors (in the cluster) = 5 * 4 = 20.
  • Set spark.executor.memory to 18 GB. Then, in our case: (18 + 0.1 * 18) * 5 = 99 GB (101,376 MB) < 116,736 MB (container memory).
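The arithmetic above can be checked with a few lines of plain Scala. This is a minimal sketch, not our production tooling: it assumes the default overhead rule (the larger of 384 MB and 10% of executor memory, truncated to whole MB) and ignores YARN rounding each container up to a multiple of yarn.scheduler.minimum-allocation-mb. The object and method names are illustrative.

```scala
object ExecutorSizing {
  // Default YARN memory overhead: the larger of 384 MB and 10% of executor memory.
  def overheadMb(executorMemoryMb: Int): Int =
    math.max(384, (executorMemoryMb * 0.10).toInt)

  /** Executors that fit on one instance after reserving one core for the OS. */
  def executorsPerInstance(vcpus: Int, executorCores: Int): Int =
    (vcpus - 1) / executorCores

  /** Total memory YARN must grant on one node for all of its executors. */
  def memoryPerInstanceMb(executorMemoryMb: Int, executors: Int): Int =
    (executorMemoryMb + overheadMb(executorMemoryMb)) * executors

  def main(args: Array[String]): Unit = {
    // yarn.nodemanager.resource.memory-mb for r4.4xlarge, as quoted in the text
    val containerMemoryMb = 116736
    val perInstance = executorsPerInstance(vcpus = 16, executorCores = 3)
    val requested = memoryPerInstanceMb(executorMemoryMb = 18 * 1024, executors = perInstance)
    println(s"executors per instance = $perInstance, in a 4-node cluster = ${perInstance * 4}")
    println(s"requested $requested MB of $containerMemoryMb MB, fits: ${requested <= containerMemoryMb}")
  }
}
```

With these inputs the sketch reports 5 executors per instance and 101,375 MB per node, one MB under the 99 GB figure above because it truncates the overhead to whole megabytes, as Spark itself does.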

This is just one example of how you can configure your cluster for maximum resource utilization; obviously, many other configurations are possible. You need to experiment with them and see what works best for your application.

Understanding Spark’s memory usage is important

Memory usage largely falls into two categories: execution and storage. A few points to grasp about the memory model:

M = (spark.executor.memory - 300 MB of reserved memory) * spark.memory.fraction

Here, M is the unified region shared between execution and storage. The default value of spark.memory.fraction is 0.6, or 60%.

The remaining 40% is 'user memory', reserved for user data structures, Spark's internal metadata, and as a safeguard against OOM errors caused by sparse and unusually large records.

M = execution memory + storage memory

R = M * spark.memory.storageFraction

R is the part of M in which cached blocks are immune to eviction. When one side is idle, the other can use all of M. Execution may evict cached blocks when it needs memory, but only until total storage memory usage falls under R; storage can never evict execution memory. The default value of spark.memory.storageFraction is 0.5.
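To make these formulas concrete, here is a small sketch that computes M, R and user memory for a given heap size. It assumes the 300 MB reserved memory and the default fractions described in the Spark tuning guide, and it treats spark.executor.memory as the usable heap, which slightly overestimates because the JVM reports a little less. The names are illustrative.

```scala
object UnifiedMemory {
  // Fixed reserved memory per the Spark tuning guide (300 MiB).
  val ReservedMb = 300.0

  final case class Regions(unifiedM: Double, protectedStorageR: Double, userMemory: Double)

  def regions(heapMb: Double, memoryFraction: Double = 0.6, storageFraction: Double = 0.5): Regions = {
    val usable = heapMb - ReservedMb
    val m = usable * memoryFraction // shared by execution and storage
    Regions(
      unifiedM = m,
      protectedStorageR = m * storageFraction, // cached blocks here are not evicted by execution
      userMemory = usable - m                  // user data structures and internal metadata
    )
  }

  def main(args: Array[String]): Unit = {
    val r = regions(heapMb = 18 * 1024)
    println(f"M = ${r.unifiedM}%.0f MB, R = ${r.protectedStorageR}%.0f MB, user = ${r.userMemory}%.0f MB")
  }
}
```

For an 18 GB executor this gives roughly 10,879 MB for M, of which about 5,440 MB is R, plus about 7,253 MB of user memory.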

One of the most common memory-related exceptions is: ExecutorLostFailure Reason: Container killed by YARN for exceeding memory limits. 2.5 GB of 2.5 GB physical memory used. Consider boosting spark.yarn.executor.memoryOverhead.

Memory overhead is the amount of off-heap memory allocated to each executor; it is used for buffers and thread stacks. By default, it is the larger of 384 MB and 10% of executor memory. You can try to increase it, but the equation above must still hold, i.e. (spark.executor.memory + spark.yarn.executor.memoryOverhead) * executors per instance < container memory. (Since Spark 2.3, the property is named spark.executor.memoryOverhead; the spark.yarn.* name is deprecated.)

Because this is about off-heap memory, there can be other (and better) fixes: reducing executor cores (fewer concurrent tasks per executor reduces the memory required), increasing the number of partitions (with more partitions, each one needs less memory), or increasing executor memory (since the default overhead is a fraction of executor memory, increasing executor memory also increases the overhead).

Another very common exception is OutOfMemoryError, which can happen for various reasons: a misconfigured cluster that over-commits memory, objects persisted only in memory instead of spilling to disk, too much shuffling, or insufficient user memory. For example, if your application does heavy processing of its own, like image processing, you might want to decrease spark.memory.fraction to leave more for user memory.

Good reads

https://spark.apache.org/docs/latest/tuning.html#memory-management-overview

Dynamic resource allocation

As you can see, configuring your cluster properly requires a bit of maths, and if your workload changes over time, something elastic works better. Dynamic resource allocation does this for you: it adapts the resources used for processing to the workload. You enable it with spark.dynamicAllocation.enabled, and other parameters let you set the initial, minimum and maximum number of executors.
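A sketch of the relevant settings, collected as a plain Scala map and rendered as spark-submit flags. The executor counts are illustrative assumptions, not recommendations. Per the Spark documentation, dynamic allocation also needs a way to keep shuffle files available when executors are removed; on YARN the classic option is the external shuffle service (spark.shuffle.service.enabled), and Spark 3.0+ also offers spark.dynamicAllocation.shuffleTracking.enabled.

```scala
object DynamicAllocationConf {
  // Illustrative values only; tune min/initial/max for your own workload.
  val conf: Map[String, String] = Map(
    "spark.dynamicAllocation.enabled"          -> "true",
    "spark.dynamicAllocation.minExecutors"     -> "2",
    "spark.dynamicAllocation.initialExecutors" -> "4",
    "spark.dynamicAllocation.maxExecutors"     -> "20",
    // Lets executors be released without losing the shuffle files they wrote.
    "spark.shuffle.service.enabled"            -> "true"
  )

  /** Renders the settings as spark-submit flags, sorted for stable output. */
  def toSubmitArgs(c: Map[String, String]): Seq[String] =
    c.toSeq.sortBy(_._1).flatMap { case (k, v) => Seq("--conf", s"$k=$v") }

  def main(args: Array[String]): Unit =
    println(toSubmitArgs(conf).mkString(" "))
}
```

The same keys can instead be passed to SparkSession.builder().config(key, value), as long as this happens before the SparkContext starts.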

Good reads

https://spark.apache.org/docs/latest/job-scheduling.html#dynamic-resource-allocation

GC Tuning

Spark relies heavily on the JVM's memory management and garbage collection, since it can keep large amounts of data in memory as objects.

The first step in GC tuning is to collect statistics on how often GC runs and how long it takes. On Java 8, this can be done by passing -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps in spark.executor.extraJavaOptions and spark.driver.extraJavaOptions in the job's configuration. Similarly, you can choose the garbage collector: -XX:+UseG1GC selects G1 (the Java 8 default is -XX:+UseParallelGC).
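The flags above are Java 8 syntax. This sketch builds the two extraJavaOptions values and switches to unified logging (-Xlog:gc*) for Java 9+, where the old PrintGC* flags were replaced and some are no longer accepted. The object and method names are hypothetical.

```scala
object GcLoggingConf {
  // GC logging syntax differs by JVM version: the PrintGC* flags are Java 8 style,
  // while Java 9+ uses unified logging.
  def gcLoggingFlags(javaMajor: Int): String =
    if (javaMajor <= 8) "-verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps"
    else "-Xlog:gc*"

  /** Builds the executor and driver extraJavaOptions, optionally selecting G1. */
  def sparkConf(javaMajor: Int, useG1: Boolean): Map[String, String] = {
    val gcChoice = if (useG1) " -XX:+UseG1GC" else ""
    val opts = gcLoggingFlags(javaMajor) + gcChoice
    Map(
      "spark.executor.extraJavaOptions" -> opts,
      "spark.driver.extraJavaOptions"   -> opts
    )
  }

  def main(args: Array[String]): Unit =
    sparkConf(javaMajor = 8, useG1 = true).foreach { case (k, v) => println(s"$k=$v") }
}
```

One caveat from the Spark configuration docs: in client mode, spark.driver.extraJavaOptions must not be set through SparkConf inside the application, because the driver JVM has already started; use --driver-java-options or the defaults file instead.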

GC for Spark has evolved from the traditional generational collectors, Concurrent Mark Sweep (CMS) and ParallelOld, where the heap is divided into Young (Eden/Survivor) and Old generations; to G1 GC, where the heap is divided into regions, which aims to achieve both high throughput and low pauses; and on to Project Tungsten, available with the higher-level APIs, which aims to reduce GC by exploiting knowledge of the data schema to lay out memory explicitly.

From Oracle's documentation: in G1 GC, the heap is partitioned into a set of equal-sized heap regions, each a contiguous range of virtual memory. Certain region sets are assigned the same roles (Eden, Survivor, Old) as in the older collectors, but there is no fixed size for them, which provides greater flexibility in memory usage. G1 tracks the liveness of objects in each region and concentrates its collection and compaction activity on the areas of the heap that are likely to be full of reclaimable objects, that is, garbage. This keeps pauses much more controlled.

A problem with G1 GC is 'humongous objects': any object that is at least half a region in size is considered humongous. If you see back-to-back concurrent cycles initiated due to humongous allocations, a probable fix is to increase -XX:G1HeapRegionSize so that the previously humongous objects are no longer humongous and follow the regular allocation path. My colleague, Yash Datta, talks more about fine-tuning G1 GC here.
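A small sketch of that sizing rule: given the largest object you expect to allocate, find the smallest region size for which it is no longer humongous. It assumes the JDK 8 constraint that -XX:G1HeapRegionSize is a power of two between 1 MB and 32 MB; the 3 MB buffer in main is a hypothetical example.

```scala
object G1RegionSizing {
  private val MB = 1024L * 1024L

  // Region sizes assumed valid here: powers of two from 1 MB to 32 MB (JDK 8 limits).
  val validRegionSizes: Seq[Long] = (0 to 5).map(i => (1L << i) * MB)

  /** G1 treats an allocation as humongous when it is at least half a region. */
  def isHumongous(objectBytes: Long, regionBytes: Long): Boolean =
    objectBytes >= regionBytes / 2

  /** Smallest valid region size for which the object is no longer humongous. */
  def regionSizeFor(objectBytes: Long): Option[Long] =
    validRegionSizes.find(r => !isHumongous(objectBytes, r))

  def main(args: Array[String]): Unit = {
    val largestBuffer = 3L * MB // hypothetical largest per-record allocation
    regionSizeFor(largestBuffer) match {
      case Some(r) => println(s"-XX:G1HeapRegionSize=${r / MB}m")
      case None    => println("No region size helps; reduce the object size instead")
    }
  }
}
```

For the 3 MB example this suggests 8 MB regions, since a 3 MB object is humongous in 1, 2 and 4 MB regions.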

Until Java 8u40, humongous regions were reclaimed only during full GC events. This was improved in later releases (https://bugs.openjdk.java.net/browse/JDK-8027959), so the impact of the issue is significantly reduced on newer JVMs.

Bad code and related problems

There are brilliant slides, 'Everyday I'm Shuffling' by Holden Karau and Vida Ha from Databricks, which I think every Spark engineer should read for tips on writing better Spark programs. Below I list the most common problems we faced, whether or not the slides cover them, skipping the details where they already do.

Good read: https://www.slideshare.net/databricks/strata-sj-everyday-im-shuffling-tips-for-writing-better-spark-programs

Avoid shuffle (we know this but we still do!)

Use built-in functions and avoid unnecessary collect() calls: collect() sends all partitions to the single driver, which can cause OOM errors there.

Some preferred methods: ReduceByKey over GroupByKey

reduceByKey combines values for each key within a partition before shuffling, which minimizes the amount of data transferred over the network compared to groupByKey. In our experience, groupByKey is most often the cause of out-of-disk problems.
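Why reduceByKey shuffles less can be seen without a cluster. This plain-Scala model simulates two partitions as Seqs (an assumption for illustration; it is not how Spark stores data) and counts the records each approach would send across the network. In Spark, the two versions would be rdd.groupByKey().mapValues(_.sum) and rdd.reduceByKey(_ + _).

```scala
object ShuffleVolume {
  type Partition = Seq[(String, Int)]

  /** groupByKey: every (key, value) record crosses the network. */
  def groupByKeyShuffled(parts: Seq[Partition]): Int =
    parts.map(_.size).sum

  /** reduceByKey: values are combined per key inside each partition first,
    * so only one partial result per distinct key per partition is shuffled. */
  def reduceByKeyShuffled(parts: Seq[Partition]): Int =
    parts.map(_.map(_._1).distinct.size).sum

  /** The final result is the same either way; only the shuffle volume differs. */
  def reduceLocally(parts: Seq[Partition]): Map[String, Int] = {
    // map-side combine: one partial sum per key per partition
    val partials: Seq[(String, Int)] =
      parts.flatMap(p => p.groupBy(_._1).map { case (k, vs) => k -> vs.map(_._2).sum })
    // "shuffle" the partial sums by key and merge them
    partials.groupBy(_._1).map { case (k, vs) => k -> vs.map(_._2).sum }
  }

  def main(args: Array[String]): Unit = {
    val parts = Seq(
      Seq("a" -> 1, "b" -> 1, "a" -> 1, "a" -> 1),
      Seq("b" -> 1, "b" -> 1, "a" -> 1)
    )
    println(s"groupByKey shuffles ${groupByKeyShuffled(parts)} records")
    println(s"reduceByKey shuffles ${reduceByKeyShuffled(parts)} records")
    println(reduceLocally(parts))
  }
}
```

Here groupByKey would shuffle all 7 records while reduceByKey shuffles only 4 partial sums, and both end with a -> 4 and b -> 3. The gap widens as the number of records per key grows.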

Avoid serialization of the whole object

Avoid serialization of the whole object due to references in closures passed to transformation functions (map, flatMap, filter, etc) or while broadcasting a variable. When you apply such transformations, your transformation code or closure is:

  1. Serialized on the driver node
  2. Shipped over the network to the worker nodes
  3. Deserialized
  4. Executed on each partition independently

For example, the code below fails because Spark tries to serialize Executor, which is not Serializable, and throws SparkException: Task not serializable, caused by java.io.NotSerializableException.

    val rdd: RDD[String] = …
    object Executor {
      def bc = sc.broadcast(map)

      rdd.map(bc.value.get)
    }

Another common example: 

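    object Executor {
      def f(log: String): String = log.stripSuffix("")

      val sample = rdd.map(f(_))
    }

Some ways to avoid this:

1. Declare the function inline, so the closure does not reference the enclosing object:

    object Executor {
      rdd.map(_.stripSuffix(""))
    }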

2. Instead of a def, use a function val and enclose it in a block, so the whole object doesn't need to be serialized: the closure can find everything it requires within the scope of the block.

    object Executor {
      val xyz = …

      val block = {
        val f: String => String = (log: String) => log.stripSuffix("")

        rdd.map(f(_))
      }
    }

3. Put it into a companion object which is Serializable.

    class Executor {
      val xyz = …
      ..
    }

    object Executor extends java.io.Serializable {
      def f(log: String): String = log.stripSuffix("")
    }

    val block = rawRDD.map(Executor.f(_))
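The failure mode above can be reproduced with plain Java serialization, which is roughly what Spark applies to a task closure before shipping it. In this sketch, Job is a hypothetical stand-in for a non-serializable driver-side class. It assumes Scala 2.12 or later, where lambdas are serializable and capture this only when they touch a member. The fix shown is the local-copy pattern recommended in the Spark programming guide, a close cousin of option 2 above.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

object ClosureSerialization {
  // Hypothetical driver-side class that is NOT Serializable (imagine it holds a SparkContext).
  class Job(val suffix: String) {
    // Reads the member `suffix`, so the lambda captures `this`, i.e. the whole Job.
    def badCleaner: String => String = log => log.stripSuffix(suffix)

    // Copies the member into a local val first, so only the String is captured.
    def goodCleaner: String => String = {
      val sfx = suffix
      log => log.stripSuffix(sfx)
    }
  }

  /** Java-serializes a closure, as Spark does before sending it to executors. */
  def serializes(f: AnyRef): Boolean =
    try {
      val out = new ObjectOutputStream(new ByteArrayOutputStream())
      out.writeObject(f)
      out.close()
      true
    } catch {
      case _: NotSerializableException => false
    }

  def main(args: Array[String]): Unit = {
    val job = new Job(".log")
    println(s"badCleaner serializable:  ${serializes(job.badCleaner)}")
    println(s"goodCleaner serializable: ${serializes(job.goodCleaner)}")
  }
}
```

badCleaner fails because reading suffix drags the whole Job into the closure; goodCleaner ships only a String.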

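Improve Joins

Using Broadcast variable (Joining very large datasets with a relatively small dataset)

If you are using RDDs, you have to broadcast the small dataset explicitly, for example:

    // collect the small side to the driver as a Map and ship one read-only copy to each executor
    val bc = sc.broadcast(smallRdd.keyBy(_.id).collect.toMap)
    // look up each large-side record locally; largeRdd is never shuffled
    val joined = largeRdd.map(row => (row, bc.value.get(row.id)))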

If you are using DataFrames, broadcast joins are done automatically when the estimated size of one side is below spark.sql.autoBroadcastJoinThreshold (10 MB by default). This config parameter and the broadcast hint control whether a broadcast is used, and therefore affect the performance of your job.

    val df1 = spark.range(1000)
    val df2 = spark.range(1000)
    df1.join(df2, Seq("id")).explain

    == Physical Plan ==
    *(2) Project [id#0L]
    +- *(2) BroadcastHashJoin [id#0L], [id#2L], Inner, BuildRight
       :- *(2) Range (0, 1000, step=1, splits=8)
       +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false]))
          +- *(1) Range (0, 1000, step=1, splits=8)

    // a threshold of -1 disables automatic broadcast joins
    spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)

    // the plan should no longer use BroadcastHashJoin
    df1.join(df2, Seq("id")).explain

More on partitioners

  • RangePartitioner: If the keys in your data follow a particular ordering, range partitioning is an efficient partitioning technique. With range partitioning, records whose keys fall within the same range end up in the same partition.
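A simplified model of the idea behind range partitioning: pick partition upper bounds from a sorted sample of keys, then route each key to the first partition whose bound is not below it. This is an illustration under simplifying assumptions, not Spark's RangePartitioner implementation, which samples every partition and weights the candidates. In Spark you get this behaviour from sortByKey or rdd.partitionBy(new RangePartitioner(numPartitions, rdd)).

```scala
object RangePartitioning {
  /** Upper bounds for partitions 0 .. n-2, taken at evenly spaced positions of the sorted sample. */
  def bounds(sample: Seq[Int], numPartitions: Int): Vector[Int] = {
    require(numPartitions >= 1 && sample.size >= numPartitions, "need at least one sample key per partition")
    val sorted = sample.sorted.toVector
    (1 until numPartitions).map(i => sorted(i * sorted.size / numPartitions - 1)).toVector
  }

  /** First partition whose upper bound is >= key; keys above every bound go to the last one. */
  def partitionFor(key: Int, upperBounds: Vector[Int]): Int = {
    val i = upperBounds.indexWhere(b => key <= b)
    if (i == -1) upperBounds.length else i
  }

  def main(args: Array[String]): Unit = {
    val keys = scala.util.Random.shuffle((1 to 100).toVector)
    val b = bounds(keys.take(40), numPartitions = 4) // bounds come from a sample, not all keys
    val byPartition = keys.groupBy(k => partitionFor(k, b))
    byPartition.toSeq.sortBy(_._1).foreach { case (p, ks) =>
      println(s"partition $p: keys ${ks.min} to ${ks.max} (${ks.size} keys)")
    }
  }
}
```

Each partition ends up holding a contiguous key range, which is what makes range-partitioned data cheap to sort and to scan by range.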
Operations on RDDs that propagate a partitioner

Choice of Serializer

Serialization is sometimes a bottleneck when shuffling and caching data.

  • By default, Spark serializes objects using Java’s ObjectOutputStream framework, and can work with any class you create that implements java.io.Serializable.
  • Kryo is significantly faster and more compact than Java serialization (often as much as 10x). For best performance, register the classes you are going to use: registration lets Kryo avoid writing the full class name with every object, which is one of the largest overheads in serialization. Unregistered classes still work, just less efficiently.
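A sketch of the settings that switch to Kryo and register classes. PlayerEvent and Subscription are hypothetical application classes. spark.kryo.registrationRequired makes Spark fail on unregistered classes instead of silently writing their names, which helps catch gaps, although some Spark-internal classes may then need registering too.

```scala
object KryoConf {
  // Hypothetical application classes; use the types your job actually shuffles or caches.
  final case class PlayerEvent(userId: String, seconds: Long)
  final case class Subscription(userId: String, plan: String)

  val conf: Map[String, String] = Map(
    "spark.serializer" -> "org.apache.spark.serializer.KryoSerializer",
    // Fail fast on unregistered classes rather than writing full class names.
    "spark.kryo.registrationRequired" -> "true",
    // Comma-separated, fully qualified class names.
    "spark.kryo.classesToRegister" ->
      Seq(classOf[PlayerEvent], classOf[Subscription]).map(_.getName).mkString(",")
  )

  def main(args: Array[String]): Unit =
    conf.foreach { case (k, v) => println(s"$k=$v") }
}
```

Programmatically, SparkConf.registerKryoClasses(Array(classOf[PlayerEvent], classOf[Subscription])) achieves the same registration.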

That said, Dataset encoders are considered much faster than serialization/deserialization via Java or Kryo.

Good reads:

Apache Spark Datasets https://databricks.com/blog/2016/01/04/introducing-apache-spark-datasets.html

Tuning Spark https://spark.apache.org/docs/latest/tuning.html#data-serialization

Dedicated Spark local directories

Spark writes intermediate output files from map tasks, shuffle data, and data that is stored to disk into a temporary 'scratch' directory. It is set through spark.local.dir or, in YARN mode, through yarn.nodemanager.local-dirs, which overrides spark.local.dir. spark.local.dir defaults to /tmp, which is also used by the operating system and can fill up very quickly, leading to exceptions like java.io.IOException: No space left on device.

In such cases, you may notice that 'Shuffle Write' in the Spark UI is huge, meaning a lot of data is being moved and the scratch space is being used heavily. Increasing the scratch space might fix your problem.

Spark recommends that this directory be fast, local and, if possible, dedicated. You can also pass a comma-separated list of directories, ideally on different disks.

EMR provides a simple interface to add volumes to nodes in your cluster. For example, here I have added two volumes to my core instance, which are mounted as:

    Filesystem  Size  Used  Avail  Use%  Mounted on
    /dev/xvdb2   95G  857M    95G    1%  /mnt
    /dev/xvdc    90G  425M    90G    1%  /mnt1

If you check yarn-site.xml, you will see ‘yarn.nodemanager.local-dirs’ is already set to ‘/mnt/yarn,/mnt1/yarn’.
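When chasing No space left on device, it helps to see how much room each scratch directory actually has. This sketch parses a comma-separated directory list (the format of both spark.local.dir and yarn.nodemanager.local-dirs) and reports usable space with java.io.File.getUsableSpace. The default value mirrors the yarn-site.xml setting above; the object name is illustrative.

```scala
import java.io.File

object ScratchSpace {
  /** Splits a comma-separated directory list, ignoring blanks. */
  def parseDirs(value: String): Seq[String] =
    value.split(",").map(_.trim).filter(_.nonEmpty).toSeq

  /** Usable bytes on the partition holding each directory (0 if the path does not exist). */
  def usableBytes(dirs: Seq[String]): Seq[(String, Long)] =
    dirs.map(d => d -> new File(d).getUsableSpace)

  def main(args: Array[String]): Unit = {
    // Pass your own list as the first argument; defaults to the value shown above.
    val configured = args.headOption.getOrElse("/mnt/yarn,/mnt1/yarn")
    usableBytes(parseDirs(configured)).foreach { case (dir, bytes) =>
      println(f"$dir%-15s ${bytes / math.pow(1024, 3)}%.1f GiB usable")
    }
  }
}
```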

Writing to databases

Normally you would initialize the database connection on the executors rather than on the driver, since network sockets are not serializable. When persisting an RDD to a database, open connections per partition (foreachPartition) rather than on the driver or for each record, which would either fail due to too many connections or be extremely slow.

Your connector should write in batches; you can tune the related parameters, such as batch size and the number of parallel writes.

Prefer connectors that are 'locality aware', which can reduce data movement based on the placement of your database and Spark instances.
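The per-partition pattern can be sketched without a database. FakeDbConnection below is a hypothetical stand-in, not a real client; a real job would use its connector or JDBC (java.sql.PreparedStatement with addBatch and executeBatch). writePartition is the kind of function you would pass to foreachPartition: one connection per partition, rows sent in fixed-size batches, and the connection always closed.

```scala
object PartitionWriter {
  // Hypothetical stand-in for a database client; it only records what it was asked to do.
  final class FakeDbConnection {
    var batches: Vector[Seq[String]] = Vector.empty
    var closed: Boolean = false
    def writeBatch(rows: Seq[String]): Unit = batches :+= rows
    def close(): Unit = closed = true
  }

  /** One connection for the whole partition, rows written in batches of
    * `batchSize`, connection closed even if a write fails. */
  def writePartition(rows: Iterator[String], batchSize: Int)(open: () => FakeDbConnection): FakeDbConnection = {
    val conn = open()
    try {
      rows.grouped(batchSize).foreach(batch => conn.writeBatch(batch))
    } finally {
      conn.close()
    }
    conn
  }

  def main(args: Array[String]): Unit = {
    var opened = 0
    val open = () => { opened += 1; new FakeDbConnection }
    // two simulated partitions
    val partitions = Seq(Iterator("r1", "r2", "r3", "r4", "r5"), Iterator("r6"))
    val conns = partitions.map(p => writePartition(p, batchSize = 2)(open))
    println(s"connections opened: $opened")
    println(s"batch sizes per partition: ${conns.map(_.batches.map(_.size))}")
  }
}
```

In a job this would look like rdd.foreachPartition(rows => writePartition(rows, 500)(openConnection)), where openConnection is a hypothetical factory that runs on the executor, so no socket is ever serialized from the driver.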

Data storage formats & compression options

Parquet

Parquet is a columnar format that automatically preserves the schema of the original data, and it is supported by many other data processing systems. It can save both time and space, since jobs read only the fraction of the data required for the calculation, thanks to push-down filters, and data is compressed by default with snappy. Apart from compression, Parquet uses different encodings for different data types, which you can read about in the Parquet documentation.

Since Parquet files require a schema, if you are using RDDs you would normally convert your RDD to a DataFrame first: RDDs are not required to have a schema, but DataFrames must have one.

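    import spark.implicits._     // needed for toDF(); assumes a SparkSession named spark and an RDD of case classes or tuples
    val df = rdd.toDF()          // convert RDD to DataFrame

    df.write.parquet(<filepath>) // save as Parquet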

ORC

ORC is also a columnar format like Parquet, and in some cases it can outperform Parquet, as it did in our own benchmarks with Presto. Nexla has published a comparison of common data formats.

Databricks Delta

We are still exploring Delta, but it looks very promising, especially for ML workloads: on top of the usual Parquet format, it adds transactional properties to your data and makes versioning and rollbacks possible, which allows you to reproduce your ML experiments. It could be called a 'super Parquet' format.

Compression Options

This deck from Yahoo! nicely summarises the popular compression codecs.

Good read

Compression options (Yahoo!) https://www.slideshare.net/Hadoop_Summit/singh-kamat-june27425pmroom210c

Caching intermediate datasets

Available levels:

  • MEMORY_ONLY (the default for RDDs): stores deserialized objects, which are fast to scan at runtime but worse for GC pressure, because many objects sit around inside the cache.
  • MEMORY_ONLY_SER stores serialized Java objects (one byte array per partition). This can help cut down on GC and is more space-efficient than deserialized objects, but more CPU-intensive to read.
  • MEMORY_AND_DISK avoids expensive recomputation: partitions that don't fit in memory are stored on disk, so you won't have to repeat the expensive computation.

To my surprise, this was the simplest and most effective way for us to gain performance. Some of our jobs ran six times faster after changing the persistence level from the default to MEMORY_AND_DISK_SER! That is because, with MEMORY_ONLY caching, when the allowed storage memory is full, older blocks are evicted to make room for newer ones and have to be recomputed when they are needed again. With MEMORY_AND_DISK, blocks that don't fit in memory are written to disk instead of being dropped, so nothing has to be recomputed.

Use higher-level APIs

If your data is structured or semi-structured, it is better to use DataFrames/Datasets. Besides a higher-level abstraction, richer semantics, compile-time type safety (with Datasets) and columnar access, they offer better space and speed efficiency, mainly due to the following optimizations:

Tungsten: substantially improves memory and CPU efficiency. It uses knowledge of the data schema to lay out memory explicitly. As a result, Tungsten's encoders can efficiently serialize/deserialize JVM objects and generate compact bytecode that executes at superior speeds.

Catalyst query optimizer: optimizes the query plan, for example by reordering operations or by reducing the amount of data that must be read for a calculation.

Good reads:

A Tale of Three Apache Spark APIs https://databricks.com/blog/2016/07/14/a-tale-of-three-apache-spark-apis-rdds-dataframes-and-datasets.html

Power of Datasets https://databricks.com/blog/2016/01/04/introducing-apache-spark-datasets.html

Project Tungsten and Catalyst SQL optimizer https://medium.com/@goyalsaurabh66/project-tungsten-and-catalyst-sql-optimizer-9d3c83806b63

These are the basic concepts you must know to continuously optimize and scale your jobs in production, whether you are working with RDDs, Spark SQL or the higher-level APIs. The higher-level APIs now do many optimizations under the hood and are recommended, but knowing the fundamentals always helps. I still refer to these notes from time to time to refresh my memory.

If you made it to the end of this article, I would be happy to hear your feedback.
