
RepartitionOperation Unary Logical Operators

RepartitionOperation is an extension of the UnaryNode abstraction for repartition operations.

Contract

shuffle

shuffle: Boolean

Whether the repartition operation requires a shuffle.

numPartitions

numPartitions: Int

Number of partitions to repartition to.

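Implementations

Repartition and RepartitionByExpression (described below).

Logical Optimizations

CollapseRepartition logical optimization collapses adjacent repartition operators.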

Output Attributes

output: Seq[Attribute]

output simply requests the child logical operator for the output attributes.

output is part of the QueryPlan abstraction.
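To make the contract concrete, here is a minimal sketch in plain Scala with no Spark dependency. RepartitionContractModel, RangePlan and the string-based attributes are hypothetical simplifications, not Spark's classes; the point is only that a repartition operator declares shuffle and numPartitions while passing its child's output attributes through unchanged.

```scala
// A simplified, hypothetical model of the RepartitionOperation contract.
// These are NOT Spark's classes; attributes are modeled as plain strings.
object RepartitionContractModel {

  sealed trait LogicalPlan {
    def output: Seq[String] // output attributes (names only)
  }

  // A leaf operator standing in for Range(start, end) with a single id column
  final case class RangePlan(start: Long, end: Long) extends LogicalPlan {
    val output: Seq[String] = Seq("id")
  }

  trait UnaryNode extends LogicalPlan {
    def child: LogicalPlan
  }

  // The contract: every repartition operator says whether it shuffles and
  // how many partitions it produces; the output attributes come from the child
  trait RepartitionOperation extends UnaryNode {
    def shuffle: Boolean
    def numPartitions: Int
    override def output: Seq[String] = child.output
  }

  final case class Repartition(numPartitions: Int, shuffle: Boolean, child: LogicalPlan)
    extends RepartitionOperation
}
```

For example, Repartition(4, shuffle = true, RangePlan(0, 5)).output is Seq("id"), the same as the child's output.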

Repartition Logical Operator

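Repartition is a concrete RepartitionOperation that takes the following to be created: the number of partitions, the shuffle flag, and the child logical operator.

Repartition is created for the Dataset.repartition (with the number of partitions only) and Dataset.coalesce operators.

Repartition is planned to a ShuffleExchangeExec physical operator (with RoundRobinPartitioning) when the shuffle flag is enabled, or to a CoalesceExec physical operator otherwise.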
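The choice of physical operator can be sketched as a simple match on the shuffle flag. This is a hypothetical model: RepartitionPlanningModel, ShuffleExchangeRoundRobin and CoalesceOp are illustrative stand-ins for ShuffleExchangeExec with RoundRobinPartitioning and CoalesceExec, chosen to agree with the physical plans in the demo.

```scala
// Hypothetical, simplified model of how a Repartition logical operator
// is turned into a physical operator based on its shuffle flag.
object RepartitionPlanningModel {

  final case class Repartition(numPartitions: Int, shuffle: Boolean)

  sealed trait PhysicalOp
  // Stand-in for ShuffleExchangeExec with RoundRobinPartitioning
  final case class ShuffleExchangeRoundRobin(numPartitions: Int) extends PhysicalOp
  // Stand-in for CoalesceExec
  final case class CoalesceOp(numPartitions: Int) extends PhysicalOp

  def plan(r: Repartition): PhysicalOp =
    if (r.shuffle) ShuffleExchangeRoundRobin(r.numPartitions) // repartition(n)
    else CoalesceOp(r.numPartitions)                           // coalesce(n)
}
```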

Catalyst DSL

Catalyst DSL defines the coalesce (shuffle disabled) and repartition (shuffle enabled) operators to create Repartition logical operators.

RepartitionByExpression Logical Operator

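RepartitionByExpression is a concrete RepartitionOperation that takes the following to be created: the partition expressions, the child logical operator, and the number of partitions.

RepartitionByExpression is also called the distribute operator.

RepartitionByExpression is created for Dataset.repartition (with partition expressions), Dataset.repartitionByRange, and the DISTRIBUTE BY clause in SQL queries.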

RepartitionByExpression is planned to ShuffleExchangeExec physical operator.

Catalyst DSL

Catalyst DSL defines the distribute operator to create RepartitionByExpression logical operators.
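Catalyst DSL adds operators to logical plans through an implicit class. The sketch below mimics that pattern on a tiny, hypothetical plan model (MiniDslModel, LocalRelation, and expressions as plain strings are all assumptions), so that coalesce, repartition and distribute read like methods on a plan. It is not Spark's Catalyst DSL, and its signatures are simplified.

```scala
// Hypothetical mini "DSL" modeled on how Catalyst DSL adds operators to
// logical plans via an implicit class. Names here are illustrative only.
object MiniDslModel {

  sealed trait LogicalPlan
  final case class LocalRelation(name: String) extends LogicalPlan
  final case class Repartition(numPartitions: Int, shuffle: Boolean, child: LogicalPlan)
    extends LogicalPlan
  final case class RepartitionByExpression(
      partitionExpressions: Seq[String], child: LogicalPlan, numPartitions: Int)
    extends LogicalPlan

  implicit class DslLogicalPlan(val plan: LogicalPlan) {
    // coalesce: fewer partitions without a shuffle
    def coalesce(num: Int): LogicalPlan = Repartition(num, shuffle = false, plan)
    // repartition: round-robin shuffle into num partitions
    def repartition(num: Int): LogicalPlan = Repartition(num, shuffle = true, plan)
    // distribute: shuffle by the given partition expressions
    def distribute(exprs: String*)(n: Int): LogicalPlan =
      RepartitionByExpression(exprs, plan, n)
  }
}
```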

Partitioning

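RepartitionByExpression determines its Partitioning when created, based on the partition expressions: RangePartitioning when all of them are SortOrder expressions, HashPartitioning when none of them are, and RoundRobinPartitioning when there are no partition expressions. Mixing SortOrder and other expressions is not allowed.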
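The rule for picking a Partitioning can be sketched as follows. This is a minimal, hypothetical model (PartitioningModel and its Expr, Col and SortOrder types are simplified stand-ins for Catalyst expressions) that reflects our understanding of Spark's behavior; exact details may vary between Spark versions.

```scala
// Hypothetical, simplified model of how a RepartitionByExpression could
// derive its Partitioning from its partition expressions.
object PartitioningModel {

  sealed trait Expr
  final case class Col(name: String) extends Expr
  final case class SortOrder(child: Expr, ascending: Boolean = true) extends Expr

  sealed trait Partitioning
  final case class RoundRobinPartitioning(numPartitions: Int) extends Partitioning
  final case class HashPartitioning(exprs: Seq[Expr], numPartitions: Int) extends Partitioning
  final case class RangePartitioning(ordering: Seq[SortOrder], numPartitions: Int)
    extends Partitioning

  def partitioning(exprs: Seq[Expr], numPartitions: Int): Partitioning = {
    val sortOrders = exprs.collect { case s: SortOrder => s }
    val others = exprs.filterNot(_.isInstanceOf[SortOrder])
    // Mixing sort orders with plain expressions is rejected
    require(sortOrders.isEmpty || others.isEmpty,
      "sort orders and non-sort-order expressions cannot be mixed")
    if (exprs.isEmpty) RoundRobinPartitioning(numPartitions)          // no expressions
    else if (sortOrders.nonEmpty) RangePartitioning(sortOrders, numPartitions) // e.g. repartitionByRange
    else HashPartitioning(others, numPartitions)                       // e.g. repartition('id % 2)
  }
}
```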

Maximum Number of Rows

maxRows: Option[Long]

maxRows simply requests the child logical operator for the maximum number of rows.

maxRows is part of the LogicalPlan abstraction.

shuffle

shuffle: Boolean

shuffle is always true.

shuffle is part of the RepartitionOperation abstraction.
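A minimal sketch of both properties, using a hypothetical plan model (RepartitionByExpressionModel, RangePlan and UnknownSizePlan are illustrative, not Spark's classes): repartitioning moves rows between partitions without adding or dropping any, so the child's row bound carries over unchanged, and a repartition by expression always shuffles.

```scala
// Hypothetical, simplified model of RepartitionByExpression's maxRows and shuffle.
object RepartitionByExpressionModel {

  sealed trait LogicalPlan { def maxRows: Option[Long] }

  // Stand-in for Range(start, end): at most end - start rows
  final case class RangePlan(start: Long, end: Long) extends LogicalPlan {
    def maxRows: Option[Long] = Some(math.max(0L, end - start))
  }

  // Stand-in for an operator whose row count is not known up front
  case object UnknownSizePlan extends LogicalPlan {
    def maxRows: Option[Long] = None
  }

  final case class RepartitionByExpression(
      partitionExpressions: Seq[String], child: LogicalPlan, numPartitions: Int)
    extends LogicalPlan {
    // Repartitioning never adds or drops rows, so the child's bound applies
    def maxRows: Option[Long] = child.maxRows
    // Repartitioning by expression always requires a shuffle
    def shuffle: Boolean = true
  }
}
```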

Demo

val nums = spark.range(5)

scala> nums.rdd.getNumPartitions
res1: Int = 16

Repartition Operator

val numsRepartitioned = nums.repartition(numPartitions = 4)

assert(numsRepartitioned.rdd.getNumPartitions == 4, "Number of partitions should be 4")

scala> numsRepartitioned.explain(extended = true)
== Parsed Logical Plan ==
Repartition 4, true
+- Range (0, 5, step=1, splits=Some(16))

== Analyzed Logical Plan ==
id: bigint
Repartition 4, true
+- Range (0, 5, step=1, splits=Some(16))

== Optimized Logical Plan ==
Repartition 4, true
+- Range (0, 5, step=1, splits=Some(16))

== Physical Plan ==
Exchange RoundRobinPartitioning(4), false, [id=#31]
+- *(1) Range (0, 5, step=1, splits=16)
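RoundRobinPartitioning in the physical plan above assigns rows to output partitions in turn. The sketch below is a simplified, hypothetical model (RoundRobinModel and roundRobin are illustrative names): each input partition starts at a caller-supplied offset and then cycles through the output partitions. Spark picks a pseudo-random starting partition for each input partition; this model replaces that with explicit offsets so the result is deterministic.

```scala
// Hypothetical, simplified model of round-robin repartitioning.
// Assumption: each input partition starts at a caller-supplied offset
// (Spark picks a pseudo-random start per input partition instead).
object RoundRobinModel {

  // Returns, for each output partition, the rows assigned to it
  def roundRobin[T](
      input: Seq[Seq[T]],
      numPartitions: Int,
      startOffsets: Seq[Int]): Vector[Vector[T]] = {
    require(numPartitions > 0, "numPartitions must be positive")
    require(startOffsets.size == input.size, "one start offset per input partition")
    val out = Array.fill(numPartitions)(Vector.empty[T])
    for ((rows, start) <- input.zip(startOffsets); (row, i) <- rows.zipWithIndex) {
      // Each next row of an input partition goes to the next output partition
      val target = Math.floorMod(start + i, numPartitions)
      out(target) = out(target) :+ row
    }
    out.toVector
  }
}
```

For example, a single input partition with rows 0 to 4, offset 0 and 4 output partitions gives Vector(Vector(0, 4), Vector(1), Vector(2), Vector(3)).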

Repartition Operator (Twice)

val numsRepartitionedTwice = numsRepartitioned.repartition(numPartitions = 8)
assert(numsRepartitionedTwice.rdd.getNumPartitions == 8, "Number of partitions should be 8")

scala> numsRepartitionedTwice.explain(extended = true)
== Parsed Logical Plan ==
Repartition 8, true
+- Repartition 4, true
   +- Range (0, 5, step=1, splits=Some(16))

== Analyzed Logical Plan ==
id: bigint
Repartition 8, true
+- Repartition 4, true
   +- Range (0, 5, step=1, splits=Some(16))

== Optimized Logical Plan ==
Repartition 8, true
+- Range (0, 5, step=1, splits=Some(16))

== Physical Plan ==
Exchange RoundRobinPartitioning(8), false, [id=#77]
+- *(1) Range (0, 5, step=1, splits=16)
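The optimized logical plan above no longer contains Repartition 4, because the outer repartition makes the inner one redundant. Spark's CollapseRepartition logical optimization performs this rewrite. Below is a simplified, single-pass sketch of that rule as we understand it, on a hypothetical plan model (CollapseRepartitionModel and RangePlan are illustrative); the real rule runs inside the optimizer's fixed-point batches and its details may differ between Spark versions.

```scala
// Hypothetical, simplified model of the CollapseRepartition idea.
object CollapseRepartitionModel {

  sealed trait LogicalPlan
  final case class RangePlan(start: Long, end: Long) extends LogicalPlan

  sealed trait RepartitionOperation extends LogicalPlan {
    def child: LogicalPlan
    def shuffle: Boolean
    def numPartitions: Int
  }
  final case class Repartition(numPartitions: Int, shuffle: Boolean, child: LogicalPlan)
    extends RepartitionOperation
  final case class RepartitionByExpression(
      exprs: Seq[String], child: LogicalPlan, numPartitions: Int)
    extends RepartitionOperation {
    def shuffle: Boolean = true
  }

  // Single bottom-up pass: children are collapsed before their parent
  def collapse(plan: LogicalPlan): LogicalPlan = plan match {
    case r: Repartition =>
      val newChild = collapse(r.child)
      newChild match {
        // coalesce (no shuffle) over a shuffle: if coalescing would not reduce
        // the number of partitions, the shuffle alone is enough
        case c: RepartitionOperation if !r.shuffle && c.shuffle =>
          if (r.numPartitions >= c.numPartitions) c else r.copy(child = newChild)
        // otherwise the outer operator wins and the inner one is dropped
        case c: RepartitionOperation => r.copy(child = c.child)
        case _ => r.copy(child = newChild)
      }
    case r: RepartitionByExpression =>
      collapse(r.child) match {
        case c: RepartitionOperation => r.copy(child = c.child)
        case other => r.copy(child = other)
      }
    case leaf => leaf
  }
}
```

The coalesce-over-shuffle case is kept separate because dropping the inner shuffle there would change which operator actually redistributes the data.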

Coalesce Operator

val numsCoalesced = nums.coalesce(numPartitions = 4)
assert(numsCoalesced.rdd.getNumPartitions == 4, "Number of partitions should be 4")

scala> numsCoalesced.explain(extended = true)
== Parsed Logical Plan ==
Repartition 4, false
+- Range (0, 5, step=1, splits=Some(16))

== Analyzed Logical Plan ==
id: bigint
Repartition 4, false
+- Range (0, 5, step=1, splits=Some(16))

== Optimized Logical Plan ==
Repartition 4, false
+- Range (0, 5, step=1, splits=Some(16))

== Physical Plan ==
Coalesce 4
+- *(1) Range (0, 5, step=1, splits=16)
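Coalesce reduces the number of partitions without a shuffle by merging existing ones. The sketch below is a simplified, hypothetical model (CoalesceModel is illustrative) that groups contiguous input partitions; Spark's actual coalescing also takes data locality into account, which this model ignores. Like Dataset.coalesce, it never increases the number of partitions.

```scala
// Hypothetical, simplified model of coalescing without a shuffle:
// contiguous input partitions are merged into fewer output partitions.
object CoalesceModel {

  def coalesce[T](input: Seq[Seq[T]], numPartitions: Int): Seq[Seq[T]] = {
    require(numPartitions > 0, "numPartitions must be positive")
    // Coalescing can only reduce the number of partitions
    val target = math.min(numPartitions, input.size)
    if (target == 0) Seq.empty
    else (0 until target).map { i =>
      // Output partition i takes a contiguous slice of input partitions
      val from = i * input.size / target
      val until = (i + 1) * input.size / target
      input.slice(from, until).flatten
    }
  }
}
```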

RepartitionByExpression (Partition Expressions Only)

val q = nums.repartition(partitionExprs = 'id % 2)
scala> q.explain(extended = true)
== Parsed Logical Plan ==
'RepartitionByExpression [('id % 2)], 200
+- Range (0, 5, step=1, splits=Some(16))

== Analyzed Logical Plan ==
id: bigint
RepartitionByExpression [(id#2L % cast(2 as bigint))], 200
+- Range (0, 5, step=1, splits=Some(16))

== Optimized Logical Plan ==
RepartitionByExpression [(id#2L % 2)], 200
+- Range (0, 5, step=1, splits=Some(16))

== Physical Plan ==
Exchange hashpartitioning((id#2L % 2), 200), false, [id=#86]
+- *(1) Range (0, 5, step=1, splits=16)
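Since id % 2 has only two distinct values, at most 2 of the 200 output partitions receive rows and the rest stay empty, whatever the hash function. The sketch below illustrates this with a hypothetical model (HashPartitioningModel and its functions are illustrative). It uses Long.hashCode with a non-negative modulo in place of Spark's Murmur3-based hash, so the concrete partition IDs will differ from Spark's.

```scala
// Hypothetical, simplified model of hash partitioning.
// Assumption: Long.hashCode stands in for Spark's Murmur3-based hash,
// so concrete partition IDs differ from what Spark would compute.
object HashPartitioningModel {

  // Non-negative modulo of the key's hash
  def partitionId(key: Long, numPartitions: Int): Int =
    Math.floorMod(key.hashCode, numPartitions)

  // Groups rows by the partition their key hashes to
  def hashPartition(rows: Seq[Long], key: Long => Long, numPartitions: Int): Map[Int, Seq[Long]] =
    rows.groupBy(r => partitionId(key(r), numPartitions))
}
```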

RepartitionByExpression (Number of Partitions and Partition Expressions)

val q = nums.repartition(numPartitions = 2, partitionExprs = 'id % 2)
scala> q.explain(extended = true)
== Parsed Logical Plan ==
'RepartitionByExpression [('id % 2)], 2
+- Range (0, 5, step=1, splits=Some(16))

== Analyzed Logical Plan ==
id: bigint
RepartitionByExpression [(id#2L % cast(2 as bigint))], 2
+- Range (0, 5, step=1, splits=Some(16))

== Optimized Logical Plan ==
RepartitionByExpression [(id#2L % 2)], 2
+- Range (0, 5, step=1, splits=Some(16))

== Physical Plan ==
Exchange hashpartitioning((id#2L % 2), 2), false, [id=#95]
+- *(1) Range (0, 5, step=1, splits=16)
+- *(1) Range (0, 5, step=1, splits=16)