// Copyright © 2017 The virtual-kubelet authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package queue

import (
	"context"
	"fmt"
	"sync"
	"time"

	pkgerrors "github.com/pkg/errors"
	"github.com/virtual-kubelet/virtual-kubelet/log"
	"github.com/virtual-kubelet/virtual-kubelet/trace"
	"k8s.io/apimachinery/pkg/util/wait"
	"k8s.io/client-go/util/workqueue"
)

const (
	// MaxRetries is the number of times we try to process a given key before permanently forgetting it.
	MaxRetries = 20
)
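
// A note on retry pacing: the rate limiter passed to New controls how quickly a
// failing key is retried before MaxRetries is exhausted. A minimal sketch of a
// typical per-item exponential backoff configuration (the specific values here
// are illustrative assumptions, not defaults of this package):
//
//	limiter := workqueue.NewItemExponentialFailureRateLimiter(5*time.Millisecond, 10*time.Second)
//	q := New(limiter, "example", handler) // handler is an ItemHandler defined by the caller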

// ItemHandler is a callback that handles a single key on the Queue.
type ItemHandler func(ctx context.Context, key string) error

// Queue implements a wrapper around workqueue with native VK instrumentation.
type Queue struct {
	lock      sync.Mutex
	running   bool
	name      string
	workqueue workqueue.RateLimitingInterface
	handler   ItemHandler
}

// New creates a queue.
//
// It expects an item rate limiter and a friendly name, which is used in logs and
// in the internal Kubernetes metrics.
func New(ratelimiter workqueue.RateLimiter, name string, handler ItemHandler) *Queue {
	return &Queue{
		name:      name,
		workqueue: workqueue.NewNamedRateLimitingQueue(ratelimiter, name),
		handler:   handler,
	}
}
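
// A minimal construction sketch. The rate limiter choice and the handler body
// are assumptions for illustration; any workqueue.RateLimiter works:
//
//	q := New(workqueue.DefaultControllerRateLimiter(), "pods", func(ctx context.Context, key string) error {
//		log.G(ctx).Infof("syncing %q", key)
//		return nil // a nil return forgets the key; an error triggers a rate-limited retry
//	})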

// Enqueue enqueues the key in a rate limited fashion.
func (q *Queue) Enqueue(key string) {
	q.workqueue.AddRateLimited(key)
}

// EnqueueWithoutRateLimit enqueues the key without a rate limit.
func (q *Queue) EnqueueWithoutRateLimit(key string) {
	q.workqueue.Add(key)
}

// Forget forgets the key, clearing any retry backoff the rate limiter has accrued for it.
func (q *Queue) Forget(key string) {
	q.workqueue.Forget(key)
}

// EnqueueAfter enqueues the key after the given duration.
//
// Since it wraps workqueue semantics, if a key is enqueued with a delay and is then
// immediately scheduled for work, the immediate item is processed first, and the key
// is processed again when the delayed enqueue fires.
func (q *Queue) EnqueueAfter(key string, after time.Duration) {
	q.workqueue.AddAfter(key, after)
}
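
// A sketch of the semantics described above (the key value is an illustrative
// assumption): the key is handled once almost immediately, and once more
// roughly a minute later when the delayed enqueue fires.
//
//	q.EnqueueAfter("default/my-pod", time.Minute)
//	q.EnqueueWithoutRateLimit("default/my-pod") // handled first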

// Empty returns true if the queue has no items in it.
//
// It should only be used for debugging, as items enqueued with a delay are not
// counted, which can be misleading.
func (q *Queue) Empty() bool {
	return q.workqueue.Len() == 0
}

// Run starts the workers.
//
// It blocks until the context is cancelled and all of the workers have exited.
func (q *Queue) Run(ctx context.Context, workers int) {
	if workers <= 0 {
		panic(fmt.Sprintf("Workers must be greater than 0, got: %d", workers))
	}

	q.lock.Lock()
	if q.running {
		panic(fmt.Sprintf("Queue %s is already running", q.name))
	}
	q.running = true
	q.lock.Unlock()
	defer func() {
		q.lock.Lock()
		defer q.lock.Unlock()
		q.running = false
	}()

	// Make sure all workers are stopped before we finish up.
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()

	group := &wait.Group{}
	for i := 0; i < workers; i++ {
		i := i // capture the loop variable; the closure below runs on its own goroutine
		group.StartWithContext(ctx, func(ctx context.Context) {
			q.worker(ctx, i)
		})
	}
	defer group.Wait()
	<-ctx.Done()
	q.workqueue.ShutDown()
}
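
// A minimal lifecycle sketch, assuming q was built with New: Run blocks, so it is
// typically started on its own goroutine and stopped by cancelling the context.
//
//	ctx, cancel := context.WithCancel(context.Background())
//	go q.Run(ctx, 5)
//	// ... later, to stop the workers and shut the queue down:
//	cancel()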

func (q *Queue) worker(ctx context.Context, i int) {
	ctx = log.WithLogger(ctx, log.G(ctx).WithFields(map[string]interface{}{
		"workerId": i,
		"queue":    q.name,
	}))
	for q.handleQueueItem(ctx) {
	}
}

// handleQueueItem handles a single item.
//
// A return value of "false" indicates that further processing should be stopped.
func (q *Queue) handleQueueItem(ctx context.Context) bool {
	ctx, span := trace.StartSpan(ctx, "handleQueueItem")
	defer span.End()

	obj, shutdown := q.workqueue.Get()
	if shutdown {
		return false
	}

	// We expect strings to come off the work Queue.
	// These are of the form namespace/name.
	// We pass keys rather than objects because the delayed nature of the work Queue
	// means the items in the informer cache may actually be more up to date than when
	// the item was initially put onto the workqueue.
	key := obj.(string)
	ctx = span.WithField(ctx, "key", key)
	log.G(ctx).Debug("Got Queue object")

	err := q.handleQueueItemObject(ctx, key)
	if err != nil {
		// We've actually hit an error, so we set the span's status based on the error.
		span.SetStatus(err)
		log.G(ctx).WithError(err).Error("Error processing Queue item")
		return true
	}
	log.G(ctx).Debug("Processed Queue item")

	return true
}

func (q *Queue) handleQueueItemObject(ctx context.Context, key string) error {
	// This is a separate function / span because the handleQueueItem span covers the
	// time spent waiting for an object plus the time spent handling it. This function /
	// span is scoped to a single object.
	ctx, span := trace.StartSpan(ctx, "handleQueueItemObject")
	defer span.End()

	// Add the current key as an attribute to the current span.
	ctx = span.WithField(ctx, "key", key)

	// We call Done here so the work Queue knows we have finished processing this item.
	// We also must remember to call Forget if we do not want this work item being re-queued.
	// For example, we do not call Forget if a transient error occurs.
	// Instead, the item is put back on the work Queue and attempted again after a back-off period.
	defer q.workqueue.Done(key)

	// Run the handler, passing it the namespace/name string of the resource to be synced.
	if err := q.handler(ctx, key); err != nil {
		if q.workqueue.NumRequeues(key) < MaxRetries {
			// Put the item back on the work Queue to handle any transient errors.
			log.G(ctx).WithError(err).Warnf("requeuing %q due to failed sync", key)
			q.workqueue.AddRateLimited(key)
			return nil
		}
		// We've exceeded the maximum retries, so we must Forget the key.
		q.workqueue.Forget(key)
		return pkgerrors.Wrapf(err, "forgetting %q due to maximum retries reached", key)
	}
	// Finally, if no error occurs we Forget this item so it does not get queued again
	// until another change happens.
	q.workqueue.Forget(key)

	return nil
}
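
// A sketch of how handler errors interact with the retry logic above: returning a
// non-nil error requeues the key with backoff until MaxRetries is reached, after
// which the key is forgotten and the error is surfaced. The helpers here are
// hypothetical, for illustration only.
//
//	q := New(workqueue.DefaultControllerRateLimiter(), "example", func(ctx context.Context, key string) error {
//		pod, err := lookup(key) // hypothetical helper
//		if err != nil {
//			return err // transient: requeued with backoff, up to MaxRetries times
//		}
//		return sync(ctx, pod) // hypothetical helper
//	})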