1
2 package operator
3
4 import (
5 "context"
6 "fmt"
7 "os"
8 "os/signal"
9
10 "github.com/spf13/afero"
11 "golang.org/x/sys/unix"
12 appsv1 "k8s.io/api/apps/v1"
13 corev1 "k8s.io/api/core/v1"
14 kruntime "k8s.io/apimachinery/pkg/runtime"
15 k8stypes "k8s.io/apimachinery/pkg/types"
16 utilruntime "k8s.io/apimachinery/pkg/util/runtime"
17 clientgoscheme "k8s.io/client-go/kubernetes/scheme"
18 ctrl "sigs.k8s.io/controller-runtime"
19 "sigs.k8s.io/controller-runtime/pkg/client"
20 k8smanager "sigs.k8s.io/controller-runtime/pkg/manager"
21
22 edgecontroller "edge-infra.dev/pkg/k8s/runtime/controller"
23 "edge-infra.dev/pkg/lib/fog"
24 v1etcd "edge-infra.dev/pkg/sds/etcd/operator/apis/etcdmember/v1"
25 "edge-infra.dev/pkg/sds/etcd/operator/constants"
26 "edge-infra.dev/pkg/sds/etcd/operator/internal/config"
27 "edge-infra.dev/pkg/sds/etcd/operator/internal/reconcilers/inform"
28 "edge-infra.dev/pkg/sds/etcd/operator/internal/reconcilers/install"
29 "edge-infra.dev/pkg/sds/etcd/operator/internal/reconcilers/lifecycle"
30 "edge-infra.dev/pkg/sds/etcd/operator/internal/reconcilers/provision"
31 v1ien "edge-infra.dev/pkg/sds/ien/k8s/apis/v1"
32 nodemeta "edge-infra.dev/pkg/sds/ien/node"
33 )
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
// Node role values used as the top-level keys of reconcilerSet. They are
// compared against the value of a node's nodemeta.RoleLabel label when the
// controller manager is created.
const (
	worker       = "worker"
	controlPlane = "controlplane"
)
53
// Names of the reconcilers known to the operator. Each one is used as a key
// in a NodeReconcilerSet and is handed to the reconciler as config.Config.Name
// when it is registered.
const (
	MemberLifecycle = "MemberLifecycleReconciler"
	MemberInformer  = "MemberInformerReconciler"
	MemberProvision = "MemberProvisionReconciler"
	Secret          = "SecretReconciler"
)
60
61
// ReconcilerSet maps a node role (for example worker or controlPlane) to the
// reconcilers that are registered on nodes with that role.
type ReconcilerSet map[string]NodeReconcilerSet
63
64
// NodeReconcilerSet maps a reconciler name to the Reconciler registered under
// that name for a single node role.
type NodeReconcilerSet map[string]Reconciler
66
67
// Reconciler is the contract every controller managed by this operator
// fulfils.
type Reconciler interface {
	// SetupWithManager wires the reconciler up using the supplied config.
	// registerReconcilers passes the EtcdMember list it read at startup as
	// the second argument.
	SetupWithManager(config.Config, *v1etcd.EtcdMemberList) error
	// Reconcile handles a single reconcile request.
	Reconcile(context.Context, ctrl.Request) (ctrl.Result, error)
}
72
73
// reconcilerSet is the default role-to-reconcilers table passed to
// createControllerManager by Run. Control plane nodes get the member
// lifecycle, informer and provision reconcilers; worker nodes only get the
// install reconciler, registered under the Secret name.
var reconcilerSet = ReconcilerSet{
	controlPlane: NodeReconcilerSet{
		MemberLifecycle: &lifecycle.Reconciler{},
		MemberInformer:  &inform.Reconciler{},
		MemberProvision: &provision.Reconciler{},
	},
	worker: NodeReconcilerSet{
		Secret: &install.Reconciler{},
	},
}
84
85
86 func Run(opts ...edgecontroller.Option) error {
87 log := fog.New().WithName("etcdoperator")
88 ctx := context.Background()
89 ctx = fog.IntoContext(ctx, log)
90
91 if err := afero.NewOsFs().RemoveAll(constants.OperatorFilewallFilepath); err != nil {
92 fog.FromContext(ctx).Error(err, "failed to cleanup Kubernetes API Server firewall on shutdown")
93 }
94
95 mgr, err := createControllerManager(ctx, reconcilerSet, opts...)
96 if err != nil {
97 return fmt.Errorf("error creating controller manager: %v", err)
98 }
99
100 if err := runTigeraOperatorOnControlplane(ctx, mgr); err != nil {
101 return err
102 }
103 return mgr.Start(SetupSignalHandler(ctx))
104 }
105
106
107
108 func createControllerManager(ctx context.Context, reconcilerSet ReconcilerSet, o ...edgecontroller.Option) (ctrl.Manager, error) {
109 log := fog.FromContext(ctx)
110 ctlCfg, opts := edgecontroller.ProcessOptions(o...)
111 opts.Scheme = createScheme()
112
113 mgr, err := ctrl.NewManager(ctlCfg, opts)
114 if err != nil {
115 return nil, fmt.Errorf("failed to create controller manager: %v", err)
116 }
117
118 roleLabel, err := getNodeRole(mgr)
119 if err != nil {
120 return nil, fmt.Errorf("failed to get node role: %v", err)
121 }
122
123 reconcilers, ok := reconcilerSet[roleLabel]
124 if !ok {
125 return nil, fmt.Errorf("unsupported node role: %v", roleLabel)
126 }
127
128 log.V(0).Info(fmt.Sprintf("registering %s controllers", roleLabel))
129 if err := registerReconcilers(ctx, reconcilers, mgr); err != nil {
130 return nil, fmt.Errorf("failed to register controllers: %v", err)
131 }
132
133 return mgr, nil
134 }
135
136
137 func getNodeRole(mgr k8smanager.Manager) (string, error) {
138 node := &corev1.Node{}
139
140 key := k8stypes.NamespacedName{
141 Name: os.Getenv("NODE_NAME"),
142 }
143 if err := mgr.GetAPIReader().Get(context.Background(), key, node); err != nil {
144 return "", err
145 }
146
147 label, ok := node.GetLabels()[nodemeta.RoleLabel]
148 if !ok {
149 return "", fmt.Errorf("node %v does not have label %v", node.GetName(), nodemeta.RoleLabel)
150 }
151
152 return label, nil
153 }
154
155
156 func registerReconcilers(ctx context.Context, reconcilers NodeReconcilerSet, mgr ctrl.Manager) error {
157 log := fog.FromContext(ctx)
158 initialMembers := &v1etcd.EtcdMemberList{}
159 if err := mgr.GetAPIReader().List(ctx, initialMembers); err != nil {
160 return err
161 }
162
163 for name, reconciler := range reconcilers {
164 cfg := createDefaultCfg(name, mgr)
165
166 log.V(0).Info("registering controller", "name", name)
167 if err := reconciler.SetupWithManager(cfg, initialMembers); err != nil {
168 return fmt.Errorf("failed to register %s: %v", name, err)
169 }
170 }
171 return nil
172 }
173
174
175 func createDefaultCfg(name string, mgr ctrl.Manager) config.Config {
176 cfg := config.Config{
177 Name: name,
178 NodeName: os.Getenv("NODE_NAME"),
179 Mgr: mgr,
180 Fs: afero.NewOsFs(),
181 }
182 cfg.WithDefaultKubeRetryClient()
183
184 return cfg
185 }
186
187
188 func createScheme() *kruntime.Scheme {
189 scheme := kruntime.NewScheme()
190 utilruntime.Must(clientgoscheme.AddToScheme(scheme))
191
192 utilruntime.Must(v1etcd.AddToScheme(scheme))
193 return scheme
194 }
195
196
197
198
199 func runTigeraOperatorOnControlplane(ctx context.Context, mgr ctrl.Manager) error {
200 cfg := createDefaultCfg("tigeraOperatorNodeAffinity", mgr)
201 key := k8stypes.NamespacedName{
202 Name: "tigera-operator",
203 Namespace: "tigera-operator",
204 }
205 deployment := &appsv1.Deployment{}
206 return cfg.KubeRetryClient.IgnoreCache().SafeUpdate(ctx, key, deployment, addControlplaneNodeAffinity)
207 }
208
209
210
211
212 func addControlplaneNodeAffinity(_ context.Context, obj client.Object) error {
213 deployment := obj.(*appsv1.Deployment)
214 deployment.Spec.Template.Spec.Affinity = &corev1.Affinity{
215 NodeAffinity: &corev1.NodeAffinity{
216 RequiredDuringSchedulingIgnoredDuringExecution: &corev1.NodeSelector{
217 NodeSelectorTerms: []corev1.NodeSelectorTerm{
218 {
219 MatchExpressions: []corev1.NodeSelectorRequirement{
220 {
221 Key: nodemeta.RoleLabel,
222 Operator: corev1.NodeSelectorOpIn,
223 Values: []string{string(v1ien.ControlPlane)},
224 },
225 },
226 },
227 },
228 },
229 },
230 }
231 return nil
232 }
233
234 var onlyOneSignalHandler = make(chan struct{})
235 var shutdownSignals = []os.Signal{os.Interrupt, unix.SIGTERM}
236
237
238
239
240 func SetupSignalHandler(ctx context.Context) context.Context {
241 close(onlyOneSignalHandler)
242
243 ctx, cancel := context.WithCancel(ctx)
244
245 c := make(chan os.Signal, 2)
246 signal.Notify(c, shutdownSignals...)
247 go func() {
248 <-c
249 cancel()
250 <-c
251 if err := afero.NewOsFs().RemoveAll(constants.OperatorFilewallFilepath); err != nil {
252 fog.FromContext(ctx).Error(err, "failed to cleanup Kubernetes API Server firewall on shutdown")
253 }
254 os.Exit(1)
255 }()
256
257 return ctx
258 }
259
View as plain text