1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
|
package shared
import (
"runtime"
"sync"
)
// pFunc is the signature of a single unit of work handled by the worker pool.
// The error it returns is reported back as the outcome of the job.
type pFunc func() error
// ParallelWorkerInput describes one job for the worker pool: the function to
// run and the ID of the container it is run for. Callers pass a slice of these
// to ParallelExecuteWorkerPool.
type ParallelWorkerInput struct {
// ContainerID identifies the container the job belongs to; the job's result
// is reported under this ID.
ContainerID string
// ParallelFunc is the work to perform for the container.
ParallelFunc pFunc
}
// containerError pairs a container ID with the error returned by the job that
// was run for it. Err is whatever the job's ParallelFunc returned, so it is nil
// when the job succeeded.
type containerError struct {
ContainerID string
Err error
}
// ParallelWorker consumes jobs from the jobs channel until that channel has
// been closed and drained, then returns. For every job it runs the job's
// ParallelFunc, publishes the container ID together with the returned error on
// results, and finally marks one unit of work as done on wg. It is intended to
// be started as a goroutine, several of which may share the same channels.
func ParallelWorker(wg *sync.WaitGroup, jobs <-chan ParallelWorkerInput, results chan<- containerError) {
	for {
		job, open := <-jobs
		if !open {
			// Queue closed and empty: nothing left for this worker to do.
			return
		}

		outcome := containerError{ContainerID: job.ContainerID}
		outcome.Err = job.ParallelFunc()

		// Report first, then signal completion, so a waiter on wg is
		// guaranteed to find the result already sent.
		results <- outcome
		wg.Done()
	}
}
// ParallelExecuteWorkerPool runs every job in functions on a pool of worker
// goroutines and blocks until all of them have finished.
//
// workers is the requested pool size. It is capped at len(functions), and a
// value below one is raised to one whenever there is work to do; without that
// no goroutine would ever drain the queue and the call would block forever.
//
// The returned map holds the error (nil on success) of each job keyed by its
// ContainerID. If several jobs share a ContainerID only one of their results
// is kept. The returned int is the number of jobs that returned a non-nil
// error, counted per job rather than per map key.
func ParallelExecuteWorkerPool(workers int, functions []ParallelWorkerInput) (map[string]error, int) {
	var (
		wg         sync.WaitGroup
		errorCount int
	)

	// Both channels can hold every job, so neither queuing the work nor
	// reporting a result can block.
	resultChan := make(chan containerError, len(functions))
	results := make(map[string]error, len(functions))
	paraJobs := make(chan ParallelWorkerInput, len(functions))

	// If we have more workers than functions, match up the number of workers and functions
	if workers > len(functions) {
		workers = len(functions)
	}
	// A zero or negative worker count must still make progress when there are jobs.
	if workers < 1 && len(functions) > 0 {
		workers = 1
	}

	// Create the workers
	for w := 0; w < workers; w++ {
		go ParallelWorker(&wg, paraJobs, resultChan)
	}

	// Register all jobs before handing any of them out, then queue them.
	wg.Add(len(functions))
	for _, j := range functions {
		paraJobs <- j
	}
	// Closing the queue lets each worker exit once it runs dry.
	close(paraJobs)

	wg.Wait()
	// Every result has been sent by now; close so the range below terminates.
	close(resultChan)

	for ctrError := range resultChan {
		results[ctrError.ContainerID] = ctrError.Err
		if ctrError.Err != nil {
			errorCount++
		}
	}
	return results, errorCount
}
// Parallelize provides the maximum number of parallel workers (int) for the
// named job, as calculated by a basic heuristic on runtime.NumCPU:
//
//   - "kill", "pause", "rm", "unpause": 3x the CPU count on machines with up
//     to 3 CPUs, 4x otherwise
//   - "ps": always 8
//   - "restart": 2x the CPU count
//   - "stop": 4 on machines with up to 2 CPUs, 3x the CPU count otherwise
//   - any other job name: 3
//
// This can be overridden by the --max-workers primary switch to podman.
func Parallelize(job string) int {
	numCpus := runtime.NumCPU()
	switch job {
	case "kill", "pause", "rm", "unpause":
		if numCpus <= 3 {
			return numCpus * 3
		}
		return numCpus * 4
	case "ps":
		return 8
	case "restart":
		return numCpus * 2
	case "stop":
		if numCpus <= 2 {
			return 4
		}
		return numCpus * 3
	}
	// Unknown jobs fall back to a small fixed pool.
	return 3
}
|