1
16
17 package reconcilers
18
19 import (
20 "context"
21
22 corev1 "k8s.io/api/core/v1"
23 discovery "k8s.io/api/discovery/v1"
24 apiequality "k8s.io/apimachinery/pkg/api/equality"
25 "k8s.io/apimachinery/pkg/api/errors"
26 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
27 corev1client "k8s.io/client-go/kubernetes/typed/core/v1"
28 discoveryclient "k8s.io/client-go/kubernetes/typed/discovery/v1"
29 utilnet "k8s.io/utils/net"
30 )
31
32
33
34
35
36
37
38
39
40 type EndpointsAdapter struct {
41 endpointClient corev1client.EndpointsGetter
42 endpointSliceClient discoveryclient.EndpointSlicesGetter
43 }
44
45
46 func NewEndpointsAdapter(endpointClient corev1client.EndpointsGetter, endpointSliceClient discoveryclient.EndpointSlicesGetter) EndpointsAdapter {
47 return EndpointsAdapter{
48 endpointClient: endpointClient,
49 endpointSliceClient: endpointSliceClient,
50 }
51 }
52
53
54
55 func (adapter *EndpointsAdapter) Get(namespace, name string, getOpts metav1.GetOptions) (*corev1.Endpoints, error) {
56 return adapter.endpointClient.Endpoints(namespace).Get(context.TODO(), name, getOpts)
57 }
58
59
60
61
62 func (adapter *EndpointsAdapter) Create(namespace string, endpoints *corev1.Endpoints) (*corev1.Endpoints, error) {
63 endpoints, err := adapter.endpointClient.Endpoints(namespace).Create(context.TODO(), endpoints, metav1.CreateOptions{})
64 if err == nil {
65 err = adapter.EnsureEndpointSliceFromEndpoints(namespace, endpoints)
66 }
67 return endpoints, err
68 }
69
70
71
72 func (adapter *EndpointsAdapter) Update(namespace string, endpoints *corev1.Endpoints) (*corev1.Endpoints, error) {
73 endpoints, err := adapter.endpointClient.Endpoints(namespace).Update(context.TODO(), endpoints, metav1.UpdateOptions{})
74 if err == nil {
75 err = adapter.EnsureEndpointSliceFromEndpoints(namespace, endpoints)
76 }
77 return endpoints, err
78 }
79
80
81
82
83 func (adapter *EndpointsAdapter) EnsureEndpointSliceFromEndpoints(namespace string, endpoints *corev1.Endpoints) error {
84 endpointSlice := endpointSliceFromEndpoints(endpoints)
85 currentEndpointSlice, err := adapter.endpointSliceClient.EndpointSlices(namespace).Get(context.TODO(), endpointSlice.Name, metav1.GetOptions{})
86
87 if err != nil {
88 if errors.IsNotFound(err) {
89 if _, err = adapter.endpointSliceClient.EndpointSlices(namespace).Create(context.TODO(), endpointSlice, metav1.CreateOptions{}); errors.IsAlreadyExists(err) {
90 err = nil
91 }
92 }
93 return err
94 }
95
96
97 if currentEndpointSlice.AddressType != endpointSlice.AddressType {
98 err = adapter.endpointSliceClient.EndpointSlices(namespace).Delete(context.TODO(), endpointSlice.Name, metav1.DeleteOptions{})
99 if err != nil {
100 return err
101 }
102 _, err = adapter.endpointSliceClient.EndpointSlices(namespace).Create(context.TODO(), endpointSlice, metav1.CreateOptions{})
103 return err
104 }
105
106 if apiequality.Semantic.DeepEqual(currentEndpointSlice.Endpoints, endpointSlice.Endpoints) &&
107 apiequality.Semantic.DeepEqual(currentEndpointSlice.Ports, endpointSlice.Ports) &&
108 apiequality.Semantic.DeepEqual(currentEndpointSlice.Labels, endpointSlice.Labels) {
109 return nil
110 }
111
112 _, err = adapter.endpointSliceClient.EndpointSlices(namespace).Update(context.TODO(), endpointSlice, metav1.UpdateOptions{})
113 return err
114 }
115
116
117
118 func endpointSliceFromEndpoints(endpoints *corev1.Endpoints) *discovery.EndpointSlice {
119 endpointSlice := &discovery.EndpointSlice{}
120 endpointSlice.Name = endpoints.Name
121 endpointSlice.Namespace = endpoints.Namespace
122 endpointSlice.Labels = map[string]string{discovery.LabelServiceName: endpoints.Name}
123
124
125
126 endpointSlice.AddressType = discovery.AddressTypeIPv4
127
128 if len(endpoints.Subsets) > 0 {
129 subset := endpoints.Subsets[0]
130 for i := range subset.Ports {
131 endpointSlice.Ports = append(endpointSlice.Ports, discovery.EndpointPort{
132 Port: &subset.Ports[i].Port,
133 Name: &subset.Ports[i].Name,
134 Protocol: &subset.Ports[i].Protocol,
135 })
136 }
137
138 if allAddressesIPv6(append(subset.Addresses, subset.NotReadyAddresses...)) {
139 endpointSlice.AddressType = discovery.AddressTypeIPv6
140 }
141
142 endpointSlice.Endpoints = append(endpointSlice.Endpoints, getEndpointsFromAddresses(subset.Addresses, endpointSlice.AddressType, true)...)
143 endpointSlice.Endpoints = append(endpointSlice.Endpoints, getEndpointsFromAddresses(subset.NotReadyAddresses, endpointSlice.AddressType, false)...)
144 }
145
146 return endpointSlice
147 }
148
149
150
151 func getEndpointsFromAddresses(addresses []corev1.EndpointAddress, addressType discovery.AddressType, ready bool) []discovery.Endpoint {
152 endpoints := []discovery.Endpoint{}
153 isIPv6AddressType := addressType == discovery.AddressTypeIPv6
154
155 for _, address := range addresses {
156 if utilnet.IsIPv6String(address.IP) == isIPv6AddressType {
157 endpoints = append(endpoints, endpointFromAddress(address, ready))
158 }
159 }
160
161 return endpoints
162 }
163
164
165 func endpointFromAddress(address corev1.EndpointAddress, ready bool) discovery.Endpoint {
166 ep := discovery.Endpoint{
167 Addresses: []string{address.IP},
168 Conditions: discovery.EndpointConditions{Ready: &ready},
169 TargetRef: address.TargetRef,
170 }
171
172 if address.NodeName != nil {
173 ep.NodeName = address.NodeName
174 }
175
176 return ep
177 }
178
179
180 func allAddressesIPv6(addresses []corev1.EndpointAddress) bool {
181 if len(addresses) == 0 {
182 return false
183 }
184
185 for _, address := range addresses {
186 if !utilnet.IsIPv6String(address.IP) {
187 return false
188 }
189 }
190
191 return true
192 }
193
194
195
196 func setSkipMirrorTrue(e *corev1.Endpoints) bool {
197 skipMirrorVal, ok := e.Labels[discovery.LabelSkipMirror]
198 if !ok || skipMirrorVal != "true" {
199 if e.Labels == nil {
200 e.Labels = map[string]string{}
201 }
202 e.Labels[discovery.LabelSkipMirror] = "true"
203 return true
204 }
205 return false
206 }
207
View as plain text