MemorySink is a streaming sink that stores batches of streaming data in memory (in the <<batches, batches>> internal registry).
MemorySink is intended only for testing or demos.
MemorySink is used for the memory format and requires a query name (set using the queryName method or the queryName option).
MemorySink was introduced in https://github.com/apache/spark/pull/12119[pull request for SPARK-14288 (Memory Sink for streaming)].
Use toDebugString to see the batches.
Its aim is to allow users to test streaming applications in the Spark shell or other local tests.
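For illustration, a minimal Spark shell session could look like the following (a sketch only; the rate source and the rate_memory query name are arbitrary choices, and the code assumes a Spark shell where the spark session is predefined):

```scala
// Sketch of using the memory sink in the Spark shell (requires a running shell).
val query = spark.readStream
  .format("rate")            // any streaming source works; rate is built in
  .load
  .writeStream
  .format("memory")          // use MemorySink
  .queryName("rate_memory")  // required; also names the temporary table
  .start

// Query the sink's in-memory data as a regular (non-streaming) table
spark.sql("SELECT * FROM rate_memory").show(truncate = false)

query.stop()
```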
You can set the checkpoint location using the checkpointLocation option, or it falls back to the spark.sql.streaming.checkpointLocation property.
When spark.sql.streaming.checkpointLocation is set, the code uses it (with the query name as a subdirectory) as the checkpoint directory. Finally, when no spark.sql.streaming.checkpointLocation is set, a temporary directory under java.io.tmpdir is used, with an offsets subdirectory inside.
NOTE: The directory is cleaned up at shutdown using a shutdown hook.
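The resolution order above can be sketched as a small helper (all names here are illustrative, a simplified model of the behavior rather than Spark's actual code):

```scala
// Sketch of the checkpoint-location fallback described above.
// Function and parameter names are hypothetical, not Spark's internal API.
def resolveCheckpointLocation(
    checkpointLocationOption: Option[String], // from the option method
    confCheckpointLocation: Option[String],   // spark.sql.streaming.checkpointLocation
    queryName: String,
    tmpDir: String): String =
  checkpointLocationOption
    .orElse(confCheckpointLocation.map(loc => s"$loc/$queryName"))
    .getOrElse(s"$tmpDir/$queryName") // temporary-directory fallback

// Explicit option wins over the property
assert(resolveCheckpointLocation(Some("/ckp/a"), Some("/ckp/b"), "q", "/tmp") == "/ckp/a")
// Property plus query name when no option is given
assert(resolveCheckpointLocation(None, Some("/ckp/b"), "q", "/tmp") == "/ckp/b/q")
// Temporary directory when neither is set
assert(resolveCheckpointLocation(None, None, "q", "/tmp") == "/tmp/q")
```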
The code creates a MemorySink instance based on the schema of the DataFrame it operates on. It then creates a new DataFrame using the MemorySink instance created earlier and registers it as a temporary table (using the spark-sql-dataframe.md#registerTempTable[DataFrame.registerTempTable] method).
NOTE: At this point you can query the table as if it were a regular non-streaming table using spark-sql-sqlcontext.md#sql[sql] method.
A new StreamingQuery.md[StreamingQuery] is started (using StreamingQueryManager.startQuery) and returned.
[[logging]]
[TIP]
====
Enable ALL logging level for org.apache.spark.sql.execution.streaming.MemorySink logger to see what happens inside.

Add the following line to conf/log4j.properties:

 log4j.logger.org.apache.spark.sql.execution.streaming.MemorySink=ALL

Refer to spark-logging.md[Logging].
====
=== [[creating-instance]] Creating MemorySink Instance
MemorySink takes the following to be created:
- [[schema]] Output schema
- [[outputMode]] OutputMode
MemorySink initializes the <<batches, batches>> internal registry when created.
=== [[batches]] In-Memory Buffer of Streaming Batches -- batches Internal Property
batches holds data from streaming batches that have been <<addBatch, added>> (committed) to this sink.
For Complete output mode,
batches holds rows from the last batch only.
batches can be cleared (emptied) using the <<clear, clear>> method.
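To make the buffer semantics concrete, here is a minimal, self-contained sketch (not Spark's actual implementation; the class and method names are illustrative) of how such a buffer behaves for Append/Update versus Complete output modes:

```scala
import scala.collection.mutable.ArrayBuffer

// Illustrative model of MemorySink's batch buffer (not the real class).
class BatchBuffer[T](completeMode: Boolean) {
  private val batches = ArrayBuffer.empty[Seq[T]]

  def addBatch(rows: Seq[T]): Unit = {
    if (completeMode) batches.clear() // Complete keeps the last batch only
    batches += rows
  }

  def allRows: Seq[T] = batches.flatten.toSeq
  def clear(): Unit = batches.clear()
}

val append = new BatchBuffer[Int](completeMode = false)
append.addBatch(Seq(1, 2)); append.addBatch(Seq(3))
assert(append.allRows == Seq(1, 2, 3)) // Append/Update: batches accumulate

val complete = new BatchBuffer[Int](completeMode = true)
complete.addBatch(Seq(1, 2)); complete.addBatch(Seq(3))
assert(complete.allRows == Seq(3)) // Complete: only the last batch survives

complete.clear()
assert(complete.allRows.isEmpty) // clear empties the buffer
```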
=== [[addBatch]] Adding Batch of Data to Sink -- addBatch Method

[source, scala]
----
addBatch(
  batchId: Long,
  data: DataFrame): Unit
----
addBatch branches off based on whether the given batchId has already been <<addBatch-committed, committed>> or not. A batch ID is considered not committed when it is greater than the latest registered batch ID (or when no batch has been added yet).
addBatch is part of the Sink abstraction.
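A minimal sketch of that commit check (illustrative standalone code; the real MemorySink tracks the latest batch ID internally):

```scala
// Illustrative commit check: a batch is new (not committed) when its ID is
// greater than the latest batch ID seen so far, or when nothing was added yet.
def notCommitted(batchId: Long, latestBatchId: Option[Long]): Boolean =
  latestBatchId.isEmpty || batchId > latestBatchId.get

assert(notCommitted(0, None))     // first batch is always new
assert(notCommitted(2, Some(1)))  // newer batch ID: add it
assert(!notCommitted(1, Some(1))) // same ID: already committed, skip
```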
==== [[addBatch-not-committed]] Batch Not Committed

With the given batchId not committed, addBatch prints out the following DEBUG message to the logs:

----
Committing batch [batchId] to [this]
----
addBatch collects records from the given DataFrame using the Dataset.collect operator.

For Append and Update output modes, addBatch adds the data (as AddedData) to the <<batches, batches>> internal registry.

For Complete output mode, addBatch clears the <<batches, batches>> internal registry first and then adds the data, so the registry holds the rows of the last batch only.
For any other output mode, addBatch reports an IllegalArgumentException:

----
Output mode [outputMode] is not supported by MemorySink
----
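The mode handling above can be sketched with a simple match (illustrative standalone code mirroring the described behavior, not MemorySink itself):

```scala
// Illustrative output-mode dispatch mirroring addBatch's branching.
sealed trait Mode
case object Append extends Mode
case object Update extends Mode
case object Complete extends Mode
case class Other(name: String) extends Mode

def addBatchAction(mode: Mode): String = mode match {
  case Append | Update => "append rows to batches"
  case Complete        => "clear batches, then add rows"
  case Other(name)     =>
    // mirrors: Output mode [outputMode] is not supported by MemorySink
    throw new IllegalArgumentException(
      s"Output mode $name is not supported by MemorySink")
}

assert(addBatchAction(Append) == "append rows to batches")
assert(addBatchAction(Complete) == "clear batches, then add rows")
assert(
  try { addBatchAction(Other("Foo")); false }
  catch { case _: IllegalArgumentException => true })
```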
==== [[addBatch-committed]] Batch Committed
With the given batchId already committed, addBatch simply prints out the following DEBUG message to the logs and returns:

----
Skipping already committed batch: [batchId]
----
=== [[clear]] Clearing Up Internal Batch Buffer -- clear Method
clear simply removes (clears) all data from the <<batches, batches>> internal registry.
clear is used exclusively in tests.