...

Source file src/edge-infra.dev/pkg/edge/api/utils/watch.go

Documentation: edge-infra.dev/pkg/edge/api/utils

     1  package utils
     2  
     3  import (
     4  	"context"
     5  	"errors"
     6  	"time"
     7  
     8  	"k8s.io/apimachinery/pkg/fields"
     9  	"k8s.io/apimachinery/pkg/runtime"
    10  	kwatch "k8s.io/apimachinery/pkg/watch"
    11  	"sigs.k8s.io/controller-runtime/pkg/client"
    12  )
    13  
    14  var (
    15  	ErrWatchTimeout = errors.New("timeout getting data")
    16  	ErrNotAMatch    = errors.New("updated object is not a match")
    17  )
    18  
    19  // MatchFn return nil when matching, ErrNotAMatch when not matching, will stop watching for any other errors
    20  type MatchFn func(obj runtime.Object) error
    21  
    22  func Watch(cl client.WithWatch, list client.ObjectList, objectKey client.ObjectKey, timeout time.Duration, matches MatchFn) error {
    23  	ctx, cancel := context.WithTimeout(context.Background(), timeout)
    24  	defer cancel()
    25  	watch, err := cl.Watch(ctx, list, &client.ListOptions{
    26  		FieldSelector: fields.OneTermEqualSelector("metadata.name", objectKey.Name),
    27  		Namespace:     objectKey.Namespace,
    28  	})
    29  	if err != nil {
    30  		return err
    31  	}
    32  	defer watch.Stop()
    33  
    34  	watchChannel := watch.ResultChan()
    35  	for {
    36  		select {
    37  		case event := <-watchChannel:
    38  			switch event.Type {
    39  			case kwatch.Added, kwatch.Modified, kwatch.Deleted:
    40  				err = matches(event.Object)
    41  				if err == nil { // when there is a match no error is return
    42  					return nil
    43  				}
    44  				if !errors.Is(err, ErrNotAMatch) { // stop watch for any other errors
    45  					return err
    46  				}
    47  			}
    48  		case <-ctx.Done():
    49  			return ErrWatchTimeout
    50  		}
    51  	}
    52  }
    53  

View as plain text