Spark SQL Window functions using plain SQL.
Spark provides several window functions:
Ranking functions : row_number(), rank(), dense_rank(), percent_rank(), ntile()
Analytical functions : cume_dist(), lag(), lead()
Aggregate functions : sum(), first(), last(), max(), min(), mean(), stddev(), etc.
In this post, we are going to explore some of the window functions in Spark SQL. These are the analytic functions we are going to discuss:
- ROW_NUMBER
- RANK
- DENSE_RANK
- CUME_DIST
- PERCENT_RANK
- NTILE
This is the test data to which we will apply the window analytic functions:
ID,FIRST_NAME,LAST_NAME,DESIGNATION,DEPARTMENT,SALARY
1001,Ram,Ghadiyaram,Director of Sales,Sales,30000
1002,Ravi,Rangasamy,Marketing Manager,Sales,25000
1003,Ramesh, Rangasamy,Assistant Manager,Sales,25000
1004,Prem,Sure,Account Coordinator,Account,15000
1005,Phani ,G,Accountant II,Account,20000
1006,Krishna,G,Account Coordinator,Account,15000
1007,Rakesh,Krishnamurthy,Assistant Manager,Sales,25000
1008,Gally,Johnson,Manager,Account,28000
1009,Richard,Grill,Account Coordinator,Account,12000
1010,Sofia,Ketty,Sales Coordinator,Sales,20000
Row Number :
This is an analytic function that assigns each row a unique sequential value (1, 2, 3, …) within its partition, based on the ordering specified in the OVER clause.
Query :
SELECT department, salary , ROW_NUMBER() OVER (PARTITION BY department ORDER BY salary DESC) AS row_num FROM emp_dept_tbl
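As a plain-Scala sketch of these semantics (no Spark needed; the salary list is copied from the Sales partition of the sample data), ROW_NUMBER simply numbers the sorted rows:

```scala
// Sales-department salaries from the sample data, for illustration only.
val sales = List(30000, 25000, 25000, 25000, 20000)

// ROW_NUMBER: sort descending, then number rows sequentially from 1,
// ignoring ties completely (the order among tied rows is arbitrary).
val rowNumbered = sales.sorted(Ordering[Int].reverse).zipWithIndex
  .map { case (salary, idx) => (salary, idx + 1) }

println(rowNumbered)
// List((30000,1), (25000,2), (25000,3), (25000,4), (20000,5))
```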
RANK :
The RANK analytic function assigns a rank to rows based on the column values in the OVER clause. Rows with equal values receive the same rank, and the subsequent rank values are skipped.
Query :
SELECT department, salary , RANK() OVER(PARTITION BY department ORDER BY salary DESC) AS rnk FROM emp_dept_tbl
See the result in the code listing and the output at the bottom.
Here, all rows with salary 15000 in the Account department and 25000 in the Sales department share the same rank (3 and 2 respectively), and the row with the next salary gets rank 5, the intervening rank values having been skipped.
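The tie-and-skip behavior can be sketched in plain Scala (using the Sales salaries from the sample data): a row's rank is one plus the number of rows strictly ahead of it, so tied rows share a rank and the following rank values are skipped:

```scala
val sales = List(30000, 25000, 25000, 25000, 20000)

// RANK: 1 + the number of rows with a strictly higher salary.
val ranked = sales.map(s => (s, sales.count(_ > s) + 1))

println(ranked)
// List((30000,1), (25000,2), (25000,2), (25000,2), (20000,5))
```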
DENSE_RANK
The DENSE_RANK analytic function in Spark SQL/Hive assigns a rank to each row. Rows with equal values receive the same rank, and ranks are assigned sequentially, so no rank values are skipped.
Here, the rank values are not skipped as they are with the RANK function.
Query :
SELECT department, salary , DENSE_RANK() OVER(PARTITION BY department ORDER BY salary DESC) AS dns_rnk FROM emp_dept_tbl
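In the same plain-Scala sketch, DENSE_RANK counts distinct higher values instead of rows, which is why no rank value is skipped:

```scala
val sales = List(30000, 25000, 25000, 25000, 20000)

// DENSE_RANK: 1 + the number of *distinct* values higher than the row's value.
val denseRanked = sales.map(s => (s, sales.distinct.count(_ > s) + 1))

println(denseRanked)
// List((30000,1), (25000,2), (25000,2), (25000,2), (20000,3))
```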
CUME_DIST
This function stands for cumulative distribution. It computes the relative position of a column value within its group. Here, we calculate the cumulative distribution of salaries within each department. Because the query orders by salary in descending order, the cumulative distribution of a row's salary is calculated as:
cume_dist(salary) = (number of rows with a salary greater than or equal to the current salary) / (total number of rows in the partition)
Query :
SELECT department, salary , CUME_DIST() OVER (PARTITION BY department ORDER BY salary DESC) AS cum_dist FROM emp_dept_tbl
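The formula can be checked in plain Scala against the Account-department salaries from the sample data (descending order, so we count rows with a salary greater than or equal to the current one):

```scala
val account = List(28000, 20000, 15000, 15000, 12000)

// CUME_DIST with ORDER BY salary DESC: the fraction of rows in the partition
// whose salary is >= the current row's salary.
val cumeDist = account.map(s => (s, account.count(_ >= s).toDouble / account.size))

println(cumeDist)
// List((28000,0.2), (20000,0.4), (15000,0.8), (15000,0.8), (12000,1.0))
```

These values match the query's output for the Account partition.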
PERCENT_RANK
It is very similar to the CUME_DIST function, but it ranks each row as a percentage. The first row in each partition has percent_rank 0.0, and the return value is of type double.
Let’s rank the salaries department-wise as percentages:
Here, each row's rank within its partition is computed first, and then the percentage rank is calculated using the formula:
percent_rank = (rank - 1) / (number of rows in the partition - 1)
Query :
SELECT department, salary , RANK() OVER (PARTITION BY department ORDER BY salary DESC) AS rnk, PERCENT_RANK() OVER (PARTITION BY department ORDER BY salary DESC) AS perc_rnk FROM emp_dept_tbl
See the result in the code listing and the output at the bottom.
For example, take the salary 15000 from the Account department and 20000 from the Sales department. The ranks of these two rows are 3 and 5 respectively, and the number of rows in each partition minus one is 4, so the percent rank is calculated as:
(3 - 1) / 4 = 0.5 and (5 - 1) / 4 = 1.0
Note: We subtract 1 from the rank because the first row in each partition must start from the value 0.
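The whole calculation can be replayed in plain Scala on the Account salaries from the sample data:

```scala
val account = List(28000, 20000, 15000, 15000, 12000)
val n = account.size

// PERCENT_RANK = (rank - 1) / (rows in partition - 1), with RANK semantics for ties.
val percentRank = account.map { s =>
  val rank = account.count(_ > s) + 1
  (s, (rank - 1).toDouble / (n - 1))
}

println(percentRank)
// List((28000,0.0), (20000,0.25), (15000,0.5), (15000,0.5), (12000,1.0))
```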
NTILE
It divides the rows of a partition into the specified number of ranked groups (buckets) as equally as possible and returns the bucket number assigned to each row.
Query :
SELECT department, salary , NTILE(4) OVER (PARTITION BY department ORDER BY salary DESC) AS ntile FROM emp_dept_tbl
See the result in the code listing and the output at the bottom.
Here, we have divided the data set into 4 buckets.
Remainder = (total records in the partition) % (number of buckets)
Remainder = 5 % 4 = 1
So the one extra row (the remainder of 5 % 4) is allocated to bucket 1, which therefore has one more row than the rest of the buckets (2, 3, and 4).
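The allocation rule above can be sketched as a small plain-Scala function (a simplified model of NTILE, not Spark's implementation), applied to the five Sales salaries with 4 buckets:

```scala
// NTILE(k) sketch: the first (size % k) buckets each receive one extra row.
def ntile(sortedRows: List[Int], k: Int): List[(Int, Int)] = {
  val base  = sortedRows.size / k   // minimum rows per bucket
  val extra = sortedRows.size % k   // buckets 1..extra get one more row
  val bucketIds = (1 to k).flatMap(b => List.fill(base + (if (b <= extra) 1 else 0))(b))
  sortedRows.zip(bucketIds).toList
}

println(ntile(List(30000, 25000, 25000, 25000, 20000), 4))
// List((30000,1), (25000,1), (25000,2), (25000,3), (20000,4))
```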
Complete code listing here :
package com.examples
import org.apache.log4j.{Level, Logger}
import org.apache.spark.internal.Logging
import org.apache.spark.sql.{Dataset, SparkSession}
/**
* @author : Ram Ghadiyaram
*/
object AnalyticalFunctionTest extends App with Logging {
Logger.getLogger("org").setLevel(Level.WARN)
val spark: SparkSession = SparkSession.builder()
  .master("local[*]")
  .appName(this.getClass.getName)
  .config("spark.sql.warehouse.dir", new java.io.File("spark-warehouse").getAbsolutePath)
  .getOrCreate()
//spark.sparkContext.getConf.getAll.foreach(println)
import spark.implicits._
val csvData: Dataset[String] = spark.sparkContext.parallelize(
"""
|ID,FIRST_NAME,LAST_NAME,DESIGNATION,DEPARTMENT,SALARY
|1001,Ram,Ghadiyaram,Director of Sales,Sales,30000
|1002,Ravi,Rangasamy,Marketing Manager,Sales,25000
|1003,Ramesh, Rangasamy,Assistant Manager,Sales,25000
|1004,Prem,Sure,Account Coordinator,Account,15000
|1005,Phani ,G,Accountant II,Account,20000
|1006,Krishna,G,Account Coordinator,Account,15000
|1007,Rakesh,Krishnamurthy,Assistant Manager,Sales,25000
|1008,Gally,Johnson,Manager,Account,28000
|1009,Richard,Grill,Account Coordinator,Account,12000
|1010,Sofia,Ketty,Sales Coordinator,Sales,20000
|""".stripMargin.linesIterator.toList).toDS()
val frame = spark.read.option("header", true).option("inferSchema", true).csv(csvData)
frame.show()
frame.printSchema()
frame.createOrReplaceTempView("emp_dept_tbl")
println("1) Row number : ")
spark.sql(
"""
|SELECT department, salary, ROW_NUMBER() OVER (PARTITION BY department ORDER BY salary DESC)
|AS row_num FROM emp_dept_tbl
|""".stripMargin).show
println("2) Rank : The RANK analytic function assigns a rank to rows based on the column values in the OVER clause. Rows with equal values receive the same rank, and the subsequent rank values are skipped.")
spark.sql(
"""
|SELECT department, salary, RANK() OVER(PARTITION BY department ORDER BY salary DESC) AS rnk FROM emp_dept_tbl
|""".stripMargin
).show
println("3) DENSE_RANK : The DENSE_RANK analytic function assigns a rank to each row. Rows with equal values receive the same rank, and ranks are assigned sequentially so that no rank values are skipped.")
spark.sql(
"""
|SELECT department, salary, DENSE_RANK() OVER(PARTITION BY department ORDER BY salary DESC) AS dns_rnk FROM emp_dept_tbl
|""".stripMargin
).show
println("All 3 Row number, Rank, and Dense Rank together ")
spark.sql(
"""
|SELECT department AS dept, salary AS sal,
|ROW_NUMBER() OVER (PARTITION BY department ORDER BY salary DESC) AS row_num,
|RANK() OVER (PARTITION BY department ORDER BY salary DESC) AS rnk,
|DENSE_RANK() OVER (PARTITION BY department ORDER BY salary DESC) AS dns_rnk
|FROM emp_dept_tbl
|""".stripMargin
).show
println("CUME_DIST : This function stands for cumulative distribution. It computes the relative position of a column value within its group.")
spark.sql(
"""
|SELECT department, salary, CUME_DIST() OVER (PARTITION BY department ORDER BY salary DESC) AS cum_dist FROM emp_dept_tbl
|""".stripMargin
).show
println("PERCENT_RANK : Very similar to CUME_DIST, it ranks each row as a percentage. The first row in each partition has percent_rank 0.0, and the return value is of type double.")
spark.sql(
"""
|SELECT department, salary, RANK() OVER (PARTITION BY department ORDER BY salary DESC) AS rnk, PERCENT_RANK() OVER (PARTITION BY department ORDER BY salary DESC) AS perc_rnk FROM emp_dept_tbl
|""".stripMargin
).show
println("NTILE : It divides the rows of a partition into the specified number of ranked groups (buckets) as equally as possible and returns the bucket number assigned to each row.")
spark.sql(
"""
|SELECT department, salary, NTILE(4) OVER (PARTITION BY department ORDER BY salary DESC) AS ntile FROM emp_dept_tbl
|""".stripMargin
).show
}
Result :
+----+----------+-------------+-------------------+----------+------+
|  ID|FIRST_NAME|    LAST_NAME|        DESIGNATION|DEPARTMENT|SALARY|
+----+----------+-------------+-------------------+----------+------+
|1001|       Ram|   Ghadiyaram|  Director of Sales|     Sales| 30000|
|1002|      Ravi|    Rangasamy|  Marketing Manager|     Sales| 25000|
|1003|    Ramesh|    Rangasamy|  Assistant Manager|     Sales| 25000|
|1004|      Prem|         Sure|Account Coordinator|   Account| 15000|
|1005|    Phani |            G|      Accountant II|   Account| 20000|
|1006|   Krishna|            G|Account Coordinator|   Account| 15000|
|1007|    Rakesh|Krishnamurthy|  Assistant Manager|     Sales| 25000|
|1008|     Gally|      Johnson|            Manager|   Account| 28000|
|1009|   Richard|        Grill|Account Coordinator|   Account| 12000|
|1010|     Sofia|        Ketty|  Sales Coordinator|     Sales| 20000|
+----+----------+-------------+-------------------+----------+------+

root
 |-- ID: integer (nullable = true)
 |-- FIRST_NAME: string (nullable = true)
 |-- LAST_NAME: string (nullable = true)
 |-- DESIGNATION: string (nullable = true)
 |-- DEPARTMENT: string (nullable = true)
 |-- SALARY: integer (nullable = true)

1) Row number :
+----------+------+-------+
|department|salary|row_num|
+----------+------+-------+
|     Sales| 30000|      1|
|     Sales| 25000|      2|
|     Sales| 25000|      3|
|     Sales| 25000|      4|
|     Sales| 20000|      5|
|   Account| 28000|      1|
|   Account| 20000|      2|
|   Account| 15000|      3|
|   Account| 15000|      4|
|   Account| 12000|      5|
+----------+------+-------+

2) Rank : The RANK analytic function assigns a rank to rows based on the column values in the OVER clause. Rows with equal values receive the same rank, and the subsequent rank values are skipped.
+----------+------+---+
|department|salary|rnk|
+----------+------+---+
|     Sales| 30000|  1|
|     Sales| 25000|  2|
|     Sales| 25000|  2|
|     Sales| 25000|  2|
|     Sales| 20000|  5|
|   Account| 28000|  1|
|   Account| 20000|  2|
|   Account| 15000|  3|
|   Account| 15000|  3|
|   Account| 12000|  5|
+----------+------+---+

3) DENSE_RANK : The DENSE_RANK analytic function assigns a rank to each row. Rows with equal values receive the same rank, and ranks are assigned sequentially so that no rank values are skipped.
+----------+------+-------+
|department|salary|dns_rnk|
+----------+------+-------+
|     Sales| 30000|      1|
|     Sales| 25000|      2|
|     Sales| 25000|      2|
|     Sales| 25000|      2|
|     Sales| 20000|      3|
|   Account| 28000|      1|
|   Account| 20000|      2|
|   Account| 15000|      3|
|   Account| 15000|      3|
|   Account| 12000|      4|
+----------+------+-------+

All 3 Row number, Rank, and Dense Rank together
+-------+-----+-------+---+-------+
|   dept|  sal|row_num|rnk|dns_rnk|
+-------+-----+-------+---+-------+
|  Sales|30000|      1|  1|      1|
|  Sales|25000|      2|  2|      2|
|  Sales|25000|      3|  2|      2|
|  Sales|25000|      4|  2|      2|
|  Sales|20000|      5|  5|      3|
|Account|28000|      1|  1|      1|
|Account|20000|      2|  2|      2|
|Account|15000|      3|  3|      3|
|Account|15000|      4|  3|      3|
|Account|12000|      5|  5|      4|
+-------+-----+-------+---+-------+

CUME_DIST : This function stands for cumulative distribution. It computes the relative position of a column value within its group.
+----------+------+--------+
|department|salary|cum_dist|
+----------+------+--------+
|     Sales| 30000|     0.2|
|     Sales| 25000|     0.8|
|     Sales| 25000|     0.8|
|     Sales| 25000|     0.8|
|     Sales| 20000|     1.0|
|   Account| 28000|     0.2|
|   Account| 20000|     0.4|
|   Account| 15000|     0.8|
|   Account| 15000|     0.8|
|   Account| 12000|     1.0|
+----------+------+--------+

PERCENT_RANK : Very similar to CUME_DIST, it ranks each row as a percentage. The first row in each partition has percent_rank 0.0, and the return value is of type double.
+----------+------+---+--------+
|department|salary|rnk|perc_rnk|
+----------+------+---+--------+
|     Sales| 30000|  1|     0.0|
|     Sales| 25000|  2|    0.25|
|     Sales| 25000|  2|    0.25|
|     Sales| 25000|  2|    0.25|
|     Sales| 20000|  5|     1.0|
|   Account| 28000|  1|     0.0|
|   Account| 20000|  2|    0.25|
|   Account| 15000|  3|     0.5|
|   Account| 15000|  3|     0.5|
|   Account| 12000|  5|     1.0|
+----------+------+---+--------+

NTILE : It divides the rows of a partition into the specified number of ranked groups (buckets) as equally as possible and returns the bucket number assigned to each row.
+----------+------+-----+
|department|salary|ntile|
+----------+------+-----+
|     Sales| 30000|    1|
|     Sales| 25000|    1|
|     Sales| 25000|    2|
|     Sales| 25000|    3|
|     Sales| 20000|    4|
|   Account| 28000|    1|
|   Account| 20000|    1|
|   Account| 15000|    2|
|   Account| 15000|    3|
|   Account| 12000|    4|
+----------+------+-----+
Note : In the case of cume_dist(), in the Sales department, if we take the salary 25000, the cumulative distribution is 0.8, which means 80% of the department's salaries are greater than or equal to 25000 (the ordering is descending). Similarly, in the Account department, if we take the salary 20000, the cumulative distribution is 0.4, meaning 40% of the salaries are greater than or equal to 20000.
Happy learning