ShuffleExchangeExec Unary Physical Operator¶
ShuffleExchangeExec is an Exchange unary physical operator that is used to perform a shuffle.
Creating Instance¶
ShuffleExchangeExec takes the following to be created:

- Output Partitioning
- Child physical operator
- canChangeNumPartitions flag (default: true)
ShuffleExchangeExec is created when:
- BasicOperators execution planning strategy is executed (and plans Repartition with the shuffle flag enabled or RepartitionByExpression)
- EnsureRequirements physical optimization is executed
Node Name¶
```scala
nodeName: String
```

nodeName is always Exchange.

nodeName is part of the TreeNode abstraction.
Performance Metrics¶
| Key | Name (in web UI) | Description |
|---|---|---|
| dataSize | data size | |
| fetchWaitTime | fetch wait time | |
| localBlocksFetched | local blocks read | |
| localBytesRead | local bytes read | |
| recordsRead | records read | |
| remoteBlocksFetched | remote blocks read | |
| remoteBytesRead | remote bytes read | |
| remoteBytesReadToDisk | remote bytes read to disk | |
| shuffleBytesWritten | shuffle bytes written | |
| shuffleRecordsWritten | shuffle records written | |
| shuffleWriteTime | shuffle write time | |
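The metrics can also be inspected programmatically by digging the Exchange operator out of the executed plan. A minimal sketch (assuming spark is an active SparkSession and adaptive query execution is disabled):

```scala
import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec

// Run a query with a shuffle so the Exchange metrics get values
val q = spark.range(6).repartition(2)
q.collect()

// Find the ShuffleExchangeExec in the executed plan and print its metrics
val exchangeMetrics = q.queryExecution.executedPlan.collectFirst {
  case e: ShuffleExchangeExec => e.metrics
}
exchangeMetrics.foreach(_.foreach { case (key, metric) =>
  println(s"$key (${metric.name.getOrElse(key)}): ${metric.value}")
})
```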
Executing Physical Operator¶
```scala
doExecute(): RDD[InternalRow]
```

doExecute gives a ShuffledRowRDD for the ShuffleDependency and the read performance metrics.

doExecute uses the cachedShuffleRDD internal registry to avoid executing the shuffle more than once.

doExecute is part of the SparkPlan abstraction.
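The ShuffledRowRDD can be observed by executing the physical plan of a repartition query directly. A sketch (assuming spark is an active SparkSession and adaptive query execution is disabled, so the Exchange is the top-level physical operator):

```scala
val q = spark.range(6).repartition(2)

// execute() goes through ShuffleExchangeExec.doExecute for the top-level Exchange
val rdd = q.queryExecution.executedPlan.execute()

// Expected to print ShuffledRowRDD
println(rdd.getClass.getSimpleName)
```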
Creating ShuffleDependency¶
```scala
prepareShuffleDependency(
  rdd: RDD[InternalRow],
  outputAttributes: Seq[Attribute],
  newPartitioning: Partitioning,
  serializer: Serializer,
  writeMetrics: Map[String, SQLMetric]): ShuffleDependency[Int, InternalRow, InternalRow]
```

prepareShuffleDependency creates a Spark Core ShuffleDependency with an RDD[Product2[Int, InternalRow]] (where the Ints are the partition IDs of the InternalRow values) and the given Serializer (e.g. the UnsafeRowSerializer of the ShuffleExchangeExec physical operator).
Partitioner¶
prepareShuffleDependency determines a Partitioner based on the given newPartitioning Partitioning (see the sketch after this list):

- For RoundRobinPartitioning, prepareShuffleDependency creates a HashPartitioner for the same number of partitions
- For HashPartitioning, prepareShuffleDependency creates a Partitioner for the same number of partitions and a getPartition that is an "identity"
- For RangePartitioning, prepareShuffleDependency creates a RangePartitioner for the same number of partitions and a samplePointsPerPartitionHint based on the spark.sql.execution.rangeExchange.sampleSizePerPartition configuration property
- For SinglePartition, prepareShuffleDependency creates a Partitioner with 1 for the number of partitions and a getPartition that always gives 0
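A simplified sketch of that selection (not the actual Spark source; RangePartitioning is left out since the RangePartitioner needs sampled rows from the input RDD):

```scala
import org.apache.spark.{HashPartitioner, Partitioner}
import org.apache.spark.sql.catalyst.plans.physical._

// A simplified sketch of the Partitioner selection (not the actual Spark source)
def partitionerFor(newPartitioning: Partitioning): Partitioner = newPartitioning match {
  // Round-robin rows are spread out with a plain HashPartitioner
  case RoundRobinPartitioning(numPartitions) =>
    new HashPartitioner(numPartitions)
  // HashPartitioning already produces a valid partition ID per row,
  // so getPartition is an identity
  case h: HashPartitioning =>
    new Partitioner {
      override def numPartitions: Int = h.numPartitions
      override def getPartition(key: Any): Int = key.asInstanceOf[Int]
    }
  // SinglePartition always maps to partition 0
  case SinglePartition =>
    new Partitioner {
      override def numPartitions: Int = 1
      override def getPartition(key: Any): Int = 0
    }
  // RangePartitioning (a RangePartitioner built from sampled rows) is omitted here
  case other =>
    throw new UnsupportedOperationException(s"Not covered in this sketch: $other")
}
```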
getPartitionKeyExtractor Internal Method¶
prepareShuffleDependency defines a getPartitionKeyExtractor internal method.

```scala
getPartitionKeyExtractor(): InternalRow => Any
```

getPartitionKeyExtractor uses the given newPartitioning Partitioning:
- For RoundRobinPartitioning,...FIXME
- For HashPartitioning,...FIXME
- For RangePartitioning,...FIXME
- For SinglePartition,...FIXME
isRoundRobin Internal Flag¶
prepareShuffleDependency computes an isRoundRobin flag based on the given newPartitioning partitioning: it is enabled (true) when the partitioning is a RoundRobinPartitioning with more than one partition.
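A minimal sketch of that check (written from the description above, not the Spark source):

```scala
import org.apache.spark.sql.catalyst.plans.physical.{Partitioning, RoundRobinPartitioning}

// isRoundRobin is enabled only for a multi-partition RoundRobinPartitioning
def isRoundRobin(newPartitioning: Partitioning): Boolean = newPartitioning match {
  case RoundRobinPartitioning(numPartitions) => numPartitions > 1
  case _ => false
}
```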
rddWithPartitionIds RDD¶
prepareShuffleDependency creates a rddWithPartitionIds:

- Firstly, prepareShuffleDependency determines a newRdd based on the isRoundRobin flag and the spark.sql.execution.sortBeforeRepartition configuration property. When both are enabled (true), prepareShuffleDependency sorts every partition (using an UnsafeExternalRowSorter). Otherwise, prepareShuffleDependency returns the given RDD[InternalRow] (unchanged).
- Secondly, prepareShuffleDependency determines whether this is isOrderSensitive or not. It is isOrderSensitive when the isRoundRobin flag is enabled (true) while the spark.sql.execution.sortBeforeRepartition configuration property is not (false). See the sketch after this list.
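A condensed sketch of how those two decisions interact (sortPartitions is a hypothetical stand-in for the UnsafeExternalRowSorter-based per-partition sort; not the Spark source):

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow

// A sketch of the two decisions above; sortPartitions is a hypothetical helper
// standing in for the UnsafeExternalRowSorter-based sort (not the Spark source)
def newRddAndOrderSensitivity(
    rdd: RDD[InternalRow],
    isRoundRobin: Boolean,
    sortBeforeRepartition: Boolean, // spark.sql.execution.sortBeforeRepartition
    sortPartitions: RDD[InternalRow] => RDD[InternalRow]): (RDD[InternalRow], Boolean) = {
  // 1. Sort every partition before a round-robin repartition to keep it deterministic
  val newRdd =
    if (isRoundRobin && sortBeforeRepartition) sortPartitions(rdd) else rdd
  // 2. The shuffle map stage is order-sensitive only when rows go round-robin
  //    without the defensive sort
  val isOrderSensitive = isRoundRobin && !sortBeforeRepartition
  (newRdd, isOrderSensitive)
}
```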
prepareShuffleDependency ...FIXME
Usage¶
prepareShuffleDependency is used when:

- CollectLimitExec, ShuffleExchangeExec and TakeOrderedAndProjectExec physical operators are executed
UnsafeRowSerializer¶
```scala
serializer: Serializer
```

serializer is an UnsafeRowSerializer with the following properties:

- Number of fields is the number of the output attributes of the child physical operator
- dataSize performance metric

serializer is used when the ShuffleExchangeExec operator is requested for a ShuffleDependency.
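A minimal sketch of how such a serializer could be built from those two properties (assumes it lives inside ShuffleExchangeExec so child and longMetric are in scope; not copied from the Spark source):

```scala
import org.apache.spark.serializer.Serializer
import org.apache.spark.sql.execution.UnsafeRowSerializer

// A sketch: number of fields from the child's output, plus the dataSize SQL metric
private lazy val serializer: Serializer =
  new UnsafeRowSerializer(child.output.size, longMetric("dataSize"))
```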
ShuffledRowRDD¶
```scala
cachedShuffleRDD: ShuffledRowRDD
```

cachedShuffleRDD is an internal registry for the ShuffledRowRDD that the ShuffleExchangeExec operator creates when executed.

The purpose of cachedShuffleRDD is to avoid multiple executions of the ShuffleExchangeExec operator when it is reused in a query plan:

- cachedShuffleRDD is uninitialized (null) when the ShuffleExchangeExec operator is created
- cachedShuffleRDD is assigned a ShuffledRowRDD when the ShuffleExchangeExec operator is executed for the first time
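The caching boils down to a simple check-then-initialize pattern. A simplified sketch written from the description above (not the actual ShuffleExchangeExec source):

```scala
// A simplified sketch of the lazy-initialization pattern described above
private var cachedShuffleRDD: ShuffledRowRDD = null

protected override def doExecute(): RDD[InternalRow] = {
  // Reuse the ShuffledRowRDD if this operator has already been executed
  if (cachedShuffleRDD == null) {
    cachedShuffleRDD = new ShuffledRowRDD(shuffleDependency, readMetrics)
  }
  cachedShuffleRDD
}
```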
ShuffleDependency¶
```scala
shuffleDependency: ShuffleDependency[Int, InternalRow, InternalRow]
```

shuffleDependency is a Spark Core ShuffleDependency.

Note: shuffleDependency is a Scala lazy value which is computed once when accessed and cached afterwards.

The ShuffleExchangeExec operator creates the ShuffleDependency for the following (see the sketch after this list):

- RDD[InternalRow]
- Output attributes of the child physical operator
- Output partitioning
- UnsafeRowSerializer
- writeMetrics
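A simplified sketch of how the lazy value could wire those five arguments into prepareShuffleDependency (assumes it lives inside ShuffleExchangeExec; not copied from the Spark source):

```scala
// A simplified sketch of the lazy ShuffleDependency (computed on first access)
@transient lazy val shuffleDependency: ShuffleDependency[Int, InternalRow, InternalRow] =
  ShuffleExchangeExec.prepareShuffleDependency(
    inputRDD,            // RDD[InternalRow] of the child operator
    child.output,        // output attributes of the child operator
    outputPartitioning,  // output partitioning
    serializer,          // the UnsafeRowSerializer
    writeMetrics)
```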
shuffleDependency is used when:

- CustomShuffleReaderExec physical operator is executed
- OptimizeLocalShuffleReader is requested to getPartitionSpecs
- OptimizeSkewedJoin physical optimization is executed
- ShuffleExchangeExec physical operator is executed and requested for MapOutputStatistics
createShuffleWriteProcessor¶
```scala
createShuffleWriteProcessor(
  metrics: Map[String, SQLMetric]): ShuffleWriteProcessor
```

createShuffleWriteProcessor creates a Spark Core ShuffleWriteProcessor for the sole purpose of plugging in a custom ShuffleWriteMetricsReporter (SQLShuffleWriteMetricsReporter).

createShuffleWriteProcessor is used when the ShuffleExchangeExec operator is executed (and requested to prepareShuffleDependency).
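A sketch of such a ShuffleWriteProcessor that only swaps in the SQL metrics reporter (written from the description above; assumed to be close to, but not copied from, the Spark source):

```scala
import org.apache.spark.TaskContext
import org.apache.spark.shuffle.{ShuffleWriteMetricsReporter, ShuffleWriteProcessor}
import org.apache.spark.sql.execution.metric.{SQLMetric, SQLShuffleWriteMetricsReporter}

// A sketch: the only customization is the SQL-aware shuffle write-metrics reporter
def createShuffleWriteProcessor(metrics: Map[String, SQLMetric]): ShuffleWriteProcessor =
  new ShuffleWriteProcessor {
    override protected def createMetricsReporter(
        context: TaskContext): ShuffleWriteMetricsReporter =
      new SQLShuffleWriteMetricsReporter(context.taskMetrics().shuffleWriteMetrics, metrics)
  }
```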
Demo¶
ShuffleExchangeExec and Repartition Logical Operator¶
```text
val q = spark.range(6).repartition(2)
scala> q.explain(extended = true)
== Parsed Logical Plan ==
Repartition 2, true
+- Range (0, 6, step=1, splits=Some(16))

== Analyzed Logical Plan ==
id: bigint
Repartition 2, true
+- Range (0, 6, step=1, splits=Some(16))

== Optimized Logical Plan ==
Repartition 2, true
+- Range (0, 6, step=1, splits=Some(16))

== Physical Plan ==
Exchange RoundRobinPartitioning(2), false, [id=#8]
+- *(1) Range (0, 6, step=1, splits=16)
```
ShuffleExchangeExec and RepartitionByExpression Logical Operator¶
```text
val q = spark.range(6).repartition(2, 'id % 2)
scala> q.explain(extended = true)
== Parsed Logical Plan ==
'RepartitionByExpression [('id % 2)], 2
+- Range (0, 6, step=1, splits=Some(16))

== Analyzed Logical Plan ==
id: bigint
RepartitionByExpression [(id#4L % cast(2 as bigint))], 2
+- Range (0, 6, step=1, splits=Some(16))

== Optimized Logical Plan ==
RepartitionByExpression [(id#4L % 2)], 2
+- Range (0, 6, step=1, splits=Some(16))

== Physical Plan ==
Exchange hashpartitioning((id#4L % 2), 2), false, [id=#17]
+- *(1) Range (0, 6, step=1, splits=16)
```