package logreplay import ( "bufio" "context" "errors" "fmt" "log/slog" "os" "strings" "time" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/kubernetes" "k8s.io/client-go/rest" "sigs.k8s.io/controller-runtime/pkg/client/config" ) type Config struct { konfig *rest.Config logger *slog.Logger namespaces []string startTime metav1.Time endTime time.Time severity string logReplayID string } type Client struct { logger *slog.Logger cfg *Config client *kubernetes.Clientset } func NewConfig() (*Config, error) { jh := slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{ ReplaceAttr: func(_ []string, a slog.Attr) slog.Attr { // use "message" instead of "msg" for the output since "message" has a special meaning in GCP if a.Key == slog.MessageKey { a.Key = "message" } return a }, }) log := slog.New(jh) clientConfig, err := config.GetConfig() if err != nil { return nil, fmt.Errorf("error getting kube config: %w", err) } // namespaces will come in as a comma separated string of namespaces namespaces := os.Getenv("NAMESPACE") if namespaces == "" { log.Warn("namespace was not provided. logs will not be processed") return nil, errors.New("namespace was not provided") } ns := strings.Split(namespaces, ",") startTimeStr := os.Getenv("STARTTIME") endTimeStr := os.Getenv("ENDTIME") var startTime metav1.Time if startTimeStr == "" { startTime = metav1.NewTime(time.Now().Add(-1 * time.Hour)) } else { t, err := time.Parse(time.RFC3339, startTimeStr) if err != nil { log.Warn("start time string could not be parsed. defaulting to 1 hour", "time", startTimeStr, "reason", err) startTime = metav1.NewTime(time.Now().Add(-1 * time.Hour)) } else { startTime = metav1.NewTime(t) } } var endTime time.Time ts, err := time.Parse(time.RFC3339, endTimeStr) if err != nil { log.Warn("end time string could not be parsed. defaulting to current time", "endtime provided", endTimeStr, "reason", err) endTime = time.Now() } else { endTime = ts } cfg := &Config{ namespaces: ns, startTime: startTime, endTime: endTime, logReplayID: os.Getenv("LOG_REPLAY_ID"), severity: os.Getenv("SEVERITY"), logger: log, konfig: clientConfig, } return cfg, nil } func New(config *Config) (*Client, error) { log := config.logger log.Info("log replay config", "namespace", config.namespaces, "starttime", config.startTime, "endtime", config.endTime, "severity", config.severity, "log_replay_id", config.logReplayID, ) clientset, err := kubernetes.NewForConfig(config.konfig) if err != nil { return nil, fmt.Errorf("failed to create kube client: %w", err) } logreplay := &Client{ logger: log, client: clientset, cfg: config, } return logreplay, nil } func (l *Client) Run() error { log := l.logger log.Info("running log replay client") for _, ns := range l.cfg.namespaces { // get all pods for a given namespace pods, err := l.client.CoreV1().Pods(ns).List(context.TODO(), metav1.ListOptions{}) if err != nil { return fmt.Errorf("error listing pods for namespace %s: %w", ns, err) } // logOptions will be passed to the GetLogs function and is used to further restrict which pods to get logs from // see https://pkg.go.dev/k8s.io/api/core/v1#PodLogOptions logOptions := &corev1.PodLogOptions{ SinceTime: &l.cfg.startTime, Timestamps: true, } for _, p := range pods.Items { for _, c := range p.Status.ContainerStatuses { logOptions.Container = c.Name // we need to reconstruct the original fluent-bit tag and attach it to the logs so that they can be parsed by fluent-bit // tag := "k8s_container." + ns + "_" + p.Name + "_" + c.Name + "_" + c.ContainerID tag := createTag(ns, p.Name, c.Name, c.ContainerID) // get previous container logs logOptions.Previous = true if err := getLogs(l, ns, p, logOptions, tag); err != nil { return err } // get current container logs logOptions.Previous = false if err := getLogs(l, ns, p, logOptions, tag); err != nil { return err } } for _, c := range p.Status.InitContainerStatuses { logOptions.Container = c.Name tag := createTag(ns, p.Name, c.Name, c.ContainerID) logOptions.Previous = true if err := getLogs(l, ns, p, logOptions, tag); err != nil { return err } logOptions.Previous = false if err := getLogs(l, ns, p, logOptions, tag); err != nil { return err } } } } return nil } // getLogs prints logs for a container in a pod in the specified namespace. The container to get logs from // will be part of logOptions. A "tag" is passed in so we can print out the original tag that fluent-bit // assigned to this ns/pod/container combo. This tag needs to be printed so fluent-bit can later use the // "rewrite tag" filter so that the log message gets sent to the cloud with the correct ns/pod/container combo // rather than the ns/pod/container for this go code func getLogs(l *Client, ns string, pod corev1.Pod, logOptions *corev1.PodLogOptions, tag string) error { log := l.logger log.Info("replay logs request", "namespace", ns, "pod", pod.Name, "options", logOptions, "endtime", l.cfg.endTime) req := l.client.CoreV1().Pods(ns).GetLogs(pod.Name, logOptions) logs, err := req.Stream(context.Background()) if err != nil { log.Warn("could not stream logs", "namespace", ns, "pod", pod.Name, "container", logOptions, "err", err) return nil } defer logs.Close() var logsFound = false scanner := bufio.NewScanner(logs) for scanner.Scan() { logsFound = true logLine := scanner.Text() parts := strings.SplitN(logLine, " ", 2) timestamp := parts[0] var origTime time.Time if parsedTime, err := time.Parse(time.RFC3339Nano, timestamp); err == nil { origTime = parsedTime } // only print the log if origTime is less than or equal to endTime if origTime.Before(l.cfg.endTime) || origTime.Equal(l.cfg.endTime) { // create a timestamp matching the format expected by fluent-bit // ref: https://docs.fluentbit.io/manual/v/dev-2.2/concepts/key-concepts#timestamp ts := fmt.Sprintf("%d.%09d", origTime.Unix(), origTime.Nanosecond()) logMessage := parts[1] log.Info("logreplay", "original-namespace", ns, "original-pod", pod.Name, "replay-options", logOptions, "original-tag", tag, "original-timestamp", timestamp, "message", logMessage, "original-log", logMessage, "original-time", ts, "log_replay_id", l.cfg.logReplayID, "replay_severity", l.cfg.severity) } } if !logsFound { log.Info("no logs were found", "namespace", ns, "pod", pod.Name, "options", logOptions) } return nil } func createTag(namespace string, podName string, containerName string, containerID string) string { // containerId comes in as "containerd://d61203cf790b8afb281c4816cd54723c4f7138266ad03be8577c43f2e1109203" // and we only want the last part of it, so we split on "://" parts := strings.Split(containerID, "/") return "k8s_container." + namespace + "_" + podName + "_" + containerName + "_" + parts[len(parts)-1] }