diff --git a/go.mod b/go.mod index 82e4a7a90..bda52c488 100644 --- a/go.mod +++ b/go.mod @@ -18,7 +18,6 @@ require ( github.com/spf13/cobra v0.0.5 github.com/spf13/pflag v1.0.5 go.opencensus.io v0.21.0 - go.uber.org/goleak v1.1.10 golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e golang.org/x/sys v0.0.0-20200323222414-85ca7c5b95cd golang.org/x/time v0.0.0-20190308202827-9d24e82272b4 diff --git a/go.sum b/go.sum index 6d61a23c2..2d401e3e3 100644 --- a/go.sum +++ b/go.sum @@ -608,8 +608,6 @@ go.opencensus.io v0.21.0/go.mod h1:mSImk1erAIZhrmZN+AvHh14ztQfjbGwt4TtuofqLduU= go.uber.org/atomic v1.3.2/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE= go.uber.org/atomic v1.4.0 h1:cxzIVoETapQEqDhQu3QfnvXAV4AlzcvUCxkVUFw3+EU= go.uber.org/atomic v1.4.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE= -go.uber.org/goleak v1.1.10 h1:z+mqJhf6ss6BSfSM671tgKyZBFPTTJM+HLxnhPC3wu0= -go.uber.org/goleak v1.1.10/go.mod h1:8a7PlsEVH3e/a/GLqe5IIrQx6GzcnRmZEufDUTk4A7A= go.uber.org/multierr v1.1.0 h1:HoEmRHQPVSqub6w2z2d2EOVs2fjyFRGyofhKuyDq0QI= go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0= go.uber.org/zap v1.10.0 h1:ORx85nbTijNz8ljznvCMR1ZBIPKFn3jQrag10X2AsuM= @@ -645,8 +643,6 @@ golang.org/x/lint v0.0.0-20190301231843-5614ed5bae6f/go.mod h1:UVdnD1Gm6xHRNCYTk golang.org/x/lint v0.0.0-20190313153728-d0100b6bd8b3/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc= golang.org/x/lint v0.0.0-20190409202823-959b441ac422 h1:QzoH/1pFpZguR8NrRHLcO6jKqfv2zpuSqZLgdm7ZmjI= golang.org/x/lint v0.0.0-20190409202823-959b441ac422/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc= -golang.org/x/lint v0.0.0-20190930215403-16217165b5de h1:5hukYrvBGR8/eNkX5mdUezrA6JiaEZDtJb9Ei+1LlBs= -golang.org/x/lint v0.0.0-20190930215403-16217165b5de/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc= golang.org/x/mobile v0.0.0-20190312151609-d3739f865fa6/go.mod h1:z+o9i4GpDbdi3rU15maQ/Ox0txvL9dWGYEHz965HBQE= golang.org/x/mod v0.0.0-20190513183733-4bf6d317e70e/go.mod h1:mXi4GBBbnImb6dmsKGUJ2LatrhH/nqhxcFungHvyanc= golang.org/x/net v0.0.0-20170114055629-f2499483f923/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= @@ -768,8 +764,6 @@ golang.org/x/tools v0.0.0-20190621195816-6e04913cbbac/go.mod h1:/rFqwRUd4F7ZHNgw golang.org/x/tools v0.0.0-20190909030654-5b82db07426d/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= golang.org/x/tools v0.0.0-20190920225731-5eefd052ad72 h1:bw9doJza/SFBEweII/rHQh338oozWyiFsBRHtrflcws= golang.org/x/tools v0.0.0-20190920225731-5eefd052ad72/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= -golang.org/x/tools v0.0.0-20191108193012-7d206e10da11 h1:Yq9t9jnGoR+dBuitxdo9l6Q7xh/zOyNnYUtDKaQ3x0E= -golang.org/x/tools v0.0.0-20191108193012-7d206e10da11/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= diff --git a/internal/queue/queue.go b/internal/queue/queue.go index 352c6600d..edfe46f90 100644 --- a/internal/queue/queue.go +++ b/internal/queue/queue.go @@ -15,6 +15,7 @@ package queue import ( + "container/list" "context" "fmt" "sync" @@ -23,8 +24,10 @@ import ( pkgerrors "github.com/pkg/errors" "github.com/virtual-kubelet/virtual-kubelet/log" "github.com/virtual-kubelet/virtual-kubelet/trace" + "golang.org/x/sync/semaphore" 
"k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/util/workqueue" + "k8s.io/utils/clock" ) const ( @@ -37,11 +40,45 @@ type ItemHandler func(ctx context.Context, key string) error // Queue implements a wrapper around workqueue with native VK instrumentation type Queue struct { - lock sync.Mutex - running bool - name string - workqueue workqueue.RateLimitingInterface - handler ItemHandler + // clock is used for testing + clock clock.Clock + // lock protects running, and the items list / map + lock sync.Mutex + running bool + name string + handler ItemHandler + + ratelimiter workqueue.RateLimiter + // items are items that are marked dirty waiting for processing. + items *list.List + // itemInQueue is a map of (string) key -> item while it is in the items list + itemsInQueue map[string]*list.Element + // itemsBeingProcessed is a map of (string) key -> item once it has been moved + itemsBeingProcessed map[string]*queueItem + // Wait for next semaphore is an exclusive (1 item) lock that is taken every time items is checked to see if there + // is an item in queue for work + waitForNextItemSemaphore *semaphore.Weighted + + // wakeup + wakeupCh chan struct{} +} + +type queueItem struct { + key string + plannedToStartWorkAt time.Time + redirtiedAt time.Time + redirtiedWithRatelimit bool + forget bool + requeues int + + // Debugging information only + originallyAdded time.Time + addedViaRedirty bool + delayedViaRateLimit *time.Duration +} + +func (item *queueItem) String() string { + return fmt.Sprintf("", item.plannedToStartWorkAt.String(), item.key) } // New creates a queue @@ -50,40 +87,154 @@ type Queue struct { // in the internal kubernetes metrics. func New(ratelimiter workqueue.RateLimiter, name string, handler ItemHandler) *Queue { return &Queue{ - name: name, - workqueue: workqueue.NewNamedRateLimitingQueue(ratelimiter, name), - handler: handler, + clock: clock.RealClock{}, + name: name, + ratelimiter: ratelimiter, + items: list.New(), + itemsBeingProcessed: make(map[string]*queueItem), + itemsInQueue: make(map[string]*list.Element), + handler: handler, + wakeupCh: make(chan struct{}, 1), + waitForNextItemSemaphore: semaphore.NewWeighted(1), } } // Enqueue enqueues the key in a rate limited fashion func (q *Queue) Enqueue(key string) { - q.workqueue.AddRateLimited(key) + q.lock.Lock() + defer q.lock.Unlock() + + q.insert(key, true, 0) } // EnqueueWithoutRateLimit enqueues the key without a rate limit func (q *Queue) EnqueueWithoutRateLimit(key string) { - q.workqueue.Add(key) + q.lock.Lock() + defer q.lock.Unlock() + + q.insert(key, false, 0) } // Forget forgets the key func (q *Queue) Forget(key string) { - q.workqueue.Forget(key) + q.lock.Lock() + defer q.lock.Unlock() + + if item, ok := q.itemsInQueue[key]; ok { + delete(q.itemsInQueue, key) + q.items.Remove(item) + } + + if qi, ok := q.itemsBeingProcessed[key]; ok { + qi.forget = true + } } -// EnqueueAfter enqueues the item after this period -// -// Since it wrap workqueue semantics, if an item has been enqueued after, and it is immediately scheduled for work, -// it will process the immediate item, and then upon the latter delayed processing it will be processed again -func (q *Queue) EnqueueAfter(key string, after time.Duration) { - q.workqueue.AddAfter(key, after) +// insert inserts a new item to be processed at time time. It will not further delay items if when is later than the +// original time the item was scheduled to be processed. 
If when is earlier, it will "bring it forward" +func (q *Queue) insert(key string, ratelimit bool, delay time.Duration) *queueItem { + defer func() { + select { + case q.wakeupCh <- struct{}{}: + default: + } + }() + + // First see if the item is already being processed + if item, ok := q.itemsBeingProcessed[key]; ok { + when := q.clock.Now().Add(delay) + // Is the item already been redirtied? + if item.redirtiedAt.IsZero() { + item.redirtiedAt = when + item.redirtiedWithRatelimit = ratelimit + } else if when.Before(item.redirtiedAt) { + item.redirtiedAt = when + item.redirtiedWithRatelimit = ratelimit + } + item.forget = false + return item + } + + // Is the item already in the queue? + if item, ok := q.itemsInQueue[key]; ok { + qi := item.Value.(*queueItem) + when := q.clock.Now().Add(delay) + q.adjustPosition(qi, item, when) + return qi + } + + now := q.clock.Now() + val := &queueItem{ + key: key, + plannedToStartWorkAt: now, + originallyAdded: now, + } + + if ratelimit { + if delay > 0 { + panic("Non-zero delay with rate limiting not supported") + } + ratelimitDelay := q.ratelimiter.When(key) + val.plannedToStartWorkAt = val.plannedToStartWorkAt.Add(ratelimitDelay) + val.delayedViaRateLimit = &ratelimitDelay + } else { + val.plannedToStartWorkAt = val.plannedToStartWorkAt.Add(delay) + } + + for item := q.items.Back(); item != nil; item = item.Prev() { + qi := item.Value.(*queueItem) + if qi.plannedToStartWorkAt.Before(val.plannedToStartWorkAt) { + q.itemsInQueue[key] = q.items.InsertAfter(val, item) + return val + } + } + + q.itemsInQueue[key] = q.items.PushFront(val) + return val +} + +func (q *Queue) adjustPosition(qi *queueItem, element *list.Element, when time.Time) { + if when.After(qi.plannedToStartWorkAt) { + // The item has already been delayed appropriately + return + } + + qi.plannedToStartWorkAt = when + for prev := element.Prev(); prev != nil; prev = prev.Prev() { + item := prev.Value.(*queueItem) + // does this item plan to start work *before* the new time? If so add it + if item.plannedToStartWorkAt.Before(when) { + q.items.MoveAfter(element, prev) + return + } + } + + q.items.MoveToFront(element) +} + +// EnqueueWithoutRateLimitWithDelay enqueues without rate limiting, but work will not start for this given delay period +func (q *Queue) EnqueueWithoutRateLimitWithDelay(key string, after time.Duration) { + q.lock.Lock() + defer q.lock.Unlock() + q.insert(key, false, after) } // Empty returns if the queue has no items in it // -// It should only be used for debugging, as delayed items are not counted, leading to confusion +// It should only be used for debugging. 
func (q *Queue) Empty() bool { - return q.workqueue.Len() == 0 + return q.Len() == 0 +} + +// Len includes items that are in the queue, and are being processed +func (q *Queue) Len() int { + q.lock.Lock() + defer q.lock.Unlock() + if q.items.Len() != len(q.itemsInQueue) { + panic("Internally inconsistent state") + } + + return q.items.Len() + len(q.itemsBeingProcessed) } // Run starts the workers @@ -112,13 +263,14 @@ func (q *Queue) Run(ctx context.Context, workers int) { group := &wait.Group{} for i := 0; i < workers; i++ { + // This is required because i is referencing a mutable variable and that's running in a separate goroutine + idx := i group.StartWithContext(ctx, func(ctx context.Context) { - q.worker(ctx, i) + q.worker(ctx, idx) }) } defer group.Wait() <-ctx.Done() - q.workqueue.ShutDown() } func (q *Queue) worker(ctx context.Context, i int) { @@ -130,6 +282,54 @@ func (q *Queue) worker(ctx context.Context, i int) { } } +func (q *Queue) getNextItem(ctx context.Context) (*queueItem, error) { + if err := q.waitForNextItemSemaphore.Acquire(ctx, 1); err != nil { + return nil, err + } + defer q.waitForNextItemSemaphore.Release(1) + + for { + q.lock.Lock() + element := q.items.Front() + if element == nil { + // Wait for the next item + q.lock.Unlock() + select { + case <-ctx.Done(): + return nil, ctx.Err() + case <-q.wakeupCh: + } + } else { + qi := element.Value.(*queueItem) + timeUntilProcessing := time.Until(qi.plannedToStartWorkAt) + + // Do we need to sleep? If not, let's party. + if timeUntilProcessing <= 0 { + q.itemsBeingProcessed[qi.key] = qi + q.items.Remove(element) + delete(q.itemsInQueue, qi.key) + q.lock.Unlock() + return qi, nil + } + + q.lock.Unlock() + if err := func() error { + timer := q.clock.NewTimer(timeUntilProcessing) + defer timer.Stop() + select { + case <-timer.C(): + case <-ctx.Done(): + return ctx.Err() + case <-q.wakeupCh: + } + return nil + }(); err != nil { + return nil, err + } + } + } +} + // handleQueueItem handles a single item // // A return value of "false" indicates that further processing should be stopped. @@ -137,8 +337,9 @@ func (q *Queue) handleQueueItem(ctx context.Context) bool { ctx, span := trace.StartSpan(ctx, "handleQueueItem") defer span.End() - obj, shutdown := q.workqueue.Get() - if shutdown { + qi, err := q.getNextItem(ctx) + if err != nil { + span.SetStatus(err) return false } @@ -146,11 +347,10 @@ func (q *Queue) handleQueueItem(ctx context.Context) bool { // These are of the form namespace/name. // We do this as the delayed nature of the work Queue means the items in the informer cache may actually be more u // to date that when the item was initially put onto the workqueue. - key := obj.(string) - ctx = span.WithField(ctx, "key", key) + ctx = span.WithField(ctx, "key", qi.key) log.G(ctx).Debug("Got Queue object") - err := q.handleQueueItemObject(ctx, key) + err = q.handleQueueItemObject(ctx, qi) if err != nil { // We've actually hit an error, so we set the span's status based on the error. span.SetStatus(err) @@ -162,35 +362,69 @@ func (q *Queue) handleQueueItem(ctx context.Context) bool { return true } -func (q *Queue) handleQueueItemObject(ctx context.Context, key string) error { +func (q *Queue) handleQueueItemObject(ctx context.Context, qi *queueItem) error { // This is a separate function / span, because the handleQueueItem span is the time spent waiting for the object // plus the time spend handling the object. Instead, this function / span is scoped to a single object. 
ctx, span := trace.StartSpan(ctx, "handleQueueItemObject") defer span.End() - ctx = span.WithField(ctx, "key", key) - // We call Done here so the work Queue knows we have finished processing this item. - // We also must remember to call Forget if we do not want this work item being re-queued. - // For example, we do not call Forget if a transient error occurs. - // Instead, the item is put back on the work Queue and attempted again after a back-off period. - defer q.workqueue.Done(key) + ctx = span.WithFields(ctx, map[string]interface{}{ + "requeues": qi.requeues, + "originallyAdded": qi.originallyAdded.String(), + "addedViaRedirty": qi.addedViaRedirty, + "plannedForWork": qi.plannedToStartWorkAt.String(), + }) + + if qi.delayedViaRateLimit != nil { + ctx = span.WithField(ctx, "delayedViaRateLimit", qi.delayedViaRateLimit.String()) + } // Add the current key as an attribute to the current span. - ctx = span.WithField(ctx, "key", key) + ctx = span.WithField(ctx, "key", qi.key) // Run the syncHandler, passing it the namespace/name string of the Pod resource to be synced. - if err := q.handler(ctx, key); err != nil { - if q.workqueue.NumRequeues(key) < MaxRetries { + err := q.handler(ctx, qi.key) + + q.lock.Lock() + defer q.lock.Unlock() + + delete(q.itemsBeingProcessed, qi.key) + if qi.forget { + q.ratelimiter.Forget(qi.key) + log.G(ctx).WithError(err).Warnf("forgetting %q as told to forget while in progress", qi.key) + return nil + } + + if err != nil { + if qi.requeues+1 < MaxRetries { // Put the item back on the work Queue to handle any transient errors. - log.G(ctx).WithError(err).Warnf("requeuing %q due to failed sync", key) - q.workqueue.AddRateLimited(key) + log.G(ctx).WithError(err).Warnf("requeuing %q due to failed sync", qi.key) + newQI := q.insert(qi.key, true, 0) + newQI.requeues = qi.requeues + 1 + newQI.originallyAdded = qi.originallyAdded + return nil } - // We've exceeded the maximum retries, so we must Forget the key. - q.workqueue.Forget(key) - return pkgerrors.Wrapf(err, "forgetting %q due to maximum retries reached", key) + err = pkgerrors.Wrapf(err, "forgetting %q due to maximum retries reached", qi.key) } - // Finally, if no error occurs we Forget this item so it does not get queued again until another change happens. - q.workqueue.Forget(key) - return nil + // We've exceeded the maximum retries or we were successful. 
+ q.ratelimiter.Forget(qi.key) + if !qi.redirtiedAt.IsZero() { + newQI := q.insert(qi.key, qi.redirtiedWithRatelimit, time.Until(qi.redirtiedAt)) + newQI.addedViaRedirty = true + } + + return err +} + +func (q *Queue) String() string { + q.lock.Lock() + defer q.lock.Unlock() + + items := make([]string, 0, q.items.Len()) + + for next := q.items.Front(); next != nil; next = next.Next() { + items = append(items, next.Value.(*queueItem).String()) + } + return fmt.Sprintf("<items:%v>", items) } diff --git a/internal/queue/queue_test.go b/internal/queue/queue_test.go index b2fe5f2db..c2897799a 100644 --- a/internal/queue/queue_test.go +++ b/internal/queue/queue_test.go @@ -5,18 +5,18 @@ import ( "errors" "strconv" "sync" + "sync/atomic" "testing" "time" "github.com/sirupsen/logrus" "github.com/virtual-kubelet/virtual-kubelet/log" logruslogger "github.com/virtual-kubelet/virtual-kubelet/log/logrus" - "go.uber.org/goleak" "golang.org/x/time/rate" "gotest.tools/assert" is "gotest.tools/assert/cmp" - "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/util/workqueue" + "k8s.io/utils/clock" ) func TestQueueMaxRetries(t *testing.T) { @@ -43,7 +43,7 @@ func TestQueueMaxRetries(t *testing.T) { } assert.Assert(t, is.Equal(n, MaxRetries)) - assert.Assert(t, is.Equal(0, wq.workqueue.Len())) + assert.Assert(t, is.Equal(0, wq.Len())) } func TestForget(t *testing.T) { @@ -54,62 +54,400 @@ func TestForget(t *testing.T) { wq := New(workqueue.DefaultItemBasedRateLimiter(), t.Name(), handler) wq.Forget("val") - assert.Assert(t, is.Equal(0, wq.workqueue.Len())) + assert.Assert(t, is.Equal(0, wq.Len())) v := "test" wq.EnqueueWithoutRateLimit(v) - assert.Assert(t, is.Equal(1, wq.workqueue.Len())) - - t.Skip("This is broken") - // Workqueue docs: - // Forget indicates that an item is finished being retried. Doesn't matter whether it's for perm failing - // or for success, we'll stop the rate limiter from tracking it. This only clears the `rateLimiter`, you - // still have to call `Done` on the queue. - // Even if you do this, it doesn't work: https://play.golang.com/p/8vfL_RCsFGI - assert.Assert(t, is.Equal(0, wq.workqueue.Len())) - + assert.Assert(t, is.Equal(1, wq.Len())) } -func TestQueueTerminate(t *testing.T) { +func TestQueueEmpty(t *testing.T) { t.Parallel() - defer goleak.VerifyNone(t, - // Ignore existing goroutines - goleak.IgnoreCurrent(), - // Ignore klog background flushers - goleak.IgnoreTopFunction("k8s.io/klog.(*loggingT).flushDaemon"), - goleak.IgnoreTopFunction("k8s.io/klog/v2.(*loggingT).flushDaemon"), - // Workqueue runs a goroutine in the background to handle background functions.
AFAICT, they're unkillable - // and are designed to stop after a certain idle window - goleak.IgnoreTopFunction("k8s.io/client-go/util/workqueue.(*Type).updateUnfinishedWorkLoop"), - goleak.IgnoreTopFunction("k8s.io/client-go/util/workqueue.(*delayingType).waitingLoop"), - ) - ctx, cancel := context.WithCancel(context.Background()) + ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond) defer cancel() - testMap := &sync.Map{} - handler := func(ctx context.Context, key string) error { - testMap.Store(key, struct{}{}) + q := New(workqueue.DefaultItemBasedRateLimiter(), t.Name(), func(ctx context.Context, key string) error { return nil - } - - wq := New(workqueue.DefaultItemBasedRateLimiter(), t.Name(), handler) - group := &wait.Group{} - group.StartWithContext(ctx, func(ctx context.Context) { - wq.Run(ctx, 10) }) - for i := 0; i < 1000; i++ { - wq.EnqueueWithoutRateLimit(strconv.Itoa(i)) + + item, err := q.getNextItem(ctx) + assert.Error(t, err, context.DeadlineExceeded.Error()) + assert.Assert(t, is.Nil(item)) +} + +func TestQueueItemNoSleep(t *testing.T) { + t.Parallel() + + ctx, cancel := context.WithTimeout(context.Background(), 1000*time.Millisecond) + defer cancel() + + q := New(workqueue.DefaultItemBasedRateLimiter(), t.Name(), func(ctx context.Context, key string) error { + return nil + }) + + q.lock.Lock() + q.insert("foo", false, -1*time.Hour) + q.insert("bar", false, -1*time.Hour) + q.lock.Unlock() + + item, err := q.getNextItem(ctx) + assert.NilError(t, err) + assert.Assert(t, is.Equal(item.key, "foo")) + + item, err = q.getNextItem(ctx) + assert.NilError(t, err) + assert.Assert(t, is.Equal(item.key, "bar")) +} + +func TestQueueItemSleep(t *testing.T) { + t.Parallel() + + ctx, cancel := context.WithTimeout(context.Background(), 1000*time.Millisecond) + defer cancel() + + q := New(workqueue.DefaultItemBasedRateLimiter(), t.Name(), func(ctx context.Context, key string) error { + return nil + }) + q.lock.Lock() + q.insert("foo", false, 100*time.Millisecond) + q.insert("bar", false, 100*time.Millisecond) + q.lock.Unlock() + + item, err := q.getNextItem(ctx) + assert.NilError(t, err) + assert.Assert(t, is.Equal(item.key, "foo")) +} + +func TestQueueBackgroundAdd(t *testing.T) { + t.Parallel() + + ctx, cancel := context.WithTimeout(context.Background(), 5000*time.Millisecond) + defer cancel() + + q := New(workqueue.DefaultItemBasedRateLimiter(), t.Name(), func(ctx context.Context, key string) error { + return nil + }) + start := time.Now() + time.AfterFunc(100*time.Millisecond, func() { + q.lock.Lock() + defer q.lock.Unlock() + q.insert("foo", false, 0) + }) + + item, err := q.getNextItem(ctx) + assert.NilError(t, err) + assert.Assert(t, is.Equal(item.key, "foo")) + assert.Assert(t, time.Since(start) > 100*time.Millisecond) +} + +func TestQueueBackgroundAdvance(t *testing.T) { + t.Parallel() + + ctx, cancel := context.WithTimeout(context.Background(), 5000*time.Millisecond) + defer cancel() + + q := New(workqueue.DefaultItemBasedRateLimiter(), t.Name(), func(ctx context.Context, key string) error { + return nil + }) + start := time.Now() + q.lock.Lock() + q.insert("foo", false, 10*time.Second) + q.lock.Unlock() + + time.AfterFunc(200*time.Millisecond, func() { + q.lock.Lock() + defer q.lock.Unlock() + q.insert("foo", false, 0) + }) + + item, err := q.getNextItem(ctx) + assert.NilError(t, err) + assert.Assert(t, is.Equal(item.key, "foo")) + assert.Assert(t, time.Since(start) > 200*time.Millisecond) + assert.Assert(t, time.Since(start) < 5*time.Second) +} + +func 
TestQueueRedirty(t *testing.T) { + t.Parallel() + + ctx, cancel := context.WithTimeout(context.Background(), 5000*time.Millisecond) + defer cancel() + + var times int64 + var q *Queue + q = New(workqueue.DefaultItemBasedRateLimiter(), t.Name(), func(ctx context.Context, key string) error { + assert.Assert(t, is.Equal(key, "foo")) + if atomic.AddInt64(×, 1) == 1 { + q.EnqueueWithoutRateLimit("foo") + } else { + cancel() + } + return nil + }) + + q.EnqueueWithoutRateLimit("foo") + q.Run(ctx, 1) + for !q.Empty() { + time.Sleep(100 * time.Millisecond) + } + assert.Assert(t, is.Equal(atomic.LoadInt64(×), int64(2))) +} + +func TestHeapConcurrency(t *testing.T) { + t.Parallel() + + ctx, cancel := context.WithTimeout(context.Background(), 5000*time.Millisecond) + defer cancel() + + start := time.Now() + seen := sync.Map{} + q := New(workqueue.DefaultItemBasedRateLimiter(), t.Name(), func(ctx context.Context, key string) error { + seen.Store(key, struct{}{}) + time.Sleep(time.Second) + return nil + }) + for i := 0; i < 20; i++ { + q.EnqueueWithoutRateLimit(strconv.Itoa(i)) } - for wq.workqueue.Len() > 0 { + assert.Assert(t, q.Len() == 20) + + go q.Run(ctx, 20) + for q.Len() > 0 { time.Sleep(100 * time.Millisecond) } - for i := 0; i < 1000; i++ { - _, ok := testMap.Load(strconv.Itoa(i)) - assert.Assert(t, ok, "Item %d missing", i) + for i := 0; i < 20; i++ { + _, ok := seen.Load(strconv.Itoa(i)) + assert.Assert(t, ok, "Did not observe: %d", i) + } + assert.Assert(t, time.Since(start) < 5*time.Second) +} + +func checkConsistency(t *testing.T, q *Queue) { + q.lock.Lock() + defer q.lock.Unlock() + + for next := q.items.Front(); next != nil && next.Next() != nil; next = next.Next() { + qi := next.Value.(*queueItem) + qiNext := next.Next().Value.(*queueItem) + assert.Assert(t, qi.plannedToStartWorkAt.Before(qiNext.plannedToStartWorkAt) || qi.plannedToStartWorkAt.Equal(qiNext.plannedToStartWorkAt)) + } +} + +func TestHeapOrder(t *testing.T) { + q := New(workqueue.DefaultItemBasedRateLimiter(), t.Name(), func(ctx context.Context, key string) error { + return nil + }) + q.clock = nonmovingClock{} + + q.EnqueueWithoutRateLimitWithDelay("a", 1000) + q.EnqueueWithoutRateLimitWithDelay("b", 2000) + q.EnqueueWithoutRateLimitWithDelay("c", 3000) + q.EnqueueWithoutRateLimitWithDelay("d", 4000) + q.EnqueueWithoutRateLimitWithDelay("e", 5000) + checkConsistency(t, q) + t.Logf("%v", q) + q.EnqueueWithoutRateLimitWithDelay("d", 1000) + checkConsistency(t, q) + t.Logf("%v", q) + q.EnqueueWithoutRateLimitWithDelay("c", 1001) + checkConsistency(t, q) + t.Logf("%v", q) + q.EnqueueWithoutRateLimitWithDelay("e", 999) + checkConsistency(t, q) + t.Logf("%v", q) +} + +type rateLimitWrapper struct { + addedMap sync.Map + forgottenMap sync.Map + rl workqueue.RateLimiter +} + +func (r *rateLimitWrapper) When(item interface{}) time.Duration { + if _, ok := r.forgottenMap.Load(item); ok { + r.forgottenMap.Delete(item) + // Reset the added map + r.addedMap.Store(item, 1) + } else { + actual, loaded := r.addedMap.LoadOrStore(item, 1) + if loaded { + r.addedMap.Store(item, actual.(int)+1) + } } - cancel() - group.Wait() + return r.rl.When(item) +} + +func (r *rateLimitWrapper) Forget(item interface{}) { + r.forgottenMap.Store(item, struct{}{}) + r.rl.Forget(item) +} + +func (r *rateLimitWrapper) NumRequeues(item interface{}) int { + return r.rl.NumRequeues(item) +} + +func TestRateLimiter(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), 5000*time.Millisecond) + defer cancel() + + syncMap := sync.Map{} + 
syncMap.Store("foo", 0) + syncMap.Store("bar", 0) + syncMap.Store("baz", 0) + syncMap.Store("quux", 0) + + start := time.Now() + ratelimiter := &rateLimitWrapper{ + rl: workqueue.NewItemFastSlowRateLimiter(1*time.Millisecond, 100*time.Millisecond, 1), + } + + q := New(ratelimiter, t.Name(), func(ctx context.Context, key string) error { + oldValue, _ := syncMap.Load(key) + syncMap.Store(key, oldValue.(int)+1) + if oldValue.(int) < 9 { + return errors.New("test") + } + return nil + }) + + enqueued := 0 + syncMap.Range(func(key, value interface{}) bool { + enqueued++ + q.Enqueue(key.(string)) + return true + }) + + assert.Assert(t, enqueued == 4) + go q.Run(ctx, 10) + + incomplete := true + for incomplete { + time.Sleep(10 * time.Millisecond) + incomplete = false + // Wait for all items to finish processing. + syncMap.Range(func(key, value interface{}) bool { + if value.(int) < 10 { + incomplete = true + } + return true + }) + } + + // Make sure there were ~9 "slow" rate limits per item, and 1 fast + assert.Assert(t, time.Since(start) > 9*100*time.Millisecond) + // Make sure we didn't go off the deep end. + assert.Assert(t, time.Since(start) < 2*9*100*time.Millisecond) + + // Make sure each item was seen. And Forgotten. + syncMap.Range(func(key, value interface{}) bool { + _, ok := ratelimiter.forgottenMap.Load(key) + assert.Assert(t, ok, "%s in forgotten map", key) + val, ok := ratelimiter.addedMap.Load(key) + assert.Assert(t, ok, "%s in added map", key) + assert.Assert(t, val == 10) + return true + }) + + q.lock.Lock() + defer q.lock.Unlock() + assert.Assert(t, len(q.itemsInQueue) == 0) + assert.Assert(t, len(q.itemsBeingProcessed) == 0) + assert.Assert(t, q.items.Len() == 0) + +} + +func TestQueueForgetInProgress(t *testing.T) { + t.Parallel() + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + var times int64 + var q *Queue + q = New(workqueue.DefaultItemBasedRateLimiter(), t.Name(), func(ctx context.Context, key string) error { + assert.Assert(t, is.Equal(key, "foo")) + atomic.AddInt64(×, 1) + q.Forget(key) + return errors.New("test") + }) + + q.EnqueueWithoutRateLimit("foo") + go q.Run(ctx, 1) + for !q.Empty() { + time.Sleep(100 * time.Millisecond) + } + assert.Assert(t, is.Equal(atomic.LoadInt64(×), int64(1))) +} + +func TestQueueForgetBeforeStart(t *testing.T) { + t.Parallel() + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + q := New(workqueue.DefaultItemBasedRateLimiter(), t.Name(), func(ctx context.Context, key string) error { + panic("shouldn't be called") + }) + + q.EnqueueWithoutRateLimit("foo") + q.Forget("foo") + go q.Run(ctx, 1) + for !q.Empty() { + time.Sleep(100 * time.Millisecond) + } +} + +func TestQueueMoveItem(t *testing.T) { + t.Parallel() + + q := New(workqueue.DefaultItemBasedRateLimiter(), t.Name(), func(ctx context.Context, key string) error { + panic("shouldn't be called") + }) + q.clock = nonmovingClock{} + + q.insert("foo", false, 3000) + q.insert("bar", false, 2000) + q.insert("baz", false, 1000) + checkConsistency(t, q) + t.Log(q) + + q.insert("foo", false, 2000) + checkConsistency(t, q) + t.Log(q) + + q.insert("foo", false, 1999) + checkConsistency(t, q) + t.Log(q) + + q.insert("foo", false, 999) + checkConsistency(t, q) + t.Log(q) +} + +type nonmovingClock struct { +} + +func (n nonmovingClock) Now() time.Time { + return time.Time{} +} + +func (n nonmovingClock) Since(t time.Time) time.Duration { + return n.Now().Sub(t) +} + +func (n nonmovingClock) After(d time.Duration) <-chan time.Time { + 
panic("implement me") +} + +func (n nonmovingClock) NewTimer(d time.Duration) clock.Timer { + panic("implement me") +} + +func (n nonmovingClock) Sleep(d time.Duration) { + panic("implement me") +} + +func (n nonmovingClock) Tick(d time.Duration) <-chan time.Time { + panic("implement me") } diff --git a/node/podcontroller.go b/node/podcontroller.go index 85f6ae150..8fd7c84c0 100644 --- a/node/podcontroller.go +++ b/node/podcontroller.go @@ -518,7 +518,7 @@ func (pc *PodController) syncPodInProvider(ctx context.Context, pod *corev1.Pod, return err } - pc.deletePodsFromKubernetes.EnqueueAfter(key, time.Second*time.Duration(*pod.DeletionGracePeriodSeconds)) + pc.deletePodsFromKubernetes.EnqueueWithoutRateLimitWithDelay(key, time.Second*time.Duration(*pod.DeletionGracePeriodSeconds)) return nil }