summaryrefslogtreecommitdiff
path: root/pkg/varlinkapi
diff options
context:
space:
mode:
Diffstat (limited to 'pkg/varlinkapi')
-rw-r--r--pkg/varlinkapi/attach.go103
-rw-r--r--pkg/varlinkapi/containers.go65
-rw-r--r--pkg/varlinkapi/generate.go30
-rw-r--r--pkg/varlinkapi/util.go33
-rw-r--r--pkg/varlinkapi/virtwriter/virtwriter.go155
5 files changed, 386 insertions, 0 deletions
diff --git a/pkg/varlinkapi/attach.go b/pkg/varlinkapi/attach.go
new file mode 100644
index 000000000..9e2a265be
--- /dev/null
+++ b/pkg/varlinkapi/attach.go
@@ -0,0 +1,103 @@
+// +build varlink
+
+package varlinkapi
+
+import (
+ "bufio"
+ "io"
+
+ "github.com/containers/libpod/cmd/podman/varlink"
+ "github.com/containers/libpod/libpod"
+ "github.com/containers/libpod/pkg/varlinkapi/virtwriter"
+ "github.com/sirupsen/logrus"
+ "k8s.io/client-go/tools/remotecommand"
+)
+
+func setupStreams(call iopodman.VarlinkCall) (*bufio.Reader, *bufio.Writer, *io.PipeReader, *io.PipeWriter, *libpod.AttachStreams) {
+
+ // These are the varlink sockets
+ reader := call.Call.Reader
+ writer := call.Call.Writer
+
+ // This pipe is used to pass stdin from the client to the input stream
+ // once the msg has been "decoded"
+ pr, pw := io.Pipe()
+
+ stdoutWriter := virtwriter.NewVirtWriteCloser(writer, virtwriter.ToStdout)
+ // TODO if runc ever starts passing stderr, we can too
+ //stderrWriter := NewVirtWriteCloser(writer, ToStderr)
+
+ streams := libpod.AttachStreams{
+ OutputStream: stdoutWriter,
+ InputStream: pr,
+ // Runc eats the error stream
+ ErrorStream: stdoutWriter,
+ AttachInput: true,
+ AttachOutput: true,
+ // Runc eats the error stream
+ AttachError: true,
+ }
+ return reader, writer, pr, pw, &streams
+}
+
+// Attach connects to a containers console
+func (i *LibpodAPI) Attach(call iopodman.VarlinkCall, name string, detachKeys string, start bool) error {
+ var finalErr error
+ resize := make(chan remotecommand.TerminalSize)
+ errChan := make(chan error)
+
+ if !call.WantsUpgrade() {
+ return call.ReplyErrorOccurred("client must use upgraded connection to attach")
+ }
+ ctr, err := i.Runtime.LookupContainer(name)
+ if err != nil {
+ return call.ReplyErrorOccurred(err.Error())
+ }
+
+ reader, writer, _, pw, streams := setupStreams(call)
+
+ go func() {
+ if err := virtwriter.Reader(reader, nil, nil, pw, resize); err != nil {
+ errChan <- err
+ }
+ }()
+
+ if start {
+ finalErr = startAndAttach(ctr, streams, detachKeys, resize, errChan)
+ } else {
+ finalErr = attach(ctr, streams, detachKeys, resize, errChan)
+ }
+
+ if finalErr != libpod.ErrDetach && finalErr != nil {
+ logrus.Error(finalErr)
+ }
+ quitWriter := virtwriter.NewVirtWriteCloser(writer, virtwriter.Quit)
+ _, err = quitWriter.Write([]byte("HANG-UP"))
+ // TODO error handling is not quite right here yet
+ return call.Writer.Flush()
+}
+
+func attach(ctr *libpod.Container, streams *libpod.AttachStreams, detachKeys string, resize chan remotecommand.TerminalSize, errChan chan error) error {
+ go func() {
+ if err := ctr.Attach(streams, detachKeys, resize); err != nil {
+ errChan <- err
+ }
+ }()
+ attachError := <-errChan
+ return attachError
+}
+
+func startAndAttach(ctr *libpod.Container, streams *libpod.AttachStreams, detachKeys string, resize chan remotecommand.TerminalSize, errChan chan error) error {
+ var finalErr error
+ attachChan, err := ctr.StartAndAttach(getContext(), streams, detachKeys, resize, false)
+ if err != nil {
+ return err
+ }
+ select {
+ case attachChanErr := <-attachChan:
+ finalErr = attachChanErr
+ case chanError := <-errChan:
+ finalErr = chanError
+ }
+ return finalErr
+}
diff --git a/pkg/varlinkapi/containers.go b/pkg/varlinkapi/containers.go
index ac1352dac..17792ccfe 100644
--- a/pkg/varlinkapi/containers.go
+++ b/pkg/varlinkapi/containers.go
@@ -47,6 +47,55 @@ func (i *LibpodAPI) ListContainers(call iopodman.VarlinkCall) error {
return call.ReplyListContainers(listContainers)
}
+func (i *LibpodAPI) Ps(call iopodman.VarlinkCall, opts iopodman.PsOpts) error {
+ var (
+ containers []iopodman.PsContainer
+ )
+ maxWorkers := shared.Parallelize("ps")
+ psOpts := makePsOpts(opts)
+ filters := []string{}
+ if opts.Filters != nil {
+ filters = *opts.Filters
+ }
+ psContainerOutputs, err := shared.GetPsContainerOutput(i.Runtime, psOpts, filters, maxWorkers)
+ if err != nil {
+ return call.ReplyErrorOccurred(err.Error())
+ }
+
+ for _, ctr := range psContainerOutputs {
+ container := iopodman.PsContainer{
+ Id: ctr.ID,
+ Image: ctr.Image,
+ Command: ctr.Command,
+ Created: ctr.Created,
+ Ports: ctr.Ports,
+ Names: ctr.Names,
+ IsInfra: ctr.IsInfra,
+ Status: ctr.Status,
+ State: ctr.State.String(),
+ PidNum: int64(ctr.Pid),
+ RootFsSize: ctr.Size.RootFsSize,
+ RwSize: ctr.Size.RwSize,
+ Pod: ctr.Pod,
+ CreatedAt: ctr.CreatedAt.Format(time.RFC3339Nano),
+ ExitedAt: ctr.ExitedAt.Format(time.RFC3339Nano),
+ StartedAt: ctr.StartedAt.Format(time.RFC3339Nano),
+ Labels: ctr.Labels,
+ NsPid: ctr.PID,
+ Cgroup: ctr.Cgroup,
+ Ipc: ctr.Cgroup,
+ Mnt: ctr.MNT,
+ Net: ctr.NET,
+ PidNs: ctr.PIDNS,
+ User: ctr.User,
+ Uts: ctr.UTS,
+ Mounts: ctr.Mounts,
+ }
+ containers = append(containers, container)
+ }
+ return call.ReplyPs(containers)
+}
+
// GetContainer ...
func (i *LibpodAPI) GetContainer(call iopodman.VarlinkCall, id string) error {
ctr, err := i.Runtime.LookupContainer(id)
@@ -585,6 +634,22 @@ func (i *LibpodAPI) GetContainerStatsWithHistory(call iopodman.VarlinkCall, prev
return call.ReplyGetContainerStatsWithHistory(cStats)
}
+// Spec ...
+func (i *LibpodAPI) Spec(call iopodman.VarlinkCall, name string) error {
+ ctr, err := i.Runtime.LookupContainer(name)
+ if err != nil {
+ return call.ReplyErrorOccurred(err.Error())
+ }
+
+ spec := ctr.Spec()
+ b, err := json.Marshal(spec)
+ if err != nil {
+ return call.ReplyErrorOccurred(err.Error())
+ }
+
+ return call.ReplySpec(string(b))
+}
+
// GetContainersLogs is the varlink endpoint to obtain one or more container logs
func (i *LibpodAPI) GetContainersLogs(call iopodman.VarlinkCall, names []string, follow, latest bool, since string, tail int64, timestamps bool) error {
var wg sync.WaitGroup
diff --git a/pkg/varlinkapi/generate.go b/pkg/varlinkapi/generate.go
new file mode 100644
index 000000000..bc600c397
--- /dev/null
+++ b/pkg/varlinkapi/generate.go
@@ -0,0 +1,30 @@
+// +build varlink
+
+package varlinkapi
+
+import (
+ "encoding/json"
+ "github.com/containers/libpod/cmd/podman/shared"
+ iopodman "github.com/containers/libpod/cmd/podman/varlink"
+)
+
+// GenerateKube ...
+func (i *LibpodAPI) GenerateKube(call iopodman.VarlinkCall, name string, service bool) error {
+ pod, serv, err := shared.GenerateKube(name, service, i.Runtime)
+ if err != nil {
+ return call.ReplyErrorOccurred(err.Error())
+ }
+ podB, err := json.Marshal(pod)
+ if err != nil {
+ return call.ReplyErrorOccurred(err.Error())
+ }
+ servB, err := json.Marshal(serv)
+ if err != nil {
+ return call.ReplyErrorOccurred(err.Error())
+ }
+
+ return call.ReplyGenerateKube(iopodman.KubePodService{
+ Pod: string(podB),
+ Service: string(servB),
+ })
+}
diff --git a/pkg/varlinkapi/util.go b/pkg/varlinkapi/util.go
index 3c4b9b79a..8716c963a 100644
--- a/pkg/varlinkapi/util.go
+++ b/pkg/varlinkapi/util.go
@@ -162,3 +162,36 @@ func stringPullPolicyToType(s string) buildah.PullPolicy {
}
return buildah.PullIfMissing
}
+
+func derefBool(inBool *bool) bool {
+ if inBool == nil {
+ return false
+ }
+ return *inBool
+}
+
+func derefString(in *string) string {
+ if in == nil {
+ return ""
+ }
+ return *in
+}
+
+func makePsOpts(inOpts iopodman.PsOpts) shared.PsOptions {
+ last := 0
+ if inOpts.Last != nil {
+ lastT := *inOpts.Last
+ last = int(lastT)
+ }
+ return shared.PsOptions{
+ All: inOpts.All,
+ Last: last,
+ Latest: derefBool(inOpts.Latest),
+ NoTrunc: derefBool(inOpts.NoTrunc),
+ Pod: derefBool(inOpts.Pod),
+ Size: true,
+ Sort: derefString(inOpts.Sort),
+ Namespace: true,
+ Sync: derefBool(inOpts.Sync),
+ }
+}
diff --git a/pkg/varlinkapi/virtwriter/virtwriter.go b/pkg/varlinkapi/virtwriter/virtwriter.go
new file mode 100644
index 000000000..3adaf6e17
--- /dev/null
+++ b/pkg/varlinkapi/virtwriter/virtwriter.go
@@ -0,0 +1,155 @@
+package virtwriter
+
+import (
+ "bufio"
+ "encoding/binary"
+ "encoding/json"
+ "errors"
+ "io"
+ "os"
+
+ "k8s.io/client-go/tools/remotecommand"
+)
+
+// SocketDest is the "key" to where IO should go on the varlink
+// multiplexed socket
+type SocketDest int
+
+const (
+ // ToStdout indicates traffic should go stdout
+ ToStdout SocketDest = iota
+ // ToStdin indicates traffic came from stdin
+ ToStdin SocketDest = iota
+ // ToStderr indicates traffuc should go to stderr
+ ToStderr SocketDest = iota
+ // TerminalResize indicates a terminal resize event has occurred
+ // and data should be passed to resizer
+ TerminalResize SocketDest = iota
+ // Quit and detach
+ Quit SocketDest = iota
+)
+
+// IntToSocketDest returns a socketdest based on integer input
+func IntToSocketDest(i int) SocketDest {
+ switch i {
+ case ToStdout.Int():
+ return ToStdout
+ case ToStderr.Int():
+ return ToStderr
+ case ToStdin.Int():
+ return ToStdin
+ case TerminalResize.Int():
+ return TerminalResize
+ case Quit.Int():
+ return Quit
+ default:
+ return ToStderr
+ }
+}
+
+// Int returns the integer representation of the socket dest
+func (sd SocketDest) Int() int {
+ return int(sd)
+}
+
+// VirtWriteCloser are writers for attach which include the dest
+// of the data
+type VirtWriteCloser struct {
+ writer *bufio.Writer
+ dest SocketDest
+}
+
+// NewVirtWriteCloser is a constructor
+func NewVirtWriteCloser(w *bufio.Writer, dest SocketDest) VirtWriteCloser {
+ return VirtWriteCloser{w, dest}
+}
+
+// Close is a required method for a writecloser
+func (v VirtWriteCloser) Close() error {
+ return nil
+}
+
+// Write prepends a header to the input message. The header is
+// 8bytes. Position one contains the destination. Positions
+// 5,6,7,8 are a big-endian encoded uint32 for len of the message.
+func (v VirtWriteCloser) Write(input []byte) (int, error) {
+ header := []byte{byte(v.dest), 0, 0, 0}
+ // Go makes us define the byte for big endian
+ mlen := make([]byte, 4)
+ binary.BigEndian.PutUint32(mlen, uint32(len(input)))
+ // append the message len to the header
+ msg := append(header, mlen...)
+ // append the message to the header
+ msg = append(msg, input...)
+ _, err := v.writer.Write(msg)
+ if err != nil {
+ return 0, err
+ }
+ err = v.writer.Flush()
+ return len(input), err
+}
+
+// Reader decodes the content that comes over the wire and directs it to the proper destination.
+func Reader(r *bufio.Reader, output, errput *os.File, input *io.PipeWriter, resize chan remotecommand.TerminalSize) error {
+ var saveb []byte
+ var eom int
+ for {
+ readb := make([]byte, 32*1024)
+ n, err := r.Read(readb)
+ // TODO, later may be worth checking in len of the read is 0
+ if err != nil {
+ return err
+ }
+ b := append(saveb, readb[0:n]...)
+ // no sense in reading less than the header len
+ for len(b) > 7 {
+ eom = int(binary.BigEndian.Uint32(b[4:8])) + 8
+ // The message and header are togther
+ if len(b) >= eom {
+ out := append([]byte{}, b[8:eom]...)
+
+ switch IntToSocketDest(int(b[0])) {
+ case ToStdout:
+ n, err := output.Write(out)
+ if err != nil {
+ return err
+ }
+ if n < len(out) {
+ return errors.New("short write error occurred on stdout")
+ }
+ case ToStderr:
+ n, err := errput.Write(out)
+ if err != nil {
+ return err
+ }
+ if n < len(out) {
+ return errors.New("short write error occurred on stderr")
+ }
+ case ToStdin:
+ n, err := input.Write(out)
+ if err != nil {
+ return err
+ }
+ if n < len(out) {
+ return errors.New("short write error occurred on stdin")
+ }
+ case TerminalResize:
+ // Resize events come over in bytes, need to be reserialized
+ resizeEvent := remotecommand.TerminalSize{}
+ if err := json.Unmarshal(out, &resizeEvent); err != nil {
+ return err
+ }
+ resize <- resizeEvent
+ case Quit:
+ return nil
+ }
+ b = b[eom:]
+ } else {
+ // We do not have the header and full message, need to slurp again
+ saveb = b
+ break
+ }
+ }
+ }
+ return nil
+}