author    baude <bbaude@redhat.com>    2018-10-23 18:40:34 -0500
committer baude <bbaude@redhat.com>    2018-10-25 07:50:46 -0500
commit    3e5a5c68da9884ddb46cb9e84435956996347a4b (patch)
tree      a4b169f923fc64b3fadab0bab8de711d677eb3c5
parent    57f778aed93efc0961b1335bcd07c3c82a11da0a (diff)
Add --max-workers and heuristics for parallel operations
Add a global flag --max-workers so users can limit the number of parallel operations for a given function. When the limit is not set, a heuristic function returns the preferred number of parallel workers based on the number of CPUs and the operation in question.

Signed-off-by: baude <bbaude@redhat.com>
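For example (the flag is registered as hidden, so it does not show up in --help), an invocation along the lines of

    podman --max-workers 2 rm --force mycontainer

would cap the removal pool at two workers instead of using the CPU-based heuristic; the container name here is only illustrative.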
-rw-r--r--  cmd/podman/main.go               5
-rw-r--r--  cmd/podman/ps.go                 9
-rw-r--r--  cmd/podman/rm.go                23
-rw-r--r--  cmd/podman/shared/container.go   4
-rw-r--r--  cmd/podman/shared/parallel.go   91
-rw-r--r--  cmd/podman/stop.go              20
-rw-r--r--  cmd/podman/utils.go             66
7 files changed, 135 insertions, 83 deletions
diff --git a/cmd/podman/main.go b/cmd/podman/main.go
index d4c8454a8..38eac4504 100644
--- a/cmd/podman/main.go
+++ b/cmd/podman/main.go
@@ -211,6 +211,11 @@ func main() {
Value: hooks.DefaultDir,
Hidden: true,
},
+ cli.IntFlag{
+ Name: "max-workers",
+ Usage: "the maximum number of workers for parallel operations",
+ Hidden: true,
+ },
cli.StringFlag{
Name: "log-level",
Usage: "log messages above specified level: debug, info, warn, error (default), fatal or panic",
diff --git a/cmd/podman/ps.go b/cmd/podman/ps.go
index a468f6121..fa333f952 100644
--- a/cmd/podman/ps.go
+++ b/cmd/podman/ps.go
@@ -20,6 +20,7 @@ import (
"github.com/cri-o/ocicni/pkg/ocicni"
"github.com/docker/go-units"
"github.com/pkg/errors"
+ "github.com/sirupsen/logrus"
"github.com/urfave/cli"
"k8s.io/apimachinery/pkg/fields"
)
@@ -300,7 +301,13 @@ func psCmd(c *cli.Context) error {
outputContainers = []*libpod.Container{latestCtr}
}
- pss := shared.PBatch(outputContainers, 8, opts)
+ maxWorkers := shared.Parallelize("ps")
+ if c.GlobalIsSet("max-workers") {
+ maxWorkers = c.GlobalInt("max-workers")
+ }
+ logrus.Debugf("Setting maximum workers to %d", maxWorkers)
+
+ pss := shared.PBatch(outputContainers, maxWorkers, opts)
if opts.Sort != "" {
pss, err = sortPsOutput(opts.Sort, pss)
if err != nil {
diff --git a/cmd/podman/rm.go b/cmd/podman/rm.go
index c6641e879..0fb5345ee 100644
--- a/cmd/podman/rm.go
+++ b/cmd/podman/rm.go
@@ -2,11 +2,11 @@ package main
import (
"fmt"
- rt "runtime"
-
"github.com/containers/libpod/cmd/podman/libpodruntime"
+ "github.com/containers/libpod/cmd/podman/shared"
"github.com/containers/libpod/libpod"
"github.com/pkg/errors"
+ "github.com/sirupsen/logrus"
"github.com/urfave/cli"
)
@@ -48,14 +48,13 @@ func rmCmd(c *cli.Context) error {
var (
delContainers []*libpod.Container
lastError error
- deleteFuncs []workerInput
+ deleteFuncs []shared.ParallelWorkerInput
)
ctx := getContext()
if err := validateFlags(c, rmFlags); err != nil {
return err
}
-
runtime, err := libpodruntime.GetRuntime(c)
if err != nil {
return errors.Wrapf(err, "could not get runtime")
@@ -69,17 +68,23 @@ func rmCmd(c *cli.Context) error {
delContainers, lastError = getAllOrLatestContainers(c, runtime, -1, "all")
for _, container := range delContainers {
+ con := container
f := func() error {
- return runtime.RemoveContainer(ctx, container, c.Bool("force"))
+ return runtime.RemoveContainer(ctx, con, c.Bool("force"))
}
- deleteFuncs = append(deleteFuncs, workerInput{
- containerID: container.ID(),
- parallelFunc: f,
+ deleteFuncs = append(deleteFuncs, shared.ParallelWorkerInput{
+ ContainerID: con.ID(),
+ ParallelFunc: f,
})
}
+ maxWorkers := shared.Parallelize("rm")
+ if c.GlobalIsSet("max-workers") {
+ maxWorkers = c.GlobalInt("max-workers")
+ }
+ logrus.Debugf("Setting maximum workers to %d", maxWorkers)
- deleteErrors := parallelExecuteWorkerPool(rt.NumCPU()*3, deleteFuncs)
+ deleteErrors := shared.ParallelExecuteWorkerPool(maxWorkers, deleteFuncs)
for cid, result := range deleteErrors {
if result != nil {
fmt.Println(result.Error())
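Worth noting in the rm.go hunk above: the added `con := container` line rebinds the range variable before the closure captures it. Without the rebinding, every queued ParallelFunc would observe the loop variable's final value by the time the worker pool runs the jobs. A minimal standalone sketch of the pitfall (on Go versions before 1.22, which changed per-iteration scoping):

package main

import "fmt"

func main() {
	items := []string{"a", "b", "c"}
	var buggy, fixed []func()
	for _, item := range items {
		// Every closure below shares the single range variable.
		buggy = append(buggy, func() { fmt.Print(item, " ") })
		it := item // rebind, as rm.go does with con := container
		fixed = append(fixed, func() { fmt.Print(it, " ") })
	}
	for _, f := range buggy {
		f() // prints "c c c " on pre-1.22 compilers
	}
	fmt.Println()
	for _, f := range fixed {
		f() // prints "a b c "
	}
	fmt.Println()
}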
diff --git a/cmd/podman/shared/container.go b/cmd/podman/shared/container.go
index 4af737e0a..b847314a4 100644
--- a/cmd/podman/shared/container.go
+++ b/cmd/podman/shared/container.go
@@ -226,10 +226,10 @@ func NewBatchContainer(ctr *libpod.Container, opts PsOptions) (PsContainerOutput
return pso, nil
}
-type pFunc func() (PsContainerOutput, error)
+type batchFunc func() (PsContainerOutput, error)
type workerInput struct {
- parallelFunc pFunc
+ parallelFunc batchFunc
opts PsOptions
cid string
job int
diff --git a/cmd/podman/shared/parallel.go b/cmd/podman/shared/parallel.go
new file mode 100644
index 000000000..03eba2f0b
--- /dev/null
+++ b/cmd/podman/shared/parallel.go
@@ -0,0 +1,91 @@
+package shared
+
+import (
+ "runtime"
+ "sync"
+)
+
+type pFunc func() error
+
+// ParallelWorkerInput is a struct that pairs a container ID with the
+// parallel func to be performed on that container
+type ParallelWorkerInput struct {
+ ContainerID string
+ ParallelFunc pFunc
+}
+
+type containerError struct {
+ ContainerID string
+ Err error
+}
+
+// ParallelWorker is a worker goroutine that consumes jobs from the jobs channel until it is closed
+func ParallelWorker(wg *sync.WaitGroup, jobs <-chan ParallelWorkerInput, results chan<- containerError) {
+ for j := range jobs {
+ err := j.ParallelFunc()
+ results <- containerError{ContainerID: j.ContainerID, Err: err}
+ wg.Done()
+ }
+}
+
+// ParallelExecuteWorkerPool takes container jobs and performs them in parallel. The workers
+// argument determines how many worker goroutines are started up front.
+func ParallelExecuteWorkerPool(workers int, functions []ParallelWorkerInput) map[string]error {
+ var (
+ wg sync.WaitGroup
+ )
+
+ resultChan := make(chan containerError, len(functions))
+ results := make(map[string]error)
+ paraJobs := make(chan ParallelWorkerInput, len(functions))
+
+ // Never start more workers than there are jobs to run
+ if workers > len(functions) {
+ workers = len(functions)
+ }
+
+ // Create the workers
+ for w := 1; w <= workers; w++ {
+ go ParallelWorker(&wg, paraJobs, resultChan)
+ }
+
+ // Add jobs to the workers
+ for _, j := range functions {
+ j := j
+ wg.Add(1)
+ paraJobs <- j
+ }
+
+ close(paraJobs)
+ wg.Wait()
+
+ close(resultChan)
+ for ctrError := range resultChan {
+ results[ctrError.ContainerID] = ctrError.Err
+ }
+
+ return results
+}
+
+// Parallelize provides the maximum number of parallel workers (int) as calculated by a basic
+// heuristic. It can be overridden by podman's global --max-workers flag.
+func Parallelize(job string) int {
+ numCpus := runtime.NumCPU()
+ switch job {
+ case "stop":
+ if numCpus <= 2 {
+ return 4
+ } else {
+ return numCpus * 3
+ }
+ case "rm":
+ if numCpus <= 3 {
+ return numCpus * 3
+ } else {
+ return numCpus * 4
+ }
+ case "ps":
+ return 8
+ }
+ return 3
+}
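For reference, a minimal sketch of how a caller drives the new pool, mirroring the pattern in rm.go and stop.go; the container IDs and fakeStop are hypothetical stand-ins, and the example assumes the libpod module path resolves:

package main

import (
	"fmt"

	"github.com/containers/libpod/cmd/podman/shared"
)

// fakeStop is a hypothetical stand-in for a real per-container operation.
func fakeStop(id string) error { return nil }

func main() {
	ids := []string{"ctr1", "ctr2", "ctr3"} // hypothetical container IDs

	var jobs []shared.ParallelWorkerInput
	for _, id := range ids {
		id := id // rebind so each closure captures its own ID
		jobs = append(jobs, shared.ParallelWorkerInput{
			ContainerID:  id,
			ParallelFunc: func() error { return fakeStop(id) },
		})
	}

	// Heuristic worker count for "stop"; a --max-workers value would
	// replace it, as the command files show.
	errs := shared.ParallelExecuteWorkerPool(shared.Parallelize("stop"), jobs)
	for cid, err := range errs {
		if err != nil {
			fmt.Printf("%s: %v\n", cid, err)
		}
	}
}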
diff --git a/cmd/podman/stop.go b/cmd/podman/stop.go
index edadbda89..afeb49f76 100644
--- a/cmd/podman/stop.go
+++ b/cmd/podman/stop.go
@@ -2,12 +2,12 @@ package main
import (
"fmt"
- rt "runtime"
-
"github.com/containers/libpod/cmd/podman/libpodruntime"
+ "github.com/containers/libpod/cmd/podman/shared"
"github.com/containers/libpod/libpod"
"github.com/containers/libpod/pkg/rootless"
"github.com/pkg/errors"
+ "github.com/sirupsen/logrus"
"github.com/urfave/cli"
)
@@ -61,7 +61,7 @@ func stopCmd(c *cli.Context) error {
containers, lastError := getAllOrLatestContainers(c, runtime, libpod.ContainerStateRunning, "running")
- var stopFuncs []workerInput
+ var stopFuncs []shared.ParallelWorkerInput
for _, ctr := range containers {
con := ctr
var stopTimeout uint
@@ -73,13 +73,19 @@ func stopCmd(c *cli.Context) error {
f := func() error {
return con.StopWithTimeout(stopTimeout)
}
- stopFuncs = append(stopFuncs, workerInput{
- containerID: con.ID(),
- parallelFunc: f,
+ stopFuncs = append(stopFuncs, shared.ParallelWorkerInput{
+ ContainerID: con.ID(),
+ ParallelFunc: f,
})
}
- stopErrors := parallelExecuteWorkerPool(rt.NumCPU()*3, stopFuncs)
+ maxWorkers := shared.Parallelize("stop")
+ if c.GlobalIsSet("max-workers") {
+ maxWorkers = c.GlobalInt("max-workers")
+ }
+ logrus.Debugf("Setting maximum workers to %d", maxWorkers)
+
+ stopErrors := shared.ParallelExecuteWorkerPool(maxWorkers, stopFuncs)
for cid, result := range stopErrors {
if result != nil && result != libpod.ErrCtrStopped {
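The same override block (heuristic, GlobalIsSet check, debug log) now appears in ps.go, rm.go, and stop.go. A hypothetical helper, not part of this patch, could fold it into one call:

package cmdutil // hypothetical home for the helper

import (
	"github.com/containers/libpod/cmd/podman/shared"
	"github.com/sirupsen/logrus"
	"github.com/urfave/cli"
)

// MaxWorkersFor resolves the worker count for a job: the global
// --max-workers flag wins, otherwise the per-job heuristic applies.
func MaxWorkersFor(c *cli.Context, job string) int {
	maxWorkers := shared.Parallelize(job)
	if c.GlobalIsSet("max-workers") {
		maxWorkers = c.GlobalInt("max-workers")
	}
	logrus.Debugf("Setting maximum workers to %d", maxWorkers)
	return maxWorkers
}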
diff --git a/cmd/podman/utils.go b/cmd/podman/utils.go
index f9971fd88..afeccb668 100644
--- a/cmd/podman/utils.go
+++ b/cmd/podman/utils.go
@@ -3,10 +3,6 @@ package main
import (
"context"
"fmt"
- "os"
- gosignal "os/signal"
- "sync"
-
"github.com/containers/libpod/libpod"
"github.com/docker/docker/pkg/signal"
"github.com/docker/docker/pkg/term"
@@ -15,6 +11,8 @@ import (
"github.com/urfave/cli"
"golang.org/x/crypto/ssh/terminal"
"k8s.io/client-go/tools/remotecommand"
+ "os"
+ gosignal "os/signal"
)
type RawTtyFormatter struct {
@@ -209,63 +207,3 @@ func getPodsFromContext(c *cli.Context, r *libpod.Runtime) ([]*libpod.Pod, error
}
return pods, lastError
}
-
-type pFunc func() error
-
-type workerInput struct {
- containerID string
- parallelFunc pFunc
-}
-
-type containerError struct {
- containerID string
- err error
-}
-
-// worker is a "threaded" worker that takes jobs from the channel "queue"
-func worker(wg *sync.WaitGroup, jobs <-chan workerInput, results chan<- containerError) {
- for j := range jobs {
- err := j.parallelFunc()
- results <- containerError{containerID: j.containerID, err: err}
- wg.Done()
- }
-}
-
-// parallelExecuteWorkerPool takes container jobs and performs them in parallel. The worker
-// int is determines how many workers/threads should be premade.
-func parallelExecuteWorkerPool(workers int, functions []workerInput) map[string]error {
- var (
- wg sync.WaitGroup
- )
-
- resultChan := make(chan containerError, len(functions))
- results := make(map[string]error)
- paraJobs := make(chan workerInput, len(functions))
-
- // If we have more workers than functions, match up the number of workers and functions
- if workers > len(functions) {
- workers = len(functions)
- }
-
- // Create the workers
- for w := 1; w <= workers; w++ {
- go worker(&wg, paraJobs, resultChan)
- }
-
- // Add jobs to the workers
- for _, j := range functions {
- j := j
- wg.Add(1)
- paraJobs <- j
- }
-
- close(paraJobs)
- wg.Wait()
-
- close(resultChan)
- for ctrError := range resultChan {
- results[ctrError.containerID] = ctrError.err
- }
-
- return results
-}