Aggregator — Typed User-Defined Aggregate Functions (UDAFs)

Aggregator is an abstraction of typed user-defined aggregate functions (typed UDAFs), also known as user-defined typed aggregations.

abstract class Aggregator[-IN, BUF, OUT]

Aggregator is Serializable.
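
A custom Aggregator implements the six methods of the contract described below. As a minimal sketch, here is a hypothetical typed average (MyAverage, AvgBuffer and ds are illustrative names, not part of Spark):

import org.apache.spark.sql.{Encoder, Encoders}
import org.apache.spark.sql.expressions.Aggregator

case class AvgBuffer(sum: Double, count: Long)

object MyAverage extends Aggregator[Double, AvgBuffer, Double] {
  // Initial (zero) value of the aggregation buffer
  def zero: AvgBuffer = AvgBuffer(0.0, 0L)
  // Fold one input value into the buffer
  def reduce(b: AvgBuffer, a: Double): AvgBuffer = AvgBuffer(b.sum + a, b.count + 1)
  // Combine two partial buffers (e.g. computed on different partitions)
  def merge(b1: AvgBuffer, b2: AvgBuffer): AvgBuffer =
    AvgBuffer(b1.sum + b2.sum, b1.count + b2.count)
  // Transform the final buffer into the output value
  def finish(reduction: AvgBuffer): Double =
    if (reduction.count == 0) Double.NaN else reduction.sum / reduction.count
  // Encoders for the intermediate buffer and the output types
  def bufferEncoder: Encoder[AvgBuffer] = Encoders.product[AvgBuffer]
  def outputEncoder: Encoder[Double] = Encoders.scalaDouble
}

// e.g. a typed average over a Dataset[Double]
// ds.select(MyAverage.toColumn)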

Contract

bufferEncoder

bufferEncoder: Encoder[BUF]

Used when:

  • Aggregator is requested to toColumn
  • UserDefinedAggregator is requested to scalaAggregator

finish

finish(
  reduction: BUF): OUT

Used when:

  • ComplexTypedAggregateExpression is requested to eval
  • ScalaAggregator is requested to eval

merge

merge(
  b1: BUF,
  b2: BUF): BUF

Used when:

  • ComplexTypedAggregateExpression is requested to merge
  • ScalaAggregator is requested to merge

outputEncoder

outputEncoder: Encoder[OUT]

Used when:

  • ScalaAggregator is requested for the outputEncoder
  • Aggregator is requested to toColumn

reduce

reduce(
  b: BUF,
  a: IN): BUF

Used when:

  • ComplexTypedAggregateExpression is requested to update
  • ScalaAggregator is requested to update

zero

zero: BUF

Used when:

  • SimpleTypedAggregateExpression is requested for initialValues
  • ComplexTypedAggregateExpression is requested to createAggregationBuffer
  • ScalaAggregator is requested to createAggregationBuffer
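
Taken together, the contract describes a classic two-phase aggregation: Spark starts from zero, folds input rows into per-partition buffers with reduce, combines the partial buffers with merge, and maps the final buffer to the result with finish. A minimal sketch of that lifecycle (a hypothetical local driver, not Spark's actual execution path):

import org.apache.spark.sql.expressions.Aggregator

// Runs an Aggregator over pre-partitioned local data, mimicking
// the zero -> reduce -> merge -> finish lifecycle.
def runLocally[IN, BUF, OUT](
    agg: Aggregator[IN, BUF, OUT],
    partitions: Seq[Seq[IN]]): OUT = {
  val partials = partitions.map(_.foldLeft(agg.zero)(agg.reduce)) // update phase
  agg.finish(partials.foldLeft(agg.zero)(agg.merge))              // merge phase
}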

Implementations

  • ReduceAggregator
  • TypedAverage
  • TypedCount
  • TypedSumDouble
  • TypedSumLong
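
ReduceAggregator, for example, is what KeyValueGroupedDataset.reduceGroups uses under the covers, so every reduceGroups call is an Aggregator-based aggregation (Reading and readings below are illustrative names):

import org.apache.spark.sql.Dataset

case class Reading(city: String, temp: Double)
val readings: Dataset[Reading] = ???

// reduceGroups wraps the binary function in a ReduceAggregator internally
readings
  .groupByKey(_.city)
  .reduceGroups((a, b) => if (a.temp > b.temp) a else b)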

udaf Standard Function

udaf standard function wraps the given Aggregator in a UserDefinedFunction so that it can be used with untyped DataFrames (e.g. registered for SQL queries).

udaf[IN: TypeTag, BUF, OUT](
  agg: Aggregator[IN, BUF, OUT]): UserDefinedFunction
udaf[IN, BUF, OUT](
  agg: Aggregator[IN, BUF, OUT],
  inputEncoder: Encoder[IN]): UserDefinedFunction
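
For example, given the MyAverage aggregator sketched above, udaf (available from org.apache.spark.sql.functions) produces a UserDefinedFunction that can be applied to DataFrame columns or registered for SQL (df, spark and the records table are illustrative):

import org.apache.spark.sql.functions.{col, udaf}

// Wrap the typed Aggregator for untyped, Column-based use
val myAverage = udaf(MyAverage)

// Apply it as a regular Column-based aggregate function...
df.agg(myAverage(col("value")))

// ...or register it for SQL queries
spark.udf.register("my_average", myAverage)
spark.sql("SELECT my_average(value) FROM records")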

Converting to TypedColumn

toColumn: TypedColumn[IN, OUT]

toColumn converts the Aggregator to a TypedColumn that can be used with the Dataset.select and KeyValueGroupedDataset.agg typed operators.

Demo

// From Spark MLlib's org.apache.spark.ml.recommendation.ALSModel
// Step 1. Create Aggregator (IN matches the element type of the grouped Dataset)
val topKAggregator: Aggregator[(Int, Int, Float), _, Array[(Int, Float)]] = ???
val recs = ratings
  .as[(Int, Int, Float)]
  .groupByKey(_._1)
  .agg(topKAggregator.toColumn) // <-- use the custom Aggregator
  .toDF("id", "recommendations")

Use the org.apache.spark.sql.expressions.scalalang.typed object to access the built-in type-safe aggregate functions, i.e. avg, count, sum and sumLong.

import org.apache.spark.sql.expressions.scalalang.typed
ds.groupByKey(_._1).agg(typed.sum(_._2))
ds.select(typed.sum((i: Int) => i))