1
16
17 package main
18
19 import (
20 "flag"
21 "fmt"
22 "time"
23
24 "k8s.io/klog/v2"
25
26 v1 "k8s.io/api/core/v1"
27 meta_v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
28 "k8s.io/apimachinery/pkg/fields"
29 "k8s.io/apimachinery/pkg/util/runtime"
30 "k8s.io/apimachinery/pkg/util/wait"
31 "k8s.io/client-go/kubernetes"
32 "k8s.io/client-go/tools/cache"
33 "k8s.io/client-go/tools/clientcmd"
34 "k8s.io/client-go/util/workqueue"
35 )
36
37
38 type Controller struct {
39 indexer cache.Indexer
40 queue workqueue.RateLimitingInterface
41 informer cache.Controller
42 }
43
44
45 func NewController(queue workqueue.RateLimitingInterface, indexer cache.Indexer, informer cache.Controller) *Controller {
46 return &Controller{
47 informer: informer,
48 indexer: indexer,
49 queue: queue,
50 }
51 }
52
53 func (c *Controller) processNextItem() bool {
54
55 key, quit := c.queue.Get()
56 if quit {
57 return false
58 }
59
60
61
62 defer c.queue.Done(key)
63
64
65 err := c.syncToStdout(key.(string))
66
67 c.handleErr(err, key)
68 return true
69 }
70
71
72
73
74 func (c *Controller) syncToStdout(key string) error {
75 obj, exists, err := c.indexer.GetByKey(key)
76 if err != nil {
77 klog.Errorf("Fetching object with key %s from store failed with %v", key, err)
78 return err
79 }
80
81 if !exists {
82
83 fmt.Printf("Pod %s does not exist anymore\n", key)
84 } else {
85
86
87 fmt.Printf("Sync/Add/Update for Pod %s\n", obj.(*v1.Pod).GetName())
88 }
89 return nil
90 }
91
92
93 func (c *Controller) handleErr(err error, key interface{}) {
94 if err == nil {
95
96
97
98 c.queue.Forget(key)
99 return
100 }
101
102
103 if c.queue.NumRequeues(key) < 5 {
104 klog.Infof("Error syncing pod %v: %v", key, err)
105
106
107
108 c.queue.AddRateLimited(key)
109 return
110 }
111
112 c.queue.Forget(key)
113
114 runtime.HandleError(err)
115 klog.Infof("Dropping pod %q out of the queue: %v", key, err)
116 }
117
118
119 func (c *Controller) Run(workers int, stopCh chan struct{}) {
120 defer runtime.HandleCrash()
121
122
123 defer c.queue.ShutDown()
124 klog.Info("Starting Pod controller")
125
126 go c.informer.Run(stopCh)
127
128
129 if !cache.WaitForCacheSync(stopCh, c.informer.HasSynced) {
130 runtime.HandleError(fmt.Errorf("Timed out waiting for caches to sync"))
131 return
132 }
133
134 for i := 0; i < workers; i++ {
135 go wait.Until(c.runWorker, time.Second, stopCh)
136 }
137
138 <-stopCh
139 klog.Info("Stopping Pod controller")
140 }
141
142 func (c *Controller) runWorker() {
143 for c.processNextItem() {
144 }
145 }
146
147 func main() {
148 var kubeconfig string
149 var master string
150
151 flag.StringVar(&kubeconfig, "kubeconfig", "", "absolute path to the kubeconfig file")
152 flag.StringVar(&master, "master", "", "master url")
153 flag.Parse()
154
155
156 config, err := clientcmd.BuildConfigFromFlags(master, kubeconfig)
157 if err != nil {
158 klog.Fatal(err)
159 }
160
161
162 clientset, err := kubernetes.NewForConfig(config)
163 if err != nil {
164 klog.Fatal(err)
165 }
166
167
168 podListWatcher := cache.NewListWatchFromClient(clientset.CoreV1().RESTClient(), "pods", v1.NamespaceDefault, fields.Everything())
169
170
171 queue := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())
172
173
174
175
176
177 indexer, informer := cache.NewIndexerInformer(podListWatcher, &v1.Pod{}, 0, cache.ResourceEventHandlerFuncs{
178 AddFunc: func(obj interface{}) {
179 key, err := cache.MetaNamespaceKeyFunc(obj)
180 if err == nil {
181 queue.Add(key)
182 }
183 },
184 UpdateFunc: func(old interface{}, new interface{}) {
185 key, err := cache.MetaNamespaceKeyFunc(new)
186 if err == nil {
187 queue.Add(key)
188 }
189 },
190 DeleteFunc: func(obj interface{}) {
191
192
193 key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
194 if err == nil {
195 queue.Add(key)
196 }
197 },
198 }, cache.Indexers{})
199
200 controller := NewController(queue, indexer, informer)
201
202
203
204
205
206 indexer.Add(&v1.Pod{
207 ObjectMeta: meta_v1.ObjectMeta{
208 Name: "mypod",
209 Namespace: v1.NamespaceDefault,
210 },
211 })
212
213
214 stop := make(chan struct{})
215 defer close(stop)
216 go controller.Run(1, stop)
217
218
219 select {}
220 }
221
View as plain text