# DataSourceStrategy Execution Planning Strategy
DataSourceStrategy is an execution planning strategy (of SparkPlanner) that plans LogicalRelation logical operators as RowDataSourceScanExec physical operators (possibly under FilterExec and ProjectExec physical operators).
DataSourceStrategy's selection requirements (in execution order):

| Logical Operator | Description |
|---|---|
| LogicalRelation with a CatalystScan relation | Uses pruneFilterProjectRaw (with the RDD conversion to RDD[InternalRow] as part of scanBuilder). CatalystScan does not seem to be used in Spark SQL. |
| LogicalRelation with a PrunedFilteredScan relation | Uses pruneFilterProject (with the RDD conversion to RDD[InternalRow] as part of scanBuilder). Matches JDBCRelation exclusively. |
| LogicalRelation with a PrunedScan relation | Uses pruneFilterProject (with the RDD conversion to RDD[InternalRow] as part of scanBuilder). PrunedScan does not seem to be used in Spark SQL. |
| LogicalRelation with a TableScan relation | Creates a RowDataSourceScanExec directly (requesting the TableScan to buildScan followed by RDD conversion to RDD[InternalRow]). Matches KafkaRelation exclusively. |
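The dispatch order above can be sketched in plain Scala (no Spark dependencies). The scan-trait names mirror Spark SQL's; the `plan` helper and its string results are hypothetical stand-ins for the real planning calls.

```scala
// Simplified model of how the strategy selects a planning path per scan trait.
sealed trait Relation
case object CatalystScanRelation       extends Relation
case object PrunedFilteredScanRelation extends Relation // e.g. JDBCRelation
case object PrunedScanRelation         extends Relation
case object TableScanRelation          extends Relation // e.g. KafkaRelation

def plan(relation: Relation): String = relation match {
  case CatalystScanRelation       => "pruneFilterProjectRaw" // Catalyst expressions go to the scan builder
  case PrunedFilteredScanRelation => "pruneFilterProject"    // column pruning + filter push-down
  case PrunedScanRelation         => "pruneFilterProject"    // column pruning only
  case TableScanRelation          => "RowDataSourceScanExec" // full scan, operator built directly
}
```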
```scala
import org.apache.spark.sql.execution.datasources.DataSourceStrategy
val strategy = DataSourceStrategy(spark.sessionState.conf)

import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
val plan: LogicalPlan = ???
val sparkPlan = strategy(plan).head
```
## pruneFilterProject Internal Method

```scala
pruneFilterProject(
  relation: LogicalRelation,
  projects: Seq[NamedExpression],
  filterPredicates: Seq[Expression],
  scanBuilder: (Seq[Attribute], Array[Filter]) => RDD[InternalRow])
```
pruneFilterProject calls pruneFilterProjectRaw (with an adapted scanBuilder function that ignores the Seq[Expression] input parameter).

pruneFilterProject is used when DataSourceStrategy execution planning strategy is executed (for LogicalRelations with a PrunedFilteredScan or a PrunedScan relation).
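The adapter idea can be sketched in plain Scala with simplified type aliases: the 2-argument scanBuilder is lifted into the 3-argument shape that pruneFilterProjectRaw expects, and the Seq[Expression] argument is simply dropped. All names below are stand-ins, not Spark's actual types.

```scala
// Simplified stand-ins for Spark's types.
type Attribute  = String
type Expression = String
type Filter     = String
type Scan       = Vector[String] // stand-in for RDD[InternalRow]

// Lift a 2-argument scan builder into the 3-argument shape,
// ignoring the Catalyst expressions entirely.
def adapt(scanBuilder: (Seq[Attribute], Array[Filter]) => Scan)
    : (Seq[Attribute], Seq[Expression], Seq[Filter]) => Scan =
  (columns, _, filters) => scanBuilder(columns, filters.toArray)

val raw = adapt((cols, fs) => Vector(s"scan(${cols.mkString(",")} / ${fs.mkString(",")})"))
```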
## Selecting Catalyst Expressions Convertible to Data Source Filter Predicates

```scala
selectFilters(
  relation: BaseRelation,
  predicates: Seq[Expression]): (Seq[Expression], Seq[Filter], Set[Filter])
```
selectFilters builds a map of the Catalyst predicate expressions (from the input predicates) that can be translated to a data source filter predicate.

selectFilters then requests the input BaseRelation for unhandled filters (out of the convertible ones that selectFilters built the map with).

In the end, selectFilters returns a 3-element tuple with the following:

- Inconvertible and unhandled Catalyst predicate expressions
- All converted data source filters
- Pushed-down data source filters (that the input BaseRelation can handle)
selectFilters
is used when DataSourceStrategy
execution planning strategy is executed (and creates a RowDataSourceScanExec physical operator).
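The three-way split can be modelled in plain Scala with hypothetical Pred/Filter types: every translatable predicate yields a filter, but only the filters the relation does not report as unhandled are actually pushed down.

```scala
case class Pred(name: String, translatable: Boolean)
case class Filter(name: String)

def selectFilters(unhandledByRelation: Set[String],
                  predicates: Seq[Pred]): (Seq[Pred], Seq[Filter], Set[Filter]) = {
  // predicates with a data source filter counterpart, in input order
  val translated: Seq[(Pred, Filter)] =
    predicates.filter(_.translatable).map(p => (p, Filter(p.name)))
  val convertible = translated.map(_._1).toSet
  // stay on the Spark side: inconvertible, or convertible but unhandled
  val unhandled = predicates.filter(p => !convertible(p) || unhandledByRelation(p.name))
  // pushed down: convertible and handled by the relation
  val pushed = translated.map(_._2).filterNot(f => unhandledByRelation(f.name)).toSet
  (unhandled, translated.map(_._2), pushed)
}
```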
## Translating Catalyst Expression Into Data Source Filter Predicate (translateFilter Method)

```scala
translateFilter(predicate: Expression): Option[Filter]
```
translateFilter translates a [Catalyst expression](expressions/Expression.md) into a corresponding Filter predicate if possible. If not, translateFilter returns None.
translateFilter's conversions:

| Catalyst Expression | Filter Predicate |
|---|---|
| EqualTo | EqualTo |
| EqualNullSafe | EqualNullSafe |
| GreaterThan | GreaterThan |
| LessThan | LessThan |
| GreaterThanOrEqual | GreaterThanOrEqual |
| LessThanOrEqual | LessThanOrEqual |
| [InSet](spark-sql-Expression-InSet.md) | In |
| [In](spark-sql-Expression-In.md) | In |
| IsNull | IsNull |
| IsNotNull | IsNotNull |
| And | And |
| Or | Or |
| Not | Not |
| StartsWith | StringStartsWith |
| EndsWith | StringEndsWith |
| Contains | StringContains |
NOTE: The Catalyst expressions and their corresponding data source filter predicates have the same names in most cases but belong to different Scala packages, i.e. org.apache.spark.sql.catalyst.expressions and org.apache.spark.sql.sources, respectively.
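The recursive shape of the translation can be sketched with tiny hypothetical ADTs standing in for Catalyst's Expression and the org.apache.spark.sql.sources filter classes: a conjunction translates only when both legs translate.

```scala
sealed trait Expr
case class EqExpr(col: String, value: Int)  extends Expr
case class AndExpr(left: Expr, right: Expr) extends Expr
case class UdfExpr(name: String)            extends Expr // no data source counterpart

sealed trait Filter
case class EqualTo(col: String, value: Int) extends Filter
case class And(left: Filter, right: Filter) extends Filter

def translateFilter(predicate: Expr): Option[Filter] = predicate match {
  case EqExpr(c, v)  => Some(EqualTo(c, v))
  case AndExpr(l, r) =>
    // both sides must be translatable for the conjunction to translate
    for (lf <- translateFilter(l); rf <- translateFilter(r)) yield And(lf, rf)
  case _             => None // e.g. a UDF cannot be pushed to the source
}
```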
translateFilter is used when:

- FileSourceScanExec is created (and initializes pushedDownFilters)
- DataSourceStrategy is requested to selectFilters
- PushDownOperatorsToDataSource logical optimization is executed (for DataSourceV2Relation leaf operators with a SupportsPushDownFilters data source reader)
## RDD Conversion (Converting RDD of Rows to Catalyst RDD of InternalRows)

```scala
toCatalystRDD(
  relation: LogicalRelation,
  output: Seq[Attribute],
  rdd: RDD[Row]): RDD[InternalRow]
toCatalystRDD(
  relation: LogicalRelation,
  rdd: RDD[Row]) // calls the 3-argument toCatalystRDD with the output of the LogicalRelation
```
toCatalystRDD branches off per the needConversion flag of the BaseRelation of the input LogicalRelation:

- when true, toCatalystRDD converts the objects inside Rows to Catalyst types
- otherwise, toCatalystRDD casts the input RDD[Row] to an RDD[InternalRow] (using Scala's asInstanceOf operator)
toCatalystRDD
is used when DataSourceStrategy
execution planning strategy is executed (for all kinds of BaseRelations).
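The branching can be sketched in plain Scala. Row/InternalRow here are hypothetical simplifications, and the per-value conversion is a placeholder (in Spark, e.g. String becomes UTF8String); the else branch re-wraps rows untouched, which models the asInstanceOf cast.

```scala
case class Row(values: Seq[Any])
case class InternalRow(values: Seq[Any])

def toCatalystRDD(needConversion: Boolean, rdd: Seq[Row]): Seq[InternalRow] =
  if (needConversion)
    // convert every value to its "Catalyst" representation
    rdd.map(r => InternalRow(r.values.map {
      case s: String => s.toUpperCase // placeholder for a real Catalyst conversion
      case v         => v
    }))
  else
    rdd.map(r => InternalRow(r.values)) // no per-value work, just re-wrapping
```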
## Creating RowDataSourceScanExec Physical Operator for LogicalRelation

```scala
pruneFilterProjectRaw(
  relation: LogicalRelation,
  projects: Seq[NamedExpression],
  filterPredicates: Seq[Expression],
  scanBuilder: (Seq[Attribute], Seq[Expression], Seq[Filter]) => RDD[InternalRow]): SparkPlan
```
pruneFilterProjectRaw
creates a RowDataSourceScanExec leaf physical operator with the LogicalRelation leaf logical operator (possibly as a child of a FilterExec and a ProjectExec unary physical operators).
In other words, pruneFilterProjectRaw converts a LogicalRelation leaf logical operator into a RowDataSourceScanExec leaf physical operator (with FilterExec and ProjectExec physical operators on top when needed).
NOTE: pruneFilterProjectRaw is almost like SparkPlanner.pruneFilterProject.
Internally, pruneFilterProjectRaw splits the input filterPredicates expressions into the predicates that can be pushed down as data source filters and the predicates that cannot (using selectFilters with the BaseRelation of the input LogicalRelation).
pruneFilterProjectRaw combines all the expressions that are neither convertible to data source filters nor handled by the relation using an And binary expression, which creates a so-called filterCondition that, if defined, is eventually used to create a FilterExec physical operator.
pruneFilterProjectRaw creates a RowDataSourceScanExec leaf physical operator (using the scanBuilder function).
If it is possible to use column pruning alone to get the right projection, and if the columns of this projection are enough to evaluate all filter conditions, pruneFilterProjectRaw creates a FilterExec unary physical operator (with the filterCondition and the RowDataSourceScanExec leaf physical operator as the child).

NOTE: In this case no extra ProjectExec physical operator is created.
Otherwise, pruneFilterProjectRaw creates a FilterExec unary physical operator (with the filterCondition and the RowDataSourceScanExec leaf physical operator as the child) that in turn becomes the child of a new ProjectExec unary physical operator.
pruneFilterProjectRaw
is used when DataSourceStrategy
execution planning strategy is executed.
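The filterCondition step can be sketched in plain Scala: the predicates the source cannot handle are AND-ed into a single optional condition that, when present, puts a FilterExec on top of the scan. Types and the string "plan" are simplified stand-ins.

```scala
sealed trait Expr
case class Pred(name: String)           extends Expr
case class And(left: Expr, right: Expr) extends Expr

// fold the unhandled predicates into one optional conjunction
def filterCondition(unhandled: Seq[Expr]): Option[Expr] =
  unhandled.reduceOption(And(_, _))

def withFilter(scan: String, unhandled: Seq[Expr]): String =
  filterCondition(unhandled) match {
    case Some(cond) => s"FilterExec($cond, $scan)"
    case None       => scan // no unhandled predicates: the scan alone suffices
  }
```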