1 package manager
2
3 import (
4 "context"
5 "errors"
6 "fmt"
7 "sync/atomic"
8 "time"
9
10 . "github.com/onsi/ginkgo/v2"
11 . "github.com/onsi/gomega"
12 "k8s.io/utils/ptr"
13
14 "sigs.k8s.io/controller-runtime/pkg/cache/informertest"
15 "sigs.k8s.io/controller-runtime/pkg/webhook"
16 )
17
18 var _ = Describe("runnables", func() {
19 errCh := make(chan error)
20
21 It("should be able to create a new runnables object", func() {
22 Expect(newRunnables(defaultBaseContext, errCh)).ToNot(BeNil())
23 })
24
25 It("should add HTTP servers to the appropriate group", func() {
26 server := &Server{}
27 r := newRunnables(defaultBaseContext, errCh)
28 Expect(r.Add(server)).To(Succeed())
29 Expect(r.HTTPServers.startQueue).To(HaveLen(1))
30 })
31
32 It("should add caches to the appropriate group", func() {
33 cache := &cacheProvider{cache: &informertest.FakeInformers{Error: fmt.Errorf("expected error")}}
34 r := newRunnables(defaultBaseContext, errCh)
35 Expect(r.Add(cache)).To(Succeed())
36 Expect(r.Caches.startQueue).To(HaveLen(1))
37 })
38
39 It("should add webhooks to the appropriate group", func() {
40 webhook := webhook.NewServer(webhook.Options{})
41 r := newRunnables(defaultBaseContext, errCh)
42 Expect(r.Add(webhook)).To(Succeed())
43 Expect(r.Webhooks.startQueue).To(HaveLen(1))
44 })
45
46 It("should add any runnable to the leader election group", func() {
47 err := errors.New("runnable func")
48 runnable := RunnableFunc(func(c context.Context) error {
49 return err
50 })
51
52 r := newRunnables(defaultBaseContext, errCh)
53 Expect(r.Add(runnable)).To(Succeed())
54 Expect(r.LeaderElection.startQueue).To(HaveLen(1))
55 })
56 })
57
58 var _ = Describe("runnableGroup", func() {
59 errCh := make(chan error)
60
61 It("should be able to add new runnables before it starts", func() {
62 ctx, cancel := context.WithCancel(context.Background())
63 defer cancel()
64 rg := newRunnableGroup(defaultBaseContext, errCh)
65 Expect(rg.Add(RunnableFunc(func(c context.Context) error {
66 <-ctx.Done()
67 return nil
68 }), nil)).To(Succeed())
69
70 Expect(rg.Started()).To(BeFalse())
71 })
72
73 It("should be able to add new runnables before and after start", func() {
74 ctx, cancel := context.WithCancel(context.Background())
75 defer cancel()
76 rg := newRunnableGroup(defaultBaseContext, errCh)
77 Expect(rg.Add(RunnableFunc(func(c context.Context) error {
78 <-ctx.Done()
79 return nil
80 }), nil)).To(Succeed())
81 Expect(rg.Start(ctx)).To(Succeed())
82 Expect(rg.Started()).To(BeTrue())
83 Expect(rg.Add(RunnableFunc(func(c context.Context) error {
84 <-ctx.Done()
85 return nil
86 }), nil)).To(Succeed())
87 })
88
89 It("should be able to add new runnables before and after start concurrently", func() {
90 ctx, cancel := context.WithCancel(context.Background())
91 defer cancel()
92 rg := newRunnableGroup(defaultBaseContext, errCh)
93
94 go func() {
95 defer GinkgoRecover()
96 <-time.After(50 * time.Millisecond)
97 Expect(rg.Start(ctx)).To(Succeed())
98 }()
99
100 for i := 0; i < 20; i++ {
101 go func(i int) {
102 defer GinkgoRecover()
103
104 <-time.After(time.Duration(i) * 10 * time.Millisecond)
105 Expect(rg.Add(RunnableFunc(func(c context.Context) error {
106 <-ctx.Done()
107 return nil
108 }), nil)).To(Succeed())
109 }(i)
110 }
111 })
112
113 It("should be able to close the group and wait for all runnables to finish", func() {
114 ctx, cancel := context.WithCancel(context.Background())
115
116 exited := ptr.To(int64(0))
117 rg := newRunnableGroup(defaultBaseContext, errCh)
118 for i := 0; i < 10; i++ {
119 Expect(rg.Add(RunnableFunc(func(c context.Context) error {
120 defer atomic.AddInt64(exited, 1)
121 <-ctx.Done()
122 <-time.After(time.Duration(i) * 10 * time.Millisecond)
123 return nil
124 }), nil)).To(Succeed())
125 }
126 Expect(rg.Start(ctx)).To(Succeed())
127
128
129 cancel()
130 rg.StopAndWait(context.Background())
131
132 Expect(rg.Add(RunnableFunc(func(c context.Context) error {
133 return nil
134 }), nil)).ToNot(Succeed())
135
136 Expect(atomic.LoadInt64(exited)).To(BeNumerically("==", 10))
137 })
138
139 It("should be able to wait for all runnables to be ready at different intervals", func() {
140 ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
141 defer cancel()
142 rg := newRunnableGroup(defaultBaseContext, errCh)
143
144 go func() {
145 defer GinkgoRecover()
146 <-time.After(50 * time.Millisecond)
147 Expect(rg.Start(ctx)).To(Succeed())
148 }()
149
150 for i := 0; i < 20; i++ {
151 go func(i int) {
152 defer GinkgoRecover()
153
154 Expect(rg.Add(RunnableFunc(func(c context.Context) error {
155 <-ctx.Done()
156 return nil
157 }), func(_ context.Context) bool {
158 <-time.After(time.Duration(i) * 10 * time.Millisecond)
159 return true
160 })).To(Succeed())
161 }(i)
162 }
163 })
164
165 It("should be able to handle adding runnables while stopping", func() {
166 ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
167 defer cancel()
168 rg := newRunnableGroup(defaultBaseContext, errCh)
169
170 go func() {
171 defer GinkgoRecover()
172 <-time.After(1 * time.Millisecond)
173 Expect(rg.Start(ctx)).To(Succeed())
174 }()
175 go func() {
176 defer GinkgoRecover()
177 <-time.After(1 * time.Millisecond)
178 ctx, cancel := context.WithCancel(context.Background())
179 cancel()
180 rg.StopAndWait(ctx)
181 }()
182
183 for i := 0; i < 200; i++ {
184 go func(i int) {
185 defer GinkgoRecover()
186
187 <-time.After(time.Duration(i) * time.Microsecond)
188 Expect(rg.Add(RunnableFunc(func(c context.Context) error {
189 <-ctx.Done()
190 return nil
191 }), func(_ context.Context) bool {
192 return true
193 })).To(SatisfyAny(
194 Succeed(),
195 Equal(errRunnableGroupStopped),
196 ))
197 }(i)
198 }
199 })
200
201 It("should not turn ready if some readiness check fail", func() {
202 ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
203 defer cancel()
204 rg := newRunnableGroup(defaultBaseContext, errCh)
205
206 go func() {
207 defer GinkgoRecover()
208 <-time.After(50 * time.Millisecond)
209 Expect(rg.Start(ctx)).To(Succeed())
210 }()
211
212 for i := 0; i < 20; i++ {
213 go func(i int) {
214 defer GinkgoRecover()
215
216 Expect(rg.Add(RunnableFunc(func(c context.Context) error {
217 <-ctx.Done()
218 return nil
219 }), func(_ context.Context) bool {
220 <-time.After(time.Duration(i) * 10 * time.Millisecond)
221 return i%2 == 0
222 })).To(Succeed())
223 }(i)
224 }
225 })
226 })
227
View as plain text