k8s: adopt v1.31.4

And bump a bunch of dependencies.
This commit is contained in:
tskillian
2025-01-07 09:28:10 -06:00
committed by Pires
parent 78b11dfcbb
commit fafac75ae7
9 changed files with 245 additions and 255 deletions

View File

@@ -51,7 +51,7 @@ type Queue struct {
name string
handler ItemHandler
ratelimiter workqueue.RateLimiter
ratelimiter workqueue.TypedRateLimiter[any]
// items are items that are marked dirty waiting for processing.
items *list.List
// itemInQueue is a map of (string) key -> item while it is in the items list
@@ -90,7 +90,7 @@ func (item *queueItem) String() string {
//
// It expects to get a item rate limiter, and a friendly name which is used in logs, and in the internal kubernetes
// metrics. If retryFunc is nil, the default retry function.
func New(ratelimiter workqueue.RateLimiter, name string, handler ItemHandler, retryFunc ShouldRetryFunc) *Queue {
func New(ratelimiter workqueue.TypedRateLimiter[any], name string, handler ItemHandler, retryFunc ShouldRetryFunc) *Queue {
if retryFunc == nil {
retryFunc = DefaultRetryFunc
}

View File

@@ -35,10 +35,10 @@ func TestQueueMaxRetries(t *testing.T) {
n++
return knownErr
}
wq := New(workqueue.NewMaxOfRateLimiter(
wq := New(workqueue.NewTypedMaxOfRateLimiter[any](
// The default upper bound is 1000 seconds. Let's not use that.
workqueue.NewItemExponentialFailureRateLimiter(5*time.Millisecond, 10*time.Millisecond),
&workqueue.BucketRateLimiter{Limiter: rate.NewLimiter(rate.Limit(10), 100)},
workqueue.NewTypedItemExponentialFailureRateLimiter[any](5*time.Millisecond, 10*time.Millisecond),
&workqueue.TypedBucketRateLimiter[any]{Limiter: rate.NewLimiter(rate.Limit(10), 100)},
), t.Name(), handler, nil)
wq.Enqueue(context.TODO(), "test")
@@ -77,7 +77,7 @@ func TestQueueCustomRetries(t *testing.T) {
return sleepTime, retErr
}
wq := New(&workqueue.BucketRateLimiter{Limiter: rate.NewLimiter(rate.Limit(1000), 1000)}, t.Name(), handler, shouldRetryFunc)
wq := New(&workqueue.TypedBucketRateLimiter[any]{Limiter: rate.NewLimiter(rate.Limit(1000), 1000)}, t.Name(), handler, shouldRetryFunc)
timeTaken := func(key string) time.Duration {
start := time.Now()
@@ -106,7 +106,7 @@ func TestForget(t *testing.T) {
handler := func(ctx context.Context, key string) error {
panic("Should never be called")
}
wq := New(workqueue.DefaultItemBasedRateLimiter(), t.Name(), handler, nil)
wq := New(workqueue.DefaultTypedItemBasedRateLimiter[any](), t.Name(), handler, nil)
wq.Forget(context.TODO(), "val")
assert.Assert(t, is.Equal(0, wq.Len()))
@@ -121,7 +121,7 @@ func TestQueueEmpty(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
defer cancel()
q := New(workqueue.DefaultItemBasedRateLimiter(), t.Name(), func(ctx context.Context, key string) error {
q := New(workqueue.DefaultTypedItemBasedRateLimiter[any](), t.Name(), func(ctx context.Context, key string) error {
return nil
}, nil)
@@ -136,7 +136,7 @@ func TestQueueItemNoSleep(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 1000*time.Millisecond)
defer cancel()
q := New(workqueue.DefaultItemBasedRateLimiter(), t.Name(), func(ctx context.Context, key string) error {
q := New(workqueue.DefaultTypedItemBasedRateLimiter[any](), t.Name(), func(ctx context.Context, key string) error {
return nil
}, nil)
@@ -160,7 +160,7 @@ func TestQueueItemSleep(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 1000*time.Millisecond)
defer cancel()
q := New(workqueue.DefaultItemBasedRateLimiter(), t.Name(), func(ctx context.Context, key string) error {
q := New(workqueue.DefaultTypedItemBasedRateLimiter[any](), t.Name(), func(ctx context.Context, key string) error {
return nil
}, nil)
q.lock.Lock()
@@ -179,7 +179,7 @@ func TestQueueBackgroundAdd(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 5000*time.Millisecond)
defer cancel()
q := New(workqueue.DefaultItemBasedRateLimiter(), t.Name(), func(ctx context.Context, key string) error {
q := New(workqueue.DefaultTypedItemBasedRateLimiter[any](), t.Name(), func(ctx context.Context, key string) error {
return nil
}, nil)
start := time.Now()
@@ -201,7 +201,7 @@ func TestQueueBackgroundAdvance(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 5000*time.Millisecond)
defer cancel()
q := New(workqueue.DefaultItemBasedRateLimiter(), t.Name(), func(ctx context.Context, key string) error {
q := New(workqueue.DefaultTypedItemBasedRateLimiter[any](), t.Name(), func(ctx context.Context, key string) error {
return nil
}, nil)
start := time.Now()
@@ -230,7 +230,7 @@ func TestQueueRedirty(t *testing.T) {
var times int64
var q *Queue
q = New(workqueue.DefaultItemBasedRateLimiter(), t.Name(), func(ctx context.Context, key string) error {
q = New(workqueue.DefaultTypedItemBasedRateLimiter[any](), t.Name(), func(ctx context.Context, key string) error {
assert.Assert(t, is.Equal(key, "foo"))
if atomic.AddInt64(&times, 1) == 1 {
q.EnqueueWithoutRateLimit(context.TODO(), "foo")
@@ -256,7 +256,7 @@ func TestHeapConcurrency(t *testing.T) {
start := time.Now()
seen := sync.Map{}
q := New(workqueue.DefaultItemBasedRateLimiter(), t.Name(), func(ctx context.Context, key string) error {
q := New(workqueue.DefaultTypedItemBasedRateLimiter[any](), t.Name(), func(ctx context.Context, key string) error {
seen.Store(key, struct{}{})
time.Sleep(time.Second)
return nil
@@ -291,7 +291,7 @@ func checkConsistency(t *testing.T, q *Queue) {
}
func TestHeapOrder(t *testing.T) {
q := New(workqueue.DefaultItemBasedRateLimiter(), t.Name(), func(ctx context.Context, key string) error {
q := New(workqueue.DefaultTypedItemBasedRateLimiter[any](), t.Name(), func(ctx context.Context, key string) error {
return nil
}, nil)
q.clock = nonmovingClock{}
@@ -317,7 +317,7 @@ func TestHeapOrder(t *testing.T) {
type rateLimitWrapper struct {
addedMap sync.Map
forgottenMap sync.Map
rl workqueue.RateLimiter
rl workqueue.TypedRateLimiter[any]
}
func (r *rateLimitWrapper) When(item interface{}) time.Duration {
@@ -356,7 +356,7 @@ func TestRateLimiter(t *testing.T) {
start := time.Now()
ratelimiter := &rateLimitWrapper{
rl: workqueue.NewItemFastSlowRateLimiter(1*time.Millisecond, 100*time.Millisecond, 1),
rl: workqueue.NewTypedItemFastSlowRateLimiter[any](1*time.Millisecond, 100*time.Millisecond, 1),
}
q := New(ratelimiter, t.Name(), func(ctx context.Context, key string) error {
@@ -421,7 +421,7 @@ func TestQueueForgetInProgress(t *testing.T) {
var times int64
var q *Queue
q = New(workqueue.DefaultItemBasedRateLimiter(), t.Name(), func(ctx context.Context, key string) error {
q = New(workqueue.DefaultTypedItemBasedRateLimiter[any](), t.Name(), func(ctx context.Context, key string) error {
assert.Assert(t, is.Equal(key, "foo"))
atomic.AddInt64(&times, 1)
q.Forget(context.TODO(), key)
@@ -441,7 +441,7 @@ func TestQueueForgetBeforeStart(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
q := New(workqueue.DefaultItemBasedRateLimiter(), t.Name(), func(ctx context.Context, key string) error {
q := New(workqueue.DefaultTypedItemBasedRateLimiter[any](), t.Name(), func(ctx context.Context, key string) error {
panic("shouldn't be called")
}, nil)
@@ -458,7 +458,7 @@ func TestQueueMoveItem(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
q := New(workqueue.DefaultItemBasedRateLimiter(), t.Name(), func(ctx context.Context, key string) error {
q := New(workqueue.DefaultTypedItemBasedRateLimiter[any](), t.Name(), func(ctx context.Context, key string) error {
panic("shouldn't be called")
}, nil)
q.clock = nonmovingClock{}