...

Source file src/github.com/emissary-ingress/emissary/v3/cmd/kat-server/services/grpc-als.go

Documentation: github.com/emissary-ingress/emissary/v3/cmd/kat-server/services

     1  package services
     2  
     3  import (
     4  	// stdlib
     5  	"context"
     6  	"encoding/json"
     7  	"errors"
     8  	"io"
     9  	"net/http"
    10  	"sync"
    11  
    12  	// third party
    13  	"google.golang.org/grpc"
    14  
    15  	// first party
    16  	logdatav2 "github.com/emissary-ingress/emissary/v3/pkg/api/envoy/data/accesslog/v2"
    17  	logdatav3 "github.com/emissary-ingress/emissary/v3/pkg/api/envoy/data/accesslog/v3"
    18  	alsv2 "github.com/emissary-ingress/emissary/v3/pkg/api/envoy/service/accesslog/v2"
    19  	alsv3 "github.com/emissary-ingress/emissary/v3/pkg/api/envoy/service/accesslog/v3"
    20  )
    21  
    22  type GRPCALS struct {
    23  	HTTPListener
    24  
    25  	mu     sync.Mutex
    26  	v2http []*logdatav2.HTTPAccessLogEntry
    27  	v2tcp  []*logdatav2.TCPAccessLogEntry
    28  	v3http []*logdatav3.HTTPAccessLogEntry
    29  	v3tcp  []*logdatav3.TCPAccessLogEntry
    30  }
    31  
    32  func (als *GRPCALS) Start(ctx context.Context) <-chan bool {
    33  	httpHandler := http.NewServeMux()
    34  	httpHandler.HandleFunc("/logs", als.ServeLogs)
    35  
    36  	grpcHandler := grpc.NewServer()
    37  	alsv2.RegisterAccessLogServiceServer(grpcHandler, ALSv2{als})
    38  	alsv3.RegisterAccessLogServiceServer(grpcHandler, ALSv3{als})
    39  
    40  	return als.HTTPListener.Run(ctx, "gRPC ALS", httpHandler, grpcHandler)
    41  }
    42  
    43  func (als *GRPCALS) ServeLogs(w http.ResponseWriter, r *http.Request) {
    44  	als.mu.Lock()
    45  	defer als.mu.Unlock()
    46  	switch r.Method {
    47  	case http.MethodGet:
    48  		bs, err := json.Marshal(map[string]interface{}{
    49  			"alsv2-http": als.v2http,
    50  			"alsv2-tcp":  als.v2tcp,
    51  			"alsv3-http": als.v3http,
    52  			"alsv3-tcp":  als.v3tcp,
    53  		})
    54  		if err != nil {
    55  			http.Error(w, err.Error(), http.StatusInternalServerError)
    56  			return
    57  		}
    58  		w.Header().Set("content-type", "application/json")
    59  		w.Write(bs)
    60  	case http.MethodDelete:
    61  		als.v2http = nil
    62  		als.v2tcp = nil
    63  		als.v3http = nil
    64  		als.v3tcp = nil
    65  	default:
    66  		http.Error(w, "only responds to GET and DELETE", http.StatusMethodNotAllowed)
    67  	}
    68  }
    69  
    70  type ALSv2 struct {
    71  	*GRPCALS
    72  }
    73  
    74  func (als ALSv2) StreamAccessLogs(srv alsv2.AccessLogService_StreamAccessLogsServer) error {
    75  	for {
    76  		msg, err := srv.Recv()
    77  		if msg != nil {
    78  			switch logEntries := msg.LogEntries.(type) {
    79  			case *alsv2.StreamAccessLogsMessage_HttpLogs:
    80  				if logEntries.HttpLogs != nil {
    81  					als.mu.Lock()
    82  					als.v2http = append(als.v2http, logEntries.HttpLogs.LogEntry...)
    83  					als.mu.Unlock()
    84  				}
    85  			case *alsv2.StreamAccessLogsMessage_TcpLogs:
    86  				if logEntries.TcpLogs != nil {
    87  					als.mu.Lock()
    88  					als.v2tcp = append(als.v2tcp, logEntries.TcpLogs.LogEntry...)
    89  					als.mu.Unlock()
    90  				}
    91  			}
    92  		}
    93  		if err != nil {
    94  			if errors.Is(err, io.EOF) {
    95  				return nil
    96  			}
    97  			return err
    98  		}
    99  	}
   100  }
   101  
   102  type ALSv3 struct {
   103  	*GRPCALS
   104  }
   105  
   106  func (als ALSv3) StreamAccessLogs(srv alsv3.AccessLogService_StreamAccessLogsServer) error {
   107  	for {
   108  		msg, err := srv.Recv()
   109  		if msg != nil {
   110  			switch logEntries := msg.LogEntries.(type) {
   111  			case *alsv3.StreamAccessLogsMessage_HttpLogs:
   112  				if logEntries.HttpLogs != nil {
   113  					als.mu.Lock()
   114  					als.v3http = append(als.v3http, logEntries.HttpLogs.LogEntry...)
   115  					als.mu.Unlock()
   116  				}
   117  			case *alsv3.StreamAccessLogsMessage_TcpLogs:
   118  				if logEntries.TcpLogs != nil {
   119  					als.mu.Lock()
   120  					als.v3tcp = append(als.v3tcp, logEntries.TcpLogs.LogEntry...)
   121  					als.mu.Unlock()
   122  				}
   123  			}
   124  		}
   125  		if err != nil {
   126  			if errors.Is(err, io.EOF) {
   127  				return nil
   128  			}
   129  			return err
   130  		}
   131  	}
   132  }
   133  

View as plain text