RepartitionOperation Unary Logical Operators
RepartitionOperation is an extension of the UnaryNode abstraction for repartition operations.
Contract
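shuffle
shuffle: Boolean
Whether the repartition operation redistributes rows using a shuffle.
numPartitions
numPartitions: Int
The number of partitions after repartitioning.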
Implementations
- Repartition
- RepartitionByExpression
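The contract can be illustrated with a minimal, self-contained Scala sketch of the two implementations, Repartition and RepartitionByExpression. This is not Spark's code: LogicalPlan, Leaf and the string-based partition expressions are hypothetical stand-ins. Only the shape follows this page: a unary node exposing shuffle and numPartitions, output taken from the child, a positive number of partitions, and RepartitionByExpression always shuffling.

```scala
// Hypothetical, simplified model of the RepartitionOperation contract.
// These are NOT Spark's classes: plans are reduced to output column names.
object RepartitionContractSketch {

  sealed trait LogicalPlan {
    def output: Seq[String]
  }

  // Stand-in for any child plan (e.g. a Range over column "id").
  final case class Leaf(name: String, output: Seq[String]) extends LogicalPlan

  // The contract: a unary operator that exposes shuffle and numPartitions.
  sealed trait RepartitionOperation extends LogicalPlan {
    def child: LogicalPlan
    def shuffle: Boolean
    def numPartitions: Int
    // Repartitioning moves rows between partitions; the schema comes from the child.
    final def output: Seq[String] = child.output
  }

  // The shuffle flag is a constructor argument (false for coalesce, true for repartition).
  final case class Repartition(numPartitions: Int, shuffle: Boolean, child: LogicalPlan)
      extends RepartitionOperation {
    require(numPartitions > 0, s"Number of partitions ($numPartitions) must be positive.")
  }

  // Partition expressions are plain strings here (an assumption for brevity).
  final case class RepartitionByExpression(
      partitionExpressions: Seq[String],
      child: LogicalPlan,
      numPartitions: Int) extends RepartitionOperation {
    require(numPartitions > 0, s"Number of partitions ($numPartitions) must be positive.")
    // Repartitioning by expressions always requires a shuffle.
    def shuffle: Boolean = true
  }
}
```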
Logical Optimizations
- CollapseRepartition logical optimization collapses adjacent repartition operations
- FoldablePropagation and PushDownPredicate logical optimizations can "push through" repartition operations (i.e. propagate or move past them to their child)
- PropagateEmptyRelation logical optimization may replace a repartition operation over an empty child with an empty LocalRelation
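The kind of rewrite CollapseRepartition performs can be sketched on a hypothetical, simplified plan ADT (not Spark's Rule[LogicalPlan] API). The case split below, where a non-shuffling coalesce over a shuffling child is kept or dropped depending on the partition counts and every other combination simply removes the child repartition, reflects our reading of the Spark 3.0 rule and should be treated as an assumption.

```scala
// Toy model of collapsing adjacent repartition operators.
// Plan, Relation and the string expressions are hypothetical stand-ins.
object CollapseRepartitionSketch {

  sealed trait Plan
  final case class Relation(name: String) extends Plan
  final case class Repartition(numPartitions: Int, shuffle: Boolean, child: Plan) extends Plan
  final case class RepartitionByExpression(exprs: Seq[String], child: Plan, numPartitions: Int)
      extends Plan

  // Extracts (shuffle, numPartitions, child) from either repartition operator.
  private object RepartitionOp {
    def unapply(p: Plan): Option[(Boolean, Int, Plan)] = p match {
      case Repartition(n, s, c)             => Some((s, n, c))
      case RepartitionByExpression(_, c, n) => Some((true, n, c))
      case _                                => None
    }
  }

  // Rewrites bottom-up, so a chain of repartitions collapses in one pass.
  def collapse(plan: Plan): Plan = plan match {
    case Relation(_) => plan
    case r @ Repartition(n, shuffle, child) =>
      collapse(child) match {
        // A coalesce (no shuffle) over a shuffling child cannot add partitions:
        // drop the coalesce if it would not reduce the count, otherwise keep both.
        case op @ RepartitionOp(true, childN, _) if !shuffle =>
          if (n >= childN) op else r.copy(child = op)
        // Otherwise the top operator alone decides the final partitioning.
        case RepartitionOp(_, _, grandChild) => r.copy(child = grandChild)
        case other                           => r.copy(child = other)
      }
    case r @ RepartitionByExpression(_, child, _) =>
      collapse(child) match {
        // A repartition by expressions always shuffles, so the child repartition is redundant.
        case RepartitionOp(_, _, grandChild) => r.copy(child = grandChild)
        case other                           => r.copy(child = other)
      }
  }
}
```

Given Repartition(8, true, Repartition(4, true, Relation("Range"))), collapse returns Repartition(8, true, Relation("Range")), which matches the optimized logical plan in the Repartition Operator (Twice) demo.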
Output Attributes
output: Seq[Attribute]
output simply requests the child logical operator for the output attributes.
output is part of the QueryPlan abstraction.
Repartition Logical Operator
Repartition is a concrete RepartitionOperation that takes the following to be created:
- Number of Partitions (must be positive)
- shuffle flag
- Child Logical Plan
Repartition is created for the following:
- Dataset.coalesce and Dataset.repartition (with the number of partitions only) operators (with shuffle disabled and enabled, respectively)
- COALESCE and REPARTITION hints (via ResolveCoalesceHints logical analysis rule, with shuffle disabled and enabled, respectively)
Repartition is planned to a ShuffleExchangeExec (shuffle enabled) or a CoalesceExec (shuffle disabled) physical operator.
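The planning decision can be sketched as a simple function of the shuffle flag. ShuffleExchange and Coalesce below are hypothetical stand-ins for ShuffleExchangeExec and CoalesceExec, and the partitioning is rendered as a string. The RoundRobinPartitioning label matches the physical plans printed in the demo.

```scala
// Hypothetical sketch of planning a Repartition; not Spark's planner code.
object RepartitionPlanningSketch {
  sealed trait PhysicalOp
  // Stand-in for ShuffleExchangeExec (partitioning shown as a string).
  final case class ShuffleExchange(partitioning: String) extends PhysicalOp
  // Stand-in for CoalesceExec.
  final case class Coalesce(numPartitions: Int) extends PhysicalOp

  def planRepartition(numPartitions: Int, shuffle: Boolean): PhysicalOp =
    if (shuffle) ShuffleExchange(s"RoundRobinPartitioning($numPartitions)") // full shuffle
    else Coalesce(numPartitions) // merges existing partitions without a shuffle
}
```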
Catalyst DSL
Catalyst DSL defines the following operators to create Repartition logical operators:
- coalesce (with shuffle disabled)
- repartition (with shuffle enabled)
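How the two DSL operators map to Repartition can be sketched with an implicit class over a hypothetical plan ADT. PlanDsl, Relation and the Int-based signatures are illustrative assumptions rather than Catalyst's actual DSL definitions; only the operator names and their shuffle settings come from this page.

```scala
// Hypothetical DSL-style extension methods; not Catalyst's actual DSL.
object RepartitionDslSketch {
  sealed trait Plan
  final case class Relation(name: String) extends Plan
  final case class Repartition(numPartitions: Int, shuffle: Boolean, child: Plan) extends Plan

  implicit class PlanDsl(plan: Plan) {
    // coalesce: Repartition with shuffle disabled
    def coalesce(num: Int): Plan = Repartition(num, shuffle = false, plan)
    // repartition: Repartition with shuffle enabled
    def repartition(num: Int): Plan = Repartition(num, shuffle = true, plan)
  }
}
```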
RepartitionByExpression Logical Operator
RepartitionByExpression is a concrete RepartitionOperation that takes the following to be created:
- Partition Expressions
- Child Logical Plan
- Number of Partitions (must be positive)
RepartitionByExpression is also called the distribute operator.
RepartitionByExpression is created for the following:
- Dataset.repartition (with partition expressions) and Dataset.repartitionByRange operators
- REPARTITION and REPARTITION_BY_RANGE hints (via ResolveCoalesceHints logical analysis rule)
- DISTRIBUTE BY and CLUSTER BY SQL clauses (via SparkSqlAstBuilder)
RepartitionByExpression is planned to a ShuffleExchangeExec physical operator.
Catalyst DSL
Catalyst DSL defines the distribute operator to create RepartitionByExpression logical operators.
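A distribute operator can be sketched as an extension method that wraps a plan in RepartitionByExpression. The curried signature (partition expressions first, then the number of partitions), the DistributeOps implicit class and the string expressions are assumptions for illustration, not Catalyst's actual DSL.

```scala
// Hypothetical distribute-style DSL operator; not Catalyst's actual DSL.
object DistributeDslSketch {
  sealed trait Plan
  final case class Relation(name: String) extends Plan
  final case class RepartitionByExpression(exprs: Seq[String], child: Plan, numPartitions: Int)
      extends Plan

  implicit class DistributeOps(plan: Plan) {
    // Partition expressions, then the number of partitions (curried, by assumption).
    def distribute(exprs: String*)(n: Int): Plan = RepartitionByExpression(exprs, plan, n)
  }
}
```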
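Partitioning
RepartitionByExpression determines its Partitioning when created, based on the partition expressions: RangePartitioning when all of them are SortOrder expressions, HashPartitioning when none of them are, and RoundRobinPartitioning when there are no partition expressions.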
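The choice of Partitioning can be sketched as follows, using hypothetical simplified expression and partitioning types rather than Spark's classes. The sketch also rejects a mix of SortOrder and non-SortOrder expressions; that check mirrors our understanding of Spark 3.0's RepartitionByExpression and should be treated as an assumption.

```scala
// Hypothetical sketch of deriving a Partitioning from partition expressions.
object PartitioningSketch {
  sealed trait Expr
  final case class Column(name: String) extends Expr
  final case class SortOrder(child: Expr, ascending: Boolean) extends Expr

  sealed trait Partitioning
  final case class HashPartitioning(exprs: Seq[Expr], numPartitions: Int) extends Partitioning
  final case class RangePartitioning(ordering: Seq[SortOrder], numPartitions: Int) extends Partitioning
  final case class RoundRobinPartitioning(numPartitions: Int) extends Partitioning

  def partitioning(exprs: Seq[Expr], numPartitions: Int): Partitioning = {
    val sortOrders = exprs.collect { case s: SortOrder => s }
    val others = exprs.filterNot(_.isInstanceOf[SortOrder])
    // Either all expressions are SortOrder (range) or none are (hash).
    require(sortOrders.isEmpty || others.isEmpty,
      "Cannot mix SortOrder and non-SortOrder partition expressions")
    if (sortOrders.nonEmpty) RangePartitioning(sortOrders, numPartitions)
    else if (others.nonEmpty) HashPartitioning(others, numPartitions)
    else RoundRobinPartitioning(numPartitions) // no partition expressions at all
  }
}
```

Under this sketch, a partition expression such as id % 2 (not a SortOrder) yields HashPartitioning, consistent with the hashpartitioning exchange in the demo, while the SortOrder expressions produced by Dataset.repartitionByRange would yield RangePartitioning.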
Maximum Number of Rows
maxRows: Option[Long]
maxRows simply requests the child logical operator for the maximum number of rows.
maxRows is part of the LogicalPlan abstraction.
shuffle
shuffle: Boolean
shuffle is always true for RepartitionByExpression.
shuffle is part of the RepartitionOperation abstraction.
Demo
val nums = spark.range(5)
scala> nums.rdd.getNumPartitions
res1: Int = 16
Repartition Operator
val numsRepartitioned = nums.repartition(numPartitions = 4)
assert(numsRepartitioned.rdd.getNumPartitions == 4, "Number of partitions should be 4")
scala> numsRepartitioned.explain(extended = true)
== Parsed Logical Plan ==
Repartition 4, true
+- Range (0, 5, step=1, splits=Some(16))
== Analyzed Logical Plan ==
id: bigint
Repartition 4, true
+- Range (0, 5, step=1, splits=Some(16))
== Optimized Logical Plan ==
Repartition 4, true
+- Range (0, 5, step=1, splits=Some(16))
== Physical Plan ==
Exchange RoundRobinPartitioning(4), false, [id=#31]
+- *(1) Range (0, 5, step=1, splits=16)
Repartition Operator (Twice)
val numsRepartitionedTwice = numsRepartitioned.repartition(numPartitions = 8)
assert(numsRepartitionedTwice.rdd.getNumPartitions == 8, "Number of partitions should be 8")
scala> numsRepartitionedTwice.explain(extended = true)
== Parsed Logical Plan ==
Repartition 8, true
+- Repartition 4, true
   +- Range (0, 5, step=1, splits=Some(16))
== Analyzed Logical Plan ==
id: bigint
Repartition 8, true
+- Repartition 4, true
   +- Range (0, 5, step=1, splits=Some(16))
== Optimized Logical Plan ==
Repartition 8, true
+- Range (0, 5, step=1, splits=Some(16))
== Physical Plan ==
Exchange RoundRobinPartitioning(8), false, [id=#77]
+- *(1) Range (0, 5, step=1, splits=16)
Coalesce Operator
val numsCoalesced = nums.coalesce(numPartitions = 4)
assert(numsCoalesced.rdd.getNumPartitions == 4, "Number of partitions should be 4")
scala> numsCoalesced.explain(extended = true)
== Parsed Logical Plan ==
Repartition 4, false
+- Range (0, 5, step=1, splits=Some(16))
== Analyzed Logical Plan ==
id: bigint
Repartition 4, false
+- Range (0, 5, step=1, splits=Some(16))
== Optimized Logical Plan ==
Repartition 4, false
+- Range (0, 5, step=1, splits=Some(16))
== Physical Plan ==
Coalesce 4
+- *(1) Range (0, 5, step=1, splits=16)
RepartitionByExpression (Partition Expressions Only)
val q = nums.repartition(partitionExprs = 'id % 2)
scala> q.explain(extended = true)
== Parsed Logical Plan ==
'RepartitionByExpression [('id % 2)], 200
+- Range (0, 5, step=1, splits=Some(16))
== Analyzed Logical Plan ==
id: bigint
RepartitionByExpression [(id#2L % cast(2 as bigint))], 200
+- Range (0, 5, step=1, splits=Some(16))
== Optimized Logical Plan ==
RepartitionByExpression [(id#2L % 2)], 200
+- Range (0, 5, step=1, splits=Some(16))
== Physical Plan ==
Exchange hashpartitioning((id#2L % 2), 200), false, [id=#86]
+- *(1) Range (0, 5, step=1, splits=16)
RepartitionByExpression (Number of Partitions and Partition Expressions)
val q = nums.repartition(numPartitions = 2, partitionExprs = 'id % 2)
scala> q.explain(extended = true)
== Parsed Logical Plan ==
'RepartitionByExpression [('id % 2)], 2
+- Range (0, 5, step=1, splits=Some(16))
== Analyzed Logical Plan ==
id: bigint
RepartitionByExpression [(id#2L % cast(2 as bigint))], 2
+- Range (0, 5, step=1, splits=Some(16))
== Optimized Logical Plan ==
RepartitionByExpression [(id#2L % 2)], 2
+- Range (0, 5, step=1, splits=Some(16))
== Physical Plan ==
Exchange hashpartitioning((id#2L % 2), 2), false, [id=#95]
+- *(1) Range (0, 5, step=1, splits=16)