
BroadcastHashJoinExec Binary Physical Operator

BroadcastHashJoinExec is a binary physical operator that performs a broadcast hash join.
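The following is a minimal, single-machine sketch of the broadcast hash join technique in plain Scala. It assumes an inner join, models the streamed side's partitions as plain `Seq`s and the broadcast build side as an in-memory `Map`. `ToyBroadcastHashJoin` and `join` are hypothetical names, not Spark APIs.

```scala
// Toy model of a broadcast hash join (inner join only), not Spark code.
// Each inner Seq stands in for one partition of the streamed side; the build
// side is hashed once, the way a broadcast variable would be shared by all tasks.
object ToyBroadcastHashJoin {
  def join[K, L, R](
      streamedPartitions: Seq[Seq[(K, L)]],
      buildSide: Seq[(K, R)]): Seq[Seq[(K, L, R)]] = {
    // Build phase: group the (small) build side by join key, once.
    val hashed: Map[K, Seq[R]] =
      buildSide.groupBy(_._1).map { case (k, rows) => k -> rows.map(_._2) }
    // Probe phase: every streamed partition is joined independently against
    // the same hash map, so the streamed side is never shuffled.
    streamedPartitions.map { partition =>
      partition.flatMap { case (k, l) =>
        hashed.getOrElse(k, Seq.empty).map(r => (k, l, r))
      }
    }
  }
}
```

Because the build side is hashed once and shared, each streamed partition can be probed on its own. This is why the build side has to be small enough to ship to every task.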

BroadcastHashJoinExec is created when the JoinSelection execution planning strategy is applied to spark-sql-ExtractEquiJoinKeys.md[ExtractEquiJoinKeys]-destructurable logical query plans (i.e. INNER, CROSS, LEFT OUTER, LEFT SEMI and LEFT ANTI joins) whose right physical operator can be broadcast.
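The planning condition can be pictured with a simplified model. The names are hypothetical and this is not JoinSelection's source. The model assumes that "can be broadcast" means the right side's estimated size is non-negative and no larger than `spark.sql.autoBroadcastJoinThreshold` (10485760 bytes in the spark-shell session below). It also assumes that a negative threshold such as -1 disables broadcasting.

```scala
// Simplified, hypothetical model of the "broadcast the right side?" decision.
object ToyJoinSelection {
  sealed trait JoinType
  case object Inner extends JoinType
  case object Cross extends JoinType
  case object LeftOuter extends JoinType
  case object RightOuter extends JoinType
  case object FullOuter extends JoinType
  case object LeftSemi extends JoinType
  case object LeftAnti extends JoinType

  // Join types whose right side may serve as the broadcast (build) side,
  // as listed in the text above.
  def canBuildRight(joinType: JoinType): Boolean = joinType match {
    case Inner | Cross | LeftOuter | LeftSemi | LeftAnti => true
    case _                                               => false
  }

  // Assumption: small enough means 0 <= estimated size <= threshold,
  // so a negative threshold (e.g. -1) never allows broadcasting.
  def canBroadcast(estimatedSizeInBytes: Long, threshold: Long): Boolean =
    estimatedSizeInBytes >= 0 && estimatedSizeInBytes <= threshold

  def broadcastRight(joinType: JoinType, rightSizeInBytes: Long, threshold: Long): Boolean =
    canBuildRight(joinType) && canBroadcast(rightSizeInBytes, threshold)
}
```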

BroadcastHashJoinExec supports Java code generation (aka codegen).

```scala
// spark-shell imports spark.implicits._ automatically; it is what enables toDF
import spark.implicits._

val tokens = Seq(
  (0, "playing"),
  (1, "with"),
  (2, "BroadcastHashJoinExec")
).toDF("id", "token")

scala> spark.conf.get("spark.sql.autoBroadcastJoinThreshold")
res0: String = 10485760

val q = tokens.join(tokens, Seq("id"), "inner")
scala> q.explain
== Physical Plan ==
*Project [id#15, token#16, token#21]
+- *BroadcastHashJoin [id#15], [id#20], Inner, BuildRight
   :- LocalTableScan [id#15, token#16]
   +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)))
      +- LocalTableScan [id#20, token#21]
```

BroadcastHashJoinExec requires that its two child physical operators satisfy BroadcastDistribution (with a HashedRelationBroadcastMode) and UnspecifiedDistribution for the left and right sides of a join, respectively, or vice versa depending on the build side.

[[metrics]] BroadcastHashJoinExec's Performance Metrics

| Key | Name (in web UI) | Description |
|-----|------------------|-------------|
| [[numOutputRows]] numOutputRows | number of output rows | |
| [[avgHashProbe]] avgHashProbe | avg hash probe | |
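Assuming that avg hash probe reports the average number of slots a hash-table lookup inspects (probes per lookup), the toy linear-probing set below shows how such a ratio is computed. `ProbeCountingTable` is a hypothetical illustration. Spark's hashed relations use their own data structures.

```scala
// Toy open-addressing (linear probing) set of Long keys that counts lookups
// and probes; an illustration of the ratio only, not Spark's implementation.
final class ProbeCountingTable(capacity: Int) {
  require(capacity > 0)
  private val slots = Array.fill[Option[Long]](capacity)(None)
  private var numKeyLookups = 0L
  private var numProbes = 0L

  private def slotOf(key: Long): Int =
    java.lang.Math.floorMod(java.lang.Long.hashCode(key), capacity)

  def insert(key: Long): Unit = {
    var i = slotOf(key)
    var inspected = 0
    // Walk past slots occupied by other keys (linear probing).
    while (slots(i).exists(_ != key)) {
      inspected += 1
      require(inspected < capacity, "table is full")
      i = (i + 1) % capacity
    }
    slots(i) = Some(key)
  }

  def contains(key: Long): Boolean = {
    numKeyLookups += 1
    var i = slotOf(key)
    var inspected = 0
    var found = false
    var done = false
    while (!done) {
      numProbes += 1 // each inspected slot counts as one probe
      inspected += 1
      slots(i) match {
        case None                => done = true // empty slot: key absent
        case Some(k) if k == key => found = true; done = true
        case Some(_) =>
          i = (i + 1) % capacity // collision: try the next slot
          if (inspected == capacity) done = true // whole table scanned
      }
    }
    found
  }

  def averageProbesPerLookup: Double =
    if (numKeyLookups == 0) 0.0 else numProbes.toDouble / numKeyLookups
}
```

A value close to 1 means most lookups resolve on the first slot. Higher values point at collisions in the build-side hash table.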

![BroadcastHashJoinExec in web UI (Details for Query)](images/spark-sql-BroadcastHashJoinExec-webui-query-details.png)

NOTE: BroadcastHashJoinExec uses bhj as the prefix of variable names in CodegenSupport-generated code (e.g. bhj_broadcast and bhj_relation below).

```scala
scala> q.queryExecution.debug.codegen
Found 1 WholeStageCodegen subtrees.
== Subtree 1 / 1 ==
*Project [id#15, token#16, token#21]
+- *BroadcastHashJoin [id#15], [id#20], Inner, BuildRight
   :- LocalTableScan [id#15, token#16]
   +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)))
      +- LocalTableScan [id#20, token#21]

Generated code:
/* 001 */ public Object generate(Object[] references) {
/* 002 */   return new GeneratedIterator(references);
/* 003 */ }
/* 004 */
/* 005 */ final class GeneratedIterator extends org.apache.spark.sql.execution.BufferedRowIterator {
/* 006 */   private Object[] references;
/* 007 */   private scala.collection.Iterator[] inputs;
/* 008 */   private scala.collection.Iterator inputadapter_input;
/* 009 */   private org.apache.spark.broadcast.TorrentBroadcast bhj_broadcast;
/* 010 */   private org.apache.spark.sql.execution.joins.LongHashedRelation bhj_relation;
/* 011 */   private org.apache.spark.sql.execution.metric.SQLMetric bhj_numOutputRows;
/* 012 */   private UnsafeRow bhj_result;
/* 013 */   private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder bhj_holder;
/* 014 */   private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter bhj_rowWriter;
...
```
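The bhj_ names in the generated code follow the operator's variable prefix. A simplified, hypothetical model of how such names could be produced is shown below. It assumes the first use of a name gets no suffix and later uses of the same name get an increasing numeric suffix. `ToyNameGenerator` is not Spark's CodegenContext.

```scala
import scala.collection.mutable

// Hypothetical model of prefixed fresh-name generation for generated code.
final class ToyNameGenerator {
  private val ids = mutable.Map.empty[String, Int]
  var prefix: String = "" // e.g. "bhj" while generating BroadcastHashJoinExec code

  def freshName(name: String): String = {
    val fullName = if (prefix.isEmpty) name else s"${prefix}_$name"
    ids.get(fullName) match {
      case Some(id) => // name already used: append a numeric suffix
        ids(fullName) = id + 1
        s"$fullName$id"
      case None => // first use: keep the name as-is
        ids(fullName) = 1
        fullName
    }
  }
}
```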


[[requiredChildDistribution]] BroadcastHashJoinExec's Required Child Output Distributions

| BuildSide | Left Child | Right Child |
|-----------|------------|-------------|
| BuildLeft | BroadcastDistribution with HashedRelationBroadcastMode broadcast mode of build join keys | UnspecifiedDistribution |
| BuildRight | UnspecifiedDistribution | BroadcastDistribution with HashedRelationBroadcastMode broadcast mode of build join keys |
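The table can be read as a function of the build side. In this hypothetical model, plain case classes stand in for Spark's distribution classes, and a list of key names stands in for HashedRelationBroadcastMode.

```scala
// Hypothetical model of the required child distributions per build side.
object ToyRequiredDistribution {
  sealed trait BuildSide
  case object BuildLeft extends BuildSide
  case object BuildRight extends BuildSide

  sealed trait Distribution
  case object UnspecifiedDistribution extends Distribution
  // buildKeys stands in for the HashedRelationBroadcastMode of the build join keys
  final case class BroadcastDistribution(buildKeys: Seq[String]) extends Distribution

  // Returns (left child requirement, right child requirement).
  def requiredChildDistribution(
      buildSide: BuildSide,
      buildKeys: Seq[String]): (Distribution, Distribution) =
    buildSide match {
      case BuildLeft  => (BroadcastDistribution(buildKeys), UnspecifiedDistribution)
      case BuildRight => (UnspecifiedDistribution, BroadcastDistribution(buildKeys))
    }
}
```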

=== [[doExecute]] Executing Physical Operator (Generating RDD[InternalRow]) -- doExecute Method

```scala
doExecute(): RDD[InternalRow]
```

doExecute...FIXME

doExecute is part of the SparkPlan abstraction.

=== [[codegenInner]] Generating Java Source Code for Inner Join -- codegenInner Internal Method

```scala
codegenInner(ctx: CodegenContext, input: Seq[ExprCode]): String
```

codegenInner...FIXME

NOTE: codegenInner is used exclusively when BroadcastHashJoinExec is requested to generate the Java source code for the "consume" path in whole-stage code generation.
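The per-row semantics of an inner broadcast hash join can be sketched in plain Scala. This is an assumption-level model of what happens for one streamed row: look up the key in the broadcast relation, apply the optional join condition, and emit one joined row per surviving match. It is not the generated Java, and the names are hypothetical.

```scala
// Hypothetical per-row model of an inner broadcast hash join.
object ToyInnerJoinRow {
  def innerJoinRow[K, S, B](
      key: K,
      streamedRow: S,
      relation: Map[K, Seq[B]],                   // the broadcast hashed relation
      condition: Option[(S, B) => Boolean]): Seq[(S, B)] = {
    val matches = relation.getOrElse(key, Seq.empty)
    // Apply the optional (non-equi) join condition to each candidate match.
    val accepted = condition.fold(matches)(c => matches.filter(b => c(streamedRow, b)))
    accepted.map(b => (streamedRow, b))
  }
}
```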

=== [[codegenOuter]] Generating Java Source Code for Left or Right Outer Join -- codegenOuter Internal Method

```scala
codegenOuter(ctx: CodegenContext, input: Seq[ExprCode]): String
```

codegenOuter...FIXME

NOTE: codegenOuter is used exclusively when BroadcastHashJoinExec is requested to generate the Java source code for the "consume" path in whole-stage code generation.
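In an outer join the streamed row is never dropped. The sketch below (hypothetical names, not the generated Java) assumes the streamed side is the preserved side. That means left outer with BuildRight and right outer with BuildLeft. The sketch uses `None` where Spark would emit nulls for the build-side columns.

```scala
// Hypothetical per-row model of a (streamed-side) outer broadcast hash join.
object ToyOuterJoinRow {
  def outerJoinRow[K, S, B](
      key: K,
      streamedRow: S,
      relation: Map[K, Seq[B]],
      condition: Option[(S, B) => Boolean]): Seq[(S, Option[B])] = {
    val matches = relation.getOrElse(key, Seq.empty)
    val accepted = condition.fold(matches)(c => matches.filter(b => c(streamedRow, b)))
    // No surviving match: keep the streamed row, padded with a missing build side.
    if (accepted.isEmpty) Seq((streamedRow, None))
    else accepted.map(b => (streamedRow, Some(b)))
  }
}
```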

=== [[codegenSemi]] Generating Java Source Code for Left Semi Join -- codegenSemi Internal Method

```scala
codegenSemi(ctx: CodegenContext, input: Seq[ExprCode]): String
```

codegenSemi...FIXME

NOTE: codegenSemi is used exclusively when BroadcastHashJoinExec is requested to generate the Java source code for the "consume" path in whole-stage code generation.
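A left semi join only tests for a match. It emits each streamed row at most once and never adds build-side columns. Here is a hypothetical plain-Scala sketch of that per-row check, not the generated Java:

```scala
// Hypothetical per-row model of a left semi broadcast hash join.
object ToySemiJoinRow {
  def semiJoinRow[K, S, B](
      key: K,
      streamedRow: S,
      relation: Map[K, Seq[B]],
      condition: Option[(S, B) => Boolean]): Option[S] = {
    val matches = relation.getOrElse(key, Seq.empty)
    // One passing match is enough; further matches do not duplicate the row.
    val found = condition.fold(matches.nonEmpty)(c => matches.exists(b => c(streamedRow, b)))
    if (found) Some(streamedRow) else None
  }
}
```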

=== [[codegenAnti]] Generating Java Source Code for Anti Join -- codegenAnti Internal Method

```scala
codegenAnti(ctx: CodegenContext, input: Seq[ExprCode]): String
```

codegenAnti...FIXME

NOTE: codegenAnti is used exclusively when BroadcastHashJoinExec is requested to generate the Java source code for the "consume" path in whole-stage code generation.
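An anti join is the complement of a semi join. A streamed row survives only when no build-side row matches it, after the optional condition has been applied. The hypothetical sketch below leaves null join keys out of the model.

```scala
// Hypothetical per-row model of a left anti broadcast hash join.
object ToyAntiJoinRow {
  def antiJoinRow[K, S, B](
      key: K,
      streamedRow: S,
      relation: Map[K, Seq[B]],
      condition: Option[(S, B) => Boolean]): Option[S] = {
    val matches = relation.getOrElse(key, Seq.empty)
    val found = condition.fold(matches.nonEmpty)(c => matches.exists(b => c(streamedRow, b)))
    // Keep the streamed row only when nothing matched.
    if (found) None else Some(streamedRow)
  }
}
```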

=== [[codegenExistence]] codegenExistence Internal Method

```scala
codegenExistence(ctx: CodegenContext, input: Seq[ExprCode]): String
```

codegenExistence...FIXME

NOTE: codegenExistence is used exclusively when BroadcastHashJoinExec is requested to generate the Java source code for the "consume" path in whole-stage code generation.
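An existence join (Spark's ExistenceJoin join type) is assumed here to keep every streamed row and append a boolean flag that says whether a match exists. Under that assumption, a hypothetical per-row sketch (not the generated Java) looks like this:

```scala
// Hypothetical per-row model of an existence broadcast hash join.
object ToyExistenceJoinRow {
  def existenceJoinRow[K, S, B](
      key: K,
      streamedRow: S,
      relation: Map[K, Seq[B]],
      condition: Option[(S, B) => Boolean]): (S, Boolean) = {
    val matches = relation.getOrElse(key, Seq.empty)
    val exists = condition.fold(matches.nonEmpty)(c => matches.exists(b => c(streamedRow, b)))
    // Every streamed row is emitted, paired with the "exists" flag.
    (streamedRow, exists)
  }
}
```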

=== [[genStreamSideJoinKey]] genStreamSideJoinKey Internal Method

```scala
genStreamSideJoinKey(
  ctx: CodegenContext,
  input: Seq[ExprCode]): (ExprCode, String)
```


genStreamSideJoinKey...FIXME

NOTE: genStreamSideJoinKey is used when BroadcastHashJoinExec is requested to generate the Java source code for inner, outer, semi, anti and existence joins (for the "consume" path in whole-stage code generation).
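The example above shows a `LongHashedRelation` in the generated code and a `cast(... as bigint)` in the broadcast mode. This suggests that a single integral join key is handled as a `Long`. The hypothetical sketch below models that assumption: a single `Int` or `Long` key becomes a long key, and anything else becomes a composite row key. It says nothing about how the Java code is actually emitted.

```scala
// Hypothetical model of the two shapes a stream-side join key could take.
object ToyStreamSideKey {
  sealed trait JoinKey
  final case class LongKey(value: Long) extends JoinKey
  final case class RowKey(values: Seq[Any]) extends JoinKey

  // Assumption: a single integral key is widened to Long (mirroring the
  // cast to bigint in the plan above); other keys become a composite row key.
  def streamSideJoinKey(keyValues: Seq[Any]): JoinKey = keyValues match {
    case Seq(i: Int)  => LongKey(i.toLong)
    case Seq(l: Long) => LongKey(l)
    case other        => RowKey(other)
  }
}
```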

=== [[creating-instance]] Creating BroadcastHashJoinExec Instance

BroadcastHashJoinExec takes the following when created:

  • [[leftKeys]] Left join key expressions/Expression.md[expressions]
  • [[rightKeys]] Right join key expressions/Expression.md[expressions]
  • [[joinType]] spark-sql-joins.md#join-types[Join type]
  • [[buildSide]] BuildSide
  • [[condition]] Optional join condition expressions/Expression.md[expression]
  • [[left]] Left SparkPlan.md[physical operator]
  • [[right]] Right SparkPlan.md[physical operator]
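The buildSide argument decides which child is hashed and broadcast (the build side) and which is streamed. In the spark-shell example above, BuildRight puts the BroadcastExchange on the right. A minimal, hypothetical sketch of that selection, with plan descriptions as plain strings:

```scala
// Hypothetical model: pick (build, streamed) children from the build side.
object ToyBuildStreamSides {
  sealed trait BuildSide
  case object BuildLeft extends BuildSide
  case object BuildRight extends BuildSide

  // Returns (build child, streamed child).
  def buildAndStreamed[P](buildSide: BuildSide, left: P, right: P): (P, P) =
    buildSide match {
      case BuildLeft  => (left, right)
      case BuildRight => (right, left)
    }
}
```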

Last update: 2020-10-21