Effective use of condition in Delta table Merge command using PySpark

There was a scenario where I had to do an incremental load for some big, wide Delta tables in Databricks using PySpark. I chose the Merge() command to accomplish this, and it was fairly straightforward to do the upsert (insert & update) operation on the big Delta tables. The problem was that I did not want to run the update operation for matching records when none of the columns had changed, which meant I needed a way to detect whether any column had changed before doing the update.

The PySpark Merge() command example from the official documentation:

from pyspark.sql.functions import max  # needed so max() is Spark's aggregate, not Python's builtin

deltaTable = ...  # DeltaTable with schema (key, value)


# DataFrame with changes having the following columns
# - key: key of the change
# - time: time of change for ordering between changes (can be replaced by another ordering id)
# - newValue: updated or inserted value if key was not deleted
# - deleted: true if the key was deleted, false if the key was inserted or updated
changesDF = spark.table("changes")


# Find the latest change for each key based on the timestamp
# Note: for nested structs, max on a struct is computed as
# max on the first struct field; if equal, fall back to the second field, and so on.
latestChangeForEachKey = changesDF \
  .selectExpr("key", "struct(time, newValue, deleted) as otherCols") \
  .groupBy("key") \
  .agg(max("otherCols").alias("latest")) \
  .select("key", "latest.*")


deltaTable.alias("t").merge(
    latestChangeForEachKey.alias("s"),
    "s.key = t.key") \
  .whenMatchedDelete(condition = "s.deleted = true") \
  .whenMatchedUpdate(set = {
    "key": "s.key",
    "value": "s.newValue"
  }) \
  .whenNotMatchedInsert(
    condition = "s.deleted = false",
    values = {
      "key": "s.key",
      "value": "s.newValue"
    }
  ).execute()        

The challenge here is that for wide tables (more than 30 columns) I would have had to find out which columns had changed by manually comparing them (hard-coding the column names) between 'Source' & 'Target'. Instead, I wanted the comparison to be dynamic, driven by the table schema, with no hard-coded column names. I couldn't find a solution for this online, so I did some research and built the condition dynamically: concatenate all the columns and compare a hash of the result between source and target.

I will now explain how I did this. Start by importing the required Python libraries and getting the source data as a PySpark DataFrame, which contains only the incremental data (inserts & updates). I will mainly concentrate on the updates for the matching records.

import pyspark.sql.functions as f
from pyspark.sql.types import *
from delta.tables import *


# Incremental source data (inserts & updates only) as a PySpark DataFrame
dfs = spark.sql("select * from landing.source")

The next step is to build the dynamic condition to pass to the 'whenMatchedUpdateAll' clause. For this we take the column list from the schema of the 'Target' table and concatenate the columns twice in a 'for' loop, once qualified with 'SOURCE' and once with 'TARGET'. You can also do this separately from the respective 'Source' and 'Target' tables. After concatenating, apply the 'md5' hash function to both the source and target expressions and join them with a not-equal operator ('<>' or '!='), as shown below:

out_df = spark.sql("select * from default.target")

# Build "SOURCE.col1 || SOURCE.col2 || ..." and its TARGET twin from the
# target table's schema, so no column names are hard-coded
src_join_condition = ""
trg_join_condition = ""
for i in out_df.columns:
    src_join_condition += "SOURCE." + i + " || "
    trg_join_condition += "TARGET." + i + " || "

# Strip the trailing " || " (4 characters) from each expression
src_join_condition = src_join_condition[:-4]
trg_join_condition = trg_join_condition[:-4]

# A matching row needs an update only when the two hashes differ
match_cond = "md5(" + src_join_condition + ") <> md5(" + trg_join_condition + ")"
print(match_cond)
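
For illustration, if the target table had just three columns, say key, id, and name (hypothetical names), the printed condition would look like this:

md5(SOURCE.key || SOURCE.id || SOURCE.name) <> md5(TARGET.key || TARGET.id || TARGET.name)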

The reason for using the md5 hash function after concatenating the columns is that a single hash comparison should be more efficient and performant than comparing each column individually, and it keeps the code clean.
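
One caveat worth noting: in Spark SQL the '||' concatenation returns NULL if any operand is NULL, and since NULL <> NULL does not evaluate to true, a row containing a NULL column could be silently skipped during the update. Below is a minimal NULL-safe sketch of the same idea, assuming the out_df from above; it casts every column to string and coalesces NULLs to an empty string before hashing (inserting a rarely used delimiter between columns would further guard against boundary collisions such as 'ab' || 'c' vs 'a' || 'bc'):

# NULL-safe variant (sketch): coalesce each column to '' so one NULL
# cannot turn the whole concatenation, and hence its md5, into NULL
src_cols = ["coalesce(cast(SOURCE." + c + " as string), '')" for c in out_df.columns]
trg_cols = ["coalesce(cast(TARGET." + c + " as string), '')" for c in out_df.columns]

match_cond = "md5(" + " || ".join(src_cols) + ") <> md5(" + " || ".join(trg_cols) + ")"
print(match_cond)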

Next comes the final step, where we use this condition in the Merge() command while performing the upsert. Here 'SOURCE' refers to the incoming source DataFrame, while 'TARGET' refers to the target Delta table. Please do note that I am using 'whenMatchedUpdateAll()' to update all the columns for a matching row, and similarly 'whenNotMatchedInsertAll()' to insert new rows into the target Delta table. The dynamic condition in the update clause makes sure that a target row is updated only when at least one of its columns has changed; otherwise the row is skipped even when the business keys match.

# The merge runs against the target as a DeltaTable, not a DataFrame
out = DeltaTable.forName(spark, "default.target")

resp = out.alias("TARGET").merge(
    source = dfs.alias("SOURCE"),
    condition = "SOURCE.key = TARGET.key and SOURCE.id = TARGET.id"
).whenMatchedUpdateAll(
    condition = match_cond  # built dynamically in the code above
).whenNotMatchedInsertAll() \
 .execute()

print(resp)
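
To confirm that unchanged rows were really skipped, one option is to look at the operation metrics Delta Lake records for the MERGE in the table history (a quick sketch, reusing the default.target table from above):

# The latest history entry for a MERGE carries operationMetrics such as
# numTargetRowsUpdated and numTargetRowsInserted
hist = DeltaTable.forName(spark, "default.target").history(1)
hist.select("operation", "operationMetrics").show(truncate=False)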

Ta! Da!... we have built a simple & efficient solution for a small use case with a few lines of code. Please do let me know if there are better ways of doing this, but I felt like sharing it with others who might need it for such a use case.

Side Note: the same can be done for the join condition on the 'Source' & 'Target' business keys in the Merge() command. For this we need to define the business keys in a config table or file, and then build a condition like the following dynamically (see the sketch after it).

condition = "SOURCE.key = TARGET.key and SOURCE.id = TARGET.id"        
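
A minimal sketch of that idea, assuming a hypothetical business_keys list that would be read from the config table or file:

# Hypothetical config entry: the business keys for this table
business_keys = ["key", "id"]

# Build "SOURCE.key = TARGET.key and SOURCE.id = TARGET.id" dynamically
join_condition = " and ".join("SOURCE." + k + " = TARGET." + k for k in business_keys)
print(join_condition)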

Finally, I hope this will be helpful to someone!!!

A comment from one reader: "Your approach is very interesting; I've been working with a similar scenario. Something I've done differently is to persist the hash field in the target table and create an index for it; this way we get a gain in the seek for the comparison."
