...

Source file src/google.golang.org/grpc/xds/internal/balancer/priority/balancer.go

Documentation: google.golang.org/grpc/xds/internal/balancer/priority

     1  /*
     2   *
     3   * Copyright 2021 gRPC authors.
     4   *
     5   * Licensed under the Apache License, Version 2.0 (the "License");
     6   * you may not use this file except in compliance with the License.
     7   * You may obtain a copy of the License at
     8   *
     9   *     http://www.apache.org/licenses/LICENSE-2.0
    10   *
    11   * Unless required by applicable law or agreed to in writing, software
    12   * distributed under the License is distributed on an "AS IS" BASIS,
    13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    14   * See the License for the specific language governing permissions and
    15   * limitations under the License.
    16   *
    17   */
    18  
    19  // Package priority implements the priority balancer.
    20  //
    21  // This balancer will be kept in internal until we use it in the xds balancers,
    22  // and are confident its functionalities are stable. It will then be exported
    23  // for more users.
    24  package priority
    25  
    26  import (
    27  	"encoding/json"
    28  	"fmt"
    29  	"sync"
    30  	"time"
    31  
    32  	"google.golang.org/grpc/balancer"
    33  	"google.golang.org/grpc/balancer/base"
    34  	"google.golang.org/grpc/connectivity"
    35  	"google.golang.org/grpc/internal/balancergroup"
    36  	"google.golang.org/grpc/internal/buffer"
    37  	"google.golang.org/grpc/internal/grpclog"
    38  	"google.golang.org/grpc/internal/grpcsync"
    39  	"google.golang.org/grpc/internal/hierarchy"
    40  	"google.golang.org/grpc/internal/pretty"
    41  	"google.golang.org/grpc/resolver"
    42  	"google.golang.org/grpc/serviceconfig"
    43  )
    44  
    45  // Name is the name of the priority balancer. It is the value returned by the
        // builder's Name method.
    46  const Name = "priority_experimental"
    47  
    48  // DefaultSubBalancerCloseTimeout is defined as a variable instead of const for
    49  // testing.
        //
        // Build passes it to the balancer group as the SubBalancerCloseTimeout
        // option.
    50  var DefaultSubBalancerCloseTimeout = 15 * time.Minute
    51  
        // init registers the priority balancer builder with the balancer package.
    52  func init() {
    53  	balancer.Register(bb{})
    54  }
    55  
        // bb is the builder for the priority balancer. It is an empty struct and
        // carries no state of its own.
    56  type bb struct{}
    57  
    58  func (bb) Build(cc balancer.ClientConn, bOpts balancer.BuildOptions) balancer.Balancer {
    59  	b := &priorityBalancer{
    60  		cc:                       cc,
    61  		done:                     grpcsync.NewEvent(),
    62  		children:                 make(map[string]*childBalancer),
    63  		childBalancerStateUpdate: buffer.NewUnbounded(),
    64  	}
    65  
    66  	b.logger = prefixLogger(b)
    67  	b.bg = balancergroup.New(balancergroup.Options{
    68  		CC:                      cc,
    69  		BuildOpts:               bOpts,
    70  		StateAggregator:         b,
    71  		Logger:                  b.logger,
    72  		SubBalancerCloseTimeout: DefaultSubBalancerCloseTimeout,
    73  	})
    74  	b.bg.Start()
    75  	go b.run()
    76  	b.logger.Infof("Created")
    77  	return b
    78  }
    79  
    80  func (b bb) ParseConfig(s json.RawMessage) (serviceconfig.LoadBalancingConfig, error) {
    81  	return parseConfig(s)
    82  }
    83  
        // Name returns the name of the priority balancer, i.e. the Name constant.
    84  func (bb) Name() string {
    85  	return Name
    86  }
    87  
    88  // timerWrapper wraps a timer with a boolean. So that when a race happens
    89  // between AfterFunc and Stop, the func is guaranteed to not execute.
    90  type timerWrapper struct {
        	// stopped is the guard flag described above.
        	// NOTE(review): presumably set when the timer is stopped and checked by
        	// the timer func before it does any work; the code using it is not in
        	// this part of the file, confirm there.
    91  	stopped bool
        	// timer is the wrapped timer.
    92  	timer   *time.Timer
    93  }
    94  
        // priorityBalancer is the balancer returned by the builder's Build method.
        // It keeps a set of named children ordered by priority and hosts them in a
        // balancer group.
    95  type priorityBalancer struct {
        	// logger is the prefix logger created in Build.
    96  	logger                   *grpclog.PrefixLogger
        	// cc is the ClientConn given to Build; the balancer reports its own
        	// state through it.
    97  	cc                       balancer.ClientConn
        	// bg is the balancer group created and started in Build and closed in
        	// Close.
    98  	bg                       *balancergroup.BalancerGroup
        	// done is fired by Close; the run goroutine exits once it has fired.
    99  	done                     *grpcsync.Event
        	// childBalancerStateUpdate queues childBalancerState items (from
        	// UpdateState) and resumePickerUpdates items (from
        	// UpdateClientConnState); the run goroutine drains it.
   100  	childBalancerStateUpdate *buffer.Unbounded
   101  
        	// mu is held by UpdateClientConnState, Close and run while they read or
        	// write the fields below.
   102  	mu         sync.Mutex
        	// childInUse is the name of the child currently in use. It is reset to
        	// "" by Close and when an update removes all priorities.
   103  	childInUse string
   104  	// priorities is a list of child names from higher to lower priority.
   105  	priorities []string
   106  	// children is a map from child name to sub-balancers.
   107  	children map[string]*childBalancer
   108  
   109  	// Set during UpdateClientConnState when calling into sub-balancers.
   110  	// Prevents child updates from recomputing the active priority or sending
   111  	// an update of the aggregated picker to the parent.  Cleared after all
   112  	// sub-balancers have finished UpdateClientConnState, after which
   113  	// syncPriority is called manually.
   114  	inhibitPickerUpdates bool
   115  }
   116  
   117  func (b *priorityBalancer) UpdateClientConnState(s balancer.ClientConnState) error {
   118  	if b.logger.V(2) {
   119  		b.logger.Infof("Received an update with balancer config: %+v", pretty.ToJSON(s.BalancerConfig))
   120  	}
   121  	newConfig, ok := s.BalancerConfig.(*LBConfig)
   122  	if !ok {
   123  		return fmt.Errorf("unexpected balancer config with type: %T", s.BalancerConfig)
   124  	}
   125  	addressesSplit := hierarchy.Group(s.ResolverState.Addresses)
   126  
   127  	b.mu.Lock()
   128  	// Create and remove children, since we know all children from the config
   129  	// are used by some priority.
   130  	for name, newSubConfig := range newConfig.Children {
   131  		bb := balancer.Get(newSubConfig.Config.Name)
   132  		if bb == nil {
   133  			b.logger.Errorf("balancer name %v from config is not registered", newSubConfig.Config.Name)
   134  			continue
   135  		}
   136  
   137  		currentChild, ok := b.children[name]
   138  		if !ok {
   139  			// This is a new child, add it to the children list. But note that
   140  			// the balancer isn't built, because this child can be a low
   141  			// priority. If necessary, it will be built when syncing priorities.
   142  			cb := newChildBalancer(name, b, bb.Name(), b.cc)
   143  			cb.updateConfig(newSubConfig, resolver.State{
   144  				Addresses:     addressesSplit[name],
   145  				ServiceConfig: s.ResolverState.ServiceConfig,
   146  				Attributes:    s.ResolverState.Attributes,
   147  			})
   148  			b.children[name] = cb
   149  			continue
   150  		}
   151  
   152  		// This is not a new child. But the config/addresses could change.
   153  
   154  		// The balancing policy name is changed, close the old child. But don't
   155  		// rebuild, rebuild will happen when syncing priorities.
   156  		if currentChild.balancerName != bb.Name() {
   157  			currentChild.stop()
   158  			currentChild.updateBalancerName(bb.Name())
   159  		}
   160  
   161  		// Update config and address, but note that this doesn't send the
   162  		// updates to non-started child balancers (the child balancer might not
   163  		// be built, if it's a low priority).
   164  		currentChild.updateConfig(newSubConfig, resolver.State{
   165  			Addresses:     addressesSplit[name],
   166  			ServiceConfig: s.ResolverState.ServiceConfig,
   167  			Attributes:    s.ResolverState.Attributes,
   168  		})
   169  	}
   170  	// Cleanup resources used by children removed from the config.
   171  	for name, oldChild := range b.children {
   172  		if _, ok := newConfig.Children[name]; !ok {
   173  			oldChild.stop()
   174  			delete(b.children, name)
   175  		}
   176  	}
   177  
   178  	// Update priorities and handle priority changes.
   179  	b.priorities = newConfig.Priorities
   180  
   181  	// Everything was removed by the update.
   182  	if len(b.priorities) == 0 {
   183  		b.childInUse = ""
   184  		b.cc.UpdateState(balancer.State{
   185  			ConnectivityState: connectivity.TransientFailure,
   186  			Picker:            base.NewErrPicker(ErrAllPrioritiesRemoved),
   187  		})
   188  		b.mu.Unlock()
   189  		return nil
   190  	}
   191  
   192  	// This will sync the states of all children to the new updated
   193  	// priorities. Includes starting/stopping child balancers when necessary.
   194  	// Block picker updates until all children have had a chance to call
   195  	// UpdateState to prevent races where, e.g., the active priority reports
   196  	// transient failure but a higher priority may have reported something that
   197  	// made it active, and if the transient failure update is handled first,
   198  	// RPCs could fail.
   199  	b.inhibitPickerUpdates = true
   200  	// Add an item to queue to notify us when the current items in the queue
   201  	// are done and syncPriority has been called.
   202  	done := make(chan struct{})
   203  	b.childBalancerStateUpdate.Put(resumePickerUpdates{done: done})
   204  	b.mu.Unlock()
   205  	<-done
   206  
   207  	return nil
   208  }
   209  
        // ResolverError forwards the resolver error to the balancer group.
   210  func (b *priorityBalancer) ResolverError(err error) {
   211  	b.bg.ResolverError(err)
   212  }
   213  
        // UpdateSubConnState is not expected to be called on the priority balancer.
        // It only logs the unexpected call as an error and does nothing else.
   214  func (b *priorityBalancer) UpdateSubConnState(sc balancer.SubConn, state balancer.SubConnState) {
   215  	b.logger.Errorf("UpdateSubConnState(%v, %+v) called unexpectedly", sc, state)
   216  }
   217  
        // Close shuts the balancer down. It closes the balancer group and the child
        // state update queue, then, under the mutex, fires the done event, clears
        // childInUse and stops every child.
   218  func (b *priorityBalancer) Close() {
   219  	b.bg.Close()
   220  	b.childBalancerStateUpdate.Close()
   221  
   222  	b.mu.Lock()
   223  	defer b.mu.Unlock()
        	// Firing done makes the run goroutine exit.
   224  	b.done.Fire()
   225  	// Clear states of the current child in use, so if there's a race in picker
   226  	// update, it will be dropped.
   227  	b.childInUse = ""
   228  	// Stop the child policies, this is necessary to stop the init timers in the
   229  	// children.
   230  	for _, child := range b.children {
   231  		child.stop()
   232  	}
   233  }
   234  
        // ExitIdle forwards the exit-idle request to the balancer group.
   235  func (b *priorityBalancer) ExitIdle() {
   236  	b.bg.ExitIdle()
   237  }
   238  
   239  // UpdateState implements balancergroup.BalancerStateAggregator interface. The
   240  // balancer group sends new connectivity state and picker here.
   241  func (b *priorityBalancer) UpdateState(childName string, state balancer.State) {
   242  	b.childBalancerStateUpdate.Put(childBalancerState{
   243  		name: childName,
   244  		s:    state,
   245  	})
   246  }
   247  
        // childBalancerState is the queue item created by UpdateState: a state
        // report from one child, handled later by the run goroutine.
   248  type childBalancerState struct {
        	// name is the name of the child that reported the state.
   249  	name string
        	// s is the reported balancer state.
   250  	s    balancer.State
   251  }
   252  
        // resumePickerUpdates is the queue item UpdateClientConnState appends after
        // setting inhibitPickerUpdates. When the run goroutine reaches it, it
        // clears the flag, calls syncPriority and closes done.
   253  type resumePickerUpdates struct {
        	// done is closed by run once the item has been handled;
        	// UpdateClientConnState blocks on it.
   254  	done chan struct{}
   255  }
   256  
   257  // run handles child update in a separate goroutine, so if the child sends
   258  // updates inline (when called by parent), it won't cause deadlocks (by trying
   259  // to hold the same mutex).
   260  func (b *priorityBalancer) run() {
   261  	for {
   262  		select {
   263  		case u, ok := <-b.childBalancerStateUpdate.Get():
   264  			if !ok {
   265  				return
   266  			}
   267  			b.childBalancerStateUpdate.Load()
   268  			// Needs to handle state update in a goroutine, because each state
   269  			// update needs to start/close child policy, could result in
   270  			// deadlock.
   271  			b.mu.Lock()
   272  			if b.done.HasFired() {
   273  				return
   274  			}
   275  			switch s := u.(type) {
   276  			case childBalancerState:
   277  				b.handleChildStateUpdate(s.name, s.s)
   278  			case resumePickerUpdates:
   279  				b.inhibitPickerUpdates = false
   280  				b.syncPriority(b.childInUse)
   281  				close(s.done)
   282  			}
   283  			b.mu.Unlock()
   284  		case <-b.done.Done():
   285  			return
   286  		}
   287  	}
   288  }
   289  

View as plain text