
Catalyst Expressions

Expression is an extension of the TreeNode abstraction for executable nodes.

Expression is an executable node (in a Catalyst multi-tree) that can be evaluated to a value for an input row (e.g. produces a JVM object for an InternalRow).

Expression is often referred to as a Catalyst expression, but it is simply built using the Catalyst Tree Manipulation Framework.

// evaluating an expression
// Use Literal expression to create an expression from a Scala object
import org.apache.spark.sql.catalyst.expressions.{Expression, Literal}
val e: Expression = Literal("hello")

import org.apache.spark.sql.catalyst.expressions.EmptyRow
val v: Any = e.eval(EmptyRow)

// Convert to Scala's String
import org.apache.spark.unsafe.types.UTF8String
val s = v.asInstanceOf[UTF8String].toString
assert(s == "hello")

Expression can generate Java source code that is then used in code-generated (non-interpreted) evaluation.
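The two evaluation modes can be compared side by side. The following is a minimal sketch, assuming a Spark version where `Add(left, right)` can be created with two arguments and `CodegenContext` has a no-argument constructor; the shape of the generated Java source differs between Spark versions, so it is printed rather than compared.

```scala
import org.apache.spark.sql.catalyst.expressions.{Add, Literal}
import org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext

// An expression tree with two leaf children
val sum = Add(Literal(1), Literal(2))

// Interpreted evaluation: walks the tree and returns a JVM object
val interpreted = sum.eval()

// Code-generated evaluation: ask the expression for Java source code
val ctx = new CodegenContext
val exprCode = sum.genCode(ctx)

// The generated snippet (exact content depends on the Spark version)
println(exprCode.code)
```

The generated code is only a fragment; Spark compiles it as part of a larger generated class before executing it.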

[[specialized-expressions]]
.Specialized Expressions
[cols="1,2,2,1",options="header",width="100%"]
|===
| Name
| Scala Kind
| Behaviour
| Examples

| [[BinaryExpression]] BinaryExpression
| abstract class
|
a| * UnixTimestamp

| [[CodegenFallback]] CodegenFallback
| trait
| Does not support code generation and falls back to interpreted mode
|

| [[ExpectsInputTypes]] ExpectsInputTypes
| trait
|
|

| [[ExtractValue]] ExtractValue
| trait
| Marks UnresolvedAliases to be resolved to Aliases with "pretty" SQLs when ResolveAliases is executed
|

| [[LeafExpression]] LeafExpression
| abstract class
| Has no child expressions (and hence "terminates" the expression tree)
|

| [[NamedExpression]] NamedExpression
|
| Can later be referenced in a dataflow graph
|

| [[Nondeterministic]] Nondeterministic
| trait
|
|

| [[NonSQLExpression]] NonSQLExpression
| trait
a| Expression with no SQL representation

Gives the only custom sql method that is non-overridable (i.e. final).

When requested for the SQL representation, NonSQLExpression transforms Attributes to PrettyAttributes to build the text representation.
a| * ScalaUDAF
* TimeWindow

| [[Predicate]] Predicate
| trait
| Result data type is always boolean
a| * And
* AtLeastNNonNulls
* Exists
* In
* InSet

| [[TernaryExpression]] TernaryExpression
| abstract class
|
|

| [[TimeZoneAwareExpression]] TimeZoneAwareExpression
| trait
| Timezone-aware expressions
a| * UnixTimestamp
* JsonToStructs

| [[UnaryExpression]] UnaryExpression
| abstract class
|
a| * ExplodeBase
* JsonToStructs

| [[Unevaluable]] Unevaluable
| trait
| Cannot be evaluated to produce a value (neither in interpreted nor in code-generated expression evaluation), i.e. eval and doGenCode are not supported and simply report an UnsupportedOperationException.
|
|===

Example: Analysis failure due to an Unevaluable expression
// UnresolvedFunction is an Unevaluable expression
// Using Catalyst DSL to create an UnresolvedFunction
import org.apache.spark.sql.catalyst.dsl.expressions._
val f = 'f.function()

import org.apache.spark.sql.catalyst.dsl.plans._
val logicalPlan = table("t1").select(f)
scala> println(logicalPlan.numberedTreeString)
00 'Project [unresolvedalias('f(), None)]
01 +- 'UnresolvedRelation `t1`

scala> spark.sessionState.analyzer.execute(logicalPlan)
org.apache.spark.sql.AnalysisException: Undefined function: 'f'. This function is neither a registered temporary function nor a permanent function registered in the database 'default'.;
  at org.apache.spark.sql.catalyst.analysis.Analyzer$LookupFunctions$$anonfun$apply$15$$anonfun$applyOrElse$49.apply(Analyzer.scala:1198)
  at org.apache.spark.sql.catalyst.analysis.Analyzer$LookupFunctions$$anonfun$apply$15$$anonfun$applyOrElse$49.apply(Analyzer.scala:1198)
  at org.apache.spark.sql.catalyst.analysis.package$.withPosition(package.scala:48)
  at org.apache.spark.sql.catalyst.analysis.Analyzer$LookupFunctions$$anonfun$apply$15.applyOrElse(Analyzer.scala:1197)
  at org.apache.spark.sql.catalyst.analysis.Analyzer$LookupFunctions$$anonfun$apply$15.applyOrElse(Analyzer.scala:1195)
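The restriction on Unevaluable expressions can also be observed directly, without going through the analyzer. This is a small sketch that uses `UnresolvedAttribute` as the Unevaluable expression; the exact exception class thrown by `eval` is an assumption that varies between Spark versions, so the sketch only checks that evaluation fails.

```scala
import scala.util.Try
import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
import org.apache.spark.sql.catalyst.expressions.Unevaluable

// An unresolved column reference is an Unevaluable expression
val attr = UnresolvedAttribute("a")
val isUnevaluable = attr.isInstanceOf[Unevaluable]

// Interpreted evaluation is not supported and throws
val result = Try(attr.eval())

println(s"Unevaluable: $isUnevaluable, eval failed: ${result.isFailure}")
```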


deterministic Flag

Expression is deterministic when it evaluates to the same result for the same input(s). An expression is deterministic if all of its child expressions are (which is always true for leaf expressions, since they have no child expressions).

NOTE: A deterministic expression is like a pure function in functional programming languages.

val e = $"a".expr
scala> :type e
org.apache.spark.sql.catalyst.expressions.Expression

scala> println(e.deterministic)
true

NOTE: Non-deterministic expressions are not allowed in some logical operators and are excluded in some optimizations.
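How the flag propagates from children to parents can be sketched with a non-deterministic leaf. The example assumes the `Rand(seed: Long)` factory and a two-argument `Multiply(left, right)`, both of which are available in the Spark versions this page is likely to apply to.

```scala
import org.apache.spark.sql.catalyst.expressions.{Literal, Multiply, Rand}

// A literal is a leaf expression and always deterministic
val constant = Literal(2.0)

// Rand is a Nondeterministic expression
val random = Rand(42L)

// A parent is deterministic only if all of its children are
val product = Multiply(random, constant)

println(constant.deterministic)
println(random.deterministic)
println(product.deterministic)
```

A single non-deterministic child is enough to make the whole subtree above it non-deterministic.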

=== [[contract]] Expression Contract

[source, scala]

package org.apache.spark.sql.catalyst.expressions

abstract class Expression extends TreeNode[Expression] {
  // only required methods that have no implementation
  def dataType: DataType
  def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode
  def eval(input: InternalRow = EmptyRow): Any
  def nullable: Boolean
}
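To see what implementing the contract involves, here is a minimal sketch of a custom expression. `FortyTwo` is a hypothetical name used for illustration only (it is not part of Spark); it extends `LeafExpression` so there are no children to manage, and mixes in `CodegenFallback` so that `doGenCode` does not have to be written by hand.

```scala
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.LeafExpression
import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback
import org.apache.spark.sql.types.{DataType, IntegerType}

// Hypothetical expression that always evaluates to 42
case class FortyTwo() extends LeafExpression with CodegenFallback {
  // Data type of the result of evaluating the expression
  override def dataType: DataType = IntegerType

  // The result is never null
  override def nullable: Boolean = false

  // Interpreted evaluation; the input row is ignored
  override def eval(input: InternalRow): Any = 42
}

val answer = FortyTwo().eval()
println(answer)
```

Mixing in `CodegenFallback` keeps the sketch short at the cost of performance, since the expression is then always evaluated in interpreted mode.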

.(Subset of) Expression Contract
[cols="1,2",options="header",width="100%"]
|===
| Method
| Description

| [[canonicalized]] canonicalized
|

| [[checkInputDataTypes]] checkInputDataTypes
| Verifies (checks the correctness of) the input data types

| [[childrenResolved]] childrenResolved
|

| [[dataType]] dataType
| Data type of the result of evaluating an expression

| [[doGenCode]] doGenCode
a| Code-generated expression evaluation that generates Java source code (which is used to evaluate the expression in a more optimized way, without directly using eval).

Used when Expression is requested to generate Java source code (genCode).

| [[eval]] eval
a| Interpreted (non-code-generated) expression evaluation that evaluates an expression to a JVM object for a given InternalRow (without generating Java code).

NOTE: By default accepts EmptyRow, i.e. null.

eval is a slower "relative" of genCode.

| [[foldable]] foldable
|

| [[genCode]] genCode
a| Generates the Java source code for code-generated (non-interpreted) expression evaluation on an input InternalRow (in a more optimized way, without directly using eval).

Similar to doGenCode but supports expression reuse (aka subexpression elimination).

genCode is a faster "relative" of eval.

| [[nullable]] nullable
|

| [[prettyName]] prettyName
| User-facing name

| [[references]] references
|

| [[resolved]] resolved
|

| [[semanticEquals]] semanticEquals
|

| [[semanticHash]] semanticHash
|
|===
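Several of the contract's properties can be inspected on any expression instance. The sketch below prints a few of them for a simple addition and contrasts a resolved expression with an unresolved one; it assumes a two-argument `Add(left, right)` and makes no claim about the exact text each property prints.

```scala
import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
import org.apache.spark.sql.catalyst.expressions.{Add, Literal}
import org.apache.spark.sql.types.IntegerType

val sum = Add(Literal(1), Literal(2))

println(sum.dataType)    // data type of the result
println(sum.nullable)    // whether the result can be null
println(sum.foldable)    // whether it can be evaluated ahead of query execution
println(sum.resolved)    // children resolved and input types valid
println(sum.references)  // attributes the expression refers to
println(sum.prettyName)  // user-facing name

// An unresolved attribute has not been bound to a column yet
val unresolved = UnresolvedAttribute("a")
println(unresolved.resolved)
```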

=== [[reduceCodeSize]] reduceCodeSize Internal Method

[source, scala]

reduceCodeSize(ctx: CodegenContext, eval: ExprCode): Unit

reduceCodeSize does its work only when all of the following are met:

  1. Length of the generated code is above 1024

  2. INPUT_ROW of the input CodegenContext is defined

  3. currentVars of the input CodegenContext is not defined

CAUTION: FIXME When would the above not be met? What's so special about such an expression?

reduceCodeSize sets the value of the input ExprCode to the fresh term name for the value name.

In the end, reduceCodeSize sets the code of the input ExprCode to the following:

[javaType] [newValue] = [funcFullName]([INPUT_ROW]);

The funcFullName is the fresh term name for the name of the current expression node.

TIP: Use the expression node name to search for the function that corresponds to the expression in a generated code.

NOTE: reduceCodeSize is used exclusively when Expression is requested to generate Java source code (genCode).
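The conditions and the final call-site template described above can be sketched in plain Scala. This is not Spark's implementation: `ContextSketch`, `shouldReduce` and `callSite` are hypothetical names, and `Option` stands in for "defined" versus "not defined" on the real CodegenContext.

```scala
// Hypothetical stand-in for the two CodegenContext properties named above
final case class ContextSketch(
    inputRow: Option[String],
    currentVars: Option[Seq[String]])

// Threshold from the first condition
val CodeLengthThreshold = 1024

// All three conditions must hold for the code to be moved into a function
def shouldReduce(code: String, ctx: ContextSketch): Boolean =
  code.length > CodeLengthThreshold &&
    ctx.inputRow.isDefined &&
    ctx.currentVars.isEmpty

// The replacement code: a single call to the extracted function
def callSite(
    javaType: String,
    newValue: String,
    funcFullName: String,
    inputRow: String): String =
  s"$javaType $newValue = $funcFullName($inputRow);"

val longCode = "x" * 2000
val ctx = ContextSketch(inputRow = Some("i"), currentVars = None)

val replaced =
  if (shouldReduce(longCode, ctx)) callSite("int", "value_1", "add_0", "i")
  else longCode

println(replaced)
```

The point of the replacement is that a long block of generated code collapses to one short statement at the place where it was originally inlined.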

=== [[flatArguments]] flatArguments Method

[source, scala]

flatArguments: Iterator[Any]


NOTE: flatArguments is used when...FIXME

=== [[sql]] SQL Representation -- sql Method

[source, scala]

sql: String

sql gives a SQL representation.

Internally, sql gives a text representation with prettyName followed by the sql of the children in round brackets, concatenated using the comma (,).

[source, scala]

import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.expressions.Sentences
val sentences = Sentences("Hi there! Good morning.", "en", "US")

import org.apache.spark.sql.catalyst.expressions.Expression
val expr: Expression = count("") === 5 && count(sentences) === 5
scala> expr.sql
res0: String = ((count('') = 5) AND (count(sentences('Hi there! Good morning.', 'en', 'US')) = 5))
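A smaller case makes the default format easier to see. The sketch below uses `Upper`, which is assumed not to override `sql`, so the result should be the lower-cased node name followed by the SQL of its only child in round brackets.

```scala
import org.apache.spark.sql.catalyst.expressions.{Literal, Upper}

// A unary expression over a string literal
val upper = Upper(Literal("hello"))

// prettyName is the user-facing name used by the default sql
println(upper.prettyName)

// Default sql: prettyName(child1.sql, child2.sql, ...)
println(upper.sql)
```

Operators such as `===` and `&&` in the longer example above use their own infix format instead of this default.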

NOTE: sql is used when...FIXME

Last update: 2021-05-20