diff options
-rw-r--r-- | cmd/podman/main.go | 5 | ||||
-rw-r--r-- | cmd/podman/ps.go | 9 | ||||
-rw-r--r-- | cmd/podman/rm.go | 23 | ||||
-rw-r--r-- | cmd/podman/shared/container.go | 4 | ||||
-rw-r--r-- | cmd/podman/shared/parallel.go | 91 | ||||
-rw-r--r-- | cmd/podman/stop.go | 20 | ||||
-rw-r--r-- | cmd/podman/utils.go | 66 |
7 files changed, 135 insertions, 83 deletions
diff --git a/cmd/podman/main.go b/cmd/podman/main.go index d4c8454a8..38eac4504 100644 --- a/cmd/podman/main.go +++ b/cmd/podman/main.go @@ -211,6 +211,11 @@ func main() { Value: hooks.DefaultDir, Hidden: true, }, + cli.IntFlag{ + Name: "max-workers", + Usage: "the maximum number of workers for parallel operations", + Hidden: true, + }, cli.StringFlag{ Name: "log-level", Usage: "log messages above specified level: debug, info, warn, error (default), fatal or panic", diff --git a/cmd/podman/ps.go b/cmd/podman/ps.go index a468f6121..fa333f952 100644 --- a/cmd/podman/ps.go +++ b/cmd/podman/ps.go @@ -20,6 +20,7 @@ import ( "github.com/cri-o/ocicni/pkg/ocicni" "github.com/docker/go-units" "github.com/pkg/errors" + "github.com/sirupsen/logrus" "github.com/urfave/cli" "k8s.io/apimachinery/pkg/fields" ) @@ -300,7 +301,13 @@ func psCmd(c *cli.Context) error { outputContainers = []*libpod.Container{latestCtr} } - pss := shared.PBatch(outputContainers, 8, opts) + maxWorkers := shared.Parallelize("ps") + if c.GlobalIsSet("max-workers") { + maxWorkers = c.GlobalInt("max-workers") + } + logrus.Debugf("Setting maximum workers to %d", maxWorkers) + + pss := shared.PBatch(outputContainers, maxWorkers, opts) if opts.Sort != "" { pss, err = sortPsOutput(opts.Sort, pss) if err != nil { diff --git a/cmd/podman/rm.go b/cmd/podman/rm.go index c6641e879..0fb5345ee 100644 --- a/cmd/podman/rm.go +++ b/cmd/podman/rm.go @@ -2,11 +2,11 @@ package main import ( "fmt" - rt "runtime" - "github.com/containers/libpod/cmd/podman/libpodruntime" + "github.com/containers/libpod/cmd/podman/shared" "github.com/containers/libpod/libpod" "github.com/pkg/errors" + "github.com/sirupsen/logrus" "github.com/urfave/cli" ) @@ -48,14 +48,13 @@ func rmCmd(c *cli.Context) error { var ( delContainers []*libpod.Container lastError error - deleteFuncs []workerInput + deleteFuncs []shared.ParallelWorkerInput ) ctx := getContext() if err := validateFlags(c, rmFlags); err != nil { return err } - runtime, err := libpodruntime.GetRuntime(c) if err != nil { return errors.Wrapf(err, "could not get runtime") @@ -69,17 +68,23 @@ func rmCmd(c *cli.Context) error { delContainers, lastError = getAllOrLatestContainers(c, runtime, -1, "all") for _, container := range delContainers { + con := container f := func() error { - return runtime.RemoveContainer(ctx, container, c.Bool("force")) + return runtime.RemoveContainer(ctx, con, c.Bool("force")) } - deleteFuncs = append(deleteFuncs, workerInput{ - containerID: container.ID(), - parallelFunc: f, + deleteFuncs = append(deleteFuncs, shared.ParallelWorkerInput{ + ContainerID: con.ID(), + ParallelFunc: f, }) } + maxWorkers := shared.Parallelize("rm") + if c.GlobalIsSet("max-workers") { + maxWorkers = c.GlobalInt("max-workers") + } + logrus.Debugf("Setting maximum workers to %d", maxWorkers) - deleteErrors := parallelExecuteWorkerPool(rt.NumCPU()*3, deleteFuncs) + deleteErrors := shared.ParallelExecuteWorkerPool(maxWorkers, deleteFuncs) for cid, result := range deleteErrors { if result != nil { fmt.Println(result.Error()) diff --git a/cmd/podman/shared/container.go b/cmd/podman/shared/container.go index 4af737e0a..b847314a4 100644 --- a/cmd/podman/shared/container.go +++ b/cmd/podman/shared/container.go @@ -226,10 +226,10 @@ func NewBatchContainer(ctr *libpod.Container, opts PsOptions) (PsContainerOutput return pso, nil } -type pFunc func() (PsContainerOutput, error) +type batchFunc func() (PsContainerOutput, error) type workerInput struct { - parallelFunc pFunc + parallelFunc batchFunc opts PsOptions cid string job int diff --git a/cmd/podman/shared/parallel.go b/cmd/podman/shared/parallel.go new file mode 100644 index 000000000..03eba2f0b --- /dev/null +++ b/cmd/podman/shared/parallel.go @@ -0,0 +1,91 @@ +package shared + +import ( + "runtime" + "sync" +) + +type pFunc func() error + +// ParallelWorkerInput is a struct used to pass in a slice of parallel funcs to be +// performed on a container ID +type ParallelWorkerInput struct { + ContainerID string + ParallelFunc pFunc +} + +type containerError struct { + ContainerID string + Err error +} + +// ParallelWorker is a "threaded" worker that takes jobs from the channel "queue" +func ParallelWorker(wg *sync.WaitGroup, jobs <-chan ParallelWorkerInput, results chan<- containerError) { + for j := range jobs { + err := j.ParallelFunc() + results <- containerError{ContainerID: j.ContainerID, Err: err} + wg.Done() + } +} + +// ParallelExecuteWorkerPool takes container jobs and performs them in parallel. The worker +// int determines how many workers/threads should be premade. +func ParallelExecuteWorkerPool(workers int, functions []ParallelWorkerInput) map[string]error { + var ( + wg sync.WaitGroup + ) + + resultChan := make(chan containerError, len(functions)) + results := make(map[string]error) + paraJobs := make(chan ParallelWorkerInput, len(functions)) + + // If we have more workers than functions, match up the number of workers and functions + if workers > len(functions) { + workers = len(functions) + } + + // Create the workers + for w := 1; w <= workers; w++ { + go ParallelWorker(&wg, paraJobs, resultChan) + } + + // Add jobs to the workers + for _, j := range functions { + j := j + wg.Add(1) + paraJobs <- j + } + + close(paraJobs) + wg.Wait() + + close(resultChan) + for ctrError := range resultChan { + results[ctrError.ContainerID] = ctrError.Err + } + + return results +} + +// Parallelize provides the maximum number of parallel workers (int) as calculated by a basic +// heuristic. This can be overriden by the --max-workers primary switch to podman. +func Parallelize(job string) int { + numCpus := runtime.NumCPU() + switch job { + case "stop": + if numCpus <= 2 { + return 4 + } else { + return numCpus * 3 + } + case "rm": + if numCpus <= 3 { + return numCpus * 3 + } else { + return numCpus * 4 + } + case "ps": + return 8 + } + return 3 +} diff --git a/cmd/podman/stop.go b/cmd/podman/stop.go index edadbda89..afeb49f76 100644 --- a/cmd/podman/stop.go +++ b/cmd/podman/stop.go @@ -2,12 +2,12 @@ package main import ( "fmt" - rt "runtime" - "github.com/containers/libpod/cmd/podman/libpodruntime" + "github.com/containers/libpod/cmd/podman/shared" "github.com/containers/libpod/libpod" "github.com/containers/libpod/pkg/rootless" "github.com/pkg/errors" + "github.com/sirupsen/logrus" "github.com/urfave/cli" ) @@ -61,7 +61,7 @@ func stopCmd(c *cli.Context) error { containers, lastError := getAllOrLatestContainers(c, runtime, libpod.ContainerStateRunning, "running") - var stopFuncs []workerInput + var stopFuncs []shared.ParallelWorkerInput for _, ctr := range containers { con := ctr var stopTimeout uint @@ -73,13 +73,19 @@ func stopCmd(c *cli.Context) error { f := func() error { return con.StopWithTimeout(stopTimeout) } - stopFuncs = append(stopFuncs, workerInput{ - containerID: con.ID(), - parallelFunc: f, + stopFuncs = append(stopFuncs, shared.ParallelWorkerInput{ + ContainerID: con.ID(), + ParallelFunc: f, }) } - stopErrors := parallelExecuteWorkerPool(rt.NumCPU()*3, stopFuncs) + maxWorkers := shared.Parallelize("stop") + if c.GlobalIsSet("max-workers") { + maxWorkers = c.GlobalInt("max-workers") + } + logrus.Debugf("Setting maximum workers to %d", maxWorkers) + + stopErrors := shared.ParallelExecuteWorkerPool(maxWorkers, stopFuncs) for cid, result := range stopErrors { if result != nil && result != libpod.ErrCtrStopped { diff --git a/cmd/podman/utils.go b/cmd/podman/utils.go index f9971fd88..afeccb668 100644 --- a/cmd/podman/utils.go +++ b/cmd/podman/utils.go @@ -3,10 +3,6 @@ package main import ( "context" "fmt" - "os" - gosignal "os/signal" - "sync" - "github.com/containers/libpod/libpod" "github.com/docker/docker/pkg/signal" "github.com/docker/docker/pkg/term" @@ -15,6 +11,8 @@ import ( "github.com/urfave/cli" "golang.org/x/crypto/ssh/terminal" "k8s.io/client-go/tools/remotecommand" + "os" + gosignal "os/signal" ) type RawTtyFormatter struct { @@ -209,63 +207,3 @@ func getPodsFromContext(c *cli.Context, r *libpod.Runtime) ([]*libpod.Pod, error } return pods, lastError } - -type pFunc func() error - -type workerInput struct { - containerID string - parallelFunc pFunc -} - -type containerError struct { - containerID string - err error -} - -// worker is a "threaded" worker that takes jobs from the channel "queue" -func worker(wg *sync.WaitGroup, jobs <-chan workerInput, results chan<- containerError) { - for j := range jobs { - err := j.parallelFunc() - results <- containerError{containerID: j.containerID, err: err} - wg.Done() - } -} - -// parallelExecuteWorkerPool takes container jobs and performs them in parallel. The worker -// int is determines how many workers/threads should be premade. -func parallelExecuteWorkerPool(workers int, functions []workerInput) map[string]error { - var ( - wg sync.WaitGroup - ) - - resultChan := make(chan containerError, len(functions)) - results := make(map[string]error) - paraJobs := make(chan workerInput, len(functions)) - - // If we have more workers than functions, match up the number of workers and functions - if workers > len(functions) { - workers = len(functions) - } - - // Create the workers - for w := 1; w <= workers; w++ { - go worker(&wg, paraJobs, resultChan) - } - - // Add jobs to the workers - for _, j := range functions { - j := j - wg.Add(1) - paraJobs <- j - } - - close(paraJobs) - wg.Wait() - - close(resultChan) - for ctrError := range resultChan { - results[ctrError.containerID] = ctrError.err - } - - return results -} |