Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,12 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

### Added

- `opentelemetry-instrumentation-dbapi`: Add instrumentation for `commit()` and `rollback()` transaction operations
([#3964](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/3964))
- `opentelemetry-instrumentation-dbapi`: Add `enable_transaction_spans` configuration flag to control transaction span creation (default: `True`)
([#3964](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/3964))
- `opentelemetry-instrumentation-pymysql`, `opentelemetry-instrumentation-mysql`, `opentelemetry-instrumentation-mysqlclient`, `opentelemetry-instrumentation-psycopg`, `opentelemetry-instrumentation-psycopg2`, `opentelemetry-instrumentation-sqlite3`, `opentelemetry-instrumentation-pymssql`: Add support for transaction span instrumentation via `enable_transaction_spans` parameter
([#3964](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/3964))
- `opentelemetry-instrumentation-aiohttp-client`: add support for url exclusions via `OTEL_PYTHON_EXCLUDED_URLS` / `OTEL_PYTHON_AIOHTTP_CLIENT_EXCLUDED_URLS`
([#3850](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/3850))
- `opentelemetry-instrumentation-httpx`: add support for url exclusions via `OTEL_PYTHON_EXCLUDED_URLS` / `OTEL_PYTHON_HTTPX_EXCLUDED_URLS`
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,7 @@ def trace_integration(
db_api_integration_factory: type[DatabaseApiIntegration] | None = None,
enable_attribute_commenter: bool = False,
commenter_options: dict[str, Any] | None = None,
enable_transaction_spans: bool = True,
):
"""Integrate with DB API library.
https://www.python.org/dev/peps/pep-0249/
Expand All @@ -228,6 +229,7 @@ def trace_integration(
default one is used.
enable_attribute_commenter: Flag to enable/disable sqlcomment inclusion in `db.statement` span attribute. Only available if enable_commenter=True.
commenter_options: Configurations for tags to be appended at the sql query.
enable_transaction_spans: Flag to enable/disable transaction spans (commit/rollback). Defaults to True.
"""
wrap_connect(
__name__,
Expand All @@ -242,6 +244,7 @@ def trace_integration(
db_api_integration_factory=db_api_integration_factory,
enable_attribute_commenter=enable_attribute_commenter,
commenter_options=commenter_options,
enable_transaction_spans=enable_transaction_spans,
)


Expand All @@ -258,6 +261,7 @@ def wrap_connect(
db_api_integration_factory: type[DatabaseApiIntegration] | None = None,
commenter_options: dict[str, Any] | None = None,
enable_attribute_commenter: bool = False,
enable_transaction_spans: bool = True,
):
"""Integrate with DB API library.
https://www.python.org/dev/peps/pep-0249/
Expand All @@ -277,6 +281,7 @@ def wrap_connect(
default one is used.
commenter_options: Configurations for tags to be appended at the sql query.
enable_attribute_commenter: Flag to enable/disable sqlcomment inclusion in `db.statement` span attribute. Only available if enable_commenter=True.
enable_transaction_spans: Flag to enable/disable transaction spans (commit/rollback). Defaults to True.

"""
db_api_integration_factory = (
Expand All @@ -301,6 +306,7 @@ def wrap_connect_(
commenter_options=commenter_options,
connect_module=connect_module,
enable_attribute_commenter=enable_attribute_commenter,
enable_transaction_spans=enable_transaction_spans,
)
return db_integration.wrapped_connection(wrapped, args, kwargs)

Expand Down Expand Up @@ -338,6 +344,7 @@ def instrument_connection(
connect_module: Callable[..., Any] | None = None,
enable_attribute_commenter: bool = False,
db_api_integration_factory: type[DatabaseApiIntegration] | None = None,
enable_transaction_spans: bool = True,
) -> TracedConnectionProxy[ConnectionT]:
"""Enable instrumentation in a database connection.

Expand All @@ -359,6 +366,7 @@ def instrument_connection(
replacement for :class:`DatabaseApiIntegration`. Can be used to
obtain connection attributes from the connect method instead of
from the connection itself (as done by the pymssql intrumentor).
enable_transaction_spans: Flag to enable/disable transaction spans (commit/rollback). Defaults to True.

Returns:
An instrumented connection.
Expand All @@ -382,6 +390,7 @@ def instrument_connection(
commenter_options=commenter_options,
connect_module=connect_module,
enable_attribute_commenter=enable_attribute_commenter,
enable_transaction_spans=enable_transaction_spans,
)
db_integration.get_connection_attributes(connection)
return get_traced_connection_proxy(connection, db_integration)
Expand Down Expand Up @@ -418,6 +427,7 @@ def __init__(
commenter_options: dict[str, Any] | None = None,
connect_module: Callable[..., Any] | None = None,
enable_attribute_commenter: bool = False,
enable_transaction_spans: bool = True,
):
if connection_attributes is None:
self.connection_attributes = {
Expand All @@ -440,6 +450,7 @@ def __init__(
self.enable_commenter = enable_commenter
self.commenter_options = commenter_options
self.enable_attribute_commenter = enable_attribute_commenter
self.enable_transaction_spans = enable_transaction_spans
self.database_system = database_system
self.connection_props: dict[str, Any] = {}
self.span_attributes: dict[str, Any] = {}
Expand Down Expand Up @@ -555,6 +566,17 @@ def get_connection_attributes(self, connection: object) -> None:
if port is not None:
self.span_attributes[SpanAttributes.NET_PEER_PORT] = port

def populate_common_span_attributes(self, span: trace_api.Span) -> None:
"""Populate span with common database connection attributes."""
if not span.is_recording():
return

span.set_attribute(SpanAttributes.DB_SYSTEM, self.database_system)
span.set_attribute(SpanAttributes.DB_NAME, self.database)

for attribute_key, attribute_value in self.span_attributes.items():
span.set_attribute(attribute_key, attribute_value)


# pylint: disable=abstract-method
class TracedConnectionProxy(wrapt.ObjectProxy, Generic[ConnectionT]):
Expand All @@ -563,23 +585,54 @@ def __init__(
self,
connection: ConnectionT,
db_api_integration: DatabaseApiIntegration | None = None,
wrap_cursors: bool = True,
):
wrapt.ObjectProxy.__init__(self, connection)
self._self_db_api_integration = db_api_integration
self._self_wrap_cursors = wrap_cursors

def __getattribute__(self, name: str):
if object.__getattribute__(self, name):
# Try to get the attribute from the proxy first
try:
return object.__getattribute__(self, name)

return object.__getattribute__(
object.__getattribute__(self, "_connection"), name
)
except AttributeError:
# If not found on proxy, try the wrapped connection
return object.__getattribute__(
object.__getattribute__(self, "__wrapped__"), name
)

def cursor(self, *args: Any, **kwargs: Any):
return get_traced_cursor_proxy(
self.__wrapped__.cursor(*args, **kwargs),
self._self_db_api_integration,
)
cursor = self.__wrapped__.cursor(*args, **kwargs)

# For databases like psycopg/psycopg2 that use cursor_factory,
# cursor tracing is already handled by the factory, so skip wrapping
if not self._self_wrap_cursors:
return cursor

# For standard dbapi connections, wrap the cursor
return get_traced_cursor_proxy(cursor, self._self_db_api_integration)

def _traced_tx_operation(
self, operation_name: str, operation_method: Callable[[], None]
) -> None:
"""Execute a traced transaction operation (commit, rollback)."""
if not is_instrumentation_enabled():
return operation_method()

if not self._self_db_api_integration.enable_transaction_spans:
return operation_method()

with self._self_db_api_integration._tracer.start_as_current_span(
operation_name, kind=trace_api.SpanKind.CLIENT
) as span:
self._self_db_api_integration.populate_common_span_attributes(span)
return operation_method()

def commit(self):
return self._traced_tx_operation("COMMIT", self.__wrapped__.commit)

def rollback(self):
return self._traced_tx_operation("ROLLBACK", self.__wrapped__.rollback)

def __enter__(self):
self.__wrapped__.__enter__()
Expand All @@ -589,13 +642,78 @@ def __exit__(self, *args: Any, **kwargs: Any):
self.__wrapped__.__exit__(*args, **kwargs)


class AsyncTracedConnectionProxy(TracedConnectionProxy[ConnectionT]):
async def _traced_tx_operation_async(
self, operation_name: str, operation_method: Callable[[], Awaitable[None]]
) -> None:
"""Execute a traced async transaction operation (commit, rollback)."""
if not is_instrumentation_enabled():
return await operation_method()

if not self._self_db_api_integration.enable_transaction_spans:
return await operation_method()

with self._self_db_api_integration._tracer.start_as_current_span(
operation_name, kind=trace_api.SpanKind.CLIENT
) as span:
self._self_db_api_integration.populate_common_span_attributes(span)
return await operation_method()

async def commit(self):
"""Async commit for async connections (e.g., psycopg.AsyncConnection)."""
return await self._traced_tx_operation_async("COMMIT", self.__wrapped__.commit)

async def rollback(self):
"""Async rollback for async connections (e.g., psycopg.AsyncConnection)."""
return await self._traced_tx_operation_async("ROLLBACK", self.__wrapped__.rollback)

# Async context manager support
async def __aenter__(self):
if hasattr(self.__wrapped__, "__aenter__"):
await self.__wrapped__.__aenter__()
return self

async def __aexit__(self, *args: Any, **kwargs: Any):
if hasattr(self.__wrapped__, "__aexit__"):
return await self.__wrapped__.__aexit__(*args, **kwargs)


def get_traced_connection_proxy(
connection: ConnectionT,
db_api_integration: DatabaseApiIntegration | None,
wrap_cursors: bool = True,
*args: Any,
**kwargs: Any,
) -> TracedConnectionProxy[ConnectionT]:
return TracedConnectionProxy(connection, db_api_integration)
"""Get a traced connection proxy for sync connections.

Args:
connection: The database connection to wrap.
db_api_integration: The database API integration instance.
wrap_cursors: Whether to wrap cursors returned by connection.cursor().
Set to False for databases like psycopg/psycopg2 that handle cursor
tracing via cursor_factory. Defaults to True.
"""
return TracedConnectionProxy(connection, db_api_integration, wrap_cursors)


def get_traced_async_connection_proxy(
connection: ConnectionT,
db_api_integration: DatabaseApiIntegration | None,
wrap_cursors: bool = True,
*args: Any,
**kwargs: Any,
) -> AsyncTracedConnectionProxy[ConnectionT]:
"""Get a traced connection proxy for async connections.

Args:
connection: The async database connection to wrap.
db_api_integration: The database API integration instance.
wrap_cursors: Whether to wrap cursors returned by connection.cursor().
Set to False for databases like psycopg/psycopg2 that handle cursor
tracing via cursor_factory. Defaults to True.
"""
return AsyncTracedConnectionProxy(connection, db_api_integration, wrap_cursors)


class CursorTracer(Generic[CursorT]):
Expand Down Expand Up @@ -666,21 +784,12 @@ def _populate_span(
):
if not span.is_recording():
return

self._db_api_integration.populate_common_span_attributes(span)

statement = self.get_statement(cursor, args)
span.set_attribute(
SpanAttributes.DB_SYSTEM, self._db_api_integration.database_system
)
span.set_attribute(
SpanAttributes.DB_NAME, self._db_api_integration.database
)
span.set_attribute(SpanAttributes.DB_STATEMENT, statement)

for (
attribute_key,
attribute_value,
) in self._db_api_integration.span_attributes.items():
span.set_attribute(attribute_key, attribute_value)

if self._db_api_integration.capture_parameters and len(args) > 1:
span.set_attribute("db.statement.parameters", str(args[1]))

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1055,6 +1055,62 @@ def test_callproc(self):
"Test stored procedure",
)

def test_commit(self):
db_integration = dbapi.DatabaseApiIntegration(
"instrumenting_module_test_name", "testcomponent"
)
mock_connection = db_integration.wrapped_connection(
mock_connect, {}, {}
)
mock_connection.commit()
spans_list = self.memory_exporter.get_finished_spans()
self.assertEqual(len(spans_list), 1)
span = spans_list[0]
self.assertEqual(span.name, "COMMIT")

def test_rollback(self):
db_integration = dbapi.DatabaseApiIntegration(
"instrumenting_module_test_name", "testcomponent"
)
mock_connection = db_integration.wrapped_connection(
mock_connect, {}, {}
)
mock_connection.rollback()
spans_list = self.memory_exporter.get_finished_spans()
self.assertEqual(len(spans_list), 1)
span = spans_list[0]
self.assertEqual(span.name, "ROLLBACK")

def test_commit_with_suppress_instrumentation(self):
"""Test that commit doesn't create a span when instrumentation is suppressed"""
db_integration = dbapi.DatabaseApiIntegration(
"instrumenting_module_test_name",
"testcomponent",
)
mock_connection = db_integration.wrapped_connection(
mock_connect, {}, {}
)
with suppress_instrumentation():
mock_connection.commit()

spans_list = self.memory_exporter.get_finished_spans()
self.assertEqual(len(spans_list), 0)

def test_rollback_with_suppress_instrumentation(self):
"""Test that rollback doesn't create a span when instrumentation is suppressed"""
db_integration = dbapi.DatabaseApiIntegration(
"instrumenting_module_test_name",
"testcomponent",
)
mock_connection = db_integration.wrapped_connection(
mock_connect, {}, {}
)
with suppress_instrumentation():
mock_connection.rollback()

spans_list = self.memory_exporter.get_finished_spans()
self.assertEqual(len(spans_list), 0)

@mock.patch("opentelemetry.instrumentation.dbapi")
def test_wrap_connect(self, mock_dbapi):
dbapi.wrap_connect(self.tracer, mock_dbapi, "connect", "-")
Expand Down Expand Up @@ -1209,6 +1265,14 @@ def __init__(self, database, server_port, server_host, user):
def cursor(self):
return MockCursor()

# pylint: disable=no-self-use
def commit(self):
pass

# pylint: disable=no-self-use
def rollback(self):
pass


class MockCursor:
def __init__(self) -> None:
Expand Down
Loading