...

Source file src/k8s.io/client-go/tools/events/eventseries_test.go

Documentation: k8s.io/client-go/tools/events

     1  /*
     2  Copyright 2019 The Kubernetes Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  package events
    18  
    19  import (
    20  	"context"
    21  	"strconv"
    22  	"testing"
    23  	"time"
    24  
    25  	"os"
    26  	"strings"
    27  
    28  	v1 "k8s.io/api/core/v1"
    29  	eventsv1 "k8s.io/api/events/v1"
    30  	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    31  	k8sruntime "k8s.io/apimachinery/pkg/runtime"
    32  	"k8s.io/apimachinery/pkg/util/wait"
    33  	fake "k8s.io/client-go/kubernetes/fake"
    34  	"k8s.io/client-go/kubernetes/scheme"
    35  	restclient "k8s.io/client-go/rest"
    36  	ref "k8s.io/client-go/tools/reference"
    37  	"k8s.io/klog/v2/ktesting"
    38  )
    39  
    40  type testEventSeriesSink struct {
    41  	OnCreate func(e *eventsv1.Event) (*eventsv1.Event, error)
    42  	OnUpdate func(e *eventsv1.Event) (*eventsv1.Event, error)
    43  	OnPatch  func(e *eventsv1.Event, p []byte) (*eventsv1.Event, error)
    44  }
    45  
    46  // Create records the event for testing.
    47  func (t *testEventSeriesSink) Create(ctx context.Context, e *eventsv1.Event) (*eventsv1.Event, error) {
    48  	if t.OnCreate != nil {
    49  		return t.OnCreate(e)
    50  	}
    51  	return e, nil
    52  }
    53  
    54  // Update records the event for testing.
    55  func (t *testEventSeriesSink) Update(ctx context.Context, e *eventsv1.Event) (*eventsv1.Event, error) {
    56  	if t.OnUpdate != nil {
    57  		return t.OnUpdate(e)
    58  	}
    59  	return e, nil
    60  }
    61  
    62  // Patch records the event for testing.
    63  func (t *testEventSeriesSink) Patch(ctx context.Context, e *eventsv1.Event, p []byte) (*eventsv1.Event, error) {
    64  	if t.OnPatch != nil {
    65  		return t.OnPatch(e, p)
    66  	}
    67  	return e, nil
    68  }
    69  
    70  func TestEventSeriesf(t *testing.T) {
    71  	hostname, _ := os.Hostname()
    72  
    73  	testPod := &v1.Pod{
    74  		ObjectMeta: metav1.ObjectMeta{
    75  			Name:      "foo",
    76  			Namespace: "baz",
    77  			UID:       "bar",
    78  		},
    79  	}
    80  
    81  	regarding, err := ref.GetPartialReference(scheme.Scheme, testPod, ".spec.containers[1]")
    82  	if err != nil {
    83  		t.Fatal(err)
    84  	}
    85  
    86  	related, err := ref.GetPartialReference(scheme.Scheme, testPod, ".spec.containers[0]")
    87  	if err != nil {
    88  		t.Fatal(err)
    89  	}
    90  
    91  	expectedEvent := &eventsv1.Event{
    92  		ObjectMeta: metav1.ObjectMeta{
    93  			Name:      "foo",
    94  			Namespace: "baz",
    95  		},
    96  		EventTime:           metav1.MicroTime{Time: time.Now()},
    97  		ReportingController: "eventTest",
    98  		ReportingInstance:   "eventTest-" + hostname,
    99  		Action:              "started",
   100  		Reason:              "test",
   101  		Regarding:           *regarding,
   102  		Related:             related,
   103  		Note:                "some verbose message: 1",
   104  		Type:                v1.EventTypeNormal,
   105  	}
   106  
   107  	isomorphicEvent := expectedEvent.DeepCopy()
   108  
   109  	nonIsomorphicEvent := expectedEvent.DeepCopy()
   110  	nonIsomorphicEvent.Action = "stopped"
   111  
   112  	expectedEvent.Series = &eventsv1.EventSeries{Count: 2}
   113  	table := []struct {
   114  		regarding    k8sruntime.Object
   115  		related      k8sruntime.Object
   116  		actual       *eventsv1.Event
   117  		elements     []interface{}
   118  		expect       *eventsv1.Event
   119  		expectUpdate bool
   120  	}{
   121  		{
   122  			regarding:    regarding,
   123  			related:      related,
   124  			actual:       isomorphicEvent,
   125  			elements:     []interface{}{1},
   126  			expect:       expectedEvent,
   127  			expectUpdate: true,
   128  		},
   129  		{
   130  			regarding:    regarding,
   131  			related:      related,
   132  			actual:       nonIsomorphicEvent,
   133  			elements:     []interface{}{1},
   134  			expect:       nonIsomorphicEvent,
   135  			expectUpdate: false,
   136  		},
   137  	}
   138  
   139  	_, ctx := ktesting.NewTestContext(t)
   140  	ctx, cancel := context.WithCancel(ctx)
   141  	defer cancel()
   142  
   143  	createEvent := make(chan *eventsv1.Event)
   144  	updateEvent := make(chan *eventsv1.Event)
   145  	patchEvent := make(chan *eventsv1.Event)
   146  
   147  	testEvents := testEventSeriesSink{
   148  		OnCreate: func(event *eventsv1.Event) (*eventsv1.Event, error) {
   149  			createEvent <- event
   150  			return event, nil
   151  		},
   152  		OnUpdate: func(event *eventsv1.Event) (*eventsv1.Event, error) {
   153  			updateEvent <- event
   154  			return event, nil
   155  		},
   156  		OnPatch: func(event *eventsv1.Event, patch []byte) (*eventsv1.Event, error) {
   157  			// event we receive is already patched, usually the sink uses it only to retrieve the name and namespace, here
   158  			// we'll use it directly
   159  			patchEvent <- event
   160  			return event, nil
   161  		},
   162  	}
   163  	eventBroadcaster := newBroadcaster(&testEvents, 0, map[eventKey]*eventsv1.Event{})
   164  	recorder := eventBroadcaster.NewRecorder(scheme.Scheme, "eventTest")
   165  	broadcaster := eventBroadcaster.(*eventBroadcasterImpl)
   166  	// Don't call StartRecordingToSink, as we don't need neither refreshing event
   167  	// series nor finishing them in this tests and additional events updated would
   168  	// race with our expected ones.
   169  	err = broadcaster.startRecordingEvents(ctx)
   170  	if err != nil {
   171  		t.Fatal(err)
   172  	}
   173  	recorder.Eventf(regarding, related, isomorphicEvent.Type, isomorphicEvent.Reason, isomorphicEvent.Action, isomorphicEvent.Note, []interface{}{1})
   174  	// read from the chan as this was needed only to populate the cache
   175  	<-createEvent
   176  	for index, item := range table {
   177  		actual := item.actual
   178  		recorder.Eventf(item.regarding, item.related, actual.Type, actual.Reason, actual.Action, actual.Note, item.elements)
   179  		// validate event
   180  		if item.expectUpdate {
   181  			actualEvent := <-patchEvent
   182  			t.Logf("%v - validating event affected by patch request", index)
   183  			validateEvent(strconv.Itoa(index), true, actualEvent, item.expect, t)
   184  		} else {
   185  			actualEvent := <-createEvent
   186  			t.Logf("%v - validating event affected by a create request", index)
   187  			validateEvent(strconv.Itoa(index), false, actualEvent, item.expect, t)
   188  		}
   189  	}
   190  }
   191  
   192  // TestEventSeriesWithEventSinkImplRace verifies that when Events are emitted to
   193  // an EventSink consecutively there is no data race.  This test is meant to be
   194  // run with the `-race` option.
   195  func TestEventSeriesWithEventSinkImplRace(t *testing.T) {
   196  	kubeClient := fake.NewSimpleClientset()
   197  
   198  	eventSink := &EventSinkImpl{Interface: kubeClient.EventsV1()}
   199  	eventBroadcaster := NewBroadcaster(eventSink)
   200  
   201  	stopCh := make(chan struct{})
   202  	eventBroadcaster.StartRecordingToSink(stopCh)
   203  
   204  	recorder := eventBroadcaster.NewRecorder(scheme.Scheme, "test")
   205  
   206  	recorder.Eventf(&v1.ObjectReference{}, nil, v1.EventTypeNormal, "reason", "action", "", "")
   207  	recorder.Eventf(&v1.ObjectReference{}, nil, v1.EventTypeNormal, "reason", "action", "", "")
   208  
   209  	err := wait.PollImmediate(100*time.Millisecond, 5*time.Second, func() (done bool, err error) {
   210  		events, err := kubeClient.EventsV1().Events(metav1.NamespaceDefault).List(context.TODO(), metav1.ListOptions{})
   211  		if err != nil {
   212  			return false, err
   213  		}
   214  
   215  		if len(events.Items) != 1 {
   216  			return false, nil
   217  		}
   218  
   219  		if events.Items[0].Series == nil {
   220  			return false, nil
   221  		}
   222  
   223  		return true, nil
   224  	})
   225  	if err != nil {
   226  		t.Fatal("expected that 2 identical Eventf calls would result in the creation of an Event with a Serie")
   227  	}
   228  }
   229  
   230  func validateEvent(messagePrefix string, expectedUpdate bool, actualEvent *eventsv1.Event, expectedEvent *eventsv1.Event, t *testing.T) {
   231  	recvEvent := *actualEvent
   232  
   233  	// Just check that the timestamp was set.
   234  	if recvEvent.EventTime.IsZero() {
   235  		t.Errorf("%v - timestamp wasn't set: %#v", messagePrefix, recvEvent)
   236  	}
   237  
   238  	if expectedUpdate {
   239  		if recvEvent.Series == nil {
   240  			t.Errorf("%v - Series was nil but expected: %#v", messagePrefix, recvEvent.Series)
   241  
   242  		} else {
   243  			if recvEvent.Series.Count != expectedEvent.Series.Count {
   244  				t.Errorf("%v - Series mismatch actual was: %#v but expected: %#v", messagePrefix, recvEvent.Series, expectedEvent.Series)
   245  			}
   246  		}
   247  
   248  		// Check that name has the right prefix.
   249  		if n, en := recvEvent.Name, expectedEvent.Name; !strings.HasPrefix(n, en) {
   250  			t.Errorf("%v - Name '%v' does not contain prefix '%v'", messagePrefix, n, en)
   251  		}
   252  	} else {
   253  		if recvEvent.Series != nil {
   254  			t.Errorf("%v - series was expected to be nil but was: %#v", messagePrefix, recvEvent.Series)
   255  		}
   256  	}
   257  
   258  }
   259  
   260  func TestFinishSeries(t *testing.T) {
   261  	_, ctx := ktesting.NewTestContext(t)
   262  	hostname, _ := os.Hostname()
   263  	testPod := &v1.Pod{
   264  		ObjectMeta: metav1.ObjectMeta{
   265  			Name:      "foo",
   266  			Namespace: "baz",
   267  			UID:       "bar",
   268  		},
   269  	}
   270  	regarding, err := ref.GetPartialReference(scheme.Scheme, testPod, ".spec.containers[1]")
   271  	if err != nil {
   272  		t.Fatal(err)
   273  	}
   274  	related, err := ref.GetPartialReference(scheme.Scheme, testPod, ".spec.containers[0]")
   275  	if err != nil {
   276  		t.Fatal(err)
   277  	}
   278  	LastObservedTime := metav1.MicroTime{Time: time.Now().Add(-9 * time.Minute)}
   279  
   280  	createEvent := make(chan *eventsv1.Event, 10)
   281  	updateEvent := make(chan *eventsv1.Event, 10)
   282  	patchEvent := make(chan *eventsv1.Event, 10)
   283  	testEvents := testEventSeriesSink{
   284  		OnCreate: func(event *eventsv1.Event) (*eventsv1.Event, error) {
   285  			createEvent <- event
   286  			return event, nil
   287  		},
   288  		OnUpdate: func(event *eventsv1.Event) (*eventsv1.Event, error) {
   289  			updateEvent <- event
   290  			return event, nil
   291  		},
   292  		OnPatch: func(event *eventsv1.Event, patch []byte) (*eventsv1.Event, error) {
   293  			// event we receive is already patched, usually the sink uses it
   294  			// only to retrieve the name and namespace, here we'll use it directly
   295  			patchEvent <- event
   296  			return event, nil
   297  		},
   298  	}
   299  	cache := map[eventKey]*eventsv1.Event{}
   300  	eventBroadcaster := newBroadcaster(&testEvents, 0, cache).(*eventBroadcasterImpl)
   301  	recorder := eventBroadcaster.NewRecorder(scheme.Scheme, "k8s.io/kube-foo").(*recorderImplLogger)
   302  	cachedEvent := recorder.makeEvent(regarding, related, metav1.MicroTime{Time: time.Now()}, v1.EventTypeNormal, "test", "some verbose message: 1", "eventTest", "eventTest-"+hostname, "started")
   303  	nonFinishedEvent := cachedEvent.DeepCopy()
   304  	nonFinishedEvent.ReportingController = "nonFinished-controller"
   305  	cachedEvent.Series = &eventsv1.EventSeries{
   306  		Count:            10,
   307  		LastObservedTime: LastObservedTime,
   308  	}
   309  	cache[getKey(cachedEvent)] = cachedEvent
   310  	cache[getKey(nonFinishedEvent)] = nonFinishedEvent
   311  	eventBroadcaster.finishSeries(ctx)
   312  	select {
   313  	case actualEvent := <-patchEvent:
   314  		t.Logf("validating event affected by patch request")
   315  		eventBroadcaster.mu.Lock()
   316  		defer eventBroadcaster.mu.Unlock()
   317  		if len(cache) != 1 {
   318  			t.Errorf("cache should be empty, but instead got a size of %v", len(cache))
   319  		}
   320  		if !actualEvent.Series.LastObservedTime.Equal(&cachedEvent.Series.LastObservedTime) {
   321  			t.Errorf("series was expected be seen with LastObservedTime %v, but instead got %v ", cachedEvent.Series.LastObservedTime, actualEvent.Series.LastObservedTime)
   322  		}
   323  		// check that we emitted only one event
   324  		if len(patchEvent) != 0 || len(createEvent) != 0 || len(updateEvent) != 0 {
   325  			t.Errorf("exactly one event should be emitted, but got %v", len(patchEvent))
   326  		}
   327  	case <-time.After(wait.ForeverTestTimeout):
   328  		t.Fatalf("timeout after %v", wait.ForeverTestTimeout)
   329  	}
   330  }
   331  
   332  func TestRefreshExistingEventSeries(t *testing.T) {
   333  	_, ctx := ktesting.NewTestContext(t)
   334  	hostname, _ := os.Hostname()
   335  	testPod := &v1.Pod{
   336  		ObjectMeta: metav1.ObjectMeta{
   337  			Name:      "foo",
   338  			Namespace: "baz",
   339  			UID:       "bar",
   340  		},
   341  	}
   342  	regarding, err := ref.GetPartialReference(scheme.Scheme, testPod, ".spec.containers[1]")
   343  	if err != nil {
   344  		t.Fatal(err)
   345  	}
   346  	related, err := ref.GetPartialReference(scheme.Scheme, testPod, ".spec.containers[0]")
   347  	if err != nil {
   348  		t.Fatal(err)
   349  	}
   350  	LastObservedTime := metav1.MicroTime{Time: time.Now().Add(-9 * time.Minute)}
   351  	createEvent := make(chan *eventsv1.Event, 10)
   352  	updateEvent := make(chan *eventsv1.Event, 10)
   353  	patchEvent := make(chan *eventsv1.Event, 10)
   354  
   355  	table := []struct {
   356  		patchFunc func(event *eventsv1.Event, patch []byte) (*eventsv1.Event, error)
   357  	}{
   358  		{
   359  			patchFunc: func(event *eventsv1.Event, patch []byte) (*eventsv1.Event, error) {
   360  				// event we receive is already patched, usually the sink uses it
   361  				//only to retrieve the name and namespace, here we'll use it directly.
   362  				patchEvent <- event
   363  				return event, nil
   364  			},
   365  		},
   366  		{
   367  			patchFunc: func(event *eventsv1.Event, patch []byte) (*eventsv1.Event, error) {
   368  				// we simulate an apiserver error here
   369  				patchEvent <- nil
   370  				return nil, &restclient.RequestConstructionError{}
   371  			},
   372  		},
   373  	}
   374  	for _, item := range table {
   375  		testEvents := testEventSeriesSink{
   376  			OnCreate: func(event *eventsv1.Event) (*eventsv1.Event, error) {
   377  				createEvent <- event
   378  				return event, nil
   379  			},
   380  			OnUpdate: func(event *eventsv1.Event) (*eventsv1.Event, error) {
   381  				updateEvent <- event
   382  				return event, nil
   383  			},
   384  			OnPatch: item.patchFunc,
   385  		}
   386  		cache := map[eventKey]*eventsv1.Event{}
   387  		eventBroadcaster := newBroadcaster(&testEvents, 0, cache).(*eventBroadcasterImpl)
   388  		recorder := eventBroadcaster.NewRecorder(scheme.Scheme, "k8s.io/kube-foo").(*recorderImplLogger)
   389  		cachedEvent := recorder.makeEvent(regarding, related, metav1.MicroTime{Time: time.Now()}, v1.EventTypeNormal, "test", "some verbose message: 1", "eventTest", "eventTest-"+hostname, "started")
   390  		cachedEvent.Series = &eventsv1.EventSeries{
   391  			Count:            10,
   392  			LastObservedTime: LastObservedTime,
   393  		}
   394  		cacheKey := getKey(cachedEvent)
   395  		cache[cacheKey] = cachedEvent
   396  
   397  		eventBroadcaster.refreshExistingEventSeries(ctx)
   398  		select {
   399  		case <-patchEvent:
   400  			t.Logf("validating event affected by patch request")
   401  			eventBroadcaster.mu.Lock()
   402  			defer eventBroadcaster.mu.Unlock()
   403  			if len(cache) != 1 {
   404  				t.Errorf("cache should be with same size, but instead got a size of %v", len(cache))
   405  			}
   406  			// check that we emitted only one event
   407  			if len(patchEvent) != 0 || len(createEvent) != 0 || len(updateEvent) != 0 {
   408  				t.Errorf("exactly one event should be emitted, but got %v", len(patchEvent))
   409  			}
   410  			cacheEvent, exists := cache[cacheKey]
   411  
   412  			if cacheEvent == nil || !exists {
   413  				t.Errorf("expected event to exist and not being nil, but instead event: %v and exists: %v", cacheEvent, exists)
   414  			}
   415  		case <-time.After(wait.ForeverTestTimeout):
   416  			t.Fatalf("timeout after %v", wait.ForeverTestTimeout)
   417  		}
   418  	}
   419  }
   420  

View as plain text