1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 package main
16
17 import (
18 "context"
19 goflag "flag"
20 "fmt"
21 "net/http"
22 _ "net/http/pprof"
23 "os"
24 "time"
25
26 dclmetadata "github.com/GoogleCloudPlatform/k8s-config-connector/pkg/dcl/metadata"
27 "github.com/GoogleCloudPlatform/k8s-config-connector/pkg/gcp/profiler"
28 "github.com/GoogleCloudPlatform/k8s-config-connector/pkg/gvks/supportedgvks"
29 "github.com/GoogleCloudPlatform/k8s-config-connector/pkg/k8s"
30 "github.com/GoogleCloudPlatform/k8s-config-connector/pkg/logging"
31 "github.com/GoogleCloudPlatform/k8s-config-connector/pkg/metrics"
32 "github.com/GoogleCloudPlatform/k8s-config-connector/pkg/ready"
33 "github.com/GoogleCloudPlatform/k8s-config-connector/pkg/servicemapping/servicemappingloader"
34
35 "github.com/prometheus/client_golang/prometheus"
36 "github.com/prometheus/client_golang/prometheus/promhttp"
37 flag "github.com/spf13/pflag"
38 v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
39 "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
40 "k8s.io/apimachinery/pkg/runtime/schema"
41 _ "k8s.io/client-go/plugin/pkg/client/auth/gcp"
42 "k8s.io/klog/v2/klogr"
43 "sigs.k8s.io/controller-runtime/pkg/client"
44 "sigs.k8s.io/controller-runtime/pkg/client/config"
45 klog "sigs.k8s.io/controller-runtime/pkg/log"
46 )
47
// Tuning knobs for a single recording pass (see doRecord and
// recordMetricsForGVK).
const (
	// MaximumListResults is the page size (ListOptions.Limit) used when
	// listing the objects of one GroupVersionKind.
	MaximumListResults = 50

	// NumberOfWorkers caps how many GroupVersionKinds are recorded
	// concurrently during one pass.
	NumberOfWorkers = 20
)
52
53 var (
54 logger = klog.Log.WithName("setup")
55 appliedResources = metrics.NewAppliedResourcesCollector()
56 )
57
58 func main() {
59
60 var (
61 prometheusScrapeEndpoint string
62 metricInterval int
63 enablePprof bool
64 pprofPort int
65 )
66 flag.StringVar(&prometheusScrapeEndpoint, "prometheus-scrape-endpoint", ":8888", "configure the Prometheus scrape endpoint; :8888 as default")
67 flag.IntVar(&metricInterval, "metric-interval", 60, "the time interval of each recording in seconds")
68 flag.BoolVar(&enablePprof, "enable-pprof", false, "Enable the pprof server.")
69 flag.IntVar(&pprofPort, "pprof-port", 6060, "The port that the pprof server binds to if enabled.")
70 profiler.AddFlag(flag.CommandLine)
71 flag.CommandLine.AddGoFlagSet(goflag.CommandLine)
72 flag.Parse()
73 kccVersion := os.Getenv("CONFIG_CONNECTOR_VERSION")
74
75 klog.SetLogger(klogr.New())
76
77 logger.Info("Recording the stats of Config Connector resources")
78
79
80 if enablePprof {
81 go func() {
82 if err := http.ListenAndServe(fmt.Sprintf(":%d", pprofPort), nil); err != nil {
83 logger.Error(err, "error while running pprof server")
84 }
85 }()
86 }
87
88
89 if err := profiler.StartIfEnabled(); err != nil {
90 logging.Fatal(err, "error starting Cloud Profiler agent")
91 }
92
93
94 prometheus.MustRegister(appliedResources)
95 prometheus.MustRegister(metrics.NewBuildInfoCollector(kccVersion))
96
97
98 go func() {
99 http.Handle("/metrics", promhttp.Handler())
100 logging.Fatal(http.ListenAndServe(prometheusScrapeEndpoint, nil), "error registering the Prometheus HTTP handler")
101 }()
102
103
104 logger.Info("Setting container as ready...")
105 ready.SetContainerAsReady()
106 logger.Info("Container is ready.")
107
108
109 cfg, err := config.GetConfig()
110 if err != nil {
111 logging.Fatal(err, "error getting configuration from APIServer.")
112 }
113
114
115 c, err := client.New(cfg, client.Options{})
116 if err != nil {
117 logging.Fatal(err, "error getting client.")
118 }
119
120 smLoader, err := servicemappingloader.New()
121 if err != nil {
122 logging.Fatal(err, "error getting new service mapping loader")
123 }
124
125 supportedGVKs := supportedgvks.All(smLoader, dclmetadata.New())
126 for {
127 err := doRecord(c, supportedGVKs)
128 if err != nil {
129 logger.Error(err, "error recording metrics.")
130 }
131 time.Sleep(time.Duration(metricInterval) * time.Second)
132 }
133 }
134
135 func doRecord(c client.Client, gvks []schema.GroupVersionKind) error {
136 logger.Info("listing all CRDs managed by Config Connector.")
137
138
139 appliedResources.Reset()
140
141 sem := make(chan struct{}, NumberOfWorkers)
142 for _, gvk := range gvks {
143 gvk := gvk
144 sem <- struct{}{}
145 go func() {
146 defer func() { <-sem }()
147 err := recordMetricsForGVK(c, gvk)
148 if err != nil {
149 logger.Error(err, "error recording metrics for CRD %v: %v", gvk.String())
150 }
151 }()
152 }
153 for i := 0; i < NumberOfWorkers; i++ {
154 sem <- struct{}{}
155 }
156 logger.Info("finished one run of recording resource metrics.")
157 return nil
158 }
159
160 func forEach(c client.Client, gvk schema.GroupVersionKind, listOptions *client.ListOptions, fn func(unstructured.Unstructured) error) error {
161 for ok := true; ok; ok = listOptions.Continue != "" {
162 list := unstructured.UnstructuredList{}
163 list.SetGroupVersionKind(gvk)
164 err := c.List(context.Background(), &list, listOptions)
165 if err != nil {
166 return fmt.Errorf("error listing objects:%v", err)
167 }
168 for _, item := range list.Items {
169 if err := fn(item); err != nil {
170 return err
171 }
172 }
173 listOptions.Continue = list.GetContinue()
174 }
175 return nil
176 }
177
178 func recordMetricsForGVK(c client.Client, gvk schema.GroupVersionKind) error {
179 opts := &client.ListOptions{
180 Limit: MaximumListResults,
181 Raw: &v1.ListOptions{},
182 }
183 statsNamespaceMap := make(map[string]*Stats)
184 if err := forEach(c, gvk, opts, func(obj unstructured.Unstructured) error {
185 namespace := obj.GetNamespace()
186 s := statsNamespaceMap[namespace]
187 if s == nil {
188 s = &Stats{make(map[string]int64)}
189 statsNamespaceMap[namespace] = s
190 }
191 lastCondition, err := getTheLastCondition(obj)
192 if err != nil {
193 logger.Error(err, "error getting the last condition for metrics for %v: %v", gvk.String())
194 return nil
195 }
196 s.countByStatus[lastCondition]++
197 return nil
198 }); err != nil {
199 return fmt.Errorf("error listing objects for %v: %w", gvk.String(), err)
200 }
201 for ns, stats := range statsNamespaceMap {
202 for status, count := range stats.countByStatus {
203 appliedResources.WithLabelValues(ns, gvk.GroupKind().String(), status).Set(float64(count))
204 }
205 }
206 return nil
207 }
208
// Stats accumulates, for one namespace and one kind, how many objects were
// seen with each status string (the value returned by getTheLastCondition).
type Stats struct {
	// countByStatus maps a status string to the number of objects seen
	// with that status.
	countByStatus map[string]int64
}
212
213
214 func getTheLastCondition(obj unstructured.Unstructured) (string, error) {
215 currConditionsRaw, found, err := unstructured.NestedSlice(obj.Object, "status", "conditions")
216 if err != nil {
217 return "", err
218 }
219 if !found || len(currConditionsRaw) == 0 {
220 return "NoCondition", nil
221 }
222
223 currConditions, err := k8s.MarshalAsConditionsSlice(currConditionsRaw)
224 if err != nil {
225 return "", err
226 }
227 if currConditions[0].Reason == "" {
228 return k8s.NoCondition, nil
229 }
230 return currConditions[0].Reason, nil
231 }
232
View as plain text