1
16
17 package kubernetesservice
18
19 import (
20 "context"
21 "fmt"
22 "net"
23 "net/http"
24 "sync"
25 "time"
26
27 corev1 "k8s.io/api/core/v1"
28 "k8s.io/apimachinery/pkg/api/errors"
29 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
30 "k8s.io/apimachinery/pkg/util/intstr"
31 "k8s.io/apimachinery/pkg/util/runtime"
32 "k8s.io/apimachinery/pkg/util/wait"
33 "k8s.io/apiserver/pkg/storage"
34 v1informers "k8s.io/client-go/informers/core/v1"
35 "k8s.io/client-go/kubernetes"
36 v1listers "k8s.io/client-go/listers/core/v1"
37 "k8s.io/client-go/tools/cache"
38 "k8s.io/klog/v2"
39
40 "k8s.io/kubernetes/pkg/controlplane/reconcilers"
41 )
42
// kubernetesServiceName is the name of the Service that this controller
// creates in the default namespace and whose endpoints it hands to the
// endpoint reconciler.
const kubernetesServiceName = "kubernetes"
46
47
48
49
50 type Controller struct {
51 Config
52
53 client kubernetes.Interface
54 serviceLister v1listers.ServiceLister
55 serviceSynced cache.InformerSynced
56
57 lock sync.Mutex
58 stopCh chan struct{}
59 }
60
61 type Config struct {
62 PublicIP net.IP
63
64 EndpointReconciler reconcilers.EndpointReconciler
65 EndpointInterval time.Duration
66
67
68 ServiceIP net.IP
69 ServicePort int
70 PublicServicePort int
71 KubernetesServiceNodePort int
72 }
73
74
75 func New(config Config, client kubernetes.Interface, serviceInformer v1informers.ServiceInformer) *Controller {
76 return &Controller{
77 Config: config,
78 client: client,
79 serviceLister: serviceInformer.Lister(),
80 serviceSynced: serviceInformer.Informer().HasSynced,
81 stopCh: make(chan struct{}),
82 }
83 }
84
85
86
87 func (c *Controller) Start(stopCh <-chan struct{}) {
88 if !cache.WaitForCacheSync(stopCh, c.serviceSynced) {
89 runtime.HandleError(fmt.Errorf("timed out waiting for caches to sync"))
90 return
91 }
92
93 endpointPorts := createEndpointPortSpec(c.PublicServicePort, "https")
94 if err := c.EndpointReconciler.RemoveEndpoints(kubernetesServiceName, c.PublicIP, endpointPorts); err == nil {
95 klog.Error("Found stale data, removed previous endpoints on kubernetes service, apiserver didn't exit successfully previously")
96 } else if !storage.IsNotFound(err) {
97 klog.Errorf("Error removing old endpoints from kubernetes service: %v", err)
98 }
99
100 localStopCh := make(chan struct{})
101 go func() {
102 defer close(localStopCh)
103 select {
104 case <-stopCh:
105 case <-c.stopCh:
106 }
107 }()
108
109 go c.Run(localStopCh)
110 }
111
112
113 func (c *Controller) Stop() {
114 c.lock.Lock()
115 defer c.lock.Unlock()
116
117 select {
118 case <-c.stopCh:
119 return
120 default:
121 close(c.stopCh)
122 }
123
124 endpointPorts := createEndpointPortSpec(c.PublicServicePort, "https")
125 finishedReconciling := make(chan struct{})
126 go func() {
127 defer close(finishedReconciling)
128 klog.Infof("Shutting down kubernetes service endpoint reconciler")
129 c.EndpointReconciler.StopReconciling()
130 if err := c.EndpointReconciler.RemoveEndpoints(kubernetesServiceName, c.PublicIP, endpointPorts); err != nil {
131 klog.Errorf("Unable to remove endpoints from kubernetes service: %v", err)
132 }
133 c.EndpointReconciler.Destroy()
134 }()
135
136 select {
137 case <-finishedReconciling:
138
139 case <-time.After(2 * c.EndpointInterval):
140
141 klog.Warning("RemoveEndpoints() timed out")
142 }
143 }
144
145
146 func (c *Controller) Run(ch <-chan struct{}) {
147
148 wait.PollImmediateUntil(100*time.Millisecond, func() (bool, error) {
149 var code int
150 c.client.CoreV1().RESTClient().Get().AbsPath("/readyz").Do(context.TODO()).StatusCode(&code)
151 return code == http.StatusOK, nil
152 }, ch)
153
154 wait.NonSlidingUntil(func() {
155
156
157
158 if err := c.UpdateKubernetesService(false); err != nil {
159 runtime.HandleError(fmt.Errorf("unable to sync kubernetes service: %v", err))
160 }
161 }, c.EndpointInterval, ch)
162 }
163
164
165 func (c *Controller) UpdateKubernetesService(reconcile bool) error {
166
167 servicePorts, serviceType := createPortAndServiceSpec(c.ServicePort, c.PublicServicePort, c.KubernetesServiceNodePort, "https")
168 if err := c.CreateOrUpdateMasterServiceIfNeeded(kubernetesServiceName, c.ServiceIP, servicePorts, serviceType, reconcile); err != nil {
169 return err
170 }
171 endpointPorts := createEndpointPortSpec(c.PublicServicePort, "https")
172 if err := c.EndpointReconciler.ReconcileEndpoints(kubernetesServiceName, c.PublicIP, endpointPorts, reconcile); err != nil {
173 return err
174 }
175 return nil
176 }
177
178
179
180 func createPortAndServiceSpec(servicePort int, targetServicePort int, nodePort int, servicePortName string) ([]corev1.ServicePort, corev1.ServiceType) {
181
182
183 servicePorts := []corev1.ServicePort{{
184 Protocol: corev1.ProtocolTCP,
185 Port: int32(servicePort),
186 Name: servicePortName,
187 TargetPort: intstr.FromInt32(int32(targetServicePort)),
188 }}
189 serviceType := corev1.ServiceTypeClusterIP
190 if nodePort > 0 {
191 servicePorts[0].NodePort = int32(nodePort)
192 serviceType = corev1.ServiceTypeNodePort
193 }
194 return servicePorts, serviceType
195 }
196
197
198 func createEndpointPortSpec(endpointPort int, endpointPortName string) []corev1.EndpointPort {
199 return []corev1.EndpointPort{{
200 Protocol: corev1.ProtocolTCP,
201 Port: int32(endpointPort),
202 Name: endpointPortName,
203 }}
204 }
205
206
207
208 func (c *Controller) CreateOrUpdateMasterServiceIfNeeded(serviceName string, serviceIP net.IP, servicePorts []corev1.ServicePort, serviceType corev1.ServiceType, reconcile bool) error {
209 if s, err := c.serviceLister.Services(metav1.NamespaceDefault).Get(serviceName); err == nil {
210
211
212 if reconcile {
213 if svc, updated := getMasterServiceUpdateIfNeeded(s, servicePorts, serviceType); updated {
214 klog.Warningf("Resetting master service %q to %#v", serviceName, svc)
215 _, err := c.client.CoreV1().Services(metav1.NamespaceDefault).Update(context.TODO(), svc, metav1.UpdateOptions{})
216 return err
217 }
218 }
219 return nil
220 }
221 singleStack := corev1.IPFamilyPolicySingleStack
222 svc := &corev1.Service{
223 ObjectMeta: metav1.ObjectMeta{
224 Name: serviceName,
225 Namespace: metav1.NamespaceDefault,
226 Labels: map[string]string{"provider": "kubernetes", "component": "apiserver"},
227 },
228 Spec: corev1.ServiceSpec{
229 Ports: servicePorts,
230
231 Selector: nil,
232 ClusterIP: serviceIP.String(),
233 IPFamilyPolicy: &singleStack,
234 SessionAffinity: corev1.ServiceAffinityNone,
235 Type: serviceType,
236 },
237 }
238
239 _, err := c.client.CoreV1().Services(metav1.NamespaceDefault).Create(context.TODO(), svc, metav1.CreateOptions{})
240 if errors.IsAlreadyExists(err) {
241 return c.CreateOrUpdateMasterServiceIfNeeded(serviceName, serviceIP, servicePorts, serviceType, reconcile)
242 }
243 return err
244 }
245
246
247 func getMasterServiceUpdateIfNeeded(svc *corev1.Service, servicePorts []corev1.ServicePort, serviceType corev1.ServiceType) (s *corev1.Service, updated bool) {
248
249
250 formatCorrect := checkServiceFormat(svc, servicePorts, serviceType)
251 if formatCorrect {
252 return svc, false
253 }
254 svc.Spec.Ports = servicePorts
255 svc.Spec.Type = serviceType
256 return svc, true
257 }
258
259
260
261
262 func checkServiceFormat(s *corev1.Service, ports []corev1.ServicePort, serviceType corev1.ServiceType) (formatCorrect bool) {
263 if s.Spec.Type != serviceType {
264 return false
265 }
266 if len(ports) != len(s.Spec.Ports) {
267 return false
268 }
269 for i, port := range ports {
270 if port != s.Spec.Ports[i] {
271 return false
272 }
273 }
274 return true
275 }
276
View as plain text