
StateStoreRDD

StateStoreRDD is an RDD for executing the store update function with a StateStore (and data from partitions of the data RDD).

StateStoreRDD is created for stateful physical operators (using StateStoreOps.mapPartitionsWithStateStore).

StateStoreRDD, Physical and Logical Plans, and operators

StateStoreRDD uses StateStoreCoordinator for the preferred locations of a partition for job scheduling.

StateStoreRDD and StateStoreCoordinator

[[getPartitions]] getPartitions returns exactly the partitions of the data RDD.

Computing Partition

compute(
  partition: Partition,
  ctxt: TaskContext): Iterator[U]

compute is part of the RDD abstraction.

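compute computes the given partition of the data RDD and passes the result on to the store update function (with a configured StateStore).

Internally (and similarly to getPreferredLocations), compute creates a StateStoreProviderId with a StateStoreId (using the checkpoint directory, the operator ID and the index of the input partition) and the run ID of the streaming query.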

compute then requests StateStore for the store for the StateStoreProviderId.

In the end, compute computes the data RDD (using the input partition and ctxt) and then executes the store update function (with the store and the result).
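The flow described above can be sketched with simplified stand-in types. None of the classes below are Spark's own: `StoreId`, `ProviderId`, `SimpleStore` and `computeSketch` are hypothetical names that only mirror the shape of the steps (build an ID from the checkpoint directory, operator ID, partition index and run ID, look up a store for that ID, then hand the store and the partition's rows to the update function).

```scala
import java.util.UUID
import scala.collection.mutable

object ComputeSketch {
  // Hypothetical stand-ins mirroring the shape of StateStoreId / StateStoreProviderId.
  final case class StoreId(checkpointLocation: String, operatorId: Long, partitionId: Int)
  final case class ProviderId(storeId: StoreId, queryRunId: UUID)

  // A toy in-memory store; it has none of the versioning of a real state store.
  final class SimpleStore {
    private val data = mutable.Map.empty[String, Long]
    def get(key: String): Option[Long] = data.get(key)
    def put(key: String, value: Long): Unit = data.update(key, value)
  }

  // One store per provider ID, standing in for the store lookup step.
  private val stores = mutable.Map.empty[ProviderId, SimpleStore]
  def storeFor(id: ProviderId): SimpleStore = stores.getOrElseUpdate(id, new SimpleStore)

  def computeSketch[T, U](
      checkpointLocation: String,
      operatorId: Long,
      queryRunId: UUID,
      partitionIndex: Int,
      partitionRows: Iterator[T],
      storeUpdateFunction: (SimpleStore, Iterator[T]) => Iterator[U]): Iterator[U] = {
    // Step 1: build the provider ID for this partition.
    val providerId =
      ProviderId(StoreId(checkpointLocation, operatorId, partitionIndex), queryRunId)
    // Step 2: look up the store for that ID.
    val store = storeFor(providerId)
    // Step 3: run the update function over the partition's rows with the store.
    storeUpdateFunction(store, partitionRows)
  }
}
```

In this sketch the rows of a partition are passed in as a plain iterator; in the real RDD they come from computing the same partition of the data RDD.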

=== [[getPreferredLocations]] Placement Preferences of Partition (Preferred Locations) -- getPreferredLocations Method

[source, scala]
----
getPreferredLocations(partition: Partition): Seq[String]
----

NOTE: getPreferredLocations is a part of the RDD Contract to specify placement preferences (aka preferred task locations), i.e. where tasks should be executed to be as close to the data as possible.

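getPreferredLocations creates a StateStoreProviderId with a StateStoreId (using the checkpoint directory, the operator ID and the index of the input partition) and the run ID of the streaming query.

NOTE: The checkpoint directory and the operator ID are shared across different partitions, so the only difference between StateStoreProviderIds is the partition index.

In the end, getPreferredLocations requests the StateStoreCoordinatorRef (if defined) for the location of the state store for the StateStoreProviderId.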
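The lookup can be illustrated with a minimal sketch. It assumes a toy coordinator backed by an immutable map; `ToyCoordinator`, `StoreId`, `ProviderId` and `preferredLocations` are hypothetical names, not Spark classes. The point it shows is that IDs for two partitions differ only in the partition index, and that a missing coordinator or an unknown store yields no placement preference.

```scala
import java.util.UUID

object PreferredLocationsSketch {
  // Hypothetical stand-ins mirroring the shape of StateStoreId / StateStoreProviderId.
  final case class StoreId(checkpointLocation: String, operatorId: Long, partitionId: Int)
  final case class ProviderId(storeId: StoreId, queryRunId: UUID)

  // A toy coordinator that knows where some stores are currently loaded.
  final class ToyCoordinator(locations: Map[ProviderId, String]) {
    def getLocation(id: ProviderId): Option[String] = locations.get(id)
  }

  def preferredLocations(
      coordinator: Option[ToyCoordinator],
      checkpointLocation: String,
      operatorId: Long,
      queryRunId: UUID,
      partitionIndex: Int): Seq[String] = {
    // Only partitionIndex varies between partitions of the same operator.
    val id = ProviderId(StoreId(checkpointLocation, operatorId, partitionIndex), queryRunId)
    // No coordinator, or no known location, means an empty preference list.
    coordinator.flatMap(_.getLocation(id)).toSeq
  }
}
```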

Creating Instance

StateStoreRDD takes the following to be created:

  • [[dataRDD]] Data RDD (RDD[T] to update the aggregates in a state store)
  • [[storeUpdateFunction]] Store update function ((StateStore, Iterator[T]) => Iterator[U] where T is the type of rows in the data RDD)
  • [[checkpointLocation]] Checkpoint directory
  • [[queryRunId]] Run ID of the streaming query
  • [[operatorId]] Operator ID
  • [[storeVersion]] Version of the store
  • [[keySchema]] Key schema - schema of the keys
  • [[valueSchema]] Value schema - schema of the values
  • [[indexOrdinal]] Index
  • [[sessionState]] SessionState
  • [[storeCoordinator]] Optional StateStoreCoordinatorRef
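To make the shape of the store update function concrete, here is a minimal sketch of a function with the `(store, Iterator[T]) => Iterator[U]` signature. It assumes a mutable map as a toy store (`ToyStore` is a hypothetical alias, not Spark's StateStore API) and keeps a running sum per key.

```scala
import scala.collection.mutable

object UpdateFunctionSketch {
  // Hypothetical toy store keyed by String; not Spark's StateStore API.
  type ToyStore = mutable.Map[String, Long]

  // T = (String, Long) input rows, U = (String, Long) updated totals.
  val sumByKey: (ToyStore, Iterator[(String, Long)]) => Iterator[(String, Long)] =
    (store, rows) =>
      rows.map { case (key, amount) =>
        // Read the previous total, add the new amount, and write it back.
        val total = store.getOrElse(key, 0L) + amount
        store.update(key, total)
        key -> total
      }
}
```

Because the result is a lazy iterator, the toy store in this sketch is only updated as the output rows are consumed.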

=== [[internal-properties]] Internal Properties

[cols="30m,70",options="header",width="100%"]
|===
| Name
| Description

| hadoopConfBroadcast
| [[hadoopConfBroadcast]]

| storeConf
| [[storeConf]] Configuration parameters (as StateStoreConf) using the current SQLConf (from SessionState)
|===