1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package client
19
20 import (
21 "bytes"
22 "context"
23 "crypto/tls"
24 "crypto/x509"
25 "encoding/json"
26 "errors"
27 "fmt"
28 "io"
29 "log"
30 "net"
31 "net/http"
32 "net/url"
33 "os"
34 "strings"
35 "sync"
36 "time"
37
38 "github.com/donovanhide/eventsource"
39 "golang.org/x/time/rate"
40 "moul.io/http2curl/v2"
41 )
42
43
44 type Client struct {
45 httpClient *http.Client
46 baseURL *url.URL
47 basicAuth *BasicAuthCfg
48 bearerToken string
49 userAgent string
50 controllers []*url.URL
51 lim *rate.Limiter
52 log interface{}
53
54 Nodes NodeProvider
55 ResourceDefinitions ResourceDefinitionProvider
56 Resources ResourceProvider
57 ResourceGroups ResourceGroupProvider
58 StoragePoolDefinitions StoragePoolDefinitionProvider
59 Encryption EncryptionProvider
60 Controller ControllerProvider
61 Events EventProvider
62 Vendor VendorProvider
63 Remote RemoteProvider
64 Backup BackupProvider
65 KeyValueStore KeyValueStoreProvider
66 Connections ConnectionProvider
67 }
68
69
70 type Logger interface {
71 Printf(string, ...interface{})
72 }
73
74
75 type LeveledLogger interface {
76 Errorf(string, ...interface{})
77 Infof(string, ...interface{})
78 Debugf(string, ...interface{})
79 Warnf(string, ...interface{})
80 }
81
82 type BasicAuthCfg struct {
83 Username, Password string
84 }
85
86
87 type clientError string
88
89 func (e clientError) Error() string { return string(e) }
90
91 const (
92
93 NotFoundError = clientError("404 Not Found")
94
95 UserCertEnv = "LS_USER_CERTIFICATE"
96
97 UserKeyEnv = "LS_USER_KEY"
98
99 RootCAEnv = "LS_ROOT_CA"
100
101 ControllerUrlEnv = "LS_CONTROLLERS"
102
103 UsernameEnv = "LS_USERNAME"
104
105 PasswordEnv = "LS_PASSWORD"
106
107 BearerTokenFileEnv = "LS_BEARER_TOKEN_FILE"
108 )
109
110
111
112
113
114
115 type Option func(*Client) error
116
117
118 func BaseURL(URL *url.URL) Option {
119 return func(c *Client) error {
120 c.baseURL = URL
121 return nil
122 }
123 }
124
125
126 func BasicAuth(basicauth *BasicAuthCfg) Option {
127 return func(c *Client) error {
128 c.basicAuth = basicauth
129 return nil
130 }
131 }
132
133
134 func HTTPClient(httpClient *http.Client) Option {
135 return func(c *Client) error {
136 c.httpClient = httpClient
137 return nil
138 }
139 }
140
141
142 func Log(logger interface{}) Option {
143 return func(c *Client) error {
144 switch logger.(type) {
145 case Logger, LeveledLogger, nil:
146 c.log = logger
147 default:
148 return errors.New("Invalid logger type, expected Logger or LeveledLogger")
149 }
150 return nil
151 }
152 }
153
154
155
156 func Limiter(limiter *rate.Limiter) Option {
157 return func(c *Client) error {
158 if limiter.Burst() == 0 && limiter.Limit() != rate.Inf {
159 return fmt.Errorf("invalid rate limit, burst must not be zero for non-unlimited rates")
160 }
161 c.lim = limiter
162 return nil
163 }
164 }
165
166
167
168
169
170 func Limit(r rate.Limit, b int) Option {
171 return Limiter(rate.NewLimiter(r, b))
172 }
173
174 func Controllers(controllers []string) Option {
175 return func(c *Client) error {
176 var err error
177 c.controllers, err = parseURLs(controllers)
178 return err
179 }
180 }
181
182
183
184 func BearerToken(token string) Option {
185 return func(c *Client) error {
186 c.bearerToken = token
187 return nil
188 }
189 }
190
191
192 func UserAgent(ua string) Option {
193 return func(c *Client) error {
194 c.userAgent = ua
195 return nil
196 }
197 }
198
199
200
201
202
203
204
205 func buildHttpClient() (*http.Client, error) {
206 certPEM, cert := os.LookupEnv(UserCertEnv)
207 keyPEM, key := os.LookupEnv(UserKeyEnv)
208 caPEM, ca := os.LookupEnv(RootCAEnv)
209
210 if key != cert {
211 return nil, fmt.Errorf("'%s', '%s': specify both or none", UserKeyEnv, UserCertEnv)
212 }
213
214 if !cert && !key && !ca {
215
216 return http.DefaultClient, nil
217 }
218
219 tlsConfig := &tls.Config{}
220
221 if ca {
222 caPool := x509.NewCertPool()
223 ok := caPool.AppendCertsFromPEM([]byte(caPEM))
224 if !ok {
225 return nil, fmt.Errorf("failed to get a valid certificate from '%s'", RootCAEnv)
226 }
227 tlsConfig.RootCAs = caPool
228 }
229
230 if key && cert {
231 keyPair, err := tls.X509KeyPair([]byte(certPEM), []byte(keyPEM))
232 if err != nil {
233 return nil, fmt.Errorf("failed to load keys: %w", err)
234 }
235 tlsConfig.Certificates = append(tlsConfig.Certificates, keyPair)
236 }
237
238 return &http.Client{
239 Transport: &http.Transport{
240 TLSClientConfig: tlsConfig,
241 },
242 }, nil
243 }
244
245
246
247
248 func defaultScheme() string {
249 _, ca := os.LookupEnv(RootCAEnv)
250 _, cert := os.LookupEnv(UserCertEnv)
251 _, key := os.LookupEnv(UserKeyEnv)
252 if ca || cert || key {
253 return "https"
254 }
255 return "http"
256 }
257
258 const defaultHost = "localhost"
259
260
261
262
263
264 func defaultPort(scheme string) string {
265 if scheme == "https" {
266 return "3371"
267 }
268 return "3370"
269 }
270
271
272
273
274 func tryConnect(urls []*url.URL) (*url.URL, []error) {
275 var wg sync.WaitGroup
276 ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
277 defer cancel()
278 errChan := make(chan error)
279 indexChan := make(chan int)
280 doneChan := make(chan bool)
281 wg.Add(len(urls))
282 for i := range urls {
283 i := i
284 go func() {
285 defer wg.Done()
286 conn, err := (&net.Dialer{}).DialContext(ctx, "tcp", urls[i].Host)
287 if err != nil {
288 errChan <- err
289 return
290 }
291 cancel()
292 conn.Close()
293 indexChan <- i
294 }()
295 }
296
297 go func() {
298 wg.Wait()
299 doneChan <- true
300 }()
301
302 var errs []error
303 for {
304 select {
305 case result := <-indexChan:
306 return urls[result], nil
307 case err := <-errChan:
308 errs = append(errs, err)
309 case <-doneChan:
310 return nil, errs
311 }
312 }
313 }
314
315 func parseBaseURL(urlString string) (*url.URL, error) {
316
317 urlSplit := strings.Split(urlString, "://")
318
319 if len(urlSplit) == 1 {
320 if urlSplit[0] == "" {
321 urlSplit[0] = defaultHost
322 }
323 urlSplit = []string{defaultScheme(), urlSplit[0]}
324 }
325
326 if len(urlSplit) != 2 {
327 return nil, fmt.Errorf("URL with multiple scheme separators. parts: %v", urlSplit)
328 }
329 scheme, endpoint := urlSplit[0], urlSplit[1]
330 switch scheme {
331 case "linstor":
332 scheme = defaultScheme()
333 case "linstor+ssl":
334 scheme = "https"
335 }
336
337
338 endpointSplit := strings.Split(endpoint, ":")
339 if len(endpointSplit) == 1 {
340 endpointSplit = []string{endpointSplit[0], defaultPort(scheme)}
341 }
342 if len(endpointSplit) != 2 {
343 return nil, fmt.Errorf("URL with multiple port separators. parts: %v", endpointSplit)
344 }
345 host, port := endpointSplit[0], endpointSplit[1]
346
347 return url.Parse(fmt.Sprintf("%s://%s:%s", scheme, host, port))
348 }
349
350 func parseURLs(urls []string) ([]*url.URL, error) {
351 var result []*url.URL
352 for _, controller := range urls {
353 url, err := parseBaseURL(controller)
354 if err != nil {
355 return nil, err
356 }
357 result = append(result, url)
358 }
359
360 return result, nil
361 }
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380 func NewClient(options ...Option) (*Client, error) {
381 httpClient, err := buildHttpClient()
382 if err != nil {
383 return nil, fmt.Errorf("failed to build http client: %w", err)
384 }
385
386 c := &Client{
387 httpClient: httpClient,
388 basicAuth: &BasicAuthCfg{
389 Username: os.Getenv(UsernameEnv),
390 Password: os.Getenv(PasswordEnv),
391 },
392 lim: rate.NewLimiter(rate.Inf, 0),
393 log: log.New(os.Stderr, "", 0),
394 }
395
396 c.Nodes = &NodeService{client: c}
397 c.ResourceDefinitions = &ResourceDefinitionService{client: c}
398 c.Resources = &ResourceService{client: c}
399 c.Encryption = &EncryptionService{client: c}
400 c.ResourceGroups = &ResourceGroupService{client: c}
401 c.StoragePoolDefinitions = &StoragePoolDefinitionService{client: c}
402 c.Controller = &ControllerService{client: c}
403 c.Events = &EventService{client: c}
404 c.Vendor = &VendorService{client: c}
405 c.Remote = &RemoteService{client: c}
406 c.Backup = &BackupService{client: c}
407 c.KeyValueStore = &KeyValueStoreService{client: c}
408 c.Connections = &ConnectionService{client: c}
409
410 if path, ok := os.LookupEnv(BearerTokenFileEnv); ok {
411 token, err := os.ReadFile(path)
412 if err != nil {
413 return nil, fmt.Errorf("failed to read token from file: %w", err)
414 }
415
416 c.bearerToken = string(token)
417 }
418
419 for _, opt := range options {
420 if err := opt(c); err != nil {
421 return nil, err
422 }
423 }
424
425 if c.baseURL == nil {
426 if len(c.controllers) == 0 {
427
428 controllersStr := os.Getenv(ControllerUrlEnv)
429 if controllersStr == "" {
430
431 controllersStr = fmt.Sprintf("%v://%v:%v", defaultScheme(), defaultHost, defaultPort(defaultScheme()))
432 }
433
434 c.controllers, err = parseURLs(strings.Split(controllersStr, ","))
435 if err != nil {
436 return nil, fmt.Errorf("failed to parse controller URLs: %w", err)
437 }
438 }
439
440
441
442 if len(c.controllers) == 1 {
443 c.baseURL = c.controllers[0]
444 }
445 }
446
447 return c, nil
448 }
449
450 func (c *Client) BaseURL() *url.URL {
451 return c.baseURL
452 }
453
454 func (c *Client) newRequest(method, path string, body interface{}) (*http.Request, error) {
455 rel, err := url.Parse(path)
456 if err != nil {
457 return nil, err
458 }
459
460 if c.baseURL == nil {
461 if err := c.findRespondingController(); err != nil {
462 return nil, fmt.Errorf("failed to connect: %w", err)
463 }
464 if c.baseURL == nil {
465
466
467 return nil, fmt.Errorf("failed to determine base URL")
468 }
469 }
470 u := c.baseURL.ResolveReference(rel)
471
472 var buf io.ReadWriter
473 if body != nil {
474 buf = new(bytes.Buffer)
475 err := json.NewEncoder(buf).Encode(body)
476 if err != nil {
477 return nil, err
478 }
479 switch l := c.log.(type) {
480 case LeveledLogger:
481 l.Debugf("%s", buf)
482 case Logger:
483 l.Printf("[DEBUG] %s", body)
484 }
485 }
486
487 req, err := http.NewRequest(method, u.String(), buf)
488 if err != nil {
489 return nil, err
490 }
491
492 if body != nil {
493 req.Header.Set("Content-Type", "application/json")
494 }
495
496 if c.userAgent != "" {
497 req.Header.Set("User-Agent", c.userAgent)
498 }
499
500 req.Header.Set("Accept", "application/json")
501
502 username := c.basicAuth.Username
503 if username != "" {
504 req.SetBasicAuth(username, c.basicAuth.Password)
505 }
506
507 if c.bearerToken != "" {
508 req.Header.Set("Authorization", "Bearer "+c.bearerToken)
509 }
510
511 return req, nil
512 }
513
514 func (c *Client) curlify(req *http.Request) (string, error) {
515 cc, err := http2curl.GetCurlCommand(req)
516 if err != nil {
517 return "", err
518 }
519 return cc.String(), nil
520 }
521
522
523
524
525
526 func (c *Client) findRespondingController() error {
527 switch num := len(c.controllers); {
528 case num > 1:
529 url, errors := tryConnect(c.controllers)
530 if errors != nil {
531 logError := func(msg string) {
532 switch l := c.log.(type) {
533 case LeveledLogger:
534 l.Errorf(msg)
535 case Logger:
536 l.Printf("[ERROR] %s", msg)
537 }
538 }
539 logError("Unable to connect to any of the given controller hosts:")
540 for _, e := range errors {
541 logError(fmt.Sprintf(" - %v", e))
542 }
543 return fmt.Errorf("could not connect to any controller")
544 }
545 c.baseURL = url
546 case num == 1:
547 c.baseURL = c.controllers[0]
548 default:
549 return fmt.Errorf("no controller to connect to")
550 }
551
552 return nil
553 }
554
555 func (c *Client) logCurlify(req *http.Request) {
556 var msg string
557 if curl, err := c.curlify(req); err != nil {
558 msg = err.Error()
559 } else {
560 msg = curl
561 }
562
563 switch l := c.log.(type) {
564 case LeveledLogger:
565 l.Debugf("%s", msg)
566 case Logger:
567 l.Printf("[DEBUG] %s", msg)
568 }
569 }
570
571 func (c *Client) retry(origErr error, req *http.Request) (*http.Response, error) {
572
573 if _, ok := origErr.(net.Error); !ok || len(c.controllers) <= 1 {
574 return nil, origErr
575 }
576
577 prevBaseURL := c.baseURL
578 e := c.findRespondingController()
579
580 if e != nil && c.baseURL == prevBaseURL {
581 return nil, origErr
582 }
583
584 req.URL.Host = c.baseURL.Host
585 return c.httpClient.Do(req)
586 }
587
588 func (c *Client) do(ctx context.Context, req *http.Request, v interface{}) (*http.Response, error) {
589 if err := c.lim.Wait(ctx); err != nil {
590 return nil, err
591 }
592 req = req.WithContext(ctx)
593
594 c.logCurlify(req)
595
596 resp, err := c.httpClient.Do(req)
597 if err != nil {
598 select {
599 case <-ctx.Done():
600 return nil, ctx.Err()
601 default:
602 }
603
604
605 resp, err = c.retry(err, req)
606 if err != nil {
607 return nil, err
608 }
609 }
610 defer resp.Body.Close()
611
612 if resp.StatusCode < 200 || resp.StatusCode >= 400 {
613 msg := fmt.Sprintf("Status code not within 200 to 400, but %d (%s)\n",
614 resp.StatusCode, http.StatusText(resp.StatusCode))
615 switch l := c.log.(type) {
616 case LeveledLogger:
617 l.Debugf("%s", msg)
618 case Logger:
619 l.Printf("[DEBUG] %s", msg)
620 }
621 if resp.StatusCode == 404 {
622 return nil, NotFoundError
623 }
624
625 var rets ApiCallError
626 if err = json.NewDecoder(resp.Body).Decode(&rets); err != nil {
627 return nil, err
628 }
629 return nil, rets
630 }
631
632 if v != nil {
633 err = json.NewDecoder(resp.Body).Decode(v)
634 }
635 return resp, err
636 }
637
638
639
640 func (c *Client) doGET(ctx context.Context, url string, ret interface{}, opts ...*ListOpts) (*http.Response, error) {
641
642 u, err := addOptions(url, genOptions(opts...))
643 if err != nil {
644 return nil, err
645 }
646
647 req, err := c.newRequest("GET", u, nil)
648 if err != nil {
649 return nil, err
650 }
651 return c.do(ctx, req, ret)
652 }
653
654 func (c *Client) doEvent(ctx context.Context, url, lastEventId string) (*eventsource.Stream, error) {
655 req, err := c.newRequest("GET", url, nil)
656 if err != nil {
657 return nil, err
658 }
659 req.Header.Set("Accept", "text/event-stream")
660 req = req.WithContext(ctx)
661
662 stream, err := eventsource.SubscribeWith(lastEventId, c.httpClient, req)
663 if err != nil {
664 return nil, err
665 }
666
667 return stream, nil
668 }
669
670 func (c *Client) doPOST(ctx context.Context, url string, body interface{}) (*http.Response, error) {
671 req, err := c.newRequest("POST", url, body)
672 if err != nil {
673 return nil, err
674 }
675
676 return c.do(ctx, req, nil)
677 }
678
679 func (c *Client) doPUT(ctx context.Context, url string, body interface{}) (*http.Response, error) {
680 req, err := c.newRequest("PUT", url, body)
681 if err != nil {
682 return nil, err
683 }
684
685 return c.do(ctx, req, nil)
686 }
687
688 func (c *Client) doPATCH(ctx context.Context, url string, body interface{}) (*http.Response, error) {
689 req, err := c.newRequest("PATCH", url, body)
690 if err != nil {
691 return nil, err
692 }
693
694 return c.do(ctx, req, nil)
695 }
696
697 func (c *Client) doDELETE(ctx context.Context, url string, body interface{}) (*http.Response, error) {
698 req, err := c.newRequest("DELETE", url, body)
699 if err != nil {
700 return nil, err
701 }
702
703 return c.do(ctx, req, nil)
704 }
705
706 func (c *Client) doOPTIONS(ctx context.Context, url string, ret interface{}, body interface{}) (*http.Response, error) {
707 req, err := c.newRequest("OPTIONS", url, body)
708 if err != nil {
709 return nil, err
710 }
711
712 return c.do(ctx, req, ret)
713 }
714
715
716 type ApiCallRc struct {
717
718 RetCode int64 `json:"ret_code"`
719 Message string `json:"message"`
720
721 Cause string `json:"cause,omitempty"`
722
723 Details string `json:"details,omitempty"`
724
725 Correction string `json:"correction,omitempty"`
726
727 ErrorReportIds []string `json:"error_report_ids,omitempty"`
728
729 ObjRefs map[string]string `json:"obj_refs,omitempty"`
730 }
731
732 func (rc *ApiCallRc) String() string {
733 s := fmt.Sprintf("Message: '%s'", rc.Message)
734 if rc.Cause != "" {
735 s += fmt.Sprintf("; Cause: '%s'", rc.Cause)
736 }
737 if rc.Details != "" {
738 s += fmt.Sprintf("; Details: '%s'", rc.Details)
739 }
740 if rc.Correction != "" {
741 s += fmt.Sprintf("; Correction: '%s'", rc.Correction)
742 }
743 if len(rc.ErrorReportIds) > 0 {
744 s += fmt.Sprintf("; Reports: '[%s]'", strings.Join(rc.ErrorReportIds, ","))
745 }
746
747 return s
748 }
749
750
751 type DeleteProps []string
752
753
754 type OverrideProps map[string]string
755
756
757 type DeleteNamespaces []string
758
759
760 type GenericPropsModify struct {
761 DeleteProps DeleteProps `json:"delete_props,omitempty"`
762 OverrideProps OverrideProps `json:"override_props,omitempty"`
763 DeleteNamespaces DeleteNamespaces `json:"delete_namespaces,omitempty"`
764 }
765
View as plain text