Remove Server object (#629)

The Server type had some awkwardly shared responsibility with the PodController.
Instead, just move that functionality into the PodController.
Brian Goff
2019-06-01 09:36:38 -07:00
committed by GitHub
parent 21d8ba2206
commit 71546a908f
6 changed files with 226 additions and 267 deletions
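For orientation, here is a minimal sketch (not part of this commit) of what a caller now wires up instead of vkubelet.New plus Server.Run. The import path github.com/virtual-kubelet/virtual-kubelet/vkubelet, the function name runPodController, and its parameter list are assumptions for illustration; the config field names and the Run/Ready signatures come from the diff below.

package example

import (
    "context"
    "path"

    "github.com/virtual-kubelet/virtual-kubelet/manager"
    "github.com/virtual-kubelet/virtual-kubelet/vkubelet"
    corev1 "k8s.io/api/core/v1"
    corev1informers "k8s.io/client-go/informers/core/v1"
    "k8s.io/client-go/kubernetes"
    "k8s.io/client-go/kubernetes/scheme"
    corev1client "k8s.io/client-go/kubernetes/typed/core/v1"
    "k8s.io/client-go/tools/record"
)

// runPodController shows the new call site shape: the caller owns the event
// recorder (the removed Server used to build it internally) and passes the
// worker count to Run instead of a Config field.
func runPodController(ctx context.Context, client kubernetes.Interface, podInformer corev1informers.PodInformer, p vkubelet.PodLifecycleHandler, rm *manager.ResourceManager, nodeName, namespace string, workers int) error {
    // The real diff also calls eb.StartLogging to mirror events into the log.
    eb := record.NewBroadcaster()
    eb.StartRecordingToSink(&corev1client.EventSinkImpl{Interface: client.CoreV1().Events(namespace)})

    pc, err := vkubelet.NewPodController(vkubelet.PodControllerConfig{
        PodClient:       client.CoreV1(),
        PodInformer:     podInformer,
        EventRecorder:   eb.NewRecorder(scheme.Scheme, corev1.EventSource{Component: path.Join(nodeName, "pod-controller")}),
        Provider:        p,
        ResourceManager: rm,
    })
    if err != nil {
        return err
    }

    // Run blocks until ctx is cancelled; readiness is observable via pc.Ready().
    return pc.Run(ctx, workers)
}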

View File

@@ -17,6 +17,7 @@ package root
 import (
 "context"
 "os"
+"path"
 "time"
 "github.com/cpuguy83/strongerrors"
@@ -32,8 +33,11 @@ import (
 "k8s.io/apimachinery/pkg/fields"
 kubeinformers "k8s.io/client-go/informers"
 "k8s.io/client-go/kubernetes"
+"k8s.io/client-go/kubernetes/scheme"
+corev1client "k8s.io/client-go/kubernetes/typed/core/v1"
 "k8s.io/client-go/rest"
 "k8s.io/client-go/tools/clientcmd"
+"k8s.io/client-go/tools/record"
 )
 // NewCommand creates a new top-level command.
@@ -88,7 +92,6 @@ func runRootCommand(ctx context.Context, c Opts) error {
 kubeinformers.WithTweakListOptions(func(options *metav1.ListOptions) {
 options.FieldSelector = fields.OneTermEqualSelector("spec.nodeName", c.NodeName).String()
 }))
-// Create a pod informer so we can pass its lister to the resource manager.
 podInformer := podInformerFactory.Core().V1().Pods()
 // Create another shared informer factory for Kubernetes secrets and configmaps (not subject to any selectors).
@@ -148,15 +151,20 @@ func runRootCommand(ctx context.Context, c Opts) error {
 log.G(ctx).Fatal(err)
 }
-vk := vkubelet.New(vkubelet.Config{
-Client: client,
-Namespace: c.KubeNamespace,
-NodeName: pNode.Name,
+eb := record.NewBroadcaster()
+eb.StartLogging(log.G(ctx).Infof)
+eb.StartRecordingToSink(&corev1client.EventSinkImpl{Interface: client.CoreV1().Events(c.KubeNamespace)})
+pc, err := vkubelet.NewPodController(vkubelet.PodControllerConfig{
+PodClient: client.CoreV1(),
+PodInformer: podInformer,
+EventRecorder: eb.NewRecorder(scheme.Scheme, corev1.EventSource{Component: path.Join(pNode.Name, "pod-controller")}),
 Provider: p,
 ResourceManager: rm,
-PodSyncWorkers: c.PodSyncWorkers,
-PodInformer: podInformer,
 })
+if err != nil {
+return errors.Wrap(err, "error setting up pod controller")
+}
 cancelHTTP, err := setupHTTPServer(ctx, p, apiConfig)
 if err != nil {
@@ -165,7 +173,7 @@ func runRootCommand(ctx context.Context, c Opts) error {
 defer cancelHTTP()
 go func() {
-if err := vk.Run(ctx); err != nil && errors.Cause(err) != context.Canceled {
+if err := pc.Run(ctx, c.PodSyncWorkers); err != nil && errors.Cause(err) != context.Canceled {
 log.G(ctx).Fatal(err)
 }
 }()
@@ -174,7 +182,7 @@ func runRootCommand(ctx context.Context, c Opts) error {
 // If there is a startup timeout, it does two things:
 // 1. It causes the VK to shutdown if we haven't gotten into an operational state in a time period
 // 2. It prevents node advertisement from happening until we're in an operational state
-err = waitForVK(ctx, c.StartupTimeout, vk)
+err = waitFor(ctx, c.StartupTimeout, pc.Ready())
 if err != nil {
 return err
 }
@@ -192,7 +200,7 @@ func runRootCommand(ctx context.Context, c Opts) error {
 return nil
 }
-func waitForVK(ctx context.Context, time time.Duration, vk *vkubelet.Server) error {
+func waitFor(ctx context.Context, time time.Duration, ready <-chan struct{}) error {
 ctx, cancel := context.WithTimeout(ctx, time)
 defer cancel()
@@ -200,7 +208,7 @@ func waitForVK(ctx context.Context, time time.Duration, vk *vkubelet.Server) err
 log.G(ctx).Info("Waiting for pod controller / VK to be ready")
 select {
-case <-vk.Ready():
+case <-ready:
 return nil
 case <-ctx.Done():
 return errors.Wrap(ctx.Err(), "Error while starting up VK")
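The old waitForVK helper needed a *vkubelet.Server; the new waitFor only needs a ready channel, so the same pattern works for anything exposing Ready(). A standalone, hedged sketch of that pattern (waitForReady is an illustrative name, not a function in the repo):

package example

import (
    "context"
    "time"

    pkgerrors "github.com/pkg/errors"
)

// waitForReady blocks until ready is closed, the parent context is cancelled,
// or the timeout elapses -- mirroring the shape of waitFor in this diff.
func waitForReady(ctx context.Context, timeout time.Duration, ready <-chan struct{}) error {
    ctx, cancel := context.WithTimeout(ctx, timeout)
    defer cancel()

    select {
    case <-ready:
        return nil
    case <-ctx.Done():
        return pkgerrors.Wrap(ctx.Err(), "error while waiting for startup")
    }
}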

View File

@@ -15,10 +15,13 @@ import (
 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 "k8s.io/apimachinery/pkg/labels"
 "k8s.io/client-go/tools/cache"
-"k8s.io/client-go/tools/record"
 "k8s.io/client-go/util/workqueue"
 )
+const (
+podStatusReasonProviderFailed = "ProviderFailed"
+)
 func addPodAttributes(ctx context.Context, span trace.Span, pod *corev1.Pod) context.Context {
 return span.WithFields(ctx, log.Fields{
 "uid": string(pod.GetUID()),
@@ -29,7 +32,7 @@ func addPodAttributes(ctx context.Context, span trace.Span, pod *corev1.Pod) con
 })
 }
-func (s *Server) createOrUpdatePod(ctx context.Context, pod *corev1.Pod, recorder record.EventRecorder) error {
+func (pc *PodController) createOrUpdatePod(ctx context.Context, pod *corev1.Pod) error {
 ctx, span := trace.StartSpan(ctx, "createOrUpdatePod")
 defer span.End()
@@ -40,7 +43,7 @@ func (s *Server) createOrUpdatePod(ctx context.Context, pod *corev1.Pod, recorde
 "namespace": pod.GetNamespace(),
 })
-if err := populateEnvironmentVariables(ctx, pod, s.resourceManager, recorder); err != nil {
+if err := populateEnvironmentVariables(ctx, pod, pc.resourceManager, pc.recorder); err != nil {
 span.SetStatus(ocstatus.FromError(err))
 return err
 }
@@ -48,7 +51,7 @@ func (s *Server) createOrUpdatePod(ctx context.Context, pod *corev1.Pod, recorde
 // Check if the pod is already known by the provider.
 // NOTE: Some providers return a non-nil error in their GetPod implementation when the pod is not found while some other don't.
 // Hence, we ignore the error and just act upon the pod if it is non-nil (meaning that the provider still knows about the pod).
-if pp, _ := s.provider.GetPod(ctx, pod.Namespace, pod.Name); pp != nil {
+if pp, _ := pc.provider.GetPod(ctx, pod.Namespace, pod.Name); pp != nil {
 // Pod Update Only Permits update of:
 // - `spec.containers[*].image`
 // - `spec.initContainers[*].image`
@@ -58,15 +61,15 @@ func (s *Server) createOrUpdatePod(ctx context.Context, pod *corev1.Pod, recorde
 expected := hashPodSpec(pp.Spec)
 if actual := hashPodSpec(pod.Spec); actual != expected {
 log.G(ctx).Debugf("Pod %s exists, updating pod in provider", pp.Name)
-if origErr := s.provider.UpdatePod(ctx, pod); origErr != nil {
-s.handleProviderError(ctx, span, origErr, pod)
+if origErr := pc.provider.UpdatePod(ctx, pod); origErr != nil {
+pc.handleProviderError(ctx, span, origErr, pod)
 return origErr
 }
 log.G(ctx).Info("Updated pod in provider")
 }
 } else {
-if origErr := s.provider.CreatePod(ctx, pod); origErr != nil {
-s.handleProviderError(ctx, span, origErr, pod)
+if origErr := pc.provider.CreatePod(ctx, pod); origErr != nil {
+pc.handleProviderError(ctx, span, origErr, pod)
 return origErr
 }
 log.G(ctx).Info("Created pod in provider")
@@ -88,7 +91,7 @@ func hashPodSpec(spec corev1.PodSpec) uint64 {
 return uint64(hash.Sum32())
 }
-func (s *Server) handleProviderError(ctx context.Context, span trace.Span, origErr error, pod *corev1.Pod) {
+func (pc *PodController) handleProviderError(ctx context.Context, span trace.Span, origErr error, pod *corev1.Pod) {
 podPhase := corev1.PodPending
 if pod.Spec.RestartPolicy == corev1.RestartPolicyNever {
 podPhase = corev1.PodFailed
@@ -104,7 +107,7 @@ func (s *Server) handleProviderError(ctx context.Context, span trace.Span, origE
 "reason": pod.Status.Reason,
 })
-_, err := s.k8sClient.CoreV1().Pods(pod.Namespace).UpdateStatus(pod)
+_, err := pc.client.Pods(pod.Namespace).UpdateStatus(pod)
 if err != nil {
 logger.WithError(err).Warn("Failed to update pod status")
 } else {
@@ -113,14 +116,14 @@ func (s *Server) handleProviderError(ctx context.Context, span trace.Span, origE
 span.SetStatus(ocstatus.FromError(origErr))
 }
-func (s *Server) deletePod(ctx context.Context, namespace, name string) error {
+func (pc *PodController) deletePod(ctx context.Context, namespace, name string) error {
 // Grab the pod as known by the provider.
 // NOTE: Some providers return a non-nil error in their GetPod implementation when the pod is not found while some other don't.
 // Hence, we ignore the error and just act upon the pod if it is non-nil (meaning that the provider still knows about the pod).
-pod, _ := s.provider.GetPod(ctx, namespace, name)
+pod, _ := pc.provider.GetPod(ctx, namespace, name)
 if pod == nil {
 // The provider is not aware of the pod, but we must still delete the Kubernetes API resource.
-return s.forceDeletePodResource(ctx, namespace, name)
+return pc.forceDeletePodResource(ctx, namespace, name)
 }
 ctx, span := trace.StartSpan(ctx, "deletePod")
@@ -128,7 +131,7 @@ func (s *Server) deletePod(ctx context.Context, namespace, name string) error {
 ctx = addPodAttributes(ctx, span, pod)
 var delErr error
-if delErr = s.provider.DeletePod(ctx, pod); delErr != nil && errors.IsNotFound(delErr) {
+if delErr = pc.provider.DeletePod(ctx, pod); delErr != nil && errors.IsNotFound(delErr) {
 span.SetStatus(ocstatus.FromError(delErr))
 return delErr
 }
@@ -136,7 +139,7 @@ func (s *Server) deletePod(ctx context.Context, namespace, name string) error {
 log.G(ctx).Debug("Deleted pod from provider")
 if !errors.IsNotFound(delErr) {
-if err := s.forceDeletePodResource(ctx, namespace, name); err != nil {
+if err := pc.forceDeletePodResource(ctx, namespace, name); err != nil {
 span.SetStatus(ocstatus.FromError(err))
 return err
 }
@@ -146,7 +149,7 @@ func (s *Server) deletePod(ctx context.Context, namespace, name string) error {
 return nil
 }
-func (s *Server) forceDeletePodResource(ctx context.Context, namespace, name string) error {
+func (pc *PodController) forceDeletePodResource(ctx context.Context, namespace, name string) error {
 ctx, span := trace.StartSpan(ctx, "forceDeletePodResource")
 defer span.End()
 ctx = span.WithFields(ctx, log.Fields{
@@ -155,7 +158,7 @@ func (s *Server) forceDeletePodResource(ctx context.Context, namespace, name str
 })
 var grace int64
-if err := s.k8sClient.CoreV1().Pods(namespace).Delete(name, &metav1.DeleteOptions{GracePeriodSeconds: &grace}); err != nil {
+if err := pc.client.Pods(namespace).Delete(name, &metav1.DeleteOptions{GracePeriodSeconds: &grace}); err != nil {
 if errors.IsNotFound(err) {
 log.G(ctx).Debug("Pod does not exist in Kubernetes, nothing to delete")
 return nil
@@ -167,12 +170,12 @@ func (s *Server) forceDeletePodResource(ctx context.Context, namespace, name str
 }
 // updatePodStatuses syncs the providers pod status with the kubernetes pod status.
-func (s *Server) updatePodStatuses(ctx context.Context, q workqueue.RateLimitingInterface) {
+func (pc *PodController) updatePodStatuses(ctx context.Context, q workqueue.RateLimitingInterface) {
 ctx, span := trace.StartSpan(ctx, "updatePodStatuses")
 defer span.End()
 // Update all the pods with the provider status.
-pods, err := s.podInformer.Lister().List(labels.Everything())
+pods, err := pc.podsLister.List(labels.Everything())
 if err != nil {
 err = pkgerrors.Wrap(err, "error getting pod list")
 span.SetStatus(ocstatus.FromError(err))
@@ -183,7 +186,7 @@ func (s *Server) updatePodStatuses(ctx context.Context, q workqueue.RateLimiting
 for _, pod := range pods {
 if !shouldSkipPodStatusUpdate(pod) {
-s.enqueuePodStatusUpdate(ctx, q, pod)
+enqueuePodStatusUpdate(ctx, q, pod)
 }
 }
 }
@@ -194,7 +197,7 @@ func shouldSkipPodStatusUpdate(pod *corev1.Pod) bool {
 pod.Status.Reason == podStatusReasonProviderFailed
 }
-func (s *Server) updatePodStatus(ctx context.Context, pod *corev1.Pod) error {
+func (pc *PodController) updatePodStatus(ctx context.Context, pod *corev1.Pod) error {
 if shouldSkipPodStatusUpdate(pod) {
 return nil
 }
@@ -203,7 +206,7 @@ func (s *Server) updatePodStatus(ctx context.Context, pod *corev1.Pod) error {
 defer span.End()
 ctx = addPodAttributes(ctx, span, pod)
-status, err := s.provider.GetPodStatus(ctx, pod.Namespace, pod.Name)
+status, err := pc.provider.GetPodStatus(ctx, pod.Namespace, pod.Name)
 if err != nil {
 span.SetStatus(ocstatus.FromError(err))
 return pkgerrors.Wrap(err, "error retreiving pod status")
@@ -234,7 +237,7 @@ func (s *Server) updatePodStatus(ctx context.Context, pod *corev1.Pod) error {
 }
 }
-if _, err := s.k8sClient.CoreV1().Pods(pod.Namespace).UpdateStatus(pod); err != nil {
+if _, err := pc.client.Pods(pod.Namespace).UpdateStatus(pod); err != nil {
 span.SetStatus(ocstatus.FromError(err))
 return pkgerrors.Wrap(err, "error while updating pod status in kubernetes")
 }
@@ -247,7 +250,7 @@ func (s *Server) updatePodStatus(ctx context.Context, pod *corev1.Pod) error {
 return nil
 }
-func (s *Server) enqueuePodStatusUpdate(ctx context.Context, q workqueue.RateLimitingInterface, pod *corev1.Pod) {
+func enqueuePodStatusUpdate(ctx context.Context, q workqueue.RateLimitingInterface, pod *corev1.Pod) {
 if key, err := cache.MetaNamespaceKeyFunc(pod); err != nil {
 log.G(ctx).WithError(err).WithField("method", "enqueuePodStatusUpdate").Error("Error getting pod meta namespace key")
 } else {
@@ -255,24 +258,28 @@ func (s *Server) enqueuePodStatusUpdate(ctx context.Context, q workqueue.RateLim
 }
 }
-func (s *Server) podStatusHandler(ctx context.Context, key string) (retErr error) {
+func (pc *PodController) podStatusHandler(ctx context.Context, key string) (retErr error) {
 ctx, span := trace.StartSpan(ctx, "podStatusHandler")
 defer span.End()
-defer func() {
-span.SetStatus(ocstatus.FromError(retErr))
-}()
 ctx = span.WithField(ctx, "key", key)
+log.G(ctx).Debug("processing pod status update")
+defer func() {
+span.SetStatus(ocstatus.FromError(retErr))
+if retErr != nil {
+log.G(ctx).WithError(retErr).Error("Error processing pod status update")
+}
+}()
 namespace, name, err := cache.SplitMetaNamespaceKey(key)
 if err != nil {
 return pkgerrors.Wrap(err, "error spliting cache key")
 }
-pod, err := s.podInformer.Lister().Pods(namespace).Get(name)
+pod, err := pc.podsLister.Pods(namespace).Get(name)
 if err != nil {
 return pkgerrors.Wrap(err, "error looking up pod")
 }
-return s.updatePodStatus(ctx, pod)
+return pc.updatePodStatus(ctx, pod)
 }
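createOrUpdatePod only calls UpdatePod when hashPodSpec(pp.Spec) and hashPodSpec(pod.Spec) differ. The repo's hashing code is not shown in full in these hunks (only the final return uint64(hash.Sum32())), so the sketch below merely illustrates the idea under the assumption that a deterministic serialization of the spec is hashed with 32-bit FNV; hashSpec is an invented name, not the repo's implementation.

package example

import (
    "encoding/json"
    "hash/fnv"

    corev1 "k8s.io/api/core/v1"
)

// hashSpec is an illustrative stand-in for hashPodSpec: two specs that
// serialize identically hash identically, so an unchanged spec skips UpdatePod.
func hashSpec(spec corev1.PodSpec) uint64 {
    h := fnv.New32a()
    // json.Marshal of a PodSpec is deterministic enough for an illustration;
    // the real implementation may hash the object differently.
    b, _ := json.Marshal(spec)
    h.Write(b)
    return uint64(h.Sum32())
}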

View File

@@ -64,8 +64,8 @@ func (m *mockProvider) GetPods(_ context.Context) ([]*corev1.Pod, error) {
 return ls, nil
 }
-type TestServer struct {
-*Server
+type TestController struct {
+*PodController
 mock *mockProvider
 client *fake.Clientset
 }
@@ -74,24 +74,22 @@ func newMockProvider() *mockProvider {
 return &mockProvider{pods: make(map[string]*corev1.Pod)}
 }
-func newTestServer() *TestServer {
+func newTestController() *TestController {
 fk8s := fake.NewSimpleClientset()
 rm := testutil.FakeResourceManager()
 p := newMockProvider()
-tsvr := &TestServer{
-Server: &Server{
-namespace: "default",
-nodeName: "vk123",
+return &TestController{
+PodController: &PodController{
+client: fk8s.CoreV1(),
 provider: p,
 resourceManager: rm,
-k8sClient: fk8s,
+recorder: testutil.FakeEventRecorder(5),
 },
 mock: p,
 client: fk8s,
 }
-return tsvr
 }
 func TestPodHashingEqual(t *testing.T) {
@@ -169,7 +167,7 @@ func TestPodHashingDifferent(t *testing.T) {
 }
 func TestPodCreateNewPod(t *testing.T) {
-svr := newTestServer()
+svr := newTestController()
 pod := &corev1.Pod{}
 pod.ObjectMeta.Namespace = "default"
@@ -189,8 +187,8 @@ func TestPodCreateNewPod(t *testing.T) {
 },
 }
-er := testutil.FakeEventRecorder(5)
-err := svr.createOrUpdatePod(context.Background(), pod, er)
+err := svr.createOrUpdatePod(context.Background(), pod)
 assert.Check(t, is.Nil(err))
 // createOrUpdate called CreatePod but did not call UpdatePod because the pod did not exist
 assert.Check(t, is.Equal(svr.mock.creates, 1))
@@ -198,7 +196,7 @@ func TestPodCreateNewPod(t *testing.T) {
 }
 func TestPodUpdateExisting(t *testing.T) {
-svr := newTestServer()
+svr := newTestController()
 pod := &corev1.Pod{}
 pod.ObjectMeta.Namespace = "default"
@@ -241,8 +239,7 @@ func TestPodUpdateExisting(t *testing.T) {
 },
 }
-er := testutil.FakeEventRecorder(5)
-err = svr.createOrUpdatePod(context.Background(), pod2, er)
+err = svr.createOrUpdatePod(context.Background(), pod2)
 assert.Check(t, is.Nil(err))
 // createOrUpdate didn't call CreatePod but did call UpdatePod because the spec changed
@@ -251,7 +248,7 @@ func TestPodUpdateExisting(t *testing.T) {
 }
 func TestPodNoSpecChange(t *testing.T) {
-svr := newTestServer()
+svr := newTestController()
 pod := &corev1.Pod{}
 pod.ObjectMeta.Namespace = "default"
@@ -276,8 +273,7 @@ func TestPodNoSpecChange(t *testing.T) {
 assert.Check(t, is.Equal(svr.mock.creates, 1))
 assert.Check(t, is.Equal(svr.mock.updates, 0))
-er := testutil.FakeEventRecorder(5)
-err = svr.createOrUpdatePod(context.Background(), pod, er)
+err = svr.createOrUpdatePod(context.Background(), pod)
 assert.Check(t, is.Nil(err))
 // createOrUpdate didn't call CreatePod or UpdatePod, spec didn't change

View File

@@ -22,21 +22,21 @@ import (
 "sync"
 "time"
+"github.com/cpuguy83/strongerrors"
 "github.com/cpuguy83/strongerrors/status/ocstatus"
 pkgerrors "github.com/pkg/errors"
+"github.com/virtual-kubelet/virtual-kubelet/log"
+"github.com/virtual-kubelet/virtual-kubelet/manager"
 "github.com/virtual-kubelet/virtual-kubelet/trace"
 corev1 "k8s.io/api/core/v1"
 "k8s.io/apimachinery/pkg/api/errors"
 "k8s.io/apimachinery/pkg/util/wait"
-v1 "k8s.io/client-go/informers/core/v1"
-"k8s.io/client-go/kubernetes/scheme"
-typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1"
+corev1informers "k8s.io/client-go/informers/core/v1"
+corev1client "k8s.io/client-go/kubernetes/typed/core/v1"
 corev1listers "k8s.io/client-go/listers/core/v1"
 "k8s.io/client-go/tools/cache"
 "k8s.io/client-go/tools/record"
 "k8s.io/client-go/util/workqueue"
-"github.com/virtual-kubelet/virtual-kubelet/log"
 )
 // PodLifecycleHandler defines the interface used by the PodController to react
@@ -74,42 +74,75 @@ type PodNotifier interface {
 // PodController is the controller implementation for Pod resources.
 type PodController struct {
-// server is the instance to which this controller belongs.
-server *Server
+provider PodLifecycleHandler
 // podsInformer is an informer for Pod resources.
-podsInformer v1.PodInformer
+podsInformer corev1informers.PodInformer
 // podsLister is able to list/get Pod resources from a shared informer's store.
 podsLister corev1listers.PodLister
-// workqueue is a rate limited work queue.
-// This is used to queue work to be processed instead of performing it as soon as a change happens.
-// This means we can ensure we only process a fixed amount of resources at a time, and makes it easy to ensure we are never processing the same item simultaneously in two different workers.
-workqueue workqueue.RateLimitingInterface
 // recorder is an event recorder for recording Event resources to the Kubernetes API.
 recorder record.EventRecorder
-// inSync is a channel which will be closed once the pod controller has become in-sync with apiserver
-// it will never close if startup fails, or if the run context is cancelled prior to initialization completing
-inSyncCh chan struct{}
+// ready is a channel which will be closed once the pod controller is fully up and running.
+// this channel will never be closed if there is an error on startup.
+ready chan struct{}
+client corev1client.PodsGetter
+resourceManager *manager.ResourceManager // TODO: can we eliminate this?
 }
-// NewPodController returns a new instance of PodController.
-func NewPodController(server *Server) *PodController {
-// Create an event broadcaster.
-eventBroadcaster := record.NewBroadcaster()
-eventBroadcaster.StartLogging(log.L.Infof)
-eventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: server.k8sClient.CoreV1().Events("")})
-recorder := eventBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: fmt.Sprintf("%s/pod-controller", server.nodeName)})
-// Create an instance of PodController having a work queue that uses the rate limiter created above.
-pc := &PodController{
-server: server,
-podsInformer: server.podInformer,
-podsLister: server.podInformer.Lister(),
-workqueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "pods"),
-recorder: recorder,
-inSyncCh: make(chan struct{}),
-}
+// PodControllerConfig is used to configure a new PodController.
+type PodControllerConfig struct {
+// PodClient is used to perform actions on the k8s API, such as updating pod status
+// This field is required
+PodClient corev1client.PodsGetter
+// PodInformer is used as a local cache for pods
+// This should be configured to only look at pods scheduled to the node which the controller will be managing
+PodInformer corev1informers.PodInformer
+EventRecorder record.EventRecorder
+Provider PodLifecycleHandler
+// TODO: get rid of this
+ResourceManager *manager.ResourceManager
+}
+func NewPodController(cfg PodControllerConfig) (*PodController, error) {
+if cfg.PodClient == nil {
+return nil, strongerrors.InvalidArgument(pkgerrors.New("must set core client"))
+}
+if cfg.EventRecorder == nil {
+return nil, strongerrors.InvalidArgument(pkgerrors.New("must set event recorder"))
+}
+if cfg.PodInformer == nil {
+return nil, strongerrors.InvalidArgument(pkgerrors.New("must set informer"))
+}
+return &PodController{
+client: cfg.PodClient,
+podsInformer: cfg.PodInformer,
+podsLister: cfg.PodInformer.Lister(),
+provider: cfg.Provider,
+resourceManager: cfg.ResourceManager,
+ready: make(chan struct{}),
+recorder: cfg.EventRecorder,
+}, nil
+}
+// Run will set up the event handlers for types we are interested in, as well as syncing informer caches and starting workers.
+// It will block until the context is cancelled, at which point it will shutdown the work queue and wait for workers to finish processing their current work items.
+func (pc *PodController) Run(ctx context.Context, podSyncWorkers int) error {
+k8sQ := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "syncPodsFromKubernetes")
+defer k8sQ.ShutDown()
+podStatusQueue := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "syncPodStatusFromProvider")
+pc.runProviderSyncWorkers(ctx, podStatusQueue, podSyncWorkers)
+pc.runSyncFromProvider(ctx, podStatusQueue)
+defer podStatusQueue.ShutDown()
 // Set up event handlers for when Pod resources change.
 pc.podsInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
@@ -117,7 +150,7 @@ func NewPodController(server *Server) *PodController {
 if key, err := cache.MetaNamespaceKeyFunc(pod); err != nil {
 log.L.Error(err)
 } else {
-pc.workqueue.AddRateLimited(key)
+k8sQ.AddRateLimited(key)
 }
 },
 UpdateFunc: func(oldObj, newObj interface{}) {
@@ -136,49 +169,39 @@ func NewPodController(server *Server) *PodController {
 if key, err := cache.MetaNamespaceKeyFunc(newPod); err != nil {
 log.L.Error(err)
 } else {
-pc.workqueue.AddRateLimited(key)
+k8sQ.AddRateLimited(key)
 }
 },
 DeleteFunc: func(pod interface{}) {
 if key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(pod); err != nil {
 log.L.Error(err)
 } else {
-pc.workqueue.AddRateLimited(key)
+k8sQ.AddRateLimited(key)
 }
 },
 })
-// Return the instance of PodController back to the caller.
-return pc
-}
-// Run will set up the event handlers for types we are interested in, as well as syncing informer caches and starting workers.
-// It will block until stopCh is closed, at which point it will shutdown the work queue and wait for workers to finish processing their current work items.
-func (pc *PodController) Run(ctx context.Context, threadiness int) error {
-defer pc.workqueue.ShutDown()
-// Wait for the caches to be synced before starting workers.
+// Wait for the caches to be synced *before* starting workers.
 if ok := cache.WaitForCacheSync(ctx.Done(), pc.podsInformer.Informer().HasSynced); !ok {
 return pkgerrors.New("failed to wait for caches to sync")
 }
 log.G(ctx).Info("Pod cache in-sync")
-close(pc.inSyncCh)
 // Perform a reconciliation step that deletes any dangling pods from the provider.
 // This happens only when the virtual-kubelet is starting, and operates on a "best-effort" basis.
 // If by any reason the provider fails to delete a dangling pod, it will stay in the provider and deletion won't be retried.
-pc.deleteDanglingPods(ctx, threadiness)
+pc.deleteDanglingPods(ctx, podSyncWorkers)
-// Launch "threadiness" workers to process Pod resources.
 log.G(ctx).Info("starting workers")
-for id := 0; id < threadiness; id++ {
+for id := 0; id < podSyncWorkers; id++ {
 go wait.Until(func() {
 // Use the worker's "index" as its ID so we can use it for tracing.
-pc.runWorker(ctx, strconv.Itoa(id))
+pc.runWorker(ctx, strconv.Itoa(id), k8sQ)
 }, time.Second, ctx.Done())
 }
+close(pc.ready)
 log.G(ctx).Info("started workers")
 <-ctx.Done()
 log.G(ctx).Info("shutting down workers")
@@ -186,14 +209,21 @@ func (pc *PodController) Run(ctx context.Context, threadiness int) error {
 return nil
 }
+// Ready returns a channel which gets closed once the PodController is ready to handle scheduled pods.
+// This channel will never close if there is an error on startup.
+// The status of this channel after sthudown is indeterminate.
+func (pc *PodController) Ready() <-chan struct{} {
+return pc.ready
+}
 // runWorker is a long-running function that will continually call the processNextWorkItem function in order to read and process an item on the work queue.
-func (pc *PodController) runWorker(ctx context.Context, workerId string) {
-for pc.processNextWorkItem(ctx, workerId) {
+func (pc *PodController) runWorker(ctx context.Context, workerId string, q workqueue.RateLimitingInterface) {
+for pc.processNextWorkItem(ctx, workerId, q) {
 }
 }
 // processNextWorkItem will read a single work item off the work queue and attempt to process it,by calling the syncHandler.
-func (pc *PodController) processNextWorkItem(ctx context.Context, workerId string) bool {
+func (pc *PodController) processNextWorkItem(ctx context.Context, workerId string, q workqueue.RateLimitingInterface) bool {
 // We create a span only after popping from the queue so that we can get an adequate picture of how long it took to process the item.
 ctx, span := trace.StartSpan(ctx, "processNextWorkItem")
@@ -201,7 +231,7 @@ func (pc *PodController) processNextWorkItem(ctx context.Context, workerId strin
 // Add the ID of the current worker as an attribute to the current span.
 ctx = span.WithField(ctx, "workerId", workerId)
-return handleQueueItem(ctx, pc.workqueue, pc.syncHandler)
+return handleQueueItem(ctx, q, pc.syncHandler)
 }
 // syncHandler compares the actual state with the desired, and attempts to converge the two.
@@ -232,7 +262,7 @@ func (pc *PodController) syncHandler(ctx context.Context, key string) error {
 }
 // At this point we know the Pod resource doesn't exist, which most probably means it was deleted.
 // Hence, we must delete it from the provider if it still exists there.
-if err := pc.server.deletePod(ctx, namespace, name); err != nil {
+if err := pc.deletePod(ctx, namespace, name); err != nil {
 err := pkgerrors.Wrapf(err, "failed to delete pod %q in the provider", loggablePodNameFromCoordinates(namespace, name))
 span.SetStatus(ocstatus.FromError(err))
 return err
@@ -254,7 +284,7 @@ func (pc *PodController) syncPodInProvider(ctx context.Context, pod *corev1.Pod)
 // Check whether the pod has been marked for deletion.
 // If it does, guarantee it is deleted in the provider and Kubernetes.
 if pod.DeletionTimestamp != nil {
-if err := pc.server.deletePod(ctx, pod.Namespace, pod.Name); err != nil {
+if err := pc.deletePod(ctx, pod.Namespace, pod.Name); err != nil {
 err := pkgerrors.Wrapf(err, "failed to delete pod %q in the provider", loggablePodName(pod))
 span.SetStatus(ocstatus.FromError(err))
 return err
@@ -269,7 +299,7 @@ func (pc *PodController) syncPodInProvider(ctx context.Context, pod *corev1.Pod)
 }
 // Create or update the pod in the provider.
-if err := pc.server.createOrUpdatePod(ctx, pod, pc.recorder); err != nil {
+if err := pc.createOrUpdatePod(ctx, pod); err != nil {
 err := pkgerrors.Wrapf(err, "failed to sync pod %q in the provider", loggablePodName(pod))
 span.SetStatus(ocstatus.FromError(err))
 return err
@@ -283,7 +313,7 @@ func (pc *PodController) deleteDanglingPods(ctx context.Context, threadiness int
 defer span.End()
 // Grab the list of pods known to the provider.
-pps, err := pc.server.provider.GetPods(ctx)
+pps, err := pc.provider.GetPods(ctx)
 if err != nil {
 err := pkgerrors.Wrap(err, "failed to fetch the list of pods from the provider")
 span.SetStatus(ocstatus.FromError(err))
@@ -332,7 +362,7 @@ func (pc *PodController) deleteDanglingPods(ctx context.Context, threadiness int
 // Add the pod's attributes to the current span.
 ctx = addPodAttributes(ctx, span, pod)
 // Actually delete the pod.
-if err := pc.server.deletePod(ctx, pod.Namespace, pod.Name); err != nil {
+if err := pc.deletePod(ctx, pod.Namespace, pod.Name); err != nil {
 span.SetStatus(ocstatus.FromError(err))
 log.G(ctx).Errorf("failed to delete pod %q in provider", loggablePodName(pod))
 } else {
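Run now builds two named rate-limited queues (syncPodsFromKubernetes and syncPodStatusFromProvider) locally instead of keeping a single workqueue field on the struct, and drains them with podSyncWorkers workers via handleQueueItem. For readers unfamiliar with that client-go pattern, here is a generic, hedged sketch of a rate-limited queue drained by a fixed worker pool; it is not the repo's handleQueueItem, and runQueueWorkers/processNext are invented names.

package example

import (
    "context"
    "time"

    "k8s.io/apimachinery/pkg/util/wait"
    "k8s.io/client-go/util/workqueue"
)

// runQueueWorkers drains a rate-limited queue with a fixed number of workers
// until ctx is cancelled, re-queueing keys whose handler returns an error.
func runQueueWorkers(ctx context.Context, name string, workers int, handler func(ctx context.Context, key string) error) workqueue.RateLimitingInterface {
    q := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), name)
    for i := 0; i < workers; i++ {
        go wait.Until(func() {
            for processNext(ctx, q, handler) {
            }
        }, time.Second, ctx.Done())
    }
    return q
}

func processNext(ctx context.Context, q workqueue.RateLimitingInterface, handler func(ctx context.Context, key string) error) bool {
    obj, shutdown := q.Get()
    if shutdown {
        return false
    }
    // Done tells the queue we finished with this item, whatever the outcome.
    defer q.Done(obj)

    key, ok := obj.(string)
    if !ok {
        // Unexpected item type: drop it so it is not retried forever.
        q.Forget(obj)
        return true
    }
    if err := handler(ctx, key); err != nil {
        // Transient failure: put the key back, subject to rate limiting.
        q.AddRateLimited(key)
        return true
    }
    q.Forget(obj)
    return true
}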

View File

@@ -2,11 +2,14 @@ package vkubelet
 import (
 "context"
+"strconv"
+"time"
 "github.com/cpuguy83/strongerrors/status/ocstatus"
 pkgerrors "github.com/pkg/errors"
 "github.com/virtual-kubelet/virtual-kubelet/log"
 "github.com/virtual-kubelet/virtual-kubelet/trace"
+corev1 "k8s.io/api/core/v1"
 "k8s.io/client-go/util/workqueue"
 )
@@ -26,7 +29,10 @@ func handleQueueItem(ctx context.Context, q workqueue.RateLimitingInterface, han
 return false
 }
+log.G(ctx).Debug("Got queue object")
 err := func(obj interface{}) error {
+defer log.G(ctx).Debug("Processed queue item")
 // We call Done here so the work queue knows we have finished processing this item.
 // We also must remember to call Forget if we do not want this work item being re-queued.
 // For example, we do not call Forget if a transient error occurs.
@@ -43,6 +49,7 @@ func handleQueueItem(ctx context.Context, q workqueue.RateLimitingInterface, han
 log.G(ctx).Warnf("expected string in work queue item but got %#v", obj)
 return nil
 }
 // Add the current key as an attribute to the current span.
 ctx = span.WithField(ctx, "key", key)
 // Run the syncHandler, passing it the namespace/name string of the Pod resource to be synced.
@@ -71,3 +78,64 @@ func handleQueueItem(ctx context.Context, q workqueue.RateLimitingInterface, han
 return true
 }
+func (pc *PodController) runProviderSyncWorkers(ctx context.Context, q workqueue.RateLimitingInterface, numWorkers int) {
+for i := 0; i < numWorkers; i++ {
+go func(index int) {
+workerID := strconv.Itoa(index)
+pc.runProviderSyncWorker(ctx, workerID, q)
+}(i)
+}
+}
+func (pc *PodController) runProviderSyncWorker(ctx context.Context, workerID string, q workqueue.RateLimitingInterface) {
+for pc.processPodStatusUpdate(ctx, workerID, q) {
+}
+}
+func (pc *PodController) processPodStatusUpdate(ctx context.Context, workerID string, q workqueue.RateLimitingInterface) bool {
+ctx, span := trace.StartSpan(ctx, "processPodStatusUpdate")
+defer span.End()
+// Add the ID of the current worker as an attribute to the current span.
+ctx = span.WithField(ctx, "workerID", workerID)
+return handleQueueItem(ctx, q, pc.podStatusHandler)
+}
+// providerSyncLoop syncronizes pod states from the provider back to kubernetes
+// Deprecated: This is only used when the provider does not support async updates
+// Providers should implement async update support, even if it just means copying
+// something like this in.
+func (pc *PodController) providerSyncLoop(ctx context.Context, q workqueue.RateLimitingInterface) {
+const sleepTime = 5 * time.Second
+t := time.NewTimer(sleepTime)
+defer t.Stop()
+for {
+select {
+case <-ctx.Done():
+return
+case <-t.C:
+t.Stop()
+ctx, span := trace.StartSpan(ctx, "syncActualState")
+pc.updatePodStatuses(ctx, q)
+span.End()
+// restart the timer
+t.Reset(sleepTime)
+}
+}
+}
+func (pc *PodController) runSyncFromProvider(ctx context.Context, q workqueue.RateLimitingInterface) {
+if pn, ok := pc.provider.(PodNotifier); ok {
+pn.NotifyPods(ctx, func(pod *corev1.Pod) {
+enqueuePodStatusUpdate(ctx, q, pod)
+})
+} else {
+go pc.providerSyncLoop(ctx, q)
+}
+}
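runSyncFromProvider prefers the async path: when the provider implements PodNotifier, its NotifyPods callback feeds the status queue directly and the deprecated providerSyncLoop polling is skipped. Below is a hedged sketch of the provider side of that contract, showing only the notification half (a real provider must also implement PodLifecycleHandler); notifyingProvider and statusChanged are invented names.

package example

import (
    "context"
    "sync"

    corev1 "k8s.io/api/core/v1"
)

// notifyingProvider sketches a provider that supports async status updates.
// It stores the callback handed to it via NotifyPods and invokes it whenever
// it observes a pod status change, instead of waiting to be polled.
type notifyingProvider struct {
    mu     sync.Mutex
    notify func(*corev1.Pod)
}

// NotifyPods is the method the PodController type-asserts for (PodNotifier).
func (p *notifyingProvider) NotifyPods(ctx context.Context, notify func(*corev1.Pod)) {
    p.mu.Lock()
    defer p.mu.Unlock()
    p.notify = notify
}

// statusChanged would be called from the provider's own reconcile machinery
// whenever a pod's status changes in the backing system.
func (p *notifyingProvider) statusChanged(pod *corev1.Pod) {
    p.mu.Lock()
    notify := p.notify
    p.mu.Unlock()
    if notify != nil {
        notify(pod)
    }
}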

View File

@@ -1,150 +0,0 @@
-package vkubelet
-import (
-"context"
-"strconv"
-"time"
-"github.com/virtual-kubelet/virtual-kubelet/manager"
-"github.com/virtual-kubelet/virtual-kubelet/trace"
-corev1 "k8s.io/api/core/v1"
-corev1informers "k8s.io/client-go/informers/core/v1"
-"k8s.io/client-go/kubernetes"
-"k8s.io/client-go/util/workqueue"
-)
-const (
-podStatusReasonProviderFailed = "ProviderFailed"
-)
-// Server masquarades itself as a kubelet and allows for the virtual node to be backed by non-vm/node providers.
-type Server struct {
-namespace string
-nodeName string
-k8sClient kubernetes.Interface
-provider PodLifecycleHandler
-resourceManager *manager.ResourceManager
-podSyncWorkers int
-podInformer corev1informers.PodInformer
-readyCh chan struct{}
-}
-// Config is used to configure a new server.
-type Config struct {
-Client *kubernetes.Clientset
-Namespace string
-NodeName string
-Provider PodLifecycleHandler
-ResourceManager *manager.ResourceManager
-PodSyncWorkers int
-PodInformer corev1informers.PodInformer
-}
-// New creates a new virtual-kubelet server.
-// This is the entrypoint to this package.
-//
-// This creates but does not start the server.
-// You must call `Run` on the returned object to start the server.
-func New(cfg Config) *Server {
-return &Server{
-nodeName: cfg.NodeName,
-namespace: cfg.Namespace,
-k8sClient: cfg.Client,
-resourceManager: cfg.ResourceManager,
-provider: cfg.Provider,
-podSyncWorkers: cfg.PodSyncWorkers,
-podInformer: cfg.PodInformer,
-readyCh: make(chan struct{}),
-}
-}
-// Run creates and starts an instance of the pod controller, blocking until it stops.
-//
-// Note that this does not setup the HTTP routes that are used to expose pod
-// info to the Kubernetes API Server, such as logs, metrics, exec, etc.
-// See `AttachPodRoutes` and `AttachMetricsRoutes` to set these up.
-func (s *Server) Run(ctx context.Context) error {
-q := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "podStatusUpdate")
-s.runProviderSyncWorkers(ctx, q)
-if pn, ok := s.provider.(PodNotifier); ok {
-pn.NotifyPods(ctx, func(pod *corev1.Pod) {
-s.enqueuePodStatusUpdate(ctx, q, pod)
-})
-} else {
-go s.providerSyncLoop(ctx, q)
-}
-pc := NewPodController(s)
-go func() {
-select {
-case <-pc.inSyncCh:
-case <-ctx.Done():
-}
-close(s.readyCh)
-}()
-return pc.Run(ctx, s.podSyncWorkers)
-}
-// Ready returns a channel which will be closed once the VKubelet is running
-func (s *Server) Ready() <-chan struct{} {
-// TODO: right now all this waits on is the in-sync channel. Later, we might either want to expose multiple types
-// of ready, for example:
-// * In Sync
-// * Control Loop running
-// * Provider state synchronized with API Server state
-return s.readyCh
-}
-// providerSyncLoop syncronizes pod states from the provider back to kubernetes
-// Deprecated: This is only used when the provider does not support async updates
-// Providers should implement async update support, even if it just means copying
-// something like this in.
-func (s *Server) providerSyncLoop(ctx context.Context, q workqueue.RateLimitingInterface) {
-const sleepTime = 5 * time.Second
-t := time.NewTimer(sleepTime)
-defer t.Stop()
-for {
-select {
-case <-ctx.Done():
-return
-case <-t.C:
-t.Stop()
-ctx, span := trace.StartSpan(ctx, "syncActualState")
-s.updatePodStatuses(ctx, q)
-span.End()
-// restart the timer
-t.Reset(sleepTime)
-}
-}
-}
-func (s *Server) runProviderSyncWorkers(ctx context.Context, q workqueue.RateLimitingInterface) {
-for i := 0; i < s.podSyncWorkers; i++ {
-go func(index int) {
-workerID := strconv.Itoa(index)
-s.runProviderSyncWorker(ctx, workerID, q)
-}(i)
-}
-}
-func (s *Server) runProviderSyncWorker(ctx context.Context, workerID string, q workqueue.RateLimitingInterface) {
-for s.processPodStatusUpdate(ctx, workerID, q) {
-}
-}
-func (s *Server) processPodStatusUpdate(ctx context.Context, workerID string, q workqueue.RateLimitingInterface) bool {
-ctx, span := trace.StartSpan(ctx, "processPodStatusUpdate")
-defer span.End()
-// Add the ID of the current worker as an attribute to the current span.
-ctx = span.WithField(ctx, "workerID", workerID)
-return handleQueueItem(ctx, q, s.podStatusHandler)
-}