1
2
3
4 package engine
5
import (
	"context"
	"fmt"
	"strings"
	"testing"
	"time"

	"github.com/stretchr/testify/assert"
	appsv1 "k8s.io/api/apps/v1"
	v1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/meta"
	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
	"k8s.io/apimachinery/pkg/runtime/schema"
	fakecr "sigs.k8s.io/cli-utils/pkg/kstatus/polling/clusterreader/fake"
	"sigs.k8s.io/cli-utils/pkg/kstatus/polling/event"
	"sigs.k8s.io/cli-utils/pkg/kstatus/status"
	"sigs.k8s.io/cli-utils/pkg/object"
	fakemapper "sigs.k8s.io/cli-utils/pkg/testutil"
	"sigs.k8s.io/controller-runtime/pkg/client"
)
25
26 func TestStatusPollerRunner(t *testing.T) {
27 testCases := map[string]struct {
28 identifiers object.ObjMetadataSet
29 defaultStatusReader StatusReader
30 expectedEventTypes []event.Type
31 }{
32 "single resource": {
33 identifiers: object.ObjMetadataSet{
34 {
35 GroupKind: schema.GroupKind{
36 Group: "apps",
37 Kind: "Deployment",
38 },
39 Name: "foo",
40 Namespace: "bar",
41 },
42 },
43 defaultStatusReader: &fakeStatusReader{
44 resourceStatuses: map[schema.GroupKind][]status.Status{
45 schema.GroupKind{Group: "apps", Kind: "Deployment"}: {
46 status.InProgressStatus,
47 status.CurrentStatus,
48 },
49 },
50 resourceStatusCount: make(map[schema.GroupKind]int),
51 },
52 expectedEventTypes: []event.Type{
53 event.ResourceUpdateEvent,
54 event.ResourceUpdateEvent,
55 },
56 },
57 "multiple resources": {
58 identifiers: object.ObjMetadataSet{
59 {
60 GroupKind: schema.GroupKind{
61 Group: "apps",
62 Kind: "Deployment",
63 },
64 Name: "foo",
65 Namespace: "default",
66 },
67 {
68 GroupKind: schema.GroupKind{
69 Group: "",
70 Kind: "Service",
71 },
72 Name: "bar",
73 Namespace: "default",
74 },
75 },
76 defaultStatusReader: &fakeStatusReader{
77 resourceStatuses: map[schema.GroupKind][]status.Status{
78 schema.GroupKind{Group: "apps", Kind: "Deployment"}: {
79 status.InProgressStatus,
80 status.CurrentStatus,
81 },
82 schema.GroupKind{Group: "", Kind: "Service"}: {
83 status.InProgressStatus,
84 status.InProgressStatus,
85 status.CurrentStatus,
86 },
87 },
88 resourceStatusCount: make(map[schema.GroupKind]int),
89 },
90 expectedEventTypes: []event.Type{
91 event.ResourceUpdateEvent,
92 event.ResourceUpdateEvent,
93 event.ResourceUpdateEvent,
94 event.ResourceUpdateEvent,
95 },
96 },
97 }
98
99 for tn, tc := range testCases {
100 t.Run(tn, func(t *testing.T) {
101 ctx, cancel := context.WithCancel(context.Background())
102 defer cancel()
103
104 identifiers := tc.identifiers
105
106 fakeMapper := fakemapper.NewFakeRESTMapper(
107 appsv1.SchemeGroupVersion.WithKind("Deployment"),
108 v1.SchemeGroupVersion.WithKind("Service"),
109 )
110
111 engine := PollerEngine{
112 Mapper: fakeMapper,
113 DefaultStatusReader: tc.defaultStatusReader,
114 StatusReaders: []StatusReader{},
115 ClusterReaderFactory: ClusterReaderFactoryFunc(func(client.Reader, meta.RESTMapper, object.ObjMetadataSet) (ClusterReader, error) {
116 return fakecr.NewNoopClusterReader(), nil
117 }),
118 }
119
120 options := Options{
121 PollInterval: 2 * time.Second,
122 }
123
124 eventChannel := engine.Poll(ctx, identifiers, options)
125
126 var eventTypes []event.Type
127 for ch := range eventChannel {
128 eventTypes = append(eventTypes, ch.Type)
129 if len(eventTypes) == len(tc.expectedEventTypes) {
130 cancel()
131 }
132 }
133
134 assert.Equal(t, tc.expectedEventTypes, eventTypes)
135 })
136 }
137 }
138
139 func TestNewStatusPollerRunnerCancellation(t *testing.T) {
140 identifiers := make(object.ObjMetadataSet, 0)
141
142 ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
143 defer cancel()
144
145 timer := time.NewTimer(5 * time.Second)
146
147 engine := PollerEngine{
148 ClusterReaderFactory: ClusterReaderFactoryFunc(func(client.Reader, meta.RESTMapper, object.ObjMetadataSet) (ClusterReader, error) {
149 return fakecr.NewNoopClusterReader(), nil
150 }),
151 }
152
153 options := Options{
154 PollInterval: 2 * time.Second,
155 }
156
157 eventChannel := engine.Poll(ctx, identifiers, options)
158
159 for {
160 select {
161 case <-eventChannel:
162 timer.Stop()
163 return
164 case <-timer.C:
165 t.Errorf("expected runner to time out, but it didn't")
166 return
167 }
168 }
169 }
170
171 func TestNewStatusPollerRunnerCancellationWithMultipleResources(t *testing.T) {
172 identifiers := object.ObjMetadataSet{
173 {
174 GroupKind: schema.GroupKind{
175 Group: "apps",
176 Kind: "Deployment",
177 },
178 Name: "foo",
179 Namespace: "default",
180 },
181 {
182 GroupKind: schema.GroupKind{
183 Group: "apps",
184 Kind: "StatefulSet",
185 },
186 Name: "bar",
187 Namespace: "default",
188 },
189 }
190
191 fakeMapper := fakemapper.NewFakeRESTMapper(
192 appsv1.SchemeGroupVersion.WithKind("Deployment"),
193 appsv1.SchemeGroupVersion.WithKind("StatefulSet"),
194 )
195
196 ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
197 defer cancel()
198
199 timer := time.NewTimer(10 * time.Second)
200
201 engine := PollerEngine{
202 Mapper: fakeMapper,
203 ClusterReaderFactory: ClusterReaderFactoryFunc(func(client.Reader, meta.RESTMapper, object.ObjMetadataSet) (ClusterReader, error) {
204 return &fakecr.ClusterReader{
205 SyncErr: context.Canceled,
206 }, nil
207 }),
208 }
209
210 options := Options{
211 PollInterval: 1 * time.Second,
212 }
213
214 eventChannel := engine.Poll(ctx, identifiers, options)
215
216 var events []event.Event
217 loop:
218 for {
219 select {
220 case e, ok := <-eventChannel:
221 if !ok {
222 timer.Stop()
223 break loop
224 }
225 events = append(events, e)
226 case <-timer.C:
227 t.Errorf("expected runner to time out, but it didn't")
228 return
229 }
230 }
231 assert.Equal(t, 0, len(events))
232 }
233
234 func TestNewStatusPollerRunnerIdentifierValidation(t *testing.T) {
235 identifiers := object.ObjMetadataSet{
236 {
237 GroupKind: schema.GroupKind{
238 Group: "apps",
239 Kind: "Deployment",
240 },
241 Name: "foo",
242 },
243 }
244
245 engine := PollerEngine{
246 Mapper: fakemapper.NewFakeRESTMapper(
247 appsv1.SchemeGroupVersion.WithKind("Deployment"),
248 ),
249 ClusterReaderFactory: ClusterReaderFactoryFunc(func(client.Reader, meta.RESTMapper, object.ObjMetadataSet) (ClusterReader, error) {
250 return fakecr.NewNoopClusterReader(), nil
251 }),
252 }
253
254 eventChannel := engine.Poll(context.Background(), identifiers, Options{})
255
256 timer := time.NewTimer(3 * time.Second)
257 defer timer.Stop()
258 select {
259 case e := <-eventChannel:
260 if e.Type != event.ErrorEvent {
261 t.Errorf("expected an error event, but got %s", e.Type.String())
262 return
263 }
264 err := e.Error
265 if !strings.Contains(err.Error(), "namespace is not set") {
266 t.Errorf("expected error with namespace not set, but got %v", err)
267 }
268 return
269 case <-timer.C:
270 t.Errorf("expected an error event, but didn't get one")
271 }
272 }
273
274 type fakeStatusReader struct {
275 resourceStatuses map[schema.GroupKind][]status.Status
276 resourceStatusCount map[schema.GroupKind]int
277 }
278
279 func (f *fakeStatusReader) Supports(schema.GroupKind) bool {
280 return true
281 }
282
283 func (f *fakeStatusReader) ReadStatus(_ context.Context, _ ClusterReader, identifier object.ObjMetadata) (*event.ResourceStatus, error) {
284 count := f.resourceStatusCount[identifier.GroupKind]
285 resourceStatusSlice := f.resourceStatuses[identifier.GroupKind]
286 var resourceStatus status.Status
287 if len(resourceStatusSlice) > count {
288 resourceStatus = resourceStatusSlice[count]
289 } else {
290 resourceStatus = resourceStatusSlice[len(resourceStatusSlice)-1]
291 }
292 f.resourceStatusCount[identifier.GroupKind] = count + 1
293 return &event.ResourceStatus{
294 Identifier: identifier,
295 Status: resourceStatus,
296 }, nil
297 }
298
299 func (f *fakeStatusReader) ReadStatusForObject(_ context.Context, _ ClusterReader, _ *unstructured.Unstructured) (*event.ResourceStatus, error) {
300 return nil, nil
301 }
302
View as plain text