When created (for a streaming query), ContinuousExecution is given the analyzed logical plan of the streaming query.

When requested to run the streaming query in continuous mode, ContinuousExecution collects the ContinuousReadSupport data sources (inside ContinuousExecutionRelation leaf logical operators) from the analyzed logical plan and requests every ContinuousReadSupport to create a ContinuousReader (stored in the continuousSources internal registry).
ContinuousExecution uses the __epoch_coordinator_id local property to propagate the identifier of the EpochCoordinator RPC endpoint, so tasks running on executors can look the coordinator up.

ContinuousExecution uses the __continuous_start_epoch local property to propagate the current batch ID (the epoch the query starts or restarts from) to tasks.

ContinuousExecution uses the __continuous_epoch_interval local property to propagate the epoch interval of the continuous trigger to tasks.
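For illustration, here is how task-side code could read those local properties once they have been propagated to executors. This is a hedged sketch: the property names come from the list above, while continuousContext is a made-up helper (not part of Spark).

```scala
import org.apache.spark.TaskContext

// Hypothetical helper: read the continuous-processing local properties
// from inside a running task (TaskContext.get returns null elsewhere).
def continuousContext(): Option[(String, Long, Long)] =
  Option(TaskContext.get()).flatMap { ctx =>
    for {
      coordinatorId <- Option(ctx.getLocalProperty("__epoch_coordinator_id"))
      startEpoch    <- Option(ctx.getLocalProperty("__continuous_start_epoch"))
      intervalMs    <- Option(ctx.getLocalProperty("__continuous_epoch_interval"))
    } yield (coordinatorId, startEpoch.toLong, intervalMs.toLong)
  }
```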
## Running Activated Streaming Query
```scala
runActivatedStream(
  sparkSessionForStream: SparkSession): Unit
```
runActivatedStream is part of the StreamExecution abstraction.

runActivatedStream runs the streaming query in continuous mode (runContinuous) in a loop, restarting it for as long as the query state remains ACTIVE (e.g. after a reconfiguration), as sketched below.
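The following is a minimal, self-contained sketch of that run-and-restart loop, assuming a simplified state machine (the State values and runContinuousOnce below are stand-ins, not Spark's actual members): the query is re-run for as long as it is merely being reconfigured, and the loop exits once it terminates.

```scala
import java.util.concurrent.atomic.AtomicReference

sealed trait State
case object ACTIVE extends State
case object RECONFIGURING extends State
case object TERMINATED extends State

val state = new AtomicReference[State](ACTIVE)

// Stand-in for one full runContinuous cycle (the real one flips the
// state to RECONFIGURING or TERMINATED before returning)
def runContinuousOnce(): Unit = println("running one continuous cycle")

def runActivatedStreamSketch(): Unit = {
  var continue = true
  while (continue) {
    runContinuousOnce()
    val next = state.updateAndGet((s: State) => s match {
      case RECONFIGURING => ACTIVE // restart after a reconfiguration
      case other         => other  // TERMINATED stays as-is
    })
    continue = next == ACTIVE
  }
}
```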
## Running Streaming Query in Continuous Mode
```scala
runContinuous(
  sparkSessionForQuery: SparkSession): Unit
```
runContinuous initializes the continuousSources internal registry by traversing the analyzed logical plan to find ContinuousExecutionRelation leaf logical operators and requests their ContinuousReadSupport data sources to create a ContinuousReader (with the sources metadata directory under the checkpoint directory).
runContinuous gets the start offsets (they may or may not be available).
runContinuous transforms the analyzed logical plan. For every ContinuousExecutionRelation, runContinuous finds the corresponding ContinuousReader (in the continuousSources internal registry), requests it to deserialize the start offsets (from their JSON representation) and then setStartOffset. In the end, runContinuous creates a StreamingDataSourceV2Relation (with the read schema of the ContinuousReader and the ContinuousReader itself).

runContinuous rewires the transformed plan (with the StreamingDataSourceV2Relation) to use the new attributes from the source (the reader).
CurrentTimestamp and CurrentDate expressions are not supported for continuous processing (runContinuous throws an IllegalStateException if the query uses them).
runContinuous then finds the only ContinuousReader (of the only StreamingDataSourceV2Relation) in the transformed query plan.
In the queryPlanning time-tracking section, runContinuous creates an IncrementalExecution (that becomes the lastExecution) that is immediately executed (i.e. the entire query execution pipeline is executed up to and including executedPlan).
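For illustration, that "execute up to and including executedPlan" behavior can be observed through the public QueryExecution API of any Dataset. The phases are lazy values, so merely accessing executedPlan forces analysis, optimization, planning and physical preparations (spark below is assumed to be an active SparkSession, e.g. in spark-shell).

```scala
val df = spark.range(5).toDF("id")
val qe = df.queryExecution

qe.analyzed      // analysis
qe.optimizedPlan // logical optimization
qe.sparkPlan     // physical planning
qe.executedPlan  // physical preparations: the plan that actually runs
```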
runContinuous sets the following local properties:

- StreamExecution.IS_CONTINUOUS_PROCESSING as true
- __continuous_start_epoch as the currentBatchId
- __epoch_coordinator_id as the currentEpochCoordinatorId
- __continuous_epoch_interval as the interval of the ContinuousTrigger
runContinuous uses the EpochCoordinatorRef helper to create a remote reference to the EpochCoordinator RPC endpoint (with the ContinuousReader, the currentEpochCoordinatorId, and the currentBatchId).
runContinuous creates a daemon epoch update thread and starts it immediately.
In the runContinuous time-tracking section, runContinuous requests the physical query plan (of the IncrementalExecution) to execute (that simply requests the physical operator to doExecute and generate an RDD[InternalRow]).
runContinuous is used when ContinuousExecution is requested to run an activated streaming query.
### Epoch Update Thread

runContinuous creates a daemon epoch update thread that, once per trigger interval, checks whether the ContinuousReader needs reconfiguration (and if so stops the query so it can be restarted) and otherwise requests the EpochCoordinator RPC endpoint to increment the current epoch (recording the new epoch as the currentBatchId).
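A minimal sketch of such a daemon thread follows (plain JVM threading with a local AtomicLong standing in for the epoch counter; the real thread asks the EpochCoordinator RPC endpoint for the next epoch and handles reader reconfiguration, which is omitted here).

```scala
import java.util.concurrent.atomic.AtomicLong

val currentEpoch = new AtomicLong(0L)
val epochIntervalMs = 1000L // hypothetical trigger interval

val epochUpdateThread = new Thread(() => {
  try {
    while (!Thread.currentThread().isInterrupted) {
      Thread.sleep(epochIntervalMs)
      // the real thread asks the EpochCoordinator to increment the epoch
      val epoch = currentEpoch.incrementAndGet()
      println(s"New epoch $epoch is starting.")
    }
  } catch {
    case _: InterruptedException => () // interrupted on stop; exit cleanly
  }
}, "epoch update thread")

epochUpdateThread.setDaemon(true)
epochUpdateThread.start()
```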
### Getting Start Offsets From Checkpoint (getStartOffsets Internal Method)

```scala
getStartOffsets(sparkSessionToRunBatches: SparkSession): OffsetSeq
```
getStartOffsets is used exclusively when ContinuousExecution is requested to run the streaming query in continuous mode.
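A hedged sketch of the decision getStartOffsets makes when resuming from a checkpoint (the two function parameters below are hypothetical stand-ins for the commit log and the offset write-ahead log): if some epoch was committed before, the query resumes at the next epoch with that epoch's offsets; otherwise it starts from epoch 0 with no offsets.

```scala
def startOffsetsSketch(
    latestCommittedEpoch: Option[Long],      // stand-in for the commit log
    offsetsFor: Long => Option[Seq[String]]  // stand-in for the offset log
): (Long, Option[Seq[String]]) =
  latestCommittedEpoch match {
    case Some(epoch) =>
      val next = epoch + 1
      val offsets = offsetsFor(next).getOrElse(
        sys.error(s"Offset log has no entry for epoch $next"))
      (next, Some(offsets)) // resume: currentBatchId = committed epoch + 1
    case None =>
      (0L, None) // brand-new query: start from epoch 0
  }
```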
### Committing Epoch (commit Internal Method)

```scala
commit(
  epoch: Long): Unit
```
In essence, commit adds the given epoch to the commit log and the committedOffsets internal registry, and requests the single ContinuousReader to commit the corresponding offsets. In the end, commit removes old log entries from the offset and commit logs (to keep spark.sql.streaming.minBatchesToRetain entries only).
At this point, commit may simply return when the stream execution thread is no longer alive (died).

commit adds the single ContinuousReader (with the offsets deserialized for the given epoch) to the committedOffsets internal registry.

commit requests the single ContinuousReader to commit the (deserialized) offsets.
commit supports only one continuous source (registered in the continuousSources internal registry).
commit asserts that the given epoch is available in the offsetLog internal registry (i.e. the offset for the given epoch has been reported before).
commit is used when EpochCoordinator is requested to commitEpoch.
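A hedged sketch of the retention step (mutable maps stand in for the offset and commit logs; minBatchesToRetain mirrors spark.sql.streaming.minBatchesToRetain): after committing an epoch, entries older than epoch + 1 - minBatchesToRetain are dropped from both logs.

```scala
import scala.collection.mutable

def purgeLogsSketch(
    offsetLog: mutable.Map[Long, String], // stand-in: epoch -> offsets JSON
    commitLog: mutable.Map[Long, String], // stand-in: epoch -> commit metadata
    epoch: Long,
    minBatchesToRetain: Int): Unit = {
  val threshold = epoch + 1 - minBatchesToRetain
  offsetLog.retain((e, _) => e >= threshold)
  commitLog.retain((e, _) => e >= threshold)
}
```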
### addOffset Internal Method

```scala
addOffset(
  epoch: Long,
  reader: ContinuousReader,
  partitionOffsets: Seq[PartitionOffset]): Unit
```

addOffset requests the given ContinuousReader to mergeOffsets (with the given PartitionOffsets) and registers the merged ("global") offset with the given epoch in the offset log.
If the offsets at the current and previous epochs are the same, addOffset turns the noNewData internal flag on.
addOffset supports exactly one continuous source.
addOffset is used when EpochCoordinator is requested to handle a ReportPartitionOffset message.
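A hedged sketch of the "no new data" comparison (offsetAt is a hypothetical accessor returning the JSON offsets recorded in the offset log for an epoch):

```scala
def noNewDataSketch(offsetAt: Long => Option[String], epoch: Long): Boolean =
  (offsetAt(epoch), offsetAt(epoch - 1)) match {
    case (Some(current), Some(previous)) => current == previous // no new data
    case _                               => false
  }
```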
## Analyzed Logical Plan of Streaming Query
logicalPlan transforms the analyzed logical plan as follows:

1. For every StreamingRelationV2 leaf logical operator with a ContinuousReadSupport source, logicalPlan looks it up for the corresponding ContinuousExecutionRelation (if available in the internal lookup registry) or creates a ContinuousExecutionRelation (with the ContinuousReadSupport source, the options and the output attributes of the StreamingRelationV2 operator)

2. For any other StreamingRelationV2, logicalPlan throws an UnsupportedOperationException:

```text
Data source [name] does not support continuous processing.
```
logicalPlan is part of the StreamExecution abstraction.
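A hedged sketch of the lookup-or-create pattern described above (the case classes are made-up placeholders for the Catalyst operators): the first time a relation is seen, it is converted and cached, so later evaluations of logicalPlan reuse the same ContinuousExecutionRelation, while unsupported sources fail with the error above.

```scala
import scala.collection.mutable

case class RelationV2(name: String, supportsContinuous: Boolean)
case class ContinuousRelation(name: String)

// stand-in for the internal lookup registry
val cache = mutable.Map.empty[RelationV2, ContinuousRelation]

def resolve(r: RelationV2): ContinuousRelation =
  if (r.supportsContinuous)
    cache.getOrElseUpdate(r, ContinuousRelation(r.name))
  else
    throw new UnsupportedOperationException(
      s"Data source ${r.name} does not support continuous processing.")
```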
## Creating Instance

ContinuousExecution takes the following when created:
- [[name]] The name of the structured query
- [[checkpointRoot]] Path to the checkpoint directory (aka metadata directory)
- [[analyzedPlan]] Analyzed logical query plan (LogicalPlan)
- [[sink]] StreamWriteSupport sink
- [[trigger]] Trigger
- [[outputMode]] OutputMode
- [[extraOptions]] Options (Map[String, String])
- [[deleteCheckpointOnStop]] deleteCheckpointOnStop flag to control whether to delete the checkpoint directory on stop
ContinuousExecution is created when StreamingQueryManager is requested to create a streaming query with a StreamWriteSupport sink and a ContinuousTrigger (when DataStreamWriter is requested to start an execution of the streaming query).
## Stopping Stream Processing
stop is part of the StreamingQuery abstraction.
stop transitions the streaming query to TERMINATED state.
If the queryExecutionThread is alive (i.e. it has been started and has not yet died), stop interrupts it and waits for this thread to die.
In the end, stop prints out the following INFO message to the logs:

```text
Query [prettyIdString] was stopped
```
prettyIdString is in the format of queryName [id = [id], runId = [runId]].
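A minimal sketch of that interrupt-and-join shutdown (plain JVM threading; stopSketch is a made-up helper):

```scala
def stopSketch(queryExecutionThread: Thread, prettyIdString: String): Unit = {
  if (queryExecutionThread.isAlive) {
    queryExecutionThread.interrupt()
    queryExecutionThread.join() // wait for the thread to die
  }
  println(s"Query $prettyIdString was stopped") // INFO in the real code
}
```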
### awaitEpoch Internal Method
```scala
awaitEpoch(epoch: Long): Unit
```
awaitEpoch seems to be used exclusively in tests.
As asserted when ContinuousExecution is requested to commit an epoch or add offsets, ContinuousExecution supports one continuous source only in a streaming query. When requested for the available streaming sources, ContinuousExecution simply gives the single ContinuousReader.
```scala
import org.apache.spark.sql.streaming.Trigger
import scala.concurrent.duration._
val sq = spark
  .readStream
  .format("rate")
  .load
  .writeStream
  .format("console")
  .option("truncate", false)
  .trigger(Trigger.Continuous(1.minute)) // <-- Gives ContinuousExecution
  .queryName("rate2console")
  .start

import org.apache.spark.sql.streaming.StreamingQuery
assert(sq.isInstanceOf[StreamingQuery])

// The following gives access to the internals
// And to ContinuousExecution
import org.apache.spark.sql.execution.streaming.StreamingQueryWrapper
val engine = sq.asInstanceOf[StreamingQueryWrapper].streamingQuery

import org.apache.spark.sql.execution.streaming.StreamExecution
assert(engine.isInstanceOf[StreamExecution])

import org.apache.spark.sql.execution.streaming.continuous.ContinuousExecution
val continuousEngine = engine.asInstanceOf[ContinuousExecution]
assert(continuousEngine.trigger == Trigger.Continuous(1.minute))
```
## Logging

Enable ALL logging level for org.apache.spark.sql.execution.streaming.continuous.ContinuousExecution logger to see what happens inside.

Add the following line to conf/log4j.properties:
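```text
log4j.logger.org.apache.spark.sql.execution.streaming.continuous.ContinuousExecution=ALL
```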
Refer to Logging.