...

Source file src/google.golang.org/grpc/pickfirst.go

Documentation: google.golang.org/grpc

     1  /*
     2   *
     3   * Copyright 2017 gRPC authors.
     4   *
     5   * Licensed under the Apache License, Version 2.0 (the "License");
     6   * you may not use this file except in compliance with the License.
     7   * You may obtain a copy of the License at
     8   *
     9   *     http://www.apache.org/licenses/LICENSE-2.0
    10   *
    11   * Unless required by applicable law or agreed to in writing, software
    12   * distributed under the License is distributed on an "AS IS" BASIS,
    13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    14   * See the License for the specific language governing permissions and
    15   * limitations under the License.
    16   *
    17   */
    18  
    19  package grpc
    20  
    21  import (
    22  	"encoding/json"
    23  	"errors"
    24  	"fmt"
    25  
    26  	"google.golang.org/grpc/balancer"
    27  	"google.golang.org/grpc/connectivity"
    28  	internalgrpclog "google.golang.org/grpc/internal/grpclog"
    29  	"google.golang.org/grpc/internal/grpcrand"
    30  	"google.golang.org/grpc/internal/pretty"
    31  	"google.golang.org/grpc/resolver"
    32  	"google.golang.org/grpc/serviceconfig"
    33  )
    34  
    35  const (
    36  	// PickFirstBalancerName is the name of the pick_first balancer.
    37  	PickFirstBalancerName = "pick_first"
    38  	logPrefix             = "[pick-first-lb %p] "
    39  )
    40  
    41  type pickfirstBuilder struct{}
    42  
    43  func (pickfirstBuilder) Build(cc balancer.ClientConn, opt balancer.BuildOptions) balancer.Balancer {
    44  	b := &pickfirstBalancer{cc: cc}
    45  	b.logger = internalgrpclog.NewPrefixLogger(logger, fmt.Sprintf(logPrefix, b))
    46  	return b
    47  }
    48  
    49  func (pickfirstBuilder) Name() string {
    50  	return PickFirstBalancerName
    51  }
    52  
    53  type pfConfig struct {
    54  	serviceconfig.LoadBalancingConfig `json:"-"`
    55  
    56  	// If set to true, instructs the LB policy to shuffle the order of the list
    57  	// of endpoints received from the name resolver before attempting to
    58  	// connect to them.
    59  	ShuffleAddressList bool `json:"shuffleAddressList"`
    60  }
    61  
    62  func (pickfirstBuilder) ParseConfig(js json.RawMessage) (serviceconfig.LoadBalancingConfig, error) {
    63  	var cfg pfConfig
    64  	if err := json.Unmarshal(js, &cfg); err != nil {
    65  		return nil, fmt.Errorf("pickfirst: unable to unmarshal LB policy config: %s, error: %v", string(js), err)
    66  	}
    67  	return cfg, nil
    68  }
    69  
    70  type pickfirstBalancer struct {
    71  	logger  *internalgrpclog.PrefixLogger
    72  	state   connectivity.State
    73  	cc      balancer.ClientConn
    74  	subConn balancer.SubConn
    75  }
    76  
    77  func (b *pickfirstBalancer) ResolverError(err error) {
    78  	if b.logger.V(2) {
    79  		b.logger.Infof("Received error from the name resolver: %v", err)
    80  	}
    81  	if b.subConn == nil {
    82  		b.state = connectivity.TransientFailure
    83  	}
    84  
    85  	if b.state != connectivity.TransientFailure {
    86  		// The picker will not change since the balancer does not currently
    87  		// report an error.
    88  		return
    89  	}
    90  	b.cc.UpdateState(balancer.State{
    91  		ConnectivityState: connectivity.TransientFailure,
    92  		Picker:            &picker{err: fmt.Errorf("name resolver error: %v", err)},
    93  	})
    94  }
    95  
    96  func (b *pickfirstBalancer) UpdateClientConnState(state balancer.ClientConnState) error {
    97  	if len(state.ResolverState.Addresses) == 0 && len(state.ResolverState.Endpoints) == 0 {
    98  		// The resolver reported an empty address list. Treat it like an error by
    99  		// calling b.ResolverError.
   100  		if b.subConn != nil {
   101  			// Shut down the old subConn. All addresses were removed, so it is
   102  			// no longer valid.
   103  			b.subConn.Shutdown()
   104  			b.subConn = nil
   105  		}
   106  		b.ResolverError(errors.New("produced zero addresses"))
   107  		return balancer.ErrBadResolverState
   108  	}
   109  	// We don't have to guard this block with the env var because ParseConfig
   110  	// already does so.
   111  	cfg, ok := state.BalancerConfig.(pfConfig)
   112  	if state.BalancerConfig != nil && !ok {
   113  		return fmt.Errorf("pickfirst: received illegal BalancerConfig (type %T): %v", state.BalancerConfig, state.BalancerConfig)
   114  	}
   115  
   116  	if b.logger.V(2) {
   117  		b.logger.Infof("Received new config %s, resolver state %s", pretty.ToJSON(cfg), pretty.ToJSON(state.ResolverState))
   118  	}
   119  
   120  	var addrs []resolver.Address
   121  	if endpoints := state.ResolverState.Endpoints; len(endpoints) != 0 {
   122  		// Perform the optional shuffling described in gRFC A62. The shuffling will
   123  		// change the order of endpoints but not touch the order of the addresses
   124  		// within each endpoint. - A61
   125  		if cfg.ShuffleAddressList {
   126  			endpoints = append([]resolver.Endpoint{}, endpoints...)
   127  			grpcrand.Shuffle(len(endpoints), func(i, j int) { endpoints[i], endpoints[j] = endpoints[j], endpoints[i] })
   128  		}
   129  
   130  		// "Flatten the list by concatenating the ordered list of addresses for each
   131  		// of the endpoints, in order." - A61
   132  		for _, endpoint := range endpoints {
   133  			// "In the flattened list, interleave addresses from the two address
   134  			// families, as per RFC-8304 section 4." - A61
   135  			// TODO: support the above language.
   136  			addrs = append(addrs, endpoint.Addresses...)
   137  		}
   138  	} else {
   139  		// Endpoints not set, process addresses until we migrate resolver
   140  		// emissions fully to Endpoints. The top channel does wrap emitted
   141  		// addresses with endpoints, however some balancers such as weighted
   142  		// target do not forwarrd the corresponding correct endpoints down/split
   143  		// endpoints properly. Once all balancers correctly forward endpoints
   144  		// down, can delete this else conditional.
   145  		addrs = state.ResolverState.Addresses
   146  		if cfg.ShuffleAddressList {
   147  			addrs = append([]resolver.Address{}, addrs...)
   148  			grpcrand.Shuffle(len(addrs), func(i, j int) { addrs[i], addrs[j] = addrs[j], addrs[i] })
   149  		}
   150  	}
   151  
   152  	if b.subConn != nil {
   153  		b.cc.UpdateAddresses(b.subConn, addrs)
   154  		return nil
   155  	}
   156  
   157  	var subConn balancer.SubConn
   158  	subConn, err := b.cc.NewSubConn(addrs, balancer.NewSubConnOptions{
   159  		StateListener: func(state balancer.SubConnState) {
   160  			b.updateSubConnState(subConn, state)
   161  		},
   162  	})
   163  	if err != nil {
   164  		if b.logger.V(2) {
   165  			b.logger.Infof("Failed to create new SubConn: %v", err)
   166  		}
   167  		b.state = connectivity.TransientFailure
   168  		b.cc.UpdateState(balancer.State{
   169  			ConnectivityState: connectivity.TransientFailure,
   170  			Picker:            &picker{err: fmt.Errorf("error creating connection: %v", err)},
   171  		})
   172  		return balancer.ErrBadResolverState
   173  	}
   174  	b.subConn = subConn
   175  	b.state = connectivity.Idle
   176  	b.cc.UpdateState(balancer.State{
   177  		ConnectivityState: connectivity.Connecting,
   178  		Picker:            &picker{err: balancer.ErrNoSubConnAvailable},
   179  	})
   180  	b.subConn.Connect()
   181  	return nil
   182  }
   183  
   184  // UpdateSubConnState is unused as a StateListener is always registered when
   185  // creating SubConns.
   186  func (b *pickfirstBalancer) UpdateSubConnState(subConn balancer.SubConn, state balancer.SubConnState) {
   187  	b.logger.Errorf("UpdateSubConnState(%v, %+v) called unexpectedly", subConn, state)
   188  }
   189  
   190  func (b *pickfirstBalancer) updateSubConnState(subConn balancer.SubConn, state balancer.SubConnState) {
   191  	if b.logger.V(2) {
   192  		b.logger.Infof("Received SubConn state update: %p, %+v", subConn, state)
   193  	}
   194  	if b.subConn != subConn {
   195  		if b.logger.V(2) {
   196  			b.logger.Infof("Ignored state change because subConn is not recognized")
   197  		}
   198  		return
   199  	}
   200  	if state.ConnectivityState == connectivity.Shutdown {
   201  		b.subConn = nil
   202  		return
   203  	}
   204  
   205  	switch state.ConnectivityState {
   206  	case connectivity.Ready:
   207  		b.cc.UpdateState(balancer.State{
   208  			ConnectivityState: state.ConnectivityState,
   209  			Picker:            &picker{result: balancer.PickResult{SubConn: subConn}},
   210  		})
   211  	case connectivity.Connecting:
   212  		if b.state == connectivity.TransientFailure {
   213  			// We stay in TransientFailure until we are Ready. See A62.
   214  			return
   215  		}
   216  		b.cc.UpdateState(balancer.State{
   217  			ConnectivityState: state.ConnectivityState,
   218  			Picker:            &picker{err: balancer.ErrNoSubConnAvailable},
   219  		})
   220  	case connectivity.Idle:
   221  		if b.state == connectivity.TransientFailure {
   222  			// We stay in TransientFailure until we are Ready. Also kick the
   223  			// subConn out of Idle into Connecting. See A62.
   224  			b.subConn.Connect()
   225  			return
   226  		}
   227  		b.cc.UpdateState(balancer.State{
   228  			ConnectivityState: state.ConnectivityState,
   229  			Picker:            &idlePicker{subConn: subConn},
   230  		})
   231  	case connectivity.TransientFailure:
   232  		b.cc.UpdateState(balancer.State{
   233  			ConnectivityState: state.ConnectivityState,
   234  			Picker:            &picker{err: state.ConnectionError},
   235  		})
   236  	}
   237  	b.state = state.ConnectivityState
   238  }
   239  
   240  func (b *pickfirstBalancer) Close() {
   241  }
   242  
   243  func (b *pickfirstBalancer) ExitIdle() {
   244  	if b.subConn != nil && b.state == connectivity.Idle {
   245  		b.subConn.Connect()
   246  	}
   247  }
   248  
   249  type picker struct {
   250  	result balancer.PickResult
   251  	err    error
   252  }
   253  
   254  func (p *picker) Pick(balancer.PickInfo) (balancer.PickResult, error) {
   255  	return p.result, p.err
   256  }
   257  
   258  // idlePicker is used when the SubConn is IDLE and kicks the SubConn into
   259  // CONNECTING when Pick is called.
   260  type idlePicker struct {
   261  	subConn balancer.SubConn
   262  }
   263  
   264  func (i *idlePicker) Pick(balancer.PickInfo) (balancer.PickResult, error) {
   265  	i.subConn.Connect()
   266  	return balancer.PickResult{}, balancer.ErrNoSubConnAvailable
   267  }
   268  

View as plain text