Have NotifyPods store the pod status in a map (#751)
We introduce a map that can be used to store the pod status. In this, we do not need to call GetPodStatus immediately after NotifyPods is called. Instead, we stash the pod passed via notifypods as in a map we can access later. In addition to this, for legacy providers, the logic to merge the pod, and the pod status is hoisted up to the loop. It prevents leaks by deleting the entry in the map as soon as the pod is deleted from k8s.
This commit is contained in:
128
node/pod.go
128
node/pod.go
@@ -209,15 +209,15 @@ func (pc *PodController) forceDeletePodResource(ctx context.Context, namespace,
|
||||
return nil
|
||||
}
|
||||
|
||||
// updatePodStatuses syncs the providers pod status with the kubernetes pod status.
|
||||
func (pc *PodController) updatePodStatuses(ctx context.Context, q workqueue.RateLimitingInterface) {
|
||||
ctx, span := trace.StartSpan(ctx, "updatePodStatuses")
|
||||
// fetchPodStatusesFromProvider syncs the providers pod status with the kubernetes pod status.
|
||||
func (pc *PodController) fetchPodStatusesFromProvider(ctx context.Context, q workqueue.RateLimitingInterface) {
|
||||
ctx, span := trace.StartSpan(ctx, "fetchPodStatusesFromProvider")
|
||||
defer span.End()
|
||||
|
||||
// Update all the pods with the provider status.
|
||||
pods, err := pc.podsLister.List(labels.Everything())
|
||||
if err != nil {
|
||||
err = pkgerrors.Wrap(err, "error getting pod list")
|
||||
err = pkgerrors.Wrap(err, "error getting pod list from kubernetes")
|
||||
span.SetStatus(err)
|
||||
log.G(ctx).WithError(err).Error("Error updating pod statuses")
|
||||
return
|
||||
@@ -226,78 +226,114 @@ func (pc *PodController) updatePodStatuses(ctx context.Context, q workqueue.Rate
|
||||
|
||||
for _, pod := range pods {
|
||||
if !shouldSkipPodStatusUpdate(pod) {
|
||||
enqueuePodStatusUpdate(ctx, q, pod)
|
||||
enrichedPod, err := pc.fetchPodStatusFromProvider(ctx, q, pod)
|
||||
if err != nil {
|
||||
log.G(ctx).WithFields(map[string]interface{}{
|
||||
"name": pod.Name,
|
||||
"namespace": pod.Namespace,
|
||||
}).WithError(err).Error("Could not fetch pod status")
|
||||
} else if enrichedPod != nil {
|
||||
pc.enqueuePodStatusUpdate(ctx, q, enrichedPod)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// fetchPodStatusFromProvider returns a pod (the pod we pass in) enriched with the pod status from the provider. If the pod is not found,
|
||||
// and it has been 1 minute since the pod was created, or the pod was previously running, it will be marked as failed.
|
||||
// If a valid pod status cannot be generated, for example, if a pod is not found in the provider, and it has been less than 1 minute
|
||||
// since pod creation, we will return nil for the pod.
|
||||
func (pc *PodController) fetchPodStatusFromProvider(ctx context.Context, q workqueue.RateLimitingInterface, podFromKubernetes *corev1.Pod) (*corev1.Pod, error) {
|
||||
podStatus, err := pc.provider.GetPodStatus(ctx, podFromKubernetes.Namespace, podFromKubernetes.Name)
|
||||
if errdefs.IsNotFound(err) || (err == nil && podStatus == nil) {
|
||||
// Only change the status when the pod was already up
|
||||
// Only doing so when the pod was successfully running makes sure we don't run into race conditions during pod creation.
|
||||
if podFromKubernetes.Status.Phase == corev1.PodRunning || time.Since(podFromKubernetes.ObjectMeta.CreationTimestamp.Time) > time.Minute {
|
||||
// Set the pod to failed, this makes sure if the underlying container implementation is gone that a new pod will be created.
|
||||
podStatus = podFromKubernetes.Status.DeepCopy()
|
||||
podStatus.Phase = corev1.PodFailed
|
||||
podStatus.Reason = "NotFound"
|
||||
podStatus.Message = "The pod status was not found and may have been deleted from the provider"
|
||||
now := metav1.NewTime(time.Now())
|
||||
for i, c := range podStatus.ContainerStatuses {
|
||||
if c.State.Running == nil {
|
||||
continue
|
||||
}
|
||||
podStatus.ContainerStatuses[i].State.Terminated = &corev1.ContainerStateTerminated{
|
||||
ExitCode: -137,
|
||||
Reason: "NotFound",
|
||||
Message: "Container was not found and was likely deleted",
|
||||
FinishedAt: now,
|
||||
StartedAt: c.State.Running.StartedAt,
|
||||
ContainerID: c.ContainerID,
|
||||
}
|
||||
podStatus.ContainerStatuses[i].State.Running = nil
|
||||
}
|
||||
} else if err != nil {
|
||||
return nil, nil
|
||||
}
|
||||
} else if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
pod := podFromKubernetes.DeepCopy()
|
||||
podStatus.DeepCopyInto(&pod.Status)
|
||||
return pod, nil
|
||||
}
|
||||
|
||||
func shouldSkipPodStatusUpdate(pod *corev1.Pod) bool {
|
||||
return pod.Status.Phase == corev1.PodSucceeded ||
|
||||
pod.Status.Phase == corev1.PodFailed ||
|
||||
pod.Status.Reason == podStatusReasonProviderFailed
|
||||
}
|
||||
|
||||
func (pc *PodController) updatePodStatus(ctx context.Context, pod *corev1.Pod) error {
|
||||
if shouldSkipPodStatusUpdate(pod) {
|
||||
func (pc *PodController) updatePodStatus(ctx context.Context, podFromKubernetes *corev1.Pod, key string) error {
|
||||
if shouldSkipPodStatusUpdate(podFromKubernetes) {
|
||||
return nil
|
||||
}
|
||||
|
||||
ctx, span := trace.StartSpan(ctx, "updatePodStatus")
|
||||
defer span.End()
|
||||
ctx = addPodAttributes(ctx, span, pod)
|
||||
ctx = addPodAttributes(ctx, span, podFromKubernetes)
|
||||
|
||||
status, err := pc.provider.GetPodStatus(ctx, pod.Namespace, pod.Name)
|
||||
if err != nil && !errdefs.IsNotFound(err) {
|
||||
span.SetStatus(err)
|
||||
return pkgerrors.Wrap(err, "error retrieving pod status")
|
||||
obj, ok := pc.knownPods.Load(key)
|
||||
if !ok {
|
||||
// This means there was a race and the pod has been deleted from K8s
|
||||
return nil
|
||||
}
|
||||
kPod := obj.(*knownPod)
|
||||
kPod.Lock()
|
||||
podFromProvider := kPod.lastPodStatusReceivedFromProvider
|
||||
kPod.Unlock()
|
||||
|
||||
// Do not modify the pod that we got from the cache
|
||||
pod = pod.DeepCopy()
|
||||
|
||||
// Update the pod's status
|
||||
if status != nil {
|
||||
pod.Status = *status
|
||||
} else {
|
||||
// Only change the status when the pod was already up
|
||||
// Only doing so when the pod was successfully running makes sure we don't run into race conditions during pod creation.
|
||||
if pod.Status.Phase == corev1.PodRunning || pod.ObjectMeta.CreationTimestamp.Add(time.Minute).Before(time.Now()) {
|
||||
// Set the pod to failed, this makes sure if the underlying container implementation is gone that a new pod will be created.
|
||||
pod.Status.Phase = corev1.PodFailed
|
||||
pod.Status.Reason = "NotFound"
|
||||
pod.Status.Message = "The pod status was not found and may have been deleted from the provider"
|
||||
for i, c := range pod.Status.ContainerStatuses {
|
||||
pod.Status.ContainerStatuses[i].State.Terminated = &corev1.ContainerStateTerminated{
|
||||
ExitCode: -137,
|
||||
Reason: "NotFound",
|
||||
Message: "Container was not found and was likely deleted",
|
||||
FinishedAt: metav1.NewTime(time.Now()),
|
||||
StartedAt: c.State.Running.StartedAt,
|
||||
ContainerID: c.ContainerID,
|
||||
}
|
||||
pod.Status.ContainerStatuses[i].State.Running = nil
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if _, err := pc.client.Pods(pod.Namespace).UpdateStatus(pod); err != nil {
|
||||
if _, err := pc.client.Pods(podFromKubernetes.Namespace).UpdateStatus(podFromProvider); err != nil {
|
||||
span.SetStatus(err)
|
||||
return pkgerrors.Wrap(err, "error while updating pod status in kubernetes")
|
||||
}
|
||||
|
||||
log.G(ctx).WithFields(log.Fields{
|
||||
"new phase": string(pod.Status.Phase),
|
||||
"new reason": pod.Status.Reason,
|
||||
"new phase": string(podFromProvider.Status.Phase),
|
||||
"new reason": podFromProvider.Status.Reason,
|
||||
"old phase": string(podFromKubernetes.Status.Phase),
|
||||
"old reason": podFromKubernetes.Status.Reason,
|
||||
}).Debug("Updated pod status in kubernetes")
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func enqueuePodStatusUpdate(ctx context.Context, q workqueue.RateLimitingInterface, pod *corev1.Pod) {
|
||||
// enqueuePodStatusUpdate updates our pod status map, and marks the pod as dirty in the workqueue. The pod must be DeepCopy'd
|
||||
// prior to enqueuePodStatusUpdate.
|
||||
func (pc *PodController) enqueuePodStatusUpdate(ctx context.Context, q workqueue.RateLimitingInterface, pod *corev1.Pod) {
|
||||
if key, err := cache.MetaNamespaceKeyFunc(pod); err != nil {
|
||||
log.G(ctx).WithError(err).WithField("method", "enqueuePodStatusUpdate").Error("Error getting pod meta namespace key")
|
||||
} else {
|
||||
q.AddRateLimited(key)
|
||||
if obj, ok := pc.knownPods.Load(key); ok {
|
||||
kpod := obj.(*knownPod)
|
||||
kpod.Lock()
|
||||
kpod.lastPodStatusReceivedFromProvider = pod
|
||||
kpod.Unlock()
|
||||
q.AddRateLimited(key)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -328,5 +364,5 @@ func (pc *PodController) podStatusHandler(ctx context.Context, key string) (retE
|
||||
return pkgerrors.Wrap(err, "error looking up pod")
|
||||
}
|
||||
|
||||
return pc.updatePodStatus(ctx, pod)
|
||||
return pc.updatePodStatus(ctx, pod, key)
|
||||
}
|
||||
|
||||
@@ -109,6 +109,17 @@ type PodController struct {
|
||||
resourceManager *manager.ResourceManager
|
||||
|
||||
k8sQ workqueue.RateLimitingInterface
|
||||
|
||||
// From the time of creation, to termination the knownPods map will contain the pods key
|
||||
// (derived from Kubernetes' cache library) -> a *knownPod struct.
|
||||
knownPods sync.Map
|
||||
}
|
||||
|
||||
type knownPod struct {
|
||||
// You cannot read (or modify) the fields in this struct without taking the lock. The individual fields
|
||||
// should be immutable to avoid having to hold the lock the entire time you're working with them
|
||||
sync.Mutex
|
||||
lastPodStatusReceivedFromProvider *corev1.Pod
|
||||
}
|
||||
|
||||
// PodControllerConfig is used to configure a new PodController.
|
||||
@@ -199,6 +210,7 @@ func (pc *PodController) Run(ctx context.Context, podSyncWorkers int) error {
|
||||
if key, err := cache.MetaNamespaceKeyFunc(pod); err != nil {
|
||||
log.G(ctx).Error(err)
|
||||
} else {
|
||||
pc.knownPods.Store(key, &knownPod{})
|
||||
pc.k8sQ.AddRateLimited(key)
|
||||
}
|
||||
},
|
||||
@@ -225,6 +237,7 @@ func (pc *PodController) Run(ctx context.Context, podSyncWorkers int) error {
|
||||
if key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(pod); err != nil {
|
||||
log.G(ctx).Error(err)
|
||||
} else {
|
||||
pc.knownPods.Delete(key)
|
||||
pc.k8sQ.AddRateLimited(key)
|
||||
}
|
||||
},
|
||||
|
||||
@@ -134,7 +134,7 @@ func (pc *PodController) providerSyncLoop(ctx context.Context, q workqueue.RateL
|
||||
t.Stop()
|
||||
|
||||
ctx, span := trace.StartSpan(ctx, "syncActualState")
|
||||
pc.updatePodStatuses(ctx, q)
|
||||
pc.fetchPodStatusesFromProvider(ctx, q)
|
||||
span.End()
|
||||
|
||||
// restart the timer
|
||||
@@ -146,7 +146,7 @@ func (pc *PodController) providerSyncLoop(ctx context.Context, q workqueue.RateL
|
||||
func (pc *PodController) runSyncFromProvider(ctx context.Context, q workqueue.RateLimitingInterface) {
|
||||
if pn, ok := pc.provider.(PodNotifier); ok {
|
||||
pn.NotifyPods(ctx, func(pod *corev1.Pod) {
|
||||
enqueuePodStatusUpdate(ctx, q, pod)
|
||||
pc.enqueuePodStatusUpdate(ctx, q, pod.DeepCopy())
|
||||
})
|
||||
} else {
|
||||
go pc.providerSyncLoop(ctx, q)
|
||||
|
||||
Reference in New Issue
Block a user