aboutsummaryrefslogtreecommitdiff
path: root/vendor/k8s.io/client-go/util/flowcontrol
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/k8s.io/client-go/util/flowcontrol')
-rw-r--r--vendor/k8s.io/client-go/util/flowcontrol/backoff.go149
-rw-r--r--vendor/k8s.io/client-go/util/flowcontrol/throttle.go148
2 files changed, 297 insertions, 0 deletions
diff --git a/vendor/k8s.io/client-go/util/flowcontrol/backoff.go b/vendor/k8s.io/client-go/util/flowcontrol/backoff.go
new file mode 100644
index 000000000..71d442a62
--- /dev/null
+++ b/vendor/k8s.io/client-go/util/flowcontrol/backoff.go
@@ -0,0 +1,149 @@
+/*
+Copyright 2015 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package flowcontrol
+
+import (
+ "sync"
+ "time"
+
+ "k8s.io/apimachinery/pkg/util/clock"
+ "k8s.io/client-go/util/integer"
+)
+
+type backoffEntry struct {
+ backoff time.Duration
+ lastUpdate time.Time
+}
+
+type Backoff struct {
+ sync.Mutex
+ Clock clock.Clock
+ defaultDuration time.Duration
+ maxDuration time.Duration
+ perItemBackoff map[string]*backoffEntry
+}
+
+func NewFakeBackOff(initial, max time.Duration, tc *clock.FakeClock) *Backoff {
+ return &Backoff{
+ perItemBackoff: map[string]*backoffEntry{},
+ Clock: tc,
+ defaultDuration: initial,
+ maxDuration: max,
+ }
+}
+
+func NewBackOff(initial, max time.Duration) *Backoff {
+ return &Backoff{
+ perItemBackoff: map[string]*backoffEntry{},
+ Clock: clock.RealClock{},
+ defaultDuration: initial,
+ maxDuration: max,
+ }
+}
+
+// Get the current backoff Duration
+func (p *Backoff) Get(id string) time.Duration {
+ p.Lock()
+ defer p.Unlock()
+ var delay time.Duration
+ entry, ok := p.perItemBackoff[id]
+ if ok {
+ delay = entry.backoff
+ }
+ return delay
+}
+
+// move backoff to the next mark, capping at maxDuration
+func (p *Backoff) Next(id string, eventTime time.Time) {
+ p.Lock()
+ defer p.Unlock()
+ entry, ok := p.perItemBackoff[id]
+ if !ok || hasExpired(eventTime, entry.lastUpdate, p.maxDuration) {
+ entry = p.initEntryUnsafe(id)
+ } else {
+ delay := entry.backoff * 2 // exponential
+ entry.backoff = time.Duration(integer.Int64Min(int64(delay), int64(p.maxDuration)))
+ }
+ entry.lastUpdate = p.Clock.Now()
+}
+
+// Reset forces clearing of all backoff data for a given key.
+func (p *Backoff) Reset(id string) {
+ p.Lock()
+ defer p.Unlock()
+ delete(p.perItemBackoff, id)
+}
+
+// Returns True if the elapsed time since eventTime is smaller than the current backoff window
+func (p *Backoff) IsInBackOffSince(id string, eventTime time.Time) bool {
+ p.Lock()
+ defer p.Unlock()
+ entry, ok := p.perItemBackoff[id]
+ if !ok {
+ return false
+ }
+ if hasExpired(eventTime, entry.lastUpdate, p.maxDuration) {
+ return false
+ }
+ return p.Clock.Now().Sub(eventTime) < entry.backoff
+}
+
+// Returns True if time since lastupdate is less than the current backoff window.
+func (p *Backoff) IsInBackOffSinceUpdate(id string, eventTime time.Time) bool {
+ p.Lock()
+ defer p.Unlock()
+ entry, ok := p.perItemBackoff[id]
+ if !ok {
+ return false
+ }
+ if hasExpired(eventTime, entry.lastUpdate, p.maxDuration) {
+ return false
+ }
+ return eventTime.Sub(entry.lastUpdate) < entry.backoff
+}
+
+// Garbage collect records that have aged past maxDuration. Backoff users are expected
+// to invoke this periodically.
+func (p *Backoff) GC() {
+ p.Lock()
+ defer p.Unlock()
+ now := p.Clock.Now()
+ for id, entry := range p.perItemBackoff {
+ if now.Sub(entry.lastUpdate) > p.maxDuration*2 {
+ // GC when entry has not been updated for 2*maxDuration
+ delete(p.perItemBackoff, id)
+ }
+ }
+}
+
+func (p *Backoff) DeleteEntry(id string) {
+ p.Lock()
+ defer p.Unlock()
+ delete(p.perItemBackoff, id)
+}
+
+// Take a lock on *Backoff, before calling initEntryUnsafe
+func (p *Backoff) initEntryUnsafe(id string) *backoffEntry {
+ entry := &backoffEntry{backoff: p.defaultDuration}
+ p.perItemBackoff[id] = entry
+ return entry
+}
+
+// After 2*maxDuration we restart the backoff factor to the beginning
+func hasExpired(eventTime time.Time, lastUpdate time.Time, maxDuration time.Duration) bool {
+ return eventTime.Sub(lastUpdate) > maxDuration*2 // consider stable if it's ok for twice the maxDuration
+}
diff --git a/vendor/k8s.io/client-go/util/flowcontrol/throttle.go b/vendor/k8s.io/client-go/util/flowcontrol/throttle.go
new file mode 100644
index 000000000..c45169c40
--- /dev/null
+++ b/vendor/k8s.io/client-go/util/flowcontrol/throttle.go
@@ -0,0 +1,148 @@
+/*
+Copyright 2014 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package flowcontrol
+
+import (
+ "sync"
+
+ "github.com/juju/ratelimit"
+)
+
+type RateLimiter interface {
+ // TryAccept returns true if a token is taken immediately. Otherwise,
+ // it returns false.
+ TryAccept() bool
+ // Accept returns once a token becomes available.
+ Accept()
+ // Stop stops the rate limiter, subsequent calls to CanAccept will return false
+ Stop()
+ // Saturation returns a percentage number which describes how saturated
+ // this rate limiter is.
+ // Usually we use token bucket rate limiter. In that case,
+ // 1.0 means no tokens are available; 0.0 means we have a full bucket of tokens to use.
+ Saturation() float64
+ // QPS returns QPS of this rate limiter
+ QPS() float32
+}
+
+type tokenBucketRateLimiter struct {
+ limiter *ratelimit.Bucket
+ qps float32
+}
+
+// NewTokenBucketRateLimiter creates a rate limiter which implements a token bucket approach.
+// The rate limiter allows bursts of up to 'burst' to exceed the QPS, while still maintaining a
+// smoothed qps rate of 'qps'.
+// The bucket is initially filled with 'burst' tokens, and refills at a rate of 'qps'.
+// The maximum number of tokens in the bucket is capped at 'burst'.
+func NewTokenBucketRateLimiter(qps float32, burst int) RateLimiter {
+ limiter := ratelimit.NewBucketWithRate(float64(qps), int64(burst))
+ return newTokenBucketRateLimiter(limiter, qps)
+}
+
+// An injectable, mockable clock interface.
+type Clock interface {
+ ratelimit.Clock
+}
+
+// NewTokenBucketRateLimiterWithClock is identical to NewTokenBucketRateLimiter
+// but allows an injectable clock, for testing.
+func NewTokenBucketRateLimiterWithClock(qps float32, burst int, clock Clock) RateLimiter {
+ limiter := ratelimit.NewBucketWithRateAndClock(float64(qps), int64(burst), clock)
+ return newTokenBucketRateLimiter(limiter, qps)
+}
+
+func newTokenBucketRateLimiter(limiter *ratelimit.Bucket, qps float32) RateLimiter {
+ return &tokenBucketRateLimiter{
+ limiter: limiter,
+ qps: qps,
+ }
+}
+
+func (t *tokenBucketRateLimiter) TryAccept() bool {
+ return t.limiter.TakeAvailable(1) == 1
+}
+
+func (t *tokenBucketRateLimiter) Saturation() float64 {
+ capacity := t.limiter.Capacity()
+ avail := t.limiter.Available()
+ return float64(capacity-avail) / float64(capacity)
+}
+
+// Accept will block until a token becomes available
+func (t *tokenBucketRateLimiter) Accept() {
+ t.limiter.Wait(1)
+}
+
+func (t *tokenBucketRateLimiter) Stop() {
+}
+
+func (t *tokenBucketRateLimiter) QPS() float32 {
+ return t.qps
+}
+
+type fakeAlwaysRateLimiter struct{}
+
+func NewFakeAlwaysRateLimiter() RateLimiter {
+ return &fakeAlwaysRateLimiter{}
+}
+
+func (t *fakeAlwaysRateLimiter) TryAccept() bool {
+ return true
+}
+
+func (t *fakeAlwaysRateLimiter) Saturation() float64 {
+ return 0
+}
+
+func (t *fakeAlwaysRateLimiter) Stop() {}
+
+func (t *fakeAlwaysRateLimiter) Accept() {}
+
+func (t *fakeAlwaysRateLimiter) QPS() float32 {
+ return 1
+}
+
+type fakeNeverRateLimiter struct {
+ wg sync.WaitGroup
+}
+
+func NewFakeNeverRateLimiter() RateLimiter {
+ rl := fakeNeverRateLimiter{}
+ rl.wg.Add(1)
+ return &rl
+}
+
+func (t *fakeNeverRateLimiter) TryAccept() bool {
+ return false
+}
+
+func (t *fakeNeverRateLimiter) Saturation() float64 {
+ return 1
+}
+
+func (t *fakeNeverRateLimiter) Stop() {
+ t.wg.Done()
+}
+
+func (t *fakeNeverRateLimiter) Accept() {
+ t.wg.Wait()
+}
+
+func (t *fakeNeverRateLimiter) QPS() float32 {
+ return 1
+}