aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--libpod/pod_api.go241
-rw-r--r--pkg/api/handlers/libpod/pods.go6
-rw-r--r--pkg/domain/infra/abi/containers.go6
-rw-r--r--pkg/domain/infra/abi/pods.go6
-rw-r--r--pkg/parallel/ctr/ctr.go (renamed from pkg/parallel/parallel_linux.go)45
-rw-r--r--pkg/parallel/parallel.go30
-rw-r--r--pkg/varlinkapi/pods.go7
-rw-r--r--test/e2e/common_test.go4
8 files changed, 167 insertions, 178 deletions
diff --git a/libpod/pod_api.go b/libpod/pod_api.go
index 0ae180356..f2ddba9c9 100644
--- a/libpod/pod_api.go
+++ b/libpod/pod_api.go
@@ -6,6 +6,7 @@ import (
"github.com/containers/podman/v2/libpod/define"
"github.com/containers/podman/v2/libpod/events"
"github.com/containers/podman/v2/pkg/cgroups"
+ "github.com/containers/podman/v2/pkg/parallel"
"github.com/containers/podman/v2/pkg/rootless"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
@@ -99,47 +100,52 @@ func (p *Pod) StopWithTimeout(ctx context.Context, cleanup bool, timeout int) (m
return nil, err
}
- ctrErrors := make(map[string]error)
-
// TODO: There may be cases where it makes sense to order stops based on
// dependencies. Should we bother with this?
- // Stop to all containers
- for _, ctr := range allCtrs {
- ctr.lock.Lock()
+ ctrErrChan := make(map[string]<-chan error)
- if err := ctr.syncContainer(); err != nil {
- ctr.lock.Unlock()
- ctrErrors[ctr.ID()] = err
- continue
- }
-
- // Ignore containers that are not running
- if ctr.state.State != define.ContainerStateRunning {
- ctr.lock.Unlock()
- continue
- }
- stopTimeout := ctr.config.StopTimeout
- if timeout > -1 {
- stopTimeout = uint(timeout)
- }
- if err := ctr.stop(stopTimeout); err != nil {
- ctr.lock.Unlock()
- ctrErrors[ctr.ID()] = err
- continue
- }
+ // Enqueue a function for each container with the parallel executor.
+ for _, ctr := range allCtrs {
+ c := ctr
+ logrus.Debugf("Adding parallel job to stop container %s", c.ID())
+ retChan := parallel.Enqueue(ctx, func() error {
+ // TODO: Might be better to batch stop and cleanup
+ // together?
+ if timeout > -1 {
+ if err := c.StopWithTimeout(uint(timeout)); err != nil {
+ return err
+ }
+ } else {
+ if err := c.Stop(); err != nil {
+ return err
+ }
+ }
- if cleanup {
- if err := ctr.cleanup(ctx); err != nil {
- ctrErrors[ctr.ID()] = err
+ if cleanup {
+ return c.Cleanup(ctx)
}
- }
- ctr.lock.Unlock()
+ return nil
+ })
+
+ ctrErrChan[c.ID()] = retChan
}
p.newPodEvent(events.Stop)
+ ctrErrors := make(map[string]error)
+
+ // Get returned error for every container we worked on
+ for id, channel := range ctrErrChan {
+ if err := <-channel; err != nil {
+ if errors.Cause(err) == define.ErrCtrStateInvalid || errors.Cause(err) == define.ErrCtrStopped {
+ continue
+ }
+ ctrErrors[id] = err
+ }
+ }
+
if len(ctrErrors) > 0 {
return ctrErrors, errors.Wrapf(define.ErrPodPartialFail, "error stopping some containers")
}
@@ -169,45 +175,29 @@ func (p *Pod) Cleanup(ctx context.Context) (map[string]error, error) {
return nil, err
}
- ctrErrors := make(map[string]error)
+ ctrErrChan := make(map[string]<-chan error)
- // Clean up all containers
+ // Enqueue a function for each container with the parallel executor.
for _, ctr := range allCtrs {
- ctr.lock.Lock()
-
- if err := ctr.syncContainer(); err != nil {
- ctr.lock.Unlock()
- ctrErrors[ctr.ID()] = err
- continue
- }
-
- // Ignore containers that are running/paused
- if !ctr.ensureState(define.ContainerStateConfigured, define.ContainerStateCreated, define.ContainerStateStopped, define.ContainerStateExited) {
- ctr.lock.Unlock()
- continue
- }
-
- // Check for running exec sessions, ignore containers with them.
- sessions, err := ctr.getActiveExecSessions()
- if err != nil {
- ctr.lock.Unlock()
- ctrErrors[ctr.ID()] = err
- continue
- }
- if len(sessions) > 0 {
- ctr.lock.Unlock()
- continue
- }
+ c := ctr
+ logrus.Debugf("Adding parallel job to clean up container %s", c.ID())
+ retChan := parallel.Enqueue(ctx, func() error {
+ return c.Cleanup(ctx)
+ })
- // TODO: Should we handle restart policy here?
+ ctrErrChan[c.ID()] = retChan
+ }
- ctr.newContainerEvent(events.Cleanup)
+ ctrErrors := make(map[string]error)
- if err := ctr.cleanup(ctx); err != nil {
- ctrErrors[ctr.ID()] = err
+ // Get returned error for every container we worked on
+ for id, channel := range ctrErrChan {
+ if err := <-channel; err != nil {
+ if errors.Cause(err) == define.ErrCtrStateInvalid || errors.Cause(err) == define.ErrCtrStopped {
+ continue
+ }
+ ctrErrors[id] = err
}
-
- ctr.lock.Unlock()
}
if len(ctrErrors) > 0 {
@@ -229,7 +219,7 @@ func (p *Pod) Cleanup(ctx context.Context) (map[string]error, error) {
// containers. The container ID is mapped to the error encountered. The error is
// set to ErrPodPartialFail.
// If both error and the map are nil, all containers were paused without error
-func (p *Pod) Pause() (map[string]error, error) {
+func (p *Pod) Pause(ctx context.Context) (map[string]error, error) {
p.lock.Lock()
defer p.lock.Unlock()
@@ -252,37 +242,34 @@ func (p *Pod) Pause() (map[string]error, error) {
return nil, err
}
- ctrErrors := make(map[string]error)
+ ctrErrChan := make(map[string]<-chan error)
- // Pause to all containers
+ // Enqueue a function for each container with the parallel executor.
for _, ctr := range allCtrs {
- ctr.lock.Lock()
+ c := ctr
+ logrus.Debugf("Adding parallel job to pause container %s", c.ID())
+ retChan := parallel.Enqueue(ctx, c.Pause)
- if err := ctr.syncContainer(); err != nil {
- ctr.lock.Unlock()
- ctrErrors[ctr.ID()] = err
- continue
- }
+ ctrErrChan[c.ID()] = retChan
+ }
- // Ignore containers that are not running
- if ctr.state.State != define.ContainerStateRunning {
- ctr.lock.Unlock()
- continue
- }
+ p.newPodEvent(events.Pause)
- if err := ctr.pause(); err != nil {
- ctr.lock.Unlock()
- ctrErrors[ctr.ID()] = err
- continue
- }
+ ctrErrors := make(map[string]error)
- ctr.lock.Unlock()
+ // Get returned error for every container we worked on
+ for id, channel := range ctrErrChan {
+ if err := <-channel; err != nil {
+ if errors.Cause(err) == define.ErrCtrStateInvalid || errors.Cause(err) == define.ErrCtrStopped {
+ continue
+ }
+ ctrErrors[id] = err
+ }
}
if len(ctrErrors) > 0 {
return ctrErrors, errors.Wrapf(define.ErrPodPartialFail, "error pausing some containers")
}
- defer p.newPodEvent(events.Pause)
return nil, nil
}
@@ -298,7 +285,7 @@ func (p *Pod) Pause() (map[string]error, error) {
// containers. The container ID is mapped to the error encountered. The error is
// set to ErrPodPartialFail.
// If both error and the map are nil, all containers were unpaused without error.
-func (p *Pod) Unpause() (map[string]error, error) {
+func (p *Pod) Unpause(ctx context.Context) (map[string]error, error) {
p.lock.Lock()
defer p.lock.Unlock()
@@ -311,38 +298,34 @@ func (p *Pod) Unpause() (map[string]error, error) {
return nil, err
}
- ctrErrors := make(map[string]error)
+ ctrErrChan := make(map[string]<-chan error)
- // Pause to all containers
+ // Enqueue a function for each container with the parallel executor.
for _, ctr := range allCtrs {
- ctr.lock.Lock()
+ c := ctr
+ logrus.Debugf("Adding parallel job to unpause container %s", c.ID())
+ retChan := parallel.Enqueue(ctx, c.Unpause)
- if err := ctr.syncContainer(); err != nil {
- ctr.lock.Unlock()
- ctrErrors[ctr.ID()] = err
- continue
- }
+ ctrErrChan[c.ID()] = retChan
+ }
- // Ignore containers that are not paused
- if ctr.state.State != define.ContainerStatePaused {
- ctr.lock.Unlock()
- continue
- }
+ p.newPodEvent(events.Unpause)
- if err := ctr.unpause(); err != nil {
- ctr.lock.Unlock()
- ctrErrors[ctr.ID()] = err
- continue
- }
+ ctrErrors := make(map[string]error)
- ctr.lock.Unlock()
+ // Get returned error for every container we worked on
+ for id, channel := range ctrErrChan {
+ if err := <-channel; err != nil {
+ if errors.Cause(err) == define.ErrCtrStateInvalid || errors.Cause(err) == define.ErrCtrStopped {
+ continue
+ }
+ ctrErrors[id] = err
+ }
}
if len(ctrErrors) > 0 {
return ctrErrors, errors.Wrapf(define.ErrPodPartialFail, "error unpausing some containers")
}
-
- defer p.newPodEvent(events.Unpause)
return nil, nil
}
@@ -411,7 +394,7 @@ func (p *Pod) Restart(ctx context.Context) (map[string]error, error) {
// containers. The container ID is mapped to the error encountered. The error is
// set to ErrPodPartialFail.
// If both error and the map are nil, all containers were signalled successfully.
-func (p *Pod) Kill(signal uint) (map[string]error, error) {
+func (p *Pod) Kill(ctx context.Context, signal uint) (map[string]error, error) {
p.lock.Lock()
defer p.lock.Unlock()
@@ -424,44 +407,36 @@ func (p *Pod) Kill(signal uint) (map[string]error, error) {
return nil, err
}
- ctrErrors := make(map[string]error)
+ ctrErrChan := make(map[string]<-chan error)
- // Send a signal to all containers
+ // Enqueue a function for each container with the parallel executor.
for _, ctr := range allCtrs {
- ctr.lock.Lock()
-
- if err := ctr.syncContainer(); err != nil {
- ctr.lock.Unlock()
- ctrErrors[ctr.ID()] = err
- continue
- }
+ c := ctr
+ logrus.Debugf("Adding parallel job to kill container %s", c.ID())
+ retChan := parallel.Enqueue(ctx, func() error {
+ return c.Kill(signal)
+ })
- // Ignore containers that are not running
- if ctr.state.State != define.ContainerStateRunning {
- ctr.lock.Unlock()
- continue
- }
+ ctrErrChan[c.ID()] = retChan
+ }
- if err := ctr.ociRuntime.KillContainer(ctr, signal, false); err != nil {
- ctr.lock.Unlock()
- ctrErrors[ctr.ID()] = err
- continue
- }
+ p.newPodEvent(events.Kill)
- logrus.Debugf("Killed container %s with signal %d", ctr.ID(), signal)
+ ctrErrors := make(map[string]error)
- ctr.state.StoppedByUser = true
- if err := ctr.save(); err != nil {
- ctrErrors[ctr.ID()] = err
+ // Get returned error for every container we worked on
+ for id, channel := range ctrErrChan {
+ if err := <-channel; err != nil {
+ if errors.Cause(err) == define.ErrCtrStateInvalid || errors.Cause(err) == define.ErrCtrStopped {
+ continue
+ }
+ ctrErrors[id] = err
}
-
- ctr.lock.Unlock()
}
if len(ctrErrors) > 0 {
return ctrErrors, errors.Wrapf(define.ErrPodPartialFail, "error killing some containers")
}
- defer p.newPodEvent(events.Kill)
return nil, nil
}
diff --git a/pkg/api/handlers/libpod/pods.go b/pkg/api/handlers/libpod/pods.go
index 3aa554171..5422411cf 100644
--- a/pkg/api/handlers/libpod/pods.go
+++ b/pkg/api/handlers/libpod/pods.go
@@ -270,7 +270,7 @@ func PodPause(w http.ResponseWriter, r *http.Request) {
utils.PodNotFound(w, name, err)
return
}
- responses, err := pod.Pause()
+ responses, err := pod.Pause(r.Context())
if err != nil && errors.Cause(err) != define.ErrPodPartialFail {
utils.Error(w, "Something went wrong", http.StatusInternalServerError, err)
return
@@ -294,7 +294,7 @@ func PodUnpause(w http.ResponseWriter, r *http.Request) {
utils.PodNotFound(w, name, err)
return
}
- responses, err := pod.Unpause()
+ responses, err := pod.Unpause(r.Context())
if err != nil && errors.Cause(err) != define.ErrPodPartialFail {
utils.Error(w, "failed to pause pod", http.StatusInternalServerError, err)
return
@@ -402,7 +402,7 @@ func PodKill(w http.ResponseWriter, r *http.Request) {
return
}
- responses, err := pod.Kill(uint(sig))
+ responses, err := pod.Kill(r.Context(), uint(sig))
if err != nil && errors.Cause(err) != define.ErrPodPartialFail {
utils.Error(w, "failed to kill pod", http.StatusInternalServerError, err)
return
diff --git a/pkg/domain/infra/abi/containers.go b/pkg/domain/infra/abi/containers.go
index d92911e0c..0107e18c4 100644
--- a/pkg/domain/infra/abi/containers.go
+++ b/pkg/domain/infra/abi/containers.go
@@ -23,7 +23,7 @@ import (
"github.com/containers/podman/v2/pkg/checkpoint"
"github.com/containers/podman/v2/pkg/domain/entities"
"github.com/containers/podman/v2/pkg/domain/infra/abi/terminal"
- "github.com/containers/podman/v2/pkg/parallel"
+ parallelctr "github.com/containers/podman/v2/pkg/parallel/ctr"
"github.com/containers/podman/v2/pkg/ps"
"github.com/containers/podman/v2/pkg/rootless"
"github.com/containers/podman/v2/pkg/signal"
@@ -157,7 +157,7 @@ func (ic *ContainerEngine) ContainerStop(ctx context.Context, namesOrIds []strin
if err != nil && !(options.Ignore && errors.Cause(err) == define.ErrNoSuchCtr) {
return nil, err
}
- errMap, err := parallel.ContainerOp(ctx, ctrs, func(c *libpod.Container) error {
+ errMap, err := parallelctr.ContainerOp(ctx, ctrs, func(c *libpod.Container) error {
var err error
if options.Timeout != nil {
err = c.StopWithTimeout(*options.Timeout)
@@ -321,7 +321,7 @@ func (ic *ContainerEngine) ContainerRm(ctx context.Context, namesOrIds []string,
return reports, nil
}
- errMap, err := parallel.ContainerOp(ctx, ctrs, func(c *libpod.Container) error {
+ errMap, err := parallelctr.ContainerOp(ctx, ctrs, func(c *libpod.Container) error {
err := ic.Libpod.RemoveContainer(ctx, c, options.Force, options.Volumes)
if err != nil {
if options.Ignore && errors.Cause(err) == define.ErrNoSuchCtr {
diff --git a/pkg/domain/infra/abi/pods.go b/pkg/domain/infra/abi/pods.go
index 747da9fd4..258640a81 100644
--- a/pkg/domain/infra/abi/pods.go
+++ b/pkg/domain/infra/abi/pods.go
@@ -66,7 +66,7 @@ func (ic *ContainerEngine) PodKill(ctx context.Context, namesOrIds []string, opt
for _, p := range pods {
report := entities.PodKillReport{Id: p.ID()}
- conErrs, err := p.Kill(uint(sig))
+ conErrs, err := p.Kill(ctx, uint(sig))
if err != nil && errors.Cause(err) != define.ErrPodPartialFail {
report.Errs = []error{err}
reports = append(reports, &report)
@@ -92,7 +92,7 @@ func (ic *ContainerEngine) PodPause(ctx context.Context, namesOrIds []string, op
}
for _, p := range pods {
report := entities.PodPauseReport{Id: p.ID()}
- errs, err := p.Pause()
+ errs, err := p.Pause(ctx)
if err != nil && errors.Cause(err) != define.ErrPodPartialFail {
report.Errs = []error{err}
continue
@@ -117,7 +117,7 @@ func (ic *ContainerEngine) PodUnpause(ctx context.Context, namesOrIds []string,
}
for _, p := range pods {
report := entities.PodUnpauseReport{Id: p.ID()}
- errs, err := p.Unpause()
+ errs, err := p.Unpause(ctx)
if err != nil && errors.Cause(err) != define.ErrPodPartialFail {
report.Errs = []error{err}
continue
diff --git a/pkg/parallel/parallel_linux.go b/pkg/parallel/ctr/ctr.go
index 442db1502..e8c1292b8 100644
--- a/pkg/parallel/parallel_linux.go
+++ b/pkg/parallel/ctr/ctr.go
@@ -1,11 +1,10 @@
-package parallel
+package ctr
import (
"context"
- "sync"
"github.com/containers/podman/v2/libpod"
- "github.com/pkg/errors"
+ "github.com/containers/podman/v2/pkg/parallel"
"github.com/sirupsen/logrus"
)
@@ -14,44 +13,28 @@ import (
// If no error is returned, each container specified in ctrs will have an entry
// in the resulting map; containers with no error will be set to nil.
func ContainerOp(ctx context.Context, ctrs []*libpod.Container, applyFunc func(*libpod.Container) error) (map[*libpod.Container]error, error) {
- jobControlLock.RLock()
- defer jobControlLock.RUnlock()
-
// We could use a sync.Map but given Go's lack of generic I'd rather
// just use a lock on a normal map...
// The expectation is that most of the time is spent in applyFunc
// anyways.
var (
- errMap = make(map[*libpod.Container]error)
- errLock sync.Mutex
- allDone sync.WaitGroup
+ errMap = make(map[*libpod.Container]<-chan error)
)
for _, ctr := range ctrs {
- // Block until a thread is available
- if err := jobControl.Acquire(ctx, 1); err != nil {
- return nil, errors.Wrapf(err, "error acquiring job control semaphore")
- }
-
- allDone.Add(1)
-
c := ctr
- go func() {
- logrus.Debugf("Launching job on container %s", c.ID())
-
- err := applyFunc(c)
- errLock.Lock()
- errMap[c] = err
- errLock.Unlock()
-
- allDone.Done()
- jobControl.Release(1)
- }()
+ logrus.Debugf("Starting parallel job on container %s", c.ID())
+ errChan := parallel.Enqueue(ctx, func() error {
+ return applyFunc(c)
+ })
+ errMap[c] = errChan
}
- allDone.Wait()
+ finalErr := make(map[*libpod.Container]error)
+ for ctr, errChan := range errMap {
+ err := <-errChan
+ finalErr[ctr] = err
+ }
- return errMap, nil
+ return finalErr, nil
}
-
-// TODO: Add an Enqueue() function that returns a promise
diff --git a/pkg/parallel/parallel.go b/pkg/parallel/parallel.go
index c9e4da50d..4da7e0f89 100644
--- a/pkg/parallel/parallel.go
+++ b/pkg/parallel/parallel.go
@@ -1,6 +1,7 @@
package parallel
import (
+ "context"
"sync"
"github.com/pkg/errors"
@@ -42,3 +43,32 @@ func SetMaxThreads(threads uint) error {
func GetMaxThreads() uint {
return numThreads
}
+
+// Enqueue adds a single function to the parallel jobs queue. This function will
+// be run when an unused thread is available.
+// Returns a receive-only error channel that will return the error (if any) from
+// the provided function fn when fn has finished executing. The channel will be
+// closed after this.
+func Enqueue(ctx context.Context, fn func() error) <-chan error {
+ retChan := make(chan error)
+
+ go func() {
+ jobControlLock.RLock()
+ defer jobControlLock.RUnlock()
+
+ defer close(retChan)
+
+ if err := jobControl.Acquire(ctx, 1); err != nil {
+ retChan <- errors.Wrapf(err, "error acquiring job control semaphore")
+ return
+ }
+
+ err := fn()
+
+ jobControl.Release(1)
+
+ retChan <- err
+ }()
+
+ return retChan
+}
diff --git a/pkg/varlinkapi/pods.go b/pkg/varlinkapi/pods.go
index 189434780..6d03afb7a 100644
--- a/pkg/varlinkapi/pods.go
+++ b/pkg/varlinkapi/pods.go
@@ -3,6 +3,7 @@
package varlinkapi
import (
+ "context"
"encoding/json"
"fmt"
"strconv"
@@ -207,7 +208,7 @@ func (i *VarlinkAPI) KillPod(call iopodman.VarlinkCall, name string, signal int6
if err != nil {
return call.ReplyPodNotFound(name, err.Error())
}
- ctrErrs, err := pod.Kill(killSignal)
+ ctrErrs, err := pod.Kill(context.TODO(), killSignal)
callErr := handlePodCall(call, pod, ctrErrs, err)
if callErr != nil {
return err
@@ -221,7 +222,7 @@ func (i *VarlinkAPI) PausePod(call iopodman.VarlinkCall, name string) error {
if err != nil {
return call.ReplyPodNotFound(name, err.Error())
}
- ctrErrs, err := pod.Pause()
+ ctrErrs, err := pod.Pause(context.TODO())
callErr := handlePodCall(call, pod, ctrErrs, err)
if callErr != nil {
return err
@@ -235,7 +236,7 @@ func (i *VarlinkAPI) UnpausePod(call iopodman.VarlinkCall, name string) error {
if err != nil {
return call.ReplyPodNotFound(name, err.Error())
}
- ctrErrs, err := pod.Unpause()
+ ctrErrs, err := pod.Unpause(context.TODO())
callErr := handlePodCall(call, pod, ctrErrs, err)
if callErr != nil {
return err
diff --git a/test/e2e/common_test.go b/test/e2e/common_test.go
index c663a4dca..ec910109b 100644
--- a/test/e2e/common_test.go
+++ b/test/e2e/common_test.go
@@ -453,7 +453,7 @@ func (p *PodmanTestIntegration) PodmanPID(args []string) (*PodmanSessionIntegrat
func (p *PodmanTestIntegration) Cleanup() {
// Remove all containers
stopall := p.Podman([]string{"stop", "-a", "--time", "0"})
- stopall.Wait(90)
+ stopall.WaitWithDefaultTimeout()
podstop := p.Podman([]string{"pod", "stop", "-a", "-t", "0"})
podstop.WaitWithDefaultTimeout()
@@ -461,7 +461,7 @@ func (p *PodmanTestIntegration) Cleanup() {
podrm.WaitWithDefaultTimeout()
session := p.Podman([]string{"rm", "-fa"})
- session.Wait(90)
+ session.WaitWithDefaultTimeout()
p.StopRemoteService()
// Nuke tempdir