...

Source file src/sigs.k8s.io/controller-runtime/pkg/internal/testing/process/process.go

Documentation: sigs.k8s.io/controller-runtime/pkg/internal/testing/process

     1  /*
     2  Copyright 2021 The Kubernetes Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  package process
    18  
    19  import (
    20  	"crypto/tls"
    21  	"fmt"
    22  	"io"
    23  	"net"
    24  	"net/http"
    25  	"net/url"
    26  	"os"
    27  	"os/exec"
    28  	"path"
    29  	"regexp"
    30  	"sync"
    31  	"syscall"
    32  	"time"
    33  )
    34  
    35  // ListenAddr represents some listening address and port.
    36  type ListenAddr struct {
    37  	Address string
    38  	Port    string
    39  }
    40  
    41  // URL returns a URL for this address with the given scheme and subpath.
    42  func (l *ListenAddr) URL(scheme string, path string) *url.URL {
    43  	return &url.URL{
    44  		Scheme: scheme,
    45  		Host:   l.HostPort(),
    46  		Path:   path,
    47  	}
    48  }
    49  
    50  // HostPort returns the joined host-port pair for this address.
    51  func (l *ListenAddr) HostPort() string {
    52  	return net.JoinHostPort(l.Address, l.Port)
    53  }
    54  
    55  // HealthCheck describes the information needed to health-check a process via
    56  // some health-check URL.
    57  type HealthCheck struct {
    58  	url.URL
    59  
    60  	// HealthCheckPollInterval is the interval which will be used for polling the
    61  	// endpoint described by Host, Port, and Path.
    62  	//
    63  	// If left empty it will default to 100 Milliseconds.
    64  	PollInterval time.Duration
    65  }
    66  
    67  // State define the state of the process.
    68  type State struct {
    69  	Cmd *exec.Cmd
    70  
    71  	// HealthCheck describes how to check if this process is up.  If we get an http.StatusOK,
    72  	// we assume the process is ready to operate.
    73  	//
    74  	// For example, the /healthz endpoint of the k8s API server, or the /health endpoint of etcd.
    75  	HealthCheck HealthCheck
    76  
    77  	Args []string
    78  
    79  	StopTimeout  time.Duration
    80  	StartTimeout time.Duration
    81  
    82  	Dir              string
    83  	DirNeedsCleaning bool
    84  	Path             string
    85  
    86  	// ready holds whether the process is currently in ready state (hit the ready condition) or not.
    87  	// It will be set to true on a successful `Start()` and set to false on a successful `Stop()`
    88  	ready bool
    89  
    90  	// waitDone is closed when our call to wait finishes up, and indicates that
    91  	// our process has terminated.
    92  	waitDone chan struct{}
    93  	errMu    sync.Mutex
    94  	exitErr  error
    95  	exited   bool
    96  }
    97  
    98  // Init sets up this process, configuring binary paths if missing, initializing
    99  // temporary directories, etc.
   100  //
   101  // This defaults all defaultable fields.
   102  func (ps *State) Init(name string) error {
   103  	if ps.Path == "" {
   104  		if name == "" {
   105  			return fmt.Errorf("must have at least one of name or path")
   106  		}
   107  		ps.Path = BinPathFinder(name, "")
   108  	}
   109  
   110  	if ps.Dir == "" {
   111  		newDir, err := os.MkdirTemp("", "k8s_test_framework_")
   112  		if err != nil {
   113  			return err
   114  		}
   115  		ps.Dir = newDir
   116  		ps.DirNeedsCleaning = true
   117  	}
   118  
   119  	if ps.StartTimeout == 0 {
   120  		ps.StartTimeout = 20 * time.Second
   121  	}
   122  
   123  	if ps.StopTimeout == 0 {
   124  		ps.StopTimeout = 20 * time.Second
   125  	}
   126  	return nil
   127  }
   128  
   129  type stopChannel chan struct{}
   130  
   131  // CheckFlag checks the help output of this command for the presence of the given flag, specified
   132  // without the leading `--` (e.g. `CheckFlag("insecure-port")` checks for `--insecure-port`),
   133  // returning true if the flag is present.
   134  func (ps *State) CheckFlag(flag string) (bool, error) {
   135  	cmd := exec.Command(ps.Path, "--help")
   136  	outContents, err := cmd.CombinedOutput()
   137  	if err != nil {
   138  		return false, fmt.Errorf("unable to run command %q to check for flag %q: %w", ps.Path, flag, err)
   139  	}
   140  	pat := `(?m)^\s*--` + flag + `\b` // (m --> multi-line --> ^ matches start of line)
   141  	matched, err := regexp.Match(pat, outContents)
   142  	if err != nil {
   143  		return false, fmt.Errorf("unable to check command %q for flag %q in help output: %w", ps.Path, flag, err)
   144  	}
   145  	return matched, nil
   146  }
   147  
   148  // Start starts the apiserver, waits for it to come up, and returns an error,
   149  // if occurred.
   150  func (ps *State) Start(stdout, stderr io.Writer) (err error) {
   151  	if ps.ready {
   152  		return nil
   153  	}
   154  
   155  	ps.Cmd = exec.Command(ps.Path, ps.Args...)
   156  	ps.Cmd.Stdout = stdout
   157  	ps.Cmd.Stderr = stderr
   158  	ps.Cmd.SysProcAttr = GetSysProcAttr()
   159  
   160  	ready := make(chan bool)
   161  	timedOut := time.After(ps.StartTimeout)
   162  	pollerStopCh := make(stopChannel)
   163  	go pollURLUntilOK(ps.HealthCheck.URL, ps.HealthCheck.PollInterval, ready, pollerStopCh)
   164  
   165  	ps.waitDone = make(chan struct{})
   166  
   167  	if err := ps.Cmd.Start(); err != nil {
   168  		ps.errMu.Lock()
   169  		defer ps.errMu.Unlock()
   170  		ps.exited = true
   171  		return err
   172  	}
   173  	go func() {
   174  		defer close(ps.waitDone)
   175  		err := ps.Cmd.Wait()
   176  
   177  		ps.errMu.Lock()
   178  		defer ps.errMu.Unlock()
   179  		ps.exitErr = err
   180  		ps.exited = true
   181  	}()
   182  
   183  	select {
   184  	case <-ready:
   185  		ps.ready = true
   186  		return nil
   187  	case <-ps.waitDone:
   188  		close(pollerStopCh)
   189  		return fmt.Errorf("timeout waiting for process %s to start successfully "+
   190  			"(it may have failed to start, or stopped unexpectedly before becoming ready)",
   191  			path.Base(ps.Path))
   192  	case <-timedOut:
   193  		close(pollerStopCh)
   194  		if ps.Cmd != nil {
   195  			// intentionally ignore this -- we might've crashed, failed to start, etc
   196  			ps.Cmd.Process.Signal(syscall.SIGTERM) //nolint:errcheck
   197  		}
   198  		return fmt.Errorf("timeout waiting for process %s to start", path.Base(ps.Path))
   199  	}
   200  }
   201  
   202  // Exited returns true if the process exited, and may also
   203  // return an error (as per Cmd.Wait) if the process did not
   204  // exit with error code 0.
   205  func (ps *State) Exited() (bool, error) {
   206  	ps.errMu.Lock()
   207  	defer ps.errMu.Unlock()
   208  	return ps.exited, ps.exitErr
   209  }
   210  
   211  func pollURLUntilOK(url url.URL, interval time.Duration, ready chan bool, stopCh stopChannel) {
   212  	client := &http.Client{
   213  		Transport: &http.Transport{
   214  			TLSClientConfig: &tls.Config{
   215  				// there's probably certs *somewhere*,
   216  				// but it's fine to just skip validating
   217  				// them for health checks during testing
   218  				InsecureSkipVerify: true, //nolint:gosec
   219  			},
   220  		},
   221  	}
   222  	if interval <= 0 {
   223  		interval = 100 * time.Millisecond
   224  	}
   225  	for {
   226  		res, err := client.Get(url.String())
   227  		if err == nil {
   228  			res.Body.Close()
   229  			if res.StatusCode == http.StatusOK {
   230  				ready <- true
   231  				return
   232  			}
   233  		}
   234  
   235  		select {
   236  		case <-stopCh:
   237  			return
   238  		default:
   239  			time.Sleep(interval)
   240  		}
   241  	}
   242  }
   243  
   244  // Stop stops this process gracefully, waits for its termination, and cleans up
   245  // the CertDir if necessary.
   246  func (ps *State) Stop() error {
   247  	// Always clear the directory if we need to.
   248  	defer func() {
   249  		if ps.DirNeedsCleaning {
   250  			_ = os.RemoveAll(ps.Dir)
   251  		}
   252  	}()
   253  	if ps.Cmd == nil {
   254  		return nil
   255  	}
   256  	if done, _ := ps.Exited(); done {
   257  		return nil
   258  	}
   259  	if err := ps.Cmd.Process.Signal(syscall.SIGTERM); err != nil {
   260  		return fmt.Errorf("unable to signal for process %s to stop: %w", ps.Path, err)
   261  	}
   262  
   263  	timedOut := time.After(ps.StopTimeout)
   264  
   265  	select {
   266  	case <-ps.waitDone:
   267  		break
   268  	case <-timedOut:
   269  		if err := ps.Cmd.Process.Signal(syscall.SIGKILL); err != nil {
   270  			return fmt.Errorf("unable to kill process %s: %w", ps.Path, err)
   271  		}
   272  		return fmt.Errorf("timeout waiting for process %s to stop", path.Base(ps.Path))
   273  	}
   274  	ps.ready = false
   275  	return nil
   276  }
   277  

View as plain text