HashAggregateExec Aggregate Physical Operator

HashAggregateExec is a unary physical operator for hash-based aggregation.

HashAggregateExec in web UI (Details for Query)

HashAggregateExec is a BlockingOperatorWithCodegen.

HashAggregateExec is an AliasAwareOutputPartitioning.

Note

HashAggregateExec is the preferred aggregate physical operator for Aggregation execution planning strategy (over ObjectHashAggregateExec and SortAggregateExec).

HashAggregateExec supports Java code generation (aka codegen).
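
When whole-stage codegen is enabled (the default), the operator shows up with a * (or *(n)) prefix in the physical plan. A quick way to compare the two modes in spark-shell (the queries are illustrative only; spark.sql.codegen.wholeStage is the configuration property that toggles whole-stage codegen):

// Whole-stage codegen enabled (default): HashAggregate is prefixed with * in the plan
spark.range(4).groupBy("id").count.explain

// Disable whole-stage codegen to compare the plans (demo only)
spark.conf.set("spark.sql.codegen.wholeStage", false)
spark.range(4).groupBy("id").count.explain
spark.conf.set("spark.sql.codegen.wholeStage", true)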

HashAggregateExec uses TungstenAggregationIterator (to iterate over UnsafeRows in partitions) when executed.

Note

HashAggregateExec uses TungstenAggregationIterator that can (theoretically) switch to a sort-based aggregation when the hash-based approach is unable to acquire enough memory.

See testFallbackStartsAt internal property and spark.sql.TungstenAggregate.testFallbackStartsAt configuration property.

Search logs for the following INFO message to know whether the switch has happened.

falling back to sort based aggregation.
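
As a rough way to provoke the fallback in a demo (spark.sql.TungstenAggregate.testFallbackStartsAt is an internal, testing-only property, so treat this as a sketch under that assumption; the value is a pair of row-count thresholds):

// Make the hash-based path give up after a couple of rows (testing-only property)
spark.conf.set("spark.sql.TungstenAggregate.testFallbackStartsAt", "2,3")
spark.range(10).groupBy($"id" % 2 as "gid").count.show
// Look for "falling back to sort based aggregation." at INFO level in the logs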

Selection Requirements

supportsAggregate(
  aggregateBufferAttributes: Seq[Attribute]): Boolean

supportsAggregate checks whether hash-based aggregation is supported for the given aggregation buffer Attributes, i.e. whether all of their data types are mutable (fixed-width).

supportsAggregate is used when:

  • Aggregation execution planning strategy is executed (when AggUtils.createAggregate chooses between HashAggregateExec and the other aggregate physical operators)
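
The check boils down to whether every aggregation buffer field can be stored in an UnsafeFixedWidthAggregationMap. A minimal sketch, assuming supportsAggregate delegates to UnsafeFixedWidthAggregationMap.supportsAggregationBufferSchema:

import org.apache.spark.sql.execution.UnsafeFixedWidthAggregationMap
import org.apache.spark.sql.types._

// assumption: supportsAggregate delegates to this schema check
// Fixed-width (mutable) buffer fields are fine for hash-based aggregation...
UnsafeFixedWidthAggregationMap.supportsAggregationBufferSchema(
  StructType(StructField("sum", LongType) :: Nil))   // true

// ...while variable-length buffer fields (e.g. StringType) are not
UnsafeFixedWidthAggregationMap.supportsAggregationBufferSchema(
  StructType(StructField("buf", StringType) :: Nil)) // false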

Creating Instance

HashAggregateExec takes the following to be created:

HashAggregateExec is created (indirectly through AggUtils.createAggregate) when:

  • Aggregation execution planning strategy is executed (to select the aggregate physical operator for an Aggregate logical operator)

  • StatefulAggregationStrategy (Structured Streaming) execution planning strategy creates a physical plan for streaming EventTimeWatermark or Aggregate logical operators

Demo

val q = spark.range(10).
  groupBy('id % 2 as "group").
  agg(sum("id") as "sum")

// HashAggregateExec selected due to:
// 1. sum uses mutable types for aggregate expression
// 2. just a single id column reference of LongType data type
scala> q.explain
== Physical Plan ==
*HashAggregate(keys=[(id#0L % 2)#12L], functions=[sum(id#0L)])
+- Exchange hashpartitioning((id#0L % 2)#12L, 200)
   +- *HashAggregate(keys=[(id#0L % 2) AS (id#0L % 2)#12L], functions=[partial_sum(id#0L)])
      +- *Range (0, 10, step=1, splits=8)

val execPlan = q.queryExecution.sparkPlan
scala> println(execPlan.numberedTreeString)
00 HashAggregate(keys=[(id#0L % 2)#15L], functions=[sum(id#0L)], output=[group#3L, sum#7L])
01 +- HashAggregate(keys=[(id#0L % 2) AS (id#0L % 2)#15L], functions=[partial_sum(id#0L)], output=[(id#0L % 2)#15L, sum#17L])
02    +- Range (0, 10, step=1, splits=8)

Going low level...watch your steps :)

import q.queryExecution.optimizedPlan
import org.apache.spark.sql.catalyst.plans.logical.Aggregate
val aggLog = optimizedPlan.asInstanceOf[Aggregate]
import org.apache.spark.sql.catalyst.planning.PhysicalAggregation
import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
val (_, aggregateExpressions: Seq[AggregateExpression], _, _) = PhysicalAggregation.unapply(aggLog).get
val aggregateBufferAttributes =
  aggregateExpressions.flatMap(_.aggregateFunction.aggBufferAttributes)
import org.apache.spark.sql.execution.aggregate.HashAggregateExec
// that's the exact reason why HashAggregateExec was selected
// Aggregation execution planning strategy prefers HashAggregateExec
scala> val useHash = HashAggregateExec.supportsAggregate(aggregateBufferAttributes)
useHash: Boolean = true

val hashAggExec = execPlan.asInstanceOf[HashAggregateExec]
scala> println(execPlan.numberedTreeString)
00 HashAggregate(keys=[(id#0L % 2)#15L], functions=[sum(id#0L)], output=[group#3L, sum#7L])
01 +- HashAggregate(keys=[(id#0L % 2) AS (id#0L % 2)#15L], functions=[partial_sum(id#0L)], output=[(id#0L % 2)#15L, sum#17L])
02    +- Range (0, 10, step=1, splits=8)

val hashAggExecRDD = hashAggExec.execute // <-- calls doExecute
scala> println(hashAggExecRDD.toDebugString)
(8) MapPartitionsRDD[3] at execute at <console>:30 []
 |  MapPartitionsRDD[2] at execute at <console>:30 []
 |  MapPartitionsRDD[1] at execute at <console>:30 []
 |  ParallelCollectionRDD[0] at execute at <console>:30 []

Performance Metrics

aggTime

Name (in web UI): time in aggregation build

avgHashProbe

Average hash map probe per lookup (i.e. numProbes / numKeyLookups)

Name (in web UI): avg hash probe bucket list iters

numProbes and numKeyLookups are used in the BytesToBytesMap append-only hash map for the number of iterations to look up a single key and the total number of lookups, respectively.

numOutputRows

Name (in web UI): number of output rows

Number of groups (per partition), which depends on the number of partitions and on which side of a ShuffleExchangeExec operator the HashAggregateExec operator is:

  • 0 for no input with a grouping expression, e.g. spark.range(0).groupBy($"id").count.show

  • 1 for no grouping expression and no input, e.g. spark.range(0).groupBy().count.show

Tip

Use different numbers of elements and partitions in the range operator to observe the difference in the numOutputRows metric, e.g.

spark.
  range(0, 10, 1, numPartitions = 1).
  groupBy($"id" % 5 as "gid").
  count.
  show

spark.
  range(0, 10, 1, numPartitions = 5).
  groupBy($"id" % 5 as "gid").
  count.
  show

peakMemory

Name (in web UI): peak memory

spillSize

Name (in web UI): spill size
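
To inspect these metrics programmatically (a sketch; it assumes the executed plan still contains HashAggregateExec operators directly, i.e. adaptive query execution has not wrapped the plan), collect them from the executed plan after the query has run:

import org.apache.spark.sql.execution.aggregate.HashAggregateExec

val q = spark.range(10).groupBy($"id" % 5 as "gid").count
q.collect  // run the query so the metrics get populated

// assumes no AdaptiveSparkPlanExec wrapping the HashAggregateExec operators
q.queryExecution.executedPlan.collect { case agg: HashAggregateExec =>
  agg.metrics.mapValues(m => s"${m.name.getOrElse("")} = ${m.value}")
}.foreach(println)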

Required Child Distribution

requiredChildDistribution: List[Distribution]

requiredChildDistribution is part of the SparkPlan abstraction.

requiredChildDistribution varies per the input required child distribution expressions:
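
In essence, the mapping is the following (a paraphrase of the operator's requiredChildDistribution, shown as a sketch, not verbatim source):

import org.apache.spark.sql.catalyst.plans.physical._

// requiredChildDistributionExpressions (the operator's constructor argument)
// => required child distribution
requiredChildDistributionExpressions match {
  case Some(exprs) if exprs.isEmpty => AllTuples :: Nil                    // no grouping keys
  case Some(exprs)                  => ClusteredDistribution(exprs) :: Nil // final / partial-merge aggregation
  case None                         => UnspecifiedDistribution :: Nil      // partial aggregation
}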

Note

requiredChildDistributionExpressions is exactly requiredChildDistributionExpressions from AggUtils.createAggregate and is undefined by default.


(No distinct in aggregation) requiredChildDistributionExpressions is undefined when HashAggregateExec is created for partial aggregations (i.e. mode is Partial for aggregate expressions).

requiredChildDistributionExpressions is defined, but could possibly be empty, when HashAggregateExec is created for final aggregations (i.e. mode is Final for aggregate expressions).


(One distinct in aggregation) requiredChildDistributionExpressions is undefined when HashAggregateExec is created for partial aggregations (i.e. mode is Partial for aggregate expressions) with one distinct in aggregation.

requiredChildDistributionExpressions is defined, but could possibly be empty, when HashAggregateExec is created for partial merge aggregations (i.e. mode is PartialMerge for aggregate expressions).

FIXME for the following two cases in aggregation with one distinct.

Note

The prefix for variable names for HashAggregateExec operators in CodegenSupport-generated code is agg.

Executing Physical Operator

doExecute(): RDD[InternalRow]

doExecute is part of the SparkPlan abstraction.

doExecute requests the child physical operator to execute (that triggers physical query planning and generates an RDD[InternalRow]) and transforms it by executing the following function on internal rows per partition with index (using RDD.mapPartitionsWithIndex that creates another RDD):

  1. Records the start execution time (beforeAgg)

  2. Requests the Iterator[InternalRow] (from executing the child physical operator) for the next element

    • If there is no input (an empty partition), but there are grouping expressions used, doExecute simply returns an empty iterator

    • Otherwise, doExecute creates a TungstenAggregationIterator and branches off per whether there are rows to process and whether there are grouping expressions

For empty partitions and no grouping expressions, doExecute increments the numOutputRows metric and requests the TungstenAggregationIterator for the output for an empty grouping-key group (a single UnsafeRow) as the only element of the result iterator.

For non-empty partitions or when there are grouping expressions used, doExecute returns the TungstenAggregationIterator.

In the end, doExecute calculates the aggTime metric and returns the resulting Iterator[UnsafeRow].

Note

The numOutputRows, peakMemory, spillSize and avgHashProbe metrics are used exclusively to create the TungstenAggregationIterator.

Note

doExecute (by RDD.mapPartitionsWithIndex transformation) adds a new MapPartitionsRDD to the RDD lineage. Use RDD.toDebugString to see the additional MapPartitionsRDD.

val ids = spark.range(1)
scala> println(ids.queryExecution.toRdd.toDebugString)
(8) MapPartitionsRDD[12] at toRdd at <console>:29 []
 |  MapPartitionsRDD[11] at toRdd at <console>:29 []
 |  ParallelCollectionRDD[10] at toRdd at <console>:29 []

// Use groupBy that gives HashAggregateExec operator
val q = ids.groupBy('id).count
scala> q.explain
== Physical Plan ==
*(2) HashAggregate(keys=[id#30L], functions=[count(1)])
+- Exchange hashpartitioning(id#30L, 200)
  +- *(1) HashAggregate(keys=[id#30L], functions=[partial_count(1)])
      +- *(1) Range (0, 1, step=1, splits=8)

val rdd = q.queryExecution.toRdd
scala> println(rdd.toDebugString)
(200) MapPartitionsRDD[18] at toRdd at <console>:28 []
  |   ShuffledRowRDD[17] at toRdd at <console>:28 []
  +-(8) MapPartitionsRDD[16] at toRdd at <console>:28 []
    |  MapPartitionsRDD[15] at toRdd at <console>:28 []
    |  MapPartitionsRDD[14] at toRdd at <console>:28 []
    |  ParallelCollectionRDD[13] at toRdd at <console>:28 []

Generating Java Code for Consume Path

doConsume(
  ctx: CodegenContext,
  input: Seq[ExprCode],
  row: ExprCode): String

doConsume is part of the CodegenSupport abstraction.

doConsume executes doConsumeWithoutKeys when no named expressions for the grouping keys were specified for the HashAggregateExec or doConsumeWithKeys otherwise.
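
To see which of the two code paths a given query takes, dump the generated code with Spark's debugging facilities (import org.apache.spark.sql.execution.debug._ adds debugCodegen to a Dataset; the queries are illustrative only):

import org.apache.spark.sql.execution.debug._

// No grouping keys => the *WithoutKeys code paths
spark.range(10).agg(sum("id")).debugCodegen()

// Grouping keys => the *WithKeys code paths
spark.range(10).groupBy($"id" % 2).agg(sum("id")).debugCodegen()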

doConsumeWithKeys

doConsumeWithKeys(
  ctx: CodegenContext,
  input: Seq[ExprCode]): String

doConsumeWithKeys...FIXME

doConsumeWithoutKeys

doConsumeWithoutKeys(
  ctx: CodegenContext,
  input: Seq[ExprCode]): String

doConsumeWithoutKeys...FIXME

Generating Java Code for Produce Path

doProduce(
  ctx: CodegenContext): String

doProduce is part of the CodegenSupport abstraction.

doProduce executes doProduceWithoutKeys when no named expressions for the grouping keys were specified for the HashAggregateExec or doProduceWithKeys otherwise.

doProduceWithKeys

doProduceWithKeys(
  ctx: CodegenContext): String

doProduceWithKeys...FIXME

doProduceWithoutKeys

doProduceWithoutKeys(
  ctx: CodegenContext): String

doProduceWithoutKeys...FIXME

generateResultFunction

generateResultFunction(
  ctx: CodegenContext): String

generateResultFunction...FIXME

finishAggregate

finishAggregate(
  hashMap: UnsafeFixedWidthAggregationMap,
  sorter: UnsafeKVExternalSorter,
  peakMemory: SQLMetric,
  spillSize: SQLMetric,
  avgHashProbe: SQLMetric): KVIterator[UnsafeRow, UnsafeRow]

finishAggregate...FIXME

createHashMap

createHashMap(): UnsafeFixedWidthAggregationMap

createHashMap creates an UnsafeFixedWidthAggregationMap (with an initial aggregation buffer built from the initial values of the declFunctions, the bufferSchema, the groupingKeySchema, the current TaskMemoryManager, 1024 * 16 initial capacity and the page size of the TaskMemoryManager).

Internal Properties

aggregateBufferAttributes

All the aggBufferAttributes of the AggregateFunctions of the aggregate expressions

testFallbackStartsAt

Optional pair of numbers for controlled fall-back to a sort-based aggregation when the hash-based approach is unable to acquire enough memory

declFunctions

DeclarativeAggregate expressions (from the AggregateFunctions of the aggregate expressions)

bufferSchema

StructType built from the aggregateBufferAttributes

groupingKeySchema

StructType built from the groupingAttributes

groupingAttributes

Attributes of the grouping expressions
