
DataWritingSparkTask Utility

DataWritingSparkTask utility defines a partition processing function that V2TableWriteExec unary physical commands use to schedule a Spark job for writing data out.

DataWritingSparkTask is executed on executors.

Partition Processing Function

run(
  writerFactory: DataWriterFactory,
  context: TaskContext,
  iter: Iterator[InternalRow],
  useCommitCoordinator: Boolean): DataWritingSparkTaskResult

run requests the DataWriterFactory for a DataWriter (for the partition and task of the TaskContext).

run counts all the InternalRows (in iter) and requests the DataWriter to write each of them out.

After all the rows have been written out successfully, run requests the DataWriter to commit (with or without first asking the OutputCommitCoordinator for authorization). The commit gives the final WriterCommitMessage.

run prints out the following INFO message to the logs:

Committed partition [partId] (task [taskId], attempt [attemptId], stage [stageId].[stageAttempt])

In the end, run returns a DataWritingSparkTaskResult with the count (of the rows written out) and the final WriterCommitMessage.
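
The steps above can be sketched in plain Scala without a Spark dependency. This is a simplified model and not Spark's actual implementation: Row, CommitMessage, Writer, WriterFactory, CountingWriter and TaskResult are hypothetical stand-ins for InternalRow, WriterCommitMessage, DataWriter, DataWriterFactory and DataWritingSparkTaskResult, the partition and task IDs are passed in directly rather than read from a TaskContext, and the commit-coordinator step is left out.

```scala
object RunSketch {
  // Hypothetical stand-ins for InternalRow, WriterCommitMessage and
  // DataWritingSparkTaskResult (not the Spark classes themselves).
  type Row = Seq[Any]
  final case class CommitMessage(partitionId: Int, rows: Long)
  final case class TaskResult(numRows: Long, writerCommitMessage: CommitMessage)

  // Hypothetical stand-in for DataWriter.
  trait Writer {
    def write(row: Row): Unit
    def commit(): CommitMessage
  }

  // Hypothetical stand-in for DataWriterFactory.
  trait WriterFactory {
    def createWriter(partitionId: Int, taskId: Long): Writer
  }

  // Toy writer that only remembers how many rows it was asked to write.
  final class CountingWriter(partitionId: Int) extends Writer {
    private var written = 0L
    def write(row: Row): Unit = written += 1
    def commit(): CommitMessage = CommitMessage(partitionId, written)
  }

  def run(
      factory: WriterFactory,
      partitionId: Int,
      taskId: Long,
      iter: Iterator[Row]): TaskResult = {
    // 1. Request a writer for this partition and task.
    val writer = factory.createWriter(partitionId, taskId)

    // 2. Count every row and hand it to the writer.
    var count = 0L
    while (iter.hasNext) {
      count += 1
      writer.write(iter.next())
    }

    // 3. Commit once all rows are written; this yields the commit message.
    val message = writer.commit()

    // 4. Log the commit (abbreviated form of the INFO message).
    println(s"Committed partition $partitionId (task $taskId)")

    // 5. Return the row count together with the commit message.
    TaskResult(count, message)
  }
}
```

Calling RunSketch.run with a factory that creates a CountingWriter and an iterator of three rows returns a TaskResult whose numRows is 3 and whose commit message carries the same count.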

Usage

run is used when:

  • V2TableWriteExec unary physical command is requested to writeWithV2

Using CommitCoordinator

With the given useCommitCoordinator flag enabled, run requests the SparkEnv for the OutputCommitCoordinator (Spark Core) and asks it whether the write task may commit its output (canCommit).

Commit Authorized

If authorized, run prints out the following INFO message to the logs:

Commit authorized for partition [partId] (task [taskId], attempt [attemptId], stage [stageId].[stageAttempt])

Commit Denied

If not authorized, run throws a CommitDeniedException after printing out the following INFO message to the logs:

Commit denied for partition [partId] (task [taskId], attempt [attemptId], stage [stageId].[stageAttempt])

No CommitCoordinator

With the given useCommitCoordinator flag disabled, run prints out the following INFO message to the logs:

Writer for partition [partitionId] is committing.
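
The three commit paths described in this section (authorized, denied, and no coordinator) can be sketched as a single decision function. This is a minimal model under stated assumptions: Coordinator and CommitDenied are hypothetical stand-ins for OutputCommitCoordinator and CommitDeniedException, the commit helper and its doCommit parameter are illustrative names, println stands in for INFO logging, and the log messages omit the task ID.

```scala
object CommitSketch {
  // Hypothetical stand-in for CommitDeniedException.
  final class CommitDenied(message: String) extends RuntimeException(message)

  // Hypothetical stand-in for OutputCommitCoordinator; only canCommit is modeled.
  trait Coordinator {
    def canCommit(stageId: Int, stageAttempt: Int, partId: Int, attemptId: Int): Boolean
  }

  // Runs doCommit directly, or only after the coordinator authorizes it.
  def commit[M](
      useCommitCoordinator: Boolean,
      coordinator: Coordinator,
      stageId: Int,
      stageAttempt: Int,
      partId: Int,
      attemptId: Int)(doCommit: () => M): M = {
    val where = s"partition $partId (attempt $attemptId, stage $stageId.$stageAttempt)"
    if (useCommitCoordinator) {
      if (coordinator.canCommit(stageId, stageAttempt, partId, attemptId)) {
        // Commit Authorized
        println(s"Commit authorized for $where")
        doCommit()
      } else {
        // Commit Denied: log the message, then fail the task attempt.
        val message = s"Commit denied for $where"
        println(message)
        throw new CommitDenied(message)
      }
    } else {
      // No CommitCoordinator: commit without asking anyone.
      println(s"Writer for partition $partId is committing.")
      doCommit()
    }
  }
}
```

With a coordinator that always answers false, the call throws CommitDenied when useCommitCoordinator is true, and commits normally when the flag is false since the coordinator is never consulted.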

Logging

Enable ALL logging level for org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask logger to see what happens inside.

Add the following line to conf/log4j.properties:

log4j.logger.org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask=ALL

Refer to Logging.
