...
1
16
17 package workqueue
18
19 import (
20 "context"
21 "sync"
22
23 utilruntime "k8s.io/apimachinery/pkg/util/runtime"
24 )
25
26 type DoWorkPieceFunc func(piece int)
27
28 type options struct {
29 chunkSize int
30 }
31
32 type Options func(*options)
33
34
35
36
37
38 func WithChunkSize(c int) func(*options) {
39 return func(o *options) {
40 o.chunkSize = c
41 }
42 }
43
44
45
46 func ParallelizeUntil(ctx context.Context, workers, pieces int, doWorkPiece DoWorkPieceFunc, opts ...Options) {
47 if pieces == 0 {
48 return
49 }
50 o := options{}
51 for _, opt := range opts {
52 opt(&o)
53 }
54 chunkSize := o.chunkSize
55 if chunkSize < 1 {
56 chunkSize = 1
57 }
58
59 chunks := ceilDiv(pieces, chunkSize)
60 toProcess := make(chan int, chunks)
61 for i := 0; i < chunks; i++ {
62 toProcess <- i
63 }
64 close(toProcess)
65
66 var stop <-chan struct{}
67 if ctx != nil {
68 stop = ctx.Done()
69 }
70 if chunks < workers {
71 workers = chunks
72 }
73 wg := sync.WaitGroup{}
74 wg.Add(workers)
75 for i := 0; i < workers; i++ {
76 go func() {
77 defer utilruntime.HandleCrash()
78 defer wg.Done()
79 for chunk := range toProcess {
80 start := chunk * chunkSize
81 end := start + chunkSize
82 if end > pieces {
83 end = pieces
84 }
85 for p := start; p < end; p++ {
86 select {
87 case <-stop:
88 return
89 default:
90 doWorkPiece(p)
91 }
92 }
93 }
94 }()
95 }
96 wg.Wait()
97 }
98
99 func ceilDiv(a, b int) int {
100 return (a + b - 1) / b
101 }
102
View as plain text