1
2
3
4
19
20 package ipvs
21
22 import (
23 "fmt"
24 "net"
25 "strings"
26 "sync"
27 "time"
28
29 "errors"
30 libipvs "github.com/moby/ipvs"
31
32 "golang.org/x/sys/unix"
33 "k8s.io/klog/v2"
34 )
35
36
37 type runner struct {
38 ipvsHandle *libipvs.Handle
39 mu sync.Mutex
40 }
41
42
43 type Protocol uint16
44
45
46 func New() Interface {
47 handle, err := libipvs.New("")
48 if err != nil {
49 klog.ErrorS(err, "IPVS interface can't be initialized")
50 return nil
51 }
52 return &runner{
53 ipvsHandle: handle,
54 }
55 }
56
57
58 func (runner *runner) AddVirtualServer(vs *VirtualServer) error {
59 svc, err := toIPVSService(vs)
60 if err != nil {
61 return fmt.Errorf("could not convert local virtual server to IPVS service: %w", err)
62 }
63 runner.mu.Lock()
64 defer runner.mu.Unlock()
65 return runner.ipvsHandle.NewService(svc)
66 }
67
68
69 func (runner *runner) UpdateVirtualServer(vs *VirtualServer) error {
70 svc, err := toIPVSService(vs)
71 if err != nil {
72 return fmt.Errorf("could not convert local virtual server to IPVS service: %w", err)
73 }
74 runner.mu.Lock()
75 defer runner.mu.Unlock()
76 return runner.ipvsHandle.UpdateService(svc)
77 }
78
79
80 func (runner *runner) DeleteVirtualServer(vs *VirtualServer) error {
81 svc, err := toIPVSService(vs)
82 if err != nil {
83 return fmt.Errorf("could not convert local virtual server to IPVS service: %w", err)
84 }
85 runner.mu.Lock()
86 defer runner.mu.Unlock()
87 return runner.ipvsHandle.DelService(svc)
88 }
89
90
91 func (runner *runner) GetVirtualServer(vs *VirtualServer) (*VirtualServer, error) {
92 svc, err := toIPVSService(vs)
93 if err != nil {
94 return nil, fmt.Errorf("could not convert local virtual server to IPVS service: %w", err)
95 }
96 runner.mu.Lock()
97 ipvsSvc, err := runner.ipvsHandle.GetService(svc)
98 runner.mu.Unlock()
99
100 if err != nil {
101 return nil, fmt.Errorf("could not get IPVS service: %w", err)
102 }
103 vServ, err := toVirtualServer(ipvsSvc)
104 if err != nil {
105 return nil, fmt.Errorf("could not convert IPVS service to local virtual server: %w", err)
106 }
107 return vServ, nil
108 }
109
110
111 func (runner *runner) GetVirtualServers() ([]*VirtualServer, error) {
112 runner.mu.Lock()
113 ipvsSvcs, err := runner.ipvsHandle.GetServices()
114 runner.mu.Unlock()
115 if err != nil {
116 return nil, fmt.Errorf("could not get IPVS services: %w", err)
117 }
118 vss := make([]*VirtualServer, 0)
119 for _, ipvsSvc := range ipvsSvcs {
120 vs, err := toVirtualServer(ipvsSvc)
121 if err != nil {
122 return nil, fmt.Errorf("could not convert IPVS service to local virtual server: %w", err)
123 }
124 vss = append(vss, vs)
125 }
126 return vss, nil
127 }
128
129
130 func (runner *runner) Flush() error {
131 runner.mu.Lock()
132 defer runner.mu.Unlock()
133 return runner.ipvsHandle.Flush()
134 }
135
136
137 func (runner *runner) AddRealServer(vs *VirtualServer, rs *RealServer) error {
138 svc, err := toIPVSService(vs)
139 if err != nil {
140 return fmt.Errorf("could not convert local virtual server to IPVS service: %w", err)
141 }
142 dst, err := toIPVSDestination(rs)
143 if err != nil {
144 return fmt.Errorf("could not convert local real server to IPVS destination: %w", err)
145 }
146 runner.mu.Lock()
147 defer runner.mu.Unlock()
148 return runner.ipvsHandle.NewDestination(svc, dst)
149 }
150
151
152 func (runner *runner) DeleteRealServer(vs *VirtualServer, rs *RealServer) error {
153 svc, err := toIPVSService(vs)
154 if err != nil {
155 return fmt.Errorf("could not convert local virtual server to IPVS service: %w", err)
156 }
157 dst, err := toIPVSDestination(rs)
158 if err != nil {
159 return fmt.Errorf("could not convert local real server to IPVS destination: %w", err)
160 }
161 runner.mu.Lock()
162 defer runner.mu.Unlock()
163 return runner.ipvsHandle.DelDestination(svc, dst)
164 }
165
166 func (runner *runner) UpdateRealServer(vs *VirtualServer, rs *RealServer) error {
167 svc, err := toIPVSService(vs)
168 if err != nil {
169 return fmt.Errorf("could not convert local virtual server to IPVS service: %w", err)
170 }
171 dst, err := toIPVSDestination(rs)
172 if err != nil {
173 return fmt.Errorf("could not convert local real server to IPVS destination: %w", err)
174 }
175 runner.mu.Lock()
176 defer runner.mu.Unlock()
177 return runner.ipvsHandle.UpdateDestination(svc, dst)
178 }
179
180
181 func (runner *runner) GetRealServers(vs *VirtualServer) ([]*RealServer, error) {
182 svc, err := toIPVSService(vs)
183 if err != nil {
184 return nil, fmt.Errorf("could not convert local virtual server to IPVS service: %w", err)
185 }
186 runner.mu.Lock()
187 dsts, err := runner.ipvsHandle.GetDestinations(svc)
188 runner.mu.Unlock()
189 if err != nil {
190 return nil, fmt.Errorf("could not get IPVS destination for service: %w", err)
191 }
192 rss := make([]*RealServer, 0)
193 for _, dst := range dsts {
194 dst, err := toRealServer(dst)
195
196 if err != nil {
197 return nil, fmt.Errorf("could not convert IPVS destination to local real server: %w", err)
198 }
199 rss = append(rss, dst)
200 }
201 return rss, nil
202 }
203
204
205 func (runner *runner) ConfigureTimeouts(tcpTimeout, tcpFinTimeout, udpTimeout time.Duration) error {
206 ipvsConfig := &libipvs.Config{
207 TimeoutTCP: tcpTimeout,
208 TimeoutTCPFin: tcpFinTimeout,
209 TimeoutUDP: udpTimeout,
210 }
211
212 return runner.ipvsHandle.SetConfig(ipvsConfig)
213 }
214
215
216 func toVirtualServer(svc *libipvs.Service) (*VirtualServer, error) {
217 if svc == nil {
218 return nil, errors.New("ipvs svc should not be empty")
219 }
220 vs := &VirtualServer{
221 Address: svc.Address,
222 Port: svc.Port,
223 Scheduler: svc.SchedName,
224 Protocol: protocolToString(Protocol(svc.Protocol)),
225 Timeout: svc.Timeout,
226 }
227
228
229 if svc.Flags&FlagHashed == 0 {
230 return nil, fmt.Errorf("Flags of successfully created IPVS service should enable the flag (%x) since every service is hashed into the service table", FlagHashed)
231 }
232
233
234 vs.Flags = ServiceFlags(svc.Flags &^ uint32(FlagHashed))
235
236 if vs.Address == nil {
237 if svc.AddressFamily == unix.AF_INET {
238 vs.Address = net.IPv4zero
239 } else {
240 vs.Address = net.IPv6zero
241 }
242 }
243 return vs, nil
244 }
245
246
247 func toRealServer(dst *libipvs.Destination) (*RealServer, error) {
248 if dst == nil {
249 return nil, errors.New("ipvs destination should not be empty")
250 }
251 return &RealServer{
252 Address: dst.Address,
253 Port: dst.Port,
254 Weight: dst.Weight,
255 ActiveConn: dst.ActiveConnections,
256 InactiveConn: dst.InactiveConnections,
257 }, nil
258 }
259
260
261 func toIPVSService(vs *VirtualServer) (*libipvs.Service, error) {
262 if vs == nil {
263 return nil, errors.New("virtual server should not be empty")
264 }
265 ipvsSvc := &libipvs.Service{
266 Address: vs.Address,
267 Protocol: stringToProtocol(vs.Protocol),
268 Port: vs.Port,
269 SchedName: vs.Scheduler,
270 Flags: uint32(vs.Flags),
271 Timeout: vs.Timeout,
272 }
273
274 if ip4 := vs.Address.To4(); ip4 != nil {
275 ipvsSvc.AddressFamily = unix.AF_INET
276 ipvsSvc.Netmask = 0xffffffff
277 } else {
278 ipvsSvc.AddressFamily = unix.AF_INET6
279 ipvsSvc.Netmask = 128
280 }
281 return ipvsSvc, nil
282 }
283
284
285 func toIPVSDestination(rs *RealServer) (*libipvs.Destination, error) {
286 if rs == nil {
287 return nil, errors.New("real server should not be empty")
288 }
289 return &libipvs.Destination{
290 Address: rs.Address,
291 Port: rs.Port,
292 Weight: rs.Weight,
293 }, nil
294 }
295
296
297 func stringToProtocol(protocol string) uint16 {
298 switch strings.ToLower(protocol) {
299 case "tcp":
300 return uint16(unix.IPPROTO_TCP)
301 case "udp":
302 return uint16(unix.IPPROTO_UDP)
303 case "sctp":
304 return uint16(unix.IPPROTO_SCTP)
305 }
306 return uint16(0)
307 }
308
309
310 func protocolToString(proto Protocol) string {
311 switch proto {
312 case unix.IPPROTO_TCP:
313 return "TCP"
314 case unix.IPPROTO_UDP:
315 return "UDP"
316 case unix.IPPROTO_SCTP:
317 return "SCTP"
318 }
319 return ""
320 }
321
View as plain text