== DataWritingSparkTask Partition Processing Function
`DataWritingSparkTask` is the partition processing function that the `WriteToDataSourceV2Exec` physical operator uses to schedule a Spark job when requested to execute.
NOTE: The `DataWritingSparkTask` partition processing function is executed on executors.
[[logging]]
[TIP]
====
Enable `INFO` or `ERROR` logging levels for the `org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask` logger to see what happens inside.

Add the following line to `conf/log4j.properties`:

----
log4j.logger.org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask=INFO
----

Refer to <<spark-logging.adoc#, Logging>>.
====
=== [[run]] Running Partition Processing Function -- `run` Method
[source, scala]
----
run(
  writeTask: DataWriterFactory[InternalRow],
  context: TaskContext,
  iter: Iterator[InternalRow],
  useCommitCoordinator: Boolean): WriterCommitMessage
----
`run` requests the given `TaskContext` for the IDs of the stage, the stage attempt, the partition, and the task attempt, as well as for the attempt number, i.e. how many times the task may have been attempted (`0` by default).
`run` also requests the given `TaskContext` for the epoch ID (the value of the `streaming.sql.batchId` local property), defaulting to `0` when the property is not set.
`run` requests the given `DataWriterFactory` to create a `DataWriter` (with the partition, task and epoch IDs).
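Taken together, these steps can be sketched as follows (a simplified approximation of the internals, assuming the `writeTask` and `context` parameters of <<run, run>> are in scope):

[source, scala]
----
// Simplified sketch: collect the stage, partition and attempt identifiers
val stageId = context.stageId()
val stageAttempt = context.stageAttemptNumber()
val partId = context.partitionId()
val taskId = context.taskAttemptId()
val attemptId = context.attemptNumber()

// Epoch ID: the streaming.sql.batchId local property, or "0" when not set
val epochId = Option(context.getLocalProperty("streaming.sql.batchId")).getOrElse("0")

// Create the DataWriter for this partition
val dataWriter = writeTask.createDataWriter(partId, taskId, epochId.toLong)
----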
For every row in the partition (in the given `Iterator[InternalRow]`), `run` requests the `DataWriter` to write the row.
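As a sketch (with `iter` from the <<run, run>> signature and the `dataWriter` created above):

[source, scala]
----
// Write every row of the partition out through the DataWriter
while (iter.hasNext) {
  dataWriter.write(iter.next())
}
----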
Once all the rows have been written successfully, `run` requests the `DataWriter` to commit the write task (possibly <<run-useCommitCoordinator-enabled, requesting the `OutputCommitCoordinator` for authorization>> first), which gives the final `WriterCommitMessage`.
In the end, `run` prints out the following INFO message to the logs:

----
Committed partition [partId] (task [taskId], attempt [attemptId]stage [stageId].[stageAttempt])
----
In case of any error, `run` prints out the following ERROR message to the logs:

----
Aborting commit for partition [partId] (task [taskId], attempt [attemptId]stage [stageId].[stageAttempt])
----
`run` then requests the `DataWriter` to abort the write task.
In the end, `run` prints out the following ERROR message to the logs:

----
Aborted commit for partition [partId] (task [taskId], attempt [attemptId]stage [stageId].[stageAttempt])
----
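The failure path can be sketched as follows (a simplification of what Spark wires up with `Utils.tryWithSafeFinallyAndFailureCallbacks`; `writeAndCommit()` is a hypothetical stand-in for the write-and-commit steps above, and the log messages are abbreviated):

[source, scala]
----
// Sketch of the failure path; writeAndCommit() is a hypothetical stand-in
// for the write-and-commit steps described above
try {
  writeAndCommit()
} catch {
  case cause: Throwable =>
    logError(s"Aborting commit for partition $partId ...")
    dataWriter.abort()   // give the writer a chance to clean up
    logError(s"Aborted commit for partition $partId ...")
    throw cause
}
----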
NOTE: `run` is used exclusively when the `WriteToDataSourceV2Exec` physical operator is requested to execute.
==== [[run-useCommitCoordinator-enabled]] `useCommitCoordinator` Flag Enabled
With the given `useCommitCoordinator` flag enabled, `run` requests the `SparkEnv` for the `OutputCommitCoordinator`, which is then asked whether or not to commit the write task output (`canCommit`).
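In code, the check looks roughly as follows (a sketch, reusing the stage, partition and attempt identifiers collected at the start of <<run, run>>):

[source, scala]
----
import org.apache.spark.SparkEnv

// Ask the OutputCommitCoordinator whether this task attempt may commit
val coordinator = SparkEnv.get.outputCommitCoordinator
val commitAuthorized = coordinator.canCommit(stageId, stageAttempt, partId, attemptId)
----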
TIP: Read up on https://jaceklaskowski.gitbooks.io/mastering-apache-spark/spark-service-outputcommitcoordinator.html[OutputCommitCoordinator] in the https://bit.ly/mastering-apache-spark[Mastering Apache Spark] gitbook.
If authorized, `run` prints out the following INFO message to the logs:

----
Commit authorized for partition [partId] (task [taskId], attempt [attemptId]stage [stageId].[stageAttempt])
----
In the end, `run` requests the `DataWriter` to commit the write task.
If not authorized, `run` prints out the following INFO message to the logs and throws a `CommitDeniedException`:

----
Commit denied for partition [partId] (task [taskId], attempt [attemptId]stage [stageId].[stageAttempt])
----
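The two outcomes can be sketched as follows (simplified; `CommitDeniedException` is internal to Spark and the log messages are abbreviated):

[source, scala]
----
import org.apache.spark.executor.CommitDeniedException

val msg = if (commitAuthorized) {
  logInfo(s"Commit authorized for partition $partId ...")
  dataWriter.commit()
} else {
  val message = s"Commit denied for partition $partId ..."
  logInfo(message)
  // Throwing CommitDeniedException triggers the abort path described above
  throw new CommitDeniedException(message, stageId, partId, attemptId)
}
----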
==== [[run-useCommitCoordinator-disabled]] `useCommitCoordinator` Flag Disabled
With the given `useCommitCoordinator` flag disabled, `run` prints out the following INFO message to the logs:

----
Writer for partition [partId] is committing.
----
In the end, `run` requests the `DataWriter` to commit the write task.
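Sketched (with `context` and `dataWriter` from the steps of <<run, run>> above):

[source, scala]
----
// No commit coordination: commit unconditionally
logInfo(s"Writer for partition ${context.partitionId()} is committing.")
val msg = dataWriter.commit()
----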