Skip to content

Micro-Batch Stream Processing

Micro-Batch Stream Processing is a stream processing model in Spark Structured Streaming that is used for streaming queries with Trigger.Once and Trigger.ProcessingTime triggers.

Micro-batch stream processing uses MicroBatchExecution stream execution engine.

Micro-batch stream processing supports MicroBatchStream data sources.

Micro-batch stream processing is often referred to as Structured Streaming V1.

import org.apache.spark.sql.streaming.Trigger
import scala.concurrent.duration._
val sq = spark
  .readStream
  .format("rate")
  .load
  .writeStream
  .format("console")
  .option("truncate", false)
  .trigger(Trigger.ProcessingTime(1.minute)) // <-- Uses MicroBatchExecution for execution
  .queryName("rate2console")
  .start

assert(sq.isActive)

scala> sq.explain
== Physical Plan ==
WriteToDataSourceV2 org.apache.spark.sql.execution.streaming.sources.MicroBatchWriter@678e6267
+- *(1) Project [timestamp#54, value#55L]
   +- *(1) ScanV2 rate[timestamp#54, value#55L]

// sq.stop

=== [[execution-phases]] Execution Phases (Processing Cycle)

Once <> stream processing engine is requested to <>, the query execution goes through the following execution phases every trigger:

. [[triggerExecution]] <> . <> for Sources or <> for <> . <> . <> . <> . <> . <>

Execution phases with execution times are available using <> under durationMs.

scala> :type sq
org.apache.spark.sql.streaming.StreamingQuery
sq.lastProgress.durationMs.get("walCommit")

Tip

Enable INFO logging level for StreamExecution logger to be notified about durations.

17/08/11 09:04:17 INFO StreamExecution: Streaming query made progress: {
  "id" : "ec8f8228-90f6-4e1f-8ad2-80222affed63",
  "runId" : "f605c134-cfb0-4378-88c1-159d8a7c232e",
  "name" : "rates-to-console",
  "timestamp" : "2017-08-11T07:04:17.373Z",
  "batchId" : 0,
  "numInputRows" : 0,
  "processedRowsPerSecond" : 0.0,
  "durationMs" : {          // <-- Durations (in millis)
    "addBatch" : 38,
    "getBatch" : 1,
    "getOffset" : 0,
    "queryPlanning" : 1,
    "triggerExecution" : 62,
    "walCommit" : 19
  },

Monitoring

MicroBatchExecution posts events to announce when a streaming query is started and stopped as well as after every micro-batch. StreamingQueryListener interface can be used to intercept the events and act accordingly.

After <> MicroBatchExecution is requested to finish up a streaming batch (trigger) and generate a StreamingQueryProgress (with execution statistics).

MicroBatchExecution prints out the following DEBUG message to the logs:

Execution stats: [executionStats]

MicroBatchExecution posts a QueryProgressEvent with the StreamingQueryProgress and prints out the following INFO message to the logs:

Streaming query made progress: [newProgress]