...
1 package agent
2
3 import (
4 "fmt"
5 "sync"
6
7 "github.com/datawire/ambassador/v2/pkg/kates"
8 "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
9 "k8s.io/apimachinery/pkg/types"
10 )
11
12
13
14 type RolloutStore struct {
15
16
17
18 deltas []*kates.Delta
19
20
21
22 sotw map[types.UID]*unstructured.Unstructured
23
24 mux sync.Mutex
25 }
26
27
28
29 type ApplicationStore struct {
30
31
32
33 deltas []*kates.Delta
34
35
36
37 sotw map[types.UID]*unstructured.Unstructured
38
39 mux sync.Mutex
40 }
41
42
43 func NewApplicationStore() *ApplicationStore {
44 return &ApplicationStore{}
45 }
46
47
48 func (a *ApplicationStore) Deltas() []*kates.Delta {
49 return a.deltas
50 }
51
52
53
54 func (a *ApplicationStore) StateOfWorld() []*unstructured.Unstructured {
55 results := []*unstructured.Unstructured{}
56 for _, v := range a.sotw {
57 results = append(results, v)
58 }
59 return results
60 }
61
62
63 func (a *ApplicationStore) FromCallback(callback *GenericCallback) (*ApplicationStore, error) {
64 a.mux.Lock()
65 defer a.mux.Unlock()
66 if a.sotw == nil {
67 uMap, err := toUntructuredMap(callback.Sotw)
68 if err != nil {
69 return nil, err
70 }
71 a.sotw = uMap
72 }
73 a.deltas = append(a.deltas, toDelta(callback.Obj, callback.EventType))
74 switch callback.EventType {
75 case CallbackEventAdded, CallbackEventUpdated:
76 a.sotw[callback.Obj.GetUID()] = callback.Obj
77 case CallbackEventDeleted:
78 delete(a.sotw, callback.Obj.GetUID())
79 }
80 return a, nil
81 }
82
83
84 func NewRolloutStore() *RolloutStore {
85 return &RolloutStore{}
86 }
87
88
89 func (s *RolloutStore) Deltas() []*kates.Delta {
90 return s.deltas
91 }
92
93
94
95 func (a *RolloutStore) StateOfWorld() []*unstructured.Unstructured {
96 results := []*unstructured.Unstructured{}
97 for _, v := range a.sotw {
98 results = append(results, v)
99 }
100 return results
101 }
102
103
104 func (r *RolloutStore) FromCallback(callback *GenericCallback) (*RolloutStore, error) {
105 r.mux.Lock()
106 defer r.mux.Unlock()
107 if r.sotw == nil {
108 uMap, err := toUntructuredMap(callback.Sotw)
109 if err != nil {
110 return nil, err
111 }
112 r.sotw = uMap
113 }
114 r.deltas = append(r.deltas, toDelta(callback.Obj, callback.EventType))
115 switch callback.EventType {
116 case CallbackEventAdded, CallbackEventUpdated:
117 r.sotw[callback.Obj.GetUID()] = callback.Obj
118 case CallbackEventDeleted:
119 delete(r.sotw, callback.Obj.GetUID())
120 }
121 return r, nil
122 }
123
124 func toUntructuredMap(objs []interface{}) (map[types.UID]*unstructured.Unstructured, error) {
125 results := make(map[types.UID]*unstructured.Unstructured)
126 for _, obj := range objs {
127 u, ok := obj.(*unstructured.Unstructured)
128 if !ok {
129 return nil, fmt.Errorf("toUntructuredSlice error: obj is %T: expected unstructured.Unstructured", obj)
130 }
131 results[u.GetUID()] = u
132 }
133 return results, nil
134 }
135
136 func toDelta(obj *unstructured.Unstructured, t CallbackEventType) *kates.Delta {
137 deltaType := toKatesDeltaType(t)
138 return &kates.Delta{
139 TypeMeta: kates.TypeMeta{
140 APIVersion: obj.GetAPIVersion(),
141 Kind: obj.GetKind(),
142 },
143 ObjectMeta: kates.ObjectMeta{
144 Name: obj.GetName(),
145 Namespace: obj.GetNamespace(),
146 CreationTimestamp: obj.GetCreationTimestamp(),
147 },
148 DeltaType: deltaType,
149 }
150 }
151
152 func toKatesDeltaType(t CallbackEventType) kates.DeltaType {
153 var kt kates.DeltaType
154 switch t {
155 case CallbackEventAdded:
156 kt = kates.ObjectAdd
157 case CallbackEventUpdated:
158 kt = kates.ObjectUpdate
159 case CallbackEventDeleted:
160 kt = kates.ObjectDelete
161 }
162 return kt
163 }
164
View as plain text