BroadcastHashJoinExec Physical Operator

BroadcastHashJoinExec is a hash-based join physical operator that performs a broadcast hash join: the build-side rows are broadcast to every executor (as a HashedRelation) and joined with the streamed side.

BroadcastHashJoinExec supports Java code generation (variable prefix: bhj).

Performance Metrics

| Key | Name (in web UI) | Description |
|-----|------------------|-------------|
| numOutputRows | number of output rows | Number of output rows |

BroadcastHashJoinExec in web UI (Details for Query)

Creating Instance

BroadcastHashJoinExec takes the following to be created:

  1. Left join key expressions
  2. Right join key expressions
  3. Join type
  4. BuildSide
  5. Optional join condition
  6. Left child physical operator
  7. Right child physical operator
  8. isNullAwareAntiJoin flag (default: false)

BroadcastHashJoinExec is created when the JoinSelection execution planning strategy is executed and selects a broadcast hash join.

isNullAwareAntiJoin Flag

BroadcastHashJoinExec can be given isNullAwareAntiJoin flag when created.

isNullAwareAntiJoin flag is false by default.

isNullAwareAntiJoin flag is true when the JoinSelection execution planning strategy plans a single-column NOT IN subquery as a broadcast null-aware anti join.

If enabled, BroadcastHashJoinExec requires that all of the following hold:

  1. There is one left key only
  2. There is one right key only
  3. Join Type is LeftAnti
  4. Build Side is BuildRight
  5. Join condition is not defined
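The null-aware anti join exists because `NOT IN (subquery)` follows SQL's three-valued null semantics: a null key on either side makes the membership test unknown, so the row must not be emitted. A minimal sketch of those semantics over plain Scala collections (an illustration of the semantics only, not Spark's HashedRelation-based implementation; `nullAwareAntiJoin` is a made-up helper name):

```scala
// Null-aware anti join semantics over plain collections (illustration only).
// A streamed row survives only if its key is non-null, the build side contains
// no null keys, and the key is absent from the build side -- i.e. SQL NOT IN.
def nullAwareAntiJoin[K](streamed: Seq[Option[K]], build: Seq[Option[K]]): Seq[Option[K]] = {
  val buildHasNull = build.contains(None)
  val buildKeys = build.flatten.toSet
  if (buildHasNull) Seq.empty  // any null on the build side rejects every streamed row
  else streamed.filter {
    case None    => false                    // null streamed key: NOT IN is unknown, row dropped
    case Some(k) => !buildKeys.contains(k)   // keep only keys with no match
  }
}
```

For example, `nullAwareAntiJoin(Seq(Some(1), Some(2), None), Seq(Some(2)))` keeps only `Some(1)`: `2` has a match and the null streamed key is dropped.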

isNullAwareAntiJoin is used when BroadcastHashJoinExec is requested to execute (doExecute).

Required Child Output Distribution

requiredChildDistribution: Seq[Distribution]

requiredChildDistribution is part of the SparkPlan abstraction.

| BuildSide | Left Child | Right Child |
|-----------|------------|-------------|
| BuildLeft | BroadcastDistribution with HashedRelationBroadcastMode broadcast mode of the build join keys | UnspecifiedDistribution |
| BuildRight | UnspecifiedDistribution | BroadcastDistribution with HashedRelationBroadcastMode broadcast mode of the build join keys |
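The rule in the table can be sketched as a pattern match on the build side: whichever child is the build side must be broadcast, while the streamed child is unconstrained. The sketch below uses string placeholders instead of Spark's actual Distribution classes, and its own local BuildSide hierarchy (both are illustrative stand-ins, not Spark types):

```scala
// Simplified sketch of requiredChildDistribution: the build side must arrive
// broadcast (as a HashedRelation), the streamed side may be distributed anyhow.
// Strings stand in for Spark's Distribution classes.
sealed trait BuildSide
case object BuildLeft extends BuildSide
case object BuildRight extends BuildSide

def requiredChildDistribution(buildSide: BuildSide): Seq[String] = buildSide match {
  case BuildLeft  => Seq("BroadcastDistribution(HashedRelationBroadcastMode)", "UnspecifiedDistribution")
  case BuildRight => Seq("UnspecifiedDistribution", "BroadcastDistribution(HashedRelationBroadcastMode)")
}
```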

Output Data Partitioning Requirements

outputPartitioning: Partitioning

outputPartitioning is part of the SparkPlan abstraction.

outputPartitioning...FIXME

Executing Physical Operator

doExecute(): RDD[InternalRow]

doExecute is part of the SparkPlan abstraction.

doExecute requests the buildPlan to executeBroadcast (that gives a broadcast variable with a HashedRelation).

doExecute branches off based on isNullAwareAntiJoin flag: enabled or not.

isNullAwareAntiJoin Enabled

doExecute...FIXME

isNullAwareAntiJoin Disabled

doExecute requests the streamedPlan to execute (that gives an RDD[InternalRow]) and maps over partitions (RDD.mapPartitions):

  1. Takes a read-only copy of the HashedRelation (from the broadcast variable)
  2. Increments the peak execution memory (of the current task) by the size of the HashedRelation
  3. Joins the streamed rows with the HashedRelation (updating the numOutputRows metric)
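The essence of the per-partition work above can be sketched with plain collections: the build side becomes an in-memory hash map (standing in for the broadcast HashedRelation) and every streamed row probes it. The names and types below are illustrative, not Spark's API, and only the inner-join case is shown:

```scala
// Broadcast hash join sketch: build a hash map from the (broadcast) build side,
// then probe it with each streamed row -- the core of the per-partition join.
def broadcastHashJoin[K, L, R](
    streamed: Seq[(K, L)],   // streamed-side rows as (joinKey, row)
    build: Seq[(K, R)]       // build-side rows (would arrive via a broadcast variable)
): Seq[(K, L, R)] = {
  // stand-in for the HashedRelation: join key -> matching build rows
  val relation: Map[K, Seq[R]] =
    build.groupBy(_._1).map { case (k, vs) => k -> vs.map(_._2) }
  streamed.flatMap { case (k, l) =>
    relation.getOrElse(k, Seq.empty).map(r => (k, l, r))  // inner join: one output row per match
  }
}
```

Probing a hash map makes each streamed row an O(1) lookup, which is why broadcast hash join avoids shuffling the (large) streamed side entirely.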

Generating Java Code for Anti Join

codegenAnti(
  ctx: CodegenContext,
  input: Seq[ExprCode]): String

codegenAnti is part of the HashJoin abstraction.

codegenAnti...FIXME

Demo

val tokens = Seq(
  (0, "playing"),
  (1, "with"),
  (2, "BroadcastHashJoinExec")
).toDF("id", "token")

scala> spark.conf.get("spark.sql.autoBroadcastJoinThreshold")
res0: String = 10485760

val q = tokens.join(tokens, Seq("id"), "inner")
scala> q.explain
== Physical Plan ==
*Project [id#15, token#16, token#21]
+- *BroadcastHashJoin [id#15], [id#20], Inner, BuildRight
   :- LocalTableScan [id#15, token#16]
   +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)))
      +- LocalTableScan [id#20, token#21]
scala> q.queryExecution.debug.codegen
Found 1 WholeStageCodegen subtrees.
== Subtree 1 / 1 ==
*Project [id#15, token#16, token#21]
+- *BroadcastHashJoin [id#15], [id#20], Inner, BuildRight
   :- LocalTableScan [id#15, token#16]
   +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)))
      +- LocalTableScan [id#20, token#21]

Generated code:
/* 001 */ public Object generate(Object[] references) {
/* 002 */   return new GeneratedIterator(references);
/* 003 */ }
/* 004 */
/* 005 */ final class GeneratedIterator extends org.apache.spark.sql.execution.BufferedRowIterator {
/* 006 */   private Object[] references;
/* 007 */   private scala.collection.Iterator[] inputs;
/* 008 */   private scala.collection.Iterator inputadapter_input;
/* 009 */   private org.apache.spark.broadcast.TorrentBroadcast bhj_broadcast;
/* 010 */   private org.apache.spark.sql.execution.joins.LongHashedRelation bhj_relation;
/* 011 */   private org.apache.spark.sql.execution.metric.SQLMetric bhj_numOutputRows;
/* 012 */   private UnsafeRow bhj_result;
/* 013 */   private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder bhj_holder;
/* 014 */   private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter bhj_rowWriter;
...

Last update: 2021-05-09