diff options
author | OpenShift Merge Robot <openshift-merge-robot@users.noreply.github.com> | 2020-10-07 15:06:02 -0400 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-10-07 15:06:02 -0400 |
commit | 0e1d01103e45430693736dac10be13c49cf23f03 (patch) | |
tree | aed0f84e88d4e4de22b5e262761e95c951f4f610 /libpod/pod_api.go | |
parent | 9ae873e60e149677db66782eaf2b4ed1402e97d2 (diff) | |
parent | 55f5e4af11a2428b14b36bbdcd4b1d91e868d786 (diff) | |
download | podman-0e1d01103e45430693736dac10be13c49cf23f03.tar.gz podman-0e1d01103e45430693736dac10be13c49cf23f03.tar.bz2 podman-0e1d01103e45430693736dac10be13c49cf23f03.zip |
Merge pull request #7382 from mheon/pod_parallel
Move pod jobs to parallel execution
Diffstat (limited to 'libpod/pod_api.go')
-rw-r--r-- | libpod/pod_api.go | 241 |
1 files changed, 108 insertions, 133 deletions
diff --git a/libpod/pod_api.go b/libpod/pod_api.go index 0ae180356..f2ddba9c9 100644 --- a/libpod/pod_api.go +++ b/libpod/pod_api.go @@ -6,6 +6,7 @@ import ( "github.com/containers/podman/v2/libpod/define" "github.com/containers/podman/v2/libpod/events" "github.com/containers/podman/v2/pkg/cgroups" + "github.com/containers/podman/v2/pkg/parallel" "github.com/containers/podman/v2/pkg/rootless" "github.com/pkg/errors" "github.com/sirupsen/logrus" @@ -99,47 +100,52 @@ func (p *Pod) StopWithTimeout(ctx context.Context, cleanup bool, timeout int) (m return nil, err } - ctrErrors := make(map[string]error) - // TODO: There may be cases where it makes sense to order stops based on // dependencies. Should we bother with this? - // Stop to all containers - for _, ctr := range allCtrs { - ctr.lock.Lock() + ctrErrChan := make(map[string]<-chan error) - if err := ctr.syncContainer(); err != nil { - ctr.lock.Unlock() - ctrErrors[ctr.ID()] = err - continue - } - - // Ignore containers that are not running - if ctr.state.State != define.ContainerStateRunning { - ctr.lock.Unlock() - continue - } - stopTimeout := ctr.config.StopTimeout - if timeout > -1 { - stopTimeout = uint(timeout) - } - if err := ctr.stop(stopTimeout); err != nil { - ctr.lock.Unlock() - ctrErrors[ctr.ID()] = err - continue - } + // Enqueue a function for each container with the parallel executor. + for _, ctr := range allCtrs { + c := ctr + logrus.Debugf("Adding parallel job to stop container %s", c.ID()) + retChan := parallel.Enqueue(ctx, func() error { + // TODO: Might be better to batch stop and cleanup + // together? + if timeout > -1 { + if err := c.StopWithTimeout(uint(timeout)); err != nil { + return err + } + } else { + if err := c.Stop(); err != nil { + return err + } + } - if cleanup { - if err := ctr.cleanup(ctx); err != nil { - ctrErrors[ctr.ID()] = err + if cleanup { + return c.Cleanup(ctx) } - } - ctr.lock.Unlock() + return nil + }) + + ctrErrChan[c.ID()] = retChan } p.newPodEvent(events.Stop) + ctrErrors := make(map[string]error) + + // Get returned error for every container we worked on + for id, channel := range ctrErrChan { + if err := <-channel; err != nil { + if errors.Cause(err) == define.ErrCtrStateInvalid || errors.Cause(err) == define.ErrCtrStopped { + continue + } + ctrErrors[id] = err + } + } + if len(ctrErrors) > 0 { return ctrErrors, errors.Wrapf(define.ErrPodPartialFail, "error stopping some containers") } @@ -169,45 +175,29 @@ func (p *Pod) Cleanup(ctx context.Context) (map[string]error, error) { return nil, err } - ctrErrors := make(map[string]error) + ctrErrChan := make(map[string]<-chan error) - // Clean up all containers + // Enqueue a function for each container with the parallel executor. for _, ctr := range allCtrs { - ctr.lock.Lock() - - if err := ctr.syncContainer(); err != nil { - ctr.lock.Unlock() - ctrErrors[ctr.ID()] = err - continue - } - - // Ignore containers that are running/paused - if !ctr.ensureState(define.ContainerStateConfigured, define.ContainerStateCreated, define.ContainerStateStopped, define.ContainerStateExited) { - ctr.lock.Unlock() - continue - } - - // Check for running exec sessions, ignore containers with them. - sessions, err := ctr.getActiveExecSessions() - if err != nil { - ctr.lock.Unlock() - ctrErrors[ctr.ID()] = err - continue - } - if len(sessions) > 0 { - ctr.lock.Unlock() - continue - } + c := ctr + logrus.Debugf("Adding parallel job to clean up container %s", c.ID()) + retChan := parallel.Enqueue(ctx, func() error { + return c.Cleanup(ctx) + }) - // TODO: Should we handle restart policy here? + ctrErrChan[c.ID()] = retChan + } - ctr.newContainerEvent(events.Cleanup) + ctrErrors := make(map[string]error) - if err := ctr.cleanup(ctx); err != nil { - ctrErrors[ctr.ID()] = err + // Get returned error for every container we worked on + for id, channel := range ctrErrChan { + if err := <-channel; err != nil { + if errors.Cause(err) == define.ErrCtrStateInvalid || errors.Cause(err) == define.ErrCtrStopped { + continue + } + ctrErrors[id] = err } - - ctr.lock.Unlock() } if len(ctrErrors) > 0 { @@ -229,7 +219,7 @@ func (p *Pod) Cleanup(ctx context.Context) (map[string]error, error) { // containers. The container ID is mapped to the error encountered. The error is // set to ErrPodPartialFail. // If both error and the map are nil, all containers were paused without error -func (p *Pod) Pause() (map[string]error, error) { +func (p *Pod) Pause(ctx context.Context) (map[string]error, error) { p.lock.Lock() defer p.lock.Unlock() @@ -252,37 +242,34 @@ func (p *Pod) Pause() (map[string]error, error) { return nil, err } - ctrErrors := make(map[string]error) + ctrErrChan := make(map[string]<-chan error) - // Pause to all containers + // Enqueue a function for each container with the parallel executor. for _, ctr := range allCtrs { - ctr.lock.Lock() + c := ctr + logrus.Debugf("Adding parallel job to pause container %s", c.ID()) + retChan := parallel.Enqueue(ctx, c.Pause) - if err := ctr.syncContainer(); err != nil { - ctr.lock.Unlock() - ctrErrors[ctr.ID()] = err - continue - } + ctrErrChan[c.ID()] = retChan + } - // Ignore containers that are not running - if ctr.state.State != define.ContainerStateRunning { - ctr.lock.Unlock() - continue - } + p.newPodEvent(events.Pause) - if err := ctr.pause(); err != nil { - ctr.lock.Unlock() - ctrErrors[ctr.ID()] = err - continue - } + ctrErrors := make(map[string]error) - ctr.lock.Unlock() + // Get returned error for every container we worked on + for id, channel := range ctrErrChan { + if err := <-channel; err != nil { + if errors.Cause(err) == define.ErrCtrStateInvalid || errors.Cause(err) == define.ErrCtrStopped { + continue + } + ctrErrors[id] = err + } } if len(ctrErrors) > 0 { return ctrErrors, errors.Wrapf(define.ErrPodPartialFail, "error pausing some containers") } - defer p.newPodEvent(events.Pause) return nil, nil } @@ -298,7 +285,7 @@ func (p *Pod) Pause() (map[string]error, error) { // containers. The container ID is mapped to the error encountered. The error is // set to ErrPodPartialFail. // If both error and the map are nil, all containers were unpaused without error. -func (p *Pod) Unpause() (map[string]error, error) { +func (p *Pod) Unpause(ctx context.Context) (map[string]error, error) { p.lock.Lock() defer p.lock.Unlock() @@ -311,38 +298,34 @@ func (p *Pod) Unpause() (map[string]error, error) { return nil, err } - ctrErrors := make(map[string]error) + ctrErrChan := make(map[string]<-chan error) - // Pause to all containers + // Enqueue a function for each container with the parallel executor. for _, ctr := range allCtrs { - ctr.lock.Lock() + c := ctr + logrus.Debugf("Adding parallel job to unpause container %s", c.ID()) + retChan := parallel.Enqueue(ctx, c.Unpause) - if err := ctr.syncContainer(); err != nil { - ctr.lock.Unlock() - ctrErrors[ctr.ID()] = err - continue - } + ctrErrChan[c.ID()] = retChan + } - // Ignore containers that are not paused - if ctr.state.State != define.ContainerStatePaused { - ctr.lock.Unlock() - continue - } + p.newPodEvent(events.Unpause) - if err := ctr.unpause(); err != nil { - ctr.lock.Unlock() - ctrErrors[ctr.ID()] = err - continue - } + ctrErrors := make(map[string]error) - ctr.lock.Unlock() + // Get returned error for every container we worked on + for id, channel := range ctrErrChan { + if err := <-channel; err != nil { + if errors.Cause(err) == define.ErrCtrStateInvalid || errors.Cause(err) == define.ErrCtrStopped { + continue + } + ctrErrors[id] = err + } } if len(ctrErrors) > 0 { return ctrErrors, errors.Wrapf(define.ErrPodPartialFail, "error unpausing some containers") } - - defer p.newPodEvent(events.Unpause) return nil, nil } @@ -411,7 +394,7 @@ func (p *Pod) Restart(ctx context.Context) (map[string]error, error) { // containers. The container ID is mapped to the error encountered. The error is // set to ErrPodPartialFail. // If both error and the map are nil, all containers were signalled successfully. -func (p *Pod) Kill(signal uint) (map[string]error, error) { +func (p *Pod) Kill(ctx context.Context, signal uint) (map[string]error, error) { p.lock.Lock() defer p.lock.Unlock() @@ -424,44 +407,36 @@ func (p *Pod) Kill(signal uint) (map[string]error, error) { return nil, err } - ctrErrors := make(map[string]error) + ctrErrChan := make(map[string]<-chan error) - // Send a signal to all containers + // Enqueue a function for each container with the parallel executor. for _, ctr := range allCtrs { - ctr.lock.Lock() - - if err := ctr.syncContainer(); err != nil { - ctr.lock.Unlock() - ctrErrors[ctr.ID()] = err - continue - } + c := ctr + logrus.Debugf("Adding parallel job to kill container %s", c.ID()) + retChan := parallel.Enqueue(ctx, func() error { + return c.Kill(signal) + }) - // Ignore containers that are not running - if ctr.state.State != define.ContainerStateRunning { - ctr.lock.Unlock() - continue - } + ctrErrChan[c.ID()] = retChan + } - if err := ctr.ociRuntime.KillContainer(ctr, signal, false); err != nil { - ctr.lock.Unlock() - ctrErrors[ctr.ID()] = err - continue - } + p.newPodEvent(events.Kill) - logrus.Debugf("Killed container %s with signal %d", ctr.ID(), signal) + ctrErrors := make(map[string]error) - ctr.state.StoppedByUser = true - if err := ctr.save(); err != nil { - ctrErrors[ctr.ID()] = err + // Get returned error for every container we worked on + for id, channel := range ctrErrChan { + if err := <-channel; err != nil { + if errors.Cause(err) == define.ErrCtrStateInvalid || errors.Cause(err) == define.ErrCtrStopped { + continue + } + ctrErrors[id] = err } - - ctr.lock.Unlock() } if len(ctrErrors) > 0 { return ctrErrors, errors.Wrapf(define.ErrPodPartialFail, "error killing some containers") } - defer p.newPodEvent(events.Kill) return nil, nil } |