...

Source file src/google.golang.org/grpc/xds/internal/balancer/clusterresolver/configbuilder.go

Documentation: google.golang.org/grpc/xds/internal/balancer/clusterresolver

     1  /*
     2   *
     3   * Copyright 2021 gRPC authors.
     4   *
     5   * Licensed under the Apache License, Version 2.0 (the "License");
     6   * you may not use this file except in compliance with the License.
     7   * You may obtain a copy of the License at
     8   *
     9   *     http://www.apache.org/licenses/LICENSE-2.0
    10   *
    11   * Unless required by applicable law or agreed to in writing, software
    12   * distributed under the License is distributed on an "AS IS" BASIS,
    13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    14   * See the License for the specific language governing permissions and
    15   * limitations under the License.
    16   *
    17   */
    18  
    19  package clusterresolver
    20  
    21  import (
    22  	"encoding/json"
    23  	"fmt"
    24  	"sort"
    25  
    26  	"google.golang.org/grpc/balancer/weightedroundrobin"
    27  	"google.golang.org/grpc/internal/hierarchy"
    28  	internalserviceconfig "google.golang.org/grpc/internal/serviceconfig"
    29  	"google.golang.org/grpc/resolver"
    30  	"google.golang.org/grpc/xds/internal"
    31  	"google.golang.org/grpc/xds/internal/balancer/clusterimpl"
    32  	"google.golang.org/grpc/xds/internal/balancer/outlierdetection"
    33  	"google.golang.org/grpc/xds/internal/balancer/priority"
    34  	"google.golang.org/grpc/xds/internal/balancer/wrrlocality"
    35  	"google.golang.org/grpc/xds/internal/xdsclient/xdsresource"
    36  )
    37  
// million is the scale drop rates are normalized to: each EDS drop fraction
// (numerator/denominator) is converted into requests-per-million.
const million = 1000000
    39  
// priorityConfig is config for one priority. For example, if there is an EDS
// and a DNS, the priority list will be
// [priorityConfig{EDS}, priorityConfig{DNS}].
//
// Each priorityConfig corresponds to one discovery mechanism from the LBConfig
// generated by the CDS balancer. The CDS balancer resolves the cluster name to
// an ordered list of discovery mechanisms (if the top cluster is an aggregated
// cluster), one for each underlying cluster.
type priorityConfig struct {
	// mechanism is the discovery mechanism this entry was built from. Its Type
	// decides whether edsResp or addresses is consulted when building config.
	mechanism DiscoveryMechanism
	// edsResp is set only if type is EDS.
	edsResp xdsresource.EndpointsUpdate
	// addresses is set only if type is DNS.
	addresses []string
	// Each discovery mechanism has a name generator so that the child policies
	// can reuse names between updates (EDS updates for example).
	childNameGen *nameGenerator
}
    57  
    58  // buildPriorityConfigJSON builds balancer config for the passed in
    59  // priorities.
    60  //
    61  // The built tree of balancers (see test for the output struct).
    62  //
    63  //	          ┌────────┐
    64  //	          │priority│
    65  //	          └┬──────┬┘
    66  //	           │      │
    67  //	┌──────────▼─┐  ┌─▼──────────┐
    68  //	│cluster_impl│  │cluster_impl│
    69  //	└──────┬─────┘  └─────┬──────┘
    70  //	       │              │
    71  //	┌──────▼─────┐  ┌─────▼──────┐
    72  //	│xDSLBPolicy │  │xDSLBPolicy │ (Locality and Endpoint picking layer)
    73  //	└────────────┘  └────────────┘
    74  func buildPriorityConfigJSON(priorities []priorityConfig, xdsLBPolicy *internalserviceconfig.BalancerConfig) ([]byte, []resolver.Address, error) {
    75  	pc, addrs, err := buildPriorityConfig(priorities, xdsLBPolicy)
    76  	if err != nil {
    77  		return nil, nil, fmt.Errorf("failed to build priority config: %v", err)
    78  	}
    79  	ret, err := json.Marshal(pc)
    80  	if err != nil {
    81  		return nil, nil, fmt.Errorf("failed to marshal built priority config struct into json: %v", err)
    82  	}
    83  	return ret, addrs, nil
    84  }
    85  
    86  func buildPriorityConfig(priorities []priorityConfig, xdsLBPolicy *internalserviceconfig.BalancerConfig) (*priority.LBConfig, []resolver.Address, error) {
    87  	var (
    88  		retConfig = &priority.LBConfig{Children: make(map[string]*priority.Child)}
    89  		retAddrs  []resolver.Address
    90  	)
    91  	for _, p := range priorities {
    92  		switch p.mechanism.Type {
    93  		case DiscoveryMechanismTypeEDS:
    94  			names, configs, addrs, err := buildClusterImplConfigForEDS(p.childNameGen, p.edsResp, p.mechanism, xdsLBPolicy)
    95  			if err != nil {
    96  				return nil, nil, err
    97  			}
    98  			retConfig.Priorities = append(retConfig.Priorities, names...)
    99  			retAddrs = append(retAddrs, addrs...)
   100  			odCfgs := convertClusterImplMapToOutlierDetection(configs, p.mechanism.outlierDetection)
   101  			for n, c := range odCfgs {
   102  				retConfig.Children[n] = &priority.Child{
   103  					Config: &internalserviceconfig.BalancerConfig{Name: outlierdetection.Name, Config: c},
   104  					// Ignore all re-resolution from EDS children.
   105  					IgnoreReresolutionRequests: true,
   106  				}
   107  			}
   108  			continue
   109  		case DiscoveryMechanismTypeLogicalDNS:
   110  			name, config, addrs := buildClusterImplConfigForDNS(p.childNameGen, p.addresses, p.mechanism)
   111  			retConfig.Priorities = append(retConfig.Priorities, name)
   112  			retAddrs = append(retAddrs, addrs...)
   113  			odCfg := makeClusterImplOutlierDetectionChild(config, p.mechanism.outlierDetection)
   114  			retConfig.Children[name] = &priority.Child{
   115  				Config: &internalserviceconfig.BalancerConfig{Name: outlierdetection.Name, Config: odCfg},
   116  				// Not ignore re-resolution from DNS children, they will trigger
   117  				// DNS to re-resolve.
   118  				IgnoreReresolutionRequests: false,
   119  			}
   120  			continue
   121  		}
   122  	}
   123  	return retConfig, retAddrs, nil
   124  }
   125  
   126  func convertClusterImplMapToOutlierDetection(ciCfgs map[string]*clusterimpl.LBConfig, odCfg outlierdetection.LBConfig) map[string]*outlierdetection.LBConfig {
   127  	odCfgs := make(map[string]*outlierdetection.LBConfig, len(ciCfgs))
   128  	for n, c := range ciCfgs {
   129  		odCfgs[n] = makeClusterImplOutlierDetectionChild(c, odCfg)
   130  	}
   131  	return odCfgs
   132  }
   133  
   134  func makeClusterImplOutlierDetectionChild(ciCfg *clusterimpl.LBConfig, odCfg outlierdetection.LBConfig) *outlierdetection.LBConfig {
   135  	odCfgRet := odCfg
   136  	odCfgRet.ChildPolicy = &internalserviceconfig.BalancerConfig{Name: clusterimpl.Name, Config: ciCfg}
   137  	return &odCfgRet
   138  }
   139  
   140  func buildClusterImplConfigForDNS(g *nameGenerator, addrStrs []string, mechanism DiscoveryMechanism) (string, *clusterimpl.LBConfig, []resolver.Address) {
   141  	// Endpoint picking policy for DNS is hardcoded to pick_first.
   142  	const childPolicy = "pick_first"
   143  	retAddrs := make([]resolver.Address, 0, len(addrStrs))
   144  	pName := fmt.Sprintf("priority-%v", g.prefix)
   145  	for _, addrStr := range addrStrs {
   146  		retAddrs = append(retAddrs, hierarchy.Set(resolver.Address{Addr: addrStr}, []string{pName}))
   147  	}
   148  	return pName, &clusterimpl.LBConfig{
   149  		Cluster:         mechanism.Cluster,
   150  		TelemetryLabels: mechanism.TelemetryLabels,
   151  		ChildPolicy:     &internalserviceconfig.BalancerConfig{Name: childPolicy},
   152  	}, retAddrs
   153  }
   154  
   155  // buildClusterImplConfigForEDS returns a list of cluster_impl configs, one for
   156  // each priority, sorted by priority, and the addresses for each priority (with
   157  // hierarchy attributes set).
   158  //
   159  // For example, if there are two priorities, the returned values will be
   160  // - ["p0", "p1"]
   161  // - map{"p0":p0_config, "p1":p1_config}
   162  // - [p0_address_0, p0_address_1, p1_address_0, p1_address_1]
   163  //   - p0 addresses' hierarchy attributes are set to p0
   164  func buildClusterImplConfigForEDS(g *nameGenerator, edsResp xdsresource.EndpointsUpdate, mechanism DiscoveryMechanism, xdsLBPolicy *internalserviceconfig.BalancerConfig) ([]string, map[string]*clusterimpl.LBConfig, []resolver.Address, error) {
   165  	drops := make([]clusterimpl.DropConfig, 0, len(edsResp.Drops))
   166  	for _, d := range edsResp.Drops {
   167  		drops = append(drops, clusterimpl.DropConfig{
   168  			Category:           d.Category,
   169  			RequestsPerMillion: d.Numerator * million / d.Denominator,
   170  		})
   171  	}
   172  
   173  	// Localities of length 0 is triggered by an NACK or resource-not-found
   174  	// error before update, or a empty localities list in a update. In either
   175  	// case want to create a priority, and send down empty address list, causing
   176  	// TF for that priority. "If any discovery mechanism instance experiences an
   177  	// error retrieving data, and it has not previously reported any results, it
   178  	// should report a result that is a single priority with no endpoints." -
   179  	// A37
   180  	priorities := [][]xdsresource.Locality{{}}
   181  	if len(edsResp.Localities) != 0 {
   182  		priorities = groupLocalitiesByPriority(edsResp.Localities)
   183  	}
   184  	retNames := g.generate(priorities)
   185  	retConfigs := make(map[string]*clusterimpl.LBConfig, len(retNames))
   186  	var retAddrs []resolver.Address
   187  	for i, pName := range retNames {
   188  		priorityLocalities := priorities[i]
   189  		cfg, addrs, err := priorityLocalitiesToClusterImpl(priorityLocalities, pName, mechanism, drops, xdsLBPolicy)
   190  		if err != nil {
   191  			return nil, nil, nil, err
   192  		}
   193  		retConfigs[pName] = cfg
   194  		retAddrs = append(retAddrs, addrs...)
   195  	}
   196  	return retNames, retConfigs, retAddrs, nil
   197  }
   198  
   199  // groupLocalitiesByPriority returns the localities grouped by priority.
   200  //
   201  // The returned list is sorted from higher priority to lower. Each item in the
   202  // list is a group of localities.
   203  //
   204  // For example, for L0-p0, L1-p0, L2-p1, results will be
   205  // - [[L0, L1], [L2]]
   206  func groupLocalitiesByPriority(localities []xdsresource.Locality) [][]xdsresource.Locality {
   207  	var priorityIntSlice []int
   208  	priorities := make(map[int][]xdsresource.Locality)
   209  	for _, locality := range localities {
   210  		priority := int(locality.Priority)
   211  		priorities[priority] = append(priorities[priority], locality)
   212  		priorityIntSlice = append(priorityIntSlice, priority)
   213  	}
   214  	// Sort the priorities based on the int value, deduplicate, and then turn
   215  	// the sorted list into a string list. This will be child names, in priority
   216  	// order.
   217  	sort.Ints(priorityIntSlice)
   218  	priorityIntSliceDeduped := dedupSortedIntSlice(priorityIntSlice)
   219  	ret := make([][]xdsresource.Locality, 0, len(priorityIntSliceDeduped))
   220  	for _, p := range priorityIntSliceDeduped {
   221  		ret = append(ret, priorities[p])
   222  	}
   223  	return ret
   224  }
   225  
   226  func dedupSortedIntSlice(a []int) []int {
   227  	if len(a) == 0 {
   228  		return a
   229  	}
   230  	i, j := 0, 1
   231  	for ; j < len(a); j++ {
   232  		if a[i] == a[j] {
   233  			continue
   234  		}
   235  		i++
   236  		if i != j {
   237  			a[i] = a[j]
   238  		}
   239  	}
   240  	return a[:i+1]
   241  }
   242  
   243  // priorityLocalitiesToClusterImpl takes a list of localities (with the same
   244  // priority), and generates a cluster impl policy config, and a list of
   245  // addresses with their path hierarchy set to [priority-name, locality-name], so
   246  // priority and the xDS LB Policy know which child policy each address is for.
   247  func priorityLocalitiesToClusterImpl(localities []xdsresource.Locality, priorityName string, mechanism DiscoveryMechanism, drops []clusterimpl.DropConfig, xdsLBPolicy *internalserviceconfig.BalancerConfig) (*clusterimpl.LBConfig, []resolver.Address, error) {
   248  	var addrs []resolver.Address
   249  	for _, locality := range localities {
   250  		var lw uint32 = 1
   251  		if locality.Weight != 0 {
   252  			lw = locality.Weight
   253  		}
   254  		localityStr, err := locality.ID.ToString()
   255  		if err != nil {
   256  			localityStr = fmt.Sprintf("%+v", locality.ID)
   257  		}
   258  		for _, endpoint := range locality.Endpoints {
   259  			// Filter out all "unhealthy" endpoints (unknown and healthy are
   260  			// both considered to be healthy:
   261  			// https://www.envoyproxy.io/docs/envoy/latest/api-v2/api/v2/core/health_check.proto#envoy-api-enum-core-healthstatus).
   262  			if endpoint.HealthStatus != xdsresource.EndpointHealthStatusHealthy && endpoint.HealthStatus != xdsresource.EndpointHealthStatusUnknown {
   263  				continue
   264  			}
   265  			addr := resolver.Address{Addr: endpoint.Address}
   266  			addr = hierarchy.Set(addr, []string{priorityName, localityStr})
   267  			addr = internal.SetLocalityID(addr, locality.ID)
   268  			// "To provide the xds_wrr_locality load balancer information about
   269  			// locality weights received from EDS, the cluster resolver will
   270  			// populate a new locality weight attribute for each address The
   271  			// attribute will have the weight (as an integer) of the locality
   272  			// the address is part of." - A52
   273  			addr = wrrlocality.SetAddrInfo(addr, wrrlocality.AddrInfo{LocalityWeight: lw})
   274  			var ew uint32 = 1
   275  			if endpoint.Weight != 0 {
   276  				ew = endpoint.Weight
   277  			}
   278  			addr = weightedroundrobin.SetAddrInfo(addr, weightedroundrobin.AddrInfo{Weight: lw * ew})
   279  			addrs = append(addrs, addr)
   280  		}
   281  	}
   282  	return &clusterimpl.LBConfig{
   283  		Cluster:               mechanism.Cluster,
   284  		EDSServiceName:        mechanism.EDSServiceName,
   285  		LoadReportingServer:   mechanism.LoadReportingServer,
   286  		MaxConcurrentRequests: mechanism.MaxConcurrentRequests,
   287  		TelemetryLabels:       mechanism.TelemetryLabels,
   288  		DropCategories:        drops,
   289  		ChildPolicy:           xdsLBPolicy,
   290  	}, addrs, nil
   291  }
   292  

View as plain text