package debug

import (
	"bytes"
	"context"
	"fmt"
	"strconv"
	"strings"
	"sync"
	"text/tabwriter"
	"time"

	clientset "k8s.io/client-go/kubernetes"

	"k8s.io/kubernetes/test/e2e/framework"
	e2essh "k8s.io/kubernetes/test/e2e/framework/ssh"
)

const (
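	// pollingPeriod is the minimal delay between consecutive size probes of the same log file.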
	pollingPeriod            = 60 * time.Second
	workersNo                = 5
	kubeletLogsPath          = "/var/log/kubelet.log"
	kubeProxyLogsPath        = "/var/log/kube-proxy.log"
	kubeAddonsLogsPath       = "/var/log/kube-addons.log"
	kubeMasterAddonsLogsPath = "/var/log/kube-master-addons.log"
	apiServerLogsPath        = "/var/log/kube-apiserver.log"
	controllersLogsPath      = "/var/log/kube-controller-manager.log"
	schedulerLogsPath        = "/var/log/kube-scheduler.log"
)

var (
	nodeLogsToCheck   = []string{kubeletLogsPath, kubeProxyLogsPath}
	masterLogsToCheck = []string{kubeletLogsPath, kubeAddonsLogsPath, kubeMasterAddonsLogsPath,
		apiServerLogsPath, controllersLogsPath, schedulerLogsPath}
)

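// TimestampedSize contains a size together with a timestamp.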
type TimestampedSize struct {
	timestamp time.Time
	size      int
}

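// LogSizeGatherer is a worker which takes WorkItems from the work channel and probes the log sizes they describe.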
type LogSizeGatherer struct {
	stopChannel chan bool
	data        *LogsSizeData
	wg          *sync.WaitGroup
	workChannel chan WorkItem
}

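// LogsSizeVerifier gathers data about log file sizes from the master and node machines.
// It oversees workersNo workers that do the actual gathering.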
type LogsSizeVerifier struct {
	client      clientset.Interface
	stopChannel chan bool
	// data stores sizes of the monitored log files, grouped by IP and log path.
	data          *LogsSizeData
	masterAddress string
	nodeAddresses []string
	wg            sync.WaitGroup
	workChannel   chan WorkItem
	workers       []*LogSizeGatherer
}

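// SingleLogSummary holds the average generation rate and the number of probes for a single log file.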
type SingleLogSummary struct {
	AverageGenerationRate int
	NumberOfProbes        int
}

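// LogSizeDataTimeseries is a timeseries of log size probes: node IP -> log file path -> probes.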
type LogSizeDataTimeseries map[string]map[string][]TimestampedSize

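// LogsSizeDataSummary is a summary of the gathered data: node IP -> log file path -> summary.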
type LogsSizeDataSummary map[string]map[string]SingleLogSummary

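// PrintHumanReadable returns a human-readable, tab-aligned rendering of the summary.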
func (s *LogsSizeDataSummary) PrintHumanReadable() string {
	buf := &bytes.Buffer{}
	w := tabwriter.NewWriter(buf, 1, 0, 1, ' ', 0)
	fmt.Fprintf(w, "host\tlog_file\taverage_rate (B/s)\tnumber_of_probes\n")
	for k, v := range *s {
		fmt.Fprintf(w, "%v\t\t\t\n", k)
		for path, data := range v {
			fmt.Fprintf(w, "\t%v\t%v\t%v\n", path, data.AverageGenerationRate, data.NumberOfProbes)
		}
	}
	w.Flush()
	return buf.String()
}

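// PrintJSON returns the summary rendered as pretty-printed JSON.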
func (s *LogsSizeDataSummary) PrintJSON() string {
	return framework.PrettyPrintJSON(*s)
}

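// SummaryKind returns the name of this summary type.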
func (s *LogsSizeDataSummary) SummaryKind() string {
	return "LogSizeSummary"
}

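// LogsSizeData bundles the gathered timeseries with a lock that guards it.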
type LogsSizeData struct {
	data LogSizeDataTimeseries
	lock sync.Mutex
}

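// WorkItem is a command for a worker: the IP of the machine to probe, the paths of
// the log files to check, and the current backoff multiplier for retries.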
type WorkItem struct {
	ip                string
	paths             []string
	backoffMultiplier int
}

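// prepareData initializes an empty timeseries for the master and every node address.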
func prepareData(masterAddress string, nodeAddresses []string) *LogsSizeData {
	data := make(LogSizeDataTimeseries)
	ips := append(nodeAddresses, masterAddress)
	for _, ip := range ips {
		data[ip] = make(map[string][]TimestampedSize)
	}
	return &LogsSizeData{
		data: data,
		lock: sync.Mutex{},
	}
}

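// addNewData records a single size measurement for the given IP and log path.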
func (d *LogsSizeData) addNewData(ip, path string, timestamp time.Time, size int) {
	d.lock.Lock()
	defer d.lock.Unlock()
	d.data[ip][path] = append(
		d.data[ip][path],
		TimestampedSize{
			timestamp: timestamp,
			size:      size,
		},
	)
}

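// NewLogsVerifier creates a new LogsSizeVerifier which will stop when its stop channel is closed.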
func NewLogsVerifier(ctx context.Context, c clientset.Interface) *LogsSizeVerifier {
	nodeAddresses, err := e2essh.NodeSSHHosts(ctx, c)
	framework.ExpectNoError(err)
	instanceAddress := framework.APIAddress() + ":22"

	workChannel := make(chan WorkItem, len(nodeAddresses)+1)
	// stopChannel is shared by the verifier and all workers; closing it stops gathering.
	stopChannel := make(chan bool)
	workers := make([]*LogSizeGatherer, workersNo)

	verifier := &LogsSizeVerifier{
		client:        c,
		stopChannel:   stopChannel,
		data:          prepareData(instanceAddress, nodeAddresses),
		masterAddress: instanceAddress,
		nodeAddresses: nodeAddresses,
		wg:            sync.WaitGroup{},
		workChannel:   workChannel,
		workers:       workers,
	}
	verifier.wg.Add(workersNo)
	for i := 0; i < workersNo; i++ {
		workers[i] = &LogSizeGatherer{
			stopChannel: stopChannel,
			data:        verifier.data,
			wg:          &verifier.wg,
			workChannel: workChannel,
		}
	}
	return verifier
}

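// GetSummary returns a summary (average generation rate and number of probes) of the data gathered by LogsSizeVerifier.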
func (s *LogsSizeVerifier) GetSummary() *LogsSizeDataSummary {
	result := make(LogsSizeDataSummary)
	for k, v := range s.data.data {
		result[k] = make(map[string]SingleLogSummary)
		for path, data := range v {
			if len(data) > 1 {
				last := data[len(data)-1]
				first := data[0]
				rate := (last.size - first.size) / int(last.timestamp.Sub(first.timestamp)/time.Second)
				result[k][path] = SingleLogSummary{
					AverageGenerationRate: rate,
					NumberOfProbes:        len(data),
				}
			}
		}
	}
	return &result
}

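// Run starts log size gathering. It seeds the work queue with one WorkItem per machine,
// starts a goroutine for every worker, and then blocks until stopChannel is closed.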
func (s *LogsSizeVerifier) Run(ctx context.Context) {
	s.workChannel <- WorkItem{
		ip:                s.masterAddress,
		paths:             masterLogsToCheck,
		backoffMultiplier: 1,
	}
	for _, node := range s.nodeAddresses {
		s.workChannel <- WorkItem{
			ip:                node,
			paths:             nodeLogsToCheck,
			backoffMultiplier: 1,
		}
	}
	for _, worker := range s.workers {
		go worker.Run(ctx)
	}
	<-s.stopChannel
	s.wg.Wait()
}

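// Run keeps processing work items until Work reports that the worker should exit.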
func (g *LogSizeGatherer) Run(ctx context.Context) {
	for g.Work(ctx) {
	}
}

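// pushWorkItem re-queues a work item after a delay scaled by its backoff multiplier,
// unless the gatherer is stopped first.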
func (g *LogSizeGatherer) pushWorkItem(workItem WorkItem) {
	select {
	case <-time.After(time.Duration(workItem.backoffMultiplier) * pollingPeriod):
		g.workChannel <- workItem
	case <-g.stopChannel:
		return
	}
}

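// Work does a single unit of work: it takes a WorkItem from the queue, SSHes into the given
// machine, gathers the log sizes, writes them to the shared data map, and spawns a goroutine
// that reinserts the work item into the queue after a pollingPeriod delay. It returns false
// if the worker should exit.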
func (g *LogSizeGatherer) Work(ctx context.Context) bool {
	var workItem WorkItem
	select {
	case <-g.stopChannel:
		g.wg.Done()
		return false
	case workItem = <-g.workChannel:
	}
	sshResult, err := e2essh.SSH(
		ctx,
		fmt.Sprintf("ls -l %v | awk '{print $9, $5}' | tr '\n' ' '", strings.Join(workItem.paths, " ")),
		workItem.ip,
		framework.TestContext.Provider,
	)
	if err != nil {
		framework.Logf("Error while trying to SSH to %v, skipping probe. Error: %v", workItem.ip, err)
		// Give up after repeated SSH failures; otherwise retry with exponential backoff.
		if workItem.backoffMultiplier >= 128 {
			framework.Logf("Failed to ssh to a node %v multiple times in a row. Giving up.", workItem.ip)
			g.wg.Done()
			return false
		}
		workItem.backoffMultiplier *= 2
		go g.pushWorkItem(workItem)
		return true
	}
	workItem.backoffMultiplier = 1
	// The command above prints "<path> <size>" pairs separated by single spaces.
	results := strings.Split(sshResult.Stdout, " ")

	now := time.Now()
	for i := 0; i+1 < len(results); i += 2 {
		path := results[i]
		size, err := strconv.Atoi(results[i+1])
		if err != nil {
			framework.Logf("Error during conversion to int: %v, skipping data. Error: %v", results[i+1], err)
			continue
		}
		g.data.addNewData(workItem.ip, path, now, size)
	}
	go g.pushWorkItem(workItem)
	return true
}