Merge pull request #962 from sargun/expose-custom-retry
@@ -35,6 +35,9 @@ const (
 	MaxRetries = 20
 )
 
+// ShouldRetryFunc is a mechanism to have a custom retry policy
+type ShouldRetryFunc func(ctx context.Context, key string, timesTried int, originallyAdded time.Time, err error) (*time.Duration, error)
+
 // ItemHandler is a callback that handles a single key on the Queue
 type ItemHandler func(ctx context.Context, key string) error
 
@@ -61,6 +64,8 @@ type Queue struct {
 
 	// wakeup
 	wakeupCh chan struct{}
+
+	retryFunc ShouldRetryFunc
 }
 
 type queueItem struct {
@@ -83,9 +88,12 @@ func (item *queueItem) String() string {
 
 // New creates a queue
 //
-// It expects to get a item rate limiter, and a friendly name which is used in logs, and
-// in the internal kubernetes metrics.
-func New(ratelimiter workqueue.RateLimiter, name string, handler ItemHandler) *Queue {
+// It expects to get an item rate limiter, and a friendly name which is used in logs, and in the internal kubernetes
+// metrics. If retryFunc is nil, the default retry function is used.
+func New(ratelimiter workqueue.RateLimiter, name string, handler ItemHandler, retryFunc ShouldRetryFunc) *Queue {
+	if retryFunc == nil {
+		retryFunc = DefaultRetryFunc
+	}
 	return &Queue{
 		clock: clock.RealClock{},
 		name:  name,
@@ -96,6 +104,7 @@ func New(ratelimiter workqueue.RateLimiter, name string, handler ItemHandler) *Q
 		handler:                  handler,
 		wakeupCh:                 make(chan struct{}, 1),
 		waitForNextItemSemaphore: semaphore.NewWeighted(1),
+		retryFunc:                retryFunc,
 	}
 }
 
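The constructor now takes the retry policy as a fourth argument, with nil selecting the default. Below is a minimal sketch of how a caller might plug in a custom policy; it is assumed to live in the same queue package, and the helper name, the handler, and the permanent-error sentinel are hypothetical, while New, ItemHandler, and DefaultRetryFunc come from this change.

package queue

import (
	"context"
	"errors"
	"time"

	"k8s.io/client-go/util/workqueue"
)

// newQueueWithPermanentErrors builds a Queue that gives up immediately when the
// handler reports a designated permanent error, and otherwise keeps the default
// retry budget and the rate limiter's backoff.
func newQueueWithPermanentErrors(name string, handler ItemHandler, permanent error) *Queue {
	retry := func(ctx context.Context, key string, timesTried int, originallyAdded time.Time, err error) (*time.Duration, error) {
		if errors.Is(err, permanent) {
			return nil, err // abort: the error is bubbled up and the key is forgotten
		}
		return DefaultRetryFunc(ctx, key, timesTried, originallyAdded, err)
	}
	return New(workqueue.DefaultItemBasedRateLimiter(), name, handler, retry)
}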
@@ -104,7 +113,7 @@ func (q *Queue) Enqueue(ctx context.Context, key string) {
 	q.lock.Lock()
 	defer q.lock.Unlock()
 
-	q.insert(ctx, key, true, 0)
+	q.insert(ctx, key, true, nil)
 }
 
 // EnqueueWithoutRateLimit enqueues the key without a rate limit
@@ -112,7 +121,7 @@ func (q *Queue) EnqueueWithoutRateLimit(ctx context.Context, key string) {
 	q.lock.Lock()
 	defer q.lock.Unlock()
 
-	q.insert(ctx, key, false, 0)
+	q.insert(ctx, key, false, nil)
 }
 
 // Forget forgets the key
@@ -142,9 +151,20 @@ func (q *Queue) Forget(ctx context.Context, key string) {
 	span.WithField(ctx, "status", "notfound")
 }
 
+func durationDeref(duration *time.Duration, def time.Duration) time.Duration {
+	if duration == nil {
+		return def
+	}
+
+	return *duration
+}
+
 // insert inserts a new item to be processed at time time. It will not further delay items if when is later than the
 // original time the item was scheduled to be processed. If when is earlier, it will "bring it forward"
-func (q *Queue) insert(ctx context.Context, key string, ratelimit bool, delay time.Duration) *queueItem {
+// If ratelimit is specified, and delay is nil, then the ratelimiter's delay (return from When function) will be used
+// If ratelimit is specified, and the delay is non-nil, then the delay value will be used
+// If ratelimit is false, then only delay is used to schedule the work. If delay is nil, it will be considered 0.
+func (q *Queue) insert(ctx context.Context, key string, ratelimit bool, delay *time.Duration) *queueItem {
 	ctx, span := trace.StartSpan(ctx, "insert")
 	defer span.End()
 
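Since delay is now a pointer, durationDeref is what keeps the zero-delay paths working. A quick sketch of its behaviour; the example function and the values are hypothetical and assumed to sit in the same package.

package queue

import (
	"fmt"
	"time"
)

// exampleDurationDeref shows that a nil delay falls back to the supplied
// default, while a non-nil delay always wins.
func exampleDurationDeref() {
	var noDelay *time.Duration
	fmt.Println(durationDeref(noDelay, 0)) // prints "0s"

	override := 250 * time.Millisecond
	fmt.Println(durationDeref(&override, 0)) // prints "250ms"
}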
@@ -153,7 +173,9 @@ func (q *Queue) insert(ctx context.Context, key string, ratelimit bool, delay ti
 		"key":       key,
 		"ratelimit": ratelimit,
 	})
-	if delay > 0 {
+	if delay == nil {
+		ctx = span.WithField(ctx, "delay", "nil")
+	} else {
 		ctx = span.WithField(ctx, "delay", delay.String())
 	}
 
@@ -167,7 +189,7 @@ func (q *Queue) insert(ctx context.Context, key string, ratelimit bool, delay ti
 	// First see if the item is already being processed
 	if item, ok := q.itemsBeingProcessed[key]; ok {
 		span.WithField(ctx, "status", "itemsBeingProcessed")
-		when := q.clock.Now().Add(delay)
+		when := q.clock.Now().Add(durationDeref(delay, 0))
 		// Is the item already been redirtied?
 		if item.redirtiedAt.IsZero() {
 			item.redirtiedAt = when
@@ -184,7 +206,7 @@ func (q *Queue) insert(ctx context.Context, key string, ratelimit bool, delay ti
 	if item, ok := q.itemsInQueue[key]; ok {
 		span.WithField(ctx, "status", "itemsInQueue")
 		qi := item.Value.(*queueItem)
-		when := q.clock.Now().Add(delay)
+		when := q.clock.Now().Add(durationDeref(delay, 0))
 		q.adjustPosition(qi, item, when)
 		return qi
 	}
@@ -198,15 +220,16 @@ func (q *Queue) insert(ctx context.Context, key string, ratelimit bool, delay ti
 	}
 
 	if ratelimit {
-		if delay > 0 {
-			panic("Non-zero delay with rate limiting not supported")
+		actualDelay := q.ratelimiter.When(key)
+		// Check if delay is overridden
+		if delay != nil {
+			actualDelay = *delay
 		}
-		ratelimitDelay := q.ratelimiter.When(key)
-		span.WithField(ctx, "delay", ratelimitDelay.String())
-		val.plannedToStartWorkAt = val.plannedToStartWorkAt.Add(ratelimitDelay)
-		val.delayedViaRateLimit = &ratelimitDelay
+		span.WithField(ctx, "delay", actualDelay.String())
+		val.plannedToStartWorkAt = val.plannedToStartWorkAt.Add(actualDelay)
+		val.delayedViaRateLimit = &actualDelay
 	} else {
-		val.plannedToStartWorkAt = val.plannedToStartWorkAt.Add(delay)
+		val.plannedToStartWorkAt = val.plannedToStartWorkAt.Add(durationDeref(delay, 0))
 	}
 
 	for item := q.items.Back(); item != nil; item = item.Prev() {
@@ -244,7 +267,7 @@ func (q *Queue) adjustPosition(qi *queueItem, element *list.Element, when time.T
 func (q *Queue) EnqueueWithoutRateLimitWithDelay(ctx context.Context, key string, after time.Duration) {
 	q.lock.Lock()
 	defer q.lock.Unlock()
-	q.insert(ctx, key, false, after)
+	q.insert(ctx, key, false, &after)
 }
 
 // Empty returns if the queue has no items in it
@@ -423,25 +446,37 @@ func (q *Queue) handleQueueItemObject(ctx context.Context, qi *queueItem) error
 	}
 
 	if err != nil {
-		if qi.requeues+1 < MaxRetries {
+		ctx = span.WithField(ctx, "error", err.Error())
+		var delay *time.Duration
+
+		// Stash the original error for logging below
+		originalError := err
+		delay, err = q.retryFunc(ctx, qi.key, qi.requeues+1, qi.originallyAdded, err)
+		if err == nil {
 			// Put the item back on the work Queue to handle any transient errors.
-			log.G(ctx).WithError(err).Warnf("requeuing %q due to failed sync", qi.key)
-			newQI := q.insert(ctx, qi.key, true, 0)
+			log.G(ctx).WithError(originalError).Warnf("requeuing %q due to failed sync", qi.key)
+			newQI := q.insert(ctx, qi.key, true, delay)
 			newQI.requeues = qi.requeues + 1
 			newQI.originallyAdded = qi.originallyAdded
 
 			return nil
 		}
-		err = pkgerrors.Wrapf(err, "forgetting %q due to maximum retries reached", qi.key)
+		if !qi.redirtiedAt.IsZero() {
+			err = fmt.Errorf("temporarily (requeued) forgetting %q due to: %w", qi.key, err)
+		} else {
+			err = fmt.Errorf("forgetting %q due to: %w", qi.key, err)
+		}
 	}
 
 	// We've exceeded the maximum retries or we were successful.
 	q.ratelimiter.Forget(qi.key)
 	if !qi.redirtiedAt.IsZero() {
-		newQI := q.insert(ctx, qi.key, qi.redirtiedWithRatelimit, time.Until(qi.redirtiedAt))
+		delay := time.Until(qi.redirtiedAt)
+		newQI := q.insert(ctx, qi.key, qi.redirtiedWithRatelimit, &delay)
 		newQI.addedViaRedirty = true
 	}
 
+	span.SetStatus(err)
 	return err
 }
 
@@ -456,3 +491,12 @@ func (q *Queue) String() string {
 	}
 	return fmt.Sprintf("<items:%s>", items)
 }
+
+// DefaultRetryFunc is the default function used for retries by the queue subsystem.
+func DefaultRetryFunc(ctx context.Context, key string, timesTried int, originallyAdded time.Time, err error) (*time.Duration, error) {
+	if timesTried < MaxRetries {
+		return nil, nil
+	}
+
+	return nil, pkgerrors.Wrapf(err, "maximum retries (%d) reached", MaxRetries)
+}
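The default policy keeps the old MaxRetries behaviour. A short sketch of its contract follows; the example function name and the error value are hypothetical, everything else is defined by the hunk above.

package queue

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// exampleDefaultRetryFunc walks through the two outcomes of the default policy.
func exampleDefaultRetryFunc(ctx context.Context) {
	failure := errors.New("transient failure")

	// Below the retry budget: no error, and a nil delay that defers to the rate limiter.
	delay, err := DefaultRetryFunc(ctx, "pod-key", 1, time.Now(), failure)
	fmt.Println(delay == nil, err == nil) // true true

	// Budget exhausted: the returned error tells the queue to forget the key.
	delay, err = DefaultRetryFunc(ctx, "pod-key", MaxRetries, time.Now(), failure)
	fmt.Println(delay == nil, err != nil) // true true
}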
@@ -19,6 +19,10 @@ import (
 	"k8s.io/utils/clock"
 )
 
+func durationPtr(d time.Duration) *time.Duration {
+	return &d
+}
+
 func TestQueueMaxRetries(t *testing.T) {
 	ctx, cancel := context.WithCancel(context.Background())
 	defer cancel()
@@ -35,7 +39,7 @@ func TestQueueMaxRetries(t *testing.T) {
 		// The default upper bound is 1000 seconds. Let's not use that.
 		workqueue.NewItemExponentialFailureRateLimiter(5*time.Millisecond, 10*time.Millisecond),
 		&workqueue.BucketRateLimiter{Limiter: rate.NewLimiter(rate.Limit(10), 100)},
-	), t.Name(), handler)
+	), t.Name(), handler, nil)
 	wq.Enqueue(context.TODO(), "test")
 
 	for n < MaxRetries {
@@ -46,12 +50,63 @@ func TestQueueMaxRetries(t *testing.T) {
 	assert.Assert(t, is.Equal(0, wq.Len()))
 }
 
+func TestQueueCustomRetries(t *testing.T) {
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+	logger := logrus.New()
+	logger.SetLevel(logrus.DebugLevel)
+	ctx = log.WithLogger(ctx, logruslogger.FromLogrus(logrus.NewEntry(logger)))
+	n := 0
+	errorSeen := 0
+	retryTestError := errors.New("Error should be retried every 10 milliseconds")
+	handler := func(ctx context.Context, key string) error {
+		if key == "retrytest" {
+			n++
+			return retryTestError
+		}
+		return errors.New("Unknown error")
+	}
+
+	shouldRetryFunc := func(ctx context.Context, key string, timesTried int, originallyAdded time.Time, err error) (*time.Duration, error) {
+		var sleepTime *time.Duration
+		if errors.Is(err, retryTestError) {
+			errorSeen++
+			sleepTime = durationPtr(10 * time.Millisecond)
+		}
+		_, retErr := DefaultRetryFunc(ctx, key, timesTried, originallyAdded, err)
+		return sleepTime, retErr
+	}
+
+	wq := New(&workqueue.BucketRateLimiter{Limiter: rate.NewLimiter(rate.Limit(1000), 1000)}, t.Name(), handler, shouldRetryFunc)
+
+	timeTaken := func(key string) time.Duration {
+		start := time.Now()
+		wq.Enqueue(context.TODO(), key)
+		for i := 0; i < MaxRetries; i++ {
+			assert.Assert(t, wq.handleQueueItem(ctx))
+		}
+		return time.Since(start)
+	}
+
+	unknownTime := timeTaken("unknown")
+	assert.Assert(t, n == 0)
+	assert.Assert(t, unknownTime < 10*time.Millisecond)
+
+	retrytestTime := timeTaken("retrytest")
+	assert.Assert(t, is.Equal(n, MaxRetries))
+	assert.Assert(t, is.Equal(errorSeen, MaxRetries))
+
+	assert.Assert(t, is.Equal(0, wq.Len()))
+	assert.Assert(t, retrytestTime > 10*time.Millisecond*time.Duration(n-1))
+	assert.Assert(t, retrytestTime < 2*10*time.Millisecond*time.Duration(n-1))
+}
+
 func TestForget(t *testing.T) {
 	t.Parallel()
 	handler := func(ctx context.Context, key string) error {
 		panic("Should never be called")
 	}
-	wq := New(workqueue.DefaultItemBasedRateLimiter(), t.Name(), handler)
+	wq := New(workqueue.DefaultItemBasedRateLimiter(), t.Name(), handler, nil)
 
 	wq.Forget(context.TODO(), "val")
 	assert.Assert(t, is.Equal(0, wq.Len()))
@@ -68,7 +123,7 @@ func TestQueueEmpty(t *testing.T) {
 
 	q := New(workqueue.DefaultItemBasedRateLimiter(), t.Name(), func(ctx context.Context, key string) error {
 		return nil
-	})
+	}, nil)
 
 	item, err := q.getNextItem(ctx)
 	assert.Error(t, err, context.DeadlineExceeded.Error())
@@ -83,11 +138,11 @@ func TestQueueItemNoSleep(t *testing.T) {
 
 	q := New(workqueue.DefaultItemBasedRateLimiter(), t.Name(), func(ctx context.Context, key string) error {
 		return nil
-	})
+	}, nil)
 
 	q.lock.Lock()
-	q.insert(ctx, "foo", false, -1*time.Hour)
-	q.insert(ctx, "bar", false, -1*time.Hour)
+	q.insert(ctx, "foo", false, durationPtr(-1*time.Hour))
+	q.insert(ctx, "bar", false, durationPtr(-1*time.Hour))
 	q.lock.Unlock()
 
 	item, err := q.getNextItem(ctx)
@@ -107,10 +162,10 @@ func TestQueueItemSleep(t *testing.T) {
 
 	q := New(workqueue.DefaultItemBasedRateLimiter(), t.Name(), func(ctx context.Context, key string) error {
 		return nil
-	})
+	}, nil)
 	q.lock.Lock()
-	q.insert(ctx, "foo", false, 100*time.Millisecond)
-	q.insert(ctx, "bar", false, 100*time.Millisecond)
+	q.insert(ctx, "foo", false, durationPtr(100*time.Millisecond))
+	q.insert(ctx, "bar", false, durationPtr(100*time.Millisecond))
 	q.lock.Unlock()
 
 	item, err := q.getNextItem(ctx)
@@ -126,12 +181,12 @@ func TestQueueBackgroundAdd(t *testing.T) {
 
 	q := New(workqueue.DefaultItemBasedRateLimiter(), t.Name(), func(ctx context.Context, key string) error {
 		return nil
-	})
+	}, nil)
 	start := time.Now()
 	time.AfterFunc(100*time.Millisecond, func() {
 		q.lock.Lock()
 		defer q.lock.Unlock()
-		q.insert(ctx, "foo", false, 0)
+		q.insert(ctx, "foo", false, nil)
 	})
 
 	item, err := q.getNextItem(ctx)
@@ -148,16 +203,16 @@ func TestQueueBackgroundAdvance(t *testing.T) {
 
 	q := New(workqueue.DefaultItemBasedRateLimiter(), t.Name(), func(ctx context.Context, key string) error {
 		return nil
-	})
+	}, nil)
 	start := time.Now()
 	q.lock.Lock()
-	q.insert(ctx, "foo", false, 10*time.Second)
+	q.insert(ctx, "foo", false, durationPtr(10*time.Second))
 	q.lock.Unlock()
 
 	time.AfterFunc(200*time.Millisecond, func() {
 		q.lock.Lock()
 		defer q.lock.Unlock()
-		q.insert(ctx, "foo", false, 0)
+		q.insert(ctx, "foo", false, nil)
 	})
 
 	item, err := q.getNextItem(ctx)
@@ -183,7 +238,7 @@ func TestQueueRedirty(t *testing.T) {
 			cancel()
 		}
 		return nil
-	})
+	}, nil)
 
 	q.EnqueueWithoutRateLimit(context.TODO(), "foo")
 	q.Run(ctx, 1)
@@ -205,7 +260,7 @@ func TestHeapConcurrency(t *testing.T) {
 		seen.Store(key, struct{}{})
 		time.Sleep(time.Second)
 		return nil
-	})
+	}, nil)
 	for i := 0; i < 20; i++ {
 		q.EnqueueWithoutRateLimit(context.TODO(), strconv.Itoa(i))
 	}
@@ -238,7 +293,7 @@ func checkConsistency(t *testing.T, q *Queue) {
 func TestHeapOrder(t *testing.T) {
 	q := New(workqueue.DefaultItemBasedRateLimiter(), t.Name(), func(ctx context.Context, key string) error {
 		return nil
-	})
+	}, nil)
 	q.clock = nonmovingClock{}
 
 	q.EnqueueWithoutRateLimitWithDelay(context.TODO(), "a", 1000)
@@ -311,7 +366,7 @@ func TestRateLimiter(t *testing.T) {
 			return errors.New("test")
 		}
 		return nil
-	})
+	}, nil)
 
 	enqueued := 0
 	syncMap.Range(func(key, value interface{}) bool {
@@ -371,7 +426,7 @@ func TestQueueForgetInProgress(t *testing.T) {
 		atomic.AddInt64(&times, 1)
 		q.Forget(context.TODO(), key)
 		return errors.New("test")
-	})
+	}, nil)
 
 	q.EnqueueWithoutRateLimit(context.TODO(), "foo")
 	go q.Run(ctx, 1)
@@ -388,7 +443,7 @@ func TestQueueForgetBeforeStart(t *testing.T) {
 
 	q := New(workqueue.DefaultItemBasedRateLimiter(), t.Name(), func(ctx context.Context, key string) error {
 		panic("shouldn't be called")
-	})
+	}, nil)
 
 	q.EnqueueWithoutRateLimit(context.TODO(), "foo")
 	q.Forget(context.TODO(), "foo")
@@ -405,24 +460,24 @@ func TestQueueMoveItem(t *testing.T) {
 	defer cancel()
 	q := New(workqueue.DefaultItemBasedRateLimiter(), t.Name(), func(ctx context.Context, key string) error {
 		panic("shouldn't be called")
-	})
+	}, nil)
 	q.clock = nonmovingClock{}
 
-	q.insert(ctx, "foo", false, 3000)
-	q.insert(ctx, "bar", false, 2000)
-	q.insert(ctx, "baz", false, 1000)
+	q.insert(ctx, "foo", false, durationPtr(3000))
+	q.insert(ctx, "bar", false, durationPtr(2000))
+	q.insert(ctx, "baz", false, durationPtr(1000))
 	checkConsistency(t, q)
 	t.Log(q)
 
-	q.insert(ctx, "foo", false, 2000)
+	q.insert(ctx, "foo", false, durationPtr(2000))
 	checkConsistency(t, q)
 	t.Log(q)
 
-	q.insert(ctx, "foo", false, 1999)
+	q.insert(ctx, "foo", false, durationPtr(1999))
 	checkConsistency(t, q)
 	t.Log(q)
 
-	q.insert(ctx, "foo", false, 999)
+	q.insert(ctx, "foo", false, durationPtr(999))
 	checkConsistency(t, q)
 	t.Log(q)
 }
@@ -390,10 +390,7 @@ func testDanglingPodScenarioWithDeletionTimestamp(ctx context.Context, t *testin
 	_, e := s.client.CoreV1().Pods(testNamespace).Create(ctx, podCopyWithDeletionTimestamp, metav1.CreateOptions{})
 	assert.NilError(t, e)
-
-	// Start the pod controller
-	assert.NilError(t, s.start(ctx))
 	watchErrCh := make(chan error)
 
 	go func() {
 		_, watchErr := watchutils.UntilWithoutRetry(ctx, watcher,
 			func(ev watch.Event) (bool, error) {
@@ -402,6 +399,9 @@ func testDanglingPodScenarioWithDeletionTimestamp(ctx context.Context, t *testin
 		watchErrCh <- watchErr
 	}()
 
+	// Start the pod controller
+	assert.NilError(t, s.start(ctx))
+
 	select {
 	case <-ctx.Done():
 		t.Fatalf("Context ended early: %s", ctx.Err().Error())
@@ -20,10 +20,11 @@ import (
 	"strings"
 	"time"
 
+	"github.com/virtual-kubelet/virtual-kubelet/internal/queue"
+
 	"github.com/google/go-cmp/cmp"
 	pkgerrors "github.com/pkg/errors"
 	"github.com/virtual-kubelet/virtual-kubelet/internal/podutils"
-	"github.com/virtual-kubelet/virtual-kubelet/internal/queue"
 	"github.com/virtual-kubelet/virtual-kubelet/log"
 	"github.com/virtual-kubelet/virtual-kubelet/trace"
 	corev1 "k8s.io/api/core/v1"
@@ -20,12 +20,11 @@ import (
 	"sync"
 	"time"
 
-	"github.com/virtual-kubelet/virtual-kubelet/internal/queue"
-
 	"github.com/google/go-cmp/cmp"
 	pkgerrors "github.com/pkg/errors"
 	"github.com/virtual-kubelet/virtual-kubelet/errdefs"
 	"github.com/virtual-kubelet/virtual-kubelet/internal/manager"
+	"github.com/virtual-kubelet/virtual-kubelet/internal/queue"
 	"github.com/virtual-kubelet/virtual-kubelet/log"
 	"github.com/virtual-kubelet/virtual-kubelet/trace"
 	corev1 "k8s.io/api/core/v1"
@@ -178,10 +177,18 @@ type PodControllerConfig struct {
 
 	// SyncPodsFromKubernetesRateLimiter defines the rate limit for the SyncPodsFromKubernetes queue
 	SyncPodsFromKubernetesRateLimiter workqueue.RateLimiter
+	// SyncPodsFromKubernetesShouldRetryFunc allows for a custom retry policy for the SyncPodsFromKubernetes queue
+	SyncPodsFromKubernetesShouldRetryFunc ShouldRetryFunc
+
 	// DeletePodsFromKubernetesRateLimiter defines the rate limit for the DeletePodsFromKubernetesRateLimiter queue
 	DeletePodsFromKubernetesRateLimiter workqueue.RateLimiter
+	// DeletePodsFromKubernetesShouldRetryFunc allows for a custom retry policy for the DeletePodsFromKubernetes queue
+	DeletePodsFromKubernetesShouldRetryFunc ShouldRetryFunc
+
 	// SyncPodStatusFromProviderRateLimiter defines the rate limit for the SyncPodStatusFromProviderRateLimiter queue
 	SyncPodStatusFromProviderRateLimiter workqueue.RateLimiter
+	// SyncPodStatusFromProviderShouldRetryFunc allows for a custom retry policy for the SyncPodStatusFromProvider queue
+	SyncPodStatusFromProviderShouldRetryFunc ShouldRetryFunc
 
 	// Add custom filtering for pod informer event handlers
 	// Use this for cases where the pod informer handles more than pods assigned to this node
@@ -240,9 +247,9 @@ func NewPodController(cfg PodControllerConfig) (*PodController, error) {
 		podEventFilterFunc: cfg.PodEventFilterFunc,
 	}
 
-	pc.syncPodsFromKubernetes = queue.New(cfg.SyncPodsFromKubernetesRateLimiter, "syncPodsFromKubernetes", pc.syncPodFromKubernetesHandler)
-	pc.deletePodsFromKubernetes = queue.New(cfg.DeletePodsFromKubernetesRateLimiter, "deletePodsFromKubernetes", pc.deletePodsFromKubernetesHandler)
-	pc.syncPodStatusFromProvider = queue.New(cfg.SyncPodStatusFromProviderRateLimiter, "syncPodStatusFromProvider", pc.syncPodStatusFromProviderHandler)
+	pc.syncPodsFromKubernetes = queue.New(cfg.SyncPodsFromKubernetesRateLimiter, "syncPodsFromKubernetes", pc.syncPodFromKubernetesHandler, cfg.SyncPodsFromKubernetesShouldRetryFunc)
+	pc.deletePodsFromKubernetes = queue.New(cfg.DeletePodsFromKubernetesRateLimiter, "deletePodsFromKubernetes", pc.deletePodsFromKubernetesHandler, cfg.DeletePodsFromKubernetesShouldRetryFunc)
+	pc.syncPodStatusFromProvider = queue.New(cfg.SyncPodStatusFromProviderRateLimiter, "syncPodStatusFromProvider", pc.syncPodStatusFromProviderHandler, cfg.SyncPodStatusFromProviderShouldRetryFunc)
 
 	return pc, nil
 }
node/queue.go (new file, 34 lines)
@@ -0,0 +1,34 @@
+package node
+
+import (
+	"github.com/virtual-kubelet/virtual-kubelet/internal/queue"
+)
+
+// These are exportable definitions of the queue package:
+
+// ShouldRetryFunc is a mechanism to have a custom retry policy
+//
+// It is passed metadata about the work item when the handler returns an error:
+// * The key
+// * The number of attempts that this item has already had (and failed)
+// * The (potentially wrapped) error from the queue handler.
+//
+// The return value is an error, and optionally an amount to delay the work.
+// If an error is returned, the work will be aborted, and the returned error is bubbled up. It can be the error that
+// was passed in or that error can be wrapped.
+//
+// If the work item is to be retried, a delay duration may be specified. The delay is used to schedule when
+// the item should begin processing relative to now, it does not necessarily dictate when the item will start work.
+// Items are processed in the order they are scheduled. If the delay is nil, it will fall back to the default behaviour
+// of the queue, and use the rate limiter that's configured to determine when to start work.
+//
+// If the delay is negative, the item will be scheduled "earlier" than now. This will result in the item being executed
+// earlier than other items in the FIFO work order.
+type ShouldRetryFunc = queue.ShouldRetryFunc
+
+// DefaultRetryFunc is the default function used for retries by the queue subsystem. Its only policy is that it gives up
+// after MaxRetries, and falls back to the rate limiter for all other retries.
+var DefaultRetryFunc = queue.DefaultRetryFunc
+
+// MaxRetries is the number of times we try to process a given key before permanently forgetting it.
+var MaxRetries = queue.MaxRetries
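To make that contract concrete, here is a hedged sketch of a policy that exercises each documented return shape; the sentinel errors and exampleRetryPolicy are hypothetical, while ShouldRetryFunc and DefaultRetryFunc are the aliases defined in node/queue.go above.

package node

import (
	"context"
	"errors"
	"time"
)

// Hypothetical sentinels used only for illustration.
var (
	errThrottled = errors.New("provider throttled the request")
	errPermanent = errors.New("pod spec can never be satisfied")
)

// exampleRetryPolicy demonstrates the three documented return shapes.
var exampleRetryPolicy ShouldRetryFunc = func(ctx context.Context, key string, timesTried int, originallyAdded time.Time, err error) (*time.Duration, error) {
	switch {
	case errors.Is(err, errPermanent):
		// Non-nil error: abort the item and bubble the error up.
		return nil, err
	case errors.Is(err, errThrottled):
		// Non-nil delay: retry, scheduled 30 seconds from now instead of the rate limiter's backoff.
		d := 30 * time.Second
		return &d, nil
	default:
		// Nil delay: keep the default budget and let the configured rate limiter pick the timing.
		return DefaultRetryFunc(ctx, key, timesTried, originallyAdded, err)
	}
}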