ExecutorPodsLifecycleManager

Creating Instance

ExecutorPodsLifecycleManager takes the following to be created:

ExecutorPodsLifecycleManager is created when KubernetesClusterManager is requested for a SchedulerBackend (and creates a KubernetesClusterSchedulerBackend).

Configuration Properties

spark.kubernetes.executor.eventProcessingInterval

ExecutorPodsLifecycleManager uses the spark.kubernetes.executor.eventProcessingInterval configuration property when started to register a new subscriber for how often to...FIXME

spark.kubernetes.executor.deleteOnTermination

ExecutorPodsLifecycleManager uses the spark.kubernetes.executor.deleteOnTermination configuration property for onFinalNonDeletedState.

Missing Pod Timeout

ExecutorPodsLifecycleManager defines Missing Pod Timeout based on the spark.kubernetes.executor.missingPodDetectDelta configuration property.

ExecutorPodsLifecycleManager uses the timeout to detect lost executor pods when handling executor pods snapshots.
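One way to picture timeout-based detection is sketched below. It is only an illustration under stated assumptions, not Spark's code: lostExecutors, its parameters, and the rule "known to the scheduler, absent from the latest snapshot, and registered longer ago than the timeout" are all hypothetical.

```scala
object MissingPodSketch {
  // Hypothetical helper (not part of Spark): returns the IDs of executors that
  // are registered, have no pod in the latest snapshot, and registered longer
  // ago than the missing pod timeout.
  def lostExecutors(
      registeredAtMs: Map[Long, Long], // execId -> registration time (ms), assumed input
      podsInSnapshot: Set[Long],       // execIds that have a pod in the latest snapshot
      snapshotTimeMs: Long,
      missingPodTimeoutMs: Long): Set[Long] =
    registeredAtMs.collect {
      case (execId, registeredAt)
          if !podsInSnapshot.contains(execId) &&
            snapshotTimeMs - registeredAt > missingPodTimeoutMs =>
        execId
    }.toSet
}
```

The grace period keeps an executor that has only just registered from being reported as lost before its pod shows up in a snapshot.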

Starting

start(
  schedulerBackend: KubernetesClusterSchedulerBackend): Unit

start requests the ExecutorPodsSnapshotsStore to add a subscriber to intercept state changes in executor pods.

start is used when KubernetesClusterSchedulerBackend is started.
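The subscription can be pictured with a toy snapshots store. This is a minimal sketch, assuming a store that buffers snapshots and hands them to subscribers in batches; ToySnapshotsStore, Snapshot, updatePod and notifySubscribers are illustrative stand-ins, not Spark's classes, and the interval is accepted but not scheduled.

```scala
import scala.collection.mutable.ArrayBuffer

object SubscriberSketch {
  // Simplified stand-in for an executor pods snapshot: execId -> pod phase.
  final case class Snapshot(executorPods: Map[Long, String])

  // Toy store (hypothetical): buffers snapshots and delivers them in batches.
  final class ToySnapshotsStore {
    private val subscribers = ArrayBuffer.empty[Seq[Snapshot] => Unit]
    private val pending = ArrayBuffer.empty[Snapshot]

    // The interval only mirrors the shape of the call; this toy does no scheduling.
    def addSubscriber(processBatchIntervalMillis: Long)(
        onNewSnapshots: Seq[Snapshot] => Unit): Unit = {
      subscribers += onNewSnapshots
      ()
    }

    // Records a new snapshot until the next delivery.
    def updatePod(snapshot: Snapshot): Unit = {
      pending += snapshot
      ()
    }

    // Stands in for the periodic tick: delivers the buffered batch, if any.
    def notifySubscribers(): Unit = {
      val batch = pending.toList
      pending.clear()
      if (batch.nonEmpty) subscribers.foreach(subscriber => subscriber(batch))
    }
  }
}
```

With this toy, a callback registered through addSubscriber(1000L) { batch => ... } receives every snapshot buffered since the previous delivery as a single Seq.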

Processing Executor Pods Snapshots

onNewSnapshots(
  schedulerBackend: KubernetesClusterSchedulerBackend,
  snapshots: Seq[ExecutorPodsSnapshot]): Unit

onNewSnapshots creates an empty execIdsRemovedInThisRound collection of executors to be removed.

onNewSnapshots walks over the input ExecutorPodsSnapshots and branches off based on ExecutorPodState:

  • For PodDeleted, onNewSnapshots prints out the following DEBUG message to the logs:

    Snapshot reported deleted executor with id [execId], pod name [state.pod.getMetadata.getName]
    

    onNewSnapshots calls removeExecutorFromSpark and adds the executor ID to the execIdsRemovedInThisRound local collection.

  • For PodFailed, onNewSnapshots prints out the following DEBUG message to the logs:

    Snapshot reported failed executor with id [execId], pod name [state.pod.getMetadata.getName]
    

    onNewSnapshots calls onFinalNonDeletedState with the execIdsRemovedInThisRound local collection.

  • For PodSucceeded, onNewSnapshots asks the input KubernetesClusterSchedulerBackend whether the executor is still active (isExecutorActive). If so, onNewSnapshots prints out the following INFO message to the logs:

    Snapshot reported succeeded executor with id [execId], even though the application has not requested for it to be removed.
    

    Otherwise, onNewSnapshots prints out the following DEBUG message to the logs:

    Snapshot reported succeeded executor with id [execId], pod name [state.pod.getMetadata.getName].
    

    onNewSnapshots calls onFinalNonDeletedState with the execIdsRemovedInThisRound local collection.
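The branching above can be sketched as a pattern match. This is a simplified illustration: the state classes below carry only a pod name (an assumption made to keep the example self-contained), and logLine is a hypothetical helper that returns the log line for each state instead of logging it.

```scala
object SnapshotBranchingSketch {
  // Simplified stand-ins for the executor pod states (assumed shape).
  sealed trait PodState { def podName: String }
  final case class PodRunning(podName: String) extends PodState
  final case class PodDeleted(podName: String) extends PodState
  final case class PodFailed(podName: String) extends PodState
  final case class PodSucceeded(podName: String) extends PodState

  // Hypothetical helper: the level and message described for each final state,
  // or None for states that are not handled here.
  def logLine(execId: Long, state: PodState, isExecutorActive: Boolean): Option[String] =
    state match {
      case PodDeleted(name) =>
        Some(s"DEBUG Snapshot reported deleted executor with id $execId, pod name $name")
      case PodFailed(name) =>
        Some(s"DEBUG Snapshot reported failed executor with id $execId, pod name $name")
      case PodSucceeded(_) if isExecutorActive =>
        Some(s"INFO Snapshot reported succeeded executor with id $execId, " +
          "even though the application has not requested for it to be removed.")
      case PodSucceeded(name) =>
        Some(s"DEBUG Snapshot reported succeeded executor with id $execId, pod name $name.")
      case _ =>
        None
    }
}
```

Using a sealed trait lets the compiler check the match, which suits a fixed set of pod states like this one.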

onFinalNonDeletedState

onFinalNonDeletedState(
  podState: FinalPodState,
  execId: Long,
  schedulerBackend: KubernetesClusterSchedulerBackend,
  deleteFromK8s: Boolean): Boolean

onFinalNonDeletedState calls removeExecutorFromSpark and records the result as the deleted flag, which it returns at the end.

With the given deleteFromK8s flag enabled, onFinalNonDeletedState also calls removeExecutorFromK8s.
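The two steps can be sketched with the removal actions passed in as functions. This is a minimal sketch, not the actual method: the parameter list is reduced, removeFromSpark and removeFromK8s are hypothetical stand-ins, and the Spark-side removal is assumed to report whether it removed anything.

```scala
object FinalStateSketch {
  // Hypothetical, reduced version of the flow described above.
  def onFinalNonDeletedState(execId: Long, deleteFromK8s: Boolean)(
      removeFromSpark: Long => Boolean, // assumed to return true if the executor was removed
      removeFromK8s: Long => Unit): Boolean = {
    // Step 1: always remove the executor on the Spark side and keep the flag.
    val deleted = removeFromSpark(execId)
    // Step 2: delete the pod from Kubernetes only when the flag is enabled.
    if (deleteFromK8s) removeFromK8s(execId)
    deleted
  }
}
```

The returned flag depends only on the Spark-side removal, while the Kubernetes-side deletion is controlled separately by deleteFromK8s.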

removeExecutorFromSpark

removeExecutorFromSpark(
  schedulerBackend: KubernetesClusterSchedulerBackend,
  podState: FinalPodState,
  execId: Long): Unit

removeExecutorFromSpark...FIXME

removeExecutorFromK8s

removeExecutorFromK8s(
  execId: Long,
  updatedPod: Pod): Unit

removeExecutorFromK8s...FIXME

Logging

Enable the ALL logging level for the org.apache.spark.scheduler.cluster.k8s.ExecutorPodsLifecycleManager logger to see what happens inside.

Add the following line to conf/log4j.properties:

log4j.logger.org.apache.spark.scheduler.cluster.k8s.ExecutorPodsLifecycleManager=ALL

Refer to Logging.
