SparkSession — The Entry Point to Spark SQL

SparkSession is the entry point to Spark SQL. It is one of the first objects created in a Spark SQL application.

SparkSession is created using the SparkSession.builder method.

import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder
  .appName("My Spark Application")  // optional and will be autogenerated if not specified
  .master("local[*]")               // only for demo and testing purposes, use spark-submit instead
  .enableHiveSupport()              // self-explanatory, isn't it?
  .config("spark.sql.warehouse.dir", "target/spark-warehouse")
  .withExtensions { extensions =>
    extensions.injectResolutionRule { session =>
      ...
    }
    extensions.injectOptimizerRule { session =>
      ...
    }
  }
  .getOrCreate

SparkSession is a namespace of relational entities (e.g. databases, tables). A Spark SQL application could use many SparkSessions to keep the relational entities separate logically in metadata catalogs.
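A minimal sketch (assuming a local master; the view name is illustrative) showing how two sessions keep temporary views separate while sharing the underlying SparkContext:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.master("local[*]").getOrCreate()
val other = spark.newSession()  // separate SessionState, shared SparkContext

// a temporary view registered in one session...
spark.range(3).createOrReplaceTempView("nums")

// ...is not visible in the other
assert(spark.catalog.tableExists("nums"))
assert(!other.catalog.tableExists("nums"))
```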

SparkSession in spark-shell

The spark object in spark-shell (the instance of SparkSession that is auto-created) has Hive support enabled.

In order to disable the pre-configured Hive support in the spark object, set the spark.sql.catalogImplementation internal configuration property to in-memory (which uses the InMemoryCatalog external catalog instead).

$ spark-shell --conf spark.sql.catalogImplementation=in-memory

Creating Instance

SparkSession takes the following to be created:

SparkSession is created when:

SessionState

sessionState: SessionState

sessionState is the current SessionState.

Internally, sessionState is the parent SessionState (if one was given when the SparkSession was created) or is created using the session state builder class as defined by the spark.sql.catalogImplementation configuration property:

  • in-memory (default) for org.apache.spark.sql.internal.SessionStateBuilder
  • hive for org.apache.spark.sql.hive.HiveSessionStateBuilder

Creating New SparkSession

newSession(): SparkSession

newSession creates a new SparkSession with an undefined parent SessionState and (re)using the following:

SparkSession.newSession and SparkSession.cloneSession

SparkSession.newSession uses no parent SessionState while SparkSession.cloneSession (re)uses SessionState.

Cloning SparkSession

cloneSession(): SparkSession

cloneSession...FIXME

cloneSession is used when:

  • AdaptiveSparkPlanHelper is requested to getOrCloneSessionWithAqeOff
  • StreamExecution (Spark Structured Streaming) is created

Creating SparkSession Using Builder Pattern

builder(): Builder

builder is an object method that creates a new Builder to build a SparkSession using a fluent API.

import org.apache.spark.sql.SparkSession
val builder = SparkSession.builder

TIP: Read about the Fluent interface design pattern at https://en.wikipedia.org/wiki/Fluent_interface.

Spark Version

version: String

version returns the version of Apache Spark in use.

Internally, version uses spark.SPARK_VERSION value that is the version property in spark-version-info.properties properties file on CLASSPATH.

Creating Empty Dataset (Given Encoder)

emptyDataset[T: Encoder]: Dataset[T]

emptyDataset creates an empty Dataset (assuming that future records will be of type T).

scala> val strings = spark.emptyDataset[String]
strings: org.apache.spark.sql.Dataset[String] = [value: string]

scala> strings.printSchema
root
 |-- value: string (nullable = true)

emptyDataset creates a LocalRelation logical operator.

Creating Dataset from Local Collections or RDDs

createDataset[T : Encoder](
  data: RDD[T]): Dataset[T]
createDataset[T : Encoder](
  data: Seq[T]): Dataset[T]

createDataset creates a Dataset from a local Scala collection (Seq[T] or Java's List[T]) or a distributed RDD[T].

scala> val one = spark.createDataset(Seq(1))
one: org.apache.spark.sql.Dataset[Int] = [value: int]

scala> one.show
+-----+
|value|
+-----+
|    1|
+-----+

createDataset creates logical operators:

implicits object

You may want to consider implicits object and toDS method instead.

val spark: SparkSession = ...
import spark.implicits._

scala> val one = Seq(1).toDS
one: org.apache.spark.sql.Dataset[Int] = [value: int]

Internally, createDataset first looks up the implicit ExpressionEncoder in scope to access the AttributeReferences (of the schema).

The expression encoder is then used to map elements (of the input Seq[T]) into a collection of InternalRows. With the references and rows, createDataset returns a Dataset with a LocalRelation logical query plan.

Creating Dataset With Single Long Column

range(end: Long): Dataset[java.lang.Long]
range(start: Long, end: Long): Dataset[java.lang.Long]
range(start: Long, end: Long, step: Long): Dataset[java.lang.Long]
range(start: Long, end: Long, step: Long, numPartitions: Int): Dataset[java.lang.Long]

The range method family creates a Dataset of Long numbers.

scala> spark.range(start = 0, end = 4, step = 2, numPartitions = 5).show
+---+
| id|
+---+
|  0|
|  2|
+---+

The first three variants (that do not specify numPartitions explicitly) use SparkContext.defaultParallelism for the number of partitions.

Internally, range creates a new Dataset[Long] with Range leaf logical operator and Encoders.LONG encoder.

Executing SQL Queries (aka SQL Mode)

sql(sqlText: String): DataFrame

sql executes the sqlText SQL statement and creates a DataFrame.

Note

sql is imported in spark-shell so you can execute SQL statements as if sql were a part of the environment.

scala> :imports
1) import spark.implicits._       (72 terms, 43 are implicit)
2) import spark.sql               (1 terms)
scala> sql("SHOW TABLES")
res0: org.apache.spark.sql.DataFrame = [tableName: string, isTemporary: boolean]

scala> sql("DROP TABLE IF EXISTS testData")
res1: org.apache.spark.sql.DataFrame = []

// Let's create a table to SHOW it
spark.range(10).write.option("path", "/tmp/test").saveAsTable("testData")

scala> sql("SHOW TABLES").show
+---------+-----------+
|tableName|isTemporary|
+---------+-----------+
| testdata|      false|
+---------+-----------+

Internally, sql requests the current ParserInterface to parse the sqlText statement, which gives a LogicalPlan.

NOTE: sql uses the SessionState to access the current ParserInterface.

sql then creates a DataFrame using the current SparkSession (itself) and the LogicalPlan.

spark-sql Command-Line Tool

Use spark-sql command-line tool to use SQL directly (not Scala as in spark-shell).

spark-sql> show databases;
default
Time taken: 0.028 seconds, Fetched 1 row(s)

Accessing UDFRegistration

udf: UDFRegistration

The udf attribute is a UDFRegistration (for registering user-defined functions for SQL-based queries).

val spark: SparkSession = ...
spark.udf.register("myUpper", (s: String) => s.toUpperCase)

val strs = ('a' to 'c').map(_.toString).toDS
strs.createOrReplaceTempView("strs")

scala> sql("SELECT *, myUpper(value) UPPER FROM strs").show
+-----+-----+
|value|UPPER|
+-----+-----+
|    a|    A|
|    b|    B|
|    c|    C|
+-----+-----+

Internally, it is simply an alias for SessionState.udfRegistration.

Loading Data From Table

table(
  multipartIdentifier: Seq[String]): DataFrame
table(
  tableName: String): DataFrame
table(
  tableIdent: TableIdentifier): DataFrame

table creates a DataFrame for the input tableName table.

Note

baseRelationToDataFrame acts as a mechanism to plug the BaseRelation object hierarchy into the LogicalPlan object hierarchy that SparkSession uses to bridge them.

scala> spark.catalog.tableExists("t1")
res1: Boolean = true

// t1 exists in the catalog
// let's load it
val t1 = spark.table("t1")

Catalog

catalog: Catalog

catalog creates a CatalogImpl when first accessed.

lazy value

catalog is a Scala lazy value which is computed once when accessed and cached afterwards.
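A quick sketch of using the Catalog interface (assuming a running spark session):

```scala
// browse the metadata catalog
spark.catalog.listDatabases().show()
spark.catalog.listTables().show()

// lazy value: the same CatalogImpl instance is returned on every access
assert(spark.catalog eq spark.catalog)
```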

DataFrameReader

read: DataFrameReader

read gives a DataFrameReader to load data from external data sources into a DataFrame.

val spark: SparkSession = ... // create instance
val dfReader: DataFrameReader = spark.read

Runtime Configuration

conf: RuntimeConfig

conf returns the current RuntimeConfig.

Internally, conf creates a RuntimeConfig (when requested the very first time and cached afterwards) with the SQLConf (of the SessionState).
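For example (a sketch; my.custom.key is a made-up property name):

```scala
// session-scoped runtime configuration backed by the SessionState's SQLConf
spark.conf.set("spark.sql.shuffle.partitions", "8")
assert(spark.conf.get("spark.sql.shuffle.partitions") == "8")

// getOption for properties that may not be set
assert(spark.conf.getOption("my.custom.key").isEmpty)
```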

ExperimentalMethods

experimental: ExperimentalMethods

experimental is an extension point with ExperimentalMethods, a per-session collection of extra planning strategies and Rule[LogicalPlan]s.

experimental is used in SparkPlanner and SparkOptimizer.
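For example, extra planning strategies can be registered per session (a sketch; MyStrategy is a hypothetical strategy that never fires):

```scala
import org.apache.spark.sql.Strategy
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.SparkPlan

// hypothetical strategy that returns no physical plans (defers to other strategies)
object MyStrategy extends Strategy {
  override def apply(plan: LogicalPlan): Seq[SparkPlan] = Nil
}

spark.experimental.extraStrategies = MyStrategy :: Nil
```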

Create DataFrame for BaseRelation

baseRelationToDataFrame(
  baseRelation: BaseRelation): DataFrame

Internally, baseRelationToDataFrame creates a DataFrame from the input BaseRelation wrapped inside LogicalRelation.

baseRelationToDataFrame is used when:

  • DataFrameReader is requested to load data from data source or JDBC table
  • TextInputCSVDataSource creates a base Dataset (of Strings)
  • TextInputJsonDataSource creates a base Dataset (of Strings)

Creating SessionState

instantiateSessionState(
  className: String,
  sparkSession: SparkSession): SessionState

instantiateSessionState loads the class given by className and uses it to create and build a BaseSessionStateBuilder.

instantiateSessionState may report an IllegalArgumentException while instantiating the class of a SessionState:

Error while instantiating '[className]'

instantiateSessionState is used when SparkSession is requested for SessionState (based on spark.sql.catalogImplementation configuration property).

sessionStateClassName

sessionStateClassName(
  conf: SparkConf): String

sessionStateClassName gives the name of the class of the SessionState per the spark.sql.catalogImplementation configuration property:

  • in-memory (default) for org.apache.spark.sql.internal.SessionStateBuilder
  • hive for org.apache.spark.sql.hive.HiveSessionStateBuilder

sessionStateClassName is used when SparkSession is requested for the SessionState (and one is not available yet).

Creating DataFrame From RDD Of Internal Binary Rows and Schema

internalCreateDataFrame(
  catalystRows: RDD[InternalRow],
  schema: StructType,
  isStreaming: Boolean = false): DataFrame

internalCreateDataFrame creates a DataFrame with LogicalRDD.

internalCreateDataFrame is used when:

ExecutionListenerManager

listenerManager: ExecutionListenerManager

ExecutionListenerManager

SharedState

sharedState: SharedState

SharedState

Measuring Duration of Executing Code Block

time[T](f: => T): T

time executes a code block and prints out (to standard output) the time taken to execute it.
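For example (the summed range below is just an illustration):

```scala
// prints "Time taken: ... ms" to standard output and returns the block's result
val total = spark.time {
  spark.range(1000000).selectExpr("sum(id)").first().getLong(0)
}
assert(total == 499999500000L)
```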

Applying SparkSessionExtensions

applyExtensions(
  extensionConfClassNames: Seq[String],
  extensions: SparkSessionExtensions): SparkSessionExtensions

For every extension class name (in extensionConfClassNames), applyExtensions instantiates the class and (since it is a SparkSessionExtensions => Unit function) applies it to the given SparkSessionExtensions.

Note

The given SparkSessionExtensions is mutated in-place.
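Such an extension class is a SparkSessionExtensions => Unit function, typically registered with the spark.sql.extensions configuration property (MyExtensions below is a hypothetical example with a no-op optimizer rule):

```scala
import org.apache.spark.sql.SparkSessionExtensions
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule

// hypothetical class; register with --conf spark.sql.extensions=org.example.MyExtensions
class MyExtensions extends (SparkSessionExtensions => Unit) {
  override def apply(extensions: SparkSessionExtensions): Unit =
    extensions.injectOptimizerRule { session =>
      new Rule[LogicalPlan] {
        override def apply(plan: LogicalPlan): LogicalPlan = plan  // no-op rule
      }
    }
}
```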

In case of ClassCastException, ClassNotFoundException or NoClassDefFoundError, applyExtensions prints out the following WARN message to the logs:

Cannot use [extensionConfClassName] to configure session extensions.

applyExtensions is used when:

Default Parallelism of Leaf Nodes

leafNodeDefaultParallelism: Int

leafNodeDefaultParallelism is the value of spark.sql.leafNodeDefaultParallelism if defined or SparkContext.defaultParallelism (Spark Core).

leafNodeDefaultParallelism is used when:

  • SparkSession is requested to range
  • RangeExec leaf physical operator is created
  • CommandResultExec physical operator is requested for the RDD[InternalRow]
  • LocalTableScanExec physical operator is requested for the RDD
  • FilePartition utility is used to maxSplitBytes
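The lookup is a simple fallback; a sketch of the logic (not the actual implementation):

```scala
import org.apache.spark.sql.SparkSession

// spark.sql.leafNodeDefaultParallelism if set, else SparkContext.defaultParallelism
def leafNodeDefaultParallelism(spark: SparkSession): Int =
  spark.conf.getOption("spark.sql.leafNodeDefaultParallelism")
    .map(_.toInt)
    .getOrElse(spark.sparkContext.defaultParallelism)
```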