
DataSourceStrategy Execution Planning Strategy

DataSourceStrategy is an execution planning strategy (of SparkPlanner) that plans LogicalRelation logical operators as RowDataSourceScanExec physical operators (possibly under FilterExec and ProjectExec physical operators).

[[apply]][[selection-requirements]]
.DataSourceStrategy's Selection Requirements (in execution order)
[cols="1,2",options="header",width="100%"]
|===
| Logical Operator
| Description

| LogicalRelation with a CatalystScan relation
a| [[CatalystScan]] Uses pruneFilterProjectRaw (with toCatalystRDD as part of scanBuilder).

CatalystScan does not seem to be used in Spark SQL.

| LogicalRelation with a PrunedFilteredScan relation
a| [[PrunedFilteredScan]] Uses pruneFilterProject (with toCatalystRDD as part of scanBuilder).

Matches JDBCRelation exclusively.

| LogicalRelation with a PrunedScan relation
a| [[PrunedScan]] Uses pruneFilterProject (with toCatalystRDD as part of scanBuilder).

PrunedScan does not seem to be used in Spark SQL.

| LogicalRelation with a TableScan relation
a| [[TableScan]] Creates a RowDataSourceScanExec directly (requesting the TableScan to buildScan, followed by toCatalystRDD conversion to RDD[InternalRow]).

Matches KafkaRelation exclusively.
|===

[source, scala]
----
import org.apache.spark.sql.execution.datasources.DataSourceStrategy
val strategy = DataSourceStrategy(spark.sessionState.conf)

import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
val plan: LogicalPlan = ???

val sparkPlan = strategy(plan).head
----

=== [[pruneFilterProject]] pruneFilterProject Internal Method

[source, scala]
----
pruneFilterProject(
  relation: LogicalRelation,
  projects: Seq[NamedExpression],
  filterPredicates: Seq[Expression],
  scanBuilder: (Seq[Attribute], Array[Filter]) => RDD[InternalRow]): SparkPlan
----


pruneFilterProject simply calls pruneFilterProjectRaw with a scanBuilder that ignores the Seq[Expression] input parameter.

pruneFilterProject is used when DataSourceStrategy execution planning strategy is executed (for LogicalRelation logical operators with a PrunedFilteredScan or a PrunedScan relation).
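The delegation can be sketched in plain Scala (a toy model with String stand-ins, not Spark's actual types): pruneFilterProject adapts a two-argument scanBuilder into the three-argument shape that pruneFilterProjectRaw expects, simply dropping the Seq[Expression] parameter.

[source, scala]
----
// Simplified stand-ins for Spark's types (assumption: toy model, not Spark APIs).
type Attribute  = String
type Expression = String
type Filter     = String
type Scan       = Vector[String] // stands in for RDD[InternalRow]

// The "raw" variant expects a scanBuilder that also receives the pushed expressions.
def pruneFilterProjectRaw(
    scanBuilder: (Seq[Attribute], Seq[Expression], Seq[Filter]) => Scan): Scan =
  scanBuilder(Seq("id", "name"), Seq("id > 1"), Seq("GreaterThan(id,1)"))

// pruneFilterProject adapts a two-argument scanBuilder by ignoring Seq[Expression].
def pruneFilterProject(
    scanBuilder: (Seq[Attribute], Seq[Filter]) => Scan): Scan =
  pruneFilterProjectRaw((attrs, _, filters) => scanBuilder(attrs, filters))

val scan = pruneFilterProject((attrs, filters) =>
  Vector(s"scan(${attrs.mkString(",")}) with ${filters.mkString(",")}"))
----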

=== [[selectFilters]] Selecting Catalyst Expressions Convertible to Data Source Filter Predicates -- selectFilters Method

[source, scala]
----
selectFilters(
  relation: BaseRelation,
  predicates: Seq[Expression]): (Seq[Expression], Seq[Filter], Set[Filter])
----

selectFilters builds a map of Catalyst predicate expressions (from the input predicates) that can be translated to a data source filter predicate.

selectFilters then requests the input BaseRelation for unhandled filters (out of the convertible ones that selectFilters built the map with).

In the end, selectFilters returns a 3-element tuple with the following:

  1. Inconvertible and unhandled Catalyst predicate expressions

  2. All converted data source filters

  3. Pushed-down data source filters (that the input BaseRelation can handle)

selectFilters is used when DataSourceStrategy execution planning strategy is executed (and creates a RowDataSourceScanExec physical operator).
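The three-way split can be modeled in plain Scala (a simplified sketch with toy types; the real method works on Catalyst Expressions and org.apache.spark.sql.sources.Filters, and asks the BaseRelation for its unhandledFilters):

[source, scala]
----
// Toy model (assumption): a predicate either translates to a source filter or not,
// and the relation may report some translated filters back as unhandled.
case class Pred(name: String, translated: Option[String])

def selectFilters(
    unhandledByRelation: Set[String],
    predicates: Seq[Pred]): (Seq[Pred], Seq[String], Set[String]) = {
  // 1. Predicates that translate to a data source filter
  val translatedMap: Map[Pred, String] =
    predicates.flatMap(p => p.translated.map(p -> _)).toMap
  // 2. Translated filters the relation reports as unhandled
  val unhandled: Set[String] =
    translatedMap.values.toSet.intersect(unhandledByRelation)
  // Catalyst predicates Spark must still evaluate: inconvertible or unhandled
  val unhandledPredicates =
    predicates.filter(p => p.translated.forall(unhandled.contains))
  // Pushed-down filters: the translated ones the relation can handle
  val pushed = translatedMap.values.toSet -- unhandled
  (unhandledPredicates, translatedMap.values.toSeq, pushed)
}

val (residual, converted, pushed) = selectFilters(
  unhandledByRelation = Set("EndsWith(name,x)"),
  predicates = Seq(
    Pred("id > 1", Some("GreaterThan(id,1)")),
    Pred("udf(id)", None), // inconvertible
    Pred("name like %x", Some("EndsWith(name,x)"))))
----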

=== [[translateFilter]] Translating Catalyst Expression Into Data Source Filter Predicate -- translateFilter Method

[source, scala]
----
translateFilter(predicate: Expression): Option[Filter]
----

translateFilter translates a expressions/Expression.md[Catalyst expression] into a corresponding Filter predicate if possible. If not, translateFilter returns None.

[[translateFilter-conversions]]
.translateFilter's Conversions
[cols="1,1",options="header",width="100%"]
|===
| Catalyst Expression
| Filter Predicate

| EqualTo | EqualTo
| EqualNullSafe | EqualNullSafe
| GreaterThan | GreaterThan
| LessThan | LessThan
| GreaterThanOrEqual | GreaterThanOrEqual
| LessThanOrEqual | LessThanOrEqual
| spark-sql-Expression-InSet.md[InSet] | In
| spark-sql-Expression-In.md[In] | In
| IsNull | IsNull
| IsNotNull | IsNotNull
| And | And
| Or | Or
| Not | Not
| StartsWith | StringStartsWith
| EndsWith | StringEndsWith
| Contains | StringContains
|===

NOTE: The Catalyst expressions and their corresponding data source filter predicates have the same names in most cases but belong to different Scala packages, i.e. org.apache.spark.sql.catalyst.expressions and org.apache.spark.sql.sources, respectively.

translateFilter is used when DataSourceStrategy execution planning strategy is requested to selectFilters (to find the Catalyst predicate expressions that can be translated to data source filter predicates).
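The conversion is a recursive pattern match. A minimal sketch over toy expression nodes (an assumption-laden simplification; the real method matches org.apache.spark.sql.catalyst.expressions nodes and produces org.apache.spark.sql.sources filters):

[source, scala]
----
// Toy Catalyst-like expressions (assumption: simplified AST, not Spark's classes).
sealed trait Expr
case class EqualTo(attr: String, value: Any) extends Expr
case class GreaterThan(attr: String, value: Any) extends Expr
case class And(left: Expr, right: Expr) extends Expr
case class Unsupported(desc: String) extends Expr

// Toy data source filters mirroring org.apache.spark.sql.sources names.
sealed trait Filter
case class SrcEqualTo(attr: String, value: Any) extends Filter
case class SrcGreaterThan(attr: String, value: Any) extends Filter
case class SrcAnd(left: Filter, right: Filter) extends Filter

def translateFilter(predicate: Expr): Option[Filter] = predicate match {
  case EqualTo(a, v)     => Some(SrcEqualTo(a, v))
  case GreaterThan(a, v) => Some(SrcGreaterThan(a, v))
  // And translates only when both legs translate.
  case And(l, r) =>
    for {
      lf <- translateFilter(l)
      rf <- translateFilter(r)
    } yield SrcAnd(lf, rf)
  case _ => None // inconvertible expression
}
----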

=== [[toCatalystRDD]] RDD Conversion (Converting RDD of Rows to Catalyst RDD of InternalRows) -- toCatalystRDD Internal Method

[source, scala]
----
toCatalystRDD(
  relation: LogicalRelation,
  output: Seq[Attribute],
  rdd: RDD[Row]): RDD[InternalRow]
toCatalystRDD(
  relation: LogicalRelation,
  rdd: RDD[Row]): RDD[InternalRow] // <1>
----
<1> Calls the former toCatalystRDD with the output of the input LogicalRelation

toCatalystRDD branches off per the needConversion flag of the BaseRelation of the input LogicalRelation: when enabled (true), toCatalystRDD converts the rows of the input RDD[Row] to Catalyst rows; otherwise, toCatalystRDD simply casts the input RDD[Row] to an RDD[InternalRow].

toCatalystRDD is used when DataSourceStrategy execution planning strategy is executed (for all kinds of BaseRelations).
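The branching can be modeled in plain Scala (a toy sketch using Seq in place of RDD and String rows in place of Row/InternalRow; not Spark's actual conversion machinery):

[source, scala]
----
// Toy model (assumption): Seq stands in for RDD, String rows for Row/InternalRow.
case class ToyRelation(needConversion: Boolean)

def toCatalystRDD(relation: ToyRelation, rdd: Seq[String]): Seq[String] =
  if (relation.needConversion)
    rdd.map(row => s"InternalRow($row)") // convert each external Row
  else
    rdd // rows are already internal: a cast is enough
----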

=== [[pruneFilterProjectRaw]] Creating RowDataSourceScanExec Physical Operator for LogicalRelation -- pruneFilterProjectRaw Internal Method

[source, scala]
----
pruneFilterProjectRaw(
  relation: LogicalRelation,
  projects: Seq[NamedExpression],
  filterPredicates: Seq[Expression],
  scanBuilder: (Seq[Attribute], Seq[Expression], Seq[Filter]) => RDD[InternalRow]): SparkPlan
----

pruneFilterProjectRaw creates a RowDataSourceScanExec leaf physical operator with the LogicalRelation leaf logical operator (possibly as a child of a FilterExec and a ProjectExec unary physical operators).


NOTE: pruneFilterProjectRaw is almost like SparkPlanner.pruneFilterProject.

Internally, pruneFilterProjectRaw uses selectFilters to split the input filterPredicates expressions into the predicates that can be translated to data source filters (and handled by the BaseRelation of the LogicalRelation) and those that cannot.

pruneFilterProjectRaw combines all expressions that are neither convertible to data source filters nor handled by the relation using the And binary expression (that creates a so-called filterCondition that will eventually be used to create a FilterExec physical operator if non-empty).

pruneFilterProjectRaw creates a RowDataSourceScanExec leaf physical operator.

If it is possible to use column pruning only to get the right projection, and if the columns of this projection are enough to evaluate all the filter conditions, pruneFilterProjectRaw creates a FilterExec unary physical operator (with the filterCondition and the RowDataSourceScanExec leaf physical operator as the child).

NOTE: In this case no extra ProjectExec unary physical operator is created.

Otherwise, pruneFilterProjectRaw creates a FilterExec unary physical operator (with the filterCondition and the RowDataSourceScanExec leaf physical operator as the child) that in turn becomes the child of a new ProjectExec unary physical operator.

pruneFilterProjectRaw is used when DataSourceStrategy execution planning strategy is executed.
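The plan assembly above can be sketched with toy operator nodes (a simplified model; the real method additionally decides via column pruning whether the projection is needed, and builds actual SparkPlan operators):

[source, scala]
----
// Toy physical operators (assumption: simplified stand-ins for SparkPlan nodes).
sealed trait Plan
case class Scan(relation: String, pushedFilters: Seq[String]) extends Plan
case class Filter(condition: String, child: Plan) extends Plan
case class Project(columns: Seq[String], child: Plan) extends Plan

def pruneFilterProjectRaw(
    relation: String,
    projects: Seq[String],
    unhandledPredicates: Seq[String],
    pushedFilters: Seq[String],
    projectionIsColumnPruningOnly: Boolean): Plan = {
  val scan = Scan(relation, pushedFilters)
  // Combine the leftover predicates with And into an optional filterCondition.
  val filterCondition = unhandledPredicates.reduceOption((l, r) => s"($l AND $r)")
  val filtered = filterCondition.map(Filter(_, scan)).getOrElse(scan)
  if (projectionIsColumnPruningOnly) filtered // no extra ProjectExec needed
  else Project(projects, filtered)
}

val plan = pruneFilterProjectRaw(
  relation = "jdbc",
  projects = Seq("id"),
  unhandledPredicates = Seq("a", "b"),
  pushedFilters = Seq("GreaterThan(id,1)"),
  projectionIsColumnPruningOnly = false)
----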


Last update: 2021-05-04