...

Source file src/edge-infra.dev/pkg/f8n/devinfra/middlechild/middlechild.go

Documentation: edge-infra.dev/pkg/f8n/devinfra/middlechild

     1  package middlechild
     2  
     3  import (
     4  	"context"
     5  	"encoding/json"
     6  	"errors"
     7  	"fmt"
     8  	"strings"
     9  	"time"
    10  
    11  	"cloud.google.com/go/pubsub"
    12  	store "github.com/Shopify/go-storage"
    13  	"github.com/gin-gonic/gin"
    14  	"github.com/go-logr/logr"
    15  	"github.com/joshdk/go-junit"
    16  
    17  	"edge-infra.dev/pkg/f8n/devinfra/gcp/job/storage"
    18  	"edge-infra.dev/pkg/f8n/devinfra/job"
    19  	"edge-infra.dev/pkg/f8n/devinfra/testinfra/model"
    20  	tisql "edge-infra.dev/pkg/f8n/devinfra/testinfra/sql"
    21  	"edge-infra.dev/pkg/lib/fog"
    22  	"edge-infra.dev/pkg/lib/runtime/manager"
    23  	"edge-infra.dev/pkg/lib/runtime/subscriber"
    24  )
    25  
// MiddleChild consumes Pub/Sub storage notifications and records finished jobs
// in the test-infra SQL database. Construct it with New, which sets both fields.
type MiddleChild struct {
	Logger logr.Logger
	// db is the SQL handle used for every insert; handleNotification refuses
	// to run when it is nil.
	db     *tisql.DBHandle
}
    30  
// Job accumulates everything gathered for one finished job before it is
// written to the database: the EdgeJob row, its metadata key/value pairs and
// its individual test results.
type Job struct {
	JobData  model.EdgeJob
	Metadata []model.EdgeJobMetadata
	Tests    []model.EdgeJobTest

	// files holds the objects fetched from the job's bucket prefix. It is
	// unexported, so json.MarshalIndent (used when logging the job) skips it.
	files        storage.Files
	// NOTE(review): StartedTime and FinishedTime are not written anywhere in
	// this file (JobData.Started/Finished carry the timestamps); confirm
	// whether anything else still uses them.
	StartedTime  int64
	FinishedTime int64
}
    40  
// NotifData holds the fields decoded from a Pub/Sub notification payload: the
// object name and the bucket it lives in. handleNotification derives the job's
// bucket prefix from Name and builds JobData.Path from Bucket.
type NotifData struct {
	Name   string `json:"name"`
	Bucket string `json:"bucket"`
}
    45  
// Pub/Sub subscription ID and GCP project ID that Start subscribes with.
const (
	defaultSubscription = "edge-job-storage-subscription"
	defaultProjectID    = "ret-edge-pltf-infra"
)
    50  
// init switches gin into release mode for the whole process.
// NOTE(review): gin is not otherwise referenced in this file; presumably this
// quiets gin's debug output from a dependency (e.g. the manager's health-probe
// server) — confirm.
func init() {
	gin.SetMode(gin.ReleaseMode)
}
    54  
    55  func New() (*MiddleChild, error) {
    56  	log := fog.New().WithName("middlechild")
    57  
    58  	_db, err := tisql.FromEnv()
    59  	if err != nil {
    60  		log.Error(err, "failed to initialize sql conn")
    61  		return nil, err
    62  	}
    63  
    64  	return &MiddleChild{Logger: log, db: _db}, nil
    65  }
    66  
    67  // Main creates a new client on a given subscription and monitors for a notification
    68  // that includes a finished.json file
    69  func (mc *MiddleChild) Start() error {
    70  	ctx := context.Background()
    71  	mc.Logger.Info("monitoring for notifications")
    72  
    73  	mgr, err := manager.New(manager.Options{HealthProbeBindAddress: ":8080"})
    74  	if err != nil {
    75  		mc.Logger.Error(err, "failed to create a new manager")
    76  		return err
    77  	}
    78  
    79  	sub, err := subscriber.New(subscriber.Options{
    80  		ProjectID: defaultProjectID,
    81  		ID:        defaultSubscription,
    82  		Handler:   mc,
    83  	})
    84  	if err != nil {
    85  		mc.Logger.Error(err, "failed to create a new subscriber")
    86  		return err
    87  	}
    88  
    89  	if err := mgr.Add(sub); err != nil {
    90  		mc.Logger.Error(err, "failed to add pubsub sub")
    91  		return err
    92  	}
    93  
    94  	if err := mgr.Start(ctx); err != nil {
    95  		mc.Logger.Error(err, "middlechild errored")
    96  		return err
    97  	}
    98  
    99  	return nil
   100  }
   101  
   102  func (mc *MiddleChild) HandleMsg(ctx context.Context, msg *pubsub.Message) error {
   103  	// if a finished.json is detected act otherwise ack and move on
   104  	if strings.Contains(msg.Attributes["objectId"], "/"+storage.FinishedFilename) {
   105  		mc.Logger.Info("handling a notification", "message", msg)
   106  
   107  		err := mc.handleNotification(ctx, msg)
   108  		if err != nil && errors.Is(err, tisql.ErrDuplicate) {
   109  			// if the message is a duplicate ack it and move on
   110  			mc.Logger.Info("acking duplicate message", "path", msg.Attributes["objectId"])
   111  			msg.Ack()
   112  			return nil
   113  		}
   114  		if err != nil {
   115  			mc.Logger.Error(err, "encountered an error handling notification (SQL)", "path", msg.Attributes["objectId"])
   116  			msg.Nack()
   117  		}
   118  	}
   119  	// ack everything else
   120  	msg.Ack()
   121  
   122  	return nil
   123  }
   124  
   125  // handle a finished.json notification
   126  func (mc *MiddleChild) handleNotification(ctx context.Context, m *pubsub.Message) error {
   127  	if mc.db == nil {
   128  		return fmt.Errorf("failed to handle notification. sql handle is nil")
   129  	}
   130  	mcj := &Job{}
   131  	notif := &NotifData{}
   132  
   133  	// set the logger into the context
   134  	fog.IntoContext(ctx, mc.Logger)
   135  
   136  	// get the bucket and name from the notification
   137  	err := json.Unmarshal(m.Data, notif)
   138  	if err != nil {
   139  		return fmt.Errorf("failed to unmarshal the pubsub message: %w", err)
   140  	}
   141  
   142  	// create a new storage object
   143  	s, err := storage.New(ctx)
   144  	if err != nil {
   145  		return fmt.Errorf("failed to create storage client: %w", err)
   146  	}
   147  
   148  	b, err := s.NewBucket(ctx)
   149  	if err != nil {
   150  		return fmt.Errorf("failed to create bucket handler: %w", err)
   151  	}
   152  
   153  	// remove the filename from the notification name to get the bucket path
   154  	prefix := strings.ReplaceAll(notif.Name, "/"+storage.FinishedFilename, "")
   155  
   156  	// get all the files stored within the bucket path
   157  	mcj.files, err = b.RetrieveAllFiles(ctx, prefix)
   158  	if err != nil {
   159  		return fmt.Errorf("failed to fetch files: %w", err)
   160  	}
   161  
   162  	// unmarshal and set all the started data
   163  	err = mc.gatherStarted(mcj)
   164  	if err != nil {
   165  		return fmt.Errorf("failed to gather started data: %w", err)
   166  	}
   167  
   168  	// unmarshal and set all the finished data
   169  	err = mc.gatherFinished(mcj)
   170  	if err != nil {
   171  		return fmt.Errorf("failed to gather finished data: %w", err)
   172  	}
   173  
   174  	// unmarshal and set all the test data
   175  	err = mc.gatherTests(mcj)
   176  	if err != nil {
   177  		return fmt.Errorf("failed to gather test data: %w", err)
   178  	}
   179  
   180  	err = argoLogs(
   181  		ctx,
   182  		mcj,
   183  		store.NewCloudStorageFS(storage.DefaultBucket, nil),
   184  		store.NewCloudStorageFS(argoLogsBucket, nil),
   185  	)
   186  	if err != nil {
   187  		return fmt.Errorf("failed to fetch argo logs: %w", err)
   188  	}
   189  
   190  	// build and set the bucket path
   191  	mcj.JobData.Path = "gs://" + notif.Bucket + "/" + prefix
   192  
   193  	// attempt to insert the JobData and return
   194  	return mc.insert(ctx, mcj)
   195  }
   196  
   197  // gatherFinished unmarshals the finished file and set the dependent fields in JobData
   198  func (mc *MiddleChild) gatherFinished(mcj *Job) error {
   199  	finished := &job.FinishedJSON{}
   200  	err := json.Unmarshal(mcj.files.JSON["finished.json"], finished)
   201  	if err != nil {
   202  		return fmt.Errorf("failed to unmarshal finished.json: %w", err)
   203  	}
   204  
   205  	mcj.JobData.Passed = finished.Passed
   206  	mcj.JobData.Finished = time.Unix(finished.Timestamp, 0).UTC()
   207  	mcj.JobData.Elapsed = mcj.JobData.Finished.Sub(mcj.JobData.Started)
   208  	mcj.JobData.Number = finished.Metadata["number"]
   209  	mcj.JobData.Run = finished.Metadata["run_id"]
   210  	mcj.JobData.Job = finished.Metadata["job_name"]
   211  	mcj.JobData.Workflow = finished.Metadata["workflow_name"]
   212  
   213  	for key, value := range finished.Metadata {
   214  		mcj.Metadata = append(mcj.Metadata, model.EdgeJobMetadata{Key: key, Value: value})
   215  	}
   216  
   217  	return nil
   218  }
   219  
   220  // gatherStartedc unmarshals the started file and set the dependent fields in JobData
   221  func (mc *MiddleChild) gatherStarted(mcj *Job) error {
   222  	started := &job.StartedJSON{}
   223  	startedFile := mcj.files.JSON["started.json"]
   224  	if startedFile == nil {
   225  		// TODO(ss185994): skip for now
   226  		return nil
   227  	}
   228  	err := json.Unmarshal(startedFile, started)
   229  	if err != nil {
   230  		return fmt.Errorf("failed to unmarshal started.json: %w", err)
   231  	}
   232  
   233  	mcj.JobData.Started = time.Unix(started.Timestamp, 0).UTC()
   234  	mcj.JobData.Version = started.Version
   235  	mcj.JobData.Repos = started.Repo
   236  	mcj.JobData.RepoCommit = started.Commit
   237  	mcj.Metadata = append(mcj.Metadata, model.EdgeJobMetadata{Key: "pull", Value: started.Pull})
   238  
   239  	return nil
   240  }
   241  
   242  // gatherTests parses through all the test data and generates an entry into JobData
   243  func (mc *MiddleChild) gatherTests(mcj *Job) error {
   244  	tests := []model.EdgeJobTest{}
   245  
   246  	for n, v := range mcj.files.XML {
   247  		t, testCount, failCount, err := parseJUnit(v)
   248  		if err != nil {
   249  			return fmt.Errorf("failed to parse %s: %w", n, err)
   250  		}
   251  		tests = append(tests, t...)
   252  		mcj.JobData.TestsRun = mcj.JobData.TestsRun + testCount
   253  		mcj.JobData.TestsFailed = mcj.JobData.TestsFailed + failCount
   254  	}
   255  	mcj.Tests = tests
   256  
   257  	return nil
   258  }
   259  
   260  // insert attempts to insert the JobData struct into postgres
   261  func (mc *MiddleChild) insert(ctx context.Context, mcj *Job) error {
   262  	// TODO(ss185994): rewrite to do in single sql query instead of looping and setting FKeys
   263  
   264  	mc.Logger.Info("inserting job")
   265  	jobID, err := mc.db.InsertEdgeJob(ctx, mcj.JobData)
   266  	if err != nil {
   267  		return err
   268  	}
   269  
   270  	mc.Logger.Info("job id", "id", jobID.String())
   271  
   272  	for _, meta := range mcj.Metadata {
   273  		meta.EdgeJob = jobID
   274  		_, err := mc.db.InsertEdgeJobMetadata(ctx, meta)
   275  		if err != nil {
   276  			return err
   277  		}
   278  	}
   279  
   280  	for _, test := range mcj.Tests {
   281  		test.EdgeJob = jobID
   282  		_, err := mc.db.InsertEdgeJobTest(ctx, test)
   283  		if err != nil {
   284  			return err
   285  		}
   286  	}
   287  
   288  	// pretty print struct
   289  	b, err := json.MarshalIndent(mcj, "", "  ")
   290  	if err != nil {
   291  		return err
   292  	}
   293  	mc.Logger.Info("inserted job", "job", string(b))
   294  
   295  	return nil
   296  }
   297  
   298  // parseJUnit will add individual tests within a suite to a Test struct list
   299  func parseJUnit(t []byte) ([]model.EdgeJobTest, int, int, error) {
   300  	tests := []model.EdgeJobTest{}
   301  
   302  	// unmarshal the xml
   303  	suites, err := junit.Ingest(t)
   304  	if err != nil {
   305  		return tests, 0, 0, fmt.Errorf("failed to ingest junit data: %w", err)
   306  	}
   307  
   308  	numTests := 0
   309  	failedTests := 0
   310  
   311  	// go through the suites  and breakdown the tests into relevant info
   312  	for _, suite := range suites {
   313  		numTests = numTests + suite.Totals.Tests
   314  		failedTests = failedTests + suite.Totals.Failed
   315  		for _, test := range suite.Tests {
   316  			singletest := model.EdgeJobTest{}
   317  			singletest.Name = test.Name
   318  			singletest.Time = test.Duration
   319  			singletest.Failed = false
   320  			singletest.Suite = suite.Name
   321  			singletest.FailureText = ""
   322  			if test.Error != nil {
   323  				singletest.Failed = true
   324  				singletest.FailureText = fmt.Sprintf("%v", test.Error.Error())
   325  			}
   326  			tests = append(tests, singletest)
   327  		}
   328  	}
   329  	return tests, numTests, failedTests, nil
   330  }
   331  

View as plain text