diff --git a/go.mod b/go.mod index 2b1cd3129..3c446ca84 100644 --- a/go.mod +++ b/go.mod @@ -18,8 +18,10 @@ require ( github.com/spf13/cobra v0.0.5 github.com/spf13/pflag v1.0.5 go.opencensus.io v0.21.0 + go.uber.org/goleak v1.1.10 golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e golang.org/x/sys v0.0.0-20200323222414-85ca7c5b95cd + golang.org/x/time v0.0.0-20190308202827-9d24e82272b4 gotest.tools v2.2.0+incompatible k8s.io/api v0.18.6 k8s.io/apimachinery v0.18.6 diff --git a/go.sum b/go.sum index 2d401e3e3..c66c3063d 100644 --- a/go.sum +++ b/go.sum @@ -608,6 +608,8 @@ go.opencensus.io v0.21.0/go.mod h1:mSImk1erAIZhrmZN+AvHh14ztQfjbGwt4TtuofqLduU= go.uber.org/atomic v1.3.2/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE= go.uber.org/atomic v1.4.0 h1:cxzIVoETapQEqDhQu3QfnvXAV4AlzcvUCxkVUFw3+EU= go.uber.org/atomic v1.4.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE= +go.uber.org/goleak v1.1.10 h1:z+mqJhf6ss6BSfSM671tgKyZBFPTTJM+HLxnhPC3wu0= +go.uber.org/goleak v1.1.10/go.mod h1:8a7PlsEVH3e/a/GLqe5IIrQx6GzcnRmZEufDUTk4A7A= go.uber.org/multierr v1.1.0 h1:HoEmRHQPVSqub6w2z2d2EOVs2fjyFRGyofhKuyDq0QI= go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0= go.uber.org/zap v1.10.0 h1:ORx85nbTijNz8ljznvCMR1ZBIPKFn3jQrag10X2AsuM= @@ -643,6 +645,7 @@ golang.org/x/lint v0.0.0-20190301231843-5614ed5bae6f/go.mod h1:UVdnD1Gm6xHRNCYTk golang.org/x/lint v0.0.0-20190313153728-d0100b6bd8b3/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc= golang.org/x/lint v0.0.0-20190409202823-959b441ac422 h1:QzoH/1pFpZguR8NrRHLcO6jKqfv2zpuSqZLgdm7ZmjI= golang.org/x/lint v0.0.0-20190409202823-959b441ac422/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc= +golang.org/x/lint v0.0.0-20190930215403-16217165b5de/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc= golang.org/x/mobile v0.0.0-20190312151609-d3739f865fa6/go.mod h1:z+o9i4GpDbdi3rU15maQ/Ox0txvL9dWGYEHz965HBQE= golang.org/x/mod v0.0.0-20190513183733-4bf6d317e70e/go.mod h1:mXi4GBBbnImb6dmsKGUJ2LatrhH/nqhxcFungHvyanc= golang.org/x/net v0.0.0-20170114055629-f2499483f923/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= @@ -764,6 +767,8 @@ golang.org/x/tools v0.0.0-20190621195816-6e04913cbbac/go.mod h1:/rFqwRUd4F7ZHNgw golang.org/x/tools v0.0.0-20190909030654-5b82db07426d/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= golang.org/x/tools v0.0.0-20190920225731-5eefd052ad72 h1:bw9doJza/SFBEweII/rHQh338oozWyiFsBRHtrflcws= golang.org/x/tools v0.0.0-20190920225731-5eefd052ad72/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= +golang.org/x/tools v0.0.0-20191108193012-7d206e10da11 h1:Yq9t9jnGoR+dBuitxdo9l6Q7xh/zOyNnYUtDKaQ3x0E= +golang.org/x/tools v0.0.0-20191108193012-7d206e10da11/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= diff --git a/internal/queue/queue.go b/internal/queue/queue.go new file mode 100644 index 000000000..8f73477be --- /dev/null +++ b/internal/queue/queue.go @@ -0,0 +1,196 @@ +// Copyright © 2017 The virtual-kubelet authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package queue
+
+import (
+	"context"
+	"fmt"
+	"sync"
+	"time"
+
+	pkgerrors "github.com/pkg/errors"
+	"github.com/virtual-kubelet/virtual-kubelet/log"
+	"github.com/virtual-kubelet/virtual-kubelet/trace"
+	"k8s.io/apimachinery/pkg/util/wait"
+	"k8s.io/client-go/util/workqueue"
+)
+
+const (
+	// MaxRetries is the number of times we try to process a given key before permanently forgetting it.
+	MaxRetries = 20
+)
+
+// ItemHandler is a callback that handles a single key on the Queue
+type ItemHandler func(ctx context.Context, key string) error
+
+// Queue implements a wrapper around workqueue with native VK instrumentation
+type Queue struct {
+	lock      sync.Mutex
+	running   bool
+	name      string
+	workqueue workqueue.RateLimitingInterface
+	handler   ItemHandler
+}
+
+// New creates a queue
+//
+// It expects an item rate limiter, and a friendly name which is used in logs, and
+// in the internal Kubernetes metrics.
+func New(ratelimiter workqueue.RateLimiter, name string, handler ItemHandler) *Queue {
+	return &Queue{
+		name:      name,
+		workqueue: workqueue.NewNamedRateLimitingQueue(ratelimiter, name),
+		handler:   handler,
+	}
+}
+
+// Enqueue enqueues the key in a rate-limited fashion
+func (q *Queue) Enqueue(key string) {
+	q.workqueue.AddRateLimited(key)
+}
+
+// EnqueueWithoutRateLimit enqueues the key without a rate limit
+func (q *Queue) EnqueueWithoutRateLimit(key string) {
+	q.workqueue.Add(key)
+}
+
+// Forget forgets the key
+func (q *Queue) Forget(key string) {
+	q.workqueue.Forget(key)
+}
+
+// EnqueueAfter enqueues the item after the given duration
+//
+// Since it wraps workqueue semantics, if an item is enqueued with a delay and is also scheduled for immediate work,
+// the immediate item is processed first, and the item is processed again once the delayed enqueue fires.
+func (q *Queue) EnqueueAfter(key string, after time.Duration) {
+	q.workqueue.AddAfter(key, after)
+}
+
+// Empty returns true if the queue has no items in it
+//
+// It should only be used for debugging, as delayed items are not counted, which can be misleading
+func (q *Queue) Empty() bool {
+	return q.workqueue.Len() == 0
+}
+
+// Run starts the workers
+//
+// It blocks until the context is cancelled and all of the workers exit.
+func (q *Queue) Run(ctx context.Context, workers int) {
+	if workers <= 0 {
+		panic(fmt.Sprintf("Workers must be greater than 0, got: %d", workers))
+	}
+
+	q.lock.Lock()
+	if q.running {
+		panic(fmt.Sprintf("Queue %s is already running", q.name))
+	}
+	q.running = true
+	q.lock.Unlock()
+	defer func() {
+		q.lock.Lock()
+		defer q.lock.Unlock()
+		q.running = false
+	}()
+
+	// Make sure all workers are stopped before we finish up.
+	ctx, cancel := context.WithCancel(ctx)
+	defer cancel()
+
+	group := &wait.Group{}
+	for i := 0; i < workers; i++ {
+		i := i // Capture the loop variable so each worker goroutine gets its own ID.
+		group.StartWithContext(ctx, func(ctx context.Context) {
+			q.worker(ctx, i)
+		})
+	}
+	defer group.Wait()
+	<-ctx.Done()
+	q.workqueue.ShutDown()
+}
+
+func (q *Queue) worker(ctx context.Context, i int) {
+	ctx = log.WithLogger(ctx, log.G(ctx).WithFields(map[string]interface{}{
+		"workerId": i,
+		"Queue":    q.name,
+	}))
+	for q.handleQueueItem(ctx) {
+	}
+}
+
+// handleQueueItem handles a single item
+//
+// A return value of "false" indicates that further processing should be stopped.
+func (q *Queue) handleQueueItem(ctx context.Context) bool {
+	ctx, span := trace.StartSpan(ctx, "handleQueueItem")
+	defer span.End()
+
+	obj, shutdown := q.workqueue.Get()
+	if shutdown {
+		return false
+	}
+
+	// We expect strings to come off the work Queue.
+	// These are of the form namespace/name.
+	// We do this as the delayed nature of the work Queue means the items in the informer cache may actually be
+	// more up to date than when the item was initially put onto the workqueue.
+	key := obj.(string)
+	ctx = span.WithField(ctx, "key", key)
+	log.G(ctx).Debug("Got Queue object")
+
+	err := q.handleQueueItemObject(ctx, key)
+	if err != nil {
+		// We've actually hit an error, so we set the span's status based on the error.
+		span.SetStatus(err)
+		log.G(ctx).WithError(err).Error("Error processing Queue item")
+		return true
+	}
+	log.G(ctx).Debug("Processed Queue item")
+
+	return true
+}
+
+func (q *Queue) handleQueueItemObject(ctx context.Context, key string) error {
+	// This is a separate function / span, because the handleQueueItem span is the time spent waiting for the object
+	// plus the time spent handling the object. Instead, this function / span is scoped to a single object.
+	ctx, span := trace.StartSpan(ctx, "handleQueueItemObject")
+	defer span.End()
+
+	// Add the current key as an attribute to the current span.
+	ctx = span.WithField(ctx, "key", key)
+
+	// We call Done here so the work Queue knows we have finished processing this item.
+	// We also must remember to call Forget if we do not want this work item being re-queued.
+	// For example, we do not call Forget if a transient error occurs.
+	// Instead, the item is put back on the work Queue and attempted again after a back-off period.
+	defer q.workqueue.Done(key)
+
+	// Run the syncHandler, passing it the namespace/name string of the Pod resource to be synced.
+	if err := q.handler(ctx, key); err != nil {
+		if q.workqueue.NumRequeues(key) < MaxRetries {
+			// Put the item back on the work Queue to handle any transient errors.
+			log.G(ctx).WithError(err).Warnf("requeuing %q due to failed sync", key)
+			q.workqueue.AddRateLimited(key)
+			return nil
+		}
+		// We've exceeded the maximum retries, so we must Forget the key.
+		q.workqueue.Forget(key)
+		return pkgerrors.Wrapf(err, "forgetting %q due to maximum retries reached", key)
+	}
+	// Finally, if no error occurs we Forget this item so it does not get queued again until another change happens.
+ q.workqueue.Forget(key) + + return nil +} diff --git a/internal/queue/queue_test.go b/internal/queue/queue_test.go new file mode 100644 index 000000000..b2fe5f2db --- /dev/null +++ b/internal/queue/queue_test.go @@ -0,0 +1,115 @@ +package queue + +import ( + "context" + "errors" + "strconv" + "sync" + "testing" + "time" + + "github.com/sirupsen/logrus" + "github.com/virtual-kubelet/virtual-kubelet/log" + logruslogger "github.com/virtual-kubelet/virtual-kubelet/log/logrus" + "go.uber.org/goleak" + "golang.org/x/time/rate" + "gotest.tools/assert" + is "gotest.tools/assert/cmp" + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/client-go/util/workqueue" +) + +func TestQueueMaxRetries(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + logger := logrus.New() + logger.SetLevel(logrus.DebugLevel) + ctx = log.WithLogger(ctx, logruslogger.FromLogrus(logrus.NewEntry(logger))) + n := 0 + knownErr := errors.New("Testing error") + handler := func(ctx context.Context, key string) error { + n++ + return knownErr + } + wq := New(workqueue.NewMaxOfRateLimiter( + // The default upper bound is 1000 seconds. Let's not use that. + workqueue.NewItemExponentialFailureRateLimiter(5*time.Millisecond, 10*time.Millisecond), + &workqueue.BucketRateLimiter{Limiter: rate.NewLimiter(rate.Limit(10), 100)}, + ), t.Name(), handler) + wq.Enqueue("test") + + for n < MaxRetries { + assert.Assert(t, wq.handleQueueItem(ctx)) + } + + assert.Assert(t, is.Equal(n, MaxRetries)) + assert.Assert(t, is.Equal(0, wq.workqueue.Len())) +} + +func TestForget(t *testing.T) { + t.Parallel() + handler := func(ctx context.Context, key string) error { + panic("Should never be called") + } + wq := New(workqueue.DefaultItemBasedRateLimiter(), t.Name(), handler) + + wq.Forget("val") + assert.Assert(t, is.Equal(0, wq.workqueue.Len())) + + v := "test" + wq.EnqueueWithoutRateLimit(v) + assert.Assert(t, is.Equal(1, wq.workqueue.Len())) + + t.Skip("This is broken") + // Workqueue docs: + // Forget indicates that an item is finished being retried. Doesn't matter whether it's for perm failing + // or for success, we'll stop the rate limiter from tracking it. This only clears the `rateLimiter`, you + // still have to call `Done` on the queue. + // Even if you do this, it doesn't work: https://play.golang.com/p/8vfL_RCsFGI + assert.Assert(t, is.Equal(0, wq.workqueue.Len())) + +} + +func TestQueueTerminate(t *testing.T) { + t.Parallel() + defer goleak.VerifyNone(t, + // Ignore existing goroutines + goleak.IgnoreCurrent(), + // Ignore klog background flushers + goleak.IgnoreTopFunction("k8s.io/klog.(*loggingT).flushDaemon"), + goleak.IgnoreTopFunction("k8s.io/klog/v2.(*loggingT).flushDaemon"), + // Workqueue runs a goroutine in the background to handle background functions. 
AFAICT, they're unkillable + // and are designed to stop after a certain idle window + goleak.IgnoreTopFunction("k8s.io/client-go/util/workqueue.(*Type).updateUnfinishedWorkLoop"), + goleak.IgnoreTopFunction("k8s.io/client-go/util/workqueue.(*delayingType).waitingLoop"), + ) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + testMap := &sync.Map{} + handler := func(ctx context.Context, key string) error { + testMap.Store(key, struct{}{}) + return nil + } + + wq := New(workqueue.DefaultItemBasedRateLimiter(), t.Name(), handler) + group := &wait.Group{} + group.StartWithContext(ctx, func(ctx context.Context) { + wq.Run(ctx, 10) + }) + for i := 0; i < 1000; i++ { + wq.EnqueueWithoutRateLimit(strconv.Itoa(i)) + } + + for wq.workqueue.Len() > 0 { + time.Sleep(100 * time.Millisecond) + } + + for i := 0; i < 1000; i++ { + _, ok := testMap.Load(strconv.Itoa(i)) + assert.Assert(t, ok, "Item %d missing", i) + } + + cancel() + group.Wait() +} diff --git a/node/lifecycle_test.go b/node/lifecycle_test.go index 06171a3ea..3adc2f8b3 100644 --- a/node/lifecycle_test.go +++ b/node/lifecycle_test.go @@ -335,7 +335,7 @@ func testTerminalStatePodScenario(ctx context.Context, t *testing.T, s *system, // Start the pod controller assert.NilError(t, s.start(ctx)) - for s.pc.k8sQ.Len() > 0 { + for !s.pc.syncPodsFromKubernetes.Empty() { time.Sleep(10 * time.Millisecond) } diff --git a/node/pod.go b/node/pod.go index 5ff6ac867..87edb13ec 100644 --- a/node/pod.go +++ b/node/pod.go @@ -22,6 +22,7 @@ import ( "github.com/google/go-cmp/cmp" pkgerrors "github.com/pkg/errors" "github.com/virtual-kubelet/virtual-kubelet/internal/podutils" + "github.com/virtual-kubelet/virtual-kubelet/internal/queue" "github.com/virtual-kubelet/virtual-kubelet/log" "github.com/virtual-kubelet/virtual-kubelet/trace" corev1 "k8s.io/api/core/v1" @@ -29,7 +30,6 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/tools/cache" - "k8s.io/client-go/util/workqueue" ) const ( @@ -264,8 +264,8 @@ func (pc *PodController) updatePodStatus(ctx context.Context, podFromKubernetes // enqueuePodStatusUpdate updates our pod status map, and marks the pod as dirty in the workqueue. The pod must be DeepCopy'd // prior to enqueuePodStatusUpdate. 
-func (pc *PodController) enqueuePodStatusUpdate(ctx context.Context, q workqueue.RateLimitingInterface, pod *corev1.Pod) {
-	ctx, cancel := context.WithTimeout(ctx, notificationRetryPeriod*maxRetries)
+func (pc *PodController) enqueuePodStatusUpdate(ctx context.Context, pod *corev1.Pod) {
+	ctx, cancel := context.WithTimeout(ctx, notificationRetryPeriod*queue.MaxRetries)
 	defer cancel()
 
 	ctx, span := trace.StartSpan(ctx, "enqueuePodStatusUpdate")
@@ -330,11 +330,11 @@
 	}
 	kpod.lastPodStatusReceivedFromProvider = pod
 	kpod.Unlock()
-	q.AddRateLimited(key)
+	pc.syncPodStatusFromProvider.Enqueue(key)
 }
 
-func (pc *PodController) podStatusHandler(ctx context.Context, key string) (retErr error) {
-	ctx, span := trace.StartSpan(ctx, "podStatusHandler")
+func (pc *PodController) syncPodStatusFromProviderHandler(ctx context.Context, key string) (retErr error) {
+	ctx, span := trace.StartSpan(ctx, "syncPodStatusFromProviderHandler")
 	defer span.End()
 
 	ctx = span.WithField(ctx, "key", key)
@@ -363,8 +363,8 @@
 	return pc.updatePodStatus(ctx, pod, key)
 }
 
-func (pc *PodController) deletePodHandler(ctx context.Context, key string) (retErr error) {
-	ctx, span := trace.StartSpan(ctx, "processDeletionReconcilationWorkItem")
+func (pc *PodController) deletePodsFromKubernetesHandler(ctx context.Context, key string) (retErr error) {
+	ctx, span := trace.StartSpan(ctx, "deletePodsFromKubernetesHandler")
 	defer span.End()
 
 	namespace, name, err := cache.SplitMetaNamespaceKey(key)
diff --git a/node/pod_test.go b/node/pod_test.go
index 7ed8983e9..e392cb220 100644
--- a/node/pod_test.go
+++ b/node/pod_test.go
@@ -17,11 +17,11 @@ package node
 import (
 	"context"
 	"fmt"
-	"sync"
 	"testing"
 	"time"
 
 	testutil "github.com/virtual-kubelet/virtual-kubelet/internal/test/util"
+	"golang.org/x/time/rate"
 	"gotest.tools/assert"
 	is "gotest.tools/assert/cmp"
 	corev1 "k8s.io/api/core/v1"
@@ -44,23 +44,32 @@ func newTestController() *TestController {
 	rm := testutil.FakeResourceManager()
 	p := newMockProvider()
 	iFactory := kubeinformers.NewSharedInformerFactoryWithOptions(fk8s, 10*time.Minute)
+	rateLimiter := workqueue.NewMaxOfRateLimiter(
+		// The default upper bound is 1000 seconds. Let's not use that.
+		workqueue.NewItemExponentialFailureRateLimiter(5*time.Millisecond, 10*time.Millisecond),
+		&workqueue.BucketRateLimiter{Limiter: rate.NewLimiter(rate.Limit(10), 100)},
+	)
+	podController, err := NewPodController(PodControllerConfig{
+		PodClient:         fk8s.CoreV1(),
+		PodInformer:       iFactory.Core().V1().Pods(),
+		EventRecorder:     testutil.FakeEventRecorder(5),
+		Provider:          p,
+		ConfigMapInformer: iFactory.Core().V1().ConfigMaps(),
+		SecretInformer:    iFactory.Core().V1().Secrets(),
+		ServiceInformer:   iFactory.Core().V1().Services(),
+		RateLimiter:       rateLimiter,
+	})
+
+	if err != nil {
+		panic(err)
+	}
+	// Override the resource manager in the constructor with our own.
+	podController.resourceManager = rm
+
 	return &TestController{
-		PodController: &PodController{
-			client:          fk8s.CoreV1(),
-			provider:        p,
-			resourceManager: rm,
-			recorder:        testutil.FakeEventRecorder(5),
-			k8sQ:            workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()),
-			deletionQ:       workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()),
-			podStatusQ:      workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()),
-			done:            make(chan struct{}),
-			ready:           make(chan struct{}),
-			knownPods:       sync.Map{},
-			podsInformer:    iFactory.Core().V1().Pods(),
-			podsLister:      iFactory.Core().V1().Pods().Lister(),
-		},
-		mock:   p,
-		client: fk8s,
+		PodController: podController,
+		mock:          p,
+		client:        fk8s,
 	}
 }
diff --git a/node/podcontroller.go b/node/podcontroller.go
index 3bf0046bf..27ffadf54 100644
--- a/node/podcontroller.go
+++ b/node/podcontroller.go
@@ -17,10 +17,11 @@ package node
 import (
 	"context"
 	"fmt"
-	"strconv"
 	"sync"
 	"time"
 
+	"github.com/virtual-kubelet/virtual-kubelet/internal/queue"
+
 	"github.com/google/go-cmp/cmp"
 	pkgerrors "github.com/pkg/errors"
 	"github.com/virtual-kubelet/virtual-kubelet/errdefs"
@@ -29,6 +30,7 @@ import (
 	"github.com/virtual-kubelet/virtual-kubelet/trace"
 	corev1 "k8s.io/api/core/v1"
 	"k8s.io/apimachinery/pkg/api/errors"
+	"k8s.io/apimachinery/pkg/util/wait"
 	corev1informers "k8s.io/client-go/informers/core/v1"
 	corev1client "k8s.io/client-go/kubernetes/typed/core/v1"
 	corev1listers "k8s.io/client-go/listers/core/v1"
@@ -110,12 +112,13 @@ type PodController struct {
 
 	resourceManager *manager.ResourceManager
 
-	k8sQ workqueue.RateLimitingInterface
+	syncPodsFromKubernetes *queue.Queue
 
-	// deletionQ is a queue on which pods are reconciled, and we check if pods are in API server after grace period
-	deletionQ workqueue.RateLimitingInterface
+	// deletePodsFromKubernetes is a queue on which pods are reconciled, and we check whether a pod is still in the
+	// API server after its grace period has expired
+	deletePodsFromKubernetes *queue.Queue
 
-	podStatusQ workqueue.RateLimitingInterface
+	syncPodStatusFromProvider *queue.Queue
 
 	// From the time of creation, to termination the knownPods map will contain the pods key
 	// (derived from Kubernetes' cache library) -> a *knownPod struct.
@@ -224,12 +227,13 @@ func NewPodController(cfg PodControllerConfig) (*PodController, error) {
 		ready:              make(chan struct{}),
 		done:               make(chan struct{}),
 		recorder:           cfg.EventRecorder,
-		k8sQ:               workqueue.NewNamedRateLimitingQueue(cfg.RateLimiter, "syncPodsFromKubernetes"),
-		deletionQ:          workqueue.NewNamedRateLimitingQueue(cfg.RateLimiter, "deletePodsFromKubernetes"),
-		podStatusQ:         workqueue.NewNamedRateLimitingQueue(cfg.RateLimiter, "syncPodStatusFromProvider"),
 		podEventFilterFunc: cfg.PodEventFilterFunc,
 	}
 
+	pc.syncPodsFromKubernetes = queue.New(cfg.RateLimiter, "syncPodsFromKubernetes", pc.syncPodFromKubernetesHandler)
+	pc.deletePodsFromKubernetes = queue.New(cfg.RateLimiter, "deletePodsFromKubernetes", pc.deletePodsFromKubernetesHandler)
+	pc.syncPodStatusFromProvider = queue.New(cfg.RateLimiter, "syncPodStatusFromProvider", pc.syncPodStatusFromProviderHandler)
+
 	return pc, nil
 }
 
@@ -247,10 +251,11 @@ type asyncProvider interface {
 // Once this returns, you should not re-use the controller.
 func (pc *PodController) Run(ctx context.Context, podSyncWorkers int) (retErr error) {
 	// Shutdowns are idempotent, so we can call it multiple times. This is in case we have to bail out early for some reason.
+ // This is to make extra sure that any workers we started are terminated on exit + ctx, cancel := context.WithCancel(ctx) + defer cancel() defer func() { - pc.k8sQ.ShutDown() - pc.deletionQ.ShutDown() pc.mu.Lock() pc.err = retErr close(pc.done) @@ -271,12 +276,10 @@ func (pc *PodController) Run(ctx context.Context, podSyncWorkers int) (retErr er pc.provider = provider provider.NotifyPods(ctx, func(pod *corev1.Pod) { - pc.enqueuePodStatusUpdate(ctx, pc.podStatusQ, pod.DeepCopy()) + pc.enqueuePodStatusUpdate(ctx, pod.DeepCopy()) }) go runProvider(ctx) - defer pc.podStatusQ.ShutDown() - // Wait for the caches to be synced *before* starting to do work. if ok := cache.WaitForCacheSync(ctx.Done(), pc.podsInformer.Informer().HasSynced); !ok { return pkgerrors.New("failed to wait for caches to sync") @@ -293,7 +296,7 @@ func (pc *PodController) Run(ctx context.Context, podSyncWorkers int) (retErr er log.G(ctx).Error(err) } else { pc.knownPods.Store(key, &knownPod{}) - pc.k8sQ.AddRateLimited(key) + pc.syncPodsFromKubernetes.Enqueue(key) } }, UpdateFunc: func(oldObj, newObj interface{}) { @@ -319,14 +322,14 @@ func (pc *PodController) Run(ctx context.Context, podSyncWorkers int) (retErr er // This means that the pod in API server was changed by someone else [this can be okay], but we skipped // a status update on our side because we compared the status received from the provider to the status // received from the k8s api server based on outdated information. - pc.podStatusQ.AddRateLimited(key) + pc.syncPodStatusFromProvider.Enqueue(key) // Reset this to avoid re-adding it continuously kPod.lastPodStatusUpdateSkipped = false } kPod.Unlock() if podShouldEnqueue(oldPod, newPod) { - pc.k8sQ.AddRateLimited(key) + pc.syncPodsFromKubernetes.Enqueue(key) } } }, @@ -335,9 +338,9 @@ func (pc *PodController) Run(ctx context.Context, podSyncWorkers int) (retErr er log.G(ctx).Error(err) } else { pc.knownPods.Delete(key) - pc.k8sQ.AddRateLimited(key) + pc.syncPodsFromKubernetes.Enqueue(key) // If this pod was in the deletion queue, forget about it - pc.deletionQ.Forget(key) + pc.deletePodsFromKubernetes.Forget(key) } }, } @@ -363,46 +366,23 @@ func (pc *PodController) Run(ctx context.Context, podSyncWorkers int) (retErr er pc.deleteDanglingPods(ctx, podSyncWorkers) log.G(ctx).Info("starting workers") - wg := sync.WaitGroup{} - - // Use the worker's "index" as its ID so we can use it for tracing. 
- for id := 0; id < podSyncWorkers; id++ { - wg.Add(1) - workerID := strconv.Itoa(id) - go func() { - defer wg.Done() - pc.runSyncPodStatusFromProviderWorker(ctx, workerID, pc.podStatusQ) - }() - } - - for id := 0; id < podSyncWorkers; id++ { - wg.Add(1) - workerID := strconv.Itoa(id) - go func() { - defer wg.Done() - pc.runSyncPodsFromKubernetesWorker(ctx, workerID, pc.k8sQ) - }() - } - - for id := 0; id < podSyncWorkers; id++ { - wg.Add(1) - workerID := strconv.Itoa(id) - go func() { - defer wg.Done() - pc.runDeletionReconcilationWorker(ctx, workerID, pc.deletionQ) - }() - } - + group := &wait.Group{} + group.StartWithContext(ctx, func(ctx context.Context) { + pc.syncPodsFromKubernetes.Run(ctx, podSyncWorkers) + }) + group.StartWithContext(ctx, func(ctx context.Context) { + pc.deletePodsFromKubernetes.Run(ctx, podSyncWorkers) + }) + group.StartWithContext(ctx, func(ctx context.Context) { + pc.syncPodStatusFromProvider.Run(ctx, podSyncWorkers) + }) + defer group.Wait() + log.G(ctx).Info("started workers") close(pc.ready) - log.G(ctx).Info("started workers") <-ctx.Done() log.G(ctx).Info("shutting down workers") - pc.k8sQ.ShutDown() - pc.podStatusQ.ShutDown() - pc.deletionQ.ShutDown() - wg.Wait() return nil } @@ -426,28 +406,9 @@ func (pc *PodController) Err() error { return pc.err } -// runSyncPodsFromKubernetesWorker is a long-running function that will continually call the processNextWorkItem function -// in order to read and process an item on the work queue that is generated by the pod informer. -func (pc *PodController) runSyncPodsFromKubernetesWorker(ctx context.Context, workerID string, q workqueue.RateLimitingInterface) { - for pc.processNextWorkItem(ctx, workerID, q) { - } -} - -// processNextWorkItem will read a single work item off the work queue and attempt to process it,by calling the syncHandler. -func (pc *PodController) processNextWorkItem(ctx context.Context, workerID string, q workqueue.RateLimitingInterface) bool { - - // We create a span only after popping from the queue so that we can get an adequate picture of how long it took to process the item. - ctx, span := trace.StartSpan(ctx, "processNextWorkItem") - defer span.End() - - // Add the ID of the current worker as an attribute to the current span. - ctx = span.WithField(ctx, "workerId", workerID) - return handleQueueItem(ctx, q, pc.syncHandler) -} - -// syncHandler compares the actual state with the desired, and attempts to converge the two. -func (pc *PodController) syncHandler(ctx context.Context, key string) error { - ctx, span := trace.StartSpan(ctx, "syncHandler") +// syncPodFromKubernetesHandler compares the actual state with the desired, and attempts to converge the two. +func (pc *PodController) syncPodFromKubernetesHandler(ctx context.Context, key string) error { + ctx, span := trace.StartSpan(ctx, "syncPodFromKubernetesHandler") defer span.End() // Add the current key as an attribute to the current span. 
@@ -511,7 +472,7 @@ func (pc *PodController) syncPodInProvider(ctx context.Context, pod *corev1.Pod, // more context is here: https://github.com/virtual-kubelet/virtual-kubelet/pull/760 if pod.DeletionTimestamp != nil && !running(&pod.Status) { log.G(ctx).Debug("Force deleting pod from API Server as it is no longer running") - pc.deletionQ.Add(key) + pc.deletePodsFromKubernetes.EnqueueWithoutRateLimit(key) return nil } obj, ok := pc.knownPods.Load(key) @@ -547,7 +508,7 @@ func (pc *PodController) syncPodInProvider(ctx context.Context, pod *corev1.Pod, return err } - pc.deletionQ.AddAfter(key, time.Second*time.Duration(*pod.DeletionGracePeriodSeconds)) + pc.deletePodsFromKubernetes.EnqueueAfter(key, time.Second*time.Duration(*pod.DeletionGracePeriodSeconds)) return nil } @@ -566,25 +527,6 @@ func (pc *PodController) syncPodInProvider(ctx context.Context, pod *corev1.Pod, return nil } -// runDeletionReconcilationWorker is a long-running function that will continually call the processDeletionReconcilationWorkItem -// function in order to read and process an item on the work queue that is generated by the pod informer. -func (pc *PodController) runDeletionReconcilationWorker(ctx context.Context, workerID string, q workqueue.RateLimitingInterface) { - for pc.processDeletionReconcilationWorkItem(ctx, workerID, q) { - } -} - -// processDeletionReconcilationWorkItem will read a single work item off the work queue and attempt to process it,by calling the deletionReconcilation. -func (pc *PodController) processDeletionReconcilationWorkItem(ctx context.Context, workerID string, q workqueue.RateLimitingInterface) bool { - - // We create a span only after popping from the queue so that we can get an adequate picture of how long it took to process the item. - ctx, span := trace.StartSpan(ctx, "processDeletionReconcilationWorkItem") - defer span.End() - - // Add the ID of the current worker as an attribute to the current span. - ctx = span.WithField(ctx, "workerId", workerID) - return handleQueueItem(ctx, q, pc.deletePodHandler) -} - // deleteDanglingPods checks whether the provider knows about any pods which Kubernetes doesn't know about, and deletes them. func (pc *PodController) deleteDanglingPods(ctx context.Context, threadiness int) { ctx, span := trace.StartSpan(ctx, "deleteDanglingPods") diff --git a/node/queue.go b/node/queue.go deleted file mode 100644 index 12572b5de..000000000 --- a/node/queue.go +++ /dev/null @@ -1,105 +0,0 @@ -// Copyright © 2017 The virtual-kubelet authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package node - -import ( - "context" - - pkgerrors "github.com/pkg/errors" - "github.com/virtual-kubelet/virtual-kubelet/log" - "github.com/virtual-kubelet/virtual-kubelet/trace" - "k8s.io/client-go/util/workqueue" -) - -const ( - // maxRetries is the number of times we try to process a given key before permanently forgetting it. 
- maxRetries = 20 -) - -type queueHandler func(ctx context.Context, key string) error - -func handleQueueItem(ctx context.Context, q workqueue.RateLimitingInterface, handler queueHandler) bool { - ctx, span := trace.StartSpan(ctx, "handleQueueItem") - defer span.End() - - obj, shutdown := q.Get() - if shutdown { - return false - } - - log.G(ctx).Debug("Got queue object") - - err := func(obj interface{}) error { - defer log.G(ctx).Debug("Processed queue item") - // We call Done here so the work queue knows we have finished processing this item. - // We also must remember to call Forget if we do not want this work item being re-queued. - // For example, we do not call Forget if a transient error occurs. - // Instead, the item is put back on the work queue and attempted again after a back-off period. - defer q.Done(obj) - var key string - var ok bool - // We expect strings to come off the work queue. - // These are of the form namespace/name. - // We do this as the delayed nature of the work queue means the items in the informer cache may actually be more up to date that when the item was initially put onto the workqueue. - if key, ok = obj.(string); !ok { - // As the item in the work queue is actually invalid, we call Forget here else we'd go into a loop of attempting to process a work item that is invalid. - q.Forget(obj) - log.G(ctx).Warnf("expected string in work queue item but got %#v", obj) - return nil - } - - // Add the current key as an attribute to the current span. - ctx = span.WithField(ctx, "key", key) - // Run the syncHandler, passing it the namespace/name string of the Pod resource to be synced. - if err := handler(ctx, key); err != nil { - if q.NumRequeues(key) < maxRetries { - // Put the item back on the work queue to handle any transient errors. - log.G(ctx).WithError(err).Warnf("requeuing %q due to failed sync", key) - q.AddRateLimited(key) - return nil - } - // We've exceeded the maximum retries, so we must forget the key. - q.Forget(key) - return pkgerrors.Wrapf(err, "forgetting %q due to maximum retries reached", key) - } - // Finally, if no error occurs we Forget this item so it does not get queued again until another change happens. - q.Forget(obj) - return nil - }(obj) - - if err != nil { - // We've actually hit an error, so we set the span's status based on the error. - span.SetStatus(err) - log.G(ctx).Error(err) - return true - } - - return true -} - -func (pc *PodController) runSyncPodStatusFromProviderWorker(ctx context.Context, workerID string, q workqueue.RateLimitingInterface) { - for pc.processPodStatusUpdate(ctx, workerID, q) { - } -} - -func (pc *PodController) processPodStatusUpdate(ctx context.Context, workerID string, q workqueue.RateLimitingInterface) bool { - ctx, span := trace.StartSpan(ctx, "processPodStatusUpdate") - defer span.End() - - // Add the ID of the current worker as an attribute to the current span. - ctx = span.WithField(ctx, "workerID", workerID) - - return handleQueueItem(ctx, q, pc.podStatusHandler) -}
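For reference, below is a minimal sketch of how the new internal/queue package introduced in this diff is driven, mirroring the patterns in queue_test.go above. The queue name, key, and rate-limiter values are illustrative only, and since the package lives under internal/, it is importable only from within the virtual-kubelet module itself:

package main

import (
	"context"
	"fmt"
	"time"

	"github.com/virtual-kubelet/virtual-kubelet/internal/queue"
	"golang.org/x/time/rate"
	"k8s.io/client-go/util/workqueue"
)

func main() {
	// Stop the workers after two seconds; Run blocks until the context is
	// cancelled and all workers have exited.
	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
	defer cancel()

	// The handler receives namespace/name keys. Returning an error triggers a
	// rate-limited requeue, up to queue.MaxRetries attempts per key.
	handler := func(ctx context.Context, key string) error {
		fmt.Println("processing", key)
		return nil
	}

	q := queue.New(workqueue.NewMaxOfRateLimiter(
		// Same limiter shape as in the tests: exponential per-item backoff
		// capped at 10ms, combined with an overall token bucket.
		workqueue.NewItemExponentialFailureRateLimiter(5*time.Millisecond, 10*time.Millisecond),
		&workqueue.BucketRateLimiter{Limiter: rate.NewLimiter(rate.Limit(10), 100)},
	), "example", handler)

	// Enqueue bypassing the rate limiter, as the controller does for new pods.
	q.EnqueueWithoutRateLimit("default/my-pod")

	q.Run(ctx, 1)
}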