...

Source file src/go.etcd.io/etcd/client/v3/naming/endpoints/endpoints_impl.go

Documentation: go.etcd.io/etcd/client/v3/naming/endpoints

     1  package endpoints
     2  
     3  // TODO: The API is not yet implemented.
     4  
     5  import (
     6  	"context"
     7  	"encoding/json"
     8  	"errors"
     9  	"strings"
    10  
    11  	clientv3 "go.etcd.io/etcd/client/v3"
    12  	"go.etcd.io/etcd/client/v3/naming/endpoints/internal"
    13  
    14  	"go.uber.org/zap"
    15  	"google.golang.org/grpc/codes"
    16  	"google.golang.org/grpc/status"
    17  )
    18  
// endpointManager is the concrete type that NewManager returns as a Manager.
// It keeps the etcd client and the target under which endpoint keys are stored.
type endpointManager struct {
	// client is the etcd client used for all reads, transactions and watches.
	client *clientv3.Client
	// target is the key namespace; endpoint keys are expected under target + "/".
	target string
}
    24  
    25  // NewManager creates an endpoint manager which implements the interface of 'Manager'.
    26  func NewManager(client *clientv3.Client, target string) (Manager, error) {
    27  	if client == nil {
    28  		return nil, errors.New("invalid etcd client")
    29  	}
    30  
    31  	if target == "" {
    32  		return nil, errors.New("invalid target")
    33  	}
    34  
    35  	em := &endpointManager{
    36  		client: client,
    37  		target: target,
    38  	}
    39  	return em, nil
    40  }
    41  
    42  func (m *endpointManager) Update(ctx context.Context, updates []*UpdateWithOpts) (err error) {
    43  	ops := make([]clientv3.Op, 0, len(updates))
    44  	for _, update := range updates {
    45  		if !strings.HasPrefix(update.Key, m.target+"/") {
    46  			return status.Errorf(codes.InvalidArgument, "endpoints: endpoint key should be prefixed with '%s/' got: '%s'", m.target, update.Key)
    47  		}
    48  
    49  		switch update.Op {
    50  		case Add:
    51  			internalUpdate := &internal.Update{
    52  				Op:       internal.Add,
    53  				Addr:     update.Endpoint.Addr,
    54  				Metadata: update.Endpoint.Metadata,
    55  			}
    56  
    57  			var v []byte
    58  			if v, err = json.Marshal(internalUpdate); err != nil {
    59  				return status.Error(codes.InvalidArgument, err.Error())
    60  			}
    61  			ops = append(ops, clientv3.OpPut(update.Key, string(v), update.Opts...))
    62  		case Delete:
    63  			ops = append(ops, clientv3.OpDelete(update.Key, update.Opts...))
    64  		default:
    65  			return status.Error(codes.InvalidArgument, "endpoints: bad update op")
    66  		}
    67  	}
    68  	_, err = m.client.KV.Txn(ctx).Then(ops...).Commit()
    69  	return err
    70  }
    71  
    72  func (m *endpointManager) AddEndpoint(ctx context.Context, key string, endpoint Endpoint, opts ...clientv3.OpOption) error {
    73  	return m.Update(ctx, []*UpdateWithOpts{NewAddUpdateOpts(key, endpoint, opts...)})
    74  }
    75  
    76  func (m *endpointManager) DeleteEndpoint(ctx context.Context, key string, opts ...clientv3.OpOption) error {
    77  	return m.Update(ctx, []*UpdateWithOpts{NewDeleteUpdateOpts(key, opts...)})
    78  }
    79  
    80  func (m *endpointManager) NewWatchChannel(ctx context.Context) (WatchChannel, error) {
    81  	key := m.target + "/"
    82  	resp, err := m.client.Get(ctx, key, clientv3.WithPrefix(), clientv3.WithSerializable())
    83  	if err != nil {
    84  		return nil, err
    85  	}
    86  
    87  	lg := m.client.GetLogger()
    88  	initUpdates := make([]*Update, 0, len(resp.Kvs))
    89  	for _, kv := range resp.Kvs {
    90  		var iup internal.Update
    91  		if err := json.Unmarshal(kv.Value, &iup); err != nil {
    92  			lg.Warn("unmarshal endpoint update failed", zap.String("key", string(kv.Key)), zap.Error(err))
    93  			continue
    94  		}
    95  		up := &Update{
    96  			Op:       Add,
    97  			Key:      string(kv.Key),
    98  			Endpoint: Endpoint{Addr: iup.Addr, Metadata: iup.Metadata},
    99  		}
   100  		initUpdates = append(initUpdates, up)
   101  	}
   102  
   103  	upch := make(chan []*Update, 1)
   104  	if len(initUpdates) > 0 {
   105  		upch <- initUpdates
   106  	}
   107  	go m.watch(ctx, resp.Header.Revision+1, upch)
   108  	return upch, nil
   109  }
   110  
   111  func (m *endpointManager) watch(ctx context.Context, rev int64, upch chan []*Update) {
   112  	defer close(upch)
   113  
   114  	lg := m.client.GetLogger()
   115  	opts := []clientv3.OpOption{clientv3.WithRev(rev), clientv3.WithPrefix()}
   116  	key := m.target + "/"
   117  	wch := m.client.Watch(ctx, key, opts...)
   118  	for {
   119  		select {
   120  		case <-ctx.Done():
   121  			return
   122  		case wresp, ok := <-wch:
   123  			if !ok {
   124  				lg.Warn("watch closed", zap.String("target", m.target))
   125  				return
   126  			}
   127  			if wresp.Err() != nil {
   128  				lg.Warn("watch failed", zap.String("target", m.target), zap.Error(wresp.Err()))
   129  				return
   130  			}
   131  
   132  			deltaUps := make([]*Update, 0, len(wresp.Events))
   133  			for _, e := range wresp.Events {
   134  				var iup internal.Update
   135  				var err error
   136  				var op Operation
   137  				switch e.Type {
   138  				case clientv3.EventTypePut:
   139  					err = json.Unmarshal(e.Kv.Value, &iup)
   140  					op = Add
   141  					if err != nil {
   142  						lg.Warn("unmarshal endpoint update failed", zap.String("key", string(e.Kv.Key)), zap.Error(err))
   143  						continue
   144  					}
   145  				case clientv3.EventTypeDelete:
   146  					iup = internal.Update{Op: internal.Delete}
   147  					op = Delete
   148  				default:
   149  					continue
   150  				}
   151  				up := &Update{Op: op, Key: string(e.Kv.Key), Endpoint: Endpoint{Addr: iup.Addr, Metadata: iup.Metadata}}
   152  				deltaUps = append(deltaUps, up)
   153  			}
   154  			if len(deltaUps) > 0 {
   155  				upch <- deltaUps
   156  			}
   157  		}
   158  	}
   159  }
   160  
   161  func (m *endpointManager) List(ctx context.Context) (Key2EndpointMap, error) {
   162  	key := m.target + "/"
   163  	resp, err := m.client.Get(ctx, key, clientv3.WithPrefix(), clientv3.WithSerializable())
   164  	if err != nil {
   165  		return nil, err
   166  	}
   167  
   168  	eps := make(Key2EndpointMap)
   169  	for _, kv := range resp.Kvs {
   170  		var iup internal.Update
   171  		if err := json.Unmarshal(kv.Value, &iup); err != nil {
   172  			continue
   173  		}
   174  
   175  		eps[string(kv.Key)] = Endpoint{Addr: iup.Addr, Metadata: iup.Metadata}
   176  	}
   177  	return eps, nil
   178  }
   179  

View as plain text