
DataFrameReader -- Loading Data From External Data Sources

DataFrameReader is a fluent API to describe the input data source that will be used to load data from an external data source (e.g. files, tables, JDBC or Dataset[String]).

DataFrameReader is made available exclusively using SparkSession.read.

[source, scala]

import org.apache.spark.sql.SparkSession
assert(spark.isInstanceOf[SparkSession])

import org.apache.spark.sql.DataFrameReader
val reader = spark.read
assert(reader.isInstanceOf[DataFrameReader])


[[methods]]
.DataFrameReader API
[cols="1,2",options="header",width="100%"]
|===
| Method
| Description

| csv a|

[source, scala]

csv(csvDataset: Dataset[String]): DataFrame
csv(path: String): DataFrame
csv(paths: String*): DataFrame


| format a|

[source, scala]

format(source: String): DataFrameReader

| jdbc a|

[source, scala]

jdbc(
  url: String,
  table: String,
  predicates: Array[String],
  connectionProperties: Properties): DataFrame
jdbc(
  url: String,
  table: String,
  properties: Properties): DataFrame
jdbc(
  url: String,
  table: String,
  columnName: String,
  lowerBound: Long,
  upperBound: Long,
  numPartitions: Int,
  connectionProperties: Properties): DataFrame


| json a|

[source, scala]

json(jsonDataset: Dataset[String]): DataFrame
json(path: String): DataFrame
json(paths: String*): DataFrame


| load a|

[source, scala]

load(): DataFrame
load(path: String): DataFrame
load(paths: String*): DataFrame


| option a|

[source, scala]

option(key: String, value: Boolean): DataFrameReader
option(key: String, value: Double): DataFrameReader
option(key: String, value: Long): DataFrameReader
option(key: String, value: String): DataFrameReader


| options a|

[source, scala]

options(options: scala.collection.Map[String, String]): DataFrameReader
options(options: java.util.Map[String, String]): DataFrameReader


| orc a|

[source, scala]

orc(path: String): DataFrame
orc(paths: String*): DataFrame


| parquet a|

[source, scala]

parquet(path: String): DataFrame
parquet(paths: String*): DataFrame


| schema a|

[source, scala]

schema(schemaString: String): DataFrameReader
schema(schema: StructType): DataFrameReader


| table a|

[source, scala]

table(tableName: String): DataFrame

| text a|

[source, scala]

text(path: String): DataFrame
text(paths: String*): DataFrame


| textFile a|

[source, scala]

textFile(path: String): Dataset[String]
textFile(paths: String*): Dataset[String]


|===

DataFrameReader supports many file formats natively and offers a pluggable interface for custom data source formats.

Note

DataFrameReader assumes the parquet data source file format by default, which you can change using the spark.sql.sources.default configuration property.
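As a quick check, you can read the current default and override it for the session. A minimal sketch, assuming an active SparkSession named spark:

[source, scala]

// Read the current default data source format (parquet unless overridden)
spark.conf.get("spark.sql.sources.default")
// ...and change it for the current session
spark.conf.set("spark.sql.sources.default", "csv")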

After you have described the loading pipeline (i.e. the "Extract" part of ETL in Spark SQL), you eventually "trigger" the loading using the format-agnostic load or format-specific (e.g. csv, json, jdbc) operators.

[source, scala]

import org.apache.spark.sql.SparkSession
val spark: SparkSession = ...

import org.apache.spark.sql.DataFrame

// Using format-agnostic load operator
val csvs: DataFrame = spark
  .read
  .format("csv")
  .option("header", true)
  .option("inferSchema", true)
  .load("*.csv")

// Using format-specific load operator
val jsons: DataFrame = spark
  .read
  .json("metrics/*.json")


NOTE: All methods of DataFrameReader merely describe a process of loading data and do not trigger a Spark job (until an action is called).


DataFrameReader can read text files using textFile methods that return typed Datasets.

[source, scala]

import org.apache.spark.sql.SparkSession
val spark: SparkSession = ...

import org.apache.spark.sql.Dataset
val lines: Dataset[String] = spark
  .read
  .textFile("README.md")


NOTE: Loading datasets using textFile methods allows for additional preprocessing before the final processing of the string values as csv or json lines.


[[loading-dataset-of-string]] DataFrameReader can load datasets from Dataset[String] (with lines being complete "files") using the format-specific csv and json operators.

[source, scala]

val csvLine = "0,Warsaw,Poland"

import org.apache.spark.sql.Dataset
val cities: Dataset[String] = Seq(csvLine).toDS
scala> cities.show
+---------------+
|          value|
+---------------+
|0,Warsaw,Poland|
+---------------+

// Define schema explicitly (as below)
// or
// option("header", true) + option("inferSchema", true)
import org.apache.spark.sql.types.StructType
val schema = new StructType()
  .add("id", "long", nullable = false)
  .add("city", "string")
  .add("country", "string")
scala> schema.printTreeString
root
 |-- id: long (nullable = false)
 |-- city: string (nullable = true)
 |-- country: string (nullable = true)

import org.apache.spark.sql.DataFrame
val citiesDF: DataFrame = spark
  .read
  .schema(schema)
  .csv(cities)
scala> citiesDF.show
+---+------+-------+
| id|  city|country|
+---+------+-------+
|  0|Warsaw| Poland|
+---+------+-------+


=== [[format]] Specifying Format Of Input Data Source -- format method

[source, scala]

format(source: String): DataFrameReader

You use format to configure DataFrameReader to use the appropriate source format.

Supported data formats:

  • json
  • csv (since 2.0.0)
  • parquet (see spark-parquet.md[Parquet])
  • orc
  • text
  • jdbc
  • libsvm -- only when used in format("libsvm")

Note

Spark SQL allows for custom data source formats.
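For illustration, a couple of hedged examples of format in action; the file paths are made up, and the libsvm format additionally requires Spark MLlib on the classpath:

[source, scala]

// Built-in short name
val people = spark.read.format("csv").option("header", true).load("data/people.csv")

// libsvm works only via format("libsvm") (requires Spark MLlib)
val vectors = spark.read.format("libsvm").load("data/sample_libsvm_data.txt")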

=== [[schema]] Specifying Schema -- schema method

[source, scala]

schema(schema: StructType): DataFrameReader

schema allows for specifying the schema of a data source (that the DataFrameReader is about to read a dataset from).

[source, scala]

import org.apache.spark.sql.types.StructType
val schema = new StructType()
  .add("id", "long", nullable = false)
  .add("city", "string")
  .add("country", "string")
scala> schema.printTreeString
root
 |-- id: long (nullable = false)
 |-- city: string (nullable = true)
 |-- country: string (nullable = true)

import org.apache.spark.sql.DataFrameReader
val r: DataFrameReader = spark.read.schema(schema)


NOTE: Some formats can infer schema from datasets (e.g. csv or json) using the inferSchema option.

TIP: Read up on spark-sql-schema.md[Schema].
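The schema(schemaString: String) variant accepts a DDL-formatted string. A minimal sketch (the column names are for illustration only):

[source, scala]

// DDL-formatted schema string (columns default to nullable)
val r = spark.read.schema("id LONG, city STRING, country STRING")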

=== [[option]][[options]] Specifying Load Options -- option and options Methods

[source, scala]

option(key: String, value: String): DataFrameReader
option(key: String, value: Boolean): DataFrameReader
option(key: String, value: Long): DataFrameReader
option(key: String, value: Double): DataFrameReader


You can also use the options method to describe multiple options in a single Map.

[source, scala]

options(options: scala.collection.Map[String, String]): DataFrameReader
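Both styles describe the same reader configuration. A sketch, using common CSV options:

[source, scala]

// Individual options...
val reader1 = spark.read
  .format("csv")
  .option("header", true)
  .option("inferSchema", true)

// ...or all of them at once in a single Map
val reader2 = spark.read
  .format("csv")
  .options(Map("header" -> "true", "inferSchema" -> "true"))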

=== [[creating-dataframes-from-files]] Loading Datasets from Files (into DataFrames) Using Format-Specific Load Operators

DataFrameReader supports the following file formats:

  • json
  • csv
  • parquet
  • orc
  • text

==== [[json]] json method

[source, scala]

json(path: String): DataFrame
json(paths: String*): DataFrame
json(jsonDataset: Dataset[String]): DataFrame
json(jsonRDD: RDD[String]): DataFrame


New in 2.0.0: prefersDecimal
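As with csv, json can read JSON Lines from a Dataset[String] directly. A sketch with an in-memory dataset (the record below is made up):

[source, scala]

import spark.implicits._
val jsonLines = Seq("""{"id":0,"city":"Warsaw","country":"Poland"}""").toDS
val citiesDF = spark.read.json(jsonLines)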

==== [[csv]] csv method

[source, scala]

csv(path: String): DataFrame
csv(paths: String*): DataFrame
csv(csvDataset: Dataset[String]): DataFrame


==== [[parquet]] parquet method

[source, scala]

parquet(path: String): DataFrame
parquet(paths: String*): DataFrame


The supported options:

  • compression (default: snappy)

New in 2.0.0: snappy is the default Parquet codec. See https://github.com/apache/spark/commit/2f0b882e5c8787b09bedcc8208e6dcc5662dbbab[[SPARK-14482][SQL] Change default Parquet codec from gzip to snappy].

[[compression]] The compressions supported:

  • none or uncompressed
  • snappy - the default codec in Spark 2.0.0.
  • gzip - the default codec in Spark before 2.0.0
  • lzo

[source, scala]

val tokens = Seq("hello", "henry", "and", "harry")
  .zipWithIndex
  .map(_.swap)
  .toDF("id", "token")

val parquetWriter = tokens.write
parquetWriter.option("compression", "none").save("hello-none")

// The exception is mostly for my learning purposes
// so I know where and how to find the trace to the compressions
// Sorry...
scala> parquetWriter.option("compression", "unsupported").save("hello-unsupported")
java.lang.IllegalArgumentException: Codec [unsupported] is not available. Available codecs are uncompressed, gzip, lzo, snappy, none.
  at org.apache.spark.sql.execution.datasources.parquet.ParquetOptions.<init>(ParquetOptions.scala:43)
  at org.apache.spark.sql.execution.datasources.parquet.DefaultSource.prepareWrite(ParquetRelation.scala:77)
  at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1$$anonfun$4.apply(InsertIntoHadoopFsRelation.scala:122)
  at org.apache.spark.sql.execution.datasources.BaseWriterContainer.driverSideSetup(WriterContainer.scala:103)
  at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1.apply$mcV$sp(InsertIntoHadoopFsRelation.scala:141)
  at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1.apply(InsertIntoHadoopFsRelation.scala:116)
  at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:53)
  at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation.run(InsertIntoHadoopFsRelation.scala:116)
  at org.apache.spark.sql.execution.command.ExecutedCommand.sideEffectResult$lzycompute(commands.scala:61)
  at org.apache.spark.sql.execution.command.ExecutedCommand.sideEffectResult(commands.scala:59)
  at org.apache.spark.sql.execution.command.ExecutedCommand.doExecute(commands.scala:73)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:118)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:137)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
  at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:134)
  at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:117)
  at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:65)
  at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:65)
  at org.apache.spark.sql.execution.datasources.DataSource.write(DataSource.scala:390)
  at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:247)
  at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:230)
  ... 48 elided


==== [[orc]] orc method

[source, scala]

orc(path: String): DataFrame
orc(paths: String*): DataFrame


The Optimized Row Columnar (ORC) file format is a highly efficient columnar format to store Hive data (even with more than 1,000 columns) and improve performance. The ORC format was introduced in Hive version 0.11 to use and retain the type information from the table definition.

TIP: Read https://cwiki.apache.org/confluence/display/Hive/LanguageManual+ORC[ORC Files] document to learn about the ORC file format.
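A sketch of reading ORC files (the paths below are placeholders):

[source, scala]

// Single path
val df = spark.read.orc("data/table.orc")

// Multiple paths are also supported
val union = spark.read.orc("data/2019.orc", "data/2020.orc")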

==== [[text]] text method

text method loads a text file.

[source, scala]

text(path: String): DataFrame
text(paths: String*): DataFrame


===== [[text-example]] Example

[source, scala]

val lines: Dataset[String] = spark.read.text("README.md").as[String]

scala> lines.show
+--------------------+
|               value|
+--------------------+
|      # Apache Spark|
|                    |
|Spark is a fast a...|
|high-level APIs i...|
|supports general ...|
|rich set of highe...|
|MLlib for machine...|
|and Spark Streami...|
|                    |
|<http://spark.apa...|
|                    |
|                    |
|## Online Documen...|
|                    |
|You can find the ...|
|guide, on the [pr...|
|and [project wiki...|
|This README file ...|
|                    |
|  ## Building Spark|
+--------------------+
only showing top 20 rows


=== [[table]][[creating-dataframes-from-tables]] Loading Table to DataFrame -- table Method

[source, scala]

table(tableName: String): DataFrame

table loads the content of the tableName table into an untyped spark-sql-DataFrame.md[DataFrame].

[source, scala]

scala> spark.catalog.tableExists("t1")
res1: Boolean = true

// t1 exists in the catalog
// let's load it
val t1 = spark.read.table("t1")


NOTE: table simply passes the call to SparkSession.md#table[SparkSession.table] after making sure that no user-specified schema has been defined.

=== [[jdbc]] Loading Data From External Table using JDBC Data Source -- jdbc Method

[source, scala]

jdbc(url: String, table: String, properties: Properties): DataFrame
jdbc(
  url: String,
  table: String,
  predicates: Array[String],
  connectionProperties: Properties): DataFrame
jdbc(
  url: String,
  table: String,
  columnName: String,
  lowerBound: Long,
  upperBound: Long,
  numPartitions: Int,
  connectionProperties: Properties): DataFrame


jdbc loads data from an external table using the JDBC data source.

Internally, jdbc creates a spark-sql-JDBCOptions.md#creating-instance[JDBCOptions] from the input url, table and extraOptions with connectionProperties.

jdbc then creates one JDBCPartition per predicate.

In the end, jdbc requests the SparkSession to SparkSession.md#baseRelationToDataFrame[create a DataFrame] for a JDBCRelation (with the JDBCPartitions and JDBCOptions created earlier).

[NOTE]

jdbc does not support a custom schema and throws an AnalysisException if one is defined:

User specified schema not supported with `[jdbc]`

NOTE: The jdbc method uses java.util.Properties (and appears overly Java-centric). Use format("jdbc") instead.

TIP: Review the exercise exercises/spark-exercise-dataframe-jdbc-postgresql.md[Creating DataFrames from Tables using JDBC and PostgreSQL].
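For illustration, a sketch of a partitioned JDBC read; the URL, table, column and credentials are placeholders, and a matching JDBC driver must be on the classpath:

[source, scala]

import java.util.Properties
val props = new Properties()
props.put("user", "dbuser")       // placeholder credentials
props.put("password", "dbpass")

// Reads the "projects" table in 4 partitions split on the "id" column
val projects = spark.read.jdbc(
  url = "jdbc:postgresql://localhost:5432/sparkdb",
  table = "projects",
  columnName = "id",
  lowerBound = 0L,
  upperBound = 1000L,
  numPartitions = 4,
  connectionProperties = props)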

=== [[textFile]] Loading Datasets From Text Files -- textFile Method

[source, scala]

textFile(path: String): Dataset[String]
textFile(paths: String*): Dataset[String]


textFile loads one or many text files into a typed Dataset.md[Dataset[String]].

[source, scala]

import org.apache.spark.sql.SparkSession
val spark: SparkSession = ...

import org.apache.spark.sql.Dataset
val lines: Dataset[String] = spark
  .read
  .textFile("README.md")


NOTE: The textFile methods are similar to the text family of methods in that both read text files, but the text methods return an untyped DataFrame while the textFile methods return a typed Dataset[String].

Internally, textFile passes calls on to the text method and Dataset.md#select[selects] the only value column before applying the Encoders.STRING encoder.
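Roughly, the following sketch is what textFile boils down to (not the exact implementation):

[source, scala]

import org.apache.spark.sql.Dataset
import spark.implicits._
// text + select("value") + the String encoder
val lines: Dataset[String] = spark.read.text("README.md").select("value").as[String]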

=== [[creating-instance]] Creating DataFrameReader Instance

DataFrameReader takes the following to be created:

  • [[sparkSession]] SparkSession.md[SparkSession]

DataFrameReader initializes the internal properties.

=== [[loadV1Source]] Loading Dataset (Data Source API V1) -- loadV1Source Internal Method

[source, scala]

loadV1Source(paths: String*): DataFrame

loadV1Source creates a DataSource and requests it to resolve the underlying relation (as a BaseRelation).

In the end, loadV1Source requests the SparkSession to SparkSession.md#baseRelationToDataFrame[create a DataFrame from the BaseRelation].

NOTE: loadV1Source is used when DataFrameReader is requested to load data.

=== [[load]] Loading Data -- load Method

load(): DataFrame
load(
  path: String): DataFrame
load(
  paths: String*): DataFrame

load loads a dataset from a data source (with optional support for multiple paths) as an untyped spark-sql-DataFrame.md[DataFrame].

Internally, load looks up the data source for the source and then branches off per its type (i.e. whether it is of the DataSourceV2 marker type or not).

For a "DataSource V2" data source, load...FIXME

Otherwise, if the source is not a "DataSource V2" data source, load simply loadV1Source.

load throws an AnalysisException when the source is hive:

Hive data source can only be used with tables, you can not read files of Hive data source directly.
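A sketch of format-agnostic loading with one or more paths (the directories are placeholders):

[source, scala]

val single = spark.read.format("parquet").load("data/2020")
val multi = spark.read.format("parquet").load("data/2019", "data/2020")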

=== [[assertNoSpecifiedSchema]] assertNoSpecifiedSchema Internal Method

[source, scala]

assertNoSpecifiedSchema(operation: String): Unit

assertNoSpecifiedSchema throws an AnalysisException if the userSpecifiedSchema is defined:

User specified schema not supported with `[operation]`

NOTE: assertNoSpecifiedSchema is used when DataFrameReader is requested to load data using jdbc, table and textFile.

=== [[verifyColumnNameOfCorruptRecord]] verifyColumnNameOfCorruptRecord Internal Method

[source, scala]

verifyColumnNameOfCorruptRecord(
  schema: StructType,
  columnNameOfCorruptRecord: String): Unit


verifyColumnNameOfCorruptRecord...FIXME

NOTE: verifyColumnNameOfCorruptRecord is used when DataFrameReader is requested to load data using csv and json.

=== [[source]] Input Data Source -- source Internal Property

[source, scala]

source: String

source is the name of the input data source (aka format or provider) that will be used to load the data.

In other words, the whole purpose of DataFrameReader is simply to describe the input data source.

The default data source is parquet per the spark.sql.sources.default configuration property.

source is usually specified using the format method.

[IMPORTANT]

source must not be hive or an AnalysisException is thrown:

Hive data source can only be used with tables, you can not read files of Hive data source directly.

Once defined explicitly (using the format method) or implicitly (per the spark.sql.sources.default configuration property), source is resolved using the DataSource utility.

NOTE: source is used exclusively when DataFrameReader is requested to load data (explicitly or using loadV1Source).

=== [[internal-properties]] Internal Properties

[cols="30m,70",options="header",width="100%"] |=== | Name | Description

| extraOptions a| [[extraOptions]]

Used when...FIXME

| userSpecifiedSchema | [[userSpecifiedSchema]] Optional user-specified schema (default: None, i.e. undefined)

Set when DataFrameReader is requested to set a schema, load data, loadV1Source (when creating a DataSource), and load data using the json and csv file formats

Used when DataFrameReader is requested to assertNoSpecifiedSchema (while loading data using jdbc, table and textFile)

|===

