ProgressReporter¶
ProgressReporter is an abstraction of execution progress reporters that report statistics of execution of a streaming query.
Contract¶
currentBatchId¶
currentBatchId: Long
ID of the streaming batch
Used when:
- MicroBatchExecution is requested to plan a query for the batch (while running a batch)
- ContinuousExecution is requested to plan a query for the epoch (while running continuously)
- ProgressReporter is requested for a new StreamingQueryProgress (while finishing a trigger)
- other usage
id¶
id: UUID
Universally unique identifier (UUID) of the streaming query (that remains unchanged between restarts)
lastExecution¶
lastExecution: QueryExecution
IncrementalExecution of the streaming execution round (a batch or an epoch)
IncrementalExecution is created and executed in the queryPlanning phase of MicroBatchExecution and ContinuousExecution stream execution engines.
logicalPlan¶
logicalPlan: LogicalPlan
Logical query plan of the streaming query
Important
The most interesting usage of the LogicalPlan is when stream execution engines replace (transform) input StreamingExecutionRelation and StreamingDataSourceV2Relation operators with (operators with) data or LocalRelation (to represent no data at a source).
Used when ProgressReporter is requested for the following:
- extract statistics from the most recent query execution (to add watermark metric for streaming watermark)
- extractSourceToNumInputRows
name¶
name: String
Name of the streaming query
newData¶
newData: Map[SparkDataStream, LogicalPlan]
SparkDataStreams (from all data sources) with the most recent unprocessed input data (as LogicalPlan)
Used exclusively for MicroBatchExecution (when requested to run a single micro-batch)
Used when ProgressReporter is requested to extractSourceToNumInputRows
offsetSeqMetadata¶
offsetSeqMetadata: OffsetSeqMetadata
OffsetSeqMetadata (with the current micro-batch event-time watermark and timestamp)
postEvent¶
postEvent(
event: StreamingQueryListener.Event): Unit
Posts StreamingQueryListener.Event
Used when:
- ProgressReporter is requested to update progress (and posts a QueryProgressEvent)
- StreamExecution is requested to run stream processing (and posts a QueryStartedEvent at the beginning and a QueryTerminatedEvent after a query has been stopped)
runId¶
runId: UUID
Universally unique identifier (UUID) of a single run of the streaming query (that changes every restart)
sink¶
sink: Table
The one and only Table of the streaming query
sinkCommitProgress¶
sinkCommitProgress: Option[StreamWriterCommitProgress]
StreamWriterCommitProgress with number of output rows:
- None when MicroBatchExecution stream execution engine is requested to populateStartOffsets
- Assigned a StreamWriterCommitProgress when MicroBatchExecution stream execution engine is about to complete running a micro-batch
Used when ProgressReporter is requested to finish a trigger (finishTrigger) and update progress
sources¶
sources: Seq[SparkDataStream]
sparkSession¶
sparkSession: SparkSession
SparkSession of the streaming query
Tip
Find out more on SparkSession in The Internals of Spark SQL online book.
triggerClock¶
triggerClock: Clock
Clock of the streaming query
Implementations¶
spark.sql.streaming.noDataProgressEventInterval¶
When finishing a trigger, ProgressReporter uses the spark.sql.streaming.noDataProgressEventInterval configuration property to control how long to wait between two progress events when there is no data (default: 10000L milliseconds).
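The waiting behaviour can be modeled roughly as follows. This is a minimal sketch and not Spark's code: NoDataProgressThrottle, shouldReport and lastNoDataProgressEventTime are hypothetical names introduced for illustration, and timestamps are plain milliseconds supplied by the caller.

```scala
// Hypothetical model of throttling progress events while no data arrives.
final class NoDataProgressThrottle(noDataProgressEventIntervalMs: Long) {
  // Assumption: "no no-data event reported yet" is modeled as the smallest Long
  private var lastNoDataProgressEventTime: Long = Long.MinValue

  /** Decides whether a progress event should be reported at time `nowMs`. */
  def shouldReport(hasNewData: Boolean, nowMs: Long): Boolean =
    if (hasNewData) {
      // Data always produces a progress event and restarts the no-data timer
      lastNoDataProgressEventTime = Long.MinValue
      true
    } else if (nowMs - noDataProgressEventIntervalMs >= lastNoDataProgressEventTime) {
      // Enough time has passed since the last no-data event
      lastNoDataProgressEventTime = nowMs
      true
    } else {
      // Still within the interval: stay quiet
      false
    }
}
```

With an interval of 10000 ms, a run of empty triggers would produce at most one progress event every 10 seconds in this model, while any trigger with data is always reported.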
Demo¶
import org.apache.spark.sql.streaming.Trigger
import scala.concurrent.duration._
val sampleQuery = spark
.readStream
.format("rate")
.load
.writeStream
.format("console")
.option("truncate", false)
.trigger(Trigger.ProcessingTime(10.seconds))
.start
// Using public API
import org.apache.spark.sql.streaming.SourceProgress
scala> sampleQuery.
| lastProgress.
| sources.
| map { case sp: SourceProgress =>
| s"source = ${sp.description} => endOffset = ${sp.endOffset}" }.
| foreach(println)
source = RateSource[rowsPerSecond=1, rampUpTimeSeconds=0, numPartitions=8] => endOffset = 663
scala> sampleQuery.lastProgress.sources(0)
res40: org.apache.spark.sql.streaming.SourceProgress =
{
"description" : "RateSource[rowsPerSecond=1, rampUpTimeSeconds=0, numPartitions=8]",
"startOffset" : 333,
"endOffset" : 343,
"numInputRows" : 10,
"inputRowsPerSecond" : 0.9998000399920015,
"processedRowsPerSecond" : 200.0
}
// With a hack
import org.apache.spark.sql.execution.streaming.StreamingQueryWrapper
val offsets = sampleQuery.
asInstanceOf[StreamingQueryWrapper].
streamingQuery.
availableOffsets.
map { case (source, offset) =>
s"source = $source => offset = $offset" }
scala> offsets.foreach(println)
source = RateSource[rowsPerSecond=1, rampUpTimeSeconds=0, numPartitions=8] => offset = 293
StreamingQueryProgress Queue¶
progressBuffer: Queue[StreamingQueryProgress]
progressBuffer is a scala.collection.mutable.Queue of StreamingQueryProgresses.
progressBuffer has a new StreamingQueryProgress added when ProgressReporter is requested to update progress of a streaming query.
The oldest StreamingQueryProgress is removed (dequeued) when the number of elements reaches the spark.sql.streaming.numRecentProgressUpdates threshold.
progressBuffer is used when ProgressReporter is requested for the last and the recent StreamingQueryProgresses.
Current StreamingQueryStatus¶
status: StreamingQueryStatus
status is the current StreamingQueryStatus.
status is used when StreamingQueryWrapper is requested for the current status of a streaming query.
Updating Progress of Streaming Query¶
updateProgress(
newProgress: StreamingQueryProgress): Unit
updateProgress records the input newProgress and posts a QueryProgressEvent event.
updateProgress adds the input newProgress to progressBuffer.
updateProgress removes the oldest elements from progressBuffer while their number reaches or exceeds the value of the spark.sql.streaming.numRecentProgressUpdates configuration property.
updateProgress posts a QueryProgressEvent (with the input newProgress).
updateProgress prints out the following INFO message to the logs:
Streaming query made progress: [newProgress]
updateProgress is used when ProgressReporter is requested to finish up a trigger.
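The steps above can be sketched with a plain bounded queue. This is a simplified model under stated assumptions, not Spark's implementation: Progress is a hypothetical stand-in for StreamingQueryProgress, retention stands for the value of spark.sql.streaming.numRecentProgressUpdates, and posting an event is modeled as appending to an in-memory list.

```scala
import scala.collection.mutable

// Hypothetical stand-in for StreamingQueryProgress (only a batch ID here)
final case class Progress(batchId: Long)

// Simplified model of the progress bookkeeping described above
final class ProgressBufferSketch(retention: Int) {
  require(retention > 0, "retention must be positive")

  private val progressBuffer = mutable.Queue.empty[Progress]
  // Assumption: "posting a QueryProgressEvent" is modeled as recording it here
  private val posted = mutable.ListBuffer.empty[Progress]

  def updateProgress(newProgress: Progress): Unit = {
    progressBuffer.synchronized {
      progressBuffer += newProgress
      // Drop the oldest entries while the size reaches or exceeds the threshold
      while (progressBuffer.length >= retention) {
        progressBuffer.dequeue()
      }
    }
    posted += newProgress
    println(s"Streaming query made progress: $newProgress")
  }

  def recentProgress: Seq[Progress] = progressBuffer.synchronized(progressBuffer.toList)
  def lastProgress: Option[Progress] = progressBuffer.synchronized(progressBuffer.lastOption)
  def postedEvents: Seq[Progress] = posted.toList
}
```

Because entries are dropped as soon as the size reaches the threshold, this model keeps fewer entries than the threshold itself, while every update is still posted as an event.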
Initializing Query Progress for New Trigger¶
startTrigger(): Unit
startTrigger prints out the following DEBUG message to the logs:
Starting Trigger Calculation
startTrigger changes the following internal registries for a new trigger (among others):
- Enables (true) the isTriggerActive flag of the current StreamingQueryStatus
- Resets currentTriggerStartOffsets (among other registries) to null
startTrigger is used when:
- MicroBatchExecution stream execution engine is requested to run an activated streaming query (at the beginning of every trigger)
- ContinuousExecution stream execution engine is requested to run an activated streaming query (at the beginning of every trigger)
StreamExecution starts running batches (as part of TriggerExecutor executing a batch runner).
Finishing Up Streaming Batch (Trigger)¶
finishTrigger(hasNewData: Boolean): Unit
finishTrigger sets currentTriggerEndTimestamp to the current time (using triggerClock).
finishTrigger generates execution statistics (extractExecutionStats).
finishTrigger calculates the processing time (in seconds) as the difference between the end and start timestamps of the current trigger (currentTriggerEndTimestamp and currentTriggerStartTimestamp).
finishTrigger calculates the input time (in seconds) as the difference between the start times of the current and the last triggers (currentTriggerStartTimestamp and lastTriggerStartTimestamp).
Figure: ProgressReporter's finishTrigger and Timestamps
finishTrigger prints out the following DEBUG message to the logs:
Execution stats: [executionStats]
finishTrigger creates a SourceProgress for every streaming source.
finishTrigger creates a SinkProgress for the sink.
finishTrigger creates a StreamingQueryProgress.
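If there was any data (using the input hasNewData flag), finishTrigger resets the time of the last no-data progress event and updates progress of the streaming query.
Otherwise, when no data was available (using the input hasNewData flag), finishTrigger updates progress only when at least spark.sql.streaming.noDataProgressEventInterval has passed since the last no-data progress event.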
In the end, finishTrigger disables (false) the isTriggerActive flag of the current StreamingQueryStatus.
NOTE: finishTrigger is used exclusively by MicroBatchExecution.
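The timestamp arithmetic of finishTrigger can be illustrated with a small sketch. It assumes millisecond timestamps and treats a negative lastTriggerStartTimestamp (the documented default of -1L) as "no previous trigger"; returning an infinite input time in that case, and deriving rows-per-second rates by dividing a row count by these times, are assumptions of this sketch. TriggerTimingSketch, Timing and rowsPerSecond are hypothetical names.

```scala
object TriggerTimingSketch {
  final case class Timing(processingTimeSec: Double, inputTimeSec: Double)

  def timing(
      lastTriggerStartTimestamp: Long,
      currentTriggerStartTimestamp: Long,
      currentTriggerEndTimestamp: Long): Timing = {
    // Processing time: how long the current trigger took
    val processingTimeSec =
      (currentTriggerEndTimestamp - currentTriggerStartTimestamp) / 1000.0
    // Input time: how far apart the starts of the last and current triggers are
    val inputTimeSec =
      if (lastTriggerStartTimestamp >= 0) {
        (currentTriggerStartTimestamp - lastTriggerStartTimestamp) / 1000.0
      } else {
        Double.PositiveInfinity // assumption: no previous trigger to compare with
      }
    Timing(processingTimeSec, inputTimeSec)
  }

  // Assumption: a rate is a row count divided by a time span in seconds
  def rowsPerSecond(numRows: Long, seconds: Double): Double = numRows / seconds
}
```

For example, 10 input rows with triggers starting about 10 seconds apart and a trigger that takes 50 ms would give an input rate close to 1 row per second and a processing rate of about 200 rows per second, which is in line with the numbers shown in the demo.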
Time-Tracking Section (Recording Execution Time)¶
reportTimeTaken[T](
triggerDetailKey: String)(
body: => T): T
reportTimeTaken measures the time to execute body and records it in the currentDurationsMs internal registry under the triggerDetailKey key. If the triggerDetailKey key was recorded already, the current execution time is added.
In the end, reportTimeTaken prints out the following DEBUG message to the logs and returns the result of executing body.
[triggerDetailKey] took [time] ms
reportTimeTaken is used when the following stream execution engines execute their phases (that appear as triggerDetailKey in the DEBUG message in the logs):
- MicroBatchExecution
- ContinuousExecution
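A time-tracking section of this kind can be sketched with a by-name parameter. The sketch below is not Spark's code: TimeTracker is a hypothetical class, and the injectable clock function (defaulting to System.currentTimeMillis) is an assumption that stands in for triggerClock and makes the behaviour easy to test.

```scala
import scala.collection.mutable

// Hypothetical holder of the currentDurationsMs registry
final class TimeTracker(clock: () => Long = () => System.currentTimeMillis()) {
  val currentDurationsMs = mutable.HashMap.empty[String, Long]

  def reportTimeTaken[T](triggerDetailKey: String)(body: => T): T = {
    val startTime = clock()
    val result = body // the by-name body is executed exactly once, here
    val timeTaken = math.max(clock() - startTime, 0L)
    // Add to any time already recorded under the same key
    val previous = currentDurationsMs.getOrElse(triggerDetailKey, 0L)
    currentDurationsMs.put(triggerDetailKey, previous + timeTaken)
    println(s"$triggerDetailKey took $timeTaken ms")
    result
  }
}
```

A phase is then wrapped as `tracker.reportTimeTaken("addBatch") { ... }`; wrapping two blocks under the same key accumulates their times in a single entry.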
Updating Status Message¶
updateStatusMessage(
message: String): Unit
updateStatusMessage simply updates the message in the StreamingQueryStatus internal registry.
updateStatusMessage is used when:
- StreamExecution is requested to run stream processing
- MicroBatchExecution is requested to run an activated streaming query or construct the next streaming micro-batch
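Updating only the message while leaving the other status flags untouched can be sketched with a case-class copy. StatusSketch is a hypothetical stand-in for StreamingQueryStatus (with the same three fields), StatusRegistrySketch is a made-up holder, and the initial message is an assumption of this sketch.

```scala
// Hypothetical stand-in for StreamingQueryStatus
final case class StatusSketch(
    message: String,
    isDataAvailable: Boolean,
    isTriggerActive: Boolean)

final class StatusRegistrySketch {
  // Assumption: the initial message is made up for this sketch
  @volatile private var currentStatus =
    StatusSketch(message = "Initializing", isDataAvailable = false, isTriggerActive = false)

  def status: StatusSketch = currentStatus

  // Replaces the message only; both flags keep their current values
  def updateStatusMessage(message: String): Unit =
    currentStatus = currentStatus.copy(message = message)
}
```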
Generating Execution Statistics¶
extractExecutionStats(
hasNewData: Boolean): ExecutionStats
extractExecutionStats generates an ExecutionStats of the most recent query execution.
Internally, extractExecutionStats generates the watermark metric (using the event-time watermark of the offsetSeqMetadata).
extractExecutionStats executes extractStateOperatorMetrics.
extractExecutionStats executes extractSourceToNumInputRows.
extractExecutionStats finds the EventTimeWatermarkExec unary physical operator (with non-zero EventTimeStats) and generates max, min, and avg statistics.
In the end, extractExecutionStats creates an ExecutionStats with the execution statistics.
If the input hasNewData flag is turned off (false), extractExecutionStats returns an ExecutionStats with no input rows and no event-time statistics (both require processed data to be meaningful).
NOTE: extractExecutionStats is used exclusively when ProgressReporter is requested to finish up a trigger.
Generating StateStoreWriter Metrics (StateOperatorProgress)¶
extractStateOperatorMetrics(
hasNewData: Boolean): Seq[StateOperatorProgress]
extractStateOperatorMetrics requests the lastExecution for the physical plan (executedPlan) and finds all StateStoreWriter physical operators and requests them for StateOperatorProgress.
extractStateOperatorMetrics clears (zeros) the numRowsUpdated metric when the given hasNewData is turned off (false).
extractStateOperatorMetrics returns an empty collection when the lastExecution is uninitialized (null).
extractStateOperatorMetrics is used when ProgressReporter is requested to generate execution statistics.
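The three cases above (no last execution, no new data, new data) can be modeled as follows. This is only an illustrative sketch: StateOpProgressSketch is a hypothetical stand-in for StateOperatorProgress with two metrics, and an Option replaces the nullable lastExecution together with the lookup of StateStoreWriter operators in the physical plan.

```scala
// Hypothetical stand-in for StateOperatorProgress (two metrics only)
final case class StateOpProgressSketch(numRowsTotal: Long, numRowsUpdated: Long)

object StateOperatorMetricsSketch {
  /**
   * `lastExecutionOps` models the progress of the StateStoreWriter operators
   * of the last execution; None stands for a lastExecution that is still null.
   */
  def extractStateOperatorMetrics(
      lastExecutionOps: Option[Seq[StateOpProgressSketch]],
      hasNewData: Boolean): Seq[StateOpProgressSketch] =
    lastExecutionOps match {
      case None => Seq.empty
      case Some(ops) =>
        ops.map { op =>
          // Without new data nothing was updated in this trigger
          if (hasNewData) op else op.copy(numRowsUpdated = 0L)
        }
    }
}
```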
Recording Trigger Offsets (StreamProgress)¶
recordTriggerOffsets(
from: StreamProgress,
to: StreamProgress): Unit
recordTriggerOffsets simply records the start and end offsets of the current trigger from the input from and to StreamProgresses.
recordTriggerOffsets is used by:
- MicroBatchExecution
- ContinuousExecution
Last StreamingQueryProgress¶
lastProgress: StreamingQueryProgress
The last StreamingQueryProgress
currentDurationsMs¶
currentDurationsMs is a scala.collection.mutable.HashMap of action names (aka triggerDetailKey) and their cumulative times (in milliseconds).
Starts empty when ProgressReporter sets the state for a new batch, with new entries added or updated when reporting execution time (of an action).
currentDurationsMs is available as durationMs of the StreamingQueryProgress of a streaming query.
scala> :type q
org.apache.spark.sql.streaming.StreamingQuery
scala> q.lastProgress.durationMs
res1: java.util.Map[String,Long] = {triggerExecution=60, queryPlanning=1, getBatch=5, getOffset=0, addBatch=30, walCommit=23}
scala> println(q.lastProgress)
{
"id" : "03fc78fc-fe19-408c-a1ae-812d0e28fcee",
"runId" : "8c247071-afba-40e5-aad2-0e6f45f22488",
"name" : null,
"timestamp" : "2017-08-14T20:30:00.004Z",
"batchId" : 1,
"numInputRows" : 432,
"inputRowsPerSecond" : 0.9993568953312452,
"processedRowsPerSecond" : 1380.1916932907347,
"durationMs" : {
"addBatch" : 237,
"getBatch" : 26,
"getOffset" : 0,
"queryPlanning" : 1,
"triggerExecution" : 313,
"walCommit" : 45
},
"stateOperators" : [ ],
"sources" : [ {
"description" : "RateSource[rowsPerSecond=1, rampUpTimeSeconds=0, numPartitions=8]",
"startOffset" : 0,
"endOffset" : 432,
"numInputRows" : 432,
"inputRowsPerSecond" : 0.9993568953312452,
"processedRowsPerSecond" : 1380.1916932907347
} ],
"sink" : {
"description" : "ConsoleSink[numRows=20, truncate=true]"
}
}
Internal Properties¶
currentTriggerEndTimestamp¶
Timestamp of when the current batch/trigger has ended
Default: -1L
currentTriggerStartOffsets¶
currentTriggerStartOffsets: Map[BaseStreamingSource, String]
Start offsets (in JSON format) per streaming source
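Used when ProgressReporter is requested to finish up a trigger
Reset (null) when ProgressReporter is requested to start a trigger
Initialized when ProgressReporter is requested to record trigger offsets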
currentTriggerStartTimestamp¶
Timestamp of when the current batch/trigger has started
Default: -1L
lastTriggerStartTimestamp¶
Timestamp of when the last batch/trigger started
Default: -1L
Logging¶
Configure logging of the concrete stream execution progress reporters to see what happens inside: