DataFrameWriter[T] is a high-level API for Spark SQL developers to describe the "write path" of a structured query (over rows of type T).
DataFrameWriter is used to describe an output node in a data processing graph.
The terminal methods of DataFrameWriter (e.g. save, insertInto, saveAsTable) end the description of a write specification and do trigger a Spark job (unlike the fluent configuration methods, e.g. format, mode or option, that merely refine the description).
DataFrameWriter is available using the Dataset.write operator.
DataFrameWriter takes the following to be created:

* Dataset[T]
```scala
import org.apache.spark.sql.DataFrameWriter

assert(df.isInstanceOf[Dataset[_]])
val writer = df.write
assert(writer.isInstanceOf[DataFrameWriter[_]])
```
Name of Data Source¶
source is a short name (alias) or a fully-qualified class name to identify the data source to write data to.
source can be specified using the format method:

```scala
format(source: String): DataFrameWriter[T]
```
Default: spark.sql.sources.default configuration property
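For example, to write out JSON files (the output path is illustrative):

```scala
// Pick the json data source by its short name (alias)
df.write
  .format("json")
  .save("/tmp/example_json")
```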
```scala
insertInto(tableName: String): Unit
```
insertInto asserts that the write is not bucketed (using insertInto as the operation name for error reporting).
Modern Insert Path (CatalogPlugin)¶
```scala
insertInto(catalog: CatalogPlugin, ident: Identifier): Unit
```
Legacy Insert Path (TableIdentifier)¶
```scala
insertInto(tableIdent: TableIdentifier): Unit
```
insertInto throws an AnalysisException when the partitioningColumns are defined:

```text
insertInto() can't be used together with partitionBy(). Partition columns have already been defined for the table. It is not necessary to use partitionBy().
```
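A minimal sketch of insertInto, assuming a pre-existing target table t1 with a position-compatible schema (the table name is illustrative):

```scala
// The target table must already exist; columns are resolved by position
spark.range(5).toDF("id")
  .write
  .insertInto("t1")
```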
```scala
saveAsTable(tableName: String): Unit
```
Modern saveAsTable with TableCatalog¶
```scala
saveAsTable(catalog: TableCatalog, ident: Identifier, nameParts: Seq[String]): Unit
```
Legacy saveAsTable with TableIdentifier¶
```scala
saveAsTable(tableIdent: TableIdentifier): Unit
```
saveAsTable saves the content of a DataFrame as the specified table.
saveAsTable throws an AnalysisException when no catalog could handle the table identifier:

```text
Couldn't find a catalog to handle the identifier [tableName].
```
```text
val ids = spark.range(5)
ids.write.
  option("path", "/tmp/five_ids").
  saveAsTable("five_ids")

// Check out if saveAsTable as five_ids was successful
val q = spark.catalog.listTables.filter($"name" === "five_ids")
scala> q.show
+--------+--------+-----------+---------+-----------+
|    name|database|description|tableType|isTemporary|
+--------+--------+-----------+---------+-----------+
|five_ids| default|       null| EXTERNAL|      false|
+--------+--------+-----------+---------+-----------+
```
```scala
save(): Unit
save(path: String): Unit
```
save saves the content of a DataFrame (the result of executing a structured query) to a data source.
```scala
// The SQLConf of the session the Dataset belongs to
val df: DataFrame = ???
df.sparkSession.sessionState.conf
```
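A minimal sketch of save in action, assuming an illustrative output path and no explicit format (so the default data source per spark.sql.sources.default applies):

```scala
// Writes with the default data source since no format is specified
spark.range(3)
  .write
  .save("/tmp/three_ids")
```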
save throws an AnalysisException when requested to save to the Hive data source (i.e. the source is hive):

```text
Hive data source can only be used with tables, you can not write files of Hive data source directly.
```
save also asserts that the write is not bucketed; otherwise it throws an AnalysisException:

```text
'[operation]' does not support bucketing right now
```
Looking up TableProvider¶
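lookupV2Provider is an internal method with the following signature (as of Spark 3.x):

```scala
lookupV2Provider(): Option[TableProvider]
```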
lookupV2Provider is used when DataFrameWriter is requested to save, insertInto and saveAsTable.
```scala
mode(saveMode: SaveMode): DataFrameWriter[T]
mode(saveMode: String): DataFrameWriter[T]
```
mode defines the behaviour of save when an external file or table Spark writes to already exists.
| SaveMode | Behaviour |
|----------|-----------|
| Append | Records are appended to the existing data |
| ErrorIfExists | An exception is thrown if the target already exists |
| Ignore | Records are not saved and the existing data is left unchanged |
| Overwrite | Existing data is overwritten by the new records |
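A minimal example of setting the save mode, using an illustrative output path (the enum and its case-insensitive string alias are equivalent):

```scala
import org.apache.spark.sql.SaveMode

// Replace whatever is at the target path
df.write
  .mode(SaveMode.Overwrite)
  .save("/tmp/demo_overwrite")

// The same, using the string alias
df.write
  .mode("overwrite")
  .save("/tmp/demo_overwrite")
```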
sortBy must be used together with bucketBy; otherwise the write fails with an IllegalArgumentException:

```text
sortBy must be used together with bucketBy
```
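A minimal sketch of the valid combination (the table name is illustrative; bucketed writes require saveAsTable):

```scala
// sortBy alone would fail; paired with bucketBy it is accepted
spark.range(10).toDF("id")
  .write
  .bucketBy(4, "id")
  .sortBy("id")
  .saveAsTable("bucketed_ids")
```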
Executing Logical Command for Writing to Data Source V1¶
saveToV1Source creates a DataSource (for the source class name, the partitioningColumns and the extraOptions) and requests it for the logical command for writing (with the mode and the analyzed logical plan of the structured query).
While requesting the analyzed logical plan of the structured query, saveToV1Source triggers execution of logical commands.
In the end, saveToV1Source runs the logical command for writing.

The logical command for writing can be one of the following:

* SaveIntoDataSourceCommand (for a CreatableRelationProvider data source)
* InsertIntoHadoopFsRelationCommand (for a FileFormat data source)
saveToV1Source is used when DataFrameWriter is requested to save the rows of a structured query (a DataFrame) to a data source.
Executing Logical Command(s)¶
```scala
runCommand(
  session: SparkSession,
  name: String)(
  command: LogicalPlan): Unit
```
runCommand records the current time (start time) and uses the SQLExecution helper object to execute the action (under a new execution id) that simply requests the QueryExecution for the RDD[InternalRow] (and triggers execution of logical commands).
Use web UI's SQL tab to see the execution or a SparkListener to be notified when the execution is started and finished. The SparkListener should intercept SparkListenerSQLExecutionStart and SparkListenerSQLExecutionEnd events.
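A minimal sketch of such a listener (the class name is illustrative):

```scala
import org.apache.spark.scheduler.{SparkListener, SparkListenerEvent}
import org.apache.spark.sql.execution.ui.{SparkListenerSQLExecutionEnd, SparkListenerSQLExecutionStart}

// Logs the boundaries of SQL executions (e.g. those started by runCommand)
class SQLExecutionLogger extends SparkListener {
  override def onOtherEvent(event: SparkListenerEvent): Unit = event match {
    case start: SparkListenerSQLExecutionStart =>
      println(s"SQL execution ${start.executionId} started: ${start.description}")
    case end: SparkListenerSQLExecutionEnd =>
      println(s"SQL execution ${end.executionId} finished at ${end.time}")
    case _ => // not a SQL execution event
  }
}

spark.sparkContext.addSparkListener(new SQLExecutionLogger)
```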
runCommand records the current time (end time).
In case of any exceptions, runCommand requests the ExecutionListenerManager to onFailure (with the exception) and (re)throws it.
runCommand is used when:

* DataFrameWriter is requested to save the rows of a structured query (a DataFrame) to a data source (and indirectly executes a logical command for writing to a data source V1)
* DataFrameWriter is requested to insert the rows of a structured query (a DataFrame) into a table
* DataFrameWriter is requested to create a table (used exclusively for saveAsTable)
```scala
createTable(tableIdent: TableIdentifier): Unit
```
createTable is used when DataFrameWriter is requested to saveAsTable.