...
1
16
17 package queue
18
19 import (
20 "sync"
21 "time"
22
23 "k8s.io/apimachinery/pkg/types"
24 "k8s.io/utils/clock"
25 )
26
27
28
29 type WorkQueue interface {
30
31 GetWork() []types.UID
32
33 Enqueue(item types.UID, delay time.Duration)
34 }
35
36 type basicWorkQueue struct {
37 clock clock.Clock
38 lock sync.Mutex
39 queue map[types.UID]time.Time
40 }
41
42 var _ WorkQueue = &basicWorkQueue{}
43
44
45 func NewBasicWorkQueue(clock clock.Clock) WorkQueue {
46 queue := make(map[types.UID]time.Time)
47 return &basicWorkQueue{queue: queue, clock: clock}
48 }
49
50 func (q *basicWorkQueue) GetWork() []types.UID {
51 q.lock.Lock()
52 defer q.lock.Unlock()
53 now := q.clock.Now()
54 var items []types.UID
55 for k, v := range q.queue {
56 if v.Before(now) {
57 items = append(items, k)
58 delete(q.queue, k)
59 }
60 }
61 return items
62 }
63
64 func (q *basicWorkQueue) Enqueue(item types.UID, delay time.Duration) {
65 q.lock.Lock()
66 defer q.lock.Unlock()
67 q.queue[item] = q.clock.Now().Add(delay)
68 }
69
View as plain text