TungstenAggregationIterator
TungstenAggregationIterator is an AggregationIterator that is used when the HashAggregateExec aggregate physical operator is executed (to process UnsafeRows per partition and calculate aggregations).

TungstenAggregationIterator prefers hash-based aggregation (before switching to sort-based aggregation).
[source, scala]
----
val q = spark.range(10).
  groupBy('id % 2 as "group").
  agg(sum("id") as "sum")
val execPlan = q.queryExecution.sparkPlan
scala> println(execPlan.numberedTreeString)
00 HashAggregate(keys=[(id#0L % 2)#11L], functions=[sum(id#0L)], output=[group#3L, sum#7L])
01 +- HashAggregate(keys=[(id#0L % 2) AS (id#0L % 2)#11L], functions=[partial_sum(id#0L)], output=[(id#0L % 2)#11L, sum#13L])
02    +- Range (0, 10, step=1, splits=8)

import org.apache.spark.sql.execution.aggregate.HashAggregateExec
val hashAggExec = execPlan.asInstanceOf[HashAggregateExec]
val hashAggExecRDD = hashAggExec.execute

// MapPartitionsRDD is in private[spark] scope
// Use :paste -raw for the following helper object
package org.apache.spark
object AccessPrivateSpark {
  import org.apache.spark.rdd.RDD
  def mapPartitionsRDD[T](hashAggExecRDD: RDD[T]) = {
    import org.apache.spark.rdd.MapPartitionsRDD
    hashAggExecRDD.asInstanceOf[MapPartitionsRDD[_, _]]
  }
}
// END :paste -raw

import org.apache.spark.AccessPrivateSpark
val mpRDD = AccessPrivateSpark.mapPartitionsRDD(hashAggExecRDD)
val f = mpRDD.iterator(_, _)

import org.apache.spark.sql.execution.aggregate.TungstenAggregationIterator
// FIXME How to show that TungstenAggregationIterator is used?
----
When created, TungstenAggregationIterator gets SQL metrics from the HashAggregateExec aggregate physical operator being executed, i.e. numOutputRows, peakMemory, spillSize and avgHashProbe metrics.

- numOutputRows is used when TungstenAggregationIterator is requested for the next UnsafeRow (and it has one)
- peakMemory, spillSize and avgHashProbe are used at the end of every task (one per partition)

The metrics are then displayed as part of the HashAggregateExec aggregate physical operator (e.g. in the web UI, in Details for Query).
[[internal-registries]]
.TungstenAggregationIterator's Internal Properties (e.g. Registries, Counters and Flags)
[cols="1m,2",options="header",width="100%"]
|===
| Name | Description
| aggregationBufferMapIterator | [[aggregationBufferMapIterator]] KVIterator[UnsafeRow, UnsafeRow]
Used when...FIXME
| hashMap a| [[hashMap]] UnsafeFixedWidthAggregationMap with the following:

- <>
- <> built from (the <> of) the <>
- <> built from (the <> of) the <>
- 1024 * 16 initial capacity
- The page size of the TaskMemoryManager (defaults to the spark.buffer.pageSize configuration)

Used when TungstenAggregationIterator is requested for the <
| initialAggregationBuffer | [[initialAggregationBuffer]] <
Used when...FIXME
| externalSorter | [[externalSorter]] UnsafeKVExternalSorter used for sort-based aggregation
| sortBased | [[sortBased]] Flag to indicate whether TungstenAggregationIterator uses sort-based aggregation (not hash-based aggregation).

sortBased flag is disabled (false) by default.

Enabled (true) when TungstenAggregationIterator is requested to switch to sort-based aggregation.

Used when...FIXME
|===
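To make the hashMap entry a little more concrete, here is a minimal, Spark-free sketch of a map from a grouping key to a fixed-width aggregation buffer. It assumes (the table above does not state this) that every new group starts from a copy of one shared initial buffer. FixedWidthAggMap, bufferFor and numGroups are hypothetical names, and the real UnsafeFixedWidthAggregationMap stores UnsafeRows in memory pages rather than Scala arrays.

```scala
import scala.collection.mutable

// Hypothetical model, not Spark code: one fixed-width buffer per grouping key.
// ASSUMPTION: a new group starts from a copy of the shared initial buffer.
final class FixedWidthAggMap(initialBuffer: Array[Long], initialCapacity: Int = 1024 * 16) {
  private val buffers = new mutable.HashMap[Long, Array[Long]]()
  buffers.sizeHint(initialCapacity) // mirrors the 1024 * 16 initial capacity

  // Returns the buffer of an existing group or creates one for a new group
  def bufferFor(key: Long): Array[Long] =
    buffers.getOrElseUpdate(key, initialBuffer.clone())

  def numGroups: Int = buffers.size

  // Key-value iteration over groups and their buffers
  def iterator: Iterator[(Long, Array[Long])] = buffers.iterator
}
```

A caller would look up the buffer of a row's grouping key and update it in place, e.g. `val b = map.bufferFor(key); b(0) += value`.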
=== [[processInputs]] processInputs Internal Method

[source, scala]
processInputs(fallbackStartsAt: (Int, Int)): Unit

processInputs...FIXME

NOTE: processInputs is used exclusively when TungstenAggregationIterator is <
=== [[switchToSortBasedAggregation]] Switching to Sort-Based Aggregation (From Preferred Hash-Based Aggregation) -- switchToSortBasedAggregation Internal Method

[source, scala]
switchToSortBasedAggregation(): Unit

switchToSortBasedAggregation...FIXME

NOTE: switchToSortBasedAggregation is used exclusively when TungstenAggregationIterator is requested to <
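
The idea of preferring hash-based aggregation and switching to sort-based aggregation only when needed can be modelled in plain Scala. The sketch below is not Spark's implementation: HashThenSortSum, sumByKey and maxGroups are hypothetical names, and maxGroups is an assumed stand-in for the hash map being unable to hold another group. Once the switch happens, this simplified model never goes back to hashing.

```scala
import scala.collection.mutable

// Hypothetical, Spark-free model of "hash first, sort as a fallback".
// ASSUMPTION: maxGroups plays the role of the hash map's memory limit.
object HashThenSortSum {
  def sumByKey(rows: Iterator[(Long, Long)], maxGroups: Int): (List[(Long, Long)], Boolean) = {
    val hashMap = mutable.HashMap.empty[Long, Long]
    val spilled = mutable.ArrayBuffer.empty[(Long, Long)]
    var sortBased = false // disabled by default, like the sortBased flag

    rows.foreach { case (key, value) =>
      if (sortBased) {
        spilled += ((key, value))
      } else if (hashMap.contains(key) || hashMap.size < maxGroups) {
        hashMap(key) = hashMap.getOrElse(key, 0L) + value
      } else {
        // No room for a new group: move the partial sums out and switch
        sortBased = true
        spilled ++= hashMap
        hashMap.clear()
        spilled += ((key, value))
      }
    }

    val result: List[(Long, Long)] =
      if (!sortBased) {
        hashMap.toList.sortBy(_._1)
      } else {
        // Sort by key, then merge neighbouring entries that share a key
        val merged = mutable.ArrayBuffer.empty[(Long, Long)]
        spilled.sortBy(_._1).foreach { case (key, value) =>
          if (merged.nonEmpty && merged.last._1 == key) {
            merged(merged.length - 1) = (key, merged.last._2 + value)
          } else {
            merged += ((key, value))
          }
        }
        merged.toList
      }
    (result, sortBased)
  }
}
```

Both strategies must produce the same sums; the returned flag only reports whether the fallback was taken.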
=== [[next]] Getting Next UnsafeRow -- next Method

[source, scala]
next(): UnsafeRow

NOTE: next is part of Scala's http://www.scala-lang.org/api/2.11.11/#scala.collection.Iterator[scala.collection.Iterator] interface that returns the next element and discards it from the iterator.

next...FIXME
=== [[hasNext]] hasNext Method

[source, scala]
hasNext: Boolean

NOTE: hasNext is part of Scala's http://www.scala-lang.org/api/2.11.11/#scala.collection.Iterator[scala.collection.Iterator] interface that tests whether this iterator can provide another element.

hasNext...FIXME
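
The hasNext and next pair follows the usual scala.collection.Iterator contract. The following is a small sketch of that contract with a row counter attached, under the assumption that rows are counted only when next actually returns one. CountingAggIterator is a hypothetical class, and pairs of Longs stand in for UnsafeRows.

```scala
// Hypothetical model of the Iterator contract: hasNext tests for another row,
// next returns it and counts it (a stand-in for the numOutputRows metric).
final class CountingAggIterator(groups: Iterator[(Long, Long)]) extends Iterator[(Long, Long)] {
  private var outputRows = 0L

  def numOutputRows: Long = outputRows

  override def hasNext: Boolean = groups.hasNext

  override def next(): (Long, Long) = {
    if (!hasNext) throw new NoSuchElementException("no more aggregated rows")
    outputRows += 1 // counted only when there is a row to return
    groups.next()
  }
}
```

Calling next on an exhausted iterator fails without touching the counter, which keeps the count equal to the number of rows actually produced.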
=== [[creating-instance]] Creating TungstenAggregationIterator Instance

TungstenAggregationIterator takes the following when created:

- [[partIndex]] Partition index
- [[groupingExpressions]] Grouping expressions
- [[aggregateExpressions]] Aggregate expressions
- [[aggregateAttributes]] Aggregate attributes
- [[initialInputBufferOffset]] Initial input buffer offset
- [[resultExpressions]] Output expressions
- [[newMutableProjection]] Function to create a new MutableProjection given Catalyst expressions and attributes (i.e. (Seq[Expression], Seq[Attribute]) => MutableProjection)
- [[originalInputAttributes]] Output attributes (of the child of the HashAggregateExec physical operator)
- [[inputIter]] Iterator of InternalRows (from a single partition of the child of the HashAggregateExec physical operator)
- [[testFallbackStartsAt]] (used for testing) Optional HashAggregateExec's testFallbackStartsAt
- [[numOutputRows]] numOutputRows SQL metric
- [[peakMemory]] peakMemory SQL metric
- [[spillSize]] spillSize SQL metric
- [[avgHashProbe]] avgHashProbe SQL metric
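
NOTE: The SQL metrics (numOutputRows, peakMemory, spillSize and avgHashProbe) belong to the HashAggregateExec physical operator that created the TungstenAggregationIterator.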

TungstenAggregationIterator initializes the <

TungstenAggregationIterator starts <
=== [[generateResultProjection]] generateResultProjection Method

[source, scala]
generateResultProjection(): (UnsafeRow, InternalRow) => UnsafeRow

NOTE: generateResultProjection is part of the <

generateResultProjection...FIXME
=== [[outputForEmptyGroupingKeyWithoutInput]] Creating UnsafeRow -- outputForEmptyGroupingKeyWithoutInput Method

[source, scala]
outputForEmptyGroupingKeyWithoutInput(): UnsafeRow

outputForEmptyGroupingKeyWithoutInput...FIXME

NOTE: outputForEmptyGroupingKeyWithoutInput is used when...FIXME
=== [[TaskCompletionListener]] TaskCompletionListener

TungstenAggregationIterator registers a TaskCompletionListener that is executed on task completion (for every task that processes a partition).

When executed (once per partition), the TaskCompletionListener updates the following metrics:

- peakMemory
- spillSize
- avgHashProbe
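
The listener pattern described above can be sketched without Spark. In the model below, Metric, FakeTaskContext and CompletionMetricsDemo are hypothetical stand-ins (they are not Spark's SQLMetric or TaskContext classes), and how each final value is computed is an assumption left to the caller.

```scala
import scala.collection.mutable

// Hypothetical stand-ins for a SQL metric and a task context (not Spark classes).
final class Metric { var value: Long = 0L }

final class FakeTaskContext {
  private val listeners = mutable.ArrayBuffer.empty[() => Unit]

  def addTaskCompletionListener(f: () => Unit): Unit = { listeners += f }

  // Runs every registered listener once, as on task completion
  def markTaskCompleted(): Unit = listeners.foreach(f => f())
}

object CompletionMetricsDemo {
  // Registers one listener that writes each metric's final per-partition value.
  // ASSUMPTION: the caller supplies how every final value is computed.
  def register(ctx: FakeTaskContext, updates: Seq[(Metric, () => Long)]): Unit =
    ctx.addTaskCompletionListener { () =>
      updates.foreach { case (metric, finalValue) => metric.value = finalValue() }
    }
}
```

Because the values are read inside the listener, the metrics reflect the state at the end of the task rather than at registration time.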