1
2
3
4
19
20 package tracing
21
22 import (
23 "context"
24 "encoding/hex"
25 "encoding/json"
26 "fmt"
27 "net"
28 "os"
29 "strings"
30 "sync"
31 "testing"
32 "time"
33
34 traceservice "go.opentelemetry.io/proto/otlp/collector/trace/v1"
35 commonv1 "go.opentelemetry.io/proto/otlp/common/v1"
36 tracev1 "go.opentelemetry.io/proto/otlp/trace/v1"
37 "google.golang.org/grpc"
38
39 v1 "k8s.io/api/core/v1"
40 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
41 "k8s.io/apimachinery/pkg/types"
42 "k8s.io/apimachinery/pkg/util/strategicpatch"
43 kmsv2mock "k8s.io/apiserver/pkg/storage/value/encrypt/envelope/testing/v2"
44 client "k8s.io/client-go/kubernetes"
45 utiltesting "k8s.io/client-go/util/testing"
46 kubeapiservertesting "k8s.io/kubernetes/cmd/kube-apiserver/app/testing"
47 "k8s.io/kubernetes/test/integration/framework"
48 )
49
50 func TestAPIServerTracingWithKMSv2(t *testing.T) {
51
52
53 listener, err := net.Listen("tcp", "localhost:")
54 if err != nil {
55 t.Fatal(err)
56 }
57
58 encryptionConfigFile, err := os.CreateTemp("", "encryption-config.yaml")
59 if err != nil {
60 t.Fatal(err)
61 }
62 defer os.Remove(encryptionConfigFile.Name())
63
64 if err := os.WriteFile(encryptionConfigFile.Name(), []byte(`
65 apiVersion: apiserver.config.k8s.io/v1
66 kind: EncryptionConfiguration
67 resources:
68 - resources:
69 - secrets
70 providers:
71 - kms:
72 apiVersion: v2
73 name: kms-provider
74 endpoint: unix:///@kms-provider.sock`), os.FileMode(0755)); err != nil {
75 t.Fatal(err)
76 }
77
78
79 tracingConfigFile, err := os.CreateTemp("", "tracing-config.yaml")
80 if err != nil {
81 t.Fatal(err)
82 }
83 defer os.Remove(tracingConfigFile.Name())
84
85 if err := os.WriteFile(tracingConfigFile.Name(), []byte(fmt.Sprintf(`
86 apiVersion: apiserver.config.k8s.io/v1beta1
87 kind: TracingConfiguration
88 samplingRatePerMillion: 1000000
89 endpoint: %s`, listener.Addr().String())), os.FileMode(0755)); err != nil {
90 t.Fatal(err)
91 }
92
93 srv := grpc.NewServer()
94 fakeServer := &traceServer{t: t}
95 fakeServer.resetExpectations([]*spanExpectation{})
96 traceservice.RegisterTraceServiceServer(srv, fakeServer)
97
98 go func() {
99 if err := srv.Serve(listener); err != nil {
100 t.Error(err)
101 return
102 }
103 }()
104 defer srv.Stop()
105
106 _ = kmsv2mock.NewBase64Plugin(t, "@kms-provider.sock")
107
108
109 testServer := kubeapiservertesting.StartTestServerOrDie(t,
110 kubeapiservertesting.NewDefaultTestServerOptions(),
111 []string{
112 "--tracing-config-file=" + tracingConfigFile.Name(),
113 "--encryption-provider-config=" + encryptionConfigFile.Name(),
114 },
115 framework.SharedEtcd(),
116 )
117 defer testServer.TearDownFn()
118 clientSet, err := client.NewForConfig(testServer.ClientConfig)
119 if err != nil {
120 t.Fatal(err)
121 }
122
123 for _, tc := range []struct {
124 desc string
125 apiCall func(client.Interface) error
126 expectedTrace []*spanExpectation
127 }{
128 {
129 desc: "create secret",
130 apiCall: func(c client.Interface) error {
131 _, err = clientSet.CoreV1().Secrets(v1.NamespaceDefault).Create(context.Background(),
132 &v1.Secret{ObjectMeta: metav1.ObjectMeta{Name: "fake"}, Data: map[string][]byte{"foo": []byte("bar")}}, metav1.CreateOptions{})
133 return err
134 },
135 expectedTrace: []*spanExpectation{
136 {
137 name: "TransformToStorage with envelopeTransformer",
138 attributes: map[string]func(*commonv1.AnyValue) bool{
139 "transformer.provider.name": func(v *commonv1.AnyValue) bool {
140 return v.GetStringValue() == "kms-provider"
141 },
142 },
143 events: []string{
144 "About to encrypt data using DEK",
145 "Data encryption succeeded",
146 "About to encode encrypted object",
147 "Encoded encrypted object",
148 },
149 },
150 },
151 },
152 {
153 desc: "get secret",
154 apiCall: func(c client.Interface) error {
155
156 _, err = clientSet.CoreV1().Secrets(v1.NamespaceDefault).Get(context.Background(), "fake", metav1.GetOptions{})
157 return err
158 },
159 expectedTrace: []*spanExpectation{
160 {
161 name: "TransformFromStorage with envelopeTransformer",
162 attributes: map[string]func(*commonv1.AnyValue) bool{
163 "transformer.provider.name": func(v *commonv1.AnyValue) bool {
164 return v.GetStringValue() == "kms-provider"
165 },
166 },
167 events: []string{
168 "About to decode encrypted object",
169 "Decoded encrypted object",
170 "About to decrypt data using DEK",
171 "Data decryption succeeded",
172 },
173 },
174 },
175 },
176 } {
177 t.Run(tc.desc, func(t *testing.T) {
178 fakeServer.resetExpectations(tc.expectedTrace)
179
180
181 if err := tc.apiCall(clientSet); err != nil {
182 t.Fatal(err)
183 }
184
185
186 select {
187 case <-fakeServer.traceFound:
188 case <-time.After(30 * time.Second):
189 t.Fatal("Timed out waiting for trace")
190 }
191 })
192 }
193 }
194
195 func TestAPIServerTracingWithEgressSelector(t *testing.T) {
196
197
198 listener, err := net.Listen("tcp", "localhost:")
199 if err != nil {
200 t.Fatal(err)
201 }
202
203
204 egressSelectorConfigFile, err := os.CreateTemp("", "egress_selector_configuration.yaml")
205 if err != nil {
206 t.Fatal(err)
207 }
208 defer os.Remove(egressSelectorConfigFile.Name())
209
210 if err := os.WriteFile(egressSelectorConfigFile.Name(), []byte(`
211 apiVersion: apiserver.config.k8s.io/v1beta1
212 kind: EgressSelectorConfiguration
213 egressSelections:
214 - name: cluster
215 connection:
216 proxyProtocol: Direct
217 transport:`), os.FileMode(0755)); err != nil {
218 t.Fatal(err)
219 }
220
221
222 tracingConfigFile, err := os.CreateTemp("", "tracing-config.yaml")
223 if err != nil {
224 t.Fatal(err)
225 }
226 defer utiltesting.CloseAndRemove(t, tracingConfigFile)
227
228 if err := os.WriteFile(tracingConfigFile.Name(), []byte(fmt.Sprintf(`
229 apiVersion: apiserver.config.k8s.io/v1beta1
230 kind: TracingConfiguration
231 endpoint: %s`, listener.Addr().String())), os.FileMode(0755)); err != nil {
232 t.Fatal(err)
233 }
234
235
236 testServer := kubeapiservertesting.StartTestServerOrDie(t,
237 kubeapiservertesting.NewDefaultTestServerOptions(),
238 []string{
239 "--tracing-config-file=" + tracingConfigFile.Name(),
240 "--egress-selector-config-file=" + egressSelectorConfigFile.Name(),
241 },
242 framework.SharedEtcd(),
243 )
244 defer testServer.TearDownFn()
245 clientSet, err := client.NewForConfig(testServer.ClientConfig)
246 if err != nil {
247 t.Fatal(err)
248 }
249
250 _, err = clientSet.CoreV1().Nodes().List(context.Background(), metav1.ListOptions{})
251 if err != nil {
252 t.Fatal(err)
253 }
254 }
255
256 func TestAPIServerTracing(t *testing.T) {
257
258
259 listener, err := net.Listen("tcp", "localhost:")
260 if err != nil {
261 t.Fatal(err)
262 }
263
264 tracingConfigFile, err := os.CreateTemp("", "tracing-config.yaml")
265 if err != nil {
266 t.Fatal(err)
267 }
268 defer os.Remove(tracingConfigFile.Name())
269
270 if err := os.WriteFile(tracingConfigFile.Name(), []byte(fmt.Sprintf(`
271 apiVersion: apiserver.config.k8s.io/v1beta1
272 kind: TracingConfiguration
273 samplingRatePerMillion: 1000000
274 endpoint: %s`, listener.Addr().String())), os.FileMode(0755)); err != nil {
275 t.Fatal(err)
276 }
277
278 srv := grpc.NewServer()
279 fakeServer := &traceServer{t: t}
280 fakeServer.resetExpectations([]*spanExpectation{})
281 traceservice.RegisterTraceServiceServer(srv, fakeServer)
282
283 go srv.Serve(listener)
284 defer srv.Stop()
285
286
287 testServer := kubeapiservertesting.StartTestServerOrDie(t,
288 kubeapiservertesting.NewDefaultTestServerOptions(),
289 []string{"--tracing-config-file=" + tracingConfigFile.Name()},
290 framework.SharedEtcd(),
291 )
292 defer testServer.TearDownFn()
293 clientSet, err := client.NewForConfig(testServer.ClientConfig)
294 if err != nil {
295 t.Fatal(err)
296 }
297
298 for _, tc := range []struct {
299 desc string
300 apiCall func(*client.Clientset) error
301 expectedTrace []*spanExpectation
302 }{
303 {
304 desc: "create node",
305 apiCall: func(c *client.Clientset) error {
306 _, err = clientSet.CoreV1().Nodes().Create(context.Background(),
307 &v1.Node{ObjectMeta: metav1.ObjectMeta{Name: "fake"}}, metav1.CreateOptions{})
308 return err
309 },
310 expectedTrace: []*spanExpectation{
311 {
312 name: "KubernetesAPI",
313 attributes: map[string]func(*commonv1.AnyValue) bool{
314 "http.user_agent": func(v *commonv1.AnyValue) bool {
315 return strings.HasPrefix(v.GetStringValue(), "tracing.test")
316 },
317 "http.target": func(v *commonv1.AnyValue) bool {
318 return v.GetStringValue() == "/api/v1/nodes"
319 },
320 "http.method": func(v *commonv1.AnyValue) bool {
321 return v.GetStringValue() == "POST"
322 },
323 },
324 },
325 {
326 name: "authentication",
327 },
328 {
329 name: "Create",
330 attributes: map[string]func(*commonv1.AnyValue) bool{
331 "url": func(v *commonv1.AnyValue) bool {
332 return strings.HasSuffix(v.GetStringValue(), "/api/v1/nodes")
333 },
334 "user-agent": func(v *commonv1.AnyValue) bool {
335 return strings.HasPrefix(v.GetStringValue(), "tracing.test")
336 },
337 "audit-id": func(v *commonv1.AnyValue) bool {
338 return v.GetStringValue() != ""
339 },
340 "client": func(v *commonv1.AnyValue) bool {
341 return v.GetStringValue() == "127.0.0.1"
342 },
343 "accept": func(v *commonv1.AnyValue) bool {
344 return v.GetStringValue() == "application/vnd.kubernetes.protobuf, */*"
345 },
346 "protocol": func(v *commonv1.AnyValue) bool {
347 return v.GetStringValue() == "HTTP/2.0"
348 },
349 },
350 events: []string{
351 "limitedReadBody succeeded",
352 "About to convert to expected version",
353 "Conversion done",
354 "About to store object in database",
355 "Write to database call succeeded",
356 "About to write a response",
357 "Writing http response done",
358 },
359 },
360 {
361 name: "Create etcd3",
362 attributes: map[string]func(*commonv1.AnyValue) bool{
363 "audit-id": func(v *commonv1.AnyValue) bool {
364 return v.GetStringValue() != ""
365 },
366 "key": func(v *commonv1.AnyValue) bool {
367 return v.GetStringValue() == "/minions/fake"
368 },
369 "type": func(v *commonv1.AnyValue) bool {
370 return v.GetStringValue() == "*core.Node"
371 },
372 "resource": func(v *commonv1.AnyValue) bool {
373 return v.GetStringValue() == "nodes"
374 },
375 },
376 events: []string{
377 "About to Encode",
378 "Encode succeeded",
379 "TransformToStorage succeeded",
380 "Txn call succeeded",
381 "decode succeeded",
382 },
383 },
384 {
385 name: "etcdserverpb.KV/Txn",
386 attributes: map[string]func(*commonv1.AnyValue) bool{
387 "rpc.system": func(v *commonv1.AnyValue) bool {
388 return v.GetStringValue() == "grpc"
389 },
390 },
391 events: []string{"message"},
392 },
393 {
394 name: "SerializeObject",
395 attributes: map[string]func(*commonv1.AnyValue) bool{
396 "url": func(v *commonv1.AnyValue) bool {
397 return strings.HasSuffix(v.GetStringValue(), "/api/v1/nodes")
398 },
399 "audit-id": func(v *commonv1.AnyValue) bool {
400 return v.GetStringValue() != ""
401 },
402 "protocol": func(v *commonv1.AnyValue) bool {
403 return v.GetStringValue() == "HTTP/2.0"
404 },
405 "method": func(v *commonv1.AnyValue) bool {
406 return v.GetStringValue() == "POST"
407 },
408 "mediaType": func(v *commonv1.AnyValue) bool {
409 return v.GetStringValue() == "application/vnd.kubernetes.protobuf"
410 },
411 "encoder": func(v *commonv1.AnyValue) bool {
412 return v.GetStringValue() == "{\"encodeGV\":\"v1\",\"encoder\":\"protobuf\",\"name\":\"versioning\"}"
413 },
414 },
415 events: []string{
416 "About to start writing response",
417 "Write call succeeded",
418 },
419 },
420 },
421 },
422 {
423 desc: "get node",
424 apiCall: func(c *client.Clientset) error {
425
426 _, err = clientSet.CoreV1().Nodes().Get(context.Background(), "fake", metav1.GetOptions{})
427 return err
428 },
429 expectedTrace: []*spanExpectation{
430 {
431 name: "KubernetesAPI",
432 attributes: map[string]func(*commonv1.AnyValue) bool{
433 "http.user_agent": func(v *commonv1.AnyValue) bool {
434 return strings.HasPrefix(v.GetStringValue(), "tracing.test")
435 },
436 "http.target": func(v *commonv1.AnyValue) bool {
437 return v.GetStringValue() == "/api/v1/nodes/fake"
438 },
439 "http.method": func(v *commonv1.AnyValue) bool {
440 return v.GetStringValue() == "GET"
441 },
442 },
443 },
444 {
445 name: "authentication",
446 },
447 {
448 name: "Get",
449 attributes: map[string]func(*commonv1.AnyValue) bool{
450 "url": func(v *commonv1.AnyValue) bool {
451 return strings.HasSuffix(v.GetStringValue(), "/api/v1/nodes/fake")
452 },
453 "user-agent": func(v *commonv1.AnyValue) bool {
454 return strings.HasPrefix(v.GetStringValue(), "tracing.test")
455 },
456 "audit-id": func(v *commonv1.AnyValue) bool {
457 return v.GetStringValue() != ""
458 },
459 "client": func(v *commonv1.AnyValue) bool {
460 return v.GetStringValue() == "127.0.0.1"
461 },
462 "accept": func(v *commonv1.AnyValue) bool {
463 return v.GetStringValue() == "application/vnd.kubernetes.protobuf, */*"
464 },
465 "protocol": func(v *commonv1.AnyValue) bool {
466 return v.GetStringValue() == "HTTP/2.0"
467 },
468 },
469 events: []string{
470 "About to Get from storage",
471 "About to write a response",
472 "Writing http response done",
473 },
474 },
475 {
476 name: "etcdserverpb.KV/Range",
477 attributes: map[string]func(*commonv1.AnyValue) bool{
478 "rpc.system": func(v *commonv1.AnyValue) bool {
479 return v.GetStringValue() == "grpc"
480 },
481 },
482 events: []string{"message"},
483 },
484 {
485 name: "SerializeObject",
486 attributes: map[string]func(*commonv1.AnyValue) bool{
487 "url": func(v *commonv1.AnyValue) bool {
488 return v.GetStringValue() == "/api/v1/nodes/fake"
489 },
490 "audit-id": func(v *commonv1.AnyValue) bool {
491 return v.GetStringValue() != ""
492 },
493 "protocol": func(v *commonv1.AnyValue) bool {
494 return v.GetStringValue() == "HTTP/2.0"
495 },
496 "method": func(v *commonv1.AnyValue) bool {
497 return v.GetStringValue() == "GET"
498 },
499 "mediaType": func(v *commonv1.AnyValue) bool {
500 return v.GetStringValue() == "application/vnd.kubernetes.protobuf"
501 },
502 "encoder": func(v *commonv1.AnyValue) bool {
503 return v.GetStringValue() == "{\"encodeGV\":\"v1\",\"encoder\":\"protobuf\",\"name\":\"versioning\"}"
504 },
505 },
506 events: []string{
507 "About to start writing response",
508 "Write call succeeded",
509 },
510 },
511 },
512 },
513 {
514 desc: "list nodes",
515 apiCall: func(c *client.Clientset) error {
516 _, err = clientSet.CoreV1().Nodes().List(context.Background(), metav1.ListOptions{})
517 return err
518 },
519 expectedTrace: []*spanExpectation{
520 {
521 name: "KubernetesAPI",
522 attributes: map[string]func(*commonv1.AnyValue) bool{
523 "http.user_agent": func(v *commonv1.AnyValue) bool {
524 return strings.HasPrefix(v.GetStringValue(), "tracing.test")
525 },
526 "http.target": func(v *commonv1.AnyValue) bool {
527 return v.GetStringValue() == "/api/v1/nodes"
528 },
529 "http.method": func(v *commonv1.AnyValue) bool {
530 return v.GetStringValue() == "GET"
531 },
532 },
533 },
534 {
535 name: "authentication",
536 },
537 {
538 name: "List",
539 attributes: map[string]func(*commonv1.AnyValue) bool{
540 "url": func(v *commonv1.AnyValue) bool {
541 return v.GetStringValue() == "/api/v1/nodes"
542 },
543 "user-agent": func(v *commonv1.AnyValue) bool {
544 return strings.HasPrefix(v.GetStringValue(), "tracing.test")
545 },
546 "audit-id": func(v *commonv1.AnyValue) bool {
547 return v.GetStringValue() != ""
548 },
549 "client": func(v *commonv1.AnyValue) bool {
550 return v.GetStringValue() == "127.0.0.1"
551 },
552 "accept": func(v *commonv1.AnyValue) bool {
553 return v.GetStringValue() == "application/vnd.kubernetes.protobuf, */*"
554 },
555 "protocol": func(v *commonv1.AnyValue) bool {
556 return v.GetStringValue() == "HTTP/2.0"
557 },
558 },
559 events: []string{
560 "About to List from storage",
561 "Listing from storage done",
562 "Writing http response done",
563 },
564 },
565 {
566 name: "List(recursive=true) etcd3",
567 attributes: map[string]func(*commonv1.AnyValue) bool{
568 "audit-id": func(v *commonv1.AnyValue) bool {
569 return v.GetStringValue() != ""
570 },
571 "key": func(v *commonv1.AnyValue) bool {
572 return v.GetStringValue() == "/minions"
573 },
574 "resourceVersion": func(v *commonv1.AnyValue) bool {
575 return v.GetStringValue() == ""
576 },
577 "resourceVersionMatch": func(v *commonv1.AnyValue) bool {
578 return v.GetStringValue() == ""
579 },
580 "limit": func(v *commonv1.AnyValue) bool {
581 return v.GetIntValue() == 0
582 },
583 "continue": func(v *commonv1.AnyValue) bool {
584 return v.GetStringValue() == ""
585 },
586 },
587 },
588 {
589 name: "etcdserverpb.KV/Range",
590 attributes: map[string]func(*commonv1.AnyValue) bool{
591 "rpc.system": func(v *commonv1.AnyValue) bool {
592 return v.GetStringValue() == "grpc"
593 },
594 },
595 events: []string{"message"},
596 },
597 {
598 name: "SerializeObject",
599 attributes: map[string]func(*commonv1.AnyValue) bool{
600 "url": func(v *commonv1.AnyValue) bool {
601 return v.GetStringValue() == "/api/v1/nodes"
602 },
603 "audit-id": func(v *commonv1.AnyValue) bool {
604 return v.GetStringValue() != ""
605 },
606 "protocol": func(v *commonv1.AnyValue) bool {
607 return v.GetStringValue() == "HTTP/2.0"
608 },
609 "method": func(v *commonv1.AnyValue) bool {
610 return v.GetStringValue() == "GET"
611 },
612 "mediaType": func(v *commonv1.AnyValue) bool {
613 return v.GetStringValue() == "application/vnd.kubernetes.protobuf"
614 },
615 "encoder": func(v *commonv1.AnyValue) bool {
616 return v.GetStringValue() == "{\"encodeGV\":\"v1\",\"encoder\":\"protobuf\",\"name\":\"versioning\"}"
617 },
618 },
619 events: []string{
620 "About to start writing response",
621 "Write call succeeded",
622 },
623 },
624 },
625 },
626 {
627 desc: "update node",
628 apiCall: func(c *client.Clientset) error {
629
630 _, err = clientSet.CoreV1().Nodes().Update(context.Background(),
631 &v1.Node{ObjectMeta: metav1.ObjectMeta{
632 Name: "fake",
633 Annotations: map[string]string{"foo": "bar"},
634 }}, metav1.UpdateOptions{})
635 return err
636 },
637 expectedTrace: []*spanExpectation{
638 {
639 name: "KubernetesAPI",
640 attributes: map[string]func(*commonv1.AnyValue) bool{
641 "http.user_agent": func(v *commonv1.AnyValue) bool {
642 return strings.HasPrefix(v.GetStringValue(), "tracing.test")
643 },
644 "http.target": func(v *commonv1.AnyValue) bool {
645 return v.GetStringValue() == "/api/v1/nodes/fake"
646 },
647 "http.method": func(v *commonv1.AnyValue) bool {
648 return v.GetStringValue() == "PUT"
649 },
650 },
651 },
652 {
653 name: "authentication",
654 },
655 {
656 name: "Update",
657 attributes: map[string]func(*commonv1.AnyValue) bool{
658 "url": func(v *commonv1.AnyValue) bool {
659 return strings.HasSuffix(v.GetStringValue(), "/api/v1/nodes/fake")
660 },
661 "user-agent": func(v *commonv1.AnyValue) bool {
662 return strings.HasPrefix(v.GetStringValue(), "tracing.test")
663 },
664 "audit-id": func(v *commonv1.AnyValue) bool {
665 return v.GetStringValue() != ""
666 },
667 "client": func(v *commonv1.AnyValue) bool {
668 return v.GetStringValue() == "127.0.0.1"
669 },
670 "accept": func(v *commonv1.AnyValue) bool {
671 return v.GetStringValue() == "application/vnd.kubernetes.protobuf, */*"
672 },
673 "protocol": func(v *commonv1.AnyValue) bool {
674 return v.GetStringValue() == "HTTP/2.0"
675 },
676 },
677 events: []string{
678 "limitedReadBody succeeded",
679 "About to convert to expected version",
680 "Conversion done",
681 "About to store object in database",
682 "Write to database call succeeded",
683 "About to write a response",
684 "Writing http response done",
685 },
686 },
687 {
688 name: "GuaranteedUpdate etcd3",
689 attributes: map[string]func(*commonv1.AnyValue) bool{
690 "audit-id": func(v *commonv1.AnyValue) bool {
691 return v.GetStringValue() != ""
692 },
693 "key": func(v *commonv1.AnyValue) bool {
694 return v.GetStringValue() == "/minions/fake"
695 },
696 "type": func(v *commonv1.AnyValue) bool {
697 return v.GetStringValue() == "*core.Node"
698 },
699 "resource": func(v *commonv1.AnyValue) bool {
700 return v.GetStringValue() == "nodes"
701 },
702 },
703 events: []string{
704 "initial value restored",
705 "About to Encode",
706 "Encode succeeded",
707 "TransformToStorage succeeded",
708 "Transaction prepared",
709 "Txn call completed",
710 "Transaction committed",
711 "decode succeeded",
712 },
713 },
714 {
715 name: "etcdserverpb.KV/Txn",
716 attributes: map[string]func(*commonv1.AnyValue) bool{
717 "rpc.system": func(v *commonv1.AnyValue) bool {
718 return v.GetStringValue() == "grpc"
719 },
720 },
721 events: []string{"message"},
722 },
723 {
724 name: "SerializeObject",
725 attributes: map[string]func(*commonv1.AnyValue) bool{
726 "url": func(v *commonv1.AnyValue) bool {
727 return strings.HasSuffix(v.GetStringValue(), "/api/v1/nodes/fake")
728 },
729 "audit-id": func(v *commonv1.AnyValue) bool {
730 return v.GetStringValue() != ""
731 },
732 "protocol": func(v *commonv1.AnyValue) bool {
733 return v.GetStringValue() == "HTTP/2.0"
734 },
735 "method": func(v *commonv1.AnyValue) bool {
736 return v.GetStringValue() == "PUT"
737 },
738 "mediaType": func(v *commonv1.AnyValue) bool {
739 return v.GetStringValue() == "application/vnd.kubernetes.protobuf"
740 },
741 "encoder": func(v *commonv1.AnyValue) bool {
742 return v.GetStringValue() == "{\"encodeGV\":\"v1\",\"encoder\":\"protobuf\",\"name\":\"versioning\"}"
743 },
744 },
745 events: []string{
746 "About to start writing response",
747 "Write call succeeded",
748 },
749 },
750 },
751 },
752 {
753 desc: "patch node",
754 apiCall: func(c *client.Clientset) error {
755
756 oldNode := &v1.Node{ObjectMeta: metav1.ObjectMeta{
757 Name: "fake",
758 Annotations: map[string]string{"foo": "bar"},
759 }}
760 oldData, err := json.Marshal(oldNode)
761 if err != nil {
762 return err
763 }
764 newNode := &v1.Node{ObjectMeta: metav1.ObjectMeta{
765 Name: "fake",
766 Annotations: map[string]string{"foo": "bar"},
767 Labels: map[string]string{"hello": "world"},
768 }}
769 newData, err := json.Marshal(newNode)
770 if err != nil {
771 return err
772 }
773
774 patchBytes, err := strategicpatch.CreateTwoWayMergePatch(oldData, newData, v1.Node{})
775 if err != nil {
776 return err
777 }
778 _, err = clientSet.CoreV1().Nodes().Patch(context.Background(), "fake", types.StrategicMergePatchType, patchBytes, metav1.PatchOptions{})
779 return err
780 },
781 expectedTrace: []*spanExpectation{
782 {
783 name: "KubernetesAPI",
784 attributes: map[string]func(*commonv1.AnyValue) bool{
785 "http.user_agent": func(v *commonv1.AnyValue) bool {
786 return strings.HasPrefix(v.GetStringValue(), "tracing.test")
787 },
788 "http.target": func(v *commonv1.AnyValue) bool {
789 return v.GetStringValue() == "/api/v1/nodes/fake"
790 },
791 "http.method": func(v *commonv1.AnyValue) bool {
792 return v.GetStringValue() == "PATCH"
793 },
794 },
795 },
796 {
797 name: "authentication",
798 },
799 {
800 name: "Patch",
801 attributes: map[string]func(*commonv1.AnyValue) bool{
802 "url": func(v *commonv1.AnyValue) bool {
803 return strings.HasSuffix(v.GetStringValue(), "/api/v1/nodes/fake")
804 },
805 "user-agent": func(v *commonv1.AnyValue) bool {
806 return strings.HasPrefix(v.GetStringValue(), "tracing.test")
807 },
808 "audit-id": func(v *commonv1.AnyValue) bool {
809 return v.GetStringValue() != ""
810 },
811 "client": func(v *commonv1.AnyValue) bool {
812 return v.GetStringValue() == "127.0.0.1"
813 },
814 "accept": func(v *commonv1.AnyValue) bool {
815 return v.GetStringValue() == "application/vnd.kubernetes.protobuf, */*"
816 },
817 "protocol": func(v *commonv1.AnyValue) bool {
818 return v.GetStringValue() == "HTTP/2.0"
819 },
820 },
821 events: []string{
822 "limitedReadBody succeeded",
823 "Recorded the audit event",
824 "About to apply patch",
825 "About to check admission control",
826 "Object stored in database",
827 "About to write a response",
828 "Writing http response done",
829 },
830 },
831 {
832 name: "GuaranteedUpdate etcd3",
833 attributes: map[string]func(*commonv1.AnyValue) bool{
834 "audit-id": func(v *commonv1.AnyValue) bool {
835 return v.GetStringValue() != ""
836 },
837 "key": func(v *commonv1.AnyValue) bool {
838 return v.GetStringValue() == "/minions/fake"
839 },
840 "type": func(v *commonv1.AnyValue) bool {
841 return v.GetStringValue() == "*core.Node"
842 },
843 "resource": func(v *commonv1.AnyValue) bool {
844 return v.GetStringValue() == "nodes"
845 },
846 },
847 events: []string{
848 "initial value restored",
849 "About to Encode",
850 "Encode succeeded",
851 "TransformToStorage succeeded",
852 "Transaction prepared",
853 "Txn call completed",
854 "Transaction committed",
855 "decode succeeded",
856 },
857 },
858 {
859 name: "etcdserverpb.KV/Txn",
860 attributes: map[string]func(*commonv1.AnyValue) bool{
861 "rpc.system": func(v *commonv1.AnyValue) bool {
862 return v.GetStringValue() == "grpc"
863 },
864 },
865 events: []string{"message"},
866 },
867 {
868 name: "SerializeObject",
869 attributes: map[string]func(*commonv1.AnyValue) bool{
870 "url": func(v *commonv1.AnyValue) bool {
871 return strings.HasSuffix(v.GetStringValue(), "/api/v1/nodes/fake")
872 },
873 "audit-id": func(v *commonv1.AnyValue) bool {
874 return v.GetStringValue() != ""
875 },
876 "protocol": func(v *commonv1.AnyValue) bool {
877 return v.GetStringValue() == "HTTP/2.0"
878 },
879 "method": func(v *commonv1.AnyValue) bool {
880 return v.GetStringValue() == "PATCH"
881 },
882 "mediaType": func(v *commonv1.AnyValue) bool {
883 return v.GetStringValue() == "application/vnd.kubernetes.protobuf"
884 },
885 "encoder": func(v *commonv1.AnyValue) bool {
886 return v.GetStringValue() == "{\"encodeGV\":\"v1\",\"encoder\":\"protobuf\",\"name\":\"versioning\"}"
887 },
888 },
889 events: []string{
890 "About to start writing response",
891 "Write call succeeded",
892 },
893 },
894 },
895 },
896 {
897 desc: "delete node",
898 apiCall: func(c *client.Clientset) error {
899
900 return clientSet.CoreV1().Nodes().Delete(context.Background(), "fake", metav1.DeleteOptions{})
901 },
902 expectedTrace: []*spanExpectation{
903 {
904 name: "KubernetesAPI",
905 attributes: map[string]func(*commonv1.AnyValue) bool{
906 "http.user_agent": func(v *commonv1.AnyValue) bool {
907 return strings.HasPrefix(v.GetStringValue(), "tracing.test")
908 },
909 "http.target": func(v *commonv1.AnyValue) bool {
910 return v.GetStringValue() == "/api/v1/nodes/fake"
911 },
912 "http.method": func(v *commonv1.AnyValue) bool {
913 return v.GetStringValue() == "DELETE"
914 },
915 },
916 },
917 {
918 name: "authentication",
919 },
920 {
921 name: "Delete",
922 attributes: map[string]func(*commonv1.AnyValue) bool{
923 "url": func(v *commonv1.AnyValue) bool {
924 return strings.HasSuffix(v.GetStringValue(), "/api/v1/nodes/fake")
925 },
926 "user-agent": func(v *commonv1.AnyValue) bool {
927 return strings.HasPrefix(v.GetStringValue(), "tracing.test")
928 },
929 "audit-id": func(v *commonv1.AnyValue) bool {
930 return v.GetStringValue() != ""
931 },
932 "client": func(v *commonv1.AnyValue) bool {
933 return v.GetStringValue() == "127.0.0.1"
934 },
935 "accept": func(v *commonv1.AnyValue) bool {
936 return v.GetStringValue() == "application/vnd.kubernetes.protobuf, */*"
937 },
938 "protocol": func(v *commonv1.AnyValue) bool {
939 return v.GetStringValue() == "HTTP/2.0"
940 },
941 },
942 events: []string{
943 "limitedReadBody succeeded",
944 "Decoded delete options",
945 "Recorded the audit event",
946 "About to delete object from database",
947 "Object deleted from database",
948 "About to write a response",
949 "Writing http response done",
950 },
951 },
952 {
953 name: "etcdserverpb.KV/Txn",
954 attributes: map[string]func(*commonv1.AnyValue) bool{
955 "rpc.system": func(v *commonv1.AnyValue) bool {
956 return v.GetStringValue() == "grpc"
957 },
958 },
959 events: []string{"message"},
960 },
961 {
962 name: "SerializeObject",
963 attributes: map[string]func(*commonv1.AnyValue) bool{
964 "url": func(v *commonv1.AnyValue) bool {
965 return strings.HasSuffix(v.GetStringValue(), "/api/v1/nodes/fake")
966 },
967 "audit-id": func(v *commonv1.AnyValue) bool {
968 return v.GetStringValue() != ""
969 },
970 "protocol": func(v *commonv1.AnyValue) bool {
971 return v.GetStringValue() == "HTTP/2.0"
972 },
973 "method": func(v *commonv1.AnyValue) bool {
974 return v.GetStringValue() == "DELETE"
975 },
976 "mediaType": func(v *commonv1.AnyValue) bool {
977 return v.GetStringValue() == "application/vnd.kubernetes.protobuf"
978 },
979 "encoder": func(v *commonv1.AnyValue) bool {
980 return v.GetStringValue() == "{\"encodeGV\":\"v1\",\"encoder\":\"protobuf\",\"name\":\"versioning\"}"
981 },
982 },
983 events: []string{
984 "About to start writing response",
985 "Write call succeeded",
986 },
987 },
988 },
989 },
990 } {
991 t.Run(tc.desc, func(t *testing.T) {
992 fakeServer.resetExpectations(tc.expectedTrace)
993
994
995 if err := tc.apiCall(clientSet); err != nil {
996 t.Fatal(err)
997 }
998
999
1000 select {
1001 case <-fakeServer.traceFound:
1002 case <-time.After(30 * time.Second):
1003 t.Fatal("Timed out waiting for trace")
1004 }
1005 })
1006 }
1007 }
1008
1009
1010
// traceServer is the fake trace collector used by the tests in this file. It
// embeds UnimplementedTraceServiceServer and overrides Export to compare the
// spans it receives against the current expectations.
type traceServer struct {
	// t is the test that created the server.
	t *testing.T
	traceservice.UnimplementedTraceServiceServer

	// lock is held by Export and resetExpectations while they access the
	// fields below.
	lock sync.Mutex
	// traceFound is closed by Export once the expectations are met, and is
	// replaced with a fresh channel by resetExpectations.
	traceFound chan struct{}
	// expectations is the set of spans the current test case is waiting for.
	expectations traceExpectation
}
1019
1020 func (t *traceServer) Export(ctx context.Context, req *traceservice.ExportTraceServiceRequest) (*traceservice.ExportTraceServiceResponse, error) {
1021 t.lock.Lock()
1022 defer t.lock.Unlock()
1023
1024 t.expectations.update(req)
1025
1026 if t.expectations.met() {
1027 select {
1028 case <-t.traceFound:
1029
1030 default:
1031 close(t.traceFound)
1032 }
1033 }
1034 return &traceservice.ExportTraceServiceResponse{}, nil
1035 }
1036
1037
1038
1039 func (t *traceServer) resetExpectations(newExpectations traceExpectation) {
1040 t.lock.Lock()
1041 defer t.lock.Unlock()
1042 t.traceFound = make(chan struct{})
1043 t.expectations = newExpectations
1044 }
1045
1046
// traceExpectation is the list of spans that must all be observed in the same
// trace for the expectation to be met.
type traceExpectation []*spanExpectation
1048
1049
1050
1051 func (t traceExpectation) met() bool {
1052 if len(t) == 0 {
1053 return true
1054 }
1055
1056
1057 possibleTraceIDs := t[0].metTraceIDs
1058 for _, tid := range possibleTraceIDs {
1059 if t.contains(tid) {
1060 return true
1061 }
1062 }
1063 return false
1064 }
1065
1066
1067
1068 func (t traceExpectation) contains(checkTID string) bool {
1069 for _, expectation := range t {
1070 if !expectation.contains(checkTID) {
1071 return false
1072 }
1073 }
1074 return true
1075 }
1076
1077
1078
1079 func (t traceExpectation) update(req *traceservice.ExportTraceServiceRequest) {
1080 for _, resourceSpans := range req.GetResourceSpans() {
1081 for _, instrumentationSpans := range resourceSpans.GetScopeSpans() {
1082 for _, span := range instrumentationSpans.GetSpans() {
1083 t.updateForSpan(span)
1084 }
1085 }
1086 }
1087 }
1088
1089
1090 func (t traceExpectation) updateForSpan(span *tracev1.Span) {
1091 for i, spanExpectation := range t {
1092 if span.Name != spanExpectation.name {
1093 continue
1094 }
1095 if !spanExpectation.attributes.matches(span.GetAttributes()) {
1096 continue
1097 }
1098 if !spanExpectation.events.matches(span.GetEvents()) {
1099 continue
1100 }
1101 t[i].metTraceIDs = append(spanExpectation.metTraceIDs, hex.EncodeToString(span.TraceId[:]))
1102 }
1103
1104 }
1105
1106
// spanExpectation describes one span a test expects to observe.
type spanExpectation struct {
	// name is compared for equality with the span's name.
	name string
	// attributes holds the checks the span's attributes must pass.
	attributes attributeExpectation
	// events lists the event names the span must include.
	events eventExpectation

	// metTraceIDs collects the hex-encoded trace IDs of the spans that
	// satisfied this expectation.
	metTraceIDs []string
}
1115
1116 func (s *spanExpectation) contains(tid string) bool {
1117 for _, metTID := range s.metTraceIDs {
1118 if tid == metTID {
1119 return true
1120 }
1121 }
1122 return false
1123 }
1124
1125
1126
// eventExpectation is the list of event names a span must include. Order is
// not checked, and events not in the list are allowed.
type eventExpectation []string
1128
1129
1130 func (e eventExpectation) matches(events []*tracev1.Span_Event) bool {
1131 eventMap := map[string]struct{}{}
1132 for _, event := range events {
1133 eventMap[event.Name] = struct{}{}
1134 }
1135 for _, wantEvent := range e {
1136 if _, ok := eventMap[wantEvent]; !ok {
1137 return false
1138 }
1139 }
1140 return true
1141 }
1142
1143
1144
// attributeExpectation maps an attribute key to the check its value must
// pass. Attributes whose keys are not in the map are not checked.
type attributeExpectation map[string]func(*commonv1.AnyValue) bool
1146
1147
1148 func (a attributeExpectation) matches(attrs []*commonv1.KeyValue) bool {
1149 attrsMap := map[string]*commonv1.AnyValue{}
1150 for _, attr := range attrs {
1151 attrsMap[attr.GetKey()] = attr.GetValue()
1152 }
1153 for key, checkVal := range a {
1154 if val, ok := attrsMap[key]; !ok || !checkVal(val) {
1155 return false
1156 }
1157 }
1158 return true
1159 }
1160
View as plain text