diff options
author | OpenShift Merge Robot <openshift-merge-robot@users.noreply.github.com> | 2020-06-05 12:27:25 -0400 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-06-05 12:27:25 -0400 |
commit | c448c03269139dabf6f6fa7ff0fedb2291018b2a (patch) | |
tree | a0270fa5403d5e42639dd1b71ea752444131eaaa /pkg/parallel/parallel_linux.go | |
parent | c6da1a86cedfa04ea69886a4cb88ef1c570574b4 (diff) | |
parent | 89a1e7db39ed1015762d733379a4a5d443b1f4de (diff) | |
download | podman-c448c03269139dabf6f6fa7ff0fedb2291018b2a.tar.gz podman-c448c03269139dabf6f6fa7ff0fedb2291018b2a.tar.bz2 podman-c448c03269139dabf6f6fa7ff0fedb2291018b2a.zip |
Merge pull request #6495 from mheon/parallel_execution
Add parallel execution code for container operations
Diffstat (limited to 'pkg/parallel/parallel_linux.go')
-rw-r--r-- | pkg/parallel/parallel_linux.go | 57 |
1 files changed, 57 insertions, 0 deletions
diff --git a/pkg/parallel/parallel_linux.go b/pkg/parallel/parallel_linux.go new file mode 100644 index 000000000..e3f086c0e --- /dev/null +++ b/pkg/parallel/parallel_linux.go @@ -0,0 +1,57 @@ +package parallel + +import ( + "context" + "sync" + + "github.com/containers/libpod/libpod" + "github.com/pkg/errors" + "github.com/sirupsen/logrus" +) + +// ParallelContainerOp performs the given function on the given set of +// containers, using a number of parallel threads. +// If no error is returned, each container specified in ctrs will have an entry +// in the resulting map; containers with no error will be set to nil. +func ParallelContainerOp(ctx context.Context, ctrs []*libpod.Container, applyFunc func(*libpod.Container) error) (map[*libpod.Container]error, error) { + jobControlLock.RLock() + defer jobControlLock.RUnlock() + + // We could use a sync.Map but given Go's lack of generic I'd rather + // just use a lock on a normal map... + // The expectation is that most of the time is spent in applyFunc + // anyways. + var ( + errMap map[*libpod.Container]error = make(map[*libpod.Container]error) + errLock sync.Mutex + allDone sync.WaitGroup + ) + + for _, ctr := range ctrs { + // Block until a thread is available + if err := jobControl.Acquire(ctx, 1); err != nil { + return nil, errors.Wrapf(err, "error acquiring job control semaphore") + } + + allDone.Add(1) + + c := ctr + go func() { + logrus.Debugf("Launching job on container %s", c.ID()) + + err := applyFunc(c) + errLock.Lock() + errMap[c] = err + errLock.Unlock() + + allDone.Done() + jobControl.Release(1) + }() + } + + allDone.Wait() + + return errMap, nil +} + +// TODO: Add an Enqueue() function that returns a promise |