package artifacts

import (
	"context"
	"database/sql"
	"fmt"
	"slices"
	"strings"

	"github.com/google/go-containerregistry/pkg/name"

	sqlerr "edge-infra.dev/pkg/edge/api/apierror/sql"
	"edge-infra.dev/pkg/edge/api/services/interfaces"
	sqlquery "edge-infra.dev/pkg/edge/api/sql"
	"edge-infra.dev/pkg/edge/api/types"
	"edge-infra.dev/pkg/edge/api/utils"
)

// Service exposes read and write operations on artifact versions and on the
// set of artifact versions associated with a cluster.
//
//go:generate mockgen -destination=../../mocks/mock_artifacts_service.go -package=mocks -mock_names=Service=MockArtifactsService edge-infra.dev/pkg/edge/api/services/artifacts Service
type Service interface {
	GetAvailableArtifactVersions(ctx context.Context, name string) ([]types.ArtifactVersion, error)
	GetLatestAvailableArtifactVersion(ctx context.Context, name string) (*types.ArtifactVersion, error)
	GetClusterArtifactVersions(ctx context.Context, clusterEdgeID string) ([]types.ArtifactVersion, error)
	UpdateClusterFleetVersionAndArtifact(ctx context.Context, clusterEdgeID, version string) error
	UpdateClustersToLatestArtifactVersion(ctx context.Context, name string) (clustersUpdated int, err error)
	AddClusterArtifactVersion(ctx context.Context, transaction *sql.Tx, clusterEdgeID, name string) error
	DeleteClusterArtifactVersion(ctx context.Context, transaction *sql.Tx, clusterEdgeID, name string) error
}

// artifactsService is the SQL-backed implementation of Service.
type artifactsService struct {
	SQLDB               *sql.DB
	ClusterLabelService interfaces.ClusterLabelService
}

// NewArtifactsService returns a Service that runs its queries against sqlDB and
// resolves a cluster's fleet type through clusterLabelSvc.
func NewArtifactsService(sqlDB *sql.DB, clusterLabelSvc interfaces.ClusterLabelService) Service {
	return &artifactsService{
		SQLDB:               sqlDB,
		ClusterLabelService: clusterLabelSvc,
	}
}

// queryArtifactVersions runs query with args and scans every row as a
// (name, version) pair.
//
// On a query or scan failure it returns an empty, non-nil slice together with
// the raw error; on a row iteration failure it returns nil and the error
// wrapped by sqlerr.Wrap. On success the slice is non-nil even when no rows
// matched.
func (s *artifactsService) queryArtifactVersions(ctx context.Context, query string, args ...any) ([]types.ArtifactVersion, error) {
	rows, err := s.SQLDB.QueryContext(ctx, query, args...)
	if err != nil {
		return []types.ArtifactVersion{}, err
	}
	// Without this, an early return on a Scan error would leave the rows (and
	// the connection backing them) open.
	defer rows.Close()

	artifacts := []types.ArtifactVersion{}
	for rows.Next() {
		var a types.ArtifactVersion
		if err := rows.Scan(&a.Name, &a.Version); err != nil {
			return []types.ArtifactVersion{}, err
		}
		artifacts = append(artifacts, a)
	}
	if err := rows.Err(); err != nil {
		return nil, sqlerr.Wrap(err)
	}
	return artifacts, nil
}

// GetAvailableArtifactVersions gets the list of installable versions for an artifact.
func (s *artifactsService) GetAvailableArtifactVersions(ctx context.Context, name string) ([]types.ArtifactVersion, error) {
	return s.queryArtifactVersions(ctx, sqlquery.GetAvailableArtifactVersions, name)
}

// GetLatestAvailableArtifactVersion gets the latest installable version of an artifact.
// The returned ArtifactVersion carries the requested name and the version read
// from the database; any scan error is returned wrapped by sqlerr.Wrap.
func (s *artifactsService) GetLatestAvailableArtifactVersion(ctx context.Context, name string) (*types.ArtifactVersion, error) {
	artifact := &types.ArtifactVersion{
		Name: name,
	}
	row := s.SQLDB.QueryRowContext(ctx, sqlquery.GetLatestAvailableArtifactVersion, name)
	if err := row.Scan(&artifact.Version); err != nil {
		return nil, sqlerr.Wrap(err)
	}
	return artifact, nil
}

// GetClusterArtifactVersions gets the desired set of artifacts that should be scheduled to a cluster,
// i.e. the name and a tag or digest of a pallet to use in the cluster's Shipment.
func (s *artifactsService) GetClusterArtifactVersions(ctx context.Context, clusterEdgeID string) ([]types.ArtifactVersion, error) {
	return s.queryArtifactVersions(ctx, sqlquery.GetClusterArtifactVersions, clusterEdgeID)
}

// UpdateClusterFleetVersionAndArtifact sets cluster.fleet_version and updates the desired version of the
// corresponding fleet artifact, e.g. store, that should be scheduled to the cluster.
//
// When version equals types.DefaultVersionTag it is first resolved to the
// latest available version of the cluster's fleet artifact. Both updates run
// in a single transaction, which is rolled back if either statement fails.
func (s *artifactsService) UpdateClusterFleetVersionAndArtifact(ctx context.Context, clusterEdgeID, version string) error {
	// get the cluster's fleet type to use as the artifact name
	fleetType, err := s.ClusterLabelService.FetchFleetType(ctx, clusterEdgeID)
	if err != nil {
		return err
	}

	// resolve latest version
	if version == types.DefaultVersionTag {
		latestVersion, err := s.GetLatestAvailableArtifactVersion(ctx, fleetType.String())
		if err != nil {
			return fmt.Errorf("failed to resolve latest version of artifact %s. err: %w", fleetType, err)
		}
		version = latestVersion.Version
	}

	// check that the {fleet_type}(@|:){fleet_version} forms a valid image reference with either a tag or digest
	if err := validateWeakImageRef(fleetType.String(), version); err != nil {
		return err
	}

	// update clusters.fleet_version and cluster_artifact_versions for the default fleet artifact
	tx, err := s.SQLDB.BeginTx(ctx, nil)
	if err != nil {
		return err
	}

	if _, err = tx.ExecContext(ctx, sqlquery.UpdateClusterFleetVersion, clusterEdgeID, version); err != nil {
		errs := utils.NewErrorWrapper().AddError(err, "error executing UpdateClusterFleetVersion")
		if rollbackErr := tx.Rollback(); rollbackErr != nil {
			// record the rollback failure itself, not the exec error again
			errs.AddError(rollbackErr, "failed to roll back transaction")
		}
		return errs.Errors
	}

	if _, err = tx.ExecContext(ctx, sqlquery.UpdateClusterArtifactVersions, clusterEdgeID, version); err != nil {
		errs := utils.NewErrorWrapper().AddError(err, "error executing UpdateClusterArtifactVersions")
		if rollbackErr := tx.Rollback(); rollbackErr != nil {
			// record the rollback failure itself, not the exec error again
			errs.AddError(rollbackErr, "failed to roll back transaction")
		}
		return errs.Errors
	}

	if err := tx.Commit(); err != nil {
		return fmt.Errorf("failed to commit transaction. err: %w", err)
	}
	return nil
}

// UpdateClustersToLatestArtifactVersion updates all clusters with auto-update enabled to the latest version of
// the given artifact. If the fleet type of the cluster matches the artifact name, e.g. store, the cluster's
// fleet_version is also updated.
//
// It returns the number of rows affected by the update, or -1 and an error if
// the statement fails or the affected-row count is unavailable.
func (s *artifactsService) UpdateClustersToLatestArtifactVersion(ctx context.Context, name string) (int, error) {
	res, err := s.SQLDB.ExecContext(ctx, sqlquery.UpdateClustersToLatestFleetArtifactVersion, name)
	if err != nil {
		return -1, err
	}
	n, err := res.RowsAffected()
	if err != nil {
		return -1, err
	}
	return int(n), nil
}

// AddClusterArtifactVersion adds an artifact to a cluster's inventory of artifact versions.
//
// The new entry is created with the version currently recorded for the
// cluster's fleet artifact (the artifact whose name equals the cluster's fleet
// type); if the cluster has no such artifact the version passed to the insert
// is the empty string. Only the insert runs on transaction; the caller is
// responsible for committing or rolling it back.
func (s *artifactsService) AddClusterArtifactVersion(ctx context.Context, transaction *sql.Tx, clusterEdgeID, name string) error {
	var storeVersion string

	// retrieve infra version for artifact
	// NOTE(review): this read goes through s.SQLDB, not the supplied
	// transaction, so it does not observe uncommitted writes made on
	// transaction — confirm that is intended.
	artifactVersions, err := s.GetClusterArtifactVersions(ctx, clusterEdgeID)
	if err != nil {
		return err
	}

	// get the cluster's fleet type to fetch version
	fleetType, err := s.ClusterLabelService.FetchFleetType(ctx, clusterEdgeID)
	if err != nil {
		return err
	}

	for _, artifact := range artifactVersions {
		if artifact.Name == fleetType.String() {
			storeVersion = artifact.Version
		}
	}

	if _, err := transaction.ExecContext(ctx, sqlquery.CreateClusterArtifactVersion, clusterEdgeID, name, storeVersion); err != nil {
		return err
	}
	return nil
}

// DeleteClusterArtifactVersion deletes an artifact from a cluster's inventory of artifact versions.
// It is a no-op when the cluster has no artifact with the given name. Only the
// delete runs on transaction; the caller is responsible for committing or
// rolling it back.
func (s *artifactsService) DeleteClusterArtifactVersion(ctx context.Context, transaction *sql.Tx, clusterEdgeID, name string) error {
	// do nothing if artifact doesn't exist
	clusterArtifactVersions, err := s.GetClusterArtifactVersions(ctx, clusterEdgeID)
	if err != nil {
		return err
	}
	exists := slices.ContainsFunc(clusterArtifactVersions, func(artifact types.ArtifactVersion) bool {
		return artifact.Name == name
	})
	if !exists {
		return nil
	}

	if _, err := transaction.ExecContext(ctx, sqlquery.DeleteClusterArtifactVersion, clusterEdgeID, name); err != nil {
		return err
	}
	return nil
}

// validateWeakImageRef checks that fleet and version can be combined into an
// image reference, either as a digest ("fleet@version") or as a tag
// ("fleet:version"), using weak validation.
//
// It returns an error when either argument is empty, when version starts with
// a period or hyphen, when a successfully built digest or tag fails to parse
// back as a reference, or when neither form can be built.
func validateWeakImageRef(fleet, version string) error {
	if fleet == "" || version == "" {
		return fmt.Errorf("name and version are required to build image reference. name: %v, version: %v", fleet, version)
	}
	if strings.HasPrefix(version, ".") || strings.HasPrefix(version, "-") {
		return fmt.Errorf("version cannot start with a period or hyphen. version: %v", version)
	}

	d, dErr := name.NewDigest(fmt.Sprintf("%s@%s", fleet, version), name.WeakValidation)
	if dErr == nil {
		if _, refErr := name.ParseReference(d.String(), name.WeakValidation); refErr != nil {
			return fmt.Errorf("could not parse digest as image reference. err: %w", refErr)
		}
	}

	t, tErr := name.NewTag(fmt.Sprintf("%s:%s", fleet, version), name.WeakValidation)
	if tErr == nil {
		if _, refErr := name.ParseReference(t.String(), name.WeakValidation); refErr != nil {
			return fmt.Errorf("could not parse tag as image reference. err: %w", refErr)
		}
	}

	// only fail when the version is valid as neither a digest nor a tag
	if dErr != nil && tErr != nil {
		return fmt.Errorf("invalid artifact name or version. could not parse either tag or digest. err(s): [%w, %w]", dErr, tErr)
	}
	return nil
}