* vendor: add vendored code
Signed-off-by: Paulo Pires <pjpires@gmail.com>
* controller: use shared informers and a work queue
Signed-off-by: Paulo Pires <pjpires@gmail.com>
* errors: use cpuguy83/strongerrors
Signed-off-by: Paulo Pires <pjpires@gmail.com>
* aci: fix test that uses resource manager
Signed-off-by: Paulo Pires <pjpires@gmail.com>
* readme: clarify skaffold run before e2e
Signed-off-by: Paulo Pires <pjpires@gmail.com>
* cmd: use root context everywhere
Signed-off-by: Paulo Pires <pjpires@gmail.com>
* sync: refactor pod lifecycle management
Signed-off-by: Paulo Pires <pjpires@gmail.com>
* e2e: fix race in test when observing deletions
Signed-off-by: Paulo Pires <pjpires@gmail.com>
* e2e: test pod forced deletion
Signed-off-by: Paulo Pires <pjpires@gmail.com>
* cmd: fix root context potential leak
Signed-off-by: Paulo Pires <pjpires@gmail.com>
* sync: rename metaKey
Signed-off-by: Paulo Pires <pjpires@gmail.com>
* sync: remove calls to HandleError
Signed-off-by: Paulo Pires <pjpires@gmail.com>
* Revert "errors: use cpuguy83/strongerrors"
This reverts commit f031fc6d.
Signed-off-by: Paulo Pires <pjpires@gmail.com>
* manager: remove redundant lister constraint
Signed-off-by: Paulo Pires <pjpires@gmail.com>
* sync: rename the pod event recorder
Signed-off-by: Paulo Pires <pjpires@gmail.com>
* sync: amend misleading comment
Signed-off-by: Paulo Pires <pjpires@gmail.com>
* mock: add tracing
Signed-off-by: Paulo Pires <pjpires@gmail.com>
* sync: add tracing
Signed-off-by: Paulo Pires <pjpires@gmail.com>
* test: observe timeouts
Signed-off-by: Paulo Pires <pjpires@gmail.com>
* trace: remove unnecessary comments
Signed-off-by: Paulo Pires <pjpires@gmail.com>
* sync: limit concurrency in deleteDanglingPods
Signed-off-by: Paulo Pires <pjpires@gmail.com>
* sync: never store context, always pass in calls
Signed-off-by: Paulo Pires <pjpires@gmail.com>
* sync: remove HandleCrash and just panic
Signed-off-by: Paulo Pires <pjpires@gmail.com>
* sync: don't sync succeeded pods
Signed-off-by: Paulo Pires <pjpires@gmail.com>
* sync: ensure pod deletion from kubernetes
Signed-off-by: Paulo Pires <pjpires@gmail.com>
378 lines
15 KiB
Go
378 lines
15 KiB
Go
// Copyright © 2017 The virtual-kubelet authors
|
|
//
|
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
// you may not use this file except in compliance with the License.
|
|
// You may obtain a copy of the License at
|
|
//
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
//
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
// See the License for the specific language governing permissions and
|
|
// limitations under the License.
|
|
|
|
package vkubelet
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"reflect"
|
|
"strconv"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/cpuguy83/strongerrors/status/ocstatus"
|
|
pkgerrors "github.com/pkg/errors"
|
|
"go.opencensus.io/trace"
|
|
corev1 "k8s.io/api/core/v1"
|
|
"k8s.io/apimachinery/pkg/api/errors"
|
|
"k8s.io/apimachinery/pkg/util/wait"
|
|
"k8s.io/client-go/informers/core/v1"
|
|
"k8s.io/client-go/kubernetes/scheme"
|
|
typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1"
|
|
corev1listers "k8s.io/client-go/listers/core/v1"
|
|
"k8s.io/client-go/tools/cache"
|
|
"k8s.io/client-go/tools/record"
|
|
"k8s.io/client-go/util/workqueue"
|
|
|
|
"github.com/virtual-kubelet/virtual-kubelet/log"
|
|
)
|
|
|
|
// maxRetries caps how many times a given key is processed before it is permanently dropped from the work queue.
const maxRetries = 20
|
|
|
|
// PodController is the controller implementation for Pod resources.
|
|
type PodController struct {
|
|
// server is the instance to which this controller belongs.
|
|
server *Server
|
|
// podsInformer is an informer for Pod resources.
|
|
podsInformer v1.PodInformer
|
|
// podsLister is able to list/get Pod resources from a shared informer's store.
|
|
podsLister corev1listers.PodLister
|
|
|
|
// workqueue is a rate limited work queue.
|
|
// This is used to queue work to be processed instead of performing it as soon as a change happens.
|
|
// This means we can ensure we only process a fixed amount of resources at a time, and makes it easy to ensure we are never processing the same item simultaneously in two different workers.
|
|
workqueue workqueue.RateLimitingInterface
|
|
// recorder is an event recorder for recording Event resources to the Kubernetes API.
|
|
recorder record.EventRecorder
|
|
}
|
|
|
|
// NewPodController returns a new instance of PodController.
|
|
func NewPodController(server *Server) *PodController {
|
|
// Create an event broadcaster.
|
|
eventBroadcaster := record.NewBroadcaster()
|
|
eventBroadcaster.StartLogging(log.L.Infof)
|
|
eventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: server.k8sClient.CoreV1().Events("")})
|
|
recorder := eventBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: fmt.Sprintf("%s/pod-controller", server.nodeName)})
|
|
|
|
// Create an instance of PodController having a work queue that uses the rate limiter created above.
|
|
pc := &PodController{
|
|
server: server,
|
|
podsInformer: server.podInformer,
|
|
podsLister: server.podInformer.Lister(),
|
|
workqueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "pods"),
|
|
recorder: recorder,
|
|
}
|
|
|
|
// Set up event handlers for when Pod resources change.
|
|
pc.podsInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
|
|
AddFunc: func(pod interface{}) {
|
|
if key, err := cache.MetaNamespaceKeyFunc(pod); err != nil {
|
|
log.L.Error(err)
|
|
} else {
|
|
pc.workqueue.AddRateLimited(key)
|
|
}
|
|
},
|
|
UpdateFunc: func(oldObj, newObj interface{}) {
|
|
// Create a copy of the old and new pod objects so we don't mutate the cache.
|
|
oldPod := oldObj.(*corev1.Pod).DeepCopy()
|
|
newPod := newObj.(*corev1.Pod).DeepCopy()
|
|
// We want to check if the two objects differ in anything other than their resource versions.
|
|
// Hence, we make them equal so that this change isn't picked up by reflect.DeepEqual.
|
|
newPod.ResourceVersion = oldPod.ResourceVersion
|
|
// Skip adding this pod's key to the work queue if its .metadata (except .metadata.resourceVersion) and .spec fields haven't changed.
|
|
// This guarantees that we don't attempt to sync the pod every time its .status field is updated.
|
|
if reflect.DeepEqual(oldPod.ObjectMeta, newPod.ObjectMeta) && reflect.DeepEqual(oldPod.Spec, newPod.Spec) {
|
|
return
|
|
}
|
|
// At this point we know that something in .metadata or .spec has changed, so we must proceed to sync the pod.
|
|
if key, err := cache.MetaNamespaceKeyFunc(newPod); err != nil {
|
|
log.L.Error(err)
|
|
} else {
|
|
pc.workqueue.AddRateLimited(key)
|
|
}
|
|
},
|
|
DeleteFunc: func(pod interface{}) {
|
|
if key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(pod); err != nil {
|
|
log.L.Error(err)
|
|
} else {
|
|
pc.workqueue.AddRateLimited(key)
|
|
}
|
|
},
|
|
})
|
|
|
|
// Return the instance of PodController back to the caller.
|
|
return pc
|
|
}
|
|
|
|
// Run will set up the event handlers for types we are interested in, as well as syncing informer caches and starting workers.
|
|
// It will block until stopCh is closed, at which point it will shutdown the work queue and wait for workers to finish processing their current work items.
|
|
func (pc *PodController) Run(ctx context.Context, threadiness int) error {
|
|
defer pc.workqueue.ShutDown()
|
|
|
|
// Wait for the caches to be synced before starting workers.
|
|
if ok := cache.WaitForCacheSync(ctx.Done(), pc.podsInformer.Informer().HasSynced); !ok {
|
|
return pkgerrors.New("failed to wait for caches to sync")
|
|
}
|
|
|
|
// Perform a reconciliation step that deletes any dangling pods from the provider.
|
|
// This happens only when the virtual-kubelet is starting, and operates on a "best-effort" basis.
|
|
// If by any reason the provider fails to delete a dangling pod, it will stay in the provider and deletion won't be retried.
|
|
pc.deleteDanglingPods(ctx, threadiness)
|
|
|
|
// Launch "threadiness" workers to process Pod resources.
|
|
log.G(ctx).Info("starting workers")
|
|
for id := 0; id < threadiness; id++ {
|
|
go wait.Until(func() {
|
|
// Use the worker's "index" as its ID so we can use it for tracing.
|
|
pc.runWorker(ctx, strconv.Itoa(id))
|
|
}, time.Second, ctx.Done())
|
|
}
|
|
|
|
log.G(ctx).Info("started workers")
|
|
<-ctx.Done()
|
|
log.G(ctx).Info("shutting down workers")
|
|
|
|
return nil
|
|
}
|
|
|
|
// runWorker is a long-running function that will continually call the processNextWorkItem function in order to read and process an item on the work queue.
|
|
func (pc *PodController) runWorker(ctx context.Context, workerId string) {
|
|
for pc.processNextWorkItem(ctx, workerId) {
|
|
}
|
|
}
|
|
|
|
// processNextWorkItem will read a single work item off the work queue and attempt to process it,by calling the syncHandler.
|
|
func (pc *PodController) processNextWorkItem(ctx context.Context, workerId string) bool {
|
|
obj, shutdown := pc.workqueue.Get()
|
|
|
|
if shutdown {
|
|
return false
|
|
}
|
|
|
|
// We create a span only after popping from the queue so that we can get an adequate picture of how long it took to process the item.
|
|
ctx, span := trace.StartSpan(ctx, "processNextWorkItem")
|
|
defer span.End()
|
|
|
|
// Add the ID of the current worker as an attribute to the current span.
|
|
span.AddAttributes(trace.StringAttribute("workerId", workerId))
|
|
|
|
// We wrap this block in a func so we can defer pc.workqueue.Done.
|
|
err := func(obj interface{}) error {
|
|
// We call Done here so the work queue knows we have finished processing this item.
|
|
// We also must remember to call Forget if we do not want this work item being re-queued.
|
|
// For example, we do not call Forget if a transient error occurs.
|
|
// Instead, the item is put back on the work queue and attempted again after a back-off period.
|
|
defer pc.workqueue.Done(obj)
|
|
var key string
|
|
var ok bool
|
|
// We expect strings to come off the work queue.
|
|
// These are of the form namespace/name.
|
|
// We do this as the delayed nature of the work queue means the items in the informer cache may actually be more up to date that when the item was initially put onto the workqueue.
|
|
if key, ok = obj.(string); !ok {
|
|
// As the item in the work queue is actually invalid, we call Forget here else we'd go into a loop of attempting to process a work item that is invalid.
|
|
pc.workqueue.Forget(obj)
|
|
log.G(ctx).Warnf("expected string in work queue but got %#v", obj)
|
|
return nil
|
|
}
|
|
// Add the current key as an attribute to the current span.
|
|
span.AddAttributes(trace.StringAttribute("key", key))
|
|
// Run the syncHandler, passing it the namespace/name string of the Pod resource to be synced.
|
|
if err := pc.syncHandler(ctx, key); err != nil {
|
|
if pc.workqueue.NumRequeues(key) < maxRetries {
|
|
// Put the item back on the work queue to handle any transient errors.
|
|
log.G(ctx).Warnf("requeuing %q due to failed sync", key)
|
|
pc.workqueue.AddRateLimited(key)
|
|
return nil
|
|
}
|
|
// We've exceeded the maximum retries, so we must forget the key.
|
|
pc.workqueue.Forget(key)
|
|
return pkgerrors.Wrapf(err, "forgetting %q due to maximum retries reached", key)
|
|
}
|
|
// Finally, if no error occurs we Forget this item so it does not get queued again until another change happens.
|
|
pc.workqueue.Forget(obj)
|
|
return nil
|
|
}(obj)
|
|
|
|
if err != nil {
|
|
// We've actually hit an error, so we set the span's status based on the error.
|
|
span.SetStatus(ocstatus.FromError(err))
|
|
log.G(ctx).Error(err)
|
|
return true
|
|
}
|
|
|
|
return true
|
|
}
|
|
|
|
// syncHandler compares the actual state with the desired, and attempts to converge the two.
|
|
func (pc *PodController) syncHandler(ctx context.Context, key string) error {
|
|
ctx, span := trace.StartSpan(ctx, "syncHandler")
|
|
defer span.End()
|
|
|
|
// Add the current key as an attribute to the current span.
|
|
span.AddAttributes(trace.StringAttribute("key", key))
|
|
|
|
// Convert the namespace/name string into a distinct namespace and name.
|
|
namespace, name, err := cache.SplitMetaNamespaceKey(key)
|
|
if err != nil {
|
|
// Log the error as a warning, but do not requeue the key as it is invalid.
|
|
log.G(ctx).Warn(pkgerrors.Wrapf(err, "invalid resource key: %q", key))
|
|
return nil
|
|
}
|
|
|
|
// Get the Pod resource with this namespace/name.
|
|
pod, err := pc.podsLister.Pods(namespace).Get(name)
|
|
if err != nil {
|
|
if !errors.IsNotFound(err) {
|
|
// We've failed to fetch the pod from the lister, but the error is not a 404.
|
|
// Hence, we add the key back to the work queue so we can retry processing it later.
|
|
err := pkgerrors.Wrapf(err, "failed to fetch pod with key %q from lister", key)
|
|
span.SetStatus(ocstatus.FromError(err))
|
|
return err
|
|
}
|
|
// At this point we know the Pod resource doesn't exist, which most probably means it was deleted.
|
|
// Hence, we must delete it from the provider if it still exists there.
|
|
if err := pc.server.deletePod(ctx, namespace, name); err != nil {
|
|
err := pkgerrors.Wrapf(err, "failed to delete pod %q in the provider", loggablePodNameFromCoordinates(namespace, name))
|
|
span.SetStatus(ocstatus.FromError(err))
|
|
return err
|
|
}
|
|
return nil
|
|
}
|
|
// At this point we know the Pod resource has either been created or updated (which includes being marked for deletion).
|
|
return pc.syncPodInProvider(ctx, pod)
|
|
}
|
|
|
|
// syncPodInProvider tries and reconciles the state of a pod by comparing its Kubernetes representation and the provider's representation.
|
|
func (pc *PodController) syncPodInProvider(ctx context.Context, pod *corev1.Pod) error {
|
|
ctx, span := trace.StartSpan(ctx, "syncPodInProvider")
|
|
defer span.End()
|
|
|
|
// Add the pod's attributes to the current span.
|
|
addPodAttributes(span, pod)
|
|
|
|
// Check whether the pod has been marked for deletion.
|
|
// If it does, guarantee it is deleted in the provider and Kubernetes.
|
|
if pod.DeletionTimestamp != nil {
|
|
if err := pc.server.deletePod(ctx, pod.Namespace, pod.Name); err != nil {
|
|
err := pkgerrors.Wrapf(err, "failed to delete pod %q in the provider", loggablePodName(pod))
|
|
span.SetStatus(ocstatus.FromError(err))
|
|
return err
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// Ignore the pod if it is in the "Failed" or "Succeeded" state.
|
|
if pod.Status.Phase == corev1.PodFailed || pod.Status.Phase == corev1.PodSucceeded {
|
|
log.G(ctx).Warnf("skipping sync of pod %q in %q phase", loggablePodName(pod), pod.Status.Phase)
|
|
return nil
|
|
}
|
|
|
|
// Create or update the pod in the provider.
|
|
if err := pc.server.createOrUpdatePod(ctx, pod); err != nil {
|
|
err := pkgerrors.Wrapf(err, "failed to sync pod %q in the provider", loggablePodName(pod))
|
|
span.SetStatus(ocstatus.FromError(err))
|
|
return err
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// deleteDanglingPods checks whether the provider knows about any pods which Kubernetes doesn't know about, and deletes them.
|
|
func (pc *PodController) deleteDanglingPods(ctx context.Context, threadiness int) {
|
|
ctx, span := trace.StartSpan(ctx, "deleteDanglingPods")
|
|
defer span.End()
|
|
|
|
// Grab the list of pods known to the provider.
|
|
pps, err := pc.server.provider.GetPods(ctx)
|
|
if err != nil {
|
|
err := pkgerrors.Wrap(err, "failed to fetch the list of pods from the provider")
|
|
span.SetStatus(ocstatus.FromError(err))
|
|
log.G(ctx).Error(err)
|
|
return
|
|
}
|
|
|
|
// Create a slice to hold the pods we will be deleting from the provider.
|
|
ptd := make([]*corev1.Pod, 0)
|
|
|
|
// Iterate over the pods known to the provider, marking for deletion those that don't exist in Kubernetes.
|
|
// Take on this opportunity to populate the list of key that correspond to pods known to the provider.
|
|
for _, pp := range pps {
|
|
if _, err := pc.podsLister.Pods(pp.Namespace).Get(pp.Name); err != nil {
|
|
if errors.IsNotFound(err) {
|
|
// The current pod does not exist in Kubernetes, so we mark it for deletion.
|
|
ptd = append(ptd, pp)
|
|
continue
|
|
}
|
|
// For some reason we couldn't fetch the pod from the lister, so we propagate the error.
|
|
err := pkgerrors.Wrap(err, "failed to fetch pod from the lister")
|
|
span.SetStatus(ocstatus.FromError(err))
|
|
log.G(ctx).Error(err)
|
|
return
|
|
}
|
|
}
|
|
|
|
// We delete each pod in its own goroutine, allowing a maximum of "threadiness" concurrent deletions.
|
|
semaphore := make(chan struct{}, threadiness)
|
|
var wg sync.WaitGroup
|
|
wg.Add(len(ptd))
|
|
|
|
// Iterate over the slice of pods to be deleted and delete them in the provider.
|
|
for _, pod := range ptd {
|
|
go func(ctx context.Context, pod *corev1.Pod) {
|
|
defer wg.Done()
|
|
|
|
ctx, span := trace.StartSpan(ctx, "deleteDanglingPod")
|
|
defer span.End()
|
|
|
|
semaphore <- struct{}{}
|
|
defer func() {
|
|
<-semaphore
|
|
}()
|
|
|
|
// Add the pod's attributes to the current span.
|
|
addPodAttributes(span, pod)
|
|
// Actually delete the pod.
|
|
if err := pc.server.deletePod(ctx, pod.Namespace, pod.Name); err != nil {
|
|
span.SetStatus(ocstatus.FromError(err))
|
|
log.G(ctx).Errorf("failed to delete pod %q in provider", loggablePodName(pod))
|
|
} else {
|
|
log.G(ctx).Infof("deleted leaked pod %q in provider", loggablePodName(pod))
|
|
}
|
|
}(ctx, pod)
|
|
}
|
|
|
|
// Wait for all pods to be deleted.
|
|
wg.Wait()
|
|
return
|
|
}
|
|
|
|
// loggablePodName returns the "namespace/name" key for the specified pod.
|
|
// If the key cannot be computed, "(unknown)" is returned.
|
|
// This method is meant to be used for logging purposes only.
|
|
func loggablePodName(pod *corev1.Pod) string {
|
|
k, err := cache.MetaNamespaceKeyFunc(pod)
|
|
if err != nil {
|
|
return "(unknown)"
|
|
}
|
|
return k
|
|
}
|
|
|
|
// loggablePodNameFromCoordinates builds the "namespace/name" key of the pod identified by the specified coordinates (its namespace and its name).
func loggablePodNameFromCoordinates(namespace, name string) string {
	return namespace + "/" + name
}
|