
MemoryStream -- Streaming Reader for Micro-Batch Stream Processing

MemoryStream is a concrete streaming source of the memory data source that supports <> in <>.

[[logging]]
[TIP]
====
Enable ALL logging level for org.apache.spark.sql.execution.streaming.MemoryStream logger to see what happens inside.

Add the following line to conf/:

log4j.logger.org.apache.spark.sql.execution.streaming.MemoryStream=ALL

Refer to <>.
====

=== [[creating-instance]] Creating MemoryStream Instance

MemoryStream takes the following to be created:

* [[id]] ID
* [[sqlContext]] SQLContext

MemoryStream initializes the <>.

=== [[apply]] Creating MemoryStream Instance -- apply Object Factory

[source, scala]
----
apply[A : Encoder](implicit sqlContext: SQLContext): MemoryStream[A]
----

apply uses a memoryStreamId internal counter to <> with a unique <> and the implicit SQLContext.
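As a minimal sketch (the local SparkSession setup and names like `spark` and `sqlCtx` are assumptions of the demo, not part of MemoryStream), the factory can be exercised like this:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.streaming.MemoryStream

// assumed local session for the demo
val spark = SparkSession.builder.master("local[*]").appName("MemoryStreamDemo").getOrCreate()
import spark.implicits._                // provides the implicit Encoder[Int]
implicit val sqlCtx = spark.sqlContext  // the implicit SQLContext that apply requires

// every apply call draws from the memoryStreamId counter for a unique id
val ints = MemoryStream[Int]
```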

=== [[addData]] Adding Data to Source -- addData Method

[source, scala]
----
addData(data: TraversableOnce[A]): Offset
----

addData adds the given data to the <> internal registry.

Internally, addData prints out the following DEBUG message to the logs:

Adding: [data]

In the end, addData increments the <> and adds the data to the <> internal registry.
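The steps above can be sketched end to end (the session setup is an assumption of the demo; the varargs overload shown here delegates to addData(data: TraversableOnce[A])):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.streaming.MemoryStream

val spark = SparkSession.builder.master("local[*]").appName("AddDataDemo").getOrCreate()
import spark.implicits._
implicit val sqlCtx = spark.sqlContext

val ints = MemoryStream[Int]

// registers the rows with the source and returns the Offset
// of the batch the data landed in
val offset = ints.addData(1, 2, 3)
```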

=== [[getBatch]] Generating Next Streaming Batch -- getBatch Method

getBatch is part of the Source abstraction.

When executed, getBatch uses the internal <> collection to return the rows for the requested offsets.

You should see the following DEBUG message in the logs:

DEBUG MemoryStream: MemoryBatch [[startOrdinal], [endOrdinal]]: [newBlocks]
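getBatch is driven by the streaming engine, not called directly. A hedged sketch (session setup and the query name `out` are assumptions) of a micro-batch round trip that makes the engine request a batch:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.streaming.MemoryStream

val spark = SparkSession.builder.master("local[*]").appName("GetBatchDemo").getOrCreate()
import spark.implicits._
implicit val sqlCtx = spark.sqlContext

val ints = MemoryStream[Int]
val query = ints.toDS.writeStream.format("memory").queryName("out").start()

ints.addData(1, 2, 3)        // queue a batch in the source
query.processAllAvailable()  // the engine fetches it via getBatch

val count = spark.table("out").count  // the 3 rows of the processed batch
query.stop()
```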

=== [[logicalPlan]] Logical Plan -- logicalPlan Internal Property

[source, scala]
----
logicalPlan: LogicalPlan
----

logicalPlan is part of the MemoryStreamBase abstraction.

logicalPlan is simply a StreamingExecutionRelation (for this memory source and the attributes).

MemoryStream uses StreamingExecutionRelation logical plan to build Datasets or DataFrames when requested.

[source, scala]
----
scala> val ints = MemoryStream[Int]
ints: org.apache.spark.sql.execution.streaming.MemoryStream[Int] = MemoryStream[value#13]

scala> ints.toDS.queryExecution.logical.isStreaming
res14: Boolean = true

scala> ints.toDS.queryExecution.logical
res15: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan = MemoryStream[value#13]
----

=== [[schema]] Schema -- schema Method

MemoryStream works with the data of the schema as described by the Encoder (of the Dataset).
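To illustrate the encoder-derived schema (the `Person` case class and the session setup are hypothetical, introduced only for this demo):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.streaming.MemoryStream

// hypothetical record type for the demo
case class Person(name: String, age: Int)

val spark = SparkSession.builder.master("local[*]").appName("SchemaDemo").getOrCreate()
import spark.implicits._
implicit val sqlCtx = spark.sqlContext

val people = MemoryStream[Person]

// the schema comes from the implicit Encoder[Person]
val fields = people.toDS.schema.fieldNames  // Array("name", "age")
```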

=== [[toString]] Textual Representation -- toString Method

[source, scala]
----
toString: String
----

NOTE: toString is part of the java.lang.Object contract for the string representation of the object.

toString uses the <> to return the following textual representation:

MemoryStream[[output]]

=== [[planInputPartitions]] Plan Input Partitions -- planInputPartitions Method

[source, scala]
----
planInputPartitions(): java.util.List[InputPartition[InternalRow]]
----

NOTE: planInputPartitions is part of the DataSourceReader contract in Spark SQL for the number of InputPartitions to use as RDD partitions (when DataSourceV2ScanExec physical operator is requested for the partitions of the input RDD).


planInputPartitions prints out a DEBUG message to the logs with the <> (with the batches after the <>).


=== [[generateDebugString]] generateDebugString Internal Method

[source, scala]
----
generateDebugString(rows: Seq[UnsafeRow], startOrdinal: Int, endOrdinal: Int): String
----

generateDebugString resolves and binds the encoder for the data.

In the end, generateDebugString returns the following string:

MemoryBatch [[startOrdinal], [endOrdinal]]: [rows]

NOTE: generateDebugString is used exclusively when MemoryStream is requested to <>.

=== [[internal-properties]] Internal Properties

[cols="30m,70",options="header",width="100%"]
|===
| Name
| Description

| batches a| [[batches]] Batch data (ListBuffer[Array[UnsafeRow]])

| currentOffset a| [[currentOffset]] Current offset

| lastOffsetCommitted a| [[lastOffsetCommitted]] Last committed offset

| output a| [[output]] Output schema (Seq[Attribute]) of the logical query plan

Used exclusively for <>

|===