...

Source file src/edge-infra.dev/pkg/edge/logging/logreplay/replay.go

Documentation: edge-infra.dev/pkg/edge/logging/logreplay

     1  package logreplay
     2  
     3  import (
     4  	"bufio"
     5  	"context"
     6  	"errors"
     7  	"fmt"
     8  	"log/slog"
     9  	"os"
    10  	"strings"
    11  	"time"
    12  
    13  	corev1 "k8s.io/api/core/v1"
    14  	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    15  	"k8s.io/client-go/kubernetes"
    16  	"k8s.io/client-go/rest"
    17  	"sigs.k8s.io/controller-runtime/pkg/client/config"
    18  )
    19  
    20  type Config struct {
    21  	konfig      *rest.Config
    22  	logger      *slog.Logger
    23  	namespaces  []string
    24  	startTime   metav1.Time
    25  	endTime     time.Time
    26  	severity    string
    27  	logReplayID string
    28  }
    29  
    30  type Client struct {
    31  	logger *slog.Logger
    32  	cfg    *Config
    33  	client *kubernetes.Clientset
    34  }
    35  
    36  func NewConfig() (*Config, error) {
    37  	jh := slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{
    38  		ReplaceAttr: func(_ []string, a slog.Attr) slog.Attr {
    39  			// use "message" instead of "msg" for the output since "message" has a special meaning in GCP
    40  			if a.Key == slog.MessageKey {
    41  				a.Key = "message"
    42  			}
    43  			return a
    44  		},
    45  	})
    46  
    47  	log := slog.New(jh)
    48  	clientConfig, err := config.GetConfig()
    49  	if err != nil {
    50  		return nil, fmt.Errorf("error getting kube config: %w", err)
    51  	}
    52  
    53  	// namespaces will come in as a comma separated string of namespaces
    54  	namespaces := os.Getenv("NAMESPACE")
    55  	if namespaces == "" {
    56  		log.Warn("namespace was not provided. logs will not be processed")
    57  		return nil, errors.New("namespace was not provided")
    58  	}
    59  	ns := strings.Split(namespaces, ",")
    60  
    61  	startTimeStr := os.Getenv("STARTTIME")
    62  	endTimeStr := os.Getenv("ENDTIME")
    63  
    64  	var startTime metav1.Time
    65  	if startTimeStr == "" {
    66  		startTime = metav1.NewTime(time.Now().Add(-1 * time.Hour))
    67  	} else {
    68  		t, err := time.Parse(time.RFC3339, startTimeStr)
    69  		if err != nil {
    70  			log.Warn("start time string could not be parsed. defaulting to 1 hour", "time", startTimeStr, "reason", err)
    71  			startTime = metav1.NewTime(time.Now().Add(-1 * time.Hour))
    72  		} else {
    73  			startTime = metav1.NewTime(t)
    74  		}
    75  	}
    76  
    77  	var endTime time.Time
    78  	ts, err := time.Parse(time.RFC3339, endTimeStr)
    79  	if err != nil {
    80  		log.Warn("end time string could not be parsed. defaulting to current time", "endtime provided", endTimeStr, "reason", err)
    81  		endTime = time.Now()
    82  	} else {
    83  		endTime = ts
    84  	}
    85  
    86  	cfg := &Config{
    87  		namespaces:  ns,
    88  		startTime:   startTime,
    89  		endTime:     endTime,
    90  		logReplayID: os.Getenv("LOG_REPLAY_ID"),
    91  		severity:    os.Getenv("SEVERITY"),
    92  		logger:      log,
    93  		konfig:      clientConfig,
    94  	}
    95  
    96  	return cfg, nil
    97  }
    98  
    99  func New(config *Config) (*Client, error) {
   100  	log := config.logger
   101  
   102  	log.Info("log replay config",
   103  		"namespace", config.namespaces,
   104  		"starttime", config.startTime,
   105  		"endtime", config.endTime,
   106  		"severity", config.severity,
   107  		"log_replay_id", config.logReplayID,
   108  	)
   109  
   110  	clientset, err := kubernetes.NewForConfig(config.konfig)
   111  	if err != nil {
   112  		return nil, fmt.Errorf("failed to create kube client: %w", err)
   113  	}
   114  
   115  	logreplay := &Client{
   116  		logger: log,
   117  		client: clientset,
   118  		cfg:    config,
   119  	}
   120  
   121  	return logreplay, nil
   122  }
   123  
   124  func (l *Client) Run() error {
   125  	log := l.logger
   126  	log.Info("running log replay client")
   127  	for _, ns := range l.cfg.namespaces {
   128  		// get all pods for a given namespace
   129  		pods, err := l.client.CoreV1().Pods(ns).List(context.TODO(), metav1.ListOptions{})
   130  		if err != nil {
   131  			return fmt.Errorf("error listing pods for namespace %s: %w", ns, err)
   132  		}
   133  
   134  		// logOptions will be passed to the GetLogs function and is used to further restrict which pods to get logs from
   135  		// see https://pkg.go.dev/k8s.io/api/core/v1#PodLogOptions
   136  		logOptions := &corev1.PodLogOptions{
   137  			SinceTime:  &l.cfg.startTime,
   138  			Timestamps: true,
   139  		}
   140  
   141  		for _, p := range pods.Items {
   142  			for _, c := range p.Status.ContainerStatuses {
   143  				logOptions.Container = c.Name
   144  				// we need to reconstruct the original fluent-bit tag and attach it to the logs so that they can be parsed by fluent-bit
   145  				// tag := "k8s_container." + ns + "_" + p.Name + "_" + c.Name + "_" + c.ContainerID
   146  				tag := createTag(ns, p.Name, c.Name, c.ContainerID)
   147  
   148  				// get previous container logs
   149  				logOptions.Previous = true
   150  				if err := getLogs(l, ns, p, logOptions, tag); err != nil {
   151  					return err
   152  				}
   153  
   154  				// get current container logs
   155  				logOptions.Previous = false
   156  				if err := getLogs(l, ns, p, logOptions, tag); err != nil {
   157  					return err
   158  				}
   159  			}
   160  			for _, c := range p.Status.InitContainerStatuses {
   161  				logOptions.Container = c.Name
   162  				tag := createTag(ns, p.Name, c.Name, c.ContainerID)
   163  
   164  				logOptions.Previous = true
   165  				if err := getLogs(l, ns, p, logOptions, tag); err != nil {
   166  					return err
   167  				}
   168  
   169  				logOptions.Previous = false
   170  				if err := getLogs(l, ns, p, logOptions, tag); err != nil {
   171  					return err
   172  				}
   173  			}
   174  		}
   175  	}
   176  
   177  	return nil
   178  }
   179  
   180  // getLogs prints logs for a container in a pod in the specified namespace. The container to get logs from
   181  // will be part of logOptions. A "tag" is passed in so we can print out the original tag that fluent-bit
   182  // assigned to this ns/pod/container combo. This tag needs to be printed so fluent-bit can later use the
   183  // "rewrite tag" filter so that the log message gets sent to the cloud with the correct ns/pod/container combo
   184  // rather than the ns/pod/container for this go code
   185  func getLogs(l *Client, ns string, pod corev1.Pod, logOptions *corev1.PodLogOptions, tag string) error {
   186  	log := l.logger
   187  	log.Info("replay logs request", "namespace", ns, "pod", pod.Name, "options", logOptions, "endtime", l.cfg.endTime)
   188  
   189  	req := l.client.CoreV1().Pods(ns).GetLogs(pod.Name, logOptions)
   190  	logs, err := req.Stream(context.Background())
   191  	if err != nil {
   192  		log.Warn("could not stream logs", "namespace", ns, "pod", pod.Name, "container", logOptions, "err", err)
   193  		return nil
   194  	}
   195  	defer logs.Close()
   196  
   197  	var logsFound = false
   198  	scanner := bufio.NewScanner(logs)
   199  	for scanner.Scan() {
   200  		logsFound = true
   201  		logLine := scanner.Text()
   202  		parts := strings.SplitN(logLine, " ", 2)
   203  		timestamp := parts[0]
   204  		var origTime time.Time
   205  		if parsedTime, err := time.Parse(time.RFC3339Nano, timestamp); err == nil {
   206  			origTime = parsedTime
   207  		}
   208  
   209  		// only print the log if origTime is less than or equal to endTime
   210  		if origTime.Before(l.cfg.endTime) || origTime.Equal(l.cfg.endTime) {
   211  			// create a timestamp matching the format expected by fluent-bit
   212  			// ref: https://docs.fluentbit.io/manual/v/dev-2.2/concepts/key-concepts#timestamp
   213  			ts := fmt.Sprintf("%d.%09d", origTime.Unix(), origTime.Nanosecond())
   214  			logMessage := parts[1]
   215  			log.Info("logreplay", "original-namespace", ns, "original-pod", pod.Name, "replay-options", logOptions,
   216  				"original-tag", tag, "original-timestamp", timestamp, "message", logMessage, "original-log", logMessage,
   217  				"original-time", ts, "log_replay_id", l.cfg.logReplayID, "replay_severity", l.cfg.severity)
   218  		}
   219  	}
   220  	if !logsFound {
   221  		log.Info("no logs were found", "namespace", ns, "pod", pod.Name, "options", logOptions)
   222  	}
   223  
   224  	return nil
   225  }
   226  
   227  func createTag(namespace string, podName string, containerName string, containerID string) string {
   228  	// containerId comes in as "containerd://d61203cf790b8afb281c4816cd54723c4f7138266ad03be8577c43f2e1109203"
   229  	// and we only want the last part of it, so we split on "://"
   230  	parts := strings.Split(containerID, "/")
   231  	return "k8s_container." + namespace + "_" + podName + "_" + containerName + "_" + parts[len(parts)-1]
   232  }
   233  

View as plain text