diff options
Diffstat (limited to 'cmd/podman/shared')
-rw-r--r-- | cmd/podman/shared/container.go | 171 | ||||
-rw-r--r-- | cmd/podman/shared/workers.go | 133 |
2 files changed, 303 insertions, 1 deletions
diff --git a/cmd/podman/shared/container.go b/cmd/podman/shared/container.go index 6826191c5..7bef62355 100644 --- a/cmd/podman/shared/container.go +++ b/cmd/podman/shared/container.go @@ -44,7 +44,6 @@ type PsOptions struct { Quiet bool Size bool Sort string - Label string Namespace bool Sync bool } @@ -274,6 +273,176 @@ func worker(wg *sync.WaitGroup, jobs <-chan workerInput, results chan<- PsContai } } +func generateContainerFilterFuncs(filter, filterValue string, r *libpod.Runtime) (func(container *libpod.Container) bool, error) { + switch filter { + case "id": + return func(c *libpod.Container) bool { + return strings.Contains(c.ID(), filterValue) + }, nil + case "label": + var filterArray []string = strings.SplitN(filterValue, "=", 2) + var filterKey string = filterArray[0] + if len(filterArray) > 1 { + filterValue = filterArray[1] + } else { + filterValue = "" + } + return func(c *libpod.Container) bool { + for labelKey, labelValue := range c.Labels() { + if labelKey == filterKey && ("" == filterValue || labelValue == filterValue) { + return true + } + } + return false + }, nil + case "name": + return func(c *libpod.Container) bool { + return strings.Contains(c.Name(), filterValue) + }, nil + case "exited": + exitCode, err := strconv.ParseInt(filterValue, 10, 32) + if err != nil { + return nil, errors.Wrapf(err, "exited code out of range %q", filterValue) + } + return func(c *libpod.Container) bool { + ec, exited, err := c.ExitCode() + if ec == int32(exitCode) && err == nil && exited == true { + return true + } + return false + }, nil + case "status": + if !util.StringInSlice(filterValue, []string{"created", "running", "paused", "stopped", "exited", "unknown"}) { + return nil, errors.Errorf("%s is not a valid status", filterValue) + } + return func(c *libpod.Container) bool { + status, err := c.State() + if err != nil { + return false + } + if filterValue == "stopped" { + filterValue = "exited" + } + state := status.String() + if status == libpod.ContainerStateConfigured { + state = "created" + } else if status == libpod.ContainerStateStopped { + state = "exited" + } + return state == filterValue + }, nil + case "ancestor": + // This needs to refine to match docker + // - ancestor=(<image-name>[:tag]|<image-id>| ⟨image@digest⟩) - containers created from an image or a descendant. + return func(c *libpod.Container) bool { + containerConfig := c.Config() + if strings.Contains(containerConfig.RootfsImageID, filterValue) || strings.Contains(containerConfig.RootfsImageName, filterValue) { + return true + } + return false + }, nil + case "before": + ctr, err := r.LookupContainer(filterValue) + if err != nil { + return nil, errors.Errorf("unable to find container by name or id of %s", filterValue) + } + containerConfig := ctr.Config() + createTime := containerConfig.CreatedTime + return func(c *libpod.Container) bool { + cc := c.Config() + return createTime.After(cc.CreatedTime) + }, nil + case "since": + ctr, err := r.LookupContainer(filterValue) + if err != nil { + return nil, errors.Errorf("unable to find container by name or id of %s", filterValue) + } + containerConfig := ctr.Config() + createTime := containerConfig.CreatedTime + return func(c *libpod.Container) bool { + cc := c.Config() + return createTime.Before(cc.CreatedTime) + }, nil + case "volume": + //- volume=(<volume-name>|<mount-point-destination>) + return func(c *libpod.Container) bool { + containerConfig := c.Config() + var dest string + arr := strings.Split(filterValue, ":") + source := arr[0] + if len(arr) == 2 { + dest = arr[1] + } + for _, mount := range containerConfig.Spec.Mounts { + if dest != "" && (mount.Source == source && mount.Destination == dest) { + return true + } + if dest == "" && mount.Source == source { + return true + } + } + return false + }, nil + case "health": + return func(c *libpod.Container) bool { + hcStatus, err := c.HealthCheckStatus() + if err != nil { + return false + } + return hcStatus == filterValue + }, nil + } + return nil, errors.Errorf("%s is an invalid filter", filter) +} + +// GetPsContainerOutput returns a slice of containers specifically for ps output +func GetPsContainerOutput(r *libpod.Runtime, opts PsOptions, filters []string, maxWorkers int) ([]PsContainerOutput, error) { + var ( + filterFuncs []libpod.ContainerFilter + outputContainers []*libpod.Container + ) + + if len(filters) > 0 { + for _, f := range filters { + filterSplit := strings.SplitN(f, "=", 2) + if len(filterSplit) < 2 { + return nil, errors.Errorf("filter input must be in the form of filter=value: %s is invalid", f) + } + generatedFunc, err := generateContainerFilterFuncs(filterSplit[0], filterSplit[1], r) + if err != nil { + return nil, errors.Wrapf(err, "invalid filter") + } + filterFuncs = append(filterFuncs, generatedFunc) + } + } + if !opts.Latest { + // Get all containers + containers, err := r.GetContainers(filterFuncs...) + if err != nil { + return nil, err + } + + // We only want the last few containers + if opts.Last > 0 && opts.Last <= len(containers) { + return nil, errors.Errorf("--last not yet supported") + } else { + outputContainers = containers + } + } else { + // Get just the latest container + // Ignore filters + latestCtr, err := r.GetLatestContainer() + if err != nil { + return nil, err + } + + outputContainers = []*libpod.Container{latestCtr} + } + + pss := PBatch(outputContainers, maxWorkers, opts) + return pss, nil +} + // PBatch is performs batch operations on a container in parallel. It spawns the number of workers // relative to the the number of parallel operations desired. func PBatch(containers []*libpod.Container, workers int, opts PsOptions) []PsContainerOutput { diff --git a/cmd/podman/shared/workers.go b/cmd/podman/shared/workers.go new file mode 100644 index 000000000..112af89cc --- /dev/null +++ b/cmd/podman/shared/workers.go @@ -0,0 +1,133 @@ +package shared + +import ( + "reflect" + "runtime" + "strings" + "sync" + + "github.com/sirupsen/logrus" +) + +// JobFunc provides the function signature for the pool'ed functions +type JobFunc func() error + +// Job defines the function to run +type Job struct { + ID string + Fn JobFunc +} + +// JobResult defines the results from the function ran +type JobResult struct { + Job Job + Err error +} + +// Pool defines the worker pool and queues +type Pool struct { + id string + wg *sync.WaitGroup + jobs chan Job + results chan JobResult + size int + capacity int +} + +// NewPool creates and initializes a new Pool +func NewPool(id string, size int, capacity int) *Pool { + var wg sync.WaitGroup + + // min for int... + s := size + if s > capacity { + s = capacity + } + + return &Pool{ + id, + &wg, + make(chan Job, capacity), + make(chan JobResult, capacity), + s, + capacity, + } +} + +// Add Job to pool for parallel processing +func (p *Pool) Add(job Job) { + p.wg.Add(1) + p.jobs <- job +} + +// Run the Job's in the pool, gather and return results +func (p *Pool) Run() ([]string, map[string]error, error) { + var ( + ok = []string{} + failures = map[string]error{} + ) + + for w := 0; w < p.size; w++ { + w := w + go p.newWorker(w) + } + close(p.jobs) + p.wg.Wait() + + close(p.results) + for r := range p.results { + if r.Err == nil { + ok = append(ok, r.Job.ID) + } else { + failures[r.Job.ID] = r.Err + } + } + + if logrus.GetLevel() == logrus.DebugLevel { + for i, f := range failures { + logrus.Debugf("Pool[%s, %s: %s]", p.id, i, f.Error()) + } + } + + return ok, failures, nil +} + +// newWorker creates new parallel workers to monitor jobs channel from Pool +func (p *Pool) newWorker(slot int) { + for job := range p.jobs { + err := job.Fn() + p.results <- JobResult{job, err} + if logrus.GetLevel() == logrus.DebugLevel { + n := strings.Split(runtime.FuncForPC(reflect.ValueOf(job.Fn).Pointer()).Name(), ".") + logrus.Debugf("Worker#%d finished job %s/%s (%v)", slot, n[2:], job.ID, err) + } + p.wg.Done() + } +} + +// DefaultPoolSize provides the maximum number of parallel workers (int) as calculated by a basic +// heuristic. This can be overriden by the --max-workers primary switch to podman. +func DefaultPoolSize(name string) int { + numCpus := runtime.NumCPU() + switch name { + case "kill": + case "pause": + case "rm": + case "unpause": + if numCpus <= 3 { + return numCpus * 3 + } + return numCpus * 4 + case "ps": + return 8 + case "restart": + return numCpus * 2 + case "stop": + if numCpus <= 2 { + return 4 + } else { + return numCpus * 3 + } + } + return 3 +} |