1
18
19 package xds
20
21 import (
22 "context"
23 "errors"
24 "fmt"
25 "net"
26
27 "google.golang.org/grpc"
28 "google.golang.org/grpc/codes"
29 "google.golang.org/grpc/connectivity"
30 internalgrpclog "google.golang.org/grpc/internal/grpclog"
31 "google.golang.org/grpc/internal/grpcsync"
32 iresolver "google.golang.org/grpc/internal/resolver"
33 "google.golang.org/grpc/internal/transport"
34 "google.golang.org/grpc/internal/xds/bootstrap"
35 "google.golang.org/grpc/metadata"
36 "google.golang.org/grpc/status"
37 "google.golang.org/grpc/xds/internal/server"
38 "google.golang.org/grpc/xds/internal/xdsclient"
39 "google.golang.org/grpc/xds/internal/xdsclient/xdsresource"
40 )
41
42 const serverPrefix = "[xds-server %p] "
43
44 var (
45
46 newXDSClient = func() (xdsclient.XDSClient, func(), error) {
47 return xdsclient.New()
48 }
49 newGRPCServer = func(opts ...grpc.ServerOption) grpcServer {
50 return grpc.NewServer(opts...)
51 }
52 )
53
54
55
56 type grpcServer interface {
57 RegisterService(*grpc.ServiceDesc, any)
58 Serve(net.Listener) error
59 Stop()
60 GracefulStop()
61 GetServiceInfo() map[string]grpc.ServiceInfo
62 }
63
64
65
66
67
68 type GRPCServer struct {
69 gs grpcServer
70 quit *grpcsync.Event
71 logger *internalgrpclog.PrefixLogger
72 opts *serverOptions
73 xdsC xdsclient.XDSClient
74 xdsClientClose func()
75 }
76
77
78
79
80 func NewGRPCServer(opts ...grpc.ServerOption) (*GRPCServer, error) {
81 newOpts := []grpc.ServerOption{
82 grpc.ChainUnaryInterceptor(xdsUnaryInterceptor),
83 grpc.ChainStreamInterceptor(xdsStreamInterceptor),
84 }
85 newOpts = append(newOpts, opts...)
86 s := &GRPCServer{
87 gs: newGRPCServer(newOpts...),
88 quit: grpcsync.NewEvent(),
89 }
90 s.handleServerOptions(opts)
91
92
93
94
95 newXDSClient := newXDSClient
96 if s.opts.bootstrapContentsForTesting != nil {
97
98 newXDSClient = func() (xdsclient.XDSClient, func(), error) {
99 return xdsclient.NewWithBootstrapContentsForTesting(s.opts.bootstrapContentsForTesting)
100 }
101 }
102 xdsClient, xdsClientClose, err := newXDSClient()
103 if err != nil {
104 return nil, fmt.Errorf("xDS client creation failed: %v", err)
105 }
106
107
108
109
110 cfg := xdsClient.BootstrapConfig()
111 if cfg.ServerListenerResourceNameTemplate == "" {
112 xdsClientClose()
113 return nil, errors.New("missing server_listener_resource_name_template in the bootstrap configuration")
114 }
115
116 s.xdsC = xdsClient
117 s.xdsClientClose = xdsClientClose
118
119 s.logger = internalgrpclog.NewPrefixLogger(logger, fmt.Sprintf(serverPrefix, s))
120 s.logger.Infof("Created xds.GRPCServer")
121
122 return s, nil
123 }
124
125
126
127 func (s *GRPCServer) handleServerOptions(opts []grpc.ServerOption) {
128 so := s.defaultServerOptions()
129 for _, opt := range opts {
130 if o, ok := opt.(*serverOption); ok {
131 o.apply(so)
132 }
133 }
134 s.opts = so
135 }
136
137 func (s *GRPCServer) defaultServerOptions() *serverOptions {
138 return &serverOptions{
139
140
141
142
143
144
145 modeCallback: s.loggingServerModeChangeCallback,
146 }
147 }
148
149 func (s *GRPCServer) loggingServerModeChangeCallback(addr net.Addr, args ServingModeChangeArgs) {
150 switch args.Mode {
151 case connectivity.ServingModeServing:
152 s.logger.Errorf("Listener %q entering mode: %q", addr.String(), args.Mode)
153 case connectivity.ServingModeNotServing:
154 s.logger.Errorf("Listener %q entering mode: %q due to error: %v", addr.String(), args.Mode, args.Err)
155 }
156 }
157
158
159
160
161 func (s *GRPCServer) RegisterService(sd *grpc.ServiceDesc, ss any) {
162 s.gs.RegisterService(sd, ss)
163 }
164
165
166
167 func (s *GRPCServer) GetServiceInfo() map[string]grpc.ServiceInfo {
168 return s.gs.GetServiceInfo()
169 }
170
171
172
173
174
175
176
177
178 func (s *GRPCServer) Serve(lis net.Listener) error {
179 s.logger.Infof("Serve() passed a net.Listener on %s", lis.Addr().String())
180 if _, ok := lis.Addr().(*net.TCPAddr); !ok {
181 return fmt.Errorf("xds: GRPCServer expects listener to return a net.TCPAddr. Got %T", lis.Addr())
182 }
183
184 if s.quit.HasFired() {
185 return grpc.ErrServerStopped
186 }
187
188
189
190
191
192
193 cfg := s.xdsC.BootstrapConfig()
194 name := bootstrap.PopulateResourceTemplate(cfg.ServerListenerResourceNameTemplate, lis.Addr().String())
195
196
197
198 lw := server.NewListenerWrapper(server.ListenerWrapperParams{
199 Listener: lis,
200 ListenerResourceName: name,
201 XDSClient: s.xdsC,
202 ModeCallback: func(addr net.Addr, mode connectivity.ServingMode, err error) {
203 s.opts.modeCallback(addr, ServingModeChangeArgs{
204 Mode: mode,
205 Err: err,
206 })
207 },
208 })
209 return s.gs.Serve(lw)
210 }
211
212
213
214
215
216 func (s *GRPCServer) Stop() {
217 s.quit.Fire()
218 s.gs.Stop()
219 if s.xdsC != nil {
220 s.xdsClientClose()
221 }
222 }
223
224
225
226
227 func (s *GRPCServer) GracefulStop() {
228 s.quit.Fire()
229 s.gs.GracefulStop()
230 if s.xdsC != nil {
231 s.xdsClientClose()
232 }
233 }
234
235
236
237
238 func routeAndProcess(ctx context.Context) error {
239 conn := transport.GetConnection(ctx)
240 cw, ok := conn.(interface {
241 UsableRouteConfiguration() xdsresource.UsableRouteConfiguration
242 })
243 if !ok {
244 return errors.New("missing virtual hosts in incoming context")
245 }
246
247 rc := cw.UsableRouteConfiguration()
248
249
250
251 if rc.Err != nil {
252 if logger.V(2) {
253 logger.Infof("RPC on connection with xDS Configuration error: %v", rc.Err)
254 }
255 return status.Error(codes.Unavailable, "error from xDS configuration for matched route configuration")
256 }
257
258 mn, ok := grpc.Method(ctx)
259 if !ok {
260 return errors.New("missing method name in incoming context")
261 }
262 md, ok := metadata.FromIncomingContext(ctx)
263 if !ok {
264 return errors.New("missing metadata in incoming context")
265 }
266
267
268
269 authority := md.Get(":authority")
270 vh := xdsresource.FindBestMatchingVirtualHostServer(authority[0], rc.VHS)
271 if vh == nil {
272 return status.Error(codes.Unavailable, "the incoming RPC did not match a configured Virtual Host")
273 }
274
275 var rwi *xdsresource.RouteWithInterceptors
276 rpcInfo := iresolver.RPCInfo{
277 Context: ctx,
278 Method: mn,
279 }
280 for _, r := range vh.Routes {
281 if r.M.Match(rpcInfo) {
282
283
284 if r.ActionType != xdsresource.RouteActionNonForwardingAction {
285 return status.Error(codes.Unavailable, "the incoming RPC matched to a route that was not of action type non forwarding")
286 }
287 rwi = &r
288 break
289 }
290 }
291 if rwi == nil {
292 return status.Error(codes.Unavailable, "the incoming RPC did not match a configured Route")
293 }
294 for _, interceptor := range rwi.Interceptors {
295 if err := interceptor.AllowRPC(ctx); err != nil {
296 return status.Errorf(codes.PermissionDenied, "Incoming RPC is not allowed: %v", err)
297 }
298 }
299 return nil
300 }
301
302
303
304 func xdsUnaryInterceptor(ctx context.Context, req any, _ *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp any, err error) {
305 if err := routeAndProcess(ctx); err != nil {
306 return nil, err
307 }
308 return handler(ctx, req)
309 }
310
311
312
313 func xdsStreamInterceptor(srv any, ss grpc.ServerStream, _ *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
314 if err := routeAndProcess(ss.Context()); err != nil {
315 return err
316 }
317 return handler(srv, ss)
318 }
319
View as plain text