1 package logreplay
2
3 import (
4 "bufio"
5 "context"
6 "errors"
7 "fmt"
8 "log/slog"
9 "os"
10 "strings"
11 "time"
12
13 corev1 "k8s.io/api/core/v1"
14 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
15 "k8s.io/client-go/kubernetes"
16 "k8s.io/client-go/rest"
17 "sigs.k8s.io/controller-runtime/pkg/client/config"
18 )
19
20 type Config struct {
21 konfig *rest.Config
22 logger *slog.Logger
23 namespaces []string
24 startTime metav1.Time
25 endTime time.Time
26 severity string
27 logReplayID string
28 }
29
30 type Client struct {
31 logger *slog.Logger
32 cfg *Config
33 client *kubernetes.Clientset
34 }
35
// NewConfig builds a Config from the process environment:
//
//   - NAMESPACE (required): comma-separated namespaces to replay. Whitespace
//     around entries is trimmed and empty entries are ignored.
//   - STARTTIME (optional): RFC 3339 start of the replay window. Defaults to
//     one hour ago when unset or unparsable.
//   - ENDTIME (optional): RFC 3339 end of the replay window. Defaults to now
//     when unset or unparsable.
//   - LOG_REPLAY_ID, SEVERITY (optional): stored verbatim.
//
// The Kubernetes REST config is resolved with controller-runtime's
// config.GetConfig. The returned Config carries a JSON logger that writes to
// stdout and reports the log message under the "message" key.
//
// An error is returned when the REST config cannot be resolved or when no
// namespace is given.
func NewConfig() (*Config, error) {
	jh := slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{
		ReplaceAttr: func(_ []string, a slog.Attr) slog.Attr {
			// Rename slog's default "msg" key to "message".
			if a.Key == slog.MessageKey {
				a.Key = "message"
			}
			return a
		},
	})
	log := slog.New(jh)

	clientConfig, err := config.GetConfig()
	if err != nil {
		return nil, fmt.Errorf("error getting kube config: %w", err)
	}

	ns := splitNamespaces(os.Getenv("NAMESPACE"))
	if len(ns) == 0 {
		log.Warn("namespace was not provided. logs will not be processed")
		return nil, errors.New("namespace was not provided")
	}

	// A single reference instant keeps the start and end defaults consistent.
	now := time.Now()

	startTime := now.Add(-1 * time.Hour)
	if s := os.Getenv("STARTTIME"); s != "" {
		if t, err := time.Parse(time.RFC3339, s); err != nil {
			log.Warn("start time string could not be parsed. defaulting to 1 hour", "time", s, "reason", err)
		} else {
			startTime = t
		}
	}

	// ENDTIME is optional like STARTTIME: warn only when a value was actually
	// supplied and is invalid, not merely because it is unset.
	endTime := now
	if s := os.Getenv("ENDTIME"); s != "" {
		if t, err := time.Parse(time.RFC3339, s); err != nil {
			log.Warn("end time string could not be parsed. defaulting to current time", "endtime provided", s, "reason", err)
		} else {
			endTime = t
		}
	}

	if startTime.After(endTime) {
		log.Warn("start time is after end time. the replay window is empty", "starttime", startTime, "endtime", endTime)
	}

	return &Config{
		namespaces:  ns,
		startTime:   metav1.NewTime(startTime),
		endTime:     endTime,
		logReplayID: os.Getenv("LOG_REPLAY_ID"),
		severity:    os.Getenv("SEVERITY"),
		logger:      log,
		konfig:      clientConfig,
	}, nil
}

// splitNamespaces parses a comma-separated namespace list, trimming whitespace
// around each entry and dropping empty entries ("a, b,," yields [a b]).
func splitNamespaces(raw string) []string {
	var out []string
	for _, n := range strings.Split(raw, ",") {
		if n = strings.TrimSpace(n); n != "" {
			out = append(out, n)
		}
	}
	return out
}
98
// New logs the effective replay settings and returns a Client whose
// Kubernetes clientset is built from config's REST configuration.
//
// A nil config, or one without a REST configuration (for example a
// zero-value Config not produced by NewConfig), is rejected with an error
// rather than causing a nil-pointer panic. When config has no logger,
// slog.Default() is used.
func New(config *Config) (*Client, error) {
	if config == nil {
		return nil, errors.New("log replay config is nil")
	}
	if config.konfig == nil {
		return nil, errors.New("log replay config has no kube rest config")
	}

	log := config.logger
	if log == nil {
		log = slog.Default()
	}

	log.Info("log replay config",
		"namespace", config.namespaces,
		"starttime", config.startTime,
		"endtime", config.endTime,
		"severity", config.severity,
		"log_replay_id", config.logReplayID,
	)

	clientset, err := kubernetes.NewForConfig(config.konfig)
	if err != nil {
		return nil, fmt.Errorf("failed to create kube client: %w", err)
	}

	return &Client{
		logger: log,
		client: clientset,
		cfg:    config,
	}, nil
}
123
// Run replays the logs of every pod in each configured namespace.
//
// For each pod it handles the regular containers and then the init
// containers. For each container it replays the previous instance's logs
// (only if the container has restarted) followed by the current instance's
// logs. Only logs at or after the configured start time are requested.
//
// Run stops at, and returns, the first error from listing pods or from getLogs.
func (l *Client) Run() error {
	l.logger.Info("running log replay client")
	for _, ns := range l.cfg.namespaces {
		pods, err := l.client.CoreV1().Pods(ns).List(context.TODO(), metav1.ListOptions{})
		if err != nil {
			return fmt.Errorf("error listing pods for namespace %s: %w", ns, err)
		}

		// Index into Items to avoid copying each (large) Pod struct.
		for i := range pods.Items {
			p := &pods.Items[i]
			if err := l.replayContainers(ns, p, p.Status.ContainerStatuses); err != nil {
				return err
			}
			if err := l.replayContainers(ns, p, p.Status.InitContainerStatuses); err != nil {
				return err
			}
		}
	}
	return nil
}

// replayContainers replays, for each container status of pod in namespace ns,
// the previous instance's logs (when RestartCount > 0) and then the current
// instance's logs.
func (l *Client) replayContainers(ns string, pod *corev1.Pod, statuses []corev1.ContainerStatus) error {
	for _, c := range statuses {
		tag := createTag(ns, pod.Name, c.Name, c.ContainerID)

		// A previous instance only exists once the container has restarted;
		// requesting it otherwise just fails and logs a warning.
		if c.RestartCount > 0 {
			prev := &corev1.PodLogOptions{
				Container:  c.Name,
				SinceTime:  &l.cfg.startTime,
				Timestamps: true,
				Previous:   true,
			}
			if err := getLogs(l, ns, *pod, prev, tag); err != nil {
				return err
			}
		}

		cur := &corev1.PodLogOptions{
			Container:  c.Name,
			SinceTime:  &l.cfg.startTime,
			Timestamps: true,
			Previous:   false,
		}
		if err := getLogs(l, ns, *pod, cur, tag); err != nil {
			return err
		}
	}
	return nil
}
179
180
181
182
183
184
// getLogs streams the logs selected by logOptions for pod in namespace ns.
// Each line stamped at or before the configured end time is re-emitted as a
// structured "logreplay" record carrying the original namespace, pod, tag,
// timestamp and message.
//
// logOptions is expected to have Timestamps set, so every line reads
// "<RFC3339Nano timestamp> <message>". Lines whose timestamp cannot be parsed
// cannot be placed in the replay window; they are reported as warnings
// instead of being replayed.
//
// The function is best-effort. Failures to open or fully read the stream are
// logged as warnings and nil is returned, so the caller moves on.
func getLogs(l *Client, ns string, pod corev1.Pod, logOptions *corev1.PodLogOptions, tag string) error {
	// Upper bound for a single log line. bufio.Scanner's default (64 KiB)
	// would otherwise end the scan early on a long line.
	const maxLineBytes = 1 << 20

	log := l.logger
	log.Info("replay logs request", "namespace", ns, "pod", pod.Name, "options", logOptions, "endtime", l.cfg.endTime)

	req := l.client.CoreV1().Pods(ns).GetLogs(pod.Name, logOptions)
	logs, err := req.Stream(context.Background())
	if err != nil {
		log.Warn("could not stream logs", "namespace", ns, "pod", pod.Name, "container", logOptions.Container, "options", logOptions, "err", err)
		return nil
	}
	defer logs.Close()

	lines := 0
	scanner := bufio.NewScanner(logs)
	scanner.Buffer(make([]byte, 0, bufio.MaxScanTokenSize), maxLineBytes)
	for scanner.Scan() {
		lines++
		logLine := scanner.Text()

		// A line without a space has an empty message rather than causing a panic.
		timestamp, logMessage, _ := strings.Cut(logLine, " ")
		origTime, err := time.Parse(time.RFC3339Nano, timestamp)
		if err != nil {
			log.Warn("skipping log line without a parseable timestamp", "namespace", ns, "pod", pod.Name,
				"original-tag", tag, "line", logLine, "reason", err)
			continue
		}

		// The end of the window is inclusive.
		if origTime.After(l.cfg.endTime) {
			continue
		}

		ts := fmt.Sprintf("%d.%09d", origTime.Unix(), origTime.Nanosecond())
		// NOTE(review): with the logger built by NewConfig, slog's message key is
		// renamed to "message", so this record carries "message" twice (the
		// literal "logreplay" and logMessage). Last-wins JSON parsers see the
		// original log line. Confirm downstream relies on this before changing it.
		log.Info("logreplay", "original-namespace", ns, "original-pod", pod.Name, "replay-options", logOptions,
			"original-tag", tag, "original-timestamp", timestamp, "message", logMessage, "original-log", logMessage,
			"original-time", ts, "log_replay_id", l.cfg.logReplayID, "replay_severity", l.cfg.severity)
	}

	if err := scanner.Err(); err != nil {
		log.Warn("could not read all logs", "namespace", ns, "pod", pod.Name, "container", logOptions.Container,
			"options", logOptions, "lines_read", lines, "err", err)
	} else if lines == 0 {
		log.Info("no logs were found", "namespace", ns, "pod", pod.Name, "options", logOptions)
	}

	return nil
}
226
// createTag returns "k8s_container.<namespace>_<pod>_<container>_<id>". Here
// <id> is the part of containerID after its last '/' (the whole ID when it
// contains no '/'), which strips a "<runtime>://" style prefix.
func createTag(namespace string, podName string, containerName string, containerID string) string {
	id := containerID[strings.LastIndexByte(containerID, '/')+1:]
	return strings.Join([]string{"k8s_container." + namespace, podName, containerName, id}, "_")
}
233
View as plain text