From 0787f9dca42887c69c9d766ebe49e67c3468fe85 Mon Sep 17 00:00:00 2001 From: DJ Spatoulas Date: Mon, 17 Nov 2025 04:09:26 -0500 Subject: [PATCH 1/4] Fix instrument_connection to use async cursor factory for `AsyncConnection` --- .../opentelemetry/instrumentation/psycopg/__init__.py | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/instrumentation/opentelemetry-instrumentation-psycopg/src/opentelemetry/instrumentation/psycopg/__init__.py b/instrumentation/opentelemetry-instrumentation-psycopg/src/opentelemetry/instrumentation/psycopg/__init__.py index 0db721eec5..f9aa4cc45f 100644 --- a/instrumentation/opentelemetry-instrumentation-psycopg/src/opentelemetry/instrumentation/psycopg/__init__.py +++ b/instrumentation/opentelemetry-instrumentation-psycopg/src/opentelemetry/instrumentation/psycopg/__init__.py @@ -268,9 +268,14 @@ def instrument_connection( setattr( connection, _OTEL_CURSOR_FACTORY_KEY, connection.cursor_factory ) - connection.cursor_factory = _new_cursor_factory( - tracer_provider=tracer_provider - ) + if isinstance(connection, psycopg.AsyncConnection): + connection.cursor_factory = _new_cursor_async_factory( + tracer_provider=tracer_provider + ) + else: + connection.cursor_factory = _new_cursor_factory( + tracer_provider=tracer_provider + ) connection._is_instrumented_by_opentelemetry = True else: _logger.warning( From 5670b9676d4e88d539f87442be84e1ae53e29429 Mon Sep 17 00:00:00 2001 From: DJ Spatoulas Date: Thu, 20 Nov 2025 15:53:02 -0500 Subject: [PATCH 2/4] Update tests for psycopg async cursor factory change --- .../tests/test_psycopg_integration.py | 52 ++++++++++++++++++- 1 file changed, 51 insertions(+), 1 deletion(-) diff --git a/instrumentation/opentelemetry-instrumentation-psycopg/tests/test_psycopg_integration.py b/instrumentation/opentelemetry-instrumentation-psycopg/tests/test_psycopg_integration.py index da43fbad32..0d67273718 100644 --- a/instrumentation/opentelemetry-instrumentation-psycopg/tests/test_psycopg_integration.py +++ b/instrumentation/opentelemetry-instrumentation-psycopg/tests/test_psycopg_integration.py @@ -99,7 +99,7 @@ def get_dsn_parameters(self): # pylint: disable=no-self-use return {"dbname": "test"} -class MockAsyncConnection: +class MockAsyncConnection(psycopg.AsyncConnection): commit = mock.MagicMock(spec=types.MethodType) commit.__name__ = "commit" @@ -178,6 +178,8 @@ def test_instrumentor(self): cnx = psycopg.connect(database="test") + self.assertTrue(issubclass(cnx.cursor_factory, MockCursor)) + cursor = cnx.cursor() query = "SELECT * FROM test" @@ -209,6 +211,8 @@ def test_instrumentor_with_connection_class(self): cnx = psycopg.Connection.connect(database="test") + self.assertTrue(issubclass(cnx.cursor_factory, MockCursor)) + cursor = cnx.cursor() query = "SELECT * FROM test" @@ -239,6 +243,7 @@ def test_span_name(self): cnx = psycopg.connect(database="test") + self.assertTrue(issubclass(cnx.cursor_factory, MockCursor)) cursor = cnx.cursor() cursor.execute("Test query", ("param1Value", False)) @@ -267,6 +272,7 @@ def test_span_name(self): def test_span_params_attribute(self): PsycopgInstrumentor().instrument(capture_parameters=True) cnx = psycopg.connect(database="test") + self.assertTrue(issubclass(cnx.cursor_factory, MockCursor)) query = "SELECT * FROM mytable WHERE myparam1 = %s AND myparam2 = %s" params = ("test", 42) @@ -311,6 +317,7 @@ def test_custom_tracer_provider(self): PsycopgInstrumentor().instrument(tracer_provider=tracer_provider) cnx = psycopg.connect(database="test") + self.assertTrue(issubclass(cnx.cursor_factory, MockCursor)) cursor = cnx.cursor() query = "SELECT * FROM test" cursor.execute(query) @@ -332,6 +339,9 @@ def test_instrument_connection(self): self.assertEqual(len(spans_list), 0) cnx = PsycopgInstrumentor().instrument_connection(cnx) + + self.assertTrue(issubclass(cnx.cursor_factory, MockCursor)) + cursor = cnx.cursor() cursor.execute(query) @@ -350,6 +360,7 @@ def test_instrument_connection_with_instrument(self): PsycopgInstrumentor().instrument() cnx = PsycopgInstrumentor().instrument_connection(cnx) + self.assertTrue(issubclass(cnx.cursor_factory, MockCursor)) cursor = cnx.cursor() cursor.execute(query) @@ -422,6 +433,9 @@ async def test_wrap_async_connection_class_with_cursor(self): async def test_async_connection(): acnx = await psycopg.AsyncConnection.connect("test") async with acnx as cnx: + self.assertTrue( + issubclass(cnx.cursor_factory, MockAsyncCursor) + ) async with cnx.cursor() as cursor: await cursor.execute("SELECT * FROM test") @@ -450,6 +464,9 @@ async def test_instrumentor_with_async_connection_class(self): async def test_async_connection(): acnx = await psycopg.AsyncConnection.connect("test") async with acnx as cnx: + self.assertTrue( + issubclass(cnx.cursor_factory, MockAsyncCursor) + ) await cnx.execute("SELECT * FROM test") await test_async_connection() @@ -474,6 +491,7 @@ async def test_span_name_async(self): PsycopgInstrumentor().instrument() cnx = await psycopg.AsyncConnection.connect("test") + self.assertTrue(issubclass(cnx.cursor_factory, MockAsyncCursor)) async with cnx.cursor() as cursor: await cursor.execute("Test query", ("param1Value", False)) await cursor.execute( @@ -500,6 +518,7 @@ async def test_span_name_async(self): async def test_span_params_attribute(self): PsycopgInstrumentor().instrument(capture_parameters=True) cnx = await psycopg.AsyncConnection.connect("test") + self.assertTrue(issubclass(cnx.cursor_factory, MockAsyncCursor)) query = "SELECT * FROM mytable WHERE myparam1 = %s AND myparam2 = %s" params = ("test", 42) async with cnx.cursor() as cursor: @@ -543,6 +562,7 @@ async def test_tracing_is_async(self): async def test_async_connection(): acnx = await psycopg.AsyncConnection.connect("test") + self.assertTrue(issubclass(acnx.cursor_factory, MockAsyncCursor)) async with acnx as cnx: async with cnx.cursor() as cursor: await cursor.execute("SELECT * FROM test", delay=delay) @@ -557,3 +577,33 @@ async def test_async_connection(): self.assertGreater(duration, delay * 1e9) PsycopgInstrumentor().uninstrument() + + async def test_instrument_connection_uses_async_cursor_factory(self): + query = b"SELECT * FROM test" + + acnx = await psycopg.AsyncConnection.connect("test") + async with acnx: + await acnx.execute(query) + + spans_list = self.memory_exporter.get_finished_spans() + self.assertEqual(len(spans_list), 0) + + acnx = PsycopgInstrumentor().instrument_connection(acnx) + + self.assertTrue(acnx._is_instrumented_by_opentelemetry) + + # The new cursor_factory should be a subclass of MockAsyncCursor, + # the async traced cursor factory returned by _new_cursor_async_factory + self.assertTrue(issubclass(acnx.cursor_factory, MockAsyncCursor)) + + cursor = acnx.cursor() + await cursor.execute(query) + + spans_list = self.memory_exporter.get_finished_spans() + self.assertEqual(len(spans_list), 1) + span = spans_list[0] + + # Check version and name in span's instrumentation info + self.assertEqualSpanInstrumentationScope( + span, opentelemetry.instrumentation.psycopg + ) From b76b7d16a61005d54ca89a0d4f0dd1efacaaf612 Mon Sep 17 00:00:00 2001 From: DJ Spatoulas Date: Fri, 28 Nov 2025 22:39:18 -0500 Subject: [PATCH 3/4] Add `psycopg` async cursor fix to CHANGELOG.md --- CHANGELOG.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 9cbbe7aa3b..6ffaa5301c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -70,6 +70,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ([#3936](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/3936)) - `opentelemetry-instrumentation-aiohttp-client`: Update instrumentor to respect suppressing http instrumentation ([#3957](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/3957)) +- `opentelemetry-instrumentation-psycopg`: Fix `instrument_connection` method to use `_new_cursor_async_factory` on async connections. + ([#3956](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/3956)) ## Version 1.38.0/0.59b0 (2025-10-16) From 58ae7441c1b4605d5d657400773f72d2e50ec61f Mon Sep 17 00:00:00 2001 From: DJ Spatoulas Date: Mon, 1 Dec 2025 12:05:10 -0500 Subject: [PATCH 4/4] Fix pylint error for `MockAsyncConnection.cursor` --- .../tests/test_psycopg_integration.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/instrumentation/opentelemetry-instrumentation-psycopg/tests/test_psycopg_integration.py b/instrumentation/opentelemetry-instrumentation-psycopg/tests/test_psycopg_integration.py index 0d67273718..531d5b021e 100644 --- a/instrumentation/opentelemetry-instrumentation-psycopg/tests/test_psycopg_integration.py +++ b/instrumentation/opentelemetry-instrumentation-psycopg/tests/test_psycopg_integration.py @@ -113,11 +113,11 @@ def __init__(self, *args, **kwargs): async def connect(*args, **kwargs): return MockAsyncConnection(**kwargs) - def cursor(self): + def cursor(self, *args, **kwargs): if self.cursor_factory: cur = self.cursor_factory(self) return cur - return MockAsyncCursor() + return MockAsyncCursor(*args, **kwargs) def execute(self, query, params=None, *, prepare=None, binary=False): cur = self.cursor()