...

Text file src/github.com/emissary-ingress/emissary/v3/python/ambassador/ir/irlogservice.py

Documentation: github.com/emissary-ingress/emissary/v3/python/ambassador/ir

     1from typing import TYPE_CHECKING, Literal, Optional
     2
     3from ..config import Config
     4from .ircluster import IRCluster
     5from .irresource import IRResource
     6
     7if TYPE_CHECKING:
     8    from .ir import IR  # pragma: no cover
     9
    10
    11class IRLogService(IRResource):
    12    cluster: Optional[IRCluster]
    13    service: str
    14    protocol_version: Literal["v2", "v3"]
    15    driver: str
    16    driver_config: dict
    17    flush_interval_byte_size: int
    18    flush_interval_time: int
    19    grpc: bool
    20
    21    def __init__(
    22        self,
    23        ir: "IR",
    24        config,
    25        rkey: str = "ir.logservice",
    26        kind: str = "ir.logservice",
    27        name: str = "logservice",
    28        namespace: Optional[str] = None,
    29        **kwargs,
    30    ) -> None:
    31        del kwargs  # silence unused-variable warning
    32
    33        super().__init__(ir=ir, aconf=config, rkey=rkey, kind=kind, name=name, namespace=namespace)
    34
    35    def setup(self, ir: "IR", config) -> bool:
    36        self.service = config.get("service")
    37        if not self.service:
    38            self.post_error("service must be present for a remote log service!")
    39            return False
    40
    41        self.namespace = config.get("namespace", self.namespace)
    42        self.cluster = None
    43        self.grpc = config.get("grpc", False)
    44
    45        self.protocol_version = config.get("protocol_version", "v2")
    46        if self.protocol_version == "v2":
    47            self.post_error(
    48                f'LogService: protocol_version {self.protocol_version} is unsupported, protocol_version must be "v3"'
    49            )
    50            return False
    51
    52        self.driver = config.get("driver")
    53        # These defaults come from Envoy:
    54        # https://www.envoyproxy.io/docs/envoy/v1.22.2/api-v3/extensions/access_loggers/grpc/v3/als.proto#extensions-access-loggers-grpc-v3-commongrpcaccesslogconfig
    55        self.flush_interval_byte_size = config.get("flush_interval_byte_size", 16384)
    56        self.flush_interval_time = config.get("flush_interval_time", 1)
    57
    58        self.driver_config = config.get("driver_config")
    59        if "additional_log_headers" in self.driver_config:
    60            if self.driver != "http" and self.driver_config["additional_log_headers"]:
    61                self.post_error("additional_log_headers are not supported in tcp mode")
    62                return False
    63
    64            for header_obj in self.get_additional_headers():
    65                if header_obj.get("header_name", "") == "":
    66                    self.post_error("Please provide a header name for every additional log header!")
    67                    return False
    68
    69        self.sourced_by(config)
    70        self.referenced_by(config)
    71
    72        return True
    73
    74    def add_mappings(self, ir: "IR", aconf: Config):
    75        self.cluster = ir.add_cluster(
    76            IRCluster(
    77                ir=ir,
    78                aconf=aconf,
    79                parent_ir_resource=self,
    80                location=self.location,
    81                service=self.service,
    82                host_rewrite=self.get("host_rewrite", None),
    83                marker="logging",
    84                grpc=self.grpc,
    85                stats_name=self.get("stats_name", None),
    86            )
    87        )
    88
    89        self.cluster.referenced_by(self)
    90
    91    def get_common_config(self) -> dict:
    92        # get_common_config isn't allowed to be called before add_mappings
    93        # is called (by ir.walk_saved_resources). So we can assert that
    94        # self.cluster isn't None here, both to make mypy happier and out
    95        # of paranoia.
    96        assert self.cluster
    97
    98        return {
    99            "transport_api_version": self.protocol_version.upper(),
   100            "log_name": self.name,
   101            "grpc_service": {"envoy_grpc": {"cluster_name": self.cluster.envoy_name}},
   102            "buffer_flush_interval": "%ds" % self.flush_interval_time,
   103            "buffer_size_bytes": self.flush_interval_byte_size,
   104        }
   105
   106    def get_additional_headers(self) -> list:
   107        if "additional_log_headers" in self.driver_config:
   108            return self.driver_config.get("additional_log_headers", [])
   109        else:
   110            return []
   111
   112
   113class IRLogServiceFactory:
   114    @classmethod
   115    def load_all(cls, ir: "IR", aconf: Config) -> None:
   116        services = aconf.get_config("log_services")
   117        if services is not None:
   118            for config in services.values():
   119                srv = IRLogService(ir, config)
   120                extant_srv = ir.log_services.get(srv.name, None)
   121
   122                if extant_srv:
   123                    ir.post_error(
   124                        "Duplicate LogService %s; keeping definition from %s"
   125                        % (srv.name, extant_srv.location)
   126                    )
   127                elif srv.is_active():
   128                    ir.log_services[srv.name] = srv
   129                    ir.save_resource(srv)

View as plain text