...

Source file src/github.com/datawire/ambassador/v2/pkg/kubeapply/waiter.go

Documentation: github.com/datawire/ambassador/v2/pkg/kubeapply

     1  package kubeapply
     2  
     3  import (
     4  	"context"
     5  	"fmt"
     6  	"time"
     7  
     8  	corev1 "k8s.io/api/core/v1"
     9  
    10  	"github.com/datawire/ambassador/v2/pkg/k8s"
    11  	kates_internal "github.com/datawire/ambassador/v2/pkg/kates_internal"
    12  	"github.com/datawire/dlib/dlog"
    13  )
    14  
// Waiter tracks a set of Kubernetes resources (typically loaded from
// YAML via Scan) and waits for all of them to be ready.
type Waiter struct {
	// watcher is used both to resolve resource types (through its
	// Client) and to watch the cluster while waiting.
	watcher *k8s.Watcher
	// kinds holds, per resource type, the set of resource names that
	// are still being waited on.  Names of namespaced resources are
	// stored as "name.namespace"; other resources use the bare name.
	kinds   map[k8s.ResourceType]map[string]struct{}
}
    21  
    22  // NewWaiter constructs a Waiter object based on the supplied Watcher.
    23  func NewWaiter(watcher *k8s.Watcher) (w *Waiter, err error) {
    24  	if watcher == nil {
    25  		cli, err := k8s.NewClient(nil)
    26  		if err != nil {
    27  			return nil, err
    28  		}
    29  		watcher = cli.Watcher()
    30  	}
    31  	return &Waiter{
    32  		watcher: watcher,
    33  		kinds:   make(map[k8s.ResourceType]map[string]struct{}),
    34  	}, nil
    35  }
    36  
    37  func (w *Waiter) add(resource k8s.Resource) error {
    38  	resourceType, err := w.watcher.Client.ResolveResourceType(resource.QKind())
    39  	if err != nil {
    40  		return err
    41  	}
    42  
    43  	resourceName := resource.Name()
    44  	if resourceType.Namespaced {
    45  		namespace := resource.Namespace()
    46  		if namespace == "" {
    47  			namespace = w.watcher.Client.Namespace
    48  		}
    49  		resourceName += "." + namespace
    50  	}
    51  
    52  	if _, ok := w.kinds[resourceType]; !ok {
    53  		w.kinds[resourceType] = make(map[string]struct{})
    54  	}
    55  	w.kinds[resourceType][resourceName] = struct{}{}
    56  	return nil
    57  }
    58  
    59  // Scan calls LoadResources(path), and add all resources loaded to the
    60  // Waiter.
    61  func (w *Waiter) Scan(ctx context.Context, path string) error {
    62  	resources, err := LoadResources(ctx, path)
    63  	if err != nil {
    64  		return fmt.Errorf("LoadResources: %w", err)
    65  	}
    66  	for _, res := range resources {
    67  		if err = w.add(res); err != nil {
    68  			return fmt.Errorf("%s/%s: %w", res.QKind(), res.QName(), err)
    69  		}
    70  	}
    71  	return nil
    72  }
    73  
    74  func (w *Waiter) remove(kind k8s.ResourceType, name string) {
    75  	delete(w.kinds[kind], name)
    76  }
    77  
    78  func (w *Waiter) isEmpty() bool {
    79  	for _, names := range w.kinds {
    80  		if len(names) > 0 {
    81  			return false
    82  		}
    83  	}
    84  
    85  	return true
    86  }
    87  
    88  // Wait spews a bunch of crap on stdout, and waits for all of the
    89  // Scan()ed resources to be ready.  If they all become ready before
    90  // deadline, then it returns true.  If they don't become ready by
    91  // then, then it bails early and returns false.
    92  func (w *Waiter) Wait(ctx context.Context, deadline time.Time) (bool, error) {
    93  	start := time.Now()
    94  	printed := make(map[string]bool)
    95  	err := w.watcher.WatchQuery(k8s.Query{Kind: "Events.v1.", Namespace: k8s.NamespaceAll}, func(watcher *k8s.Watcher) error {
    96  		list, err := watcher.List("Events.v1.")
    97  		if err != nil {
    98  			return err
    99  		}
   100  		for _, untypedEvent := range list {
   101  			var event corev1.Event
   102  			if err := kates_internal.Convert(untypedEvent, &event); err != nil {
   103  				dlog.Errorln(ctx, err)
   104  				continue
   105  			}
   106  			if event.LastTimestamp.Time.Before(start) && !event.LastTimestamp.IsZero() {
   107  				continue
   108  			}
   109  			eventQName := fmt.Sprintf("%s.%s", event.Name, event.Namespace)
   110  			if !printed[eventQName] {
   111  				involvedQKind := k8s.QKind(event.InvolvedObject.APIVersion, event.InvolvedObject.Kind)
   112  				involvedQName := fmt.Sprintf("%s.%s", event.InvolvedObject.Name, event.InvolvedObject.Namespace)
   113  
   114  				dlog.Printf(ctx, "event: %s/%s: %s\n", involvedQKind, involvedQName, event.Message)
   115  				printed[eventQName] = true
   116  			}
   117  		}
   118  		return nil
   119  	})
   120  	if err != nil {
   121  		return false, err
   122  	}
   123  
   124  	listener := func(watcher *k8s.Watcher) error {
   125  		for kind, names := range w.kinds {
   126  			for name := range names {
   127  				r, err := watcher.Get(kind.String(), name)
   128  				if err != nil {
   129  					return err
   130  				}
   131  				if Ready(r) {
   132  					if ReadyImplemented(r) {
   133  						dlog.Printf(ctx, "ready: %s/%s\n", r.QKind(), r.QName())
   134  					} else {
   135  						dlog.Printf(ctx, "ready: %s/%s (UNIMPLEMENTED)\n",
   136  							r.QKind(), r.QName())
   137  					}
   138  					w.remove(kind, name)
   139  				}
   140  			}
   141  		}
   142  
   143  		if w.isEmpty() {
   144  			watcher.Stop()
   145  		}
   146  		return nil
   147  	}
   148  
   149  	for k := range w.kinds {
   150  		if err := w.watcher.WatchQuery(k8s.Query{Kind: k.String(), Namespace: k8s.NamespaceAll}, listener); err != nil {
   151  			return false, err
   152  		}
   153  	}
   154  
   155  	if err := w.watcher.Start(ctx); err != nil {
   156  		return false, err
   157  	}
   158  
   159  	go func() {
   160  		time.Sleep(time.Until(deadline))
   161  		w.watcher.Stop()
   162  	}()
   163  
   164  	if err := w.watcher.Wait(ctx); err != nil {
   165  		return false, err
   166  	}
   167  
   168  	result := true
   169  
   170  	for kind, names := range w.kinds {
   171  		for name := range names {
   172  			fmt.Printf("not ready: %s/%s\n", kind, name)
   173  			result = false
   174  		}
   175  	}
   176  
   177  	return result, nil
   178  }
   179  

View as plain text