1 package entrypoint
2
3 import (
4 "context"
5 "fmt"
6 "io/ioutil"
7 "os"
8 "os/signal"
9 "path"
10 "strings"
11 "sync/atomic"
12 "syscall"
13 "time"
14
15 "github.com/google/uuid"
16
17 "github.com/datawire/ambassador/v2/pkg/acp"
18 "github.com/datawire/ambassador/v2/pkg/ambex"
19 "github.com/datawire/ambassador/v2/pkg/busy"
20 "github.com/datawire/ambassador/v2/pkg/kates"
21 "github.com/datawire/ambassador/v2/pkg/logutil"
22 "github.com/datawire/ambassador/v2/pkg/memory"
23 "github.com/datawire/dlib/dgroup"
24 "github.com/datawire/dlib/dlog"
25 )
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80 func Main(ctx context.Context, Version string, args ...string) error {
81
82 busy.SetLogLevel(logutil.DefaultLogLevel)
83 if lvl := os.Getenv("AES_LOG_LEVEL"); lvl != "" {
84 parsed, err := logutil.ParseLogLevel(lvl)
85 if err != nil {
86 dlog.Errorf(ctx, "Error parsing log level: %v", err)
87 } else {
88 busy.SetLogLevel(parsed)
89 }
90 }
91
92
93
94
95 os.Unsetenv("AGENT_SERVICE")
96 dlog.Infof(ctx, "Started Ambassador (Version %s)", Version)
97
98 demoMode := false
99
100
101
102 if (len(args) == 1) && (args[0] == "--demo") {
103
104 dlog.Infof(ctx, "DEMO MODE")
105 demoMode = true
106 }
107
108 clusterID := GetClusterID(ctx)
109 os.Setenv("AMBASSADOR_CLUSTER_ID", clusterID)
110 dlog.Infof(ctx, "AMBASSADOR_CLUSTER_ID=%s", clusterID)
111
112 pec := "PYTHON_EGG_CACHE"
113 if os.Getenv(pec) == "" {
114 os.Setenv(pec, path.Join(GetAmbassadorConfigBaseDir(), ".cache"))
115 }
116 os.Setenv("PYTHONUNBUFFERED", "true")
117
118
119 if err := ensureDir(GetHomeDir()); err != nil {
120 return err
121 }
122 if err := ensureDir(GetAmbassadorConfigBaseDir()); err != nil {
123 return err
124 }
125 if err := ensureDir(GetSnapshotDir()); err != nil {
126 return err
127 }
128 if err := ensureDir(GetEnvoyDir()); err != nil {
129 return err
130 }
131
132
133 envoyHUP := make(chan os.Signal, 1)
134 signal.Notify(envoyHUP, syscall.SIGHUP)
135
136
137 ambwatch := acp.NewAmbassadorWatcher(acp.NewEnvoyWatcher(), acp.NewDiagdWatcher())
138
139 group := dgroup.NewGroup(ctx, dgroup.GroupConfig{
140 EnableSignalHandling: true,
141 SoftShutdownTimeout: 10 * time.Second,
142 HardShutdownTimeout: 10 * time.Second,
143 })
144
145
146
147
148 if demoMode {
149 bootDemoMode(ctx, group, ambwatch)
150 }
151
152 group.Go("diagd", func(ctx context.Context) error {
153 cmd := subcommand(ctx, "diagd", GetDiagdArgs(ctx, demoMode)...)
154 if envbool("DEV_SHUTUP_DIAGD") {
155 cmd.Stdout = nil
156 cmd.Stderr = nil
157 }
158 return cmd.Run()
159 })
160
161 usage := memory.GetMemoryUsage(ctx)
162 if !envbool("DEV_SHUTUP_MEMORY") {
163 group.Go("memory", func(ctx context.Context) error {
164 usage.Watch(ctx)
165 return nil
166 })
167 }
168
169 fastpathCh := make(chan *ambex.FastpathSnapshot)
170 group.Go("ambex", func(ctx context.Context) error {
171 return ambex.Main(ctx, Version, usage.PercentUsed, fastpathCh, "--ads-listen-address",
172 "127.0.0.1:8003", GetEnvoyDir())
173 })
174
175 group.Go("envoy", func(ctx context.Context) error {
176 return runEnvoy(ctx, envoyHUP)
177 })
178
179 snapshot := &atomic.Value{}
180 group.Go("snapshot_server", func(ctx context.Context) error {
181 return snapshotServer(ctx, snapshot)
182 })
183 if !envbool("AMBASSADOR_DISABLE_SNAPSHOT_SERVER") {
184 group.Go("external_snapshot_server", func(ctx context.Context) error {
185 return externalSnapshotServer(ctx, snapshot)
186 })
187 }
188
189 if !demoMode {
190 group.Go("watcher", func(ctx context.Context) error {
191
192
193 return WatchAllTheThings(ctx, ambwatch, snapshot, fastpathCh, clusterID, Version)
194 })
195 }
196
197
198 group.Go("healthchecks", func(ctx context.Context) error {
199 return healthCheckHandler(ctx, ambwatch)
200 })
201
202
203
204 sidecarDir := "/ambassador/sidecars"
205 sidecars, err := ioutil.ReadDir(sidecarDir)
206 if err != nil && !os.IsNotExist(err) {
207 return err
208 }
209 for _, sidecar := range sidecars {
210 group.Go(sidecar.Name(), func(ctx context.Context) error {
211 cmd := subcommand(ctx, path.Join(sidecarDir, sidecar.Name()))
212 return cmd.Run()
213 })
214 }
215
216 return group.Wait()
217 }
218
219 func clusterIDFromRootID(rootID string) string {
220 clusterUrl := fmt.Sprintf("d6e_id://%s/%s", rootID, GetAmbassadorID())
221 uid := uuid.NewSHA1(uuid.NameSpaceURL, []byte(clusterUrl))
222
223 return strings.ToLower(uid.String())
224 }
225
226 func GetClusterID(ctx context.Context) (clusterID string) {
227 clusterID = env("AMBASSADOR_CLUSTER_ID", env("AMBASSADOR_SCOUT_ID", ""))
228 if clusterID != "" {
229 return clusterID
230 }
231
232 rootID := "00000000-0000-0000-0000-000000000000"
233
234 client, err := kates.NewClient(kates.ClientConfig{})
235 if err == nil {
236 nsName := "default"
237 if IsAmbassadorSingleNamespace() {
238 nsName = GetAmbassadorNamespace()
239 }
240 ns := &kates.Namespace{
241 TypeMeta: kates.TypeMeta{Kind: "Namespace"},
242 ObjectMeta: kates.ObjectMeta{Name: nsName},
243 }
244
245 err := client.Get(ctx, ns, ns)
246 if err == nil {
247 rootID = string(ns.GetUID())
248 }
249 }
250
251 return clusterIDFromRootID(rootID)
252 }
253
View as plain text