1 package puddle
2
3 import (
4 "context"
5 "errors"
6 "sync"
7 "time"
8 )
9
10 const (
11 resourceStatusConstructing = 0
12 resourceStatusIdle = iota
13 resourceStatusAcquired = iota
14 resourceStatusHijacked = iota
15 )
16
17
18
19 var ErrClosedPool = errors.New("closed pool")
20
21
22
23 var ErrNotAvailable = errors.New("resource not available")
24
25
26 type Constructor func(ctx context.Context) (res interface{}, err error)
27
28
29 type Destructor func(res interface{})
30
31
32 type Resource struct {
33 value interface{}
34 pool *Pool
35 creationTime time.Time
36 lastUsedNano int64
37 status byte
38 }
39
40
41 func (res *Resource) Value() interface{} {
42 if !(res.status == resourceStatusAcquired || res.status == resourceStatusHijacked) {
43 panic("tried to access resource that is not acquired or hijacked")
44 }
45 return res.value
46 }
47
48
49 func (res *Resource) Release() {
50 if res.status != resourceStatusAcquired {
51 panic("tried to release resource that is not acquired")
52 }
53 res.pool.releaseAcquiredResource(res, nanotime())
54 }
55
56
57
58 func (res *Resource) ReleaseUnused() {
59 if res.status != resourceStatusAcquired {
60 panic("tried to release resource that is not acquired")
61 }
62 res.pool.releaseAcquiredResource(res, res.lastUsedNano)
63 }
64
65
66
67 func (res *Resource) Destroy() {
68 if res.status != resourceStatusAcquired {
69 panic("tried to destroy resource that is not acquired")
70 }
71 go res.pool.destroyAcquiredResource(res)
72 }
73
74
75
76 func (res *Resource) Hijack() {
77 if res.status != resourceStatusAcquired {
78 panic("tried to hijack resource that is not acquired")
79 }
80 res.pool.hijackAcquiredResource(res)
81 }
82
83
84 func (res *Resource) CreationTime() time.Time {
85 if !(res.status == resourceStatusAcquired || res.status == resourceStatusHijacked) {
86 panic("tried to access resource that is not acquired or hijacked")
87 }
88 return res.creationTime
89 }
90
91
92
93
94 func (res *Resource) LastUsedNanotime() int64 {
95 if !(res.status == resourceStatusAcquired || res.status == resourceStatusHijacked) {
96 panic("tried to access resource that is not acquired or hijacked")
97 }
98
99 return res.lastUsedNano
100 }
101
102
103
104 func (res *Resource) IdleDuration() time.Duration {
105 if !(res.status == resourceStatusAcquired || res.status == resourceStatusHijacked) {
106 panic("tried to access resource that is not acquired or hijacked")
107 }
108
109 return time.Duration(nanotime() - res.lastUsedNano)
110 }
111
112
113 type Pool struct {
114 cond *sync.Cond
115 destructWG *sync.WaitGroup
116
117 allResources []*Resource
118 idleResources []*Resource
119
120 constructor Constructor
121 destructor Destructor
122 maxSize int32
123
124 acquireCount int64
125 acquireDuration time.Duration
126 emptyAcquireCount int64
127 canceledAcquireCount int64
128
129 closed bool
130 }
131
132
133 func NewPool(constructor Constructor, destructor Destructor, maxSize int32) *Pool {
134 if maxSize < 1 {
135 panic("maxSize is less than 1")
136 }
137
138 return &Pool{
139 cond: sync.NewCond(new(sync.Mutex)),
140 destructWG: &sync.WaitGroup{},
141 maxSize: maxSize,
142 constructor: constructor,
143 destructor: destructor,
144 }
145 }
146
147
148
149 func (p *Pool) Close() {
150 p.cond.L.Lock()
151 if p.closed {
152 p.cond.L.Unlock()
153 return
154 }
155 p.closed = true
156
157 for _, res := range p.idleResources {
158 p.allResources = removeResource(p.allResources, res)
159 go p.destructResourceValue(res.value)
160 }
161 p.idleResources = nil
162 p.cond.L.Unlock()
163
164
165 p.cond.Broadcast()
166
167 p.destructWG.Wait()
168 }
169
170
171 type Stat struct {
172 constructingResources int32
173 acquiredResources int32
174 idleResources int32
175 maxResources int32
176 acquireCount int64
177 acquireDuration time.Duration
178 emptyAcquireCount int64
179 canceledAcquireCount int64
180 }
181
182
183
184
185 func (s *Stat) TotalResources() int32 {
186 return s.constructingResources + s.acquiredResources + s.idleResources
187 }
188
189
190
191 func (s *Stat) ConstructingResources() int32 {
192 return s.constructingResources
193 }
194
195
196 func (s *Stat) AcquiredResources() int32 {
197 return s.acquiredResources
198 }
199
200
201 func (s *Stat) IdleResources() int32 {
202 return s.idleResources
203 }
204
205
206 func (s *Stat) MaxResources() int32 {
207 return s.maxResources
208 }
209
210
211 func (s *Stat) AcquireCount() int64 {
212 return s.acquireCount
213 }
214
215
216
217 func (s *Stat) AcquireDuration() time.Duration {
218 return s.acquireDuration
219 }
220
221
222
223
224 func (s *Stat) EmptyAcquireCount() int64 {
225 return s.emptyAcquireCount
226 }
227
228
229
230 func (s *Stat) CanceledAcquireCount() int64 {
231 return s.canceledAcquireCount
232 }
233
234
235 func (p *Pool) Stat() *Stat {
236 p.cond.L.Lock()
237 s := &Stat{
238 maxResources: p.maxSize,
239 acquireCount: p.acquireCount,
240 emptyAcquireCount: p.emptyAcquireCount,
241 canceledAcquireCount: p.canceledAcquireCount,
242 acquireDuration: p.acquireDuration,
243 }
244
245 for _, res := range p.allResources {
246 switch res.status {
247 case resourceStatusConstructing:
248 s.constructingResources += 1
249 case resourceStatusIdle:
250 s.idleResources += 1
251 case resourceStatusAcquired:
252 s.acquiredResources += 1
253 }
254 }
255
256 p.cond.L.Unlock()
257 return s
258 }
259
260
261
262
263
264 func (p *Pool) Acquire(ctx context.Context) (*Resource, error) {
265 startNano := nanotime()
266 if doneChan := ctx.Done(); doneChan != nil {
267 select {
268 case <-ctx.Done():
269 p.cond.L.Lock()
270 p.canceledAcquireCount += 1
271 p.cond.L.Unlock()
272 return nil, ctx.Err()
273 default:
274 }
275 }
276
277 p.cond.L.Lock()
278
279 emptyAcquire := false
280
281 for {
282 if p.closed {
283 p.cond.L.Unlock()
284 return nil, ErrClosedPool
285 }
286
287
288 if len(p.idleResources) > 0 {
289 res := p.idleResources[len(p.idleResources)-1]
290 p.idleResources[len(p.idleResources)-1] = nil
291 p.idleResources = p.idleResources[:len(p.idleResources)-1]
292 res.status = resourceStatusAcquired
293 if emptyAcquire {
294 p.emptyAcquireCount += 1
295 }
296 p.acquireCount += 1
297 p.acquireDuration += time.Duration(nanotime() - startNano)
298 p.cond.L.Unlock()
299 return res, nil
300 }
301
302 emptyAcquire = true
303
304
305 if len(p.allResources) < int(p.maxSize) {
306 res := &Resource{pool: p, creationTime: time.Now(), lastUsedNano: nanotime(), status: resourceStatusConstructing}
307 p.allResources = append(p.allResources, res)
308 p.destructWG.Add(1)
309 p.cond.L.Unlock()
310
311
312
313
314
315 constructErrCh := make(chan error)
316 go func() {
317 value, err := p.constructResourceValue(ctx)
318 p.cond.L.Lock()
319 if err != nil {
320 p.allResources = removeResource(p.allResources, res)
321 p.destructWG.Done()
322
323
324
325 select {
326 case constructErrCh <- err:
327 case <-ctx.Done():
328 p.canceledAcquireCount += 1
329 }
330 p.cond.L.Unlock()
331 p.cond.Signal()
332 return
333 }
334 res.value = value
335
336
337 res.status = resourceStatusAcquired
338
339
340 select {
341 case constructErrCh <- nil:
342 p.emptyAcquireCount += 1
343 p.acquireCount += 1
344 p.acquireDuration += time.Duration(nanotime() - startNano)
345 p.cond.L.Unlock()
346
347 case <-ctx.Done():
348 p.canceledAcquireCount += 1
349 p.cond.L.Unlock()
350
351
352
353
354 p.releaseAcquiredResource(res, res.lastUsedNano)
355 }
356 }()
357
358 select {
359 case <-ctx.Done():
360 return nil, ctx.Err()
361 case err := <-constructErrCh:
362 if err != nil {
363 return nil, err
364 }
365
366
367 return res, nil
368 }
369 }
370
371 if ctx.Done() == nil {
372 p.cond.Wait()
373 } else {
374
375 waitChan := make(chan struct{}, 1)
376 go func() {
377 p.cond.Wait()
378 waitChan <- struct{}{}
379 }()
380
381 select {
382 case <-ctx.Done():
383
384
385 go func() {
386 <-waitChan
387 p.cond.L.Unlock()
388 p.cond.Signal()
389 }()
390
391 p.cond.L.Lock()
392 p.canceledAcquireCount += 1
393 p.cond.L.Unlock()
394 return nil, ctx.Err()
395 case <-waitChan:
396 }
397 }
398 }
399 }
400
401
402
403
404 func (p *Pool) TryAcquire(ctx context.Context) (*Resource, error) {
405 p.cond.L.Lock()
406 defer p.cond.L.Unlock()
407
408 if p.closed {
409 return nil, ErrClosedPool
410 }
411
412
413 if len(p.idleResources) > 0 {
414 res := p.idleResources[len(p.idleResources)-1]
415 p.idleResources[len(p.idleResources)-1] = nil
416 p.idleResources = p.idleResources[:len(p.idleResources)-1]
417 p.acquireCount += 1
418 res.status = resourceStatusAcquired
419 return res, nil
420 }
421
422 if len(p.allResources) < int(p.maxSize) {
423 res := &Resource{pool: p, creationTime: time.Now(), lastUsedNano: nanotime(), status: resourceStatusConstructing}
424 p.allResources = append(p.allResources, res)
425 p.destructWG.Add(1)
426
427 go func() {
428 value, err := p.constructResourceValue(ctx)
429 defer p.cond.Signal()
430 p.cond.L.Lock()
431 defer p.cond.L.Unlock()
432
433 if err != nil {
434 p.allResources = removeResource(p.allResources, res)
435 p.destructWG.Done()
436 return
437 }
438
439 res.value = value
440 res.status = resourceStatusIdle
441 p.idleResources = append(p.idleResources, res)
442 }()
443 }
444
445 return nil, ErrNotAvailable
446 }
447
448
449
450
451 func (p *Pool) AcquireAllIdle() []*Resource {
452 p.cond.L.Lock()
453 if p.closed {
454 p.cond.L.Unlock()
455 return nil
456 }
457
458 for _, res := range p.idleResources {
459 res.status = resourceStatusAcquired
460 }
461 resources := p.idleResources
462 p.idleResources = nil
463
464 p.cond.L.Unlock()
465 return resources
466 }
467
468
469
470
471 func (p *Pool) CreateResource(ctx context.Context) error {
472 p.cond.L.Lock()
473 if p.closed {
474 p.cond.L.Unlock()
475 return ErrClosedPool
476 }
477 p.cond.L.Unlock()
478
479 value, err := p.constructResourceValue(ctx)
480 if err != nil {
481 return err
482 }
483
484 res := &Resource{
485 pool: p,
486 creationTime: time.Now(),
487 status: resourceStatusIdle,
488 value: value,
489 lastUsedNano: nanotime(),
490 }
491 p.destructWG.Add(1)
492
493 p.cond.L.Lock()
494
495 if p.closed {
496 go p.destructResourceValue(res.value)
497 p.cond.L.Unlock()
498 return ErrClosedPool
499 }
500 p.allResources = append(p.allResources, res)
501 p.idleResources = append(p.idleResources, res)
502 p.cond.L.Unlock()
503
504 return nil
505 }
506
507
508 func (p *Pool) releaseAcquiredResource(res *Resource, lastUsedNano int64) {
509 p.cond.L.Lock()
510
511 if !p.closed {
512 res.lastUsedNano = lastUsedNano
513 res.status = resourceStatusIdle
514 p.idleResources = append(p.idleResources, res)
515 } else {
516 p.allResources = removeResource(p.allResources, res)
517 go p.destructResourceValue(res.value)
518 }
519
520 p.cond.L.Unlock()
521 p.cond.Signal()
522 }
523
524
525
526 func (p *Pool) destroyAcquiredResource(res *Resource) {
527 p.destructResourceValue(res.value)
528 p.cond.L.Lock()
529 p.allResources = removeResource(p.allResources, res)
530 p.cond.L.Unlock()
531 p.cond.Signal()
532 }
533
534 func (p *Pool) hijackAcquiredResource(res *Resource) {
535 p.cond.L.Lock()
536
537 p.allResources = removeResource(p.allResources, res)
538 res.status = resourceStatusHijacked
539 p.destructWG.Done()
540
541 p.cond.L.Unlock()
542 p.cond.Signal()
543 }
544
545 func removeResource(slice []*Resource, res *Resource) []*Resource {
546 for i := range slice {
547 if slice[i] == res {
548 slice[i] = slice[len(slice)-1]
549 slice[len(slice)-1] = nil
550 return slice[:len(slice)-1]
551 }
552 }
553
554 panic("BUG: removeResource could not find res in slice")
555 }
556
557 func (p *Pool) constructResourceValue(ctx context.Context) (interface{}, error) {
558 return p.constructor(ctx)
559 }
560
561 func (p *Pool) destructResourceValue(value interface{}) {
562 p.destructor(value)
563 p.destructWG.Done()
564 }
565
View as plain text