Do not delete pods in a non-graceful manner

This switches from force-deleting pods to deleting them gracefully
via the API server: the controller now waits for a pod to reach a
terminal status before removing it from the API server.
Sargun Dhillon
2019-10-10 13:24:58 -07:00
parent 871424368f
commit d22265e5f5
5 changed files with 219 additions and 136 deletions
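
For context, a minimal client-go sketch (not part of this commit; the helper names are assumptions, and it uses the modern context-taking client-go Delete signature) contrasting the graceful delete this change adopts with the force delete it moves away from:

package example

import (
	"context"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
)

// gracefulDelete asks the API server to delete the pod with its default
// grace period; the Pod object remains visible until it reaches a
// terminal state.
func gracefulDelete(ctx context.Context, client kubernetes.Interface, ns, name string) error {
	return client.CoreV1().Pods(ns).Delete(ctx, name, metav1.DeleteOptions{})
}

// forceDelete removes the Pod object immediately, without waiting for its
// containers to terminate. After this commit, the controller only does
// this once the pod is already in a terminal state or its grace period
// has expired.
func forceDelete(ctx context.Context, client kubernetes.Interface, ns, name string) error {
	grace := int64(0)
	return client.CoreV1().Pods(ns).Delete(ctx, name, metav1.DeleteOptions{
		GracePeriodSeconds: &grace,
	})
}

With a graceful delete, the Pod object stays visible in the API server until its containers terminate, which is exactly the window this commit uses to reconcile state with the provider.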


@@ -19,6 +19,7 @@ import (
"fmt"
"strconv"
"sync"
"time"
"github.com/google/go-cmp/cmp"
pkgerrors "github.com/pkg/errors"
@@ -51,7 +52,7 @@ type PodLifecycleHandler interface {
// DeletePod takes a Kubernetes Pod and deletes it from the provider. Once a pod is deleted, the provider is
// expected to call the NotifyPods callback with a terminal pod status where all the containers are in a terminal
// state, as well as the pod.
// state, as well as the pod. DeletePod may be called multiple times for the same pod.
DeletePod(ctx context.Context, pod *corev1.Pod) error
// GetPod retrieves a pod by name from the provider (can be cached).
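
Since DeletePod may now be called multiple times for the same pod, provider implementations should be idempotent. A hypothetical sketch (the in-memory mockProvider is an assumption, not virtual-kubelet code):

package example

import (
	"context"
	"sync"

	"github.com/virtual-kubelet/virtual-kubelet/errdefs"
	corev1 "k8s.io/api/core/v1"
)

// mockProvider is a hypothetical provider backed by an in-memory map.
type mockProvider struct {
	mu   sync.Mutex
	pods map[string]*corev1.Pod
}

func (p *mockProvider) DeletePod(ctx context.Context, pod *corev1.Pod) error {
	key := pod.Namespace + "/" + pod.Name
	p.mu.Lock()
	defer p.mu.Unlock()
	if _, ok := p.pods[key]; !ok {
		// Repeat calls land here, making deletion idempotent; the
		// controller treats NotFound as success.
		return errdefs.NotFoundf("pod %q not found", key)
	}
	delete(p.pods, key)
	// A real provider would follow up by calling NotifyPods with a
	// terminal status for this pod.
	return nil
}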
@@ -101,6 +102,9 @@ type PodController struct {
k8sQ workqueue.RateLimitingInterface
// deletionQ is a queue on which pod deletions are reconciled: after a pod's grace period has elapsed, we check whether the pod is still present in the API server
deletionQ workqueue.RateLimitingInterface
// From the time of creation to termination, the knownPods map contains the pod's key
// (derived from Kubernetes' cache library) -> a *knownPod struct.
knownPods sync.Map
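
deletionQ is a standard client-go rate-limiting workqueue; its AddAfter method is what lets the controller re-check a pod only after its deletion grace period has elapsed, rather than polling. A minimal sketch of that pattern (the key and grace period are made up):

package example

import (
	"fmt"
	"time"

	"k8s.io/client-go/util/workqueue"
)

func demoDeletionQueue() {
	// Same construction the commit uses for deletionQ.
	q := workqueue.NewNamedRateLimitingQueue(
		workqueue.DefaultControllerRateLimiter(), "deletePodsFromKubernetes")
	defer q.ShutDown()

	// Instead of polling, re-enqueue the pod's key once its grace period
	// has elapsed; a worker then checks the API server.
	q.AddAfter("default/mypod", 30*time.Second)

	// Get blocks until the delay expires (or the queue shuts down).
	key, shutdown := q.Get()
	if shutdown {
		return
	}
	defer q.Done(key)
	fmt.Println("reconciling deletion of", key)
	q.Forget(key)
}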
@@ -127,6 +131,7 @@ type knownPod struct {
// should be immutable to avoid having to hold the lock the entire time you're working with them
sync.Mutex
lastPodStatusReceivedFromProvider *corev1.Pod
lastPodUsed *corev1.Pod
}
// PodControllerConfig is used to configure a new PodController.
@@ -190,6 +195,7 @@ func NewPodController(cfg PodControllerConfig) (*PodController, error) {
done: make(chan struct{}),
recorder: cfg.EventRecorder,
k8sQ: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "syncPodsFromKubernetes"),
deletionQ: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "deletePodsFromKubernetes"),
}
return pc, nil
@@ -207,7 +213,7 @@ func (pc *PodController) Run(ctx context.Context, podSyncWorkers int) (retErr er
defer func() {
pc.k8sQ.ShutDown()
pc.deletionQ.ShutDown()
pc.mu.Lock()
pc.err = retErr
close(pc.done)
@@ -241,13 +247,8 @@ func (pc *PodController) Run(ctx context.Context, podSyncWorkers int) (retErr er
},
UpdateFunc: func(oldObj, newObj interface{}) {
// Create a copy of the old and new pod objects so we don't mutate the cache.
oldPod := oldObj.(*corev1.Pod)
newPod := newObj.(*corev1.Pod)
// Skip adding this pod's key to the work queue if its .metadata (except .metadata.resourceVersion) and .spec fields haven't changed.
// This guarantees that we don't attempt to sync the pod every time its .status field is updated.
if podsEffectivelyEqual(oldPod, newPod) {
return
}
// At this point we know that something in .metadata or .spec has changed, so we must proceed to sync the pod.
if key, err := cache.MetaNamespaceKeyFunc(newPod); err != nil {
log.G(ctx).Error(err)
@@ -261,6 +262,8 @@ func (pc *PodController) Run(ctx context.Context, podSyncWorkers int) (retErr er
} else {
pc.knownPods.Delete(key)
pc.k8sQ.AddRateLimited(key)
// If this pod was in the deletion queue, forget about it
pc.deletionQ.Forget(key)
}
},
})
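
The handlers above enqueue pods by their informer cache key. For reference, a small sketch of how such keys are produced and later split (the pod is hypothetical):

package example

import (
	"fmt"

	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/tools/cache"
)

func demoKeys() {
	pod := &corev1.Pod{ObjectMeta: metav1.ObjectMeta{Namespace: "default", Name: "mypod"}}

	// The event handlers enqueue this "namespace/name" key...
	key, _ := cache.MetaNamespaceKeyFunc(pod)
	fmt.Println(key) // default/mypod

	// ...and syncHandler later splits it back apart.
	ns, name, _ := cache.SplitMetaNamespaceKey(key)
	fmt.Println(ns, name) // default mypod
}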
@@ -292,6 +295,15 @@ func (pc *PodController) Run(ctx context.Context, podSyncWorkers int) (retErr er
}()
}
for id := 0; id < podSyncWorkers; id++ {
wg.Add(1)
workerID := strconv.Itoa(id)
go func() {
defer wg.Done()
pc.runDeletionReconciliationWorker(ctx, workerID, pc.deletionQ)
}()
}
close(pc.ready)
log.G(ctx).Info("started workers")
@@ -299,6 +311,7 @@ func (pc *PodController) Run(ctx context.Context, podSyncWorkers int) (retErr er
log.G(ctx).Info("shutting down workers")
pc.k8sQ.ShutDown()
podStatusQueue.ShutDown()
pc.deletionQ.ShutDown()
wg.Wait()
return nil
@@ -350,6 +363,7 @@ func (pc *PodController) syncHandler(ctx context.Context, key string) error {
// Add the current key as an attribute to the current span.
ctx = span.WithField(ctx, "key", key)
log.G(ctx).WithField("key", key).Debug("sync handled")
// Convert the namespace/name string into a distinct namespace and name.
namespace, name, err := cache.SplitMetaNamespaceKey(key)
@@ -369,35 +383,81 @@ func (pc *PodController) syncHandler(ctx context.Context, key string) error {
span.SetStatus(err)
return err
}
// At this point we know the Pod resource doesn't exist, which most probably means it was deleted.
// Hence, we must delete it from the provider if it still exists there.
if err := pc.deletePod(ctx, namespace, name); err != nil {
err := pkgerrors.Wrapf(err, "failed to delete pod %q in the provider", loggablePodNameFromCoordinates(namespace, name))
pod, err = pc.provider.GetPod(ctx, namespace, name)
if err != nil && !errdefs.IsNotFound(err) {
err = pkgerrors.Wrapf(err, "failed to fetch pod with key %q from provider", key)
span.SetStatus(err)
return err
}
return nil
if errdefs.IsNotFound(err) || pod == nil {
return nil
}
err = pc.provider.DeletePod(ctx, pod)
if errdefs.IsNotFound(err) {
return nil
}
if err != nil {
err = pkgerrors.Wrapf(err, "failed to delete pod %q in the provider", loggablePodNameFromCoordinates(namespace, name))
span.SetStatus(err)
}
return err
}
// At this point we know the Pod resource has either been created or updated (which includes being marked for deletion).
return pc.syncPodInProvider(ctx, pod)
return pc.syncPodInProvider(ctx, pod, key)
}
// syncPodInProvider attempts to reconcile the state of a pod by comparing its Kubernetes representation with the provider's representation.
func (pc *PodController) syncPodInProvider(ctx context.Context, pod *corev1.Pod) error {
func (pc *PodController) syncPodInProvider(ctx context.Context, pod *corev1.Pod, key string) (retErr error) {
ctx, span := trace.StartSpan(ctx, "syncPodInProvider")
defer span.End()
// Add the pod's attributes to the current span.
ctx = addPodAttributes(ctx, span, pod)
// If the pod's containers are no longer in a running state, we force-delete the pod from the API server;
// more context is here: https://github.com/virtual-kubelet/virtual-kubelet/pull/760
if pod.DeletionTimestamp != nil && !running(&pod.Status) {
log.G(ctx).Debug("Force deleting pod from API Server as it is no longer running")
pc.deletionQ.Add(key)
return nil
}
obj, ok := pc.knownPods.Load(key)
if !ok {
// That means the pod was deleted while we were working
return nil
}
kPod := obj.(*knownPod)
kPod.Lock()
if kPod.lastPodUsed != nil && podsEffectivelyEqual(kPod.lastPodUsed, pod) {
kPod.Unlock()
return nil
}
kPod.Unlock()
defer func() {
if retErr == nil {
kPod.Lock()
defer kPod.Unlock()
kPod.lastPodUsed = pod
}
}()
// Check whether the pod has been marked for deletion.
// If it has, ensure it is deleted in both the provider and Kubernetes.
if pod.DeletionTimestamp != nil {
if err := pc.deletePod(ctx, pod.Namespace, pod.Name); err != nil {
log.G(ctx).Debug("Deleting pod in provider")
if err := pc.deletePod(ctx, pod); errdefs.IsNotFound(err) {
log.G(ctx).Debug("Pod not found in provider")
} else if err != nil {
err := pkgerrors.Wrapf(err, "failed to delete pod %q in the provider", loggablePodName(pod))
span.SetStatus(err)
return err
}
pc.deletionQ.AddAfter(key, time.Second*time.Duration(*pod.DeletionGracePeriodSeconds))
return nil
}
@@ -416,6 +476,25 @@ func (pc *PodController) syncPodInProvider(ctx context.Context, pod *corev1.Pod)
return nil
}
// runDeletionReconciliationWorker is a long-running function that continually calls processDeletionReconciliationWorkItem
// in order to read and process items on the pod deletion work queue.
func (pc *PodController) runDeletionReconciliationWorker(ctx context.Context, workerID string, q workqueue.RateLimitingInterface) {
for pc.processDeletionReconciliationWorkItem(ctx, workerID, q) {
}
}
// processDeletionReconciliationWorkItem reads a single work item off the work queue and attempts to process it by calling the deletion reconciliation handler.
func (pc *PodController) processDeletionReconciliationWorkItem(ctx context.Context, workerID string, q workqueue.RateLimitingInterface) bool {
// We create a span only after popping from the queue so that we can get an adequate picture of how long it took to process the item.
ctx, span := trace.StartSpan(ctx, "processDeletionReconciliationWorkItem")
defer span.End()
// Add the ID of the current worker as an attribute to the current span.
ctx = span.WithField(ctx, "workerId", workerID)
return handleQueueItem(ctx, q, pc.deletePodHandler)
}
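
handleQueueItem is virtual-kubelet's shared queue-draining helper and is not shown in this diff. For reference, a sketch of the conventional client-go worker loop it follows (this is the general pattern, not the vendored implementation):

package example

import "k8s.io/client-go/util/workqueue"

// processNextItem sketches the conventional worker loop: pop an item, run
// the handler, then either Forget it (success) or AddRateLimited it
// (retry with backoff).
func processNextItem(q workqueue.RateLimitingInterface, handler func(key string) error) bool {
	item, shutdown := q.Get()
	if shutdown {
		return false
	}
	// Done must always be called so the queue knows processing finished.
	defer q.Done(item)

	key, ok := item.(string)
	if !ok {
		// A non-string item can never be processed; drop it.
		q.Forget(item)
		return true
	}
	if err := handler(key); err != nil {
		// Requeue, subject to the queue's rate limiter.
		q.AddRateLimited(key)
		return true
	}
	q.Forget(key)
	return true
}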
// deleteDanglingPods checks whether the provider knows about any pods which Kubernetes doesn't know about, and deletes them.
func (pc *PodController) deleteDanglingPods(ctx context.Context, threadiness int) {
ctx, span := trace.StartSpan(ctx, "deleteDanglingPods")
@@ -471,7 +550,7 @@ func (pc *PodController) deleteDanglingPods(ctx context.Context, threadiness int
// Add the pod's attributes to the current span.
ctx = addPodAttributes(ctx, span, pod)
// Actually delete the pod.
if err := pc.deletePod(ctx, pod.Namespace, pod.Name); err != nil {
if err := pc.provider.DeletePod(ctx, pod.DeepCopy()); err != nil && !errdefs.IsNotFound(err) {
span.SetStatus(err)
log.G(ctx).Errorf("failed to delete pod %q in provider", loggablePodName(pod))
} else {
@@ -515,3 +594,16 @@ func podsEffectivelyEqual(p1, p2 *corev1.Pod) bool {
return cmp.Equal(p1, p2, cmp.FilterPath(filterForResourceVersion, cmp.Ignore()))
}
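
filterForResourceVersion is not shown in this diff. Consistent with the comment on UpdateFunc above (ignore .metadata.resourceVersion and .status when comparing pods), a hypothetical reconstruction might look like:

package example

import (
	"github.com/google/go-cmp/cmp"
	corev1 "k8s.io/api/core/v1"
)

// filterForResourceVersion (hypothetical reconstruction) reports true for
// paths to ignore: the resource version and the entire status subtree.
func filterForResourceVersion(p cmp.Path) bool {
	s := p.String()
	return s == "ObjectMeta.ResourceVersion" || s == "Status"
}

func podsEffectivelyEqual(p1, p2 *corev1.Pod) bool {
	return cmp.Equal(p1, p2, cmp.FilterPath(filterForResourceVersion, cmp.Ignore()))
}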
// borrowed from https://github.com/kubernetes/kubernetes/blob/f64c631cd7aea58d2552ae2038c1225067d30dde/pkg/kubelet/kubelet_pods.go#L944-L953
// running returns true unless every container status is terminated or
// waiting, or the status list is empty.
func running(podStatus *corev1.PodStatus) bool {
statuses := podStatus.ContainerStatuses
for _, status := range statuses {
if status.State.Terminated == nil && status.State.Waiting == nil {
return true
}
}
return false
}
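
To make the semantics of running concrete, a short illustration that calls the helper above with made-up statuses:

package example

import (
	"fmt"

	corev1 "k8s.io/api/core/v1"
)

func demoRunning() {
	// One container still running: the pod counts as running.
	live := &corev1.PodStatus{ContainerStatuses: []corev1.ContainerStatus{
		{State: corev1.ContainerState{Running: &corev1.ContainerStateRunning{}}},
		{State: corev1.ContainerState{Terminated: &corev1.ContainerStateTerminated{}}},
	}}
	fmt.Println(running(live)) // true

	// Every container terminated: the pod is eligible for force deletion.
	done := &corev1.PodStatus{ContainerStatuses: []corev1.ContainerStatus{
		{State: corev1.ContainerState{Terminated: &corev1.ContainerStateTerminated{}}},
	}}
	fmt.Println(running(done)) // false

	// An empty status list also counts as not running.
	fmt.Println(running(&corev1.PodStatus{})) // false
}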