...

Source file src/google.golang.org/grpc/resolver_wrapper.go

Documentation: google.golang.org/grpc

     1  /*
     2   *
     3   * Copyright 2017 gRPC authors.
     4   *
     5   * Licensed under the Apache License, Version 2.0 (the "License");
     6   * you may not use this file except in compliance with the License.
     7   * You may obtain a copy of the License at
     8   *
     9   *     http://www.apache.org/licenses/LICENSE-2.0
    10   *
    11   * Unless required by applicable law or agreed to in writing, software
    12   * distributed under the License is distributed on an "AS IS" BASIS,
    13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    14   * See the License for the specific language governing permissions and
    15   * limitations under the License.
    16   *
    17   */
    18  
    19  package grpc
    20  
    21  import (
    22  	"context"
    23  	"strings"
    24  	"sync"
    25  
    26  	"google.golang.org/grpc/internal/channelz"
    27  	"google.golang.org/grpc/internal/grpcsync"
    28  	"google.golang.org/grpc/internal/pretty"
    29  	"google.golang.org/grpc/resolver"
    30  	"google.golang.org/grpc/serviceconfig"
    31  )
    32  
    33  // ccResolverWrapper is a wrapper on top of cc for resolvers.
    34  // It implements resolver.ClientConn interface.
    35  type ccResolverWrapper struct {
    36  	// The following fields are initialized when the wrapper is created and are
    37  	// read-only afterwards, and therefore can be accessed without a mutex.
    38  	cc                  *ClientConn
    39  	ignoreServiceConfig bool
    40  	serializer          *grpcsync.CallbackSerializer
    41  	serializerCancel    context.CancelFunc
    42  
    43  	resolver resolver.Resolver // only accessed within the serializer
    44  
    45  	// The following fields are protected by mu.  Caller must take cc.mu before
    46  	// taking mu.
    47  	mu       sync.Mutex
    48  	curState resolver.State
    49  	closed   bool
    50  }
    51  
    52  // newCCResolverWrapper initializes the ccResolverWrapper.  It can only be used
    53  // after calling start, which builds the resolver.
    54  func newCCResolverWrapper(cc *ClientConn) *ccResolverWrapper {
    55  	ctx, cancel := context.WithCancel(cc.ctx)
    56  	return &ccResolverWrapper{
    57  		cc:                  cc,
    58  		ignoreServiceConfig: cc.dopts.disableServiceConfig,
    59  		serializer:          grpcsync.NewCallbackSerializer(ctx),
    60  		serializerCancel:    cancel,
    61  	}
    62  }
    63  
    64  // start builds the name resolver using the resolver.Builder in cc and returns
    65  // any error encountered.  It must always be the first operation performed on
    66  // any newly created ccResolverWrapper, except that close may be called instead.
    67  func (ccr *ccResolverWrapper) start() error {
    68  	errCh := make(chan error)
    69  	ccr.serializer.Schedule(func(ctx context.Context) {
    70  		if ctx.Err() != nil {
    71  			return
    72  		}
    73  		opts := resolver.BuildOptions{
    74  			DisableServiceConfig: ccr.cc.dopts.disableServiceConfig,
    75  			DialCreds:            ccr.cc.dopts.copts.TransportCredentials,
    76  			CredsBundle:          ccr.cc.dopts.copts.CredsBundle,
    77  			Dialer:               ccr.cc.dopts.copts.Dialer,
    78  			Authority:            ccr.cc.authority,
    79  		}
    80  		var err error
    81  		ccr.resolver, err = ccr.cc.resolverBuilder.Build(ccr.cc.parsedTarget, ccr, opts)
    82  		errCh <- err
    83  	})
    84  	return <-errCh
    85  }
    86  
    87  func (ccr *ccResolverWrapper) resolveNow(o resolver.ResolveNowOptions) {
    88  	ccr.serializer.Schedule(func(ctx context.Context) {
    89  		if ctx.Err() != nil || ccr.resolver == nil {
    90  			return
    91  		}
    92  		ccr.resolver.ResolveNow(o)
    93  	})
    94  }
    95  
    96  // close initiates async shutdown of the wrapper.  To determine the wrapper has
    97  // finished shutting down, the channel should block on ccr.serializer.Done()
    98  // without cc.mu held.
    99  func (ccr *ccResolverWrapper) close() {
   100  	channelz.Info(logger, ccr.cc.channelz, "Closing the name resolver")
   101  	ccr.mu.Lock()
   102  	ccr.closed = true
   103  	ccr.mu.Unlock()
   104  
   105  	ccr.serializer.Schedule(func(context.Context) {
   106  		if ccr.resolver == nil {
   107  			return
   108  		}
   109  		ccr.resolver.Close()
   110  		ccr.resolver = nil
   111  	})
   112  	ccr.serializerCancel()
   113  }
   114  
   115  // UpdateState is called by resolver implementations to report new state to gRPC
   116  // which includes addresses and service config.
   117  func (ccr *ccResolverWrapper) UpdateState(s resolver.State) error {
   118  	ccr.cc.mu.Lock()
   119  	ccr.mu.Lock()
   120  	if ccr.closed {
   121  		ccr.mu.Unlock()
   122  		ccr.cc.mu.Unlock()
   123  		return nil
   124  	}
   125  	if s.Endpoints == nil {
   126  		s.Endpoints = make([]resolver.Endpoint, 0, len(s.Addresses))
   127  		for _, a := range s.Addresses {
   128  			ep := resolver.Endpoint{Addresses: []resolver.Address{a}, Attributes: a.BalancerAttributes}
   129  			ep.Addresses[0].BalancerAttributes = nil
   130  			s.Endpoints = append(s.Endpoints, ep)
   131  		}
   132  	}
   133  	ccr.addChannelzTraceEvent(s)
   134  	ccr.curState = s
   135  	ccr.mu.Unlock()
   136  	return ccr.cc.updateResolverStateAndUnlock(s, nil)
   137  }
   138  
   139  // ReportError is called by resolver implementations to report errors
   140  // encountered during name resolution to gRPC.
   141  func (ccr *ccResolverWrapper) ReportError(err error) {
   142  	ccr.cc.mu.Lock()
   143  	ccr.mu.Lock()
   144  	if ccr.closed {
   145  		ccr.mu.Unlock()
   146  		ccr.cc.mu.Unlock()
   147  		return
   148  	}
   149  	ccr.mu.Unlock()
   150  	channelz.Warningf(logger, ccr.cc.channelz, "ccResolverWrapper: reporting error to cc: %v", err)
   151  	ccr.cc.updateResolverStateAndUnlock(resolver.State{}, err)
   152  }
   153  
   154  // NewAddress is called by the resolver implementation to send addresses to
   155  // gRPC.
   156  func (ccr *ccResolverWrapper) NewAddress(addrs []resolver.Address) {
   157  	ccr.cc.mu.Lock()
   158  	ccr.mu.Lock()
   159  	if ccr.closed {
   160  		ccr.mu.Unlock()
   161  		ccr.cc.mu.Unlock()
   162  		return
   163  	}
   164  	s := resolver.State{Addresses: addrs, ServiceConfig: ccr.curState.ServiceConfig}
   165  	ccr.addChannelzTraceEvent(s)
   166  	ccr.curState = s
   167  	ccr.mu.Unlock()
   168  	ccr.cc.updateResolverStateAndUnlock(s, nil)
   169  }
   170  
   171  // ParseServiceConfig is called by resolver implementations to parse a JSON
   172  // representation of the service config.
   173  func (ccr *ccResolverWrapper) ParseServiceConfig(scJSON string) *serviceconfig.ParseResult {
   174  	return parseServiceConfig(scJSON)
   175  }
   176  
   177  // addChannelzTraceEvent adds a channelz trace event containing the new
   178  // state received from resolver implementations.
   179  func (ccr *ccResolverWrapper) addChannelzTraceEvent(s resolver.State) {
   180  	var updates []string
   181  	var oldSC, newSC *ServiceConfig
   182  	var oldOK, newOK bool
   183  	if ccr.curState.ServiceConfig != nil {
   184  		oldSC, oldOK = ccr.curState.ServiceConfig.Config.(*ServiceConfig)
   185  	}
   186  	if s.ServiceConfig != nil {
   187  		newSC, newOK = s.ServiceConfig.Config.(*ServiceConfig)
   188  	}
   189  	if oldOK != newOK || (oldOK && newOK && oldSC.rawJSONString != newSC.rawJSONString) {
   190  		updates = append(updates, "service config updated")
   191  	}
   192  	if len(ccr.curState.Addresses) > 0 && len(s.Addresses) == 0 {
   193  		updates = append(updates, "resolver returned an empty address list")
   194  	} else if len(ccr.curState.Addresses) == 0 && len(s.Addresses) > 0 {
   195  		updates = append(updates, "resolver returned new addresses")
   196  	}
   197  	channelz.Infof(logger, ccr.cc.channelz, "Resolver state updated: %s (%v)", pretty.ToJSON(s), strings.Join(updates, "; "))
   198  }
   199  

View as plain text