Files
virtual-kubelet/internal/queue/queue.go
Sargun Dhillon b259cb0548 Add the ability to dictate custom retries
Our current retry policy is naive: it only does 20 retries, and the
delay is based entirely on the rate limiter. If the user is somewhat
aggressive in rate limiting, but hits a temporary outage on the API
server, they may want to continue to delay.

In fact, K8s has a built-in function to suggest delays:
https://pkg.go.dev/k8s.io/apimachinery/pkg/api/errors#SuggestsClientDelay

Signed-off-by: Sargun Dhillon <sargun@sargun.me>
2021-04-14 10:52:26 -07:00

503 lines
14 KiB
Go

// Copyright © 2017 The virtual-kubelet authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package queue

import (
	"container/list"
	"context"
	"fmt"
	"sync"
	"time"

	pkgerrors "github.com/pkg/errors"
	"github.com/virtual-kubelet/virtual-kubelet/log"
	"github.com/virtual-kubelet/virtual-kubelet/trace"
	"golang.org/x/sync/semaphore"
	"k8s.io/apimachinery/pkg/util/wait"
	"k8s.io/client-go/util/workqueue"
	"k8s.io/utils/clock"
)

const (
	// MaxRetries is the number of times we try to process a given key before permanently forgetting it.
	MaxRetries = 20
)

// ShouldRetryFunc is a mechanism to have a custom retry policy. It is consulted after the handler fails for a key.
// Returning a nil error requeues the item; if the returned delay is non-nil, it overrides the rate limiter's delay.
// Returning a non-nil error permanently forgets the item.
type ShouldRetryFunc func(ctx context.Context, key string, timesTried int, originallyAdded time.Time, err error) (*time.Duration, error)

// ItemHandler is a callback that handles a single key on the Queue
type ItemHandler func(ctx context.Context, key string) error

// Queue implements a wrapper around workqueue with native VK instrumentation
type Queue struct {
	// clock is used for testing
	clock clock.Clock
	// lock protects running, and the items list / map
	lock        sync.Mutex
	running     bool
	name        string
	handler     ItemHandler
	ratelimiter workqueue.RateLimiter
	// items are items that are marked dirty and are waiting for processing.
	items *list.List
	// itemsInQueue is a map of (string) key -> item while it is in the items list
	itemsInQueue map[string]*list.Element
	// itemsBeingProcessed is a map of (string) key -> item once it has been moved off the items list for processing
	itemsBeingProcessed map[string]*queueItem
	// waitForNextItemSemaphore is an exclusive (1 item) lock that is taken every time items is checked to see if
	// there is an item in the queue ready for work
	waitForNextItemSemaphore *semaphore.Weighted
	// wakeupCh is signalled when an item is inserted, waking a worker waiting for the next item
	wakeupCh  chan struct{}
	retryFunc ShouldRetryFunc
}
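
// An item's lifecycle, in brief: Enqueue / insert places it on the time-ordered
// items list (tracked via itemsInQueue); a worker moves it to itemsBeingProcessed
// and runs the handler; on failure, retryFunc decides whether it is re-inserted;
// and an item redirtied while being processed is re-inserted afterwards via
// redirtiedAt.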

type queueItem struct {
	key                    string
	plannedToStartWorkAt   time.Time
	redirtiedAt            time.Time
	redirtiedWithRatelimit bool
	forget                 bool
	requeues               int
	// Debugging information only
	originallyAdded     time.Time
	addedViaRedirty     bool
	delayedViaRateLimit *time.Duration
}

func (item *queueItem) String() string {
	return fmt.Sprintf("<plannedToStartWorkAt:%s key: %s>", item.plannedToStartWorkAt.String(), item.key)
}

// New creates a queue.
//
// It expects to get an item rate limiter, and a friendly name which is used in logs and in the internal Kubernetes
// metrics. If retryFunc is nil, DefaultRetryFunc is used.
func New(ratelimiter workqueue.RateLimiter, name string, handler ItemHandler, retryFunc ShouldRetryFunc) *Queue {
	if retryFunc == nil {
		retryFunc = DefaultRetryFunc
	}
	return &Queue{
		clock:                    clock.RealClock{},
		name:                     name,
		ratelimiter:              ratelimiter,
		items:                    list.New(),
		itemsBeingProcessed:      make(map[string]*queueItem),
		itemsInQueue:             make(map[string]*list.Element),
		handler:                  handler,
		wakeupCh:                 make(chan struct{}, 1),
		waitForNextItemSemaphore: semaphore.NewWeighted(1),
		retryFunc:                retryFunc,
	}
}
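
// A minimal usage sketch (illustrative only: the handler body, the choice of
// workqueue.DefaultControllerRateLimiter, and the worker count are assumptions,
// not part of this file):
//
//	handler := func(ctx context.Context, key string) error {
//		// sync the object named by key (namespace/name) here
//		return nil
//	}
//	q := New(workqueue.DefaultControllerRateLimiter(), "pods", handler, nil)
//	go q.Run(ctx, 5)
//	q.Enqueue(ctx, "default/my-pod")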

// Enqueue enqueues the key in a rate limited fashion
func (q *Queue) Enqueue(ctx context.Context, key string) {
	q.lock.Lock()
	defer q.lock.Unlock()
	q.insert(ctx, key, true, nil)
}

// EnqueueWithoutRateLimit enqueues the key without a rate limit
func (q *Queue) EnqueueWithoutRateLimit(ctx context.Context, key string) {
	q.lock.Lock()
	defer q.lock.Unlock()
	q.insert(ctx, key, false, nil)
}

// Forget forgets the key
func (q *Queue) Forget(ctx context.Context, key string) {
	q.lock.Lock()
	defer q.lock.Unlock()
	ctx, span := trace.StartSpan(ctx, "Forget")
	defer span.End()
	ctx = span.WithFields(ctx, map[string]interface{}{
		"queue": q.name,
		"key":   key,
	})
	if item, ok := q.itemsInQueue[key]; ok {
		span.WithField(ctx, "status", "itemInQueue")
		delete(q.itemsInQueue, key)
		q.items.Remove(item)
		return
	}
	if qi, ok := q.itemsBeingProcessed[key]; ok {
		span.WithField(ctx, "status", "itemBeingProcessed")
		qi.forget = true
		return
	}
	span.WithField(ctx, "status", "notfound")
}

func durationDeref(duration *time.Duration, def time.Duration) time.Duration {
	if duration == nil {
		return def
	}
	return *duration
}

// insert inserts a new item to be processed at a scheduled time. It will not further delay an item if the new time is
// later than the time the item was originally scheduled to be processed; if the new time is earlier, it will "bring
// the item forward".
// If ratelimit is true and delay is nil, the rate limiter's delay (the return value of its When function) is used.
// If ratelimit is true and delay is non-nil, the delay value is used instead.
// If ratelimit is false, only delay is used to schedule the work; a nil delay is treated as 0.
func (q *Queue) insert(ctx context.Context, key string, ratelimit bool, delay *time.Duration) *queueItem {
	ctx, span := trace.StartSpan(ctx, "insert")
	defer span.End()
	ctx = span.WithFields(ctx, map[string]interface{}{
		"queue":     q.name,
		"key":       key,
		"ratelimit": ratelimit,
	})
	if delay == nil {
		ctx = span.WithField(ctx, "delay", "nil")
	} else {
		ctx = span.WithField(ctx, "delay", delay.String())
	}
	defer func() {
		select {
		case q.wakeupCh <- struct{}{}:
		default:
		}
	}()
	// First see if the item is already being processed
	if item, ok := q.itemsBeingProcessed[key]; ok {
		span.WithField(ctx, "status", "itemsBeingProcessed")
		when := q.clock.Now().Add(durationDeref(delay, 0))
		// Has the item already been redirtied?
		if item.redirtiedAt.IsZero() {
			item.redirtiedAt = when
			item.redirtiedWithRatelimit = ratelimit
		} else if when.Before(item.redirtiedAt) {
			item.redirtiedAt = when
			item.redirtiedWithRatelimit = ratelimit
		}
		item.forget = false
		return item
	}
	// Is the item already in the queue?
	if item, ok := q.itemsInQueue[key]; ok {
		span.WithField(ctx, "status", "itemsInQueue")
		qi := item.Value.(*queueItem)
		when := q.clock.Now().Add(durationDeref(delay, 0))
		q.adjustPosition(qi, item, when)
		return qi
	}
	span.WithField(ctx, "status", "added")
	now := q.clock.Now()
	val := &queueItem{
		key:                  key,
		plannedToStartWorkAt: now,
		originallyAdded:      now,
	}
	if ratelimit {
		actualDelay := q.ratelimiter.When(key)
		// Check if the delay is overridden
		if delay != nil {
			actualDelay = *delay
		}
		span.WithField(ctx, "delay", actualDelay.String())
		val.plannedToStartWorkAt = val.plannedToStartWorkAt.Add(actualDelay)
		val.delayedViaRateLimit = &actualDelay
	} else {
		val.plannedToStartWorkAt = val.plannedToStartWorkAt.Add(durationDeref(delay, 0))
	}
	for item := q.items.Back(); item != nil; item = item.Prev() {
		qi := item.Value.(*queueItem)
		if qi.plannedToStartWorkAt.Before(val.plannedToStartWorkAt) {
			q.itemsInQueue[key] = q.items.InsertAfter(val, item)
			return val
		}
	}
	q.itemsInQueue[key] = q.items.PushFront(val)
	return val
}
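
// To illustrate the rules above (a sketch; the 5s figure is an assumed
// rate-limiter suggestion, not something defined in this file), here is how the
// public entry points map onto insert's parameters:
//
//	q.Enqueue(ctx, key)                             // insert(ctx, key, true, nil):  runs at now + ratelimiter.When(key), e.g. now+5s
//	q.EnqueueWithoutRateLimit(ctx, key)             // insert(ctx, key, false, nil): runs immediately
//	q.EnqueueWithoutRateLimitWithDelay(ctx, key, d) // insert(ctx, key, false, &d):  runs at now+d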

func (q *Queue) adjustPosition(qi *queueItem, element *list.Element, when time.Time) {
	if when.After(qi.plannedToStartWorkAt) {
		// The item has already been delayed appropriately
		return
	}
	qi.plannedToStartWorkAt = when
	for prev := element.Prev(); prev != nil; prev = prev.Prev() {
		item := prev.Value.(*queueItem)
		// Does this item plan to start work *before* the new time? If so, move our element to just after it
		if item.plannedToStartWorkAt.Before(when) {
			q.items.MoveAfter(element, prev)
			return
		}
	}
	q.items.MoveToFront(element)
}

// EnqueueWithoutRateLimitWithDelay enqueues without rate limiting, but work will not start for the given delay period
func (q *Queue) EnqueueWithoutRateLimitWithDelay(ctx context.Context, key string, after time.Duration) {
	q.lock.Lock()
	defer q.lock.Unlock()
	q.insert(ctx, key, false, &after)
}

// Empty returns true if the queue has no items in it
//
// It should only be used for debugging.
func (q *Queue) Empty() bool {
	return q.Len() == 0
}

// Len returns the number of items in the queue, including those being processed
func (q *Queue) Len() int {
	q.lock.Lock()
	defer q.lock.Unlock()
	if q.items.Len() != len(q.itemsInQueue) {
		panic("Internally inconsistent state")
	}
	return q.items.Len() + len(q.itemsBeingProcessed)
}

// Run starts the workers
//
// It blocks until the context is cancelled and all of the workers have exited.
func (q *Queue) Run(ctx context.Context, workers int) {
	if workers <= 0 {
		panic(fmt.Sprintf("Workers must be greater than 0, got: %d", workers))
	}
	q.lock.Lock()
	if q.running {
		panic(fmt.Sprintf("Queue %s is already running", q.name))
	}
	q.running = true
	q.lock.Unlock()
	defer func() {
		q.lock.Lock()
		defer q.lock.Unlock()
		q.running = false
	}()
	// Make sure all workers are stopped before we finish up.
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()
	group := &wait.Group{}
	for i := 0; i < workers; i++ {
		// This is required because i references a mutable loop variable, and the closure runs in a separate goroutine
		idx := i
		group.StartWithContext(ctx, func(ctx context.Context) {
			q.worker(ctx, idx)
		})
	}
	defer group.Wait()
	<-ctx.Done()
}

func (q *Queue) worker(ctx context.Context, i int) {
	ctx = log.WithLogger(ctx, log.G(ctx).WithFields(map[string]interface{}{
		"workerId": i,
		"queue":    q.name,
	}))
	for q.handleQueueItem(ctx) {
	}
}

func (q *Queue) getNextItem(ctx context.Context) (*queueItem, error) {
	if err := q.waitForNextItemSemaphore.Acquire(ctx, 1); err != nil {
		return nil, err
	}
	defer q.waitForNextItemSemaphore.Release(1)
	for {
		q.lock.Lock()
		element := q.items.Front()
		if element == nil {
			// Wait for the next item
			q.lock.Unlock()
			select {
			case <-ctx.Done():
				return nil, ctx.Err()
			case <-q.wakeupCh:
			}
		} else {
			qi := element.Value.(*queueItem)
			timeUntilProcessing := time.Until(qi.plannedToStartWorkAt)
			// Do we need to sleep? If not, let's party.
			if timeUntilProcessing <= 0 {
				q.itemsBeingProcessed[qi.key] = qi
				q.items.Remove(element)
				delete(q.itemsInQueue, qi.key)
				q.lock.Unlock()
				return qi, nil
			}
			q.lock.Unlock()
			if err := func() error {
				timer := q.clock.NewTimer(timeUntilProcessing)
				defer timer.Stop()
				select {
				case <-timer.C():
				case <-ctx.Done():
					return ctx.Err()
				case <-q.wakeupCh:
				}
				return nil
			}(); err != nil {
				return nil, err
			}
		}
	}
}
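
// Note on the wait strategy above: waitForNextItemSemaphore admits a single
// worker at a time to poll the head of the items list; the rest block in
// Acquire. The polling worker sleeps until either its timer fires (the head
// item became due) or wakeupCh is signalled (insert added or rescheduled an
// item), and then re-checks the list.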

// handleQueueItem handles a single item
//
// A return value of "false" indicates that further processing should be stopped.
func (q *Queue) handleQueueItem(ctx context.Context) bool {
	ctx, span := trace.StartSpan(ctx, "handleQueueItem")
	defer span.End()
	qi, err := q.getNextItem(ctx)
	if err != nil {
		span.SetStatus(err)
		return false
	}
	// We expect strings to come off the work Queue. These are of the form namespace/name.
	// We do this as the delayed nature of the work Queue means the items in the informer cache may actually be
	// more up to date than they were when the item was initially put onto the workqueue.
	ctx = span.WithField(ctx, "key", qi.key)
	log.G(ctx).Debug("Got Queue object")
	err = q.handleQueueItemObject(ctx, qi)
	if err != nil {
		// We've actually hit an error, so we set the span's status based on the error.
		span.SetStatus(err)
		log.G(ctx).WithError(err).Error("Error processing Queue item")
		return true
	}
	log.G(ctx).Debug("Processed Queue item")
	return true
}

func (q *Queue) handleQueueItemObject(ctx context.Context, qi *queueItem) error {
	// This is a separate function / span, because the handleQueueItem span is the time spent waiting for the object
	// plus the time spent handling the object. Instead, this function / span is scoped to a single object.
	ctx, span := trace.StartSpan(ctx, "handleQueueItemObject")
	defer span.End()
	ctx = span.WithFields(ctx, map[string]interface{}{
		"requeues":        qi.requeues,
		"originallyAdded": qi.originallyAdded.String(),
		"addedViaRedirty": qi.addedViaRedirty,
		"plannedForWork":  qi.plannedToStartWorkAt.String(),
	})
	if qi.delayedViaRateLimit != nil {
		ctx = span.WithField(ctx, "delayedViaRateLimit", qi.delayedViaRateLimit.String())
	}
	// Add the current key as an attribute to the current span.
	ctx = span.WithField(ctx, "key", qi.key)
	// Run the syncHandler, passing it the namespace/name string of the Pod resource to be synced.
	err := q.handler(ctx, qi.key)
	q.lock.Lock()
	defer q.lock.Unlock()
	delete(q.itemsBeingProcessed, qi.key)
	if qi.forget {
		q.ratelimiter.Forget(qi.key)
		log.G(ctx).WithError(err).Warnf("forgetting %q as told to forget while in progress", qi.key)
		return nil
	}
	if err != nil {
		ctx = span.WithField(ctx, "error", err.Error())
		var delay *time.Duration
		// Stash the original error for logging below
		originalError := err
		delay, err = q.retryFunc(ctx, qi.key, qi.requeues+1, qi.originallyAdded, err)
		if err == nil {
			// Put the item back on the work Queue to handle any transient errors.
			log.G(ctx).WithError(originalError).Warnf("requeuing %q due to failed sync", qi.key)
			newQI := q.insert(ctx, qi.key, true, delay)
			newQI.requeues = qi.requeues + 1
			newQI.originallyAdded = qi.originallyAdded
			return nil
		}
		if !qi.redirtiedAt.IsZero() {
			err = fmt.Errorf("temporarily (requeued) forgetting %q due to: %w", qi.key, err)
		} else {
			err = fmt.Errorf("forgetting %q due to: %w", qi.key, err)
		}
	}
	// We've exceeded the maximum retries or we were successful.
	q.ratelimiter.Forget(qi.key)
	if !qi.redirtiedAt.IsZero() {
		delay := time.Until(qi.redirtiedAt)
		newQI := q.insert(ctx, qi.key, qi.redirtiedWithRatelimit, &delay)
		newQI.addedViaRedirty = true
	}
	span.SetStatus(err)
	return err
}

func (q *Queue) String() string {
	q.lock.Lock()
	defer q.lock.Unlock()
	items := make([]string, 0, q.items.Len())
	for next := q.items.Front(); next != nil; next = next.Next() {
		items = append(items, next.Value.(*queueItem).String())
	}
	return fmt.Sprintf("<items:%s>", items)
}

// DefaultRetryFunc is the default function used for retries by the queue subsystem: it retries until MaxRetries is
// reached, using the rate limiter's suggested delay, and then gives up.
func DefaultRetryFunc(ctx context.Context, key string, timesTried int, originallyAdded time.Time, err error) (*time.Duration, error) {
	if timesTried < MaxRetries {
		return nil, nil
	}
	return nil, pkgerrors.Wrapf(err, "maximum retries (%d) reached", MaxRetries)
}