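-- siem_test.lua
--
-- Unit tests for the SIEM routing filter in siem.lua: parsing the two config
-- maps (edge / "static" and third-party / "workload") and deciding, per log
-- record, whether it is forwarded to the SIEM and which log_class / log_type
-- it is stamped with.
--
-- Usage:  lua siem_test.lua <simple_test src dir> <fluentbit target dir>
--   1. directory holding simple_test/init.lua, e.g.
--        /Users/ur1337/edge-infra/third_party/lua/simple_test/src
--   2. directory holding siem.lua and its testdata/, e.g.
--        /Users/ur1337/edge-infra/pkg/edge/logging/fluentbit
--
-- Both directories are appended to package.path, so `require` loads and runs
-- whatever siem.lua / simple_test it finds there: point them at a checkout
-- you trust.
--
-- Fixtures under <target dir>/testdata/siem/ (their contents are not visible
-- here; the expectations below are what the tests assume of them):
--   static-full.data    edge config map with at least two entries
--   workload-full.data  workload config map with at least three entries and
--                       helm-edge-ids ...3801 through ...3812
--   empty.data          empty file

-- Fail with a usage line instead of "attempt to concatenate a nil value"
-- when the two positional arguments are missing.
if arg == nil or arg[1] == nil or arg[2] == nil then
  io.stderr:write("usage: lua siem_test.lua <simple_test src dir> <fluentbit target dir>\n")
  os.exit(1)
end

local test_library = arg[1] .. "/?.lua"
local target_dir = arg[2] .. "/?.lua"
local test_data = arg[2] .. "/testdata"

-- Only real `?.lua` templates go on the search path. The testdata directory
-- is a fixture root used to build file paths below, not a module location:
-- a package.path entry without `?` is tried as a literal filename, which can
-- turn a plain "module not found" into a confusing "error loading module".
package.path = package.path .. ";" .. test_library .. ";" .. target_dir

local test = require("simple_test.init")
local siem = require("siem")

---------------------------------------------------------------------------
-- Fixture paths
---------------------------------------------------------------------------

local function fixture(name)
  return test_data .. "/siem/" .. name
end

local STATIC_CONFIG = fixture("static-full.data")
local WORKLOAD_CONFIG = fixture("workload-full.data")
local EMPTY_CONFIG = fixture("empty.data")
local MISSING_CONFIG = test_data .. "/file_does_not_exist"

-- The helm-edge-ids in workload-full.data share one prefix and differ only
-- in the last two digits.
local HELM_ID_PREFIX = "68f1e7b3-0609-4b20-a9b6-6d365bed38"
local function helm_id(suffix)
  return HELM_ID_PREFIX .. suffix
end

-- fluent-bit tags of the form k8s_container.<namespace>_<pod>_<container>.
local TAG_COUCHDB = "k8s_container.data-sync-couchdb_data-sync-couchdb-0_couchdb"
local TAG_FIM = "k8s_container.fim_samhaim_alpine-fim"
local TAG_AUTHSERVER = "k8s_container.authserver_authserver_authserver"
local TAG_WIREGUARD = "k8s_container.prometheus-exporter_vpn_wireguard-relay-6bd78894fb-txfwl"
local TAG_PROM_OPERATOR = "k8s_container.prometheus-operator_prometheus-operator_prometheus-operator-6bbb94454d-dhvtr"
local TAG_TEST = "k8s_container.test-namespace_test-pod_test-container"

---------------------------------------------------------------------------
-- Record builders
---------------------------------------------------------------------------

-- Record shaped like the kubernetes filter output the edge ("static") configs
-- are matched against: container / namespace / pod names. A nil argument
-- leaves that key out of the record.
local function edge_record(namespace, pod, container, message, severity)
  return {
    kubernetes = {
      container_name = container,
      namespace_name = namespace,
      pod_name = pod,
    },
    message = message,
    severity = severity,
  }
end

-- Record for the third-party ("workload") configs, which are matched on the
-- siem.edge.ncr.com/helm-edge-id namespace label rather than on names.
-- A nil message leaves the key out.
local function workload_record(edge_id, message, severity)
  return {
    kubernetes = {
      namespace = {
        name = "your-namespace",
        labels = { ["siem.edge.ncr.com/helm-edge-id"] = edge_id },
      },
    },
    message = message,
    severity = severity,
  }
end

---------------------------------------------------------------------------
-- Assertion builders
---------------------------------------------------------------------------

local function assert_nonempty(t, result)
  t.not_equal(next(result), nil)
end

local function assert_empty(t, result)
  t.equal(next(result), nil)
end

-- Assert result[i][field] == expected[i] for every i in `expected`, and that
-- the decoded table holds at least #expected entries -- without the count
-- check an empty or truncated fixture would pass these tests vacuously.
local function assert_fields(field, expected)
  return function(t, result)
    local count = 0
    for i, item in ipairs(result) do
      count = count + 1
      if expected[i] ~= nil then
        t.equal(item[field], expected[i])
      end
    end
    t.equal(count >= #expected, true)
  end
end

-- The record was routed: add_siem_record returned true and set `field`.
local function expect_routed(field, value)
  return function(t, record, routed)
    t.equal(record[field], value)
    t.equal(routed, true)
  end
end

-- The record was left alone: add_siem_record returned false and log_type is
-- still unset.
local function expect_unrouted(t, record, routed)
  t.equal(record["log_type"], nil)
  t.equal(routed, false)
end

---------------------------------------------------------------------------
-- parse_configmap cases
---------------------------------------------------------------------------

-- Edge ("static") config map: decoded from a JSON file into a sequence of
-- entries keyed by namespace / pod / container.
local edge_siem_config_map_test_cases = {
  { description = "should populate the edge_siem_configs table for one log type",
    input = STATIC_CONFIG, assert_func = assert_nonempty },
  { description = "edge_siem_configs file not existing",
    input = MISSING_CONFIG, assert_func = assert_empty },
  { description = "empty edge_siem_configs file",
    input = EMPTY_CONFIG, assert_func = assert_empty },
  { description = "test reading namespace from edge_siem_configs",
    input = STATIC_CONFIG,
    assert_func = assert_fields("namespace", { "fim", "data-sync-couchdb" }) },
  { description = "test reading pod from edge_siem_configs",
    input = STATIC_CONFIG,
    assert_func = assert_fields("pod", { "samhaim", "data-sync-couchdb" }) },
  { description = "test reading container from edge_siem_configs",
    input = STATIC_CONFIG,
    assert_func = assert_fields("container", { "alpine-fim", "couchdb" }) },
  { description = "test reading log_class from edge_siem_configs",
    input = STATIC_CONFIG,
    assert_func = assert_fields("log_class", { "audit", "security" }) },
  { description = "test reading log_type from edge_siem_configs",
    input = STATIC_CONFIG,
    assert_func = assert_fields("log_type", { "alpine-fim", "couchdb" }) },
}

-- Third-party ("workload") config map: same decoding, entries keyed by
-- helm_edge_ID and carrying an optional Lua `pattern` applied to the message.
local third_party_siem_config_map_test_cases = {
  { description = "should populate the third_party_siem_configs table for one log type",
    input = WORKLOAD_CONFIG, assert_func = assert_nonempty },
  { description = "third_party_siem_configs file not existing",
    input = MISSING_CONFIG, assert_func = assert_empty },
  { description = "empty third_party_siem_configs file",
    input = EMPTY_CONFIG, assert_func = assert_empty },
  { description = "test reading label from workload-full.data",
    input = WORKLOAD_CONFIG,
    assert_func = assert_fields("helm_edge_ID", { helm_id("01"), helm_id("02"), helm_id("03") }) },
  { description = "test reading pod from workload-full.data",
    input = WORKLOAD_CONFIG,
    assert_func = assert_fields("pod", { "samhaim", "data-sync-couchdb", "authserver" }) },
  { description = "test reading container from workload-full.data",
    input = WORKLOAD_CONFIG,
    assert_func = assert_fields("container", { "alpine-fim", "couchdb", "authserver" }) },
  { description = "test reading log_class from workload-full.data",
    input = WORKLOAD_CONFIG,
    assert_func = assert_fields("log_class", { "audit", "security", "security" }) },
  { description = "test reading log_type from workload-full.data",
    input = WORKLOAD_CONFIG,
    assert_func = assert_fields("log_type", { "alpine-fim", "couchdb", "authserver" }) },
  { description = "test reading pattern from workload-full.data",
    input = WORKLOAD_CONFIG,
    assert_func = assert_fields("pattern", { "xyz", "123", "" }) },
}

---------------------------------------------------------------------------
-- add_siem_record cases
---------------------------------------------------------------------------

-- Edge scope: routing is decided from the tag / kubernetes names. The
-- linkerd sidecar cases show that the init and proxy containers of a
-- configured pod inherit that pod's log_class.
local add_siem_record_test_cases_static = {
  { description = "check for log_class == security",
    record = edge_record("data-sync-couchdb", "data-sync-couchdb-0", "couchdb", "abc123def456", "INFO"),
    tag = TAG_COUCHDB,
    assert_func = expect_routed("log_class", "security") },
  { description = "check for log_class == audit",
    record = edge_record("fim", "samhaim", "alpine-fim", "abc123defrdj", "INFO"),
    tag = TAG_FIM,
    assert_func = expect_routed("log_class", "audit") },
  { description = "check for log_type",
    record = edge_record("data-sync-couchdb", "data-sync-couchdb-0", "couchdb", "abc123def456", "INFO"),
    tag = TAG_COUCHDB,
    assert_func = expect_routed("log_type", "couchdb") },
  -- No pod_name on the record and an unknown container in the tag: nothing
  -- may be stamped on the record and it must not be forwarded.
  { description = "check for no changes to record",
    record = edge_record("data-sync-couchdb", nil, "couchdb", "abc123def456", "INFO"),
    tag = "k8s_container.data-sync-couchdb_data-sync-couchdb-0_incorrect-conntainer-name",
    assert_func = expect_unrouted },
  { description = "linkerd init container; log_class == security",
    record = edge_record("data-sync-couchdb", "data-sync-couchdb-0", "linkerd-init", "abc123def456", "INFO"),
    tag = TAG_COUCHDB,
    assert_func = expect_routed("log_class", "security") },
  { description = "linkerd proxy container; log_class == security",
    record = edge_record("data-sync-couchdb", "data-sync-couchdb-0", "linkerd-proxy", "abc123def456", "INFO"),
    tag = TAG_COUCHDB,
    assert_func = expect_routed("log_class", "security") },
}

-- Workload scope: routing is keyed on the helm-edge-id namespace label, then
-- gated by the entry's severity threshold and its `pattern`.
--
-- Two things worth keeping in mind when extending these:
--  * `pattern` is a Lua pattern, not a regex: `%s`, `%d+`, `^` and `%-` below
--    are Lua syntax, and regex idioms such as `\d` or `.*` do not mean the
--    same thing (the ".* (in lua %w+)" case documents that).
--  * NOTE(review): the label is read straight off the record, so whoever can
--    set that namespace label chooses the routing entry for the namespace's
--    logs. The cases here cover a missing label; one with a label value that
--    exists in no config entry would also be worth adding.
local add_siem_record_test_cases_dynamic = {
  { description = "check for workloadEdgeID: " .. helm_id("01"),
    record = workload_record(helm_id("01"), "abcxyzdef456", "INFO"),
    tag = TAG_FIM,
    assert_func = expect_routed("log_type", "alpine-fim") },
  { description = "check for workloadEdgeID: " .. helm_id("02"),
    record = workload_record(helm_id("02"), "abc123def456", "INFO"),
    tag = TAG_COUCHDB,
    assert_func = expect_routed("log_type", "couchdb") },
  { description = "check for workloadEdgeID: " .. helm_id("03"),
    record = workload_record(helm_id("03"), nil, "INFO"),
    tag = TAG_AUTHSERVER,
    assert_func = expect_routed("log_type", "authserver") },
  -- Neither a namespace name nor any label: nothing to key on.
  { description = "check for workloadEdgeID: no label found",
    record = {
      kubernetes = { namespace = { labels = {} } },
      message = "abcxyzdef456",
      severity = "INFO",
    },
    tag = TAG_FIM,
    assert_func = expect_unrouted },

  -- pattern "123" (entry ...02): INFO and ERROR pass, DEBUG is below the
  -- entry's severity threshold.
  { description = "check for simple pattern: 123 in record['message'] AND right level",
    record = workload_record(helm_id("02"), "abc123def456", "INFO"),
    tag = TAG_COUCHDB,
    assert_func = expect_routed("log_type", "couchdb") },
  { description = "check for simple pattern: 123 in record['message'] AND right level in range",
    record = workload_record(helm_id("02"), "abc123def456", "ERROR"),
    tag = TAG_COUCHDB,
    assert_func = expect_routed("log_type", "couchdb") },
  { description = "check for simple pattern: 123 in record['message'] AND wrong level",
    record = workload_record(helm_id("02"), "abc123def456", "DEBUG"),
    tag = TAG_COUCHDB,
    assert_func = expect_unrouted },

  -- pattern "xyz" (entry ...01): same shape, CRITICAL also passes.
  { description = "check for simple pattern: xyz in record['message'] AND right level",
    record = workload_record(helm_id("01"), "abcxyzdef456", "INFO"),
    tag = TAG_FIM,
    assert_func = expect_routed("log_type", "alpine-fim") },
  { description = "check for simple pattern: xyz in record['message'] AND right level in range",
    record = workload_record(helm_id("01"), "abcxyzdef456", "CRITICAL"),
    tag = TAG_FIM,
    assert_func = expect_routed("log_type", "alpine-fim") },
  { description = "check for simple pattern: xyz in record['message'] AND wrong level",
    record = workload_record(helm_id("01"), "abcxyzdef456", "DEBUG"),
    tag = TAG_FIM,
    assert_func = expect_unrouted },

  -- Empty pattern (entry ...03) matches any message.
  { description = "check for simple pattern: empty pattern should return true",
    record = workload_record(helm_id("03"), "abcxyzdef456", "INFO"),
    tag = TAG_AUTHSERVER,
    assert_func = expect_routed("log_type", "authserver") },

  -- Entries with a non-empty pattern need a message field to match against;
  -- a record without one is not forwarded.
  { description = "no record['message'] for pattern match",
    record = workload_record(helm_id("01"), nil, "INFO"),
    tag = TAG_FIM,
    assert_func = expect_unrouted },
  { description = "no record['msg'] for pattern match",
    record = workload_record(helm_id("02"), nil, "INFO"),
    tag = TAG_COUCHDB,
    assert_func = expect_unrouted },
  { description = "no record['message'] for pattern match; wireguard-relay container",
    record = workload_record(helm_id("04"), nil, "INFO"),
    tag = TAG_WIREGUARD,
    assert_func = expect_unrouted },
  { description = "check for record['msg'] for pattern match; prometheus-operator container",
    record = workload_record(helm_id("05"), nil, "INFO"),
    tag = TAG_PROM_OPERATOR,
    assert_func = expect_unrouted },
  { description = "Check that SEVERITY is working",
    record = workload_record(helm_id("06"), nil, "INFO"),
    tag = TAG_PROM_OPERATOR,
    assert_func = expect_unrouted },

  -- Patterns using Lua classes, quantifiers and anchors (entries ...07-...12).
  { description = "check for advanced pattern: .* (in lua %w+) message check",
    record = workload_record(helm_id("07"), "abcxyzdef456", "INFO"),
    tag = TAG_PROM_OPERATOR,
    assert_func = expect_routed("log_class", "audit") },
  { description = "check for advanced pattern: x%syz in record['message']",
    record = workload_record(helm_id("08"), "abcx yzdef456", "INFO"),
    tag = TAG_TEST,
    assert_func = expect_routed("log_type", "test") },
  { description = "check for advanced pattern: x%s+y%s+z in record['message']",
    record = workload_record(helm_id("09"), "abcx y zdef456", "INFO"),
    tag = TAG_TEST,
    assert_func = expect_routed("log_type", "test") },
  { description = "check for advanced pattern: x%s+y%d+%s+z in record['message']",
    record = workload_record(helm_id("10"), "abcx y123313 zdef456", "INFO"),
    tag = TAG_TEST,
    assert_func = expect_routed("log_type", "test") },
  { description = "check for advanced pattern: ^x%s+y%d+%s+z in record['message'] PASS",
    record = workload_record(helm_id("11"), "x y123313 zdef456", "INFO"),
    tag = TAG_TEST,
    assert_func = expect_routed("log_type", "test") },
  -- Anchored pattern against a message that does not start with "x".
  { description = "check for advanced pattern: ^x%s+y%d+%-%s+z in record['message'] FAIL",
    record = workload_record(helm_id("12"), "abcx y123313- zdef456", "INFO"),
    tag = TAG_TEST,
    assert_func = expect_unrouted },
}

---------------------------------------------------------------------------
-- Runners
---------------------------------------------------------------------------

-- Section banner: a rule of dashes the width of the title, the title, and
-- the rule again.
local function banner(title)
  local rule = string.rep("-", #title)
  print(rule)
  print(title)
  print(rule)
end

-- Parse each case's config file and hand the decoded table to its assertion.
local function run_parse_cases(title, cases)
  banner(title)
  for _, case in ipairs(cases) do
    test(case.description, function(t)
      local configs = siem.parse_configmap(case.input)
      case.assert_func(t, configs)
    end)
  end
end

-- Run add_siem_record for each case against the config map at config_path
-- under the given scope ("edge" or "workload"). The config is re-parsed per
-- case so one test cannot leak state into the next.
local function run_record_cases(title, cases, config_path, scope)
  banner(title)
  for _, case in ipairs(cases) do
    test(case.description, function(t)
      local configs = siem.parse_configmap(config_path)
      local routed, record = siem.add_siem_record(case.tag, case.record, configs, scope)
      case.assert_func(t, record, routed)
    end)
  end
end

-- PARSING TESTS RUN
run_parse_cases("edge_siem_config_map_test_cases started", edge_siem_config_map_test_cases)
run_parse_cases("third_party_siem_config_map_test_cases started", third_party_siem_config_map_test_cases)

-- ADD RECORD RUNS
run_record_cases("add_siem_record_test_cases_static started", add_siem_record_test_cases_static, STATIC_CONFIG, "edge")
run_record_cases("add_siem_record_test_cases_dynamic started", add_siem_record_test_cases_dynamic, WORKLOAD_CONFIG, "workload")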