...

Source file src/k8s.io/kubernetes/pkg/proxy/ipvs/graceful_termination.go

Documentation: k8s.io/kubernetes/pkg/proxy/ipvs

     1  //go:build linux
     2  // +build linux
     3  
     4  /*
     5  Copyright 2015 The Kubernetes Authors.
     6  
     7  Licensed under the Apache License, Version 2.0 (the "License");
     8  you may not use this file except in compliance with the License.
     9  You may obtain a copy of the License at
    10  
    11      http://www.apache.org/licenses/LICENSE-2.0
    12  
    13  Unless required by applicable law or agreed to in writing, software
    14  distributed under the License is distributed on an "AS IS" BASIS,
    15  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    16  See the License for the specific language governing permissions and
    17  limitations under the License.
    18  */
    19  
    20  package ipvs
    21  
    22  import (
    23  	"fmt"
    24  	"sync"
    25  	"time"
    26  
    27  	"k8s.io/apimachinery/pkg/util/wait"
    28  	"k8s.io/klog/v2"
    29  	utilipvs "k8s.io/kubernetes/pkg/proxy/ipvs/util"
    30  )
    31  
    32  const (
    33  	rsCheckDeleteInterval = 1 * time.Minute
    34  )
    35  
// listItem pairs a real server with the virtual server it belongs to.
// It is the element type stored in graceTerminateRSList, keyed by the
// unique name returned from its String method.
type listItem struct {
	// VirtualServer is the virtual server the real server is attached to.
	VirtualServer *utilipvs.VirtualServer
	// RealServer is the real server waiting to be deleted.
	RealServer    *utilipvs.RealServer
}
    42  
    43  // String return the unique real server name(with virtual server information)
    44  func (g *listItem) String() string {
    45  	return GetUniqueRSName(g.VirtualServer, g.RealServer)
    46  }
    47  
    48  // GetUniqueRSName return a string type unique rs name with vs information
    49  func GetUniqueRSName(vs *utilipvs.VirtualServer, rs *utilipvs.RealServer) string {
    50  	return vs.String() + "/" + rs.String()
    51  }
    52  
// graceTerminateRSList is a mutex-guarded collection of real servers that
// are waiting to be deleted. Entries are keyed by listItem.String().
type graceTerminateRSList struct {
	// lock guards every access to list.
	lock sync.Mutex
	// list maps the unique real server name to its item.
	list map[string]*listItem
}
    57  
    58  // add push an new element to the rsList
    59  func (q *graceTerminateRSList) add(rs *listItem) bool {
    60  	q.lock.Lock()
    61  	defer q.lock.Unlock()
    62  
    63  	uniqueRS := rs.String()
    64  	if _, ok := q.list[uniqueRS]; ok {
    65  		return false
    66  	}
    67  
    68  	klog.V(5).InfoS("Adding real server to graceful delete real server list", "realServer", rs)
    69  	q.list[uniqueRS] = rs
    70  	return true
    71  }
    72  
    73  // remove remove an element from the rsList
    74  func (q *graceTerminateRSList) remove(rs *listItem) bool {
    75  	q.lock.Lock()
    76  	defer q.lock.Unlock()
    77  
    78  	uniqueRS := rs.String()
    79  	if _, ok := q.list[uniqueRS]; ok {
    80  		delete(q.list, uniqueRS)
    81  		return true
    82  	}
    83  	return false
    84  }
    85  
    86  // return the size of the list
    87  func (q *graceTerminateRSList) len() int {
    88  	q.lock.Lock()
    89  	defer q.lock.Unlock()
    90  
    91  	return len(q.list)
    92  }
    93  
    94  func (q *graceTerminateRSList) flushList(handler func(rsToDelete *listItem) (bool, error)) bool {
    95  	q.lock.Lock()
    96  	defer q.lock.Unlock()
    97  	success := true
    98  	for name, rs := range q.list {
    99  		deleted, err := handler(rs)
   100  		if err != nil {
   101  			klog.ErrorS(err, "Error in deleting real server", "realServer", name)
   102  			success = false
   103  		}
   104  		if deleted {
   105  			klog.InfoS("Removed real server from graceful delete real server list", "realServer", name)
   106  			delete(q.list, rs.String())
   107  		}
   108  	}
   109  	return success
   110  }
   111  
   112  // exist check whether the specified unique RS is in the rsList
   113  func (q *graceTerminateRSList) exist(uniqueRS string) (*listItem, bool) {
   114  	q.lock.Lock()
   115  	defer q.lock.Unlock()
   116  
   117  	if rs, ok := q.list[uniqueRS]; ok {
   118  		return rs, true
   119  	}
   120  	return nil, false
   121  }
   122  
// GracefulTerminationManager keeps track of the real servers that are being
// gracefully terminated and performs the related IPVS delete/update work.
type GracefulTerminationManager struct {
	// rsList holds the real servers waiting to be deleted.
	rsList graceTerminateRSList
	// ipvs is the IPVS interface used to list, update and delete real servers.
	ipvs   utilipvs.Interface
}
   129  
   130  // NewGracefulTerminationManager create a gracefulTerminationManager to manage ipvs rs graceful termination work
   131  func NewGracefulTerminationManager(ipvs utilipvs.Interface) *GracefulTerminationManager {
   132  	l := make(map[string]*listItem)
   133  	return &GracefulTerminationManager{
   134  		rsList: graceTerminateRSList{
   135  			list: l,
   136  		},
   137  		ipvs: ipvs,
   138  	}
   139  }
   140  
   141  // InTerminationList to check whether specified unique rs name is in graceful termination list
   142  func (m *GracefulTerminationManager) InTerminationList(uniqueRS string) bool {
   143  	_, exist := m.rsList.exist(uniqueRS)
   144  	return exist
   145  }
   146  
   147  // GracefulDeleteRS to update rs weight to 0, and add rs to graceful terminate list
   148  func (m *GracefulTerminationManager) GracefulDeleteRS(vs *utilipvs.VirtualServer, rs *utilipvs.RealServer) error {
   149  	// Try to delete rs before add it to graceful delete list
   150  	ele := &listItem{
   151  		VirtualServer: vs,
   152  		RealServer:    rs,
   153  	}
   154  	deleted, err := m.deleteRsFunc(ele)
   155  	if err != nil {
   156  		klog.ErrorS(err, "Error in deleting real server", "realServer", ele)
   157  	}
   158  	if deleted {
   159  		return nil
   160  	}
   161  	rs.Weight = 0
   162  	err = m.ipvs.UpdateRealServer(vs, rs)
   163  	if err != nil {
   164  		return err
   165  	}
   166  	klog.V(5).InfoS("Adding real server to graceful delete real server list", "realServer", ele)
   167  	m.rsList.add(ele)
   168  	return nil
   169  }
   170  
   171  func (m *GracefulTerminationManager) deleteRsFunc(rsToDelete *listItem) (bool, error) {
   172  	klog.V(5).InfoS("Trying to delete real server", "realServer", rsToDelete)
   173  	rss, err := m.ipvs.GetRealServers(rsToDelete.VirtualServer)
   174  	if err != nil {
   175  		return false, err
   176  	}
   177  	for _, rs := range rss {
   178  		if rsToDelete.RealServer.Equal(rs) {
   179  			// For UDP and SCTP traffic, no graceful termination, we immediately delete the RS
   180  			//     (existing connections will be deleted on the next packet because sysctlExpireNoDestConn=1)
   181  			// For other protocols, don't delete until all connections have expired)
   182  			if utilipvs.IsRsGracefulTerminationNeeded(rsToDelete.VirtualServer.Protocol) && rs.ActiveConn+rs.InactiveConn != 0 {
   183  				klog.V(5).InfoS("Skip deleting real server till all connection have expired", "realServer", rsToDelete, "activeConnection", rs.ActiveConn, "inactiveConnection", rs.InactiveConn)
   184  				return false, nil
   185  			}
   186  			klog.V(5).InfoS("Deleting real server", "realServer", rsToDelete)
   187  			err := m.ipvs.DeleteRealServer(rsToDelete.VirtualServer, rs)
   188  			if err != nil {
   189  				return false, fmt.Errorf("delete destination %q err: %w", rs.String(), err)
   190  			}
   191  			return true, nil
   192  		}
   193  	}
   194  	return true, fmt.Errorf("failed to delete rs %q, can't find the real server", rsToDelete.String())
   195  }
   196  
   197  func (m *GracefulTerminationManager) tryDeleteRs() {
   198  	if !m.rsList.flushList(m.deleteRsFunc) {
   199  		klog.ErrorS(nil, "Try flush graceful termination list error")
   200  	}
   201  }
   202  
   203  // MoveRSOutofGracefulDeleteList to delete an rs and remove it from the rsList immediately
   204  func (m *GracefulTerminationManager) MoveRSOutofGracefulDeleteList(uniqueRS string) error {
   205  	rsToDelete, find := m.rsList.exist(uniqueRS)
   206  	if !find || rsToDelete == nil {
   207  		return fmt.Errorf("failed to find rs: %q", uniqueRS)
   208  	}
   209  	err := m.ipvs.DeleteRealServer(rsToDelete.VirtualServer, rsToDelete.RealServer)
   210  	if err != nil {
   211  		return err
   212  	}
   213  	m.rsList.remove(rsToDelete)
   214  	return nil
   215  }
   216  
   217  // Run start a goroutine to try to delete rs in the graceful delete rsList with an interval 1 minute
   218  func (m *GracefulTerminationManager) Run() {
   219  	go wait.Until(m.tryDeleteRs, rsCheckDeleteInterval, wait.NeverStop)
   220  }
   221  

View as plain text