Spark UDAF with window function & Groupby

Apache Spark has become the de facto framework for big data processing. Spark has great library support for UDFs (user-defined functions) and UDAFs (user-defined aggregate functions).

I recently got a chance to work on a Spark UDAF. Let's say the DataFrame below is our input (I am using Spark 3.0 for this UDAF development):

+--------+---------+-------+
|emp_name|dept_name|dept_id|
+--------+---------+-------+
|  deepak|     IT_1|    100|
|  deepak|     IT_2|    100|
|  pankaj|Banking_1|    200|
|  pankaj|Banking_2|    200|
+--------+---------+-------+        

And we need something like the output below (a custom aggregate), where we concatenate the two string columns and collect the name & dept pairs of each group into a list, with ' :: ' as the separator:

+--------+---------+-------+--------------------------------------------+
|emp_name|dept_name|dept_id|names_list                                  |
+--------+---------+-------+--------------------------------------------+
|pankaj  |Banking_1|200    |{[pankaj :: Banking_1, pankaj :: Banking_2]}|
|pankaj  |Banking_2|200    |{[pankaj :: Banking_1, pankaj :: Banking_2]}|
|deepak  |IT_1     |100    |{[deepak :: IT_1, deepak :: IT_2]}          |
|deepak  |IT_2     |100    |{[deepak :: IT_1, deepak :: IT_2]}          |
+--------+---------+-------+--------------------------------------------+        

To get the output above, we are going to write a Spark UDAF in Java. First we create three classes:

ListAggrUDAF.java : Main UDAF class
ListAggrBuffer.java : Class to hold grouped results.
ListAggrContainer.java : Final output object        
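
These three classes map onto Spark's typed Aggregator<IN, BUF, OUT> contract, where IN is the per-row input (here a String), BUF the intermediate buffer (ListAggrBuffer), and OUT the final result (ListAggrContainer). As a simplified sketch of the shape we must implement (not Spark's literal source, just the methods involved):

// Simplified view of org.apache.spark.sql.expressions.Aggregator (sketch only)
public abstract class Aggregator<IN, BUF, OUT> implements Serializable {
    public abstract BUF zero();                       // empty buffer for a fresh group
    public abstract BUF reduce(BUF buffer, IN input); // fold one input row into the buffer
    public abstract BUF merge(BUF b1, BUF b2);        // combine partial buffers across partitions
    public abstract OUT finish(BUF reduction);        // turn the final buffer into the result
    public abstract Encoder<BUF> bufferEncoder();     // how Spark serializes the buffer
    public abstract Encoder<OUT> outputEncoder();     // how Spark serializes the output
}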

Below is the main UDAF class, which performs the aggregation on the grouped records:

import java.util.ArrayList;

import org.apache.spark.sql.Encoder;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.expressions.Aggregator;

public class ListAggrUDAF extends Aggregator<String, ListAggrBuffer, ListAggrContainer> {

    // A fresh, empty buffer for each new group
    @Override
    public ListAggrBuffer zero() {
        return new ListAggrBuffer();
    }

    // Combine an input value with the running buffer. For performance, the function may
    // modify `buffer` and return it instead of constructing a new object.
    @Override
    public ListAggrBuffer reduce(ListAggrBuffer buffer, String nextRecord) {
        String[] s = nextRecord.split("@@");
        String name = s[0];
        String dept = s[1];
        String val = String.join(" :: ", name, dept);
        // De-duplicate on the joined pair, not on the name alone
        if (!buffer.getDataList().contains(val)) {
            buffer.getDataList().add(val);
        }
        return buffer;
    }

    // Merge two intermediate buffers (partial results from different partitions)
    @Override
    public ListAggrBuffer merge(ListAggrBuffer left, ListAggrBuffer right) {
        right.getDataList().forEach(i -> {
            if (!left.getDataList().contains(i)) {
                left.getDataList().add(i);
            }
        });
        return left;
    }

    // Transform the output of the reduction into the final output object
    @Override
    public ListAggrContainer finish(ListAggrBuffer reduction) {
        ListAggrContainer aggrOutput = new ListAggrContainer();
        aggrOutput.setNames(new ArrayList<>(reduction.getDataList()));
        return aggrOutput;
    }

    // Specifies the Encoder for the intermediate value type
    @Override
    public Encoder<ListAggrBuffer> bufferEncoder() {
        return Encoders.javaSerialization(ListAggrBuffer.class);
    }

    // Specifies the Encoder for the final output value type
    @Override
    public Encoder<ListAggrContainer> outputEncoder() {
        return Encoders.bean(ListAggrContainer.class);
    }
}
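
Because zero, reduce, merge, and finish are plain Java methods, we can sanity-check the logic without a Spark cluster at all. A minimal sketch (place inside any main method):

ListAggrUDAF agg = new ListAggrUDAF();
ListAggrBuffer buffer = agg.zero();
buffer = agg.reduce(buffer, "deepak@@IT_1");
buffer = agg.reduce(buffer, "deepak@@IT_2");
buffer = agg.reduce(buffer, "deepak@@IT_1");   // duplicate pair, ignored by reduce
System.out.println(agg.finish(buffer).getNames());
// prints: [deepak :: IT_1, deepak :: IT_2]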

The ListAggrBuffer class holds the intermediate data for each group:

import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

public class ListAggrBuffer implements Serializable {

    private List<String> dataList;

    public ListAggrBuffer() {
        this.dataList = new ArrayList<>();
    }

    public List<String> getDataList() {
        return dataList;
    }

    public void setDataList(List<String> dataList) {
        this.dataList = dataList;
    }
}

Finally, ListAggrContainer is the output object of the UDAF. Since the output encoder is Encoders.bean, the class must follow the JavaBean convention (a no-arg constructor plus getter/setter pairs):

import java.io.Serializable;
import java.util.List;

public class ListAggrContainer implements Serializable {

    private List<String> names;

    public List<String> getNames() {
        return names;
    }

    public void setNames(List<String> names) {
        this.names = names;
    }
}

With the above classes ready, below is how we can use the UDAF in our Spark driver object:

import org.apache.spark.sql.{SparkSession, functions}

object DeepakSparkApp {

  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder()
      .appName("Deepak_Spark_UDAF_APP")
      .master("local[*]")
      .getOrCreate()

    import spark.implicits._
    // Wrap the typed Aggregator as an untyped UDAF and register it for use in SQL
    spark.udf.register("my_aggregator", functions.udaf(new ListAggrUDAF()))

    val df5 = Seq(
      ("deepak", "IT_1", "100"),
      ("deepak", "IT_2", "100"),
      ("pankaj", "Banking_1", "200"),
      ("pankaj", "Banking_2", "200")
    ).toDF("emp_name", "dept_name", "dept_id")

    df5.createOrReplaceTempView("emp_ds")

    println("=== Raw DF output ===")
    df5.show()

    // Use with a window operation; concat_ws packs the two columns into one
    // '@@'-separated string, which the UDAF splits apart in reduce()
    val windowFunResDF = spark.sql("SELECT *, my_aggregator(concat_ws('@@', emp_name, dept_name)) OVER (PARTITION BY dept_id ORDER BY dept_id) AS names_list FROM emp_ds")

    // Use with a group by operation
    val groupByResDF = spark.sql("SELECT my_aggregator(concat_ws('@@', emp_name, dept_name)) AS name_list FROM emp_ds GROUP BY dept_id")

    println("=== windowFunResDF Output ===")
    windowFunResDF.show(false)

    println("=== groupByResDF Output ===")
    groupByResDF.show(false)

    groupByResDF.createOrReplaceTempView("group_res")
  }
}
        

And the output will look like this:

=== Raw DF output ===
+--------+---------+-------+
|emp_name|dept_name|dept_id|
+--------+---------+-------+
|  deepak|     IT_1|    100|
|  deepak|     IT_2|    100|
|  pankaj|Banking_1|    200|
|  pankaj|Banking_2|    200|
+--------+---------+-------+


=== windowFunResDF Output ===
+--------+---------+-------+--------------------------------------------+
|emp_name|dept_name|dept_id|names_list                                  |
+--------+---------+-------+--------------------------------------------+
|pankaj  |Banking_1|200    |{[pankaj :: Banking_1, pankaj :: Banking_2]}|
|pankaj  |Banking_2|200    |{[pankaj :: Banking_1, pankaj :: Banking_2]}|
|deepak  |IT_1     |100    |{[deepak :: IT_1, deepak :: IT_2]}          |
|deepak  |IT_2     |100    |{[deepak :: IT_1, deepak :: IT_2]}          |
+--------+---------+-------+--------------------------------------------+


=== groupByResDF Output ===
+------------------------------------------------+
|name_list                                       |
+------------------------------------------------+
|{[pankaj :: Banking_1, pankaj :: Banking_2]}    |
|{[deepak :: IT_1, deepak :: IT_2]}              |
+------------------------------------------------+
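
The registered SQL function is not the only way in: the same aggregation can be driven through the DataFrame API. Below is a hedged Java sketch (the variable names df and myAgg are mine; note that, unlike the Scala overload used above, the Java overload of functions.udaf also requires the input encoder):

import static org.apache.spark.sql.functions.*;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.expressions.UserDefinedFunction;
import org.apache.spark.sql.expressions.Window;

// Java needs the input encoder alongside the Aggregator
UserDefinedFunction myAgg = udaf(new ListAggrUDAF(), Encoders.STRING());

// Window flavour: one aggregated list per dept_id partition, repeated on each row
Dataset<Row> windowRes = df.withColumn("names_list",
        myAgg.apply(concat_ws("@@", col("emp_name"), col("dept_name")))
             .over(Window.partitionBy("dept_id")));

// Group-by flavour: one row per dept_id
Dataset<Row> groupRes = df.groupBy("dept_id")
        .agg(myAgg.apply(concat_ws("@@", col("emp_name"), col("dept_name"))).as("name_list"));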
        

Moreover, if we need to explode the list, all we need is:

spark.sql("select explode(name_list.names) name_dept_pair from group_res").show(false)

//output will be 
+-------------------+
|name_dept_pair     |
+-------------------+
|pankaj :: Banking_1|
|pankaj :: Banking_2|
|deepak :: IT_1     |
|deepak :: IT_2     |
+-------------------+
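
The same explode is available through the DataFrame API as well; a small Java sketch, assuming a Dataset<Row> handle to the grouped result (called groupByRes here):

import static org.apache.spark.sql.functions.*;

// name_list is a struct; .names reaches its array field, which explode flattens
Dataset<Row> pairs = groupByRes.select(explode(col("name_list.names")).as("name_dept_pair"));
pairs.show(false);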
        

Summary:

We learned how to create a custom UDAF in Java on Spark 3.0, and how to use it with both group by and window operations.

More details on UDAFs can be found at:

https://spark.apache.org/docs/3.2.1/sql-ref-functions-udf-aggregate.html

Happy Learning :)

  • Deepak Dabi
