...
1
16
17 package controller
18
19 import (
20 "context"
21 "fmt"
22 "time"
23
24 "github.com/go-logr/logr"
25 "k8s.io/client-go/util/workqueue"
26 "k8s.io/klog/v2"
27
28 "sigs.k8s.io/controller-runtime/pkg/internal/controller"
29 "sigs.k8s.io/controller-runtime/pkg/manager"
30 "sigs.k8s.io/controller-runtime/pkg/ratelimiter"
31 "sigs.k8s.io/controller-runtime/pkg/reconcile"
32 "sigs.k8s.io/controller-runtime/pkg/source"
33 )
34
35
// Options carries the settings New and NewUnmanaged use to build a
// Controller. Reconciler is mandatory; every other field may be left at its
// zero value, in which case NewUnmanaged fills in a default.
36 type Options struct {
37
// MaxConcurrentReconciles is handed to the internal controller as its
// concurrency limit. Values <= 0 count as unset: NewUnmanaged then uses the
// manager's controller option when that is positive, otherwise 1.
38 MaxConcurrentReconciles int
39
40
41
// CacheSyncTimeout is handed to the internal controller unchanged. Zero
// counts as unset: NewUnmanaged then uses the manager's controller option
// when that is non-zero, otherwise two minutes.
// NOTE(review): presumably the time limit for waiting on cache sync — the
// enforcement is in the internal controller, confirm there.
42 CacheSyncTimeout time.Duration
43
44
45
// RecoverPanic is handed to the internal controller unchanged. When nil,
// NewUnmanaged copies the manager's controller option of the same name.
// NOTE(review): presumably controls whether a panic raised while reconciling
// is recovered — confirm against the internal controller.
46 RecoverPanic *bool
47
48
49
// NeedLeaderElection becomes the internal controller's LeaderElected field.
// When nil, NewUnmanaged copies the manager's controller option of the same
// name.
50 NeedLeaderElection *bool
51
52
// Reconciler is required: NewUnmanaged returns an error when it is nil. It
// becomes the internal controller's Do field.
53 Reconciler reconcile.Reconciler
54
55
56
57
// RateLimiter is handed to the internal controller. When nil, NewUnmanaged
// uses workqueue.DefaultControllerRateLimiter().
// NOTE(review): presumably this is the limiter later passed to NewQueue —
// that call is not in this file, confirm in the internal controller.
58 RateLimiter ratelimiter.RateLimiter
59
60
61
62
63
64
65
66
67
68
69
// NewQueue builds the controller's work queue from a controller name and a
// rate limiter. When nil, NewUnmanaged installs a constructor that calls
// workqueue.NewRateLimitingQueueWithConfig with the queue named after the
// controller name it receives.
70 NewQueue func(controllerName string, rateLimiter ratelimiter.RateLimiter) workqueue.RateLimitingInterface
71
72
73
// LogConstructor returns the logger to use for a reconcile request; the
// request may be nil. When this field is nil, NewUnmanaged installs a
// constructor based on mgr.GetLogger() that adds the key "controller" and,
// for a non-nil request, the keys "object", "namespace" and "name".
74 LogConstructor func(request *reconcile.Request) logr.Logger
75 }
76
77
78
79
80
// Controller is the interface returned by New and NewUnmanaged. In this file
// the value behind it is always a *controller.Controller from the internal
// controller package.
81 type Controller interface {
82
// Reconciler is embedded, so every Controller also exposes the
// reconcile.Reconciler method set.
83 reconcile.Reconciler
84
85
// Watch hands src to the controller and reports any failure as an error.
// NOTE(review): how and when the source is started is decided by the
// internal controller — confirm there.
86 Watch(src source.Source) error
87
88
89
// Start runs the controller using ctx and reports any failure as an error.
// NOTE(review): blocking behaviour and shutdown on ctx cancellation are
// implemented in the internal controller — confirm there.
90 Start(ctx context.Context) error
91
92
// GetLogger returns the logr.Logger associated with this controller.
93 GetLogger() logr.Logger
94 }
95
96
97
98 func New(name string, mgr manager.Manager, options Options) (Controller, error) {
99 c, err := NewUnmanaged(name, mgr, options)
100 if err != nil {
101 return nil, err
102 }
103
104
105 return c, mgr.Add(c)
106 }
107
108
109
110 func NewUnmanaged(name string, mgr manager.Manager, options Options) (Controller, error) {
111 if options.Reconciler == nil {
112 return nil, fmt.Errorf("must specify Reconciler")
113 }
114
115 if len(name) == 0 {
116 return nil, fmt.Errorf("must specify Name for Controller")
117 }
118
119 if options.LogConstructor == nil {
120 log := mgr.GetLogger().WithValues(
121 "controller", name,
122 )
123 options.LogConstructor = func(req *reconcile.Request) logr.Logger {
124 log := log
125 if req != nil {
126 log = log.WithValues(
127 "object", klog.KRef(req.Namespace, req.Name),
128 "namespace", req.Namespace, "name", req.Name,
129 )
130 }
131 return log
132 }
133 }
134
135 if options.MaxConcurrentReconciles <= 0 {
136 if mgr.GetControllerOptions().MaxConcurrentReconciles > 0 {
137 options.MaxConcurrentReconciles = mgr.GetControllerOptions().MaxConcurrentReconciles
138 } else {
139 options.MaxConcurrentReconciles = 1
140 }
141 }
142
143 if options.CacheSyncTimeout == 0 {
144 if mgr.GetControllerOptions().CacheSyncTimeout != 0 {
145 options.CacheSyncTimeout = mgr.GetControllerOptions().CacheSyncTimeout
146 } else {
147 options.CacheSyncTimeout = 2 * time.Minute
148 }
149 }
150
151 if options.RateLimiter == nil {
152 options.RateLimiter = workqueue.DefaultControllerRateLimiter()
153 }
154
155 if options.NewQueue == nil {
156 options.NewQueue = func(controllerName string, rateLimiter ratelimiter.RateLimiter) workqueue.RateLimitingInterface {
157 return workqueue.NewRateLimitingQueueWithConfig(rateLimiter, workqueue.RateLimitingQueueConfig{
158 Name: controllerName,
159 })
160 }
161 }
162
163 if options.RecoverPanic == nil {
164 options.RecoverPanic = mgr.GetControllerOptions().RecoverPanic
165 }
166
167 if options.NeedLeaderElection == nil {
168 options.NeedLeaderElection = mgr.GetControllerOptions().NeedLeaderElection
169 }
170
171
172 return &controller.Controller{
173 Do: options.Reconciler,
174 RateLimiter: options.RateLimiter,
175 NewQueue: options.NewQueue,
176 MaxConcurrentReconciles: options.MaxConcurrentReconciles,
177 CacheSyncTimeout: options.CacheSyncTimeout,
178 Name: name,
179 LogConstructor: options.LogConstructor,
180 RecoverPanic: options.RecoverPanic,
181 LeaderElected: options.NeedLeaderElection,
182 }, nil
183 }
184
185
// ReconcileIDFromContext re-exports controller.ReconcileIDFromContext from
// the internal controller package so it can be reached through this package.
// NOTE(review): presumably retrieves the reconcile ID carried by a context —
// confirm against the internal package.
186 var ReconcileIDFromContext = controller.ReconcileIDFromContext
187
View as plain text