
ShuffleExchangeExec Unary Physical Operator

ShuffleExchangeExec is an Exchange unary physical operator (indirectly, via the ShuffleExchangeLike contract) that performs a shuffle.

Creating Instance

ShuffleExchangeExec takes the following to be created:

  • Output Partitioning
  • Child physical operator (SparkPlan)
  • ShuffleOrigin (default: ENSURE_REQUIREMENTS)

ShuffleExchangeExec is created when:

  • BasicOperators execution planning strategy is executed (and plans Repartition and RepartitionByExpression logical operators)
  • EnsureRequirements physical optimization is executed

Node Name

nodeName: String

nodeName is part of the TreeNode abstraction.

nodeName is Exchange.

Performance Metrics

| Key | Name (in web UI) |
| --- | --- |
| dataSize | data size |
| fetchWaitTime | fetch wait time |
| localBlocksFetched | local blocks read |
| localBytesRead | local bytes read |
| recordsRead | records read |
| remoteBlocksFetched | remote blocks read |
| remoteBytesRead | remote bytes read |
| remoteBytesReadToDisk | remote bytes read to disk |
| shuffleBytesWritten | shuffle bytes written |
| shuffleRecordsWritten | shuffle records written |
| shuffleWriteTime | shuffle write time |

ShuffleExchangeExec in web UI (Details for Query)

Executing Physical Operator

doExecute(): RDD[InternalRow]

doExecute is part of the SparkPlan abstraction.

doExecute gives a ShuffledRowRDD (with the ShuffleDependency and read performance metrics).

doExecute uses the cachedShuffleRDD internal registry to avoid multiple executions.

Creating ShuffleDependency

prepareShuffleDependency(
  rdd: RDD[InternalRow],
  outputAttributes: Seq[Attribute],
  newPartitioning: Partitioning,
  serializer: Serializer,
  writeMetrics: Map[String, SQLMetric]): ShuffleDependency[Int, InternalRow, InternalRow]

prepareShuffleDependency creates a ShuffleDependency (Apache Spark) with an RDD[Product2[Int, InternalRow]] (where the Int keys are the partition IDs of the InternalRow values) and the given Serializer (e.g. the UnsafeRowSerializer of the ShuffleExchangeExec physical operator).
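The following is a simplified sketch of that idea, not the Spark source: SketchRow, prepareDependency and PartitionIdPassthrough are hypothetical stand-ins (Spark has its own internal pass-through partitioner). Every row is paired with its target partition ID up front, so the shuffle-side Partitioner only passes the precomputed ID through.

```scala
import org.apache.spark.{Partitioner, ShuffleDependency}
import org.apache.spark.rdd.RDD

// Hypothetical stand-in for Spark's InternalRow.
case class SketchRow(values: Seq[Any])

// A pass-through partitioner: the shuffle key is already a partition ID.
class PartitionIdPassthrough(override val numPartitions: Int) extends Partitioner {
  override def getPartition(key: Any): Int = key.asInstanceOf[Int]
}

// Pair every row with its target partition ID and build the dependency.
// (Uses the default serializer; ShuffleExchangeExec passes its UnsafeRowSerializer.)
def prepareDependency(
    rdd: RDD[SketchRow],
    part: Partitioner,
    getPartitionKey: SketchRow => Any): ShuffleDependency[Int, SketchRow, SketchRow] = {
  val rddWithPartitionIds: RDD[(Int, SketchRow)] =
    rdd.map(row => (part.getPartition(getPartitionKey(row)), row))
  new ShuffleDependency[Int, SketchRow, SketchRow](
    rddWithPartitionIds,
    new PartitionIdPassthrough(part.numPartitions))
}
```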

Partitioner

prepareShuffleDependency determines a Partitioner based on the given newPartitioning Partitioning (a sketch follows the list below):

  • RoundRobinPartitioning: a HashPartitioner with the requested number of partitions
  • HashPartitioning: a Partitioner that returns the partition key as-is (the key is already a partition ID, computed with partitionIdExpression)
  • RangePartitioning: a RangePartitioner that samples the sort keys to compute partition bounds
  • SinglePartition: a Partitioner with one partition (every row goes to partition 0)
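As a sketch of the mapping (the partitioning types below are simplified stand-ins for Catalyst's partitionings; the RangePartitioning case is omitted as it requires sampling the sort keys):

```scala
import org.apache.spark.{HashPartitioner, Partitioner}

// Simplified stand-ins for Catalyst's partitioning classes.
sealed trait SketchPartitioning
case class RoundRobin(numPartitions: Int) extends SketchPartitioning
case class Hash(numPartitions: Int) extends SketchPartitioning
case object Single extends SketchPartitioning

def choosePartitioner(newPartitioning: SketchPartitioning): Partitioner =
  newPartitioning match {
    // Round-robin keys are increasing Ints, so a HashPartitioner spreads
    // them evenly across the requested number of partitions.
    case RoundRobin(n) => new HashPartitioner(n)
    // For hash partitioning the key extractor has already computed the
    // partition ID (partitionIdExpression), so just pass it through.
    case Hash(n) => new Partitioner {
      override def numPartitions: Int = n
      override def getPartition(key: Any): Int = key.asInstanceOf[Int]
    }
    // Everything goes to partition 0.
    case Single => new Partitioner {
      override def numPartitions: Int = 1
      override def getPartition(key: Any): Int = 0
    }
  }
```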

getPartitionKeyExtractor Internal Method

prepareShuffleDependency defines a getPartitionKeyExtractor method.

getPartitionKeyExtractor(): InternalRow => Any

getPartitionKeyExtractor uses the given newPartitioning Partitioning:

  • RoundRobinPartitioning: a function that starts at a random partition (seeded with the task's partition ID) and increments the position for every row (sketched below)
  • HashPartitioning: a projection with partitionIdExpression (so the key is already a valid partition ID)
  • RangePartitioning: a projection with the sort expressions (the key for the RangePartitioner)
  • SinglePartition: the identity function (the key is ignored)
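For the round-robin case, a minimal sketch (with simplified types; Spark seeds the random starting position with the task's partition ID from TaskContext):

```scala
import java.util.Random

// Start at a random partition (seeded per task) and advance by one for
// every row, so rows spread evenly regardless of their content.
def roundRobinKeyExtractor(numPartitions: Int, taskPartitionId: Int): Any => Int = {
  var position = new Random(taskPartitionId).nextInt(numPartitions)
  (_: Any) => {
    position += 1
    position  // later mapped into [0, numPartitions) by the HashPartitioner
  }
}
```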

isRoundRobin Internal Flag

prepareShuffleDependency determines whether the shuffle is round-robin (the isRoundRobin flag) based on the given newPartitioning partitioning: it is round-robin when the partitioning is a RoundRobinPartitioning with more than one partition.
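Expressed as code (a sketch consistent with the description above, with newPartitioning being the Partitioning given to prepareShuffleDependency):

```scala
import org.apache.spark.sql.catalyst.plans.physical.RoundRobinPartitioning

// Round-robin over a single partition moves every row to the same place,
// so it needs none of the round-robin-specific handling.
val isRoundRobin = newPartitioning.isInstanceOf[RoundRobinPartitioning] &&
  newPartitioning.numPartitions > 1
```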

rddWithPartitionIds RDD

prepareShuffleDependency creates a rddWithPartitionIds:

  1. Firstly, prepareShuffleDependency determines a newRdd based on the isRoundRobin flag and the spark.sql.execution.sortBeforeRepartition configuration property. When both are enabled (true), prepareShuffleDependency sorts every partition locally (using an UnsafeExternalRowSorter). Otherwise, prepareShuffleDependency leaves the given RDD[InternalRow] unchanged.
  2. Secondly, prepareShuffleDependency determines whether the shuffle is isOrderSensitive or not. It is order-sensitive when the isRoundRobin flag is enabled (true) while the spark.sql.execution.sortBeforeRepartition configuration property is not (false). Both decisions are sketched below.

prepareShuffleDependency...FIXME
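A sketch of these two decisions (hypothetical helper names; sortBeforeRepartition stands for spark.sql.execution.sortBeforeRepartition, and Spark sorts with an UnsafeExternalRowSorter rather than a generic callback):

```scala
import org.apache.spark.rdd.RDD

def prepareInputRdd[T](
    rdd: RDD[T],
    isRoundRobin: Boolean,
    sortBeforeRepartition: Boolean,           // spark.sql.execution.sortBeforeRepartition
    sortPartitionsLocally: RDD[T] => RDD[T]   // hypothetical sorting callback
  ): (RDD[T], Boolean) = {
  // Sorting every partition locally makes the row order (and hence the
  // round-robin assignment) deterministic across task retries.
  val newRdd =
    if (isRoundRobin && sortBeforeRepartition) sortPartitionsLocally(rdd)
    else rdd
  // Without the sort, the round-robin output depends on the input order,
  // so the shuffle map stage must be marked order-sensitive.
  val isOrderSensitive = isRoundRobin && !sortBeforeRepartition
  (newRdd, isOrderSensitive)
}
```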

Usage

prepareShuffleDependency is used when:

  • CollectLimitExec, ShuffleExchangeExec and TakeOrderedAndProjectExec physical operators are executed

UnsafeRowSerializer

serializer: Serializer

serializer is an UnsafeRowSerializer with the following properties:

  • Number of fields: the number of output attributes of the child physical operator
  • dataSize performance metric (updated with the size of the serialized data)

serializer is used when ShuffleExchangeExec operator is requested for a ShuffleDependency.
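For reference, a sketch of how the serializer is created (matching the Spark 3.x source, where child and longMetric belong to the enclosing operator and UnsafeRowSerializer is a Spark-internal class):

```scala
import org.apache.spark.serializer.Serializer
import org.apache.spark.sql.execution.UnsafeRowSerializer

private lazy val serializer: Serializer =
  new UnsafeRowSerializer(child.output.size, longMetric("dataSize"))
```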

ShuffledRowRDD

cachedShuffleRDD: ShuffledRowRDD

cachedShuffleRDD is an internal registry for the ShuffledRowRDD that ShuffleExchangeExec operator creates when executed.

The purpose of cachedShuffleRDD is to avoid multiple executions of ShuffleExchangeExec operator when it is reused in a query plan:

  • cachedShuffleRDD is uninitialized (null) when ShuffleExchangeExec operator is created
  • cachedShuffleRDD is assigned a ShuffledRowRDD when ShuffleExchangeExec operator is executed for the first time
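A sketch of the pattern, close to the Spark source (shuffleDependency and readMetrics belong to the enclosing operator):

```scala
private var cachedShuffleRDD: ShuffledRowRDD = null

protected override def doExecute(): RDD[InternalRow] = {
  // Return the same ShuffledRowRDD when this exchange is shared by
  // multiple parts of the query plan.
  if (cachedShuffleRDD == null) {
    cachedShuffleRDD = new ShuffledRowRDD(shuffleDependency, readMetrics)
  }
  cachedShuffleRDD
}
```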

ShuffleDependency

shuffleDependency: ShuffleDependency[Int, InternalRow, InternalRow]

shuffleDependency is a Spark Core ShuffleDependency.

shuffleDependency lazy value

shuffleDependency is a Scala lazy value, which is computed once, on first access, and cached afterwards (as demonstrated below).
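A self-contained example of the lazy val semantics that shuffleDependency relies on:

```scala
object LazyValDemo extends App {
  lazy val expensive: Int = {
    println("computed once")  // side effect shows when initialization runs
    42
  }
  println(expensive)  // prints "computed once", then 42
  println(expensive)  // prints 42 only; the value is cached
}
```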

ShuffleExchangeExec operator creates a ShuffleDependency for the following:

  • The input RDD[InternalRow] of the child physical operator
  • The output attributes of the child physical operator
  • The output Partitioning
  • The UnsafeRowSerializer
  • The write performance metrics

shuffleDependency is used when:

  • ShuffleExchangeExec operator is requested for a MapOutputStatistics future (mapOutputStatisticsFuture), the number of map tasks (numMappers) and shuffle partitions (numPartitions), a ShuffledRowRDD (getShuffleRDD), and is executed (doExecute)

createShuffleWriteProcessor

createShuffleWriteProcessor(
  metrics: Map[String, SQLMetric]): ShuffleWriteProcessor

createShuffleWriteProcessor creates a Spark Core ShuffleWriteProcessor solely to plug in a custom ShuffleWriteMetricsReporter (a SQLShuffleWriteMetricsReporter).

createShuffleWriteProcessor is used when ShuffleExchangeExec operator is executed (and requested to prepareShuffleDependency).
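A sketch of that override (consistent with the Spark 3.x source; the types involved are Spark-internal APIs):

```scala
import org.apache.spark.TaskContext
import org.apache.spark.shuffle.{ShuffleWriteMetricsReporter, ShuffleWriteProcessor}
import org.apache.spark.sql.execution.metric.{SQLMetric, SQLShuffleWriteMetricsReporter}

def createShuffleWriteProcessor(metrics: Map[String, SQLMetric]): ShuffleWriteProcessor =
  new ShuffleWriteProcessor {
    // Swap in the SQL reporter so shuffle-write metrics show up in the web UI.
    override protected def createMetricsReporter(
        context: TaskContext): ShuffleWriteMetricsReporter =
      new SQLShuffleWriteMetricsReporter(
        context.taskMetrics().shuffleWriteMetrics, metrics)
  }
```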

Demo

ShuffleExchangeExec and Repartition Logical Operator

val q = spark.range(6).repartition(2)
scala> q.explain(extended = true)
== Parsed Logical Plan ==
Repartition 2, true
+- Range (0, 6, step=1, splits=Some(16))

== Analyzed Logical Plan ==
id: bigint
Repartition 2, true
+- Range (0, 6, step=1, splits=Some(16))

== Optimized Logical Plan ==
Repartition 2, true
+- Range (0, 6, step=1, splits=Some(16))

== Physical Plan ==
Exchange RoundRobinPartitioning(2), false, [id=#8]
+- *(1) Range (0, 6, step=1, splits=16)

ShuffleExchangeExec and RepartitionByExpression Logical Operator

val q = spark.range(6).repartition(2, 'id % 2)
scala> q.explain(extended = true)
== Parsed Logical Plan ==
'RepartitionByExpression [('id % 2)], 2
+- Range (0, 6, step=1, splits=Some(16))

== Analyzed Logical Plan ==
id: bigint
RepartitionByExpression [(id#4L % cast(2 as bigint))], 2
+- Range (0, 6, step=1, splits=Some(16))

== Optimized Logical Plan ==
RepartitionByExpression [(id#4L % 2)], 2
+- Range (0, 6, step=1, splits=Some(16))

== Physical Plan ==
Exchange hashpartitioning((id#4L % 2), 2), false, [id=#17]
+- *(1) Range (0, 6, step=1, splits=16)
