From d5fd44d1094ef6d3497cd2b1eba4bbb2e1ea7a7f Mon Sep 17 00:00:00 2001 From: Dimosthenis Schizas Date: Thu, 2 Apr 2026 03:28:14 +0300 Subject: [PATCH] feat(otel): add OpenTelemetry base instrumentation - Add opentelemetry-api, opentelemetry-sdk, opentelemetry-instrumentation-fastapi, and opentelemetry-exporter-otlp-proto-http as dependencies - Use FastAPIInstrumentor for automatic HTTP metrics and traces - Add manual spans on SearchProvider operations via __init_subclass__, providing consistent tracing for both Elasticsearch and OpenSearch backends - Use stable OTEL semantic conventions (db.system.name, db.operation.name) - No manual TracerProvider setup; users configure OTEL entirely via standard env vars and opentelemetry-instrument CLI Ref: opensanctions/yente#1079 --- pyproject.toml | 4 +++ tests/test_otel.py | 66 ++++++++++++++++++++++++++++++++++++ yente/app.py | 4 +++ yente/provider/base.py | 27 ++++++++++++++- yente/provider/elastic.py | 14 +++++++- yente/provider/opensearch.py | 14 +++++++- 6 files changed, 126 insertions(+), 3 deletions(-) create mode 100644 tests/test_otel.py diff --git a/pyproject.toml b/pyproject.toml index a87ad1f7..ca33ec0b 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -38,6 +38,10 @@ dependencies = [ "aiocsv == 1.4.0", "aiofiles == 25.1.0", "rich", + "opentelemetry-api >= 1.40.0, < 2.0.0", + "opentelemetry-sdk >= 1.40.0, < 2.0.0", + "opentelemetry-instrumentation-fastapi >= 0.61b0, < 1.0.0", + "opentelemetry-exporter-otlp-proto-http >= 1.40.0, < 2.0.0", ] requires-python = ">=3.11,<=3.14" diff --git a/tests/test_otel.py b/tests/test_otel.py new file mode 100644 index 00000000..15777bdf --- /dev/null +++ b/tests/test_otel.py @@ -0,0 +1,66 @@ +import pytest +from opentelemetry import trace +from opentelemetry.sdk.trace import TracerProvider +from opentelemetry.sdk.trace.export import SimpleSpanProcessor +from opentelemetry.sdk.trace.export.in_memory_span_exporter import InMemorySpanExporter + +from .conftest import client + +_exporter = InMemorySpanExporter() +_provider = TracerProvider() +_provider.add_span_processor(SimpleSpanProcessor(_exporter)) +trace.set_tracer_provider(_provider) + + +@pytest.fixture(autouse=True) +def span_exporter(): + """Provide the in-memory span exporter, clearing spans between tests.""" + _exporter.clear() + yield _exporter + + +def test_fastapi_instrumentation_creates_spans(span_exporter): + """Verify that HTTP requests produce OTEL spans.""" + response = client.get("/healthz") + assert response.status_code == 200 + + spans = span_exporter.get_finished_spans() + http_spans = [s for s in spans if "healthz" in s.name or "GET" in s.name] + assert len(http_spans) >= 1, f"Expected HTTP span, got: {[s.name for s in spans]}" + + +@pytest.mark.asyncio +async def test_search_provider_creates_spans(search_provider, span_exporter): + """Verify that SearchProvider operations produce OTEL spans.""" + from yente import settings + + result = await search_provider.search( + index=settings.ENTITY_INDEX, + query={"match_all": {}}, + size=1, + ) + assert result is not None + + spans = span_exporter.get_finished_spans() + search_spans = [s for s in spans if s.name == "SearchProvider.search"] + assert ( + len(search_spans) == 1 + ), f"Expected 1 search span, got: {[s.name for s in spans]}" + + span = search_spans[0] + assert span.attributes["db.system.name"] in ("elasticsearch", "opensearch") + assert span.attributes["db.operation.name"] == "search" + + +@pytest.mark.asyncio +async def test_search_provider_records_errors_in_spans(search_provider, span_exporter): + """Verify that SearchProvider errors are recorded in OTEL spans.""" + from yente.exc import YenteNotFoundError + + fake_index = "nonexistent-index-otel-test" + with pytest.raises((YenteNotFoundError, Exception)): + await search_provider.refresh(fake_index) + + spans = span_exporter.get_finished_spans() + error_spans = [s for s in spans if s.status.status_code == trace.StatusCode.ERROR] + assert len(error_spans) >= 1, f"Expected error span, got: {[s.name for s in spans]}" diff --git a/yente/app.py b/yente/app.py index b5860507..257a1d9d 100644 --- a/yente/app.py +++ b/yente/app.py @@ -19,6 +19,7 @@ from yente.search.indexer import update_index_threaded from yente.provider import close_provider from yente.middleware import RequestLogMiddleware, TraceContextMiddleware +from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor log = get_logger("yente") ExceptionHandler = Callable[[Request, Any], Coroutine[Any, Any, Response]] @@ -137,4 +138,7 @@ def create_app() -> FastAPI: app.include_router(search.router) app.include_router(reconcile.router) app.include_router(admin.router) + + FastAPIInstrumentor.instrument_app(app) + return app diff --git a/yente/provider/base.py b/yente/provider/base.py index 93be9c3c..9ffb13db 100644 --- a/yente/provider/base.py +++ b/yente/provider/base.py @@ -1,4 +1,29 @@ -from typing import Any, AsyncIterable, Dict, Iterable, List, Optional, Union +import functools +from typing import Any, AsyncIterable, Callable, Dict, Iterable, List, Optional, Union + +from opentelemetry import trace + +from yente import settings + +_tracer = trace.get_tracer("yente.provider") + + +def traced(method: Callable[..., Any]) -> Callable[..., Any]: + """Decorator that wraps a SearchProvider method with an OTEL span.""" + name = method.__name__ + + @_tracer.start_as_current_span(f"SearchProvider.{name}") + @functools.wraps(method) + async def wrapper(self: Any, *args: Any, **kwargs: Any) -> Any: + span = trace.get_current_span() + span.set_attribute( + "db.system.name", + "opensearch" if settings.INDEX_TYPE == "opensearch" else "elasticsearch", + ) + span.set_attribute("db.operation.name", name) + return await method(self, *args, **kwargs) + + return wrapper class SearchProvider(object): diff --git a/yente/provider/elastic.py b/yente/provider/elastic.py index c92bb967..306e998c 100644 --- a/yente/provider/elastic.py +++ b/yente/provider/elastic.py @@ -10,7 +10,7 @@ from yente import settings from yente.exc import IndexNotReadyError, YenteIndexError, YenteNotFoundError from yente.logs import get_logger -from yente.provider.base import SearchProvider +from yente.provider.base import SearchProvider, traced from yente.middleware.trace_context import get_trace_context log = get_logger(__name__) @@ -72,6 +72,7 @@ def client(self, **kwargs: Any) -> AsyncElasticsearch: async def close(self) -> None: await self._client.close() + @traced async def refresh(self, index: str) -> None: """Refresh the index to make changes visible.""" try: @@ -79,11 +80,13 @@ async def refresh(self, index: str) -> None: except NotFoundError as nfe: raise YenteNotFoundError(f"Index {index} does not exist.") from nfe + @traced async def get_all_indices(self) -> List[str]: """Get a list of all indices in the ElasticSearch cluster.""" indices: Any = await self.client().cat.indices(format="json") return [index.get("index") for index in indices] + @traced async def get_alias_indices(self, alias: str) -> List[str]: """Get a list of indices that are aliased to the entity query alias.""" try: @@ -94,6 +97,7 @@ async def get_alias_indices(self, alias: str) -> List[str]: except (ApiError, TransportError) as te: raise YenteIndexError(f"Could not get alias indices: {te}") from te + @traced async def rollover_index(self, alias: str, next_index: str, prefix: str) -> None: """Remove all existing indices with a given prefix from the alias and add the new one.""" @@ -105,6 +109,7 @@ async def rollover_index(self, alias: str, next_index: str, prefix: str) -> None except (ApiError, TransportError) as te: raise YenteIndexError(f"Could not rollover index: {te}") from te + @traced async def clone_index(self, base_version: str, target_version: str) -> None: """Create a copy of the index with the given name.""" if base_version == target_version: @@ -140,6 +145,7 @@ async def clone_index(self, base_version: str, target_version: str) -> None: msg = f"Could not clone index {base_version} to {target_version}: {te}" raise YenteIndexError(msg) from te + @traced async def create_index( self, index: str, mappings: Dict[str, Any], settings: Dict[str, Any] ) -> None: @@ -156,6 +162,7 @@ async def create_index( return raise YenteIndexError(f"Could not create index: {exc}") from exc + @traced async def delete_index(self, index: str) -> None: """Delete a given index if it exists.""" try: @@ -165,6 +172,7 @@ async def delete_index(self, index: str) -> None: except (ApiError, TransportError) as te: raise YenteIndexError(f"Could not delete index: {te}") from te + @traced async def exists_index_alias(self, alias: str, index: str) -> bool: """Check if an index exists and is linked into the given alias.""" try: @@ -175,6 +183,7 @@ async def exists_index_alias(self, alias: str, index: str) -> bool: except (ApiError, TransportError) as te: raise YenteIndexError(f"Could not check index alias: {te}") from te + @traced async def check_health(self, index: str) -> bool: try: health = await self.client(request_timeout=5).cluster.health( @@ -187,6 +196,7 @@ async def check_health(self, index: str) -> bool: log.error(f"Search status failure: {te}") return False + @traced async def search( self, index: str, @@ -247,6 +257,7 @@ async def search( msg = f"Error during search: {str(exc)}" raise YenteIndexError(msg, status=500) from exc + @traced async def get_document(self, index: str, doc_id: str) -> Optional[Dict[str, Any]]: """Get a document by ID using the GET API. @@ -260,6 +271,7 @@ async def get_document(self, index: str, doc_id: str) -> Optional[Dict[str, Any] except Exception as exc: raise YenteIndexError(f"Error getting document: {exc}") from exc + @traced async def bulk_index( self, actions: Union[Iterable[Dict[str, Any]], AsyncIterable[Dict[str, Any]]] ) -> None: diff --git a/yente/provider/opensearch.py b/yente/provider/opensearch.py index 26a5c8c8..957515e9 100644 --- a/yente/provider/opensearch.py +++ b/yente/provider/opensearch.py @@ -10,7 +10,7 @@ from yente import settings from yente.exc import IndexNotReadyError, YenteIndexError, YenteNotFoundError from yente.logs import get_logger -from yente.provider.base import SearchProvider +from yente.provider.base import SearchProvider, traced log = get_logger(__name__) logging.getLogger("opensearch").setLevel(logging.ERROR) @@ -81,6 +81,7 @@ def __init__( async def close(self) -> None: await self.client.close() + @traced async def refresh(self, index: str) -> None: """Refresh the index to make changes visible.""" if self.service_type == OpenSearchServiceType.AOSS: @@ -93,11 +94,13 @@ async def refresh(self, index: str) -> None: except NotFoundError as nfe: raise YenteNotFoundError(f"Index {index} does not exist.") from nfe + @traced async def get_all_indices(self) -> List[str]: """Get a list of all indices in the ElasticSearch cluster.""" indices: Any = await self.client.cat.indices(format="json") return [index.get("index") for index in indices] + @traced async def get_alias_indices(self, alias: str) -> List[str]: """Get a list of indices that are aliased to the entity query alias.""" try: @@ -108,6 +111,7 @@ async def get_alias_indices(self, alias: str) -> List[str]: except TransportError as te: raise YenteIndexError(f"Could not get alias indices: {te}") from te + @traced async def rollover_index(self, alias: str, next_index: str, prefix: str) -> None: """Remove all existing indices with a given prefix from the alias and add the new one.""" @@ -122,6 +126,7 @@ async def rollover_index(self, alias: str, next_index: str, prefix: str) -> None except TransportError as te: raise YenteIndexError(f"Could not rollover index: {te}") from te + @traced async def clone_index(self, base_version: str, target_version: str) -> None: """Create a copy of the index with the given name.""" if base_version == target_version: @@ -157,6 +162,7 @@ async def clone_index(self, base_version: str, target_version: str) -> None: msg = f"Could not clone index {base_version} to {target_version}: {te}" raise YenteIndexError(msg) from te + @traced async def create_index( self, index: str, mappings: Dict[str, Any], settings: Dict[str, Any] ) -> None: @@ -173,6 +179,7 @@ async def create_index( return raise YenteIndexError(f"Could not create index: {exc}") from exc + @traced async def delete_index(self, index: str) -> None: """Delete a given index if it exists.""" try: @@ -182,6 +189,7 @@ async def delete_index(self, index: str) -> None: except TransportError as te: raise YenteIndexError(f"Could not delete index: {te}") from te + @traced async def exists_index_alias(self, alias: str, index: str) -> bool: """Check if an index exists and is linked into the given alias.""" try: @@ -192,6 +200,7 @@ async def exists_index_alias(self, alias: str, index: str) -> bool: except TransportError as te: raise YenteIndexError(f"Could not check index alias: {te}") from te + @traced async def check_health(self, index: str) -> bool: try: health = await self.client.cluster.health(index=index, timeout=5) @@ -202,6 +211,7 @@ async def check_health(self, index: str) -> bool: log.error(f"Search status failure: {te}") return False + @traced async def search( self, index: str, @@ -260,6 +270,7 @@ async def search( msg = f"Error during search: {str(exc)}" raise YenteIndexError(msg, status=500) from exc + @traced async def get_document(self, index: str, doc_id: str) -> Optional[Dict[str, Any]]: """Get a document by ID using the GET API. @@ -273,6 +284,7 @@ async def get_document(self, index: str, doc_id: str) -> Optional[Dict[str, Any] except Exception as exc: raise YenteIndexError(f"Error getting document: {exc}") from exc + @traced async def bulk_index( self, actions: Union[Iterable[Dict[str, Any]], AsyncIterable[Dict[str, Any]]] ) -> None: