# DataFrameWriter

`DataFrameWriter[T]` is a high-level API for Spark SQL developers to describe the "write path" of a structured query over rows of type `T` (i.e. how to save a Dataset to a data source).

`DataFrameWriter` is available using the Dataset.write operator.
## Creating Instance

`DataFrameWriter` takes the following to be created:

* Dataset

`DataFrameWriter` is created when:

* Dataset.write operator is used
## DataFrame

When created, `DataFrameWriter` converts the Dataset to a DataFrame.
## Name of Data Source

```scala
source: String
```

`source` is a short name (alias) or a fully-qualified class name that identifies the data source to write data to.

`source` can be specified using the `format` method:

```scala
format(
  source: String): DataFrameWriter[T]
```

Default: spark.sql.sources.default configuration property
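As a quick illustration (a minimal sketch; the paths below are made up), the data source can be picked by its alias, and falls back to the default when not specified:

```scala
// Pick the data source by its short name (alias); paths are made up for illustration
val nums = spark.range(5)
nums.write.format("json").save("/tmp/nums-json")

// With no format specified, spark.sql.sources.default is used (parquet by default)
nums.write.save("/tmp/nums-parquet")
```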
## insertInto

```scala
insertInto(
  tableName: String): Unit
```

`insertInto` requests the DataFrame for the SparkSession.

`insertInto` tries to look up the TableProvider for the data source.

`insertInto` requests the ParserInterface to parse the (possibly multi-part) `tableName` identifier.

In the end, `insertInto` uses the modern or the legacy insert path based on...FIXME

`insertInto` asserts that the write is not bucketed (with the `insertInto` operation name).

!!! note
    saveAsTable and insertInto are structurally alike.
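A minimal sketch of `insertInto` (the `demo_ids` table name is made up); unlike saveAsTable, the target table has to exist already and columns are resolved by position:

```scala
// Create the table first, then insert (append) more rows into it
spark.range(5).write.saveAsTable("demo_ids")
spark.range(5, 10).write.insertInto("demo_ids")
```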
### Modern Insert Path (CatalogPlugin)

```scala
insertInto(
  catalog: CatalogPlugin,
  ident: Identifier): Unit
```

`insertInto`...FIXME
### Legacy Insert Path (TableIdentifier)

```scala
insertInto(
  tableIdent: TableIdentifier): Unit
```

`insertInto`...FIXME
### AnalysisException

`insertInto` throws an `AnalysisException` when the partitioningColumns are defined:

```text
insertInto() can't be used together with partitionBy(). Partition columns have already been defined for the table. It is not necessary to use partitionBy().
```
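A sketch that triggers the exception (the `partitioned_table` name is hypothetical):

```scala
import org.apache.spark.sql.functions.col

spark.range(5)
  .withColumn("part", col("id") % 2)
  .write
  .partitionBy("part")             // not allowed together with insertInto
  .insertInto("partitioned_table")
// org.apache.spark.sql.AnalysisException: insertInto() can't be used together with partitionBy(). ...
```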
## saveAsTable

```scala
saveAsTable(
  tableName: String): Unit
```

`saveAsTable` requests the DataFrame for the SparkSession.

`saveAsTable` tries to look up the TableProvider for the data source.

`saveAsTable` requests the ParserInterface to parse the (possibly multi-part) `tableName` identifier.

In the end, `saveAsTable` uses the modern or the legacy save path based on...FIXME

!!! note
    saveAsTable and insertInto are structurally alike.
### Modern saveAsTable Path (TableCatalog)

```scala
saveAsTable(
  catalog: TableCatalog,
  ident: Identifier,
  nameParts: Seq[String]): Unit
```
### Legacy saveAsTable Path (TableIdentifier)

```scala
saveAsTable(
  tableIdent: TableIdentifier): Unit
```

`saveAsTable` saves the content of a `DataFrame` to the `tableName` table.
### AnalysisException

`saveAsTable` throws an `AnalysisException` when no catalog could handle the table identifier:

```text
Couldn't find a catalog to handle the identifier [tableName].
```
## Demo

```scala
val ids = spark.range(5)
ids.write
  .option("path", "/tmp/five_ids")
  .saveAsTable("five_ids")

// Check whether saveAsTable as five_ids was successful
val q = spark.catalog.listTables.filter($"name" === "five_ids")
```

```text
scala> q.show
+--------+--------+-----------+---------+-----------+
|    name|database|description|tableType|isTemporary|
+--------+--------+-----------+---------+-----------+
|five_ids| default|       null| EXTERNAL|      false|
+--------+--------+-----------+---------+-----------+
```
## save

```scala
save(): Unit
save(
  path: String): Unit
```

Saves a `DataFrame` (the result of executing a structured query) to a data source.

Internally, `save` uses DataSource to look up the class of the requested data source (for the source option and the SQLConf).

!!! note
    `save` uses the SparkSession to access the SessionState that in turn gives access to the SQLConf.

    ```scala
    val df: DataFrame = ???
    df.sparkSession.sessionState.conf
    ```

`save`...FIXME

`save` throws an `AnalysisException` when requested to save to the Hive data source (i.e. the source is `hive`):

```text
Hive data source can only be used with tables, you can not write files of Hive data source directly.
```

`save` throws an `AnalysisException` when bucketing is used (i.e. the numBuckets or sortColumnNames options are defined):

```text
'[operation]' does not support bucketing right now
```
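A minimal sketch of `save` (the path is made up for illustration):

```scala
spark.range(5).write
  .format("parquet")
  .mode("overwrite")
  .save("/tmp/five_ids_parquet")

// Bucketing is rejected by save with the AnalysisException quoted above
// spark.range(5).write.bucketBy(4, "id").save("/tmp/bucketed_ids")
```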
## Looking up TableProvider

```scala
lookupV2Provider(): Option[TableProvider]
```

`lookupV2Provider` tries to look up a TableProvider for the source.

`lookupV2Provider` explicitly excludes FileDataSourceV2-based data sources (due to SPARK-28396).

`lookupV2Provider` is used when:

* `DataFrameWriter` is requested to save, insertInto and saveAsTable
## Review Me

### DataFrameWriter API / Writing Operators

| Method | Description |
|--------|-------------|
| bucketBy | `bucketBy(numBuckets: Int, colName: String, colNames: String*): DataFrameWriter[T]` |
| csv | `csv(path: String): Unit` |
| format | `format(source: String): DataFrameWriter[T]` |
| jdbc | `jdbc(url: String, table: String, connectionProperties: Properties): Unit` |
| json | `json(path: String): Unit` |
| mode | `mode(saveMode: SaveMode): DataFrameWriter[T]`<br>`mode(saveMode: String): DataFrameWriter[T]` |
| option | `option(key: String, value: String): DataFrameWriter[T]`<br>`option(key: String, value: Boolean): DataFrameWriter[T]`<br>`option(key: String, value: Long): DataFrameWriter[T]`<br>`option(key: String, value: Double): DataFrameWriter[T]` |
| options | `options(options: scala.collection.Map[String, String]): DataFrameWriter[T]` |
| orc | `orc(path: String): Unit` |
| parquet | `parquet(path: String): Unit` |
| partitionBy | `partitionBy(colNames: String*): DataFrameWriter[T]` |
| sortBy | `sortBy(colName: String, colNames: String*): DataFrameWriter[T]` |
| text | `text(path: String): Unit` |
`DataFrameWriter` is available using the Dataset.write operator.

```scala
scala> :type df
org.apache.spark.sql.DataFrame

val writer = df.write

scala> :type writer
org.apache.spark.sql.DataFrameWriter[org.apache.spark.sql.Row]
```
`DataFrameWriter` supports many file formats and JDBC databases. It also allows for plugging in new formats.

`DataFrameWriter` defaults to the parquet data source format.
```scala
// see above for writer definition

// Save dataset in Parquet format
writer.save(path = "nums")

// Save dataset in JSON format
writer.format("json").save(path = "nums-json")

// Alternatively, use a format-specific method
writer.json(path = "nums-json")
```

In the end, you trigger the actual saving of the content of a Dataset (i.e. the result of executing a structured query) using the save method.

```scala
writer.save
```
`DataFrameWriter` uses internal mutable attributes to build a properly-defined "write specification" for insertInto, save and saveAsTable methods.

### Internal Attributes and Corresponding Setters

| Attribute | Setters |
|-----------|---------|
| `source` | format |
| `mode` | mode |
| `extraOptions` | option, options, save |
| `partitioningColumns` | partitionBy |
| `bucketColumnNames` | bucketBy |
| `numBuckets` | bucketBy |
| `sortColumnNames` | sortBy |

NOTE: `DataFrameWriter` is a type constructor in Scala that keeps an internal reference to the source `DataFrame` for the whole lifecycle (starting right from the moment it was created).

NOTE: Spark Structured Streaming's `DataStreamWriter` is responsible for writing the content of streaming Datasets in a streaming fashion.
### Executing Logical Command(s) -- runCommand Internal Method

```scala
runCommand(session: SparkSession, name: String)(command: LogicalPlan): Unit
```

`runCommand` uses the input `SparkSession` to access the SessionState that is in turn requested to execute the logical command (which simply creates a `QueryExecution`).

`runCommand` records the current time (start time) and uses the `SQLExecution` helper object to execute the action (under a new execution id) that simply requests the `QueryExecution` for the RDD[InternalRow] (and triggers execution of logical commands).

TIP: Use web UI's SQL tab to see the execution or a `SparkListener` to be notified when the execution is started and finished. The `SparkListener` should intercept `SparkListenerSQLExecutionStart` and `SparkListenerSQLExecutionEnd` events.
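A minimal sketch of such a `SparkListener` (the class name and the println-based logging are made up for illustration):

```scala
import org.apache.spark.scheduler.{SparkListener, SparkListenerEvent}
import org.apache.spark.sql.execution.ui.{SparkListenerSQLExecutionEnd, SparkListenerSQLExecutionStart}

// Logs when a SQL execution starts and finishes
class SQLExecutionLogger extends SparkListener {
  override def onOtherEvent(event: SparkListenerEvent): Unit = event match {
    case e: SparkListenerSQLExecutionStart =>
      println(s"SQL execution ${e.executionId} started: ${e.description}")
    case e: SparkListenerSQLExecutionEnd =>
      println(s"SQL execution ${e.executionId} finished at ${e.time}")
    case _ => // ignore other events
  }
}

spark.sparkContext.addSparkListener(new SQLExecutionLogger)
```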
`runCommand` records the current time (end time).

In the end, `runCommand` uses the input `SparkSession` to access the ExecutionListenerManager and requests it to onSuccess (with the input `name`, the `QueryExecution` and the duration).

In case of any exceptions, `runCommand` requests the `ExecutionListenerManager` to onFailure (with the exception) and (re)throws it.

NOTE: `runCommand` is used when `DataFrameWriter` is requested to save the rows of a structured query to a data source (indirectly, via saveToV1Source), insertInto and createTable (used for saveAsTable).
### Saving Data to Table Using JDBC Data Source -- jdbc Method

```scala
jdbc(url: String, table: String, connectionProperties: Properties): Unit
```

`jdbc` saves the content of the `DataFrame` to an external database table via JDBC.

You can use mode to control the save mode, i.e. what happens when an external table exists when `save` is executed.

It is assumed that the `jdbc` save pipeline is neither partitioned nor bucketed.

All options are overridden by the input `connectionProperties`.

The required options are:

* `driver`, which is the class name of the JDBC driver (that is passed to Spark's own `DriverRegistry.register` and is later used to `connect(url, properties)`)

When the `table` exists and the overwrite save mode is enabled, `DROP TABLE table` is executed.

`jdbc` creates the input `table` (using `CREATE TABLE table (schema)` where `schema` is the schema of the `DataFrame`).
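A sketch of `jdbc` in action (the URL, credentials and PostgreSQL driver are made up for illustration):

```scala
import java.util.Properties

val connectionProperties = new Properties()
connectionProperties.setProperty("user", "spark")
connectionProperties.setProperty("password", "secret")
connectionProperties.setProperty("driver", "org.postgresql.Driver")

spark.range(5).write
  .mode("overwrite")   // drops and re-creates the table, as described above
  .jdbc("jdbc:postgresql://localhost:5432/demo", "five_ids", connectionProperties)
```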
### bucketBy Method

```scala
bucketBy(numBuckets: Int, colName: String, colNames: String*): DataFrameWriter[T]
```

`bucketBy` simply sets the internal numBuckets and bucketColumnNames to the given `numBuckets` and `colName` with `colNames`, respectively.

```scala
val df = spark.range(5)

import org.apache.spark.sql.DataFrameWriter
val writer: DataFrameWriter[java.lang.Long] = df.write

val bucketedTable = writer.bucketBy(numBuckets = 8, "col1", "col2")

scala> :type bucketedTable
org.apache.spark.sql.DataFrameWriter[Long]
```
### Specifying Save Mode -- mode Method

```scala
mode(saveMode: String): DataFrameWriter[T]
mode(saveMode: SaveMode): DataFrameWriter[T]
```

`mode` defines the behaviour of save when an external file or table (that Spark writes to) already exists, i.e. the `SaveMode`.

#### SaveMode

| Name | Description |
|------|-------------|
| `Append` | Records are appended to existing data. |
| `ErrorIfExists` | Exception is thrown. |
| `Ignore` | Do not save the records and do not change the existing data in any way. |
| `Overwrite` | Existing data is overwritten by new records. |
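A minimal sketch (the path is made up); `mode` accepts either a `SaveMode` value or its case-insensitive name:

```scala
import org.apache.spark.sql.SaveMode

spark.range(5).write.mode(SaveMode.Overwrite).save("/tmp/nums")
spark.range(5).write.mode("append").save("/tmp/nums")
```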
### Specifying Sorting Columns -- sortBy Method

```scala
sortBy(colName: String, colNames: String*): DataFrameWriter[T]
```

`sortBy` simply sets the sortColumnNames internal property to the given `colName` and `colNames` column names.

NOTE: `sortBy` must be used together with bucketBy or `DataFrameWriter` reports an `IllegalArgumentException`.
NOTE: The requirement is enforced by getBucketSpec.
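A sketch of the contract (the table name is made up):

```scala
// sortBy is only accepted together with bucketBy
spark.range(5).write
  .bucketBy(4, "id")
  .sortBy("id")
  .saveAsTable("bucketed_sorted_ids")

// sortBy without bucketBy fails at write time:
// java.lang.IllegalArgumentException: sortBy must be used together with bucketBy
```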
### Specifying Writer Configuration -- option Method

```scala
option(key: String, value: Boolean): DataFrameWriter[T]
option(key: String, value: Double): DataFrameWriter[T]
option(key: String, value: Long): DataFrameWriter[T]
option(key: String, value: String): DataFrameWriter[T]
```

`option`...FIXME
### Specifying Writer Configuration -- options Method

```scala
options(options: scala.collection.Map[String, String]): DataFrameWriter[T]
```

`options`...FIXME
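A sketch of `option` and `options` used together (the option keys and the path are made up for illustration):

```scala
spark.range(5).write
  .option("compression", "snappy")
  .options(Map("path" -> "/tmp/five_ids"))
  .saveAsTable("five_ids")
```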
### Writing DataFrames to Files

CAUTION: FIXME
### Specifying Data Source (by Alias or Fully-Qualified Class Name) -- format Method

```scala
format(source: String): DataFrameWriter[T]
```

`format` simply sets the source internal property.
### Parquet

CAUTION: FIXME

NOTE: Parquet is the default data source format.
### getBucketSpec Internal Method

```scala
getBucketSpec: Option[BucketSpec]
```

`getBucketSpec` returns a new `BucketSpec` if numBuckets was defined (with bucketColumnNames and sortColumnNames).

`getBucketSpec` throws an `IllegalArgumentException` when numBuckets is not defined while sortColumnNames are:

```text
sortBy must be used together with bucketBy
```

NOTE: `getBucketSpec` is used exclusively when `DataFrameWriter` is requested to create a table.
### Creating Table

```scala
createTable(
  tableIdent: TableIdentifier): Unit
```

`createTable` builds a CatalogStorageFormat per extraOptions.

`createTable` assumes the table to be external when the location URI of the `CatalogStorageFormat` is defined, and managed otherwise.

`createTable` creates a CatalogTable (with the bucketSpec per getBucketSpec).

In the end, `createTable` creates a CreateTable logical command (with the `CatalogTable`, the mode and the logical query plan of the dataset) and runs it.

NOTE: `createTable` is used when `DataFrameWriter` is requested to saveAsTable.
### assertNotBucketed Internal Method

```scala
assertNotBucketed(operation: String): Unit
```

`assertNotBucketed` simply throws an `AnalysisException` if either the numBuckets or sortColumnNames internal property is defined:

```text
'[operation]' does not support bucketing right now
```

NOTE: `assertNotBucketed` is used when `DataFrameWriter` is requested to save, insertInto and jdbc.
### Executing Logical Command for Writing to Data Source V1

```scala
saveToV1Source(): Unit
```

`saveToV1Source` creates a DataSource (for the source class name, the partitioningColumns and the extraOptions) and requests it for the logical command for writing (with the mode and the analyzed logical plan of the structured query).

!!! note
    While requesting the analyzed logical plan of the structured query, `saveToV1Source` triggers execution of logical commands.

In the end, `saveToV1Source` runs the logical command for writing.
!!! note
    The logical command for writing can be one of the following:

    * A `SaveIntoDataSourceCommand` (for a `CreatableRelationProvider` data source)
    * An `InsertIntoHadoopFsRelationCommand` (for a `FileFormat` data source)
`saveToV1Source` is used when `DataFrameWriter` is requested to save the rows of a structured query (a DataFrame) to a data source.
### assertNotPartitioned Internal Method

```scala
assertNotPartitioned(operation: String): Unit
```

`assertNotPartitioned`...FIXME

NOTE: `assertNotPartitioned` is used when...FIXME
### csv Method

```scala
csv(path: String): Unit
```

`csv`...FIXME

### json Method

```scala
json(path: String): Unit
```

`json`...FIXME

### orc Method

```scala
orc(path: String): Unit
```

`orc`...FIXME

### parquet Method

```scala
parquet(path: String): Unit
```

`parquet`...FIXME

### text Method

```scala
text(path: String): Unit
```

`text`...FIXME
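Although the individual methods are still to be described, a short sketch of how they are typically used (paths are made up for illustration):

```scala
// The format-specific methods are shorthand for format(...).save(path)
val ids = spark.range(5).selectExpr("cast(id as string) as id")
ids.write.csv("/tmp/ids-csv")
ids.write.json("/tmp/ids-json")
ids.write.orc("/tmp/ids-orc")
ids.write.parquet("/tmp/ids-parquet")
ids.write.text("/tmp/ids-text")   // text expects a single column of strings
```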
### partitionBy Method

```scala
partitionBy(colNames: String*): DataFrameWriter[T]
```

`partitionBy` simply sets the partitioningColumns internal property.
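A sketch of `partitionBy` (the path is made up); the data ends up laid out in one directory per distinct value of the partition column:

```scala
import spark.implicits._

spark.range(10)
  .withColumn("even", $"id" % 2 === 0)
  .write
  .partitionBy("even")
  .parquet("/tmp/ids-by-even")   // creates even=true/ and even=false/ subdirectories
```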