
KeyValueGroupedDataset

KeyValueGroupedDataset represents a grouped dataset as a result of the Dataset.groupByKey operator (that groups records by a grouping function).

// Dataset[T]
groupByKey[K: Encoder](func: T => K): KeyValueGroupedDataset[K, T]
import java.sql.Timestamp
val numGroups = spark.
  readStream.
  format("rate").
  load.
  as[(Timestamp, Long)].
  groupByKey { case (time, value) => value % 2 }

scala> :type numGroups
org.apache.spark.sql.KeyValueGroupedDataset[Long,(java.sql.Timestamp, Long)]

KeyValueGroupedDataset is also created as a result of the <<keyAs, keyAs>> and <<mapValues, mapValues>> operators.


scala> :type numGroups.keyAs[String]
org.apache.spark.sql.KeyValueGroupedDataset[String,(java.sql.Timestamp, Long)]
scala> :type numGroups
org.apache.spark.sql.KeyValueGroupedDataset[Long,(java.sql.Timestamp, Long)]

val mapped = numGroups.mapValues { case (ts, n) => s"($ts, $n)" }
scala> :type mapped
org.apache.spark.sql.KeyValueGroupedDataset[Long,String]

KeyValueGroupedDataset works for batch and streaming aggregations, but shines the most when used for Streaming Aggregation.

scala> :type numGroups
org.apache.spark.sql.KeyValueGroupedDataset[Long,(java.sql.Timestamp, Long)]

import org.apache.spark.sql.streaming.Trigger
import scala.concurrent.duration._
numGroups.
  mapGroups { case (group, values) => values.size }.
  writeStream.
  format("console").
  trigger(Trigger.ProcessingTime(10.seconds)).
  start

-------------------------------------------
Batch: 0
-------------------------------------------
+-----+
|value|
+-----+
+-----+

-------------------------------------------
Batch: 1
-------------------------------------------
+-----+
|value|
+-----+
|    3|
|    2|
+-----+

-------------------------------------------
Batch: 2
-------------------------------------------
+-----+
|value|
+-----+
|    5|
|    5|
+-----+

// Eventually...
spark.streams.active.foreach(_.stop)

The most important use case of KeyValueGroupedDataset, however, is Arbitrary Stateful Streaming Aggregation, which allows for accumulating streaming state (by means of GroupState) using the <<mapGroupsWithState, mapGroupsWithState>> and the more advanced <<flatMapGroupsWithState, flatMapGroupsWithState>> operators.
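To give a taste of the programming model, the following sketch continues the numGroups example above with a running count per group. It is a minimal illustration only: the countValues function and the Long state shape are assumptions, not from the original text.

[source, scala]
----
import org.apache.spark.sql.streaming.{GroupState, OutputMode, Trigger}
import scala.concurrent.duration._

// Hypothetical state function: keeps a running count of values per group
def countValues(
    group: Long,
    values: Iterator[(java.sql.Timestamp, Long)],
    state: GroupState[Long]): Long = {
  val runningCount = state.getOption.getOrElse(0L) + values.size
  state.update(runningCount) // accumulate state across micro-batches
  runningCount
}

numGroups.
  mapGroupsWithState(countValues _).
  writeStream.
  format("console").
  outputMode(OutputMode.Update). // mapGroupsWithState supports Update output mode
  trigger(Trigger.ProcessingTime(10.seconds)).
  start
----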

[[operators]]
.KeyValueGroupedDataset's Operators
[cols="1m,2",options="header",width="100%"]
|===
| Operator
| Description

| agg a| [[agg]]

[source, scala]
----
agg[U1](col1: TypedColumn[V, U1]): Dataset[(K, U1)]
agg[U1, U2](
  col1: TypedColumn[V, U1],
  col2: TypedColumn[V, U2]): Dataset[(K, U1, U2)]
agg[U1, U2, U3](
  col1: TypedColumn[V, U1],
  col2: TypedColumn[V, U2],
  col3: TypedColumn[V, U3]): Dataset[(K, U1, U2, U3)]
agg[U1, U2, U3, U4](
  col1: TypedColumn[V, U1],
  col2: TypedColumn[V, U2],
  col3: TypedColumn[V, U3],
  col4: TypedColumn[V, U4]): Dataset[(K, U1, U2, U3, U4)]
----


| cogroup a| [[cogroup]]

[source, scala]
----
cogroup[U, R : Encoder](
  other: KeyValueGroupedDataset[K, U])(
  f: (K, Iterator[V], Iterator[U]) => TraversableOnce[R]): Dataset[R]
----


| count a| [[count]]

[source, scala]
----
count(): Dataset[(K, Long)]
----

| flatMapGroups a| [[flatMapGroups]]

[source, scala]
----
flatMapGroups[U : Encoder](
  f: (K, Iterator[V]) => TraversableOnce[U]): Dataset[U]
----

| flatMapGroupsWithState a| [[flatMapGroupsWithState]]

[source, scala]
----
flatMapGroupsWithState[S: Encoder, U: Encoder](
  outputMode: OutputMode,
  timeoutConf: GroupStateTimeout)(
  func: (K, Iterator[V], GroupState[S]) => Iterator[U]): Dataset[U]
----

Arbitrary Stateful Streaming Aggregation - streaming aggregation with explicit state and state timeout

Note

The difference between flatMapGroupsWithState and mapGroupsWithState is that the state function of flatMapGroupsWithState generates zero or more elements per group (that are in turn the rows in the result streaming Dataset).

| keyAs a| [[keyAs]]

[source, scala]
----
keyAs[L : Encoder]: KeyValueGroupedDataset[L, V]
----


| keys a| [[keys]]

[source, scala]
----
keys: Dataset[K]
----


| mapGroups a| [[mapGroups]]

[source, scala]
----
mapGroups[U : Encoder](f: (K, Iterator[V]) => U): Dataset[U]
----

| spark-sql-streaming-KeyValueGroupedDataset-mapGroupsWithState.md[mapGroupsWithState] a| [[mapGroupsWithState]]

[source, scala]
----
mapGroupsWithState[S: Encoder, U: Encoder](
  func: (K, Iterator[V], GroupState[S]) => U): Dataset[U]
mapGroupsWithState[S: Encoder, U: Encoder](
  timeoutConf: GroupStateTimeout)(
  func: (K, Iterator[V], GroupState[S]) => U): Dataset[U]
----

Creates a new Dataset with FlatMapGroupsWithState logical operator

Note

The difference between mapGroupsWithState and flatMapGroupsWithState is that the state function of mapGroupsWithState generates exactly one element per group (that is in turn a row in the result Dataset).

| mapValues a| [[mapValues]]

[source, scala]
----
mapValues[W : Encoder](func: V => W): KeyValueGroupedDataset[K, W]
----

| reduceGroups a| [[reduceGroups]]

[source, scala]
----
reduceGroups(f: (V, V) => V): Dataset[(K, V)]
----

|===
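
For the batch side, the following sketch exercises count, mapValues with reduceGroups, and agg on a static Dataset. The Sale case class and the sample data are assumptions for illustration, not from the original text.

[source, scala]
----
import org.apache.spark.sql.functions.sum
import spark.implicits._

// Hypothetical sample dataset
case class Sale(city: String, amount: Long)
val sales = Seq(
  Sale("Warsaw", 100), Sale("Warsaw", 200), Sale("Krakow", 50)).toDS
val byCity = sales.groupByKey(_.city)

// count(): number of sales per city
byCity.count().show

// mapValues + reduceGroups: total amount per city
byCity.mapValues(_.amount).reduceGroups(_ + _).show

// agg: typed aggregation over an aggregate function turned into a TypedColumn
byCity.agg(sum($"amount").as[Long]).show
----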

=== [[creating-instance]] Creating KeyValueGroupedDataset Instance

KeyValueGroupedDataset takes the following when created:

* [[kEncoder]] Encoder for keys
* [[vEncoder]] Encoder for values
* [[queryExecution]] QueryExecution
* [[dataAttributes]] Data attributes
* [[groupingAttributes]] Grouping attributes