Diffstat (limited to 'vendor/k8s.io/kubernetes/pkg/controller')
-rw-r--r--  vendor/k8s.io/kubernetes/pkg/controller/client_builder.go          292
-rw-r--r--  vendor/k8s.io/kubernetes/pkg/controller/controller_ref_manager.go  515
-rw-r--r--  vendor/k8s.io/kubernetes/pkg/controller/controller_utils.go       1018
-rw-r--r--  vendor/k8s.io/kubernetes/pkg/controller/doc.go                       19
-rw-r--r--  vendor/k8s.io/kubernetes/pkg/controller/lookup_cache.go              92
5 files changed, 1936 insertions, 0 deletions
diff --git a/vendor/k8s.io/kubernetes/pkg/controller/client_builder.go b/vendor/k8s.io/kubernetes/pkg/controller/client_builder.go
new file mode 100644
index 000000000..491366288
--- /dev/null
+++ b/vendor/k8s.io/kubernetes/pkg/controller/client_builder.go
@@ -0,0 +1,292 @@
+/*
+Copyright 2016 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package controller
+
+import (
+ "fmt"
+ "time"
+
+ apierrors "k8s.io/apimachinery/pkg/api/errors"
+ metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+ "k8s.io/apimachinery/pkg/fields"
+ "k8s.io/apimachinery/pkg/runtime"
+ "k8s.io/apimachinery/pkg/watch"
+ apiserverserviceaccount "k8s.io/apiserver/pkg/authentication/serviceaccount"
+ clientgoclientset "k8s.io/client-go/kubernetes"
+ restclient "k8s.io/client-go/rest"
+ "k8s.io/client-go/tools/cache"
+ "k8s.io/kubernetes/pkg/api"
+ "k8s.io/kubernetes/pkg/api/v1"
+ v1authenticationapi "k8s.io/kubernetes/pkg/apis/authentication/v1"
+ "k8s.io/kubernetes/pkg/client/clientset_generated/clientset"
+ v1authentication "k8s.io/kubernetes/pkg/client/clientset_generated/clientset/typed/authentication/v1"
+ v1core "k8s.io/kubernetes/pkg/client/clientset_generated/clientset/typed/core/v1"
+ "k8s.io/kubernetes/pkg/serviceaccount"
+
+ "github.com/golang/glog"
+)
+
+// ControllerClientBuilder allows you to get clients and configs for controllers
+type ControllerClientBuilder interface {
+ Config(name string) (*restclient.Config, error)
+ ConfigOrDie(name string) *restclient.Config
+ Client(name string) (clientset.Interface, error)
+ ClientOrDie(name string) clientset.Interface
+ ClientGoClient(name string) (clientgoclientset.Interface, error)
+ ClientGoClientOrDie(name string) clientgoclientset.Interface
+}
+
+// SimpleControllerClientBuilder returns a fixed client with different user agents
+type SimpleControllerClientBuilder struct {
+ // ClientConfig is a skeleton config to clone and use as the basis for each controller client
+ ClientConfig *restclient.Config
+}
+
+func (b SimpleControllerClientBuilder) Config(name string) (*restclient.Config, error) {
+ clientConfig := *b.ClientConfig
+ return restclient.AddUserAgent(&clientConfig, name), nil
+}
+
+func (b SimpleControllerClientBuilder) ConfigOrDie(name string) *restclient.Config {
+ clientConfig, err := b.Config(name)
+ if err != nil {
+ glog.Fatal(err)
+ }
+ return clientConfig
+}
+
+func (b SimpleControllerClientBuilder) Client(name string) (clientset.Interface, error) {
+ clientConfig, err := b.Config(name)
+ if err != nil {
+ return nil, err
+ }
+ return clientset.NewForConfig(clientConfig)
+}
+
+func (b SimpleControllerClientBuilder) ClientOrDie(name string) clientset.Interface {
+ client, err := b.Client(name)
+ if err != nil {
+ glog.Fatal(err)
+ }
+ return client
+}
+
+func (b SimpleControllerClientBuilder) ClientGoClient(name string) (clientgoclientset.Interface, error) {
+ clientConfig, err := b.Config(name)
+ if err != nil {
+ return nil, err
+ }
+ return clientgoclientset.NewForConfig(clientConfig)
+}
+
+func (b SimpleControllerClientBuilder) ClientGoClientOrDie(name string) clientgoclientset.Interface {
+ client, err := b.ClientGoClient(name)
+ if err != nil {
+ glog.Fatal(err)
+ }
+ return client
+}
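+
+// exampleSimpleClientBuilder is an illustrative sketch (not part of the
+// original file): it shows how a caller might hand out per-controller
+// clients from a single rest config. The controller name is an assumption.
+func exampleSimpleClientBuilder(cfg *restclient.Config) clientset.Interface {
+    builder := SimpleControllerClientBuilder{ClientConfig: cfg}
+    // Each derived client carries a user agent identifying the controller.
+    return builder.ClientOrDie("replicaset-controller")
+}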
+
+// SAControllerClientBuilder is a ControllerClientBuilder that returns clients identifying as
+// service accounts
+type SAControllerClientBuilder struct {
+ // ClientConfig is a skeleton config to clone and use as the basis for each controller client
+ ClientConfig *restclient.Config
+
+ // CoreClient is used to provision service accounts if needed and watch for their associated tokens
+ // to construct a controller client
+ CoreClient v1core.CoreV1Interface
+
+ // AuthenticationClient is used to check API tokens to make sure they are valid before
+ // building a controller client from them
+ AuthenticationClient v1authentication.AuthenticationV1Interface
+
+ // Namespace is the namespace used to host the service accounts that will back the
+ // controllers. It must be a highly privileged namespace which normal users cannot inspect.
+ Namespace string
+}
+
+// Config returns a complete clientConfig for constructing clients. This is separated in anticipation of composition,
+// which means that not all clientsets are known here.
+func (b SAControllerClientBuilder) Config(name string) (*restclient.Config, error) {
+ sa, err := b.getOrCreateServiceAccount(name)
+ if err != nil {
+ return nil, err
+ }
+
+ var clientConfig *restclient.Config
+
+ lw := &cache.ListWatch{
+ ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
+ options.FieldSelector = fields.SelectorFromSet(map[string]string{api.SecretTypeField: string(v1.SecretTypeServiceAccountToken)}).String()
+ return b.CoreClient.Secrets(b.Namespace).List(options)
+ },
+ WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
+ options.FieldSelector = fields.SelectorFromSet(map[string]string{api.SecretTypeField: string(v1.SecretTypeServiceAccountToken)}).String()
+ return b.CoreClient.Secrets(b.Namespace).Watch(options)
+ },
+ }
+ _, err = cache.ListWatchUntil(30*time.Second, lw,
+ func(event watch.Event) (bool, error) {
+ switch event.Type {
+ case watch.Deleted:
+ return false, nil
+ case watch.Error:
+ return false, fmt.Errorf("error watching")
+
+ case watch.Added, watch.Modified:
+ secret, ok := event.Object.(*v1.Secret)
+ if !ok {
+ return false, fmt.Errorf("unexpected object type: %T", event.Object)
+ }
+ if !serviceaccount.IsServiceAccountToken(secret, sa) {
+ return false, nil
+ }
+ if len(secret.Data[v1.ServiceAccountTokenKey]) == 0 {
+ return false, nil
+ }
+ validConfig, valid, err := b.getAuthenticatedConfig(sa, string(secret.Data[v1.ServiceAccountTokenKey]))
+ if err != nil {
+ glog.Warningf("error validating API token for %s/%s in secret %s: %v", sa.Name, sa.Namespace, secret.Name, err)
+ // continue watching for good tokens
+ return false, nil
+ }
+ if !valid {
+ glog.Warningf("secret %s contained an invalid API token for %s/%s", secret.Name, sa.Name, sa.Namespace)
+ // try to delete the secret containing the invalid token
+ if err := b.CoreClient.Secrets(secret.Namespace).Delete(secret.Name, &metav1.DeleteOptions{}); err != nil && !apierrors.IsNotFound(err) {
+ glog.Warningf("error deleting secret %s containing invalid API token for %s/%s: %v", secret.Name, sa.Name, sa.Namespace, err)
+ }
+ // continue watching for good tokens
+ return false, nil
+ }
+ clientConfig = validConfig
+ return true, nil
+
+ default:
+ return false, fmt.Errorf("unexpected event type: %v", event.Type)
+ }
+ })
+ if err != nil {
+ return nil, fmt.Errorf("unable to get token for service account: %v", err)
+ }
+
+ return clientConfig, nil
+}
+
+func (b SAControllerClientBuilder) getOrCreateServiceAccount(name string) (*v1.ServiceAccount, error) {
+ sa, err := b.CoreClient.ServiceAccounts(b.Namespace).Get(name, metav1.GetOptions{})
+ if err == nil {
+ return sa, nil
+ }
+ if !apierrors.IsNotFound(err) {
+ return nil, err
+ }
+
+ // Create the namespace if we can't verify it exists.
+ // Tolerate errors, since we don't know whether this component has namespace creation permissions.
+ if _, err := b.CoreClient.Namespaces().Get(b.Namespace, metav1.GetOptions{}); err != nil {
+ b.CoreClient.Namespaces().Create(&v1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: b.Namespace}})
+ }
+
+ // Create the service account
+ sa, err = b.CoreClient.ServiceAccounts(b.Namespace).Create(&v1.ServiceAccount{ObjectMeta: metav1.ObjectMeta{Namespace: b.Namespace, Name: name}})
+ if apierrors.IsAlreadyExists(err) {
+ // If we're racing to init and someone else already created it, re-fetch
+ return b.CoreClient.ServiceAccounts(b.Namespace).Get(name, metav1.GetOptions{})
+ }
+ return sa, err
+}
+
+func (b SAControllerClientBuilder) getAuthenticatedConfig(sa *v1.ServiceAccount, token string) (*restclient.Config, bool, error) {
+ username := apiserverserviceaccount.MakeUsername(sa.Namespace, sa.Name)
+
+ clientConfig := restclient.AnonymousClientConfig(b.ClientConfig)
+ clientConfig.BearerToken = token
+ restclient.AddUserAgent(clientConfig, username)
+
+ // Try token review first
+ tokenReview := &v1authenticationapi.TokenReview{Spec: v1authenticationapi.TokenReviewSpec{Token: token}}
+ if tokenResult, err := b.AuthenticationClient.TokenReviews().Create(tokenReview); err == nil {
+ if !tokenResult.Status.Authenticated {
+ glog.Warningf("Token for %s/%s did not authenticate correctly", sa.Name, sa.Namespace)
+ return nil, false, nil
+ }
+ if tokenResult.Status.User.Username != username {
+ glog.Warningf("Token for %s/%s authenticated as unexpected username: %s", sa.Name, sa.Namespace, tokenResult.Status.User.Username)
+ return nil, false, nil
+ }
+ glog.V(4).Infof("Verified credential for %s/%s", sa.Name, sa.Namespace)
+ return clientConfig, true, nil
+ }
+
+ // If we couldn't run the token review, the API might be disabled or we might not have permission.
+ // Try to make a request to /apis with the token. If we get a 401 we should consider the token invalid.
+ clientConfigCopy := *clientConfig
+ clientConfigCopy.NegotiatedSerializer = api.Codecs
+ client, err := restclient.UnversionedRESTClientFor(&clientConfigCopy)
+ if err != nil {
+ return nil, false, err
+ }
+ err = client.Get().AbsPath("/apis").Do().Error()
+ if apierrors.IsUnauthorized(err) {
+ glog.Warningf("Token for %s/%s did not authenticate correctly: %v", sa.Name, sa.Namespace, err)
+ return nil, false, nil
+ }
+
+ return clientConfig, true, nil
+}
+
+func (b SAControllerClientBuilder) ConfigOrDie(name string) *restclient.Config {
+ clientConfig, err := b.Config(name)
+ if err != nil {
+ glog.Fatal(err)
+ }
+ return clientConfig
+}
+
+func (b SAControllerClientBuilder) Client(name string) (clientset.Interface, error) {
+ clientConfig, err := b.Config(name)
+ if err != nil {
+ return nil, err
+ }
+ return clientset.NewForConfig(clientConfig)
+}
+
+func (b SAControllerClientBuilder) ClientOrDie(name string) clientset.Interface {
+ client, err := b.Client(name)
+ if err != nil {
+ glog.Fatal(err)
+ }
+ return client
+}
+
+func (b SAControllerClientBuilder) ClientGoClient(name string) (clientgoclientset.Interface, error) {
+ clientConfig, err := b.Config(name)
+ if err != nil {
+ return nil, err
+ }
+ return clientgoclientset.NewForConfig(clientConfig)
+}
+
+func (b SAControllerClientBuilder) ClientGoClientOrDie(name string) clientgoclientset.Interface {
+ client, err := b.ClientGoClient(name)
+ if err != nil {
+ glog.Fatal(err)
+ }
+ return client
+}
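+
+// exampleSAClientBuilder is an illustrative sketch (not part of the original
+// file): the controller manager would build one SAControllerClientBuilder and
+// then mint a client per controller, each backed by its own service account
+// token. The namespace and controller name below are assumptions.
+func exampleSAClientBuilder(cfg *restclient.Config, core v1core.CoreV1Interface, authn v1authentication.AuthenticationV1Interface) clientset.Interface {
+    builder := SAControllerClientBuilder{
+        ClientConfig:         cfg,
+        CoreClient:           core,
+        AuthenticationClient: authn,
+        Namespace:            "kube-system",
+    }
+    // Blocks (up to 30s) waiting for a valid token secret before returning.
+    return builder.ClientOrDie("daemon-set-controller")
+}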
diff --git a/vendor/k8s.io/kubernetes/pkg/controller/controller_ref_manager.go b/vendor/k8s.io/kubernetes/pkg/controller/controller_ref_manager.go
new file mode 100644
index 000000000..5477a073f
--- /dev/null
+++ b/vendor/k8s.io/kubernetes/pkg/controller/controller_ref_manager.go
@@ -0,0 +1,515 @@
+/*
+Copyright 2016 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package controller
+
+import (
+ "fmt"
+ "sync"
+
+ "github.com/golang/glog"
+ "k8s.io/apimachinery/pkg/api/errors"
+ metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+ "k8s.io/apimachinery/pkg/labels"
+ "k8s.io/apimachinery/pkg/runtime/schema"
+ utilerrors "k8s.io/apimachinery/pkg/util/errors"
+ "k8s.io/kubernetes/pkg/api/v1"
+ appsv1beta1 "k8s.io/kubernetes/pkg/apis/apps/v1beta1"
+ extensions "k8s.io/kubernetes/pkg/apis/extensions/v1beta1"
+)
+
+// GetControllerOf returns the controllerRef if controllee has a controller,
+// otherwise returns nil.
+func GetControllerOf(controllee metav1.Object) *metav1.OwnerReference {
+ ownerRefs := controllee.GetOwnerReferences()
+ for i := range ownerRefs {
+ owner := &ownerRefs[i]
+ if owner.Controller != nil && *owner.Controller == true {
+ return owner
+ }
+ }
+ return nil
+}
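+
+// exampleOwnedBy is an illustrative sketch (not part of the original file):
+// it shows how a controller can use GetControllerOf to decide whether it
+// already owns an object. Both arguments are assumptions from the caller.
+func exampleOwnedBy(obj metav1.Object, controller metav1.Object) bool {
+    controllerRef := GetControllerOf(obj)
+    // A nil controllerRef means the object is an orphan and may be adopted.
+    return controllerRef != nil && controllerRef.UID == controller.GetUID()
+}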
+
+type baseControllerRefManager struct {
+ controller metav1.Object
+ selector labels.Selector
+
+ canAdoptErr error
+ canAdoptOnce sync.Once
+ canAdoptFunc func() error
+}
+
+func (m *baseControllerRefManager) canAdopt() error {
+ m.canAdoptOnce.Do(func() {
+ if m.canAdoptFunc != nil {
+ m.canAdoptErr = m.canAdoptFunc()
+ }
+ })
+ return m.canAdoptErr
+}
+
+// claimObject tries to take ownership of an object for this controller.
+//
+// It will reconcile the following:
+// * Adopt orphans if the match function returns true.
+// * Release owned objects if the match function returns false.
+//
+// A non-nil error is returned if some form of reconciliation was attempted and
+// failed. Usually, controllers should try again later in case reconciliation
+// is still needed.
+//
+// If the error is nil, either the reconciliation succeeded, or no
+// reconciliation was necessary. The returned boolean indicates whether you now
+// own the object.
+//
+// No reconciliation will be attempted if the controller is being deleted.
+func (m *baseControllerRefManager) claimObject(obj metav1.Object, match func(metav1.Object) bool, adopt, release func(metav1.Object) error) (bool, error) {
+ controllerRef := GetControllerOf(obj)
+ if controllerRef != nil {
+ if controllerRef.UID != m.controller.GetUID() {
+ // Owned by someone else. Ignore.
+ return false, nil
+ }
+ if match(obj) {
+ // We already own it and the selector matches.
+ // Return true (successfully claimed) before checking deletion timestamp.
+ // We're still allowed to claim things we already own while being deleted
+ // because doing so requires taking no actions.
+ return true, nil
+ }
+ // Owned by us but selector doesn't match.
+ // Try to release, unless we're being deleted.
+ if m.controller.GetDeletionTimestamp() != nil {
+ return false, nil
+ }
+ if err := release(obj); err != nil {
+ // If the pod no longer exists, ignore the error.
+ if errors.IsNotFound(err) {
+ return false, nil
+ }
+ // Either someone else released it, or there was a transient error.
+ // The controller should requeue and try again if it's still stale.
+ return false, err
+ }
+ // Successfully released.
+ return false, nil
+ }
+
+ // It's an orphan.
+ if m.controller.GetDeletionTimestamp() != nil || !match(obj) {
+ // Ignore if we're being deleted or selector doesn't match.
+ return false, nil
+ }
+ if obj.GetDeletionTimestamp() != nil {
+ // Ignore if the object is being deleted
+ return false, nil
+ }
+ // Selector matches. Try to adopt.
+ if err := adopt(obj); err != nil {
+ // If the pod no longer exists, ignore the error.
+ if errors.IsNotFound(err) {
+ return false, nil
+ }
+ // Either someone else claimed it first, or there was a transient error.
+ // The controller should requeue and try again if it's still orphaned.
+ return false, err
+ }
+ // Successfully adopted.
+ return true, nil
+}
+
+type PodControllerRefManager struct {
+ baseControllerRefManager
+ controllerKind schema.GroupVersionKind
+ podControl PodControlInterface
+}
+
+// NewPodControllerRefManager returns a PodControllerRefManager that exposes
+// methods to manage the controllerRef of pods.
+//
+// The canAdopt() function can be used to perform a potentially expensive check
+// (such as a live GET from the API server) prior to the first adoption.
+// It will only be called (at most once) if an adoption is actually attempted.
+// If canAdopt() returns a non-nil error, all adoptions will fail.
+//
+// NOTE: Once canAdopt() is called, it will not be called again by the same
+// PodControllerRefManager instance. Create a new instance if it makes
+// sense to check canAdopt() again (e.g. in a different sync pass).
+func NewPodControllerRefManager(
+ podControl PodControlInterface,
+ controller metav1.Object,
+ selector labels.Selector,
+ controllerKind schema.GroupVersionKind,
+ canAdopt func() error,
+) *PodControllerRefManager {
+ return &PodControllerRefManager{
+ baseControllerRefManager: baseControllerRefManager{
+ controller: controller,
+ selector: selector,
+ canAdoptFunc: canAdopt,
+ },
+ controllerKind: controllerKind,
+ podControl: podControl,
+ }
+}
+
+// ClaimPods tries to take ownership of a list of Pods.
+//
+// It will reconcile the following:
+// * Adopt orphans if the selector matches.
+// * Release owned objects if the selector no longer matches.
+//
+// Optional: If one or more filters are specified, a Pod will only be claimed if
+// all filters return true.
+//
+// A non-nil error is returned if some form of reconciliation was attempted and
+// failed. Usually, controllers should try again later in case reconciliation
+// is still needed.
+//
+// If the error is nil, either the reconciliation succeeded, or no
+// reconciliation was necessary. The list of Pods that you now own is returned.
+func (m *PodControllerRefManager) ClaimPods(pods []*v1.Pod, filters ...func(*v1.Pod) bool) ([]*v1.Pod, error) {
+ var claimed []*v1.Pod
+ var errlist []error
+
+ match := func(obj metav1.Object) bool {
+ pod := obj.(*v1.Pod)
+ // Check selector first so filters only run on potentially matching Pods.
+ if !m.selector.Matches(labels.Set(pod.Labels)) {
+ return false
+ }
+ for _, filter := range filters {
+ if !filter(pod) {
+ return false
+ }
+ }
+ return true
+ }
+ adopt := func(obj metav1.Object) error {
+ return m.AdoptPod(obj.(*v1.Pod))
+ }
+ release := func(obj metav1.Object) error {
+ return m.ReleasePod(obj.(*v1.Pod))
+ }
+
+ for _, pod := range pods {
+ ok, err := m.claimObject(pod, match, adopt, release)
+ if err != nil {
+ errlist = append(errlist, err)
+ continue
+ }
+ if ok {
+ claimed = append(claimed, pod)
+ }
+ }
+ return claimed, utilerrors.NewAggregate(errlist)
+}
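+
+// exampleClaimPods is an illustrative sketch (not part of the original file):
+// a ReplicaSet-style controller would typically filter out terminated pods
+// while claiming. The podControl, rs, selector, and pods arguments are
+// assumptions supplied by the caller.
+func exampleClaimPods(podControl PodControlInterface, rs *extensions.ReplicaSet, selector labels.Selector, pods []*v1.Pod) ([]*v1.Pod, error) {
+    m := NewPodControllerRefManager(podControl, rs, selector, extensions.SchemeGroupVersion.WithKind("ReplicaSet"), nil)
+    // Only claim pods that are still active; finished pods are left alone.
+    return m.ClaimPods(pods, func(pod *v1.Pod) bool {
+        return pod.Status.Phase != v1.PodSucceeded && pod.Status.Phase != v1.PodFailed
+    })
+}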
+
+// AdoptPod sends a patch to take control of the pod. It returns the error if
+// the patching fails.
+func (m *PodControllerRefManager) AdoptPod(pod *v1.Pod) error {
+ if err := m.canAdopt(); err != nil {
+ return fmt.Errorf("can't adopt Pod %v/%v (%v): %v", pod.Namespace, pod.Name, pod.UID, err)
+ }
+ // Note that ValidateOwnerReferences() will reject this patch if another
+ // OwnerReference exists with controller=true.
+ addControllerPatch := fmt.Sprintf(
+ `{"metadata":{"ownerReferences":[{"apiVersion":"%s","kind":"%s","name":"%s","uid":"%s","controller":true,"blockOwnerDeletion":true}],"uid":"%s"}}`,
+ m.controllerKind.GroupVersion(), m.controllerKind.Kind,
+ m.controller.GetName(), m.controller.GetUID(), pod.UID)
+ return m.podControl.PatchPod(pod.Namespace, pod.Name, []byte(addControllerPatch))
+}
+
+// ReleasePod sends a patch to free the pod from the control of the controller.
+// It returns the error if the patching fails. 404 and 422 errors are ignored.
+func (m *PodControllerRefManager) ReleasePod(pod *v1.Pod) error {
+ glog.V(2).Infof("patching pod %s_%s to remove its controllerRef to %s/%s:%s",
+ pod.Namespace, pod.Name, m.controllerKind.GroupVersion(), m.controllerKind.Kind, m.controller.GetName())
+ deleteOwnerRefPatch := fmt.Sprintf(`{"metadata":{"ownerReferences":[{"$patch":"delete","uid":"%s"}],"uid":"%s"}}`, m.controller.GetUID(), pod.UID)
+ err := m.podControl.PatchPod(pod.Namespace, pod.Name, []byte(deleteOwnerRefPatch))
+ if err != nil {
+ if errors.IsNotFound(err) {
+ // If the pod no longer exists, ignore it.
+ return nil
+ }
+ if errors.IsInvalid(err) {
+ // Invalid error will be returned in two cases: 1. the pod
+ // has no owner reference, 2. the uid of the pod doesn't
+ // match, which means the pod is deleted and then recreated.
+ // In both cases, the error can be ignored.
+
+ // TODO: If the pod has owner references, but none of them
+ // has the owner.UID, server will silently ignore the patch.
+ // Investigate why.
+ return nil
+ }
+ }
+ return err
+}
+
+// ReplicaSetControllerRefManager is used to manage controllerRef of ReplicaSets.
+// Three methods are defined on this object: ClaimReplicaSets, AdoptReplicaSet, and
+// ReleaseReplicaSet, which are used to classify ReplicaSets into appropriate
+// categories and adopt or release them accordingly. See the comments on those
+// functions for more details.
+type ReplicaSetControllerRefManager struct {
+ baseControllerRefManager
+ controllerKind schema.GroupVersionKind
+ rsControl RSControlInterface
+}
+
+// NewReplicaSetControllerRefManager returns a ReplicaSetControllerRefManager that exposes
+// methods to manage the controllerRef of ReplicaSets.
+//
+// The canAdopt() function can be used to perform a potentially expensive check
+// (such as a live GET from the API server) prior to the first adoption.
+// It will only be called (at most once) if an adoption is actually attempted.
+// If canAdopt() returns a non-nil error, all adoptions will fail.
+//
+// NOTE: Once canAdopt() is called, it will not be called again by the same
+// ReplicaSetControllerRefManager instance. Create a new instance if it
+// makes sense to check canAdopt() again (e.g. in a different sync pass).
+func NewReplicaSetControllerRefManager(
+ rsControl RSControlInterface,
+ controller metav1.Object,
+ selector labels.Selector,
+ controllerKind schema.GroupVersionKind,
+ canAdopt func() error,
+) *ReplicaSetControllerRefManager {
+ return &ReplicaSetControllerRefManager{
+ baseControllerRefManager: baseControllerRefManager{
+ controller: controller,
+ selector: selector,
+ canAdoptFunc: canAdopt,
+ },
+ controllerKind: controllerKind,
+ rsControl: rsControl,
+ }
+}
+
+// ClaimReplicaSets tries to take ownership of a list of ReplicaSets.
+//
+// It will reconcile the following:
+// * Adopt orphans if the selector matches.
+// * Release owned objects if the selector no longer matches.
+//
+// A non-nil error is returned if some form of reconciliation was attempted and
+// failed. Usually, controllers should try again later in case reconciliation
+// is still needed.
+//
+// If the error is nil, either the reconciliation succeeded, or no
+// reconciliation was necessary. The list of ReplicaSets that you now own is
+// returned.
+func (m *ReplicaSetControllerRefManager) ClaimReplicaSets(sets []*extensions.ReplicaSet) ([]*extensions.ReplicaSet, error) {
+ var claimed []*extensions.ReplicaSet
+ var errlist []error
+
+ match := func(obj metav1.Object) bool {
+ return m.selector.Matches(labels.Set(obj.GetLabels()))
+ }
+ adopt := func(obj metav1.Object) error {
+ return m.AdoptReplicaSet(obj.(*extensions.ReplicaSet))
+ }
+ release := func(obj metav1.Object) error {
+ return m.ReleaseReplicaSet(obj.(*extensions.ReplicaSet))
+ }
+
+ for _, rs := range sets {
+ ok, err := m.claimObject(rs, match, adopt, release)
+ if err != nil {
+ errlist = append(errlist, err)
+ continue
+ }
+ if ok {
+ claimed = append(claimed, rs)
+ }
+ }
+ return claimed, utilerrors.NewAggregate(errlist)
+}
+
+// AdoptReplicaSet sends a patch to take control of the ReplicaSet. It returns
+// the error if the patching fails.
+func (m *ReplicaSetControllerRefManager) AdoptReplicaSet(rs *extensions.ReplicaSet) error {
+ if err := m.canAdopt(); err != nil {
+ return fmt.Errorf("can't adopt ReplicaSet %v/%v (%v): %v", rs.Namespace, rs.Name, rs.UID, err)
+ }
+ // Note that ValidateOwnerReferences() will reject this patch if another
+ // OwnerReference exists with controller=true.
+ addControllerPatch := fmt.Sprintf(
+ `{"metadata":{"ownerReferences":[{"apiVersion":"%s","kind":"%s","name":"%s","uid":"%s","controller":true,"blockOwnerDeletion":true}],"uid":"%s"}}`,
+ m.controllerKind.GroupVersion(), m.controllerKind.Kind,
+ m.controller.GetName(), m.controller.GetUID(), rs.UID)
+ return m.rsControl.PatchReplicaSet(rs.Namespace, rs.Name, []byte(addControllerPatch))
+}
+
+// ReleaseReplicaSet sends a patch to free the ReplicaSet from the control of the Deployment controller.
+// It returns the error if the patching fails. 404 and 422 errors are ignored.
+func (m *ReplicaSetControllerRefManager) ReleaseReplicaSet(replicaSet *extensions.ReplicaSet) error {
+ glog.V(2).Infof("patching ReplicaSet %s_%s to remove its controllerRef to %s/%s:%s",
+ replicaSet.Namespace, replicaSet.Name, m.controllerKind.GroupVersion(), m.controllerKind.Kind, m.controller.GetName())
+ deleteOwnerRefPatch := fmt.Sprintf(`{"metadata":{"ownerReferences":[{"$patch":"delete","uid":"%s"}],"uid":"%s"}}`, m.controller.GetUID(), replicaSet.UID)
+ err := m.rsControl.PatchReplicaSet(replicaSet.Namespace, replicaSet.Name, []byte(deleteOwnerRefPatch))
+ if err != nil {
+ if errors.IsNotFound(err) {
+ // If the ReplicaSet no longer exists, ignore it.
+ return nil
+ }
+ if errors.IsInvalid(err) {
+ // Invalid error will be returned in two cases: 1. the ReplicaSet
+ // has no owner reference, 2. the uid of the ReplicaSet doesn't
+ // match, which means the ReplicaSet is deleted and then recreated.
+ // In both cases, the error can be ignored.
+ return nil
+ }
+ }
+ return err
+}
+
+// RecheckDeletionTimestamp returns a canAdopt() function to recheck deletion.
+//
+// The canAdopt() function calls getObject() to fetch the latest value,
+// and denies adoption attempts if that object has a non-nil DeletionTimestamp.
+func RecheckDeletionTimestamp(getObject func() (metav1.Object, error)) func() error {
+ return func() error {
+ obj, err := getObject()
+ if err != nil {
+ return fmt.Errorf("can't recheck DeletionTimestamp: %v", err)
+ }
+ if obj.GetDeletionTimestamp() != nil {
+ return fmt.Errorf("%v/%v has just been deleted at %v", obj.GetNamespace(), obj.GetName(), obj.GetDeletionTimestamp())
+ }
+ return nil
+ }
+}
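+
+// exampleCanAdopt is an illustrative sketch (not part of the original file):
+// it wires RecheckDeletionTimestamp into an adoption check so adoptions are
+// refused once the owning ReplicaSet has a DeletionTimestamp. The fresh
+// getter is an assumption standing in for a live GET from the API server.
+func exampleCanAdopt(rs *extensions.ReplicaSet, fresh func() (*extensions.ReplicaSet, error)) func() error {
+    return RecheckDeletionTimestamp(func() (metav1.Object, error) {
+        recent, err := fresh()
+        if err != nil {
+            return nil, err
+        }
+        if recent.UID != rs.UID {
+            // The object was deleted and recreated; refuse to adopt for it.
+            return nil, fmt.Errorf("original ReplicaSet %v/%v is gone: got uid %v, wanted %v", rs.Namespace, rs.Name, recent.UID, rs.UID)
+        }
+        return recent, nil
+    })
+}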
+
+// ControllerRevisionControllerRefManager is used to manage controllerRef of ControllerRevisions.
+// Three methods are defined on this object: ClaimControllerRevisions, AdoptControllerRevision, and
+// ReleaseControllerRevision, which are used to classify ControllerRevisions into appropriate
+// categories and adopt or release them accordingly. See the comments on those
+// functions for more details.
+type ControllerRevisionControllerRefManager struct {
+ baseControllerRefManager
+ controllerKind schema.GroupVersionKind
+ crControl ControllerRevisionControlInterface
+}
+
+// NewControllerRevisionControllerRefManager returns a ControllerRevisionControllerRefManager that exposes
+// methods to manage the controllerRef of ControllerRevisions.
+//
+// The canAdopt() function can be used to perform a potentially expensive check
+// (such as a live GET from the API server) prior to the first adoption.
+// It will only be called (at most once) if an adoption is actually attempted.
+// If canAdopt() returns a non-nil error, all adoptions will fail.
+//
+// NOTE: Once canAdopt() is called, it will not be called again by the same
+// ControllerRevisionControllerRefManager instance. Create a new instance if it
+// makes sense to check canAdopt() again (e.g. in a different sync pass).
+func NewControllerRevisionControllerRefManager(
+ crControl ControllerRevisionControlInterface,
+ controller metav1.Object,
+ selector labels.Selector,
+ controllerKind schema.GroupVersionKind,
+ canAdopt func() error,
+) *ControllerRevisionControllerRefManager {
+ return &ControllerRevisionControllerRefManager{
+ baseControllerRefManager: baseControllerRefManager{
+ controller: controller,
+ selector: selector,
+ canAdoptFunc: canAdopt,
+ },
+ controllerKind: controllerKind,
+ crControl: crControl,
+ }
+}
+
+// ClaimControllerRevisions tries to take ownership of a list of ControllerRevisions.
+//
+// It will reconcile the following:
+// * Adopt orphans if the selector matches.
+// * Release owned objects if the selector no longer matches.
+//
+// A non-nil error is returned if some form of reconciliation was attempted and
+// failed. Usually, controllers should try again later in case reconciliation
+// is still needed.
+//
+// If the error is nil, either the reconciliation succeeded, or no
+// reconciliation was necessary. The list of ControllerRevisions that you now own is
+// returned.
+func (m *ControllerRevisionControllerRefManager) ClaimControllerRevisions(histories []*appsv1beta1.ControllerRevision) ([]*appsv1beta1.ControllerRevision, error) {
+ var claimed []*appsv1beta1.ControllerRevision
+ var errlist []error
+
+ match := func(obj metav1.Object) bool {
+ return m.selector.Matches(labels.Set(obj.GetLabels()))
+ }
+ adopt := func(obj metav1.Object) error {
+ return m.AdoptControllerRevision(obj.(*appsv1beta1.ControllerRevision))
+ }
+ release := func(obj metav1.Object) error {
+ return m.ReleaseControllerRevision(obj.(*appsv1beta1.ControllerRevision))
+ }
+
+ for _, h := range histories {
+ ok, err := m.claimObject(h, match, adopt, release)
+ if err != nil {
+ errlist = append(errlist, err)
+ continue
+ }
+ if ok {
+ claimed = append(claimed, h)
+ }
+ }
+ return claimed, utilerrors.NewAggregate(errlist)
+}
+
+// AdoptControllerRevision sends a patch to take control of the ControllerRevision. It returns the error if
+// the patching fails.
+func (m *ControllerRevisionControllerRefManager) AdoptControllerRevision(history *appsv1beta1.ControllerRevision) error {
+ if err := m.canAdopt(); err != nil {
+ return fmt.Errorf("can't adopt ControllerRevision %v/%v (%v): %v", history.Namespace, history.Name, history.UID, err)
+ }
+ // Note that ValidateOwnerReferences() will reject this patch if another
+ // OwnerReference exists with controller=true.
+ addControllerPatch := fmt.Sprintf(
+ `{"metadata":{"ownerReferences":[{"apiVersion":"%s","kind":"%s","name":"%s","uid":"%s","controller":true,"blockOwnerDeletion":true}],"uid":"%s"}}`,
+ m.controllerKind.GroupVersion(), m.controllerKind.Kind,
+ m.controller.GetName(), m.controller.GetUID(), history.UID)
+ return m.crControl.PatchControllerRevision(history.Namespace, history.Name, []byte(addControllerPatch))
+}
+
+// ReleaseControllerRevision sends a patch to free the ControllerRevision from the control of its controller.
+// It returns the error if the patching fails. 404 and 422 errors are ignored.
+func (m *ControllerRevisionControllerRefManager) ReleaseControllerRevision(history *appsv1beta1.ControllerRevision) error {
+ glog.V(2).Infof("patching ControllerRevision %s_%s to remove its controllerRef to %s/%s:%s",
+ history.Namespace, history.Name, m.controllerKind.GroupVersion(), m.controllerKind.Kind, m.controller.GetName())
+ deleteOwnerRefPatch := fmt.Sprintf(`{"metadata":{"ownerReferences":[{"$patch":"delete","uid":"%s"}],"uid":"%s"}}`, m.controller.GetUID(), history.UID)
+ err := m.crControl.PatchControllerRevision(history.Namespace, history.Name, []byte(deleteOwnerRefPatch))
+ if err != nil {
+ if errors.IsNotFound(err) {
+ // If the ControllerRevision no longer exists, ignore it.
+ return nil
+ }
+ if errors.IsInvalid(err) {
+ // Invalid error will be returned in two cases: 1. the ControllerRevision
+ // has no owner reference, 2. the uid of the ControllerRevision doesn't
+ // match, which means the ControllerRevision is deleted and then recreated.
+ // In both cases, the error can be ignored.
+ return nil
+ }
+ }
+ return err
+}
diff --git a/vendor/k8s.io/kubernetes/pkg/controller/controller_utils.go b/vendor/k8s.io/kubernetes/pkg/controller/controller_utils.go
new file mode 100644
index 000000000..9f1a17767
--- /dev/null
+++ b/vendor/k8s.io/kubernetes/pkg/controller/controller_utils.go
@@ -0,0 +1,1018 @@
+/*
+Copyright 2014 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package controller
+
+import (
+ "encoding/binary"
+ "encoding/json"
+ "fmt"
+ "hash/fnv"
+ "sync"
+ "sync/atomic"
+ "time"
+
+ "k8s.io/apimachinery/pkg/api/meta"
+ metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+ "k8s.io/apimachinery/pkg/labels"
+ "k8s.io/apimachinery/pkg/runtime"
+ "k8s.io/apimachinery/pkg/runtime/schema"
+ "k8s.io/apimachinery/pkg/types"
+ "k8s.io/apimachinery/pkg/util/clock"
+ utilruntime "k8s.io/apimachinery/pkg/util/runtime"
+ "k8s.io/apimachinery/pkg/util/sets"
+ "k8s.io/apimachinery/pkg/util/strategicpatch"
+ "k8s.io/apimachinery/pkg/util/wait"
+ "k8s.io/client-go/tools/cache"
+ "k8s.io/client-go/tools/record"
+ "k8s.io/client-go/util/integer"
+ "k8s.io/kubernetes/pkg/api"
+ "k8s.io/kubernetes/pkg/api/v1"
+ v1helper "k8s.io/kubernetes/pkg/api/v1/helper"
+ podutil "k8s.io/kubernetes/pkg/api/v1/pod"
+ "k8s.io/kubernetes/pkg/api/v1/ref"
+ "k8s.io/kubernetes/pkg/api/validation"
+ extensions "k8s.io/kubernetes/pkg/apis/extensions/v1beta1"
+ "k8s.io/kubernetes/pkg/client/clientset_generated/clientset"
+ clientretry "k8s.io/kubernetes/pkg/client/retry"
+ hashutil "k8s.io/kubernetes/pkg/util/hash"
+
+ "github.com/golang/glog"
+)
+
+const (
+ // If a watch drops a delete event for a pod, it'll take this long
+ // before a dormant controller waiting for those events is woken up anyway. It is
+ // specifically targeted at the case where some problem prevents an update
+ // of expectations; without it the controller could stay asleep forever. This should
+ // be set based on the expected latency of watch events.
+ //
+ // Currently a controller can service (create *and* observe the watch events for said
+ // creation) about 10 pods a second, so it takes about 1 min to service
+ // 500 pods. Just creation is limited to 20qps, and watching happens with ~10-30s
+ // latency/pod at the scale of 3000 pods over 100 nodes.
+ ExpectationsTimeout = 5 * time.Minute
+)
+
+var UpdateTaintBackoff = wait.Backoff{
+ Steps: 5,
+ Duration: 100 * time.Millisecond,
+ Jitter: 1.0,
+}
+
+var (
+ KeyFunc = cache.DeletionHandlingMetaNamespaceKeyFunc
+)
+
+type ResyncPeriodFunc func() time.Duration
+
+// NoResyncPeriodFunc returns 0 for resyncPeriod in case resyncing is not needed.
+func NoResyncPeriodFunc() time.Duration {
+ return 0
+}
+
+// StaticResyncPeriodFunc returns the resync period specified
+func StaticResyncPeriodFunc(resyncPeriod time.Duration) ResyncPeriodFunc {
+ return func() time.Duration {
+ return resyncPeriod
+ }
+}
+
+// Expectations are a way for controllers to tell the controller manager what they expect. eg:
+// ControllerExpectations: {
+// controller1: expects 2 adds in 2 minutes
+// controller2: expects 2 dels in 2 minutes
+// controller3: expects -1 adds in 2 minutes => controller3's expectations have already been met
+// }
+//
+// Implementation:
+// ControlleeExpectation = pair of atomic counters to track controllee's creation/deletion
+// ControllerExpectationsStore = TTLStore + a ControlleeExpectation per controller
+//
+// * Once set, expectations can only be lowered
+// * A controller isn't synced until its expectations are either fulfilled or expired
+// * Controllers that don't set expectations will get woken up for every matching controllee
+
+// ExpKeyFunc parses out the key from a ControlleeExpectations.
+var ExpKeyFunc = func(obj interface{}) (string, error) {
+ if e, ok := obj.(*ControlleeExpectations); ok {
+ return e.key, nil
+ }
+ return "", fmt.Errorf("Could not find key for obj %#v", obj)
+}
+
+// ControllerExpectationsInterface is an interface that allows users to set and wait on expectations.
+// Only abstracted out for testing.
+// Warning: if using KeyFunc it is not safe to use a single ControllerExpectationsInterface with different
+// types of controllers, because the keys might conflict across types.
+type ControllerExpectationsInterface interface {
+ GetExpectations(controllerKey string) (*ControlleeExpectations, bool, error)
+ SatisfiedExpectations(controllerKey string) bool
+ DeleteExpectations(controllerKey string)
+ SetExpectations(controllerKey string, add, del int) error
+ ExpectCreations(controllerKey string, adds int) error
+ ExpectDeletions(controllerKey string, dels int) error
+ CreationObserved(controllerKey string)
+ DeletionObserved(controllerKey string)
+ RaiseExpectations(controllerKey string, add, del int)
+ LowerExpectations(controllerKey string, add, del int)
+}
+
+// ControllerExpectations is a cache mapping controllers to what they expect to see before being woken up for a sync.
+type ControllerExpectations struct {
+ cache.Store
+}
+
+// GetExpectations returns the ControlleeExpectations of the given controller.
+func (r *ControllerExpectations) GetExpectations(controllerKey string) (*ControlleeExpectations, bool, error) {
+ if exp, exists, err := r.GetByKey(controllerKey); err == nil && exists {
+ return exp.(*ControlleeExpectations), true, nil
+ } else {
+ return nil, false, err
+ }
+}
+
+// DeleteExpectations deletes the expectations of the given controller from the TTLStore.
+func (r *ControllerExpectations) DeleteExpectations(controllerKey string) {
+ if exp, exists, err := r.GetByKey(controllerKey); err == nil && exists {
+ if err := r.Delete(exp); err != nil {
+ glog.V(2).Infof("Error deleting expectations for controller %v: %v", controllerKey, err)
+ }
+ }
+}
+
+// SatisfiedExpectations returns true if the required adds/dels for the given controller have been observed.
+// Add/del counts are established by the controller at sync time, and updated as controllees are observed by the controller
+// manager.
+func (r *ControllerExpectations) SatisfiedExpectations(controllerKey string) bool {
+ if exp, exists, err := r.GetExpectations(controllerKey); exists {
+ if exp.Fulfilled() {
+ glog.V(4).Infof("Controller expectations fulfilled %#v", exp)
+ return true
+ } else if exp.isExpired() {
+ glog.V(4).Infof("Controller expectations expired %#v", exp)
+ return true
+ } else {
+ glog.V(4).Infof("Controller still waiting on expectations %#v", exp)
+ return false
+ }
+ } else if err != nil {
+ glog.V(2).Infof("Error encountered while checking expectations %#v, forcing sync", err)
+ } else {
+ // When a new controller is created, it doesn't have expectations.
+ // When it doesn't see expected watch events for > TTL, the expectations expire.
+ // - In this case it wakes up, creates/deletes controllees, and sets expectations again.
+ // When it has satisfied expectations and no controllees need to be created/destroyed > TTL, the expectations expire.
+ // - In this case it continues without setting expectations till it needs to create/delete controllees.
+ glog.V(4).Infof("Controller %v either never recorded expectations, or the ttl expired.", controllerKey)
+ }
+ // Trigger a sync if we either encountered an error (which shouldn't happen since we're
+ // getting from local store) or this controller hasn't established expectations.
+ return true
+}
+
+// TODO: Extend ExpirationCache to support explicit expiration.
+// TODO: Make this possible to disable in tests.
+// TODO: Support injection of clock.
+func (exp *ControlleeExpectations) isExpired() bool {
+ return clock.RealClock{}.Since(exp.timestamp) > ExpectationsTimeout
+}
+
+// SetExpectations registers new expectations for the given controller. Forgets existing expectations.
+func (r *ControllerExpectations) SetExpectations(controllerKey string, add, del int) error {
+ exp := &ControlleeExpectations{add: int64(add), del: int64(del), key: controllerKey, timestamp: clock.RealClock{}.Now()}
+ glog.V(4).Infof("Setting expectations %#v", exp)
+ return r.Add(exp)
+}
+
+func (r *ControllerExpectations) ExpectCreations(controllerKey string, adds int) error {
+ return r.SetExpectations(controllerKey, adds, 0)
+}
+
+func (r *ControllerExpectations) ExpectDeletions(controllerKey string, dels int) error {
+ return r.SetExpectations(controllerKey, 0, dels)
+}
+
+// Decrements the expectation counts of the given controller.
+func (r *ControllerExpectations) LowerExpectations(controllerKey string, add, del int) {
+ if exp, exists, err := r.GetExpectations(controllerKey); err == nil && exists {
+ exp.Add(int64(-add), int64(-del))
+ // The expectations might've been modified since the update on the previous line.
+ glog.V(4).Infof("Lowered expectations %#v", exp)
+ }
+}
+
+// Increments the expectation counts of the given controller.
+func (r *ControllerExpectations) RaiseExpectations(controllerKey string, add, del int) {
+ if exp, exists, err := r.GetExpectations(controllerKey); err == nil && exists {
+ exp.Add(int64(add), int64(del))
+ // The expectations might've been modified since the update on the previous line.
+ glog.V(4).Infof("Raised expectations %#v", exp)
+ }
+}
+
+// CreationObserved atomically decrements the `add` expectation count of the given controller.
+func (r *ControllerExpectations) CreationObserved(controllerKey string) {
+ r.LowerExpectations(controllerKey, 1, 0)
+}
+
+// DeletionObserved atomically decrements the `del` expectation count of the given controller.
+func (r *ControllerExpectations) DeletionObserved(controllerKey string) {
+ r.LowerExpectations(controllerKey, 0, 1)
+}
+
+// Expectations are either fulfilled, or expire naturally.
+type Expectations interface {
+ Fulfilled() bool
+}
+
+// ControlleeExpectations track controllee creates/deletes.
+type ControlleeExpectations struct {
+ // Important: Since these two int64 fields are using sync/atomic, they have to be at the top of the struct due to a bug on 32-bit platforms
+ // See: https://golang.org/pkg/sync/atomic/ for more information
+ add int64
+ del int64
+ key string
+ timestamp time.Time
+}
+
+// Add increments the add and del counters.
+func (e *ControlleeExpectations) Add(add, del int64) {
+ atomic.AddInt64(&e.add, add)
+ atomic.AddInt64(&e.del, del)
+}
+
+// Fulfilled returns true if this expectation has been fulfilled.
+func (e *ControlleeExpectations) Fulfilled() bool {
+ // TODO: think about why this line being atomic doesn't matter
+ return atomic.LoadInt64(&e.add) <= 0 && atomic.LoadInt64(&e.del) <= 0
+}
+
+// GetExpectations returns the add and del expectations of the controllee.
+func (e *ControlleeExpectations) GetExpectations() (int64, int64) {
+ return atomic.LoadInt64(&e.add), atomic.LoadInt64(&e.del)
+}
+
+// NewControllerExpectations returns a store for ControllerExpectations.
+func NewControllerExpectations() *ControllerExpectations {
+ return &ControllerExpectations{cache.NewStore(ExpKeyFunc)}
+}
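+
+// exampleExpectations is an illustrative sketch (not part of the original
+// file): it shows the typical lifecycle around a batch of pod creations.
+// The controllerKey and count are assumptions.
+func exampleExpectations(exp *ControllerExpectations, controllerKey string, podsToCreate int) {
+    // Record the intent before issuing creates so watch events can be matched.
+    exp.ExpectCreations(controllerKey, podsToCreate)
+    // ... issue the create calls; the pod informer's Add handler then calls:
+    exp.CreationObserved(controllerKey)
+    // The controller syncs again only once all expected events were observed
+    // (or the expectations expired).
+    _ = exp.SatisfiedExpectations(controllerKey)
+}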
+
+// UIDSetKeyFunc to parse out the key from a UIDSet.
+var UIDSetKeyFunc = func(obj interface{}) (string, error) {
+ if u, ok := obj.(*UIDSet); ok {
+ return u.key, nil
+ }
+ return "", fmt.Errorf("Could not find key for obj %#v", obj)
+}
+
+// UIDSet holds a key and a set of UIDs. Used by the
+// UIDTrackingControllerExpectations to remember which UIDs it has seen and is
+// still waiting for.
+type UIDSet struct {
+ sets.String
+ key string
+}
+
+// UIDTrackingControllerExpectations tracks the UID of the pods it deletes.
+// This cache is needed over plain old expectations to safely handle graceful
+// deletion. The desired behavior is to treat an update that sets the
+// DeletionTimestamp on an object as a delete. To do so consistently, one needs
+// to remember the expected deletes so they aren't double counted.
+// TODO: Track creates as well (#22599)
+type UIDTrackingControllerExpectations struct {
+ ControllerExpectationsInterface
+ // TODO: There is a much nicer way to do this that involves a single store,
+ // a lock per entry, and a ControlleeExpectationsInterface type.
+ uidStoreLock sync.Mutex
+ // Store used for the UIDs associated with any expectation tracked via the
+ // ControllerExpectationsInterface.
+ uidStore cache.Store
+}
+
+// GetUIDs is a convenience method to avoid exposing the set of expected uids.
+// The returned set is not thread safe; all modifications must be made while holding
+// the uidStoreLock.
+func (u *UIDTrackingControllerExpectations) GetUIDs(controllerKey string) sets.String {
+ if uid, exists, err := u.uidStore.GetByKey(controllerKey); err == nil && exists {
+ return uid.(*UIDSet).String
+ }
+ return nil
+}
+
+// ExpectDeletions records expectations for the given deleteKeys, against the given controller.
+func (u *UIDTrackingControllerExpectations) ExpectDeletions(rcKey string, deletedKeys []string) error {
+ u.uidStoreLock.Lock()
+ defer u.uidStoreLock.Unlock()
+
+ if existing := u.GetUIDs(rcKey); existing != nil && existing.Len() != 0 {
+ glog.Errorf("Clobbering existing delete keys: %+v", existing)
+ }
+ expectedUIDs := sets.NewString()
+ for _, k := range deletedKeys {
+ expectedUIDs.Insert(k)
+ }
+ glog.V(4).Infof("Controller %v waiting on deletions for: %+v", rcKey, deletedKeys)
+ if err := u.uidStore.Add(&UIDSet{expectedUIDs, rcKey}); err != nil {
+ return err
+ }
+ return u.ControllerExpectationsInterface.ExpectDeletions(rcKey, expectedUIDs.Len())
+}
+
+// DeletionObserved records the given deleteKey as a deletion, for the given rc.
+func (u *UIDTrackingControllerExpectations) DeletionObserved(rcKey, deleteKey string) {
+ u.uidStoreLock.Lock()
+ defer u.uidStoreLock.Unlock()
+
+ uids := u.GetUIDs(rcKey)
+ if uids != nil && uids.Has(deleteKey) {
+ glog.V(4).Infof("Controller %v received delete for pod %v", rcKey, deleteKey)
+ u.ControllerExpectationsInterface.DeletionObserved(rcKey)
+ uids.Delete(deleteKey)
+ }
+}
+
+// DeleteExpectations deletes the UID set and invokes DeleteExpectations on the
+// underlying ControllerExpectationsInterface.
+func (u *UIDTrackingControllerExpectations) DeleteExpectations(rcKey string) {
+ u.uidStoreLock.Lock()
+ defer u.uidStoreLock.Unlock()
+
+ u.ControllerExpectationsInterface.DeleteExpectations(rcKey)
+ if uidExp, exists, err := u.uidStore.GetByKey(rcKey); err == nil && exists {
+ if err := u.uidStore.Delete(uidExp); err != nil {
+ glog.V(2).Infof("Error deleting uid expectations for controller %v: %v", rcKey, err)
+ }
+ }
+}
+
+// NewUIDTrackingControllerExpectations returns a wrapper around
+// ControllerExpectations that is aware of deleteKeys.
+func NewUIDTrackingControllerExpectations(ce ControllerExpectationsInterface) *UIDTrackingControllerExpectations {
+ return &UIDTrackingControllerExpectations{ControllerExpectationsInterface: ce, uidStore: cache.NewStore(UIDSetKeyFunc)}
+}
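+
+// exampleUIDExpectations is an illustrative sketch (not part of the original
+// file): deletions are tracked by pod key so that a graceful-deletion update
+// and the final delete event are not double counted. The keys are assumptions.
+func exampleUIDExpectations(uidExp *UIDTrackingControllerExpectations, rsKey string, podKeys []string) error {
+    if err := uidExp.ExpectDeletions(rsKey, podKeys); err != nil {
+        return err
+    }
+    // When a tracked pod is first observed as deleted (or as having a
+    // DeletionTimestamp), the controller records it exactly once:
+    for _, k := range podKeys {
+        uidExp.DeletionObserved(rsKey, k)
+    }
+    return nil
+}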
+
+// Reasons for pod events
+const (
+ // FailedCreatePodReason is added in an event and in a replica set condition
+ // when a pod for a replica set fails to be created.
+ FailedCreatePodReason = "FailedCreate"
+ // SuccessfulCreatePodReason is added in an event when a pod for a replica set
+ // is successfully created.
+ SuccessfulCreatePodReason = "SuccessfulCreate"
+ // FailedDeletePodReason is added in an event and in a replica set condition
+ // when a pod for a replica set fails to be deleted.
+ FailedDeletePodReason = "FailedDelete"
+ // SuccessfulDeletePodReason is added in an event when a pod for a replica set
+ // is successfully deleted.
+ SuccessfulDeletePodReason = "SuccessfulDelete"
+)
+
+// RSControlInterface is an interface that knows how to patch
+// ReplicaSets. It is used by the deployment controller to ease
+// testing of the actions that it takes.
+type RSControlInterface interface {
+ PatchReplicaSet(namespace, name string, data []byte) error
+}
+
+// RealRSControl is the default implementation of RSControlInterface.
+type RealRSControl struct {
+ KubeClient clientset.Interface
+ Recorder record.EventRecorder
+}
+
+var _ RSControlInterface = &RealRSControl{}
+
+func (r RealRSControl) PatchReplicaSet(namespace, name string, data []byte) error {
+ _, err := r.KubeClient.Extensions().ReplicaSets(namespace).Patch(name, types.StrategicMergePatchType, data)
+ return err
+}
+
+// TODO: merge the controller revision interface in controller_history.go with this one
+// ControllerRevisionControlInterface is an interface that knows how to patch
+// ControllerRevisions. It is used by the DaemonSet controller to ease
+// testing of the actions that it takes.
+type ControllerRevisionControlInterface interface {
+ PatchControllerRevision(namespace, name string, data []byte) error
+}
+
+// RealControllerRevisionControl is the default implementation of ControllerRevisionControlInterface.
+type RealControllerRevisionControl struct {
+ KubeClient clientset.Interface
+}
+
+var _ ControllerRevisionControlInterface = &RealControllerRevisionControl{}
+
+func (r RealControllerRevisionControl) PatchControllerRevision(namespace, name string, data []byte) error {
+ _, err := r.KubeClient.AppsV1beta1().ControllerRevisions(namespace).Patch(name, types.StrategicMergePatchType, data)
+ return err
+}
+
+// PodControlInterface is an interface that knows how to add or delete pods,
+// abstracted as an interface to allow testing.
+type PodControlInterface interface {
+ // CreatePods creates new pods according to the spec.
+ CreatePods(namespace string, template *v1.PodTemplateSpec, object runtime.Object) error
+ // CreatePodsOnNode creates a new pod according to the spec on the specified node,
+ // and sets the ControllerRef.
+ CreatePodsOnNode(nodeName, namespace string, template *v1.PodTemplateSpec, object runtime.Object, controllerRef *metav1.OwnerReference) error
+ // CreatePodsWithControllerRef creates new pods according to the spec, and sets object as the pod's controller.
+ CreatePodsWithControllerRef(namespace string, template *v1.PodTemplateSpec, object runtime.Object, controllerRef *metav1.OwnerReference) error
+ // DeletePod deletes the pod identified by podID.
+ DeletePod(namespace string, podID string, object runtime.Object) error
+ // PatchPod patches the pod.
+ PatchPod(namespace, name string, data []byte) error
+}
+
+// RealPodControl is the default implementation of PodControlInterface.
+type RealPodControl struct {
+ KubeClient clientset.Interface
+ Recorder record.EventRecorder
+}
+
+var _ PodControlInterface = &RealPodControl{}
+
+func getPodsLabelSet(template *v1.PodTemplateSpec) labels.Set {
+ desiredLabels := make(labels.Set)
+ for k, v := range template.Labels {
+ desiredLabels[k] = v
+ }
+ return desiredLabels
+}
+
+func getPodsFinalizers(template *v1.PodTemplateSpec) []string {
+ desiredFinalizers := make([]string, len(template.Finalizers))
+ copy(desiredFinalizers, template.Finalizers)
+ return desiredFinalizers
+}
+
+func getPodsAnnotationSet(template *v1.PodTemplateSpec, object runtime.Object) (labels.Set, error) {
+ desiredAnnotations := make(labels.Set)
+ for k, v := range template.Annotations {
+ desiredAnnotations[k] = v
+ }
+ createdByRef, err := ref.GetReference(api.Scheme, object)
+ if err != nil {
+ return desiredAnnotations, fmt.Errorf("unable to get controller reference: %v", err)
+ }
+
+ // TODO: this code was not safe previously - as soon as new code came along that switched to v2, old clients
+ // would be broken upon reading it. This is explicitly hardcoded to v1 to guarantee predictable deployment.
+ // We need to consistently handle this case of annotation versioning.
+ codec := api.Codecs.LegacyCodec(schema.GroupVersion{Group: v1.GroupName, Version: "v1"})
+
+ createdByRefJson, err := runtime.Encode(codec, &v1.SerializedReference{
+ Reference: *createdByRef,
+ })
+ if err != nil {
+ return desiredAnnotations, fmt.Errorf("unable to serialize controller reference: %v", err)
+ }
+ desiredAnnotations[v1.CreatedByAnnotation] = string(createdByRefJson)
+ return desiredAnnotations, nil
+}
+
+func getPodsPrefix(controllerName string) string {
+ // use the dash (if the name isn't too long) to make the pod name a bit prettier
+ prefix := fmt.Sprintf("%s-", controllerName)
+ if len(validation.ValidatePodName(prefix, true)) != 0 {
+ prefix = controllerName
+ }
+ return prefix
+}
+
+func validateControllerRef(controllerRef *metav1.OwnerReference) error {
+ if controllerRef == nil {
+ return fmt.Errorf("controllerRef is nil")
+ }
+ if len(controllerRef.APIVersion) == 0 {
+ return fmt.Errorf("controllerRef has empty APIVersion")
+ }
+ if len(controllerRef.Kind) == 0 {
+ return fmt.Errorf("controllerRef has empty Kind")
+ }
+ if controllerRef.Controller == nil || *controllerRef.Controller != true {
+ return fmt.Errorf("controllerRef.Controller is not set to true")
+ }
+ if controllerRef.BlockOwnerDeletion == nil || *controllerRef.BlockOwnerDeletion != true {
+ return fmt.Errorf("controllerRef.BlockOwnerDeletion is not set")
+ }
+ return nil
+}
+
+func (r RealPodControl) CreatePods(namespace string, template *v1.PodTemplateSpec, object runtime.Object) error {
+ return r.createPods("", namespace, template, object, nil)
+}
+
+func (r RealPodControl) CreatePodsWithControllerRef(namespace string, template *v1.PodTemplateSpec, controllerObject runtime.Object, controllerRef *metav1.OwnerReference) error {
+ if err := validateControllerRef(controllerRef); err != nil {
+ return err
+ }
+ return r.createPods("", namespace, template, controllerObject, controllerRef)
+}
+
+func (r RealPodControl) CreatePodsOnNode(nodeName, namespace string, template *v1.PodTemplateSpec, object runtime.Object, controllerRef *metav1.OwnerReference) error {
+ if err := validateControllerRef(controllerRef); err != nil {
+ return err
+ }
+ return r.createPods(nodeName, namespace, template, object, controllerRef)
+}
+
+func (r RealPodControl) PatchPod(namespace, name string, data []byte) error {
+ _, err := r.KubeClient.Core().Pods(namespace).Patch(name, types.StrategicMergePatchType, data)
+ return err
+}
+
+func GetPodFromTemplate(template *v1.PodTemplateSpec, parentObject runtime.Object, controllerRef *metav1.OwnerReference) (*v1.Pod, error) {
+ desiredLabels := getPodsLabelSet(template)
+ desiredFinalizers := getPodsFinalizers(template)
+ desiredAnnotations, err := getPodsAnnotationSet(template, parentObject)
+ if err != nil {
+ return nil, err
+ }
+ accessor, err := meta.Accessor(parentObject)
+ if err != nil {
+ return nil, fmt.Errorf("parentObject does not have ObjectMeta, %v", err)
+ }
+ prefix := getPodsPrefix(accessor.GetName())
+
+ pod := &v1.Pod{
+ ObjectMeta: metav1.ObjectMeta{
+ Labels: desiredLabels,
+ Annotations: desiredAnnotations,
+ GenerateName: prefix,
+ Finalizers: desiredFinalizers,
+ },
+ }
+ if controllerRef != nil {
+ pod.OwnerReferences = append(pod.OwnerReferences, *controllerRef)
+ }
+ clone, err := api.Scheme.DeepCopy(&template.Spec)
+ if err != nil {
+ return nil, err
+ }
+ pod.Spec = *clone.(*v1.PodSpec)
+ return pod, nil
+}
+
+func (r RealPodControl) createPods(nodeName, namespace string, template *v1.PodTemplateSpec, object runtime.Object, controllerRef *metav1.OwnerReference) error {
+ pod, err := GetPodFromTemplate(template, object, controllerRef)
+ if err != nil {
+ return err
+ }
+ if len(nodeName) != 0 {
+ pod.Spec.NodeName = nodeName
+ }
+ if labels.Set(pod.Labels).AsSelectorPreValidated().Empty() {
+ return fmt.Errorf("unable to create pods, no labels")
+ }
+ if newPod, err := r.KubeClient.Core().Pods(namespace).Create(pod); err != nil {
+ r.Recorder.Eventf(object, v1.EventTypeWarning, FailedCreatePodReason, "Error creating: %v", err)
+ return fmt.Errorf("unable to create pods: %v", err)
+ } else {
+ accessor, err := meta.Accessor(object)
+ if err != nil {
+ glog.Errorf("parentObject does not have ObjectMeta, %v", err)
+ return nil
+ }
+ glog.V(4).Infof("Controller %v created pod %v", accessor.GetName(), newPod.Name)
+ r.Recorder.Eventf(object, v1.EventTypeNormal, SuccessfulCreatePodReason, "Created pod: %v", newPod.Name)
+ }
+ return nil
+}
+
+func (r RealPodControl) DeletePod(namespace string, podID string, object runtime.Object) error {
+ accessor, err := meta.Accessor(object)
+ if err != nil {
+ return fmt.Errorf("object does not have ObjectMeta, %v", err)
+ }
+ glog.V(2).Infof("Controller %v deleting pod %v/%v", accessor.GetName(), namespace, podID)
+ if err := r.KubeClient.Core().Pods(namespace).Delete(podID, nil); err != nil {
+ r.Recorder.Eventf(object, v1.EventTypeWarning, FailedDeletePodReason, "Error deleting: %v", err)
+ return fmt.Errorf("unable to delete pods: %v", err)
+ } else {
+ r.Recorder.Eventf(object, v1.EventTypeNormal, SuccessfulDeletePodReason, "Deleted pod: %v", podID)
+ }
+ return nil
+}
+
+type FakePodControl struct {
+ sync.Mutex
+ Templates []v1.PodTemplateSpec
+ ControllerRefs []metav1.OwnerReference
+ DeletePodName []string
+ Patches [][]byte
+ Err error
+}
+
+var _ PodControlInterface = &FakePodControl{}
+
+func (f *FakePodControl) PatchPod(namespace, name string, data []byte) error {
+ f.Lock()
+ defer f.Unlock()
+ f.Patches = append(f.Patches, data)
+ if f.Err != nil {
+ return f.Err
+ }
+ return nil
+}
+
+func (f *FakePodControl) CreatePods(namespace string, spec *v1.PodTemplateSpec, object runtime.Object) error {
+ f.Lock()
+ defer f.Unlock()
+ f.Templates = append(f.Templates, *spec)
+ if f.Err != nil {
+ return f.Err
+ }
+ return nil
+}
+
+func (f *FakePodControl) CreatePodsWithControllerRef(namespace string, spec *v1.PodTemplateSpec, object runtime.Object, controllerRef *metav1.OwnerReference) error {
+ f.Lock()
+ defer f.Unlock()
+ f.Templates = append(f.Templates, *spec)
+ f.ControllerRefs = append(f.ControllerRefs, *controllerRef)
+ if f.Err != nil {
+ return f.Err
+ }
+ return nil
+}
+
+func (f *FakePodControl) CreatePodsOnNode(nodeName, namespace string, template *v1.PodTemplateSpec, object runtime.Object, controllerRef *metav1.OwnerReference) error {
+ f.Lock()
+ defer f.Unlock()
+ f.Templates = append(f.Templates, *template)
+ f.ControllerRefs = append(f.ControllerRefs, *controllerRef)
+ if f.Err != nil {
+ return f.Err
+ }
+ return nil
+}
+
+func (f *FakePodControl) DeletePod(namespace string, podID string, object runtime.Object) error {
+ f.Lock()
+ defer f.Unlock()
+ f.DeletePodName = append(f.DeletePodName, podID)
+ if f.Err != nil {
+ return f.Err
+ }
+ return nil
+}
+
+func (f *FakePodControl) Clear() {
+ f.Lock()
+ defer f.Unlock()
+ f.DeletePodName = []string{}
+ f.Templates = []v1.PodTemplateSpec{}
+ f.ControllerRefs = []metav1.OwnerReference{}
+ f.Patches = [][]byte{}
+}
+
+// ByLogging allows custom sorting of pods so the best one can be picked for getting its logs.
+type ByLogging []*v1.Pod
+
+func (s ByLogging) Len() int { return len(s) }
+func (s ByLogging) Swap(i, j int) { s[i], s[j] = s[j], s[i] }
+
+func (s ByLogging) Less(i, j int) bool {
+ // 1. assigned < unassigned
+ if s[i].Spec.NodeName != s[j].Spec.NodeName && (len(s[i].Spec.NodeName) == 0 || len(s[j].Spec.NodeName) == 0) {
+ return len(s[i].Spec.NodeName) > 0
+ }
+ // 2. PodRunning < PodUnknown < PodPending
+ m := map[v1.PodPhase]int{v1.PodRunning: 0, v1.PodUnknown: 1, v1.PodPending: 2}
+ if m[s[i].Status.Phase] != m[s[j].Status.Phase] {
+ return m[s[i].Status.Phase] < m[s[j].Status.Phase]
+ }
+ // 3. ready < not ready
+ if podutil.IsPodReady(s[i]) != podutil.IsPodReady(s[j]) {
+ return podutil.IsPodReady(s[i])
+ }
+ // TODO: take availability into account when we push minReadySeconds information from deployment into pods,
+ // see https://github.com/kubernetes/kubernetes/issues/22065
+ // 4. Been ready for more time < less time < empty time
+ if podutil.IsPodReady(s[i]) && podutil.IsPodReady(s[j]) && !podReadyTime(s[i]).Equal(podReadyTime(s[j])) {
+ return afterOrZero(podReadyTime(s[j]), podReadyTime(s[i]))
+ }
+ // 5. Pods with containers with higher restart counts < lower restart counts
+ if maxContainerRestarts(s[i]) != maxContainerRestarts(s[j]) {
+ return maxContainerRestarts(s[i]) > maxContainerRestarts(s[j])
+ }
+ // 6. older pods < newer pods < empty timestamp pods
+ if !s[i].CreationTimestamp.Equal(s[j].CreationTimestamp) {
+ return afterOrZero(s[j].CreationTimestamp, s[i].CreationTimestamp)
+ }
+ return false
+}
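+
+// Example (illustrative): to pick the most useful pod to fetch logs from, a caller can
+// sort a pod slice with the standard library sort package and take the first entry:
+//
+//    sort.Sort(ByLogging(pods))
+//    best := pods[0]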
+
+// ActivePods type allows custom sorting of pods so a controller can pick the best ones to delete.
+type ActivePods []*v1.Pod
+
+func (s ActivePods) Len() int { return len(s) }
+func (s ActivePods) Swap(i, j int) { s[i], s[j] = s[j], s[i] }
+
+func (s ActivePods) Less(i, j int) bool {
+ // 1. Unassigned < assigned
+ // If only one of the pods is unassigned, the unassigned one is smaller
+ if s[i].Spec.NodeName != s[j].Spec.NodeName && (len(s[i].Spec.NodeName) == 0 || len(s[j].Spec.NodeName) == 0) {
+ return len(s[i].Spec.NodeName) == 0
+ }
+ // 2. PodPending < PodUnknown < PodRunning
+ m := map[v1.PodPhase]int{v1.PodPending: 0, v1.PodUnknown: 1, v1.PodRunning: 2}
+ if m[s[i].Status.Phase] != m[s[j].Status.Phase] {
+ return m[s[i].Status.Phase] < m[s[j].Status.Phase]
+ }
+ // 3. Not ready < ready
+ // If only one of the pods is not ready, the not ready one is smaller
+ if podutil.IsPodReady(s[i]) != podutil.IsPodReady(s[j]) {
+ return !podutil.IsPodReady(s[i])
+ }
+ // TODO: take availability into account when we push minReadySeconds information from deployment into pods,
+ // see https://github.com/kubernetes/kubernetes/issues/22065
+ // 4. Been ready for empty time < less time < more time
+ // If both pods are ready, the latest ready one is smaller
+ if podutil.IsPodReady(s[i]) && podutil.IsPodReady(s[j]) && !podReadyTime(s[i]).Equal(podReadyTime(s[j])) {
+ return afterOrZero(podReadyTime(s[i]), podReadyTime(s[j]))
+ }
+ // 5. Pods with containers with higher restart counts < lower restart counts
+ if maxContainerRestarts(s[i]) != maxContainerRestarts(s[j]) {
+ return maxContainerRestarts(s[i]) > maxContainerRestarts(s[j])
+ }
+ // 6. Empty creation time pods < newer pods < older pods
+ if !s[i].CreationTimestamp.Equal(s[j].CreationTimestamp) {
+ return afterOrZero(s[i].CreationTimestamp, s[j].CreationTimestamp)
+ }
+ return false
+}
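+
+// Example (illustrative): when scaling down by diff pods, a controller can sort its
+// candidates so the least valuable ones come first and then delete that prefix
+// (filteredPods and diff are assumed):
+//
+//    sort.Sort(ActivePods(filteredPods))
+//    podsToDelete := filteredPods[:diff]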
+
+// afterOrZero checks if time t1 is after time t2; if one of them
+// is zero, the zero time is seen as after non-zero time.
+func afterOrZero(t1, t2 metav1.Time) bool {
+ if t1.Time.IsZero() || t2.Time.IsZero() {
+ return t1.Time.IsZero()
+ }
+ return t1.After(t2.Time)
+}
+
+func podReadyTime(pod *v1.Pod) metav1.Time {
+ if podutil.IsPodReady(pod) {
+ for _, c := range pod.Status.Conditions {
+ // we only care about pod ready conditions
+ if c.Type == v1.PodReady && c.Status == v1.ConditionTrue {
+ return c.LastTransitionTime
+ }
+ }
+ }
+ return metav1.Time{}
+}
+
+func maxContainerRestarts(pod *v1.Pod) int {
+ maxRestarts := 0
+ for _, c := range pod.Status.ContainerStatuses {
+ maxRestarts = integer.IntMax(maxRestarts, int(c.RestartCount))
+ }
+ return maxRestarts
+}
+
+// FilterActivePods returns pods that have not terminated.
+func FilterActivePods(pods []*v1.Pod) []*v1.Pod {
+ var result []*v1.Pod
+ for _, p := range pods {
+ if IsPodActive(p) {
+ result = append(result, p)
+ } else {
+ glog.V(4).Infof("Ignoring inactive pod %v/%v in state %v, deletion time %v",
+ p.Namespace, p.Name, p.Status.Phase, p.DeletionTimestamp)
+ }
+ }
+ return result
+}
+
+func IsPodActive(p *v1.Pod) bool {
+ return v1.PodSucceeded != p.Status.Phase &&
+ v1.PodFailed != p.Status.Phase &&
+ p.DeletionTimestamp == nil
+}
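+
+// Example (illustrative): a typical caller lists pods from an informer cache and then
+// keeps only the active ones (podLister, namespace and selector are assumed):
+//
+//    pods, err := podLister.Pods(namespace).List(selector)
+//    if err != nil {
+//        return err
+//    }
+//    active := FilterActivePods(pods)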
+
+// FilterActiveReplicaSets returns replica sets that have (or at least ought to have) pods.
+func FilterActiveReplicaSets(replicaSets []*extensions.ReplicaSet) []*extensions.ReplicaSet {
+ activeFilter := func(rs *extensions.ReplicaSet) bool {
+ return rs != nil && *(rs.Spec.Replicas) > 0
+ }
+ return FilterReplicaSets(replicaSets, activeFilter)
+}
+
+type filterRS func(rs *extensions.ReplicaSet) bool
+
+// FilterReplicaSets returns the replica sets for which filterFn returns true.
+func FilterReplicaSets(RSes []*extensions.ReplicaSet, filterFn filterRS) []*extensions.ReplicaSet {
+ var filtered []*extensions.ReplicaSet
+ for i := range RSes {
+ if filterFn(RSes[i]) {
+ filtered = append(filtered, RSes[i])
+ }
+ }
+ return filtered
+}
+
+// PodKey returns a key unique to the given pod within a cluster.
+// It's used so we consistently use the same key scheme in this module.
+// It does exactly what cache.MetaNamespaceKeyFunc would have done
+// except there's no possibility of error since we know the exact type.
+func PodKey(pod *v1.Pod) string {
+ return fmt.Sprintf("%v/%v", pod.Namespace, pod.Name)
+}
+
+// ControllersByCreationTimestamp sorts a list of ReplicationControllers by creation timestamp, using their names as a tie breaker.
+type ControllersByCreationTimestamp []*v1.ReplicationController
+
+func (o ControllersByCreationTimestamp) Len() int { return len(o) }
+func (o ControllersByCreationTimestamp) Swap(i, j int) { o[i], o[j] = o[j], o[i] }
+func (o ControllersByCreationTimestamp) Less(i, j int) bool {
+ if o[i].CreationTimestamp.Equal(o[j].CreationTimestamp) {
+ return o[i].Name < o[j].Name
+ }
+ return o[i].CreationTimestamp.Before(o[j].CreationTimestamp)
+}
+
+// ReplicaSetsByCreationTimestamp sorts a list of ReplicaSets by creation timestamp, using their names as a tie breaker.
+type ReplicaSetsByCreationTimestamp []*extensions.ReplicaSet
+
+func (o ReplicaSetsByCreationTimestamp) Len() int { return len(o) }
+func (o ReplicaSetsByCreationTimestamp) Swap(i, j int) { o[i], o[j] = o[j], o[i] }
+func (o ReplicaSetsByCreationTimestamp) Less(i, j int) bool {
+ if o[i].CreationTimestamp.Equal(o[j].CreationTimestamp) {
+ return o[i].Name < o[j].Name
+ }
+ return o[i].CreationTimestamp.Before(o[j].CreationTimestamp)
+}
+
+// ReplicaSetsBySizeOlder sorts a list of ReplicaSets by size in descending order, using their creation timestamp or name as a tie breaker.
+// When sizes are equal, older replica sets come first.
+type ReplicaSetsBySizeOlder []*extensions.ReplicaSet
+
+func (o ReplicaSetsBySizeOlder) Len() int { return len(o) }
+func (o ReplicaSetsBySizeOlder) Swap(i, j int) { o[i], o[j] = o[j], o[i] }
+func (o ReplicaSetsBySizeOlder) Less(i, j int) bool {
+ if *(o[i].Spec.Replicas) == *(o[j].Spec.Replicas) {
+ return ReplicaSetsByCreationTimestamp(o).Less(i, j)
+ }
+ return *(o[i].Spec.Replicas) > *(o[j].Spec.Replicas)
+}
+
+// ReplicaSetsBySizeNewer sorts a list of ReplicaSets by size in descending order, using their creation timestamp or name as a tie breaker.
+// When sizes are equal, newer replica sets come first.
+type ReplicaSetsBySizeNewer []*extensions.ReplicaSet
+
+func (o ReplicaSetsBySizeNewer) Len() int { return len(o) }
+func (o ReplicaSetsBySizeNewer) Swap(i, j int) { o[i], o[j] = o[j], o[i] }
+func (o ReplicaSetsBySizeNewer) Less(i, j int) bool {
+ if *(o[i].Spec.Replicas) == *(o[j].Spec.Replicas) {
+ return ReplicaSetsByCreationTimestamp(o).Less(j, i)
+ }
+ return *(o[i].Spec.Replicas) > *(o[j].Spec.Replicas)
+}
+
+func AddOrUpdateTaintOnNode(c clientset.Interface, nodeName string, taint *v1.Taint) error {
+ firstTry := true
+ return clientretry.RetryOnConflict(UpdateTaintBackoff, func() error {
+ var err error
+ var oldNode *v1.Node
+                // First we try getting the node from the API server cache, as it's cheaper.
+                // If the update below hits a conflict, the retry reads the node directly from
+                // etcd to be sure we have fresh data.
+ if firstTry {
+ oldNode, err = c.Core().Nodes().Get(nodeName, metav1.GetOptions{ResourceVersion: "0"})
+ firstTry = false
+ } else {
+ oldNode, err = c.Core().Nodes().Get(nodeName, metav1.GetOptions{})
+ }
+ if err != nil {
+ return err
+ }
+ newNode, ok, err := v1helper.AddOrUpdateTaint(oldNode, taint)
+ if err != nil {
+                        return fmt.Errorf("failed to update taint of node %q: %v", nodeName, err)
+ }
+ if !ok {
+ return nil
+ }
+ return PatchNodeTaints(c, nodeName, oldNode, newNode)
+ })
+}
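+
+// Example (illustrative): mark a node with a NoExecute taint; the helper retries on
+// conflicts internally, so the caller only handles the final error (kubeClient and
+// nodeName are assumed, and the taint key is a made-up example):
+//
+//    taint := &v1.Taint{Key: "example.com/unreachable", Effect: v1.TaintEffectNoExecute}
+//    if err := AddOrUpdateTaintOnNode(kubeClient, nodeName, taint); err != nil {
+//        return err
+//    }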
+
+// RemoveTaintOffNode is for cleaning up taints temporarily added to a node; it won't
+// fail if the target taint doesn't exist or has already been removed.
+// If a node object is passed in, it is checked first and no API calls are issued when
+// the taint is not present on it.
+func RemoveTaintOffNode(c clientset.Interface, nodeName string, taint *v1.Taint, node *v1.Node) error {
+ // Short circuit for limiting amount of API calls.
+ if node != nil {
+ match := false
+ for i := range node.Spec.Taints {
+ if node.Spec.Taints[i].MatchTaint(taint) {
+ match = true
+ break
+ }
+ }
+ if !match {
+ return nil
+ }
+ }
+ firstTry := true
+ return clientretry.RetryOnConflict(UpdateTaintBackoff, func() error {
+ var err error
+ var oldNode *v1.Node
+                // First we try getting the node from the API server cache, as it's cheaper.
+                // If the update below hits a conflict, the retry reads the node directly from
+                // etcd to be sure we have fresh data.
+ if firstTry {
+ oldNode, err = c.Core().Nodes().Get(nodeName, metav1.GetOptions{ResourceVersion: "0"})
+ firstTry = false
+ } else {
+ oldNode, err = c.Core().Nodes().Get(nodeName, metav1.GetOptions{})
+ }
+ if err != nil {
+ return err
+ }
+ newNode, ok, err := v1helper.RemoveTaint(oldNode, taint)
+ if err != nil {
+                        return fmt.Errorf("failed to remove taint of node %q: %v", nodeName, err)
+ }
+ if !ok {
+ return nil
+ }
+ return PatchNodeTaints(c, nodeName, oldNode, newNode)
+ })
+}
+
+// PatchNodeTaints patches node's taints.
+func PatchNodeTaints(c clientset.Interface, nodeName string, oldNode *v1.Node, newNode *v1.Node) error {
+ oldData, err := json.Marshal(oldNode)
+ if err != nil {
+ return fmt.Errorf("failed to marshal old node %#v for node %q: %v", oldNode, nodeName, err)
+ }
+
+ newTaints := newNode.Spec.Taints
+ objCopy, err := api.Scheme.DeepCopy(oldNode)
+ if err != nil {
+ return fmt.Errorf("failed to copy node object %#v: %v", oldNode, err)
+ }
+ newNode, ok := (objCopy).(*v1.Node)
+ if !ok {
+                return fmt.Errorf("failed to cast copied object %#v to *v1.Node", objCopy)
+ }
+ newNode.Spec.Taints = newTaints
+ newData, err := json.Marshal(newNode)
+ if err != nil {
+ return fmt.Errorf("failed to marshal new node %#v for node %q: %v", newNode, nodeName, err)
+ }
+
+ patchBytes, err := strategicpatch.CreateTwoWayMergePatch(oldData, newData, v1.Node{})
+ if err != nil {
+ return fmt.Errorf("failed to create patch for node %q: %v", nodeName, err)
+ }
+
+        _, err = c.Core().Nodes().Patch(nodeName, types.StrategicMergePatchType, patchBytes)
+ return err
+}
+
+// WaitForCacheSync is a wrapper around cache.WaitForCacheSync that generates log messages
+// indicating that the controller identified by controllerName is waiting for syncs, followed by
+// either a successful or failed sync.
+func WaitForCacheSync(controllerName string, stopCh <-chan struct{}, cacheSyncs ...cache.InformerSynced) bool {
+ glog.Infof("Waiting for caches to sync for %s controller", controllerName)
+
+ if !cache.WaitForCacheSync(stopCh, cacheSyncs...) {
+ utilruntime.HandleError(fmt.Errorf("Unable to sync caches for %s controller", controllerName))
+ return false
+ }
+
+ glog.Infof("Caches are synced for %s controller", controllerName)
+ return true
+}
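+
+// Example (illustrative): a controller's Run method typically gates its workers on the
+// informer caches; podsSynced and rsSynced stand in for the relevant InformerSynced funcs:
+//
+//    if !WaitForCacheSync("replicaset", stopCh, podsSynced, rsSynced) {
+//        return
+//    }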
+
+// ComputeHash returns a hash value calculated from the pod template and a collisionCount,
+// which is mixed into the hash to avoid hash collisions.
+func ComputeHash(template *v1.PodTemplateSpec, collisionCount *int64) uint32 {
+ podTemplateSpecHasher := fnv.New32a()
+ hashutil.DeepHashObject(podTemplateSpecHasher, *template)
+
+ // Add collisionCount in the hash if it exists.
+ if collisionCount != nil {
+ collisionCountBytes := make([]byte, 8)
+ binary.LittleEndian.PutUint64(collisionCountBytes, uint64(*collisionCount))
+ podTemplateSpecHasher.Write(collisionCountBytes)
+ }
+
+ return podTemplateSpecHasher.Sum32()
+}
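+
+// Example (illustrative): one way a caller might derive a stable name suffix from the
+// pod template hash (d is an assumed Deployment, collisionCount may be nil):
+//
+//    hash := ComputeHash(&d.Spec.Template, collisionCount)
+//    rsName := d.Name + "-" + fmt.Sprintf("%d", hash)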
diff --git a/vendor/k8s.io/kubernetes/pkg/controller/doc.go b/vendor/k8s.io/kubernetes/pkg/controller/doc.go
new file mode 100644
index 000000000..3c5c943da
--- /dev/null
+++ b/vendor/k8s.io/kubernetes/pkg/controller/doc.go
@@ -0,0 +1,19 @@
+/*
+Copyright 2015 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+// Package controller contains code for controllers (like the replication
+// controller).
+package controller // import "k8s.io/kubernetes/pkg/controller"
diff --git a/vendor/k8s.io/kubernetes/pkg/controller/lookup_cache.go b/vendor/k8s.io/kubernetes/pkg/controller/lookup_cache.go
new file mode 100644
index 000000000..160aa6e08
--- /dev/null
+++ b/vendor/k8s.io/kubernetes/pkg/controller/lookup_cache.go
@@ -0,0 +1,92 @@
+/*
+Copyright 2016 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package controller
+
+import (
+ "hash/fnv"
+ "sync"
+
+ "github.com/golang/groupcache/lru"
+ metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+ hashutil "k8s.io/kubernetes/pkg/util/hash"
+)
+
+type objectWithMeta interface {
+ metav1.Object
+}
+
+// keyFunc returns the key of an object, which is used to look up its matching object in the cache.
+// Since objects are matched by namespace and labels/selector, two objects with the same namespace
+// and labels will have the same key.
+func keyFunc(obj objectWithMeta) uint64 {
+ hash := fnv.New32a()
+ hashutil.DeepHashObject(hash, &equivalenceLabelObj{
+ namespace: obj.GetNamespace(),
+ labels: obj.GetLabels(),
+ })
+ return uint64(hash.Sum32())
+}
+
+type equivalenceLabelObj struct {
+ namespace string
+ labels map[string]string
+}
+
+// MatchingCache saves the label and selector matching relationship.
+type MatchingCache struct {
+ mutex sync.RWMutex
+ cache *lru.Cache
+}
+
+// NewMatchingCache returns a MatchingCache, which saves the label and selector matching relationship.
+func NewMatchingCache(maxCacheEntries int) *MatchingCache {
+ return &MatchingCache{
+ cache: lru.New(maxCacheEntries),
+ }
+}
+
+// Add will add matching information to the cache.
+func (c *MatchingCache) Add(labelObj objectWithMeta, selectorObj objectWithMeta) {
+ key := keyFunc(labelObj)
+ c.mutex.Lock()
+ defer c.mutex.Unlock()
+ c.cache.Add(key, selectorObj)
+}
+
+// GetMatchingObject looks up the matching object for a given object.
+// Note: the cached information may be stale since the controller may have been deleted or updated;
+// callers must verify it against the live state before relying on it.
+func (c *MatchingCache) GetMatchingObject(labelObj objectWithMeta) (controller interface{}, exists bool) {
+ key := keyFunc(labelObj)
+        // NOTE: we use Lock() instead of RLock() here because lru's Get() method also modifies
+        // state (it updates the least-recently-used bookkeeping), so it cannot be called concurrently.
+ c.mutex.Lock()
+ defer c.mutex.Unlock()
+ return c.cache.Get(key)
+}
+
+// Update updates the cached matching information.
+func (c *MatchingCache) Update(labelObj objectWithMeta, selectorObj objectWithMeta) {
+ c.Add(labelObj, selectorObj)
+}
+
+// InvalidateAll invalidates the whole cache.
+func (c *MatchingCache) InvalidateAll() {
+ c.mutex.Lock()
+ defer c.mutex.Unlock()
+ c.cache = lru.New(c.cache.MaxEntries)
+}
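+
+// Example (illustrative): a controller might cache which ReplicaSet matched a pod's
+// labels and re-validate a hit before trusting it (pod, rs and the matches helper are assumed):
+//
+//    lookupCache := NewMatchingCache(4096)
+//    lookupCache.Add(pod, rs)
+//    if obj, ok := lookupCache.GetMatchingObject(pod); ok {
+//        cached := obj.(*extensions.ReplicaSet)
+//        if !matches(cached, pod) {
+//            // cached entry is stale; drop everything and fall back to a full lookup
+//            lookupCache.InvalidateAll()
+//        }
+//    }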