...
1
16
17 package config
18
19 import (
20 "time"
21
22 v1 "k8s.io/api/core/v1"
23 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
24 "k8s.io/apimachinery/pkg/fields"
25 "k8s.io/apimachinery/pkg/types"
26 "k8s.io/apimachinery/pkg/util/wait"
27 clientset "k8s.io/client-go/kubernetes"
28 "k8s.io/client-go/tools/cache"
29 "k8s.io/klog/v2"
30 kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
31 )
32
33
// WaitForAPIServerSyncPeriod is how long NewSourceApiserver sleeps between
// successive polls of the node-sync check before it starts watching pods.
const WaitForAPIServerSyncPeriod = time.Second
35
36
37 func NewSourceApiserver(c clientset.Interface, nodeName types.NodeName, nodeHasSynced func() bool, updates chan<- interface{}) {
38 lw := cache.NewListWatchFromClient(c.CoreV1().RESTClient(), "pods", metav1.NamespaceAll, fields.OneTermEqualSelector("spec.nodeName", string(nodeName)))
39
40
41
42 klog.InfoS("Waiting for node sync before watching apiserver pods")
43 go func() {
44 for {
45 if nodeHasSynced() {
46 klog.V(4).InfoS("node sync completed")
47 break
48 }
49 time.Sleep(WaitForAPIServerSyncPeriod)
50 klog.V(4).InfoS("node sync has not completed yet")
51 }
52 klog.InfoS("Watching apiserver")
53 newSourceApiserverFromLW(lw, updates)
54 }()
55 }
56
57
58 func newSourceApiserverFromLW(lw cache.ListerWatcher, updates chan<- interface{}) {
59 send := func(objs []interface{}) {
60 var pods []*v1.Pod
61 for _, o := range objs {
62 pods = append(pods, o.(*v1.Pod))
63 }
64 updates <- kubetypes.PodUpdate{Pods: pods, Op: kubetypes.SET, Source: kubetypes.ApiserverSource}
65 }
66 r := cache.NewReflector(lw, &v1.Pod{}, cache.NewUndeltaStore(send, cache.MetaNamespaceKeyFunc), 0)
67 go r.Run(wait.NeverStop)
68 }
69
View as plain text