From b53cb57680a6fd7b383636ac2d6cd71003532915 Mon Sep 17 00:00:00 2001 From: Matthew Heon Date: Fri, 4 Dec 2020 16:24:56 -0500 Subject: Initial implementation of volume plugins This implements support for mounting and unmounting volumes backed by volume plugins. Support for actually retrieving plugins requires a pull request to land in containers.conf and then that to be vendored, and as such is not yet ready. Given this, this code is only compile tested. However, the code for everything past retrieving the plugin has been written - there is support for creating, removing, mounting, and unmounting volumes, which should allow full functionality once the c/common PR is merged. A major change is the signature of the MountPoint function for volumes, which now, by necessity, returns an error. Named volumes managed by a plugin do not have a mountpoint we control; instead, it is managed entirely by the plugin. As such, we need to cache the path in the DB, and calls to retrieve it now need to access the DB (and may fail as such). Notably absent is support for SELinux relabelling and chowning these volumes. Given that we don't manage the mountpoint for these volumes, I am extremely reluctant to try and modify it - we could easily break the plugin trying to chown or relabel it. Also, we had no less than *5* separate implementations of inspecting a volume floating around in pkg/infra/abi and pkg/api/handlers/libpod. And none of them used volume.Inspect(), the only correct way of inspecting volumes. Remove them all and consolidate to using the correct way. Compat API is likely still doing things the wrong way, but that is an issue for another day. Fixes #4304 Signed-off-by: Matthew Heon --- libpod/plugin/volume_api.go | 55 ++++++++++++++++++++++++++++++--------------- 1 file changed, 37 insertions(+), 18 deletions(-) (limited to 'libpod/plugin') diff --git a/libpod/plugin/volume_api.go b/libpod/plugin/volume_api.go index 2500a4f36..c5dec651c 100644 --- a/libpod/plugin/volume_api.go +++ b/libpod/plugin/volume_api.go @@ -2,8 +2,9 @@ package plugin import ( "bytes" - "fmt" + "context" "io/ioutil" + "net" "net/http" "os" "path/filepath" @@ -43,7 +44,6 @@ var ( const ( defaultTimeout = 5 * time.Second - defaultPath = "/run/docker/plugins" volumePluginType = "VolumeDriver" ) @@ -64,6 +64,8 @@ type VolumePlugin struct { Name string // SocketPath is the unix socket at which the plugin is accessed. SocketPath string + // Client is the HTTP client we use to connect to the plugin. + Client *http.Client } // This is the response from the activate endpoint of the API. @@ -76,7 +78,7 @@ type activateResponse struct { func validatePlugin(newPlugin *VolumePlugin) error { // It's a socket. Is it a plugin? // Hit the Activate endpoint to find out if it is, and if so what kind - req, err := http.NewRequest("POST", activatePath, nil) + req, err := http.NewRequest("POST", "http://plugin"+activatePath, nil) if err != nil { return errors.Wrapf(err, "error making request to volume plugin %s activation endpoint", newPlugin.Name) } @@ -84,9 +86,7 @@ func validatePlugin(newPlugin *VolumePlugin) error { req.Header.Set("Host", newPlugin.getURI()) req.Header.Set("Content-Type", sdk.DefaultContentTypeV1_1) - client := new(http.Client) - client.Timeout = defaultTimeout - resp, err := client.Do(req) + resp, err := newPlugin.Client.Do(req) if err != nil { return errors.Wrapf(err, "error sending request to plugin %s activation endpoint", newPlugin.Name) } @@ -121,22 +121,28 @@ func validatePlugin(newPlugin *VolumePlugin) error { return errors.Wrapf(ErrNotVolumePlugin, "plugin %s does not implement volume plugin, instead provides %s", newPlugin.Name, strings.Join(respStruct.Implements, ", ")) } + if plugins == nil { + plugins = make(map[string]*VolumePlugin) + } + plugins[newPlugin.Name] = newPlugin return nil } -// GetVolumePlugin gets a single volume plugin by path. -// TODO: We should not be auto-completing based on a default path; we should -// require volumes to have been pre-specified in containers.conf (will need a -// function to pre-populate the plugins list, and we should probably do a lazy -// initialization there to not slow things down too much). -func GetVolumePlugin(name string) (*VolumePlugin, error) { +// GetVolumePlugin gets a single volume plugin, with the given name, at the +// given path. +func GetVolumePlugin(name string, path string) (*VolumePlugin, error) { pluginsLock.Lock() defer pluginsLock.Unlock() plugin, exists := plugins[name] if exists { + // This shouldn't be possible, but just in case... + if plugin.SocketPath != filepath.Clean(path) { + return nil, errors.Wrapf(define.ErrInvalidArg, "requested path %q for volume plugin %s does not match pre-existing path for plugin, %q", path, name, plugin.SocketPath) + } + return plugin, nil } @@ -144,7 +150,20 @@ func GetVolumePlugin(name string) (*VolumePlugin, error) { newPlugin := new(VolumePlugin) newPlugin.Name = name - newPlugin.SocketPath = filepath.Join(defaultPath, fmt.Sprintf("%s.sock", name)) + newPlugin.SocketPath = filepath.Clean(path) + + // Need an HTTP client to force a Unix connection. + // And since we can reuse it, might as well cache it. + client := new(http.Client) + client.Timeout = defaultTimeout + // This bit borrowed from pkg/bindings/connection.go + client.Transport = &http.Transport{ + DialContext: func(ctx context.Context, _, _ string) (net.Conn, error) { + return (&net.Dialer{}).DialContext(ctx, "unix", newPlugin.SocketPath) + }, + DisableCompression: true, + } + newPlugin.Client = client stat, err := os.Stat(newPlugin.SocketPath) if err != nil { @@ -183,6 +202,7 @@ func (p *VolumePlugin) verifyReachable() error { } // Send a request to the volume plugin for handling. +// Callers *MUST* close the response when they are done. func (p *VolumePlugin) sendRequest(toJSON interface{}, hasBody bool, endpoint string) (*http.Response, error) { var ( reqJSON []byte @@ -196,7 +216,7 @@ func (p *VolumePlugin) sendRequest(toJSON interface{}, hasBody bool, endpoint st } } - req, err := http.NewRequest("POST", endpoint, bytes.NewReader(reqJSON)) + req, err := http.NewRequest("POST", "http://plugin"+endpoint, bytes.NewReader(reqJSON)) if err != nil { return nil, errors.Wrapf(err, "error making request to volume plugin %s endpoint %s", p.Name, endpoint) } @@ -204,13 +224,12 @@ func (p *VolumePlugin) sendRequest(toJSON interface{}, hasBody bool, endpoint st req.Header.Set("Host", p.getURI()) req.Header.Set("Content-Type", sdk.DefaultContentTypeV1_1) - client := new(http.Client) - client.Timeout = defaultTimeout - resp, err := client.Do(req) + resp, err := p.Client.Do(req) if err != nil { return nil, errors.Wrapf(err, "error sending request to volume plugin %s endpoint %s", p.Name, endpoint) } - defer resp.Body.Close() + // We are *deliberately not closing* response here. It is the + // responsibility of the caller to do so after reading the response. return resp, nil } -- cgit v1.2.3-54-g00ecf