From ac9a1af564683e5cb9dd1c2d9be71fde11b1dfc2 Mon Sep 17 00:00:00 2001 From: Sargun Dhillon Date: Fri, 29 Jan 2021 00:45:03 -0800 Subject: [PATCH] Replace golang workqueue with our own This is a fundamentally different API than that of the K8s workqueue which is better suited for our needs. Specifically, we need a simple queue which doesn't have complex features like delayed adds that sit on "external" goroutines. In addition, we need deep introspection into the operations of the workqueue. Although you can get this on top of the K8s workqueue by implementing a custom rate limiter, the problem is that the underlying rate limiter's behaviour is still somewhat opaque. This basically has 100% code coverage. --- go.mod | 1 - go.sum | 6 - internal/queue/queue.go | 322 ++++++++++++++++++++++---- internal/queue/queue_test.go | 426 +++++++++++++++++++++++++++++++---- node/podcontroller.go | 2 +- 5 files changed, 661 insertions(+), 96 deletions(-) diff --git a/go.mod b/go.mod index 82e4a7a90..bda52c488 100644 --- a/go.mod +++ b/go.mod @@ -18,7 +18,6 @@ require ( github.com/spf13/cobra v0.0.5 github.com/spf13/pflag v1.0.5 go.opencensus.io v0.21.0 - go.uber.org/goleak v1.1.10 golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e golang.org/x/sys v0.0.0-20200323222414-85ca7c5b95cd golang.org/x/time v0.0.0-20190308202827-9d24e82272b4 diff --git a/go.sum b/go.sum index 6d61a23c2..2d401e3e3 100644 --- a/go.sum +++ b/go.sum @@ -608,8 +608,6 @@ go.opencensus.io v0.21.0/go.mod h1:mSImk1erAIZhrmZN+AvHh14ztQfjbGwt4TtuofqLduU= go.uber.org/atomic v1.3.2/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE= go.uber.org/atomic v1.4.0 h1:cxzIVoETapQEqDhQu3QfnvXAV4AlzcvUCxkVUFw3+EU= go.uber.org/atomic v1.4.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE= -go.uber.org/goleak v1.1.10 h1:z+mqJhf6ss6BSfSM671tgKyZBFPTTJM+HLxnhPC3wu0= -go.uber.org/goleak v1.1.10/go.mod h1:8a7PlsEVH3e/a/GLqe5IIrQx6GzcnRmZEufDUTk4A7A= go.uber.org/multierr v1.1.0 h1:HoEmRHQPVSqub6w2z2d2EOVs2fjyFRGyofhKuyDq0QI= go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0= go.uber.org/zap v1.10.0 h1:ORx85nbTijNz8ljznvCMR1ZBIPKFn3jQrag10X2AsuM= @@ -645,8 +643,6 @@ golang.org/x/lint v0.0.0-20190301231843-5614ed5bae6f/go.mod h1:UVdnD1Gm6xHRNCYTk golang.org/x/lint v0.0.0-20190313153728-d0100b6bd8b3/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc= golang.org/x/lint v0.0.0-20190409202823-959b441ac422 h1:QzoH/1pFpZguR8NrRHLcO6jKqfv2zpuSqZLgdm7ZmjI= golang.org/x/lint v0.0.0-20190409202823-959b441ac422/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc= -golang.org/x/lint v0.0.0-20190930215403-16217165b5de h1:5hukYrvBGR8/eNkX5mdUezrA6JiaEZDtJb9Ei+1LlBs= -golang.org/x/lint v0.0.0-20190930215403-16217165b5de/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc= golang.org/x/mobile v0.0.0-20190312151609-d3739f865fa6/go.mod h1:z+o9i4GpDbdi3rU15maQ/Ox0txvL9dWGYEHz965HBQE= golang.org/x/mod v0.0.0-20190513183733-4bf6d317e70e/go.mod h1:mXi4GBBbnImb6dmsKGUJ2LatrhH/nqhxcFungHvyanc= golang.org/x/net v0.0.0-20170114055629-f2499483f923/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= @@ -768,8 +764,6 @@ golang.org/x/tools v0.0.0-20190621195816-6e04913cbbac/go.mod h1:/rFqwRUd4F7ZHNgw golang.org/x/tools v0.0.0-20190909030654-5b82db07426d/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= golang.org/x/tools v0.0.0-20190920225731-5eefd052ad72 h1:bw9doJza/SFBEweII/rHQh338oozWyiFsBRHtrflcws= golang.org/x/tools v0.0.0-20190920225731-5eefd052ad72/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= -golang.org/x/tools 
v0.0.0-20191108193012-7d206e10da11 h1:Yq9t9jnGoR+dBuitxdo9l6Q7xh/zOyNnYUtDKaQ3x0E= -golang.org/x/tools v0.0.0-20191108193012-7d206e10da11/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= diff --git a/internal/queue/queue.go b/internal/queue/queue.go index 352c6600d..edfe46f90 100644 --- a/internal/queue/queue.go +++ b/internal/queue/queue.go @@ -15,6 +15,7 @@ package queue import ( + "container/list" "context" "fmt" "sync" @@ -23,8 +24,10 @@ import ( pkgerrors "github.com/pkg/errors" "github.com/virtual-kubelet/virtual-kubelet/log" "github.com/virtual-kubelet/virtual-kubelet/trace" + "golang.org/x/sync/semaphore" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/util/workqueue" + "k8s.io/utils/clock" ) const ( @@ -37,11 +40,45 @@ type ItemHandler func(ctx context.Context, key string) error // Queue implements a wrapper around workqueue with native VK instrumentation type Queue struct { - lock sync.Mutex - running bool - name string - workqueue workqueue.RateLimitingInterface - handler ItemHandler + // clock is used for testing + clock clock.Clock + // lock protects running, and the items list / map + lock sync.Mutex + running bool + name string + handler ItemHandler + + ratelimiter workqueue.RateLimiter + // items are items that are marked dirty waiting for processing. + items *list.List + // itemInQueue is a map of (string) key -> item while it is in the items list + itemsInQueue map[string]*list.Element + // itemsBeingProcessed is a map of (string) key -> item once it has been moved + itemsBeingProcessed map[string]*queueItem + // Wait for next semaphore is an exclusive (1 item) lock that is taken every time items is checked to see if there + // is an item in queue for work + waitForNextItemSemaphore *semaphore.Weighted + + // wakeup + wakeupCh chan struct{} +} + +type queueItem struct { + key string + plannedToStartWorkAt time.Time + redirtiedAt time.Time + redirtiedWithRatelimit bool + forget bool + requeues int + + // Debugging information only + originallyAdded time.Time + addedViaRedirty bool + delayedViaRateLimit *time.Duration +} + +func (item *queueItem) String() string { + return fmt.Sprintf("", item.plannedToStartWorkAt.String(), item.key) } // New creates a queue @@ -50,40 +87,154 @@ type Queue struct { // in the internal kubernetes metrics. 
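+//
+// A minimal usage sketch (the rate limiter choice, queue name, handler body, and
+// key below are illustrative only, not part of this package):
+//
+//	ctx, cancel := context.WithCancel(context.Background())
+//	defer cancel()
+//	q := New(workqueue.DefaultItemBasedRateLimiter(), "pods", func(ctx context.Context, key string) error {
+//		// Reconcile the object identified by key, typically "namespace/name".
+//		return nil
+//	})
+//	go q.Run(ctx, 1)
+//	q.Enqueue("default/example-pod")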
func New(ratelimiter workqueue.RateLimiter, name string, handler ItemHandler) *Queue { return &Queue{ - name: name, - workqueue: workqueue.NewNamedRateLimitingQueue(ratelimiter, name), - handler: handler, + clock: clock.RealClock{}, + name: name, + ratelimiter: ratelimiter, + items: list.New(), + itemsBeingProcessed: make(map[string]*queueItem), + itemsInQueue: make(map[string]*list.Element), + handler: handler, + wakeupCh: make(chan struct{}, 1), + waitForNextItemSemaphore: semaphore.NewWeighted(1), } } // Enqueue enqueues the key in a rate limited fashion func (q *Queue) Enqueue(key string) { - q.workqueue.AddRateLimited(key) + q.lock.Lock() + defer q.lock.Unlock() + + q.insert(key, true, 0) } // EnqueueWithoutRateLimit enqueues the key without a rate limit func (q *Queue) EnqueueWithoutRateLimit(key string) { - q.workqueue.Add(key) + q.lock.Lock() + defer q.lock.Unlock() + + q.insert(key, false, 0) } // Forget forgets the key func (q *Queue) Forget(key string) { - q.workqueue.Forget(key) + q.lock.Lock() + defer q.lock.Unlock() + + if item, ok := q.itemsInQueue[key]; ok { + delete(q.itemsInQueue, key) + q.items.Remove(item) + } + + if qi, ok := q.itemsBeingProcessed[key]; ok { + qi.forget = true + } } -// EnqueueAfter enqueues the item after this period -// -// Since it wrap workqueue semantics, if an item has been enqueued after, and it is immediately scheduled for work, -// it will process the immediate item, and then upon the latter delayed processing it will be processed again -func (q *Queue) EnqueueAfter(key string, after time.Duration) { - q.workqueue.AddAfter(key, after) +// insert inserts a new item to be processed at time time. It will not further delay items if when is later than the +// original time the item was scheduled to be processed. If when is earlier, it will "bring it forward" +func (q *Queue) insert(key string, ratelimit bool, delay time.Duration) *queueItem { + defer func() { + select { + case q.wakeupCh <- struct{}{}: + default: + } + }() + + // First see if the item is already being processed + if item, ok := q.itemsBeingProcessed[key]; ok { + when := q.clock.Now().Add(delay) + // Is the item already been redirtied? + if item.redirtiedAt.IsZero() { + item.redirtiedAt = when + item.redirtiedWithRatelimit = ratelimit + } else if when.Before(item.redirtiedAt) { + item.redirtiedAt = when + item.redirtiedWithRatelimit = ratelimit + } + item.forget = false + return item + } + + // Is the item already in the queue? 
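+	// If so, do not add a duplicate entry; adjustPosition only ever moves the
+	// existing entry earlier in the list (a later "when" leaves it where it is).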
+ if item, ok := q.itemsInQueue[key]; ok { + qi := item.Value.(*queueItem) + when := q.clock.Now().Add(delay) + q.adjustPosition(qi, item, when) + return qi + } + + now := q.clock.Now() + val := &queueItem{ + key: key, + plannedToStartWorkAt: now, + originallyAdded: now, + } + + if ratelimit { + if delay > 0 { + panic("Non-zero delay with rate limiting not supported") + } + ratelimitDelay := q.ratelimiter.When(key) + val.plannedToStartWorkAt = val.plannedToStartWorkAt.Add(ratelimitDelay) + val.delayedViaRateLimit = &ratelimitDelay + } else { + val.plannedToStartWorkAt = val.plannedToStartWorkAt.Add(delay) + } + + for item := q.items.Back(); item != nil; item = item.Prev() { + qi := item.Value.(*queueItem) + if qi.plannedToStartWorkAt.Before(val.plannedToStartWorkAt) { + q.itemsInQueue[key] = q.items.InsertAfter(val, item) + return val + } + } + + q.itemsInQueue[key] = q.items.PushFront(val) + return val +} + +func (q *Queue) adjustPosition(qi *queueItem, element *list.Element, when time.Time) { + if when.After(qi.plannedToStartWorkAt) { + // The item has already been delayed appropriately + return + } + + qi.plannedToStartWorkAt = when + for prev := element.Prev(); prev != nil; prev = prev.Prev() { + item := prev.Value.(*queueItem) + // does this item plan to start work *before* the new time? If so add it + if item.plannedToStartWorkAt.Before(when) { + q.items.MoveAfter(element, prev) + return + } + } + + q.items.MoveToFront(element) +} + +// EnqueueWithoutRateLimitWithDelay enqueues without rate limiting, but work will not start for this given delay period +func (q *Queue) EnqueueWithoutRateLimitWithDelay(key string, after time.Duration) { + q.lock.Lock() + defer q.lock.Unlock() + q.insert(key, false, after) } // Empty returns if the queue has no items in it // -// It should only be used for debugging, as delayed items are not counted, leading to confusion +// It should only be used for debugging. func (q *Queue) Empty() bool { - return q.workqueue.Len() == 0 + return q.Len() == 0 +} + +// Len includes items that are in the queue, and are being processed +func (q *Queue) Len() int { + q.lock.Lock() + defer q.lock.Unlock() + if q.items.Len() != len(q.itemsInQueue) { + panic("Internally inconsistent state") + } + + return q.items.Len() + len(q.itemsBeingProcessed) } // Run starts the workers @@ -112,13 +263,14 @@ func (q *Queue) Run(ctx context.Context, workers int) { group := &wait.Group{} for i := 0; i < workers; i++ { + // This is required because i is referencing a mutable variable and that's running in a separate goroutine + idx := i group.StartWithContext(ctx, func(ctx context.Context) { - q.worker(ctx, i) + q.worker(ctx, idx) }) } defer group.Wait() <-ctx.Done() - q.workqueue.ShutDown() } func (q *Queue) worker(ctx context.Context, i int) { @@ -130,6 +282,54 @@ func (q *Queue) worker(ctx context.Context, i int) { } } +func (q *Queue) getNextItem(ctx context.Context) (*queueItem, error) { + if err := q.waitForNextItemSemaphore.Acquire(ctx, 1); err != nil { + return nil, err + } + defer q.waitForNextItemSemaphore.Release(1) + + for { + q.lock.Lock() + element := q.items.Front() + if element == nil { + // Wait for the next item + q.lock.Unlock() + select { + case <-ctx.Done(): + return nil, ctx.Err() + case <-q.wakeupCh: + } + } else { + qi := element.Value.(*queueItem) + timeUntilProcessing := time.Until(qi.plannedToStartWorkAt) + + // Do we need to sleep? If not, let's party. 
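+			// If the head of the list is already due, claim it by moving it from
+			// itemsInQueue to itemsBeingProcessed and return it. Otherwise sleep until
+			// it is due, waking early if an insert signals wakeupCh or the context is
+			// cancelled, and then re-check the head of the list.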
+ if timeUntilProcessing <= 0 { + q.itemsBeingProcessed[qi.key] = qi + q.items.Remove(element) + delete(q.itemsInQueue, qi.key) + q.lock.Unlock() + return qi, nil + } + + q.lock.Unlock() + if err := func() error { + timer := q.clock.NewTimer(timeUntilProcessing) + defer timer.Stop() + select { + case <-timer.C(): + case <-ctx.Done(): + return ctx.Err() + case <-q.wakeupCh: + } + return nil + }(); err != nil { + return nil, err + } + } + } +} + // handleQueueItem handles a single item // // A return value of "false" indicates that further processing should be stopped. @@ -137,8 +337,9 @@ func (q *Queue) handleQueueItem(ctx context.Context) bool { ctx, span := trace.StartSpan(ctx, "handleQueueItem") defer span.End() - obj, shutdown := q.workqueue.Get() - if shutdown { + qi, err := q.getNextItem(ctx) + if err != nil { + span.SetStatus(err) return false } @@ -146,11 +347,10 @@ func (q *Queue) handleQueueItem(ctx context.Context) bool { // These are of the form namespace/name. // We do this as the delayed nature of the work Queue means the items in the informer cache may actually be more u // to date that when the item was initially put onto the workqueue. - key := obj.(string) - ctx = span.WithField(ctx, "key", key) + ctx = span.WithField(ctx, "key", qi.key) log.G(ctx).Debug("Got Queue object") - err := q.handleQueueItemObject(ctx, key) + err = q.handleQueueItemObject(ctx, qi) if err != nil { // We've actually hit an error, so we set the span's status based on the error. span.SetStatus(err) @@ -162,35 +362,69 @@ func (q *Queue) handleQueueItem(ctx context.Context) bool { return true } -func (q *Queue) handleQueueItemObject(ctx context.Context, key string) error { +func (q *Queue) handleQueueItemObject(ctx context.Context, qi *queueItem) error { // This is a separate function / span, because the handleQueueItem span is the time spent waiting for the object // plus the time spend handling the object. Instead, this function / span is scoped to a single object. ctx, span := trace.StartSpan(ctx, "handleQueueItemObject") defer span.End() - ctx = span.WithField(ctx, "key", key) - // We call Done here so the work Queue knows we have finished processing this item. - // We also must remember to call Forget if we do not want this work item being re-queued. - // For example, we do not call Forget if a transient error occurs. - // Instead, the item is put back on the work Queue and attempted again after a back-off period. - defer q.workqueue.Done(key) + ctx = span.WithFields(ctx, map[string]interface{}{ + "requeues": qi.requeues, + "originallyAdded": qi.originallyAdded.String(), + "addedViaRedirty": qi.addedViaRedirty, + "plannedForWork": qi.plannedToStartWorkAt.String(), + }) + + if qi.delayedViaRateLimit != nil { + ctx = span.WithField(ctx, "delayedViaRateLimit", qi.delayedViaRateLimit.String()) + } // Add the current key as an attribute to the current span. - ctx = span.WithField(ctx, "key", key) + ctx = span.WithField(ctx, "key", qi.key) // Run the syncHandler, passing it the namespace/name string of the Pod resource to be synced. 
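+	// Once the handler returns, the item is removed from itemsBeingProcessed. If
+	// Forget was called while the item was in flight it is dropped. On error the key
+	// is requeued with rate limiting until MaxRetries is reached; once it succeeds or
+	// exhausts its retries the rate limiter forgets it, and if the key was re-dirtied
+	// while being processed it is reinserted into the queue.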
- if err := q.handler(ctx, key); err != nil { - if q.workqueue.NumRequeues(key) < MaxRetries { + err := q.handler(ctx, qi.key) + + q.lock.Lock() + defer q.lock.Unlock() + + delete(q.itemsBeingProcessed, qi.key) + if qi.forget { + q.ratelimiter.Forget(qi.key) + log.G(ctx).WithError(err).Warnf("forgetting %q as told to forget while in progress", qi.key) + return nil + } + + if err != nil { + if qi.requeues+1 < MaxRetries { // Put the item back on the work Queue to handle any transient errors. - log.G(ctx).WithError(err).Warnf("requeuing %q due to failed sync", key) - q.workqueue.AddRateLimited(key) + log.G(ctx).WithError(err).Warnf("requeuing %q due to failed sync", qi.key) + newQI := q.insert(qi.key, true, 0) + newQI.requeues = qi.requeues + 1 + newQI.originallyAdded = qi.originallyAdded + return nil } - // We've exceeded the maximum retries, so we must Forget the key. - q.workqueue.Forget(key) - return pkgerrors.Wrapf(err, "forgetting %q due to maximum retries reached", key) + err = pkgerrors.Wrapf(err, "forgetting %q due to maximum retries reached", qi.key) } - // Finally, if no error occurs we Forget this item so it does not get queued again until another change happens. - q.workqueue.Forget(key) - return nil + // We've exceeded the maximum retries or we were successful. + q.ratelimiter.Forget(qi.key) + if !qi.redirtiedAt.IsZero() { + newQI := q.insert(qi.key, qi.redirtiedWithRatelimit, time.Until(qi.redirtiedAt)) + newQI.addedViaRedirty = true + } + + return err +} + +func (q *Queue) String() string { + q.lock.Lock() + defer q.lock.Unlock() + + items := make([]string, 0, q.items.Len()) + + for next := q.items.Front(); next != nil; next = next.Next() { + items = append(items, next.Value.(*queueItem).String()) + } + return fmt.Sprintf("", items) } diff --git a/internal/queue/queue_test.go b/internal/queue/queue_test.go index b2fe5f2db..c2897799a 100644 --- a/internal/queue/queue_test.go +++ b/internal/queue/queue_test.go @@ -5,18 +5,18 @@ import ( "errors" "strconv" "sync" + "sync/atomic" "testing" "time" "github.com/sirupsen/logrus" "github.com/virtual-kubelet/virtual-kubelet/log" logruslogger "github.com/virtual-kubelet/virtual-kubelet/log/logrus" - "go.uber.org/goleak" "golang.org/x/time/rate" "gotest.tools/assert" is "gotest.tools/assert/cmp" - "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/util/workqueue" + "k8s.io/utils/clock" ) func TestQueueMaxRetries(t *testing.T) { @@ -43,7 +43,7 @@ func TestQueueMaxRetries(t *testing.T) { } assert.Assert(t, is.Equal(n, MaxRetries)) - assert.Assert(t, is.Equal(0, wq.workqueue.Len())) + assert.Assert(t, is.Equal(0, wq.Len())) } func TestForget(t *testing.T) { @@ -54,62 +54,400 @@ func TestForget(t *testing.T) { wq := New(workqueue.DefaultItemBasedRateLimiter(), t.Name(), handler) wq.Forget("val") - assert.Assert(t, is.Equal(0, wq.workqueue.Len())) + assert.Assert(t, is.Equal(0, wq.Len())) v := "test" wq.EnqueueWithoutRateLimit(v) - assert.Assert(t, is.Equal(1, wq.workqueue.Len())) - - t.Skip("This is broken") - // Workqueue docs: - // Forget indicates that an item is finished being retried. Doesn't matter whether it's for perm failing - // or for success, we'll stop the rate limiter from tracking it. This only clears the `rateLimiter`, you - // still have to call `Done` on the queue. 
- // Even if you do this, it doesn't work: https://play.golang.com/p/8vfL_RCsFGI - assert.Assert(t, is.Equal(0, wq.workqueue.Len())) - + assert.Assert(t, is.Equal(1, wq.Len())) } -func TestQueueTerminate(t *testing.T) { +func TestQueueEmpty(t *testing.T) { t.Parallel() - defer goleak.VerifyNone(t, - // Ignore existing goroutines - goleak.IgnoreCurrent(), - // Ignore klog background flushers - goleak.IgnoreTopFunction("k8s.io/klog.(*loggingT).flushDaemon"), - goleak.IgnoreTopFunction("k8s.io/klog/v2.(*loggingT).flushDaemon"), - // Workqueue runs a goroutine in the background to handle background functions. AFAICT, they're unkillable - // and are designed to stop after a certain idle window - goleak.IgnoreTopFunction("k8s.io/client-go/util/workqueue.(*Type).updateUnfinishedWorkLoop"), - goleak.IgnoreTopFunction("k8s.io/client-go/util/workqueue.(*delayingType).waitingLoop"), - ) - ctx, cancel := context.WithCancel(context.Background()) + ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond) defer cancel() - testMap := &sync.Map{} - handler := func(ctx context.Context, key string) error { - testMap.Store(key, struct{}{}) + q := New(workqueue.DefaultItemBasedRateLimiter(), t.Name(), func(ctx context.Context, key string) error { return nil - } - - wq := New(workqueue.DefaultItemBasedRateLimiter(), t.Name(), handler) - group := &wait.Group{} - group.StartWithContext(ctx, func(ctx context.Context) { - wq.Run(ctx, 10) }) - for i := 0; i < 1000; i++ { - wq.EnqueueWithoutRateLimit(strconv.Itoa(i)) + + item, err := q.getNextItem(ctx) + assert.Error(t, err, context.DeadlineExceeded.Error()) + assert.Assert(t, is.Nil(item)) +} + +func TestQueueItemNoSleep(t *testing.T) { + t.Parallel() + + ctx, cancel := context.WithTimeout(context.Background(), 1000*time.Millisecond) + defer cancel() + + q := New(workqueue.DefaultItemBasedRateLimiter(), t.Name(), func(ctx context.Context, key string) error { + return nil + }) + + q.lock.Lock() + q.insert("foo", false, -1*time.Hour) + q.insert("bar", false, -1*time.Hour) + q.lock.Unlock() + + item, err := q.getNextItem(ctx) + assert.NilError(t, err) + assert.Assert(t, is.Equal(item.key, "foo")) + + item, err = q.getNextItem(ctx) + assert.NilError(t, err) + assert.Assert(t, is.Equal(item.key, "bar")) +} + +func TestQueueItemSleep(t *testing.T) { + t.Parallel() + + ctx, cancel := context.WithTimeout(context.Background(), 1000*time.Millisecond) + defer cancel() + + q := New(workqueue.DefaultItemBasedRateLimiter(), t.Name(), func(ctx context.Context, key string) error { + return nil + }) + q.lock.Lock() + q.insert("foo", false, 100*time.Millisecond) + q.insert("bar", false, 100*time.Millisecond) + q.lock.Unlock() + + item, err := q.getNextItem(ctx) + assert.NilError(t, err) + assert.Assert(t, is.Equal(item.key, "foo")) +} + +func TestQueueBackgroundAdd(t *testing.T) { + t.Parallel() + + ctx, cancel := context.WithTimeout(context.Background(), 5000*time.Millisecond) + defer cancel() + + q := New(workqueue.DefaultItemBasedRateLimiter(), t.Name(), func(ctx context.Context, key string) error { + return nil + }) + start := time.Now() + time.AfterFunc(100*time.Millisecond, func() { + q.lock.Lock() + defer q.lock.Unlock() + q.insert("foo", false, 0) + }) + + item, err := q.getNextItem(ctx) + assert.NilError(t, err) + assert.Assert(t, is.Equal(item.key, "foo")) + assert.Assert(t, time.Since(start) > 100*time.Millisecond) +} + +func TestQueueBackgroundAdvance(t *testing.T) { + t.Parallel() + + ctx, cancel := context.WithTimeout(context.Background(), 
5000*time.Millisecond) + defer cancel() + + q := New(workqueue.DefaultItemBasedRateLimiter(), t.Name(), func(ctx context.Context, key string) error { + return nil + }) + start := time.Now() + q.lock.Lock() + q.insert("foo", false, 10*time.Second) + q.lock.Unlock() + + time.AfterFunc(200*time.Millisecond, func() { + q.lock.Lock() + defer q.lock.Unlock() + q.insert("foo", false, 0) + }) + + item, err := q.getNextItem(ctx) + assert.NilError(t, err) + assert.Assert(t, is.Equal(item.key, "foo")) + assert.Assert(t, time.Since(start) > 200*time.Millisecond) + assert.Assert(t, time.Since(start) < 5*time.Second) +} + +func TestQueueRedirty(t *testing.T) { + t.Parallel() + + ctx, cancel := context.WithTimeout(context.Background(), 5000*time.Millisecond) + defer cancel() + + var times int64 + var q *Queue + q = New(workqueue.DefaultItemBasedRateLimiter(), t.Name(), func(ctx context.Context, key string) error { + assert.Assert(t, is.Equal(key, "foo")) + if atomic.AddInt64(×, 1) == 1 { + q.EnqueueWithoutRateLimit("foo") + } else { + cancel() + } + return nil + }) + + q.EnqueueWithoutRateLimit("foo") + q.Run(ctx, 1) + for !q.Empty() { + time.Sleep(100 * time.Millisecond) + } + assert.Assert(t, is.Equal(atomic.LoadInt64(×), int64(2))) +} + +func TestHeapConcurrency(t *testing.T) { + t.Parallel() + + ctx, cancel := context.WithTimeout(context.Background(), 5000*time.Millisecond) + defer cancel() + + start := time.Now() + seen := sync.Map{} + q := New(workqueue.DefaultItemBasedRateLimiter(), t.Name(), func(ctx context.Context, key string) error { + seen.Store(key, struct{}{}) + time.Sleep(time.Second) + return nil + }) + for i := 0; i < 20; i++ { + q.EnqueueWithoutRateLimit(strconv.Itoa(i)) } - for wq.workqueue.Len() > 0 { + assert.Assert(t, q.Len() == 20) + + go q.Run(ctx, 20) + for q.Len() > 0 { time.Sleep(100 * time.Millisecond) } - for i := 0; i < 1000; i++ { - _, ok := testMap.Load(strconv.Itoa(i)) - assert.Assert(t, ok, "Item %d missing", i) + for i := 0; i < 20; i++ { + _, ok := seen.Load(strconv.Itoa(i)) + assert.Assert(t, ok, "Did not observe: %d", i) + } + assert.Assert(t, time.Since(start) < 5*time.Second) +} + +func checkConsistency(t *testing.T, q *Queue) { + q.lock.Lock() + defer q.lock.Unlock() + + for next := q.items.Front(); next != nil && next.Next() != nil; next = next.Next() { + qi := next.Value.(*queueItem) + qiNext := next.Next().Value.(*queueItem) + assert.Assert(t, qi.plannedToStartWorkAt.Before(qiNext.plannedToStartWorkAt) || qi.plannedToStartWorkAt.Equal(qiNext.plannedToStartWorkAt)) + } +} + +func TestHeapOrder(t *testing.T) { + q := New(workqueue.DefaultItemBasedRateLimiter(), t.Name(), func(ctx context.Context, key string) error { + return nil + }) + q.clock = nonmovingClock{} + + q.EnqueueWithoutRateLimitWithDelay("a", 1000) + q.EnqueueWithoutRateLimitWithDelay("b", 2000) + q.EnqueueWithoutRateLimitWithDelay("c", 3000) + q.EnqueueWithoutRateLimitWithDelay("d", 4000) + q.EnqueueWithoutRateLimitWithDelay("e", 5000) + checkConsistency(t, q) + t.Logf("%v", q) + q.EnqueueWithoutRateLimitWithDelay("d", 1000) + checkConsistency(t, q) + t.Logf("%v", q) + q.EnqueueWithoutRateLimitWithDelay("c", 1001) + checkConsistency(t, q) + t.Logf("%v", q) + q.EnqueueWithoutRateLimitWithDelay("e", 999) + checkConsistency(t, q) + t.Logf("%v", q) +} + +type rateLimitWrapper struct { + addedMap sync.Map + forgottenMap sync.Map + rl workqueue.RateLimiter +} + +func (r *rateLimitWrapper) When(item interface{}) time.Duration { + if _, ok := r.forgottenMap.Load(item); ok { + r.forgottenMap.Delete(item) + 
// Reset the added map + r.addedMap.Store(item, 1) + } else { + actual, loaded := r.addedMap.LoadOrStore(item, 1) + if loaded { + r.addedMap.Store(item, actual.(int)+1) + } } - cancel() - group.Wait() + return r.rl.When(item) +} + +func (r *rateLimitWrapper) Forget(item interface{}) { + r.forgottenMap.Store(item, struct{}{}) + r.rl.Forget(item) +} + +func (r *rateLimitWrapper) NumRequeues(item interface{}) int { + return r.rl.NumRequeues(item) +} + +func TestRateLimiter(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), 5000*time.Millisecond) + defer cancel() + + syncMap := sync.Map{} + syncMap.Store("foo", 0) + syncMap.Store("bar", 0) + syncMap.Store("baz", 0) + syncMap.Store("quux", 0) + + start := time.Now() + ratelimiter := &rateLimitWrapper{ + rl: workqueue.NewItemFastSlowRateLimiter(1*time.Millisecond, 100*time.Millisecond, 1), + } + + q := New(ratelimiter, t.Name(), func(ctx context.Context, key string) error { + oldValue, _ := syncMap.Load(key) + syncMap.Store(key, oldValue.(int)+1) + if oldValue.(int) < 9 { + return errors.New("test") + } + return nil + }) + + enqueued := 0 + syncMap.Range(func(key, value interface{}) bool { + enqueued++ + q.Enqueue(key.(string)) + return true + }) + + assert.Assert(t, enqueued == 4) + go q.Run(ctx, 10) + + incomplete := true + for incomplete { + time.Sleep(10 * time.Millisecond) + incomplete = false + // Wait for all items to finish processing. + syncMap.Range(func(key, value interface{}) bool { + if value.(int) < 10 { + incomplete = true + } + return true + }) + } + + // Make sure there were ~9 "slow" rate limits per item, and 1 fast + assert.Assert(t, time.Since(start) > 9*100*time.Millisecond) + // Make sure we didn't go off the deep end. + assert.Assert(t, time.Since(start) < 2*9*100*time.Millisecond) + + // Make sure each item was seen. And Forgotten. 
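+	// The handler fails each key until its counter reaches 9 and succeeds on the
+	// 10th call, so the wrapper should have recorded exactly 10 When() calls per key
+	// (the initial Enqueue plus 9 rate-limited requeues) and a Forget once the key
+	// finally succeeds.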
+ syncMap.Range(func(key, value interface{}) bool { + _, ok := ratelimiter.forgottenMap.Load(key) + assert.Assert(t, ok, "%s in forgotten map", key) + val, ok := ratelimiter.addedMap.Load(key) + assert.Assert(t, ok, "%s in added map", key) + assert.Assert(t, val == 10) + return true + }) + + q.lock.Lock() + defer q.lock.Unlock() + assert.Assert(t, len(q.itemsInQueue) == 0) + assert.Assert(t, len(q.itemsBeingProcessed) == 0) + assert.Assert(t, q.items.Len() == 0) + +} + +func TestQueueForgetInProgress(t *testing.T) { + t.Parallel() + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + var times int64 + var q *Queue + q = New(workqueue.DefaultItemBasedRateLimiter(), t.Name(), func(ctx context.Context, key string) error { + assert.Assert(t, is.Equal(key, "foo")) + atomic.AddInt64(×, 1) + q.Forget(key) + return errors.New("test") + }) + + q.EnqueueWithoutRateLimit("foo") + go q.Run(ctx, 1) + for !q.Empty() { + time.Sleep(100 * time.Millisecond) + } + assert.Assert(t, is.Equal(atomic.LoadInt64(×), int64(1))) +} + +func TestQueueForgetBeforeStart(t *testing.T) { + t.Parallel() + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + q := New(workqueue.DefaultItemBasedRateLimiter(), t.Name(), func(ctx context.Context, key string) error { + panic("shouldn't be called") + }) + + q.EnqueueWithoutRateLimit("foo") + q.Forget("foo") + go q.Run(ctx, 1) + for !q.Empty() { + time.Sleep(100 * time.Millisecond) + } +} + +func TestQueueMoveItem(t *testing.T) { + t.Parallel() + + q := New(workqueue.DefaultItemBasedRateLimiter(), t.Name(), func(ctx context.Context, key string) error { + panic("shouldn't be called") + }) + q.clock = nonmovingClock{} + + q.insert("foo", false, 3000) + q.insert("bar", false, 2000) + q.insert("baz", false, 1000) + checkConsistency(t, q) + t.Log(q) + + q.insert("foo", false, 2000) + checkConsistency(t, q) + t.Log(q) + + q.insert("foo", false, 1999) + checkConsistency(t, q) + t.Log(q) + + q.insert("foo", false, 999) + checkConsistency(t, q) + t.Log(q) +} + +type nonmovingClock struct { +} + +func (n nonmovingClock) Now() time.Time { + return time.Time{} +} + +func (n nonmovingClock) Since(t time.Time) time.Duration { + return n.Now().Sub(t) +} + +func (n nonmovingClock) After(d time.Duration) <-chan time.Time { + panic("implement me") +} + +func (n nonmovingClock) NewTimer(d time.Duration) clock.Timer { + panic("implement me") +} + +func (n nonmovingClock) Sleep(d time.Duration) { + panic("implement me") +} + +func (n nonmovingClock) Tick(d time.Duration) <-chan time.Time { + panic("implement me") } diff --git a/node/podcontroller.go b/node/podcontroller.go index 85f6ae150..8fd7c84c0 100644 --- a/node/podcontroller.go +++ b/node/podcontroller.go @@ -518,7 +518,7 @@ func (pc *PodController) syncPodInProvider(ctx context.Context, pod *corev1.Pod, return err } - pc.deletePodsFromKubernetes.EnqueueAfter(key, time.Second*time.Duration(*pod.DeletionGracePeriodSeconds)) + pc.deletePodsFromKubernetes.EnqueueWithoutRateLimitWithDelay(key, time.Second*time.Duration(*pod.DeletionGracePeriodSeconds)) return nil }