virtual-kubelet/internal/queue/queue.go
Sargun Dhillon ac9a1af564 Replace golang workqueue with our own
This exposes a fundamentally different API from the K8s workqueue, one
better suited to our needs. Specifically, we need a simple queue that
doesn't have complex features like delayed adds that sit on "external"
goroutines.

In addition, we need deep introspection into the operations of the
queue. Although you can get some of this on top of the K8s workqueue
by implementing a custom rate limiter, the underlying rate limiter's
behaviour remains somewhat opaque.

This package has essentially 100% code coverage.
2021-02-08 11:07:03 -08:00

// Copyright © 2017 The virtual-kubelet authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package queue

import (
	"container/list"
	"context"
	"fmt"
	"sync"
	"time"

	pkgerrors "github.com/pkg/errors"
	"github.com/virtual-kubelet/virtual-kubelet/log"
	"github.com/virtual-kubelet/virtual-kubelet/trace"
	"golang.org/x/sync/semaphore"
	"k8s.io/apimachinery/pkg/util/wait"
	"k8s.io/client-go/util/workqueue"
	"k8s.io/utils/clock"
)

const (
	// MaxRetries is the number of times we try to process a given key before permanently forgetting it.
	MaxRetries = 20
)

// ItemHandler is a callback that handles a single key on the Queue
type ItemHandler func(ctx context.Context, key string) error

// Queue implements a wrapper around workqueue with native VK instrumentation
type Queue struct {
	// clock is used for testing
	clock clock.Clock
	// lock protects running, and the items list / map
	lock        sync.Mutex
	running     bool
	name        string
	handler     ItemHandler
	ratelimiter workqueue.RateLimiter
	// items are items that are marked dirty waiting for processing.
	items *list.List
	// itemsInQueue is a map of (string) key -> item while it is in the items list
	itemsInQueue map[string]*list.Element
	// itemsBeingProcessed is a map of (string) key -> item once it has been moved off the items list for processing
	itemsBeingProcessed map[string]*queueItem
	// waitForNextItemSemaphore is an exclusive (1 item) lock that is taken every time items is checked to see if
	// there is an item in the queue ready for work
	waitForNextItemSemaphore *semaphore.Weighted
	// wakeupCh is signalled whenever an item is inserted, waking a worker waiting for the next item
	wakeupCh chan struct{}
}
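
// queueItem is the internal bookkeeping record for a single key. It tracks when the key is planned to be worked
// on, whether it was re-dirtied while being processed, and debugging metadata about how it entered the queue.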
type queueItem struct {
	key                    string
	plannedToStartWorkAt   time.Time
	redirtiedAt            time.Time
	redirtiedWithRatelimit bool
	forget                 bool
	requeues               int
	// Debugging information only
	originallyAdded     time.Time
	addedViaRedirty     bool
	delayedViaRateLimit *time.Duration
}
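
// String implements fmt.Stringer for logging and debugging.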
func (item *queueItem) String() string {
	return fmt.Sprintf("<plannedToStartWorkAt:%s key: %s>", item.plannedToStartWorkAt.String(), item.key)
}

// New creates a queue
//
// It expects to get an item rate limiter, and a friendly name which is used in logs, and
// in the internal kubernetes metrics.
func New(ratelimiter workqueue.RateLimiter, name string, handler ItemHandler) *Queue {
	return &Queue{
		clock:                    clock.RealClock{},
		name:                     name,
		ratelimiter:              ratelimiter,
		items:                    list.New(),
		itemsBeingProcessed:      make(map[string]*queueItem),
		itemsInQueue:             make(map[string]*list.Element),
		handler:                  handler,
		wakeupCh:                 make(chan struct{}, 1),
		waitForNextItemSemaphore: semaphore.NewWeighted(1),
	}
}

// Enqueue enqueues the key in a rate limited fashion
func (q *Queue) Enqueue(key string) {
	q.lock.Lock()
	defer q.lock.Unlock()
	q.insert(key, true, 0)
}

// EnqueueWithoutRateLimit enqueues the key without a rate limit
func (q *Queue) EnqueueWithoutRateLimit(key string) {
	q.lock.Lock()
	defer q.lock.Unlock()
	q.insert(key, false, 0)
}

// Forget forgets the key
func (q *Queue) Forget(key string) {
	q.lock.Lock()
	defer q.lock.Unlock()
	if item, ok := q.itemsInQueue[key]; ok {
		delete(q.itemsInQueue, key)
		q.items.Remove(item)
	}
	if qi, ok := q.itemsBeingProcessed[key]; ok {
		qi.forget = true
	}
}

// insert inserts a new item to be processed after the given delay. It will not further delay items if when is
// later than the original time the item was scheduled to be processed. If when is earlier, it will "bring it forward"
func (q *Queue) insert(key string, ratelimit bool, delay time.Duration) *queueItem {
	defer func() {
		select {
		case q.wakeupCh <- struct{}{}:
		default:
		}
	}()
	// First see if the item is already being processed
	if item, ok := q.itemsBeingProcessed[key]; ok {
		when := q.clock.Now().Add(delay)
		// Has the item already been redirtied?
		if item.redirtiedAt.IsZero() {
			item.redirtiedAt = when
			item.redirtiedWithRatelimit = ratelimit
		} else if when.Before(item.redirtiedAt) {
			item.redirtiedAt = when
			item.redirtiedWithRatelimit = ratelimit
		}
		item.forget = false
		return item
	}
	// Is the item already in the queue?
	if item, ok := q.itemsInQueue[key]; ok {
		qi := item.Value.(*queueItem)
		when := q.clock.Now().Add(delay)
		q.adjustPosition(qi, item, when)
		return qi
	}
	now := q.clock.Now()
	val := &queueItem{
		key:                  key,
		plannedToStartWorkAt: now,
		originallyAdded:      now,
	}
	if ratelimit {
		if delay > 0 {
			panic("Non-zero delay with rate limiting not supported")
		}
		ratelimitDelay := q.ratelimiter.When(key)
		val.plannedToStartWorkAt = val.plannedToStartWorkAt.Add(ratelimitDelay)
		val.delayedViaRateLimit = &ratelimitDelay
	} else {
		val.plannedToStartWorkAt = val.plannedToStartWorkAt.Add(delay)
	}
	for item := q.items.Back(); item != nil; item = item.Prev() {
		qi := item.Value.(*queueItem)
		if qi.plannedToStartWorkAt.Before(val.plannedToStartWorkAt) {
			q.itemsInQueue[key] = q.items.InsertAfter(val, item)
			return val
		}
	}
	q.itemsInQueue[key] = q.items.PushFront(val)
	return val
}
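
// adjustPosition moves an already queued item earlier in the list when the newly requested time is sooner than
// the one it was scheduled for, keeping the items list sorted by plannedToStartWorkAt.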
func (q *Queue) adjustPosition(qi *queueItem, element *list.Element, when time.Time) {
	if when.After(qi.plannedToStartWorkAt) {
		// The item is already scheduled to start earlier than the new time; leave it alone
		return
	}
	qi.plannedToStartWorkAt = when
	for prev := element.Prev(); prev != nil; prev = prev.Prev() {
		item := prev.Value.(*queueItem)
		// does this item plan to start work *before* the new time? If so, move our element to just after it
		if item.plannedToStartWorkAt.Before(when) {
			q.items.MoveAfter(element, prev)
			return
		}
	}
	q.items.MoveToFront(element)
}

// EnqueueWithoutRateLimitWithDelay enqueues without rate limiting, but work will not start for this given delay period
func (q *Queue) EnqueueWithoutRateLimitWithDelay(key string, after time.Duration) {
	q.lock.Lock()
	defer q.lock.Unlock()
	q.insert(key, false, after)
}

// Empty returns whether the queue has no items in it
//
// It should only be used for debugging.
func (q *Queue) Empty() bool {
	return q.Len() == 0
}

// Len includes items that are in the queue, and items that are being processed
func (q *Queue) Len() int {
	q.lock.Lock()
	defer q.lock.Unlock()
	if q.items.Len() != len(q.itemsInQueue) {
		panic("Internally inconsistent state")
	}
	return q.items.Len() + len(q.itemsBeingProcessed)
}

// Run starts the workers
//
// It blocks until the context is cancelled, and all of the workers exit.
func (q *Queue) Run(ctx context.Context, workers int) {
	if workers <= 0 {
		panic(fmt.Sprintf("Workers must be greater than 0, got: %d", workers))
	}
	q.lock.Lock()
	if q.running {
		panic(fmt.Sprintf("Queue %s is already running", q.name))
	}
	q.running = true
	q.lock.Unlock()
	defer func() {
		q.lock.Lock()
		defer q.lock.Unlock()
		q.running = false
	}()
	// Make sure all workers are stopped before we finish up.
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()
	group := &wait.Group{}
	for i := 0; i < workers; i++ {
		// Copy i into a new variable because the loop variable is mutated while the goroutine that uses it is running
		idx := i
		group.StartWithContext(ctx, func(ctx context.Context) {
			q.worker(ctx, idx)
		})
	}
	defer group.Wait()
	<-ctx.Done()
}
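
// worker runs the processing loop for a single worker, handling queue items until the context is cancelled.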
func (q *Queue) worker(ctx context.Context, i int) {
	ctx = log.WithLogger(ctx, log.G(ctx).WithFields(map[string]interface{}{
		"workerId": i,
		"queue":    q.name,
	}))
	for q.handleQueueItem(ctx) {
	}
}
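
// getNextItem blocks until the item at the front of the queue is due for processing (or the context is cancelled),
// then moves it from the items list into itemsBeingProcessed and returns it.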
func (q *Queue) getNextItem(ctx context.Context) (*queueItem, error) {
	if err := q.waitForNextItemSemaphore.Acquire(ctx, 1); err != nil {
		return nil, err
	}
	defer q.waitForNextItemSemaphore.Release(1)
	for {
		q.lock.Lock()
		element := q.items.Front()
		if element == nil {
			// Wait for the next item
			q.lock.Unlock()
			select {
			case <-ctx.Done():
				return nil, ctx.Err()
			case <-q.wakeupCh:
			}
		} else {
			qi := element.Value.(*queueItem)
			timeUntilProcessing := time.Until(qi.plannedToStartWorkAt)
			// Do we need to sleep? If not, let's party.
			if timeUntilProcessing <= 0 {
				q.itemsBeingProcessed[qi.key] = qi
				q.items.Remove(element)
				delete(q.itemsInQueue, qi.key)
				q.lock.Unlock()
				return qi, nil
			}
			q.lock.Unlock()
			if err := func() error {
				timer := q.clock.NewTimer(timeUntilProcessing)
				defer timer.Stop()
				select {
				case <-timer.C():
				case <-ctx.Done():
					return ctx.Err()
				case <-q.wakeupCh:
				}
				return nil
			}(); err != nil {
				return nil, err
			}
		}
	}
}

// handleQueueItem handles a single item
//
// A return value of "false" indicates that further processing should be stopped.
func (q *Queue) handleQueueItem(ctx context.Context) bool {
	ctx, span := trace.StartSpan(ctx, "handleQueueItem")
	defer span.End()
	qi, err := q.getNextItem(ctx)
	if err != nil {
		span.SetStatus(err)
		return false
	}
	// We expect strings to come off the work Queue.
	// These are of the form namespace/name.
	// We do this as the delayed nature of the work Queue means the items in the informer cache may actually be more
	// up to date than when the item was initially put onto the workqueue.
	ctx = span.WithField(ctx, "key", qi.key)
	log.G(ctx).Debug("Got Queue object")
	err = q.handleQueueItemObject(ctx, qi)
	if err != nil {
		// We've actually hit an error, so we set the span's status based on the error.
		span.SetStatus(err)
		log.G(ctx).WithError(err).Error("Error processing Queue item")
		return true
	}
	log.G(ctx).Debug("Processed Queue item")
	return true
}
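
// handleQueueItemObject invokes the handler for a single dequeued item, then requeues it on transient failure
// (up to MaxRetries), forgets it on success or when Forget was called, and re-inserts it if it was re-dirtied
// while being processed.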
func (q *Queue) handleQueueItemObject(ctx context.Context, qi *queueItem) error {
	// This is a separate function / span, because the handleQueueItem span is the time spent waiting for the object
	// plus the time spent handling the object. Instead, this function / span is scoped to a single object.
	ctx, span := trace.StartSpan(ctx, "handleQueueItemObject")
	defer span.End()
	ctx = span.WithFields(ctx, map[string]interface{}{
		"requeues":        qi.requeues,
		"originallyAdded": qi.originallyAdded.String(),
		"addedViaRedirty": qi.addedViaRedirty,
		"plannedForWork":  qi.plannedToStartWorkAt.String(),
	})
	if qi.delayedViaRateLimit != nil {
		ctx = span.WithField(ctx, "delayedViaRateLimit", qi.delayedViaRateLimit.String())
	}
	// Add the current key as an attribute to the current span.
	ctx = span.WithField(ctx, "key", qi.key)
	// Run the syncHandler, passing it the namespace/name string of the Pod resource to be synced.
	err := q.handler(ctx, qi.key)
	q.lock.Lock()
	defer q.lock.Unlock()
	delete(q.itemsBeingProcessed, qi.key)
	if qi.forget {
		q.ratelimiter.Forget(qi.key)
		log.G(ctx).WithError(err).Warnf("forgetting %q as told to forget while in progress", qi.key)
		return nil
	}
	if err != nil {
		if qi.requeues+1 < MaxRetries {
			// Put the item back on the work Queue to handle any transient errors.
			log.G(ctx).WithError(err).Warnf("requeuing %q due to failed sync", qi.key)
			newQI := q.insert(qi.key, true, 0)
			newQI.requeues = qi.requeues + 1
			newQI.originallyAdded = qi.originallyAdded
			return nil
		}
		err = pkgerrors.Wrapf(err, "forgetting %q due to maximum retries reached", qi.key)
	}
	// We've exceeded the maximum retries or we were successful.
	q.ratelimiter.Forget(qi.key)
	if !qi.redirtiedAt.IsZero() {
		newQI := q.insert(qi.key, qi.redirtiedWithRatelimit, time.Until(qi.redirtiedAt))
		newQI.addedViaRedirty = true
	}
	return err
}
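
// String renders the current contents of the queue for debugging.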
func (q *Queue) String() string {
	q.lock.Lock()
	defer q.lock.Unlock()
	items := make([]string, 0, q.items.Len())
	for next := q.items.Front(); next != nil; next = next.Next() {
		items = append(items, next.Value.(*queueItem).String())
	}
	return fmt.Sprintf("<items:%s>", items)
}
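
For context, a minimal usage sketch follows, assuming a caller inside the virtual-kubelet module (the package sits under internal/, so it cannot be imported from other modules). The queue name, handler body, worker count, and keys below are illustrative assumptions, not values taken from this file.

// Usage sketch only; not part of queue.go.
package main

import (
	"context"
	"fmt"
	"time"

	"github.com/virtual-kubelet/virtual-kubelet/internal/queue"
	"k8s.io/client-go/util/workqueue"
)

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	// The handler receives the enqueued key; returning an error triggers a rate-limited requeue
	// (up to MaxRetries) as implemented in handleQueueItemObject above.
	handler := func(ctx context.Context, key string) error {
		fmt.Println("syncing", key)
		return nil
	}

	// "example" is an arbitrary queue name used in logs.
	q := queue.New(workqueue.DefaultControllerRateLimiter(), "example", handler)

	// Run blocks until the context is cancelled, so start it on its own goroutine.
	go q.Run(ctx, 2)

	q.Enqueue("default/my-pod")                                           // rate limited
	q.EnqueueWithoutRateLimitWithDelay("default/other-pod", time.Second)  // delayed, no rate limit

	<-ctx.Done()
}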