From 69fb1f5925c939a576655fa9a6f15988e2eac69f Mon Sep 17 00:00:00 2001 From: Quentin RILLET Date: Tue, 29 Apr 2025 13:22:55 +0200 Subject: [PATCH] feat: add kafka healthcheck --- health_check/contrib/kafka/__init__.py | 4 ++ health_check/contrib/kafka/apps.py | 12 ++++++ health_check/contrib/kafka/backends.py | 58 ++++++++++++++++++++++++++ tests/test_kafka.py | 56 +++++++++++++++++++++++++ 4 files changed, 130 insertions(+) create mode 100644 health_check/contrib/kafka/__init__.py create mode 100644 health_check/contrib/kafka/apps.py create mode 100644 health_check/contrib/kafka/backends.py create mode 100644 tests/test_kafka.py diff --git a/health_check/contrib/kafka/__init__.py b/health_check/contrib/kafka/__init__.py new file mode 100644 index 00000000..00a83d4b --- /dev/null +++ b/health_check/contrib/kafka/__init__.py @@ -0,0 +1,4 @@ +import django + +if django.VERSION < (3, 2): + default_app_config = "health_check.contrib.kafka.apps.HealthCheckConfig" diff --git a/health_check/contrib/kafka/apps.py b/health_check/contrib/kafka/apps.py new file mode 100644 index 00000000..eb18081e --- /dev/null +++ b/health_check/contrib/kafka/apps.py @@ -0,0 +1,12 @@ +from django.apps import AppConfig + +from health_check.plugins import plugin_dir + + +class HealthCheckConfig(AppConfig): + name = "health_check.contrib.kafka" + + def ready(self): + from .backends import KafkaHealthCheck + + plugin_dir.register(KafkaHealthCheck) diff --git a/health_check/contrib/kafka/backends.py b/health_check/contrib/kafka/backends.py new file mode 100644 index 00000000..3f108837 --- /dev/null +++ b/health_check/contrib/kafka/backends.py @@ -0,0 +1,58 @@ +import logging +import importlib + +from django.conf import settings + +from health_check.backends import BaseHealthCheckBackend +from health_check.exceptions import ServiceUnavailable + +logger = logging.getLogger(__name__) + + +try: + kafka_module = importlib.import_module("kafka") +except ImportError: + kafka_module = None + +if not kafka_module: + raise ImportError( + "No kafka-python or kafka-python-ng library found. Please install one of them." + ) + +KafkaAdminClient = getattr(kafka_module, "KafkaAdminClient", None) +KafkaError = getattr(importlib.import_module("kafka.errors"), "KafkaError", None) + +if not KafkaAdminClient or not KafkaError: + raise ImportError( + "KafkaAdminClient or KafkaError not available. Please check your installations." + ) + + +class KafkaHealthCheck(BaseHealthCheckBackend): + """Health check for Kafka.""" + + namespace = None + + def check_status(self): + """Check Kafka service by opening and closing a broker channel.""" + logger.debug("Checking for a KAFKA_URL on django settings...") + + bootstrap_servers = getattr(settings, "KAFKA_URL", None) + + logger.debug( + "Got %s as the kafka_url. Connecting to kafka...", bootstrap_servers + ) + + logger.debug("Attempting to connect to kafka...") + try: + admin_client = KafkaAdminClient( + bootstrap_servers=bootstrap_servers, + request_timeout_ms=3000, # 3 secondes max + api_version_auto_timeout_ms=1000, + ) + # Ping léger : on liste les topics (lecture metadata) + admin_client.list_topics() + except KafkaError as e: + self.add_error(ServiceUnavailable("Unknown error"), e) + else: + logger.debug("Connection established. Kafka is healthy.") diff --git a/tests/test_kafka.py b/tests/test_kafka.py new file mode 100644 index 00000000..ee61902d --- /dev/null +++ b/tests/test_kafka.py @@ -0,0 +1,56 @@ +from unittest import mock + +from kafka.errors import KafkaError + +from health_check.contrib.kafka.backends import KafkaHealthCheck + + +class TestKafkaHealthCheck: + """Test Kafka health check.""" + + @mock.patch("health_check.contrib.kafka.backends.getattr") + @mock.patch("health_check.contrib.kafka.backends.Connection") + def test_broker_refused_connection(self, mocked_connection, mocked_getattr): + """Test when the connection to Kafka is refused.""" + mocked_getattr.return_value = "KAFKA_URL" + + conn_exception = ConnectionRefusedError("Refused connection") + + # mock returns + mocked_conn = mock.MagicMock() + mocked_connection.return_value.__enter__.return_value = mocked_conn + mocked_conn.connect.side_effect = conn_exception + + # instantiates the class + kafka_healthchecker = KafkaHealthCheck() + + # invokes the method check_status() + kafka_healthchecker.check_status() + assert len(kafka_healthchecker.errors), 1 + + # mock assertions + mocked_connection.assert_called_once_with("KAFKA_URL") + + @mock.patch("health_check.contrib.kafka.backends.getattr") + @mock.patch("health_check.contrib.kafka.backends.Connection") + def test_broker_auth_error(self, mocked_connection, mocked_getattr): + """Test that the connection to Kafka has an authentication error.""" + mocked_getattr.return_value = "KAFKA_URL" + + conn_exception = KafkaError("Refused connection") + + # mock returns + mocked_conn = mock.MagicMock() + mocked_connection.return_value.__enter__.return_value = mocked_conn + mocked_conn.connect.side_effect = conn_exception + + # instantiates the class + rabbitmq_healthchecker = KafkaHealthCheck() + + # invokes the method check_status() + rabbitmq_healthchecker.check_status() + assert len(rabbitmq_healthchecker.errors), 1 + + # mock assertions + mocked_connection.assert_called_once_with("KAFKA_URL") +