
ExecutorPodsAllocator

ExecutorPodsAllocator is responsible for allocating pods for executors (possibly with dynamic allocation enabled) in a Spark application.

ExecutorPodsAllocator is used to create a KubernetesClusterSchedulerBackend.

Creating Instance

ExecutorPodsAllocator takes the following to be created:

ExecutorPodsAllocator is created when:

Executor Pod Allocation Timeout

ExecutorPodsAllocator defines Executor Pod Allocation Timeout that is the maximum of the following values:

ExecutorPodsAllocator uses the allocation timeout to detect "old" executor pod requests when handling executor pods snapshots.

spark.dynamicAllocation.enabled

ExecutorPodsAllocator uses spark.dynamicAllocation.enabled configuration property to turn dynamic allocation of executors on and off.

The Internals of Apache Spark

Learn more about Dynamic Allocation of Executors in The Internals of Apache Spark.

Driver Pod

driverPod: Option[Pod]

driverPod is a driver pod with the name of spark.kubernetes.driver.pod.name configuration property (if defined).

ExecutorPodsAllocator throws a SparkException when the driver pod could not be found in a Kubernetes cluster:

No pod was found named [kubernetesDriverPodName] in the cluster in the namespace [namespace] (this was supposed to be the driver pod.).

spark.kubernetes.driver.pod.name

ExecutorPodsAllocator uses spark.kubernetes.driver.pod.name configuration property to look up the driver pod by name when created.

spark.kubernetes.allocation.batch.size

ExecutorPodsAllocator uses spark.kubernetes.allocation.batch.size configuration property when allocating executor pods.

spark.kubernetes.allocation.batch.delay

ExecutorPodsAllocator uses spark.kubernetes.allocation.batch.delay configuration property for the following:

spark.kubernetes.executor.deleteOnTermination

ExecutorPodsAllocator uses spark.kubernetes.executor.deleteOnTermination configuration property.
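As a rough illustration of how the configuration properties above could be supplied, the sketch below renders them as `--conf` arguments for spark-submit. The values are illustrative only, and `AllocatorConf` and `toSubmitArgs` are hypothetical names introduced for this example.

```scala
object AllocatorConf {
  // Illustrative values only (an assumption); tune them for your cluster.
  val settings: Seq[(String, String)] = Seq(
    "spark.dynamicAllocation.enabled" -> "false",
    "spark.kubernetes.allocation.batch.size" -> "5",
    "spark.kubernetes.allocation.batch.delay" -> "1s",
    "spark.kubernetes.executor.deleteOnTermination" -> "true")

  // Renders every key/value pair as a "--conf key=value" argument pair.
  def toSubmitArgs(conf: Seq[(String, String)]): Seq[String] =
    conf.flatMap { case (key, value) => Seq("--conf", s"$key=$value") }
}
```

Calling `AllocatorConf.toSubmitArgs(AllocatorConf.settings)` yields the argument list to append to a spark-submit command line.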

Starting

start(
  applicationId: String): Unit

start requests the ExecutorPodsSnapshotsStore to subscribe this ExecutorPodsAllocator to be notified about new snapshots (with pod allocation delay based on spark.kubernetes.allocation.batch.delay configuration property).

start is used when:

  • KubernetesClusterSchedulerBackend is requested to start
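The subscription performed by start can be pictured with a simplified model. This is only a sketch: `InMemorySnapshotsStore` and `AllocatorSketch` are hypothetical stand-ins, the snapshot type is reduced to a plain map, and the store here ignores the batch interval instead of scheduling on it.

```scala
// Simplified stand-in for Spark's snapshot class (assumption: only pod states by executor ID).
final case class ExecutorPodsSnapshot(executorPods: Map[Long, String])

// Hypothetical in-memory store: remembers subscribers and pushes snapshots to them on demand.
class InMemorySnapshotsStore {
  private var subscribers = Vector.empty[(Long, Seq[ExecutorPodsSnapshot] => Unit)]

  def addSubscriber(processBatchIntervalMillis: Long)(
      onNewSnapshots: Seq[ExecutorPodsSnapshot] => Unit): Unit =
    subscribers :+= ((processBatchIntervalMillis, onNewSnapshots))

  // The interval is recorded but not used for scheduling in this sketch.
  def notifySubscribers(snapshots: Seq[ExecutorPodsSnapshot]): Unit =
    subscribers.foreach { case (_, callback) => callback(snapshots) }
}

class AllocatorSketch(store: InMemorySnapshotsStore, podAllocationDelayMs: Long) {
  // Records (applicationId, number of snapshots) for every notification received.
  var received = Vector.empty[(String, Int)]

  // Subscribes this allocator so that every batch of snapshots reaches onNewSnapshots.
  def start(applicationId: String): Unit =
    store.addSubscriber(podAllocationDelayMs) { snapshots =>
      onNewSnapshots(applicationId, snapshots)
    }

  private def onNewSnapshots(
      applicationId: String,
      snapshots: Seq[ExecutorPodsSnapshot]): Unit =
    received :+= ((applicationId, snapshots.size))
}
```

The point of the design is that the allocator never polls Kubernetes itself; it only reacts to the batches of snapshots the store delivers.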

Processing Executor Pods Snapshots

onNewSnapshots(
  applicationId: String,
  snapshots: Seq[ExecutorPodsSnapshot]): Unit

onNewSnapshots removes the executor IDs (of the executor pods in the given snapshots) from the newlyCreatedExecutors internal registry.

For the remaining executor IDs in the newlyCreatedExecutors internal registry, onNewSnapshots finds the timed-out executor IDs, i.e. those that were created more than podCreationTimeout (the Executor Pod Allocation Timeout) ago. For the other executor IDs, onNewSnapshots prints out the following DEBUG message to the logs:

Executor with id [execId] was not found in the Kubernetes cluster since it was created [time] milliseconds ago.

For any timed-out executor IDs, onNewSnapshots prints out the following WARN message to the logs:

Executors with ids [ids] were not detected in the Kubernetes cluster after [podCreationTimeout] ms despite the fact that a previous allocation attempt tried to create them. The executors may have been deleted but the application missed the deletion event.

onNewSnapshots removes (forgets) the timed-out executor IDs (from the newlyCreatedExecutors internal registry). With the spark.kubernetes.executor.deleteOnTermination configuration property enabled, onNewSnapshots requests the KubernetesClient to delete pods with the following labels:

  • spark-app-selector with the given applicationId
  • spark-role=executor
  • spark-exec-id for all timed-out executor IDs
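The bookkeeping described so far can be sketched as follows. This is a minimal model under stated assumptions: the registry maps executor IDs to creation timestamps in milliseconds, and `TimedOutExecutors`, `PodSelector`, `forgetTimedOut` and `selectorFor` are hypothetical names, not part of Spark.

```scala
import scala.collection.mutable

// Hypothetical description of the pods to delete: fixed labels plus the executor IDs to match.
final case class PodSelector(labels: Map[String, String], execIds: Seq[String])

object TimedOutExecutors {
  // Drops the executors seen in the snapshots, then removes and returns the IDs
  // that are still missing and were created more than podCreationTimeoutMs ago.
  def forgetTimedOut(
      newlyCreatedExecutors: mutable.Map[Long, Long], // executor ID -> creation time (ms)
      seenInSnapshots: Set[Long],
      currentTimeMs: Long,
      podCreationTimeoutMs: Long): Seq[Long] = {
    newlyCreatedExecutors --= seenInSnapshots
    val timedOut = newlyCreatedExecutors.collect {
      case (execId, createdAt) if currentTimeMs - createdAt > podCreationTimeoutMs => execId
    }.toSeq.sorted
    newlyCreatedExecutors --= timedOut
    timedOut
  }

  // Builds the selector from the three labels listed in the text.
  def selectorFor(applicationId: String, timedOut: Seq[Long]): PodSelector =
    PodSelector(
      labels = Map("spark-app-selector" -> applicationId, "spark-role" -> "executor"),
      execIds = timedOut.map(_.toString)) // values for the spark-exec-id label
}
```

In this model the deletion request would only be issued when spark.kubernetes.executor.deleteOnTermination is enabled and at least one executor ID has timed out.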

onNewSnapshots updates the lastSnapshot internal registry with the last ExecutorPodsSnapshot among the given snapshots if available.

onNewSnapshots...FIXME

Requesting Executors from Kubernetes

requestNewExecutors(
  expected: Int,
  running: Int,
  applicationId: String,
  resourceProfileId: Int): Unit

requestNewExecutors determines the number of executor pods to allocate based on the given expected and running executor counts and the spark.kubernetes.allocation.batch.size configuration property.

requestNewExecutors prints out the following INFO message to the logs:

Going to request [numExecutorsToAllocate] executors from Kubernetes for ResourceProfile Id: [resourceProfileId], target: [expected] running: [running].
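One plausible reading of how the number of pods is derived is sketched below, assuming the count is the shortfall between expected and running executors capped by the batch size. `AllocationBatch` is a hypothetical name, and the lower bound of zero is a defensive assumption of this sketch.

```scala
object AllocationBatch {
  // Assumption: request the missing executors, but never more than one batch at a time.
  def numExecutorsToAllocate(expected: Int, running: Int, batchSize: Int): Int =
    math.max(0, math.min(expected - running, batchSize))
}
```

With a batch size of 5, a target of 10 and 3 running executors, a single round would request 5 pods and leave the remaining 2 for a later round.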

For every new executor pod, requestNewExecutors does the following:

  1. Increments the executor ID counter
  2. Creates a KubernetesExecutorConf for the executor ID, the given applicationId and resourceProfileId, and the driver pod
  3. Requests the KubernetesExecutorBuilder to build the pod spec for executors
  4. Requests the KubernetesClient to create an executor pod with an executor container attached
  5. Requests the KubernetesClient to create PersistentVolumeClaim resources if there are any defined (as additional resources) and prints out the following INFO message to the logs:

    Trying to create PersistentVolumeClaim [name] with StorageClass [storageClassName]
    
  6. Registers the new executor ID in the newlyCreatedExecutors registry

  7. Prints out the following DEBUG message to the logs:

    Requested executor with id [newExecutorId] from Kubernetes.
    

In case of any exceptions, requestNewExecutors requests the KubernetesClient to delete the failed executor pod.
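The per-executor steps and the cleanup on failure can be sketched as below. `PodClient` and `RequestSketch` are hypothetical and far simpler than the real KubernetesClient, building the pod spec is reduced to deriving a pod name, and rethrowing the exception after the cleanup is an assumption of this sketch.

```scala
import java.util.concurrent.atomic.AtomicLong
import scala.collection.mutable
import scala.util.control.NonFatal

// Hypothetical minimal client; the real KubernetesClient API is much richer.
trait PodClient {
  def createPod(name: String): Unit
  def createVolumeClaim(podName: String, claim: String): Unit
  def deletePod(name: String): Unit
}

class RequestSketch(client: PodClient, now: () => Long) {
  private val executorIdCounter = new AtomicLong(0)
  // executor ID -> time (ms) the pod was requested
  val newlyCreatedExecutors = mutable.LinkedHashMap.empty[Long, Long]

  def requestNewExecutors(count: Int, claims: Seq[String]): Unit =
    (0 until count).foreach { _ =>
      val execId = executorIdCounter.incrementAndGet() // step 1
      val podName = s"exec-$execId"                    // stands in for steps 2 and 3
      client.createPod(podName)                        // step 4
      try {
        claims.foreach(client.createVolumeClaim(podName, _)) // step 5
        newlyCreatedExecutors(execId) = now()                // step 6
      } catch {
        case NonFatal(e) =>
          // Clean up the pod that was already created, then propagate the failure (assumption).
          client.deletePod(podName)
          throw e
      }
    }
}
```

Registering the executor only after the additional resources are created keeps the registry free of executors whose pods were rolled back.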

Setting Expected Number of Executors per ResourceProfile

setTotalExpectedExecutors(
  resourceProfileToTotalExecs: Map[ResourceProfile, Int]): Unit

setTotalExpectedExecutors updates the rpIdToResourceProfile and totalExpectedExecutorsPerResourceProfileId internal registries for every ResourceProfile.

setTotalExpectedExecutors prints out the following DEBUG message to the logs:

Set total expected execs to [totalExpectedExecutorsPerResourceProfileId]

With no pending pods, setTotalExpectedExecutors requests the ExecutorPodsSnapshotsStore to notifySubscribers.
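A minimal model of this behaviour is shown below. `ExpectedExecutorsSketch` is hypothetical, `ResourceProfile` is reduced to its ID, and the subscriber notification is passed in as a plain function instead of going through an ExecutorPodsSnapshotsStore.

```scala
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicBoolean

// Stand-in for Spark's ResourceProfile; only the ID matters in this sketch.
final case class ResourceProfile(id: Int)

class ExpectedExecutorsSketch(notifySubscribers: () => Unit) {
  val hasPendingPods = new AtomicBoolean(false)
  val totalExpectedExecutorsPerResourceProfileId = new ConcurrentHashMap[Int, Int]()
  val rpIdToResourceProfile = new java.util.HashMap[Int, ResourceProfile]()

  def setTotalExpectedExecutors(
      resourceProfileToTotalExecs: Map[ResourceProfile, Int]): Unit = {
    resourceProfileToTotalExecs.foreach { case (rp, numExecs) =>
      rpIdToResourceProfile.putIfAbsent(rp.id, rp)                    // lookup table by ID
      totalExpectedExecutorsPerResourceProfileId.put(rp.id, numExecs) // latest target
    }
    // Trigger a snapshot round right away only when no pods are pending.
    if (!hasPendingPods.get()) notifySubscribers()
  }
}
```

Skipping the notification while pods are pending avoids an extra allocation round before the outstanding requests have shown up in a snapshot.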

setTotalExpectedExecutors is used when:

Registries

newlyCreatedExecutors

newlyCreatedExecutors: Map[Long, Long]

ExecutorPodsAllocator uses newlyCreatedExecutors internal registry to track executor IDs (with the timestamps they were created) that have been requested from Kubernetes but have not been detected in any snapshot yet.

Used in onNewSnapshots

EXECUTOR_ID_COUNTER

ExecutorPodsAllocator uses a Java AtomicLong for the missing executor IDs that are going to be requested (in onNewSnapshots) when...FIXME

hasPendingPods Flag

hasPendingPods: AtomicBoolean

ExecutorPodsAllocator uses a Java AtomicBoolean as a flag to avoid notifying subscribers.

Starts as false and is updated on every onNewSnapshots

Used in setTotalExpectedExecutors (subscribers are notified only when the flag is false)

totalExpectedExecutorsPerResourceProfileId

totalExpectedExecutorsPerResourceProfileId: ConcurrentHashMap[Int, Int]

ExecutorPodsAllocator uses a Java ConcurrentHashMap for...FIXME

A new entry is added when the total expected executors are changed (in setTotalExpectedExecutors)

Used in onNewSnapshots

rpIdToResourceProfile

rpIdToResourceProfile: HashMap[Int, ResourceProfile]

ExecutorPodsAllocator uses a Java HashMap as a lookup table of ResourceProfiles by ID.

A new entry is added when the total expected executors are changed (in setTotalExpectedExecutors)

Used in requestNewExecutors

Logging

Enable ALL logging level for org.apache.spark.scheduler.cluster.k8s.ExecutorPodsAllocator logger to see what happens inside.

Add the following line to conf/log4j.properties:

log4j.logger.org.apache.spark.scheduler.cluster.k8s.ExecutorPodsAllocator=ALL

Refer to Logging.


Last update: 2021-03-12