1
16
17 package scheduling
18
19 import (
20 "context"
21 "fmt"
22 "strings"
23 "sync"
24 "time"
25
26 v1 "k8s.io/api/core/v1"
27 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
28 "k8s.io/apimachinery/pkg/runtime"
29 "k8s.io/apimachinery/pkg/util/wait"
30 "k8s.io/apimachinery/pkg/watch"
31 clientset "k8s.io/client-go/kubernetes"
32 "k8s.io/client-go/tools/cache"
33 "k8s.io/kubernetes/test/e2e/framework"
34
35 "github.com/onsi/ginkgo/v2"
36 )
37
38 func scheduleSuccessEvent(ns, podName, nodeName string) func(*v1.Event) bool {
39 return func(e *v1.Event) bool {
40 return e.Type == v1.EventTypeNormal &&
41 e.Reason == "Scheduled" &&
42 strings.HasPrefix(e.Name, podName) &&
43 strings.Contains(e.Message, fmt.Sprintf("Successfully assigned %v/%v to %v", ns, podName, nodeName))
44 }
45 }
46
47 func scheduleFailureEvent(podName string) func(*v1.Event) bool {
48 return func(e *v1.Event) bool {
49 return strings.HasPrefix(e.Name, podName) &&
50 e.Type == "Warning" &&
51 e.Reason == "FailedScheduling"
52 }
53 }
54
55
56 type Action func(ctx context.Context) error
57
58
59
60 func observeEventAfterAction(ctx context.Context, c clientset.Interface, ns string, eventPredicate func(*v1.Event) bool, action Action) (bool, error) {
61
62 observedMatchingEvent := false
63 informerStartedChan := make(chan struct{})
64 var informerStartedGuard sync.Once
65
66
67 _, controller := cache.NewInformer(
68 &cache.ListWatch{
69 ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
70 ls, err := c.CoreV1().Events(ns).List(ctx, options)
71 return ls, err
72 },
73 WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
74
75 defer informerStartedGuard.Do(func() { close(informerStartedChan) })
76 w, err := c.CoreV1().Events(ns).Watch(ctx, options)
77 return w, err
78 },
79 },
80 &v1.Event{},
81 0,
82 cache.ResourceEventHandlerFuncs{
83 AddFunc: func(obj interface{}) {
84 e, ok := obj.(*v1.Event)
85 if !ok {
86 framework.Failf("Expected *v1.Event, got %T %v", obj, obj)
87 }
88 ginkgo.By(fmt.Sprintf("Considering event: \nType = [%s], Name = [%s], Reason = [%s], Message = [%s]", e.Type, e.Name, e.Reason, e.Message))
89 if eventPredicate(e) {
90 observedMatchingEvent = true
91 }
92 },
93 },
94 )
95
96
97 informerStopChan := make(chan struct{})
98 defer func() { close(informerStopChan) }()
99 go controller.Run(informerStopChan)
100 <-informerStartedChan
101
102
103 err := action(ctx)
104 if err != nil {
105 return false, err
106 }
107
108
109
110 timeout := 2 * time.Minute
111 interval := 1 * time.Second
112 err = wait.PollUntilContextTimeout(ctx, interval, timeout, false, func(ctx context.Context) (bool, error) {
113 return observedMatchingEvent, nil
114 })
115 return err == nil, err
116 }
117
View as plain text