...

Source file src/edge-infra.dev/pkg/edge/api/services/cluster_network_service_orchestration.go

Documentation: edge-infra.dev/pkg/edge/api/services

     1  package services
     2  
     3  import (
     4  	"context"
     5  	"database/sql"
     6  	"fmt"
     7  	"net"
     8  	"strings"
     9  
    10  	"github.com/hashicorp/go-multierror"
    11  
    12  	netutils "k8s.io/utils/net"
    13  
    14  	sqlerr "edge-infra.dev/pkg/edge/api/apierror/sql"
    15  	"edge-infra.dev/pkg/edge/api/graph/model"
    16  	sqlquery "edge-infra.dev/pkg/edge/api/sql"
    17  	"edge-infra.dev/pkg/edge/constants"
    18  	"edge-infra.dev/pkg/lib/networkvalidator"
    19  )
    20  
    21  const (
    22  	defaultPriority = 100
    23  )
    24  
    25  var (
    26  	networkServiceValidators = map[string]networkServiceValidator{
    27  		constants.ServiceTypeNTP:                validateNetworkServiceNTP,
    28  		constants.ServiceTypeDNS:                validateNetworkServiceIPAddress,
    29  		constants.ServiceTypeVIP:                validateNetworkServiceIPAddress,
    30  		constants.ServiceTypeClusterDNS:         validateNetworkServiceClusterDNS,
    31  		constants.ServiceTypePodNetworkCIDR:     validateK8sNetworkCIDR,
    32  		constants.ServiceTypeServiceNetworkCIDR: validateK8sNetworkCIDR,
    33  		constants.ServiceTypeEgressTunnelsCIDR:  validateK8sNetworkCIDR,
    34  	}
    35  	networkRangeValidators = map[string]networkRangeValidator{
    36  		constants.ServiceTypePodNetworkCIDR:     validatePodNetworkRange,
    37  		constants.ServiceTypeServiceNetworkCIDR: validateServiceNetworkRange,
    38  		constants.ServiceTypeEgressTunnelsCIDR:  validateEgressTunnelNetworkRange,
    39  	}
    40  )
    41  
    42  type networkServiceValidator func(*model.ClusterNetworkServiceInfo, []*model.ClusterNetworkServiceInfo) error
    43  
    44  type networkRangeValidator func(net.IPMask) error
    45  
    46  func (s *storeClusterService) GetClusterNetworkServices(ctx context.Context, clusterEdgeID string) ([]*model.ClusterNetworkServiceInfo, error) {
    47  	rows, err := s.SQLDB.QueryContext(ctx, sqlquery.GetClusterNetworkServices, clusterEdgeID)
    48  	if err != nil {
    49  		return nil, sqlerr.Wrap(err)
    50  	}
    51  
    52  	defer rows.Close()
    53  
    54  	networkServices := []*model.ClusterNetworkServiceInfo{}
    55  
    56  	for rows.Next() {
    57  		netService := &model.ClusterNetworkServiceInfo{}
    58  		if err := rows.Scan(&netService.NetworkServiceID, &netService.IP, &netService.Family, &netService.ServiceType, &netService.Priority); err != nil {
    59  			return nil, sqlerr.Wrap(err)
    60  		}
    61  
    62  		networkServices = append(networkServices, netService)
    63  	}
    64  	if err := rows.Err(); err != nil {
    65  		return nil, sqlerr.Wrap(err)
    66  	}
    67  	return networkServices, nil
    68  }
    69  
    70  func (s *storeClusterService) GetClusterNetworkServiceByNetworkID(ctx context.Context, clusterEdgeID, networkServiceID string) (*model.ClusterNetworkServiceInfo, error) {
    71  	networkService := &model.ClusterNetworkServiceInfo{}
    72  	row := s.SQLDB.QueryRowContext(ctx, sqlquery.GetClusterNetworkServiceByNetworkID, clusterEdgeID, networkServiceID)
    73  	if err := row.Scan(&networkService.NetworkServiceID, &networkService.IP, &networkService.Family, &networkService.ServiceType, &networkService.Priority); err != nil {
    74  		return networkService, sqlerr.Wrap(err)
    75  	}
    76  	return networkService, nil
    77  }
    78  
    79  func (s *storeClusterService) CreateClusterNetworkService(ctx context.Context, clusterEdgeID string, networkService *model.CreateNetworkServiceInfo) (*model.ClusterNetworkServiceInfo, error) {
    80  	netService := createNetServiceToNetService(networkService)
    81  
    82  	existingNetworkServices, err := s.GetClusterNetworkServices(ctx, clusterEdgeID)
    83  	if err != nil {
    84  		return nil, err
    85  	}
    86  
    87  	if err := networkServiceValidators[networkService.ServiceType](netService, existingNetworkServices); err != nil {
    88  		return nil, err
    89  	}
    90  
    91  	if hasDuplicateIP(netService, existingNetworkServices) {
    92  		return nil, fmt.Errorf("IP address %s already exists for %s", netService.IP, netService.ServiceType)
    93  	}
    94  
    95  	priority := validateCreateNetworkServicePriorityField(networkService.Priority)
    96  	result := s.SQLDB.QueryRowContext(ctx, sqlquery.CreateClusterNetworkServices, clusterEdgeID, networkService.IP, networkService.Family, networkService.ServiceType, priority)
    97  	if err := castNetworkServiceResult(netService, result); err != nil {
    98  		return nil, err
    99  	}
   100  
   101  	return netService, nil
   102  }
   103  
   104  func (s *storeClusterService) CreateClusterNetworkServices(ctx context.Context, clusterEdgeID string, networkServicesInfo []*model.CreateNetworkServiceInfo) ([]*model.ClusterNetworkServiceInfo, error) {
   105  	var allErrs error
   106  	networkServices := []*model.ClusterNetworkServiceInfo{}
   107  
   108  	for _, networkService := range networkServicesInfo {
   109  		netService, err := s.CreateClusterNetworkService(ctx, clusterEdgeID, networkService)
   110  		if err != nil {
   111  			allErrs = multierror.Append(err, allErrs)
   112  		}
   113  
   114  		networkServices = append(networkServices, netService)
   115  	}
   116  
   117  	return networkServices, allErrs
   118  }
   119  
   120  func (s *storeClusterService) UpdateClusterNetworkService(ctx context.Context, clusterEdgeID string, networkService *model.UpdateNetworkServiceInfo, serviceType string) (*model.ClusterNetworkServiceInfo, error) {
   121  	// netService := model.ClusterNetworkServiceInfo(*networkService)
   122  	netService := updateNetServiceToNetService(networkService, serviceType)
   123  
   124  	clusterNetworkServices, err := s.GetClusterNetworkServices(ctx, clusterEdgeID)
   125  	if err != nil {
   126  		return nil, err
   127  	}
   128  
   129  	if err := networkServiceValidators[serviceType](netService, clusterNetworkServices); err != nil {
   130  		return nil, err
   131  	}
   132  
   133  	if hasDuplicateIP(netService, clusterNetworkServices) {
   134  		return nil, fmt.Errorf("IP address %s already exists for %s", netService.IP, netService.ServiceType)
   135  	}
   136  
   137  	if networkService.Priority != nil {
   138  		priority := validateUpdateNetworkServicePriorityField(networkService.Priority)
   139  		networkService.Priority = &priority
   140  	}
   141  
   142  	result := s.SQLDB.QueryRowContext(ctx, sqlquery.UpdateClusterNetworkServices, netService.IP, netService.Family, netService.Priority, netService.NetworkServiceID, clusterEdgeID)
   143  	if err := castNetworkServiceResult(netService, result); err != nil {
   144  		return nil, err
   145  	}
   146  
   147  	// if we have updated the service network cidr,
   148  	// we should update the cluster dns ip to ensure its within the same subnet
   149  	if netService.ServiceType == constants.ServiceTypeServiceNetworkCIDR {
   150  		if err := s.updateClusterDNSIP(ctx, clusterEdgeID, netService.IP); err != nil {
   151  			return nil, err
   152  		}
   153  	}
   154  
   155  	return netService, nil
   156  }
   157  
   158  func (s *storeClusterService) UpdateClusterNetworkServices(ctx context.Context, clusterEdgeID string, networkServicesInfo []*model.UpdateNetworkServiceInfo, existingServiceTypesByID map[string]string) ([]*model.ClusterNetworkServiceInfo, error) {
   159  	var allErrs error
   160  	networkServices := []*model.ClusterNetworkServiceInfo{}
   161  
   162  	for _, networkService := range networkServicesInfo {
   163  		if networkService == nil {
   164  			continue
   165  		}
   166  
   167  		netService, err := s.UpdateClusterNetworkService(ctx, clusterEdgeID, networkService, existingServiceTypesByID[networkService.NetworkServiceID])
   168  		if err != nil {
   169  			allErrs = multierror.Append(err, allErrs)
   170  		}
   171  
   172  		networkServices = append(networkServices, netService)
   173  	}
   174  
   175  	return networkServices, allErrs
   176  }
   177  
   178  func (s *storeClusterService) DeleteClusterNetworkService(ctx context.Context, clusterEdgeID, networkServiceID string) (bool, error) {
   179  	networkService, err := s.GetClusterNetworkServiceByNetworkID(ctx, clusterEdgeID, networkServiceID)
   180  	if err != nil {
   181  		return false, err
   182  	}
   183  
   184  	switch networkService.ServiceType {
   185  	case
   186  		constants.ServiceTypePodNetworkCIDR,
   187  		constants.ServiceTypeServiceNetworkCIDR,
   188  		constants.ServiceTypeClusterDNS:
   189  		return true, fmt.Errorf("adhoc deletion of the %s is forbidden", networkService.ServiceType)
   190  	}
   191  
   192  	_, err = s.SQLDB.ExecContext(ctx, sqlquery.DeleteClusterNetworkService, clusterEdgeID, networkServiceID)
   193  	if err != nil {
   194  		return false, err
   195  	}
   196  	return true, nil
   197  }
   198  
   199  func (s *storeClusterService) GetClusterK8sNetworkServices(ctx context.Context, clusterEdgeID string) (map[string]string, error) {
   200  	clusterNetworkServices, err := s.GetClusterNetworkServices(ctx, clusterEdgeID)
   201  	if err != nil {
   202  		return nil, err
   203  	}
   204  	services := map[string]string{
   205  		constants.ServiceTypeClusterDNS:         "",
   206  		constants.ServiceTypePodNetworkCIDR:     "",
   207  		constants.ServiceTypeServiceNetworkCIDR: "",
   208  		constants.ServiceTypeEgressTunnelsCIDR:  "",
   209  	}
   210  	for _, clusterNetworkService := range clusterNetworkServices {
   211  		_, exists := services[clusterNetworkService.ServiceType]
   212  		if exists {
   213  			services[clusterNetworkService.ServiceType] = clusterNetworkService.IP
   214  		}
   215  	}
   216  	return services, nil
   217  }
   218  
   219  func (s *storeClusterService) GetK8sClusterNetworkService(ctx context.Context, clusterEdgeID, networkService string) (*model.ClusterNetworkServiceInfo, error) {
   220  	clusterNetworkServices, err := s.GetClusterNetworkServices(ctx, clusterEdgeID)
   221  	if err != nil {
   222  		return nil, err
   223  	}
   224  	return getNetworkServiceFromList(clusterNetworkServices, networkService)
   225  }
   226  
   227  func getNetworkServiceFromList(clusterNetworkServices []*model.ClusterNetworkServiceInfo, networkService string) (*model.ClusterNetworkServiceInfo, error) {
   228  	for _, service := range clusterNetworkServices {
   229  		if service.ServiceType == networkService {
   230  			return service, nil
   231  		}
   232  	}
   233  	return nil, fmt.Errorf("could not find network service %s", networkService)
   234  }
   235  
   236  func (s *storeClusterService) updateClusterDNSIP(ctx context.Context, clusterEdgeID, serviceCIDR string) error {
   237  	clusterDNS, err := s.GetK8sClusterNetworkService(ctx, clusterEdgeID, constants.ServiceTypeClusterDNS)
   238  	if err != nil {
   239  		return err
   240  	}
   241  
   242  	// Set DNS IP to 10th IP in service cidr subnet
   243  	_, serviceSubnet, _ := net.ParseCIDR(serviceCIDR)
   244  	dnsIP, err := netutils.GetIndexedIP(serviceSubnet, 10)
   245  	if err != nil {
   246  		return err
   247  	}
   248  
   249  	priority := defaultPriority
   250  
   251  	networkService := &model.ClusterNetworkServiceInfo{
   252  		NetworkServiceID: clusterDNS.NetworkServiceID,
   253  		ServiceType:      constants.ServiceTypeClusterDNS,
   254  		IP:               dnsIP.String(),
   255  		Family:           "inet",
   256  		Priority:         &priority,
   257  	}
   258  
   259  	result := s.SQLDB.QueryRowContext(ctx, sqlquery.UpdateClusterNetworkServices, networkService.IP, networkService.Family, networkService.Priority, networkService.NetworkServiceID, clusterEdgeID)
   260  	return castNetworkServiceResult(networkService, result)
   261  }
   262  
   263  func validateCreateNetworkServicePriorityField(priority *int) int {
   264  	if priority == nil {
   265  		return defaultPriority
   266  	}
   267  	if *priority < 1 {
   268  		return defaultPriority
   269  	}
   270  	return *priority
   271  }
   272  
   273  func validateUpdateNetworkServicePriorityField(priority *int) int {
   274  	if *priority < 1 {
   275  		return defaultPriority
   276  	}
   277  	return *priority
   278  }
   279  
   280  func createNetServiceToNetService(createNetService *model.CreateNetworkServiceInfo) *model.ClusterNetworkServiceInfo {
   281  	return &model.ClusterNetworkServiceInfo{
   282  		ServiceType: createNetService.ServiceType,
   283  		IP:          createNetService.IP,
   284  		Family:      createNetService.Family,
   285  		Priority:    createNetService.Priority,
   286  	}
   287  }
   288  func updateNetServiceToNetService(updateNetService *model.UpdateNetworkServiceInfo, serviceType string) *model.ClusterNetworkServiceInfo {
   289  	return &model.ClusterNetworkServiceInfo{
   290  		ServiceType:      serviceType,
   291  		NetworkServiceID: updateNetService.NetworkServiceID,
   292  		IP:               updateNetService.IP,
   293  		Family:           updateNetService.Family,
   294  		Priority:         updateNetService.Priority,
   295  	}
   296  }
   297  
   298  func castNetworkServiceResult(networkService *model.ClusterNetworkServiceInfo, result *sql.Row) error {
   299  	if err := result.Err(); err != nil {
   300  		return sqlerr.Wrap(err)
   301  	}
   302  
   303  	if err := result.Scan(&networkService.NetworkServiceID, &networkService.IP, &networkService.Family, &networkService.ServiceType, &networkService.Priority); err != nil {
   304  		return sqlerr.Wrap(err)
   305  	}
   306  	return nil
   307  }
   308  
   309  func validateNetworkServiceIPAddress(networkService *model.ClusterNetworkServiceInfo, _ []*model.ClusterNetworkServiceInfo) error {
   310  	if net.ParseIP(networkService.IP) == nil {
   311  		return fmt.Errorf("invalid IP address %s for %s", networkService.IP, networkService.ServiceType)
   312  	}
   313  	return nil
   314  }
   315  
   316  func validateNetworkServiceNTP(networkService *model.ClusterNetworkServiceInfo, _ []*model.ClusterNetworkServiceInfo) error {
   317  	if !(networkvalidator.IsValidDomain(networkService.IP) || networkvalidator.ValidateIP(networkService.IP)) {
   318  		return fmt.Errorf("invalid IP/domain address %s for %s", networkService.IP, networkService.ServiceType)
   319  	}
   320  	return nil
   321  }
   322  
   323  func validateNetworkServiceClusterDNS(_ *model.ClusterNetworkServiceInfo, _ []*model.ClusterNetworkServiceInfo) error {
   324  	return nil
   325  }
   326  
   327  func validateK8sNetworkCIDR(networkService *model.ClusterNetworkServiceInfo, clusterNetworkServices []*model.ClusterNetworkServiceInfo) error {
   328  	_, network, err := net.ParseCIDR(networkService.IP)
   329  	if err != nil || network == nil {
   330  		return fmt.Errorf("invalid CIDR address %s for %s", networkService.IP, networkService.ServiceType)
   331  	}
   332  	if err := networkRangeValidators[networkService.ServiceType](network.Mask); err != nil {
   333  		return err
   334  	}
   335  	return validateDisjointSubnets(networkService, clusterNetworkServices)
   336  }
   337  
   338  func hasDuplicateIP(networkService *model.ClusterNetworkServiceInfo, clusterNetworkServices []*model.ClusterNetworkServiceInfo) bool {
   339  	for _, clusterNetworkService := range clusterNetworkServices {
   340  		if networkService.ServiceType == clusterNetworkService.ServiceType && networkService.IP == clusterNetworkService.IP {
   341  			if networkService.NetworkServiceID == clusterNetworkService.NetworkServiceID {
   342  				// The same network service is being updated, so duplicate IPs are expected here
   343  				continue
   344  			}
   345  			// Network service is being created/updated with a non-unique {ServiceType, IP} pairing
   346  			return true
   347  		}
   348  	}
   349  	return false
   350  }
   351  
   352  func validatePodNetworkRange(mask net.IPMask) error {
   353  	prefixLen, _ := mask.Size()
   354  	if prefixLen < 16 || prefixLen > 21 {
   355  		return fmt.Errorf("invalid prefix length. Prefix length must be between /16 and /21 for k8s pod network")
   356  	}
   357  	return nil
   358  }
   359  
   360  func validateServiceNetworkRange(mask net.IPMask) error {
   361  	prefixLen, _ := mask.Size()
   362  	if prefixLen < 16 || prefixLen > 22 {
   363  		return fmt.Errorf("invalid prefix length. Prefix length must be between /16 and /22 for k8s service network")
   364  	}
   365  	return nil
   366  }
   367  
   368  func validateEgressTunnelNetworkRange(mask net.IPMask) error {
   369  	prefixLen, _ := mask.Size()
   370  	if prefixLen < 22 || prefixLen > 31 {
   371  		return fmt.Errorf("invalid prefix length. Prefix length must be between /16 and /22 for egress gateway tunnels")
   372  	}
   373  	return nil
   374  }
   375  
   376  func validateDisjointSubnets(networkService *model.ClusterNetworkServiceInfo, clusterNetworkServices []*model.ClusterNetworkServiceInfo) error {
   377  	subnets := []string{constants.ServiceTypePodNetworkCIDR, constants.ServiceTypeServiceNetworkCIDR}
   378  	_, subnetToValidate, _ := net.ParseCIDR(networkService.IP)
   379  	for _, subnet := range subnets {
   380  		if subnet != networkService.ServiceType {
   381  			subnetService, err := getNetworkServiceFromList(clusterNetworkServices, subnet)
   382  			if err != nil {
   383  				if strings.Contains(err.Error(), "could not find network service") {
   384  					return nil
   385  				}
   386  				return err
   387  			}
   388  			_, otherSubnet, _ := net.ParseCIDR(subnetService.IP)
   389  			if subnetToValidate.Contains(otherSubnet.IP) || otherSubnet.Contains(subnetToValidate.IP) {
   390  				return fmt.Errorf("invalid subnet - %s must not overlap with %s", networkService.ServiceType, subnet)
   391  			}
   392  		}
   393  	}
   394  	return nil
   395  }
   396  

View as plain text