summaryrefslogtreecommitdiff
path: root/pkg/parallel/parallel.go
blob: 4da7e0f89a65e693e451ac7806cc2bc57100dd45 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
package parallel

import (
	"context"
	"sync"

	"github.com/pkg/errors"
	"github.com/sirupsen/logrus"
	"golang.org/x/sync/semaphore"
)

var (
	// numThreads is the maximum number of jobs that will be run at once.
	// Set a low, but non-zero, default. We'll be overriding it by default
	// anyways.
	numThreads uint = 8
	// jobControl is the semaphore used to gate job execution and ensure
	// numThreads is respected. It is sized from the default numThreads at
	// package initialization so that Enqueue can be used even if
	// SetMaxThreads has not been called yet; leaving it nil would make the
	// first Acquire dereference a nil pointer.
	jobControl = semaphore.NewWeighted(int64(numThreads))
	// jobControlLock controls changing the semaphore - we don't want to do
	// it while anyone is using it. Jobs hold it for reading; SetMaxThreads
	// takes it for writing.
	jobControlLock sync.RWMutex
)

// SetMaxThreads configures how many parallel jobs may run at the same time.
//
// A value of zero is rejected with an error and leaves the current
// configuration untouched. Otherwise both the recorded thread count and the
// semaphore that enforces it are replaced while holding jobControlLock for
// writing, so the call waits until no reader holds that lock.
func SetMaxThreads(threads uint) error {
	if threads < 1 {
		return errors.New("must give a non-zero number of threads to execute with")
	}

	// Build the replacement semaphore up front; it only becomes visible
	// once it is stored under the write lock below.
	replacement := semaphore.NewWeighted(int64(threads))

	jobControlLock.Lock()
	defer jobControlLock.Unlock()

	numThreads, jobControl = threads, replacement
	logrus.Infof("Setting parallel job count to %d", threads)

	return nil
}

// GetMaxThreads reports the thread count most recently stored by a successful
// SetMaxThreads call, or the package default if it was never called.
//
// The value is read without taking jobControlLock.
func GetMaxThreads() uint {
	current := numThreads
	return current
}

// Enqueue adds a single function to the parallel jobs queue. This function will
// be run when an unused thread is available.
//
// Returns a receive-only error channel that delivers exactly one value and is
// then closed. That value is the error (if any) returned by fn, or a wrapped
// error if a job slot could not be acquired (for example because ctx was
// cancelled first), in which case fn is never called.
//
// The channel has a buffer of one, so the job goroutine can always deliver its
// result and exit even if the caller never receives from the channel. This
// keeps an abandoned channel from leaking the goroutine and from pinning the
// read lock, which would block SetMaxThreads indefinitely.
func Enqueue(ctx context.Context, fn func() error) <-chan error {
	// Capacity 1: exactly one result is ever sent, so the send never blocks.
	retChan := make(chan error, 1)

	go func() {
		// Hold the read lock for the whole job so the semaphore cannot be
		// swapped out between Acquire and Release.
		jobControlLock.RLock()
		defer jobControlLock.RUnlock()

		defer close(retChan)

		if err := jobControl.Acquire(ctx, 1); err != nil {
			retChan <- errors.Wrap(err, "error acquiring job control semaphore")
			return
		}

		err := fn()

		// Free the slot before publishing the result so a waiting job can
		// start as soon as fn has returned.
		jobControl.Release(1)

		retChan <- err
	}()

	return retChan
}