Merge pull request #952 from sargun/add-tracking-info

Add Alternative Workqueue Implementation
This commit is contained in:
Sargun Dhillon
2021-02-12 15:20:06 -08:00
committed by GitHub
6 changed files with 722 additions and 109 deletions

1
go.mod
View File

@@ -18,7 +18,6 @@ require (
github.com/spf13/cobra v0.0.5
github.com/spf13/pflag v1.0.5
go.opencensus.io v0.21.0
go.uber.org/goleak v1.1.10
golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e
golang.org/x/sys v0.0.0-20200323222414-85ca7c5b95cd
golang.org/x/time v0.0.0-20190308202827-9d24e82272b4

6
go.sum
View File

@@ -608,8 +608,6 @@ go.opencensus.io v0.21.0/go.mod h1:mSImk1erAIZhrmZN+AvHh14ztQfjbGwt4TtuofqLduU=
go.uber.org/atomic v1.3.2/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE=
go.uber.org/atomic v1.4.0 h1:cxzIVoETapQEqDhQu3QfnvXAV4AlzcvUCxkVUFw3+EU=
go.uber.org/atomic v1.4.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE=
go.uber.org/goleak v1.1.10 h1:z+mqJhf6ss6BSfSM671tgKyZBFPTTJM+HLxnhPC3wu0=
go.uber.org/goleak v1.1.10/go.mod h1:8a7PlsEVH3e/a/GLqe5IIrQx6GzcnRmZEufDUTk4A7A=
go.uber.org/multierr v1.1.0 h1:HoEmRHQPVSqub6w2z2d2EOVs2fjyFRGyofhKuyDq0QI=
go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0=
go.uber.org/zap v1.10.0 h1:ORx85nbTijNz8ljznvCMR1ZBIPKFn3jQrag10X2AsuM=
@@ -645,8 +643,6 @@ golang.org/x/lint v0.0.0-20190301231843-5614ed5bae6f/go.mod h1:UVdnD1Gm6xHRNCYTk
golang.org/x/lint v0.0.0-20190313153728-d0100b6bd8b3/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc=
golang.org/x/lint v0.0.0-20190409202823-959b441ac422 h1:QzoH/1pFpZguR8NrRHLcO6jKqfv2zpuSqZLgdm7ZmjI=
golang.org/x/lint v0.0.0-20190409202823-959b441ac422/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc=
golang.org/x/lint v0.0.0-20190930215403-16217165b5de h1:5hukYrvBGR8/eNkX5mdUezrA6JiaEZDtJb9Ei+1LlBs=
golang.org/x/lint v0.0.0-20190930215403-16217165b5de/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc=
golang.org/x/mobile v0.0.0-20190312151609-d3739f865fa6/go.mod h1:z+o9i4GpDbdi3rU15maQ/Ox0txvL9dWGYEHz965HBQE=
golang.org/x/mod v0.0.0-20190513183733-4bf6d317e70e/go.mod h1:mXi4GBBbnImb6dmsKGUJ2LatrhH/nqhxcFungHvyanc=
golang.org/x/net v0.0.0-20170114055629-f2499483f923/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
@@ -768,8 +764,6 @@ golang.org/x/tools v0.0.0-20190621195816-6e04913cbbac/go.mod h1:/rFqwRUd4F7ZHNgw
golang.org/x/tools v0.0.0-20190909030654-5b82db07426d/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.0.0-20190920225731-5eefd052ad72 h1:bw9doJza/SFBEweII/rHQh338oozWyiFsBRHtrflcws=
golang.org/x/tools v0.0.0-20190920225731-5eefd052ad72/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.0.0-20191108193012-7d206e10da11 h1:Yq9t9jnGoR+dBuitxdo9l6Q7xh/zOyNnYUtDKaQ3x0E=
golang.org/x/tools v0.0.0-20191108193012-7d206e10da11/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=

View File

@@ -15,6 +15,7 @@
package queue
import (
"container/list"
"context"
"fmt"
"sync"
@@ -23,8 +24,10 @@ import (
pkgerrors "github.com/pkg/errors"
"github.com/virtual-kubelet/virtual-kubelet/log"
"github.com/virtual-kubelet/virtual-kubelet/trace"
"golang.org/x/sync/semaphore"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/util/workqueue"
"k8s.io/utils/clock"
)
const (
@@ -37,11 +40,45 @@ type ItemHandler func(ctx context.Context, key string) error
// Queue implements a wrapper around workqueue with native VK instrumentation
type Queue struct {
lock sync.Mutex
running bool
name string
workqueue workqueue.RateLimitingInterface
handler ItemHandler
// clock is used for testing
clock clock.Clock
// lock protects running, and the items list / map
lock sync.Mutex
running bool
name string
handler ItemHandler
ratelimiter workqueue.RateLimiter
// items are items that are marked dirty waiting for processing.
items *list.List
// itemInQueue is a map of (string) key -> item while it is in the items list
itemsInQueue map[string]*list.Element
// itemsBeingProcessed is a map of (string) key -> item once it has been moved
itemsBeingProcessed map[string]*queueItem
// Wait for next semaphore is an exclusive (1 item) lock that is taken every time items is checked to see if there
// is an item in queue for work
waitForNextItemSemaphore *semaphore.Weighted
// wakeup
wakeupCh chan struct{}
}
type queueItem struct {
key string
plannedToStartWorkAt time.Time
redirtiedAt time.Time
redirtiedWithRatelimit bool
forget bool
requeues int
// Debugging information only
originallyAdded time.Time
addedViaRedirty bool
delayedViaRateLimit *time.Duration
}
func (item *queueItem) String() string {
return fmt.Sprintf("<plannedToStartWorkAt:%s key: %s>", item.plannedToStartWorkAt.String(), item.key)
}
// New creates a queue
@@ -50,40 +87,182 @@ type Queue struct {
// in the internal kubernetes metrics.
func New(ratelimiter workqueue.RateLimiter, name string, handler ItemHandler) *Queue {
return &Queue{
name: name,
workqueue: workqueue.NewNamedRateLimitingQueue(ratelimiter, name),
handler: handler,
clock: clock.RealClock{},
name: name,
ratelimiter: ratelimiter,
items: list.New(),
itemsBeingProcessed: make(map[string]*queueItem),
itemsInQueue: make(map[string]*list.Element),
handler: handler,
wakeupCh: make(chan struct{}, 1),
waitForNextItemSemaphore: semaphore.NewWeighted(1),
}
}
// Enqueue enqueues the key in a rate limited fashion
func (q *Queue) Enqueue(key string) {
q.workqueue.AddRateLimited(key)
func (q *Queue) Enqueue(ctx context.Context, key string) {
q.lock.Lock()
defer q.lock.Unlock()
q.insert(ctx, key, true, 0)
}
// EnqueueWithoutRateLimit enqueues the key without a rate limit
func (q *Queue) EnqueueWithoutRateLimit(key string) {
q.workqueue.Add(key)
func (q *Queue) EnqueueWithoutRateLimit(ctx context.Context, key string) {
q.lock.Lock()
defer q.lock.Unlock()
q.insert(ctx, key, false, 0)
}
// Forget forgets the key
func (q *Queue) Forget(key string) {
q.workqueue.Forget(key)
func (q *Queue) Forget(ctx context.Context, key string) {
q.lock.Lock()
defer q.lock.Unlock()
ctx, span := trace.StartSpan(ctx, "Forget")
defer span.End()
ctx = span.WithFields(ctx, map[string]interface{}{
"queue": q.name,
"key": key,
})
if item, ok := q.itemsInQueue[key]; ok {
span.WithField(ctx, "status", "itemInQueue")
delete(q.itemsInQueue, key)
q.items.Remove(item)
return
}
if qi, ok := q.itemsBeingProcessed[key]; ok {
span.WithField(ctx, "status", "itemBeingProcessed")
qi.forget = true
return
}
span.WithField(ctx, "status", "notfound")
}
// EnqueueAfter enqueues the item after this period
//
// Since it wrap workqueue semantics, if an item has been enqueued after, and it is immediately scheduled for work,
// it will process the immediate item, and then upon the latter delayed processing it will be processed again
func (q *Queue) EnqueueAfter(key string, after time.Duration) {
q.workqueue.AddAfter(key, after)
// insert inserts a new item to be processed at time time. It will not further delay items if when is later than the
// original time the item was scheduled to be processed. If when is earlier, it will "bring it forward"
func (q *Queue) insert(ctx context.Context, key string, ratelimit bool, delay time.Duration) *queueItem {
ctx, span := trace.StartSpan(ctx, "insert")
defer span.End()
ctx = span.WithFields(ctx, map[string]interface{}{
"queue": q.name,
"key": key,
"ratelimit": ratelimit,
})
if delay > 0 {
ctx = span.WithField(ctx, "delay", delay.String())
}
defer func() {
select {
case q.wakeupCh <- struct{}{}:
default:
}
}()
// First see if the item is already being processed
if item, ok := q.itemsBeingProcessed[key]; ok {
span.WithField(ctx, "status", "itemsBeingProcessed")
when := q.clock.Now().Add(delay)
// Is the item already been redirtied?
if item.redirtiedAt.IsZero() {
item.redirtiedAt = when
item.redirtiedWithRatelimit = ratelimit
} else if when.Before(item.redirtiedAt) {
item.redirtiedAt = when
item.redirtiedWithRatelimit = ratelimit
}
item.forget = false
return item
}
// Is the item already in the queue?
if item, ok := q.itemsInQueue[key]; ok {
span.WithField(ctx, "status", "itemsInQueue")
qi := item.Value.(*queueItem)
when := q.clock.Now().Add(delay)
q.adjustPosition(qi, item, when)
return qi
}
span.WithField(ctx, "status", "added")
now := q.clock.Now()
val := &queueItem{
key: key,
plannedToStartWorkAt: now,
originallyAdded: now,
}
if ratelimit {
if delay > 0 {
panic("Non-zero delay with rate limiting not supported")
}
ratelimitDelay := q.ratelimiter.When(key)
span.WithField(ctx, "delay", ratelimitDelay.String())
val.plannedToStartWorkAt = val.plannedToStartWorkAt.Add(ratelimitDelay)
val.delayedViaRateLimit = &ratelimitDelay
} else {
val.plannedToStartWorkAt = val.plannedToStartWorkAt.Add(delay)
}
for item := q.items.Back(); item != nil; item = item.Prev() {
qi := item.Value.(*queueItem)
if qi.plannedToStartWorkAt.Before(val.plannedToStartWorkAt) {
q.itemsInQueue[key] = q.items.InsertAfter(val, item)
return val
}
}
q.itemsInQueue[key] = q.items.PushFront(val)
return val
}
func (q *Queue) adjustPosition(qi *queueItem, element *list.Element, when time.Time) {
if when.After(qi.plannedToStartWorkAt) {
// The item has already been delayed appropriately
return
}
qi.plannedToStartWorkAt = when
for prev := element.Prev(); prev != nil; prev = prev.Prev() {
item := prev.Value.(*queueItem)
// does this item plan to start work *before* the new time? If so add it
if item.plannedToStartWorkAt.Before(when) {
q.items.MoveAfter(element, prev)
return
}
}
q.items.MoveToFront(element)
}
// EnqueueWithoutRateLimitWithDelay enqueues without rate limiting, but work will not start for this given delay period
func (q *Queue) EnqueueWithoutRateLimitWithDelay(ctx context.Context, key string, after time.Duration) {
q.lock.Lock()
defer q.lock.Unlock()
q.insert(ctx, key, false, after)
}
// Empty returns if the queue has no items in it
//
// It should only be used for debugging, as delayed items are not counted, leading to confusion
// It should only be used for debugging.
func (q *Queue) Empty() bool {
return q.workqueue.Len() == 0
return q.Len() == 0
}
// Len includes items that are in the queue, and are being processed
func (q *Queue) Len() int {
q.lock.Lock()
defer q.lock.Unlock()
if q.items.Len() != len(q.itemsInQueue) {
panic("Internally inconsistent state")
}
return q.items.Len() + len(q.itemsBeingProcessed)
}
// Run starts the workers
@@ -112,13 +291,14 @@ func (q *Queue) Run(ctx context.Context, workers int) {
group := &wait.Group{}
for i := 0; i < workers; i++ {
// This is required because i is referencing a mutable variable and that's running in a separate goroutine
idx := i
group.StartWithContext(ctx, func(ctx context.Context) {
q.worker(ctx, i)
q.worker(ctx, idx)
})
}
defer group.Wait()
<-ctx.Done()
q.workqueue.ShutDown()
}
func (q *Queue) worker(ctx context.Context, i int) {
@@ -130,6 +310,54 @@ func (q *Queue) worker(ctx context.Context, i int) {
}
}
func (q *Queue) getNextItem(ctx context.Context) (*queueItem, error) {
if err := q.waitForNextItemSemaphore.Acquire(ctx, 1); err != nil {
return nil, err
}
defer q.waitForNextItemSemaphore.Release(1)
for {
q.lock.Lock()
element := q.items.Front()
if element == nil {
// Wait for the next item
q.lock.Unlock()
select {
case <-ctx.Done():
return nil, ctx.Err()
case <-q.wakeupCh:
}
} else {
qi := element.Value.(*queueItem)
timeUntilProcessing := time.Until(qi.plannedToStartWorkAt)
// Do we need to sleep? If not, let's party.
if timeUntilProcessing <= 0 {
q.itemsBeingProcessed[qi.key] = qi
q.items.Remove(element)
delete(q.itemsInQueue, qi.key)
q.lock.Unlock()
return qi, nil
}
q.lock.Unlock()
if err := func() error {
timer := q.clock.NewTimer(timeUntilProcessing)
defer timer.Stop()
select {
case <-timer.C():
case <-ctx.Done():
return ctx.Err()
case <-q.wakeupCh:
}
return nil
}(); err != nil {
return nil, err
}
}
}
}
// handleQueueItem handles a single item
//
// A return value of "false" indicates that further processing should be stopped.
@@ -137,8 +365,9 @@ func (q *Queue) handleQueueItem(ctx context.Context) bool {
ctx, span := trace.StartSpan(ctx, "handleQueueItem")
defer span.End()
obj, shutdown := q.workqueue.Get()
if shutdown {
qi, err := q.getNextItem(ctx)
if err != nil {
span.SetStatus(err)
return false
}
@@ -146,11 +375,10 @@ func (q *Queue) handleQueueItem(ctx context.Context) bool {
// These are of the form namespace/name.
// We do this as the delayed nature of the work Queue means the items in the informer cache may actually be more u
// to date that when the item was initially put onto the workqueue.
key := obj.(string)
ctx = span.WithField(ctx, "key", key)
ctx = span.WithField(ctx, "key", qi.key)
log.G(ctx).Debug("Got Queue object")
err := q.handleQueueItemObject(ctx, key)
err = q.handleQueueItemObject(ctx, qi)
if err != nil {
// We've actually hit an error, so we set the span's status based on the error.
span.SetStatus(err)
@@ -162,35 +390,69 @@ func (q *Queue) handleQueueItem(ctx context.Context) bool {
return true
}
func (q *Queue) handleQueueItemObject(ctx context.Context, key string) error {
func (q *Queue) handleQueueItemObject(ctx context.Context, qi *queueItem) error {
// This is a separate function / span, because the handleQueueItem span is the time spent waiting for the object
// plus the time spend handling the object. Instead, this function / span is scoped to a single object.
ctx, span := trace.StartSpan(ctx, "handleQueueItemObject")
defer span.End()
ctx = span.WithField(ctx, "key", key)
// We call Done here so the work Queue knows we have finished processing this item.
// We also must remember to call Forget if we do not want this work item being re-queued.
// For example, we do not call Forget if a transient error occurs.
// Instead, the item is put back on the work Queue and attempted again after a back-off period.
defer q.workqueue.Done(key)
ctx = span.WithFields(ctx, map[string]interface{}{
"requeues": qi.requeues,
"originallyAdded": qi.originallyAdded.String(),
"addedViaRedirty": qi.addedViaRedirty,
"plannedForWork": qi.plannedToStartWorkAt.String(),
})
if qi.delayedViaRateLimit != nil {
ctx = span.WithField(ctx, "delayedViaRateLimit", qi.delayedViaRateLimit.String())
}
// Add the current key as an attribute to the current span.
ctx = span.WithField(ctx, "key", key)
ctx = span.WithField(ctx, "key", qi.key)
// Run the syncHandler, passing it the namespace/name string of the Pod resource to be synced.
if err := q.handler(ctx, key); err != nil {
if q.workqueue.NumRequeues(key) < MaxRetries {
err := q.handler(ctx, qi.key)
q.lock.Lock()
defer q.lock.Unlock()
delete(q.itemsBeingProcessed, qi.key)
if qi.forget {
q.ratelimiter.Forget(qi.key)
log.G(ctx).WithError(err).Warnf("forgetting %q as told to forget while in progress", qi.key)
return nil
}
if err != nil {
if qi.requeues+1 < MaxRetries {
// Put the item back on the work Queue to handle any transient errors.
log.G(ctx).WithError(err).Warnf("requeuing %q due to failed sync", key)
q.workqueue.AddRateLimited(key)
log.G(ctx).WithError(err).Warnf("requeuing %q due to failed sync", qi.key)
newQI := q.insert(ctx, qi.key, true, 0)
newQI.requeues = qi.requeues + 1
newQI.originallyAdded = qi.originallyAdded
return nil
}
// We've exceeded the maximum retries, so we must Forget the key.
q.workqueue.Forget(key)
return pkgerrors.Wrapf(err, "forgetting %q due to maximum retries reached", key)
err = pkgerrors.Wrapf(err, "forgetting %q due to maximum retries reached", qi.key)
}
// Finally, if no error occurs we Forget this item so it does not get queued again until another change happens.
q.workqueue.Forget(key)
return nil
// We've exceeded the maximum retries or we were successful.
q.ratelimiter.Forget(qi.key)
if !qi.redirtiedAt.IsZero() {
newQI := q.insert(ctx, qi.key, qi.redirtiedWithRatelimit, time.Until(qi.redirtiedAt))
newQI.addedViaRedirty = true
}
return err
}
func (q *Queue) String() string {
q.lock.Lock()
defer q.lock.Unlock()
items := make([]string, 0, q.items.Len())
for next := q.items.Front(); next != nil; next = next.Next() {
items = append(items, next.Value.(*queueItem).String())
}
return fmt.Sprintf("<items:%s>", items)
}

View File

@@ -5,18 +5,18 @@ import (
"errors"
"strconv"
"sync"
"sync/atomic"
"testing"
"time"
"github.com/sirupsen/logrus"
"github.com/virtual-kubelet/virtual-kubelet/log"
logruslogger "github.com/virtual-kubelet/virtual-kubelet/log/logrus"
"go.uber.org/goleak"
"golang.org/x/time/rate"
"gotest.tools/assert"
is "gotest.tools/assert/cmp"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/util/workqueue"
"k8s.io/utils/clock"
)
func TestQueueMaxRetries(t *testing.T) {
@@ -36,14 +36,14 @@ func TestQueueMaxRetries(t *testing.T) {
workqueue.NewItemExponentialFailureRateLimiter(5*time.Millisecond, 10*time.Millisecond),
&workqueue.BucketRateLimiter{Limiter: rate.NewLimiter(rate.Limit(10), 100)},
), t.Name(), handler)
wq.Enqueue("test")
wq.Enqueue(context.TODO(), "test")
for n < MaxRetries {
assert.Assert(t, wq.handleQueueItem(ctx))
}
assert.Assert(t, is.Equal(n, MaxRetries))
assert.Assert(t, is.Equal(0, wq.workqueue.Len()))
assert.Assert(t, is.Equal(0, wq.Len()))
}
func TestForget(t *testing.T) {
@@ -53,63 +53,403 @@ func TestForget(t *testing.T) {
}
wq := New(workqueue.DefaultItemBasedRateLimiter(), t.Name(), handler)
wq.Forget("val")
assert.Assert(t, is.Equal(0, wq.workqueue.Len()))
wq.Forget(context.TODO(), "val")
assert.Assert(t, is.Equal(0, wq.Len()))
v := "test"
wq.EnqueueWithoutRateLimit(v)
assert.Assert(t, is.Equal(1, wq.workqueue.Len()))
t.Skip("This is broken")
// Workqueue docs:
// Forget indicates that an item is finished being retried. Doesn't matter whether it's for perm failing
// or for success, we'll stop the rate limiter from tracking it. This only clears the `rateLimiter`, you
// still have to call `Done` on the queue.
// Even if you do this, it doesn't work: https://play.golang.com/p/8vfL_RCsFGI
assert.Assert(t, is.Equal(0, wq.workqueue.Len()))
wq.EnqueueWithoutRateLimit(context.TODO(), v)
assert.Assert(t, is.Equal(1, wq.Len()))
}
func TestQueueTerminate(t *testing.T) {
func TestQueueEmpty(t *testing.T) {
t.Parallel()
defer goleak.VerifyNone(t,
// Ignore existing goroutines
goleak.IgnoreCurrent(),
// Ignore klog background flushers
goleak.IgnoreTopFunction("k8s.io/klog.(*loggingT).flushDaemon"),
goleak.IgnoreTopFunction("k8s.io/klog/v2.(*loggingT).flushDaemon"),
// Workqueue runs a goroutine in the background to handle background functions. AFAICT, they're unkillable
// and are designed to stop after a certain idle window
goleak.IgnoreTopFunction("k8s.io/client-go/util/workqueue.(*Type).updateUnfinishedWorkLoop"),
goleak.IgnoreTopFunction("k8s.io/client-go/util/workqueue.(*delayingType).waitingLoop"),
)
ctx, cancel := context.WithCancel(context.Background())
ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
defer cancel()
testMap := &sync.Map{}
handler := func(ctx context.Context, key string) error {
testMap.Store(key, struct{}{})
q := New(workqueue.DefaultItemBasedRateLimiter(), t.Name(), func(ctx context.Context, key string) error {
return nil
}
wq := New(workqueue.DefaultItemBasedRateLimiter(), t.Name(), handler)
group := &wait.Group{}
group.StartWithContext(ctx, func(ctx context.Context) {
wq.Run(ctx, 10)
})
for i := 0; i < 1000; i++ {
wq.EnqueueWithoutRateLimit(strconv.Itoa(i))
item, err := q.getNextItem(ctx)
assert.Error(t, err, context.DeadlineExceeded.Error())
assert.Assert(t, is.Nil(item))
}
func TestQueueItemNoSleep(t *testing.T) {
t.Parallel()
ctx, cancel := context.WithTimeout(context.Background(), 1000*time.Millisecond)
defer cancel()
q := New(workqueue.DefaultItemBasedRateLimiter(), t.Name(), func(ctx context.Context, key string) error {
return nil
})
q.lock.Lock()
q.insert(ctx, "foo", false, -1*time.Hour)
q.insert(ctx, "bar", false, -1*time.Hour)
q.lock.Unlock()
item, err := q.getNextItem(ctx)
assert.NilError(t, err)
assert.Assert(t, is.Equal(item.key, "foo"))
item, err = q.getNextItem(ctx)
assert.NilError(t, err)
assert.Assert(t, is.Equal(item.key, "bar"))
}
func TestQueueItemSleep(t *testing.T) {
t.Parallel()
ctx, cancel := context.WithTimeout(context.Background(), 1000*time.Millisecond)
defer cancel()
q := New(workqueue.DefaultItemBasedRateLimiter(), t.Name(), func(ctx context.Context, key string) error {
return nil
})
q.lock.Lock()
q.insert(ctx, "foo", false, 100*time.Millisecond)
q.insert(ctx, "bar", false, 100*time.Millisecond)
q.lock.Unlock()
item, err := q.getNextItem(ctx)
assert.NilError(t, err)
assert.Assert(t, is.Equal(item.key, "foo"))
}
func TestQueueBackgroundAdd(t *testing.T) {
t.Parallel()
ctx, cancel := context.WithTimeout(context.Background(), 5000*time.Millisecond)
defer cancel()
q := New(workqueue.DefaultItemBasedRateLimiter(), t.Name(), func(ctx context.Context, key string) error {
return nil
})
start := time.Now()
time.AfterFunc(100*time.Millisecond, func() {
q.lock.Lock()
defer q.lock.Unlock()
q.insert(ctx, "foo", false, 0)
})
item, err := q.getNextItem(ctx)
assert.NilError(t, err)
assert.Assert(t, is.Equal(item.key, "foo"))
assert.Assert(t, time.Since(start) > 100*time.Millisecond)
}
func TestQueueBackgroundAdvance(t *testing.T) {
t.Parallel()
ctx, cancel := context.WithTimeout(context.Background(), 5000*time.Millisecond)
defer cancel()
q := New(workqueue.DefaultItemBasedRateLimiter(), t.Name(), func(ctx context.Context, key string) error {
return nil
})
start := time.Now()
q.lock.Lock()
q.insert(ctx, "foo", false, 10*time.Second)
q.lock.Unlock()
time.AfterFunc(200*time.Millisecond, func() {
q.lock.Lock()
defer q.lock.Unlock()
q.insert(ctx, "foo", false, 0)
})
item, err := q.getNextItem(ctx)
assert.NilError(t, err)
assert.Assert(t, is.Equal(item.key, "foo"))
assert.Assert(t, time.Since(start) > 200*time.Millisecond)
assert.Assert(t, time.Since(start) < 5*time.Second)
}
func TestQueueRedirty(t *testing.T) {
t.Parallel()
ctx, cancel := context.WithTimeout(context.Background(), 5000*time.Millisecond)
defer cancel()
var times int64
var q *Queue
q = New(workqueue.DefaultItemBasedRateLimiter(), t.Name(), func(ctx context.Context, key string) error {
assert.Assert(t, is.Equal(key, "foo"))
if atomic.AddInt64(&times, 1) == 1 {
q.EnqueueWithoutRateLimit(context.TODO(), "foo")
} else {
cancel()
}
return nil
})
q.EnqueueWithoutRateLimit(context.TODO(), "foo")
q.Run(ctx, 1)
for !q.Empty() {
time.Sleep(100 * time.Millisecond)
}
assert.Assert(t, is.Equal(atomic.LoadInt64(&times), int64(2)))
}
func TestHeapConcurrency(t *testing.T) {
t.Parallel()
ctx, cancel := context.WithTimeout(context.Background(), 5000*time.Millisecond)
defer cancel()
start := time.Now()
seen := sync.Map{}
q := New(workqueue.DefaultItemBasedRateLimiter(), t.Name(), func(ctx context.Context, key string) error {
seen.Store(key, struct{}{})
time.Sleep(time.Second)
return nil
})
for i := 0; i < 20; i++ {
q.EnqueueWithoutRateLimit(context.TODO(), strconv.Itoa(i))
}
for wq.workqueue.Len() > 0 {
assert.Assert(t, q.Len() == 20)
go q.Run(ctx, 20)
for q.Len() > 0 {
time.Sleep(100 * time.Millisecond)
}
for i := 0; i < 1000; i++ {
_, ok := testMap.Load(strconv.Itoa(i))
assert.Assert(t, ok, "Item %d missing", i)
for i := 0; i < 20; i++ {
_, ok := seen.Load(strconv.Itoa(i))
assert.Assert(t, ok, "Did not observe: %d", i)
}
assert.Assert(t, time.Since(start) < 5*time.Second)
}
func checkConsistency(t *testing.T, q *Queue) {
q.lock.Lock()
defer q.lock.Unlock()
for next := q.items.Front(); next != nil && next.Next() != nil; next = next.Next() {
qi := next.Value.(*queueItem)
qiNext := next.Next().Value.(*queueItem)
assert.Assert(t, qi.plannedToStartWorkAt.Before(qiNext.plannedToStartWorkAt) || qi.plannedToStartWorkAt.Equal(qiNext.plannedToStartWorkAt))
}
}
func TestHeapOrder(t *testing.T) {
q := New(workqueue.DefaultItemBasedRateLimiter(), t.Name(), func(ctx context.Context, key string) error {
return nil
})
q.clock = nonmovingClock{}
q.EnqueueWithoutRateLimitWithDelay(context.TODO(), "a", 1000)
q.EnqueueWithoutRateLimitWithDelay(context.TODO(), "b", 2000)
q.EnqueueWithoutRateLimitWithDelay(context.TODO(), "c", 3000)
q.EnqueueWithoutRateLimitWithDelay(context.TODO(), "d", 4000)
q.EnqueueWithoutRateLimitWithDelay(context.TODO(), "e", 5000)
checkConsistency(t, q)
t.Logf("%v", q)
q.EnqueueWithoutRateLimitWithDelay(context.TODO(), "d", 1000)
checkConsistency(t, q)
t.Logf("%v", q)
q.EnqueueWithoutRateLimitWithDelay(context.TODO(), "c", 1001)
checkConsistency(t, q)
t.Logf("%v", q)
q.EnqueueWithoutRateLimitWithDelay(context.TODO(), "e", 999)
checkConsistency(t, q)
t.Logf("%v", q)
}
type rateLimitWrapper struct {
addedMap sync.Map
forgottenMap sync.Map
rl workqueue.RateLimiter
}
func (r *rateLimitWrapper) When(item interface{}) time.Duration {
if _, ok := r.forgottenMap.Load(item); ok {
r.forgottenMap.Delete(item)
// Reset the added map
r.addedMap.Store(item, 1)
} else {
actual, loaded := r.addedMap.LoadOrStore(item, 1)
if loaded {
r.addedMap.Store(item, actual.(int)+1)
}
}
cancel()
group.Wait()
return r.rl.When(item)
}
func (r *rateLimitWrapper) Forget(item interface{}) {
r.forgottenMap.Store(item, struct{}{})
r.rl.Forget(item)
}
func (r *rateLimitWrapper) NumRequeues(item interface{}) int {
return r.rl.NumRequeues(item)
}
func TestRateLimiter(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 5000*time.Millisecond)
defer cancel()
syncMap := sync.Map{}
syncMap.Store("foo", 0)
syncMap.Store("bar", 0)
syncMap.Store("baz", 0)
syncMap.Store("quux", 0)
start := time.Now()
ratelimiter := &rateLimitWrapper{
rl: workqueue.NewItemFastSlowRateLimiter(1*time.Millisecond, 100*time.Millisecond, 1),
}
q := New(ratelimiter, t.Name(), func(ctx context.Context, key string) error {
oldValue, _ := syncMap.Load(key)
syncMap.Store(key, oldValue.(int)+1)
if oldValue.(int) < 9 {
return errors.New("test")
}
return nil
})
enqueued := 0
syncMap.Range(func(key, value interface{}) bool {
enqueued++
q.Enqueue(context.TODO(), key.(string))
return true
})
assert.Assert(t, enqueued == 4)
go q.Run(ctx, 10)
incomplete := true
for incomplete {
time.Sleep(10 * time.Millisecond)
incomplete = false
// Wait for all items to finish processing.
syncMap.Range(func(key, value interface{}) bool {
if value.(int) < 10 {
incomplete = true
}
return true
})
}
// Make sure there were ~9 "slow" rate limits per item, and 1 fast
assert.Assert(t, time.Since(start) > 9*100*time.Millisecond)
// Make sure we didn't go off the deep end.
assert.Assert(t, time.Since(start) < 2*9*100*time.Millisecond)
// Make sure each item was seen. And Forgotten.
syncMap.Range(func(key, value interface{}) bool {
_, ok := ratelimiter.forgottenMap.Load(key)
assert.Assert(t, ok, "%s in forgotten map", key)
val, ok := ratelimiter.addedMap.Load(key)
assert.Assert(t, ok, "%s in added map", key)
assert.Assert(t, val == 10)
return true
})
q.lock.Lock()
defer q.lock.Unlock()
assert.Assert(t, len(q.itemsInQueue) == 0)
assert.Assert(t, len(q.itemsBeingProcessed) == 0)
assert.Assert(t, q.items.Len() == 0)
}
func TestQueueForgetInProgress(t *testing.T) {
t.Parallel()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
var times int64
var q *Queue
q = New(workqueue.DefaultItemBasedRateLimiter(), t.Name(), func(ctx context.Context, key string) error {
assert.Assert(t, is.Equal(key, "foo"))
atomic.AddInt64(&times, 1)
q.Forget(context.TODO(), key)
return errors.New("test")
})
q.EnqueueWithoutRateLimit(context.TODO(), "foo")
go q.Run(ctx, 1)
for !q.Empty() {
time.Sleep(100 * time.Millisecond)
}
assert.Assert(t, is.Equal(atomic.LoadInt64(&times), int64(1)))
}
func TestQueueForgetBeforeStart(t *testing.T) {
t.Parallel()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
q := New(workqueue.DefaultItemBasedRateLimiter(), t.Name(), func(ctx context.Context, key string) error {
panic("shouldn't be called")
})
q.EnqueueWithoutRateLimit(context.TODO(), "foo")
q.Forget(context.TODO(), "foo")
go q.Run(ctx, 1)
for !q.Empty() {
time.Sleep(100 * time.Millisecond)
}
}
func TestQueueMoveItem(t *testing.T) {
t.Parallel()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
q := New(workqueue.DefaultItemBasedRateLimiter(), t.Name(), func(ctx context.Context, key string) error {
panic("shouldn't be called")
})
q.clock = nonmovingClock{}
q.insert(ctx, "foo", false, 3000)
q.insert(ctx, "bar", false, 2000)
q.insert(ctx, "baz", false, 1000)
checkConsistency(t, q)
t.Log(q)
q.insert(ctx, "foo", false, 2000)
checkConsistency(t, q)
t.Log(q)
q.insert(ctx, "foo", false, 1999)
checkConsistency(t, q)
t.Log(q)
q.insert(ctx, "foo", false, 999)
checkConsistency(t, q)
t.Log(q)
}
type nonmovingClock struct {
}
func (n nonmovingClock) Now() time.Time {
return time.Time{}
}
func (n nonmovingClock) Since(t time.Time) time.Duration {
return n.Now().Sub(t)
}
func (n nonmovingClock) After(d time.Duration) <-chan time.Time {
panic("implement me")
}
func (n nonmovingClock) NewTimer(d time.Duration) clock.Timer {
panic("implement me")
}
func (n nonmovingClock) Sleep(d time.Duration) {
panic("implement me")
}
func (n nonmovingClock) Tick(d time.Duration) <-chan time.Time {
panic("implement me")
}

View File

@@ -330,7 +330,7 @@ func (pc *PodController) enqueuePodStatusUpdate(ctx context.Context, pod *corev1
}
kpod.lastPodStatusReceivedFromProvider = pod
kpod.Unlock()
pc.syncPodStatusFromProvider.Enqueue(key)
pc.syncPodStatusFromProvider.Enqueue(ctx, key)
}
func (pc *PodController) syncPodStatusFromProviderHandler(ctx context.Context, key string) (retErr error) {

View File

@@ -302,14 +302,25 @@ func (pc *PodController) Run(ctx context.Context, podSyncWorkers int) (retErr er
var eventHandler cache.ResourceEventHandler = cache.ResourceEventHandlerFuncs{
AddFunc: func(pod interface{}) {
ctx, cancel := context.WithCancel(ctx)
defer cancel()
ctx, span := trace.StartSpan(ctx, "AddFunc")
defer span.End()
if key, err := cache.MetaNamespaceKeyFunc(pod); err != nil {
log.G(ctx).Error(err)
} else {
ctx = span.WithField(ctx, "key", key)
pc.knownPods.Store(key, &knownPod{})
pc.syncPodsFromKubernetes.Enqueue(key)
pc.syncPodsFromKubernetes.Enqueue(ctx, key)
}
},
UpdateFunc: func(oldObj, newObj interface{}) {
ctx, cancel := context.WithCancel(ctx)
defer cancel()
ctx, span := trace.StartSpan(ctx, "UpdateFunc")
defer span.End()
// Create a copy of the old and new pod objects so we don't mutate the cache.
oldPod := oldObj.(*corev1.Pod)
newPod := newObj.(*corev1.Pod)
@@ -318,6 +329,7 @@ func (pc *PodController) Run(ctx context.Context, podSyncWorkers int) (retErr er
if key, err := cache.MetaNamespaceKeyFunc(newPod); err != nil {
log.G(ctx).Error(err)
} else {
ctx = span.WithField(ctx, "key", key)
obj, ok := pc.knownPods.Load(key)
if !ok {
// Pods are only ever *added* to knownPods in the above AddFunc, and removed
@@ -332,25 +344,31 @@ func (pc *PodController) Run(ctx context.Context, podSyncWorkers int) (retErr er
// This means that the pod in API server was changed by someone else [this can be okay], but we skipped
// a status update on our side because we compared the status received from the provider to the status
// received from the k8s api server based on outdated information.
pc.syncPodStatusFromProvider.Enqueue(key)
pc.syncPodStatusFromProvider.Enqueue(ctx, key)
// Reset this to avoid re-adding it continuously
kPod.lastPodStatusUpdateSkipped = false
}
kPod.Unlock()
if podShouldEnqueue(oldPod, newPod) {
pc.syncPodsFromKubernetes.Enqueue(key)
pc.syncPodsFromKubernetes.Enqueue(ctx, key)
}
}
},
DeleteFunc: func(pod interface{}) {
ctx, cancel := context.WithCancel(ctx)
defer cancel()
ctx, span := trace.StartSpan(ctx, "DeleteFunc")
defer span.End()
if key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(pod); err != nil {
log.G(ctx).Error(err)
} else {
ctx = span.WithField(ctx, "key", key)
pc.knownPods.Delete(key)
pc.syncPodsFromKubernetes.Enqueue(key)
pc.syncPodsFromKubernetes.Enqueue(ctx, key)
// If this pod was in the deletion queue, forget about it
pc.deletePodsFromKubernetes.Forget(key)
pc.deletePodsFromKubernetes.Forget(ctx, key)
}
},
}
@@ -482,7 +500,7 @@ func (pc *PodController) syncPodInProvider(ctx context.Context, pod *corev1.Pod,
// more context is here: https://github.com/virtual-kubelet/virtual-kubelet/pull/760
if pod.DeletionTimestamp != nil && !running(&pod.Status) {
log.G(ctx).Debug("Force deleting pod from API Server as it is no longer running")
pc.deletePodsFromKubernetes.EnqueueWithoutRateLimit(key)
pc.deletePodsFromKubernetes.EnqueueWithoutRateLimit(ctx, key)
return nil
}
obj, ok := pc.knownPods.Load(key)
@@ -518,7 +536,7 @@ func (pc *PodController) syncPodInProvider(ctx context.Context, pod *corev1.Pod,
return err
}
pc.deletePodsFromKubernetes.EnqueueAfter(key, time.Second*time.Duration(*pod.DeletionGracePeriodSeconds))
pc.deletePodsFromKubernetes.EnqueueWithoutRateLimitWithDelay(ctx, key, time.Second*time.Duration(*pod.DeletionGracePeriodSeconds))
return nil
}