Diffstat (limited to 'vendor/k8s.io/kubernetes/pkg/volume')
17 files changed, 552 insertions, 1346 deletions
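The headline change in this bump is a package split: the filesystem helpers (FsInfo, Du, Find) move from k8s.io/kubernetes/pkg/volume/util into the new k8s.io/kubernetes/pkg/volume/util/fs package, and the recycler pod client moves into k8s.io/kubernetes/pkg/volume/util/recyclerclient. A minimal sketch of a caller updated for the new fs import path follows; the mount path and the main wrapper are assumptions for illustration, while the FsInfo signature is the one used by metrics_du.go and metrics_statfs.go in the diff below.

package main

import (
	"fmt"

	"k8s.io/kubernetes/pkg/volume/util/fs" // was: "k8s.io/kubernetes/pkg/volume/util"
)

func main() {
	// fs.FsInfo returns (available, capacity, usage, inodes, inodesFree, inodesUsed, err)
	// for the filesystem that the given path resides on.
	available, capacity, usage, _, _, _, err := fs.FsInfo("/var/lib/kubelet") // example path, not from the diff
	if err != nil {
		fmt.Printf("statfs failed: %v\n", err)
		return
	}
	fmt.Printf("available=%d capacity=%d used=%d bytes\n", available, capacity, usage)
}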
diff --git a/vendor/k8s.io/kubernetes/pkg/volume/metrics_du.go b/vendor/k8s.io/kubernetes/pkg/volume/metrics_du.go index 19a29cbbc..88a985d5a 100644 --- a/vendor/k8s.io/kubernetes/pkg/volume/metrics_du.go +++ b/vendor/k8s.io/kubernetes/pkg/volume/metrics_du.go @@ -19,7 +19,7 @@ package volume import ( "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/kubernetes/pkg/volume/util" + "k8s.io/kubernetes/pkg/volume/util/fs" ) var _ MetricsProvider = &metricsDu{} @@ -66,7 +66,7 @@ func (md *metricsDu) GetMetrics() (*Metrics, error) { // runDu executes the "du" command and writes the results to metrics.Used func (md *metricsDu) runDu(metrics *Metrics) error { - used, err := util.Du(md.path) + used, err := fs.Du(md.path) if err != nil { return err } @@ -76,7 +76,7 @@ func (md *metricsDu) runDu(metrics *Metrics) error { // runFind executes the "find" command and writes the results to metrics.InodesUsed func (md *metricsDu) runFind(metrics *Metrics) error { - inodesUsed, err := util.Find(md.path) + inodesUsed, err := fs.Find(md.path) if err != nil { return err } @@ -87,7 +87,7 @@ func (md *metricsDu) runFind(metrics *Metrics) error { // getFsInfo writes metrics.Capacity and metrics.Available from the filesystem // info func (md *metricsDu) getFsInfo(metrics *Metrics) error { - available, capacity, _, inodes, inodesFree, _, err := util.FsInfo(md.path) + available, capacity, _, inodes, inodesFree, _, err := fs.FsInfo(md.path) if err != nil { return NewFsInfoFailedError(err) } diff --git a/vendor/k8s.io/kubernetes/pkg/volume/metrics_statfs.go b/vendor/k8s.io/kubernetes/pkg/volume/metrics_statfs.go index ede4f6ef8..66f99e30a 100644 --- a/vendor/k8s.io/kubernetes/pkg/volume/metrics_statfs.go +++ b/vendor/k8s.io/kubernetes/pkg/volume/metrics_statfs.go @@ -19,7 +19,7 @@ package volume import ( "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/kubernetes/pkg/volume/util" + "k8s.io/kubernetes/pkg/volume/util/fs" ) var _ MetricsProvider = &metricsStatFS{} @@ -55,7 +55,7 @@ func (md *metricsStatFS) GetMetrics() (*Metrics, error) { // getFsInfo writes metrics.Capacity, metrics.Used and metrics.Available from the filesystem info func (md *metricsStatFS) getFsInfo(metrics *Metrics) error { - available, capacity, usage, inodes, inodesFree, inodesUsed, err := util.FsInfo(md.path) + available, capacity, usage, inodes, inodesFree, inodesUsed, err := fs.FsInfo(md.path) if err != nil { return NewFsInfoFailedError(err) } diff --git a/vendor/k8s.io/kubernetes/pkg/volume/plugins.go b/vendor/k8s.io/kubernetes/pkg/volume/plugins.go index 41721d1ee..ec4ec5791 100644 --- a/vendor/k8s.io/kubernetes/pkg/volume/plugins.go +++ b/vendor/k8s.io/kubernetes/pkg/volume/plugins.go @@ -23,15 +23,25 @@ import ( "sync" "github.com/golang/glog" + "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" utilerrors "k8s.io/apimachinery/pkg/util/errors" "k8s.io/apimachinery/pkg/util/validation" - "k8s.io/kubernetes/pkg/api/v1" - "k8s.io/kubernetes/pkg/client/clientset_generated/clientset" + clientset "k8s.io/client-go/kubernetes" "k8s.io/kubernetes/pkg/cloudprovider" "k8s.io/kubernetes/pkg/util/io" "k8s.io/kubernetes/pkg/util/mount" + "k8s.io/kubernetes/pkg/volume/util/recyclerclient" +) + +const ( + // Common parameter which can be specified in StorageClass to specify the desired FSType + // Provisioners SHOULD implement support for this if they are block device based + // 
Must be a filesystem type supported by the host operating system. + // Ex. "ext4", "xfs", "ntfs". Default value depends on the provisioner + VolumeParameterFSType = "fstype" ) // VolumeOptions contains option information about a volume. @@ -42,6 +52,8 @@ type VolumeOptions struct { // Reclamation policy for a persistent volume PersistentVolumeReclaimPolicy v1.PersistentVolumeReclaimPolicy + // Mount options for a persistent volume + MountOptions []string // Suggested PV.Name of the PersistentVolume to provision. // This is a generated name guaranteed to be unique in Kubernetes cluster. // If you choose not to use it as volume name, ensure uniqueness by either @@ -60,6 +72,17 @@ type VolumeOptions struct { Parameters map[string]string } +type DynamicPluginProber interface { + Init() error + + // If an update has occurred since the last probe, updated = true + // and the list of probed plugins is returned. + // Otherwise, update = false and probedPlugins = nil. + // + // If an error occurs, updated and probedPlugins are undefined. + Probe() (updated bool, probedPlugins []VolumePlugin, err error) +} + // VolumePlugin is an interface to volume plugins that can be used on a // kubernetes node (e.g. by kubelet) to instantiate and manage volumes. type VolumePlugin interface { @@ -139,7 +162,7 @@ type RecyclableVolumePlugin interface { // Recycle will use the provided recorder to write any events that might be // interesting to user. It's expected that caller will pass these events to // the PV being recycled. - Recycle(pvName string, spec *Spec, eventRecorder RecycleEventRecorder) error + Recycle(pvName string, spec *Spec, eventRecorder recyclerclient.RecycleEventRecorder) error } // DeletableVolumePlugin is an extended interface of VolumePlugin and is used @@ -178,6 +201,34 @@ type AttachableVolumePlugin interface { GetDeviceMountRefs(deviceMountPath string) ([]string, error) } +// ExpandableVolumePlugin is an extended interface of VolumePlugin and is used for volumes that can be +// expanded +type ExpandableVolumePlugin interface { + VolumePlugin + ExpandVolumeDevice(spec *Spec, newSize resource.Quantity, oldSize resource.Quantity) (resource.Quantity, error) + RequiresFSResize() bool +} + +// BlockVolumePlugin is an extend interface of VolumePlugin and is used for block volumes support. +type BlockVolumePlugin interface { + VolumePlugin + // NewBlockVolumeMapper creates a new volume.BlockVolumeMapper from an API specification. + // Ownership of the spec pointer in *not* transferred. + // - spec: The v1.Volume spec + // - pod: The enclosing pod + NewBlockVolumeMapper(spec *Spec, podRef *v1.Pod, opts VolumeOptions) (BlockVolumeMapper, error) + // NewBlockVolumeUnmapper creates a new volume.BlockVolumeUnmapper from recoverable state. + // - name: The volume name, as per the v1.Volume spec. + // - podUID: The UID of the enclosing pod + NewBlockVolumeUnmapper(name string, podUID types.UID) (BlockVolumeUnmapper, error) + // ConstructBlockVolumeSpec constructs a volume spec based on the given + // podUID, volume name and a pod device map path. + // The spec may have incomplete information due to limited information + // from input. This function is used by volume manager to reconstruct + // volume spec by reading the volume directories from disk. + ConstructBlockVolumeSpec(podUID types.UID, volumeName, mountPath string) (*Spec, error) +} + // VolumeHost is an interface that plugins can use to access the kubelet. 
type VolumeHost interface { // GetPluginDir returns the absolute path to a directory under which @@ -186,6 +237,11 @@ type VolumeHost interface { // GetPodPluginDir(). GetPluginDir(pluginName string) string + // GetVolumeDevicePluginDir returns the absolute path to a directory + // under which a given plugin may store data. + // ex. plugins/kubernetes.io/{PluginName}/{DefaultKubeletVolumeDevicesDirName}/{volumePluginDependentPath}/ + GetVolumeDevicePluginDir(pluginName string) string + // GetPodVolumeDir returns the absolute path a directory which // represents the named volume under the named plugin for the given // pod. If the specified pod does not exist, the result of this call @@ -198,6 +254,13 @@ type VolumeHost interface { // directory might not actually exist on disk yet. GetPodPluginDir(podUID types.UID, pluginName string) string + // GetPodVolumeDeviceDir returns the absolute path a directory which + // represents the named plugin for the given pod. + // If the specified pod does not exist, the result of this call + // might not exist. + // ex. pods/{podUid}/{DefaultKubeletVolumeDevicesDirName}/{escapeQualifiedPluginName}/ + GetPodVolumeDeviceDir(podUID types.UID, pluginName string) string + // GetKubeClient returns a client interface GetKubeClient() clientset.Interface @@ -216,7 +279,7 @@ type VolumeHost interface { GetCloudProvider() cloudprovider.Interface // Get mounter interface. - GetMounter() mount.Interface + GetMounter(pluginName string) mount.Interface // Get writer interface for writing data to disk. GetWriter() io.Writer @@ -236,15 +299,23 @@ type VolumeHost interface { // Returns a function that returns a configmap. GetConfigMapFunc() func(namespace, name string) (*v1.ConfigMap, error) + // Returns an interface that should be used to execute any utilities in volume plugins + GetExec(pluginName string) mount.Exec + // Returns the labels on the node GetNodeLabels() (map[string]string, error) + + // Returns the name of the node + GetNodeName() types.NodeName } // VolumePluginMgr tracks registered plugins. type VolumePluginMgr struct { - mutex sync.Mutex - plugins map[string]VolumePlugin - Host VolumeHost + mutex sync.Mutex + plugins map[string]VolumePlugin + prober DynamicPluginProber + probedPlugins []VolumePlugin + Host VolumeHost } // Spec is an internal representation of a volume. All API volume types translate to Spec. @@ -339,11 +410,24 @@ func NewSpecFromPersistentVolume(pv *v1.PersistentVolume, readOnly bool) *Spec { // InitPlugins initializes each plugin. All plugins must have unique names. // This must be called exactly once before any New* methods are called on any // plugins. -func (pm *VolumePluginMgr) InitPlugins(plugins []VolumePlugin, host VolumeHost) error { +func (pm *VolumePluginMgr) InitPlugins(plugins []VolumePlugin, prober DynamicPluginProber, host VolumeHost) error { pm.mutex.Lock() defer pm.mutex.Unlock() pm.Host = host + + if prober == nil { + // Use a dummy prober to prevent nil deference. + pm.prober = &dummyPluginProber{} + } else { + pm.prober = prober + } + if err := pm.prober.Init(); err != nil { + // Prober init failure should not affect the initialization of other plugins. 
+ glog.Errorf("Error initializing dynamic plugin prober: %s", err) + pm.prober = &dummyPluginProber{} + } + if pm.plugins == nil { pm.plugins = map[string]VolumePlugin{} } @@ -362,7 +446,7 @@ func (pm *VolumePluginMgr) InitPlugins(plugins []VolumePlugin, host VolumeHost) } err := plugin.Init(host) if err != nil { - glog.Errorf("Failed to load volume plugin %s, error: %s", plugin, err.Error()) + glog.Errorf("Failed to load volume plugin %s, error: %s", name, err.Error()) allErrs = append(allErrs, err) continue } @@ -372,6 +456,21 @@ func (pm *VolumePluginMgr) InitPlugins(plugins []VolumePlugin, host VolumeHost) return utilerrors.NewAggregate(allErrs) } +func (pm *VolumePluginMgr) initProbedPlugin(probedPlugin VolumePlugin) error { + name := probedPlugin.GetPluginName() + if errs := validation.IsQualifiedName(name); len(errs) != 0 { + return fmt.Errorf("volume plugin has invalid name: %q: %s", name, strings.Join(errs, ";")) + } + + err := probedPlugin.Init(pm.Host) + if err != nil { + return fmt.Errorf("Failed to load volume plugin %s, error: %s", name, err.Error()) + } + + glog.V(1).Infof("Loaded volume plugin %q", name) + return nil +} + // FindPluginBySpec looks for a plugin that can support a given volume // specification. If no plugins can support or more than one plugin can // support it, return error. @@ -379,19 +478,34 @@ func (pm *VolumePluginMgr) FindPluginBySpec(spec *Spec) (VolumePlugin, error) { pm.mutex.Lock() defer pm.mutex.Unlock() - matches := []string{} + if spec == nil { + return nil, fmt.Errorf("Could not find plugin because volume spec is nil") + } + + matchedPluginNames := []string{} + matches := []VolumePlugin{} for k, v := range pm.plugins { if v.CanSupport(spec) { - matches = append(matches, k) + matchedPluginNames = append(matchedPluginNames, k) + matches = append(matches, v) + } + } + + pm.refreshProbedPlugins() + for _, plugin := range pm.probedPlugins { + if plugin.CanSupport(spec) { + matchedPluginNames = append(matchedPluginNames, plugin.GetPluginName()) + matches = append(matches, plugin) } } + if len(matches) == 0 { return nil, fmt.Errorf("no volume plugin matched") } if len(matches) > 1 { - return nil, fmt.Errorf("multiple volume plugins matched: %s", strings.Join(matches, ",")) + return nil, fmt.Errorf("multiple volume plugins matched: %s", strings.Join(matchedPluginNames, ",")) } - return pm.plugins[matches[0]], nil + return matches[0], nil } // FindPluginByName fetches a plugin by name or by legacy name. If no plugin @@ -401,19 +515,52 @@ func (pm *VolumePluginMgr) FindPluginByName(name string) (VolumePlugin, error) { defer pm.mutex.Unlock() // Once we can get rid of legacy names we can reduce this to a map lookup. 
- matches := []string{} + matchedPluginNames := []string{} + matches := []VolumePlugin{} for k, v := range pm.plugins { if v.GetPluginName() == name { - matches = append(matches, k) + matchedPluginNames = append(matchedPluginNames, k) + matches = append(matches, v) } } + + pm.refreshProbedPlugins() + for _, plugin := range pm.probedPlugins { + if plugin.GetPluginName() == name { + matchedPluginNames = append(matchedPluginNames, plugin.GetPluginName()) + matches = append(matches, plugin) + } + } + if len(matches) == 0 { return nil, fmt.Errorf("no volume plugin matched") } if len(matches) > 1 { - return nil, fmt.Errorf("multiple volume plugins matched: %s", strings.Join(matches, ",")) + return nil, fmt.Errorf("multiple volume plugins matched: %s", strings.Join(matchedPluginNames, ",")) + } + return matches[0], nil +} + +// Check if probedPlugin cache update is required. +// If it is, initialize all probed plugins and replace the cache with them. +func (pm *VolumePluginMgr) refreshProbedPlugins() { + updated, plugins, err := pm.prober.Probe() + if err != nil { + glog.Errorf("Error dynamically probing plugins: %s", err) + return // Use cached plugins upon failure. + } + + if updated { + pm.probedPlugins = []VolumePlugin{} + for _, plugin := range plugins { + if err := pm.initProbedPlugin(plugin); err != nil { + glog.Errorf("Error initializing dynamically probed plugin %s; error: %s", + plugin.GetPluginName(), err) + continue + } + pm.probedPlugins = append(pm.probedPlugins, plugin) + } } - return pm.plugins[matches[0]], nil } // FindPersistentPluginBySpec looks for a persistent volume plugin that can @@ -508,7 +655,7 @@ func (pm *VolumePluginMgr) FindCreatablePluginBySpec(spec *Spec) (ProvisionableV return nil, fmt.Errorf("no creatable volume plugin matched") } -// FindAttachablePluginBySpec fetches a persistent volume plugin by name. +// FindAttachablePluginBySpec fetches a persistent volume plugin by spec. // Unlike the other "FindPlugin" methods, this does not return error if no // plugin is found. All volumes require a mounter and unmounter, but not // every volume will have an attacher/detacher. @@ -538,6 +685,58 @@ func (pm *VolumePluginMgr) FindAttachablePluginByName(name string) (AttachableVo return nil, nil } +// FindExpandablePluginBySpec fetches a persistent volume plugin by spec. +func (pm *VolumePluginMgr) FindExpandablePluginBySpec(spec *Spec) (ExpandableVolumePlugin, error) { + volumePlugin, err := pm.FindPluginBySpec(spec) + if err != nil { + return nil, err + } + + if expandableVolumePlugin, ok := volumePlugin.(ExpandableVolumePlugin); ok { + return expandableVolumePlugin, nil + } + return nil, nil +} + +// FindExpandablePluginBySpec fetches a persistent volume plugin by name. +func (pm *VolumePluginMgr) FindExpandablePluginByName(name string) (ExpandableVolumePlugin, error) { + volumePlugin, err := pm.FindPluginByName(name) + if err != nil { + return nil, err + } + + if expandableVolumePlugin, ok := volumePlugin.(ExpandableVolumePlugin); ok { + return expandableVolumePlugin, nil + } + return nil, nil +} + +// FindMapperPluginBySpec fetches a block volume plugin by spec. +func (pm *VolumePluginMgr) FindMapperPluginBySpec(spec *Spec) (BlockVolumePlugin, error) { + volumePlugin, err := pm.FindPluginBySpec(spec) + if err != nil { + return nil, err + } + + if blockVolumePlugin, ok := volumePlugin.(BlockVolumePlugin); ok { + return blockVolumePlugin, nil + } + return nil, nil +} + +// FindMapperPluginByName fetches a block volume plugin by name. 
+func (pm *VolumePluginMgr) FindMapperPluginByName(name string) (BlockVolumePlugin, error) { + volumePlugin, err := pm.FindPluginByName(name) + if err != nil { + return nil, err + } + + if blockVolumePlugin, ok := volumePlugin.(BlockVolumePlugin); ok { + return blockVolumePlugin, nil + } + return nil, nil +} + // NewPersistentVolumeRecyclerPodTemplate creates a template for a recycler // pod. By default, a recycler pod simply runs "rm -rf" on a volume and tests // for emptiness. Most attributes of the template will be correct for most @@ -574,7 +773,7 @@ func NewPersistentVolumeRecyclerPodTemplate() *v1.Pod { Containers: []v1.Container{ { Name: "pv-recycler", - Image: "gcr.io/google_containers/busybox", + Image: "busybox:1.27", Command: []string{"/bin/sh"}, Args: []string{"-c", "test -e /scrub && rm -rf /scrub/..?* /scrub/.[!.]* /scrub/* && test -z \"$(ls -A /scrub)\" || exit 1"}, VolumeMounts: []v1.VolumeMount{ @@ -601,3 +800,8 @@ func ValidateRecyclerPodTemplate(pod *v1.Pod) error { } return nil } + +type dummyPluginProber struct{} + +func (*dummyPluginProber) Init() error { return nil } +func (*dummyPluginProber) Probe() (bool, []VolumePlugin, error) { return false, nil, nil } diff --git a/vendor/k8s.io/kubernetes/pkg/volume/util.go b/vendor/k8s.io/kubernetes/pkg/volume/util.go deleted file mode 100644 index 2e5610362..000000000 --- a/vendor/k8s.io/kubernetes/pkg/volume/util.go +++ /dev/null @@ -1,456 +0,0 @@ -/* -Copyright 2014 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package volume - -import ( - "fmt" - "reflect" - - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/fields" - "k8s.io/apimachinery/pkg/watch" - "k8s.io/kubernetes/pkg/api/v1" - "k8s.io/kubernetes/pkg/client/clientset_generated/clientset" - - "hash/fnv" - "math/rand" - "strconv" - "strings" - - "github.com/golang/glog" - "k8s.io/apimachinery/pkg/api/errors" - "k8s.io/apimachinery/pkg/api/resource" - "k8s.io/apimachinery/pkg/types" - "k8s.io/apimachinery/pkg/util/sets" - volutil "k8s.io/kubernetes/pkg/volume/util" -) - -type RecycleEventRecorder func(eventtype, message string) - -// RecycleVolumeByWatchingPodUntilCompletion is intended for use with volume -// Recyclers. This function will save the given Pod to the API and watch it -// until it completes, fails, or the pod's ActiveDeadlineSeconds is exceeded, -// whichever comes first. An attempt to delete a recycler pod is always -// attempted before returning. -// -// In case there is a pod with the same namespace+name already running, this -// function assumes it's an older instance of the recycler pod and watches -// this old pod instead of starting a new one. -// -// pod - the pod designed by a volume plugin to recycle the volume. pod.Name -// will be overwritten with unique name based on PV.Name. -// client - kube client for API operations. 
-func RecycleVolumeByWatchingPodUntilCompletion(pvName string, pod *v1.Pod, kubeClient clientset.Interface, recorder RecycleEventRecorder) error { - return internalRecycleVolumeByWatchingPodUntilCompletion(pvName, pod, newRecyclerClient(kubeClient, recorder)) -} - -// same as above func comments, except 'recyclerClient' is a narrower pod API -// interface to ease testing -func internalRecycleVolumeByWatchingPodUntilCompletion(pvName string, pod *v1.Pod, recyclerClient recyclerClient) error { - glog.V(5).Infof("creating recycler pod for volume %s\n", pod.Name) - - // Generate unique name for the recycler pod - we need to get "already - // exists" error when a previous controller has already started recycling - // the volume. Here we assume that pv.Name is already unique. - pod.Name = "recycler-for-" + pvName - pod.GenerateName = "" - - stopChannel := make(chan struct{}) - defer close(stopChannel) - podCh, err := recyclerClient.WatchPod(pod.Name, pod.Namespace, stopChannel) - if err != nil { - glog.V(4).Infof("cannot start watcher for pod %s/%s: %v", pod.Namespace, pod.Name, err) - return err - } - - // Start the pod - _, err = recyclerClient.CreatePod(pod) - if err != nil { - if errors.IsAlreadyExists(err) { - glog.V(5).Infof("old recycler pod %q found for volume", pod.Name) - } else { - return fmt.Errorf("unexpected error creating recycler pod: %+v\n", err) - } - } - defer func(pod *v1.Pod) { - glog.V(2).Infof("deleting recycler pod %s/%s", pod.Namespace, pod.Name) - if err := recyclerClient.DeletePod(pod.Name, pod.Namespace); err != nil { - glog.Errorf("failed to delete recycler pod %s/%s: %v", pod.Namespace, pod.Name, err) - } - }(pod) - - // Now only the old pod or the new pod run. Watch it until it finishes - // and send all events on the pod to the PV - for { - event, ok := <-podCh - if !ok { - return fmt.Errorf("recycler pod %q watch channel had been closed", pod.Name) - } - switch event.Object.(type) { - case *v1.Pod: - // POD changed - pod := event.Object.(*v1.Pod) - glog.V(4).Infof("recycler pod update received: %s %s/%s %s", event.Type, pod.Namespace, pod.Name, pod.Status.Phase) - switch event.Type { - case watch.Added, watch.Modified: - if pod.Status.Phase == v1.PodSucceeded { - // Recycle succeeded. - return nil - } - if pod.Status.Phase == v1.PodFailed { - if pod.Status.Message != "" { - return fmt.Errorf(pod.Status.Message) - } else { - return fmt.Errorf("pod failed, pod.Status.Message unknown.") - } - } - - case watch.Deleted: - return fmt.Errorf("recycler pod was deleted") - - case watch.Error: - return fmt.Errorf("recycler pod watcher failed") - } - - case *v1.Event: - // Event received - podEvent := event.Object.(*v1.Event) - glog.V(4).Infof("recycler event received: %s %s/%s %s/%s %s", event.Type, podEvent.Namespace, podEvent.Name, podEvent.InvolvedObject.Namespace, podEvent.InvolvedObject.Name, podEvent.Message) - if event.Type == watch.Added { - recyclerClient.Event(podEvent.Type, podEvent.Message) - } - } - } -} - -// recyclerClient abstracts access to a Pod by providing a narrower interface. -// This makes it easier to mock a client for testing. -type recyclerClient interface { - CreatePod(pod *v1.Pod) (*v1.Pod, error) - GetPod(name, namespace string) (*v1.Pod, error) - DeletePod(name, namespace string) error - // WatchPod returns a ListWatch for watching a pod. The stopChannel is used - // to close the reflector backing the watch. The caller is responsible for - // derring a close on the channel to stop the reflector. 
- WatchPod(name, namespace string, stopChannel chan struct{}) (<-chan watch.Event, error) - // Event sends an event to the volume that is being recycled. - Event(eventtype, message string) -} - -func newRecyclerClient(client clientset.Interface, recorder RecycleEventRecorder) recyclerClient { - return &realRecyclerClient{ - client, - recorder, - } -} - -type realRecyclerClient struct { - client clientset.Interface - recorder RecycleEventRecorder -} - -func (c *realRecyclerClient) CreatePod(pod *v1.Pod) (*v1.Pod, error) { - return c.client.Core().Pods(pod.Namespace).Create(pod) -} - -func (c *realRecyclerClient) GetPod(name, namespace string) (*v1.Pod, error) { - return c.client.Core().Pods(namespace).Get(name, metav1.GetOptions{}) -} - -func (c *realRecyclerClient) DeletePod(name, namespace string) error { - return c.client.Core().Pods(namespace).Delete(name, nil) -} - -func (c *realRecyclerClient) Event(eventtype, message string) { - c.recorder(eventtype, message) -} - -func (c *realRecyclerClient) WatchPod(name, namespace string, stopChannel chan struct{}) (<-chan watch.Event, error) { - podSelector, _ := fields.ParseSelector("metadata.name=" + name) - options := metav1.ListOptions{ - FieldSelector: podSelector.String(), - Watch: true, - } - - podWatch, err := c.client.Core().Pods(namespace).Watch(options) - if err != nil { - return nil, err - } - - eventSelector, _ := fields.ParseSelector("involvedObject.name=" + name) - eventWatch, err := c.client.Core().Events(namespace).Watch(metav1.ListOptions{ - FieldSelector: eventSelector.String(), - Watch: true, - }) - if err != nil { - podWatch.Stop() - return nil, err - } - - eventCh := make(chan watch.Event, 30) - - go func() { - defer eventWatch.Stop() - defer podWatch.Stop() - defer close(eventCh) - var podWatchChannelClosed bool - var eventWatchChannelClosed bool - for { - select { - case _ = <-stopChannel: - return - - case podEvent, ok := <-podWatch.ResultChan(): - if !ok { - podWatchChannelClosed = true - } else { - eventCh <- podEvent - } - case eventEvent, ok := <-eventWatch.ResultChan(): - if !ok { - eventWatchChannelClosed = true - } else { - eventCh <- eventEvent - } - } - if podWatchChannelClosed && eventWatchChannelClosed { - break - } - } - }() - - return eventCh, nil -} - -// CalculateTimeoutForVolume calculates time for a Recycler pod to complete a -// recycle operation. The calculation and return value is either the -// minimumTimeout or the timeoutIncrement per Gi of storage size, whichever is -// greater. -func CalculateTimeoutForVolume(minimumTimeout, timeoutIncrement int, pv *v1.PersistentVolume) int64 { - giQty := resource.MustParse("1Gi") - pvQty := pv.Spec.Capacity[v1.ResourceStorage] - giSize := giQty.Value() - pvSize := pvQty.Value() - timeout := (pvSize / giSize) * int64(timeoutIncrement) - if timeout < int64(minimumTimeout) { - return int64(minimumTimeout) - } else { - return timeout - } -} - -// RoundUpSize calculates how many allocation units are needed to accommodate -// a volume of given size. E.g. when user wants 1500MiB volume, while AWS EBS -// allocates volumes in gibibyte-sized chunks, -// RoundUpSize(1500 * 1024*1024, 1024*1024*1024) returns '2' -// (2 GiB is the smallest allocatable volume that can hold 1500MiB) -func RoundUpSize(volumeSizeBytes int64, allocationUnitBytes int64) int64 { - return (volumeSizeBytes + allocationUnitBytes - 1) / allocationUnitBytes -} - -// GenerateVolumeName returns a PV name with clusterName prefix. 
The function -// should be used to generate a name of GCE PD or Cinder volume. It basically -// adds "<clusterName>-dynamic-" before the PV name, making sure the resulting -// string fits given length and cuts "dynamic" if not. -func GenerateVolumeName(clusterName, pvName string, maxLength int) string { - prefix := clusterName + "-dynamic" - pvLen := len(pvName) - - // cut the "<clusterName>-dynamic" to fit full pvName into maxLength - // +1 for the '-' dash - if pvLen+1+len(prefix) > maxLength { - prefix = prefix[:maxLength-pvLen-1] - } - return prefix + "-" + pvName -} - -// Check if the path from the mounter is empty. -func GetPath(mounter Mounter) (string, error) { - path := mounter.GetPath() - if path == "" { - return "", fmt.Errorf("Path is empty %s", reflect.TypeOf(mounter).String()) - } - return path, nil -} - -// ChooseZone implements our heuristics for choosing a zone for volume creation based on the volume name -// Volumes are generally round-robin-ed across all active zones, using the hash of the PVC Name. -// However, if the PVCName ends with `-<integer>`, we will hash the prefix, and then add the integer to the hash. -// This means that a StatefulSet's volumes (`claimname-statefulsetname-id`) will spread across available zones, -// assuming the id values are consecutive. -func ChooseZoneForVolume(zones sets.String, pvcName string) string { - // We create the volume in a zone determined by the name - // Eventually the scheduler will coordinate placement into an available zone - var hash uint32 - var index uint32 - - if pvcName == "" { - // We should always be called with a name; this shouldn't happen - glog.Warningf("No name defined during volume create; choosing random zone") - - hash = rand.Uint32() - } else { - hashString := pvcName - - // Heuristic to make sure that volumes in a StatefulSet are spread across zones - // StatefulSet PVCs are (currently) named ClaimName-StatefulSetName-Id, - // where Id is an integer index. - // Note though that if a StatefulSet pod has multiple claims, we need them to be - // in the same zone, because otherwise the pod will be unable to mount both volumes, - // and will be unschedulable. So we hash _only_ the "StatefulSetName" portion when - // it looks like `ClaimName-StatefulSetName-Id`. - // We continue to round-robin volume names that look like `Name-Id` also; this is a useful - // feature for users that are creating statefulset-like functionality without using statefulsets. - lastDash := strings.LastIndexByte(pvcName, '-') - if lastDash != -1 { - statefulsetIDString := pvcName[lastDash+1:] - statefulsetID, err := strconv.ParseUint(statefulsetIDString, 10, 32) - if err == nil { - // Offset by the statefulsetID, so we round-robin across zones - index = uint32(statefulsetID) - // We still hash the volume name, but only the prefix - hashString = pvcName[:lastDash] - - // In the special case where it looks like `ClaimName-StatefulSetName-Id`, - // hash only the StatefulSetName, so that different claims on the same StatefulSet - // member end up in the same zone. - // Note that StatefulSetName (and ClaimName) might themselves both have dashes. - // We actually just take the portion after the final - of ClaimName-StatefulSetName. - // For our purposes it doesn't much matter (just suboptimal spreading). 
- lastDash := strings.LastIndexByte(hashString, '-') - if lastDash != -1 { - hashString = hashString[lastDash+1:] - } - - glog.V(2).Infof("Detected StatefulSet-style volume name %q; index=%d", pvcName, index) - } - } - - // We hash the (base) volume name, so we don't bias towards the first N zones - h := fnv.New32() - h.Write([]byte(hashString)) - hash = h.Sum32() - } - - // Zones.List returns zones in a consistent order (sorted) - // We do have a potential failure case where volumes will not be properly spread, - // if the set of zones changes during StatefulSet volume creation. However, this is - // probably relatively unlikely because we expect the set of zones to be essentially - // static for clusters. - // Hopefully we can address this problem if/when we do full scheduler integration of - // PVC placement (which could also e.g. avoid putting volumes in overloaded or - // unhealthy zones) - zoneSlice := zones.List() - zone := zoneSlice[(hash+index)%uint32(len(zoneSlice))] - - glog.V(2).Infof("Creating volume for PVC %q; chose zone=%q from zones=%q", pvcName, zone, zoneSlice) - return zone -} - -// UnmountViaEmptyDir delegates the tear down operation for secret, configmap, git_repo and downwardapi -// to empty_dir -func UnmountViaEmptyDir(dir string, host VolumeHost, volName string, volSpec Spec, podUID types.UID) error { - glog.V(3).Infof("Tearing down volume %v for pod %v at %v", volName, podUID, dir) - - if pathExists, pathErr := volutil.PathExists(dir); pathErr != nil { - return fmt.Errorf("Error checking if path exists: %v", pathErr) - } else if !pathExists { - glog.Warningf("Warning: Unmount skipped because path does not exist: %v", dir) - return nil - } - - // Wrap EmptyDir, let it do the teardown. - wrapped, err := host.NewWrapperUnmounter(volName, volSpec, podUID) - if err != nil { - return err - } - return wrapped.TearDownAt(dir) -} - -// MountOptionFromSpec extracts and joins mount options from volume spec with supplied options -func MountOptionFromSpec(spec *Spec, options ...string) []string { - pv := spec.PersistentVolume - - if pv != nil { - if mo, ok := pv.Annotations[v1.MountOptionAnnotation]; ok { - moList := strings.Split(mo, ",") - return JoinMountOptions(moList, options) - } - - } - return options -} - -// JoinMountOptions joins mount options eliminating duplicates -func JoinMountOptions(userOptions []string, systemOptions []string) []string { - allMountOptions := sets.NewString() - - for _, mountOption := range userOptions { - if len(mountOption) > 0 { - allMountOptions.Insert(mountOption) - } - } - - for _, mountOption := range systemOptions { - allMountOptions.Insert(mountOption) - } - return allMountOptions.UnsortedList() -} - -// ZonesToSet converts a string containing a comma separated list of zones to set -func ZonesToSet(zonesString string) (sets.String, error) { - zonesSlice := strings.Split(zonesString, ",") - zonesSet := make(sets.String) - for _, zone := range zonesSlice { - trimmedZone := strings.TrimSpace(zone) - if trimmedZone == "" { - return make(sets.String), fmt.Errorf("comma separated list of zones (%q) must not contain an empty zone", zonesString) - } - zonesSet.Insert(trimmedZone) - } - return zonesSet, nil -} - -// ValidateZone returns: -// - an error in case zone is an empty string or contains only any combination of spaces and tab characters -// - nil otherwise -func ValidateZone(zone string) error { - if strings.TrimSpace(zone) == "" { - return fmt.Errorf("the provided %q zone is not valid, it's an empty string or contains only spaces and 
tab characters", zone) - } - return nil -} - -// AccessModesContains returns whether the requested mode is contained by modes -func AccessModesContains(modes []v1.PersistentVolumeAccessMode, mode v1.PersistentVolumeAccessMode) bool { - for _, m := range modes { - if m == mode { - return true - } - } - return false -} - -// AccessModesContainedInAll returns whether all of the requested modes are contained by modes -func AccessModesContainedInAll(indexedModes []v1.PersistentVolumeAccessMode, requestedModes []v1.PersistentVolumeAccessMode) bool { - for _, mode := range requestedModes { - if !AccessModesContains(indexedModes, mode) { - return false - } - } - return true -} diff --git a/vendor/k8s.io/kubernetes/pkg/volume/util/atomic_writer.go b/vendor/k8s.io/kubernetes/pkg/volume/util/atomic_writer.go deleted file mode 100644 index 5eef55b45..000000000 --- a/vendor/k8s.io/kubernetes/pkg/volume/util/atomic_writer.go +++ /dev/null @@ -1,462 +0,0 @@ -/* -Copyright 2016 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package util - -import ( - "bytes" - "fmt" - "io/ioutil" - "os" - "path" - "path/filepath" - "runtime" - "strings" - "time" - - "github.com/golang/glog" - - "k8s.io/apimachinery/pkg/util/sets" -) - -const ( - maxFileNameLength = 255 - maxPathLength = 4096 -) - -// AtomicWriter handles atomically projecting content for a set of files into -// a target directory. -// -// Note: -// -// 1. AtomicWriter reserves the set of pathnames starting with `..`. -// 2. AtomicWriter offers no concurrency guarantees and must be synchronized -// by the caller. -// -// The visible files in this volume are symlinks to files in the writer's data -// directory. Actual files are stored in a hidden timestamped directory which -// is symlinked to by the data directory. The timestamped directory and -// data directory symlink are created in the writer's target dir. This scheme -// allows the files to be atomically updated by changing the target of the -// data directory symlink. -// -// Consumers of the target directory can monitor the ..data symlink using -// inotify or fanotify to receive events when the content in the volume is -// updated. -type AtomicWriter struct { - targetDir string - logContext string -} - -type FileProjection struct { - Data []byte - Mode int32 -} - -// NewAtomicWriter creates a new AtomicWriter configured to write to the given -// target directory, or returns an error if the target directory does not exist. -func NewAtomicWriter(targetDir string, logContext string) (*AtomicWriter, error) { - _, err := os.Stat(targetDir) - if os.IsNotExist(err) { - return nil, err - } - - return &AtomicWriter{targetDir: targetDir, logContext: logContext}, nil -} - -const ( - dataDirName = "..data" - newDataDirName = "..data_tmp" -) - -// Write does an atomic projection of the given payload into the writer's target -// directory. Input paths must not begin with '..'. -// -// The Write algorithm is: -// -// 1. The payload is validated; if the payload is invalid, the function returns -// 2. 
The user-visible portion of the volume is walked to determine whether any -// portion of the payload was deleted and is still present on disk. -// If the payload is already present on disk and there are no deleted files, -// the function returns -// 3. A check is made to determine whether data present in the payload has changed -// 4. A new timestamped dir is created -// 5. The payload is written to the new timestamped directory -// 6. Symlinks and directory for new user-visible files are created (if needed). -// -// For example, consider the files: -// <target-dir>/podName -// <target-dir>/user/labels -// <target-dir>/k8s/annotations -// -// The user visible files are symbolic links into the internal data directory: -// <target-dir>/podName -> ..data/podName -// <target-dir>/usr/labels -> ../..data/usr/labels -// <target-dir>/k8s/annotations -> ../..data/k8s/annotations -// -// Relative links are created into the data directory for files in subdirectories. -// -// The data directory itself is a link to a timestamped directory with -// the real data: -// <target-dir>/..data -> ..2016_02_01_15_04_05.12345678/ -// 7. The current timestamped directory is detected by reading the data directory -// symlink -// 8. A symlink to the new timestamped directory ..data_tmp is created that will -// become the new data directory -// 9. The new data directory symlink is renamed to the data directory; rename is atomic -// 10. Old paths are removed from the user-visible portion of the target directory -// 11. The previous timestamped directory is removed, if it exists -func (w *AtomicWriter) Write(payload map[string]FileProjection) error { - // (1) - cleanPayload, err := validatePayload(payload) - if err != nil { - glog.Errorf("%s: invalid payload: %v", w.logContext, err) - return err - } - - // (2) - pathsToRemove, err := w.pathsToRemove(cleanPayload) - if err != nil { - glog.Errorf("%s: error determining user-visible files to remove: %v", w.logContext, err) - return err - } - - // (3) - if should, err := w.shouldWritePayload(cleanPayload); err != nil { - glog.Errorf("%s: error determining whether payload should be written to disk: %v", w.logContext, err) - return err - } else if !should && len(pathsToRemove) == 0 { - glog.V(4).Infof("%s: no update required for target directory %v", w.logContext, w.targetDir) - return nil - } else { - glog.V(4).Infof("%s: write required for target directory %v", w.logContext, w.targetDir) - } - - // (4) - tsDir, err := w.newTimestampDir() - if err != nil { - glog.V(4).Infof("%s: error creating new ts data directory: %v", w.logContext, err) - return err - } - - // (5) - if err = w.writePayloadToDir(cleanPayload, tsDir); err != nil { - glog.Errorf("%s: error writing payload to ts data directory %s: %v", w.logContext, tsDir, err) - return err - } else { - glog.V(4).Infof("%s: performed write of new data to ts data directory: %s", w.logContext, tsDir) - } - - // (6) - if err = w.createUserVisibleFiles(cleanPayload); err != nil { - glog.Errorf("%s: error creating visible symlinks in %s: %v", w.logContext, w.targetDir, err) - return err - } - - // (7) - _, tsDirName := filepath.Split(tsDir) - dataDirPath := path.Join(w.targetDir, dataDirName) - oldTsDir, err := os.Readlink(dataDirPath) - if err != nil && !os.IsNotExist(err) { - glog.Errorf("%s: error reading link for data directory: %v", w.logContext, err) - return err - } - - // (8) - newDataDirPath := path.Join(w.targetDir, newDataDirName) - if err = os.Symlink(tsDirName, newDataDirPath); err != nil { - os.RemoveAll(tsDir) - 
glog.Errorf("%s: error creating symbolic link for atomic update: %v", w.logContext, err) - return err - } - - // (9) - if runtime.GOOS == "windows" { - os.Remove(dataDirPath) - err = os.Symlink(tsDirName, dataDirPath) - os.Remove(newDataDirPath) - } else { - err = os.Rename(newDataDirPath, dataDirPath) - } - if err != nil { - os.Remove(newDataDirPath) - os.RemoveAll(tsDir) - glog.Errorf("%s: error renaming symbolic link for data directory %s: %v", w.logContext, newDataDirPath, err) - return err - } - - // (10) - if err = w.removeUserVisiblePaths(pathsToRemove); err != nil { - glog.Errorf("%s: error removing old visible symlinks: %v", w.logContext, err) - return err - } - - // (11) - if len(oldTsDir) > 0 { - if err = os.RemoveAll(path.Join(w.targetDir, oldTsDir)); err != nil { - glog.Errorf("%s: error removing old data directory %s: %v", w.logContext, oldTsDir, err) - return err - } - } - - return nil -} - -// validatePayload returns an error if any path in the payload returns a copy of the payload with the paths cleaned. -func validatePayload(payload map[string]FileProjection) (map[string]FileProjection, error) { - cleanPayload := make(map[string]FileProjection) - for k, content := range payload { - if err := validatePath(k); err != nil { - return nil, err - } - - cleanPayload[path.Clean(k)] = content - } - - return cleanPayload, nil -} - -// validatePath validates a single path, returning an error if the path is -// invalid. paths may not: -// -// 1. be absolute -// 2. contain '..' as an element -// 3. start with '..' -// 4. contain filenames larger than 255 characters -// 5. be longer than 4096 characters -func validatePath(targetPath string) error { - // TODO: somehow unify this with the similar api validation, - // validateVolumeSourcePath; the error semantics are just different enough - // from this that it was time-prohibitive trying to find the right - // refactoring to re-use. - if targetPath == "" { - return fmt.Errorf("invalid path: must not be empty: %q", targetPath) - } - if path.IsAbs(targetPath) { - return fmt.Errorf("invalid path: must be relative path: %s", targetPath) - } - - if len(targetPath) > maxPathLength { - return fmt.Errorf("invalid path: must be less than %d characters", maxPathLength) - } - - items := strings.Split(targetPath, string(os.PathSeparator)) - for _, item := range items { - if item == ".." { - return fmt.Errorf("invalid path: must not contain '..': %s", targetPath) - } - if len(item) > maxFileNameLength { - return fmt.Errorf("invalid path: filenames must be less than %d characters", maxFileNameLength) - } - } - if strings.HasPrefix(items[0], "..") && len(items[0]) > 2 { - return fmt.Errorf("invalid path: must not start with '..': %s", targetPath) - } - - return nil -} - -// shouldWritePayload returns whether the payload should be written to disk. -func (w *AtomicWriter) shouldWritePayload(payload map[string]FileProjection) (bool, error) { - for userVisiblePath, fileProjection := range payload { - shouldWrite, err := w.shouldWriteFile(path.Join(w.targetDir, userVisiblePath), fileProjection.Data) - if err != nil { - return false, err - } - - if shouldWrite { - return true, nil - } - } - - return false, nil -} - -// shouldWriteFile returns whether a new version of a file should be written to disk. 
-func (w *AtomicWriter) shouldWriteFile(path string, content []byte) (bool, error) { - _, err := os.Lstat(path) - if os.IsNotExist(err) { - return true, nil - } - - contentOnFs, err := ioutil.ReadFile(path) - if err != nil { - return false, err - } - - return (bytes.Compare(content, contentOnFs) != 0), nil -} - -// pathsToRemove walks the user-visible portion of the target directory and -// determines which paths should be removed (if any) after the payload is -// written to the target directory. -func (w *AtomicWriter) pathsToRemove(payload map[string]FileProjection) (sets.String, error) { - paths := sets.NewString() - visitor := func(path string, info os.FileInfo, err error) error { - if path == w.targetDir { - return nil - } - - relativePath := strings.TrimPrefix(path, w.targetDir) - if runtime.GOOS == "windows" { - relativePath = strings.TrimPrefix(relativePath, "\\") - } else { - relativePath = strings.TrimPrefix(relativePath, "/") - } - if strings.HasPrefix(relativePath, "..") { - return nil - } - - paths.Insert(relativePath) - return nil - } - - err := filepath.Walk(w.targetDir, visitor) - if os.IsNotExist(err) { - return nil, nil - } else if err != nil { - return nil, err - } - glog.V(5).Infof("%s: current paths: %+v", w.targetDir, paths.List()) - - newPaths := sets.NewString() - for file := range payload { - // add all subpaths for the payload to the set of new paths - // to avoid attempting to remove non-empty dirs - for subPath := file; subPath != ""; { - newPaths.Insert(subPath) - subPath, _ = filepath.Split(subPath) - subPath = strings.TrimSuffix(subPath, "/") - } - } - glog.V(5).Infof("%s: new paths: %+v", w.targetDir, newPaths.List()) - - result := paths.Difference(newPaths) - glog.V(5).Infof("%s: paths to remove: %+v", w.targetDir, result) - - return result, nil -} - -// newTimestampDir creates a new timestamp directory -func (w *AtomicWriter) newTimestampDir() (string, error) { - tsDir, err := ioutil.TempDir(w.targetDir, fmt.Sprintf("..%s.", time.Now().Format("1981_02_01_15_04_05"))) - if err != nil { - glog.Errorf("%s: unable to create new temp directory: %v", w.logContext, err) - return "", err - } - - // 0755 permissions are needed to allow 'group' and 'other' to recurse the - // directory tree. do a chmod here to ensure that permissions are set correctly - // regardless of the process' umask. - err = os.Chmod(tsDir, 0755) - if err != nil { - glog.Errorf("%s: unable to set mode on new temp directory: %v", w.logContext, err) - return "", err - } - - return tsDir, nil -} - -// writePayloadToDir writes the given payload to the given directory. The -// directory must exist. -func (w *AtomicWriter) writePayloadToDir(payload map[string]FileProjection, dir string) error { - for userVisiblePath, fileProjection := range payload { - content := fileProjection.Data - mode := os.FileMode(fileProjection.Mode) - fullPath := path.Join(dir, userVisiblePath) - baseDir, _ := filepath.Split(fullPath) - - err := os.MkdirAll(baseDir, os.ModePerm) - if err != nil { - glog.Errorf("%s: unable to create directory %s: %v", w.logContext, baseDir, err) - return err - } - - err = ioutil.WriteFile(fullPath, content, mode) - if err != nil { - glog.Errorf("%s: unable to write file %s with mode %v: %v", w.logContext, fullPath, mode, err) - return err - } - // Chmod is needed because ioutil.WriteFile() ends up calling - // open(2) to create the file, so the final mode used is "mode & - // ~umask". But we want to make sure the specified mode is used - // in the file no matter what the umask is. 
- err = os.Chmod(fullPath, mode) - if err != nil { - glog.Errorf("%s: unable to write file %s with mode %v: %v", w.logContext, fullPath, mode, err) - } - } - - return nil -} - -// createUserVisibleFiles creates the relative symlinks for all the -// files configured in the payload. If the directory in a file path does not -// exist, it is created. -// -// Viz: -// For files: "bar", "foo/bar", "baz/bar", "foo/baz/blah" -// the following symlinks and subdirectories are created: -// bar -> ..data/bar -// foo/bar -> ../..data/foo/bar -// baz/bar -> ../..data/baz/bar -// foo/baz/blah -> ../../..data/foo/baz/blah -func (w *AtomicWriter) createUserVisibleFiles(payload map[string]FileProjection) error { - for userVisiblePath := range payload { - dir, _ := filepath.Split(userVisiblePath) - subDirs := 0 - if len(dir) > 0 { - // If dir is not empty, the projection path contains at least one - // subdirectory (example: userVisiblePath := "foo/bar"). - // Since filepath.Split leaves a trailing path separator, in this - // example, dir = "foo/". In order to calculate the number of - // subdirectories, we must subtract 1 from the number returned by split. - subDirs = len(strings.Split(dir, "/")) - 1 - err := os.MkdirAll(path.Join(w.targetDir, dir), os.ModePerm) - if err != nil { - return err - } - } - _, err := os.Readlink(path.Join(w.targetDir, userVisiblePath)) - if err != nil && os.IsNotExist(err) { - // The link into the data directory for this path doesn't exist; create it, - // respecting the number of subdirectories necessary to link - // correctly back into the data directory. - visibleFile := path.Join(w.targetDir, userVisiblePath) - dataDirFile := path.Join(strings.Repeat("../", subDirs), dataDirName, userVisiblePath) - - err = os.Symlink(dataDirFile, visibleFile) - if err != nil { - return err - } - } - } - return nil -} - -// removeUserVisiblePaths removes the set of paths from the user-visible -// portion of the writer's target directory. -func (w *AtomicWriter) removeUserVisiblePaths(paths sets.String) error { - orderedPaths := paths.List() - for ii := len(orderedPaths) - 1; ii >= 0; ii-- { - if err := os.Remove(path.Join(w.targetDir, orderedPaths[ii])); err != nil { - glog.Errorf("%s: error pruning old user-visible path %s: %v", w.logContext, orderedPaths[ii], err) - return err - } - } - - return nil -} diff --git a/vendor/k8s.io/kubernetes/pkg/volume/util/device_util.go b/vendor/k8s.io/kubernetes/pkg/volume/util/device_util.go deleted file mode 100644 index 9098d7b85..000000000 --- a/vendor/k8s.io/kubernetes/pkg/volume/util/device_util.go +++ /dev/null @@ -1,31 +0,0 @@ -/* -Copyright 2016 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. 
-*/ - -package util - -//DeviceUtil is a util for common device methods -type DeviceUtil interface { - FindMultipathDeviceForDevice(disk string) string -} - -type deviceHandler struct { - get_io IoUtil -} - -//NewDeviceHandler Create a new IoHandler implementation -func NewDeviceHandler(io IoUtil) DeviceUtil { - return &deviceHandler{get_io: io} -} diff --git a/vendor/k8s.io/kubernetes/pkg/volume/util/device_util_linux.go b/vendor/k8s.io/kubernetes/pkg/volume/util/device_util_linux.go deleted file mode 100644 index 0d9851140..000000000 --- a/vendor/k8s.io/kubernetes/pkg/volume/util/device_util_linux.go +++ /dev/null @@ -1,61 +0,0 @@ -// +build linux - -/* -Copyright 2016 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package util - -import ( - "errors" - "strings" -) - -// FindMultipathDeviceForDevice given a device name like /dev/sdx, find the devicemapper parent -func (handler *deviceHandler) FindMultipathDeviceForDevice(device string) string { - io := handler.get_io - disk, err := findDeviceForPath(device, io) - if err != nil { - return "" - } - sysPath := "/sys/block/" - if dirs, err := io.ReadDir(sysPath); err == nil { - for _, f := range dirs { - name := f.Name() - if strings.HasPrefix(name, "dm-") { - if _, err1 := io.Lstat(sysPath + name + "/slaves/" + disk); err1 == nil { - return "/dev/" + name - } - } - } - } - return "" -} - -// findDeviceForPath Find the underlaying disk for a linked path such as /dev/disk/by-path/XXXX or /dev/mapper/XXXX -// will return sdX or hdX etc, if /dev/sdX is passed in then sdX will be returned -func findDeviceForPath(path string, io IoUtil) (string, error) { - devicePath, err := io.EvalSymlinks(path) - if err != nil { - return "", err - } - // if path /dev/hdX split into "", "dev", "hdX" then we will - // return just the last part - parts := strings.Split(devicePath, "/") - if len(parts) == 3 && strings.HasPrefix(parts[1], "dev") { - return parts[2], nil - } - return "", errors.New("Illegal path for device " + devicePath) -} diff --git a/vendor/k8s.io/kubernetes/pkg/volume/util/device_util_unsupported.go b/vendor/k8s.io/kubernetes/pkg/volume/util/device_util_unsupported.go deleted file mode 100644 index 6afb1f139..000000000 --- a/vendor/k8s.io/kubernetes/pkg/volume/util/device_util_unsupported.go +++ /dev/null @@ -1,24 +0,0 @@ -// +build !linux - -/* -Copyright 2016 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. 
-*/ - -package util - -// FindMultipathDeviceForDevice unsupported returns "" -func (handler *deviceHandler) FindMultipathDeviceForDevice(device string) string { - return "" -} diff --git a/vendor/k8s.io/kubernetes/pkg/volume/util/doc.go b/vendor/k8s.io/kubernetes/pkg/volume/util/doc.go deleted file mode 100644 index 620add69d..000000000 --- a/vendor/k8s.io/kubernetes/pkg/volume/util/doc.go +++ /dev/null @@ -1,18 +0,0 @@ -/* -Copyright 2015 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -// Contains utility code for use by volume plugins. -package util // import "k8s.io/kubernetes/pkg/volume/util" diff --git a/vendor/k8s.io/kubernetes/pkg/volume/util/fs.go b/vendor/k8s.io/kubernetes/pkg/volume/util/fs/fs.go index cfa7e30b4..bbb4b0105 100644 --- a/vendor/k8s.io/kubernetes/pkg/volume/util/fs.go +++ b/vendor/k8s.io/kubernetes/pkg/volume/util/fs/fs.go @@ -16,14 +16,15 @@ See the License for the specific language governing permissions and limitations under the License. */ -package util +package fs import ( "bytes" "fmt" "os/exec" "strings" - "syscall" + + "golang.org/x/sys/unix" "k8s.io/apimachinery/pkg/api/resource" ) @@ -31,8 +32,8 @@ import ( // FSInfo linux returns (available bytes, byte capacity, byte usage, total inodes, inodes free, inode usage, error) // for the filesystem that path resides upon. func FsInfo(path string) (int64, int64, int64, int64, int64, int64, error) { - statfs := &syscall.Statfs_t{} - err := syscall.Statfs(path, statfs) + statfs := &unix.Statfs_t{} + err := unix.Statfs(path, statfs) if err != nil { return 0, 0, 0, 0, 0, 0, err } diff --git a/vendor/k8s.io/kubernetes/pkg/volume/util/fs_unsupported.go b/vendor/k8s.io/kubernetes/pkg/volume/util/fs/fs_unsupported.go index 8d35d5dae..da41fc8ee 100644 --- a/vendor/k8s.io/kubernetes/pkg/volume/util/fs_unsupported.go +++ b/vendor/k8s.io/kubernetes/pkg/volume/util/fs/fs_unsupported.go @@ -16,7 +16,7 @@ See the License for the specific language governing permissions and limitations under the License. */ -package util +package fs import ( "fmt" diff --git a/vendor/k8s.io/kubernetes/pkg/volume/util/io_util.go b/vendor/k8s.io/kubernetes/pkg/volume/util/io_util.go deleted file mode 100644 index e1f30f5c3..000000000 --- a/vendor/k8s.io/kubernetes/pkg/volume/util/io_util.go +++ /dev/null @@ -1,47 +0,0 @@ -/* -Copyright 2016 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. 
-*/ - -package util - -import ( - "io/ioutil" - "os" - "path/filepath" -) - -// IoUtil is a mockable util for common IO operations -type IoUtil interface { - ReadDir(dirname string) ([]os.FileInfo, error) - Lstat(name string) (os.FileInfo, error) - EvalSymlinks(path string) (string, error) -} - -type osIOHandler struct{} - -//NewIOHandler Create a new IoHandler implementation -func NewIOHandler() IoUtil { - return &osIOHandler{} -} - -func (handler *osIOHandler) ReadDir(dirname string) ([]os.FileInfo, error) { - return ioutil.ReadDir(dirname) -} -func (handler *osIOHandler) Lstat(name string) (os.FileInfo, error) { - return os.Lstat(name) -} -func (handler *osIOHandler) EvalSymlinks(path string) (string, error) { - return filepath.EvalSymlinks(path) -} diff --git a/vendor/k8s.io/kubernetes/pkg/volume/util/recyclerclient/recycler_client.go b/vendor/k8s.io/kubernetes/pkg/volume/util/recyclerclient/recycler_client.go new file mode 100644 index 000000000..1af6465c6 --- /dev/null +++ b/vendor/k8s.io/kubernetes/pkg/volume/util/recyclerclient/recycler_client.go @@ -0,0 +1,252 @@ +/* +Copyright 2018 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package recyclerclient + +import ( + "fmt" + + "github.com/golang/glog" + "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/fields" + "k8s.io/apimachinery/pkg/watch" + clientset "k8s.io/client-go/kubernetes" +) + +type RecycleEventRecorder func(eventtype, message string) + +// RecycleVolumeByWatchingPodUntilCompletion is intended for use with volume +// Recyclers. This function will save the given Pod to the API and watch it +// until it completes, fails, or the pod's ActiveDeadlineSeconds is exceeded, +// whichever comes first. An attempt to delete a recycler pod is always +// attempted before returning. +// +// In case there is a pod with the same namespace+name already running, this +// function deletes it as it is not able to judge if it is an old recycler +// or user has forged a fake recycler to block Kubernetes from recycling.// +// +// pod - the pod designed by a volume plugin to recycle the volume. pod.Name +// will be overwritten with unique name based on PV.Name. +// client - kube client for API operations. +func RecycleVolumeByWatchingPodUntilCompletion(pvName string, pod *v1.Pod, kubeClient clientset.Interface, recorder RecycleEventRecorder) error { + return internalRecycleVolumeByWatchingPodUntilCompletion(pvName, pod, newRecyclerClient(kubeClient, recorder)) +} + +// same as above func comments, except 'recyclerClient' is a narrower pod API +// interface to ease testing +func internalRecycleVolumeByWatchingPodUntilCompletion(pvName string, pod *v1.Pod, recyclerClient recyclerClient) error { + glog.V(5).Infof("creating recycler pod for volume %s\n", pod.Name) + + // Generate unique name for the recycler pod - we need to get "already + // exists" error when a previous controller has already started recycling + // the volume. 
Here we assume that pv.Name is already unique. + pod.Name = "recycler-for-" + pvName + pod.GenerateName = "" + + stopChannel := make(chan struct{}) + defer close(stopChannel) + podCh, err := recyclerClient.WatchPod(pod.Name, pod.Namespace, stopChannel) + if err != nil { + glog.V(4).Infof("cannot start watcher for pod %s/%s: %v", pod.Namespace, pod.Name, err) + return err + } + + // Start the pod + _, err = recyclerClient.CreatePod(pod) + if err != nil { + if errors.IsAlreadyExists(err) { + deleteErr := recyclerClient.DeletePod(pod.Name, pod.Namespace) + if deleteErr != nil { + return fmt.Errorf("failed to delete old recycler pod %s/%s: %s", pod.Namespace, pod.Name, deleteErr) + } + // Recycler will try again and the old pod will be hopefully deleted + // at that time. + return fmt.Errorf("old recycler pod found, will retry later") + } + return fmt.Errorf("unexpected error creating recycler pod: %+v", err) + } + err = waitForPod(pod, recyclerClient, podCh) + + // In all cases delete the recycler pod and log its result. + glog.V(2).Infof("deleting recycler pod %s/%s", pod.Namespace, pod.Name) + deleteErr := recyclerClient.DeletePod(pod.Name, pod.Namespace) + if deleteErr != nil { + glog.Errorf("failed to delete recycler pod %s/%s: %v", pod.Namespace, pod.Name, err) + } + + // Returning recycler error is preferred, the pod will be deleted again on + // the next retry. + if err != nil { + return fmt.Errorf("failed to recycle volume: %s", err) + } + + // Recycle succeeded but we failed to delete the recycler pod. Report it, + // the controller will re-try recycling the PV again shortly. + if deleteErr != nil { + return fmt.Errorf("failed to delete recycler pod: %s", deleteErr) + } + + return nil +} + +// waitForPod watches the pod it until it finishes and send all events on the +// pod to the PV. +func waitForPod(pod *v1.Pod, recyclerClient recyclerClient, podCh <-chan watch.Event) error { + for { + event, ok := <-podCh + if !ok { + return fmt.Errorf("recycler pod %q watch channel had been closed", pod.Name) + } + switch event.Object.(type) { + case *v1.Pod: + // POD changed + pod := event.Object.(*v1.Pod) + glog.V(4).Infof("recycler pod update received: %s %s/%s %s", event.Type, pod.Namespace, pod.Name, pod.Status.Phase) + switch event.Type { + case watch.Added, watch.Modified: + if pod.Status.Phase == v1.PodSucceeded { + // Recycle succeeded. + return nil + } + if pod.Status.Phase == v1.PodFailed { + if pod.Status.Message != "" { + return fmt.Errorf(pod.Status.Message) + } else { + return fmt.Errorf("pod failed, pod.Status.Message unknown.") + } + } + + case watch.Deleted: + return fmt.Errorf("recycler pod was deleted") + + case watch.Error: + return fmt.Errorf("recycler pod watcher failed") + } + + case *v1.Event: + // Event received + podEvent := event.Object.(*v1.Event) + glog.V(4).Infof("recycler event received: %s %s/%s %s/%s %s", event.Type, podEvent.Namespace, podEvent.Name, podEvent.InvolvedObject.Namespace, podEvent.InvolvedObject.Name, podEvent.Message) + if event.Type == watch.Added { + recyclerClient.Event(podEvent.Type, podEvent.Message) + } + } + } +} + +// recyclerClient abstracts access to a Pod by providing a narrower interface. +// This makes it easier to mock a client for testing. +type recyclerClient interface { + CreatePod(pod *v1.Pod) (*v1.Pod, error) + GetPod(name, namespace string) (*v1.Pod, error) + DeletePod(name, namespace string) error + // WatchPod returns a ListWatch for watching a pod. The stopChannel is used + // to close the reflector backing the watch. 
The caller is responsible for + // derring a close on the channel to stop the reflector. + WatchPod(name, namespace string, stopChannel chan struct{}) (<-chan watch.Event, error) + // Event sends an event to the volume that is being recycled. + Event(eventtype, message string) +} + +func newRecyclerClient(client clientset.Interface, recorder RecycleEventRecorder) recyclerClient { + return &realRecyclerClient{ + client, + recorder, + } +} + +type realRecyclerClient struct { + client clientset.Interface + recorder RecycleEventRecorder +} + +func (c *realRecyclerClient) CreatePod(pod *v1.Pod) (*v1.Pod, error) { + return c.client.CoreV1().Pods(pod.Namespace).Create(pod) +} + +func (c *realRecyclerClient) GetPod(name, namespace string) (*v1.Pod, error) { + return c.client.CoreV1().Pods(namespace).Get(name, metav1.GetOptions{}) +} + +func (c *realRecyclerClient) DeletePod(name, namespace string) error { + return c.client.CoreV1().Pods(namespace).Delete(name, nil) +} + +func (c *realRecyclerClient) Event(eventtype, message string) { + c.recorder(eventtype, message) +} + +func (c *realRecyclerClient) WatchPod(name, namespace string, stopChannel chan struct{}) (<-chan watch.Event, error) { + podSelector, err := fields.ParseSelector("metadata.name=" + name) + if err != nil { + return nil, err + } + options := metav1.ListOptions{ + FieldSelector: podSelector.String(), + Watch: true, + } + + podWatch, err := c.client.CoreV1().Pods(namespace).Watch(options) + if err != nil { + return nil, err + } + + eventSelector, _ := fields.ParseSelector("involvedObject.name=" + name) + eventWatch, err := c.client.CoreV1().Events(namespace).Watch(metav1.ListOptions{ + FieldSelector: eventSelector.String(), + Watch: true, + }) + if err != nil { + podWatch.Stop() + return nil, err + } + + eventCh := make(chan watch.Event, 30) + + go func() { + defer eventWatch.Stop() + defer podWatch.Stop() + defer close(eventCh) + var podWatchChannelClosed bool + var eventWatchChannelClosed bool + for { + select { + case _ = <-stopChannel: + return + + case podEvent, ok := <-podWatch.ResultChan(): + if !ok { + podWatchChannelClosed = true + } else { + eventCh <- podEvent + } + case eventEvent, ok := <-eventWatch.ResultChan(): + if !ok { + eventWatchChannelClosed = true + } else { + eventCh <- eventEvent + } + } + if podWatchChannelClosed && eventWatchChannelClosed { + break + } + } + }() + + return eventCh, nil +} diff --git a/vendor/k8s.io/kubernetes/pkg/volume/util/util.go b/vendor/k8s.io/kubernetes/pkg/volume/util/util.go deleted file mode 100644 index 660c3c9db..000000000 --- a/vendor/k8s.io/kubernetes/pkg/volume/util/util.go +++ /dev/null @@ -1,213 +0,0 @@ -/* -Copyright 2015 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. 
-*/ - -package util - -import ( - "fmt" - "os" - "path" - - "github.com/golang/glog" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/labels" - "k8s.io/kubernetes/pkg/api/v1" - v1helper "k8s.io/kubernetes/pkg/api/v1/helper" - storage "k8s.io/kubernetes/pkg/apis/storage/v1" - "k8s.io/kubernetes/pkg/client/clientset_generated/clientset" - "k8s.io/kubernetes/pkg/util/mount" -) - -const readyFileName = "ready" - -// IsReady checks for the existence of a regular file -// called 'ready' in the given directory and returns -// true if that file exists. -func IsReady(dir string) bool { - readyFile := path.Join(dir, readyFileName) - s, err := os.Stat(readyFile) - if err != nil { - return false - } - - if !s.Mode().IsRegular() { - glog.Errorf("ready-file is not a file: %s", readyFile) - return false - } - - return true -} - -// SetReady creates a file called 'ready' in the given -// directory. It logs an error if the file cannot be -// created. -func SetReady(dir string) { - if err := os.MkdirAll(dir, 0750); err != nil && !os.IsExist(err) { - glog.Errorf("Can't mkdir %s: %v", dir, err) - return - } - - readyFile := path.Join(dir, readyFileName) - file, err := os.Create(readyFile) - if err != nil { - glog.Errorf("Can't touch %s: %v", readyFile, err) - return - } - file.Close() -} - -// UnmountPath is a common unmount routine that unmounts the given path and -// deletes the remaining directory if successful. -func UnmountPath(mountPath string, mounter mount.Interface) error { - return UnmountMountPoint(mountPath, mounter, false /* extensiveMountPointCheck */) -} - -// UnmountMountPoint is a common unmount routine that unmounts the given path and -// deletes the remaining directory if successful. -// if extensiveMountPointCheck is true -// IsNotMountPoint will be called instead of IsLikelyNotMountPoint. -// IsNotMountPoint is more expensive but properly handles bind mounts. -func UnmountMountPoint(mountPath string, mounter mount.Interface, extensiveMountPointCheck bool) error { - if pathExists, pathErr := PathExists(mountPath); pathErr != nil { - return fmt.Errorf("Error checking if path exists: %v", pathErr) - } else if !pathExists { - glog.Warningf("Warning: Unmount skipped because path does not exist: %v", mountPath) - return nil - } - - var notMnt bool - var err error - - if extensiveMountPointCheck { - notMnt, err = mount.IsNotMountPoint(mounter, mountPath) - } else { - notMnt, err = mounter.IsLikelyNotMountPoint(mountPath) - } - - if err != nil { - return err - } - - if notMnt { - glog.Warningf("Warning: %q is not a mountpoint, deleting", mountPath) - return os.Remove(mountPath) - } - - // Unmount the mount path - glog.V(4).Infof("%q is a mountpoint, unmounting", mountPath) - if err := mounter.Unmount(mountPath); err != nil { - return err - } - notMnt, mntErr := mounter.IsLikelyNotMountPoint(mountPath) - if mntErr != nil { - return err - } - if notMnt { - glog.V(4).Infof("%q is unmounted, deleting the directory", mountPath) - return os.Remove(mountPath) - } - return fmt.Errorf("Failed to unmount path %v", mountPath) -} - -// PathExists returns true if the specified path exists. 
-func PathExists(path string) (bool, error) { - _, err := os.Stat(path) - if err == nil { - return true, nil - } else if os.IsNotExist(err) { - return false, nil - } else { - return false, err - } -} - -// GetSecretForPod locates secret by name in the pod's namespace and returns secret map -func GetSecretForPod(pod *v1.Pod, secretName string, kubeClient clientset.Interface) (map[string]string, error) { - secret := make(map[string]string) - if kubeClient == nil { - return secret, fmt.Errorf("Cannot get kube client") - } - secrets, err := kubeClient.Core().Secrets(pod.Namespace).Get(secretName, metav1.GetOptions{}) - if err != nil { - return secret, err - } - for name, data := range secrets.Data { - secret[name] = string(data) - } - return secret, nil -} - -// GetSecretForPV locates secret by name and namespace, verifies the secret type, and returns secret map -func GetSecretForPV(secretNamespace, secretName, volumePluginName string, kubeClient clientset.Interface) (map[string]string, error) { - secret := make(map[string]string) - if kubeClient == nil { - return secret, fmt.Errorf("Cannot get kube client") - } - secrets, err := kubeClient.Core().Secrets(secretNamespace).Get(secretName, metav1.GetOptions{}) - if err != nil { - return secret, err - } - if secrets.Type != v1.SecretType(volumePluginName) { - return secret, fmt.Errorf("Cannot get secret of type %s", volumePluginName) - } - for name, data := range secrets.Data { - secret[name] = string(data) - } - return secret, nil -} - -func GetClassForVolume(kubeClient clientset.Interface, pv *v1.PersistentVolume) (*storage.StorageClass, error) { - if kubeClient == nil { - return nil, fmt.Errorf("Cannot get kube client") - } - className := v1helper.GetPersistentVolumeClass(pv) - if className == "" { - return nil, fmt.Errorf("Volume has no storage class") - } - - class, err := kubeClient.StorageV1().StorageClasses().Get(className, metav1.GetOptions{}) - if err != nil { - return nil, err - } - return class, nil -} - -// CheckNodeAffinity looks at the PV node affinity, and checks if the node has the same corresponding labels -// This ensures that we don't mount a volume that doesn't belong to this node -func CheckNodeAffinity(pv *v1.PersistentVolume, nodeLabels map[string]string) error { - affinity, err := v1helper.GetStorageNodeAffinityFromAnnotation(pv.Annotations) - if err != nil { - return fmt.Errorf("Error getting storage node affinity: %v", err) - } - if affinity == nil { - return nil - } - - if affinity.RequiredDuringSchedulingIgnoredDuringExecution != nil { - terms := affinity.RequiredDuringSchedulingIgnoredDuringExecution.NodeSelectorTerms - glog.V(10).Infof("Match for RequiredDuringSchedulingIgnoredDuringExecution node selector terms %+v", terms) - for _, term := range terms { - selector, err := v1helper.NodeSelectorRequirementsAsSelector(term.MatchExpressions) - if err != nil { - return fmt.Errorf("Failed to parse MatchExpressions: %v", err) - } - if !selector.Matches(labels.Set(nodeLabels)) { - return fmt.Errorf("NodeSelectorTerm %+v does not match node labels", term.MatchExpressions) - } - } - } - return nil -} diff --git a/vendor/k8s.io/kubernetes/pkg/volume/volume.go b/vendor/k8s.io/kubernetes/pkg/volume/volume.go index 76c96d2e2..471963556 100644 --- a/vendor/k8s.io/kubernetes/pkg/volume/volume.go +++ b/vendor/k8s.io/kubernetes/pkg/volume/volume.go @@ -19,10 +19,10 @@ package volume import ( "time" + "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" 
"k8s.io/apimachinery/pkg/types" - "k8s.io/kubernetes/pkg/api/v1" ) // Volume represents a directory used by pods or hosts on a node. All method @@ -37,6 +37,19 @@ type Volume interface { MetricsProvider } +// BlockVolume interface provides methods to generate global map path +// and pod device map path. +type BlockVolume interface { + // GetGlobalMapPath returns a global map path which contains + // symbolic links associated to a block device. + // ex. plugins/kubernetes.io/{PluginName}/{DefaultKubeletVolumeDevicesDirName}/{volumePluginDependentPath}/{pod uuid} + GetGlobalMapPath(spec *Spec) (string, error) + // GetPodDeviceMapPath returns a pod device map path + // and name of a symbolic link associated to a block device. + // ex. pods/{podUid}}/{DefaultKubeletVolumeDevicesDirName}/{escapeQualifiedPluginName}/{volumeName} + GetPodDeviceMapPath() (string, string) +} + // MetricsProvider exposes metrics (e.g. used,available space) related to a // Volume. type MetricsProvider interface { @@ -132,6 +145,34 @@ type Unmounter interface { TearDownAt(dir string) error } +// BlockVolumeMapper interface provides methods to set up/map the volume. +type BlockVolumeMapper interface { + BlockVolume + // SetUpDevice prepares the volume to a self-determined directory path, + // which may or may not exist yet and returns combination of physical + // device path of a block volume and error. + // If the plugin is non-attachable, it should prepare the device + // in /dev/ (or where appropriate) and return unique device path. + // Unique device path across kubelet node reboot is required to avoid + // unexpected block volume destruction. + // If the plugin is attachable, it should not do anything here, + // just return empty string for device path. + // Instead, attachable plugin have to return unique device path + // at attacher.Attach() and attacher.WaitForAttach(). + // This may be called more than once, so implementations must be idempotent. + SetUpDevice() (string, error) +} + +// BlockVolumeUnmapper interface provides methods to cleanup/unmap the volumes. +type BlockVolumeUnmapper interface { + BlockVolume + // TearDownDevice removes traces of the SetUpDevice procedure under + // a self-determined directory. + // If the plugin is non-attachable, this method detaches the volume + // from a node. + TearDownDevice(mapPath string, devicePath string) error +} + // Provisioner is an interface that creates templates for PersistentVolumes // and can create the volume as a new resource in the infrastructure provider. type Provisioner interface { @@ -173,7 +214,7 @@ type Attacher interface { // node. If it successfully attaches, the path to the device // is returned. Otherwise, if the device does not attach after // the given timeout period, an error will be returned. - WaitForAttach(spec *Spec, devicePath string, timeout time.Duration) (string, error) + WaitForAttach(spec *Spec, devicePath string, pod *v1.Pod, timeout time.Duration) (string, error) // GetDeviceMountPath returns a path where the device should // be mounted after it is attached. This is a global mount @@ -195,8 +236,10 @@ type BulkVolumeVerifier interface { // Detacher can detach a volume from a node. type Detacher interface { - // Detach the given device from the node with the given Name. - Detach(deviceName string, nodeName types.NodeName) error + // Detach the given volume from the node with the given Name. + // volumeName is name of the volume as returned from plugin's + // GetVolumeName(). 
+	Detach(volumeName string, nodeName types.NodeName) error
 
 	// UnmountDevice unmounts the global mount of the disk. This
 	// should only be called once all bind mounts have been
diff --git a/vendor/k8s.io/kubernetes/pkg/volume/volume_linux.go b/vendor/k8s.io/kubernetes/pkg/volume/volume_linux.go
index ef1f45208..d67ee4a95 100644
--- a/vendor/k8s.io/kubernetes/pkg/volume/volume_linux.go
+++ b/vendor/k8s.io/kubernetes/pkg/volume/volume_linux.go
@@ -89,3 +89,17 @@ func SetVolumeOwnership(mounter Mounter, fsGroup *int64) error {
 		return nil
 	})
 }
+
+// IsSameFSGroup is called only for requests to mount an already mounted
+// volume. It checks if fsGroup of new mount request is the same or not.
+// It returns false if it not the same. It also returns current Gid of a path
+// provided for dir variable.
+func IsSameFSGroup(dir string, fsGroup int64) (bool, int, error) {
+	info, err := os.Stat(dir)
+	if err != nil {
+		glog.Errorf("Error getting stats for %s (%v)", dir, err)
+		return false, 0, err
+	}
+	s := info.Sys().(*syscall.Stat_t)
+	return int(s.Gid) == int(fsGroup), int(s.Gid), nil
+}
diff --git a/vendor/k8s.io/kubernetes/pkg/volume/volume_unsupported.go b/vendor/k8s.io/kubernetes/pkg/volume/volume_unsupported.go
index 45a6cc5ca..46a6aeaf0 100644
--- a/vendor/k8s.io/kubernetes/pkg/volume/volume_unsupported.go
+++ b/vendor/k8s.io/kubernetes/pkg/volume/volume_unsupported.go
@@ -21,3 +21,7 @@ package volume
 func SetVolumeOwnership(mounter Mounter, fsGroup *int64) error {
 	return nil
 }
+
+func IsSameFSGroup(dir string, fsGroup int64) (bool, int, error) {
+	return true, int(fsGroup), nil
+}
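
Editor's note: the recycler helper quoted above now lives in the new k8s.io/kubernetes/pkg/volume/util/recyclerclient package. A minimal caller-side sketch, assuming a hypothetical recyclePV helper and a placeholder busybox scrub pod (a real recycler pod would also mount the volume being scrubbed); only the RecycleVolumeByWatchingPodUntilCompletion signature and the RecycleEventRecorder type are taken from the diff:

package recycleexample

import (
	"fmt"

	"k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	clientset "k8s.io/client-go/kubernetes"
	"k8s.io/kubernetes/pkg/volume/util/recyclerclient"
)

// recyclePV is a hypothetical wrapper showing how a volume plugin might drive
// the relocated recycler API. The pod template is illustrative only.
func recyclePV(pvName string, client clientset.Interface) error {
	pod := &v1.Pod{
		ObjectMeta: metav1.ObjectMeta{
			Namespace: metav1.NamespaceDefault,
			// GenerateName is cleared and Name is overwritten to
			// "recycler-for-"+pvName inside the call, so a concurrent
			// recycle of the same PV fails with "already exists" and retries.
			GenerateName: "pv-recycler-",
		},
		Spec: v1.PodSpec{
			RestartPolicy: v1.RestartPolicyNever,
			Containers: []v1.Container{{
				Name:    "recycler",
				Image:   "busybox",
				Command: []string{"/bin/sh", "-c", "rm -rf /scrub/* /scrub/.[!.]*"},
			}},
		},
	}
	// Recycler pod events are forwarded to the PV through this recorder closure.
	recorder := func(eventtype, message string) {
		fmt.Printf("recycler %s event: %s\n", eventtype, message)
	}
	return recyclerclient.RecycleVolumeByWatchingPodUntilCompletion(pvName, pod, client, recorder)
}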
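
Editor's note: the BlockVolume and BlockVolumeMapper interfaces added to volume.go above can be satisfied by three methods. A sketch of a hypothetical non-attachable mapper, with invented field names, an invented plugin name, and a "volumeDevices" directory layout assumed from the interface comments:

package blockexample

import (
	"path/filepath"

	"k8s.io/kubernetes/pkg/volume"
)

// loopMapper is a hypothetical mapper used only to show the shape of the new
// block interfaces; none of these fields or paths come from a real plugin.
type loopMapper struct {
	pluginDir  string // e.g. /var/lib/kubelet/plugins/kubernetes.io/example
	podUID     string
	volumeName string
	devicePath string // e.g. /dev/loop7, prepared by the plugin itself
}

// GetGlobalMapPath returns the per-plugin directory that holds the symlink
// to the block device for this pod.
func (m *loopMapper) GetGlobalMapPath(spec *volume.Spec) (string, error) {
	return filepath.Join(m.pluginDir, "volumeDevices", spec.Name(), m.podUID), nil
}

// GetPodDeviceMapPath returns the per-pod map directory and the link name.
func (m *loopMapper) GetPodDeviceMapPath() (string, string) {
	return filepath.Join("/var/lib/kubelet/pods", m.podUID, "volumeDevices", "kubernetes.io~example"), m.volumeName
}

// SetUpDevice: a non-attachable plugin returns the stable device path it
// prepared; an attachable plugin would return "" and report the path from
// Attach()/WaitForAttach() instead.
func (m *loopMapper) SetUpDevice() (string, error) {
	return m.devicePath, nil
}

// Compile-time check that the sketch satisfies the new interface.
var _ volume.BlockVolumeMapper = &loopMapper{}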
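
Editor's note: IsSameFSGroup, added at the end of volume_linux.go with a no-op variant for other platforms, lets callers detect an fsGroup mismatch before reusing an already mounted directory. A hedged usage sketch; the ensureCompatibleFSGroup wrapper is hypothetical:

package fsgroupexample

import (
	"fmt"

	"k8s.io/kubernetes/pkg/volume"
)

// ensureCompatibleFSGroup checks whether the requested fsGroup matches the
// GID currently set on dir. On non-Linux builds IsSameFSGroup always reports
// a match, so the check is effectively Linux-only.
func ensureCompatibleFSGroup(dir string, fsGroup int64) error {
	same, currentGid, err := volume.IsSameFSGroup(dir, fsGroup)
	if err != nil {
		return err
	}
	if !same {
		return fmt.Errorf("%s is owned by gid %d; refusing to reuse it with fsGroup %d", dir, currentGid, fsGroup)
	}
	return nil
}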