...

Source file src/go.etcd.io/etcd/server/v3/proxy/httpproxy/director.go

Documentation: go.etcd.io/etcd/server/v3/proxy/httpproxy

     1  // Copyright 2015 The etcd Authors
     2  //
     3  // Licensed under the Apache License, Version 2.0 (the "License");
     4  // you may not use this file except in compliance with the License.
     5  // You may obtain a copy of the License at
     6  //
     7  //     http://www.apache.org/licenses/LICENSE-2.0
     8  //
     9  // Unless required by applicable law or agreed to in writing, software
    10  // distributed under the License is distributed on an "AS IS" BASIS,
    11  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    12  // See the License for the specific language governing permissions and
    13  // limitations under the License.
    14  
    15  package httpproxy
    16  
    17  import (
    18  	"math/rand"
    19  	"net/url"
    20  	"sync"
    21  	"time"
    22  
    23  	"go.uber.org/zap"
    24  )
    25  
    26  // defaultRefreshInterval is the default proxyRefreshIntervalMs value
    27  // as in etcdmain/config.go.
    28  const defaultRefreshInterval = 30000 * time.Millisecond
    29  
    30  var once sync.Once
    31  
    32  func init() {
    33  	rand.Seed(time.Now().UnixNano())
    34  }
    35  
    36  func newDirector(lg *zap.Logger, urlsFunc GetProxyURLs, failureWait time.Duration, refreshInterval time.Duration) *director {
    37  	if lg == nil {
    38  		lg = zap.NewNop()
    39  	}
    40  	d := &director{
    41  		lg:          lg,
    42  		uf:          urlsFunc,
    43  		failureWait: failureWait,
    44  	}
    45  	d.refresh()
    46  	go func() {
    47  		// In order to prevent missing proxy endpoints in the first try:
    48  		// when given refresh interval of defaultRefreshInterval or greater
    49  		// and whenever there is no available proxy endpoints,
    50  		// give 1-second refreshInterval.
    51  		for {
    52  			es := d.endpoints()
    53  			ri := refreshInterval
    54  			if ri >= defaultRefreshInterval {
    55  				if len(es) == 0 {
    56  					ri = time.Second
    57  				}
    58  			}
    59  			if len(es) > 0 {
    60  				once.Do(func() {
    61  					var sl []string
    62  					for _, e := range es {
    63  						sl = append(sl, e.URL.String())
    64  					}
    65  					lg.Info("endpoints found", zap.Strings("endpoints", sl))
    66  				})
    67  			}
    68  			time.Sleep(ri)
    69  			d.refresh()
    70  		}
    71  	}()
    72  	return d
    73  }
    74  
    75  type director struct {
    76  	sync.Mutex
    77  	lg          *zap.Logger
    78  	ep          []*endpoint
    79  	uf          GetProxyURLs
    80  	failureWait time.Duration
    81  }
    82  
    83  func (d *director) refresh() {
    84  	urls := d.uf()
    85  	d.Lock()
    86  	defer d.Unlock()
    87  	var endpoints []*endpoint
    88  	for _, u := range urls {
    89  		uu, err := url.Parse(u)
    90  		if err != nil {
    91  			d.lg.Info("upstream URL invalid", zap.Error(err))
    92  			continue
    93  		}
    94  		endpoints = append(endpoints, newEndpoint(d.lg, *uu, d.failureWait))
    95  	}
    96  
    97  	// shuffle array to avoid connections being "stuck" to a single endpoint
    98  	for i := range endpoints {
    99  		j := rand.Intn(i + 1)
   100  		endpoints[i], endpoints[j] = endpoints[j], endpoints[i]
   101  	}
   102  
   103  	d.ep = endpoints
   104  }
   105  
   106  func (d *director) endpoints() []*endpoint {
   107  	d.Lock()
   108  	defer d.Unlock()
   109  	filtered := make([]*endpoint, 0)
   110  	for _, ep := range d.ep {
   111  		if ep.Available {
   112  			filtered = append(filtered, ep)
   113  		}
   114  	}
   115  
   116  	return filtered
   117  }
   118  
   119  func newEndpoint(lg *zap.Logger, u url.URL, failureWait time.Duration) *endpoint {
   120  	ep := endpoint{
   121  		lg:        lg,
   122  		URL:       u,
   123  		Available: true,
   124  		failFunc:  timedUnavailabilityFunc(failureWait),
   125  	}
   126  
   127  	return &ep
   128  }
   129  
   130  type endpoint struct {
   131  	sync.Mutex
   132  
   133  	lg        *zap.Logger
   134  	URL       url.URL
   135  	Available bool
   136  
   137  	failFunc func(ep *endpoint)
   138  }
   139  
   140  func (ep *endpoint) Failed() {
   141  	ep.Lock()
   142  	if !ep.Available {
   143  		ep.Unlock()
   144  		return
   145  	}
   146  
   147  	ep.Available = false
   148  	ep.Unlock()
   149  
   150  	if ep.lg != nil {
   151  		ep.lg.Info("marked endpoint unavailable", zap.String("endpoint", ep.URL.String()))
   152  	}
   153  
   154  	if ep.failFunc == nil {
   155  		if ep.lg != nil {
   156  			ep.lg.Info(
   157  				"no failFunc defined, endpoint will be unavailable forever",
   158  				zap.String("endpoint", ep.URL.String()),
   159  			)
   160  		}
   161  		return
   162  	}
   163  
   164  	ep.failFunc(ep)
   165  }
   166  
   167  func timedUnavailabilityFunc(wait time.Duration) func(*endpoint) {
   168  	return func(ep *endpoint) {
   169  		time.AfterFunc(wait, func() {
   170  			ep.Available = true
   171  			if ep.lg != nil {
   172  				ep.lg.Info(
   173  					"marked endpoint available, to retest connectivity",
   174  					zap.String("endpoint", ep.URL.String()),
   175  				)
   176  			}
   177  		})
   178  	}
   179  }
   180  

View as plain text