Diffstat (limited to 'vendor/k8s.io/kubernetes/pkg/controller')
5 files changed, 1936 insertions, 0 deletions
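The first file in the diff, client_builder.go, defines the ControllerClientBuilder interface the controller manager uses to hand each controller its own identity-scoped client. A minimal usage sketch, assuming the vendored import paths below; the kubeconfig path and the "deployment-controller" name are illustrative and do not come from this diff:

package main

import (
	"github.com/golang/glog"
	"k8s.io/client-go/tools/clientcmd"
	"k8s.io/kubernetes/pkg/controller"
)

func main() {
	// Build a rest.Config however the calling component normally does; the
	// kubeconfig path here is a placeholder.
	cfg, err := clientcmd.BuildConfigFromFlags("", "/var/run/kubernetes/controller-manager.kubeconfig")
	if err != nil {
		glog.Fatal(err)
	}

	// SimpleControllerClientBuilder clones the base config for each controller
	// and stamps the controller name into the client's user agent.
	builder := controller.SimpleControllerClientBuilder{ClientConfig: cfg}
	client := builder.ClientOrDie("deployment-controller")
	_ = client // pass the clientset to the controller's constructor
}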
diff --git a/vendor/k8s.io/kubernetes/pkg/controller/client_builder.go b/vendor/k8s.io/kubernetes/pkg/controller/client_builder.go new file mode 100644 index 000000000..491366288 --- /dev/null +++ b/vendor/k8s.io/kubernetes/pkg/controller/client_builder.go @@ -0,0 +1,292 @@ +/* +Copyright 2016 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package controller + +import ( + "fmt" + "time" + + apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/fields" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/watch" + apiserverserviceaccount "k8s.io/apiserver/pkg/authentication/serviceaccount" + clientgoclientset "k8s.io/client-go/kubernetes" + restclient "k8s.io/client-go/rest" + "k8s.io/client-go/tools/cache" + "k8s.io/kubernetes/pkg/api" + "k8s.io/kubernetes/pkg/api/v1" + v1authenticationapi "k8s.io/kubernetes/pkg/apis/authentication/v1" + "k8s.io/kubernetes/pkg/client/clientset_generated/clientset" + v1authentication "k8s.io/kubernetes/pkg/client/clientset_generated/clientset/typed/authentication/v1" + v1core "k8s.io/kubernetes/pkg/client/clientset_generated/clientset/typed/core/v1" + "k8s.io/kubernetes/pkg/serviceaccount" + + "github.com/golang/glog" +) + +// ControllerClientBuilder allows you to get clients and configs for controllers +type ControllerClientBuilder interface { + Config(name string) (*restclient.Config, error) + ConfigOrDie(name string) *restclient.Config + Client(name string) (clientset.Interface, error) + ClientOrDie(name string) clientset.Interface + ClientGoClient(name string) (clientgoclientset.Interface, error) + ClientGoClientOrDie(name string) clientgoclientset.Interface +} + +// SimpleControllerClientBuilder returns a fixed client with different user agents +type SimpleControllerClientBuilder struct { + // ClientConfig is a skeleton config to clone and use as the basis for each controller client + ClientConfig *restclient.Config +} + +func (b SimpleControllerClientBuilder) Config(name string) (*restclient.Config, error) { + clientConfig := *b.ClientConfig + return restclient.AddUserAgent(&clientConfig, name), nil +} + +func (b SimpleControllerClientBuilder) ConfigOrDie(name string) *restclient.Config { + clientConfig, err := b.Config(name) + if err != nil { + glog.Fatal(err) + } + return clientConfig +} + +func (b SimpleControllerClientBuilder) Client(name string) (clientset.Interface, error) { + clientConfig, err := b.Config(name) + if err != nil { + return nil, err + } + return clientset.NewForConfig(clientConfig) +} + +func (b SimpleControllerClientBuilder) ClientOrDie(name string) clientset.Interface { + client, err := b.Client(name) + if err != nil { + glog.Fatal(err) + } + return client +} + +func (b SimpleControllerClientBuilder) ClientGoClient(name string) (clientgoclientset.Interface, error) { + clientConfig, err := b.Config(name) + if err != nil { + return nil, err + } + return clientgoclientset.NewForConfig(clientConfig) +} + +func (b SimpleControllerClientBuilder) 
ClientGoClientOrDie(name string) clientgoclientset.Interface { + client, err := b.ClientGoClient(name) + if err != nil { + glog.Fatal(err) + } + return client +} + +// SAControllerClientBuilder is a ControllerClientBuilder that returns clients identifying as +// service accounts +type SAControllerClientBuilder struct { + // ClientConfig is a skeleton config to clone and use as the basis for each controller client + ClientConfig *restclient.Config + + // CoreClient is used to provision service accounts if needed and watch for their associated tokens + // to construct a controller client + CoreClient v1core.CoreV1Interface + + // AuthenticationClient is used to check API tokens to make sure they are valid before + // building a controller client from them + AuthenticationClient v1authentication.AuthenticationV1Interface + + // Namespace is the namespace used to host the service accounts that will back the + // controllers. It must be highly privileged namespace which normal users cannot inspect. + Namespace string +} + +// config returns a complete clientConfig for constructing clients. This is separate in anticipation of composition +// which means that not all clientsets are known here +func (b SAControllerClientBuilder) Config(name string) (*restclient.Config, error) { + sa, err := b.getOrCreateServiceAccount(name) + if err != nil { + return nil, err + } + + var clientConfig *restclient.Config + + lw := &cache.ListWatch{ + ListFunc: func(options metav1.ListOptions) (runtime.Object, error) { + options.FieldSelector = fields.SelectorFromSet(map[string]string{api.SecretTypeField: string(v1.SecretTypeServiceAccountToken)}).String() + return b.CoreClient.Secrets(b.Namespace).List(options) + }, + WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { + options.FieldSelector = fields.SelectorFromSet(map[string]string{api.SecretTypeField: string(v1.SecretTypeServiceAccountToken)}).String() + return b.CoreClient.Secrets(b.Namespace).Watch(options) + }, + } + _, err = cache.ListWatchUntil(30*time.Second, lw, + func(event watch.Event) (bool, error) { + switch event.Type { + case watch.Deleted: + return false, nil + case watch.Error: + return false, fmt.Errorf("error watching") + + case watch.Added, watch.Modified: + secret, ok := event.Object.(*v1.Secret) + if !ok { + return false, fmt.Errorf("unexpected object type: %T", event.Object) + } + if !serviceaccount.IsServiceAccountToken(secret, sa) { + return false, nil + } + if len(secret.Data[v1.ServiceAccountTokenKey]) == 0 { + return false, nil + } + validConfig, valid, err := b.getAuthenticatedConfig(sa, string(secret.Data[v1.ServiceAccountTokenKey])) + if err != nil { + glog.Warningf("error validating API token for %s/%s in secret %s: %v", sa.Name, sa.Namespace, secret.Name, err) + // continue watching for good tokens + return false, nil + } + if !valid { + glog.Warningf("secret %s contained an invalid API token for %s/%s", secret.Name, sa.Name, sa.Namespace) + // try to delete the secret containing the invalid token + if err := b.CoreClient.Secrets(secret.Namespace).Delete(secret.Name, &metav1.DeleteOptions{}); err != nil && !apierrors.IsNotFound(err) { + glog.Warningf("error deleting secret %s containing invalid API token for %s/%s: %v", secret.Name, sa.Name, sa.Namespace, err) + } + // continue watching for good tokens + return false, nil + } + clientConfig = validConfig + return true, nil + + default: + return false, fmt.Errorf("unexpected event type: %v", event.Type) + } + }) + if err != nil { + return nil, fmt.Errorf("unable 
to get token for service account: %v", err) + } + + return clientConfig, nil +} + +func (b SAControllerClientBuilder) getOrCreateServiceAccount(name string) (*v1.ServiceAccount, error) { + sa, err := b.CoreClient.ServiceAccounts(b.Namespace).Get(name, metav1.GetOptions{}) + if err == nil { + return sa, nil + } + if !apierrors.IsNotFound(err) { + return nil, err + } + + // Create the namespace if we can't verify it exists. + // Tolerate errors, since we don't know whether this component has namespace creation permissions. + if _, err := b.CoreClient.Namespaces().Get(b.Namespace, metav1.GetOptions{}); err != nil { + b.CoreClient.Namespaces().Create(&v1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: b.Namespace}}) + } + + // Create the service account + sa, err = b.CoreClient.ServiceAccounts(b.Namespace).Create(&v1.ServiceAccount{ObjectMeta: metav1.ObjectMeta{Namespace: b.Namespace, Name: name}}) + if apierrors.IsAlreadyExists(err) { + // If we're racing to init and someone else already created it, re-fetch + return b.CoreClient.ServiceAccounts(b.Namespace).Get(name, metav1.GetOptions{}) + } + return sa, err +} + +func (b SAControllerClientBuilder) getAuthenticatedConfig(sa *v1.ServiceAccount, token string) (*restclient.Config, bool, error) { + username := apiserverserviceaccount.MakeUsername(sa.Namespace, sa.Name) + + clientConfig := restclient.AnonymousClientConfig(b.ClientConfig) + clientConfig.BearerToken = token + restclient.AddUserAgent(clientConfig, username) + + // Try token review first + tokenReview := &v1authenticationapi.TokenReview{Spec: v1authenticationapi.TokenReviewSpec{Token: token}} + if tokenResult, err := b.AuthenticationClient.TokenReviews().Create(tokenReview); err == nil { + if !tokenResult.Status.Authenticated { + glog.Warningf("Token for %s/%s did not authenticate correctly", sa.Name, sa.Namespace) + return nil, false, nil + } + if tokenResult.Status.User.Username != username { + glog.Warningf("Token for %s/%s authenticated as unexpected username: %s", sa.Name, sa.Namespace, tokenResult.Status.User.Username) + return nil, false, nil + } + glog.V(4).Infof("Verified credential for %s/%s", sa.Name, sa.Namespace) + return clientConfig, true, nil + } + + // If we couldn't run the token review, the API might be disabled or we might not have permission. + // Try to make a request to /apis with the token. If we get a 401 we should consider the token invalid. 
+ clientConfigCopy := *clientConfig + clientConfigCopy.NegotiatedSerializer = api.Codecs + client, err := restclient.UnversionedRESTClientFor(&clientConfigCopy) + if err != nil { + return nil, false, err + } + err = client.Get().AbsPath("/apis").Do().Error() + if apierrors.IsUnauthorized(err) { + glog.Warningf("Token for %s/%s did not authenticate correctly: %v", sa.Name, sa.Namespace, err) + return nil, false, nil + } + + return clientConfig, true, nil +} + +func (b SAControllerClientBuilder) ConfigOrDie(name string) *restclient.Config { + clientConfig, err := b.Config(name) + if err != nil { + glog.Fatal(err) + } + return clientConfig +} + +func (b SAControllerClientBuilder) Client(name string) (clientset.Interface, error) { + clientConfig, err := b.Config(name) + if err != nil { + return nil, err + } + return clientset.NewForConfig(clientConfig) +} + +func (b SAControllerClientBuilder) ClientOrDie(name string) clientset.Interface { + client, err := b.Client(name) + if err != nil { + glog.Fatal(err) + } + return client +} + +func (b SAControllerClientBuilder) ClientGoClient(name string) (clientgoclientset.Interface, error) { + clientConfig, err := b.Config(name) + if err != nil { + return nil, err + } + return clientgoclientset.NewForConfig(clientConfig) +} + +func (b SAControllerClientBuilder) ClientGoClientOrDie(name string) clientgoclientset.Interface { + client, err := b.ClientGoClient(name) + if err != nil { + glog.Fatal(err) + } + return client +} diff --git a/vendor/k8s.io/kubernetes/pkg/controller/controller_ref_manager.go b/vendor/k8s.io/kubernetes/pkg/controller/controller_ref_manager.go new file mode 100644 index 000000000..5477a073f --- /dev/null +++ b/vendor/k8s.io/kubernetes/pkg/controller/controller_ref_manager.go @@ -0,0 +1,515 @@ +/* +Copyright 2016 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package controller + +import ( + "fmt" + "sync" + + "github.com/golang/glog" + "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/runtime/schema" + utilerrors "k8s.io/apimachinery/pkg/util/errors" + "k8s.io/kubernetes/pkg/api/v1" + appsv1beta1 "k8s.io/kubernetes/pkg/apis/apps/v1beta1" + extensions "k8s.io/kubernetes/pkg/apis/extensions/v1beta1" +) + +// GetControllerOf returns the controllerRef if controllee has a controller, +// otherwise returns nil. 
+func GetControllerOf(controllee metav1.Object) *metav1.OwnerReference { + ownerRefs := controllee.GetOwnerReferences() + for i := range ownerRefs { + owner := &ownerRefs[i] + if owner.Controller != nil && *owner.Controller == true { + return owner + } + } + return nil +} + +type baseControllerRefManager struct { + controller metav1.Object + selector labels.Selector + + canAdoptErr error + canAdoptOnce sync.Once + canAdoptFunc func() error +} + +func (m *baseControllerRefManager) canAdopt() error { + m.canAdoptOnce.Do(func() { + if m.canAdoptFunc != nil { + m.canAdoptErr = m.canAdoptFunc() + } + }) + return m.canAdoptErr +} + +// claimObject tries to take ownership of an object for this controller. +// +// It will reconcile the following: +// * Adopt orphans if the match function returns true. +// * Release owned objects if the match function returns false. +// +// A non-nil error is returned if some form of reconciliation was attemped and +// failed. Usually, controllers should try again later in case reconciliation +// is still needed. +// +// If the error is nil, either the reconciliation succeeded, or no +// reconciliation was necessary. The returned boolean indicates whether you now +// own the object. +// +// No reconciliation will be attempted if the controller is being deleted. +func (m *baseControllerRefManager) claimObject(obj metav1.Object, match func(metav1.Object) bool, adopt, release func(metav1.Object) error) (bool, error) { + controllerRef := GetControllerOf(obj) + if controllerRef != nil { + if controllerRef.UID != m.controller.GetUID() { + // Owned by someone else. Ignore. + return false, nil + } + if match(obj) { + // We already own it and the selector matches. + // Return true (successfully claimed) before checking deletion timestamp. + // We're still allowed to claim things we already own while being deleted + // because doing so requires taking no actions. + return true, nil + } + // Owned by us but selector doesn't match. + // Try to release, unless we're being deleted. + if m.controller.GetDeletionTimestamp() != nil { + return false, nil + } + if err := release(obj); err != nil { + // If the pod no longer exists, ignore the error. + if errors.IsNotFound(err) { + return false, nil + } + // Either someone else released it, or there was a transient error. + // The controller should requeue and try again if it's still stale. + return false, err + } + // Successfully released. + return false, nil + } + + // It's an orphan. + if m.controller.GetDeletionTimestamp() != nil || !match(obj) { + // Ignore if we're being deleted or selector doesn't match. + return false, nil + } + if obj.GetDeletionTimestamp() != nil { + // Ignore if the object is being deleted + return false, nil + } + // Selector matches. Try to adopt. + if err := adopt(obj); err != nil { + // If the pod no longer exists, ignore the error. + if errors.IsNotFound(err) { + return false, nil + } + // Either someone else claimed it first, or there was a transient error. + // The controller should requeue and try again if it's still orphaned. + return false, err + } + // Successfully adopted. + return true, nil +} + +type PodControllerRefManager struct { + baseControllerRefManager + controllerKind schema.GroupVersionKind + podControl PodControlInterface +} + +// NewPodControllerRefManager returns a PodControllerRefManager that exposes +// methods to manage the controllerRef of pods. 
+// +// The canAdopt() function can be used to perform a potentially expensive check +// (such as a live GET from the API server) prior to the first adoption. +// It will only be called (at most once) if an adoption is actually attempted. +// If canAdopt() returns a non-nil error, all adoptions will fail. +// +// NOTE: Once canAdopt() is called, it will not be called again by the same +// PodControllerRefManager instance. Create a new instance if it makes +// sense to check canAdopt() again (e.g. in a different sync pass). +func NewPodControllerRefManager( + podControl PodControlInterface, + controller metav1.Object, + selector labels.Selector, + controllerKind schema.GroupVersionKind, + canAdopt func() error, +) *PodControllerRefManager { + return &PodControllerRefManager{ + baseControllerRefManager: baseControllerRefManager{ + controller: controller, + selector: selector, + canAdoptFunc: canAdopt, + }, + controllerKind: controllerKind, + podControl: podControl, + } +} + +// ClaimPods tries to take ownership of a list of Pods. +// +// It will reconcile the following: +// * Adopt orphans if the selector matches. +// * Release owned objects if the selector no longer matches. +// +// Optional: If one or more filters are specified, a Pod will only be claimed if +// all filters return true. +// +// A non-nil error is returned if some form of reconciliation was attemped and +// failed. Usually, controllers should try again later in case reconciliation +// is still needed. +// +// If the error is nil, either the reconciliation succeeded, or no +// reconciliation was necessary. The list of Pods that you now own is returned. +func (m *PodControllerRefManager) ClaimPods(pods []*v1.Pod, filters ...func(*v1.Pod) bool) ([]*v1.Pod, error) { + var claimed []*v1.Pod + var errlist []error + + match := func(obj metav1.Object) bool { + pod := obj.(*v1.Pod) + // Check selector first so filters only run on potentially matching Pods. + if !m.selector.Matches(labels.Set(pod.Labels)) { + return false + } + for _, filter := range filters { + if !filter(pod) { + return false + } + } + return true + } + adopt := func(obj metav1.Object) error { + return m.AdoptPod(obj.(*v1.Pod)) + } + release := func(obj metav1.Object) error { + return m.ReleasePod(obj.(*v1.Pod)) + } + + for _, pod := range pods { + ok, err := m.claimObject(pod, match, adopt, release) + if err != nil { + errlist = append(errlist, err) + continue + } + if ok { + claimed = append(claimed, pod) + } + } + return claimed, utilerrors.NewAggregate(errlist) +} + +// AdoptPod sends a patch to take control of the pod. It returns the error if +// the patching fails. +func (m *PodControllerRefManager) AdoptPod(pod *v1.Pod) error { + if err := m.canAdopt(); err != nil { + return fmt.Errorf("can't adopt Pod %v/%v (%v): %v", pod.Namespace, pod.Name, pod.UID, err) + } + // Note that ValidateOwnerReferences() will reject this patch if another + // OwnerReference exists with controller=true. + addControllerPatch := fmt.Sprintf( + `{"metadata":{"ownerReferences":[{"apiVersion":"%s","kind":"%s","name":"%s","uid":"%s","controller":true,"blockOwnerDeletion":true}],"uid":"%s"}}`, + m.controllerKind.GroupVersion(), m.controllerKind.Kind, + m.controller.GetName(), m.controller.GetUID(), pod.UID) + return m.podControl.PatchPod(pod.Namespace, pod.Name, []byte(addControllerPatch)) +} + +// ReleasePod sends a patch to free the pod from the control of the controller. +// It returns the error if the patching fails. 404 and 422 errors are ignored. 
+func (m *PodControllerRefManager) ReleasePod(pod *v1.Pod) error { + glog.V(2).Infof("patching pod %s_%s to remove its controllerRef to %s/%s:%s", + pod.Namespace, pod.Name, m.controllerKind.GroupVersion(), m.controllerKind.Kind, m.controller.GetName()) + deleteOwnerRefPatch := fmt.Sprintf(`{"metadata":{"ownerReferences":[{"$patch":"delete","uid":"%s"}],"uid":"%s"}}`, m.controller.GetUID(), pod.UID) + err := m.podControl.PatchPod(pod.Namespace, pod.Name, []byte(deleteOwnerRefPatch)) + if err != nil { + if errors.IsNotFound(err) { + // If the pod no longer exists, ignore it. + return nil + } + if errors.IsInvalid(err) { + // Invalid error will be returned in two cases: 1. the pod + // has no owner reference, 2. the uid of the pod doesn't + // match, which means the pod is deleted and then recreated. + // In both cases, the error can be ignored. + + // TODO: If the pod has owner references, but none of them + // has the owner.UID, server will silently ignore the patch. + // Investigate why. + return nil + } + } + return err +} + +// ReplicaSetControllerRefManager is used to manage controllerRef of ReplicaSets. +// Three methods are defined on this object 1: Classify 2: AdoptReplicaSet and +// 3: ReleaseReplicaSet which are used to classify the ReplicaSets into appropriate +// categories and accordingly adopt or release them. See comments on these functions +// for more details. +type ReplicaSetControllerRefManager struct { + baseControllerRefManager + controllerKind schema.GroupVersionKind + rsControl RSControlInterface +} + +// NewReplicaSetControllerRefManager returns a ReplicaSetControllerRefManager that exposes +// methods to manage the controllerRef of ReplicaSets. +// +// The canAdopt() function can be used to perform a potentially expensive check +// (such as a live GET from the API server) prior to the first adoption. +// It will only be called (at most once) if an adoption is actually attempted. +// If canAdopt() returns a non-nil error, all adoptions will fail. +// +// NOTE: Once canAdopt() is called, it will not be called again by the same +// ReplicaSetControllerRefManager instance. Create a new instance if it +// makes sense to check canAdopt() again (e.g. in a different sync pass). +func NewReplicaSetControllerRefManager( + rsControl RSControlInterface, + controller metav1.Object, + selector labels.Selector, + controllerKind schema.GroupVersionKind, + canAdopt func() error, +) *ReplicaSetControllerRefManager { + return &ReplicaSetControllerRefManager{ + baseControllerRefManager: baseControllerRefManager{ + controller: controller, + selector: selector, + canAdoptFunc: canAdopt, + }, + controllerKind: controllerKind, + rsControl: rsControl, + } +} + +// ClaimReplicaSets tries to take ownership of a list of ReplicaSets. +// +// It will reconcile the following: +// * Adopt orphans if the selector matches. +// * Release owned objects if the selector no longer matches. +// +// A non-nil error is returned if some form of reconciliation was attemped and +// failed. Usually, controllers should try again later in case reconciliation +// is still needed. +// +// If the error is nil, either the reconciliation succeeded, or no +// reconciliation was necessary. The list of ReplicaSets that you now own is +// returned. 
+func (m *ReplicaSetControllerRefManager) ClaimReplicaSets(sets []*extensions.ReplicaSet) ([]*extensions.ReplicaSet, error) { + var claimed []*extensions.ReplicaSet + var errlist []error + + match := func(obj metav1.Object) bool { + return m.selector.Matches(labels.Set(obj.GetLabels())) + } + adopt := func(obj metav1.Object) error { + return m.AdoptReplicaSet(obj.(*extensions.ReplicaSet)) + } + release := func(obj metav1.Object) error { + return m.ReleaseReplicaSet(obj.(*extensions.ReplicaSet)) + } + + for _, rs := range sets { + ok, err := m.claimObject(rs, match, adopt, release) + if err != nil { + errlist = append(errlist, err) + continue + } + if ok { + claimed = append(claimed, rs) + } + } + return claimed, utilerrors.NewAggregate(errlist) +} + +// AdoptReplicaSet sends a patch to take control of the ReplicaSet. It returns +// the error if the patching fails. +func (m *ReplicaSetControllerRefManager) AdoptReplicaSet(rs *extensions.ReplicaSet) error { + if err := m.canAdopt(); err != nil { + return fmt.Errorf("can't adopt ReplicaSet %v/%v (%v): %v", rs.Namespace, rs.Name, rs.UID, err) + } + // Note that ValidateOwnerReferences() will reject this patch if another + // OwnerReference exists with controller=true. + addControllerPatch := fmt.Sprintf( + `{"metadata":{"ownerReferences":[{"apiVersion":"%s","kind":"%s","name":"%s","uid":"%s","controller":true,"blockOwnerDeletion":true}],"uid":"%s"}}`, + m.controllerKind.GroupVersion(), m.controllerKind.Kind, + m.controller.GetName(), m.controller.GetUID(), rs.UID) + return m.rsControl.PatchReplicaSet(rs.Namespace, rs.Name, []byte(addControllerPatch)) +} + +// ReleaseReplicaSet sends a patch to free the ReplicaSet from the control of the Deployment controller. +// It returns the error if the patching fails. 404 and 422 errors are ignored. +func (m *ReplicaSetControllerRefManager) ReleaseReplicaSet(replicaSet *extensions.ReplicaSet) error { + glog.V(2).Infof("patching ReplicaSet %s_%s to remove its controllerRef to %s/%s:%s", + replicaSet.Namespace, replicaSet.Name, m.controllerKind.GroupVersion(), m.controllerKind.Kind, m.controller.GetName()) + deleteOwnerRefPatch := fmt.Sprintf(`{"metadata":{"ownerReferences":[{"$patch":"delete","uid":"%s"}],"uid":"%s"}}`, m.controller.GetUID(), replicaSet.UID) + err := m.rsControl.PatchReplicaSet(replicaSet.Namespace, replicaSet.Name, []byte(deleteOwnerRefPatch)) + if err != nil { + if errors.IsNotFound(err) { + // If the ReplicaSet no longer exists, ignore it. + return nil + } + if errors.IsInvalid(err) { + // Invalid error will be returned in two cases: 1. the ReplicaSet + // has no owner reference, 2. the uid of the ReplicaSet doesn't + // match, which means the ReplicaSet is deleted and then recreated. + // In both cases, the error can be ignored. + return nil + } + } + return err +} + +// RecheckDeletionTimestamp returns a canAdopt() function to recheck deletion. +// +// The canAdopt() function calls getObject() to fetch the latest value, +// and denies adoption attempts if that object has a non-nil DeletionTimestamp. 
+func RecheckDeletionTimestamp(getObject func() (metav1.Object, error)) func() error { + return func() error { + obj, err := getObject() + if err != nil { + return fmt.Errorf("can't recheck DeletionTimestamp: %v", err) + } + if obj.GetDeletionTimestamp() != nil { + return fmt.Errorf("%v/%v has just been deleted at %v", obj.GetNamespace(), obj.GetName(), obj.GetDeletionTimestamp()) + } + return nil + } +} + +// ControllerRevisionControllerRefManager is used to manage controllerRef of ControllerRevisions. +// Three methods are defined on this object 1: Classify 2: AdoptControllerRevision and +// 3: ReleaseControllerRevision which are used to classify the ControllerRevisions into appropriate +// categories and accordingly adopt or release them. See comments on these functions +// for more details. +type ControllerRevisionControllerRefManager struct { + baseControllerRefManager + controllerKind schema.GroupVersionKind + crControl ControllerRevisionControlInterface +} + +// NewControllerRevisionControllerRefManager returns a ControllerRevisionControllerRefManager that exposes +// methods to manage the controllerRef of ControllerRevisions. +// +// The canAdopt() function can be used to perform a potentially expensive check +// (such as a live GET from the API server) prior to the first adoption. +// It will only be called (at most once) if an adoption is actually attempted. +// If canAdopt() returns a non-nil error, all adoptions will fail. +// +// NOTE: Once canAdopt() is called, it will not be called again by the same +// ControllerRevisionControllerRefManager instance. Create a new instance if it +// makes sense to check canAdopt() again (e.g. in a different sync pass). +func NewControllerRevisionControllerRefManager( + crControl ControllerRevisionControlInterface, + controller metav1.Object, + selector labels.Selector, + controllerKind schema.GroupVersionKind, + canAdopt func() error, +) *ControllerRevisionControllerRefManager { + return &ControllerRevisionControllerRefManager{ + baseControllerRefManager: baseControllerRefManager{ + controller: controller, + selector: selector, + canAdoptFunc: canAdopt, + }, + controllerKind: controllerKind, + crControl: crControl, + } +} + +// ClaimControllerRevisions tries to take ownership of a list of ControllerRevisions. +// +// It will reconcile the following: +// * Adopt orphans if the selector matches. +// * Release owned objects if the selector no longer matches. +// +// A non-nil error is returned if some form of reconciliation was attemped and +// failed. Usually, controllers should try again later in case reconciliation +// is still needed. +// +// If the error is nil, either the reconciliation succeeded, or no +// reconciliation was necessary. The list of ControllerRevisions that you now own is +// returned. 
+func (m *ControllerRevisionControllerRefManager) ClaimControllerRevisions(histories []*appsv1beta1.ControllerRevision) ([]*appsv1beta1.ControllerRevision, error) { + var claimed []*appsv1beta1.ControllerRevision + var errlist []error + + match := func(obj metav1.Object) bool { + return m.selector.Matches(labels.Set(obj.GetLabels())) + } + adopt := func(obj metav1.Object) error { + return m.AdoptControllerRevision(obj.(*appsv1beta1.ControllerRevision)) + } + release := func(obj metav1.Object) error { + return m.ReleaseControllerRevision(obj.(*appsv1beta1.ControllerRevision)) + } + + for _, h := range histories { + ok, err := m.claimObject(h, match, adopt, release) + if err != nil { + errlist = append(errlist, err) + continue + } + if ok { + claimed = append(claimed, h) + } + } + return claimed, utilerrors.NewAggregate(errlist) +} + +// AdoptControllerRevision sends a patch to take control of the ControllerRevision. It returns the error if +// the patching fails. +func (m *ControllerRevisionControllerRefManager) AdoptControllerRevision(history *appsv1beta1.ControllerRevision) error { + if err := m.canAdopt(); err != nil { + return fmt.Errorf("can't adopt ControllerRevision %v/%v (%v): %v", history.Namespace, history.Name, history.UID, err) + } + // Note that ValidateOwnerReferences() will reject this patch if another + // OwnerReference exists with controller=true. + addControllerPatch := fmt.Sprintf( + `{"metadata":{"ownerReferences":[{"apiVersion":"%s","kind":"%s","name":"%s","uid":"%s","controller":true,"blockOwnerDeletion":true}],"uid":"%s"}}`, + m.controllerKind.GroupVersion(), m.controllerKind.Kind, + m.controller.GetName(), m.controller.GetUID(), history.UID) + return m.crControl.PatchControllerRevision(history.Namespace, history.Name, []byte(addControllerPatch)) +} + +// ReleaseControllerRevision sends a patch to free the ControllerRevision from the control of its controller. +// It returns the error if the patching fails. 404 and 422 errors are ignored. +func (m *ControllerRevisionControllerRefManager) ReleaseControllerRevision(history *appsv1beta1.ControllerRevision) error { + glog.V(2).Infof("patching ControllerRevision %s_%s to remove its controllerRef to %s/%s:%s", + history.Namespace, history.Name, m.controllerKind.GroupVersion(), m.controllerKind.Kind, m.controller.GetName()) + deleteOwnerRefPatch := fmt.Sprintf(`{"metadata":{"ownerReferences":[{"$patch":"delete","uid":"%s"}],"uid":"%s"}}`, m.controller.GetUID(), history.UID) + err := m.crControl.PatchControllerRevision(history.Namespace, history.Name, []byte(deleteOwnerRefPatch)) + if err != nil { + if errors.IsNotFound(err) { + // If the ControllerRevision no longer exists, ignore it. + return nil + } + if errors.IsInvalid(err) { + // Invalid error will be returned in two cases: 1. the ControllerRevision + // has no owner reference, 2. the uid of the ControllerRevision doesn't + // match, which means the ControllerRevision is deleted and then recreated. + // In both cases, the error can be ignored. + return nil + } + } + return err +} diff --git a/vendor/k8s.io/kubernetes/pkg/controller/controller_utils.go b/vendor/k8s.io/kubernetes/pkg/controller/controller_utils.go new file mode 100644 index 000000000..9f1a17767 --- /dev/null +++ b/vendor/k8s.io/kubernetes/pkg/controller/controller_utils.go @@ -0,0 +1,1018 @@ +/* +Copyright 2014 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. 
+You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package controller + +import ( + "encoding/binary" + "encoding/json" + "fmt" + "hash/fnv" + "sync" + "sync/atomic" + "time" + + "k8s.io/apimachinery/pkg/api/meta" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/clock" + utilruntime "k8s.io/apimachinery/pkg/util/runtime" + "k8s.io/apimachinery/pkg/util/sets" + "k8s.io/apimachinery/pkg/util/strategicpatch" + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/client-go/tools/cache" + "k8s.io/client-go/tools/record" + "k8s.io/client-go/util/integer" + "k8s.io/kubernetes/pkg/api" + "k8s.io/kubernetes/pkg/api/v1" + v1helper "k8s.io/kubernetes/pkg/api/v1/helper" + podutil "k8s.io/kubernetes/pkg/api/v1/pod" + "k8s.io/kubernetes/pkg/api/v1/ref" + "k8s.io/kubernetes/pkg/api/validation" + extensions "k8s.io/kubernetes/pkg/apis/extensions/v1beta1" + "k8s.io/kubernetes/pkg/client/clientset_generated/clientset" + clientretry "k8s.io/kubernetes/pkg/client/retry" + hashutil "k8s.io/kubernetes/pkg/util/hash" + + "github.com/golang/glog" +) + +const ( + // If a watch drops a delete event for a pod, it'll take this long + // before a dormant controller waiting for those packets is woken up anyway. It is + // specifically targeted at the case where some problem prevents an update + // of expectations, without it the controller could stay asleep forever. This should + // be set based on the expected latency of watch events. + // + // Currently a controller can service (create *and* observe the watch events for said + // creation) about 10 pods a second, so it takes about 1 min to service + // 500 pods. Just creation is limited to 20qps, and watching happens with ~10-30s + // latency/pod at the scale of 3000 pods over 100 nodes. + ExpectationsTimeout = 5 * time.Minute +) + +var UpdateTaintBackoff = wait.Backoff{ + Steps: 5, + Duration: 100 * time.Millisecond, + Jitter: 1.0, +} + +var ( + KeyFunc = cache.DeletionHandlingMetaNamespaceKeyFunc +) + +type ResyncPeriodFunc func() time.Duration + +// Returns 0 for resyncPeriod in case resyncing is not needed. +func NoResyncPeriodFunc() time.Duration { + return 0 +} + +// StaticResyncPeriodFunc returns the resync period specified +func StaticResyncPeriodFunc(resyncPeriod time.Duration) ResyncPeriodFunc { + return func() time.Duration { + return resyncPeriod + } +} + +// Expectations are a way for controllers to tell the controller manager what they expect. 
eg: +// ControllerExpectations: { +// controller1: expects 2 adds in 2 minutes +// controller2: expects 2 dels in 2 minutes +// controller3: expects -1 adds in 2 minutes => controller3's expectations have already been met +// } +// +// Implementation: +// ControlleeExpectation = pair of atomic counters to track controllee's creation/deletion +// ControllerExpectationsStore = TTLStore + a ControlleeExpectation per controller +// +// * Once set expectations can only be lowered +// * A controller isn't synced till its expectations are either fulfilled, or expire +// * Controllers that don't set expectations will get woken up for every matching controllee + +// ExpKeyFunc to parse out the key from a ControlleeExpectation +var ExpKeyFunc = func(obj interface{}) (string, error) { + if e, ok := obj.(*ControlleeExpectations); ok { + return e.key, nil + } + return "", fmt.Errorf("Could not find key for obj %#v", obj) +} + +// ControllerExpectationsInterface is an interface that allows users to set and wait on expectations. +// Only abstracted out for testing. +// Warning: if using KeyFunc it is not safe to use a single ControllerExpectationsInterface with different +// types of controllers, because the keys might conflict across types. +type ControllerExpectationsInterface interface { + GetExpectations(controllerKey string) (*ControlleeExpectations, bool, error) + SatisfiedExpectations(controllerKey string) bool + DeleteExpectations(controllerKey string) + SetExpectations(controllerKey string, add, del int) error + ExpectCreations(controllerKey string, adds int) error + ExpectDeletions(controllerKey string, dels int) error + CreationObserved(controllerKey string) + DeletionObserved(controllerKey string) + RaiseExpectations(controllerKey string, add, del int) + LowerExpectations(controllerKey string, add, del int) +} + +// ControllerExpectations is a cache mapping controllers to what they expect to see before being woken up for a sync. +type ControllerExpectations struct { + cache.Store +} + +// GetExpectations returns the ControlleeExpectations of the given controller. +func (r *ControllerExpectations) GetExpectations(controllerKey string) (*ControlleeExpectations, bool, error) { + if exp, exists, err := r.GetByKey(controllerKey); err == nil && exists { + return exp.(*ControlleeExpectations), true, nil + } else { + return nil, false, err + } +} + +// DeleteExpectations deletes the expectations of the given controller from the TTLStore. +func (r *ControllerExpectations) DeleteExpectations(controllerKey string) { + if exp, exists, err := r.GetByKey(controllerKey); err == nil && exists { + if err := r.Delete(exp); err != nil { + glog.V(2).Infof("Error deleting expectations for controller %v: %v", controllerKey, err) + } + } +} + +// SatisfiedExpectations returns true if the required adds/dels for the given controller have been observed. +// Add/del counts are established by the controller at sync time, and updated as controllees are observed by the controller +// manager. 
+func (r *ControllerExpectations) SatisfiedExpectations(controllerKey string) bool { + if exp, exists, err := r.GetExpectations(controllerKey); exists { + if exp.Fulfilled() { + glog.V(4).Infof("Controller expectations fulfilled %#v", exp) + return true + } else if exp.isExpired() { + glog.V(4).Infof("Controller expectations expired %#v", exp) + return true + } else { + glog.V(4).Infof("Controller still waiting on expectations %#v", exp) + return false + } + } else if err != nil { + glog.V(2).Infof("Error encountered while checking expectations %#v, forcing sync", err) + } else { + // When a new controller is created, it doesn't have expectations. + // When it doesn't see expected watch events for > TTL, the expectations expire. + // - In this case it wakes up, creates/deletes controllees, and sets expectations again. + // When it has satisfied expectations and no controllees need to be created/destroyed > TTL, the expectations expire. + // - In this case it continues without setting expectations till it needs to create/delete controllees. + glog.V(4).Infof("Controller %v either never recorded expectations, or the ttl expired.", controllerKey) + } + // Trigger a sync if we either encountered and error (which shouldn't happen since we're + // getting from local store) or this controller hasn't established expectations. + return true +} + +// TODO: Extend ExpirationCache to support explicit expiration. +// TODO: Make this possible to disable in tests. +// TODO: Support injection of clock. +func (exp *ControlleeExpectations) isExpired() bool { + return clock.RealClock{}.Since(exp.timestamp) > ExpectationsTimeout +} + +// SetExpectations registers new expectations for the given controller. Forgets existing expectations. +func (r *ControllerExpectations) SetExpectations(controllerKey string, add, del int) error { + exp := &ControlleeExpectations{add: int64(add), del: int64(del), key: controllerKey, timestamp: clock.RealClock{}.Now()} + glog.V(4).Infof("Setting expectations %#v", exp) + return r.Add(exp) +} + +func (r *ControllerExpectations) ExpectCreations(controllerKey string, adds int) error { + return r.SetExpectations(controllerKey, adds, 0) +} + +func (r *ControllerExpectations) ExpectDeletions(controllerKey string, dels int) error { + return r.SetExpectations(controllerKey, 0, dels) +} + +// Decrements the expectation counts of the given controller. +func (r *ControllerExpectations) LowerExpectations(controllerKey string, add, del int) { + if exp, exists, err := r.GetExpectations(controllerKey); err == nil && exists { + exp.Add(int64(-add), int64(-del)) + // The expectations might've been modified since the update on the previous line. + glog.V(4).Infof("Lowered expectations %#v", exp) + } +} + +// Increments the expectation counts of the given controller. +func (r *ControllerExpectations) RaiseExpectations(controllerKey string, add, del int) { + if exp, exists, err := r.GetExpectations(controllerKey); err == nil && exists { + exp.Add(int64(add), int64(del)) + // The expectations might've been modified since the update on the previous line. + glog.V(4).Infof("Raised expectations %#v", exp) + } +} + +// CreationObserved atomically decrements the `add` expectation count of the given controller. +func (r *ControllerExpectations) CreationObserved(controllerKey string) { + r.LowerExpectations(controllerKey, 1, 0) +} + +// DeletionObserved atomically decrements the `del` expectation count of the given controller. 
+func (r *ControllerExpectations) DeletionObserved(controllerKey string) { + r.LowerExpectations(controllerKey, 0, 1) +} + +// Expectations are either fulfilled, or expire naturally. +type Expectations interface { + Fulfilled() bool +} + +// ControlleeExpectations track controllee creates/deletes. +type ControlleeExpectations struct { + // Important: Since these two int64 fields are using sync/atomic, they have to be at the top of the struct due to a bug on 32-bit platforms + // See: https://golang.org/pkg/sync/atomic/ for more information + add int64 + del int64 + key string + timestamp time.Time +} + +// Add increments the add and del counters. +func (e *ControlleeExpectations) Add(add, del int64) { + atomic.AddInt64(&e.add, add) + atomic.AddInt64(&e.del, del) +} + +// Fulfilled returns true if this expectation has been fulfilled. +func (e *ControlleeExpectations) Fulfilled() bool { + // TODO: think about why this line being atomic doesn't matter + return atomic.LoadInt64(&e.add) <= 0 && atomic.LoadInt64(&e.del) <= 0 +} + +// GetExpectations returns the add and del expectations of the controllee. +func (e *ControlleeExpectations) GetExpectations() (int64, int64) { + return atomic.LoadInt64(&e.add), atomic.LoadInt64(&e.del) +} + +// NewControllerExpectations returns a store for ControllerExpectations. +func NewControllerExpectations() *ControllerExpectations { + return &ControllerExpectations{cache.NewStore(ExpKeyFunc)} +} + +// UIDSetKeyFunc to parse out the key from a UIDSet. +var UIDSetKeyFunc = func(obj interface{}) (string, error) { + if u, ok := obj.(*UIDSet); ok { + return u.key, nil + } + return "", fmt.Errorf("Could not find key for obj %#v", obj) +} + +// UIDSet holds a key and a set of UIDs. Used by the +// UIDTrackingControllerExpectations to remember which UID it has seen/still +// waiting for. +type UIDSet struct { + sets.String + key string +} + +// UIDTrackingControllerExpectations tracks the UID of the pods it deletes. +// This cache is needed over plain old expectations to safely handle graceful +// deletion. The desired behavior is to treat an update that sets the +// DeletionTimestamp on an object as a delete. To do so consistently, one needs +// to remember the expected deletes so they aren't double counted. +// TODO: Track creates as well (#22599) +type UIDTrackingControllerExpectations struct { + ControllerExpectationsInterface + // TODO: There is a much nicer way to do this that involves a single store, + // a lock per entry, and a ControlleeExpectationsInterface type. + uidStoreLock sync.Mutex + // Store used for the UIDs associated with any expectation tracked via the + // ControllerExpectationsInterface. + uidStore cache.Store +} + +// GetUIDs is a convenience method to avoid exposing the set of expected uids. +// The returned set is not thread safe, all modifications must be made holding +// the uidStoreLock. +func (u *UIDTrackingControllerExpectations) GetUIDs(controllerKey string) sets.String { + if uid, exists, err := u.uidStore.GetByKey(controllerKey); err == nil && exists { + return uid.(*UIDSet).String + } + return nil +} + +// ExpectDeletions records expectations for the given deleteKeys, against the given controller. 
+func (u *UIDTrackingControllerExpectations) ExpectDeletions(rcKey string, deletedKeys []string) error { + u.uidStoreLock.Lock() + defer u.uidStoreLock.Unlock() + + if existing := u.GetUIDs(rcKey); existing != nil && existing.Len() != 0 { + glog.Errorf("Clobbering existing delete keys: %+v", existing) + } + expectedUIDs := sets.NewString() + for _, k := range deletedKeys { + expectedUIDs.Insert(k) + } + glog.V(4).Infof("Controller %v waiting on deletions for: %+v", rcKey, deletedKeys) + if err := u.uidStore.Add(&UIDSet{expectedUIDs, rcKey}); err != nil { + return err + } + return u.ControllerExpectationsInterface.ExpectDeletions(rcKey, expectedUIDs.Len()) +} + +// DeletionObserved records the given deleteKey as a deletion, for the given rc. +func (u *UIDTrackingControllerExpectations) DeletionObserved(rcKey, deleteKey string) { + u.uidStoreLock.Lock() + defer u.uidStoreLock.Unlock() + + uids := u.GetUIDs(rcKey) + if uids != nil && uids.Has(deleteKey) { + glog.V(4).Infof("Controller %v received delete for pod %v", rcKey, deleteKey) + u.ControllerExpectationsInterface.DeletionObserved(rcKey) + uids.Delete(deleteKey) + } +} + +// DeleteExpectations deletes the UID set and invokes DeleteExpectations on the +// underlying ControllerExpectationsInterface. +func (u *UIDTrackingControllerExpectations) DeleteExpectations(rcKey string) { + u.uidStoreLock.Lock() + defer u.uidStoreLock.Unlock() + + u.ControllerExpectationsInterface.DeleteExpectations(rcKey) + if uidExp, exists, err := u.uidStore.GetByKey(rcKey); err == nil && exists { + if err := u.uidStore.Delete(uidExp); err != nil { + glog.V(2).Infof("Error deleting uid expectations for controller %v: %v", rcKey, err) + } + } +} + +// NewUIDTrackingControllerExpectations returns a wrapper around +// ControllerExpectations that is aware of deleteKeys. +func NewUIDTrackingControllerExpectations(ce ControllerExpectationsInterface) *UIDTrackingControllerExpectations { + return &UIDTrackingControllerExpectations{ControllerExpectationsInterface: ce, uidStore: cache.NewStore(UIDSetKeyFunc)} +} + +// Reasons for pod events +const ( + // FailedCreatePodReason is added in an event and in a replica set condition + // when a pod for a replica set is failed to be created. + FailedCreatePodReason = "FailedCreate" + // SuccessfulCreatePodReason is added in an event when a pod for a replica set + // is successfully created. + SuccessfulCreatePodReason = "SuccessfulCreate" + // FailedDeletePodReason is added in an event and in a replica set condition + // when a pod for a replica set is failed to be deleted. + FailedDeletePodReason = "FailedDelete" + // SuccessfulDeletePodReason is added in an event when a pod for a replica set + // is successfully deleted. + SuccessfulDeletePodReason = "SuccessfulDelete" +) + +// RSControlInterface is an interface that knows how to add or delete +// ReplicaSets, as well as increment or decrement them. It is used +// by the deployment controller to ease testing of actions that it takes. +type RSControlInterface interface { + PatchReplicaSet(namespace, name string, data []byte) error +} + +// RealRSControl is the default implementation of RSControllerInterface. 
+type RealRSControl struct { + KubeClient clientset.Interface + Recorder record.EventRecorder +} + +var _ RSControlInterface = &RealRSControl{} + +func (r RealRSControl) PatchReplicaSet(namespace, name string, data []byte) error { + _, err := r.KubeClient.Extensions().ReplicaSets(namespace).Patch(name, types.StrategicMergePatchType, data) + return err +} + +// TODO: merge the controller revision interface in controller_history.go with this one +// ControllerRevisionControlInterface is an interface that knows how to patch +// ControllerRevisions, as well as increment or decrement them. It is used +// by the daemonset controller to ease testing of actions that it takes. +type ControllerRevisionControlInterface interface { + PatchControllerRevision(namespace, name string, data []byte) error +} + +// RealControllerRevisionControl is the default implementation of ControllerRevisionControlInterface. +type RealControllerRevisionControl struct { + KubeClient clientset.Interface +} + +var _ ControllerRevisionControlInterface = &RealControllerRevisionControl{} + +func (r RealControllerRevisionControl) PatchControllerRevision(namespace, name string, data []byte) error { + _, err := r.KubeClient.AppsV1beta1().ControllerRevisions(namespace).Patch(name, types.StrategicMergePatchType, data) + return err +} + +// PodControlInterface is an interface that knows how to add or delete pods +// created as an interface to allow testing. +type PodControlInterface interface { + // CreatePods creates new pods according to the spec. + CreatePods(namespace string, template *v1.PodTemplateSpec, object runtime.Object) error + // CreatePodsOnNode creates a new pod according to the spec on the specified node, + // and sets the ControllerRef. + CreatePodsOnNode(nodeName, namespace string, template *v1.PodTemplateSpec, object runtime.Object, controllerRef *metav1.OwnerReference) error + // CreatePodsWithControllerRef creates new pods according to the spec, and sets object as the pod's controller. + CreatePodsWithControllerRef(namespace string, template *v1.PodTemplateSpec, object runtime.Object, controllerRef *metav1.OwnerReference) error + // DeletePod deletes the pod identified by podID. + DeletePod(namespace string, podID string, object runtime.Object) error + // PatchPod patches the pod. + PatchPod(namespace, name string, data []byte) error +} + +// RealPodControl is the default implementation of PodControlInterface. +type RealPodControl struct { + KubeClient clientset.Interface + Recorder record.EventRecorder +} + +var _ PodControlInterface = &RealPodControl{} + +func getPodsLabelSet(template *v1.PodTemplateSpec) labels.Set { + desiredLabels := make(labels.Set) + for k, v := range template.Labels { + desiredLabels[k] = v + } + return desiredLabels +} + +func getPodsFinalizers(template *v1.PodTemplateSpec) []string { + desiredFinalizers := make([]string, len(template.Finalizers)) + copy(desiredFinalizers, template.Finalizers) + return desiredFinalizers +} + +func getPodsAnnotationSet(template *v1.PodTemplateSpec, object runtime.Object) (labels.Set, error) { + desiredAnnotations := make(labels.Set) + for k, v := range template.Annotations { + desiredAnnotations[k] = v + } + createdByRef, err := ref.GetReference(api.Scheme, object) + if err != nil { + return desiredAnnotations, fmt.Errorf("unable to get controller reference: %v", err) + } + + // TODO: this code was not safe previously - as soon as new code came along that switched to v2, old clients + // would be broken upon reading it. 
This is explicitly hardcoded to v1 to guarantee predictable deployment. + // We need to consistently handle this case of annotation versioning. + codec := api.Codecs.LegacyCodec(schema.GroupVersion{Group: v1.GroupName, Version: "v1"}) + + createdByRefJson, err := runtime.Encode(codec, &v1.SerializedReference{ + Reference: *createdByRef, + }) + if err != nil { + return desiredAnnotations, fmt.Errorf("unable to serialize controller reference: %v", err) + } + desiredAnnotations[v1.CreatedByAnnotation] = string(createdByRefJson) + return desiredAnnotations, nil +} + +func getPodsPrefix(controllerName string) string { + // use the dash (if the name isn't too long) to make the pod name a bit prettier + prefix := fmt.Sprintf("%s-", controllerName) + if len(validation.ValidatePodName(prefix, true)) != 0 { + prefix = controllerName + } + return prefix +} + +func validateControllerRef(controllerRef *metav1.OwnerReference) error { + if controllerRef == nil { + return fmt.Errorf("controllerRef is nil") + } + if len(controllerRef.APIVersion) == 0 { + return fmt.Errorf("controllerRef has empty APIVersion") + } + if len(controllerRef.Kind) == 0 { + return fmt.Errorf("controllerRef has empty Kind") + } + if controllerRef.Controller == nil || *controllerRef.Controller != true { + return fmt.Errorf("controllerRef.Controller is not set to true") + } + if controllerRef.BlockOwnerDeletion == nil || *controllerRef.BlockOwnerDeletion != true { + return fmt.Errorf("controllerRef.BlockOwnerDeletion is not set") + } + return nil +} + +func (r RealPodControl) CreatePods(namespace string, template *v1.PodTemplateSpec, object runtime.Object) error { + return r.createPods("", namespace, template, object, nil) +} + +func (r RealPodControl) CreatePodsWithControllerRef(namespace string, template *v1.PodTemplateSpec, controllerObject runtime.Object, controllerRef *metav1.OwnerReference) error { + if err := validateControllerRef(controllerRef); err != nil { + return err + } + return r.createPods("", namespace, template, controllerObject, controllerRef) +} + +func (r RealPodControl) CreatePodsOnNode(nodeName, namespace string, template *v1.PodTemplateSpec, object runtime.Object, controllerRef *metav1.OwnerReference) error { + if err := validateControllerRef(controllerRef); err != nil { + return err + } + return r.createPods(nodeName, namespace, template, object, controllerRef) +} + +func (r RealPodControl) PatchPod(namespace, name string, data []byte) error { + _, err := r.KubeClient.Core().Pods(namespace).Patch(name, types.StrategicMergePatchType, data) + return err +} + +func GetPodFromTemplate(template *v1.PodTemplateSpec, parentObject runtime.Object, controllerRef *metav1.OwnerReference) (*v1.Pod, error) { + desiredLabels := getPodsLabelSet(template) + desiredFinalizers := getPodsFinalizers(template) + desiredAnnotations, err := getPodsAnnotationSet(template, parentObject) + if err != nil { + return nil, err + } + accessor, err := meta.Accessor(parentObject) + if err != nil { + return nil, fmt.Errorf("parentObject does not have ObjectMeta, %v", err) + } + prefix := getPodsPrefix(accessor.GetName()) + + pod := &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Labels: desiredLabels, + Annotations: desiredAnnotations, + GenerateName: prefix, + Finalizers: desiredFinalizers, + }, + } + if controllerRef != nil { + pod.OwnerReferences = append(pod.OwnerReferences, *controllerRef) + } + clone, err := api.Scheme.DeepCopy(&template.Spec) + if err != nil { + return nil, err + } + pod.Spec = *clone.(*v1.PodSpec) + return pod, nil +} + 
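As validateControllerRef above enforces, an OwnerReference handed to CreatePodsWithControllerRef or CreatePodsOnNode must carry a non-empty APIVersion and Kind and have both Controller and BlockOwnerDeletion set to true. A sketch of building such a reference from an owning ReplicaSet; the rs and podControl variables are assumptions for illustration, not part of this diff:

	// Illustrative only: an OwnerReference that passes validateControllerRef.
	// rs (*extensions.ReplicaSet) and podControl are assumed to be in scope.
	controllerKind := extensions.SchemeGroupVersion.WithKind("ReplicaSet")
	isController := true
	blockOwnerDeletion := true
	controllerRef := &metav1.OwnerReference{
		APIVersion:         controllerKind.GroupVersion().String(),
		Kind:               controllerKind.Kind,
		Name:               rs.Name,
		UID:                rs.UID,
		Controller:         &isController,
		BlockOwnerDeletion: &blockOwnerDeletion,
	}
	// err := podControl.CreatePodsWithControllerRef(rs.Namespace, &rs.Spec.Template, rs, controllerRef)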
+func (r RealPodControl) createPods(nodeName, namespace string, template *v1.PodTemplateSpec, object runtime.Object, controllerRef *metav1.OwnerReference) error { + pod, err := GetPodFromTemplate(template, object, controllerRef) + if err != nil { + return err + } + if len(nodeName) != 0 { + pod.Spec.NodeName = nodeName + } + if labels.Set(pod.Labels).AsSelectorPreValidated().Empty() { + return fmt.Errorf("unable to create pods, no labels") + } + if newPod, err := r.KubeClient.Core().Pods(namespace).Create(pod); err != nil { + r.Recorder.Eventf(object, v1.EventTypeWarning, FailedCreatePodReason, "Error creating: %v", err) + return fmt.Errorf("unable to create pods: %v", err) + } else { + accessor, err := meta.Accessor(object) + if err != nil { + glog.Errorf("parentObject does not have ObjectMeta, %v", err) + return nil + } + glog.V(4).Infof("Controller %v created pod %v", accessor.GetName(), newPod.Name) + r.Recorder.Eventf(object, v1.EventTypeNormal, SuccessfulCreatePodReason, "Created pod: %v", newPod.Name) + } + return nil +} + +func (r RealPodControl) DeletePod(namespace string, podID string, object runtime.Object) error { + accessor, err := meta.Accessor(object) + if err != nil { + return fmt.Errorf("object does not have ObjectMeta, %v", err) + } + glog.V(2).Infof("Controller %v deleting pod %v/%v", accessor.GetName(), namespace, podID) + if err := r.KubeClient.Core().Pods(namespace).Delete(podID, nil); err != nil { + r.Recorder.Eventf(object, v1.EventTypeWarning, FailedDeletePodReason, "Error deleting: %v", err) + return fmt.Errorf("unable to delete pods: %v", err) + } else { + r.Recorder.Eventf(object, v1.EventTypeNormal, SuccessfulDeletePodReason, "Deleted pod: %v", podID) + } + return nil +} + +type FakePodControl struct { + sync.Mutex + Templates []v1.PodTemplateSpec + ControllerRefs []metav1.OwnerReference + DeletePodName []string + Patches [][]byte + Err error +} + +var _ PodControlInterface = &FakePodControl{} + +func (f *FakePodControl) PatchPod(namespace, name string, data []byte) error { + f.Lock() + defer f.Unlock() + f.Patches = append(f.Patches, data) + if f.Err != nil { + return f.Err + } + return nil +} + +func (f *FakePodControl) CreatePods(namespace string, spec *v1.PodTemplateSpec, object runtime.Object) error { + f.Lock() + defer f.Unlock() + f.Templates = append(f.Templates, *spec) + if f.Err != nil { + return f.Err + } + return nil +} + +func (f *FakePodControl) CreatePodsWithControllerRef(namespace string, spec *v1.PodTemplateSpec, object runtime.Object, controllerRef *metav1.OwnerReference) error { + f.Lock() + defer f.Unlock() + f.Templates = append(f.Templates, *spec) + f.ControllerRefs = append(f.ControllerRefs, *controllerRef) + if f.Err != nil { + return f.Err + } + return nil +} + +func (f *FakePodControl) CreatePodsOnNode(nodeName, namespace string, template *v1.PodTemplateSpec, object runtime.Object, controllerRef *metav1.OwnerReference) error { + f.Lock() + defer f.Unlock() + f.Templates = append(f.Templates, *template) + f.ControllerRefs = append(f.ControllerRefs, *controllerRef) + if f.Err != nil { + return f.Err + } + return nil +} + +func (f *FakePodControl) DeletePod(namespace string, podID string, object runtime.Object) error { + f.Lock() + defer f.Unlock() + f.DeletePodName = append(f.DeletePodName, podID) + if f.Err != nil { + return f.Err + } + return nil +} + +func (f *FakePodControl) Clear() { + f.Lock() + defer f.Unlock() + f.DeletePodName = []string{} + f.Templates = []v1.PodTemplateSpec{} + f.ControllerRefs = []metav1.OwnerReference{} + 
+	f.Patches = [][]byte{}
+}
+
+// ByLogging allows custom sorting of pods so the best one can be picked for getting its logs.
+type ByLogging []*v1.Pod
+
+func (s ByLogging) Len() int      { return len(s) }
+func (s ByLogging) Swap(i, j int) { s[i], s[j] = s[j], s[i] }
+
+func (s ByLogging) Less(i, j int) bool {
+	// 1. assigned < unassigned
+	if s[i].Spec.NodeName != s[j].Spec.NodeName && (len(s[i].Spec.NodeName) == 0 || len(s[j].Spec.NodeName) == 0) {
+		return len(s[i].Spec.NodeName) > 0
+	}
+	// 2. PodRunning < PodUnknown < PodPending
+	m := map[v1.PodPhase]int{v1.PodRunning: 0, v1.PodUnknown: 1, v1.PodPending: 2}
+	if m[s[i].Status.Phase] != m[s[j].Status.Phase] {
+		return m[s[i].Status.Phase] < m[s[j].Status.Phase]
+	}
+	// 3. ready < not ready
+	if podutil.IsPodReady(s[i]) != podutil.IsPodReady(s[j]) {
+		return podutil.IsPodReady(s[i])
+	}
+	// TODO: take availability into account when we push minReadySeconds information from deployment into pods,
+	//       see https://github.com/kubernetes/kubernetes/issues/22065
+	// 4. Been ready for more time < less time < empty time
+	if podutil.IsPodReady(s[i]) && podutil.IsPodReady(s[j]) && !podReadyTime(s[i]).Equal(podReadyTime(s[j])) {
+		return afterOrZero(podReadyTime(s[j]), podReadyTime(s[i]))
+	}
+	// 5. Pods with containers with higher restart counts < lower restart counts
+	if maxContainerRestarts(s[i]) != maxContainerRestarts(s[j]) {
+		return maxContainerRestarts(s[i]) > maxContainerRestarts(s[j])
+	}
+	// 6. older pods < newer pods < empty timestamp pods
+	if !s[i].CreationTimestamp.Equal(s[j].CreationTimestamp) {
+		return afterOrZero(s[j].CreationTimestamp, s[i].CreationTimestamp)
+	}
+	return false
+}
+
+// ActivePods type allows custom sorting of pods so a controller can pick the best ones to delete.
+type ActivePods []*v1.Pod
+
+func (s ActivePods) Len() int      { return len(s) }
+func (s ActivePods) Swap(i, j int) { s[i], s[j] = s[j], s[i] }
+
+func (s ActivePods) Less(i, j int) bool {
+	// 1. Unassigned < assigned
+	// If only one of the pods is unassigned, the unassigned one is smaller
+	if s[i].Spec.NodeName != s[j].Spec.NodeName && (len(s[i].Spec.NodeName) == 0 || len(s[j].Spec.NodeName) == 0) {
+		return len(s[i].Spec.NodeName) == 0
+	}
+	// 2. PodPending < PodUnknown < PodRunning
+	m := map[v1.PodPhase]int{v1.PodPending: 0, v1.PodUnknown: 1, v1.PodRunning: 2}
+	if m[s[i].Status.Phase] != m[s[j].Status.Phase] {
+		return m[s[i].Status.Phase] < m[s[j].Status.Phase]
+	}
+	// 3. Not ready < ready
+	// If only one of the pods is not ready, the not ready one is smaller
+	if podutil.IsPodReady(s[i]) != podutil.IsPodReady(s[j]) {
+		return !podutil.IsPodReady(s[i])
+	}
+	// TODO: take availability into account when we push minReadySeconds information from deployment into pods,
+	//       see https://github.com/kubernetes/kubernetes/issues/22065
+	// 4. Been ready for empty time < less time < more time
+	// If both pods are ready, the latest ready one is smaller
+	if podutil.IsPodReady(s[i]) && podutil.IsPodReady(s[j]) && !podReadyTime(s[i]).Equal(podReadyTime(s[j])) {
+		return afterOrZero(podReadyTime(s[i]), podReadyTime(s[j]))
+	}
+	// 5. Pods with containers with higher restart counts < lower restart counts
+	if maxContainerRestarts(s[i]) != maxContainerRestarts(s[j]) {
+		return maxContainerRestarts(s[i]) > maxContainerRestarts(s[j])
+	}
+	// 6. Empty creation time pods < newer pods < older pods
+	if !s[i].CreationTimestamp.Equal(s[j].CreationTimestamp) {
+		return afterOrZero(s[i].CreationTimestamp, s[j].CreationTimestamp)
+	}
+	return false
+}
+
+// afterOrZero checks if time t1 is after time t2; if one of them
+// is zero, the zero time is seen as after non-zero time.
+func afterOrZero(t1, t2 metav1.Time) bool {
+	if t1.Time.IsZero() || t2.Time.IsZero() {
+		return t1.Time.IsZero()
+	}
+	return t1.After(t2.Time)
+}
+
+func podReadyTime(pod *v1.Pod) metav1.Time {
+	if podutil.IsPodReady(pod) {
+		for _, c := range pod.Status.Conditions {
+			// we only care about pod ready conditions
+			if c.Type == v1.PodReady && c.Status == v1.ConditionTrue {
+				return c.LastTransitionTime
+			}
+		}
+	}
+	return metav1.Time{}
+}
+
+func maxContainerRestarts(pod *v1.Pod) int {
+	maxRestarts := 0
+	for _, c := range pod.Status.ContainerStatuses {
+		maxRestarts = integer.IntMax(maxRestarts, int(c.RestartCount))
+	}
+	return maxRestarts
+}
+
+// FilterActivePods returns pods that have not terminated.
+func FilterActivePods(pods []*v1.Pod) []*v1.Pod {
+	var result []*v1.Pod
+	for _, p := range pods {
+		if IsPodActive(p) {
+			result = append(result, p)
+		} else {
+			glog.V(4).Infof("Ignoring inactive pod %v/%v in state %v, deletion time %v",
+				p.Namespace, p.Name, p.Status.Phase, p.DeletionTimestamp)
+		}
+	}
+	return result
+}
+
+func IsPodActive(p *v1.Pod) bool {
+	return v1.PodSucceeded != p.Status.Phase &&
+		v1.PodFailed != p.Status.Phase &&
+		p.DeletionTimestamp == nil
+}
+
+// FilterActiveReplicaSets returns replica sets that have (or at least ought to have) pods.
+func FilterActiveReplicaSets(replicaSets []*extensions.ReplicaSet) []*extensions.ReplicaSet {
+	activeFilter := func(rs *extensions.ReplicaSet) bool {
+		return rs != nil && *(rs.Spec.Replicas) > 0
+	}
+	return FilterReplicaSets(replicaSets, activeFilter)
+}
+
+type filterRS func(rs *extensions.ReplicaSet) bool
+
+// FilterReplicaSets returns replica sets that are filtered by filterFn (all returned ones should match filterFn).
+func FilterReplicaSets(RSes []*extensions.ReplicaSet, filterFn filterRS) []*extensions.ReplicaSet {
+	var filtered []*extensions.ReplicaSet
+	for i := range RSes {
+		if filterFn(RSes[i]) {
+			filtered = append(filtered, RSes[i])
+		}
+	}
+	return filtered
+}
+
+// PodKey returns a key unique to the given pod within a cluster.
+// It's used so we consistently use the same key scheme in this module.
+// It does exactly what cache.MetaNamespaceKeyFunc would have done,
+// except there's no possibility of error since we know the exact type.
+func PodKey(pod *v1.Pod) string {
+	return fmt.Sprintf("%v/%v", pod.Namespace, pod.Name)
+}
+
+// ControllersByCreationTimestamp sorts a list of ReplicationControllers by creation timestamp, using their names as a tie breaker.
+type ControllersByCreationTimestamp []*v1.ReplicationController
+
+func (o ControllersByCreationTimestamp) Len() int      { return len(o) }
+func (o ControllersByCreationTimestamp) Swap(i, j int) { o[i], o[j] = o[j], o[i] }
+func (o ControllersByCreationTimestamp) Less(i, j int) bool {
+	if o[i].CreationTimestamp.Equal(o[j].CreationTimestamp) {
+		return o[i].Name < o[j].Name
+	}
+	return o[i].CreationTimestamp.Before(o[j].CreationTimestamp)
+}
+
+// ReplicaSetsByCreationTimestamp sorts a list of ReplicaSets by creation timestamp, using their names as a tie breaker.
+type ReplicaSetsByCreationTimestamp []*extensions.ReplicaSet
+
+func (o ReplicaSetsByCreationTimestamp) Len() int      { return len(o) }
+func (o ReplicaSetsByCreationTimestamp) Swap(i, j int) { o[i], o[j] = o[j], o[i] }
+func (o ReplicaSetsByCreationTimestamp) Less(i, j int) bool {
+	if o[i].CreationTimestamp.Equal(o[j].CreationTimestamp) {
+		return o[i].Name < o[j].Name
+	}
+	return o[i].CreationTimestamp.Before(o[j].CreationTimestamp)
+}
+
+// ReplicaSetsBySizeOlder sorts a list of ReplicaSets by size in descending order, using their creation timestamp or name as a tie breaker.
+// By using the creation timestamp, this sorts from old to new replica sets.
+type ReplicaSetsBySizeOlder []*extensions.ReplicaSet
+
+func (o ReplicaSetsBySizeOlder) Len() int      { return len(o) }
+func (o ReplicaSetsBySizeOlder) Swap(i, j int) { o[i], o[j] = o[j], o[i] }
+func (o ReplicaSetsBySizeOlder) Less(i, j int) bool {
+	if *(o[i].Spec.Replicas) == *(o[j].Spec.Replicas) {
+		return ReplicaSetsByCreationTimestamp(o).Less(i, j)
+	}
+	return *(o[i].Spec.Replicas) > *(o[j].Spec.Replicas)
+}
+
+// ReplicaSetsBySizeNewer sorts a list of ReplicaSets by size in descending order, using their creation timestamp or name as a tie breaker.
+// By using the creation timestamp, this sorts from new to old replica sets.
+type ReplicaSetsBySizeNewer []*extensions.ReplicaSet
+
+func (o ReplicaSetsBySizeNewer) Len() int      { return len(o) }
+func (o ReplicaSetsBySizeNewer) Swap(i, j int) { o[i], o[j] = o[j], o[i] }
+func (o ReplicaSetsBySizeNewer) Less(i, j int) bool {
+	if *(o[i].Spec.Replicas) == *(o[j].Spec.Replicas) {
+		return ReplicaSetsByCreationTimestamp(o).Less(j, i)
+	}
+	return *(o[i].Spec.Replicas) > *(o[j].Spec.Replicas)
+}
+
+func AddOrUpdateTaintOnNode(c clientset.Interface, nodeName string, taint *v1.Taint) error {
+	firstTry := true
+	return clientretry.RetryOnConflict(UpdateTaintBackoff, func() error {
+		var err error
+		var oldNode *v1.Node
+		// First we try getting node from the API server cache, as it's cheaper. If it fails
+		// we get it from etcd to be sure to have fresh data.
+		if firstTry {
+			oldNode, err = c.Core().Nodes().Get(nodeName, metav1.GetOptions{ResourceVersion: "0"})
+			firstTry = false
+		} else {
+			oldNode, err = c.Core().Nodes().Get(nodeName, metav1.GetOptions{})
+		}
+		if err != nil {
+			return err
+		}
+		newNode, ok, err := v1helper.AddOrUpdateTaint(oldNode, taint)
+		if err != nil {
+			return fmt.Errorf("Failed to update taint annotation!")
+		}
+		if !ok {
+			return nil
+		}
+		return PatchNodeTaints(c, nodeName, oldNode, newNode)
+	})
+}
+
+// RemoveTaintOffNode is for cleaning up taints temporarily added to a node;
+// it won't fail if the target taint doesn't exist or has already been removed.
+// If passed a node, it checks whether there's anything to be done; if the taint
+// is not present it won't issue any API calls.
+func RemoveTaintOffNode(c clientset.Interface, nodeName string, taint *v1.Taint, node *v1.Node) error {
+	// Short circuit for limiting amount of API calls.
+	if node != nil {
+		match := false
+		for i := range node.Spec.Taints {
+			if node.Spec.Taints[i].MatchTaint(taint) {
+				match = true
+				break
+			}
+		}
+		if !match {
+			return nil
+		}
+	}
+	firstTry := true
+	return clientretry.RetryOnConflict(UpdateTaintBackoff, func() error {
+		var err error
+		var oldNode *v1.Node
+		// First we try getting node from the API server cache, as it's cheaper. If it fails
+		// we get it from etcd to be sure to have fresh data.
+		if firstTry {
+			oldNode, err = c.Core().Nodes().Get(nodeName, metav1.GetOptions{ResourceVersion: "0"})
+			firstTry = false
+		} else {
+			oldNode, err = c.Core().Nodes().Get(nodeName, metav1.GetOptions{})
+		}
+		if err != nil {
+			return err
+		}
+		newNode, ok, err := v1helper.RemoveTaint(oldNode, taint)
+		if err != nil {
+			return fmt.Errorf("Failed to update taint annotation!")
+		}
+		if !ok {
+			return nil
+		}
+		return PatchNodeTaints(c, nodeName, oldNode, newNode)
+	})
+}
+
+// PatchNodeTaints patches a node's taints.
+func PatchNodeTaints(c clientset.Interface, nodeName string, oldNode *v1.Node, newNode *v1.Node) error {
+	oldData, err := json.Marshal(oldNode)
+	if err != nil {
+		return fmt.Errorf("failed to marshal old node %#v for node %q: %v", oldNode, nodeName, err)
+	}
+
+	newTaints := newNode.Spec.Taints
+	objCopy, err := api.Scheme.DeepCopy(oldNode)
+	if err != nil {
+		return fmt.Errorf("failed to copy node object %#v: %v", oldNode, err)
+	}
+	newNode, ok := (objCopy).(*v1.Node)
+	if !ok {
+		return fmt.Errorf("failed to cast copy onto node object %#v: %v", newNode, err)
+	}
+	newNode.Spec.Taints = newTaints
+	newData, err := json.Marshal(newNode)
+	if err != nil {
+		return fmt.Errorf("failed to marshal new node %#v for node %q: %v", newNode, nodeName, err)
+	}
+
+	patchBytes, err := strategicpatch.CreateTwoWayMergePatch(oldData, newData, v1.Node{})
+	if err != nil {
+		return fmt.Errorf("failed to create patch for node %q: %v", nodeName, err)
+	}
+
+	_, err = c.Core().Nodes().Patch(string(nodeName), types.StrategicMergePatchType, patchBytes)
+	return err
+}
+
+// WaitForCacheSync is a wrapper around cache.WaitForCacheSync that generates log messages
+// indicating that the controller identified by controllerName is waiting for syncs, followed by
+// either a successful or failed sync.
+func WaitForCacheSync(controllerName string, stopCh <-chan struct{}, cacheSyncs ...cache.InformerSynced) bool {
+	glog.Infof("Waiting for caches to sync for %s controller", controllerName)
+
+	if !cache.WaitForCacheSync(stopCh, cacheSyncs...) {
+		utilruntime.HandleError(fmt.Errorf("Unable to sync caches for %s controller", controllerName))
+		return false
+	}
+
+	glog.Infof("Caches are synced for %s controller", controllerName)
+	return true
+}
+
+// ComputeHash returns a hash value calculated from the pod template and a collisionCount,
+// which is used to avoid hash collisions.
+func ComputeHash(template *v1.PodTemplateSpec, collisionCount *int64) uint32 {
+	podTemplateSpecHasher := fnv.New32a()
+	hashutil.DeepHashObject(podTemplateSpecHasher, *template)
+
+	// Add collisionCount in the hash if it exists.
+	if collisionCount != nil {
+		collisionCountBytes := make([]byte, 8)
+		binary.LittleEndian.PutUint64(collisionCountBytes, uint64(*collisionCount))
+		podTemplateSpecHasher.Write(collisionCountBytes)
+	}
+
+	return podTemplateSpecHasher.Sum32()
+}
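The sort helpers and ComputeHash in this file are consumed by controllers elsewhere in the tree. The sketch below is illustrative only, not part of the vendored files; the pod names, phases, and collision count are assumed values.

// Illustrative sketch, not part of the vendored diff: ranking pods for scale-down
// with ActivePods and hashing a pod template with ComputeHash.
package main

import (
	"fmt"
	"sort"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/kubernetes/pkg/api/v1"
	"k8s.io/kubernetes/pkg/controller"
)

func main() {
	pods := []*v1.Pod{
		{ObjectMeta: metav1.ObjectMeta{Name: "web-a"}, Status: v1.PodStatus{Phase: v1.PodRunning}},
		{ObjectMeta: metav1.ObjectMeta{Name: "web-b"}, Status: v1.PodStatus{Phase: v1.PodPending}},
	}

	// ActivePods sorts the least valuable pods first (unassigned, then pending, then not-ready, ...),
	// so a controller scaling down can delete pods[0], pods[1], ... in order.
	sort.Sort(controller.ActivePods(pods))
	fmt.Println("delete first:", pods[0].Name) // "web-b" here, since pending sorts before running

	// ComputeHash hashes the pod template; the optional collision count perturbs the hash so a
	// caller can step past a name collision it has observed.
	template := &v1.PodTemplateSpec{}
	collisionCount := int64(1)
	fmt.Printf("template hash: %d\n", controller.ComputeHash(template, &collisionCount))
}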
diff --git a/vendor/k8s.io/kubernetes/pkg/controller/doc.go b/vendor/k8s.io/kubernetes/pkg/controller/doc.go
new file mode 100644
index 000000000..3c5c943da
--- /dev/null
+++ b/vendor/k8s.io/kubernetes/pkg/controller/doc.go
@@ -0,0 +1,19 @@
+/*
+Copyright 2015 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+// Package controller contains code for controllers (like the replication
+// controller).
+package controller // import "k8s.io/kubernetes/pkg/controller"
diff --git a/vendor/k8s.io/kubernetes/pkg/controller/lookup_cache.go b/vendor/k8s.io/kubernetes/pkg/controller/lookup_cache.go
new file mode 100644
index 000000000..160aa6e08
--- /dev/null
+++ b/vendor/k8s.io/kubernetes/pkg/controller/lookup_cache.go
@@ -0,0 +1,92 @@
+/*
+Copyright 2016 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package controller
+
+import (
+	"hash/fnv"
+	"sync"
+
+	"github.com/golang/groupcache/lru"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	hashutil "k8s.io/kubernetes/pkg/util/hash"
+)
+
+type objectWithMeta interface {
+	metav1.Object
+}
+
+// keyFunc returns the key of an object, which is used to look up its matching object in the cache.
+// Since we match objects by namespace and labels/selector, two objects with the same namespace and
+// labels will have the same key.
+func keyFunc(obj objectWithMeta) uint64 {
+	hash := fnv.New32a()
+	hashutil.DeepHashObject(hash, &equivalenceLabelObj{
+		namespace: obj.GetNamespace(),
+		labels:    obj.GetLabels(),
+	})
+	return uint64(hash.Sum32())
+}
+
+type equivalenceLabelObj struct {
+	namespace string
+	labels    map[string]string
+}
+
+// MatchingCache saves the label-to-selector matching relationship.
+type MatchingCache struct {
+	mutex sync.RWMutex
+	cache *lru.Cache
+}
+
+// NewMatchingCache returns a MatchingCache, which saves the label-to-selector matching relationship.
+func NewMatchingCache(maxCacheEntries int) *MatchingCache {
+	return &MatchingCache{
+		cache: lru.New(maxCacheEntries),
+	}
+}
+
+// Add adds matching information to the cache.
+func (c *MatchingCache) Add(labelObj objectWithMeta, selectorObj objectWithMeta) {
+	key := keyFunc(labelObj)
+	c.mutex.Lock()
+	defer c.mutex.Unlock()
+	c.cache.Add(key, selectorObj)
+}
+
+// GetMatchingObject looks up the matching object for a given object.
+// Note: the cached information may be stale since the controller may have been deleted or updated;
+// callers need to check externally to ensure the cached data is not dirty.
+func (c *MatchingCache) GetMatchingObject(labelObj objectWithMeta) (controller interface{}, exists bool) {
+	key := keyFunc(labelObj)
+	// NOTE: we use Lock() instead of RLock() here because lru's Get() method also modifies state
+	// (it needs to update the least-recently-used bookkeeping), so we cannot call it concurrently.
+	c.mutex.Lock()
+	defer c.mutex.Unlock()
+	return c.cache.Get(key)
+}
+
+// Update updates the cached matching information.
+func (c *MatchingCache) Update(labelObj objectWithMeta, selectorObj objectWithMeta) {
+	c.Add(labelObj, selectorObj)
+}
+
+// InvalidateAll invalidates the whole cache.
+func (c *MatchingCache) InvalidateAll() {
+	c.mutex.Lock()
+	defer c.mutex.Unlock()
+	c.cache = lru.New(c.cache.MaxEntries)
+}
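A short usage sketch for MatchingCache follows. It is illustrative only and not part of the vendored files; the namespace, labels, names, and cache size are assumptions, and *metav1.ObjectMeta is used simply because it satisfies metav1.Object.

// Illustrative sketch, not part of the vendored diff: caching which controller a
// labeled object matched, keyed by a hash of its namespace and labels.
package main

import (
	"fmt"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/kubernetes/pkg/controller"
)

func main() {
	lookup := controller.NewMatchingCache(128) // arbitrary LRU size

	// The "label object" (e.g. a pod's metadata) maps to the "selector object"
	// (e.g. its owning controller's metadata).
	podMeta := &metav1.ObjectMeta{Namespace: "default", Labels: map[string]string{"app": "web"}}
	rcMeta := &metav1.ObjectMeta{Namespace: "default", Name: "web-rc"}

	lookup.Add(podMeta, rcMeta)

	// The cached match may be stale, so a real controller re-verifies the selector afterwards.
	if owner, ok := lookup.GetMatchingObject(podMeta); ok {
		fmt.Println("cached owner:", owner.(*metav1.ObjectMeta).Name)
	}

	lookup.InvalidateAll() // drop everything, e.g. after a resync
}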