summaryrefslogtreecommitdiff
path: root/vendor/k8s.io/kubernetes/pkg/volume
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/k8s.io/kubernetes/pkg/volume')
-rw-r--r--vendor/k8s.io/kubernetes/pkg/volume/metrics_du.go8
-rw-r--r--vendor/k8s.io/kubernetes/pkg/volume/metrics_statfs.go4
-rw-r--r--vendor/k8s.io/kubernetes/pkg/volume/plugins.go242
-rw-r--r--vendor/k8s.io/kubernetes/pkg/volume/util.go456
-rw-r--r--vendor/k8s.io/kubernetes/pkg/volume/util/atomic_writer.go462
-rw-r--r--vendor/k8s.io/kubernetes/pkg/volume/util/device_util.go31
-rw-r--r--vendor/k8s.io/kubernetes/pkg/volume/util/device_util_linux.go61
-rw-r--r--vendor/k8s.io/kubernetes/pkg/volume/util/device_util_unsupported.go24
-rw-r--r--vendor/k8s.io/kubernetes/pkg/volume/util/doc.go18
-rw-r--r--vendor/k8s.io/kubernetes/pkg/volume/util/fs/fs.go (renamed from vendor/k8s.io/kubernetes/pkg/volume/util/fs.go)9
-rw-r--r--vendor/k8s.io/kubernetes/pkg/volume/util/fs/fs_unsupported.go (renamed from vendor/k8s.io/kubernetes/pkg/volume/util/fs_unsupported.go)2
-rw-r--r--vendor/k8s.io/kubernetes/pkg/volume/util/io_util.go47
-rw-r--r--vendor/k8s.io/kubernetes/pkg/volume/util/recyclerclient/recycler_client.go252
-rw-r--r--vendor/k8s.io/kubernetes/pkg/volume/util/util.go213
-rw-r--r--vendor/k8s.io/kubernetes/pkg/volume/volume.go51
-rw-r--r--vendor/k8s.io/kubernetes/pkg/volume/volume_linux.go14
-rw-r--r--vendor/k8s.io/kubernetes/pkg/volume/volume_unsupported.go4
17 files changed, 552 insertions, 1346 deletions
diff --git a/vendor/k8s.io/kubernetes/pkg/volume/metrics_du.go b/vendor/k8s.io/kubernetes/pkg/volume/metrics_du.go
index 19a29cbbc..88a985d5a 100644
--- a/vendor/k8s.io/kubernetes/pkg/volume/metrics_du.go
+++ b/vendor/k8s.io/kubernetes/pkg/volume/metrics_du.go
@@ -19,7 +19,7 @@ package volume
import (
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
- "k8s.io/kubernetes/pkg/volume/util"
+ "k8s.io/kubernetes/pkg/volume/util/fs"
)
var _ MetricsProvider = &metricsDu{}
@@ -66,7 +66,7 @@ func (md *metricsDu) GetMetrics() (*Metrics, error) {
// runDu executes the "du" command and writes the results to metrics.Used
func (md *metricsDu) runDu(metrics *Metrics) error {
- used, err := util.Du(md.path)
+ used, err := fs.Du(md.path)
if err != nil {
return err
}
@@ -76,7 +76,7 @@ func (md *metricsDu) runDu(metrics *Metrics) error {
// runFind executes the "find" command and writes the results to metrics.InodesUsed
func (md *metricsDu) runFind(metrics *Metrics) error {
- inodesUsed, err := util.Find(md.path)
+ inodesUsed, err := fs.Find(md.path)
if err != nil {
return err
}
@@ -87,7 +87,7 @@ func (md *metricsDu) runFind(metrics *Metrics) error {
// getFsInfo writes metrics.Capacity and metrics.Available from the filesystem
// info
func (md *metricsDu) getFsInfo(metrics *Metrics) error {
- available, capacity, _, inodes, inodesFree, _, err := util.FsInfo(md.path)
+ available, capacity, _, inodes, inodesFree, _, err := fs.FsInfo(md.path)
if err != nil {
return NewFsInfoFailedError(err)
}
diff --git a/vendor/k8s.io/kubernetes/pkg/volume/metrics_statfs.go b/vendor/k8s.io/kubernetes/pkg/volume/metrics_statfs.go
index ede4f6ef8..66f99e30a 100644
--- a/vendor/k8s.io/kubernetes/pkg/volume/metrics_statfs.go
+++ b/vendor/k8s.io/kubernetes/pkg/volume/metrics_statfs.go
@@ -19,7 +19,7 @@ package volume
import (
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
- "k8s.io/kubernetes/pkg/volume/util"
+ "k8s.io/kubernetes/pkg/volume/util/fs"
)
var _ MetricsProvider = &metricsStatFS{}
@@ -55,7 +55,7 @@ func (md *metricsStatFS) GetMetrics() (*Metrics, error) {
// getFsInfo writes metrics.Capacity, metrics.Used and metrics.Available from the filesystem info
func (md *metricsStatFS) getFsInfo(metrics *Metrics) error {
- available, capacity, usage, inodes, inodesFree, inodesUsed, err := util.FsInfo(md.path)
+ available, capacity, usage, inodes, inodesFree, inodesUsed, err := fs.FsInfo(md.path)
if err != nil {
return NewFsInfoFailedError(err)
}
diff --git a/vendor/k8s.io/kubernetes/pkg/volume/plugins.go b/vendor/k8s.io/kubernetes/pkg/volume/plugins.go
index 41721d1ee..ec4ec5791 100644
--- a/vendor/k8s.io/kubernetes/pkg/volume/plugins.go
+++ b/vendor/k8s.io/kubernetes/pkg/volume/plugins.go
@@ -23,15 +23,25 @@ import (
"sync"
"github.com/golang/glog"
+ "k8s.io/api/core/v1"
+ "k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
utilerrors "k8s.io/apimachinery/pkg/util/errors"
"k8s.io/apimachinery/pkg/util/validation"
- "k8s.io/kubernetes/pkg/api/v1"
- "k8s.io/kubernetes/pkg/client/clientset_generated/clientset"
+ clientset "k8s.io/client-go/kubernetes"
"k8s.io/kubernetes/pkg/cloudprovider"
"k8s.io/kubernetes/pkg/util/io"
"k8s.io/kubernetes/pkg/util/mount"
+ "k8s.io/kubernetes/pkg/volume/util/recyclerclient"
+)
+
+const (
+ // Common parameter which can be specified in StorageClass to specify the desired FSType
+ // Provisioners SHOULD implement support for this if they are block device based
+ // Must be a filesystem type supported by the host operating system.
+ // Ex. "ext4", "xfs", "ntfs". Default value depends on the provisioner
+ VolumeParameterFSType = "fstype"
)
// VolumeOptions contains option information about a volume.
@@ -42,6 +52,8 @@ type VolumeOptions struct {
// Reclamation policy for a persistent volume
PersistentVolumeReclaimPolicy v1.PersistentVolumeReclaimPolicy
+ // Mount options for a persistent volume
+ MountOptions []string
// Suggested PV.Name of the PersistentVolume to provision.
// This is a generated name guaranteed to be unique in Kubernetes cluster.
// If you choose not to use it as volume name, ensure uniqueness by either
@@ -60,6 +72,17 @@ type VolumeOptions struct {
Parameters map[string]string
}
+type DynamicPluginProber interface {
+ Init() error
+
+ // If an update has occurred since the last probe, updated = true
+ // and the list of probed plugins is returned.
+ // Otherwise, update = false and probedPlugins = nil.
+ //
+ // If an error occurs, updated and probedPlugins are undefined.
+ Probe() (updated bool, probedPlugins []VolumePlugin, err error)
+}
+
// VolumePlugin is an interface to volume plugins that can be used on a
// kubernetes node (e.g. by kubelet) to instantiate and manage volumes.
type VolumePlugin interface {
@@ -139,7 +162,7 @@ type RecyclableVolumePlugin interface {
// Recycle will use the provided recorder to write any events that might be
// interesting to user. It's expected that caller will pass these events to
// the PV being recycled.
- Recycle(pvName string, spec *Spec, eventRecorder RecycleEventRecorder) error
+ Recycle(pvName string, spec *Spec, eventRecorder recyclerclient.RecycleEventRecorder) error
}
// DeletableVolumePlugin is an extended interface of VolumePlugin and is used
@@ -178,6 +201,34 @@ type AttachableVolumePlugin interface {
GetDeviceMountRefs(deviceMountPath string) ([]string, error)
}
+// ExpandableVolumePlugin is an extended interface of VolumePlugin and is used for volumes that can be
+// expanded
+type ExpandableVolumePlugin interface {
+ VolumePlugin
+ ExpandVolumeDevice(spec *Spec, newSize resource.Quantity, oldSize resource.Quantity) (resource.Quantity, error)
+ RequiresFSResize() bool
+}
+
+// BlockVolumePlugin is an extend interface of VolumePlugin and is used for block volumes support.
+type BlockVolumePlugin interface {
+ VolumePlugin
+ // NewBlockVolumeMapper creates a new volume.BlockVolumeMapper from an API specification.
+ // Ownership of the spec pointer in *not* transferred.
+ // - spec: The v1.Volume spec
+ // - pod: The enclosing pod
+ NewBlockVolumeMapper(spec *Spec, podRef *v1.Pod, opts VolumeOptions) (BlockVolumeMapper, error)
+ // NewBlockVolumeUnmapper creates a new volume.BlockVolumeUnmapper from recoverable state.
+ // - name: The volume name, as per the v1.Volume spec.
+ // - podUID: The UID of the enclosing pod
+ NewBlockVolumeUnmapper(name string, podUID types.UID) (BlockVolumeUnmapper, error)
+ // ConstructBlockVolumeSpec constructs a volume spec based on the given
+ // podUID, volume name and a pod device map path.
+ // The spec may have incomplete information due to limited information
+ // from input. This function is used by volume manager to reconstruct
+ // volume spec by reading the volume directories from disk.
+ ConstructBlockVolumeSpec(podUID types.UID, volumeName, mountPath string) (*Spec, error)
+}
+
// VolumeHost is an interface that plugins can use to access the kubelet.
type VolumeHost interface {
// GetPluginDir returns the absolute path to a directory under which
@@ -186,6 +237,11 @@ type VolumeHost interface {
// GetPodPluginDir().
GetPluginDir(pluginName string) string
+ // GetVolumeDevicePluginDir returns the absolute path to a directory
+ // under which a given plugin may store data.
+ // ex. plugins/kubernetes.io/{PluginName}/{DefaultKubeletVolumeDevicesDirName}/{volumePluginDependentPath}/
+ GetVolumeDevicePluginDir(pluginName string) string
+
// GetPodVolumeDir returns the absolute path a directory which
// represents the named volume under the named plugin for the given
// pod. If the specified pod does not exist, the result of this call
@@ -198,6 +254,13 @@ type VolumeHost interface {
// directory might not actually exist on disk yet.
GetPodPluginDir(podUID types.UID, pluginName string) string
+ // GetPodVolumeDeviceDir returns the absolute path a directory which
+ // represents the named plugin for the given pod.
+ // If the specified pod does not exist, the result of this call
+ // might not exist.
+ // ex. pods/{podUid}/{DefaultKubeletVolumeDevicesDirName}/{escapeQualifiedPluginName}/
+ GetPodVolumeDeviceDir(podUID types.UID, pluginName string) string
+
// GetKubeClient returns a client interface
GetKubeClient() clientset.Interface
@@ -216,7 +279,7 @@ type VolumeHost interface {
GetCloudProvider() cloudprovider.Interface
// Get mounter interface.
- GetMounter() mount.Interface
+ GetMounter(pluginName string) mount.Interface
// Get writer interface for writing data to disk.
GetWriter() io.Writer
@@ -236,15 +299,23 @@ type VolumeHost interface {
// Returns a function that returns a configmap.
GetConfigMapFunc() func(namespace, name string) (*v1.ConfigMap, error)
+ // Returns an interface that should be used to execute any utilities in volume plugins
+ GetExec(pluginName string) mount.Exec
+
// Returns the labels on the node
GetNodeLabels() (map[string]string, error)
+
+ // Returns the name of the node
+ GetNodeName() types.NodeName
}
// VolumePluginMgr tracks registered plugins.
type VolumePluginMgr struct {
- mutex sync.Mutex
- plugins map[string]VolumePlugin
- Host VolumeHost
+ mutex sync.Mutex
+ plugins map[string]VolumePlugin
+ prober DynamicPluginProber
+ probedPlugins []VolumePlugin
+ Host VolumeHost
}
// Spec is an internal representation of a volume. All API volume types translate to Spec.
@@ -339,11 +410,24 @@ func NewSpecFromPersistentVolume(pv *v1.PersistentVolume, readOnly bool) *Spec {
// InitPlugins initializes each plugin. All plugins must have unique names.
// This must be called exactly once before any New* methods are called on any
// plugins.
-func (pm *VolumePluginMgr) InitPlugins(plugins []VolumePlugin, host VolumeHost) error {
+func (pm *VolumePluginMgr) InitPlugins(plugins []VolumePlugin, prober DynamicPluginProber, host VolumeHost) error {
pm.mutex.Lock()
defer pm.mutex.Unlock()
pm.Host = host
+
+ if prober == nil {
+ // Use a dummy prober to prevent nil deference.
+ pm.prober = &dummyPluginProber{}
+ } else {
+ pm.prober = prober
+ }
+ if err := pm.prober.Init(); err != nil {
+ // Prober init failure should not affect the initialization of other plugins.
+ glog.Errorf("Error initializing dynamic plugin prober: %s", err)
+ pm.prober = &dummyPluginProber{}
+ }
+
if pm.plugins == nil {
pm.plugins = map[string]VolumePlugin{}
}
@@ -362,7 +446,7 @@ func (pm *VolumePluginMgr) InitPlugins(plugins []VolumePlugin, host VolumeHost)
}
err := plugin.Init(host)
if err != nil {
- glog.Errorf("Failed to load volume plugin %s, error: %s", plugin, err.Error())
+ glog.Errorf("Failed to load volume plugin %s, error: %s", name, err.Error())
allErrs = append(allErrs, err)
continue
}
@@ -372,6 +456,21 @@ func (pm *VolumePluginMgr) InitPlugins(plugins []VolumePlugin, host VolumeHost)
return utilerrors.NewAggregate(allErrs)
}
+func (pm *VolumePluginMgr) initProbedPlugin(probedPlugin VolumePlugin) error {
+ name := probedPlugin.GetPluginName()
+ if errs := validation.IsQualifiedName(name); len(errs) != 0 {
+ return fmt.Errorf("volume plugin has invalid name: %q: %s", name, strings.Join(errs, ";"))
+ }
+
+ err := probedPlugin.Init(pm.Host)
+ if err != nil {
+ return fmt.Errorf("Failed to load volume plugin %s, error: %s", name, err.Error())
+ }
+
+ glog.V(1).Infof("Loaded volume plugin %q", name)
+ return nil
+}
+
// FindPluginBySpec looks for a plugin that can support a given volume
// specification. If no plugins can support or more than one plugin can
// support it, return error.
@@ -379,19 +478,34 @@ func (pm *VolumePluginMgr) FindPluginBySpec(spec *Spec) (VolumePlugin, error) {
pm.mutex.Lock()
defer pm.mutex.Unlock()
- matches := []string{}
+ if spec == nil {
+ return nil, fmt.Errorf("Could not find plugin because volume spec is nil")
+ }
+
+ matchedPluginNames := []string{}
+ matches := []VolumePlugin{}
for k, v := range pm.plugins {
if v.CanSupport(spec) {
- matches = append(matches, k)
+ matchedPluginNames = append(matchedPluginNames, k)
+ matches = append(matches, v)
+ }
+ }
+
+ pm.refreshProbedPlugins()
+ for _, plugin := range pm.probedPlugins {
+ if plugin.CanSupport(spec) {
+ matchedPluginNames = append(matchedPluginNames, plugin.GetPluginName())
+ matches = append(matches, plugin)
}
}
+
if len(matches) == 0 {
return nil, fmt.Errorf("no volume plugin matched")
}
if len(matches) > 1 {
- return nil, fmt.Errorf("multiple volume plugins matched: %s", strings.Join(matches, ","))
+ return nil, fmt.Errorf("multiple volume plugins matched: %s", strings.Join(matchedPluginNames, ","))
}
- return pm.plugins[matches[0]], nil
+ return matches[0], nil
}
// FindPluginByName fetches a plugin by name or by legacy name. If no plugin
@@ -401,19 +515,52 @@ func (pm *VolumePluginMgr) FindPluginByName(name string) (VolumePlugin, error) {
defer pm.mutex.Unlock()
// Once we can get rid of legacy names we can reduce this to a map lookup.
- matches := []string{}
+ matchedPluginNames := []string{}
+ matches := []VolumePlugin{}
for k, v := range pm.plugins {
if v.GetPluginName() == name {
- matches = append(matches, k)
+ matchedPluginNames = append(matchedPluginNames, k)
+ matches = append(matches, v)
}
}
+
+ pm.refreshProbedPlugins()
+ for _, plugin := range pm.probedPlugins {
+ if plugin.GetPluginName() == name {
+ matchedPluginNames = append(matchedPluginNames, plugin.GetPluginName())
+ matches = append(matches, plugin)
+ }
+ }
+
if len(matches) == 0 {
return nil, fmt.Errorf("no volume plugin matched")
}
if len(matches) > 1 {
- return nil, fmt.Errorf("multiple volume plugins matched: %s", strings.Join(matches, ","))
+ return nil, fmt.Errorf("multiple volume plugins matched: %s", strings.Join(matchedPluginNames, ","))
+ }
+ return matches[0], nil
+}
+
+// Check if probedPlugin cache update is required.
+// If it is, initialize all probed plugins and replace the cache with them.
+func (pm *VolumePluginMgr) refreshProbedPlugins() {
+ updated, plugins, err := pm.prober.Probe()
+ if err != nil {
+ glog.Errorf("Error dynamically probing plugins: %s", err)
+ return // Use cached plugins upon failure.
+ }
+
+ if updated {
+ pm.probedPlugins = []VolumePlugin{}
+ for _, plugin := range plugins {
+ if err := pm.initProbedPlugin(plugin); err != nil {
+ glog.Errorf("Error initializing dynamically probed plugin %s; error: %s",
+ plugin.GetPluginName(), err)
+ continue
+ }
+ pm.probedPlugins = append(pm.probedPlugins, plugin)
+ }
}
- return pm.plugins[matches[0]], nil
}
// FindPersistentPluginBySpec looks for a persistent volume plugin that can
@@ -508,7 +655,7 @@ func (pm *VolumePluginMgr) FindCreatablePluginBySpec(spec *Spec) (ProvisionableV
return nil, fmt.Errorf("no creatable volume plugin matched")
}
-// FindAttachablePluginBySpec fetches a persistent volume plugin by name.
+// FindAttachablePluginBySpec fetches a persistent volume plugin by spec.
// Unlike the other "FindPlugin" methods, this does not return error if no
// plugin is found. All volumes require a mounter and unmounter, but not
// every volume will have an attacher/detacher.
@@ -538,6 +685,58 @@ func (pm *VolumePluginMgr) FindAttachablePluginByName(name string) (AttachableVo
return nil, nil
}
+// FindExpandablePluginBySpec fetches a persistent volume plugin by spec.
+func (pm *VolumePluginMgr) FindExpandablePluginBySpec(spec *Spec) (ExpandableVolumePlugin, error) {
+ volumePlugin, err := pm.FindPluginBySpec(spec)
+ if err != nil {
+ return nil, err
+ }
+
+ if expandableVolumePlugin, ok := volumePlugin.(ExpandableVolumePlugin); ok {
+ return expandableVolumePlugin, nil
+ }
+ return nil, nil
+}
+
+// FindExpandablePluginBySpec fetches a persistent volume plugin by name.
+func (pm *VolumePluginMgr) FindExpandablePluginByName(name string) (ExpandableVolumePlugin, error) {
+ volumePlugin, err := pm.FindPluginByName(name)
+ if err != nil {
+ return nil, err
+ }
+
+ if expandableVolumePlugin, ok := volumePlugin.(ExpandableVolumePlugin); ok {
+ return expandableVolumePlugin, nil
+ }
+ return nil, nil
+}
+
+// FindMapperPluginBySpec fetches a block volume plugin by spec.
+func (pm *VolumePluginMgr) FindMapperPluginBySpec(spec *Spec) (BlockVolumePlugin, error) {
+ volumePlugin, err := pm.FindPluginBySpec(spec)
+ if err != nil {
+ return nil, err
+ }
+
+ if blockVolumePlugin, ok := volumePlugin.(BlockVolumePlugin); ok {
+ return blockVolumePlugin, nil
+ }
+ return nil, nil
+}
+
+// FindMapperPluginByName fetches a block volume plugin by name.
+func (pm *VolumePluginMgr) FindMapperPluginByName(name string) (BlockVolumePlugin, error) {
+ volumePlugin, err := pm.FindPluginByName(name)
+ if err != nil {
+ return nil, err
+ }
+
+ if blockVolumePlugin, ok := volumePlugin.(BlockVolumePlugin); ok {
+ return blockVolumePlugin, nil
+ }
+ return nil, nil
+}
+
// NewPersistentVolumeRecyclerPodTemplate creates a template for a recycler
// pod. By default, a recycler pod simply runs "rm -rf" on a volume and tests
// for emptiness. Most attributes of the template will be correct for most
@@ -574,7 +773,7 @@ func NewPersistentVolumeRecyclerPodTemplate() *v1.Pod {
Containers: []v1.Container{
{
Name: "pv-recycler",
- Image: "gcr.io/google_containers/busybox",
+ Image: "busybox:1.27",
Command: []string{"/bin/sh"},
Args: []string{"-c", "test -e /scrub && rm -rf /scrub/..?* /scrub/.[!.]* /scrub/* && test -z \"$(ls -A /scrub)\" || exit 1"},
VolumeMounts: []v1.VolumeMount{
@@ -601,3 +800,8 @@ func ValidateRecyclerPodTemplate(pod *v1.Pod) error {
}
return nil
}
+
+type dummyPluginProber struct{}
+
+func (*dummyPluginProber) Init() error { return nil }
+func (*dummyPluginProber) Probe() (bool, []VolumePlugin, error) { return false, nil, nil }
diff --git a/vendor/k8s.io/kubernetes/pkg/volume/util.go b/vendor/k8s.io/kubernetes/pkg/volume/util.go
deleted file mode 100644
index 2e5610362..000000000
--- a/vendor/k8s.io/kubernetes/pkg/volume/util.go
+++ /dev/null
@@ -1,456 +0,0 @@
-/*
-Copyright 2014 The Kubernetes Authors.
-
-Licensed under the Apache License, Version 2.0 (the "License");
-you may not use this file except in compliance with the License.
-You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License.
-*/
-
-package volume
-
-import (
- "fmt"
- "reflect"
-
- metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
- "k8s.io/apimachinery/pkg/fields"
- "k8s.io/apimachinery/pkg/watch"
- "k8s.io/kubernetes/pkg/api/v1"
- "k8s.io/kubernetes/pkg/client/clientset_generated/clientset"
-
- "hash/fnv"
- "math/rand"
- "strconv"
- "strings"
-
- "github.com/golang/glog"
- "k8s.io/apimachinery/pkg/api/errors"
- "k8s.io/apimachinery/pkg/api/resource"
- "k8s.io/apimachinery/pkg/types"
- "k8s.io/apimachinery/pkg/util/sets"
- volutil "k8s.io/kubernetes/pkg/volume/util"
-)
-
-type RecycleEventRecorder func(eventtype, message string)
-
-// RecycleVolumeByWatchingPodUntilCompletion is intended for use with volume
-// Recyclers. This function will save the given Pod to the API and watch it
-// until it completes, fails, or the pod's ActiveDeadlineSeconds is exceeded,
-// whichever comes first. An attempt to delete a recycler pod is always
-// attempted before returning.
-//
-// In case there is a pod with the same namespace+name already running, this
-// function assumes it's an older instance of the recycler pod and watches
-// this old pod instead of starting a new one.
-//
-// pod - the pod designed by a volume plugin to recycle the volume. pod.Name
-// will be overwritten with unique name based on PV.Name.
-// client - kube client for API operations.
-func RecycleVolumeByWatchingPodUntilCompletion(pvName string, pod *v1.Pod, kubeClient clientset.Interface, recorder RecycleEventRecorder) error {
- return internalRecycleVolumeByWatchingPodUntilCompletion(pvName, pod, newRecyclerClient(kubeClient, recorder))
-}
-
-// same as above func comments, except 'recyclerClient' is a narrower pod API
-// interface to ease testing
-func internalRecycleVolumeByWatchingPodUntilCompletion(pvName string, pod *v1.Pod, recyclerClient recyclerClient) error {
- glog.V(5).Infof("creating recycler pod for volume %s\n", pod.Name)
-
- // Generate unique name for the recycler pod - we need to get "already
- // exists" error when a previous controller has already started recycling
- // the volume. Here we assume that pv.Name is already unique.
- pod.Name = "recycler-for-" + pvName
- pod.GenerateName = ""
-
- stopChannel := make(chan struct{})
- defer close(stopChannel)
- podCh, err := recyclerClient.WatchPod(pod.Name, pod.Namespace, stopChannel)
- if err != nil {
- glog.V(4).Infof("cannot start watcher for pod %s/%s: %v", pod.Namespace, pod.Name, err)
- return err
- }
-
- // Start the pod
- _, err = recyclerClient.CreatePod(pod)
- if err != nil {
- if errors.IsAlreadyExists(err) {
- glog.V(5).Infof("old recycler pod %q found for volume", pod.Name)
- } else {
- return fmt.Errorf("unexpected error creating recycler pod: %+v\n", err)
- }
- }
- defer func(pod *v1.Pod) {
- glog.V(2).Infof("deleting recycler pod %s/%s", pod.Namespace, pod.Name)
- if err := recyclerClient.DeletePod(pod.Name, pod.Namespace); err != nil {
- glog.Errorf("failed to delete recycler pod %s/%s: %v", pod.Namespace, pod.Name, err)
- }
- }(pod)
-
- // Now only the old pod or the new pod run. Watch it until it finishes
- // and send all events on the pod to the PV
- for {
- event, ok := <-podCh
- if !ok {
- return fmt.Errorf("recycler pod %q watch channel had been closed", pod.Name)
- }
- switch event.Object.(type) {
- case *v1.Pod:
- // POD changed
- pod := event.Object.(*v1.Pod)
- glog.V(4).Infof("recycler pod update received: %s %s/%s %s", event.Type, pod.Namespace, pod.Name, pod.Status.Phase)
- switch event.Type {
- case watch.Added, watch.Modified:
- if pod.Status.Phase == v1.PodSucceeded {
- // Recycle succeeded.
- return nil
- }
- if pod.Status.Phase == v1.PodFailed {
- if pod.Status.Message != "" {
- return fmt.Errorf(pod.Status.Message)
- } else {
- return fmt.Errorf("pod failed, pod.Status.Message unknown.")
- }
- }
-
- case watch.Deleted:
- return fmt.Errorf("recycler pod was deleted")
-
- case watch.Error:
- return fmt.Errorf("recycler pod watcher failed")
- }
-
- case *v1.Event:
- // Event received
- podEvent := event.Object.(*v1.Event)
- glog.V(4).Infof("recycler event received: %s %s/%s %s/%s %s", event.Type, podEvent.Namespace, podEvent.Name, podEvent.InvolvedObject.Namespace, podEvent.InvolvedObject.Name, podEvent.Message)
- if event.Type == watch.Added {
- recyclerClient.Event(podEvent.Type, podEvent.Message)
- }
- }
- }
-}
-
-// recyclerClient abstracts access to a Pod by providing a narrower interface.
-// This makes it easier to mock a client for testing.
-type recyclerClient interface {
- CreatePod(pod *v1.Pod) (*v1.Pod, error)
- GetPod(name, namespace string) (*v1.Pod, error)
- DeletePod(name, namespace string) error
- // WatchPod returns a ListWatch for watching a pod. The stopChannel is used
- // to close the reflector backing the watch. The caller is responsible for
- // derring a close on the channel to stop the reflector.
- WatchPod(name, namespace string, stopChannel chan struct{}) (<-chan watch.Event, error)
- // Event sends an event to the volume that is being recycled.
- Event(eventtype, message string)
-}
-
-func newRecyclerClient(client clientset.Interface, recorder RecycleEventRecorder) recyclerClient {
- return &realRecyclerClient{
- client,
- recorder,
- }
-}
-
-type realRecyclerClient struct {
- client clientset.Interface
- recorder RecycleEventRecorder
-}
-
-func (c *realRecyclerClient) CreatePod(pod *v1.Pod) (*v1.Pod, error) {
- return c.client.Core().Pods(pod.Namespace).Create(pod)
-}
-
-func (c *realRecyclerClient) GetPod(name, namespace string) (*v1.Pod, error) {
- return c.client.Core().Pods(namespace).Get(name, metav1.GetOptions{})
-}
-
-func (c *realRecyclerClient) DeletePod(name, namespace string) error {
- return c.client.Core().Pods(namespace).Delete(name, nil)
-}
-
-func (c *realRecyclerClient) Event(eventtype, message string) {
- c.recorder(eventtype, message)
-}
-
-func (c *realRecyclerClient) WatchPod(name, namespace string, stopChannel chan struct{}) (<-chan watch.Event, error) {
- podSelector, _ := fields.ParseSelector("metadata.name=" + name)
- options := metav1.ListOptions{
- FieldSelector: podSelector.String(),
- Watch: true,
- }
-
- podWatch, err := c.client.Core().Pods(namespace).Watch(options)
- if err != nil {
- return nil, err
- }
-
- eventSelector, _ := fields.ParseSelector("involvedObject.name=" + name)
- eventWatch, err := c.client.Core().Events(namespace).Watch(metav1.ListOptions{
- FieldSelector: eventSelector.String(),
- Watch: true,
- })
- if err != nil {
- podWatch.Stop()
- return nil, err
- }
-
- eventCh := make(chan watch.Event, 30)
-
- go func() {
- defer eventWatch.Stop()
- defer podWatch.Stop()
- defer close(eventCh)
- var podWatchChannelClosed bool
- var eventWatchChannelClosed bool
- for {
- select {
- case _ = <-stopChannel:
- return
-
- case podEvent, ok := <-podWatch.ResultChan():
- if !ok {
- podWatchChannelClosed = true
- } else {
- eventCh <- podEvent
- }
- case eventEvent, ok := <-eventWatch.ResultChan():
- if !ok {
- eventWatchChannelClosed = true
- } else {
- eventCh <- eventEvent
- }
- }
- if podWatchChannelClosed && eventWatchChannelClosed {
- break
- }
- }
- }()
-
- return eventCh, nil
-}
-
-// CalculateTimeoutForVolume calculates time for a Recycler pod to complete a
-// recycle operation. The calculation and return value is either the
-// minimumTimeout or the timeoutIncrement per Gi of storage size, whichever is
-// greater.
-func CalculateTimeoutForVolume(minimumTimeout, timeoutIncrement int, pv *v1.PersistentVolume) int64 {
- giQty := resource.MustParse("1Gi")
- pvQty := pv.Spec.Capacity[v1.ResourceStorage]
- giSize := giQty.Value()
- pvSize := pvQty.Value()
- timeout := (pvSize / giSize) * int64(timeoutIncrement)
- if timeout < int64(minimumTimeout) {
- return int64(minimumTimeout)
- } else {
- return timeout
- }
-}
-
-// RoundUpSize calculates how many allocation units are needed to accommodate
-// a volume of given size. E.g. when user wants 1500MiB volume, while AWS EBS
-// allocates volumes in gibibyte-sized chunks,
-// RoundUpSize(1500 * 1024*1024, 1024*1024*1024) returns '2'
-// (2 GiB is the smallest allocatable volume that can hold 1500MiB)
-func RoundUpSize(volumeSizeBytes int64, allocationUnitBytes int64) int64 {
- return (volumeSizeBytes + allocationUnitBytes - 1) / allocationUnitBytes
-}
-
-// GenerateVolumeName returns a PV name with clusterName prefix. The function
-// should be used to generate a name of GCE PD or Cinder volume. It basically
-// adds "<clusterName>-dynamic-" before the PV name, making sure the resulting
-// string fits given length and cuts "dynamic" if not.
-func GenerateVolumeName(clusterName, pvName string, maxLength int) string {
- prefix := clusterName + "-dynamic"
- pvLen := len(pvName)
-
- // cut the "<clusterName>-dynamic" to fit full pvName into maxLength
- // +1 for the '-' dash
- if pvLen+1+len(prefix) > maxLength {
- prefix = prefix[:maxLength-pvLen-1]
- }
- return prefix + "-" + pvName
-}
-
-// Check if the path from the mounter is empty.
-func GetPath(mounter Mounter) (string, error) {
- path := mounter.GetPath()
- if path == "" {
- return "", fmt.Errorf("Path is empty %s", reflect.TypeOf(mounter).String())
- }
- return path, nil
-}
-
-// ChooseZone implements our heuristics for choosing a zone for volume creation based on the volume name
-// Volumes are generally round-robin-ed across all active zones, using the hash of the PVC Name.
-// However, if the PVCName ends with `-<integer>`, we will hash the prefix, and then add the integer to the hash.
-// This means that a StatefulSet's volumes (`claimname-statefulsetname-id`) will spread across available zones,
-// assuming the id values are consecutive.
-func ChooseZoneForVolume(zones sets.String, pvcName string) string {
- // We create the volume in a zone determined by the name
- // Eventually the scheduler will coordinate placement into an available zone
- var hash uint32
- var index uint32
-
- if pvcName == "" {
- // We should always be called with a name; this shouldn't happen
- glog.Warningf("No name defined during volume create; choosing random zone")
-
- hash = rand.Uint32()
- } else {
- hashString := pvcName
-
- // Heuristic to make sure that volumes in a StatefulSet are spread across zones
- // StatefulSet PVCs are (currently) named ClaimName-StatefulSetName-Id,
- // where Id is an integer index.
- // Note though that if a StatefulSet pod has multiple claims, we need them to be
- // in the same zone, because otherwise the pod will be unable to mount both volumes,
- // and will be unschedulable. So we hash _only_ the "StatefulSetName" portion when
- // it looks like `ClaimName-StatefulSetName-Id`.
- // We continue to round-robin volume names that look like `Name-Id` also; this is a useful
- // feature for users that are creating statefulset-like functionality without using statefulsets.
- lastDash := strings.LastIndexByte(pvcName, '-')
- if lastDash != -1 {
- statefulsetIDString := pvcName[lastDash+1:]
- statefulsetID, err := strconv.ParseUint(statefulsetIDString, 10, 32)
- if err == nil {
- // Offset by the statefulsetID, so we round-robin across zones
- index = uint32(statefulsetID)
- // We still hash the volume name, but only the prefix
- hashString = pvcName[:lastDash]
-
- // In the special case where it looks like `ClaimName-StatefulSetName-Id`,
- // hash only the StatefulSetName, so that different claims on the same StatefulSet
- // member end up in the same zone.
- // Note that StatefulSetName (and ClaimName) might themselves both have dashes.
- // We actually just take the portion after the final - of ClaimName-StatefulSetName.
- // For our purposes it doesn't much matter (just suboptimal spreading).
- lastDash := strings.LastIndexByte(hashString, '-')
- if lastDash != -1 {
- hashString = hashString[lastDash+1:]
- }
-
- glog.V(2).Infof("Detected StatefulSet-style volume name %q; index=%d", pvcName, index)
- }
- }
-
- // We hash the (base) volume name, so we don't bias towards the first N zones
- h := fnv.New32()
- h.Write([]byte(hashString))
- hash = h.Sum32()
- }
-
- // Zones.List returns zones in a consistent order (sorted)
- // We do have a potential failure case where volumes will not be properly spread,
- // if the set of zones changes during StatefulSet volume creation. However, this is
- // probably relatively unlikely because we expect the set of zones to be essentially
- // static for clusters.
- // Hopefully we can address this problem if/when we do full scheduler integration of
- // PVC placement (which could also e.g. avoid putting volumes in overloaded or
- // unhealthy zones)
- zoneSlice := zones.List()
- zone := zoneSlice[(hash+index)%uint32(len(zoneSlice))]
-
- glog.V(2).Infof("Creating volume for PVC %q; chose zone=%q from zones=%q", pvcName, zone, zoneSlice)
- return zone
-}
-
-// UnmountViaEmptyDir delegates the tear down operation for secret, configmap, git_repo and downwardapi
-// to empty_dir
-func UnmountViaEmptyDir(dir string, host VolumeHost, volName string, volSpec Spec, podUID types.UID) error {
- glog.V(3).Infof("Tearing down volume %v for pod %v at %v", volName, podUID, dir)
-
- if pathExists, pathErr := volutil.PathExists(dir); pathErr != nil {
- return fmt.Errorf("Error checking if path exists: %v", pathErr)
- } else if !pathExists {
- glog.Warningf("Warning: Unmount skipped because path does not exist: %v", dir)
- return nil
- }
-
- // Wrap EmptyDir, let it do the teardown.
- wrapped, err := host.NewWrapperUnmounter(volName, volSpec, podUID)
- if err != nil {
- return err
- }
- return wrapped.TearDownAt(dir)
-}
-
-// MountOptionFromSpec extracts and joins mount options from volume spec with supplied options
-func MountOptionFromSpec(spec *Spec, options ...string) []string {
- pv := spec.PersistentVolume
-
- if pv != nil {
- if mo, ok := pv.Annotations[v1.MountOptionAnnotation]; ok {
- moList := strings.Split(mo, ",")
- return JoinMountOptions(moList, options)
- }
-
- }
- return options
-}
-
-// JoinMountOptions joins mount options eliminating duplicates
-func JoinMountOptions(userOptions []string, systemOptions []string) []string {
- allMountOptions := sets.NewString()
-
- for _, mountOption := range userOptions {
- if len(mountOption) > 0 {
- allMountOptions.Insert(mountOption)
- }
- }
-
- for _, mountOption := range systemOptions {
- allMountOptions.Insert(mountOption)
- }
- return allMountOptions.UnsortedList()
-}
-
-// ZonesToSet converts a string containing a comma separated list of zones to set
-func ZonesToSet(zonesString string) (sets.String, error) {
- zonesSlice := strings.Split(zonesString, ",")
- zonesSet := make(sets.String)
- for _, zone := range zonesSlice {
- trimmedZone := strings.TrimSpace(zone)
- if trimmedZone == "" {
- return make(sets.String), fmt.Errorf("comma separated list of zones (%q) must not contain an empty zone", zonesString)
- }
- zonesSet.Insert(trimmedZone)
- }
- return zonesSet, nil
-}
-
-// ValidateZone returns:
-// - an error in case zone is an empty string or contains only any combination of spaces and tab characters
-// - nil otherwise
-func ValidateZone(zone string) error {
- if strings.TrimSpace(zone) == "" {
- return fmt.Errorf("the provided %q zone is not valid, it's an empty string or contains only spaces and tab characters", zone)
- }
- return nil
-}
-
-// AccessModesContains returns whether the requested mode is contained by modes
-func AccessModesContains(modes []v1.PersistentVolumeAccessMode, mode v1.PersistentVolumeAccessMode) bool {
- for _, m := range modes {
- if m == mode {
- return true
- }
- }
- return false
-}
-
-// AccessModesContainedInAll returns whether all of the requested modes are contained by modes
-func AccessModesContainedInAll(indexedModes []v1.PersistentVolumeAccessMode, requestedModes []v1.PersistentVolumeAccessMode) bool {
- for _, mode := range requestedModes {
- if !AccessModesContains(indexedModes, mode) {
- return false
- }
- }
- return true
-}
diff --git a/vendor/k8s.io/kubernetes/pkg/volume/util/atomic_writer.go b/vendor/k8s.io/kubernetes/pkg/volume/util/atomic_writer.go
deleted file mode 100644
index 5eef55b45..000000000
--- a/vendor/k8s.io/kubernetes/pkg/volume/util/atomic_writer.go
+++ /dev/null
@@ -1,462 +0,0 @@
-/*
-Copyright 2016 The Kubernetes Authors.
-
-Licensed under the Apache License, Version 2.0 (the "License");
-you may not use this file except in compliance with the License.
-You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License.
-*/
-
-package util
-
-import (
- "bytes"
- "fmt"
- "io/ioutil"
- "os"
- "path"
- "path/filepath"
- "runtime"
- "strings"
- "time"
-
- "github.com/golang/glog"
-
- "k8s.io/apimachinery/pkg/util/sets"
-)
-
-const (
- maxFileNameLength = 255
- maxPathLength = 4096
-)
-
-// AtomicWriter handles atomically projecting content for a set of files into
-// a target directory.
-//
-// Note:
-//
-// 1. AtomicWriter reserves the set of pathnames starting with `..`.
-// 2. AtomicWriter offers no concurrency guarantees and must be synchronized
-// by the caller.
-//
-// The visible files in this volume are symlinks to files in the writer's data
-// directory. Actual files are stored in a hidden timestamped directory which
-// is symlinked to by the data directory. The timestamped directory and
-// data directory symlink are created in the writer's target dir.  This scheme
-// allows the files to be atomically updated by changing the target of the
-// data directory symlink.
-//
-// Consumers of the target directory can monitor the ..data symlink using
-// inotify or fanotify to receive events when the content in the volume is
-// updated.
-type AtomicWriter struct {
- targetDir string
- logContext string
-}
-
-type FileProjection struct {
- Data []byte
- Mode int32
-}
-
-// NewAtomicWriter creates a new AtomicWriter configured to write to the given
-// target directory, or returns an error if the target directory does not exist.
-func NewAtomicWriter(targetDir string, logContext string) (*AtomicWriter, error) {
- _, err := os.Stat(targetDir)
- if os.IsNotExist(err) {
- return nil, err
- }
-
- return &AtomicWriter{targetDir: targetDir, logContext: logContext}, nil
-}
-
-const (
- dataDirName = "..data"
- newDataDirName = "..data_tmp"
-)
-
-// Write does an atomic projection of the given payload into the writer's target
-// directory. Input paths must not begin with '..'.
-//
-// The Write algorithm is:
-//
-// 1. The payload is validated; if the payload is invalid, the function returns
-// 2. The user-visible portion of the volume is walked to determine whether any
-// portion of the payload was deleted and is still present on disk.
-// If the payload is already present on disk and there are no deleted files,
-// the function returns
-// 3. A check is made to determine whether data present in the payload has changed
-// 4.  A new timestamped dir is created
-// 5. The payload is written to the new timestamped directory
-// 6.  Symlinks and directory for new user-visible files are created (if needed).
-//
-// For example, consider the files:
-// <target-dir>/podName
-// <target-dir>/user/labels
-// <target-dir>/k8s/annotations
-//
-// The user visible files are symbolic links into the internal data directory:
-// <target-dir>/podName -> ..data/podName
-// <target-dir>/usr/labels -> ../..data/usr/labels
-// <target-dir>/k8s/annotations -> ../..data/k8s/annotations
-//
-// Relative links are created into the data directory for files in subdirectories.
-//
-// The data directory itself is a link to a timestamped directory with
-// the real data:
-// <target-dir>/..data -> ..2016_02_01_15_04_05.12345678/
-// 7.  The current timestamped directory is detected by reading the data directory
-// symlink
-// 8.  A symlink to the new timestamped directory ..data_tmp is created that will
-// become the new data directory
-// 9.  The new data directory symlink is renamed to the data directory; rename is atomic
-// 10. Old paths are removed from the user-visible portion of the target directory
-// 11.  The previous timestamped directory is removed, if it exists
-func (w *AtomicWriter) Write(payload map[string]FileProjection) error {
- // (1)
- cleanPayload, err := validatePayload(payload)
- if err != nil {
- glog.Errorf("%s: invalid payload: %v", w.logContext, err)
- return err
- }
-
- // (2)
- pathsToRemove, err := w.pathsToRemove(cleanPayload)
- if err != nil {
- glog.Errorf("%s: error determining user-visible files to remove: %v", w.logContext, err)
- return err
- }
-
- // (3)
- if should, err := w.shouldWritePayload(cleanPayload); err != nil {
- glog.Errorf("%s: error determining whether payload should be written to disk: %v", w.logContext, err)
- return err
- } else if !should && len(pathsToRemove) == 0 {
- glog.V(4).Infof("%s: no update required for target directory %v", w.logContext, w.targetDir)
- return nil
- } else {
- glog.V(4).Infof("%s: write required for target directory %v", w.logContext, w.targetDir)
- }
-
- // (4)
- tsDir, err := w.newTimestampDir()
- if err != nil {
- glog.V(4).Infof("%s: error creating new ts data directory: %v", w.logContext, err)
- return err
- }
-
- // (5)
- if err = w.writePayloadToDir(cleanPayload, tsDir); err != nil {
- glog.Errorf("%s: error writing payload to ts data directory %s: %v", w.logContext, tsDir, err)
- return err
- } else {
- glog.V(4).Infof("%s: performed write of new data to ts data directory: %s", w.logContext, tsDir)
- }
-
- // (6)
- if err = w.createUserVisibleFiles(cleanPayload); err != nil {
- glog.Errorf("%s: error creating visible symlinks in %s: %v", w.logContext, w.targetDir, err)
- return err
- }
-
- // (7)
- _, tsDirName := filepath.Split(tsDir)
- dataDirPath := path.Join(w.targetDir, dataDirName)
- oldTsDir, err := os.Readlink(dataDirPath)
- if err != nil && !os.IsNotExist(err) {
- glog.Errorf("%s: error reading link for data directory: %v", w.logContext, err)
- return err
- }
-
- // (8)
- newDataDirPath := path.Join(w.targetDir, newDataDirName)
- if err = os.Symlink(tsDirName, newDataDirPath); err != nil {
- os.RemoveAll(tsDir)
- glog.Errorf("%s: error creating symbolic link for atomic update: %v", w.logContext, err)
- return err
- }
-
- // (9)
- if runtime.GOOS == "windows" {
- os.Remove(dataDirPath)
- err = os.Symlink(tsDirName, dataDirPath)
- os.Remove(newDataDirPath)
- } else {
- err = os.Rename(newDataDirPath, dataDirPath)
- }
- if err != nil {
- os.Remove(newDataDirPath)
- os.RemoveAll(tsDir)
- glog.Errorf("%s: error renaming symbolic link for data directory %s: %v", w.logContext, newDataDirPath, err)
- return err
- }
-
- // (10)
- if err = w.removeUserVisiblePaths(pathsToRemove); err != nil {
- glog.Errorf("%s: error removing old visible symlinks: %v", w.logContext, err)
- return err
- }
-
- // (11)
- if len(oldTsDir) > 0 {
- if err = os.RemoveAll(path.Join(w.targetDir, oldTsDir)); err != nil {
- glog.Errorf("%s: error removing old data directory %s: %v", w.logContext, oldTsDir, err)
- return err
- }
- }
-
- return nil
-}
-
-// validatePayload returns an error if any path in the payload returns a copy of the payload with the paths cleaned.
-func validatePayload(payload map[string]FileProjection) (map[string]FileProjection, error) {
- cleanPayload := make(map[string]FileProjection)
- for k, content := range payload {
- if err := validatePath(k); err != nil {
- return nil, err
- }
-
- cleanPayload[path.Clean(k)] = content
- }
-
- return cleanPayload, nil
-}
-
-// validatePath validates a single path, returning an error if the path is
-// invalid. paths may not:
-//
-// 1. be absolute
-// 2. contain '..' as an element
-// 3. start with '..'
-// 4. contain filenames larger than 255 characters
-// 5. be longer than 4096 characters
-func validatePath(targetPath string) error {
- // TODO: somehow unify this with the similar api validation,
- // validateVolumeSourcePath; the error semantics are just different enough
- // from this that it was time-prohibitive trying to find the right
- // refactoring to re-use.
- if targetPath == "" {
- return fmt.Errorf("invalid path: must not be empty: %q", targetPath)
- }
- if path.IsAbs(targetPath) {
- return fmt.Errorf("invalid path: must be relative path: %s", targetPath)
- }
-
- if len(targetPath) > maxPathLength {
- return fmt.Errorf("invalid path: must be less than %d characters", maxPathLength)
- }
-
- items := strings.Split(targetPath, string(os.PathSeparator))
- for _, item := range items {
- if item == ".." {
- return fmt.Errorf("invalid path: must not contain '..': %s", targetPath)
- }
- if len(item) > maxFileNameLength {
- return fmt.Errorf("invalid path: filenames must be less than %d characters", maxFileNameLength)
- }
- }
- if strings.HasPrefix(items[0], "..") && len(items[0]) > 2 {
- return fmt.Errorf("invalid path: must not start with '..': %s", targetPath)
- }
-
- return nil
-}
-
-// shouldWritePayload returns whether the payload should be written to disk.
-func (w *AtomicWriter) shouldWritePayload(payload map[string]FileProjection) (bool, error) {
- for userVisiblePath, fileProjection := range payload {
- shouldWrite, err := w.shouldWriteFile(path.Join(w.targetDir, userVisiblePath), fileProjection.Data)
- if err != nil {
- return false, err
- }
-
- if shouldWrite {
- return true, nil
- }
- }
-
- return false, nil
-}
-
-// shouldWriteFile returns whether a new version of a file should be written to disk.
-func (w *AtomicWriter) shouldWriteFile(path string, content []byte) (bool, error) {
- _, err := os.Lstat(path)
- if os.IsNotExist(err) {
- return true, nil
- }
-
- contentOnFs, err := ioutil.ReadFile(path)
- if err != nil {
- return false, err
- }
-
- return (bytes.Compare(content, contentOnFs) != 0), nil
-}
-
-// pathsToRemove walks the user-visible portion of the target directory and
-// determines which paths should be removed (if any) after the payload is
-// written to the target directory.
-func (w *AtomicWriter) pathsToRemove(payload map[string]FileProjection) (sets.String, error) {
- paths := sets.NewString()
- visitor := func(path string, info os.FileInfo, err error) error {
- if path == w.targetDir {
- return nil
- }
-
- relativePath := strings.TrimPrefix(path, w.targetDir)
- if runtime.GOOS == "windows" {
- relativePath = strings.TrimPrefix(relativePath, "\\")
- } else {
- relativePath = strings.TrimPrefix(relativePath, "/")
- }
- if strings.HasPrefix(relativePath, "..") {
- return nil
- }
-
- paths.Insert(relativePath)
- return nil
- }
-
- err := filepath.Walk(w.targetDir, visitor)
- if os.IsNotExist(err) {
- return nil, nil
- } else if err != nil {
- return nil, err
- }
- glog.V(5).Infof("%s: current paths: %+v", w.targetDir, paths.List())
-
- newPaths := sets.NewString()
- for file := range payload {
- // add all subpaths for the payload to the set of new paths
- // to avoid attempting to remove non-empty dirs
- for subPath := file; subPath != ""; {
- newPaths.Insert(subPath)
- subPath, _ = filepath.Split(subPath)
- subPath = strings.TrimSuffix(subPath, "/")
- }
- }
- glog.V(5).Infof("%s: new paths: %+v", w.targetDir, newPaths.List())
-
- result := paths.Difference(newPaths)
- glog.V(5).Infof("%s: paths to remove: %+v", w.targetDir, result)
-
- return result, nil
-}
-
-// newTimestampDir creates a new timestamp directory
-func (w *AtomicWriter) newTimestampDir() (string, error) {
- tsDir, err := ioutil.TempDir(w.targetDir, fmt.Sprintf("..%s.", time.Now().Format("1981_02_01_15_04_05")))
- if err != nil {
- glog.Errorf("%s: unable to create new temp directory: %v", w.logContext, err)
- return "", err
- }
-
- // 0755 permissions are needed to allow 'group' and 'other' to recurse the
- // directory tree. do a chmod here to ensure that permissions are set correctly
- // regardless of the process' umask.
- err = os.Chmod(tsDir, 0755)
- if err != nil {
- glog.Errorf("%s: unable to set mode on new temp directory: %v", w.logContext, err)
- return "", err
- }
-
- return tsDir, nil
-}
-
-// writePayloadToDir writes the given payload to the given directory. The
-// directory must exist.
-func (w *AtomicWriter) writePayloadToDir(payload map[string]FileProjection, dir string) error {
- for userVisiblePath, fileProjection := range payload {
- content := fileProjection.Data
- mode := os.FileMode(fileProjection.Mode)
- fullPath := path.Join(dir, userVisiblePath)
- baseDir, _ := filepath.Split(fullPath)
-
- err := os.MkdirAll(baseDir, os.ModePerm)
- if err != nil {
- glog.Errorf("%s: unable to create directory %s: %v", w.logContext, baseDir, err)
- return err
- }
-
- err = ioutil.WriteFile(fullPath, content, mode)
- if err != nil {
- glog.Errorf("%s: unable to write file %s with mode %v: %v", w.logContext, fullPath, mode, err)
- return err
- }
- // Chmod is needed because ioutil.WriteFile() ends up calling
- // open(2) to create the file, so the final mode used is "mode &
- // ~umask". But we want to make sure the specified mode is used
- // in the file no matter what the umask is.
- err = os.Chmod(fullPath, mode)
- if err != nil {
- glog.Errorf("%s: unable to write file %s with mode %v: %v", w.logContext, fullPath, mode, err)
- }
- }
-
- return nil
-}
-
-// createUserVisibleFiles creates the relative symlinks for all the
-// files configured in the payload. If the directory in a file path does not
-// exist, it is created.
-//
-// Viz:
-// For files: "bar", "foo/bar", "baz/bar", "foo/baz/blah"
-// the following symlinks and subdirectories are created:
-// bar -> ..data/bar
-// foo/bar -> ../..data/foo/bar
-// baz/bar -> ../..data/baz/bar
-// foo/baz/blah -> ../../..data/foo/baz/blah
-func (w *AtomicWriter) createUserVisibleFiles(payload map[string]FileProjection) error {
- for userVisiblePath := range payload {
- dir, _ := filepath.Split(userVisiblePath)
- subDirs := 0
- if len(dir) > 0 {
- // If dir is not empty, the projection path contains at least one
- // subdirectory (example: userVisiblePath := "foo/bar").
- // Since filepath.Split leaves a trailing path separator, in this
- // example, dir = "foo/". In order to calculate the number of
- // subdirectories, we must subtract 1 from the number returned by split.
- subDirs = len(strings.Split(dir, "/")) - 1
- err := os.MkdirAll(path.Join(w.targetDir, dir), os.ModePerm)
- if err != nil {
- return err
- }
- }
- _, err := os.Readlink(path.Join(w.targetDir, userVisiblePath))
- if err != nil && os.IsNotExist(err) {
- // The link into the data directory for this path doesn't exist; create it,
- // respecting the number of subdirectories necessary to link
- // correctly back into the data directory.
- visibleFile := path.Join(w.targetDir, userVisiblePath)
- dataDirFile := path.Join(strings.Repeat("../", subDirs), dataDirName, userVisiblePath)
-
- err = os.Symlink(dataDirFile, visibleFile)
- if err != nil {
- return err
- }
- }
- }
- return nil
-}
-
-// removeUserVisiblePaths removes the set of paths from the user-visible
-// portion of the writer's target directory.
-func (w *AtomicWriter) removeUserVisiblePaths(paths sets.String) error {
- orderedPaths := paths.List()
- for ii := len(orderedPaths) - 1; ii >= 0; ii-- {
- if err := os.Remove(path.Join(w.targetDir, orderedPaths[ii])); err != nil {
- glog.Errorf("%s: error pruning old user-visible path %s: %v", w.logContext, orderedPaths[ii], err)
- return err
- }
- }
-
- return nil
-}
diff --git a/vendor/k8s.io/kubernetes/pkg/volume/util/device_util.go b/vendor/k8s.io/kubernetes/pkg/volume/util/device_util.go
deleted file mode 100644
index 9098d7b85..000000000
--- a/vendor/k8s.io/kubernetes/pkg/volume/util/device_util.go
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
-Copyright 2016 The Kubernetes Authors.
-
-Licensed under the Apache License, Version 2.0 (the "License");
-you may not use this file except in compliance with the License.
-You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License.
-*/
-
-package util
-
-//DeviceUtil is a util for common device methods
-type DeviceUtil interface {
- FindMultipathDeviceForDevice(disk string) string
-}
-
-type deviceHandler struct {
- get_io IoUtil
-}
-
-//NewDeviceHandler Create a new IoHandler implementation
-func NewDeviceHandler(io IoUtil) DeviceUtil {
- return &deviceHandler{get_io: io}
-}
diff --git a/vendor/k8s.io/kubernetes/pkg/volume/util/device_util_linux.go b/vendor/k8s.io/kubernetes/pkg/volume/util/device_util_linux.go
deleted file mode 100644
index 0d9851140..000000000
--- a/vendor/k8s.io/kubernetes/pkg/volume/util/device_util_linux.go
+++ /dev/null
@@ -1,61 +0,0 @@
-// +build linux
-
-/*
-Copyright 2016 The Kubernetes Authors.
-
-Licensed under the Apache License, Version 2.0 (the "License");
-you may not use this file except in compliance with the License.
-You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License.
-*/
-
-package util
-
-import (
- "errors"
- "strings"
-)
-
-// FindMultipathDeviceForDevice given a device name like /dev/sdx, find the devicemapper parent
-func (handler *deviceHandler) FindMultipathDeviceForDevice(device string) string {
- io := handler.get_io
- disk, err := findDeviceForPath(device, io)
- if err != nil {
- return ""
- }
- sysPath := "/sys/block/"
- if dirs, err := io.ReadDir(sysPath); err == nil {
- for _, f := range dirs {
- name := f.Name()
- if strings.HasPrefix(name, "dm-") {
- if _, err1 := io.Lstat(sysPath + name + "/slaves/" + disk); err1 == nil {
- return "/dev/" + name
- }
- }
- }
- }
- return ""
-}
-
-// findDeviceForPath Find the underlaying disk for a linked path such as /dev/disk/by-path/XXXX or /dev/mapper/XXXX
-// will return sdX or hdX etc, if /dev/sdX is passed in then sdX will be returned
-func findDeviceForPath(path string, io IoUtil) (string, error) {
- devicePath, err := io.EvalSymlinks(path)
- if err != nil {
- return "", err
- }
- // if path /dev/hdX split into "", "dev", "hdX" then we will
- // return just the last part
- parts := strings.Split(devicePath, "/")
- if len(parts) == 3 && strings.HasPrefix(parts[1], "dev") {
- return parts[2], nil
- }
- return "", errors.New("Illegal path for device " + devicePath)
-}
diff --git a/vendor/k8s.io/kubernetes/pkg/volume/util/device_util_unsupported.go b/vendor/k8s.io/kubernetes/pkg/volume/util/device_util_unsupported.go
deleted file mode 100644
index 6afb1f139..000000000
--- a/vendor/k8s.io/kubernetes/pkg/volume/util/device_util_unsupported.go
+++ /dev/null
@@ -1,24 +0,0 @@
-// +build !linux
-
-/*
-Copyright 2016 The Kubernetes Authors.
-
-Licensed under the Apache License, Version 2.0 (the "License");
-you may not use this file except in compliance with the License.
-You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License.
-*/
-
-package util
-
-// FindMultipathDeviceForDevice unsupported returns ""
-func (handler *deviceHandler) FindMultipathDeviceForDevice(device string) string {
- return ""
-}
diff --git a/vendor/k8s.io/kubernetes/pkg/volume/util/doc.go b/vendor/k8s.io/kubernetes/pkg/volume/util/doc.go
deleted file mode 100644
index 620add69d..000000000
--- a/vendor/k8s.io/kubernetes/pkg/volume/util/doc.go
+++ /dev/null
@@ -1,18 +0,0 @@
-/*
-Copyright 2015 The Kubernetes Authors.
-
-Licensed under the Apache License, Version 2.0 (the "License");
-you may not use this file except in compliance with the License.
-You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License.
-*/
-
-// Contains utility code for use by volume plugins.
-package util // import "k8s.io/kubernetes/pkg/volume/util"
diff --git a/vendor/k8s.io/kubernetes/pkg/volume/util/fs.go b/vendor/k8s.io/kubernetes/pkg/volume/util/fs/fs.go
index cfa7e30b4..bbb4b0105 100644
--- a/vendor/k8s.io/kubernetes/pkg/volume/util/fs.go
+++ b/vendor/k8s.io/kubernetes/pkg/volume/util/fs/fs.go
@@ -16,14 +16,15 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
-package util
+package fs
import (
"bytes"
"fmt"
"os/exec"
"strings"
- "syscall"
+
+ "golang.org/x/sys/unix"
"k8s.io/apimachinery/pkg/api/resource"
)
@@ -31,8 +32,8 @@ import (
// FSInfo linux returns (available bytes, byte capacity, byte usage, total inodes, inodes free, inode usage, error)
// for the filesystem that path resides upon.
func FsInfo(path string) (int64, int64, int64, int64, int64, int64, error) {
- statfs := &syscall.Statfs_t{}
- err := syscall.Statfs(path, statfs)
+ statfs := &unix.Statfs_t{}
+ err := unix.Statfs(path, statfs)
if err != nil {
return 0, 0, 0, 0, 0, 0, err
}
diff --git a/vendor/k8s.io/kubernetes/pkg/volume/util/fs_unsupported.go b/vendor/k8s.io/kubernetes/pkg/volume/util/fs/fs_unsupported.go
index 8d35d5dae..da41fc8ee 100644
--- a/vendor/k8s.io/kubernetes/pkg/volume/util/fs_unsupported.go
+++ b/vendor/k8s.io/kubernetes/pkg/volume/util/fs/fs_unsupported.go
@@ -16,7 +16,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
-package util
+package fs
import (
"fmt"
diff --git a/vendor/k8s.io/kubernetes/pkg/volume/util/io_util.go b/vendor/k8s.io/kubernetes/pkg/volume/util/io_util.go
deleted file mode 100644
index e1f30f5c3..000000000
--- a/vendor/k8s.io/kubernetes/pkg/volume/util/io_util.go
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
-Copyright 2016 The Kubernetes Authors.
-
-Licensed under the Apache License, Version 2.0 (the "License");
-you may not use this file except in compliance with the License.
-You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License.
-*/
-
-package util
-
-import (
- "io/ioutil"
- "os"
- "path/filepath"
-)
-
-// IoUtil is a mockable util for common IO operations
-type IoUtil interface {
- ReadDir(dirname string) ([]os.FileInfo, error)
- Lstat(name string) (os.FileInfo, error)
- EvalSymlinks(path string) (string, error)
-}
-
-type osIOHandler struct{}
-
-//NewIOHandler Create a new IoHandler implementation
-func NewIOHandler() IoUtil {
- return &osIOHandler{}
-}
-
-func (handler *osIOHandler) ReadDir(dirname string) ([]os.FileInfo, error) {
- return ioutil.ReadDir(dirname)
-}
-func (handler *osIOHandler) Lstat(name string) (os.FileInfo, error) {
- return os.Lstat(name)
-}
-func (handler *osIOHandler) EvalSymlinks(path string) (string, error) {
- return filepath.EvalSymlinks(path)
-}
diff --git a/vendor/k8s.io/kubernetes/pkg/volume/util/recyclerclient/recycler_client.go b/vendor/k8s.io/kubernetes/pkg/volume/util/recyclerclient/recycler_client.go
new file mode 100644
index 000000000..1af6465c6
--- /dev/null
+++ b/vendor/k8s.io/kubernetes/pkg/volume/util/recyclerclient/recycler_client.go
@@ -0,0 +1,252 @@
+/*
+Copyright 2018 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package recyclerclient
+
+import (
+ "fmt"
+
+ "github.com/golang/glog"
+ "k8s.io/api/core/v1"
+ "k8s.io/apimachinery/pkg/api/errors"
+ metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+ "k8s.io/apimachinery/pkg/fields"
+ "k8s.io/apimachinery/pkg/watch"
+ clientset "k8s.io/client-go/kubernetes"
+)
+
+type RecycleEventRecorder func(eventtype, message string)
+
+// RecycleVolumeByWatchingPodUntilCompletion is intended for use with volume
+// Recyclers. This function will save the given Pod to the API and watch it
+// until it completes, fails, or the pod's ActiveDeadlineSeconds is exceeded,
+// whichever comes first. An attempt to delete a recycler pod is always
+// attempted before returning.
+//
+// In case there is a pod with the same namespace+name already running, this
+// function deletes it as it is not able to judge if it is an old recycler
+// or user has forged a fake recycler to block Kubernetes from recycling.//
+//
+// pod - the pod designed by a volume plugin to recycle the volume. pod.Name
+// will be overwritten with unique name based on PV.Name.
+// client - kube client for API operations.
+func RecycleVolumeByWatchingPodUntilCompletion(pvName string, pod *v1.Pod, kubeClient clientset.Interface, recorder RecycleEventRecorder) error {
+ return internalRecycleVolumeByWatchingPodUntilCompletion(pvName, pod, newRecyclerClient(kubeClient, recorder))
+}
+
+// same as above func comments, except 'recyclerClient' is a narrower pod API
+// interface to ease testing
+func internalRecycleVolumeByWatchingPodUntilCompletion(pvName string, pod *v1.Pod, recyclerClient recyclerClient) error {
+ glog.V(5).Infof("creating recycler pod for volume %s\n", pod.Name)
+
+ // Generate unique name for the recycler pod - we need to get "already
+ // exists" error when a previous controller has already started recycling
+ // the volume. Here we assume that pv.Name is already unique.
+ pod.Name = "recycler-for-" + pvName
+ pod.GenerateName = ""
+
+ stopChannel := make(chan struct{})
+ defer close(stopChannel)
+ podCh, err := recyclerClient.WatchPod(pod.Name, pod.Namespace, stopChannel)
+ if err != nil {
+ glog.V(4).Infof("cannot start watcher for pod %s/%s: %v", pod.Namespace, pod.Name, err)
+ return err
+ }
+
+ // Start the pod
+ _, err = recyclerClient.CreatePod(pod)
+ if err != nil {
+ if errors.IsAlreadyExists(err) {
+ deleteErr := recyclerClient.DeletePod(pod.Name, pod.Namespace)
+ if deleteErr != nil {
+ return fmt.Errorf("failed to delete old recycler pod %s/%s: %s", pod.Namespace, pod.Name, deleteErr)
+ }
+ // Recycler will try again and the old pod will be hopefully deleted
+ // at that time.
+ return fmt.Errorf("old recycler pod found, will retry later")
+ }
+ return fmt.Errorf("unexpected error creating recycler pod: %+v", err)
+ }
+ err = waitForPod(pod, recyclerClient, podCh)
+
+ // In all cases delete the recycler pod and log its result.
+ glog.V(2).Infof("deleting recycler pod %s/%s", pod.Namespace, pod.Name)
+ deleteErr := recyclerClient.DeletePod(pod.Name, pod.Namespace)
+ if deleteErr != nil {
+ glog.Errorf("failed to delete recycler pod %s/%s: %v", pod.Namespace, pod.Name, err)
+ }
+
+ // Returning recycler error is preferred, the pod will be deleted again on
+ // the next retry.
+ if err != nil {
+ return fmt.Errorf("failed to recycle volume: %s", err)
+ }
+
+ // Recycle succeeded but we failed to delete the recycler pod. Report it,
+ // the controller will re-try recycling the PV again shortly.
+ if deleteErr != nil {
+ return fmt.Errorf("failed to delete recycler pod: %s", deleteErr)
+ }
+
+ return nil
+}
+
+// waitForPod watches the pod it until it finishes and send all events on the
+// pod to the PV.
+func waitForPod(pod *v1.Pod, recyclerClient recyclerClient, podCh <-chan watch.Event) error {
+ for {
+ event, ok := <-podCh
+ if !ok {
+ return fmt.Errorf("recycler pod %q watch channel had been closed", pod.Name)
+ }
+ switch event.Object.(type) {
+ case *v1.Pod:
+ // POD changed
+ pod := event.Object.(*v1.Pod)
+ glog.V(4).Infof("recycler pod update received: %s %s/%s %s", event.Type, pod.Namespace, pod.Name, pod.Status.Phase)
+ switch event.Type {
+ case watch.Added, watch.Modified:
+ if pod.Status.Phase == v1.PodSucceeded {
+ // Recycle succeeded.
+ return nil
+ }
+ if pod.Status.Phase == v1.PodFailed {
+ if pod.Status.Message != "" {
+ return fmt.Errorf(pod.Status.Message)
+ } else {
+ return fmt.Errorf("pod failed, pod.Status.Message unknown.")
+ }
+ }
+
+ case watch.Deleted:
+ return fmt.Errorf("recycler pod was deleted")
+
+ case watch.Error:
+ return fmt.Errorf("recycler pod watcher failed")
+ }
+
+ case *v1.Event:
+ // Event received
+ podEvent := event.Object.(*v1.Event)
+ glog.V(4).Infof("recycler event received: %s %s/%s %s/%s %s", event.Type, podEvent.Namespace, podEvent.Name, podEvent.InvolvedObject.Namespace, podEvent.InvolvedObject.Name, podEvent.Message)
+ if event.Type == watch.Added {
+ recyclerClient.Event(podEvent.Type, podEvent.Message)
+ }
+ }
+ }
+}
+
+// recyclerClient abstracts access to a Pod by providing a narrower interface.
+// This makes it easier to mock a client for testing.
+type recyclerClient interface {
+ CreatePod(pod *v1.Pod) (*v1.Pod, error)
+ GetPod(name, namespace string) (*v1.Pod, error)
+ DeletePod(name, namespace string) error
+ // WatchPod returns a ListWatch for watching a pod. The stopChannel is used
+ // to close the reflector backing the watch. The caller is responsible for
+ // derring a close on the channel to stop the reflector.
+ WatchPod(name, namespace string, stopChannel chan struct{}) (<-chan watch.Event, error)
+ // Event sends an event to the volume that is being recycled.
+ Event(eventtype, message string)
+}
+
+func newRecyclerClient(client clientset.Interface, recorder RecycleEventRecorder) recyclerClient {
+ return &realRecyclerClient{
+ client,
+ recorder,
+ }
+}
+
+type realRecyclerClient struct {
+ client clientset.Interface
+ recorder RecycleEventRecorder
+}
+
+func (c *realRecyclerClient) CreatePod(pod *v1.Pod) (*v1.Pod, error) {
+ return c.client.CoreV1().Pods(pod.Namespace).Create(pod)
+}
+
+func (c *realRecyclerClient) GetPod(name, namespace string) (*v1.Pod, error) {
+ return c.client.CoreV1().Pods(namespace).Get(name, metav1.GetOptions{})
+}
+
+func (c *realRecyclerClient) DeletePod(name, namespace string) error {
+ return c.client.CoreV1().Pods(namespace).Delete(name, nil)
+}
+
+func (c *realRecyclerClient) Event(eventtype, message string) {
+ c.recorder(eventtype, message)
+}
+
+func (c *realRecyclerClient) WatchPod(name, namespace string, stopChannel chan struct{}) (<-chan watch.Event, error) {
+ podSelector, err := fields.ParseSelector("metadata.name=" + name)
+ if err != nil {
+ return nil, err
+ }
+ options := metav1.ListOptions{
+ FieldSelector: podSelector.String(),
+ Watch: true,
+ }
+
+ podWatch, err := c.client.CoreV1().Pods(namespace).Watch(options)
+ if err != nil {
+ return nil, err
+ }
+
+ eventSelector, _ := fields.ParseSelector("involvedObject.name=" + name)
+ eventWatch, err := c.client.CoreV1().Events(namespace).Watch(metav1.ListOptions{
+ FieldSelector: eventSelector.String(),
+ Watch: true,
+ })
+ if err != nil {
+ podWatch.Stop()
+ return nil, err
+ }
+
+ eventCh := make(chan watch.Event, 30)
+
+ go func() {
+ defer eventWatch.Stop()
+ defer podWatch.Stop()
+ defer close(eventCh)
+ var podWatchChannelClosed bool
+ var eventWatchChannelClosed bool
+ for {
+ select {
+ case _ = <-stopChannel:
+ return
+
+ case podEvent, ok := <-podWatch.ResultChan():
+ if !ok {
+ podWatchChannelClosed = true
+ } else {
+ eventCh <- podEvent
+ }
+ case eventEvent, ok := <-eventWatch.ResultChan():
+ if !ok {
+ eventWatchChannelClosed = true
+ } else {
+ eventCh <- eventEvent
+ }
+ }
+ if podWatchChannelClosed && eventWatchChannelClosed {
+ break
+ }
+ }
+ }()
+
+ return eventCh, nil
+}
diff --git a/vendor/k8s.io/kubernetes/pkg/volume/util/util.go b/vendor/k8s.io/kubernetes/pkg/volume/util/util.go
deleted file mode 100644
index 660c3c9db..000000000
--- a/vendor/k8s.io/kubernetes/pkg/volume/util/util.go
+++ /dev/null
@@ -1,213 +0,0 @@
-/*
-Copyright 2015 The Kubernetes Authors.
-
-Licensed under the Apache License, Version 2.0 (the "License");
-you may not use this file except in compliance with the License.
-You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License.
-*/
-
-package util
-
-import (
- "fmt"
- "os"
- "path"
-
- "github.com/golang/glog"
- metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
- "k8s.io/apimachinery/pkg/labels"
- "k8s.io/kubernetes/pkg/api/v1"
- v1helper "k8s.io/kubernetes/pkg/api/v1/helper"
- storage "k8s.io/kubernetes/pkg/apis/storage/v1"
- "k8s.io/kubernetes/pkg/client/clientset_generated/clientset"
- "k8s.io/kubernetes/pkg/util/mount"
-)
-
-const readyFileName = "ready"
-
-// IsReady checks for the existence of a regular file
-// called 'ready' in the given directory and returns
-// true if that file exists.
-func IsReady(dir string) bool {
- readyFile := path.Join(dir, readyFileName)
- s, err := os.Stat(readyFile)
- if err != nil {
- return false
- }
-
- if !s.Mode().IsRegular() {
- glog.Errorf("ready-file is not a file: %s", readyFile)
- return false
- }
-
- return true
-}
-
-// SetReady creates a file called 'ready' in the given
-// directory. It logs an error if the file cannot be
-// created.
-func SetReady(dir string) {
- if err := os.MkdirAll(dir, 0750); err != nil && !os.IsExist(err) {
- glog.Errorf("Can't mkdir %s: %v", dir, err)
- return
- }
-
- readyFile := path.Join(dir, readyFileName)
- file, err := os.Create(readyFile)
- if err != nil {
- glog.Errorf("Can't touch %s: %v", readyFile, err)
- return
- }
- file.Close()
-}
-
-// UnmountPath is a common unmount routine that unmounts the given path and
-// deletes the remaining directory if successful.
-func UnmountPath(mountPath string, mounter mount.Interface) error {
- return UnmountMountPoint(mountPath, mounter, false /* extensiveMountPointCheck */)
-}
-
-// UnmountMountPoint is a common unmount routine that unmounts the given path and
-// deletes the remaining directory if successful.
-// if extensiveMountPointCheck is true
-// IsNotMountPoint will be called instead of IsLikelyNotMountPoint.
-// IsNotMountPoint is more expensive but properly handles bind mounts.
-func UnmountMountPoint(mountPath string, mounter mount.Interface, extensiveMountPointCheck bool) error {
- if pathExists, pathErr := PathExists(mountPath); pathErr != nil {
- return fmt.Errorf("Error checking if path exists: %v", pathErr)
- } else if !pathExists {
- glog.Warningf("Warning: Unmount skipped because path does not exist: %v", mountPath)
- return nil
- }
-
- var notMnt bool
- var err error
-
- if extensiveMountPointCheck {
- notMnt, err = mount.IsNotMountPoint(mounter, mountPath)
- } else {
- notMnt, err = mounter.IsLikelyNotMountPoint(mountPath)
- }
-
- if err != nil {
- return err
- }
-
- if notMnt {
- glog.Warningf("Warning: %q is not a mountpoint, deleting", mountPath)
- return os.Remove(mountPath)
- }
-
- // Unmount the mount path
- glog.V(4).Infof("%q is a mountpoint, unmounting", mountPath)
- if err := mounter.Unmount(mountPath); err != nil {
- return err
- }
- notMnt, mntErr := mounter.IsLikelyNotMountPoint(mountPath)
- if mntErr != nil {
- return err
- }
- if notMnt {
- glog.V(4).Infof("%q is unmounted, deleting the directory", mountPath)
- return os.Remove(mountPath)
- }
- return fmt.Errorf("Failed to unmount path %v", mountPath)
-}
-
-// PathExists returns true if the specified path exists.
-func PathExists(path string) (bool, error) {
- _, err := os.Stat(path)
- if err == nil {
- return true, nil
- } else if os.IsNotExist(err) {
- return false, nil
- } else {
- return false, err
- }
-}
-
-// GetSecretForPod locates secret by name in the pod's namespace and returns secret map
-func GetSecretForPod(pod *v1.Pod, secretName string, kubeClient clientset.Interface) (map[string]string, error) {
- secret := make(map[string]string)
- if kubeClient == nil {
- return secret, fmt.Errorf("Cannot get kube client")
- }
- secrets, err := kubeClient.Core().Secrets(pod.Namespace).Get(secretName, metav1.GetOptions{})
- if err != nil {
- return secret, err
- }
- for name, data := range secrets.Data {
- secret[name] = string(data)
- }
- return secret, nil
-}
-
-// GetSecretForPV locates secret by name and namespace, verifies the secret type, and returns secret map
-func GetSecretForPV(secretNamespace, secretName, volumePluginName string, kubeClient clientset.Interface) (map[string]string, error) {
- secret := make(map[string]string)
- if kubeClient == nil {
- return secret, fmt.Errorf("Cannot get kube client")
- }
- secrets, err := kubeClient.Core().Secrets(secretNamespace).Get(secretName, metav1.GetOptions{})
- if err != nil {
- return secret, err
- }
- if secrets.Type != v1.SecretType(volumePluginName) {
- return secret, fmt.Errorf("Cannot get secret of type %s", volumePluginName)
- }
- for name, data := range secrets.Data {
- secret[name] = string(data)
- }
- return secret, nil
-}
-
-func GetClassForVolume(kubeClient clientset.Interface, pv *v1.PersistentVolume) (*storage.StorageClass, error) {
- if kubeClient == nil {
- return nil, fmt.Errorf("Cannot get kube client")
- }
- className := v1helper.GetPersistentVolumeClass(pv)
- if className == "" {
- return nil, fmt.Errorf("Volume has no storage class")
- }
-
- class, err := kubeClient.StorageV1().StorageClasses().Get(className, metav1.GetOptions{})
- if err != nil {
- return nil, err
- }
- return class, nil
-}
-
-// CheckNodeAffinity looks at the PV node affinity, and checks if the node has the same corresponding labels
-// This ensures that we don't mount a volume that doesn't belong to this node
-func CheckNodeAffinity(pv *v1.PersistentVolume, nodeLabels map[string]string) error {
- affinity, err := v1helper.GetStorageNodeAffinityFromAnnotation(pv.Annotations)
- if err != nil {
- return fmt.Errorf("Error getting storage node affinity: %v", err)
- }
- if affinity == nil {
- return nil
- }
-
- if affinity.RequiredDuringSchedulingIgnoredDuringExecution != nil {
- terms := affinity.RequiredDuringSchedulingIgnoredDuringExecution.NodeSelectorTerms
- glog.V(10).Infof("Match for RequiredDuringSchedulingIgnoredDuringExecution node selector terms %+v", terms)
- for _, term := range terms {
- selector, err := v1helper.NodeSelectorRequirementsAsSelector(term.MatchExpressions)
- if err != nil {
- return fmt.Errorf("Failed to parse MatchExpressions: %v", err)
- }
- if !selector.Matches(labels.Set(nodeLabels)) {
- return fmt.Errorf("NodeSelectorTerm %+v does not match node labels", term.MatchExpressions)
- }
- }
- }
- return nil
-}
diff --git a/vendor/k8s.io/kubernetes/pkg/volume/volume.go b/vendor/k8s.io/kubernetes/pkg/volume/volume.go
index 76c96d2e2..471963556 100644
--- a/vendor/k8s.io/kubernetes/pkg/volume/volume.go
+++ b/vendor/k8s.io/kubernetes/pkg/volume/volume.go
@@ -19,10 +19,10 @@ package volume
import (
"time"
+ "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
- "k8s.io/kubernetes/pkg/api/v1"
)
// Volume represents a directory used by pods or hosts on a node. All method
@@ -37,6 +37,19 @@ type Volume interface {
MetricsProvider
}
+// BlockVolume interface provides methods to generate global map path
+// and pod device map path.
+type BlockVolume interface {
+ // GetGlobalMapPath returns a global map path which contains
+ // symbolic links associated to a block device.
+ // ex. plugins/kubernetes.io/{PluginName}/{DefaultKubeletVolumeDevicesDirName}/{volumePluginDependentPath}/{pod uuid}
+ GetGlobalMapPath(spec *Spec) (string, error)
+ // GetPodDeviceMapPath returns a pod device map path
+ // and name of a symbolic link associated to a block device.
+ // ex. pods/{podUid}}/{DefaultKubeletVolumeDevicesDirName}/{escapeQualifiedPluginName}/{volumeName}
+ GetPodDeviceMapPath() (string, string)
+}
+
// MetricsProvider exposes metrics (e.g. used,available space) related to a
// Volume.
type MetricsProvider interface {
@@ -132,6 +145,34 @@ type Unmounter interface {
TearDownAt(dir string) error
}
+// BlockVolumeMapper interface provides methods to set up/map the volume.
+type BlockVolumeMapper interface {
+ BlockVolume
+ // SetUpDevice prepares the volume to a self-determined directory path,
+ // which may or may not exist yet and returns combination of physical
+ // device path of a block volume and error.
+ // If the plugin is non-attachable, it should prepare the device
+ // in /dev/ (or where appropriate) and return unique device path.
+ // Unique device path across kubelet node reboot is required to avoid
+ // unexpected block volume destruction.
+ // If the plugin is attachable, it should not do anything here,
+ // just return empty string for device path.
+ // Instead, attachable plugin have to return unique device path
+ // at attacher.Attach() and attacher.WaitForAttach().
+ // This may be called more than once, so implementations must be idempotent.
+ SetUpDevice() (string, error)
+}
+
+// BlockVolumeUnmapper interface provides methods to cleanup/unmap the volumes.
+type BlockVolumeUnmapper interface {
+ BlockVolume
+ // TearDownDevice removes traces of the SetUpDevice procedure under
+ // a self-determined directory.
+ // If the plugin is non-attachable, this method detaches the volume
+ // from a node.
+ TearDownDevice(mapPath string, devicePath string) error
+}
+
// Provisioner is an interface that creates templates for PersistentVolumes
// and can create the volume as a new resource in the infrastructure provider.
type Provisioner interface {
@@ -173,7 +214,7 @@ type Attacher interface {
// node. If it successfully attaches, the path to the device
// is returned. Otherwise, if the device does not attach after
// the given timeout period, an error will be returned.
- WaitForAttach(spec *Spec, devicePath string, timeout time.Duration) (string, error)
+ WaitForAttach(spec *Spec, devicePath string, pod *v1.Pod, timeout time.Duration) (string, error)
// GetDeviceMountPath returns a path where the device should
// be mounted after it is attached. This is a global mount
@@ -195,8 +236,10 @@ type BulkVolumeVerifier interface {
// Detacher can detach a volume from a node.
type Detacher interface {
- // Detach the given device from the node with the given Name.
- Detach(deviceName string, nodeName types.NodeName) error
+ // Detach the given volume from the node with the given Name.
+ // volumeName is name of the volume as returned from plugin's
+ // GetVolumeName().
+ Detach(volumeName string, nodeName types.NodeName) error
// UnmountDevice unmounts the global mount of the disk. This
// should only be called once all bind mounts have been
diff --git a/vendor/k8s.io/kubernetes/pkg/volume/volume_linux.go b/vendor/k8s.io/kubernetes/pkg/volume/volume_linux.go
index ef1f45208..d67ee4a95 100644
--- a/vendor/k8s.io/kubernetes/pkg/volume/volume_linux.go
+++ b/vendor/k8s.io/kubernetes/pkg/volume/volume_linux.go
@@ -89,3 +89,17 @@ func SetVolumeOwnership(mounter Mounter, fsGroup *int64) error {
return nil
})
}
+
+// IsSameFSGroup is called only for requests to mount an already mounted
+// volume. It checks if fsGroup of new mount request is the same or not.
+// It returns false if it not the same. It also returns current Gid of a path
+// provided for dir variable.
+func IsSameFSGroup(dir string, fsGroup int64) (bool, int, error) {
+ info, err := os.Stat(dir)
+ if err != nil {
+ glog.Errorf("Error getting stats for %s (%v)", dir, err)
+ return false, 0, err
+ }
+ s := info.Sys().(*syscall.Stat_t)
+ return int(s.Gid) == int(fsGroup), int(s.Gid), nil
+}
diff --git a/vendor/k8s.io/kubernetes/pkg/volume/volume_unsupported.go b/vendor/k8s.io/kubernetes/pkg/volume/volume_unsupported.go
index 45a6cc5ca..46a6aeaf0 100644
--- a/vendor/k8s.io/kubernetes/pkg/volume/volume_unsupported.go
+++ b/vendor/k8s.io/kubernetes/pkg/volume/volume_unsupported.go
@@ -21,3 +21,7 @@ package volume
func SetVolumeOwnership(mounter Mounter, fsGroup *int64) error {
return nil
}
+
+func IsSameFSGroup(dir string, fsGroup int64) (bool, int, error) {
+ return true, int(fsGroup), nil
+}