Skip to content

Hint Framework

Structured queries can be optimized using Hint Framework that allows for specifying query hints.

Query hints allow for annotating a query and give a hint to the query optimizer how to optimize logical plans. This can be very useful when the query optimizer cannot make optimal decision, e.g. with respect to join methods due to conservativeness or the lack of proper statistics.

Spark SQL supports COALESCE and REPARTITION and BROADCAST hints. All remaining unresolved hints are silently removed from a query plan at analysis.

Hint Framework was added in Apache Spark 2.2.

Specifying Query Hints

Hints are defined using Dataset.hint operator or SQL hints.

Dataset API

val q = spark.range(1).hint(name = "myHint", 100, true)
val plan = q.queryExecution.logical
scala> println(plan.numberedTreeString)
00 'UnresolvedHint myHint, [100, true]
01 +- Range (0, 1, step=1, splits=Some(8))

SQL

val q = sql("SELECT /*+ myHint (100, true) */ 1")
val plan = q.queryExecution.logical
scala> println(plan.numberedTreeString)
00 'UnresolvedHint myHint, [100, true]
01 +- 'Project [unresolvedalias(1, None)]
02    +- OneRowRelation

SELECT SQL Statements With Hints

SELECT SQL statement supports query hints as comments in SQL query that Spark SQL translates into a UnresolvedHint unary logical operator.

COALESCE and REPARTITION Hints

Spark SQL 2.4 added support for COALESCE and REPARTITION hints (using SQL comments):

  • SELECT /*+ COALESCE(5) */ ...

  • SELECT /*+ REPARTITION(3) */ ...

Broadcast Hints

Spark SQL 2.2 supports BROADCAST hints using broadcast standard function or SQL comments:

  • SELECT /*+ MAPJOIN(b) */ ...

  • SELECT /*+ BROADCASTJOIN(b) */ ...

  • SELECT /*+ BROADCAST(b) */ ...

broadcast Standard Function

While hint operator allows for attaching any hint to a logical plan broadcast standard function attaches the broadcast hint only (that actually makes it a special case of hint operator).

broadcast standard function is used for broadcast joins (aka map-side joins) (and to hint the Spark planner to broadcast a dataset regardless of the size).

val small = spark.range(1)
val large = spark.range(100)

// Let's use broadcast standard function first
val q = large.join(broadcast(small), "id")
val plan = q.queryExecution.logical
scala> println(plan.numberedTreeString)
00 'Join UsingJoin(Inner,List(id))
01 :- Range (0, 100, step=1, splits=Some(8))
02 +- ResolvedHint (broadcast)
03    +- Range (0, 1, step=1, splits=Some(8))

// Please note that broadcast standard function uses ResolvedHint not UnresolvedHint

// Let's "replicate" standard function using hint operator
// Any of the names would work (case-insensitive)
// "BROADCAST", "BROADCASTJOIN", "MAPJOIN"
val smallHinted = small.hint("broadcast")
val plan = smallHinted.queryExecution.logical
scala> println(plan.numberedTreeString)
00 'UnresolvedHint broadcast
01 +- Range (0, 1, step=1, splits=Some(8))

// join is "clever"
// i.e. resolves UnresolvedHint into ResolvedHint immediately
val q = large.join(smallHinted, "id")
val plan = q.queryExecution.logical
scala> println(plan.numberedTreeString)
00 'Join UsingJoin(Inner,List(id))
01 :- Range (0, 100, step=1, splits=Some(8))
02 +- ResolvedHint (broadcast)
03    +- Range (0, 1, step=1, splits=Some(8))

Spark Analyzer

Logical Analyzer uses the following logical rules to resolve UnresolvedHint logical operators:

The order of executing the above rules matters.

// Let's hint the query twice
// The order of hints matters as every hint operator executes Spark analyzer
// That will resolve all but the last hint
val q = spark.range(100).
  hint("broadcast").
  hint("myHint", 100, true)
val plan = q.queryExecution.logical
scala> println(plan.numberedTreeString)
00 'UnresolvedHint myHint, [100, true]
01 +- ResolvedHint (broadcast)
02    +- Range (0, 100, step=1, splits=Some(8))

// Let's resolve unresolved hints
import org.apache.spark.sql.catalyst.rules.RuleExecutor
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.analysis.ResolveHints
import org.apache.spark.sql.internal.SQLConf
object HintResolver extends RuleExecutor[LogicalPlan] {
  lazy val batches =
    Batch("Hints", FixedPoint(maxIterations = 100),
      new ResolveHints.ResolveBroadcastHints(SQLConf.get),
      ResolveHints.RemoveAllHints) :: Nil
}
val resolvedPlan = HintResolver.execute(plan)
scala> println(resolvedPlan.numberedTreeString)
00 ResolvedHint (broadcast)
01 +- Range (0, 100, step=1, splits=Some(8))

Hint Operator in Catalyst DSL

hint operator in Catalyst DSL allows to create a UnresolvedHint logical operator.

// Create a logical plan to add hint to
import org.apache.spark.sql.catalyst.plans.logical.LocalRelation
val r1 = LocalRelation('a.int, 'b.timestamp, 'c.boolean)
scala> println(r1.numberedTreeString)
00 LocalRelation <empty>, [a#0, b#1, c#2]

// Attach hint to the plan
import org.apache.spark.sql.catalyst.dsl.plans._
val plan = r1.hint(name = "myHint", 100, true)
scala> println(plan.numberedTreeString)
00 'UnresolvedHint myHint, [100, true]
01 +- LocalRelation <empty>, [a#0, b#1, c#2]
Back to top