...
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 package main
16
17 import (
18 goflag "flag"
19 "fmt"
20 "log"
21 "net"
22 "net/http"
23 _ "net/http/pprof"
24 "time"
25
26 "github.com/GoogleCloudPlatform/k8s-config-connector/pkg/apis"
27 "github.com/GoogleCloudPlatform/k8s-config-connector/pkg/gcp/profiler"
28 "github.com/GoogleCloudPlatform/k8s-config-connector/pkg/k8s"
29 "github.com/GoogleCloudPlatform/k8s-config-connector/pkg/logging"
30 "github.com/GoogleCloudPlatform/k8s-config-connector/pkg/ready"
31 "github.com/GoogleCloudPlatform/k8s-config-connector/pkg/webhook"
32
33 flag "github.com/spf13/pflag"
34 apiextensions "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
35 "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
36 "k8s.io/apimachinery/pkg/labels"
37 "k8s.io/apimachinery/pkg/runtime/schema"
38 _ "k8s.io/client-go/plugin/pkg/client/auth/gcp"
39 "k8s.io/client-go/rest"
40 "sigs.k8s.io/controller-runtime/pkg/cache"
41 "sigs.k8s.io/controller-runtime/pkg/client"
42 "sigs.k8s.io/controller-runtime/pkg/client/config"
43 klog "sigs.k8s.io/controller-runtime/pkg/log"
44 "sigs.k8s.io/controller-runtime/pkg/manager"
45 "sigs.k8s.io/controller-runtime/pkg/manager/signals"
46 )
47
48 var logger = klog.Log.WithName("setup")
49
50 func main() {
51 stop := signals.SetupSignalHandler()
52
53 var enablePprof bool
54 var pprofPort int
55
56 profiler.AddFlag(flag.CommandLine)
57 flag.CommandLine.AddGoFlagSet(goflag.CommandLine)
58 flag.BoolVar(&enablePprof, "enable-pprof", false, "Enable the pprof server.")
59 flag.IntVar(&pprofPort, "pprof-port", 6060, "The port that the pprof server binds to if enabled.")
60 flag.Parse()
61
62
63 logging.SetupLogger()
64
65
66 if enablePprof {
67 go func() {
68 if err := http.ListenAndServe(fmt.Sprintf(":%d", pprofPort), nil); err != nil {
69 logger.Error(err, "error while running pprof server")
70 }
71 }()
72 }
73
74
75 if err := profiler.StartIfEnabled(); err != nil {
76 logging.Fatal(err, "error starting Cloud Profiler agent")
77 }
78
79
80 cfg, err := config.GetConfig()
81 if err != nil {
82 log.Fatal(err)
83 }
84
85
86 labelSelector, err := labels.ValidatedSelectorFromSet(labels.Set{
87 k8s.KCCSystemLabel: "true",
88 })
89 if err != nil {
90 log.Fatal(err)
91 }
92 crdKind := &unstructured.Unstructured{}
93 crdKind.SetGroupVersionKind(schema.GroupVersionKind{
94 Group: "apiextensions.k8s.io",
95 Version: "v1",
96 Kind: "CustomResourceDefinition",
97 })
98
99
100 mgr, err := manager.New(cfg, manager.Options{
101
102
103
104 Port: webhook.ServicePort,
105 NewCache: func(config *rest.Config, opts cache.Options) (cache.Cache, error) {
106 opts.SelectorsByObject = cache.SelectorsByObject{
107 crdKind: cache.ObjectSelector{
108 Label: labelSelector,
109 },
110 }
111 return cache.New(config, opts)
112 },
113 })
114 if err != nil {
115 log.Fatal(err)
116 }
117
118
119 apis.AddToSchemes = append(apis.AddToSchemes, apiextensions.SchemeBuilder.AddToScheme)
120 if err := apis.AddToScheme(mgr.GetScheme()); err != nil {
121 log.Fatal(err)
122 }
123
124 log.Printf("Registering Webhooks.")
125
126
127
128 nocacheClient, err := client.New(cfg, client.Options{})
129 if err != nil {
130 log.Fatal(err)
131 }
132 if err := webhook.RegisterCommonWebhooks(mgr, nocacheClient); err != nil {
133 log.Fatalf("error adding the validating webhooks: %v", err)
134 }
135
136
137
138
139
140 go func() {
141 timeout := 2 * time.Minute
142 log.Println(fmt.Sprintf("Waiting up to %v for the http server to be ready...", timeout))
143 if err := waitForHTTPServerToAcceptRequests("localhost", webhook.ServicePort, timeout); err != nil {
144 log.Fatalf("error waiting for http server to be ready: %v", err)
145 }
146
147 log.Println("Setting container as ready...")
148 ready.SetContainerAsReady()
149 log.Println("Container is ready.")
150 }()
151
152 log.Printf("Starting the Cmd.")
153
154
155 log.Fatal(mgr.Start(stop))
156 }
157
158 func waitForHTTPServerToAcceptRequests(host string, port int, timeout time.Duration) error {
159 address := fmt.Sprintf("%v:%v", host, port)
160 var err error
161 for totalWait := time.Duration(0); totalWait < timeout; {
162 singleDialTimeout := 1 * time.Second
163 _, err = net.DialTimeout("tcp", address, singleDialTimeout)
164 if err == nil {
165 return nil
166 }
167 totalWait += singleDialTimeout
168 sleepTime := 2 * time.Second
169 time.Sleep(sleepTime)
170 totalWait += sleepTime
171 }
172 return fmt.Errorf("timeout of '%v' exceeded with a final error of '%w': still cannot contact http server at '%v'",
173 timeout, err, address)
174 }
175
View as plain text