diff options
Diffstat (limited to 'pkg/varlinkapi')
-rw-r--r-- | pkg/varlinkapi/attach.go | 21 | ||||
-rw-r--r-- | pkg/varlinkapi/containers.go | 205 | ||||
-rw-r--r-- | pkg/varlinkapi/images.go | 104 | ||||
-rw-r--r-- | pkg/varlinkapi/system.go | 6 | ||||
-rw-r--r-- | pkg/varlinkapi/transfers.go | 8 | ||||
-rw-r--r-- | pkg/varlinkapi/util.go | 45 | ||||
-rw-r--r-- | pkg/varlinkapi/virtwriter/virtwriter.go | 141 |
7 files changed, 353 insertions, 177 deletions
diff --git a/pkg/varlinkapi/attach.go b/pkg/varlinkapi/attach.go index 2234899a5..1f8d48eb9 100644 --- a/pkg/varlinkapi/attach.go +++ b/pkg/varlinkapi/attach.go @@ -8,6 +8,7 @@ import ( "github.com/containers/libpod/cmd/podman/varlink" "github.com/containers/libpod/libpod" + "github.com/containers/libpod/libpod/define" "github.com/containers/libpod/pkg/varlinkapi/virtwriter" "github.com/sirupsen/logrus" "k8s.io/client-go/tools/remotecommand" @@ -57,30 +58,34 @@ func (i *LibpodAPI) Attach(call iopodman.VarlinkCall, name string, detachKeys st if err != nil { return call.ReplyErrorOccurred(err.Error()) } - if !start && state != libpod.ContainerStateRunning { + if !start && state != define.ContainerStateRunning { return call.ReplyErrorOccurred("container must be running to attach") } - call.Reply(nil) + + // ACK the client upgrade request + call.ReplyAttach() + reader, writer, _, pw, streams := setupStreams(call) go func() { - if err := virtwriter.Reader(reader, nil, nil, pw, resize); err != nil { + if err := virtwriter.Reader(reader, nil, nil, pw, resize, nil); err != nil { errChan <- err } }() - if state == libpod.ContainerStateRunning { + if state == define.ContainerStateRunning { finalErr = attach(ctr, streams, detachKeys, resize, errChan) } else { finalErr = startAndAttach(ctr, streams, detachKeys, resize, errChan) } - if finalErr != libpod.ErrDetach && finalErr != nil { + if finalErr != define.ErrDetach && finalErr != nil { logrus.Error(finalErr) } - quitWriter := virtwriter.NewVirtWriteCloser(writer, virtwriter.Quit) - _, err = quitWriter.Write([]byte("HANG-UP")) - // TODO error handling is not quite right here yet + + if err = virtwriter.HangUp(writer, 0); err != nil { + logrus.Errorf("Failed to HANG-UP attach to %s: %s", ctr.ID(), err.Error()) + } return call.Writer.Flush() } diff --git a/pkg/varlinkapi/containers.go b/pkg/varlinkapi/containers.go index 8611a1a7d..cd5f305c9 100644 --- a/pkg/varlinkapi/containers.go +++ b/pkg/varlinkapi/containers.go @@ -16,10 +16,14 @@ import ( "github.com/containers/libpod/cmd/podman/shared" "github.com/containers/libpod/cmd/podman/varlink" "github.com/containers/libpod/libpod" + "github.com/containers/libpod/libpod/define" + "github.com/containers/libpod/libpod/logs" "github.com/containers/libpod/pkg/adapter/shortcuts" - cc "github.com/containers/libpod/pkg/spec" + "github.com/containers/libpod/pkg/varlinkapi/virtwriter" "github.com/containers/storage/pkg/archive" "github.com/pkg/errors" + "github.com/sirupsen/logrus" + "k8s.io/client-go/tools/remotecommand" ) // ListContainers ... @@ -64,32 +68,34 @@ func (i *LibpodAPI) Ps(call iopodman.VarlinkCall, opts iopodman.PsOpts) error { for _, ctr := range psContainerOutputs { container := iopodman.PsContainer{ - Id: ctr.ID, - Image: ctr.Image, - Command: ctr.Command, - Created: ctr.Created, - Ports: ctr.Ports, - Names: ctr.Names, - IsInfra: ctr.IsInfra, - Status: ctr.Status, - State: ctr.State.String(), - PidNum: int64(ctr.Pid), - RootFsSize: ctr.Size.RootFsSize, - RwSize: ctr.Size.RwSize, - Pod: ctr.Pod, - CreatedAt: ctr.CreatedAt.Format(time.RFC3339Nano), - ExitedAt: ctr.ExitedAt.Format(time.RFC3339Nano), - StartedAt: ctr.StartedAt.Format(time.RFC3339Nano), - Labels: ctr.Labels, - NsPid: ctr.PID, - Cgroup: ctr.Cgroup, - Ipc: ctr.Cgroup, - Mnt: ctr.MNT, - Net: ctr.NET, - PidNs: ctr.PIDNS, - User: ctr.User, - Uts: ctr.UTS, - Mounts: ctr.Mounts, + Id: ctr.ID, + Image: ctr.Image, + Command: ctr.Command, + Created: ctr.Created, + Ports: ctr.Ports, + Names: ctr.Names, + IsInfra: ctr.IsInfra, + Status: ctr.Status, + State: ctr.State.String(), + PidNum: int64(ctr.Pid), + Pod: ctr.Pod, + CreatedAt: ctr.CreatedAt.Format(time.RFC3339Nano), + ExitedAt: ctr.ExitedAt.Format(time.RFC3339Nano), + StartedAt: ctr.StartedAt.Format(time.RFC3339Nano), + Labels: ctr.Labels, + NsPid: ctr.PID, + Cgroup: ctr.Cgroup, + Ipc: ctr.Cgroup, + Mnt: ctr.MNT, + Net: ctr.NET, + PidNs: ctr.PIDNS, + User: ctr.User, + Uts: ctr.UTS, + Mounts: ctr.Mounts, + } + if ctr.Size != nil { + container.RootFsSize = ctr.Size.RootFsSize + container.RwSize = ctr.Size.RwSize } containers = append(containers, container) } @@ -119,7 +125,7 @@ func (i *LibpodAPI) GetContainersByContext(call iopodman.VarlinkCall, all, lates ctrs, err := shortcuts.GetContainersByContext(all, latest, input, i.Runtime) if err != nil { - if errors.Cause(err) == libpod.ErrNoSuchCtr { + if errors.Cause(err) == define.ErrNoSuchCtr { return call.ReplyContainerNotFound("", err.Error()) } return call.ReplyErrorOccurred(err.Error()) @@ -138,7 +144,7 @@ func (i *LibpodAPI) GetContainersByStatus(call iopodman.VarlinkCall, statuses [] containers []iopodman.Container ) for _, status := range statuses { - lpstatus, err := libpod.StringToContainerStatus(status) + lpstatus, err := define.StringToContainerStatus(status) if err != nil { return call.ReplyErrorOccurred(err.Error()) } @@ -168,16 +174,7 @@ func (i *LibpodAPI) InspectContainer(call iopodman.VarlinkCall, name string) err if err != nil { return call.ReplyContainerNotFound(name, err.Error()) } - inspectInfo, err := ctr.Inspect(true) - if err != nil { - return call.ReplyErrorOccurred(err.Error()) - } - artifact, err := getArtifact(ctr) - if err != nil { - return call.ReplyErrorOccurred(err.Error()) - } - - data, err := shared.GetCtrInspectInfo(ctr.Config(), inspectInfo, artifact) + data, err := ctr.Inspect(true) if err != nil { return call.ReplyErrorOccurred(err.Error()) } @@ -198,7 +195,7 @@ func (i *LibpodAPI) ListContainerProcesses(call iopodman.VarlinkCall, name strin if err != nil { return call.ReplyErrorOccurred(err.Error()) } - if containerState != libpod.ContainerStateRunning { + if containerState != define.ContainerStateRunning { return call.ReplyErrorOccurred(fmt.Sprintf("container %s is not running", name)) } var psArgs []string @@ -229,7 +226,7 @@ func (i *LibpodAPI) GetContainerLogs(call iopodman.VarlinkCall, name string) err return call.ReplyErrorOccurred(err.Error()) } if _, err := os.Stat(logPath); err != nil { - if containerState == libpod.ContainerStateConfigured { + if containerState == define.ContainerStateConfigured { return call.ReplyGetContainerLogs(logs) } } @@ -259,7 +256,7 @@ func (i *LibpodAPI) GetContainerLogs(call iopodman.VarlinkCall, name string) err if err != nil { return call.ReplyErrorOccurred(err.Error()) } - if state != libpod.ContainerStateRunning && state != libpod.ContainerStatePaused { + if state != define.ContainerStateRunning && state != define.ContainerStatePaused { return call.ReplyErrorOccurred(fmt.Sprintf("%s is no longer running", ctr.ID())) } @@ -326,7 +323,7 @@ func (i *LibpodAPI) GetContainerStats(call iopodman.VarlinkCall, name string) er } containerStats, err := ctr.GetContainerStats(&libpod.ContainerStats{}) if err != nil { - if errors.Cause(err) == libpod.ErrCtrStateInvalid { + if errors.Cause(err) == define.ErrCtrStateInvalid { return call.ReplyNoContainerRunning() } return call.ReplyErrorOccurred(err.Error()) @@ -359,7 +356,7 @@ func (i *LibpodAPI) StartContainer(call iopodman.VarlinkCall, name string) error if err != nil { return call.ReplyErrorOccurred(err.Error()) } - if state == libpod.ContainerStateRunning || state == libpod.ContainerStatePaused { + if state == define.ContainerStateRunning || state == define.ContainerStatePaused { return call.ReplyErrorOccurred("container is already running or paused") } recursive := false @@ -379,7 +376,7 @@ func (i *LibpodAPI) InitContainer(call iopodman.VarlinkCall, name string) error return call.ReplyContainerNotFound(name, err.Error()) } if err := ctr.Init(getContext()); err != nil { - if errors.Cause(err) == libpod.ErrCtrStateInvalid { + if errors.Cause(err) == define.ErrCtrStateInvalid { return call.ReplyInvalidState(ctr.ID(), err.Error()) } return call.ReplyErrorOccurred(err.Error()) @@ -394,10 +391,10 @@ func (i *LibpodAPI) StopContainer(call iopodman.VarlinkCall, name string, timeou return call.ReplyContainerNotFound(name, err.Error()) } if err := ctr.StopWithTimeout(uint(timeout)); err != nil { - if errors.Cause(err) == libpod.ErrCtrStopped { + if errors.Cause(err) == define.ErrCtrStopped { return call.ReplyErrCtrStopped(ctr.ID()) } - if errors.Cause(err) == libpod.ErrCtrStateInvalid { + if errors.Cause(err) == define.ErrCtrStateInvalid { return call.ReplyInvalidState(ctr.ID(), err.Error()) } return call.ReplyErrorOccurred(err.Error()) @@ -420,7 +417,7 @@ func (i *LibpodAPI) RestartContainer(call iopodman.VarlinkCall, name string, tim // ContainerExists looks in local storage for the existence of a container func (i *LibpodAPI) ContainerExists(call iopodman.VarlinkCall, name string) error { _, err := i.Runtime.LookupContainer(name) - if errors.Cause(err) == libpod.ErrNoSuchCtr { + if errors.Cause(err) == define.ErrNoSuchCtr { return call.ReplyContainerExists(1) } if err != nil { @@ -510,7 +507,7 @@ func (i *LibpodAPI) DeleteStoppedContainers(call iopodman.VarlinkCall) error { if err != nil { return call.ReplyErrorOccurred(err.Error()) } - if state != libpod.ContainerStateRunning { + if state != define.ContainerStateRunning { if err := i.Runtime.RemoveContainer(ctx, ctr, false, false); err != nil { return call.ReplyErrorOccurred(err.Error()) } @@ -534,7 +531,7 @@ func (i *LibpodAPI) GetAttachSockets(call iopodman.VarlinkCall, name string) err // If the container hasn't been run, we need to run init // so the conmon sockets get created. - if status == libpod.ContainerStateConfigured || status == libpod.ContainerStateStopped { + if status == define.ContainerStateConfigured || status == define.ContainerStateStopped { if err := ctr.Init(getContext()); err != nil { return call.ReplyErrorOccurred(err.Error()) } @@ -585,18 +582,6 @@ func (i *LibpodAPI) ContainerRestore(call iopodman.VarlinkCall, name string, kee return call.ReplyContainerRestore(ctr.ID()) } -func getArtifact(ctr *libpod.Container) (*cc.CreateConfig, error) { - var createArtifact cc.CreateConfig - artifact, err := ctr.GetArtifact("create-config") - if err != nil { - return nil, err - } - if err := json.Unmarshal(artifact, &createArtifact); err != nil { - return nil, err - } - return &createArtifact, nil -} - // ContainerConfig returns just the container.config struct func (i *LibpodAPI) ContainerConfig(call iopodman.VarlinkCall, name string) error { ctr, err := i.Runtime.LookupContainer(name) @@ -719,7 +704,7 @@ func (i *LibpodAPI) GetContainersLogs(call iopodman.VarlinkCall, names []string, if err != nil { return call.ReplyErrorOccurred(err.Error()) } - options := libpod.LogOptions{ + options := logs.LogOptions{ Follow: follow, Since: sinceTime, Tail: uint64(tail), @@ -730,7 +715,7 @@ func (i *LibpodAPI) GetContainersLogs(call iopodman.VarlinkCall, names []string, if len(names) > 1 { options.Multi = true } - logChannel := make(chan *libpod.LogLine, int(tail)*len(names)+1) + logChannel := make(chan *logs.LogLine, int(tail)*len(names)+1) containers, err := shortcuts.GetContainersByContext(false, latest, names, i.Runtime) if err != nil { return call.ReplyErrorOccurred(err.Error()) @@ -752,7 +737,7 @@ func (i *LibpodAPI) GetContainersLogs(call iopodman.VarlinkCall, names []string, return call.ReplyGetContainersLogs(iopodman.LogLine{}) } -func newPodmanLogLine(line *libpod.LogLine) iopodman.LogLine { +func newPodmanLogLine(line *logs.LogLine) iopodman.LogLine { return iopodman.LogLine{ Device: line.Device, ParseLogType: line.ParseLogType, @@ -774,3 +759,93 @@ func (i *LibpodAPI) Top(call iopodman.VarlinkCall, nameOrID string, descriptors } return call.ReplyTop(topInfo) } + +// ExecContainer is the varlink endpoint to execute a command in a container +func (i *LibpodAPI) ExecContainer(call iopodman.VarlinkCall, opts iopodman.ExecOpts) error { + if !call.WantsUpgrade() { + return call.ReplyErrorOccurred("client must use upgraded connection to exec") + } + + ctr, err := i.Runtime.LookupContainer(opts.Name) + if err != nil { + return call.ReplyContainerNotFound(opts.Name, err.Error()) + } + + state, err := ctr.State() + if err != nil { + return call.ReplyErrorOccurred( + fmt.Sprintf("exec failed to obtain container %s state: %s", ctr.ID(), err.Error())) + } + + if state != define.ContainerStateRunning { + return call.ReplyErrorOccurred( + fmt.Sprintf("exec requires a running container, %s is %s", ctr.ID(), state.String())) + } + + // ACK the client upgrade request + call.ReplyExecContainer() + + envs := []string{} + if opts.Env != nil { + envs = *opts.Env + } + + var user string + if opts.User != nil { + user = *opts.User + } + + var workDir string + if opts.Workdir != nil { + workDir = *opts.Workdir + } + + var detachKeys string + if opts.DetachKeys != nil { + detachKeys = *opts.DetachKeys + } + + resizeChan := make(chan remotecommand.TerminalSize) + + reader, writer, _, pipeWriter, streams := setupStreams(call) + + type ExitCodeError struct { + ExitCode uint32 + Error error + } + ecErrChan := make(chan ExitCodeError, 1) + + go func() { + if err := virtwriter.Reader(reader, nil, nil, pipeWriter, resizeChan, nil); err != nil { + ecErrChan <- ExitCodeError{ + define.ExecErrorCodeGeneric, + err, + } + } + }() + + go func() { + ec, err := ctr.Exec(opts.Tty, opts.Privileged, envs, opts.Cmd, user, workDir, streams, 0, resizeChan, detachKeys) + if err != nil { + logrus.Errorf(err.Error()) + } + ecErrChan <- ExitCodeError{ + uint32(ec), + err, + } + }() + + ecErr := <-ecErrChan + + exitCode := define.TranslateExecErrorToExitCode(int(ecErr.ExitCode), ecErr.Error) + + if err = virtwriter.HangUp(writer, uint32(exitCode)); err != nil { + logrus.Errorf("ExecContainer failed to HANG-UP on %s: %s", ctr.ID(), err.Error()) + } + + if err := call.Writer.Flush(); err != nil { + logrus.Errorf("Exec Container err: %s", err.Error()) + } + + return ecErr.Error +} diff --git a/pkg/varlinkapi/images.go b/pkg/varlinkapi/images.go index fa1a0a109..739a3e582 100644 --- a/pkg/varlinkapi/images.go +++ b/pkg/varlinkapi/images.go @@ -23,7 +23,9 @@ import ( "github.com/containers/libpod/cmd/podman/shared" "github.com/containers/libpod/cmd/podman/varlink" "github.com/containers/libpod/libpod" + "github.com/containers/libpod/libpod/define" "github.com/containers/libpod/libpod/image" + "github.com/containers/libpod/pkg/channelwriter" "github.com/containers/libpod/pkg/util" "github.com/containers/libpod/utils" "github.com/containers/storage/pkg/archive" @@ -67,6 +69,7 @@ func (i *LibpodAPI) ListImages(call iopodman.VarlinkCall) error { Containers: int64(len(containers)), Labels: labels, IsParent: isParent, + ReadOnly: image.IsReadOnly(), } imageList = append(imageList, i) } @@ -107,6 +110,7 @@ func (i *LibpodAPI) GetImage(call iopodman.VarlinkCall, id string) error { Containers: int64(len(containers)), Labels: labels, TopLayer: newImage.TopLayer(), + ReadOnly: newImage.IsReadOnly(), } return call.ReplyGetImage(il) } @@ -371,7 +375,6 @@ func (i *LibpodAPI) PushImage(call iopodman.VarlinkCall, name, tag string, compr done = true default: if !call.WantsMore() { - time.Sleep(1 * time.Second) break } br := iopodman.MoreResponse{ @@ -495,6 +498,19 @@ func (i *LibpodAPI) DeleteUnusedImages(call iopodman.VarlinkCall) error { // Commit ... func (i *LibpodAPI) Commit(call iopodman.VarlinkCall, name, imageName string, changes []string, author, message string, pause bool, manifestType string) error { + var ( + newImage *image.Image + log []string + mimeType string + ) + output := channelwriter.NewChannelWriter() + channelClose := func() { + if err := output.Close(); err != nil { + logrus.Errorf("failed to close channel writer: %q", err) + } + } + defer channelClose() + ctr, err := i.Runtime.LookupContainer(name) if err != nil { return call.ReplyContainerNotFound(name, err.Error()) @@ -504,7 +520,6 @@ func (i *LibpodAPI) Commit(call iopodman.VarlinkCall, name, imageName string, ch return call.ReplyErrorOccurred(err.Error()) } sc := image.GetSystemContext(rtc.SignaturePolicyPath, "", false) - var mimeType string switch manifestType { case "oci", "": //nolint mimeType = buildah.OCIv1ImageManifest @@ -515,7 +530,7 @@ func (i *LibpodAPI) Commit(call iopodman.VarlinkCall, name, imageName string, ch } coptions := buildah.CommitOptions{ SignaturePolicyPath: rtc.SignaturePolicyPath, - ReportWriter: nil, + ReportWriter: output, SystemContext: sc, PreferredManifestType: mimeType, } @@ -527,11 +542,36 @@ func (i *LibpodAPI) Commit(call iopodman.VarlinkCall, name, imageName string, ch Author: author, } - newImage, err := ctr.Commit(getContext(), imageName, options) + if call.WantsMore() { + call.Continues = true + } + + c := make(chan error) + defer close(c) + + go func() { + newImage, err = ctr.Commit(getContext(), imageName, options) + if err != nil { + c <- err + } + c <- nil + }() + + // reply is the func being sent to the output forwarder. in this case it is replying + // with a more response struct + reply := func(br iopodman.MoreResponse) error { + return call.ReplyCommit(br) + } + log, err = forwardOutput(log, c, call.WantsMore(), output, reply) if err != nil { return call.ReplyErrorOccurred(err.Error()) } - return call.ReplyCommit(newImage.ID()) + call.Continues = false + br := iopodman.MoreResponse{ + Logs: log, + Id: newImage.ID(), + } + return call.ReplyCommit(br) } // ImportImage imports an image from a tarball to the image store @@ -583,6 +623,7 @@ func (i *LibpodAPI) ExportImage(call iopodman.VarlinkCall, name, destination str func (i *LibpodAPI) PullImage(call iopodman.VarlinkCall, name string) error { var ( imageID string + err error ) dockerRegistryOptions := image.DockerRegistryOptions{} so := image.SigningOptions{} @@ -590,8 +631,16 @@ func (i *LibpodAPI) PullImage(call iopodman.VarlinkCall, name string) error { if call.WantsMore() { call.Continues = true } - output := bytes.NewBuffer([]byte{}) + output := channelwriter.NewChannelWriter() + channelClose := func() { + if err := output.Close(); err != nil { + logrus.Errorf("failed to close channel writer: %q", err) + } + } + defer channelClose() c := make(chan error) + defer close(c) + go func() { if strings.HasPrefix(name, dockerarchive.Transport.Name()+":") { srcRef, err := alltransports.ParseImageName(name) @@ -613,44 +662,17 @@ func (i *LibpodAPI) PullImage(call iopodman.VarlinkCall, name string) error { } } c <- nil - close(c) }() var log []string - done := false - for { - line, err := output.ReadString('\n') - if err == nil { - log = append(log, line) - continue - } else if err == io.EOF { - select { - case err := <-c: - if err != nil { - logrus.Errorf("reading of output during pull failed for %s", name) - return call.ReplyErrorOccurred(err.Error()) - } - done = true - default: - if !call.WantsMore() { - time.Sleep(1 * time.Second) - break - } - br := iopodman.MoreResponse{ - Logs: log, - } - call.ReplyPullImage(br) - log = []string{} - } - } else { - return call.ReplyErrorOccurred(err.Error()) - } - if done { - break - } + reply := func(br iopodman.MoreResponse) error { + return call.ReplyPullImage(br) + } + log, err = forwardOutput(log, c, call.WantsMore(), output, reply) + if err != nil { + return call.ReplyErrorOccurred(err.Error()) } call.Continues = false - br := iopodman.MoreResponse{ Logs: log, Id: imageID, @@ -709,7 +731,7 @@ func (i *LibpodAPI) ImagesPrune(call iopodman.VarlinkCall, all bool) error { func (i *LibpodAPI) ImageSave(call iopodman.VarlinkCall, options iopodman.ImageSaveOptions) error { newImage, err := i.Runtime.ImageRuntime().NewFromLocal(options.Name) if err != nil { - if errors.Cause(err) == libpod.ErrNoSuchImage { + if errors.Cause(err) == define.ErrNoSuchImage { return call.ReplyImageNotFound(options.Name, err.Error()) } return call.ReplyErrorOccurred(err.Error()) @@ -764,7 +786,6 @@ func (i *LibpodAPI) ImageSave(call iopodman.VarlinkCall, options iopodman.ImageS done = true default: if !call.WantsMore() { - time.Sleep(1 * time.Second) break } br := iopodman.MoreResponse{ @@ -844,7 +865,6 @@ func (i *LibpodAPI) LoadImage(call iopodman.VarlinkCall, name, inputFile string, done = true default: if !call.WantsMore() { - time.Sleep(1 * time.Second) break } br := iopodman.MoreResponse{ diff --git a/pkg/varlinkapi/system.go b/pkg/varlinkapi/system.go index 59bfec75b..9b5b3a5b1 100644 --- a/pkg/varlinkapi/system.go +++ b/pkg/varlinkapi/system.go @@ -3,17 +3,17 @@ package varlinkapi import ( + "github.com/containers/libpod/libpod/define" goruntime "runtime" "strings" "time" "github.com/containers/libpod/cmd/podman/varlink" - "github.com/containers/libpod/libpod" ) // GetVersion ... func (i *LibpodAPI) GetVersion(call iopodman.VarlinkCall) error { - versionInfo, err := libpod.GetVersion() + versionInfo, err := define.GetVersion() if err != nil { return err } @@ -30,7 +30,7 @@ func (i *LibpodAPI) GetVersion(call iopodman.VarlinkCall) error { // GetInfo returns details about the podman host and its stores func (i *LibpodAPI) GetInfo(call iopodman.VarlinkCall) error { - versionInfo, err := libpod.GetVersion() + versionInfo, err := define.GetVersion() if err != nil { return err } diff --git a/pkg/varlinkapi/transfers.go b/pkg/varlinkapi/transfers.go index 96f76bcdc..31d26c3aa 100644 --- a/pkg/varlinkapi/transfers.go +++ b/pkg/varlinkapi/transfers.go @@ -26,7 +26,8 @@ func (i *LibpodAPI) SendFile(call iopodman.VarlinkCall, ftype string, length int defer outputFile.Close() if err = call.ReplySendFile(outputFile.Name()); err != nil { - return call.ReplyErrorOccurred(err.Error()) + // If an error occurs while sending the reply, return the error + return err } writer := bufio.NewWriter(outputFile) @@ -60,9 +61,10 @@ func (i *LibpodAPI) ReceiveFile(call iopodman.VarlinkCall, filepath string, dele } // Send the file length down to client - // Varlink connection upraded + // Varlink connection upgraded if err = call.ReplyReceiveFile(fileInfo.Size()); err != nil { - return call.ReplyErrorOccurred(err.Error()) + // If an error occurs while sending the reply, return the error + return err } reader := bufio.NewReader(fs) diff --git a/pkg/varlinkapi/util.go b/pkg/varlinkapi/util.go index 8716c963a..d3a41f7ab 100644 --- a/pkg/varlinkapi/util.go +++ b/pkg/varlinkapi/util.go @@ -12,6 +12,8 @@ import ( "github.com/containers/libpod/cmd/podman/shared" "github.com/containers/libpod/cmd/podman/varlink" "github.com/containers/libpod/libpod" + "github.com/containers/libpod/libpod/define" + "github.com/containers/libpod/pkg/channelwriter" "github.com/containers/storage/pkg/archive" ) @@ -73,7 +75,7 @@ func makeListContainer(containerID string, batchInfo shared.BatchContainerStruct Names: batchInfo.ConConfig.Name, Labels: batchInfo.ConConfig.Labels, Mounts: mounts, - Containerrunning: batchInfo.ConState == libpod.ContainerStateRunning, + Containerrunning: batchInfo.ConState == define.ContainerStateRunning, Namespaces: namespace, } if batchInfo.Size != nil { @@ -189,9 +191,48 @@ func makePsOpts(inOpts iopodman.PsOpts) shared.PsOptions { Latest: derefBool(inOpts.Latest), NoTrunc: derefBool(inOpts.NoTrunc), Pod: derefBool(inOpts.Pod), - Size: true, + Size: derefBool(inOpts.Size), Sort: derefString(inOpts.Sort), Namespace: true, Sync: derefBool(inOpts.Sync), } } + +// forwardOutput is a helper method for varlink endpoints that employ both more and without +// more. it is capable of sending updates as the output writer gets them or append them +// all to a log. the chan error is the error from the libpod call so we can honor +// and error event in that case. +func forwardOutput(log []string, c chan error, wantsMore bool, output *channelwriter.Writer, reply func(br iopodman.MoreResponse) error) ([]string, error) { + done := false + for { + select { + // We need to check if the libpod func being called has returned an + // error yet + case err := <-c: + if err != nil { + return nil, err + } + done = true + // if no error is found, we pull what we can from the log writer and + // append it to log string slice + case line := <-output.ByteChannel: + log = append(log, string(line)) + // If the end point is being used in more mode, send what we have + if wantsMore { + br := iopodman.MoreResponse{ + Logs: log, + } + if err := reply(br); err != nil { + return nil, err + } + // "reset" the log to empty because we are sending what we + // get as we get it + log = []string{} + } + } + if done { + break + } + } + return log, nil +} diff --git a/pkg/varlinkapi/virtwriter/virtwriter.go b/pkg/varlinkapi/virtwriter/virtwriter.go index 3adaf6e17..27ecd1f52 100644 --- a/pkg/varlinkapi/virtwriter/virtwriter.go +++ b/pkg/varlinkapi/virtwriter/virtwriter.go @@ -4,10 +4,9 @@ import ( "bufio" "encoding/binary" "encoding/json" - "errors" "io" - "os" + "github.com/pkg/errors" "k8s.io/client-go/tools/remotecommand" ) @@ -90,66 +89,100 @@ func (v VirtWriteCloser) Write(input []byte) (int, error) { } // Reader decodes the content that comes over the wire and directs it to the proper destination. -func Reader(r *bufio.Reader, output, errput *os.File, input *io.PipeWriter, resize chan remotecommand.TerminalSize) error { - var saveb []byte - var eom int +func Reader(r *bufio.Reader, output, errput, input io.Writer, resize chan remotecommand.TerminalSize, execEcChan chan int) error { + var messageSize int64 + headerBytes := make([]byte, 8) + + if r == nil { + return errors.Errorf("Reader must not be nil") + } + for { - readb := make([]byte, 32*1024) - n, err := r.Read(readb) - // TODO, later may be worth checking in len of the read is 0 + n, err := io.ReadFull(r, headerBytes) if err != nil { - return err + return errors.Wrapf(err, "Virtual Read failed, %d", n) } - b := append(saveb, readb[0:n]...) - // no sense in reading less than the header len - for len(b) > 7 { - eom = int(binary.BigEndian.Uint32(b[4:8])) + 8 - // The message and header are togther - if len(b) >= eom { - out := append([]byte{}, b[8:eom]...) - - switch IntToSocketDest(int(b[0])) { - case ToStdout: - n, err := output.Write(out) - if err != nil { - return err - } - if n < len(out) { - return errors.New("short write error occurred on stdout") - } - case ToStderr: - n, err := errput.Write(out) - if err != nil { - return err - } - if n < len(out) { - return errors.New("short write error occurred on stderr") - } - case ToStdin: - n, err := input.Write(out) + if n < 8 { + return errors.New("short read and no full header read") + } + + messageSize = int64(binary.BigEndian.Uint32(headerBytes[4:8])) + + switch IntToSocketDest(int(headerBytes[0])) { + case ToStdout: + if output != nil { + _, err := io.CopyN(output, r, messageSize) + if err != nil { + return err + } + } + case ToStderr: + if errput != nil { + _, err := io.CopyN(errput, r, messageSize) + if err != nil { + return err + } + } + case ToStdin: + if input != nil { + _, err := io.CopyN(input, r, messageSize) + if err != nil { + return err + } + } + case TerminalResize: + if resize != nil { + out := make([]byte, messageSize) + if messageSize > 0 { + _, err = io.ReadFull(r, out) + if err != nil { return err } - if n < len(out) { - return errors.New("short write error occurred on stdin") - } - case TerminalResize: - // Resize events come over in bytes, need to be reserialized - resizeEvent := remotecommand.TerminalSize{} - if err := json.Unmarshal(out, &resizeEvent); err != nil { - return err - } - resize <- resizeEvent - case Quit: - return nil } - b = b[eom:] - } else { - // We do not have the header and full message, need to slurp again - saveb = b - break + // Resize events come over in bytes, need to be reserialized + resizeEvent := remotecommand.TerminalSize{} + if err := json.Unmarshal(out, &resizeEvent); err != nil { + return err + } + resize <- resizeEvent + } + case Quit: + out := make([]byte, messageSize) + if messageSize > 0 { + _, err = io.ReadFull(r, out) + + if err != nil { + return err + } + } + if execEcChan != nil { + ecInt := binary.BigEndian.Uint32(out) + execEcChan <- int(ecInt) } + return nil + + default: + // Something really went wrong + return errors.New("unknown multiplex destination") } } - return nil +} + +// HangUp sends message to peer to close connection +func HangUp(writer *bufio.Writer, ec uint32) (err error) { + n := 0 + msg := make([]byte, 4) + + binary.BigEndian.PutUint32(msg, ec) + + writeQuit := NewVirtWriteCloser(writer, Quit) + if n, err = writeQuit.Write(msg); err != nil { + return + } + + if n != len(msg) { + return errors.Errorf("Failed to send complete %s message", string(msg)) + } + return } |