1
2
3
4
19
20 package ipvs
21
22 import (
23 "fmt"
24 "sync"
25 "time"
26
27 "k8s.io/apimachinery/pkg/util/wait"
28 "k8s.io/klog/v2"
29 utilipvs "k8s.io/kubernetes/pkg/proxy/ipvs/util"
30 )
31
// rsCheckDeleteInterval is the period at which Run re-invokes tryDeleteRs to
// retry deleting the real servers held in the graceful termination list.
const rsCheckDeleteInterval = time.Minute
35
36
37
38 type listItem struct {
39 VirtualServer *utilipvs.VirtualServer
40 RealServer *utilipvs.RealServer
41 }
42
43
44 func (g *listItem) String() string {
45 return GetUniqueRSName(g.VirtualServer, g.RealServer)
46 }
47
48
49 func GetUniqueRSName(vs *utilipvs.VirtualServer, rs *utilipvs.RealServer) string {
50 return vs.String() + "/" + rs.String()
51 }
52
53 type graceTerminateRSList struct {
54 lock sync.Mutex
55 list map[string]*listItem
56 }
57
58
59 func (q *graceTerminateRSList) add(rs *listItem) bool {
60 q.lock.Lock()
61 defer q.lock.Unlock()
62
63 uniqueRS := rs.String()
64 if _, ok := q.list[uniqueRS]; ok {
65 return false
66 }
67
68 klog.V(5).InfoS("Adding real server to graceful delete real server list", "realServer", rs)
69 q.list[uniqueRS] = rs
70 return true
71 }
72
73
74 func (q *graceTerminateRSList) remove(rs *listItem) bool {
75 q.lock.Lock()
76 defer q.lock.Unlock()
77
78 uniqueRS := rs.String()
79 if _, ok := q.list[uniqueRS]; ok {
80 delete(q.list, uniqueRS)
81 return true
82 }
83 return false
84 }
85
86
87 func (q *graceTerminateRSList) len() int {
88 q.lock.Lock()
89 defer q.lock.Unlock()
90
91 return len(q.list)
92 }
93
94 func (q *graceTerminateRSList) flushList(handler func(rsToDelete *listItem) (bool, error)) bool {
95 q.lock.Lock()
96 defer q.lock.Unlock()
97 success := true
98 for name, rs := range q.list {
99 deleted, err := handler(rs)
100 if err != nil {
101 klog.ErrorS(err, "Error in deleting real server", "realServer", name)
102 success = false
103 }
104 if deleted {
105 klog.InfoS("Removed real server from graceful delete real server list", "realServer", name)
106 delete(q.list, rs.String())
107 }
108 }
109 return success
110 }
111
112
113 func (q *graceTerminateRSList) exist(uniqueRS string) (*listItem, bool) {
114 q.lock.Lock()
115 defer q.lock.Unlock()
116
117 if rs, ok := q.list[uniqueRS]; ok {
118 return rs, true
119 }
120 return nil, false
121 }
122
123
124
125 type GracefulTerminationManager struct {
126 rsList graceTerminateRSList
127 ipvs utilipvs.Interface
128 }
129
130
131 func NewGracefulTerminationManager(ipvs utilipvs.Interface) *GracefulTerminationManager {
132 l := make(map[string]*listItem)
133 return &GracefulTerminationManager{
134 rsList: graceTerminateRSList{
135 list: l,
136 },
137 ipvs: ipvs,
138 }
139 }
140
141
142 func (m *GracefulTerminationManager) InTerminationList(uniqueRS string) bool {
143 _, exist := m.rsList.exist(uniqueRS)
144 return exist
145 }
146
147
148 func (m *GracefulTerminationManager) GracefulDeleteRS(vs *utilipvs.VirtualServer, rs *utilipvs.RealServer) error {
149
150 ele := &listItem{
151 VirtualServer: vs,
152 RealServer: rs,
153 }
154 deleted, err := m.deleteRsFunc(ele)
155 if err != nil {
156 klog.ErrorS(err, "Error in deleting real server", "realServer", ele)
157 }
158 if deleted {
159 return nil
160 }
161 rs.Weight = 0
162 err = m.ipvs.UpdateRealServer(vs, rs)
163 if err != nil {
164 return err
165 }
166 klog.V(5).InfoS("Adding real server to graceful delete real server list", "realServer", ele)
167 m.rsList.add(ele)
168 return nil
169 }
170
171 func (m *GracefulTerminationManager) deleteRsFunc(rsToDelete *listItem) (bool, error) {
172 klog.V(5).InfoS("Trying to delete real server", "realServer", rsToDelete)
173 rss, err := m.ipvs.GetRealServers(rsToDelete.VirtualServer)
174 if err != nil {
175 return false, err
176 }
177 for _, rs := range rss {
178 if rsToDelete.RealServer.Equal(rs) {
179
180
181
182 if utilipvs.IsRsGracefulTerminationNeeded(rsToDelete.VirtualServer.Protocol) && rs.ActiveConn+rs.InactiveConn != 0 {
183 klog.V(5).InfoS("Skip deleting real server till all connection have expired", "realServer", rsToDelete, "activeConnection", rs.ActiveConn, "inactiveConnection", rs.InactiveConn)
184 return false, nil
185 }
186 klog.V(5).InfoS("Deleting real server", "realServer", rsToDelete)
187 err := m.ipvs.DeleteRealServer(rsToDelete.VirtualServer, rs)
188 if err != nil {
189 return false, fmt.Errorf("delete destination %q err: %w", rs.String(), err)
190 }
191 return true, nil
192 }
193 }
194 return true, fmt.Errorf("failed to delete rs %q, can't find the real server", rsToDelete.String())
195 }
196
197 func (m *GracefulTerminationManager) tryDeleteRs() {
198 if !m.rsList.flushList(m.deleteRsFunc) {
199 klog.ErrorS(nil, "Try flush graceful termination list error")
200 }
201 }
202
203
204 func (m *GracefulTerminationManager) MoveRSOutofGracefulDeleteList(uniqueRS string) error {
205 rsToDelete, find := m.rsList.exist(uniqueRS)
206 if !find || rsToDelete == nil {
207 return fmt.Errorf("failed to find rs: %q", uniqueRS)
208 }
209 err := m.ipvs.DeleteRealServer(rsToDelete.VirtualServer, rsToDelete.RealServer)
210 if err != nil {
211 return err
212 }
213 m.rsList.remove(rsToDelete)
214 return nil
215 }
216
217
218 func (m *GracefulTerminationManager) Run() {
219 go wait.Until(m.tryDeleteRs, rsCheckDeleteInterval, wait.NeverStop)
220 }
221
View as plain text