
FilterExec Unary Physical Operator

FilterExec is a unary physical operator that represents Filter and TypedFilter unary logical operators at execution time.

FilterExec supports Java code generation (aka codegen) as follows:

  • <> is an empty AttributeSet (to defer evaluation of attribute expressions until they are actually used, i.e. in the generated Java source code for the consume path)

  • Uses whatever the <> physical operator uses for the input RDDs

  • Generates Java source code for the <> and <> paths in whole-stage code generation
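The first bullet above is about laziness. Below is a rough plain-Scala analogy, not Spark code. Column values are computed on first access, and the predicate reads only one column. The other column is therefore computed only for rows that pass. `DeferredColumn` and `DeferredDemo` are hypothetical names used only for this sketch.

```scala
// Hypothetical analogy (not Spark code): a column value computed only on first access.
final class DeferredColumn[A](compute: () => A, onEval: () => Unit) {
  lazy val value: A = { onEval(); compute() }
}

object DeferredDemo {
  var evaluations = 0 // how many column values were actually computed

  private def column[A](compute: () => A): DeferredColumn[A] =
    new DeferredColumn(compute, () => evaluations += 1)

  // A "row" with two columns; the predicate below reads only the first one.
  def row(id: Int): (DeferredColumn[Int], DeferredColumn[String]) =
    (column(() => id), column(() => s"name-$id"))

  // Evaluate the predicate's column for every row, the other column only for rows that pass.
  def run(ids: Seq[Int]): Seq[String] =
    ids.map(row).filter { case (id, _) => id.value > 2 }.map { case (_, name) => name.value }
}

val survivors = DeferredDemo.run(1 to 5)
println(s"$survivors (column evaluations: ${DeferredDemo.evaluations})")
```

Evaluating both columns eagerly would compute 10 values. The deferred version computes 8: five ids plus three names for the rows that survive the filter.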

FilterExec is <> when:

Performance Metrics

| Key | Name (in web UI) | Description |
|---|---|---|
| numOutputRows | number of output rows | Number of rows that satisfied the filter condition |

Figure: FilterExec in web UI (Details for Query) [images/spark-sql-FilterExec-webui-details-for-query.png]

FilterExec uses whatever the <> physical operator uses for the input RDDs, the outputOrdering and the outputPartitioning.
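A filter only removes rows. It neither reorders rows within a partition nor moves rows between partitions, so passing the child's properties through unchanged is safe. The sketch below models that delegation with a hypothetical `PlanNode` trait. Plain strings stand in for Spark's sort orders, partitioning and RDDs. It is not Spark's SparkPlan API.

```scala
// Hypothetical, heavily simplified physical-plan node (not Spark's SparkPlan API).
// Strings stand in for Seq[SortOrder], Partitioning and Seq[RDD[InternalRow]].
trait PlanNode {
  def outputOrdering: Seq[String]
  def outputPartitioning: String
  def inputRDDs: Seq[String]
}

final case class ScanNode(outputOrdering: Seq[String], outputPartitioning: String) extends PlanNode {
  def inputRDDs: Seq[String] = Seq("scan-rdd")
}

// Dropping rows preserves both order and placement, so the filter reports its child's values.
final case class FilterNode(condition: String, child: PlanNode) extends PlanNode {
  def outputOrdering: Seq[String] = child.outputOrdering
  def outputPartitioning: String = child.outputPartitioning
  def inputRDDs: Seq[String] = child.inputRDDs
}

val scan = ScanNode(Seq("id ASC"), "hashpartitioning(id, 200)")
val filterNode = FilterNode("id > 2", scan)
```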

FilterExec uses the PredicateHelper for...FIXME
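The FIXME above is left open. One PredicateHelper method that FilterExec appears to rely on, as far as we can tell from Spark's source, is splitConjunctivePredicates. It flattens nested And expressions into a list of conjuncts. Treat that link as an assumption. The sketch models the idea over a tiny hypothetical expression type (`Expr`, `Attr`, `Lit`, `splitConjuncts`), not Catalyst's Expression hierarchy.

```scala
// Hypothetical mini expression ADT (not Catalyst's Expression hierarchy).
sealed trait Expr
final case class Attr(name: String) extends Expr
final case class Lit(value: Int) extends Expr
final case class GreaterThan(left: Expr, right: Expr) extends Expr
final case class IsNotNull(child: Expr) extends Expr
final case class And(left: Expr, right: Expr) extends Expr
final case class Or(left: Expr, right: Expr) extends Expr

// Flatten nested Ands into conjuncts, left to right; anything else is a single conjunct.
def splitConjuncts(e: Expr): Seq[Expr] = e match {
  case And(l, r) => splitConjuncts(l) ++ splitConjuncts(r)
  case other     => Seq(other)
}

val cond = And(
  IsNotNull(Attr("id")),
  And(GreaterThan(Attr("id"), Lit(2)), Or(IsNotNull(Attr("a")), IsNotNull(Attr("b")))))
val conjuncts = splitConjuncts(cond)
```

Only top-level Ands are split. The Or therefore stays a single conjunct, and `conjuncts` holds IsNotNull(id), id > 2 and the Or.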

FilterExec's Internal Properties (e.g. Registries, Counters and Flags)

| Name | Description |
|---|---|
| notNullAttributes | FIXME. Used when...FIXME |
| notNullPreds | FIXME. Used when...FIXME |
| otherPreds | FIXME. Used when...FIXME |
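The three properties above are still FIXMEs. As far as we can tell from Spark's source, FilterExec splits its condition into conjuncts and partitions them into two groups. An IsNotNull conjunct goes to notNullPreds when its argument is null-intolerant and references only the child's output. Every other conjunct goes to otherPreds. The attributes referenced by notNullPreds become notNullAttributes. Treat that reading as an assumption. The sketch below models it with a hypothetical mini expression type (`Expr`, `Attr`, `Add`, `Coalesce`, `references`), not Catalyst classes.

```scala
// Hypothetical mini expression ADT (not Catalyst). `nullIntolerant` = "any null input gives null".
sealed trait Expr {
  def children: Seq[Expr]
  def nullIntolerant: Boolean
}
final case class Attr(name: String) extends Expr {
  def children: Seq[Expr] = Nil
  def nullIntolerant: Boolean = true
}
final case class Add(left: Expr, right: Expr) extends Expr {
  def children: Seq[Expr] = Seq(left, right)
  def nullIntolerant: Boolean = true
}
final case class Coalesce(left: Expr, right: Expr) extends Expr {
  def children: Seq[Expr] = Seq(left, right)
  def nullIntolerant: Boolean = false
}
final case class IsNotNull(child: Expr) extends Expr {
  def children: Seq[Expr] = Seq(child)
  def nullIntolerant: Boolean = false
}
final case class And(left: Expr, right: Expr) extends Expr {
  def children: Seq[Expr] = Seq(left, right)
  def nullIntolerant: Boolean = false
}

def splitConjuncts(e: Expr): Seq[Expr] = e match {
  case And(l, r) => splitConjuncts(l) ++ splitConjuncts(r)
  case other     => Seq(other)
}

// Names of all attributes an expression refers to.
def references(e: Expr): Set[String] = e match {
  case Attr(name) => Set(name)
  case other      => other.children.flatMap(references).toSet
}

def isNullIntolerant(e: Expr): Boolean = e.nullIntolerant && e.children.forall(isNullIntolerant)

val childOutput = Set("a", "b")
val condition = And(
  IsNotNull(Attr("a")),
  And(IsNotNull(Coalesce(Attr("a"), Attr("b"))), IsNotNull(Add(Attr("a"), Attr("b")))))

// IsNotNull over a null-intolerant expression of child columns -> notNullPreds; the rest -> otherPreds.
val (notNullPreds, otherPreds) = splitConjuncts(condition).partition {
  case IsNotNull(child) => isNullIntolerant(child) && references(child).subsetOf(childOutput)
  case _                => false
}
val notNullAttributes = notNullPreds.flatMap(references).distinct
```

IsNotNull(coalesce(a, b)) lands in otherPreds. Coalesce can turn a null input into a non-null result, so that check says nothing about a or b individually.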

Creating FilterExec Instance

FilterExec takes the following when created:

  • condition: <> for the filter condition
  • child: Child <>

FilterExec initializes the <>.

isNullIntolerant Internal Method

```scala
isNullIntolerant(expr: Expression): Boolean
```

isNullIntolerant...FIXME

NOTE: isNullIntolerant is used when...FIXME
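The FIXMEs above are left as they are. Below is a minimal sketch of a recursive null-intolerance check. It assumes, based on our reading of Spark's source, that an expression counts as null-intolerant when it is marked NullIntolerant and all of its children are too. The expression types are hypothetical stand-ins, not Catalyst classes.

```scala
// Hypothetical mini expression ADT (not Catalyst). `nullIntolerant` = "any null input gives null".
sealed trait Expr {
  def children: Seq[Expr]
  def nullIntolerant: Boolean
}
final case class Attr(name: String) extends Expr {
  def children: Seq[Expr] = Nil
  def nullIntolerant: Boolean = true
}
final case class Add(left: Expr, right: Expr) extends Expr {
  def children: Seq[Expr] = Seq(left, right)
  def nullIntolerant: Boolean = true
}
final case class Coalesce(left: Expr, right: Expr) extends Expr {
  def children: Seq[Expr] = Seq(left, right)
  def nullIntolerant: Boolean = false // coalesce(null, b) can be non-null
}

// Null-intolerant only if the expression itself and every descendant are null-intolerant.
def isNullIntolerant(expr: Expr): Boolean =
  expr.nullIntolerant && expr.children.forall(isNullIntolerant)

val a = Attr("a")
val b = Attr("b")
```

Suppose isNullIntolerant(e) holds and a row passes IsNotNull(e). Then no attribute of e can be null in that row, so those attributes can be treated as not nullable.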

Generating Java Source Code for Consume Path in Whole-Stage Code Generation (doConsume Method)

```scala
doConsume(ctx: CodegenContext, input: Seq[ExprCode], row: ExprCode): String
```

doConsume creates a new metric term for the <> metric.

doConsume...FIXME

In the end, doConsume uses consume and FIXME to generate Java source code (as plain text) inside a do {...} while(false); code block.

doConsume is part of the CodegenSupport abstraction.
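The do {...} while(false); wrapper works as an early exit. In Java, continue inside a do-while jumps to the loop condition, and because that condition is false, the block ends. A row that fails a check therefore skips everything after it in the block. The sketch below only assembles text of that shape. `wrapFilterCode`, `filter_numOutputRows_0`, `value_1` and `append(row);` are hypothetical names. Real generated code also contains variable declarations, null handling and more.

```scala
// Hypothetical helper (not Spark's API): assembles Java source text in the
// do { ... } while(false); shape. A `continue` inside it ends the block early.
def wrapFilterCode(predicateChecks: Seq[String], metricTerm: String, consumeCode: String): String =
  (Seq("do {") ++
    predicateChecks.map(check => s"  $check") ++ // each check may `continue`, skipping the row
    Seq(s"  $metricTerm.add(1);",                // reached only by rows that passed every check
        s"  $consumeCode",                       // the parent operator's code for the row
        "} while(false);")).mkString("\n")

val javaCode = wrapFilterCode(
  predicateChecks = Seq("if (!(value_1 > 2L)) continue;"),
  metricTerm = "filter_numOutputRows_0",
  consumeCode = "append(row);")
println(javaCode)
```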

genPredicate Internal Method

```scala
genPredicate(c: Expression, in: Seq[ExprCode], attrs: Seq[Attribute]): String
```

NOTE: genPredicate is an internal method of <>.

genPredicate...FIXME
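genPredicate is still a FIXME. As far as we can tell from Spark's source, it generates the code that evaluates one predicate, together with the input columns the predicate references. That code ends with a Java statement that skips the row (through continue) when the result is false or, for a nullable predicate, null. Treat this as an assumption. The sketch below models only that final step. `genPredicateCheck` and the Java variable names are hypothetical.

```scala
// Hypothetical model (not Spark's API) of the Java text emitted for one predicate:
// code that evaluates the inputs it needs, the predicate's own code, and a check
// that leaves the surrounding do { ... } while(false); block when the row is rejected.
def genPredicateCheck(evaluatedInputs: String, predicateCode: String,
                      isNullTerm: String, valueTerm: String, nullable: Boolean): String = {
  // A null result also rejects the row, but the test is only needed if the result can be null.
  val nullCheck = if (nullable) s"${isNullTerm} || " else ""
  Seq(evaluatedInputs, predicateCode, s"if (${nullCheck}!${valueTerm}) continue;")
    .filter(_.nonEmpty)
    .mkString("\n")
}

val notNullable = genPredicateCheck(
  "", "boolean value_2 = value_1 > 2L;", "isNull_2", "value_2", nullable = false)
val maybeNull = genPredicateCheck(
  "", "boolean value_2 = value_1 > 2L;", "isNull_2", "value_2", nullable = true)
```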

Executing Physical Operator (Generating RDD[InternalRow]): doExecute Method

```scala
doExecute(): RDD[InternalRow]
```

doExecute is part of the SparkPlan abstraction.

doExecute executes the <> physical operator and creates a new MapPartitionsRDD that does the filtering.

```scala
// DEMO Show the RDD lineage with the new MapPartitionsRDD after FilterExec
```

Internally, doExecute takes the <> metric.

In the end, doExecute requests the <> physical operator to <> (which triggers physical query planning and generates an RDD[InternalRow]) and transforms the result with RDD.mapPartitionsWithIndexInternal (which creates another RDD), executing the following function on the internal rows of every partition, together with the partition index:

1. Creates a partition filter as a new <> (for the <> expression and the <> of the <> physical operator)
2. Requests the generated partition filter Predicate to initialize (with partition index 0)
3. Filters the partition iterator (Iterator[InternalRow]), keeping only the InternalRows for which the generated partition filter Predicate evaluates to true
   a. Increments the <> metric for every positive evaluation (i.e. one that returned true)

NOTE: doExecute (through RDD.mapPartitionsWithIndexInternal) adds a new MapPartitionsRDD to the RDD lineage. Use RDD.toDebugString to see the additional MapPartitionsRDD.


Last update: 2020-11-07