1
17
18 package xdsresource
19
20 import (
21 "errors"
22 "fmt"
23 "strconv"
24
25 v1xdsudpatypepb "github.com/cncf/xds/go/udpa/type/v1"
26 v3xdsxdstypepb "github.com/cncf/xds/go/xds/type/v3"
27 v3listenerpb "github.com/envoyproxy/go-control-plane/envoy/config/listener/v3"
28 v3routepb "github.com/envoyproxy/go-control-plane/envoy/config/route/v3"
29 v3httppb "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/network/http_connection_manager/v3"
30 "google.golang.org/grpc/xds/internal/httpfilter"
31 "google.golang.org/protobuf/proto"
32 "google.golang.org/protobuf/types/known/anypb"
33 )
34
35 func unmarshalListenerResource(r *anypb.Any) (string, ListenerUpdate, error) {
36 r, err := UnwrapResource(r)
37 if err != nil {
38 return "", ListenerUpdate{}, fmt.Errorf("failed to unwrap resource: %v", err)
39 }
40
41 if !IsListenerResource(r.GetTypeUrl()) {
42 return "", ListenerUpdate{}, fmt.Errorf("unexpected resource type: %q ", r.GetTypeUrl())
43 }
44 lis := &v3listenerpb.Listener{}
45 if err := proto.Unmarshal(r.GetValue(), lis); err != nil {
46 return "", ListenerUpdate{}, fmt.Errorf("failed to unmarshal resource: %v", err)
47 }
48
49 lu, err := processListener(lis)
50 if err != nil {
51 return lis.GetName(), ListenerUpdate{}, err
52 }
53 lu.Raw = r
54 return lis.GetName(), *lu, nil
55 }
56
57 func processListener(lis *v3listenerpb.Listener) (*ListenerUpdate, error) {
58 if lis.GetApiListener() != nil {
59 return processClientSideListener(lis)
60 }
61 return processServerSideListener(lis)
62 }
63
64
65
66 func processClientSideListener(lis *v3listenerpb.Listener) (*ListenerUpdate, error) {
67 update := &ListenerUpdate{}
68
69 apiLisAny := lis.GetApiListener().GetApiListener()
70 if !IsHTTPConnManagerResource(apiLisAny.GetTypeUrl()) {
71 return nil, fmt.Errorf("unexpected resource type: %q", apiLisAny.GetTypeUrl())
72 }
73 apiLis := &v3httppb.HttpConnectionManager{}
74 if err := proto.Unmarshal(apiLisAny.GetValue(), apiLis); err != nil {
75 return nil, fmt.Errorf("failed to unmarshal api_listener: %v", err)
76 }
77
78
79
80 if apiLis.XffNumTrustedHops != 0 {
81 return nil, fmt.Errorf("xff_num_trusted_hops must be unset or zero %+v", apiLis)
82 }
83 if len(apiLis.OriginalIpDetectionExtensions) != 0 {
84 return nil, fmt.Errorf("original_ip_detection_extensions must be empty %+v", apiLis)
85 }
86
87 switch apiLis.RouteSpecifier.(type) {
88 case *v3httppb.HttpConnectionManager_Rds:
89 if configsource := apiLis.GetRds().GetConfigSource(); configsource.GetAds() == nil && configsource.GetSelf() == nil {
90 return nil, fmt.Errorf("LDS's RDS configSource is not ADS or Self: %+v", lis)
91 }
92 name := apiLis.GetRds().GetRouteConfigName()
93 if name == "" {
94 return nil, fmt.Errorf("empty route_config_name: %+v", lis)
95 }
96 update.RouteConfigName = name
97 case *v3httppb.HttpConnectionManager_RouteConfig:
98 routeU, err := generateRDSUpdateFromRouteConfiguration(apiLis.GetRouteConfig())
99 if err != nil {
100 return nil, fmt.Errorf("failed to parse inline RDS resp: %v", err)
101 }
102 update.InlineRouteConfig = &routeU
103 case nil:
104 return nil, fmt.Errorf("no RouteSpecifier: %+v", apiLis)
105 default:
106 return nil, fmt.Errorf("unsupported type %T for RouteSpecifier", apiLis.RouteSpecifier)
107 }
108
109
110
111 update.MaxStreamDuration = apiLis.GetCommonHttpProtocolOptions().GetMaxStreamDuration().AsDuration()
112
113 var err error
114 if update.HTTPFilters, err = processHTTPFilters(apiLis.GetHttpFilters(), false); err != nil {
115 return nil, err
116 }
117
118 return update, nil
119 }
120
121 func unwrapHTTPFilterConfig(config *anypb.Any) (proto.Message, string, error) {
122 switch {
123 case config.MessageIs(&v3xdsxdstypepb.TypedStruct{}):
124
125 s := new(v3xdsxdstypepb.TypedStruct)
126 if err := config.UnmarshalTo(s); err != nil {
127 return nil, "", fmt.Errorf("error unmarshalling TypedStruct filter config: %v", err)
128 }
129 return s, s.GetTypeUrl(), nil
130 case config.MessageIs(&v1xdsudpatypepb.TypedStruct{}):
131
132 s := new(v1xdsudpatypepb.TypedStruct)
133 if err := config.UnmarshalTo(s); err != nil {
134 return nil, "", fmt.Errorf("error unmarshalling TypedStruct filter config: %v", err)
135 }
136 return s, s.GetTypeUrl(), nil
137 default:
138 return config, config.GetTypeUrl(), nil
139 }
140 }
141
142 func validateHTTPFilterConfig(cfg *anypb.Any, lds, optional bool) (httpfilter.Filter, httpfilter.FilterConfig, error) {
143 config, typeURL, err := unwrapHTTPFilterConfig(cfg)
144 if err != nil {
145 return nil, nil, err
146 }
147 filterBuilder := httpfilter.Get(typeURL)
148 if filterBuilder == nil {
149 if optional {
150 return nil, nil, nil
151 }
152 return nil, nil, fmt.Errorf("no filter implementation found for %q", typeURL)
153 }
154 parseFunc := filterBuilder.ParseFilterConfig
155 if !lds {
156 parseFunc = filterBuilder.ParseFilterConfigOverride
157 }
158 filterConfig, err := parseFunc(config)
159 if err != nil {
160 return nil, nil, fmt.Errorf("error parsing config for filter %q: %v", typeURL, err)
161 }
162 return filterBuilder, filterConfig, nil
163 }
164
165 func processHTTPFilterOverrides(cfgs map[string]*anypb.Any) (map[string]httpfilter.FilterConfig, error) {
166 if len(cfgs) == 0 {
167 return nil, nil
168 }
169 m := make(map[string]httpfilter.FilterConfig)
170 for name, cfg := range cfgs {
171 optional := false
172 s := new(v3routepb.FilterConfig)
173 if cfg.MessageIs(s) {
174 if err := cfg.UnmarshalTo(s); err != nil {
175 return nil, fmt.Errorf("filter override %q: error unmarshalling FilterConfig: %v", name, err)
176 }
177 cfg = s.GetConfig()
178 optional = s.GetIsOptional()
179 }
180
181 httpFilter, config, err := validateHTTPFilterConfig(cfg, false, optional)
182 if err != nil {
183 return nil, fmt.Errorf("filter override %q: %v", name, err)
184 }
185 if httpFilter == nil {
186
187 continue
188 }
189 m[name] = config
190 }
191 return m, nil
192 }
193
194 func processHTTPFilters(filters []*v3httppb.HttpFilter, server bool) ([]HTTPFilter, error) {
195 ret := make([]HTTPFilter, 0, len(filters))
196 seenNames := make(map[string]bool, len(filters))
197 for _, filter := range filters {
198 name := filter.GetName()
199 if name == "" {
200 return nil, errors.New("filter missing name field")
201 }
202 if seenNames[name] {
203 return nil, fmt.Errorf("duplicate filter name %q", name)
204 }
205 seenNames[name] = true
206
207 httpFilter, config, err := validateHTTPFilterConfig(filter.GetTypedConfig(), true, filter.GetIsOptional())
208 if err != nil {
209 return nil, err
210 }
211 if httpFilter == nil {
212
213 continue
214 }
215 if server {
216 if _, ok := httpFilter.(httpfilter.ServerInterceptorBuilder); !ok {
217 if filter.GetIsOptional() {
218 continue
219 }
220 return nil, fmt.Errorf("HTTP filter %q not supported server-side", name)
221 }
222 } else if _, ok := httpFilter.(httpfilter.ClientInterceptorBuilder); !ok {
223 if filter.GetIsOptional() {
224 continue
225 }
226 return nil, fmt.Errorf("HTTP filter %q not supported client-side", name)
227 }
228
229
230 ret = append(ret, HTTPFilter{Name: name, Filter: httpFilter, Config: config})
231 }
232
233
234 if len(ret) == 0 {
235 return nil, fmt.Errorf("http filters list is empty")
236 }
237 var i int
238 for ; i < len(ret)-1; i++ {
239 if ret[i].Filter.IsTerminal() {
240 return nil, fmt.Errorf("http filter %q is a terminal filter but it is not last in the filter chain", ret[i].Name)
241 }
242 }
243 if !ret[i].Filter.IsTerminal() {
244 return nil, fmt.Errorf("http filter %q is not a terminal filter", ret[len(ret)-1].Name)
245 }
246 return ret, nil
247 }
248
249 func processServerSideListener(lis *v3listenerpb.Listener) (*ListenerUpdate, error) {
250 if n := len(lis.ListenerFilters); n != 0 {
251 return nil, fmt.Errorf("unsupported field 'listener_filters' contains %d entries", n)
252 }
253 if useOrigDst := lis.GetUseOriginalDst(); useOrigDst != nil && useOrigDst.GetValue() {
254 return nil, errors.New("unsupported field 'use_original_dst' is present and set to true")
255 }
256 addr := lis.GetAddress()
257 if addr == nil {
258 return nil, fmt.Errorf("no address field in LDS response: %+v", lis)
259 }
260 sockAddr := addr.GetSocketAddress()
261 if sockAddr == nil {
262 return nil, fmt.Errorf("no socket_address field in LDS response: %+v", lis)
263 }
264 lu := &ListenerUpdate{
265 InboundListenerCfg: &InboundListenerConfig{
266 Address: sockAddr.GetAddress(),
267 Port: strconv.Itoa(int(sockAddr.GetPortValue())),
268 },
269 }
270
271 fcMgr, err := NewFilterChainManager(lis)
272 if err != nil {
273 return nil, err
274 }
275 lu.InboundListenerCfg.FilterChains = fcMgr
276 return lu, nil
277 }
278
View as plain text