
Aggregator — Typed User-Defined Aggregate Functions (UDAFs)

Aggregator is an abstraction of typed user-defined aggregate functions (user-defined typed aggregations or UDAFs).

abstract class Aggregator[-IN, BUF, OUT]

Aggregator is Serializable.



bufferEncoder: Encoder[BUF]

Used when:

  • Aggregator is requested to toColumn
  • UserDefinedAggregator is requested to scalaAggregator


finish(
  reduction: BUF): OUT

Used when:

  • ComplexTypedAggregateExpression is requested to eval
  • ScalaAggregator is requested to eval


merge(
  b1: BUF,
  b2: BUF): BUF

Used when:

  • ComplexTypedAggregateExpression is requested to merge
  • ScalaAggregator is requested to merge


outputEncoder: Encoder[OUT]

Used when:

  • ScalaAggregator is requested for the outputEncoder
  • Aggregator is requested to toColumn


reduce(
  b: BUF,
  a: IN): BUF

Used when:

  • ComplexTypedAggregateExpression is requested to update
  • ScalaAggregator is requested to update


zero: BUF

Used when:

  • SimpleTypedAggregateExpression is requested for initialValues
  • ComplexTypedAggregateExpression is requested to createAggregationBuffer
  • ScalaAggregator is requested to createAggregationBuffer


Known implementations of Aggregator:

  • ReduceAggregator
  • TypedAverage
  • TypedCount
  • TypedSumDouble
  • TypedSumLong
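
The lifecycle of the contract's methods can be sketched without Spark: zero seeds a buffer, reduce folds each input into it (per partition), merge combines partial buffers, and finish produces the output. The following is a minimal, Spark-free illustration of that flow — the SimpleAggregator trait and Average object are stand-ins for demonstration, not Spark's API (the real Aggregator also requires bufferEncoder and outputEncoder):

```scala
// Illustration only: mirrors the shape of Spark's Aggregator contract
// (org.apache.spark.sql.expressions.Aggregator), minus the encoders.
trait SimpleAggregator[-IN, BUF, OUT] {
  def zero: BUF                        // initial aggregation buffer
  def reduce(b: BUF, a: IN): BUF       // fold one input into the buffer
  def merge(b1: BUF, b2: BUF): BUF     // combine two partial buffers
  def finish(reduction: BUF): OUT      // final result from the buffer
}

// Average of Double values with a (sum, count) buffer
object Average extends SimpleAggregator[Double, (Double, Long), Double] {
  def zero: (Double, Long) = (0.0, 0L)
  def reduce(b: (Double, Long), a: Double): (Double, Long) =
    (b._1 + a, b._2 + 1)
  def merge(b1: (Double, Long), b2: (Double, Long)): (Double, Long) =
    (b1._1 + b2._1, b1._2 + b2._2)
  def finish(r: (Double, Long)): Double =
    if (r._2 == 0) Double.NaN else r._1 / r._2
}

// Simulate two partitions aggregated independently, then merged
val partition1 = Seq(1.0, 2.0).foldLeft(Average.zero)(Average.reduce)
val partition2 = Seq(3.0, 4.0).foldLeft(Average.zero)(Average.reduce)
val result = Average.finish(Average.merge(partition1, partition2))
// result == 2.5
```

This partition-then-merge shape is exactly why merge must be commutative and associative: Spark is free to combine partial buffers in any order.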

udaf Standard Function

The udaf standard function registers an Aggregator, i.e. creates a UserDefinedFunction that wraps the given Aggregator so it can be used with untyped DataFrames.

udaf[IN: TypeTag, BUF, OUT](
  agg: Aggregator[IN, BUF, OUT]): UserDefinedFunction
udaf[IN, BUF, OUT](
  agg: Aggregator[IN, BUF, OUT],
  inputEncoder: Encoder[IN]): UserDefinedFunction
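
A sketch of using udaf for untyped queries (myAverage and the table/column names below are illustrative placeholders, assuming an active SparkSession named spark and an Aggregator defined elsewhere):

```scala
import org.apache.spark.sql.functions.udaf

// myAverage is assumed to be an Aggregator[Double, SomeBuffer, Double]
// defined elsewhere (e.g. with a case-class buffer and Encoders.product).
val averageUdf = udaf(myAverage)            // wrap as a UserDefinedFunction
spark.udf.register("my_average", averageUdf) // make it available to SQL

// Untyped usage — no Dataset[IN] required anymore:
spark.sql("SELECT my_average(value) FROM values_table")
df.agg(averageUdf($"value"))
```

The inputEncoder variant exists for input types where no TypeTag is available, so the encoder must be supplied explicitly.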

Converting to TypedColumn

toColumn: TypedColumn[IN, OUT]

toColumn converts the Aggregator to a TypedColumn (that can be used with Dataset.select and KeyValueGroupedDataset.agg typed operators).


// Step 1. Create an Aggregator (modeled on Spark MLlib's top-K aggregation)
val topKAggregator: Aggregator[Int, Int, Float] = ???
val recs = ratings
  .as[(Int, Int, Float)]
  .agg(topKAggregator.toColumn) // <-- use the custom Aggregator
  .toDF("id", "recommendations")

Use the org.apache.spark.sql.expressions.scalalang.typed object to access the type-safe aggregate functions: avg, count, sum and sumLong.

import org.apache.spark.sql.expressions.scalalang.typed
ds.groupByKey(_._1).agg(typed.sum(_._2))