TungstenAggregationIterator is an AggregationIterator that is used when the HashAggregateExec aggregate physical operator is executed (to process UnsafeRows per partition and calculate aggregations).

TungstenAggregationIterator prefers hash-based aggregation (before switching to sort-based aggregation).

val q = spark.range(10).
  groupBy('id % 2 as "group").
  agg(sum("id") as "sum")
val execPlan = q.queryExecution.sparkPlan
scala> println(execPlan.numberedTreeString)
00 HashAggregate(keys=[(id#0L % 2)#11L], functions=[sum(id#0L)], output=[group#3L, sum#7L])
01 +- HashAggregate(keys=[(id#0L % 2) AS (id#0L % 2)#11L], functions=[partial_sum(id#0L)], output=[(id#0L % 2)#11L, sum#13L])
02    +- Range (0, 10, step=1, splits=8)

import org.apache.spark.sql.execution.aggregate.HashAggregateExec
val hashAggExec = execPlan.asInstanceOf[HashAggregateExec]
val hashAggExecRDD = hashAggExec.execute

// MapPartitionsRDD is in private[spark] scope
// Use :paste -raw for the following helper object
package org.apache.spark
object AccessPrivateSpark {
  import org.apache.spark.rdd.RDD
  // Casts the RDD to MapPartitionsRDD (only visible inside the org.apache.spark package)
  def mapPartitionsRDD[T](hashAggExecRDD: RDD[T]) = {
    import org.apache.spark.rdd.MapPartitionsRDD
    hashAggExecRDD.asInstanceOf[MapPartitionsRDD[_, _]]
  }
}
// END :paste -raw

import org.apache.spark.AccessPrivateSpark
val mpRDD = AccessPrivateSpark.mapPartitionsRDD(hashAggExecRDD)
val f = mpRDD.iterator(_, _)

import org.apache.spark.sql.execution.aggregate.TungstenAggregationIterator
// FIXME How to show that TungstenAggregationIterator is used?

When created, TungstenAggregationIterator gets SQL metrics from the HashAggregateExec aggregate physical operator being executed, i.e. numOutputRows, peakMemory, spillSize and avgHashProbe metrics.

  • numOutputRows is used when TungstenAggregationIterator is requested for the next UnsafeRow (and it has one)

  • peakMemory, spillSize and avgHashProbe are used at task completion (once per partition)

The metrics are then displayed as part of HashAggregateExec aggregate physical operator (e.g. in web UI in Details for Query).

HashAggregateExec in web UI (Details for Query)

[[internal-registries]]
.TungstenAggregationIterator's Internal Properties (e.g. Registries, Counters and Flags)
[cols="1m,2",options="header",width="100%"]
|===
| Name
| Description

| aggregationBufferMapIterator
| [[aggregationBufferMapIterator]] KVIterator[UnsafeRow, UnsafeRow]

Used when...FIXME

| hashMap
a| [[hashMap]] UnsafeFixedWidthAggregationMap with the following:

  • initialAggregationBuffer

  • StructType built from (the aggBufferAttributes of) the aggregate functions

  • StructType built from (the attributes of) the groupingExpressions

  • 1024 * 16 initial capacity

  • The page size of the TaskMemoryManager (defaults to spark.buffer.pageSize configuration)

Used when TungstenAggregationIterator is requested for the <>, to <>, <>, to initialize the <> and <>.

| initialAggregationBuffer
| [[initialAggregationBuffer]] UnsafeRow that is the aggregation buffer containing initial buffer values.

Used when...FIXME

| externalSorter
| [[externalSorter]] UnsafeKVExternalSorter used for sort-based aggregation

| sortBased
| [[sortBased]] Flag to indicate whether TungstenAggregationIterator uses sort-based aggregation (not hash-based aggregation).

sortBased flag is disabled (false) by default.

Enabled (true) when TungstenAggregationIterator is requested to switch to sort-based aggregation.

Used when...FIXME
|===

=== [[processInputs]] processInputs Internal Method

[source, scala]
----
processInputs(fallbackStartsAt: (Int, Int)): Unit
----


NOTE: processInputs is used exclusively when TungstenAggregationIterator is created (and sets the internal flags to indicate whether to use hash-based aggregation or, in the worst case, sort-based aggregation when there is not enough memory for groups and their buffers).

=== [[switchToSortBasedAggregation]] Switching to Sort-Based Aggregation (From Preferred Hash-Based Aggregation) -- switchToSortBasedAggregation Internal Method

[source, scala]
----
switchToSortBasedAggregation(): Unit
----


NOTE: switchToSortBasedAggregation is used exclusively when TungstenAggregationIterator is requested to process input rows (and the externalSorter is used).
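
The fallback from hash-based to sort-based aggregation can be illustrated with a minimal, Spark-free sketch. It assumes a hypothetical `maxGroups` limit that stands in for the memory available to the hash map. `HashThenSortAggregation` and `sumByKey` are illustrative names (not Spark APIs), and the real iterator works on UnsafeRows with an UnsafeFixedWidthAggregationMap and an UnsafeKVExternalSorter rather than on in-memory Scala collections.

```scala
import scala.collection.mutable

object HashThenSortAggregation {
  // Sums values per key. Returns the per-key sums (ordered by key) and
  // a flag that says whether the sort-based path was taken.
  // `maxGroups` is a hypothetical stand-in for "memory for groups and their buffers".
  def sumByKey(rows: Iterator[(Long, Long)], maxGroups: Int): (Seq[(Long, Long)], Boolean) = {
    val hashMap = mutable.HashMap.empty[Long, Long]
    val spilled = mutable.ArrayBuffer.empty[(Long, Long)]
    var sortBased = false // disabled by default, as with the sortBased flag

    rows.foreach { case (key, value) =>
      if (sortBased) {
        // Already switched: only collect entries for the final sort
        spilled += ((key, value))
      } else if (hashMap.contains(key) || hashMap.size < maxGroups) {
        // Preferred path: update the buffer of the group in the hash map
        hashMap(key) = hashMap.getOrElse(key, 0L) + value
      } else {
        // No room for a new group: switch to sort-based aggregation
        sortBased = true
        spilled ++= hashMap
        hashMap.clear()
        spilled += ((key, value))
      }
    }

    if (!sortBased) {
      (hashMap.toSeq.sortBy(_._1), false)
    } else {
      // Sort by key, then merge neighbouring entries that share a key
      val merged = mutable.ArrayBuffer.empty[(Long, Long)]
      spilled.sortBy(_._1).foreach { case (key, value) =>
        if (merged.nonEmpty && merged.last._1 == key) {
          merged(merged.length - 1) = (key, merged.last._2 + value)
        } else {
          merged += ((key, value))
        }
      }
      (merged.toList, true)
    }
  }
}
```

With the rows of `spark.range(10)` keyed by `id % 2`, both paths give the same sums (20 for group 0 and 25 for group 1). Only the returned flag differs, depending on whether `maxGroups` is large enough to hold both groups.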

=== [[next]] Getting Next UnsafeRow -- next Method

[source, scala]
----
next(): UnsafeRow
----

NOTE: next is part of Scala's scala.collection.Iterator interface that returns the next element and discards it from the iterator.


=== [[hasNext]] hasNext Method

[source, scala]
----
hasNext: Boolean
----

NOTE: hasNext is part of Scala's scala.collection.Iterator interface that tests whether this iterator can provide another element.
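
The hasNext / next pair can be sketched as a plain Scala iterator that pre-loads its first entry and counts every row it returns. This is a simplified illustration only: `Counter` and `PreloadingIterator` are hypothetical names, `Counter` is not Spark's SQLMetric, and the entries are ordinary tuples rather than UnsafeRows.

```scala
// Hypothetical stand-in for a SQL metric such as numOutputRows
final class Counter {
  var value: Long = 0L
  def add(n: Long): Unit = value += n
}

// Wraps an iterator of (group key, aggregation buffer) pairs
final class PreloadingIterator[K, V](underlying: Iterator[(K, V)], numOutputRows: Counter)
    extends Iterator[(K, V)] {

  // Pre-load the first entry when the iterator is created
  private var current: Option[(K, V)] = advance()

  private def advance(): Option[(K, V)] =
    if (underlying.hasNext) Some(underlying.next()) else None

  // True as long as a pre-loaded entry is waiting to be returned
  override def hasNext: Boolean = current.isDefined

  // Returns the pre-loaded entry, loads the following one and counts the output row
  override def next(): (K, V) = current match {
    case Some(entry) =>
      current = advance()
      numOutputRows.add(1L)
      entry
    case None =>
      throw new NoSuchElementException("no more groups")
  }
}
```

Because the entry is loaded ahead of time, hasNext is a cheap check of already-known state, and the counter only moves when a row is actually handed to the caller.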


=== [[creating-instance]] Creating TungstenAggregationIterator Instance

TungstenAggregationIterator takes the following when created:

  • [[partIndex]] Partition index
  • [[groupingExpressions]] Grouping named expressions
  • [[aggregateExpressions]] Aggregate expressions
  • [[aggregateAttributes]] Aggregate attributes
  • [[initialInputBufferOffset]] Initial input buffer offset
  • [[resultExpressions]] Output named expressions
  • [[newMutableProjection]] Function to create a new MutableProjection given Catalyst expressions and attributes (i.e. (Seq[Expression], Seq[Attribute]) => MutableProjection)
  • [[originalInputAttributes]] Output attributes (of the child of the HashAggregateExec physical operator)
  • [[inputIter]] Iterator of InternalRows (from a single partition of the child of the HashAggregateExec physical operator)
  • [[testFallbackStartsAt]] (used for testing) Optional HashAggregateExec's testFallbackStartsAt
  • [[numOutputRows]] numOutputRows SQL metric
  • [[peakMemory]] peakMemory SQL metric
  • [[spillSize]] spillSize SQL metric
  • [[avgHashProbe]] avgHashProbe SQL metric


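The SQL metrics (numOutputRows, peakMemory, spillSize and avgHashProbe) belong to the HashAggregateExec physical operator that created the TungstenAggregationIterator.

TungstenAggregationIterator initializes the internal registries and counters.

TungstenAggregationIterator starts processing input rows and pre-loads the first key-value pair from the UnsafeFixedWidthAggregationMap if it did not switch to sort-based aggregation.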

=== [[generateResultProjection]] generateResultProjection Method

[source, scala]
----
generateResultProjection(): (UnsafeRow, InternalRow) => UnsafeRow
----

NOTE: generateResultProjection is part of the AggregationIterator contract to...FIXME.


=== [[outputForEmptyGroupingKeyWithoutInput]] Creating UnsafeRow -- outputForEmptyGroupingKeyWithoutInput Method

[source, scala]
----
outputForEmptyGroupingKeyWithoutInput(): UnsafeRow
----


NOTE: outputForEmptyGroupingKeyWithoutInput is used when...FIXME

=== [[TaskCompletionListener]] TaskCompletionListener

TungstenAggregationIterator registers a TaskCompletionListener that is executed on task completion (for every task that processes a partition).

When executed (once per partition), the TaskCompletionListener updates the following metrics:

  • peakMemory

  • spillSize

  • avgHashProbe
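
The "update metrics once, when the task completes" pattern can be sketched without Spark as follows. Everything here is a hypothetical stand-in: `FakeTaskContext` only mimics the idea of registering completion callbacks (it is not Spark's TaskContext), `Metric` is not SQLMetric, and the number of groups is used as a toy substitute for a memory-related metric.

```scala
import scala.collection.mutable

object CompletionListenerSketch {
  // Hypothetical, simplified metric (not Spark's SQLMetric)
  final class Metric(val name: String) { var value: Long = 0L }

  // Hypothetical, simplified task context that runs callbacks on completion
  final class FakeTaskContext {
    private val listeners = mutable.ArrayBuffer.empty[() => Unit]
    def addTaskCompletionListener(f: () => Unit): Unit = listeners += f
    def markTaskCompleted(): Unit = listeners.foreach(_.apply())
  }

  // Aggregates a single partition; the metric is only published
  // when the task that processed the partition completes.
  def aggregatePartition(
      rows: Iterator[(Long, Long)],
      ctx: FakeTaskContext,
      peakGroups: Metric): Map[Long, Long] = {
    val sums = mutable.Map.empty[Long, Long]
    ctx.addTaskCompletionListener { () =>
      peakGroups.value = math.max(peakGroups.value, sums.size.toLong)
    }
    rows.foreach { case (key, value) => sums(key) = sums.getOrElse(key, 0L) + value }
    sums.toMap
  }
}
```

The metric stays untouched while rows are processed and changes only after `markTaskCompleted()` runs, which mirrors the idea of one update per task (and therefore per partition).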

Last update: 2021-02-18