Why you should never join Data frames on a function (Apache Spark)
Apache Spark will always try to optimize your queries, unless, of course, you don't let it.
Case in point: joining on a function instead of on column values.
When you ask Spark to join on column values, it can partition the two data frames across executors based on those values.
In this example it might send the rows with age values 1-1000 from both the men and women data frames to one executor, the rows with values 1001-2000 to another, and so on; each executor then joins its chunk of the data in parallel.
However, if Spark cannot tell in advance what the join values are, because the join condition is an opaque function, it has to push a copy of the right data frame to every executor and evaluate the condition as a Cartesian product. Not fun.
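To make the cost difference concrete, here is a minimal, Spark-free Scala sketch of the two strategies (the names hashJoinMatches and nestedLoopMatches are mine, not Spark's): when the join key is known, rows can be bucketed by key and compared only within their bucket; when the key is hidden behind a function, every left row must be tested against every right row.

```scala
object JoinSketch {
  // Key-aware join: bucket the right side by key, probe one bucket per left row.
  def hashJoinMatches(left: Seq[Int], right: Seq[Int]): Long = {
    val buckets = right.groupBy(identity)
    left.map(k => buckets.getOrElse(k, Nil).size.toLong).sum
  }

  // Key-oblivious join: the predicate is a black box, so every pair is tested.
  def nestedLoopMatches(left: Seq[Int], right: Seq[Int],
                        eq: (Int, Int) => Boolean): Long =
    left.map(l => right.count(r => eq(l, r)).toLong).sum

  def main(args: Array[String]): Unit = {
    val left = Seq(1, 2, 3, 3)
    val right = Seq(3, 3, 4)
    println(hashJoinMatches(left, right))           // 4 matched pairs
    println(nestedLoopMatches(left, right, _ == _)) // the same 4 pairs ...
    // ... but the nested loop evaluated 4 * 3 = 12 predicate calls to find them.
  }
}
```

Both strategies return the same pairs; only the number of comparisons differs, and that gap grows with the product of the two sizes.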
Consider the following Scala code:
test("join on column vs join on function") {
  val men = createPersonDF("M", 1000000)
  val women = createPersonDF("F", 1000000)
  var startTs = System.currentTimeMillis()
  // this takes ~3 seconds on my computer
  val resultA = men.join(women, Seq("age")).collect()
  println(s"join col ${System.currentTimeMillis() - startTs}")
  startTs = System.currentTimeMillis()
  val eqUDF = udf((a: Long, b: Long) => fn.eq(a, b))
  // this seemingly similar join will take about 20 minutes!
  val resultB = men.join(women, eqUDF(men.col("age"), women.col("age"))).collect()
  println(s"join function ${System.currentTimeMillis() - startTs}")
}

def createPersonDF(gender: String, n: Int) = {
  sqlContext.createDataFrame(sc.parallelize(Range(1, n)
    .map(x => Person("X" + x, x, gender))))
}
}

case class Person(name: String, age: Long, sex: String)

object fn {
  def eq(a: Long, b: Long): Boolean = a == b
}
How bad is this?
On my dev computer (Mac Pro, i7, 16GB) the first join took about 3 seconds, while the second took about 20 minutes.
You cannot always join on column values; sometimes there is a complex relationship between the data frames that requires a different approach.
In those cases you should look for a solution specific to your data: if one data frame is small, you can use it as a lookup table; if both data frames have a similar structure, you may want to union them and then iterate over the resulting data frame to produce a similar effect.
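The lookup-table idea can be sketched in plain Scala (the names and the collect-into-a-map step here are illustrative; in Spark the analogous technique is broadcasting the small data frame so every executor gets its own copy of the lookup structure):

```scala
object LookupJoinSketch {
  case class Person(name: String, age: Long, sex: String)

  // Build the lookup map from the small side once, then probe it per row of
  // the big side -- roughly what a broadcast (map-side) join does, instead of
  // shuffling or replicating the big side.
  def joinViaLookup(big: Seq[Person], small: Seq[Person]): Seq[(Person, Person)] = {
    val byAge: Map[Long, Seq[Person]] = small.groupBy(_.age)
    for {
      b <- big
      s <- byAge.getOrElse(b.age, Nil)
    } yield (b, s)
  }
}
```

The one-time cost of building the map is paid once, after which each probe is a constant-time lookup rather than a scan of the other side.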