1
2
3 package main
4
5 import (
6 "context"
7 "net"
8 "strings"
9 "sync"
10
11 "github.com/Microsoft/go-winio"
12 "github.com/Microsoft/hcsshim/internal/log"
13 ncproxystore "github.com/Microsoft/hcsshim/internal/ncproxy/store"
14 "github.com/Microsoft/hcsshim/internal/ncproxyttrpc"
15 ncproxygrpcv0 "github.com/Microsoft/hcsshim/pkg/ncproxy/ncproxygrpc/v0"
16 ncproxygrpc "github.com/Microsoft/hcsshim/pkg/ncproxy/ncproxygrpc/v1"
17 "github.com/Microsoft/hcsshim/pkg/octtrpc"
18 "github.com/containerd/ttrpc"
19 "github.com/pkg/errors"
20 "github.com/sirupsen/logrus"
21 bolt "go.etcd.io/bbolt"
22 "go.opencensus.io/plugin/ocgrpc"
23 "google.golang.org/grpc"
24 )
25
// server bundles the two RPC servers ncproxy exposes together with the
// persistent stores and in-memory cache their services operate on.
type server struct {
	// ttrpc is the ttrpc server the ncproxy ttrpc service is registered on;
	// it is served on conf.TTRPCAddr.
	ttrpc *ttrpc.Server
	// grpc is the grpc server the v1 and (deprecated) v0 NetworkConfigProxy
	// services are registered on; it is served on conf.GRPCAddr.
	grpc *grpc.Server
	// conf supplies the addresses the two servers listen on.
	conf *config

	// agentStore is the compute agent store backed by the database opened in
	// newServer. It is read at startup to reconnect to known compute agents.
	agentStore *ncproxystore.ComputeAgentStore

	// cache holds the connected compute agent clients, keyed by container ID.
	cache *computeAgentCache

	// ncproxyNetworking is the networking store, created from the same
	// database handle as agentStore.
	ncproxyNetworking *ncproxystore.NetworkingStore
}
41
42 func newServer(ctx context.Context, conf *config, dbPath string) (*server, error) {
43 db, err := bolt.Open(dbPath, 0600, nil)
44 if err != nil {
45 return nil, err
46 }
47 agentStore := ncproxystore.NewComputeAgentStore(db)
48 agentCache := newComputeAgentCache()
49 reconnectComputeAgents(ctx, agentStore, agentCache)
50
51 ttrpcServer, err := ttrpc.NewServer(ttrpc.WithUnaryServerInterceptor(octtrpc.ServerInterceptor()))
52 if err != nil {
53 log.G(ctx).WithError(err).Error("failed to create ttrpc server")
54 return nil, err
55 }
56 return &server{
57 grpc: grpc.NewServer(grpc.StatsHandler(&ocgrpc.ServerHandler{})),
58 ttrpc: ttrpcServer,
59 conf: conf,
60 agentStore: agentStore,
61 cache: agentCache,
62 ncproxyNetworking: ncproxystore.NewNetworkingStore(db),
63 }, nil
64 }
65
66 func (s *server) setup(ctx context.Context) (net.Listener, net.Listener, error) {
67 gService := newGRPCService(s.cache, s.ncproxyNetworking)
68 ncproxygrpc.RegisterNetworkConfigProxyServer(s.grpc, gService)
69
70
71 v0Wrapper := newV0ServiceWrapper(gService)
72 ncproxygrpcv0.RegisterNetworkConfigProxyServer(s.grpc, v0Wrapper)
73 log.G(ctx).Warnf("ncproxygprc api v0 is deprecated, please use ncproxygrpc api v1")
74
75 tService := newTTRPCService(ctx, s.cache, s.agentStore)
76 ncproxyttrpc.RegisterNetworkConfigProxyService(s.ttrpc, tService)
77
78 ttrpcListener, err := winio.ListenPipe(s.conf.TTRPCAddr, nil)
79 if err != nil {
80 log.G(ctx).WithError(err).Errorf("failed to listen on %s", s.conf.TTRPCAddr)
81 return nil, nil, err
82 }
83
84 grpcListener, err := net.Listen("tcp", s.conf.GRPCAddr)
85 if err != nil {
86 log.G(ctx).WithError(err).Errorf("failed to listen on %s", s.conf.GRPCAddr)
87 return nil, nil, err
88 }
89 return ttrpcListener, grpcListener, nil
90 }
91
92
93 func (s *server) gracefulShutdown(ctx context.Context) {
94 s.grpc.GracefulStop()
95 if err := s.ttrpc.Shutdown(ctx); err != nil {
96 log.G(ctx).WithError(err).Error("failed to gracefully shutdown ttrpc server")
97 }
98 }
99
100
101 func (s *server) cleanupResources(ctx context.Context) {
102 if err := disconnectComputeAgents(ctx, s.cache); err != nil {
103 log.G(ctx).WithError(err).Error("failed to disconnect connections in compute agent cache")
104 }
105 if err := s.agentStore.Close(); err != nil {
106 log.G(ctx).WithError(err).Error("failed to close ncproxy compute agent database")
107 }
108 if err := s.ncproxyNetworking.Close(); err != nil {
109 log.G(ctx).WithError(err).Error("failed to close ncproxy networking database")
110 }
111 }
112
// trapClosedConnErr filters out the error reported when a connection or
// listener is used after it has been closed, which is expected during
// shutdown. It returns nil for a nil error and for a closed-connection error;
// any other error is returned unchanged.
//
// A closed-connection error is recognised either by having net.ErrClosed in
// its chain or, for errors that only carry the message, by its text.
func trapClosedConnErr(err error) error {
	switch {
	case err == nil:
		return nil
	case errors.Is(err, net.ErrClosed):
		return nil
	case strings.Contains(err.Error(), "use of closed network connection"):
		// Fallback for errors that reproduce the message without wrapping
		// the net.ErrClosed sentinel.
		return nil
	}
	return err
}
119
120 func (s *server) serve(ctx context.Context, ttrpcListener net.Listener, grpcListener net.Listener, serveErr chan error) {
121 go func() {
122 log.G(ctx).WithFields(logrus.Fields{
123 "address": s.conf.TTRPCAddr,
124 }).Info("Serving ncproxy TTRPC service")
125
126
127 serveErr <- trapClosedConnErr(s.ttrpc.Serve(ctx, ttrpcListener))
128 }()
129
130 go func() {
131 log.G(ctx).WithFields(logrus.Fields{
132 "address": s.conf.GRPCAddr,
133 }).Info("Serving ncproxy GRPC service")
134
135 defer grpcListener.Close()
136 serveErr <- trapClosedConnErr(s.grpc.Serve(grpcListener))
137 }()
138 }
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166 func reconnectComputeAgents(ctx context.Context, agentStore *ncproxystore.ComputeAgentStore, agentCache *computeAgentCache) {
167 computeAgentMap, err := agentStore.GetComputeAgents(ctx)
168 if err != nil && errors.Is(err, ncproxystore.ErrBucketNotFound) {
169
170 log.G(ctx).WithError(err).Debug("no entries in database")
171 return
172 } else if err != nil {
173 log.G(ctx).WithError(err).Error("failed to get compute agent information")
174 }
175
176 var wg sync.WaitGroup
177 for cid, addr := range computeAgentMap {
178 wg.Add(1)
179 go func(agentAddress, containerID string) {
180 defer wg.Done()
181 service, err := getComputeAgentClient(agentAddress)
182 if err != nil {
183
184 log.G(ctx).WithField("agentAddress", agentAddress).WithError(err).Error("failed to create new compute agent client")
185 dErr := agentStore.DeleteComputeAgent(ctx, containerID)
186 if dErr != nil {
187 log.G(ctx).WithField("key", containerID).WithError(dErr).Warn("failed to delete key from compute agent store")
188 }
189 return
190 }
191 log.G(ctx).WithField("containerID", containerID).Info("reconnected to container's compute agent")
192
193
194
195
196 _ = agentCache.put(containerID, service)
197 }(addr, cid)
198 }
199
200 wg.Wait()
201 }
202
203
204
205 func disconnectComputeAgents(ctx context.Context, containerIDToComputeAgent *computeAgentCache) error {
206 agents, err := containerIDToComputeAgent.getAllAndClear()
207 if err != nil {
208 return errors.Wrapf(err, "failed to get all cached compute agent clients")
209 }
210 for _, agent := range agents {
211 if err := agent.Close(); err != nil {
212 log.G(ctx).WithError(err).Error("failed to close compute agent connection")
213 }
214 }
215 return nil
216 }
217
View as plain text