diff --git a/config/pipeline_config_default.yaml b/config/pipeline_config_default.yaml
index 1ce2c62..16ac77b 100644
--- a/config/pipeline_config_default.yaml
+++ b/config/pipeline_config_default.yaml
@@ -3,7 +3,7 @@ readers:
method_type: log_file_reader
auto_config: False
params:
- file: local/miranda.json
+ file: tests/test_folder/audit.log
parsers:
MatcherParser:
@@ -97,11 +97,8 @@ detectors:
variables:
- pos: 0
name: var1
- params:
- threshold: 0.
header_variables:
- pos: level
- params: {}
NewValueComboDetector_All:
method_type: new_value_combo_detector
auto_config: False
diff --git a/notebooks/persistency_demo.ipynb b/notebooks/persistency_demo.ipynb
new file mode 100644
index 0000000..db6af59
--- /dev/null
+++ b/notebooks/persistency_demo.ipynb
@@ -0,0 +1,533 @@
+{
+ "cells": [
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "id": "54215189",
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "import os\n",
+ "os.chdir(\"../\")\n",
+ "\n",
+ "from detectmatelibrary.detectors.new_value_combo_detector import NewValueComboDetector, schemas\n",
+ "from detectmatelibrary.parsers.template_matcher import MatcherParser\n",
+ "from detectmatelibrary.readers.log_file import LogFileReader\n",
+ "\n",
+ "from detectmatelibrary.common.persistency.event_data_structures.trackers import (\n",
+ " EventVariableTracker, StabilityTracker\n",
+ ")\n",
+ "from detectmatelibrary.common.persistency.event_data_structures.dataframes import (\n",
+ " EventDataFrame, ChunkedEventDataFrame\n",
+ ")\n",
+ "from detectmatelibrary.common.persistency.event_persistency import EventPersistency\n",
+ "\n",
+ "import logging\n",
+ "logging.getLogger().setLevel(logging.ERROR) # Only show errors"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "id": "4cd73416",
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "import yaml\n",
+ "\n",
+ "\n",
+ "with open(\"config/pipeline_config_default.yaml\", 'r') as f:\n",
+ " config = yaml.safe_load(f)"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "id": "54ae5e78",
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "reader = LogFileReader(config=config)\n",
+ "parser = MatcherParser(config=config)\n",
+ "detector = NewValueComboDetector(config=config)\n",
+ "\n",
+ "persistency1 = EventPersistency(\n",
+ " event_data_class=EventVariableTracker,\n",
+ " event_data_kwargs={\"tracker_type\": StabilityTracker},\n",
+ ")\n",
+ "\n",
+ "persistency2 = EventPersistency(\n",
+ " event_data_class=EventDataFrame,\n",
+ ")\n",
+ "\n",
+ "persistency3 = EventPersistency(\n",
+ " event_data_class=ChunkedEventDataFrame,\n",
+ ")"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "id": "f23c07d8",
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "for i in range(1000):\n",
+ " log = reader.process(as_bytes=False)\n",
+ " parsed_log = parser.process(log)\n",
+ "\n",
+ " persistency1.ingest_event(\n",
+ " event_id=parsed_log['EventID'],\n",
+ " event_template=parsed_log['template'],\n",
+ " variables=parsed_log['variables'],\n",
+ " log_format_variables=parsed_log['logFormatVariables'],\n",
+ " )\n",
+ " persistency2.ingest_event(\n",
+ " event_id=parsed_log['EventID'],\n",
+ " event_template=parsed_log['template'],\n",
+ " variables=parsed_log['variables'],\n",
+ " log_format_variables=parsed_log['logFormatVariables'],\n",
+ " )\n",
+ " persistency3.ingest_event(\n",
+ " event_id=parsed_log['EventID'],\n",
+ " event_template=parsed_log['template'],\n",
+ " variables=parsed_log['variables'],\n",
+ " log_format_variables=parsed_log['logFormatVariables'],\n",
+ " )"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "id": "651e272e",
+ "metadata": {},
+ "outputs": [
+ {
+ "data": {
+ "text/plain": [
+ "{0: EventVariableTracker(data={\n",
+ " \tTime: StabilityTracker(classification=Classification(type='RANDOM', reason='Unique set size equals number of samples (642)'), change_series=[1, 1, 1, '...', 1, 1, 1], unique_set={(1642820941.151:1057), (1642847941.596:1245), (1642785421.190:843), ..., (1642833541.196:1147), (1642799821.579:931), (1642822741.217:1074)}, RLE=[(True, 642)])\n",
+ " \tType: StabilityTracker(classification=Classification(type='STABLE', reason='Segment means of change series [0.03125, 0.0, 0.0, 0.0] are below segment thresholds: [1.1, 0.3, 0.1, 0.01]'), change_series=[1, 1, 1, '...', 0, 0, 0], unique_set={USER_START, CRED_DISP, USER_ACCT, USER_END, CRED_ACQ}, RLE=[(True, 5), (False, 637)])\n",
+ " \tvar_0: StabilityTracker(classification=Classification(type='UNSTABLE', reason='No classification matched; variable is unstable'), change_series=[1, 0, 0, '...', 0, 0, 0], unique_set={16092, 14930, 15774, ..., 10883, 13737, 19301}, RLE=[(True, 1), (False, 4), (True, 1), '...', (False, 4), (True, 1), (False, 3)])\n",
+ " \tvar_1: StabilityTracker(classification=Classification(type='STATIC', reason='Unique set size is 1'), change_series=[1, 0, 0, '...', 0, 0, 0], unique_set={0}, RLE=[(True, 1), (False, 641)])\n",
+ " \tvar_2: StabilityTracker(classification=Classification(type='STABLE', reason='Segment means of change series [0.01875, 0.0, 0.0, 0.0] are below segment thresholds: [1.1, 0.3, 0.1, 0.01]'), change_series=[1, 0, 1, '...', 0, 0, 0], unique_set={1002, 4294967295, 0}, RLE=[(True, 1), (False, 1), (True, 1), (False, 127), (True, 1), (False, 511)])\n",
+ " \tvar_3: StabilityTracker(classification=Classification(type='UNSTABLE', reason='No classification matched; variable is unstable'), change_series=[1, 0, 1, '...', 0, 1, 0], unique_set={120, 110, 117, ..., 183, 133, 162}, RLE=[(True, 1), (False, 1), (True, 1), '...', (False, 4), (True, 1), (False, 1)])\n",
+ " \tvar_4: StabilityTracker(classification=Classification(type='STABLE', reason='Segment means of change series [0.025, 0.0, 0.0, 0.0] are below segment thresholds: [1.1, 0.3, 0.1, 0.01]'), change_series=[1, 1, 1, '...', 0, 0, 0], unique_set={PAM:session_open, PAM:session_close, PAM:accounting, PAM:setcred}, RLE=[(True, 3), (False, 1), (True, 1), (False, 637)])\n",
+ " \tvar_5: StabilityTracker(classification=Classification(type='STABLE', reason='Segment means of change series [0.0125, 0.0, 0.0, 0.0] are below segment thresholds: [1.1, 0.3, 0.1, 0.01]'), change_series=[1, 0, 0, '...', 0, 0, 0], unique_set={\"root\", \"jhall\"}, RLE=[(True, 1), (False, 129), (True, 1), (False, 511)])\n",
+ " \tvar_6: StabilityTracker(classification=Classification(type='STABLE', reason='Segment means of change series [0.0125, 0.006211180124223602, 0.0, 0.0] are below segment thresholds: [1.1, 0.3, 0.1, 0.01]'), change_series=[1, 0, 0, '...', 0, 0, 0], unique_set={\"/usr/sbin/sshd\", \"/usr/sbin/cron\", \"/lib/systemd/systemd\"}, RLE=[(True, 1), (False, 129), (True, 1), (False, 48), (True, 1), (False, 462)])\n",
+ " \tvar_7: StabilityTracker(classification=Classification(type='STABLE', reason='Segment means of change series [0.0125, 0.0, 0.0, 0.0] are below segment thresholds: [1.1, 0.3, 0.1, 0.01]'), change_series=[1, 0, 0, '...', 0, 0, 0], unique_set={172.19.131.174, ?}, RLE=[(True, 1), (False, 129), (True, 1), (False, 511)])\n",
+ " \tvar_8: StabilityTracker(classification=Classification(type='STABLE', reason='Segment means of change series [0.0125, 0.0, 0.0, 0.0] are below segment thresholds: [1.1, 0.3, 0.1, 0.01]'), change_series=[1, 0, 0, '...', 0, 0, 0], unique_set={172.19.131.174, ?}, RLE=[(True, 1), (False, 129), (True, 1), (False, 511)])\n",
+ " \tvar_9: StabilityTracker(classification=Classification(type='STABLE', reason='Segment means of change series [0.0125, 0.006211180124223602, 0.0, 0.0] are below segment thresholds: [1.1, 0.3, 0.1, 0.01]'), change_series=[1, 0, 0, '...', 0, 0, 0], unique_set={ssh, cron, ?}, RLE=[(True, 1), (False, 129), (True, 1), (False, 48), (True, 1), (False, 462)])\n",
+ " \tvar_10: StabilityTracker(classification=Classification(type='STATIC', reason='Unique set size is 1'), change_series=[1, 0, 0, '...', 0, 0, 0], unique_set={success}, RLE=[(True, 1), (False, 641)])\n",
+ " },\n",
+ " 2: EventVariableTracker(data={\n",
+ " \tTime: StabilityTracker(classification=Classification(type='RANDOM', reason='Unique set size equals number of samples (129)'), change_series=[1, 1, 1, '...', 1, 1, 1], unique_set={(1642774000.779:760), (1642748941.234:583), (1642820941.151:1058), ..., (1642819141.100:1050), (1642763341.623:689), (1642749421.260:591)}, RLE=[(True, 129)])\n",
+ " \tType: StabilityTracker(classification=Classification(type='STATIC', reason='Unique set size is 1'), change_series=[1, 0, 0, '...', 0, 0, 0], unique_set={LOGIN}, RLE=[(True, 1), (False, 128)])\n",
+ " \tvar_0: StabilityTracker(classification=Classification(type='RANDOM', reason='Unique set size equals number of samples (129)'), change_series=[1, 1, 1, '...', 1, 1, 1], unique_set={16092, 14930, 15774, ..., 10883, 13737, 19301}, RLE=[(True, 129)])\n",
+ " \tvar_1: StabilityTracker(classification=Classification(type='STATIC', reason='Unique set size is 1'), change_series=[1, 0, 0, '...', 0, 0, 0], unique_set={0}, RLE=[(True, 1), (False, 128)])\n",
+ " \tvar_2: StabilityTracker(classification=Classification(type='STATIC', reason='Unique set size is 1'), change_series=[1, 0, 0, '...', 0, 0, 0], unique_set={4294967295}, RLE=[(True, 1), (False, 128)])\n",
+ " \tvar_3: StabilityTracker(classification=Classification(type='STABLE', reason='Segment means of change series [0.03125, 0.03125, 0.0, 0.0] are below segment thresholds: [1.1, 0.3, 0.1, 0.01]'), change_series=[1, 0, 0, '...', 0, 0, 0], unique_set={1002, 0}, RLE=[(True, 1), (False, 34), (True, 1), (False, 93)])\n",
+ " \tvar_4: StabilityTracker(classification=Classification(type='STATIC', reason='Unique set size is 1'), change_series=[1, 0, 0, '...', 0, 0, 0], unique_set={(none)}, RLE=[(True, 1), (False, 128)])\n",
+ " \tvar_5: StabilityTracker(classification=Classification(type='STATIC', reason='Unique set size is 1'), change_series=[1, 0, 0, '...', 0, 0, 0], unique_set={4294967295}, RLE=[(True, 1), (False, 128)])\n",
+ " \tvar_6: StabilityTracker(classification=Classification(type='RANDOM', reason='Unique set size equals number of samples (129)'), change_series=[1, 1, 1, '...', 1, 1, 1], unique_set={120, 110, 117, ..., 183, 133, 162}, RLE=[(True, 129)])\n",
+ " \tvar_7: StabilityTracker(classification=Classification(type='STATIC', reason='Unique set size is 1'), change_series=[1, 0, 0, '...', 0, 0, 0], unique_set={1}, RLE=[(True, 1), (False, 128)])\n",
+ " },\n",
+ " 1: EventVariableTracker(data={\n",
+ " \tTime: StabilityTracker(classification=Classification(type='RANDOM', reason='Unique set size equals number of samples (215)'), change_series=[1, 1, 1, '...', 1, 1, 1], unique_set={(1642790352.631:877), (1642764532.512:701), (1642746730.000:557), ..., (1642846152.628:1232), (1642761573.588:680), (1642726457.339:397)}, RLE=[(True, 215)])\n",
+ " \tType: StabilityTracker(classification=Classification(type='STABLE', reason='Segment means of change series [0.03773584905660377, 0.0, 0.0, 0.0] are below segment thresholds: [1.1, 0.3, 0.1, 0.01]'), change_series=[1, 1, 0, '...', 0, 0, 0], unique_set={SERVICE_START, SERVICE_STOP}, RLE=[(True, 2), (False, 213)])\n",
+ " \tvar_0: StabilityTracker(classification=Classification(type='STATIC', reason='Unique set size is 1'), change_series=[1, 0, 0, '...', 0, 0, 0], unique_set={1}, RLE=[(True, 1), (False, 214)])\n",
+ " \tvar_1: StabilityTracker(classification=Classification(type='STATIC', reason='Unique set size is 1'), change_series=[1, 0, 0, '...', 0, 0, 0], unique_set={0}, RLE=[(True, 1), (False, 214)])\n",
+ " \tvar_2: StabilityTracker(classification=Classification(type='STATIC', reason='Unique set size is 1'), change_series=[1, 0, 0, '...', 0, 0, 0], unique_set={4294967295}, RLE=[(True, 1), (False, 214)])\n",
+ " \tvar_3: StabilityTracker(classification=Classification(type='STATIC', reason='Unique set size is 1'), change_series=[1, 0, 0, '...', 0, 0, 0], unique_set={4294967295}, RLE=[(True, 1), (False, 214)])\n",
+ " \tvar_4: StabilityTracker(classification=Classification(type='STABLE', reason='Segment means of change series [0.20754716981132076, 0.09259259259259259, 0.0, 0.0] are below segment thresholds: [1.1, 0.3, 0.1, 0.01]'), change_series=[1, 0, 0, '...', 0, 0, 0], unique_set={systemd-udevd, systemd-journald, systemd-tmpfiles-clean, ..., apt-daily-upgrade, user@1002, systemd-timesyncd}, RLE=[(True, 1), (False, 3), (True, 1), '...', (False, 17), (True, 1), (False, 128)])\n",
+ " \tvar_5: StabilityTracker(classification=Classification(type='STATIC', reason='Unique set size is 1'), change_series=[1, 0, 0, '...', 0, 0, 0], unique_set={\"systemd\"}, RLE=[(True, 1), (False, 214)])\n",
+ " \tvar_6: StabilityTracker(classification=Classification(type='STATIC', reason='Unique set size is 1'), change_series=[1, 0, 0, '...', 0, 0, 0], unique_set={\"/lib/systemd/systemd\"}, RLE=[(True, 1), (False, 214)])\n",
+ " \tvar_7: StabilityTracker(classification=Classification(type='STATIC', reason='Unique set size is 1'), change_series=[1, 0, 0, '...', 0, 0, 0], unique_set={?}, RLE=[(True, 1), (False, 214)])\n",
+ " \tvar_8: StabilityTracker(classification=Classification(type='STATIC', reason='Unique set size is 1'), change_series=[1, 0, 0, '...', 0, 0, 0], unique_set={?}, RLE=[(True, 1), (False, 214)])\n",
+ " \tvar_9: StabilityTracker(classification=Classification(type='STATIC', reason='Unique set size is 1'), change_series=[1, 0, 0, '...', 0, 0, 0], unique_set={?}, RLE=[(True, 1), (False, 214)])\n",
+ " \tvar_10: StabilityTracker(classification=Classification(type='STATIC', reason='Unique set size is 1'), change_series=[1, 0, 0, '...', 0, 0, 0], unique_set={success}, RLE=[(True, 1), (False, 214)])\n",
+ " },\n",
+ " 3: EventVariableTracker(data={\n",
+ " \tTime: StabilityTracker(classification=Classification(type='RANDOM', reason='Unique set size equals number of samples (4)'), change_series=[1, 1, 1, 1], unique_set={(1642746582.133:531), (1642746582.137:532), (1642746582.133:530), (1642746582.129:529)}, RLE=[(True, 4)])\n",
+ " \tType: StabilityTracker(classification=Classification(type='STATIC', reason='Unique set size is 1'), change_series=[1, 0, 0, 0], unique_set={AVC}, RLE=[(True, 1), (False, 3)])\n",
+ " \tvar_0: StabilityTracker(classification=Classification(type='STATIC', reason='Unique set size is 1'), change_series=[1, 0, 0, 0], unique_set={\"STATUS\"}, RLE=[(True, 1), (False, 3)])\n",
+ " \tvar_1: StabilityTracker(classification=Classification(type='STATIC', reason='Unique set size is 1'), change_series=[1, 0, 0, 0], unique_set={\"profile_replace\"}, RLE=[(True, 1), (False, 3)])\n",
+ " \tvar_2: StabilityTracker(classification=Classification(type='STATIC', reason='Unique set size is 1'), change_series=[1, 0, 0, 0], unique_set={\"same as current profile, skipping\"}, RLE=[(True, 1), (False, 3)])\n",
+ " \tvar_3: StabilityTracker(classification=Classification(type='STATIC', reason='Unique set size is 1'), change_series=[1, 0, 0, 0], unique_set={\"unconfined\"}, RLE=[(True, 1), (False, 3)])\n",
+ " \tvar_4: StabilityTracker(classification=Classification(type='RANDOM', reason='Unique set size equals number of samples (4)'), change_series=[1, 1, 1, 1], unique_set={\"/usr/lib/connman/scripts/dhclient-script\", \"/usr/lib/NetworkManager/nm-dhcp-helper\", \"/usr/lib/NetworkManager/nm-dhcp-client.action\", \"/sbin/dhclient\"}, RLE=[(True, 4)])\n",
+ " \tvar_5: StabilityTracker(classification=Classification(type='STATIC', reason='Unique set size is 1'), change_series=[1, 0, 0, 0], unique_set={23662}, RLE=[(True, 1), (False, 3)])\n",
+ " \tvar_6: StabilityTracker(classification=Classification(type='STATIC', reason='Unique set size is 1'), change_series=[1, 0, 0, 0], unique_set={\"apparmor_parser\"}, RLE=[(True, 1), (False, 3)])\n",
+ " },\n",
+ " 4: EventVariableTracker(data={\n",
+ " \tTime: StabilityTracker(classification=Classification(type='RANDOM', reason='Unique set size equals number of samples (4)'), change_series=[1, 1, 1, 1], unique_set={(1642746582.133:531), (1642746582.137:532), (1642746582.133:530), (1642746582.129:529)}, RLE=[(True, 4)])\n",
+ " \tType: StabilityTracker(classification=Classification(type='STATIC', reason='Unique set size is 1'), change_series=[1, 0, 0, 0], unique_set={SYSCALL}, RLE=[(True, 1), (False, 3)])\n",
+ " \tvar_0: StabilityTracker(classification=Classification(type='STATIC', reason='Unique set size is 1'), change_series=[1, 0, 0, 0], unique_set={c000003e}, RLE=[(True, 1), (False, 3)])\n",
+ " \tvar_1: StabilityTracker(classification=Classification(type='STATIC', reason='Unique set size is 1'), change_series=[1, 0, 0, 0], unique_set={1}, RLE=[(True, 1), (False, 3)])\n",
+ " \tvar_2: StabilityTracker(classification=Classification(type='STATIC', reason='Unique set size is 1'), change_series=[1, 0, 0, 0], unique_set={yes}, RLE=[(True, 1), (False, 3)])\n",
+ " \tvar_3: StabilityTracker(classification=Classification(type='RANDOM', reason='Unique set size equals number of samples (4)'), change_series=[1, 1, 1, 1], unique_set={25025, 24993, 44953, 23625}, RLE=[(True, 4)])\n",
+ " \tvar_4: StabilityTracker(classification=Classification(type='STATIC', reason='Unique set size is 1'), change_series=[1, 0, 0, 0], unique_set={6}, RLE=[(True, 1), (False, 3)])\n",
+ " \tvar_5: StabilityTracker(classification=Classification(type='UNSTABLE', reason='No classification matched; variable is unstable'), change_series=[1, 1, 0, 0], unique_set={55688bbce110, 55688bbf9b10}, RLE=[(True, 2), (False, 2)])\n",
+ " \tvar_6: StabilityTracker(classification=Classification(type='RANDOM', reason='Unique set size equals number of samples (4)'), change_series=[1, 1, 1, 1], unique_set={5c49, 61a1, af99, 61c1}, RLE=[(True, 4)])\n",
+ " \tvar_7: StabilityTracker(classification=Classification(type='STATIC', reason='Unique set size is 1'), change_series=[1, 0, 0, 0], unique_set={0}, RLE=[(True, 1), (False, 3)])\n",
+ " \tvar_8: StabilityTracker(classification=Classification(type='STATIC', reason='Unique set size is 1'), change_series=[1, 0, 0, 0], unique_set={0}, RLE=[(True, 1), (False, 3)])\n",
+ " \tvar_9: StabilityTracker(classification=Classification(type='STATIC', reason='Unique set size is 1'), change_series=[1, 0, 0, 0], unique_set={23661}, RLE=[(True, 1), (False, 3)])\n",
+ " \tvar_10: StabilityTracker(classification=Classification(type='STATIC', reason='Unique set size is 1'), change_series=[1, 0, 0, 0], unique_set={23662}, RLE=[(True, 1), (False, 3)])\n",
+ " \tvar_11: StabilityTracker(classification=Classification(type='STATIC', reason='Unique set size is 1'), change_series=[1, 0, 0, 0], unique_set={4294967295}, RLE=[(True, 1), (False, 3)])\n",
+ " \tvar_12: StabilityTracker(classification=Classification(type='STATIC', reason='Unique set size is 1'), change_series=[1, 0, 0, 0], unique_set={0}, RLE=[(True, 1), (False, 3)])\n",
+ " \tvar_13: StabilityTracker(classification=Classification(type='STATIC', reason='Unique set size is 1'), change_series=[1, 0, 0, 0], unique_set={0}, RLE=[(True, 1), (False, 3)])\n",
+ " \tvar_14: StabilityTracker(classification=Classification(type='STATIC', reason='Unique set size is 1'), change_series=[1, 0, 0, 0], unique_set={0}, RLE=[(True, 1), (False, 3)])\n",
+ " \tvar_15: StabilityTracker(classification=Classification(type='STATIC', reason='Unique set size is 1'), change_series=[1, 0, 0, 0], unique_set={0}, RLE=[(True, 1), (False, 3)])\n",
+ " \tvar_16: StabilityTracker(classification=Classification(type='STATIC', reason='Unique set size is 1'), change_series=[1, 0, 0, 0], unique_set={0}, RLE=[(True, 1), (False, 3)])\n",
+ " \tvar_17: StabilityTracker(classification=Classification(type='STATIC', reason='Unique set size is 1'), change_series=[1, 0, 0, 0], unique_set={0}, RLE=[(True, 1), (False, 3)])\n",
+ " \tvar_18: StabilityTracker(classification=Classification(type='STATIC', reason='Unique set size is 1'), change_series=[1, 0, 0, 0], unique_set={0}, RLE=[(True, 1), (False, 3)])\n",
+ " \tvar_19: StabilityTracker(classification=Classification(type='STATIC', reason='Unique set size is 1'), change_series=[1, 0, 0, 0], unique_set={0}, RLE=[(True, 1), (False, 3)])\n",
+ " \tvar_20: StabilityTracker(classification=Classification(type='STATIC', reason='Unique set size is 1'), change_series=[1, 0, 0, 0], unique_set={pts1}, RLE=[(True, 1), (False, 3)])\n",
+ " \tvar_21: StabilityTracker(classification=Classification(type='STATIC', reason='Unique set size is 1'), change_series=[1, 0, 0, 0], unique_set={4294967295}, RLE=[(True, 1), (False, 3)])\n",
+ " \tvar_22: StabilityTracker(classification=Classification(type='STATIC', reason='Unique set size is 1'), change_series=[1, 0, 0, 0], unique_set={\"apparmor_parser\"}, RLE=[(True, 1), (False, 3)])\n",
+ " \tvar_23: StabilityTracker(classification=Classification(type='STATIC', reason='Unique set size is 1'), change_series=[1, 0, 0, 0], unique_set={\"/sbin/apparmor_parser\"}, RLE=[(True, 1), (False, 3)])\n",
+ " \tvar_24: StabilityTracker(classification=Classification(type='STATIC', reason='Unique set size is 1'), change_series=[1, 0, 0, 0], unique_set={(null)}, RLE=[(True, 1), (False, 3)])\n",
+ " },\n",
+ " 5: EventVariableTracker(data={\n",
+ " \tTime: StabilityTracker(classification=Classification(type='RANDOM', reason='Unique set size equals number of samples (4)'), change_series=[1, 1, 1, 1], unique_set={(1642746582.133:531), (1642746582.137:532), (1642746582.133:530), (1642746582.129:529)}, RLE=[(True, 4)])\n",
+ " \tType: StabilityTracker(classification=Classification(type='STATIC', reason='Unique set size is 1'), change_series=[1, 0, 0, 0], unique_set={PROCTITLE}, RLE=[(True, 1), (False, 3)])\n",
+ " \tvar_0: StabilityTracker(classification=Classification(type='STATIC', reason='Unique set size is 1'), change_series=[1, 0, 0, 0], unique_set={61707061726D6F725F706172736572002D72002D54002D57002F6574632F61707061726D6F722E642F7362696E2E6468636C69656E74}, RLE=[(True, 1), (False, 3)])\n",
+ " },\n",
+ " 6: EventVariableTracker(data={\n",
+ " \tTime: StabilityTracker(classification=Classification(type='INSUFFICIENT_DATA', reason='Not enough data (have 2, need 3)'), change_series=[1, 1], unique_set={(1642774001.811:765), (1642761574.840:683)}, RLE=[(True, 2)])\n",
+ " \tType: StabilityTracker(classification=Classification(type='INSUFFICIENT_DATA', reason='Not enough data (have 2, need 3)'), change_series=[1, 0], unique_set={USER_LOGIN}, RLE=[(True, 1), (False, 1)])\n",
+ " \tvar_0: StabilityTracker(classification=Classification(type='INSUFFICIENT_DATA', reason='Not enough data (have 2, need 3)'), change_series=[1, 1], unique_set={15014, 14362}, RLE=[(True, 2)])\n",
+ " \tvar_1: StabilityTracker(classification=Classification(type='INSUFFICIENT_DATA', reason='Not enough data (have 2, need 3)'), change_series=[1, 0], unique_set={0}, RLE=[(True, 1), (False, 1)])\n",
+ " \tvar_2: StabilityTracker(classification=Classification(type='INSUFFICIENT_DATA', reason='Not enough data (have 2, need 3)'), change_series=[1, 0], unique_set={1002}, RLE=[(True, 1), (False, 1)])\n",
+ " \tvar_3: StabilityTracker(classification=Classification(type='INSUFFICIENT_DATA', reason='Not enough data (have 2, need 3)'), change_series=[1, 1], unique_set={100, 111}, RLE=[(True, 2)])\n",
+ " \tvar_4: StabilityTracker(classification=Classification(type='INSUFFICIENT_DATA', reason='Not enough data (have 2, need 3)'), change_series=[1, 0], unique_set={login}, RLE=[(True, 1), (False, 1)])\n",
+ " \tvar_5: StabilityTracker(classification=Classification(type='INSUFFICIENT_DATA', reason='Not enough data (have 2, need 3)'), change_series=[1, 0], unique_set={1002}, RLE=[(True, 1), (False, 1)])\n",
+ " \tvar_6: StabilityTracker(classification=Classification(type='INSUFFICIENT_DATA', reason='Not enough data (have 2, need 3)'), change_series=[1, 0], unique_set={\"/usr/sbin/sshd\"}, RLE=[(True, 1), (False, 1)])\n",
+ " \tvar_7: StabilityTracker(classification=Classification(type='INSUFFICIENT_DATA', reason='Not enough data (have 2, need 3)'), change_series=[1, 0], unique_set={172.19.131.174}, RLE=[(True, 1), (False, 1)])\n",
+ " \tvar_8: StabilityTracker(classification=Classification(type='INSUFFICIENT_DATA', reason='Not enough data (have 2, need 3)'), change_series=[1, 0], unique_set={172.19.131.174}, RLE=[(True, 1), (False, 1)])\n",
+ " \tvar_9: StabilityTracker(classification=Classification(type='INSUFFICIENT_DATA', reason='Not enough data (have 2, need 3)'), change_series=[1, 0], unique_set={/dev/pts/0}, RLE=[(True, 1), (False, 1)])\n",
+ " \tvar_10: StabilityTracker(classification=Classification(type='INSUFFICIENT_DATA', reason='Not enough data (have 2, need 3)'), change_series=[1, 0], unique_set={success}, RLE=[(True, 1), (False, 1)])\n",
+ " }}"
+ ]
+ },
+ "execution_count": 5,
+ "metadata": {},
+ "output_type": "execute_result"
+ }
+ ],
+ "source": [
+ "persistency1.get_events_data()"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "id": "6e70d2b3",
+ "metadata": {},
+ "outputs": [
+ {
+ "data": {
+ "text/html": [
+ "
\n",
+ "\n",
+ "
\n",
+ " \n",
+ " \n",
+ " | \n",
+ " Time | \n",
+ " Type | \n",
+ " var_0 | \n",
+ " var_1 | \n",
+ " var_2 | \n",
+ " var_3 | \n",
+ " var_4 | \n",
+ " var_5 | \n",
+ " var_6 | \n",
+ " var_7 | \n",
+ " var_8 | \n",
+ " var_9 | \n",
+ " var_10 | \n",
+ "
\n",
+ " \n",
+ " \n",
+ " \n",
+ " | 0 | \n",
+ " (1642723741.072:375) | \n",
+ " USER_ACCT | \n",
+ " 10125 | \n",
+ " 0 | \n",
+ " 4294967295 | \n",
+ " 4294967295 | \n",
+ " PAM:accounting | \n",
+ " \"root\" | \n",
+ " \"/usr/sbin/cron\" | \n",
+ " ? | \n",
+ " ? | \n",
+ " cron | \n",
+ " success | \n",
+ "
\n",
+ " \n",
+ " | 1 | \n",
+ " (1642723741.072:376) | \n",
+ " CRED_ACQ | \n",
+ " 10125 | \n",
+ " 0 | \n",
+ " 4294967295 | \n",
+ " 4294967295 | \n",
+ " PAM:setcred | \n",
+ " \"root\" | \n",
+ " \"/usr/sbin/cron\" | \n",
+ " ? | \n",
+ " ? | \n",
+ " cron | \n",
+ " success | \n",
+ "
\n",
+ " \n",
+ " | 2 | \n",
+ " (1642723741.080:378) | \n",
+ " USER_START | \n",
+ " 10125 | \n",
+ " 0 | \n",
+ " 0 | \n",
+ " 65 | \n",
+ " PAM:session_open | \n",
+ " \"root\" | \n",
+ " \"/usr/sbin/cron\" | \n",
+ " ? | \n",
+ " ? | \n",
+ " cron | \n",
+ " success | \n",
+ "
\n",
+ " \n",
+ " | 3 | \n",
+ " (1642723741.084:379) | \n",
+ " CRED_DISP | \n",
+ " 10125 | \n",
+ " 0 | \n",
+ " 0 | \n",
+ " 65 | \n",
+ " PAM:setcred | \n",
+ " \"root\" | \n",
+ " \"/usr/sbin/cron\" | \n",
+ " ? | \n",
+ " ? | \n",
+ " cron | \n",
+ " success | \n",
+ "
\n",
+ " \n",
+ " | 4 | \n",
+ " (1642723741.084:380) | \n",
+ " USER_END | \n",
+ " 10125 | \n",
+ " 0 | \n",
+ " 0 | \n",
+ " 65 | \n",
+ " PAM:session_close | \n",
+ " \"root\" | \n",
+ " \"/usr/sbin/cron\" | \n",
+ " ? | \n",
+ " ? | \n",
+ " cron | \n",
+ " success | \n",
+ "
\n",
+ " \n",
+ " | ... | \n",
+ " ... | \n",
+ " ... | \n",
+ " ... | \n",
+ " ... | \n",
+ " ... | \n",
+ " ... | \n",
+ " ... | \n",
+ " ... | \n",
+ " ... | \n",
+ " ... | \n",
+ " ... | \n",
+ " ... | \n",
+ " ... | \n",
+ "
\n",
+ " \n",
+ " | 637 | \n",
+ " (1642865941.046:1359) | \n",
+ " USER_END | \n",
+ " 20191 | \n",
+ " 0 | \n",
+ " 0 | \n",
+ " 192 | \n",
+ " PAM:session_close | \n",
+ " \"root\" | \n",
+ " \"/usr/sbin/cron\" | \n",
+ " ? | \n",
+ " ? | \n",
+ " cron | \n",
+ " success | \n",
+ "
\n",
+ " \n",
+ " | 638 | \n",
+ " (1642867741.073:1362) | \n",
+ " USER_ACCT | \n",
+ " 20264 | \n",
+ " 0 | \n",
+ " 4294967295 | \n",
+ " 4294967295 | \n",
+ " PAM:accounting | \n",
+ " \"root\" | \n",
+ " \"/usr/sbin/cron\" | \n",
+ " ? | \n",
+ " ? | \n",
+ " cron | \n",
+ " success | \n",
+ "
\n",
+ " \n",
+ " | 639 | \n",
+ " (1642867741.077:1363) | \n",
+ " CRED_ACQ | \n",
+ " 20264 | \n",
+ " 0 | \n",
+ " 4294967295 | \n",
+ " 4294967295 | \n",
+ " PAM:setcred | \n",
+ " \"root\" | \n",
+ " \"/usr/sbin/cron\" | \n",
+ " ? | \n",
+ " ? | \n",
+ " cron | \n",
+ " success | \n",
+ "
\n",
+ " \n",
+ " | 640 | \n",
+ " (1642867741.081:1365) | \n",
+ " USER_START | \n",
+ " 20264 | \n",
+ " 0 | \n",
+ " 0 | \n",
+ " 193 | \n",
+ " PAM:session_open | \n",
+ " \"root\" | \n",
+ " \"/usr/sbin/cron\" | \n",
+ " ? | \n",
+ " ? | \n",
+ " cron | \n",
+ " success | \n",
+ "
\n",
+ " \n",
+ " | 641 | \n",
+ " (1642867741.085:1366) | \n",
+ " CRED_DISP | \n",
+ " 20264 | \n",
+ " 0 | \n",
+ " 0 | \n",
+ " 193 | \n",
+ " PAM:setcred | \n",
+ " \"root\" | \n",
+ " \"/usr/sbin/cron\" | \n",
+ " ? | \n",
+ " ? | \n",
+ " cron | \n",
+ " success | \n",
+ "
\n",
+ " \n",
+ "
\n",
+ "
642 rows × 13 columns
\n",
+ "
"
+ ],
+ "text/plain": [
+ " Time Type var_0 var_1 var_2 var_3 \\\n",
+ "0 (1642723741.072:375) USER_ACCT 10125 0 4294967295 4294967295 \n",
+ "1 (1642723741.072:376) CRED_ACQ 10125 0 4294967295 4294967295 \n",
+ "2 (1642723741.080:378) USER_START 10125 0 0 65 \n",
+ "3 (1642723741.084:379) CRED_DISP 10125 0 0 65 \n",
+ "4 (1642723741.084:380) USER_END 10125 0 0 65 \n",
+ ".. ... ... ... ... ... ... \n",
+ "637 (1642865941.046:1359) USER_END 20191 0 0 192 \n",
+ "638 (1642867741.073:1362) USER_ACCT 20264 0 4294967295 4294967295 \n",
+ "639 (1642867741.077:1363) CRED_ACQ 20264 0 4294967295 4294967295 \n",
+ "640 (1642867741.081:1365) USER_START 20264 0 0 193 \n",
+ "641 (1642867741.085:1366) CRED_DISP 20264 0 0 193 \n",
+ "\n",
+ " var_4 var_5 var_6 var_7 var_8 var_9 var_10 \n",
+ "0 PAM:accounting \"root\" \"/usr/sbin/cron\" ? ? cron success \n",
+ "1 PAM:setcred \"root\" \"/usr/sbin/cron\" ? ? cron success \n",
+ "2 PAM:session_open \"root\" \"/usr/sbin/cron\" ? ? cron success \n",
+ "3 PAM:setcred \"root\" \"/usr/sbin/cron\" ? ? cron success \n",
+ "4 PAM:session_close \"root\" \"/usr/sbin/cron\" ? ? cron success \n",
+ ".. ... ... ... ... ... ... ... \n",
+ "637 PAM:session_close \"root\" \"/usr/sbin/cron\" ? ? cron success \n",
+ "638 PAM:accounting \"root\" \"/usr/sbin/cron\" ? ? cron success \n",
+ "639 PAM:setcred \"root\" \"/usr/sbin/cron\" ? ? cron success \n",
+ "640 PAM:session_open \"root\" \"/usr/sbin/cron\" ? ? cron success \n",
+ "641 PAM:setcred \"root\" \"/usr/sbin/cron\" ? ? cron success \n",
+ "\n",
+ "[642 rows x 13 columns]"
+ ]
+ },
+ "execution_count": 9,
+ "metadata": {},
+ "output_type": "execute_result"
+ }
+ ],
+ "source": [
+ "persistency2.get_event_data(0)"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "id": "8580b0d3",
+ "metadata": {},
+ "outputs": [
+ {
+ "data": {
+ "text/plain": [
+ "{0: ChunkedEventDataFrame(df=..., rows=642, chunks=642, variables=['Time', 'Type', 'var_0', 'var_1', 'var_2', 'var_3', 'var_4', 'var_5', 'var_6', 'var_7', 'var_8', 'var_9', 'var_10']),\n",
+ " 2: ChunkedEventDataFrame(df=..., rows=129, chunks=129, variables=['Time', 'Type', 'var_0', 'var_1', 'var_2', 'var_3', 'var_4', 'var_5', 'var_6', 'var_7']),\n",
+ " 1: ChunkedEventDataFrame(df=..., rows=215, chunks=215, variables=['Time', 'Type', 'var_0', 'var_1', 'var_2', 'var_3', 'var_4', 'var_5', 'var_6', 'var_7', 'var_8', 'var_9', 'var_10']),\n",
+ " 3: ChunkedEventDataFrame(df=..., rows=4, chunks=4, variables=['Time', 'Type', 'var_0', 'var_1', 'var_2', 'var_3', 'var_4', 'var_5', 'var_6']),\n",
+ " 4: ChunkedEventDataFrame(df=..., rows=4, chunks=4, variables=['Time', 'Type', 'var_0', 'var_1', 'var_2', 'var_3', 'var_4', 'var_5', 'var_6', 'var_7', 'var_8', 'var_9', 'var_10', 'var_11', 'var_12', 'var_13', 'var_14', 'var_15', 'var_16', 'var_17', 'var_18', 'var_19', 'var_20', 'var_21', 'var_22', 'var_23', 'var_24']),\n",
+ " 5: ChunkedEventDataFrame(df=..., rows=4, chunks=4, variables=['Time', 'Type', 'var_0']),\n",
+ " 6: ChunkedEventDataFrame(df=..., rows=2, chunks=2, variables=['Time', 'Type', 'var_0', 'var_1', 'var_2', 'var_3', 'var_4', 'var_5', 'var_6', 'var_7', 'var_8', 'var_9', 'var_10'])}"
+ ]
+ },
+ "execution_count": 10,
+ "metadata": {},
+ "output_type": "execute_result"
+ }
+ ],
+ "source": [
+ "persistency3.get_events_data()"
+ ]
+ }
+ ],
+ "metadata": {
+ "kernelspec": {
+ "display_name": "detectmatelibrary (3.12.3)",
+ "language": "python",
+ "name": "python3"
+ },
+ "language_info": {
+ "codemirror_mode": {
+ "name": "ipython",
+ "version": 3
+ },
+ "file_extension": ".py",
+ "mimetype": "text/x-python",
+ "name": "python",
+ "nbconvert_exporter": "python",
+ "pygments_lexer": "ipython3",
+ "version": "3.12.3"
+ }
+ },
+ "nbformat": 4,
+ "nbformat_minor": 5
+}
diff --git a/notebooks/reader_parser_detector.ipynb b/notebooks/reader_parser_detector.ipynb
new file mode 100644
index 0000000..8e36449
--- /dev/null
+++ b/notebooks/reader_parser_detector.ipynb
@@ -0,0 +1,154 @@
+{
+ "cells": [
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "id": "54215189",
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "import os\n",
+ "os.chdir(\"../\")\n",
+ "\n",
+ "from detectmatelibrary.detectors.new_value_combo_detector import NewValueComboDetector, schemas\n",
+ "from detectmatelibrary.parsers.template_matcher import MatcherParser\n",
+ "from detectmatelibrary.readers.log_file import LogFileReader\n",
+ "\n",
+ "from detectmatelibrary.common.persistency.event_data_structures.trackers import (\n",
+ " EventVariableTracker, StabilityTracker\n",
+ ")\n",
+ "from detectmatelibrary.common.persistency.event_data_structures.dataframes import (\n",
+ " EventDataFrame, ChunkedEventDataFrame\n",
+ ")\n",
+ "from detectmatelibrary.common.persistency.event_persistency import EventPersistency\n",
+ "\n",
+ "import logging\n",
+ "logging.getLogger().setLevel(logging.ERROR) # Only show errors"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 8,
+ "id": "4cd73416",
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "import yaml\n",
+ "\n",
+ "\n",
+ "with open(\"config/pipeline_config_default.yaml\", 'r') as f:\n",
+ " config = yaml.safe_load(f)"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 9,
+ "id": "54ae5e78",
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "reader = LogFileReader(config=config)\n",
+ "parser = MatcherParser(config=config)\n",
+ "detector = NewValueComboDetector(config=config)\n",
+ "\n",
+ "output = schemas.DetectorSchema()"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 10,
+ "id": "f23c07d8",
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "reader.reset()\n",
+ "for i in range(1000):\n",
+ " log = reader.process(as_bytes=False)\n",
+ " parsed_log = parser.process(log)\n",
+ " detector.configure(parsed_log)\n",
+ "\n",
+ "detector.set_configuration()"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 11,
+ "id": "ddc59204",
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "reader.reset()\n",
+ "for i in range(1000):\n",
+ " log = reader.process(as_bytes=False)\n",
+ " parsed_log = parser.process(log)\n",
+ " detector.train(parsed_log)"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 12,
+ "id": "651e272e",
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "predictions = {}\n",
+ "\n",
+ "for i in range(1000):\n",
+ " log = reader.process(as_bytes=False)\n",
+ " parsed_log = parser.process(log)\n",
+ " prediction = detector.detect(parsed_log, output_=output)\n",
+ " predictions[i] = prediction\n"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 13,
+ "id": "51daa602",
+ "metadata": {},
+ "outputs": [
+ {
+ "data": {
+ "text/plain": [
+ "[(859, True),\n",
+ " (860, True),\n",
+ " (861, True),\n",
+ " (862, True),\n",
+ " (864, True),\n",
+ " (865, True),\n",
+ " (866, True),\n",
+ " (867, True)]"
+ ]
+ },
+ "execution_count": 13,
+ "metadata": {},
+ "output_type": "execute_result"
+ }
+ ],
+ "source": [
+ "# get true predictions\n",
+ "[(i, is_anomaly) for i, is_anomaly in predictions.items() if is_anomaly]"
+ ]
+ }
+ ],
+ "metadata": {
+ "kernelspec": {
+ "display_name": "detectmatelibrary (3.12.3)",
+ "language": "python",
+ "name": "python3"
+ },
+ "language_info": {
+ "codemirror_mode": {
+ "name": "ipython",
+ "version": 3
+ },
+ "file_extension": ".py",
+ "mimetype": "text/x-python",
+ "name": "python",
+ "nbconvert_exporter": "python",
+ "pygments_lexer": "ipython3",
+ "version": "3.12.3"
+ }
+ },
+ "nbformat": 4,
+ "nbformat_minor": 5
+}
diff --git a/pyproject.toml b/pyproject.toml
index db6d0a8..e3c2c5c 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -6,12 +6,15 @@ readme = "README.md"
requires-python = ">=3.12"
dependencies = [
"drain3>=0.9.11",
+ "numpy>=2.3.2",
"pandas>=2.3.2",
+ "polars>=1.36.1",
"protobuf>=6.32.1",
"pydantic>=2.11.7",
"pyyaml>=6.0.3",
"regex>=2025.11.3",
"kafka-python>=2.3.0",
+ "ujson>=5.11.0",
]
[project.optional-dependencies]
diff --git a/src/detectmatelibrary/common/_config/__init__.py b/src/detectmatelibrary/common/_config/__init__.py
index eca49b2..5aa90ff 100644
--- a/src/detectmatelibrary/common/_config/__init__.py
+++ b/src/detectmatelibrary/common/_config/__init__.py
@@ -3,7 +3,7 @@
from pydantic import BaseModel, ConfigDict
from typing_extensions import Self
-from typing import Any, Dict
+from typing import Any, Dict, List, Optional
from copy import deepcopy
@@ -35,3 +35,61 @@ def from_dict(cls, data: Dict[str, Any], method_id: str) -> Self:
ConfigMethods.check_type(config_, method_type=aux.method_type)
return cls(**ConfigMethods.process(config_))
+
+
+def generate_detector_config(
+ variable_selection: Dict[int, List[str]],
+ templates: Dict[Any, str | None],
+ detector_name: str,
+ method_type: str,
+ base_config: Optional[Dict[str, Any]] = None,
+ **additional_params: Any,
+) -> Dict[str, Any]:
+ """Generate the configuration for detectors. Output is a dictionary.
+
+ Args:
+ variable_selection (Dict[int, List[str]]): Mapping of event IDs to variable names.
+ templates (Dict[Any, str | None]): Mapping of event IDs to their templates.
+ detector_name (str): Name of the detector.
+ method_type (str): Type of the detection method.
+ base_config (Optional[Dict[str, Any]]): Base configuration to build upon.
+ **additional_params: Additional parameters for the detector.
+ """
+
+ if base_config is None:
+ base_config = {
+ "detectors": {
+ detector_name: {
+ "method_type": method_type,
+ "auto_config": False,
+ "params": {
+ "log_variables": []
+ },
+ }
+ }
+ }
+ config = deepcopy(base_config)
+
+ detectors = config.setdefault("detectors", {})
+ detector = detectors.setdefault(detector_name, {})
+ detector.setdefault("method_type", method_type)
+ detector.setdefault("auto_config", False)
+ params = detector.setdefault("params", {})
+ params.update(additional_params)
+ log_variables = params.setdefault("log_variables", [])
+
+ for event_id, all_variables in variable_selection.items():
+ variables = [
+ {"pos": int(name.split("_")[1]), "name": name}
+ for name in all_variables if name.startswith("var_")
+ ]
+ header_variables = [{"pos": name} for name in all_variables if not name.startswith("var_")]
+
+ log_variables.append({
+ "id": f"id_{event_id}",
+ "event": event_id,
+ "template": templates.get(event_id, ""),
+ "variables": variables,
+ "header_variables": header_variables,
+ })
+ return config
diff --git a/src/detectmatelibrary/common/_config/_compile.py b/src/detectmatelibrary/common/_config/_compile.py
index c965c3b..8de129b 100644
--- a/src/detectmatelibrary/common/_config/_compile.py
+++ b/src/detectmatelibrary/common/_config/_compile.py
@@ -2,7 +2,8 @@
from detectmatelibrary.common._config._formats import apply_format
-from typing import Any, Dict
+from typing import Any, Dict, List, Optional
+from copy import deepcopy
import warnings
@@ -82,3 +83,61 @@ def process(config: Dict[str, Any]) -> Dict[str, Any]:
config.update(config["params"])
config.pop("params")
return config
+
+
+def generate_detector_config(
+ variable_selection: Dict[int, List[str]],
+ templates: Dict[Any, str | None],
+ detector_name: str,
+ method_type: str,
+ base_config: Optional[Dict[str, Any]] = None,
+ **additional_params: Any,
+) -> Dict[str, Any]:
+ """Generate the configuration for detectors. Output is a dictionary.
+
+ Args:
+ variable_selection (Dict[int, List[str]]): Mapping of event IDs to variable names.
+ templates (Dict[Any, str | None]): Mapping of event IDs to their templates.
+ detector_name (str): Name of the detector.
+ method_type (str): Type of the detection method.
+ base_config (Optional[Dict[str, Any]]): Base configuration to build upon.
+ **additional_params: Additional parameters for the detector.
+ """
+
+ if base_config is None:
+ base_config = {
+ "detectors": {
+ detector_name: {
+ "method_type": method_type,
+ "auto_config": False,
+ "params": {
+ "log_variables": []
+ },
+ }
+ }
+ }
+ config = deepcopy(base_config)
+
+ detectors = config.setdefault("detectors", {})
+ detector = detectors.setdefault(detector_name, {})
+ detector.setdefault("method_type", method_type)
+ detector.setdefault("auto_config", False)
+ params = detector.setdefault("params", {})
+ params.update(additional_params)
+ log_variables = params.setdefault("log_variables", [])
+
+ for event_id, all_variables in variable_selection.items():
+ variables = [
+ {"pos": int(name.split("_")[1]), "name": name}
+ for name in all_variables if name.startswith("var_")
+ ]
+ header_variables = [{"pos": name} for name in all_variables if not name.startswith("var_")]
+
+ log_variables.append({
+ "id": f"id_{event_id}",
+ "event": event_id,
+ "template": templates.get(event_id, ""),
+ "variables": variables,
+ "header_variables": header_variables,
+ })
+ return config
diff --git a/src/detectmatelibrary/common/persistency/__init__.py b/src/detectmatelibrary/common/persistency/__init__.py
new file mode 100644
index 0000000..5ea37cd
--- /dev/null
+++ b/src/detectmatelibrary/common/persistency/__init__.py
@@ -0,0 +1,5 @@
+from .event_persistency import EventPersistency
+
+__all__ = [
+ "EventPersistency"
+]
diff --git a/src/detectmatelibrary/common/persistency/event_data_structures/base.py b/src/detectmatelibrary/common/persistency/event_data_structures/base.py
new file mode 100644
index 0000000..86ac25e
--- /dev/null
+++ b/src/detectmatelibrary/common/persistency/event_data_structures/base.py
@@ -0,0 +1,32 @@
+from abc import ABC, abstractmethod
+from typing import Any, List
+from dataclasses import dataclass
+
+
+@dataclass
+class EventDataStructure(ABC):
+ """Storage backend interface for event-based data analysis."""
+
+ event_id: int = -1
+ template: str = ""
+
+ @abstractmethod
+ def add_data(self, data_object: Any) -> None: ...
+
+ @abstractmethod
+ def get_data(self) -> Any: ...
+
+ @abstractmethod
+ def get_variables(self) -> List[str]: ...
+
+ @classmethod
+ @abstractmethod
+ def to_data(cls, raw_data: Any) -> Any:
+ """Convert raw data into the appropriate data format for storage."""
+ pass
+
+ def get_template(self) -> str:
+ return self.template
+
+ def get_event_id(self) -> int:
+ return self.event_id
diff --git a/src/detectmatelibrary/common/persistency/event_data_structures/dataframes.py b/src/detectmatelibrary/common/persistency/event_data_structures/dataframes.py
new file mode 100644
index 0000000..b3009a2
--- /dev/null
+++ b/src/detectmatelibrary/common/persistency/event_data_structures/dataframes.py
@@ -0,0 +1,116 @@
+from typing import Any, Dict, List, Optional
+from dataclasses import dataclass, field
+
+import pandas as pd
+import polars as pl
+
+from .base import EventDataStructure
+
+
+# -------- Pandas backend --------
+
+@dataclass
+class EventDataFrame(EventDataStructure):
+ """
+ Pandas DataFrame backend:
+ - Ingest appends data (expensive)
+ - Retention is not handled (can be extended)
+ - DataFrame is always materialized
+ """
+ data: pd.DataFrame = field(default_factory=pd.DataFrame)
+
+ def add_data(self, data: pd.DataFrame) -> None:
+ if len(self.data) > 0:
+ self.data = pd.concat([self.data, data], ignore_index=True)
+ else:
+ self.data = data
+
+ def get_data(self) -> pd.DataFrame:
+ return self.data
+
+ def get_variables(self) -> List[str]:
+ return list(self.data.columns)
+
+ @staticmethod
+ def to_data(raw_data: Dict[int | str, Any]) -> pd.DataFrame:
+ data = {key: [value] for key, value in raw_data.items()}
+ return pd.DataFrame(data)
+
+ def __repr__(self) -> str:
+ return f"EventDataFrame(df=..., rows={len(self.data)}, variables={self.get_variables()})"
+
+
+# -------- Polars backends --------
+@dataclass
+class ChunkedEventDataFrame(EventDataStructure):
+ """
+ Streaming-friendly Polars DataFrame backend:
+ - Ingest appends chunks (cheap)
+ - Retention by max_rows is handled internally
+ - DataFrame is materialized on demand
+ """
+ max_rows: Optional[int] = 10_000_000
+ compact_every: int = 1000
+
+ chunks: list[pl.DataFrame] = field(default_factory=list)
+ _rows: int = 0
+
+ def add_data(self, data: pl.DataFrame) -> None:
+ if data.height == 0:
+ return
+ self.chunks.append(data)
+ self._rows += data.height
+
+ if self.max_rows is not None:
+ self._evict_oldest()
+
+ if len(self.chunks) > self.compact_every:
+ self._compact()
+
+ def _evict_oldest(self) -> None:
+ if self.max_rows is not None:
+ overflow = self._rows - self.max_rows
+ if overflow <= 0:
+ return
+
+ # drop whole chunks
+ while self.chunks and overflow >= self.chunks[0].height:
+ oldest = self.chunks.pop(0)
+ overflow -= oldest.height
+ self._rows -= oldest.height
+
+ # trim remaining overflow from the oldest chunk
+ if overflow > 0 and self.chunks:
+ oldest = self.chunks[0]
+ keep = oldest.height - overflow
+ self.chunks[0] = oldest.tail(keep)
+ self._rows -= overflow
+
+ def _compact(self) -> None:
+ if not self.chunks:
+ return
+ df = pl.concat(self.chunks, how="vertical", rechunk=False)
+ self.chunks = [df]
+ self._rows = df.height
+
+ def get_data(self) -> pl.DataFrame:
+ if not self.chunks:
+ return pl.DataFrame()
+ if len(self.chunks) == 1:
+ return self.chunks[0]
+ return pl.concat(self.chunks, how="vertical", rechunk=False)
+
+ def get_variables(self) -> Any:
+ if not self.chunks:
+ return []
+ return self.chunks[0].columns
+
+ @staticmethod
+ def to_data(raw_data: Dict[str, List[Any]]) -> pl.DataFrame:
+ return pl.DataFrame(raw_data)
+
+ def __repr__(self) -> str:
+ return (
+ f"ChunkedEventDataFrame(df=..., rows={self._rows}, chunks={len(self.chunks)}, "
+ f"variables={self.get_variables()})"
+ )
diff --git a/src/detectmatelibrary/common/persistency/event_data_structures/trackers.py b/src/detectmatelibrary/common/persistency/event_data_structures/trackers.py
new file mode 100644
index 0000000..7332618
--- /dev/null
+++ b/src/detectmatelibrary/common/persistency/event_data_structures/trackers.py
@@ -0,0 +1,246 @@
+# A tracker is a data structure that stores a specific feature of a variable
+# and tracks the behavior of that feature over time/events.
+# It operates within the persistency framework to monitor how variables evolve.
+# It is interchangable with other EventDataStructure implementations.
+
+# from src.detectmatelibrary.utils.data_buffer import DataBuffer, ArgsBuffer, BufferMode
+from typing import Any, Dict, Literal, Set, Type, List
+from dataclasses import dataclass, field
+import numpy as np
+
+from detectmatelibrary.utils.preview_helpers import list_preview_str, format_dict_repr
+from detectmatelibrary.utils.RLE_list import RLEList
+
+from .base import EventDataStructure
+
+
+class StabilityClassifier:
+ """Classifier for stability based on segment means."""
+ def __init__(self, segment_thresholds: List[float], min_samples: int = 10):
+ self.segment_threshs = segment_thresholds
+ self.min_samples = min_samples
+ # for RLELists
+ self.segment_sums = [0.0] * len(segment_thresholds)
+ self.segment_counts = [0] * len(segment_thresholds)
+ self.n_segments = len(self.segment_threshs)
+ # for lists
+ self.segment_means: List[float] = []
+
+ def is_stable(self, change_series: RLEList[bool] | List[bool]) -> bool:
+ """Determine if a list of segment means is stable.
+
+ Works efficiently with RLEList without expanding to a full list.
+ """
+ # Handle both RLEList and regular list
+ if isinstance(change_series, RLEList):
+ total_len = len(change_series)
+ if total_len == 0:
+ return True
+
+ # Calculate segment boundaries
+ segment_size = total_len / self.n_segments
+ segment_boundaries = [int(i * segment_size) for i in range(self.n_segments + 1)]
+ segment_boundaries[-1] = total_len
+
+ # Compute segment means directly from RLE runs
+ segment_sums = [0.0] * self.n_segments
+ segment_counts = [0] * self.n_segments
+
+ position = 0
+ for value, count in change_series.runs():
+ run_start = position
+ run_end = position + count
+
+ # Find which segments this run overlaps with
+ for seg_idx in range(self.n_segments):
+ seg_start = segment_boundaries[seg_idx]
+ seg_end = segment_boundaries[seg_idx + 1]
+
+ # Calculate overlap between run and segment
+ overlap_start = max(run_start, seg_start)
+ overlap_end = min(run_end, seg_end)
+ overlap_count = max(0, overlap_end - overlap_start)
+
+ if overlap_count > 0:
+ segment_sums[seg_idx] += value * overlap_count
+ segment_counts[seg_idx] += overlap_count
+
+ position = run_end
+
+ # Calculate means
+ self.segment_means = [
+ segment_sums[i] / segment_counts[i] if segment_counts[i] > 0 else np.nan
+ for i in range(self.n_segments)
+ ]
+ else:
+ # Original implementation for regular lists
+ self.segment_means = self._compute_segment_means(change_series)
+ return all([not q >= thresh for q, thresh in zip(self.segment_means, self.segment_threshs)])
+
+ def _compute_segment_means(self, change_series: List[bool]) -> List[float]:
+ """Get means of each segment for a normal list."""
+ segments = np.array_split(change_series, self.n_segments)
+ return list(map(lambda x: np.mean(x) if len(x) > 0 else np.nan, segments))
+
+ def get_last_segment_means(self) -> List[float]:
+ return self.segment_means
+
+ def get_segment_thresholds(self) -> List[float]:
+ return self.segment_threshs
+
+ def __call__(self, change_series: RLEList[bool] | List[bool]) -> bool:
+ return self.is_stable(change_series)
+
+ def __repr__(self) -> str:
+ return (
+ f"StabilityClassifier(segment_threshs={self.segment_threshs}, "
+ f"segment_means={self.segment_means})"
+ )
+
+
+@dataclass
+class Classification:
+ type: str = ""
+ reason: str = ""
+
+
+class StabilityTracker:
+ """Tracks whether a variable is converging to a constant value."""
+
+ def __init__(self, min_samples: int = 3) -> None:
+ self.min_samples = min_samples
+ self.change_series: RLEList[bool] = RLEList()
+ self.unique_set: Set[Any] = set()
+ self.stability_classifier: StabilityClassifier = StabilityClassifier(
+ segment_thresholds=[1.1, 0.3, 0.1, 0.01],
+ )
+
+ def add_value(self, value: Any) -> None:
+ """Add a new value to the tracker."""
+ unique_set_size_before = len(self.unique_set)
+ self.unique_set.add(value)
+ has_changed = len(self.unique_set) - unique_set_size_before > 0
+ self.change_series.append(has_changed)
+
+ def classify(self) -> Classification:
+ """Classify the variable."""
+ if len(self.change_series) < self.min_samples:
+ return Classification(
+ type="INSUFFICIENT_DATA",
+ reason=f"Not enough data (have {len(self.change_series)}, need {self.min_samples})"
+ )
+ elif len(self.unique_set) == 1:
+ return Classification(
+ type="STATIC",
+ reason="Unique set size is 1"
+ )
+ elif len(self.unique_set) == len(self.change_series):
+ return Classification(
+ type="RANDOM",
+ reason=f"Unique set size equals number of samples ({len(self.change_series)})"
+ )
+ elif self.stability_classifier.is_stable(self.change_series):
+ return Classification(
+ type="STABLE",
+ reason=(
+ f"Segment means of change series {self.stability_classifier.get_last_segment_means()} "
+ f"are below segment thresholds: {self.stability_classifier.get_segment_thresholds()}"
+ )
+ )
+ else:
+ return Classification(
+ type="UNSTABLE",
+ reason="No classification matched; variable is unstable"
+ )
+
+ def __repr__(self) -> str:
+ # show only part of the series for brevity
+ series_str = list_preview_str(self.change_series)
+ unique_set_str = "{" + ", ".join(map(str, list_preview_str(self.unique_set))) + "}"
+ RLE_str = list_preview_str(self.change_series.runs())
+ return (
+ f"StabilityTracker(classification={self.classify()}, change_series={series_str}, "
+ f"unique_set={unique_set_str}, RLE={RLE_str})"
+ )
+
+
+class VariableTrackers:
+ """Tracks multiple variables using individual trackers."""
+
+ def __init__(self, tracker_type: Type[StabilityTracker] = StabilityTracker) -> None:
+ self.trackers: Dict[str, StabilityTracker] = {}
+ self.tracker_type: Type[StabilityTracker] = tracker_type
+
+ def add_data(self, data_object: Dict[str, Any]) -> None:
+ """Add data to the appropriate variable trackers."""
+ for var_name, value in data_object.items():
+ if var_name not in self.trackers:
+ self.trackers[var_name] = self.tracker_type()
+ self.trackers[var_name].add_value(value)
+
+ def get_trackers(self) -> Dict[str, StabilityTracker]:
+ """Get the current variable trackers."""
+ return self.trackers
+
+ def classify(self) -> Dict[str, Any]:
+ """Classify all tracked variables."""
+ classifications = {}
+ for var_name, tracker in self.trackers.items():
+ classifications[var_name] = tracker.classify()
+ return classifications
+
+ def get_variables_by_classification(
+ self,
+ classification_type: Literal["INSUFFICIENT_DATA", "STATIC", "RANDOM", "STABLE", "UNSTABLE"]
+ ) -> List[str]:
+ """Get a list of variable names that are classified as the given
+ type."""
+ variables = []
+ for var_name, tracker in self.trackers.items():
+ classification = tracker.classify()
+ if classification.type == classification_type:
+ variables.append(var_name)
+ return variables
+
+ def __repr__(self) -> str:
+ strs = format_dict_repr(self.trackers, indent="\t")
+ return f"VariableTrackers{{\n\t{strs}\n}}\n"
+
+
+@dataclass
+class EventVariableTracker(EventDataStructure):
+ """Event data structure that tracks variable behaviors over time/events."""
+
+ tracker_type: Type[StabilityTracker] = StabilityTracker
+ variable_trackers: VariableTrackers = field(init=False)
+
+ def __post_init__(self) -> None:
+ self.variable_trackers = VariableTrackers(tracker_type=self.tracker_type)
+
+ def add_data(self, data_object: Any) -> None:
+ """Add data to the variable trackers."""
+ self.variable_trackers.add_data(data_object)
+
+ def get_data(self) -> Any:
+ """Retrieve the tracker's stored data."""
+ return self.variable_trackers.get_trackers()
+
+ def get_variables(self) -> list[str]:
+ """Get the list of tracked variable names."""
+ return list(self.variable_trackers.get_trackers().keys())
+
+ def get_variables_by_classification(
+ self, classification_type: Literal["INSUFFICIENT_DATA", "STATIC", "RANDOM", "STABLE", "UNSTABLE"]
+ ) -> List[str]:
+ """Get a list of variable names that are classified as the given
+ type."""
+ return self.variable_trackers.get_variables_by_classification(classification_type)
+
+ @staticmethod
+ def to_data(raw_data: Dict[str, Any]) -> Dict[str, Any]:
+ """Transform raw data into the format expected by the tracker."""
+ return raw_data
+
+ def __repr__(self) -> str:
+ strs = format_dict_repr(self.variable_trackers.get_trackers(), indent="\t")
+ return f"EventVariableTracker(data={{\n\t{strs}\n}})"
diff --git a/src/detectmatelibrary/common/persistency/event_persistency.py b/src/detectmatelibrary/common/persistency/event_persistency.py
new file mode 100644
index 0000000..3a2362c
--- /dev/null
+++ b/src/detectmatelibrary/common/persistency/event_persistency.py
@@ -0,0 +1,99 @@
+from typing import Any, Dict, List, Optional, Type
+
+from .event_data_structures.base import EventDataStructure
+
+
+# -------- Generic persistency --------
+
+class EventPersistency:
+ """
+ Event-based persistency orchestrator:
+ - manages multiple EventDataStructure instances, one per event ID
+ - doesn't know retention strategy
+ - only delegates to EventDataStructure
+
+ Args:
+ event_data_class: The EventDataStructure subclass to use for storing event data.
+ variable_blacklist: Variable names to exclude from storage. "Content" is excluded by default.
+ event_data_kwargs: Additional keyword arguments to pass to the EventDataStructure constructor.
+ """
+
+ def __init__(
+ self,
+ event_data_class: Type[EventDataStructure],
+ variable_blacklist: Optional[List[str | int]] = ["Content"],
+ *,
+ event_data_kwargs: Optional[dict[str, Any]] = None,
+ ):
+ self.events_data: Dict[int, EventDataStructure] = {}
+ self.event_data_class = event_data_class
+ self.event_data_kwargs = event_data_kwargs or {}
+ self.variable_blacklist = variable_blacklist or []
+ self.event_templates: Dict[int, str] = {}
+
+ def ingest_event(
+ self,
+ event_id: int,
+ event_template: str,
+ variables: list[Any],
+ log_format_variables: Dict[str, Any],
+ ) -> None:
+ """Ingest event data into the appropriate EventData store."""
+ self.event_templates[event_id] = event_template
+ all_variables = self.get_all_variables(variables, log_format_variables, self.variable_blacklist)
+ data = self.event_data_class.to_data(all_variables)
+
+ data_structure = self.events_data.get(event_id)
+ if data_structure is None:
+ data_structure = self.event_data_class(**self.event_data_kwargs)
+ self.events_data[event_id] = data_structure
+
+ data_structure.add_data(data)
+
+ def get_event_data(self, event_id: int) -> Any | None:
+ """Retrieve the data for a specific event ID."""
+ data_structure = self.events_data.get(event_id)
+ return data_structure.get_data() if data_structure is not None else None
+
+ def get_events_data(self) -> Dict[int, EventDataStructure]:
+ """Retrieve the events' data."""
+ return self.events_data
+
+ def get_event_template(self, event_id: int) -> str | None:
+ """Retrieve the template for a specific event ID."""
+ return self.event_templates.get(event_id)
+
+ def get_event_templates(self) -> Dict[int, str]:
+ """Retrieve all event templates."""
+ return self.event_templates
+
+ @staticmethod
+ def get_all_variables(
+ variables: list[Any],
+ log_format_variables: Dict[str, Any],
+ variable_blacklist: List[str | int],
+ event_var_prefix: str = "var_",
+ ) -> dict[str, list[Any]]:
+ """Combine log format variables and event variables into a single
+ dictionary.
+
+ Schema-friendly by using string column names.
+ """
+ all_vars: dict[str, list[Any]] = {
+ k: v for k, v in log_format_variables.items()
+ if k not in variable_blacklist
+ }
+ all_vars.update({
+ f"{event_var_prefix}{i}": val for i, val in enumerate(variables)
+ if i not in variable_blacklist
+ })
+ return all_vars
+
+ def __getitem__(self, event_id: int) -> EventDataStructure | None:
+ return self.events_data.get(event_id)
+
+ def __repr__(self) -> str:
+ return (
+ f"EventPersistency(num_event_types={len(self.events_data)}, "
+ f"keys={list(self.events_data.keys())})"
+ )
diff --git a/src/detectmatelibrary/detectors/new_value_combo_detector.py b/src/detectmatelibrary/detectors/new_value_combo_detector.py
index 01b2908..02c48ab 100644
--- a/src/detectmatelibrary/detectors/new_value_combo_detector.py
+++ b/src/detectmatelibrary/detectors/new_value_combo_detector.py
@@ -1,15 +1,20 @@
from detectmatelibrary.common._config._formats import LogVariables, AllLogVariables
+from detectmatelibrary.common._config import generate_detector_config
from detectmatelibrary.common.detector import CoreDetectorConfig
from detectmatelibrary.common.detector import CoreDetector
from detectmatelibrary.utils.data_buffer import BufferMode
-import detectmatelibrary.schemas as schemas
+from detectmatelibrary.common.persistency.event_data_structures.trackers import (
+ EventVariableTracker, StabilityTracker
+)
+from detectmatelibrary.common.persistency.event_persistency import EventPersistency
-from itertools import combinations
+import detectmatelibrary.schemas as schemas
from typing import Any, Set, Dict, cast
+from itertools import combinations
# Auxiliar methods ********************************************************
@@ -42,7 +47,7 @@ def _get_combos(
return set()
relevant_log_fields = relevant_log_fields.get_all().keys() # type: ignore
- _check_size(combo_size, len(relevant_log_fields)) # type: ignore
+ # _check_size(combo_size, len(relevant_log_fields)) # type: ignore
return set(combinations([
_get_element(input_, var_pos=field) for field in relevant_log_fields # type: ignore
@@ -93,7 +98,7 @@ def detect_combo_detector(
if not unique_combos.issubset(combos_set):
for combo in unique_combos - combos_set:
overall_score += 1
- alerts.update({"Not found combo": str(combo)})
+ alerts.update({f"EventID_{input_['EventID']}": f"Values: {combo}"})
return overall_score
@@ -120,8 +125,18 @@ def __init__(
self.config = cast(NewValueComboDetectorConfig, self.config)
self.known_combos: Dict[str | int, Set[Any]] = {"all": set()}
+ self.persistency = EventPersistency(
+ event_data_class=EventVariableTracker,
+ event_data_kwargs={"tracker_type": StabilityTracker}
+ )
def train(self, input_: schemas.ParserSchema) -> None: # type: ignore
+ # self.persistency.ingest_event(
+ # event_id=input_["EventID"],
+ # event_template=input_["template"],
+ # variables=input_["variables"],
+ # log_format_variables=input_["logFormatVariables"],
+ # )
train_combo_detector(
input_=input_,
known_combos=self.known_combos,
@@ -144,7 +159,35 @@ def detect(
if overall_score > 0:
output_["score"] = overall_score
- output_["description"] = f"The detector check combinations of {self.config.comb_size} variables"
+ output_["description"] = (
+ f"The detector checks for new value combinations of size {self.config.comb_size}."
+ )
output_["alertsObtain"].update(alerts)
return True
return False
+
+ def configure(self, input_: schemas.ParserSchema) -> None:
+ self.persistency.ingest_event(
+ event_id=input_["EventID"],
+ event_template=input_["template"],
+ variables=input_["variables"],
+ log_format_variables=input_["logFormatVariables"],
+ )
+
+ def set_configuration(self, max_combo_size: int = 6) -> None:
+ variable_combos = {}
+ templates = {}
+ for event_id, tracker in self.persistency.get_events_data().items():
+ stable_vars = tracker.get_stable_variables() # type: ignore
+ if len(stable_vars) > 1:
+ variable_combos[event_id] = stable_vars
+ templates[event_id] = self.persistency.get_event_template(event_id)
+ config_dict = generate_detector_config(
+ variable_selection=variable_combos,
+ templates=templates,
+ detector_name=self.name,
+ method_type=self.config.method_type,
+ comb_size=max_combo_size
+ )
+ # Update the config object from the dictionary instead of replacing it
+ self.config = NewValueComboDetectorConfig.from_dict(config_dict, self.name)
diff --git a/src/detectmatelibrary/parsers/json_parser.py b/src/detectmatelibrary/parsers/json_parser.py
index 713b8f2..aee11d0 100644
--- a/src/detectmatelibrary/parsers/json_parser.py
+++ b/src/detectmatelibrary/parsers/json_parser.py
@@ -4,8 +4,8 @@
from detectmatelibrary import schemas
from collections.abc import Mapping
-from typing import Any, Iterable
-import json
+from typing import Any, Iterable, Optional
+import ujson as json # type: ignore
def iter_flatten(obj: dict[str, Any], sep: str = '.') -> Iterable[tuple[str, Any]]:
@@ -47,10 +47,13 @@ class JsonParserConfig(CoreParserConfig):
method_type: str = "json_parser"
timestamp_name: str = "time"
content_name: str = "message"
- content_parser: str = "JsonMatcherParser"
+ remove_content_key: bool = False
+ content_parser: Optional[str] = None
class JsonParser(CoreParser):
+ config: JsonParserConfig # type annotation to help mypy
+
def __init__(
self,
name: str = "JsonParser",
@@ -58,24 +61,33 @@ def __init__(
) -> None:
if isinstance(config, dict):
- content_parser_name = config.get("content_parser", "JsonMatcherParser")
- content_parser_config = MatcherParserConfig.from_dict(config, content_parser_name)
- self.content_parser = MatcherParser(config=content_parser_config)
- config = JsonParserConfig.from_dict(config, name)
- super().__init__(name=name, config=config)
+ self.config = JsonParserConfig.from_dict(config, name)
+ else:
+ self.config = config
+
+ # Set up content parser if specified
+ self.content_parser = None
+ if self.config.content_parser:
+ if isinstance(config, dict):
+ content_parser_config = MatcherParserConfig.from_dict(config, self.config.content_parser)
+ self.content_parser = MatcherParser(config=content_parser_config)
+ else:
+ content_parser_config = MatcherParserConfig()
+ self.content_parser = MatcherParser(config=content_parser_config)
- self.time_extractor = KeyExtractor(key_substr=config.timestamp_name)
- self.content_extractor = KeyExtractor(key_substr=config.content_name)
+ super().__init__(name=name, config=self.config)
+ self.time_extractor = KeyExtractor(key_substr=self.config.timestamp_name)
+ self.content_extractor = KeyExtractor(key_substr=self.config.content_name)
def parse(self, input_: schemas.LogSchema, output_: schemas.ParserSchema) -> None:
log = json.loads(input_["log"])
# extract timestamp and content in the most efficient way from the json log
timestamp = self.time_extractor.extract(obj=log, delete=True)
- content = self.content_extractor.extract(obj=log, delete=True)
+ content = self.content_extractor.extract(obj=log, delete=self.config.remove_content_key)
parsed = {"EventTemplate": "", "Params": [], "EventId": 0}
# if the json also contains a message field, parse it for template and parameters
- if content:
+ if self.content_parser is not None and content is not None:
log_ = schemas.LogSchema({"log": content})
parsed_content = self.content_parser.process(log_)
parsed["EventTemplate"] = parsed_content["template"] # type: ignore
diff --git a/src/detectmatelibrary/parsers/template_matcher/_matcher_op.py b/src/detectmatelibrary/parsers/template_matcher/_matcher_op.py
index 8760ab2..4195aee 100644
--- a/src/detectmatelibrary/parsers/template_matcher/_matcher_op.py
+++ b/src/detectmatelibrary/parsers/template_matcher/_matcher_op.py
@@ -14,6 +14,150 @@ def safe_search(pattern: str, string: str, timeout: int = 1) -> regex.Match[str]
return result
+def tokenize_template(string: str) -> List[str]:
+ """Split template string treating <*> as tokens."""
+ # Split by <*> but keep <*> in the result, then filter empty strings
+ return [token for token in re.split(r'(<\*>)', string) if token]
+
+
+def exclude_digits(string: str) -> bool:
+ """Exclude the digits-domain words from partial constant."""
+ pattern = r'\d'
+ digits = re.findall(pattern, string)
+ if len(digits) == 0 or string[0].isalpha() or any(c.isupper() for c in string):
+ return False
+ elif len(digits) >= 4:
+ return True
+ else:
+ return len(digits) / len(string) > 0.3
+
+
+def normalize_spaces(text: str) -> str:
+ """Replace consecutive spaces with a single space."""
+ return re.sub(r' +', ' ', text)
+
+
+def correct_single_template(template: str, user_strings: set[str] | None = None) -> str:
+ """Apply all rules to process a template.
+
+ DS (Double Space) BL (Boolean) US (User String) DG (Digit) PS (Path-
+ like String) WV (Word concatenated with Variable) DV (Dot-separated
+ Variables) CV (Consecutive Variables)
+ """
+
+ boolean = {'true', 'false'}
+ default_strings = {'null', 'root'} # 'null', 'root', 'admin'
+ path_delimiters = { # reduced set of delimiters for tokenizing for checking the path-like strings
+ r'\s', r'\,', r'\!', r'\;', r'\:',
+ r'\=', r'\|', r'\"', r'\'', r'\+',
+ r'\[', r'\]', r'\(', r'\)', r'\{', r'\}'
+ }
+ token_delimiters = path_delimiters.union({ # all delimiters for tokenizing the remaining rules
+ r'\.', r'\-', r'\@', r'\#', r'\$', r'\%', r'\&', r'\/'
+ })
+
+ if user_strings:
+ default_strings = default_strings.union(user_strings)
+
+ # apply DS
+ template = template.strip()
+ template = re.sub(r'\s+', ' ', template)
+
+ # apply PS
+ p_tokens = re.split('(' + '|'.join(path_delimiters) + ')', template)
+ new_p_tokens = []
+ for p_token in p_tokens:
+ if (
+ re.match(r'^(\/[^\/]+)+\/?$', p_token) or
+ re.match(r'.*/.*\..*', p_token) or
+ re.match(r'^([a-zA-Z0-9-]+\.){3,}[a-z]+$', p_token)
+ ):
+ p_token = '<*>' # nosec B105
+
+ new_p_tokens.append(p_token)
+ template = ''.join(new_p_tokens)
+ # tokenize for the remaining rules
+ tokens = re.split('(' + '|'.join(token_delimiters) + ')', template) # tokenizing while keeping delimiters
+ new_tokens = []
+ for token in tokens:
+ # apply BL, US
+ for to_replace in boolean.union(default_strings):
+ # if token.lower() == to_replace.lower():
+ if token == to_replace:
+ token = '<*>' # nosec B105
+
+ # apply DG
+ # Note: hexadecimal num also appears a lot in the logs
+ # if re.match(r'^\d+$', token) or re.match(r'\b0[xX][0-9a-fA-F]+\b', token):
+ # token = '<*>'
+ if exclude_digits(token):
+ token = '<*>' # nosec B105
+
+ # apply WV
+ if re.match(r'^[^\s\/]*<\*>[^\s\/]*$', token) or re.match(r'^<\*>.*<\*>$', token):
+ token = '<*>' # nosec B105
+ # collect the result
+ new_tokens.append(token)
+
+ # make the template using new_tokens
+ template = ''.join(new_tokens)
+
+ # Substitute consecutive variables only if separated with any delimiter including "." (DV)
+ while True:
+ prev = template
+ template = re.sub(r'<\*>\.<\*>', '<*>', template)
+ if prev == template:
+ break
+
+ # Substitute consecutive variables only if not separated with any delimiter including space (CV)
+ # NOTE: this should be done at the end
+ while True:
+ prev = template
+ template = re.sub(r'<\*><\*>', '<*>', template)
+ if prev == template:
+ break
+
+ while "#<*>#" in template:
+ template = template.replace("#<*>#", "<*>")
+
+ while "<*>:<*>" in template:
+ template = template.replace("<*>:<*>", "<*>")
+
+ while "<*>/<*>" in template:
+ template = template.replace("<*>/<*>", "<*>")
+
+ while " #<*> " in template:
+ template = template.replace(" #<*> ", " <*> ")
+
+ while "<*>:<*>" in template:
+ template = template.replace("<*>:<*>", "<*>")
+
+ while "<*>#<*>" in template:
+ template = template.replace("<*>#<*>", "<*>")
+
+ while "<*>/<*>" in template:
+ template = template.replace("<*>/<*>", "<*>")
+
+ while "<*>@<*>" in template:
+ template = template.replace("<*>@<*>", "<*>")
+
+ while "<*>.<*>" in template:
+ template = template.replace("<*>.<*>", "<*>")
+
+ while ' "<*>" ' in template:
+ template = template.replace(' "<*>" ', ' <*> ')
+
+ while " '<*>' " in template:
+ template = template.replace(" '<*>' ", " <*> ")
+
+ while "<*><*>" in template:
+ template = template.replace("<*><*>", "<*>")
+
+ template = re.sub(r'<\*> [KGTM]?B\b', '<*>', template)
+
+ return template
+
+
class Preprocess:
def __init__(
self,
@@ -43,7 +187,7 @@ def _to_lowercase(s: str) -> str:
def __call__(self, text: str) -> str:
if self.__re_spaces:
- text = text.replace(" ", "")
+ text = normalize_spaces(text)
if self.__re_punctuation:
text = text.replace("<*>", "WILDCARD")
@@ -80,7 +224,7 @@ def __init__(
event_id = 0
for tpl in template_list:
cleaned_tpl = self.preprocess(tpl)
- tokens = cleaned_tpl.split("<*>")
+ tokens = tokenize_template(cleaned_tpl)
min_len = sum(len(t) for t in tokens) # lower bound to skip impossibles
info = {
@@ -137,14 +281,21 @@ def extract_parameters(log: str, template: str) -> tuple[str, ...] | None:
else:
return None
+ @staticmethod
+ def correct_single_template(template: str) -> str:
+ """Apply all rules to process a template."""
+ return correct_single_template(template)
+
def match_template_with_params(self, log: str) -> tuple[str, tuple[str, ...]] | None:
"""Return (template_string, [param1, param2, ...]) or None."""
- s, candidates = self.manager.candidate_indices(log)
- for i in candidates:
+ for i in range(len(self.manager.templates)):
t = self.manager.templates[i]
- if len(s) < t["min_len"]:
+ if len(log) < t["min_len"]:
continue
- params = self.extract_parameters(log, t["raw"])
+ t = self.manager.templates[i]
+ preprocessed_log = self.manager.preprocess(log)
+ preprocessed_template = self.correct_single_template(self.manager.preprocess(t["raw"]))
+ params = self.extract_parameters(preprocessed_log, preprocessed_template)
if params is not None:
t["count"] += 1
return t["raw"], params
diff --git a/src/detectmatelibrary/readers/log_file.py b/src/detectmatelibrary/readers/log_file.py
index 4d22164..ea55b07 100644
--- a/src/detectmatelibrary/readers/log_file.py
+++ b/src/detectmatelibrary/readers/log_file.py
@@ -57,3 +57,7 @@ def read(self, output_: schemas.LogSchema) -> bool:
output_.log = log
return not self.is_over
+
+ def reset(self) -> None:
+ self.__log_generator = self.read_logs()
+ self.is_over = False
\ No newline at end of file
diff --git a/src/detectmatelibrary/utils/LRU_set.py b/src/detectmatelibrary/utils/LRU_set.py
new file mode 100644
index 0000000..5d00df9
--- /dev/null
+++ b/src/detectmatelibrary/utils/LRU_set.py
@@ -0,0 +1,68 @@
+from collections import OrderedDict
+from collections.abc import Iterator
+from typing import List
+
+from .preview_helpers import list_preview_str
+
+
+class LRUSet:
+ """LRU = Least Recently Used.
+
+ A set with a maximum size that evicts the least-recently-used items
+ when full.
+ """
+
+ def __init__(self, max_size: int):
+ if max_size <= 0:
+ raise ValueError("max_size must be > 0")
+ self.max_size = max_size
+ self._d: OrderedDict[object, None] = OrderedDict()
+
+ def add(self, item: object) -> None:
+ # Touch -> mark as most-recent
+ if item in self._d:
+ self._d.move_to_end(item)
+ return
+
+ self._d[item] = None
+ if len(self._d) > self.max_size:
+ self._d.popitem(last=False) # evict LRU
+
+ def touch(self, item: object) -> bool:
+ """Mark item as most-recent if present.
+
+ Returns True if present.
+ """
+ if item in self._d:
+ self._d.move_to_end(item)
+ return True
+ return False
+
+ def discard(self, item: object) -> None:
+ self._d.pop(item, None)
+
+ def __contains__(self, item: object) -> bool:
+ return item in self._d
+
+ def __len__(self) -> int:
+ return len(self._d)
+
+ def __iter__(self) -> Iterator[object]:
+ return iter(self._d)
+
+ def items_lru_to_mru(self) -> List[object]:
+ return list(self._d.keys())
+
+ def __repr__(self) -> str:
+ return f"LRUSet({list_preview_str(self._d.keys())})"
+
+
+# example usage:
+
+# lru = LRUSet(max_size=3)
+# lru.add("a")
+# lru.add("b")
+# lru.add("c")
+# print(lru) # LRUSet(['a', 'b', 'c'])
+# lru.add("d")
+# print(lru) # LRUSet(['b', 'c', 'd'])
diff --git a/src/detectmatelibrary/utils/RLE_list.py b/src/detectmatelibrary/utils/RLE_list.py
new file mode 100644
index 0000000..8009433
--- /dev/null
+++ b/src/detectmatelibrary/utils/RLE_list.py
@@ -0,0 +1,63 @@
+# Incremental RLE implementation.
+# https://en.wikipedia.org/wiki/Run-length_encoding
+
+from typing import Generic, Iterable, Iterator, List, Tuple, TypeVar
+
+from .preview_helpers import list_preview_str
+
+T = TypeVar("T")
+
+
+class RLEList(Generic[T]):
+ """List-like container storing data in run-length encoded form."""
+
+ def __init__(self, data: Iterable[T] | None = None):
+ self._runs: List[Tuple[T, int]] = []
+ self._len: int = 0
+ if data is not None:
+ for x in data:
+ self.append(x)
+
+ def append(self, x: T) -> None:
+ if self._runs and self._runs[-1][0] == x:
+ v, c = self._runs[-1]
+ self._runs[-1] = (v, c + 1)
+ else:
+ self._runs.append((x, 1))
+ self._len += 1
+
+ def extend(self, xs: Iterable[T]) -> None:
+ for x in xs:
+ self.append(x)
+
+ def __len__(self) -> int:
+ return self._len
+
+ def __iter__(self) -> Iterator[T]:
+ for v, c in self._runs:
+ for _ in range(c):
+ yield v
+
+ def runs(self) -> List[Tuple[T, int]]:
+ """Return the internal RLE representation."""
+ return list(self._runs)
+
+ def __repr__(self) -> str:
+ # convert bool to int
+ runs_str = list_preview_str(self._runs)
+ return f"RLEList(len={self._len}, runs={runs_str})"
+
+
+# example usage
+
+# r = RLEList[str]()
+
+# r.append("A")
+# r.append("A")
+# r.append("B")
+# r.extend(["B", "B", "C"])
+
+# print(len(r)) # 6
+# print(list(r)) # ['A', 'A', 'B', 'B', 'B', 'C']
+# print(r.runs()) # [('A', 2), ('B', 3), ('C', 1)]
+# print(r) # RLEList(len=6, runs=[('A', 2), ('B', 3), ('C', 1)])
diff --git a/src/detectmatelibrary/utils/data_buffer.py b/src/detectmatelibrary/utils/data_buffer.py
index aaa5d6c..1f0d2a9 100644
--- a/src/detectmatelibrary/utils/data_buffer.py
+++ b/src/detectmatelibrary/utils/data_buffer.py
@@ -101,8 +101,15 @@ def _process_and_clear(self, buf: deque[Any], clear: bool = True) -> Any:
buf.clear()
return result
+ def get_buffer(self) -> deque[Any]:
+ """Get the current buffer."""
+ return self.buffer
+
def flush(self) -> Any:
"""Process remaining data_points."""
if self.buffer:
clear = self.mode != BufferMode.WINDOW
return self._process_and_clear(self.buffer, clear=clear)
+
+ def __repr__(self) -> str:
+ return f"DataBuffer(mode={self.mode}, capacity={self.size}, current_length={len(self.buffer)})"
diff --git a/src/detectmatelibrary/utils/preview_helpers.py b/src/detectmatelibrary/utils/preview_helpers.py
new file mode 100644
index 0000000..278f031
--- /dev/null
+++ b/src/detectmatelibrary/utils/preview_helpers.py
@@ -0,0 +1,19 @@
+from typing import Any, Dict
+
+
+def list_preview_str(listlike: Any, bool_to_int: bool = True) -> list[Any]:
+ """Show a preview of a listlike sequence."""
+ series_start = list(listlike)[:3]
+ if len(listlike) > 6:
+ series_end = list(listlike)[-3:]
+ series_preview = series_start + ["..."] + series_end
+ else:
+ series_preview = list(listlike)
+ if bool_to_int:
+ series_preview = [int(x) if isinstance(x, bool) else x for x in series_preview]
+ return series_preview
+
+
+def format_dict_repr(items: Dict[str, Any], indent: str = "\t") -> str:
+ """Format a dictionary as a multiline string with indentation."""
+ return f"\n{indent}".join(f"{name}: {value}" for name, value in items.items())
diff --git a/tests/test_common/test_persistency.py b/tests/test_common/test_persistency.py
new file mode 100644
index 0000000..38883de
--- /dev/null
+++ b/tests/test_common/test_persistency.py
@@ -0,0 +1,460 @@
+"""Tests for the persistency module core functionality.
+
+This module tests EventPersistency and data structure backends including
+EventDataFrame (Pandas) and ChunkedEventDataFrame (Polars).
+"""
+
+
+import pandas as pd
+import polars as pl
+from detectmatelibrary.common.persistency.event_persistency import EventPersistency
+from detectmatelibrary.common.persistency.event_data_structures.dataframes import (
+ EventDataFrame,
+ ChunkedEventDataFrame,
+)
+from detectmatelibrary.common.persistency.event_data_structures.trackers import (
+ EventVariableTracker,
+ StabilityTracker,
+)
+
+
+# Sample test data - variables is a list, not a dict
+SAMPLE_EVENT_1 = {
+ "event_id": "E001",
+ "event_template": "User <*> logged in from <*>",
+ "variables": ["alice", "192.168.1.1"],
+ "log_format_variables": {"timestamp": "2024-01-01 10:00:00"},
+}
+
+SAMPLE_EVENT_2 = {
+ "event_id": "E002",
+ "event_template": "Error in module <*>: <*>",
+ "variables": ["auth", "timeout"],
+ "log_format_variables": {"timestamp": "2024-01-01 10:01:00"},
+}
+
+SAMPLE_EVENT_3 = {
+ "event_id": "E001",
+ "event_template": "User <*> logged in from <*>",
+ "variables": ["bob", "192.168.1.2"],
+ "log_format_variables": {"timestamp": "2024-01-01 10:02:00"},
+}
+
+
+class TestEventPersistency:
+ """Test suite for EventPersistency orchestrator class."""
+
+ def test_initialization_with_pandas_backend(self):
+ """Test initialization with EventDataFrame backend."""
+ persistency = EventPersistency(event_data_class=EventDataFrame)
+ assert persistency is not None
+ assert persistency.event_data_class == EventDataFrame
+
+ def test_initialization_with_polars_backend(self):
+ """Test initialization with ChunkedEventDataFrame backend."""
+ persistency = EventPersistency(
+ event_data_class=ChunkedEventDataFrame,
+ event_data_kwargs={"max_rows": 100},
+ )
+ assert persistency is not None
+ assert persistency.event_data_class == ChunkedEventDataFrame
+
+ def test_initialization_with_tracker_backend(self):
+ """Test initialization with EventVariableTracker backend."""
+ persistency = EventPersistency(
+ event_data_class=EventVariableTracker,
+ event_data_kwargs={"tracker_type": StabilityTracker},
+ )
+ assert persistency is not None
+ assert persistency.event_data_class == EventVariableTracker
+
+ def test_ingest_single_event(self):
+ """Test ingesting a single event."""
+ persistency = EventPersistency(event_data_class=EventDataFrame)
+ persistency.ingest_event(**SAMPLE_EVENT_1)
+
+ data = persistency.get_event_data("E001")
+ assert data is not None
+ assert len(data) == 1
+ assert "var_0" in data.columns # alice
+ assert "var_1" in data.columns # 192.168.1.1
+ assert "timestamp" in data.columns
+
+ def test_ingest_multiple_events_same_id(self):
+ """Test ingesting multiple events with the same ID."""
+ persistency = EventPersistency(event_data_class=EventDataFrame)
+ persistency.ingest_event(**SAMPLE_EVENT_1)
+ persistency.ingest_event(**SAMPLE_EVENT_3)
+
+ data = persistency.get_event_data("E001")
+ assert len(data) == 2
+ assert data["var_0"].tolist() == ["alice", "bob"]
+
+ def test_ingest_multiple_events_different_ids(self):
+ """Test ingesting events with different IDs."""
+ persistency = EventPersistency(event_data_class=EventDataFrame)
+ persistency.ingest_event(**SAMPLE_EVENT_1)
+ persistency.ingest_event(**SAMPLE_EVENT_2)
+
+ data1 = persistency.get_event_data("E001")
+ data2 = persistency.get_event_data("E002")
+
+ assert len(data1) == 1
+ assert len(data2) == 1
+ assert "var_0" in data1.columns
+ assert "var_0" in data2.columns
+
+ def test_get_all_events_data(self):
+ """Test retrieving data for all events."""
+ persistency = EventPersistency(event_data_class=EventDataFrame)
+ persistency.ingest_event(**SAMPLE_EVENT_1)
+ persistency.ingest_event(**SAMPLE_EVENT_2)
+
+ all_data = persistency.get_events_data()
+ assert "E001" in all_data
+ assert "E002" in all_data
+ assert isinstance(all_data["E001"], EventDataFrame)
+ assert isinstance(all_data["E002"], EventDataFrame)
+
+ def test_template_storage_and_retrieval(self):
+ """Test template storage and retrieval."""
+ persistency = EventPersistency(event_data_class=EventDataFrame)
+ persistency.ingest_event(**SAMPLE_EVENT_1)
+ persistency.ingest_event(**SAMPLE_EVENT_2)
+
+ template1 = persistency.get_event_template("E001")
+ template2 = persistency.get_event_template("E002")
+
+ assert template1 == "User <*> logged in from <*>"
+ assert template2 == "Error in module <*>: <*>"
+
+ def test_get_all_templates(self):
+ """Test retrieving all templates."""
+ persistency = EventPersistency(event_data_class=EventDataFrame)
+ persistency.ingest_event(**SAMPLE_EVENT_1)
+ persistency.ingest_event(**SAMPLE_EVENT_2)
+
+ templates = persistency.get_event_templates()
+ assert len(templates) == 2
+ assert templates["E001"] == "User <*> logged in from <*>"
+ assert templates["E002"] == "Error in module <*>: <*>"
+
+ def test_variable_blacklist(self):
+ """Test variable blacklisting functionality."""
+ persistency = EventPersistency(
+ event_data_class=EventDataFrame,
+ variable_blacklist=[1], # Blacklist index 1 (second variable)
+ )
+ persistency.ingest_event(**SAMPLE_EVENT_1)
+
+ data = persistency.get_event_data("E001")
+ assert "var_0" in data.columns # First variable should be present
+ assert "var_1" not in data.columns # Second variable should be blocked
+
+ def test_get_all_variables_static_method(self):
+ """Test the get_all_variables static method."""
+ variables = ["value1", "value2", "value3"]
+ log_format_variables = {"timestamp": "2024-01-01", "level": "INFO"}
+ blacklist = [1] # Blacklist index 1
+
+ combined = EventPersistency.get_all_variables(
+ variables, log_format_variables, blacklist
+ )
+
+ assert "timestamp" in combined
+ assert "level" in combined
+ assert "var_0" in combined # First variable
+ assert "var_1" not in combined # Blacklisted
+ assert "var_2" in combined # Third variable
+
+ def test_dict_like_access(self):
+ """Test dictionary-like access via __getitem__."""
+ persistency = EventPersistency(event_data_class=EventDataFrame)
+ persistency.ingest_event(**SAMPLE_EVENT_1)
+
+ data_structure = persistency["E001"]
+ assert data_structure is not None
+ assert isinstance(data_structure, EventDataFrame)
+
+
+class TestEventDataFrame:
+ """Test suite for EventDataFrame (Pandas backend)."""
+
+ def test_initialization(self):
+ """Test EventDataFrame initialization."""
+ edf = EventDataFrame()
+ assert edf is not None
+ assert len(edf.data) == 0 # Empty DataFrame
+
+ def test_add_single_data(self):
+ """Test adding single data entry."""
+ edf = EventDataFrame()
+ data_dict = {"user": "alice", "ip": "192.168.1.1"}
+ data_df = EventDataFrame.to_data(data_dict)
+ edf.add_data(data_df)
+
+ assert edf.data is not None
+ assert len(edf.data) == 1
+ assert "user" in edf.data.columns
+
+ def test_add_multiple_data(self):
+ """Test adding multiple data entries."""
+ edf = EventDataFrame()
+ edf.add_data(EventDataFrame.to_data({"user": "alice", "ip": "192.168.1.1"}))
+ edf.add_data(EventDataFrame.to_data({"user": "bob", "ip": "192.168.1.2"}))
+
+ assert len(edf.data) == 2
+ assert edf.data["user"].tolist() == ["alice", "bob"]
+
+ def test_get_data(self):
+ """Test retrieving data."""
+ edf = EventDataFrame()
+ edf.add_data(EventDataFrame.to_data({"user": "alice", "ip": "192.168.1.1"}))
+
+ data = edf.get_data()
+ assert isinstance(data, pd.DataFrame)
+ assert len(data) == 1
+
+ def test_get_variable_names(self):
+ """Test retrieving variable names."""
+ edf = EventDataFrame()
+ edf.add_data(EventDataFrame.to_data({"user": "alice", "ip": "192.168.1.1", "port": "22"}))
+
+ var_names = edf.get_variables()
+ assert "user" in var_names
+ assert "ip" in var_names
+ assert "port" in var_names
+
+
+class TestChunkedEventDataFrame:
+ """Test suite for ChunkedEventDataFrame (Polars backend)."""
+
+ def test_initialization_default(self):
+ """Test ChunkedEventDataFrame initialization with defaults."""
+ cedf = ChunkedEventDataFrame()
+ assert cedf is not None
+ assert cedf.max_rows == 10_000_000
+ assert cedf.compact_every == 1000
+
+ def test_initialization_custom_params(self):
+ """Test initialization with custom parameters."""
+ cedf = ChunkedEventDataFrame(max_rows=500, compact_every=100)
+ assert cedf.max_rows == 500
+ assert cedf.compact_every == 100
+
+ def test_add_single_data(self):
+ """Test adding single data entry."""
+ cedf = ChunkedEventDataFrame(max_rows=10)
+ data_dict = {"user": ["alice"], "ip": ["192.168.1.1"]}
+ data_df = ChunkedEventDataFrame.to_data(data_dict)
+ cedf.add_data(data_df)
+
+ data = cedf.get_data()
+ assert data is not None
+ assert len(data) == 1
+
+ def test_add_data_triggers_compaction(self):
+ """Test that adding data beyond compact_every triggers compaction."""
+ cedf = ChunkedEventDataFrame(max_rows=10000, compact_every=5)
+
+ # Add 6 entries (should trigger compaction at 5)
+ for i in range(6):
+ cedf.add_data(ChunkedEventDataFrame.to_data({"user": [f"user{i}"], "value": [i]}))
+
+ # After compaction, should have 1 chunk
+ data = cedf.get_data()
+ assert len(data) == 6
+
+ def test_chunked_storage(self):
+ """Test that data is stored in chunks."""
+ cedf = ChunkedEventDataFrame(max_rows=5, compact_every=1000)
+
+ # Add more than max_rows
+ for i in range(8):
+ cedf.add_data(ChunkedEventDataFrame.to_data({"user": [f"user{i}"], "value": [i]}))
+
+ # Should have evicted oldest to stay within max_rows
+ data = cedf.get_data()
+ assert data is not None
+ assert len(data) <= 5
+
+ def test_get_variable_names(self):
+ """Test retrieving variable names from chunks."""
+ cedf = ChunkedEventDataFrame()
+ cedf.add_data(
+ ChunkedEventDataFrame.to_data({"user": ["alice"], "ip": ["192.168.1.1"], "port": ["22"]})
+ )
+
+ var_names = cedf.get_variables()
+ assert "user" in var_names
+ assert "ip" in var_names
+ assert "port" in var_names
+
+ def test_dict_to_dataframe_conversion(self):
+ """Test static method to_data."""
+ data_dict = {"user": ["alice"], "ip": ["192.168.1.1"]}
+ df = ChunkedEventDataFrame.to_data(data_dict)
+
+ assert isinstance(df, pl.DataFrame)
+ assert len(df) == 1
+ assert "user" in df.columns
+
+
+class TestEventPersistencyIntegration:
+ """Integration tests for EventPersistency with different backends."""
+
+ def test_pandas_backend_full_workflow(self):
+ """Test complete workflow with Pandas backend."""
+ persistency = EventPersistency(event_data_class=EventDataFrame)
+
+ # Ingest multiple events
+ for i in range(10):
+ persistency.ingest_event(
+ event_id=f"E{i % 3}",
+ event_template=f"Template {i % 3}",
+ variables=[str(i), str(i * 10)],
+ log_format_variables={},
+ )
+
+ # Verify all events stored
+ all_data = persistency.get_events_data()
+ assert len(all_data) == 3 # 3 unique event IDs
+
+ # Verify correct grouping
+ assert len(all_data["E0"].get_data()) == 4 # 0, 3, 6, 9
+ assert len(all_data["E1"].get_data()) == 3 # 1, 4, 7
+ assert len(all_data["E2"].get_data()) == 3 # 2, 5, 8
+
+ def test_polars_backend_full_workflow(self):
+ """Test complete workflow with Polars backend."""
+ persistency = EventPersistency(
+ event_data_class=ChunkedEventDataFrame,
+ event_data_kwargs={"max_rows": 5, "compact_every": 10},
+ )
+
+ # Ingest events
+ for i in range(10):
+ persistency.ingest_event(
+ event_id="E001",
+ event_template="Test template",
+ variables=[str(i)],
+ log_format_variables={},
+ )
+
+ # Verify data retrieval works
+ data = persistency.get_event_data("E001")
+ assert data is not None
+ assert len(data) <= 5 # Should be trimmed to max_rows
+
+ def test_tracker_backend_full_workflow(self):
+ """Test complete workflow with Tracker backend."""
+ persistency = EventPersistency(
+ event_data_class=EventVariableTracker,
+ event_data_kwargs={"tracker_type": StabilityTracker},
+ )
+
+ # Ingest events with patterns
+ for i in range(20):
+ persistency.ingest_event(
+ event_id="E001",
+ event_template="Test template",
+ variables=["constant", str(i)],
+ log_format_variables={},
+ )
+
+ # Verify tracker functionality
+ data_structure = persistency.events_data["E001"]
+ assert isinstance(data_structure, EventVariableTracker)
+
+ def test_mixed_event_ids_and_templates(self):
+ """Test handling mixed event IDs and templates."""
+ persistency = EventPersistency(event_data_class=EventDataFrame)
+
+ events = [
+ ("E001", "Login from <*>", ["192.168.1.1"]),
+ ("E002", "Error: <*>", ["timeout"]),
+ ("E001", "Login from <*>", ["192.168.1.2"]),
+ ("E003", "Logout <*>", ["alice"]),
+ ("E002", "Error: <*>", ["connection refused"]),
+ ]
+
+ for event_id, template, variables in events:
+ persistency.ingest_event(
+ event_id=event_id,
+ event_template=template,
+ variables=variables,
+ log_format_variables={},
+ )
+
+ # Verify correct storage
+ all_data = persistency.get_events_data()
+ assert len(all_data) == 3
+ assert len(all_data["E001"].get_data()) == 2
+ assert len(all_data["E002"].get_data()) == 2
+ assert len(all_data["E003"].get_data()) == 1
+
+ # Verify templates
+ templates = persistency.get_event_templates()
+ assert templates["E001"] == "Login from <*>"
+ assert templates["E002"] == "Error: <*>"
+ assert templates["E003"] == "Logout <*>"
+
+ def test_large_scale_ingestion(self):
+ """Test ingesting a large number of events."""
+ persistency = EventPersistency(event_data_class=EventDataFrame)
+
+ num_events = 1000
+ for i in range(num_events):
+ persistency.ingest_event(
+ event_id=f"E{i % 10}",
+ event_template=f"Template {i % 10}",
+ variables=[str(i), str(i * 2)],
+ log_format_variables={"timestamp": f"2024-01-01 10:{i % 60}:00"},
+ )
+
+ # Verify all data stored
+ all_data = persistency.get_events_data()
+ assert len(all_data) == 10
+
+ # Verify counts
+ total_rows = sum(len(data_structure.get_data()) for data_structure in all_data.values())
+ assert total_rows == num_events
+
+ def test_variable_blacklist_across_backends(self):
+ """Test variable blacklist works with different backends."""
+ # Blacklist log format variables by name and event variables by index
+ log_blacklist = ["timestamp"]
+ event_blacklist = [1] # Second event variable
+ blacklist = log_blacklist + event_blacklist
+
+ # Test with Pandas
+ p1 = EventPersistency(
+ event_data_class=EventDataFrame,
+ variable_blacklist=blacklist,
+ )
+ p1.ingest_event(
+ event_id="E001",
+ event_template="Test",
+ variables=["alice", "1234"],
+ log_format_variables={"timestamp": "2024-01-01"},
+ )
+ data1 = p1.get_event_data("E001")
+ assert "var_0" in data1.columns # First variable
+ assert "var_1" not in data1.columns # Blacklisted
+ assert "timestamp" not in data1.columns # Blacklisted
+
+ # Test with Polars
+ p2 = EventPersistency(
+ event_data_class=ChunkedEventDataFrame,
+ variable_blacklist=blacklist,
+ )
+ p2.ingest_event(
+ event_id="E001",
+ event_template="Test",
+ variables=["bob", "5678"],
+ log_format_variables={"timestamp": "2024-01-02"},
+ )
+ data2 = p2.get_event_data("E001")
+ assert "var_0" in data2.columns # First variable
+ assert "var_1" not in data2.columns # Blacklisted
+ assert "timestamp" not in data2.columns # Blacklisted
diff --git a/tests/test_common/test_stability_tracking.py b/tests/test_common/test_stability_tracking.py
new file mode 100644
index 0000000..cb06c07
--- /dev/null
+++ b/tests/test_common/test_stability_tracking.py
@@ -0,0 +1,506 @@
+"""Tests for the stability tracking module.
+
+This module tests StabilityClassifier, StabilityTracker,
+VariableTrackers, and EventVariableTracker for variable convergence and
+stability analysis.
+"""
+
+from detectmatelibrary.common.persistency.event_data_structures.trackers import (
+ StabilityClassifier,
+ StabilityTracker,
+ VariableTrackers,
+ EventVariableTracker,
+ Classification,
+)
+from detectmatelibrary.utils.RLE_list import RLEList
+
+
+class TestStabilityClassifier:
+ """Test suite for StabilityClassifier."""
+
+ def test_initialization_default(self):
+ """Test StabilityClassifier initialization with defaults."""
+ classifier = StabilityClassifier(segment_thresholds=[1.1, 0.5, 0.2, 0.1])
+ assert classifier.segment_threshs == [1.1, 0.5, 0.2, 0.1]
+ assert classifier is not None
+
+ def test_initialization_custom_threshold(self):
+ """Test initialization with custom segment thresholds."""
+ classifier = StabilityClassifier(segment_thresholds=[0.8, 0.4, 0.2, 0.05])
+ assert classifier.segment_threshs == [0.8, 0.4, 0.2, 0.05]
+
+ def test_is_stable_with_rle_list_stable_pattern(self):
+ """Test stability detection with RLEList - stable pattern."""
+ classifier = StabilityClassifier(segment_thresholds=[1.1, 0.3, 0.1, 0.01])
+
+ # Create RLEList: 10 True, 5 False, 15 True (stabilizing to True)
+ rle = RLEList()
+ for _ in range(10):
+ rle.append(True)
+ for _ in range(5):
+ rle.append(False)
+ for _ in range(15):
+ rle.append(True)
+
+ result = classifier.is_stable(rle)
+ assert isinstance(result, bool)
+
+ def test_is_stable_with_rle_list_unstable_pattern(self):
+ """Test stability detection with RLEList - unstable pattern."""
+ classifier = StabilityClassifier(segment_thresholds=[1.1, 0.3, 0.1, 0.01])
+
+ # Create alternating pattern
+ rle = RLEList()
+ for _ in range(20):
+ rle.append(True)
+ rle.append(False)
+
+ result = classifier.is_stable(rle)
+ assert isinstance(result, bool)
+
+ def test_is_stable_with_regular_list(self):
+ """Test stability detection with regular list."""
+ classifier = StabilityClassifier(segment_thresholds=[1.1, 0.3, 0.1, 0.01])
+
+ # Stable pattern
+ stable_list = [1] * 10 + [0] * 5 + [1] * 15
+ result = classifier.is_stable(stable_list)
+ assert isinstance(result, bool)
+
+ def test_different_segment_thresholds(self):
+ """Test behavior with different segment thresholds."""
+ series = [1] * 10 + [0] * 3 + [1] * 15
+
+ strict = StabilityClassifier(segment_thresholds=[0.5, 0.2, 0.05, 0.01])
+ lenient = StabilityClassifier(segment_thresholds=[2.0, 1.0, 0.5, 0.3])
+
+ result_strict = strict.is_stable(series)
+ result_lenient = lenient.is_stable(series)
+
+ assert isinstance(result_strict, bool)
+ assert isinstance(result_lenient, bool)
+
+
+class TestStabilityTracker:
+ """Test suite for StabilityTracker."""
+
+ def test_initialization_default(self):
+ """Test StabilityTracker initialization with defaults."""
+ tracker = StabilityTracker()
+ assert tracker.min_samples == 3
+ assert isinstance(tracker.change_series, RLEList)
+ assert isinstance(tracker.unique_set, set)
+
+ def test_initialization_custom_params(self):
+ """Test initialization with custom parameters."""
+ tracker = StabilityTracker(min_samples=20)
+ assert tracker.min_samples == 20
+
+ def test_add_value_single(self):
+ """Test adding a single value."""
+ tracker = StabilityTracker()
+ tracker.add_value("value1")
+
+ assert len(tracker.unique_set) == 1
+ assert len(tracker.change_series) == 1
+
+ def test_add_value_multiple_same(self):
+ """Test adding multiple same values."""
+ tracker = StabilityTracker()
+
+ for i in range(10):
+ tracker.add_value("constant")
+
+ assert len(tracker.unique_set) == 1
+ assert "constant" in tracker.unique_set
+
+ def test_add_value_multiple_different(self):
+ """Test adding multiple different values."""
+ tracker = StabilityTracker()
+
+ for i in range(10):
+ tracker.add_value(f"value_{i}")
+
+ assert len(tracker.unique_set) == 10
+
+ def test_classification_insufficient_data(self):
+ """Test classification with insufficient data."""
+ tracker = StabilityTracker(min_samples=30)
+
+ for i in range(10): # Less than min_samples
+ tracker.add_value(f"value_{i}")
+
+ result = tracker.classify()
+ assert result.type == "INSUFFICIENT_DATA"
+
+ def test_classification_static(self):
+ """Test classification as STATIC (single unique value)."""
+ tracker = StabilityTracker(min_samples=10)
+
+ for i in range(40):
+ tracker.add_value("constant")
+
+ result = tracker.classify()
+ assert result.type == "STATIC"
+
+ def test_classification_random(self):
+ """Test classification as RANDOM (all unique values)."""
+ tracker = StabilityTracker(min_samples=10)
+
+ for i in range(40):
+ tracker.add_value(f"unique_{i}")
+
+ result = tracker.classify()
+ assert result.type == "RANDOM"
+
+ def test_classification_stable(self):
+ """Test classification as STABLE (converging pattern)."""
+ tracker = StabilityTracker(min_samples=10)
+
+ # Pattern: changing values that stabilize
+ for i in range(15):
+ tracker.add_value(f"value_{i % 5}")
+ for i in range(25):
+ tracker.add_value("stable_value")
+
+ result = tracker.classify()
+ assert result.type in ["STABLE", "STATIC"]
+
+ def test_classification_unstable(self):
+ """Test classification as UNSTABLE (no clear pattern)."""
+ tracker = StabilityTracker(min_samples=10)
+
+ # Alternating pattern
+ for i in range(40):
+ tracker.add_value("value_a" if i % 2 == 0 else "value_b")
+
+ result = tracker.classify()
+ # Could be UNSTABLE or classified differently depending on classifier logic
+ assert result.type in ["UNSTABLE", "STABLE", "RANDOM"]
+
+ def test_rle_list_integration(self):
+ """Test that RLEList is used for efficient storage."""
+ tracker = StabilityTracker()
+
+ # Add values that create runs
+ for i in range(20):
+ tracker.add_value("a")
+ for i in range(20):
+ tracker.add_value("b")
+
+ # RLEList should be populated
+ assert len(tracker.change_series) > 0
+
+ def test_unique_values_tracking(self):
+ """Test unique values are tracked in a set."""
+ tracker = StabilityTracker()
+
+ values = ["a", "b", "c", "a", "b", "a"]
+ for v in values:
+ tracker.add_value(v)
+
+ assert len(tracker.unique_set) == 3
+ assert "a" in tracker.unique_set
+ assert "b" in tracker.unique_set
+ assert "c" in tracker.unique_set
+
+ def test_change_detection(self):
+ """Test that add_value correctly detects changes."""
+ tracker = StabilityTracker()
+
+ tracker.add_value("a")
+ tracker.add_value("a")
+ tracker.add_value("b")
+
+ # First value creates a change (new unique value)
+ # Second doesn't (same value)
+ # Third does (new unique value)
+ assert len(tracker.change_series) == 3
+
+
+class TestVariableTrackers:
+ """Test suite for VariableTrackers manager."""
+
+ def test_initialization_default(self):
+ """Test VariableTrackers initialization."""
+ trackers = VariableTrackers(tracker_type=StabilityTracker)
+ assert trackers is not None
+ assert trackers.tracker_type == StabilityTracker
+
+ def test_initialization_with_kwargs(self):
+ """Test initialization without kwargs - VariableTrackers doesn't store tracker kwargs."""
+ trackers = VariableTrackers(tracker_type=StabilityTracker)
+ assert trackers.tracker_type == StabilityTracker
+
+ def test_add_data_single_variable(self):
+ """Test adding data for a single variable."""
+ trackers = VariableTrackers(tracker_type=StabilityTracker)
+ data = {"var1": "value1"}
+ trackers.add_data(data)
+
+ all_trackers = trackers.get_trackers()
+ assert "var1" in all_trackers
+ assert isinstance(all_trackers["var1"], StabilityTracker)
+
+ def test_add_data_multiple_variables(self):
+ """Test adding data for multiple variables."""
+ trackers = VariableTrackers(tracker_type=StabilityTracker)
+ data = {"var1": "value1", "var2": "value2", "var3": "value3"}
+ trackers.add_data(data)
+
+ all_trackers = trackers.get_trackers()
+ assert len(all_trackers) == 3
+ assert "var1" in all_trackers
+ assert "var2" in all_trackers
+ assert "var3" in all_trackers
+
+ def test_add_data_multiple_times(self):
+ """Test adding data multiple times."""
+ trackers = VariableTrackers(tracker_type=StabilityTracker)
+
+ trackers.add_data({"var1": "a", "var2": "x"})
+ trackers.add_data({"var1": "b", "var2": "y"})
+ trackers.add_data({"var1": "a", "var3": "z"})
+
+ all_trackers = trackers.get_trackers()
+ assert len(all_trackers) == 3
+ assert "var3" in all_trackers # New variable dynamically added
+
+ def test_classify_all_variables(self):
+ """Test classifying all variables."""
+ trackers = VariableTrackers(tracker_type=StabilityTracker)
+
+ # Add enough data for classification
+ for i in range(10):
+ trackers.add_data({"var1": "constant", "var2": f"changing_{i}"})
+
+ classifications = trackers.classify()
+ assert "var1" in classifications
+ assert "var2" in classifications
+ assert isinstance(classifications["var1"], Classification)
+
+ def test_get_stable_variables(self):
+ """Test retrieving stable variables."""
+ trackers = VariableTrackers(tracker_type=StabilityTracker)
+
+ # Create stable pattern
+ for i in range(40):
+ trackers.add_data({
+ "stable_var": "constant",
+ "random_var": f"unique_{i}",
+ })
+
+ stable_vars = trackers.get_variables_by_classification("STABLE")
+ assert isinstance(stable_vars, list)
+ # "stable_var" should be classified as STATIC
+
+ def test_get_trackers(self):
+ """Test retrieving all trackers."""
+ trackers = VariableTrackers(tracker_type=StabilityTracker)
+ trackers.add_data({"var1": "a", "var2": "b"})
+
+ all_trackers = trackers.get_trackers()
+ assert isinstance(all_trackers, dict)
+ assert len(all_trackers) == 2
+
+ def test_dynamic_tracker_creation(self):
+ """Test that trackers are created dynamically."""
+ trackers = VariableTrackers(tracker_type=StabilityTracker)
+
+ # First add
+ trackers.add_data({"var1": "a"})
+ assert len(trackers.get_trackers()) == 1
+
+ # Second add with new variable
+ trackers.add_data({"var1": "b", "var2": "x"})
+ assert len(trackers.get_trackers()) == 2
+
+ # Third add with another new variable
+ trackers.add_data({"var3": "z"})
+ assert len(trackers.get_trackers()) == 3
+
+
+class TestEventVariableTracker:
+ """Test suite for EventVariableTracker."""
+
+ def test_initialization(self):
+ """Test EventVariableTracker initialization."""
+ evt = EventVariableTracker(tracker_type=StabilityTracker)
+ assert evt is not None
+ assert isinstance(evt.variable_trackers, VariableTrackers)
+
+ def test_add_data(self):
+ """Test adding data."""
+ evt = EventVariableTracker(tracker_type=StabilityTracker)
+ data = {"var1": "value1", "var2": "value2"}
+ evt.add_data(data)
+
+ # Should have created trackers
+ trackers = evt.get_data()
+ assert len(trackers) == 2
+
+ def test_get_variables(self):
+ """Test retrieving variable names."""
+ evt = EventVariableTracker(tracker_type=StabilityTracker)
+ evt.add_data({"var1": "a", "var2": "b", "var3": "c"})
+
+ var_names = evt.get_variables()
+ assert "var1" in var_names
+ assert "var2" in var_names
+ assert "var3" in var_names
+ assert len(var_names) == 3
+
+ def test_get_stable_variables(self):
+ """Test retrieving stable variables."""
+ evt = EventVariableTracker(tracker_type=StabilityTracker)
+
+ for i in range(40):
+ evt.add_data({
+ "stable_var": "constant",
+ "random_var": f"unique_{i}",
+ })
+
+ stable_vars = evt.get_variables_by_classification("STABLE")
+ assert isinstance(stable_vars, list)
+
+ def test_integration_with_stability_tracker(self):
+ """Test full integration with StabilityTracker."""
+ evt = EventVariableTracker(tracker_type=StabilityTracker)
+
+ # Simulate log processing
+ for i in range(50):
+ evt.add_data({
+ "user": f"user_{i % 5}",
+ "status": "success" if i > 30 else f"status_{i}",
+ "request_id": f"req_{i}",
+ })
+
+ # Get data and variables
+ var_names = evt.get_variables()
+ stable_vars = evt.get_variables_by_classification("STABLE")
+
+ assert len(var_names) == 3
+ assert isinstance(stable_vars, list)
+
+
+class TestClassification:
+ """Test suite for Classification dataclass."""
+
+ def test_initialization(self):
+ """Test Classification initialization."""
+ result = Classification(type="STABLE", reason="Test reason")
+ assert result.type == "STABLE"
+ assert result.reason == "Test reason"
+
+ def test_all_classification_types(self):
+ """Test Classification with all classification types."""
+ types = ["INSUFFICIENT_DATA", "STATIC", "RANDOM", "STABLE", "UNSTABLE"]
+
+ for cls_type in types:
+ result = Classification(type=cls_type, reason=f"Reason for {cls_type}")
+ assert result.type == cls_type
+ assert isinstance(result.reason, str)
+
+
+class TestStabilityTrackingIntegration:
+ """Integration tests for stability tracking components."""
+
+ def test_full_workflow_static_variable(self):
+ """Test full workflow with static variable."""
+ tracker = StabilityTracker(min_samples=10)
+
+ # Add 50 identical values
+ for i in range(50):
+ tracker.add_value("constant_value")
+
+ classification = tracker.classify()
+ assert classification.type == "STATIC"
+
+ def test_full_workflow_random_variable(self):
+ """Test full workflow with random variable."""
+ tracker = StabilityTracker(min_samples=10)
+
+ # Add 50 unique values
+ for i in range(50):
+ tracker.add_value(f"unique_value_{i}")
+
+ classification = tracker.classify()
+ assert classification.type == "RANDOM"
+
+ def test_full_workflow_stabilizing_variable(self):
+ """Test full workflow with stabilizing variable."""
+ tracker = StabilityTracker(min_samples=10)
+
+ # Start with varied values, then stabilize
+ for i in range(15):
+ tracker.add_value(f"value_{i % 7}")
+ for i in range(35):
+ tracker.add_value("final_stable_value")
+
+ classification = tracker.classify()
+ # Should be STABLE or STATIC
+ assert classification.type in ["STABLE", "STATIC"]
+
+ def test_multiple_variables_with_different_patterns(self):
+ """Test tracking multiple variables with different patterns."""
+ trackers = VariableTrackers(tracker_type=StabilityTracker)
+
+ # Simulate 100 events
+ for i in range(100):
+ trackers.add_data({
+ "static_var": "always_same",
+ "random_var": f"unique_{i}",
+ "user_id": f"user_{i % 10}", # 10 users cycling
+ "status": "ok" if i > 80 else f"status_{i % 5}", # Stabilizing
+ })
+
+ classifications = trackers.classify()
+
+ # static_var should be STATIC
+ assert classifications["static_var"].type == "STATIC"
+
+ # random_var should be RANDOM
+ assert classifications["random_var"].type == "RANDOM"
+
+ # user_id and status depend on classifier logic
+ assert isinstance(classifications["user_id"], Classification)
+ assert isinstance(classifications["status"], Classification)
+
+ def test_event_variable_tracker_real_world_scenario(self):
+ """Test EventVariableTracker with realistic log data."""
+ evt = EventVariableTracker(tracker_type=StabilityTracker)
+
+ # Simulate web server logs
+ for i in range(200):
+ evt.add_data({
+ "method": "GET" if i % 10 != 0 else "POST", # Mostly GET
+ "status_code": "200" if i > 150 else str(200 + (i % 5)), # Stabilizing to 200
+ "user_agent": f"Browser_{i % 3}", # 3 different browsers
+ "request_id": f"req_{i}", # Unique per request
+ "server": "prod-server-1", # Static
+ })
+
+ var_names = evt.get_variables()
+ stable_vars = evt.get_variables_by_classification("STABLE")
+
+ assert len(var_names) == 5
+ assert isinstance(stable_vars, list)
+
+ # Server should definitely be stable
+ # Request_id should be random
+ # Others depend on exact classifier logic
+
+ def test_classifier_with_varying_thresholds(self):
+ """Test stability classifier with different thresholds."""
+ # Create a borderline case
+ pattern = [1] * 15 + [0] * 5 + [1] * 20
+
+ strict = StabilityClassifier(segment_thresholds=[0.5, 0.2, 0.08, 0.02])
+ lenient = StabilityClassifier(segment_thresholds=[2.0, 1.5, 1.0, 0.5])
+
+ result_strict = strict.is_stable(pattern)
+ result_lenient = lenient.is_stable(pattern)
+
+ # Both should produce boolean results
+ assert isinstance(result_strict, bool)
+ assert isinstance(result_lenient, bool)
diff --git a/tests/test_detectors/test_new_value_combo_detector.py b/tests/test_detectors/test_new_value_combo_detector.py
index 7cc4f34..9bf8183 100644
--- a/tests/test_detectors/test_new_value_combo_detector.py
+++ b/tests/test_detectors/test_new_value_combo_detector.py
@@ -1,12 +1,11 @@
from detectmatelibrary.detectors.new_value_combo_detector import (
- NewValueComboDetector, ComboTooBigError, BufferMode
+ NewValueComboDetector, BufferMode, generate_detector_config
)
import detectmatelibrary.schemas as schemas
from detectmatelibrary.utils.aux import time_test_mode
-import pytest
# Set time test mode for consistent timestamps
time_test_mode()
@@ -20,7 +19,7 @@
"params": {
"comb_size": 4,
"log_variables": [{
- "id": "instanace1",
+ "id": "instance1",
"event": 1,
"template": "adsdas",
"variables": [{
@@ -166,27 +165,6 @@ def test_train_multiple_values(self):
})}
assert combos == detector.known_combos
- def test_train_too_big(self):
- parser_data = schemas.ParserSchema({
- "parserType": "test",
- "EventID": 1,
- "template": "test template",
- "variables": ["0", "assa"],
- "logID": 1,
- "parsedLogID": 1,
- "parserID": "test_parser",
- "log": "test log message",
- "logFormatVariables": {"level": "INFO"}
- })
-
- with pytest.raises(ComboTooBigError):
- detector = NewValueComboDetector(config=config, name="AllDetectorTooBig")
- detector.train(parser_data)
-
- with pytest.raises(ComboTooBigError):
- detector = NewValueComboDetector(config=config, name="MultipleDetectorTooBig")
- detector.train(parser_data)
-
class TestNewValueComboDetectorDetection:
"""Test NewValueDetector detection functionality."""
@@ -388,3 +366,196 @@ def test_detect_known_value_alert(self):
assert result
assert output.score == 1.0
+
+
+class TestNewValueComboDetectorConfiguration:
+ """Test NewValueComboDetector configuration functionality."""
+
+ def test_generate_detector_config_basic(self):
+ """Test basic config generation with single event."""
+ variable_selection = {
+ 1: ["var_0", "var_1"]
+ }
+ templates = {1: "Test template"}
+
+ config_dict = generate_detector_config(
+ variable_selection=variable_selection,
+ templates=templates,
+ detector_name="TestDetector",
+ method_type="new_value_combo_detector",
+ comb_size=2
+ )
+
+ assert "detectors" in config_dict
+ assert "TestDetector" in config_dict["detectors"]
+ detector_config = config_dict["detectors"]["TestDetector"]
+ assert detector_config["method_type"] == "new_value_combo_detector"
+ assert detector_config["params"]["comb_size"] == 2
+ assert len(detector_config["params"]["log_variables"]) == 1
+
+ def test_generate_detector_config_multiple_events(self):
+ """Test config generation with multiple events."""
+ variable_selection = {
+ 1: ["var_0", "var_1"],
+ 2: ["var_0", "var_2", "var_3"],
+ 3: ["level"]
+ }
+ templates = {1: "Template 1", 2: "Template 2", 3: "Template 3"}
+
+ config_dict = generate_detector_config(
+ variable_selection=variable_selection,
+ templates=templates,
+ detector_name="MultiEventDetector",
+ method_type="new_value_combo_detector"
+ )
+
+ assert len(config_dict["detectors"]["MultiEventDetector"]["params"]["log_variables"]) == 3
+
+ def test_configure_method_ingests_events(self):
+ """Test that configure method properly ingests events."""
+ detector = NewValueComboDetector()
+
+ # Configure with sample events
+ for event_id in [1, 2, 3]:
+ parser_data = schemas.ParserSchema({
+ "parserType": "test",
+ "EventID": event_id,
+ "template": f"Template {event_id}",
+ "variables": ["val1", "val2", "val3"],
+ "logID": event_id,
+ "parsedLogID": event_id,
+ "parserID": "test_parser",
+ "log": "test log",
+ "logFormatVariables": {"level": "INFO"}
+ })
+ detector.configure(parser_data)
+
+ # Verify events were ingested
+ events_data = detector.persistency.get_events_data()
+ assert len(events_data) == 3
+ assert 1 in events_data
+ assert 2 in events_data
+ assert 3 in events_data
+
+ def test_set_configuration_updates_config(self):
+ """Test that set_configuration properly updates detector config."""
+ detector = NewValueComboDetector()
+
+ # Configure with events that have varying stability
+ for i in range(10):
+ parser_data = schemas.ParserSchema({
+ "parserType": "test",
+ "EventID": 1,
+ "template": "Template 1",
+ "variables": ["constant", f"varying_{i}", "another_constant"],
+ "logID": i,
+ "parsedLogID": i,
+ "parserID": "test_parser",
+ "log": "test log",
+ "logFormatVariables": {"level": "INFO"}
+ })
+ detector.configure(parser_data)
+
+ # Set configuration
+ detector.set_configuration(max_combo_size=2)
+
+ # Verify config was updated
+ assert detector.config.log_variables is not None
+ assert detector.config.comb_size == 2
+
+ def test_configuration_workflow(self):
+ """Test complete configuration workflow like in notebook."""
+ detector = NewValueComboDetector()
+
+ # Step 1: Configure phase - ingest events
+ training_data = []
+ for i in range(20):
+ for event_id in [1, 2]:
+ parser_data = schemas.ParserSchema({
+ "parserType": "test",
+ "EventID": event_id,
+ "template": f"Template {event_id}",
+ "variables": ["stable_val", f"var_{i % 3}", "another_stable"],
+ "logID": len(training_data),
+ "parsedLogID": len(training_data),
+ "parserID": "test_parser",
+ "log": "test log",
+ "logFormatVariables": {"level": "INFO"}
+ })
+ training_data.append(parser_data)
+ detector.configure(parser_data)
+
+ # Step 2: Set configuration based on stable variables
+ detector.set_configuration(max_combo_size=3)
+
+ # Step 3: Train detector with configuration
+ for data in training_data:
+ detector.train(data)
+
+ # Step 4: Verify detector can detect anomalies
+ test_data = schemas.ParserSchema({
+ "parserType": "test",
+ "EventID": 1,
+ "template": "Template 1",
+ "variables": ["stable_val", "new_value", "another_stable"],
+ "logID": 999,
+ "parsedLogID": 999,
+ "parserID": "test_parser",
+ "log": "test log",
+ "logFormatVariables": {"level": "INFO"}
+ })
+ output = schemas.DetectorSchema()
+
+ # Should detect anomaly due to new combination
+ result = detector.detect(test_data, output)
+ assert isinstance(result, bool)
+
+ def test_set_configuration_with_combo_size(self):
+ """Test set_configuration respects max_combo_size parameter."""
+ detector = NewValueComboDetector()
+
+ # Configure with multiple variable events
+ for i in range(15):
+ parser_data = schemas.ParserSchema({
+ "parserType": "test",
+ "EventID": 1,
+ "template": "Multi-var template",
+ "variables": ["v1", "v2", "v3", "v4", "v5"],
+ "logID": i,
+ "parsedLogID": i,
+ "parserID": "test_parser",
+ "log": "test log",
+ "logFormatVariables": {}
+ })
+ detector.configure(parser_data)
+
+ # Set configuration with specific combo size
+ detector.set_configuration(max_combo_size=4)
+
+ # Verify comb_size was updated
+ assert detector.config.comb_size == 4
+
+ def test_configuration_with_no_stable_variables(self):
+ """Test configuration when no stable variables are found."""
+ detector = NewValueComboDetector()
+
+ # Configure with highly variable data
+ for i in range(10):
+ parser_data = schemas.ParserSchema({
+ "parserType": "test",
+ "EventID": 1,
+ "template": "Variable template",
+ "variables": [f"val_{i}_0", f"val_{i}_1"],
+ "logID": i,
+ "parsedLogID": i,
+ "parserID": "test_parser",
+ "log": "test log",
+ "logFormatVariables": {}
+ })
+ detector.configure(parser_data)
+
+ # Set configuration
+ detector.set_configuration()
+
+ # Should handle gracefully without stable variables
+ assert detector.config is not None
diff --git a/tests/test_parsers/test_json_parser.py b/tests/test_parsers/test_json_parser.py
index 74b22b3..8bab108 100644
--- a/tests/test_parsers/test_json_parser.py
+++ b/tests/test_parsers/test_json_parser.py
@@ -134,6 +134,7 @@ def test_parse_with_content_parser(self):
"method_type": "json_parser",
"timestamp_name": "time",
"content_name": "message",
+ "content_parser": "JsonMatcherParser"
},
"JsonMatcherParser": {
"auto_config": True,
diff --git a/uv.lock b/uv.lock
index caf0ecc..c9a83ad 100644
--- a/uv.lock
+++ b/uv.lock
@@ -110,7 +110,9 @@ source = { editable = "." }
dependencies = [
{ name = "drain3" },
{ name = "kafka-python" },
+ { name = "numpy" },
{ name = "pandas" },
+ { name = "polars" },
{ name = "protobuf" },
{ name = "pydantic" },
{ name = "pyyaml" },
@@ -128,7 +130,9 @@ dev = [
requires-dist = [
{ name = "drain3", specifier = ">=0.9.11" },
{ name = "kafka-python", specifier = ">=2.3.0" },
+ { name = "numpy", specifier = ">=2.3.2" },
{ name = "pandas", specifier = ">=2.3.2" },
+ { name = "polars", specifier = ">=1.36.1" },
{ name = "prek", marker = "extra == 'dev'", specifier = ">=0.2.8" },
{ name = "protobuf", specifier = ">=6.32.1" },
{ name = "pydantic", specifier = ">=2.11.7" },
@@ -291,6 +295,32 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/54/20/4d324d65cc6d9205fabedc306948156824eb9f0ee1633355a8f7ec5c66bf/pluggy-1.6.0-py3-none-any.whl", hash = "sha256:e920276dd6813095e9377c0bc5566d94c932c33b27a3e3945d8389c374dd4746", size = 20538, upload-time = "2025-05-15T12:30:06.134Z" },
]
+[[package]]
+name = "polars"
+version = "1.36.1"
+source = { registry = "https://pypi.org/simple" }
+dependencies = [
+ { name = "polars-runtime-32" },
+]
+sdist = { url = "https://files.pythonhosted.org/packages/9f/dc/56f2a90c79a2cb13f9e956eab6385effe54216ae7a2068b3a6406bae4345/polars-1.36.1.tar.gz", hash = "sha256:12c7616a2305559144711ab73eaa18814f7aa898c522e7645014b68f1432d54c", size = 711993, upload-time = "2025-12-10T01:14:53.033Z" }
+wheels = [
+ { url = "https://files.pythonhosted.org/packages/f6/c6/36a1b874036b49893ecae0ac44a2f63d1a76e6212631a5b2f50a86e0e8af/polars-1.36.1-py3-none-any.whl", hash = "sha256:853c1bbb237add6a5f6d133c15094a9b727d66dd6a4eb91dbb07cdb056b2b8ef", size = 802429, upload-time = "2025-12-10T01:13:53.838Z" },
+]
+
+[[package]]
+name = "polars-runtime-32"
+version = "1.36.1"
+source = { registry = "https://pypi.org/simple" }
+sdist = { url = "https://files.pythonhosted.org/packages/31/df/597c0ef5eb8d761a16d72327846599b57c5d40d7f9e74306fc154aba8c37/polars_runtime_32-1.36.1.tar.gz", hash = "sha256:201c2cfd80ceb5d5cd7b63085b5fd08d6ae6554f922bcb941035e39638528a09", size = 2788751, upload-time = "2025-12-10T01:14:54.172Z" }
+wheels = [
+ { url = "https://files.pythonhosted.org/packages/e1/ea/871129a2d296966c0925b078a9a93c6c5e7facb1c5eebfcd3d5811aeddc1/polars_runtime_32-1.36.1-cp39-abi3-macosx_10_12_x86_64.whl", hash = "sha256:327b621ca82594f277751f7e23d4b939ebd1be18d54b4cdf7a2f8406cecc18b2", size = 43494311, upload-time = "2025-12-10T01:13:56.096Z" },
+ { url = "https://files.pythonhosted.org/packages/d8/76/0038210ad1e526ce5bb2933b13760d6b986b3045eccc1338e661bd656f77/polars_runtime_32-1.36.1-cp39-abi3-macosx_11_0_arm64.whl", hash = "sha256:ab0d1f23084afee2b97de8c37aa3e02ec3569749ae39571bd89e7a8b11ae9e83", size = 39300602, upload-time = "2025-12-10T01:13:59.366Z" },
+ { url = "https://files.pythonhosted.org/packages/54/1e/2707bee75a780a953a77a2c59829ee90ef55708f02fc4add761c579bf76e/polars_runtime_32-1.36.1-cp39-abi3-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:899b9ad2e47ceb31eb157f27a09dbc2047efbf4969a923a6b1ba7f0412c3e64c", size = 44511780, upload-time = "2025-12-10T01:14:02.285Z" },
+ { url = "https://files.pythonhosted.org/packages/11/b2/3fede95feee441be64b4bcb32444679a8fbb7a453a10251583053f6efe52/polars_runtime_32-1.36.1-cp39-abi3-manylinux_2_24_aarch64.whl", hash = "sha256:d9d077bb9df711bc635a86540df48242bb91975b353e53ef261c6fae6cb0948f", size = 40688448, upload-time = "2025-12-10T01:14:05.131Z" },
+ { url = "https://files.pythonhosted.org/packages/05/0f/e629713a72999939b7b4bfdbf030a32794db588b04fdf3dc977dd8ea6c53/polars_runtime_32-1.36.1-cp39-abi3-win_amd64.whl", hash = "sha256:cc17101f28c9a169ff8b5b8d4977a3683cd403621841623825525f440b564cf0", size = 44464898, upload-time = "2025-12-10T01:14:08.296Z" },
+ { url = "https://files.pythonhosted.org/packages/d1/d8/a12e6aa14f63784cead437083319ec7cece0d5bb9a5bfe7678cc6578b52a/polars_runtime_32-1.36.1-cp39-abi3-win_arm64.whl", hash = "sha256:809e73857be71250141225ddd5d2b30c97e6340aeaa0d445f930e01bef6888dc", size = 39798896, upload-time = "2025-12-10T01:14:11.568Z" },
+]
+
[[package]]
name = "prek"
version = "0.2.10"