diff --git a/CHANGELOG.md b/CHANGELOG.md index 9cbbe7aa3b..6ffaa5301c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -70,6 +70,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ([#3936](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/3936)) - `opentelemetry-instrumentation-aiohttp-client`: Update instrumentor to respect suppressing http instrumentation ([#3957](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/3957)) +- `opentelemetry-instrumentation-psycopg`: Fix `instrument_connection` method to use `_new_cursor_async_factory` on async connections. + ([#3956](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/3956)) ## Version 1.38.0/0.59b0 (2025-10-16) diff --git a/instrumentation/opentelemetry-instrumentation-psycopg/src/opentelemetry/instrumentation/psycopg/__init__.py b/instrumentation/opentelemetry-instrumentation-psycopg/src/opentelemetry/instrumentation/psycopg/__init__.py index 0db721eec5..f9aa4cc45f 100644 --- a/instrumentation/opentelemetry-instrumentation-psycopg/src/opentelemetry/instrumentation/psycopg/__init__.py +++ b/instrumentation/opentelemetry-instrumentation-psycopg/src/opentelemetry/instrumentation/psycopg/__init__.py @@ -268,9 +268,14 @@ def instrument_connection( setattr( connection, _OTEL_CURSOR_FACTORY_KEY, connection.cursor_factory ) - connection.cursor_factory = _new_cursor_factory( - tracer_provider=tracer_provider - ) + if isinstance(connection, psycopg.AsyncConnection): + connection.cursor_factory = _new_cursor_async_factory( + tracer_provider=tracer_provider + ) + else: + connection.cursor_factory = _new_cursor_factory( + tracer_provider=tracer_provider + ) connection._is_instrumented_by_opentelemetry = True else: _logger.warning( diff --git a/instrumentation/opentelemetry-instrumentation-psycopg/tests/test_psycopg_integration.py b/instrumentation/opentelemetry-instrumentation-psycopg/tests/test_psycopg_integration.py index da43fbad32..531d5b021e 100644 --- a/instrumentation/opentelemetry-instrumentation-psycopg/tests/test_psycopg_integration.py +++ b/instrumentation/opentelemetry-instrumentation-psycopg/tests/test_psycopg_integration.py @@ -99,7 +99,7 @@ def get_dsn_parameters(self): # pylint: disable=no-self-use return {"dbname": "test"} -class MockAsyncConnection: +class MockAsyncConnection(psycopg.AsyncConnection): commit = mock.MagicMock(spec=types.MethodType) commit.__name__ = "commit" @@ -113,11 +113,11 @@ def __init__(self, *args, **kwargs): async def connect(*args, **kwargs): return MockAsyncConnection(**kwargs) - def cursor(self): + def cursor(self, *args, **kwargs): if self.cursor_factory: cur = self.cursor_factory(self) return cur - return MockAsyncCursor() + return MockAsyncCursor(*args, **kwargs) def execute(self, query, params=None, *, prepare=None, binary=False): cur = self.cursor() @@ -178,6 +178,8 @@ def test_instrumentor(self): cnx = psycopg.connect(database="test") + self.assertTrue(issubclass(cnx.cursor_factory, MockCursor)) + cursor = cnx.cursor() query = "SELECT * FROM test" @@ -209,6 +211,8 @@ def test_instrumentor_with_connection_class(self): cnx = psycopg.Connection.connect(database="test") + self.assertTrue(issubclass(cnx.cursor_factory, MockCursor)) + cursor = cnx.cursor() query = "SELECT * FROM test" @@ -239,6 +243,7 @@ def test_span_name(self): cnx = psycopg.connect(database="test") + self.assertTrue(issubclass(cnx.cursor_factory, MockCursor)) cursor = cnx.cursor() cursor.execute("Test query", ("param1Value", False)) @@ -267,6 +272,7 @@ def test_span_name(self): def test_span_params_attribute(self): PsycopgInstrumentor().instrument(capture_parameters=True) cnx = psycopg.connect(database="test") + self.assertTrue(issubclass(cnx.cursor_factory, MockCursor)) query = "SELECT * FROM mytable WHERE myparam1 = %s AND myparam2 = %s" params = ("test", 42) @@ -311,6 +317,7 @@ def test_custom_tracer_provider(self): PsycopgInstrumentor().instrument(tracer_provider=tracer_provider) cnx = psycopg.connect(database="test") + self.assertTrue(issubclass(cnx.cursor_factory, MockCursor)) cursor = cnx.cursor() query = "SELECT * FROM test" cursor.execute(query) @@ -332,6 +339,9 @@ def test_instrument_connection(self): self.assertEqual(len(spans_list), 0) cnx = PsycopgInstrumentor().instrument_connection(cnx) + + self.assertTrue(issubclass(cnx.cursor_factory, MockCursor)) + cursor = cnx.cursor() cursor.execute(query) @@ -350,6 +360,7 @@ def test_instrument_connection_with_instrument(self): PsycopgInstrumentor().instrument() cnx = PsycopgInstrumentor().instrument_connection(cnx) + self.assertTrue(issubclass(cnx.cursor_factory, MockCursor)) cursor = cnx.cursor() cursor.execute(query) @@ -422,6 +433,9 @@ async def test_wrap_async_connection_class_with_cursor(self): async def test_async_connection(): acnx = await psycopg.AsyncConnection.connect("test") async with acnx as cnx: + self.assertTrue( + issubclass(cnx.cursor_factory, MockAsyncCursor) + ) async with cnx.cursor() as cursor: await cursor.execute("SELECT * FROM test") @@ -450,6 +464,9 @@ async def test_instrumentor_with_async_connection_class(self): async def test_async_connection(): acnx = await psycopg.AsyncConnection.connect("test") async with acnx as cnx: + self.assertTrue( + issubclass(cnx.cursor_factory, MockAsyncCursor) + ) await cnx.execute("SELECT * FROM test") await test_async_connection() @@ -474,6 +491,7 @@ async def test_span_name_async(self): PsycopgInstrumentor().instrument() cnx = await psycopg.AsyncConnection.connect("test") + self.assertTrue(issubclass(cnx.cursor_factory, MockAsyncCursor)) async with cnx.cursor() as cursor: await cursor.execute("Test query", ("param1Value", False)) await cursor.execute( @@ -500,6 +518,7 @@ async def test_span_name_async(self): async def test_span_params_attribute(self): PsycopgInstrumentor().instrument(capture_parameters=True) cnx = await psycopg.AsyncConnection.connect("test") + self.assertTrue(issubclass(cnx.cursor_factory, MockAsyncCursor)) query = "SELECT * FROM mytable WHERE myparam1 = %s AND myparam2 = %s" params = ("test", 42) async with cnx.cursor() as cursor: @@ -543,6 +562,7 @@ async def test_tracing_is_async(self): async def test_async_connection(): acnx = await psycopg.AsyncConnection.connect("test") + self.assertTrue(issubclass(acnx.cursor_factory, MockAsyncCursor)) async with acnx as cnx: async with cnx.cursor() as cursor: await cursor.execute("SELECT * FROM test", delay=delay) @@ -557,3 +577,33 @@ async def test_async_connection(): self.assertGreater(duration, delay * 1e9) PsycopgInstrumentor().uninstrument() + + async def test_instrument_connection_uses_async_cursor_factory(self): + query = b"SELECT * FROM test" + + acnx = await psycopg.AsyncConnection.connect("test") + async with acnx: + await acnx.execute(query) + + spans_list = self.memory_exporter.get_finished_spans() + self.assertEqual(len(spans_list), 0) + + acnx = PsycopgInstrumentor().instrument_connection(acnx) + + self.assertTrue(acnx._is_instrumented_by_opentelemetry) + + # The new cursor_factory should be a subclass of MockAsyncCursor, + # the async traced cursor factory returned by _new_cursor_async_factory + self.assertTrue(issubclass(acnx.cursor_factory, MockAsyncCursor)) + + cursor = acnx.cursor() + await cursor.execute(query) + + spans_list = self.memory_exporter.get_finished_spans() + self.assertEqual(len(spans_list), 1) + span = spans_list[0] + + # Check version and name in span's instrumentation info + self.assertEqualSpanInstrumentationScope( + span, opentelemetry.instrumentation.psycopg + )