1from typing import TYPE_CHECKING, Literal, Optional
2
3from ..config import Config
4from .ircluster import IRCluster
5from .irresource import IRResource
6
7if TYPE_CHECKING:
8 from .ir import IR # pragma: no cover
9
10
class IRLogService(IRResource):
    """IR resource describing a remote log service.

    ``setup`` validates the incoming configuration and copies the relevant
    settings onto the resource. ``add_mappings`` creates the cluster used to
    reach the service, and ``get_common_config`` renders the settings shared
    by the access-log configuration.
    """

    # Set to None by setup(); replaced with a real cluster by add_mappings().
    cluster: Optional[IRCluster]
    service: str
    protocol_version: Literal["v2", "v3"]
    driver: str
    driver_config: dict
    flush_interval_byte_size: int
    flush_interval_time: int
    grpc: bool

    def __init__(
        self,
        ir: "IR",
        config,
        rkey: str = "ir.logservice",
        kind: str = "ir.logservice",
        name: str = "logservice",
        namespace: Optional[str] = None,
        **kwargs,
    ) -> None:
        """Initialise the resource by delegating to the base class.

        Extra keyword arguments are accepted and discarded.
        """
        del kwargs  # silence unused-variable warning

        super().__init__(ir=ir, aconf=config, rkey=rkey, kind=kind, name=name, namespace=namespace)

    def setup(self, ir: "IR", config) -> bool:
        """Validate ``config`` and copy its settings onto this resource.

        Returns:
            True when the configuration is accepted. False, after posting an
            error, when the service is missing, the protocol version is "v2",
            additional log headers are configured for a non-"http" driver, or
            an additional log header has no name.
        """
        self.service = config.get("service")
        if not self.service:
            self.post_error("service must be present for a remote log service!")
            return False

        self.namespace = config.get("namespace", self.namespace)
        self.cluster = None
        self.grpc = config.get("grpc", False)

        # An absent protocol_version is read as "v2", which is rejected below,
        # so "v3" has to be requested explicitly.
        self.protocol_version = config.get("protocol_version", "v2")
        if self.protocol_version == "v2":
            self.post_error(
                f'LogService: protocol_version {self.protocol_version} is unsupported, protocol_version must be "v3"'
            )
            return False

        self.driver = config.get("driver")
        # These defaults come from Envoy:
        # https://www.envoyproxy.io/docs/envoy/v1.22.2/api-v3/extensions/access_loggers/grpc/v3/als.proto#extensions-access-loggers-grpc-v3-commongrpcaccesslogconfig
        self.flush_interval_byte_size = config.get("flush_interval_byte_size", 16384)
        self.flush_interval_time = config.get("flush_interval_time", 1)

        # A missing (or explicitly null) driver_config is treated as an empty
        # one, so the header checks below cannot fail with a TypeError.
        self.driver_config = config.get("driver_config") or {}

        additional_headers = self.get_additional_headers()
        if additional_headers and self.driver != "http":
            self.post_error("additional_log_headers are not supported in tcp mode")
            return False

        for header_obj in additional_headers:
            if header_obj.get("header_name", "") == "":
                self.post_error("Please provide a header name for every additional log header!")
                return False

        self.sourced_by(config)
        self.referenced_by(config)

        return True

    def add_mappings(self, ir: "IR", aconf: Config):
        """Create the cluster for this log service and register it with ``ir``.

        The cluster returned by ``ir.add_cluster`` is stored on
        ``self.cluster`` and marked as referenced by this resource.
        """
        self.cluster = ir.add_cluster(
            IRCluster(
                ir=ir,
                aconf=aconf,
                parent_ir_resource=self,
                location=self.location,
                service=self.service,
                host_rewrite=self.get("host_rewrite", None),
                marker="logging",
                grpc=self.grpc,
                stats_name=self.get("stats_name", None),
            )
        )

        self.cluster.referenced_by(self)

    def get_common_config(self) -> dict:
        """Return the settings shared by this service's access-log config.

        Must only be called after ``add_mappings`` has set ``self.cluster``.
        """
        # get_common_config isn't allowed to be called before add_mappings
        # is called (by ir.walk_saved_resources). So we can assert that
        # self.cluster isn't None here, both to make mypy happier and out
        # of paranoia.
        assert self.cluster

        return {
            "transport_api_version": self.protocol_version.upper(),
            "log_name": self.name,
            "grpc_service": {"envoy_grpc": {"cluster_name": self.cluster.envoy_name}},
            # %-formatting is kept on purpose: "%d" also accepts a float
            # interval, which an f-string ":d" format spec would reject.
            "buffer_flush_interval": "%ds" % self.flush_interval_time,
            "buffer_size_bytes": self.flush_interval_byte_size,
        }

    def get_additional_headers(self) -> list:
        """Return the configured additional log headers.

        An empty list is returned when ``additional_log_headers`` is absent
        from ``driver_config`` or is set to an empty/null value.
        """
        return self.driver_config.get("additional_log_headers") or []
111
112
class IRLogServiceFactory:
    """Builds IRLogService resources from the "log_services" configuration."""

    @classmethod
    def load_all(cls, ir: "IR", aconf: Config) -> None:
        """Create an IRLogService for each entry in the "log_services" config.

        The first active service seen under a given name is stored in
        ``ir.log_services`` and saved on ``ir``. A later service with the same
        name is reported as a duplicate through ``ir.post_error`` and is not
        stored. Nothing happens when there is no "log_services" config.
        """
        log_service_configs = aconf.get_config("log_services")
        if log_service_configs is None:
            return

        for svc_config in log_service_configs.values():
            candidate = IRLogService(ir, svc_config)
            existing = ir.log_services.get(candidate.name, None)

            if existing:
                # Keep the earlier definition and report the clash.
                message = "Duplicate LogService %s; keeping definition from %s" % (
                    candidate.name,
                    existing.location,
                )
                ir.post_error(message)
                continue

            if candidate.is_active():
                ir.log_services[candidate.name] = candidate
                ir.save_resource(candidate)
View as plain text