package utils import ( "context" "errors" "time" "k8s.io/apimachinery/pkg/fields" "k8s.io/apimachinery/pkg/runtime" kwatch "k8s.io/apimachinery/pkg/watch" "sigs.k8s.io/controller-runtime/pkg/client" ) var ( ErrWatchTimeout = errors.New("timeout getting data") ErrNotAMatch = errors.New("updated object is not a match") ) // MatchFn return nil when matching, ErrNotAMatch when not matching, will stop watching for any other errors type MatchFn func(obj runtime.Object) error func Watch(cl client.WithWatch, list client.ObjectList, objectKey client.ObjectKey, timeout time.Duration, matches MatchFn) error { ctx, cancel := context.WithTimeout(context.Background(), timeout) defer cancel() watch, err := cl.Watch(ctx, list, &client.ListOptions{ FieldSelector: fields.OneTermEqualSelector("metadata.name", objectKey.Name), Namespace: objectKey.Namespace, }) if err != nil { return err } defer watch.Stop() watchChannel := watch.ResultChan() for { select { case event := <-watchChannel: switch event.Type { case kwatch.Added, kwatch.Modified, kwatch.Deleted: err = matches(event.Object) if err == nil { // when there is a match no error is return return nil } if !errors.Is(err, ErrNotAMatch) { // stop watch for any other errors return err } } case <-ctx.Done(): return ErrWatchTimeout } } }