diff options
Diffstat (limited to 'pkg/varlinkapi')
-rw-r--r-- | pkg/varlinkapi/attach.go | 103 | ||||
-rw-r--r-- | pkg/varlinkapi/containers.go | 65 | ||||
-rw-r--r-- | pkg/varlinkapi/generate.go | 30 | ||||
-rw-r--r-- | pkg/varlinkapi/util.go | 33 | ||||
-rw-r--r-- | pkg/varlinkapi/virtwriter/virtwriter.go | 155 |
5 files changed, 386 insertions, 0 deletions
diff --git a/pkg/varlinkapi/attach.go b/pkg/varlinkapi/attach.go new file mode 100644 index 000000000..9e2a265be --- /dev/null +++ b/pkg/varlinkapi/attach.go @@ -0,0 +1,103 @@ +// +build varlink + +package varlinkapi + +import ( + "bufio" + "io" + + "github.com/containers/libpod/cmd/podman/varlink" + "github.com/containers/libpod/libpod" + "github.com/containers/libpod/pkg/varlinkapi/virtwriter" + "github.com/sirupsen/logrus" + "k8s.io/client-go/tools/remotecommand" +) + +func setupStreams(call iopodman.VarlinkCall) (*bufio.Reader, *bufio.Writer, *io.PipeReader, *io.PipeWriter, *libpod.AttachStreams) { + + // These are the varlink sockets + reader := call.Call.Reader + writer := call.Call.Writer + + // This pipe is used to pass stdin from the client to the input stream + // once the msg has been "decoded" + pr, pw := io.Pipe() + + stdoutWriter := virtwriter.NewVirtWriteCloser(writer, virtwriter.ToStdout) + // TODO if runc ever starts passing stderr, we can too + //stderrWriter := NewVirtWriteCloser(writer, ToStderr) + + streams := libpod.AttachStreams{ + OutputStream: stdoutWriter, + InputStream: pr, + // Runc eats the error stream + ErrorStream: stdoutWriter, + AttachInput: true, + AttachOutput: true, + // Runc eats the error stream + AttachError: true, + } + return reader, writer, pr, pw, &streams +} + +// Attach connects to a containers console +func (i *LibpodAPI) Attach(call iopodman.VarlinkCall, name string, detachKeys string, start bool) error { + var finalErr error + resize := make(chan remotecommand.TerminalSize) + errChan := make(chan error) + + if !call.WantsUpgrade() { + return call.ReplyErrorOccurred("client must use upgraded connection to attach") + } + ctr, err := i.Runtime.LookupContainer(name) + if err != nil { + return call.ReplyErrorOccurred(err.Error()) + } + + reader, writer, _, pw, streams := setupStreams(call) + + go func() { + if err := virtwriter.Reader(reader, nil, nil, pw, resize); err != nil { + errChan <- err + } + }() + + if start { + finalErr = startAndAttach(ctr, streams, detachKeys, resize, errChan) + } else { + finalErr = attach(ctr, streams, detachKeys, resize, errChan) + } + + if finalErr != libpod.ErrDetach && finalErr != nil { + logrus.Error(finalErr) + } + quitWriter := virtwriter.NewVirtWriteCloser(writer, virtwriter.Quit) + _, err = quitWriter.Write([]byte("HANG-UP")) + // TODO error handling is not quite right here yet + return call.Writer.Flush() +} + +func attach(ctr *libpod.Container, streams *libpod.AttachStreams, detachKeys string, resize chan remotecommand.TerminalSize, errChan chan error) error { + go func() { + if err := ctr.Attach(streams, detachKeys, resize); err != nil { + errChan <- err + } + }() + attachError := <-errChan + return attachError +} + +func startAndAttach(ctr *libpod.Container, streams *libpod.AttachStreams, detachKeys string, resize chan remotecommand.TerminalSize, errChan chan error) error { + var finalErr error + attachChan, err := ctr.StartAndAttach(getContext(), streams, detachKeys, resize, false) + if err != nil { + return err + } + select { + case attachChanErr := <-attachChan: + finalErr = attachChanErr + case chanError := <-errChan: + finalErr = chanError + } + return finalErr +} diff --git a/pkg/varlinkapi/containers.go b/pkg/varlinkapi/containers.go index ac1352dac..17792ccfe 100644 --- a/pkg/varlinkapi/containers.go +++ b/pkg/varlinkapi/containers.go @@ -47,6 +47,55 @@ func (i *LibpodAPI) ListContainers(call iopodman.VarlinkCall) error { return call.ReplyListContainers(listContainers) } +func (i *LibpodAPI) Ps(call iopodman.VarlinkCall, opts iopodman.PsOpts) error { + var ( + containers []iopodman.PsContainer + ) + maxWorkers := shared.Parallelize("ps") + psOpts := makePsOpts(opts) + filters := []string{} + if opts.Filters != nil { + filters = *opts.Filters + } + psContainerOutputs, err := shared.GetPsContainerOutput(i.Runtime, psOpts, filters, maxWorkers) + if err != nil { + return call.ReplyErrorOccurred(err.Error()) + } + + for _, ctr := range psContainerOutputs { + container := iopodman.PsContainer{ + Id: ctr.ID, + Image: ctr.Image, + Command: ctr.Command, + Created: ctr.Created, + Ports: ctr.Ports, + Names: ctr.Names, + IsInfra: ctr.IsInfra, + Status: ctr.Status, + State: ctr.State.String(), + PidNum: int64(ctr.Pid), + RootFsSize: ctr.Size.RootFsSize, + RwSize: ctr.Size.RwSize, + Pod: ctr.Pod, + CreatedAt: ctr.CreatedAt.Format(time.RFC3339Nano), + ExitedAt: ctr.ExitedAt.Format(time.RFC3339Nano), + StartedAt: ctr.StartedAt.Format(time.RFC3339Nano), + Labels: ctr.Labels, + NsPid: ctr.PID, + Cgroup: ctr.Cgroup, + Ipc: ctr.Cgroup, + Mnt: ctr.MNT, + Net: ctr.NET, + PidNs: ctr.PIDNS, + User: ctr.User, + Uts: ctr.UTS, + Mounts: ctr.Mounts, + } + containers = append(containers, container) + } + return call.ReplyPs(containers) +} + // GetContainer ... func (i *LibpodAPI) GetContainer(call iopodman.VarlinkCall, id string) error { ctr, err := i.Runtime.LookupContainer(id) @@ -585,6 +634,22 @@ func (i *LibpodAPI) GetContainerStatsWithHistory(call iopodman.VarlinkCall, prev return call.ReplyGetContainerStatsWithHistory(cStats) } +// Spec ... +func (i *LibpodAPI) Spec(call iopodman.VarlinkCall, name string) error { + ctr, err := i.Runtime.LookupContainer(name) + if err != nil { + return call.ReplyErrorOccurred(err.Error()) + } + + spec := ctr.Spec() + b, err := json.Marshal(spec) + if err != nil { + return call.ReplyErrorOccurred(err.Error()) + } + + return call.ReplySpec(string(b)) +} + // GetContainersLogs is the varlink endpoint to obtain one or more container logs func (i *LibpodAPI) GetContainersLogs(call iopodman.VarlinkCall, names []string, follow, latest bool, since string, tail int64, timestamps bool) error { var wg sync.WaitGroup diff --git a/pkg/varlinkapi/generate.go b/pkg/varlinkapi/generate.go new file mode 100644 index 000000000..bc600c397 --- /dev/null +++ b/pkg/varlinkapi/generate.go @@ -0,0 +1,30 @@ +// +build varlink + +package varlinkapi + +import ( + "encoding/json" + "github.com/containers/libpod/cmd/podman/shared" + iopodman "github.com/containers/libpod/cmd/podman/varlink" +) + +// GenerateKube ... +func (i *LibpodAPI) GenerateKube(call iopodman.VarlinkCall, name string, service bool) error { + pod, serv, err := shared.GenerateKube(name, service, i.Runtime) + if err != nil { + return call.ReplyErrorOccurred(err.Error()) + } + podB, err := json.Marshal(pod) + if err != nil { + return call.ReplyErrorOccurred(err.Error()) + } + servB, err := json.Marshal(serv) + if err != nil { + return call.ReplyErrorOccurred(err.Error()) + } + + return call.ReplyGenerateKube(iopodman.KubePodService{ + Pod: string(podB), + Service: string(servB), + }) +} diff --git a/pkg/varlinkapi/util.go b/pkg/varlinkapi/util.go index 3c4b9b79a..8716c963a 100644 --- a/pkg/varlinkapi/util.go +++ b/pkg/varlinkapi/util.go @@ -162,3 +162,36 @@ func stringPullPolicyToType(s string) buildah.PullPolicy { } return buildah.PullIfMissing } + +func derefBool(inBool *bool) bool { + if inBool == nil { + return false + } + return *inBool +} + +func derefString(in *string) string { + if in == nil { + return "" + } + return *in +} + +func makePsOpts(inOpts iopodman.PsOpts) shared.PsOptions { + last := 0 + if inOpts.Last != nil { + lastT := *inOpts.Last + last = int(lastT) + } + return shared.PsOptions{ + All: inOpts.All, + Last: last, + Latest: derefBool(inOpts.Latest), + NoTrunc: derefBool(inOpts.NoTrunc), + Pod: derefBool(inOpts.Pod), + Size: true, + Sort: derefString(inOpts.Sort), + Namespace: true, + Sync: derefBool(inOpts.Sync), + } +} diff --git a/pkg/varlinkapi/virtwriter/virtwriter.go b/pkg/varlinkapi/virtwriter/virtwriter.go new file mode 100644 index 000000000..3adaf6e17 --- /dev/null +++ b/pkg/varlinkapi/virtwriter/virtwriter.go @@ -0,0 +1,155 @@ +package virtwriter + +import ( + "bufio" + "encoding/binary" + "encoding/json" + "errors" + "io" + "os" + + "k8s.io/client-go/tools/remotecommand" +) + +// SocketDest is the "key" to where IO should go on the varlink +// multiplexed socket +type SocketDest int + +const ( + // ToStdout indicates traffic should go stdout + ToStdout SocketDest = iota + // ToStdin indicates traffic came from stdin + ToStdin SocketDest = iota + // ToStderr indicates traffuc should go to stderr + ToStderr SocketDest = iota + // TerminalResize indicates a terminal resize event has occurred + // and data should be passed to resizer + TerminalResize SocketDest = iota + // Quit and detach + Quit SocketDest = iota +) + +// IntToSocketDest returns a socketdest based on integer input +func IntToSocketDest(i int) SocketDest { + switch i { + case ToStdout.Int(): + return ToStdout + case ToStderr.Int(): + return ToStderr + case ToStdin.Int(): + return ToStdin + case TerminalResize.Int(): + return TerminalResize + case Quit.Int(): + return Quit + default: + return ToStderr + } +} + +// Int returns the integer representation of the socket dest +func (sd SocketDest) Int() int { + return int(sd) +} + +// VirtWriteCloser are writers for attach which include the dest +// of the data +type VirtWriteCloser struct { + writer *bufio.Writer + dest SocketDest +} + +// NewVirtWriteCloser is a constructor +func NewVirtWriteCloser(w *bufio.Writer, dest SocketDest) VirtWriteCloser { + return VirtWriteCloser{w, dest} +} + +// Close is a required method for a writecloser +func (v VirtWriteCloser) Close() error { + return nil +} + +// Write prepends a header to the input message. The header is +// 8bytes. Position one contains the destination. Positions +// 5,6,7,8 are a big-endian encoded uint32 for len of the message. +func (v VirtWriteCloser) Write(input []byte) (int, error) { + header := []byte{byte(v.dest), 0, 0, 0} + // Go makes us define the byte for big endian + mlen := make([]byte, 4) + binary.BigEndian.PutUint32(mlen, uint32(len(input))) + // append the message len to the header + msg := append(header, mlen...) + // append the message to the header + msg = append(msg, input...) + _, err := v.writer.Write(msg) + if err != nil { + return 0, err + } + err = v.writer.Flush() + return len(input), err +} + +// Reader decodes the content that comes over the wire and directs it to the proper destination. +func Reader(r *bufio.Reader, output, errput *os.File, input *io.PipeWriter, resize chan remotecommand.TerminalSize) error { + var saveb []byte + var eom int + for { + readb := make([]byte, 32*1024) + n, err := r.Read(readb) + // TODO, later may be worth checking in len of the read is 0 + if err != nil { + return err + } + b := append(saveb, readb[0:n]...) + // no sense in reading less than the header len + for len(b) > 7 { + eom = int(binary.BigEndian.Uint32(b[4:8])) + 8 + // The message and header are togther + if len(b) >= eom { + out := append([]byte{}, b[8:eom]...) + + switch IntToSocketDest(int(b[0])) { + case ToStdout: + n, err := output.Write(out) + if err != nil { + return err + } + if n < len(out) { + return errors.New("short write error occurred on stdout") + } + case ToStderr: + n, err := errput.Write(out) + if err != nil { + return err + } + if n < len(out) { + return errors.New("short write error occurred on stderr") + } + case ToStdin: + n, err := input.Write(out) + if err != nil { + return err + } + if n < len(out) { + return errors.New("short write error occurred on stdin") + } + case TerminalResize: + // Resize events come over in bytes, need to be reserialized + resizeEvent := remotecommand.TerminalSize{} + if err := json.Unmarshal(out, &resizeEvent); err != nil { + return err + } + resize <- resizeEvent + case Quit: + return nil + } + b = b[eom:] + } else { + // We do not have the header and full message, need to slurp again + saveb = b + break + } + } + } + return nil +} |