
ContinuousExecution

ContinuousExecution is the stream execution engine of Continuous Stream Processing.

ContinuousExecution is <> when StreamingQueryManager is requested to create a streaming query with a <> and a <> (i.e. when DataStreamWriter is requested to start execution of a streaming query).

ContinuousExecution can only run streaming queries whose StreamingRelationV2 leaf logical operators have a ContinuousReadSupport data source.

[[sources]] ContinuousExecution supports exactly one <> in a <> (and asserts it when <> and <>). When requested for the available streaming sources, ContinuousExecution simply gives the one and only <>.

[source, scala]

import org.apache.spark.sql.streaming.Trigger
import scala.concurrent.duration._
val sq = spark
  .readStream
  .format("rate")
  .load
  .writeStream
  .format("console")
  .option("truncate", false)
  .trigger(Trigger.Continuous(1.minute)) // <-- Gives ContinuousExecution
  .queryName("rate2console")
  .start

import org.apache.spark.sql.streaming.StreamingQuery
assert(sq.isInstanceOf[StreamingQuery])

// The following gives access to the internals
// And to ContinuousExecution
import org.apache.spark.sql.execution.streaming.StreamingQueryWrapper
val engine = sq.asInstanceOf[StreamingQueryWrapper].streamingQuery
import org.apache.spark.sql.execution.streaming.StreamExecution
assert(engine.isInstanceOf[StreamExecution])

import org.apache.spark.sql.execution.streaming.continuous.ContinuousExecution
val continuousEngine = engine.asInstanceOf[ContinuousExecution]
assert(continuousEngine.trigger == Trigger.Continuous(1.minute))

When <> (for a streaming query), ContinuousExecution is given the <>. The analyzed logical plan is immediately transformed to include a ContinuousExecutionRelation for every StreamingRelationV2 leaf logical operator with a ContinuousReadSupport data source (and becomes the internal logical plan).

Note

ContinuousExecution uses the same instance of ContinuousExecutionRelation for the same instances of StreamingRelationV2 with ContinuousReadSupport data source.

When requested to <>, ContinuousExecution collects the ContinuousReadSupport data sources (inside ContinuousExecutionRelation operators) from the <> and requests every ContinuousReadSupport to create a ContinuousReader (which are stored in the <> internal registry).

[[EPOCH_COORDINATOR_ID_KEY]] ContinuousExecution uses __epoch_coordinator_id local property for...FIXME

[[START_EPOCH_KEY]] ContinuousExecution uses __continuous_start_epoch local property for...FIXME

[[EPOCH_INTERVAL_KEY]] ContinuousExecution uses __continuous_epoch_interval local property for...FIXME

[[logging]]
[TIP]
====
Enable ALL logging level for org.apache.spark.sql.execution.streaming.continuous.ContinuousExecution logger to see what happens inside.

Add the following line to conf/log4j.properties:

log4j.logger.org.apache.spark.sql.execution.streaming.continuous.ContinuousExecution=ALL

Refer to <>.
====

TriggerExecutor

TriggerExecutor for the Trigger (a ProcessingTimeExecutor for a ContinuousTrigger)

Used when...FIXME

Note

ContinuousExecution throws an IllegalStateException when the Trigger is not a ContinuousTrigger.

=== [[runActivatedStream]] Running Activated Streaming Query -- runActivatedStream Method

[source, scala]

runActivatedStream(sparkSessionForStream: SparkSession): Unit

runActivatedStream simply runs the streaming query in continuous mode as long as the state is ACTIVE.

runActivatedStream is part of StreamExecution abstraction.
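The control flow boils down to a do-while loop. The following is a minimal, runnable sketch of that loop (illustrative only, with stubbed State values and a fake runContinuous; not the actual Spark sources):

[source, scala]

// Illustrative sketch only (stubbed state values, not the Spark sources):
// runActivatedStream keeps re-running runContinuous while the query state
// is ACTIVE; a run that ended for reconfiguration flips back to ACTIVE.
import java.util.concurrent.atomic.AtomicReference

sealed trait State
case object ACTIVE extends State
case object RECONFIGURING extends State
case object TERMINATED extends State

val state = new AtomicReference[State](ACTIVE)
var runs = 0

// stand-in for runContinuous: first run ends to reconfigure, second terminates
def runContinuous(): Unit = {
  runs += 1
  state.set(if (runs == 1) RECONFIGURING else TERMINATED)
}

do {
  runContinuous()
  state.compareAndSet(RECONFIGURING, ACTIVE)  // resume after reconfiguration
} while (state.get() == ACTIVE)

assert(runs == 2)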

Running Streaming Query in Continuous Mode

runContinuous(
  sparkSessionForQuery: SparkSession): Unit

runContinuous initializes the <> internal registry by traversing the <> to find ContinuousExecutionRelation leaf logical operators and requests their ContinuousReadSupport data sources to create a ContinuousReader (with the sources metadata directory under the checkpoint directory).

runContinuous initializes the uniqueSources internal registry to be the distinct <>.

runContinuous <> (they may or may not be available).

runContinuous transforms the <>. For every ContinuousExecutionRelation, runContinuous finds the corresponding <> (in the <>), requests it to <> (from their JSON representation), and then <>. In the end, runContinuous creates a StreamingDataSourceV2Relation (with the read schema of the ContinuousReader and the ContinuousReader itself).

runContinuous rewires the transformed plan (with the StreamingDataSourceV2Relation) to use the new attributes from the source (the reader).

Important

CurrentTimestamp and CurrentDate expressions are not supported for continuous processing.
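For example, adding current_timestamp to the rate2console query above makes it fail at analysis time (a hedged example; the exact exception message may differ across Spark versions):

[source, scala]

// Hedged example: current_timestamp is rejected for continuous processing
// at analysis time (exact message may vary by Spark version).
import org.apache.spark.sql.functions.current_timestamp
import org.apache.spark.sql.streaming.Trigger
import scala.concurrent.duration._

spark
  .readStream
  .format("rate")
  .load
  .withColumn("now", current_timestamp())
  .writeStream
  .format("console")
  .trigger(Trigger.Continuous(1.minute))
  .start
// org.apache.spark.sql.AnalysisException:
//   Continuous processing does not support current time operations.;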

runContinuous...FIXME

runContinuous then finds the one and only ContinuousReader (of the one and only StreamingDataSourceV2Relation) in the query plan (with the WriteToContinuousDataSource on top).

In queryPlanning time-tracking section, runContinuous creates an IncrementalExecution (that becomes the lastExecution) that is immediately executed (the entire query execution pipeline is executed up to and including executedPlan).

runContinuous sets the following local properties:

* __continuous_start_epoch (<>) as the currentBatchId
* __epoch_coordinator_id (<>) as the currentEpochCoordinatorId
* __continuous_epoch_interval (<>) as the interval of the ContinuousTrigger

runContinuous uses the EpochCoordinatorRef helper to <> (with the <>, the <>, and the currentBatchId).

NOTE: The <> runs on the driver as the single point to coordinate epochs across partition tasks.

runContinuous creates a daemon <> and starts it immediately.

[[runContinuous-runContinuous]] In runContinuous time-tracking section, runContinuous requests the physical query plan (of the IncrementalExecution) to execute (that simply requests the physical operator to doExecute and generate an RDD[InternalRow]).

runContinuous is used when ContinuousExecution is requested to <>.

==== [[runContinuous-epoch-update-thread]] Epoch Update Thread

runContinuous creates an epoch update thread that...FIXME

==== [[getStartOffsets]] Getting Start Offsets From Checkpoint -- getStartOffsets Internal Method

[source, scala]

getStartOffsets(sparkSessionToRunBatches: SparkSession): OffsetSeq

getStartOffsets...FIXME

NOTE: getStartOffsets is used exclusively when ContinuousExecution is requested to <>.

=== [[commit]] Committing Epoch -- commit Method

[source, scala]

commit(epoch: Long): Unit

In essence, commit adds the given epoch to the commit log and the committedOffsets, and requests the <> to <>. In the end, commit removes old log entries from the offset and commit logs (to keep spark.sql.streaming.minBatchesToRetain entries only).

Internally, commit records trigger offsets (recordTriggerOffsets), with the committedOffsets and availableOffsets as the from and to offsets, respectively.

At this point, commit may simply return when the stream execution thread is no longer alive (died).

commit requests the commit log to store metadata for the epoch.

commit requests the single <> to <> for the epoch (from the offset write-ahead log).

commit adds the single <> and the offset (for the epoch) to the committedOffsets registry.

commit requests the single <> to <>.

commit requests the offset and commit logs to remove log entries to keep spark.sql.streaming.minBatchesToRetain only.

commit then acquires the awaitProgressLock, wakes up all threads waiting for the awaitProgressLockCondition and in the end releases the awaitProgressLock.

NOTE: commit supports only one continuous source (registered in the <> internal registry).

commit asserts that the given epoch is available in the offsetLog internal registry (i.e. the offset for the given epoch has been reported before).

commit is used when EpochCoordinator is requested to commitEpoch.
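The following is a simplified, runnable model of that flow (stubbed in-memory logs, a hypothetical CommitSketch class and Offset type; not the actual Spark internals):

[source, scala]

// Simplified model of the commit flow (stubbed in-memory logs, single
// source; not the actual Spark internals).
import scala.collection.mutable

case class Offset(json: String)

class CommitSketch(minBatchesToRetain: Int) {
  val offsetLog = mutable.SortedMap[Long, Offset]()  // epoch -> start offset
  val commitLog = mutable.SortedMap[Long, String]()  // epoch -> commit metadata
  var committedOffset: Option[Offset] = None         // the committedOffsets registry

  def commit(epoch: Long): Unit = {
    // the offset for the epoch must have been reported (addOffset) before
    assert(offsetLog.contains(epoch), s"offset for epoch $epoch not reported")
    commitLog(epoch) = "{}"                          // store metadata for the epoch
    val offset = offsetLog(epoch)
    committedOffset = Some(offset)                   // mark the epoch's offset committed
    // the source would be requested to commit(offset) here, i.e. it may
    // safely discard data up to that offset
    // keep only the last minBatchesToRetain entries in either log
    offsetLog.keys.toSeq.dropRight(minBatchesToRetain).foreach(offsetLog.remove)
    commitLog.keys.toSeq.dropRight(minBatchesToRetain).foreach(commitLog.remove)
  }
}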

=== [[addOffset]] addOffset Method

[source, scala]

addOffset(
  epoch: Long,
  reader: ContinuousReader,
  partitionOffsets: Seq[PartitionOffset]): Unit


In essence, addOffset requests the given <> to <> (with the given PartitionOffsets) and then requests the OffsetSeqLog to register the offset with the given epoch.

Internally, addOffset requests the given <> to <> (with the given PartitionOffsets) and to get the current "global" offset back.

addOffset then requests the OffsetSeqLog to add the current "global" offset for the given epoch.

addOffset requests the OffsetSeqLog for the offset at the previous epoch.

If the offsets at the current and previous epochs are the same, addOffset turns the noNewData internal flag on.

addOffset then acquires the awaitProgressLock, wakes up all threads waiting for the awaitProgressLockCondition and in the end releases the awaitProgressLock.

NOTE: addOffset supports exactly one <>.

addOffset is used when EpochCoordinator is requested to <>.
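The following is a simplified, runnable model of that flow (a hypothetical AddOffsetSketch class with stubbed Offset and PartitionOffset types and an in-memory offset log; not the actual Spark internals):

[source, scala]

// Simplified model of addOffset (stubbed types, in-memory offset log;
// not the actual Spark internals).
import scala.collection.mutable

case class PartitionOffset(partition: Int, value: Long)
case class Offset(json: String)

class AddOffsetSketch {
  val offsetLog = mutable.SortedMap[Long, Offset]()
  var noNewData = false

  // stand-in for ContinuousReader.mergeOffsets: one global offset from partition offsets
  def mergeOffsets(offsets: Seq[PartitionOffset]): Offset =
    Offset(offsets.sortBy(_.partition).map(_.value).mkString("[", ",", "]"))

  def addOffset(epoch: Long, partitionOffsets: Seq[PartitionOffset]): Unit = {
    val globalOffset = mergeOffsets(partitionOffsets)
    offsetLog(epoch) = globalOffset                   // register offset for the epoch
    if (offsetLog.get(epoch - 1).contains(globalOffset))
      noNewData = true                                // same offset as previous epoch
  }
}

val sketch = new AddOffsetSketch
sketch.addOffset(0, Seq(PartitionOffset(0, 10)))
sketch.addOffset(1, Seq(PartitionOffset(0, 10)))      // no progress since epoch 0
assert(sketch.noNewData)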

Analyzed Logical Plan of Streaming Query

logicalPlan: LogicalPlan

logicalPlan resolves StreamingRelationV2 leaf logical operators (with a ContinuousReadSupport source) to ContinuousExecutionRelation leaf logical operators.

Internally, logicalPlan transforms the <> as follows:

. For every StreamingRelationV2 leaf logical operator with a ContinuousReadSupport source, logicalPlan looks it up for the corresponding ContinuousExecutionRelation (if available in the internal lookup registry) or creates a ContinuousExecutionRelation (with the ContinuousReadSupport source, the options and the output attributes of the StreamingRelationV2 operator)

. For any other StreamingRelationV2, logicalPlan throws an UnsupportedOperationException: +

Data source [name] does not support continuous processing.

logicalPlan is part of the StreamExecution abstraction.
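The instance-caching behaviour (reusing the same ContinuousExecutionRelation for the same StreamingRelationV2, as noted earlier) boils down to a getOrElseUpdate on a lookup map. A minimal sketch with stand-in case classes (not the actual Catalyst operators):

[source, scala]

// Simplified stand-ins for the Catalyst operators (illustrative only):
// the same ContinuousExecutionRelation instance is reused for the same
// StreamingRelationV2 thanks to getOrElseUpdate on a lookup map.
import scala.collection.mutable

case class StreamingRelationV2(sourceName: String, supportsContinuousRead: Boolean)
case class ContinuousExecutionRelation(sourceName: String)

val toExecutionRelationMap =
  mutable.Map[StreamingRelationV2, ContinuousExecutionRelation]()

def resolve(relation: StreamingRelationV2): ContinuousExecutionRelation =
  if (relation.supportsContinuousRead)
    toExecutionRelationMap.getOrElseUpdate(
      relation, ContinuousExecutionRelation(relation.sourceName))
  else
    throw new UnsupportedOperationException(
      s"Data source ${relation.sourceName} does not support continuous processing.")

val rate = StreamingRelationV2("rate", supportsContinuousRead = true)
assert(resolve(rate) eq resolve(rate))  // same instance for the same operator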

Creating Instance

ContinuousExecution takes the following when created:

  • [[sparkSession]] SparkSession
  • [[name]] The name of the structured query
  • [[checkpointRoot]] Path to the checkpoint directory (aka metadata directory)
  • [[analyzedPlan]] Analyzed logical query plan (LogicalPlan)
  • [[trigger]] Trigger
  • [[triggerClock]] Clock
  • [[outputMode]] OutputMode
  • [[extraOptions]] Options (Map[String, String])
  • [[deleteCheckpointOnStop]] deleteCheckpointOnStop flag to control whether to delete the checkpoint directory on stop

=== [[stop]] Stopping Stream Processing (Execution of Streaming Query) -- stop Method

[source, scala]

stop(): Unit

NOTE: stop is part of the <> to stop a streaming query.

stop transitions the streaming query to TERMINATED state.

If the queryExecutionThread is alive (i.e. it has been started and has not yet died), stop interrupts it and waits for this thread to die.

In the end, stop prints out the following INFO message to the logs:

Query [prettyIdString] was stopped

Note

prettyIdString is in the format of queryName [id = [id], runId = [runId]].
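A runnable mini-demo of the interrupt-and-join pattern that stop relies on (illustrative only; the terminated flag stands in for the TERMINATED state transition):

[source, scala]

// Illustrative only: the interrupt-and-join pattern stop relies on.
@volatile var terminated = false

val queryExecutionThread = new Thread(() => {
  try while (!terminated) Thread.sleep(100)
  catch { case _: InterruptedException => () }  // interrupt unblocks the sleep
})
queryExecutionThread.start()

def stop(): Unit = {
  terminated = true                      // transition to TERMINATED
  if (queryExecutionThread.isAlive) {
    queryExecutionThread.interrupt()     // unblock whatever the thread waits on
    queryExecutionThread.join()          // wait for the thread to die
  }
  println("Query was stopped")
}

stop()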

=== [[awaitEpoch]] awaitEpoch Internal Method

[source, scala]

awaitEpoch(epoch: Long): Unit

awaitEpoch...FIXME

NOTE: awaitEpoch seems to be used exclusively in tests.

=== [[internal-properties]] Internal Properties

[cols="30m,70",options="header",width="100%"] |=== | Name | Description

| continuousSources
a| [[continuousSources]]

[source, scala]

continuousSources: Seq[ContinuousReader]

Registry of <> (in the <>)

As asserted in <> and <>, there can only be exactly one ContinuousReader registered.

Used when ContinuousExecution is requested to <>, <>, and <>

Use <> to access the current value

| currentEpochCoordinatorId
a| [[currentEpochCoordinatorId]] FIXME

Used when...FIXME

|===