== Aggregator -- User-Defined Typed Aggregate Functions (UDAFs)

Aggregator is the <<contract, contract>> for user-defined typed aggregate functions (also known as user-defined typed aggregations or UDAFs).

[[contract]]
[source, scala]
----
package org.apache.spark.sql.expressions

abstract class Aggregator[-IN, BUF, OUT] extends Serializable {
  // only required methods that have no implementation
  def bufferEncoder: Encoder[BUF]
  def finish(reduction: BUF): OUT
  def merge(b1: BUF, b2: BUF): BUF
  def outputEncoder: Encoder[OUT]
  def reduce(b: BUF, a: IN): BUF
  def zero: BUF
}
----
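To make the contract more concrete, the following is a minimal sketch of a custom Aggregator that averages Double values. The names `AvgBuffer` and `MyAverage` are hypothetical and only serve the illustration. The sketch assumes Spark SQL is on the classpath (e.g. the code is pasted into `spark-shell`) and drives the contract methods by hand, which roughly mirrors what happens per partition (`zero`, `reduce`), across partitions (`merge`) and at the end (`finish`).

```scala
import org.apache.spark.sql.{Encoder, Encoders}
import org.apache.spark.sql.expressions.Aggregator

// Hypothetical buffer type: running sum and number of values seen so far
case class AvgBuffer(sum: Double, count: Long)

// Hypothetical aggregator (not part of Spark): averages Double values
object MyAverage extends Aggregator[Double, AvgBuffer, Double] {
  // Initial (empty) buffer
  def zero: AvgBuffer = AvgBuffer(0.0, 0L)
  // Fold one input value into the buffer
  def reduce(b: AvgBuffer, a: Double): AvgBuffer =
    AvgBuffer(b.sum + a, b.count + 1)
  // Combine two partial buffers
  def merge(b1: AvgBuffer, b2: AvgBuffer): AvgBuffer =
    AvgBuffer(b1.sum + b2.sum, b1.count + b2.count)
  // Turn the final buffer into the result (NaN for no input)
  def finish(reduction: AvgBuffer): Double =
    if (reduction.count == 0) Double.NaN else reduction.sum / reduction.count
  def bufferEncoder: Encoder[AvgBuffer] = Encoders.product[AvgBuffer]
  def outputEncoder: Encoder[Double] = Encoders.scalaDouble
}

// Exercise the contract by hand, as if the input were split into two partitions
val part1 = Seq(1.0, 2.0).foldLeft(MyAverage.zero)(MyAverage.reduce)
val part2 = Seq(3.0, 6.0).foldLeft(MyAverage.zero)(MyAverage.reduce)
val result = MyAverage.finish(MyAverage.merge(part1, part2))
```

Keeping the sum and the count in the buffer (rather than a running average) is what makes `merge` a simple field-wise addition.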

After you create a custom Aggregator, use the <<toColumn, toColumn>> method to convert it to a TypedColumn that can be used with typed operators such as KeyValueGroupedDataset.agg.

[source, scala]
----
// Adapted from Spark MLlib
// Step 1. Create Aggregator
// The input type has to match the Dataset's element type, i.e. (Int, Int, Float)
val topKAggregator: Aggregator[(Int, Int, Float), _, Array[(Int, Float)]] = ???
val recs = ratings
  .as[(Int, Int, Float)]
  .groupByKey(_._1)
  .agg(topKAggregator.toColumn) // <-- use the custom Aggregator
  .toDF("id", "recommendations")
----


Use the org.apache.spark.sql.expressions.scalalang.typed object to access the built-in type-safe aggregate functions, i.e. avg, count, sum and sumLong.

[source, scala]
----
import org.apache.spark.sql.expressions.scalalang.typed

// Example 1
ds.groupByKey(_._1).agg(typed.sum(_._2))

// Example 2
// ...Int) => i))
----



Aggregator is an Experimental and Evolving contract: it is moving towards a stable API, but it is not stable yet and can change from one feature release to the next.

In other words, using the contract is like treading on thin ice.

Aggregator is used when:

* SimpleTypedAggregateExpression and ComplexTypedAggregateExpression are created

* TypedAggregateExpression is requested for the aggregator

.Aggregator Contract
[cols="1,2",options="header",width="100%"]
|===
| Method
| Description

| [[bufferEncoder]] bufferEncoder
| Used when...FIXME

| [[finish]] finish
| Used when...FIXME

| [[merge]] merge
| Used when...FIXME

| [[outputEncoder]] outputEncoder
| Used when...FIXME

| [[reduce]] reduce
| Used when...FIXME

| [[zero]] zero
| Used when...FIXME
|===

[[implementations]]
.Aggregators
[cols="1,2",options="header",width="100%"]
|===
| Aggregator
| Description

| [[ParameterizedTypeSum]] ParameterizedTypeSum
|

| [[ReduceAggregator]] ReduceAggregator
|

| [[TopByKeyAggregator]] TopByKeyAggregator
| Used exclusively in Spark MLlib

| [[TypedAverage]] TypedAverage
|

| [[TypedCount]] TypedCount
|

| [[TypedSumDouble]] TypedSumDouble
|

| [[TypedSumLong]] TypedSumLong
|
|===

=== [[toColumn]] Converting Aggregator to TypedColumn -- toColumn Method

[source, scala]
----
toColumn: TypedColumn[IN, OUT]
----


NOTE: toColumn is used when...FIXME
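As an illustration of `toColumn` in use, here is a minimal sketch that converts a custom Aggregator to a TypedColumn and passes it to `Dataset.select` and `KeyValueGroupedDataset.agg`. The `SumValues` aggregator, the sample data and the `total` column name are hypothetical. The sketch assumes a local Spark installation with Spark SQL on the classpath.

```scala
import org.apache.spark.sql.{Encoder, Encoders, SparkSession, TypedColumn}
import org.apache.spark.sql.expressions.Aggregator

// Hypothetical aggregator: sums the Int half of (String, Int) pairs
object SumValues extends Aggregator[(String, Int), Long, Long] {
  def zero: Long = 0L
  def reduce(b: Long, a: (String, Int)): Long = b + a._2
  def merge(b1: Long, b2: Long): Long = b1 + b2
  def finish(reduction: Long): Long = reduction
  def bufferEncoder: Encoder[Long] = Encoders.scalaLong
  def outputEncoder: Encoder[Long] = Encoders.scalaLong
}

// Assumption: a local session is sufficient for the illustration
val spark = SparkSession.builder()
  .master("local[*]")
  .appName("toColumn-sketch")
  .getOrCreate()
import spark.implicits._

val ds = Seq(("a", 1), ("a", 2), ("b", 5)).toDS()

// toColumn gives a TypedColumn; name only assigns an alias to it
val sumCol: TypedColumn[(String, Int), Long] = SumValues.toColumn.name("total")

// Aggregate over the whole Dataset
val total: Long = ds.select(sumCol).head()

// Aggregate per key
val perKey: Map[String, Long] = ds.groupByKey(_._1).agg(sumCol).collect().toMap
```

The same TypedColumn is reused for both the global and the per-key aggregation, since the input type of the Aggregator matches the element type of the Dataset in both cases.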

