1
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31 package nettest
32
33 import (
34 "bytes"
35 "context"
36 "encoding/json"
37 "fmt"
38 "io"
39 "log"
40 "net"
41 "net/http"
42 "os"
43 "os/signal"
44 "sync"
45 "syscall"
46 "time"
47
48 "github.com/spf13/cobra"
49
50 v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
51 "k8s.io/apimachinery/pkg/util/sets"
52 "k8s.io/apimachinery/pkg/version"
53 clientset "k8s.io/client-go/kubernetes"
54 restclient "k8s.io/client-go/rest"
55 )
56
// Command-line flag values; init binds each of them to its flag on CmdNettest.
var (
	// port is the TCP port the web server listens on ("--port").
	port int
	// peerCount is the number of distinct peers that must have been both sent
	// to and received from before /status reports "pass" ("--peers").
	peerCount int
	// service is the name of the service whose endpoints are contacted ("--service").
	service string
	// namespace is the namespace in which the service's endpoints are looked up ("--namespace").
	namespace string
	// delayShutdown is the number of seconds to sleep after SIGTERM before
	// exiting ("--delay-shutdown"); no handler is installed unless it is > 0.
	delayShutdown int
)
64
65
// CmdNettest is the cobra command that starts the nettest web server. It
// accepts no positional arguments and runs main.
var CmdNettest = &cobra.Command{
	Use:   "nettest",
	Short: "Starts a tiny web server for checking networking connectivity",
	Long: `Starts a web server for checking networking connectivity on the given "--port".

Will dial out to, and expect to hear from, every pod that is a member of the service
passed in the flag "--service".

The web server will have the following endpoints:

- "/read": to see the current state, or "/quit" to shut down.

- "/status": to see "pass/running/fail" determination. (literally, it will return
one of those words.)

- "/write": is used by other network test pods to register connectivity.`,
	Args: cobra.MaximumNArgs(0),
	Run:  main,
}
85
86 func init() {
87 CmdNettest.Flags().IntVar(&port, "port", 8080, "Port number to serve at.")
88 CmdNettest.Flags().IntVar(&peerCount, "peers", 8, "Must find at least this many peers for the test to pass.")
89 CmdNettest.Flags().StringVar(&service, "service", "nettest", "Service to find other network test pods in.")
90 CmdNettest.Flags().StringVar(&namespace, "namespace", "default", "Namespace of this pod. TODO: kubernetes should make this discoverable.")
91 CmdNettest.Flags().IntVar(&delayShutdown, "delay-shutdown", 0, "Number of seconds to delay shutdown when receiving SIGTERM.")
92 }
93
94
95
// State holds everything this server tracks about its connectivity checks.
// The /read handler (serveRead) JSON-encodes it as-is.
type State struct {
	// Hostname is this server's hostname. It is sent as the Source of outgoing
	// /write posts and echoed back in /write responses.
	Hostname string

	// State's methods access the fields below only while holding lock.

	// Sent counts successful posts to peers, keyed by the hostname each peer
	// reported in its response.
	Sent map[string]int
	// Received counts incoming /write posts, keyed by the poster's Source.
	Received map[string]int
	// Errors collects the messages of errors hit while serving /read and /write.
	Errors []string
	// Log holds the messages recorded through Logf; Logf drops the oldest
	// entry once the length exceeds 500.
	Log []string
	// StillContactingPeers is cleared by doneContactingPeers. While it is set,
	// /status reports "running" instead of "fail".
	StillContactingPeers bool

	// lock guards the mutable fields above. Being unexported, it is not part
	// of the JSON output.
	lock sync.Mutex
}
109
110 func (s *State) doneContactingPeers() {
111 s.lock.Lock()
112 defer s.lock.Unlock()
113 s.StillContactingPeers = false
114 }
115
116
117 func (s *State) serveStatus(w http.ResponseWriter, r *http.Request) {
118 s.lock.Lock()
119 defer s.lock.Unlock()
120 if len(s.Sent) >= peerCount && len(s.Received) >= peerCount {
121 fmt.Fprintf(w, "pass")
122 return
123 }
124 if s.StillContactingPeers {
125 fmt.Fprintf(w, "running")
126 return
127 }
128
129 go s.Logf("Declaring failure for %s/%s with %d sent and %d received and %d peers", namespace, service, len(s.Sent), len(s.Received), peerCount)
130 fmt.Fprintf(w, "fail")
131 }
132
133
134 func (s *State) serveRead(w http.ResponseWriter, r *http.Request) {
135 s.lock.Lock()
136 defer s.lock.Unlock()
137 w.WriteHeader(http.StatusOK)
138 b, err := json.MarshalIndent(s, "", "\t")
139 s.appendErr(err)
140 _, err = w.Write(b)
141 s.appendErr(err)
142 }
143
144
// WritePost is the JSON body one nettest server posts to another's /write
// endpoint.
type WritePost struct {
	// Source is the hostname of the server sending the post.
	Source string
	// Dest is the endpoint URL the post is being sent to.
	Dest string
}
149
150
// WriteResp is the JSON body returned by the /write endpoint.
type WriteResp struct {
	// Hostname is the hostname of the server that handled the post.
	Hostname string
}
154
155
156 func (s *State) serveWrite(w http.ResponseWriter, r *http.Request) {
157 defer r.Body.Close()
158 s.lock.Lock()
159 defer s.lock.Unlock()
160 w.WriteHeader(http.StatusOK)
161 var wp WritePost
162 s.appendErr(json.NewDecoder(r.Body).Decode(&wp))
163 if wp.Source == "" {
164 s.appendErr(fmt.Errorf("%v: Got request with no source", s.Hostname))
165 } else {
166 if s.Received == nil {
167 s.Received = map[string]int{}
168 }
169 s.Received[wp.Source]++
170 }
171 s.appendErr(json.NewEncoder(w).Encode(&WriteResp{Hostname: s.Hostname}))
172 }
173
174
175 func (s *State) appendErr(err error) {
176 if err != nil {
177 s.Errors = append(s.Errors, err.Error())
178 }
179 }
180
181
182
183
184 func (s *State) Logf(format string, args ...interface{}) {
185 s.lock.Lock()
186 defer s.lock.Unlock()
187 s.Log = append(s.Log, fmt.Sprintf(format, args...))
188 if len(s.Log) > 500 {
189 s.Log = s.Log[1:]
190 }
191 }
192
193
194 func (s *State) appendSuccessfulSend(toHostname string) {
195 s.lock.Lock()
196 defer s.lock.Unlock()
197 if s.Sent == nil {
198 s.Sent = map[string]int{}
199 }
200 s.Sent[toHostname]++
201 }
202
var (
	// state is the package-level State instance; getWebserverEndpoints logs
	// its endpoint-lookup failures to it.
	state State
)
207
208 func main(cmd *cobra.Command, args []string) {
209 if service == "" {
210 log.Fatal("Must provide -service flag.")
211 }
212
213 hostname, err := os.Hostname()
214 if err != nil {
215 log.Fatalf("Error getting hostname: %v", err)
216 }
217
218 if delayShutdown > 0 {
219 termCh := make(chan os.Signal, 1)
220 signal.Notify(termCh, syscall.SIGTERM)
221 go func() {
222 <-termCh
223 log.Printf("Sleeping %d seconds before exit ...", delayShutdown)
224 time.Sleep(time.Duration(delayShutdown) * time.Second)
225 os.Exit(0)
226 }()
227 }
228
229 state := State{
230 Hostname: hostname,
231 StillContactingPeers: true,
232 }
233
234 go contactOthers(&state)
235
236 http.HandleFunc("/quit", func(w http.ResponseWriter, r *http.Request) {
237 os.Exit(0)
238 })
239
240 http.HandleFunc("/read", state.serveRead)
241 http.HandleFunc("/write", state.serveWrite)
242 http.HandleFunc("/status", state.serveStatus)
243
244 go log.Fatal(http.ListenAndServe(fmt.Sprintf(":%d", port), nil))
245
246 select {}
247 }
248
249
250 func contactOthers(state *State) {
251 var (
252 versionInfo *version.Info
253 err error
254 )
255 sleepTime := 5 * time.Second
256
257
258
259 if sleepTime < time.Duration(peerCount/10)*time.Second {
260 sleepTime = time.Duration(peerCount/10) * time.Second
261 }
262 timeout := 5 * time.Minute
263
264
265 if timeout < time.Duration(peerCount)*time.Second {
266 timeout = time.Duration(peerCount) * time.Second
267 }
268 defer state.doneContactingPeers()
269
270 config, err := restclient.InClusterConfig()
271 if err != nil {
272 log.Fatalf("Unable to create config; error: %v\n", err)
273 }
274 config.ContentType = "application/vnd.kubernetes.protobuf"
275 client, err := clientset.NewForConfig(config)
276 if err != nil {
277 log.Fatalf("Unable to create client; error: %v\n", err)
278 }
279
280
281
282 for start := time.Now(); time.Since(start) < timeout; time.Sleep(sleepTime) {
283
284 if versionInfo, err = client.Discovery().ServerVersion(); err != nil {
285 log.Printf("Unable to get server version: %v; retrying.\n", err)
286 } else {
287 log.Printf("Server version: %#v\n", versionInfo)
288 break
289 }
290 time.Sleep(1 * time.Second)
291 }
292
293 if err != nil {
294 log.Fatalf("Unable to contact Kubernetes: %v\n", err)
295 }
296
297 for start := time.Now(); time.Since(start) < timeout; time.Sleep(sleepTime) {
298 eps := getWebserverEndpoints(client)
299 if eps.Len() >= peerCount {
300 break
301 }
302 state.Logf("%v/%v has %v endpoints (%v), which is less than %v as expected. Waiting for all endpoints to come up.", namespace, service, len(eps), eps.List(), peerCount)
303 }
304
305
306
307 for i := 0; i < 15; i++ {
308 eps := getWebserverEndpoints(client)
309 for ep := range eps {
310 state.Logf("Attempting to contact %s", ep)
311 contactSingle(ep, state)
312 }
313 time.Sleep(sleepTime)
314 }
315 }
316
317
318 func getWebserverEndpoints(client clientset.Interface) sets.String {
319 endpoints, err := client.CoreV1().Endpoints(namespace).Get(context.TODO(), service, v1.GetOptions{})
320 eps := sets.String{}
321 if err != nil {
322 state.Logf("Unable to read the endpoints for %v/%v: %v.", namespace, service, err)
323 return eps
324 }
325 for _, ss := range endpoints.Subsets {
326 for _, a := range ss.Addresses {
327 for _, p := range ss.Ports {
328 ipPort := net.JoinHostPort(a.IP, fmt.Sprint(p.Port))
329 eps.Insert(fmt.Sprintf("http://%s", ipPort))
330 }
331 }
332 }
333 return eps
334 }
335
336
337 func contactSingle(e string, state *State) {
338 body, err := json.Marshal(&WritePost{
339 Dest: e,
340 Source: state.Hostname,
341 })
342 if err != nil {
343 log.Fatalf("json marshal error: %v", err)
344 }
345 resp, err := http.Post(e+"/write", "application/json", bytes.NewReader(body))
346 if err != nil {
347 state.Logf("Warning: unable to contact the endpoint %q: %v", e, err)
348 return
349 }
350 defer resp.Body.Close()
351
352 body, err = io.ReadAll(resp.Body)
353 if err != nil {
354 state.Logf("Warning: unable to read response from '%v': '%v'", e, err)
355 return
356 }
357 var wr WriteResp
358 err = json.Unmarshal(body, &wr)
359 if err != nil {
360 state.Logf("Warning: unable to unmarshal response (%v) from '%v': '%v'", string(body), e, err)
361 return
362 }
363 state.appendSuccessfulSend(wr.Hostname)
364 }
365
View as plain text