When Life Gives You Options, Make Sure Not To Choose Some(null)

It was a casual afternoon at the office, writing some code, working on features. Then, all of a sudden, Spark started getting angry. If you run a production environment with Spark, you probably know that feeling when those red dots start to accumulate in the Spark UI:

Aint nobody got time for that!

It was an old friend, one I haven't seen in a while. It's one of those friends that creeps up on you from behind, when you're not looking. Even though he knows he's not invited, he still feels comfortable coming back, popping his head in every now and then:

java.lang.NullPointerException

Hello nullness my old friend

When we moved from C# to Scala, it was written in bold everywhere: "Use Option[A]! Avoid null at all costs". And that actually made sense. You have this perfectly good data structure (one might even throw the M word around) which makes it easy not to use null. It takes some time to get used to, especially for someone taking their first steps in a functional programming language. So we embraced the advice and started using Option[A] everywhere we wanted to convey the absence of a value, and so far it has worked great. So how did our old friend still manage to creep in and cause our Spark job to scream and yell at us?

Diagnosing the issue

Spark wasn't being very helpful here. Mostly, those red dots were accompanied by a NullPointerException and a useless driver stack trace, while all the actual action was happening inside the Executor nodes running the code. After some investigation, I managed to get hold of the actual stack trace causing the problem:

org.apache.spark.SparkException: Job aborted due to stage failure: Task 7 in stage 15345.0 failed 4 times, most recent failure: Lost task 7.3 in stage 15345.0 (TID 27874, XXX.XXX.XXX.XXX): java.lang.NullPointerException
	at scala.collection.immutable.StringOps$.length$extension(StringOps.scala:47)
	at scala.collection.immutable.StringOps.length(StringOps.scala:47)
	at scala.collection.IndexedSeqOptimized$class.segmentLength(IndexedSeqOptimized.scala:193)
	at scala.collection.immutable.StringOps.segmentLength(StringOps.scala:29)
	at scala.collection.GenSeqLike$class.prefixLength(GenSeqLike.scala:93)
	at scala.collection.immutable.StringOps.prefixLength(StringOps.scala:29)
	at scala.collection.IndexedSeqOptimized$class.span(IndexedSeqOptimized.scala:159)
	at scala.collection.immutable.StringOps.span(StringOps.scala:29)
	at argonaut.PrettyParams.appendJsonString$1(PrettyParams.scala:131)
	at argonaut.PrettyParams.argonaut$PrettyParams$$encloseJsonString$1(PrettyParams.scala:148)
	at argonaut.PrettyParams$$anonfun$argonaut$PrettyParams$$trav$1$4.apply(PrettyParams.scala:187)
	at argonaut.PrettyParams$$anonfun$argonaut$PrettyParams$$trav$1$4.apply(PrettyParams.scala:187)
	at argonaut.Json$class.fold(Json.scala:32)
	at argonaut.JString.fold(Json.scala:472)
	at argonaut.PrettyParams.argonaut$PrettyParams$$trav$1(PrettyParams.scala:178)
	at argonaut.PrettyParams$$anonfun$argonaut$PrettyParams$$trav$1$6$$anonfun$apply$3.apply(PrettyParams.scala:204)
	at argonaut.PrettyParams$$anonfun$argonaut$PrettyParams$$trav$1$6$$anonfun$apply$3.apply(PrettyParams.scala:198)
	at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
	at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
	at scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:221)
	at scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:428)
	at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
	at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
	at argonaut.PrettyParams$$anonfun$argonaut$PrettyParams$$trav$1$6.apply(PrettyParams.scala:198)
	at argonaut.PrettyParams$$anonfun$argonaut$PrettyParams$$trav$1$6.apply(PrettyParams.scala:197)
	at argonaut.Json$class.fold(Json.scala:34)
	at argonaut.JObject.fold(Json.scala:474)
	at argonaut.PrettyParams.argonaut$PrettyParams$$trav$1(PrettyParams.scala:178)
	at argonaut.PrettyParams.pretty(PrettyParams.scala:211)
	at argonaut.Json$class.nospaces(Json.scala:422)
	at argonaut.JObject.nospaces(Json.scala:474)
	at argonaut.Json$class.toString(Json.scala:464)
	at argonaut.JObject.toString(Json.scala:474)
	at com.our.code.SomeClass.serialize(SomeClass.scala:12)

Most of this stack trace comes from Argonaut, a purely functional JSON parsing library (if you don't know Argonaut and have to do some JSON parsing in Scala, you should definitely check it out).

We were serializing a class to JSON and somewhere along the line, a String was null. This was weird, especially considering our class looked like this:

case class Foo(bar: Option[String], baz: Option[Int])

Not only that, but Argonaut handles options out of the box via its DSL for serialization:

import argonaut._, Argonaut._

implicit def FooEncodeJson: EncodeJson[Foo] =
  EncodeJson(foo =>
    ("bar" :?= foo.bar) ->?:
    ("baz" :?= foo.baz) ->?:
    jEmptyObject
  )

Where :?= knows how to handle the Option[A] inside bar and baz. (Yes, I know there is a shorter syntax for serialization, and yes, I'm aware of argonaut-shapeless :), but this is for the sake of the example.)

So WTF is going on? We don't use null in our code, everything is perfectly wrapped in Options, the mighty functional gods are happy, so where is this coming from?

Some(null) is never EVER what you wanted

It took me a couple of hours to realize what was happening, and I have to say it quite surprised me. Foo is the product of a Map[String, String] lookup. Prior to generating Foo, we parse a String into key-value pairs and then extract specific values from which we generate Foo.

A rough sketch of the code looks like this:

val s: String = ???
val kvps: scala.collection.immutable.Map[String, String] = parseLongStringIntoKeyValuePairs(s)

val foo = Foo(kvps.get("bar"), kvps.get("baz"))

If you're familiar with Scala's immutable Map[A, B], you know that its get method returns an Option[B] (where B is the type of the value). The documentation looks like this:

abstract def get(key: K): Option[V]

Optionally returns the value associated with a key.
  key: the key value
  returns: an option value containing the value associated with key in this map, or None if none exists.

"or None if none exists". Ok, that makes sense. But what happens if null creeps in as a value? What would you expect the following to return?

val map: Map[String, String] = Map("bar" -> null)
map.get("bar")

If you guessed None, you’re wrong:

val map: Map[String, String] = Map("bar" -> null)
map.get("bar")

// Exiting paste mode, now interpreting.

map: Map[String,String] = Map(bar -> null)
res1: Option[String] = Some(null)

Some(null) - YAY! BEST OF BOTH WORLDS.

But just a minute ago I told you "We never use null, always Option[A]". Was I lying to you? No, I wasn't. The problem is that parseLongStringIntoKeyValuePairs is actually an interop call into a Java library, which parses the string and may definitely return null for a key with no value.

This feels weird though

This has been discussed many times in the Scala ecosystem. I guess the TL;DR is that Some(null) may actually convey something under specific contexts that None cannot, such as the existence of an empty value (where None may convey no value at all). This leads to a long philosophical discussion about the meaning of null, None and the essence of the human race. Be it correct or not, this definitely gave me a good bite in the a** and is something everyone should be aware of.

Fixing the problem

A quick fix for this problem is trivial. The first thing that comes to mind is Option.apply, which handles null values gracefully by returning None:

map.get("bar").flatMap(Option.apply)
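
If you want to push this one step further, you can keep the sanitizing in one place. Here is a minimal sketch (the safeGet helper name is mine, not part of our code base) of a null-safe lookup that collapses Some(null) into None before it can leak any further:

// Hypothetical helper: a null-safe Map lookup.
def safeGet[K, V](map: Map[K, V], key: K): Option[V] =
  map.get(key).flatMap(Option.apply)

val kvps = Map("bar" -> null, "baz" -> "42")
safeGet(kvps, "bar")   // None, instead of Some(null)
safeGet(kvps, "baz")   // Some("42")
safeGet(kvps, "qux")   // None, the key simply isn't there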

Wrapping up

Some(null) is evil sorcery IMO. I would never use it to convey the emptiness of an existing value; I can think of many other ways to encode such a value (I like to use the Foo.empty pattern where empty is a lazy val), especially in the Scala ecosystem.
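
For completeness, here is a minimal sketch of what I mean by the Foo.empty pattern (the case class mirrors the Foo from before; the rest is illustrative):

case class Foo(bar: Option[String], baz: Option[Int])

object Foo {
  // A single, explicit "empty" value instead of smuggling null inside an Option.
  lazy val empty: Foo = Foo(None, None)
}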

Of course a trivial unit test could have shown that this happens, but many times in Scala I have the warm feeling that Option[A] means "this can never be null", and we should always keep in mind that something like the above may happen.

Improving Spark Streaming Checkpointing Performance With AWS EFS

Update 10.03.2017 - There is a “gotcha” when using EFS for checkpointing which can be a deal breaker, pricing wise. I have updated the last part of the post (Downsides) to reflect the problem and a possible workaround.

Introduction

When doing stream processing with Apache Spark, in order to be resilient to failures one must provide a checkpointing endpoint for Spark to save its internal state. When using Spark with YARN as the resource manager, one can easily checkpoint to HDFS. But what happens if we don't need HDFS, or don't want to use YARN? What if we're using Spark Standalone as our cluster manager?

Up until recently, checkpointing to S3 was the de-facto storage choice for Spark with the Standalone cluster manager running on AWS. This has certain caveats which make it problematic in a production environment (or any other environment, really). What I'd like to walk you through today is how I've experienced working with S3 as a checkpoint backing store in production, and to introduce you to an alternative approach using AWS EFS (Elastic File System). Let's get started.

Choosing a solution for checkpointing

Once you learn what checkpointing is and how it helps, you very soon realise that you need a distributed file system to store the intermediate data for recovery. Since our product was running on AWS, the viable solutions were:

  1. HDFS - Required us to install HDFS and run YARN as a cluster manager.
  2. Amazon S3 - Remote blob storage, pretty easy to set up and get running, low cost of maintenance.

Since we didn't need (or want) an entire HDFS cluster only for the sake of checkpointing, we decided on the S3 based solution, which only required us to provide an endpoint and credentials, and we were on our way. It was quick, efficient and tied up the loose end for us.
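
For context, here is a rough sketch of what that S3 based setup boils down to (the bucket name, keys and master URL are placeholders; credentials can also come from instance roles instead of explicit keys):

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf().setMaster("spark://127.0.0.1:7077").setAppName("S3CheckpointExample")
val sc = new SparkContext(conf)

// Credentials for the s3a:// file system go through the Hadoop configuration.
sc.hadoopConfiguration.set("fs.s3a.access.key", "<ACCESS_KEY>")
sc.hadoopConfiguration.set("fs.s3a.secret.key", "<SECRET_KEY>")

val ssc = new StreamingContext(sc, Seconds(4))
ssc.checkpoint("s3a://my-checkpoint-bucket/spark/")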

When things start to go wrong

Rather quickly we started noticing Spark tasks failing with a "directory not found" error, due to S3's read-after-write semantics. Spark's checkpointing mechanism (to be precise, the underlying S3AFileSystem provided by Apache Hadoop) first writes all data to a temporary directory, then upon completion lists the directory it wrote to, making sure the folder exists, and only then renames the checkpoint directory to its real name. Listing a directory after a PUT operation in S3 is eventually consistent per the S3 documentation, and this was the cause of the sporadic failures that made the checkpointing task fail entirely.

This meant that:

  1. Checkpointing wasn’t 100% reliable, thus making driver recovery not reliable.
  2. Failed tasks accumulated in the Executor UI view, making it difficult to distinguish between random checkpoint failures and actual business logic failures.

And that was a problem. A better solution was needed that would give us reliable semantics of checkpointing.

Amazon Elastic File System

From the official AWS documentation:

Amazon Elastic File System (Amazon EFS) provides simple, scalable file storage for use with Amazon EC2. With Amazon EFS, storage capacity is elastic, growing and shrinking automatically as you add and remove files, so your applications have the storage they need, when they need it.

Amazon Elastic File System (EFS) is a distributed file system which mounts onto your EC2 instances. It resembles Elastic Block Storage, but extends it by allowing a single file system to be mounted to multiple EC2 instances. This is a classic use case for Spark, since we need the same mount available across all Worker instances.

Setting up EFS

The AWS docs lay out how one creates an EFS instance and attaches it to existing EC2 instances. The essential steps are:

  1. Step 2: Create Your Amazon EFS File System
  2. Step 3: Connect to Your Amazon EC2 Instance and Mount the Amazon EFS File System
  3. Network File System (NFS)–Level Users, Groups, and Permissions

One important thing I'd like to point out is permissions. Once you set up the EFS mount, all permissions automatically go to the root user. If your Spark application runs under a different user (which it usually does, under its own spark user), you have to remember to grant that user permissions on the checkpointing directory. You must also make sure that user (be it spark or any other) has an identical userid and groupid on all EC2 instances. If you don't, you'll end up with permission denied errors while trying to write checkpoint data. If you have already set up an existing user and want to align all user and group ids to that user, read this tutorial on how that can be done (it's pretty easy and straightforward).

Checkpoint directory format for StreamingContext

After the EFS mount is up and running on all EC2 instances, we need to pass the mounted directory to our StreamingContext. We do this by passing the exact location of the directory we chose with a file:// prefix. Assume that our mount location is /mnt/spark/:

val sc = new SparkContext("spark://127.0.0.1:7077", "EFSTestApp")
val streamingContext = StreamingContext.getOrCreate("file:///mnt/spark/", () => {
  val ssc = new StreamingContext(sc, Seconds(4))
  ssc.checkpoint("file:///mnt/spark/") // make the freshly created context checkpoint to the same mount
  ssc
})

Spark will use LocalFileSystem from org.apache.hadoop.fs as the underlying file system for checkpointing.

Performance and stability gains

As you probably know, every Spark cluster has a different size, different workloads and different computations being processed. There usually is no "one size fits all" solution, so as always with system performance, I encourage you to take this paragraph with a grain of salt.

Under AWS EFS, I've witnessed a 2x to 3x improvement in checkpointing times. Our streaming app checkpoints every 40 seconds, using 3 executors, each with 14GB of memory and a constant message stream of ~2000-5000 messages/sec. Checkpointing took between 8-10 seconds on S3. Under EFS, that checkpointing time dropped to between 2-4 seconds for the same workload. Note that this will vary highly depending on your cluster setup, the size and count of each executor, and the number of Worker nodes.

Additionally, we now no longer experience failing tasks due to checkpointing errors, which is extremely important for fault tolerance of the streaming application:

Spark Streaming With No Task Failures

Downsides (Updated 10.3.2017)

There is an "issue" when running checkpointing with EFS which I've been hit in the face with in production. The way AWS EFS works is that your read/write throughput is determined by the size of your file system. Initially, AWS gives you enough burst credits to write at 100 MiB/s, and starts studying your use of the file system (and taking away your credits). About a week later, it determines your pattern of use, checking how much you write and read and the size of the file system you have. If the file system is small, your throughput gets limited. More specifically, while using this solution only for checkpointing our streaming data (which varies between a couple of KBs to a couple of MBs), we were limited to roughly 50 KiB/s of writes, which is definitely not enough and can cause your processing delay to increase substantially.

The (rather pricey) workaround for this issue is to make the file system large enough so that you constantly get 50 MiB/s of write throughput. To get this kind of throughput, your file system needs to be at least 1TB in size. This can be an arbitrary "junk" file just sitting around in the directory to increase its size. Price wise, this will cost you $300 a month out of the box, without the additional price for the data you actually checkpoint. If this kind of price is a non-issue for you, then EFS can still be a nice solution.

Here is the size to throughput table AWS guarantees:

File System Size                       | Aggregate Read/Write Throughput
A 100 GiB file system can...           | Burst to 100 MiB/s for up to 72 minutes each day, or drive up to 5 MiB/s continuously
A 1 TiB file system can...             | Burst to 100 MiB/s for 12 hours each day, or drive 50 MiB/s continuously
A 10 TiB file system can...            | Burst to 1 GiB/s for 12 hours each day, or drive 500 MiB/s continuously
Generally, a larger file system can... | Burst to 100 MiB/s per TiB of storage for 12 hours each day, or drive 50 MiB/s per TiB of storage continuously

I advise you to read through the performance section of the documentation before making a choice.

Wrapping up

Amazon Elastic File System is a relatively new but promising approach for Spark's checkpointing needs. It provides an elastic file system that can scale infinitely and be mounted to multiple EC2 instances easily and quickly. Moving away from S3 gave us a stable file system to checkpoint our data to, removing the sporadic failures caused by S3's eventual consistency model with respect to read-after-write.

I definitely recommend giving it a try.

Why You Might Be Misusing Spark's Streaming API

Disclaimer: Yes, I know the topic is a bit controversial, and I know most of this information is conveyed in Spark's documentation for its Streaming API, yet I felt the urge to write this piece after seeing this mistake happen many times over.

More often than not I see a question on StackOverflow from people who are new to Spark Streaming which looks roughly like this:

Question: “I’m trying to do XYZ but it’s not working, what can I do? Here is my code:”

val sparkContext = new SparkContext("local[2]", "MyApp")
val streamingContext = new StreamingContext(sparkContext, Seconds(4))

val dataStream = streamingContext.socketTextStream("127.0.0.1", 1337)
dataStream.foreachRDD { rdd => 
  // Process RDD Here
}

Uhm, ok, what’s wrong with that?

When I started learning Spark, my first landing point was an explanation of how RDDs (Resilient Distributed Datasets) work. The usual example was a word count where all the operations were performed on an RDD. I think it is safe to assume this is the entry point for many others who learn Spark (although today DataFrames/Datasets are becoming the go-to approach for beginners).

When one makes the leap to working with Spark Streaming, it may be a little unclear what the additional abstraction of a DStream means. This causes a lot of people to seek something they can grasp, and the most familiar method they encounter is foreachRDD, which takes an RDD as input and yields Unit (the result of a typical side effecting method). Then they can again work at the RDD level, which they already feel comfortable with and understand. That misses the point of DStreams entirely, which is why I want to give a brief look at what we can do on the DStream itself without peeking into the underlying RDD.

Enter DStream

DStream is Spark's abstraction over micro-batches. It uses streaming sources, be it a network socket, Kafka or Kinesis (and the like), to provide us with a continuous flow of data that we read at every batch interval assigned to the StreamingContext.

In order to work with the DStream API, we must understand how the abstraction works. DStream is basically a sequence of RDDs. At a given batch interval a single RDD is consumed and passed through all the transformations we supply to the DStream. When we do:

val dataStream = streamingContext.socketTextStream("127.0.0.1", 1337)
dataStream
 .flatMap(_.split(" "))
 .filter(_.nonEmpty)
 .map(word => (word, 1L))
 .count

That means we apply flatMap, filter, map and count to the underlying RDD itself as well! There are at least as many of these transformations on DStream as there are for RDD, and these are the transformations we should be working with in our streaming application. There is a comprehensive list of all the operations on the Spark Streaming documentation page under "Transformations on DStreams".

More operations on key value pairs

Similar to PairRDDFunctions, which brings in (implicitly) transformations on pairs inside an RDD, we have the equivalent PairDStreamFunctions with many such methods, primarily:

  • combineByKey - Combine elements of each key in DStream’s RDDs using custom functions.
  • groupByKey - Return a new DStream by applying groupByKey on each RDD
  • mapValues - Return a new DStream by applying a map function to the value of each key-value pairs in ‘this’ DStream without changing the key.
  • mapWithState - Return a MapWithStateDStream by applying a function to every key-value element of this stream, while maintaining some state data for each unique key.
  • reduceByKey - Return a new DStream by applying reduceByKey to each RDD. The values for each key are merged using the supplied reduce function. org.apache.spark.Partitioner is used to control the partitioning of each RDD.

And many more for you to enjoy and take advantage of.
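
To make this concrete, here is a small word count sketch (reusing the socket source from above; the app name and master are illustrative) that stays at the DStream level the whole way through, only touching an output operation at the end:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object DStreamWordCount {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("DStreamWordCount")
    val ssc = new StreamingContext(conf, Seconds(4))

    val counts = ssc.socketTextStream("127.0.0.1", 1337)
      .flatMap(_.split(" "))
      .filter(_.nonEmpty)
      .map(word => (word, 1L))
      .reduceByKey(_ + _) // PairDStreamFunctions kicks in implicitly here

    counts.print() // an output operation, so the graph gets materialized

    ssc.start()
    ssc.awaitTermination()
  }
}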

That's awesome! So why do I need foreachRDD at all?

Similar to RDDs, when Spark builds its graph of execution we distinguish between regular transformations and output operations. The former are lazily evaluated when building the graph, while the latter play a role in the materialization of the graph. If our DStream graph had only regular transformations applied to it, we would get an exception at runtime saying there is no output operation defined.

foreachRDD is useful when we’ve finished extracting and transforming our dataset, and we now want to load it to an external source. Let’s say I want to send transformed messages to RabbitMQ as part of my flow, I’ll iterate the underlying RDD partitions and send each message:

transformedDataStream.foreachRDD { rdd: RDD[String] =>
  rdd.foreachPartition { partition: Iterator[String] =>
    // Create the client on the executor, once per partition, so it doesn't
    // have to be serialized and shipped from the driver.
    val rabbitClient = new RabbitMQClient()
    partition.foreach(msg => rabbitClient.send(msg))
  }
}

transformedDataStream is an arbitrary DStream after we've performed all our transformation logic on it. The result of all these transformations is a DStream[String]. Inside foreachRDD, we get a single RDD[String]; we then iterate each of its partitions, creating a RabbitMQClient to send each message inside the partition iterator.

There are several more of these output operations listed on the Spark Streaming documentation page which are very useful.

Wrapping up

Spark Streaming's DStream abstraction provides powerful transformations for processing data in a streaming fashion. When we do stream processing in Spark, we're processing many individual micro-batched RDDs which we can reason about in our system, flowing one after the other. When we apply transformations to the DStream, they percolate all the way down to each RDD that is passed through, without us needing to apply them on each RDD ourselves. Finally, the use of foreachRDD should be kept for when we want to take our transformed data and perform some side effecting operation on it, mostly things like sending data over the wire to a database, pub-sub and the like. Use it wisely and only when you truly need to!

Diving Into Spark 2.1.0 Blacklisting Feature

Disclaimer: What I am about to talk about is an experimental Spark feature. We are about to dive into an implementation detail, which is subject to change at any point in time. It is an advantage if you have prior knowledge of how Spark's scheduling works, although if you don't, I will try to lay that out throughout this post, so don't be afraid :).

Introduction

Spark 2.1.0 comes with a new feature called "blacklisting". Blacklisting enables you to set thresholds on the number of failed tasks on each executor and node, so that problematic executors and nodes get blacklisted for a task set or even an entire stage.

The basics - Jobs, stages and tasks

When we create a Spark graph, the DAGScheduler takes our logical plan (or RDD lineage), which is composed of transformations, and translates it into a physical plan. For example, let's take the classic word count:

val textFile = sc.parallelize(Seq("hello dear readers", "welcome to my blog", "hope you have a good time"))
val count = textFile.flatMap(line => line.split(" "))
                 .map(word => (word, 1))
                 .reduceByKey(_ + _)
                 .count

println(count)

We have three transformations operating on the text file: flatMap, map and reduceByKey (let's ignore parallelize for the moment). The first two are called "narrow transformations" and can be executed sequentially one after the other, as all the data is available locally for the given partition. reduceByKey is called a wide transformation because it requires a shuffle of the data to be able to reduce all elements with the same key together. (If you want a lengthier explanation of narrow vs wide transformations, see this blog post by Ricky Ho.)

Spark creates the following physical plan from this code:

Spark Physical Plan

As you can see, the first stage contains three tasks: parallelize, flatMap and map, and the second stage has one task, reduceByKey. Stages are bounded by wide transformations, which is why reduceByKey starts the second stage (stage 1), followed by count.

We have established that a job is:

  1. Composed of one or more stages.
  2. Each stage has a set of tasks, bounded by wide transformations.
  3. Each task is executed on a particular executor in the Spark cluster.

As an optimization, Spark takes several narrow transformations that can run sequentially and executes them together as a task set, saving us the need to send intermediate results back to the driver.

Creating a scenario

Let's imagine we've been assigned a task which requires us to fetch a decent amount of data over the network, do some transformations and then save the output of those transformations to the database. We design the code carefully, finally create a Spark job which does exactly what we want, and publish that job to the Spark cluster to begin processing. Spark builds the graph and dispatches the individual tasks to the available executors to do the processing. We see that everything works well and we deploy our job to production.

We've happily deployed our job and it is working great. After a few days of running, we start noticing a particular problem with one of the nodes in the cluster. We notice that every time that node has to execute the fetching of the data from our external service, the task fails with a network timeout, eventually causing the entire stage to fail. This job is mission critical to our company and we cannot afford to stop it and let everything wait. What do we do??

Enter Blacklisting

Spark 2.1.0 enables us to blacklist a problematic executor and even an entire node (which may contain one to N executors) from receiving a particular task, task set or whole stage. In our example, we saw that a faulty node was causing tasks to fail, and we want to do something about it. Let us see how this new feature can help.

If we take a look at the Spark Configuration section of the documentation, we see all the settings we can tune:

Flag                                            | Default | Description
spark.blacklist.enabled                         | false   | If set to "true", prevent Spark from scheduling tasks on executors that have been blacklisted due to too many task failures. The blacklisting algorithm can be further controlled by the other "spark.blacklist" configuration options.
spark.blacklist.task.maxTaskAttemptsPerExecutor | 1       | (Experimental) For a given task, how many times it can be retried on one executor before the executor is blacklisted for that task.
spark.blacklist.task.maxTaskAttemptsPerNode     | 2       | (Experimental) For a given task, how many times it can be retried on one node, before the entire node is blacklisted for that task.
spark.blacklist.stage.maxFailedTasksPerExecutor | 2       | (Experimental) How many different tasks must fail on one executor, within one stage, before the executor is blacklisted for that stage.
spark.blacklist.stage.maxFailedExecutorsPerNode | 2       | (Experimental) How many different executors are marked as blacklisted for a given stage, before the entire node is marked as failed for the stage.

We can set the number of attempts allowed for each task, both per executor and per node. More importantly, we can control how many times a particular task may fail on a single executor before that executor is blacklisted, and how many executors may fail on a given node before that node is completely blacklisted. Marking an executor or node as blacklisted means that the affected task set or stage may never run again on that particular executor/node for the entire lifetime of our job.
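
As a quick reference, this is roughly how you would turn the feature on when building your SparkConf (the threshold values here are just the defaults from the table above, spelled out explicitly):

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .setAppName("BlacklistingExample")
  .set("spark.blacklist.enabled", "true")
  .set("spark.blacklist.task.maxTaskAttemptsPerExecutor", "1")
  .set("spark.blacklist.task.maxTaskAttemptsPerNode", "2")
  .set("spark.blacklist.stage.maxFailedTasksPerExecutor", "2")
  .set("spark.blacklist.stage.maxFailedExecutorsPerNode", "2")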

The algorithm

Now that we understand the basic configuration of blacklisting, let us look at the code. In order to do that, we need a little background on how task scheduling works in Spark. This is an interesting and quite complex topic, so I will try to skim through without going into too much detail (if you are interested in more detail, you can find it in Jacek Laskowski's excellent "Mastering Spark" gitbook).

Spark's scheduling model is similar to that of Apache Mesos, where each executor offers its resources and the scheduling algorithm selects which node gets to run each job and when.

Let us explore how a single job gets scheduled. We'll explore the operations starting from the DAGScheduler and below; there are more operations invoked by the SparkContext, but they are less relevant here:

  1. Everything starts with the famous DAGScheduler which builds the DAG and emits stages.
  2. Each stage in turn is disassembled to tasks (or to be more accurate, a set of tasks). A TaskSetManager is created for each task set. This manager is going to be in charge of the set of tasks throughout the lifetime of its execution, including re-scheduling on failure or aborting in case something bad happens.
  3. After the TaskSetManager is created, the backend creates work offers for all the executors and calls the TaskScheduler with these offers.
  4. TaskScheduler receives the work offers of the executors and iterates its TaskSetManagers and attempts to schedule the task sets on each of the available executor resources (remember this, we’ll come back to this soon).
  5. After all work has been assigned to the executors, the TaskScheduler notifies the CoarseGrainedSchedulerBackend, which then serializes each task and sends them off to the executors.

If you want to follow this flow in the code base, it is a little complicated; the hierarchy looks roughly like this:

DAGScheduler.handleJobSubmitted -> DAGScheduler.submitStage -> DAGScheduler.submitMissingStages -> TaskScheduler.submitTasks -> CoarseGrainedSchedulerBackend.reviveOffers -> CoarseGrainedSchedulerBackend.makeOffers -> TaskScheduler.resourceOffers -> TaskScheduler.resourceOfferSingleTaskSet -> TaskSetManager.resourceOffer -> TaskScheduler.resourceOfferSingleTaskSet -> CoarseGrainedSchedulerBackend.launchTasks.

There is a lot of bookkeeping going on between these calls, but I just wanted to outline the high level orchestration of how tasks get scheduled; I hope you've managed to follow. If not, don't worry, just remember that when tasks are scheduled, their owner is an object named TaskSetManager which controls everything related to the set of tasks. This object has an important method called resourceOffer, which we are going to look into.

TaskSetManager, TaskSetBlacklist and the blacklist bookkeeping

A new class was introduced in 2.1.0 called TaskSetBlacklist. It is in charge of the bookkeeping of failed tasks. This class reads the configuration set by the user via the spark.blacklist.* flags, and is consumed internally by each TaskSetManager, which keeps track of additional information:

private val MAX_TASK_ATTEMPTS_PER_EXECUTOR = conf.get(config.MAX_TASK_ATTEMPTS_PER_EXECUTOR)
private val MAX_TASK_ATTEMPTS_PER_NODE = conf.get(config.MAX_TASK_ATTEMPTS_PER_NODE)
private val MAX_FAILURES_PER_EXEC_STAGE = conf.get(config.MAX_FAILURES_PER_EXEC_STAGE)
private val MAX_FAILED_EXEC_PER_NODE_STAGE = conf.get(config.MAX_FAILED_EXEC_PER_NODE_STAGE)

The mappings that keep track of failures look like this:

private val nodeToExecsWithFailures = new HashMap[String, HashSet[String]]()
private val nodeToBlacklistedTaskIndexes = new HashMap[String, HashSet[Int]]()
val execToFailures = new HashMap[String, ExecutorFailuresInTaskSet]()
private val blacklistedExecs = new HashSet[String]()
private val blacklistedNodes = new HashSet[String]()

As you can see, there are three maps and two hash sets:

  1. Node -> Executor Failures: Maps a node to all its executors that have failed a task.
  2. Node -> Blacklisted Task Indexes: Indexes of tasks that have been blacklisted on that particular node.
  3. Executor -> Task Set Failure Count: Maps an executor to all its failures within a single task set.
  4. Blacklisted executors.
  5. Blacklisted nodes.

TaskSetBlacklist exposes a bunch of utility methods for consumption by the TaskSetManager holding it. For example:

/**
* Return true if this executor is blacklisted for the given stage.  Completely ignores
* anything to do with the node the executor is on.  That
* is to keep this method as fast as possible in the inner-loop of the scheduler, where those
* filters will already have been applied.
*/
def isExecutorBlacklistedForTaskSet(executorId: String): Boolean = {
    blacklistedExecs.contains(executorId)
}

The really interesting method is the one in charge of updating task status upon failure, called updateBlacklistForFailedTask. This method is invoked by the TaskSetManager when the TaskScheduler signals a failed task:

private[scheduler] def updateBlacklistForFailedTask(
    host: String,
    exec: String,
    index: Int): Unit = {
  val execFailures = execToFailures.getOrElseUpdate(exec, new ExecutorFailuresInTaskSet(host))
  execFailures.updateWithFailure(index)

  // check if this task has also failed on other executors on the same host -- if its gone
  // over the limit, blacklist this task from the entire host.
  val execsWithFailuresOnNode = nodeToExecsWithFailures.getOrElseUpdate(host, new HashSet())
  execsWithFailuresOnNode += exec
  val failuresOnHost = execsWithFailuresOnNode.toIterator.flatMap { exec =>
    execToFailures.get(exec).map { failures =>
      // We count task attempts here, not the number of unique executors with failures.  This is
      // because jobs are aborted based on the number task attempts; if we counted unique
      // executors, it would be hard to config to ensure that you try another
      // node before hitting the max number of task failures.
      failures.getNumTaskFailures(index)
    }
  }.sum
  if (failuresOnHost >= MAX_TASK_ATTEMPTS_PER_NODE) {
    nodeToBlacklistedTaskIndexes.getOrElseUpdate(host, new HashSet()) += index
  }

  // Check if enough tasks have failed on the executor to blacklist it for the entire stage.
  if (execFailures.numUniqueTasksWithFailures >= MAX_FAILURES_PER_EXEC_STAGE) {
    if (blacklistedExecs.add(exec)) {
      logInfo(s"Blacklisting executor ${exec} for stage $stageId")
      // This executor has been pushed into the blacklist for this stage.  Let's check if it
      // pushes the whole node into the blacklist.
      val blacklistedExecutorsOnNode =
        execsWithFailuresOnNode.filter(blacklistedExecs.contains(_))
      if (blacklistedExecutorsOnNode.size >= MAX_FAILED_EXEC_PER_NODE_STAGE) {
        if (blacklistedNodes.add(host)) {
          logInfo(s"Blacklisting ${host} for stage $stageId")
        }
      }
    }
  }
}

Breaking down the execution flow:

  1. Update the count of failures for this executor for the given task id.
  2. Check if there were multiple failures of this task by other executors on the same node, if there were and we’ve exceeded the MAX_TASK_ATTEMPTS_PER_NODE the entire node is blacklisted for this particular task index.
  3. Check if this failure means we’ve exceeded the allowed number of failures for the entire stage on the given executor. If we have mark the entire stage as blacklisted for the executor.
  4. Check if this failure means we've exceeded the number of allowed executor failures for the node. If we have, blacklist the node for the entire stage.

The execution flow is pretty clear, we start from the smallest unit which is a single task, and end up checking the entire stage on the node which executed the task set.

We've now seen where the TaskSetManager updates its internal state regarding task execution, but when (and where) does it check if a particular task can be scheduled on a given executor/node? It does so exactly when the TaskScheduler asks it to assign a WorkerOffer to an executor, inside the resourceOffer method call:

/**
* Respond to an offer of a single executor from the scheduler by finding a task
*
* NOTE: this function is either called with a maxLocality which
* would be adjusted by delay scheduling algorithm or it will be with a special
* NO_PREF locality which will be not modified
*
* @param execId the executor Id of the offered resource
* @param host  the host Id of the offered resource
* @param maxLocality the maximum locality we want to schedule the tasks at
*/
@throws[TaskNotSerializableException]
def resourceOffer(
    execId: String,
    host: String,
    maxLocality: TaskLocality.TaskLocality)
: Option[TaskDescription] = {
  val offerBlacklisted = taskSetBlacklistHelperOpt.exists { blacklist =>
    blacklist.isNodeBlacklistedForTaskSet(host) ||
      blacklist.isExecutorBlacklistedForTaskSet(execId)
  }

  if (!isZombie && !offerBlacklisted) {
    val curTime = clock.getTimeMillis()

    var allowedLocality = maxLocality

    if (maxLocality != TaskLocality.NO_PREF) {
      allowedLocality = getAllowedLocalityLevel(curTime)
      if (allowedLocality > maxLocality) {
        // We're not allowed to search for farther-away tasks
        allowedLocality = maxLocality
      }
    }

    dequeueTask(execId, host, allowedLocality).map { case ((index, taskLocality, speculative)) =>
      // Do task queueing stuff.
    }
  } else {
    None
  }
}

taskSetBlacklistHelperOpt is an Option[TaskSetBlacklist] instance which is only set to Some[TaskSetBlacklist] if the flag was enabled in the configuration. Prior to assigning an offer to an executor, the TaskSetManager checks whether the host/executor is blacklisted for the task set; if it isn't, it returns an Option[TaskDescription], assigning the executor a task from this particular task set.

However, there needs to be an additional check. As we've seen, we don't only blacklist an entire task set, we also blacklist individual tasks from running on an executor/node. For that, there is an additional call inside a method called dequeueTask to isTaskBlacklistedOnExecOrNode, which checks if the task is blacklisted to run on the executor or node. If it is, Spark attempts to schedule it on the next executor.

What happens if all nodes are blacklisted?

Good question! Spark has an additional validation method inside TaskScheduler.resourceOffers which kicks in if none of the tasks have been scheduled to run. This can happen when all nodes are blacklisted:

if (!launchedAnyTask) {
  taskSet.abortIfCompletelyBlacklisted(hostToExecutors)
}

I won’t go into the implementation of the method, which you can find here. This method validates that all the tasks actually can’t run due to blacklisting, and if it finds out they can’t, it aborts the task set.

Wrapping up

Spark 2.1.0 brings a new ability to blacklist problematic executors and even entire nodes. This feature can be useful when one is experiencing problems with a particular executor, such as network problems. If you are having failures at the node level, such as a disk filling up and failing your tasks, it is possible to block entire nodes from receiving task sets and entire stages.

The main players in this blacklisting game are the TaskSetManager, responsible for a given task set, and its TaskSetBlacklist instance, which handles all the bookkeeping data on who failed where. Together they are consumed by the TaskScheduler, which is in charge of the actual scheduling of tasks.

I hope I've managed to explain the gory details of the implementation in an approachable way, and that you now have a basic understanding of what is going on under the covers.

MutableList And The Short Path To A StackOverflowError

When working with collections in Scala, or any other high level programming language, one does not always stop to think about the underlying implementation of the collection. We want to find the right tool for the right job, and we want to do it as fast as we can. The Scala collection library gives us a wealth of options, be they mutable or immutable, and it can sometimes become confusing which one we should choose.

An interesting case came about when a colleague of mine at work ran into a weird StackOverflowError when running a Spark job. We were seeing a long stack trace when trying to serialize with both the Kryo and Java serializers.

The program looked rather innocent. Here is a close reproduction:

import scala.collection.mutable

object Foo {
  sealed trait A
  case class B(i: Int, s: String) extends A
  case class C(i: Int, x: Long) extends A
  case class D(i: Int) extends A

  case class Result(list: mutable.MutableList[Int])

  def doStuff(a: Seq[A]): Result = {
    val list = new mutable.MutableList[Int]

    a.foreach {
      case B(i, _) => list += i
      case C(i, _) => list += i
      case D(i)    => list += i
    }

    Result(list)
  }
}

The code that was running was iterating over a Seq[A], parsing the elements and appending them to a MutableList[Int]. The doStuff method was part of a Spark DStream.map operation, which was consuming records from Kafka, parsing them and then handing them off for some more stateful computation, which looked like:

object SparkJob {
  def main(args: Array[String]): Unit = {
    val dStream = KafkaUtil.createDStream(...)
    dStream
     .mapPartitions(it => Iterator(Foo.doStuff(it.toSeq)))
     .mapWithState(spec)
     .foreachRDD(x => // Stuff)
  }
}

One important thing to note is that this issue suddenly started appearing in their QA environment. There wasn't much change to the code; all that was done was extending some of the classes that inherit from A, and more data was starting to come in from Kafka, but there weren't any major changes to the code base.

This was the exception we were seeing (this one for JavaSerializer):

Exception in thread "main" java.lang.StackOverflowError
	at java.io.ObjectStreamClass$WeakClassKey.<init>(ObjectStreamClass.java:2351)
	at java.io.ObjectStreamClass.lookup(ObjectStreamClass.java:326)
	at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1134)
	at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
	at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
	at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
	at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
	at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
	at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
	at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
	at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
	at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
	at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
	at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
	at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
	at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
	at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
	at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
	at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
	at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
	at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
	at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
	at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
	at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
	at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
	at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)

The class hierarchy was a bit beefier than my simplified example. It consisted of a few nested case classes, but none of them were hiding a recursive data structure. This was weird.

We started a divide and conquer approach, eliminating piece after piece of the code, trying to figure out which class was causing the trouble. Then, by mere chance, I looked at the MutableList and said: "Let's try running this with an ArrayBuffer instead, and see what happens". To our surprise, the StackOverflowError was gone and no longer reproducing.
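
For reference, the swap itself was tiny. A rough sketch of the ArrayBuffer version of doStuff looks like this (same A/B/C/D hierarchy as in the reproduction above):

import scala.collection.mutable.ArrayBuffer

case class Result(list: ArrayBuffer[Int])

def doStuff(a: Seq[A]): Result = {
  val buffer = new ArrayBuffer[Int]

  a.foreach {
    case B(i, _) => buffer += i
    case C(i, _) => buffer += i
    case D(i)    => buffer += i
  }

  Result(buffer)
}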

So what’s the deal with MutableList[A]?

After the long intro, let's get down to the gory details: what's up with MutableList? Well, if we peek under the hood and look at the implementation, MutableList[A] is a simple LinkedList[A] with first and last elements. It has both a method head of type A and a method tail of type MutableList[A] (similar to List[A]):

@SerialVersionUID(5938451523372603072L)
class MutableList[A]
extends AbstractSeq[A]
   with LinearSeq[A]
   with LinearSeqOptimized[A, MutableList[A]]
   with GenericTraversableTemplate[A, MutableList]
   with Builder[A, MutableList[A]]
   with Serializable
{
  override def companion: GenericCompanion[MutableList] = MutableList
  override protected[this] def newBuilder: Builder[A, MutableList[A]] = new MutableList[A]

  protected var first0: LinkedList[A] = new LinkedList[A]
  protected var last0: LinkedList[A] = first0
  protected var len: Int = 0

  /** Returns the first element in this list
   */
  override def head: A = if (nonEmpty) first0.head else throw new NoSuchElementException

  /** Returns the rest of this list
   */
  override def tail: MutableList[A] = {
    val tl = new MutableList[A]
    tailImpl(tl)
    tl
  }

  // Shortened for brevity
}

When we attempt to serialize a recursive data structure such as LinkedList[A], or even a deeply nested structure, be it with Kryo or Java serialization, we need to traverse it all the way down. Since LinkedList[A] holds an element of type A and a pointer to the next LinkedList[A], the serializer has to go deep. Each time it encounters a new instance of LinkedList[A] (the next pointer), it opens up a new stack frame and begins to iterate it to find the next element in line to be written. If we have many such elements, each causing a new frame to open, we eventually blow up.

I played around to see how many elements we can fit in a MutableList[Int] before it explodes. I ran this test on Scala 2.12.0 and Java 1.8.0_91 in an x64 process (which should have a 1MB stack AFAIK); it took exactly 1335 elements to make it blow up:

import java.io.{ByteArrayOutputStream, ObjectOutputStream}

object StackoverflowTest {
  def main(args: Array[String]): Unit = {
    val mutable = new collection.mutable.MutableList[Int]
    (1 to 1335).foreach(x => mutable += x)
    val ous = new ObjectOutputStream(new ByteArrayOutputStream())
    ous.writeObject(mutable)
    ous.close()
  }
}

But wait, isn’t List[A] in Scala also defined as a linked list?

How can it be that the equivalent code using a List[A] in Scala doesn't blow up?

import java.io.{ByteArrayOutputStream, ObjectOutputStream}

object ListSerializationTest {
  def main(args: Array[String]): Unit = {
    val list = List.range(0, 1500)
    val ous = new ObjectOutputStream(new ByteArrayOutputStream())
    ous.writeObject(list)
    ous.close()
  }
}

And well, it doesn't. Turns out that List[A] has some secret sauce!

writeObject and readObject:

Every object that uses Java serialization can provide a private writeObject and readObject pair which lays out exactly how to serialize and deserialize the object. Scala uses a custom class called SerializationProxy to provide an iterative version of serialization/deserialization for List[A]:

@SerialVersionUID(1L)
private class SerializationProxy[A](@transient private var orig: List[A]) extends Serializable {

  private def writeObject(out: ObjectOutputStream) {
    out.defaultWriteObject()
    var xs: List[A] = orig
    while (!xs.isEmpty) {
      out.writeObject(xs.head)
      xs = xs.tail
    }
    out.writeObject(ListSerializeEnd)
  }

  // Java serialization calls this before readResolve during de-serialization.
  // Read the whole list and store it in `orig`.
  private def readObject(in: ObjectInputStream) {
    in.defaultReadObject()
    val builder = List.newBuilder[A]
    while (true) in.readObject match {
      case ListSerializeEnd =>
        orig = builder.result()
        return
      case a =>
        builder += a.asInstanceOf[A]
    }
  }

  // Provide the result stored in `orig` for Java serialization
  private def readResolve(): AnyRef = orig
}

At runtime, the Java serializer reflects over the class, finds these methods and uses them to serialize or deserialize the object. This is exactly why List[A] works and we don't blow up with a StackOverflowError in our face.
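
You can use the same hook in your own classes. Here is a minimal, self-contained sketch (the Chain class is purely illustrative, not from the code discussed above) of a writeObject/readObject pair that serializes a list-backed structure iteratively instead of leaning on the default recursive field-by-field traversal:

import java.io.{ObjectInputStream, ObjectOutputStream}

class Chain(@transient private var elems: List[Int]) extends Serializable {
  def values: List[Int] = elems

  // Write the length followed by the elements, flat and iterative: no deep recursion.
  private def writeObject(out: ObjectOutputStream): Unit = {
    out.defaultWriteObject()
    out.writeInt(elems.length)
    elems.foreach(out.writeInt)
  }

  // Read the length and rebuild the list, again without recursion.
  private def readObject(in: ObjectInputStream): Unit = {
    in.defaultReadObject()
    val n = in.readInt()
    elems = List.fill(n)(in.readInt())
  }
}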

Conclusion

The first and most obvious thing: always consider the data structures you're using and make sure they're the right ones for the job! Although the language provides us with high level abstractions of collections, strive to know what's going on under the covers and make sure the collection you're using won't come back to bite you. If for performance reasons you're looking to use a mutable collection, think about encapsulating the use of ArrayBuffer or ListBuffer internally and exposing an immutable Array[A] or List[A] respectively that you create once you're done filling them up.
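
As a small illustration of that last point (the names and logic are stand-ins), the mutable builder stays private and callers only ever see the immutable result:

import scala.collection.mutable.ArrayBuffer

// Fill a mutable ArrayBuffer internally for speed, expose only an immutable List.
def squaresUpTo(n: Int): List[Int] = {
  val buffer = new ArrayBuffer[Int](n)
  var i = 1
  while (i <= n) {
    buffer += i * i
    i += 1
  }
  buffer.toList // callers never see the mutable buffer
}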

And generally, remember that MutableList[A] internally uses a LinkedList[A], that there isn't a custom writeObject and readObject pair implemented for them in the Scala collection library, and watch out if you need to be transferring one over the wire.

These kinds of bugs are hard to discover, and we're lucky that we found out about them early and not in production.