ExecutorPodsAllocator

ExecutorPodsAllocator is responsible for allocating pods for executors (possibly with dynamic allocation) in a Spark application.

ExecutorPodsAllocator is used to create a KubernetesClusterSchedulerBackend.

Creating Instance

ExecutorPodsAllocator takes the following to be created:

ExecutorPodsAllocator is created when:

spark.dynamicAllocation.enabled

ExecutorPodsAllocator uses spark.dynamicAllocation.enabled configuration property to turn dynamic allocation of executors on and off.

The Internals of Apache Spark

Learn more about Dynamic Allocation of Executors in The Internals of Apache Spark.

Driver Pod

driverPod: Option[Pod]

driverPod is the driver pod that is looked up by the name given in the spark.kubernetes.driver.pod.name configuration property (if defined).

ExecutorPodsAllocator throws a SparkException when the driver pod could not be found in a Kubernetes cluster:

No pod was found named [kubernetesDriverPodName] in the cluster in the namespace [namespace] (this was supposed to be the driver pod.).
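The lookup described above can be sketched as follows. This is a minimal, dependency-free illustration rather than the actual Spark code: `findPod` is a hypothetical function standing in for a Kubernetes client call, pods are modelled as plain strings, and `DriverPodNotFound` is a stand-in for SparkException.

```scala
object DriverPodLookupSketch {
  // Stand-in for SparkException (an assumption, to keep the sketch free of Spark dependencies).
  final class DriverPodNotFound(message: String) extends RuntimeException(message)

  // `findPod` is hypothetical: (namespace, podName) => the pod, if it exists in the cluster.
  def driverPod(
      driverPodName: Option[String],
      namespace: String,
      findPod: (String, String) => Option[String]): Option[String] =
    driverPodName.map { name =>
      // The property is defined, so a missing pod is an error.
      findPod(namespace, name).getOrElse(
        throw new DriverPodNotFound(
          s"No pod was found named $name in the cluster in the namespace $namespace" +
            " (this was supposed to be the driver pod.)."))
    }
}
```

With the property undefined the result is simply None, and no lookup is attempted.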

spark.kubernetes.driver.pod.name

ExecutorPodsAllocator uses spark.kubernetes.driver.pod.name configuration property to look up the driver pod by name when created.

spark.kubernetes.allocation.batch.size

ExecutorPodsAllocator uses spark.kubernetes.allocation.batch.size configuration property in the following:

spark.kubernetes.allocation.batch.delay

ExecutorPodsAllocator uses spark.kubernetes.allocation.batch.delay configuration property for the following:

spark.kubernetes.executor.deleteOnTermination

ExecutorPodsAllocator uses spark.kubernetes.executor.deleteOnTermination configuration property.

Starting

start(
  applicationId: String): Unit

start requests the ExecutorPodsSnapshotsStore to subscribe this ExecutorPodsAllocator to be notified about new snapshots (with pod allocation delay based on spark.kubernetes.allocation.batch.delay configuration property).

start is used when:

  • KubernetesClusterSchedulerBackend is requested to start
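The subscription made in start can be pictured with the sketch below. It assumes a much simplified, hypothetical `SnapshotsStore` (snapshots are plain strings, and the delay is only recorded, not scheduled); the names `addSubscriber` and `notifySubscribers` are used here for illustration only.

```scala
object StartSketch {
  // Hypothetical, simplified stand-in for ExecutorPodsSnapshotsStore.
  final class SnapshotsStore {
    private var subscribers = Vector.empty[(Long, Seq[String] => Unit)]

    // Registers a callback together with the interval at which it should be fed snapshots.
    def addSubscriber(processBatchIntervalMillis: Long)(onNewSnapshots: Seq[String] => Unit): Unit =
      subscribers :+= ((processBatchIntervalMillis, onNewSnapshots))

    // Pushes the given snapshots to every subscriber (immediately, in this sketch).
    def notifySubscribers(snapshots: Seq[String]): Unit =
      subscribers.foreach { case (_, callback) => callback(snapshots) }

    def delays: Seq[Long] = subscribers.map(_._1)
  }

  final class Allocator(store: SnapshotsStore, podAllocationDelayMillis: Long) {
    // Records what the callback received, so the wiring can be observed.
    var seen = Vector.empty[(String, Seq[String])]

    def start(applicationId: String): Unit =
      store.addSubscriber(podAllocationDelayMillis) { snapshots =>
        onNewSnapshots(applicationId, snapshots)
      }

    private def onNewSnapshots(applicationId: String, snapshots: Seq[String]): Unit =
      seen :+= ((applicationId, snapshots))
  }
}
```

The point of the design is that the allocator never polls: it reacts only when the store hands it a batch of snapshots.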

Processing Executor Pods Snapshots

onNewSnapshots(
  applicationId: String,
  snapshots: Seq[ExecutorPodsSnapshot]): Unit

onNewSnapshots removes the executor IDs (of the executor pods in the given snapshots) from the newlyCreatedExecutors internal registry.

onNewSnapshots finds timed-out executor IDs (in the newlyCreatedExecutors internal registry), i.e. the ones that were created longer ago than the podCreationTimeout threshold. For the other executor IDs, onNewSnapshots prints out the following DEBUG message to the logs:

Executor with id [execId] was not found in the Kubernetes cluster since it was created [time] milliseconds ago.

For any timed-out executor IDs, onNewSnapshots prints out the following WARN message to the logs:

Executors with ids [ids] were not detected in the Kubernetes cluster after [podCreationTimeout] ms despite the fact that a previous allocation attempt tried to create them. The executors may have been deleted but the application missed the deletion event.

onNewSnapshots removes (forgets) the timed-out executor IDs (from the newlyCreatedExecutors internal registry). With the shouldDeleteExecutors flag enabled, onNewSnapshots requests the KubernetesClient to delete pods with the following labels:

  • spark-app-selector with the given applicationId
  • spark-role=executor
  • spark-exec-id for all timed-out executor IDs
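The timeout check and the label selection above could look roughly like this. It is a sketch under assumptions: the helper names `partitionByTimeout` and `deletionLabels` are hypothetical, the clock is passed in explicitly, and the labels are returned as a plain map instead of being sent to a KubernetesClient.

```scala
object TimedOutExecutorsSketch {
  // Splits the registry (executor ID -> creation timestamp) into (timedOut, stillWaiting) IDs.
  def partitionByTimeout(
      newlyCreatedExecutors: Map[Long, Long],
      currentTimeMillis: Long,
      podCreationTimeoutMillis: Long): (Set[Long], Set[Long]) = {
    val (timedOut, waiting) = newlyCreatedExecutors.partition { case (_, createdAt) =>
      currentTimeMillis - createdAt > podCreationTimeoutMillis
    }
    (timedOut.keySet, waiting.keySet)
  }

  // Labels that identify the pods to delete; spark-exec-id matches any of the timed-out IDs.
  def deletionLabels(applicationId: String, timedOut: Set[Long]): Map[String, Seq[String]] =
    Map(
      "spark-app-selector" -> Seq(applicationId),
      "spark-role" -> Seq("executor"),
      "spark-exec-id" -> timedOut.toSeq.sorted.map(_.toString))
}
```

For example, with a 600-millisecond timeout at time 1000, an executor created at time 100 is timed out while one created at time 900 is still waiting.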

onNewSnapshots updates the lastSnapshot internal registry with the last ExecutorPodsSnapshot among the given snapshots if available.

onNewSnapshots counts running executor pods in the lastSnapshot internal registry.

onNewSnapshots finds pending executor IDs in the lastSnapshot internal registry.

For non-empty input snapshots, onNewSnapshots prints out the following DEBUG message to the logs:

Pod allocation status: [currentRunningCount] running, [currentPendingExecutors] pending, [newlyCreatedExecutors] unacknowledged.
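A compact way to picture the bookkeeping behind this message is sketched below. The `PodState` hierarchy and the `Snapshot` case class are simplified, hypothetical models of an executor pods snapshot, and the sketch assumes that the pending and unacknowledged placeholders are rendered as counts.

```scala
object AllocationStatusSketch {
  // Hypothetical, reduced set of pod states.
  sealed trait PodState
  case object Running extends PodState
  case object Pending extends PodState
  case object Other extends PodState

  final case class Snapshot(executorPods: Map[Long, PodState])

  // Keeps the previous snapshot unless the batch brings a newer one.
  def latest(previous: Snapshot, snapshots: Seq[Snapshot]): Snapshot =
    snapshots.lastOption.getOrElse(previous)

  def status(lastSnapshot: Snapshot, newlyCreatedExecutors: Set[Long]): String = {
    val currentRunningCount = lastSnapshot.executorPods.values.count(_ == Running)
    val currentPendingExecutors =
      lastSnapshot.executorPods.collect { case (id, Pending) => id }.toSet
    s"Pod allocation status: $currentRunningCount running, " +
      s"${currentPendingExecutors.size} pending, " +
      s"${newlyCreatedExecutors.size} unacknowledged."
  }
}
```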

onNewSnapshots...FIXME

In the end, with DEBUG logging enabled or when the input snapshots are empty, onNewSnapshots prints out one of the following DEBUG messages.

With the number of executor pods currently running at or above the total expected executors and dynamicAllocationEnabled turned off, onNewSnapshots prints out the following:

Current number of running executors is equal to the number of requested executors. Not scaling up further.

Otherwise, when there are executor pods pending (outstanding), onNewSnapshots prints out the following:

Still waiting for [outstanding] executors before requesting more.
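The choice between the two closing messages can be sketched as a small pure function. This covers only the two branches described here, not the step marked FIXME earlier, and it assumes the comparison is "at least the total expected" (which matches the "is equal to" wording of the first message). The function name and parameters are hypothetical.

```scala
object FinalDebugMessageSketch {
  // Returns the DEBUG message to print, if any of the two described conditions holds.
  def message(
      running: Int,
      totalExpected: Int,
      dynamicAllocationEnabled: Boolean,
      outstanding: Int): Option[String] =
    if (running >= totalExpected && !dynamicAllocationEnabled) {
      Some("Current number of running executors is equal to the number of requested executors." +
        " Not scaling up further.")
    } else if (outstanding > 0) {
      Some(s"Still waiting for $outstanding executors before requesting more.")
    } else {
      None
    }
}
```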

Total Expected Executors

totalExpectedExecutors: AtomicInteger

ExecutorPodsAllocator uses a Java AtomicInteger to track the total expected number of executors.

Starts from 0 and is set to the total number of expected executors in setTotalExpectedExecutors.

Used in onNewSnapshots

Changing Total Expected Executors

setTotalExpectedExecutors(
  total: Int): Unit

setTotalExpectedExecutors sets totalExpectedExecutors internal registry to the input total.

With no hasPendingPods, setTotalExpectedExecutors requests the ExecutorPodsSnapshotsStore to notifySubscribers.
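The interplay of the two atomic fields can be sketched as follows. The `Allocator` class here is a hypothetical reduction that takes the notification as a plain function (standing in for the ExecutorPodsSnapshotsStore) so the behaviour can be observed in isolation.

```scala
import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger}

object TotalExpectedExecutorsSketch {
  // `notifySubscribers` is a stand-in for asking the snapshots store to notify its subscribers.
  final class Allocator(notifySubscribers: () => Unit) {
    val totalExpectedExecutors = new AtomicInteger(0)
    val hasPendingPods = new AtomicBoolean(false)

    def setTotalExpectedExecutors(total: Int): Unit = {
      totalExpectedExecutors.set(total)
      // Only trigger a new round of snapshot processing when no pods are pending.
      if (!hasPendingPods.get()) {
        notifySubscribers()
      }
    }
  }
}
```

Skipping the notification while pods are pending avoids triggering another allocation round before the outstanding requests have been resolved.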

setTotalExpectedExecutors is used when:

Registries

newlyCreatedExecutors

newlyCreatedExecutors: Map[Long, Long]

ExecutorPodsAllocator uses newlyCreatedExecutors internal registry to track executor IDs (with the timestamps they were created) that have been requested from Kubernetes but have not been detected in any snapshot yet.

Used in onNewSnapshots

EXECUTOR_ID_COUNTER

ExecutorPodsAllocator uses a Java AtomicLong to generate IDs for the missing executors that are going to be requested (in onNewSnapshots) when the number of running executor pods is below the total expected executors.
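As an illustration of how such a counter hands out IDs, the sketch below generates one fresh ID per missing executor. The helper `nextExecutorIds` is hypothetical and deliberately ignores any batching or pending pods; it only shows the AtomicLong usage.

```scala
import java.util.concurrent.atomic.AtomicLong

object ExecutorIdSketch {
  // Generates one new, unique ID for every executor that is missing.
  def nextExecutorIds(counter: AtomicLong, running: Int, totalExpected: Int): Seq[Long] = {
    val missing = math.max(0, totalExpected - running)
    // Seq.fill evaluates its argument once per element, so every ID is distinct.
    Seq.fill(missing)(counter.incrementAndGet())
  }
}
```

Because the counter is only ever incremented, IDs are never reused, even for executors that were requested and later forgotten.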

hasPendingPods Flag

hasPendingPods: AtomicBoolean

ExecutorPodsAllocator uses a Java AtomicBoolean as a flag to avoid notifying subscribers.

Starts as false and is updated on every onNewSnapshots call.

Used in setTotalExpectedExecutors (only when false)

Logging

Enable ALL logging level for org.apache.spark.scheduler.cluster.k8s.ExecutorPodsAllocator logger to see what happens inside.

Add the following line to conf/log4j.properties:

log4j.logger.org.apache.spark.scheduler.cluster.k8s.ExecutorPodsAllocator=ALL

Refer to Logging.


Last update: 2021-01-13