From a2e9819827d87fa3e19550c495c215b6d4fdcf35 Mon Sep 17 00:00:00 2001 From: Bradley Walters Date: Thu, 20 Nov 2025 23:41:54 -0800 Subject: [PATCH] opentelemetry-instrumentation-dbapi: instrument commit and rollback --- CHANGELOG.md | 6 + .../instrumentation/dbapi/__init__.py | 153 +++++++++++++++--- .../tests/test_dbapi_integration.py | 64 ++++++++ .../instrumentation/mysql/__init__.py | 6 + .../instrumentation/mysqlclient/__init__.py | 6 + .../instrumentation/psycopg/__init__.py | 10 +- .../tests/test_psycopg_integration.py | 60 ++++++- .../instrumentation/psycopg2/__init__.py | 5 +- .../instrumentation/pymssql/__init__.py | 7 +- .../instrumentation/pymysql/__init__.py | 6 + .../tests/test_pymysql_integration.py | 53 ++++++ .../instrumentation/sqlite3/__init__.py | 6 + .../tests/pymysql/test_pymysql_functional.py | 68 +++++--- 13 files changed, 395 insertions(+), 55 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 8d14896238..e66a5a58b3 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -13,6 +13,12 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Added +- `opentelemetry-instrumentation-dbapi`: Add instrumentation for `commit()` and `rollback()` transaction operations + ([#3964](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/3964)) +- `opentelemetry-instrumentation-dbapi`: Add `enable_transaction_spans` configuration flag to control transaction span creation (default: `True`) + ([#3964](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/3964)) +- `opentelemetry-instrumentation-pymysql`, `opentelemetry-instrumentation-mysql`, `opentelemetry-instrumentation-mysqlclient`, `opentelemetry-instrumentation-psycopg`, `opentelemetry-instrumentation-psycopg2`, `opentelemetry-instrumentation-sqlite3`, `opentelemetry-instrumentation-pymssql`: Add support for transaction span instrumentation via `enable_transaction_spans` parameter + ([#3964](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/3964)) - `opentelemetry-instrumentation-aiohttp-client`: add support for url exclusions via `OTEL_PYTHON_EXCLUDED_URLS` / `OTEL_PYTHON_AIOHTTP_CLIENT_EXCLUDED_URLS` ([#3850](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/3850)) - `opentelemetry-instrumentation-httpx`: add support for url exclusions via `OTEL_PYTHON_EXCLUDED_URLS` / `OTEL_PYTHON_HTTPX_EXCLUDED_URLS` diff --git a/instrumentation/opentelemetry-instrumentation-dbapi/src/opentelemetry/instrumentation/dbapi/__init__.py b/instrumentation/opentelemetry-instrumentation-dbapi/src/opentelemetry/instrumentation/dbapi/__init__.py index ec088e1aa5..5436b1e91a 100644 --- a/instrumentation/opentelemetry-instrumentation-dbapi/src/opentelemetry/instrumentation/dbapi/__init__.py +++ b/instrumentation/opentelemetry-instrumentation-dbapi/src/opentelemetry/instrumentation/dbapi/__init__.py @@ -209,6 +209,7 @@ def trace_integration( db_api_integration_factory: type[DatabaseApiIntegration] | None = None, enable_attribute_commenter: bool = False, commenter_options: dict[str, Any] | None = None, + enable_transaction_spans: bool = True, ): """Integrate with DB API library. https://www.python.org/dev/peps/pep-0249/ @@ -228,6 +229,7 @@ def trace_integration( default one is used. enable_attribute_commenter: Flag to enable/disable sqlcomment inclusion in `db.statement` span attribute. Only available if enable_commenter=True. commenter_options: Configurations for tags to be appended at the sql query. + enable_transaction_spans: Flag to enable/disable transaction spans (commit/rollback). Defaults to True. """ wrap_connect( __name__, @@ -242,6 +244,7 @@ def trace_integration( db_api_integration_factory=db_api_integration_factory, enable_attribute_commenter=enable_attribute_commenter, commenter_options=commenter_options, + enable_transaction_spans=enable_transaction_spans, ) @@ -258,6 +261,7 @@ def wrap_connect( db_api_integration_factory: type[DatabaseApiIntegration] | None = None, commenter_options: dict[str, Any] | None = None, enable_attribute_commenter: bool = False, + enable_transaction_spans: bool = True, ): """Integrate with DB API library. https://www.python.org/dev/peps/pep-0249/ @@ -277,6 +281,7 @@ def wrap_connect( default one is used. commenter_options: Configurations for tags to be appended at the sql query. enable_attribute_commenter: Flag to enable/disable sqlcomment inclusion in `db.statement` span attribute. Only available if enable_commenter=True. + enable_transaction_spans: Flag to enable/disable transaction spans (commit/rollback). Defaults to True. """ db_api_integration_factory = ( @@ -301,6 +306,7 @@ def wrap_connect_( commenter_options=commenter_options, connect_module=connect_module, enable_attribute_commenter=enable_attribute_commenter, + enable_transaction_spans=enable_transaction_spans, ) return db_integration.wrapped_connection(wrapped, args, kwargs) @@ -338,6 +344,7 @@ def instrument_connection( connect_module: Callable[..., Any] | None = None, enable_attribute_commenter: bool = False, db_api_integration_factory: type[DatabaseApiIntegration] | None = None, + enable_transaction_spans: bool = True, ) -> TracedConnectionProxy[ConnectionT]: """Enable instrumentation in a database connection. @@ -359,6 +366,7 @@ def instrument_connection( replacement for :class:`DatabaseApiIntegration`. Can be used to obtain connection attributes from the connect method instead of from the connection itself (as done by the pymssql intrumentor). + enable_transaction_spans: Flag to enable/disable transaction spans (commit/rollback). Defaults to True. Returns: An instrumented connection. @@ -382,6 +390,7 @@ def instrument_connection( commenter_options=commenter_options, connect_module=connect_module, enable_attribute_commenter=enable_attribute_commenter, + enable_transaction_spans=enable_transaction_spans, ) db_integration.get_connection_attributes(connection) return get_traced_connection_proxy(connection, db_integration) @@ -418,6 +427,7 @@ def __init__( commenter_options: dict[str, Any] | None = None, connect_module: Callable[..., Any] | None = None, enable_attribute_commenter: bool = False, + enable_transaction_spans: bool = True, ): if connection_attributes is None: self.connection_attributes = { @@ -440,6 +450,7 @@ def __init__( self.enable_commenter = enable_commenter self.commenter_options = commenter_options self.enable_attribute_commenter = enable_attribute_commenter + self.enable_transaction_spans = enable_transaction_spans self.database_system = database_system self.connection_props: dict[str, Any] = {} self.span_attributes: dict[str, Any] = {} @@ -555,6 +566,17 @@ def get_connection_attributes(self, connection: object) -> None: if port is not None: self.span_attributes[SpanAttributes.NET_PEER_PORT] = port + def populate_common_span_attributes(self, span: trace_api.Span) -> None: + """Populate span with common database connection attributes.""" + if not span.is_recording(): + return + + span.set_attribute(SpanAttributes.DB_SYSTEM, self.database_system) + span.set_attribute(SpanAttributes.DB_NAME, self.database) + + for attribute_key, attribute_value in self.span_attributes.items(): + span.set_attribute(attribute_key, attribute_value) + # pylint: disable=abstract-method class TracedConnectionProxy(wrapt.ObjectProxy, Generic[ConnectionT]): @@ -563,23 +585,54 @@ def __init__( self, connection: ConnectionT, db_api_integration: DatabaseApiIntegration | None = None, + wrap_cursors: bool = True, ): wrapt.ObjectProxy.__init__(self, connection) self._self_db_api_integration = db_api_integration + self._self_wrap_cursors = wrap_cursors def __getattribute__(self, name: str): - if object.__getattribute__(self, name): + # Try to get the attribute from the proxy first + try: return object.__getattribute__(self, name) - - return object.__getattribute__( - object.__getattribute__(self, "_connection"), name - ) + except AttributeError: + # If not found on proxy, try the wrapped connection + return object.__getattribute__( + object.__getattribute__(self, "__wrapped__"), name + ) def cursor(self, *args: Any, **kwargs: Any): - return get_traced_cursor_proxy( - self.__wrapped__.cursor(*args, **kwargs), - self._self_db_api_integration, - ) + cursor = self.__wrapped__.cursor(*args, **kwargs) + + # For databases like psycopg/psycopg2 that use cursor_factory, + # cursor tracing is already handled by the factory, so skip wrapping + if not self._self_wrap_cursors: + return cursor + + # For standard dbapi connections, wrap the cursor + return get_traced_cursor_proxy(cursor, self._self_db_api_integration) + + def _traced_tx_operation( + self, operation_name: str, operation_method: Callable[[], None] + ) -> None: + """Execute a traced transaction operation (commit, rollback).""" + if not is_instrumentation_enabled(): + return operation_method() + + if not self._self_db_api_integration.enable_transaction_spans: + return operation_method() + + with self._self_db_api_integration._tracer.start_as_current_span( + operation_name, kind=trace_api.SpanKind.CLIENT + ) as span: + self._self_db_api_integration.populate_common_span_attributes(span) + return operation_method() + + def commit(self): + return self._traced_tx_operation("COMMIT", self.__wrapped__.commit) + + def rollback(self): + return self._traced_tx_operation("ROLLBACK", self.__wrapped__.rollback) def __enter__(self): self.__wrapped__.__enter__() @@ -589,13 +642,78 @@ def __exit__(self, *args: Any, **kwargs: Any): self.__wrapped__.__exit__(*args, **kwargs) +class AsyncTracedConnectionProxy(TracedConnectionProxy[ConnectionT]): + async def _traced_tx_operation_async( + self, operation_name: str, operation_method: Callable[[], Awaitable[None]] + ) -> None: + """Execute a traced async transaction operation (commit, rollback).""" + if not is_instrumentation_enabled(): + return await operation_method() + + if not self._self_db_api_integration.enable_transaction_spans: + return await operation_method() + + with self._self_db_api_integration._tracer.start_as_current_span( + operation_name, kind=trace_api.SpanKind.CLIENT + ) as span: + self._self_db_api_integration.populate_common_span_attributes(span) + return await operation_method() + + async def commit(self): + """Async commit for async connections (e.g., psycopg.AsyncConnection).""" + return await self._traced_tx_operation_async("COMMIT", self.__wrapped__.commit) + + async def rollback(self): + """Async rollback for async connections (e.g., psycopg.AsyncConnection).""" + return await self._traced_tx_operation_async("ROLLBACK", self.__wrapped__.rollback) + + # Async context manager support + async def __aenter__(self): + if hasattr(self.__wrapped__, "__aenter__"): + await self.__wrapped__.__aenter__() + return self + + async def __aexit__(self, *args: Any, **kwargs: Any): + if hasattr(self.__wrapped__, "__aexit__"): + return await self.__wrapped__.__aexit__(*args, **kwargs) + + def get_traced_connection_proxy( connection: ConnectionT, db_api_integration: DatabaseApiIntegration | None, + wrap_cursors: bool = True, *args: Any, **kwargs: Any, ) -> TracedConnectionProxy[ConnectionT]: - return TracedConnectionProxy(connection, db_api_integration) + """Get a traced connection proxy for sync connections. + + Args: + connection: The database connection to wrap. + db_api_integration: The database API integration instance. + wrap_cursors: Whether to wrap cursors returned by connection.cursor(). + Set to False for databases like psycopg/psycopg2 that handle cursor + tracing via cursor_factory. Defaults to True. + """ + return TracedConnectionProxy(connection, db_api_integration, wrap_cursors) + + +def get_traced_async_connection_proxy( + connection: ConnectionT, + db_api_integration: DatabaseApiIntegration | None, + wrap_cursors: bool = True, + *args: Any, + **kwargs: Any, +) -> AsyncTracedConnectionProxy[ConnectionT]: + """Get a traced connection proxy for async connections. + + Args: + connection: The async database connection to wrap. + db_api_integration: The database API integration instance. + wrap_cursors: Whether to wrap cursors returned by connection.cursor(). + Set to False for databases like psycopg/psycopg2 that handle cursor + tracing via cursor_factory. Defaults to True. + """ + return AsyncTracedConnectionProxy(connection, db_api_integration, wrap_cursors) class CursorTracer(Generic[CursorT]): @@ -666,21 +784,12 @@ def _populate_span( ): if not span.is_recording(): return + + self._db_api_integration.populate_common_span_attributes(span) + statement = self.get_statement(cursor, args) - span.set_attribute( - SpanAttributes.DB_SYSTEM, self._db_api_integration.database_system - ) - span.set_attribute( - SpanAttributes.DB_NAME, self._db_api_integration.database - ) span.set_attribute(SpanAttributes.DB_STATEMENT, statement) - for ( - attribute_key, - attribute_value, - ) in self._db_api_integration.span_attributes.items(): - span.set_attribute(attribute_key, attribute_value) - if self._db_api_integration.capture_parameters and len(args) > 1: span.set_attribute("db.statement.parameters", str(args[1])) diff --git a/instrumentation/opentelemetry-instrumentation-dbapi/tests/test_dbapi_integration.py b/instrumentation/opentelemetry-instrumentation-dbapi/tests/test_dbapi_integration.py index 7d3396353a..748f076ca0 100644 --- a/instrumentation/opentelemetry-instrumentation-dbapi/tests/test_dbapi_integration.py +++ b/instrumentation/opentelemetry-instrumentation-dbapi/tests/test_dbapi_integration.py @@ -1055,6 +1055,62 @@ def test_callproc(self): "Test stored procedure", ) + def test_commit(self): + db_integration = dbapi.DatabaseApiIntegration( + "instrumenting_module_test_name", "testcomponent" + ) + mock_connection = db_integration.wrapped_connection( + mock_connect, {}, {} + ) + mock_connection.commit() + spans_list = self.memory_exporter.get_finished_spans() + self.assertEqual(len(spans_list), 1) + span = spans_list[0] + self.assertEqual(span.name, "COMMIT") + + def test_rollback(self): + db_integration = dbapi.DatabaseApiIntegration( + "instrumenting_module_test_name", "testcomponent" + ) + mock_connection = db_integration.wrapped_connection( + mock_connect, {}, {} + ) + mock_connection.rollback() + spans_list = self.memory_exporter.get_finished_spans() + self.assertEqual(len(spans_list), 1) + span = spans_list[0] + self.assertEqual(span.name, "ROLLBACK") + + def test_commit_with_suppress_instrumentation(self): + """Test that commit doesn't create a span when instrumentation is suppressed""" + db_integration = dbapi.DatabaseApiIntegration( + "instrumenting_module_test_name", + "testcomponent", + ) + mock_connection = db_integration.wrapped_connection( + mock_connect, {}, {} + ) + with suppress_instrumentation(): + mock_connection.commit() + + spans_list = self.memory_exporter.get_finished_spans() + self.assertEqual(len(spans_list), 0) + + def test_rollback_with_suppress_instrumentation(self): + """Test that rollback doesn't create a span when instrumentation is suppressed""" + db_integration = dbapi.DatabaseApiIntegration( + "instrumenting_module_test_name", + "testcomponent", + ) + mock_connection = db_integration.wrapped_connection( + mock_connect, {}, {} + ) + with suppress_instrumentation(): + mock_connection.rollback() + + spans_list = self.memory_exporter.get_finished_spans() + self.assertEqual(len(spans_list), 0) + @mock.patch("opentelemetry.instrumentation.dbapi") def test_wrap_connect(self, mock_dbapi): dbapi.wrap_connect(self.tracer, mock_dbapi, "connect", "-") @@ -1209,6 +1265,14 @@ def __init__(self, database, server_port, server_host, user): def cursor(self): return MockCursor() + # pylint: disable=no-self-use + def commit(self): + pass + + # pylint: disable=no-self-use + def rollback(self): + pass + class MockCursor: def __init__(self) -> None: diff --git a/instrumentation/opentelemetry-instrumentation-mysql/src/opentelemetry/instrumentation/mysql/__init__.py b/instrumentation/opentelemetry-instrumentation-mysql/src/opentelemetry/instrumentation/mysql/__init__.py index 0f344fada2..4be42e263e 100644 --- a/instrumentation/opentelemetry-instrumentation-mysql/src/opentelemetry/instrumentation/mysql/__init__.py +++ b/instrumentation/opentelemetry-instrumentation-mysql/src/opentelemetry/instrumentation/mysql/__init__.py @@ -182,6 +182,7 @@ def _instrument(self, **kwargs): enable_attribute_commenter = kwargs.get( "enable_attribute_commenter", False ) + enable_transaction_spans = kwargs.get("enable_transaction_spans", True) dbapi.wrap_connect( __name__, @@ -194,6 +195,7 @@ def _instrument(self, **kwargs): enable_commenter=enable_sqlcommenter, commenter_options=commenter_options, enable_attribute_commenter=enable_attribute_commenter, + enable_transaction_spans=enable_transaction_spans, ) def _uninstrument(self, **kwargs): @@ -208,6 +210,7 @@ def instrument_connection( enable_commenter=None, commenter_options=None, enable_attribute_commenter=None, + enable_transaction_spans=True, ): """Enable instrumentation in a MySQL connection. @@ -225,6 +228,8 @@ def instrument_connection( Optional configurations for tags to be appended at the sql query. enable_attribute_commenter: Optional flag to enable/disable addition of sqlcomment to span attribute (default False). Requires enable_commenter=True. + enable_transaction_spans: + Flag to enable/disable transaction spans (commit/rollback). Defaults to True. Returns: An instrumented MySQL connection with OpenTelemetry tracing enabled. @@ -240,6 +245,7 @@ def instrument_connection( commenter_options=commenter_options, connect_module=mysql.connector, enable_attribute_commenter=enable_attribute_commenter, + enable_transaction_spans=enable_transaction_spans, ) def uninstrument_connection(self, connection): diff --git a/instrumentation/opentelemetry-instrumentation-mysqlclient/src/opentelemetry/instrumentation/mysqlclient/__init__.py b/instrumentation/opentelemetry-instrumentation-mysqlclient/src/opentelemetry/instrumentation/mysqlclient/__init__.py index fb08600505..44b54ce281 100644 --- a/instrumentation/opentelemetry-instrumentation-mysqlclient/src/opentelemetry/instrumentation/mysqlclient/__init__.py +++ b/instrumentation/opentelemetry-instrumentation-mysqlclient/src/opentelemetry/instrumentation/mysqlclient/__init__.py @@ -166,6 +166,7 @@ def _instrument(self, **kwargs): # pylint: disable=no-self-use enable_attribute_commenter = kwargs.get( "enable_attribute_commenter", False ) + enable_transaction_spans = kwargs.get("enable_transaction_spans", True) dbapi.wrap_connect( __name__, @@ -178,6 +179,7 @@ def _instrument(self, **kwargs): # pylint: disable=no-self-use enable_commenter=enable_sqlcommenter, commenter_options=commenter_options, enable_attribute_commenter=enable_attribute_commenter, + enable_transaction_spans=enable_transaction_spans, ) def _uninstrument(self, **kwargs): # pylint: disable=no-self-use @@ -191,6 +193,7 @@ def instrument_connection( enable_commenter=None, commenter_options=None, enable_attribute_commenter=None, + enable_transaction_spans=True, ): """Enable instrumentation in a mysqlclient connection. @@ -215,6 +218,8 @@ def instrument_connection( - `mysql_client_version`: Adds the MySQL client version. - `driver_paramstyle`: Adds the parameter style. - `opentelemetry_values`: Includes traceparent values. + enable_transaction_spans: + Flag to enable/disable transaction spans (commit/rollback). Defaults to True. Returns: An instrumented MySQL connection with OpenTelemetry support enabled. """ @@ -230,6 +235,7 @@ def instrument_connection( commenter_options=commenter_options, connect_module=MySQLdb, enable_attribute_commenter=enable_attribute_commenter, + enable_transaction_spans=enable_transaction_spans, ) @staticmethod diff --git a/instrumentation/opentelemetry-instrumentation-psycopg/src/opentelemetry/instrumentation/psycopg/__init__.py b/instrumentation/opentelemetry-instrumentation-psycopg/src/opentelemetry/instrumentation/psycopg/__init__.py index 0db721eec5..0b4905e089 100644 --- a/instrumentation/opentelemetry-instrumentation-psycopg/src/opentelemetry/instrumentation/psycopg/__init__.py +++ b/instrumentation/opentelemetry-instrumentation-psycopg/src/opentelemetry/instrumentation/psycopg/__init__.py @@ -188,6 +188,7 @@ def _instrument(self, **kwargs: Any): "enable_attribute_commenter", False ) capture_parameters = kwargs.get("capture_parameters", False) + enable_transaction_spans = kwargs.get("enable_transaction_spans", True) dbapi.wrap_connect( __name__, psycopg, @@ -201,6 +202,7 @@ def _instrument(self, **kwargs: Any): commenter_options=commenter_options, enable_attribute_commenter=enable_attribute_commenter, capture_parameters=capture_parameters, + enable_transaction_spans=enable_transaction_spans, ) dbapi.wrap_connect( @@ -216,6 +218,7 @@ def _instrument(self, **kwargs: Any): commenter_options=commenter_options, enable_attribute_commenter=enable_attribute_commenter, capture_parameters=capture_parameters, + enable_transaction_spans=enable_transaction_spans, ) dbapi.wrap_connect( __name__, @@ -230,6 +233,7 @@ def _instrument(self, **kwargs: Any): commenter_options=commenter_options, enable_attribute_commenter=enable_attribute_commenter, capture_parameters=capture_parameters, + enable_transaction_spans=enable_transaction_spans, ) def _uninstrument(self, **kwargs: Any): @@ -304,7 +308,8 @@ def wrapped_connection( kwargs["cursor_factory"] = _new_cursor_factory(**new_factory_kwargs) connection = connect_method(*args, **kwargs) self.get_connection_attributes(connection) - return connection + # psycopg uses cursor_factory for cursor tracing, so disable cursor wrapping + return dbapi.get_traced_connection_proxy(connection, self, wrap_cursors=False) class DatabaseApiAsyncIntegration(dbapi.DatabaseApiIntegration): @@ -324,7 +329,8 @@ async def wrapped_connection( ) connection = await connect_method(*args, **kwargs) self.get_connection_attributes(connection) - return connection + # psycopg uses cursor_factory for cursor tracing, so disable cursor wrapping + return dbapi.get_traced_async_connection_proxy(connection, self, wrap_cursors=False) class CursorTracer(dbapi.CursorTracer): diff --git a/instrumentation/opentelemetry-instrumentation-psycopg/tests/test_psycopg_integration.py b/instrumentation/opentelemetry-instrumentation-psycopg/tests/test_psycopg_integration.py index da43fbad32..a194c82ed3 100644 --- a/instrumentation/opentelemetry-instrumentation-psycopg/tests/test_psycopg_integration.py +++ b/instrumentation/opentelemetry-instrumentation-psycopg/tests/test_psycopg_integration.py @@ -100,12 +100,6 @@ def get_dsn_parameters(self): # pylint: disable=no-self-use class MockAsyncConnection: - commit = mock.MagicMock(spec=types.MethodType) - commit.__name__ = "commit" - - rollback = mock.MagicMock(spec=types.MethodType) - rollback.__name__ = "rollback" - def __init__(self, *args, **kwargs): self.cursor_factory = kwargs.pop("cursor_factory", None) @@ -113,6 +107,12 @@ def __init__(self, *args, **kwargs): async def connect(*args, **kwargs): return MockAsyncConnection(**kwargs) + async def commit(self): + pass + + async def rollback(self): + pass + def cursor(self): if self.cursor_factory: cur = self.cursor_factory(self) @@ -392,6 +392,28 @@ def test_uninstrument_connection_with_instrument_connection(self): spans_list = self.memory_exporter.get_finished_spans() self.assertEqual(len(spans_list), 1) + def test_commit(self): + PsycopgInstrumentor().instrument() + + cnx = psycopg.connect(database="test") + cnx.commit() + + spans_list = self.memory_exporter.get_finished_spans() + self.assertEqual(len(spans_list), 1) + span = spans_list[0] + self.assertEqual(span.name, "COMMIT") + + def test_rollback(self): + PsycopgInstrumentor().instrument() + + cnx = psycopg.connect(database="test") + cnx.rollback() + + spans_list = self.memory_exporter.get_finished_spans() + self.assertEqual(len(spans_list), 1) + span = spans_list[0] + self.assertEqual(span.name, "ROLLBACK") + @mock.patch("opentelemetry.instrumentation.dbapi.wrap_connect") def test_sqlcommenter_enabled(self, event_mocked): cnx = psycopg.connect(database="test") @@ -534,6 +556,32 @@ async def test_not_recording_async(self): PsycopgInstrumentor().uninstrument() + async def test_async_commit(self): + PsycopgInstrumentor().instrument() + + cnx = await psycopg.AsyncConnection.connect("test") + await cnx.commit() + + spans_list = self.memory_exporter.get_finished_spans() + self.assertEqual(len(spans_list), 1) + span = spans_list[0] + self.assertEqual(span.name, "COMMIT") + + PsycopgInstrumentor().uninstrument() + + async def test_async_rollback(self): + PsycopgInstrumentor().instrument() + + cnx = await psycopg.AsyncConnection.connect("test") + await cnx.rollback() + + spans_list = self.memory_exporter.get_finished_spans() + self.assertEqual(len(spans_list), 1) + span = spans_list[0] + self.assertEqual(span.name, "ROLLBACK") + + PsycopgInstrumentor().uninstrument() + async def test_tracing_is_async(self): PsycopgInstrumentor().instrument() diff --git a/instrumentation/opentelemetry-instrumentation-psycopg2/src/opentelemetry/instrumentation/psycopg2/__init__.py b/instrumentation/opentelemetry-instrumentation-psycopg2/src/opentelemetry/instrumentation/psycopg2/__init__.py index eceadef308..933febe7bb 100644 --- a/instrumentation/opentelemetry-instrumentation-psycopg2/src/opentelemetry/instrumentation/psycopg2/__init__.py +++ b/instrumentation/opentelemetry-instrumentation-psycopg2/src/opentelemetry/instrumentation/psycopg2/__init__.py @@ -202,6 +202,7 @@ def _instrument(self, **kwargs): enable_attribute_commenter = kwargs.get( "enable_attribute_commenter", False ) + enable_transaction_spans = kwargs.get("enable_transaction_spans", True) dbapi.wrap_connect( __name__, psycopg2, @@ -214,6 +215,7 @@ def _instrument(self, **kwargs): enable_commenter=enable_sqlcommenter, commenter_options=commenter_options, enable_attribute_commenter=enable_attribute_commenter, + enable_transaction_spans=enable_transaction_spans, ) def _uninstrument(self, **kwargs): @@ -279,7 +281,8 @@ def wrapped_connection( kwargs["cursor_factory"] = _new_cursor_factory(**new_factory_kwargs) connection = connect_method(*args, **kwargs) self.get_connection_attributes(connection) - return connection + # psycopg2 uses cursor_factory for cursor tracing, so disable cursor wrapping + return dbapi.get_traced_connection_proxy(connection, self, wrap_cursors=False) class CursorTracer(dbapi.CursorTracer): diff --git a/instrumentation/opentelemetry-instrumentation-pymssql/src/opentelemetry/instrumentation/pymssql/__init__.py b/instrumentation/opentelemetry-instrumentation-pymssql/src/opentelemetry/instrumentation/pymssql/__init__.py index 9da65734bc..eb534025c7 100644 --- a/instrumentation/opentelemetry-instrumentation-pymssql/src/opentelemetry/instrumentation/pymssql/__init__.py +++ b/instrumentation/opentelemetry-instrumentation-pymssql/src/opentelemetry/instrumentation/pymssql/__init__.py @@ -157,6 +157,7 @@ def _instrument(self, **kwargs): https://github.com/pymssql/pymssql/ """ tracer_provider = kwargs.get("tracer_provider") + enable_transaction_spans = kwargs.get("enable_transaction_spans", True) dbapi.wrap_connect( __name__, @@ -169,6 +170,7 @@ def _instrument(self, **kwargs): # instead, we get the attributes from the connect method (which is done # via PyMSSQLDatabaseApiIntegration.wrapped_connection) db_api_integration_factory=_PyMSSQLDatabaseApiIntegration, + enable_transaction_spans=enable_transaction_spans, ) def _uninstrument(self, **kwargs): @@ -176,13 +178,15 @@ def _uninstrument(self, **kwargs): dbapi.unwrap_connect(pymssql, "connect") @staticmethod - def instrument_connection(connection, tracer_provider=None): + def instrument_connection(connection, tracer_provider=None, enable_transaction_spans=True): """Enable instrumentation in a pymssql connection. Args: connection: The connection to instrument. tracer_provider: The optional tracer provider to use. If omitted the current globally configured one is used. + enable_transaction_spans: Flag to enable/disable transaction spans + (commit/rollback). Defaults to True. Returns: An instrumented connection. @@ -195,6 +199,7 @@ def instrument_connection(connection, tracer_provider=None): version=__version__, tracer_provider=tracer_provider, db_api_integration_factory=_PyMSSQLDatabaseApiIntegration, + enable_transaction_spans=enable_transaction_spans, ) @staticmethod diff --git a/instrumentation/opentelemetry-instrumentation-pymysql/src/opentelemetry/instrumentation/pymysql/__init__.py b/instrumentation/opentelemetry-instrumentation-pymysql/src/opentelemetry/instrumentation/pymysql/__init__.py index 587aafbd55..e97e1a6ef8 100644 --- a/instrumentation/opentelemetry-instrumentation-pymysql/src/opentelemetry/instrumentation/pymysql/__init__.py +++ b/instrumentation/opentelemetry-instrumentation-pymysql/src/opentelemetry/instrumentation/pymysql/__init__.py @@ -189,6 +189,7 @@ def _instrument(self, **kwargs): # pylint: disable=no-self-use enable_attribute_commenter = kwargs.get( "enable_attribute_commenter", False ) + enable_transaction_spans = kwargs.get("enable_transaction_spans", True) dbapi.wrap_connect( __name__, @@ -201,6 +202,7 @@ def _instrument(self, **kwargs): # pylint: disable=no-self-use enable_commenter=enable_sqlcommenter, commenter_options=commenter_options, enable_attribute_commenter=enable_attribute_commenter, + enable_transaction_spans=enable_transaction_spans, ) def _uninstrument(self, **kwargs): # pylint: disable=no-self-use @@ -214,6 +216,7 @@ def instrument_connection( enable_commenter=None, commenter_options=None, enable_attribute_commenter=None, + enable_transaction_spans=True, ): """Enable instrumentation in a PyMySQL connection. @@ -232,6 +235,8 @@ def instrument_connection( You can specify various options, such as enabling driver information, database version logging, traceparent propagation, and other customizable metadata enhancements. See *SQLCommenter Configurations* above for more information. + enable_transaction_spans: + A flag to enable/disable transaction spans (commit/rollback). Defaults to True. Returns: An instrumented connection. """ @@ -247,6 +252,7 @@ def instrument_connection( commenter_options=commenter_options, connect_module=pymysql, enable_attribute_commenter=enable_attribute_commenter, + enable_transaction_spans=enable_transaction_spans, ) @staticmethod diff --git a/instrumentation/opentelemetry-instrumentation-pymysql/tests/test_pymysql_integration.py b/instrumentation/opentelemetry-instrumentation-pymysql/tests/test_pymysql_integration.py index ea59e5df7b..52be3de107 100644 --- a/instrumentation/opentelemetry-instrumentation-pymysql/tests/test_pymysql_integration.py +++ b/instrumentation/opentelemetry-instrumentation-pymysql/tests/test_pymysql_integration.py @@ -480,3 +480,56 @@ def test_uninstrument_connection(self, mock_connect): spans_list = self.memory_exporter.get_finished_spans() self.assertEqual(len(spans_list), 1) + + @mock.patch("pymysql.connect") + # pylint: disable=unused-argument + def test_commit(self, mock_connect): + """Test that commit creates a span""" + PyMySQLInstrumentor().instrument() + cnx = pymysql.connect(database="test") + cnx.commit() + + spans_list = self.memory_exporter.get_finished_spans() + self.assertEqual(len(spans_list), 1) + span = spans_list[0] + self.assertEqual(span.name, "COMMIT") + self.assertIs(span.kind, trace_api.SpanKind.CLIENT) + self.assertEqual(span.attributes[SpanAttributes.DB_SYSTEM], "mysql") + + @mock.patch("pymysql.connect") + # pylint: disable=unused-argument + def test_rollback(self, mock_connect): + """Test that rollback creates a span""" + PyMySQLInstrumentor().instrument() + cnx = pymysql.connect(database="test") + cnx.rollback() + + spans_list = self.memory_exporter.get_finished_spans() + self.assertEqual(len(spans_list), 1) + span = spans_list[0] + self.assertEqual(span.name, "ROLLBACK") + self.assertIs(span.kind, trace_api.SpanKind.CLIENT) + self.assertEqual(span.attributes[SpanAttributes.DB_SYSTEM], "mysql") + + @mock.patch("pymysql.connect") + # pylint: disable=unused-argument + def test_commit_and_query(self, mock_connect): + """Test that both execute and commit create spans""" + PyMySQLInstrumentor().instrument() + cnx = pymysql.connect(database="test") + cursor = cnx.cursor() + cursor.execute("SELECT * FROM test") + cnx.commit() + + spans_list = self.memory_exporter.get_finished_spans() + self.assertEqual(len(spans_list), 2) + + # First span should be the SELECT + select_span = spans_list[0] + self.assertEqual(select_span.name, "SELECT") + self.assertIs(select_span.kind, trace_api.SpanKind.CLIENT) + + # Second span should be the COMMIT + commit_span = spans_list[1] + self.assertEqual(commit_span.name, "COMMIT") + self.assertIs(commit_span.kind, trace_api.SpanKind.CLIENT) diff --git a/instrumentation/opentelemetry-instrumentation-sqlite3/src/opentelemetry/instrumentation/sqlite3/__init__.py b/instrumentation/opentelemetry-instrumentation-sqlite3/src/opentelemetry/instrumentation/sqlite3/__init__.py index 086d47f3f5..97fe67dd1d 100644 --- a/instrumentation/opentelemetry-instrumentation-sqlite3/src/opentelemetry/instrumentation/sqlite3/__init__.py +++ b/instrumentation/opentelemetry-instrumentation-sqlite3/src/opentelemetry/instrumentation/sqlite3/__init__.py @@ -89,6 +89,7 @@ def _instrument(self, **kwargs: Any) -> None: https://docs.python.org/3/library/sqlite3.html """ tracer_provider = kwargs.get("tracer_provider") + enable_transaction_spans = kwargs.get("enable_transaction_spans", True) for module in self._TO_WRAP: dbapi.wrap_connect( @@ -99,6 +100,7 @@ def _instrument(self, **kwargs: Any) -> None: _CONNECTION_ATTRIBUTES, version=__version__, tracer_provider=tracer_provider, + enable_transaction_spans=enable_transaction_spans, ) def _uninstrument(self, **kwargs: Any) -> None: @@ -110,6 +112,7 @@ def _uninstrument(self, **kwargs: Any) -> None: def instrument_connection( connection: SQLite3Connection, tracer_provider: TracerProvider | None = None, + enable_transaction_spans: bool = True, ) -> SQLite3Connection: """Enable instrumentation in a SQLite connection. @@ -117,6 +120,8 @@ def instrument_connection( connection: The connection to instrument. tracer_provider: The optional tracer provider to use. If omitted the current globally configured one is used. + enable_transaction_spans: Flag to enable/disable transaction spans + (commit/rollback). Defaults to True. Returns: An instrumented SQLite connection that supports @@ -130,6 +135,7 @@ def instrument_connection( _CONNECTION_ATTRIBUTES, version=__version__, tracer_provider=tracer_provider, + enable_transaction_spans=enable_transaction_spans, ) @staticmethod diff --git a/tests/opentelemetry-docker-tests/tests/pymysql/test_pymysql_functional.py b/tests/opentelemetry-docker-tests/tests/pymysql/test_pymysql_functional.py index 537f42377c..aa00eb2673 100644 --- a/tests/opentelemetry-docker-tests/tests/pymysql/test_pymysql_functional.py +++ b/tests/opentelemetry-docker-tests/tests/pymysql/test_pymysql_functional.py @@ -48,36 +48,44 @@ def tearDown(self): PyMySQLInstrumentor().uninstrument() super().tearDown() - def validate_spans(self, span_name): + def validate_spans(self, *span_names): spans = self.memory_exporter.get_finished_spans() - self.assertEqual(len(spans), 2) + self.assertEqual(len(spans), len(span_names) + 1) # +1 for rootSpan + + root_span = None + db_spans = [] + for span in spans: if span.name == "rootSpan": root_span = span else: - db_span = span + db_spans.append(span) self.assertIsInstance(span.start_time, int) self.assertIsInstance(span.end_time, int) + self.assertIsNotNone(root_span) - self.assertIsNotNone(db_span) self.assertEqual(root_span.name, "rootSpan") - self.assertEqual(db_span.name, span_name) - self.assertIsNotNone(db_span.parent) - self.assertIs(db_span.parent, root_span.get_span_context()) - self.assertIs(db_span.kind, trace_api.SpanKind.CLIENT) - self.assertEqual(db_span.attributes[SpanAttributes.DB_SYSTEM], "mysql") - self.assertEqual( - db_span.attributes[SpanAttributes.DB_NAME], MYSQL_DB_NAME - ) - self.assertEqual( - db_span.attributes[SpanAttributes.DB_USER], MYSQL_USER - ) - self.assertEqual( - db_span.attributes[SpanAttributes.NET_PEER_NAME], MYSQL_HOST - ) - self.assertEqual( - db_span.attributes[SpanAttributes.NET_PEER_PORT], MYSQL_PORT - ) + self.assertEqual(len(db_spans), len(span_names)) + + # Validate each db span + for db_span, expected_name in zip(db_spans, span_names): + self.assertEqual(db_span.name, expected_name) + self.assertIsNotNone(db_span.parent) + self.assertIs(db_span.parent, root_span.get_span_context()) + self.assertIs(db_span.kind, trace_api.SpanKind.CLIENT) + self.assertEqual(db_span.attributes[SpanAttributes.DB_SYSTEM], "mysql") + self.assertEqual( + db_span.attributes[SpanAttributes.DB_NAME], MYSQL_DB_NAME + ) + self.assertEqual( + db_span.attributes[SpanAttributes.DB_USER], MYSQL_USER + ) + self.assertEqual( + db_span.attributes[SpanAttributes.NET_PEER_NAME], MYSQL_HOST + ) + self.assertEqual( + db_span.attributes[SpanAttributes.NET_PEER_PORT], MYSQL_PORT + ) def test_execute(self): """Should create a child span for execute""" @@ -112,17 +120,31 @@ def test_callproc(self): self.validate_spans("test") def test_commit(self): + """Should create spans for both INSERT and COMMIT""" stmt = "INSERT INTO test (id) VALUES (%s)" with self._tracer.start_as_current_span("rootSpan"): data = (("4",), ("5",), ("6",)) self._cursor.executemany(stmt, data) self._connection.commit() - self.validate_spans("INSERT") + self.validate_spans("INSERT", "COMMIT") def test_rollback(self): + """Should create spans for both INSERT and ROLLBACK""" stmt = "INSERT INTO test (id) VALUES (%s)" with self._tracer.start_as_current_span("rootSpan"): data = (("7",), ("8",), ("9",)) self._cursor.executemany(stmt, data) self._connection.rollback() - self.validate_spans("INSERT") + self.validate_spans("INSERT", "ROLLBACK") + + def test_commit_only(self): + """Should create a span for standalone COMMIT""" + with self._tracer.start_as_current_span("rootSpan"): + self._connection.commit() + self.validate_spans("COMMIT") + + def test_rollback_only(self): + """Should create a span for standalone ROLLBACK""" + with self._tracer.start_as_current_span("rootSpan"): + self._connection.rollback() + self.validate_spans("ROLLBACK")