...

Source file src/google.golang.org/grpc/xds/internal/xdsclient/xdsresource/unmarshal_lds.go

Documentation: google.golang.org/grpc/xds/internal/xdsclient/xdsresource

     1  /*
     2   *
     3   * Copyright 2021 gRPC authors.
     4   *
     5   * Licensed under the Apache License, Version 2.0 (the "License");
     6   * you may not use this file except in compliance with the License.
     7   * You may obtain a copy of the License at
     8   *
     9   *     http://www.apache.org/licenses/LICENSE-2.0
    10   *
    11   * Unless required by applicable law or agreed to in writing, software
    12   * distributed under the License is distributed on an "AS IS" BASIS,
    13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    14   * See the License for the specific language governing permissions and
    15   * limitations under the License.
    16   */
    17  
    18  package xdsresource
    19  
    20  import (
    21  	"errors"
    22  	"fmt"
    23  	"strconv"
    24  
    25  	v1xdsudpatypepb "github.com/cncf/xds/go/udpa/type/v1"
    26  	v3xdsxdstypepb "github.com/cncf/xds/go/xds/type/v3"
    27  	v3listenerpb "github.com/envoyproxy/go-control-plane/envoy/config/listener/v3"
    28  	v3routepb "github.com/envoyproxy/go-control-plane/envoy/config/route/v3"
    29  	v3httppb "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/network/http_connection_manager/v3"
    30  	"google.golang.org/grpc/xds/internal/httpfilter"
    31  	"google.golang.org/protobuf/proto"
    32  	"google.golang.org/protobuf/types/known/anypb"
    33  )
    34  
    35  func unmarshalListenerResource(r *anypb.Any) (string, ListenerUpdate, error) {
    36  	r, err := UnwrapResource(r)
    37  	if err != nil {
    38  		return "", ListenerUpdate{}, fmt.Errorf("failed to unwrap resource: %v", err)
    39  	}
    40  
    41  	if !IsListenerResource(r.GetTypeUrl()) {
    42  		return "", ListenerUpdate{}, fmt.Errorf("unexpected resource type: %q ", r.GetTypeUrl())
    43  	}
    44  	lis := &v3listenerpb.Listener{}
    45  	if err := proto.Unmarshal(r.GetValue(), lis); err != nil {
    46  		return "", ListenerUpdate{}, fmt.Errorf("failed to unmarshal resource: %v", err)
    47  	}
    48  
    49  	lu, err := processListener(lis)
    50  	if err != nil {
    51  		return lis.GetName(), ListenerUpdate{}, err
    52  	}
    53  	lu.Raw = r
    54  	return lis.GetName(), *lu, nil
    55  }
    56  
    57  func processListener(lis *v3listenerpb.Listener) (*ListenerUpdate, error) {
    58  	if lis.GetApiListener() != nil {
    59  		return processClientSideListener(lis)
    60  	}
    61  	return processServerSideListener(lis)
    62  }
    63  
    64  // processClientSideListener checks if the provided Listener proto meets
    65  // the expected criteria. If so, it returns a non-empty routeConfigName.
    66  func processClientSideListener(lis *v3listenerpb.Listener) (*ListenerUpdate, error) {
    67  	update := &ListenerUpdate{}
    68  
    69  	apiLisAny := lis.GetApiListener().GetApiListener()
    70  	if !IsHTTPConnManagerResource(apiLisAny.GetTypeUrl()) {
    71  		return nil, fmt.Errorf("unexpected resource type: %q", apiLisAny.GetTypeUrl())
    72  	}
    73  	apiLis := &v3httppb.HttpConnectionManager{}
    74  	if err := proto.Unmarshal(apiLisAny.GetValue(), apiLis); err != nil {
    75  		return nil, fmt.Errorf("failed to unmarshal api_listener: %v", err)
    76  	}
    77  	// "HttpConnectionManager.xff_num_trusted_hops must be unset or zero and
    78  	// HttpConnectionManager.original_ip_detection_extensions must be empty. If
    79  	// either field has an incorrect value, the Listener must be NACKed." - A41
    80  	if apiLis.XffNumTrustedHops != 0 {
    81  		return nil, fmt.Errorf("xff_num_trusted_hops must be unset or zero %+v", apiLis)
    82  	}
    83  	if len(apiLis.OriginalIpDetectionExtensions) != 0 {
    84  		return nil, fmt.Errorf("original_ip_detection_extensions must be empty %+v", apiLis)
    85  	}
    86  
    87  	switch apiLis.RouteSpecifier.(type) {
    88  	case *v3httppb.HttpConnectionManager_Rds:
    89  		if configsource := apiLis.GetRds().GetConfigSource(); configsource.GetAds() == nil && configsource.GetSelf() == nil {
    90  			return nil, fmt.Errorf("LDS's RDS configSource is not ADS or Self: %+v", lis)
    91  		}
    92  		name := apiLis.GetRds().GetRouteConfigName()
    93  		if name == "" {
    94  			return nil, fmt.Errorf("empty route_config_name: %+v", lis)
    95  		}
    96  		update.RouteConfigName = name
    97  	case *v3httppb.HttpConnectionManager_RouteConfig:
    98  		routeU, err := generateRDSUpdateFromRouteConfiguration(apiLis.GetRouteConfig())
    99  		if err != nil {
   100  			return nil, fmt.Errorf("failed to parse inline RDS resp: %v", err)
   101  		}
   102  		update.InlineRouteConfig = &routeU
   103  	case nil:
   104  		return nil, fmt.Errorf("no RouteSpecifier: %+v", apiLis)
   105  	default:
   106  		return nil, fmt.Errorf("unsupported type %T for RouteSpecifier", apiLis.RouteSpecifier)
   107  	}
   108  
   109  	// The following checks and fields only apply to xDS protocol versions v3+.
   110  
   111  	update.MaxStreamDuration = apiLis.GetCommonHttpProtocolOptions().GetMaxStreamDuration().AsDuration()
   112  
   113  	var err error
   114  	if update.HTTPFilters, err = processHTTPFilters(apiLis.GetHttpFilters(), false); err != nil {
   115  		return nil, err
   116  	}
   117  
   118  	return update, nil
   119  }
   120  
   121  func unwrapHTTPFilterConfig(config *anypb.Any) (proto.Message, string, error) {
   122  	switch {
   123  	case config.MessageIs(&v3xdsxdstypepb.TypedStruct{}):
   124  		// The real type name is inside the new TypedStruct message.
   125  		s := new(v3xdsxdstypepb.TypedStruct)
   126  		if err := config.UnmarshalTo(s); err != nil {
   127  			return nil, "", fmt.Errorf("error unmarshalling TypedStruct filter config: %v", err)
   128  		}
   129  		return s, s.GetTypeUrl(), nil
   130  	case config.MessageIs(&v1xdsudpatypepb.TypedStruct{}):
   131  		// The real type name is inside the old TypedStruct message.
   132  		s := new(v1xdsudpatypepb.TypedStruct)
   133  		if err := config.UnmarshalTo(s); err != nil {
   134  			return nil, "", fmt.Errorf("error unmarshalling TypedStruct filter config: %v", err)
   135  		}
   136  		return s, s.GetTypeUrl(), nil
   137  	default:
   138  		return config, config.GetTypeUrl(), nil
   139  	}
   140  }
   141  
   142  func validateHTTPFilterConfig(cfg *anypb.Any, lds, optional bool) (httpfilter.Filter, httpfilter.FilterConfig, error) {
   143  	config, typeURL, err := unwrapHTTPFilterConfig(cfg)
   144  	if err != nil {
   145  		return nil, nil, err
   146  	}
   147  	filterBuilder := httpfilter.Get(typeURL)
   148  	if filterBuilder == nil {
   149  		if optional {
   150  			return nil, nil, nil
   151  		}
   152  		return nil, nil, fmt.Errorf("no filter implementation found for %q", typeURL)
   153  	}
   154  	parseFunc := filterBuilder.ParseFilterConfig
   155  	if !lds {
   156  		parseFunc = filterBuilder.ParseFilterConfigOverride
   157  	}
   158  	filterConfig, err := parseFunc(config)
   159  	if err != nil {
   160  		return nil, nil, fmt.Errorf("error parsing config for filter %q: %v", typeURL, err)
   161  	}
   162  	return filterBuilder, filterConfig, nil
   163  }
   164  
   165  func processHTTPFilterOverrides(cfgs map[string]*anypb.Any) (map[string]httpfilter.FilterConfig, error) {
   166  	if len(cfgs) == 0 {
   167  		return nil, nil
   168  	}
   169  	m := make(map[string]httpfilter.FilterConfig)
   170  	for name, cfg := range cfgs {
   171  		optional := false
   172  		s := new(v3routepb.FilterConfig)
   173  		if cfg.MessageIs(s) {
   174  			if err := cfg.UnmarshalTo(s); err != nil {
   175  				return nil, fmt.Errorf("filter override %q: error unmarshalling FilterConfig: %v", name, err)
   176  			}
   177  			cfg = s.GetConfig()
   178  			optional = s.GetIsOptional()
   179  		}
   180  
   181  		httpFilter, config, err := validateHTTPFilterConfig(cfg, false, optional)
   182  		if err != nil {
   183  			return nil, fmt.Errorf("filter override %q: %v", name, err)
   184  		}
   185  		if httpFilter == nil {
   186  			// Optional configs are ignored.
   187  			continue
   188  		}
   189  		m[name] = config
   190  	}
   191  	return m, nil
   192  }
   193  
   194  func processHTTPFilters(filters []*v3httppb.HttpFilter, server bool) ([]HTTPFilter, error) {
   195  	ret := make([]HTTPFilter, 0, len(filters))
   196  	seenNames := make(map[string]bool, len(filters))
   197  	for _, filter := range filters {
   198  		name := filter.GetName()
   199  		if name == "" {
   200  			return nil, errors.New("filter missing name field")
   201  		}
   202  		if seenNames[name] {
   203  			return nil, fmt.Errorf("duplicate filter name %q", name)
   204  		}
   205  		seenNames[name] = true
   206  
   207  		httpFilter, config, err := validateHTTPFilterConfig(filter.GetTypedConfig(), true, filter.GetIsOptional())
   208  		if err != nil {
   209  			return nil, err
   210  		}
   211  		if httpFilter == nil {
   212  			// Optional configs are ignored.
   213  			continue
   214  		}
   215  		if server {
   216  			if _, ok := httpFilter.(httpfilter.ServerInterceptorBuilder); !ok {
   217  				if filter.GetIsOptional() {
   218  					continue
   219  				}
   220  				return nil, fmt.Errorf("HTTP filter %q not supported server-side", name)
   221  			}
   222  		} else if _, ok := httpFilter.(httpfilter.ClientInterceptorBuilder); !ok {
   223  			if filter.GetIsOptional() {
   224  				continue
   225  			}
   226  			return nil, fmt.Errorf("HTTP filter %q not supported client-side", name)
   227  		}
   228  
   229  		// Save name/config
   230  		ret = append(ret, HTTPFilter{Name: name, Filter: httpFilter, Config: config})
   231  	}
   232  	// "Validation will fail if a terminal filter is not the last filter in the
   233  	// chain or if a non-terminal filter is the last filter in the chain." - A39
   234  	if len(ret) == 0 {
   235  		return nil, fmt.Errorf("http filters list is empty")
   236  	}
   237  	var i int
   238  	for ; i < len(ret)-1; i++ {
   239  		if ret[i].Filter.IsTerminal() {
   240  			return nil, fmt.Errorf("http filter %q is a terminal filter but it is not last in the filter chain", ret[i].Name)
   241  		}
   242  	}
   243  	if !ret[i].Filter.IsTerminal() {
   244  		return nil, fmt.Errorf("http filter %q is not a terminal filter", ret[len(ret)-1].Name)
   245  	}
   246  	return ret, nil
   247  }
   248  
   249  func processServerSideListener(lis *v3listenerpb.Listener) (*ListenerUpdate, error) {
   250  	if n := len(lis.ListenerFilters); n != 0 {
   251  		return nil, fmt.Errorf("unsupported field 'listener_filters' contains %d entries", n)
   252  	}
   253  	if useOrigDst := lis.GetUseOriginalDst(); useOrigDst != nil && useOrigDst.GetValue() {
   254  		return nil, errors.New("unsupported field 'use_original_dst' is present and set to true")
   255  	}
   256  	addr := lis.GetAddress()
   257  	if addr == nil {
   258  		return nil, fmt.Errorf("no address field in LDS response: %+v", lis)
   259  	}
   260  	sockAddr := addr.GetSocketAddress()
   261  	if sockAddr == nil {
   262  		return nil, fmt.Errorf("no socket_address field in LDS response: %+v", lis)
   263  	}
   264  	lu := &ListenerUpdate{
   265  		InboundListenerCfg: &InboundListenerConfig{
   266  			Address: sockAddr.GetAddress(),
   267  			Port:    strconv.Itoa(int(sockAddr.GetPortValue())),
   268  		},
   269  	}
   270  
   271  	fcMgr, err := NewFilterChainManager(lis)
   272  	if err != nil {
   273  		return nil, err
   274  	}
   275  	lu.InboundListenerCfg.FilterChains = fcMgr
   276  	return lu, nil
   277  }
   278  

View as plain text