Spark SQL Window functions using plain SQL.

Spark provides several window functions:

Ranking functions: row_number(), rank(), dense_rank(), percent_rank(), ntile()

Analytical functions: cume_dist(), lag(), lead()

Aggregate functions: sum(), first(), last(), max(), min(), mean(), stddev(), etc.

In this post, we are going to explore some of the window functions in Spark SQL. These are the analytical functions we are going to discuss:

  • ROW_NUMBER
  • RANK
  • DENSE_RANK
  • CUME_DIST
  • PERCENT_RANK
  • NTILE

This is the test data to which we will apply the window analytical functions:

ID,FIRST_NAME,LAST_NAME,DESIGNATION,DEPARTMENT,SALARY
1001,Ram,Ghadiyaram,Director of Sales,Sales,30000
1002,Ravi,Rangasamy,Marketing Manager,Sales,25000
1003,Ramesh, Rangasamy,Assistant Manager,Sales,25000
1004,Prem,Sure,Account Coordinator,Account,15000
1005,Phani ,G,Accountant II,Account,20000
1006,Krishna,G,Account Coordinator,Account,15000
1007,Rakesh,Krishnamurthy,Assistant Manager,Sales,25000
1008,Gally,Johnson,Manager,Account,28000
1009,Richard,Grill,Account Coordinator,Account,12000
1010,Sofia,Ketty,Sales Coordinator,Sales,20000

ROW_NUMBER :

This is an analytical function that assigns each row within a partition a unique, sequential value (1, 2, 3, …) based on the ordering specified in the OVER clause.

Query :

SELECT department, salary
, ROW_NUMBER() OVER (PARTITION BY department ORDER BY salary DESC) AS row_num 
FROM emp_dept_tbl

RANK :

The RANK analytical function is used to assign a rank to the rows based on the column values in the OVER clause. Rows with equal values are assigned the same rank, and the subsequent rank values are skipped.

Query :

 SELECT department, salary
, RANK() OVER(PARTITION BY department ORDER BY salary DESC) AS rnk 
FROM emp_dept_tbl

See the result in the code listing and output at the bottom.

Here, all rows with salary 15000 in the Account department and 25000 in the Sales department share the same rank (3 and 2 respectively), and the row with the next salary gets rank 5, the intermediate rank values having been skipped.

DENSE_RANK

The DENSE_RANK analytical function in Spark SQL/Hive is used to assign a rank to each row. Rows with equal values receive the same rank, and ranks are assigned in sequential order so that no rank values are skipped.

Here, rank values are not skipped as they are with the RANK function.
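
The difference between the two can be sketched in plain Python (this is an illustration of the ranking logic, not Spark code) using the Sales salaries from the test data:

```python
# Sales salaries from the test data, already sorted descending
salaries = [30000, 25000, 25000, 25000, 20000]

def rank(values):
    # RANK: ties share a rank; the next distinct value skips ahead,
    # so the rank is 1 + the number of strictly greater values.
    return [1 + sum(1 for x in values if x > v) for v in values]

def dense_rank(values):
    # DENSE_RANK: ties share a rank and no rank values are skipped,
    # so the rank is the position among the distinct values.
    distinct = sorted(set(values), reverse=True)
    return [1 + distinct.index(v) for v in values]

print(rank(salaries))        # [1, 2, 2, 2, 5]
print(dense_rank(salaries))  # [1, 2, 2, 2, 3]
```

These match the rnk and dns_rnk columns for the Sales partition in the output below.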

Query :

SELECT department, salary
, DENSE_RANK() OVER(PARTITION BY department ORDER BY salary DESC) AS dns_rnk 
FROM emp_dept_tbl

CUME_DIST

This function stands for cumulative distribution. It computes the relative position of a column value within its group. Here, we calculate the cumulative distribution of salaries within each department. For a row, the cumulative distribution of salary is calculated as:

Cume_dist(salary) = number of rows preceding or tied with the current row in the ORDER BY / total number of rows in the partition. Since we order by salary descending, this counts the rows whose salary is greater than or equal to the current row's salary.
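
As a plain-Python sketch (not Spark code) of this formula applied to the Sales partition:

```python
# Sales salaries from the test data, ordered descending (as in the query)
salaries = [30000, 25000, 25000, 25000, 20000]

def cume_dist(values):
    # With ORDER BY salary DESC, a row's predecessors and peers are the
    # rows whose salary is greater than or equal to its own.
    n = len(values)
    return [sum(1 for x in values if x >= v) / n for v in values]

print(cume_dist(salaries))  # [0.2, 0.8, 0.8, 0.8, 1.0]
```

This matches the cum_dist column for Sales in the output below.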

Query :

SELECT department, salary
, CUME_DIST() OVER (PARTITION BY department  ORDER BY salary DESC) AS cum_dist 
FROM emp_dept_tbl

PERCENT_RANK

It is very similar to the CUME_DIST function: it expresses a row's rank as a percentage. The first row in any partition has percent_rank 0, and the return value is of type double.

Let’s rank the salary department wise as percentage:

Here, Spark first computes the rank of each row within its partition and then calculates the percent rank using the formula below:

Percent_Rank = (rank - 1) / (number of rows in the partition - 1)

Query :

SELECT department, salary
, RANK() OVER (PARTITION BY department ORDER BY salary DESC) AS rnk
, PERCENT_RANK() OVER (PARTITION BY department ORDER BY salary DESC) AS perc_rnk 
FROM emp_dept_tbl

See the result in the code listing and output at the bottom.

For example, take the salary 15000 from the Account department and 20000 from the Sales department. The ranks of these two rows are 3 and 5 respectively, and each partition has 5 rows, so the denominator is 5 - 1 = 4 and the percent ranks are calculated as:

(3 - 1)/4 = 0.5 and (5 - 1)/4 = 1.0

Note: We subtract 1 from the rank because the first row in each group starts from the value 0.
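
Putting the two steps together, a plain-Python sketch (not Spark code) of the formula on the Sales partition looks like this:

```python
# Sales salaries from the test data, ordered descending (as in the query)
salaries = [30000, 25000, 25000, 25000, 20000]

def percent_rank(values):
    # Same RANK definition as Spark: ties share a rank, gaps follow ties.
    n = len(values)
    ranks = [1 + sum(1 for x in values if x > v) for v in values]
    # PERCENT_RANK = (rank - 1) / (rows in partition - 1)
    return [(r - 1) / (n - 1) for r in ranks]

print(percent_rank(salaries))  # [0.0, 0.25, 0.25, 0.25, 1.0]
```

This matches the perc_rnk column for Sales in the output below.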

NTILE

It divides the rows of a partition into a specified number of ranked groups (buckets) as equally as possible and returns, for each row, the number of the bucket it falls into.

Query :

SELECT department, salary
, NTILE(4) OVER (PARTITION BY department ORDER BY salary DESC) AS ntile 
FROM emp_dept_tbl

See the result in the code listing and output at the bottom.

Here, we have divided the data set into 4 buckets.

Remainder = (total records in the partition) mod (number of buckets)

Remainder = 5 mod 4 = 1

So the one extra row (the remainder of 5 mod 4) is allocated to bucket 1, which therefore has one more row than the rest of the buckets (2, 3, and 4).
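
This bucket-sizing rule can be sketched in plain Python (an illustration of the allocation logic, not Spark code):

```python
def ntile(n, num_rows):
    # Each bucket gets floor(num_rows / n) rows; the first
    # (num_rows mod n) buckets each get one extra row.
    base, rem = divmod(num_rows, n)
    buckets = []
    for b in range(1, n + 1):
        size = base + (1 if b <= rem else 0)
        buckets.extend([b] * size)
    return buckets

print(ntile(4, 5))  # [1, 1, 2, 3, 4]
```

For 5 rows and 4 buckets, bucket 1 holds two rows and buckets 2, 3, and 4 hold one each, matching the ntile column in the output below.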

Complete code listing :

package com.examples

import org.apache.log4j.{Level, Logger}
import org.apache.spark.internal.Logging
import org.apache.spark.sql.{Dataset, SparkSession}

/**
  * @author : Ram Ghadiyaram
  */
object AnalyticalFunctionTest extends App with Logging {
  Logger.getLogger("org").setLevel(Level.WARN)


  val spark: SparkSession = SparkSession.builder()
    .master("local[*]")
    .appName(this.getClass.getName)
    .config("spark.sql.warehouse.dir", new java.io.File("spark-warehouse").getAbsolutePath)
    .getOrCreate()
  //spark.sparkContext.getConf.getAll.foreach(println)

  import spark.implicits._

  val csvData: Dataset[String] = spark.sparkContext.parallelize(
    """
      |ID,FIRST_NAME,LAST_NAME,DESIGNATION,DEPARTMENT,SALARY
      |1001,Ram,Ghadiyaram,Director of Sales,Sales,30000
      |1002,Ravi,Rangasamy,Marketing Manager,Sales,25000
      |1003,Ramesh, Rangasamy,Assistant Manager,Sales,25000
      |1004,Prem,Sure,Account Coordinator,Account,15000
      |1005,Phani ,G,Accountant II,Account,20000
      |1006,Krishna,G,Account Coordinator,Account,15000
      |1007,Rakesh,Krishnamurthy,Assistant Manager,Sales,25000
      |1008,Gally,Johnson,Manager,Account,28000
      |1009,Richard,Grill,Account Coordinator,Account,12000
      |1010,Sofia,Ketty,Sales Coordinator,Sales,20000
      |""".stripMargin.lines.toList).toDS()
  val frame = spark.read.option("header", true).option("inferSchema", true).csv(csvData)
  frame.show()
  frame.printSchema()
  frame.createOrReplaceTempView("emp_dept_tbl")
  println("1) Row number :    ")
  spark.sql(
    """
      |SELECT department, salary, ROW_NUMBER() OVER (PARTITION BY department ORDER BY salary DESC)
      |AS row_num FROM emp_dept_tbl
      |""".stripMargin).show
  println("2) Rank  :The RANK analytics function is used to assign a rank to the rows based on the column values in OVER clause. The row with equal values assigned the same rank with next rank value skipped.    ")
  spark.sql(
    """
      |SELECT department, salary, RANK() OVER(PARTITION BY department ORDER BY salary DESC) AS rnk FROM emp_dept_tbl
      |""".stripMargin
  ).show
  println("3) DENSE_RANK  :   The DENSE_RANK analytics function in spark-sql/hive used to assign a rank to each row. The rows with equal values receive the same rank and this rank assigned in the sequential order so that no rank values are skipped. ")
  spark.sql(
    """
      |SELECT department, salary, DENSE_RANK() OVER(PARTITION BY department ORDER BY salary DESC) AS dns_rnk FROM emp_dept_tbl
      |""".stripMargin
  ).show
  println("All 3 Row number, Rank, and Dense Rank together ")
  spark.sql(
    """
      |SELECT department AS dept, salary AS sal,
      |ROW_NUMBER() OVER (PARTITION BY department ORDER BY salary DESC) AS row_num,
      |RANK() OVER (PARTITION BY department ORDER BY salary DESC) AS rnk,
      |DENSE_RANK() OVER(PARTITION BY department ORDER BY salary DESC) AS dns_rnk
      |FROM emp_dept_tbl
      |""".stripMargin
  ).show
  println("CUME_DIST :This function stands for cumulative distribution. It computes the relative position of a column value in a group. ")
  spark.sql(
    """
      |SELECT department, salary, CUME_DIST() OVER (PARTITION BY department ORDER BY salary DESC) AS cum_dist FROM emp_dept_tbl
      |""".stripMargin
  ).show
  println("PERCENT_RANK : It is very similar to the CUME_DIST function. It ranks the row as a percentage. The first row in any dataset has percent_rank 0 and the return value is of the double type.")
  spark.sql(
    """
      |SELECT department, salary, RANK() OVER (PARTITION BY department ORDER BY salary DESC) AS rnk, PERCENT_RANK() OVER (PARTITION BY department ORDER BY salary DESC) AS perc_rnk FROM emp_dept_tbl
      |""".stripMargin
  ).show
  println("NTILE : It divides the number of rows in a partition into a specific number of ranked groups (bucket) as equally as possible. It returns a bucket member associated with it.")
  spark.sql(
    """
      |SELECT department, salary, NTILE(4) OVER (PARTITION BY department ORDER BY salary DESC) AS ntile FROM emp_dept_tbl
      |""".stripMargin
  ).show
}

Result :

+----+----------+-------------+-------------------+----------+------+
|  ID|FIRST_NAME|    LAST_NAME|        DESIGNATION|DEPARTMENT|SALARY|
+----+----------+-------------+-------------------+----------+------+
|1001|       Ram|   Ghadiyaram|  Director of Sales|     Sales| 30000|
|1002|      Ravi|    Rangasamy|  Marketing Manager|     Sales| 25000|
|1003|    Ramesh|    Rangasamy|  Assistant Manager|     Sales| 25000|
|1004|      Prem|         Sure|Account Coordinator|   Account| 15000|
|1005|    Phani |            G|      Accountant II|   Account| 20000|
|1006|   Krishna|            G|Account Coordinator|   Account| 15000|
|1007|    Rakesh|Krishnamurthy|  Assistant Manager|     Sales| 25000|
|1008|     Gally|      Johnson|            Manager|   Account| 28000|
|1009|   Richard|        Grill|Account Coordinator|   Account| 12000|
|1010|     Sofia|        Ketty|  Sales Coordinator|     Sales| 20000|
+----+----------+-------------+-------------------+----------+------+


root
 |-- ID: integer (nullable = true)
 |-- FIRST_NAME: string (nullable = true)
 |-- LAST_NAME: string (nullable = true)
 |-- DESIGNATION: string (nullable = true)
 |-- DEPARTMENT: string (nullable = true)
 |-- SALARY: integer (nullable = true)


1) Row number :    
+----------+------+-------+
|department|salary|row_num|
+----------+------+-------+
|     Sales| 30000|      1|
|     Sales| 25000|      2|
|     Sales| 25000|      3|
|     Sales| 25000|      4|
|     Sales| 20000|      5|
|   Account| 28000|      1|
|   Account| 20000|      2|
|   Account| 15000|      3|
|   Account| 15000|      4|
|   Account| 12000|      5|
+----------+------+-------+


2) Rank  :The RANK analytics function is used to assign a rank to the rows based on the column values in OVER clause. The row with equal values assigned the same rank with next rank value skipped.    
+----------+------+---+
|department|salary|rnk|
+----------+------+---+
|     Sales| 30000|  1|
|     Sales| 25000|  2|
|     Sales| 25000|  2|
|     Sales| 25000|  2|
|     Sales| 20000|  5|
|   Account| 28000|  1|
|   Account| 20000|  2|
|   Account| 15000|  3|
|   Account| 15000|  3|
|   Account| 12000|  5|
+----------+------+---+


3) DENSE_RANK  :   The DENSE_RANK analytics function in spark-sql/hive used to assign a rank to each row. The rows with equal values receive the same rank and this rank assigned in the sequential order so that no rank values are skipped. 
+----------+------+-------+
|department|salary|dns_rnk|
+----------+------+-------+
|     Sales| 30000|      1|
|     Sales| 25000|      2|
|     Sales| 25000|      2|
|     Sales| 25000|      2|
|     Sales| 20000|      3|
|   Account| 28000|      1|
|   Account| 20000|      2|
|   Account| 15000|      3|
|   Account| 15000|      3|
|   Account| 12000|      4|
+----------+------+-------+


All 3 Row number, Rank, and Dense Rank together 
+-------+-----+-------+---+-------+
|   dept|  sal|row_num|rnk|dns_rnk|
+-------+-----+-------+---+-------+
|  Sales|30000|      1|  1|      1|
|  Sales|25000|      2|  2|      2|
|  Sales|25000|      3|  2|      2|
|  Sales|25000|      4|  2|      2|
|  Sales|20000|      5|  5|      3|
|Account|28000|      1|  1|      1|
|Account|20000|      2|  2|      2|
|Account|15000|      3|  3|      3|
|Account|15000|      4|  3|      3|
|Account|12000|      5|  5|      4|
+-------+-----+-------+---+-------+



CUME_DIST :This function stands for cumulative distribution. It computes the relative position of a column value in a group. 
+----------+------+--------+
|department|salary|cum_dist|
+----------+------+--------+
|     Sales| 30000|     0.2|
|     Sales| 25000|     0.8|
|     Sales| 25000|     0.8|
|     Sales| 25000|     0.8|
|     Sales| 20000|     1.0|
|   Account| 28000|     0.2|
|   Account| 20000|     0.4|
|   Account| 15000|     0.8|
|   Account| 15000|     0.8|
|   Account| 12000|     1.0|
+----------+------+--------+


PERCENT_RANK : It is very similar to the CUME_DIST function. It ranks the row as a percentage. The first row in any dataset has percent_rank 0 and the return value is of the double type.
+----------+------+---+--------+
|department|salary|rnk|perc_rnk|
+----------+------+---+--------+
|     Sales| 30000|  1|     0.0|
|     Sales| 25000|  2|    0.25|
|     Sales| 25000|  2|    0.25|
|     Sales| 25000|  2|    0.25|
|     Sales| 20000|  5|     1.0|
|   Account| 28000|  1|     0.0|
|   Account| 20000|  2|    0.25|
|   Account| 15000|  3|     0.5|
|   Account| 15000|  3|     0.5|
|   Account| 12000|  5|     1.0|
+----------+------+---+--------+


NTILE : It divides the number of rows in a partition into a specific number of ranked groups (bucket) as equally as possible. It returns a bucket member associated with it.
+----------+------+-----+
|department|salary|ntile|
+----------+------+-----+
|     Sales| 30000|    1|
|     Sales| 25000|    1|
|     Sales| 25000|    2|
|     Sales| 25000|    3|
|     Sales| 20000|    4|
|   Account| 28000|    1|
|   Account| 20000|    1|
|   Account| 15000|    2|
|   Account| 15000|    3|
|   Account| 12000|    4|
+----------+------+-----+

Note: In the case of cume_dist(), in the Sales department, if we take salary 25000 the cumulative distribution is 0.8, meaning 80% of the rows have a salary greater than or equal to 25000 (the ordering is descending). Similarly, in the Account department, if we take salary 20000 the cumulative distribution is 0.4, meaning 40% of the rows have a salary greater than or equal to 20000.

Happy learning!
