1 package services
2
3 import (
4 "context"
5 "database/sql"
6 "fmt"
7
8 "github.com/lib/pq"
9
10 "edge-infra.dev/pkg/edge/api/graph/model"
11 sqlquery "edge-infra.dev/pkg/edge/api/sql"
12 "edge-infra.dev/pkg/lib/edgeutils"
13 )
14
15
16 type LogReplayService interface {
17 GetLogReplay(ctx context.Context, logReplayID string) (*model.LogReplay, error)
18 GetLogReplays(ctx context.Context, clusterEdgeID string, unexecutedLogReplays *bool) ([]*model.LogReplay, error)
19 GetLogReplayJobs(ctx context.Context, clusterEdgeID string) ([]*model.LogReplayJob, error)
20 CreateLogReplay(ctx context.Context, clusterEdgeID string, inputPayload model.CreateLogReplayPayload) (bool, error)
21 UpdateLogReplay(ctx context.Context, logReplayID string, clusterEdgeID string, inputPayload model.UpdateLogReplayPayload) (bool, error)
22 DeleteLogReplay(ctx context.Context, logReplayID string) (bool, error)
23 }
24
25 type logReplayService struct {
26 SQLDB *sql.DB
27 }
28
29 func (l *logReplayService) GetLogReplay(ctx context.Context, logReplayID string) (*model.LogReplay, error) {
30 row := l.SQLDB.QueryRowContext(ctx, sqlquery.GetLogReplay, logReplayID)
31
32 lr := &model.LogReplay{}
33 lr.Namespaces = []string{}
34 if err := row.Scan(&lr.LogReplayID, &lr.ClusterEdgeID, pq.Array(&lr.Namespaces), &lr.LogLevel, &lr.StartTime, &lr.EndTime, &lr.Queued, &lr.Executed, &lr.Status, &lr.UpdatedAt); err != nil {
35 return nil, fmt.Errorf("error getting log replay: SQL error %w", err)
36 }
37 startFormatted, err := edgeutils.ConvertToRFC3339(lr.StartTime)
38 if err != nil {
39 return nil, fmt.Errorf("error getting log replay: %w", err)
40 }
41 endFormatted, err := edgeutils.ConvertToRFC3339(lr.EndTime)
42 if err != nil {
43 return nil, fmt.Errorf("error getting log replay: %w", err)
44 }
45 updatedFormatted, err := edgeutils.ConvertToRFC3339(lr.UpdatedAt)
46 if err != nil {
47 return nil, fmt.Errorf("error getting log replay: %w", err)
48 }
49 lr.StartTime = startFormatted
50 lr.EndTime = endFormatted
51 lr.UpdatedAt = updatedFormatted
52
53 return lr, nil
54 }
55
56 func (l *logReplayService) GetLogReplayJobs(ctx context.Context, clusterEdgeID string) ([]*model.LogReplayJob, error) {
57 rows, err := l.SQLDB.QueryContext(ctx, sqlquery.GetLogReplayJobs, clusterEdgeID)
58 if err != nil {
59 return nil, fmt.Errorf("error getting log replay jobs: SQL error %w", err)
60 }
61
62 var logReplays []*model.LogReplayJob
63 defer rows.Close()
64 for rows.Next() {
65 var lrj model.LogReplayJob
66
67 if err := rows.Scan(&lrj.Jsonpath, &lrj.Value, &lrj.Missing, &lrj.Name, &lrj.Queued, &lrj.Executed, &lrj.Status, &lrj.UpdatedAt, &lrj.LogReplayID); err != nil {
68 return nil, fmt.Errorf("error getting log replay jobs not executed: row scan error %w", err)
69 }
70 updatedFormatted, err := edgeutils.ConvertToRFC3339(lrj.UpdatedAt)
71 if err != nil {
72 return nil, fmt.Errorf("error getting log replay jobs: %w", err)
73 }
74 lrj.UpdatedAt = updatedFormatted
75
76 logReplays = append(logReplays, &lrj)
77 }
78
79 return logReplays, nil
80 }
81
82 func (l *logReplayService) GetLogReplays(ctx context.Context, clusterEdgeID string, unexecutedLogReplays *bool) ([]*model.LogReplay, error) {
83 if unexecutedLogReplays == nil || !*unexecutedLogReplays {
84 return l.getLogReplays(ctx, clusterEdgeID, sqlquery.GetLogReplays)
85 }
86 return l.getLogReplays(ctx, clusterEdgeID, sqlquery.GetLogReplaysNotExecuted)
87 }
88
89 func (l *logReplayService) DeleteLogReplay(ctx context.Context, logReplayID string) (bool, error) {
90 _, err := l.SQLDB.ExecContext(ctx, sqlquery.DeleteLogReplay, logReplayID)
91 if err != nil {
92 return false, fmt.Errorf("error deleting log replay: %w", err)
93 }
94 return true, nil
95 }
96
97 func (l *logReplayService) CreateLogReplay(ctx context.Context, clusterEdgeID string, clrp model.CreateLogReplayPayload) (bool, error) {
98 err := l.validateInput(clrp)
99 if err != nil {
100 return false, fmt.Errorf("error updating log replay: %w", err)
101 }
102 logLevel := clrp.LogLevel.String()
103
104 _, err = l.SQLDB.ExecContext(ctx, sqlquery.CreateLogReplay, clusterEdgeID, pq.Array(&clrp.Namespaces), logLevel, clrp.StartTime, clrp.EndTime, false, false, model.LogReplayStatusNotStarted.String())
105 if err != nil {
106 return false, fmt.Errorf("error creating log replay: %w", err)
107 }
108 return true, nil
109 }
110
111 func (l *logReplayService) UpdateLogReplay(ctx context.Context, logReplayID string, clusterEdgeID string, inputPayload model.UpdateLogReplayPayload) (bool, error) {
112 lr, err := l.GetLogReplay(ctx, logReplayID)
113 if err != nil {
114 return false, fmt.Errorf("error updating log replay: %w", err)
115 }
116
117 q := lr.Queued
118 e := lr.Executed
119 status := lr.Status
120
121
122
123 change := false
124
125 if inputPayload.Queued != nil && q != *inputPayload.Queued {
126 change = true
127 q = *inputPayload.Queued
128 }
129 if inputPayload.Executed != nil && e != *inputPayload.Executed {
130 change = true
131 e = *inputPayload.Executed
132 }
133 if inputPayload.Status != nil && status != inputPayload.Status.String() {
134 change = true
135 status = inputPayload.Status.String()
136 }
137 if change {
138 _, err = l.SQLDB.ExecContext(ctx, sqlquery.UpdateLogReplay, logReplayID, clusterEdgeID, q, e, status)
139 if err != nil {
140 return false, fmt.Errorf("error updating log replay: %w", err)
141 }
142 }
143 return change, nil
144 }
145
146 func (l logReplayService) validateInput(inputPayload model.CreateLogReplayPayload) error {
147 err := edgeutils.IsValidTimestamp(inputPayload.StartTime)
148 if err != nil {
149 return fmt.Errorf("error validating start time format: %w", err)
150 }
151
152 err = edgeutils.IsValidTimestamp(inputPayload.EndTime)
153 if err != nil {
154 return fmt.Errorf("error validating end time format: %w", err)
155 }
156
157 err = edgeutils.TimeSequenceCheck(inputPayload.StartTime, inputPayload.EndTime)
158 if err != nil {
159 return fmt.Errorf("error validating time orderings: %w", err)
160 }
161
162 return nil
163 }
164
165 func (l *logReplayService) getLogReplays(ctx context.Context, clusterEdgeID string, query string) ([]*model.LogReplay, error) {
166 row, err := l.SQLDB.QueryContext(ctx, query, clusterEdgeID)
167 if err != nil {
168 return nil, fmt.Errorf("error getting log replays: SQL error %w", err)
169 }
170
171 var logReplays []*model.LogReplay
172 defer row.Close()
173 for row.Next() {
174 var lr model.LogReplay
175 lr.Namespaces = []string{}
176
177 if err := row.Scan(&lr.LogReplayID, &lr.ClusterEdgeID, pq.Array(&lr.Namespaces), &lr.LogLevel, &lr.StartTime, &lr.EndTime, &lr.Queued, &lr.Executed, &lr.Status, &lr.UpdatedAt); err != nil {
178 return nil, fmt.Errorf("error getting log replays not executed: row scan error %w", err)
179 }
180
181 startFormatted, err := edgeutils.ConvertToRFC3339(lr.StartTime)
182 if err != nil {
183 return nil, fmt.Errorf("error getting log replay: %w", err)
184 }
185 endFormatted, err := edgeutils.ConvertToRFC3339(lr.EndTime)
186 if err != nil {
187 return nil, fmt.Errorf("error getting log replay: %w", err)
188 }
189 updatedFormatted, err := edgeutils.ConvertToRFC3339(lr.UpdatedAt)
190 if err != nil {
191 return nil, fmt.Errorf("error getting log replay: %w", err)
192 }
193 lr.StartTime = startFormatted
194 lr.EndTime = endFormatted
195 lr.UpdatedAt = updatedFormatted
196
197 logReplays = append(logReplays, &lr)
198 }
199
200 return logReplays, nil
201 }
202
203
204 func NewLogReplayService(sqlDB *sql.DB) *logReplayService {
205 return &logReplayService{
206 SQLDB: sqlDB,
207 }
208 }
209
View as plain text