SymmetricHashJoinStateManager

SymmetricHashJoinStateManager is created for the left and right OneSideHashJoiners of a StreamingSymmetricHashJoinExec physical operator (one for each side, when StreamingSymmetricHashJoinExec is requested to process partitions of the left and right sides of a stream-stream join).

SymmetricHashJoinStateManager and Stream-Stream Join

SymmetricHashJoinStateManager manages join state using the KeyToNumValuesStore and the KeyWithIndexToValueStore state store handlers (and simply acts as their facade).

Creating Instance

SymmetricHashJoinStateManager takes the following to be created:

=== [[keyToNumValues]][[keyWithIndexToValue]] KeyToNumValuesStore and KeyWithIndexToValueStore State Store Handlers -- keyToNumValues and keyWithIndexToValue Internal Properties

SymmetricHashJoinStateManager uses a KeyToNumValuesStore (keyToNumValues) and a KeyWithIndexToValueStore (keyWithIndexToValue) internally. Both are created immediately when SymmetricHashJoinStateManager is created (for a OneSideHashJoiner).

keyToNumValues and keyWithIndexToValue are used when SymmetricHashJoinStateManager is requested for the following:

  • <>

  • <>

  • <>

  • <>

  • <>

  • <>

  • <>
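The relationship between the two handlers can be pictured with a small in-memory model. This is only a sketch: plain Scala maps and Strings stand in for the state stores and for UnsafeRows, and JoinStateLayoutSketch and isConsistent are hypothetical names introduced here for illustration.

```scala
object JoinStateLayoutSketch {
  type Key = String // stands in for a key UnsafeRow
  type Row = String // stands in for a value UnsafeRow

  // State for two keys: "a" has two value rows, "b" has one.
  val keyToNumValues: Map[Key, Long] = Map("a" -> 2L, "b" -> 1L)
  val keyWithIndexToValue: Map[(Key, Long), Row] = Map(
    ("a", 0L) -> "a-row-0",
    ("a", 1L) -> "a-row-1",
    ("b", 0L) -> "b-row-0")

  // The two stores agree when every key with count n owns exactly
  // the indices 0 until n in the (key, index) -> value store.
  def isConsistent(
      counts: Map[Key, Long],
      values: Map[(Key, Long), Row]): Boolean = {
    val expected = counts.toSeq.flatMap { case (key, n) =>
      (0L until n).map(index => (key, index))
    }.toSet
    expected == values.keySet
  }
}
```

Keeping the count in a separate store is what lets the value rows of a key be enumerated by index without scanning the whole value store.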

=== [[joinSide-internals]] Join Side Marker -- JoinSide Internal Enum

JoinSide can be one of the two possible values:

  • [[LeftSide]][[left]] LeftSide (alias: left)

  • [[RightSide]][[right]] RightSide (alias: right)

They are both used exclusively when the StreamingSymmetricHashJoinExec binary physical operator is requested to <> (and <> with a OneSideHashJoiner).

=== [[metrics]] Performance Metrics -- metrics Method

[source, scala]

metrics: StateStoreMetrics

metrics returns the combined performance metrics of the keyToNumValues and the keyWithIndexToValue state store handlers.

metrics is used when OneSideHashJoiner is requested to commitStateAndGetMetrics.

=== [[removeByKeyCondition]] removeByKeyCondition Method

[source, scala]

removeByKeyCondition(
  removalCondition: UnsafeRow => Boolean): Iterator[UnsafeRowPair]


removeByKeyCondition creates an Iterator of UnsafeRowPairs that <> for which the given removalCondition predicate holds.

[[removeByKeyCondition-allKeyToNumValues]] removeByKeyCondition uses the <> for <>.

removeByKeyCondition is used when OneSideHashJoiner is requested to remove an old state (for JoinStateKeyWatermarkPredicate).
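As a rough illustration of removal by key, the sketch below uses mutable in-memory maps in place of the two state store handlers, Strings in place of UnsafeRows, and (key, value) tuples in place of UnsafeRowPair. The object name and the way the iterator is built are assumptions made for the example, not the actual implementation.

```scala
import scala.collection.mutable

object RemoveByKeySketch {
  type Key = String // stands in for a key UnsafeRow
  type Row = String // stands in for a value UnsafeRow

  val keyToNumValues = mutable.Map.empty[Key, Long]
  val keyWithIndexToValue = mutable.Map.empty[(Key, Long), Row]

  def removeByKeyCondition(removalCondition: Key => Boolean): Iterator[(Key, Row)] = {
    // Snapshot the (key, count) entries so the maps can be mutated while iterating.
    val allKeyToNumValues = keyToNumValues.toList
    allKeyToNumValues.iterator
      .filter { case (key, _) => removalCondition(key) }
      .flatMap { case (key, numValues) =>
        // Remove every value row of the key, then the key's count entry.
        val removed = (0L until numValues)
          .flatMap(index => keyWithIndexToValue.remove((key, index)))
          .toList
        keyToNumValues.remove(key)
        removed.map(value => (key, value))
      }
  }
}
```

The returned iterator is lazy in this sketch: state is only removed as the caller consumes it.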

==== [[removeByKeyCondition-getNext]] getNext Internal Method (of removeByKeyCondition Method)

[source, scala]

getNext(): UnsafeRowPair

getNext goes over the keys and values in the <> sequence and <> (from the <>) and the <> (from the <>) for which the given removalCondition predicate holds.

=== [[removeByValueCondition]] removeByValueCondition Method

[source, scala]

removeByValueCondition(
  removalCondition: UnsafeRow => Boolean): Iterator[UnsafeRowPair]


removeByValueCondition creates an Iterator of UnsafeRowPairs that <> for which the given removalCondition predicate holds.

removeByValueCondition is used when OneSideHashJoiner is requested to remove an old state (when JoinStateValueWatermarkPredicate is used).
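Removal by value differs from removal by key in that only some of a key's value rows may go away, so the remaining rows have to stay addressable by index. The sketch below is a deliberately simplified model (in-memory maps, Strings for rows, tuples for UnsafeRowPair) that re-indexes the surviving rows of a key from 0. That re-indexing strategy is an assumption chosen for brevity and is not claimed to be how SymmetricHashJoinStateManager does it.

```scala
import scala.collection.mutable

object RemoveByValueSketch {
  type Key = String // stands in for a key UnsafeRow
  type Row = String // stands in for a value UnsafeRow

  val keyToNumValues = mutable.Map.empty[Key, Long]
  val keyWithIndexToValue = mutable.Map.empty[(Key, Long), Row]

  def removeByValueCondition(removalCondition: Row => Boolean): Iterator[(Key, Row)] =
    keyToNumValues.toList.iterator.flatMap { case (key, numValues) =>
      val values = (0L until numValues).map(i => keyWithIndexToValue((key, i))).toList
      val (removed, kept) = values.partition(removalCondition)
      if (removed.nonEmpty) {
        // Drop all rows of the key, then write the survivors back at indices 0, 1, ...
        (0L until numValues).foreach(i => keyWithIndexToValue.remove((key, i)))
        kept.zipWithIndex.foreach { case (value, i) =>
          keyWithIndexToValue.update((key, i.toLong), value)
        }
        // Keep the count store in sync (no entry at all for a key without rows).
        if (kept.isEmpty) keyToNumValues.remove(key)
        else keyToNumValues.update(key, kept.size.toLong)
      }
      removed.map(value => (key, value))
    }
}
```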

==== [[removeByValueCondition-getNext]] getNext Internal Method (of removeByValueCondition Method)

[source, scala]

getNext(): UnsafeRowPair

getNext...FIXME

=== [[append]] Appending New Value Row to Key -- append Method

[source, scala]

append(
  key: UnsafeRow,
  value: UnsafeRow): Unit


append requests the <> for the <>.

In the end, append requests the stores for the following:

  • <> to <>

  • <> to <>.

append is used when OneSideHashJoiner is requested to storeAndJoinWithOtherSide.
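One plausible reading of the steps above, sketched with in-memory maps instead of state stores: look up the current number of value rows for the key, store the new row at the next index, and record the incremented count. The exact requests made to the two handlers are not spelled out above, so treat the order and the names in AppendSketch as assumptions.

```scala
import scala.collection.mutable

object AppendSketch {
  type Key = String // stands in for a key UnsafeRow
  type Row = String // stands in for a value UnsafeRow

  val keyToNumValues = mutable.Map.empty[Key, Long]
  val keyWithIndexToValue = mutable.Map.empty[(Key, Long), Row]

  def append(key: Key, value: Row): Unit = {
    // 1. Ask the count store how many value rows the key already has (0 if none).
    val numExistingValues = keyToNumValues.getOrElse(key, 0L)
    // 2. Store the new value row at the next free index...
    keyWithIndexToValue.update((key, numExistingValues), value)
    // 3. ...and record the incremented count.
    keyToNumValues.update(key, numExistingValues + 1)
  }
}
```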

=== [[get]] Retrieving Value Rows By Key -- get Method

[source, scala]

get(key: UnsafeRow): Iterator[UnsafeRow]

get requests the <> for the <>.

In the end, get requests the <> to <> and leaves value rows only.

get is used when OneSideHashJoiner is requested to storeAndJoinWithOtherSide and to retrieve value rows for a key.
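The lookup can be sketched in the same simplified style: read the count for the key, enumerate the (key, index) entries, and keep only the value rows. The getAll helper and the KeyWithIndexAndValue case class below are modeled for illustration, and the maps are in-memory stand-ins for the state stores.

```scala
object GetSketch {
  type Key = String // stands in for a key UnsafeRow
  type Row = String // stands in for a value UnsafeRow

  // Illustrative triple returned by the (key, index) -> value lookup.
  final case class KeyWithIndexAndValue(key: Key, index: Long, value: Row)

  val keyToNumValues: Map[Key, Long] = Map("a" -> 2L)
  val keyWithIndexToValue: Map[(Key, Long), Row] =
    Map(("a", 0L) -> "a-row-0", ("a", 1L) -> "a-row-1")

  // All entries of a key, visited by index 0 until numValues.
  def getAll(key: Key, numValues: Long): Iterator[KeyWithIndexAndValue] =
    (0L until numValues).iterator.flatMap { index =>
      keyWithIndexToValue.get((key, index)).map(KeyWithIndexAndValue(key, index, _))
    }

  def get(key: Key): Iterator[Row] = {
    val numValues = keyToNumValues.getOrElse(key, 0L)
    // Leave value rows only.
    getAll(key, numValues).map(_.value)
  }
}
```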

=== [[commit]] Committing State (Changes) -- commit Method

[source, scala]

commit(): Unit

commit simply requests the keyToNumValues and keyWithIndexToValue state store handlers to commit state changes.

commit is used when OneSideHashJoiner is requested to commit state changes and get performance metrics.

=== [[abortIfNeeded]] Aborting State (Changes) -- abortIfNeeded Method

[source, scala]

abortIfNeeded(): Unit

abortIfNeeded...FIXME

NOTE: abortIfNeeded is used when...FIXME

=== [[allStateStoreNames]] allStateStoreNames Object Method

[source, scala]

allStateStoreNames(joinSides: JoinSide*): Seq[String]

allStateStoreNames simply returns the state store names for all possible combinations of the given JoinSides and the two possible store types (keyToNumValues and keyWithIndexToValue).

NOTE: allStateStoreNames is used exclusively when StreamingSymmetricHashJoinExec physical operator is requested to <> (as a RDD[InternalRow]).

=== [[getStateStoreName]] getStateStoreName Object Method

[source, scala]

getStateStoreName(
  joinSide: JoinSide,
  storeType: StateStoreType): String


getStateStoreName simply returns a string of the following format:

[joinSide]-[storeType]

[NOTE]

getStateStoreName is used when:

  • StateStoreHandler is requested to <>

  • SymmetricHashJoinStateManager utility is requested for allStateStoreNames (for StreamingSymmetricHashJoinExec physical operator to <>)
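The naming scheme, and the combinations that allStateStoreNames produces from it, can be sketched as follows. The sealed traits and their toString values are assumptions made for this example: the left and right aliases come from the JoinSide description, while rendering the store types as keyToNumValues and keyWithIndexToValue is a guess based on the property names.

```scala
object StateStoreNamesSketch {
  sealed trait JoinSide
  case object LeftSide extends JoinSide { override def toString: String = "left" }
  case object RightSide extends JoinSide { override def toString: String = "right" }

  sealed trait StateStoreType
  case object KeyToNumValuesType extends StateStoreType {
    override def toString: String = "keyToNumValues" // assumed rendering
  }
  case object KeyWithIndexToValueType extends StateStoreType {
    override def toString: String = "keyWithIndexToValue" // assumed rendering
  }

  // [joinSide]-[storeType]
  def getStateStoreName(joinSide: JoinSide, storeType: StateStoreType): String =
    s"$joinSide-$storeType"

  // Every combination of the given join sides and the two store types.
  def allStateStoreNames(joinSides: JoinSide*): Seq[String] = {
    val allStateStoreTypes: Seq[StateStoreType] =
      Seq(KeyToNumValuesType, KeyWithIndexToValueType)
    for (joinSide <- joinSides; storeType <- allStateStoreTypes)
      yield getStateStoreName(joinSide, storeType)
  }
}
```

Under these assumptions, asking for both sides yields four names, one per side and store type.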

=== [[updateNumValueForCurrentKey]] updateNumValueForCurrentKey Internal Method

[source, scala]

updateNumValueForCurrentKey(): Unit

updateNumValueForCurrentKey...FIXME

NOTE: updateNumValueForCurrentKey is used exclusively when SymmetricHashJoinStateManager is requested to <>.

=== [[internal-properties]] Internal Properties

[cols="30m,70",options="header",width="100%"]
|===
| Name
| Description

| keyAttributes a| [[keyAttributes]] Key attributes, i.e. AttributeReferences of the <>

Used exclusively in KeyWithIndexToValueStore when requested for the <>, <>, <> and <>

| keySchema a| [[keySchema]] Key schema (StructType) based on the <> with field names made of the field prefix followed by the ordinal (index) of the key, e.g. field0

Used when:

  • SymmetricHashJoinStateManager is requested for the <> (for <>)

  • KeyToNumValuesStore is requested for the <>

  • KeyWithIndexToValueStore is requested for the <> (for the internal <>)

|===
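The keySchema naming convention can be illustrated without Spark on the classpath. In the sketch below, JoinKey and Field are hypothetical stand-ins for a join key expression and a StructField, and the field0, field1, ... naming is the assumed reading of "field and their ordinals".

```scala
object KeySchemaSketch {
  // Hypothetical stand-ins for a join key expression and a schema field.
  final case class JoinKey(dataType: String, nullable: Boolean)
  final case class Field(name: String, dataType: String, nullable: Boolean)

  // One field per join key, named after the key's position.
  def keySchema(joinKeys: Seq[JoinKey]): Seq[Field] =
    joinKeys.zipWithIndex.map { case (key, ordinal) =>
      Field(s"field$ordinal", key.dataType, key.nullable)
    }
}
```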


Last update: 2020-11-28