...

Source file src/k8s.io/kubernetes/pkg/proxy/ipvs/util/ipvs_linux.go

Documentation: k8s.io/kubernetes/pkg/proxy/ipvs/util

     1  //go:build linux
     2  // +build linux
     3  
     4  /*
     5  Copyright 2017 The Kubernetes Authors.
     6  
     7  Licensed under the Apache License, Version 2.0 (the "License");
     8  you may not use this file except in compliance with the License.
     9  You may obtain a copy of the License at
    10  
    11      http://www.apache.org/licenses/LICENSE-2.0
    12  
    13  Unless required by applicable law or agreed to in writing, software
    14  distributed under the License is distributed on an "AS IS" BASIS,
    15  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    16  See the License for the specific language governing permissions and
    17  limitations under the License.
    18  */
    19  
    20  package ipvs
    21  
    22  import (
    23  	"fmt"
    24  	"net"
    25  	"strings"
    26  	"sync"
    27  	"time"
    28  
    29  	"errors"
    30  	libipvs "github.com/moby/ipvs"
    31  
    32  	"golang.org/x/sys/unix"
    33  	"k8s.io/klog/v2"
    34  )
    35  
    36  // runner implements ipvs.Interface.
    37  type runner struct {
    38  	ipvsHandle *libipvs.Handle
    39  	mu         sync.Mutex // Protect Netlink calls
    40  }
    41  
    42  // Protocol is the IPVS service protocol type
    43  type Protocol uint16
    44  
    45  // New returns a new Interface which will call ipvs APIs.
    46  func New() Interface {
    47  	handle, err := libipvs.New("")
    48  	if err != nil {
    49  		klog.ErrorS(err, "IPVS interface can't be initialized")
    50  		return nil
    51  	}
    52  	return &runner{
    53  		ipvsHandle: handle,
    54  	}
    55  }
    56  
    57  // AddVirtualServer is part of ipvs.Interface.
    58  func (runner *runner) AddVirtualServer(vs *VirtualServer) error {
    59  	svc, err := toIPVSService(vs)
    60  	if err != nil {
    61  		return fmt.Errorf("could not convert local virtual server to IPVS service: %w", err)
    62  	}
    63  	runner.mu.Lock()
    64  	defer runner.mu.Unlock()
    65  	return runner.ipvsHandle.NewService(svc)
    66  }
    67  
    68  // UpdateVirtualServer is part of ipvs.Interface.
    69  func (runner *runner) UpdateVirtualServer(vs *VirtualServer) error {
    70  	svc, err := toIPVSService(vs)
    71  	if err != nil {
    72  		return fmt.Errorf("could not convert local virtual server to IPVS service: %w", err)
    73  	}
    74  	runner.mu.Lock()
    75  	defer runner.mu.Unlock()
    76  	return runner.ipvsHandle.UpdateService(svc)
    77  }
    78  
    79  // DeleteVirtualServer is part of ipvs.Interface.
    80  func (runner *runner) DeleteVirtualServer(vs *VirtualServer) error {
    81  	svc, err := toIPVSService(vs)
    82  	if err != nil {
    83  		return fmt.Errorf("could not convert local virtual server to IPVS service: %w", err)
    84  	}
    85  	runner.mu.Lock()
    86  	defer runner.mu.Unlock()
    87  	return runner.ipvsHandle.DelService(svc)
    88  }
    89  
    90  // GetVirtualServer is part of ipvs.Interface.
    91  func (runner *runner) GetVirtualServer(vs *VirtualServer) (*VirtualServer, error) {
    92  	svc, err := toIPVSService(vs)
    93  	if err != nil {
    94  		return nil, fmt.Errorf("could not convert local virtual server to IPVS service: %w", err)
    95  	}
    96  	runner.mu.Lock()
    97  	ipvsSvc, err := runner.ipvsHandle.GetService(svc)
    98  	runner.mu.Unlock()
    99  
   100  	if err != nil {
   101  		return nil, fmt.Errorf("could not get IPVS service: %w", err)
   102  	}
   103  	vServ, err := toVirtualServer(ipvsSvc)
   104  	if err != nil {
   105  		return nil, fmt.Errorf("could not convert IPVS service to local virtual server: %w", err)
   106  	}
   107  	return vServ, nil
   108  }
   109  
   110  // GetVirtualServers is part of ipvs.Interface.
   111  func (runner *runner) GetVirtualServers() ([]*VirtualServer, error) {
   112  	runner.mu.Lock()
   113  	ipvsSvcs, err := runner.ipvsHandle.GetServices()
   114  	runner.mu.Unlock()
   115  	if err != nil {
   116  		return nil, fmt.Errorf("could not get IPVS services: %w", err)
   117  	}
   118  	vss := make([]*VirtualServer, 0)
   119  	for _, ipvsSvc := range ipvsSvcs {
   120  		vs, err := toVirtualServer(ipvsSvc)
   121  		if err != nil {
   122  			return nil, fmt.Errorf("could not convert IPVS service to local virtual server: %w", err)
   123  		}
   124  		vss = append(vss, vs)
   125  	}
   126  	return vss, nil
   127  }
   128  
   129  // Flush is part of ipvs.Interface. Currently we delete IPVS services one by one
   130  func (runner *runner) Flush() error {
   131  	runner.mu.Lock()
   132  	defer runner.mu.Unlock()
   133  	return runner.ipvsHandle.Flush()
   134  }
   135  
   136  // AddRealServer is part of ipvs.Interface.
   137  func (runner *runner) AddRealServer(vs *VirtualServer, rs *RealServer) error {
   138  	svc, err := toIPVSService(vs)
   139  	if err != nil {
   140  		return fmt.Errorf("could not convert local virtual server to IPVS service: %w", err)
   141  	}
   142  	dst, err := toIPVSDestination(rs)
   143  	if err != nil {
   144  		return fmt.Errorf("could not convert local real server to IPVS destination: %w", err)
   145  	}
   146  	runner.mu.Lock()
   147  	defer runner.mu.Unlock()
   148  	return runner.ipvsHandle.NewDestination(svc, dst)
   149  }
   150  
   151  // DeleteRealServer is part of ipvs.Interface.
   152  func (runner *runner) DeleteRealServer(vs *VirtualServer, rs *RealServer) error {
   153  	svc, err := toIPVSService(vs)
   154  	if err != nil {
   155  		return fmt.Errorf("could not convert local virtual server to IPVS service: %w", err)
   156  	}
   157  	dst, err := toIPVSDestination(rs)
   158  	if err != nil {
   159  		return fmt.Errorf("could not convert local real server to IPVS destination: %w", err)
   160  	}
   161  	runner.mu.Lock()
   162  	defer runner.mu.Unlock()
   163  	return runner.ipvsHandle.DelDestination(svc, dst)
   164  }
   165  
   166  func (runner *runner) UpdateRealServer(vs *VirtualServer, rs *RealServer) error {
   167  	svc, err := toIPVSService(vs)
   168  	if err != nil {
   169  		return fmt.Errorf("could not convert local virtual server to IPVS service: %w", err)
   170  	}
   171  	dst, err := toIPVSDestination(rs)
   172  	if err != nil {
   173  		return fmt.Errorf("could not convert local real server to IPVS destination: %w", err)
   174  	}
   175  	runner.mu.Lock()
   176  	defer runner.mu.Unlock()
   177  	return runner.ipvsHandle.UpdateDestination(svc, dst)
   178  }
   179  
   180  // GetRealServers is part of ipvs.Interface.
   181  func (runner *runner) GetRealServers(vs *VirtualServer) ([]*RealServer, error) {
   182  	svc, err := toIPVSService(vs)
   183  	if err != nil {
   184  		return nil, fmt.Errorf("could not convert local virtual server to IPVS service: %w", err)
   185  	}
   186  	runner.mu.Lock()
   187  	dsts, err := runner.ipvsHandle.GetDestinations(svc)
   188  	runner.mu.Unlock()
   189  	if err != nil {
   190  		return nil, fmt.Errorf("could not get IPVS destination for service: %w", err)
   191  	}
   192  	rss := make([]*RealServer, 0)
   193  	for _, dst := range dsts {
   194  		dst, err := toRealServer(dst)
   195  		// TODO: aggregate errors?
   196  		if err != nil {
   197  			return nil, fmt.Errorf("could not convert IPVS destination to local real server: %w", err)
   198  		}
   199  		rss = append(rss, dst)
   200  	}
   201  	return rss, nil
   202  }
   203  
   204  // ConfigureTimeouts is the equivalent to running "ipvsadm --set" to configure tcp, tcpfin and udp timeouts
   205  func (runner *runner) ConfigureTimeouts(tcpTimeout, tcpFinTimeout, udpTimeout time.Duration) error {
   206  	ipvsConfig := &libipvs.Config{
   207  		TimeoutTCP:    tcpTimeout,
   208  		TimeoutTCPFin: tcpFinTimeout,
   209  		TimeoutUDP:    udpTimeout,
   210  	}
   211  
   212  	return runner.ipvsHandle.SetConfig(ipvsConfig)
   213  }
   214  
   215  // toVirtualServer converts an IPVS Service to the equivalent VirtualServer structure.
   216  func toVirtualServer(svc *libipvs.Service) (*VirtualServer, error) {
   217  	if svc == nil {
   218  		return nil, errors.New("ipvs svc should not be empty")
   219  	}
   220  	vs := &VirtualServer{
   221  		Address:   svc.Address,
   222  		Port:      svc.Port,
   223  		Scheduler: svc.SchedName,
   224  		Protocol:  protocolToString(Protocol(svc.Protocol)),
   225  		Timeout:   svc.Timeout,
   226  	}
   227  
   228  	// Test FlagHashed (0x2). A valid flag must include FlagHashed
   229  	if svc.Flags&FlagHashed == 0 {
   230  		return nil, fmt.Errorf("Flags of successfully created IPVS service should enable the flag (%x) since every service is hashed into the service table", FlagHashed)
   231  	}
   232  	// Sub Flags to 0x2
   233  	// 011 -> 001, 010 -> 000
   234  	vs.Flags = ServiceFlags(svc.Flags &^ uint32(FlagHashed))
   235  
   236  	if vs.Address == nil {
   237  		if svc.AddressFamily == unix.AF_INET {
   238  			vs.Address = net.IPv4zero
   239  		} else {
   240  			vs.Address = net.IPv6zero
   241  		}
   242  	}
   243  	return vs, nil
   244  }
   245  
   246  // toRealServer converts an IPVS Destination to the equivalent RealServer structure.
   247  func toRealServer(dst *libipvs.Destination) (*RealServer, error) {
   248  	if dst == nil {
   249  		return nil, errors.New("ipvs destination should not be empty")
   250  	}
   251  	return &RealServer{
   252  		Address:      dst.Address,
   253  		Port:         dst.Port,
   254  		Weight:       dst.Weight,
   255  		ActiveConn:   dst.ActiveConnections,
   256  		InactiveConn: dst.InactiveConnections,
   257  	}, nil
   258  }
   259  
   260  // toIPVSService converts a VirtualServer to the equivalent IPVS Service structure.
   261  func toIPVSService(vs *VirtualServer) (*libipvs.Service, error) {
   262  	if vs == nil {
   263  		return nil, errors.New("virtual server should not be empty")
   264  	}
   265  	ipvsSvc := &libipvs.Service{
   266  		Address:   vs.Address,
   267  		Protocol:  stringToProtocol(vs.Protocol),
   268  		Port:      vs.Port,
   269  		SchedName: vs.Scheduler,
   270  		Flags:     uint32(vs.Flags),
   271  		Timeout:   vs.Timeout,
   272  	}
   273  
   274  	if ip4 := vs.Address.To4(); ip4 != nil {
   275  		ipvsSvc.AddressFamily = unix.AF_INET
   276  		ipvsSvc.Netmask = 0xffffffff
   277  	} else {
   278  		ipvsSvc.AddressFamily = unix.AF_INET6
   279  		ipvsSvc.Netmask = 128
   280  	}
   281  	return ipvsSvc, nil
   282  }
   283  
   284  // toIPVSDestination converts a RealServer to the equivalent IPVS Destination structure.
   285  func toIPVSDestination(rs *RealServer) (*libipvs.Destination, error) {
   286  	if rs == nil {
   287  		return nil, errors.New("real server should not be empty")
   288  	}
   289  	return &libipvs.Destination{
   290  		Address: rs.Address,
   291  		Port:    rs.Port,
   292  		Weight:  rs.Weight,
   293  	}, nil
   294  }
   295  
   296  // stringToProtocolType returns the protocol type for the given name
   297  func stringToProtocol(protocol string) uint16 {
   298  	switch strings.ToLower(protocol) {
   299  	case "tcp":
   300  		return uint16(unix.IPPROTO_TCP)
   301  	case "udp":
   302  		return uint16(unix.IPPROTO_UDP)
   303  	case "sctp":
   304  		return uint16(unix.IPPROTO_SCTP)
   305  	}
   306  	return uint16(0)
   307  }
   308  
   309  // protocolTypeToString returns the name for the given protocol.
   310  func protocolToString(proto Protocol) string {
   311  	switch proto {
   312  	case unix.IPPROTO_TCP:
   313  		return "TCP"
   314  	case unix.IPPROTO_UDP:
   315  		return "UDP"
   316  	case unix.IPPROTO_SCTP:
   317  		return "SCTP"
   318  	}
   319  	return ""
   320  }
   321  

View as plain text