...

Text file src/edge-infra.dev/pkg/edge/logging/fluentbit/severity.lua

Documentation: edge-infra.dev/pkg/edge/logging/fluentbit

     -- Numeric rank for every severity name this script recognises; a larger
     -- number means a more severe log. Alias spellings ("warn"/"warning",
     -- "crit"/"critical") share the same rank.
     local severity_hierarchy = {
       debug = 1,
       info = 2,
       notice = 3,
       warn = 4,
       warning = 4,
       error = 5,
       crit = 6,
       critical = 6,
       alert = 7,
       emergency = 8,
     }

     -- log_class values that are never dropped by the severity filter.
     local critical_logs = {
       audit = true,
       security = true,
     }
    18
    --[[
        Marks every severity whose rank is at least min_level as allowed.

        Kept separate from get_allowed_severity so that callers needing their own
        threshold (for example the log replay path) can build an allow-map without
        touching the min_level used for normal severity filtering.

        Parameters:
          severity_allowed - table to fill in; it is mutated and also returned
          min_level        - numeric rank (see severity_hierarchy); must be a number
        Returns:
          the same severity_allowed table, with [severity_name] = true for each
          severity at or above min_level
      ]]
    local function severity_hierarchy_finder(severity_allowed, min_level)
      for name, rank in pairs(severity_hierarchy) do
        local passes = rank >= min_level
        if passes then
          severity_allowed[name] = true
        end
      end
      return severity_allowed
    end
    31
    32
    --[[
        Builds the set of severities that are allowed through for one log record.

        Parameters:
          tag        - the record's tag (string). When it has the form
                       "k8s_container.<a>_<b>_<c>", segment <a> is used as the namespace.
          record     - the log record table
          log_levels - table produced by parse_levels_configmap, or nil

        Returns:
          nil   - the record must bypass severity filtering: its log_class is listed
                  in critical_logs, or it is a replay log without a usable
                  replay_severity
          table - map of severity name -> true for every allowed severity

        The minimum level is resolved in this order, later entries winning:
          1. "error" (rank 5) by default
          2. log_levels.cluster
          3. log_levels.namespace[<namespace from tag>]
          4. the pod annotation "logging.edge.ncr.com/level"
        A level name that is not a key of severity_hierarchy is ignored at every
        step, so a typo in the config can never leave the threshold undefined.
      ]]
    local function get_allowed_severity(tag, record, log_levels)
      local tag_pattern = "^(k8s_container%.)([%a%-%d]+)_([%a%-%d]+)_([%a%-%d]+)"
      local log_class = record["log_class"]

      -- allow certain logs like security or audit logs to always pass through our filter
      if log_class and critical_logs[log_class] then
        return nil
      end

      --[[
          Log replay gets its own allow-map, built from the severity carried on the
          record itself, and skips all of the cluster/namespace/annotation logic below.

          Log replay example:
            the original log was "error"
            the replay log is emitted as "info" (see the format in replay.go)
            rewrite_tag drops the info and makes original-tag the actual tag
            the record then goes through fluent bit as the original log with replay elements
        ]]
      if log_class == "replay" then
        local replay_severity = record["replay_severity"]
        local replay_level = type(replay_severity) == "string"
          and severity_hierarchy[string.lower(replay_severity)]
        if replay_level then
          return severity_hierarchy_finder({}, replay_level)
        end
        -- replay_severity is missing or not a recognised severity: we still want
        -- the replayed log to make it through, so bypass filtering.
        return nil
      end

      local min_level = 5 -- allow "error" and above by default

      -- A 'cluster' key in the log-levels map sets the default minimum level for
      -- every log coming from this cluster.
      if log_levels then
        min_level = severity_hierarchy[log_levels.cluster] or min_level
      end

      --[[
          The namespace is taken from the tag rather than from the kubernetes metadata.
          For LAN outage we want to be able to process logs without having to wait for
          the 30 minute window to be able to use cached data via the kubernetes plugin.
        ]]
      local _, namespace = tag:match(tag_pattern)
      local namespace_levels = log_levels and log_levels["namespace"]
      if namespace and namespace_levels then
        -- a level defined for this namespace overrides the cluster-wide level
        min_level = severity_hierarchy[namespace_levels[namespace]] or min_level
      end

      -- check the k8s metadata attached to each log record and look for annotations
      -- that override which log levels are allowed to go to the cloud.
      local k8s_data = record["kubernetes"]
      local annotations = k8s_data and k8s_data["annotations"]
      if annotations then
        -- // TODO: TO BE DEPRECATED IN 0.25 @RS185722
        -- users can add an annotation to an individual pod to designate the allowed
        -- log level for that pod; it wins over the cluster and namespace settings.
        local log_level = annotations["logging.edge.ncr.com/level"]
        if log_level and severity_hierarchy[log_level] then
          min_level = severity_hierarchy[log_level]
        end
      end

      return severity_hierarchy_finder({}, min_level)
    end
   107
   -- Strips every run of whitespace (spaces, tabs, newlines) and every single or
   -- double quote character from str, wherever they occur, and returns the result.
   local function trim(str)
     local no_whitespace = string.gsub(str, "%s+", "")
     local no_quotes = string.gsub(no_whitespace, "[\'\"]", "")
     return no_quotes
   end
   114
   115--[[
   116-- The patterns below are used to get the contents from a file from a mounted configmap volume. An example cm would look like this:
   117--    apiVersion: v1
   118--    kind: ConfigMap
   119--    metadata:
   120--      name: log-levels
   121--    data:
   122--      log-levels: |
   123--        cluster: info
   124--        namespace:warehouse-system: debug
   125-- The resultant file would be called log-levels and it would have 2 lines :
   126--        cluster: info
   127--        namespace:warehouse-system: debug
   128--
   129-- https://www.lua.org/manual/5.1/manual.html see section 5.4.1
   130-- ^: match the beginning of a line
   131-- %S: represents all non-space characters
   132-- ( ): creates a capture group
   133]]
   134
   -- Line formats recognised in the log-levels file, applied after trim() has
   -- removed whitespace and quotes:
   --   cluster:<level>
   --   namespace:<namespace>:<level>
   local cluster_pattern = "^(cluster):(%S*)"
   local namespace_pattern = "^(namespace):(%S*):(%S*)"

   --[[
     Reads the mounted log-levels configmap file and turns it into a table.

     Parameters:
       configmap_file - path of the file to read
     Returns:
       {} when the file cannot be opened; otherwise a table such as
         {
           cluster = "warning",
           namespace = {
             ["warehouse-system"] = "info",
             ["kinform"] = "info",
           },
         }
       "cluster" defaults to "error" when the file has no usable cluster line, and
       "namespace" is only present when at least one namespace line was accepted.
       Level names are lower-cased but not checked against the known severities.
   ]]
   local function parse_levels_configmap(configmap_file)
     local parsed = {}

     local handle = io.open(configmap_file, "r")
     if not handle then
       return parsed
     end

     -- set a default level to handle cases where the file might have empty values
     parsed.cluster = "error"

     for raw_line in handle:lines() do
       local entry = trim(raw_line)

       -- cluster line: two captures, the literal key and the level
       local cluster_key, cluster_value = entry:match(cluster_pattern)
       if cluster_key and cluster_value ~= "" then
         parsed.cluster = string.lower(cluster_value)
       end

       -- namespace line: three captures, the literal key, the namespace and the level
       local namespace_key, namespace_name, namespace_value = entry:match(namespace_pattern)
       if namespace_key and namespace_name ~= "" and namespace_value ~= "" then
         parsed.namespace = parsed.namespace or {}
         parsed.namespace[namespace_name] = string.lower(namespace_value)
       end
     end

     handle:close()
     return parsed
   end
   181
   --[[
     process_logs is the starting function that fluent-bit calls for each record.

     Return codes understood by fluent-bit:
       -1 record must be deleted
        0 record not modified, keep the original
        1 record was modified, replace timestamp and record
        2 record was modified, replace record and keep timestamp
     This function only ever returns 0 (keep) or -1 (drop); timestamp and record
     are passed back untouched in both cases.

     The log-levels file is parsed again on every call, and a fresh allow-map is
     built for every record, e.g. for a cluster on "error" level:

       {
         ["error"] = true,
         ["crit"] = true,
         ["critical"] = true,
         ["alert"] = true,
         ["emergency"] = true,
       }

     Because the map is rebuilt per record, the log replay threshold and the
     normal severity threshold (both resolved inside get_allowed_severity) never
     leak into each other. A nil map means "do not filter this record".
   ]]
   function process_logs(tag, timestamp, record)
     local log_levels = parse_levels_configmap("/var/configs/log-levels/log-levels")

     -- a record without a severity is treated as "info"
     local severity = record["severity"]
     if severity ~= nil and severity ~= '' then
       severity = string.lower(severity)
     else
       severity = "info"
     end

     local allowed = get_allowed_severity(tag, record, log_levels)

     local code = -1
     if allowed == nil or allowed[severity] then
       code = 0
     end
     return code, timestamp, record
   end
   226
   --[[
     Module result: the helpers that unit tests need to reach are handed back in a
     table so a test can require this file and call them directly.
     ]]
   local exports = {}
   exports.parse_levels_configmap = parse_levels_configmap
   exports.get_allowed_severity = get_allowed_severity
   return exports

View as plain text