...

Source file src/github.com/emissary-ingress/emissary/v3/pkg/kubeapply/kubeapply.go

Documentation: github.com/emissary-ingress/emissary/v3/pkg/kubeapply

     1  package kubeapply
     2  
     3  import (
     4  	"context"
     5  	"errors"
     6  	"fmt"
     7  	"os"
     8  	"path/filepath"
     9  	"sort"
    10  	"strings"
    11  	"syscall"
    12  	"time"
    13  
    14  	"github.com/datawire/dlib/derror"
    15  	"github.com/datawire/dlib/dexec"
    16  	"github.com/datawire/dlib/dlog"
    17  	"github.com/emissary-ingress/emissary/v3/pkg/k8s"
    18  )
    19  
// errorDeadlineExceeded is the sentinel error returned by the
// package-level applyAndWait function when the waiter has not
// finished by the deadline.  YAMLCollection.ApplyAndWait detects it
// with errors.Is and wraps it with the name of the phase that timed
// out.
var errorDeadlineExceeded = errors.New("timeout exceeded")
    23  
    24  // Kubeapply applies the supplied manifests to the kubernetes cluster
    25  // indicated via the kubeinfo argument.  If kubeinfo is nil, it will
    26  // look in the standard default places for cluster configuration.  If
    27  // any phase takes longer than perPhaseTimeout to become ready, then
    28  // it returns early with an error.
    29  func Kubeapply(ctx context.Context, kubeinfo *k8s.KubeInfo, perPhaseTimeout time.Duration, debug, dryRun bool, files ...string) error {
    30  	collection, err := CollectYAML(files...)
    31  	if err != nil {
    32  		return fmt.Errorf("CollectYAML: %w", err)
    33  	}
    34  
    35  	if err = collection.ApplyAndWait(ctx, kubeinfo, perPhaseTimeout, debug, dryRun); err != nil {
    36  		return fmt.Errorf("ApplyAndWait: %w", err)
    37  	}
    38  
    39  	return nil
    40  }
    41  
// A YAMLCollection is a collection of YAML files to later be applied.
//
// It maps a phase name to the paths of the files that belong to that
// phase.  The phase name is the two-digit prefix of a file's base
// name when that name starts with "NN-", and "last" otherwise (see
// addFile).  Phases are applied in the sorted order of their names
// (see ApplyAndWait).
type YAMLCollection map[string][]string
    44  
    45  // CollectYAML takes several file or directory paths, and returns a
    46  // collection of the YAML files in them.
    47  func CollectYAML(paths ...string) (YAMLCollection, error) {
    48  	ret := make(YAMLCollection)
    49  	for _, path := range paths {
    50  		err := filepath.Walk(path, func(filename string, fileinfo os.FileInfo, err error) error {
    51  			if err != nil {
    52  				return err
    53  			}
    54  			if fileinfo.IsDir() {
    55  				return nil
    56  			}
    57  
    58  			if strings.HasSuffix(filename, ".yaml") {
    59  				ret.addFile(filename)
    60  			}
    61  			return nil
    62  		})
    63  		if err != nil {
    64  			return nil, err
    65  		}
    66  	}
    67  	return ret, nil
    68  }
    69  
    70  func hasNumberPrefix(filepart string) bool {
    71  	if len(filepart) < 3 {
    72  		return false
    73  	}
    74  	return '0' <= filepart[0] && filepart[0] <= '9' &&
    75  		'0' <= filepart[1] && filepart[1] <= '9' &&
    76  		filepart[2] == '-'
    77  }
    78  
    79  func (collection YAMLCollection) addFile(path string) {
    80  	_, notdir := filepath.Split(path)
    81  	phaseName := "last" // all letters sort after all numbers; "last" is after all numbered phases
    82  	if hasNumberPrefix(notdir) {
    83  		phaseName = notdir[:2]
    84  	}
    85  
    86  	collection[phaseName] = append(collection[phaseName], path)
    87  }
    88  
    89  // ApplyAndWait applies the collection of YAML, and waits for all
    90  // Resources described in it to be ready.  If any phase takes longer
    91  // than perPhaseTimeout to become ready, then it returns early with an
    92  // error.
    93  func (collection YAMLCollection) ApplyAndWait(
    94  	ctx context.Context,
    95  	kubeinfo *k8s.KubeInfo,
    96  	perPhaseTimeout time.Duration,
    97  	debug, dryRun bool,
    98  ) error {
    99  	if kubeinfo == nil {
   100  		kubeinfo = k8s.NewKubeInfo("", "", "")
   101  	}
   102  
   103  	phaseNames := make([]string, 0, len(collection))
   104  	for phaseName := range collection {
   105  		phaseNames = append(phaseNames, phaseName)
   106  	}
   107  	sort.Strings(phaseNames)
   108  
   109  	for _, phaseName := range phaseNames {
   110  		deadline := time.Now().Add(perPhaseTimeout)
   111  		err := applyAndWait(ctx, kubeinfo, deadline, debug, dryRun, collection[phaseName])
   112  		if err != nil {
   113  			if errors.Is(err, errorDeadlineExceeded) {
   114  				err = fmt.Errorf("phase %q not ready after %v: %w", phaseName, perPhaseTimeout, err)
   115  			}
   116  			return err
   117  		}
   118  	}
   119  	return nil
   120  }
   121  
   122  func applyAndWait(ctx context.Context, kubeinfo *k8s.KubeInfo, deadline time.Time, debug, dryRun bool, sourceFilenames []string) error {
   123  	expandedFilenames, err := expand(ctx, sourceFilenames)
   124  	if err != nil {
   125  		return fmt.Errorf("expanding YAML: %w", err)
   126  	}
   127  
   128  	cli, err := k8s.NewClient(kubeinfo)
   129  	if err != nil {
   130  		return fmt.Errorf("connecting to cluster %v: %w", kubeinfo, err)
   131  	}
   132  	waiter, err := NewWaiter(cli.Watcher())
   133  	if err != nil {
   134  		return err
   135  	}
   136  
   137  	valid := make(map[string]bool)
   138  	var scanErrs derror.MultiError
   139  	for _, filename := range expandedFilenames {
   140  		valid[filename] = true
   141  		if err := waiter.Scan(ctx, filename); err != nil {
   142  			scanErrs = append(scanErrs, fmt.Errorf("watch %q: %w", filename, err))
   143  			valid[filename] = false
   144  		}
   145  	}
   146  	if !debug {
   147  		// Unless the debug flag is on, clean up temporary expanded files when we're
   148  		// finished.
   149  		defer func() {
   150  			for _, filename := range expandedFilenames {
   151  				if valid[filename] {
   152  					if err := os.Remove(filename); err != nil {
   153  						// os.Remove returns an *io/fs.PathError that
   154  						// already includes the filename; no need for us to
   155  						// explicitly include the filename in the log line.
   156  						dlog.Error(ctx, err)
   157  					}
   158  				}
   159  			}
   160  		}()
   161  	}
   162  	if len(scanErrs) > 0 {
   163  		return fmt.Errorf("waiter: %w", scanErrs)
   164  	}
   165  
   166  	if err := kubectlApply(ctx, kubeinfo, dryRun, expandedFilenames); err != nil {
   167  		return err
   168  	}
   169  
   170  	finished, err := waiter.Wait(ctx, deadline)
   171  	if err != nil {
   172  		return err
   173  	}
   174  	if !finished {
   175  		return errorDeadlineExceeded
   176  	}
   177  
   178  	return nil
   179  }
   180  
   181  func expand(ctx context.Context, names []string) ([]string, error) {
   182  	dlog.Printf(ctx, "expanding %s\n", strings.Join(names, " "))
   183  	var result []string
   184  	for _, n := range names {
   185  		resources, err := LoadResources(ctx, n)
   186  		if err != nil {
   187  			return nil, err
   188  		}
   189  		out := n + ".o"
   190  		err = SaveResources(out, resources)
   191  		if err != nil {
   192  			return nil, err
   193  		}
   194  		result = append(result, out)
   195  	}
   196  	return result, nil
   197  }
   198  
   199  func kubectlApply(ctx context.Context, info *k8s.KubeInfo, dryRun bool, filenames []string) error {
   200  	args := []string{"apply"}
   201  	if dryRun {
   202  		args = append(args, "--dry-run")
   203  	}
   204  	for _, filename := range filenames {
   205  		// https://github.com/datawire/ambassador/issues/77
   206  		filehandle, err := os.Open(filename)
   207  		if err != nil {
   208  			return err
   209  		}
   210  		defer filehandle.Close()
   211  		if err := syscall.Flock(int(filehandle.Fd()), syscall.LOCK_EX); err != nil {
   212  			return err
   213  		}
   214  		args = append(args, "-f", filename)
   215  	}
   216  	kargs, err := info.GetKubectlArray(args...)
   217  	if err != nil {
   218  		return err
   219  	}
   220  	dlog.Printf(ctx, "kubectl %s\n", strings.Join(kargs, " "))
   221  	/* #nosec */
   222  	if err := dexec.CommandContext(ctx, "kubectl", kargs...).Run(); err != nil {
   223  		return err
   224  	}
   225  
   226  	return nil
   227  }
   228  

View as plain text