package middlechild import ( "context" "encoding/json" "errors" "fmt" "strings" "time" "cloud.google.com/go/pubsub" store "github.com/Shopify/go-storage" "github.com/gin-gonic/gin" "github.com/go-logr/logr" "github.com/joshdk/go-junit" "edge-infra.dev/pkg/f8n/devinfra/gcp/job/storage" "edge-infra.dev/pkg/f8n/devinfra/job" "edge-infra.dev/pkg/f8n/devinfra/testinfra/model" tisql "edge-infra.dev/pkg/f8n/devinfra/testinfra/sql" "edge-infra.dev/pkg/lib/fog" "edge-infra.dev/pkg/lib/runtime/manager" "edge-infra.dev/pkg/lib/runtime/subscriber" ) type MiddleChild struct { Logger logr.Logger db *tisql.DBHandle } type Job struct { JobData model.EdgeJob Metadata []model.EdgeJobMetadata Tests []model.EdgeJobTest files storage.Files StartedTime int64 FinishedTime int64 } type NotifData struct { Name string `json:"name"` Bucket string `json:"bucket"` } const ( defaultSubscription = "edge-job-storage-subscription" defaultProjectID = "ret-edge-pltf-infra" ) func init() { gin.SetMode(gin.ReleaseMode) } func New() (*MiddleChild, error) { log := fog.New().WithName("middlechild") _db, err := tisql.FromEnv() if err != nil { log.Error(err, "failed to initialize sql conn") return nil, err } return &MiddleChild{Logger: log, db: _db}, nil } // Main creates a new client on a given subscription and monitors for a notification // that includes a finished.json file func (mc *MiddleChild) Start() error { ctx := context.Background() mc.Logger.Info("monitoring for notifications") mgr, err := manager.New(manager.Options{HealthProbeBindAddress: ":8080"}) if err != nil { mc.Logger.Error(err, "failed to create a new manager") return err } sub, err := subscriber.New(subscriber.Options{ ProjectID: defaultProjectID, ID: defaultSubscription, Handler: mc, }) if err != nil { mc.Logger.Error(err, "failed to create a new subscriber") return err } if err := mgr.Add(sub); err != nil { mc.Logger.Error(err, "failed to add pubsub sub") return err } if err := mgr.Start(ctx); err != nil { mc.Logger.Error(err, "middlechild errored") return err } return nil } func (mc *MiddleChild) HandleMsg(ctx context.Context, msg *pubsub.Message) error { // if a finished.json is detected act otherwise ack and move on if strings.Contains(msg.Attributes["objectId"], "/"+storage.FinishedFilename) { mc.Logger.Info("handling a notification", "message", msg) err := mc.handleNotification(ctx, msg) if err != nil && errors.Is(err, tisql.ErrDuplicate) { // if the message is a duplicate ack it and move on mc.Logger.Info("acking duplicate message", "path", msg.Attributes["objectId"]) msg.Ack() return nil } if err != nil { mc.Logger.Error(err, "encountered an error handling notification (SQL)", "path", msg.Attributes["objectId"]) msg.Nack() } } // ack everything else msg.Ack() return nil } // handle a finished.json notification func (mc *MiddleChild) handleNotification(ctx context.Context, m *pubsub.Message) error { if mc.db == nil { return fmt.Errorf("failed to handle notification. sql handle is nil") } mcj := &Job{} notif := &NotifData{} // set the logger into the context fog.IntoContext(ctx, mc.Logger) // get the bucket and name from the notification err := json.Unmarshal(m.Data, notif) if err != nil { return fmt.Errorf("failed to unmarshal the pubsub message: %w", err) } // create a new storage object s, err := storage.New(ctx) if err != nil { return fmt.Errorf("failed to create storage client: %w", err) } b, err := s.NewBucket(ctx) if err != nil { return fmt.Errorf("failed to create bucket handler: %w", err) } // remove the filename from the notification name to get the bucket path prefix := strings.ReplaceAll(notif.Name, "/"+storage.FinishedFilename, "") // get all the files stored within the bucket path mcj.files, err = b.RetrieveAllFiles(ctx, prefix) if err != nil { return fmt.Errorf("failed to fetch files: %w", err) } // unmarshal and set all the started data err = mc.gatherStarted(mcj) if err != nil { return fmt.Errorf("failed to gather started data: %w", err) } // unmarshal and set all the finished data err = mc.gatherFinished(mcj) if err != nil { return fmt.Errorf("failed to gather finished data: %w", err) } // unmarshal and set all the test data err = mc.gatherTests(mcj) if err != nil { return fmt.Errorf("failed to gather test data: %w", err) } err = argoLogs( ctx, mcj, store.NewCloudStorageFS(storage.DefaultBucket, nil), store.NewCloudStorageFS(argoLogsBucket, nil), ) if err != nil { return fmt.Errorf("failed to fetch argo logs: %w", err) } // build and set the bucket path mcj.JobData.Path = "gs://" + notif.Bucket + "/" + prefix // attempt to insert the JobData and return return mc.insert(ctx, mcj) } // gatherFinished unmarshals the finished file and set the dependent fields in JobData func (mc *MiddleChild) gatherFinished(mcj *Job) error { finished := &job.FinishedJSON{} err := json.Unmarshal(mcj.files.JSON["finished.json"], finished) if err != nil { return fmt.Errorf("failed to unmarshal finished.json: %w", err) } mcj.JobData.Passed = finished.Passed mcj.JobData.Finished = time.Unix(finished.Timestamp, 0).UTC() mcj.JobData.Elapsed = mcj.JobData.Finished.Sub(mcj.JobData.Started) mcj.JobData.Number = finished.Metadata["number"] mcj.JobData.Run = finished.Metadata["run_id"] mcj.JobData.Job = finished.Metadata["job_name"] mcj.JobData.Workflow = finished.Metadata["workflow_name"] for key, value := range finished.Metadata { mcj.Metadata = append(mcj.Metadata, model.EdgeJobMetadata{Key: key, Value: value}) } return nil } // gatherStartedc unmarshals the started file and set the dependent fields in JobData func (mc *MiddleChild) gatherStarted(mcj *Job) error { started := &job.StartedJSON{} startedFile := mcj.files.JSON["started.json"] if startedFile == nil { // TODO(ss185994): skip for now return nil } err := json.Unmarshal(startedFile, started) if err != nil { return fmt.Errorf("failed to unmarshal started.json: %w", err) } mcj.JobData.Started = time.Unix(started.Timestamp, 0).UTC() mcj.JobData.Version = started.Version mcj.JobData.Repos = started.Repo mcj.JobData.RepoCommit = started.Commit mcj.Metadata = append(mcj.Metadata, model.EdgeJobMetadata{Key: "pull", Value: started.Pull}) return nil } // gatherTests parses through all the test data and generates an entry into JobData func (mc *MiddleChild) gatherTests(mcj *Job) error { tests := []model.EdgeJobTest{} for n, v := range mcj.files.XML { t, testCount, failCount, err := parseJUnit(v) if err != nil { return fmt.Errorf("failed to parse %s: %w", n, err) } tests = append(tests, t...) mcj.JobData.TestsRun = mcj.JobData.TestsRun + testCount mcj.JobData.TestsFailed = mcj.JobData.TestsFailed + failCount } mcj.Tests = tests return nil } // insert attempts to insert the JobData struct into postgres func (mc *MiddleChild) insert(ctx context.Context, mcj *Job) error { // TODO(ss185994): rewrite to do in single sql query instead of looping and setting FKeys mc.Logger.Info("inserting job") jobID, err := mc.db.InsertEdgeJob(ctx, mcj.JobData) if err != nil { return err } mc.Logger.Info("job id", "id", jobID.String()) for _, meta := range mcj.Metadata { meta.EdgeJob = jobID _, err := mc.db.InsertEdgeJobMetadata(ctx, meta) if err != nil { return err } } for _, test := range mcj.Tests { test.EdgeJob = jobID _, err := mc.db.InsertEdgeJobTest(ctx, test) if err != nil { return err } } // pretty print struct b, err := json.MarshalIndent(mcj, "", " ") if err != nil { return err } mc.Logger.Info("inserted job", "job", string(b)) return nil } // parseJUnit will add individual tests within a suite to a Test struct list func parseJUnit(t []byte) ([]model.EdgeJobTest, int, int, error) { tests := []model.EdgeJobTest{} // unmarshal the xml suites, err := junit.Ingest(t) if err != nil { return tests, 0, 0, fmt.Errorf("failed to ingest junit data: %w", err) } numTests := 0 failedTests := 0 // go through the suites and breakdown the tests into relevant info for _, suite := range suites { numTests = numTests + suite.Totals.Tests failedTests = failedTests + suite.Totals.Failed for _, test := range suite.Tests { singletest := model.EdgeJobTest{} singletest.Name = test.Name singletest.Time = test.Duration singletest.Failed = false singletest.Suite = suite.Name singletest.FailureText = "" if test.Error != nil { singletest.Failed = true singletest.FailureText = fmt.Sprintf("%v", test.Error.Error()) } tests = append(tests, singletest) } } return tests, numTests, failedTests, nil }