Refactor queue code

This refactor is preparation for another commit: I want to add instrumentation
around our queues. The queue-handling code was spread throughout the code base,
which made adding such instrumentation unnecessarily complicated.

This centralizes the queue management logic in queue.go; the user only needs to
provide a rate limiter (a custom one, if they want), a name, and a handler.
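
For illustration, using the new API looks roughly like this (the handler body and
the names here are placeholders, not code from this commit):

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
handler := func(ctx context.Context, key string) error {
	// reconcile the object identified by key (a namespace/name string)
	return nil
}
q := queue.New(workqueue.DefaultControllerRateLimiter(), "example", handler)
q.Enqueue("default/my-pod")
q.Run(ctx, 1) // blocks until ctx is cancelled and the workers exit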

The queue code is moved into its own package to simplify testing: the goroutine
leak tester was triggering incorrectly when other tests were running, because it
was measuring leaks from those tests.
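
With the queue tests in their own package, the goroutine leak check can be scoped
to just those tests. A sketch of the pattern the new test uses:

defer goleak.VerifyNone(t,
	// Ignore goroutines that were already running before this test started
	goleak.IgnoreCurrent(),
	// Workqueue's background goroutines stop on their own after an idle window
	goleak.IgnoreTopFunction("k8s.io/client-go/util/workqueue.(*delayingType).waitingLoop"),
)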

This also identified buggy behaviour:

// Sketch; assumes "fmt" and "k8s.io/client-go/util/workqueue" are imported.
wq := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultItemBasedRateLimiter(), "test")
wq.AddRateLimited("hi")
fmt.Printf("Added hi, len: %d\n", wq.Len())

wq.Forget("hi")
fmt.Printf("Forgot hi, len: %d\n", wq.Len())

wq.Done("hi")
fmt.Printf("Done hi, len: %d\n", wq.Len())

---
Prints all 0s because even non-delayed items are delayed. If you call Add
directly, the last line prints a len of 2.
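
My reading of the workqueue internals is that the len of 2 comes from Done
re-queuing items that are still marked dirty; roughly:

wq.Add("hi")    // goes straight onto the queue:           len 1
wq.Forget("hi") // only clears rate limiter state:         len 1
wq.Done("hi")   // "hi" was never Get()'d, so it is still
                // dirty and Done appends it to the queue:  len 2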

// Workqueue docs:
// Forget indicates that an item is finished being retried.  Doesn't matter whether it's for perm failing
// or for success, we'll stop the rate limiter from tracking it.  This only clears the `rateLimiter`, you
// still have to call `Done` on the queue.

^----- Even this seems untrue
Sargun Dhillon
2020-12-18 03:27:04 -08:00
parent 735eb34829
commit 1b8597647b
9 changed files with 391 additions and 227 deletions

go.mod (2 lines changed)

@@ -18,8 +18,10 @@ require (
github.com/spf13/cobra v0.0.5
github.com/spf13/pflag v1.0.5
go.opencensus.io v0.21.0
go.uber.org/goleak v1.1.10
golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e
golang.org/x/sys v0.0.0-20200323222414-85ca7c5b95cd
golang.org/x/time v0.0.0-20190308202827-9d24e82272b4
gotest.tools v2.2.0+incompatible
k8s.io/api v0.18.6
k8s.io/apimachinery v0.18.6

go.sum (5 lines changed)

@@ -608,6 +608,8 @@ go.opencensus.io v0.21.0/go.mod h1:mSImk1erAIZhrmZN+AvHh14ztQfjbGwt4TtuofqLduU=
go.uber.org/atomic v1.3.2/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE=
go.uber.org/atomic v1.4.0 h1:cxzIVoETapQEqDhQu3QfnvXAV4AlzcvUCxkVUFw3+EU=
go.uber.org/atomic v1.4.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE=
go.uber.org/goleak v1.1.10 h1:z+mqJhf6ss6BSfSM671tgKyZBFPTTJM+HLxnhPC3wu0=
go.uber.org/goleak v1.1.10/go.mod h1:8a7PlsEVH3e/a/GLqe5IIrQx6GzcnRmZEufDUTk4A7A=
go.uber.org/multierr v1.1.0 h1:HoEmRHQPVSqub6w2z2d2EOVs2fjyFRGyofhKuyDq0QI=
go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0=
go.uber.org/zap v1.10.0 h1:ORx85nbTijNz8ljznvCMR1ZBIPKFn3jQrag10X2AsuM=
@@ -643,6 +645,7 @@ golang.org/x/lint v0.0.0-20190301231843-5614ed5bae6f/go.mod h1:UVdnD1Gm6xHRNCYTk
golang.org/x/lint v0.0.0-20190313153728-d0100b6bd8b3/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc=
golang.org/x/lint v0.0.0-20190409202823-959b441ac422 h1:QzoH/1pFpZguR8NrRHLcO6jKqfv2zpuSqZLgdm7ZmjI=
golang.org/x/lint v0.0.0-20190409202823-959b441ac422/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc=
golang.org/x/lint v0.0.0-20190930215403-16217165b5de/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc=
golang.org/x/mobile v0.0.0-20190312151609-d3739f865fa6/go.mod h1:z+o9i4GpDbdi3rU15maQ/Ox0txvL9dWGYEHz965HBQE=
golang.org/x/mod v0.0.0-20190513183733-4bf6d317e70e/go.mod h1:mXi4GBBbnImb6dmsKGUJ2LatrhH/nqhxcFungHvyanc=
golang.org/x/net v0.0.0-20170114055629-f2499483f923/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
@@ -764,6 +767,8 @@ golang.org/x/tools v0.0.0-20190621195816-6e04913cbbac/go.mod h1:/rFqwRUd4F7ZHNgw
golang.org/x/tools v0.0.0-20190909030654-5b82db07426d/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.0.0-20190920225731-5eefd052ad72 h1:bw9doJza/SFBEweII/rHQh338oozWyiFsBRHtrflcws=
golang.org/x/tools v0.0.0-20190920225731-5eefd052ad72/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.0.0-20191108193012-7d206e10da11 h1:Yq9t9jnGoR+dBuitxdo9l6Q7xh/zOyNnYUtDKaQ3x0E=
golang.org/x/tools v0.0.0-20191108193012-7d206e10da11/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=

internal/queue/queue.go (new file, 196 lines)

@@ -0,0 +1,196 @@
// Copyright © 2017 The virtual-kubelet authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package queue
import (
"context"
"fmt"
"sync"
"time"
pkgerrors "github.com/pkg/errors"
"github.com/virtual-kubelet/virtual-kubelet/log"
"github.com/virtual-kubelet/virtual-kubelet/trace"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/util/workqueue"
)
const (
// MaxRetries is the number of times we try to process a given key before permanently forgetting it.
MaxRetries = 20
)
// ItemHandler is a callback that handles a single key on the Queue
type ItemHandler func(ctx context.Context, key string) error
// Queue implements a wrapper around workqueue with native VK instrumentation
type Queue struct {
lock sync.Mutex
running bool
name string
workqueue workqueue.RateLimitingInterface
handler ItemHandler
}
// New creates a queue
//
// It expects to get an item rate limiter, and a friendly name which is used in logs and
// in the internal Kubernetes metrics.
func New(ratelimiter workqueue.RateLimiter, name string, handler ItemHandler) *Queue {
return &Queue{
name: name,
workqueue: workqueue.NewNamedRateLimitingQueue(ratelimiter, name),
handler: handler,
}
}
// Enqueue enqueues the key in a rate limited fashion
func (q *Queue) Enqueue(key string) {
q.workqueue.AddRateLimited(key)
}
// EnqueueWithoutRateLimit enqueues the key without a rate limit
func (q *Queue) EnqueueWithoutRateLimit(key string) {
q.workqueue.Add(key)
}
// Forget forgets the key
func (q *Queue) Forget(key string) {
q.workqueue.Forget(key)
}
// EnqueueAfter enqueues the item after this period
//
// Since it wraps workqueue semantics, if an item is enqueued with a delay and is also immediately scheduled for work,
// the immediate item will be processed first, and the item will be processed again when the delayed add fires
func (q *Queue) EnqueueAfter(key string, after time.Duration) {
q.workqueue.AddAfter(key, after)
}
// Empty returns true if the queue has no items in it
//
// It should only be used for debugging, as delayed items are not counted, leading to confusion
func (q *Queue) Empty() bool {
return q.workqueue.Len() == 0
}
// Run starts the workers
//
// It blocks until context is cancelled, and all of the workers exit.
func (q *Queue) Run(ctx context.Context, workers int) {
if workers <= 0 {
panic(fmt.Sprintf("Workers must be greater than 0, got: %d", workers))
}
q.lock.Lock()
if q.running {
panic(fmt.Sprintf("Queue %s is already running", q.name))
}
q.running = true
q.lock.Unlock()
defer func() {
q.lock.Lock()
defer q.lock.Unlock()
q.running = false
}()
// Make sure all workers are stopped before we finish up.
ctx, cancel := context.WithCancel(ctx)
defer cancel()
group := &wait.Group{}
for i := 0; i < workers; i++ {
i := i // shadow the loop variable so each worker goroutine sees its own value
group.StartWithContext(ctx, func(ctx context.Context) {
q.worker(ctx, i)
})
}
defer group.Wait()
<-ctx.Done()
q.workqueue.ShutDown()
}
func (q *Queue) worker(ctx context.Context, i int) {
ctx = log.WithLogger(ctx, log.G(ctx).WithFields(map[string]interface{}{
"workerId": i,
"Queue": q.name,
}))
for q.handleQueueItem(ctx) {
}
}
// handleQueueItem handles a single item
//
// A return value of "false" indicates that further processing should be stopped.
func (q *Queue) handleQueueItem(ctx context.Context) bool {
ctx, span := trace.StartSpan(ctx, "handleQueueItem")
defer span.End()
obj, shutdown := q.workqueue.Get()
if shutdown {
return false
}
// We expect strings to come off the work Queue.
// These are of the form namespace/name.
// We do this as the delayed nature of the work Queue means the items in the informer cache may actually be more up
// to date than when the item was initially put onto the workqueue.
key := obj.(string)
ctx = span.WithField(ctx, "key", key)
log.G(ctx).Debug("Got Queue object")
err := q.handleQueueItemObject(ctx, key)
if err != nil {
// We've actually hit an error, so we set the span's status based on the error.
span.SetStatus(err)
log.G(ctx).WithError(err).Error("Error processing Queue item")
return true
}
log.G(ctx).Debug("Processed Queue item")
return true
}
func (q *Queue) handleQueueItemObject(ctx context.Context, key string) error {
// This is a separate function / span, because the handleQueueItem span is the time spent waiting for the object
// plus the time spent handling the object. Instead, this function / span is scoped to a single object.
ctx, span := trace.StartSpan(ctx, "handleQueueItemObject")
defer span.End()
ctx = span.WithField(ctx, "key", key)
// We call Done here so the work Queue knows we have finished processing this item.
// We also must remember to call Forget if we do not want this work item being re-queued.
// For example, we do not call Forget if a transient error occurs.
// Instead, the item is put back on the work Queue and attempted again after a back-off period.
defer q.workqueue.Done(key)
// Add the current key as an attribute to the current span.
ctx = span.WithField(ctx, "key", key)
// Run the handler, passing it the namespace/name string of the object to be synced.
if err := q.handler(ctx, key); err != nil {
if q.workqueue.NumRequeues(key) < MaxRetries {
// Put the item back on the work Queue to handle any transient errors.
log.G(ctx).WithError(err).Warnf("requeuing %q due to failed sync", key)
q.workqueue.AddRateLimited(key)
return nil
}
// We've exceeded the maximum retries, so we must Forget the key.
q.workqueue.Forget(key)
return pkgerrors.Wrapf(err, "forgetting %q due to maximum retries reached", key)
}
// Finally, if no error occurs we Forget this item so it does not get queued again until another change happens.
q.workqueue.Forget(key)
return nil
}


@@ -0,0 +1,115 @@
package queue
import (
"context"
"errors"
"strconv"
"sync"
"testing"
"time"
"github.com/sirupsen/logrus"
"github.com/virtual-kubelet/virtual-kubelet/log"
logruslogger "github.com/virtual-kubelet/virtual-kubelet/log/logrus"
"go.uber.org/goleak"
"golang.org/x/time/rate"
"gotest.tools/assert"
is "gotest.tools/assert/cmp"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/util/workqueue"
)
func TestQueueMaxRetries(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
logger := logrus.New()
logger.SetLevel(logrus.DebugLevel)
ctx = log.WithLogger(ctx, logruslogger.FromLogrus(logrus.NewEntry(logger)))
n := 0
knownErr := errors.New("Testing error")
handler := func(ctx context.Context, key string) error {
n++
return knownErr
}
wq := New(workqueue.NewMaxOfRateLimiter(
// The default upper bound is 1000 seconds. Let's not use that.
workqueue.NewItemExponentialFailureRateLimiter(5*time.Millisecond, 10*time.Millisecond),
&workqueue.BucketRateLimiter{Limiter: rate.NewLimiter(rate.Limit(10), 100)},
), t.Name(), handler)
wq.Enqueue("test")
for n < MaxRetries {
assert.Assert(t, wq.handleQueueItem(ctx))
}
assert.Assert(t, is.Equal(n, MaxRetries))
assert.Assert(t, is.Equal(0, wq.workqueue.Len()))
}
func TestForget(t *testing.T) {
t.Parallel()
handler := func(ctx context.Context, key string) error {
panic("Should never be called")
}
wq := New(workqueue.DefaultItemBasedRateLimiter(), t.Name(), handler)
wq.Forget("val")
assert.Assert(t, is.Equal(0, wq.workqueue.Len()))
v := "test"
wq.EnqueueWithoutRateLimit(v)
assert.Assert(t, is.Equal(1, wq.workqueue.Len()))
t.Skip("This is broken")
// Workqueue docs:
// Forget indicates that an item is finished being retried. Doesn't matter whether it's for perm failing
// or for success, we'll stop the rate limiter from tracking it. This only clears the `rateLimiter`, you
// still have to call `Done` on the queue.
// Even if you do this, it doesn't work: https://play.golang.com/p/8vfL_RCsFGI
assert.Assert(t, is.Equal(0, wq.workqueue.Len()))
}
func TestQueueTerminate(t *testing.T) {
t.Parallel()
defer goleak.VerifyNone(t,
// Ignore existing goroutines
goleak.IgnoreCurrent(),
// Ignore klog background flushers
goleak.IgnoreTopFunction("k8s.io/klog.(*loggingT).flushDaemon"),
goleak.IgnoreTopFunction("k8s.io/klog/v2.(*loggingT).flushDaemon"),
// Workqueue runs a goroutine in the background to handle background functions. AFAICT, they're unkillable
// and are designed to stop after a certain idle window
goleak.IgnoreTopFunction("k8s.io/client-go/util/workqueue.(*Type).updateUnfinishedWorkLoop"),
goleak.IgnoreTopFunction("k8s.io/client-go/util/workqueue.(*delayingType).waitingLoop"),
)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
testMap := &sync.Map{}
handler := func(ctx context.Context, key string) error {
testMap.Store(key, struct{}{})
return nil
}
wq := New(workqueue.DefaultItemBasedRateLimiter(), t.Name(), handler)
group := &wait.Group{}
group.StartWithContext(ctx, func(ctx context.Context) {
wq.Run(ctx, 10)
})
for i := 0; i < 1000; i++ {
wq.EnqueueWithoutRateLimit(strconv.Itoa(i))
}
for wq.workqueue.Len() > 0 {
time.Sleep(100 * time.Millisecond)
}
for i := 0; i < 1000; i++ {
_, ok := testMap.Load(strconv.Itoa(i))
assert.Assert(t, ok, "Item %d missing", i)
}
cancel()
group.Wait()
}


@@ -335,7 +335,7 @@ func testTerminalStatePodScenario(ctx context.Context, t *testing.T, s *system,
// Start the pod controller
assert.NilError(t, s.start(ctx))
for s.pc.k8sQ.Len() > 0 {
for !s.pc.syncPodsFromKubernetes.Empty() {
time.Sleep(10 * time.Millisecond)
}


@@ -22,6 +22,7 @@ import (
"github.com/google/go-cmp/cmp"
pkgerrors "github.com/pkg/errors"
"github.com/virtual-kubelet/virtual-kubelet/internal/podutils"
"github.com/virtual-kubelet/virtual-kubelet/internal/queue"
"github.com/virtual-kubelet/virtual-kubelet/log"
"github.com/virtual-kubelet/virtual-kubelet/trace"
corev1 "k8s.io/api/core/v1"
@@ -29,7 +30,6 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"
)
const (
@@ -264,8 +264,8 @@ func (pc *PodController) updatePodStatus(ctx context.Context, podFromKubernetes
// enqueuePodStatusUpdate updates our pod status map, and marks the pod as dirty in the workqueue. The pod must be DeepCopy'd
// prior to enqueuePodStatusUpdate.
func (pc *PodController) enqueuePodStatusUpdate(ctx context.Context, q workqueue.RateLimitingInterface, pod *corev1.Pod) {
ctx, cancel := context.WithTimeout(ctx, notificationRetryPeriod*maxRetries)
func (pc *PodController) enqueuePodStatusUpdate(ctx context.Context, pod *corev1.Pod) {
ctx, cancel := context.WithTimeout(ctx, notificationRetryPeriod*queue.MaxRetries)
defer cancel()
ctx, span := trace.StartSpan(ctx, "enqueuePodStatusUpdate")
@@ -330,11 +330,11 @@ func (pc *PodController) enqueuePodStatusUpdate(ctx context.Context, q workqueue
}
kpod.lastPodStatusReceivedFromProvider = pod
kpod.Unlock()
q.AddRateLimited(key)
pc.syncPodStatusFromProvider.Enqueue(key)
}
func (pc *PodController) podStatusHandler(ctx context.Context, key string) (retErr error) {
ctx, span := trace.StartSpan(ctx, "podStatusHandler")
func (pc *PodController) syncPodStatusFromProviderHandler(ctx context.Context, key string) (retErr error) {
ctx, span := trace.StartSpan(ctx, "syncPodStatusFromProviderHandler")
defer span.End()
ctx = span.WithField(ctx, "key", key)
@@ -363,8 +363,8 @@ func (pc *PodController) podStatusHandler(ctx context.Context, key string) (retE
return pc.updatePodStatus(ctx, pod, key)
}
func (pc *PodController) deletePodHandler(ctx context.Context, key string) (retErr error) {
ctx, span := trace.StartSpan(ctx, "processDeletionReconcilationWorkItem")
func (pc *PodController) deletePodsFromKubernetesHandler(ctx context.Context, key string) (retErr error) {
ctx, span := trace.StartSpan(ctx, "deletePodsFromKubernetesHandler")
defer span.End()
namespace, name, err := cache.SplitMetaNamespaceKey(key)


@@ -17,11 +17,11 @@ package node
import (
"context"
"fmt"
"sync"
"testing"
"time"
testutil "github.com/virtual-kubelet/virtual-kubelet/internal/test/util"
"golang.org/x/time/rate"
"gotest.tools/assert"
is "gotest.tools/assert/cmp"
corev1 "k8s.io/api/core/v1"
@@ -44,23 +44,32 @@ func newTestController() *TestController {
rm := testutil.FakeResourceManager()
p := newMockProvider()
iFactory := kubeinformers.NewSharedInformerFactoryWithOptions(fk8s, 10*time.Minute)
rateLimiter := workqueue.NewMaxOfRateLimiter(
// The default upper bound is 1000 seconds. Let's not use that.
workqueue.NewItemExponentialFailureRateLimiter(5*time.Millisecond, 10*time.Millisecond),
&workqueue.BucketRateLimiter{Limiter: rate.NewLimiter(rate.Limit(10), 100)},
)
podController, err := NewPodController(PodControllerConfig{
PodClient: fk8s.CoreV1(),
PodInformer: iFactory.Core().V1().Pods(),
EventRecorder: testutil.FakeEventRecorder(5),
Provider: p,
ConfigMapInformer: iFactory.Core().V1().ConfigMaps(),
SecretInformer: iFactory.Core().V1().Secrets(),
ServiceInformer: iFactory.Core().V1().Services(),
RateLimiter: rateLimiter,
})
if err != nil {
panic(err)
}
// Override the resource manager in the constructor with our own.
podController.resourceManager = rm
return &TestController{
PodController: &PodController{
client: fk8s.CoreV1(),
provider: p,
resourceManager: rm,
recorder: testutil.FakeEventRecorder(5),
k8sQ: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()),
deletionQ: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()),
podStatusQ: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()),
done: make(chan struct{}),
ready: make(chan struct{}),
knownPods: sync.Map{},
podsInformer: iFactory.Core().V1().Pods(),
podsLister: iFactory.Core().V1().Pods().Lister(),
},
mock: p,
client: fk8s,
PodController: podController,
mock: p,
client: fk8s,
}
}


@@ -17,10 +17,11 @@ package node
import (
"context"
"fmt"
"strconv"
"sync"
"time"
"github.com/virtual-kubelet/virtual-kubelet/internal/queue"
"github.com/google/go-cmp/cmp"
pkgerrors "github.com/pkg/errors"
"github.com/virtual-kubelet/virtual-kubelet/errdefs"
@@ -29,6 +30,7 @@ import (
"github.com/virtual-kubelet/virtual-kubelet/trace"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/util/wait"
corev1informers "k8s.io/client-go/informers/core/v1"
corev1client "k8s.io/client-go/kubernetes/typed/core/v1"
corev1listers "k8s.io/client-go/listers/core/v1"
@@ -110,12 +112,13 @@ type PodController struct {
resourceManager *manager.ResourceManager
k8sQ workqueue.RateLimitingInterface
syncPodsFromKubernetes *queue.Queue
// deletionQ is a queue on which pods are reconciled, and we check if pods are in API server after grace period
deletionQ workqueue.RateLimitingInterface
// deletePodsFromKubernetes is a queue on which pods are reconciled, and we check if pods are in API server after
// the grace period
deletePodsFromKubernetes *queue.Queue
podStatusQ workqueue.RateLimitingInterface
syncPodStatusFromProvider *queue.Queue
// From the time of creation, to termination the knownPods map will contain the pods key
// (derived from Kubernetes' cache library) -> a *knownPod struct.
@@ -224,12 +227,13 @@ func NewPodController(cfg PodControllerConfig) (*PodController, error) {
ready: make(chan struct{}),
done: make(chan struct{}),
recorder: cfg.EventRecorder,
k8sQ: workqueue.NewNamedRateLimitingQueue(cfg.RateLimiter, "syncPodsFromKubernetes"),
deletionQ: workqueue.NewNamedRateLimitingQueue(cfg.RateLimiter, "deletePodsFromKubernetes"),
podStatusQ: workqueue.NewNamedRateLimitingQueue(cfg.RateLimiter, "syncPodStatusFromProvider"),
podEventFilterFunc: cfg.PodEventFilterFunc,
}
pc.syncPodsFromKubernetes = queue.New(cfg.RateLimiter, "syncPodsFromKubernetes", pc.syncPodFromKubernetesHandler)
pc.deletePodsFromKubernetes = queue.New(cfg.RateLimiter, "deletePodsFromKubernetes", pc.deletePodsFromKubernetesHandler)
pc.syncPodStatusFromProvider = queue.New(cfg.RateLimiter, "syncPodStatusFromProvider", pc.syncPodStatusFromProviderHandler)
return pc, nil
}
@@ -247,10 +251,11 @@ type asyncProvider interface {
// Once this returns, you should not re-use the controller.
func (pc *PodController) Run(ctx context.Context, podSyncWorkers int) (retErr error) {
// Shutdowns are idempotent, so we can call it multiple times. This is in case we have to bail out early for some reason.
// This is to make extra sure that any workers we started are terminated on exit
ctx, cancel := context.WithCancel(ctx)
defer cancel()
defer func() {
pc.k8sQ.ShutDown()
pc.deletionQ.ShutDown()
pc.mu.Lock()
pc.err = retErr
close(pc.done)
@@ -271,12 +276,10 @@ func (pc *PodController) Run(ctx context.Context, podSyncWorkers int) (retErr er
pc.provider = provider
provider.NotifyPods(ctx, func(pod *corev1.Pod) {
pc.enqueuePodStatusUpdate(ctx, pc.podStatusQ, pod.DeepCopy())
pc.enqueuePodStatusUpdate(ctx, pod.DeepCopy())
})
go runProvider(ctx)
defer pc.podStatusQ.ShutDown()
// Wait for the caches to be synced *before* starting to do work.
if ok := cache.WaitForCacheSync(ctx.Done(), pc.podsInformer.Informer().HasSynced); !ok {
return pkgerrors.New("failed to wait for caches to sync")
@@ -293,7 +296,7 @@ func (pc *PodController) Run(ctx context.Context, podSyncWorkers int) (retErr er
log.G(ctx).Error(err)
} else {
pc.knownPods.Store(key, &knownPod{})
pc.k8sQ.AddRateLimited(key)
pc.syncPodsFromKubernetes.Enqueue(key)
}
},
UpdateFunc: func(oldObj, newObj interface{}) {
@@ -319,14 +322,14 @@ func (pc *PodController) Run(ctx context.Context, podSyncWorkers int) (retErr er
// This means that the pod in API server was changed by someone else [this can be okay], but we skipped
// a status update on our side because we compared the status received from the provider to the status
// received from the k8s api server based on outdated information.
pc.podStatusQ.AddRateLimited(key)
pc.syncPodStatusFromProvider.Enqueue(key)
// Reset this to avoid re-adding it continuously
kPod.lastPodStatusUpdateSkipped = false
}
kPod.Unlock()
if podShouldEnqueue(oldPod, newPod) {
pc.k8sQ.AddRateLimited(key)
pc.syncPodsFromKubernetes.Enqueue(key)
}
}
},
@@ -335,9 +338,9 @@ func (pc *PodController) Run(ctx context.Context, podSyncWorkers int) (retErr er
log.G(ctx).Error(err)
} else {
pc.knownPods.Delete(key)
pc.k8sQ.AddRateLimited(key)
pc.syncPodsFromKubernetes.Enqueue(key)
// If this pod was in the deletion queue, forget about it
pc.deletionQ.Forget(key)
pc.deletePodsFromKubernetes.Forget(key)
}
},
}
@@ -363,46 +366,23 @@ func (pc *PodController) Run(ctx context.Context, podSyncWorkers int) (retErr er
pc.deleteDanglingPods(ctx, podSyncWorkers)
log.G(ctx).Info("starting workers")
wg := sync.WaitGroup{}
// Use the worker's "index" as its ID so we can use it for tracing.
for id := 0; id < podSyncWorkers; id++ {
wg.Add(1)
workerID := strconv.Itoa(id)
go func() {
defer wg.Done()
pc.runSyncPodStatusFromProviderWorker(ctx, workerID, pc.podStatusQ)
}()
}
for id := 0; id < podSyncWorkers; id++ {
wg.Add(1)
workerID := strconv.Itoa(id)
go func() {
defer wg.Done()
pc.runSyncPodsFromKubernetesWorker(ctx, workerID, pc.k8sQ)
}()
}
for id := 0; id < podSyncWorkers; id++ {
wg.Add(1)
workerID := strconv.Itoa(id)
go func() {
defer wg.Done()
pc.runDeletionReconcilationWorker(ctx, workerID, pc.deletionQ)
}()
}
group := &wait.Group{}
group.StartWithContext(ctx, func(ctx context.Context) {
pc.syncPodsFromKubernetes.Run(ctx, podSyncWorkers)
})
group.StartWithContext(ctx, func(ctx context.Context) {
pc.deletePodsFromKubernetes.Run(ctx, podSyncWorkers)
})
group.StartWithContext(ctx, func(ctx context.Context) {
pc.syncPodStatusFromProvider.Run(ctx, podSyncWorkers)
})
defer group.Wait()
log.G(ctx).Info("started workers")
close(pc.ready)
log.G(ctx).Info("started workers")
<-ctx.Done()
log.G(ctx).Info("shutting down workers")
pc.k8sQ.ShutDown()
pc.podStatusQ.ShutDown()
pc.deletionQ.ShutDown()
wg.Wait()
return nil
}
@@ -426,28 +406,9 @@ func (pc *PodController) Err() error {
return pc.err
}
// runSyncPodsFromKubernetesWorker is a long-running function that will continually call the processNextWorkItem function
// in order to read and process an item on the work queue that is generated by the pod informer.
func (pc *PodController) runSyncPodsFromKubernetesWorker(ctx context.Context, workerID string, q workqueue.RateLimitingInterface) {
for pc.processNextWorkItem(ctx, workerID, q) {
}
}
// processNextWorkItem will read a single work item off the work queue and attempt to process it,by calling the syncHandler.
func (pc *PodController) processNextWorkItem(ctx context.Context, workerID string, q workqueue.RateLimitingInterface) bool {
// We create a span only after popping from the queue so that we can get an adequate picture of how long it took to process the item.
ctx, span := trace.StartSpan(ctx, "processNextWorkItem")
defer span.End()
// Add the ID of the current worker as an attribute to the current span.
ctx = span.WithField(ctx, "workerId", workerID)
return handleQueueItem(ctx, q, pc.syncHandler)
}
// syncHandler compares the actual state with the desired, and attempts to converge the two.
func (pc *PodController) syncHandler(ctx context.Context, key string) error {
ctx, span := trace.StartSpan(ctx, "syncHandler")
// syncPodFromKubernetesHandler compares the actual state with the desired, and attempts to converge the two.
func (pc *PodController) syncPodFromKubernetesHandler(ctx context.Context, key string) error {
ctx, span := trace.StartSpan(ctx, "syncPodFromKubernetesHandler")
defer span.End()
// Add the current key as an attribute to the current span.
@@ -511,7 +472,7 @@ func (pc *PodController) syncPodInProvider(ctx context.Context, pod *corev1.Pod,
// more context is here: https://github.com/virtual-kubelet/virtual-kubelet/pull/760
if pod.DeletionTimestamp != nil && !running(&pod.Status) {
log.G(ctx).Debug("Force deleting pod from API Server as it is no longer running")
pc.deletionQ.Add(key)
pc.deletePodsFromKubernetes.EnqueueWithoutRateLimit(key)
return nil
}
obj, ok := pc.knownPods.Load(key)
@@ -547,7 +508,7 @@ func (pc *PodController) syncPodInProvider(ctx context.Context, pod *corev1.Pod,
return err
}
pc.deletionQ.AddAfter(key, time.Second*time.Duration(*pod.DeletionGracePeriodSeconds))
pc.deletePodsFromKubernetes.EnqueueAfter(key, time.Second*time.Duration(*pod.DeletionGracePeriodSeconds))
return nil
}
@@ -566,25 +527,6 @@ func (pc *PodController) syncPodInProvider(ctx context.Context, pod *corev1.Pod,
return nil
}
// runDeletionReconcilationWorker is a long-running function that will continually call the processDeletionReconcilationWorkItem
// function in order to read and process an item on the work queue that is generated by the pod informer.
func (pc *PodController) runDeletionReconcilationWorker(ctx context.Context, workerID string, q workqueue.RateLimitingInterface) {
for pc.processDeletionReconcilationWorkItem(ctx, workerID, q) {
}
}
// processDeletionReconcilationWorkItem will read a single work item off the work queue and attempt to process it,by calling the deletionReconcilation.
func (pc *PodController) processDeletionReconcilationWorkItem(ctx context.Context, workerID string, q workqueue.RateLimitingInterface) bool {
// We create a span only after popping from the queue so that we can get an adequate picture of how long it took to process the item.
ctx, span := trace.StartSpan(ctx, "processDeletionReconcilationWorkItem")
defer span.End()
// Add the ID of the current worker as an attribute to the current span.
ctx = span.WithField(ctx, "workerId", workerID)
return handleQueueItem(ctx, q, pc.deletePodHandler)
}
// deleteDanglingPods checks whether the provider knows about any pods which Kubernetes doesn't know about, and deletes them.
func (pc *PodController) deleteDanglingPods(ctx context.Context, threadiness int) {
ctx, span := trace.StartSpan(ctx, "deleteDanglingPods")


@@ -1,105 +0,0 @@
// Copyright © 2017 The virtual-kubelet authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package node
import (
"context"
pkgerrors "github.com/pkg/errors"
"github.com/virtual-kubelet/virtual-kubelet/log"
"github.com/virtual-kubelet/virtual-kubelet/trace"
"k8s.io/client-go/util/workqueue"
)
const (
// maxRetries is the number of times we try to process a given key before permanently forgetting it.
maxRetries = 20
)
type queueHandler func(ctx context.Context, key string) error
func handleQueueItem(ctx context.Context, q workqueue.RateLimitingInterface, handler queueHandler) bool {
ctx, span := trace.StartSpan(ctx, "handleQueueItem")
defer span.End()
obj, shutdown := q.Get()
if shutdown {
return false
}
log.G(ctx).Debug("Got queue object")
err := func(obj interface{}) error {
defer log.G(ctx).Debug("Processed queue item")
// We call Done here so the work queue knows we have finished processing this item.
// We also must remember to call Forget if we do not want this work item being re-queued.
// For example, we do not call Forget if a transient error occurs.
// Instead, the item is put back on the work queue and attempted again after a back-off period.
defer q.Done(obj)
var key string
var ok bool
// We expect strings to come off the work queue.
// These are of the form namespace/name.
// We do this as the delayed nature of the work queue means the items in the informer cache may actually be more up to date that when the item was initially put onto the workqueue.
if key, ok = obj.(string); !ok {
// As the item in the work queue is actually invalid, we call Forget here else we'd go into a loop of attempting to process a work item that is invalid.
q.Forget(obj)
log.G(ctx).Warnf("expected string in work queue item but got %#v", obj)
return nil
}
// Add the current key as an attribute to the current span.
ctx = span.WithField(ctx, "key", key)
// Run the syncHandler, passing it the namespace/name string of the Pod resource to be synced.
if err := handler(ctx, key); err != nil {
if q.NumRequeues(key) < maxRetries {
// Put the item back on the work queue to handle any transient errors.
log.G(ctx).WithError(err).Warnf("requeuing %q due to failed sync", key)
q.AddRateLimited(key)
return nil
}
// We've exceeded the maximum retries, so we must forget the key.
q.Forget(key)
return pkgerrors.Wrapf(err, "forgetting %q due to maximum retries reached", key)
}
// Finally, if no error occurs we Forget this item so it does not get queued again until another change happens.
q.Forget(obj)
return nil
}(obj)
if err != nil {
// We've actually hit an error, so we set the span's status based on the error.
span.SetStatus(err)
log.G(ctx).Error(err)
return true
}
return true
}
func (pc *PodController) runSyncPodStatusFromProviderWorker(ctx context.Context, workerID string, q workqueue.RateLimitingInterface) {
for pc.processPodStatusUpdate(ctx, workerID, q) {
}
}
func (pc *PodController) processPodStatusUpdate(ctx context.Context, workerID string, q workqueue.RateLimitingInterface) bool {
ctx, span := trace.StartSpan(ctx, "processPodStatusUpdate")
defer span.End()
// Add the ID of the current worker as an attribute to the current span.
ctx = span.WithField(ctx, "workerID", workerID)
return handleQueueItem(ctx, q, pc.podStatusHandler)
}