DataFrameWriter

DataFrameWriter[T] is a high-level API for Spark SQL developers to describe the "write path" of a structured query over rows of type T (i.e. how to save a Dataset to a data source).

DataFrameWriter is available using Dataset.write operator.

Creating Instance

DataFrameWriter takes the following to be created:

* Dataset

DataFrameWriter is created when:

* Dataset.write operator is used

DataFrame

When created, DataFrameWriter converts the Dataset to a DataFrame.

Name of Data Source

source: String

source is a short name (alias) or a fully-qualified class name to identify the data source to write data to.

source can be specified using the format method:

format(
  source: String): DataFrameWriter[T]

Default: spark.sql.sources.default configuration property
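
For example, the data source can be identified either way. The following is a minimal sketch: the output paths are hypothetical and the fully-qualified name refers to Spark's built-in (internal) CSV implementation.

// Short name (alias) of a built-in data source
spark.range(5).write.format("csv").save("/tmp/demo_csv_by_alias")

// Fully-qualified class name of the same data source
spark.range(5).write
  .format("org.apache.spark.sql.execution.datasources.csv.CSVFileFormat")
  .save("/tmp/demo_csv_by_class")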

insertInto

insertInto(
  tableName: String): Unit

insertInto requests the DataFrame for the SparkSession.

insertInto tries to look up the TableProvider for the data source.

insertInto requests the ParserInterface to parse the tableName identifier (possibly multi-part).

In the end, insertInto uses the modern or the legacy insert paths based on...FIXME

DataFrameWriter.insertInto Executes SQL Command (as a Spark job)

insertInto asserts that write is not bucketed (with insertInto operation name).

Note

saveAsTable and insertInto are structurally alike.

Modern Insert Path (CatalogPlugin)

insertInto(
  catalog: CatalogPlugin,
  ident: Identifier): Unit

insertInto...FIXME

Legacy Insert Path (TableIdentifier)

insertInto(
  tableIdent: TableIdentifier): Unit

insertInto...FIXME

AnalysisException

insertInto throws an AnalysisException when the partitioningColumns are defined:

insertInto() can't be used together with partitionBy(). Partition columns have already been defined for the table. It is not necessary to use partitionBy().
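
The following sketch illustrates the restriction. The table name, its schema and the data are hypothetical, and the partitioned table is assumed to exist already.

import org.apache.spark.sql.functions.lit

// Assuming the target table was created as, e.g.:
// spark.sql("CREATE TABLE t1 (id BIGINT, p INT) USING parquet PARTITIONED BY (p)")
val data = spark.range(5).withColumn("p", lit(1))

// OK: partitioning is taken from the table definition
data.write.insertInto("t1")

// AnalysisException: insertInto() can't be used together with partitionBy()
data.write.partitionBy("p").insertInto("t1")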

saveAsTable

saveAsTable(
  tableName: String): Unit

saveAsTable requests the DataFrame for the SparkSession.

saveAsTable tries to look up the TableProvider for the data source.

saveAsTable requests the ParserInterface to parse the tableName identifier (possibly multi-part).

In the end, saveAsTable uses the modern or the legacy save paths based on...FIXME

Note

saveAsTable and insertInto are structurally alike.

Modern saveAsTable Path (TableCatalog)

saveAsTable(
  catalog: TableCatalog,
  ident: Identifier,
  nameParts: Seq[String]): Unit

Legacy saveAsTable Path (TableIdentifier)

saveAsTable(
  tableIdent: TableIdentifier): Unit

saveAsTable saves the content of a DataFrame to the tableName table.

AnalysisException

saveAsTable throws an AnalysisException when no catalog could handle the table identifier:

Couldn't find a catalog to handle the identifier [tableName].

Demo

val ids = spark.range(5)
ids.write.
  option("path", "/tmp/five_ids").
  saveAsTable("five_ids")

// Check whether the five_ids table was created successfully
val q = spark.catalog.listTables.filter($"name" === "five_ids")
scala> q.show
+--------+--------+-----------+---------+-----------+
|    name|database|description|tableType|isTemporary|
+--------+--------+-----------+---------+-----------+
|five_ids| default|       null| EXTERNAL|      false|
+--------+--------+-----------+---------+-----------+

save

save(): Unit
save(
  path: String): Unit

Saves a DataFrame (the result of executing a structured query) to a data source.

Internally, save uses DataSource to look up the class of the requested data source (for the source option and the SQLConf).

Note

save uses SparkSession to access the SessionState and in turn the SQLConf.

val df: DataFrame = ???
df.sparkSession.sessionState.conf

save...FIXME

save throws an AnalysisException when requested to save to Hive data source (the source is hive):

Hive data source can only be used with tables, you can not write files of Hive data source directly.

save throws an AnalysisException when bucketing is used (the numBuckets or sortColumnNames options are defined):

'[operation]' does not support bucketing right now
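
A minimal sketch of save and the bucketing restriction (the output paths are hypothetical):

val nums = spark.range(5)

// OK: a plain file-based save (uses the default data source format)
nums.write.mode("overwrite").save("/tmp/nums")

// AnalysisException: 'save' does not support bucketing right now
nums.write.bucketBy(4, "id").save("/tmp/nums_bucketed")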

Looking up TableProvider

lookupV2Provider(): Option[TableProvider]

lookupV2Provider tries to look up a TableProvider for the source.

lookupV2Provider explicitly excludes FileDataSourceV2-based data sources (due to SPARK-28396).

lookupV2Provider is used when:

* DataFrameWriter is requested to insertInto, save and saveAsTable

Review Me

[[methods]]
.DataFrameWriter API / Writing Operators
[cols="1,2",options="header",width="100%"]
|===
| Method | Description

| bucketBy
a| [[bucketBy]]

[source, scala]

bucketBy(numBuckets: Int, colName: String, colNames: String*): DataFrameWriter[T]

| csv
a| [[csv]]

[source, scala]

csv(path: String): Unit

| format
a| [[format]]

[source, scala]

format(source: String): DataFrameWriter[T]

| jdbc
a| [[jdbc]]

[source, scala]

jdbc(url: String, table: String, connectionProperties: Properties): Unit

| json
a| [[json]]

[source, scala]

json(path: String): Unit

| mode
a| [[mode]]

[source, scala]

mode(saveMode: SaveMode): DataFrameWriter[T]
mode(saveMode: String): DataFrameWriter[T]

| option
a| [[option]]

[source, scala]

option(key: String, value: String): DataFrameWriter[T]
option(key: String, value: Boolean): DataFrameWriter[T]
option(key: String, value: Long): DataFrameWriter[T]
option(key: String, value: Double): DataFrameWriter[T]

| options
a| [[options]]

[source, scala]

options(options: scala.collection.Map[String, String]): DataFrameWriter[T]

| orc
a| [[orc]]

[source, scala]

orc(path: String): Unit

| parquet
a| [[parquet]]

[source, scala]

parquet(path: String): Unit

| partitionBy
a| [[partitionBy]]

[source, scala]

partitionBy(colNames: String*): DataFrameWriter[T]

| sortBy
a| [[sortBy]]

[source, scala]

sortBy(colName: String, colNames: String*): DataFrameWriter[T]

| text
a| [[text]]

[source, scala]

text(path: String): Unit

|===

DataFrameWriter is available using the Dataset.write operator.

[source, scala]

scala> :type df
org.apache.spark.sql.DataFrame

val writer = df.write

scala> :type writer
org.apache.spark.sql.DataFrameWriter[org.apache.spark.sql.Row]


DataFrameWriter supports many file formats and JDBC databases. It also allows for plugging in new data source formats.

DataFrameWriter defaults to the parquet data source format. You can change the default format using the spark.sql.sources.default configuration property, the format method or the format-specific methods.

// see above for writer definition

// Save dataset in Parquet format
writer.save(path = "nums")

// Save dataset in JSON format
writer.format("json").save(path = "nums-json")

// Alternatively, use format-specific method
writer.json(path = "nums-json")

In the end, you trigger the actual saving of the content of a Dataset (i.e. the result of executing a structured query) using the save method.

[source, scala]

writer.save

[[internal-state]] DataFrameWriter uses internal mutable attributes to build a properly-defined "write specification" for the insertInto, save and saveAsTable methods.

[[internal-attributes-and-corresponding-setters]]
.Internal Attributes and Corresponding Setters
[cols="1m,2",options="header"]
|===
| Attribute | Setters

| source
| [[source]] format

| mode
| [[mode-internal-property]] mode

| extraOptions
| [[extraOptions]] option, options, save

| partitioningColumns
| [[partitioningColumns]] partitionBy

| bucketColumnNames
| [[bucketColumnNames]] bucketBy

| numBuckets
| [[numBuckets]] bucketBy

| sortColumnNames
| [[sortColumnNames]] sortBy
|===

[[df]] NOTE: DataFrameWriter is a type constructor in Scala that keeps an internal reference to the source DataFrame for the whole lifecycle (starting right from the moment it was created).

NOTE: Spark Structured Streaming's DataStreamWriter is responsible for writing the content of streaming Datasets in a streaming fashion.

=== [[runCommand]] Executing Logical Command(s) -- runCommand Internal Method

[source, scala]

runCommand(session: SparkSession, name: String)(command: LogicalPlan): Unit

runCommand uses the input SparkSession to access the SessionState that is in turn requested to execute the logical command (which simply creates a QueryExecution).

runCommand records the current time (start time) and uses the SQLExecution helper object to execute the action (under a new execution id) that simply requests the QueryExecution for the RDD[InternalRow] (and triggers execution of logical commands).

TIP: Use web UI's SQL tab to see the execution or a SparkListener to be notified when the execution is started and finished. The SparkListener should intercept SparkListenerSQLExecutionStart and SparkListenerSQLExecutionEnd events.
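
The following is a minimal sketch of such a SparkListener, assuming a running SparkSession available as spark (the println messages are arbitrary):

[source, scala]

import org.apache.spark.scheduler.{SparkListener, SparkListenerEvent}
import org.apache.spark.sql.execution.ui.{SparkListenerSQLExecutionEnd, SparkListenerSQLExecutionStart}

spark.sparkContext.addSparkListener(new SparkListener {
  override def onOtherEvent(event: SparkListenerEvent): Unit = event match {
    case e: SparkListenerSQLExecutionStart =>
      println(s"SQL execution ${e.executionId} started: ${e.description}")
    case e: SparkListenerSQLExecutionEnd =>
      println(s"SQL execution ${e.executionId} finished at ${e.time}")
    case _ => // ignore other events
  }
})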

runCommand records the current time (end time).

In the end, runCommand uses the input SparkSession to access the ExecutionListenerManager and requests it to onSuccess (with the input name, the QueryExecution and the duration).

In case of any exceptions, runCommand requests the ExecutionListenerManager to onFailure (with the exception) and (re)throws it.

NOTE: runCommand is used when DataFrameWriter is requested to save (and indirectly saveToV1Source), insertInto and createTable (which is used exclusively for saveAsTable).

=== [[jdbc-internals]] Saving Data to Table Using JDBC Data Source -- jdbc Method

[source, scala]

jdbc(url: String, table: String, connectionProperties: Properties): Unit

jdbc method saves the content of the DataFrame to an external database table via JDBC.

You can use mode to control the save mode, i.e. what happens when an external table exists when save is executed.

It is assumed that the jdbc save pipeline is neither partitioned nor bucketed.

All options are overridden by the input connectionProperties.

The required options are:

  • driver which is the class name of the JDBC driver (that is passed to Spark's own DriverRegistry.register and later used to connect(url, properties)).

When the table exists and the Overwrite save mode is in use, DROP TABLE table is executed.

It creates the input table (using CREATE TABLE table (schema) where schema is the schema of the DataFrame).
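
A minimal sketch of jdbc, assuming a reachable PostgreSQL database with its JDBC driver on the classpath (the URL, table name and credentials are hypothetical):

[source, scala]

import java.util.Properties

val connectionProperties = new Properties()
connectionProperties.put("user", "spark")                  // hypothetical credentials
connectionProperties.put("password", "secret")
connectionProperties.put("driver", "org.postgresql.Driver")

spark.range(5).write
  .mode("overwrite")
  .jdbc("jdbc:postgresql://localhost:5432/demo", "five_ids", connectionProperties)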

=== [[bucketBy-internals]] bucketBy Method

[source, scala]

bucketBy(numBuckets: Int, colName: String, colNames: String*): DataFrameWriter[T]

bucketBy simply sets the internal numBuckets and bucketColumnNames to the input numBuckets and colName with colNames, respectively.

[source, scala]

val df = spark.range(5)
import org.apache.spark.sql.DataFrameWriter
val writer: DataFrameWriter[java.lang.Long] = df.write

val bucketedTable = writer.bucketBy(8, "col1", "col2")

scala> :type bucketedTable
org.apache.spark.sql.DataFrameWriter[Long]


=== [[partitionBy]] partitionBy Method

[source, scala]

partitionBy(colNames: String*): DataFrameWriter[T]

CAUTION: FIXME

=== [[mode-internals]] Specifying Save Mode -- mode Method

[source, scala]

mode(saveMode: String): DataFrameWriter[T]
mode(saveMode: SaveMode): DataFrameWriter[T]

mode defines the behaviour of save when an external file or table (that Spark writes to) already exists, i.e. the SaveMode.

SaveMode

[cols="1,2",options="header",width="100%"] |=== | Name | Description

| Append | [[Append]] Records are appended to existing data.

| ErrorIfExists | [[ErrorIfExists]] Exception is thrown.

| Ignore | [[Ignore]] Do not save the records and not change the existing data in any way.

| Overwrite | [[Overwrite]] Existing data is overwritten by new records. |===
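
A minimal sketch of both mode variants (the output path is hypothetical):

[source, scala]

import org.apache.spark.sql.SaveMode

val nums = spark.range(5)

// SaveMode enumeration value
nums.write.mode(SaveMode.Overwrite).save("/tmp/nums")

// Case-insensitive string alias (silently skips the write if /tmp/nums already exists)
nums.write.mode("ignore").save("/tmp/nums")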

=== [[sortBy-internals]] Specifying Sorting Columns -- sortBy Method

[source, scala]

sortBy(colName: String, colNames: String*): DataFrameWriter[T]

sortBy simply sets sortColumnNames to the input colName and colNames column names.

NOTE: sortBy must be used together with bucketBy or DataFrameWriter reports an IllegalArgumentException.

NOTE: assertNotBucketed asserts that bucketing is not used by some methods.
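
A minimal sketch of the sortBy-with-bucketBy requirement (the table names and the bucket count are hypothetical):

[source, scala]

val people = spark.range(5).withColumnRenamed("id", "age")

// OK: sortBy is used together with bucketBy
people.write
  .bucketBy(4, "age")
  .sortBy("age")
  .saveAsTable("bucketed_people")

// IllegalArgumentException: sortBy must be used together with bucketBy
people.write.sortBy("age").saveAsTable("unsorted_people")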

=== [[option-internals]] Specifying Writer Configuration -- option Method

[source, scala]

option(key: String, value: Boolean): DataFrameWriter[T]
option(key: String, value: Double): DataFrameWriter[T]
option(key: String, value: Long): DataFrameWriter[T]
option(key: String, value: String): DataFrameWriter[T]


option...FIXME
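
A usage sketch of the option overloads (the output path is hypothetical); the non-string overloads simply store the value's string representation:

[source, scala]

spark.range(5).write
  .format("csv")
  .option("header", true)   // Boolean value
  .option("sep", "|")       // String value
  .save("/tmp/nums_csv")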

=== [[options-internals]] Specifying Writer Configuration -- options Method

[source, scala]

options(options: scala.collection.Map[String, String]): DataFrameWriter[T]

options...FIXME

=== [[writing-dataframes-to-files]] Writing DataFrames to Files

CAUTION: FIXME

=== [[format-internals]] Specifying Data Source (by Alias or Fully-Qualified Class Name) -- format Method

[source, scala]

format(source: String): DataFrameWriter[T]

format simply sets the source internal property.

=== [[parquet]] Parquet

CAUTION: FIXME

NOTE: Parquet is the default data source format.
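
A minimal sketch (the output paths are hypothetical; the second save relies on spark.sql.sources.default being left at its parquet default):

[source, scala]

// Format-specific method
spark.range(5).write.parquet("/tmp/nums_parquet")

// Equivalent, relying on the default data source format
spark.range(5).write.save("/tmp/nums_default_format")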

=== [[getBucketSpec]] getBucketSpec Internal Method

[source, scala]

getBucketSpec: Option[BucketSpec]

getBucketSpec returns a new BucketSpec if numBuckets was defined (with bucketColumnNames and sortColumnNames).

getBucketSpec throws an IllegalArgumentException when sortColumnNames are defined but numBuckets are not:

sortBy must be used together with bucketBy

NOTE: getBucketSpec is used when DataFrameWriter is requested to create a table (createTable).

Creating Table

createTable(
  tableIdent: TableIdentifier): Unit

createTable builds a CatalogStorageFormat per extraOptions.

createTable assumes the table being external when location URI of CatalogStorageFormat is defined, and managed otherwise.

createTable creates a CatalogTable (with the bucketSpec per getBucketSpec).

In the end, createTable creates a CreateTable logical command (with the CatalogTable, mode and the logical query plan of the dataset) and runs it.

NOTE: createTable is used when DataFrameWriter is requested to saveAsTable.

=== [[assertNotBucketed]] assertNotBucketed Internal Method

[source, scala]

assertNotBucketed(operation: String): Unit

assertNotBucketed simply throws an AnalysisException if either the numBuckets or sortColumnNames internal property is defined:

'[operation]' does not support bucketing right now

NOTE: assertNotBucketed is used when DataFrameWriter is requested to save, insertInto and jdbc.

Executing Logical Command for Writing to Data Source V1

saveToV1Source(): Unit

saveToV1Source creates a DataSource (for the source class name, the partitioningColumns and the extraOptions) and requests it for the logical command for writing (with the mode and the analyzed logical plan of the structured query).

Note

While requesting the analyzed logical plan of the structured query, saveToV1Source triggers execution of logical commands.

In the end, saveToV1Source runs the logical command for writing.

Note

The logical command for writing can be one of the following:

* A SaveIntoDataSourceCommand (for CreatableRelationProvider data sources)
* An InsertIntoHadoopFsRelationCommand (for FileFormat data sources)

saveToV1Source is used when DataFrameWriter is requested to save the rows of a structured query (a DataFrame) to a data source.

=== [[assertNotPartitioned]] assertNotPartitioned Internal Method

[source, scala]

assertNotPartitioned(operation: String): Unit

assertNotPartitioned...FIXME

NOTE: assertNotPartitioned is used when...FIXME

=== [[csv-internals]] csv Method

[source, scala]

csv(path: String): Unit

csv...FIXME

=== [[json-internals]] json Method

[source, scala]

json(path: String): Unit

json...FIXME

=== [[orc-internals]] orc Method

[source, scala]

orc(path: String): Unit

orc...FIXME

=== [[parquet-internals]] parquet Method

[source, scala]

parquet(path: String): Unit

parquet...FIXME

=== [[text-internals]] text Method

[source, scala]

text(path: String): Unit

text...FIXME

=== [[partitionBy]] partitionBy Method

[source, scala]

partitionBy(colNames: String*): DataFrameWriter[T]

partitionBy simply sets the partitioningColumns internal property.
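
A minimal sketch of partitionBy (the output path and the constant partition column are hypothetical):

[source, scala]

import org.apache.spark.sql.functions.lit

spark.range(5)
  .withColumn("year", lit(2020))  // a constant partition column for the demo
  .write
  .partitionBy("year")
  .parquet("/tmp/nums_by_year")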

