...

Source file src/sigs.k8s.io/controller-runtime/pkg/internal/testing/addr/manager.go

Documentation: sigs.k8s.io/controller-runtime/pkg/internal/testing/addr

     1  /*
     2  Copyright 2021 The Kubernetes Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  package addr
    18  
    19  import (
    20  	"errors"
    21  	"fmt"
    22  	"io/fs"
    23  	"net"
    24  	"os"
    25  	"path/filepath"
    26  	"strings"
    27  	"time"
    28  
    29  	"sigs.k8s.io/controller-runtime/pkg/internal/flock"
    30  )
    31  
    32  // TODO(directxman12): interface / release functionality for external port managers
    33  
const (
	// portReserveTime is the age after which a port file in cacheDir is
	// considered outdated and is removed by portCache.add.
	portReserveTime   = 2 * time.Minute
	// portConflictRetry is the maximum number of candidate ports Suggest
	// tries before giving up with an error.
	portConflictRetry = 100
	// portFilePrefix is the file-name prefix of port files in cacheDir; the
	// port number is appended to it to form the file name.
	portFilePrefix    = "port-"
)
    39  
var (
	// cacheDir is the directory in which port files are created. It is set
	// by init to a "kubebuilder-envtest" directory under the user cache
	// directory, or under the OS temp directory if that cannot be used.
	cacheDir string
)
    43  
    44  func init() {
    45  	baseDir, err := os.UserCacheDir()
    46  	if err == nil {
    47  		cacheDir = filepath.Join(baseDir, "kubebuilder-envtest")
    48  		err = os.MkdirAll(cacheDir, 0o750)
    49  	}
    50  	if err != nil {
    51  		// Either we didn't get a cache directory, or we can't use it
    52  		baseDir = os.TempDir()
    53  		cacheDir = filepath.Join(baseDir, "kubebuilder-envtest")
    54  		err = os.MkdirAll(cacheDir, 0o750)
    55  	}
    56  	if err != nil {
    57  		panic(err)
    58  	}
    59  }
    60  
    61  type portCache struct{}
    62  
    63  func (c *portCache) add(port int) (bool, error) {
    64  	// Remove outdated ports.
    65  	if err := fs.WalkDir(os.DirFS(cacheDir), ".", func(path string, d fs.DirEntry, err error) error {
    66  		if err != nil {
    67  			return err
    68  		}
    69  		if d.IsDir() || !d.Type().IsRegular() || !strings.HasPrefix(path, portFilePrefix) {
    70  			return nil
    71  		}
    72  		info, err := d.Info()
    73  		if err != nil {
    74  			// No-op if file no longer exists; may have been deleted by another
    75  			// process/thread trying to allocate ports.
    76  			if errors.Is(err, fs.ErrNotExist) {
    77  				return nil
    78  			}
    79  			return err
    80  		}
    81  		if time.Since(info.ModTime()) > portReserveTime {
    82  			if err := os.Remove(filepath.Join(cacheDir, path)); err != nil {
    83  				// No-op if file no longer exists; may have been deleted by another
    84  				// process/thread trying to allocate ports.
    85  				if os.IsNotExist(err) {
    86  					return nil
    87  				}
    88  				return err
    89  			}
    90  		}
    91  		return nil
    92  	}); err != nil {
    93  		return false, err
    94  	}
    95  	// Try allocating new port, by acquiring a file.
    96  	path := fmt.Sprintf("%s/%s%d", cacheDir, portFilePrefix, port)
    97  	if err := flock.Acquire(path); errors.Is(err, flock.ErrAlreadyLocked) {
    98  		return false, nil
    99  	} else if err != nil {
   100  		return false, err
   101  	}
   102  	return true, nil
   103  }
   104  
// cache is the package-level portCache that Suggest uses to reserve ports.
var cache = &portCache{}
   106  
   107  func suggest(listenHost string) (*net.TCPListener, int, string, error) {
   108  	if listenHost == "" {
   109  		listenHost = "localhost"
   110  	}
   111  	addr, err := net.ResolveTCPAddr("tcp", net.JoinHostPort(listenHost, "0"))
   112  	if err != nil {
   113  		return nil, -1, "", err
   114  	}
   115  	l, err := net.ListenTCP("tcp", addr)
   116  	if err != nil {
   117  		return nil, -1, "", err
   118  	}
   119  	return l, l.Addr().(*net.TCPAddr).Port,
   120  		addr.IP.String(),
   121  		nil
   122  }
   123  
   124  // Suggest suggests an address a process can listen on. It returns
   125  // a tuple consisting of a free port and the hostname resolved to its IP.
   126  // It makes sure that new port allocated does not conflict with old ports
   127  // allocated within 1 minute.
   128  func Suggest(listenHost string) (int, string, error) {
   129  	for i := 0; i < portConflictRetry; i++ {
   130  		listener, port, resolvedHost, err := suggest(listenHost)
   131  		if err != nil {
   132  			return -1, "", err
   133  		}
   134  		defer listener.Close()
   135  		if ok, err := cache.add(port); ok {
   136  			return port, resolvedHost, nil
   137  		} else if err != nil {
   138  			return -1, "", err
   139  		}
   140  	}
   141  	return -1, "", fmt.Errorf("no free ports found after %d retries", portConflictRetry)
   142  }
   143  

View as plain text