Skip to content

Stateful Stream Processing

Stateful Stream Processing is a stream processing with state (implicit or explicit).

In Spark Structured Streaming, a streaming query is stateful when is one of the following (that makes use of StateStores):

Versioned State, StateStores and StateStoreProviders

Spark Structured Streaming uses StateStores for versioned and fault-tolerant key-value state stores.

State stores are checkpointed incrementally to avoid state loss and for increased performance.

State stores are managed by StateStoreProviders with HDFSBackedStateStoreProvider being the default and only known implementation. HDFSBackedStateStoreProvider uses Hadoop DFS-compliant file system for state checkpointing and fault-tolerance.

State store providers manage versioned state per stateful operator (and partition it operates on).

The lifecycle of a StateStoreProvider begins when StateStore utility (on a Spark executor) is requested for the StateStore by provider ID and version.

IMPORTANT: It is worth to notice that since StateStore and StateStoreProvider utilities are Scala objects that makes it possible that there can only be one instance of StateStore and StateStoreProvider on a single JVM. Scala objects are (sort of) singletons which means that there will be exactly one instance of each per JVM and that is exactly the JVM of a Spark executor. As long as the executor is up and running state versions are cached and no Hadoop DFS is used (except for the initial load).

When requested for a StateStore, StateStore utility is given the version of a state store to look up. The version is either the <> (in Continuous Stream Processing) or the current batch ID (in Micro-Batch Stream Processing).

StateStore utility requests StateStoreProvider utility to <> that creates the StateStoreProvider implementation (based on spark.sql.streaming.stateStore.providerClass internal configuration property) and requests it to <>.

The initialized StateStoreProvider is cached in loadedProviders internal lookup table (for a <>) for later lookups.

StateStoreProvider utility then requests the StateStoreProvider for the <>. (e.g. a HDFSBackedStateStore in case of HDFSBackedStateStoreProvider).

An instance of StateStoreProvider is requested to <> or <> (when <>) in <> that runs periodically every spark.sql.streaming.stateStore.maintenanceInterval configuration property.

IncrementalExecution — QueryExecution of Streaming Queries

Regardless of the query language (Dataset API or SQL), any structured query (incl. streaming queries) becomes a logical query plan.

In Spark Structured Streaming it is IncrementalExecution that plans streaming queries for execution.

While planning a streaming query for execution (aka query planning), IncrementalExecution uses the state preparation rule. The rule fills out the following physical operators with the execution-specific configuration (with StatefulOperatorStateInfo being the most important for stateful stream processing):

==== [[IncrementalExecution-shouldRunAnotherBatch]] Micro-Batch Stream Processing and Extra Non-Data Batch for StateStoreWriter Stateful Operators

In <> (with <> engine), IncrementalExecution uses shouldRunAnotherBatch flag that allows StateStoreWriters stateful physical operators to indicate whether the last batch execution requires another non-data batch.

The following StateStoreWriters redefine shouldRunAnotherBatch flag.

[[StateStoreWriters-shouldRunAnotherBatch]] .StateStoreWriters and shouldRunAnotherBatch Flag [cols="30,70",options="header",width="100%"] |=== | StateStoreWriter | shouldRunAnotherBatch Flag

| FlatMapGroupsWithStateExec a| [[shouldRunAnotherBatch-FlatMapGroupsWithStateExec]] Based on GroupStateTimeout

| <> a| [[shouldRunAnotherBatch-StateStoreSaveExec]] Based on <>

| <> a| [[shouldRunAnotherBatch-StreamingDeduplicateExec]] Based on <>

| <> a| [[shouldRunAnotherBatch-StreamingSymmetricHashJoinExec]] Based on <>

|===

StateStoreRDD

Right after query planning, a stateful streaming query (a single micro-batch actually) becomes an RDD with one or more StateStoreRDDs.

You can find the StateStoreRDDs of a streaming query in the RDD lineage.

scala> :type streamingQuery
org.apache.spark.sql.streaming.StreamingQuery

scala> streamingQuery.explain
== Physical Plan ==
*(4) HashAggregate(keys=[window#13-T0ms, value#3L], functions=[count(1)])
+- StateStoreSave [window#13-T0ms, value#3L], state info [ checkpoint = file:/tmp/checkpoint-counts/state, runId = 1dec2d81-f2d0-45b9-8f16-39ede66e13e7, opId = 0, ver = 1, numPartitions = 1], Append, 10000, 2
   +- *(3) HashAggregate(keys=[window#13-T0ms, value#3L], functions=[merge_count(1)])
      +- StateStoreRestore [window#13-T0ms, value#3L], state info [ checkpoint = file:/tmp/checkpoint-counts/state, runId = 1dec2d81-f2d0-45b9-8f16-39ede66e13e7, opId = 0, ver = 1, numPartitions = 1], 2
         +- *(2) HashAggregate(keys=[window#13-T0ms, value#3L], functions=[merge_count(1)])
            +- Exchange hashpartitioning(window#13-T0ms, value#3L, 1)
               +- *(1) HashAggregate(keys=[window#13-T0ms, value#3L], functions=[partial_count(1)])
                  +- *(1) Project [named_struct(start, precisetimestampconversion(((((CASE WHEN (cast(CEIL((cast((precisetimestampconversion(time#2-T0ms, TimestampType, LongType) - 0) as double) / 5000000.0)) as double) = (cast((precisetimestampconversion(time#2-T0ms, TimestampType, LongType) - 0) as double) / 5000000.0)) THEN (CEIL((cast((precisetimestampconversion(time#2-T0ms, TimestampType, LongType) - 0) as double) / 5000000.0)) + 1) ELSE CEIL((cast((precisetimestampconversion(time#2-T0ms, TimestampType, LongType) - 0) as double) / 5000000.0)) END + 0) - 1) * 5000000) + 0), LongType, TimestampType), end, precisetimestampconversion(((((CASE WHEN (cast(CEIL((cast((precisetimestampconversion(time#2-T0ms, TimestampType, LongType) - 0) as double) / 5000000.0)) as double) = (cast((precisetimestampconversion(time#2-T0ms, TimestampType, LongType) - 0) as double) / 5000000.0)) THEN (CEIL((cast((precisetimestampconversion(time#2-T0ms, TimestampType, LongType) - 0) as double) / 5000000.0)) + 1) ELSE CEIL((cast((precisetimestampconversion(time#2-T0ms, TimestampType, LongType) - 0) as double) / 5000000.0)) END + 0) - 1) * 5000000) + 5000000), LongType, TimestampType)) AS window#13-T0ms, value#3L]
                     +- *(1) Filter isnotnull(time#2-T0ms)
                        +- EventTimeWatermark time#2: timestamp, interval
                           +- LocalTableScan <empty>, [time#2, value#3L]

import org.apache.spark.sql.execution.streaming.{StreamExecution, StreamingQueryWrapper}
val se = streamingQuery.asInstanceOf[StreamingQueryWrapper].streamingQuery

scala> :type se
org.apache.spark.sql.execution.streaming.StreamExecution

scala> :type se.lastExecution
org.apache.spark.sql.execution.streaming.IncrementalExecution

val rdd = se.lastExecution.toRdd
scala> rdd.toDebugString
res3: String =
(1) MapPartitionsRDD[39] at toRdd at <console>:40 []
 |  StateStoreRDD[38] at toRdd at <console>:40 [] // <-- here
 |  MapPartitionsRDD[37] at toRdd at <console>:40 []
 |  StateStoreRDD[36] at toRdd at <console>:40 [] // <-- here
 |  MapPartitionsRDD[35] at toRdd at <console>:40 []
 |  ShuffledRowRDD[17] at start at <pastie>:67 []
 +-(1) MapPartitionsRDD[16] at start at <pastie>:67 []
    |  MapPartitionsRDD[15] at start at <pastie>:67 []
    |  MapPartitionsRDD[14] at start at <pastie>:67 []
    |  MapPartitionsRDD[13] at start at <pastie>:67 []
    |  ParallelCollectionRDD[12] at start at <pastie>:67 []

StateStoreCoordinator RPC Endpoint, StateStoreRDD and Preferred Locations

Since execution of a stateful streaming query happens on Spark executors whereas planning is on the driver, Spark Structured Streaming uses RPC environment for tracking locations of the state stores in use. That makes the tasks (of a structured query) to be scheduled where the state (of a partition) is.

When planned for execution, the StateStoreRDD is first asked for the preferred locations of a partition (which happens on the driver) that are later used to compute it (on Spark executors).

Spark Structured Streaming uses RPC environment to keep track of StateStores (their StateStoreProvider actually) for RDD planning.

Every time StateStoreRDD is requested for the preferred locations of a partition, it communicates with the StateStoreCoordinator RPC endpoint that knows the locations of the required StateStores (per host and executor ID).

StateStoreRDD uses StateStoreProviderId with StateStoreId to uniquely identify the state store to use for (associate with) a stateful operator and a partition.

State Management

The state in a stateful streaming query can be implicit or explicit.