1
16
17 package events
18
19 import (
20 "context"
21 "strconv"
22 "testing"
23 "time"
24
25 "os"
26 "strings"
27
28 v1 "k8s.io/api/core/v1"
29 eventsv1 "k8s.io/api/events/v1"
30 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
31 k8sruntime "k8s.io/apimachinery/pkg/runtime"
32 "k8s.io/apimachinery/pkg/util/wait"
33 fake "k8s.io/client-go/kubernetes/fake"
34 "k8s.io/client-go/kubernetes/scheme"
35 restclient "k8s.io/client-go/rest"
36 ref "k8s.io/client-go/tools/reference"
37 "k8s.io/klog/v2/ktesting"
38 )
39
40 type testEventSeriesSink struct {
41 OnCreate func(e *eventsv1.Event) (*eventsv1.Event, error)
42 OnUpdate func(e *eventsv1.Event) (*eventsv1.Event, error)
43 OnPatch func(e *eventsv1.Event, p []byte) (*eventsv1.Event, error)
44 }
45
46
47 func (t *testEventSeriesSink) Create(ctx context.Context, e *eventsv1.Event) (*eventsv1.Event, error) {
48 if t.OnCreate != nil {
49 return t.OnCreate(e)
50 }
51 return e, nil
52 }
53
54
55 func (t *testEventSeriesSink) Update(ctx context.Context, e *eventsv1.Event) (*eventsv1.Event, error) {
56 if t.OnUpdate != nil {
57 return t.OnUpdate(e)
58 }
59 return e, nil
60 }
61
62
63 func (t *testEventSeriesSink) Patch(ctx context.Context, e *eventsv1.Event, p []byte) (*eventsv1.Event, error) {
64 if t.OnPatch != nil {
65 return t.OnPatch(e, p)
66 }
67 return e, nil
68 }
69
70 func TestEventSeriesf(t *testing.T) {
71 hostname, _ := os.Hostname()
72
73 testPod := &v1.Pod{
74 ObjectMeta: metav1.ObjectMeta{
75 Name: "foo",
76 Namespace: "baz",
77 UID: "bar",
78 },
79 }
80
81 regarding, err := ref.GetPartialReference(scheme.Scheme, testPod, ".spec.containers[1]")
82 if err != nil {
83 t.Fatal(err)
84 }
85
86 related, err := ref.GetPartialReference(scheme.Scheme, testPod, ".spec.containers[0]")
87 if err != nil {
88 t.Fatal(err)
89 }
90
91 expectedEvent := &eventsv1.Event{
92 ObjectMeta: metav1.ObjectMeta{
93 Name: "foo",
94 Namespace: "baz",
95 },
96 EventTime: metav1.MicroTime{Time: time.Now()},
97 ReportingController: "eventTest",
98 ReportingInstance: "eventTest-" + hostname,
99 Action: "started",
100 Reason: "test",
101 Regarding: *regarding,
102 Related: related,
103 Note: "some verbose message: 1",
104 Type: v1.EventTypeNormal,
105 }
106
107 isomorphicEvent := expectedEvent.DeepCopy()
108
109 nonIsomorphicEvent := expectedEvent.DeepCopy()
110 nonIsomorphicEvent.Action = "stopped"
111
112 expectedEvent.Series = &eventsv1.EventSeries{Count: 2}
113 table := []struct {
114 regarding k8sruntime.Object
115 related k8sruntime.Object
116 actual *eventsv1.Event
117 elements []interface{}
118 expect *eventsv1.Event
119 expectUpdate bool
120 }{
121 {
122 regarding: regarding,
123 related: related,
124 actual: isomorphicEvent,
125 elements: []interface{}{1},
126 expect: expectedEvent,
127 expectUpdate: true,
128 },
129 {
130 regarding: regarding,
131 related: related,
132 actual: nonIsomorphicEvent,
133 elements: []interface{}{1},
134 expect: nonIsomorphicEvent,
135 expectUpdate: false,
136 },
137 }
138
139 _, ctx := ktesting.NewTestContext(t)
140 ctx, cancel := context.WithCancel(ctx)
141 defer cancel()
142
143 createEvent := make(chan *eventsv1.Event)
144 updateEvent := make(chan *eventsv1.Event)
145 patchEvent := make(chan *eventsv1.Event)
146
147 testEvents := testEventSeriesSink{
148 OnCreate: func(event *eventsv1.Event) (*eventsv1.Event, error) {
149 createEvent <- event
150 return event, nil
151 },
152 OnUpdate: func(event *eventsv1.Event) (*eventsv1.Event, error) {
153 updateEvent <- event
154 return event, nil
155 },
156 OnPatch: func(event *eventsv1.Event, patch []byte) (*eventsv1.Event, error) {
157
158
159 patchEvent <- event
160 return event, nil
161 },
162 }
163 eventBroadcaster := newBroadcaster(&testEvents, 0, map[eventKey]*eventsv1.Event{})
164 recorder := eventBroadcaster.NewRecorder(scheme.Scheme, "eventTest")
165 broadcaster := eventBroadcaster.(*eventBroadcasterImpl)
166
167
168
169 err = broadcaster.startRecordingEvents(ctx)
170 if err != nil {
171 t.Fatal(err)
172 }
173 recorder.Eventf(regarding, related, isomorphicEvent.Type, isomorphicEvent.Reason, isomorphicEvent.Action, isomorphicEvent.Note, []interface{}{1})
174
175 <-createEvent
176 for index, item := range table {
177 actual := item.actual
178 recorder.Eventf(item.regarding, item.related, actual.Type, actual.Reason, actual.Action, actual.Note, item.elements)
179
180 if item.expectUpdate {
181 actualEvent := <-patchEvent
182 t.Logf("%v - validating event affected by patch request", index)
183 validateEvent(strconv.Itoa(index), true, actualEvent, item.expect, t)
184 } else {
185 actualEvent := <-createEvent
186 t.Logf("%v - validating event affected by a create request", index)
187 validateEvent(strconv.Itoa(index), false, actualEvent, item.expect, t)
188 }
189 }
190 }
191
192
193
194
195 func TestEventSeriesWithEventSinkImplRace(t *testing.T) {
196 kubeClient := fake.NewSimpleClientset()
197
198 eventSink := &EventSinkImpl{Interface: kubeClient.EventsV1()}
199 eventBroadcaster := NewBroadcaster(eventSink)
200
201 stopCh := make(chan struct{})
202 eventBroadcaster.StartRecordingToSink(stopCh)
203
204 recorder := eventBroadcaster.NewRecorder(scheme.Scheme, "test")
205
206 recorder.Eventf(&v1.ObjectReference{}, nil, v1.EventTypeNormal, "reason", "action", "", "")
207 recorder.Eventf(&v1.ObjectReference{}, nil, v1.EventTypeNormal, "reason", "action", "", "")
208
209 err := wait.PollImmediate(100*time.Millisecond, 5*time.Second, func() (done bool, err error) {
210 events, err := kubeClient.EventsV1().Events(metav1.NamespaceDefault).List(context.TODO(), metav1.ListOptions{})
211 if err != nil {
212 return false, err
213 }
214
215 if len(events.Items) != 1 {
216 return false, nil
217 }
218
219 if events.Items[0].Series == nil {
220 return false, nil
221 }
222
223 return true, nil
224 })
225 if err != nil {
226 t.Fatal("expected that 2 identical Eventf calls would result in the creation of an Event with a Serie")
227 }
228 }
229
230 func validateEvent(messagePrefix string, expectedUpdate bool, actualEvent *eventsv1.Event, expectedEvent *eventsv1.Event, t *testing.T) {
231 recvEvent := *actualEvent
232
233
234 if recvEvent.EventTime.IsZero() {
235 t.Errorf("%v - timestamp wasn't set: %#v", messagePrefix, recvEvent)
236 }
237
238 if expectedUpdate {
239 if recvEvent.Series == nil {
240 t.Errorf("%v - Series was nil but expected: %#v", messagePrefix, recvEvent.Series)
241
242 } else {
243 if recvEvent.Series.Count != expectedEvent.Series.Count {
244 t.Errorf("%v - Series mismatch actual was: %#v but expected: %#v", messagePrefix, recvEvent.Series, expectedEvent.Series)
245 }
246 }
247
248
249 if n, en := recvEvent.Name, expectedEvent.Name; !strings.HasPrefix(n, en) {
250 t.Errorf("%v - Name '%v' does not contain prefix '%v'", messagePrefix, n, en)
251 }
252 } else {
253 if recvEvent.Series != nil {
254 t.Errorf("%v - series was expected to be nil but was: %#v", messagePrefix, recvEvent.Series)
255 }
256 }
257
258 }
259
260 func TestFinishSeries(t *testing.T) {
261 _, ctx := ktesting.NewTestContext(t)
262 hostname, _ := os.Hostname()
263 testPod := &v1.Pod{
264 ObjectMeta: metav1.ObjectMeta{
265 Name: "foo",
266 Namespace: "baz",
267 UID: "bar",
268 },
269 }
270 regarding, err := ref.GetPartialReference(scheme.Scheme, testPod, ".spec.containers[1]")
271 if err != nil {
272 t.Fatal(err)
273 }
274 related, err := ref.GetPartialReference(scheme.Scheme, testPod, ".spec.containers[0]")
275 if err != nil {
276 t.Fatal(err)
277 }
278 LastObservedTime := metav1.MicroTime{Time: time.Now().Add(-9 * time.Minute)}
279
280 createEvent := make(chan *eventsv1.Event, 10)
281 updateEvent := make(chan *eventsv1.Event, 10)
282 patchEvent := make(chan *eventsv1.Event, 10)
283 testEvents := testEventSeriesSink{
284 OnCreate: func(event *eventsv1.Event) (*eventsv1.Event, error) {
285 createEvent <- event
286 return event, nil
287 },
288 OnUpdate: func(event *eventsv1.Event) (*eventsv1.Event, error) {
289 updateEvent <- event
290 return event, nil
291 },
292 OnPatch: func(event *eventsv1.Event, patch []byte) (*eventsv1.Event, error) {
293
294
295 patchEvent <- event
296 return event, nil
297 },
298 }
299 cache := map[eventKey]*eventsv1.Event{}
300 eventBroadcaster := newBroadcaster(&testEvents, 0, cache).(*eventBroadcasterImpl)
301 recorder := eventBroadcaster.NewRecorder(scheme.Scheme, "k8s.io/kube-foo").(*recorderImplLogger)
302 cachedEvent := recorder.makeEvent(regarding, related, metav1.MicroTime{Time: time.Now()}, v1.EventTypeNormal, "test", "some verbose message: 1", "eventTest", "eventTest-"+hostname, "started")
303 nonFinishedEvent := cachedEvent.DeepCopy()
304 nonFinishedEvent.ReportingController = "nonFinished-controller"
305 cachedEvent.Series = &eventsv1.EventSeries{
306 Count: 10,
307 LastObservedTime: LastObservedTime,
308 }
309 cache[getKey(cachedEvent)] = cachedEvent
310 cache[getKey(nonFinishedEvent)] = nonFinishedEvent
311 eventBroadcaster.finishSeries(ctx)
312 select {
313 case actualEvent := <-patchEvent:
314 t.Logf("validating event affected by patch request")
315 eventBroadcaster.mu.Lock()
316 defer eventBroadcaster.mu.Unlock()
317 if len(cache) != 1 {
318 t.Errorf("cache should be empty, but instead got a size of %v", len(cache))
319 }
320 if !actualEvent.Series.LastObservedTime.Equal(&cachedEvent.Series.LastObservedTime) {
321 t.Errorf("series was expected be seen with LastObservedTime %v, but instead got %v ", cachedEvent.Series.LastObservedTime, actualEvent.Series.LastObservedTime)
322 }
323
324 if len(patchEvent) != 0 || len(createEvent) != 0 || len(updateEvent) != 0 {
325 t.Errorf("exactly one event should be emitted, but got %v", len(patchEvent))
326 }
327 case <-time.After(wait.ForeverTestTimeout):
328 t.Fatalf("timeout after %v", wait.ForeverTestTimeout)
329 }
330 }
331
332 func TestRefreshExistingEventSeries(t *testing.T) {
333 _, ctx := ktesting.NewTestContext(t)
334 hostname, _ := os.Hostname()
335 testPod := &v1.Pod{
336 ObjectMeta: metav1.ObjectMeta{
337 Name: "foo",
338 Namespace: "baz",
339 UID: "bar",
340 },
341 }
342 regarding, err := ref.GetPartialReference(scheme.Scheme, testPod, ".spec.containers[1]")
343 if err != nil {
344 t.Fatal(err)
345 }
346 related, err := ref.GetPartialReference(scheme.Scheme, testPod, ".spec.containers[0]")
347 if err != nil {
348 t.Fatal(err)
349 }
350 LastObservedTime := metav1.MicroTime{Time: time.Now().Add(-9 * time.Minute)}
351 createEvent := make(chan *eventsv1.Event, 10)
352 updateEvent := make(chan *eventsv1.Event, 10)
353 patchEvent := make(chan *eventsv1.Event, 10)
354
355 table := []struct {
356 patchFunc func(event *eventsv1.Event, patch []byte) (*eventsv1.Event, error)
357 }{
358 {
359 patchFunc: func(event *eventsv1.Event, patch []byte) (*eventsv1.Event, error) {
360
361
362 patchEvent <- event
363 return event, nil
364 },
365 },
366 {
367 patchFunc: func(event *eventsv1.Event, patch []byte) (*eventsv1.Event, error) {
368
369 patchEvent <- nil
370 return nil, &restclient.RequestConstructionError{}
371 },
372 },
373 }
374 for _, item := range table {
375 testEvents := testEventSeriesSink{
376 OnCreate: func(event *eventsv1.Event) (*eventsv1.Event, error) {
377 createEvent <- event
378 return event, nil
379 },
380 OnUpdate: func(event *eventsv1.Event) (*eventsv1.Event, error) {
381 updateEvent <- event
382 return event, nil
383 },
384 OnPatch: item.patchFunc,
385 }
386 cache := map[eventKey]*eventsv1.Event{}
387 eventBroadcaster := newBroadcaster(&testEvents, 0, cache).(*eventBroadcasterImpl)
388 recorder := eventBroadcaster.NewRecorder(scheme.Scheme, "k8s.io/kube-foo").(*recorderImplLogger)
389 cachedEvent := recorder.makeEvent(regarding, related, metav1.MicroTime{Time: time.Now()}, v1.EventTypeNormal, "test", "some verbose message: 1", "eventTest", "eventTest-"+hostname, "started")
390 cachedEvent.Series = &eventsv1.EventSeries{
391 Count: 10,
392 LastObservedTime: LastObservedTime,
393 }
394 cacheKey := getKey(cachedEvent)
395 cache[cacheKey] = cachedEvent
396
397 eventBroadcaster.refreshExistingEventSeries(ctx)
398 select {
399 case <-patchEvent:
400 t.Logf("validating event affected by patch request")
401 eventBroadcaster.mu.Lock()
402 defer eventBroadcaster.mu.Unlock()
403 if len(cache) != 1 {
404 t.Errorf("cache should be with same size, but instead got a size of %v", len(cache))
405 }
406
407 if len(patchEvent) != 0 || len(createEvent) != 0 || len(updateEvent) != 0 {
408 t.Errorf("exactly one event should be emitted, but got %v", len(patchEvent))
409 }
410 cacheEvent, exists := cache[cacheKey]
411
412 if cacheEvent == nil || !exists {
413 t.Errorf("expected event to exist and not being nil, but instead event: %v and exists: %v", cacheEvent, exists)
414 }
415 case <-time.After(wait.ForeverTestTimeout):
416 t.Fatalf("timeout after %v", wait.ForeverTestTimeout)
417 }
418 }
419 }
420
View as plain text