ContinuousExecution

ContinuousExecution is the stream execution engine of Continuous Stream Processing.

ContinuousExecution is created when StreamingQueryManager is requested to create a streaming query with a ContinuousTrigger (i.e. when DataStreamWriter is requested to start an execution of the streaming query).

ContinuousExecution can only run streaming queries with StreamingRelationV2 leaf logical operators with a ContinuousReadSupport data source.

[[sources]] ContinuousExecution supports exactly one continuous source in a streaming query (which is asserted when an offset is added and an epoch committed). When requested for the available streaming sources, ContinuousExecution simply gives the continuousSources internal registry.
import org.apache.spark.sql.streaming.Trigger
import scala.concurrent.duration._
val sq = spark
.readStream
.format("rate")
.load
.writeStream
.format("console")
.option("truncate", false)
.trigger(Trigger.Continuous(1.minute)) // <-- Gives ContinuousExecution
.queryName("rate2console")
.start
import org.apache.spark.sql.streaming.StreamingQuery
assert(sq.isInstanceOf[StreamingQuery])
// The following gives access to the internals
// And to ContinuousExecution
import org.apache.spark.sql.execution.streaming.StreamingQueryWrapper
val engine = sq.asInstanceOf[StreamingQueryWrapper].streamingQuery
import org.apache.spark.sql.execution.streaming.StreamExecution
assert(engine.isInstanceOf[StreamExecution])
import org.apache.spark.sql.execution.streaming.continuous.ContinuousExecution
val continuousEngine = engine.asInstanceOf[ContinuousExecution]
assert(continuousEngine.trigger == Trigger.Continuous(1.minute))
When created, ContinuousExecution is given the analyzed logical plan of the streaming query. The analyzed logical plan is immediately transformed to use a ContinuousExecutionRelation for every StreamingRelationV2 leaf logical operator with a ContinuousReadSupport data source.

Note
ContinuousExecution uses the same instance of ContinuousExecutionRelation
for the same instances of StreamingRelationV2 with ContinuousReadSupport data source.

When requested to run the streaming query in continuous mode, ContinuousExecution collects ContinuousReadSupport data sources (inside ContinuousExecutionRelation leaf logical operators) from the analyzed logical plan and requests each of them to create a ContinuousReader (that are stored in the continuousSources internal registry).
[[EPOCH_COORDINATOR_ID_KEY]] ContinuousExecution uses the __epoch_coordinator_id local property for...FIXME

[[START_EPOCH_KEY]] ContinuousExecution uses the __continuous_start_epoch local property for...FIXME

[[EPOCH_INTERVAL_KEY]] ContinuousExecution uses the __continuous_epoch_interval local property for...FIXME
[[logging]]
[TIP]
====
Enable ALL logging level for org.apache.spark.sql.execution.streaming.continuous.ContinuousExecution logger to see what happens inside.

Add the following line to conf/log4j.properties:

log4j.logger.org.apache.spark.sql.execution.streaming.continuous.ContinuousExecution=ALL

Refer to Logging.
====
TriggerExecutor

TriggerExecutor for the Trigger:

- ProcessingTimeExecutor for ContinuousTrigger

Used when...FIXME

Note
ContinuousExecution throws an IllegalStateException when the Trigger is not a ContinuousTrigger.
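A minimal sketch of that mapping (assuming the Spark 2.4-era internal classes ProcessingTimeExecutor, ProcessingTime and ContinuousTrigger; not the verbatim source):

[source, scala]
import org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor
import org.apache.spark.sql.execution.streaming.continuous.ContinuousTrigger
import org.apache.spark.sql.streaming.ProcessingTime

// Map the trigger to a TriggerExecutor, rejecting anything
// other than a continuous trigger
val triggerExecutor = trigger match {
  case ContinuousTrigger(intervalMs) =>
    ProcessingTimeExecutor(ProcessingTime(intervalMs), triggerClock)
  case _ =>
    throw new IllegalStateException(s"Unsupported type of trigger: $trigger")
}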
=== [[runActivatedStream]] Running Activated Streaming Query -- runActivatedStream Method

[source, scala]
runActivatedStream(sparkSessionForStream: SparkSession): Unit

runActivatedStream simply runs the streaming query in continuous mode as long as the state is ACTIVE.

runActivatedStream is part of the StreamExecution abstraction.
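runActivatedStream boils down to a loop; a hedged sketch (state is StreamExecution's internal state reference; not the verbatim source):

[source, scala]
// Keep running the query in continuous mode until the state leaves ACTIVE
// (a RECONFIGURING state flips back to ACTIVE, which restarts runContinuous)
do {
  runContinuous(sparkSessionForStream)
} while (state.get() == ACTIVE)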
Running Streaming Query in Continuous Mode

runContinuous(
  sparkSessionForQuery: SparkSession): Unit

runContinuous initializes the continuousSources internal registry (with the ContinuousReaders created from the ContinuousReadSupport data sources of the ContinuousExecutionRelation leaf logical operators in the analyzed logical plan).

runContinuous initializes the uniqueSources internal registry to be the continuousSources distinct.

runContinuous gets the start offsets from the checkpoint (the offset and commit logs).

runContinuous transforms the analyzed logical plan. For every ContinuousExecutionRelation, runContinuous finds the corresponding ContinuousReader (in the continuousSources internal registry), requests it to deserialize the start offsets (from their JSON representation) and set them as the reader's start offset. In the end, runContinuous creates a StreamingDataSourceV2Relation (with the read schema of the ContinuousReader and the ContinuousReader itself).

runContinuous rewires the transformed plan (with the StreamingDataSourceV2Relation) to use the new attributes from the source (the reader).
Important
CurrentTimestamp and CurrentDate expressions are not supported for continuous processing.
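For example, starting a continuous query that uses current_timestamp should be rejected (a sketch; the exact error message may vary across Spark versions):

[source, scala]
import org.apache.spark.sql.functions.current_timestamp
import org.apache.spark.sql.streaming.Trigger
import scala.concurrent.duration._

// Expect an AnalysisException at start, along the lines of
// "Continuous processing does not support current time operations."
spark
  .readStream
  .format("rate")
  .load
  .withColumn("now", current_timestamp())
  .writeStream
  .format("console")
  .trigger(Trigger.Continuous(1.second))
  .start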
runContinuous ...FIXME

runContinuous finds the only ContinuousReader (of the only StreamingDataSourceV2Relation) in the query plan (under the WriteToContinuousDataSource logical operator).

In queryPlanning time-tracking section, runContinuous creates an IncrementalExecution (that becomes the lastExecution) that is immediately executed (i.e. the entire query execution pipeline is executed up to and including executedPlan).
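Continuing the rate2console demo above, the IncrementalExecution is then available as lastExecution (assuming the query has already started processing):

[source, scala]
// Inspect the continuous query plan of the running query
val lastExecution = continuousEngine.lastExecution
println(lastExecution.executedPlan.numberedTreeString)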
runContinuous sets the following local properties:

- __is_continuous_processing as true
- <<START_EPOCH_KEY, __continuous_start_epoch>> as the currentBatchId
- <<EPOCH_COORDINATOR_ID_KEY, __epoch_coordinator_id>> as the currentEpochCoordinatorId, i.e. the runId followed by -- with a random UUID
- <<EPOCH_INTERVAL_KEY, __continuous_epoch_interval>> as the interval of the ContinuousTrigger
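Local properties are a public SparkContext facility that propagates settings to all tasks submitted from the calling thread, which is how continuous-processing tasks learn about their epoch coordinator. A sketch of what the above amounts to (the property values are internal; epochIntervalMs is a hypothetical name for the trigger interval):

[source, scala]
val sc = sparkSessionForQuery.sparkContext
sc.setLocalProperty("__is_continuous_processing", "true")
sc.setLocalProperty("__continuous_start_epoch", currentBatchId.toString)
sc.setLocalProperty("__epoch_coordinator_id", currentEpochCoordinatorId)
sc.setLocalProperty("__continuous_epoch_interval", epochIntervalMs.toString)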
runContinuous uses the EpochCoordinatorRef helper to create a remote reference to the EpochCoordinator RPC endpoint (with the StreamWriter, the ContinuousReader, the currentEpochCoordinatorId and the currentBatchId).

NOTE: The EpochCoordinator RPC endpoint runs on the driver as the single point to coordinate epochs across partition tasks.

runContinuous creates a daemon epoch update thread and starts it immediately.
[[runContinuous-runContinuous]] In runContinuous time-tracking section, runContinuous requests the physical query plan (of the IncrementalExecution) to execute (that simply requests the physical operator to doExecute and generate an RDD[InternalRow]).

runContinuous is used exclusively when ContinuousExecution is requested to run an activated streaming query.
==== [[runContinuous-epoch-update-thread]] Epoch Update Thread

runContinuous creates a daemon epoch update thread (named epoch update thread for [prettyIdString]) that, at the ContinuousTrigger interval, requests the EpochCoordinator RPC endpoint to increment and get the current epoch (that becomes the currentBatchId).
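A hedged sketch of the thread (per the Spark 2.4-era internals, where IncrementAndGetEpoch is the message the EpochCoordinator handles; not the verbatim source):

[source, scala]
val epochUpdateThread = new Thread(
  new Runnable {
    override def run(): Unit =
      triggerExecutor.execute { () =>
        if (isActive) {
          // Ask the EpochCoordinator RPC endpoint to advance the epoch
          currentBatchId = epochEndpoint.askSync[Long](IncrementAndGetEpoch)
          logInfo(s"New epoch $currentBatchId is starting.")
        }
        isActive // keep triggering while the query is active
      }
  },
  s"epoch update thread for $prettyIdString")
epochUpdateThread.setDaemon(true)
epochUpdateThread.start()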
==== [[getStartOffsets]] Getting Start Offsets From Checkpoint -- getStartOffsets Internal Method

[source, scala]
getStartOffsets(sparkSessionToRunBatches: SparkSession): OffsetSeq

getStartOffsets requests the commit log for the latest committed epoch. If one is available, getStartOffsets takes the offsets of the epoch right after it from the offset log (throwing an IllegalStateException if unavailable), records them as the committedOffsets and resumes at that epoch (as the currentBatchId). Otherwise, getStartOffsets starts the streaming query from scratch at epoch 0.

NOTE: getStartOffsets is used exclusively when ContinuousExecution is requested to run the streaming query in continuous mode.
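A hedged sketch of that resume logic (helper names like toStreamProgress and OffsetSeq.fill follow the Spark 2.4-era internal API):

[source, scala]
// Resume from the checkpoint or start from scratch at epoch 0
commitLog.getLatest() match {
  case Some((latestCommittedEpoch, _)) =>
    // The query stopped after committing latestCommittedEpoch,
    // so the start offsets are the ones recorded for the next epoch
    val nextOffsets = offsetLog.get(latestCommittedEpoch + 1).getOrElse {
      throw new IllegalStateException(
        s"Batch $latestCommittedEpoch was committed without end epoch offsets!")
    }
    committedOffsets = nextOffsets.toStreamProgress(sources)
    currentBatchId = latestCommittedEpoch + 1
    nextOffsets
  case None =>
    // First run of this streaming query: no offsets yet
    currentBatchId = 0
    OffsetSeq.fill(sources.map(_ => null): _*)
}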
=== [[commit]] Committing Epoch -- commit Method

[source, scala]
commit(epoch: Long): Unit

In essence, commit adds the given epoch to the commit log and the committedOffsets, and requests the single ContinuousReader to commit the corresponding offset. In the end, commit removes old log entries from the offset and commit logs (to keep spark.sql.streaming.minBatchesToRetain entries only).
Internally, commit recordTriggerOffsets (with the from and to offsets as the committedOffsets and availableOffsets, respectively).

At this point, commit may simply return when the stream execution thread is no longer alive (died).

commit requests the commit log to store a metadata for the epoch.

commit requests the single ContinuousReader to deserialize the offset for the epoch (from the offset write-ahead log).

commit adds the single ContinuousReader and the offset to the committedOffsets internal registry.

commit requests the single ContinuousReader to commit the offset.

commit requests the offset and commit logs to remove log entries to keep spark.sql.streaming.minBatchesToRetain only.

commit then acquires the awaitProgressLock, wakes up all threads waiting for the awaitProgressLockCondition and in the end releases the awaitProgressLock.
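The wake-up follows the standard java.util.concurrent lock-and-condition pattern:

[source, scala]
// Signal any thread blocked waiting for progress (e.g. processAllAvailable)
awaitProgressLock.lock()
try {
  awaitProgressLockCondition.signalAll()
} finally {
  awaitProgressLock.unlock()
}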
NOTE: commit supports exactly one continuous source (registered in the continuousSources internal registry).

commit asserts that the offset for the given epoch is available in the offset log (i.e. has been reported before).

commit is used exclusively when EpochCoordinator is requested to commitEpoch.
=== [[addOffset]] addOffset Method

[source, scala]
addOffset(
  epoch: Long,
  reader: ContinuousReader,
  partitionOffsets: Seq[PartitionOffset]): Unit

In essence, addOffset requests the given ContinuousReader to merge the given PartitionOffsets into a single "global" offset and then requests the OffsetSeqLog to register the offset with the given epoch.

Internally, addOffset requests the given ContinuousReader to mergeOffsets (with the given PartitionOffsets) and to get the current "global" offset back.
addOffset then requests the OffsetSeqLog to add the current "global" offset for the given epoch.

addOffset requests the OffsetSeqLog for the offset at the previous epoch. If the offsets at the current and previous epochs are the same, addOffset turns the noNewData internal flag on.

addOffset then acquires the awaitProgressLock, wakes up all threads waiting for the awaitProgressLockCondition and in the end releases the awaitProgressLock.
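A hedged sketch of that flow (mergeOffsets is part of the ContinuousReader API; OffsetSeq.fill is an internal helper):

[source, scala]
// Merge per-partition offsets into one global offset,
// record it for the epoch, and detect the "no new data" case
val globalOffset = reader.mergeOffsets(partitionOffsets.toArray)
offsetLog.add(epoch, OffsetSeq.fill(globalOffset))
if (offsetLog.get(epoch - 1).contains(OffsetSeq.fill(globalOffset))) {
  noNewData = true
}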
NOTE: addOffset supports exactly one continuous source.

addOffset is used exclusively when EpochCoordinator is requested to handle a ReportPartitionOffset message.
Analyzed Logical Plan of Streaming Query

logicalPlan: LogicalPlan

logicalPlan resolves StreamingRelationV2 leaf logical operators (with a ContinuousReadSupport source) to ContinuousExecutionRelation leaf logical operators.

Internally, logicalPlan transforms the analyzed logical plan as follows:

- For every StreamingRelationV2 leaf logical operator with a ContinuousReadSupport source, logicalPlan looks up the corresponding ContinuousExecutionRelation (if available in the internal lookup registry) or creates a new ContinuousExecutionRelation (with the ContinuousReadSupport source, the options and the output attributes of the StreamingRelationV2 operator).
- For any other StreamingRelationV2, logicalPlan throws an UnsupportedOperationException:

Data source [name] does not support continuous processing.

logicalPlan is part of the StreamExecution abstraction.
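For demonstration, starting a file-based source (which has no ContinuousReadSupport) with a continuous trigger should fail with the above exception (a sketch; the exact timing of the error may differ):

[source, scala]
import org.apache.spark.sql.streaming.Trigger
import scala.concurrent.duration._

// json (a file-based data source) does not support continuous processing
val failing = spark
  .readStream
  .schema("id LONG")
  .json("in")
  .writeStream
  .format("console")
  .trigger(Trigger.Continuous(1.second))
  .start
// Expect: java.lang.UnsupportedOperationException:
//   Data source json does not support continuous processing.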
Creating Instance

ContinuousExecution takes the following when created:

- [[sparkSession]] SparkSession
- [[name]] The name of the structured query
- [[checkpointRoot]] Path to the checkpoint directory (aka metadata directory)
- [[analyzedPlan]] Analyzed logical query plan (LogicalPlan)
- [[trigger]] Trigger
- [[triggerClock]] Clock
- [[outputMode]] OutputMode
- [[extraOptions]] Options (Map[String, String])
- [[deleteCheckpointOnStop]] deleteCheckpointOnStop flag to control whether to delete the checkpoint directory on stop
=== [[stop]] Stopping Stream Processing (Execution of Streaming Query) -- stop Method

[source, scala]
stop(): Unit

NOTE: stop is part of the StreamingQuery Contract to stop a streaming query.

stop transitions the streaming query to TERMINATED state.

If the queryExecutionThread is alive (i.e. it has been started and has not yet died), stop interrupts it and waits for this thread to die.

In the end, stop prints out the following INFO message to the logs:

Query [prettyIdString] was stopped

Note
prettyIdString is in the format of queryName [id = [id], runId = [runId]].
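A hedged sketch of that sequence (state, queryExecutionThread and prettyIdString are StreamExecution internals; not the verbatim source):

[source, scala]
// Mark the query terminated, then stop the stream execution thread
state.set(TERMINATED)
if (queryExecutionThread.isAlive) {
  queryExecutionThread.interrupt()
  queryExecutionThread.join()
}
logInfo(s"Query $prettyIdString was stopped")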
=== [[awaitEpoch]] awaitEpoch Internal Method

[source, scala]
awaitEpoch(epoch: Long): Unit

awaitEpoch ...FIXME

NOTE: awaitEpoch seems to be used exclusively in tests.
=== [[internal-properties]] Internal Properties

[cols="30m,70",options="header",width="100%"]
|===
| Name | Description

| continuousSources
a| [[continuousSources]]

[source, scala]
continuousSources: Seq[ContinuousReader]

Registry of the ContinuousReaders (of the ContinuousReadSupport data sources in the analyzed logical plan of the streaming query)

As asserted in commit and addOffset, there can only be exactly one ContinuousReader registered.

Used when ContinuousExecution is requested to run the streaming query in continuous mode and commit an epoch

Use <<sources, sources>> to access the current value

| currentEpochCoordinatorId
| [[currentEpochCoordinatorId]] FIXME

Used when...FIXME
|===