
== OneSideHashJoiner

OneSideHashJoiner manages join state of one side of a <> (using <>).

OneSideHashJoiner is <> exclusively for <> physical operator (when requested to <>).

.OneSideHashJoiner and StreamingSymmetricHashJoinExec
image::images/OneSideHashJoiner.png[align="center"]

StreamingSymmetricHashJoinExec physical operator uses two OneSideHashJoiners, one per side of the stream-stream join (<> and <> sides).

OneSideHashJoiner uses an <> to <>.

NOTE: OneSideHashJoiner is a Scala private internal class of <> and so has full access to StreamingSymmetricHashJoinExec properties.

=== Creating OneSideHashJoiner Instance

OneSideHashJoiner takes the following to be created:

  • [[joinSide]] JoinSide
  • [[inputAttributes]] Input attributes (Seq[Attribute])
  • [[joinKeys]] Join keys (Seq[Expression])
  • [[inputIter]] Input rows (Iterator[InternalRow])
  • [[preJoinFilterExpr]] Optional pre-join filter Catalyst expression
  • [[postJoinFilter]] Post-join filter ((InternalRow) => Boolean)
  • <>

OneSideHashJoiner initializes the <>.

=== [[joinStateManager]] SymmetricHashJoinStateManager -- joinStateManager Internal Property

[source, scala]

joinStateManager: SymmetricHashJoinStateManager

joinStateManager is a SymmetricHashJoinStateManager that is created for a OneSideHashJoiner (with the <>, the <>, the <>, and the <> of the owning <>).

joinStateManager is used when OneSideHashJoiner is requested for the following:

  • <>

  • <>

  • <>

  • <>

=== [[updatedStateRowsCount]] Number of Updated State Rows -- updatedStateRowsCount Internal Counter

updatedStateRowsCount is the number of join keys and associated rows that were persisted as a join state, i.e. how many times <> requested the <> to append the join key and the input row (to a join state).

updatedStateRowsCount is then used (via <> method) for the <> performance metric.

updatedStateRowsCount is available via numUpdatedStateRows method.

[[numUpdatedStateRows]]
[source, scala]


numUpdatedStateRows: Long

NOTE: numUpdatedStateRows is used exclusively when StreamingSymmetricHashJoinExec physical operator is requested to <> (and <>).

=== [[stateWatermarkPredicate]] Optional Join State Watermark Predicate -- stateWatermarkPredicate Internal Property

[source, scala]

stateWatermarkPredicate: Option[JoinStateWatermarkPredicate]

When <>, OneSideHashJoiner is given a <>.

stateWatermarkPredicate is used for the <> (when a <>) and the <> (when a <>) that are both used when OneSideHashJoiner is requested to <>.

=== [[storeAndJoinWithOtherSide]] storeAndJoinWithOtherSide Method

[source, scala]

storeAndJoinWithOtherSide(
  otherSideJoiner: OneSideHashJoiner)(
  generateJoinedRow: (InternalRow, InternalRow) => JoinedRow): Iterator[InternalRow]


storeAndJoinWithOtherSide tries to find the watermark attribute among the input attributes.

storeAndJoinWithOtherSide creates a watermark expression (for the watermark attribute and the current event-time watermark).

[[storeAndJoinWithOtherSide-nonLateRows]] With the watermark attribute found, storeAndJoinWithOtherSide generates a new predicate for the watermark expression and the <> that is then used to filter out (exclude) late rows from the <>. Otherwise, the input rows are left unchanged (i.e. no rows are considered late and excluded).
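The late-row filtering can be illustrated with a minimal sketch in plain Scala. It does not use Spark's classes: `Row`, `nonLateRows` and `hasWatermarkAttribute` are hypothetical stand-ins, and the rule that a row is late when its event time is at or below the watermark is an assumption made for the example.

```scala
object LateRowFilterSketch {
  // Hypothetical stand-in for an input row: a join key and an event time (ms).
  final case class Row(key: String, eventTimeMs: Long)

  // Assumption: a row is late when its event time is at or below the watermark.
  // Without a watermark attribute the input is returned unchanged.
  def nonLateRows(
      input: Iterator[Row],
      hasWatermarkAttribute: Boolean,
      watermarkMs: Long): Iterator[Row] =
    if (hasWatermarkAttribute) input.filter(_.eventTimeMs > watermarkMs)
    else input
}
```

With a watermark of 10, a row with event time 5 is excluded and a row with event time 15 is kept; with no watermark attribute both rows pass through.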

[[storeAndJoinWithOtherSide-nonLateRows-flatMap]] For every <> (possibly <>), storeAndJoinWithOtherSide applies the <> predicate and branches off per result (<> or <>).

NOTE: storeAndJoinWithOtherSide is used when StreamingSymmetricHashJoinExec physical operator is requested to <>.

==== [[preJoinFilter-true]] preJoinFilter Predicate Positive (true)

When the <> predicate succeeds on an input row, storeAndJoinWithOtherSide extracts the join key (using the <>) and requests the given OneSideHashJoiner (otherSideJoiner) for the <> that is in turn requested for the state values for the extracted join key. The values are then processed (mapped over) using the given generateJoinedRow function and then filtered by the <>.

storeAndJoinWithOtherSide uses the <> (on the extracted join key) and the <> (on the current input row) to determine whether to request the <> to append the key and the input row (to a join state). If so, storeAndJoinWithOtherSide increments the <> counter.
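The following is a simplified, self-contained sketch of that flow for a single input row. The `Joiner` class, its in-memory map and the `Row` type are hypothetical stand-ins for OneSideHashJoiner and its state manager. The sketch also assumes that a row is stored only when neither the key predicate nor the value predicate marks it as old, which the description above does not spell out.

```scala
import scala.collection.mutable

object StoreAndJoinSketch {
  // Hypothetical stand-in for an input row.
  final case class Row(key: String, value: Int)

  // Hypothetical stand-in for one side of the join (state plus counter).
  final class Joiner(
      keyIsOld: String => Boolean,
      rowIsOld: Row => Boolean,
      postJoinFilter: ((Row, Row)) => Boolean) {

    private val state = mutable.Map.empty[String, Vector[Row]]
    private var updatedStateRowsCount = 0L

    def get(key: String): Iterator[Row] =
      state.getOrElse(key, Vector.empty).iterator

    def numUpdatedStateRows: Long = updatedStateRowsCount

    def storeAndJoinWithOtherSide(thisRow: Row, other: Joiner): List[(Row, Row)] = {
      val key = thisRow.key
      // Join the input row with the matching rows stored on the other side.
      val joined = other.get(key).map(o => (thisRow, o)).filter(postJoinFilter).toList
      // Assumption: store only rows that neither predicate considers old.
      if (!keyIsOld(key) && !rowIsOld(thisRow)) {
        state.update(key, state.getOrElse(key, Vector.empty) :+ thisRow)
        updatedStateRowsCount += 1
      }
      joined
    }
  }
}
```

In this model a row arriving on one side finds no matches until the other side has stored a row with the same key, which is why every row is both looked up and (possibly) stored.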

==== [[preJoinFilter-false]] preJoinFilter Predicate Negative (false)

When the <> predicate fails on an input row, storeAndJoinWithOtherSide creates a new Iterator[InternalRow] of joined rows per <> and <>:

  • For LeftSide and LeftOuter, the join row is the current row with the values of the right side all null (nullRight)

  • For RightSide and RightOuter, the join row is the current row with the values of the left side all null (nullLeft)

  • For all other combinations, the iterator is simply empty (and so is dropped from the output by the outer <>).
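The three cases above can be sketched as a pattern match. The types below are simplified local stand-ins defined only for this example (a row is modelled as a sequence of optional integers, and `otherSideWidth` is a hypothetical parameter for the number of columns on the opposite side); they are not Spark's classes.

```scala
object PreJoinFilterFailedSketch {
  // Local stand-ins for the join side and join type.
  sealed trait JoinSide
  case object LeftSide extends JoinSide
  case object RightSide extends JoinSide

  sealed trait JoinType
  case object Inner extends JoinType
  case object LeftOuter extends JoinType
  case object RightOuter extends JoinType

  // A row modelled as nullable columns (None plays the role of null).
  type Row = Seq[Option[Int]]

  def rowsForFilteredOutInput(
      side: JoinSide,
      joinType: JoinType,
      row: Row,
      otherSideWidth: Int): Iterator[Row] = {
    val nulls: Row = Seq.fill(otherSideWidth)(None)
    (side, joinType) match {
      case (LeftSide, LeftOuter)   => Iterator(row ++ nulls) // right side all null
      case (RightSide, RightOuter) => Iterator(nulls ++ row) // left side all null
      case _                       => Iterator.empty         // nothing to emit
    }
  }
}
```

An outer join still has to emit the row from its preserved side even though it can never match, while any other combination produces no output for that row.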

=== [[removeOldState]] Removing Old State -- removeOldState Method

[source, scala]

removeOldState(): Iterator[UnsafeRowPair]

removeOldState branches off per the <>:

  • For <>, removeOldState requests the <> to removeByKeyCondition (with the <>)

  • For <>, removeOldState requests the <> to removeByValueCondition (with the <>)

  • For any other predicates, removeOldState returns an empty iterator (no rows to process)
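A minimal sketch of this branching, using simplified types rather than Spark's: `ByKey`, `ByValue`, `KeyValue` and the list-backed `State` class are hypothetical stand-ins, and only the method names removeByKeyCondition and removeByValueCondition are taken from the description above.

```scala
object RemoveOldStateSketch {
  // Hypothetical stand-ins for the two kinds of state watermark predicate.
  sealed trait WatermarkPredicate
  final case class ByKey(isOld: Long => Boolean) extends WatermarkPredicate
  final case class ByValue(isOld: Long => Boolean) extends WatermarkPredicate

  // A stored pair, with both key and value reduced to a timestamp-like Long.
  final case class KeyValue(key: Long, value: Long)

  // Hypothetical list-backed stand-in for the join state.
  final class State(initial: Seq[KeyValue]) {
    private var rows: List[KeyValue] = initial.toList

    def remaining: List[KeyValue] = rows

    def removeByKeyCondition(cond: Long => Boolean): Iterator[KeyValue] =
      removeWhere(kv => cond(kv.key))

    def removeByValueCondition(cond: Long => Boolean): Iterator[KeyValue] =
      removeWhere(kv => cond(kv.value))

    // Removes matching pairs eagerly and returns them.
    private def removeWhere(p: KeyValue => Boolean): Iterator[KeyValue] = {
      val (removed, kept) = rows.partition(p)
      rows = kept
      removed.iterator
    }
  }

  def removeOldState(
      state: State,
      predicate: Option[WatermarkPredicate]): Iterator[KeyValue] =
    predicate match {
      case Some(ByKey(isOld))   => state.removeByKeyCondition(isOld)
      case Some(ByValue(isOld)) => state.removeByValueCondition(isOld)
      case _                    => Iterator.empty // no predicate, nothing to remove
    }
}
```

Note that this sketch removes rows eagerly when the method is called. That is a simplification for the example and says nothing about when the real state manager performs the removal.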

NOTE: removeOldState is used exclusively when StreamingSymmetricHashJoinExec physical operator is requested to <>.

=== [[get]] Retrieving Value Rows For Key -- get Method

[source, scala]

get(key: UnsafeRow): Iterator[UnsafeRow]

get simply requests the <> to retrieve value rows for the key.

NOTE: get is used exclusively when StreamingSymmetricHashJoinExec physical operator is requested to <>.

=== [[commitStateAndGetMetrics]] Committing State (Changes) and Requesting Performance Metrics -- commitStateAndGetMetrics Method

[source, scala]

commitStateAndGetMetrics(): StateStoreMetrics

commitStateAndGetMetrics simply requests the <> to commit the state changes and then for the performance metrics.

commitStateAndGetMetrics is used when StreamingSymmetricHashJoinExec physical operator is requested to <>.

=== Internal Properties

[cols="30m,70",options="header",width="100%"]
|===
| Name
| Description

| keyGenerator a| [[keyGenerator]]

[source, scala]

keyGenerator: UnsafeProjection

Function to project (extract) join keys from an input row

Used when...FIXME

| preJoinFilter a| [[preJoinFilter]]

[source, scala]

preJoinFilter: InternalRow => Boolean

Used when...FIXME

| stateKeyWatermarkPredicateFunc a| [[stateKeyWatermarkPredicateFunc]]

[source, scala]

stateKeyWatermarkPredicateFunc: InternalRow => Boolean

Predicate for late rows based on the <>

Used for the following:

| stateValueWatermarkPredicateFunc a| [[stateValueWatermarkPredicateFunc]]

[source, scala]

stateValueWatermarkPredicateFunc: InternalRow => Boolean

Predicate for late rows based on the <>

Used for the following:

|===