Spark Trigger Options
Objective
In this article, let’s look at the different trigger options available in Spark Structured Streaming. The trigger you set for a stream determines how quickly it reads the next set of records, so it lets you control both the latency and the throughput of the job.
Default Behavior
When you don’t set a trigger option on a write stream, Spark tries to process the next set of records as soon as it finishes the current micro-batch. Micro-batch is the default execution mode for write streams in Spark Structured Streaming: incoming records are grouped into small batches and processed periodically.
Bottom line: Spark will try to run the next micro-batch as soon as possible (ASAP).
The diagram below explains the sequence of a micro-batch.
- The driver updates the write-ahead log (WAL) with the unprocessed offsets.
- A micro-batch job is created to process the records.
- The input is read and processed.
- The output of the micro-batch is persisted in the target.
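The four steps above can be sketched as a toy model in plain Scala. It only illustrates the ordering and is not Spark's implementation: `OffsetRange`, `runOneBatch`, the in-memory `wal` and `target` buffers, and the upper-casing "processing" step are all hypothetical stand-ins.

```scala
import scala.collection.mutable.ArrayBuffer

object MicroBatchSequenceSketch {
  // Hypothetical stand-in for one WAL entry: which offsets a batch covers.
  final case class OffsetRange(batchId: Long, from: Int, until: Int)

  /** Runs one toy micro-batch and returns the offset to resume from next time. */
  def runOneBatch(
      batchId: Long,
      input: Vector[String],
      resumeFrom: Int,
      wal: ArrayBuffer[OffsetRange],
      target: ArrayBuffer[String]): Int = {
    // Step 1: log the unprocessed offsets before doing any work.
    val range = OffsetRange(batchId, resumeFrom, input.length)
    wal += range
    // Steps 2 and 3: the "job" reads exactly the logged range and processes it.
    val processed = input.slice(range.from, range.until).map(_.toUpperCase)
    // Step 4: persist the output in the target.
    target ++= processed
    range.until
  }

  def main(args: Array[String]): Unit = {
    val wal = ArrayBuffer.empty[OffsetRange]
    val target = ArrayBuffer.empty[String]
    val afterFirst = runOneBatch(0L, Vector("a", "b"), 0, wal, target)
    // Two more records have arrived; the second batch resumes where the first ended.
    runOneBatch(1L, Vector("a", "b", "c", "d"), afterFirst, wal, target)
    println(wal.mkString(", "))
    println(target.mkString(", "))
  }
}
```

Logging the offset range first means that, in this toy model, a batch interrupted halfway could be re-run over the same range instead of guessing where it began.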
The code below creates a write stream without any trigger. Refer to the complete sample job here.
// No .trigger(...) call, so the default micro-batch behavior applies.
val defaultStream = rateRawData.writeStream
  .format("console")
  .queryName("Default")
  .option("checkpointLocation", "sparkCheckPoint\\Rate2ConsoleDefaultTrigger\\cp1")
  .start()
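The snippets in this article write from `rateRawData`, whose definition is not shown here. A minimal sketch of how such an input could be created, assuming it is a streaming DataFrame from Spark's built-in rate source; the local master, the application name, and the rate of 1,000 rows per second are illustrative assumptions, not taken from the sample job.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

object RateSourceSketch {
  def main(args: Array[String]): Unit = {
    // Assumption: a local session; the app name is illustrative.
    val spark = SparkSession.builder()
      .appName("TriggerOptionsSketch")
      .master("local[*]")
      .getOrCreate()

    // Built-in rate source: generates rows with `timestamp` and `value` columns.
    val rateRawData: DataFrame = spark.readStream
      .format("rate")
      .option("rowsPerSecond", 1000L)
      .load()

    // Print the schema of the streaming DataFrame, then shut down.
    rateRawData.printSchema()
    spark.stop()
  }
}
```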
Below is a screenshot of the Spark UI processing 1,000 records per second without any trigger option set. As shown, Spark processes batches as soon as possible, on average about once every second.
Below is a screenshot of the Spark UI processing 90,000 records per second without any trigger option set. As you can see, micro-batches are triggered at uneven intervals.
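The batch cadence seen in the Spark UI can also be observed from code. The following is a sketch that assumes an existing `SparkSession` and uses Spark's `StreamingQueryListener`; the object name `BatchCadenceSketch` and the helper `logBatchCadence` are hypothetical names chosen for illustration.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener.{
  QueryProgressEvent, QueryStartedEvent, QueryTerminatedEvent
}

object BatchCadenceSketch {
  /** Registers a listener that prints one line per progress update of any query. */
  def logBatchCadence(spark: SparkSession): Unit =
    spark.streams.addListener(new StreamingQueryListener {
      override def onQueryStarted(event: QueryStartedEvent): Unit = ()
      override def onQueryTerminated(event: QueryTerminatedEvent): Unit = ()
      override def onQueryProgress(event: QueryProgressEvent): Unit = {
        val p = event.progress
        // `timestamp` is when the trigger began; comparing consecutive values
        // shows how evenly (or unevenly) the batches are spaced.
        println(
          s"query=${p.name} batch=${p.batchId} started=${p.timestamp} " +
            s"rows=${p.numInputRows} " +
            s"triggerExecutionMs=${p.durationMs.get("triggerExecution")}")
      }
    })
}
```

Calling `BatchCadenceSketch.logBatchCadence(spark)` before starting a write stream should print the start time and duration of each trigger to the driver's standard output.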
Once
When you set the trigger option to “Once”, the stream processes data only once and then terminates. The behavior of this option is very similar to a batch job: the stream is created once, all the pending records are processed, and then the stream is terminated.
The code below creates a write stream with the “Once” trigger. Refer to the complete sample job here.
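import org.apache.spark.sql.streaming.Trigger

// Process the pending records in a single run, then stop the query.
val onceStream = rateRawData.writeStream
  .format("console")
  .queryName("Once")
  .trigger(Trigger.Once())
  .option("checkpointLocation", "sparkCheckPoint\\Rate2ConsoleOnceTrigger\\cp1")
  .start()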
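The batch-like behavior can be seen by waiting for the query to end on its own. The following is a self-contained sketch; the local session, the rate-source input, and the checkpoint path are assumptions made for illustration and are not taken from the sample job.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.Trigger

object OnceTriggerSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("OnceTriggerSketch")
      .master("local[*]")
      .getOrCreate()

    // Assumption: input comes from the built-in rate source.
    val rateRawData = spark.readStream
      .format("rate")
      .option("rowsPerSecond", 1000L)
      .load()

    val onceStream = rateRawData.writeStream
      .format("console")
      .queryName("Once")
      .trigger(Trigger.Once())
      // Hypothetical checkpoint path for this sketch.
      .option("checkpointLocation", "sparkCheckPoint/OnceTriggerSketch/cp1")
      .start()

    // Blocks until the query terminates by itself; no explicit stop() is needed.
    onceStream.awaitTermination()
    println(s"still active: ${onceStream.isActive}")
    spark.stop()
  }
}
```

Because the checkpoint records what has been processed, running a job like this on a schedule should pick up only the records that arrived since the previous run.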
Processing time
This is the most widely used and recommended option in Spark Structured Streaming. The processing time trigger gives you better control over how often micro-batch jobs are triggered. For example, if you set a trigger interval of “20 seconds”, a micro-batch processes a batch of records every 20 seconds. Depending on how quickly you want your job to process records, you can fine-tune the trigger interval and, with it, the throughput of the job.
If the processing duration of a micro-batch exceeds the defined interval, the next micro-batch starts as soon as the current one finishes. For example, if the defined interval is “20 seconds” but processing takes “30 seconds”, the next batch starts immediately at the 30-second mark instead of waiting for another interval.
The code below creates a write stream with the “Processing time” trigger. Refer to the complete sample job here.
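The overrun rule can be made concrete with a small calculation. The following is a simplified model in plain Scala, not Spark's internal scheduler: it assumes batches are aligned to multiples of the interval, and `nextBoundary` and `nextStart` are hypothetical helper names.

```scala
object ProcessingTimeSketch {
  /** First interval boundary strictly after `startMs`. */
  def nextBoundary(startMs: Long, intervalMs: Long): Long =
    startMs / intervalMs * intervalMs + intervalMs

  /** When the following batch starts, given this batch's start time and duration. */
  def nextStart(startMs: Long, durationMs: Long, intervalMs: Long): Long =
    math.max(startMs + durationMs, nextBoundary(startMs, intervalMs))

  def main(args: Array[String]): Unit = {
    val interval = 20000L // 20 seconds, in milliseconds
    // Batch finishes early (5 s): the next one waits for the 20 s boundary.
    println(nextStart(0L, 5000L, interval)) // 20000
    // Batch overruns (30 s): the next one starts immediately at 30 s.
    println(nextStart(0L, 30000L, interval)) // 30000
    // After an overrun, a short batch waits for the next boundary at 40 s.
    println(nextStart(30000L, 5000L, interval)) // 40000
  }
}
```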
import org.apache.spark.sql.streaming.Trigger

// Start a micro-batch every 20 seconds.
val processingTimeStream = rateRawData.writeStream
  .format("console")
  .queryName("Micro Batch")
  .trigger(Trigger.ProcessingTime("20 seconds"))
  .option("checkpointLocation", "sparkCheckPoint\\Rate2ConsoleProgressTrigger\\cp1")
  .start()
Below is a screenshot of the Spark UI processing 90K records per second with a 20-second trigger. As you can see, Spark starts a micro-batch every 20 seconds.
Continuous
Continuous stream processing was introduced in Spark 2.3 as an experimental feature, and as of Spark 2.4.0 it is still experimental. It lets you process records in milliseconds where micro-batch processing would take seconds. With this trigger option, records are not processed in micro-batches; instead, a long-running task is created for each write stream and processes records as quickly as possible.
“Exactly once” semantics are not supported in continuous processing; only “at least once” semantics are supported. Offsets of the processed records are committed periodically and asynchronously, based on epoch markers added during processing. The interval between two consecutive epoch markers (that is, between two offset commits) is called an epoch. The diagram below represents the sequence of a continuous stream process.
- The driver creates a long-running task
- Input records are processed
- Processed records are stored in the target
- A task persists offsets asynchronously
- Offsets are committed to Write Ahead Log (WAL)
Bottom line: A long-running task, rather than a micro-batch, processes the data to achieve low latency. Epoch markers trigger asynchronous offset commits.
Sample code to create a write stream with the “Continuous” trigger. Refer to the complete sample job here.
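A minimal, self-contained sketch of such a write stream follows. The local session, the rate-source input at 1 row per second, the checkpoint path (which only follows the naming pattern of the earlier snippets), the 1-second checkpoint interval, and the 10-second run time are all assumptions made for illustration.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.Trigger

object ContinuousTriggerSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("ContinuousTriggerSketch")
      .master("local[*]")
      .getOrCreate()

    // Assumption: input comes from the built-in rate source.
    val rateRawData = spark.readStream
      .format("rate")
      .option("rowsPerSecond", 1L)
      .load()

    val continuousStream = rateRawData.writeStream
      .format("console")
      .queryName("Continuous")
      // The argument is the checkpoint interval, i.e. how often offsets are committed.
      .trigger(Trigger.Continuous("1 second"))
      // Hypothetical checkpoint path.
      .option("checkpointLocation", "sparkCheckPoint\\Rate2ConsoleContinuousTrigger\\cp1")
      .start()

    // Let the long-running tasks work for about 10 seconds, then shut down.
    continuousStream.awaitTermination(10000L)
    continuousStream.stop()
    spark.stop()
  }
}
```

The interval passed to `Trigger.Continuous` does not control how often records are processed, only how often progress is checkpointed, which corresponds to the epoch described above.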
Below is a screenshot of the Spark UI processing 1 record per second using a continuous trigger. As you can see, there is one long-running task instead of micro-batches.
Below is a screenshot of the Spark UI with multiple streams, each processing 1 record per second using a continuous trigger. As you can see, there are 2 long-running tasks, one for each stream, and again no micro-batches.
Comparison
The table below summarizes the differences between all the trigger options:
Summary
Choose the trigger option that matches your latency requirements. Use continuous streaming if you need millisecond-level latency; otherwise, prefer a custom processing time interval over the default trigger option. Refer to the code for all the trigger options here.