...

Source file src/github.com/emissary-ingress/emissary/v3/pkg/consulwatch/connectwatcher.go

Documentation: github.com/emissary-ingress/emissary/v3/pkg/consulwatch

     1  package consulwatch
     2  
     3  import (
     4  	"context"
     5  	"errors"
     6  	"fmt"
     7  
     8  	"github.com/hashicorp/consul/api"
     9  	"github.com/hashicorp/consul/api/watch"
    10  
    11  	"github.com/datawire/dlib/dlog"
    12  )
    13  
    14  type ConnectLeafWatcher struct {
    15  	consul *api.Client
    16  	plan   *watch.Plan
    17  }
    18  
    19  func NewConnectLeafWatcher(consul *api.Client, service string) (*ConnectLeafWatcher, error) {
    20  	if service == "" {
    21  		err := errors.New("service name is empty")
    22  		return nil, err
    23  	}
    24  
    25  	watcher := &ConnectLeafWatcher{consul: consul}
    26  
    27  	plan, err := watch.Parse(map[string]interface{}{"type": "connect_leaf", "service": service})
    28  	if err != nil {
    29  		return nil, err
    30  	}
    31  
    32  	watcher.plan = plan
    33  
    34  	return watcher, nil
    35  }
    36  
    37  func (w *ConnectLeafWatcher) Watch(handler func(*Certificate, error)) {
    38  	w.plan.HybridHandler = func(val watch.BlockingParamVal, raw interface{}) {
    39  		if raw == nil {
    40  			handler(nil, fmt.Errorf("unexpected empty/nil response from consul"))
    41  			return
    42  		}
    43  
    44  		v, ok := raw.(*api.LeafCert)
    45  		if !ok {
    46  			handler(nil, fmt.Errorf("unexpected raw type. expected: %T, was: %T", &api.LeafCert{}, raw))
    47  			return
    48  		}
    49  
    50  		certificate := &Certificate{
    51  			PEM:           v.CertPEM,
    52  			PrivateKeyPEM: v.PrivateKeyPEM,
    53  			ValidBefore:   v.ValidBefore,
    54  			ValidAfter:    v.ValidAfter,
    55  			SerialNumber:  v.SerialNumber,
    56  			Service:       v.Service,
    57  			ServiceURI:    v.ServiceURI,
    58  		}
    59  
    60  		handler(certificate, nil)
    61  	}
    62  }
    63  
    64  func (w *ConnectLeafWatcher) Start(ctx context.Context) error {
    65  	return w.plan.RunWithClientAndLogger(w.consul, dlog.StdLogger(ctx, dlog.LogLevelInfo))
    66  }
    67  
    68  func (w *ConnectLeafWatcher) Stop() {
    69  	w.plan.Stop()
    70  }
    71  
    72  // ConnectCARootsWatcher watches the Consul Connect CA roots endpoint for changes and invokes a a handler function
    73  // whenever it changes.
    74  type ConnectCARootsWatcher struct {
    75  	consul *api.Client
    76  	plan   *watch.Plan
    77  }
    78  
    79  func NewConnectCARootsWatcher(consul *api.Client) (*ConnectCARootsWatcher, error) {
    80  	watcher := &ConnectCARootsWatcher{consul: consul}
    81  
    82  	plan, err := watch.Parse(map[string]interface{}{"type": "connect_roots"})
    83  	if err != nil {
    84  		return nil, err
    85  	}
    86  
    87  	watcher.plan = plan
    88  
    89  	return watcher, nil
    90  }
    91  
    92  func (w *ConnectCARootsWatcher) Watch(handler func(*CARoots, error)) {
    93  	w.plan.HybridHandler = func(val watch.BlockingParamVal, raw interface{}) {
    94  		if raw == nil {
    95  			handler(nil, fmt.Errorf("unexpected empty/nil response from consul"))
    96  			return
    97  		}
    98  
    99  		v, ok := raw.(*api.CARootList)
   100  		if !ok {
   101  			handler(nil, fmt.Errorf("unexpected raw type. expected: %T, was: %T", &api.CARootList{}, raw))
   102  			return
   103  		}
   104  
   105  		rootsMap := make(map[string]CARoot)
   106  		for _, root := range v.Roots {
   107  			rootsMap[root.ID] = CARoot{
   108  				ID:     root.ID,
   109  				Name:   root.Name,
   110  				PEM:    root.RootCertPEM,
   111  				Active: root.Active,
   112  			}
   113  		}
   114  
   115  		roots := &CARoots{
   116  			ActiveRootID: v.ActiveRootID,
   117  			TrustDomain:  v.TrustDomain,
   118  			Roots:        rootsMap,
   119  		}
   120  
   121  		handler(roots, nil)
   122  	}
   123  }
   124  
   125  func (w *ConnectCARootsWatcher) Start(ctx context.Context) error {
   126  	return w.plan.RunWithClientAndLogger(w.consul, dlog.StdLogger(ctx, dlog.LogLevelInfo))
   127  }
   128  
   129  func (w *ConnectCARootsWatcher) Stop() {
   130  	w.plan.Stop()
   131  }
   132  

View as plain text