SymmetricHashJoinStateManager
`SymmetricHashJoinStateManager` is used when `StreamingSymmetricHashJoinExec` is requested to process partitions of the left and right sides of a stream-stream join.

`SymmetricHashJoinStateManager` manages join state using the `KeyToNumValuesStore` and `KeyWithIndexToValueStore` state store handlers.
Creating Instance
`SymmetricHashJoinStateManager` takes the following to be created:

- [[joinSide]] JoinSide
- [[inputValueAttributes]] Attributes of input values
- [[joinKeys]] Join keys (`Seq[Expression]`)
- [[stateInfo]] StatefulOperatorStateInfo
- [[storeConf]] StateStoreConf
- [[hadoopConf]] Hadoop Configuration
=== [[keyToNumValues]][[keyWithIndexToValue]] KeyToNumValuesStore and KeyWithIndexToValueStore State Store Handlers -- `keyToNumValues` and `keyWithIndexToValue` Internal Properties
`SymmetricHashJoinStateManager` uses a `KeyToNumValuesStore` (`keyToNumValues`) and a `KeyWithIndexToValueStore` (`keyWithIndexToValue`) internally that are created immediately when `SymmetricHashJoinStateManager` is created.
`keyToNumValues` and `keyWithIndexToValue` are used whenever `SymmetricHashJoinStateManager` is requested to read or change the join state (see the method sections that follow).
=== [[joinSide-internals]] Join Side Marker -- `JoinSide` Internal Enum
`JoinSide` can be one of the two possible values:

- [[LeftSide]][[left]] `LeftSide` (alias: `left`)
- [[RightSide]][[right]] `RightSide` (alias: `right`)
They are both used exclusively by the `StreamingSymmetricHashJoinExec` binary physical operator.
=== [[metrics]] Performance Metrics -- `metrics` Method

[source, scala]
metrics: StateStoreMetrics

`metrics` returns the combined `StateStoreMetrics` of the `keyToNumValues` and `keyWithIndexToValue` state store handlers.

metrics
is used when OneSideHashJoiner
is requested to commitStateAndGetMetrics.
=== [[removeByKeyCondition]] `removeByKeyCondition` Method

[source, scala]
removeByKeyCondition(
  removalCondition: UnsafeRow => Boolean): Iterator[UnsafeRowPair]

`removeByKeyCondition` creates an `Iterator` of `UnsafeRowPairs` that removes the keys (and their values) for which the given `removalCondition` predicate holds.

[[removeByKeyCondition-allKeyToNumValues]] `removeByKeyCondition` goes over all the keys (with their number of values) in the `keyToNumValues` state store handler.

removeByKeyCondition
is used when OneSideHashJoiner
is requested to remove an old state (for JoinStateKeyWatermarkPredicate).
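
The key-based removal can be illustrated with a minimal in-memory sketch. It is not Spark's implementation: `KeyedJoinState`, its two mutable maps, and the generic `K`/`V` types are hypothetical stand-ins for the state store handlers and `UnsafeRow`s, and the sketch assumes the count-per-key plus value-per-index layout that the store names suggest.

```scala
import scala.collection.mutable

// Hypothetical in-memory stand-in for the two state stores (not Spark's classes).
class KeyedJoinState[K, V] {
  // key -> number of values stored for the key
  private val keyToNumValues = mutable.Map.empty[K, Long]
  // (key, index) -> value
  private val keyWithIndexToValue = mutable.Map.empty[(K, Long), V]

  def append(key: K, value: V): Unit = {
    val numExisting = keyToNumValues.getOrElse(key, 0L)
    keyWithIndexToValue((key, numExisting)) = value
    keyToNumValues(key) = numExisting + 1
  }

  def numKeys: Int = keyToNumValues.size

  // Removes every key the predicate matches and yields the removed (key, value) pairs.
  // Removal happens lazily, key by key, as the returned iterator is consumed.
  def removeByKeyCondition(removalCondition: K => Boolean): Iterator[(K, V)] = {
    // Snapshot the matching keys first so that removal does not disturb the traversal.
    val matching = keyToNumValues.toList.filter { case (key, _) => removalCondition(key) }
    matching.iterator.flatMap { case (key, numValues) =>
      val removed = (0L until numValues).toList.map { index =>
        key -> keyWithIndexToValue.remove((key, index)).get
      }
      keyToNumValues.remove(key)
      removed
    }
  }
}
```

In this sketch the predicate sees only the key, which mirrors the `removalCondition: UnsafeRow => Boolean` parameter being applied to key rows; a watermark check on an event-time key column would be one such predicate.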
==== [[removeByKeyCondition-getNext]] `getNext` Internal Method (of `removeByKeyCondition` Method)

[source, scala]
getNext(): UnsafeRowPair

`getNext` goes over the keys and values in the state and removes the ones for which the `removalCondition` predicate holds.

=== [[removeByValueCondition]] `removeByValueCondition` Method

[source, scala]
removeByValueCondition(
  removalCondition: UnsafeRow => Boolean): Iterator[UnsafeRowPair]

`removeByValueCondition` creates an `Iterator` of `UnsafeRowPairs` that removes the values for which the given `removalCondition` predicate holds.

removeByValueCondition
is used when OneSideHashJoiner
is requested to remove an old state (when JoinStateValueWatermarkPredicate is used).
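
Value-based removal has to deal with holes in the per-key indices. The sketch below shows one way to keep indices contiguous, by moving the last value of a key into the freed slot. This is an assumption made for illustration and is not taken from this page; `IndexedJoinState` and its maps are hypothetical, and the removal is done eagerly for brevity, whereas the documented method returns a lazy iterator.

```scala
import scala.collection.mutable

// Hypothetical in-memory stand-in for the two state stores (not Spark's classes).
class IndexedJoinState[K, V] {
  private val keyToNumValues = mutable.Map.empty[K, Long]
  private val keyWithIndexToValue = mutable.Map.empty[(K, Long), V]

  def append(key: K, value: V): Unit = {
    val numExisting = keyToNumValues.getOrElse(key, 0L)
    keyWithIndexToValue((key, numExisting)) = value
    keyToNumValues(key) = numExisting + 1
  }

  def get(key: K): List[V] = {
    val numValues = keyToNumValues.getOrElse(key, 0L)
    (0L until numValues).toList.map(index => keyWithIndexToValue((key, index)))
  }

  // Eagerly removes all values matching the predicate and returns the removed pairs.
  def removeByValueCondition(removalCondition: V => Boolean): Iterator[(K, V)] = {
    val removed = mutable.ListBuffer.empty[(K, V)]
    for (key <- keyToNumValues.keys.toList) {
      var numValues = keyToNumValues(key)
      var index = 0L
      while (index < numValues) {
        val value = keyWithIndexToValue((key, index))
        if (removalCondition(value)) {
          val last = numValues - 1
          // Fill the hole with the last value so that indices stay contiguous.
          if (index != last) {
            keyWithIndexToValue((key, index)) = keyWithIndexToValue((key, last))
          }
          keyWithIndexToValue.remove((key, last))
          numValues -= 1
          removed += (key -> value)
          // Do not advance: the value moved into this slot still has to be checked.
        } else {
          index += 1
        }
      }
      // Write the updated count back once all values of the key have been checked.
      if (numValues == 0) keyToNumValues.remove(key) else keyToNumValues(key) = numValues
    }
    removed.iterator
  }
}
```

Writing the count back once per key, after all of its values have been visited, is the role a helper such as `updateNumValueForCurrentKey` could plausibly play; that connection is a guess, not something this page states.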
==== [[removeByValueCondition-getNext]] `getNext` Internal Method (of `removeByValueCondition` Method)

[source, scala]
getNext(): UnsafeRowPair
getNext
...FIXME
=== [[append]] Appending New Value Row to Key -- `append` Method

[source, scala]
append(
  key: UnsafeRow,
  value: UnsafeRow): Unit

`append` appends the new value row to the key, requesting both state store handlers (`keyToNumValues` and `keyWithIndexToValue`) to record the change.

append
is used when OneSideHashJoiner
is requested to storeAndJoinWithOtherSide.
=== [[get]] Retrieving Value Rows By Key -- `get` Method

[source, scala]
get(key: UnsafeRow): Iterator[UnsafeRow]

`get` retrieves the value rows stored for the given key from the state store handlers.

`get` is used when `OneSideHashJoiner` is requested to storeAndJoinWithOtherSide and to retrieve value rows for a key.
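
How `append` and `get` could cooperate across the two stores can be shown with a small in-memory model. This is a sketch under assumptions, not Spark's code: `TwoStoreJoinState` is a hypothetical class, plain mutable maps replace the state stores, and generic `K`/`V` types replace `UnsafeRow`. It assumes that `keyToNumValues` holds a per-key count and that `keyWithIndexToValue` holds one value per (key, index) pair.

```scala
import scala.collection.mutable

// Hypothetical in-memory model of the two-store layout (not Spark's classes).
class TwoStoreJoinState[K, V] {
  // key -> number of values stored for the key
  private val keyToNumValues = mutable.Map.empty[K, Long]
  // (key, index) -> value
  private val keyWithIndexToValue = mutable.Map.empty[(K, Long), V]

  // Stores the value at the next free index, then bumps the per-key count.
  def append(key: K, value: V): Unit = {
    val numExisting = keyToNumValues.getOrElse(key, 0L)
    keyWithIndexToValue((key, numExisting)) = value
    keyToNumValues(key) = numExisting + 1
  }

  // Looks up the count first, then reads the values at indices 0 until count.
  def get(key: K): Iterator[V] = {
    val numValues = keyToNumValues.getOrElse(key, 0L)
    (0L until numValues).iterator.map(index => keyWithIndexToValue((key, index)))
  }
}
```

Splitting the state this way lets several values share one join key while every stored entry still has a unique, fixed-shape key, which suits a key-value state store.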
=== [[commit]] Committing State (Changes) -- `commit` Method

[source, scala]
commit(): Unit

`commit` simply requests the `keyToNumValues` and `keyWithIndexToValue` state store handlers to commit state changes.

commit
is used when OneSideHashJoiner
is requested to commit state changes and get performance metrics.
=== [[abortIfNeeded]] Aborting State (Changes) -- `abortIfNeeded` Method

[source, scala]
abortIfNeeded(): Unit
abortIfNeeded
...FIXME
NOTE: abortIfNeeded
is used when...FIXME
=== [[allStateStoreNames]] `allStateStoreNames` Object Method

[source, scala]
allStateStoreNames(joinSides: JoinSide*): Seq[String]

`allStateStoreNames` simply returns the state store names for the given `JoinSides` and the two possible store types.

NOTE: `allStateStoreNames` is used exclusively when `StreamingSymmetricHashJoinExec` physical operator is requested to execute and produce an `RDD[InternalRow]`.

=== [[getStateStoreName]] `getStateStoreName` Object Method

[source, scala]
getStateStoreName(
  joinSide: JoinSide,
  storeType: StateStoreType): String
getStateStoreName
simply returns a string of the following format:
[joinSide]-[storeType]
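
The naming scheme of the two object methods can be sketched as follows. The types below are simplified stand-ins, not Spark's own definitions: the `toString` values of the join sides follow the `left` and `right` aliases mentioned on this page, while the store type objects (`KeyToNumValuesType`, `KeyWithIndexToValueType`) and their string forms are assumptions made for illustration.

```scala
// Simplified stand-ins for JoinSide and StateStoreType (assumed names and toString values).
sealed trait JoinSide
case object LeftSide extends JoinSide { override def toString: String = "left" }
case object RightSide extends JoinSide { override def toString: String = "right" }

sealed trait StateStoreType
case object KeyToNumValuesType extends StateStoreType {
  override def toString: String = "keyToNumValues"
}
case object KeyWithIndexToValueType extends StateStoreType {
  override def toString: String = "keyWithIndexToValue"
}

object StateStoreNames {
  // Builds a name in the [joinSide]-[storeType] format.
  def getStateStoreName(joinSide: JoinSide, storeType: StateStoreType): String =
    s"$joinSide-$storeType"

  // One name per (join side, store type) combination, grouped by join side.
  def allStateStoreNames(joinSides: JoinSide*): Seq[String] = {
    val storeTypes: Seq[StateStoreType] = Seq(KeyToNumValuesType, KeyWithIndexToValueType)
    for (side <- joinSides; storeType <- storeTypes)
      yield getStateStoreName(side, storeType)
  }
}
```

With both join sides passed in, this sketch yields four names, one per store on each side of the join.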

[NOTE]
`getStateStoreName` is used when:

* `StateStoreHandler` is requested to <>
* `SymmetricHashJoinStateManager` utility is requested for <> (for `StreamingSymmetricHashJoinExec` physical operator to <>)

=== [[updateNumValueForCurrentKey]] `updateNumValueForCurrentKey` Internal Method

[source, scala]
updateNumValueForCurrentKey(): Unit
updateNumValueForCurrentKey
...FIXME
NOTE: updateNumValueForCurrentKey
is used exclusively when SymmetricHashJoinStateManager
is requested to <
=== [[internal-properties]] Internal Properties
[cols="30m,70",options="header",width="100%"]
|===
| Name
| Description

| keyAttributes
a| [[keyAttributes]] Key attributes, i.e. `AttributeReferences` of the <

Used exclusively in `KeyWithIndexToValueStore` when requested for the <

| keySchema
a| [[keySchema]] Key schema (`StructType`) based on the <

Used when:

- `SymmetricHashJoinStateManager` is requested for the <> (for < >)
- `KeyToNumValuesStore` is requested for the <>
- `KeyWithIndexToValueStore` is requested for the <> (for the internal < >)

|===