HDFSBackedStateStore is a concrete StateStore that uses a Hadoop DFS-compatible file system for versioned state persistence.

HDFSBackedStateStore is created exclusively when HDFSBackedStateStoreProvider is requested for the specified version of state (store) for update (when the StateStore utility is requested to look up a StateStore by provider id).

[[id]] HDFSBackedStateStore uses the StateStoreId of the owning HDFSBackedStateStoreProvider.

[[toString]] When requested for the textual representation, HDFSBackedStateStore gives HDFSStateStore[id=(op=[operatorId],part=[partitionId]),dir=[baseDir]].
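The format above can be reproduced with plain string interpolation. The following is only a sketch: SketchStoreId is a hypothetical stand-in for StateStoreId (not the Spark class), and describeStore is an illustrative helper name.

```scala
object ToStringSketch {
  // Hypothetical stand-in for the StateStoreId of the owning provider (not the Spark class).
  final case class SketchStoreId(operatorId: Long, partitionId: Int)

  // Builds the textual representation in the format described above.
  def describeStore(id: SketchStoreId, baseDir: String): String =
    s"HDFSStateStore[id=(op=${id.operatorId},part=${id.partitionId}),dir=$baseDir]"
}
```

The base directory is passed in as a plain string here; in the real class it comes from the owning provider.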

[[logging]]
[TIP]
====
HDFSBackedStateStore is an internal class of HDFSBackedStateStoreProvider and uses its logger.
====

=== [[creating-instance]] Creating HDFSBackedStateStore Instance

HDFSBackedStateStore takes the following to be created:

  • [[version]] Version
  • [[mapToUpdate]] State Map (ConcurrentHashMap[UnsafeRow, UnsafeRow])

HDFSBackedStateStore initializes the internal properties.

=== [[state]] Internal State -- state Internal Property

[source, scala]

state: STATE

state is the current state of HDFSBackedStateStore and can be one of three possible states: UPDATING, COMMITTED, and ABORTED.

State changes (to the internal mapToUpdate registry) are allowed as long as HDFSBackedStateStore is in the default UPDATING state. Once an HDFSBackedStateStore transitions to either the COMMITTED or the ABORTED state, no further state changes are allowed.

NOTE: Don't get confused by the term "state" as there are two kinds: the internal state of HDFSBackedStateStore and the state of a streaming query (that HDFSBackedStateStore is responsible for).
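The lifecycle described above can be sketched as a small state machine. This is a simplified illustration, not the Spark implementation: SketchStore is a hypothetical class, keys and values are plain strings instead of UnsafeRow, and abort simply switches the state without modelling any checks the real method may perform.

```scala
object StateSketch {
  // The three internal states named in the text.
  sealed trait State
  case object UPDATING extends State
  case object COMMITTED extends State
  case object ABORTED extends State

  // Hypothetical, simplified store: String keys and values instead of UnsafeRow.
  final class SketchStore {
    private var state: State = UPDATING // the default state
    private val updates = scala.collection.mutable.Map.empty[String, String]

    private def verify(condition: Boolean, msg: String): Unit =
      if (!condition) throw new IllegalStateException(msg)

    // State changes are accepted only while UPDATING.
    def put(key: String, value: String): Unit = {
      verify(state == UPDATING, "Cannot put after already committed or aborted")
      updates.put(key, value)
    }

    def commit(): Unit = {
      verify(state == UPDATING, "Cannot commit after already committed or aborted")
      state = COMMITTED
    }

    // Assumption: this sketch lets abort switch the state unconditionally.
    def abort(): Unit = {
      state = ABORTED
    }

    def hasCommitted: Boolean = state == COMMITTED
    def size: Int = updates.size
  }
}
```

Once commit or abort has run, any further put fails with an IllegalStateException, which is the "no further state changes" rule in code form.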

[[states]]
.Internal States
[cols="30m,70",options="header",width="100%"]
|===
| Name
| Description

| ABORTED
a| [[ABORTED]] After abort

| COMMITTED
a| [[COMMITTED]] After commit

hasCommitted flag indicates whether HDFSBackedStateStore is in this state or not.

| UPDATING
a| [[UPDATING]] (default) Initial state after the HDFSBackedStateStore was created

Allows for state changes (e.g. put, remove) and eventually committing or aborting them

|===


=== [[writeUpdateToDeltaFile]] writeUpdateToDeltaFile Internal Method

[source, scala]

writeUpdateToDeltaFile(
  output: DataOutputStream,
  key: UnsafeRow,
  value: UnsafeRow): Unit


=== [[put]] put Method

[source, scala]

put(
  key: UnsafeRow,
  value: UnsafeRow): Unit

NOTE: put is part of the StateStore Contract to...FIXME

put stores copies of the key and value in the mapToUpdate internal registry and then writes them to the delta file with writeUpdateToDeltaFile (using compressedStream).

put reports an IllegalStateException when HDFSBackedStateStore is not in UPDATING state:

Cannot put after already committed or aborted
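The following sketch shows why put works on copies: callers may reuse and mutate the row objects they pass in, so the store keeps its own instances. Everything here is an assumption made for illustration: Row is a hypothetical mutable stand-in for UnsafeRow, the updating flag stands in for the UPDATING state, and the UTF-string layout written to the stream is not the format Spark uses.

```scala
import java.io.{ByteArrayOutputStream, DataOutputStream}
import java.util.concurrent.ConcurrentHashMap

object PutSketch {
  // Hypothetical mutable row standing in for UnsafeRow; callers may reuse one instance.
  final case class Row(var text: String)

  final class Store(output: DataOutputStream) {
    val mapToUpdate = new ConcurrentHashMap[Row, Row]()
    var updating = true // stands in for the UPDATING state

    // Assumed layout, for illustration only: each row is written as a UTF string.
    private def writeUpdateToDeltaFile(out: DataOutputStream, key: Row, value: Row): Unit = {
      out.writeUTF(key.text)
      out.writeUTF(value.text)
    }

    def put(key: Row, value: Row): Unit = {
      if (!updating)
        throw new IllegalStateException("Cannot put after already committed or aborted")
      // Copy first so later mutation of the caller's rows cannot affect the store.
      val keyCopy = key.copy()
      val valueCopy = value.copy()
      mapToUpdate.put(keyCopy, valueCopy)
      writeUpdateToDeltaFile(output, keyCopy, valueCopy)
    }
  }
}
```

A store can be created over an in-memory stream, for example `new PutSketch.Store(new DataOutputStream(new ByteArrayOutputStream()))`; mutating a row after put leaves the stored entry unchanged.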

=== [[commit]] Committing State Changes -- commit Method

[source, scala]

commit(): Long

commit is part of the StateStore abstraction.

commit requests the parent HDFSBackedStateStoreProvider to commit state changes (as a new version of state) (with the newVersion, the mapToUpdate and the compressedStream).

commit transitions HDFSBackedStateStore to COMMITTED state.

commit prints out the following INFO message to the logs:

Committed version [newVersion] for [this] to file [finalDeltaFile]

commit returns the newVersion.

commit throws an IllegalStateException when HDFSBackedStateStore is not in UPDATING state:

Cannot commit after already committed or aborted

commit throws an IllegalStateException for any NonFatal exception:

Error committing version [newVersion] into [this]
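The control flow of commit can be sketched as follows. This is a simplified illustration under stated assumptions: Provider is a hypothetical stand-in for the parent HDFSBackedStateStoreProvider with a reduced commitUpdates signature, the new version is assumed to be the given version plus one, the state is kept as a plain string, and logging is omitted.

```scala
import scala.util.control.NonFatal

object CommitSketch {
  // Hypothetical stand-in for the parent provider; commitUpdates may fail.
  trait Provider {
    def commitUpdates(newVersion: Long): Unit
  }

  final class Store(version: Long, provider: Provider) {
    // Assumption of this sketch: the new version is the next one after the given version.
    private val newVersion: Long = version + 1
    private var state: String = "UPDATING"

    def commit(): Long = {
      if (state != "UPDATING")
        throw new IllegalStateException("Cannot commit after already committed or aborted")
      try {
        provider.commitUpdates(newVersion)
        state = "COMMITTED"
        newVersion
      } catch {
        // Any non-fatal failure is wrapped, keeping the original exception as the cause.
        case NonFatal(e) =>
          throw new IllegalStateException(s"Error committing version $newVersion into $this", e)
      }
    }

    def hasCommitted: Boolean = state == "COMMITTED"
  }
}
```

Matching on NonFatal lets fatal errors such as OutOfMemoryError propagate untouched, while ordinary failures surface as an IllegalStateException and leave the store uncommitted.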

=== [[abort]] Aborting State Changes -- abort Method

[source, scala]

abort(): Unit

abort is part of the StateStore abstraction.


=== [[metrics]] Performance Metrics -- metrics Method

[source, scala]

metrics: StateStoreMetrics

metrics is part of the StateStore abstraction.

metrics requests the performance metrics of the parent HDFSBackedStateStoreProvider.

Only the provider's performance metrics that are listed in supportedCustomMetrics are used.

In the end, metrics returns a new StateStoreMetrics with the following:

=== [[hasCommitted]] Are State Changes Committed? -- hasCommitted Method

[source, scala]

hasCommitted: Boolean

hasCommitted is part of the StateStore abstraction.

hasCommitted returns true when HDFSBackedStateStore is in COMMITTED state and false otherwise.

=== Internal Properties

[cols="30m,70",options="header",width="100%"]
|===
| Name
| Description

| compressedStream
a| [[compressedStream]]

[source, scala]

compressedStream: DataOutputStream

The compressed DataOutputStream for the deltaFileStream

| deltaFileStream
a| [[deltaFileStream]]

[source, scala]

deltaFileStream: CheckpointFileManager.CancellableFSDataOutputStream

| finalDeltaFile
a| [[finalDeltaFile]]

[source, scala]

finalDeltaFile: Path

The Hadoop Path of the deltaFile for the version

| newVersion
a| [[newVersion]]

[source, scala]

newVersion: Long

Used exclusively when HDFSBackedStateStore is requested for the finalDeltaFile, to commit and abort

|===