
== CheckpointFileManager

CheckpointFileManager is the <<contract, abstraction>> of <<implementations, checkpoint file managers>> that manage checkpoint files (metadata of streaming batches) on Hadoop DFS-compatible file systems.

CheckpointFileManager is <<create, created>> based on the spark.sql.streaming.checkpointFileManagerClass configuration property (if defined) before reverting to the available <<implementations, implementations>>.

CheckpointFileManager is used exclusively by HDFSMetadataLog, StreamMetadata and HDFSBackedStateStoreProvider.
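
For example, a client could use the <<create, create>> factory method to pick a CheckpointFileManager and then write a checkpoint file atomically with <<createAtomic, createAtomic>>. The following is a minimal sketch (the checkpoint path and the payload are made up for illustration; CheckpointFileManager is an internal API in the org.apache.spark.sql.execution.streaming package):

[source, scala]
----
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.spark.sql.execution.streaming.CheckpointFileManager

// Hypothetical checkpoint file (illustration only)
val batchFile = new Path("/tmp/checkpoint/offsets/0")

// Pick a CheckpointFileManager for the checkpoint directory
val fm = CheckpointFileManager.create(batchFile.getParent, new Configuration())

// createAtomic returns a CancellableFSDataOutputStream that is expected to
// make the file visible atomically on close (and leave nothing behind on cancel)
val out = fm.createAtomic(batchFile, overwriteIfPossible = false)
try {
  out.write("v1".getBytes("UTF-8"))
  out.close()   // commits the file
} catch {
  case e: Throwable =>
    out.cancel() // gives up the write
    throw e
}
----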

[[contract]]
.CheckpointFileManager Contract
[cols="30m,70",options="header",width="100%"]
|===
| Method
| Description

| createAtomic
a| [[createAtomic]]

[source, scala]
----
createAtomic(
  path: Path,
  overwriteIfPossible: Boolean): CancellableFSDataOutputStream
----


Used when:

| delete
a| [[delete]]

[source, scala]
----
delete(path: Path): Unit
----

Deletes the given path recursively (if it exists)

Used when:

| exists
a| [[exists]]

[source, scala]
----
exists(path: Path): Boolean
----

Used when HDFSMetadataLog is created (to create the metadata directory) and is requested for the metadata of a batch

| isLocal
a| [[isLocal]]

[source, scala]
----
isLocal: Boolean
----

Does not seem to be used.

| list
a| [[list]]

[source, scala]
----
list(
  path: Path): Array[FileStatus]  // <1>
list(
  path: Path,
  filter: PathFilter): Array[FileStatus]
----
<1> Uses a PathFilter that accepts all files in the path

Lists all files in the given path

Used when:

| mkdirs
a| [[mkdirs]]

[source, scala]
----
mkdirs(path: Path): Unit
----

Used when:

| open
a| [[open]]

[source, scala]
----
open(path: Path): FSDataInputStream
----

Opens a file (by the given path) for reading

Used when:

|===
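
To make the contract more concrete, here is a minimal sketch of a CheckpointFileManager that delegates to Hadoop's FileSystem API. The class name is hypothetical, createAtomic is left unimplemented, and error handling is omitted; the sketch only illustrates the shape of the contract as listed above.

[source, scala]
----
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileStatus, FSDataInputStream, Path, PathFilter}
import org.apache.spark.sql.execution.streaming.CheckpointFileManager
import org.apache.spark.sql.execution.streaming.CheckpointFileManager.CancellableFSDataOutputStream

// Hypothetical CheckpointFileManager backed by Hadoop's FileSystem API
class SketchCheckpointFileManager(rootPath: Path, hadoopConf: Configuration)
  extends CheckpointFileManager {

  private val fs = rootPath.getFileSystem(hadoopConf)

  // A real implementation writes to a temporary file and renames it into place
  // on close (cf. FileSystemBasedCheckpointFileManager below)
  override def createAtomic(
      path: Path,
      overwriteIfPossible: Boolean): CancellableFSDataOutputStream = ???

  override def open(path: Path): FSDataInputStream = fs.open(path)

  override def exists(path: Path): Boolean = fs.exists(path)

  override def mkdirs(path: Path): Unit = fs.mkdirs(path)

  override def delete(path: Path): Unit = {
    if (fs.exists(path)) {
      fs.delete(path, true) // recursive, per the contract
    }
  }

  override def list(path: Path, filter: PathFilter): Array[FileStatus] =
    fs.listStatus(path, filter)

  override def list(path: Path): Array[FileStatus] =
    list(path, new PathFilter { def accept(p: Path): Boolean = true })

  override def isLocal: Boolean = false
}
----

The built-in <<FileSystemBasedCheckpointFileManager, FileSystemBasedCheckpointFileManager>> takes a similar delegation approach and relies on FileSystem.rename() of a temporary file to make createAtomic behave atomically.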

[[implementations]]
.CheckpointFileManagers
[cols="30,70",options="header",width="100%"]
|===
| CheckpointFileManager
| Description

| FileContextBasedCheckpointFileManager
| [[FileContextBasedCheckpointFileManager]] Default CheckpointFileManager that uses Hadoop's https://hadoop.apache.org/docs/r2.8.3/api/org/apache/hadoop/fs/FileContext.html[FileContext] API for managing checkpoint files (unless the spark.sql.streaming.checkpointFileManagerClass configuration property is defined)

| FileSystemBasedCheckpointFileManager
| [[FileSystemBasedCheckpointFileManager]] Basic CheckpointFileManager that uses Hadoop's https://hadoop.apache.org/docs/r2.8.3/api/org/apache/hadoop/fs/FileSystem.html[FileSystem] API for managing checkpoint files (and assumes that the implementation of FileSystem.rename() is atomic, or else the correctness and fault-tolerance of Structured Streaming is not guaranteed)

|===
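
The CheckpointFileManager to use can be chosen explicitly with the spark.sql.streaming.checkpointFileManagerClass configuration property (see <<create, create>> below). A sketch, assuming the property set on the SparkSession ends up in the Hadoop configuration that <<create, create>> receives:

[source, scala]
----
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .master("local[*]")
  .appName("CheckpointFileManager demo")
  // Request the FileSystem-based manager explicitly, e.g. when the
  // FileContext API is not available for the checkpoint file system
  .config(
    "spark.sql.streaming.checkpointFileManagerClass",
    "org.apache.spark.sql.execution.streaming.FileSystemBasedCheckpointFileManager")
  .getOrCreate()
----

A custom implementation configured this way is presumably instantiated reflectively, so it should have a constructor that accepts a Hadoop Path and Configuration (an assumption based on how the built-in implementations are constructed).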

=== [[create]] Creating CheckpointFileManager Instance -- create Object Method

[source, scala]
----
create(
  path: Path,
  hadoopConf: Configuration): CheckpointFileManager
----


create finds the spark.sql.streaming.checkpointFileManagerClass configuration property in the given hadoopConf configuration.

If found, create simply instantiates whatever CheckpointFileManager implementation is defined.

If not found, create creates a FileContextBasedCheckpointFileManager.

In case of UnsupportedFileSystemException, create prints out the following WARN message to the logs and creates (falls back on) a FileSystemBasedCheckpointFileManager.

----
Could not use FileContext API for managing Structured Streaming checkpoint files at [path]. Using FileSystem API instead for managing log files. If the implementation of FileSystem.rename() is not atomic, then the correctness and fault-tolerance of your Structured Streaming is not guaranteed.
----
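
Put together, the decision logic of create could be sketched as follows. This is a simplified paraphrase for illustration, not the actual source; in particular, the reflective instantiation with a (Path, Configuration) constructor is an assumption.

[source, scala]
----
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{Path, UnsupportedFileSystemException}
import org.apache.spark.sql.execution.streaming.{CheckpointFileManager, FileContextBasedCheckpointFileManager, FileSystemBasedCheckpointFileManager}

def createSketch(path: Path, hadoopConf: Configuration): CheckpointFileManager = {
  val fileManagerClass = hadoopConf.get("spark.sql.streaming.checkpointFileManagerClass")
  if (fileManagerClass != null) {
    // Instantiate whatever CheckpointFileManager implementation is configured
    Class.forName(fileManagerClass)
      .getConstructor(classOf[Path], classOf[Configuration])
      .newInstance(path, hadoopConf)
      .asInstanceOf[CheckpointFileManager]
  } else {
    try {
      // Default: the FileContext-based manager
      new FileContextBasedCheckpointFileManager(path, hadoopConf)
    } catch {
      case _: UnsupportedFileSystemException =>
        // Fall back on the FileSystem-based manager
        // (this is where the WARN message above is printed)
        new FileSystemBasedCheckpointFileManager(path, hadoopConf)
    }
  }
}
----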

create is used when HDFSMetadataLog, StreamMetadata and HDFSBackedStateStoreProvider need a CheckpointFileManager for their checkpoint files.