1local severity_hierarchy = {
2 ["debug"] = 1,
3 ["info"] = 2,
4 ["notice"] = 3,
5 ["warn"] = 4,
6 ["warning"] = 4,
7 ["error"] = 5,
8 ["crit"] = 6,
9 ["critical"] = 6,
10 ["alert"] = 7,
11 ["emergency"] = 8,
12}
13
14local critical_logs = {
15 ["audit"] = true,
16 ["security"] = true,
17}
18
19--[[
20 Extracting this method out to allow for more dynamic usage(ex. need of log replay pipeline needing its own severity level)
21 of get_allowed_severity, so we dont have to mess up the local min_level for normal severity filtering.
22 ]]
23local function severity_hierarchy_finder(severity_allowed, min_level)
24 for severity, level in pairs(severity_hierarchy) do
25 if level >= min_level then
26 severity_allowed[severity] = true
27 end
28 end
29 return severity_allowed
30end
31
32
33local function get_allowed_severity(tag, record, log_levels)
34 local tag_pattern = "^(k8s_container%.)([%a%-%d]+)_([%a%-%d]+)_([%a%-%d]+)"
35 local k8s_data = record["kubernetes"]
36 local log_class = record["log_class"]
37 local severity_allowed = {}
38 local min_level = 5 -- allow "error" and above by default
39
40 -- allow certain logs like security or audit logs to always pass through our filter
41 if log_class and critical_logs[log_class] then
42 return nil
43 end
44
45 --[[
46 This should create a severity allowed hashmap for log replay specifcally for the log passed in.
47 Effectively kicking it out of any further severity filtering logic in this method.
48
49 Log replay Example:
50 OG Log was error
51 Replay Log will be info like the format in replay.go
52 then rewrite_tag drops info and makes original-tag the actual tag
53 then goes through fluent bit as og log with replay elements
54 ]]
55 if log_class == "replay" then
56 if record["replay_severity"] then
57 replay_severity = severity_hierarchy[string.lower(record["replay_severity"])]
58 severity_allowed = severity_hierarchy_finder(severity_allowed, replay_severity)
59 return severity_allowed
60 end
61 --[[
62 This is an edge case where replay_severity doesn't get sent up we still want log_replay to make it
63 ]]
64 return nil
65 end
66
67 --[[
68 -- look for anything defined in the log-levels map. This defines the default log level
69 -- for an entire cluster or for a given namespace. If a 'cluster' key is found then
70 -- that should be the min_level allowed for logs coming from this cluster
71 -- local log_levels = log_levels
72 ]]
73 if log_levels and log_levels.cluster then
74 min_level = severity_hierarchy[log_levels.cluster]
75 end
76
77 --[[
78 For Lan Outage we want to be able to process logs without having to wait for the 30
79 minute window to be able to use cached data via the kubernetes plugin.
80 ]]
81 local _, namespace, _, _ = tag:match(tag_pattern)
82 if namespace then
83 -- check if any log levels for namespaces have been defined the log_levels map. If so, set the
84 -- min_level to this value for the current log record.
85 if log_levels and log_levels["namespace"] and log_levels["namespace"][namespace] then
86 min_level = severity_hierarchy[log_levels["namespace"][namespace]]
87 end
88 end
89
90 -- check the k8s metadata attached to each log record and look for annotations that override
91 -- which log levels are allowed to go to the cloud.
92 if k8s_data then
93 if k8s_data["annotations"] then
94 -- // TODO: TO BE DEPRECATED IN 0.25 @RS185722
95 -- users can add annotations to individual pods which designate the allowed log level for that pod. If that annotation is
96 -- defined then
97 local log_level = k8s_data["annotations"]["logging.edge.ncr.com/level"]
98 if log_level and severity_hierarchy[log_level] then
99 min_level = severity_hierarchy[log_level]
100 end
101 end
102 end
103
104 severity_allowed = severity_hierarchy_finder(severity_allowed, min_level)
105 return severity_allowed
106end
107
108-- removes whitespaces, tabs and quote characters
109local function trim(str)
110 str = string.gsub(str, "%s+", "")
111 str = string.gsub(str, "[\'\"]", "")
112 return str
113end
114
115--[[
116-- The patterns below are used to get the contents from a file from a mounted configmap volume. An example cm would look like this:
117-- apiVersion: v1
118-- kind: ConfigMap
119-- metadata:
120-- name: log-levels
121-- data:
122-- log-levels: |
123-- cluster: info
124-- namespace:warehouse-system: debug
125-- The resultant file would be called log-levels and it would have 2 lines :
126-- cluster: info
127-- namespace:warehouse-system: debug
128--
129-- https://www.lua.org/manual/5.1/manual.html see section 5.4.1
130-- ^: match the beginning of a line
131-- %S: represents all non-space characters
132-- ( ): creates a capture group
133]]
134
135local cluster_pattern = "^(cluster):(%S*)"
136local namespace_pattern = "^(namespace):(%S*):(%S*)"
137-- read the log-levels config map and populate the log_levels map
138local function parse_levels_configmap(configmap_file)
139 local log_levels = {}
140 local file = io.open(configmap_file, "r")
141 if file then
142 -- set a default level to handle cases where the file might have empty values
143 log_levels.cluster = "error"
144
145 for line in file:lines() do
146 line = trim(line)
147
148 -- handle cluster lines using cluster_pattern. cluster_pattern has two capture groups so we'll get two values returned
149 local cluster_match, cluster_log_level = line:match(cluster_pattern)
150 if cluster_match and cluster_log_level and cluster_log_level ~= "" then
151 if cluster_log_level then
152 log_levels.cluster = string.lower(cluster_log_level)
153 end
154 end
155
156 -- handle namespace lines using namespace_pattern. namespace_pattern has three capture groups so we'll get three values returned
157 local namespace_match, namespace, namespace_log_level = line:match(namespace_pattern)
158 if namespace_match and namespace and namespace ~= "" then
159 if namespace_log_level and namespace_log_level ~= "" then
160 log_levels.namespace = log_levels.namespace or {}
161 log_levels.namespace[namespace] = string.lower(namespace_log_level)
162 end
163 end
164
165 end
166 file:close()
167 end
168
169 --[[
170 an example log_levels table could look like this:
171 log_levels: {
172 cluster: warning
173 namespace: {
174 warehouse-system: info,
175 kinform: info,
176 }
177 }
178 ]]
179 return log_levels
180end
181
182--[[
183 process_logs is the starting function that fluent-bit calls
184 - return codes : -1 record must be deleted
185 0 record not modified, keep the original
186 1 record was modified, replace timestamp and record
187 2 record was modified, replace record and keep timestamp
188 ]]
189function process_logs(tag, timestamp, record)
190 local configmap_file = "/var/configs/log-levels/log-levels"
191 local log_levels = parse_levels_configmap(configmap_file)
192 local level = record["severity"]
193 if level == nil or level == '' then
194 level = "info"
195 else
196 level = string.lower(level)
197 end
198
199 --[[
200 severity_allowed is a hashmap that will have true values for the severity
201 that are whitelisted to GCP. For example, if the cluster is on error level
202 the map will look something like this:
203
204 severity_allowed = {
205 ["error"] = true,
206 ["crit"] = true,
207 ["critical"] = true,
208 ["alert"] = true,
209 ["emergency"] = true,
210 }
211
212 Since this script gets re ran for every log that passes through. This hashmap
213 will be recreated. Thus, making each run have a unique severity_allowed map.
214
215 This helps with log_replay (further details within the get_allowed_severity method where log_replay is filtered out)
216 and normal severity being isolated threads within the same method.
217 ]]
218
219 local severity_allowed = get_allowed_severity(tag, record, log_levels)
220 if severity_allowed == nil or severity_allowed[level] then
221 return 0, timestamp, record
222 else
223 return -1, timestamp, record
224 end
225end
226
227--[[
228 return a table of functions here so we can import them in unit tests
229 ]]
230return {
231 parse_levels_configmap = parse_levels_configmap,
232 get_allowed_severity = get_allowed_severity
233}
View as plain text