1
2
3
4
19
20 package ipam
21
22 import (
23 "context"
24 "encoding/json"
25 "fmt"
26 "net"
27
28 "k8s.io/klog/v2"
29 netutils "k8s.io/utils/net"
30
31 v1 "k8s.io/api/core/v1"
32 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
33 "k8s.io/apimachinery/pkg/types"
34 utilruntime "k8s.io/apimachinery/pkg/util/runtime"
35 clientset "k8s.io/client-go/kubernetes"
36 v1core "k8s.io/client-go/kubernetes/typed/core/v1"
37 "k8s.io/client-go/tools/record"
38 nodeutil "k8s.io/component-helpers/node/util"
39 "k8s.io/legacy-cloud-providers/gce"
40 "k8s.io/metrics/pkg/client/clientset/versioned/scheme"
41 )
42
43 type adapter struct {
44 k8s clientset.Interface
45 cloud *gce.Cloud
46
47 broadcaster record.EventBroadcaster
48 recorder record.EventRecorder
49 }
50
51 func newAdapter(ctx context.Context, k8s clientset.Interface, cloud *gce.Cloud) *adapter {
52 broadcaster := record.NewBroadcaster(record.WithContext(ctx))
53
54 ret := &adapter{
55 k8s: k8s,
56 cloud: cloud,
57 broadcaster: broadcaster,
58 recorder: broadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "cloudCIDRAllocator"}),
59 }
60
61 return ret
62 }
63
64 func (a *adapter) Run(ctx context.Context) {
65 defer utilruntime.HandleCrash()
66
67
68 a.broadcaster.StartStructuredLogging(3)
69 a.broadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: a.k8s.CoreV1().Events("")})
70 defer a.broadcaster.Shutdown()
71
72 <-ctx.Done()
73 }
74
75 func (a *adapter) Alias(ctx context.Context, node *v1.Node) (*net.IPNet, error) {
76 if node.Spec.ProviderID == "" {
77 return nil, fmt.Errorf("node %s doesn't have providerID", node.Name)
78 }
79
80 cidrs, err := a.cloud.AliasRangesByProviderID(node.Spec.ProviderID)
81 if err != nil {
82 return nil, err
83 }
84
85 switch len(cidrs) {
86 case 0:
87 return nil, nil
88 case 1:
89 break
90 default:
91 klog.FromContext(ctx).Info("Node has more than one alias assigned, defaulting to the first", "node", klog.KObj(node), "CIDRs", cidrs)
92 }
93
94 _, cidrRange, err := netutils.ParseCIDRSloppy(cidrs[0])
95 if err != nil {
96 return nil, err
97 }
98
99 return cidrRange, nil
100 }
101
102 func (a *adapter) AddAlias(ctx context.Context, node *v1.Node, cidrRange *net.IPNet) error {
103 if node.Spec.ProviderID == "" {
104 return fmt.Errorf("node %s doesn't have providerID", node.Name)
105 }
106
107 return a.cloud.AddAliasToInstanceByProviderID(node.Spec.ProviderID, cidrRange)
108 }
109
110 func (a *adapter) Node(ctx context.Context, name string) (*v1.Node, error) {
111 return a.k8s.CoreV1().Nodes().Get(context.TODO(), name, metav1.GetOptions{})
112 }
113
114 func (a *adapter) UpdateNodePodCIDR(ctx context.Context, node *v1.Node, cidrRange *net.IPNet) error {
115 patch := map[string]interface{}{
116 "apiVersion": node.APIVersion,
117 "kind": node.Kind,
118 "metadata": map[string]interface{}{"name": node.Name},
119 "spec": map[string]interface{}{"podCIDR": cidrRange.String()},
120 }
121 bytes, err := json.Marshal(patch)
122 if err != nil {
123 return err
124 }
125
126 _, err = a.k8s.CoreV1().Nodes().Patch(context.TODO(), node.Name, types.StrategicMergePatchType, bytes, metav1.PatchOptions{})
127 return err
128 }
129
130 func (a *adapter) UpdateNodeNetworkUnavailable(nodeName string, unavailable bool) error {
131 condition := v1.ConditionFalse
132 if unavailable {
133 condition = v1.ConditionTrue
134 }
135 return nodeutil.SetNodeCondition(a.k8s, types.NodeName(nodeName), v1.NodeCondition{
136 Type: v1.NodeNetworkUnavailable,
137 Status: condition,
138 Reason: "RouteCreated",
139 Message: "NodeController created an implicit route",
140 LastTransitionTime: metav1.Now(),
141 })
142 }
143
144 func (a *adapter) EmitNodeWarningEvent(nodeName, reason, fmt string, args ...interface{}) {
145 ref := &v1.ObjectReference{Kind: "Node", Name: nodeName}
146 a.recorder.Eventf(ref, v1.EventTypeNormal, reason, fmt, args...)
147 }
148
View as plain text