ProgressReporter

ProgressReporter is an abstraction of progress reporters that report execution statistics of a streaming query.

Contract

currentBatchId

currentBatchId: Long

ID of the streaming batch

Used when:

id

id: UUID

Universally unique identifier (UUID) of the streaming query (that remains unchanged between restarts)

lastExecution

lastExecution: QueryExecution

IncrementalExecution of the streaming execution round (a batch or an epoch)

IncrementalExecution is created and executed in the queryPlanning phase of MicroBatchExecution and ContinuousExecution stream execution engines.

logicalPlan

logicalPlan: LogicalPlan

Logical query plan of the streaming query

Important

The most interesting use of the LogicalPlan is when stream execution engines replace (transform) the input StreamingExecutionRelation and StreamingDataSourceV2Relation operators with operators that carry the data, or with a LocalRelation (to represent no data at a source).

Used when ProgressReporter is requested for the following:

name

name: String

Name of the streaming query

newData

newData: Map[SparkDataStream, LogicalPlan]

SparkDataStreams (from all data sources) with the most recent unprocessed input data (as LogicalPlan)

Used exclusively for MicroBatchExecution (when requested to run a single micro-batch)

Used when ProgressReporter is requested to extractSourceToNumInputRows

offsetSeqMetadata

offsetSeqMetadata: OffsetSeqMetadata

OffsetSeqMetadata (with the current micro-batch event-time watermark and timestamp)

postEvent

postEvent(
  event: StreamingQueryListener.Event): Unit

Posts StreamingQueryListener.Event

Used when:

runId

runId: UUID

Universally unique identifier (UUID) of a single run of the streaming query (that changes every restart)

sink

sink: Table

The one and only Table of the streaming query

sinkCommitProgress

sinkCommitProgress: Option[StreamWriterCommitProgress]

StreamWriterCommitProgress with number of output rows:

  • None when MicroBatchExecution stream execution engine is requested to populateStartOffsets

  • Assigned a StreamWriterCommitProgress when MicroBatchExecution stream execution engine is about to complete running a micro-batch

Used when ProgressReporter is requested to finish up a trigger (finishTrigger) and update the progress

sources

sources: Seq[SparkDataStream]

sparkSession

sparkSession: SparkSession

SparkSession of the streaming query

Tip

Find out more on SparkSession in The Internals of Spark SQL online book.

triggerClock

triggerClock: Clock

Clock of the streaming query

Implementations

spark.sql.streaming.noDataProgressEventInterval

ProgressReporter uses the spark.sql.streaming.noDataProgressEventInterval configuration property (default: 10000L, in milliseconds) to control how long to wait between two progress events when there is no data. The property is consulted when finishing a trigger.

Demo

import org.apache.spark.sql.streaming.Trigger
import scala.concurrent.duration._
val sampleQuery = spark
  .readStream
  .format("rate")
  .load
  .writeStream
  .format("console")
  .option("truncate", false)
  .trigger(Trigger.ProcessingTime(10.seconds))
  .start

// Using public API
import org.apache.spark.sql.streaming.SourceProgress
scala> sampleQuery.
     |   lastProgress.
     |   sources.
     |   map { case sp: SourceProgress =>
     |     s"source = ${sp.description} => endOffset = ${sp.endOffset}" }.
     |   foreach(println)
source = RateSource[rowsPerSecond=1, rampUpTimeSeconds=0, numPartitions=8] => endOffset = 663

scala> sampleQuery.lastProgress.sources(0)
res40: org.apache.spark.sql.streaming.SourceProgress =
{
  "description" : "RateSource[rowsPerSecond=1, rampUpTimeSeconds=0, numPartitions=8]",
  "startOffset" : 333,
  "endOffset" : 343,
  "numInputRows" : 10,
  "inputRowsPerSecond" : 0.9998000399920015,
  "processedRowsPerSecond" : 200.0
}

// With a hack
import org.apache.spark.sql.execution.streaming.StreamingQueryWrapper
val offsets = sampleQuery.
  asInstanceOf[StreamingQueryWrapper].
  streamingQuery.
  availableOffsets.
  map { case (source, offset) =>
    s"source = $source => offset = $offset" }
scala> offsets.foreach(println)
source = RateSource[rowsPerSecond=1, rampUpTimeSeconds=0, numPartitions=8] => offset = 293

StreamingQueryProgress Queue

progressBuffer: Queue[StreamingQueryProgress]

progressBuffer is a scala.collection.mutable.Queue of StreamingQueryProgresses.

progressBuffer has a new StreamingQueryProgress added when ProgressReporter is requested to update progress of a streaming query.

The oldest StreamingQueryProgress is removed (dequeued) once the number of entries reaches the spark.sql.streaming.numRecentProgressUpdates threshold.

progressBuffer is used when ProgressReporter is requested for the last and the recent StreamingQueryProgresses.

Current StreamingQueryStatus

status: StreamingQueryStatus

status is the current StreamingQueryStatus.

status is used when StreamingQueryWrapper is requested for the current status of a streaming query.

Updating Progress of Streaming Query

updateProgress(
  newProgress: StreamingQueryProgress): Unit

updateProgress records the input newProgress and posts a QueryProgressEvent event.

Figure: ProgressReporter's Reporting Query Progress

updateProgress adds the input newProgress to progressBuffer.

updateProgress removes the oldest elements from progressBuffer while their number reaches or exceeds the value of the spark.sql.streaming.numRecentProgressUpdates configuration property.

updateProgress posts a QueryProgressEvent (with the input newProgress).

updateProgress prints out the following INFO message to the logs:

Streaming query made progress: [newProgress]

updateProgress is used when ProgressReporter is requested to finish up a trigger.
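
The buffering behaviour described above can be sketched in plain Scala. This is a minimal model, not Spark's code: Progress and ProgressBufferSketch are hypothetical stand-ins, and the retention parameter plays the role of spark.sql.streaming.numRecentProgressUpdates.

```scala
import scala.collection.mutable

// Hypothetical stand-in for StreamingQueryProgress (only the batch ID is modelled).
final case class Progress(batchId: Long)

// `retention` plays the role of spark.sql.streaming.numRecentProgressUpdates.
final class ProgressBufferSketch(retention: Int) {
  // Assumption of this sketch: smaller values would leave nothing to report.
  require(retention >= 2, "retention must be at least 2")

  private val progressBuffer = mutable.Queue.empty[Progress]

  def updateProgress(newProgress: Progress): Unit = progressBuffer.synchronized {
    progressBuffer += newProgress
    // Dequeue the oldest entries while the size reaches or exceeds the threshold.
    while (progressBuffer.length >= retention) {
      progressBuffer.dequeue()
    }
  }

  // The recent progress updates, oldest first.
  def recentProgress: List[Progress] = progressBuffer.synchronized(progressBuffer.toList)

  // The last progress update (if any).
  def lastProgress: Option[Progress] = progressBuffer.synchronized(progressBuffer.lastOption)
}
```

Because the check uses "reaches or exceeds", the sketch keeps at most retention - 1 entries after every update. Posting the QueryProgressEvent and the INFO log message are left out.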

Initializing Query Progress for New Trigger

startTrigger(): Unit

startTrigger prints out the following DEBUG message to the logs:

Starting Trigger Calculation

startTrigger's Internal Registry Changes For New Trigger (registry: new value)

  • lastTriggerStartTimestamp: the value of currentTriggerStartTimestamp

  • currentTriggerStartTimestamp: requests the triggerClock for the current timestamp (in millis)

  • StreamingQueryStatus: enables (true) the isTriggerActive flag of the current status

  • currentTriggerStartOffsets: null

  • currentTriggerEndOffsets: null

  • currentDurationsMs: clears the currentDurationsMs registry

startTrigger is used when:

StreamExecution starts running batches (as part of TriggerExecutor executing a batch runner).
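
The registry changes at the start of a trigger can be modelled roughly as follows. This is a simplified sketch, not Spark's code: TriggerStateSketch, the clock function and the Map[String, String] offset type are hypothetical stand-ins, and the registry names are assumed to match the internal properties described on this page.

```scala
import scala.collection.mutable

// Hypothetical, simplified model of the internal registries of a progress reporter.
final class TriggerStateSketch(clock: () => Long) {
  var lastTriggerStartTimestamp: Long = -1L
  var currentTriggerStartTimestamp: Long = -1L
  var isTriggerActive: Boolean = false
  var currentTriggerStartOffsets: Map[String, String] = null
  var currentTriggerEndOffsets: Map[String, String] = null
  val currentDurationsMs = mutable.HashMap.empty[String, Long]

  def startTrigger(): Unit = {
    // The previous "current" start becomes the "last" start.
    lastTriggerStartTimestamp = currentTriggerStartTimestamp
    currentTriggerStartTimestamp = clock()
    isTriggerActive = true
    // Offsets and durations of the previous trigger are discarded.
    currentTriggerStartOffsets = null
    currentTriggerEndOffsets = null
    currentDurationsMs.clear()
  }
}
```

Passing the clock as a function keeps the sketch deterministic in tests, much like triggerClock can be swapped for a manual clock.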

Finishing Up Streaming Batch (Trigger)

finishTrigger(hasNewData: Boolean): Unit

finishTrigger sets currentTriggerEndTimestamp to the current time (using triggerClock).

finishTrigger generates execution statistics (extractExecutionStats).

finishTrigger calculates the processing time (in seconds) as the difference between the currentTriggerEndTimestamp and currentTriggerStartTimestamp timestamps.

finishTrigger calculates the input time (in seconds) as the difference between the start times of the current and the last triggers (currentTriggerStartTimestamp and lastTriggerStartTimestamp).

Figure: ProgressReporter's finishTrigger and Timestamps

finishTrigger prints out the following DEBUG message to the logs:

Execution stats: [executionStats]

finishTrigger creates a SourceProgress (aka source statistics) for every streaming source.

finishTrigger creates a SinkProgress (aka sink statistics) for the sink.

finishTrigger creates a StreamingQueryProgress.

If there was any data (using the input hasNewData flag), finishTrigger resets lastNoDataProgressEventTime (i.e. it becomes the minimum possible time) and updates the progress (updateProgress).

Otherwise, when no data was available, finishTrigger updates the progress only when spark.sql.streaming.noDataProgressEventInterval has passed since the last no-data progress event.

In the end, finishTrigger disables the isTriggerActive flag of the current StreamingQueryStatus (i.e. sets it to false).

NOTE: finishTrigger is used exclusively by MicroBatchExecution (at the end of a streaming batch).
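
The two time calculations and the decision whether to report progress can be sketched in plain Scala. FinishTriggerSketch and its method names are hypothetical, and treating a negative last start timestamp (the -1L default) as "no previous trigger" is an assumption of this sketch.

```scala
object FinishTriggerSketch {
  private val MillisPerSecond = 1000.0

  // Processing time: how long the current trigger ran.
  def processingTimeSec(triggerStartMs: Long, triggerEndMs: Long): Double =
    (triggerEndMs - triggerStartMs) / MillisPerSecond

  // Input time: time between the starts of the last and the current triggers.
  // Assumption: a negative last start means there was no previous trigger.
  def inputTimeSec(lastTriggerStartMs: Long, triggerStartMs: Long): Option[Double] =
    if (lastTriggerStartMs >= 0) Some((triggerStartMs - lastTriggerStartMs) / MillisPerSecond)
    else None

  // Progress is always reported when there was new data; otherwise only after
  // the no-data interval has passed since the last no-data progress event.
  def shouldReport(
      hasNewData: Boolean,
      nowMs: Long,
      lastNoDataEventMs: Long,
      noDataIntervalMs: Long): Boolean =
    // Subtracting the interval from `now` (rather than the last event time from `now`)
    // avoids Long overflow when the last event time is Long.MinValue.
    hasNewData || (nowMs - noDataIntervalMs >= lastNoDataEventMs)
}
```

With the last no-data event time reset to Long.MinValue, the first trigger without data is reported immediately; later ones are throttled by the interval.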

Time-Tracking Section (Recording Execution Time)

reportTimeTaken[T](
  triggerDetailKey: String)(
  body: => T): T

reportTimeTaken measures the time to execute body and records it in the currentDurationsMs internal registry under triggerDetailKey key. If the triggerDetailKey key was recorded already, the current execution time is added.

In the end, reportTimeTaken prints out the following DEBUG message to the logs and returns the result of executing body.

[triggerDetailKey] took [time] ms

reportTimeTaken is used when stream execution engines are requested to execute the following phases (that appear as triggerDetailKey in the DEBUG message in the logs):

  1. MicroBatchExecution

    1. triggerExecution
    2. getOffset
    3. setOffsetRange
    4. getEndOffset
    5. walCommit
    6. getBatch
    7. queryPlanning
    8. addBatch
  2. ContinuousExecution

    1. queryPlanning
    2. runContinuous
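
The time-tracking idea can be illustrated with a small self-contained sketch. TimeTrackerSketch and its injectable clock are hypothetical; the sketch prints the message, whereas reportTimeTaken logs it at DEBUG level.

```scala
import scala.collection.mutable

final class TimeTrackerSketch(clock: () => Long = () => System.currentTimeMillis()) {
  // Action names (triggerDetailKey) and their cumulative times in milliseconds.
  val currentDurationsMs = mutable.HashMap.empty[String, Long]

  def reportTimeTaken[T](triggerDetailKey: String)(body: => T): T = {
    val start = clock()
    val result = body // the by-name body is evaluated exactly once, here
    val elapsed = math.max(clock() - start, 0L)
    // Add to any time already recorded under the same key.
    val previous = currentDurationsMs.getOrElse(triggerDetailKey, 0L)
    currentDurationsMs.put(triggerDetailKey, previous + elapsed)
    println(s"$triggerDetailKey took $elapsed ms")
    result
  }
}
```

The curried signature with a by-name body allows call sites such as tracker.reportTimeTaken("addBatch") { ... }, which keeps the measured code inline with the phase name.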

Updating Status Message

updateStatusMessage(
  message: String): Unit

updateStatusMessage simply updates the message in the StreamingQueryStatus internal registry.

updateStatusMessage is used when:

Generating Execution Statistics

extractExecutionStats(
  hasNewData: Boolean): ExecutionStats

extractExecutionStats generates an ExecutionStats of the last execution (lastExecution) of the streaming query.

Internally, extractExecutionStats generates a watermark metric (using the event-time watermark of the offsetSeqMetadata) if there is an EventTimeWatermark unary logical operator in the logical plan of the streaming query.

extractExecutionStats extracts the state operator metrics (extractStateOperatorMetrics).

extractExecutionStats extracts the number of input rows per source (extractSourceToNumInputRows).

extractExecutionStats finds the EventTimeWatermarkExec unary physical operator (with non-zero EventTimeStats) and generates max, min, and avg statistics.

In the end, extractExecutionStats creates an ExecutionStats with the execution statistics.

If the input hasNewData flag is turned off (false), extractExecutionStats returns an ExecutionStats with no input rows and no event-time statistics (that require data to be processed to make any sense).

NOTE: extractExecutionStats is used exclusively when ProgressReporter is requested to finish up a streaming batch (finishTrigger).

Generating StateStoreWriter Metrics (StateOperatorProgress)

extractStateOperatorMetrics(
  hasNewData: Boolean): Seq[StateOperatorProgress]

extractStateOperatorMetrics requests the lastExecution for the optimized execution plan (executedPlan), finds all StateStoreWriter physical operators, and requests them for StateOperatorProgress.

extractStateOperatorMetrics clears (zeros) the numRowsUpdated metric when the given hasNewData flag is turned off (false).

extractStateOperatorMetrics returns an empty collection when the lastExecution is uninitialized (null).

extractStateOperatorMetrics is used when ProgressReporter is requested to generate execution statistics.
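
The three cases above can be modelled as a small pure function. OperatorProgress and StateMetricsSketch are hypothetical stand-ins (only two counters of a state operator are modelled), and an Option replaces the null check on the last execution.

```scala
// Hypothetical stand-in for StateOperatorProgress with two of its counters.
final case class OperatorProgress(numRowsTotal: Long, numRowsUpdated: Long)

object StateMetricsSketch {
  // `operators` is None when there has been no execution yet.
  def extract(
      operators: Option[Seq[OperatorProgress]],
      hasNewData: Boolean): Seq[OperatorProgress] =
    operators match {
      case None => Seq.empty                 // nothing executed yet
      case Some(ops) if hasNewData => ops     // report the metrics as they are
      case Some(ops) =>                       // no new data: zero the updated-rows metric
        ops.map(_.copy(numRowsUpdated = 0L))
    }
}
```

Zeroing only numRowsUpdated keeps the total state size visible while avoiding a report of updates that belong to an earlier batch.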

Recording Trigger Offsets (StreamProgress)

recordTriggerOffsets(
  from: StreamProgress,
  to: StreamProgress): Unit

recordTriggerOffsets simply sets (records) the currentTriggerStartOffsets and currentTriggerEndOffsets internal registries to the JSON representations of the from and to StreamProgresses.

recordTriggerOffsets is used by the MicroBatchExecution and ContinuousExecution stream execution engines.

Last StreamingQueryProgress

lastProgress: StreamingQueryProgress

The last StreamingQueryProgress

currentDurationsMs

scala.collection.mutable.HashMap of action names (aka triggerDetailKey) and their cumulative times (in milliseconds).

currentDurationsMs starts empty when ProgressReporter initializes the query progress for a new trigger (startTrigger). Entries are added or updated when reporting the execution time of an action (reportTimeTaken).

currentDurationsMs is available as durationMs of a streaming query.

scala> :type q
org.apache.spark.sql.streaming.StreamingQuery

scala> q.lastProgress.durationMs
res1: java.util.Map[String,Long] = {triggerExecution=60, queryPlanning=1, getBatch=5, getOffset=0, addBatch=30, walCommit=23}

scala> println(q.lastProgress)
{
  "id" : "03fc78fc-fe19-408c-a1ae-812d0e28fcee",
  "runId" : "8c247071-afba-40e5-aad2-0e6f45f22488",
  "name" : null,
  "timestamp" : "2017-08-14T20:30:00.004Z",
  "batchId" : 1,
  "numInputRows" : 432,
  "inputRowsPerSecond" : 0.9993568953312452,
  "processedRowsPerSecond" : 1380.1916932907347,
  "durationMs" : {
    "addBatch" : 237,
    "getBatch" : 26,
    "getOffset" : 0,
    "queryPlanning" : 1,
    "triggerExecution" : 313,
    "walCommit" : 45
  },
  "stateOperators" : [ ],
  "sources" : [ {
    "description" : "RateSource[rowsPerSecond=1, rampUpTimeSeconds=0, numPartitions=8]",
    "startOffset" : 0,
    "endOffset" : 432,
    "numInputRows" : 432,
    "inputRowsPerSecond" : 0.9993568953312452,
    "processedRowsPerSecond" : 1380.1916932907347
  } ],
  "sink" : {
    "description" : "ConsoleSink[numRows=20, truncate=true]"
  }
}

Internal Properties

currentTriggerEndTimestamp

Timestamp of when the current batch/trigger has ended

Default: -1L

currentTriggerStartOffsets

currentTriggerStartOffsets: Map[BaseStreamingSource, String]

Start offsets (in JSON format) per streaming source

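Used exclusively when finishing up a streaming batch (finishTrigger) to create a SourceProgress

Reset (null) when initializing query progress for a new trigger (startTrigger)

Initialized when recording trigger offsets (recordTriggerOffsets)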

currentTriggerStartTimestamp

Timestamp of when the current batch/trigger has started

Default: -1L

lastTriggerStartTimestamp

Timestamp of when the last batch/trigger started

Default: -1L

Logging

Configure logging of the concrete stream execution progress reporters to see what happens inside: