Offsets and Metadata Checkpointing

A streaming query can be started from scratch or resumed from a checkpoint. Checkpointing gives fault tolerance because the state is preserved even when a failure happens.

Stream execution engines use the checkpoint location to resume stream processing, i.e. to look up the start offsets that query processing continues from.

StreamExecution resumes (populates the start offsets) from the latest checkpointed offsets in the Write-Ahead Log (WAL) of Offsets. These offsets may have already been processed (and, if so, committed to the Offset Commit Log).
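The relationship between the two logs can be illustrated with a minimal sketch that inspects a checkpoint-like directory. It assumes that the checkpoint location holds an `offsets` directory (the WAL of offsets) and a `commits` directory (the Offset Commit Log), each with one file per batch named after the batch ID. The helper names (`latest_batch_id`, `describe_checkpoint`, `make_demo_checkpoint`) are hypothetical and not part of Spark.

```python
import os
import tempfile
from typing import Dict, List, Optional


def latest_batch_id(log_dir: str) -> Optional[int]:
    """Return the highest batch ID that has a file in log_dir, or None."""
    if not os.path.isdir(log_dir):
        return None
    # Assumption: one file per batch, named after the batch ID.
    # Any other file (hidden, temporary) is ignored.
    batch_ids = [int(name) for name in os.listdir(log_dir) if name.isdigit()]
    return max(batch_ids, default=None)


def describe_checkpoint(checkpoint_dir: str) -> Dict[str, Optional[int]]:
    """Report the latest batch in the offset WAL and in the commit log."""
    return {
        "latest_offsets_batch": latest_batch_id(os.path.join(checkpoint_dir, "offsets")),
        "latest_committed_batch": latest_batch_id(os.path.join(checkpoint_dir, "commits")),
    }


def make_demo_checkpoint(offset_batches: List[int], commit_batches: List[int]) -> str:
    """Create a throwaway directory that mimics the assumed layout."""
    root = tempfile.mkdtemp(prefix="checkpoint-demo-")
    for sub, batches in (("offsets", offset_batches), ("commits", commit_batches)):
        os.makedirs(os.path.join(root, sub))
        for batch_id in batches:
            with open(os.path.join(root, sub, str(batch_id)), "w") as f:
                f.write("placeholder\n")
    return root


# Batch 2 was written ahead to the WAL but never committed,
# e.g. because the query failed while processing it.
demo = make_demo_checkpoint(offset_batches=[0, 1, 2], commit_batches=[0, 1])
summary = describe_checkpoint(demo)
print(summary)
```

When the latest batch in the offset WAL is ahead of the latest committed batch, as in this demo, the offsets of that batch were checkpointed but not yet marked as processed.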

Micro-Batch Stream Processing

In <>, the available offsets registry is populated with the latest offsets from the Write-Ahead Log (WAL), if available, when the MicroBatchExecution stream processing engine is requested to <>. This happens when MicroBatchExecution is requested to <> (before the first "zero" micro-batch).

The available offsets are then added to the committed offsets, but only when the latest batch ID available (as described above) is exactly the latest batch ID committed to the Offset Commit Log. This check is made when the MicroBatchExecution stream processing engine is requested to <>.

When a streaming query is started from scratch (with no checkpoint that has offsets in the Offset Write-Ahead Log), MicroBatchExecution prints out the following INFO message:

Starting new streaming query.

When a streaming query is resumed (restarted) from a checkpoint with offsets in the Offset Write-Ahead Log, MicroBatchExecution prints out the following INFO message:

Resuming at batch [currentBatchId] with committed offsets [committedOffsets] and available offsets [availableOffsets]
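The start-from-scratch and resume paths described above can be modeled with a small, simplified sketch. It is not Spark's code: the logs are plain in-memory collections, offsets are integers keyed by a source name, and the names `ResumeState` and `populate_start_offsets` are hypothetical. Two details go beyond what the text states and are assumptions: the committed offsets are first seeded from the previous batch in the WAL, and the current batch ID moves past the latest batch when that batch is already committed.

```python
from dataclasses import dataclass, field
from typing import Dict, Optional, Set

Offsets = Dict[str, int]  # source name -> offset (simplified)


@dataclass
class ResumeState:
    current_batch_id: int = 0
    committed_offsets: Offsets = field(default_factory=dict)
    available_offsets: Offsets = field(default_factory=dict)
    message: str = ""


def populate_start_offsets(offset_log: Dict[int, Offsets], commit_log: Set[int]) -> ResumeState:
    state = ResumeState()
    if not offset_log:
        # No offsets in the WAL: the query starts from scratch.
        state.message = "Starting new streaming query."
        return state

    # The available offsets come from the latest entry in the offset WAL.
    latest_batch_id = max(offset_log)
    state.current_batch_id = latest_batch_id
    state.available_offsets = dict(offset_log[latest_batch_id])

    # Assumption: the end offsets of the previous batch are the
    # starting point for the committed offsets.
    previous = offset_log.get(latest_batch_id - 1)
    if previous is not None:
        state.committed_offsets = dict(previous)

    # Only when the latest WAL batch is also the latest committed batch
    # are the available offsets added to the committed offsets.
    latest_committed: Optional[int] = max(commit_log, default=None)
    if latest_committed == latest_batch_id:
        state.committed_offsets.update(state.available_offsets)
        state.current_batch_id += 1  # assumption: continue with the next batch

    state.message = (
        f"Resuming at batch {state.current_batch_id} "
        f"with committed offsets {state.committed_offsets} "
        f"and available offsets {state.available_offsets}"
    )
    return state


fresh = populate_start_offsets({}, set())
wal = {0: {"a": 5}, 1: {"a": 9}}
fully_committed = populate_start_offsets(wal, {0, 1})
interrupted = populate_start_offsets(wal, {0})
```

In this model, `interrupted` keeps batch 1 as the current batch because its offsets were written ahead but never committed, so that batch is the one to run again.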

Every time MicroBatchExecution is requested to <> (in any of the streaming sources)...FIXME

When MicroBatchExecution is requested to <> (when MicroBatchExecution is requested to <>), every streaming source is requested for the latest offset available, and the reported offsets are added to the availableOffsets registry. A streaming source reports either an offset or none at all (if the source has never received any data). Streaming sources with no data are excluded (filtered out).
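A minimal sketch of this step, under simplifying assumptions: sources are identified by name, an offset is an integer, and a source with no data reports `None`. The function names are hypothetical. The second function is an assumption about how "new data available" could be decided, namely by comparing the available offsets against the committed ones.

```python
from typing import Dict, Optional

Offsets = Dict[str, int]


def update_available_offsets(available: Offsets, reported: Dict[str, Optional[int]]) -> Offsets:
    """Add the latest reported offsets, skipping sources that reported none."""
    with_data = {source: offset for source, offset in reported.items() if offset is not None}
    return {**available, **with_data}


def is_new_data_available(available: Offsets, committed: Offsets) -> bool:
    """True if any source has an available offset that is not committed yet."""
    return any(committed.get(source) != offset for source, offset in available.items())


# "files" has never received any data, so it is filtered out.
available = update_available_offsets({}, {"kafka": 42, "files": None})
print(available)
print(is_new_data_available(available, committed={}))
```

A source that is filtered out keeps whatever offset it already had in the registry, if any, and is otherwise absent from it.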

MicroBatchExecution prints out the following TRACE message to the logs:

noDataBatchesEnabled = [noDataBatchesEnabled], lastExecutionRequiresAnotherBatch = [lastExecutionRequiresAnotherBatch], isNewDataAvailable = [isNewDataAvailable], shouldConstructNextBatch = [shouldConstructNextBatch]

With the <> internal flag enabled, MicroBatchExecution commits (adds) the available offsets for the batch to the Write-Ahead Log (WAL) and prints out the following INFO message to the logs:

Committed offsets for batch [currentBatchId]. Metadata [offsetSeqMetadata]

When <>, MicroBatchExecution requests every Source and <> (in the availableOffsets registry) for unprocessed data (data that has not been committed yet and is therefore considered unprocessed).

At the end (of <>), MicroBatchExecution commits (adds) the available offsets (to the <> registry) so they are considered processed already.

MicroBatchExecution prints out the following DEBUG message to the logs:

Completed batch [currentBatchId]
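The ordering of these steps is what makes recovery possible, and a toy model can make that visible. The sketch below is an illustration only: `MicroBatchModel` and its members are hypothetical names, the logs are in-memory collections, and the write to the commit log at the end of a batch is an assumption based on the Offset Commit Log mentioned earlier. The offsets are written ahead before any data is processed and are marked as committed only after processing succeeds.

```python
from typing import Callable, Dict, List, Optional, Tuple

Offsets = Dict[str, int]
Ranges = Dict[str, Tuple[Optional[int], int]]  # source -> (committed, available)


class MicroBatchModel:
    def __init__(self) -> None:
        self.offset_log: Dict[int, Offsets] = {}  # stands in for the offset WAL
        self.commit_log: List[int] = []           # stands in for the commit log
        self.committed_offsets: Offsets = {}
        self.available_offsets: Offsets = {}
        self.current_batch_id = 0

    def run_one_batch(self, latest_offsets: Offsets, process: Callable[[Ranges], None]) -> None:
        self.available_offsets.update(latest_offsets)

        # 1. Write-ahead: record what this batch is going to process.
        self.offset_log[self.current_batch_id] = dict(self.available_offsets)

        # 2. Process the data between committed and available offsets.
        ranges = {
            source: (self.committed_offsets.get(source), end)
            for source, end in self.available_offsets.items()
        }
        process(ranges)  # may raise; nothing below runs in that case

        # 3. Commit: the available offsets are now considered processed.
        self.commit_log.append(self.current_batch_id)
        self.committed_offsets.update(self.available_offsets)
        self.current_batch_id += 1


def failing_process(ranges: Ranges) -> None:
    raise RuntimeError("simulated failure while processing the batch")


model = MicroBatchModel()
processed: List[Ranges] = []
model.run_one_batch({"a": 3}, processed.append)  # batch 0 succeeds

try:
    model.run_one_batch({"a": 7}, failing_process)  # batch 1 fails midway
except RuntimeError:
    pass
```

After the simulated failure, batch 1 has an entry in `offset_log` but not in `commit_log`, so a restart knows exactly which offsets the unfinished batch covered.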

Limitations (Assumptions)

It is assumed that the order of streaming sources in a streaming query matches the order of the offsets of OffsetSeq (in offsetLog) and availableOffsets.

In other words, a streaming query can be modified and then restarted from a checkpoint (to maintain stream processing state) only when the number of streaming sources and their order are preserved across restarts.
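The assumption can be illustrated with a short sketch of a purely positional mapping. The function `to_stream_progress` is a hypothetical stand-in, not Spark's API: it pairs each source with the offset at the same position in the checkpointed sequence. The size check is part of the sketch.

```python
from typing import Dict, List, Optional


def to_stream_progress(sources: List[str], offset_seq: List[Optional[int]]) -> Dict[str, int]:
    """Pair sources with checkpointed offsets by position only."""
    if len(sources) != len(offset_seq):
        raise ValueError(
            f"{len(sources)} sources in the query but {len(offset_seq)} offsets in the checkpoint"
        )
    # Sources without an offset (None) are left out.
    return {
        source: offset
        for source, offset in zip(sources, offset_seq)
        if offset is not None
    }


checkpointed = [100, 7]  # written while the query read "orders", then "users"

same_order = to_stream_progress(["orders", "users"], checkpointed)
swapped = to_stream_progress(["users", "orders"], checkpointed)

print(same_order)
print(swapped)
```

With the sources swapped, the mapping still succeeds but assigns each offset to the wrong source, which is why the order has to be preserved across restarts. A change in the number of sources is at least detectable, as the size check shows.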