diff options
author | Matthew Heon <matthew.heon@pm.me> | 2020-06-04 14:28:01 -0400 |
---|---|---|
committer | Matthew Heon <matthew.heon@pm.me> | 2020-06-05 11:31:05 -0400 |
commit | 89a1e7db39ed1015762d733379a4a5d443b1f4de (patch) | |
tree | 1b99dd8c457c10b96110d86c091a71d13ca3df1d /pkg/parallel/parallel_linux.go | |
parent | bf8337b3fc488d3fc449a7622ae7744a67b9f348 (diff) | |
download | podman-89a1e7db39ed1015762d733379a4a5d443b1f4de.tar.gz podman-89a1e7db39ed1015762d733379a4a5d443b1f4de.tar.bz2 podman-89a1e7db39ed1015762d733379a4a5d443b1f4de.zip |
Add parallel execution code for container operations
This code will run container operations in parallel, up to a
given maximum number of threads. Currently, it has only been
enabled for local `podman rm` as a proof of concept.
Signed-off-by: Matthew Heon <matthew.heon@pm.me>
Diffstat (limited to 'pkg/parallel/parallel_linux.go')
-rw-r--r-- | pkg/parallel/parallel_linux.go | 57 |
1 files changed, 57 insertions, 0 deletions
diff --git a/pkg/parallel/parallel_linux.go b/pkg/parallel/parallel_linux.go new file mode 100644 index 000000000..e3f086c0e --- /dev/null +++ b/pkg/parallel/parallel_linux.go @@ -0,0 +1,57 @@ +package parallel + +import ( + "context" + "sync" + + "github.com/containers/libpod/libpod" + "github.com/pkg/errors" + "github.com/sirupsen/logrus" +) + +// ParallelContainerOp performs the given function on the given set of +// containers, using a number of parallel threads. +// If no error is returned, each container specified in ctrs will have an entry +// in the resulting map; containers with no error will be set to nil. +func ParallelContainerOp(ctx context.Context, ctrs []*libpod.Container, applyFunc func(*libpod.Container) error) (map[*libpod.Container]error, error) { + jobControlLock.RLock() + defer jobControlLock.RUnlock() + + // We could use a sync.Map but given Go's lack of generic I'd rather + // just use a lock on a normal map... + // The expectation is that most of the time is spent in applyFunc + // anyways. + var ( + errMap map[*libpod.Container]error = make(map[*libpod.Container]error) + errLock sync.Mutex + allDone sync.WaitGroup + ) + + for _, ctr := range ctrs { + // Block until a thread is available + if err := jobControl.Acquire(ctx, 1); err != nil { + return nil, errors.Wrapf(err, "error acquiring job control semaphore") + } + + allDone.Add(1) + + c := ctr + go func() { + logrus.Debugf("Launching job on container %s", c.ID()) + + err := applyFunc(c) + errLock.Lock() + errMap[c] = err + errLock.Unlock() + + allDone.Done() + jobControl.Release(1) + }() + } + + allDone.Wait() + + return errMap, nil +} + +// TODO: Add an Enqueue() function that returns a promise |