...

Source file src/sigs.k8s.io/cli-utils/pkg/kstatus/polling/collector/collector_test.go

Documentation: sigs.k8s.io/cli-utils/pkg/kstatus/polling/collector

     1  // Copyright 2020 The Kubernetes Authors.
     2  // SPDX-License-Identifier: Apache-2.0
     3  
     4  package collector
     5  
     6  import (
     7  	"errors"
     8  	"fmt"
     9  	"sort"
    10  	"testing"
    11  	"time"
    12  
    13  	"github.com/stretchr/testify/assert"
    14  	appsv1 "k8s.io/api/apps/v1"
    15  	"sigs.k8s.io/cli-utils/pkg/kstatus/polling/event"
    16  	"sigs.k8s.io/cli-utils/pkg/object"
    17  )
    18  
    19  func TestCollectorStopsWhenEventChannelIsClosed(t *testing.T) {
    20  	var identifiers object.ObjMetadataSet
    21  
    22  	collector := NewResourceStatusCollector(identifiers)
    23  
    24  	eventCh := make(chan event.Event)
    25  	stopCh := make(chan struct{})
    26  	defer close(stopCh)
    27  
    28  	completedCh := collector.Listen(eventCh)
    29  
    30  	timer := time.NewTimer(3 * time.Second)
    31  
    32  	close(eventCh)
    33  	select {
    34  	case <-timer.C:
    35  		t.Errorf("expected collector to close the completedCh, but it didn't")
    36  	case <-completedCh:
    37  		timer.Stop()
    38  	}
    39  }
    40  
    41  func TestCollectorWithFatalError(t *testing.T) {
    42  	var identifiers object.ObjMetadataSet
    43  
    44  	collector := NewResourceStatusCollector(identifiers)
    45  
    46  	eventCh := make(chan event.Event)
    47  
    48  	completedCh := collector.Listen(eventCh)
    49  
    50  	exampleErr := fmt.Errorf("this is a test error")
    51  	eventCh <- event.Event{
    52  		Type:  event.ErrorEvent,
    53  		Error: exampleErr,
    54  	}
    55  
    56  	var err error
    57  	timer := time.NewTimer(3 * time.Second)
    58  	close(eventCh)
    59  	select {
    60  	case <-timer.C:
    61  		t.Errorf("expected collector to close the completedCh, but it didn't")
    62  	case msg, ok := <-completedCh:
    63  		if ok {
    64  			err = msg.Err
    65  		} else {
    66  			timer.Stop()
    67  		}
    68  	}
    69  
    70  	if !errors.Is(err, exampleErr) {
    71  		t.Errorf("expected exampleErr, but found %v", err)
    72  	}
    73  }
    74  
    75  var (
    76  	deploymentGVK       = appsv1.SchemeGroupVersion.WithKind("Deployment")
    77  	statefulSetGVK      = appsv1.SchemeGroupVersion.WithKind("StatefulSet")
    78  	resourceIdentifiers = map[string]object.ObjMetadata{
    79  		"deployment": {
    80  			GroupKind: deploymentGVK.GroupKind(),
    81  			Name:      "Foo",
    82  			Namespace: "default",
    83  		},
    84  		"statefulSet": {
    85  			GroupKind: statefulSetGVK.GroupKind(),
    86  			Name:      "Bar",
    87  			Namespace: "default",
    88  		},
    89  	}
    90  )
    91  
    92  func TestCollectorEventProcessing(t *testing.T) {
    93  	testCases := map[string]struct {
    94  		identifiers object.ObjMetadataSet
    95  		events      []event.Event
    96  	}{
    97  		"no resources and no events": {},
    98  		"single resource and single event": {
    99  			identifiers: object.ObjMetadataSet{
   100  				resourceIdentifiers["deployment"],
   101  			},
   102  			events: []event.Event{
   103  				{
   104  					Type: event.ResourceUpdateEvent,
   105  					Resource: &event.ResourceStatus{
   106  						Identifier: resourceIdentifiers["deployment"],
   107  					},
   108  				},
   109  			},
   110  		},
   111  		"multiple resources and multiple events": {
   112  			identifiers: object.ObjMetadataSet{
   113  				resourceIdentifiers["deployment"],
   114  				resourceIdentifiers["statefulSet"],
   115  			},
   116  			events: []event.Event{
   117  				{
   118  					Type: event.ResourceUpdateEvent,
   119  					Resource: &event.ResourceStatus{
   120  						Identifier: resourceIdentifiers["deployment"],
   121  					},
   122  				},
   123  				{
   124  					Type: event.ResourceUpdateEvent,
   125  					Resource: &event.ResourceStatus{
   126  						Identifier: resourceIdentifiers["statefulSet"],
   127  					},
   128  				},
   129  				{
   130  					Type: event.ResourceUpdateEvent,
   131  					Resource: &event.ResourceStatus{
   132  						Identifier: resourceIdentifiers["deployment"],
   133  					},
   134  				},
   135  				{
   136  					Type: event.ResourceUpdateEvent,
   137  					Resource: &event.ResourceStatus{
   138  						Identifier: resourceIdentifiers["statefulSet"],
   139  					},
   140  				},
   141  			},
   142  		},
   143  	}
   144  
   145  	for tn, tc := range testCases {
   146  		t.Run(tn, func(t *testing.T) {
   147  			collector := NewResourceStatusCollector(tc.identifiers)
   148  
   149  			eventCh := make(chan event.Event)
   150  			defer close(eventCh)
   151  
   152  			collector.Listen(eventCh)
   153  
   154  			var latestEvent *event.Event
   155  			latestEventByIdentifier := make(map[object.ObjMetadata]event.Event)
   156  			for _, e := range tc.events {
   157  				if e.Resource != nil {
   158  					latestEventByIdentifier[e.Resource.Identifier] = e
   159  				}
   160  				ev := e
   161  				latestEvent = &ev
   162  				eventCh <- e
   163  			}
   164  			// Give the collector some time to process the event.
   165  			<-time.NewTimer(time.Second).C
   166  
   167  			observation := collector.LatestObservation()
   168  
   169  			var expectedObservation *Observation
   170  			if latestEvent != nil {
   171  				expectedObservation = &Observation{
   172  					LastEventType: latestEvent.Type,
   173  				}
   174  			} else {
   175  				expectedObservation = &Observation{}
   176  			}
   177  
   178  			var resourceStatuses event.ResourceStatuses
   179  			for _, e := range latestEventByIdentifier {
   180  				resourceStatuses = append(resourceStatuses, e.Resource)
   181  			}
   182  			sort.Sort(resourceStatuses)
   183  			expectedObservation.ResourceStatuses = resourceStatuses
   184  
   185  			assert.Equal(t, expectedObservation, observation)
   186  		})
   187  	}
   188  }
   189  

View as plain text