diff --git a/README.md b/README.md
index a282269..233054b 100644
--- a/README.md
+++ b/README.md
@@ -9,11 +9,12 @@ The library contains the next components:
 * **Readers**: insert logs into the system.
 * **Parsers**: parse the logs receive from the reader.
 * **Detectors**: return alerts if anomalies are detected.
+* **Outputs**: deliver the alerts produced by the detectors.
 * **Schemas**: standard data classes use in DetectMate.

 ```
-+---------+     +--------+     +-----------+
-| Reader  | --> | Parser | --> | Detector  |
-+---------+     +--------+     +-----------+
++---------+     +--------+     +-----------+     +--------+
+| Reader  | --> | Parser | --> | Detector  | --> | Output |
++---------+     +--------+     +-----------+     +--------+
 ```
 ## Developer setup:
diff --git a/pyproject.toml b/pyproject.toml
index 7322744..db6d0a8 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -11,6 +11,7 @@ dependencies = [
     "pydantic>=2.11.7",
     "pyyaml>=6.0.3",
     "regex>=2025.11.3",
+    "kafka-python>=2.3.0",
 ]

 [project.optional-dependencies]
diff --git a/src/detectmatelibrary/common/detector.py b/src/detectmatelibrary/common/detector.py
index 876013c..1592d54 100644
--- a/src/detectmatelibrary/common/detector.py
+++ b/src/detectmatelibrary/common/detector.py
@@ -12,10 +12,14 @@ def _extract_timestamp(
     input_: List[ParserSchema] | ParserSchema
 ) -> List[int]:

+    def format_time(time: str) -> int:
+        time_ = time.split(":")[0]
+        return int(float(time_))
+
     if not isinstance(input_, list):
         input_ = [input_]

-    return [int(float(i["logFormatVariables"]["Time"])) for i in input_]
+    return [format_time(i["logFormatVariables"]["Time"]) for i in input_]


 def _extract_logIDs(
diff --git a/src/detectmatelibrary/readers/kafka_consumer.py b/src/detectmatelibrary/readers/kafka_consumer.py
new file mode 100644
index 0000000..3c77eb7
--- /dev/null
+++ b/src/detectmatelibrary/readers/kafka_consumer.py
@@ -0,0 +1,72 @@
+from detectmatelibrary.common.reader import CoreReaderConfig, CoreReader
+
+from detectmatelibrary import schemas
+
+from kafka import KafkaConsumer
+from typing import Optional
+import threading
+
+
+class KafkaConfig(CoreReaderConfig):
+    file: str = ""
+    method_type: str = "log_file_reader"
+
+    server: str = ""
+    topic: str = ""
+    group_id: str = ""
+
+
+def kafka_consumer(config: KafkaConfig, log_pipe: list[str]) -> None:
+    consumer = KafkaConsumer(
+        config.topic,
+        bootstrap_servers=config.server,
+        group_id=config.group_id,
+        auto_offset_reset="earliest",
+        enable_auto_commit=True,
+        consumer_timeout_ms=1000
+    )
+
+    try:
+        while True:
+            for msg in consumer:
+                log_pipe.append(msg.value.decode("utf-8"))
+    except KeyboardInterrupt:
+        print("shutting down consumer")
+    finally:
+        consumer.close()
+
+
+class KafkaReader(CoreReader):
+    def __init__(
+        self,
+        name: str = "Kafka_reader",
+        config: Optional[KafkaConfig | dict] = KafkaConfig(),  # type: ignore
+    ) -> None:
+
+        if isinstance(config, dict):
+            config = KafkaConfig.from_dict(config, name)
+
+        super().__init__(name=name, config=config)
+        self._log_pipe: list[str] = []
+        self._init_consumer()
+
+    def _init_consumer(self) -> None:
+        cfg = self.config
+        pipe = self._log_pipe
+
+        thread = threading.Thread(
+            target=kafka_consumer,
+            args=(cfg, pipe),
+            daemon=True,
+            name=f"kafka_consumer_{self.name}"
+        )
+        thread.start()
+        self._consumer_thread = thread
+
+    def read(self, output_: schemas.LogSchema) -> bool:
+        if len(self._log_pipe) == 0:
+            return False
+
+        output_["log"] = self._log_pipe.pop(0)
+
+        return True
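A minimal usage sketch of the new `KafkaReader` (it mirrors the unit test added below; the broker address, topic, and group id are placeholder values and assume a broker with messages is already reachable):

```python
from detectmatelibrary.readers.kafka_consumer import KafkaConfig, KafkaReader

# Placeholder connection settings; a Kafka broker must already be running here.
config = KafkaConfig(server="localhost:9092", topic="test_topic", group_id="example_group")
reader = KafkaReader(config=config)  # starts the background consumer thread

# process() returns one log object per buffered message (the unit test below
# checks its .log field) and None once the internal pipe is empty.
while (log := reader.process(as_bytes=False)) is not None:
    print(log.log)
```

Because the consumer thread fills the pipe asynchronously, a short wait (the test uses `time.sleep(5)`) may be needed before the first message is available.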
diff --git a/tests/test_common/test_core_detector.py b/tests/test_common/test_core_detector.py
index c9cb821..992af74 100644
--- a/tests/test_common/test_core_detector.py
+++ b/tests/test_common/test_core_detector.py
@@ -72,7 +72,7 @@ def detect(self, input_, output_):
     "parsedLogID": 22,
     "parserID": "test",
     "log": "This is a parsed log.",
-    "logFormatVariables": {"Time": "12121.12"},
+    "logFormatVariables": {"Time": "12121.12:20"},
 }
diff --git a/tests/test_readers/start_kafka/README.md b/tests/test_readers/start_kafka/README.md
new file mode 100644
index 0000000..1d4e9a3
--- /dev/null
+++ b/tests/test_readers/start_kafka/README.md
@@ -0,0 +1,27 @@
+# Kafka Unit Tests
+
+To run the Kafka reader unit tests, follow these steps.
+
+## Step 1: Initialize Kafka server
+To start the server, we use Docker Compose.
+```bash
+cd tests/test_readers/start_kafka
+
+docker compose up
+```
+Check that it is running in the UI at **http://localhost:8080/**.
+
+## Step 2: Add messages
+Only the first time the Kafka server is started, run the following command:
+```bash
+uv run add_messages.py
+```
+
+Check in the UI at **http://localhost:8080/** that the messages were added.
+
+
+## Step 3: Remove the skip marker
+Temporarily remove the **@pytest.mark.skip** decorator from the Kafka unit tests in **test_reader_kafka.py**.
+
+## Step 4: Run the tests
+Run the tests as normal pytest tests, for example `uv run pytest tests/test_readers/test_reader_kafka.py`.
diff --git a/tests/test_readers/start_kafka/add_messages.py b/tests/test_readers/start_kafka/add_messages.py
new file mode 100644
index 0000000..1954903
--- /dev/null
+++ b/tests/test_readers/start_kafka/add_messages.py
@@ -0,0 +1,15 @@
+from kafka import KafkaProducer
+
+producer = KafkaProducer(
+    bootstrap_servers="localhost:9092",
+    acks="all",
+    retries=5,
+    )
+
+
+for msg in [b"hello1", b"hello2", b"hello3"]:
+    future = producer.send("test_topic", msg)
+    try:
+        record_metadata = future.get(timeout=10)
+    except Exception as e:
+        print("send failed:", e)
diff --git a/tests/test_readers/start_kafka/compose.yaml b/tests/test_readers/start_kafka/compose.yaml
new file mode 100644
index 0000000..21deac3
--- /dev/null
+++ b/tests/test_readers/start_kafka/compose.yaml
@@ -0,0 +1,33 @@
+services:
+  kafka:
+    image: apache/kafka-native
+    ports:
+      - "9092:9092"
+    environment:
+      # Configure listeners for both docker and host communication
+      KAFKA_LISTENERS: CONTROLLER://localhost:9091,HOST://0.0.0.0:9092,DOCKER://0.0.0.0:9093
+      KAFKA_ADVERTISED_LISTENERS: HOST://localhost:9092,DOCKER://kafka:9093
+      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,DOCKER:PLAINTEXT,HOST:PLAINTEXT
+
+      # Settings required for KRaft mode
+      KAFKA_NODE_ID: 1
+      KAFKA_PROCESS_ROLES: broker,controller
+      KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
+      KAFKA_CONTROLLER_QUORUM_VOTERS: 1@localhost:9091
+
+      # Listener to use for broker-to-broker communication
+      KAFKA_INTER_BROKER_LISTENER_NAME: DOCKER
+
+      # Required for a single node cluster
+      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
+
+  kafka-ui:
+    image: kafbat/kafka-ui:main
+    ports:
+      - 8080:8080
+    environment:
+      DYNAMIC_CONFIG_ENABLED: "true"
+      KAFKA_CLUSTERS_0_NAME: local
+      KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:9093
+    depends_on:
+      - kafka
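As an alternative to checking the UI, a minimal sketch (assuming the broker from `compose.yaml` above is up) to verify that the messages produced by `add_messages.py` actually landed on the topic, using the same `kafka-python` client the reader relies on:

```python
from kafka import KafkaConsumer

# Read everything currently on the test topic and print it; the iteration
# stops once no new message arrives within the 5 s timeout.
consumer = KafkaConsumer(
    "test_topic",
    bootstrap_servers="localhost:9092",
    auto_offset_reset="earliest",
    consumer_timeout_ms=5000,
)
print([msg.value.decode("utf-8") for msg in consumer])
consumer.close()
```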
server="localhost:9092", topic="test_topic", group_id="b" + ) + reader = KafkaReader(config=config) + + time.sleep(5) + assert len(reader._log_pipe) > 0, "Pipeline is empty" + + log1 = reader.process(as_bytes=False) + assert log1.log == "hello1" + + log2 = reader.process(as_bytes=False) + assert log2.log == "hello2" + + log3 = reader.process(as_bytes=False) + assert log3.log == "hello3" + + assert reader.process(as_bytes=False) is None diff --git a/tests/test_readers/test_reader_basic.py b/tests/test_readers/test_reader_log_file.py similarity index 100% rename from tests/test_readers/test_reader_basic.py rename to tests/test_readers/test_reader_log_file.py diff --git a/uv.lock b/uv.lock index 4d7cd6f..caf0ecc 100644 --- a/uv.lock +++ b/uv.lock @@ -109,6 +109,7 @@ version = "0.1.0" source = { editable = "." } dependencies = [ { name = "drain3" }, + { name = "kafka-python" }, { name = "pandas" }, { name = "protobuf" }, { name = "pydantic" }, @@ -126,6 +127,7 @@ dev = [ [package.metadata] requires-dist = [ { name = "drain3", specifier = ">=0.9.11" }, + { name = "kafka-python", specifier = ">=2.3.0" }, { name = "pandas", specifier = ">=2.3.2" }, { name = "prek", marker = "extra == 'dev'", specifier = ">=0.2.8" }, { name = "protobuf", specifier = ">=6.32.1" }, @@ -165,6 +167,15 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/77/a7/c2f527ddce3155ae9e008385963c2325cbfd52969f8b38efa2723e2af4af/jsonpickle-1.5.1-py2.py3-none-any.whl", hash = "sha256:8eb8323f0e12cb40687f0445e2115d8165901e20ac670add55bb53a95c68c0e5", size = 37124, upload-time = "2021-01-31T05:57:12.256Z" }, ] +[[package]] +name = "kafka-python" +version = "2.3.0" +source = { registry = "https://pypi.org/simple" } +sdist = { url = "https://files.pythonhosted.org/packages/27/5c/d3b6d93ed625d2cb0265e6fe0f507be544f6edde577c1b118f42566389bf/kafka_python-2.3.0.tar.gz", hash = "sha256:de65b596d26b5c894f227c35c7d29a65cea8f8a1c4f0f2b4e3e2ea60d503acc8", size = 358391, upload-time = "2025-11-21T00:47:34.078Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/4a/db/694fd552295ed091e7418d02b6268ee36092d4c93211136c448fe061fe32/kafka_python-2.3.0-py2.py3-none-any.whl", hash = "sha256:831ba6dff28144d0f1145c874d391f3ebb3c2c3e940cc78d74e83f0183497c98", size = 326260, upload-time = "2025-11-21T00:47:32.561Z" }, +] + [[package]] name = "numpy" version = "2.3.2"