== StateStore

StateStore is the <> of <> for managing state in Stateful Stream Processing (e.g. for persisting running aggregates in Streaming Aggregation).

StateStore supports incremental checkpointing in which only the key-value "Row" pairs that changed are <> or <> (without touching other key-value pairs).
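
The idea can be illustrated with a self-contained toy model in plain Scala (String key-value pairs stand in for the UnsafeRow pairs, and the hypothetical ToyStateStore is not Spark's HDFSBackedStateStore):

[source, scala]
----
import scala.collection.mutable

// Toy model: each commit persists only the delta (the pairs that changed),
// never the whole state map.
class ToyStateStore {
  private val state = mutable.Map.empty[String, String]
  private val delta = mutable.Map.empty[String, Option[String]]
  private var version = 0L

  def put(key: String, value: String): Unit = {
    state(key) = value
    delta(key) = Some(value)  // recorded for the next checkpoint
  }

  def remove(key: String): Unit = {
    state.remove(key)
    delta(key) = None         // tombstone for the next checkpoint
  }

  def commit(): Long = {
    // A real store would write `delta` to a checkpoint file here.
    println(s"v${version + 1}: persisting ${delta.size} changed pair(s) only")
    delta.clear()
    version += 1
    version
  }
}

object ToyStateStoreDemo extends App {
  val store = new ToyStateStore
  store.put("a", "1"); store.put("b", "2")
  store.commit()     // persists 2 pairs
  store.remove("a")
  store.commit()     // persists 1 change, leaves "b" untouched
}
----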

StateStore is identified with the <> (among other properties for identification).

[[implementations]] NOTE: HDFSBackedStateStore is the default and only known implementation of the <> in Spark Structured Streaming.

[[contract]]
.StateStore Contract
[cols="30m,70",options="header",width="100%"]
|===
| Method
| Description

| abort a| [[abort]]

[source, scala]
----
abort(): Unit
----

Aborts (discards) changes to the state store

Used when:

| commit a| [[commit]]

[source, scala]
----
commit(): Long
----

Commits the changes to the state store (and returns the current version)

Used when:

| get a| [[get]]

[source, scala]
----
get(
  key: UnsafeRow): UnsafeRow
----

Looks up (gets) the value of the given non-null key

Used when:

| getRange a| [[getRange]]

[source, scala]
----
getRange(
  start: Option[UnsafeRow],
  end: Option[UnsafeRow]): Iterator[UnsafeRowPair]
----


Gets the key-value pairs of UnsafeRows for the specified range (with optional approximate start and end extents)

Used when:

NOTE: All the uses above pass start and end as None, which is basically <>.

| hasCommitted a| [[hasCommitted]]

[source, scala]
----
hasCommitted: Boolean
----

Flag to indicate whether state changes have been committed (true) or not (false)

Used when:

* RDD (via StateStoreOps implicit class) is requested to mapPartitionsWithStateStore (and a task finishes and may need to <>)

* SymmetricHashJoinStateManager is requested to abortIfNeeded (when a task finishes and may need to <>)

| id a| [[id]]

[source, scala]
----
id: StateStoreId
----

The <> of the state store

Used when:

| iterator a| [[iterator]]

[source, scala]
----
iterator(): Iterator[UnsafeRowPair]
----

Returns an iterator with all the key-value pairs in the state store

Used when:

| metrics a| [[metrics]]

[source, scala]
----
metrics: StateStoreMetrics
----

StateStoreMetrics of the state store

Used when:

| put a| [[put]]

[source, scala]
----
put(
  key: UnsafeRow,
  value: UnsafeRow): Unit
----


Stores (puts) the value for the (non-null) key

Used when:

| remove a| [[remove]]

[source, scala]
----
remove(key: UnsafeRow): Unit
----

Removes the (non-null) key from the state store

Used when:

| version a| [[version]]

[source, scala]
----
version: Long
----

Version of the state store

Used exclusively when HDFSBackedStateStore state store is requested for a new version (that is simply the current version incremented)

|===
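
To make the shape of the contract concrete, here is a minimal in-memory sketch of it in plain Scala (String stands in for UnsafeRow, the id is reduced to a String, and getRange / metrics are left out; an illustration only, not Spark's source):

[source, scala]
----
import scala.collection.mutable

// Simplified mirror of the contract above (String replaces UnsafeRow).
trait SimpleStateStore {
  def id: String
  def version: Long
  def get(key: String): Option[String]
  def put(key: String, value: String): Unit
  def remove(key: String): Unit
  def iterator(): Iterator[(String, String)]
  def commit(): Long
  def abort(): Unit
  def hasCommitted: Boolean
}

class InMemoryStateStore(val id: String, val version: Long) extends SimpleStateStore {
  private val data = mutable.Map.empty[String, String]
  private var committed = false

  def get(key: String): Option[String] = data.get(key)
  def put(key: String, value: String): Unit = data(key) = value
  def remove(key: String): Unit = data.remove(key)
  def iterator(): Iterator[(String, String)] = data.iterator
  def commit(): Long = { committed = true; version + 1 } // returns the new version
  def abort(): Unit = data.clear()                       // discards the changes
  def hasCommitted: Boolean = committed
}
----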

[NOTE]
====
StateStore was introduced in https://github.com/apache/spark/commit/8c826880f5eaa3221c4e9e7d3fece54e821a0b98[[SPARK-13809][SQL] State store for streaming aggregations].

Read the motivation and design in https://docs.google.com/document/d/1-ncawFx8JS5Zyfq1HAEGBx56RDet9wfVp_hDM8ZL254/edit[State Store for Streaming Aggregations].
====

[[logging]]
[TIP]
====
Enable ALL logging level for org.apache.spark.sql.execution.streaming.state.StateStore$ logger to see what happens inside.

Add the following line to conf/log4j.properties:

----
log4j.logger.org.apache.spark.sql.execution.streaming.state.StateStore$=ALL
----

Refer to <>.
====

=== [[coordinatorRef]] Creating (and Caching) RPC Endpoint Reference to StateStoreCoordinator for Executors -- coordinatorRef Internal Object Method

[source, scala]
----
coordinatorRef: Option[StateStoreCoordinatorRef]
----

coordinatorRef requests the SparkEnv helper object for the current SparkEnv.

If the SparkEnv is available and the <<_coordRef, _coordRef>> is not assigned yet, coordinatorRef prints out the following DEBUG message to the logs followed by requesting the StateStoreCoordinatorRef for the StateStoreCoordinator endpoint.

Getting StateStoreCoordinatorRef

If the SparkEnv is available, coordinatorRef prints out the following INFO message to the logs:

Retrieved reference to StateStoreCoordinator: [_coordRef]
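
A hedged sketch of this create-if-absent caching (hypothetical CoordinatorRefSketch object; String stands in for StateStoreCoordinatorRef and a Boolean flag for the SparkEnv availability check):

[source, scala]
----
// Sketch only: reuse _coordRef while the environment is up,
// (re)create it when missing, reset it when the environment is gone.
object CoordinatorRefSketch {
  private var _coordRef: Option[String] = None

  def coordinatorRef(sparkEnvAvailable: Boolean): Option[String] = synchronized {
    if (sparkEnvAvailable) {
      if (_coordRef.isEmpty) {
        println("Getting StateStoreCoordinatorRef")             // the DEBUG message
        _coordRef = Some("StateStoreCoordinator endpoint ref")  // create the RPC ref
      }
      println(s"Retrieved reference to StateStoreCoordinator: ${_coordRef.get}") // INFO
      _coordRef
    } else {
      _coordRef = None
      None
    }
  }
}
----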

NOTE: coordinatorRef is used when the StateStore helper object is requested to <> (when the StateStore helper object is requested to <>) and <> (when the StateStore helper object is requested to <>).

=== [[unload]] Unloading State Store Provider -- unload Method

[source, scala]
----
unload(storeProviderId: StateStoreProviderId): Unit
----

unload...FIXME
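
The body is left as FIXME above; a plausible, unverified sketch, assuming unload simply evicts the provider from the loadedProviders registry and closes it:

[source, scala]
----
import scala.collection.mutable

// Assumed behavior only: evict the provider from the cache and release it.
object UnloadSketch {
  final case class Provider(id: String) {
    def close(): Unit = println(s"closed provider $id")
  }
  private val loadedProviders = mutable.Map.empty[String, Provider]

  def unload(storeProviderId: String): Unit = synchronized {
    loadedProviders.remove(storeProviderId).foreach(_.close())
  }
}
----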

NOTE: unload is used when StateStore helper object is requested to <> and <>.

=== [[stop]] stop Object Method

[source, scala]
----
stop(): Unit
----

stop...FIXME

NOTE: stop seems to be used only in tests.

=== [[reportActiveStoreInstance]] Announcing New StateStoreProvider -- reportActiveStoreInstance Internal Object Method

[source, scala]
----
reportActiveStoreInstance(
  storeProviderId: StateStoreProviderId): Unit
----


reportActiveStoreInstance takes the current host and executorId (from the BlockManager on the Spark executor) and requests the <> to reportActiveInstance.

NOTE: reportActiveStoreInstance uses SparkEnv to access the BlockManager.

In the end, reportActiveStoreInstance prints out the following INFO message to the logs:

Reported that the loaded instance [storeProviderId] is active
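
The flow can be modelled as follows (illustrative types only; the real code reads host and executorId from SparkEnv's BlockManager and talks to the coordinator RPC endpoint):

[source, scala]
----
// Simplified model of the announce-and-log flow.
object ReportActiveSketch {
  trait Coordinator {
    def reportActiveInstance(storeProviderId: String, host: String, executorId: String): Unit
  }

  def reportActiveStoreInstance(
      storeProviderId: String,
      host: String,
      executorId: String,
      coordinator: Option[Coordinator]): Unit = {
    coordinator.foreach(_.reportActiveInstance(storeProviderId, host, executorId))
    println(s"Reported that the loaded instance $storeProviderId is active") // INFO
  }
}
----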

NOTE: reportActiveStoreInstance is used exclusively when StateStore utility is requested to <>.

=== [[MaintenanceTask]] MaintenanceTask Daemon Thread

MaintenanceTask is a daemon thread that <>.

When an error occurs, MaintenanceTask clears <> internal registry.

MaintenanceTask is scheduled on state-store-maintenance-task thread pool that runs periodically every spark.sql.streaming.stateStore.maintenanceInterval.
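
A sketch of such a periodic daemon task (illustrative MaintenanceTaskSketch; the onError callback stands in for clearing the loadedProviders registry):

[source, scala]
----
import java.util.concurrent.{Executors, ThreadFactory, TimeUnit}

// Sketch of a periodic daemon maintenance task (names are illustrative).
object MaintenanceTaskSketch {
  private val threadFactory = new ThreadFactory {
    def newThread(r: Runnable): Thread = {
      val t = new Thread(r, "state-store-maintenance-task")
      t.setDaemon(true) // daemon: never blocks JVM shutdown
      t
    }
  }
  private val pool = Executors.newSingleThreadScheduledExecutor(threadFactory)

  def start(periodMs: Long)(doMaintenance: () => Unit)(onError: () => Unit): Unit =
    pool.scheduleAtFixedRate(new Runnable {
      def run(): Unit =
        try doMaintenance()
        catch { case _: Throwable => onError() } // e.g. clear the provider cache
    }, periodMs, periodMs, TimeUnit.MILLISECONDS)
}
----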

=== Looking Up StateStore by Provider ID -- get Object Method

[source, scala]
----
get(
  storeProviderId: StateStoreProviderId,
  keySchema: StructType,
  valueSchema: StructType,
  indexOrdinal: Option[Int],
  version: Long,
  storeConf: StateStoreConf,
  hadoopConf: Configuration): StateStore
----

get finds the StateStore for the specified StateStoreProviderId and version.

NOTE: The version is either the <> (in Continuous Stream Processing) or the current batch ID (in Micro-Batch Stream Processing).

Internally, get looks up the <> (by storeProviderId) in the <> internal cache. If unavailable, get uses the StateStoreProvider utility to <>.

get will also <> (unless already started) and <>.

In the end, get requests the StateStoreProvider to <>.
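
The cache-or-create lookup can be sketched as follows (simplified types; `create` stands in for the StateStoreProvider-creation step described above, and String for the resulting StateStore):

[source, scala]
----
import scala.collection.mutable

// Illustrative cache-or-create lookup, not the actual Spark code.
object GetSketch {
  trait Provider { def getStore(version: Long): String }
  private val loadedProviders = mutable.Map.empty[String, Provider]

  def get(storeProviderId: String, version: Long)(create: => Provider): String =
    synchronized {
      val provider = loadedProviders.getOrElseUpdate(storeProviderId, create)
      // the real get also starts the maintenance task (unless already started)
      // and reports the instance as active before handing out the store
      provider.getStore(version)
    }
}
----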

get is used when:

==== [[startMaintenanceIfNeeded]] Starting Periodic Maintenance Task (Unless Already Started) -- startMaintenanceIfNeeded Internal Object Method

[source, scala]
----
startMaintenanceIfNeeded(): Unit
----

startMaintenanceIfNeeded schedules <> to start after, and then run every, spark.sql.streaming.stateStore.maintenanceInterval (defaults to 60s).

NOTE: startMaintenanceIfNeeded does nothing when the maintenance task has already been started and is still running.

NOTE: startMaintenanceIfNeeded is used exclusively when StateStore is requested to <>.

==== [[doMaintenance]] Doing State Maintenance of Registered State Store Providers -- doMaintenance Internal Object Method

[source, scala]
----
doMaintenance(): Unit
----

Internally, doMaintenance prints the following DEBUG message to the logs:

Doing maintenance

doMaintenance then requests every spark-sql-streaming-StateStoreProvider.md[StateStoreProvider] (registered in <>) to spark-sql-streaming-StateStoreProvider.md#doMaintenance[do its own internal maintenance] (only when a StateStoreProvider <>).

When a StateStoreProvider is <>, doMaintenance <> and prints the following INFO message to the logs:

Unloaded [provider]
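
Put together, the maintenance pass can be sketched like this (illustrative only; isActive and unload stand in for the activity check and the unload step described above):

[source, scala]
----
import scala.collection.mutable

// Illustrative maintenance loop over the registered providers.
object DoMaintenanceSketch {
  trait Provider { def doMaintenance(): Unit }
  private val loadedProviders = mutable.Map.empty[String, Provider]

  def doMaintenance(isActive: String => Boolean)(unload: String => Unit): Unit = {
    println("Doing maintenance") // DEBUG
    for ((id, provider) <- loadedProviders.toSeq) {
      if (isActive(id)) {
        provider.doMaintenance()        // provider's own internal maintenance
      } else {
        unload(id)                      // evict the inactive provider
        println(s"Unloaded $provider")  // INFO
      }
    }
  }
}
----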

NOTE: doMaintenance is used exclusively in <>.

==== [[verifyIfStoreInstanceActive]] verifyIfStoreInstanceActive Internal Object Method

[source, scala]
----
verifyIfStoreInstanceActive(storeProviderId: StateStoreProviderId): Boolean
----

verifyIfStoreInstanceActive...FIXME

NOTE: verifyIfStoreInstanceActive is used exclusively when StateStore helper object is requested to <> (from a running <>).

=== [[internal-properties]] Internal Properties

[cols="30m,70",options="header",width="100%"]
|===
| Name
| Description

| loadedProviders | [[loadedProviders]] Loaded providers internal cache, i.e. <> per <>

Used in...FIXME

| _coordRef | [[_coordRef]] StateStoreCoordinator RPC endpoint (a RpcEndpointRef to StateStoreCoordinator)

Used in...FIXME

|===

