summaryrefslogtreecommitdiff
path: root/cmd/podman
diff options
context:
space:
mode:
Diffstat (limited to 'cmd/podman')
-rw-r--r--cmd/podman/stop.go27
-rw-r--r--cmd/podman/utils.go48
2 files changed, 68 insertions, 7 deletions
diff --git a/cmd/podman/stop.go b/cmd/podman/stop.go
index d2fa87730..664d91ea3 100644
--- a/cmd/podman/stop.go
+++ b/cmd/podman/stop.go
@@ -3,6 +3,7 @@ package main
import (
"fmt"
"os"
+ rt "runtime"
"github.com/containers/libpod/cmd/podman/libpodruntime"
"github.com/containers/libpod/libpod"
@@ -98,21 +99,33 @@ func stopCmd(c *cli.Context) error {
}
}
+ var stopFuncs []workerInput
for _, ctr := range containers {
+ con := ctr
var stopTimeout uint
if c.IsSet("timeout") {
stopTimeout = c.Uint("timeout")
} else {
stopTimeout = ctr.StopTimeout()
}
- if err := ctr.StopWithTimeout(stopTimeout); err != nil && err != libpod.ErrCtrStopped {
- if lastError != nil {
- fmt.Fprintln(os.Stderr, lastError)
- }
- lastError = errors.Wrapf(err, "failed to stop container %v", ctr.ID())
- } else {
- fmt.Println(ctr.ID())
+ f := func() error {
+ return con.StopWithTimeout(stopTimeout)
+ }
+ stopFuncs = append(stopFuncs, workerInput{
+ containerID: con.ID(),
+ parallelFunc: f,
+ })
+ }
+
+ stopErrors := parallelExecuteWorkerPool(rt.NumCPU()*3, stopFuncs)
+
+ for cid, result := range stopErrors {
+ if result != nil && result != libpod.ErrCtrStopped {
+ fmt.Println(result.Error())
+ lastError = result
+ continue
}
+ fmt.Println(cid)
}
return lastError
}
diff --git a/cmd/podman/utils.go b/cmd/podman/utils.go
index 89ec48dbe..1b767532e 100644
--- a/cmd/podman/utils.go
+++ b/cmd/podman/utils.go
@@ -5,6 +5,7 @@ import (
"fmt"
"os"
gosignal "os/signal"
+ "sync"
"github.com/containers/libpod/libpod"
"github.com/docker/docker/pkg/signal"
@@ -215,3 +216,50 @@ func getPodsFromContext(c *cli.Context, r *libpod.Runtime) ([]*libpod.Pod, error
}
return pods, lastError
}
+
+type pFunc func() error
+
+type workerInput struct {
+ containerID string
+ parallelFunc pFunc
+}
+
+// worker is a "threaded" worker that takes jobs from the channel "queue"
+func worker(wg *sync.WaitGroup, jobs <-chan workerInput, results map[string]error) {
+ for j := range jobs {
+ err := j.parallelFunc()
+ results[j.containerID] = err
+ wg.Done()
+ }
+}
+
+// parallelExecuteWorkerPool takes container jobs and performs them in parallel. The worker
+// int is determines how many workers/threads should be premade.
+func parallelExecuteWorkerPool(workers int, functions []workerInput) map[string]error {
+ var (
+ wg sync.WaitGroup
+ )
+ results := make(map[string]error)
+ paraJobs := make(chan workerInput, len(functions))
+
+ // If we have more workers than functions, match up the number of workers and functions
+ if workers > len(functions) {
+ workers = len(functions)
+ }
+
+ // Create the workers
+ for w := 1; w <= workers; w++ {
+ go worker(&wg, paraJobs, results)
+ }
+
+ // Add jobs to the workers
+ for _, j := range functions {
+ j := j
+ wg.Add(1)
+ paraJobs <- j
+ }
+
+ close(paraJobs)
+ wg.Wait()
+ return results
+}