...
1 package services
2
3 import (
4
5 "context"
6 "encoding/json"
7 "errors"
8 "io"
9 "net/http"
10 "sync"
11
12
13 "google.golang.org/grpc"
14
15
16 logdatav2 "github.com/emissary-ingress/emissary/v3/pkg/api/envoy/data/accesslog/v2"
17 logdatav3 "github.com/emissary-ingress/emissary/v3/pkg/api/envoy/data/accesslog/v3"
18 alsv2 "github.com/emissary-ingress/emissary/v3/pkg/api/envoy/service/accesslog/v2"
19 alsv3 "github.com/emissary-ingress/emissary/v3/pkg/api/envoy/service/accesslog/v3"
20 )
21
22 type GRPCALS struct {
23 HTTPListener
24
25 mu sync.Mutex
26 v2http []*logdatav2.HTTPAccessLogEntry
27 v2tcp []*logdatav2.TCPAccessLogEntry
28 v3http []*logdatav3.HTTPAccessLogEntry
29 v3tcp []*logdatav3.TCPAccessLogEntry
30 }
31
32 func (als *GRPCALS) Start(ctx context.Context) <-chan bool {
33 httpHandler := http.NewServeMux()
34 httpHandler.HandleFunc("/logs", als.ServeLogs)
35
36 grpcHandler := grpc.NewServer()
37 alsv2.RegisterAccessLogServiceServer(grpcHandler, ALSv2{als})
38 alsv3.RegisterAccessLogServiceServer(grpcHandler, ALSv3{als})
39
40 return als.HTTPListener.Run(ctx, "gRPC ALS", httpHandler, grpcHandler)
41 }
42
43 func (als *GRPCALS) ServeLogs(w http.ResponseWriter, r *http.Request) {
44 als.mu.Lock()
45 defer als.mu.Unlock()
46 switch r.Method {
47 case http.MethodGet:
48 bs, err := json.Marshal(map[string]interface{}{
49 "alsv2-http": als.v2http,
50 "alsv2-tcp": als.v2tcp,
51 "alsv3-http": als.v3http,
52 "alsv3-tcp": als.v3tcp,
53 })
54 if err != nil {
55 http.Error(w, err.Error(), http.StatusInternalServerError)
56 return
57 }
58 w.Header().Set("content-type", "application/json")
59 w.Write(bs)
60 case http.MethodDelete:
61 als.v2http = nil
62 als.v2tcp = nil
63 als.v3http = nil
64 als.v3tcp = nil
65 default:
66 http.Error(w, "only responds to GET and DELETE", http.StatusMethodNotAllowed)
67 }
68 }
69
70 type ALSv2 struct {
71 *GRPCALS
72 }
73
74 func (als ALSv2) StreamAccessLogs(srv alsv2.AccessLogService_StreamAccessLogsServer) error {
75 for {
76 msg, err := srv.Recv()
77 if msg != nil {
78 switch logEntries := msg.LogEntries.(type) {
79 case *alsv2.StreamAccessLogsMessage_HttpLogs:
80 if logEntries.HttpLogs != nil {
81 als.mu.Lock()
82 als.v2http = append(als.v2http, logEntries.HttpLogs.LogEntry...)
83 als.mu.Unlock()
84 }
85 case *alsv2.StreamAccessLogsMessage_TcpLogs:
86 if logEntries.TcpLogs != nil {
87 als.mu.Lock()
88 als.v2tcp = append(als.v2tcp, logEntries.TcpLogs.LogEntry...)
89 als.mu.Unlock()
90 }
91 }
92 }
93 if err != nil {
94 if errors.Is(err, io.EOF) {
95 return nil
96 }
97 return err
98 }
99 }
100 }
101
102 type ALSv3 struct {
103 *GRPCALS
104 }
105
106 func (als ALSv3) StreamAccessLogs(srv alsv3.AccessLogService_StreamAccessLogsServer) error {
107 for {
108 msg, err := srv.Recv()
109 if msg != nil {
110 switch logEntries := msg.LogEntries.(type) {
111 case *alsv3.StreamAccessLogsMessage_HttpLogs:
112 if logEntries.HttpLogs != nil {
113 als.mu.Lock()
114 als.v3http = append(als.v3http, logEntries.HttpLogs.LogEntry...)
115 als.mu.Unlock()
116 }
117 case *alsv3.StreamAccessLogsMessage_TcpLogs:
118 if logEntries.TcpLogs != nil {
119 als.mu.Lock()
120 als.v3tcp = append(als.v3tcp, logEntries.TcpLogs.LogEntry...)
121 als.mu.Unlock()
122 }
123 }
124 }
125 if err != nil {
126 if errors.Is(err, io.EOF) {
127 return nil
128 }
129 return err
130 }
131 }
132 }
133
View as plain text