WatermarkSupport Unary Physical Operators
WatermarkSupport is the <<contract, contract>> for unary physical operators (UnaryExecNode) with support for streaming event-time watermark.
[NOTE]
Watermark (aka "allowed lateness") is a moving threshold of event time that specifies what data to consider for aggregations: the engine automatically drops incoming data that is later than the threshold and cleans up old state accordingly.
Read the official documentation of Spark in http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#handling-late-data-and-watermarking[Handling Late Data and Watermarking].
[[properties]]
.WatermarkSupport's (Lazily-Initialized) Properties
[cols="1,3",options="header",width="100%"]
|===
| Property
| Description
| [[watermarkExpression]] watermarkExpression
a| Optional Catalyst expression that matches rows older than the event time watermark.
NOTE: Use withWatermark operator to specify streaming watermark.

When initialized, watermarkExpression finds spark.watermarkDelayMs watermark attribute in the child output's metadata.

If found, watermarkExpression creates evictionExpression with the watermark attribute that is less than or equal to <<eventTimeWatermark, eventTimeWatermark>>.

The watermark attribute may be of type StructType. If it is, watermarkExpression uses the first field as the watermark.

watermarkExpression prints out the following INFO message to the logs when spark.watermarkDelayMs watermark attribute is found.
INFO [physicalOperator]Exec: Filtering state store on: [evictionExpression]
NOTE: physicalOperator can be FlatMapGroupsWithStateExec, StateStoreSaveExec.md[StateStoreSaveExec] or physical-operators/StreamingDeduplicateExec.md[StreamingDeduplicateExec].
TIP: Enable INFO logging level for one of the stateful physical operators to see the INFO message in the logs.
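| [[watermarkPredicateForData]] watermarkPredicateForData
| Optional Predicate that uses <<watermarkExpression, watermarkExpression>> and the child output to match rows older than the event time watermark.

| [[watermarkPredicateForKeys]] watermarkPredicateForKeys
| Optional Predicate that uses <<keyExpressions, keyExpressions>> to match rows older than the event time watermark.
|===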
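To make the relationship between the three properties easier to follow, here is a minimal sketch in plain Scala. It is not Spark's implementation: `Column`, `Row`, `WatermarkModel` and `hasWatermarkDelay` are hypothetical names invented for illustration, predicates are plain functions rather than Catalyst expressions, and event time and watermark are both assumed to be plain milliseconds.

```scala
// Simplified, hypothetical model of the three lazy properties.
// None of these types are Spark's; they only mirror the description above.

// A column of the child output; hasWatermarkDelay stands in for the
// presence of the spark.watermarkDelayMs key in the column's metadata.
final case class Column(name: String, hasWatermarkDelay: Boolean)

// A row as a map from column name to a numeric value (event time in ms).
final case class Row(values: Map[String, Long])

final class WatermarkModel(
    childOutput: Seq[Column],
    keyColumns: Seq[Column],
    eventTimeWatermark: Option[Long]) {

  // Defined only when a watermark column exists AND a watermark is known.
  // Matches rows whose event time is less than or equal to the watermark.
  lazy val watermarkExpression: Option[Row => Boolean] =
    for {
      column    <- childOutput.find(_.hasWatermarkDelay)
      watermark <- eventTimeWatermark
    } yield { (row: Row) => row.values.get(column.name).exists(_ <= watermark) }

  // Applies the expression to full rows of the child output.
  lazy val watermarkPredicateForData: Option[Row => Boolean] =
    watermarkExpression

  // Applies the expression to keys, but only when one of the key columns
  // carries the watermark (an assumption of this sketch).
  lazy val watermarkPredicateForKeys: Option[Row => Boolean] =
    if (keyColumns.exists(_.hasWatermarkDelay)) watermarkExpression else None
}
```

In this model, a row whose event time equals the watermark is already considered late, and all three properties are empty when no watermark has been set.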
=== [[contract]] WatermarkSupport Contract
[source, scala]
package org.apache.spark.sql.execution.streaming
trait WatermarkSupport extends UnaryExecNode {
  // only required methods that have no implementation
  def eventTimeWatermark: Option[Long]
  def keyExpressions: Seq[Attribute]
}
.WatermarkSupport Contract
[cols="1,2",options="header",width="100%"]
|===
| Method
| Description
| [[eventTimeWatermark]] eventTimeWatermark
| Used mainly in <<watermarkExpression, watermarkExpression>> to create a LessThanOrEqual Catalyst binary expression that matches rows older than the watermark.
| [[keyExpressions]] keyExpressions
| Grouping keys (in FlatMapGroupsWithStateExec), duplicate keys (in physical-operators/StreamingDeduplicateExec.md#keyExpressions[StreamingDeduplicateExec]) or key attributes (in StateStoreSaveExec.md#keyExpressions[StateStoreSaveExec]), of which at most one may have the spark.watermarkDelayMs watermark attribute in its metadata.

Used in <<watermarkPredicateForKeys, watermarkPredicateForKeys>> to create a Predicate to match rows older than the event time watermark.

Used also when StateStoreSaveExec.md#doExecute[StateStoreSaveExec] and physical-operators/StreamingDeduplicateExec.md#doExecute[StreamingDeduplicateExec] physical operators are executed.
|===
=== [[removeKeysOlderThanWatermark]][[removeKeysOlderThanWatermark-StateStore]] Removing Keys From StateStore Older Than Watermark -- removeKeysOlderThanWatermark Method
[source, scala]
removeKeysOlderThanWatermark(store: StateStore): Unit
removeKeysOlderThanWatermark requests the input store for all rows.

removeKeysOlderThanWatermark then uses watermarkPredicateForKeys to remove matching rows from the store.

removeKeysOlderThanWatermark is used when StreamingDeduplicateExec physical operator is requested to execute.
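The two steps above (scan all rows, then remove the ones the key predicate matches) can be sketched in plain Scala as follows. This is an illustration under stated assumptions, not Spark's code: a `mutable.Map` stands in for the state store, and `Key` and `Eviction` are hypothetical names that do not exist in Spark.

```scala
import scala.collection.mutable

// Hypothetical stand-in for a state store key that carries the event time
// in milliseconds. Not Spark's StateStore API.
final case class Key(id: String, eventTimeMs: Long)

object Eviction {
  // Removes every entry whose key matches the (optional) watermark predicate.
  def removeKeysOlderThanWatermark(
      store: mutable.Map[Key, Long],
      watermarkPredicateForKeys: Option[Key => Boolean]): Unit =
    // No predicate (no watermark on the keys) means nothing is evicted.
    watermarkPredicateForKeys.foreach { isOlderThanWatermark =>
      // Snapshot the keys first so the map is not mutated while iterating.
      val expired = store.keys.toList.filter(isOlderThanWatermark)
      expired.foreach(key => store.remove(key))
    }
}
```

For example, with a predicate such as `(k: Key) => k.eventTimeMs <= 100L`, a key with event time 50 would be removed and a key with event time 150 would be kept. Passing `None` leaves the store untouched.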
=== [[removeKeysOlderThanWatermark-StreamingAggregationStateManager-store]] removeKeysOlderThanWatermark Method
[source, scala]
removeKeysOlderThanWatermark(
  storeManager: StreamingAggregationStateManager,
  store: StateStore): Unit
removeKeysOlderThanWatermark ...FIXME
NOTE: removeKeysOlderThanWatermark is used exclusively when StateStoreSaveExec physical operator is requested to execute.