ExecutorPodsLifecycleManager

Creating Instance

ExecutorPodsLifecycleManager takes the following to be created:

ExecutorPodsLifecycleManager is created when KubernetesClusterManager is requested for a SchedulerBackend (and creates a KubernetesClusterSchedulerBackend).

Configuration Properties

spark.kubernetes.executor.eventProcessingInterval

ExecutorPodsLifecycleManager uses the spark.kubernetes.executor.eventProcessingInterval configuration property when started to register a new subscriber, as the interval between consecutive rounds of processing executor pod snapshots.

spark.kubernetes.executor.deleteOnTermination

ExecutorPodsLifecycleManager uses the spark.kubernetes.executor.deleteOnTermination configuration property in onFinalNonDeletedState (to decide whether to delete the executor pod from Kubernetes).
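
Both properties can be set at submission time. A hypothetical spark-submit invocation (the master URL and the values shown are purely illustrative) that processes pod events every second and keeps terminated executor pods around for inspection:

```
./bin/spark-submit \
  --master k8s://https://<k8s-apiserver>:6443 \
  --deploy-mode cluster \
  --conf spark.kubernetes.executor.eventProcessingInterval=1s \
  --conf spark.kubernetes.executor.deleteOnTermination=false \
  ...
```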

Starting

start(
  schedulerBackend: KubernetesClusterSchedulerBackend): Unit

start requests the ExecutorPodsSnapshotsStore to add a subscriber (the onNewSnapshots handler) to intercept state changes in executor pods.

start is used when KubernetesClusterSchedulerBackend is started.
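
The subscription mechanism can be sketched independently of Spark. The types below are hypothetical, heavily simplified stand-ins for ExecutorPodsSnapshot and the snapshots store (the real types wrap Kubernetes Pod objects and run subscribers on a scheduled executor):

```scala
import scala.collection.mutable

// Simplified snapshot: executor id -> pod phase (a sketch, not Spark's type)
final case class ExecutorPodsSnapshot(executorPods: Map[Long, String])

final class SnapshotsStoreSketch {
  private val subscribers =
    mutable.Buffer.empty[Seq[ExecutorPodsSnapshot] => Unit]
  private val pending = mutable.Buffer.empty[ExecutorPodsSnapshot]

  // start registers onNewSnapshots here, together with the processing
  // interval (spark.kubernetes.executor.eventProcessingInterval)
  def addSubscriber(processingIntervalMillis: Long)(
      handler: Seq[ExecutorPodsSnapshot] => Unit): Unit =
    subscribers += handler

  def updatePod(execId: Long, phase: String): Unit =
    pending += ExecutorPodsSnapshot(Map(execId -> phase))

  // The real store notifies subscribers every processing interval;
  // the sketch triggers a round manually
  def processRound(): Unit = {
    val batch = pending.toList
    pending.clear()
    subscribers.foreach(_(batch))
  }
}
```

Every round, a subscriber receives the batch of snapshots accumulated since the previous round, which is what onNewSnapshots iterates over.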

Handling State Changes in Executor Pods

onNewSnapshots(
  schedulerBackend: KubernetesClusterSchedulerBackend,
  snapshots: Seq[ExecutorPodsSnapshot]): Unit

onNewSnapshots creates an empty execIdsRemovedInThisRound collection of executors to be removed.

onNewSnapshots walks over the input ExecutorPodsSnapshots and branches off based on the ExecutorPodState of every executor pod:

  • For PodDeleted, onNewSnapshots prints out the following DEBUG message to the logs:

    Snapshot reported deleted executor with id [execId], pod name [state.pod.getMetadata.getName]
    

onNewSnapshots calls removeExecutorFromSpark and adds the executor ID to the execIdsRemovedInThisRound local collection.

  • For PodFailed, onNewSnapshots prints out the following DEBUG message to the logs:

    Snapshot reported failed executor with id [execId], pod name [state.pod.getMetadata.getName]
    

onNewSnapshots calls onFinalNonDeletedState with the execIdsRemovedInThisRound local collection.

  • For PodSucceeded, onNewSnapshots checks with the input KubernetesClusterSchedulerBackend whether the executor is still active (isExecutorActive). If it is, onNewSnapshots prints out the following INFO message to the logs:

    Snapshot reported succeeded executor with id [execId], even though the application has not requested for it to be removed.
    

    Otherwise, onNewSnapshots prints out the following DEBUG message to the logs:

    Snapshot reported succeeded executor with id [execId], pod name [state.pod.getMetadata.getName].
    

    onNewSnapshots calls onFinalNonDeletedState with the execIdsRemovedInThisRound local collection.
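
The branching above can be sketched as a pattern match over a simplified state hierarchy. Everything here is hypothetical: Spark's ExecutorPodState subclasses carry the full Kubernetes Pod, and removeExecutorFromSpark and the log statements are elided into comments:

```scala
// Hypothetical, simplified pod states (a sketch, not Spark's types)
sealed trait ExecutorPodState { def execId: Long }
final case class PodDeleted(execId: Long) extends ExecutorPodState
final case class PodFailed(execId: Long) extends ExecutorPodState
final case class PodSucceeded(execId: Long) extends ExecutorPodState
final case class PodRunning(execId: Long) extends ExecutorPodState

// Returns the executor ids removed in this round
// (the execIdsRemovedInThisRound collection of onNewSnapshots)
def processSnapshot(
    states: Seq[ExecutorPodState],
    isExecutorActive: Long => Boolean,
    onFinalNonDeletedState: Long => Unit): Set[Long] = {
  val execIdsRemovedInThisRound = scala.collection.mutable.HashSet.empty[Long]
  states.foreach {
    case PodDeleted(execId) =>
      // DEBUG "Snapshot reported deleted executor..."; removeExecutorFromSpark
      execIdsRemovedInThisRound += execId
    case PodFailed(execId) =>
      // DEBUG "Snapshot reported failed executor..."
      onFinalNonDeletedState(execId)
      execIdsRemovedInThisRound += execId
    case PodSucceeded(execId) =>
      if (isExecutorActive(execId)) {
        // INFO: executor succeeded though its removal was never requested
      }
      onFinalNonDeletedState(execId)
      execIdsRemovedInThisRound += execId
    case _ =>
      // non-final states (e.g. PodRunning) need no handling here
  }
  execIdsRemovedInThisRound.toSet
}
```

Note how only PodFailed and PodSucceeded go through the final-state handling, while an already-deleted pod merely has its executor recorded as removed.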

onFinalNonDeletedState

onFinalNonDeletedState(
  podState: FinalPodState,
  execId: Long,
  schedulerBackend: KubernetesClusterSchedulerBackend,
  execIdsRemovedInRound: mutable.Set[Long]): Unit

onFinalNonDeletedState calls removeExecutorFromSpark.

With the spark.kubernetes.executor.deleteOnTermination configuration property enabled, onFinalNonDeletedState calls removeExecutorFromK8s (to delete the executor pod from the Kubernetes cluster).

In the end, onFinalNonDeletedState adds the given execId to the given execIdsRemovedInRound collection.
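
The ordering of the three steps can be sketched as follows, with the scheduler- and Kubernetes-side removals abstracted into hypothetical callbacks (a sketch, not Spark's implementation):

```scala
import scala.collection.mutable

// Sketch of onFinalNonDeletedState: remove the executor from Spark first,
// then (only with spark.kubernetes.executor.deleteOnTermination enabled)
// delete the pod from Kubernetes, and finally record the executor id
def onFinalNonDeletedState(
    execId: Long,
    deleteOnTermination: Boolean,
    removeExecutorFromSpark: Long => Unit,
    removeExecutorFromK8s: Long => Unit,
    execIdsRemovedInRound: mutable.Set[Long]): Unit = {
  removeExecutorFromSpark(execId)
  if (deleteOnTermination) {
    removeExecutorFromK8s(execId)
  }
  execIdsRemovedInRound += execId
}
```

Disabling deleteOnTermination skips only the Kubernetes-side deletion; the executor is unregistered from Spark and counted as removed either way.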

removeExecutorFromSpark

removeExecutorFromSpark(
  schedulerBackend: KubernetesClusterSchedulerBackend,
  podState: FinalPodState,
  execId: Long): Unit

removeExecutorFromSpark...FIXME

Logging

Enable ALL logging level for org.apache.spark.scheduler.cluster.k8s.ExecutorPodsLifecycleManager logger to see what happens inside.

Add the following line to conf/log4j.properties:

log4j.logger.org.apache.spark.scheduler.cluster.k8s.ExecutorPodsLifecycleManager=ALL

Refer to Logging.


Last update: 2021-01-04