MicroBatchExecution¶
MicroBatchExecution is the stream execution engine in Micro-Batch Stream Processing.

MicroBatchExecution is created when StreamingQueryManager is requested to create a streaming query (when DataStreamWriter is requested to start an execution of the streaming query) with the following:
- Any type of streaming sink
- Any type of trigger but ContinuousTrigger
import org.apache.spark.sql.streaming.Trigger
val query = spark
.readStream
.format("rate")
.load
.writeStream
.format("console") // <-- not a StreamWriteSupport sink
.option("truncate", false)
.trigger(Trigger.Once) // <-- Gives MicroBatchExecution
.queryName("rate2console")
.start
// The following gives access to the internals
// And to MicroBatchExecution
import org.apache.spark.sql.execution.streaming.StreamingQueryWrapper
val engine = query.asInstanceOf[StreamingQueryWrapper].streamingQuery
import org.apache.spark.sql.execution.streaming.StreamExecution
assert(engine.isInstanceOf[StreamExecution])
import org.apache.spark.sql.execution.streaming.MicroBatchExecution
val microBatchEngine = engine.asInstanceOf[MicroBatchExecution]
assert(microBatchEngine.trigger == Trigger.Once)
Once created, MicroBatchExecution (as a stream execution engine) is requested to run an activated streaming query.
Tip
Enable ALL logging level for the org.apache.spark.sql.execution.streaming.MicroBatchExecution logger to see what happens inside.

Add the following line to conf/log4j.properties:

log4j.logger.org.apache.spark.sql.execution.streaming.MicroBatchExecution=ALL

Refer to Logging.
Creating Instance¶
MicroBatchExecution takes the following to be created:

- SparkSession
- Name of the streaming query
- Path of the checkpoint directory
- Analyzed logical query plan of the streaming query (LogicalPlan)
- Trigger
- Trigger clock (Clock)
- OutputMode
- Extra options (Map[String, String])
- deleteCheckpointOnStop flag to control whether to delete the checkpoint directory on stop
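The parameters above can be summarized as a constructor-like signature. The following is a simplified, self-contained sketch (the case-class name and the Any placeholder for the Spark-internal Clock type are assumptions for illustration, not the actual Spark source):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.streaming.{OutputMode, Trigger}

// A simplified stand-in for the creation parameters listed above.
// triggerClock is typed as Any because org.apache.spark.util.Clock is Spark-internal.
final case class MicroBatchExecutionParams(
  sparkSession: SparkSession,
  name: String,
  checkpointRoot: String,
  analyzedPlan: LogicalPlan,       // analyzed logical query plan of the streaming query
  trigger: Trigger,
  triggerClock: Any,               // Clock
  outputMode: OutputMode,
  extraOptions: Map[String, String],
  deleteCheckpointOnStop: Boolean  // whether to delete the checkpoint directory on stop
)
```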
MicroBatchExecution initializes the internal properties.
MicroBatchExecution and TriggerExecutor (triggerExecutor Property)¶

triggerExecutor: TriggerExecutor
triggerExecutor is the TriggerExecutor of the streaming query that controls how (and how often) micro-batches are executed.

triggerExecutor is initialized based on the given Trigger (that was used to create the MicroBatchExecution):

- ProcessingTimeExecutor for ProcessingTime (Trigger.ProcessingTime)
- OneTimeExecutor for OneTimeTrigger (aka Trigger.Once trigger)

triggerExecutor throws an IllegalStateException when the Trigger is none of the above:

Unknown type of trigger: [trigger]
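The mapping above can be sketched as a simple pattern match. The following is a minimal, self-contained illustration with simplified stand-in types (not the Spark-internal Trigger and TriggerExecutor classes):

```scala
// Simplified stand-ins for the triggers and executors discussed above.
sealed trait SimpleTrigger
case object OnceTrigger extends SimpleTrigger                             // Trigger.Once
final case class ProcessingTimeTrigger(intervalMs: Long) extends SimpleTrigger
final case class ContinuousTrigger(intervalMs: Long) extends SimpleTrigger

sealed trait SimpleTriggerExecutor
case object OneTimeExecutor extends SimpleTriggerExecutor
final case class ProcessingTimeExecutor(intervalMs: Long) extends SimpleTriggerExecutor

def triggerExecutorFor(trigger: SimpleTrigger): SimpleTriggerExecutor = trigger match {
  case OnceTrigger               => OneTimeExecutor
  case ProcessingTimeTrigger(ms) => ProcessingTimeExecutor(ms)
  case other =>
    // any other trigger (e.g. ContinuousTrigger) is not supported by this engine
    throw new IllegalStateException(s"Unknown type of trigger: $other")
}
```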
Note
triggerExecutor is used exclusively when MicroBatchExecution is requested to run an activated streaming query.
Running Activated Streaming Query¶
runActivatedStream(
sparkSessionForStream: SparkSession): Unit
runActivatedStream
simply requests the TriggerExecutor to execute micro-batches using the batch runner (until MicroBatchExecution
is terminated due to a query stop or a failure).
runActivatedStream
is part of StreamExecution abstraction.
TriggerExecutor's Batch Runner¶
The batch runner (of the TriggerExecutor) is executed as long as the MicroBatchExecution
is active.
Note
trigger and batch are considered equivalent and used interchangeably.
The batch runner initializes query progress for the new trigger (aka startTrigger).

The batch runner starts the triggerExecution execution phase that is made up of the following steps:
- Populating start offsets from checkpoint before the first "zero" batch (at every start or restart)

At the start or restart (resume) of a streaming query (when the current batch ID is -1), the batch runner populates the start offsets and prints out the following INFO message to the logs (with the committed offsets):

Stream started from [committedOffsets]

The batch runner sets the human-readable description of any Spark job submitted (that streaming sources may submit to get new data) to the batch description.
The batch runner constructs the next streaming micro-batch (when the isCurrentBatchConstructed internal flag is off).

The batch runner records the trigger offsets (the committed and available offsets).

The batch runner updates the current StreamingQueryStatus with the isNewDataAvailable flag (as the isDataAvailable property).
With the isCurrentBatchConstructed flag enabled (true), the batch runner updates the status message to one of the following (per the isNewDataAvailable flag) and runs the streaming micro-batch:

Processing new data

No new data but cleaning up state

With the isCurrentBatchConstructed flag disabled (false), the batch runner simply updates the status message to the following:

Waiting for data to arrive
The batch runner finalizes query progress for the trigger (with a flag that indicates whether the current batch had new data).

With the isCurrentBatchConstructed flag enabled (true), the batch runner increments the currentBatchId and turns the isCurrentBatchConstructed flag off (false).

With the isCurrentBatchConstructed flag disabled (false), the batch runner simply sleeps (as long as configured using the spark.sql.streaming.pollingDelay configuration property).
In the end, the batch runner updates the status message to the following and returns whether the MicroBatchExecution is still active:

Waiting for next trigger
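Putting the steps above together, the batch runner can be thought of as the following loop. This is a highly simplified, self-contained sketch (the function parameters are assumptions standing in for internal state and methods, not the actual MicroBatchExecution code):

```scala
// A condensed view of the batch runner: construct a batch if needed,
// run it when there is something to do, otherwise wait for data.
def batchRunnerSketch(
    isActive: () => Boolean,
    isCurrentBatchConstructed: () => Boolean,
    constructNextBatch: Boolean => Boolean,  // noDataBatchesEnabled => constructed?
    runBatch: () => Unit,
    pollingDelayMs: Long): Unit = {
  while (isActive()) {
    // startTrigger: initialize query progress for the new trigger
    var constructed = isCurrentBatchConstructed()
    if (!constructed) {
      constructed = constructNextBatch(true)   // decide whether there is a batch to run
    }
    if (constructed) {
      // status: "Processing new data" (or "No new data but cleaning up state")
      runBatch()
    } else {
      // status: "Waiting for data to arrive"
      Thread.sleep(pollingDelayMs)             // spark.sql.streaming.pollingDelay
    }
    // finishTrigger, then status: "Waiting for next trigger"
  }
}
```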
Populating Start Offsets From Checkpoint (Resuming from Checkpoint)¶
populateStartOffsets(
sparkSessionToRunBatches: SparkSession): Unit
populateStartOffsets
requests the Offset Write-Ahead Log for the latest committed batch id with metadata (i.e. OffsetSeq).
Note
The batch id may not be available in the write-ahead log when a streaming query is started with a new log or no batch has been persisted (added) to the log yet.
populateStartOffsets
branches off based on whether the latest committed batch was available or not.
populateStartOffsets
is used when MicroBatchExecution
is requested to run an activated streaming query (before the first "zero" micro-batch).
Latest Committed Batch Available¶
When the latest committed batch id with the metadata was available in the Offset Write-Ahead Log, populateStartOffsets
(re)initializes the internal state as follows:
- Sets the current batch ID to the latest committed batch ID found
- Turns the isCurrentBatchConstructed internal flag on (true)
- Sets the available offsets to the offsets (from the metadata)
When the latest batch ID found is greater than 0
, populateStartOffsets
requests the Offset Write-Ahead Log for the second latest batch ID with metadata or throws an IllegalStateException
if not found.
batch [latestBatchId - 1] doesn't exist
populateStartOffsets
sets the committed offsets to the second latest committed offsets.
populateStartOffsets updates the offset metadata.
CAUTION: FIXME Describe me
populateStartOffsets
requests the Offset Commit Log for the latest committed batch id with metadata.
CAUTION: FIXME Describe me
When the latest committed batch id with metadata was found and it is exactly the latest batch ID (found in the Offset Commit Log), populateStartOffsets
...FIXME
When the latest committed batch id with metadata was found, but it is not exactly the second latest batch ID (found in the Offset Commit Log), populateStartOffsets
prints out the following WARN message to the logs:
Batch completion log latest batch id is [latestCommittedBatchId], which is not trailing batchid [latestBatchId] by one
When there is no commit log present in the Offset Commit Log, populateStartOffsets
prints out the following INFO message to the logs:
no commit log present
In the end, populateStartOffsets
prints out the following DEBUG message to the logs:
Resuming at batch [currentBatchId] with committed offsets [committedOffsets] and available offsets [availableOffsets]
No Latest Committed Batch¶
When the latest committed batch id with the metadata could not be found in the Offset Write-Ahead Log, it is assumed that the streaming query is started for the very first time (or the checkpoint location has changed).
populateStartOffsets
prints out the following INFO message to the logs:
Starting new streaming query.
populateStartOffsets sets the current batch ID to 0 and creates a new WatermarkTracker.
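The two branches above can be summarized as follows. A minimal sketch, assuming a simplified view of the Offset Write-Ahead Log as an optional latest batch id (not the actual metadata-log API):

```scala
// Returns the (currentBatchId, isCurrentBatchConstructed) pair the query resumes with.
def populateStartOffsetsSketch(latestInOffsetLog: Option[Long]): (Long, Boolean) =
  latestInOffsetLog match {
    case Some(latestBatchId) =>
      // resume from checkpoint: pick up the latest batch and mark it as already constructed
      (latestBatchId, true)
    case None =>
      // "Starting new streaming query." -- first run (or a new checkpoint location)
      (0L, false)
  }
```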
Constructing Or Skipping Next Streaming Micro-Batch¶
constructNextBatch(
noDataBatchesEnabled: Boolean): Boolean
constructNextBatch
is used when MicroBatchExecution
is requested to run the activated streaming query.
Note
constructNextBatch is only executed when the isCurrentBatchConstructed internal flag is disabled (false).
constructNextBatch
performs the following steps:
-
Requesting the latest offsets from every streaming source (of the streaming query)
-
Updating availableOffsets StreamProgress with the latest available offsets
-
Updating batch metadata with the current event-time watermark and batch timestamp
-
Checking whether to construct the next micro-batch or not (skip it)
In the end, constructNextBatch
returns whether the next streaming micro-batch was constructed or skipped.
Requesting Latest Offsets from Streaming Sources (getOffset, setOffsetRange and getEndOffset Phases)¶
constructNextBatch first requests every streaming source for the latest offsets.

Note
constructNextBatch checks the latest offsets of every streaming data source sequentially, i.e. one data source at a time.
![MicroBatchExecution's Getting Offsets From Streaming Sources](images/MicroBatchExecution-constructNextBatch.png)
For every streaming source (Data Source API V1), constructNextBatch
updates the status message to the following:
Getting offsets from [source]
In the getOffset time-tracking section, constructNextBatch requests the Source for the latest offset.

For every MicroBatchReader (Data Source API V2), constructNextBatch updates the status message to the following:

Getting offsets from [source]
In the setOffsetRange time-tracking section, constructNextBatch finds the available offsets of the source (in the availableOffsets internal registry) and, if available, requests the MicroBatchReader to deserialize the offset (from JSON format). constructNextBatch then requests the MicroBatchReader to set the offset range (the optional start offset and no end offset).

In the getEndOffset time-tracking section, constructNextBatch requests the MicroBatchReader for the end offset.
Updating availableOffsets StreamProgress with Latest Available Offsets¶
constructNextBatch
updates the availableOffsets StreamProgress with the latest reported offsets.
Updating Batch Metadata with Current Event-Time Watermark and Batch Timestamp¶
constructNextBatch updates the batch metadata with the current event-time watermark (from the WatermarkTracker) and the batch timestamp.
Checking Whether to Construct Next Micro-Batch or Not (Skip It)¶
constructNextBatch checks whether or not the next streaming micro-batch should be constructed.

constructNextBatch uses the last IncrementalExecution to find out whether the last execution requires another micro-batch (using the batch metadata), provided the given noDataBatchesEnabled flag is enabled (true). That is the lastExecutionRequiresAnotherBatch flag in the TRACE message below.

constructNextBatch also checks whether new data is available (based on the available and committed offsets).

Note
The shouldConstructNextBatch local flag is enabled (true) when there is new data available or lastExecutionRequiresAnotherBatch is enabled (which itself requires the noDataBatchesEnabled flag to be enabled).
constructNextBatch
prints out the following TRACE message to the logs:
noDataBatchesEnabled = [noDataBatchesEnabled], lastExecutionRequiresAnotherBatch = [lastExecutionRequiresAnotherBatch], isNewDataAvailable = [isNewDataAvailable], shouldConstructNextBatch = [shouldConstructNextBatch]
constructNextBatch branches off per whether the next micro-batch should be constructed or skipped (the shouldConstructNextBatch flag in the above TRACE message).
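The decision can be condensed to a single boolean expression. A sketch using the flag names from the TRACE message above (simplified to plain booleans; not the exact Spark code):

```scala
// Construct the next micro-batch when there is new data, or when no-data batches
// are enabled and the last execution asks for another batch (e.g. for state cleanup).
def shouldConstructNextBatch(
    isNewDataAvailable: Boolean,
    noDataBatchesEnabled: Boolean,
    lastExecutionRequiresAnotherBatch: Boolean): Boolean =
  isNewDataAvailable || (noDataBatchesEnabled && lastExecutionRequiresAnotherBatch)
```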
Constructing Next Micro-Batch¶
With the shouldConstructNextBatch flag enabled (true), constructNextBatch updates the status message to the following:

Writing offsets to log
In the walCommit time-tracking section, constructNextBatch requests the availableOffsets StreamProgress to convert to OffsetSeq (with the sources and the current batch metadata, i.e. the event-time watermark and the batch timestamp) that is in turn added to the Offset Write-Ahead Log for the current batch ID.
constructNextBatch
prints out the following INFO message to the logs:
Committed offsets for batch [currentBatchId]. Metadata [offsetSeqMetadata]
NOTE: FIXME (if (currentBatchId != 0) ...
)
NOTE: FIXME (if (minLogEntriesToMaintain < currentBatchId) ...
)
constructNextBatch turns the noNewData internal flag off (false).
In case of a failure while adding the available offsets to the write-ahead log, constructNextBatch
throws an AssertionError
:
Concurrent update to the log. Multiple streaming jobs detected for [currentBatchId]
Skipping Next Micro-Batch¶
With the shouldConstructNextBatch flag disabled (false), constructNextBatch turns the noNewData flag on (true) and wakes up (notifies) all threads waiting for the awaitProgressLockCondition lock.
Running Single Streaming Micro-Batch¶
runBatch(
sparkSessionToRunBatch: SparkSession): Unit
runBatch
prints out the following DEBUG message to the logs (with the current batch ID):
Running batch [currentBatchId]
runBatch
then performs the following steps (aka phases):

1. getBatch Phase: Creating logical query plans for unprocessed data (from Sources and MicroBatchReaders)
2. Transforming the logical plan to include the Sources and MicroBatchReaders with new data
3. Transforming CurrentTimestamp and CurrentDate expressions (per batch metadata)
4. Adapting the transformed logical plan to the sink
5. Setting local properties
6. queryPlanning Phase: Creating and preparing the IncrementalExecution for execution
7. nextBatch Phase: Creating a DataFrame (with the IncrementalExecution for new data)
8. addBatch Phase: Adding the DataFrame with new data to the sink
9. Updating the watermark and committing offsets to the Offset Commit Log
In the end, runBatch
prints out the following DEBUG message to the logs (with the current batch ID):
Completed batch [currentBatchId]
Note
runBatch is used exclusively when MicroBatchExecution is requested to run an activated streaming query.
getBatch Phase -- Creating Logical Query Plans For Unprocessed Data From Sources and MicroBatchReaders¶
In the getBatch time-tracking section, runBatch goes over the available offsets and, for every Source and MicroBatchReader with new data, creates a logical query plan (newData) for data processing (per offset ranges).
NOTE: runBatch
requests sources and readers for data per offset range sequentially, one by one.
![StreamExecution's Running Single Streaming Batch (getBatch Phase)](images/StreamExecution-runBatch-getBatch.png)
getBatch Phase and Sources¶
For a Source (with the available offsets different from the committedOffsets registry), runBatch
does the following:
- Requests the committedOffsets registry for the committed offsets of the Source (if available)
- Requests the Source for a dataframe for the offset range (the current and available offsets)
runBatch
prints out the following DEBUG message to the logs.
Retrieving data from [source]: [current] -> [available]
In the end, runBatch
returns the Source
and the logical plan of the streaming dataset (for the offset range).
In case the Source
returns a dataframe that is not streaming, runBatch
throws an AssertionError
:
DataFrame returned by getBatch from [source] did not have isStreaming=true\n[logicalQueryPlan]
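The per-source step above can be sketched as follows, assuming a plain function stands in for the Source's getBatch and offsets are simplified to strings (an illustration, not the Data Source API V1 interface itself):

```scala
import org.apache.spark.sql.DataFrame

// Fetch the unprocessed offset range from a source and verify the result is streaming.
def getBatchFor(
    sourceName: String,
    committed: Option[String],                      // committed offset, if any
    available: String,                              // latest available offset
    getBatch: (Option[String], String) => DataFrame): DataFrame = {
  // DEBUG: Retrieving data from [source]: [current] -> [available]
  val df = getBatch(committed, available)
  assert(df.isStreaming,
    s"DataFrame returned by getBatch from $sourceName did not have isStreaming=true")
  df
}
```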
getBatch Phase and MicroBatchReaders¶
For a MicroBatchReader (with the available offsets different from the committedOffsets registry), runBatch does the following:

- Requests the committedOffsets registry for the committed offsets of the MicroBatchReader (if available)
- Requests the MicroBatchReader to deserialize the committed offsets (if available)
- Requests the MicroBatchReader to deserialize the available offsets (only for SerializedOffsets)
- Requests the MicroBatchReader to set the offset range (the current and available offsets)
runBatch
prints out the following DEBUG message to the logs.
Retrieving data from [reader]: [current] -> [availableV2]
runBatch looks up the DataSourceV2 and the options for the MicroBatchReader (in the readerToDataSourceMap internal registry).

In the end, runBatch requests the MicroBatchReader for the read schema and creates a StreamingDataSourceV2Relation logical operator (with the read schema, the DataSourceV2, the options, and the MicroBatchReader).
Transforming Logical Plan to Include Sources and MicroBatchReaders with New Data¶
![StreamExecution's Running Single Streaming Batch (and Transforming Logical Plan for New Data)](images/StreamExecution-runBatch-newBatchesPlan.png)
runBatch transforms the analyzed logical plan to include the Sources and MicroBatchReaders with new data (newBatchesPlan with logical plans to process data that has arrived since the last batch).
For every StreamingExecutionRelation, runBatch
tries to find the corresponding logical plan for processing new data.
If the logical plan is found, runBatch makes the plan a child operator of a Project (with Aliases) logical operator and replaces the StreamingExecutionRelation.
Otherwise, if not found, runBatch
simply creates an empty streaming LocalRelation
(for scanning data from an empty local collection).
In case the number of columns of the dataframe with new data and of the StreamingExecutionRelation do not match, runBatch throws an AssertionError:
Invalid batch: [output] != [dataPlan.output]
Transforming CurrentTimestamp and CurrentDate Expressions (Per Batch Metadata)¶
runBatch replaces all CurrentTimestamp and CurrentDate expressions in the transformed logical plan (with new data) with the current batch timestamp (based on the batch metadata).
Note
CurrentTimestamp
and CurrentDate
expressions correspond to current_timestamp
and current_date
standard function, respectively.
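The replacement can be sketched as a Catalyst transformation that pins both expressions to the batch timestamp. This is only an illustration of the idea (Spark internally uses a dedicated batch-timestamp expression; the UTC day arithmetic below is an assumption):

```scala
import org.apache.spark.sql.catalyst.expressions.{CurrentDate, CurrentTimestamp, Literal}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.types.{DateType, TimestampType}

// Pin current_timestamp/current_date to the batch timestamp from the batch metadata.
def pinBatchTime(plan: LogicalPlan, batchTimestampMs: Long): LogicalPlan =
  plan transformAllExpressions {
    case _: CurrentTimestamp =>
      Literal(batchTimestampMs * 1000L, TimestampType)         // internal value: microseconds
    case _: CurrentDate =>
      Literal((batchTimestampMs / 86400000L).toInt, DateType)  // internal value: days since epoch (UTC)
  }
```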
Adapting Transformed Logical Plan to Sink with StreamWriteSupport¶
runBatch
...FIXME
For a Sink (Data Source API V1), runBatch
changes nothing.
For any other type of BaseStreamingSink, runBatch simply throws an IllegalArgumentException:
unknown sink type for [sink]
Setting Local Properties¶
runBatch
sets the local properties.
| Local Property | Value |
|---|---|
| streaming.sql.batchId | currentBatchId |
| __is_continuous_processing | false |
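A short sketch of what setting those local properties amounts to on the SparkContext that runs the micro-batch (property names as in the table above; the helper function is only for illustration):

```scala
import org.apache.spark.sql.SparkSession

def setBatchLocalProperties(spark: SparkSession, currentBatchId: Long): Unit = {
  val sc = spark.sparkContext
  sc.setLocalProperty("streaming.sql.batchId", currentBatchId.toString)
  sc.setLocalProperty("__is_continuous_processing", false.toString)
}
```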
queryPlanning Phase -- Creating and Preparing IncrementalExecution for Execution¶
![StreamExecution's Query Planning (queryPlanning Phase)](images/StreamExecution-runBatch-queryPlanning.png)
In the queryPlanning time-tracking section, runBatch creates a new IncrementalExecution with the following:

- Transformed logical plan
- Output mode
- state checkpoint directory
- Run ID
- Current batch ID
- Batch metadata (event-time watermark and batch timestamp)
In the end (of the queryPlanning
phase), runBatch
requests the IncrementalExecution
to prepare the transformed logical plan for execution (i.e. execute the executedPlan
query execution phase).
Tip
Read up on the executedPlan query execution phase in The Internals of Spark SQL (https://jaceklaskowski.gitbooks.io/mastering-spark-sql/spark-sql-QueryExecution.html).
nextBatch Phase — Creating DataFrame (with IncrementalExecution for New Data)¶
runBatch creates a new DataFrame with the new IncrementalExecution (for the current micro-batch).
The DataFrame
represents the result of executing the current micro-batch of the streaming query.
addBatch Phase — Adding DataFrame With New Data to Sink¶
In addBatch time-tracking section, runBatch
adds the DataFrame
with new data to the BaseStreamingSink.
For a Sink (Data Source API V1), runBatch
simply requests the Sink
to add the DataFrame (with the batch ID).
runBatch
uses SQLExecution.withNewExecutionId
to execute and track all the Spark jobs under one execution id (so it is reported as one single multi-job execution, e.g. in web UI).
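For the V1 path, the phase boils down to a single call on the Sink, wrapped in a tracked SQL execution. A minimal sketch (the SQLExecution.withNewExecutionId wrapper is only hinted at in the comment, since its signature differs across Spark versions):

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.execution.streaming.Sink

// Executed inside SQLExecution.withNewExecutionId so that all Spark jobs submitted
// by the sink are reported as one execution (e.g. in the web UI).
def addBatchPhase(sink: Sink, currentBatchId: Long, newData: DataFrame): Unit =
  sink.addBatch(currentBatchId, newData)
```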
Note
SQLExecution.withNewExecutionId
posts a SparkListenerSQLExecutionStart
event before execution and a SparkListenerSQLExecutionEnd
event right afterwards.
Tip
Register SparkListener
to get notified about the SQL execution events (SparkListenerSQLExecutionStart
and SparkListenerSQLExecutionEnd
).
Updating Watermark and Committing Offsets to Offset Commit Log¶
runBatch
requests the WatermarkTracker to update event-time watermark (with the executedPlan
of the IncrementalExecution).
runBatch requests the Offset Commit Log to persist the metadata of the streaming micro-batch (with the current batch ID and the event-time watermark of the WatermarkTracker).
In the end, runBatch
adds the available offsets to the committed offsets (and updates the offsets of every source with new data in the current micro-batch).
Stopping Stream Processing (Execution of Streaming Query)¶
stop(): Unit
stop
sets the state to TERMINATED
.
When the stream execution thread is alive, stop
requests the current SparkContext
to cancelJobGroup
identified by the runId and waits for this thread to die. Just to make sure that there are no more streaming jobs, stop
requests the current SparkContext
to cancelJobGroup
identified by the runId again.
In the end, stop
prints out the following INFO message to the logs:
Query [prettyIdString] was stopped
stop
is part of the StreamingQuery abstraction.
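The shutdown sequence can be sketched as follows, assuming the stream-execution thread and the runId are passed in explicitly (they are internal fields of the real class; this is an illustration, not the exact Spark code):

```scala
import java.util.UUID
import org.apache.spark.SparkContext

def stopSketch(sc: SparkContext, runId: UUID, queryExecutionThread: Thread): Unit = {
  // state = TERMINATED happens before anything else (not shown here)
  if (queryExecutionThread.isAlive) {
    sc.cancelJobGroup(runId.toString)   // cancel the streaming jobs of this run
    queryExecutionThread.join()         // wait for the stream execution thread to die
    sc.cancelJobGroup(runId.toString)   // make sure no streaming jobs are left behind
  }
  // INFO: Query [prettyIdString] was stopped
}
```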
Checking Whether New Data Is Available (isNewDataAvailable Internal Method)¶

isNewDataAvailable: Boolean
isNewDataAvailable checks whether there is a streaming source (in the availableOffsets internal registry) with an available offset that has not been committed yet (i.e. that is different from the offset recorded for that source in the committedOffsets internal registry).

isNewDataAvailable is positive (true) when there is at least one such streaming source.
Note
isNewDataAvailable is used when MicroBatchExecution is requested to run an activated streaming query and construct or skip the next streaming micro-batch.
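The check can be expressed compactly. A sketch over the available and committed offsets, simplified to maps keyed by source (an illustration of the rule above, not the internal StreamProgress type):

```scala
// New data is available when at least one source has an available offset
// that has no committed counterpart or differs from it.
def isNewDataAvailableSketch[Source, Offset](
    availableOffsets: Map[Source, Offset],
    committedOffsets: Map[Source, Offset]): Boolean =
  availableOffsets.exists { case (source, available) =>
    committedOffsets.get(source).forall(_ != available)
  }
```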
Analyzed Logical Plan¶
logicalPlan: LogicalPlan
logicalPlan
is part of the StreamExecution abstraction.
logicalPlan resolves (replaces) StreamingRelation and StreamingRelationV2 logical operators with StreamingExecutionRelation logical operators. logicalPlan uses the transformed logical plan to set the uniqueSources and sources internal registries to be the BaseStreamingSources of all the StreamingExecutionRelations (unique and not, respectively).
Lazy Value
logicalPlan
is a Scala lazy value to guarantee that the code to initialize it is executed once only (when accessed for the first time) and cached afterwards.
Internally, logicalPlan transforms the analyzed logical plan as follows:
For every StreamingRelation logical operator, logicalPlan
tries to replace it with the StreamingExecutionRelation that was used earlier for the same StreamingRelation
(if used multiple times in the plan) or creates a new one. While creating a new StreamingExecutionRelation
, logicalPlan
requests the DataSource
to create a streaming Source with the metadata path as sources/uniqueID
directory in the checkpoint root directory. logicalPlan
prints out the following INFO message to the logs:
Using Source [source] from DataSourceV1 named '[sourceName]' [dataSourceV1]
For every StreamingRelationV2 logical operator with a MicroBatchStream data source (which is not on the list of spark.sql.streaming.disabledV2MicroBatchReaders), logicalPlan
tries to replace it with the StreamingExecutionRelation that was used earlier for the same StreamingRelationV2
(if used multiple times in the plan) or creates a new one. While creating a new StreamingExecutionRelation
, logicalPlan requests the data source to create a MicroBatchReader with the metadata path as the sources/uniqueID directory in the checkpoint root directory. logicalPlan
prints out the following INFO message to the logs:
Using MicroBatchReader [reader] from DataSourceV2 named '[sourceName]' [dataSourceV2]
For every other StreamingRelationV2 leaf logical operator, logicalPlan
tries to replace it with the StreamingExecutionRelation that was used earlier for the same StreamingRelationV2
(if used multiple times in the plan) or creates a new one. While creating a new StreamingExecutionRelation
, logicalPlan
requests the StreamingRelation
for the underlying DataSource that is in turn requested to create a streaming Source with the metadata path as sources/uniqueID
directory in the checkpoint root directory. logicalPlan
prints out the following INFO message to the logs:
Using Source [source] from DataSourceV2 named '[sourceName]' [dataSourceV2]
logicalPlan requests the transformed analyzed logical plan for all StreamingExecutionRelations, which are then requested for their BaseStreamingSources, and saves them as the sources internal registry.

In the end, logicalPlan sets the uniqueSources internal registry to be the unique BaseStreamingSources above.
logicalPlan
throws an AssertionError
when not executed on the stream execution thread.
logicalPlan must be initialized in QueryExecutionThread but the current thread was [currentThread]
streaming.sql.batchId Local Property¶
MicroBatchExecution defines streaming.sql.batchId as the name of the local property that holds the current batch (or epoch) ID (that Spark tasks can use at execution time).

streaming.sql.batchId is used when:

- MicroBatchExecution is requested to run a single streaming micro-batch (and sets the property to the current batch ID)
- DataWritingSparkTask is requested to run (and needs an epoch ID)
WatermarkTracker¶
A WatermarkTracker that is created when MicroBatchExecution is requested to populate start offsets (when requested to run an activated streaming query).
Internal Properties¶
isCurrentBatchConstructed

Flag to control whether to run a streaming micro-batch (true) or not (false)

Default: false

- When disabled (false), changed to whatever constructing the next streaming micro-batch gives back (when running an activated streaming query)
- Disabled (false) after running a single streaming micro-batch (when enabled after constructing the next streaming micro-batch)
- Enabled (true) when populating start offsets (when running an activated streaming query) and re-starting a streaming query from a checkpoint (using the Offset Write-Ahead Log)
- Disabled (false) when populating start offsets (when running an activated streaming query) and re-starting a streaming query from a checkpoint when the latest offset checkpointed (written) to the offset write-ahead log has been successfully processed and committed to the Offset Commit Log

readerToDataSourceMap

MicroBatchReaders with their DataSourceV2 data sources and options (Map[MicroBatchReader, (DataSourceV2, Map[String, String])])

sources

Streaming sources and readers (of the StreamingExecutionRelations of the analyzed logical query plan of the streaming query)

Default: (empty)

sources is part of the ProgressReporter abstraction.

Initialized when MicroBatchExecution is requested for the transformed logical query plan.
Used when: