Apache Spark – implicits object (Implicit Conversions)
The SparkSession class defines an object named implicits, which extends the SQLImplicits abstract class. Once a SparkSession instance is available, we can import the implicits:
import org.apache.spark.sql.SparkSession

val spark: SparkSession = SparkSession.builder()
  .master("local[1]")
  .appName("learn")
  .getOrCreate()

import spark.implicits._
SQLImplicits – a collection of implicit methods for converting common Scala native objects or RDDs into DataFrames/Datasets through implicit Encoders. Some of these implicit methods are examined below:
localSeqToDatasetHolder
Converts a local sequence to a Dataset/DataFrame.
Method Signature:
implicit def localSeqToDatasetHolder[T](s: Seq[T])(implicit evidence$7: Encoder[T]): DatasetHolder[T]
Input: Seq[T], implicit Encoder[T]. Output: DatasetHolder[T]
A DatasetHolder is created implicitly when the localSeqToDatasetHolder or rddToDatasetHolder conversion is applied. DatasetHolder has toDS and toDF methods that simply return a Dataset[T] or DataFrame, depending on which method is called.
DatasetHolder – Case Class Implementation:
case class DatasetHolder[T] private[sql] (private val ds: org.apache.spark.sql.Dataset[T]) extends scala.AnyRef with scala.Product with scala.Serializable {
  def toDS(): org.apache.spark.sql.Dataset[T] = { ... }
  def toDF(): org.apache.spark.sql.DataFrame = { ... }
  def toDF(colNames: _root_.scala.Predef.String*): org.apache.spark.sql.DataFrame = { ... }
}
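The varargs toDF(colNames*) overload lets us assign column names instead of the default _1, _2. A minimal sketch (the names "name" and "age" are illustrative, not from the original example):

```scala
import org.apache.spark.sql.SparkSession

object LearnColNames {
  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession.builder()
      .master("local[1]")
      .appName("learn")
      .getOrCreate()
    import spark.implicits._

    // toDF("name", "age") names the columns instead of the default _1, _2
    val df = Seq(("James", "16"), ("Michael", "26")).toDF("name", "age")
    df.printSchema()
    df.show()
  }
}
```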
Example Seq.toDF():
In this example we have a sequence of tuples, which we convert to a DataFrame using toDF. Implicitly, this uses localSeqToDatasetHolder and an encoder.
import org.apache.spark.sql.SparkSession

object Learn {
  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession.builder()
      .master("local[1]")
      .appName("learn")
      .getOrCreate()

    val localSeqData = Seq(("James", "16"), ("Michael", "26"), ("Robert", "30"))

    import spark.implicits.{newProductEncoder, localSeqToDatasetHolder}
    val df = localSeqData.toDF()
    df.printSchema()
    df.show()
  }
}
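The same DatasetHolder also exposes toDS. A minimal sketch of calling it on the sequence above, which yields a typed Dataset[(String, String)] instead of an untyped DataFrame:

```scala
import org.apache.spark.sql.SparkSession

object LearnToDS {
  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession.builder()
      .master("local[1]")
      .appName("learn")
      .getOrCreate()
    import spark.implicits._

    // toDS returns a typed Dataset[(String, String)] rather than a DataFrame
    val ds = Seq(("James", "16"), ("Michael", "26"), ("Robert", "30")).toDS()
    ds.show()
  }
}
```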
rddToDatasetHolder
Converts an RDD to a Dataset/DataFrame.
Method Signature:
implicit def rddToDatasetHolder[T](rdd: RDD[T])(implicit evidence$6: Encoder[T]): DatasetHolder[T]
Example rdd.toDF():
In this example we have a sequence of tuples, which we first convert to an RDD using parallelize and then to a DataFrame using toDF. Implicitly, this uses rddToDatasetHolder and an encoder.
import org.apache.spark.sql.SparkSession

object Learn {
  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession.builder()
      .master("local[1]")
      .appName("learn")
      .getOrCreate()

    val localSeqData = Seq(("James", "16"), ("Michael", "26"), ("Robert", "30"))
    val rdd = spark.sparkContext.parallelize(localSeqData)

    import spark.implicits.{newProductEncoder, rddToDatasetHolder}
    val df = rdd.toDF()
    df.printSchema()
    df.show()
  }
}
newProductEncoder
Both examples use newProductEncoder from spark.implicits. Our local sequence data contains tuples, and tuples are subclasses of scala.Product, so this encoder works for this use case. Many other encoders are present in spark.implicits; depending on the requirement we can import them individually, or import everything with import spark.implicits._
The implementation of newProductEncoder lives in the trait LowPrioritySQLImplicits, which SQLImplicits extends.
Method Signature:
implicit def newProductEncoder [T <: Product](implicit evidence$8: runtime.universe.TypeTag[T]): Encoder[T]
It takes a type T that is bounded to be a subclass of scala.Product; both tuples and case classes are subclasses of scala.Product. It returns an Encoder[T] implicitly, which is then used by localSeqToDatasetHolder/rddToDatasetHolder.
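Since case classes are also subclasses of scala.Product, the same encoder covers them. A minimal sketch (the Person case class is illustrative; it is defined at the top level so a TypeTag is available for encoder derivation):

```scala
import org.apache.spark.sql.SparkSession

// Person is a scala.Product, so newProductEncoder can derive Encoder[Person]
case class Person(name: String, age: String)

object LearnCaseClass {
  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession.builder()
      .master("local[1]")
      .appName("learn")
      .getOrCreate()
    import spark.implicits._

    // The derived encoder maps case class fields to named columns
    val ds = Seq(Person("James", "16"), Person("Michael", "26")).toDS()
    ds.printSchema()
    ds.show()
  }
}
```

With a case class, the resulting schema uses the field names (name, age) rather than the tuple defaults _1, _2.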