
FileFormatWriter Utility

FileFormatWriter utility is used to write out the result of a structured query.

Writing Out Query Result

write(
  sparkSession: SparkSession,
  plan: SparkPlan,
  fileFormat: FileFormat,
  committer: FileCommitProtocol,
  outputSpec: OutputSpec,
  hadoopConf: Configuration,
  partitionColumns: Seq[Attribute],
  bucketSpec: Option[BucketSpec],
  statsTrackers: Seq[WriteJobStatsTracker],
  options: Map[String, String]): Set[String]

write creates a Hadoop Job instance (with the given Hadoop Configuration) and uses the following job output classes:

  • Void for keys

  • InternalRow for values

write sets the output directory (for the map-reduce job) to be the outputPath of the given OutputSpec.
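The Job setup is plain Hadoop MapReduce API. A minimal, hedged sketch of the equivalent calls (the output path literal is just an example; in write it comes from the given OutputSpec):

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapreduce.Job
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat
import org.apache.spark.sql.catalyst.InternalRow

val hadoopConf = new Configuration()
val job = Job.getInstance(hadoopConf)
// Keys are never used, values are the rows to be written out
job.setOutputKeyClass(classOf[Void])
job.setOutputValueClass(classOf[InternalRow])
// In write, the path comes from outputSpec.outputPath
FileOutputFormat.setOutputPath(job, new Path("/tmp/example-output"))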

write requests the given FileFormat to prepareWrite.

write creates a WriteJobDescription (that is later passed on to the write tasks).

write requests the given FileCommitProtocol committer to setupJob.
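A hedged sketch of the prepareWrite and setupJob calls, assuming the sparkSession, job, fileFormat, committer, options and dataSchema values from write's parameters are in scope; prepareWrite hands back an OutputWriterFactory that the write tasks later use to create per-file OutputWriters:

// prepareWrite lets the FileFormat configure the Hadoop job and
// return a factory for per-file OutputWriters
val outputWriterFactory =
  fileFormat.prepareWrite(sparkSession, job, options, dataSchema)

// setupJob gives the commit protocol a chance to initialize the job
// (e.g. create a staging directory)
committer.setupJob(job)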

write executes the given SparkPlan to generate an RDD of rows to write out. If the plan's output ordering already satisfies the ordering required for the write (partition, bucket and sort columns), the plan is executed directly; otherwise write wraps it in a SortExec physical operator (with the global flag off, i.e. a partition-local sort).
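Conceptually, the choice is whether the plan's output ordering already satisfies the ordering the write needs. A simplified, hedged sketch of that branch (requiredOrdering is assumed to have been computed from the partition, bucket and sort columns):

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{Ascending, Expression, SortOrder}
import org.apache.spark.sql.execution.{SortExec, SparkPlan}

def rddToWrite(plan: SparkPlan, requiredOrdering: Seq[Expression]): RDD[InternalRow] = {
  val actualOrdering = plan.outputOrdering.map(_.child)
  val orderingMatched =
    requiredOrdering.length <= actualOrdering.length &&
      requiredOrdering.zip(actualOrdering).forall { case (required, actual) =>
        required.semanticEquals(actual)
      }
  if (orderingMatched) {
    plan.execute()
  } else {
    // Sort within every partition only (global = false)
    SortExec(requiredOrdering.map(SortOrder(_, Ascending)), global = false, child = plan).execute()
  }
}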

write runs a Spark job (action) on the RDD with executeTask as the partition function. The result handler requests the given FileCommitProtocol committer to onTaskCommit (with the TaskCommitMessage of the WriteTaskResult) and saves the WriteTaskResult.
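The job submission boils down to SparkContext.runJob with a per-partition function and a result handler. A hedged sketch of the shape of that call (the rdd, description, jobIdInstant and committer values are assumed to be in scope, and ret is just a local buffer for the results):

val ret = new Array[WriteTaskResult](rdd.partitions.length)
sparkSession.sparkContext.runJob(
  rdd,
  (taskContext: TaskContext, iterator: Iterator[InternalRow]) =>
    executeTask(
      description = description,
      jobIdInstant = jobIdInstant,
      sparkStageId = taskContext.stageId(),
      sparkPartitionId = taskContext.partitionId(),
      sparkAttemptNumber = taskContext.taskAttemptId().toInt & Integer.MAX_VALUE,
      committer = committer,
      iterator = iterator),
  rdd.partitions.indices,
  (index, result: WriteTaskResult) => {
    // Tell the commit protocol about the successful task commit and keep the result
    committer.onTaskCommit(result.commitMsg)
    ret(index) = result
  })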

write requests the given FileCommitProtocol committer to commitJob (with the Hadoop Job instance and the TaskCommitMessage of all write tasks).

write prints out the following INFO message to the logs:

Write Job [uuid] committed.
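A hedged sketch of the commit step and the log message (logInfo is assumed to come from Spark's Logging trait, and ret holds the WriteTaskResults collected above):

// Commit the whole job with the TaskCommitMessages of all write tasks
val commitMsgs = ret.map(_.commitMsg)
committer.commitJob(job, commitMsgs)
logInfo(s"Write Job ${description.uuid} committed.")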

write processStats.

write prints out the following INFO message to the logs:

Finished processing stats for write job [uuid].

In the end, write returns all the partition paths that were updated during this write job.
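The return value is the union of the partition paths reported by the individual write tasks. A hedged sketch, assuming every WriteTaskResult carries an ExecutedWriteSummary with the partitions it updated:

// Union of the partition paths updated by all write tasks
ret.map(_.summary.updatedPartitions).reduceOption(_ ++ _).getOrElse(Set.empty[String])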

write is used when:

  • InsertIntoHadoopFsRelationCommand logical command is executed

  • Spark Structured Streaming's FileStreamSink is requested to add a streaming micro-batch

write And Throwables

In case of any Throwable, write prints out the following ERROR message to the logs:

Aborting job [uuid].

write requests the given FileCommitProtocol committer to abortJob (with the Hadoop Job instance).

In the end, write throws a SparkException.
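Put together, the error handling is a plain try/catch around the steps above. A minimal, hedged sketch (logError is assumed to come from Spark's Logging trait):

import org.apache.spark.SparkException

try {
  // run the write tasks and commit the job (see above)
} catch {
  case cause: Throwable =>
    logError(s"Aborting job ${description.uuid}.", cause)
    committer.abortJob(job)
    throw new SparkException("Job aborted.", cause)
}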

Executing Task

executeTask(
  description: WriteJobDescription,
  jobIdInstant: Long,
  sparkStageId: Int,
  sparkPartitionId: Int,
  sparkAttemptNumber: Int,
  committer: FileCommitProtocol,
  iterator: Iterator[InternalRow]): WriteTaskResult

executeTask...FIXME
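A high-level, hedged outline of what a write task amounts to, assuming dataWriter is the FileFormatDataWriter chosen for the task (single-directory, dynamic-partition or empty-directory) and taskAttemptContext has been set up for the task attempt:

// Let the commit protocol set the task up first
committer.setupTask(taskAttemptContext)
try {
  // Write out every row of the partition and commit the task
  while (iterator.hasNext) {
    dataWriter.write(iterator.next())
  }
  dataWriter.commit()   // yields the task's WriteTaskResult
} catch {
  case t: Throwable =>
    dataWriter.abort()
    throw t
}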

Processing WriteTaskStats

processStats(
  statsTrackers: Seq[WriteJobStatsTracker],
  statsPerTask: Seq[Seq[WriteTaskStats]]): Unit

processStats...FIXME
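A hedged sketch of what the stats processing amounts to: every WriteJobStatsTracker gets the WriteTaskStats produced by its per-task trackers (the transpose assumes every task reported exactly one stats object per tracker):

// Re-group the stats from per-task to per-tracker and let every tracker process its own
val statsPerTracker =
  if (statsPerTask.nonEmpty) statsPerTask.transpose
  else statsTrackers.map(_ => Seq.empty[WriteTaskStats])
statsTrackers.zip(statsPerTracker).foreach { case (tracker, stats) =>
  tracker.processStats(stats)
}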

Logging

Enable ALL logging level for org.apache.spark.sql.execution.datasources.FileFormatWriter logger to see what happens inside.

Add the following line to conf/log4j.properties:

log4j.logger.org.apache.spark.sql.execution.datasources.FileFormatWriter=ALL

Refer to Logging.

