ForeachBatchSink is a streaming sink that represents the DataStreamWriter.foreachBatch streaming operator at runtime.

Type Constructor

ForeachBatchSink[T] is a Scala type constructor with the type parameter T.

ForeachBatchSink was added in Spark 2.4.0 as part of SPARK-24565 ("Add API for in Structured Streaming for exposing output rows of each microbatch as a DataFrame").

import org.apache.spark.sql.DataFrame
val q = spark.readStream
  .format("rate")
  .load()
  .writeStream
  .foreachBatch { (output: DataFrame, batchId: Long) => // <-- start() creates a ForeachBatchSink
    println(s"Batch ID: $batchId")
    output.show()
  }
  .start()
// q.stop()

scala> println(q.lastProgress.sink.description)
ForeachBatchSink

Creating Instance

ForeachBatchSink takes the following when created:

  • Batch Writer Function ((Dataset[T], Long) => Unit)
  • Encoder of type T (ExpressionEncoder[T])

ForeachBatchSink is created when DataStreamWriter is requested to start execution of the streaming query and its output format (held internally in the source property) is foreachBatch, which is what the DataStreamWriter.foreachBatch streaming operator sets.
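To make the two constructor arguments and the moment of creation concrete, here is a plain-Scala model rather than Spark code. Everything in it is hypothetical: EncoderModel stands in for ExpressionEncoder[T], Seq[T] stands in for Dataset[T], and createSink is a simplified picture of the format check done when the query starts, not DataStreamWriter.start itself.

```scala
object ForeachBatchCreationSketch {
  // Hypothetical stand-in for ExpressionEncoder[T]: builds a T from a row of named values
  final case class EncoderModel[T](fromRow: Map[String, Any] => T)

  // Hypothetical stand-in for Spark's Sink abstraction
  trait SinkModel

  // Mirrors the two constructor arguments of ForeachBatchSink[T];
  // Seq[T] stands in for Dataset[T]
  final class ForeachBatchSinkModel[T](
      val batchWriter: (Seq[T], Long) => Unit,
      val encoder: EncoderModel[T]) extends SinkModel {
    override def toString: String = "ForeachBatchSink"
  }

  // Hypothetical stand-in for any other sink format
  final case class OtherSinkModel(format: String) extends SinkModel

  // Simplified picture of what happens at start: the "foreachBatch" format
  // turns the user's function and the encoder into a ForeachBatchSink
  def createSink[T](
      format: String,
      batchWriter: Option[(Seq[T], Long) => Unit],
      encoder: EncoderModel[T]): SinkModel =
    format match {
      case "foreachBatch" =>
        val writer = batchWriter.getOrElse(
          throw new IllegalArgumentException("hypothetical: no foreachBatch function set"))
        new ForeachBatchSinkModel[T](writer, encoder)
      case other =>
        OtherSinkModel(other)
    }
}
```

The model only captures the dispatch on the format name; the real DataStreamWriter performs further checks before creating the sink.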

Adding Batch

addBatch(
  batchId: Long,
  data: DataFrame): Unit

addBatch requests the encoder to resolveAndBind (using the output of the analyzed logical plan of the given DataFrame), which creates a "resolved" encoder. addBatch then requests the resolved encoder to create a deserializer (to convert Spark SQL rows into objects of type T).
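Resolving binds the field names an encoder expects to positions in the batch's actual output, so the deserializer it produces can read each field by ordinal. Below is a plain-Scala sketch of that idea, assuming rows are Vector[Any] and the output schema is a list of column names. FieldEncoder, its resolveAndBind method and Reading are hypothetical simplifications, not the real signatures of ExpressionEncoder.resolveAndBind or createDeserializer.

```scala
object ResolveAndBindSketch {
  // Hypothetical simplified encoder: the field names a T needs, in order,
  // and a function that builds a T from those field values
  final case class FieldEncoder[T](fields: Seq[String], build: Seq[Any] => T) {

    // "Resolve" every field name against the batch's output columns and
    // "bind" it to a column ordinal, then return a deserializer that reads
    // positional rows using those ordinals
    def resolveAndBind(output: Seq[String]): Vector[Any] => T = {
      val ordinals: Seq[Int] = fields.map { name =>
        val i = output.indexOf(name)
        require(i >= 0, s"cannot resolve '$name' among ${output.mkString("[", ", ", "]")}")
        i
      }
      row => build(ordinals.map(i => row(i)))
    }
  }

  final case class Reading(id: Long, value: Double)

  // Example encoder for Reading (hypothetical)
  val readingEncoder: FieldEncoder[Reading] =
    FieldEncoder(
      Seq("id", "value"),
      values => Reading(values(0).asInstanceOf[Long], values(1).asInstanceOf[Double]))
}
```

Because binding is by name, the deserializer still works when the batch's columns arrive in a different order than the fields of T. Spark's real encoders also deal with nested types and nullability, which this sketch ignores.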

addBatch requests the QueryExecution (of the given DataFrame) for the RDD[InternalRow] (toRdd, which executes the physical query plan) and applies the map operator with the deserializer to convert the internal rows into objects of type T.

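At this point, the data of the given DataFrame is no longer a DataFrame but an RDD[InternalRow] (mapped to an RDD[T]). A side effect is that the logical and physical optimizations applied to the given DataFrame stop at this boundary: the Dataset passed to the batch writer function is built on top of an RDD, so the Catalyst optimizer cannot push operators used in the function (e.g. filters) down into the query that produced the batch.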

addBatch creates a new Dataset[T] (from the RDD[T] and the encoder) and executes the batchWriter function, passing in the Dataset and the batchId.

addBatch is a part of the Sink abstraction.
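The steps of addBatch can be modelled without Spark. In this sketch (all names hypothetical), a Batch plays the role of the given DataFrame (its output column names plus positional rows standing in for the RDD[InternalRow]), EncoderModel.resolveAndBind returns the deserializer, and a Vector[T] stands in for the new Dataset[T] passed to the batch writer function.

```scala
object AddBatchSketch {
  // Hypothetical stand-in for the given DataFrame: its output column names
  // plus positional rows (roughly the analyzed output and the RDD[InternalRow])
  final case class Batch(output: Seq[String], rows: Iterator[Vector[Any]])

  // Hypothetical encoder: resolving against an output schema yields a deserializer
  trait EncoderModel[T] {
    def resolveAndBind(output: Seq[String]): Vector[Any] => T
  }

  // Simplified model of ForeachBatchSink; Seq[T] stands in for Dataset[T]
  final class ForeachBatchSinkModel[T](
      batchWriter: (Seq[T], Long) => Unit,
      encoder: EncoderModel[T]) {

    def addBatch(batchId: Long, data: Batch): Unit = {
      // 1. Resolve the encoder against the batch's output to get a deserializer
      val fromRow = encoder.resolveAndBind(data.output)
      // 2. Convert every row into a T (the analogue of queryExecution.toRdd.map(...))
      val objects = data.rows.map(fromRow)
      // 3. Wrap the objects in a new collection and hand it to the user function
      //    (the analogue of creating a Dataset[T] and calling batchWriter(ds, batchId)).
      //    Materialized eagerly here for simplicity; Spark keeps the RDD lazy.
      batchWriter(objects.toVector, batchId)
    }

    override def toString: String = "ForeachBatchSink"
  }
}
```

A usage sketch: a writer that collects what it receives shows that it gets plain objects of type T together with the batch ID.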

Text Representation

ForeachBatchSink uses ForeachBatchSink as its text representation (toString).
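A minimal sketch of where that name ends up, assuming (as the q.lastProgress.sink.description line in the example relies on) that progress reporting builds a sink's description from its toString. SinkProgressModel and reportProgress are hypothetical stand-ins for Spark's SinkProgress and its progress reporting.

```scala
object TextRepresentationSketch {
  // Hypothetical stand-in for Spark's Sink abstraction
  trait SinkModel

  final class ForeachBatchSinkModel extends SinkModel {
    // The fixed text representation
    override def toString: String = "ForeachBatchSink"
  }

  // Hypothetical stand-in for SinkProgress: the description is the sink's toString
  final case class SinkProgressModel(description: String)

  def reportProgress(sink: SinkModel): SinkProgressModel =
    SinkProgressModel(sink.toString)
}
```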