1
2 package ccache
3
4 import (
5 "container/list"
6 "hash/fnv"
7 "sync/atomic"
8 "time"
9 )
10
11 type LayeredCache struct {
12 *Configuration
13 list *list.List
14 buckets []*layeredBucket
15 bucketMask uint32
16 size int64
17 deletables chan *Item
18 promotables chan *Item
19 control chan interface{}
20 }
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35 func Layered(config *Configuration) *LayeredCache {
36 c := &LayeredCache{
37 list: list.New(),
38 Configuration: config,
39 bucketMask: uint32(config.buckets) - 1,
40 buckets: make([]*layeredBucket, config.buckets),
41 deletables: make(chan *Item, config.deleteBuffer),
42 control: make(chan interface{}),
43 }
44 for i := 0; i < int(config.buckets); i++ {
45 c.buckets[i] = &layeredBucket{
46 buckets: make(map[string]*bucket),
47 }
48 }
49 c.restart()
50 return c
51 }
52
53 func (c *LayeredCache) ItemCount() int {
54 count := 0
55 for _, b := range c.buckets {
56 count += b.itemCount()
57 }
58 return count
59 }
60
61
62
63
64
65 func (c *LayeredCache) Get(primary, secondary string) *Item {
66 item := c.bucket(primary).get(primary, secondary)
67 if item == nil {
68 return nil
69 }
70 if item.expires > time.Now().UnixNano() {
71 c.promote(item)
72 }
73 return item
74 }
75
76 func (c *LayeredCache) ForEachFunc(primary string, matches func(key string, item *Item) bool) {
77 c.bucket(primary).forEachFunc(primary, matches)
78 }
79
80
81
82
83 func (c *LayeredCache) GetOrCreateSecondaryCache(primary string) *SecondaryCache {
84 primaryBkt := c.bucket(primary)
85 bkt := primaryBkt.getSecondaryBucket(primary)
86 primaryBkt.Lock()
87 if bkt == nil {
88 bkt = &bucket{lookup: make(map[string]*Item)}
89 primaryBkt.buckets[primary] = bkt
90 }
91 primaryBkt.Unlock()
92 return &SecondaryCache{
93 bucket: bkt,
94 pCache: c,
95 }
96 }
97
98
99
100 func (c *LayeredCache) TrackingGet(primary, secondary string) TrackedItem {
101 item := c.Get(primary, secondary)
102 if item == nil {
103 return NilTracked
104 }
105 item.track()
106 return item
107 }
108
109
110 func (c *LayeredCache) TrackingSet(primary, secondary string, value interface{}, duration time.Duration) TrackedItem {
111 return c.set(primary, secondary, value, duration, true)
112 }
113
114
115 func (c *LayeredCache) Set(primary, secondary string, value interface{}, duration time.Duration) {
116 c.set(primary, secondary, value, duration, false)
117 }
118
119
120
121
122 func (c *LayeredCache) Replace(primary, secondary string, value interface{}) bool {
123 item := c.bucket(primary).get(primary, secondary)
124 if item == nil {
125 return false
126 }
127 c.Set(primary, secondary, value, item.TTL())
128 return true
129 }
130
131
132
133
134 func (c *LayeredCache) Fetch(primary, secondary string, duration time.Duration, fetch func() (interface{}, error)) (*Item, error) {
135 item := c.Get(primary, secondary)
136 if item != nil {
137 return item, nil
138 }
139 value, err := fetch()
140 if err != nil {
141 return nil, err
142 }
143 return c.set(primary, secondary, value, duration, false), nil
144 }
145
146
147 func (c *LayeredCache) Delete(primary, secondary string) bool {
148 item := c.bucket(primary).delete(primary, secondary)
149 if item != nil {
150 c.deletables <- item
151 return true
152 }
153 return false
154 }
155
156
157 func (c *LayeredCache) DeleteAll(primary string) bool {
158 return c.bucket(primary).deleteAll(primary, c.deletables)
159 }
160
161
162 func (c *LayeredCache) DeletePrefix(primary, prefix string) int {
163 return c.bucket(primary).deletePrefix(primary, prefix, c.deletables)
164 }
165
166
167 func (c *LayeredCache) DeleteFunc(primary string, matches func(key string, item *Item) bool) int {
168 return c.bucket(primary).deleteFunc(primary, matches, c.deletables)
169 }
170
171
172 func (c *LayeredCache) Clear() {
173 done := make(chan struct{})
174 c.control <- clear{done: done}
175 <-done
176 }
177
178 func (c *LayeredCache) Stop() {
179 close(c.promotables)
180 <-c.control
181 }
182
183
184
185 func (c *LayeredCache) GetDropped() int {
186 res := make(chan int)
187 c.control <- getDropped{res: res}
188 return <-res
189 }
190
191
192
193 func (c *LayeredCache) SetMaxSize(size int64) {
194 c.control <- setMaxSize{size}
195 }
196
197 func (c *LayeredCache) restart() {
198 c.promotables = make(chan *Item, c.promoteBuffer)
199 c.control = make(chan interface{})
200 go c.worker()
201 }
202
203 func (c *LayeredCache) set(primary, secondary string, value interface{}, duration time.Duration, track bool) *Item {
204 item, existing := c.bucket(primary).set(primary, secondary, value, duration, track)
205 if existing != nil {
206 c.deletables <- existing
207 }
208 c.promote(item)
209 return item
210 }
211
212 func (c *LayeredCache) bucket(key string) *layeredBucket {
213 h := fnv.New32a()
214 h.Write([]byte(key))
215 return c.buckets[h.Sum32()&c.bucketMask]
216 }
217
218 func (c *LayeredCache) promote(item *Item) {
219 c.promotables <- item
220 }
221
222 func (c *LayeredCache) worker() {
223 defer close(c.control)
224 dropped := 0
225 for {
226 select {
227 case item, ok := <-c.promotables:
228 if ok == false {
229 return
230 }
231 if c.doPromote(item) && c.size > c.maxSize {
232 dropped += c.gc()
233 }
234 case item := <-c.deletables:
235 if item.element == nil {
236 atomic.StoreInt32(&item.promotions, -2)
237 } else {
238 c.size -= item.size
239 if c.onDelete != nil {
240 c.onDelete(item)
241 }
242 c.list.Remove(item.element)
243 }
244 case control := <-c.control:
245 switch msg := control.(type) {
246 case getDropped:
247 msg.res <- dropped
248 dropped = 0
249 case setMaxSize:
250 c.maxSize = msg.size
251 if c.size > c.maxSize {
252 dropped += c.gc()
253 }
254 case clear:
255 for _, bucket := range c.buckets {
256 bucket.clear()
257 }
258 c.size = 0
259 c.list = list.New()
260 msg.done <- struct{}{}
261 }
262 }
263 }
264 }
265
266 func (c *LayeredCache) doPromote(item *Item) bool {
267
268 if atomic.LoadInt32(&item.promotions) == -2 {
269 return false
270 }
271 if item.element != nil {
272 if item.shouldPromote(c.getsPerPromote) {
273 c.list.MoveToFront(item.element)
274 atomic.StoreInt32(&item.promotions, 0)
275 }
276 return false
277 }
278 c.size += item.size
279 item.element = c.list.PushFront(item)
280 return true
281 }
282
283 func (c *LayeredCache) gc() int {
284 element := c.list.Back()
285 dropped := 0
286 for i := 0; i < c.itemsToPrune; i++ {
287 if element == nil {
288 return dropped
289 }
290 prev := element.Prev()
291 item := element.Value.(*Item)
292 if c.tracking == false || atomic.LoadInt32(&item.refCount) == 0 {
293 c.bucket(item.group).delete(item.group, item.key)
294 c.size -= item.size
295 c.list.Remove(element)
296 item.promotions = -2
297 dropped += 1
298 }
299 element = prev
300 }
301 return dropped
302 }
303
View as plain text