== MemorySink

MemorySink is a streaming sink that <<addBatch, stores streaming batches>> in <<batches, memory>> so the data can be queried as a regular (in-memory) table.

MemorySink is intended only for testing or demos.

MemorySink is used for the memory output format and requires a query name (specified using the queryName method or the queryName option).

NOTE: MemorySink was introduced in the https://github.com/apache/spark/pull/12119[pull request for SPARK-14288 (Memory Sink for streaming)].

Use toDebugString to see the batches.

Its aim is to allow users to test streaming applications in the Spark shell or other local tests.
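
For example, a minimal spark-shell sketch (the rate source and the rateQuery name are just for illustration):

[source, scala]
----
// Stream the built-in rate source into the memory sink.
// queryName gives the name of the in-memory table to query later on.
val query = spark
  .readStream
  .format("rate")
  .load()
  .writeStream
  .format("memory")
  .queryName("rateQuery")
  .start()
----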

You can set checkpointLocation using the option method; otherwise it is derived from the spark.sql.streaming.checkpointLocation property.

If spark.sql.streaming.checkpointLocation is set, the $location/$queryName directory is used.

When spark.sql.streaming.checkpointLocation is not set either, a temporary memory.stream directory under java.io.tmpdir is used (with an offsets subdirectory inside).

NOTE: The directory is cleaned up at shutdown using ShutdownHookManager.registerShutdownDeleteDir.
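
A short sketch of both ways to provide the checkpoint location (df, myQuery and the paths are illustrative):

[source, scala]
----
// Per query, using the option method
df.writeStream
  .format("memory")
  .queryName("myQuery")
  .option("checkpointLocation", "/tmp/memory-sink-checkpoint")
  .start()

// Or globally, so a query uses $location/$queryName
spark.conf.set("spark.sql.streaming.checkpointLocation", "/tmp/checkpoints")
----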

When the streaming query is started, it creates a MemorySink instance based on the schema of the DataFrame it operates on.

It then creates a new DataFrame using MemoryPlan with the MemorySink instance created earlier and registers it as a temporary table (using the spark-sql-dataframe.md#registerTempTable[DataFrame.registerTempTable] method).

NOTE: At this point you can query the table as if it were a regular non-streaming table using spark-sql-sqlcontext.md#sql[sql] method.

A new StreamingQuery.md[StreamingQuery] is started (using StreamingQueryManager.startQuery) and returned.
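
Continuing the spark-shell sketch above, the table registered under the query name can be queried like any other table:

[source, scala]
----
// rateQuery is the queryName from the earlier sketch
spark.sql("SELECT * FROM rateQuery").show(truncate = false)

// equivalent
spark.table("rateQuery").show(truncate = false)
----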

[[logging]]
[TIP]
====
Enable ALL logging level for org.apache.spark.sql.execution.streaming.MemorySink logger to see what happens inside.

Add the following line to conf/log4j.properties:

----
log4j.logger.org.apache.spark.sql.execution.streaming.MemorySink=ALL
----

Refer to <>.
====

=== [[creating-instance]] Creating MemorySink Instance

MemorySink takes the following to be created:

* [[schema]] Output schema
* [[outputMode]] OutputMode

MemorySink initializes the <<batches, batches>> internal property.
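
A minimal sketch of creating a MemorySink directly, e.g. in a test (assuming the Spark 2.x internal constructor described here; being an internal class it may change between releases):

[source, scala]
----
import org.apache.spark.sql.execution.streaming.MemorySink
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.types.{LongType, StructField, StructType}

// The two constructor arguments: the output schema and the OutputMode
val schema = StructType(StructField("id", LongType) :: Nil)
val sink = new MemorySink(schema, OutputMode.Append())
----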

=== [[batches]] In-Memory Buffer of Streaming Batches -- batches Internal Property

[source, scala]
----
batches: ArrayBuffer[AddedData]
----

batches holds data from streaming batches that have been <<addBatch, added>> (written) to this sink.

For Append and Update output modes, batches holds rows from all batches.

For Complete output mode, batches holds rows from the last batch only.

batches can be cleared (emptied) using <<clear, clear>>.
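
A sketch of the difference, reusing the imports and schema from the creating-instance sketch above (allData is a Spark 2.x helper that flattens the batches buffer):

[source, scala]
----
// Append mode: rows from all batches are kept
val appendSink = new MemorySink(schema, OutputMode.Append())
appendSink.addBatch(0, spark.range(3).toDF())
appendSink.addBatch(1, spark.range(2).toDF())
appendSink.allData.size    // 5 rows (both batches)

// Complete mode: rows from the last batch only
val completeSink = new MemorySink(schema, OutputMode.Complete())
completeSink.addBatch(0, spark.range(3).toDF())
completeSink.addBatch(1, spark.range(2).toDF())
completeSink.allData.size  // 2 rows (last batch only)
----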

=== [[addBatch]] Adding Batch of Data to Sink -- addBatch Method

[source, scala]
----
addBatch(
  batchId: Long,
  data: DataFrame): Unit
----


addBatch branches off based on whether the given batchId has already been <<addBatch-committed, committed>> or <<addBatch-not-committed, not>>.

A batch ID is considered committed when the given batch ID is smaller than or equal to the latest batch ID added to this sink (if available).

addBatch is part of the Sink abstraction.

==== [[addBatch-not-committed]] Batch Not Committed

With the batchId not committed, addBatch prints out the following DEBUG message to the logs:

----
Committing batch [batchId] to [this]
----

addBatch collects records from the given data.

NOTE: addBatch uses Dataset.collect operator to collect records.

For Append and Update output modes, addBatch adds the data (as an AddedData) to the <<batches, batches>> internal registry.

For Complete output mode, addBatch first clears the <<batches, batches>> internal registry before adding the data (as an AddedData).

For any other output mode, addBatch throws an IllegalArgumentException:

----
Output mode [outputMode] is not supported by MemorySink
----

==== [[addBatch-committed]] Batch Committed

With the batchId committed, addBatch simply prints out the following DEBUG message to the logs and returns.

----
Skipping already committed batch: [batchId]
----
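
A sketch of the commit check, reusing the Append-mode sink created in the creating-instance sketch above:

[source, scala]
----
sink.addBatch(0, spark.range(3).toDF())  // not committed yet: rows are collected and added
sink.addBatch(0, spark.range(9).toDF())  // already committed: skipped
sink.addBatch(1, spark.range(2).toDF())  // new batch ID: rows are added

sink.allData.size            // 5 rows (batches 0 and 1)
println(sink.toDebugString)  // prints the batches

sink.clear()                 // empties the buffer (see below)
----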

=== [[clear]] Clearing Up Internal Batch Buffer -- clear Method

[source, scala]
----
clear(): Unit
----

clear simply removes (clears) all data from the <<batches, batches>> internal registry.

NOTE: clear is used exclusively in tests.
