OffsetSeqLog: Hadoop DFS-based Metadata Storage of OffsetSeqs
`OffsetSeqLog` is a Hadoop DFS-based metadata storage (an `HDFSMetadataLog`) for `OffsetSeq` metadata.

`OffsetSeqLog` is created as the write-ahead log (WAL) of offsets for streaming query execution engines.
`OffsetSeqLog` uses `OffsetSeq` as its metadata. An `OffsetSeq` holds an ordered collection of offsets (one per streaming source) and optional metadata (as `OffsetSeqMetadata`, e.g. for the event-time watermark).
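The following is a minimal, simplified model that makes the shape of this metadata concrete. It is a sketch and does not reproduce Spark's actual classes. `SimpleOffsetSeq`, `SimpleOffsetSeqMetadata` and `OffsetSeqModelDemo` are hypothetical names, and offsets are kept as raw JSON strings. The metadata fields mirror the JSON keys in the sample offsets file shown later on this page.

```scala
// Hypothetical, simplified model of the OffsetSeq metadata (not Spark's classes).
// Offsets are kept as raw JSON strings for illustration only.
final case class SimpleOffsetSeqMetadata(
    batchWatermarkMs: Long = 0L,            // event-time watermark of the batch
    batchTimestampMs: Long = 0L,            // timestamp of the batch
    conf: Map[String, String] = Map.empty)  // configuration recorded with the batch

final case class SimpleOffsetSeq(
    // One entry per streaming source, in source order;
    // None means the source has no offset for this batch.
    offsets: Seq[Option[String]],
    metadata: Option[SimpleOffsetSeqMetadata] = None)

object OffsetSeqModelDemo {
  // Two sources: the first is at offset 51, the second has no offset yet.
  val example: SimpleOffsetSeq = SimpleOffsetSeq(
    offsets = Seq(Some("51"), None),
    metadata = Some(SimpleOffsetSeqMetadata(batchTimestampMs = 1502872590006L)))
}
```

Using `Option` per source keeps positions stable. The i-th entry always belongs to the i-th source, even when that source has nothing to report.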
`OffsetSeqLog` uses `1` as the version when serializing and deserializing metadata.
Creating Instance
`OffsetSeqLog` takes the following to be created:

- `SparkSession`
- Path of the metadata log directory
Serializing Metadata (Writing Metadata in Serialized Format): `serialize` Method

```scala
serialize(
  offsetSeq: OffsetSeq,
  out: OutputStream): Unit
```
`serialize` first writes out the version prefixed with `v` on a single line (e.g. `v1`). It follows this with the optional metadata in JSON format on the next line, or an empty line when there is no metadata.

`serialize` then writes out the offsets in JSON format, one per line.

NOTE: A streaming source with no offset in `offsetSeq` is written out as `-` (a dash) in the log.
For example, for a streaming query with a single streaming source:

```text
$ ls -tr [checkpoint-directory]/offsets
0 1 2 3 4 5 6

$ cat [checkpoint-directory]/offsets/6
v1
{"batchWatermarkMs":0,"batchTimestampMs":1502872590006,"conf":{"spark.sql.shuffle.partitions":"200","spark.sql.streaming.stateStore.providerClass":"org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider"}}
51
```
`serialize` is part of the `HDFSMetadataLog` abstraction.
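The following is a minimal sketch of the line-oriented format that `serialize` produces. It is not Spark's source code. It is written against a hypothetical `JsonOffsetSeq` whose offsets and metadata are already JSON strings. `JsonOffsetSeq`, `OffsetSeqWriter`, `Version`, `NoOffset` and `serializeToString` are illustrative names and are not Spark APIs. The sketch also assumes that no trailing newline follows the last offset.

```scala
import java.io.{ByteArrayOutputStream, OutputStream}
import java.nio.charset.StandardCharsets.UTF_8

// Hypothetical stand-in for OffsetSeq: offsets and metadata are already JSON strings.
final case class JsonOffsetSeq(offsets: Seq[Option[String]], metadataJson: Option[String])

object OffsetSeqWriter {
  val Version = 1     // the log version (1), written as "v1"
  val NoOffset = "-"  // marker for a streaming source with no offset

  def serialize(seq: JsonOffsetSeq, out: OutputStream): Unit = {
    // Line 1: the version prefixed with "v"
    out.write(s"v$Version".getBytes(UTF_8))
    // Line 2: the metadata JSON, or an empty line when there is no metadata
    out.write('\n')
    out.write(seq.metadataJson.getOrElse("").getBytes(UTF_8))
    // Remaining lines: one offset per source, or "-" when a source has no offset
    seq.offsets.foreach { offset =>
      out.write('\n')
      out.write(offset.getOrElse(NoOffset).getBytes(UTF_8))
    }
  }

  // Convenience helper (hypothetical) that serializes into an in-memory string.
  def serializeToString(seq: JsonOffsetSeq): String = {
    val buffer = new ByteArrayOutputStream()
    serialize(seq, buffer)
    new String(buffer.toByteArray, UTF_8)
  }
}
```

For instance, `OffsetSeqWriter.serializeToString(JsonOffsetSeq(Seq(None, Some("7")), None))` yields four lines: `v1`, an empty metadata line, `-` and `7`.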
Deserializing Metadata (Reading OffsetSeq from Serialized Format): `deserialize` Method

```scala
deserialize(in: InputStream): OffsetSeq
```
`deserialize` first parses the version on the first line.

`deserialize` then reads the optional metadata from the next line. An empty line means no metadata is available.

`deserialize` creates a `SerializedOffset` for every remaining line. A `-` line stands for a source with no offset.

In the end, `deserialize` creates an `OffsetSeq` for the optional metadata and the `SerializedOffset`s.
When there are no lines in the `InputStream`, `deserialize` throws an `IllegalStateException` with the message `Incomplete log file`.
`deserialize` is part of the `HDFSMetadataLog` abstraction.
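The following is a matching sketch of `deserialize`. It uses a hypothetical `JsonOffsetSeq` with raw JSON strings in place of Spark's `OffsetSeq` and `SerializedOffset`. `OffsetSeqReader` is an illustrative name. The version check accepts only `v1`, and its error message is made up for this sketch. A missing metadata line is also reported as an incomplete log file, which is the sketch's own simplification.

```scala
import java.io.{ByteArrayInputStream, InputStream}
import java.nio.charset.StandardCharsets.UTF_8
import scala.io.Source

// Hypothetical stand-in for OffsetSeq: offsets and metadata are kept as JSON strings.
final case class JsonOffsetSeq(offsets: Seq[Option[String]], metadataJson: Option[String])

object OffsetSeqReader {
  val Version = 1     // the only version this sketch accepts
  val NoOffset = "-"  // marker for a streaming source with no offset

  // The caller owns the stream and is responsible for closing it.
  def deserialize(in: InputStream): JsonOffsetSeq = {
    val lines = Source.fromInputStream(in, UTF_8.name()).getLines()

    // Fails with "Incomplete log file" when a required line is missing
    // (for the metadata line, this is a simplification of this sketch).
    def nextLine(): String =
      if (lines.hasNext) lines.next()
      else throw new IllegalStateException("Incomplete log file")

    // Line 1: the version, e.g. v1 (hypothetical error message below)
    val versionLine = nextLine().trim
    if (versionLine != s"v$Version") {
      throw new IllegalStateException(s"Unsupported log version: $versionLine")
    }

    // Line 2: metadata JSON; an empty line means no metadata
    val metadataJson = nextLine().trim match {
      case ""   => None
      case json => Some(json)
    }

    // Remaining lines: one offset per source; "-" means the source has no offset
    val offsets = lines.map {
      case NoOffset => None
      case json     => Some(json)
    }.toVector

    JsonOffsetSeq(offsets, metadataJson)
  }
}
```

Reading through a single iterator of lines keeps the parsing order explicit: version first, then metadata, then one offset per remaining line.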