1 package middlechild
2
3 import (
4 "context"
5 "encoding/json"
6 "errors"
7 "fmt"
8 "strings"
9 "time"
10
11 "cloud.google.com/go/pubsub"
12 store "github.com/Shopify/go-storage"
13 "github.com/gin-gonic/gin"
14 "github.com/go-logr/logr"
15 "github.com/joshdk/go-junit"
16
17 "edge-infra.dev/pkg/f8n/devinfra/gcp/job/storage"
18 "edge-infra.dev/pkg/f8n/devinfra/job"
19 "edge-infra.dev/pkg/f8n/devinfra/testinfra/model"
20 tisql "edge-infra.dev/pkg/f8n/devinfra/testinfra/sql"
21 "edge-infra.dev/pkg/lib/fog"
22 "edge-infra.dev/pkg/lib/runtime/manager"
23 "edge-infra.dev/pkg/lib/runtime/subscriber"
24 )
25
26 type MiddleChild struct {
27 Logger logr.Logger
28 db *tisql.DBHandle
29 }
30
31 type Job struct {
32 JobData model.EdgeJob
33 Metadata []model.EdgeJobMetadata
34 Tests []model.EdgeJobTest
35
36 files storage.Files
37 StartedTime int64
38 FinishedTime int64
39 }
40
// NotifData is the part of the notification payload that is decoded from the
// pubsub message data.
type NotifData struct {
	// Name is the object name; the job prefix is derived from it by removing
	// the "/" + storage.FinishedFilename portion.
	Name string `json:"name"`
	// Bucket is the bucket name used to build the job's gs:// path.
	Bucket string `json:"bucket"`
}
45
const (
	// defaultSubscription is the subscription ID handed to the subscriber in Start.
	defaultSubscription = "edge-job-storage-subscription"
	// defaultProjectID is the project ID handed to the subscriber in Start.
	defaultProjectID = "ret-edge-pltf-infra"
)
50
51 func init() {
52 gin.SetMode(gin.ReleaseMode)
53 }
54
55 func New() (*MiddleChild, error) {
56 log := fog.New().WithName("middlechild")
57
58 _db, err := tisql.FromEnv()
59 if err != nil {
60 log.Error(err, "failed to initialize sql conn")
61 return nil, err
62 }
63
64 return &MiddleChild{Logger: log, db: _db}, nil
65 }
66
67
68
69 func (mc *MiddleChild) Start() error {
70 ctx := context.Background()
71 mc.Logger.Info("monitoring for notifications")
72
73 mgr, err := manager.New(manager.Options{HealthProbeBindAddress: ":8080"})
74 if err != nil {
75 mc.Logger.Error(err, "failed to create a new manager")
76 return err
77 }
78
79 sub, err := subscriber.New(subscriber.Options{
80 ProjectID: defaultProjectID,
81 ID: defaultSubscription,
82 Handler: mc,
83 })
84 if err != nil {
85 mc.Logger.Error(err, "failed to create a new subscriber")
86 return err
87 }
88
89 if err := mgr.Add(sub); err != nil {
90 mc.Logger.Error(err, "failed to add pubsub sub")
91 return err
92 }
93
94 if err := mgr.Start(ctx); err != nil {
95 mc.Logger.Error(err, "middlechild errored")
96 return err
97 }
98
99 return nil
100 }
101
102 func (mc *MiddleChild) HandleMsg(ctx context.Context, msg *pubsub.Message) error {
103
104 if strings.Contains(msg.Attributes["objectId"], "/"+storage.FinishedFilename) {
105 mc.Logger.Info("handling a notification", "message", msg)
106
107 err := mc.handleNotification(ctx, msg)
108 if err != nil && errors.Is(err, tisql.ErrDuplicate) {
109
110 mc.Logger.Info("acking duplicate message", "path", msg.Attributes["objectId"])
111 msg.Ack()
112 return nil
113 }
114 if err != nil {
115 mc.Logger.Error(err, "encountered an error handling notification (SQL)", "path", msg.Attributes["objectId"])
116 msg.Nack()
117 }
118 }
119
120 msg.Ack()
121
122 return nil
123 }
124
125
126 func (mc *MiddleChild) handleNotification(ctx context.Context, m *pubsub.Message) error {
127 if mc.db == nil {
128 return fmt.Errorf("failed to handle notification. sql handle is nil")
129 }
130 mcj := &Job{}
131 notif := &NotifData{}
132
133
134 fog.IntoContext(ctx, mc.Logger)
135
136
137 err := json.Unmarshal(m.Data, notif)
138 if err != nil {
139 return fmt.Errorf("failed to unmarshal the pubsub message: %w", err)
140 }
141
142
143 s, err := storage.New(ctx)
144 if err != nil {
145 return fmt.Errorf("failed to create storage client: %w", err)
146 }
147
148 b, err := s.NewBucket(ctx)
149 if err != nil {
150 return fmt.Errorf("failed to create bucket handler: %w", err)
151 }
152
153
154 prefix := strings.ReplaceAll(notif.Name, "/"+storage.FinishedFilename, "")
155
156
157 mcj.files, err = b.RetrieveAllFiles(ctx, prefix)
158 if err != nil {
159 return fmt.Errorf("failed to fetch files: %w", err)
160 }
161
162
163 err = mc.gatherStarted(mcj)
164 if err != nil {
165 return fmt.Errorf("failed to gather started data: %w", err)
166 }
167
168
169 err = mc.gatherFinished(mcj)
170 if err != nil {
171 return fmt.Errorf("failed to gather finished data: %w", err)
172 }
173
174
175 err = mc.gatherTests(mcj)
176 if err != nil {
177 return fmt.Errorf("failed to gather test data: %w", err)
178 }
179
180 err = argoLogs(
181 ctx,
182 mcj,
183 store.NewCloudStorageFS(storage.DefaultBucket, nil),
184 store.NewCloudStorageFS(argoLogsBucket, nil),
185 )
186 if err != nil {
187 return fmt.Errorf("failed to fetch argo logs: %w", err)
188 }
189
190
191 mcj.JobData.Path = "gs://" + notif.Bucket + "/" + prefix
192
193
194 return mc.insert(ctx, mcj)
195 }
196
197
198 func (mc *MiddleChild) gatherFinished(mcj *Job) error {
199 finished := &job.FinishedJSON{}
200 err := json.Unmarshal(mcj.files.JSON["finished.json"], finished)
201 if err != nil {
202 return fmt.Errorf("failed to unmarshal finished.json: %w", err)
203 }
204
205 mcj.JobData.Passed = finished.Passed
206 mcj.JobData.Finished = time.Unix(finished.Timestamp, 0).UTC()
207 mcj.JobData.Elapsed = mcj.JobData.Finished.Sub(mcj.JobData.Started)
208 mcj.JobData.Number = finished.Metadata["number"]
209 mcj.JobData.Run = finished.Metadata["run_id"]
210 mcj.JobData.Job = finished.Metadata["job_name"]
211 mcj.JobData.Workflow = finished.Metadata["workflow_name"]
212
213 for key, value := range finished.Metadata {
214 mcj.Metadata = append(mcj.Metadata, model.EdgeJobMetadata{Key: key, Value: value})
215 }
216
217 return nil
218 }
219
220
221 func (mc *MiddleChild) gatherStarted(mcj *Job) error {
222 started := &job.StartedJSON{}
223 startedFile := mcj.files.JSON["started.json"]
224 if startedFile == nil {
225
226 return nil
227 }
228 err := json.Unmarshal(startedFile, started)
229 if err != nil {
230 return fmt.Errorf("failed to unmarshal started.json: %w", err)
231 }
232
233 mcj.JobData.Started = time.Unix(started.Timestamp, 0).UTC()
234 mcj.JobData.Version = started.Version
235 mcj.JobData.Repos = started.Repo
236 mcj.JobData.RepoCommit = started.Commit
237 mcj.Metadata = append(mcj.Metadata, model.EdgeJobMetadata{Key: "pull", Value: started.Pull})
238
239 return nil
240 }
241
242
243 func (mc *MiddleChild) gatherTests(mcj *Job) error {
244 tests := []model.EdgeJobTest{}
245
246 for n, v := range mcj.files.XML {
247 t, testCount, failCount, err := parseJUnit(v)
248 if err != nil {
249 return fmt.Errorf("failed to parse %s: %w", n, err)
250 }
251 tests = append(tests, t...)
252 mcj.JobData.TestsRun = mcj.JobData.TestsRun + testCount
253 mcj.JobData.TestsFailed = mcj.JobData.TestsFailed + failCount
254 }
255 mcj.Tests = tests
256
257 return nil
258 }
259
260
261 func (mc *MiddleChild) insert(ctx context.Context, mcj *Job) error {
262
263
264 mc.Logger.Info("inserting job")
265 jobID, err := mc.db.InsertEdgeJob(ctx, mcj.JobData)
266 if err != nil {
267 return err
268 }
269
270 mc.Logger.Info("job id", "id", jobID.String())
271
272 for _, meta := range mcj.Metadata {
273 meta.EdgeJob = jobID
274 _, err := mc.db.InsertEdgeJobMetadata(ctx, meta)
275 if err != nil {
276 return err
277 }
278 }
279
280 for _, test := range mcj.Tests {
281 test.EdgeJob = jobID
282 _, err := mc.db.InsertEdgeJobTest(ctx, test)
283 if err != nil {
284 return err
285 }
286 }
287
288
289 b, err := json.MarshalIndent(mcj, "", " ")
290 if err != nil {
291 return err
292 }
293 mc.Logger.Info("inserted job", "job", string(b))
294
295 return nil
296 }
297
298
299 func parseJUnit(t []byte) ([]model.EdgeJobTest, int, int, error) {
300 tests := []model.EdgeJobTest{}
301
302
303 suites, err := junit.Ingest(t)
304 if err != nil {
305 return tests, 0, 0, fmt.Errorf("failed to ingest junit data: %w", err)
306 }
307
308 numTests := 0
309 failedTests := 0
310
311
312 for _, suite := range suites {
313 numTests = numTests + suite.Totals.Tests
314 failedTests = failedTests + suite.Totals.Failed
315 for _, test := range suite.Tests {
316 singletest := model.EdgeJobTest{}
317 singletest.Name = test.Name
318 singletest.Time = test.Duration
319 singletest.Failed = false
320 singletest.Suite = suite.Name
321 singletest.FailureText = ""
322 if test.Error != nil {
323 singletest.Failed = true
324 singletest.FailureText = fmt.Sprintf("%v", test.Error.Error())
325 }
326 tests = append(tests, singletest)
327 }
328 }
329 return tests, numTests, failedTests, nil
330 }
331
View as plain text