...

Source file src/edge-infra.dev/pkg/sds/display/k8s/controllers/displayctl/annotator.go

Documentation: edge-infra.dev/pkg/sds/display/k8s/controllers/displayctl

     1  package displayctl
     2  
     3  import (
     4  	"context"
     5  	"fmt"
     6  	"sync"
     7  	"time"
     8  
     9  	"github.com/go-logr/logr"
    10  	kerrors "k8s.io/apimachinery/pkg/api/errors"
    11  	"k8s.io/apimachinery/pkg/types"
    12  	"sigs.k8s.io/controller-runtime/pkg/client"
    13  
    14  	v2 "edge-infra.dev/pkg/sds/display/k8s/apis/v2"
    15  )
    16  
    17  const (
    18  	annotationPatchFmt = `{"metadata":{"annotations":{"%s":"%s"}}}`
    19  	tick               = time.Millisecond * 100
    20  )
    21  
    22  // nodeDisplayConfigAnnotator provides methods for annotating
    23  // the host NodeDisplayConfig.
    24  //
    25  // Annotations can be patched immediately, or rate-limited to
    26  // avoid multiple patches to the same annotation in a short
    27  // amount of time.
    28  type nodeDisplayConfigAnnotator struct {
    29  	Hostname   string
    30  	Annotation string
    31  
    32  	Client  client.Client
    33  	ErrChan chan error
    34  
    35  	// logger used to log message on annotation
    36  	log logr.Logger
    37  	// log message to show after annotating
    38  	logMessage string
    39  
    40  	// used to cancel the running annotator thread
    41  	cancel context.CancelFunc
    42  
    43  	mu sync.Mutex
    44  }
    45  
    46  func newNodeDisplayConfigAnnotator(hostname, annotation string, c client.Client, log logr.Logger, logMessage string) *nodeDisplayConfigAnnotator {
    47  	return &nodeDisplayConfigAnnotator{
    48  		Hostname:   hostname,
    49  		Annotation: annotation,
    50  		Client:     c,
    51  		ErrChan:    make(chan error),
    52  		log:        log,
    53  		logMessage: logMessage,
    54  	}
    55  }
    56  
    57  // Patches the host's NodeDisplayConfig annotation with the value,
    58  // with rate-limiting to avoid multiple patches of the same annotation
    59  // in a short amount of time.
    60  //
    61  // Asynchronously applies the patch after the timeout is reached. If
    62  // AnnotateRateLimited is called again before the timeout is reached,
    63  // the previous call will be cancelled and the timeout started again.
    64  // If the context becomes cancelled, the NodeDisplayConfig will not
    65  // be patched.
    66  func (a *nodeDisplayConfigAnnotator) AnnotateRateLimited(ctx context.Context, value string, timeout time.Duration) {
    67  	// cancel running annotator thread
    68  	if a.cancel != nil {
    69  		a.cancel()
    70  	}
    71  
    72  	a.mu.Lock()
    73  
    74  	ctx, cancel := context.WithCancel(ctx)
    75  	a.cancel = cancel
    76  
    77  	// asynchronously annotate after timeout is reached, doing
    78  	// nothing if the context is cancelled first
    79  	go a.annotateAfterTimeoutOrCancel(ctx, value, timeout)
    80  }
    81  
    82  // Annotates the NodeDisplayConfig after the timeout is reached.
    83  // Exits without annotating if the context is canceled first.
    84  func (a *nodeDisplayConfigAnnotator) annotateAfterTimeoutOrCancel(ctx context.Context, value string, timeout time.Duration) {
    85  	defer a.mu.Unlock()
    86  
    87  	select {
    88  	case <-time.After(timeout):
    89  		if err := a.Annotate(ctx, value); err != nil {
    90  			a.ErrChan <- err
    91  		}
    92  	case <-ctx.Done():
    93  		return
    94  	}
    95  }
    96  
    97  // Patches the host's NodeDisplayConfig annotation with the value.
    98  //
    99  // Does nothing if the NodeDisplayConfig cannot be found.
   100  func (a *nodeDisplayConfigAnnotator) Annotate(ctx context.Context, value string) error {
   101  	key := client.ObjectKey{
   102  		Name: a.Hostname,
   103  	}
   104  
   105  	nodeDisplayConfig := &v2.NodeDisplayConfig{}
   106  	if err := a.Client.Get(context.Background(), key, nodeDisplayConfig); kerrors.IsNotFound(err) {
   107  		return nil
   108  	} else if err != nil {
   109  		return fmt.Errorf("unable to get %s NodeDisplayConfig: %w", a.Hostname, err)
   110  	}
   111  
   112  	patch := client.RawPatch(types.MergePatchType, []byte(fmt.Sprintf(annotationPatchFmt, a.Annotation, value)))
   113  	if err := a.Client.Patch(ctx, nodeDisplayConfig, patch); err != nil {
   114  		return fmt.Errorf("unable patch %s annotation for %s NodeDisplayConfig: %w", a.Annotation, a.Hostname, err)
   115  	}
   116  
   117  	if a.logMessage != "" {
   118  		a.log.Info(a.logMessage)
   119  	}
   120  
   121  	return nil
   122  }
   123  

View as plain text