From 3a361ebabd4a37ee3438136cd73958855235ac22 Mon Sep 17 00:00:00 2001
From: Sargun Dhillon
Date: Wed, 3 Feb 2021 09:42:45 -0800
Subject: [PATCH] queue: Add tracing

This adds tracing throughout the queues, so we can determine what is
going on as items are enqueued, processed, and forgotten.
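
For illustration only (not part of this change), a caller that already
holds a span can pass its context through so the queue's spans and
fields nest under it; the span name "reconcilePod" and the variables
q/key below are placeholders:

    ctx, span := trace.StartSpan(ctx, "reconcilePod") // "reconcilePod" is a placeholder span name
    defer span.End()
    ctx = span.WithField(ctx, "key", key)             // annotate the trace with the item key
    q.Enqueue(ctx, key)                               // Enqueue now takes a context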
---
 internal/queue/queue.go      | 48 ++++++++++++++++++++------
 internal/queue/queue_test.go | 66 +++++++++++++++++++-----------------
 node/pod.go                  |  2 +-
 node/podcontroller.go        | 32 +++++++++++++----
 4 files changed, 98 insertions(+), 50 deletions(-)

diff --git a/internal/queue/queue.go b/internal/queue/queue.go
index edfe46f90..1b6a5e136 100644
--- a/internal/queue/queue.go
+++ b/internal/queue/queue.go
@@ -100,39 +100,63 @@ func New(ratelimiter workqueue.RateLimiter, name string, handler ItemHandler) *Q
 }
 
 // Enqueue enqueues the key in a rate limited fashion
-func (q *Queue) Enqueue(key string) {
+func (q *Queue) Enqueue(ctx context.Context, key string) {
 	q.lock.Lock()
 	defer q.lock.Unlock()
-	q.insert(key, true, 0)
+	q.insert(ctx, key, true, 0)
 }
 
 // EnqueueWithoutRateLimit enqueues the key without a rate limit
-func (q *Queue) EnqueueWithoutRateLimit(key string) {
+func (q *Queue) EnqueueWithoutRateLimit(ctx context.Context, key string) {
 	q.lock.Lock()
 	defer q.lock.Unlock()
-	q.insert(key, false, 0)
+	q.insert(ctx, key, false, 0)
 }
 
 // Forget forgets the key
-func (q *Queue) Forget(key string) {
+func (q *Queue) Forget(ctx context.Context, key string) {
 	q.lock.Lock()
 	defer q.lock.Unlock()
 
+	ctx, span := trace.StartSpan(ctx, "Forget")
+	defer span.End()
+
+	ctx = span.WithFields(ctx, map[string]interface{}{
+		"queue": q.name,
+		"key":   key,
+	})
 	if item, ok := q.itemsInQueue[key]; ok {
+		span.WithField(ctx, "status", "itemInQueue")
 		delete(q.itemsInQueue, key)
 		q.items.Remove(item)
+		return
 	}
 
 	if qi, ok := q.itemsBeingProcessed[key]; ok {
+		span.WithField(ctx, "status", "itemBeingProcessed")
		qi.forget = true
+		return
 	}
+	span.WithField(ctx, "status", "notfound")
 }
 
 // insert inserts a new item to be processed at time time. It will not further delay items if when is later than the
 // original time the item was scheduled to be processed. If when is earlier, it will "bring it forward"
-func (q *Queue) insert(key string, ratelimit bool, delay time.Duration) *queueItem {
+func (q *Queue) insert(ctx context.Context, key string, ratelimit bool, delay time.Duration) *queueItem {
+	ctx, span := trace.StartSpan(ctx, "insert")
+	defer span.End()
+
+	ctx = span.WithFields(ctx, map[string]interface{}{
+		"queue":     q.name,
+		"key":       key,
+		"ratelimit": ratelimit,
+	})
+	if delay > 0 {
+		ctx = span.WithField(ctx, "delay", delay.String())
+	}
+
 	defer func() {
 		select {
 		case q.wakeupCh <- struct{}{}:
@@ -142,6 +166,7 @@ func (q *Queue) insert(key string, ratelimit bool, delay time.Duration) *queueIt
 
 	// First see if the item is already being processed
 	if item, ok := q.itemsBeingProcessed[key]; ok {
+		span.WithField(ctx, "status", "itemsBeingProcessed")
 		when := q.clock.Now().Add(delay)
 		// Is the item already been redirtied?
 		if item.redirtiedAt.IsZero() {
@@ -157,12 +182,14 @@ func (q *Queue) insert(key string, ratelimit bool, delay time.Duration) *queueIt
 
 	// Is the item already in the queue?
 	if item, ok := q.itemsInQueue[key]; ok {
+		span.WithField(ctx, "status", "itemsInQueue")
 		qi := item.Value.(*queueItem)
 		when := q.clock.Now().Add(delay)
 		q.adjustPosition(qi, item, when)
 		return qi
 	}
 
+	span.WithField(ctx, "status", "added")
 	now := q.clock.Now()
 	val := &queueItem{
 		key:              key,
@@ -175,6 +202,7 @@ func (q *Queue) insert(key string, ratelimit bool, delay time.Duration) *queueIt
 			panic("Non-zero delay with rate limiting not supported")
 		}
 		ratelimitDelay := q.ratelimiter.When(key)
+		span.WithField(ctx, "delay", ratelimitDelay.String())
 		val.plannedToStartWorkAt = val.plannedToStartWorkAt.Add(ratelimitDelay)
 		val.delayedViaRateLimit = &ratelimitDelay
 	} else {
@@ -213,10 +241,10 @@ func (q *Queue) adjustPosition(qi *queueItem, element *list.Element, when time.T
 }
 
 // EnqueueWithoutRateLimitWithDelay enqueues without rate limiting, but work will not start for this given delay period
-func (q *Queue) EnqueueWithoutRateLimitWithDelay(key string, after time.Duration) {
+func (q *Queue) EnqueueWithoutRateLimitWithDelay(ctx context.Context, key string, after time.Duration) {
 	q.lock.Lock()
 	defer q.lock.Unlock()
-	q.insert(key, false, after)
+	q.insert(ctx, key, false, after)
 }
 
 // Empty returns if the queue has no items in it
@@ -398,7 +426,7 @@ func (q *Queue) handleQueueItemObject(ctx context.Context, qi *queueItem) error
 		if qi.requeues+1 < MaxRetries {
 			// Put the item back on the work Queue to handle any transient errors.
 			log.G(ctx).WithError(err).Warnf("requeuing %q due to failed sync", qi.key)
-			newQI := q.insert(qi.key, true, 0)
+			newQI := q.insert(ctx, qi.key, true, 0)
 			newQI.requeues = qi.requeues + 1
 			newQI.originallyAdded = qi.originallyAdded
 
@@ -410,7 +438,7 @@ func (q *Queue) handleQueueItemObject(ctx context.Context, qi *queueItem) error
 	// We've exceeded the maximum retries or we were successful.
 	q.ratelimiter.Forget(qi.key)
 	if !qi.redirtiedAt.IsZero() {
-		newQI := q.insert(qi.key, qi.redirtiedWithRatelimit, time.Until(qi.redirtiedAt))
+		newQI := q.insert(ctx, qi.key, qi.redirtiedWithRatelimit, time.Until(qi.redirtiedAt))
 		newQI.addedViaRedirty = true
 	}
 
diff --git a/internal/queue/queue_test.go b/internal/queue/queue_test.go
index c2897799a..cf6be8870 100644
--- a/internal/queue/queue_test.go
+++ b/internal/queue/queue_test.go
@@ -36,7 +36,7 @@ func TestQueueMaxRetries(t *testing.T) {
 		workqueue.NewItemExponentialFailureRateLimiter(5*time.Millisecond, 10*time.Millisecond),
 		&workqueue.BucketRateLimiter{Limiter: rate.NewLimiter(rate.Limit(10), 100)},
 	), t.Name(), handler)
-	wq.Enqueue("test")
+	wq.Enqueue(context.TODO(), "test")
 
 	for n < MaxRetries {
 		assert.Assert(t, wq.handleQueueItem(ctx))
@@ -53,11 +53,11 @@ func TestForget(t *testing.T) {
 	}
 
 	wq := New(workqueue.DefaultItemBasedRateLimiter(), t.Name(), handler)
-	wq.Forget("val")
+	wq.Forget(context.TODO(), "val")
 
 	assert.Assert(t, is.Equal(0, wq.Len()))
 
 	v := "test"
-	wq.EnqueueWithoutRateLimit(v)
+	wq.EnqueueWithoutRateLimit(context.TODO(), v)
 	assert.Assert(t, is.Equal(1, wq.Len()))
 }
 
@@ -86,8 +86,8 @@ func TestQueueItemNoSleep(t *testing.T) {
 	})
 
 	q.lock.Lock()
-	q.insert("foo", false, -1*time.Hour)
-	q.insert("bar", false, -1*time.Hour)
+	q.insert(ctx, "foo", false, -1*time.Hour)
+	q.insert(ctx, "bar", false, -1*time.Hour)
 	q.lock.Unlock()
 
 	item, err := q.getNextItem(ctx)
@@ -109,8 +109,8 @@ func TestQueueItemSleep(t *testing.T) {
 		return nil
 	})
 	q.lock.Lock()
-	q.insert("foo", false, 100*time.Millisecond)
-	q.insert("bar", false, 100*time.Millisecond)
+	q.insert(ctx, "foo", false, 100*time.Millisecond)
+	q.insert(ctx, "bar", false, 100*time.Millisecond)
 	q.lock.Unlock()
 
 	item, err := q.getNextItem(ctx)
@@ -131,7 +131,7 @@ func TestQueueBackgroundAdd(t *testing.T) {
 	time.AfterFunc(100*time.Millisecond, func() {
 		q.lock.Lock()
 		defer q.lock.Unlock()
-		q.insert("foo", false, 0)
+		q.insert(ctx, "foo", false, 0)
 	})
 
 	item, err := q.getNextItem(ctx)
@@ -151,13 +151,13 @@ func TestQueueBackgroundAdvance(t *testing.T) {
 	})
 	start := time.Now()
 	q.lock.Lock()
-	q.insert("foo", false, 10*time.Second)
+	q.insert(ctx, "foo", false, 10*time.Second)
 	q.lock.Unlock()
 
 	time.AfterFunc(200*time.Millisecond, func() {
 		q.lock.Lock()
 		defer q.lock.Unlock()
-		q.insert("foo", false, 0)
+		q.insert(ctx, "foo", false, 0)
 	})
 
 	item, err := q.getNextItem(ctx)
@@ -178,14 +178,14 @@ func TestQueueRedirty(t *testing.T) {
 	q = New(workqueue.DefaultItemBasedRateLimiter(), t.Name(), func(ctx context.Context, key string) error {
 		assert.Assert(t, is.Equal(key, "foo"))
 		if atomic.AddInt64(&times, 1) == 1 {
-			q.EnqueueWithoutRateLimit("foo")
+			q.EnqueueWithoutRateLimit(context.TODO(), "foo")
 		} else {
 			cancel()
 		}
 		return nil
 	})
 
-	q.EnqueueWithoutRateLimit("foo")
+	q.EnqueueWithoutRateLimit(context.TODO(), "foo")
 	q.Run(ctx, 1)
 	for !q.Empty() {
 		time.Sleep(100 * time.Millisecond)
@@ -207,7 +207,7 @@ func TestHeapConcurrency(t *testing.T) {
 		return nil
 	})
 	for i := 0; i < 20; i++ {
-		q.EnqueueWithoutRateLimit(strconv.Itoa(i))
+		q.EnqueueWithoutRateLimit(context.TODO(), strconv.Itoa(i))
 	}
 
 	assert.Assert(t, q.Len() == 20)
@@ -241,20 +241,20 @@ func TestHeapOrder(t *testing.T) {
 	})
 	q.clock = nonmovingClock{}
 
-	q.EnqueueWithoutRateLimitWithDelay("a", 1000)
-	q.EnqueueWithoutRateLimitWithDelay("b", 2000)
-	q.EnqueueWithoutRateLimitWithDelay("c", 3000)
-	q.EnqueueWithoutRateLimitWithDelay("d", 4000)
-	q.EnqueueWithoutRateLimitWithDelay("e", 5000)
+	q.EnqueueWithoutRateLimitWithDelay(context.TODO(), "a", 1000)
+	q.EnqueueWithoutRateLimitWithDelay(context.TODO(), "b", 2000)
+	q.EnqueueWithoutRateLimitWithDelay(context.TODO(), "c", 3000)
+	q.EnqueueWithoutRateLimitWithDelay(context.TODO(), "d", 4000)
+	q.EnqueueWithoutRateLimitWithDelay(context.TODO(), "e", 5000)
 	checkConsistency(t, q)
 	t.Logf("%v", q)
-	q.EnqueueWithoutRateLimitWithDelay("d", 1000)
+	q.EnqueueWithoutRateLimitWithDelay(context.TODO(), "d", 1000)
 	checkConsistency(t, q)
 	t.Logf("%v", q)
-	q.EnqueueWithoutRateLimitWithDelay("c", 1001)
+	q.EnqueueWithoutRateLimitWithDelay(context.TODO(), "c", 1001)
 	checkConsistency(t, q)
 	t.Logf("%v", q)
-	q.EnqueueWithoutRateLimitWithDelay("e", 999)
+	q.EnqueueWithoutRateLimitWithDelay(context.TODO(), "e", 999)
 	checkConsistency(t, q)
 	t.Logf("%v", q)
 }
@@ -316,7 +316,7 @@ func TestRateLimiter(t *testing.T) {
 	enqueued := 0
 	syncMap.Range(func(key, value interface{}) bool {
 		enqueued++
-		q.Enqueue(key.(string))
+		q.Enqueue(context.TODO(), key.(string))
 		return true
 	})
 
@@ -369,11 +369,11 @@ func TestQueueForgetInProgress(t *testing.T) {
 	q = New(workqueue.DefaultItemBasedRateLimiter(), t.Name(), func(ctx context.Context, key string) error {
 		assert.Assert(t, is.Equal(key, "foo"))
 		atomic.AddInt64(&times, 1)
-		q.Forget(key)
+		q.Forget(context.TODO(), key)
 		return errors.New("test")
 	})
 
-	q.EnqueueWithoutRateLimit("foo")
+	q.EnqueueWithoutRateLimit(context.TODO(), "foo")
 	go q.Run(ctx, 1)
 	for !q.Empty() {
 		time.Sleep(100 * time.Millisecond)
@@ -390,8 +390,8 @@ func TestQueueForgetBeforeStart(t *testing.T) {
 		panic("shouldn't be called")
 	})
 
-	q.EnqueueWithoutRateLimit("foo")
-	q.Forget("foo")
+	q.EnqueueWithoutRateLimit(context.TODO(), "foo")
+	q.Forget(context.TODO(), "foo")
 	go q.Run(ctx, 1)
 	for !q.Empty() {
 		time.Sleep(100 * time.Millisecond)
@@ -401,26 +401,28 @@ func TestQueueMoveItem(t *testing.T) {
 	t.Parallel()
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
 	q := New(workqueue.DefaultItemBasedRateLimiter(), t.Name(), func(ctx context.Context, key string) error {
 		panic("shouldn't be called")
 	})
 	q.clock = nonmovingClock{}
 
-	q.insert("foo", false, 3000)
-	q.insert("bar", false, 2000)
-	q.insert("baz", false, 1000)
+	q.insert(ctx, "foo", false, 3000)
+	q.insert(ctx, "bar", false, 2000)
+	q.insert(ctx, "baz", false, 1000)
 	checkConsistency(t, q)
 	t.Log(q)
 
-	q.insert("foo", false, 2000)
+	q.insert(ctx, "foo", false, 2000)
 	checkConsistency(t, q)
 	t.Log(q)
 
-	q.insert("foo", false, 1999)
+	q.insert(ctx, "foo", false, 1999)
 	checkConsistency(t, q)
 	t.Log(q)
 
-	q.insert("foo", false, 999)
+	q.insert(ctx, "foo", false, 999)
 	checkConsistency(t, q)
 	t.Log(q)
 }
diff --git a/node/pod.go b/node/pod.go
index 87edb13ec..805504546 100644
--- a/node/pod.go
+++ b/node/pod.go
@@ -330,7 +330,7 @@ func (pc *PodController) enqueuePodStatusUpdate(ctx context.Context, pod *corev1
 	}
 	kpod.lastPodStatusReceivedFromProvider = pod
 	kpod.Unlock()
-	pc.syncPodStatusFromProvider.Enqueue(key)
+	pc.syncPodStatusFromProvider.Enqueue(ctx, key)
 }
 
 func (pc *PodController) syncPodStatusFromProviderHandler(ctx context.Context, key string) (retErr error) {
diff --git a/node/podcontroller.go b/node/podcontroller.go
index 8fd7c84c0..9654f9f9d 100644
--- a/node/podcontroller.go
+++ b/node/podcontroller.go
@@ -302,14 +302,25 @@ func (pc *PodController) Run(ctx context.Context, podSyncWorkers int) (retErr er
 
 	var eventHandler cache.ResourceEventHandler = cache.ResourceEventHandlerFuncs{
 		AddFunc: func(pod interface{}) {
+			ctx, cancel := context.WithCancel(ctx)
+			defer cancel()
+			ctx, span := trace.StartSpan(ctx, "AddFunc")
+			defer span.End()
+
 			if key, err := cache.MetaNamespaceKeyFunc(pod); err != nil {
 				log.G(ctx).Error(err)
 			} else {
+				ctx = span.WithField(ctx, "key", key)
 				pc.knownPods.Store(key, &knownPod{})
-				pc.syncPodsFromKubernetes.Enqueue(key)
+				pc.syncPodsFromKubernetes.Enqueue(ctx, key)
 			}
 		},
 		UpdateFunc: func(oldObj, newObj interface{}) {
+			ctx, cancel := context.WithCancel(ctx)
+			defer cancel()
+			ctx, span := trace.StartSpan(ctx, "UpdateFunc")
+			defer span.End()
+
 			// Create a copy of the old and new pod objects so we don't mutate the cache.
 			oldPod := oldObj.(*corev1.Pod)
 			newPod := newObj.(*corev1.Pod)
@@ -318,6 +329,7 @@ func (pc *PodController) Run(ctx context.Context, podSyncWorkers int) (retErr er
 			if key, err := cache.MetaNamespaceKeyFunc(newPod); err != nil {
 				log.G(ctx).Error(err)
 			} else {
+				ctx = span.WithField(ctx, "key", key)
 				obj, ok := pc.knownPods.Load(key)
 				if !ok {
 					// Pods are only ever *added* to knownPods in the above AddFunc, and removed
@@ -332,25 +344,31 @@ func (pc *PodController) Run(ctx context.Context, podSyncWorkers int) (retErr er
 					// This means that the pod in API server was changed by someone else [this can be okay], but we skipped
 					// a status update on our side because we compared the status received from the provider to the status
 					// received from the k8s api server based on outdated information.
-					pc.syncPodStatusFromProvider.Enqueue(key)
+					pc.syncPodStatusFromProvider.Enqueue(ctx, key)
 					// Reset this to avoid re-adding it continuously
 					kPod.lastPodStatusUpdateSkipped = false
 				}
 				kPod.Unlock()
 				if podShouldEnqueue(oldPod, newPod) {
-					pc.syncPodsFromKubernetes.Enqueue(key)
+					pc.syncPodsFromKubernetes.Enqueue(ctx, key)
 				}
 			}
 		},
 		DeleteFunc: func(pod interface{}) {
+			ctx, cancel := context.WithCancel(ctx)
+			defer cancel()
+			ctx, span := trace.StartSpan(ctx, "DeleteFunc")
+			defer span.End()
+
 			if key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(pod); err != nil {
 				log.G(ctx).Error(err)
 			} else {
+				ctx = span.WithField(ctx, "key", key)
 				pc.knownPods.Delete(key)
-				pc.syncPodsFromKubernetes.Enqueue(key)
+				pc.syncPodsFromKubernetes.Enqueue(ctx, key)
 				// If this pod was in the deletion queue, forget about it
-				pc.deletePodsFromKubernetes.Forget(key)
+				pc.deletePodsFromKubernetes.Forget(ctx, key)
 			}
 		},
 	}
@@ -482,7 +500,7 @@ func (pc *PodController) syncPodInProvider(ctx context.Context, pod *corev1.Pod,
 	// more context is here: https://github.com/virtual-kubelet/virtual-kubelet/pull/760
 	if pod.DeletionTimestamp != nil && !running(&pod.Status) {
 		log.G(ctx).Debug("Force deleting pod from API Server as it is no longer running")
-		pc.deletePodsFromKubernetes.EnqueueWithoutRateLimit(key)
+		pc.deletePodsFromKubernetes.EnqueueWithoutRateLimit(ctx, key)
 		return nil
 	}
 	obj, ok := pc.knownPods.Load(key)
@@ -518,7 +536,7 @@ func (pc *PodController) syncPodInProvider(ctx context.Context, pod *corev1.Pod,
 		return err
 	}
 
-	pc.deletePodsFromKubernetes.EnqueueWithoutRateLimitWithDelay(key, time.Second*time.Duration(*pod.DeletionGracePeriodSeconds))
+	pc.deletePodsFromKubernetes.EnqueueWithoutRateLimitWithDelay(ctx, key, time.Second*time.Duration(*pod.DeletionGracePeriodSeconds))
 	return nil
 }