...

Source file src/k8s.io/kubernetes/test/images/agnhost/nettest/nettest.go

Documentation: k8s.io/kubernetes/test/images/agnhost/nettest

     1  /*
     2  Copyright 2014 The Kubernetes Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  // A tiny web server for checking networking connectivity.
    18  //
    19  // Will dial out to, and expect to hear from, every pod that is a member of
    20  // the service passed in the flag -service.
    21  //
    22  // Will serve a webserver on given -port.
    23  //
    24  // Visit /read to see the current state, or /quit to shut down.
    25  //
    26  // Visit /status to see pass/running/fail determination. (literally, it will
    27  // return one of those words.)
    28  //
    29  // /write is used by other network test pods to register connectivity.
    30  
    31  package nettest
    32  
    33  import (
    34  	"bytes"
    35  	"context"
    36  	"encoding/json"
    37  	"fmt"
    38  	"io"
    39  	"log"
    40  	"net"
    41  	"net/http"
    42  	"os"
    43  	"os/signal"
    44  	"sync"
    45  	"syscall"
    46  	"time"
    47  
    48  	"github.com/spf13/cobra"
    49  
    50  	v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    51  	"k8s.io/apimachinery/pkg/util/sets"
    52  	"k8s.io/apimachinery/pkg/version"
    53  	clientset "k8s.io/client-go/kubernetes"
    54  	restclient "k8s.io/client-go/rest"
    55  )
    56  
    57  var (
    58  	port          int
    59  	peerCount     int
    60  	service       string
    61  	namespace     string
    62  	delayShutdown int
    63  )
    64  
    65  // CmdNettest is used by agnhost Cobra.
    66  var CmdNettest = &cobra.Command{
    67  	Use:   "nettest",
    68  	Short: "Starts a tiny web server for checking networking connectivity",
    69  	Long: `Starts a web server for checking networking connectivity on the given "--port".
    70  
    71  Will dial out to, and expect to hear from, every pod that is a member of the service
    72  passed in the flag "--service".
    73  
    74  The web server will have the following endpoints:
    75  
    76  - "/read": to see the current state, or "/quit" to shut down.
    77  
    78  - "/status": to see "pass/running/fail" determination. (literally, it will return
    79  one of those words.)
    80  
    81  - "/write": is used by other network test pods to register connectivity.`,
    82  	Args: cobra.MaximumNArgs(0),
    83  	Run:  main,
    84  }
    85  
    86  func init() {
    87  	CmdNettest.Flags().IntVar(&port, "port", 8080, "Port number to serve at.")
    88  	CmdNettest.Flags().IntVar(&peerCount, "peers", 8, "Must find at least this many peers for the test to pass.")
    89  	CmdNettest.Flags().StringVar(&service, "service", "nettest", "Service to find other network test pods in.")
    90  	CmdNettest.Flags().StringVar(&namespace, "namespace", "default", "Namespace of this pod. TODO: kubernetes should make this discoverable.")
    91  	CmdNettest.Flags().IntVar(&delayShutdown, "delay-shutdown", 0, "Number of seconds to delay shutdown when receiving SIGTERM.")
    92  }
    93  
    94  // State tracks the internal state of our little http server.
    95  // It's returned verbatim over the /read endpoint.
    96  type State struct {
    97  	// Hostname is set once and never changed-- it's always safe to read.
    98  	Hostname string
    99  
   100  	// The below fields require that lock is held before reading or writing.
   101  	Sent                 map[string]int
   102  	Received             map[string]int
   103  	Errors               []string
   104  	Log                  []string
   105  	StillContactingPeers bool
   106  
   107  	lock sync.Mutex
   108  }
   109  
   110  func (s *State) doneContactingPeers() {
   111  	s.lock.Lock()
   112  	defer s.lock.Unlock()
   113  	s.StillContactingPeers = false
   114  }
   115  
   116  // serveStatus returns "pass", "running", or "fail".
   117  func (s *State) serveStatus(w http.ResponseWriter, r *http.Request) {
   118  	s.lock.Lock()
   119  	defer s.lock.Unlock()
   120  	if len(s.Sent) >= peerCount && len(s.Received) >= peerCount {
   121  		fmt.Fprintf(w, "pass")
   122  		return
   123  	}
   124  	if s.StillContactingPeers {
   125  		fmt.Fprintf(w, "running")
   126  		return
   127  	}
   128  	// Logf can't be called while holding the lock, so defer using a goroutine
   129  	go s.Logf("Declaring failure for %s/%s with %d sent and %d received and %d peers", namespace, service, len(s.Sent), len(s.Received), peerCount)
   130  	fmt.Fprintf(w, "fail")
   131  }
   132  
   133  // serveRead writes our json encoded state
   134  func (s *State) serveRead(w http.ResponseWriter, r *http.Request) {
   135  	s.lock.Lock()
   136  	defer s.lock.Unlock()
   137  	w.WriteHeader(http.StatusOK)
   138  	b, err := json.MarshalIndent(s, "", "\t")
   139  	s.appendErr(err)
   140  	_, err = w.Write(b)
   141  	s.appendErr(err)
   142  }
   143  
   144  // WritePost is the format that (json encoded) requests to the /write handler should take.
   145  type WritePost struct {
   146  	Source string
   147  	Dest   string
   148  }
   149  
   150  // WriteResp is returned by /write
   151  type WriteResp struct {
   152  	Hostname string
   153  }
   154  
   155  // serveWrite records the contact in our state.
   156  func (s *State) serveWrite(w http.ResponseWriter, r *http.Request) {
   157  	defer r.Body.Close()
   158  	s.lock.Lock()
   159  	defer s.lock.Unlock()
   160  	w.WriteHeader(http.StatusOK)
   161  	var wp WritePost
   162  	s.appendErr(json.NewDecoder(r.Body).Decode(&wp))
   163  	if wp.Source == "" {
   164  		s.appendErr(fmt.Errorf("%v: Got request with no source", s.Hostname))
   165  	} else {
   166  		if s.Received == nil {
   167  			s.Received = map[string]int{}
   168  		}
   169  		s.Received[wp.Source]++
   170  	}
   171  	s.appendErr(json.NewEncoder(w).Encode(&WriteResp{Hostname: s.Hostname}))
   172  }
   173  
   174  // appendErr adds err to the list, if err is not nil. s must be locked.
   175  func (s *State) appendErr(err error) {
   176  	if err != nil {
   177  		s.Errors = append(s.Errors, err.Error())
   178  	}
   179  }
   180  
   181  // Logf writes to the log message list. s must not be locked.
   182  // s's Log member will drop an old message if it would otherwise
   183  // become longer than 500 messages.
   184  func (s *State) Logf(format string, args ...interface{}) {
   185  	s.lock.Lock()
   186  	defer s.lock.Unlock()
   187  	s.Log = append(s.Log, fmt.Sprintf(format, args...))
   188  	if len(s.Log) > 500 {
   189  		s.Log = s.Log[1:]
   190  	}
   191  }
   192  
   193  // s must not be locked
   194  func (s *State) appendSuccessfulSend(toHostname string) {
   195  	s.lock.Lock()
   196  	defer s.lock.Unlock()
   197  	if s.Sent == nil {
   198  		s.Sent = map[string]int{}
   199  	}
   200  	s.Sent[toHostname]++
   201  }
   202  
   203  var (
   204  	// Our one and only state object
   205  	state State
   206  )
   207  
   208  func main(cmd *cobra.Command, args []string) {
   209  	if service == "" {
   210  		log.Fatal("Must provide -service flag.")
   211  	}
   212  
   213  	hostname, err := os.Hostname()
   214  	if err != nil {
   215  		log.Fatalf("Error getting hostname: %v", err)
   216  	}
   217  
   218  	if delayShutdown > 0 {
   219  		termCh := make(chan os.Signal, 1)
   220  		signal.Notify(termCh, syscall.SIGTERM)
   221  		go func() {
   222  			<-termCh
   223  			log.Printf("Sleeping %d seconds before exit ...", delayShutdown)
   224  			time.Sleep(time.Duration(delayShutdown) * time.Second)
   225  			os.Exit(0)
   226  		}()
   227  	}
   228  
   229  	state := State{
   230  		Hostname:             hostname,
   231  		StillContactingPeers: true,
   232  	}
   233  
   234  	go contactOthers(&state)
   235  
   236  	http.HandleFunc("/quit", func(w http.ResponseWriter, r *http.Request) {
   237  		os.Exit(0)
   238  	})
   239  
   240  	http.HandleFunc("/read", state.serveRead)
   241  	http.HandleFunc("/write", state.serveWrite)
   242  	http.HandleFunc("/status", state.serveStatus)
   243  
   244  	go log.Fatal(http.ListenAndServe(fmt.Sprintf(":%d", port), nil))
   245  
   246  	select {}
   247  }
   248  
   249  // Find all sibling pods in the service and post to their /write handler.
   250  func contactOthers(state *State) {
   251  	var (
   252  		versionInfo *version.Info
   253  		err         error
   254  	)
   255  	sleepTime := 5 * time.Second
   256  	// In large cluster getting all endpoints is pretty expensive.
   257  	// Thus, we will limit ourselves to send on average at most 10 such
   258  	// requests per second
   259  	if sleepTime < time.Duration(peerCount/10)*time.Second {
   260  		sleepTime = time.Duration(peerCount/10) * time.Second
   261  	}
   262  	timeout := 5 * time.Minute
   263  	// Similarly we need to bump timeout so that it is reasonable in large
   264  	// clusters.
   265  	if timeout < time.Duration(peerCount)*time.Second {
   266  		timeout = time.Duration(peerCount) * time.Second
   267  	}
   268  	defer state.doneContactingPeers()
   269  
   270  	config, err := restclient.InClusterConfig()
   271  	if err != nil {
   272  		log.Fatalf("Unable to create config; error: %v\n", err)
   273  	}
   274  	config.ContentType = "application/vnd.kubernetes.protobuf"
   275  	client, err := clientset.NewForConfig(config)
   276  	if err != nil {
   277  		log.Fatalf("Unable to create client; error: %v\n", err)
   278  	}
   279  
   280  	// Try to get the server version until <timeout>; we use a timeout because
   281  	// the pod might not have immediate network connectivity.
   282  	for start := time.Now(); time.Since(start) < timeout; time.Sleep(sleepTime) {
   283  		// Double check that worked by getting the server version.
   284  		if versionInfo, err = client.Discovery().ServerVersion(); err != nil {
   285  			log.Printf("Unable to get server version: %v; retrying.\n", err)
   286  		} else {
   287  			log.Printf("Server version: %#v\n", versionInfo)
   288  			break
   289  		}
   290  		time.Sleep(1 * time.Second)
   291  	}
   292  
   293  	if err != nil {
   294  		log.Fatalf("Unable to contact Kubernetes: %v\n", err)
   295  	}
   296  
   297  	for start := time.Now(); time.Since(start) < timeout; time.Sleep(sleepTime) {
   298  		eps := getWebserverEndpoints(client)
   299  		if eps.Len() >= peerCount {
   300  			break
   301  		}
   302  		state.Logf("%v/%v has %v endpoints (%v), which is less than %v as expected. Waiting for all endpoints to come up.", namespace, service, len(eps), eps.List(), peerCount)
   303  	}
   304  
   305  	// Do this repeatedly, in case there's some propagation delay with getting
   306  	// newly started pods into the endpoints list.
   307  	for i := 0; i < 15; i++ {
   308  		eps := getWebserverEndpoints(client)
   309  		for ep := range eps {
   310  			state.Logf("Attempting to contact %s", ep)
   311  			contactSingle(ep, state)
   312  		}
   313  		time.Sleep(sleepTime)
   314  	}
   315  }
   316  
   317  // getWebserverEndpoints returns the webserver endpoints as a set of String, each in the format like "http://{ip}:{port}"
   318  func getWebserverEndpoints(client clientset.Interface) sets.String {
   319  	endpoints, err := client.CoreV1().Endpoints(namespace).Get(context.TODO(), service, v1.GetOptions{})
   320  	eps := sets.String{}
   321  	if err != nil {
   322  		state.Logf("Unable to read the endpoints for %v/%v: %v.", namespace, service, err)
   323  		return eps
   324  	}
   325  	for _, ss := range endpoints.Subsets {
   326  		for _, a := range ss.Addresses {
   327  			for _, p := range ss.Ports {
   328  				ipPort := net.JoinHostPort(a.IP, fmt.Sprint(p.Port))
   329  				eps.Insert(fmt.Sprintf("http://%s", ipPort))
   330  			}
   331  		}
   332  	}
   333  	return eps
   334  }
   335  
   336  // contactSingle dials the address 'e' and tries to POST to its /write address.
   337  func contactSingle(e string, state *State) {
   338  	body, err := json.Marshal(&WritePost{
   339  		Dest:   e,
   340  		Source: state.Hostname,
   341  	})
   342  	if err != nil {
   343  		log.Fatalf("json marshal error: %v", err)
   344  	}
   345  	resp, err := http.Post(e+"/write", "application/json", bytes.NewReader(body))
   346  	if err != nil {
   347  		state.Logf("Warning: unable to contact the endpoint %q: %v", e, err)
   348  		return
   349  	}
   350  	defer resp.Body.Close()
   351  
   352  	body, err = io.ReadAll(resp.Body)
   353  	if err != nil {
   354  		state.Logf("Warning: unable to read response from '%v': '%v'", e, err)
   355  		return
   356  	}
   357  	var wr WriteResp
   358  	err = json.Unmarshal(body, &wr)
   359  	if err != nil {
   360  		state.Logf("Warning: unable to unmarshal response (%v) from '%v': '%v'", string(body), e, err)
   361  		return
   362  	}
   363  	state.appendSuccessfulSend(wr.Hostname)
   364  }
   365  

View as plain text