diff --git a/internal/queue/queue.go b/internal/queue/queue.go
index 1b6a5e136..ee60bcc2e 100644
--- a/internal/queue/queue.go
+++ b/internal/queue/queue.go
@@ -35,6 +35,9 @@ const (
 	MaxRetries = 20
 )
 
+// ShouldRetryFunc is a mechanism to have a custom retry policy
+type ShouldRetryFunc func(ctx context.Context, key string, timesTried int, originallyAdded time.Time, err error) (*time.Duration, error)
+
 // ItemHandler is a callback that handles a single key on the Queue
 type ItemHandler func(ctx context.Context, key string) error
 
@@ -61,6 +64,8 @@ type Queue struct {
 
 	// wakeup
 	wakeupCh chan struct{}
+
+	retryFunc ShouldRetryFunc
 }
 
 type queueItem struct {
@@ -83,9 +88,12 @@ func (item *queueItem) String() string {
 
 // New creates a queue
 //
-// It expects to get a item rate limiter, and a friendly name which is used in logs, and
-// in the internal kubernetes metrics.
-func New(ratelimiter workqueue.RateLimiter, name string, handler ItemHandler) *Queue {
+// It expects to get an item rate limiter, and a friendly name which is used in logs, and in the internal kubernetes
+// metrics. If retryFunc is nil, the default retry function is used.
+func New(ratelimiter workqueue.RateLimiter, name string, handler ItemHandler, retryFunc ShouldRetryFunc) *Queue {
+	if retryFunc == nil {
+		retryFunc = DefaultRetryFunc
+	}
 	return &Queue{
 		clock: clock.RealClock{},
 		name:  name,
@@ -96,6 +104,7 @@ func New(ratelimiter workqueue.RateLimiter, name string, handler ItemHandler) *Q
 		handler:                  handler,
 		wakeupCh:                 make(chan struct{}, 1),
 		waitForNextItemSemaphore: semaphore.NewWeighted(1),
+		retryFunc:                retryFunc,
 	}
 }
 
@@ -104,7 +113,7 @@ func (q *Queue) Enqueue(ctx context.Context, key string) {
 	q.lock.Lock()
 	defer q.lock.Unlock()
 
-	q.insert(ctx, key, true, 0)
+	q.insert(ctx, key, true, nil)
 }
 
 // EnqueueWithoutRateLimit enqueues the key without a rate limit
@@ -112,7 +121,7 @@ func (q *Queue) EnqueueWithoutRateLimit(ctx context.Context, key string) {
 	q.lock.Lock()
 	defer q.lock.Unlock()
 
-	q.insert(ctx, key, false, 0)
+	q.insert(ctx, key, false, nil)
 }
 
 // Forget forgets the key
@@ -142,9 +151,20 @@ func (q *Queue) Forget(ctx context.Context, key string) {
 	span.WithField(ctx, "status", "notfound")
 }
 
+func durationDeref(duration *time.Duration, def time.Duration) time.Duration {
+	if duration == nil {
+		return def
+	}
+
+	return *duration
+}
+
 // insert inserts a new item to be processed at time time. It will not further delay items if when is later than the
 // original time the item was scheduled to be processed. If when is earlier, it will "bring it forward"
-func (q *Queue) insert(ctx context.Context, key string, ratelimit bool, delay time.Duration) *queueItem {
+// If ratelimit is true and delay is nil, the ratelimiter's delay (the value returned by its When function) is used.
+// If ratelimit is true and delay is non-nil, the delay value is used instead.
+// If ratelimit is false, only delay is used to schedule the work; a nil delay is treated as 0.
+func (q *Queue) insert(ctx context.Context, key string, ratelimit bool, delay *time.Duration) *queueItem {
 	ctx, span := trace.StartSpan(ctx, "insert")
 	defer span.End()
 
@@ -153,7 +173,9 @@ func (q *Queue) insert(ctx context.Context, key string, ratelimit bool, delay ti
 		"key":       key,
 		"ratelimit": ratelimit,
 	})
-	if delay > 0 {
+	if delay == nil {
+		ctx = span.WithField(ctx, "delay", "nil")
+	} else {
 		ctx = span.WithField(ctx, "delay", delay.String())
 	}
 
@@ -167,7 +189,7 @@ func (q *Queue) insert(ctx context.Context, key string, ratelimit bool, delay ti
 	// First see if the item is already being processed
 	if item, ok := q.itemsBeingProcessed[key]; ok {
 		span.WithField(ctx, "status", "itemsBeingProcessed")
-		when := q.clock.Now().Add(delay)
+		when := q.clock.Now().Add(durationDeref(delay, 0))
 		// Is the item already been redirtied?
 		if item.redirtiedAt.IsZero() {
 			item.redirtiedAt = when
@@ -184,7 +206,7 @@
 	if item, ok := q.itemsInQueue[key]; ok {
 		span.WithField(ctx, "status", "itemsInQueue")
 		qi := item.Value.(*queueItem)
-		when := q.clock.Now().Add(delay)
+		when := q.clock.Now().Add(durationDeref(delay, 0))
 		q.adjustPosition(qi, item, when)
 		return qi
 	}
@@ -198,15 +220,16 @@
 	}
 
 	if ratelimit {
-		if delay > 0 {
-			panic("Non-zero delay with rate limiting not supported")
+		actualDelay := q.ratelimiter.When(key)
+		// Check if delay is overridden
+		if delay != nil {
+			actualDelay = *delay
 		}
-		ratelimitDelay := q.ratelimiter.When(key)
-		span.WithField(ctx, "delay", ratelimitDelay.String())
-		val.plannedToStartWorkAt = val.plannedToStartWorkAt.Add(ratelimitDelay)
-		val.delayedViaRateLimit = &ratelimitDelay
+		span.WithField(ctx, "delay", actualDelay.String())
+		val.plannedToStartWorkAt = val.plannedToStartWorkAt.Add(actualDelay)
+		val.delayedViaRateLimit = &actualDelay
 	} else {
-		val.plannedToStartWorkAt = val.plannedToStartWorkAt.Add(delay)
+		val.plannedToStartWorkAt = val.plannedToStartWorkAt.Add(durationDeref(delay, 0))
 	}
 
 	for item := q.items.Back(); item != nil; item = item.Prev() {
@@ -244,7 +267,7 @@ func (q *Queue) adjustPosition(qi *queueItem, element *list.Element, when time.T
 func (q *Queue) EnqueueWithoutRateLimitWithDelay(ctx context.Context, key string, after time.Duration) {
 	q.lock.Lock()
 	defer q.lock.Unlock()
-	q.insert(ctx, key, false, after)
+	q.insert(ctx, key, false, &after)
 }
 
 // Empty returns if the queue has no items in it
@@ -423,25 +446,37 @@ func (q *Queue) handleQueueItemObject(ctx context.Context, qi *queueItem) error
 	}
 
 	if err != nil {
-		if qi.requeues+1 < MaxRetries {
+		ctx = span.WithField(ctx, "error", err.Error())
+		var delay *time.Duration
+
+		// Stash the original error for logging below
+		originalError := err
+		delay, err = q.retryFunc(ctx, qi.key, qi.requeues+1, qi.originallyAdded, err)
+		if err == nil {
 			// Put the item back on the work Queue to handle any transient errors.
-			log.G(ctx).WithError(err).Warnf("requeuing %q due to failed sync", qi.key)
-			newQI := q.insert(ctx, qi.key, true, 0)
+			log.G(ctx).WithError(originalError).Warnf("requeuing %q due to failed sync", qi.key)
+			newQI := q.insert(ctx, qi.key, true, delay)
 			newQI.requeues = qi.requeues + 1
 			newQI.originallyAdded = qi.originallyAdded
 			return nil
 		}
 
-		err = pkgerrors.Wrapf(err, "forgetting %q due to maximum retries reached", qi.key)
+		if !qi.redirtiedAt.IsZero() {
+			err = fmt.Errorf("temporarily (requeued) forgetting %q due to: %w", qi.key, err)
+		} else {
+			err = fmt.Errorf("forgetting %q due to: %w", qi.key, err)
+		}
 	}
 
 	// We've exceeded the maximum retries or we were successful.
 	q.ratelimiter.Forget(qi.key)
 	if !qi.redirtiedAt.IsZero() {
-		newQI := q.insert(ctx, qi.key, qi.redirtiedWithRatelimit, time.Until(qi.redirtiedAt))
+		delay := time.Until(qi.redirtiedAt)
+		newQI := q.insert(ctx, qi.key, qi.redirtiedWithRatelimit, &delay)
 		newQI.addedViaRedirty = true
 	}
 
+	span.SetStatus(err)
 	return err
 }
 
@@ -456,3 +491,12 @@ func (q *Queue) String() string {
 	}
 	return fmt.Sprintf("", items)
 }
+
+// DefaultRetryFunc is the default function used for retries by the queue subsystem.
+func DefaultRetryFunc(ctx context.Context, key string, timesTried int, originallyAdded time.Time, err error) (*time.Duration, error) {
+	if timesTried < MaxRetries {
+		return nil, nil
+	}
+
+	return nil, pkgerrors.Wrapf(err, "maximum retries (%d) reached", MaxRetries)
+}
diff --git a/internal/queue/queue_test.go b/internal/queue/queue_test.go
index cf6be8870..55fad4f1a 100644
--- a/internal/queue/queue_test.go
+++ b/internal/queue/queue_test.go
@@ -19,6 +19,10 @@ import (
 	"k8s.io/utils/clock"
 )
 
+func durationPtr(d time.Duration) *time.Duration {
+	return &d
+}
+
 func TestQueueMaxRetries(t *testing.T) {
 	ctx, cancel := context.WithCancel(context.Background())
 	defer cancel()
@@ -35,7 +39,7 @@ func TestQueueMaxRetries(t *testing.T) {
 		// The default upper bound is 1000 seconds. Let's not use that.
 		workqueue.NewItemExponentialFailureRateLimiter(5*time.Millisecond, 10*time.Millisecond),
 		&workqueue.BucketRateLimiter{Limiter: rate.NewLimiter(rate.Limit(10), 100)},
-	), t.Name(), handler)
+	), t.Name(), handler, nil)
 	wq.Enqueue(context.TODO(), "test")
 
 	for n < MaxRetries {
@@ -46,12 +50,63 @@ func TestQueueMaxRetries(t *testing.T) {
 	assert.Assert(t, is.Equal(0, wq.Len()))
 }
 
+func TestQueueCustomRetries(t *testing.T) {
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+	logger := logrus.New()
+	logger.SetLevel(logrus.DebugLevel)
+	ctx = log.WithLogger(ctx, logruslogger.FromLogrus(logrus.NewEntry(logger)))
+	n := 0
+	errorSeen := 0
+	retryTestError := errors.New("Error should be retried every 10 milliseconds")
+	handler := func(ctx context.Context, key string) error {
+		if key == "retrytest" {
+			n++
+			return retryTestError
+		}
+		return errors.New("Unknown error")
+	}
+
+	shouldRetryFunc := func(ctx context.Context, key string, timesTried int, originallyAdded time.Time, err error) (*time.Duration, error) {
+		var sleepTime *time.Duration
+		if errors.Is(err, retryTestError) {
+			errorSeen++
+			sleepTime = durationPtr(10 * time.Millisecond)
+		}
+		_, retErr := DefaultRetryFunc(ctx, key, timesTried, originallyAdded, err)
+		return sleepTime, retErr
+	}
+
+	wq := New(&workqueue.BucketRateLimiter{Limiter: rate.NewLimiter(rate.Limit(1000), 1000)}, t.Name(), handler, shouldRetryFunc)
+
+	timeTaken := func(key string) time.Duration {
+		start := time.Now()
+		wq.Enqueue(context.TODO(), key)
+		for i := 0; i < MaxRetries; i++ {
+			assert.Assert(t, wq.handleQueueItem(ctx))
+		}
+		return time.Since(start)
+	}
+
+	unknownTime := timeTaken("unknown")
+	assert.Assert(t, n == 0)
+	assert.Assert(t, unknownTime < 10*time.Millisecond)
+
+	retrytestTime := timeTaken("retrytest")
+	assert.Assert(t, is.Equal(n, MaxRetries))
+	assert.Assert(t, is.Equal(errorSeen, MaxRetries))
+
+	assert.Assert(t, is.Equal(0, wq.Len()))
+	assert.Assert(t, retrytestTime > 10*time.Millisecond*time.Duration(n-1))
+	assert.Assert(t, retrytestTime < 2*10*time.Millisecond*time.Duration(n-1))
+}
+
 func TestForget(t *testing.T) {
 	t.Parallel()
 	handler := func(ctx context.Context, key string) error {
 		panic("Should never be called")
 	}
-	wq := New(workqueue.DefaultItemBasedRateLimiter(), t.Name(), handler)
+	wq := New(workqueue.DefaultItemBasedRateLimiter(), t.Name(), handler, nil)
 	wq.Forget(context.TODO(), "val")
 
 	assert.Assert(t, is.Equal(0, wq.Len()))
@@ -68,7 +123,7 @@ func TestQueueEmpty(t *testing.T) {
 	q := New(workqueue.DefaultItemBasedRateLimiter(), t.Name(), func(ctx context.Context, key string) error {
 		return nil
-	})
+	}, nil)
 
 	item, err := q.getNextItem(ctx)
 	assert.Error(t, err, context.DeadlineExceeded.Error())
@@ -83,11 +138,11 @@ func TestQueueItemNoSleep(t *testing.T) {
 	q := New(workqueue.DefaultItemBasedRateLimiter(), t.Name(), func(ctx context.Context, key string) error {
 		return nil
-	})
+	}, nil)
 	q.lock.Lock()
-	q.insert(ctx, "foo", false, -1*time.Hour)
-	q.insert(ctx, "bar", false, -1*time.Hour)
+	q.insert(ctx, "foo", false, durationPtr(-1*time.Hour))
+	q.insert(ctx, "bar", false, durationPtr(-1*time.Hour))
 	q.lock.Unlock()
 
 	item, err := q.getNextItem(ctx)
@@ -107,10 +162,10 @@ func TestQueueItemSleep(t *testing.T) {
 	q := New(workqueue.DefaultItemBasedRateLimiter(), t.Name(), func(ctx context.Context, key string) error {
 		return nil
-	})
+	}, nil)
 	q.lock.Lock()
-	q.insert(ctx, "foo", false, 100*time.Millisecond)
-	q.insert(ctx, "bar", false, 100*time.Millisecond)
+	q.insert(ctx, "foo", false, durationPtr(100*time.Millisecond))
+	q.insert(ctx, "bar", false, durationPtr(100*time.Millisecond))
 	q.lock.Unlock()
 
 	item, err := q.getNextItem(ctx)
@@ -126,12 +181,12 @@ func TestQueueBackgroundAdd(t *testing.T) {
 	q := New(workqueue.DefaultItemBasedRateLimiter(), t.Name(), func(ctx context.Context, key string) error {
 		return nil
-	})
+	}, nil)
 
 	start := time.Now()
 	time.AfterFunc(100*time.Millisecond, func() {
 		q.lock.Lock()
 		defer q.lock.Unlock()
-		q.insert(ctx, "foo", false, 0)
+		q.insert(ctx, "foo", false, nil)
 	})
 
 	item, err := q.getNextItem(ctx)
@@ -148,16 +203,16 @@ func TestQueueBackgroundAdvance(t *testing.T) {
 	q := New(workqueue.DefaultItemBasedRateLimiter(), t.Name(), func(ctx context.Context, key string) error {
 		return nil
-	})
+	}, nil)
 
 	start := time.Now()
 	q.lock.Lock()
-	q.insert(ctx, "foo", false, 10*time.Second)
+	q.insert(ctx, "foo", false, durationPtr(10*time.Second))
 	q.lock.Unlock()
 
 	time.AfterFunc(200*time.Millisecond, func() {
 		q.lock.Lock()
 		defer q.lock.Unlock()
-		q.insert(ctx, "foo", false, 0)
+		q.insert(ctx, "foo", false, nil)
 	})
 
 	item, err := q.getNextItem(ctx)
@@ -183,7 +238,7 @@ func TestQueueRedirty(t *testing.T) {
 			cancel()
 		}
 		return nil
-	})
+	}, nil)
 
 	q.EnqueueWithoutRateLimit(context.TODO(), "foo")
 	q.Run(ctx, 1)
@@ -205,7 +260,7 @@ func TestHeapConcurrency(t *testing.T) {
 		seen.Store(key, struct{}{})
 		time.Sleep(time.Second)
 		return nil
-	})
+	}, nil)
 	for i := 0; i < 20; i++ {
 		q.EnqueueWithoutRateLimit(context.TODO(), strconv.Itoa(i))
 	}
@@ -238,7 +293,7 @@ func checkConsistency(t *testing.T, q *Queue) {
 func TestHeapOrder(t *testing.T) {
 	q := New(workqueue.DefaultItemBasedRateLimiter(), t.Name(), func(ctx context.Context, key string) error {
 		return nil
-	})
+	}, nil)
 
 	q.clock = nonmovingClock{}
 	q.EnqueueWithoutRateLimitWithDelay(context.TODO(), "a", 1000)
@@ -311,7 +366,7 @@ func TestRateLimiter(t *testing.T) {
 			return errors.New("test")
 		}
 		return nil
-	})
+	}, nil)
 
 	enqueued := 0
 	syncMap.Range(func(key, value interface{}) bool {
@@ -371,7 +426,7 @@ func TestQueueForgetInProgress(t *testing.T) {
 		atomic.AddInt64(&times, 1)
 		q.Forget(context.TODO(), key)
 		return errors.New("test")
-	})
+	}, nil)
 
 	q.EnqueueWithoutRateLimit(context.TODO(), "foo")
 	go q.Run(ctx, 1)
@@ -388,7 +443,7 @@ func TestQueueForgetBeforeStart(t *testing.T) {
 	q := New(workqueue.DefaultItemBasedRateLimiter(), t.Name(), func(ctx context.Context, key string) error {
 		panic("shouldn't be called")
-	})
+	}, nil)
 
 	q.EnqueueWithoutRateLimit(context.TODO(), "foo")
 	q.Forget(context.TODO(), "foo")
@@ -405,24 +460,24 @@ func TestQueueMoveItem(t *testing.T) {
 	defer cancel()
 	q := New(workqueue.DefaultItemBasedRateLimiter(), t.Name(), func(ctx context.Context, key string) error {
 		panic("shouldn't be called")
-	})
+	}, nil)
 
 	q.clock = nonmovingClock{}
-	q.insert(ctx, "foo", false, 3000)
-	q.insert(ctx, "bar", false, 2000)
-	q.insert(ctx, "baz", false, 1000)
+	q.insert(ctx, "foo", false, durationPtr(3000))
+	q.insert(ctx, "bar", false, durationPtr(2000))
+	q.insert(ctx, "baz", false, durationPtr(1000))
 	checkConsistency(t, q)
 	t.Log(q)
 
-	q.insert(ctx, "foo", false, 2000)
+	q.insert(ctx, "foo", false, durationPtr(2000))
 	checkConsistency(t, q)
 	t.Log(q)
 
-	q.insert(ctx, "foo", false, 1999)
+	q.insert(ctx, "foo", false, durationPtr(1999))
 	checkConsistency(t, q)
 	t.Log(q)
 
-	q.insert(ctx, "foo", false, 999)
+	q.insert(ctx, "foo", false, durationPtr(999))
 	checkConsistency(t, q)
 	t.Log(q)
 }
diff --git a/node/pod.go b/node/pod.go
index bd538c5cb..be010ba91 100644
--- a/node/pod.go
+++ b/node/pod.go
@@ -20,10 +20,11 @@ import (
 	"strings"
 	"time"
 
"github.com/virtual-kubelet/virtual-kubelet/internal/queue" + "github.com/google/go-cmp/cmp" pkgerrors "github.com/pkg/errors" "github.com/virtual-kubelet/virtual-kubelet/internal/podutils" - "github.com/virtual-kubelet/virtual-kubelet/internal/queue" "github.com/virtual-kubelet/virtual-kubelet/log" "github.com/virtual-kubelet/virtual-kubelet/trace" corev1 "k8s.io/api/core/v1" diff --git a/node/podcontroller.go b/node/podcontroller.go index f8342f956..420e0c991 100644 --- a/node/podcontroller.go +++ b/node/podcontroller.go @@ -20,12 +20,11 @@ import ( "sync" "time" - "github.com/virtual-kubelet/virtual-kubelet/internal/queue" - "github.com/google/go-cmp/cmp" pkgerrors "github.com/pkg/errors" "github.com/virtual-kubelet/virtual-kubelet/errdefs" "github.com/virtual-kubelet/virtual-kubelet/internal/manager" + "github.com/virtual-kubelet/virtual-kubelet/internal/queue" "github.com/virtual-kubelet/virtual-kubelet/log" "github.com/virtual-kubelet/virtual-kubelet/trace" corev1 "k8s.io/api/core/v1" @@ -178,10 +177,18 @@ type PodControllerConfig struct { // SyncPodsFromKubernetesRateLimiter defines the rate limit for the SyncPodsFromKubernetes queue SyncPodsFromKubernetesRateLimiter workqueue.RateLimiter + // SyncPodsFromKubernetesShouldRetryFunc allows for a custom retry policy for the SyncPodsFromKubernetes queue + SyncPodsFromKubernetesShouldRetryFunc ShouldRetryFunc + // DeletePodsFromKubernetesRateLimiter defines the rate limit for the DeletePodsFromKubernetesRateLimiter queue DeletePodsFromKubernetesRateLimiter workqueue.RateLimiter + // DeletePodsFromKubernetesShouldRetryFunc allows for a custom retry policy for the SyncPodsFromKubernetes queue + DeletePodsFromKubernetesShouldRetryFunc ShouldRetryFunc + // SyncPodStatusFromProviderRateLimiter defines the rate limit for the SyncPodStatusFromProviderRateLimiter queue SyncPodStatusFromProviderRateLimiter workqueue.RateLimiter + // SyncPodStatusFromProviderShouldRetryFunc allows for a custom retry policy for the SyncPodStatusFromProvider queue + SyncPodStatusFromProviderShouldRetryFunc ShouldRetryFunc // Add custom filtering for pod informer event handlers // Use this for cases where the pod informer handles more than pods assigned to this node @@ -240,9 +247,9 @@ func NewPodController(cfg PodControllerConfig) (*PodController, error) { podEventFilterFunc: cfg.PodEventFilterFunc, } - pc.syncPodsFromKubernetes = queue.New(cfg.SyncPodsFromKubernetesRateLimiter, "syncPodsFromKubernetes", pc.syncPodFromKubernetesHandler) - pc.deletePodsFromKubernetes = queue.New(cfg.DeletePodsFromKubernetesRateLimiter, "deletePodsFromKubernetes", pc.deletePodsFromKubernetesHandler) - pc.syncPodStatusFromProvider = queue.New(cfg.SyncPodStatusFromProviderRateLimiter, "syncPodStatusFromProvider", pc.syncPodStatusFromProviderHandler) + pc.syncPodsFromKubernetes = queue.New(cfg.SyncPodsFromKubernetesRateLimiter, "syncPodsFromKubernetes", pc.syncPodFromKubernetesHandler, cfg.SyncPodsFromKubernetesShouldRetryFunc) + pc.deletePodsFromKubernetes = queue.New(cfg.DeletePodsFromKubernetesRateLimiter, "deletePodsFromKubernetes", pc.deletePodsFromKubernetesHandler, cfg.DeletePodsFromKubernetesShouldRetryFunc) + pc.syncPodStatusFromProvider = queue.New(cfg.SyncPodStatusFromProviderRateLimiter, "syncPodStatusFromProvider", pc.syncPodStatusFromProviderHandler, cfg.SyncPodStatusFromProviderShouldRetryFunc) return pc, nil } diff --git a/node/queue.go b/node/queue.go new file mode 100644 index 000000000..0a9137090 --- /dev/null +++ b/node/queue.go @@ -0,0 +1,34 @@ +package node + 
+import (
+	"github.com/virtual-kubelet/virtual-kubelet/internal/queue"
+)
+
+// These are exported definitions of the queue package:
+
+// ShouldRetryFunc is a mechanism to have a custom retry policy
+//
+// It is passed metadata about the work item when the handler returns an error, specifically:
+// * The key
+// * The number of attempts that this item has already had (and failed)
+// * The (potentially wrapped) error from the queue handler.
+//
+// The return values are an optional amount of time to delay the work, and an error.
+// If an error is returned, the work will be aborted, and the returned error is bubbled up. It can be the error that
+// was passed in or that error can be wrapped.
+//
+// If the work item is to be retried, a delay duration may be specified. The delay is used to schedule when
+// the item should begin processing relative to now; it does not necessarily dictate when the item will start work.
+// Items are processed in the order they are scheduled. If the delay is nil, it will fall back to the default behaviour
+// of the queue, and use the rate limiter that's configured to determine when to start work.
+//
+// If the delay is negative, the item will be scheduled "earlier" than now. This will result in the item being executed
+// earlier than other items in the FIFO work order.
+type ShouldRetryFunc = queue.ShouldRetryFunc
+
+// DefaultRetryFunc is the default function used for retries by the queue subsystem. Its only policy is that it gives up
+// after MaxRetries, and falls back to the rate limiter for all other retries.
+var DefaultRetryFunc = queue.DefaultRetryFunc
+
+// MaxRetries is the number of times we try to process a given key before permanently forgetting it.
+var MaxRetries = queue.MaxRetries
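
Usage note (not part of the patch): a minimal sketch of how a provider might wire a custom retry policy into PodControllerConfig through the node re-exports introduced above. The errProviderThrottled sentinel, the 30-second delay, and the retryPolicy/newPodControllerConfig names are illustrative assumptions, not part of this change; the field names and signatures follow the diff.

package example

import (
	"context"
	"errors"
	"time"

	"github.com/virtual-kubelet/virtual-kubelet/node"
)

// errProviderThrottled is a hypothetical sentinel error a provider's handlers might return.
var errProviderThrottled = errors.New("provider throttled")

// retryPolicy retries throttling errors after a fixed 30s delay and defers to
// node.DefaultRetryFunc (give up after MaxRetries, otherwise let the rate
// limiter schedule the retry) for every other error.
func retryPolicy(ctx context.Context, key string, timesTried int, originallyAdded time.Time, err error) (*time.Duration, error) {
	if errors.Is(err, errProviderThrottled) {
		delay := 30 * time.Second
		return &delay, nil
	}
	return node.DefaultRetryFunc(ctx, key, timesTried, originallyAdded, err)
}

// newPodControllerConfig shows where the new ShouldRetryFunc fields live; the
// other required PodControllerConfig fields are omitted for brevity.
func newPodControllerConfig() node.PodControllerConfig {
	return node.PodControllerConfig{
		SyncPodsFromKubernetesShouldRetryFunc:    retryPolicy,
		DeletePodsFromKubernetesShouldRetryFunc:  retryPolicy,
		SyncPodStatusFromProviderShouldRetryFunc: retryPolicy,
	}
}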
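
Within internal/queue itself, the return values map onto insert as follows: a nil delay falls back to ratelimiter.When(key), a non-nil delay overrides it, and a non-nil error forgets the key and bubbles the error up to the caller. A rough in-package sketch in the style of the existing tests; the 5-attempt cutoff and 50ms delay are arbitrary illustrations, not defaults of the queue.

package queue

import (
	"context"
	"errors"
	"fmt"
	"time"

	"golang.org/x/time/rate"
	"k8s.io/client-go/util/workqueue"
)

func ExampleNew_customRetry() {
	handler := func(ctx context.Context, key string) error {
		return errors.New("transient failure") // always fails, to exercise the retry path
	}

	retry := func(ctx context.Context, key string, timesTried int, originallyAdded time.Time, err error) (*time.Duration, error) {
		if timesTried >= 5 {
			// Returning an error aborts the work; the key is forgotten and the error bubbles up.
			return nil, fmt.Errorf("giving up on %q after %d attempts: %w", key, timesTried, err)
		}
		// A non-nil delay overrides the rate limiter's When(key); returning nil would fall back to it.
		d := 50 * time.Millisecond
		return &d, nil
	}

	q := New(&workqueue.BucketRateLimiter{Limiter: rate.NewLimiter(rate.Limit(100), 100)}, "example", handler, retry)
	q.Enqueue(context.TODO(), "some-key")
	go q.Run(context.TODO(), 1)
}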