Window Functions in Spark SQL
Why do we need Window Functions?
Before window functions, there was no way to operate on a group of rows while still returning a single value for every input row.
Because of this, it was hard to perform various data-processing tasks such as:
1. Calculating a Moving Average
2. Calculating a Cumulative Sum
3. Accessing values from rows that appear before the current row.
So, what are Window Functions?
At its core, a window function calculates a return value for every input row of a table, based on a group of rows called the frame.
- Every input row can have a unique frame associated with it.
Spark SQL supports three types of window functions:
- Ranking functions (e.g. rank, dense_rank, percent_rank, ntile, row_number)
- Analytic functions (e.g. cume_dist, lag, lead)
- Aggregate functions (e.g. sum, avg, min, max, count)
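As a quick illustration, here is one function from each family applied over the same window. This is a minimal sketch assuming a local SparkSession; the Shop/Amount data is made up for the example:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{dense_rank, lag, avg}

val spark = SparkSession.builder.master("local[*]").appName("windowDemo").getOrCreate()
import spark.implicits._

val sales = Seq(("A", 10), ("A", 20), ("B", 5)).toDF("Shop", "Amount")

// One window specification shared by all three function families
val w = Window.partitionBy('Shop).orderBy('Amount)

sales.select('Shop, 'Amount,
  dense_rank.over(w).as("rank"),         // Ranking function
  lag('Amount, 1).over(w).as("prev"),    // Analytic function: previous row's Amount
  avg('Amount).over(w).as("running_avg") // Aggregate function: running average
).show()
```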
How do we implement this in Spark programs?
Before moving to the coding session, we first need to understand a very important concept: the window specification.
Window specification: defines which rows are included in the frame associated with a given input row.
A window specification consists of three parts:
1. Partitioning specification: controls which rows will be in the same partition as the given row.
2. Ordering specification: controls the order of rows within a partition, either ascending or descending.
3. Frame specification: tells which rows will be taken into the frame for the current input row, based on their position relative to the current row.
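For example, a frame spanning from the start of the partition up to the current row yields a cumulative sum. A minimal sketch, assuming a local SparkSession and made-up Grp/Value data:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.sum

val spark = SparkSession.builder.master("local[*]").appName("frameDemo").getOrCreate()
import spark.implicits._

val df = Seq(("A", 1), ("A", 2), ("A", 3)).toDF("Grp", "Value")

// Frame specification: all rows from the start of the partition up to the current row
val cumulative = Window.partitionBy('Grp).orderBy('Value)
  .rowsBetween(Window.unboundedPreceding, Window.currentRow)

df.withColumn("CumSum", sum('Value).over(cumulative)).show()
```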
Let’s understand this amazing concept practically.
Suppose we have some data named ProductPrices.
Use Cases:
1. What are the most expensive and second most expensive products in every category?
2. What is the difference between the price of each product and the price of the most expensive product in the same category?
To implement these use cases we need window functions; that's why we are here.
First, we need the following imports (dense_rank comes from the functions object):
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.dense_rank
Use Case 1:
To find the most expensive products, we need to order the rows in descending order of Price_per_10_Kg. And since we need the most expensive products per category, we also need to partition the data by Category.
Let's create a DataFrame out of the data.
val dataset = Seq(
("Orange","Fruit",400),
("Apple","Fruit",800),
("Almond","Dry Fruit",5000),
("Okra","Vegetable",350),
("Tomato","Vegetable",900),
("Cashew","Dry Fruit",7000),
("Pomegranate","Fruit",650),
("Banana","Fruit",300),
("Dry figs","Dry Fruit",6800)).toDF("Product","Category","Price_per_10_Kg")
Now we will create the window specification:
val overCategory = Window.partitionBy('Category).orderBy('Price_per_10_Kg.desc)
As described above, this window specification defines both the partitioning and the ordering specification.
Now we will use the overCategory window specification for ranking, via the dense_rank() function.
val ranked = dataset.withColumn("Rank", dense_rank.over(overCategory))
Hurray! It's done, and now we can filter for the most expensive and second most expensive products using a where clause.
ranked.where('Rank <= 2).show
Use Case 2:
This use case is very simple and can be done in just a few lines of code.
Here we need to calculate the price difference per category.
We will create a window specification that partitions the data by category; the maximum price within each partition then gives us the reference price to subtract from.
I am not walking through the full code line by line here, so that you can actually try it at home rather than just reading through it.
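If you want to check your attempt afterwards, here is one possible sketch. The column names MaxPrice and Price_Difference are my own choices, and the local SparkSession bootstrap is only there to make the snippet self-contained:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.max

val spark = SparkSession.builder.master("local[*]").appName("useCase2").getOrCreate()
import spark.implicits._

val dataset = Seq(
  ("Orange", "Fruit", 400), ("Apple", "Fruit", 800),
  ("Almond", "Dry Fruit", 5000), ("Cashew", "Dry Fruit", 7000),
  ("Okra", "Vegetable", 350), ("Tomato", "Vegetable", 900)
).toDF("Product", "Category", "Price_per_10_Kg")

// No ordering needed here: max is taken over the whole category partition
val byCategory = Window.partitionBy('Category)

val diff = dataset
  .withColumn("MaxPrice", max('Price_per_10_Kg).over(byCategory))
  .withColumn("Price_Difference", 'MaxPrice - 'Price_per_10_Kg)

diff.show()
```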
I hope this clears up your doubts about window functions in Spark SQL and helps you implement something valuable.
Please leave your thoughts in the comments section.
Happy Spark Coding… :)
#spark #sql #hadoop #BigData #Windowing #function #tutorial #article