
Source file src/github.com/Microsoft/hcsshim/cmd/ncproxy/server.go

Documentation: github.com/Microsoft/hcsshim/cmd/ncproxy

//go:build windows

package main

import (
	"context"
	"net"
	"strings"
	"sync"

	"github.com/Microsoft/go-winio"
	"github.com/Microsoft/hcsshim/internal/log"
	ncproxystore "github.com/Microsoft/hcsshim/internal/ncproxy/store"
	"github.com/Microsoft/hcsshim/internal/ncproxyttrpc"
	ncproxygrpcv0 "github.com/Microsoft/hcsshim/pkg/ncproxy/ncproxygrpc/v0"
	ncproxygrpc "github.com/Microsoft/hcsshim/pkg/ncproxy/ncproxygrpc/v1"
	"github.com/Microsoft/hcsshim/pkg/octtrpc"
	"github.com/containerd/ttrpc"
	"github.com/pkg/errors"
	"github.com/sirupsen/logrus"
	bolt "go.etcd.io/bbolt"
	"go.opencensus.io/plugin/ocgrpc"
	"google.golang.org/grpc"
)

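// server bundles the grpc and ttrpc servers together with the stores and cache
// they share, so that they can be shut down and their resources cleaned up together.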
type server struct {
	ttrpc *ttrpc.Server
	grpc  *grpc.Server
	conf  *config

	// Shared data is stored on the server so it can be cleaned up later.
	// agentStore is the database mapping container IDs to compute agent addresses.
	agentStore *ncproxystore.ComputeAgentStore
	// cache maps container IDs to compute agent clients.
	cache *computeAgentCache

	// ncproxyNetworking is the database store for ncproxy networking networks
	// and endpoints; networks are keyed by network name.
	ncproxyNetworking *ncproxystore.NetworkingStore
}

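// newServer opens the bolt database at dbPath, reconnects to any compute agents
// recorded there, and constructs the grpc and ttrpc servers used by ncproxy.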
func newServer(ctx context.Context, conf *config, dbPath string) (*server, error) {
	db, err := bolt.Open(dbPath, 0600, nil)
	if err != nil {
		return nil, err
	}
	agentStore := ncproxystore.NewComputeAgentStore(db)
	agentCache := newComputeAgentCache()
	reconnectComputeAgents(ctx, agentStore, agentCache)

	ttrpcServer, err := ttrpc.NewServer(ttrpc.WithUnaryServerInterceptor(octtrpc.ServerInterceptor()))
	if err != nil {
		log.G(ctx).WithError(err).Error("failed to create ttrpc server")
		return nil, err
	}
	return &server{
		grpc:              grpc.NewServer(grpc.StatsHandler(&ocgrpc.ServerHandler{})),
		ttrpc:             ttrpcServer,
		conf:              conf,
		agentStore:        agentStore,
		cache:             agentCache,
		ncproxyNetworking: ncproxystore.NewNetworkingStore(db),
	}, nil
}

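// setup registers the grpc network config proxy service (v1, plus the deprecated
// v0 wrapper) and the ttrpc service, then opens the listeners they will be served
// on: a named pipe for ttrpc and a TCP socket for grpc.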
func (s *server) setup(ctx context.Context) (net.Listener, net.Listener, error) {
	gService := newGRPCService(s.cache, s.ncproxyNetworking)
	ncproxygrpc.RegisterNetworkConfigProxyServer(s.grpc, gService)

	// support the v0 ncproxy api
	v0Wrapper := newV0ServiceWrapper(gService)
	ncproxygrpcv0.RegisterNetworkConfigProxyServer(s.grpc, v0Wrapper)
	log.G(ctx).Warnf("ncproxygrpc api v0 is deprecated, please use ncproxygrpc api v1")

	tService := newTTRPCService(ctx, s.cache, s.agentStore)
	ncproxyttrpc.RegisterNetworkConfigProxyService(s.ttrpc, tService)

	ttrpcListener, err := winio.ListenPipe(s.conf.TTRPCAddr, nil)
	if err != nil {
		log.G(ctx).WithError(err).Errorf("failed to listen on %s", s.conf.TTRPCAddr)
		return nil, nil, err
	}

	grpcListener, err := net.Listen("tcp", s.conf.GRPCAddr)
	if err != nil {
		log.G(ctx).WithError(err).Errorf("failed to listen on %s", s.conf.GRPCAddr)
		return nil, nil, err
	}
	return ttrpcListener, grpcListener, nil
}

// gracefulShutdown makes a best-effort attempt to gracefully shut down the grpc
// and ttrpc servers.
func (s *server) gracefulShutdown(ctx context.Context) {
	s.grpc.GracefulStop()
	if err := s.ttrpc.Shutdown(ctx); err != nil {
		log.G(ctx).WithError(err).Error("failed to gracefully shutdown ttrpc server")
	}
}

// cleanupResources makes a best-effort attempt to clean up resources belonging
// to the server.
func (s *server) cleanupResources(ctx context.Context) {
	if err := disconnectComputeAgents(ctx, s.cache); err != nil {
		log.G(ctx).WithError(err).Error("failed to disconnect connections in compute agent cache")
	}
	if err := s.agentStore.Close(); err != nil {
		log.G(ctx).WithError(err).Error("failed to close ncproxy compute agent database")
	}
	if err := s.ncproxyNetworking.Close(); err != nil {
		log.G(ctx).WithError(err).Error("failed to close ncproxy networking database")
	}
}

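// trapClosedConnErr suppresses the "use of closed network connection" error so
// that a listener closed during shutdown is not reported as a serve failure.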
func trapClosedConnErr(err error) error {
	if err == nil || strings.Contains(err.Error(), "use of closed network connection") {
		return nil
	}
	return err
}

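// serve starts serving the ttrpc and grpc services on the provided listeners.
// Each service runs in its own goroutine, so serve returns immediately; the
// result of each Serve call is reported on serveErr.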
func (s *server) serve(ctx context.Context, ttrpcListener net.Listener, grpcListener net.Listener, serveErr chan error) {
	go func() {
		log.G(ctx).WithFields(logrus.Fields{
			"address": s.conf.TTRPCAddr,
		}).Info("Serving ncproxy TTRPC service")

		// No need to defer close the listener as ttrpc.Serve does this internally.
		serveErr <- trapClosedConnErr(s.ttrpc.Serve(ctx, ttrpcListener))
	}()

	go func() {
		log.G(ctx).WithFields(logrus.Fields{
			"address": s.conf.GRPCAddr,
		}).Info("Serving ncproxy GRPC service")

		defer grpcListener.Close()
		serveErr <- trapClosedConnErr(s.grpc.Serve(grpcListener))
	}()
}

// reconnectComputeAgents handles reconnecting to existing compute agents on ncproxy
// restart.
//
// Ncproxy maintains a cache of active compute agents in order to reestablish connections
// if the service is restarted. The cache is persisted in a bolt database. The schema
// can be found in `buckets.go`.
//
// On restart ncproxy will attempt to create new compute agent connections from the
// database of active compute agent addresses and add them to its compute agent client
// cache. Reconnect *MUST* be called before the server is allowed to start serving anything
// so that we can ensure that the cache is ready. Reconnections are performed in parallel
// to improve service startup performance.
//
// There are a few failure modes for reconnect:
//
//  1. If a compute agent entry is stale, connecting to the compute agent client will fail
//     and we will remove the entry from the database.
//
//  2. If an active compute agent exists but we fail to connect to it, we will again remove
//     the entry from the database. In this case, it is the node network service's
//     responsibility to clean up host network resources that are no longer being used.
//
// Other failure modes are possible but not expected. In all failure cases we log the failures
// but allow the service start to proceed. We chose this approach over simply failing service
// start to avoid blocking service for all containers that reconnected successfully and to
// avoid blocking the creation of new containers until a retry or mitigation.
func reconnectComputeAgents(ctx context.Context, agentStore *ncproxystore.ComputeAgentStore, agentCache *computeAgentCache) {
	computeAgentMap, err := agentStore.GetComputeAgents(ctx)
	if err != nil && errors.Is(err, ncproxystore.ErrBucketNotFound) {
		// no entries in the database yet, return early
		log.G(ctx).WithError(err).Debug("no entries in database")
		return
	} else if err != nil {
		log.G(ctx).WithError(err).Error("failed to get compute agent information")
	}

	var wg sync.WaitGroup
	for cid, addr := range computeAgentMap {
		wg.Add(1)
		go func(agentAddress, containerID string) {
			defer wg.Done()
			service, err := getComputeAgentClient(agentAddress)
			if err != nil {
				// can't connect to the compute agent, so remove its entry from the database
				log.G(ctx).WithField("agentAddress", agentAddress).WithError(err).Error("failed to create new compute agent client")
				dErr := agentStore.DeleteComputeAgent(ctx, containerID)
				if dErr != nil {
					log.G(ctx).WithField("key", containerID).WithError(dErr).Warn("failed to delete key from compute agent store")
				}
				return
			}
			log.G(ctx).WithField("containerID", containerID).Info("reconnected to container's compute agent")

			// The connection succeeded, so add an entry to the cache for later use.
			// Since the servers have not started running yet, the cache cannot have
			// been cleared (that only happens on a call to `disconnectComputeAgents`),
			// so the error can be ignored.
			_ = agentCache.put(containerID, service)
		}(addr, cid)
	}

	wg.Wait()
}

// disconnectComputeAgents clears the cache of compute agent clients and cleans up
// their resources.
func disconnectComputeAgents(ctx context.Context, containerIDToComputeAgent *computeAgentCache) error {
	agents, err := containerIDToComputeAgent.getAllAndClear()
	if err != nil {
		return errors.Wrapf(err, "failed to get all cached compute agent clients")
	}
	for _, agent := range agents {
		if err := agent.Close(); err != nil {
			log.G(ctx).WithError(err).Error("failed to close compute agent connection")
		}
	}
	return nil
}

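For orientation, the sketch below shows one way the pieces above could be wired together by the ncproxy entrypoint. It is illustrative only: the real main.go (flag parsing, Windows service registration, signal handling) is not shown here, and runExample, conf, and dbPath are hypothetical names standing in for the caller's configuration. The sketch assumes it lives alongside server.go in package main.

//go:build windows

package main

import "context"

// runExample is a hypothetical driver, not part of ncproxy; it assumes the
// *config and database path have already been produced by the caller.
func runExample(ctx context.Context, conf *config, dbPath string) error {
	// Open the database, reconnect compute agents, and build both servers.
	s, err := newServer(ctx, conf, dbPath)
	if err != nil {
		return err
	}
	defer s.cleanupResources(ctx)

	// Register services and open the named pipe / TCP listeners.
	ttrpcListener, grpcListener, err := s.setup(ctx)
	if err != nil {
		return err
	}

	// Buffered so both serve goroutines can report without blocking.
	serveErr := make(chan error, 2)
	s.serve(ctx, ttrpcListener, grpcListener, serveErr)

	// Wait for either service to stop, then shut both down gracefully.
	err = <-serveErr
	s.gracefulShutdown(ctx)
	return err
}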
