1
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51 package cached
52
53 import (
54 "fmt"
55 "sync"
56 "sync/atomic"
57 )
58
59
// Value wraps a value behind a getter so that it can be evaluated lazily.
//
// Get returns, in order, the value itself, an etag string identifying that
// value, and an error.
type Value[T any] interface {
	Get() (T, string, error)
}
63
64
65
// Result bundles a value, its etag and an error into one struct, for places
// where a single value is more convenient than a three-element tuple.
type Result[T any] struct {
	// Value is the wrapped value.
	Value T
	// Etag is the etag string associated with Value.
	Etag string
	// Err is the error that accompanies Value, if any.
	Err error
}

// Get unpacks the Result back into a (value, etag, error) tuple, returning
// the fields exactly as stored.
func (r Result[T]) Get() (value T, etag string, err error) {
	value, etag, err = r.Value, r.Etag, r.Err
	return value, etag, err
}
75
76
77 func Func[T any](fn func() (T, string, error)) Value[T] {
78 return valueFunc[T](fn)
79 }
80
// valueFunc is a function type with a Get method, so that a bare function can
// be used where a getter is expected.
type valueFunc[T any] func() (T, string, error)

// Get calls the underlying function and forwards its three results unchanged.
func (c valueFunc[T]) Get() (T, string, error) {
	value, etag, err := c()
	return value, etag, err
}
86
87
88 func Static[T any](value T, etag string) Value[T] {
89 return Result[T]{Value: value, Etag: etag}
90 }
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107 func Merge[K comparable, T, V any](mergeFn func(results map[K]Result[T]) (V, string, error), caches map[K]Value[T]) Value[V] {
108 list := make([]Value[T], 0, len(caches))
109
110
111 indexes := make(map[int]K, len(caches))
112 i := 0
113 for k := range caches {
114 list = append(list, caches[k])
115 indexes[i] = k
116 i++
117 }
118
119 return MergeList(func(results []Result[T]) (V, string, error) {
120 if len(results) != len(indexes) {
121 panic(fmt.Errorf("invalid result length %d, expected %d", len(results), len(indexes)))
122 }
123 m := make(map[K]Result[T], len(results))
124 for i := range results {
125 m[indexes[i]] = results[i]
126 }
127 return mergeFn(m)
128 }, list)
129 }
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146 func MergeList[T, V any](mergeFn func(results []Result[T]) (V, string, error), delegates []Value[T]) Value[V] {
147 return &listMerger[T, V]{
148 mergeFn: mergeFn,
149 delegates: delegates,
150 }
151 }
152
153 type listMerger[T, V any] struct {
154 lock sync.Mutex
155 mergeFn func([]Result[T]) (V, string, error)
156 delegates []Value[T]
157 cache []Result[T]
158 result Result[V]
159 }
160
161 func (c *listMerger[T, V]) prepareResultsLocked() []Result[T] {
162 cacheResults := make([]Result[T], len(c.delegates))
163 ch := make(chan struct {
164 int
165 Result[T]
166 }, len(c.delegates))
167 for i := range c.delegates {
168 go func(index int) {
169 value, etag, err := c.delegates[index].Get()
170 ch <- struct {
171 int
172 Result[T]
173 }{index, Result[T]{Value: value, Etag: etag, Err: err}}
174 }(i)
175 }
176 for i := 0; i < len(c.delegates); i++ {
177 res := <-ch
178 cacheResults[res.int] = res.Result
179 }
180 return cacheResults
181 }
182
183 func (c *listMerger[T, V]) needsRunningLocked(results []Result[T]) bool {
184 if c.cache == nil {
185 return true
186 }
187 if c.result.Err != nil {
188 return true
189 }
190 if len(results) != len(c.cache) {
191 panic(fmt.Errorf("invalid number of results: %v (expected %v)", len(results), len(c.cache)))
192 }
193 for i, oldResult := range c.cache {
194 newResult := results[i]
195 if newResult.Etag != oldResult.Etag || newResult.Err != nil || oldResult.Err != nil {
196 return true
197 }
198 }
199 return false
200 }
201
202 func (c *listMerger[T, V]) Get() (V, string, error) {
203 c.lock.Lock()
204 defer c.lock.Unlock()
205 cacheResults := c.prepareResultsLocked()
206 if c.needsRunningLocked(cacheResults) {
207 c.cache = cacheResults
208 c.result.Value, c.result.Etag, c.result.Err = c.mergeFn(c.cache)
209 }
210 return c.result.Value, c.result.Etag, c.result.Err
211 }
212
213
214
215
216
217
218
219 func Transform[T, V any](transformerFn func(T, string, error) (V, string, error), source Value[T]) Value[V] {
220 return MergeList(func(delegates []Result[T]) (V, string, error) {
221 if len(delegates) != 1 {
222 panic(fmt.Errorf("invalid cache for transformer cache: %v", delegates))
223 }
224 return transformerFn(delegates[0].Value, delegates[0].Etag, delegates[0].Err)
225 }, []Value[T]{source})
226 }
227
228
229 func Once[T any](d Value[T]) Value[T] {
230 return &once[T]{
231 data: d,
232 }
233 }
234
235 type once[T any] struct {
236 once sync.Once
237 data Value[T]
238 result Result[T]
239 }
240
241 func (c *once[T]) Get() (T, string, error) {
242 c.once.Do(func() {
243 c.result.Value, c.result.Etag, c.result.Err = c.data.Get()
244 })
245 return c.result.Value, c.result.Etag, c.result.Err
246 }
247
248
249
250 type Replaceable[T any] interface {
251 Value[T]
252 Store(Value[T])
253 }
254
255
256
257 type Atomic[T any] struct {
258 value atomic.Pointer[Value[T]]
259 }
260
261 var _ Replaceable[[]byte] = &Atomic[[]byte]{}
262
263 func (x *Atomic[T]) Store(val Value[T]) { x.value.Store(&val) }
264 func (x *Atomic[T]) Get() (T, string, error) { return (*x.value.Load()).Get() }
265
266
267
268 type LastSuccess[T any] struct {
269 Atomic[T]
270 success atomic.Pointer[Result[T]]
271 }
272
273 var _ Replaceable[[]byte] = &LastSuccess[[]byte]{}
274
275 func (c *LastSuccess[T]) Get() (T, string, error) {
276 success := c.success.Load()
277 value, etag, err := c.Atomic.Get()
278 if err == nil {
279 if success == nil {
280 c.success.CompareAndSwap(nil, &Result[T]{Value: value, Etag: etag, Err: err})
281 }
282 return value, etag, err
283 }
284
285 if success != nil {
286 return success.Value, success.Etag, success.Err
287 }
288
289 return value, etag, err
290 }
291
View as plain text