SQLContext

[CAUTION]

As of Spark 2.0.0, SQLContext is kept for backward compatibility only and is a mere wrapper of SparkSession.md[SparkSession].

In the pre-Spark 2.0 era, SQLContext was the entry point for Spark SQL. Whatever you did in Spark SQL had to start from <<creating-instance, creating an instance of SQLContext>>.

A SQLContext object requires a SparkContext, a CacheManager, and a SQLListener. They are all transient and do not participate in serializing a SQLContext.

You should use SQLContext for the following:

  • <<setting-configuration-properties, Setting Configuration Properties>>
  • <<creating-dataframes, Creating DataFrames>>
  • <<registering-udfs, Registering User-Defined Functions (UDF)>>
  • <<caching-dataframes, Caching DataFrames in In-Memory Cache>>
  • <<implicits, Implicits -- SQLContext.implicits>>
  • <<creating-datasets, Creating Datasets>>
  • <<read, Accessing DataFrameReader>>
  • <<creating-external-tables, Creating External Tables>>
  • <<dropping-temporary-tables, Dropping Temporary Tables>>
  • <<range, Creating Dataset[Long] (range method)>>
  • <<creating-dataframes-for-table, Creating DataFrames for Table>>
  • <<listing-existing-tables, Listing Existing Tables>>
  • <<accessing-StreamingQueryManager, Accessing StreamingQueryManager>>
  • <<getOrCreate, Managing Active SQLContext for JVM>>
  • <<sql, Executing SQL Queries>>
  • <<newSession, Creating New Session>>

=== [[creating-instance]] Creating SQLContext Instance

You can create a SQLContext using the following constructors (see the sketch after the list):

  • SQLContext(sc: SparkContext)
  • SQLContext.getOrCreate(sc: SparkContext)
  • SQLContext.newSession() allows for creating a new instance of SQLContext with a separate SQL configuration (through a shared SparkContext).
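
A minimal sketch of these in action, assuming a SparkContext sc is already available (as in spark-shell):

[source, scala]

import org.apache.spark.sql.SQLContext

// assumes sc: SparkContext is already in scope (e.g. in spark-shell)
val sqlContext = new SQLContext(sc)

// reuses (or creates) the active SQLContext for the JVM
val sameOrNew = SQLContext.getOrCreate(sc)

// a new session with its own SQL configuration on top of the shared SparkContext
val session = sqlContext.newSession()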

=== [[setting-configuration-properties]] Setting Configuration Properties

You can set Spark SQL configuration properties using:

  • setConf(props: Properties): Unit
  • setConf(key: String, value: String): Unit

You can get the current value of a configuration property by key using the following (a combined example follows the list):

  • getConf(key: String): String
  • getConf(key: String, defaultValue: String): String
  • getAllConfs: immutable.Map[String, String]
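
For example (a sketch, assuming an already-created SQLContext named sqlContext):

[source, scala]

// set a single Spark SQL configuration property
sqlContext.setConf("spark.sql.shuffle.partitions", "8")

// read it back, with and without a default value
sqlContext.getConf("spark.sql.shuffle.partitions")
sqlContext.getConf("spark.sql.sources.default", "parquet")

// all the current Spark SQL configuration properties
sqlContext.getAllConfs.foreach { case (k, v) => println(s"$k = $v") }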

NOTE: Properties that start with spark.sql are reserved for Spark SQL.

=== [[creating-dataframes]] Creating DataFrames

==== emptyDataFrame

[source, scala]

emptyDataFrame: DataFrame

emptyDataFrame creates an empty DataFrame. It calls createDataFrame with an empty RDD[Row] and an empty schema (spark-sql-schema.md[StructType(Nil)]).
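
A quick check (a sketch, assuming sqlContext is in scope):

[source, scala]

// an empty DataFrame has no rows and no columns
val empty = sqlContext.emptyDataFrame

empty.count          // 0
empty.schema.isEmpty // true -- the schema is StructType(Nil)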

==== createDataFrame for RDD and Seq

[source, scala]

createDataFrame[A <: Product](rdd: RDD[A]): DataFrame
createDataFrame[A <: Product](data: Seq[A]): DataFrame


The createDataFrame family of methods creates a DataFrame from an RDD or Seq of Scala's Product types, e.g. case classes or tuples.
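
For example, with a case class (a sketch; the Person class and the sample values are just an illustration):

[source, scala]

case class Person(name: String, age: Int)

// from a local Seq of case class instances
val fromSeq = sqlContext.createDataFrame(Seq(Person("Jacek", 42), Person("Agata", 40)))

// from an RDD of the same Product type
val fromRDD = sqlContext.createDataFrame(sc.parallelize(Seq(Person("Jacek", 42))))

fromSeq.printSchema
// root
//  |-- name: string (nullable = true)
//  |-- age: integer (nullable = false)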

==== createDataFrame for RDD of Row with Explicit Schema

[source, scala]

createDataFrame(rowRDD: RDD[Row], schema: StructType): DataFrame

This variant of createDataFrame creates a DataFrame from an RDD of spark-sql-Row.md[Row]s and an explicit schema.
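
A sketch of the explicit-schema variant:

[source, scala]

import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StructType, StructField, StringType, IntegerType}

val rows = sc.parallelize(Seq(Row("hello", 0), Row("world!", 1)))
val schema = StructType(Seq(
  StructField("text", StringType, nullable = true),
  StructField("id", IntegerType, nullable = false)))

val df = sqlContext.createDataFrame(rows, schema)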

=== [[registering-udfs]] Registering User-Defined Functions (UDF)

[source, scala]

udf: UDFRegistration

udf method gives you access to UDFRegistration to manipulate user-defined functions. Functions registered using udf are available for Hive queries only.

TIP: Read up on UDFs in spark-sql-udfs.md[UDFs -- User-Defined Functions] document.

[source, scala]

// Create a DataFrame
val df = Seq("hello", "world!").zip(0 to 1).toDF("text", "id")

// Register the DataFrame as a temporary table in Hive
df.registerTempTable("texts")

scala> sql("SHOW TABLES").show
+---------+-----------+
|tableName|isTemporary|
+---------+-----------+
|    texts|       true|
+---------+-----------+

scala> sql("SELECT * FROM texts").show
+------+---+
|  text| id|
+------+---+
| hello|  0|
|world!|  1|
+------+---+

// Just a Scala function
val my_upper: String => String = _.toUpperCase

// Register the function as UDF
spark.udf.register("my_upper", my_upper)

scala> sql("SELECT *, my_upper(text) AS MY_UPPER FROM texts").show
+------+---+--------+
|  text| id|MY_UPPER|
+------+---+--------+
| hello|  0|   HELLO|
|world!|  1|  WORLD!|
+------+---+--------+


=== [[caching-dataframes]] Caching DataFrames in In-Memory Cache

[source, scala]

isCached(tableName: String): Boolean

isCached method asks CacheManager whether the tableName table is cached in memory. It simply requests CacheManager for the table's CachedData and, if found, assumes the table is cached.

[source, scala]

cacheTable(tableName: String): Unit

You can cache a table in memory using cacheTable.

CAUTION: Why would I want to cache a table?

[source, scala]

uncacheTable(tableName: String): Unit
clearCache(): Unit


uncacheTable and clearCache remove one or all in-memory cached tables.
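
Putting the caching methods together (a sketch, assuming a table t has already been registered):

[source, scala]

sqlContext.cacheTable("t")
sqlContext.isCached("t")     // true

sqlContext.uncacheTable("t")
sqlContext.isCached("t")     // false

// or drop all in-memory cached tables at once
sqlContext.clearCache()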

=== [[implicits]] Implicits -- SQLContext.implicits

The implicits object holds methods to convert objects into Dataset.md[Datasets] and spark-sql-DataFrame.md[DataFrames], and also comes with many Encoders for "primitive" types as well as collections thereof.

[NOTE]
====
Import the implicits by import spark.implicits._ as follows:

[source, scala]

val spark = new SQLContext(sc)
import spark.implicits._


====

It holds Encoders for Scala "primitive" types like Int, Double, String, and their collections.

It offers support for creating a Dataset from an RDD of any type (for which an encoder exists in scope), including case classes and tuples, as well as from a Seq.

It also offers conversions from Scala's Symbol or $ to Column.

It also offers conversions from RDD or Seq of Product types (e.g. case classes or tuples) to DataFrame. It has direct conversions from RDD of Int, Long and String to DataFrame with a single column name _1.

NOTE: It is not possible to call toDF methods on RDD objects of other "primitive" types except Int, Long, and String.
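
A few of the conversions in action (a sketch, reusing the spark value from the note above):

[source, scala]

import spark.implicits._

// Seq of tuples (a Product type) to DataFrame with named columns
val df = Seq(("hello", 0), ("world!", 1)).toDF("text", "id")

// RDD of Int to a single-column DataFrame
val ints = sc.parallelize(1 to 3).toDF

// Symbol and $ conversions to Column
df.select('text, $"id").show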

=== [[creating-datasets]][[createDataset]] Creating Datasets

[source, scala]

createDataset[T: Encoder](data: Seq[T]): Dataset[T]
createDataset[T: Encoder](data: RDD[T]): Dataset[T]


createDataset family of methods creates a Dataset.md[Dataset] from a collection of elements of type T, be it a regular Scala Seq or Spark's RDD.

It requires that there is an encoder in scope.

NOTE: <<implicits, SQLContext.implicits>> brings many encoders in scope.
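
For example (a sketch; the encoders come from the implicits import and the Token case class is just an illustration):

[source, scala]

import sqlContext.implicits._

// from a local Seq (Encoder[Int] is in scope via implicits)
val ints = sqlContext.createDataset(0 to 4)

// from an RDD of a case class
case class Token(name: String)
val tokens = sqlContext.createDataset(sc.parallelize(Seq(Token("a"), Token("b"))))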

=== [[read]] Accessing DataFrameReader (read method)

[source, scala]

read: DataFrameReader

The experimental read method returns a DataFrameReader that is used to read data from external storage systems and load it into a DataFrame.
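
A sketch of typical usage (the paths and formats below are just illustrations):

[source, scala]

// load a JSON dataset (the path is hypothetical)
val people = sqlContext.read
  .format("json")
  .load("people.json")

// or with a format-specific shortcut
val events = sqlContext.read.parquet("events.parquet")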

=== [[creating-external-tables]][[createExternalTable]] Creating External Tables

[source, scala]

createExternalTable(tableName: String, path: String): DataFrame
createExternalTable(tableName: String, path: String, source: String): DataFrame
createExternalTable(tableName: String, source: String, options: Map[String, String]): DataFrame
createExternalTable(tableName: String, source: String, schema: StructType, options: Map[String, String]): DataFrame


The experimental createExternalTable family of methods is used to create an external table tableName and return a corresponding DataFrame.

CAUTION: FIXME What is an external table?

It assumes parquet as the default data source format, which you can change using the spark.sql.sources.default configuration property.
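
A sketch (the table names and paths are hypothetical):

[source, scala]

// uses the default data source (parquet unless spark.sql.sources.default says otherwise)
val events = sqlContext.createExternalTable("events", "/data/events")

// or with an explicit source and options
val logs = sqlContext.createExternalTable("logs", "json", Map("path" -> "/data/logs"))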

=== [[dropping-temporary-tables]] Dropping Temporary Tables

[source, scala]

dropTempTable(tableName: String): Unit

dropTempTable method drops a temporary table tableName.
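
For example (a sketch):

[source, scala]

// register a temporary table and then drop it
sqlContext.range(3).registerTempTable("numbers")
sqlContext.dropTempTable("numbers")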

CAUTION: FIXME What is a temporary table?

=== [[range]][[range-method]] Creating Dataset[Long] (range method)

[source, scala]

range(end: Long): Dataset[Long]
range(start: Long, end: Long): Dataset[Long]
range(start: Long, end: Long, step: Long): Dataset[Long]
range(start: Long, end: Long, step: Long, numPartitions: Int): Dataset[Long]


The range family of methods creates a Dataset[Long] with a sole id column of LongType for the given start, end, and step.

NOTE: The first three variants use spark-SparkContext.md#defaultParallelism[SparkContext.defaultParallelism] for the number of partitions (numPartitions).

[source, scala]

scala> spark.range(5)
res0: org.apache.spark.sql.Dataset[Long] = [id: bigint]

scala> .show
+---+
| id|
+---+
|  0|
|  1|
|  2|
|  3|
|  4|
+---+


=== [[creating-dataframes-for-table]] Creating DataFrames for Table

[source, scala]

table(tableName: String): DataFrame

table method returns the tableName table as a DataFrame.
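
For example (a sketch, assuming a table t has been registered):

[source, scala]

val t = sqlContext.table("t")
t.show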

=== [[listing-existing-tables]] Listing Existing Tables

[source, scala]

tables(): DataFrame
tables(databaseName: String): DataFrame


The tables methods return a DataFrame that holds the names of existing tables in a database.

[source, scala]

scala> spark.tables.show
+---------+-----------+
|tableName|isTemporary|
+---------+-----------+
|        t|       true|
|       t2|       true|
+---------+-----------+


The schema consists of two columns - tableName of StringType and isTemporary of BooleanType.

NOTE: tables is a result of SHOW TABLES [IN databaseName].

[source, scala]

tableNames(): Array[String]
tableNames(databaseName: String): Array[String]


The tableNames methods are similar to tables, with the only difference that they return an Array[String] of table names rather than a DataFrame.

=== [[accessing-StreamingQueryManager]] Accessing StreamingQueryManager

[source, scala]

streams: StreamingQueryManager

The streams method returns a spark-sql-streaming-StreamingQueryManager.md[StreamingQueryManager] that is used to...TK

CAUTION: FIXME

=== [[getOrCreate]] Managing Active SQLContext for JVM

[source, scala]

SQLContext.getOrCreate(sparkContext: SparkContext): SQLContext

SQLContext.getOrCreate method returns an active SQLContext object for the JVM or creates a new one using a given sparkContext.

NOTE: It is a factory-like method that works on the SQLContext companion object.

Interestingly, there are two helper methods to set and clear the active SQLContext object - setActive and clearActive respectively.

[source, scala]

setActive(spark: SQLContext): Unit
clearActive(): Unit
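
A sketch of how getOrCreate, setActive, and clearActive work together:

[source, scala]

import org.apache.spark.sql.SQLContext

// returns the active SQLContext or creates one for the given SparkContext
val sqlContext = SQLContext.getOrCreate(sc)

// make another instance the active one, then clear it again
val another = sqlContext.newSession()
SQLContext.setActive(another)
SQLContext.clearActive()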


=== [[sql]][[executing-sql-queries]] Executing SQL Queries

[source, scala]

sql(sqlText: String): DataFrame

sql executes the sqlText SQL query.

NOTE: It supports Hive statements through spark-sql-hive-integration.md[HiveContext].

scala> sql("set spark.sql.hive.version").show(false)
16/04/10 15:19:36 INFO HiveSqlParser: Parsing command: set spark.sql.hive.version
+----------------------+-----+
|key                   |value|
+----------------------+-----+
|spark.sql.hive.version|1.2.1|
+----------------------+-----+

scala> sql("describe database extended default").show(false)
16/04/10 15:21:14 INFO HiveSqlParser: Parsing command: describe database extended default
+-------------------------+--------------------------+
|database_description_item|database_description_value|
+-------------------------+--------------------------+
|Database Name            |default                   |
|Description              |Default Hive database     |
|Location                 |file:/user/hive/warehouse |
|Properties               |                          |
+-------------------------+--------------------------+

// Create temporary table
scala> spark.range(10).registerTempTable("t")
16/04/14 23:34:31 INFO HiveSqlParser: Parsing command: t

scala> sql("CREATE temporary table t2 USING PARQUET OPTIONS (PATH 'hello') AS SELECT * FROM t")
16/04/14 23:34:38 INFO HiveSqlParser: Parsing command: CREATE temporary table t2 USING PARQUET OPTIONS (PATH 'hello') AS SELECT * FROM t

scala> spark.tables.show
+---------+-----------+
|tableName|isTemporary|
+---------+-----------+
|        t|       true|
|       t2|       true|
+---------+-----------+

sql parses sqlText using a dialect that can be set using the spark.sql.dialect setting.

[NOTE]

sql is imported in spark-shell so you can execute Hive statements without the spark prefix.

scala> :imports
 1) import spark.implicits._  (52 terms, 31 are implicit)
 2) import spark.sql          (1 terms)

TIP: You may also use spark-sql-spark-sql.md[spark-sql shell script] to interact with Hive.

Internally, it uses SessionState.sqlParser.parsePlan(sql) method to create a spark-sql-LogicalPlan.md[LogicalPlan].

CAUTION: FIXME Review

[source, scala]

scala> sql("show tables").show(false) 16/04/09 13:05:32 INFO HiveSqlParser: Parsing command: show tables +---------+-----------+ |tableName|isTemporary| +---------+-----------+ |dafa |false | +---------+-----------+


[TIP]

Enable INFO logging level for the loggers that correspond to the spark-sql-AbstractSqlParser.md[AbstractSqlParser] to see what happens inside sql.

Add the following line to conf/log4j.properties:

log4j.logger.org.apache.spark.sql.hive.execution.HiveSqlParser=INFO

Refer to spark-logging.md[Logging].

=== [[newSession]] Creating New Session

[source, scala]

newSession(): SQLContext

You can use the newSession method to create a new session without the cost of instantiating a new SQLContext from scratch.

newSession returns a new SQLContext that shares the SparkContext, CacheManager, SQLListener, and ExternalCatalog.
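
For example (a sketch): two sessions share the same SparkContext but keep separate SQL configurations (and temporary tables).

[source, scala]

val session1 = sqlContext.newSession()
val session2 = sqlContext.newSession()

session1.setConf("spark.sql.shuffle.partitions", "4")
session2.setConf("spark.sql.shuffle.partitions", "16")

session1.getConf("spark.sql.shuffle.partitions")  // "4"
session2.getConf("spark.sql.shuffle.partitions")  // "16"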

CAUTION: FIXME Why would I need that?

