...

Source file src/google.golang.org/grpc/picker_wrapper.go

Documentation: google.golang.org/grpc

     1  /*
     2   *
     3   * Copyright 2017 gRPC authors.
     4   *
     5   * Licensed under the Apache License, Version 2.0 (the "License");
     6   * you may not use this file except in compliance with the License.
     7   * You may obtain a copy of the License at
     8   *
     9   *     http://www.apache.org/licenses/LICENSE-2.0
    10   *
    11   * Unless required by applicable law or agreed to in writing, software
    12   * distributed under the License is distributed on an "AS IS" BASIS,
    13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    14   * See the License for the specific language governing permissions and
    15   * limitations under the License.
    16   *
    17   */
    18  
    19  package grpc
    20  
    21  import (
    22  	"context"
    23  	"fmt"
    24  	"io"
    25  	"sync"
    26  
    27  	"google.golang.org/grpc/balancer"
    28  	"google.golang.org/grpc/codes"
    29  	"google.golang.org/grpc/internal/channelz"
    30  	istatus "google.golang.org/grpc/internal/status"
    31  	"google.golang.org/grpc/internal/transport"
    32  	"google.golang.org/grpc/stats"
    33  	"google.golang.org/grpc/status"
    34  )
    35  
    36  // pickerWrapper is a wrapper of balancer.Picker. It blocks on certain pick
    37  // actions and unblock when there's a picker update.
    38  type pickerWrapper struct {
    39  	mu            sync.Mutex
    40  	done          bool
    41  	blockingCh    chan struct{}
    42  	picker        balancer.Picker
    43  	statsHandlers []stats.Handler // to record blocking picker calls
    44  }
    45  
    46  func newPickerWrapper(statsHandlers []stats.Handler) *pickerWrapper {
    47  	return &pickerWrapper{
    48  		blockingCh:    make(chan struct{}),
    49  		statsHandlers: statsHandlers,
    50  	}
    51  }
    52  
    53  // updatePicker is called by UpdateBalancerState. It unblocks all blocked pick.
    54  func (pw *pickerWrapper) updatePicker(p balancer.Picker) {
    55  	pw.mu.Lock()
    56  	if pw.done {
    57  		pw.mu.Unlock()
    58  		return
    59  	}
    60  	pw.picker = p
    61  	// pw.blockingCh should never be nil.
    62  	close(pw.blockingCh)
    63  	pw.blockingCh = make(chan struct{})
    64  	pw.mu.Unlock()
    65  }
    66  
    67  // doneChannelzWrapper performs the following:
    68  //   - increments the calls started channelz counter
    69  //   - wraps the done function in the passed in result to increment the calls
    70  //     failed or calls succeeded channelz counter before invoking the actual
    71  //     done function.
    72  func doneChannelzWrapper(acbw *acBalancerWrapper, result *balancer.PickResult) {
    73  	ac := acbw.ac
    74  	ac.incrCallsStarted()
    75  	done := result.Done
    76  	result.Done = func(b balancer.DoneInfo) {
    77  		if b.Err != nil && b.Err != io.EOF {
    78  			ac.incrCallsFailed()
    79  		} else {
    80  			ac.incrCallsSucceeded()
    81  		}
    82  		if done != nil {
    83  			done(b)
    84  		}
    85  	}
    86  }
    87  
    88  // pick returns the transport that will be used for the RPC.
    89  // It may block in the following cases:
    90  // - there's no picker
    91  // - the current picker returns ErrNoSubConnAvailable
    92  // - the current picker returns other errors and failfast is false.
    93  // - the subConn returned by the current picker is not READY
    94  // When one of these situations happens, pick blocks until the picker gets updated.
    95  func (pw *pickerWrapper) pick(ctx context.Context, failfast bool, info balancer.PickInfo) (transport.ClientTransport, balancer.PickResult, error) {
    96  	var ch chan struct{}
    97  
    98  	var lastPickErr error
    99  
   100  	for {
   101  		pw.mu.Lock()
   102  		if pw.done {
   103  			pw.mu.Unlock()
   104  			return nil, balancer.PickResult{}, ErrClientConnClosing
   105  		}
   106  
   107  		if pw.picker == nil {
   108  			ch = pw.blockingCh
   109  		}
   110  		if ch == pw.blockingCh {
   111  			// This could happen when either:
   112  			// - pw.picker is nil (the previous if condition), or
   113  			// - has called pick on the current picker.
   114  			pw.mu.Unlock()
   115  			select {
   116  			case <-ctx.Done():
   117  				var errStr string
   118  				if lastPickErr != nil {
   119  					errStr = "latest balancer error: " + lastPickErr.Error()
   120  				} else {
   121  					errStr = fmt.Sprintf("received context error while waiting for new LB policy update: %s", ctx.Err().Error())
   122  				}
   123  				switch ctx.Err() {
   124  				case context.DeadlineExceeded:
   125  					return nil, balancer.PickResult{}, status.Error(codes.DeadlineExceeded, errStr)
   126  				case context.Canceled:
   127  					return nil, balancer.PickResult{}, status.Error(codes.Canceled, errStr)
   128  				}
   129  			case <-ch:
   130  			}
   131  			continue
   132  		}
   133  
   134  		// If the channel is set, it means that the pick call had to wait for a
   135  		// new picker at some point. Either it's the first iteration and this
   136  		// function received the first picker, or a picker errored with
   137  		// ErrNoSubConnAvailable or errored with failfast set to false, which
   138  		// will trigger a continue to the next iteration. In the first case this
   139  		// conditional will hit if this call had to block (the channel is set).
   140  		// In the second case, the only way it will get to this conditional is
   141  		// if there is a new picker.
   142  		if ch != nil {
   143  			for _, sh := range pw.statsHandlers {
   144  				sh.HandleRPC(ctx, &stats.PickerUpdated{})
   145  			}
   146  		}
   147  
   148  		ch = pw.blockingCh
   149  		p := pw.picker
   150  		pw.mu.Unlock()
   151  
   152  		pickResult, err := p.Pick(info)
   153  		if err != nil {
   154  			if err == balancer.ErrNoSubConnAvailable {
   155  				continue
   156  			}
   157  			if st, ok := status.FromError(err); ok {
   158  				// Status error: end the RPC unconditionally with this status.
   159  				// First restrict the code to the list allowed by gRFC A54.
   160  				if istatus.IsRestrictedControlPlaneCode(st) {
   161  					err = status.Errorf(codes.Internal, "received picker error with illegal status: %v", err)
   162  				}
   163  				return nil, balancer.PickResult{}, dropError{error: err}
   164  			}
   165  			// For all other errors, wait for ready RPCs should block and other
   166  			// RPCs should fail with unavailable.
   167  			if !failfast {
   168  				lastPickErr = err
   169  				continue
   170  			}
   171  			return nil, balancer.PickResult{}, status.Error(codes.Unavailable, err.Error())
   172  		}
   173  
   174  		acbw, ok := pickResult.SubConn.(*acBalancerWrapper)
   175  		if !ok {
   176  			logger.Errorf("subconn returned from pick is type %T, not *acBalancerWrapper", pickResult.SubConn)
   177  			continue
   178  		}
   179  		if t := acbw.ac.getReadyTransport(); t != nil {
   180  			if channelz.IsOn() {
   181  				doneChannelzWrapper(acbw, &pickResult)
   182  				return t, pickResult, nil
   183  			}
   184  			return t, pickResult, nil
   185  		}
   186  		if pickResult.Done != nil {
   187  			// Calling done with nil error, no bytes sent and no bytes received.
   188  			// DoneInfo with default value works.
   189  			pickResult.Done(balancer.DoneInfo{})
   190  		}
   191  		logger.Infof("blockingPicker: the picked transport is not ready, loop back to repick")
   192  		// If ok == false, ac.state is not READY.
   193  		// A valid picker always returns READY subConn. This means the state of ac
   194  		// just changed, and picker will be updated shortly.
   195  		// continue back to the beginning of the for loop to repick.
   196  	}
   197  }
   198  
   199  func (pw *pickerWrapper) close() {
   200  	pw.mu.Lock()
   201  	defer pw.mu.Unlock()
   202  	if pw.done {
   203  		return
   204  	}
   205  	pw.done = true
   206  	close(pw.blockingCh)
   207  }
   208  
   209  // reset clears the pickerWrapper and prepares it for being used again when idle
   210  // mode is exited.
   211  func (pw *pickerWrapper) reset() {
   212  	pw.mu.Lock()
   213  	defer pw.mu.Unlock()
   214  	if pw.done {
   215  		return
   216  	}
   217  	pw.blockingCh = make(chan struct{})
   218  }
   219  
   220  // dropError is a wrapper error that indicates the LB policy wishes to drop the
   221  // RPC and not retry it.
   222  type dropError struct {
   223  	error
   224  }
   225  

View as plain text