
TungstenAggregationIterator

TungstenAggregationIterator is an AggregationIterator for HashAggregateExec physical operator.

TungstenAggregationIterator prefers hash-based aggregation (before switching to sort-based aggregation).
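The hash-first strategy with a sort-based fallback can be sketched as follows. This is a simplified model, not Spark's actual implementation: the capacity limit (`maxKeys`), key and value types, and the in-memory "spill" list are all hypothetical stand-ins for the real memory-pressure-driven switch.

```scala
// Simplified model of the hash-first, sort-based-fallback strategy:
// aggregate into a hash map until a (hypothetical) capacity limit is
// hit, then route the remaining input to a sort-based path and merge.
object HashThenSortSketch {
  def aggregate(input: Iterator[(String, Long)], maxKeys: Int): Seq[(String, Long)] = {
    val hashMap = scala.collection.mutable.HashMap.empty[String, Long]
    var spilled = List.empty[(String, Long)]
    while (input.hasNext) {
      val (k, v) = input.next()
      if (hashMap.contains(k) || hashMap.size < maxKeys) {
        hashMap(k) = hashMap.getOrElse(k, 0L) + v // hash-based path
      } else {
        spilled = (k, v) :: spilled               // sort-based fallback path
      }
    }
    // Merge the fallback records with the hash-map contents
    (hashMap.toList ++ spilled)
      .groupBy(_._1)
      .map { case (k, vs) => (k, vs.map(_._2).sum) }
      .toSeq
      .sortBy(_._1)
  }
}
```

In the real iterator the switch is triggered by memory pressure (the aggregation map failing to allocate a buffer), not by a fixed key count.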

Creating Instance

TungstenAggregationIterator takes the following to be created:

TungstenAggregationIterator is created when:

  • HashAggregateExec physical operator is requested to doExecute

Note

The SQL metrics (numOutputRows, peakMemory, spillSize and avgHashProbe) belong to the HashAggregateExec physical operator that created the TungstenAggregationIterator.

TungstenAggregationIterator starts processing input rows and pre-loads the first key-value pair from the UnsafeFixedWidthAggregationMap if it did not switch to sort-based aggregation.
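The "pre-load the first key-value pair" pattern can be sketched as a generic iterator wrapper. The class and its names are illustrative, not Spark's: the first entry is fetched eagerly at construction time, so `hasNext` is a cheap check against the pre-loaded slot.

```scala
// Minimal sketch of an iterator that pre-loads its first element
// (illustrative; not Spark's TungstenAggregationIterator itself).
class PreloadingIterator[K, V](underlying: Iterator[(K, V)]) extends Iterator[(K, V)] {
  // Eagerly fetch the first pair (None when the input is empty)
  private var preloaded: Option[(K, V)] =
    if (underlying.hasNext) Some(underlying.next()) else None

  override def hasNext: Boolean = preloaded.isDefined

  override def next(): (K, V) = {
    val current = preloaded.getOrElse(throw new NoSuchElementException("empty iterator"))
    // Re-fill the pre-loaded slot for the next call
    preloaded = if (underlying.hasNext) Some(underlying.next()) else None
    current
  }
}
```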

Performance Metrics

When created, TungstenAggregationIterator gets SQLMetrics from the HashAggregateExec aggregate physical operator being executed.

The metrics are displayed as part of HashAggregateExec aggregate physical operator (e.g. in web UI in Details for Query).

Figure: HashAggregateExec in web UI (Details for Query)

UnsafeFixedWidthAggregationMap

TungstenAggregationIterator uses an UnsafeFixedWidthAggregationMap as the hash map for hash-based aggregation (until it switches to sort-based aggregation).

TaskCompletionListener

TungstenAggregationIterator registers a TaskCompletionListener that is executed on task completion (for every task that processes a partition).

When executed (once per partition), the TaskCompletionListener updates the peakMemory, spillSize and avgHashProbe metrics.
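The listener pattern can be sketched as follows. This is a simplified stand-in, assuming hypothetical `Metric` and callback-registration helpers rather than Spark's SQLMetric and TaskContext API: the iterator registers its callback up front, and the callback publishes the final per-partition values when the task completes.

```scala
// Sketch of the TaskCompletionListener pattern (simplified; the real
// listener is registered through Spark's TaskContext).
object TaskCompletionSketch {
  // Hypothetical stand-in for a SQL metric
  final class Metric { var value: Long = 0L; def set(v: Long): Unit = value = v }

  def run(): (Long, Long) = {
    val peakMemory = new Metric
    val spillSize  = new Metric
    var listeners = List.empty[() => Unit]
    def onTaskCompletion(f: () => Unit): Unit = listeners = f :: listeners

    // The iterator registers its listener up front ...
    var currentPeak = 0L
    var currentSpill = 0L
    onTaskCompletion { () =>
      peakMemory.set(currentPeak)
      spillSize.set(currentSpill)
    }

    // ... processes the partition (values here are made up) ...
    currentPeak = 4096L
    currentSpill = 1024L

    // ... and the listeners fire once when the task completes.
    listeners.foreach(_())
    (peakMemory.value, spillSize.value)
  }
}
```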

Demo

val q = spark.range(10).
  groupBy('id % 2 as "group").
  agg(sum("id") as "sum")
val execPlan = q.queryExecution.sparkPlan
scala> println(execPlan.numberedTreeString)
00 HashAggregate(keys=[(id#0L % 2)#11L], functions=[sum(id#0L)], output=[group#3L, sum#7L])
01 +- HashAggregate(keys=[(id#0L % 2) AS (id#0L % 2)#11L], functions=[partial_sum(id#0L)], output=[(id#0L % 2)#11L, sum#13L])
02    +- Range (0, 10, step=1, splits=8)

import org.apache.spark.sql.execution.aggregate.HashAggregateExec
val hashAggExec = execPlan.asInstanceOf[HashAggregateExec]
val hashAggExecRDD = hashAggExec.execute

// MapPartitionsRDD is in private[spark] scope
// Use :paste -raw for the following helper object
package org.apache.spark
object AccessPrivateSpark {
  import org.apache.spark.rdd.RDD
  def mapPartitionsRDD[T](hashAggExecRDD: RDD[T]) = {
    import org.apache.spark.rdd.MapPartitionsRDD
    hashAggExecRDD.asInstanceOf[MapPartitionsRDD[_, _]]
  }
}
// END :paste -raw

import org.apache.spark.AccessPrivateSpark
val mpRDD = AccessPrivateSpark.mapPartitionsRDD(hashAggExecRDD)
val f = mpRDD.iterator(_, _)

import org.apache.spark.sql.execution.aggregate.TungstenAggregationIterator
// FIXME How to show that TungstenAggregationIterator is used?

Last update: 2021-07-05