dep ensure -update k8s.io/client-go (#197)

This commit is contained in:
Kit Ewbank
2018-05-18 19:24:19 -04:00
committed by Robbie Zhang
parent 8068f3cac8
commit b4cb809968
774 changed files with 7107 additions and 15817 deletions

View File

@@ -10,8 +10,7 @@ go_library(
go_test(
name = "go_default_test",
srcs = ["ring_growing_test.go"],
importpath = "k8s.io/client-go/util/buffer",
library = ":go_default_library",
embed = [":go_default_library"],
deps = ["//vendor/github.com/stretchr/testify/assert:go_default_library"],
)

View File

@@ -13,8 +13,7 @@ go_test(
"pem_test.go",
],
data = glob(["testdata/**"]),
importpath = "k8s.io/client-go/util/cert",
library = ":go_default_library",
embed = [":go_default_library"],
)
go_library(

View File

@@ -38,7 +38,7 @@ const (
duration365d = time.Hour * 24 * 365
)
// Config containes the basic fields required for creating a certificate
// Config contains the basic fields required for creating a certificate
type Config struct {
CommonName string
Organization []string
@@ -138,23 +138,50 @@ func MakeEllipticPrivateKeyPEM() ([]byte, error) {
// Host may be an IP or a DNS name
// You may also specify additional subject alt names (either ip or dns names) for the certificate
func GenerateSelfSignedCertKey(host string, alternateIPs []net.IP, alternateDNS []string) ([]byte, []byte, error) {
caKey, err := rsa.GenerateKey(cryptorand.Reader, 2048)
if err != nil {
return nil, nil, err
}
caTemplate := x509.Certificate{
SerialNumber: big.NewInt(1),
Subject: pkix.Name{
CommonName: fmt.Sprintf("%s-ca@%d", host, time.Now().Unix()),
},
NotBefore: time.Now(),
NotAfter: time.Now().Add(time.Hour * 24 * 365),
KeyUsage: x509.KeyUsageKeyEncipherment | x509.KeyUsageDigitalSignature | x509.KeyUsageCertSign,
BasicConstraintsValid: true,
IsCA: true,
}
caDERBytes, err := x509.CreateCertificate(cryptorand.Reader, &caTemplate, &caTemplate, &caKey.PublicKey, caKey)
if err != nil {
return nil, nil, err
}
caCertificate, err := x509.ParseCertificate(caDERBytes)
if err != nil {
return nil, nil, err
}
priv, err := rsa.GenerateKey(cryptorand.Reader, 2048)
if err != nil {
return nil, nil, err
}
template := x509.Certificate{
SerialNumber: big.NewInt(1),
SerialNumber: big.NewInt(2),
Subject: pkix.Name{
CommonName: fmt.Sprintf("%s@%d", host, time.Now().Unix()),
},
NotBefore: time.Now(),
NotAfter: time.Now().Add(time.Hour * 24 * 365),
KeyUsage: x509.KeyUsageKeyEncipherment | x509.KeyUsageDigitalSignature | x509.KeyUsageCertSign,
KeyUsage: x509.KeyUsageKeyEncipherment | x509.KeyUsageDigitalSignature,
ExtKeyUsage: []x509.ExtKeyUsage{x509.ExtKeyUsageServerAuth},
BasicConstraintsValid: true,
IsCA: true,
}
if ip := net.ParseIP(host); ip != nil {
@@ -166,16 +193,19 @@ func GenerateSelfSignedCertKey(host string, alternateIPs []net.IP, alternateDNS
template.IPAddresses = append(template.IPAddresses, alternateIPs...)
template.DNSNames = append(template.DNSNames, alternateDNS...)
derBytes, err := x509.CreateCertificate(cryptorand.Reader, &template, &template, &priv.PublicKey, priv)
derBytes, err := x509.CreateCertificate(cryptorand.Reader, &template, caCertificate, &priv.PublicKey, caKey)
if err != nil {
return nil, nil, err
}
// Generate cert
// Generate cert, followed by ca
certBuffer := bytes.Buffer{}
if err := pem.Encode(&certBuffer, &pem.Block{Type: CertificateBlockType, Bytes: derBytes}); err != nil {
return nil, nil, err
}
if err := pem.Encode(&certBuffer, &pem.Block{Type: CertificateBlockType, Bytes: caDERBytes}); err != nil {
return nil, nil, err
}
// Generate key
keyBuffer := bytes.Buffer{}

View File

@@ -14,8 +14,7 @@ go_test(
"certificate_manager_test.go",
"certificate_store_test.go",
],
importpath = "k8s.io/client-go/util/certificate",
library = ":go_default_library",
embed = [":go_default_library"],
tags = ["automanaged"],
deps = [
"//vendor/k8s.io/api/certificates/v1beta1:go_default_library",

View File

@@ -141,7 +141,6 @@ type manager struct {
certStore Store
certAccessLock sync.RWMutex
cert *tls.Certificate
rotationDeadline time.Time
forceRotation bool
certificateExpiration Gauge
serverHealth bool
@@ -208,8 +207,7 @@ func (m *manager) SetCertificateSigningRequestClient(certSigningRequestClient ce
func (m *manager) Start() {
// Certificate rotation depends on access to the API server certificate
// signing API, so don't start the certificate manager if we don't have a
// client. This will happen on the cluster master, where the kubelet is
// responsible for bootstrapping the pods of the master components.
// client.
if m.certSigningRequestClient == nil {
glog.V(2).Infof("Certificate rotation is not enabled, no connection to the apiserver.")
return
@@ -217,27 +215,18 @@ func (m *manager) Start() {
glog.V(2).Infof("Certificate rotation is enabled.")
m.setRotationDeadline()
// Synchronously request a certificate before entering the background
// loop to allow bootstrap scenarios, where the certificate manager
// doesn't have a certificate at all yet.
if m.shouldRotate() {
glog.V(1).Infof("shouldRotate() is true, forcing immediate rotation")
if _, err := m.rotateCerts(); err != nil {
utilruntime.HandleError(fmt.Errorf("Could not rotate certificates: %v", err))
}
}
backoff := wait.Backoff{
Duration: 2 * time.Second,
Factor: 2,
Jitter: 0.1,
Steps: 5,
}
go wait.Forever(func() {
sleepInterval := m.rotationDeadline.Sub(time.Now())
glog.V(2).Infof("Waiting %v for next certificate rotation", sleepInterval)
time.Sleep(sleepInterval)
deadline := m.nextRotationDeadline()
if sleepInterval := deadline.Sub(time.Now()); sleepInterval > 0 {
glog.V(2).Infof("Waiting %v for next certificate rotation", sleepInterval)
time.Sleep(sleepInterval)
}
backoff := wait.Backoff{
Duration: 2 * time.Second,
Factor: 2,
Jitter: 0.1,
Steps: 5,
}
if err := wait.ExponentialBackoff(backoff, m.rotateCerts); err != nil {
utilruntime.HandleError(fmt.Errorf("Reached backoff limit, still unable to rotate certs: %v", err))
wait.PollInfinite(32*time.Second, m.rotateCerts)
@@ -252,11 +241,14 @@ func getCurrentCertificateOrBootstrap(
currentCert, err := store.Current()
if err == nil {
return currentCert, false, nil
}
if _, ok := err.(*NoCertKeyError); !ok {
return nil, false, err
// if the current cert is expired, fall back to the bootstrap cert
if currentCert.Leaf != nil && time.Now().Before(currentCert.Leaf.NotAfter) {
return currentCert, false, nil
}
} else {
if _, ok := err.(*NoCertKeyError); !ok {
return nil, false, err
}
}
if bootstrapCertificatePEM == nil || bootstrapKeyPEM == nil {
@@ -276,21 +268,14 @@ func getCurrentCertificateOrBootstrap(
return nil, false, fmt.Errorf("unable to parse certificate data: %v", err)
}
bootstrapCert.Leaf = certs[0]
return &bootstrapCert, true, nil
}
// shouldRotate looks at how close the current certificate is to expiring and
// decides if it is time to rotate or not.
func (m *manager) shouldRotate() bool {
m.certAccessLock.RLock()
defer m.certAccessLock.RUnlock()
if m.cert == nil {
return true
if _, err := store.Update(bootstrapCertificatePEM, bootstrapKeyPEM); err != nil {
utilruntime.HandleError(fmt.Errorf("Unable to set the cert/key pair to the bootstrap certificate: %v", err))
} else {
glog.V(4).Infof("Updated the store to contain the initial bootstrap certificate")
}
if m.forceRotation {
return true
}
return time.Now().After(m.rotationDeadline)
return &bootstrapCert, true, nil
}
// rotateCerts attempts to request a client cert from the server, wait a reasonable
@@ -315,7 +300,7 @@ func (m *manager) rotateCerts() (bool, error) {
return false, m.updateServerError(err)
}
// Wait for the certificate to be signed. Instead of one long watch, we retry with slighly longer
// Wait for the certificate to be signed. Instead of one long watch, we retry with slightly longer
// intervals each time in order to tolerate failures from the server AND to preserve the liveliness
// of the cert manager loop. This creates slightly more traffic against the API server in return
// for bounding the amount of time we wait when a certificate expires.
@@ -349,30 +334,34 @@ func (m *manager) rotateCerts() (bool, error) {
}
m.updateCached(cert)
m.setRotationDeadline()
m.forceRotation = false
return true, nil
}
// setRotationDeadline sets a cached value for the threshold at which the
// nextRotationDeadline returns a value for the threshold at which the
// current certificate should be rotated, 80%+/-10% of the expiration of the
// certificate.
func (m *manager) setRotationDeadline() {
func (m *manager) nextRotationDeadline() time.Time {
// forceRotation is not protected by locks
if m.forceRotation {
m.forceRotation = false
return time.Now()
}
m.certAccessLock.RLock()
defer m.certAccessLock.RUnlock()
if m.cert == nil {
m.rotationDeadline = time.Now()
return
return time.Now()
}
notAfter := m.cert.Leaf.NotAfter
totalDuration := float64(notAfter.Sub(m.cert.Leaf.NotBefore))
deadline := m.cert.Leaf.NotBefore.Add(jitteryDuration(totalDuration))
m.rotationDeadline = m.cert.Leaf.NotBefore.Add(jitteryDuration(totalDuration))
glog.V(2).Infof("Certificate expiration is %v, rotation deadline is %v", notAfter, m.rotationDeadline)
glog.V(2).Infof("Certificate expiration is %v, rotation deadline is %v", notAfter, deadline)
if m.certificateExpiration != nil {
m.certificateExpiration.Set(float64(notAfter.Unix()))
}
return deadline
}
// jitteryDuration uses some jitter to set the rotation threshold so each node

View File

@@ -146,46 +146,6 @@ func TestNewManagerNoRotation(t *testing.T) {
}
}
func TestShouldRotate(t *testing.T) {
now := time.Now()
tests := []struct {
name string
notBefore time.Time
notAfter time.Time
shouldRotate bool
}{
{"just issued, still good", now.Add(-1 * time.Hour), now.Add(99 * time.Hour), false},
{"half way expired, still good", now.Add(-24 * time.Hour), now.Add(24 * time.Hour), false},
{"mostly expired, still good", now.Add(-69 * time.Hour), now.Add(31 * time.Hour), false},
{"just about expired, should rotate", now.Add(-91 * time.Hour), now.Add(9 * time.Hour), true},
{"nearly expired, should rotate", now.Add(-99 * time.Hour), now.Add(1 * time.Hour), true},
{"already expired, should rotate", now.Add(-10 * time.Hour), now.Add(-1 * time.Hour), true},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
m := manager{
cert: &tls.Certificate{
Leaf: &x509.Certificate{
NotBefore: test.notBefore,
NotAfter: test.notAfter,
},
},
template: &x509.CertificateRequest{},
usages: []certificates.KeyUsage{},
}
m.setRotationDeadline()
if m.shouldRotate() != test.shouldRotate {
t.Errorf("Time %v, a certificate issued for (%v, %v) should rotate should be %t.",
now,
m.cert.Leaf.NotBefore,
m.cert.Leaf.NotAfter,
test.shouldRotate)
}
})
}
}
type gaugeMock struct {
calls int
lastValue float64
@@ -233,20 +193,20 @@ func TestSetRotationDeadline(t *testing.T) {
jitteryDuration = func(float64) time.Duration { return time.Duration(float64(tc.notAfter.Sub(tc.notBefore)) * 0.7) }
lowerBound := tc.notBefore.Add(time.Duration(float64(tc.notAfter.Sub(tc.notBefore)) * 0.7))
m.setRotationDeadline()
deadline := m.nextRotationDeadline()
if !m.rotationDeadline.Equal(lowerBound) {
if !deadline.Equal(lowerBound) {
t.Errorf("For notBefore %v, notAfter %v, the rotationDeadline %v should be %v.",
tc.notBefore,
tc.notAfter,
m.rotationDeadline,
deadline,
lowerBound)
}
if g.calls != 1 {
t.Errorf("%d metrics were recorded, wanted %d", g.calls, 1)
}
if g.lastValue != float64(tc.notAfter.Unix()) {
t.Errorf("%d value for metric was recorded, wanted %d", g.lastValue, tc.notAfter.Unix())
t.Errorf("%f value for metric was recorded, wanted %d", g.lastValue, tc.notAfter.Unix())
}
})
}
@@ -321,7 +281,7 @@ func TestNewManagerBootstrap(t *testing.T) {
}
if m, ok := cm.(*manager); !ok {
t.Errorf("Expected a '*manager' from 'NewManager'")
} else if !m.shouldRotate() {
} else if !m.forceRotation {
t.Errorf("Expected rotation should happen during bootstrap, but it won't.")
}
}
@@ -360,9 +320,8 @@ func TestNewManagerNoBootstrap(t *testing.T) {
if m, ok := cm.(*manager); !ok {
t.Errorf("Expected a '*manager' from 'NewManager'")
} else {
m.setRotationDeadline()
if m.shouldRotate() {
t.Errorf("Expected rotation should happen during bootstrap, but it won't.")
if m.forceRotation {
t.Errorf("Expected rotation should not happen during bootstrap, but it won't.")
}
}
}
@@ -515,8 +474,7 @@ func TestInitializeCertificateSigningRequestClient(t *testing.T) {
if m, ok := certificateManager.(*manager); !ok {
t.Errorf("Expected a '*manager' from 'NewManager'")
} else {
m.setRotationDeadline()
if m.shouldRotate() {
if m.forceRotation {
if success, err := m.rotateCerts(); !success {
t.Errorf("Got failure from 'rotateCerts', wanted success.")
} else if err != nil {
@@ -614,8 +572,7 @@ func TestInitializeOtherRESTClients(t *testing.T) {
if m, ok := certificateManager.(*manager); !ok {
t.Errorf("Expected a '*manager' from 'NewManager'")
} else {
m.setRotationDeadline()
if m.shouldRotate() {
if m.forceRotation {
success, err := certificateManager.(*manager).rotateCerts()
if err != nil {
t.Errorf("Got error %v, expected none.", err)

View File

@@ -42,8 +42,7 @@ filegroup(
go_test(
name = "go_default_test",
srcs = ["csr_test.go"],
importpath = "k8s.io/client-go/util/certificate/csr",
library = ":go_default_library",
embed = [":go_default_library"],
deps = [
"//vendor/k8s.io/api/certificates/v1beta1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",

View File

@@ -12,8 +12,7 @@ go_test(
"backoff_test.go",
"throttle_test.go",
],
importpath = "k8s.io/client-go/util/flowcontrol",
library = ":go_default_library",
embed = [":go_default_library"],
deps = ["//vendor/k8s.io/apimachinery/pkg/util/clock:go_default_library"],
)
@@ -25,7 +24,7 @@ go_library(
],
importpath = "k8s.io/client-go/util/flowcontrol",
deps = [
"//vendor/github.com/juju/ratelimit:go_default_library",
"//vendor/golang.org/x/time/rate:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/clock:go_default_library",
"//vendor/k8s.io/client-go/util/integer:go_default_library",
],

View File

@@ -18,8 +18,9 @@ package flowcontrol
import (
"sync"
"time"
"github.com/juju/ratelimit"
"golang.org/x/time/rate"
)
type RateLimiter interface {
@@ -30,17 +31,13 @@ type RateLimiter interface {
Accept()
// Stop stops the rate limiter, subsequent calls to CanAccept will return false
Stop()
// Saturation returns a percentage number which describes how saturated
// this rate limiter is.
// Usually we use token bucket rate limiter. In that case,
// 1.0 means no tokens are available; 0.0 means we have a full bucket of tokens to use.
Saturation() float64
// QPS returns QPS of this rate limiter
QPS() float32
}
type tokenBucketRateLimiter struct {
limiter *ratelimit.Bucket
limiter *rate.Limiter
clock Clock
qps float32
}
@@ -50,42 +47,48 @@ type tokenBucketRateLimiter struct {
// The bucket is initially filled with 'burst' tokens, and refills at a rate of 'qps'.
// The maximum number of tokens in the bucket is capped at 'burst'.
func NewTokenBucketRateLimiter(qps float32, burst int) RateLimiter {
limiter := ratelimit.NewBucketWithRate(float64(qps), int64(burst))
return newTokenBucketRateLimiter(limiter, qps)
limiter := rate.NewLimiter(rate.Limit(qps), burst)
return newTokenBucketRateLimiter(limiter, realClock{}, qps)
}
// An injectable, mockable clock interface.
type Clock interface {
ratelimit.Clock
Now() time.Time
Sleep(time.Duration)
}
type realClock struct{}
func (realClock) Now() time.Time {
return time.Now()
}
func (realClock) Sleep(d time.Duration) {
time.Sleep(d)
}
// NewTokenBucketRateLimiterWithClock is identical to NewTokenBucketRateLimiter
// but allows an injectable clock, for testing.
func NewTokenBucketRateLimiterWithClock(qps float32, burst int, clock Clock) RateLimiter {
limiter := ratelimit.NewBucketWithRateAndClock(float64(qps), int64(burst), clock)
return newTokenBucketRateLimiter(limiter, qps)
func NewTokenBucketRateLimiterWithClock(qps float32, burst int, c Clock) RateLimiter {
limiter := rate.NewLimiter(rate.Limit(qps), burst)
return newTokenBucketRateLimiter(limiter, c, qps)
}
func newTokenBucketRateLimiter(limiter *ratelimit.Bucket, qps float32) RateLimiter {
func newTokenBucketRateLimiter(limiter *rate.Limiter, c Clock, qps float32) RateLimiter {
return &tokenBucketRateLimiter{
limiter: limiter,
clock: c,
qps: qps,
}
}
func (t *tokenBucketRateLimiter) TryAccept() bool {
return t.limiter.TakeAvailable(1) == 1
}
func (t *tokenBucketRateLimiter) Saturation() float64 {
capacity := t.limiter.Capacity()
avail := t.limiter.Available()
return float64(capacity-avail) / float64(capacity)
return t.limiter.AllowN(t.clock.Now(), 1)
}
// Accept will block until a token becomes available
func (t *tokenBucketRateLimiter) Accept() {
t.limiter.Wait(1)
now := t.clock.Now()
t.clock.Sleep(t.limiter.ReserveN(now, 1).DelayFrom(now))
}
func (t *tokenBucketRateLimiter) Stop() {
@@ -105,10 +108,6 @@ func (t *fakeAlwaysRateLimiter) TryAccept() bool {
return true
}
func (t *fakeAlwaysRateLimiter) Saturation() float64 {
return 0
}
func (t *fakeAlwaysRateLimiter) Stop() {}
func (t *fakeAlwaysRateLimiter) Accept() {}
@@ -131,10 +130,6 @@ func (t *fakeNeverRateLimiter) TryAccept() bool {
return false
}
func (t *fakeNeverRateLimiter) Saturation() float64 {
return 1
}
func (t *fakeNeverRateLimiter) Stop() {
t.wg.Done()
}

View File

@@ -17,7 +17,6 @@ limitations under the License.
package flowcontrol
import (
"math"
"sync"
"testing"
"time"
@@ -116,29 +115,6 @@ func TestThrottle(t *testing.T) {
}
}
func TestRateLimiterSaturation(t *testing.T) {
const e = 0.000001
tests := []struct {
capacity int
take int
expectedSaturation float64
}{
{1, 1, 1},
{10, 3, 0.3},
}
for i, tt := range tests {
rl := NewTokenBucketRateLimiter(1, tt.capacity)
for i := 0; i < tt.take; i++ {
rl.Accept()
}
if math.Abs(rl.Saturation()-tt.expectedSaturation) > e {
t.Fatalf("#%d: Saturation rate difference isn't within tolerable range\n want=%f, get=%f",
i, tt.expectedSaturation, rl.Saturation())
}
}
}
func TestAlwaysFake(t *testing.T) {
rl := NewFakeAlwaysRateLimiter()
if !rl.TryAccept() {

View File

@@ -9,8 +9,7 @@ load(
go_test(
name = "go_default_test",
srcs = ["integer_test.go"],
importpath = "k8s.io/client-go/util/integer",
library = ":go_default_library",
embed = [":go_default_library"],
)
go_library(

View File

@@ -12,8 +12,7 @@ go_test(
"jsonpath_test.go",
"parser_test.go",
],
importpath = "k8s.io/client-go/util/jsonpath",
library = ":go_default_library",
embed = [":go_default_library"],
)
go_library(

View File

@@ -190,7 +190,7 @@ func TestStructInput(t *testing.T) {
{"nonexistent field", "{.hello}", storeData, "hello is not found", false},
{"invalid array", "{.Labels[0]}", storeData, "map[string]int is not array or slice", false},
{"invalid filter operator", "{.Book[?(@.Price<>10)]}", storeData, "unrecognized filter operator <>", false},
{"redundent end", "{range .Labels.*}{@}{end}{end}", storeData, "not in range, nothing to end", false},
{"redundant end", "{range .Labels.*}{@}{end}{end}", storeData, "not in range, nothing to end", false},
}
testFailJSONPath(failStoreTests, t)
}

View File

@@ -19,8 +19,7 @@ go_library(
go_test(
name = "go_default_test",
srcs = ["util_test.go"],
importpath = "k8s.io/client-go/util/retry",
library = ":go_default_library",
embed = [":go_default_library"],
deps = [
"//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/runtime/schema:go_default_library",

View File

@@ -9,8 +9,7 @@ load(
go_test(
name = "go_default_test",
srcs = ["fake_handler_test.go"],
importpath = "k8s.io/client-go/util/testing",
library = ":go_default_library",
embed = [":go_default_library"],
)
go_library(

View File

@@ -13,8 +13,7 @@ go_test(
"delaying_queue_test.go",
"rate_limitting_queue_test.go",
],
importpath = "k8s.io/client-go/util/workqueue",
library = ":go_default_library",
embed = [":go_default_library"],
deps = [
"//vendor/k8s.io/apimachinery/pkg/util/clock:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library",
@@ -34,7 +33,7 @@ go_library(
],
importpath = "k8s.io/client-go/util/workqueue",
deps = [
"//vendor/github.com/juju/ratelimit:go_default_library",
"//vendor/golang.org/x/time/rate:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/clock:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/runtime:go_default_library",
],
@@ -43,7 +42,6 @@ go_library(
go_test(
name = "go_default_xtest",
srcs = ["queue_test.go"],
importpath = "k8s.io/client-go/util/workqueue_test",
deps = ["//vendor/k8s.io/client-go/util/workqueue:go_default_library"],
)

View File

@@ -21,7 +21,7 @@ import (
"sync"
"time"
"github.com/juju/ratelimit"
"golang.org/x/time/rate"
)
type RateLimiter interface {
@@ -40,19 +40,19 @@ func DefaultControllerRateLimiter() RateLimiter {
return NewMaxOfRateLimiter(
NewItemExponentialFailureRateLimiter(5*time.Millisecond, 1000*time.Second),
// 10 qps, 100 bucket size. This is only for retry speed and its only the overall factor (not per item)
&BucketRateLimiter{Bucket: ratelimit.NewBucketWithRate(float64(10), int64(100))},
&BucketRateLimiter{Limiter: rate.NewLimiter(rate.Limit(10), 100)},
)
}
// BucketRateLimiter adapts a standard bucket to the workqueue ratelimiter API
type BucketRateLimiter struct {
*ratelimit.Bucket
*rate.Limiter
}
var _ RateLimiter = &BucketRateLimiter{}
func (r *BucketRateLimiter) When(item interface{}) time.Duration {
return r.Bucket.Take(1)
return r.Limiter.Reserve().Delay()
}
func (r *BucketRateLimiter) NumRequeues(item interface{}) int {

View File

@@ -89,7 +89,7 @@ type waitFor struct {
// waitForPriorityQueue implements a priority queue for waitFor items.
//
// waitForPriorityQueue implements heap.Interface. The item occuring next in
// waitForPriorityQueue implements heap.Interface. The item occurring next in
// time (i.e., the item with the smallest readyAt) is at the root (index 0).
// Peek returns this minimum item at index 0. Pop returns the minimum item after
// it has been removed from the queue and placed at index Len()-1 by