
StreamExecution

StreamExecution is an abstraction of stream execution engines (streaming query processing engines) that can run a structured query (on a stream execution thread).

Figure: Creating Instance of StreamExecution

Note

Continuous query, streaming query, continuous Dataset and streaming Dataset are all high-level synonyms for an executable entity that stream execution engines run (using the analyzed logical plan internally).

Important

StreamExecution does not support adaptive query execution and cost-based optimizer (and turns them off when requested to run stream processing).

StreamExecution is the execution environment of a streaming query that is executed every trigger and in the end adds the results to a sink.

StreamExecution corresponds to a single streaming query with one or more streaming sources and exactly one streaming sink.

import org.apache.spark.sql.streaming.Trigger
import scala.concurrent.duration._
val q = spark.
  readStream.
  format("rate").
  load.
  writeStream.
  format("console").
  trigger(Trigger.ProcessingTime(10.minutes)).
  start
scala> :type q
org.apache.spark.sql.streaming.StreamingQuery

// Pull out StreamExecution from the StreamingQueryWrapper
import org.apache.spark.sql.execution.streaming.{StreamExecution, StreamingQueryWrapper}
val se = q.asInstanceOf[StreamingQueryWrapper].streamingQuery
scala> :type se
org.apache.spark.sql.execution.streaming.StreamExecution

Contract

Logical Plan

logicalPlan: LogicalPlan

Analyzed logical plan of the streaming query to execute

Used when StreamExecution is requested to run stream processing

logicalPlan is part of the ProgressReporter abstraction.

Running Activated Streaming Query

runActivatedStream(
  sparkSessionForStream: SparkSession): Unit

Executes (runs) the activated streaming query (that is described by the logical plan)

Used when StreamExecution is requested to run the streaming query (when transitioning from INITIALIZING to ACTIVE state)

Implementations

  • ContinuousExecution
  • MicroBatchExecution

Creating Instance

StreamExecution takes the following to be created:

  • SparkSession
  • Name of the streaming query (can be null)
  • Path of the checkpoint directory (metadata directory)
  • Streaming query (not used due to logicalPlan)
  • Table (Spark SQL)
  • Trigger
  • Clock
  • OutputMode
  • deleteCheckpointOnStop flag (whether to delete the checkpoint directory on stop)

Abstract Class

StreamExecution is an abstract class and cannot be created directly. It is created indirectly as one of the concrete implementations (listed above).

Demo

// sq is assumed to be a streaming query started with DataStreamWriter.start
import org.apache.spark.sql.streaming.StreamingQuery
assert(sq.isInstanceOf[StreamingQuery])

import org.apache.spark.sql.execution.streaming.StreamingQueryWrapper
val se = sq.asInstanceOf[StreamingQueryWrapper].streamingQuery

scala> :type se
org.apache.spark.sql.execution.streaming.StreamExecution

Configuration Properties

spark.sql.streaming.minBatchesToRetain

StreamExecution uses the spark.sql.streaming.minBatchesToRetain configuration property to allow the StreamExecutions to discard old log entries (from the offset and commit logs).

spark.sql.streaming.pollingDelay

StreamExecution uses spark.sql.streaming.pollingDelay configuration property to control how long to delay polling for new data (when no data was available to process in a batch).
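Both properties can be adjusted on the current session, e.g. (the values below are illustrative, not recommendations):

// Assumes an active SparkSession in scope as spark
spark.conf.set("spark.sql.streaming.minBatchesToRetain", 100) // log entries to keep
spark.conf.set("spark.sql.streaming.pollingDelay", 10L)       // milliseconds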

ProgressReporter

StreamExecution is a ProgressReporter and reports status of the streaming query (when it starts, progresses and terminates) by posting StreamingQueryListener events.

import org.apache.spark.sql.streaming.Trigger
import scala.concurrent.duration._
val sq = spark
  .readStream
  .text("server-logs")
  .writeStream
  .format("console")
  .queryName("debug")
  .trigger(Trigger.ProcessingTime(20.seconds))
  .start

// Enable the log level to see the INFO and DEBUG messages
// log4j.logger.org.apache.spark.sql.execution.streaming.StreamExecution=DEBUG

17/06/18 21:21:07 INFO StreamExecution: Starting new streaming query.
17/06/18 21:21:07 DEBUG StreamExecution: getOffset took 5 ms
17/06/18 21:21:07 DEBUG StreamExecution: Stream running from {} to {}
17/06/18 21:21:07 DEBUG StreamExecution: triggerExecution took 9 ms
17/06/18 21:21:07 DEBUG StreamExecution: Execution stats: ExecutionStats(Map(),List(),Map())
17/06/18 21:21:07 INFO StreamExecution: Streaming query made progress: {
  "id" : "8b57b0bd-fc4a-42eb-81a3-777d7ba5e370",
  "runId" : "920b227e-6d02-4a03-a271-c62120258cea",
  "name" : "debug",
  "timestamp" : "2017-06-18T19:21:07.693Z",
  "numInputRows" : 0,
  "processedRowsPerSecond" : 0.0,
  "durationMs" : {
    "getOffset" : 5,
    "triggerExecution" : 9
  },
  "stateOperators" : [ ],
  "sources" : [ {
    "description" : "FileStreamSource[file:/Users/jacek/dev/oss/spark/server-logs]",
    "startOffset" : null,
    "endOffset" : null,
    "numInputRows" : 0,
    "processedRowsPerSecond" : 0.0
  } ],
  "sink" : {
    "description" : "org.apache.spark.sql.execution.streaming.ConsoleSink@2460208a"
  }
}
17/06/18 21:21:10 DEBUG StreamExecution: Starting Trigger Calculation
17/06/18 21:21:10 DEBUG StreamExecution: getOffset took 3 ms
17/06/18 21:21:10 DEBUG StreamExecution: triggerExecution took 3 ms
17/06/18 21:21:10 DEBUG StreamExecution: Execution stats: ExecutionStats(Map(),List(),Map())

Unique Streaming Sources

StreamExecution tracks unique streaming data sources in uniqueSources internal registry.

Figure: StreamExecution's uniqueSources Registry of Streaming Data Sources

uniqueSources is used to poll the sources for new data (offsets) and to stop them when the streaming query terminates.

Streaming Query Identifiers

The name, id and runId are all unique across all active queries (in a StreamingQueryManager). The difference is that:

  • name is optional and user-defined

  • id is a UUID that is auto-generated at the time StreamExecution is created and persisted to metadata checkpoint file

  • runId is a UUID that is auto-generated every time StreamExecution is created

Id

StreamExecution is uniquely identified by an ID of the streaming query (which is the id of the StreamMetadata).

Since the StreamMetadata is persisted (to the metadata file in the checkpoint directory), the streaming query ID "survives" query restarts as long as the checkpoint directory is preserved.

Run Id

StreamExecution is uniquely identified by a run ID of the streaming query. A run ID is a randomly-generated 128-bit universally unique identifier (UUID) that is assigned at the time StreamExecution is created.

runId does not "survive" query restarts and will always be different yet unique (across all active queries).
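The difference is easy to observe on the public StreamingQuery handle (sq is assumed to be a started streaming query, as in the demos on this page):

val queryId = sq.id       // java.util.UUID, persisted in the checkpoint, survives restarts
val queryRunId = sq.runId // java.util.UUID, regenerated on every (re)start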

StreamMetadata

StreamExecution uses a StreamMetadata that is persisted in the metadata file in the checkpoint directory.

If the metadata file is available, it is read to recover the id of the streaming query when resumed (i.e. restarted after a failure or a planned stop).

Metadata Logs

Write-Ahead Offset Log

offsetLog: OffsetSeqLog

offsetLog is a Hadoop DFS-based metadata storage (of OffsetSeqs) in the offsets metadata directory (under the checkpoint root).

offsetLog is used as a Write-Ahead Log of Offsets to persist offsets of the data about to be processed in every trigger.

Tip

Monitor offsets and commits metadata logs to know the progress of a streaming query.

The number of entries in the OffsetSeqLog is controlled using spark.sql.streaming.minBatchesToRetain configuration property.

offsetLog is used by the MicroBatchExecution and ContinuousExecution stream execution engines to persist the offsets of a batch before processing it and to recover the offsets on restart.
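Since the log is just files under the checkpoint directory, it can be inspected with the (internal) OffsetSeqLog API. A minimal sketch, assuming an existing checkpoint at an illustrative path:

import org.apache.spark.sql.execution.streaming.OffsetSeqLog

// Point at the offsets directory of an existing checkpoint
val offsetLog = new OffsetSeqLog(spark, "/tmp/checkpoints/my-query/offsets")

// The latest entry is the most recently planned (not necessarily committed) batch
offsetLog.getLatest().foreach { case (batchId, offsetSeq) =>
  println(s"batch $batchId: $offsetSeq")
}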

Offset Commit Log

StreamExecution uses an offset commit log (CommitLog in the commits metadata checkpoint directory) to record the streaming batches successfully executed (a single file per batch, with the file name being the batch id) or the committed epochs.

Note

Metadata log or metadata checkpoint are synonyms and are often used interchangeably.

commitLog is used by the stream execution engines for the following:

  • MicroBatchExecution is requested to run an activated streaming query (that in turn requests the start offsets at the very beginning of the streaming query execution and later adds an entry regularly every single batch)

  • ContinuousExecution is requested to run an activated streaming query in continuous mode (that in turn requests the start offsets at the very beginning of the streaming query execution and later adds an entry regularly every commit)

State of Streaming Query

state: AtomicReference[State]

state indicates the internal state of execution of the streaming query (as java.util.concurrent.atomic.AtomicReference).

ACTIVE

StreamExecution has been requested to run stream processing (and is about to run the activated streaming query)

INITIALIZING

StreamExecution has been created.

TERMINATED

Indicates that the streaming query has terminated (successfully, after a stop request, or due to an error)

RECONFIGURING

Used when ContinuousExecution is requested to run a streaming query in continuous mode (and the ContinuousReader indicated a need for reconfiguration)

Creating StreamingWrite

createStreamingWrite(
  table: SupportsWrite,
  options: Map[String, String],
  inputPlan: LogicalPlan): StreamingWrite

createStreamingWrite creates a LogicalWriteInfoImpl (with the query ID, the schema of the input LogicalPlan and the given options).

createStreamingWrite requests the given SupportsWrite table for a WriteBuilder (for the LogicalWriteInfoImpl).

Tip

Learn more about SupportsWrite and WriteBuilder in The Internals of Spark SQL online book.

createStreamingWrite branches based on the OutputMode:

  • For Append output mode, createStreamingWrite requests the WriteBuilder to build a StreamingWrite.

  • For Complete output mode, createStreamingWrite assumes that the WriteBuilder is a SupportsTruncate and requests it to truncate followed by buildForStreaming

  • For Update output mode, createStreamingWrite assumes that the WriteBuilder is a SupportsStreamingUpdate and requests it to update followed by buildForStreaming

Tip

Learn more about SupportsTruncate and SupportsStreamingUpdate in The Internals of Spark SQL online book.

createStreamingWrite is used when MicroBatchExecution and ContinuousExecution stream execution engines are requested for analyzed logical plans.
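The OutputMode dispatch above can be sketched as follows (a simplified rendition, not the exact source; SupportsStreamingUpdate is an internal Spark trait):

import org.apache.spark.sql.connector.write.{SupportsTruncate, WriteBuilder}
import org.apache.spark.sql.connector.write.streaming.StreamingWrite
import org.apache.spark.sql.execution.streaming.sources.SupportsStreamingUpdate
import org.apache.spark.sql.streaming.OutputMode

def buildStreamingWrite(
    writeBuilder: WriteBuilder,
    outputMode: OutputMode): StreamingWrite =
  if (outputMode == OutputMode.Append()) {
    writeBuilder.buildForStreaming()
  } else if (outputMode == OutputMode.Complete()) {
    // assumes the WriteBuilder supports truncation
    writeBuilder.asInstanceOf[SupportsTruncate].truncate().buildForStreaming()
  } else {
    // Update output mode; assumes the WriteBuilder supports streaming update
    writeBuilder.asInstanceOf[SupportsStreamingUpdate].update().buildForStreaming()
  }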

Available Offsets (StreamProgress)

availableOffsets: StreamProgress

availableOffsets is a registry of offsets per streaming source to track what data (by offset) is available for processing for every streaming source in the streaming query (and has not yet been committed).

availableOffsets works in tandem with the committedOffsets internal registry.

availableOffsets is empty when StreamExecution is created (i.e. no offsets are reported for any streaming source in the streaming query).

availableOffsets is used by the MicroBatchExecution and ContinuousExecution stream execution engines while constructing and running streaming batches.

Committed Offsets (StreamProgress)

committedOffsets: StreamProgress

committedOffsets is a registry of offsets per streaming source to track what data (by offset) has already been processed and committed (to the sink or state stores) for every streaming source in the streaming query.

committedOffsets works in tandem with the availableOffsets internal registry.

committedOffsets is used when:

  • MicroBatchExecution stream execution engine is requested for the start offsets (from the checkpoint), to construct the next streaming micro-batch and to run a single streaming micro-batch
  • ContinuousExecution stream execution engine is requested for the start offsets and to commit an epoch
  • StreamExecution is requested for the internal string representation

Fully-Qualified (Resolved) Path to Checkpoint Root Directory

resolvedCheckpointRoot: String

resolvedCheckpointRoot is a fully-qualified path of the given checkpoint root directory.

The given checkpoint root directory is defined using the checkpointLocation option or the spark.sql.streaming.checkpointLocation configuration property (with the queryName option appended).

checkpointLocation and queryName options are defined when StreamingQueryManager is requested to create a streaming query.
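For example (the paths and the query name are illustrative):

// Option 1: a per-query checkpoint location
val sq = spark.readStream
  .format("rate").load
  .writeStream
  .format("console")
  .queryName("my-query")
  .option("checkpointLocation", "/tmp/checkpoints/my-query")
  .start

// Option 2: a shared root directory (the queryName is appended to it)
spark.conf.set("spark.sql.streaming.checkpointLocation", "/tmp/checkpoints")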

resolvedCheckpointRoot is used when creating the path to the checkpoint directory and when StreamExecution finishes running streaming batches.

resolvedCheckpointRoot is used for the logicalPlan (while transforming the analyzedPlan and planning StreamingRelation logical operators into the corresponding StreamingExecutionRelation operators, with the streaming data sources created with the path to the sources directory, under the checkpoint root, to store checkpointing metadata).

resolvedCheckpointRoot is printed out immediately when resolved as an INFO message to the logs:

Checkpoint root [checkpointRoot] resolved to [resolvedCheckpointRoot].

resolvedCheckpointRoot is printed out again as an INFO message to the logs when StreamExecution is started:

Starting [prettyIdString]. Use [resolvedCheckpointRoot] to store the query checkpoint.

StreamWriterCommitProgress

sinkCommitProgress: Option[StreamWriterCommitProgress]

sinkCommitProgress is part of the ProgressReporter abstraction.

StreamExecution initializes sinkCommitProgress registry to be None when created.

Last Query Execution Of Streaming Query (IncrementalExecution)

lastExecution: IncrementalExecution

lastExecution is part of the ProgressReporter abstraction.

lastExecution is an IncrementalExecution (a QueryExecution of a streaming query) of the most recent (last) execution.

lastExecution is created when the stream execution engines are requested for the following:

  • MicroBatchExecution is requested to run a single streaming micro-batch (when in the queryPlanning phase)

  • ContinuousExecution stream execution engine is requested to run a streaming query in continuous mode (when in the queryPlanning phase)

lastExecution is used for query progress reporting and when StreamExecution is requested to explain the streaming query (among other uses).

Explaining Streaming Query

explain(): Unit                  // turns the extended flag off (false)
explain(extended: Boolean): Unit

explain simply prints out the result of explainInternal (for the extended flag) to the standard output.
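For example, on the StreamingQuery handle from the demos above:

sq.explain()                // extended flag off
sq.explain(extended = true) // logical and physical plans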

Stopping Streaming Sources and Readers

stopSources(): Unit

stopSources requests every streaming source to stop.

In case of a non-fatal exception, stopSources prints out the following WARN message to the logs:

Failed to stop streaming source: [source]. Resources may have leaked.

stopSources is used when:

  • StreamExecution is requested to run stream processing (and it terminates, successfully or not)

  • ContinuousExecution is requested to run a streaming query in continuous mode (and it terminates)

Running Stream Processing

runStream(): Unit

runStream simply prepares the environment to execute the activated streaming query.

runStream is used when the stream execution thread is requested to start (when DataStreamWriter is requested to start an execution of the streaming query).

Internally, runStream sets the job group (for all the Spark jobs started by this thread) as follows:

  • runId for the job group ID
  • getBatchDescriptionString for the job group description
  • interruptOnCancel flag enabled

Note

runStream uses the SparkSession to access SparkContext and assign the job group id.

Learn more about SparkContext.setJobGroup method in The Internals of Apache Spark online book.

runStream sets sql.streaming.queryId local property to id.

runStream requests the MetricsSystem to register the MetricsReporter when spark.sql.streaming.metricsEnabled configuration property is enabled.

runStream notifies StreamingQueryListeners that the streaming query has been started (by posting a new QueryStartedEvent event with id, runId, and name).

Figure: StreamingQueryListener Notified about Query's Start (onQueryStarted)

runStream unblocks the main starting thread (by decrementing the count of the startLatch that when 0 lets the starting thread continue).

runStream updates the status message to be Initializing sources.

runStream initializes the analyzed logical plan.

Lazy Value

The analyzed logical plan is a Scala lazy value to guarantee that the code to initialize it is executed once only (when accessed for the first time) and cached afterwards.

runStream disables adaptive query execution and cost-based join optimization (by turning spark.sql.adaptive.enabled and spark.sql.cbo.enabled configuration properties off, respectively).
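In effect, the streaming session runs with the following settings (a sketch of what runStream enforces on the dedicated streaming session internally):

spark.conf.set("spark.sql.adaptive.enabled", "false")
spark.conf.set("spark.sql.cbo.enabled", "false")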

runStream creates a new "zero" OffsetSeqMetadata.

(when in INITIALIZING state) runStream enters ACTIVE state:

  • Decrements the count of the initializationLatch
  • Runs the activated streaming query (runActivatedStream)

Note

runStream does the main work only when first started (i.e. when in INITIALIZING state).

runStream...FIXME (describe the failed and stop states)

Once TriggerExecutor has finished executing batches, runStream updates the status message to Stopped.

NOTE: TriggerExecutor finishes executing batches when the batch runner reports that the streaming query is no longer active.

finally Block

runStream releases the startLatch and initializationLatch latches.

runStream stops the streaming sources (stopSources).

runStream enters TERMINATED state.

runStream sets the StreamingQueryStatus with the isTriggerActive and isDataAvailable flags off (false).

runStream removes the stream metrics reporter from the application's MetricsSystem.

runStream requests the StreamingQueryManager to handle termination of a streaming query.

runStream creates a new QueryTerminatedEvent (with the id and run id of the streaming query) and posts it.

With the deleteCheckpointOnStop flag enabled and no StreamingQueryException, runStream deletes the checkpoint directory.

In the end, runStream releases the terminationLatch latch.

TriggerExecutor's Batch Runner

Batch Runner (batchRunner) is an executable block executed by TriggerExecutor in runStream.

batchRunner starts trigger calculation.

As long as the query is not stopped (i.e. state is not TERMINATED), batchRunner executes the streaming batch for the trigger.

In the triggerExecution time-tracking section, batchRunner branches off based on currentBatchId: for the first trigger after start or restart (a negative currentBatchId), it populates the start offsets from the checkpoint; otherwise, it constructs the next streaming micro-batch.

If there is data available in the sources, batchRunner marks currentStatus with isDataAvailable enabled.

Tip

You can check out the status of a streaming query using status method.

scala> spark.streams.active(0).status
res1: org.apache.spark.sql.streaming.StreamingQueryStatus =
{
  "message" : "Waiting for next trigger",
  "isDataAvailable" : false,
  "isTriggerActive" : false
}

batchRunner then updates the status message to Processing new data and runs the current streaming batch.

Figure: StreamExecution's Running Batches (on Execution Thread)

After triggerExecution section has finished, batchRunner finishes the streaming batch for the trigger (and collects query execution statistics).

When there was new data in the sources, batchRunner updates the committed offsets (by adding the current batch id to the BatchCommitLog and adding availableOffsets to committedOffsets).

batchRunner prints out the following DEBUG message to the logs:

batch [currentBatchId] committed

batchRunner increments the current batch id and sets the job description for all the following Spark jobs to include the new batch id.

When no data was available in the sources to process, batchRunner does the following:

  1. Marks currentStatus with isDataAvailable disabled

  2. Updates the status message to Waiting for data to arrive

  3. Sleeps the current thread for pollingDelayMs milliseconds.

batchRunner updates the status message to Waiting for next trigger and returns whether the query is currently active or not (so TriggerExecutor can decide whether to keep executing batches).

Starting Streaming Query (on Stream Execution Thread)

start(): Unit

start starts a stream execution thread that simply runs stream processing (and hence the streaming query).

Figure: StreamExecution's Starting Streaming Query (on Execution Thread)

start prints out the following INFO message to the logs:

Starting [prettyIdString]. Use [resolvedCheckpointRoot] to store the query checkpoint.

start then starts the stream execution thread (as a daemon thread).

NOTE: start uses Java's java.lang.Thread.start to run the streaming query on a separate execution thread.

NOTE: When started, a streaming query runs in its own execution thread on JVM.

In the end, start pauses the main thread (using the startLatch) until StreamExecution is requested to run stream processing, which posts a QueryStartedEvent to all streaming listeners and then decrements the count of the startLatch.

start is used when StreamingQueryManager is requested to start a streaming query (when DataStreamWriter is requested to start an execution of the streaming query).

Path to Checkpoint Directory

checkpointFile(
  name: String): String

checkpointFile gives the path of the directory with the given name under the checkpoint directory.

checkpointFile is used for streamMetadata, OffsetSeqLog, BatchCommitLog, and lastExecution (for runBatch).

Posting StreamingQueryListener Event

postEvent(
  event: StreamingQueryListener.Event): Unit

postEvent is a part of the ProgressReporter abstraction.

postEvent simply requests the StreamingQueryManager to post the input event (to the StreamingQueryListenerBus in the current SparkSession).

Note

postEvent uses SparkSession to access the current StreamingQueryManager.

postEvent is used to post QueryStartedEvent, QueryProgressEvent and QueryTerminatedEvent events (when the streaming query starts, makes progress and terminates, respectively).

Waiting Until No New Data Available in Sources or Query Has Been Terminated

processAllAvailable(): Unit

processAllAvailable is a part of the StreamingQuery abstraction.

processAllAvailable reports (throws) the streamDeathCause exception if defined (and returns immediately).

NOTE: streamDeathCause is reported exclusively when StreamExecution is requested to run stream processing (and the query terminated with an exception).

processAllAvailable returns immediately when StreamExecution is no longer active (in TERMINATED state).

processAllAvailable acquires the awaitProgressLock lock and turns the noNewData internal flag off (false).

processAllAvailable keeps polling with 10-second pauses (awaiting the awaitProgressLockCondition condition) until the noNewData flag is turned on (true) or StreamExecution is no longer active (in TERMINATED state).

NOTE: The 10-second pause is hardcoded and cannot be changed.

In the end, processAllAvailable releases the awaitProgressLock lock.

processAllAvailable throws an IllegalStateException when executed on the stream execution thread:

Cannot wait for a query state from the same thread that is running the query
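processAllAvailable comes in handy in tests and demos to make processing deterministic, e.g.:

// Assumes sq is an active StreamingQuery
// Blocks until all data available at this point has been processed
// (or the query terminates)
sq.processAllAvailable()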

Stream Execution Thread

queryExecutionThread: QueryExecutionThread

queryExecutionThread is a Java thread of execution (java.util.Thread) that runs a streaming query.

queryExecutionThread is started (as a daemon thread) when StreamExecution is requested to start. At that time, start prints out the following INFO message to the logs (with the prettyIdString and the resolvedCheckpointRoot):

Starting [prettyIdString]. Use [resolvedCheckpointRoot] to store the query checkpoint.

When started, queryExecutionThread sets the call site and runs stream processing (runStream).

queryExecutionThread uses the thread name stream execution thread for [prettyIdString] (where prettyIdString is queryName [id = [id], runId = [runId]]).

queryExecutionThread is a QueryExecutionThread that is a custom UninterruptibleThread from Apache Spark with runUninterruptibly method for running a block of code without being interrupted by Thread.interrupt().

Tip

Use Java's jconsole or jstack to monitor stream execution threads.

$ jstack <driver-pid> | grep -e "stream execution thread"
"stream execution thread for kafka-topic1 [id =...

Current Batch Metadata (Event-Time Watermark and Timestamp)

offsetSeqMetadata: OffsetSeqMetadata

offsetSeqMetadata is an OffsetSeqMetadata.

offsetSeqMetadata is used to create an IncrementalExecution in the queryPlanning phase of the MicroBatchExecution and ContinuousExecution execution engines.

offsetSeqMetadata is initialized (with 0 for batchWatermarkMs and batchTimestampMs) when StreamExecution is requested to run stream processing.

offsetSeqMetadata is then updated (with the current event-time watermark and timestamp) when MicroBatchExecution is requested to construct the next streaming micro-batch.

NOTE: MicroBatchExecution uses the WatermarkTracker for the current event-time watermark and the trigger clock for the current batch timestamp.

offsetSeqMetadata is stored (checkpointed) in the offset write-ahead log (in the walCommit phase of MicroBatchExecution) and printed out as an INFO message to the logs.

offsetSeqMetadata is restored (re-created) from a checkpointed state when MicroBatchExecution is requested to populate the start offsets.

offsetSeqMetadata is part of the ProgressReporter abstraction.

isActive

isActive: Boolean

isActive is part of the StreamingQuery abstraction.

isActive is enabled (true) as long as the State is not TERMINATED.
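For example:

assert(sq.isActive)  // the query has not reached TERMINATED yet
sq.stop()
assert(!sq.isActive) // stop transitions the state to TERMINATED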

Human-Readable HTML Description of Spark Jobs (for web UI)

getBatchDescriptionString: String

getBatchDescriptionString is a human-readable description (in HTML format) that uses the optional name (if defined), the id, the runId, and batchDescription, which can be init (when the current batch ID is negative) or the current batch ID itself.

getBatchDescriptionString is of the following format:

[name]
id = [id]
runId = [runId]
batch = [batchDescription]

Figure: Monitoring Streaming Query using web UI (Spark Jobs)

getBatchDescriptionString is used when:

  • MicroBatchExecution stream execution engine is requested to run an activated streaming query (as the job description of any Spark jobs triggered as part of query execution)

  • StreamExecution is requested to run stream processing (as the job group description of any Spark jobs triggered as part of query execution)

No New Data Available

noNewData: Boolean

noNewData is a flag that indicates that a batch has completed with no new data left, so processAllAvailable can stop waiting until all streaming data is processed.

Default: false

Turned on (true) when:

  • MicroBatchExecution stream execution engine is requested to construct the next streaming micro-batch (and no new data is available)

  • ContinuousExecution stream execution engine is requested to commit an epoch

Turned off (false) when:

  • MicroBatchExecution stream execution engine is requested to construct the next streaming micro-batch (right after the walCommit phase)

  • StreamExecution is requested to process all available data (processAllAvailable)

Current Batch ID

currentBatchId is the ID of the current batch (initialized to -1 when StreamExecution is created).

newData Registry

newData: Map[BaseStreamingSource, LogicalPlan]

Registry of the streaming sources (in the logical query plan) that have new data available in the current batch. The new data is a streaming DataFrame.

newData is part of the ProgressReporter abstraction.

Set when StreamExecution is requested to request unprocessed data from the streaming sources (while running a single streaming batch)

Used when StreamExecution is requested to transform the logical plan (of the streaming query) to include the Sources and the MicroBatchReaders with new data (while running a single streaming batch)

Streaming Metrics

StreamExecution uses MetricsReporter for reporting streaming metrics.

MetricsReporter is created with the following source name (with name if defined or id):

spark.streaming.[name or id]

MetricsReporter is registered only when spark.sql.streaming.metricsEnabled configuration property is enabled (when StreamExecution is requested to runStream).
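The property has to be enabled before the streaming query starts, e.g.:

// Register streaming metrics with the application's MetricsSystem
spark.conf.set("spark.sql.streaming.metricsEnabled", "true")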

MetricsReporter is deactivated (removed) when a streaming query is stopped (when StreamExecution is requested to runStream).

Latches

StreamExecution uses three java.util.concurrent.CountDownLatch instances (each with count 1).

initializationLatch

Counted down when requested to runStream (upon entering ACTIVE state and, at the latest, in the finally block)

Awaited for tests only (which seems to indicate that it is a test-only latch)

startLatch

Counted down when requested to runStream (right after posting the QueryStartedEvent and, at the latest, in the finally block)

Awaited when requested to start (to pause the main thread until StreamExecution was requested to run the streaming query on a separate thread)

terminationLatch

Counted down at the end of runStream

Awaited when requested to awaitTermination (that pauses the thread until the streaming query has finished successfully or not).
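A typical use on the public handle (sq as in the demos above):

// Blocks the current thread until the query terminates
// (and rethrows the exception if the query failed)
sq.awaitTermination()

// Or wait up to 10 seconds; returns whether the query terminated
val terminated = sq.awaitTermination(10000)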

Locks

awaitProgressLock

StreamExecution uses a fair reentrant mutual exclusion java.util.concurrent.locks.ReentrantLock (that favors granting access to the longest-waiting thread under contention).

__is_continuous_processing Local Property

StreamExecution uses the __is_continuous_processing local property (default: false) to differentiate between continuous processing (true) and micro-batch processing (false), which is used when StateStoreRDD is requested to compute a partition (and finds a StateStore for a given version).

Logging

Enable ALL logging level for org.apache.spark.sql.execution.streaming.StreamExecution logger to see what happens inside.

Add the following line to conf/log4j.properties:

log4j.logger.org.apache.spark.sql.execution.streaming.StreamExecution=ALL

Refer to Logging.

