ExecutorPodsAllocator¶
ExecutorPodsAllocator is responsible for allocating pods for executors (possibly dynamically) in a Spark application.

ExecutorPodsAllocator is used to create a KubernetesClusterSchedulerBackend.
Creating Instance¶
ExecutorPodsAllocator takes the following to be created:

- SparkConf
- SecurityManager
- KubernetesExecutorBuilder
- KubernetesClient
- ExecutorPodsSnapshotsStore
- Clock
ExecutorPodsAllocator is created when:

- KubernetesClusterManager is requested for a SchedulerBackend
Executor Pod Allocation Timeout¶
ExecutorPodsAllocator defines the Executor Pod Allocation Timeout as the maximum of the following values:

- 5 times the spark.kubernetes.allocation.batch.delay configuration property
- the spark.kubernetes.allocation.executor.timeout configuration property
ExecutorPodsAllocator uses this allocation timeout to detect "old" executor pod requests when handling executor pods snapshots.
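A minimal sketch of how the timeout is derived from the two properties above (the default values passed to getTimeAsMs are assumptions for illustration):

```scala
import org.apache.spark.SparkConf

// Sketch: Executor Pod Allocation Timeout = max(5 * batch delay, executor timeout).
val conf = new SparkConf()
val batchDelayMs =
  conf.getTimeAsMs("spark.kubernetes.allocation.batch.delay", "1s")
val executorTimeoutMs =
  conf.getTimeAsMs("spark.kubernetes.allocation.executor.timeout", "600s")
val podCreationTimeoutMs = math.max(5 * batchDelayMs, executorTimeoutMs)
```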
spark.dynamicAllocation.enabled¶
ExecutorPodsAllocator uses the spark.dynamicAllocation.enabled configuration property to turn dynamic allocation of executors on and off.
Learn more about Dynamic Allocation of Executors in The Internals of Apache Spark.
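For illustration, a SparkConf that turns dynamic allocation on for a Kubernetes deployment could look as follows (a sketch; the min/max values are arbitrary, and shuffle tracking is assumed since Kubernetes has no external shuffle service):

```scala
import org.apache.spark.SparkConf

// Sketch: enable dynamic allocation of executors on Kubernetes.
val conf = new SparkConf()
  .set("spark.dynamicAllocation.enabled", "true")
  .set("spark.dynamicAllocation.shuffleTracking.enabled", "true") // assumption: no external shuffle service
  .set("spark.dynamicAllocation.minExecutors", "1")
  .set("spark.dynamicAllocation.maxExecutors", "10")
```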
Driver Pod¶
driverPod: Option[Pod]
driverPod is the driver pod with the name of the spark.kubernetes.driver.pod.name configuration property (if defined).
ExecutorPodsAllocator throws a SparkException when the driver pod could not be found in the Kubernetes cluster:

No pod was found named [kubernetesDriverPodName] in the cluster in the namespace [namespace] (this was supposed to be the driver pod.).
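A minimal sketch of the lookup, assuming the fabric8 KubernetesClient API (the findDriverPod helper is hypothetical):

```scala
import io.fabric8.kubernetes.api.model.Pod
import io.fabric8.kubernetes.client.KubernetesClient
import org.apache.spark.SparkException

// Sketch: resolve the driver pod by name or fail as described above.
def findDriverPod(client: KubernetesClient, namespace: String, name: String): Pod =
  Option(client.pods().inNamespace(namespace).withName(name).get())
    .getOrElse(throw new SparkException(
      s"No pod was found named $name in the cluster in the namespace " +
        s"$namespace (this was supposed to be the driver pod.)."))
```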
spark.kubernetes.driver.pod.name¶
ExecutorPodsAllocator uses the spark.kubernetes.driver.pod.name configuration property to look up the driver pod by name when created.
spark.kubernetes.allocation.batch.size¶
ExecutorPodsAllocator uses the spark.kubernetes.allocation.batch.size configuration property when allocating executor pods.
spark.kubernetes.allocation.batch.delay¶
ExecutorPodsAllocator uses the spark.kubernetes.allocation.batch.delay configuration property for the Executor Pod Allocation Timeout and for the pod allocation delay when subscribing to executor pods snapshots.
spark.kubernetes.executor.deleteOnTermination¶
ExecutorPodsAllocator uses the spark.kubernetes.executor.deleteOnTermination configuration property to decide whether to delete timed-out executor pods when handling executor pods snapshots.
Starting¶
start(
applicationId: String): Unit
start requests the ExecutorPodsSnapshotsStore to subscribe this ExecutorPodsAllocator to be notified about new snapshots (with the pod allocation delay based on the spark.kubernetes.allocation.batch.delay configuration property).
start is used when:

- KubernetesClusterSchedulerBackend is requested to start
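In essence, start boils down to registering a subscriber, along the lines of this sketch (the addSubscriber signature is an assumption about ExecutorPodsSnapshotsStore, and the snapshot type is left generic):

```scala
// Assumed, simplified shape of the snapshots store subscription API.
trait SnapshotsStore[S] {
  // Call the subscriber back with batched snapshots every delayMillis.
  def addSubscriber(delayMillis: Long)(onNewSnapshots: Seq[S] => Unit): Unit
}

// Sketch: subscribe this allocator to new snapshots with the pod allocation delay.
def start[S](
    applicationId: String,
    snapshotsStore: SnapshotsStore[S],
    podAllocationDelay: Long,
    onNewSnapshots: (String, Seq[S]) => Unit): Unit =
  snapshotsStore.addSubscriber(podAllocationDelay) { snapshots =>
    onNewSnapshots(applicationId, snapshots)
  }
```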
Processing Executor Pods Snapshots¶
onNewSnapshots(
applicationId: String,
snapshots: Seq[ExecutorPodsSnapshot]): Unit
onNewSnapshots removes the executor IDs (of the executor pods in the given snapshots) from the newlyCreatedExecutors internal registry.

For the executor IDs that remain in the newlyCreatedExecutors internal registry, onNewSnapshots finds the timed-out executor IDs whose creation time exceeded the Executor Pod Allocation Timeout (podCreationTimeout). For the other executor IDs, onNewSnapshots prints out the following DEBUG message to the logs:

Executor with id [execId] was not found in the Kubernetes cluster since it was created [time] milliseconds ago.
For the timed-out executor IDs, onNewSnapshots prints out the following WARN message to the logs:

Executors with ids [ids] were not detected in the Kubernetes cluster after [podCreationTimeout] ms despite the fact that a previous allocation attempt tried to create them. The executors may have been deleted but the application missed the deletion event.
onNewSnapshots removes (forgets) the timed-out executor IDs from the newlyCreatedExecutors internal registry. With the spark.kubernetes.executor.deleteOnTermination configuration property enabled, onNewSnapshots requests the KubernetesClient to delete pods with the following labels:

- spark-app-selector with the given applicationId
- spark-role = executor
- spark-exec-id for all timed-out executor IDs
onNewSnapshots updates the lastSnapshot internal registry with the last ExecutorPodsSnapshot among the given snapshots, if available.

onNewSnapshots...FIXME
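A minimal sketch of the timed-out detection described above (the registry layout follows the Registries section below; everything else is an assumption):

```scala
import scala.collection.mutable

// Sketch: executor ID -> creation timestamp (ms) of pods requested from
// Kubernetes but not seen in any snapshot yet.
val newlyCreatedExecutors = mutable.LinkedHashMap.empty[Long, Long]

// Split the registry into timed-out and still-pending executor IDs.
def splitByTimeout(nowMs: Long, podCreationTimeoutMs: Long): (Seq[Long], Seq[Long]) = {
  val (timedOut, pending) = newlyCreatedExecutors.toSeq.partition {
    case (_, createdMs) => nowMs - createdMs > podCreationTimeoutMs
  }
  (timedOut.map(_._1), pending.map(_._1))
}
```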
Requesting Executors from Kubernetes¶
requestNewExecutors(
expected: Int,
running: Int,
applicationId: String,
resourceProfileId: Int): Unit
requestNewExecutors determines the number of executor pods to allocate based on the given expected and running counts and the spark.kubernetes.allocation.batch.size configuration property.
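That is, each allocation round requests at most spark.kubernetes.allocation.batch.size new pods (a sketch; the clamp to zero is an assumption):

```scala
// Sketch: cap the number of pods requested per round by the batch size.
def numExecutorsToAllocate(expected: Int, running: Int, batchSize: Int): Int =
  math.min(math.max(expected - running, 0), batchSize)
```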
requestNewExecutors prints out the following INFO message to the logs:

Going to request [numExecutorsToAllocate] executors from Kubernetes for ResourceProfile Id: [resourceProfileId], target: [expected] running: [running].
For every new executor pod, requestNewExecutors does the following:

- Increments the executor ID counter
- Creates a KubernetesExecutorConf for the executor ID, the given applicationId and resourceProfileId, and the driver pod
- Requests the KubernetesExecutorBuilder to build the pod spec for executors
- Requests the KubernetesClient to create an executor pod with an executor container attached
- Requests the KubernetesClient to create PersistentVolumeClaim resources if there are any defined (as additional resources) and prints out the following INFO message to the logs:
  Trying to create PersistentVolumeClaim [name] with StorageClass [storageClassName]
- Registers the new executor ID in the newlyCreatedExecutors registry
- Prints out the following DEBUG message to the logs:
  Requested executor with id [newExecutorId] from Kubernetes.
In case of any exceptions, requestNewExecutors requests the KubernetesClient to delete the failed executor pod.
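The create-then-clean-up step could look like this sketch (fabric8 6.x-style client API assumed; createExecutorPod is a hypothetical helper):

```scala
import io.fabric8.kubernetes.api.model.Pod
import io.fabric8.kubernetes.client.KubernetesClient
import scala.util.control.NonFatal

// Sketch: create the executor pod and delete it again on any failure,
// mirroring the error handling described above.
def createExecutorPod(client: KubernetesClient, namespace: String, pod: Pod): Unit =
  try {
    client.pods().inNamespace(namespace).resource(pod).create()
  } catch {
    case NonFatal(e) =>
      client.pods().inNamespace(namespace).resource(pod).delete()
      throw e
  }
```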
Setting Expected Number of Executors per ResourceProfile¶
setTotalExpectedExecutors(
resourceProfileToTotalExecs: Map[ResourceProfile, Int]): Unit
setTotalExpectedExecutors updates the rpIdToResourceProfile and totalExpectedExecutorsPerResourceProfileId internal registries for every ResourceProfile.
setTotalExpectedExecutors prints out the following DEBUG message to the logs:

Set total expected execs to [totalExpectedExecutorsPerResourceProfileId]

With no pending pods, setTotalExpectedExecutors requests the ExecutorPodsSnapshotsStore to notifySubscribers.
setTotalExpectedExecutors is used when:

- KubernetesClusterSchedulerBackend is requested to start and doRequestTotalExecutors
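A sketch of the registry updates (registry names and types follow the Registries section below):

```scala
import java.util.concurrent.ConcurrentHashMap
import scala.collection.mutable
import org.apache.spark.resource.ResourceProfile

val rpIdToResourceProfile = mutable.HashMap.empty[Int, ResourceProfile]
val totalExpectedExecutorsPerResourceProfileId = new ConcurrentHashMap[Int, Int]()

// Sketch: remember every ResourceProfile by its id and record
// the expected total number of executors for that id.
def setTotalExpectedExecutors(resourceProfileToTotalExecs: Map[ResourceProfile, Int]): Unit =
  resourceProfileToTotalExecs.foreach { case (rp, total) =>
    rpIdToResourceProfile.getOrElseUpdate(rp.id, rp)
    totalExpectedExecutorsPerResourceProfileId.put(rp.id, total)
  }
```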
Registries¶
newlyCreatedExecutors¶
newlyCreatedExecutors: Map[Long, Long]
ExecutorPodsAllocator uses the newlyCreatedExecutors internal registry to track executor IDs (with the timestamps they were created at) that have been requested from Kubernetes but have not been detected in any snapshot yet.

Used in onNewSnapshots
EXECUTOR_ID_COUNTER¶
ExecutorPodsAllocator uses a Java AtomicLong as a counter for the IDs of the executors to be requested (in onNewSnapshots) when...FIXME
hasPendingPods Flag¶
hasPendingPods: AtomicBoolean
ExecutorPodsAllocator uses a Java AtomicBoolean as a flag to avoid notifying subscribers while there are pending pods.

Starts as false and is updated on every onNewSnapshots.

Used in setTotalExpectedExecutors (only when false)
totalExpectedExecutorsPerResourceProfileId¶
totalExpectedExecutorsPerResourceProfileId: ConcurrentHashMap[Int, Int]
ExecutorPodsAllocator uses a Java ConcurrentHashMap for...FIXME

A new entry is added while changing the total expected executors.

Used in onNewSnapshots
rpIdToResourceProfile¶
rpIdToResourceProfile: HashMap[Int, ResourceProfile]
ExecutorPodsAllocator uses a Java HashMap as a lookup table of ResourceProfiles by ID.

A new entry is added while changing the total expected executors.

Used in requestNewExecutors
Logging¶
Enable ALL logging level for org.apache.spark.scheduler.cluster.k8s.ExecutorPodsAllocator logger to see what happens inside.

Add the following line to conf/log4j.properties:
log4j.logger.org.apache.spark.scheduler.cluster.k8s.ExecutorPodsAllocator=ALL
Refer to Logging.