-- severity_test.lua
-- Runs unit tests against the severity filtering code from severity.lua
-- requires 2 arguments
-- 1. a path to the test library: e.g.: /Users/ur1337/edge-infra/third_party/lua/simple_test/src
-- 2. a path to the test target dir: e.g.: /Users/ur1337/edge-infra/pkg/edge/logging/fluentbit

-- Fail fast with a usage message instead of the opaque "attempt to concatenate
-- a nil value" error that a missing argument would otherwise produce below.
local cli_args = arg or {}
local library_root, target_root = cli_args[1], cli_args[2]
if not library_root or not target_root then
  error("usage: lua severity_test.lua <test_library_path> <test_target_dir>", 0)
end

local test_library = library_root .. "/?.lua"
local target_dir = target_root .. "/?.lua"
local test_data = target_root .. "/testdata"

-- the test library and target directory need to be added to package.path so we can 'require' them
-- NOTE(review): test_data carries no '?' placeholder, so it presumably never helps
-- 'require' resolve a module; it is kept only to preserve the existing search path.
package.path = package.path .. ";" .. test_library .. ";" .. target_dir .. ";" .. test_data

local test = require("simple_test.init")
local severity = require("severity")

-- Log level configuration passed as the log_levels argument by most of the
-- get_allowed_severity cases below.
local logLevelsTest = {
  ["cluster"] = "warning",
  ["namespace"] = {
    ["warehouse-system"] = "info",
    ["kinform"] = "critical"
  },
}

-- Suffix appended to the namespace portion of the tags built below.
local test_string_fill = "_test-pod-name_test-container-name_"

-- Cases for severity.parse_levels_configmap. Each case names an input file
-- under the testdata directory and an assertion run against the parsed result.
local config_map_test_cases = {
  {
    description = "should populate the log_levels table",
    input = test_data .. "/log-levels-info",
    assert_func = function(t,result) t.not_equal(next(result), nil) end
  },
  {
    description = "should return cluser level = info",
    input = test_data .. "/log-levels-info",
    assert_func = function(t,result) t.equal(result.cluster, "info") end
  },
  {
    description = "should return an empty map if no file is found",
    input = test_data .. "/log-levels-file-does-not-exist",
    -- in lua, a call to next returns the next index of the table and its associated value. So if
    -- we have an empty map then the value should be nil.
    assert_func = function(t,result) t.equal(next(result), nil) end
  },
  {
    description = "should return cluser level = error",
    input = test_data .. "/log-levels-error",
    assert_func = function(t,result) t.equal(result.cluster, "error") end
  },
  {
    description = "should return a critical level for a cluster with namespaces defined",
    input = test_data .. "/log-levels-with-ns",
    assert_func = function(t,result) t.equal(result.cluster, "critical") end
  },
  {
    description = "should return an info level for a namespace",
    input = test_data .. "/log-levels-with-ns",
    assert_func = function(t,result) t.equal(result.namespace["warehouse-system"], "info") end
  },
  {
    description = "should return a warning level for a namespace",
    input = test_data .. "/log-levels-with-ns",
    assert_func = function(t,result) t.equal(result.namespace["kinform"], "warning") end
  },
  {
    description = "should handle mixed case cluster severity levels",
    input = test_data .. "/log-levels-mixed-case",
    assert_func = function(t,result) t.equal(result.cluster, "error") end
  },
  {
    description = "should handle mixed case namespace severity levels",
    input = test_data .. "/log-levels-mixed-case",
    assert_func = function(t,result) t.equal(result.namespace["warehouse-system"], "info") end
  },
  {
    description = "should return an error level if cluster key is empty",
    input = test_data .. "/log-levels-cluster-empty",
    assert_func = function(t,result) t.equal(result.cluster, "error") end
  },
  {
    description = "should return an error level if cluster setting is double quote empty string",
    input = test_data .. "/log-levels-cluster-empty-quotes",
    assert_func = function(t,result) t.equal(result.cluster, "error") end
  },
  {
    description = "should return an error level if cluster setting is single quote empty string",
    input = test_data .. "/log-levels-cluster-empty-single-quotes",
    assert_func = function(t,result) t.equal(result.cluster, "error") end
  },
  {
    description = "should return an error level if no cluster setting is defined",
    input = test_data .. "/log-levels-no-cluster",
    assert_func = function(t,result) t.equal(result.cluster, "error") end
  },
  {
    description = "should return namespace:info if only namespaces are found ",
    input = test_data .. "/log-levels-no-cluster",
    assert_func = function(t,result) t.equal(result.namespace["warehouse-system"], "info") end
  },
  {
    description = "should return namespace:error if namespace value is empty",
    input = test_data .. "/log-levels-with-ns-quotes",
    assert_func = function(t,result) t.equal(result.namespace["warehouse-system"], "error") end
  },
  {
    description = "should ignore a line if namespace is empty",
    input = test_data .. "/log-levels-ns-empty",
    assert_func = function(t,result) t.equal(result.cluster, "critical") end
  },
  {
    description = "should not return a namespace value if namespace is empty",
    input = test_data .. "/log-levels-ns-empty",
    assert_func = function(t,result) t.equal(result.namespace, nil) end
  },
  {
    description = "should ignore a line if namespace is empty but still get other namespace levels",
    input = test_data .. "/log-levels-ns-empty-multi",
    assert_func = function(t,result) t.equal(result.namespace["warehouse-system"], "info") end
  },
  {
    description = "should ignore a line if namespace is defined but has an empty severity",
    input = test_data .. "/log-levels-ns-value-empty",
    assert_func = function(t,result) t.equal(result.namespace, nil) end
  },
  {
    description = "should ignore a line if namespace line is defined without a namespace name",
    input = test_data .. "/log-levels-ns-empty-missing",
    assert_func = function(t,result) t.equal(result.namespace, nil) end,
  },
}

-- Cases for severity.get_allowed_severity on records without a replay severity.
-- Each case supplies the tag, record and log_levels arguments plus an assertion
-- on the returned value.
local normal_severity_test_cases = {
  {
    description = "allowed severity log level warehouse-system = info",
    record = {
      ["kubernetes"] = {
        ["container_name"] = "lumperctl",
        ["namespace_name"] = "warehouse-system"
      },
    },
    tag = "k8s_container.warehouse-system" .. test_string_fill,
    log_levels = logLevelsTest,
    assert_func = function(t,result) t.equal(result["info"], true) end
  },
  {
    -- NOTE(review): this record's namespace ("warehouse-system") also has its own
    -- entry in logLevelsTest, so this assertion may be satisfied by the namespace
    -- setting rather than the cluster one - confirm against severity.lua.
    description = "allowed severity log level from cluster setting",
    record = {
      ["kubernetes"] = {
        ["container_name"] = "lumperctl",
        ["namespace_name"] = "warehouse-system"
      },
    },
    tag = "k8s_container.warehouse-system" .. test_string_fill,
    log_levels = logLevelsTest,
    assert_func = function(t,result) t.equal(result["warning"], true) end
  },
  {
    description = "allowed severity log level from default setting error",
    record = {
      ["kubernetes"] = {
        ["container_name"] = "fluentbit",
        ["namespace_name"] = "fluent-operator"
      },
    },
    tag = "k8s_container.fluent-operator" .. test_string_fill,
    -- no "cluster" key here, and the record's namespace is not listed either
    log_levels = {
      ["namespace"] = {
        ["warehouse-system"] = "info",
        ["kinform"] = "critical"
      },
    },
    assert_func = function(t,result) t.equal(result["error"], true) end
  },
  {
    description = "audit level logs, should be nil for severity",
    record = {
      ["kubernetes"] = {
        ["container_name"] = "lumperctl",
        ["namespace_name"] = "warehouse-system"
      },
      ["log_class"] = "audit",
    },
    tag = "k8s_container.warehouse-system" .. test_string_fill,
    log_levels = logLevelsTest,
    assert_func = function(t,result) t.equal(result, nil) end
  },
  {
    description = "security level logs, should be nil for severity",
    record = {
      ["kubernetes"] = {
        ["namespace_name"] = "authserver"
      },
      ["log_class"] = "security",
    },
    tag = "k8s_container.authserver_authserver_authserver",
    log_levels = logLevelsTest,
    assert_func = function(t,result) t.equal(result, nil) end
  },
  {
    description = "annotation to info level logs",
    record = {
      ["kubernetes"] = {
        ["container_name"] = "lumperctl",
        ["namespace_name"] = "fluent-operator",
        ["annotations"] = {
          ["logging.edge.ncr.com/level"] = "info"
        }
      },
      ["log_class"] = "notice",
    },
    tag = "k8s_container.fluent-operator" .. test_string_fill,
    log_levels = logLevelsTest,
    assert_func = function(t,result) t.equal(result["info"], true) end
  },
}

-- Cases for severity.get_allowed_severity on records whose log_class is "replay".
local log_replay_severity_test_cases = {
  {
    description = "allowed severity log level info: lower case",
    record = {
      ["kubernetes"] = {
        ["container_name"] = "lumperctl",
        ["namespace_name"] = "warehouse-system"
      },
      ["replay_severity"] = "info",
      ["log_class"] = "replay",
    },
    tag = "k8s_container.warehouse-system" .. test_string_fill,
    log_levels = logLevelsTest,
    assert_func = function(t,result) t.equal(result["info"], true) end
  },
  {
    description = "allowed severity log level info: upper case",
    record = {
      ["kubernetes"] = {
        ["container_name"] = "lumperctl",
        ["namespace_name"] = "warehouse-system"
      },
      ["replay_severity"] = "INFO",
      ["log_class"] = "replay",
    },
    tag = "k8s_container.warehouse-system" .. test_string_fill,
    log_levels = logLevelsTest,
    assert_func = function(t,result) t.equal(result["info"], true) end
  },
  {
    description = "allowed severity log level warning: upper case",
    record = {
      ["kubernetes"] = {
        ["container_name"] = "lumperctl",
        ["namespace_name"] = "warehouse-system"
      },
      ["replay_severity"] = "WARNING",
      ["log_class"] = "replay",
    },
    tag = "k8s_container.warehouse-system" .. test_string_fill,
    log_levels = logLevelsTest,
    assert_func = function(t,result) t.equal(result["warning"], true) end
  },
  {
    description = "no log level passed in",
    record = {
      ["kubernetes"] = {
        ["container_name"] = "lumperctl",
        ["namespace_name"] = "warehouse-system"
      },
      ["log_class"] = "replay",
    },
    tag = "k8s_container.warehouse-system" .. test_string_fill,
    log_levels = logLevelsTest,
    assert_func = function(t,result) t.equal(result, nil) end
  },
}

-- Runs every case in `cases` through severity.parse_levels_configmap and hands
-- the parsed result to the case's assert_func.
local function run_config_map_cases(cases)
  for _, case in ipairs(cases) do
    test(case.description, function(t)
      local result = severity.parse_levels_configmap(case.input)
      case.assert_func(t, result)
    end)
  end
end

-- Runs every case in `cases` through severity.get_allowed_severity and hands
-- the returned value to the case's assert_func. Shared by the normal and the
-- log replay suites, which differ only in their input data.
local function run_allowed_severity_cases(cases)
  for _, case in ipairs(cases) do
    test(case.description, function(t)
      local result = severity.get_allowed_severity(case.tag, case.record, case.log_levels)
      case.assert_func(t, result)
    end)
  end
end

print("--------------------------------------")
print("severity config_map_test_cases started")
print("--------------------------------------")
run_config_map_cases(config_map_test_cases)

print("------------------------------------------")
print("normal_allowed_severity_test_cases started")
print("-----------------------------------------")
run_allowed_severity_cases(normal_severity_test_cases)

print("----------------------------------------------")
print("log_replay_allowed_severity_test_cases started")
print("----------------------------------------------")
run_allowed_severity_cases(log_replay_severity_test_cases)