1
16
17 package tainteviction
18
19 import (
20 "context"
21 "sync"
22 "time"
23
24 "k8s.io/apimachinery/pkg/types"
25 "k8s.io/klog/v2"
26 "k8s.io/utils/clock"
27 )
28
29
30 type WorkArgs struct {
31 NamespacedName types.NamespacedName
32 }
33
34
35 func (w *WorkArgs) KeyFromWorkArgs() string {
36 return w.NamespacedName.String()
37 }
38
39
40 func NewWorkArgs(name, namespace string) *WorkArgs {
41 return &WorkArgs{
42 NamespacedName: types.NamespacedName{Namespace: namespace, Name: name},
43 }
44 }
45
46
47 type TimedWorker struct {
48 WorkItem *WorkArgs
49 CreatedAt time.Time
50 FireAt time.Time
51 Timer clock.Timer
52 }
53
54
55 func createWorker(ctx context.Context, args *WorkArgs, createdAt time.Time, fireAt time.Time, f func(ctx context.Context, fireAt time.Time, args *WorkArgs) error, clock clock.WithDelayedExecution) *TimedWorker {
56 delay := fireAt.Sub(createdAt)
57 logger := klog.FromContext(ctx)
58 fWithErrorLogging := func() {
59 err := f(ctx, fireAt, args)
60 if err != nil {
61 logger.Error(err, "TaintEvictionController: timed worker failed")
62 }
63 }
64 if delay <= 0 {
65 go fWithErrorLogging()
66 return nil
67 }
68 timer := clock.AfterFunc(delay, fWithErrorLogging)
69 return &TimedWorker{
70 WorkItem: args,
71 CreatedAt: createdAt,
72 FireAt: fireAt,
73 Timer: timer,
74 }
75 }
76
77
78 func (w *TimedWorker) Cancel() {
79 if w != nil {
80 w.Timer.Stop()
81 }
82 }
83
84
85 type TimedWorkerQueue struct {
86 sync.Mutex
87
88 workers map[string]*TimedWorker
89 workFunc func(ctx context.Context, fireAt time.Time, args *WorkArgs) error
90 clock clock.WithDelayedExecution
91 }
92
93
94
95 func CreateWorkerQueue(f func(ctx context.Context, fireAt time.Time, args *WorkArgs) error) *TimedWorkerQueue {
96 return &TimedWorkerQueue{
97 workers: make(map[string]*TimedWorker),
98 workFunc: f,
99 clock: clock.RealClock{},
100 }
101 }
102
103 func (q *TimedWorkerQueue) getWrappedWorkerFunc(key string) func(ctx context.Context, fireAt time.Time, args *WorkArgs) error {
104 return func(ctx context.Context, fireAt time.Time, args *WorkArgs) error {
105 err := q.workFunc(ctx, fireAt, args)
106 q.Lock()
107 defer q.Unlock()
108 if err == nil {
109
110
111 q.workers[key] = nil
112 } else {
113 delete(q.workers, key)
114 }
115 return err
116 }
117 }
118
119
120 func (q *TimedWorkerQueue) AddWork(ctx context.Context, args *WorkArgs, createdAt time.Time, fireAt time.Time) {
121 key := args.KeyFromWorkArgs()
122 logger := klog.FromContext(ctx)
123 logger.V(4).Info("Adding TimedWorkerQueue item and to be fired at firedTime", "item", key, "createTime", createdAt, "firedTime", fireAt)
124
125 q.Lock()
126 defer q.Unlock()
127 if _, exists := q.workers[key]; exists {
128 logger.Info("Trying to add already existing work, skipping", "args", args)
129 return
130 }
131 worker := createWorker(ctx, args, createdAt, fireAt, q.getWrappedWorkerFunc(key), q.clock)
132 q.workers[key] = worker
133 }
134
135
136 func (q *TimedWorkerQueue) CancelWork(logger klog.Logger, key string) bool {
137 q.Lock()
138 defer q.Unlock()
139 worker, found := q.workers[key]
140 result := false
141 if found {
142 logger.V(4).Info("Cancelling TimedWorkerQueue item", "item", key, "time", time.Now())
143 if worker != nil {
144 result = true
145 worker.Cancel()
146 }
147 delete(q.workers, key)
148 }
149 return result
150 }
151
152
153
154 func (q *TimedWorkerQueue) GetWorkerUnsafe(key string) *TimedWorker {
155 q.Lock()
156 defer q.Unlock()
157 return q.workers[key]
158 }
159
View as plain text