ExecutorPodsAllocator¶
ExecutorPodsAllocator is responsible for allocating executor pods (possibly dynamically) in a Spark application.
ExecutorPodsAllocator is used to create a KubernetesClusterSchedulerBackend.
Creating Instance¶
ExecutorPodsAllocator takes the following to be created:
- SparkConf
- SecurityManager
- KubernetesExecutorBuilder
- KubernetesClient
- ExecutorPodsSnapshotsStore
- Clock
ExecutorPodsAllocator is created when:
- KubernetesClusterManager is requested for a SchedulerBackend
Executor Pod Allocation Timeout¶
ExecutorPodsAllocator defines Executor Pod Allocation Timeout that is the maximum of the following values:
- 5 times spark.kubernetes.allocation.batch.delay configuration property
- spark.kubernetes.allocation.executor.timeout configuration property
ExecutorPodsAllocator uses the allocation timeout to detect "old" executor pod requests when handling executor pods snapshots.
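A minimal sketch of the timeout computation follows (the millisecond values are examples, and the variable names are assumptions, not Spark's internal identifiers):

```scala
// Hedged sketch: the allocation timeout is the maximum of the two values above.
val podAllocationDelayMs = 1000L   // spark.kubernetes.allocation.batch.delay
val executorTimeoutMs = 600000L    // spark.kubernetes.allocation.executor.timeout
val podCreationTimeout = math.max(5 * podAllocationDelayMs, executorTimeoutMs)
```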
spark.dynamicAllocation.enabled¶
ExecutorPodsAllocator uses spark.dynamicAllocation.enabled configuration property to turn dynamic allocation of executors on and off.
The Internals of Apache Spark
Learn more about Dynamic Allocation of Executors in The Internals of Apache Spark.
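For illustration only, dynamic allocation could be enabled as follows (on Kubernetes this typically also requires shuffle tracking, since there is no external shuffle service):

```scala
import org.apache.spark.SparkConf

// Illustrative configuration, not the allocator's own code.
val conf = new SparkConf()
  .set("spark.dynamicAllocation.enabled", "true")
  .set("spark.dynamicAllocation.shuffleTracking.enabled", "true")
```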
Driver Pod¶
driverPod: Option[Pod]
driverPod is a driver pod with the name of spark.kubernetes.driver.pod.name configuration property (if defined).
ExecutorPodsAllocator throws a SparkException when the driver pod could not be found in a Kubernetes cluster:
No pod was found named [kubernetesDriverPodName] in the cluster in the namespace [namespace] (this was supposed to be the driver pod.).
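A hedged sketch of the lookup and the error path, assuming a fabric8 KubernetesClient (the method and parameter names are illustrative):

```scala
import io.fabric8.kubernetes.api.model.Pod
import io.fabric8.kubernetes.client.KubernetesClient
import org.apache.spark.SparkException

// Illustrative only: resolve the driver pod by name,
// failing fast when the configured pod cannot be found.
def findDriverPod(
    client: KubernetesClient,
    podName: Option[String],
    namespace: String): Option[Pod] =
  podName.map { name =>
    Option(client.pods().withName(name).get()).getOrElse {
      throw new SparkException(
        s"No pod was found named $name in the cluster in the " +
        s"namespace $namespace (this was supposed to be the driver pod.)")
    }
  }
```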
spark.kubernetes.driver.pod.name¶
ExecutorPodsAllocator uses spark.kubernetes.driver.pod.name configuration property to look up the driver pod by name when created.
spark.kubernetes.allocation.batch.size¶
ExecutorPodsAllocator uses spark.kubernetes.allocation.batch.size configuration property when allocating executor pods.
spark.kubernetes.allocation.batch.delay¶
ExecutorPodsAllocator uses spark.kubernetes.allocation.batch.delay configuration property for the following:

- Pod allocation delay when subscribing to the ExecutorPodsSnapshotsStore (in start)
- Executor Pod Allocation Timeout
spark.kubernetes.executor.deleteOnTermination¶
ExecutorPodsAllocator uses spark.kubernetes.executor.deleteOnTermination configuration property to decide whether to delete executor pods of timed-out executor requests (when processing executor pods snapshots).
Starting¶
start(
applicationId: String): Unit
start requests the ExecutorPodsSnapshotsStore to subscribe this ExecutorPodsAllocator to be notified about new snapshots (with pod allocation delay based on spark.kubernetes.allocation.batch.delay configuration property).
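The subscription could be sketched as follows (assuming the ExecutorPodsSnapshotsStore's addSubscriber and the onNewSnapshots described on this page; podAllocationDelay stands for the resolved spark.kubernetes.allocation.batch.delay value):

```scala
// Hedged sketch of start: register this allocator as a snapshots subscriber.
def start(applicationId: String): Unit = {
  snapshotsStore.addSubscriber(podAllocationDelay) { snapshots =>
    onNewSnapshots(applicationId, snapshots)
  }
}
```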
start is used when:
- KubernetesClusterSchedulerBackend is requested to start
Processing Executor Pods Snapshots¶
onNewSnapshots(
applicationId: String,
snapshots: Seq[ExecutorPodsSnapshot]): Unit
onNewSnapshots removes the executor IDs (of the executor pods in the given snapshots) from the newlyCreatedExecutors internal registry.
For the executor IDs that remain in the newlyCreatedExecutors internal registry, onNewSnapshots finds the timed-out ones, i.e. those whose creation time exceeded the Executor Pod Allocation Timeout (podCreationTimeout). For the others (not yet timed out), onNewSnapshots prints out the following DEBUG message to the logs:
Executor with id [execId] was not found in the Kubernetes cluster since it was created [time] milliseconds ago.
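The split between timed-out and still-pending requests could be sketched as follows (a hedged fragment; parameter names are illustrative and follow this page, not necessarily the Spark source):

```scala
import org.apache.spark.util.Clock

// Partition newly created executors into timed-out and still-pending requests.
def splitByCreationTimeout(
    newlyCreatedExecutors: Map[Long, Long],
    podCreationTimeout: Long,
    clock: Clock): (Map[Long, Long], Map[Long, Long]) = {
  val currentTime = clock.getTimeMillis()
  newlyCreatedExecutors.partition {
    case (_, timeCreated) => currentTime - timeCreated > podCreationTimeout
  }
}
```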
For any timed-out executor IDs, onNewSnapshots prints out the following WARN message to the logs:
Executors with ids [ids] were not detected in the Kubernetes cluster after [podCreationTimeout] ms despite the fact that a previous allocation attempt tried to create them. The executors may have been deleted but the application missed the deletion event.
onNewSnapshots removes (forgets) the timed-out executor IDs (from the newlyCreatedExecutors internal registry). With the spark.kubernetes.executor.deleteOnTermination configuration property enabled, onNewSnapshots requests the KubernetesClient to delete pods with the following labels:
- spark-app-selector with the given applicationId
- spark-role=executor
- spark-exec-id for all the timed-out executor IDs
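With fabric8's label selectors, the deletion could look like the following sketch (withLabel and withLabelIn are fabric8 API; the function and parameter names are assumptions):

```scala
import io.fabric8.kubernetes.client.KubernetesClient

// Illustrative sketch: delete the pods of timed-out executor
// requests by the labels listed above.
def deleteTimedOutPods(
    client: KubernetesClient,
    applicationId: String,
    timedOutExecIds: Seq[Long]): Unit = {
  client
    .pods()
    .withLabel("spark-app-selector", applicationId)
    .withLabel("spark-role", "executor")
    .withLabelIn("spark-exec-id", timedOutExecIds.map(_.toString): _*)
    .delete()
}
```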
onNewSnapshots updates the lastSnapshot internal registry with the last ExecutorPodsSnapshot among the given snapshots if available.
onNewSnapshots...FIXME
Requesting Executors from Kubernetes¶
requestNewExecutors(
expected: Int,
running: Int,
applicationId: String,
resourceProfileId: Int): Unit
requestNewExecutors determines the number of executor pods to allocate based on the given expected and running counts and the spark.kubernetes.allocation.batch.size configuration property.
requestNewExecutors prints out the following INFO message to the logs:
Going to request [numExecutorsToAllocate] executors from Kubernetes for ResourceProfile Id: [resourceProfileId], target: [expected] running: [running].
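The arithmetic could be sketched as follows (a hedged fragment; podAllocationSize stands for the resolved spark.kubernetes.allocation.batch.size value):

```scala
// Each allocation round is capped by the batch size.
def numExecutorsToAllocate(expected: Int, running: Int, podAllocationSize: Int): Int =
  math.min(expected - running, podAllocationSize)
```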
For every new executor pod, requestNewExecutors does the following:
- Increments the executor ID counter
- Creates a KubernetesExecutorConf for the executor ID, the given applicationId and resourceProfileId, and the driver pod
- Requests the KubernetesExecutorBuilder to build the pod spec for executors
- Requests the KubernetesClient to create an executor pod with an executor container attached
- Requests the KubernetesClient to create PersistentVolumeClaim resources if there are any defined (as additional resources) and prints out the following INFO message to the logs:
Trying to create PersistentVolumeClaim [name] with StorageClass [storageClassName]
- Registers the new executor ID in the newlyCreatedExecutors registry
- Prints out the following DEBUG message to the logs:
Requested executor with id [newExecutorId] from Kubernetes.
In case of any exceptions, requestNewExecutors requests the KubernetesClient to delete the failed executor pod.
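Putting the steps above together, pod creation and the failure cleanup could be sketched as follows (PodBuilder is fabric8 API; executorPod with separate pod and container parts, kubernetesClient, newExecutorId and clock follow this page's descriptions and are not a verbatim copy of the Spark source):

```scala
import io.fabric8.kubernetes.api.model.PodBuilder
import scala.util.control.NonFatal

// Attach the executor container to the pod spec and create the pod.
val podWithAttachedContainer = new PodBuilder(executorPod.pod)
  .editOrNewSpec()
  .addToContainers(executorPod.container)
  .endSpec()
  .build()
val createdPod = kubernetesClient.pods().create(podWithAttachedContainer)
try {
  // Track the request until the pod shows up in a snapshot.
  newlyCreatedExecutors(newExecutorId) = clock.getTimeMillis()
} catch {
  case NonFatal(e) =>
    // Delete the failed executor pod and rethrow.
    kubernetesClient.pods().delete(createdPod)
    throw e
}
```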
Setting Expected Number of Executors per ResourceProfile¶
setTotalExpectedExecutors(
resourceProfileToTotalExecs: Map[ResourceProfile, Int]): Unit
setTotalExpectedExecutors updates the rpIdToResourceProfile and totalExpectedExecutorsPerResourceProfileId internal registries for every ResourceProfile.
setTotalExpectedExecutors prints out the following DEBUG message to the logs:
Set total expected execs to [totalExpectedExecutorsPerResourceProfileId]
With no pending pods (the hasPendingPods flag is off), setTotalExpectedExecutors requests the ExecutorPodsSnapshotsStore to notifySubscribers.
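The update-and-notify flow could be sketched as follows (registry names follow this page):

```scala
// Record the expected totals per ResourceProfile.
resourceProfileToTotalExecs.foreach { case (rp, numExecs) =>
  rpIdToResourceProfile.getOrElseUpdate(rp.id, rp)
  totalExpectedExecutorsPerResourceProfileId.put(rp.id, numExecs)
}
// Trigger an allocation round right away unless pods are already pending.
if (!hasPendingPods.get()) {
  snapshotsStore.notifySubscribers()
}
```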
setTotalExpectedExecutors is used when:
- KubernetesClusterSchedulerBackend is requested to start and doRequestTotalExecutors
Registries¶
newlyCreatedExecutors¶
newlyCreatedExecutors: Map[Long, Long]
ExecutorPodsAllocator uses newlyCreatedExecutors internal registry to track executor IDs (with the timestamps they were created) that have been requested from Kubernetes but have not been detected in any snapshot yet.
Used in onNewSnapshots
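The lifecycle of an entry could be sketched as follows (a hedged fragment; snapshotExecutorIds is an illustrative name for the executor IDs found in the given snapshots):

```scala
// An ID is registered on request and forgotten once it is
// seen in a snapshot (or times out).
newlyCreatedExecutors(newExecutorId) = clock.getTimeMillis() // in requestNewExecutors
newlyCreatedExecutors --= snapshotExecutorIds                // in onNewSnapshots
```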
EXECUTOR_ID_COUNTER¶
ExecutorPodsAllocator uses a Java AtomicLong for the IDs of new executors, incremented for every executor pod requested (in requestNewExecutors).
hasPendingPods Flag¶
hasPendingPods: AtomicBoolean
ExecutorPodsAllocator uses a Java AtomicBoolean as a flag to track whether there are pending executor pods, so that subscribers are not notified needlessly.
Starts as false and is updated every onNewSnapshots
Used in setTotalExpectedExecutors (only when false)
totalExpectedExecutorsPerResourceProfileId¶
totalExpectedExecutorsPerResourceProfileId: ConcurrentHashMap[Int, Int]
ExecutorPodsAllocator uses a Java ConcurrentHashMap to track the expected total number of executors per ResourceProfile ID.
A new entry added while changing the total expected executors
Used in onNewSnapshots
rpIdToResourceProfile¶
rpIdToResourceProfile: HashMap[Int, ResourceProfile]
ExecutorPodsAllocator uses a Java HashMap as a lookup table of ResourceProfiles by ID.
A new entry added while changing the total expected executors
Used in requestNewExecutors
Logging¶
Enable ALL logging level for org.apache.spark.scheduler.cluster.k8s.ExecutorPodsAllocator logger to see what happens inside.
Add the following line to conf/log4j.properties:
log4j.logger.org.apache.spark.scheduler.cluster.k8s.ExecutorPodsAllocator=ALL
Refer to Logging.