1 package couchctl
2
import (
	"bytes"
	"context"
	"encoding/json"
	"errors"
	"fmt"
	"net/http"
	"strings"
	"time"

	dsapi "edge-infra.dev/pkg/edge/datasync/apis/v1alpha1"
	"edge-infra.dev/pkg/edge/datasync/couchdb"
	"edge-infra.dev/pkg/k8s/runtime/conditions"
	"edge-infra.dev/pkg/k8s/runtime/controller/reconcile/recerr"

	corev1 "k8s.io/api/core/v1"
	kerrors "k8s.io/apimachinery/pkg/api/errors"
	"k8s.io/apimachinery/pkg/types"
	"sigs.k8s.io/controller-runtime/pkg/client"

	"github.com/go-logr/logr"
	"github.com/google/uuid"
)
25
26 type NodeResourcePredicate interface {
27 ShouldReconcile(*Config, client.Object) bool
28 }
29
30 type NodeResourcePredicateFunc func(*Config, client.Object) bool
31
32 func (f NodeResourcePredicateFunc) ShouldReconcile(cfg *Config, obj client.Object) bool {
33 return f(cfg, obj)
34 }
35
36
37 func couchServer(ctx context.Context, cl client.Client, serverRef dsapi.ServerRef) (*dsapi.CouchDBServer, error) {
38 server := &dsapi.CouchDBServer{}
39 nn := serverRef.ServerRef()
40 err := cl.Get(ctx, types.NamespacedName{Namespace: nn.Namespace, Name: nn.Name}, server)
41 if err != nil {
42 return nil, err
43 }
44 return server, nil
45 }
46
47
48 func couchDBClient(ctx context.Context, cl client.Client, config *Config, serverRef dsapi.ServerRef) (*couchdb.CouchDB, error) {
49 server, err := couchServer(ctx, cl, serverRef)
50 if err != nil {
51 return nil, err
52 }
53 return couchDBServerClient(ctx, cl, config, server)
54 }
55
56
57 func couchDBServerClient(ctx context.Context, cl client.Client, config *Config, server *dsapi.CouchDBServer) (*couchdb.CouchDB, error) {
58
59 adminCreds := &couchdb.AdminCredentials{}
60 creds := server.AdminCredentials()
61 adminNN := types.NamespacedName{Name: creds.Name, Namespace: creds.Namespace}
62 _, err := adminCreds.FromSecret(ctx, cl, adminNN)
63 if err != nil {
64 return nil, err
65 }
66
67
68 cc := &couchdb.CouchDB{}
69 err = cc.New(couchdb.Driver, string(adminCreds.Username), string(adminCreds.Password), server.Spec.URI, config.CouchDBPort)
70 if err != nil {
71 return nil, err
72 }
73 return cc, err
74 }
75
76 func couchDBLeader(ctx context.Context, cl client.Client, config *Config) (*dsapi.CouchDBServer, error) {
77 log := logr.FromContextOrDiscard(ctx)
78 servers := &dsapi.CouchDBServerList{}
79 listOption := client.MatchingLabels{couchdb.NodeLeaderLabel: couchdb.LabelValueTrue}
80 if err := cl.List(ctx, servers, client.InNamespace(config.CouchNamespace), listOption); err != nil {
81 log.Error(err, "failed to list couchdb servers")
82 return nil, err
83 }
84 if len(servers.Items) != 1 {
85 err := fmt.Errorf("invalid couchdb server")
86 log.Error(err, "failed to list couchdb servers", "count", len(servers.Items))
87 return nil, err
88 }
89 server := servers.Items[0]
90 return &server, nil
91 }
92
93 func couchDBLeaderClient(ctx context.Context, cl client.Client, config *Config) (*couchdb.CouchDB, error) {
94 server, err := couchDBLeader(ctx, cl, config)
95 if err != nil {
96 return nil, err
97 }
98 return couchDBServerClient(ctx, cl, config, server)
99 }
100
101 func serverNotFound(err error) bool {
102 return kerrors.IsNotFound(err) || couchdb.IsNotFound(err)
103 }
104
105
106 func couchDBNotReadyOrNotFound(err error) bool {
107 e := err
108 var re recerr.Error
109 if errors.As(err, &re) {
110 e = re.Unwrap()
111 }
112 return errors.Is(e, ErrPodsNotReady) || errors.Is(e, ErrServerNotReady) || couchdb.IsBadGateway(e) || serverNotFound(e)
113 }
114
115 func logError(ctx context.Context, err error, msg string, keysAndValues ...interface{}) {
116 if couchDBNotReadyOrNotFound(err) {
117 return
118 }
119 logr.FromContextOrDiscard(ctx).Error(err, msg, keysAndValues...)
120 }
121
122
123 func checkIfServerIsReady(ctx context.Context, client client.Client, serverRef dsapi.ServerRef) (bool, *dsapi.CouchDBServer, error) {
124 server := &dsapi.CouchDBServer{}
125 nn := serverRef.ServerRef()
126 err := client.Get(ctx, types.NamespacedName{Namespace: nn.Namespace, Name: nn.Name}, server)
127 if err != nil {
128 return false, nil, err
129 }
130 return conditions.IsReady(server), server, nil
131 }
132
133 func labelOrEmpty(obj client.Object, key string) string {
134 labels := obj.GetLabels()
135 if len(labels) == 0 {
136 return ""
137 }
138 return labels[key]
139 }
140
141 func CheckPod(ctx context.Context, cl client.Client, ns string, name string) (bool, error) {
142 pod := &corev1.Pod{}
143 err := cl.Get(ctx, types.NamespacedName{Name: name, Namespace: ns}, pod)
144 if err != nil {
145 return false, err
146 }
147 return IsCondition(pod.Status.Conditions, "Ready", "True"), nil
148 }
149
150
151 func IsCondition(conditions []corev1.PodCondition, cType string, cStatus corev1.ConditionStatus) bool {
152 for _, condition := range conditions {
153 if string(condition.Type) == cType {
154 return condition.Status == cStatus
155 }
156 }
157 return false
158 }
159
160 func checkFinishStatus(url string) (bool, error) {
161
162 resp := &ServerSetupResponse{}
163 err := httpRequest("GET", url, Payload{}, &resp)
164 if err != nil {
165 return false, err
166 }
167
168
169 if resp.Error != "" {
170 return false, fmt.Errorf("error getting finished status: %s - %s", resp.Error, resp.Reason)
171 }
172
173
174 if resp.State != "cluster_finished" {
175 return false, nil
176 }
177
178
179 return true, nil
180 }
181
182 func httpRequest(verb, url string, data Payload, t interface{}) error {
183 payloadBytes, err := json.Marshal(data)
184 if err != nil {
185 return err
186 }
187 body := bytes.NewReader(payloadBytes)
188
189 req, err := http.NewRequest(verb, url, body)
190 if err != nil {
191 return err
192 }
193 req.Header.Set("Content-Type", "application/json")
194
195 resp, err := http.DefaultClient.Do(req)
196 if err != nil {
197 return err
198 }
199 defer resp.Body.Close()
200
201 return json.NewDecoder(resp.Body).Decode(t)
202 }
203
204 func getServerURL(config *Config, server *dsapi.CouchDBServer) string {
205 if server.IsCloud() {
206 return config.CloudURL()
207 }
208 return fmt.Sprintf("http://%s:%s", server.Spec.URI, config.CouchDBPort)
209 }
210
211
212
213 func getCouchDBUserURL(config *Config, server *dsapi.CouchDBServer) string {
214 if server.IsCloud() {
215 return config.CloudURL()
216 }
217 serviceName := couchdb.Name
218 if config.IsDSDS() {
219 serviceName = couchdb.ServiceName
220 }
221 return fmt.Sprintf("%s.%s.svc.cluster.local", serviceName, server.Namespace)
222 }
223
224
225 func oldPVCsSuffixes(ctx context.Context, cl client.Client) (map[string]map[string]string, error) {
226 nsSuffixes := map[string]map[string]string{}
227 for ns, name := range oldPVCs {
228 pvcs := &corev1.PersistentVolumeClaimList{}
229 if err := cl.List(ctx, pvcs, client.InNamespace(ns)); client.IgnoreNotFound(err) != nil {
230 return nil, err
231 }
232
233 if len(pvcs.Items) == 0 {
234 continue
235 }
236
237 suffixes := map[string]string{}
238 nsSuffixes[ns] = suffixes
239
240 pvcPrefix := fmt.Sprintf("%s-%s-", name, ns)
241 pvcPrefixLen := len(pvcPrefix)
242
243 for _, pvc := range pvcs.Items {
244 pvcName := pvc.Name
245
246 if !strings.HasPrefix(pvcName, pvcPrefix) || len(pvcName) <= pvcPrefixLen {
247 continue
248 }
249
250 pvName := pvc.Spec.VolumeName
251 pv := &corev1.PersistentVolume{}
252 err := cl.Get(ctx, types.NamespacedName{Name: pvName}, pv)
253 if err != nil && kerrors.IsNotFound(err) {
254 continue
255 } else if err != nil {
256 return nil, fmt.Errorf("fail to get pv for pvc %s: %w", pvName, err)
257 }
258
259 nodeName := pvNodeName(pv)
260 if nodeName == "" {
261 continue
262 }
263
264 suffix := pvcName[pvcPrefixLen:]
265 if strings.HasSuffix(suffix, "-0") && len(suffix) > 2 {
266 suffix = suffix[:len(suffix)-2]
267 }
268 suffixes[nodeName] = suffix
269 }
270 }
271 return nsSuffixes, nil
272 }
273
274 func pvNodeName(pv *corev1.PersistentVolume) string {
275 if pv.Spec.NodeAffinity == nil {
276 return ""
277 }
278 if pv.Spec.NodeAffinity.Required == nil {
279 return ""
280 }
281 var nodeName string
282 outer:
283 for _, term := range pv.Spec.NodeAffinity.Required.NodeSelectorTerms {
284 for _, ex := range term.MatchExpressions {
285 if ex.Key == "kubernetes.io/hostname" &&
286 ex.Operator == corev1.NodeSelectorOpIn &&
287 len(ex.Values) > 0 {
288 nodeName = ex.Values[0]
289 break outer
290 }
291 }
292 }
293 return nodeName
294 }
295
296 func sameNode(cfg *Config, obj client.Object) bool {
297 uid, err := nodeUID(obj)
298 if err != nil {
299 return false
300 }
301 return cfg.NodeUID == uid
302 }
303
304 func nodeUID(obj client.Object) (string, error) {
305 uid, err := nodeUIDFromLabel(obj)
306 if err == nil {
307 return uid, nil
308 }
309 return nodeUIDFromServerRef(obj.(dsapi.ServerRef))
310 }
311
312 func nodeUIDFromLabel(obj client.Object) (string, error) {
313 if obj == nil {
314 return "", fmt.Errorf("couch resource is invalid: nil")
315 }
316 labels := obj.GetLabels()
317 if labels == nil || labels[couchdb.NodeUIDLabel] == "" {
318 return "", fmt.Errorf("node uid label not found")
319 }
320 return labels[couchdb.NodeUIDLabel], nil
321 }
322
323 func nodeUIDFromServerRef(ref dsapi.ServerRef) (string, error) {
324 if ref == nil {
325 return "", fmt.Errorf("couch server ref is invalid: nil")
326 }
327 nn := ref.ServerRef()
328 uid := strings.TrimPrefix(nn.Name, "couchdb-")
329 if _, err := uuid.Parse(uid); err != nil {
330 return "", fmt.Errorf("invalid server name %s", nn.Name)
331 }
332 return uid, nil
333 }
334
View as plain text