Skip to content

OffsetSeqLog — Hadoop DFS-based Metadata Storage of OffsetSeqs

OffsetSeqLog is a Hadoop DFS-based metadata storage for OffsetSeq metadata.

OffsetSeqLog is created as the write-ahead log (WAL) of offsets of streaming query execution engines.

[[OffsetSeq]][[offsets]][[metadata]] OffsetSeqLog uses OffsetSeq for metadata which holds an ordered collection of offsets and optional metadata (as OffsetSeqMetadata for event-time watermark).

[[VERSION]] OffsetSeqLog uses 1 for the version when <> and <> metadata.

Creating Instance

OffsetSeqLog takes the following to be created:

  • [[sparkSession]] SparkSession
  • [[path]] Path of the metadata log directory

=== [[serialize]] Serializing Metadata (Writing Metadata in Serialized Format) -- serialize Method

[source, scala]

serialize( offsetSeq: OffsetSeq, out: OutputStream): Unit


serialize firstly writes out the <> prefixed with v on a single line (e.g. v1) followed by the optional metadata in JSON format.

serialize then writes out the offsets in JSON format, one per line.

NOTE: No offsets to write in offsetSeq for a streaming source is marked as - (a dash) in the log.

$ ls -tr [checkpoint-directory]/offsets
0 1 2 3 4 5 6

$ cat [checkpoint-directory]/offsets/6
v1
{"batchWatermarkMs":0,"batchTimestampMs":1502872590006,"conf":{"spark.sql.shuffle.partitions":"200","spark.sql.streaming.stateStore.providerClass":"org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider"}}
51

serialize is part of HDFSMetadataLog abstraction.

=== [[deserialize]] Deserializing Metadata (Reading OffsetSeq from Serialized Format) -- deserialize Method

[source, scala]

deserialize(in: InputStream): OffsetSeq

deserialize firstly parses the <> on the first line.

deserialize reads the optional metadata (with an empty line for metadata not available).

deserialize creates a SerializedOffset for every line left.

In the end, deserialize creates a OffsetSeq for the optional metadata and the SerializedOffsets.

When there are no lines in the InputStream, deserialize throws an IllegalStateException:

Incomplete log file

deserialize is part of HDFSMetadataLog abstraction.