1 package services
2
3 import (
4 "context"
5 "database/sql"
6 "fmt"
7 "net"
8 "strings"
9
10 "github.com/hashicorp/go-multierror"
11
12 netutils "k8s.io/utils/net"
13
14 sqlerr "edge-infra.dev/pkg/edge/api/apierror/sql"
15 "edge-infra.dev/pkg/edge/api/graph/model"
16 sqlquery "edge-infra.dev/pkg/edge/api/sql"
17 "edge-infra.dev/pkg/edge/constants"
18 "edge-infra.dev/pkg/lib/networkvalidator"
19 )
20
21 const (
22 defaultPriority = 100
23 )
24
25 var (
26 networkServiceValidators = map[string]networkServiceValidator{
27 constants.ServiceTypeNTP: validateNetworkServiceNTP,
28 constants.ServiceTypeDNS: validateNetworkServiceIPAddress,
29 constants.ServiceTypeVIP: validateNetworkServiceIPAddress,
30 constants.ServiceTypeClusterDNS: validateNetworkServiceClusterDNS,
31 constants.ServiceTypePodNetworkCIDR: validateK8sNetworkCIDR,
32 constants.ServiceTypeServiceNetworkCIDR: validateK8sNetworkCIDR,
33 constants.ServiceTypeEgressTunnelsCIDR: validateK8sNetworkCIDR,
34 }
35 networkRangeValidators = map[string]networkRangeValidator{
36 constants.ServiceTypePodNetworkCIDR: validatePodNetworkRange,
37 constants.ServiceTypeServiceNetworkCIDR: validateServiceNetworkRange,
38 constants.ServiceTypeEgressTunnelsCIDR: validateEgressTunnelNetworkRange,
39 }
40 )
41
42 type networkServiceValidator func(*model.ClusterNetworkServiceInfo, []*model.ClusterNetworkServiceInfo) error
43
44 type networkRangeValidator func(net.IPMask) error
45
46 func (s *storeClusterService) GetClusterNetworkServices(ctx context.Context, clusterEdgeID string) ([]*model.ClusterNetworkServiceInfo, error) {
47 rows, err := s.SQLDB.QueryContext(ctx, sqlquery.GetClusterNetworkServices, clusterEdgeID)
48 if err != nil {
49 return nil, sqlerr.Wrap(err)
50 }
51
52 defer rows.Close()
53
54 networkServices := []*model.ClusterNetworkServiceInfo{}
55
56 for rows.Next() {
57 netService := &model.ClusterNetworkServiceInfo{}
58 if err := rows.Scan(&netService.NetworkServiceID, &netService.IP, &netService.Family, &netService.ServiceType, &netService.Priority); err != nil {
59 return nil, sqlerr.Wrap(err)
60 }
61
62 networkServices = append(networkServices, netService)
63 }
64 if err := rows.Err(); err != nil {
65 return nil, sqlerr.Wrap(err)
66 }
67 return networkServices, nil
68 }
69
70 func (s *storeClusterService) GetClusterNetworkServiceByNetworkID(ctx context.Context, clusterEdgeID, networkServiceID string) (*model.ClusterNetworkServiceInfo, error) {
71 networkService := &model.ClusterNetworkServiceInfo{}
72 row := s.SQLDB.QueryRowContext(ctx, sqlquery.GetClusterNetworkServiceByNetworkID, clusterEdgeID, networkServiceID)
73 if err := row.Scan(&networkService.NetworkServiceID, &networkService.IP, &networkService.Family, &networkService.ServiceType, &networkService.Priority); err != nil {
74 return networkService, sqlerr.Wrap(err)
75 }
76 return networkService, nil
77 }
78
79 func (s *storeClusterService) CreateClusterNetworkService(ctx context.Context, clusterEdgeID string, networkService *model.CreateNetworkServiceInfo) (*model.ClusterNetworkServiceInfo, error) {
80 netService := createNetServiceToNetService(networkService)
81
82 existingNetworkServices, err := s.GetClusterNetworkServices(ctx, clusterEdgeID)
83 if err != nil {
84 return nil, err
85 }
86
87 if err := networkServiceValidators[networkService.ServiceType](netService, existingNetworkServices); err != nil {
88 return nil, err
89 }
90
91 if hasDuplicateIP(netService, existingNetworkServices) {
92 return nil, fmt.Errorf("IP address %s already exists for %s", netService.IP, netService.ServiceType)
93 }
94
95 priority := validateCreateNetworkServicePriorityField(networkService.Priority)
96 result := s.SQLDB.QueryRowContext(ctx, sqlquery.CreateClusterNetworkServices, clusterEdgeID, networkService.IP, networkService.Family, networkService.ServiceType, priority)
97 if err := castNetworkServiceResult(netService, result); err != nil {
98 return nil, err
99 }
100
101 return netService, nil
102 }
103
104 func (s *storeClusterService) CreateClusterNetworkServices(ctx context.Context, clusterEdgeID string, networkServicesInfo []*model.CreateNetworkServiceInfo) ([]*model.ClusterNetworkServiceInfo, error) {
105 var allErrs error
106 networkServices := []*model.ClusterNetworkServiceInfo{}
107
108 for _, networkService := range networkServicesInfo {
109 netService, err := s.CreateClusterNetworkService(ctx, clusterEdgeID, networkService)
110 if err != nil {
111 allErrs = multierror.Append(err, allErrs)
112 }
113
114 networkServices = append(networkServices, netService)
115 }
116
117 return networkServices, allErrs
118 }
119
120 func (s *storeClusterService) UpdateClusterNetworkService(ctx context.Context, clusterEdgeID string, networkService *model.UpdateNetworkServiceInfo, serviceType string) (*model.ClusterNetworkServiceInfo, error) {
121
122 netService := updateNetServiceToNetService(networkService, serviceType)
123
124 clusterNetworkServices, err := s.GetClusterNetworkServices(ctx, clusterEdgeID)
125 if err != nil {
126 return nil, err
127 }
128
129 if err := networkServiceValidators[serviceType](netService, clusterNetworkServices); err != nil {
130 return nil, err
131 }
132
133 if hasDuplicateIP(netService, clusterNetworkServices) {
134 return nil, fmt.Errorf("IP address %s already exists for %s", netService.IP, netService.ServiceType)
135 }
136
137 if networkService.Priority != nil {
138 priority := validateUpdateNetworkServicePriorityField(networkService.Priority)
139 networkService.Priority = &priority
140 }
141
142 result := s.SQLDB.QueryRowContext(ctx, sqlquery.UpdateClusterNetworkServices, netService.IP, netService.Family, netService.Priority, netService.NetworkServiceID, clusterEdgeID)
143 if err := castNetworkServiceResult(netService, result); err != nil {
144 return nil, err
145 }
146
147
148
149 if netService.ServiceType == constants.ServiceTypeServiceNetworkCIDR {
150 if err := s.updateClusterDNSIP(ctx, clusterEdgeID, netService.IP); err != nil {
151 return nil, err
152 }
153 }
154
155 return netService, nil
156 }
157
158 func (s *storeClusterService) UpdateClusterNetworkServices(ctx context.Context, clusterEdgeID string, networkServicesInfo []*model.UpdateNetworkServiceInfo, existingServiceTypesByID map[string]string) ([]*model.ClusterNetworkServiceInfo, error) {
159 var allErrs error
160 networkServices := []*model.ClusterNetworkServiceInfo{}
161
162 for _, networkService := range networkServicesInfo {
163 if networkService == nil {
164 continue
165 }
166
167 netService, err := s.UpdateClusterNetworkService(ctx, clusterEdgeID, networkService, existingServiceTypesByID[networkService.NetworkServiceID])
168 if err != nil {
169 allErrs = multierror.Append(err, allErrs)
170 }
171
172 networkServices = append(networkServices, netService)
173 }
174
175 return networkServices, allErrs
176 }
177
178 func (s *storeClusterService) DeleteClusterNetworkService(ctx context.Context, clusterEdgeID, networkServiceID string) (bool, error) {
179 networkService, err := s.GetClusterNetworkServiceByNetworkID(ctx, clusterEdgeID, networkServiceID)
180 if err != nil {
181 return false, err
182 }
183
184 switch networkService.ServiceType {
185 case
186 constants.ServiceTypePodNetworkCIDR,
187 constants.ServiceTypeServiceNetworkCIDR,
188 constants.ServiceTypeClusterDNS:
189 return true, fmt.Errorf("adhoc deletion of the %s is forbidden", networkService.ServiceType)
190 }
191
192 _, err = s.SQLDB.ExecContext(ctx, sqlquery.DeleteClusterNetworkService, clusterEdgeID, networkServiceID)
193 if err != nil {
194 return false, err
195 }
196 return true, nil
197 }
198
199 func (s *storeClusterService) GetClusterK8sNetworkServices(ctx context.Context, clusterEdgeID string) (map[string]string, error) {
200 clusterNetworkServices, err := s.GetClusterNetworkServices(ctx, clusterEdgeID)
201 if err != nil {
202 return nil, err
203 }
204 services := map[string]string{
205 constants.ServiceTypeClusterDNS: "",
206 constants.ServiceTypePodNetworkCIDR: "",
207 constants.ServiceTypeServiceNetworkCIDR: "",
208 constants.ServiceTypeEgressTunnelsCIDR: "",
209 }
210 for _, clusterNetworkService := range clusterNetworkServices {
211 _, exists := services[clusterNetworkService.ServiceType]
212 if exists {
213 services[clusterNetworkService.ServiceType] = clusterNetworkService.IP
214 }
215 }
216 return services, nil
217 }
218
219 func (s *storeClusterService) GetK8sClusterNetworkService(ctx context.Context, clusterEdgeID, networkService string) (*model.ClusterNetworkServiceInfo, error) {
220 clusterNetworkServices, err := s.GetClusterNetworkServices(ctx, clusterEdgeID)
221 if err != nil {
222 return nil, err
223 }
224 return getNetworkServiceFromList(clusterNetworkServices, networkService)
225 }
226
227 func getNetworkServiceFromList(clusterNetworkServices []*model.ClusterNetworkServiceInfo, networkService string) (*model.ClusterNetworkServiceInfo, error) {
228 for _, service := range clusterNetworkServices {
229 if service.ServiceType == networkService {
230 return service, nil
231 }
232 }
233 return nil, fmt.Errorf("could not find network service %s", networkService)
234 }
235
236 func (s *storeClusterService) updateClusterDNSIP(ctx context.Context, clusterEdgeID, serviceCIDR string) error {
237 clusterDNS, err := s.GetK8sClusterNetworkService(ctx, clusterEdgeID, constants.ServiceTypeClusterDNS)
238 if err != nil {
239 return err
240 }
241
242
243 _, serviceSubnet, _ := net.ParseCIDR(serviceCIDR)
244 dnsIP, err := netutils.GetIndexedIP(serviceSubnet, 10)
245 if err != nil {
246 return err
247 }
248
249 priority := defaultPriority
250
251 networkService := &model.ClusterNetworkServiceInfo{
252 NetworkServiceID: clusterDNS.NetworkServiceID,
253 ServiceType: constants.ServiceTypeClusterDNS,
254 IP: dnsIP.String(),
255 Family: "inet",
256 Priority: &priority,
257 }
258
259 result := s.SQLDB.QueryRowContext(ctx, sqlquery.UpdateClusterNetworkServices, networkService.IP, networkService.Family, networkService.Priority, networkService.NetworkServiceID, clusterEdgeID)
260 return castNetworkServiceResult(networkService, result)
261 }
262
263 func validateCreateNetworkServicePriorityField(priority *int) int {
264 if priority == nil {
265 return defaultPriority
266 }
267 if *priority < 1 {
268 return defaultPriority
269 }
270 return *priority
271 }
272
273 func validateUpdateNetworkServicePriorityField(priority *int) int {
274 if *priority < 1 {
275 return defaultPriority
276 }
277 return *priority
278 }
279
280 func createNetServiceToNetService(createNetService *model.CreateNetworkServiceInfo) *model.ClusterNetworkServiceInfo {
281 return &model.ClusterNetworkServiceInfo{
282 ServiceType: createNetService.ServiceType,
283 IP: createNetService.IP,
284 Family: createNetService.Family,
285 Priority: createNetService.Priority,
286 }
287 }
288 func updateNetServiceToNetService(updateNetService *model.UpdateNetworkServiceInfo, serviceType string) *model.ClusterNetworkServiceInfo {
289 return &model.ClusterNetworkServiceInfo{
290 ServiceType: serviceType,
291 NetworkServiceID: updateNetService.NetworkServiceID,
292 IP: updateNetService.IP,
293 Family: updateNetService.Family,
294 Priority: updateNetService.Priority,
295 }
296 }
297
298 func castNetworkServiceResult(networkService *model.ClusterNetworkServiceInfo, result *sql.Row) error {
299 if err := result.Err(); err != nil {
300 return sqlerr.Wrap(err)
301 }
302
303 if err := result.Scan(&networkService.NetworkServiceID, &networkService.IP, &networkService.Family, &networkService.ServiceType, &networkService.Priority); err != nil {
304 return sqlerr.Wrap(err)
305 }
306 return nil
307 }
308
309 func validateNetworkServiceIPAddress(networkService *model.ClusterNetworkServiceInfo, _ []*model.ClusterNetworkServiceInfo) error {
310 if net.ParseIP(networkService.IP) == nil {
311 return fmt.Errorf("invalid IP address %s for %s", networkService.IP, networkService.ServiceType)
312 }
313 return nil
314 }
315
316 func validateNetworkServiceNTP(networkService *model.ClusterNetworkServiceInfo, _ []*model.ClusterNetworkServiceInfo) error {
317 if !(networkvalidator.IsValidDomain(networkService.IP) || networkvalidator.ValidateIP(networkService.IP)) {
318 return fmt.Errorf("invalid IP/domain address %s for %s", networkService.IP, networkService.ServiceType)
319 }
320 return nil
321 }
322
323 func validateNetworkServiceClusterDNS(_ *model.ClusterNetworkServiceInfo, _ []*model.ClusterNetworkServiceInfo) error {
324 return nil
325 }
326
327 func validateK8sNetworkCIDR(networkService *model.ClusterNetworkServiceInfo, clusterNetworkServices []*model.ClusterNetworkServiceInfo) error {
328 _, network, err := net.ParseCIDR(networkService.IP)
329 if err != nil || network == nil {
330 return fmt.Errorf("invalid CIDR address %s for %s", networkService.IP, networkService.ServiceType)
331 }
332 if err := networkRangeValidators[networkService.ServiceType](network.Mask); err != nil {
333 return err
334 }
335 return validateDisjointSubnets(networkService, clusterNetworkServices)
336 }
337
338 func hasDuplicateIP(networkService *model.ClusterNetworkServiceInfo, clusterNetworkServices []*model.ClusterNetworkServiceInfo) bool {
339 for _, clusterNetworkService := range clusterNetworkServices {
340 if networkService.ServiceType == clusterNetworkService.ServiceType && networkService.IP == clusterNetworkService.IP {
341 if networkService.NetworkServiceID == clusterNetworkService.NetworkServiceID {
342
343 continue
344 }
345
346 return true
347 }
348 }
349 return false
350 }
351
352 func validatePodNetworkRange(mask net.IPMask) error {
353 prefixLen, _ := mask.Size()
354 if prefixLen < 16 || prefixLen > 21 {
355 return fmt.Errorf("invalid prefix length. Prefix length must be between /16 and /21 for k8s pod network")
356 }
357 return nil
358 }
359
360 func validateServiceNetworkRange(mask net.IPMask) error {
361 prefixLen, _ := mask.Size()
362 if prefixLen < 16 || prefixLen > 22 {
363 return fmt.Errorf("invalid prefix length. Prefix length must be between /16 and /22 for k8s service network")
364 }
365 return nil
366 }
367
368 func validateEgressTunnelNetworkRange(mask net.IPMask) error {
369 prefixLen, _ := mask.Size()
370 if prefixLen < 22 || prefixLen > 31 {
371 return fmt.Errorf("invalid prefix length. Prefix length must be between /16 and /22 for egress gateway tunnels")
372 }
373 return nil
374 }
375
376 func validateDisjointSubnets(networkService *model.ClusterNetworkServiceInfo, clusterNetworkServices []*model.ClusterNetworkServiceInfo) error {
377 subnets := []string{constants.ServiceTypePodNetworkCIDR, constants.ServiceTypeServiceNetworkCIDR}
378 _, subnetToValidate, _ := net.ParseCIDR(networkService.IP)
379 for _, subnet := range subnets {
380 if subnet != networkService.ServiceType {
381 subnetService, err := getNetworkServiceFromList(clusterNetworkServices, subnet)
382 if err != nil {
383 if strings.Contains(err.Error(), "could not find network service") {
384 return nil
385 }
386 return err
387 }
388 _, otherSubnet, _ := net.ParseCIDR(subnetService.IP)
389 if subnetToValidate.Contains(otherSubnet.IP) || otherSubnet.Contains(subnetToValidate.IP) {
390 return fmt.Errorf("invalid subnet - %s must not overlap with %s", networkService.ServiceType, subnet)
391 }
392 }
393 }
394 return nil
395 }
396
View as plain text