Spark Trigger Options

Objective

In this article, let's look at the different trigger options available in Spark Structured Streaming. The trigger you set for a stream decides how quickly the stream picks up the next set of records, so the trigger option lets you control the latency and throughput of the job.

Default Behavior

When you don't set any trigger option on a write stream, Spark tries to process the next set of records as soon as it is done with the current micro-batch. Micro-batch processing is the default behavior of write streams in Spark Structured Streaming: incoming records are grouped into small batches and processed periodically.

Bottom line: Spark will try to run the next micro-batch As Soon As Possible (ASAP).

The diagram below shows the sequence of a micro-batch:

[Diagram: micro-batch processing sequence]
  1. The driver updates the Write Ahead Log (WAL) with the unprocessed offsets.
  2. A micro-batch job is created to process the records.
  3. The input is read and processed.
  4. The output of the micro-batch is persisted to the target.
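
All of the snippets in this article write a rate-source stream named rateRawData to the console. A minimal sketch of that setup, assuming a local Spark session (the app name and rate are illustrative):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("SparkTriggerOptions")
  .master("local[*]")
  .getOrCreate()

// The rate source emits (timestamp, value) rows at a fixed rate.
val rateRawData = spark.readStream
  .format("rate")
  .option("rowsPerSecond", "1000")
  .load()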

Refer to the code below to create a write stream without any trigger. Refer to the complete sample job here.

// No trigger specified: the next micro-batch starts as soon as
// the current one completes.
val defaultStream = rateRawData.writeStream
  .format("console")
  .queryName("Default")
  .option("checkpointLocation", "sparkCheckPoint\\Rate2ConsoleDefaultTrigger\\cp1")
  .start()

Below is a screenshot of the Spark UI processing 1,000 records per second without any trigger option set. As shown, Spark triggers a new micro-batch roughly every second on average, i.e., as soon as the previous one finishes.

[Screenshot: Spark UI, 1,000 records/second, default trigger]

Below is a screenshot of the Spark UI processing 90,000 records per second without any trigger option set. As you can see, micro-batches are triggered at uneven intervals, since each batch starts as soon as the previous one completes.

[Screenshots: Spark UI, 90,000 records/second, default trigger]
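
If the Spark UI is not handy, the same timing information can be read programmatically from the query's progress reports. A minimal sketch, using the defaultStream query created above:

// Each StreamingQueryProgress entry records when a micro-batch was
// triggered, how many rows it read, and how long the trigger took.
defaultStream.recentProgress.foreach { p =>
  println(s"batch=${p.batchId} at=${p.timestamp} rows=${p.numInputRows} " +
    s"triggerMs=${p.durationMs.get("triggerExecution")}")
}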

Once

When you set the trigger option to "Once", the stream processes all pending records in a single batch and then terminates. The behavior is very similar to a batch job: the stream is created, everything available is processed once, and the stream stops.

Refer to the code below to create a write stream with the "Once" trigger. Refer to the complete sample job here.

import org.apache.spark.sql.streaming.Trigger

// Trigger.Once(): process everything available in one batch, then stop.
val onceStream = rateRawData.writeStream
  .format("console")
  .queryName("Once")
  .trigger(Trigger.Once())
  .option("checkpointLocation", "sparkCheckPoint\\Rate2ConsoleOnceTrigger\\cp1")
  .start()
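
Because the stream terminates on its own once the single batch completes, the job can simply block until it is done:

// Block until the one-off batch has finished and the stream has stopped.
onceStream.awaitTermination()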

Processing time

This is the most widely used and recommended option in Spark Structured Streaming. The processing-time trigger gives you control over how often micro-batch jobs are triggered. For example, if you set a trigger interval of "20 seconds", a micro-batch processes a batch of records every 20 seconds. Based on how fast you want the job to process records, you can fine-tune the trigger interval and, with it, the throughput of the job.

If a micro-batch takes longer than the defined interval, the next micro-batch starts as soon as the current one completes. For example, if the interval is "20 seconds" but processing takes 30 seconds, the next batch starts immediately rather than waiting for the next 20-second boundary.

Refer to the code below to create a write stream with the "Processing time" trigger. Refer to the complete sample job here.

import org.apache.spark.sql.streaming.Trigger

// Trigger.ProcessingTime("20 seconds"): start a micro-batch every 20 seconds.
val processingTimeStream = rateRawData.writeStream
  .format("console")
  .queryName("Micro Batch")
  .trigger(Trigger.ProcessingTime("20 seconds"))
  .option("checkpointLocation", "sparkCheckPoint\\Rate2ConsoleProgressTrigger\\cp1")
  .start()

Below is a screenshot of the Spark UI processing 90,000 records per second with a 20-second trigger. As you can see, Spark triggers a micro-batch every 20 seconds.

[Screenshots: Spark UI, 90,000 records/second, 20-second processing-time trigger]

Continuous

Continuous stream processing was introduced in Spark 2.3 as an experimental feature, and as of Spark 2.4.0 it is still experimental. It enables you to process records with millisecond latencies that would otherwise take seconds in micro-batch mode. With this trigger option records are not processed in micro-batches; instead, a long-running task is created per write stream and records are processed as soon as they arrive. Note that continuous processing supports only "at-least-once" semantics, not "exactly-once". Offsets of processed records are committed asynchronously, based on epoch markers injected into the stream during processing; the interval between two offset commits (epoch markers) is called an epoch. The diagram below shows the continuous processing sequence:

[Diagram: continuous processing sequence]
  1. The driver creates a long-running task.
  2. Input records are processed.
  3. Processed records are stored in the target.
  4. The task persists offsets asynchronously.
  5. Offsets are committed to the Write Ahead Log (WAL).

Bottom line: a long-running task processes the data instead of micro-batches, achieving low latency. Epoch markers trigger offset commits asynchronously.

Below is sample code to create a write stream with the "Continuous" trigger. Refer to the complete sample job here.
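
A minimal sketch, following the pattern of the earlier streams; the query name, 1-second epoch interval, and checkpoint path are illustrative:

import org.apache.spark.sql.streaming.Trigger

// Trigger.Continuous("1 second"): records are processed by a long-running
// task as they arrive; the argument is the epoch (offset commit) interval,
// not a batch interval.
val continuousStream = rateRawData.writeStream
  .format("console")
  .queryName("Continuous")
  .trigger(Trigger.Continuous("1 second"))
  .option("checkpointLocation", "sparkCheckPoint\\Rate2ConsoleContinuousTrigger\\cp1")
  .start()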

Below is a screenshot of the Spark UI processing 1 record per second using a continuous trigger. As you can see, there is one long-running task instead of micro-batches.

[Screenshots: Spark UI, continuous trigger, single stream]

Below is a screenshot of the Spark UI with multiple streams, each processing 1 record per second using a continuous trigger. As you can see, there are two long-running tasks, one per stream, and again no micro-batches.

[Screenshots: Spark UI, continuous trigger, two streams]

Comparison

The table below summarizes the differences between the trigger options:

  Trigger option    | Processing model    | Behavior
  Default (none)    | Micro-batch         | Next micro-batch starts as soon as the current one completes
  Once              | Single micro-batch  | Processes all pending records once, then terminates
  Processing time   | Micro-batch         | A micro-batch is triggered at the defined interval (ASAP if a batch overruns)
  Continuous        | Long-running task   | Millisecond latency; at-least-once semantics; experimental

Summary

Choose the trigger option that matches your latency requirements. Use continuous processing if you need millisecond-level latency; otherwise, set an explicit processing-time trigger rather than relying on the default. Refer to all the trigger-option code here.
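
For quick reference, here are the trigger calls from this article side by side (a sketch; the console sink falls back to a temporary checkpoint location when none is set):

import org.apache.spark.sql.streaming.Trigger

// Default: next micro-batch starts as soon as the current one completes.
rateRawData.writeStream.format("console").start()

// Once: a single micro-batch, then the stream terminates.
rateRawData.writeStream.format("console").trigger(Trigger.Once()).start()

// Processing time: a micro-batch every 20 seconds.
rateRawData.writeStream.format("console")
  .trigger(Trigger.ProcessingTime("20 seconds")).start()

// Continuous: one long-running task; offsets committed every 1-second epoch.
rateRawData.writeStream.format("console")
  .trigger(Trigger.Continuous("1 second")).start()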
