Senior Data Engineer Interview: Right To Be Forgotten In Practice
Interviewer: Let's say you were responsible for designing the data warehouse and associated pipelines. How would you support the right to be forgotten while minimizing costs?
Candidate: The user goes to their account/settings page and clicks a "Delete All My Data" button. That button sends an API request to the backend, which executes a DELETE query against the OLTP database. The delete event is then forwarded to a message broker topic (e.g. Kafka), either by a system that monitors the database transaction log (e.g. Debezium) or directly from application code via a client-side SDK.
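As a sketch of the consumer side, a downstream job could recognize deletes by inspecting the Debezium change-event envelope: Debezium marks deletes with op set to "d", carrying the removed row in "before" and null in "after". The extract_deleted_user_id helper below is hypothetical, assuming the default Debezium JSON envelope:

```python
import json

def extract_deleted_user_id(event_json):
    """Return the user_id of a deleted row, or None for non-delete events."""
    event = json.loads(event_json)
    # The envelope may arrive wrapped in a "payload" field or already unwrapped.
    payload = event.get('payload', event)
    if payload.get('op') == 'd' and payload.get('before'):
        return payload['before'].get('user_id')
    return None
```

A streaming consumer would apply this to each record from the Kafka topic and collect the user IDs to purge from the lake.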
Let's suppose our data lake consists of cloud storage plus an open table format such as Delta or Iceberg. A streaming job (e.g. Spark) would subscribe to the topic and delete the matching records from the data lake using SQL.
For example:
DELETE FROM bronze WHERE user_id = 'xyz'
Interviewer: Let's suppose that you needed to delete something from a table being read by another job. How would you handle that case?
Candidate: If you modified the underlying files in the middle of someone else's job, that job would fail. However, open table formats like Delta provide snapshot isolation: readers continue to see a consistent snapshot of the table for the duration of their query, independent of any concurrent writes.
Interviewer: Chances are, if we're following anything like the Medallion architecture, there will be other tables derived from the raw tables. Therefore, we cannot simply delete the user's information from the raw table. We must delete it from all the derived tables as well. How would you accomplish this?
Candidate: We can achieve this by enabling Change Data Feed on our tables and then starting streaming jobs that listen for delete events.
For example:
deletesDF = (spark.readStream
    .format('delta')
    .option('readChangeFeed', 'true')
    .option('startingVersion', start_version)
    .table('bronze'))

def process_deletes(microBatchDF, batchId):
    # Keep only the delete events from the change feed for this micro-batch
    (microBatchDF
        .filter("_change_type = 'delete'")
        .createOrReplaceTempView('deletes'))
    # Run the MERGE in the micro-batch's own session so it can see the temp view
    microBatchDF.sparkSession.sql("""
        MERGE INTO silver u
        USING deletes d
        ON u.user_id = d.user_id
        WHEN MATCHED THEN DELETE
    """)

query = (deletesDF.writeStream
    .foreachBatch(process_deletes)
    .outputMode('update')
    .option('checkpointLocation', checkpoint_location)
    .start())

query.awaitTermination()
Interviewer: When you execute a delete operation using an open table format like Delta or Iceberg, it actually creates a new Parquet file and points to that instead. As a result, deleted values are still present in older versions of the data and can be viewed using the time travel feature.
For example:
We can run this query to retrieve the records that were deleted since version 0 of the silver table.
SELECT * FROM silver@v0 u1
EXCEPT
SELECT * FROM silver u2
How would you comply with right to be forgotten requests?
Candidate: We'd need to run the vacuum command to permanently delete the data from the table.
For example:
VACUUM silver RETAIN 0 HOURS
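Note that by default Delta refuses a retention interval shorter than the configured minimum (typically 7 days), so a zero-hour VACUUM first requires disabling the safety check. Assuming Spark SQL on Delta:

```sql
SET spark.databricks.delta.retentionDurationCheck.enabled = false;
VACUUM silver RETAIN 0 HOURS;
```

Doing so physically removes the old data files, so time travel to prior versions of the table is no longer possible.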
Interviewer: The problem with running the vacuum command like this is that we have limited our ability to retain the audit trail of the data transformations that were performed on our table. Is there a way we could achieve both simultaneously?
Candidate: We can make use of pseudonymization. That is, we encrypt the PII using a symmetric key. Then, when we receive a right-to-be-forgotten request, we simply delete the key that would be needed to decrypt the data (sometimes called crypto-shredding).
For example:
import base64
import boto3
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

def encrypt_col(col_val):
    kmsclient = boto3.client('kms',
                             aws_access_key_id=awsAccessKeyId,
                             aws_secret_access_key=awsSecretKey,
                             region_name=awsRegion)
    ciphertext = kmsclient.encrypt(KeyId=kmskeyid, Plaintext=str(col_val))
    binary_encrypted = ciphertext['CiphertextBlob']
    return base64.b64encode(binary_encrypted).decode()

encrypt_udf = udf(encrypt_col, StringType())

def encrypt_df_col(df, piicols):
    # Replace each PII column with its encrypted form, keeping the original name
    for col in df.columns:
        if col in piicols:
            df = (df.withColumn('encrypted_' + col, encrypt_udf(df[col]))
                    .drop(col)
                    .withColumnRenamed('encrypted_' + col, col))
    return df

silverEmployeeEncryptedDF = encrypt_df_col(employeeBronzeDF, get_piicol_list())
silverEmployeeEncryptedDF.write.format('delta').mode('overwrite').save('dbfs:/...')
def decrypt_col(col_val):
    kmsclient = boto3.client('kms',
                             aws_access_key_id=awsAccessKeyId,
                             aws_secret_access_key=awsSecretKey,
                             region_name=awsRegion)
    plaintext = kmsclient.decrypt(CiphertextBlob=base64.b64decode(col_val))
    return plaintext['Plaintext'].decode()

decrypt_udf = udf(decrypt_col, StringType())

def decrypt_df_col(df, piicols):
    # Decrypt each PII column in place
    for col in df.columns:
        if col in piicols:
            df = df.withColumn(col, decrypt_udf(df[col]))
    return df

silverEmployeeDecryptedDF = decrypt_df_col(silverEmployeeEncryptedDF, get_piicol_list())
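One caveat worth making explicit: forgetting a single user this way requires a key per user, whereas the UDFs above encrypt everything under one shared KMS key. Below is a minimal sketch of per-user crypto-shredding with an in-memory key store; the UserKeyStore class and the XOR keystream are toy stand-ins for a real key-management service and a real cipher such as KMS-backed AES:

```python
import base64
import hashlib
import os

class UserKeyStore:
    """Toy KMS stand-in: one random data key per user."""
    def __init__(self):
        self._keys = {}  # user_id -> 32-byte key

    def key_for(self, user_id):
        return self._keys.setdefault(user_id, os.urandom(32))

    def forget(self, user_id):
        # Right to be forgotten: destroy the key, not the data.
        self._keys.pop(user_id, None)

def _keystream(key, n):
    # Deterministic keystream derived from the key (illustration only,
    # NOT a production cipher).
    out = b''
    counter = 0
    while len(out) < n:
        out += hashlib.sha256(key + counter.to_bytes(8, 'big')).digest()
        counter += 1
    return out[:n]

def encrypt(store, user_id, plaintext):
    key = store.key_for(user_id)
    data = plaintext.encode()
    ct = bytes(a ^ b for a, b in zip(data, _keystream(key, len(data))))
    return base64.b64encode(ct).decode()

def decrypt(store, user_id, ciphertext):
    key = store._keys[user_id]  # raises KeyError once the key is shredded
    ct = base64.b64decode(ciphertext)
    pt = bytes(a ^ b for a, b in zip(ct, _keystream(key, len(ct))))
    return pt.decode()
```

Once store.forget(user_id) runs, the user's ciphertext is permanently unreadable in every table version and every old Parquet file, without rewriting any data, which is exactly the audit-trail-preserving property the interviewer asked for.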