Skip to content

PushDownPredicate -- Predicate Pushdown / Filter Pushdown Logical Optimization

PushDownPredicate is a base logical optimization that <>.

PushDownPredicate is part of the Operator Optimization before Inferring Filters fixed-point batch in the standard batches of the Logical Optimizer.

PushDownPredicate is simply a <> for transforming <>, i.e. Rule[LogicalPlan].

When you execute Dataset.md#where[where] or Dataset.md#filter[filter] operators right after loading a dataset, Spark SQL will try to push the where/filter predicate down to the data source using a corresponding SQL query with WHERE clause (or whatever the proper language for the data source is).

This optimization is called filter pushdown or predicate pushdown and aims at pushing down the filtering to the "bare metal", i.e. a data source engine. That is to increase the performance of queries since the filtering is performed at the very low level rather than dealing with the entire dataset after it has been loaded to Spark's memory and perhaps causing memory issues.

PushDownPredicate is also applied to structured queries with <> or <>.

=== [[select]] Pushing Filter Operator Down Using Projection

[source, scala]

val dataset = spark.range(2)

scala> dataset.select('id as "_id").filter('_id === 0).explain(extended = true) ... TRACE SparkOptimizer: === Applying Rule org.apache.spark.sql.catalyst.optimizer.PushDownPredicate === !Filter (_id#14L = cast(0 as bigint)) Project [id#11L AS _id#14L] !+- Project [id#11L AS _id#14L] +- Filter (id#11L = cast(0 as bigint)) +- Range (0, 2, step=1, splits=Some(8)) +- Range (0, 2, step=1, splits=Some(8)) ... == Parsed Logical Plan == 'Filter ('_id = 0) +- Project [id#11L AS _id#14L] +- Range (0, 2, step=1, splits=Some(8))

== Analyzed Logical Plan == _id: bigint Filter (_id#14L = cast(0 as bigint)) +- Project [id#11L AS _id#14L] +- Range (0, 2, step=1, splits=Some(8))

== Optimized Logical Plan == Project [id#11L AS _id#14L] +- Filter (id#11L = 0) +- Range (0, 2, step=1, splits=Some(8))

== Physical Plan == *Project [id#11L AS _id#14L] +- *Filter (id#11L = 0) +- *Range (0, 2, step=1, splits=Some(8))


=== [[windows]] Optimizing Window Aggregate Operators

[source, scala]

val dataset = spark.range(5).withColumn("group", 'id % 3) scala> dataset.show +---+-----+ | id|group| +---+-----+ | 0| 0| | 1| 1| | 2| 2| | 3| 0| | 4| 1| +---+-----+

import org.apache.spark.sql.expressions.Window val groupW = Window.partitionBy('group).orderBy('id)

// Filter out group 2 after window // No need to compute rank for group 2 // Push the filter down val ranked = dataset.withColumn("rank", rank over groupW).filter('group !== 2)

scala> ranked.queryExecution.optimizedPlan ... TRACE SparkOptimizer: === Applying Rule org.apache.spark.sql.catalyst.optimizer.PushDownPredicate === !Filter NOT (group#35L = cast(2 as bigint)) Project [id#32L, group#35L, rank#203] !+- Project [id#32L, group#35L, rank#203] +- Project [id#32L, group#35L, rank#203, rank#203] ! +- Project [id#32L, group#35L, rank#203, rank#203] +- Window [rank(id#32L) windowspecdefinition(group#35L, id#32L ASC, ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS rank#203], [group#35L], [id#32L ASC] ! +- Window [rank(id#32L) windowspecdefinition(group#35L, id#32L ASC, ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS rank#203], [group#35L], [id#32L ASC] +- Project [id#32L, group#35L] ! +- Project [id#32L, group#35L] +- Project [id#32L, (id#32L % cast(3 as bigint)) AS group#35L] ! +- Project [id#32L, (id#32L % cast(3 as bigint)) AS group#35L] +- Filter NOT ((id#32L % cast(3 as bigint)) = cast(2 as bigint)) +- Range (0, 5, step=1, splits=Some(8)) +- Range (0, 5, step=1, splits=Some(8)) ... res1: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan = Window [rank(id#32L) windowspecdefinition(group#35L, id#32L ASC, ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS rank#203], [group#35L], [id#32L ASC] +- Project [id#32L, (id#32L % 3) AS group#35L] +- Filter NOT ((id#32L % 3) = 2) +- Range (0, 5, step=1, splits=Some(8))


=== [[jdbc]] JDBC Data Source

TIP: Follow the instructions on how to set up PostgreSQL in exercises/spark-exercise-dataframe-jdbc-postgresql.md[Creating DataFrames from Tables using JDBC and PostgreSQL].

Given the following code:

// Start with the PostgreSQL driver on CLASSPATH

case class Project(id: Long, name: String, website: String)

// No optimizations for typed queries
// LOG:  execute <unnamed>: SELECT "id","name","website" FROM projects
val df = spark.read
  .format("jdbc")
  .option("url", "jdbc:postgresql:sparkdb")
  .option("dbtable", "projects")
  .load()
  .as[Project]
  .filter(_.name.contains("Spark"))

// Only the following would end up with the pushdown
val df = spark.read
  .format("jdbc")
  .option("url", "jdbc:postgresql:sparkdb")
  .option("dbtable", "projects")
  .load()
  .where("""name like "%Spark%"""")

PushDownPredicate translates the above query to the following SQL query:

LOG:  execute <unnamed>: SELECT "id","name","website" FROM projects WHERE (name LIKE '%Spark%')

[TIP]

Enable all logs in PostgreSQL to see the above SELECT and other query statements.

log_statement = 'all'

Add log_statement = 'all' to /usr/local/var/postgres/postgresql.conf on Mac OS X with PostgreSQL installed using brew.

Parquet Data Source

val spark: SparkSession = ...
import spark.implicits._

// paste it to REPL individually to make the following line work
case class City(id: Long, name: String)

import org.apache.spark.sql.SaveMode.Overwrite
Seq(
  City(0, "Warsaw"),
  City(1, "Toronto"),
  City(2, "London"),
  City(3, "Redmond"),
  City(4, "Boston")).toDF.write.mode(Overwrite).parquet("cities.parquet")

val cities = spark.read.parquet("cities.parquet").as[City]

// Using DataFrame's Column-based query
scala> cities.where('name === "Warsaw").queryExecution.executedPlan
res21: org.apache.spark.sql.execution.SparkPlan =
*Project [id#128L, name#129]
+- *Filter (isnotnull(name#129) && (name#129 = Warsaw))
   +- *FileScan parquet [id#128L,name#129] Batched: true, Format: ParquetFormat, InputPaths: file:/Users/jacek/dev/oss/spark/cities.parquet, PartitionFilters: [], PushedFilters: [IsNotNull(name), EqualTo(name,Warsaw)], ReadSchema: struct<id:bigint,name:string>

// Using SQL query
scala> cities.where("""name = "Warsaw"""").queryExecution.executedPlan
res23: org.apache.spark.sql.execution.SparkPlan =
*Project [id#128L, name#129]
+- *Filter (isnotnull(name#129) && (name#129 = Warsaw))
   +- *FileScan parquet [id#128L,name#129] Batched: true, Format: ParquetFormat, InputPaths: file:/Users/jacek/dev/oss/spark/cities.parquet, PartitionFilters: [], PushedFilters: [IsNotNull(name), EqualTo(name,Warsaw)], ReadSchema: struct<id:bigint,name:string>

// Using Dataset's strongly type-safe filter
// Why does the following not push the filter down?
scala> cities.filter(_.name == "Warsaw").queryExecution.executedPlan
res24: org.apache.spark.sql.execution.SparkPlan =
*Filter <function1>.apply
+- *FileScan parquet [id#128L,name#129] Batched: true, Format: ParquetFormat, InputPaths: file:/Users/jacek/dev/oss/spark/cities.parquet, PartitionFilters: [], PushedFilters: [], ReadSchema: struct<id:bigint,name:string>

Last update: 2021-03-31
Back to top