diff options
Diffstat (limited to 'vendor/k8s.io/client-go/plugin')
-rw-r--r-- | vendor/k8s.io/client-go/plugin/pkg/client/auth/exec/exec.go | 166 |
1 files changed, 123 insertions, 43 deletions
diff --git a/vendor/k8s.io/client-go/plugin/pkg/client/auth/exec/exec.go b/vendor/k8s.io/client-go/plugin/pkg/client/auth/exec/exec.go index dfd434d0c..b88902c10 100644 --- a/vendor/k8s.io/client-go/plugin/pkg/client/auth/exec/exec.go +++ b/vendor/k8s.io/client-go/plugin/pkg/client/auth/exec/exec.go @@ -18,23 +18,33 @@ package exec import ( "bytes" + "context" + "crypto/tls" + "errors" "fmt" "io" + "net" "net/http" "os" "os/exec" + "reflect" "sync" "time" - "github.com/golang/glog" + "github.com/davecgh/go-spew/spew" "golang.org/x/crypto/ssh/terminal" - "k8s.io/apimachinery/pkg/apis/meta/v1" + v1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/runtime/serializer" + utilruntime "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/client-go/pkg/apis/clientauthentication" "k8s.io/client-go/pkg/apis/clientauthentication/v1alpha1" + "k8s.io/client-go/pkg/apis/clientauthentication/v1beta1" "k8s.io/client-go/tools/clientcmd/api" + "k8s.io/client-go/transport" + "k8s.io/client-go/util/connrotation" + "k8s.io/klog" ) const execInfoEnv = "KUBERNETES_EXEC_INFO" @@ -44,8 +54,9 @@ var codecs = serializer.NewCodecFactory(scheme) func init() { v1.AddToGroupVersion(scheme, schema.GroupVersion{Version: "v1"}) - v1alpha1.AddToScheme(scheme) - clientauthentication.AddToScheme(scheme) + utilruntime.Must(v1alpha1.AddToScheme(scheme)) + utilruntime.Must(v1beta1.AddToScheme(scheme)) + utilruntime.Must(clientauthentication.AddToScheme(scheme)) } var ( @@ -55,6 +66,7 @@ var ( // The list of API versions we accept. apiVersions = map[string]schema.GroupVersion{ v1alpha1.SchemeGroupVersion.String(): v1alpha1.SchemeGroupVersion, + v1beta1.SchemeGroupVersion.String(): v1beta1.SchemeGroupVersion, } ) @@ -62,8 +74,10 @@ func newCache() *cache { return &cache{m: make(map[string]*Authenticator)} } +var spewConfig = &spew.ConfigState{DisableMethods: true, Indent: " "} + func cacheKey(c *api.ExecConfig) string { - return fmt.Sprintf("%#v", c) + return spewConfig.Sprint(c) } type cache struct { @@ -147,14 +161,40 @@ type Authenticator struct { // The mutex also guards calling the plugin. Since the plugin could be // interactive we want to make sure it's only called once. mu sync.Mutex - cachedToken string + cachedCreds *credentials exp time.Time + + onRotate func() } -// WrapTransport instruments an existing http.RoundTripper with credentials returned -// by the plugin. -func (a *Authenticator) WrapTransport(rt http.RoundTripper) http.RoundTripper { - return &roundTripper{a, rt} +type credentials struct { + token string + cert *tls.Certificate +} + +// UpdateTransportConfig updates the transport.Config to use credentials +// returned by the plugin. +func (a *Authenticator) UpdateTransportConfig(c *transport.Config) error { + c.Wrap(func(rt http.RoundTripper) http.RoundTripper { + return &roundTripper{a, rt} + }) + + if c.TLS.GetCert != nil { + return errors.New("can't add TLS certificate callback: transport.Config.TLS.GetCert already set") + } + c.TLS.GetCert = a.cert + + var dial func(ctx context.Context, network, addr string) (net.Conn, error) + if c.Dial != nil { + dial = c.Dial + } else { + dial = (&net.Dialer{Timeout: 30 * time.Second, KeepAlive: 30 * time.Second}).DialContext + } + d := connrotation.NewDialer(dial) + a.onRotate = d.CloseAll + c.Dial = d.DialContext + + return nil } type roundTripper struct { @@ -169,11 +209,13 @@ func (r *roundTripper) RoundTrip(req *http.Request) (*http.Response, error) { return r.base.RoundTrip(req) } - token, err := r.a.token() + creds, err := r.a.getCreds() if err != nil { - return nil, fmt.Errorf("getting token: %v", err) + return nil, fmt.Errorf("getting credentials: %v", err) + } + if creds.token != "" { + req.Header.Set("Authorization", "Bearer "+creds.token) } - req.Header.Set("Authorization", "Bearer "+token) res, err := r.base.RoundTrip(req) if err != nil { @@ -184,47 +226,60 @@ func (r *roundTripper) RoundTrip(req *http.Request) (*http.Response, error) { Header: res.Header, Code: int32(res.StatusCode), } - if err := r.a.refresh(token, resp); err != nil { - glog.Errorf("refreshing token: %v", err) + if err := r.a.maybeRefreshCreds(creds, resp); err != nil { + klog.Errorf("refreshing credentials: %v", err) } } return res, nil } -func (a *Authenticator) tokenExpired() bool { +func (a *Authenticator) credsExpired() bool { if a.exp.IsZero() { return false } return a.now().After(a.exp) } -func (a *Authenticator) token() (string, error) { +func (a *Authenticator) cert() (*tls.Certificate, error) { + creds, err := a.getCreds() + if err != nil { + return nil, err + } + return creds.cert, nil +} + +func (a *Authenticator) getCreds() (*credentials, error) { a.mu.Lock() defer a.mu.Unlock() - if a.cachedToken != "" && !a.tokenExpired() { - return a.cachedToken, nil + if a.cachedCreds != nil && !a.credsExpired() { + return a.cachedCreds, nil } - return a.getToken(nil) + if err := a.refreshCredsLocked(nil); err != nil { + return nil, err + } + return a.cachedCreds, nil } -// refresh executes the plugin to force a rotation of the token. -func (a *Authenticator) refresh(token string, r *clientauthentication.Response) error { +// maybeRefreshCreds executes the plugin to force a rotation of the +// credentials, unless they were rotated already. +func (a *Authenticator) maybeRefreshCreds(creds *credentials, r *clientauthentication.Response) error { a.mu.Lock() defer a.mu.Unlock() - if token != a.cachedToken { - // Token already rotated. + // Since we're not making a new pointer to a.cachedCreds in getCreds, no + // need to do deep comparison. + if creds != a.cachedCreds { + // Credentials already rotated. return nil } - _, err := a.getToken(r) - return err + return a.refreshCredsLocked(r) } -// getToken executes the plugin and reads the credentials from stdout. It must be -// called while holding the Authenticator's mutex. -func (a *Authenticator) getToken(r *clientauthentication.Response) (string, error) { +// refreshCredsLocked executes the plugin and reads the credentials from +// stdout. It must be called while holding the Authenticator's mutex. +func (a *Authenticator) refreshCredsLocked(r *clientauthentication.Response) error { cred := &clientauthentication.ExecCredential{ Spec: clientauthentication.ExecCredentialSpec{ Response: r, @@ -232,13 +287,18 @@ func (a *Authenticator) getToken(r *clientauthentication.Response) (string, erro }, } - data, err := runtime.Encode(codecs.LegacyCodec(a.group), cred) - if err != nil { - return "", fmt.Errorf("encode ExecCredentials: %v", err) - } - env := append(a.environ(), a.env...) - env = append(env, fmt.Sprintf("%s=%s", execInfoEnv, data)) + if a.group == v1alpha1.SchemeGroupVersion { + // Input spec disabled for beta due to lack of use. Possibly re-enable this later if + // someone wants it back. + // + // See: https://github.com/kubernetes/kubernetes/issues/61796 + data, err := runtime.Encode(codecs.LegacyCodec(a.group), cred) + if err != nil { + return fmt.Errorf("encode ExecCredentials: %v", err) + } + env = append(env, fmt.Sprintf("%s=%s", execInfoEnv, data)) + } stdout := &bytes.Buffer{} cmd := exec.Command(a.cmd, a.args...) @@ -250,23 +310,26 @@ func (a *Authenticator) getToken(r *clientauthentication.Response) (string, erro } if err := cmd.Run(); err != nil { - return "", fmt.Errorf("exec: %v", err) + return fmt.Errorf("exec: %v", err) } _, gvk, err := codecs.UniversalDecoder(a.group).Decode(stdout.Bytes(), nil, cred) if err != nil { - return "", fmt.Errorf("decode stdout: %v", err) + return fmt.Errorf("decoding stdout: %v", err) } if gvk.Group != a.group.Group || gvk.Version != a.group.Version { - return "", fmt.Errorf("exec plugin is configured to use API version %s, plugin returned version %s", + return fmt.Errorf("exec plugin is configured to use API version %s, plugin returned version %s", a.group, schema.GroupVersion{Group: gvk.Group, Version: gvk.Version}) } if cred.Status == nil { - return "", fmt.Errorf("exec plugin didn't return a status field") + return fmt.Errorf("exec plugin didn't return a status field") + } + if cred.Status.Token == "" && cred.Status.ClientCertificateData == "" && cred.Status.ClientKeyData == "" { + return fmt.Errorf("exec plugin didn't return a token or cert/key pair") } - if cred.Status.Token == "" { - return "", fmt.Errorf("exec plugin didn't return a token") + if (cred.Status.ClientCertificateData == "") != (cred.Status.ClientKeyData == "") { + return fmt.Errorf("exec plugin returned only certificate or key, not both") } if cred.Status.ExpirationTimestamp != nil { @@ -274,7 +337,24 @@ func (a *Authenticator) getToken(r *clientauthentication.Response) (string, erro } else { a.exp = time.Time{} } - a.cachedToken = cred.Status.Token - return a.cachedToken, nil + newCreds := &credentials{ + token: cred.Status.Token, + } + if cred.Status.ClientKeyData != "" && cred.Status.ClientCertificateData != "" { + cert, err := tls.X509KeyPair([]byte(cred.Status.ClientCertificateData), []byte(cred.Status.ClientKeyData)) + if err != nil { + return fmt.Errorf("failed parsing client key/certificate: %v", err) + } + newCreds.cert = &cert + } + + oldCreds := a.cachedCreds + a.cachedCreds = newCreds + // Only close all connections when TLS cert rotates. Token rotation doesn't + // need the extra noise. + if a.onRotate != nil && oldCreds != nil && !reflect.DeepEqual(oldCreds.cert, a.cachedCreds.cert) { + a.onRotate() + } + return nil } |