1 package ktest
2
3 import (
4 "context"
5 "errors"
6 "flag"
7 "fmt"
8 "io"
9 "log"
10 "net"
11 "strings"
12 "testing"
13 "time"
14
15 corev1 "k8s.io/api/core/v1"
16 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
17 "k8s.io/client-go/kubernetes"
18 "sigs.k8s.io/controller-runtime/pkg/client"
19 "sigs.k8s.io/controller-runtime/pkg/manager"
20
21 "edge-infra.dev/pkg/k8s/runtime/controller"
22 "edge-infra.dev/pkg/k8s/runtime/sap"
23 "edge-infra.dev/test/f2"
24 "edge-infra.dev/test/f2/fctx"
25 "edge-infra.dev/test/f2/x/ktest/envtest"
26 "edge-infra.dev/test/f2/x/ktest/kpoll"
27 )
28
29 const (
30 MaxNamespaceLen = 63
31 Timeout = time.Second * 30
32 Tick = time.Second * 1
33 startErrTimeout = time.Minute * 5
34 )
35
36 var startErrChan chan error
37
38
39
40
41
42
43
44
45
46
47
48 type K8s struct {
49 Env *envtest.Environment
50 Namespace string
51 Client client.Client
52 Manager manager.Manager
53 *kpoll.KPoll
54
55 Timeout time.Duration
56 Tick time.Duration
57
58 options *options
59 mgrCancel context.CancelFunc
60 }
61
62
63
64 func FromContext(ctx fctx.Context) (*K8s, error) {
65 v := fctx.ValueFrom[K8s](ctx)
66 if v == nil {
67 return nil, fmt.Errorf("%w: ktest.K8s extension", fctx.ErrNotFound)
68 }
69 return v, nil
70 }
71
72
73
74 func FromContextT(ctx fctx.Context, t *testing.T) *K8s {
75 return fctx.ValueFromT[K8s](ctx, t)
76 }
77
78
79 func New(opts ...Option) *K8s {
80 o := makeOptions(opts...)
81 k8s := &K8s{
82 options: o,
83 }
84 return k8s
85 }
86
87
88 func (k *K8s) RegisterFns(f f2.Framework) {
89
90 f.Setup(k.setupEnvtest)
91
92
93 if k.options.mgrCreator != nil {
94 f.Setup(k.startMgr)
95 f.Teardown(k.stopMgr)
96 }
97
98 f.Setup(func(ctx fctx.Context) (fctx.Context, error) {
99 return ctx, k.setClient()
100 })
101
102
103
104
105 f.BeforeEachTest(func(ctx fctx.Context, t *testing.T) (fctx.Context, error) {
106 if err := k.setClient(); err != nil {
107 return ctx, err
108 }
109
110 k.KPoll = kpoll.New(ctx, k.Client, k.Timeout, k.Tick)
111
112 if !k.options.skipNamespaceCreation {
113 name := strings.ToLower(strings.ReplaceAll(t.Name(), "_", "-"))
114 k.Namespace = name + "-" + ctx.RunID
115
116
117 if len(k.Namespace) > MaxNamespaceLen {
118 t.Log("proposed namespace was too long", k.Namespace)
119 k.Namespace = name[:len(name)-(len(k.Namespace)-MaxNamespaceLen)] + "-" + ctx.RunID
120 }
121
122 t.Log("creating namespace", k.Namespace)
123
124 return ctx, k.Client.Create(ctx, &corev1.Namespace{
125 ObjectMeta: metav1.ObjectMeta{
126 Name: k.Namespace,
127 Labels: map[string]string{
128 "ktest": t.Name(),
129 },
130 },
131 })
132 }
133
134 return ctx, nil
135 })
136
137 f.AfterEachTest(func(ctx fctx.Context, t *testing.T) (fctx.Context, error) {
138 if err := k.setClient(); err != nil {
139 return ctx, err
140 }
141
142 k.KPoll = kpoll.New(ctx, k.Client, k.Timeout, k.Tick)
143
144
145 if k.options.skipNamespaceDeletion || k.options.skipNamespaceCreation {
146 t.Log("skipping namespace deletion")
147 return ctx, nil
148 }
149
150 t.Log("deleting namespace", k.Namespace)
151
152 return ctx, k.Client.Delete(ctx, &corev1.Namespace{
153 ObjectMeta: metav1.ObjectMeta{
154 Name: k.Namespace,
155 Labels: map[string]string{
156 "ktest": t.Name(),
157 },
158 },
159 })
160 })
161
162
163 f.Teardown(k.teardownEnvtest)
164 }
165
166
167 func (k *K8s) BindFlags(fs *flag.FlagSet) {
168 envtest.BindFlags(fs)
169 fs.DurationVar(&k.Timeout,
170 "ktest-default-timeout",
171 Timeout,
172 "default timeout for K8s operations",
173 )
174 fs.DurationVar(&k.Tick,
175 "ktest-tick",
176 Tick,
177 "interval checks are evaluated at during tests",
178 )
179
180 }
181
182 func (k *K8s) Labels() map[string]string {
183 l := map[string]string{
184 "k8s": "true",
185 }
186
187 return l
188 }
189
190
191 func (k *K8s) IntoContext(ctx fctx.Context) fctx.Context {
192 return fctx.ValueInto(ctx, k)
193 }
194
195
196 func (k *K8s) GetContainerLogs(ctx context.Context, podname string, namespace string, container string) (io.ReadCloser, error) {
197 clientset, err := kubernetes.NewForConfig(k.Env.Config)
198 if err != nil {
199 return nil, err
200 }
201
202 podLogOptions := corev1.PodLogOptions{Container: container}
203
204 req := clientset.CoreV1().Pods(namespace).GetLogs(podname, &podLogOptions)
205 return req.Stream(ctx)
206 }
207
208 func (k *K8s) setClient() error {
209 if k.Env == nil {
210 return fmt.Errorf("setClient called before envtest setup")
211 }
212
213 opts := client.Options{}
214 if k.Manager != nil {
215 opts.Scheme = k.Manager.GetScheme()
216 }
217 if k.options.clientScheme != nil {
218 opts.Scheme = k.options.clientScheme
219 }
220 c, err := client.New(k.Env.Config, opts)
221 if err != nil {
222 return err
223 }
224 k.Client = c
225
226 return nil
227 }
228
229
230
231
232 func (k *K8s) startMgr(ctx fctx.Context) (fctx.Context, error) {
233
234 if k.Env == nil {
235 return ctx, fmt.Errorf("startMgr called before envtest setup")
236 }
237 if k.options.mgrCreator == nil {
238 return ctx, fmt.Errorf("startMgr called without mgrCreator being set")
239 }
240
241
242
243 port := "0"
244 if k.options.metricsAddress != "" {
245 var err error
246 port, err = getMetricsAddress(k.options.metricsAddress)
247 if err != nil {
248 return ctx, err
249 }
250 }
251
252 mgrOpts := []controller.Option{
253 controller.WithCfg(k.Env.Config),
254 controller.WithMetricsAddress(port),
255 }
256
257
258 if k.options.gracefulTimeout != "" {
259 timeout, err := time.ParseDuration(k.options.gracefulTimeout)
260 if err != nil {
261 return ctx, fmt.Errorf("failed to parse graceful shutdown timeout: %w", err)
262 }
263 mgrOpts = append(mgrOpts, controller.WithGracefulTimeout(timeout))
264 }
265
266 mgr, err := k.options.mgrCreator(mgrOpts...)
267 if err != nil {
268 return ctx, fmt.Errorf("failed to create controller manager: %w", err)
269 }
270 k.Manager = mgr
271
272 managerCtx, cancel := context.WithCancel(ctx)
273 k.mgrCancel = cancel
274
275 startErrChan = make(chan error, 1)
276
277
278
279
280
281
282
283 go func() {
284 startErrChan <- k.Manager.Start(managerCtx)
285 close(startErrChan)
286 }()
287
288 return ctx, nil
289 }
290
291
292
293 func (k *K8s) stopMgr(ctx fctx.Context) (fctx.Context, error) {
294 k.mgrCancel()
295
296 if err := receiveStartErr(startErrChan); err != nil {
297 return ctx, err
298 }
299
300 return ctx, nil
301 }
302
303 func (k *K8s) setupEnvtest(ctx fctx.Context) (fctx.Context, error) {
304 var err error
305 k.Env, err = envtest.Setup(k.options.envtestOpts...)
306 return ctx, err
307 }
308
309 func (k *K8s) teardownEnvtest(ctx fctx.Context) (fctx.Context, error) {
310 return ctx, k.Env.Stop()
311 }
312
313
314
315 func FieldManagerOwner() sap.Owner {
316 return sap.Owner{
317 Field: "f2-ktest",
318 Group: "f2-ktest",
319 }
320 }
321
322
323
324 func getMetricsAddress(port string) (string, error) {
325 l, err := net.Listen("tcp", port)
326 defer func() { _ = l.Close() }()
327 if err != nil {
328 l, err = net.Listen("tcp", "")
329 if err != nil {
330 return "", fmt.Errorf("failed to find open port: %w", err)
331 }
332 }
333 tcpAddr, ok := l.Addr().(*net.TCPAddr)
334 if !ok {
335 return "", fmt.Errorf("tcpAddr should be of type TCPAddr")
336 }
337 port = fmt.Sprintf(":%d", tcpAddr.Port)
338 return port, nil
339 }
340
341
342
343
344 func receiveStartErr(startErrChan <-chan error) error {
345 timeout := time.After(startErrTimeout)
346 select {
347 case err := <-startErrChan:
348 if errors.Is(err, context.Canceled) {
349 log.Printf("manager context canceled")
350 break
351 }
352 if err != nil {
353 log.Printf("manager error: %v", err)
354 return err
355 }
356 case <-timeout:
357 return fmt.Errorf("no error received from startMgr before 5m timeout")
358 }
359 return nil
360 }
361
View as plain text