diff options
-rw-r--r-- | cmd/podman/stop.go | 27 | ||||
-rw-r--r-- | cmd/podman/utils.go | 48 | ||||
-rw-r--r-- | vendor.conf | 1 |
3 files changed, 69 insertions, 7 deletions
diff --git a/cmd/podman/stop.go b/cmd/podman/stop.go index d2fa87730..664d91ea3 100644 --- a/cmd/podman/stop.go +++ b/cmd/podman/stop.go @@ -3,6 +3,7 @@ package main import ( "fmt" "os" + rt "runtime" "github.com/containers/libpod/cmd/podman/libpodruntime" "github.com/containers/libpod/libpod" @@ -98,21 +99,33 @@ func stopCmd(c *cli.Context) error { } } + var stopFuncs []workerInput for _, ctr := range containers { + con := ctr var stopTimeout uint if c.IsSet("timeout") { stopTimeout = c.Uint("timeout") } else { stopTimeout = ctr.StopTimeout() } - if err := ctr.StopWithTimeout(stopTimeout); err != nil && err != libpod.ErrCtrStopped { - if lastError != nil { - fmt.Fprintln(os.Stderr, lastError) - } - lastError = errors.Wrapf(err, "failed to stop container %v", ctr.ID()) - } else { - fmt.Println(ctr.ID()) + f := func() error { + return con.StopWithTimeout(stopTimeout) + } + stopFuncs = append(stopFuncs, workerInput{ + containerID: con.ID(), + parallelFunc: f, + }) + } + + stopErrors := parallelExecuteWorkerPool(rt.NumCPU()*3, stopFuncs) + + for cid, result := range stopErrors { + if result != nil && result != libpod.ErrCtrStopped { + fmt.Println(result.Error()) + lastError = result + continue } + fmt.Println(cid) } return lastError } diff --git a/cmd/podman/utils.go b/cmd/podman/utils.go index 89ec48dbe..1b767532e 100644 --- a/cmd/podman/utils.go +++ b/cmd/podman/utils.go @@ -5,6 +5,7 @@ import ( "fmt" "os" gosignal "os/signal" + "sync" "github.com/containers/libpod/libpod" "github.com/docker/docker/pkg/signal" @@ -215,3 +216,50 @@ func getPodsFromContext(c *cli.Context, r *libpod.Runtime) ([]*libpod.Pod, error } return pods, lastError } + +type pFunc func() error + +type workerInput struct { + containerID string + parallelFunc pFunc +} + +// worker is a "threaded" worker that takes jobs from the channel "queue" +func worker(wg *sync.WaitGroup, jobs <-chan workerInput, results map[string]error) { + for j := range jobs { + err := j.parallelFunc() + results[j.containerID] = err + wg.Done() + } +} + +// parallelExecuteWorkerPool takes container jobs and performs them in parallel. The worker +// int is determines how many workers/threads should be premade. +func parallelExecuteWorkerPool(workers int, functions []workerInput) map[string]error { + var ( + wg sync.WaitGroup + ) + results := make(map[string]error) + paraJobs := make(chan workerInput, len(functions)) + + // If we have more workers than functions, match up the number of workers and functions + if workers > len(functions) { + workers = len(functions) + } + + // Create the workers + for w := 1; w <= workers; w++ { + go worker(&wg, paraJobs, results) + } + + // Add jobs to the workers + for _, j := range functions { + j := j + wg.Add(1) + paraJobs <- j + } + + close(paraJobs) + wg.Wait() + return results +} diff --git a/vendor.conf b/vendor.conf index 75b92e846..be89c418e 100644 --- a/vendor.conf +++ b/vendor.conf @@ -75,6 +75,7 @@ golang.org/x/net c427ad74c6d7a814201695e9ffde0c5d400a7674 golang.org/x/sys master golang.org/x/text f72d8390a633d5dfb0cc84043294db9f6c935756 golang.org/x/time f51c12702a4d776e4c1fa9b0fabab841babae631 +golang.org/x/sync master google.golang.org/grpc v1.0.4 https://github.com/grpc/grpc-go gopkg.in/cheggaaa/pb.v1 v1.0.7 gopkg.in/inf.v0 v0.9.0 |