...
1
18
19
20
21
22
23
24 package csds
25
26 import (
27 "context"
28 "fmt"
29 "io"
30 "sync"
31
32 "google.golang.org/grpc/codes"
33 "google.golang.org/grpc/grpclog"
34 internalgrpclog "google.golang.org/grpc/internal/grpclog"
35 "google.golang.org/grpc/status"
36 "google.golang.org/grpc/xds/internal/xdsclient"
37 "google.golang.org/grpc/xds/internal/xdsclient/xdsresource"
38 "google.golang.org/protobuf/types/known/timestamppb"
39
40 v3adminpb "github.com/envoyproxy/go-control-plane/envoy/admin/v3"
41 v3statusgrpc "github.com/envoyproxy/go-control-plane/envoy/service/status/v3"
42 v3statuspb "github.com/envoyproxy/go-control-plane/envoy/service/status/v3"
43 )
44
45 var logger = grpclog.Component("xds")
46
47 const prefix = "[csds-server %p] "
48
49 func prefixLogger(s *ClientStatusDiscoveryServer) *internalgrpclog.PrefixLogger {
50 return internalgrpclog.NewPrefixLogger(logger, fmt.Sprintf(prefix, s))
51 }
52
53
54
55
56
57
58
59 type ClientStatusDiscoveryServer struct {
60 logger *internalgrpclog.PrefixLogger
61
62 mu sync.Mutex
63 xdsClient xdsclient.XDSClient
64 xdsClientClose func()
65 }
66
67
68
69 func NewClientStatusDiscoveryServer() (*ClientStatusDiscoveryServer, error) {
70 c, close, err := xdsclient.New()
71 if err != nil {
72 logger.Warningf("Failed to create xDS client: %v", err)
73 }
74 s := &ClientStatusDiscoveryServer{xdsClient: c, xdsClientClose: close}
75 s.logger = prefixLogger(s)
76 s.logger.Infof("Created CSDS server, with xdsClient %p", c)
77 return s, nil
78 }
79
80
81 func (s *ClientStatusDiscoveryServer) StreamClientStatus(stream v3statusgrpc.ClientStatusDiscoveryService_StreamClientStatusServer) error {
82 for {
83 req, err := stream.Recv()
84 if err == io.EOF {
85 return nil
86 }
87 if err != nil {
88 return err
89 }
90 resp, err := s.buildClientStatusRespForReq(req)
91 if err != nil {
92 return err
93 }
94 if err := stream.Send(resp); err != nil {
95 return err
96 }
97 }
98 }
99
100
101 func (s *ClientStatusDiscoveryServer) FetchClientStatus(_ context.Context, req *v3statuspb.ClientStatusRequest) (*v3statuspb.ClientStatusResponse, error) {
102 return s.buildClientStatusRespForReq(req)
103 }
104
105
106
107
108
109 func (s *ClientStatusDiscoveryServer) buildClientStatusRespForReq(req *v3statuspb.ClientStatusRequest) (*v3statuspb.ClientStatusResponse, error) {
110 s.mu.Lock()
111 defer s.mu.Unlock()
112
113 if s.xdsClient == nil {
114 return &v3statuspb.ClientStatusResponse{}, nil
115 }
116
117
118 if len(req.NodeMatchers) != 0 {
119 return nil, status.Errorf(codes.InvalidArgument, "node_matchers are not supported, request contains node_matchers: %v", req.NodeMatchers)
120 }
121
122 dump := s.xdsClient.DumpResources()
123 ret := &v3statuspb.ClientStatusResponse{
124 Config: []*v3statuspb.ClientConfig{
125 {
126 Node: s.xdsClient.BootstrapConfig().NodeProto,
127 GenericXdsConfigs: dumpToGenericXdsConfig(dump),
128 },
129 },
130 }
131 return ret, nil
132 }
133
134
135 func (s *ClientStatusDiscoveryServer) Close() {
136 if s.xdsClientClose != nil {
137 s.xdsClientClose()
138 }
139 }
140
141 func dumpToGenericXdsConfig(dump map[string]map[string]xdsresource.UpdateWithMD) []*v3statuspb.ClientConfig_GenericXdsConfig {
142 var ret []*v3statuspb.ClientConfig_GenericXdsConfig
143 for typeURL, updates := range dump {
144 for name, update := range updates {
145 config := &v3statuspb.ClientConfig_GenericXdsConfig{
146 TypeUrl: typeURL,
147 Name: name,
148 VersionInfo: update.MD.Version,
149 XdsConfig: update.Raw,
150 LastUpdated: timestamppb.New(update.MD.Timestamp),
151 ClientStatus: serviceStatusToProto(update.MD.Status),
152 }
153 if errState := update.MD.ErrState; errState != nil {
154 config.ErrorState = &v3adminpb.UpdateFailureState{
155 LastUpdateAttempt: timestamppb.New(errState.Timestamp),
156 Details: errState.Err.Error(),
157 VersionInfo: errState.Version,
158 }
159 }
160 ret = append(ret, config)
161 }
162 }
163 return ret
164 }
165
166 func serviceStatusToProto(serviceStatus xdsresource.ServiceStatus) v3adminpb.ClientResourceStatus {
167 switch serviceStatus {
168 case xdsresource.ServiceStatusUnknown:
169 return v3adminpb.ClientResourceStatus_UNKNOWN
170 case xdsresource.ServiceStatusRequested:
171 return v3adminpb.ClientResourceStatus_REQUESTED
172 case xdsresource.ServiceStatusNotExist:
173 return v3adminpb.ClientResourceStatus_DOES_NOT_EXIST
174 case xdsresource.ServiceStatusACKed:
175 return v3adminpb.ClientResourceStatus_ACKED
176 case xdsresource.ServiceStatusNACKed:
177 return v3adminpb.ClientResourceStatus_NACKED
178 default:
179 return v3adminpb.ClientResourceStatus_UNKNOWN
180 }
181 }
182
View as plain text