7 changes: 4 additions & 3 deletions README.md
@@ -9,11 +9,12 @@ The library contains the following components:
* **Readers**: insert logs into the system.
* **Parsers**: parse the logs received from the reader.
* **Detectors**: return alerts if anomalies are detected.
* **Outputs**: deliver alerts to their destinations.
* **Schemas**: standard data classes used in DetectMate.
```
-+---------+     +--------+     +-----------+
-| Reader  | --> | Parser | --> | Detector  |
-+---------+     +--------+     +-----------+
++---------+     +--------+     +-----------+     +--------+
+| Reader  | --> | Parser | --> | Detector  | --> | Output |
++---------+     +--------+     +-----------+     +--------+
```
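As a quick orientation, here is a self-contained sketch of that flow using plain functions. DetectMate's real components wrap these same steps in Reader/Parser/Detector/Output classes, so the names and signatures below are illustrative only:

```python
# Illustrative only: plain-function stand-ins for the four stages.
def read() -> list[str]:                 # Reader: ingest raw log lines
    return ["1700000000.00:01 disk full", "1700000001.00:02 all good"]

def parse(log: str) -> dict:             # Parser: structure each line
    time, _, message = log.partition(" ")
    return {"logFormatVariables": {"Time": time}, "message": message}

def detect(parsed: dict) -> str | None:  # Detector: flag anomalies
    if "full" in parsed["message"]:
        return "ALERT: " + parsed["message"]
    return None

def emit(alert: str) -> None:            # Output: deliver the alert
    print(alert)

for line in read():
    alert = detect(parse(line))
    if alert is not None:
        emit(alert)
```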
## Developer setup:

1 change: 1 addition & 0 deletions pyproject.toml
@@ -11,6 +11,7 @@ dependencies = [
"pydantic>=2.11.7",
"pyyaml>=6.0.3",
"regex>=2025.11.3",
"kafka-python>=2.3.0",
]

[project.optional-dependencies]
6 changes: 5 additions & 1 deletion src/detectmatelibrary/common/detector.py
@@ -12,10 +12,14 @@
def _extract_timestamp(
    input_: List[ParserSchema] | ParserSchema
) -> List[int]:
    def format_time(time: str) -> int:
        # Keep the seconds component before the first ":" and truncate.
        time_ = time.split(":")[0]
        return int(float(time_))

    if not isinstance(input_, list):
        input_ = [input_]

-    return [int(float(i["logFormatVariables"]["Time"])) for i in input_]
+    return [format_time(i["logFormatVariables"]["Time"]) for i in input_]


def _extract_logIDs(
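For illustration, a standalone sketch of the new parsing rule (mirroring `format_time` above; not part of the library's public API): timestamps such as `"12121.12:20"` are cut at the first `:` and truncated to an integer number of seconds.

```python
def format_time(time: str) -> int:
    # Keep the part before the first ":", parse as float, truncate to int.
    return int(float(time.split(":")[0]))

assert format_time("12121.12:20") == 12121  # new colon-suffixed timestamps
assert format_time("12121.12") == 12121     # plain float timestamps still work
```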
72 changes: 72 additions & 0 deletions src/detectmatelibrary/readers/kafka_consumer.py
@@ -0,0 +1,72 @@
from detectmatelibrary.common.reader import CoreReaderConfig, CoreReader

from detectmatelibrary import schemas

from kafka import KafkaConsumer
from typing import Optional
import threading


class KafkaConfig(CoreReaderConfig):
    file: str = "<PLACEHOLDER>"
    method_type: str = "log_file_reader"

    server: str = "<PLACEHOLDER>"
    topic: str = "<PLACEHOLDER>"
    group_id: str = "<PLACEHOLDER>"


def kafka_consumer(config: KafkaConfig, log_pipe: list[str]) -> None:
    """Poll Kafka forever, appending decoded messages to log_pipe."""
    consumer = KafkaConsumer(
        config.topic,
        bootstrap_servers=config.server,
        group_id=config.group_id,
        auto_offset_reset="earliest",
        enable_auto_commit=True,
        consumer_timeout_ms=1000
    )

    try:
        while True:
            # consumer_timeout_ms=1000 makes the iterator stop after one
            # second of silence; the outer loop then resumes polling.
            for msg in consumer:
                log_pipe.append(msg.value.decode("utf-8"))
    except KeyboardInterrupt:
        print("shutting down consumer")
    finally:
        consumer.close()


class KafkaReader(CoreReader):
    def __init__(
        self,
        name: str = "Kafka_reader",
        config: Optional[KafkaConfig | dict] = KafkaConfig(),  # type: ignore
    ) -> None:

        if isinstance(config, dict):
            config = KafkaConfig.from_dict(config, name)

        super().__init__(name=name, config=config)
        self._log_pipe: list[str] = []
        self._init_consumer()

    def _init_consumer(self) -> None:
        cfg = self.config
        pipe = self._log_pipe

        # Consume in a daemon thread so reading never blocks the caller
        # and the thread dies with the main process.
        thread = threading.Thread(
            target=kafka_consumer,
            args=(cfg, pipe),
            daemon=True,
            name=f"kafka_consumer_{self.name}"
        )
        thread.start()
        self._consumer_thread = thread

    def read(self, output_: schemas.LogSchema) -> bool:
        if len(self._log_pipe) == 0:
            return False

        # Hand over the oldest buffered message (FIFO).
        output_["log"] = self._log_pipe.pop(0)

        return True
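A usage sketch for the reader above, assuming a broker on `localhost:9092` with messages already published to `test_topic` (mirroring the unit test later in this diff):

```python
import time

from detectmatelibrary.readers.kafka_consumer import KafkaConfig, KafkaReader

config = KafkaConfig(
    server="localhost:9092", topic="test_topic", group_id="demo"
)
reader = KafkaReader(config=config)

time.sleep(5)  # let the background consumer thread poll the broker

# process() drains the internal pipe one message at a time and
# returns None once it is empty.
while (log := reader.process(as_bytes=False)) is not None:
    print(log.log)
```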
2 changes: 1 addition & 1 deletion tests/test_common/test_core_detector.py
@@ -72,7 +72,7 @@ def detect(self, input_, output_):
"parsedLogID": 22,
"parserID": "test",
"log": "This is a parsed log.",
"logFormatVariables": {"Time": "12121.12"},
"logFormatVariables": {"Time": "12121.12:20"},
}


27 changes: 27 additions & 0 deletions tests/test_readers/start_kafka/README.md
@@ -0,0 +1,27 @@
# Kafka Unit Tests

To run the Kafka reader unit tests, follow these steps.

## Step 1: Initialize Kafka server
To start the server, we use Docker Compose.
```bash
cd tests/test_readers/start_kafka

docker compose up
```
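Optionally, append `-d` (`docker compose up -d`) to run the stack detached in the background.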
Check that it is running in the UI at **http://localhost:8080/**.

## Step 2: Add messages
Only if this is the first time the Kafka server has been run, execute the following command:
```bash
uv run add_messages.py
```

Check in the UI at **http://localhost:8080/** that the messages were added.


## Step 3: Remove the skip marker
Temporarily remove the **@pytest.mark.skip** decorator from the Kafka unit tests in **test_reader_kafka.py**.

## Step 4: Run the tests
Run the tests as normal pytest tests; an example invocation follows.
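For instance, from the repository root (assuming the project's `uv` environment):
```bash
uv run pytest tests/test_readers/test_reader_kafka.py -v
```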
15 changes: 15 additions & 0 deletions tests/test_readers/start_kafka/add_messages.py
@@ -0,0 +1,15 @@
from kafka import KafkaProducer

producer = KafkaProducer(
    bootstrap_servers="localhost:9092",
    acks="all",  # wait for full acknowledgement of every write
    retries=5,
)


for msg in [b"hello1", b"hello2", b"hello3"]:
    future = producer.send("test_topic", msg)
    try:
        # Block until the broker confirms the write (or 10 s elapse).
        record_metadata = future.get(timeout=10)
    except Exception as e:
        print("send failed:", e)

producer.close()  # flush any buffered messages before exiting
33 changes: 33 additions & 0 deletions tests/test_readers/start_kafka/compose.yaml
@@ -0,0 +1,33 @@
services:
  kafka:
    image: apache/kafka-native
    ports:
      - "9092:9092"
    environment:
      # Configure listeners for both docker and host communication
      KAFKA_LISTENERS: CONTROLLER://localhost:9091,HOST://0.0.0.0:9092,DOCKER://0.0.0.0:9093
      KAFKA_ADVERTISED_LISTENERS: HOST://localhost:9092,DOCKER://kafka:9093
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,DOCKER:PLAINTEXT,HOST:PLAINTEXT

      # Settings required for KRaft mode
      KAFKA_NODE_ID: 1
      KAFKA_PROCESS_ROLES: broker,controller
      KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
      KAFKA_CONTROLLER_QUORUM_VOTERS: 1@localhost:9091

      # Listener to use for broker-to-broker communication
      KAFKA_INTER_BROKER_LISTENER_NAME: DOCKER

      # Required for a single node cluster
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1

  kafka-ui:
    image: kafbat/kafka-ui:main
    ports:
      - 8080:8080
    environment:
      DYNAMIC_CONFIG_ENABLED: "true"
      KAFKA_CLUSTERS_0_NAME: local
      KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:9093
    depends_on:
      - kafka
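To sanity-check the stack from the host (a standard Docker Compose command, nothing repo-specific):

```bash
docker compose ps   # both the kafka and kafka-ui services should be running
```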
27 changes: 27 additions & 0 deletions tests/test_readers/test_reader_kafka.py
@@ -0,0 +1,27 @@
from detectmatelibrary.readers.kafka_consumer import KafkaConfig, KafkaReader

import pytest
import time


class TestCaseKafka:
    @pytest.mark.skip(reason="Only run when the kafka server is running")
    def test_normal(self) -> None:
        config = KafkaConfig(
            server="localhost:9092", topic="test_topic", group_id="b"
        )
        reader = KafkaReader(config=config)

        # Give the background consumer thread time to fetch the messages.
        time.sleep(5)
        assert len(reader._log_pipe) > 0, "Pipeline is empty"

        log1 = reader.process(as_bytes=False)
        assert log1.log == "hello1"

        log2 = reader.process(as_bytes=False)
        assert log2.log == "hello2"

        log3 = reader.process(as_bytes=False)
        assert log3.log == "hello3"

        assert reader.process(as_bytes=False) is None
11 changes: 11 additions & 0 deletions uv.lock
