aboutsummaryrefslogtreecommitdiff
path: root/vendor/k8s.io/client-go/util
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/k8s.io/client-go/util')
-rw-r--r--vendor/k8s.io/client-go/util/buffer/ring_growing.go72
-rw-r--r--vendor/k8s.io/client-go/util/cert/cert.go2
-rw-r--r--vendor/k8s.io/client-go/util/cert/io.go30
-rw-r--r--vendor/k8s.io/client-go/util/cert/pem.go143
-rw-r--r--vendor/k8s.io/client-go/util/flowcontrol/throttle.go55
-rw-r--r--vendor/k8s.io/client-go/util/retry/util.go79
6 files changed, 333 insertions, 48 deletions
diff --git a/vendor/k8s.io/client-go/util/buffer/ring_growing.go b/vendor/k8s.io/client-go/util/buffer/ring_growing.go
new file mode 100644
index 000000000..86965a513
--- /dev/null
+++ b/vendor/k8s.io/client-go/util/buffer/ring_growing.go
@@ -0,0 +1,72 @@
+/*
+Copyright 2017 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package buffer
+
+// RingGrowing is a growing ring buffer.
+// Not thread safe.
+type RingGrowing struct {
+ data []interface{}
+ n int // Size of Data
+ beg int // First available element
+ readable int // Number of data items available
+}
+
+// NewRingGrowing constructs a new RingGrowing instance with provided parameters.
+func NewRingGrowing(initialSize int) *RingGrowing {
+ return &RingGrowing{
+ data: make([]interface{}, initialSize),
+ n: initialSize,
+ }
+}
+
+// ReadOne reads (consumes) first item from the buffer if it is available, otherwise returns false.
+func (r *RingGrowing) ReadOne() (data interface{}, ok bool) {
+ if r.readable == 0 {
+ return nil, false
+ }
+ r.readable--
+ element := r.data[r.beg]
+ r.data[r.beg] = nil // Remove reference to the object to help GC
+ if r.beg == r.n-1 {
+ // Was the last element
+ r.beg = 0
+ } else {
+ r.beg++
+ }
+ return element, true
+}
+
+// WriteOne adds an item to the end of the buffer, growing it if it is full.
+func (r *RingGrowing) WriteOne(data interface{}) {
+ if r.readable == r.n {
+ // Time to grow
+ newN := r.n * 2
+ newData := make([]interface{}, newN)
+ to := r.beg + r.readable
+ if to <= r.n {
+ copy(newData, r.data[r.beg:to])
+ } else {
+ copied := copy(newData, r.data[r.beg:])
+ copy(newData[copied:], r.data[:(to%r.n)])
+ }
+ r.beg = 0
+ r.data = newData
+ r.n = newN
+ }
+ r.data[(r.readable+r.beg)%r.n] = data
+ r.readable++
+}
diff --git a/vendor/k8s.io/client-go/util/cert/cert.go b/vendor/k8s.io/client-go/util/cert/cert.go
index 6854d4152..2c95754c1 100644
--- a/vendor/k8s.io/client-go/util/cert/cert.go
+++ b/vendor/k8s.io/client-go/util/cert/cert.go
@@ -38,7 +38,7 @@ const (
duration365d = time.Hour * 24 * 365
)
-// Config containes the basic fields required for creating a certificate
+// Config contains the basic fields required for creating a certificate
type Config struct {
CommonName string
Organization []string
diff --git a/vendor/k8s.io/client-go/util/cert/io.go b/vendor/k8s.io/client-go/util/cert/io.go
index b6b690383..a41f8054a 100644
--- a/vendor/k8s.io/client-go/util/cert/io.go
+++ b/vendor/k8s.io/client-go/util/cert/io.go
@@ -66,10 +66,7 @@ func WriteCert(certPath string, data []byte) error {
if err := os.MkdirAll(filepath.Dir(certPath), os.FileMode(0755)); err != nil {
return err
}
- if err := ioutil.WriteFile(certPath, data, os.FileMode(0644)); err != nil {
- return err
- }
- return nil
+ return ioutil.WriteFile(certPath, data, os.FileMode(0644))
}
// WriteKey writes the pem-encoded key data to keyPath.
@@ -80,10 +77,7 @@ func WriteKey(keyPath string, data []byte) error {
if err := os.MkdirAll(filepath.Dir(keyPath), os.FileMode(0755)); err != nil {
return err
}
- if err := ioutil.WriteFile(keyPath, data, os.FileMode(0600)); err != nil {
- return err
- }
- return nil
+ return ioutil.WriteFile(keyPath, data, os.FileMode(0600))
}
// LoadOrGenerateKeyFile looks for a key in the file at the given path. If it
@@ -138,13 +132,27 @@ func CertsFromFile(file string) ([]*x509.Certificate, error) {
// PrivateKeyFromFile returns the private key in rsa.PrivateKey or ecdsa.PrivateKey format from a given PEM-encoded file.
// Returns an error if the file could not be read or if the private key could not be parsed.
func PrivateKeyFromFile(file string) (interface{}, error) {
- pemBlock, err := ioutil.ReadFile(file)
+ data, err := ioutil.ReadFile(file)
if err != nil {
return nil, err
}
- key, err := ParsePrivateKeyPEM(pemBlock)
+ key, err := ParsePrivateKeyPEM(data)
if err != nil {
- return nil, fmt.Errorf("error reading %s: %v", file, err)
+ return nil, fmt.Errorf("error reading private key file %s: %v", file, err)
}
return key, nil
}
+
+// PublicKeysFromFile returns the public keys in rsa.PublicKey or ecdsa.PublicKey format from a given PEM-encoded file.
+// Reads public keys from both public and private key files.
+func PublicKeysFromFile(file string) ([]interface{}, error) {
+ data, err := ioutil.ReadFile(file)
+ if err != nil {
+ return nil, err
+ }
+ keys, err := ParsePublicKeysPEM(data)
+ if err != nil {
+ return nil, fmt.Errorf("error reading public key file %s: %v", file, err)
+ }
+ return keys, nil
+}
diff --git a/vendor/k8s.io/client-go/util/cert/pem.go b/vendor/k8s.io/client-go/util/cert/pem.go
index 899845857..b99e36651 100644
--- a/vendor/k8s.io/client-go/util/cert/pem.go
+++ b/vendor/k8s.io/client-go/util/cert/pem.go
@@ -17,6 +17,7 @@ limitations under the License.
package cert
import (
+ "crypto/ecdsa"
"crypto/rsa"
"crypto/x509"
"encoding/pem"
@@ -29,17 +30,17 @@ const (
ECPrivateKeyBlockType = "EC PRIVATE KEY"
// RSAPrivateKeyBlockType is a possible value for pem.Block.Type.
RSAPrivateKeyBlockType = "RSA PRIVATE KEY"
- // CertificateBlockType is a possible value for pem.Block.Type.
- CertificateBlockType = "CERTIFICATE"
- // CertificateRequestBlockType is a possible value for pem.Block.Type.
- CertificateRequestBlockType = "CERTIFICATE REQUEST"
// PrivateKeyBlockType is a possible value for pem.Block.Type.
PrivateKeyBlockType = "PRIVATE KEY"
// PublicKeyBlockType is a possible value for pem.Block.Type.
PublicKeyBlockType = "PUBLIC KEY"
+ // CertificateBlockType is a possible value for pem.Block.Type.
+ CertificateBlockType = "CERTIFICATE"
+ // CertificateRequestBlockType is a possible value for pem.Block.Type.
+ CertificateRequestBlockType = "CERTIFICATE REQUEST"
)
-// EncodePublicKeyPEM returns PEM-endcode public data
+// EncodePublicKeyPEM returns PEM-encoded public data
func EncodePublicKeyPEM(key *rsa.PublicKey) ([]byte, error) {
der, err := x509.MarshalPKIXPublicKey(key)
if err != nil {
@@ -106,6 +107,46 @@ func ParsePrivateKeyPEM(keyData []byte) (interface{}, error) {
return nil, fmt.Errorf("data does not contain a valid RSA or ECDSA private key")
}
+// ParsePublicKeysPEM is a helper function for reading an array of rsa.PublicKey or ecdsa.PublicKey from a PEM-encoded byte array.
+// Reads public keys from both public and private key files.
+func ParsePublicKeysPEM(keyData []byte) ([]interface{}, error) {
+ var block *pem.Block
+ keys := []interface{}{}
+ for {
+ // read the next block
+ block, keyData = pem.Decode(keyData)
+ if block == nil {
+ break
+ }
+
+ // test block against parsing functions
+ if privateKey, err := parseRSAPrivateKey(block.Bytes); err == nil {
+ keys = append(keys, &privateKey.PublicKey)
+ continue
+ }
+ if publicKey, err := parseRSAPublicKey(block.Bytes); err == nil {
+ keys = append(keys, publicKey)
+ continue
+ }
+ if privateKey, err := parseECPrivateKey(block.Bytes); err == nil {
+ keys = append(keys, &privateKey.PublicKey)
+ continue
+ }
+ if publicKey, err := parseECPublicKey(block.Bytes); err == nil {
+ keys = append(keys, publicKey)
+ continue
+ }
+
+ // tolerate non-key PEM blocks for backwards compatibility
+ // originally, only the first PEM block was parsed and expected to be a key block
+ }
+
+ if len(keys) == 0 {
+ return nil, fmt.Errorf("data does not contain any valid RSA or ECDSA public keys")
+ }
+ return keys, nil
+}
+
// ParseCertsPEM returns the x509.Certificates contained in the given PEM-encoded byte array
// Returns an error if a certificate could not be parsed, or if the data does not contain any certificates
func ParseCertsPEM(pemCerts []byte) ([]*x509.Certificate, error) {
@@ -132,7 +173,97 @@ func ParseCertsPEM(pemCerts []byte) ([]*x509.Certificate, error) {
}
if !ok {
- return certs, errors.New("could not read any certificates")
+ return certs, errors.New("data does not contain any valid RSA or ECDSA certificates")
}
return certs, nil
}
+
+// parseRSAPublicKey parses a single RSA public key from the provided data
+func parseRSAPublicKey(data []byte) (*rsa.PublicKey, error) {
+ var err error
+
+ // Parse the key
+ var parsedKey interface{}
+ if parsedKey, err = x509.ParsePKIXPublicKey(data); err != nil {
+ if cert, err := x509.ParseCertificate(data); err == nil {
+ parsedKey = cert.PublicKey
+ } else {
+ return nil, err
+ }
+ }
+
+ // Test if parsed key is an RSA Public Key
+ var pubKey *rsa.PublicKey
+ var ok bool
+ if pubKey, ok = parsedKey.(*rsa.PublicKey); !ok {
+ return nil, fmt.Errorf("data doesn't contain valid RSA Public Key")
+ }
+
+ return pubKey, nil
+}
+
+// parseRSAPrivateKey parses a single RSA private key from the provided data
+func parseRSAPrivateKey(data []byte) (*rsa.PrivateKey, error) {
+ var err error
+
+ // Parse the key
+ var parsedKey interface{}
+ if parsedKey, err = x509.ParsePKCS1PrivateKey(data); err != nil {
+ if parsedKey, err = x509.ParsePKCS8PrivateKey(data); err != nil {
+ return nil, err
+ }
+ }
+
+ // Test if parsed key is an RSA Private Key
+ var privKey *rsa.PrivateKey
+ var ok bool
+ if privKey, ok = parsedKey.(*rsa.PrivateKey); !ok {
+ return nil, fmt.Errorf("data doesn't contain valid RSA Private Key")
+ }
+
+ return privKey, nil
+}
+
+// parseECPublicKey parses a single ECDSA public key from the provided data
+func parseECPublicKey(data []byte) (*ecdsa.PublicKey, error) {
+ var err error
+
+ // Parse the key
+ var parsedKey interface{}
+ if parsedKey, err = x509.ParsePKIXPublicKey(data); err != nil {
+ if cert, err := x509.ParseCertificate(data); err == nil {
+ parsedKey = cert.PublicKey
+ } else {
+ return nil, err
+ }
+ }
+
+ // Test if parsed key is an ECDSA Public Key
+ var pubKey *ecdsa.PublicKey
+ var ok bool
+ if pubKey, ok = parsedKey.(*ecdsa.PublicKey); !ok {
+ return nil, fmt.Errorf("data doesn't contain valid ECDSA Public Key")
+ }
+
+ return pubKey, nil
+}
+
+// parseECPrivateKey parses a single ECDSA private key from the provided data
+func parseECPrivateKey(data []byte) (*ecdsa.PrivateKey, error) {
+ var err error
+
+ // Parse the key
+ var parsedKey interface{}
+ if parsedKey, err = x509.ParseECPrivateKey(data); err != nil {
+ return nil, err
+ }
+
+ // Test if parsed key is an ECDSA Private Key
+ var privKey *ecdsa.PrivateKey
+ var ok bool
+ if privKey, ok = parsedKey.(*ecdsa.PrivateKey); !ok {
+ return nil, fmt.Errorf("data doesn't contain valid ECDSA Private Key")
+ }
+
+ return privKey, nil
+}
diff --git a/vendor/k8s.io/client-go/util/flowcontrol/throttle.go b/vendor/k8s.io/client-go/util/flowcontrol/throttle.go
index c45169c40..e671c044d 100644
--- a/vendor/k8s.io/client-go/util/flowcontrol/throttle.go
+++ b/vendor/k8s.io/client-go/util/flowcontrol/throttle.go
@@ -18,8 +18,9 @@ package flowcontrol
import (
"sync"
+ "time"
- "github.com/juju/ratelimit"
+ "golang.org/x/time/rate"
)
type RateLimiter interface {
@@ -30,17 +31,13 @@ type RateLimiter interface {
Accept()
// Stop stops the rate limiter, subsequent calls to CanAccept will return false
Stop()
- // Saturation returns a percentage number which describes how saturated
- // this rate limiter is.
- // Usually we use token bucket rate limiter. In that case,
- // 1.0 means no tokens are available; 0.0 means we have a full bucket of tokens to use.
- Saturation() float64
// QPS returns QPS of this rate limiter
QPS() float32
}
type tokenBucketRateLimiter struct {
- limiter *ratelimit.Bucket
+ limiter *rate.Limiter
+ clock Clock
qps float32
}
@@ -50,42 +47,48 @@ type tokenBucketRateLimiter struct {
// The bucket is initially filled with 'burst' tokens, and refills at a rate of 'qps'.
// The maximum number of tokens in the bucket is capped at 'burst'.
func NewTokenBucketRateLimiter(qps float32, burst int) RateLimiter {
- limiter := ratelimit.NewBucketWithRate(float64(qps), int64(burst))
- return newTokenBucketRateLimiter(limiter, qps)
+ limiter := rate.NewLimiter(rate.Limit(qps), burst)
+ return newTokenBucketRateLimiter(limiter, realClock{}, qps)
}
// An injectable, mockable clock interface.
type Clock interface {
- ratelimit.Clock
+ Now() time.Time
+ Sleep(time.Duration)
+}
+
+type realClock struct{}
+
+func (realClock) Now() time.Time {
+ return time.Now()
+}
+func (realClock) Sleep(d time.Duration) {
+ time.Sleep(d)
}
// NewTokenBucketRateLimiterWithClock is identical to NewTokenBucketRateLimiter
// but allows an injectable clock, for testing.
-func NewTokenBucketRateLimiterWithClock(qps float32, burst int, clock Clock) RateLimiter {
- limiter := ratelimit.NewBucketWithRateAndClock(float64(qps), int64(burst), clock)
- return newTokenBucketRateLimiter(limiter, qps)
+func NewTokenBucketRateLimiterWithClock(qps float32, burst int, c Clock) RateLimiter {
+ limiter := rate.NewLimiter(rate.Limit(qps), burst)
+ return newTokenBucketRateLimiter(limiter, c, qps)
}
-func newTokenBucketRateLimiter(limiter *ratelimit.Bucket, qps float32) RateLimiter {
+func newTokenBucketRateLimiter(limiter *rate.Limiter, c Clock, qps float32) RateLimiter {
return &tokenBucketRateLimiter{
limiter: limiter,
+ clock: c,
qps: qps,
}
}
func (t *tokenBucketRateLimiter) TryAccept() bool {
- return t.limiter.TakeAvailable(1) == 1
-}
-
-func (t *tokenBucketRateLimiter) Saturation() float64 {
- capacity := t.limiter.Capacity()
- avail := t.limiter.Available()
- return float64(capacity-avail) / float64(capacity)
+ return t.limiter.AllowN(t.clock.Now(), 1)
}
// Accept will block until a token becomes available
func (t *tokenBucketRateLimiter) Accept() {
- t.limiter.Wait(1)
+ now := t.clock.Now()
+ t.clock.Sleep(t.limiter.ReserveN(now, 1).DelayFrom(now))
}
func (t *tokenBucketRateLimiter) Stop() {
@@ -105,10 +108,6 @@ func (t *fakeAlwaysRateLimiter) TryAccept() bool {
return true
}
-func (t *fakeAlwaysRateLimiter) Saturation() float64 {
- return 0
-}
-
func (t *fakeAlwaysRateLimiter) Stop() {}
func (t *fakeAlwaysRateLimiter) Accept() {}
@@ -131,10 +130,6 @@ func (t *fakeNeverRateLimiter) TryAccept() bool {
return false
}
-func (t *fakeNeverRateLimiter) Saturation() float64 {
- return 1
-}
-
func (t *fakeNeverRateLimiter) Stop() {
t.wg.Done()
}
diff --git a/vendor/k8s.io/client-go/util/retry/util.go b/vendor/k8s.io/client-go/util/retry/util.go
new file mode 100644
index 000000000..3ac0840ad
--- /dev/null
+++ b/vendor/k8s.io/client-go/util/retry/util.go
@@ -0,0 +1,79 @@
+/*
+Copyright 2016 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package retry
+
+import (
+ "time"
+
+ "k8s.io/apimachinery/pkg/api/errors"
+ "k8s.io/apimachinery/pkg/util/wait"
+)
+
+// DefaultRetry is the recommended retry for a conflict where multiple clients
+// are making changes to the same resource.
+var DefaultRetry = wait.Backoff{
+ Steps: 5,
+ Duration: 10 * time.Millisecond,
+ Factor: 1.0,
+ Jitter: 0.1,
+}
+
+// DefaultBackoff is the recommended backoff for a conflict where a client
+// may be attempting to make an unrelated modification to a resource under
+// active management by one or more controllers.
+var DefaultBackoff = wait.Backoff{
+ Steps: 4,
+ Duration: 10 * time.Millisecond,
+ Factor: 5.0,
+ Jitter: 0.1,
+}
+
+// RetryConflict executes the provided function repeatedly, retrying if the server returns a conflicting
+// write. Callers should preserve previous executions if they wish to retry changes. It performs an
+// exponential backoff.
+//
+// var pod *api.Pod
+// err := RetryOnConflict(DefaultBackoff, func() (err error) {
+// pod, err = c.Pods("mynamespace").UpdateStatus(podStatus)
+// return
+// })
+// if err != nil {
+// // may be conflict if max retries were hit
+// return err
+// }
+// ...
+//
+// TODO: Make Backoff an interface?
+func RetryOnConflict(backoff wait.Backoff, fn func() error) error {
+ var lastConflictErr error
+ err := wait.ExponentialBackoff(backoff, func() (bool, error) {
+ err := fn()
+ switch {
+ case err == nil:
+ return true, nil
+ case errors.IsConflict(err):
+ lastConflictErr = err
+ return false, nil
+ default:
+ return false, err
+ }
+ })
+ if err == wait.ErrWaitTimeout {
+ err = lastConflictErr
+ }
+ return err
+}