Today I hit a code performance obstacle with Apache Spark. I had:
dataset: RDD[(String, Array[(Long, Double)])]
which contained my original dataset (with only 3 keys, i.e. 3 distinct time series of data), and the other RDD was:
transformations: RDD[(String, List[String])]
whose keys were the unique variable names I gave to each vector in my dataset, and whose List[String] values were sequences of transformations (example: "log", "lag by 5", "differentiate", etc.) to apply to those vectors. The same key could occur several times in the transformations RDD, since I needed to apply different sequences of transformations to the same time series.
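For concreteness, here is a rough sketch of what these two RDDs could contain (assuming an existing SparkContext named sc; the series names and values below are made up for illustration):

import org.apache.spark.rdd.RDD

// Hypothetical sample data mirroring the shapes described above
val dataset: RDD[(String, Array[(Long, Double)])] = sc.parallelize(Seq(
  ("gdp",       Array((1L, 100.0), (2L, 101.5), (3L, 103.2))),
  ("inflation", Array((1L, 2.1),   (2L, 2.3),   (3L, 1.9))),
  ("rates",     Array((1L, 0.5),   (2L, 0.75),  (3L, 1.0)))
))

// The same key may appear several times, once per sequence of transformations
val transformations: RDD[(String, List[String])] = sc.parallelize(Seq(
  ("gdp",       List("log", "differentiate")),
  ("gdp",       List("lag by 5")),
  ("inflation", List("log")),
  ("rates",     List("differentiate"))
))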
I made the mistake of initially doing:

dataset.join(transformations) // before doing .mapPartitions() to actually apply the transformations
The join took 1-2 minutes to complete even with small vectors of 1000 to 2000 data points. Needless to say, it was absurdly slow.
After some googling I discovered that joins are among the most expensive operations in Spark: they generally trigger a full shuffle of both RDDs across the cluster, even when one of them is tiny.
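(If you want to check this on your own RDDs, the lineage printed by toDebugString makes the shuffle stages visible:)

// Printing the lineage of the joined RDD exposes its shuffle dependencies
val joined = dataset.join(transformations)
println(joined.toDebugString)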
Instead, I followed a recommendation from one of the posts I stumbled upon and wrote my own hash join, since one of my datasets (the dataset RDD) has a very small number of keys. (And I know, from the business logic context, that this will always be the case.)
My code ended up looking like this (simplified a bit to strip out the business-specific complexity of the input and resulting RDDs in my actual code):
val joinedSet = transformations.map(t => (t._1, (t._2, dataset.lookup(t._1))))
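A caveat for anyone copying this pattern: Spark does not support calling an RDD action like lookup from inside another RDD's transformation, so on a real cluster the same manual hash join has to be expressed slightly differently. A minimal sketch, assuming dataset is small enough to collect and a SparkContext named sc:

// Collect the tiny RDD (only 3 keys here) into a local map and broadcast it to all executors
val datasetMap = sc.broadcast(dataset.collectAsMap())

// Probe the broadcast map from the large RDD: no shuffle involved
// (assumes every transformation key exists in dataset; use .get for a left-join-like behavior)
val joinedSet = transformations.map { case (key, transfos) =>
  (key, (transfos, datasetMap.value(key)))
}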
The hash join is a classic algorithm to handle the case where one table (or data structure) has a much smaller set of keys/rows than the other table to be joined against.
Essentially, you first build a hash table from the smaller table, indexed by the join key, and then iterate over each row of the larger table, looking up (hash lookup) the matching rows in the smaller one at each step.
In a functional language this takes 1 line of code, but in a more imperative one it would look a bit like:

outputList = new List()
for each row in largeTable:
    outputList.add(combine(row, smallTableHash.lookup(row.key)))
(note that the outputList is a list and not a dictionary, to reflect the fact that key-value pair RDDs are not necessarily unique by key)
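For the curious, a self-contained Scala version of that loop over plain in-memory collections might look like this (a sketch assuming the small side fits in memory; hashJoin is just a name I picked):

// Build phase: index the small side by key; probe phase: stream over the large side.
// Returns one output pair per (large row, matching small row), so keys may repeat.
def hashJoin[K, V, W](large: Seq[(K, V)], small: Seq[(K, W)]): Seq[(K, (V, W))] = {
  val hashTable = small.groupBy(_._1)
  for {
    (k, v) <- large
    (_, w) <- hashTable.getOrElse(k, Seq.empty)
  } yield (k, (v, w))
}

Called with the larger table first, e.g. hashJoin(largeRows, smallRows), it produces the same multiset of matches a relational inner join would.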
In any case, with this new line of code in place of the join, the same computation ran in anywhere from 100 milliseconds to 2 seconds.