1
16
17 package cache
18
19 import (
20 "context"
21 "os"
22 "sort"
23 "strconv"
24 "time"
25
26 "github.com/google/go-cmp/cmp"
27
28 "k8s.io/apimachinery/pkg/api/meta"
29 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
30 "k8s.io/apimachinery/pkg/runtime"
31 "k8s.io/apimachinery/pkg/util/wait"
32 "k8s.io/klog/v2"
33 )
34
// dataConsistencyDetectionEnabled gates the watch-list consistency check in
// this file. It is assigned once, in init, from the process environment.
var dataConsistencyDetectionEnabled bool

// init reads KUBE_WATCHLIST_INCONSISTENCY_DETECTOR and turns the detector on
// only when the value parses as a true boolean.
func init() {
	raw := os.Getenv("KUBE_WATCHLIST_INCONSISTENCY_DETECTOR")
	// The parse error is discarded: strconv.ParseBool yields false for an
	// empty or malformed value, which leaves the detector disabled.
	enabled, _ := strconv.ParseBool(raw)
	dataConsistencyDetectionEnabled = enabled
}
40
41
42
43
44
45
46
47
48
49
50 func checkWatchListConsistencyIfRequested(stopCh <-chan struct{}, identity string, lastSyncedResourceVersion string, listerWatcher Lister, store Store) {
51 if !dataConsistencyDetectionEnabled {
52 return
53 }
54 checkWatchListConsistency(stopCh, identity, lastSyncedResourceVersion, listerWatcher, store)
55 }
56
57
58
59
60
61
62 func checkWatchListConsistency(stopCh <-chan struct{}, identity string, lastSyncedResourceVersion string, listerWatcher Lister, store Store) {
63 klog.Warningf("%s: data consistency check for the watch-list feature is enabled, this will result in an additional call to the API server.", identity)
64 opts := metav1.ListOptions{
65 ResourceVersion: lastSyncedResourceVersion,
66 ResourceVersionMatch: metav1.ResourceVersionMatchExact,
67 }
68 var list runtime.Object
69 err := wait.PollUntilContextCancel(wait.ContextForChannel(stopCh), time.Second, true, func(_ context.Context) (done bool, err error) {
70 list, err = listerWatcher.List(opts)
71 if err != nil {
72
73
74
75 klog.Errorf("failed to list data from the server, retrying until stopCh is closed, err: %v", err)
76 return false, nil
77 }
78 return true, nil
79 })
80 if err != nil {
81 klog.Errorf("failed to list data from the server, the watch-list consistency check won't be performed, stopCh was closed, err: %v", err)
82 return
83 }
84
85 rawListItems, err := meta.ExtractListWithAlloc(list)
86 if err != nil {
87 panic(err)
88 }
89
90 listItems := toMetaObjectSliceOrDie(rawListItems)
91 storeItems := toMetaObjectSliceOrDie(store.List())
92
93 sort.Sort(byUID(listItems))
94 sort.Sort(byUID(storeItems))
95
96 if !cmp.Equal(listItems, storeItems) {
97 klog.Infof("%s: data received by the new watch-list api call is different than received by the standard list api call, diff: %v", identity, cmp.Diff(listItems, storeItems))
98 msg := "data inconsistency detected for the watch-list feature, panicking!"
99 panic(msg)
100 }
101 }
102
103 type byUID []metav1.Object
104
105 func (a byUID) Len() int { return len(a) }
106 func (a byUID) Less(i, j int) bool { return a[i].GetUID() < a[j].GetUID() }
107 func (a byUID) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
108
109 func toMetaObjectSliceOrDie[T any](s []T) []metav1.Object {
110 result := make([]metav1.Object, len(s))
111 for i, v := range s {
112 m, err := meta.Accessor(v)
113 if err != nil {
114 panic(err)
115 }
116 result[i] = m
117 }
118 return result
119 }
120
View as plain text