...

Source file src/k8s.io/kubernetes/test/e2e/scheduling/events.go

Documentation: k8s.io/kubernetes/test/e2e/scheduling

     1  /*
     2  Copyright 2017 The Kubernetes Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  package scheduling
    18  
    19  import (
    20  	"context"
    21  	"fmt"
    22  	"strings"
    23  	"sync"
    24  	"time"
    25  
    26  	v1 "k8s.io/api/core/v1"
    27  	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    28  	"k8s.io/apimachinery/pkg/runtime"
    29  	"k8s.io/apimachinery/pkg/util/wait"
    30  	"k8s.io/apimachinery/pkg/watch"
    31  	clientset "k8s.io/client-go/kubernetes"
    32  	"k8s.io/client-go/tools/cache"
    33  	"k8s.io/kubernetes/test/e2e/framework"
    34  
    35  	"github.com/onsi/ginkgo/v2"
    36  )
    37  
    38  func scheduleSuccessEvent(ns, podName, nodeName string) func(*v1.Event) bool {
    39  	return func(e *v1.Event) bool {
    40  		return e.Type == v1.EventTypeNormal &&
    41  			e.Reason == "Scheduled" &&
    42  			strings.HasPrefix(e.Name, podName) &&
    43  			strings.Contains(e.Message, fmt.Sprintf("Successfully assigned %v/%v to %v", ns, podName, nodeName))
    44  	}
    45  }
    46  
    47  func scheduleFailureEvent(podName string) func(*v1.Event) bool {
    48  	return func(e *v1.Event) bool {
    49  		return strings.HasPrefix(e.Name, podName) &&
    50  			e.Type == "Warning" &&
    51  			e.Reason == "FailedScheduling"
    52  	}
    53  }
    54  
    55  // Action is a function to be performed by the system.
    56  type Action func(ctx context.Context) error
    57  
    58  // observeEventAfterAction returns true if an event matching the predicate was emitted
    59  // from the system after performing the supplied action.
    60  func observeEventAfterAction(ctx context.Context, c clientset.Interface, ns string, eventPredicate func(*v1.Event) bool, action Action) (bool, error) {
    61  	// TODO (pohly): add context support
    62  	observedMatchingEvent := false
    63  	informerStartedChan := make(chan struct{})
    64  	var informerStartedGuard sync.Once
    65  
    66  	// Create an informer to list/watch events from the test framework namespace.
    67  	_, controller := cache.NewInformer(
    68  		&cache.ListWatch{
    69  			ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
    70  				ls, err := c.CoreV1().Events(ns).List(ctx, options)
    71  				return ls, err
    72  			},
    73  			WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
    74  				// Signal parent goroutine that watching has begun.
    75  				defer informerStartedGuard.Do(func() { close(informerStartedChan) })
    76  				w, err := c.CoreV1().Events(ns).Watch(ctx, options)
    77  				return w, err
    78  			},
    79  		},
    80  		&v1.Event{},
    81  		0,
    82  		cache.ResourceEventHandlerFuncs{
    83  			AddFunc: func(obj interface{}) {
    84  				e, ok := obj.(*v1.Event)
    85  				if !ok {
    86  					framework.Failf("Expected *v1.Event, got %T %v", obj, obj)
    87  				}
    88  				ginkgo.By(fmt.Sprintf("Considering event: \nType = [%s], Name = [%s], Reason = [%s], Message = [%s]", e.Type, e.Name, e.Reason, e.Message))
    89  				if eventPredicate(e) {
    90  					observedMatchingEvent = true
    91  				}
    92  			},
    93  		},
    94  	)
    95  
    96  	// Start the informer and block this goroutine waiting for the started signal.
    97  	informerStopChan := make(chan struct{})
    98  	defer func() { close(informerStopChan) }()
    99  	go controller.Run(informerStopChan)
   100  	<-informerStartedChan
   101  
   102  	// Invoke the action function.
   103  	err := action(ctx)
   104  	if err != nil {
   105  		return false, err
   106  	}
   107  
   108  	// Poll whether the informer has found a matching event with a timeout.
   109  	// Wait up 2 minutes polling every second.
   110  	timeout := 2 * time.Minute
   111  	interval := 1 * time.Second
   112  	err = wait.PollUntilContextTimeout(ctx, interval, timeout, false, func(ctx context.Context) (bool, error) {
   113  		return observedMatchingEvent, nil
   114  	})
   115  	return err == nil, err
   116  }
   117  

View as plain text