Spark UDAF with Window Functions & Group By
Apache Spark has become the de-facto framework for big data processing. Spark has great library support for UDFs (user defined functions) and UDAFs (user defined aggregate functions).
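As a quick refresher, a UDF maps a single input row to a single output value, while a UDAF folds many rows into one aggregated result. A minimal UDF sketch for contrast (the spark session and the to_upper name here are just illustrative):

import org.apache.spark.sql.functions.udf

// A plain one-row-in, one-value-out UDF, unlike the many-rows-in UDAF built below.
val toUpper = udf((s: String) => if (s == null) null else s.toUpperCase)
spark.udf.register("to_upper", toUpper)
spark.sql("SELECT to_upper('deepak') AS upper_name").show()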
I recently got a chance to work on a Spark UDAF. Let's say the DataFrame below is the input (I am using Spark 3.0 for this UDAF development):
+--------+---------+-------+
|emp_name|dept_name|dept_id|
+--------+---------+-------+
| deepak| IT_1| 100|
| deepak| IT_2| 100|
| pankaj|Banking_1| 200|
| pankaj|Banking_2| 200|
+--------+---------+-------+
And we need something like the output below (a custom aggregate of sorts), where we concatenate two string columns and aggregate the name & dept values into a list of pairs joined by ' :: ':
+--------+---------+-------+--------------------------------------------+
|emp_name|dept_name|dept_id|names_list |
+--------+---------+-------+--------------------------------------------+
|pankaj |Banking_1|200 |{[pankaj :: Banking_1, pankaj :: Banking_2]}|
|pankaj |Banking_2|200 |{[pankaj :: Banking_1, pankaj :: Banking_2]}|
|deepak |IT_1 |100 |{[deepak :: IT_1, deepak :: IT_2]} |
|deepak |IT_2 |100 |{[deepak :: IT_1, deepak :: IT_2]} |
+--------+---------+-------+--------------------------------------------+
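One detail worth calling out up front: a UDAF registered this way takes a single input column, so the two string columns are packed into one with concat_ws and a separator ('@@') that the aggregator can later split on. A minimal sketch of that packing step, assuming a DataFrame df with the columns shown above:

import org.apache.spark.sql.functions.concat_ws

// Pack emp_name and dept_name into one string, e.g. "deepak@@IT_1";
// the UDAF splits on "@@" to recover both values.
val packed = df.select(concat_ws("@@", df("emp_name"), df("dept_name")).as("packed"))
packed.show(false)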
To get the above output we are going to write a Spark UDAF in Java. First we create three classes:
ListAggrUDAF.java : the main UDAF class
ListAggrBuffer.java : a class to hold the intermediate (buffered) results
ListAggrContainer.java : the final output object
Below is the main UDAF class, which performs the aggregation on the grouped records.
import java.util.ArrayList;
import java.util.List;

import org.apache.spark.sql.Encoder;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.expressions.Aggregator;

public class ListAggrUDAF extends Aggregator<String, ListAggrBuffer, ListAggrContainer> {

    // A zero value for this aggregation: an empty buffer.
    @Override
    public ListAggrBuffer zero() {
        return new ListAggrBuffer();
    }

    // Combine the buffer with a new input value. For performance, the function may modify
    // `buffer` and return it instead of constructing a new object.
    @Override
    public ListAggrBuffer reduce(ListAggrBuffer buffer, String nextRecord) {
        // Input arrives as "name@@dept"; split it back into its two parts.
        String[] s = nextRecord.split("@@");
        String name = s[0];
        String dept = s[1];
        String val = String.join(" :: ", name, dept);
        if (!buffer.getDataList().contains(val)) {
            buffer.getDataList().add(val);
        }
        return buffer;
    }

    // Merge two intermediate buffers, skipping duplicate pairs.
    @Override
    public ListAggrBuffer merge(ListAggrBuffer left, ListAggrBuffer right) {
        right.getDataList().forEach(i -> {
            if (!left.getDataList().contains(i)) {
                left.getDataList().add(i);
            }
        });
        return left;
    }

    // Transform the output of the reduction into the final output object.
    @Override
    public ListAggrContainer finish(ListAggrBuffer reduction) {
        ListAggrContainer aggrOutput = new ListAggrContainer();
        aggrOutput.setNames(new ArrayList<>(reduction.getDataList()));
        return aggrOutput;
    }

    // Specifies the Encoder for the intermediate value type.
    @Override
    public Encoder<ListAggrBuffer> bufferEncoder() {
        return Encoders.javaSerialization(ListAggrBuffer.class);
    }

    // Specifies the Encoder for the final output value type.
    @Override
    public Encoder<ListAggrContainer> outputEncoder() {
        return Encoders.bean(ListAggrContainer.class);
    }
}
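As an aside, an Aggregator like this can also be applied to a typed Dataset directly via toColumn, without the SQL registration used later. A minimal sketch, assuming a SparkSession named spark (as created in the driver further below):

import org.apache.spark.sql.Dataset
import spark.implicits._

// Illustrative sample data: a Dataset[String] of packed "name@@dept" records.
val records: Dataset[String] = Seq("deepak@@IT_1", "deepak@@IT_2").toDS()

// toColumn turns the Aggregator into a TypedColumn usable in select().
records.select(new ListAggrUDAF().toColumn.name("names_list")).show(false)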
The ListAggrBuffer class holds the intermediate data:
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

public class ListAggrBuffer implements Serializable {

    private List<String> dataList;

    public ListAggrBuffer() {
        this.dataList = new ArrayList<>();
    }

    public List<String> getDataList() {
        return dataList;
    }

    public void setDataList(List<String> dataList) {
        this.dataList = dataList;
    }
}
Finally, ListAggrContainer is the output object of the UDAF:
import java.io.Serializable;
import java.util.List;

public class ListAggrContainer implements Serializable {

    private List<String> names;

    public List<String> getNames() {
        return names;
    }

    public void setNames(List<String> names) {
        this.names = names;
    }
}
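The bean encoder maps ListAggrContainer to a struct with a single names array field, which is why the aggregated value prints as {[...]} in the outputs below. A small sketch to inspect that schema, shown in Scala for brevity:

import org.apache.spark.sql.Encoders

// Prints the struct schema the UDAF output will have: names: array<string>.
println(Encoders.bean(classOf[ListAggrContainer]).schema.treeString)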
Once the above classes are ready, below is how we can use the UDAF in our Spark driver object:
import org.apache.spark.sql.{SparkSession, functions}

object DeepakSparkApp {

  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder()
      .appName("Deepak_Spark_UDAF_APP")
      .master("local[*]")
      .getOrCreate()

    import spark.implicits._

    // Register the Java Aggregator as a UDAF so it can be called from SQL.
    spark.udf.register("my_aggregator", functions.udaf(new ListAggrUDAF()))

    val df5 = Seq(
      ("deepak", "IT_1", "100"),
      ("deepak", "IT_2", "100"),
      ("pankaj", "Banking_1", "200"),
      ("pankaj", "Banking_2", "200")
    ).toDF("emp_name", "dept_name", "dept_id")

    df5.createOrReplaceTempView("emp_ds")

    println("=== Raw DF output =====")
    df5.show()

    // Use with a window operation
    val windowFunResDF = spark.sql(
      "SELECT *, my_aggregator(concat_ws('@@', emp_name, dept_name)) " +
        "OVER (PARTITION BY dept_id ORDER BY dept_id) AS names_list FROM emp_ds")

    // Use with a group-by operation
    val groupByResDF = spark.sql(
      "SELECT my_aggregator(concat_ws('@@', emp_name, dept_name)) AS name_list " +
        "FROM emp_ds GROUP BY dept_id")

    println("=== windowFunResDF Output =====")
    windowFunResDF.show(false)

    println("=== groupByResDF Output =====")
    groupByResDF.show(false)

    groupByResDF.createOrReplaceTempView("group_res")
  }
}
And the output will look like:
=== Raw DF output =====
+--------+---------+-------+
|emp_name|dept_name|dept_id|
+--------+---------+-------+
| deepak| IT_1| 100|
| deepak| IT_2| 100|
| pankaj|Banking_1| 200|
| pankaj|Banking_2| 200|
+--------+---------+-------+
=== windowFunResDF Output =====
+--------+---------+-------+--------------------------------------------+
|emp_name|dept_name|dept_id|names_list |
+--------+---------+-------+--------------------------------------------+
|pankaj |Banking_1|200 |{[pankaj :: Banking_1, pankaj :: Banking_2]}|
|pankaj |Banking_2|200 |{[pankaj :: Banking_1, pankaj :: Banking_2]}|
|deepak |IT_1 |100 |{[deepak :: IT_1, deepak :: IT_2]} |
|deepak |IT_2 |100 |{[deepak :: IT_1, deepak :: IT_2]} |
+--------+---------+-------+--------------------------------------------+
=== groupByResDF Output =====
+------------------------------------------------+
|name_list |
+------------------------------------------------+
|{[pankaj :: Banking_1, pankaj :: Banking_2]} |
|{[deepak :: IT_1, deepak :: IT_2]} |
+------------------------------------------------+
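For completeness, the same group-by aggregation can also be expressed through the DataFrame API instead of SQL; a sketch using the df5 DataFrame from the driver above:

import org.apache.spark.sql.functions.{concat_ws, udaf}

// Wrap the Aggregator once and call it like any other column function.
val myAgg = udaf(new ListAggrUDAF())
val groupedDF = df5
  .groupBy("dept_id")
  .agg(myAgg(concat_ws("@@", df5("emp_name"), df5("dept_name"))).as("name_list"))
groupedDF.show(false)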
Moreover, if we need to explode the list, all we need is:
spark.sql("select explode(name_list.names) name_dept_pair from group_res").show(false)
//output will be
+-------------------+
|name_dept_pair |
+-------------------+
|pankaj :: Banking_1|
|pankaj :: Banking_2|
|deepak :: IT_1 |
|deepak :: IT_2 |
+-------------------+
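And if the pairs need to be turned back into proper columns, we can split on the ' :: ' separator again; a sketch building on the group_res view:

// Split each "name :: dept" pair back into separate columns.
spark.sql(
  """SELECT split(name_dept_pair, ' :: ')[0] AS emp_name,
    |       split(name_dept_pair, ' :: ')[1] AS dept_name
    |FROM (SELECT explode(name_list.names) AS name_dept_pair FROM group_res) t
    |""".stripMargin).show(false)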
Summary:
We learned how to create a custom UDAF in Spark 3.0 in Java, and how to use it with both group-by and window operations.
More details on UDAFs can be found at:
https://spark.apache.org/docs/3.2.1/sql-ref-functions-udf-aggregate.html
Happy Learning :)