FileStreamSink can only be used with Append output mode.
Learn more in Demo: Deep Dive into FileStreamSink.
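For example, the following streaming query writes out to files using FileStreamSink (a minimal sketch; the rate source, paths and trigger are illustrative only, and `spark` is assumed to be a SparkSession as in spark-shell):

```scala
import org.apache.spark.sql.streaming.Trigger

// File-based sinks are selected by format name (parquet, orc, json, csv, text).
// FileStreamSink requires Append output mode.
val query = spark.readStream
  .format("rate")
  .load()
  .writeStream
  .format("parquet")
  .option("path", "/tmp/file-sink-demo")                     // illustrative output path
  .option("checkpointLocation", "/tmp/file-sink-demo-state") // illustrative checkpoint
  .outputMode("append")
  .trigger(Trigger.ProcessingTime("10 seconds"))
  .start()
```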
FileStreamSink takes the following to be created:
- SparkSession
- Path of the root directory
- FileFormat
- Names of the Partition Columns (if any)
- Options (`Map[String, String]`)
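A sketch of how those arguments line up, assuming the internal classes are on the classpath (user code normally never instantiates FileStreamSink directly; DataSource does):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.datasources.FileFormat
import org.apache.spark.sql.execution.streaming.{FileStreamSink, Sink}

// Illustrative helper only: mirrors the constructor arguments listed above.
def makeSink(
    spark: SparkSession,
    path: String,
    format: FileFormat,
    partitionColumns: Seq[String],
    options: Map[String, String]): Sink =
  new FileStreamSink(spark, path, format, partitionColumns, options)
```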
FileStreamSink is created when DataSource is requested to create a streaming sink for FileFormat data sources.
## Metadata Log Directory
FileStreamSink uses the _spark_metadata directory (under the path) as the Metadata Log Directory to store metadata that indicates which files are valid and can be read (and to skip already-committed batches).
Metadata Log Directory is managed by FileStreamSinkLog.
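A quick way to see the metadata log directory is to list the sink's root path after a few batches (a sketch; the output path comes from the earlier example):

```scala
import org.apache.hadoop.fs.Path

val outputPath = new Path("/tmp/file-sink-demo") // illustrative path
val fs = outputPath.getFileSystem(spark.sparkContext.hadoopConfiguration)
// The _spark_metadata directory sits next to the data files.
fs.listStatus(outputPath).map(_.getPath.getName).foreach(println)
// _spark_metadata
// part-00000-...-c000.snappy.parquet
```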
## Hadoop Path of Metadata Log
The Hadoop Path of the metadata log is used for "adding" a batch.

FileStreamSink uses the path for the text representation (`toString`): `FileSink[path]`
"Adding" Batch of Data to Sink¶
```scala
addBatch(
  batchId: Long,
  data: DataFrame): Unit
```
With a newer batchId (greater than the latest committed batch ID), addBatch creates a FileCommitProtocol based on the spark.sql.streaming.commitProtocolClass configuration property.
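You can inspect that configuration property directly (a sketch; the default shown below is the built-in manifest-based commit protocol):

```scala
// Read the configuration property that addBatch uses to
// instantiate the FileCommitProtocol.
spark.conf.get("spark.sql.streaming.commitProtocolClass")
// org.apache.spark.sql.execution.streaming.ManifestFileCommitProtocol
```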
In the end, addBatch writes out the data using the FileFormatWriter.write workflow (with the FileCommitProtocol and a BasicWriteJobStatsTracker).
addBatch prints out the following INFO message to the logs when the given batchId is at or below the latest committed batch ID:

```text
Skipping already committed batch [batchId]
```
addBatch is a part of the Sink abstraction.
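For reference, the Sink contract is tiny (a simplified sketch; the actual trait lives in org.apache.spark.sql.execution.streaming):

```scala
import org.apache.spark.sql.DataFrame

trait Sink {
  // Adds a batch of data to the sink; a batch may be re-delivered
  // on recovery, so implementations must be idempotent per batchId.
  def addBatch(batchId: Long, data: DataFrame): Unit
}
```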
## basicWriteJobStatsTracker

basicWriteJobStatsTracker creates a BasicWriteJobStatsTracker with the basic metrics:
- number of written files
- bytes of written output
- number of output rows
- number of dynamic partitions
basicWriteJobStatsTracker is used when FileStreamSink is requested to addBatch.
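The tracked metrics surface in a streaming query's progress reports (a sketch; numOutputRows on the sink side is reported in Spark 3.x, with -1 meaning not reported):

```scala
// query is the running streaming query from the first example.
val progress = query.lastProgress
println(progress.sink.description)   // FileSink[/tmp/file-sink-demo]
println(progress.sink.numOutputRows) // rows written out in the last batch
```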
## hasMetadata

```scala
hasMetadata(
  path: Seq[String],
  hadoopConf: Configuration): Boolean
```

hasMetadata is used (to short-circuit listing files using MetadataLogFileIndex instead of using the HDFS API) when:
- DataSource (Spark SQL) is requested to resolve a FileFormat-based relation
- FileTable (Spark SQL) is requested for a PartitioningAwareFileIndex
- FileStreamSource is requested to fetchAllFiles
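hasMetadata can also be called directly to check whether a directory was written by a FileStreamSink (a sketch against the two-argument signature shown above; newer Spark versions add extra arguments):

```scala
import org.apache.spark.sql.execution.streaming.FileStreamSink

val hadoopConf = spark.sparkContext.hadoopConfiguration
// true when /tmp/file-sink-demo/_spark_metadata exists (illustrative path)
FileStreamSink.hasMetadata(Seq("/tmp/file-sink-demo"), hadoopConf)
```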
## Logging

Enable ALL logging level for org.apache.spark.sql.execution.streaming.FileStreamSink logger to see what happens inside.

Add the following line to conf/log4j.properties:
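```text
log4j.logger.org.apache.spark.sql.execution.streaming.FileStreamSink=ALL
```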
Refer to Logging.