-- Numeric rank of every severity name this filter understands; a higher rank
-- is more severe. Aliases ("warn"/"warning", "crit"/"critical") share a rank.
local severity_hierarchy = {
    ["debug"] = 1,
    ["info"] = 2,
    ["notice"] = 3,
    ["warn"] = 4,
    ["warning"] = 4,
    ["error"] = 5,
    ["crit"] = 6,
    ["critical"] = 6,
    ["alert"] = 7,
    ["emergency"] = 8,
}

-- log_class values that are never subject to severity filtering.
local critical_logs = {
    ["audit"] = true,
    ["security"] = true,
}

-- Rank used when nothing else configures one: allow "error" and above.
local DEFAULT_MIN_LEVEL = severity_hierarchy["error"]

--[[
    Marks every severity whose rank is >= min_level as allowed.

    Extracted from get_allowed_severity so that callers needing their own
    threshold (e.g. the log replay pipeline) can build a map without touching
    the min_level used for normal severity filtering.

    @param severity_allowed  table that receives `[severity_name] = true`
    @param min_level         numeric rank; falls back to DEFAULT_MIN_LEVEL when nil
    @return the same severity_allowed table
]]
local function severity_hierarchy_finder(severity_allowed, min_level)
    min_level = min_level or DEFAULT_MIN_LEVEL
    for severity, level in pairs(severity_hierarchy) do
        if level >= min_level then
            severity_allowed[severity] = true
        end
    end
    return severity_allowed
end

--[[
    Builds the set of severities allowed through for one log record.

    @param tag         fluent-bit tag of the record
    @param record      the log record table
    @param log_levels  table shaped like the result of parse_levels_configmap
                       (may be nil or empty)
    @return a table `{ [severity_name] = true, ... }`, or nil when the record
            must bypass severity filtering altogether
]]
local function get_allowed_severity(tag, record, log_levels)
    local tag_pattern = "^(k8s_container%.)([%a%-%d]+)_([%a%-%d]+)_([%a%-%d]+)"
    local k8s_data = record["kubernetes"]
    local log_class = record["log_class"]
    local min_level = DEFAULT_MIN_LEVEL

    -- allow certain logs like security or audit logs to always pass through our filter
    if log_class and critical_logs[log_class] then
        return nil
    end

    --[[
        Log replay gets a severity map built specifically for the record
        passed in, which takes it out of any further severity filtering logic
        in this function.

        Log replay example:
          - the original log was "error"
          - the replay log is emitted as "info" (see the format in replay.go)
          - rewrite_tag drops info and makes original-tag the actual tag
          - the record then goes through fluent bit as the original log with
            the replay elements attached
    ]]
    if log_class == "replay" then
        local replay_severity = record["replay_severity"]
        local replay_level = nil
        if type(replay_severity) == "string" then
            replay_level = severity_hierarchy[string.lower(replay_severity)]
        end
        if replay_level then
            return severity_hierarchy_finder({}, replay_level)
        end
        -- Edge case: replay_severity was not sent (or is not a severity we
        -- recognise). We still want the replayed log to make it through.
        return nil
    end

    -- Look for anything defined in the log-levels map. It defines the default
    -- log level for an entire cluster or for a given namespace. If a 'cluster'
    -- key is found, that is the minimum level allowed for logs coming from
    -- this cluster. An unrecognised value keeps the default instead of
    -- poisoning min_level with nil.
    if log_levels and log_levels.cluster then
        min_level = severity_hierarchy[log_levels.cluster] or min_level
    end

    -- For LAN outage we want to be able to process logs without having to wait
    -- for the 30 minute window to be able to use cached data via the
    -- kubernetes plugin, so the namespace is taken from the tag itself.
    local namespace = nil
    if type(tag) == "string" then
        local _
        _, namespace = tag:match(tag_pattern)
    end
    if namespace and log_levels and log_levels["namespace"] then
        -- A level defined for this namespace overrides the cluster level;
        -- unrecognised values are ignored.
        local namespace_level = log_levels["namespace"][namespace]
        if namespace_level then
            min_level = severity_hierarchy[namespace_level] or min_level
        end
    end

    -- Check the k8s metadata attached to each log record and look for
    -- annotations that override which log levels are allowed to go to the cloud.
    if k8s_data and k8s_data["annotations"] then
        -- // TODO: TO BE DEPRECATED IN 0.25 @RS185722
        -- Users can add an annotation to individual pods which designates the
        -- allowed log level for that pod. If it is defined and valid it wins.
        local log_level = k8s_data["annotations"]["logging.edge.ncr.com/level"]
        if log_level and severity_hierarchy[log_level] then
            min_level = severity_hierarchy[log_level]
        end
    end

    return severity_hierarchy_finder({}, min_level)
end

-- removes whitespaces, tabs and quote characters
local function trim(str)
    str = string.gsub(str, "%s+", "")
    str = string.gsub(str, "[\'\"]", "")
    return str
end

--[[
    The patterns below are used to get the contents of a file from a mounted
    configmap volume. An example configmap would look like this:

        apiVersion: v1
        kind: ConfigMap
        metadata:
          name: log-levels
        data:
          log-levels: |
            cluster: info
            namespace:warehouse-system: debug

    The resultant file would be called log-levels and it would have 2 lines:

        cluster: info
        namespace:warehouse-system: debug

    https://www.lua.org/manual/5.1/manual.html see section 5.4.1
      ^   : match the beginning of a line
      %S  : represents all non-space characters
      ( ) : creates a capture group
]]
local cluster_pattern = "^(cluster):(%S*)"
local namespace_pattern = "^(namespace):(%S*):(%S*)"

--[[
    Reads the log-levels configmap file and populates the log_levels map.

    @param configmap_file  path of the mounted configmap file
    @return a table; empty when the file cannot be opened, otherwise e.g.

        log_levels: {
            cluster: warning
            namespace: {
                warehouse-system: info,
                kinform: info,
            }
        }

    Level names are lower-cased but not validated here; get_allowed_severity
    ignores names it does not recognise.
]]
local function parse_levels_configmap(configmap_file)
    local log_levels = {}
    local file = io.open(configmap_file, "r")
    if not file then
        return log_levels
    end

    -- set a default level to handle cases where the file might have empty values
    log_levels.cluster = "error"

    for line in file:lines() do
        line = trim(line)

        -- cluster_pattern has two capture groups, so two values are returned
        local cluster_match, cluster_log_level = line:match(cluster_pattern)
        if cluster_match and cluster_log_level and cluster_log_level ~= "" then
            log_levels.cluster = string.lower(cluster_log_level)
        end

        -- namespace_pattern has three capture groups, so three values are returned
        local namespace_match, namespace, namespace_log_level = line:match(namespace_pattern)
        if namespace_match and namespace and namespace ~= ""
            and namespace_log_level and namespace_log_level ~= "" then
            log_levels.namespace = log_levels.namespace or {}
            log_levels.namespace[namespace] = string.lower(namespace_log_level)
        end
    end

    file:close()
    return log_levels
end

--[[
    process_logs is the starting function that fluent-bit calls.

    Return codes:
      -1  record must be deleted
       0  record not modified, keep the original
       1  record was modified, replace timestamp and record
       2  record was modified, replace record and keep timestamp

    This function only ever returns -1 or 0.
]]
function process_logs(tag, timestamp, record)
    -- NOTE: the configmap is re-read for every record, so edits to the mounted
    -- file are picked up without any reload step.
    local configmap_file = "/var/configs/log-levels/log-levels"
    local log_levels = parse_levels_configmap(configmap_file)

    local level = record["severity"]
    if level == nil or level == '' then
        level = "info"
    else
        -- tostring keeps a non-string severity from raising inside string.lower
        level = string.lower(tostring(level))
    end

    --[[
        severity_allowed is a hashmap that has true values for the severities
        that are whitelisted to GCP. For example, if the cluster is on error
        level the map will look something like this:

            severity_allowed = {
                ["error"] = true,
                ["crit"] = true,
                ["critical"] = true,
                ["alert"] = true,
                ["emergency"] = true,
            }

        Since this script is re-run for every log that passes through, the
        hashmap is recreated each time, so every run has its own
        severity_allowed map. This keeps log replay (see get_allowed_severity,
        where replay records are handled) and normal severity filtering
        isolated from each other within the same function.
    ]]
    local severity_allowed = get_allowed_severity(tag, record, log_levels)
    if severity_allowed == nil or severity_allowed[level] then
        return 0, timestamp, record
    end
    return -1, timestamp, record
end

--[[
    return a table of functions here so we can import them in unit tests
]]
return {
    parse_levels_configmap = parse_levels_configmap,
    get_allowed_severity = get_allowed_severity
}