1
2
3 package trafficshaping
4
5 import (
6 "context"
7 "errors"
8 "net"
9
10 "github.com/vishvananda/netlink"
11 v1 "k8s.io/api/core/v1"
12 "sigs.k8s.io/controller-runtime/pkg/client"
13
14 k8snet "edge-infra.dev/pkg/k8s/net/calico"
15 "edge-infra.dev/pkg/k8s/runtime/controller/reconcile"
16 "edge-infra.dev/pkg/lib/kernel/netlink/link"
17 v1ien "edge-infra.dev/pkg/sds/ien/k8s/apis/v1"
18 "edge-infra.dev/pkg/sds/ien/k8s/controllers/nodeagent/config"
19 "edge-infra.dev/pkg/sds/ien/topology"
20 )
21
// ifbPrefix is prepended to the default link's name to form the name of the
// IFB device that enable creates and disable deletes.
var ifbPrefix = "ifb"
25
26 type Plugin struct {
27 client client.Client
28 tpInfo *topology.Info
29 }
30
31 func (p Plugin) Reconcile(ctx context.Context, ienode *v1ien.IENode, cfg config.Config) (reconcile.Result, error) {
32 p.client = cfg.GetClient()
33
34 if !ienode.Spec.IsGatewayNode() {
35 return reconcile.ResultEmpty, nil
36 }
37
38 tpInfo := &topology.Info{}
39 tpInfo, err := tpInfo.FromClient(ctx, p.client)
40 if err != nil {
41 return reconcile.ResultRequeue, err
42 }
43 p.tpInfo = tpInfo
44
45 if tpInfo.EgressGatewayEnabled {
46 if err := p.enable(ctx, ienode); err != nil {
47 return reconcile.ResultRequeue, err
48 }
49 return reconcile.ResultSuccess, nil
50 }
51 if err := p.disable(ienode); err != nil {
52 return reconcile.ResultRequeue, nil
53 }
54 return reconcile.ResultSuccess, nil
55 }
56
57 func (p Plugin) enable(ctx context.Context, ienode *v1ien.IENode) error {
58 defaultLink, err := getDefaultLink(ienode)
59 if err != nil {
60 return err
61 }
62
63 netlink := link.NetLink{}
64
65 ifbLink := netlink.NewIfb(ifbPrefix+defaultLink.Attrs().Name, defaultLink.Attrs().MTU)
66 updatedIfbLink, err := netlink.Ensure(ifbLink)
67 if err != nil {
68 return err
69 }
70
71 return p.reconcile(ctx, ienode, defaultLink, updatedIfbLink, p.tpInfo.UplinkRateLimit, p.tpInfo.DownlinkRateLimit)
72 }
73
74 func (p *Plugin) disable(ienode *v1ien.IENode) error {
75 defaultLink, err := getDefaultLink(ienode)
76 if err != nil {
77 return err
78 }
79
80 qdiscs, err := netlink.QdiscList(defaultLink)
81 if err != nil {
82 return err
83 }
84
85 for _, qdisc := range qdiscs {
86 if qdisc.Type() != "ingress" && qdisc.Type() != "htb" {
87 continue
88 }
89 if err := netlink.QdiscDel(qdisc); err != nil {
90 return err
91 }
92 }
93 netlink := link.NetLink{}
94 return netlink.Delete(ifbPrefix + defaultLink.Attrs().Name)
95 }
96
97 func (p Plugin) reconcile(ctx context.Context, ienode *v1ien.IENode, defaultLink, ifbLink netlink.Link, egressRate, ingressRate uint64) error {
98 networksToIgnore, err := p.getNodesIPs(ctx)
99 if err != nil {
100 return err
101 }
102 networksToIgnore = append(networksToIgnore, p.parseKubeVipIP(ienode))
103 defaultIndex := defaultLink.Attrs().Index
104 ifbIndex := ifbLink.Attrs().Index
105 return reconcileTrafficControl(defaultIndex, ifbIndex, egressRate, ingressRate, networksToIgnore)
106 }
107
108 func (p Plugin) parseKubeVipIP(ienode *v1ien.IENode) *net.IPNet {
109 vipIP := net.ParseIP(ienode.Spec.KubeVip)
110 mask := net.IPMask(net.ParseIP("255.255.255.255"))
111 return &net.IPNet{IP: vipIP, Mask: mask}
112 }
113
114
115 func (p Plugin) getNodesIPs(ctx context.Context) ([]*net.IPNet, error) {
116 nodes := v1.NodeList{}
117 if err := p.client.List(ctx, &nodes); err != nil {
118 return nil, err
119 }
120 ignoreIPs, err := k8snet.ParseNodeCIDRs(nodes.Items...)
121 if err != nil {
122 return nil, err
123 }
124 ignoreIPs, err = p.getIENodeIPs(ctx, ignoreIPs)
125 if err != nil {
126 return nil, err
127 }
128 return ignoreIPs, nil
129 }
130
131
132 func (p Plugin) getIENodeIPs(ctx context.Context, ignoreIPs []*net.IPNet) ([]*net.IPNet, error) {
133 ienodes := &v1ien.IENodeList{}
134 if err := p.client.List(ctx, ienodes); err != nil {
135 return nil, err
136 }
137
138 for _, node := range ienodes.Items {
139 iface, err := node.Spec.DefaultInterface()
140 if err != nil {
141 return nil, err
142 }
143 if iface.DHCP4 {
144 continue
145 }
146 ipNet, err := parseStaticIPs(iface)
147 if err != nil {
148 return nil, err
149 }
150 ignoreIPs = append(ignoreIPs, ipNet)
151 }
152 return ignoreIPs, nil
153 }
154
155 func parseStaticIPs(network v1ien.Network) (*net.IPNet, error) {
156 if len(network.Addresses) == 0 {
157 return nil, errors.New("node has invalid network configuration with no static ips configured")
158 }
159 _, ipNet, err := net.ParseCIDR(network.Addresses[0])
160 if err != nil {
161 return nil, err
162 }
163 return ipNet, nil
164 }
165
166
167 func getDefaultLink(ienode *v1ien.IENode) (netlink.Link, error) {
168 netlink := link.NetLink{}
169 hardwareAddress, err := ienode.Spec.DefaultMacAddress()
170 if err != nil {
171 return nil, err
172 }
173 return netlink.GetFromHardwareAddr(hardwareAddress)
174 }
175
View as plain text