...
1 package displayctl
2
3 import (
4 "context"
5 "fmt"
6 "sync"
7 "time"
8
9 "github.com/go-logr/logr"
10 kerrors "k8s.io/apimachinery/pkg/api/errors"
11 "k8s.io/apimachinery/pkg/types"
12 "sigs.k8s.io/controller-runtime/pkg/client"
13
14 v2 "edge-infra.dev/pkg/sds/display/k8s/apis/v2"
15 )
16
17 const (
18 annotationPatchFmt = `{"metadata":{"annotations":{"%s":"%s"}}}`
19 tick = time.Millisecond * 100
20 )
21
22
23
24
25
26
27
28 type nodeDisplayConfigAnnotator struct {
29 Hostname string
30 Annotation string
31
32 Client client.Client
33 ErrChan chan error
34
35
36 log logr.Logger
37
38 logMessage string
39
40
41 cancel context.CancelFunc
42
43 mu sync.Mutex
44 }
45
46 func newNodeDisplayConfigAnnotator(hostname, annotation string, c client.Client, log logr.Logger, logMessage string) *nodeDisplayConfigAnnotator {
47 return &nodeDisplayConfigAnnotator{
48 Hostname: hostname,
49 Annotation: annotation,
50 Client: c,
51 ErrChan: make(chan error),
52 log: log,
53 logMessage: logMessage,
54 }
55 }
56
57
58
59
60
61
62
63
64
65
66 func (a *nodeDisplayConfigAnnotator) AnnotateRateLimited(ctx context.Context, value string, timeout time.Duration) {
67
68 if a.cancel != nil {
69 a.cancel()
70 }
71
72 a.mu.Lock()
73
74 ctx, cancel := context.WithCancel(ctx)
75 a.cancel = cancel
76
77
78
79 go a.annotateAfterTimeoutOrCancel(ctx, value, timeout)
80 }
81
82
83
84 func (a *nodeDisplayConfigAnnotator) annotateAfterTimeoutOrCancel(ctx context.Context, value string, timeout time.Duration) {
85 defer a.mu.Unlock()
86
87 select {
88 case <-time.After(timeout):
89 if err := a.Annotate(ctx, value); err != nil {
90 a.ErrChan <- err
91 }
92 case <-ctx.Done():
93 return
94 }
95 }
96
97
98
99
100 func (a *nodeDisplayConfigAnnotator) Annotate(ctx context.Context, value string) error {
101 key := client.ObjectKey{
102 Name: a.Hostname,
103 }
104
105 nodeDisplayConfig := &v2.NodeDisplayConfig{}
106 if err := a.Client.Get(context.Background(), key, nodeDisplayConfig); kerrors.IsNotFound(err) {
107 return nil
108 } else if err != nil {
109 return fmt.Errorf("unable to get %s NodeDisplayConfig: %w", a.Hostname, err)
110 }
111
112 patch := client.RawPatch(types.MergePatchType, []byte(fmt.Sprintf(annotationPatchFmt, a.Annotation, value)))
113 if err := a.Client.Patch(ctx, nodeDisplayConfig, patch); err != nil {
114 return fmt.Errorf("unable patch %s annotation for %s NodeDisplayConfig: %w", a.Annotation, a.Hostname, err)
115 }
116
117 if a.logMessage != "" {
118 a.log.Info(a.logMessage)
119 }
120
121 return nil
122 }
123
View as plain text