From 594ac4a14658a90ad7bc5541dab6349a0c629a5c Mon Sep 17 00:00:00 2001 From: Matthew Heon Date: Mon, 16 Nov 2020 14:32:12 -0500 Subject: Add API for communicating with Docker volume plugins Docker provides extensibility through a plugin system, of which several types are available. This provides an initial library API for communicating with one type of plugins, volume plugins. Volume plugins allow for an external service to create and manage a volume on Podman's behalf. This does not integrate the plugin system into Libpod or Podman yet; that will come in subsequent pull requests. Signed-off-by: Matthew Heon --- libpod/plugin/volume_api.go | 454 ++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 454 insertions(+) create mode 100644 libpod/plugin/volume_api.go (limited to 'libpod/plugin/volume_api.go') diff --git a/libpod/plugin/volume_api.go b/libpod/plugin/volume_api.go new file mode 100644 index 000000000..2500a4f36 --- /dev/null +++ b/libpod/plugin/volume_api.go @@ -0,0 +1,454 @@ +package plugin + +import ( + "bytes" + "fmt" + "io/ioutil" + "net/http" + "os" + "path/filepath" + "strings" + "sync" + "time" + + "github.com/containers/podman/v2/libpod/define" + "github.com/docker/go-plugins-helpers/sdk" + "github.com/docker/go-plugins-helpers/volume" + jsoniter "github.com/json-iterator/go" + "github.com/pkg/errors" + "github.com/sirupsen/logrus" +) + +var json = jsoniter.ConfigCompatibleWithStandardLibrary + +// TODO: We should add syntax for specifying plugins to containers.conf, and +// support for loading based on that. + +// Copied from docker/go-plugins-helpers/volume/api.go - not exported, so we +// need to do this to get at them. +// These are well-established paths that should not change unless the plugin API +// version changes. +var ( + activatePath = "/Plugin.Activate" + createPath = "/VolumeDriver.Create" + getPath = "/VolumeDriver.Get" + listPath = "/VolumeDriver.List" + removePath = "/VolumeDriver.Remove" + hostVirtualPath = "/VolumeDriver.Path" + mountPath = "/VolumeDriver.Mount" + unmountPath = "/VolumeDriver.Unmount" + // nolint + capabilitiesPath = "/VolumeDriver.Capabilities" +) + +const ( + defaultTimeout = 5 * time.Second + defaultPath = "/run/docker/plugins" + volumePluginType = "VolumeDriver" +) + +var ( + ErrNotPlugin = errors.New("target does not appear to be a valid plugin") + ErrNotVolumePlugin = errors.New("plugin is not a volume plugin") + ErrPluginRemoved = errors.New("plugin is no longer available (shut down?)") + + // This stores available, initialized volume plugins. + pluginsLock sync.Mutex + plugins map[string]*VolumePlugin +) + +// VolumePlugin is a single volume plugin. +type VolumePlugin struct { + // Name is the name of the volume plugin. This will be used to refer to + // it. + Name string + // SocketPath is the unix socket at which the plugin is accessed. + SocketPath string +} + +// This is the response from the activate endpoint of the API. +type activateResponse struct { + Implements []string +} + +// Validate that the given plugin is good to use. +// Add it to available plugins if so. +func validatePlugin(newPlugin *VolumePlugin) error { + // It's a socket. Is it a plugin? + // Hit the Activate endpoint to find out if it is, and if so what kind + req, err := http.NewRequest("POST", activatePath, nil) + if err != nil { + return errors.Wrapf(err, "error making request to volume plugin %s activation endpoint", newPlugin.Name) + } + + req.Header.Set("Host", newPlugin.getURI()) + req.Header.Set("Content-Type", sdk.DefaultContentTypeV1_1) + + client := new(http.Client) + client.Timeout = defaultTimeout + resp, err := client.Do(req) + if err != nil { + return errors.Wrapf(err, "error sending request to plugin %s activation endpoint", newPlugin.Name) + } + defer resp.Body.Close() + + // Response code MUST be 200. Anything else, we have to assume it's not + // a valid plugin. + if resp.StatusCode != 200 { + return errors.Wrapf(ErrNotPlugin, "got status code %d from activation endpoint for plugin %s", resp.StatusCode, newPlugin.Name) + } + + // Read and decode the body so we can tell if this is a volume plugin. + respBytes, err := ioutil.ReadAll(resp.Body) + if err != nil { + return errors.Wrapf(err, "error reading activation response body from plugin %s", newPlugin.Name) + } + + respStruct := new(activateResponse) + if err := json.Unmarshal(respBytes, respStruct); err != nil { + return errors.Wrapf(err, "error unmarshalling plugin %s activation response", newPlugin.Name) + } + + foundVolume := false + for _, pluginType := range respStruct.Implements { + if pluginType == volumePluginType { + foundVolume = true + break + } + } + + if !foundVolume { + return errors.Wrapf(ErrNotVolumePlugin, "plugin %s does not implement volume plugin, instead provides %s", newPlugin.Name, strings.Join(respStruct.Implements, ", ")) + } + + plugins[newPlugin.Name] = newPlugin + + return nil +} + +// GetVolumePlugin gets a single volume plugin by path. +// TODO: We should not be auto-completing based on a default path; we should +// require volumes to have been pre-specified in containers.conf (will need a +// function to pre-populate the plugins list, and we should probably do a lazy +// initialization there to not slow things down too much). +func GetVolumePlugin(name string) (*VolumePlugin, error) { + pluginsLock.Lock() + defer pluginsLock.Unlock() + + plugin, exists := plugins[name] + if exists { + return plugin, nil + } + + // It's not cached. We need to get it. + + newPlugin := new(VolumePlugin) + newPlugin.Name = name + newPlugin.SocketPath = filepath.Join(defaultPath, fmt.Sprintf("%s.sock", name)) + + stat, err := os.Stat(newPlugin.SocketPath) + if err != nil { + return nil, errors.Wrapf(err, "cannot access plugin %s socket %q", name, newPlugin.SocketPath) + } + if stat.Mode()&os.ModeSocket == 0 { + return nil, errors.Wrapf(ErrNotPlugin, "volume %s path %q is not a unix socket", name, newPlugin.SocketPath) + } + + if err := validatePlugin(newPlugin); err != nil { + return nil, err + } + + return newPlugin, nil +} + +func (p *VolumePlugin) getURI() string { + return "unix://" + p.SocketPath +} + +// Verify the plugin is still available. +// TODO: Do we want to ping with an HTTP request? There's no ping endpoint so +// we'd need to hit Activate or Capabilities? +func (p *VolumePlugin) verifyReachable() error { + if _, err := os.Stat(p.SocketPath); err != nil { + if os.IsNotExist(err) { + pluginsLock.Lock() + defer pluginsLock.Unlock() + delete(plugins, p.Name) + return errors.Wrapf(ErrPluginRemoved, p.Name) + } + + return errors.Wrapf(err, "error accessing plugin %s", p.Name) + } + return nil +} + +// Send a request to the volume plugin for handling. +func (p *VolumePlugin) sendRequest(toJSON interface{}, hasBody bool, endpoint string) (*http.Response, error) { + var ( + reqJSON []byte + err error + ) + + if hasBody { + reqJSON, err = json.Marshal(toJSON) + if err != nil { + return nil, errors.Wrapf(err, "error marshalling request JSON for volume plugin %s endpoint %s", p.Name, endpoint) + } + } + + req, err := http.NewRequest("POST", endpoint, bytes.NewReader(reqJSON)) + if err != nil { + return nil, errors.Wrapf(err, "error making request to volume plugin %s endpoint %s", p.Name, endpoint) + } + + req.Header.Set("Host", p.getURI()) + req.Header.Set("Content-Type", sdk.DefaultContentTypeV1_1) + + client := new(http.Client) + client.Timeout = defaultTimeout + resp, err := client.Do(req) + if err != nil { + return nil, errors.Wrapf(err, "error sending request to volume plugin %s endpoint %s", p.Name, endpoint) + } + defer resp.Body.Close() + + return resp, nil +} + +// Turn an error response from a volume plugin into a well-formatted Go error. +func (p *VolumePlugin) makeErrorResponse(err, endpoint, volName string) error { + if err == "" { + err = "empty error from plugin" + } + if volName != "" { + return errors.Wrapf(errors.New(err), "error on %s on volume %s in volume plugin %s", endpoint, volName, p.Name) + } else { + return errors.Wrapf(errors.New(err), "error on %s in volume plugin %s", endpoint, p.Name) + } +} + +// Handle error responses from plugin +func (p *VolumePlugin) handleErrorResponse(resp *http.Response, endpoint, volName string) error { + // The official plugin reference implementation uses HTTP 500 for + // errors, but I don't think we can guarantee all plugins do that. + // Let's interpret anything other than 200 as an error. + // If there isn't an error, don't even bother decoding the response. + if resp.StatusCode != 200 { + errResp, err := ioutil.ReadAll(resp.Body) + if err != nil { + return errors.Wrapf(err, "error reading response body from volume plugin %s", p.Name) + } + + errStruct := new(volume.ErrorResponse) + if err := json.Unmarshal(errResp, errStruct); err != nil { + return errors.Wrapf(err, "error unmarshalling JSON response from volume plugin %s", p.Name) + } + + return p.makeErrorResponse(errStruct.Err, endpoint, volName) + } + + return nil +} + +// CreateVolume creates a volume in the plugin. +func (p *VolumePlugin) CreateVolume(req *volume.CreateRequest) error { + if req == nil { + return errors.Wrapf(define.ErrInvalidArg, "must provide non-nil request to CreateVolume") + } + + if err := p.verifyReachable(); err != nil { + return err + } + + logrus.Infof("Creating volume %s using plugin %s", req.Name, p.Name) + + resp, err := p.sendRequest(req, true, createPath) + if err != nil { + return err + } + defer resp.Body.Close() + + return p.handleErrorResponse(resp, createPath, req.Name) +} + +// ListVolumes lists volumes available in the plugin. +func (p *VolumePlugin) ListVolumes() ([]*volume.Volume, error) { + if err := p.verifyReachable(); err != nil { + return nil, err + } + + logrus.Infof("Listing volumes using plugin %s", p.Name) + + resp, err := p.sendRequest(nil, false, listPath) + if err != nil { + return nil, err + } + defer resp.Body.Close() + + if err := p.handleErrorResponse(resp, listPath, ""); err != nil { + return nil, err + } + + // TODO: Can probably unify response reading under a helper + volumeRespBytes, err := ioutil.ReadAll(resp.Body) + if err != nil { + return nil, errors.Wrapf(err, "error reading response body from volume plugin %s", p.Name) + } + + volumeResp := new(volume.ListResponse) + if err := json.Unmarshal(volumeRespBytes, volumeResp); err != nil { + return nil, errors.Wrapf(err, "error unmarshalling volume plugin %s list response", p.Name) + } + + return volumeResp.Volumes, nil +} + +// GetVolume gets a single volume from the plugin. +func (p *VolumePlugin) GetVolume(req *volume.GetRequest) (*volume.Volume, error) { + if req == nil { + return nil, errors.Wrapf(define.ErrInvalidArg, "must provide non-nil request to GetVolume") + } + + if err := p.verifyReachable(); err != nil { + return nil, err + } + + logrus.Infof("Getting volume %s using plugin %s", req.Name, p.Name) + + resp, err := p.sendRequest(req, true, getPath) + if err != nil { + return nil, err + } + defer resp.Body.Close() + + if err := p.handleErrorResponse(resp, getPath, req.Name); err != nil { + return nil, err + } + + getRespBytes, err := ioutil.ReadAll(resp.Body) + if err != nil { + return nil, errors.Wrapf(err, "error reading response body from volume plugin %s", p.Name) + } + + getResp := new(volume.GetResponse) + if err := json.Unmarshal(getRespBytes, getResp); err != nil { + return nil, errors.Wrapf(err, "error unmarshalling volume plugin %s get response", p.Name) + } + + return getResp.Volume, nil +} + +// RemoveVolume removes a single volume from the plugin. +func (p *VolumePlugin) RemoveVolume(req *volume.RemoveRequest) error { + if req == nil { + return errors.Wrapf(define.ErrInvalidArg, "must provide non-nil request to RemoveVolume") + } + + if err := p.verifyReachable(); err != nil { + return err + } + + logrus.Infof("Removing volume %s using plugin %s", req.Name, p.Name) + + resp, err := p.sendRequest(req, true, removePath) + if err != nil { + return err + } + defer resp.Body.Close() + + return p.handleErrorResponse(resp, removePath, req.Name) +} + +// GetVolumePath gets the path the given volume is mounted at. +func (p *VolumePlugin) GetVolumePath(req *volume.PathRequest) (string, error) { + if req == nil { + return "", errors.Wrapf(define.ErrInvalidArg, "must provide non-nil request to GetVolumePath") + } + + if err := p.verifyReachable(); err != nil { + return "", err + } + + logrus.Infof("Getting volume %s path using plugin %s", req.Name, p.Name) + + resp, err := p.sendRequest(req, true, hostVirtualPath) + if err != nil { + return "", err + } + defer resp.Body.Close() + + if err := p.handleErrorResponse(resp, hostVirtualPath, req.Name); err != nil { + return "", err + } + + pathRespBytes, err := ioutil.ReadAll(resp.Body) + if err != nil { + return "", errors.Wrapf(err, "error reading response body from volume plugin %s", p.Name) + } + + pathResp := new(volume.PathResponse) + if err := json.Unmarshal(pathRespBytes, pathResp); err != nil { + return "", errors.Wrapf(err, "error unmarshalling volume plugin %s path response", p.Name) + } + + return pathResp.Mountpoint, nil +} + +// MountVolume mounts the given volume. The ID argument is the ID of the +// mounting container, used for internal record-keeping by the plugin. Returns +// the path the volume has been mounted at. +func (p *VolumePlugin) MountVolume(req *volume.MountRequest) (string, error) { + if req == nil { + return "", errors.Wrapf(define.ErrInvalidArg, "must provide non-nil request to MountVolume") + } + + if err := p.verifyReachable(); err != nil { + return "", err + } + + logrus.Infof("Mounting volume %s using plugin %s for container %s", req.Name, p.Name, req.ID) + + resp, err := p.sendRequest(req, true, mountPath) + if err != nil { + return "", err + } + defer resp.Body.Close() + + if err := p.handleErrorResponse(resp, mountPath, req.Name); err != nil { + return "", err + } + + mountRespBytes, err := ioutil.ReadAll(resp.Body) + if err != nil { + return "", errors.Wrapf(err, "error reading response body from volume plugin %s", p.Name) + } + + mountResp := new(volume.MountResponse) + if err := json.Unmarshal(mountRespBytes, mountResp); err != nil { + return "", errors.Wrapf(err, "error unmarshalling volume plugin %s path response", p.Name) + } + + return mountResp.Mountpoint, nil +} + +// UnmountVolume unmounts the given volume. The ID argument is the ID of the +// container that is unmounting, used for internal record-keeping by the plugin. +func (p *VolumePlugin) UnmountVolume(req *volume.UnmountRequest) error { + if req == nil { + return errors.Wrapf(define.ErrInvalidArg, "must provide non-nil request to UnmountVolume") + } + + if err := p.verifyReachable(); err != nil { + return err + } + + logrus.Infof("Unmounting volume %s using plugin %s for container %s", req.Name, p.Name, req.ID) + + resp, err := p.sendRequest(req, true, unmountPath) + if err != nil { + return err + } + defer resp.Body.Close() + + return p.handleErrorResponse(resp, unmountPath, req.Name) +} -- cgit v1.2.3-54-g00ecf