use shared informers and workqueue (#425)

* vendor: add vendored code

Signed-off-by: Paulo Pires <pjpires@gmail.com>

* controller: use shared informers and a work queue

Signed-off-by: Paulo Pires <pjpires@gmail.com>

* errors: use cpuguy83/strongerrors

Signed-off-by: Paulo Pires <pjpires@gmail.com>

* aci: fix test that uses resource manager

Signed-off-by: Paulo Pires <pjpires@gmail.com>

* readme: clarify skaffold run before e2e

Signed-off-by: Paulo Pires <pjpires@gmail.com>

* cmd: use root context everywhere

Signed-off-by: Paulo Pires <pjpires@gmail.com>

* sync: refactor pod lifecycle management

Signed-off-by: Paulo Pires <pjpires@gmail.com>

* e2e: fix race in test when observing deletions

Signed-off-by: Paulo Pires <pjpires@gmail.com>

* e2e: test pod forced deletion

Signed-off-by: Paulo Pires <pjpires@gmail.com>

* cmd: fix root context potential leak

Signed-off-by: Paulo Pires <pjpires@gmail.com>

* sync: rename metaKey

Signed-off-by: Paulo Pires <pjpires@gmail.com>

* sync: remove calls to HandleError

Signed-off-by: Paulo Pires <pjpires@gmail.com>

* Revert "errors: use cpuguy83/strongerrors"

This reverts commit f031fc6d.

Signed-off-by: Paulo Pires <pjpires@gmail.com>

* manager: remove redundant lister constraint

Signed-off-by: Paulo Pires <pjpires@gmail.com>

* sync: rename the pod event recorder

Signed-off-by: Paulo Pires <pjpires@gmail.com>

* sync: amend misleading comment

Signed-off-by: Paulo Pires <pjpires@gmail.com>

* mock: add tracing

Signed-off-by: Paulo Pires <pjpires@gmail.com>

* sync: add tracing

Signed-off-by: Paulo Pires <pjpires@gmail.com>

* test: observe timeouts

Signed-off-by: Paulo Pires <pjpires@gmail.com>

* trace: remove unnecessary comments

Signed-off-by: Paulo Pires <pjpires@gmail.com>

* sync: limit concurrency in deleteDanglingPods

Signed-off-by: Paulo Pires <pjpires@gmail.com>

* sync: never store context, always pass in calls

Signed-off-by: Paulo Pires <pjpires@gmail.com>

* sync: remove HandleCrash and just panic

Signed-off-by: Paulo Pires <pjpires@gmail.com>

* sync: don't sync succeeded pods

Signed-off-by: Paulo Pires <pjpires@gmail.com>

* sync: ensure pod deletion from kubernetes

Signed-off-by: Paulo Pires <pjpires@gmail.com>
This commit is contained in:
Paulo Pires
2018-11-30 23:53:58 +00:00
committed by Robbie Zhang
parent 0e9cfca585
commit 28a757f4da
419 changed files with 20138 additions and 14777 deletions

View File

@@ -6,17 +6,13 @@ import (
"time"
"github.com/cpuguy83/strongerrors/status/ocstatus"
pkgerrors "github.com/pkg/errors"
"github.com/virtual-kubelet/virtual-kubelet/log"
"go.opencensus.io/trace"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/tools/cache"
"github.com/virtual-kubelet/virtual-kubelet/log"
)
func addPodAttributes(span *trace.Span, pod *corev1.Pod) {
@@ -29,138 +25,18 @@ func addPodAttributes(span *trace.Span, pod *corev1.Pod) {
)
}
func (s *Server) onAddPod(ctx context.Context, obj interface{}) {
ctx, span := trace.StartSpan(ctx, "onAddPod")
defer span.End()
logger := log.G(ctx).WithField("method", "onAddPod")
pod, ok := obj.(*corev1.Pod)
if !ok {
span.SetStatus(trace.Status{Code: trace.StatusCodeInvalidArgument, Message: fmt.Sprintf("Unexpected object from event: %T", obj)})
logger.Errorf("obj is not of a valid type: %T", obj)
return
func (s *Server) createOrUpdatePod(ctx context.Context, pod *corev1.Pod) error {
// Check if the pod is already known by the provider.
// NOTE: Some providers return a non-nil error in their GetPod implementation when the pod is not found while some other don't.
// Hence, we ignore the error and just act upon the pod if it is non-nil (meaning that the provider still knows about the pod).
if pp, _ := s.provider.GetPod(ctx, pod.Namespace, pod.Name); pp != nil {
// The pod has already been created in the provider.
// Hence, we return since pod updates are not yet supported.
log.G(ctx).Warnf("skipping update of pod %s as pod updates are not supported", pp.Name)
return nil
}
addPodAttributes(span, pod)
logger.Debugf("Receive added pod '%s/%s' ", pod.GetNamespace(), pod.GetName())
if s.resourceManager.UpdatePod(pod) {
span.Annotate(nil, "Add pod to synchronizer channel.")
select {
case <-ctx.Done():
logger = logger.WithField("pod", pod.GetName()).WithField("namespace", pod.GetNamespace())
logger.WithError(ctx.Err()).Debug("Cancel send pod event due to cancelled context")
return
case s.podCh <- &podNotification{pod: pod, ctx: ctx}:
}
}
}
func (s *Server) onUpdatePod(ctx context.Context, obj interface{}) {
ctx, span := trace.StartSpan(ctx, "onUpdatePod")
defer span.End()
logger := log.G(ctx).WithField("method", "onUpdatePod")
pod, ok := obj.(*corev1.Pod)
if !ok {
span.SetStatus(trace.Status{Code: trace.StatusCodeInvalidArgument, Message: fmt.Sprintf("Unexpected object from event: %T", obj)})
logger.Errorf("obj is not of a valid type: %T", obj)
return
}
addPodAttributes(span, pod)
logger.Debugf("Receive updated pod '%s/%s'", pod.GetNamespace(), pod.GetName())
if s.resourceManager.UpdatePod(pod) {
span.Annotate(nil, "Add pod to synchronizer channel.")
select {
case <-ctx.Done():
logger = logger.WithField("pod", pod.GetName()).WithField("namespace", pod.GetNamespace())
logger.WithError(ctx.Err()).Debug("Cancel send pod event due to cancelled context")
return
case s.podCh <- &podNotification{pod: pod, ctx: ctx}:
}
}
}
func (s *Server) onDeletePod(ctx context.Context, obj interface{}) {
ctx, span := trace.StartSpan(ctx, "onDeletePod")
defer span.End()
logger := log.G(ctx).WithField("method", "onDeletePod")
pod, ok := obj.(*corev1.Pod)
if !ok {
delta, ok := obj.(cache.DeletedFinalStateUnknown)
if !ok {
span.SetStatus(trace.Status{Code: trace.StatusCodeInvalidArgument, Message: fmt.Sprintf("Unexpected object from event: %T", obj)})
logger.Errorf("obj is not of a valid type: %T", obj)
return
}
if pod, ok = delta.Obj.(*corev1.Pod); !ok {
span.SetStatus(trace.Status{Code: trace.StatusCodeInvalidArgument, Message: fmt.Sprintf("Unexpected object from event: %T", obj)})
logger.Errorf("obj is not of a valid type: %T", obj)
return
}
}
addPodAttributes(span, pod)
logger.Debugf("Receive deleted pod '%s/%s'", pod.GetNamespace(), pod.GetName())
if s.resourceManager.DeletePod(pod) {
span.Annotate(nil, "Add pod to synchronizer channel.")
select {
case <-ctx.Done():
logger = logger.WithField("pod", pod.GetName()).WithField("namespace", pod.GetNamespace())
logger.WithError(ctx.Err()).Debug("Cancel send pod event due to cancelled context")
return
case s.podCh <- &podNotification{pod: pod, ctx: ctx}:
}
}
}
func (s *Server) startPodSynchronizer(ctx context.Context, id int) {
logger := log.G(ctx).WithField("method", "startPodSynchronizer").WithField("podSynchronizer", id)
logger.Debug("Start pod synchronizer")
for {
select {
case <-ctx.Done():
logger.Info("Stop pod syncronizer")
return
case event := <-s.podCh:
s.syncPod(event.ctx, event.pod)
}
}
}
func (s *Server) syncPod(ctx context.Context, pod *corev1.Pod) {
ctx, span := trace.StartSpan(ctx, "syncPod")
defer span.End()
logger := log.G(ctx).WithField("pod", pod.GetName()).WithField("namespace", pod.GetNamespace())
addPodAttributes(span, pod)
if pod.DeletionTimestamp != nil {
span.Annotate(nil, "Delete pod")
logger.Debugf("Deleting pod")
if err := s.deletePod(ctx, pod); err != nil {
logger.WithError(err).Error("Failed to delete pod")
}
} else {
span.Annotate(nil, "Create pod")
logger.Debugf("Creating pod")
if err := s.createPod(ctx, pod); err != nil {
logger.WithError(err).Errorf("Failed to create pod")
}
}
}
func (s *Server) createPod(ctx context.Context, pod *corev1.Pod) error {
ctx, span := trace.StartSpan(ctx, "createPod")
ctx, span := trace.StartSpan(ctx, "createOrUpdatePod")
defer span.End()
addPodAttributes(span, pod)
@@ -199,7 +75,16 @@ func (s *Server) createPod(ctx context.Context, pod *corev1.Pod) error {
return nil
}
func (s *Server) deletePod(ctx context.Context, pod *corev1.Pod) error {
func (s *Server) deletePod(ctx context.Context, namespace, name string) error {
// Grab the pod as known by the provider.
// NOTE: Some providers return a non-nil error in their GetPod implementation when the pod is not found while some other don't.
// Hence, we ignore the error and just act upon the pod if it is non-nil (meaning that the provider still knows about the pod).
pod, _ := s.provider.GetPod(ctx, namespace, name)
if pod == nil {
// The provider is not aware of the pod, but we must still delete the Kubernetes API resource.
return s.forceDeletePodResource(ctx, namespace, name)
}
ctx, span := trace.StartSpan(ctx, "deletePod")
defer span.End()
addPodAttributes(span, pod)
@@ -213,26 +98,37 @@ func (s *Server) deletePod(ctx context.Context, pod *corev1.Pod) error {
logger := log.G(ctx).WithField("pod", pod.GetName()).WithField("namespace", pod.GetNamespace())
if !errors.IsNotFound(delErr) {
var grace int64
if err := s.k8sClient.CoreV1().Pods(pod.GetNamespace()).Delete(pod.GetName(), &metav1.DeleteOptions{GracePeriodSeconds: &grace}); err != nil {
if errors.IsNotFound(err) {
span.Annotate(nil, "Pod does not exist in k8s, nothing to delete")
return nil
}
span.SetStatus(trace.Status{Code: trace.StatusCodeUnknown, Message: err.Error()})
return fmt.Errorf("Failed to delete kubernetes pod: %s", err)
if err := s.forceDeletePodResource(ctx, namespace, name); err != nil {
span.SetStatus(ocstatus.FromError(err))
return err
}
span.Annotate(nil, "Deleted pod from k8s")
s.resourceManager.DeletePod(pod)
span.Annotate(nil, "Deleted pod from internal state")
logger.Info("Pod deleted")
}
return nil
}
func (s *Server) forceDeletePodResource(ctx context.Context, namespace, name string) error {
ctx, span := trace.StartSpan(ctx, "forceDeletePodResource")
defer span.End()
span.AddAttributes(
trace.StringAttribute("namespace", namespace),
trace.StringAttribute("name", name),
)
var grace int64
if err := s.k8sClient.CoreV1().Pods(namespace).Delete(name, &metav1.DeleteOptions{GracePeriodSeconds: &grace}); err != nil {
if errors.IsNotFound(err) {
span.Annotate(nil, "Pod does not exist in Kubernetes, nothing to delete")
return nil
}
span.SetStatus(trace.Status{Code: trace.StatusCodeUnknown, Message: err.Error()})
return fmt.Errorf("Failed to delete Kubernetes pod: %s", err)
}
return nil
}
// updatePodStatuses syncs the providers pod status with the kubernetes pod status.
func (s *Server) updatePodStatuses(ctx context.Context) {
ctx, span := trace.StartSpan(ctx, "updatePodStatuses")
@@ -310,69 +206,3 @@ func (s *Server) updatePodStatus(ctx context.Context, pod *corev1.Pod) error {
}, "updated pod status in kubernetes")
return nil
}
// watchForPodEvent waits for pod changes from kubernetes and updates the details accordingly in the local state.
// This returns after a single pod event.
func (s *Server) watchForPodEvent(ctx context.Context) error {
opts := metav1.ListOptions{
FieldSelector: fields.OneTermEqualSelector("spec.nodeName", s.nodeName).String(),
}
pods, err := s.k8sClient.CoreV1().Pods(s.namespace).List(opts)
if err != nil {
return pkgerrors.Wrap(err, "error getting pod list")
}
s.resourceManager.SetPods(pods)
s.reconcile(ctx)
opts.ResourceVersion = pods.ResourceVersion
var controller cache.Controller
_, controller = cache.NewInformer(
&cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
if controller != nil {
opts.ResourceVersion = controller.LastSyncResourceVersion()
}
return s.k8sClient.Core().Pods(s.namespace).List(opts)
},
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
if controller != nil {
opts.ResourceVersion = controller.LastSyncResourceVersion()
}
return s.k8sClient.Core().Pods(s.namespace).Watch(opts)
},
},
&corev1.Pod{},
time.Minute,
cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
s.onAddPod(ctx, obj)
},
UpdateFunc: func(oldObj, newObj interface{}) {
s.onUpdatePod(ctx, newObj)
},
DeleteFunc: func(obj interface{}) {
s.onDeletePod(ctx, obj)
},
},
)
for i := 0; i < s.podSyncWorkers; i++ {
go s.startPodSynchronizer(ctx, i)
}
log.G(ctx).Info("Start to run pod cache controller.")
controller.Run(ctx.Done())
return ctx.Err()
}

377
vkubelet/podcontroller.go Normal file
View File

@@ -0,0 +1,377 @@
// Copyright © 2017 The virtual-kubelet authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package vkubelet
import (
"context"
"fmt"
"reflect"
"strconv"
"sync"
"time"
"github.com/cpuguy83/strongerrors/status/ocstatus"
pkgerrors "github.com/pkg/errors"
"go.opencensus.io/trace"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/informers/core/v1"
"k8s.io/client-go/kubernetes/scheme"
typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1"
corev1listers "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/workqueue"
"github.com/virtual-kubelet/virtual-kubelet/log"
)
const (
// maxRetries is the number of times we try to process a given key before permanently forgetting it.
maxRetries = 20
)
// PodController is the controller implementation for Pod resources.
type PodController struct {
// server is the instance to which this controller belongs.
server *Server
// podsInformer is an informer for Pod resources.
podsInformer v1.PodInformer
// podsLister is able to list/get Pod resources from a shared informer's store.
podsLister corev1listers.PodLister
// workqueue is a rate limited work queue.
// This is used to queue work to be processed instead of performing it as soon as a change happens.
// This means we can ensure we only process a fixed amount of resources at a time, and makes it easy to ensure we are never processing the same item simultaneously in two different workers.
workqueue workqueue.RateLimitingInterface
// recorder is an event recorder for recording Event resources to the Kubernetes API.
recorder record.EventRecorder
}
// NewPodController returns a new instance of PodController.
func NewPodController(server *Server) *PodController {
// Create an event broadcaster.
eventBroadcaster := record.NewBroadcaster()
eventBroadcaster.StartLogging(log.L.Infof)
eventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: server.k8sClient.CoreV1().Events("")})
recorder := eventBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: fmt.Sprintf("%s/pod-controller", server.nodeName)})
// Create an instance of PodController having a work queue that uses the rate limiter created above.
pc := &PodController{
server: server,
podsInformer: server.podInformer,
podsLister: server.podInformer.Lister(),
workqueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "pods"),
recorder: recorder,
}
// Set up event handlers for when Pod resources change.
pc.podsInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(pod interface{}) {
if key, err := cache.MetaNamespaceKeyFunc(pod); err != nil {
log.L.Error(err)
} else {
pc.workqueue.AddRateLimited(key)
}
},
UpdateFunc: func(oldObj, newObj interface{}) {
// Create a copy of the old and new pod objects so we don't mutate the cache.
oldPod := oldObj.(*corev1.Pod).DeepCopy()
newPod := newObj.(*corev1.Pod).DeepCopy()
// We want to check if the two objects differ in anything other than their resource versions.
// Hence, we make them equal so that this change isn't picked up by reflect.DeepEqual.
newPod.ResourceVersion = oldPod.ResourceVersion
// Skip adding this pod's key to the work queue if its .metadata (except .metadata.resourceVersion) and .spec fields haven't changed.
// This guarantees that we don't attempt to sync the pod every time its .status field is updated.
if reflect.DeepEqual(oldPod.ObjectMeta, newPod.ObjectMeta) && reflect.DeepEqual(oldPod.Spec, newPod.Spec) {
return
}
// At this point we know that something in .metadata or .spec has changed, so we must proceed to sync the pod.
if key, err := cache.MetaNamespaceKeyFunc(newPod); err != nil {
log.L.Error(err)
} else {
pc.workqueue.AddRateLimited(key)
}
},
DeleteFunc: func(pod interface{}) {
if key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(pod); err != nil {
log.L.Error(err)
} else {
pc.workqueue.AddRateLimited(key)
}
},
})
// Return the instance of PodController back to the caller.
return pc
}
// Run will set up the event handlers for types we are interested in, as well as syncing informer caches and starting workers.
// It will block until stopCh is closed, at which point it will shutdown the work queue and wait for workers to finish processing their current work items.
func (pc *PodController) Run(ctx context.Context, threadiness int) error {
defer pc.workqueue.ShutDown()
// Wait for the caches to be synced before starting workers.
if ok := cache.WaitForCacheSync(ctx.Done(), pc.podsInformer.Informer().HasSynced); !ok {
return pkgerrors.New("failed to wait for caches to sync")
}
// Perform a reconciliation step that deletes any dangling pods from the provider.
// This happens only when the virtual-kubelet is starting, and operates on a "best-effort" basis.
// If by any reason the provider fails to delete a dangling pod, it will stay in the provider and deletion won't be retried.
pc.deleteDanglingPods(ctx, threadiness)
// Launch "threadiness" workers to process Pod resources.
log.G(ctx).Info("starting workers")
for id := 0; id < threadiness; id++ {
go wait.Until(func() {
// Use the worker's "index" as its ID so we can use it for tracing.
pc.runWorker(ctx, strconv.Itoa(id))
}, time.Second, ctx.Done())
}
log.G(ctx).Info("started workers")
<-ctx.Done()
log.G(ctx).Info("shutting down workers")
return nil
}
// runWorker is a long-running function that will continually call the processNextWorkItem function in order to read and process an item on the work queue.
func (pc *PodController) runWorker(ctx context.Context, workerId string) {
for pc.processNextWorkItem(ctx, workerId) {
}
}
// processNextWorkItem will read a single work item off the work queue and attempt to process it,by calling the syncHandler.
func (pc *PodController) processNextWorkItem(ctx context.Context, workerId string) bool {
obj, shutdown := pc.workqueue.Get()
if shutdown {
return false
}
// We create a span only after popping from the queue so that we can get an adequate picture of how long it took to process the item.
ctx, span := trace.StartSpan(ctx, "processNextWorkItem")
defer span.End()
// Add the ID of the current worker as an attribute to the current span.
span.AddAttributes(trace.StringAttribute("workerId", workerId))
// We wrap this block in a func so we can defer pc.workqueue.Done.
err := func(obj interface{}) error {
// We call Done here so the work queue knows we have finished processing this item.
// We also must remember to call Forget if we do not want this work item being re-queued.
// For example, we do not call Forget if a transient error occurs.
// Instead, the item is put back on the work queue and attempted again after a back-off period.
defer pc.workqueue.Done(obj)
var key string
var ok bool
// We expect strings to come off the work queue.
// These are of the form namespace/name.
// We do this as the delayed nature of the work queue means the items in the informer cache may actually be more up to date that when the item was initially put onto the workqueue.
if key, ok = obj.(string); !ok {
// As the item in the work queue is actually invalid, we call Forget here else we'd go into a loop of attempting to process a work item that is invalid.
pc.workqueue.Forget(obj)
log.G(ctx).Warnf("expected string in work queue but got %#v", obj)
return nil
}
// Add the current key as an attribute to the current span.
span.AddAttributes(trace.StringAttribute("key", key))
// Run the syncHandler, passing it the namespace/name string of the Pod resource to be synced.
if err := pc.syncHandler(ctx, key); err != nil {
if pc.workqueue.NumRequeues(key) < maxRetries {
// Put the item back on the work queue to handle any transient errors.
log.G(ctx).Warnf("requeuing %q due to failed sync", key)
pc.workqueue.AddRateLimited(key)
return nil
}
// We've exceeded the maximum retries, so we must forget the key.
pc.workqueue.Forget(key)
return pkgerrors.Wrapf(err, "forgetting %q due to maximum retries reached", key)
}
// Finally, if no error occurs we Forget this item so it does not get queued again until another change happens.
pc.workqueue.Forget(obj)
return nil
}(obj)
if err != nil {
// We've actually hit an error, so we set the span's status based on the error.
span.SetStatus(ocstatus.FromError(err))
log.G(ctx).Error(err)
return true
}
return true
}
// syncHandler compares the actual state with the desired, and attempts to converge the two.
func (pc *PodController) syncHandler(ctx context.Context, key string) error {
ctx, span := trace.StartSpan(ctx, "syncHandler")
defer span.End()
// Add the current key as an attribute to the current span.
span.AddAttributes(trace.StringAttribute("key", key))
// Convert the namespace/name string into a distinct namespace and name.
namespace, name, err := cache.SplitMetaNamespaceKey(key)
if err != nil {
// Log the error as a warning, but do not requeue the key as it is invalid.
log.G(ctx).Warn(pkgerrors.Wrapf(err, "invalid resource key: %q", key))
return nil
}
// Get the Pod resource with this namespace/name.
pod, err := pc.podsLister.Pods(namespace).Get(name)
if err != nil {
if !errors.IsNotFound(err) {
// We've failed to fetch the pod from the lister, but the error is not a 404.
// Hence, we add the key back to the work queue so we can retry processing it later.
err := pkgerrors.Wrapf(err, "failed to fetch pod with key %q from lister", key)
span.SetStatus(ocstatus.FromError(err))
return err
}
// At this point we know the Pod resource doesn't exist, which most probably means it was deleted.
// Hence, we must delete it from the provider if it still exists there.
if err := pc.server.deletePod(ctx, namespace, name); err != nil {
err := pkgerrors.Wrapf(err, "failed to delete pod %q in the provider", loggablePodNameFromCoordinates(namespace, name))
span.SetStatus(ocstatus.FromError(err))
return err
}
return nil
}
// At this point we know the Pod resource has either been created or updated (which includes being marked for deletion).
return pc.syncPodInProvider(ctx, pod)
}
// syncPodInProvider tries and reconciles the state of a pod by comparing its Kubernetes representation and the provider's representation.
func (pc *PodController) syncPodInProvider(ctx context.Context, pod *corev1.Pod) error {
ctx, span := trace.StartSpan(ctx, "syncPodInProvider")
defer span.End()
// Add the pod's attributes to the current span.
addPodAttributes(span, pod)
// Check whether the pod has been marked for deletion.
// If it does, guarantee it is deleted in the provider and Kubernetes.
if pod.DeletionTimestamp != nil {
if err := pc.server.deletePod(ctx, pod.Namespace, pod.Name); err != nil {
err := pkgerrors.Wrapf(err, "failed to delete pod %q in the provider", loggablePodName(pod))
span.SetStatus(ocstatus.FromError(err))
return err
}
return nil
}
// Ignore the pod if it is in the "Failed" or "Succeeded" state.
if pod.Status.Phase == corev1.PodFailed || pod.Status.Phase == corev1.PodSucceeded {
log.G(ctx).Warnf("skipping sync of pod %q in %q phase", loggablePodName(pod), pod.Status.Phase)
return nil
}
// Create or update the pod in the provider.
if err := pc.server.createOrUpdatePod(ctx, pod); err != nil {
err := pkgerrors.Wrapf(err, "failed to sync pod %q in the provider", loggablePodName(pod))
span.SetStatus(ocstatus.FromError(err))
return err
}
return nil
}
// deleteDanglingPods checks whether the provider knows about any pods which Kubernetes doesn't know about, and deletes them.
func (pc *PodController) deleteDanglingPods(ctx context.Context, threadiness int) {
ctx, span := trace.StartSpan(ctx, "deleteDanglingPods")
defer span.End()
// Grab the list of pods known to the provider.
pps, err := pc.server.provider.GetPods(ctx)
if err != nil {
err := pkgerrors.Wrap(err, "failed to fetch the list of pods from the provider")
span.SetStatus(ocstatus.FromError(err))
log.G(ctx).Error(err)
return
}
// Create a slice to hold the pods we will be deleting from the provider.
ptd := make([]*corev1.Pod, 0)
// Iterate over the pods known to the provider, marking for deletion those that don't exist in Kubernetes.
// Take on this opportunity to populate the list of key that correspond to pods known to the provider.
for _, pp := range pps {
if _, err := pc.podsLister.Pods(pp.Namespace).Get(pp.Name); err != nil {
if errors.IsNotFound(err) {
// The current pod does not exist in Kubernetes, so we mark it for deletion.
ptd = append(ptd, pp)
continue
}
// For some reason we couldn't fetch the pod from the lister, so we propagate the error.
err := pkgerrors.Wrap(err, "failed to fetch pod from the lister")
span.SetStatus(ocstatus.FromError(err))
log.G(ctx).Error(err)
return
}
}
// We delete each pod in its own goroutine, allowing a maximum of "threadiness" concurrent deletions.
semaphore := make(chan struct{}, threadiness)
var wg sync.WaitGroup
wg.Add(len(ptd))
// Iterate over the slice of pods to be deleted and delete them in the provider.
for _, pod := range ptd {
go func(ctx context.Context, pod *corev1.Pod) {
defer wg.Done()
ctx, span := trace.StartSpan(ctx, "deleteDanglingPod")
defer span.End()
semaphore <- struct{}{}
defer func() {
<-semaphore
}()
// Add the pod's attributes to the current span.
addPodAttributes(span, pod)
// Actually delete the pod.
if err := pc.server.deletePod(ctx, pod.Namespace, pod.Name); err != nil {
span.SetStatus(ocstatus.FromError(err))
log.G(ctx).Errorf("failed to delete pod %q in provider", loggablePodName(pod))
} else {
log.G(ctx).Infof("deleted leaked pod %q in provider", loggablePodName(pod))
}
}(ctx, pod)
}
// Wait for all pods to be deleted.
wg.Wait()
return
}
// loggablePodName returns the "namespace/name" key for the specified pod.
// If the key cannot be computed, "(unknown)" is returned.
// This method is meant to be used for logging purposes only.
func loggablePodName(pod *corev1.Pod) string {
k, err := cache.MetaNamespaceKeyFunc(pod)
if err != nil {
return "(unknown)"
}
return k
}
// loggablePodNameFromCoordinates returns the "namespace/name" key for the pod identified by the specified namespace and name (coordinates).
func loggablePodNameFromCoordinates(namespace, name string) string {
return fmt.Sprintf("%s/%s", namespace, name)
}

View File

@@ -6,12 +6,14 @@ import (
"time"
pkgerrors "github.com/pkg/errors"
"go.opencensus.io/trace"
corev1 "k8s.io/api/core/v1"
corev1informers "k8s.io/client-go/informers/core/v1"
"k8s.io/client-go/kubernetes"
"github.com/virtual-kubelet/virtual-kubelet/log"
"github.com/virtual-kubelet/virtual-kubelet/manager"
"github.com/virtual-kubelet/virtual-kubelet/providers"
"go.opencensus.io/trace"
corev1 "k8s.io/api/core/v1"
"k8s.io/client-go/kubernetes"
)
const (
@@ -28,6 +30,7 @@ type Server struct {
resourceManager *manager.ResourceManager
podSyncWorkers int
podCh chan *podNotification
podInformer corev1informers.PodInformer
}
// Config is used to configure a new server.
@@ -41,6 +44,7 @@ type Config struct {
ResourceManager *manager.ResourceManager
Taint *corev1.Taint
PodSyncWorkers int
PodInformer corev1informers.PodInformer
}
// APIConfig is used to configure the API server of the virtual kubelet.
@@ -66,6 +70,7 @@ func New(ctx context.Context, cfg Config) (s *Server, retErr error) {
provider: cfg.Provider,
podSyncWorkers: cfg.PodSyncWorkers,
podCh: make(chan *podNotification, cfg.PodSyncWorkers),
podInformer: cfg.PodInformer,
}
ctx = log.WithLogger(ctx, log.G(ctx))
@@ -120,127 +125,7 @@ func New(ctx context.Context, cfg Config) (s *Server, retErr error) {
return s, nil
}
// Run starts the server, registers it with Kubernetes and begins watching/reconciling the cluster.
// Run will block until Stop is called or a SIGINT or SIGTERM signal is received.
// Run creates and starts an instance of the pod controller, blocking until it stops.
func (s *Server) Run(ctx context.Context) error {
if err := s.watchForPodEvent(ctx); err != nil {
if pkgerrors.Cause(err) == context.Canceled {
return err
}
log.G(ctx).Error(err)
}
return nil
}
// reconcile is the main reconciliation loop that compares differences between Kubernetes and
// the active provider and reconciles the differences.
func (s *Server) reconcile(ctx context.Context) {
ctx, span := trace.StartSpan(ctx, "reconcile")
defer span.End()
logger := log.G(ctx)
logger.Debug("Start reconcile")
defer logger.Debug("End reconcile")
providerPods, err := s.provider.GetPods(ctx)
if err != nil {
logger.WithError(err).Error("Error getting pod list from provider")
return
}
var deletePods []*corev1.Pod
for _, pod := range providerPods {
// Delete pods that don't exist in Kubernetes
if p := s.resourceManager.GetPod(pod.Namespace, pod.Name); p == nil || p.DeletionTimestamp != nil {
deletePods = append(deletePods, pod)
}
}
span.Annotate(nil, "Got provider pods")
var failedDeleteCount int64
for _, pod := range deletePods {
logger := logger.WithField("pod", pod.GetName()).WithField("namespace", pod.GetNamespace())
logger.Debug("Deleting pod")
if err := s.deletePod(ctx, pod); err != nil {
logger.WithError(err).Error("Error deleting pod")
failedDeleteCount++
continue
}
}
span.Annotate(
[]trace.Attribute{
trace.Int64Attribute("expected_delete_pods_count", int64(len(deletePods))),
trace.Int64Attribute("failed_delete_pods_count", failedDeleteCount),
},
"Cleaned up stale provider pods",
)
pods := s.resourceManager.GetPods()
var createPods []*corev1.Pod
cleanupPods := deletePods[:0]
for _, pod := range pods {
var providerPod *corev1.Pod
for _, p := range providerPods {
if p.Namespace == pod.Namespace && p.Name == pod.Name {
providerPod = p
break
}
}
// Delete pod if DeletionTimestamp is set
if pod.DeletionTimestamp != nil {
cleanupPods = append(cleanupPods, pod)
continue
}
if providerPod == nil &&
pod.DeletionTimestamp == nil &&
pod.Status.Phase != corev1.PodSucceeded &&
pod.Status.Phase != corev1.PodFailed &&
pod.Status.Reason != podStatusReasonProviderFailed {
createPods = append(createPods, pod)
}
}
var failedCreateCount int64
for _, pod := range createPods {
logger := logger.WithField("pod", pod.Name)
logger.Debug("Creating pod")
if err := s.createPod(ctx, pod); err != nil {
failedCreateCount++
logger.WithError(err).Error("Error creating pod")
continue
}
}
span.Annotate(
[]trace.Attribute{
trace.Int64Attribute("expected_created_pods", int64(len(createPods))),
trace.Int64Attribute("failed_pod_creates", failedCreateCount),
},
"Created pods in provider",
)
var failedCleanupCount int64
for _, pod := range cleanupPods {
logger := logger.WithField("pod", pod.Name)
log.Trace(logger, "Pod pending deletion")
var err error
if err = s.deletePod(ctx, pod); err != nil {
logger.WithError(err).Error("Error deleting pod")
failedCleanupCount++
continue
}
log.Trace(logger, "Pod deletion complete")
}
span.Annotate(
[]trace.Attribute{
trace.Int64Attribute("expected_cleaned_up_pods", int64(len(cleanupPods))),
trace.Int64Attribute("cleaned_up_pod_failures", failedCleanupCount),
},
"Cleaned up provider pods marked for deletion",
)
return NewPodController(s).Run(ctx, s.podSyncWorkers)
}