...

Source file src/k8s.io/kubernetes/test/integration/apiserver/tracing/tracing_test.go

Documentation: k8s.io/kubernetes/test/integration/apiserver/tracing

     1  //go:build !windows
     2  // +build !windows
     3  
     4  /*
     5  Copyright 2021 The Kubernetes Authors.
     6  
     7  Licensed under the Apache License, Version 2.0 (the "License");
     8  you may not use this file except in compliance with the License.
     9  You may obtain a copy of the License at
    10  
    11      http://www.apache.org/licenses/LICENSE-2.0
    12  
    13  Unless required by applicable law or agreed to in writing, software
    14  distributed under the License is distributed on an "AS IS" BASIS,
    15  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    16  See the License for the specific language governing permissions and
    17  limitations under the License.
    18  */
    19  
    20  package tracing
    21  
    22  import (
    23  	"context"
    24  	"encoding/hex"
    25  	"encoding/json"
    26  	"fmt"
    27  	"net"
    28  	"os"
    29  	"strings"
    30  	"sync"
    31  	"testing"
    32  	"time"
    33  
    34  	traceservice "go.opentelemetry.io/proto/otlp/collector/trace/v1"
    35  	commonv1 "go.opentelemetry.io/proto/otlp/common/v1"
    36  	tracev1 "go.opentelemetry.io/proto/otlp/trace/v1"
    37  	"google.golang.org/grpc"
    38  
    39  	v1 "k8s.io/api/core/v1"
    40  	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    41  	"k8s.io/apimachinery/pkg/types"
    42  	"k8s.io/apimachinery/pkg/util/strategicpatch"
    43  	kmsv2mock "k8s.io/apiserver/pkg/storage/value/encrypt/envelope/testing/v2"
    44  	client "k8s.io/client-go/kubernetes"
    45  	utiltesting "k8s.io/client-go/util/testing"
    46  	kubeapiservertesting "k8s.io/kubernetes/cmd/kube-apiserver/app/testing"
    47  	"k8s.io/kubernetes/test/integration/framework"
    48  )
    49  
    50  func TestAPIServerTracingWithKMSv2(t *testing.T) {
    51  	// Listen for traces from the API Server before starting it, so the
    52  	// API Server will successfully connect right away during the test.
    53  	listener, err := net.Listen("tcp", "localhost:")
    54  	if err != nil {
    55  		t.Fatal(err)
    56  	}
    57  
    58  	encryptionConfigFile, err := os.CreateTemp("", "encryption-config.yaml")
    59  	if err != nil {
    60  		t.Fatal(err)
    61  	}
    62  	defer os.Remove(encryptionConfigFile.Name())
    63  
    64  	if err := os.WriteFile(encryptionConfigFile.Name(), []byte(`
    65  apiVersion: apiserver.config.k8s.io/v1
    66  kind: EncryptionConfiguration
    67  resources:
    68    - resources:
    69      - secrets
    70      providers:
    71      - kms:
    72         apiVersion: v2
    73         name: kms-provider
    74         endpoint: unix:///@kms-provider.sock`), os.FileMode(0755)); err != nil {
    75  		t.Fatal(err)
    76  	}
    77  
    78  	// Write the configuration for tracing to a file
    79  	tracingConfigFile, err := os.CreateTemp("", "tracing-config.yaml")
    80  	if err != nil {
    81  		t.Fatal(err)
    82  	}
    83  	defer os.Remove(tracingConfigFile.Name())
    84  
    85  	if err := os.WriteFile(tracingConfigFile.Name(), []byte(fmt.Sprintf(`
    86  apiVersion: apiserver.config.k8s.io/v1beta1
    87  kind: TracingConfiguration
    88  samplingRatePerMillion: 1000000
    89  endpoint: %s`, listener.Addr().String())), os.FileMode(0755)); err != nil {
    90  		t.Fatal(err)
    91  	}
    92  
    93  	srv := grpc.NewServer()
    94  	fakeServer := &traceServer{t: t}
    95  	fakeServer.resetExpectations([]*spanExpectation{})
    96  	traceservice.RegisterTraceServiceServer(srv, fakeServer)
    97  
    98  	go func() {
    99  		if err := srv.Serve(listener); err != nil {
   100  			t.Error(err)
   101  			return
   102  		}
   103  	}()
   104  	defer srv.Stop()
   105  
   106  	_ = kmsv2mock.NewBase64Plugin(t, "@kms-provider.sock")
   107  
   108  	// Start the API Server with our tracing configuration
   109  	testServer := kubeapiservertesting.StartTestServerOrDie(t,
   110  		kubeapiservertesting.NewDefaultTestServerOptions(),
   111  		[]string{
   112  			"--tracing-config-file=" + tracingConfigFile.Name(),
   113  			"--encryption-provider-config=" + encryptionConfigFile.Name(),
   114  		},
   115  		framework.SharedEtcd(),
   116  	)
   117  	defer testServer.TearDownFn()
   118  	clientSet, err := client.NewForConfig(testServer.ClientConfig)
   119  	if err != nil {
   120  		t.Fatal(err)
   121  	}
   122  
   123  	for _, tc := range []struct {
   124  		desc          string
   125  		apiCall       func(client.Interface) error
   126  		expectedTrace []*spanExpectation
   127  	}{
   128  		{
   129  			desc: "create secret",
   130  			apiCall: func(c client.Interface) error {
   131  				_, err = clientSet.CoreV1().Secrets(v1.NamespaceDefault).Create(context.Background(),
   132  					&v1.Secret{ObjectMeta: metav1.ObjectMeta{Name: "fake"}, Data: map[string][]byte{"foo": []byte("bar")}}, metav1.CreateOptions{})
   133  				return err
   134  			},
   135  			expectedTrace: []*spanExpectation{
   136  				{
   137  					name: "TransformToStorage with envelopeTransformer",
   138  					attributes: map[string]func(*commonv1.AnyValue) bool{
   139  						"transformer.provider.name": func(v *commonv1.AnyValue) bool {
   140  							return v.GetStringValue() == "kms-provider"
   141  						},
   142  					},
   143  					events: []string{
   144  						"About to encrypt data using DEK",
   145  						"Data encryption succeeded",
   146  						"About to encode encrypted object",
   147  						"Encoded encrypted object",
   148  					},
   149  				},
   150  			},
   151  		},
   152  		{
   153  			desc: "get secret",
   154  			apiCall: func(c client.Interface) error {
   155  				// This depends on the "create secret" step having completed successfully
   156  				_, err = clientSet.CoreV1().Secrets(v1.NamespaceDefault).Get(context.Background(), "fake", metav1.GetOptions{})
   157  				return err
   158  			},
   159  			expectedTrace: []*spanExpectation{
   160  				{
   161  					name: "TransformFromStorage with envelopeTransformer",
   162  					attributes: map[string]func(*commonv1.AnyValue) bool{
   163  						"transformer.provider.name": func(v *commonv1.AnyValue) bool {
   164  							return v.GetStringValue() == "kms-provider"
   165  						},
   166  					},
   167  					events: []string{
   168  						"About to decode encrypted object",
   169  						"Decoded encrypted object",
   170  						"About to decrypt data using DEK",
   171  						"Data decryption succeeded",
   172  					},
   173  				},
   174  			},
   175  		},
   176  	} {
   177  		t.Run(tc.desc, func(t *testing.T) {
   178  			fakeServer.resetExpectations(tc.expectedTrace)
   179  
   180  			// Make our call to the API server
   181  			if err := tc.apiCall(clientSet); err != nil {
   182  				t.Fatal(err)
   183  			}
   184  
   185  			// Wait for a span to be recorded from our request
   186  			select {
   187  			case <-fakeServer.traceFound:
   188  			case <-time.After(30 * time.Second):
   189  				t.Fatal("Timed out waiting for trace")
   190  			}
   191  		})
   192  	}
   193  }
   194  
   195  func TestAPIServerTracingWithEgressSelector(t *testing.T) {
   196  	// Listen for traces from the API Server before starting it, so the
   197  	// API Server will successfully connect right away during the test.
   198  	listener, err := net.Listen("tcp", "localhost:")
   199  	if err != nil {
   200  		t.Fatal(err)
   201  	}
   202  	// Use an egress selector which doesn't have a controlplane config to ensure
   203  	// tracing works in that context. Write the egress selector configuration to a file.
   204  	egressSelectorConfigFile, err := os.CreateTemp("", "egress_selector_configuration.yaml")
   205  	if err != nil {
   206  		t.Fatal(err)
   207  	}
   208  	defer os.Remove(egressSelectorConfigFile.Name())
   209  
   210  	if err := os.WriteFile(egressSelectorConfigFile.Name(), []byte(`
   211  apiVersion: apiserver.config.k8s.io/v1beta1
   212  kind: EgressSelectorConfiguration
   213  egressSelections:
   214  - name: cluster
   215    connection:
   216      proxyProtocol: Direct
   217      transport:`), os.FileMode(0755)); err != nil {
   218  		t.Fatal(err)
   219  	}
   220  
   221  	// Write the configuration for tracing to a file
   222  	tracingConfigFile, err := os.CreateTemp("", "tracing-config.yaml")
   223  	if err != nil {
   224  		t.Fatal(err)
   225  	}
   226  	defer utiltesting.CloseAndRemove(t, tracingConfigFile)
   227  
   228  	if err := os.WriteFile(tracingConfigFile.Name(), []byte(fmt.Sprintf(`
   229  apiVersion: apiserver.config.k8s.io/v1beta1
   230  kind: TracingConfiguration
   231  endpoint: %s`, listener.Addr().String())), os.FileMode(0755)); err != nil {
   232  		t.Fatal(err)
   233  	}
   234  
   235  	// Start the API Server with our tracing configuration
   236  	testServer := kubeapiservertesting.StartTestServerOrDie(t,
   237  		kubeapiservertesting.NewDefaultTestServerOptions(),
   238  		[]string{
   239  			"--tracing-config-file=" + tracingConfigFile.Name(),
   240  			"--egress-selector-config-file=" + egressSelectorConfigFile.Name(),
   241  		},
   242  		framework.SharedEtcd(),
   243  	)
   244  	defer testServer.TearDownFn()
   245  	clientSet, err := client.NewForConfig(testServer.ClientConfig)
   246  	if err != nil {
   247  		t.Fatal(err)
   248  	}
   249  	// Make sure the API Server hasn't crashed.
   250  	_, err = clientSet.CoreV1().Nodes().List(context.Background(), metav1.ListOptions{})
   251  	if err != nil {
   252  		t.Fatal(err)
   253  	}
   254  }
   255  
   256  func TestAPIServerTracing(t *testing.T) {
   257  	// Listen for traces from the API Server before starting it, so the
   258  	// API Server will successfully connect right away during the test.
   259  	listener, err := net.Listen("tcp", "localhost:")
   260  	if err != nil {
   261  		t.Fatal(err)
   262  	}
   263  	// Write the configuration for tracing to a file
   264  	tracingConfigFile, err := os.CreateTemp("", "tracing-config.yaml")
   265  	if err != nil {
   266  		t.Fatal(err)
   267  	}
   268  	defer os.Remove(tracingConfigFile.Name())
   269  
   270  	if err := os.WriteFile(tracingConfigFile.Name(), []byte(fmt.Sprintf(`
   271  apiVersion: apiserver.config.k8s.io/v1beta1
   272  kind: TracingConfiguration
   273  samplingRatePerMillion: 1000000
   274  endpoint: %s`, listener.Addr().String())), os.FileMode(0755)); err != nil {
   275  		t.Fatal(err)
   276  	}
   277  
   278  	srv := grpc.NewServer()
   279  	fakeServer := &traceServer{t: t}
   280  	fakeServer.resetExpectations([]*spanExpectation{})
   281  	traceservice.RegisterTraceServiceServer(srv, fakeServer)
   282  
   283  	go srv.Serve(listener)
   284  	defer srv.Stop()
   285  
   286  	// Start the API Server with our tracing configuration
   287  	testServer := kubeapiservertesting.StartTestServerOrDie(t,
   288  		kubeapiservertesting.NewDefaultTestServerOptions(),
   289  		[]string{"--tracing-config-file=" + tracingConfigFile.Name()},
   290  		framework.SharedEtcd(),
   291  	)
   292  	defer testServer.TearDownFn()
   293  	clientSet, err := client.NewForConfig(testServer.ClientConfig)
   294  	if err != nil {
   295  		t.Fatal(err)
   296  	}
   297  
   298  	for _, tc := range []struct {
   299  		desc          string
   300  		apiCall       func(*client.Clientset) error
   301  		expectedTrace []*spanExpectation
   302  	}{
   303  		{
   304  			desc: "create node",
   305  			apiCall: func(c *client.Clientset) error {
   306  				_, err = clientSet.CoreV1().Nodes().Create(context.Background(),
   307  					&v1.Node{ObjectMeta: metav1.ObjectMeta{Name: "fake"}}, metav1.CreateOptions{})
   308  				return err
   309  			},
   310  			expectedTrace: []*spanExpectation{
   311  				{
   312  					name: "KubernetesAPI",
   313  					attributes: map[string]func(*commonv1.AnyValue) bool{
   314  						"http.user_agent": func(v *commonv1.AnyValue) bool {
   315  							return strings.HasPrefix(v.GetStringValue(), "tracing.test")
   316  						},
   317  						"http.target": func(v *commonv1.AnyValue) bool {
   318  							return v.GetStringValue() == "/api/v1/nodes"
   319  						},
   320  						"http.method": func(v *commonv1.AnyValue) bool {
   321  							return v.GetStringValue() == "POST"
   322  						},
   323  					},
   324  				},
   325  				{
   326  					name: "authentication",
   327  				},
   328  				{
   329  					name: "Create",
   330  					attributes: map[string]func(*commonv1.AnyValue) bool{
   331  						"url": func(v *commonv1.AnyValue) bool {
   332  							return strings.HasSuffix(v.GetStringValue(), "/api/v1/nodes")
   333  						},
   334  						"user-agent": func(v *commonv1.AnyValue) bool {
   335  							return strings.HasPrefix(v.GetStringValue(), "tracing.test")
   336  						},
   337  						"audit-id": func(v *commonv1.AnyValue) bool {
   338  							return v.GetStringValue() != ""
   339  						},
   340  						"client": func(v *commonv1.AnyValue) bool {
   341  							return v.GetStringValue() == "127.0.0.1"
   342  						},
   343  						"accept": func(v *commonv1.AnyValue) bool {
   344  							return v.GetStringValue() == "application/vnd.kubernetes.protobuf, */*"
   345  						},
   346  						"protocol": func(v *commonv1.AnyValue) bool {
   347  							return v.GetStringValue() == "HTTP/2.0"
   348  						},
   349  					},
   350  					events: []string{
   351  						"limitedReadBody succeeded",
   352  						"About to convert to expected version",
   353  						"Conversion done",
   354  						"About to store object in database",
   355  						"Write to database call succeeded",
   356  						"About to write a response",
   357  						"Writing http response done",
   358  					},
   359  				},
   360  				{
   361  					name: "Create etcd3",
   362  					attributes: map[string]func(*commonv1.AnyValue) bool{
   363  						"audit-id": func(v *commonv1.AnyValue) bool {
   364  							return v.GetStringValue() != ""
   365  						},
   366  						"key": func(v *commonv1.AnyValue) bool {
   367  							return v.GetStringValue() == "/minions/fake"
   368  						},
   369  						"type": func(v *commonv1.AnyValue) bool {
   370  							return v.GetStringValue() == "*core.Node"
   371  						},
   372  						"resource": func(v *commonv1.AnyValue) bool {
   373  							return v.GetStringValue() == "nodes"
   374  						},
   375  					},
   376  					events: []string{
   377  						"About to Encode",
   378  						"Encode succeeded",
   379  						"TransformToStorage succeeded",
   380  						"Txn call succeeded",
   381  						"decode succeeded",
   382  					},
   383  				},
   384  				{
   385  					name: "etcdserverpb.KV/Txn",
   386  					attributes: map[string]func(*commonv1.AnyValue) bool{
   387  						"rpc.system": func(v *commonv1.AnyValue) bool {
   388  							return v.GetStringValue() == "grpc"
   389  						},
   390  					},
   391  					events: []string{"message"},
   392  				},
   393  				{
   394  					name: "SerializeObject",
   395  					attributes: map[string]func(*commonv1.AnyValue) bool{
   396  						"url": func(v *commonv1.AnyValue) bool {
   397  							return strings.HasSuffix(v.GetStringValue(), "/api/v1/nodes")
   398  						},
   399  						"audit-id": func(v *commonv1.AnyValue) bool {
   400  							return v.GetStringValue() != ""
   401  						},
   402  						"protocol": func(v *commonv1.AnyValue) bool {
   403  							return v.GetStringValue() == "HTTP/2.0"
   404  						},
   405  						"method": func(v *commonv1.AnyValue) bool {
   406  							return v.GetStringValue() == "POST"
   407  						},
   408  						"mediaType": func(v *commonv1.AnyValue) bool {
   409  							return v.GetStringValue() == "application/vnd.kubernetes.protobuf"
   410  						},
   411  						"encoder": func(v *commonv1.AnyValue) bool {
   412  							return v.GetStringValue() == "{\"encodeGV\":\"v1\",\"encoder\":\"protobuf\",\"name\":\"versioning\"}"
   413  						},
   414  					},
   415  					events: []string{
   416  						"About to start writing response",
   417  						"Write call succeeded",
   418  					},
   419  				},
   420  			},
   421  		},
   422  		{
   423  			desc: "get node",
   424  			apiCall: func(c *client.Clientset) error {
   425  				// This depends on the "create node" step having completed successfully
   426  				_, err = clientSet.CoreV1().Nodes().Get(context.Background(), "fake", metav1.GetOptions{})
   427  				return err
   428  			},
   429  			expectedTrace: []*spanExpectation{
   430  				{
   431  					name: "KubernetesAPI",
   432  					attributes: map[string]func(*commonv1.AnyValue) bool{
   433  						"http.user_agent": func(v *commonv1.AnyValue) bool {
   434  							return strings.HasPrefix(v.GetStringValue(), "tracing.test")
   435  						},
   436  						"http.target": func(v *commonv1.AnyValue) bool {
   437  							return v.GetStringValue() == "/api/v1/nodes/fake"
   438  						},
   439  						"http.method": func(v *commonv1.AnyValue) bool {
   440  							return v.GetStringValue() == "GET"
   441  						},
   442  					},
   443  				},
   444  				{
   445  					name: "authentication",
   446  				},
   447  				{
   448  					name: "Get",
   449  					attributes: map[string]func(*commonv1.AnyValue) bool{
   450  						"url": func(v *commonv1.AnyValue) bool {
   451  							return strings.HasSuffix(v.GetStringValue(), "/api/v1/nodes/fake")
   452  						},
   453  						"user-agent": func(v *commonv1.AnyValue) bool {
   454  							return strings.HasPrefix(v.GetStringValue(), "tracing.test")
   455  						},
   456  						"audit-id": func(v *commonv1.AnyValue) bool {
   457  							return v.GetStringValue() != ""
   458  						},
   459  						"client": func(v *commonv1.AnyValue) bool {
   460  							return v.GetStringValue() == "127.0.0.1"
   461  						},
   462  						"accept": func(v *commonv1.AnyValue) bool {
   463  							return v.GetStringValue() == "application/vnd.kubernetes.protobuf, */*"
   464  						},
   465  						"protocol": func(v *commonv1.AnyValue) bool {
   466  							return v.GetStringValue() == "HTTP/2.0"
   467  						},
   468  					},
   469  					events: []string{
   470  						"About to Get from storage",
   471  						"About to write a response",
   472  						"Writing http response done",
   473  					},
   474  				},
   475  				{
   476  					name: "etcdserverpb.KV/Range",
   477  					attributes: map[string]func(*commonv1.AnyValue) bool{
   478  						"rpc.system": func(v *commonv1.AnyValue) bool {
   479  							return v.GetStringValue() == "grpc"
   480  						},
   481  					},
   482  					events: []string{"message"},
   483  				},
   484  				{
   485  					name: "SerializeObject",
   486  					attributes: map[string]func(*commonv1.AnyValue) bool{
   487  						"url": func(v *commonv1.AnyValue) bool {
   488  							return v.GetStringValue() == "/api/v1/nodes/fake"
   489  						},
   490  						"audit-id": func(v *commonv1.AnyValue) bool {
   491  							return v.GetStringValue() != ""
   492  						},
   493  						"protocol": func(v *commonv1.AnyValue) bool {
   494  							return v.GetStringValue() == "HTTP/2.0"
   495  						},
   496  						"method": func(v *commonv1.AnyValue) bool {
   497  							return v.GetStringValue() == "GET"
   498  						},
   499  						"mediaType": func(v *commonv1.AnyValue) bool {
   500  							return v.GetStringValue() == "application/vnd.kubernetes.protobuf"
   501  						},
   502  						"encoder": func(v *commonv1.AnyValue) bool {
   503  							return v.GetStringValue() == "{\"encodeGV\":\"v1\",\"encoder\":\"protobuf\",\"name\":\"versioning\"}"
   504  						},
   505  					},
   506  					events: []string{
   507  						"About to start writing response",
   508  						"Write call succeeded",
   509  					},
   510  				},
   511  			},
   512  		},
   513  		{
   514  			desc: "list nodes",
   515  			apiCall: func(c *client.Clientset) error {
   516  				_, err = clientSet.CoreV1().Nodes().List(context.Background(), metav1.ListOptions{})
   517  				return err
   518  			},
   519  			expectedTrace: []*spanExpectation{
   520  				{
   521  					name: "KubernetesAPI",
   522  					attributes: map[string]func(*commonv1.AnyValue) bool{
   523  						"http.user_agent": func(v *commonv1.AnyValue) bool {
   524  							return strings.HasPrefix(v.GetStringValue(), "tracing.test")
   525  						},
   526  						"http.target": func(v *commonv1.AnyValue) bool {
   527  							return v.GetStringValue() == "/api/v1/nodes"
   528  						},
   529  						"http.method": func(v *commonv1.AnyValue) bool {
   530  							return v.GetStringValue() == "GET"
   531  						},
   532  					},
   533  				},
   534  				{
   535  					name: "authentication",
   536  				},
   537  				{
   538  					name: "List",
   539  					attributes: map[string]func(*commonv1.AnyValue) bool{
   540  						"url": func(v *commonv1.AnyValue) bool {
   541  							return v.GetStringValue() == "/api/v1/nodes"
   542  						},
   543  						"user-agent": func(v *commonv1.AnyValue) bool {
   544  							return strings.HasPrefix(v.GetStringValue(), "tracing.test")
   545  						},
   546  						"audit-id": func(v *commonv1.AnyValue) bool {
   547  							return v.GetStringValue() != ""
   548  						},
   549  						"client": func(v *commonv1.AnyValue) bool {
   550  							return v.GetStringValue() == "127.0.0.1"
   551  						},
   552  						"accept": func(v *commonv1.AnyValue) bool {
   553  							return v.GetStringValue() == "application/vnd.kubernetes.protobuf, */*"
   554  						},
   555  						"protocol": func(v *commonv1.AnyValue) bool {
   556  							return v.GetStringValue() == "HTTP/2.0"
   557  						},
   558  					},
   559  					events: []string{
   560  						"About to List from storage",
   561  						"Listing from storage done",
   562  						"Writing http response done",
   563  					},
   564  				},
   565  				{
   566  					name: "List(recursive=true) etcd3",
   567  					attributes: map[string]func(*commonv1.AnyValue) bool{
   568  						"audit-id": func(v *commonv1.AnyValue) bool {
   569  							return v.GetStringValue() != ""
   570  						},
   571  						"key": func(v *commonv1.AnyValue) bool {
   572  							return v.GetStringValue() == "/minions"
   573  						},
   574  						"resourceVersion": func(v *commonv1.AnyValue) bool {
   575  							return v.GetStringValue() == ""
   576  						},
   577  						"resourceVersionMatch": func(v *commonv1.AnyValue) bool {
   578  							return v.GetStringValue() == ""
   579  						},
   580  						"limit": func(v *commonv1.AnyValue) bool {
   581  							return v.GetIntValue() == 0
   582  						},
   583  						"continue": func(v *commonv1.AnyValue) bool {
   584  							return v.GetStringValue() == ""
   585  						},
   586  					},
   587  				},
   588  				{
   589  					name: "etcdserverpb.KV/Range",
   590  					attributes: map[string]func(*commonv1.AnyValue) bool{
   591  						"rpc.system": func(v *commonv1.AnyValue) bool {
   592  							return v.GetStringValue() == "grpc"
   593  						},
   594  					},
   595  					events: []string{"message"},
   596  				},
   597  				{
   598  					name: "SerializeObject",
   599  					attributes: map[string]func(*commonv1.AnyValue) bool{
   600  						"url": func(v *commonv1.AnyValue) bool {
   601  							return v.GetStringValue() == "/api/v1/nodes"
   602  						},
   603  						"audit-id": func(v *commonv1.AnyValue) bool {
   604  							return v.GetStringValue() != ""
   605  						},
   606  						"protocol": func(v *commonv1.AnyValue) bool {
   607  							return v.GetStringValue() == "HTTP/2.0"
   608  						},
   609  						"method": func(v *commonv1.AnyValue) bool {
   610  							return v.GetStringValue() == "GET"
   611  						},
   612  						"mediaType": func(v *commonv1.AnyValue) bool {
   613  							return v.GetStringValue() == "application/vnd.kubernetes.protobuf"
   614  						},
   615  						"encoder": func(v *commonv1.AnyValue) bool {
   616  							return v.GetStringValue() == "{\"encodeGV\":\"v1\",\"encoder\":\"protobuf\",\"name\":\"versioning\"}"
   617  						},
   618  					},
   619  					events: []string{
   620  						"About to start writing response",
   621  						"Write call succeeded",
   622  					},
   623  				},
   624  			},
   625  		},
   626  		{
   627  			desc: "update node",
   628  			apiCall: func(c *client.Clientset) error {
   629  				// This depends on the "create node" step having completed successfully
   630  				_, err = clientSet.CoreV1().Nodes().Update(context.Background(),
   631  					&v1.Node{ObjectMeta: metav1.ObjectMeta{
   632  						Name:        "fake",
   633  						Annotations: map[string]string{"foo": "bar"},
   634  					}}, metav1.UpdateOptions{})
   635  				return err
   636  			},
   637  			expectedTrace: []*spanExpectation{
   638  				{
   639  					name: "KubernetesAPI",
   640  					attributes: map[string]func(*commonv1.AnyValue) bool{
   641  						"http.user_agent": func(v *commonv1.AnyValue) bool {
   642  							return strings.HasPrefix(v.GetStringValue(), "tracing.test")
   643  						},
   644  						"http.target": func(v *commonv1.AnyValue) bool {
   645  							return v.GetStringValue() == "/api/v1/nodes/fake"
   646  						},
   647  						"http.method": func(v *commonv1.AnyValue) bool {
   648  							return v.GetStringValue() == "PUT"
   649  						},
   650  					},
   651  				},
   652  				{
   653  					name: "authentication",
   654  				},
   655  				{
   656  					name: "Update",
   657  					attributes: map[string]func(*commonv1.AnyValue) bool{
   658  						"url": func(v *commonv1.AnyValue) bool {
   659  							return strings.HasSuffix(v.GetStringValue(), "/api/v1/nodes/fake")
   660  						},
   661  						"user-agent": func(v *commonv1.AnyValue) bool {
   662  							return strings.HasPrefix(v.GetStringValue(), "tracing.test")
   663  						},
   664  						"audit-id": func(v *commonv1.AnyValue) bool {
   665  							return v.GetStringValue() != ""
   666  						},
   667  						"client": func(v *commonv1.AnyValue) bool {
   668  							return v.GetStringValue() == "127.0.0.1"
   669  						},
   670  						"accept": func(v *commonv1.AnyValue) bool {
   671  							return v.GetStringValue() == "application/vnd.kubernetes.protobuf, */*"
   672  						},
   673  						"protocol": func(v *commonv1.AnyValue) bool {
   674  							return v.GetStringValue() == "HTTP/2.0"
   675  						},
   676  					},
   677  					events: []string{
   678  						"limitedReadBody succeeded",
   679  						"About to convert to expected version",
   680  						"Conversion done",
   681  						"About to store object in database",
   682  						"Write to database call succeeded",
   683  						"About to write a response",
   684  						"Writing http response done",
   685  					},
   686  				},
   687  				{
   688  					name: "GuaranteedUpdate etcd3",
   689  					attributes: map[string]func(*commonv1.AnyValue) bool{
   690  						"audit-id": func(v *commonv1.AnyValue) bool {
   691  							return v.GetStringValue() != ""
   692  						},
   693  						"key": func(v *commonv1.AnyValue) bool {
   694  							return v.GetStringValue() == "/minions/fake"
   695  						},
   696  						"type": func(v *commonv1.AnyValue) bool {
   697  							return v.GetStringValue() == "*core.Node"
   698  						},
   699  						"resource": func(v *commonv1.AnyValue) bool {
   700  							return v.GetStringValue() == "nodes"
   701  						},
   702  					},
   703  					events: []string{
   704  						"initial value restored",
   705  						"About to Encode",
   706  						"Encode succeeded",
   707  						"TransformToStorage succeeded",
   708  						"Transaction prepared",
   709  						"Txn call completed",
   710  						"Transaction committed",
   711  						"decode succeeded",
   712  					},
   713  				},
   714  				{
   715  					name: "etcdserverpb.KV/Txn",
   716  					attributes: map[string]func(*commonv1.AnyValue) bool{
   717  						"rpc.system": func(v *commonv1.AnyValue) bool {
   718  							return v.GetStringValue() == "grpc"
   719  						},
   720  					},
   721  					events: []string{"message"},
   722  				},
   723  				{
   724  					name: "SerializeObject",
   725  					attributes: map[string]func(*commonv1.AnyValue) bool{
   726  						"url": func(v *commonv1.AnyValue) bool {
   727  							return strings.HasSuffix(v.GetStringValue(), "/api/v1/nodes/fake")
   728  						},
   729  						"audit-id": func(v *commonv1.AnyValue) bool {
   730  							return v.GetStringValue() != ""
   731  						},
   732  						"protocol": func(v *commonv1.AnyValue) bool {
   733  							return v.GetStringValue() == "HTTP/2.0"
   734  						},
   735  						"method": func(v *commonv1.AnyValue) bool {
   736  							return v.GetStringValue() == "PUT"
   737  						},
   738  						"mediaType": func(v *commonv1.AnyValue) bool {
   739  							return v.GetStringValue() == "application/vnd.kubernetes.protobuf"
   740  						},
   741  						"encoder": func(v *commonv1.AnyValue) bool {
   742  							return v.GetStringValue() == "{\"encodeGV\":\"v1\",\"encoder\":\"protobuf\",\"name\":\"versioning\"}"
   743  						},
   744  					},
   745  					events: []string{
   746  						"About to start writing response",
   747  						"Write call succeeded",
   748  					},
   749  				},
   750  			},
   751  		},
   752  		{
   753  			desc: "patch node",
   754  			apiCall: func(c *client.Clientset) error {
   755  				// This depends on the "create node" step having completed successfully
   756  				oldNode := &v1.Node{ObjectMeta: metav1.ObjectMeta{
   757  					Name:        "fake",
   758  					Annotations: map[string]string{"foo": "bar"},
   759  				}}
   760  				oldData, err := json.Marshal(oldNode)
   761  				if err != nil {
   762  					return err
   763  				}
   764  				newNode := &v1.Node{ObjectMeta: metav1.ObjectMeta{
   765  					Name:        "fake",
   766  					Annotations: map[string]string{"foo": "bar"},
   767  					Labels:      map[string]string{"hello": "world"},
   768  				}}
   769  				newData, err := json.Marshal(newNode)
   770  				if err != nil {
   771  					return err
   772  				}
   773  
   774  				patchBytes, err := strategicpatch.CreateTwoWayMergePatch(oldData, newData, v1.Node{})
   775  				if err != nil {
   776  					return err
   777  				}
   778  				_, err = clientSet.CoreV1().Nodes().Patch(context.Background(), "fake", types.StrategicMergePatchType, patchBytes, metav1.PatchOptions{})
   779  				return err
   780  			},
   781  			expectedTrace: []*spanExpectation{
   782  				{
   783  					name: "KubernetesAPI",
   784  					attributes: map[string]func(*commonv1.AnyValue) bool{
   785  						"http.user_agent": func(v *commonv1.AnyValue) bool {
   786  							return strings.HasPrefix(v.GetStringValue(), "tracing.test")
   787  						},
   788  						"http.target": func(v *commonv1.AnyValue) bool {
   789  							return v.GetStringValue() == "/api/v1/nodes/fake"
   790  						},
   791  						"http.method": func(v *commonv1.AnyValue) bool {
   792  							return v.GetStringValue() == "PATCH"
   793  						},
   794  					},
   795  				},
   796  				{
   797  					name: "authentication",
   798  				},
   799  				{
   800  					name: "Patch",
   801  					attributes: map[string]func(*commonv1.AnyValue) bool{
   802  						"url": func(v *commonv1.AnyValue) bool {
   803  							return strings.HasSuffix(v.GetStringValue(), "/api/v1/nodes/fake")
   804  						},
   805  						"user-agent": func(v *commonv1.AnyValue) bool {
   806  							return strings.HasPrefix(v.GetStringValue(), "tracing.test")
   807  						},
   808  						"audit-id": func(v *commonv1.AnyValue) bool {
   809  							return v.GetStringValue() != ""
   810  						},
   811  						"client": func(v *commonv1.AnyValue) bool {
   812  							return v.GetStringValue() == "127.0.0.1"
   813  						},
   814  						"accept": func(v *commonv1.AnyValue) bool {
   815  							return v.GetStringValue() == "application/vnd.kubernetes.protobuf, */*"
   816  						},
   817  						"protocol": func(v *commonv1.AnyValue) bool {
   818  							return v.GetStringValue() == "HTTP/2.0"
   819  						},
   820  					},
   821  					events: []string{
   822  						"limitedReadBody succeeded",
   823  						"Recorded the audit event",
   824  						"About to apply patch",
   825  						"About to check admission control",
   826  						"Object stored in database",
   827  						"About to write a response",
   828  						"Writing http response done",
   829  					},
   830  				},
   831  				{
   832  					name: "GuaranteedUpdate etcd3",
   833  					attributes: map[string]func(*commonv1.AnyValue) bool{
   834  						"audit-id": func(v *commonv1.AnyValue) bool {
   835  							return v.GetStringValue() != ""
   836  						},
   837  						"key": func(v *commonv1.AnyValue) bool {
   838  							return v.GetStringValue() == "/minions/fake"
   839  						},
   840  						"type": func(v *commonv1.AnyValue) bool {
   841  							return v.GetStringValue() == "*core.Node"
   842  						},
   843  						"resource": func(v *commonv1.AnyValue) bool {
   844  							return v.GetStringValue() == "nodes"
   845  						},
   846  					},
   847  					events: []string{
   848  						"initial value restored",
   849  						"About to Encode",
   850  						"Encode succeeded",
   851  						"TransformToStorage succeeded",
   852  						"Transaction prepared",
   853  						"Txn call completed",
   854  						"Transaction committed",
   855  						"decode succeeded",
   856  					},
   857  				},
   858  				{
   859  					name: "etcdserverpb.KV/Txn",
   860  					attributes: map[string]func(*commonv1.AnyValue) bool{
   861  						"rpc.system": func(v *commonv1.AnyValue) bool {
   862  							return v.GetStringValue() == "grpc"
   863  						},
   864  					},
   865  					events: []string{"message"},
   866  				},
   867  				{
   868  					name: "SerializeObject",
   869  					attributes: map[string]func(*commonv1.AnyValue) bool{
   870  						"url": func(v *commonv1.AnyValue) bool {
   871  							return strings.HasSuffix(v.GetStringValue(), "/api/v1/nodes/fake")
   872  						},
   873  						"audit-id": func(v *commonv1.AnyValue) bool {
   874  							return v.GetStringValue() != ""
   875  						},
   876  						"protocol": func(v *commonv1.AnyValue) bool {
   877  							return v.GetStringValue() == "HTTP/2.0"
   878  						},
   879  						"method": func(v *commonv1.AnyValue) bool {
   880  							return v.GetStringValue() == "PATCH"
   881  						},
   882  						"mediaType": func(v *commonv1.AnyValue) bool {
   883  							return v.GetStringValue() == "application/vnd.kubernetes.protobuf"
   884  						},
   885  						"encoder": func(v *commonv1.AnyValue) bool {
   886  							return v.GetStringValue() == "{\"encodeGV\":\"v1\",\"encoder\":\"protobuf\",\"name\":\"versioning\"}"
   887  						},
   888  					},
   889  					events: []string{
   890  						"About to start writing response",
   891  						"Write call succeeded",
   892  					},
   893  				},
   894  			},
   895  		},
   896  		{
   897  			desc: "delete node",
   898  			apiCall: func(c *client.Clientset) error {
   899  				// This depends on the "create node" step having completed successfully
   900  				return clientSet.CoreV1().Nodes().Delete(context.Background(), "fake", metav1.DeleteOptions{})
   901  			},
   902  			expectedTrace: []*spanExpectation{
   903  				{
   904  					name: "KubernetesAPI",
   905  					attributes: map[string]func(*commonv1.AnyValue) bool{
   906  						"http.user_agent": func(v *commonv1.AnyValue) bool {
   907  							return strings.HasPrefix(v.GetStringValue(), "tracing.test")
   908  						},
   909  						"http.target": func(v *commonv1.AnyValue) bool {
   910  							return v.GetStringValue() == "/api/v1/nodes/fake"
   911  						},
   912  						"http.method": func(v *commonv1.AnyValue) bool {
   913  							return v.GetStringValue() == "DELETE"
   914  						},
   915  					},
   916  				},
   917  				{
   918  					name: "authentication",
   919  				},
   920  				{
   921  					name: "Delete",
   922  					attributes: map[string]func(*commonv1.AnyValue) bool{
   923  						"url": func(v *commonv1.AnyValue) bool {
   924  							return strings.HasSuffix(v.GetStringValue(), "/api/v1/nodes/fake")
   925  						},
   926  						"user-agent": func(v *commonv1.AnyValue) bool {
   927  							return strings.HasPrefix(v.GetStringValue(), "tracing.test")
   928  						},
   929  						"audit-id": func(v *commonv1.AnyValue) bool {
   930  							return v.GetStringValue() != ""
   931  						},
   932  						"client": func(v *commonv1.AnyValue) bool {
   933  							return v.GetStringValue() == "127.0.0.1"
   934  						},
   935  						"accept": func(v *commonv1.AnyValue) bool {
   936  							return v.GetStringValue() == "application/vnd.kubernetes.protobuf, */*"
   937  						},
   938  						"protocol": func(v *commonv1.AnyValue) bool {
   939  							return v.GetStringValue() == "HTTP/2.0"
   940  						},
   941  					},
   942  					events: []string{
   943  						"limitedReadBody succeeded",
   944  						"Decoded delete options",
   945  						"Recorded the audit event",
   946  						"About to delete object from database",
   947  						"Object deleted from database",
   948  						"About to write a response",
   949  						"Writing http response done",
   950  					},
   951  				},
   952  				{
   953  					name: "etcdserverpb.KV/Txn",
   954  					attributes: map[string]func(*commonv1.AnyValue) bool{
   955  						"rpc.system": func(v *commonv1.AnyValue) bool {
   956  							return v.GetStringValue() == "grpc"
   957  						},
   958  					},
   959  					events: []string{"message"},
   960  				},
   961  				{
   962  					name: "SerializeObject",
   963  					attributes: map[string]func(*commonv1.AnyValue) bool{
   964  						"url": func(v *commonv1.AnyValue) bool {
   965  							return strings.HasSuffix(v.GetStringValue(), "/api/v1/nodes/fake")
   966  						},
   967  						"audit-id": func(v *commonv1.AnyValue) bool {
   968  							return v.GetStringValue() != ""
   969  						},
   970  						"protocol": func(v *commonv1.AnyValue) bool {
   971  							return v.GetStringValue() == "HTTP/2.0"
   972  						},
   973  						"method": func(v *commonv1.AnyValue) bool {
   974  							return v.GetStringValue() == "DELETE"
   975  						},
   976  						"mediaType": func(v *commonv1.AnyValue) bool {
   977  							return v.GetStringValue() == "application/vnd.kubernetes.protobuf"
   978  						},
   979  						"encoder": func(v *commonv1.AnyValue) bool {
   980  							return v.GetStringValue() == "{\"encodeGV\":\"v1\",\"encoder\":\"protobuf\",\"name\":\"versioning\"}"
   981  						},
   982  					},
   983  					events: []string{
   984  						"About to start writing response",
   985  						"Write call succeeded",
   986  					},
   987  				},
   988  			},
   989  		},
   990  	} {
   991  		t.Run(tc.desc, func(t *testing.T) {
   992  			fakeServer.resetExpectations(tc.expectedTrace)
   993  
   994  			// Make our call to the API server
   995  			if err := tc.apiCall(clientSet); err != nil {
   996  				t.Fatal(err)
   997  			}
   998  
   999  			// Wait for a span to be recorded from our request
  1000  			select {
  1001  			case <-fakeServer.traceFound:
  1002  			case <-time.After(30 * time.Second):
  1003  				t.Fatal("Timed out waiting for trace")
  1004  			}
  1005  		})
  1006  	}
  1007  }
  1008  
  1009  // traceServer implements TracesServiceServer, which can receive spans from the
  1010  // API Server via OTLP.
  1011  type traceServer struct {
  1012  	t *testing.T
  1013  	traceservice.UnimplementedTraceServiceServer
  1014  	// the lock guards the per-scenario state below
  1015  	lock         sync.Mutex
  1016  	traceFound   chan struct{}
  1017  	expectations traceExpectation
  1018  }
  1019  
  1020  func (t *traceServer) Export(ctx context.Context, req *traceservice.ExportTraceServiceRequest) (*traceservice.ExportTraceServiceResponse, error) {
  1021  	t.lock.Lock()
  1022  	defer t.lock.Unlock()
  1023  
  1024  	t.expectations.update(req)
  1025  	// if all expectations are met, notify the test scenario by closing traceFound
  1026  	if t.expectations.met() {
  1027  		select {
  1028  		case <-t.traceFound:
  1029  			// traceFound is already closed
  1030  		default:
  1031  			close(t.traceFound)
  1032  		}
  1033  	}
  1034  	return &traceservice.ExportTraceServiceResponse{}, nil
  1035  }
  1036  
  1037  // resetExpectations is used by a new test scenario to set new expectations for
  1038  // the test server.
  1039  func (t *traceServer) resetExpectations(newExpectations traceExpectation) {
  1040  	t.lock.Lock()
  1041  	defer t.lock.Unlock()
  1042  	t.traceFound = make(chan struct{})
  1043  	t.expectations = newExpectations
  1044  }
  1045  
  1046  // traceExpectation is an expectation for an entire trace
  1047  type traceExpectation []*spanExpectation
  1048  
  1049  // met returns true if all span expectations the server is looking for have
  1050  // been satisfied.
  1051  func (t traceExpectation) met() bool {
  1052  	if len(t) == 0 {
  1053  		return true
  1054  	}
  1055  	// we want to find any trace ID which all span IDs contain.
  1056  	// try each trace ID met by the first span.
  1057  	possibleTraceIDs := t[0].metTraceIDs
  1058  	for _, tid := range possibleTraceIDs {
  1059  		if t.contains(tid) {
  1060  			return true
  1061  		}
  1062  	}
  1063  	return false
  1064  }
  1065  
  1066  // contains returns true if the all spans in the trace expectation contain the
  1067  // trace ID
  1068  func (t traceExpectation) contains(checkTID string) bool {
  1069  	for _, expectation := range t {
  1070  		if !expectation.contains(checkTID) {
  1071  			return false
  1072  		}
  1073  	}
  1074  	return true
  1075  }
  1076  
  1077  // update finds all expectations that are met by a span in the
  1078  // incoming request.
  1079  func (t traceExpectation) update(req *traceservice.ExportTraceServiceRequest) {
  1080  	for _, resourceSpans := range req.GetResourceSpans() {
  1081  		for _, instrumentationSpans := range resourceSpans.GetScopeSpans() {
  1082  			for _, span := range instrumentationSpans.GetSpans() {
  1083  				t.updateForSpan(span)
  1084  			}
  1085  		}
  1086  	}
  1087  }
  1088  
  1089  // updateForSpan updates expectations based on a single incoming span.
  1090  func (t traceExpectation) updateForSpan(span *tracev1.Span) {
  1091  	for i, spanExpectation := range t {
  1092  		if span.Name != spanExpectation.name {
  1093  			continue
  1094  		}
  1095  		if !spanExpectation.attributes.matches(span.GetAttributes()) {
  1096  			continue
  1097  		}
  1098  		if !spanExpectation.events.matches(span.GetEvents()) {
  1099  			continue
  1100  		}
  1101  		t[i].metTraceIDs = append(spanExpectation.metTraceIDs, hex.EncodeToString(span.TraceId[:]))
  1102  	}
  1103  
  1104  }
  1105  
  1106  // spanExpectation is the expectation for a single span
  1107  type spanExpectation struct {
  1108  	name       string
  1109  	attributes attributeExpectation
  1110  	events     eventExpectation
  1111  	// For each trace ID that meets this expectation, record it here.
  1112  	// This way, we can ensure that all spans that should be in the same trace have the same trace ID
  1113  	metTraceIDs []string
  1114  }
  1115  
  1116  func (s *spanExpectation) contains(tid string) bool {
  1117  	for _, metTID := range s.metTraceIDs {
  1118  		if tid == metTID {
  1119  			return true
  1120  		}
  1121  	}
  1122  	return false
  1123  }
  1124  
  1125  // eventExpectation is the expectation for an event attached to a span.
  1126  // It is comprised of event names.
  1127  type eventExpectation []string
  1128  
  1129  // matches returns true if all expected events exist in the list of input events.
  1130  func (e eventExpectation) matches(events []*tracev1.Span_Event) bool {
  1131  	eventMap := map[string]struct{}{}
  1132  	for _, event := range events {
  1133  		eventMap[event.Name] = struct{}{}
  1134  	}
  1135  	for _, wantEvent := range e {
  1136  		if _, ok := eventMap[wantEvent]; !ok {
  1137  			return false
  1138  		}
  1139  	}
  1140  	return true
  1141  }
  1142  
  1143  // eventExpectation is the expectation for an event attached to a span.
  1144  // It is a map from attribute key, to a value-matching function.
  1145  type attributeExpectation map[string]func(*commonv1.AnyValue) bool
  1146  
  1147  // matches returns true if all expected attributes exist in the intput list of attributes.
  1148  func (a attributeExpectation) matches(attrs []*commonv1.KeyValue) bool {
  1149  	attrsMap := map[string]*commonv1.AnyValue{}
  1150  	for _, attr := range attrs {
  1151  		attrsMap[attr.GetKey()] = attr.GetValue()
  1152  	}
  1153  	for key, checkVal := range a {
  1154  		if val, ok := attrsMap[key]; !ok || !checkVal(val) {
  1155  			return false
  1156  		}
  1157  	}
  1158  	return true
  1159  }
  1160  

View as plain text