1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package cache
17
18 import (
19 "context"
20 "errors"
21 "strconv"
22 "strings"
23 "sync"
24
25 "github.com/datawire/ambassador/v2/pkg/envoy-control-plane/cache/types"
26 )
27
28 type watches = map[chan Response]struct{}
29
30
31
32
33
34
35 type LinearCache struct {
36
37 typeURL string
38
39 resources map[string]types.Resource
40
41
42 watches map[string]watches
43
44 watchAll watches
45
46 version uint64
47
48 versionPrefix string
49
50 versionVector map[string]uint64
51 mu sync.Mutex
52 }
53
54 var _ Cache = &LinearCache{}
55
56
57 type LinearCacheOption func(*LinearCache)
58
59
60
61
62 func WithVersionPrefix(prefix string) LinearCacheOption {
63 return func(cache *LinearCache) {
64 cache.versionPrefix = prefix
65 }
66 }
67
68
69 func WithInitialResources(resources map[string]types.Resource) LinearCacheOption {
70 return func(cache *LinearCache) {
71 cache.resources = resources
72 for name := range resources {
73 cache.versionVector[name] = 0
74 }
75 }
76 }
77
78
79 func NewLinearCache(typeURL string, opts ...LinearCacheOption) *LinearCache {
80 out := &LinearCache{
81 typeURL: typeURL,
82 resources: make(map[string]types.Resource),
83 watches: make(map[string]watches),
84 watchAll: make(watches),
85 version: 0,
86 versionVector: make(map[string]uint64),
87 }
88 for _, opt := range opts {
89 opt(out)
90 }
91 return out
92 }
93
94 func (cache *LinearCache) respond(value chan Response, staleResources []string) {
95 var resources []types.ResourceWithTtl
96
97 if len(staleResources) == 0 {
98 resources = make([]types.ResourceWithTtl, 0, len(cache.resources))
99 for _, resource := range cache.resources {
100 resources = append(resources, types.ResourceWithTtl{Resource: resource})
101 }
102 } else {
103 resources = make([]types.ResourceWithTtl, 0, len(staleResources))
104 for _, name := range staleResources {
105 resource := cache.resources[name]
106 if resource != nil {
107 resources = append(resources, types.ResourceWithTtl{Resource: resource})
108 }
109 }
110 }
111 value <- &RawResponse{
112 Request: &Request{TypeUrl: cache.typeURL},
113 Resources: resources,
114 Version: cache.versionPrefix + strconv.FormatUint(cache.version, 10),
115 }
116 }
117
118 func (cache *LinearCache) notifyAll(modified map[string]struct{}) {
119
120 notifyList := make(map[chan Response][]string)
121 for name := range modified {
122 for watch := range cache.watches[name] {
123 notifyList[watch] = append(notifyList[watch], name)
124 }
125 delete(cache.watches, name)
126 }
127 for value, stale := range notifyList {
128 cache.respond(value, stale)
129 }
130 for value := range cache.watchAll {
131 cache.respond(value, nil)
132 }
133 cache.watchAll = make(watches)
134 }
135
136
137 func (cache *LinearCache) UpdateResource(name string, res types.Resource) error {
138 if res == nil {
139 return errors.New("nil resource")
140 }
141 cache.mu.Lock()
142 defer cache.mu.Unlock()
143
144 cache.version += 1
145 cache.versionVector[name] = cache.version
146 cache.resources[name] = res
147
148
149 cache.notifyAll(map[string]struct{}{name: {}})
150
151 return nil
152 }
153
154
155 func (cache *LinearCache) DeleteResource(name string) error {
156 cache.mu.Lock()
157 defer cache.mu.Unlock()
158
159 cache.version += 1
160 delete(cache.versionVector, name)
161 delete(cache.resources, name)
162
163
164 cache.notifyAll(map[string]struct{}{name: {}})
165 return nil
166 }
167
168 func (cache *LinearCache) CreateWatch(request *Request) (chan Response, func()) {
169 value := make(chan Response, 1)
170 if request.TypeUrl != cache.typeURL {
171 close(value)
172 return value, nil
173 }
174
175
176
177 stale := false
178 staleResources := []string{}
179
180
181 var lastVersion uint64
182 var err error
183 if strings.HasPrefix(request.VersionInfo, cache.versionPrefix) {
184 lastVersion, err = strconv.ParseUint(request.VersionInfo[len(cache.versionPrefix):], 0, 64)
185 } else {
186 err = errors.New("mis-matched version prefix")
187 }
188
189 cache.mu.Lock()
190 defer cache.mu.Unlock()
191
192 if err != nil {
193 stale = true
194 staleResources = request.ResourceNames
195 } else if len(request.ResourceNames) == 0 {
196 stale = lastVersion != cache.version
197 } else {
198 for _, name := range request.ResourceNames {
199
200 if lastVersion < cache.versionVector[name] {
201 stale = true
202 staleResources = append(staleResources, name)
203 }
204 }
205 }
206 if stale {
207 cache.respond(value, staleResources)
208 return value, nil
209 }
210
211 if len(request.ResourceNames) == 0 {
212 cache.watchAll[value] = struct{}{}
213 return value, func() {
214 cache.mu.Lock()
215 defer cache.mu.Unlock()
216 delete(cache.watchAll, value)
217 }
218 }
219 for _, name := range request.ResourceNames {
220 set, exists := cache.watches[name]
221 if !exists {
222 set = make(watches)
223 cache.watches[name] = set
224 }
225 set[value] = struct{}{}
226 }
227 return value, func() {
228 cache.mu.Lock()
229 defer cache.mu.Unlock()
230 for _, name := range request.ResourceNames {
231 set, exists := cache.watches[name]
232 if exists {
233 delete(set, value)
234 }
235 if len(set) == 0 {
236 delete(cache.watches, name)
237 }
238 }
239 }
240 }
241
242 func (cache *LinearCache) Fetch(ctx context.Context, request *Request) (Response, error) {
243 return nil, errors.New("not implemented")
244 }
245
246
247 func (cache *LinearCache) NumWatches(name string) int {
248 cache.mu.Lock()
249 defer cache.mu.Unlock()
250 return len(cache.watches[name]) + len(cache.watchAll)
251 }
252
View as plain text