...

Source file src/edge-infra.dev/pkg/edge/api/services/log_replay_service.go

Documentation: edge-infra.dev/pkg/edge/api/services

     1  package services
     2  
     3  import (
     4  	"context"
     5  	"database/sql"
     6  	"fmt"
     7  
     8  	"github.com/lib/pq"
     9  
    10  	"edge-infra.dev/pkg/edge/api/graph/model"
    11  	sqlquery "edge-infra.dev/pkg/edge/api/sql"
    12  	"edge-infra.dev/pkg/lib/edgeutils"
    13  )
    14  
    15  //go:generate mockgen -destination=../mocks/mock_log_replay_service.go -package=mocks edge-infra.dev/pkg/edge/api/services LogReplayService
    16  type LogReplayService interface {
    17  	GetLogReplay(ctx context.Context, logReplayID string) (*model.LogReplay, error)
    18  	GetLogReplays(ctx context.Context, clusterEdgeID string, unexecutedLogReplays *bool) ([]*model.LogReplay, error)
    19  	GetLogReplayJobs(ctx context.Context, clusterEdgeID string) ([]*model.LogReplayJob, error)
    20  	CreateLogReplay(ctx context.Context, clusterEdgeID string, inputPayload model.CreateLogReplayPayload) (bool, error)
    21  	UpdateLogReplay(ctx context.Context, logReplayID string, clusterEdgeID string, inputPayload model.UpdateLogReplayPayload) (bool, error)
    22  	DeleteLogReplay(ctx context.Context, logReplayID string) (bool, error)
    23  }
    24  
    25  type logReplayService struct {
    26  	SQLDB *sql.DB
    27  }
    28  
    29  func (l *logReplayService) GetLogReplay(ctx context.Context, logReplayID string) (*model.LogReplay, error) {
    30  	row := l.SQLDB.QueryRowContext(ctx, sqlquery.GetLogReplay, logReplayID)
    31  
    32  	lr := &model.LogReplay{}
    33  	lr.Namespaces = []string{}
    34  	if err := row.Scan(&lr.LogReplayID, &lr.ClusterEdgeID, pq.Array(&lr.Namespaces), &lr.LogLevel, &lr.StartTime, &lr.EndTime, &lr.Queued, &lr.Executed, &lr.Status, &lr.UpdatedAt); err != nil {
    35  		return nil, fmt.Errorf("error getting log replay: SQL error %w", err)
    36  	}
    37  	startFormatted, err := edgeutils.ConvertToRFC3339(lr.StartTime)
    38  	if err != nil {
    39  		return nil, fmt.Errorf("error getting log replay: %w", err)
    40  	}
    41  	endFormatted, err := edgeutils.ConvertToRFC3339(lr.EndTime)
    42  	if err != nil {
    43  		return nil, fmt.Errorf("error getting log replay: %w", err)
    44  	}
    45  	updatedFormatted, err := edgeutils.ConvertToRFC3339(lr.UpdatedAt)
    46  	if err != nil {
    47  		return nil, fmt.Errorf("error getting log replay: %w", err)
    48  	}
    49  	lr.StartTime = startFormatted
    50  	lr.EndTime = endFormatted
    51  	lr.UpdatedAt = updatedFormatted
    52  
    53  	return lr, nil
    54  }
    55  
    56  func (l *logReplayService) GetLogReplayJobs(ctx context.Context, clusterEdgeID string) ([]*model.LogReplayJob, error) {
    57  	rows, err := l.SQLDB.QueryContext(ctx, sqlquery.GetLogReplayJobs, clusterEdgeID)
    58  	if err != nil {
    59  		return nil, fmt.Errorf("error getting log replay jobs: SQL error %w", err)
    60  	}
    61  
    62  	var logReplays []*model.LogReplayJob
    63  	defer rows.Close()
    64  	for rows.Next() {
    65  		var lrj model.LogReplayJob
    66  
    67  		if err := rows.Scan(&lrj.Jsonpath, &lrj.Value, &lrj.Missing, &lrj.Name, &lrj.Queued, &lrj.Executed, &lrj.Status, &lrj.UpdatedAt, &lrj.LogReplayID); err != nil {
    68  			return nil, fmt.Errorf("error getting log replay jobs not executed: row scan error %w", err)
    69  		}
    70  		updatedFormatted, err := edgeutils.ConvertToRFC3339(lrj.UpdatedAt)
    71  		if err != nil {
    72  			return nil, fmt.Errorf("error getting log replay jobs: %w", err)
    73  		}
    74  		lrj.UpdatedAt = updatedFormatted
    75  
    76  		logReplays = append(logReplays, &lrj)
    77  	}
    78  
    79  	return logReplays, nil
    80  }
    81  
    82  func (l *logReplayService) GetLogReplays(ctx context.Context, clusterEdgeID string, unexecutedLogReplays *bool) ([]*model.LogReplay, error) {
    83  	if unexecutedLogReplays == nil || !*unexecutedLogReplays {
    84  		return l.getLogReplays(ctx, clusterEdgeID, sqlquery.GetLogReplays)
    85  	}
    86  	return l.getLogReplays(ctx, clusterEdgeID, sqlquery.GetLogReplaysNotExecuted)
    87  }
    88  
    89  func (l *logReplayService) DeleteLogReplay(ctx context.Context, logReplayID string) (bool, error) {
    90  	_, err := l.SQLDB.ExecContext(ctx, sqlquery.DeleteLogReplay, logReplayID)
    91  	if err != nil {
    92  		return false, fmt.Errorf("error deleting log replay: %w", err)
    93  	}
    94  	return true, nil
    95  }
    96  
    97  func (l *logReplayService) CreateLogReplay(ctx context.Context, clusterEdgeID string, clrp model.CreateLogReplayPayload) (bool, error) {
    98  	err := l.validateInput(clrp)
    99  	if err != nil {
   100  		return false, fmt.Errorf("error updating log replay: %w", err)
   101  	}
   102  	logLevel := clrp.LogLevel.String()
   103  
   104  	_, err = l.SQLDB.ExecContext(ctx, sqlquery.CreateLogReplay, clusterEdgeID, pq.Array(&clrp.Namespaces), logLevel, clrp.StartTime, clrp.EndTime, false, false, model.LogReplayStatusNotStarted.String())
   105  	if err != nil {
   106  		return false, fmt.Errorf("error creating log replay: %w", err)
   107  	}
   108  	return true, nil
   109  }
   110  
   111  func (l *logReplayService) UpdateLogReplay(ctx context.Context, logReplayID string, clusterEdgeID string, inputPayload model.UpdateLogReplayPayload) (bool, error) {
   112  	lr, err := l.GetLogReplay(ctx, logReplayID)
   113  	if err != nil {
   114  		return false, fmt.Errorf("error updating log replay: %w", err)
   115  	}
   116  
   117  	q := lr.Queued
   118  	e := lr.Executed
   119  	status := lr.Status
   120  
   121  	// We want to keep the UpdatedAt Status to be last true update to the status.
   122  	// If the flag didn't exist it would be updated every 3 mins by the reconcile loop
   123  	change := false
   124  
   125  	if inputPayload.Queued != nil && q != *inputPayload.Queued {
   126  		change = true
   127  		q = *inputPayload.Queued
   128  	}
   129  	if inputPayload.Executed != nil && e != *inputPayload.Executed {
   130  		change = true
   131  		e = *inputPayload.Executed
   132  	}
   133  	if inputPayload.Status != nil && status != inputPayload.Status.String() {
   134  		change = true
   135  		status = inputPayload.Status.String()
   136  	}
   137  	if change {
   138  		_, err = l.SQLDB.ExecContext(ctx, sqlquery.UpdateLogReplay, logReplayID, clusterEdgeID, q, e, status)
   139  		if err != nil {
   140  			return false, fmt.Errorf("error updating log replay: %w", err)
   141  		}
   142  	}
   143  	return change, nil
   144  }
   145  
   146  func (l logReplayService) validateInput(inputPayload model.CreateLogReplayPayload) error {
   147  	err := edgeutils.IsValidTimestamp(inputPayload.StartTime)
   148  	if err != nil {
   149  		return fmt.Errorf("error validating start time format: %w", err)
   150  	}
   151  
   152  	err = edgeutils.IsValidTimestamp(inputPayload.EndTime)
   153  	if err != nil {
   154  		return fmt.Errorf("error validating end time format: %w", err)
   155  	}
   156  
   157  	err = edgeutils.TimeSequenceCheck(inputPayload.StartTime, inputPayload.EndTime)
   158  	if err != nil {
   159  		return fmt.Errorf("error validating time orderings: %w", err)
   160  	}
   161  
   162  	return nil
   163  }
   164  
   165  func (l *logReplayService) getLogReplays(ctx context.Context, clusterEdgeID string, query string) ([]*model.LogReplay, error) {
   166  	row, err := l.SQLDB.QueryContext(ctx, query, clusterEdgeID)
   167  	if err != nil {
   168  		return nil, fmt.Errorf("error getting log replays: SQL error %w", err)
   169  	}
   170  
   171  	var logReplays []*model.LogReplay
   172  	defer row.Close()
   173  	for row.Next() {
   174  		var lr model.LogReplay
   175  		lr.Namespaces = []string{}
   176  
   177  		if err := row.Scan(&lr.LogReplayID, &lr.ClusterEdgeID, pq.Array(&lr.Namespaces), &lr.LogLevel, &lr.StartTime, &lr.EndTime, &lr.Queued, &lr.Executed, &lr.Status, &lr.UpdatedAt); err != nil {
   178  			return nil, fmt.Errorf("error getting log replays not executed: row scan error %w", err)
   179  		}
   180  
   181  		startFormatted, err := edgeutils.ConvertToRFC3339(lr.StartTime)
   182  		if err != nil {
   183  			return nil, fmt.Errorf("error getting log replay: %w", err)
   184  		}
   185  		endFormatted, err := edgeutils.ConvertToRFC3339(lr.EndTime)
   186  		if err != nil {
   187  			return nil, fmt.Errorf("error getting log replay: %w", err)
   188  		}
   189  		updatedFormatted, err := edgeutils.ConvertToRFC3339(lr.UpdatedAt)
   190  		if err != nil {
   191  			return nil, fmt.Errorf("error getting log replay: %w", err)
   192  		}
   193  		lr.StartTime = startFormatted
   194  		lr.EndTime = endFormatted
   195  		lr.UpdatedAt = updatedFormatted
   196  
   197  		logReplays = append(logReplays, &lr)
   198  	}
   199  
   200  	return logReplays, nil
   201  }
   202  
   203  // nolint
   204  func NewLogReplayService(sqlDB *sql.DB) *logReplayService {
   205  	return &logReplayService{
   206  		SQLDB: sqlDB,
   207  	}
   208  }
   209  

View as plain text