Apache Spark – implicits object (Implicit Conversions)

In the SparkSession class there is an object named implicits, which extends the SQLImplicits abstract class. So once we have a SparkSession instance available, we can import its implicits:

import org.apache.spark.sql.SparkSession
 
val spark: SparkSession = SparkSession.builder()
      .master("local[1]")
      .appName("learn")
      .getOrCreate()

import spark.implicits._
        

SQLImplicits – A collection of implicit methods for converting common Scala native objects or RDDs into DataFrames/Datasets through implicit Encoders. Two of these implicit methods are covered below:

  • localSeqToDatasetHolder – Creates a Dataset/DataFrame from a local Scala Seq.
  • rddToDatasetHolder – Creates a Dataset/DataFrame from an RDD.

localSeqToDatasetHolder

Converts a local sequence to a Dataset/DataFrame.

Method Signature:


implicit def localSeqToDatasetHolder[T](s: Seq[T])(implicit evidence$7: Encoder[T]): DatasetHolder[T]
        

Input: Seq[T], implicit Encoder[T]. Output: DatasetHolder[T].

DatasetHolder is created implicitly when the localSeqToDatasetHolder/rddToDatasetHolder implicit conversions are used. DatasetHolder has toDS and toDF methods that simply return a Dataset[T] or a DataFrame, depending on which method is called.

DatasetHolder – Case Class Implementation:


case class DatasetHolder[T] private[sql] (private val ds : org.apache.spark.sql.Dataset[T]) extends scala.AnyRef with scala.Product with scala.Serializable {
  def toDS() : org.apache.spark.sql.Dataset[T] = { ... }
  def toDF() : org.apache.spark.sql.DataFrame = { ... }
  def toDF(colNames : _root_.scala.Predef.String*) : org.apache.spark.sql.DataFrame = { ... }
}
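
For a quick illustration, here is a minimal sketch (assuming the spark session from the earlier snippet is in scope) that exercises all three DatasetHolder methods:


import spark.implicits._

val seq = Seq(("James", "16"), ("Michael", "26"))

val ds  = seq.toDS()               // Dataset[(String, String)]
val df  = seq.toDF()               // DataFrame with default column names _1 and _2
val df2 = seq.toDF("name", "age")  // DataFrame with the supplied column names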
        

Example Seq.toDF():

In this example we have a sequence of tuples, which we convert to a DataFrame using toDF. Implicitly it uses localSeqToDatasetHolder and an encoder.


import org.apache.spark.sql.SparkSession

object Learn {
  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession.builder()
      .master("local[1]")
      .appName("learn")
      .getOrCreate()

    // Local sequence of (name, age) tuples
    val localSeqData = Seq(("James","16"),("Michael","26"),("Robert","30"))

    // Import only the two implicits this example needs
    import spark.implicits.{newProductEncoder, localSeqToDatasetHolder}

    val df = localSeqData.toDF()  // Seq -> DatasetHolder.toDF() -> DataFrame
    df.printSchema()
    df.show()
  }
}
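
Since no column names were supplied to toDF(), Spark falls back to the tuple field names, so printSchema() prints:


root
 |-- _1: string (nullable = true)
 |-- _2: string (nullable = true)


Passing names instead, e.g. localSeqData.toDF("name", "age"), produces the columns name and age.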
        

rddToDatasetHolder

Converts an RDD to a Dataset/DataFrame.

Method Signature:


implicit def rddToDatasetHolder[T](rdd: RDD[T])(implicit evidence$6: Encoder[T]): DatasetHolder[T]
        

Example rdd.toDF():

In this example we have a sequence of tuples, which we convert to an RDD using parallelize and then to a DataFrame using toDF. Implicitly it uses rddToDatasetHolder and an encoder.


import org.apache.spark.sql.SparkSession

object Learn {
  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession.builder()
      .master("local[1]")
      .appName("learn")
      .getOrCreate()

    val localSeqData = Seq(("James","16"),("Michael","26"),("Robert","30"))

    // Distribute the local data as an RDD
    val rdd = spark.sparkContext.parallelize(localSeqData)

    import spark.implicits.{newProductEncoder, rddToDatasetHolder}

    val df = rdd.toDF()  // RDD -> DatasetHolder.toDF() -> DataFrame
    df.printSchema()
    df.show()
  }
}
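
To see what the implicit conversion is doing, the call above can also be written out by hand. This sketch (reusing spark and rdd from the example) is roughly what the compiler synthesizes:


import org.apache.spark.sql.Encoder

// Resolve the encoder explicitly instead of implicitly
val encoder: Encoder[(String, String)] = spark.implicits.newProductEncoder

// Apply the conversion by hand: RDD -> DatasetHolder -> DataFrame
val df = spark.implicits.rddToDatasetHolder(rdd)(encoder).toDF()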
        

newProductEncoder

In both examples we have used newProductEncoder from spark.implicits. Our local sequence data consists of tuples, and tuples are subclasses of scala.Product, hence this encoder works for this use case. There are many other encoders present in spark.implicits. Depending on the requirement we can import them individually, or we can import everything with import spark.implicits._
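
For example, here is a small sketch (assuming a SparkSession named spark is in scope) where the primitive encoders are picked instead, because the element type is not a Product:


import spark.implicits._

val intDs = Seq(1, 2, 3).toDS()            // uses newIntEncoder
val strDf = Seq("a", "b").toDF("letter")   // uses newStringEncoder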

The implementation of newProductEncoder is present in the trait LowPrioritySQLImplicits, which is extended by SQLImplicits.

Method Signature:


implicit def newProductEncoder [T <: Product](implicit evidence$8: runtime.universe.TypeTag[T]): Encoder[T]
        

It takes an input of type T, where T is bounded to be a subclass of scala.Product; tuples and case classes, for example, are subclasses of scala.Product. It returns an Encoder[T] implicitly, which is used in localSeqToDatasetHolder/rddToDatasetHolder.
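
For instance, case classes satisfy the same bound. A minimal sketch (Person is a hypothetical class here, defined at the top level rather than inside main so that a TypeTag can be materialized for it):


case class Person(name: String, age: Int)

// inside main, with spark in scope:
import spark.implicits._

// newProductEncoder[Person] resolves because Person extends scala.Product
val ds = Seq(Person("James", 16), Person("Michael", 26)).toDS()
ds.printSchema()   // columns name and age, derived from the case class fields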
