FindDataSourceTable Logical Evaluation Rule -- Resolving UnresolvedCatalogRelations

FindDataSourceTable is a Catalyst rule for resolving UnresolvedCatalogRelations (of Spark and Hive tables) in a logical query plan.

FindDataSourceTable is one of the additional rules in the Resolution fixed-point batch of rules.

[[sparkSession]][[creating-instance]] FindDataSourceTable takes a single SparkSession when created.

scala> :type spark
org.apache.spark.sql.SparkSession

// Example: InsertIntoTable with UnresolvedCatalogRelation
// Drop tables to make the example reproducible
val db = spark.catalog.currentDatabase
Seq("t1", "t2").foreach { t =>
  spark.sharedState.externalCatalog.dropTable(db, t, ignoreIfNotExists = true, purge = true)
}

// Create tables
sql("CREATE TABLE t1 (id LONG) USING parquet")
sql("CREATE TABLE t2 (id LONG) USING orc")

import org.apache.spark.sql.catalyst.dsl.plans._
val plan = table("t1").insertInto(tableName = "t2", overwrite = true)
scala> println(plan.numberedTreeString)
00 'InsertIntoTable 'UnresolvedRelation `t2`, true, false
01 +- 'UnresolvedRelation `t1`

// Transform the logical plan with ResolveRelations logical rule first
// so UnresolvedRelations become UnresolvedCatalogRelations
import spark.sessionState.analyzer.ResolveRelations
val planWithUnresolvedCatalogRelations = ResolveRelations(plan)
scala> println(planWithUnresolvedCatalogRelations.numberedTreeString)
00 'InsertIntoTable 'UnresolvedRelation `t2`, true, false
01 +- 'SubqueryAlias t1
02    +- 'UnresolvedCatalogRelation `default`.`t1`,

// Let's resolve UnresolvedCatalogRelations then
import org.apache.spark.sql.execution.datasources.FindDataSourceTable
val r = new FindDataSourceTable(spark)
val tablesResolvedPlan = r(planWithUnresolvedCatalogRelations)
// FIXME Why is t2 not resolved?!
scala> println(tablesResolvedPlan.numberedTreeString)
00 'InsertIntoTable 'UnresolvedRelation `t2`, true, false
01 +- SubqueryAlias t1
02    +- Relation[id#10L] parquet

=== [[apply]] Applying Rule to Logical Plan (Resolving UnresolvedCatalogRelations) -- apply Method

[source, scala]

apply(plan: LogicalPlan): LogicalPlan

NOTE: apply is part of the Catalyst Rule contract.

apply resolves UnresolvedCatalogRelations for Spark (data source) and Hive tables:

  • apply creates LogicalRelation logical operators (using readDataSourceTable) for UnresolvedCatalogRelations of Spark tables (incl. those in InsertIntoTable operators)

  • apply creates HiveTableRelation logical operators (using readHiveTable) for InsertIntoTable operators with an UnresolvedCatalogRelation of a Hive table, and for standalone UnresolvedCatalogRelations of a Hive table
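The dispatch described in the list above can be sketched as a small, self-contained toy model. None of the types below are Spark's own classes: `Table`, `Plan` and its subclasses, and `resolve` are simplified, hypothetical stand-ins, and the `isDataSourceTable` check (a defined provider other than `hive`) is an assumption made for illustration. Unlike the rule as described, which names the InsertIntoTable cases explicitly, this sketch reaches them by recursing into children.

```scala
// Toy model only: simplified, hypothetical stand-ins for Catalyst operators.
object FindDataSourceTableSketch {
  // Stand-in for CatalogTable. Assumption: a table with a provider other
  // than "hive" counts as a Spark (data source) table.
  final case class Table(name: String, provider: Option[String]) {
    def isDataSourceTable: Boolean = provider.exists(_.toLowerCase != "hive")
  }

  sealed trait Plan
  final case class UnresolvedCatalogRelation(table: Table) extends Plan
  final case class LogicalRelation(table: Table) extends Plan
  final case class HiveTableRelation(table: Table) extends Plan
  final case class InsertIntoTable(target: Plan, query: Plan) extends Plan
  final case class SubqueryAlias(alias: String, child: Plan) extends Plan

  // Stand-ins for the two internal methods described in this document
  def readDataSourceTable(t: Table): Plan = LogicalRelation(t)
  def readHiveTable(t: Table): Plan = HiveTableRelation(t)

  // Replace every UnresolvedCatalogRelation, including one used as an insert target
  def resolve(plan: Plan): Plan = plan match {
    case UnresolvedCatalogRelation(t) if t.isDataSourceTable => readDataSourceTable(t)
    case UnresolvedCatalogRelation(t)   => readHiveTable(t)
    case InsertIntoTable(target, query) => InsertIntoTable(resolve(target), resolve(query))
    case SubqueryAlias(alias, child)    => SubqueryAlias(alias, resolve(child))
    case other                          => other // already resolved
  }
}
```

With this model, resolving an `InsertIntoTable` whose target is an `UnresolvedCatalogRelation` of a `hive` table and whose query reads a `parquet` table yields a `HiveTableRelation` target and a `LogicalRelation` source.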

=== [[readHiveTable]] Creating HiveTableRelation Logical Operator -- readHiveTable Internal Method

[source, scala]

readHiveTable(table: CatalogTable): LogicalPlan

readHiveTable creates a HiveTableRelation for the input CatalogTable.

NOTE: readHiveTable is used when FindDataSourceTable is requested to resolve UnresolvedCatalogRelations in a logical plan (for Hive tables).

=== [[readDataSourceTable]] Creating LogicalRelation Logical Operator for CatalogTable -- readDataSourceTable Internal Method

[source, scala]

readDataSourceTable(table: CatalogTable): LogicalPlan

readDataSourceTable requests the SparkSession for the SessionCatalog.

readDataSourceTable requests the SessionCatalog for the cached logical plan for the input CatalogTable.

If no cached plan is available, readDataSourceTable creates a new DataSource for the provider (of the input CatalogTable) with the extra path option (based on the locationUri of the storage of the input CatalogTable). readDataSourceTable then requests the DataSource to resolve the relation, which gives a BaseRelation, and wraps that BaseRelation in a LogicalRelation together with the input CatalogTable.
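The cache-or-create flow above can be illustrated with a minimal sketch in plain Scala. All names here are hypothetical stand-ins rather than Spark's classes: `Table` approximates CatalogTable, the mutable map approximates the SessionCatalog's plan cache, and `relationsResolved` exists only to make cache misses observable.

```scala
import scala.collection.mutable

// Toy model only: hypothetical stand-ins, not Spark's actual classes.
object ReadDataSourceTableSketch {
  final case class Table(
      name: String,
      provider: String,
      locationUri: Option[String],
      options: Map[String, String] = Map.empty)
  final case class BaseRelation(provider: String, options: Map[String, String])
  final case class LogicalRelation(relation: BaseRelation, table: Table)

  // Stand-in for the cache of logical plans, keyed by table name
  private val cachedPlans = mutable.Map.empty[String, LogicalRelation]

  // Counts cache misses (for demonstration only)
  var relationsResolved = 0

  def readDataSourceTable(table: Table): LogicalRelation =
    cachedPlans.getOrElseUpdate(table.name, {
      // Cache miss: add the extra "path" option based on the storage location
      val pathOption = table.locationUri.map(uri => "path" -> uri)
      // "Resolve the relation" for the table's provider with the merged options
      val relation = BaseRelation(table.provider, table.options ++ pathOption)
      relationsResolved += 1
      LogicalRelation(relation, table)
    })
}
```

Calling `readDataSourceTable` twice for the same table builds the relation once and returns the cached instance the second time, which is the point of asking the catalog for a cached plan first.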

NOTE: readDataSourceTable is used when FindDataSourceTable is requested to resolve UnresolvedCatalogRelations in a logical plan (for data source tables).

Last update: 2020-11-07