diff options
Diffstat (limited to 'pkg')
-rw-r--r-- | pkg/adapter/containers.go | 3 | ||||
-rw-r--r-- | pkg/adapter/containers_remote.go | 222 | ||||
-rw-r--r-- | pkg/adapter/terminal.go | 23 | ||||
-rw-r--r-- | pkg/adapter/terminal_linux.go | 23 | ||||
-rw-r--r-- | pkg/ctime/ctime_linux.go | 3 | ||||
-rw-r--r-- | pkg/hooks/hooks.go | 24 | ||||
-rw-r--r-- | pkg/hooks/monitor.go | 49 | ||||
-rw-r--r-- | pkg/hooks/monitor_test.go | 25 | ||||
-rw-r--r-- | pkg/hooks/read.go | 11 | ||||
-rw-r--r-- | pkg/varlinkapi/attach.go | 4 | ||||
-rw-r--r-- | pkg/varlinkapi/containers.go | 57 | ||||
-rw-r--r-- | pkg/varlinkapi/virtwriter/virtwriter.go | 64 |
12 files changed, 271 insertions, 237 deletions
diff --git a/pkg/adapter/containers.go b/pkg/adapter/containers.go index 47f1b091e..faaef3e60 100644 --- a/pkg/adapter/containers.go +++ b/pkg/adapter/containers.go @@ -1000,7 +1000,8 @@ func (r *LocalRuntime) ExecContainer(ctx context.Context, cli *cliconfig.ExecVal streams.AttachOutput = true streams.AttachError = true - return ExecAttachCtr(ctx, ctr.Container, cli.Tty, cli.Privileged, envs, cmd, cli.User, cli.Workdir, streams, cli.PreserveFDs, cli.DetachKeys) + ec, err = ExecAttachCtr(ctx, ctr.Container, cli.Tty, cli.Privileged, envs, cmd, cli.User, cli.Workdir, streams, cli.PreserveFDs, cli.DetachKeys) + return define.TranslateExecErrorToExitCode(ec, err), err } // Prune removes stopped containers diff --git a/pkg/adapter/containers_remote.go b/pkg/adapter/containers_remote.go index 6b9fc8ee7..5a26f537f 100644 --- a/pkg/adapter/containers_remote.go +++ b/pkg/adapter/containers_remote.go @@ -3,6 +3,7 @@ package adapter import ( + "bufio" "context" "encoding/json" "fmt" @@ -555,93 +556,6 @@ func (r *LocalRuntime) Ps(c *cliconfig.PsValues, opts shared.PsOptions) ([]share return psContainers, nil } -func (r *LocalRuntime) attach(ctx context.Context, stdin, stdout *os.File, cid string, start bool, detachKeys string) (chan error, error) { - var ( - oldTermState *term.State - ) - errChan := make(chan error) - spec, err := r.Spec(cid) - if err != nil { - return nil, err - } - resize := make(chan remotecommand.TerminalSize, 5) - haveTerminal := terminal.IsTerminal(int(os.Stdin.Fd())) - - // Check if we are attached to a terminal. If we are, generate resize - // events, and set the terminal to raw mode - if haveTerminal && spec.Process.Terminal { - logrus.Debugf("Handling terminal attach") - - subCtx, cancel := context.WithCancel(ctx) - defer cancel() - - resizeTty(subCtx, resize) - oldTermState, err = term.SaveState(os.Stdin.Fd()) - if err != nil { - return nil, errors.Wrapf(err, "unable to save terminal state") - } - - logrus.SetFormatter(&RawTtyFormatter{}) - term.SetRawTerminal(os.Stdin.Fd()) - - } - // TODO add detach keys support - reply, err := iopodman.Attach().Send(r.Conn, varlink.Upgrade, cid, detachKeys, start) - if err != nil { - restoreTerminal(oldTermState) - return nil, err - } - - // See if the server accepts the upgraded connection or returns an error - _, err = reply() - - if err != nil { - restoreTerminal(oldTermState) - return nil, err - } - - // These are the varlink sockets - reader := r.Conn.Reader - writer := r.Conn.Writer - - // These are the special writers that encode input from the client. - varlinkStdinWriter := virtwriter.NewVirtWriteCloser(writer, virtwriter.ToStdin) - varlinkResizeWriter := virtwriter.NewVirtWriteCloser(writer, virtwriter.TerminalResize) - - go func() { - // Read from the wire and direct to stdout or stderr - err := virtwriter.Reader(reader, stdout, os.Stderr, nil, nil) - defer restoreTerminal(oldTermState) - errChan <- err - }() - - go func() { - for termResize := range resize { - b, err := json.Marshal(termResize) - if err != nil { - defer restoreTerminal(oldTermState) - errChan <- err - } - _, err = varlinkResizeWriter.Write(b) - if err != nil { - defer restoreTerminal(oldTermState) - errChan <- err - } - } - }() - - // Takes stdinput and sends it over the wire after being encoded - go func() { - if _, err := io.Copy(varlinkStdinWriter, stdin); err != nil { - defer restoreTerminal(oldTermState) - errChan <- err - } - - }() - return errChan, nil - -} - // Attach to a remote terminal func (r *LocalRuntime) Attach(ctx context.Context, c *cliconfig.AttachValues) error { ctr, err := r.LookupContainer(c.InputArgs[0]) @@ -796,6 +710,49 @@ func (r *LocalRuntime) Start(ctx context.Context, c *cliconfig.StartValues, sigP return exitCode, finalErr } +func (r *LocalRuntime) attach(ctx context.Context, stdin, stdout *os.File, cid string, start bool, detachKeys string) (chan error, error) { + var ( + oldTermState *term.State + ) + spec, err := r.Spec(cid) + if err != nil { + return nil, err + } + resize := make(chan remotecommand.TerminalSize, 5) + haveTerminal := terminal.IsTerminal(int(os.Stdin.Fd())) + + // Check if we are attached to a terminal. If we are, generate resize + // events, and set the terminal to raw mode + if haveTerminal && spec.Process.Terminal { + cancel, oldTermState, err := handleTerminalAttach(ctx, resize) + if err != nil { + return nil, err + } + defer cancel() + defer restoreTerminal(oldTermState) + + logrus.SetFormatter(&RawTtyFormatter{}) + term.SetRawTerminal(os.Stdin.Fd()) + } + + reply, err := iopodman.Attach().Send(r.Conn, varlink.Upgrade, cid, detachKeys, start) + if err != nil { + restoreTerminal(oldTermState) + return nil, err + } + + // See if the server accepts the upgraded connection or returns an error + _, err = reply() + + if err != nil { + restoreTerminal(oldTermState) + return nil, err + } + + errChan := configureVarlinkAttachStdio(r.Conn.Reader, r.Conn.Writer, stdin, stdout, oldTermState, resize, nil) + return errChan, nil +} + // PauseContainers pauses container(s) based on CLI inputs. func (r *LocalRuntime) PauseContainers(ctx context.Context, cli *cliconfig.PauseValues) ([]string, map[string]error, error) { var ( @@ -1037,8 +994,11 @@ func (r *LocalRuntime) Commit(ctx context.Context, c *cliconfig.CommitValues, co // ExecContainer executes a command in the container func (r *LocalRuntime) ExecContainer(ctx context.Context, cli *cliconfig.ExecValues) (int, error) { + var ( + oldTermState *term.State + ec int = define.ExecErrorCodeGeneric + ) // default invalid command exit code - ec := 125 // Validate given environment variables env := map[string]string{} if err := parse.ReadKVStrings(env, []string{}, cli.Env); err != nil { @@ -1051,6 +1011,23 @@ func (r *LocalRuntime) ExecContainer(ctx context.Context, cli *cliconfig.ExecVal envs = append(envs, fmt.Sprintf("%s=%s", k, v)) } + resize := make(chan remotecommand.TerminalSize, 5) + haveTerminal := terminal.IsTerminal(int(os.Stdin.Fd())) + + // Check if we are attached to a terminal. If we are, generate resize + // events, and set the terminal to raw mode + if haveTerminal && cli.Tty { + cancel, oldTermState, err := handleTerminalAttach(ctx, resize) + if err != nil { + return ec, err + } + defer cancel() + defer restoreTerminal(oldTermState) + + logrus.SetFormatter(&RawTtyFormatter{}) + term.SetRawTerminal(os.Stdin.Fd()) + } + opts := iopodman.ExecOpts{ Name: cli.InputArgs[0], Tty: cli.Tty, @@ -1059,18 +1036,79 @@ func (r *LocalRuntime) ExecContainer(ctx context.Context, cli *cliconfig.ExecVal User: &cli.User, Workdir: &cli.Workdir, Env: &envs, + DetachKeys: &cli.DetachKeys, + } + + inputStream := os.Stdin + if !cli.Interactive { + inputStream = nil } - receive, err := iopodman.ExecContainer().Send(r.Conn, varlink.Upgrade, opts) + reply, err := iopodman.ExecContainer().Send(r.Conn, varlink.Upgrade, opts) if err != nil { return ec, errors.Wrapf(err, "Exec failed to contact service for %s", cli.InputArgs) } - _, err = receive() + _, err = reply() if err != nil { return ec, errors.Wrapf(err, "Exec operation failed for %s", cli.InputArgs) } + ecChan := make(chan int, 1) + errChan := configureVarlinkAttachStdio(r.Conn.Reader, r.Conn.Writer, inputStream, os.Stdout, oldTermState, resize, ecChan) + + ec = <-ecChan + err = <-errChan + + return ec, err +} + +func configureVarlinkAttachStdio(reader *bufio.Reader, writer *bufio.Writer, stdin *os.File, stdout *os.File, oldTermState *term.State, resize chan remotecommand.TerminalSize, ecChan chan int) chan error { + errChan := make(chan error, 1) + // These are the special writers that encode input from the client. + varlinkStdinWriter := virtwriter.NewVirtWriteCloser(writer, virtwriter.ToStdin) + varlinkResizeWriter := virtwriter.NewVirtWriteCloser(writer, virtwriter.TerminalResize) + + go func() { + // Read from the wire and direct to stdout or stderr + err := virtwriter.Reader(reader, stdout, os.Stderr, nil, nil, ecChan) + defer restoreTerminal(oldTermState) + sendGenericError(ecChan) + errChan <- err + }() + + go func() { + for termResize := range resize { + b, err := json.Marshal(termResize) + if err != nil { + defer restoreTerminal(oldTermState) + sendGenericError(ecChan) + errChan <- err + } + _, err = varlinkResizeWriter.Write(b) + if err != nil { + defer restoreTerminal(oldTermState) + sendGenericError(ecChan) + errChan <- err + } + } + }() + + if stdin != nil { + // Takes stdinput and sends it over the wire after being encoded + go func() { + if _, err := io.Copy(varlinkStdinWriter, stdin); err != nil { + defer restoreTerminal(oldTermState) + sendGenericError(ecChan) + errChan <- err + } + + }() + } + return errChan +} - // TODO return exit code from exec call - return 0, nil +func sendGenericError(ecChan chan int) { + if ecChan != nil { + ecChan <- define.ExecErrorCodeGeneric + } } diff --git a/pkg/adapter/terminal.go b/pkg/adapter/terminal.go index 373c78322..51b747d23 100644 --- a/pkg/adapter/terminal.go +++ b/pkg/adapter/terminal.go @@ -7,6 +7,7 @@ import ( "github.com/docker/docker/pkg/signal" "github.com/docker/docker/pkg/term" + "github.com/pkg/errors" "github.com/sirupsen/logrus" "k8s.io/client-go/tools/remotecommand" ) @@ -76,3 +77,25 @@ func (f *RawTtyFormatter) Format(entry *logrus.Entry) ([]byte, error) { return bytes, err } + +func handleTerminalAttach(ctx context.Context, resize chan remotecommand.TerminalSize) (context.CancelFunc, *term.State, error) { + logrus.Debugf("Handling terminal attach") + + subCtx, cancel := context.WithCancel(ctx) + + resizeTty(subCtx, resize) + + oldTermState, err := term.SaveState(os.Stdin.Fd()) + if err != nil { + // allow caller to not have to do any cleaning up if we error here + cancel() + return nil, nil, errors.Wrapf(err, "unable to save terminal state") + } + + logrus.SetFormatter(&RawTtyFormatter{}) + if _, err := term.SetRawTerminal(os.Stdin.Fd()); err != nil { + return cancel, nil, err + } + + return cancel, oldTermState, nil +} diff --git a/pkg/adapter/terminal_linux.go b/pkg/adapter/terminal_linux.go index de2600b75..26cfd7b5e 100644 --- a/pkg/adapter/terminal_linux.go +++ b/pkg/adapter/terminal_linux.go @@ -6,7 +6,6 @@ import ( "os" "github.com/containers/libpod/libpod" - "github.com/docker/docker/pkg/term" "github.com/pkg/errors" "github.com/sirupsen/logrus" "golang.org/x/crypto/ssh/terminal" @@ -108,25 +107,3 @@ func StartAttachCtr(ctx context.Context, ctr *libpod.Container, stdout, stderr, return nil } - -func handleTerminalAttach(ctx context.Context, resize chan remotecommand.TerminalSize) (context.CancelFunc, *term.State, error) { - logrus.Debugf("Handling terminal attach") - - subCtx, cancel := context.WithCancel(ctx) - - resizeTty(subCtx, resize) - - oldTermState, err := term.SaveState(os.Stdin.Fd()) - if err != nil { - // allow caller to not have to do any cleaning up if we error here - cancel() - return nil, nil, errors.Wrapf(err, "unable to save terminal state") - } - - logrus.SetFormatter(&RawTtyFormatter{}) - if _, err := term.SetRawTerminal(os.Stdin.Fd()); err != nil { - return cancel, nil, err - } - - return cancel, oldTermState, nil -} diff --git a/pkg/ctime/ctime_linux.go b/pkg/ctime/ctime_linux.go index 28ad959cf..113693e87 100644 --- a/pkg/ctime/ctime_linux.go +++ b/pkg/ctime/ctime_linux.go @@ -10,5 +10,6 @@ import ( func created(fi os.FileInfo) time.Time { st := fi.Sys().(*syscall.Stat_t) - return time.Unix(st.Ctim.Sec, st.Ctim.Nsec) + //nolint + return time.Unix(int64(st.Ctim.Sec), int64(st.Ctim.Nsec)) } diff --git a/pkg/hooks/hooks.go b/pkg/hooks/hooks.go index 5ed028b95..b962ffa5c 100644 --- a/pkg/hooks/hooks.go +++ b/pkg/hooks/hooks.go @@ -4,7 +4,6 @@ package hooks import ( "context" "fmt" - "path/filepath" "sort" "strings" "sync" @@ -138,26 +137,3 @@ func (m *Manager) Hooks(config *rspec.Spec, annotations map[string]string, hasBi return extensionStageHooks, nil } - -// remove remove a hook by name. -func (m *Manager) remove(hook string) (ok bool) { - m.lock.Lock() - defer m.lock.Unlock() - _, ok = m.hooks[hook] - if ok { - delete(m.hooks, hook) - } - return ok -} - -// add adds a hook by path -func (m *Manager) add(path string) (err error) { - m.lock.Lock() - defer m.lock.Unlock() - hook, err := Read(path, m.extensionStages) - if err != nil { - return err - } - m.hooks[filepath.Base(path)] = hook - return nil -} diff --git a/pkg/hooks/monitor.go b/pkg/hooks/monitor.go index febe3483f..c50b321f2 100644 --- a/pkg/hooks/monitor.go +++ b/pkg/hooks/monitor.go @@ -2,9 +2,8 @@ package hooks import ( "context" - "os" - "path/filepath" + current "github.com/containers/libpod/pkg/hooks/1.0.0" "github.com/fsnotify/fsnotify" "github.com/sirupsen/logrus" ) @@ -49,47 +48,11 @@ func (m *Manager) Monitor(ctx context.Context, sync chan<- error) { for { select { case event := <-watcher.Events: - filename := filepath.Base(event.Name) - if len(m.directories) <= 1 { - if event.Op&fsnotify.Remove == fsnotify.Remove { - ok := m.remove(filename) - if ok { - logrus.Debugf("removed hook %s", event.Name) - } - } else if event.Op&fsnotify.Create == fsnotify.Create || event.Op&fsnotify.Write == fsnotify.Write { - err = m.add(event.Name) - if err == nil { - logrus.Debugf("added hook %s", event.Name) - } else if err != ErrNoJSONSuffix { - logrus.Errorf("failed to add hook %s: %v", event.Name, err) - } - } - } else if event.Op&fsnotify.Create == fsnotify.Create || event.Op&fsnotify.Write == fsnotify.Write || event.Op&fsnotify.Remove == fsnotify.Remove { - err = nil - found := false - for i := len(m.directories) - 1; i >= 0; i-- { - path := filepath.Join(m.directories[i], filename) - err = m.add(path) - if err == nil { - found = true - logrus.Debugf("(re)added hook %s (triggered activity on %s)", path, event.Name) - break - } else if err == ErrNoJSONSuffix { - found = true - break // this is not going to change for fallback directories - } else if os.IsNotExist(err) { - continue // move on to the next fallback directory - } else { - found = true - logrus.Errorf("failed to (re)add hook %s (triggered by activity on %s): %v", path, event.Name, err) - break - } - } - if (found || event.Op&fsnotify.Remove == fsnotify.Remove) && err != nil { - ok := m.remove(filename) - if ok { - logrus.Debugf("removed hook %s (triggered by activity on %s)", filename, event.Name) - } + m.hooks = make(map[string]*current.Hook) + for _, dir := range m.directories { + err = ReadDir(dir, m.extensionStages, m.hooks) + if err != nil { + logrus.Errorf("failed loading hooks for %s: %v", event.Name, err) } } case <-ctx.Done(): diff --git a/pkg/hooks/monitor_test.go b/pkg/hooks/monitor_test.go index 31d7f9e39..dc67eaf83 100644 --- a/pkg/hooks/monitor_test.go +++ b/pkg/hooks/monitor_test.go @@ -226,7 +226,28 @@ func TestMonitorTwoDirGood(t *testing.T) { assert.Equal(t, primaryInjected, config.Hooks) // masked by primary }) - t.Run("bad-primary-addition", func(t *testing.T) { + primaryPath2 := filepath.Join(primaryDir, "0a.json") //0a because it will be before a.json alphabetically + + t.Run("bad-primary-new-addition", func(t *testing.T) { + err = ioutil.WriteFile(primaryPath2, []byte("{\"version\": \"-1\"}"), 0644) + if err != nil { + t.Fatal(err) + } + + time.Sleep(100 * time.Millisecond) // wait for monitor to notice + + config := &rspec.Spec{} + fmt.Println("expected: ", config.Hooks) + expected := primaryInjected // 0a.json is bad, a.json is still good + _, err = manager.Hooks(config, map[string]string{}, false) + fmt.Println("actual: ", config.Hooks) + if err != nil { + t.Fatal(err) + } + assert.Equal(t, expected, config.Hooks) + }) + + t.Run("bad-primary-same-addition", func(t *testing.T) { err = ioutil.WriteFile(primaryPath, []byte("{\"version\": \"-1\"}"), 0644) if err != nil { t.Fatal(err) @@ -235,7 +256,7 @@ func TestMonitorTwoDirGood(t *testing.T) { time.Sleep(100 * time.Millisecond) // wait for monitor to notice config := &rspec.Spec{} - expected := config.Hooks + expected := fallbackInjected _, err = manager.Hooks(config, map[string]string{}, false) if err != nil { t.Fatal(err) diff --git a/pkg/hooks/read.go b/pkg/hooks/read.go index d3995a0be..560ff1899 100644 --- a/pkg/hooks/read.go +++ b/pkg/hooks/read.go @@ -67,7 +67,7 @@ func ReadDir(path string, extensionStages []string, hooks map[string]*current.Ho if err != nil { return err } - + res := err for _, file := range files { filePath := filepath.Join(path, file.Name()) hook, err := Read(filePath, extensionStages) @@ -80,12 +80,17 @@ func ReadDir(path string, extensionStages []string, hooks map[string]*current.Ho continue } } - return err + if res == nil { + res = err + } else { + res = errors.Wrapf(res, "%v", err) + } + continue } hooks[file.Name()] = hook logrus.Debugf("added hook %s", filePath) } - return nil + return res } func init() { diff --git a/pkg/varlinkapi/attach.go b/pkg/varlinkapi/attach.go index 97ba525a5..1f8d48eb9 100644 --- a/pkg/varlinkapi/attach.go +++ b/pkg/varlinkapi/attach.go @@ -68,7 +68,7 @@ func (i *LibpodAPI) Attach(call iopodman.VarlinkCall, name string, detachKeys st reader, writer, _, pw, streams := setupStreams(call) go func() { - if err := virtwriter.Reader(reader, nil, nil, pw, resize); err != nil { + if err := virtwriter.Reader(reader, nil, nil, pw, resize, nil); err != nil { errChan <- err } }() @@ -83,7 +83,7 @@ func (i *LibpodAPI) Attach(call iopodman.VarlinkCall, name string, detachKeys st logrus.Error(finalErr) } - if err = virtwriter.HangUp(writer); err != nil { + if err = virtwriter.HangUp(writer, 0); err != nil { logrus.Errorf("Failed to HANG-UP attach to %s: %s", ctr.ID(), err.Error()) } return call.Writer.Flush() diff --git a/pkg/varlinkapi/containers.go b/pkg/varlinkapi/containers.go index 19a8bfd2e..cd5f305c9 100644 --- a/pkg/varlinkapi/containers.go +++ b/pkg/varlinkapi/containers.go @@ -782,6 +782,9 @@ func (i *LibpodAPI) ExecContainer(call iopodman.VarlinkCall, opts iopodman.ExecO fmt.Sprintf("exec requires a running container, %s is %s", ctr.ID(), state.String())) } + // ACK the client upgrade request + call.ReplyExecContainer() + envs := []string{} if opts.Env != nil { envs = *opts.Env @@ -797,44 +800,52 @@ func (i *LibpodAPI) ExecContainer(call iopodman.VarlinkCall, opts iopodman.ExecO workDir = *opts.Workdir } + var detachKeys string + if opts.DetachKeys != nil { + detachKeys = *opts.DetachKeys + } + resizeChan := make(chan remotecommand.TerminalSize) - errChan := make(chan error) reader, writer, _, pipeWriter, streams := setupStreams(call) + type ExitCodeError struct { + ExitCode uint32 + Error error + } + ecErrChan := make(chan ExitCodeError, 1) + go func() { - fmt.Printf("ExecContainer Start Reader\n") - if err := virtwriter.Reader(reader, nil, nil, pipeWriter, resizeChan); err != nil { - fmt.Printf("ExecContainer Reader err %s, %s\n", err.Error(), errors.Cause(err).Error()) - errChan <- err + if err := virtwriter.Reader(reader, nil, nil, pipeWriter, resizeChan, nil); err != nil { + ecErrChan <- ExitCodeError{ + define.ExecErrorCodeGeneric, + err, + } } }() - // Debugging... - time.Sleep(5 * time.Second) - go func() { - fmt.Printf("ExecContainer Start ctr.Exec\n") - // TODO detach keys and resize - // TODO add handling for exit code - // TODO capture exit code and return to main thread - _, err := ctr.Exec(opts.Tty, opts.Privileged, envs, opts.Cmd, user, workDir, streams, 0, nil, "") + ec, err := ctr.Exec(opts.Tty, opts.Privileged, envs, opts.Cmd, user, workDir, streams, 0, resizeChan, detachKeys) if err != nil { - fmt.Printf("ExecContainer Exec err %s, %s\n", err.Error(), errors.Cause(err).Error()) - errChan <- errors.Wrapf(err, "ExecContainer failed for container %s", ctr.ID()) + logrus.Errorf(err.Error()) + } + ecErrChan <- ExitCodeError{ + uint32(ec), + err, } }() - execErr := <-errChan + ecErr := <-ecErrChan - if execErr != nil && errors.Cause(execErr) != io.EOF { - fmt.Printf("ExecContainer err: %s\n", execErr.Error()) - return call.ReplyErrorOccurred(execErr.Error()) - } + exitCode := define.TranslateExecErrorToExitCode(int(ecErr.ExitCode), ecErr.Error) - if err = virtwriter.HangUp(writer); err != nil { - fmt.Printf("ExecContainer hangup err: %s\n", err.Error()) + if err = virtwriter.HangUp(writer, uint32(exitCode)); err != nil { logrus.Errorf("ExecContainer failed to HANG-UP on %s: %s", ctr.ID(), err.Error()) } - return call.Writer.Flush() + + if err := call.Writer.Flush(); err != nil { + logrus.Errorf("Exec Container err: %s", err.Error()) + } + + return ecErr.Error } diff --git a/pkg/varlinkapi/virtwriter/virtwriter.go b/pkg/varlinkapi/virtwriter/virtwriter.go index 0da2a91fc..27ecd1f52 100644 --- a/pkg/varlinkapi/virtwriter/virtwriter.go +++ b/pkg/varlinkapi/virtwriter/virtwriter.go @@ -89,10 +89,14 @@ func (v VirtWriteCloser) Write(input []byte) (int, error) { } // Reader decodes the content that comes over the wire and directs it to the proper destination. -func Reader(r *bufio.Reader, output io.Writer, errput io.Writer, input io.Writer, resize chan remotecommand.TerminalSize) error { +func Reader(r *bufio.Reader, output, errput, input io.Writer, resize chan remotecommand.TerminalSize, execEcChan chan int) error { var messageSize int64 headerBytes := make([]byte, 8) + if r == nil { + return errors.Errorf("Reader must not be nil") + } + for { n, err := io.ReadFull(r, headerBytes) if err != nil { @@ -106,35 +110,43 @@ func Reader(r *bufio.Reader, output io.Writer, errput io.Writer, input io.Writer switch IntToSocketDest(int(headerBytes[0])) { case ToStdout: - _, err := io.CopyN(output, r, messageSize) - if err != nil { - return err + if output != nil { + _, err := io.CopyN(output, r, messageSize) + if err != nil { + return err + } } case ToStderr: - _, err := io.CopyN(errput, r, messageSize) - if err != nil { - return err + if errput != nil { + _, err := io.CopyN(errput, r, messageSize) + if err != nil { + return err + } } case ToStdin: - _, err := io.CopyN(input, r, messageSize) - if err != nil { - return err - } - case TerminalResize: - out := make([]byte, messageSize) - if messageSize > 0 { - _, err = io.ReadFull(r, out) - + if input != nil { + _, err := io.CopyN(input, r, messageSize) if err != nil { return err } } - // Resize events come over in bytes, need to be reserialized - resizeEvent := remotecommand.TerminalSize{} - if err := json.Unmarshal(out, &resizeEvent); err != nil { - return err + case TerminalResize: + if resize != nil { + out := make([]byte, messageSize) + if messageSize > 0 { + _, err = io.ReadFull(r, out) + + if err != nil { + return err + } + } + // Resize events come over in bytes, need to be reserialized + resizeEvent := remotecommand.TerminalSize{} + if err := json.Unmarshal(out, &resizeEvent); err != nil { + return err + } + resize <- resizeEvent } - resize <- resizeEvent case Quit: out := make([]byte, messageSize) if messageSize > 0 { @@ -144,6 +156,10 @@ func Reader(r *bufio.Reader, output io.Writer, errput io.Writer, input io.Writer return err } } + if execEcChan != nil { + ecInt := binary.BigEndian.Uint32(out) + execEcChan <- int(ecInt) + } return nil default: @@ -154,9 +170,11 @@ func Reader(r *bufio.Reader, output io.Writer, errput io.Writer, input io.Writer } // HangUp sends message to peer to close connection -func HangUp(writer *bufio.Writer) (err error) { +func HangUp(writer *bufio.Writer, ec uint32) (err error) { n := 0 - msg := []byte("HANG-UP") + msg := make([]byte, 4) + + binary.BigEndian.PutUint32(msg, ec) writeQuit := NewVirtWriteCloser(writer, Quit) if n, err = writeQuit.Write(msg); err != nil { |