1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 package leaser
16
17 import (
18 "context"
19 "fmt"
20 "strconv"
21 "time"
22
23 "github.com/GoogleCloudPlatform/k8s-config-connector/pkg/k8s"
24 "github.com/GoogleCloudPlatform/k8s-config-connector/pkg/krmtotf"
25 "github.com/GoogleCloudPlatform/k8s-config-connector/pkg/lease/leasable"
26 "github.com/GoogleCloudPlatform/k8s-config-connector/pkg/servicemapping/servicemappingloader"
27
28 tfschema "github.com/hashicorp/terraform-plugin-sdk/v2/helper/schema"
29 "github.com/hashicorp/terraform-plugin-sdk/v2/terraform"
30 "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
31 "sigs.k8s.io/controller-runtime/pkg/client"
32 )
33
const (
	// leaseHolderKey is the label key whose value names the current lease holder.
	leaseHolderKey = "cnrm-lease-holder-id"
	// leaseExpirationKey is the label key whose value is the lease expiration
	// time, written by setLeaseHolder as base-10 Unix seconds.
	leaseExpirationKey = "cnrm-lease-expiration"
)

var (
	// zeroUnixTime is the Unix epoch. It is returned as the expiration time
	// whenever no usable expiration label is available.
	zeroUnixTime = time.Unix(0, 0)
	// leaseKeys lists every label key used to record a lease.
	leaseKeys = []string{leaseHolderKey, leaseExpirationKey}
)
43
44
45
46
47
48
49
// Leaser obtains, releases, and inspects leases that are recorded as labels
// on a resource's live state.
type Leaser struct {
	// tfProvider supplies the provider meta (and resource map) used when
	// diffing and applying changes.
	tfProvider *tfschema.Provider
	// kubeClient is handed to the krmtotf helpers that expand configuration
	// and fetch live state.
	kubeClient client.Client
	// smLoader looks up the service mapping for a resource's API group.
	smLoader *servicemappingloader.ServiceMappingLoader
}
55
56 func NewLeaser(tfProvider *tfschema.Provider, smLoader *servicemappingloader.ServiceMappingLoader, kubeClient client.Client) *Leaser {
57 return &Leaser{
58 tfProvider: tfProvider,
59 kubeClient: kubeClient,
60 smLoader: smLoader,
61 }
62 }
63
64
65
66
67
68
69 func (l *Leaser) Obtain(ctx context.Context, u *unstructured.Unstructured, ownerId string, duration time.Duration, renewalMin time.Duration) error {
70 if err := l.validateUnstructuredSupportsLocking(u); err != nil {
71 return err
72 }
73 if renewalMin > duration {
74 return fmt.Errorf("invalid argument, renewalMin '%v' is greater than duration '%v'", renewalMin, duration)
75 }
76 resource, liveState, err := l.getResourceAndLiveState(ctx, u)
77 if err != nil {
78 return err
79 }
80 ok, err := l.softObtain(resource.Labels, ownerId, duration, renewalMin)
81 if err != nil {
82 return err
83 }
84 if !ok {
85
86 return nil
87 }
88 config, _, err := krmtotf.KRMResourceToTFResourceConfig(resource, l.kubeClient, l.smLoader)
89 if err != nil {
90 return fmt.Errorf("error expanding resource configuration: %v", err)
91 }
92 diff, err := resource.TFResource.Diff(ctx, liveState, config, l.tfProvider.Meta())
93 if err != nil {
94 return fmt.Errorf("error calculating diff: %v", err)
95 }
96 if diff.Empty() {
97 return nil
98 }
99 _, diagnostics := resource.TFResource.Apply(ctx, liveState, diff, l.tfProvider.Meta())
100
101 if err := krmtotf.NewErrorFromDiagnostics(diagnostics); err != nil {
102 return fmt.Errorf("error applying resource change: %v", err)
103 }
104 return nil
105 }
106
107
108
109
110 func (l *Leaser) SoftObtain(resource *k8s.Resource, liveLabels map[string]string, ownerId string, duration time.Duration, renewalMin time.Duration) error {
111 if _, err := l.softObtain(liveLabels, ownerId, duration, renewalMin); err != nil {
112 return err
113 }
114 if resource.Labels == nil {
115 resource.Labels = liveLabels
116 } else {
117 if val, ok := liveLabels[leaseHolderKey]; ok {
118 resource.Labels[leaseHolderKey] = val
119 }
120 if val, ok := liveLabels[leaseExpirationKey]; ok {
121 resource.Labels[leaseExpirationKey] = val
122 }
123 }
124 return nil
125 }
126
127
128
129 func (l *Leaser) softObtain(labels map[string]string, ownerId string, duration time.Duration, renewalMin time.Duration) (ok bool, err error) {
130 leaseHolder, expirationTime := getLeaseHolderAndExpirationTime(labels)
131 if !canObtainLease(ownerId, leaseHolder, expirationTime) {
132 return false, fmt.Errorf("resource is under lease by '%v' for an additional %v second(s)", leaseHolder, expirationTime.Sub(time.Now()))
133 }
134 if !shouldRenewOrObtainLease(renewalMin, expirationTime) {
135 return false, nil
136 }
137 setLeaseHolder(labels, ownerId, duration)
138 return true, nil
139 }
140
141 func (l *Leaser) Release(ctx context.Context, u *unstructured.Unstructured, ownerId string) error {
142 if err := l.validateUnstructuredSupportsLocking(u); err != nil {
143 return err
144 }
145 resource, liveState, err := l.getResourceAndLiveState(ctx, u)
146 if err != nil {
147 return err
148 }
149 leaseHolder, expirationTime := getLeaseHolderAndExpirationTime(resource.Labels)
150 if leaseHolder == "" {
151 return fmt.Errorf("resource is not under management by '%v' or any other owner", ownerId)
152 }
153 now := time.Now()
154 if leaseHolder != ownerId {
155 return fmt.Errorf("resource is under lease by '%v' for an additional %v second(s)", leaseHolder, expirationTime.Sub(now))
156 }
157 if expirationTime.Before(now) {
158 return fmt.Errorf("unable to release lease: expired %v second(s) ago", now.Sub(expirationTime))
159 }
160 delete(resource.Labels, leaseHolderKey)
161 config, _, err := krmtotf.KRMResourceToTFResourceConfig(resource, l.kubeClient, l.smLoader)
162 if err != nil {
163 return fmt.Errorf("error expanding resource configuration: %v", err)
164 }
165 diff, err := resource.TFResource.Diff(ctx, liveState, config, l.tfProvider.Meta())
166 if err != nil {
167 return fmt.Errorf("error calculating diff: %v", err)
168 }
169 _, diagnostics := resource.TFResource.Apply(ctx, liveState, diff, l.tfProvider.Meta())
170 if err := krmtotf.NewErrorFromDiagnostics(diagnostics); err != nil {
171 return fmt.Errorf("error applying resource change: %v", err)
172 }
173 return nil
174 }
175
176 func (l *Leaser) GetOwnerAndExpirationTime(ctx context.Context, u *unstructured.Unstructured) (string, time.Time, error) {
177 if err := l.validateUnstructuredSupportsLocking(u); err != nil {
178 return "", zeroUnixTime, err
179 }
180 resource, _, err := l.getResourceAndLiveState(ctx, u)
181 if err != nil {
182 return "", zeroUnixTime, err
183 }
184 leaseHolder, expirationTime := getLeaseHolderAndExpirationTime(resource.Labels)
185 return leaseHolder, expirationTime, nil
186 }
187
188 func (l *Leaser) getResourceAndLiveState(ctx context.Context, u *unstructured.Unstructured) (*krmtotf.Resource,
189 *terraform.InstanceState, error) {
190 sm, err := l.smLoader.GetServiceMapping(u.GroupVersionKind().Group)
191 if err != nil {
192 return nil, nil, fmt.Errorf("error getting service mapping for gvk '%v': %v", u.GroupVersionKind(), err)
193 }
194 resource, err := krmtotf.NewResource(u, sm, l.tfProvider)
195 if err != nil {
196 return nil, nil, fmt.Errorf("error parsing resource %s: %v", u.GetName(), err)
197 }
198 liveState, err := krmtotf.FetchLiveState(ctx, resource, l.tfProvider, l.kubeClient, l.smLoader)
199 if err != nil {
200 return nil, nil, fmt.Errorf("error fetching live state: %v", err)
201 }
202 if liveState.Empty() {
203 return nil, nil, fmt.Errorf("resource '%v' of type '%v': not found", u.GetName(), u.GroupVersionKind())
204 }
205 resource.Labels = krmtotf.GetLabelsFromState(resource, liveState)
206 resource.Annotations = krmtotf.GetAnnotationsFromState(resource, liveState)
207 resource.Spec, resource.Status = krmtotf.ResolveSpecAndStatusWithResourceID(resource, liveState)
208
209 return resource, liveState, nil
210 }
211
212
213 func GetLabelKeys() []string {
214 return leaseKeys
215 }
216
217 func (l *Leaser) UnstructuredSupportsLeasing(u *unstructured.Unstructured) (ok bool, err error) {
218 sm, err := l.smLoader.GetServiceMapping(u.GroupVersionKind().Group)
219 if err != nil {
220 return false, fmt.Errorf("error getting service mapping: %v", err)
221 }
222 rc, err := servicemappingloader.GetResourceConfig(sm, u)
223 if err != nil {
224 return false, fmt.Errorf("error getting resource config: %v", err)
225 }
226 return leasable.ResourceConfigSupportsLeasing(rc, l.tfProvider.ResourcesMap)
227 }
228
229 func (l *Leaser) validateUnstructuredSupportsLocking(u *unstructured.Unstructured) error {
230 ok, err := l.UnstructuredSupportsLeasing(u)
231 if err != nil {
232 return fmt.Errorf("error determining if gvk '%v' supports locking: %v", u.GroupVersionKind(), err)
233 }
234 if !ok {
235 return fmt.Errorf("gvk '%v' does not support locking", u.GroupVersionKind())
236 }
237 return nil
238 }
239
// shouldRenewOrObtainLease reports whether the time left until expirationTime
// is shorter than minDuration, i.e. whether the lease needs to be written.
func shouldRenewOrObtainLease(minDuration time.Duration, expirationTime time.Time) bool {
	remaining := time.Until(expirationTime)
	return remaining < minDuration
}
243
// canObtainLease reports whether ownerId may take the lease: always when
// there is no current holder or ownerId already holds it, otherwise only once
// expirationTime has passed.
func canObtainLease(ownerId string, curLeaseHolder string, expirationTime time.Time) bool {
	switch curLeaseHolder {
	case "", ownerId:
		return true
	default:
		return time.Now().After(expirationTime)
	}
}
250
251 func setLeaseHolder(labels map[string]string, ownerId string, duration time.Duration) {
252 labels[leaseHolderKey] = ownerId
253 labels[leaseExpirationKey] = strconv.FormatInt(time.Now().Add(duration).Unix(), 10)
254 }
255
256 func getLeaseHolderAndExpirationTime(labels map[string]string) (string, time.Time) {
257 leaseHolder, ok := labels[leaseHolderKey]
258 if !ok {
259 return "", zeroUnixTime
260 }
261 leaseExpirationString, ok := labels[leaseExpirationKey]
262 if !ok {
263 return leaseHolder, zeroUnixTime
264 }
265 unixTime, err := strconv.ParseInt(leaseExpirationString, 10, 64)
266 if err != nil {
267 return leaseHolder, zeroUnixTime
268 }
269 return leaseHolder, time.Unix(unixTime, 0)
270 }
271
View as plain text