
HDFSBackedStateStoreProvider

HDFSBackedStateStoreProvider is a StateStoreProvider that uses a Hadoop DFS-compatible file system for versioned state checkpointing.

HDFSBackedStateStoreProvider is the default StateStoreProvider per the spark.sql.streaming.stateStore.providerClass internal configuration property.

HDFSBackedStateStoreProvider is <> and immediately requested to <> when StateStoreProvider utility is requested to <>. That is when HDFSBackedStateStoreProvider is given the <> that uniquely identifies the state store to use for a stateful operator and a partition.

HDFSBackedStateStoreProvider uses HDFSBackedStateStores to manage state (<>).

HDFSBackedStateStoreProvider manages versioned state in delta and snapshot files (and uses a <> internally for faster access to state versions).

[[creating-instance]] HDFSBackedStateStoreProvider takes no arguments to be created.

[[logging]]
[TIP]
====
Enable ALL logging level for org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider logger to see what happens inside.

Add the following line to conf/log4j.properties:

log4j.logger.org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider=ALL

Refer to <>.
====

Performance Metrics

[cols="30,70",options="header",width="100%"] |=== | Name (in web UI) | Description

| memoryUsedBytes a| [[memoryUsedBytes]] Estimated size of the <> internal registry

| count of cache hit on states cache in provider a| [[metricLoadedMapCacheHit]][[loadedMapCacheHitCount]] The number of times <> was successful and found (hit) the requested state version in the <> internal cache

| count of cache miss on states cache in provider a| [[metricLoadedMapCacheMiss]][[loadedMapCacheMissCount]] The number of times <> could not find (missed) the requested state version in the <> internal cache

| estimated size of state only on current version a| [[metricStateOnCurrentVersionSizeBytes]][[stateOnCurrentVersionSizeBytes]] Estimated size of the current state (of the HDFSBackedStateStore)

|===

=== [[baseDir]] State Checkpoint Base Directory -- baseDir Lazy Internal Property

[source,scala]

baseDir: Path

baseDir is the base directory (as a Hadoop Path) for state checkpointing (for <> and <> state files).

baseDir is initialized lazily since it is not yet known when HDFSBackedStateStoreProvider is <>.

baseDir is initialized and created based on the <> of the <> when HDFSBackedStateStoreProvider is requested to <>.

=== [[stateStoreId]][[stateStoreId_]] StateStoreId -- Unique Identifier of State Store

As a <>, HDFSBackedStateStoreProvider is associated with a <> (which is a unique identifier of the state store for a stateful operator and a partition).

HDFSBackedStateStoreProvider is given the <> at <> (as requested by the <> contract).

The <> is then used for the following:

  • HDFSBackedStateStore is requested for the id

  • HDFSBackedStateStoreProvider is requested for the <> and the <>

=== [[toString]] Textual Representation -- toString Method

[source, scala]

toString: String

NOTE: toString is part of the ++https://docs.oracle.com/javase/8/docs/api/java/lang/Object.html#toString--++[java.lang.Object] contract for the string representation of the object.

HDFSBackedStateStoreProvider uses the <> and the <> for the textual representation:

HDFSStateStoreProvider[id = (op=[operatorId],part=[partitionId]),dir = [baseDir]]

=== [[getStore]] Loading Specified Version of State (Store) For Update -- getStore Method

[source, scala]

getStore(version: Long): StateStore


getStore is part of the StateStoreProvider abstraction.

getStore creates a new empty state (ConcurrentHashMap[UnsafeRow, UnsafeRow]) and <> for versions greater than 0.

In the end, getStore creates a new HDFSBackedStateStore for the specified version with the new state and prints out the following INFO message to the logs:

Retrieved version [version] of [this] for update

getStore throws an IllegalArgumentException when the specified version is less than 0 (negative):

Version cannot be less than 0
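
Overall, the control flow could be sketched as follows. This is a simplified illustration; loadMap and newStore are hypothetical stand-ins for the provider internals and the HDFSBackedStateStore constructor.

[source, scala]
----
import java.util.concurrent.ConcurrentHashMap
import org.apache.spark.sql.catalyst.expressions.UnsafeRow

def getStoreSketch[S](
    version: Long,
    loadMap: Long => ConcurrentHashMap[UnsafeRow, UnsafeRow],
    newStore: (Long, ConcurrentHashMap[UnsafeRow, UnsafeRow]) => S): S = {
  require(version >= 0, "Version cannot be less than 0")
  // start with a fresh, empty state map
  val newMap = new ConcurrentHashMap[UnsafeRow, UnsafeRow]()
  if (version > 0) {
    // copy the requested version of state into the fresh map
    newMap.putAll(loadMap(version))
  }
  newStore(version, newMap)
}
----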

=== [[deltaFile]] deltaFile Internal Method

[source, scala]

deltaFile(version: Long): Path

deltaFile simply returns the Hadoop https://hadoop.apache.org/docs/r2.7.3/api/org/apache/hadoop/fs/Path.html[Path] of the [version].delta file in the <>.

deltaFile is used when:

=== [[snapshotFile]] snapshotFile Internal Method

[source, scala]

snapshotFile(version: Long): Path

snapshotFile simply returns the Hadoop https://hadoop.apache.org/docs/r2.7.3/api/org/apache/hadoop/fs/Path.html[Path] of the [version].snapshot file in the <>.

NOTE: snapshotFile is used when HDFSBackedStateStoreProvider is requested to <> or <>.
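
Both helpers reduce to straightforward path arithmetic. A minimal sketch, assuming baseDir is the state checkpoint base directory described above:

[source, scala]
----
import org.apache.hadoop.fs.Path

def deltaFile(baseDir: Path, version: Long): Path =
  new Path(baseDir, s"$version.delta")

def snapshotFile(baseDir: Path, version: Long): Path =
  new Path(baseDir, s"$version.snapshot")
----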

=== [[fetchFiles]] Listing All Delta And Snapshot Files In State Checkpoint Directory -- fetchFiles Internal Method

[source, scala]

fetchFiles(): Seq[StoreFile]

fetchFiles requests the <> for all the files in the <>.

For every file, fetchFiles splits the name into two parts with . (dot) as a separator (files with more or less than two parts are simply ignored) and registers a new StoreFile for snapshot and delta files:

  • For snapshot files, fetchFiles creates a new StoreFile with isSnapshot flag on (true)

  • For delta files, fetchFiles creates a new StoreFile with isSnapshot flag off (false)

NOTE: delta files are only registered if there was no snapshot file for the version.

fetchFiles prints out the following WARN message to the logs for any other files:

Could not identify file [path] for [this]

In the end, fetchFiles sorts the StoreFiles based on their version, prints out the following DEBUG message to the logs, and returns the files.

Current set of files for [this]: [storeFiles]

NOTE: fetchFiles is used when HDFSBackedStateStoreProvider is requested to <> and <>.
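
The file-name parsing could be sketched as below. The StoreFile case class is a hypothetical stand-in for the internal one, and logging is replaced with println.

[source, scala]
----
import org.apache.hadoop.fs.Path
import scala.collection.mutable

case class StoreFile(version: Long, path: Path, isSnapshot: Boolean)

def parseStoreFiles(paths: Seq[Path]): Seq[StoreFile] = {
  val versionToFiles = mutable.HashMap.empty[Long, StoreFile]
  paths.foreach { path =>
    path.getName.split("\\.") match {
      case Array(version, "delta") =>
        // a delta file is registered only if no snapshot was seen for the version
        val v = version.toLong
        if (!versionToFiles.contains(v)) {
          versionToFiles.put(v, StoreFile(v, path, isSnapshot = false))
        }
      case Array(version, "snapshot") =>
        val v = version.toLong
        versionToFiles.put(v, StoreFile(v, path, isSnapshot = true))
      case _ =>
        println(s"Could not identify file $path")
    }
  }
  // sort by version before returning
  versionToFiles.values.toSeq.sortBy(_.version)
}
----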

=== [[init]] Initializing StateStoreProvider -- init Method

[source, scala]

init(
  stateStoreId: StateStoreId,
  keySchema: StructType,
  valueSchema: StructType,
  indexOrdinal: Option[Int],
  storeConf: StateStoreConf,
  hadoopConf: Configuration): Unit


NOTE: init is part of the <> to initialize itself.

init records the values of the input arguments as the <>, <>, <>, <>, and <> internal properties.

init requests the given StateStoreConf for the spark.sql.streaming.maxBatchesToRetainInMemory configuration property (that is then recorded in the <> internal property).

In the end, init requests the <> to create the <> directory (with parent directories).

=== [[filesForVersion]] Finding Snapshot File and Delta Files For Version -- filesForVersion Internal Method

[source, scala]

filesForVersion(
  allFiles: Seq[StoreFile],
  version: Long): Seq[StoreFile]


filesForVersion finds the latest snapshot version among the given allFiles files up to and including the given version (it may or may not be available).

If a snapshot file was found (among the given file up to and including the given version), filesForVersion takes all delta files between the version of the snapshot file (exclusive) and the given version (inclusive) from the given allFiles files.

NOTE: The number of delta files should be the given version minus the snapshot version.

If a snapshot file was not found, filesForVersion takes all delta files up to the given version (inclusive) from the given allFiles files.

In the end, filesForVersion returns a snapshot version (if available) and all delta files up to the given version (inclusive).

NOTE: filesForVersion is used when HDFSBackedStateStoreProvider is requested to <> and <>.
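
A simplified sketch of that selection, reusing the hypothetical StoreFile class from the fetchFiles sketch above and assuming allFiles is sorted by version (as returned by fetchFiles):

[source, scala]
----
def filesForVersion(allFiles: Seq[StoreFile], version: Long): Seq[StoreFile] = {
  // latest snapshot up to and including the requested version (if any)
  val latestSnapshot = allFiles
    .filter(f => f.isSnapshot && f.version <= version)
    .lastOption

  latestSnapshot match {
    case Some(snapshot) =>
      // the snapshot plus all delta files after it, up to and including the version
      snapshot +: allFiles.filter { f =>
        !f.isSnapshot && f.version > snapshot.version && f.version <= version
      }
    case None =>
      // no snapshot: all delta files up to and including the version
      allFiles.filter(f => !f.isSnapshot && f.version <= version)
  }
}
----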

=== [[doMaintenance]] State Maintenance (Snapshotting and Cleaning Up) -- doMaintenance Method

[source, scala]

doMaintenance(): Unit

NOTE: doMaintenance is part of the <> for optional state maintenance.

doMaintenance simply does <> followed by <>.

In case of any non-fatal errors, doMaintenance simply prints out the following WARN message to the logs:

Error performing snapshot and cleaning up [this]

==== [[doSnapshot]] State Snapshotting (Rolling Up Delta Files into Snapshot File) -- doSnapshot Internal Method

[source, scala]

doSnapshot(): Unit

doSnapshot <> (files) and prints out the following DEBUG message to the logs:

fetchFiles() took [time] ms.

doSnapshot returns immediately (and does nothing) when there are no delta and snapshot files.

doSnapshot takes the version of the latest file (lastVersion).

doSnapshot <> (among the files and for the last version).

doSnapshot looks up the last version in the <>.

When the last version was found in the cache and the number of delta files is above spark.sql.streaming.stateStore.minDeltasForSnapshot internal threshold, doSnapshot <>.

In the end, doSnapshot prints out the following DEBUG message to the logs:

writeSnapshotFile() took [time] ms.

In case of non-fatal errors, doSnapshot simply prints out the following WARN message to the logs:

Error doing snapshots for [this]

NOTE: doSnapshot is used exclusively when HDFSBackedStateStoreProvider is requested to <>.
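
The snapshotting decision could be sketched as follows. The helper functions are hypothetical stand-ins for the provider internals; timing, logging and error handling are left out.

[source, scala]
----
import java.util.concurrent.ConcurrentHashMap
import org.apache.spark.sql.catalyst.expressions.UnsafeRow

def maybeSnapshot(
    files: Seq[StoreFile],
    minDeltasForSnapshot: Int,
    cachedState: Long => Option[ConcurrentHashMap[UnsafeRow, UnsafeRow]],
    filesForVersion: (Seq[StoreFile], Long) => Seq[StoreFile],
    writeSnapshotFile: (Long, ConcurrentHashMap[UnsafeRow, UnsafeRow]) => Unit): Unit = {
  if (files.isEmpty) return                       // no delta or snapshot files: nothing to do
  val lastVersion = files.last.version
  val deltaFiles = filesForVersion(files, lastVersion).filterNot(_.isSnapshot)
  cachedState(lastVersion) match {
    case Some(map) if deltaFiles.size > minDeltasForSnapshot =>
      writeSnapshotFile(lastVersion, map)         // roll the deltas up into a snapshot
    case _ => // not cached or too few deltas: skip snapshotting
  }
}
----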

==== [[cleanup]] Cleaning Up (Removing Old State Files) -- cleanup Internal Method

[source, scala]

cleanup(): Unit

cleanup <> (files) and prints out the following DEBUG message to the logs:

fetchFiles() took [time] ms.

cleanup returns immediately (and does nothing) when there are no delta and snapshot files.

cleanup takes the version of the latest state file (lastVersion) and subtracts the spark.sql.streaming.minBatchesToRetain configuration property from it, which gives the earliest version to retain (all older state files are to be removed).

cleanup requests the <> to delete the path of every old state file.

cleanup prints out the following DEBUG message to the logs:

deleting files took [time] ms.

In the end, cleanup prints out the following INFO message to the logs:

Deleted files older than [version] for [this]: [filesToDelete]

In case of a non-fatal exception, cleanup prints out the following WARN message to the logs:

Error cleaning up files for [this]

NOTE: cleanup is used exclusively when HDFSBackedStateStoreProvider is requested for <>.
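
The retention rule could be sketched as follows, again reusing the hypothetical StoreFile class; the actual file deletion is abstracted away.

[source, scala]
----
def selectFilesToDelete(
    files: Seq[StoreFile],
    minBatchesToRetain: Int): Seq[StoreFile] = {
  if (files.isEmpty) {
    Seq.empty
  } else {
    // earliest version to retain: latest version minus minBatchesToRetain
    val earliestVersionToRetain = files.last.version - minBatchesToRetain
    if (earliestVersionToRetain > 0) files.filter(_.version < earliestVersionToRetain)
    else Seq.empty
  }
}
----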

=== [[close]] Closing State Store Provider -- close Method

[source, scala]

close(): Unit

NOTE: close is part of the <> to close the state store provider.

close...FIXME

=== [[getMetricsForProvider]] getMetricsForProvider Method

[source, scala]

getMetricsForProvider(): Map[String, Long]

getMetricsForProvider returns the following <>:

  • <>

  • <>

  • <>

getMetricsForProvider is used when HDFSBackedStateStore is requested for performance metrics.

=== [[supportedCustomMetrics]] Supported StateStoreCustomMetrics -- supportedCustomMetrics Method

[source, scala]

supportedCustomMetrics: Seq[StateStoreCustomMetric]

NOTE: supportedCustomMetrics is part of the <> for the <> of a state store provider.

supportedCustomMetrics includes the following <>:

  • <>

  • <>

  • <>

=== [[commitUpdates]] Committing State Changes (As New Version of State) -- commitUpdates Internal Method

[source, scala]

commitUpdates(
  newVersion: Long,
  map: ConcurrentHashMap[UnsafeRow, UnsafeRow],
  output: DataOutputStream): Unit


commitUpdates <> (with the given DataOutputStream) followed by <> (with the given newVersion and the map state).

commitUpdates is used when HDFSBackedStateStore is requested to commit state changes.

=== [[loadMap]] Loading Specified Version of State (from Internal Cache or Snapshot and Delta Files) -- loadMap Internal Method

[source, scala]

loadMap(version: Long): ConcurrentHashMap[UnsafeRow, UnsafeRow]


loadMap first tries to find the requested state version in the <> internal cache and, if found, returns it immediately and increments the <> metric.

If the requested state version could not be found in the <> internal cache, loadMap prints out the following WARN message to the logs:

[options="wrap"]

The state for version [version] doesn't exist in loadedMaps. Reading snapshot file and delta files if needed...Note that this is normal for the first batch of starting query.

loadMap increments the <> metric.

loadMap <> and, if found, <> and returns it.

If not found, loadMap tries to find the most recent state version by decrementing the requested version until one is found in the <> internal cache or <>.

loadMap <> for all the remaining versions (from the version right after the one found up to and including the requested one). loadMap then <> (the state rebuilt from the closest available version and the remaining deltas) and returns it.

In the end, loadMap prints out the following DEBUG message to the logs:

Loading state for [version] takes [elapsedMs] ms.

NOTE: loadMap is used exclusively when HDFSBackedStateStoreProvider is requested for the <>.
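
Put together, the lookup-then-rebuild strategy could look roughly like this. It is a simplified sketch; cachedState, readSnapshotFile, updateFromDeltaFile and cacheState are hypothetical stand-ins for the provider internals, and metrics and logging are omitted.

[source, scala]
----
import java.util.concurrent.ConcurrentHashMap
import org.apache.spark.sql.catalyst.expressions.UnsafeRow

object LoadMapSketch {
  type StateMap = ConcurrentHashMap[UnsafeRow, UnsafeRow]

  def loadMap(
      version: Long,
      cachedState: Long => Option[StateMap],
      readSnapshotFile: Long => Option[StateMap],
      updateFromDeltaFile: (Long, StateMap) => Unit,
      cacheState: (Long, StateMap) => Unit): StateMap = {
    // 1. Cache hit: return the cached state as-is
    cachedState(version).getOrElse {
      // 2. Snapshot file available for the requested version: cache and return it
      readSnapshotFile(version) match {
        case Some(snapshot) =>
          cacheState(version, snapshot)
          snapshot
        case None =>
          // 3. Walk back until a cached map or a snapshot file is found
          //    (version 0 or less means an empty state)
          var lastAvailableVersion = version
          var lastAvailableMap: Option[StateMap] = None
          while (lastAvailableMap.isEmpty) {
            lastAvailableVersion -= 1
            lastAvailableMap =
              if (lastAvailableVersion <= 0) Some(new StateMap())
              else cachedState(lastAvailableVersion)
                .orElse(readSnapshotFile(lastAvailableVersion))
          }
          // 4. Replay the delta files up to the requested version, cache and return
          val result = new StateMap(lastAvailableMap.get)
          ((lastAvailableVersion + 1) to version).foreach(updateFromDeltaFile(_, result))
          cacheState(version, result)
          result
      }
    }
  }
}
----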

=== [[readSnapshotFile]] Loading State Snapshot File For Specified Version -- readSnapshotFile Internal Method

[source, scala]

readSnapshotFile(version: Long): Option[ConcurrentHashMap[UnsafeRow, UnsafeRow]]


readSnapshotFile <> for the given version.

readSnapshotFile requests the <> to open the snapshot file for reading and <> (input).

readSnapshotFile reads the decompressed input stream until an EOF (that is marked as the integer -1 in the stream) and inserts key and value rows in a state map (ConcurrentHashMap[UnsafeRow, UnsafeRow]):

  • First integer is the size of a key (buffer) followed by the key itself (of the size). readSnapshotFile creates an UnsafeRow for the key (with the number of fields as indicated by the number of fields of the <>).

  • Next integer is the size of a value (buffer) followed by the value itself (of the size). readSnapshotFile creates an UnsafeRow for the value (with the number of fields as indicated by the number of fields of the <>).

In the end, readSnapshotFile prints out the following INFO message to the logs and returns the key-value map.

Read snapshot file for version [version] of [this] from [fileToRead]

In case of a FileNotFoundException, readSnapshotFile simply returns None (to indicate that no snapshot state file was available and so no state for the version).

readSnapshotFile throws an IOException for the size of a key or a value below 0:

Error reading snapshot file [fileToRead] of [this]: [key|value] size cannot be [keySize|valueSize]

NOTE: readSnapshotFile is used exclusively when HDFSBackedStateStoreProvider is requested to <>.
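
The on-disk format described above (a key size, the key bytes, a value size, the value bytes, repeated until a -1 key size) could be read along these lines. It is a simplified sketch; keyFields and valueFields stand for the number of fields of the key and value schemas.

[source, scala]
----
import java.io.{DataInputStream, IOException}
import java.util.concurrent.ConcurrentHashMap
import org.apache.spark.sql.catalyst.expressions.UnsafeRow

def readStateEntries(
    input: DataInputStream,
    keyFields: Int,
    valueFields: Int): ConcurrentHashMap[UnsafeRow, UnsafeRow] = {
  val map = new ConcurrentHashMap[UnsafeRow, UnsafeRow]()
  var eof = false
  while (!eof) {
    val keySize = input.readInt()
    if (keySize == -1) {
      eof = true                                       // end-of-file marker
    } else if (keySize < 0) {
      throw new IOException(s"key size cannot be $keySize")
    } else {
      val keyBuffer = new Array[Byte](keySize)
      input.readFully(keyBuffer)
      val key = new UnsafeRow(keyFields)
      key.pointTo(keyBuffer, keySize)

      val valueSize = input.readInt()
      if (valueSize < 0) throw new IOException(s"value size cannot be $valueSize")
      val valueBuffer = new Array[Byte](valueSize)
      input.readFully(valueBuffer)
      val value = new UnsafeRow(valueFields)
      value.pointTo(valueBuffer, valueSize)

      map.put(key, value)
    }
  }
  map
}
----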

=== [[updateFromDeltaFile]] Updating State with State Changes For Specified Version (per Delta File) -- updateFromDeltaFile Internal Method

[source, scala]

updateFromDeltaFile(
  version: Long,
  map: ConcurrentHashMap[UnsafeRow, UnsafeRow]): Unit


[NOTE]
====
updateFromDeltaFile is very similar code-wise to <> with two main differences:

  • updateFromDeltaFile is given the state map to update (while <> loads the state from a snapshot file)

  • updateFromDeltaFile removes a key from the state map when the value (size) is -1 (while <> throws an IOException)

The following description is almost an exact copy of <>, just for completeness.
====

updateFromDeltaFile <> for the requested version.

updateFromDeltaFile requests the <> to open the delta file for reading and <> (input).

updateFromDeltaFile reads the decompressed input stream until an EOF (that is marked as the integer -1 in the stream) and inserts key and value rows in the given state map:

  • First integer is the size of a key (buffer) followed by the key itself (of the size). updateFromDeltaFile creates an UnsafeRow for the key (with the number of fields as indicated by the number of fields of the <>).

  • Next integer is the size of a value (buffer) followed by the value itself (of the size). updateFromDeltaFile creates an UnsafeRow for the value (with the number of fields as indicated by the number of fields of the <>) or removes the corresponding key from the state map (if the value size is -1)

NOTE: updateFromDeltaFile removes the key-value entry from the state map if the value (size) is -1.

In the end, updateFromDeltaFile prints out the following INFO message to the logs (the given state map has been updated in place):

Read delta file for version [version] of [this] from [fileToRead]

updateFromDeltaFile throws an IllegalStateException in case of FileNotFoundException while opening the delta file for the specified version:

Error reading delta file [fileToRead] of [this]: [fileToRead] does not exist

NOTE: updateFromDeltaFile is used exclusively when HDFSBackedStateStoreProvider is requested to <>.

=== [[putStateIntoStateCacheMap]] Caching New Version of State -- putStateIntoStateCacheMap Internal Method

[source, scala]

putStateIntoStateCacheMap(
  newVersion: Long,
  map: ConcurrentHashMap[UnsafeRow, UnsafeRow]): Unit


putStateIntoStateCacheMap registers state for a given version, i.e. adds the map state under the newVersion key in the <> internal registry.

With the <> threshold as 0 or below, putStateIntoStateCacheMap simply removes all entries from the <> internal registry and returns.

putStateIntoStateCacheMap removes the oldest state version(s) in the <> internal registry until its size is at the <> threshold.

When the size of the <> internal registry is exactly at the <> threshold, putStateIntoStateCacheMap applies two more optimizations per newVersion (see the sketch below):

  • It does not add the given state when the given newVersion is older (smaller) than the oldest retained version (the new entry would be evicted right away)

  • It removes the oldest retained state when it is older (smaller) than the given newVersion (to make room for the new entry)

NOTE: putStateIntoStateCacheMap is used when HDFSBackedStateStoreProvider is requested to <> and <>.
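
A simplified sketch of these cache-maintenance rules, with loadedMaps as the reverse-ordered TreeMap described below (so lastKey is the oldest retained version):

[source, scala]
----
import java.util.concurrent.ConcurrentHashMap
import org.apache.spark.sql.catalyst.expressions.UnsafeRow

def putStateIntoCache(
    loadedMaps: java.util.TreeMap[Long, ConcurrentHashMap[UnsafeRow, UnsafeRow]],
    maxVersionsToRetain: Int,
    newVersion: Long,
    map: ConcurrentHashMap[UnsafeRow, UnsafeRow]): Unit = {
  if (maxVersionsToRetain <= 0) {
    loadedMaps.clear()                        // caching effectively disabled
    return
  }
  // evict the oldest versions down to the threshold
  while (loadedMaps.size() > maxVersionsToRetain) {
    loadedMaps.remove(loadedMaps.lastKey())
  }
  if (loadedMaps.size() == maxVersionsToRetain) {
    val oldestVersion = loadedMaps.lastKey()
    if (oldestVersion > newVersion) {
      return                                  // newVersion would be evicted right away
    } else if (oldestVersion < newVersion) {
      loadedMaps.remove(oldestVersion)        // make room for the new version
    }
  }
  loadedMaps.put(newVersion, map)
}
----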

=== [[writeSnapshotFile]] Writing Compressed Snapshot File for Specified Version -- writeSnapshotFile Internal Method

[source, scala]

writeSnapshotFile(
  version: Long,
  map: ConcurrentHashMap[UnsafeRow, UnsafeRow]): Unit


writeSnapshotFile <> for the given version.

writeSnapshotFile requests the <> to create the snapshot file (with overwriting enabled) and <>.

For every key-value UnsafeRow pair in the given map, writeSnapshotFile writes the size of the key followed by the key itself (as bytes). writeSnapshotFile then writes the size of the value followed by the value itself (as bytes).

In the end, writeSnapshotFile prints out the following INFO message to the logs:

Written snapshot file for version [version] of [this] at [targetFile]

In case of any Throwable exception, writeSnapshotFile <> and re-throws the exception.

NOTE: writeSnapshotFile is used exclusively when HDFSBackedStateStoreProvider is requested to <>.
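
The write loop could be sketched as follows; the creation of the compressed output stream, the INFO message and the error handling are left out.

[source, scala]
----
import java.io.DataOutputStream
import java.util.concurrent.ConcurrentHashMap
import org.apache.spark.sql.catalyst.expressions.UnsafeRow
import scala.collection.JavaConverters._

def writeStateEntries(
    output: DataOutputStream,
    map: ConcurrentHashMap[UnsafeRow, UnsafeRow]): Unit = {
  map.entrySet().asScala.foreach { entry =>
    val keyBytes = entry.getKey.getBytes
    val valueBytes = entry.getValue.getBytes
    output.writeInt(keyBytes.length)
    output.write(keyBytes)
    output.writeInt(valueBytes.length)
    output.write(valueBytes)
  }
  output.writeInt(-1)   // end-of-file marker (compare finalizeDeltaFile below)
  output.close()
}
----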

=== [[compressStream]] compressStream Internal Method

[source, scala]

compressStream(outputStream: DataOutputStream): DataOutputStream


compressStream creates a new LZ4CompressionCodec (based on the <>) and requests it to create a LZ4BlockOutputStream with the given DataOutputStream.

In the end, compressStream creates a new DataOutputStream with the LZ4BlockOutputStream.

NOTE: compressStream is used when...FIXME
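
A minimal sketch of that wrapping, assuming the provider's SparkConf is at hand:

[source, scala]
----
import java.io.DataOutputStream
import org.apache.spark.SparkConf
import org.apache.spark.io.LZ4CompressionCodec

def compress(sparkConf: SparkConf, outputStream: DataOutputStream): DataOutputStream =
  new DataOutputStream(
    new LZ4CompressionCodec(sparkConf).compressedOutputStream(outputStream))
----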

=== [[cancelDeltaFile]] cancelDeltaFile Internal Method

[source, scala]

cancelDeltaFile(
  compressedStream: DataOutputStream,
  rawStream: CancellableFSDataOutputStream): Unit


cancelDeltaFile...FIXME

NOTE: cancelDeltaFile is used when...FIXME

=== [[finalizeDeltaFile]] finalizeDeltaFile Internal Method

[source, scala]

finalizeDeltaFile(output: DataOutputStream): Unit


finalizeDeltaFile simply writes -1 to the given DataOutputStream (to indicate end of file) and closes it.

NOTE: finalizeDeltaFile is used exclusively when HDFSBackedStateStoreProvider is requested to <>.
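
In code, that is no more than the following sketch:

[source, scala]
----
import java.io.DataOutputStream

def finalizeDelta(output: DataOutputStream): Unit = {
  output.writeInt(-1)   // EOF marker recognized when reading delta and snapshot files
  output.close()
}
----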

=== [[loadedMaps]] Lookup Table (Cache) of States By Version -- loadedMaps Internal Method

[source, scala]

loadedMaps: TreeMap[
  Long,                                     // version
  ConcurrentHashMap[UnsafeRow, UnsafeRow]]  // state (as keys and values)


loadedMaps is a https://docs.oracle.com/javase/8/docs/api/java/util/TreeMap.html[java.util.TreeMap] of state versions sorted according to the reversed ordering of the versions (i.e. long numbers).
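
A declaration with such a reversed ordering could look like this (a sketch):

[source, scala]
----
import java.util.concurrent.ConcurrentHashMap
import org.apache.spark.sql.catalyst.expressions.UnsafeRow

object LoadedMapsSketch {
  // versions sorted in descending order: firstKey is the latest, lastKey the oldest
  val loadedMaps = new java.util.TreeMap[Long, ConcurrentHashMap[UnsafeRow, UnsafeRow]](
    Ordering[Long].reverse)
}
----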

A new entry (a version and the state updates) can only be added when HDFSBackedStateStoreProvider is requested to <> (and only when the spark.sql.streaming.maxBatchesToRetainInMemory internal configuration is above 0).

loadedMaps is mainly used when HDFSBackedStateStoreProvider is requested to <>. Positive hits (when a version could be found in the cache) are counted in the <> performance metric while misses are counted in the <> performance metric.

NOTE: With no or missing versions in the cache, the <> metric should be above 0 while <> stays at 0 (or remains the smaller of the two).

The estimated size of loadedMaps is available as the <> performance metric.

The spark.sql.streaming.maxBatchesToRetainInMemory internal configuration is used as the threshold of the number of elements in loadedMaps. When 0 or negative, every <> removes all elements in (clears) loadedMaps.

NOTE: It is possible to change the configuration at restart of a structured query.

The state deltas (the values) in loadedMaps are cleared (all entries removed) when HDFSBackedStateStoreProvider is requested to <>.

Used when HDFSBackedStateStoreProvider is requested for the following:

  • <>

  • <>

=== [[internal-properties]] Internal Properties

[cols="30m,70",options="header",width="100%"] |=== | Name | Description

| fm a| [[fm]] CheckpointFileManager for the <> (and the <>)

Used when:

| hadoopConf a| [[hadoopConf]] Hadoop https://hadoop.apache.org/docs/r2.7.3/api/org/apache/hadoop/conf/Configuration.html[Configuration] of the <>

Given when HDFSBackedStateStoreProvider is requested to <>

| keySchema a| [[keySchema]]

[source, scala]

keySchema: StructType

Schema of the state keys

| valueSchema a| [[valueSchema]]

[source, scala]

valueSchema: StructType

Schema of the state values

| numberOfVersionsToRetainInMemory a| [[numberOfVersionsToRetainInMemory]]

[source, scala]

numberOfVersionsToRetainInMemory: Int

numberOfVersionsToRetainInMemory is the maximum number of entries in the <> internal registry and is configured by the spark.sql.streaming.maxBatchesToRetainInMemory internal configuration.

numberOfVersionsToRetainInMemory is the threshold above which HDFSBackedStateStoreProvider removes the last key (the oldest state version, per the reverse ordering of versions) from the <> internal registry when requested to <>.

| sparkConf a| [[sparkConf]] SparkConf

|===