
WatermarkTracker

WatermarkTracker tracks the event-time watermark of a streaming query (across EventTimeWatermarkExec operators in a physical query plan) based on a given MultipleWatermarkPolicy.

WatermarkTracker is used in MicroBatchExecution.

Creating Instance

WatermarkTracker takes the following to be created:

  • MultipleWatermarkPolicy

WatermarkTracker is created (using apply) when MicroBatchExecution is requested to populate start offsets at start or restart (from a checkpoint).

MultipleWatermarkPolicy

WatermarkTracker is given a MultipleWatermarkPolicy when created, which can be one of the following:

  • MinWatermark (alias: min, the default)
  • MaxWatermark (alias: max)
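The two policies differ only in how the per-operator watermarks are reduced to one global value. The following Scala sketch models that, assuming each policy simply takes the minimum or the maximum of the per-operator watermarks (in milliseconds). This standalone trait and its chooseGlobalWatermark method are illustrative assumptions, not a copy of Spark's classes.

```scala
// Hypothetical, simplified model of the two policies (not Spark's own classes).
sealed trait MultipleWatermarkPolicy {
  // Reduce the watermarks of all EventTimeWatermarkExec operators to one global watermark.
  def chooseGlobalWatermark(operatorWatermarks: Seq[Long]): Long
}

// "min": the slowest input decides how far the global watermark moves.
case object MinWatermark extends MultipleWatermarkPolicy {
  def chooseGlobalWatermark(operatorWatermarks: Seq[Long]): Long = operatorWatermarks.min
}

// "max": the fastest input decides; rows from slower inputs may be dropped as late.
case object MaxWatermark extends MultipleWatermarkPolicy {
  def chooseGlobalWatermark(operatorWatermarks: Seq[Long]): Long = operatorWatermarks.max
}

val perOperator = Seq(10000L, 25000L, 17000L)
println(MinWatermark.chooseGlobalWatermark(perOperator)) // 10000
println(MaxWatermark.chooseGlobalWatermark(perOperator)) // 25000
```

With the default min policy, the global watermark advances only as fast as the slowest input; max lets the fastest input drive it, at the cost of treating more data from slower inputs as late.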

Creating WatermarkTracker

apply(
  conf: RuntimeConfig): WatermarkTracker

apply reads the global watermark policy from the spark.sql.streaming.multipleWatermarkPolicy configuration property (default: min) and creates a WatermarkTracker with it.

apply is used when MicroBatchExecution is requested to populate start offsets at start or restart (from a checkpoint).
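The policy lookup in apply can be sketched as follows. This is a minimal sketch, assuming a plain Map[String, String] in place of Spark's RuntimeConfig and case-insensitive matching of the two accepted names; resolvePolicy and its error message are hypothetical.

```scala
// Policy types redeclared so the snippet is self-contained (simplified, hypothetical).
sealed trait MultipleWatermarkPolicy
case object MinWatermark extends MultipleWatermarkPolicy
case object MaxWatermark extends MultipleWatermarkPolicy

val PolicyKey = "spark.sql.streaming.multipleWatermarkPolicy"

// Stand-in for apply: a Map replaces RuntimeConfig; "min" is the fallback default.
def resolvePolicy(conf: Map[String, String]): MultipleWatermarkPolicy =
  conf.getOrElse(PolicyKey, "min").toLowerCase match {
    case "min" => MinWatermark
    case "max" => MaxWatermark
    case other =>
      throw new IllegalArgumentException(s"Could not recognize watermark policy '$other'")
  }

println(resolvePolicy(Map.empty))               // MinWatermark
println(resolvePolicy(Map(PolicyKey -> "MAX"))) // MaxWatermark
```

In an actual Spark session, the policy is chosen by setting the property before the streaming query starts, for example spark.conf.set("spark.sql.streaming.multipleWatermarkPolicy", "max").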

Global Event-Time Watermark

globalWatermarkMs: Long

WatermarkTracker uses the globalWatermarkMs internal registry to keep track of the global event-time watermark (chosen using the MultipleWatermarkPolicy across all EventTimeWatermarkExec operators in a physical query plan).

Default: 0

globalWatermarkMs is used when WatermarkTracker is requested to updateWatermark.

The event-time watermark can be updated in setWatermark and updateWatermark.

The event-time watermark is used (through the currentWatermark method) when the MicroBatchExecution stream execution engine is requested to populateStartOffsets, constructNextBatch, and runBatch.
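The read and write paths around globalWatermarkMs can be pictured as a small class. This is a simplified stand-in: the class name GlobalWatermarkRegistry and the advance method are hypothetical, and the sketch assumes that the startup path overwrites the value while the per-batch path only ever moves it forward.

```scala
// Hypothetical, simplified stand-in for WatermarkTracker's global watermark registry.
class GlobalWatermarkRegistry {
  // Default is 0 ms until a watermark is restored or computed.
  private var globalWatermarkMs: Long = 0L

  // What the stream execution engine reads (exposed as currentWatermark).
  def currentWatermark: Long = synchronized { globalWatermarkMs }

  // Startup/restart path: overwrite unconditionally.
  def setWatermark(newWatermarkMs: Long): Unit = synchronized {
    globalWatermarkMs = newWatermarkMs
  }

  // Per-batch path (assumption): accept a newly chosen watermark only if it moves forward.
  def advance(chosenWatermarkMs: Long): Unit = synchronized {
    if (chosenWatermarkMs > globalWatermarkMs) globalWatermarkMs = chosenWatermarkMs
  }
}

val registry = new GlobalWatermarkRegistry
println(registry.currentWatermark) // 0
registry.advance(5000L)
registry.advance(3000L)            // ignored: would move the watermark backwards
println(registry.currentWatermark) // 5000

val restored = new GlobalWatermarkRegistry
restored.setWatermark(2000L)       // startup path, e.g. a value restored from a checkpoint
println(restored.currentWatermark) // 2000
```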

Updating Watermark (at Startup and Restart)

setWatermark(
  newWatermarkMs: Long): Unit

setWatermark sets the global event-time watermark to the given newWatermarkMs value.

setWatermark is used when MicroBatchExecution is requested to populate start offsets at start or restart (from a checkpoint).
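The role of setWatermark is easiest to see on restart: a freshly created tracker starts at 0 ms, so without seeding it the restarted query would lose the watermark it had already reached. A sketch, assuming the checkpoint records a watermark for every batch and the latest one is used; OffsetLogEntry, Tracker and restoreTracker are hypothetical names.

```scala
// Hypothetical record standing in for the metadata a checkpoint keeps per batch.
final case class OffsetLogEntry(batchId: Long, batchWatermarkMs: Long)

// Minimal tracker: only the pieces needed to show the restart path.
class Tracker {
  private var globalWatermarkMs: Long = 0L
  def currentWatermark: Long = globalWatermarkMs
  def setWatermark(newWatermarkMs: Long): Unit = {
    globalWatermarkMs = newWatermarkMs
  }
}

// On restart, a fresh tracker starts at 0 ms and is then seeded from the latest batch.
def restoreTracker(offsetLog: Seq[OffsetLogEntry]): Tracker = {
  val tracker = new Tracker
  offsetLog.sortBy(_.batchId).lastOption.foreach(e => tracker.setWatermark(e.batchWatermarkMs))
  tracker
}

val history = Seq(OffsetLogEntry(0L, 0L), OffsetLogEntry(1L, 4000L), OffsetLogEntry(2L, 9000L))
println(restoreTracker(history).currentWatermark)   // 9000
println(restoreTracker(Seq.empty).currentWatermark) // 0 (fresh start, no checkpoint)
```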

Updating Watermark (at Execution)

updateWatermark(
  executedPlan: SparkPlan): Unit

updateWatermark requests the given SparkPlan physical operator to collect all EventTimeWatermarkExec unary physical operators.

updateWatermark simply exits when no EventTimeWatermarkExec operators are found.

updateWatermark...FIXME

updateWatermark is used when MicroBatchExecution is requested to run a single streaming batch (when requested to run an activated streaming query).
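The per-batch update can be illustrated with a sketch. It assumes that each operator's watermark is its maximum observed event time minus its delay threshold, that neither per-operator nor global watermarks move backwards, and that an operator with no data yet counts as 0 ms. WatermarkOperatorStats and SketchTracker are hypothetical, the executed plan is reduced to a list of per-operator statistics, and the policy is passed as a plain function.

```scala
import scala.collection.mutable

// Hypothetical summary of one EventTimeWatermarkExec operator after a batch:
// the largest event time it saw (None if it saw no rows) and its delay threshold.
final case class WatermarkOperatorStats(maxEventTimeMs: Option[Long], delayMs: Long)

// The policy is a plain function (e.g. _.min or _.max) to keep the sketch short.
class SketchTracker(choose: Seq[Long] => Long) {
  private val operatorToWatermarkMap = mutable.HashMap.empty[Int, Long]
  private var globalWatermarkMs: Long = 0L

  def currentWatermark: Long = synchronized { globalWatermarkMs }

  def updateWatermark(watermarkOperators: Seq[WatermarkOperatorStats]): Unit = synchronized {
    // Nothing to do when the plan has no watermark operators.
    if (watermarkOperators.nonEmpty) {
      watermarkOperators.zipWithIndex.foreach {
        case (WatermarkOperatorStats(Some(maxEventTimeMs), delayMs), index) =>
          // Candidate watermark: newest event time minus the allowed lateness.
          val candidateMs = maxEventTimeMs - delayMs
          // Per-operator watermarks never move backwards.
          if (operatorToWatermarkMap.get(index).forall(candidateMs > _)) {
            operatorToWatermarkMap.put(index, candidateMs)
          }
        case (_, index) =>
          // No data yet for this operator: register it at 0 ms.
          if (!operatorToWatermarkMap.contains(index)) operatorToWatermarkMap.put(index, 0L)
      }
      // Apply the policy across operators; the global watermark also only moves forward.
      val chosenMs = choose(operatorToWatermarkMap.values.toSeq)
      if (chosenMs > globalWatermarkMs) globalWatermarkMs = chosenMs
    }
  }
}

val minTracker = new SketchTracker(_.min)
minTracker.updateWatermark(Seq(
  WatermarkOperatorStats(Some(20000L), 5000L), // operator 0 -> 15000
  WatermarkOperatorStats(None, 10000L)         // operator 1 -> 0 (no data yet)
))
println(minTracker.currentWatermark) // 0
minTracker.updateWatermark(Seq(
  WatermarkOperatorStats(Some(18000L), 5000L), // operator 0 stays at 15000 (13000 is older)
  WatermarkOperatorStats(Some(22000L), 10000L) // operator 1 -> 12000
))
println(minTracker.currentWatermark) // 12000
```

With the min policy, an input that has not produced any rows yet keeps the global watermark at 0 ms until it does.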

Watermarks by EventTimeWatermarkExec Operator Registry

operatorToWatermarkMap: Map[Int, Long]

WatermarkTracker uses the operatorToWatermarkMap internal registry to keep track of the event-time watermarks of every EventTimeWatermarkExec physical operator in a streaming query plan.

operatorToWatermarkMap is used when WatermarkTracker is requested to updateWatermark.
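The registry can also be pictured as an immutable map that is merged after every batch. This sketch assumes that keys are the positions of the EventTimeWatermarkExec operators in the order they are collected from the plan (which only works if that order is stable across batches) and that a missing value means the operator saw no rows; mergeOperatorWatermarks is a hypothetical helper.

```scala
// Hypothetical, immutable take on operatorToWatermarkMap:
// key = operator position in the plan, value = that operator's watermark in ms.
def mergeOperatorWatermarks(
    previous: Map[Int, Long],
    candidates: Seq[Option[Long]]): Map[Int, Long] =
  candidates.zipWithIndex.foldLeft(previous) {
    // New candidate for operator `index`: keep whichever of old and new is later.
    case (acc, (Some(candidateMs), index)) =>
      acc.updated(index, acc.get(index).fold(candidateMs)(math.max(_, candidateMs)))
    // No data for this operator in the batch: default to 0 ms the first time only.
    case (acc, (None, index)) =>
      if (acc.contains(index)) acc else acc.updated(index, 0L)
  }

val afterBatch0 = mergeOperatorWatermarks(Map.empty, Seq(Some(15000L), None))
val afterBatch1 = mergeOperatorWatermarks(afterBatch0, Seq(Some(13000L), Some(12000L)))
println(afterBatch0(1)) // 0: operator 1 has seen no data yet
println(afterBatch1(0)) // 15000: 13000 would have moved operator 0 backwards
println(afterBatch1(1)) // 12000
```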

Logging

Enable the ALL logging level for the org.apache.spark.sql.execution.streaming.WatermarkTracker logger to see what happens inside.

Add the following line to conf/log4j.properties:

log4j.logger.org.apache.spark.sql.execution.streaming.WatermarkTracker=ALL

Refer to Logging.


Last update: 2021-02-07