...

Source file src/edge-infra.dev/pkg/edge/api/services/artifacts/artifacts_service.go

Documentation: edge-infra.dev/pkg/edge/api/services/artifacts

     1  package artifacts
     2  
     3  import (
     4  	"context"
     5  	"database/sql"
     6  	"fmt"
     7  	"slices"
     8  	"strings"
     9  
    10  	"github.com/google/go-containerregistry/pkg/name"
    11  
    12  	sqlerr "edge-infra.dev/pkg/edge/api/apierror/sql"
    13  	"edge-infra.dev/pkg/edge/api/services/interfaces"
    14  	sqlquery "edge-infra.dev/pkg/edge/api/sql"
    15  	"edge-infra.dev/pkg/edge/api/types"
    16  	"edge-infra.dev/pkg/edge/api/utils"
    17  )
    18  
    19  //go:generate mockgen -destination=../../mocks/mock_artifacts_service.go -package=mocks -mock_names=Service=MockArtifactsService edge-infra.dev/pkg/edge/api/services/artifacts Service
    20  type Service interface {
    21  	GetAvailableArtifactVersions(ctx context.Context, name string) ([]types.ArtifactVersion, error)
    22  	GetLatestAvailableArtifactVersion(ctx context.Context, name string) (*types.ArtifactVersion, error)
    23  	GetClusterArtifactVersions(ctx context.Context, clusterEdgeID string) ([]types.ArtifactVersion, error)
    24  	UpdateClusterFleetVersionAndArtifact(ctx context.Context, clusterEdgeID, version string) error
    25  	UpdateClustersToLatestArtifactVersion(ctx context.Context, name string) (clustersUpdated int, err error)
    26  	AddClusterArtifactVersion(ctx context.Context, transaction *sql.Tx, clusterEdgeID, name string) error
    27  	DeleteClusterArtifactVersion(ctx context.Context, transaction *sql.Tx, clusterEdgeID, name string) error
    28  }
    29  
    30  type artifactsService struct {
    31  	SQLDB               *sql.DB
    32  	ClusterLabelService interfaces.ClusterLabelService
    33  }
    34  
    35  func NewArtifactsService(sqlDB *sql.DB, clusterLabelSvc interfaces.ClusterLabelService) Service {
    36  	return &artifactsService{
    37  		SQLDB:               sqlDB,
    38  		ClusterLabelService: clusterLabelSvc,
    39  	}
    40  }
    41  
    42  // GetAvailableArtifactVersions gets the list of installable versions for an artifact
    43  func (s *artifactsService) GetAvailableArtifactVersions(ctx context.Context, name string) ([]types.ArtifactVersion, error) {
    44  	artifacts := []types.ArtifactVersion{}
    45  
    46  	rows, err := s.SQLDB.QueryContext(ctx, sqlquery.GetAvailableArtifactVersions, name)
    47  	if err != nil {
    48  		return []types.ArtifactVersion{}, err
    49  	}
    50  	for rows.Next() {
    51  		var a types.ArtifactVersion
    52  		if err := rows.Scan(&a.Name, &a.Version); err != nil {
    53  			return []types.ArtifactVersion{}, err
    54  		}
    55  		artifacts = append(artifacts, a)
    56  	}
    57  	if err := rows.Err(); err != nil {
    58  		return nil, sqlerr.Wrap(err)
    59  	}
    60  
    61  	return artifacts, nil
    62  }
    63  
    64  // GetLatestAvailableArtifactVersion gets the latest installable version of an artifact
    65  func (s *artifactsService) GetLatestAvailableArtifactVersion(ctx context.Context, name string) (*types.ArtifactVersion, error) {
    66  	artifact := &types.ArtifactVersion{
    67  		Name: name,
    68  	}
    69  	row := s.SQLDB.QueryRowContext(ctx, sqlquery.GetLatestAvailableArtifactVersion, name)
    70  	if err := row.Scan(&artifact.Version); err != nil {
    71  		return nil, sqlerr.Wrap(err)
    72  	}
    73  
    74  	return artifact, nil
    75  }
    76  
    77  // GetClusterArtifactVersions gets the desired set of artifacts that should be scheduled to a cluster. Ie the name and a tag or digest
    78  // of a pallet to use in the cluster's Shipment
    79  func (s *artifactsService) GetClusterArtifactVersions(ctx context.Context, clusterEdgeID string) ([]types.ArtifactVersion, error) {
    80  	artifacts := []types.ArtifactVersion{}
    81  	rows, err := s.SQLDB.QueryContext(ctx, sqlquery.GetClusterArtifactVersions, clusterEdgeID)
    82  	if err != nil {
    83  		return []types.ArtifactVersion{}, err
    84  	}
    85  	for rows.Next() {
    86  		var a types.ArtifactVersion
    87  		if err := rows.Scan(&a.Name, &a.Version); err != nil {
    88  			return []types.ArtifactVersion{}, err
    89  		}
    90  		artifacts = append(artifacts, a)
    91  	}
    92  	if err := rows.Err(); err != nil {
    93  		return nil, sqlerr.Wrap(err)
    94  	}
    95  	return artifacts, nil
    96  }
    97  
    98  // UpdateClusterFleetVersionAndArtifact sets cluster.fleet_version and updates the desired version of the corresponding fleet
    99  // artifact, e.g. store, that should be scheduled to the cluster
   100  func (s *artifactsService) UpdateClusterFleetVersionAndArtifact(ctx context.Context, clusterEdgeID, version string) error {
   101  	// get the cluster's fleet type to use as the artifact name
   102  	fleetType, err := s.ClusterLabelService.FetchFleetType(ctx, clusterEdgeID)
   103  	if err != nil {
   104  		return err
   105  	}
   106  
   107  	// resolve latest version
   108  	if version == types.DefaultVersionTag {
   109  		latestVersion, err := s.GetLatestAvailableArtifactVersion(ctx, fleetType.String())
   110  		if err != nil {
   111  			return fmt.Errorf("failed to resolve latest version of artifact %s. err: %v", fleetType, err)
   112  		}
   113  		version = latestVersion.Version
   114  	}
   115  
   116  	// check that the {fleet_type}(@|:){fleet_version} forms a valid image reference with either a tag or digest
   117  	if err := validateWeakImageRef(fleetType.String(), version); err != nil {
   118  		return err
   119  	}
   120  
   121  	// update clusters.fleet_version and cluster_artifact_versions for the default fleet artifact
   122  	tx, err := s.SQLDB.BeginTx(ctx, nil)
   123  	if err != nil {
   124  		return err
   125  	}
   126  	if _, err = tx.ExecContext(ctx, sqlquery.UpdateClusterFleetVersion, clusterEdgeID, version); err != nil {
   127  		errors := utils.NewErrorWrapper().AddError(err, "error executing UpdateClusterFleetVersion")
   128  		if rollbackErr := tx.Rollback(); rollbackErr != nil {
   129  			errors.AddError(err, "failed to roll back transaction")
   130  		}
   131  		return errors.Errors
   132  	}
   133  	if _, err = tx.ExecContext(ctx, sqlquery.UpdateClusterArtifactVersions, clusterEdgeID, version); err != nil {
   134  		errors := utils.NewErrorWrapper().AddError(err, "error executing UpdateClusterArtifactVersions")
   135  		if rollbackErr := tx.Rollback(); rollbackErr != nil {
   136  			errors.AddError(err, "failed to roll back transaction")
   137  		}
   138  		return errors.Errors
   139  	}
   140  	if err := tx.Commit(); err != nil {
   141  		return fmt.Errorf("failed to commit transaction. err: %v", err)
   142  	}
   143  
   144  	return nil
   145  }
   146  
   147  // UpdateClustersToLatestArtifactVersion updates all clusters with auto-update enabled to the latest version of
   148  // the given artifact. If the fleet type of the cluster matches the artifact name, eg store, the cluster's fleet_version
   149  // is also updated
   150  func (s *artifactsService) UpdateClustersToLatestArtifactVersion(ctx context.Context, name string) (int, error) {
   151  	res, err := s.SQLDB.ExecContext(ctx, sqlquery.UpdateClustersToLatestFleetArtifactVersion, name)
   152  	if err != nil {
   153  		return -1, err
   154  	}
   155  	n, err := res.RowsAffected()
   156  	if err != nil {
   157  		return -1, err
   158  	}
   159  	return int(n), nil
   160  }
   161  
   162  // AddClusterArtifactVersion adds an artifact to a cluster's inventory of artifact versions
   163  func (s *artifactsService) AddClusterArtifactVersion(ctx context.Context, transaction *sql.Tx, clusterEdgeID, name string) error {
   164  	var storeVersion string
   165  	// retrieve infra version for artifact
   166  	artifactVersions, err := s.GetClusterArtifactVersions(ctx, clusterEdgeID)
   167  	if err != nil {
   168  		return err
   169  	}
   170  
   171  	// get the cluster's fleet type to fetch version
   172  	fleetType, err := s.ClusterLabelService.FetchFleetType(ctx, clusterEdgeID)
   173  	if err != nil {
   174  		return err
   175  	}
   176  
   177  	for _, artifact := range artifactVersions {
   178  		if artifact.Name == fleetType.String() {
   179  			storeVersion = artifact.Version
   180  		}
   181  	}
   182  
   183  	if _, err := transaction.ExecContext(ctx, sqlquery.CreateClusterArtifactVersion, clusterEdgeID, name, storeVersion); err != nil {
   184  		return err
   185  	}
   186  	return nil
   187  }
   188  
   189  // DeleteClusterArtifactVersion deletes an artifact from a cluster's inventory of artifact versions
   190  func (s *artifactsService) DeleteClusterArtifactVersion(ctx context.Context, transaction *sql.Tx, clusterEdgeID, name string) error {
   191  	//do nothing if artifact doesn't exist
   192  	clusterArtifactVersions, err := s.GetClusterArtifactVersions(ctx, clusterEdgeID)
   193  	if err != nil {
   194  		return err
   195  	}
   196  	exists := slices.ContainsFunc(clusterArtifactVersions, func(artifact types.ArtifactVersion) bool {
   197  		return artifact.Name == name
   198  	})
   199  	if !exists {
   200  		return nil
   201  	}
   202  
   203  	if _, err := transaction.ExecContext(ctx, sqlquery.DeleteClusterArtifactVersion, clusterEdgeID, name); err != nil {
   204  		return err
   205  	}
   206  	return nil
   207  }
   208  
   209  func validateWeakImageRef(fleet, version string) error {
   210  	if fleet == "" || version == "" {
   211  		return fmt.Errorf("name and version are required to build image reference. name: %v, version: %v", fleet, version)
   212  	}
   213  	if strings.HasPrefix(version, ".") || strings.HasPrefix(version, "-") {
   214  		return fmt.Errorf("version cannot start with a period or hyphen. version: %v", version)
   215  	}
   216  	d, dErr := name.NewDigest(fmt.Sprintf("%s@%s", fleet, version), name.WeakValidation)
   217  	if dErr == nil {
   218  		if _, refErr := name.ParseReference(d.String(), name.WeakValidation); refErr != nil {
   219  			return fmt.Errorf("could not parse digest as image reference. err: %v", refErr)
   220  		}
   221  	}
   222  
   223  	t, tErr := name.NewTag(fmt.Sprintf("%s:%s", fleet, version), name.WeakValidation)
   224  	if tErr == nil {
   225  		if _, refErr := name.ParseReference(t.String(), name.WeakValidation); refErr != nil {
   226  			return fmt.Errorf("could not parse tag as image reference. err: %v", refErr)
   227  		}
   228  	}
   229  
   230  	if dErr != nil && tErr != nil {
   231  		return fmt.Errorf("invalid artifact name or version. could not parse either tag or digest. err(s): [%v, %v]", dErr, tErr)
   232  	}
   233  
   234  	return nil
   235  }
   236  

View as plain text