Continuous Stream Processing
Continuous Stream Processing is a stream processing engine in Spark Structured Streaming that executes structured streaming queries with the Trigger.Continuous trigger.
The Continuous Stream Processing execution engine uses the new Data Source API V2 (Spark SQL) and, for the first time, makes stream processing truly continuous (not micro-batch).
Because of these two changes, Continuous Stream Processing is often referred to as Structured Streaming V2.
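To make the contrast with micro-batch execution concrete, here is a minimal sketch of how the trigger alone selects the execution mode. The `TriggerChoice` object and its `triggerFor` method are hypothetical helpers introduced only for illustration; `Trigger.Continuous` and `Trigger.ProcessingTime` are the public Spark API. Note that the interval given to `Trigger.Continuous` is a checkpoint interval, not a batch interval.

```scala
import org.apache.spark.sql.streaming.Trigger

// Hypothetical helper (not part of Spark): picks a trigger for a streaming query.
object TriggerChoice {
  def triggerFor(continuous: Boolean, interval: String): Trigger =
    if (continuous) {
      // Continuous mode: rows are processed as they arrive and
      // `interval` only controls how often progress is checkpointed.
      Trigger.Continuous(interval)
    } else {
      // Micro-batch mode: `interval` controls how often a new batch is started.
      Trigger.ProcessingTime(interval)
    }
}
```

The returned trigger would be passed to `DataStreamWriter.trigger`, e.g. `.trigger(TriggerChoice.triggerFor(continuous = true, "15 seconds"))`.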
Under the covers, Continuous Stream Processing uses the ContinuousExecution stream execution engine. When requested to run an activated streaming query,
ContinuousExecution adds the WriteToContinuousDataSourceExec physical operator as the top-level operator in the physical query plan of the streaming query.
    scala> :type sq
    org.apache.spark.sql.streaming.StreamingQuery

    scala> sq.explain
    == Physical Plan ==
    WriteToContinuousDataSource ConsoleWriter[numRows=20, truncate=false]
    +- *(1) Project [timestamp#758, value#759L]
       +- *(1) ScanV2 rate[timestamp#758, value#759L]
From now on, you may think of a streaming query as a soon-to-be-generated ContinuousWriteRDD - an RDD data structure that Spark developers use to describe a distributed computation.
When the streaming query is started (and the top-level
WriteToContinuousDataSourceExec physical operator is requested to execute), it simply requests the underlying
ContinuousWriteRDD to collect.
That collect action is what runs a Spark job (as tasks over all partitions of the RDD), as described by the ContinuousWriteRDD.compute "protocol" (a recipe for the tasks that are scheduled to run on Spark executors).
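The "one task per partition" behaviour of collect can be observed with any RDD. The following is a minimal sketch that uses a plain parallelized collection as a stand-in (it is not a ContinuousWriteRDD, and the `CollectSketch` object is a hypothetical name used only here). It assumes Spark is on the classpath and runs in local mode.

```scala
import org.apache.spark.sql.SparkSession

// Illustration only: an ordinary RDD standing in for ContinuousWriteRDD.
object CollectSketch {
  // Returns (partitionId, numberOfRows) pairs, one pair per task.
  def rowsPerPartition(spark: SparkSession, numPartitions: Int): Array[(Int, Int)] = {
    val rdd = spark.sparkContext.parallelize(1 to 8, numSlices = numPartitions)
    rdd
      // The function below is the per-partition "recipe" that every task runs.
      .mapPartitionsWithIndex { (partitionId, rows) => Iterator((partitionId, rows.size)) }
      // collect is the action that submits the Spark job.
      .collect()
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder
      .master("local[2]")
      .appName("collect-sketch")
      .getOrCreate()
    try {
      rowsPerPartition(spark, numPartitions = 4).foreach(println)
    } finally {
      spark.stop()
    }
  }
}
```

With 4 partitions the job runs 4 tasks, and the web UI shows them under a single job, much like the job of a continuous streaming query.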
Figure: Creating Instance of StreamExecution (images/webui-spark-job-streaming-query-started.png)
While the tasks are computing partitions (of the
ContinuousWriteRDD), they keep running until killed or completed. And that's the ingenious design trick of how the streaming query (as a Spark job with the distributed tasks running on executors) runs continuously and indefinitely.
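The idea of tasks that keep running until killed can be modelled without Spark at all. The following is a toy sketch in plain Scala, under the assumption that a thread pool stands in for executors and a shared flag stands in for killing the job. None of the names (`LongRunningTasksSketch`, `PartitionTask`, `runJob`) exist in Spark.

```scala
import java.util.concurrent.{Executors, TimeUnit}
import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong}

// Toy model only: a thread pool plays the role of Spark executors.
object LongRunningTasksSketch {

  // One task per partition. It loops until it is asked to stop,
  // the way a continuous task keeps computing its partition.
  final class PartitionTask(val partitionId: Int, stopped: AtomicBoolean) extends Runnable {
    val rowsWritten = new AtomicLong(0)

    override def run(): Unit = {
      while (!stopped.get()) {
        rowsWritten.incrementAndGet() // stand-in for "read a row, write a row"
        Thread.sleep(1)
      }
    }
  }

  /** Starts `numPartitions` tasks, lets them run for about `runForMs`, then stops them. */
  def runJob(numPartitions: Int, runForMs: Long): Seq[Long] = {
    val stopped = new AtomicBoolean(false)
    val pool = Executors.newFixedThreadPool(numPartitions)
    val tasks = (0 until numPartitions).map(id => new PartitionTask(id, stopped))

    tasks.foreach(task => pool.submit(task))
    Thread.sleep(runForMs) // the "job" stays active for as long as nobody stops it
    stopped.set(true)      // stand-in for stopping the streaming query

    pool.shutdown()
    pool.awaitTermination(5, TimeUnit.SECONDS)
    tasks.map(_.rowsWritten.get())
  }
}
```

Calling `LongRunningTasksSketch.runJob(numPartitions = 2, runForMs = 200)` returns one row count per task. The tasks never finish on their own, which is the property that lets a single job represent an indefinitely running query.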
When DataStreamReader is requested to create a streaming query for a ContinuousReadSupport data source, it creates...FIXME
    import org.apache.spark.sql.streaming.Trigger
    import scala.concurrent.duration._
    val sq = spark
      .readStream
      .format("rate")
      .load
      .writeStream
      .format("console")
      .option("truncate", false)
      .trigger(Trigger.Continuous(15.seconds)) // <-- Uses ContinuousExecution for execution
      .queryName("rate2console")
      .start

    scala> :type sq
    org.apache.spark.sql.streaming.StreamingQuery

    assert(sq.isActive)

    // sq.stop