1 package kubeapply
2
3 import (
4 "context"
5 "fmt"
6 "time"
7
8 corev1 "k8s.io/api/core/v1"
9
10 "github.com/datawire/ambassador/v2/pkg/k8s"
11 kates_internal "github.com/datawire/ambassador/v2/pkg/kates_internal"
12 "github.com/datawire/dlib/dlog"
13 )
14
15
16
17 type Waiter struct {
18 watcher *k8s.Watcher
19 kinds map[k8s.ResourceType]map[string]struct{}
20 }
21
22
23 func NewWaiter(watcher *k8s.Watcher) (w *Waiter, err error) {
24 if watcher == nil {
25 cli, err := k8s.NewClient(nil)
26 if err != nil {
27 return nil, err
28 }
29 watcher = cli.Watcher()
30 }
31 return &Waiter{
32 watcher: watcher,
33 kinds: make(map[k8s.ResourceType]map[string]struct{}),
34 }, nil
35 }
36
37 func (w *Waiter) add(resource k8s.Resource) error {
38 resourceType, err := w.watcher.Client.ResolveResourceType(resource.QKind())
39 if err != nil {
40 return err
41 }
42
43 resourceName := resource.Name()
44 if resourceType.Namespaced {
45 namespace := resource.Namespace()
46 if namespace == "" {
47 namespace = w.watcher.Client.Namespace
48 }
49 resourceName += "." + namespace
50 }
51
52 if _, ok := w.kinds[resourceType]; !ok {
53 w.kinds[resourceType] = make(map[string]struct{})
54 }
55 w.kinds[resourceType][resourceName] = struct{}{}
56 return nil
57 }
58
59
60
61 func (w *Waiter) Scan(ctx context.Context, path string) error {
62 resources, err := LoadResources(ctx, path)
63 if err != nil {
64 return fmt.Errorf("LoadResources: %w", err)
65 }
66 for _, res := range resources {
67 if err = w.add(res); err != nil {
68 return fmt.Errorf("%s/%s: %w", res.QKind(), res.QName(), err)
69 }
70 }
71 return nil
72 }
73
74 func (w *Waiter) remove(kind k8s.ResourceType, name string) {
75 delete(w.kinds[kind], name)
76 }
77
78 func (w *Waiter) isEmpty() bool {
79 for _, names := range w.kinds {
80 if len(names) > 0 {
81 return false
82 }
83 }
84
85 return true
86 }
87
88
89
90
91
92 func (w *Waiter) Wait(ctx context.Context, deadline time.Time) (bool, error) {
93 start := time.Now()
94 printed := make(map[string]bool)
95 err := w.watcher.WatchQuery(k8s.Query{Kind: "Events.v1.", Namespace: k8s.NamespaceAll}, func(watcher *k8s.Watcher) error {
96 list, err := watcher.List("Events.v1.")
97 if err != nil {
98 return err
99 }
100 for _, untypedEvent := range list {
101 var event corev1.Event
102 if err := kates_internal.Convert(untypedEvent, &event); err != nil {
103 dlog.Errorln(ctx, err)
104 continue
105 }
106 if event.LastTimestamp.Time.Before(start) && !event.LastTimestamp.IsZero() {
107 continue
108 }
109 eventQName := fmt.Sprintf("%s.%s", event.Name, event.Namespace)
110 if !printed[eventQName] {
111 involvedQKind := k8s.QKind(event.InvolvedObject.APIVersion, event.InvolvedObject.Kind)
112 involvedQName := fmt.Sprintf("%s.%s", event.InvolvedObject.Name, event.InvolvedObject.Namespace)
113
114 dlog.Printf(ctx, "event: %s/%s: %s\n", involvedQKind, involvedQName, event.Message)
115 printed[eventQName] = true
116 }
117 }
118 return nil
119 })
120 if err != nil {
121 return false, err
122 }
123
124 listener := func(watcher *k8s.Watcher) error {
125 for kind, names := range w.kinds {
126 for name := range names {
127 r, err := watcher.Get(kind.String(), name)
128 if err != nil {
129 return err
130 }
131 if Ready(r) {
132 if ReadyImplemented(r) {
133 dlog.Printf(ctx, "ready: %s/%s\n", r.QKind(), r.QName())
134 } else {
135 dlog.Printf(ctx, "ready: %s/%s (UNIMPLEMENTED)\n",
136 r.QKind(), r.QName())
137 }
138 w.remove(kind, name)
139 }
140 }
141 }
142
143 if w.isEmpty() {
144 watcher.Stop()
145 }
146 return nil
147 }
148
149 for k := range w.kinds {
150 if err := w.watcher.WatchQuery(k8s.Query{Kind: k.String(), Namespace: k8s.NamespaceAll}, listener); err != nil {
151 return false, err
152 }
153 }
154
155 if err := w.watcher.Start(ctx); err != nil {
156 return false, err
157 }
158
159 go func() {
160 time.Sleep(time.Until(deadline))
161 w.watcher.Stop()
162 }()
163
164 if err := w.watcher.Wait(ctx); err != nil {
165 return false, err
166 }
167
168 result := true
169
170 for kind, names := range w.kinds {
171 for name := range names {
172 fmt.Printf("not ready: %s/%s\n", kind, name)
173 result = false
174 }
175 }
176
177 return result, nil
178 }
179
View as plain text