Why You Might Be Misusing Spark's Streaming API

Disclaimer: Yes, I know the topic is a bit controversial, and I know most of this information is conveyed in Spark's documentation for its Streaming API, yet I felt the urge to write this piece after seeing this mistake happen many times over.

More often than not I see a question on StackOverflow from people who are new to Spark Streaming that looks roughly like this:

Question: “I’m trying to do XYZ but it’s not working, what can I do? Here is my code:”

val sparkContext = new SparkContext(new SparkConf().setAppName("MyApp"))
val streamingContext = new StreamingContext(sparkContext, Seconds(4))

val dataStream = streamingContext.socketTextStream("127.0.0.1", 1337)
dataStream.foreachRDD { rdd => 
  // Process RDD Here
}

Uhm, ok, what’s wrong with that?

When I started learning Spark, my first landing point was an explanation of how RDDs (Resilient Distributed Datasets) work. The usual example was a word count where all the operations were performed on an RDD. I think it is safe to assume this is the entry point for many others who learn Spark (although today DataFrames/Datasets are becoming the go-to approach for beginners).

When one makes the leap to Spark Streaming, it may be a little unclear what the additional abstraction of a DStream means. This causes a lot of people to seek something they can grasp, and the most familiar method they encounter is foreachRDD, which takes an RDD as input and yields Unit (the result of a typical side-effecting method). They can then work again at the RDD level, which they already feel comfortable with and understand. That misses the point of DStreams entirely, which is why I want to give a brief look at what we can do on the DStream itself without peeking into the underlying RDD.

Enter DStream

DStream is Spark's abstraction over micro-batches. It consumes a streaming source, be that a network socket, Kafka or Kinesis (and the like), and provides us with a continuous flow of data that we read at every batch interval assigned to the StreamingContext.

In order to work with the DStream API, we must understand how the abstraction works. DStream is basically a sequence of RDDs. At a given batch interval a single RDD is consumed and passed through all the transformations we supply to the DStream. When we do:

val dataStream = streamingContext.socketTextStream("127.0.0.1", 1337)
dataStream
 .flatMap(_.split(" "))
 .filter(_.nonEmpty)
 .map(word => (word, 1L))
 .count

This means that flatMap, filter, map and count are applied to the underlying RDD of every micro-batch as well! Most of the familiar RDD transformations have counterparts on DStream, and these are the transformations we should be working with in our streaming application. There is a comprehensive list of all the operations on the Spark Streaming documentation page, under “Transformations on DStreams”.

More operations on key-value pairs

Similar to PairRDDFunctions, which (implicitly) brings in transformations on pairs inside an RDD, we have the equivalent PairDStreamFunctions with many such methods, primarily:

  • combineByKey - Combine elements of each key in the DStream’s RDDs using custom functions.
  • groupByKey - Return a new DStream by applying groupByKey to each RDD.
  • mapValues - Return a new DStream by applying a map function to the value of each key-value pair in ‘this’ DStream without changing the key.
  • mapWithState - Return a MapWithStateDStream by applying a function to every key-value element of this stream, while maintaining some state data for each unique key.
  • reduceByKey - Return a new DStream by applying reduceByKey to each RDD. The values for each key are merged using the supplied reduce function. org.apache.spark.Partitioner is used to control the partitioning of each RDD.

And many more for you to enjoy and take advantage of.
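
To make the idea concrete, here is a minimal sketch (building on the streamingContext and the host/port placeholders from the earlier snippet) of a word count expressed purely with DStream and PairDStreamFunctions transformations, without ever touching the underlying RDDs ourselves:

val words = streamingContext.socketTextStream("127.0.0.1", 1337)
  .flatMap(_.split(" "))
  .filter(_.nonEmpty)

// reduceByKey comes from PairDStreamFunctions and is applied to the
// micro-batch RDD produced at every batch interval.
val wordCounts = words
  .map(word => (word, 1L))
  .reduceByKey(_ + _)

// An output transformation is still needed to actually run this graph --
// more on that below. print() is the simplest one.
wordCounts.print()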

That's awesome! So why do I need foreachRDD at all?

Similar to RDDs, when Spark builds its graph of execution we distinguish between regular transformations and output transformations. The former are lazily evaluated when building the graph, while the latter trigger the materialization of the graph. If our DStream graph had only regular transformations applied to it, we would get an exception at runtime saying there is no output transformation defined.
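
A quick way to see this for yourself (a sketch, reusing the streamingContext from the first snippet): apply only lazy transformations and call start(), and Spark will refuse to run the graph.

// No output transformation is registered on this DStream.
val upperCased = streamingContext.socketTextStream("127.0.0.1", 1337).map(_.toUpperCase)

// Fails at startup with an error along the lines of
// "No output operations registered, so nothing to execute".
streamingContext.start()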

foreachRDD is useful when we've finished extracting and transforming our dataset and now want to load it into an external system. Let's say I want to send the transformed messages to RabbitMQ as part of my flow; I'll iterate the underlying RDD's partitions and send each message:

transformedDataStream.
  foreachRDD { rdd: RDD[String] =>
    rdd.foreachPartition { partition: Iterator[String] =>
      // Create the client on the executor, once per partition, so it doesn't
      // have to be serialized and shipped from the driver with the closure.
      val rabbitClient = new RabbitMQClient()
      partition.foreach(msg => rabbitClient.send(msg))
    }
  }

transformedDataStream is an arbitrary DStream after we've performed all our transformation logic on it. The result of all these transformations is a DStream[String]. Inside foreachRDD we get a single RDD[String], iterate each of its partitions, and create a RabbitMQClient per partition to send each message inside the partition iterator.

There are several more of these output transformations listed on the Spark Streaming documentation page which are very useful.

Wrapping up

Spark Streaming's DStream abstraction provides powerful transformations for processing data in a streaming fashion. When we do stream processing in Spark, we're processing many individual micro-batched RDDs, which we can reason about in our system flowing one after the other. When we apply transformations on the DStream, they percolate all the way down to each RDD that passes through, without us needing to apply the transformations on it ourselves. Finally, the use of foreachRDD should be kept to when we want to take our transformed data and perform some side-effecting operation on it, mostly things like sending data over the wire to a database, pub-sub and the like. Use it wisely and only when you truly need to!

Diving Into Spark 2.1.0 Blacklisting Feature

Disclaimer: What I am about to talk about is an experimental Spark feature. We are about to dive into an implementation detail, which is subject to change at any point in time. It is an advantage if you have prior knowledge of how Spark's scheduling works, although if you don't I will try to lay that out throughout this post, so don't be afraid :).

Introduction

Spark 2.1.0 comes with a new feature called “blacklisting”. Blacklisting enables you to set thresholds on the number of failed tasks on each executor and node, such that a problematic executor, or even an entire node, is blacklisted for a task, a task set or an entire stage.

The basics - Jobs, stages and tasks

When we create a Spark graph, the DAGScheduler takes our logical plan (or RDD lineage), which is composed of transformations, and translates it into a physical plan. For example, let's take the classic word count:

val textFile = sc.parallelize(Seq("hello dear readers", "welcome to my blog", "hope you have a good time"))
val count = textFile.flatMap(line => line.split(" "))
                 .map(word => (word, 1))
                 .reduceByKey(_ + _)
                 .count

println(count)

We have three transformations operating on the text file: flatMap, map and reduceByKey (let's ignore parallelize for the moment). The first two are called “narrow transformations” and can be executed sequentially one after the other, as all the data is available locally for the given partition. reduceByKey is called a wide transformation because it requires a shuffle of the data in order to reduce all elements with the same key together (if you want a lengthier explanation of narrow vs. wide transformations, see this blog post by Ricky Ho).

Spark creates the following physical plan from this code:

[Figure: the physical plan Spark generates for the word count]

As you can see, the first stage contains three operations, parallelize, flatMap and map, while the second stage contains reduceByKey. Stages are bounded by wide transformations, and that is why reduceByKey begins stage 1, followed by count.

We have established that:

  1. A job is composed of one or more stages.
  2. Each stage has a set of tasks, bounded by wide transformations.
  3. Each task is executed on a particular executor in the Spark cluster.

As an optimization, Spark pipelines several narrow transformations that can run sequentially and executes them together within a single set of tasks, sparing us the need to materialize intermediate results between them.

Creating a scenario

Lets imagine we’ve been assigned a task which requires us to fetch a decent amount of data over the network, do some transformations and then save the output of those transformations to the database. We design the code carefully and finally create a spark job, which does exactly what we want and publish that job to the Spark cluster to begin processing. Spark builds the graph and dispatches the individual tasks to the available executors to do the processing. We see that everything works well and we deploy our job to production.

We’ve happily deployed our job which is working great. After a few days running we start noticing a particular problem with one of the nodes in the cluster. We notice that every time that node has to execute the fetching of the data from our external service, the task fails with a network timeout, eventually causing the entire stage to fail. This job is mission critical to our company and we cannot afford to stop it and let everything wait. What do we do??

Enter Blacklisting

Spark 2.1.0 enables us to blacklist a problematic executor and even an entire node (which may contain one to N executors) from receiving a particular task, task set or whole stage. In our example, we saw that a faulty node was causing tasks to fail, and we want to do something about it. Let us see how this new feature can help.

If we take a look at the Spark Configuration section of the documentation, we see all the settings we can tune:

  • spark.blacklist.enabled (default: false) - If set to “true”, prevent Spark from scheduling tasks on executors that have been blacklisted due to too many task failures. The blacklisting algorithm can be further controlled by the other “spark.blacklist” configuration options.
  • spark.blacklist.task.maxTaskAttemptsPerExecutor (default: 1) - (Experimental) For a given task, how many times it can be retried on one executor before the executor is blacklisted for that task.
  • spark.blacklist.task.maxTaskAttemptsPerNode (default: 2) - (Experimental) For a given task, how many times it can be retried on one node, before the entire node is blacklisted for that task.
  • spark.blacklist.stage.maxFailedTasksPerExecutor (default: 2) - (Experimental) How many different tasks must fail on one executor, within one stage, before the executor is blacklisted for that stage.
  • spark.blacklist.stage.maxFailedExecutorsPerNode (default: 2) - (Experimental) How many different executors are marked as blacklisted for a given stage, before the entire node is marked as failed for the stage.

We can set how many attempts a given task is allowed on a single executor and on a single node, and, more importantly, how many distinct task failures on an executor, and how many blacklisted executors on a node, it takes before that executor or node is blacklisted for the entire stage. Blacklisting an executor or node means that the particular task, or the whole stage, will not be scheduled on it again while that task set is running.
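
For illustration, enabling the feature and tightening the limits on submit might look like this (the class and jar names are placeholders, and the values are examples rather than recommendations):

spark-submit \
--conf "spark.blacklist.enabled=true" \
--conf "spark.blacklist.task.maxTaskAttemptsPerExecutor=1" \
--conf "spark.blacklist.task.maxTaskAttemptsPerNode=2" \
--conf "spark.blacklist.stage.maxFailedTasksPerExecutor=2" \
--conf "spark.blacklist.stage.maxFailedExecutorsPerNode=2" \
--class "org.myorg.MyJob" "path/to/uberjar.jar"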

The algorithm

Now that we understand the basic configuration of blacklisting, let us look at the code. In order to do that, we need a little background on how task scheduling works in Spark. This is an interesting and quite complex topic, so I will try to skim through without going into too much detail (if you are interested, you can find much more in Jacek Laskowski's excellent “Mastering Spark” gitbook).

Spark's scheduling model is similar to that of Apache Mesos, where each executor offers its resources, and the scheduling algorithm selects which node gets to run each job and when.

Let us explore how a single job gets scheduled. We'll explore the operations from the DAGScheduler and below; there are more operations invoked by the SparkContext, but they are less relevant here:

  1. Everything starts with the famous DAGScheduler which builds the DAG and emits stages.
  2. Each stage in turn is disassembled into tasks (or, to be more accurate, a set of tasks). A TaskSetManager is created for each task set. This manager is going to be in charge of the set of tasks throughout the lifetime of its execution, including re-scheduling on failure or aborting in case something bad happens.
  3. After the TaskSetManager is created, the backend creates work offers for all the executors and calls the TaskScheduler with these offers.
  4. TaskScheduler receives the work offers of the executors and iterates its TaskSetManagers and attempts to schedule the task sets on each of the available executor resources (remember this, we’ll come back to this soon).
  5. After all work has been assigned to the executors, the TaskScheduler notifies the CoarseGrainedSchedulerBackend, which then serializes each task and sends them off to the executors.

If you want to follow this flow through the code base, it is a little convoluted; the call hierarchy looks roughly like this:

DAGScheduler.handleJobSubmitted -> DAGScheduler.submitStage -> DAGScheduler.submitMissingTasks -> TaskScheduler.submitTasks -> CoarseGrainedSchedulerBackend.reviveOffers -> CoarseGrainedSchedulerBackend.makeOffers -> TaskScheduler.resourceOffers -> TaskScheduler.resourceOfferSingleTaskSet -> TaskSetManager.resourceOffer -> CoarseGrainedSchedulerBackend.launchTasks.

There is a lot of bookkeeping going on between these calls, but I just wanted to outline the high level orchestration of how tasks get scheduled; I hope you've managed to follow. If not, don't worry, just remember that when tasks are scheduled, their owner is an object named TaskSetManager, which controls everything related to the set of tasks throughout its execution. This object has an important method called resourceOffer, which we are going to look into.

TaskSetManager, TaskSetBlacklist and the blacklist bookkeeping

A new class called TaskSetBlacklist was introduced in 2.1.0. It is in charge of the bookkeeping of failed tasks. This class reads the configuration set by the user via the spark.blacklist.* flags, and is consumed internally by each TaskSetManager while keeping track of additional information:

private val MAX_TASK_ATTEMPTS_PER_EXECUTOR = conf.get(config.MAX_TASK_ATTEMPTS_PER_EXECUTOR)
private val MAX_TASK_ATTEMPTS_PER_NODE = conf.get(config.MAX_TASK_ATTEMPTS_PER_NODE)
private val MAX_FAILURES_PER_EXEC_STAGE = conf.get(config.MAX_FAILURES_PER_EXEC_STAGE)
private val MAX_FAILED_EXEC_PER_NODE_STAGE = conf.get(config.MAX_FAILED_EXEC_PER_NODE_STAGE)

The mappings that keep track of failures look like this:

private val nodeToExecsWithFailures = new HashMap[String, HashSet[String]]()
private val nodeToBlacklistedTaskIndexes = new HashMap[String, HashSet[Int]]()
val execToFailures = new HashMap[String, ExecutorFailuresInTaskSet]()
private val blacklistedExecs = new HashSet[String]()
private val blacklistedNodes = new HashSet[String]()

As you can see, there are three maps and two hash sets:

  1. Node -> Executor Failures: Maps a node to all its executors that have failed a task.
  2. Node -> Blacklisted Task Indexes: Indexes of tasks that have been blacklisted on that particular node.
  3. Executor -> Task Set Failure Count: Maps an executor to all its failures within a single task set.
  4. Blacklisted executors.
  5. Blacklisted nodes.

TaskSetBlacklist exposes a bunch of utility methods for consumption by the TaskSetManager holding it. For example:

/**
* Return true if this executor is blacklisted for the given stage.  Completely ignores
* anything to do with the node the executor is on.  That
* is to keep this method as fast as possible in the inner-loop of the scheduler, where those
* filters will already have been applied.
*/
def isExecutorBlacklistedForTaskSet(executorId: String): Boolean = {
    blacklistedExecs.contains(executorId)
}

The really interesting method is the one in charge of updating task status upon failure, updateBlacklistForFailedTask. It is invoked by the TaskSetManager when the TaskScheduler signals a failed task:

private[scheduler] def updateBlacklistForFailedTask(
    host: String,
    exec: String,
    index: Int): Unit = {
  val execFailures = execToFailures.getOrElseUpdate(exec, new ExecutorFailuresInTaskSet(host))
  execFailures.updateWithFailure(index)

  // check if this task has also failed on other executors on the same host -- if its gone
  // over the limit, blacklist this task from the entire host.
  val execsWithFailuresOnNode = nodeToExecsWithFailures.getOrElseUpdate(host, new HashSet())
  execsWithFailuresOnNode += exec
  val failuresOnHost = execsWithFailuresOnNode.toIterator.flatMap { exec =>
    execToFailures.get(exec).map { failures =>
      // We count task attempts here, not the number of unique executors with failures.  This is
      // because jobs are aborted based on the number task attempts; if we counted unique
      // executors, it would be hard to config to ensure that you try another
      // node before hitting the max number of task failures.
      failures.getNumTaskFailures(index)
    }
  }.sum
  if (failuresOnHost >= MAX_TASK_ATTEMPTS_PER_NODE) {
    nodeToBlacklistedTaskIndexes.getOrElseUpdate(host, new HashSet()) += index
  }

  // Check if enough tasks have failed on the executor to blacklist it for the entire stage.
  if (execFailures.numUniqueTasksWithFailures >= MAX_FAILURES_PER_EXEC_STAGE) {
    if (blacklistedExecs.add(exec)) {
      logInfo(s"Blacklisting executor ${exec} for stage $stageId")
      // This executor has been pushed into the blacklist for this stage.  Let's check if it
      // pushes the whole node into the blacklist.
      val blacklistedExecutorsOnNode =
        execsWithFailuresOnNode.filter(blacklistedExecs.contains(_))
      if (blacklistedExecutorsOnNode.size >= MAX_FAILED_EXEC_PER_NODE_STAGE) {
        if (blacklistedNodes.add(host)) {
          logInfo(s"Blacklisting ${host} for stage $stageId")
        }
      }
    }
  }
}

Breaking down the execution flow:

  1. Update the failure count of this executor for the given task index.
  2. Check whether this task has also failed on other executors on the same node; if it has, and we've exceeded MAX_TASK_ATTEMPTS_PER_NODE, the entire node is blacklisted for this particular task index.
  3. Check whether this failure means we've exceeded the allowed number of failed tasks for the entire stage on the given executor. If we have, blacklist the executor for the entire stage.
  4. Check whether this failure means we've exceeded the number of allowed blacklisted executors for the node. If we have, blacklist the node for the entire stage.

The execution flow is pretty clear: we start from the smallest unit, a single task, and work our way up to checking the entire stage on the node which executed the task set.

We’ve now seen where the TaskSetManager updates internal status regarding the task execution, but when (and where) does it check if a particular task can be scheduled on a given executor/node? It does so exactly when the TaskScheduler asks it to assign a WorkOffer to an executor, inside the resourceOffer method call:

/**
* Respond to an offer of a single executor from the scheduler by finding a task
*
* NOTE: this function is either called with a maxLocality which
* would be adjusted by delay scheduling algorithm or it will be with a special
* NO_PREF locality which will be not modified
*
* @param execId the executor Id of the offered resource
* @param host  the host Id of the offered resource
* @param maxLocality the maximum locality we want to schedule the tasks at
*/
@throws[TaskNotSerializableException]
def resourceOffer(
    execId: String,
    host: String,
    maxLocality: TaskLocality.TaskLocality)
: Option[TaskDescription] = {
  val offerBlacklisted = taskSetBlacklistHelperOpt.exists { blacklist =>
    blacklist.isNodeBlacklistedForTaskSet(host) ||
    blacklist.isExecutorBlacklistedForTaskSet(execId)
  }

  if (!isZombie && !offerBlacklisted) {
    var allowedLocality = maxLocality

    if (maxLocality != TaskLocality.NO_PREF) {
      allowedLocality = getAllowedLocalityLevel(curTime)
      if (allowedLocality > maxLocality) {
        // We're not allowed to search for farther-away tasks
        allowedLocality = maxLocality
      }
    }

    dequeueTask(execId, host, allowedLocality).map { case ((index, taskLocality, speculative)) =>
      // Do task queueing stuff. (Shortened for brevity)
    }
  } else {
    None
  }
}

taskSetBlacklistHelperOpt is an Option[TaskSetBlacklist] instance which is only set to Some[TaskSetBlacklist] if the flag was enabled in the configuration. Prior to assigning an offer to an executor, the TaskSetManager checks whether the host/executor is blacklisted for the task set; if it isn't, it returns an Option[TaskDescription], assigning the executor a task from this particular task set.

However, there needs to be an additional check. As we've seen, we don't only blacklist an entire task set, we also blacklist individual tasks from running on an executor/node. For that, there is an additional call inside a method named dequeueTask to isTaskBlacklistedOnExecOrNode, which checks whether the task is blacklisted to run on the given executor or node. If it is, Spark attempts to schedule it on the next executor.

What happens if all nodes are blacklisted?

Good question! Spark has an additional validation method inside TaskScheduler.resourceOffers which kicks in if none of the tasks have been scheduled to run. This can happen when all nodes are blacklisted:

if (!launchedAnyTask) {
  taskSet.abortIfCompletelyBlacklisted(hostToExecutors)
}

I won’t go into the implementation of the method, which you can find here. This method validates that all the tasks actually can’t run due to blacklisting, and if it finds out they can’t, it aborts the task set.

Wrapping up

Spark 2.1.0 brings the new ability to blacklist problematic executors and even entire nodes. This feature can be useful when you are experiencing problems with a particular executor, such as network issues. If you are seeing failures at the node level, such as a disk filling up and failing your tasks, it is possible to block entire nodes from receiving task sets and entire stages.

The main players in this blacklisting game are the TaskSetManager, responsible for a given task set, and its TaskSetBlacklist instance, which handles all the bookkeeping on who failed where. Together they are consumed by the TaskScheduler, which is in charge of the actual scheduling of the tasks.

I hope I’ve managed to explain to gory details of the implementation in an approachable way and that you have a basic understanding of what is going on under the covers.

MutableList And The Short Path To A StackOverflowError

When working with collections in Scala, or any other high level programming language, one does not always stop to think about the underlying implementation of the collection. We want to find the right tool for the job, and we want to do it as fast as we can. The Scala collection library brings us a wealth of options, be they mutable or immutable, and it can sometimes be confusing which one to choose.

An interesting case came about when a colleague of mine at work ran into a weird StackOverflowError when running a Spark job. We were seeing a long stack trace when trying to serialize with both the Kryo and Java serializers.

The program looked rather innocent. Here is a close reproduction:

import scala.collection.mutable

object Foo {
  sealed trait A
  // The case classes need to extend A for the pattern match below to compile.
  case class B(i: Int, s: String) extends A
  case class C(i: Int, x: Long) extends A
  case class D(i: Int) extends A

  case class Result(list: mutable.MutableList[Int])

  def doStuff(a: Seq[A]): Result = {
    val mutable = new collection.mutable.MutableList[Int]

    a.foreach {
      case B(i, _) => mutable += i
      case C(i, _) => mutable += i
      case D(i) => mutable += i
    }

    Result(mutable)
  }
}

The code iterated over a Seq[A], parsed the elements and appended them to a MutableList[Int]. The doStuff method was part of a Spark DStream transformation which consumed records from Kafka, parsed them and then handed them off for some more stateful computation, which looked like:

object SparkJob {
  def main(args: Array[String]): Unit = {
    val dStream = KafkaUtil.createDStream(...)
    dStream
     .mapPartitions(it => Iterator(Foo.doStuff(it.toSeq)))
     .mapWithState(spec)
     .foreachRDD(x => // Stuff)
  }
}

One important thing to note is that this issue suddenly started appearing in their QA environment. There hadn't been much change to the code; all that was done was extending a few classes which inherit from A, and more data was starting to come in from Kafka, but there weren't any major changes to the code base.

This was the exception we were seeing (this one for JavaSerializer):

Exception in thread “main” java.lang.StackOverflowError
  at java.io.ObjectStreamClass$WeakClassKey.<init>(ObjectStreamClass.java:2351)
  at java.io.ObjectStreamClass.lookup(ObjectStreamClass.java:326)
  at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1134)
  at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
  at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
  at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
  at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
  at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
  at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
  at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
  at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
  at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
  at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
  at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
  at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
  at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
  at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
  at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
  at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
  at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
  at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
  at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)

The class hierarchy was a bit beefier than my simplified example. It consisted of a few nested case classes, but none of them was hiding a recursive data structure. This was weird.

We took a divide and conquer approach, eliminating the code piece by piece, trying to figure out which class was causing the trouble. Then, by mere chance, I looked at the MutableList and told him: “Let's try running this with an ArrayBuffer instead and see what happens”. To our surprise, the StackOverflowError was gone and no longer reproduced.

So what’s the deal with MutableList[A]?

After the long intro, let's get down to the gory details: what's up with MutableList? Well, if we peek under the hood and look at the implementation, MutableList[A] is a simple LinkedList[A] that keeps references to its first and last elements. It has both a method head of type A and a method tail of type MutableList[A] (similar to List[A]):

@SerialVersionUID(5938451523372603072L)
class MutableList[A]
extends AbstractSeq[A]
   with LinearSeq[A]
   with LinearSeqOptimized[A, MutableList[A]]
   with GenericTraversableTemplate[A, MutableList]
   with Builder[A, MutableList[A]]
   with Serializable
{
  override def companion: GenericCompanion[MutableList] = MutableList
  override protected[this] def newBuilder: Builder[A, MutableList[A]] = new MutableList[A]

  protected var first0: LinkedList[A] = new LinkedList[A]
  protected var last0: LinkedList[A] = first0
  protected var len: Int = 0

  /** Returns the first element in this list
   */
  override def head: A = if (nonEmpty) first0.head else throw new NoSuchElementException

  /** Returns the rest of this list
   */
  override def tail: MutableList[A] = {
    val tl = new MutableList[A]
    tailImpl(tl)
    tl
  }

  // Shortened for brevity
}

When we attempt to serialize a recursive data structure such as LinkedList[A], or even a deeply nested structure, be it with Kryo or Java serialization, we need to traverse it all the way down. Since each LinkedList[A] node holds an element of type A and a pointer to the next LinkedList[A], the serializer has to follow the whole chain. Each time it encounters the next LinkedList[A] node (the next pointer), it opens a new stack frame and begins iterating it to find the next element in line to be written. With enough elements, each one opening a new frame, we eventually blow up.

I played around to see how many elements we can fit in a MutableList[Int] before it explodes. Running this test on Scala 2.12.0 and Java 1.8.0_91 in an x64 process (which should have a 1MB default thread stack, AFAIK), it took exactly 1335 elements to make it blow up:

import java.io.{ByteArrayOutputStream, ObjectOutputStream}

object StackoverflowTest {
  def main(args: Array[String]): Unit = {
    val mutable = new collection.mutable.MutableList[Int]
    (1 to 1335).foreach(x => mutable += x)
    val ous = new ObjectOutputStream(new ByteArrayOutputStream())
    ous.writeObject(mutable)
    ous.close()
  }
}
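
For comparison, here is the same experiment with an ArrayBuffer (the object name here is mine). Since ArrayBuffer is backed by a flat array rather than a chain of nodes, serializing it doesn't recurse element by element, and the same element count passes without a hiccup:

import java.io.{ByteArrayOutputStream, ObjectOutputStream}
import scala.collection.mutable.ArrayBuffer

object ArrayBufferTest {
  def main(args: Array[String]): Unit = {
    // Same element count that blew up MutableList above.
    val buffer = new ArrayBuffer[Int]
    (1 to 1335).foreach(x => buffer += x)
    val ous = new ObjectOutputStream(new ByteArrayOutputStream())
    ous.writeObject(buffer)
    ous.close()
  }
}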

But wait, isn’t List[A] in Scala also defined as a linked list?

How can it be that the equivalent code using a List[A] in Scala doesn't blow up?

import java.io.{ByteArrayOutputStream, ObjectOutputStream}

object ListSerializationTest {
  def main(args: Array[String]): Unit = {
    val list = List.range(0, 1500)
    val ous = new ObjectOutputStream(new ByteArrayOutputStream())
    ous.writeObject(list)
    ous.close()
  }
}

And well, it doesn’t. Turns that List[A] has some secret sauce!

writeObject and readObject:

Every object that uses Java serialization can provide a private writeObject and readObject pair which lays out exactly how to serialize and deserialize the object. Scala uses a custom class called SerializationProxy to provide an iterative version of serialization/deserialization for List[A]:

@SerialVersionUID(1L)
private class SerializationProxy[A](@transient private var orig: List[A]) extends Serializable {

  private def writeObject(out: ObjectOutputStream) {
    out.defaultWriteObject()
    var xs: List[A] = orig
    while (!xs.isEmpty) {
      out.writeObject(xs.head)
      xs = xs.tail
    }
    out.writeObject(ListSerializeEnd)
  }

  // Java serialization calls this before readResolve during de-serialization.
  // Read the whole list and store it in `orig`.
  private def readObject(in: ObjectInputStream) {
    in.defaultReadObject()
    val builder = List.newBuilder[A]
    while (true) in.readObject match {
      case ListSerializeEnd =>
        orig = builder.result()
        return
      case a =>
        builder += a.asInstanceOf[A]
    }
  }

  // Provide the result stored in `orig` for Java serialization
  private def readResolve(): AnyRef = orig
}

At runtime, the Java serializer reflects over the class, finds these methods and uses them to serialize or deserialize the object. This is exactly why List[A] works and we don't get a StackOverflowError thrown in our face.

Conclusion

The first and most obvious takeaway: always consider the data structures you're using and make sure they're the right tool for the job. Although the language provides us with high level abstractions over collections, strive to know what's going on under the covers and make sure the collection you're using won't come back to bite you. If for performance reasons you're looking to use a mutable collection, consider encapsulating the use of ArrayBuffer or ListBuffer internally and exposing an immutable Array[A] or List[A] respectively, created once you're done filling them up.
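
As a rough sketch of that advice (the method name is made up): build with a ListBuffer internally and hand back only the immutable result.

import scala.collection.mutable.ListBuffer

// Mutate locally for speed, expose an immutable List to callers.
def collectValues(input: Seq[Int]): List[Int] = {
  val buffer = new ListBuffer[Int]
  input.foreach(i => buffer += i)
  buffer.toList
}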

And generally, remember that MutableList[A] is internally backed by a LinkedList[A] with no custom writeObject and readObject pair implemented for it in the Scala collection library, so watch out if you need to transfer one over the wire.

These kinds of bugs are hard to discover, and we're lucky we found out about this one early and not in production.

The Case Of The Immutable Map and Object Who Forgot To Override HashCode

Disclaimer: What we’re about to look at is an implementation detail valid for the current point in time, and is valid for Scala 2.10.x and 2.11.x (not sure about previous versions). This is subject to change at any given time, and you should definitely not rely these side effect when writing code.

Consider the following example. Given a class Foo:

class Foo(val value: Int) {
    override def equals(obj: scala.Any): Boolean = obj match {
      case other: Foo => value == other.value
      case _ => false
    }
}

What would you expect the following to print?

import scala.collection.mutable

val immutableMap = Map(new Foo(1) -> 1)
val mutableMap = mutable.Map(new Foo(1) -> 1)

immutableMap.getOrElse(new Foo(1), -1)
mutableMap.getOrElse(new Foo(1), -1)

If you’re thinking: “Well, he didn’t override Object.hashCode, so both immutable and mutable Maps aren’t going to find the value. This is should fallback to -1, you might be surprised:

scala> immutableMap.getOrElse(new Foo(1), -1)
res3: Int = 1

scala> mutableMap.getOrElse(new Foo(1), -1)
res4: Int = -1

Hmmm.. What?

How is immutable.Map retrieving 1? And why is mutable.Map returning the expected result (-1)? Maybe we're just lucky and both objects have the same hash code?

scala> val first = new Foo(1)
first: Foo = Foo@687b0ddc

scala> val second = new Foo(1)
second: Foo = Foo@186481d4

scala> first.hashCode == second.hashCode
res6: Boolean = false

scala> first.hashCode
res7: Int = 1752894940

scala> second.hashCode
res8: Int = 409240020

Doesn’t seem so.

This is precisely the point of this post. We’ll see how Scala has a special implementation for immutable.Map and what side effects that might have.

The ground rules for custom objects as Map keys

Anyone accustomed to the Map data structure knows that any object used as a key should obey the following rules:

  1. Override Object.equals - Equality, if not explicitly overridden, is reference equality. We want not only the same instance to be considered equal to itself, but also any two objects that follow our custom equality semantics, i.e. whose value fields are equal.
  2. Override Object.hashCode - Any two objects that are equal should yield the same hash code, but not vice versa (see the pigeonhole principle). This is extremely important for objects used as Map keys, since (most) implementations rely on the hash code of the key to determine where the value will be stored. That same hash code will be used later when one requests a lookup by a given key.
  3. Hash code should be generated from immutable fields - It is common to use the object's fields as part of the hash code computation. If our value field were mutable, one could mutate it at runtime, causing a different hash code to be generated, and a side effect of that would be that we could no longer retrieve the value from the Map.

But our custom object doesn’t exactly follow these rules. It does override equals, but not hashCode.

This is where things get interesting.

The secret sauce of immutable.Map

Scala has custom implementations of immutable.Map for up to 4 key-value pairs (Map1, …, Map4). These implementations don't rely on hashCode to find the entry in the Map; they simply store the key-value pairs as fields and do an equality check on the key:

class Map1[A, +B](key1: A, value1: B) extends AbstractMap[A, B] with Map[A, B] with Serializable {
  override def size = 1
  def get(key: A): Option[B] =
    if (key == key1) Some(value1) else None
  // Shortened for brevity
}

You can see that the key is directly compared to key1, and this is exactly why immutable.Map retrieves 1: our equals implementation is in order.

If we run a couple of REPL tests, one with 4 elements and one with 5, we see inconsistent results caused by this implementation detail:

scala> val upToFourMap = Map(new Foo(1) -> 1, new Foo(2) -> 2, new Foo(3) -> 3, new Foo(4) -> 4)

scala> upToFourMap.getOrElse(new Foo(1), -1)
res2: Int = 1

scala> val upToFiveMap = Map(new Foo(1) -> 1, new Foo(2) -> 2, new Foo(3) -> 3, new Foo(4) -> 4, new Foo(5) -> 5)

scala> upToFiveMap.getOrElse(new Foo(1), -1)
res1: Int = -1

Once we leave the custom realm of the optimized immutable.Map implementations, we get the results we expect.

Correcting our Foo implementation

To set the record straight, let's put our object in order and override Object.hashCode:

override def hashCode(): Int = value.hashCode()
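
Putting it together, the corrected Foo is simply the class from the beginning of the post with the missing override added:

class Foo(val value: Int) {
  override def equals(obj: scala.Any): Boolean = obj match {
    case other: Foo => value == other.value
    case _ => false
  }

  // Equal values now produce equal hash codes, as the Map contract requires.
  override def hashCode(): Int = value.hashCode()
}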

And now let’s re-run our tests:

scala> val upToFourMap = Map(new Foo(1) -> 1, new Foo(2) -> 2, new Foo(3) -> 3, new Foo(4) -> 4)

scala> upToFourMap.getOrElse(new Foo(1), -1)
res1: Int = 1

scala> val upToFiveMap = Map(new Foo(1) -> 1, new Foo(2) -> 2, new Foo(3) -> 3, new Foo(4) -> 4, new Foo(5) -> 5)

scala> upToFiveMap.getOrElse(new Foo(1), -1)
res3: Int = 1

We now see that once hashCode is in order, the results line up.

Summing up

In this post we looked at a corner case which we discovered through our faulty implementation, combined with the special immutable.Map implementations. When you use a custom object as a key, make sure to implement the prerequisites we've mentioned. This inconsistency, although caused by our flawed implementation, can be quite surprising.

Leveraging Spark Speculation To Identify And Re-Schedule Slow Running Tasks

Introduction

Spark's speculation engine is a tool able to detect slow running tasks (we'll soon see what that means) and re-schedule their execution. This can be especially helpful in jobs that require a strict batch-to-processing-time ratio. In this post, we'll go through the speculation algorithm and see an example of how one can put it to good use.

The speculation algorithm

The “spark.speculation” entry in the Spark Configuration page says:

If set to “true”, performs speculative execution of tasks. This means if one or more tasks are running slowly in a stage, they will be re-launched.

Spark's speculation lets us define what “a task running slowly in a stage” means via the following flags:

  • spark.speculation (default: false) - If set to “true”, performs speculative execution of tasks. This means if one or more tasks are running slowly in a stage, they will be re-launched.
  • spark.speculation.interval (default: 100ms) - How often Spark will check for tasks to speculate.
  • spark.speculation.multiplier (default: 1.5) - How many times slower a task is than the median to be considered for speculation.
  • spark.speculation.quantile (default: 0.75) - Percentage of tasks which must be complete before speculation is enabled for a particular stage.

You can add these flags to your spark-submit, passing them under --conf (pretty straightforward, as with other configuration values), e.g.:

spark-submit \
--conf "spark.speculation=true" \
--conf "spark.speculation.multiplier=5" \
--conf "spark.speculation.quantile=0.90" \
--class "org.asyncified.myClass" "path/to/uberjar.jar"

Each flag takes part in the general algorithm in the detection of a slow task. Let’s now break down the algorithm and see how it works.

For each stage in the current batch:

  1. Check whether the number of finished tasks in the stage is equal to or greater than speculation.quantile * numOfTasks; otherwise, don't speculate (i.e. if a stage has 20 tasks and the quantile is 0.75, at least 15 tasks need to finish for speculation to start).
  2. Scan all successfully executed tasks in the given stage and calculate the median task execution time.
  3. Calculate the threshold a running task has to exceed in order to be eligible for re-launching. This is defined by speculation.multiplier * median.
  4. Iterate all the currently running tasks in the stage. If there is a task whose running time is larger than the threshold, re-launch it.
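
As a quick sanity check of the arithmetic (the numbers here are made up): with 20 tasks, the default quantile of 0.75 and multiplier of 1.5, and a median successful task duration of 10 seconds, speculation only kicks in once 15 tasks have finished, and any still-running task that passes 15 seconds becomes a candidate for re-launch. A tiny sketch of that calculation (illustrative only; the real code is in the appendix):

val numTasks = 20
val quantile = 0.75
val multiplier = 1.5
val medianDurationMs = 10000L

val minFinishedForSpeculation = (quantile * numTasks).floor.toInt // 15
val thresholdMs = multiplier * medianDurationMs                   // 15000.0
println(s"speculate once $minFinishedForSpeculation tasks finished, for tasks running over $thresholdMs ms")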

That sounds great and all, but why would I want to use this?

Allow me to tell a short story: we run a solution based on Spark Streaming which needs to operate 24x7. In preparation for making this job production ready, I stress tested our streaming job to make sure we would be able to handle the foreseen production scale. Our service is part of a longer pipeline, and is responsible for consuming data from Kafka, doing some stateful processing and passing the data along to the next component in the pipeline. Our stateful transformations use Amazon S3 to store intermediate checkpoints in case of job failure.

Approximately 15-20 hours into the stress test run, I noticed a strange behavior. During the execution of an arbitrary batch, one task would block indefinitely on a socketRead0 call with the following stack trace:

java.net.SocketInputStream.socketRead0(Native Method)
java.net.SocketInputStream.socketRead(SocketInputStream.java:116)
java.net.SocketInputStream.read(SocketInputStream.java:170)
java.net.SocketInputStream.read(SocketInputStream.java:141)
sun.security.ssl.InputRecord.readFully(InputRecord.java:465)
sun.security.ssl.InputRecord.readV3Record(InputRecord.java:593)
sun.security.ssl.InputRecord.read(InputRecord.java:532)
sun.security.ssl.SSLSocketImpl.readRecord(SSLSocketImpl.java:973)
sun.security.ssl.SSLSocketImpl.performInitialHandshake(SSLSocketImpl.java:1375)
sun.security.ssl.SSLSocketImpl.startHandshake(SSLSocketImpl.java:1403)
sun.security.ssl.SSLSocketImpl.startHandshake(SSLSocketImpl.java:1387)
…..
org.apache.hadoop.fs.s3native.NativeS3FileSystem.getFileStatus(NativeS3FileSystem.java:472)
org.apache.hadoop.fs.FileSystem.exists(FileSystem.java:1424)
org.apache.spark.rdd.ReliableCheckpointRDD$.writePartitionToCheckpointFile(ReliableCheckpointRDD.scala:168)
org.apache.spark.rdd.ReliableCheckpointRDD$$anonfun$writeRDDToCheckpointDirectory$1.apply(ReliableCheckpointRDD.scala:136)
org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
org.apache.spark.scheduler.Task.run(Task.scala:89)
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
java.lang.Thread.run(Thread.java:745)

If I killed and re-ran the same task, it would run fine without hanging, so this wasn't an easily reproducible bug. (It turns out we were hitting a bug, more accurately HTTPCLIENT-1478, in HttpComponents.HttpClient, a library used by Spark and org.apache.hadoop.fs.s3native.NativeS3FileSystem to perform HTTP/API calls to S3.)

The problem with the hang was that Spark has to wait for every stage in a batch to complete before starting to process the next batch in the queue, so if a single task is stuck, the entire streaming job starts accumulating incoming batches, which can only be processed once the stuck batch completes.

And this task wasn't just slow, it was hanging indefinitely. We were in quite a pickle…

There were a few possible solutions to the problem:

  1. Put the fixed HttpComponents.HttpClient version on the Spark master/worker nodes' classpath - If we can get Spark to see the newer version of the library before it loads its own problematic version containing the bug (hoping API compatibility was kept between versions), this might work. Problem is, anyone who has had to go head to head with Spark and classloading prioritization knows this is definitely not a pleasant journey to get into.

  2. Upgrade to Spark 2.0.0 - Spark 2.0.0 comes with v4.5.4 of the library, which fixes the bug we were hitting. This was problematic as we were already running Spark in production, and we'd have to do a full regression, not to mention making sure everything was compatible between our currently running version (1.6.1) and 2.0.0.

  3. Turn on speculation - Turn on speculation and “teach it” to identify the rebellious task, kill it and re-schedule.

The choice came down to two options: the long run solution is of course upgrading to Spark 2.0.0, but at that point in time we already had a running streaming job in production and needed a workaround that worked immediately. So we chose to turn on speculation while planning the upgrade.

But wait… isn’t this a hack?

Yes, this is definitely a workaround to the actual problem. It doesn't actually solve HttpClient arbitrarily hanging every now and then. But when running in production you sometimes need to “buy time” until you can come up with a suitable solution, and speculation hit the nail on the head, allowing the streaming job to run continuously.

Speculation doesn’t magically make your job work faster. Re-scheduling actually has an overhead that will cause the particular batch to incur a scheduling and processing delay, so it needs to be used wisely and with caution.

How will I know a task was marked for re-scheduling due to speculation?

In the Spark UI, when drilling down into a specific stage, one can view all the running/completed tasks for that stage. If a task was re-scheduled, its name will be decorated with “(speculated)”, and you'll usually see a different locality for that task (usually, it will run with locality ANY).

Tuning speculation to fit your requirements

When turning on speculation, we tried hard to configure it to spot the task that was getting blocked due to the bug. Our bug would reproduce once every 15-20 hours and cause a task to block indefinitely, which is pretty easy to identify. We set the multiplier so it would identify tasks taking 8 times longer than the median, and kept the quantile as is (0.75). We didn't care that it would take time until the problematic task was identified and eligible for re-scheduling, as long as other tasks weren't mistakenly marked for re-scheduling for no reason.
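
Expressed in the submit style shown earlier (the multiplier of 8 is what we used; treat the rest as an illustration rather than a prescription):

spark-submit \
--conf "spark.speculation=true" \
--conf "spark.speculation.multiplier=8" \
--conf "spark.speculation.quantile=0.75" \
--class "org.asyncified.myClass" "path/to/uberjar.jar"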

Wrapping up

Spark speculation is a great tool to have at your disposal. It can identify out-of-the-ordinary slow running tasks and re-schedule them to execute on a different worker. As we've seen, this can be helpful in cases where a slow running task is caused by a bug. If you decide to turn it on, take the time to adjust it with the right configuration for your workload; it is a trial and error path until you feel comfortable with the settings. It is important to note that speculation isn't going to fix a slow job, and that's not its purpose at all. If your overall job performance is lacking, speculation is not the weapon of choice.

Appendix

For the brave, here’s the actual piece of code that does the calculation for speculation. It is defined inside TaskSetManager.checkSpeculatableTasks:

/**
   * Check for tasks to be speculated and return true if there are any. This is called periodically
   * by the TaskScheduler.
   *
   * TODO: To make this scale to large jobs, we need to maintain a list of running tasks, so that
   * we don't scan the whole task set. It might also help to make this sorted by launch time.
   */
  override def checkSpeculatableTasks(minTimeToSpeculation: Int): Boolean = {
    // Can't speculate if we only have one task, and no need to speculate if the task set is a
    // zombie.
    if (isZombie || numTasks == 1) {
      return false
    }
    var foundTasks = false
    val minFinishedForSpeculation = (SPECULATION_QUANTILE * numTasks).floor.toInt
    logDebug("Checking for speculative tasks: minFinished = " + minFinishedForSpeculation)
    if (tasksSuccessful >= minFinishedForSpeculation && tasksSuccessful > 0) {
      val time = clock.getTimeMillis()
      val durations = taskInfos.values.filter(_.successful).map(_.duration).toArray
      Arrays.sort(durations)
      val medianDuration = durations(min((0.5 * tasksSuccessful).round.toInt, durations.length - 1))
      val threshold = max(SPECULATION_MULTIPLIER * medianDuration, minTimeToSpeculation)
      // TODO: Threshold should also look at standard deviation of task durations and have a lower
      // bound based on that.
      logDebug("Task length threshold for speculation: " + threshold)
      for ((tid, info) <- taskInfos) {
        val index = info.index
        if (!successful(index) && copiesRunning(index) == 1 && info.timeRunning(time) > threshold &&
          !speculatableTasks.contains(index)) {
          logInfo(
            "Marking task %d in stage %s (on %s) as speculatable because it ran more than %.0f ms"
              .format(index, taskSet.id, info.host, threshold))
          speculatableTasks += index
          foundTasks = true
        }
      }
    }
    foundTasks
  }