...

Source file src/github.com/Microsoft/hcsshim/cmd/ncproxy/ncproxy.go

Documentation: github.com/Microsoft/hcsshim/cmd/ncproxy

     1  //go:build windows
     2  
     3  package main
     4  
     5  import (
     6  	"context"
     7  	"encoding/json"
     8  	"time"
     9  
    10  	"github.com/Microsoft/go-winio"
    11  	"github.com/Microsoft/hcsshim/hcn"
    12  	"github.com/Microsoft/hcsshim/internal/computeagent"
    13  	"github.com/Microsoft/hcsshim/internal/log"
    14  	ncproxynetworking "github.com/Microsoft/hcsshim/internal/ncproxy/networking"
    15  	ncproxystore "github.com/Microsoft/hcsshim/internal/ncproxy/store"
    16  	"github.com/Microsoft/hcsshim/internal/ncproxyttrpc"
    17  	"github.com/Microsoft/hcsshim/internal/oc"
    18  	"github.com/Microsoft/hcsshim/internal/uvm"
    19  	ncproxygrpc "github.com/Microsoft/hcsshim/pkg/ncproxy/ncproxygrpc/v1"
    20  	nodenetsvc "github.com/Microsoft/hcsshim/pkg/ncproxy/nodenetsvc/v1"
    21  	"github.com/Microsoft/hcsshim/pkg/octtrpc"
    22  	"github.com/containerd/ttrpc"
    23  	"github.com/containerd/typeurl"
    24  	"github.com/gogo/protobuf/types"
    25  	"github.com/pkg/errors"
    26  	"go.opencensus.io/trace"
    27  	"google.golang.org/grpc/codes"
    28  	"google.golang.org/grpc/status"
    29  )
    30  
    31  func init() {
    32  	typeurl.Register(&ncproxynetworking.Endpoint{}, "ncproxy/ncproxynetworking/Endpoint")
    33  	typeurl.Register(&ncproxynetworking.Network{}, "ncproxy/ncproxynetworking/Network")
    34  	typeurl.Register(&hcn.HostComputeEndpoint{}, "ncproxy/hcn/HostComputeEndpoint")
    35  	typeurl.Register(&hcn.HostComputeNetwork{}, "ncproxy/hcn/HostComputeNetwork")
    36  }
    37  
    38  // functions for mocking out in tests
    39  var (
    40  	winioDialPipe  = winio.DialPipe
    41  	ttrpcNewClient = ttrpc.NewClient
    42  )
    43  
    44  // grpcService is the GRPC service exposed for use by a Node Network Service.
    45  type grpcService struct {
    46  	// containerIDToComputeAgent is a cache that stores the mappings from
    47  	// container ID to compute agent address in memory. This is repopulated
    48  	// on reconnect and referenced during client calls.
    49  	containerIDToComputeAgent *computeAgentCache
    50  
    51  	// ncpNetworkingStore is a database that stores the ncproxy networking
    52  	// networks and endpoints persistently.
    53  	ncpNetworkingStore *ncproxystore.NetworkingStore
    54  }
    55  
    56  func newGRPCService(agentCache *computeAgentCache, ncproxyNetworking *ncproxystore.NetworkingStore) *grpcService {
    57  	return &grpcService{
    58  		containerIDToComputeAgent: agentCache,
    59  		ncpNetworkingStore:        ncproxyNetworking,
    60  	}
    61  }
    62  
    63  var _ ncproxygrpc.NetworkConfigProxyServer = &grpcService{}
    64  
    65  func (s *grpcService) AddNIC(ctx context.Context, req *ncproxygrpc.AddNICRequest) (_ *ncproxygrpc.AddNICResponse, err error) {
    66  	ctx, span := oc.StartSpan(ctx, "AddNIC")
    67  	defer span.End()
    68  	defer func() { oc.SetSpanStatus(span, err) }()
    69  
    70  	span.AddAttributes(
    71  		trace.StringAttribute("containerID", req.ContainerID),
    72  		trace.StringAttribute("endpointName", req.EndpointName),
    73  		trace.StringAttribute("nicID", req.NicID))
    74  
    75  	if req.ContainerID == "" || req.EndpointName == "" || req.NicID == "" {
    76  		return nil, status.Errorf(codes.InvalidArgument, "received empty field in request: %+v", req)
    77  	}
    78  
    79  	agent, err := s.containerIDToComputeAgent.get(req.ContainerID)
    80  	if err != nil {
    81  		return nil, status.Errorf(codes.FailedPrecondition, "No shim registered for namespace `%s`", req.ContainerID)
    82  	}
    83  
    84  	var anyEndpoint *types.Any
    85  	if ep, err := s.ncpNetworkingStore.GetEndpointByName(ctx, req.EndpointName); err == nil {
    86  		if ep.Settings == nil || ep.Settings.DeviceDetails == nil || ep.Settings.DeviceDetails.PCIDeviceDetails == nil {
    87  			return nil, status.Errorf(codes.InvalidArgument, "received empty field in request: %+v", req)
    88  		}
    89  		// if there are device details, assign the device via the compute agent
    90  		caReq := &computeagent.AssignPCIInternalRequest{
    91  			ContainerID:          req.ContainerID,
    92  			DeviceID:             ep.Settings.DeviceDetails.PCIDeviceDetails.DeviceID,
    93  			VirtualFunctionIndex: ep.Settings.DeviceDetails.PCIDeviceDetails.VirtualFunctionIndex,
    94  			NicID:                req.NicID,
    95  		}
    96  		if _, err := agent.AssignPCI(ctx, caReq); err != nil {
    97  			return nil, err
    98  		}
    99  		anyEndpoint, err = typeurl.MarshalAny(ep)
   100  		if err != nil {
   101  			return nil, err
   102  		}
   103  	} else {
   104  		if !errors.Is(err, ncproxystore.ErrBucketNotFound) && !errors.Is(err, ncproxystore.ErrKeyNotFound) {
   105  			// log if there was an unexpected error before checking if this is an hcn endpoint
   106  			log.G(ctx).WithError(err).Warn("Failed to query ncproxy networking database")
   107  		}
   108  		ep, err := hcn.GetEndpointByName(req.EndpointName)
   109  		if err != nil {
   110  			if _, ok := err.(hcn.EndpointNotFoundError); ok {
   111  				return nil, status.Errorf(codes.NotFound, "no endpoint with name `%s` found", req.EndpointName)
   112  			}
   113  			return nil, errors.Wrapf(err, "failed to get endpoint with name `%s`", req.EndpointName)
   114  		}
   115  
   116  		anyEndpoint, err = typeurl.MarshalAny(ep)
   117  		if err != nil {
   118  			return nil, err
   119  		}
   120  
   121  		settings := req.EndpointSettings.GetHcnEndpoint()
   122  		if settings != nil && settings.Policies != nil && settings.Policies.IovPolicySettings != nil {
   123  			log.G(ctx).WithField("iov settings", settings.Policies.IovPolicySettings).Info("AddNIC iov settings")
   124  			iovReqSettings := settings.Policies.IovPolicySettings
   125  			if iovReqSettings.IovOffloadWeight != 0 {
   126  				// IOV policy was set during add nic request, update the hcn endpoint
   127  				hcnIOVSettings := &hcn.IovPolicySetting{
   128  					IovOffloadWeight:    iovReqSettings.IovOffloadWeight,
   129  					QueuePairsRequested: iovReqSettings.QueuePairsRequested,
   130  					InterruptModeration: iovReqSettings.InterruptModeration,
   131  				}
   132  				rawJSON, err := json.Marshal(hcnIOVSettings)
   133  				if err != nil {
   134  					return nil, err
   135  				}
   136  
   137  				iovPolicy := hcn.EndpointPolicy{
   138  					Type:     hcn.IOV,
   139  					Settings: rawJSON,
   140  				}
   141  				policies := []hcn.EndpointPolicy{iovPolicy}
   142  				if err := modifyEndpoint(ctx, ep.Id, policies, hcn.RequestTypeUpdate); err != nil {
   143  					return nil, errors.Wrap(err, "failed to add policy to endpoint")
   144  				}
   145  			}
   146  		}
   147  	}
   148  
   149  	caReq := &computeagent.AddNICInternalRequest{
   150  		ContainerID: req.ContainerID,
   151  		NicID:       req.NicID,
   152  		Endpoint:    anyEndpoint,
   153  	}
   154  	if _, err := agent.AddNIC(ctx, caReq); err != nil {
   155  		return nil, err
   156  	}
   157  	return &ncproxygrpc.AddNICResponse{}, nil
   158  }
   159  
   160  func (s *grpcService) ModifyNIC(ctx context.Context, req *ncproxygrpc.ModifyNICRequest) (_ *ncproxygrpc.ModifyNICResponse, err error) {
   161  	ctx, span := oc.StartSpan(ctx, "ModifyNIC")
   162  	defer span.End()
   163  	defer func() { oc.SetSpanStatus(span, err) }()
   164  
   165  	span.AddAttributes(
   166  		trace.StringAttribute("containerID", req.ContainerID),
   167  		trace.StringAttribute("endpointName", req.EndpointName),
   168  		trace.StringAttribute("nicID", req.NicID))
   169  
   170  	if req.ContainerID == "" || req.EndpointName == "" || req.NicID == "" || req.EndpointSettings == nil {
   171  		return nil, status.Error(codes.InvalidArgument, "received empty field in request")
   172  	}
   173  
   174  	if _, err := s.ncpNetworkingStore.GetEndpointByName(ctx, req.EndpointName); err == nil {
   175  		return nil, status.Errorf(codes.Unimplemented, "cannot modify custom endpoints: %v", req)
   176  	}
   177  
   178  	ep, err := hcn.GetEndpointByName(req.EndpointName)
   179  	if err != nil {
   180  		if _, ok := err.(hcn.EndpointNotFoundError); ok {
   181  			return nil, status.Errorf(codes.NotFound, "no endpoint with name `%s` found", req.EndpointName)
   182  		}
   183  		return nil, errors.Wrapf(err, "failed to get endpoint with name `%s`", req.EndpointName)
   184  	}
   185  
   186  	anyEndpoint, err := typeurl.MarshalAny(ep)
   187  	if err != nil {
   188  		return nil, err
   189  	}
   190  
   191  	agent, err := s.containerIDToComputeAgent.get(req.ContainerID)
   192  	if err != nil {
   193  		return nil, status.Errorf(codes.FailedPrecondition, "No shim registered for containerID `%s`", req.ContainerID)
   194  	}
   195  	settings := req.EndpointSettings.GetHcnEndpoint()
   196  	if settings.Policies == nil || settings.Policies.IovPolicySettings == nil {
   197  		return nil, status.Error(codes.InvalidArgument, "received empty field in request")
   198  	}
   199  	log.G(ctx).WithField("iov settings", settings.Policies.IovPolicySettings).Info("ModifyNIC iov settings")
   200  
   201  	iovReqSettings := settings.Policies.IovPolicySettings
   202  	caReq := &computeagent.ModifyNICInternalRequest{
   203  		NicID:    req.NicID,
   204  		Endpoint: anyEndpoint,
   205  		IovPolicySettings: &computeagent.IovSettings{
   206  			IovOffloadWeight:    iovReqSettings.IovOffloadWeight,
   207  			QueuePairsRequested: iovReqSettings.QueuePairsRequested,
   208  			InterruptModeration: iovReqSettings.InterruptModeration,
   209  		},
   210  	}
   211  
   212  	hcnIOVSettings := &hcn.IovPolicySetting{
   213  		IovOffloadWeight:    iovReqSettings.IovOffloadWeight,
   214  		QueuePairsRequested: iovReqSettings.QueuePairsRequested,
   215  		InterruptModeration: iovReqSettings.InterruptModeration,
   216  	}
   217  	rawJSON, err := json.Marshal(hcnIOVSettings)
   218  	if err != nil {
   219  		return nil, err
   220  	}
   221  
   222  	iovPolicy := hcn.EndpointPolicy{
   223  		Type:     hcn.IOV,
   224  		Settings: rawJSON,
   225  	}
   226  	policies := []hcn.EndpointPolicy{iovPolicy}
   227  
   228  	// To turn off iov offload on an endpoint, we need to first call into HCS to change the
   229  	// offload weight and then call into HNS to revoke the policy.
   230  	//
   231  	// To turn on iov offload, the reverse order is used.
   232  	if iovReqSettings.IovOffloadWeight == 0 {
   233  		if _, err := agent.ModifyNIC(ctx, caReq); err != nil {
   234  			return nil, err
   235  		}
   236  		if err := modifyEndpoint(ctx, ep.Id, policies, hcn.RequestTypeUpdate); err != nil {
   237  			return nil, errors.Wrap(err, "failed to modify network adapter")
   238  		}
   239  		if err := modifyEndpoint(ctx, ep.Id, policies, hcn.RequestTypeRemove); err != nil {
   240  			return nil, errors.Wrap(err, "failed to modify network adapter")
   241  		}
   242  	} else {
   243  		if err := modifyEndpoint(ctx, ep.Id, policies, hcn.RequestTypeUpdate); err != nil {
   244  			return nil, errors.Wrap(err, "failed to modify network adapter")
   245  		}
   246  		if _, err := agent.ModifyNIC(ctx, caReq); err != nil {
   247  			return nil, err
   248  		}
   249  	}
   250  
   251  	return &ncproxygrpc.ModifyNICResponse{}, nil
   252  }
   253  
   254  func (s *grpcService) DeleteNIC(ctx context.Context, req *ncproxygrpc.DeleteNICRequest) (_ *ncproxygrpc.DeleteNICResponse, err error) {
   255  	ctx, span := oc.StartSpan(ctx, "DeleteNIC")
   256  	defer span.End()
   257  	defer func() { oc.SetSpanStatus(span, err) }()
   258  
   259  	span.AddAttributes(
   260  		trace.StringAttribute("containerID", req.ContainerID),
   261  		trace.StringAttribute("endpointName", req.EndpointName),
   262  		trace.StringAttribute("nicID", req.NicID))
   263  
   264  	if req.ContainerID == "" || req.EndpointName == "" || req.NicID == "" {
   265  		return nil, status.Errorf(codes.InvalidArgument, "received empty field in request: %+v", req)
   266  	}
   267  
   268  	var anyEndpoint *types.Any
   269  	if endpt, err := s.ncpNetworkingStore.GetEndpointByName(ctx, req.EndpointName); err == nil {
   270  		anyEndpoint, err = typeurl.MarshalAny(endpt)
   271  		if err != nil {
   272  			return nil, err
   273  		}
   274  	} else {
   275  		if !errors.Is(err, ncproxystore.ErrBucketNotFound) && !errors.Is(err, ncproxystore.ErrKeyNotFound) {
   276  			// log if there was an unexpected error before checking if this is an hcn endpoint
   277  			log.G(ctx).WithError(err).Warn("Failed to query ncproxy networking database")
   278  		}
   279  		ep, err := hcn.GetEndpointByName(req.EndpointName)
   280  		if err != nil {
   281  			if _, ok := err.(hcn.EndpointNotFoundError); ok {
   282  				return nil, status.Errorf(codes.NotFound, "no endpoint with name `%s` found", req.EndpointName)
   283  			}
   284  			return nil, errors.Wrapf(err, "failed to get endpoint with name `%s`", req.EndpointName)
   285  		}
   286  		anyEndpoint, err = typeurl.MarshalAny(ep)
   287  		if err != nil {
   288  			return nil, err
   289  		}
   290  	}
   291  	agent, err := s.containerIDToComputeAgent.get(req.ContainerID)
   292  	if err == nil {
   293  		caReq := &computeagent.DeleteNICInternalRequest{
   294  			ContainerID: req.ContainerID,
   295  			NicID:       req.NicID,
   296  			Endpoint:    anyEndpoint,
   297  		}
   298  		if _, err := agent.DeleteNIC(ctx, caReq); err != nil {
   299  			if err == uvm.ErrNICNotFound || err == uvm.ErrNetNSNotFound {
   300  				return nil, status.Errorf(codes.NotFound, "failed to remove endpoint %q from namespace %q", req.EndpointName, req.NicID)
   301  			}
   302  			return nil, err
   303  		}
   304  		return &ncproxygrpc.DeleteNICResponse{}, nil
   305  	}
   306  	return nil, status.Errorf(codes.FailedPrecondition, "No shim registered for namespace `%s`", req.ContainerID)
   307  }
   308  
   309  func (s *grpcService) CreateNetwork(ctx context.Context, req *ncproxygrpc.CreateNetworkRequest) (_ *ncproxygrpc.CreateNetworkResponse, err error) {
   310  	ctx, span := oc.StartSpan(ctx, "CreateNetwork")
   311  	defer span.End()
   312  	defer func() { oc.SetSpanStatus(span, err) }()
   313  
   314  	if req.Network == nil || req.Network.GetSettings() == nil {
   315  		return nil, status.Errorf(codes.InvalidArgument, "received empty field in request: %+v", req)
   316  	}
   317  
   318  	switch req.Network.GetSettings().(type) {
   319  	case *ncproxygrpc.Network_HcnNetwork:
   320  		networkReq := req.Network.GetHcnNetwork()
   321  		if networkReq.Name == "" {
   322  			return nil, status.Errorf(codes.InvalidArgument, "received empty field in request: %+v", req)
   323  		}
   324  		span.AddAttributes(
   325  			trace.StringAttribute("networkName", networkReq.Name),
   326  			trace.StringAttribute("type", networkReq.Mode.String()),
   327  			trace.StringAttribute("ipamType", networkReq.IpamType.String()))
   328  
   329  		network, err := createHCNNetwork(ctx, networkReq)
   330  		if err != nil {
   331  			return nil, err
   332  		}
   333  		return &ncproxygrpc.CreateNetworkResponse{
   334  			ID: network.Id,
   335  		}, nil
   336  	case *ncproxygrpc.Network_NcproxyNetwork:
   337  		settings := req.Network.GetNcproxyNetwork()
   338  		if settings.Name == "" {
   339  			return nil, status.Errorf(codes.InvalidArgument, "received empty field in request: %+v", req)
   340  		}
   341  		networkSettings := &ncproxynetworking.NetworkSettings{
   342  			Name: settings.Name,
   343  		}
   344  		network := &ncproxynetworking.Network{
   345  			NetworkName: settings.Name,
   346  			Settings:    networkSettings,
   347  		}
   348  		if err := s.ncpNetworkingStore.CreateNetwork(ctx, network); err != nil {
   349  			return nil, err
   350  		}
   351  		return &ncproxygrpc.CreateNetworkResponse{
   352  			ID: settings.Name,
   353  		}, nil
   354  	}
   355  
   356  	return nil, status.Errorf(codes.InvalidArgument, "invalid network settings type: %+v", req.Network.Settings)
   357  }
   358  
   359  func (s *grpcService) CreateEndpoint(ctx context.Context, req *ncproxygrpc.CreateEndpointRequest) (_ *ncproxygrpc.CreateEndpointResponse, err error) {
   360  	ctx, span := oc.StartSpan(ctx, "CreateEndpoint")
   361  	defer span.End()
   362  	defer func() { oc.SetSpanStatus(span, err) }()
   363  
   364  	if req.EndpointSettings == nil {
   365  		return nil, status.Errorf(codes.InvalidArgument, "received empty field in request: %+v", req)
   366  	}
   367  
   368  	switch req.EndpointSettings.GetSettings().(type) {
   369  	case *ncproxygrpc.EndpointSettings_HcnEndpoint:
   370  		reqEndpoint := req.EndpointSettings.GetHcnEndpoint()
   371  
   372  		span.AddAttributes(
   373  			trace.StringAttribute("macAddr", reqEndpoint.Macaddress),
   374  			trace.StringAttribute("endpointName", reqEndpoint.Name),
   375  			trace.StringAttribute("ipAddr", reqEndpoint.Ipaddress),
   376  			trace.StringAttribute("networkName", reqEndpoint.NetworkName))
   377  
   378  		if reqEndpoint.Name == "" || reqEndpoint.Ipaddress == "" || reqEndpoint.Macaddress == "" || reqEndpoint.NetworkName == "" {
   379  			return nil, status.Errorf(codes.InvalidArgument, "received empty field in request: %+v", req)
   380  		}
   381  
   382  		network, err := hcn.GetNetworkByName(reqEndpoint.NetworkName)
   383  		if err != nil {
   384  			if _, ok := err.(hcn.NetworkNotFoundError); ok {
   385  				return nil, status.Errorf(codes.NotFound, "no network with name `%s` found", reqEndpoint.NetworkName)
   386  			}
   387  			return nil, errors.Wrapf(err, "failed to get network with name %q", reqEndpoint.NetworkName)
   388  		}
   389  		ep, err := createHCNEndpoint(ctx, network, reqEndpoint)
   390  		if err != nil {
   391  			return nil, err
   392  		}
   393  		return &ncproxygrpc.CreateEndpointResponse{
   394  			ID: ep.Id,
   395  		}, nil
   396  	case *ncproxygrpc.EndpointSettings_NcproxyEndpoint:
   397  		// get the network stored, create endpoint data and store
   398  		reqEndpoint := req.EndpointSettings.GetNcproxyEndpoint()
   399  		if reqEndpoint.Name == "" || reqEndpoint.Ipaddress == "" || reqEndpoint.Macaddress == "" || reqEndpoint.NetworkName == "" || reqEndpoint.DeviceDetails == nil {
   400  			return nil, status.Errorf(codes.InvalidArgument, "received empty field in request: %+v", req)
   401  		}
   402  
   403  		network, err := s.ncpNetworkingStore.GetNetworkByName(ctx, reqEndpoint.NetworkName)
   404  		if err != nil || network == nil {
   405  			return nil, errors.Wrapf(err, "network %v does not exist", reqEndpoint.NetworkName)
   406  		}
   407  		epSettings := &ncproxynetworking.EndpointSettings{
   408  			Name:                  reqEndpoint.Name,
   409  			Macaddress:            reqEndpoint.Macaddress,
   410  			IPAddress:             reqEndpoint.Ipaddress,
   411  			IPAddressPrefixLength: reqEndpoint.IpaddressPrefixlength,
   412  			NetworkName:           reqEndpoint.NetworkName,
   413  			DefaultGateway:        reqEndpoint.DefaultGateway,
   414  			DeviceDetails: &ncproxynetworking.DeviceDetails{
   415  				PCIDeviceDetails: &ncproxynetworking.PCIDeviceDetails{
   416  					DeviceID:             reqEndpoint.GetPciDeviceDetails().DeviceID,
   417  					VirtualFunctionIndex: reqEndpoint.GetPciDeviceDetails().VirtualFunctionIndex,
   418  				},
   419  			},
   420  		}
   421  		ep := &ncproxynetworking.Endpoint{
   422  			EndpointName: reqEndpoint.Name,
   423  			Settings:     epSettings,
   424  		}
   425  		if err := s.ncpNetworkingStore.CreatEndpoint(ctx, ep); err != nil {
   426  			return nil, err
   427  		}
   428  		return &ncproxygrpc.CreateEndpointResponse{
   429  			ID: reqEndpoint.Name,
   430  		}, nil
   431  	}
   432  
   433  	return nil, status.Errorf(codes.InvalidArgument, "invalid endpoint settings type: %+v", req.EndpointSettings.GetSettings())
   434  }
   435  
   436  func (s *grpcService) AddEndpoint(ctx context.Context, req *ncproxygrpc.AddEndpointRequest) (_ *ncproxygrpc.AddEndpointResponse, err error) {
   437  	ctx, span := oc.StartSpan(ctx, "AddEndpoint")
   438  	defer span.End()
   439  	defer func() { oc.SetSpanStatus(span, err) }()
   440  
   441  	span.AddAttributes(
   442  		trace.StringAttribute("endpointName", req.Name),
   443  		trace.StringAttribute("namespaceID", req.NamespaceID))
   444  
   445  	if req.Name == "" || (!req.AttachToHost && req.NamespaceID == "") {
   446  		return nil, status.Errorf(codes.InvalidArgument, "received empty field in request: %+v", req)
   447  	}
   448  
   449  	if endpt, err := s.ncpNetworkingStore.GetEndpointByName(ctx, req.Name); err == nil {
   450  		endpt.NamespaceID = req.NamespaceID
   451  		if err := s.ncpNetworkingStore.UpdateEndpoint(ctx, endpt); err != nil {
   452  			return nil, errors.Wrapf(err, "failed to update endpoint with name `%s`", req.Name)
   453  		}
   454  	} else {
   455  		if !errors.Is(err, ncproxystore.ErrBucketNotFound) && !errors.Is(err, ncproxystore.ErrKeyNotFound) {
   456  			// log if there was an unexpected error before checking if this is an hcn endpoint
   457  			log.G(ctx).WithError(err).Warn("Failed to query ncproxy networking database")
   458  		}
   459  		ep, err := hcn.GetEndpointByName(req.Name)
   460  		if err != nil {
   461  			if _, ok := err.(hcn.EndpointNotFoundError); ok {
   462  				return nil, status.Errorf(codes.NotFound, "no endpoint with name `%s` found", req.Name)
   463  			}
   464  			return nil, errors.Wrapf(err, "failed to get endpoint with name `%s`", req.Name)
   465  		}
   466  		if req.AttachToHost {
   467  			if req.NamespaceID != "" {
   468  				log.G(ctx).WithField("namespaceID", req.NamespaceID).
   469  					Warning("Specified namespace ID will be ignored when attaching to default host namespace")
   470  			}
   471  
   472  			nsID, err := getHostDefaultNamespace()
   473  			if err != nil {
   474  				return nil, err
   475  			}
   476  
   477  			req.NamespaceID = nsID
   478  			log.G(ctx).WithField("namespaceID", req.NamespaceID).Debug("Attaching endpoint to default host namespace")
   479  			// replace current span namespaceID attribute
   480  			span.AddAttributes(trace.StringAttribute("namespaceID", req.NamespaceID))
   481  		}
   482  		if err := hcn.AddNamespaceEndpoint(req.NamespaceID, ep.Id); err != nil {
   483  			return nil, errors.Wrapf(err, "failed to add endpoint with name %q to namespace", req.Name)
   484  		}
   485  	}
   486  
   487  	return &ncproxygrpc.AddEndpointResponse{}, nil
   488  }
   489  
   490  func (s *grpcService) DeleteEndpoint(ctx context.Context, req *ncproxygrpc.DeleteEndpointRequest) (_ *ncproxygrpc.DeleteEndpointResponse, err error) {
   491  	ctx, span := oc.StartSpan(ctx, "DeleteEndpoint")
   492  	defer span.End()
   493  	defer func() { oc.SetSpanStatus(span, err) }()
   494  
   495  	span.AddAttributes(
   496  		trace.StringAttribute("endpointName", req.Name))
   497  
   498  	if req.Name == "" {
   499  		return nil, status.Errorf(codes.InvalidArgument, "received empty field in request: %+v", req)
   500  	}
   501  
   502  	if _, err := s.ncpNetworkingStore.GetEndpointByName(ctx, req.Name); err == nil {
   503  		if err := s.ncpNetworkingStore.DeleteEndpoint(ctx, req.Name); err != nil {
   504  			return nil, errors.Wrapf(err, "failed to delete endpoint with name %q", req.Name)
   505  		}
   506  	} else {
   507  		if !errors.Is(err, ncproxystore.ErrBucketNotFound) && !errors.Is(err, ncproxystore.ErrKeyNotFound) {
   508  			// log if there was an unexpected error before checking if this is an hcn endpoint
   509  			log.G(ctx).WithError(err).Warn("Failed to query ncproxy networking database")
   510  		}
   511  		ep, err := hcn.GetEndpointByName(req.Name)
   512  		if err != nil {
   513  			if _, ok := err.(hcn.EndpointNotFoundError); ok {
   514  				return nil, status.Errorf(codes.NotFound, "no endpoint with name `%s` found", req.Name)
   515  			}
   516  			return nil, errors.Wrapf(err, "failed to get endpoint with name %q", req.Name)
   517  		}
   518  
   519  		if err = ep.Delete(); err != nil {
   520  			return nil, errors.Wrapf(err, "failed to delete endpoint with name %q", req.Name)
   521  		}
   522  	}
   523  	return &ncproxygrpc.DeleteEndpointResponse{}, nil
   524  }
   525  
   526  func (s *grpcService) DeleteNetwork(ctx context.Context, req *ncproxygrpc.DeleteNetworkRequest) (_ *ncproxygrpc.DeleteNetworkResponse, err error) {
   527  	ctx, span := oc.StartSpan(ctx, "DeleteNetwork")
   528  	defer span.End()
   529  	defer func() { oc.SetSpanStatus(span, err) }()
   530  
   531  	span.AddAttributes(
   532  		trace.StringAttribute("networkName", req.Name))
   533  
   534  	if req.Name == "" {
   535  		return nil, status.Errorf(codes.InvalidArgument, "received empty field in request: %+v", req)
   536  	}
   537  
   538  	if _, err := s.ncpNetworkingStore.GetNetworkByName(ctx, req.Name); err == nil {
   539  		if err := s.ncpNetworkingStore.DeleteNetwork(ctx, req.Name); err != nil {
   540  			return nil, errors.Wrapf(err, "failed to delete network with name %q", req.Name)
   541  		}
   542  	} else {
   543  		if !errors.Is(err, ncproxystore.ErrBucketNotFound) && !errors.Is(err, ncproxystore.ErrKeyNotFound) {
   544  			log.G(ctx).WithError(err).Warn("Failed to query ncproxy networking database")
   545  		}
   546  		network, err := hcn.GetNetworkByName(req.Name)
   547  		if err != nil {
   548  			if _, ok := err.(hcn.NetworkNotFoundError); ok {
   549  				return nil, status.Errorf(codes.NotFound, "no network with name `%s` found", req.Name)
   550  			}
   551  			return nil, errors.Wrapf(err, "failed to get network with name %q", req.Name)
   552  		}
   553  
   554  		if err = network.Delete(); err != nil {
   555  			return nil, errors.Wrapf(err, "failed to delete network with name %q", req.Name)
   556  		}
   557  	}
   558  
   559  	return &ncproxygrpc.DeleteNetworkResponse{}, nil
   560  }
   561  
   562  func ncpNetworkingEndpointToEndpointResponse(ep *ncproxynetworking.Endpoint) (_ *ncproxygrpc.GetEndpointResponse, err error) {
   563  	result := &ncproxygrpc.GetEndpointResponse{
   564  		Namespace: ep.NamespaceID,
   565  		ID:        ep.EndpointName,
   566  	}
   567  	if ep.Settings == nil {
   568  		return result, nil
   569  	}
   570  
   571  	deviceDetails := &ncproxygrpc.NCProxyEndpointSettings_PciDeviceDetails{}
   572  	if ep.Settings.DeviceDetails != nil && ep.Settings.DeviceDetails.PCIDeviceDetails != nil {
   573  		deviceDetails.PciDeviceDetails = &ncproxygrpc.PCIDeviceDetails{
   574  			DeviceID:             ep.Settings.DeviceDetails.PCIDeviceDetails.DeviceID,
   575  			VirtualFunctionIndex: ep.Settings.DeviceDetails.PCIDeviceDetails.VirtualFunctionIndex,
   576  		}
   577  	}
   578  
   579  	result.Endpoint = &ncproxygrpc.EndpointSettings{
   580  		Settings: &ncproxygrpc.EndpointSettings_NcproxyEndpoint{
   581  			NcproxyEndpoint: &ncproxygrpc.NCProxyEndpointSettings{
   582  				Name:                  ep.EndpointName,
   583  				Macaddress:            ep.Settings.Macaddress,
   584  				Ipaddress:             ep.Settings.IPAddress,
   585  				IpaddressPrefixlength: ep.Settings.IPAddressPrefixLength,
   586  				NetworkName:           ep.Settings.NetworkName,
   587  				DefaultGateway:        ep.Settings.DefaultGateway,
   588  				DeviceDetails:         deviceDetails,
   589  			},
   590  		},
   591  	}
   592  	return result, nil
   593  }
   594  
   595  func (s *grpcService) GetEndpoint(ctx context.Context, req *ncproxygrpc.GetEndpointRequest) (_ *ncproxygrpc.GetEndpointResponse, err error) {
   596  	ctx, span := oc.StartSpan(ctx, "GetEndpoint")
   597  	defer span.End()
   598  	defer func() { oc.SetSpanStatus(span, err) }()
   599  
   600  	span.AddAttributes(
   601  		trace.StringAttribute("endpointName", req.Name))
   602  
   603  	if req.Name == "" {
   604  		return nil, status.Errorf(codes.InvalidArgument, "received empty field in request: %+v", req)
   605  	}
   606  
   607  	if ep, err := s.ncpNetworkingStore.GetEndpointByName(ctx, req.Name); err == nil {
   608  		return ncpNetworkingEndpointToEndpointResponse(ep)
   609  	} else if !errors.Is(err, ncproxystore.ErrBucketNotFound) && !errors.Is(err, ncproxystore.ErrKeyNotFound) {
   610  		log.G(ctx).WithError(err).Warn("Failed to query ncproxy networking database")
   611  	}
   612  
   613  	ep, err := hcn.GetEndpointByName(req.Name)
   614  	if err != nil {
   615  		if _, ok := err.(hcn.EndpointNotFoundError); ok {
   616  			return nil, status.Errorf(codes.NotFound, "no endpoint with name `%s` found", req.Name)
   617  		}
   618  		return nil, errors.Wrapf(err, "failed to get endpoint with name %q", req.Name)
   619  	}
   620  	return hcnEndpointToEndpointResponse(ep)
   621  }
   622  
   623  func (s *grpcService) GetEndpoints(ctx context.Context, req *ncproxygrpc.GetEndpointsRequest) (_ *ncproxygrpc.GetEndpointsResponse, err error) {
   624  	ctx, span := oc.StartSpan(ctx, "GetEndpoints")
   625  	defer span.End()
   626  	defer func() { oc.SetSpanStatus(span, err) }()
   627  
   628  	endpoints := []*ncproxygrpc.GetEndpointResponse{}
   629  
   630  	rawHCNEndpoints, err := hcn.ListEndpoints()
   631  	if err != nil {
   632  		return nil, errors.Wrap(err, "failed to get HNS endpoints")
   633  	}
   634  
   635  	rawNCProxyEndpoints, err := s.ncpNetworkingStore.ListEndpoints(ctx)
   636  	if err != nil && !errors.Is(err, ncproxystore.ErrBucketNotFound) {
   637  		return nil, errors.Wrap(err, "failed to get ncproxy networking endpoints")
   638  	}
   639  
   640  	for _, endpoint := range rawHCNEndpoints {
   641  		e, err := hcnEndpointToEndpointResponse(&endpoint)
   642  		if err != nil {
   643  			return nil, err
   644  		}
   645  		endpoints = append(endpoints, e)
   646  	}
   647  
   648  	for _, endpoint := range rawNCProxyEndpoints {
   649  		e, err := ncpNetworkingEndpointToEndpointResponse(endpoint)
   650  		if err != nil {
   651  			return nil, err
   652  		}
   653  		endpoints = append(endpoints, e)
   654  	}
   655  
   656  	return &ncproxygrpc.GetEndpointsResponse{
   657  		Endpoints: endpoints,
   658  	}, nil
   659  }
   660  
   661  func ncpNetworkingNetworkToNetworkResponse(network *ncproxynetworking.Network) (*ncproxygrpc.GetNetworkResponse, error) {
   662  	return &ncproxygrpc.GetNetworkResponse{
   663  		ID: network.NetworkName,
   664  		Network: &ncproxygrpc.Network{
   665  			Settings: &ncproxygrpc.Network_NcproxyNetwork{
   666  				NcproxyNetwork: &ncproxygrpc.NCProxyNetworkSettings{
   667  					Name: network.Settings.Name,
   668  				},
   669  			},
   670  		},
   671  	}, nil
   672  }
   673  
   674  func (s *grpcService) GetNetwork(ctx context.Context, req *ncproxygrpc.GetNetworkRequest) (_ *ncproxygrpc.GetNetworkResponse, err error) {
   675  	ctx, span := oc.StartSpan(ctx, "GetNetwork")
   676  	defer span.End()
   677  	defer func() { oc.SetSpanStatus(span, err) }()
   678  
   679  	span.AddAttributes(
   680  		trace.StringAttribute("networkName", req.Name))
   681  
   682  	if req.Name == "" {
   683  		return nil, status.Errorf(codes.InvalidArgument, "received empty field in request: %+v", req)
   684  	}
   685  
   686  	if network, err := s.ncpNetworkingStore.GetNetworkByName(ctx, req.Name); err == nil {
   687  		return ncpNetworkingNetworkToNetworkResponse(network)
   688  	} else if !errors.Is(err, ncproxystore.ErrBucketNotFound) && !errors.Is(err, ncproxystore.ErrKeyNotFound) {
   689  		log.G(ctx).WithError(err).Warn("Failed to query ncproxy networking database")
   690  	}
   691  
   692  	network, err := hcn.GetNetworkByName(req.Name)
   693  	if err != nil {
   694  		if _, ok := err.(hcn.NetworkNotFoundError); ok {
   695  			return nil, status.Errorf(codes.NotFound, "no network with name `%s` found", req.Name)
   696  		}
   697  		return nil, errors.Wrapf(err, "failed to get network with name %q", req.Name)
   698  	}
   699  
   700  	return hcnNetworkToNetworkResponse(ctx, network)
   701  }
   702  
   703  func (s *grpcService) GetNetworks(ctx context.Context, req *ncproxygrpc.GetNetworksRequest) (_ *ncproxygrpc.GetNetworksResponse, err error) {
   704  	ctx, span := oc.StartSpan(ctx, "GetNetworks")
   705  	defer span.End()
   706  	defer func() { oc.SetSpanStatus(span, err) }()
   707  
   708  	networks := []*ncproxygrpc.GetNetworkResponse{}
   709  
   710  	rawHCNNetworks, err := hcn.ListNetworks()
   711  	if err != nil {
   712  		return nil, errors.Wrap(err, "failed to get HNS networks")
   713  	}
   714  
   715  	rawNCProxyNetworks, err := s.ncpNetworkingStore.ListNetworks(ctx)
   716  	if err != nil && !errors.Is(err, ncproxystore.ErrBucketNotFound) {
   717  		return nil, errors.Wrap(err, "failed to get ncproxy networking networks")
   718  	}
   719  
   720  	for _, network := range rawHCNNetworks {
   721  		n, err := hcnNetworkToNetworkResponse(ctx, &network)
   722  		if err != nil {
   723  			return nil, err
   724  		}
   725  		networks = append(networks, n)
   726  	}
   727  
   728  	for _, network := range rawNCProxyNetworks {
   729  		n, err := ncpNetworkingNetworkToNetworkResponse(network)
   730  		if err != nil {
   731  			return nil, err
   732  		}
   733  		networks = append(networks, n)
   734  	}
   735  
   736  	return &ncproxygrpc.GetNetworksResponse{
   737  		Networks: networks,
   738  	}, nil
   739  }
   740  
// ttrpcService is the TTRPC service exposed for use by the shim.
type ttrpcService struct {
	// containerIDToComputeAgent is a cache that stores the mappings from
	// container ID to compute agent address in memory. This is repopulated
	// on reconnect and referenced during client calls.
	containerIDToComputeAgent *computeAgentCache
	// agentStore refers to the database that stores the mappings from
	// containerID to compute agent address persistently. This is referenced
	// on reconnect and when registering/unregistering a compute agent.
	agentStore *ncproxystore.ComputeAgentStore
}
   752  
   753  func newTTRPCService(ctx context.Context, agent *computeAgentCache, agentStore *ncproxystore.ComputeAgentStore) *ttrpcService {
   754  	return &ttrpcService{
   755  		containerIDToComputeAgent: agent,
   756  		agentStore:                agentStore,
   757  	}
   758  }
   759  
   760  func getComputeAgentClient(agentAddr string) (*computeAgentClient, error) {
   761  	conn, err := winioDialPipe(agentAddr, nil)
   762  	if err != nil {
   763  		return nil, errors.Wrap(err, "failed to connect to compute agent service")
   764  	}
   765  	raw := ttrpcNewClient(
   766  		conn,
   767  		ttrpc.WithUnaryClientInterceptor(octtrpc.ClientInterceptor()),
   768  		ttrpc.WithOnClose(func() { conn.Close() }),
   769  	)
   770  	return &computeAgentClient{raw, computeagent.NewComputeAgentClient(raw)}, nil
   771  }
   772  
   773  func (s *ttrpcService) RegisterComputeAgent(ctx context.Context, req *ncproxyttrpc.RegisterComputeAgentRequest) (_ *ncproxyttrpc.RegisterComputeAgentResponse, err error) {
   774  	ctx, span := oc.StartSpan(ctx, "RegisterComputeAgent")
   775  	defer span.End()
   776  	defer func() { oc.SetSpanStatus(span, err) }()
   777  
   778  	span.AddAttributes(
   779  		trace.StringAttribute("containerID", req.ContainerID),
   780  		trace.StringAttribute("agentAddress", req.AgentAddress))
   781  
   782  	agent, err := getComputeAgentClient(req.AgentAddress)
   783  	if err != nil {
   784  		return nil, err
   785  	}
   786  
   787  	if err := s.agentStore.UpdateComputeAgent(ctx, req.ContainerID, req.AgentAddress); err != nil {
   788  		return nil, err
   789  	}
   790  
   791  	// Add to client cache if connection succeeds. Don't check if there's already a map entry
   792  	// just overwrite as the client may have changed the address of the config agent.
   793  	if err := s.containerIDToComputeAgent.put(req.ContainerID, agent); err != nil {
   794  		return nil, err
   795  	}
   796  
   797  	return &ncproxyttrpc.RegisterComputeAgentResponse{}, nil
   798  }
   799  
   800  func (s *ttrpcService) UnregisterComputeAgent(ctx context.Context, req *ncproxyttrpc.UnregisterComputeAgentRequest) (_ *ncproxyttrpc.UnregisterComputeAgentResponse, err error) {
   801  	ctx, span := oc.StartSpan(ctx, "UnregisterComputeAgent")
   802  	defer span.End()
   803  	defer func() { oc.SetSpanStatus(span, err) }()
   804  
   805  	span.AddAttributes(
   806  		trace.StringAttribute("containerID", req.ContainerID))
   807  
   808  	err = s.agentStore.DeleteComputeAgent(ctx, req.ContainerID)
   809  	if err != nil {
   810  		log.G(ctx).WithField("key", req.ContainerID).WithError(err).Warn("failed to delete key from compute agent store")
   811  	}
   812  
   813  	// remove the agent from the cache and return it so we can clean up its resources as well
   814  	agent, err := s.containerIDToComputeAgent.getAndDelete(req.ContainerID)
   815  	if err != nil {
   816  		return nil, err
   817  	}
   818  	if agent != nil {
   819  		if err := agent.Close(); err != nil {
   820  			return nil, err
   821  		}
   822  	}
   823  
   824  	return &ncproxyttrpc.UnregisterComputeAgentResponse{}, nil
   825  }
   826  
   827  func (s *ttrpcService) ConfigureNetworking(ctx context.Context, req *ncproxyttrpc.ConfigureNetworkingInternalRequest) (_ *ncproxyttrpc.ConfigureNetworkingInternalResponse, err error) {
   828  	ctx, span := oc.StartSpan(ctx, "ConfigureNetworking")
   829  	defer span.End()
   830  	defer func() { oc.SetSpanStatus(span, err) }()
   831  
   832  	span.AddAttributes(
   833  		trace.StringAttribute("containerID", req.ContainerID),
   834  		trace.StringAttribute("agentAddress", req.RequestType.String()))
   835  
   836  	if req.ContainerID == "" {
   837  		return nil, status.Error(codes.InvalidArgument, "ContainerID is empty")
   838  	}
   839  
   840  	if nodeNetSvcClient == nil {
   841  		return nil, status.Error(codes.FailedPrecondition, "No NodeNetworkService client registered")
   842  	}
   843  
   844  	switch req.RequestType {
   845  	case ncproxyttrpc.RequestTypeInternal_Setup:
   846  	case ncproxyttrpc.RequestTypeInternal_Teardown:
   847  	default:
   848  		return nil, status.Errorf(codes.InvalidArgument, "Request type %d is not known", req.RequestType)
   849  	}
   850  
   851  	netsvcReq := &nodenetsvc.ConfigureNetworkingRequest{
   852  		ContainerID: req.ContainerID,
   853  		RequestType: nodenetsvc.RequestType(req.RequestType),
   854  	}
   855  
   856  	ctx, cancel := context.WithTimeout(ctx, 5*time.Minute)
   857  	defer cancel()
   858  	if _, err := nodeNetSvcClient.ConfigureNetworking(ctx, netsvcReq); err != nil {
   859  		return nil, err
   860  	}
   861  	return &ncproxyttrpc.ConfigureNetworkingInternalResponse{}, nil
   862  }
   863  

View as plain text