1
16
17 package sync
18
19 import (
20 "context"
21 "fmt"
22 "net"
23 "reflect"
24 "testing"
25 "time"
26
27 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
28 "k8s.io/klog/v2"
29 "k8s.io/klog/v2/ktesting"
30 "k8s.io/kubernetes/pkg/controller/nodeipam/ipam/cidrset"
31 "k8s.io/kubernetes/pkg/controller/nodeipam/ipam/test"
32 netutils "k8s.io/utils/net"
33
34 v1 "k8s.io/api/core/v1"
35 )
36
37 var (
38 _, clusterCIDRRange, _ = netutils.ParseCIDRSloppy("10.1.0.0/16")
39 )
40
41 type fakeEvent struct {
42 nodeName string
43 reason string
44 }
45
46 type fakeAPIs struct {
47 aliasRange *net.IPNet
48 aliasErr error
49 addAliasErr error
50 nodeRet *v1.Node
51 nodeErr error
52 updateNodeErr error
53 resyncTimeout time.Duration
54 reportChan chan struct{}
55
56 updateNodeNetworkUnavailableErr error
57
58 calls []string
59 events []fakeEvent
60 results []error
61
62 logger klog.Logger
63 }
64
65 func (f *fakeAPIs) Alias(ctx context.Context, node *v1.Node) (*net.IPNet, error) {
66 f.calls = append(f.calls, fmt.Sprintf("alias %v", node.Name))
67 return f.aliasRange, f.aliasErr
68 }
69
70 func (f *fakeAPIs) AddAlias(ctx context.Context, node *v1.Node, cidrRange *net.IPNet) error {
71 f.calls = append(f.calls, fmt.Sprintf("addAlias %v %v", node.Name, cidrRange))
72 return f.addAliasErr
73 }
74
75 func (f *fakeAPIs) Node(ctx context.Context, name string) (*v1.Node, error) {
76 f.calls = append(f.calls, fmt.Sprintf("node %v", name))
77 return f.nodeRet, f.nodeErr
78 }
79
80 func (f *fakeAPIs) UpdateNodePodCIDR(ctx context.Context, node *v1.Node, cidrRange *net.IPNet) error {
81 f.calls = append(f.calls, fmt.Sprintf("updateNode %v", node))
82 return f.updateNodeErr
83 }
84
85 func (f *fakeAPIs) UpdateNodeNetworkUnavailable(nodeName string, unavailable bool) error {
86 f.calls = append(f.calls, fmt.Sprintf("updateNodeNetworkUnavailable %v %v", nodeName, unavailable))
87 return f.updateNodeNetworkUnavailableErr
88 }
89
90 func (f *fakeAPIs) EmitNodeWarningEvent(nodeName, reason, fmtStr string, args ...interface{}) {
91 f.events = append(f.events, fakeEvent{nodeName, reason})
92 }
93
94 func (f *fakeAPIs) ReportResult(err error) {
95 f.logger.V(2).Info("ReportResult", "err", err)
96 f.results = append(f.results, err)
97 if f.reportChan != nil {
98 f.reportChan <- struct{}{}
99 }
100 }
101
102 func (f *fakeAPIs) ResyncTimeout() time.Duration {
103 if f.resyncTimeout == 0 {
104 return time.Second * 10000
105 }
106 return f.resyncTimeout
107 }
108
109 func (f *fakeAPIs) dumpTrace() {
110 for i, x := range f.calls {
111 f.logger.Info("trace", "index", i, "call", x)
112 }
113 }
114
115 var nodeWithoutCIDRRange = &v1.Node{
116 ObjectMeta: metav1.ObjectMeta{Name: "node1"},
117 }
118
119 var nodeWithCIDRRange = &v1.Node{
120 ObjectMeta: metav1.ObjectMeta{Name: "node1"},
121 Spec: v1.NodeSpec{PodCIDR: "10.1.1.0/24"},
122 }
123
124 func TestNodeSyncUpdate(t *testing.T) {
125 t.Parallel()
126
127 for _, tc := range []struct {
128 desc string
129 mode NodeSyncMode
130 node *v1.Node
131 fake fakeAPIs
132
133 events []fakeEvent
134 wantError bool
135 }{
136 {
137 desc: "validate range ==",
138 mode: SyncFromCloud,
139 node: nodeWithCIDRRange,
140 fake: fakeAPIs{
141 aliasRange: test.MustParseCIDR(nodeWithCIDRRange.Spec.PodCIDR),
142 },
143 },
144 {
145 desc: "validate range !=",
146 mode: SyncFromCloud,
147 node: nodeWithCIDRRange,
148 fake: fakeAPIs{aliasRange: test.MustParseCIDR("192.168.0.0/24")},
149 events: []fakeEvent{{"node1", "CloudCIDRAllocatorMismatch"}},
150 },
151 {
152 desc: "update alias from node",
153 mode: SyncFromCloud,
154 node: nodeWithCIDRRange,
155 events: []fakeEvent{{"node1", "CloudCIDRAllocatorInvalidMode"}},
156 wantError: true,
157 },
158 {
159 desc: "update alias from node",
160 mode: SyncFromCluster,
161 node: nodeWithCIDRRange,
162
163 },
164 {
165 desc: "update node from alias",
166 mode: SyncFromCloud,
167 node: nodeWithoutCIDRRange,
168 fake: fakeAPIs{aliasRange: test.MustParseCIDR("10.1.2.3/16")},
169
170 },
171 {
172 desc: "update node from alias",
173 mode: SyncFromCluster,
174 node: nodeWithoutCIDRRange,
175 fake: fakeAPIs{aliasRange: test.MustParseCIDR("10.1.2.3/16")},
176 events: []fakeEvent{{"node1", "CloudCIDRAllocatorInvalidMode"}},
177 wantError: true,
178 },
179 {
180 desc: "allocate range",
181 mode: SyncFromCloud,
182 node: nodeWithoutCIDRRange,
183 events: []fakeEvent{{"node1", "CloudCIDRAllocatorInvalidMode"}},
184 wantError: true,
185 },
186 {
187 desc: "allocate range",
188 mode: SyncFromCluster,
189 node: nodeWithoutCIDRRange,
190 },
191 {
192 desc: "update with node==nil",
193 mode: SyncFromCluster,
194 node: nil,
195 fake: fakeAPIs{
196 nodeRet: nodeWithCIDRRange,
197 },
198 wantError: false,
199 },
200 } {
201 logger, _ := ktesting.NewTestContext(t)
202 cidr, _ := cidrset.NewCIDRSet(clusterCIDRRange, 24)
203 tc.fake.logger = logger
204 sync := New(&tc.fake, &tc.fake, &tc.fake, tc.mode, "node1", cidr)
205 doneChan := make(chan struct{})
206
207
208 go sync.Loop(logger, doneChan)
209 sync.Update(tc.node)
210 close(sync.opChan)
211 <-doneChan
212 tc.fake.dumpTrace()
213
214 if !reflect.DeepEqual(tc.fake.events, tc.events) {
215 t.Errorf("%v, %v; fake.events = %#v, want %#v", tc.desc, tc.mode, tc.fake.events, tc.events)
216 }
217
218 var hasError bool
219 for _, r := range tc.fake.results {
220 hasError = hasError || (r != nil)
221 }
222 if hasError != tc.wantError {
223 t.Errorf("%v, %v; hasError = %t, errors = %v, want %t",
224 tc.desc, tc.mode, hasError, tc.fake.events, tc.wantError)
225 }
226 }
227 }
228
229 func TestNodeSyncResync(t *testing.T) {
230 logger, _ := ktesting.NewTestContext(t)
231 fake := &fakeAPIs{
232 nodeRet: nodeWithCIDRRange,
233 resyncTimeout: time.Millisecond,
234 reportChan: make(chan struct{}),
235 logger: logger,
236 }
237 cidr, _ := cidrset.NewCIDRSet(clusterCIDRRange, 24)
238 sync := New(fake, fake, fake, SyncFromCluster, "node1", cidr)
239 doneChan := make(chan struct{})
240 go sync.Loop(logger, doneChan)
241 <-fake.reportChan
242 close(sync.opChan)
243
244 go func() {
245 <-fake.reportChan
246 }()
247 <-doneChan
248 fake.dumpTrace()
249 }
250
251 func TestNodeSyncDelete(t *testing.T) {
252 t.Parallel()
253
254 for _, tc := range []struct {
255 desc string
256 mode NodeSyncMode
257 node *v1.Node
258 fake fakeAPIs
259 }{
260 {
261 desc: "delete",
262 mode: SyncFromCluster,
263 node: nodeWithCIDRRange,
264 },
265 {
266 desc: "delete without CIDR range",
267 mode: SyncFromCluster,
268 node: nodeWithoutCIDRRange,
269 },
270 {
271 desc: "delete with invalid CIDR range",
272 mode: SyncFromCluster,
273 node: &v1.Node{
274 ObjectMeta: metav1.ObjectMeta{Name: "node1"},
275 Spec: v1.NodeSpec{PodCIDR: "invalid"},
276 },
277 },
278 } {
279 logger, _ := ktesting.NewTestContext(t)
280 cidr, _ := cidrset.NewCIDRSet(clusterCIDRRange, 24)
281 tc.fake.logger = logger
282 sync := New(&tc.fake, &tc.fake, &tc.fake, tc.mode, "node1", cidr)
283 doneChan := make(chan struct{})
284
285
286 go sync.Loop(logger, doneChan)
287 sync.Delete(tc.node)
288 <-doneChan
289 tc.fake.dumpTrace()
290
291
305 }
306 }
307
View as plain text