Schema inference and evolution in Auto Loader
----------------------------------------------
JSON format support is available in Databricks Runtime 8.2 and above.
CSV format support is available in Databricks Runtime 8.3 and above.
Avro format support is available in Databricks Runtime 10.2 and above.
Auto Loader automatically:
- Detects new columns in the data
- Restarts the stream, so you don't have to manage the tracking and handling of schema changes yourself
How does the inference happen?
Schema inference discovery method:
- Auto Loader samples the first 50 GB or 1000 files, whichever limit is crossed first.
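If the defaults don't fit your data, the sample size can be tuned through Spark configuration; a minimal sketch (the values below are just examples, not recommendations):

# Tune how much data Auto Loader samples for schema inference
# (the defaults are 50 GB / 1000 files)
spark.conf.set("spark.databricks.cloudFiles.schemaInference.sampleSize.numBytes", "10gb")
spark.conf.set("spark.databricks.cloudFiles.schemaInference.sampleSize.numFiles", "500")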
To avoid incurring the inference cost at every stream start and to keep a stable schema across restarts:
Set the option cloudFiles.schemaLocation.
A hidden directory _schemas is created at this location to track schema changes to the input data over time.
Single source and single Auto Loader stream:
Provide the checkpoint location as cloudFiles.schemaLocation.
Otherwise, provide a unique directory for this option.
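A minimal sketch of a stream that reuses the checkpoint location as the schema location (all paths below are placeholders):

checkpoint_path = "/tmp/autoloader/_checkpoint"    # placeholder

(spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "json")
    # single source, single Auto Loader stream: reuse the checkpoint location
    .option("cloudFiles.schemaLocation", checkpoint_path)
    .load("/tmp/autoloader/input")                 # placeholder
    .writeStream
    .format("delta")
    .option("checkpointLocation", checkpoint_path)
    .start("/tmp/autoloader/output"))              # placeholder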
Auto Loader infers columns in text-based file formats like CSV and JSON as string columns. This avoids data type conflicts.
Want to retain the original Spark schema inference behavior? If yes, set the option cloudFiles.inferColumnTypes to true.
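For example, a short sketch that turns type inference back on (paths are placeholders):

df = (spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "csv")
    # infer ints, longs, dates, etc. instead of treating every column as a string
    .option("cloudFiles.inferColumnTypes", "true")
    .option("cloudFiles.schemaLocation", "/tmp/autoloader/_schema")  # placeholder
    .load("/tmp/autoloader/input"))                                  # placeholder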
Take care of case sensitivity in schema inference by using schema hints.
What are schema hints?
Spark infers nested JSON as structs and integers as longs, while Auto Loader infers them as strings. Schema hints let you enforce the types you expect on specific columns, for example:
.option("cloudFiles.schemaHints", "date DATE, merchant_info.establishmentDate DATE, reward_options MAP<STRING,STRING>, time TIMESTAMP")
Inferred schema
|-- date: string
|-- quantity: int
|-- merchant_info: struct
| |-- id: string
| |-- name: string
| |-- establishmentDate: string
|-- reward_options: struct
| |-- delivery_address: string
Auto Loader inference with the hints applied
|-- date: string -> date
|-- quantity: int
|-- merchant_info: struct
|  |-- id: string
|  |-- name: string
|  |-- establishmentDate: string -> date
|-- reward_options: struct -> map<string,string>
|-- time: timestamp (added by the hint; not present in the inferred schema)
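Putting the hints into a full reader, as a sketch (paths are placeholders):

df = (spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "json")
    .option("cloudFiles.schemaLocation", "/tmp/autoloader/_schema")    # placeholder
    # hints override the inferred type only for the listed columns
    .option("cloudFiles.schemaHints",
            "date DATE, merchant_info.establishmentDate DATE, "
            "reward_options MAP<STRING,STRING>, time TIMESTAMP")
    .load("/tmp/autoloader/input"))                                    # placeholder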
Schema Evolution:
If a column is not present at the start of the stream, you can also use schema hints to add that column to the inferred schema.
When new columns are detected, Auto Loader performs schema inference on the latest micro-batch of data and the schema location is updated with the latest schema.
New columns are merged to the end of the schema.
The data types of existing columns remain unchanged.
Run your Auto Loader stream within a Databricks job so that the stream restarts automatically after schema changes.
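Outside of a job, the same effect can be approximated by restarting the query in a loop when it fails; a rough sketch, not the only way to do it (paths are placeholders):

while True:
    query = (spark.readStream
        .format("cloudFiles")
        .option("cloudFiles.format", "json")
        .option("cloudFiles.schemaLocation", "/tmp/autoloader/_schema")    # placeholder
        .load("/tmp/autoloader/input")                                     # placeholder
        .writeStream
        .format("delta")
        .option("checkpointLocation", "/tmp/autoloader/_checkpoint")       # placeholder
        .start("/tmp/autoloader/output"))                                  # placeholder
    try:
        query.awaitTermination()
        break   # the query stopped normally
    except Exception:
        # e.g. UnknownFieldException from addNewColumns; on restart Auto Loader
        # picks up the evolved schema from the schema location and continues
        pass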
Auto Loader supports the following modes for schema evolution, which you set in the option cloudFiles.schemaEvolutionMode:
addNewColumns (the default mode when a schema is not provided):
The streaming job fails with an UnknownFieldException when new columns are detected.
New columns are added to the schema.
Data types do not evolve.
failOnNewColumns:
The stream fails on new columns; the schema needs to be updated for the job to run again.
rescue:
# drop the source file path from the rescued data column
spark.conf.set("spark.databricks.sql.rescuedDataColumn.filePath.enabled", "false")

df = (spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.rescuedDataColumn", "_rescued_data")
    .option("cloudFiles.format", <format>)
    .schema(<schema>)
    .load(<path>))
The stream does not fail on schema changes.
The stream keeps running with its initial schema (the schema inferred at the start).
Any new columns or data type changes are captured in the rescued data column, which is automatically added to your stream's schema as _rescued_data.
none:
The default mode when a schema is provided.
Does not evolve the schema; new columns are ignored.
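The mode is just another reader option; a sketch (paths are placeholders):

df = (spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "json")
    .option("cloudFiles.schemaLocation", "/tmp/autoloader/_schema")    # placeholder
    # one of: addNewColumns, failOnNewColumns, rescue, none
    .option("cloudFiles.schemaEvolutionMode", "rescue")
    .load("/tmp/autoloader/input"))                                    # placeholder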
Partition columns are not considered for schema evolution.
Case:
When cloudFiles.inferColumnTypes is false and cloudFiles.schemaEvolutionMode is addNewColumns:
The schema is still inferred (as strings).
The rescued data column captures only the columns that have changed.
Parsers support three modes when parsing records:
PERMISSIVE, DROPMALFORMED, and FAILFAST.
Corrupt records (incomplete or malformed JSON or CSV) are dropped or raise errors, depending on the mode.
A data type mismatch is still considered a legitimate record and therefore does not break the pipeline.
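For reference, the parser mode can be set explicitly on a plain Spark read; a minimal sketch (path is a placeholder):

# Fail fast on malformed records instead of keeping (PERMISSIVE) or dropping (DROPMALFORMED) them
df = (spark.read
    .format("json")
    .option("mode", "FAILFAST")
    .load("/tmp/raw/events"))     # placeholder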
Please feel free to add anything that I missed.