...

Source file src/go.etcd.io/etcd/server/v3/proxy/grpcproxy/cluster.go

Documentation: go.etcd.io/etcd/server/v3/proxy/grpcproxy

     1  // Copyright 2016 The etcd Authors
     2  //
     3  // Licensed under the Apache License, Version 2.0 (the "License");
     4  // you may not use this file except in compliance with the License.
     5  // You may obtain a copy of the License at
     6  //
     7  //     http://www.apache.org/licenses/LICENSE-2.0
     8  //
     9  // Unless required by applicable law or agreed to in writing, software
    10  // distributed under the License is distributed on an "AS IS" BASIS,
    11  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    12  // See the License for the specific language governing permissions and
    13  // limitations under the License.
    14  
    15  package grpcproxy
    16  
    17  import (
    18  	"context"
    19  	"errors"
    20  	"fmt"
    21  	"os"
    22  	"sync"
    23  
    24  	pb "go.etcd.io/etcd/api/v3/etcdserverpb"
    25  	"go.etcd.io/etcd/client/v3"
    26  	"go.etcd.io/etcd/client/v3/naming/endpoints"
    27  	"golang.org/x/time/rate"
    28  
    29  	"go.uber.org/zap"
    30  )
    31  
// resolveRetryRate allows at most 1 attempt per second to (re-)establish the
// endpoint watch. establishEndpointWatch uses it as both the rate and the
// burst size of its rate limiter.
const resolveRetryRate = 1
    34  
// clusterProxy serves the cluster (membership) API of the gRPC proxy. Member
// mutations are forwarded to clus, while MemberList may instead be answered
// from the advertised address or from the endpoints collected in umap.
type clusterProxy struct {
	// lg is the logger, clus is the cluster client that member requests are
	// forwarded to, and ctx is the client's context that bounds the endpoint
	// watch.
	lg   *zap.Logger
	clus clientv3.Cluster
	ctx  context.Context

	// advaddr is the advertised client URL; prefix is the key prefix used to
	// fetch grpc-proxy member endpoints.
	advaddr string
	prefix  string

	// em is the endpoints manager for prefix. NewClusterProxy only creates it
	// when both advaddr and prefix are non-empty; otherwise it stays nil.
	em endpoints.Manager

	// umap holds the endpoints learned from watch updates, keyed by the
	// update key. umu guards every access to umap.
	umu  sync.RWMutex
	umap map[string]endpoints.Endpoint
}
    49  
    50  // NewClusterProxy takes optional prefix to fetch grpc-proxy member endpoints.
    51  // The returned channel is closed when there is grpc-proxy endpoint registered
    52  // and the client's context is canceled so the 'register' loop returns.
    53  // TODO: Expand the API to report creation errors
    54  func NewClusterProxy(lg *zap.Logger, c *clientv3.Client, advaddr string, prefix string) (pb.ClusterServer, <-chan struct{}) {
    55  	if lg == nil {
    56  		lg = zap.NewNop()
    57  	}
    58  
    59  	var em endpoints.Manager
    60  	if advaddr != "" && prefix != "" {
    61  		var err error
    62  		if em, err = endpoints.NewManager(c, prefix); err != nil {
    63  			lg.Error("failed to provision endpointsManager", zap.String("prefix", prefix), zap.Error(err))
    64  			return nil, nil
    65  		}
    66  	}
    67  
    68  	cp := &clusterProxy{
    69  		lg:   lg,
    70  		clus: c.Cluster,
    71  		ctx:  c.Ctx(),
    72  
    73  		advaddr: advaddr,
    74  		prefix:  prefix,
    75  		umap:    make(map[string]endpoints.Endpoint),
    76  		em:      em,
    77  	}
    78  
    79  	donec := make(chan struct{})
    80  	if em != nil {
    81  		go func() {
    82  			defer close(donec)
    83  			cp.establishEndpointWatch(prefix)
    84  		}()
    85  		return cp, donec
    86  	}
    87  
    88  	close(donec)
    89  	return cp, donec
    90  }
    91  
    92  func (cp *clusterProxy) establishEndpointWatch(prefix string) {
    93  	rm := rate.NewLimiter(rate.Limit(resolveRetryRate), resolveRetryRate)
    94  	for rm.Wait(cp.ctx) == nil {
    95  		wc, err := cp.em.NewWatchChannel(cp.ctx)
    96  		if err != nil {
    97  			cp.lg.Warn("failed to establish endpoint watch", zap.String("prefix", prefix), zap.Error(err))
    98  			continue
    99  		}
   100  		cp.monitor(wc)
   101  	}
   102  }
   103  
   104  func (cp *clusterProxy) monitor(wa endpoints.WatchChannel) {
   105  	for {
   106  		select {
   107  		case <-cp.ctx.Done():
   108  			cp.lg.Info("watching endpoints interrupted", zap.Error(cp.ctx.Err()))
   109  			return
   110  		case updates := <-wa:
   111  			cp.umu.Lock()
   112  			for _, up := range updates {
   113  				switch up.Op {
   114  				case endpoints.Add:
   115  					cp.umap[up.Key] = up.Endpoint
   116  				case endpoints.Delete:
   117  					delete(cp.umap, up.Key)
   118  				}
   119  			}
   120  			cp.umu.Unlock()
   121  		}
   122  	}
   123  }
   124  
   125  func (cp *clusterProxy) MemberAdd(ctx context.Context, r *pb.MemberAddRequest) (*pb.MemberAddResponse, error) {
   126  	if r.IsLearner {
   127  		return cp.memberAddAsLearner(ctx, r.PeerURLs)
   128  	}
   129  	return cp.memberAdd(ctx, r.PeerURLs)
   130  }
   131  
   132  func (cp *clusterProxy) memberAdd(ctx context.Context, peerURLs []string) (*pb.MemberAddResponse, error) {
   133  	mresp, err := cp.clus.MemberAdd(ctx, peerURLs)
   134  	if err != nil {
   135  		return nil, err
   136  	}
   137  	resp := (pb.MemberAddResponse)(*mresp)
   138  	return &resp, err
   139  }
   140  
   141  func (cp *clusterProxy) memberAddAsLearner(ctx context.Context, peerURLs []string) (*pb.MemberAddResponse, error) {
   142  	mresp, err := cp.clus.MemberAddAsLearner(ctx, peerURLs)
   143  	if err != nil {
   144  		return nil, err
   145  	}
   146  	resp := (pb.MemberAddResponse)(*mresp)
   147  	return &resp, err
   148  }
   149  
   150  func (cp *clusterProxy) MemberRemove(ctx context.Context, r *pb.MemberRemoveRequest) (*pb.MemberRemoveResponse, error) {
   151  	mresp, err := cp.clus.MemberRemove(ctx, r.ID)
   152  	if err != nil {
   153  		return nil, err
   154  	}
   155  	resp := (pb.MemberRemoveResponse)(*mresp)
   156  	return &resp, err
   157  }
   158  
   159  func (cp *clusterProxy) MemberUpdate(ctx context.Context, r *pb.MemberUpdateRequest) (*pb.MemberUpdateResponse, error) {
   160  	mresp, err := cp.clus.MemberUpdate(ctx, r.ID, r.PeerURLs)
   161  	if err != nil {
   162  		return nil, err
   163  	}
   164  	resp := (pb.MemberUpdateResponse)(*mresp)
   165  	return &resp, err
   166  }
   167  
   168  func (cp *clusterProxy) membersFromUpdates() ([]*pb.Member, error) {
   169  	cp.umu.RLock()
   170  	defer cp.umu.RUnlock()
   171  	mbs := make([]*pb.Member, 0, len(cp.umap))
   172  	for _, upt := range cp.umap {
   173  		m, err := decodeMeta(fmt.Sprint(upt.Metadata))
   174  		if err != nil {
   175  			return nil, err
   176  		}
   177  		mbs = append(mbs, &pb.Member{Name: m.Name, ClientURLs: []string{upt.Addr}})
   178  	}
   179  	return mbs, nil
   180  }
   181  
   182  // MemberList wraps member list API with following rules:
   183  // - If 'advaddr' is not empty and 'prefix' is not empty, return registered member lists via resolver
   184  // - If 'advaddr' is not empty and 'prefix' is not empty and registered grpc-proxy members haven't been fetched, return the 'advaddr'
   185  // - If 'advaddr' is not empty and 'prefix' is empty, return 'advaddr' without forcing it to 'register'
   186  // - If 'advaddr' is empty, forward to member list API
   187  func (cp *clusterProxy) MemberList(ctx context.Context, r *pb.MemberListRequest) (*pb.MemberListResponse, error) {
   188  	if cp.advaddr != "" {
   189  		if cp.prefix != "" {
   190  			mbs, err := cp.membersFromUpdates()
   191  			if err != nil {
   192  				return nil, err
   193  			}
   194  			if len(mbs) > 0 {
   195  				return &pb.MemberListResponse{Members: mbs}, nil
   196  			}
   197  		}
   198  		// prefix is empty or no grpc-proxy members haven't been registered
   199  		hostname, _ := os.Hostname()
   200  		return &pb.MemberListResponse{Members: []*pb.Member{{Name: hostname, ClientURLs: []string{cp.advaddr}}}}, nil
   201  	}
   202  	mresp, err := cp.clus.MemberList(ctx)
   203  	if err != nil {
   204  		return nil, err
   205  	}
   206  	resp := (pb.MemberListResponse)(*mresp)
   207  	return &resp, err
   208  }
   209  
   210  func (cp *clusterProxy) MemberPromote(ctx context.Context, r *pb.MemberPromoteRequest) (*pb.MemberPromoteResponse, error) {
   211  	// TODO: implement
   212  	return nil, errors.New("not implemented")
   213  }
   214  

View as plain text