From b78e04ab1242357ff00a29c8444e27cf81896623 Mon Sep 17 00:00:00 2001 From: Dmitry Rybakov Date: Thu, 26 Jun 2025 12:33:22 +0200 Subject: [PATCH 1/6] RUBY-3612 OpenTelemetry support --- gemfiles/standard.rb | 1 + lib/mongo.rb | 1 + lib/mongo/client.rb | 24 ++++ lib/mongo/collection.rb | 31 +++-- lib/mongo/collection/view.rb | 2 + lib/mongo/collection/view/iterable.rb | 26 ++-- lib/mongo/cursor.rb | 11 +- lib/mongo/operation/context.rb | 1 + lib/mongo/operation/insert/op_msg.rb | 10 +- lib/mongo/operation/shared/executable.rb | 12 +- lib/mongo/operation/shared/specifiable.rb | 4 + lib/mongo/server/connection.rb | 11 ++ lib/mongo/tracing.rb | 16 +++ lib/mongo/tracing/open_telemetry.rb | 26 ++++ .../tracing/open_telemetry/command_tracer.rb | 94 ++++++++++++++ .../open_telemetry/operation_tracer.rb | 119 ++++++++++++++++++ lib/mongo/tracing/open_telemetry/tracer.rb | 88 +++++++++++++ spec/lite_spec_helper.rb | 1 + spec/mongo/tracer/open_telemetry_spec.rb | 26 ++++ spec/runners/unified/test.rb | 13 ++ .../data/crud_unified/find-comment.yml | 10 ++ spec/support/tracing.rb | 64 ++++++++++ 22 files changed, 557 insertions(+), 34 deletions(-) create mode 100644 lib/mongo/tracing.rb create mode 100644 lib/mongo/tracing/open_telemetry.rb create mode 100644 lib/mongo/tracing/open_telemetry/command_tracer.rb create mode 100644 lib/mongo/tracing/open_telemetry/operation_tracer.rb create mode 100644 lib/mongo/tracing/open_telemetry/tracer.rb create mode 100644 spec/mongo/tracer/open_telemetry_spec.rb create mode 100644 spec/support/tracing.rb diff --git a/gemfiles/standard.rb b/gemfiles/standard.rb index c8065b3a1b..0d534f78d8 100644 --- a/gemfiles/standard.rb +++ b/gemfiles/standard.rb @@ -4,6 +4,7 @@ def standard_dependencies gem 'yard', '>= 0.9.35' gem 'ffi' + gem 'opentelemetry-sdk' group :development, :testing do gem 'jruby-openssl', platforms: :jruby diff --git a/lib/mongo.rb b/lib/mongo.rb index c866ad1a9e..ba9aa817e6 100644 --- a/lib/mongo.rb +++ b/lib/mongo.rb @@ -74,6 +74,7 @@ require 'mongo/socket' require 'mongo/srv' require 'mongo/timeout' +require 'mongo/tracing' require 'mongo/uri' require 'mongo/version' require 'mongo/write_concern' diff --git a/lib/mongo/client.rb b/lib/mongo/client.rb index 70f5768628..dcb10660aa 100644 --- a/lib/mongo/client.rb +++ b/lib/mongo/client.rb @@ -112,6 +112,7 @@ class Client :ssl_verify_hostname, :ssl_verify_ocsp_endpoint, :timeout_ms, + :tracing, :truncate_logs, :user, :wait_queue_timeout, @@ -437,6 +438,20 @@ def hash # See Ruby's Zlib module for valid levels. # @option options [ Hash ] :resolv_options For internal driver use only. # Options to pass through to Resolv::DNS constructor for SRV lookups. + # @option options [ Hash ] :tracing OpenTelemetry tracing options. + # - :enabled => Boolean, whether to enable OpenTelemetry tracing. The default + # value is nil that means that the configuration will be taken from the + # OTEL_RUBY_INSTRUMENTATION_MONGODB_ENABLED environment variable. + # - :tracer => OpenTelemetry::Trace::Tracer, the tracer to use for + # tracing. Must be an implementation of OpenTelemetry::Trace::Tracer + # interface. + # - :query_text_max_length => Integer, the maximum length of the query text + # to be included in the span attributes. If the query text exceeds this + # length, it will be truncated. Value 0 means no query text + # will be included in the span attributes. The default value is nil that + # means that the configuration will be taken from the + # OTEL_RUBY_INSTRUMENTATION_MONGODB_QUERY_TEXT_MAX_LENGTH environment + # variable. # @option options [ Hash ] :auto_encryption_options Auto-encryption related # options. # - :key_vault_client => Client | nil, a client connected to the MongoDB @@ -1195,6 +1210,15 @@ def timeout_sec end end + def tracer + tracing_opts = @options[:tracing] || {} + @tracer ||= Tracing.create_tracer( + enabled: tracing_opts[:enabled], + query_text_max_length: tracing_opts[:query_text_max_length], + otel_tracer: tracing_opts[:tracer], + ) + end + private # Attempts to parse the given list of addresses, using the provided options. diff --git a/lib/mongo/collection.rb b/lib/mongo/collection.rb index b9cbefee0c..a2e0602cba 100644 --- a/lib/mongo/collection.rb +++ b/lib/mongo/collection.rb @@ -57,6 +57,8 @@ class Collection # Delegate to the cluster for the next primary. def_delegators :cluster, :next_primary + def_delegators :client, :tracer + # Options that can be updated on a new Collection instance via the #with method. # # @since 2.1.0 @@ -865,19 +867,22 @@ def insert_one(document, opts = {}) session: session, operation_timeouts: operation_timeouts(opts) ) - write_with_retry(write_concern, context: context) do |connection, txn_num, context| - Operation::Insert.new( - :documents => [ document ], - :db_name => database.name, - :coll_name => name, - :write_concern => write_concern, - :bypass_document_validation => !!opts[:bypass_document_validation], - :options => opts, - :id_generator => client.options[:id_generator], - :session => session, - :txn_num => txn_num, - :comment => opts[:comment] - ).execute_with_connection(connection, context: context) + operation = Operation::Insert.new( + :documents => [ document ], + :db_name => database.name, + :coll_name => name, + :write_concern => write_concern, + :bypass_document_validation => !!opts[:bypass_document_validation], + :options => opts, + :id_generator => client.options[:id_generator], + :session => session, + :comment => opts[:comment] + ) + tracer.trace_operation('insert_one', operation, context) do + write_with_retry(write_concern, context: context) do |connection, txn_num, context| + operation.txn_num = txn_num + operation.execute_with_connection(connection, context: context) + end end end end diff --git a/lib/mongo/collection/view.rb b/lib/mongo/collection/view.rb index fc33d85b75..e7c221f0a8 100644 --- a/lib/mongo/collection/view.rb +++ b/lib/mongo/collection/view.rb @@ -72,6 +72,8 @@ class View # Delegate to the cluster for the next primary. def_delegators :cluster, :next_primary + def_delegators :client, :tracer + alias :selector :filter # @return [ Integer | nil | The timeout_ms value that was passed as an diff --git a/lib/mongo/collection/view/iterable.rb b/lib/mongo/collection/view/iterable.rb index 99133c5e9f..54dcd6bb2b 100644 --- a/lib/mongo/collection/view/iterable.rb +++ b/lib/mongo/collection/view/iterable.rb @@ -88,19 +88,21 @@ def select_cursor(session) operation_timeouts: operation_timeouts, view: self ) - - if respond_to?(:write?, true) && write? - server = server_selector.select_server(cluster, nil, session, write_aggregation: true) - result = send_initial_query(server, context) - - if use_query_cache? - CachingCursor.new(view, result, server, session: session, context: context) + op = initial_query_op(session) + tracer.trace_operation('get_more', op, context) do + if respond_to?(:write?, true) && write? + server = server_selector.select_server(cluster, nil, session, write_aggregation: true) + result = send_initial_query(server, context) + + if use_query_cache? + CachingCursor.new(view, result, server, session: session, context: context) + else + Cursor.new(view, result, server, session: session, context: context) + end else - Cursor.new(view, result, server, session: session, context: context) - end - else - read_with_retry_cursor(session, server_selector, view, context: context) do |server| - send_initial_query(server, context) + read_with_retry_cursor(session, server_selector, view, context: context) do |server| + send_initial_query(server, context) + end end end end diff --git a/lib/mongo/cursor.rb b/lib/mongo/cursor.rb index 0e0927f02c..59a438ae7b 100644 --- a/lib/mongo/cursor.rb +++ b/lib/mongo/cursor.rb @@ -39,6 +39,7 @@ class Cursor def_delegators :@view, :collection def_delegators :collection, :client, :database def_delegators :@server, :cluster + def_delegators :client, :tracer # @return [ Collection::View ] view The collection view. attr_reader :view @@ -514,10 +515,12 @@ def unregister def execute_operation(op, context: nil) op_context = context || possibly_refreshed_context - if @connection.nil? - op.execute(@server, context: op_context) - else - op.execute_with_connection(@connection, context: op_context) + tracer.trace_operation('find', op, op_context) do + if @connection.nil? + op.execute(@server, context: op_context) + else + op.execute_with_connection(@connection, context: op_context) + end end end diff --git a/lib/mongo/operation/context.rb b/lib/mongo/operation/context.rb index 03d6e0957d..c2be3c0149 100644 --- a/lib/mongo/operation/context.rb +++ b/lib/mongo/operation/context.rb @@ -69,6 +69,7 @@ def initialize( attr_reader :session attr_reader :view attr_reader :options + attr_accessor :tracer # Returns a new Operation::Context with the deadline refreshed # and relative to the current moment. diff --git a/lib/mongo/operation/insert/op_msg.rb b/lib/mongo/operation/insert/op_msg.rb index 39b299ef76..45789ae183 100644 --- a/lib/mongo/operation/insert/op_msg.rb +++ b/lib/mongo/operation/insert/op_msg.rb @@ -34,8 +34,14 @@ class OpMsg < OpMsgBase private def get_result(connection, context, options = {}) - # This is a Mongo::Operation::Insert::Result - Result.new(*dispatch_message(connection, context), @ids, context: context) + message = build_message(connection, context) + if (tracer = context.tracer) + tracer.trace_command(message, context, connection) do + Result.new(*dispatch_message(message, connection, context), @ids, context: context) + end + else + Result.new(*dispatch_message(message, connection, context), @ids, context: context) + end end def selector(connection) diff --git a/lib/mongo/operation/shared/executable.rb b/lib/mongo/operation/shared/executable.rb index 041e4d1e5b..53f35249e3 100644 --- a/lib/mongo/operation/shared/executable.rb +++ b/lib/mongo/operation/shared/executable.rb @@ -104,12 +104,18 @@ def result_class end def get_result(connection, context, options = {}) - result_class.new(*dispatch_message(connection, context, options), context: context, connection: connection) + message = build_message(connection, context) + if (tracer = context.tracer) + tracer.trace_command(message, context, connection) do + result_class.new(*dispatch_message(message, connection, context, options), context: context, connection: connection) + end + else + result_class.new(*dispatch_message(message, connection, context, options), context: context, connection: connection) + end end # Returns a Protocol::Message or nil as reply. - def dispatch_message(connection, context, options = {}) - message = build_message(connection, context) + def dispatch_message(message, connection, context, options = {}) message = message.maybe_encrypt(connection, context) reply = connection.dispatch([ message ], context, options) [reply, connection.description, connection.global_id] diff --git a/lib/mongo/operation/shared/specifiable.rb b/lib/mongo/operation/shared/specifiable.rb index afc799f46e..aa3125cb2a 100644 --- a/lib/mongo/operation/shared/specifiable.rb +++ b/lib/mongo/operation/shared/specifiable.rb @@ -526,6 +526,10 @@ def txn_num @spec[:txn_num] end + def txn_num=(num) + @spec[:txn_num] = num + end + # The command. # # @return [ Hash ] The command. diff --git a/lib/mongo/server/connection.rb b/lib/mongo/server/connection.rb index f9874764cf..14b8f7b16d 100644 --- a/lib/mongo/server/connection.rb +++ b/lib/mongo/server/connection.rb @@ -388,6 +388,17 @@ def record_checkin! self end + def transport + return nil if @socket.nil? + + case @socket + when Mongo::Socket::Unix + :unix + else + :tcp + end + end + private def deliver(message, client, options = {}) diff --git a/lib/mongo/tracing.rb b/lib/mongo/tracing.rb new file mode 100644 index 0000000000..25a0e7eeac --- /dev/null +++ b/lib/mongo/tracing.rb @@ -0,0 +1,16 @@ +# frozen_string_literal: true + +module Mongo + module Tracing + def create_tracer(enabled: nil, query_text_max_length: nil, otel_tracer: nil) + OpenTelemetry::Tracer.new( + enabled: enabled, + query_text_max_length: query_text_max_length, + otel_tracer: otel_tracer, + ) + end + module_function :create_tracer + end +end + +require 'mongo/tracing/open_telemetry' diff --git a/lib/mongo/tracing/open_telemetry.rb b/lib/mongo/tracing/open_telemetry.rb new file mode 100644 index 0000000000..702d2885c4 --- /dev/null +++ b/lib/mongo/tracing/open_telemetry.rb @@ -0,0 +1,26 @@ +# frozen_string_literal: true + +# Copyright (C) 2025-present MongoDB Inc. +# +# Licensed under the Apache License, Version 2.0 (the 'License'); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an 'AS IS' BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +module Mongo + module Tracing + module OpenTelemetry + end + end +end + +require 'mongo/tracing/open_telemetry/command_tracer' +require 'mongo/tracing/open_telemetry/operation_tracer' +require 'mongo/tracing/open_telemetry/tracer' diff --git a/lib/mongo/tracing/open_telemetry/command_tracer.rb b/lib/mongo/tracing/open_telemetry/command_tracer.rb new file mode 100644 index 0000000000..76113783c8 --- /dev/null +++ b/lib/mongo/tracing/open_telemetry/command_tracer.rb @@ -0,0 +1,94 @@ +# frozen_string_literal: true + +# Copyright (C) 2025-present MongoDB Inc. +# +# Licensed under the Apache License, Version 2.0 (the 'License'); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an 'AS IS' BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +module Mongo + module Tracing + module OpenTelemetry + class CommandTracer + def initialize(otel_tracer, query_text_max_length: 0) + @otel_tracer = otel_tracer + @query_text_max_length = query_text_max_length + end + + def trace_command(message, _operation_context, connection) + @otel_tracer.in_span( + command_span_name(message), + attributes: span_attributes(message, connection), + kind: :client + ) do |span, _context| + yield.tap do |result| + if result.respond_to?(:cursor_id) && result.cursor_id.positive? + span.set_attribute('db.mongodb.cursor_id', result.cursor_id) + end + end + end + end + + private + + def span_attributes(message, connection) + { + 'db.system' => 'mongodb', + 'db.namespace' => message.documents.first['$db'], + 'db.collection.name' => collection_name(message), + 'db.operation.name' => message.documents.first.keys.first, + 'server.port' => connection.address.port, + 'server.address' => connection.address.host, + 'network.transport' => connection.transport.to_s, + 'db.mongodb.server_connection_id' => connection.server.description.server_connection_id, + 'db.mongodb.driver_connection_id' => connection.id, + 'db.query.text' => query_text(message) + }.compact + end + + def command_span_name(message) + message.documents.first.keys.first + end + + def collection_name(message) + case message.documents.first.keys.first + when 'getMore' + message.documents.first['collection'] + else + message.documents.first.values.first + end + end + + def query_text? + @query_text_max_length.positive? + end + + EXCLUDED_KEYS = %w[lsid $db $clusterTime signature].freeze + ELLIPSES = '...' + + def query_text(message) + return unless query_text? + + text = message + .documents + .first + .reject { |key, _| EXCLUDED_KEYS.include?(key) } + .to_json + if text.length > @query_text_max_length + "#{text[0...@query_text_max_length]}#{ELLIPSES}" + else + text + end + end + end + end + end +end diff --git a/lib/mongo/tracing/open_telemetry/operation_tracer.rb b/lib/mongo/tracing/open_telemetry/operation_tracer.rb new file mode 100644 index 0000000000..9e0ddae80f --- /dev/null +++ b/lib/mongo/tracing/open_telemetry/operation_tracer.rb @@ -0,0 +1,119 @@ +# frozen_string_literal: true + +# Copyright (C) 2025-present MongoDB Inc. +# +# Licensed under the Apache License, Version 2.0 (the 'License'); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an 'AS IS' BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +module Mongo + module Tracing + module OpenTelemetry + # OperationTracer is responsible for tracing MongoDB operations using OpenTelemetry. + # + # It provides methods to trace driven operations. + # @api private + class OperationTracer + def initialize(otel_tracer, parent_tracer) + @otel_tracer = otel_tracer + @parent_tracer = parent_tracer + end + + def trace_operation(name, operation, operation_context) + parent_context = parent_context_for(operation_context, operation.cursor_id) + operation_context.tracer = @parent_tracer + span = @otel_tracer.start_span( + operation_span_name(name, operation), + attributes: span_attributes(name, operation), + with_parent: parent_context, + kind: :client + ) + ::OpenTelemetry::Trace.with_span(span) do |_s, c| + yield.tap do |result| + process_cursor_context(result, operation.cursor_id, c) + end + end + rescue Exception => e + span&.record_exception(e) + span&.status = ::OpenTelemetry::Trace::Status.error("Unhandled exception of type: #{e.class}") + raise e + ensure + span&.finish + operation_context.tracer = nil + end + + private + + def span_attributes(name, operation) + { + 'db.system' => 'mongodb', + 'db.namespace' => operation.db_name.to_s, + 'db.collection.name' => operation.coll_name.to_s, + 'db.operation.name' => name, + 'db.operation.summary' => operation_span_name(name, operation), + 'db.cursor.id' => operation.cursor_id, + }.compact + end + + def parent_context_for(operation_context, cursor_id) + if (key = transaction_map_key(operation_context.session)) + transaction_context_map[key] + elsif cursor_id + cursor_context_map[cursor_id] + end + end + + # This map is used to store OpenTelemetry context for cursor_id. + # This allows to group all operations related to a cursor under the same context. + # + # # @return [Hash] a map of cursor_id to OpenTelemetry context. + def cursor_context_map + @cursor_context_map ||= {} + end + + def process_cursor_context(result, cursor_id, context) + return unless result.is_a?(Cursor) + + if result.id.zero? + # If the cursor is closed, remove it from the context map. + cursor_context_map.delete(cursor_id) + elsif result.id && cursor_id.nil? + # New cursor created, store its context. + cursor_context_map[result.id] = context + end + end + + # This map is used to store OpenTelemetry context for transaction. + # This allows to group all operations related to a transaction under the same context. + # + # @return [Hash] a map of transaction_id to OpenTelemetry context. + def transaction_context_map + @transaction_context_map ||= {} + end + + # @param session [Mongo::Session] the session for which to get the transaction map key. + def transaction_map_key(session) + return if session.nil? || session.implicit? || !session.in_transaction? + + "#{session.id}-#{session.txn_num}" + end + + def operation_span_name(name, operation) + if operation.coll_name + "#{name} #{operation.db_name}.#{operation.coll_name}" + else + "#{operation.db_name}.#{name}" + end + end + end + end + end +end diff --git a/lib/mongo/tracing/open_telemetry/tracer.rb b/lib/mongo/tracing/open_telemetry/tracer.rb new file mode 100644 index 0000000000..de9f5b07dd --- /dev/null +++ b/lib/mongo/tracing/open_telemetry/tracer.rb @@ -0,0 +1,88 @@ +# frozen_string_literal: true + +# Copyright (C) 2025-present MongoDB Inc. +# +# Licensed under the Apache License, Version 2.0 (the 'License'); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an 'AS IS' BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +module Mongo + module Tracing + module OpenTelemetry + class Tracer + # @return [ OpenTelemetry::Trace::Tracer ] the OpenTelemetry tracer implementation + # used to create spans for MongoDB operations and commands. + # + # @api private + attr_reader :otel_tracer + + # Initializes a new OpenTelemetry tracer. + # + # @param enabled [ Boolean | nil ] whether OpenTelemetry is enabled or not. + # If nil, it will check the environment variable + # OTEL_RUBY_INSTRUMENTATION_MONGODB_ENABLED. + # @param otel_tracer [ OpenTelemetry::Trace::Tracer | nil ] the OpenTelemetry tracer + # implementation to use. If nil, it will use the default tracer from + # OpenTelemetry's tracer provider. + def initialize(enabled: nil, query_text_max_length: nil, otel_tracer: nil) + @enabled = if enabled.nil? + %w[true 1 yes].include?(ENV['OTEL_RUBY_INSTRUMENTATION_MONGODB_ENABLED']&.downcase) + else + enabled + end + @query_text_max_length = if query_text_max_length.nil? + ENV['OTEL_RUBY_INSTRUMENTATION_MONGODB_QUERY_TEXT_MAX_LENGTH'].to_i + else + query_text_max_length + end + @otel_tracer = otel_tracer || initialize_tracer + @operation_tracer = OperationTracer.new(@otel_tracer, self) + @command_tracer = CommandTracer.new(@otel_tracer, query_text_max_length: @query_text_max_length) + end + + # Whether OpenTelemetry is enabled or not. + # + # # @return [Boolean] true if OpenTelemetry is enabled, false otherwise. + def enabled? + @enabled + end + + def trace_operation(name, operation, operation_context, &block) + return yield unless enabled? + + operation_context.tracer = self + @operation_tracer.trace_operation(name, operation, operation_context, &block) + end + + def trace_command(message, operation_context, connection, &block) + return yield unless enabled? + + @command_tracer.trace_command(message, operation_context, connection, &block) + end + + private + + def initialize_tracer + if enabled? + # Obtain the proper tracer from OpenTelemetry's tracer provider. + ::OpenTelemetry.tracer_provider.tracer( + 'mongo-ruby-driver', + Mongo::VERSION + ) + else + # No-op tracer when OpenTelemetry is not enabled. + ::OpenTelemetry::Trace::Tracer.new + end + end + end + end + end +end diff --git a/spec/lite_spec_helper.rb b/spec/lite_spec_helper.rb index 486d9c4235..ddd8b25efd 100644 --- a/spec/lite_spec_helper.rb +++ b/spec/lite_spec_helper.rb @@ -94,6 +94,7 @@ module Mrss require 'support/json_ext_formatter' require 'support/sdam_formatter_integration' require 'support/background_thread_registry' +require 'support/tracing' require 'mrss/session_registry' require 'support/local_resource_registry' diff --git a/spec/mongo/tracer/open_telemetry_spec.rb b/spec/mongo/tracer/open_telemetry_spec.rb new file mode 100644 index 0000000000..a7d115a9a0 --- /dev/null +++ b/spec/mongo/tracer/open_telemetry_spec.rb @@ -0,0 +1,26 @@ +# frozen_string_literal: true + +require 'spec_helper' + +describe Mongo::Tracer::OpenTelemetry do + describe '#initialize' do + it 'disables OpenTelemetry by default' do + tracer = described_class.new + expect(tracer.enabled?).to be false + end + + it 'disables OpenTelemetry when the environment variable is not set' do + allow(ENV).to receive(:[]).with('OTEL_RUBY_INSTRUMENTATION_MONGODB_ENABLED').and_return(nil) + tracer = described_class.new + expect(tracer.enabled?).to be false + end + + %w[ true 1 yes ].each do |value| + it "enables OpenTelemetry when the environment variable is set to '#{value}'" do + allow(ENV).to receive(:[]).with('OTEL_RUBY_INSTRUMENTATION_MONGODB_ENABLED').and_return(value) + tracer = described_class.new + expect(tracer.enabled?).to be true + end + end + end +end diff --git a/spec/runners/unified/test.rb b/spec/runners/unified/test.rb index 32b0ba0a82..6f3bf61d62 100644 --- a/spec/runners/unified/test.rb +++ b/spec/runners/unified/test.rb @@ -37,6 +37,7 @@ def initialize(spec, **opts) @description = @test_spec.use('description') @outcome = @test_spec.use('outcome') @expected_events = @test_spec.use('expectEvents') + @expected_spans = @test_spec.use('expectSpans') @skip_reason = @test_spec.use('skipReason') if req = @test_spec.use('runOnRequirements') @reqs = req.map { |r| Mongo::CRUD::Requirement.new(r) } @@ -195,6 +196,14 @@ def generate_entities(es) end end + observe_spans = spec.use('observeSpans') + if observe_spans + opts[:tracing] = { + enabled: true, + tracer: tracer, + } + end + create_client(**opts).tap do |client| @observe_sensitive[id] = spec.use('observeSensitiveCommands') @subscribers[client] ||= subscriber @@ -602,5 +611,9 @@ def bson_error BSON::String.const_get(:IllegalKey) : BSON::Error end + + def tracer + @tracer ||= ::Tracing::Tracer.new + end end end diff --git a/spec/spec_tests/data/crud_unified/find-comment.yml b/spec/spec_tests/data/crud_unified/find-comment.yml index 905241ad0e..70cc968f7f 100644 --- a/spec/spec_tests/data/crud_unified/find-comment.yml +++ b/spec/spec_tests/data/crud_unified/find-comment.yml @@ -6,6 +6,7 @@ createEntities: - client: id: &client0 client0 observeEvents: [ commandStartedEvent ] + observeSpans: true - database: id: &database0 database0 client: *client0 @@ -47,6 +48,15 @@ tests: find: *collection0Name filter: *filter comment: "comment" + expectSpans: + - client: *client0 + spans: + - name: "find" + attributes: + mongodb.collection: *collection0Name + mongodb.database: *database0Name + mongodb.filter: *filter + mongodb.comment: "comment" - description: "find with document comment" runOnRequirements: diff --git a/spec/support/tracing.rb b/spec/support/tracing.rb new file mode 100644 index 0000000000..0eefb1ad46 --- /dev/null +++ b/spec/support/tracing.rb @@ -0,0 +1,64 @@ +# frozen_string_literal: true + +module Tracing + class Span + + attr_reader :name, :attributes, :events, :with_parent, :kind, :finished + + attr_accessor :status + + def initialize(name, attributes = {}, with_parent: nil, kind: :internal) + @name = name + @attributes = attributes + @events = [] + @with_parent = with_parent + @kind = kind + @finished = false + end + + def set_attribute(key, value) + @attributes[key] = value + end + + def add_event(name, attributes: {}) + event_attributes = { 'event.name' => name } + event_attributes.merge!(attributes) unless attributes.nil? + @events << event_attributes + end + + def record_exception(exception, attributes: nil) + event_attributes = { + 'exception.type' => exception.class.to_s, + 'exception.message' => exception.message, + 'exception.stacktrace' => exception.full_message(highlight: false, order: :top).encode('UTF-8', invalid: :replace, undef: :replace, replace: '�') + } + event_attributes.merge!(attributes) unless attributes.nil? + add_event('exception', attributes: event_attributes) + end + + def finish + @finished = true + end + end + + class Tracer + + attr_reader :spans + + def initialize + @spans = [] + end + def in_span(name, attributes: {}, kind: :internal) + span = Span.new(name, attributes, kind: kind) + @spans << span + context = Object.new + yield(span, context) if block_given? + end + + def start_span(name, attributes: {}, with_parent: nil, kind: :internal) + Span.new(name, attributes, with_parent: with_parent, kind: kind).tap do |span| + @spans << span + end + end + end +end From 87e8ac38aef1204625af30ab09080e9ddec6ea32 Mon Sep 17 00:00:00 2001 From: Dmitry Rybakov Date: Tue, 12 Aug 2025 14:56:49 +0200 Subject: [PATCH 2/6] wip --- lib/mongo/collection/view/aggregation.rb | 5 +- .../collection/view/aggregation/behavior.rb | 2 +- lib/mongo/collection/view/iterable.rb | 10 ++- lib/mongo/cursor.rb | 6 ++ lib/mongo/operation/shared/specifiable.rb | 2 +- .../open_telemetry/operation_tracer.rb | 69 ++++--------------- spec/runners/unified/test.rb | 2 +- 7 files changed, 37 insertions(+), 59 deletions(-) diff --git a/lib/mongo/collection/view/aggregation.rb b/lib/mongo/collection/view/aggregation.rb index f80a4f491b..5c263faeda 100644 --- a/lib/mongo/collection/view/aggregation.rb +++ b/lib/mongo/collection/view/aggregation.rb @@ -25,11 +25,14 @@ class View # # @since 2.0.0 class Aggregation + extend Forwardable include Behavior # @return [ Array ] pipeline The aggregation pipeline. attr_reader :pipeline + def_delegators :view, :tracer + # Initialize the aggregation for the provided collection view, pipeline # and options. # @@ -80,7 +83,7 @@ def new(options) Aggregation.new(view, pipeline, options) end - def initial_query_op(session, read_preference) + def initial_query_op(session, read_preference = nil) Operation::Aggregate.new(aggregate_spec(session, read_preference)) end diff --git a/lib/mongo/collection/view/aggregation/behavior.rb b/lib/mongo/collection/view/aggregation/behavior.rb index 349b82e4bc..db881106a7 100644 --- a/lib/mongo/collection/view/aggregation/behavior.rb +++ b/lib/mongo/collection/view/aggregation/behavior.rb @@ -88,7 +88,7 @@ def server_selector @view.send(:server_selector) end - def aggregate_spec(session, read_preference) + def aggregate_spec(session, read_preference = nil) Builder::Aggregation.new( pipeline, view, diff --git a/lib/mongo/collection/view/iterable.rb b/lib/mongo/collection/view/iterable.rb index 54dcd6bb2b..b19d5deabc 100644 --- a/lib/mongo/collection/view/iterable.rb +++ b/lib/mongo/collection/view/iterable.rb @@ -89,7 +89,15 @@ def select_cursor(session) view: self ) op = initial_query_op(session) - tracer.trace_operation('get_more', op, context) do + op_name = case op + when Mongo::Operation::Find + 'find' + when Mongo::Operation::Aggregate + 'aggregate' + else + op.class.name.split('::').last.downcase + end + tracer.trace_operation(op_name, op, context) do if respond_to?(:write?, true) && write? server = server_selector.select_server(cluster, nil, session, write_aggregation: true) result = send_initial_query(server, context) diff --git a/lib/mongo/cursor.rb b/lib/mongo/cursor.rb index 59a438ae7b..b14c713c12 100644 --- a/lib/mongo/cursor.rb +++ b/lib/mongo/cursor.rb @@ -514,6 +514,12 @@ def unregister end def execute_operation(op, context: nil) + op_name = case op + when Mongo::Operation::GetMore + 'get_more' + when Mongo::Operation::Close + 'close' + end op_context = context || possibly_refreshed_context tracer.trace_operation('find', op, op_context) do if @connection.nil? diff --git a/lib/mongo/operation/shared/specifiable.rb b/lib/mongo/operation/shared/specifiable.rb index aa3125cb2a..dd2e37369e 100644 --- a/lib/mongo/operation/shared/specifiable.rb +++ b/lib/mongo/operation/shared/specifiable.rb @@ -233,7 +233,7 @@ def documents # # @since 2.0.0 def coll_name - spec.fetch(COLL_NAME) + spec[COLL_NAME] end # The id of the cursor created on the server. diff --git a/lib/mongo/tracing/open_telemetry/operation_tracer.rb b/lib/mongo/tracing/open_telemetry/operation_tracer.rb index 9e0ddae80f..c760daaa3f 100644 --- a/lib/mongo/tracing/open_telemetry/operation_tracer.rb +++ b/lib/mongo/tracing/open_telemetry/operation_tracer.rb @@ -28,30 +28,29 @@ def initialize(otel_tracer, parent_tracer) end def trace_operation(name, operation, operation_context) - parent_context = parent_context_for(operation_context, operation.cursor_id) - operation_context.tracer = @parent_tracer - span = @otel_tracer.start_span( + @otel_tracer.in_span( operation_span_name(name, operation), attributes: span_attributes(name, operation), - with_parent: parent_context, kind: :client - ) - ::OpenTelemetry::Trace.with_span(span) do |_s, c| + ) do |span, _context| + operation_context.tracer = @parent_tracer yield.tap do |result| - process_cursor_context(result, operation.cursor_id, c) + if result.is_a?(Cursor) && result.id.positive? + span.set_attribute('db.mongodb.cursor_id', result.id) + end end end - rescue Exception => e - span&.record_exception(e) - span&.status = ::OpenTelemetry::Trace::Status.error("Unhandled exception of type: #{e.class}") - raise e ensure - span&.finish operation_context.tracer = nil end private + # Returns a hash of attributes for the OpenTelemetry span for the operation. + # + # @param name [String] The name of the operation. + # @param operation [Operation] The operation being traced. + # @return [Hash] A hash of attributes for the span. def span_attributes(name, operation) { 'db.system' => 'mongodb', @@ -63,49 +62,11 @@ def span_attributes(name, operation) }.compact end - def parent_context_for(operation_context, cursor_id) - if (key = transaction_map_key(operation_context.session)) - transaction_context_map[key] - elsif cursor_id - cursor_context_map[cursor_id] - end - end - - # This map is used to store OpenTelemetry context for cursor_id. - # This allows to group all operations related to a cursor under the same context. - # - # # @return [Hash] a map of cursor_id to OpenTelemetry context. - def cursor_context_map - @cursor_context_map ||= {} - end - - def process_cursor_context(result, cursor_id, context) - return unless result.is_a?(Cursor) - - if result.id.zero? - # If the cursor is closed, remove it from the context map. - cursor_context_map.delete(cursor_id) - elsif result.id && cursor_id.nil? - # New cursor created, store its context. - cursor_context_map[result.id] = context - end - end - - # This map is used to store OpenTelemetry context for transaction. - # This allows to group all operations related to a transaction under the same context. + # Returns the name of the span for the operation. # - # @return [Hash] a map of transaction_id to OpenTelemetry context. - def transaction_context_map - @transaction_context_map ||= {} - end - - # @param session [Mongo::Session] the session for which to get the transaction map key. - def transaction_map_key(session) - return if session.nil? || session.implicit? || !session.in_transaction? - - "#{session.id}-#{session.txn_num}" - end - + # @param name [String] The name of the operation. + # @param operation [Operation] The operation being traced. + # # @return [String] The name of the span. def operation_span_name(name, operation) if operation.coll_name "#{name} #{operation.db_name}.#{operation.coll_name}" diff --git a/spec/runners/unified/test.rb b/spec/runners/unified/test.rb index 6f3bf61d62..ffec2bb242 100644 --- a/spec/runners/unified/test.rb +++ b/spec/runners/unified/test.rb @@ -200,7 +200,7 @@ def generate_entities(es) if observe_spans opts[:tracing] = { enabled: true, - tracer: tracer, + # tracer: tracer, } end From d6f75090b4544437d7a19971dfd70629bfe74fc5 Mon Sep 17 00:00:00 2001 From: Dmitry Rybakov Date: Fri, 15 Aug 2025 15:18:56 +0200 Subject: [PATCH 3/6] With nesting --- lib/mongo/client.rb | 8 +- lib/mongo/cluster.rb | 5 +- lib/mongo/collection.rb | 2 +- lib/mongo/collection/view/iterable.rb | 10 +-- lib/mongo/cursor.rb | 17 +---- lib/mongo/operation/context.rb | 1 - lib/mongo/operation/shared/executable.rb | 6 +- lib/mongo/server.rb | 3 +- lib/mongo/server/connection.rb | 2 + .../tracing/open_telemetry/command_tracer.rb | 69 ++++++++++++++--- .../open_telemetry/operation_tracer.rb | 76 ++++++++++++------- lib/mongo/tracing/open_telemetry/tracer.rb | 35 ++++++++- 12 files changed, 156 insertions(+), 78 deletions(-) diff --git a/lib/mongo/client.rb b/lib/mongo/client.rb index dcb10660aa..904524c360 100644 --- a/lib/mongo/client.rb +++ b/lib/mongo/client.rb @@ -589,8 +589,12 @@ def initialize(addresses_or_uri, options = nil) @connect_lock = Mutex.new @connect_lock.synchronize do - @cluster = Cluster.new(addresses, @monitoring, - cluster_options.merge(srv_uri: srv_uri)) + @cluster = Cluster.new( + addresses, + @monitoring, + tracer, + cluster_options.merge(srv_uri: srv_uri) + ) end begin diff --git a/lib/mongo/cluster.rb b/lib/mongo/cluster.rb index 46e9f556f1..30a7e56583 100644 --- a/lib/mongo/cluster.rb +++ b/lib/mongo/cluster.rb @@ -117,7 +117,7 @@ class Cluster # - *:deprecation_errors* -- boolean # # @since 2.0.0 - def initialize(seeds, monitoring, options = Options::Redacted.new) + def initialize(seeds, monitoring, tracer = nil, options = Options::Redacted.new) if seeds.nil? raise ArgumentError, 'Seeds cannot be nil' end @@ -136,6 +136,7 @@ def initialize(seeds, monitoring, options = Options::Redacted.new) @update_lock = Mutex.new @servers = [] @monitoring = monitoring + @tracer = tracer @event_listeners = Event::Listeners.new @app_metadata = Server::AppMetadata.new(@options.merge(purpose: :application)) @monitor_app_metadata = Server::Monitor::AppMetadata.new(@options.merge(purpose: :monitor)) @@ -309,6 +310,8 @@ def self.create(client, monitoring: nil) # @return [ Monitoring ] monitoring The monitoring. attr_reader :monitoring + attr_reader :tracer + # @return [ Object ] The cluster topology. attr_reader :topology diff --git a/lib/mongo/collection.rb b/lib/mongo/collection.rb index a2e0602cba..dee8a85da6 100644 --- a/lib/mongo/collection.rb +++ b/lib/mongo/collection.rb @@ -878,7 +878,7 @@ def insert_one(document, opts = {}) :session => session, :comment => opts[:comment] ) - tracer.trace_operation('insert_one', operation, context) do + tracer.trace_operation(operation, context) do write_with_retry(write_concern, context: context) do |connection, txn_num, context| operation.txn_num = txn_num operation.execute_with_connection(connection, context: context) diff --git a/lib/mongo/collection/view/iterable.rb b/lib/mongo/collection/view/iterable.rb index b19d5deabc..0f88e8eec7 100644 --- a/lib/mongo/collection/view/iterable.rb +++ b/lib/mongo/collection/view/iterable.rb @@ -89,15 +89,7 @@ def select_cursor(session) view: self ) op = initial_query_op(session) - op_name = case op - when Mongo::Operation::Find - 'find' - when Mongo::Operation::Aggregate - 'aggregate' - else - op.class.name.split('::').last.downcase - end - tracer.trace_operation(op_name, op, context) do + tracer.trace_operation(op, context) do if respond_to?(:write?, true) && write? server = server_selector.select_server(cluster, nil, session, write_aggregation: true) result = send_initial_query(server, context) diff --git a/lib/mongo/cursor.rb b/lib/mongo/cursor.rb index b14c713c12..0e0927f02c 100644 --- a/lib/mongo/cursor.rb +++ b/lib/mongo/cursor.rb @@ -39,7 +39,6 @@ class Cursor def_delegators :@view, :collection def_delegators :collection, :client, :database def_delegators :@server, :cluster - def_delegators :client, :tracer # @return [ Collection::View ] view The collection view. attr_reader :view @@ -514,19 +513,11 @@ def unregister end def execute_operation(op, context: nil) - op_name = case op - when Mongo::Operation::GetMore - 'get_more' - when Mongo::Operation::Close - 'close' - end op_context = context || possibly_refreshed_context - tracer.trace_operation('find', op, op_context) do - if @connection.nil? - op.execute(@server, context: op_context) - else - op.execute_with_connection(@connection, context: op_context) - end + if @connection.nil? + op.execute(@server, context: op_context) + else + op.execute_with_connection(@connection, context: op_context) end end diff --git a/lib/mongo/operation/context.rb b/lib/mongo/operation/context.rb index c2be3c0149..03d6e0957d 100644 --- a/lib/mongo/operation/context.rb +++ b/lib/mongo/operation/context.rb @@ -69,7 +69,6 @@ def initialize( attr_reader :session attr_reader :view attr_reader :options - attr_accessor :tracer # Returns a new Operation::Context with the deadline refreshed # and relative to the current moment. diff --git a/lib/mongo/operation/shared/executable.rb b/lib/mongo/operation/shared/executable.rb index 53f35249e3..85865c989a 100644 --- a/lib/mongo/operation/shared/executable.rb +++ b/lib/mongo/operation/shared/executable.rb @@ -105,11 +105,7 @@ def result_class def get_result(connection, context, options = {}) message = build_message(connection, context) - if (tracer = context.tracer) - tracer.trace_command(message, context, connection) do - result_class.new(*dispatch_message(message, connection, context, options), context: context, connection: connection) - end - else + connection.tracer.trace_command(message, context, connection) do result_class.new(*dispatch_message(message, connection, context, options), context: context, connection: connection) end end diff --git a/lib/mongo/server.rb b/lib/mongo/server.rb index c00285034e..edf2eb3b4f 100644 --- a/lib/mongo/server.rb +++ b/lib/mongo/server.rb @@ -218,7 +218,8 @@ def compressor # @api private def_delegators :cluster, :monitor_app_metadata, - :push_monitor_app_metadata + :push_monitor_app_metadata, + :tracer def_delegators :features, :check_driver_support! diff --git a/lib/mongo/server/connection.rb b/lib/mongo/server/connection.rb index 14b8f7b16d..4c0024c529 100644 --- a/lib/mongo/server/connection.rb +++ b/lib/mongo/server/connection.rb @@ -139,6 +139,8 @@ def initialize(server, options = {}) # across all connections. attr_reader :global_id + def_delegators :server, :tracer + # The connection pool from which this connection was created. # May be nil. # diff --git a/lib/mongo/tracing/open_telemetry/command_tracer.rb b/lib/mongo/tracing/open_telemetry/command_tracer.rb index 76113783c8..22c806c47a 100644 --- a/lib/mongo/tracing/open_telemetry/command_tracer.rb +++ b/lib/mongo/tracing/open_telemetry/command_tracer.rb @@ -17,24 +17,44 @@ module Mongo module Tracing module OpenTelemetry + # CommandTracer is responsible for tracing MongoDB server commands using OpenTelemetry. + # + # @api private class CommandTracer - def initialize(otel_tracer, query_text_max_length: 0) + extend Forwardable + + def_delegators :@parent_tracer, + :cursor_context_map, + :parent_context_for, + :transaction_context_map, + :transaction_map_key + + def initialize(otel_tracer, parent_tracer, query_text_max_length: 0) @otel_tracer = otel_tracer + @parent_tracer = parent_tracer @query_text_max_length = query_text_max_length end - def trace_command(message, _operation_context, connection) - @otel_tracer.in_span( + def trace_command(message, operation_context, connection) + parent_context = parent_context_for(operation_context, cursor_id(message)) + span = @otel_tracer.start_span( command_span_name(message), attributes: span_attributes(message, connection), + with_parent: parent_context, kind: :client - ) do |span, _context| + ) + ::OpenTelemetry::Trace.with_span(span) do |s, c| + # TODO: process cursor context if applicable yield.tap do |result| - if result.respond_to?(:cursor_id) && result.cursor_id.positive? - span.set_attribute('db.mongodb.cursor_id', result.cursor_id) - end + process_cursor_context(result, cursor_id(message), c, s) end end + rescue Exception => e + span&.record_exception(e) + span&.status = ::OpenTelemetry::Trace::Status.error("Unhandled exception of type: #{e.class}") + raise e + ensure + span&.finish end private @@ -42,35 +62,60 @@ def trace_command(message, _operation_context, connection) def span_attributes(message, connection) { 'db.system' => 'mongodb', - 'db.namespace' => message.documents.first['$db'], + 'db.namespace' => database(message), 'db.collection.name' => collection_name(message), - 'db.operation.name' => message.documents.first.keys.first, + 'db.command.name' => command_name(message), 'server.port' => connection.address.port, 'server.address' => connection.address.host, 'network.transport' => connection.transport.to_s, 'db.mongodb.server_connection_id' => connection.server.description.server_connection_id, 'db.mongodb.driver_connection_id' => connection.id, + 'db.mongodb.cursor_id' => cursor_id(message), 'db.query.text' => query_text(message) }.compact end + def process_cursor_context(result, cursor_id, context, span) + if result.respond_to?(:cursor_id) && result.cursor_id.positive? + span.set_attribute('db.mongodb.cursor_id', result.cursor_id) + end + end + def command_span_name(message) - message.documents.first.keys.first + if (coll_name = collection_name(message)) + "#{command_name(message)} #{database(message)}.#{coll_name}" + else + "#{command_name(message)} #{database(message)}" + end end def collection_name(message) case message.documents.first.keys.first when 'getMore' - message.documents.first['collection'] + message.documents.first['collection'].to_s else - message.documents.first.values.first + message.documents.first.values.first.to_s end end + def command_name(message) + message.documents.first.keys.first.to_s + end + + def database(message) + message.documents.first['$db'].to_s + end + def query_text? @query_text_max_length.positive? end + def cursor_id(message) + if command_name(message) == 'getMore' + message.documents.first['getMore'].value + end + end + EXCLUDED_KEYS = %w[lsid $db $clusterTime signature].freeze ELLIPSES = '...' diff --git a/lib/mongo/tracing/open_telemetry/operation_tracer.rb b/lib/mongo/tracing/open_telemetry/operation_tracer.rb index c760daaa3f..a06b4cc551 100644 --- a/lib/mongo/tracing/open_telemetry/operation_tracer.rb +++ b/lib/mongo/tracing/open_telemetry/operation_tracer.rb @@ -17,61 +17,79 @@ module Mongo module Tracing module OpenTelemetry - # OperationTracer is responsible for tracing MongoDB operations using OpenTelemetry. + # OperationTracer is responsible for tracing MongoDB driver operations using OpenTelemetry. # - # It provides methods to trace driven operations. # @api private class OperationTracer + extend Forwardable + + def_delegators :@parent_tracer, + :cursor_context_map, + :parent_context_for, + :transaction_context_map, + :transaction_map_key + def initialize(otel_tracer, parent_tracer) @otel_tracer = otel_tracer @parent_tracer = parent_tracer end - def trace_operation(name, operation, operation_context) - @otel_tracer.in_span( - operation_span_name(name, operation), - attributes: span_attributes(name, operation), + def trace_operation(operation, operation_context) + parent_context = parent_context_for(operation_context, operation.cursor_id) + span = @otel_tracer.start_span( + operation_span_name(operation), + attributes: span_attributes(operation), + with_parent: parent_context, kind: :client - ) do |span, _context| - operation_context.tracer = @parent_tracer + ) + ::OpenTelemetry::Trace.with_span(span) do |s, c| yield.tap do |result| - if result.is_a?(Cursor) && result.id.positive? - span.set_attribute('db.mongodb.cursor_id', result.id) - end + process_cursor_context(result, operation.cursor_id, c, s) end end + rescue Exception => e + span&.record_exception(e) + span&.status = ::OpenTelemetry::Trace::Status.error("Unhandled exception of type: #{e.class}") + raise e ensure - operation_context.tracer = nil + span&.finish end private - # Returns a hash of attributes for the OpenTelemetry span for the operation. - # - # @param name [String] The name of the operation. - # @param operation [Operation] The operation being traced. - # @return [Hash] A hash of attributes for the span. - def span_attributes(name, operation) + def operation_name(operation) + operation.class.name.split('::').last + end + + def span_attributes(operation) { 'db.system' => 'mongodb', 'db.namespace' => operation.db_name.to_s, 'db.collection.name' => operation.coll_name.to_s, - 'db.operation.name' => name, - 'db.operation.summary' => operation_span_name(name, operation), - 'db.cursor.id' => operation.cursor_id, + 'db.operation.name' => operation_name(operation), + 'db.operation.summary' => operation_span_name(operation), + 'db.mongodb.cursor_id' => operation.cursor_id, }.compact end - # Returns the name of the span for the operation. - # - # @param name [String] The name of the operation. - # @param operation [Operation] The operation being traced. - # # @return [String] The name of the span. - def operation_span_name(name, operation) + def process_cursor_context(result, cursor_id, context, span) + return unless result.is_a?(Cursor) + + if result.id.zero? + # If the cursor is closed, remove it from the context map. + cursor_context_map.delete(cursor_id) + elsif result.id && cursor_id.nil? + # New cursor created, store its context. + cursor_context_map[result.id] = context + span.set_attribute('db.mongodb.cursor_id', result.id) + end + end + + def operation_span_name(operation) if operation.coll_name - "#{name} #{operation.db_name}.#{operation.coll_name}" + "#{operation_name(operation)} #{operation.db_name}.#{operation.coll_name}" else - "#{operation.db_name}.#{name}" + "#{operation_name(operation)} #{operation.db_name}" end end end diff --git a/lib/mongo/tracing/open_telemetry/tracer.rb b/lib/mongo/tracing/open_telemetry/tracer.rb index de9f5b07dd..34615c0cd5 100644 --- a/lib/mongo/tracing/open_telemetry/tracer.rb +++ b/lib/mongo/tracing/open_telemetry/tracer.rb @@ -45,7 +45,7 @@ def initialize(enabled: nil, query_text_max_length: nil, otel_tracer: nil) end @otel_tracer = otel_tracer || initialize_tracer @operation_tracer = OperationTracer.new(@otel_tracer, self) - @command_tracer = CommandTracer.new(@otel_tracer, query_text_max_length: @query_text_max_length) + @command_tracer = CommandTracer.new(@otel_tracer, self, query_text_max_length: @query_text_max_length) end # Whether OpenTelemetry is enabled or not. @@ -55,11 +55,10 @@ def enabled? @enabled end - def trace_operation(name, operation, operation_context, &block) + def trace_operation(operation, operation_context, &block) return yield unless enabled? - operation_context.tracer = self - @operation_tracer.trace_operation(name, operation, operation_context, &block) + @operation_tracer.trace_operation(operation, operation_context, &block) end def trace_command(message, operation_context, connection, &block) @@ -68,6 +67,34 @@ def trace_command(message, operation_context, connection, &block) @command_tracer.trace_command(message, operation_context, connection, &block) end + def cursor_context_map + @cursor_context_map ||= {} + end + + def cursor_map_key(session, cursor_id) + return if cursor_id.nil? || session.nil? + + "#{session.id}-#{cursor_id}" + end + + def parent_context_for(operation_context, cursor_id) + if (key = transaction_map_key(operation_context.session)) + transaction_context_map[key] + elsif (key = cursor_map_key(operation_context.session, cursor_id)) + cursor_context_map[key] + end + end + + def transaction_context_map + @transaction_context_map ||= {} + end + + def transaction_map_key(session) + return if session.nil? || session.implicit? || !session.in_transaction? + + "#{session.id}-#{session.txn_num}" + end + private def initialize_tracer From 10f5f6caacaf78df3fd552ea0e4ee9569b9ab6cb Mon Sep 17 00:00:00 2001 From: Dmitry Rybakov Date: Fri, 15 Aug 2025 15:50:58 +0200 Subject: [PATCH 4/6] Disable cursor nesting --- lib/mongo/tracing/open_telemetry/tracer.rb | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/lib/mongo/tracing/open_telemetry/tracer.rb b/lib/mongo/tracing/open_telemetry/tracer.rb index 34615c0cd5..f9b61bb7f8 100644 --- a/lib/mongo/tracing/open_telemetry/tracer.rb +++ b/lib/mongo/tracing/open_telemetry/tracer.rb @@ -74,14 +74,16 @@ def cursor_context_map def cursor_map_key(session, cursor_id) return if cursor_id.nil? || session.nil? - "#{session.id}-#{cursor_id}" + "#{session.session_id['id'].to_uuid}-#{cursor_id}" end def parent_context_for(operation_context, cursor_id) if (key = transaction_map_key(operation_context.session)) transaction_context_map[key] - elsif (key = cursor_map_key(operation_context.session, cursor_id)) - cursor_context_map[key] + elsif (_key = cursor_map_key(operation_context.session, cursor_id)) + # We return nil here unless we decide how to nest cursor operations. + nil + # cursor_context_map[key] end end @@ -92,7 +94,7 @@ def transaction_context_map def transaction_map_key(session) return if session.nil? || session.implicit? || !session.in_transaction? - "#{session.id}-#{session.txn_num}" + "#{session.session_id['id'].to_uuid}-#{session.txn_num}" end private From bf0dd5dbfefaae792ae3fef4c771eff1ba4e33fa Mon Sep 17 00:00:00 2001 From: Dmitry Rybakov Date: Fri, 15 Aug 2025 17:26:41 +0200 Subject: [PATCH 5/6] Disable cursor nesting --- lib/mongo/operation/insert/op_msg.rb | 6 +----- lib/mongo/session.rb | 2 ++ lib/mongo/tracing/open_telemetry/command_tracer.rb | 2 +- 3 files changed, 4 insertions(+), 6 deletions(-) diff --git a/lib/mongo/operation/insert/op_msg.rb b/lib/mongo/operation/insert/op_msg.rb index 45789ae183..4f5acec971 100644 --- a/lib/mongo/operation/insert/op_msg.rb +++ b/lib/mongo/operation/insert/op_msg.rb @@ -35,11 +35,7 @@ class OpMsg < OpMsgBase def get_result(connection, context, options = {}) message = build_message(connection, context) - if (tracer = context.tracer) - tracer.trace_command(message, context, connection) do - Result.new(*dispatch_message(message, connection, context), @ids, context: context) - end - else + connection.tracer.trace_command(message, context, connection) do Result.new(*dispatch_message(message, connection, context), @ids, context: context) end end diff --git a/lib/mongo/session.rb b/lib/mongo/session.rb index be9c1f2a42..f9cf3de279 100644 --- a/lib/mongo/session.rb +++ b/lib/mongo/session.rb @@ -130,6 +130,8 @@ def snapshot? # @since 2.5.0 attr_reader :operation_time + def_delegators :client, :tracer + # Sets the dirty state to the given value for the underlying server # session. If there is no server session, this does nothing. # diff --git a/lib/mongo/tracing/open_telemetry/command_tracer.rb b/lib/mongo/tracing/open_telemetry/command_tracer.rb index 22c806c47a..714b2c03b6 100644 --- a/lib/mongo/tracing/open_telemetry/command_tracer.rb +++ b/lib/mongo/tracing/open_telemetry/command_tracer.rb @@ -76,7 +76,7 @@ def span_attributes(message, connection) end def process_cursor_context(result, cursor_id, context, span) - if result.respond_to?(:cursor_id) && result.cursor_id.positive? + if result.has_cursor_id? && result.cursor_id.positive? span.set_attribute('db.mongodb.cursor_id', result.cursor_id) end end From f0595144541c9249e118798103668141bf6be8e4 Mon Sep 17 00:00:00 2001 From: Dmitry Rybakov Date: Thu, 21 Aug 2025 12:48:04 +0200 Subject: [PATCH 6/6] Spec runner --- lib/mongo.rb | 1 + .../tracing/open_telemetry/command_tracer.rb | 1 + .../open_telemetry/operation_tracer.rb | 2 +- spec/runners/unified.rb | 1 + spec/runners/unified/assertions.rb | 58 +++++++- spec/runners/unified/test.rb | 19 ++- .../data/crud_unified/find-comment.yml | 10 -- spec/spec_tests/data/open_telemetry/README.md | 34 +++++ .../data/open_telemetry/cursor/cursor.yml | 131 ++++++++++++++++++ .../data/open_telemetry/operation/find.yml | 64 +++++++++ .../open_telemetry/operation/find_retries.yml | 104 ++++++++++++++ .../data/open_telemetry/operation/insert.yml | 61 ++++++++ .../transaction/transaction.yml | 112 +++++++++++++++ spec/spec_tests/open_telemetry_spec.rb | 13 ++ spec/support/tracing.rb | 44 ++++-- 15 files changed, 625 insertions(+), 30 deletions(-) create mode 100644 spec/spec_tests/data/open_telemetry/README.md create mode 100644 spec/spec_tests/data/open_telemetry/cursor/cursor.yml create mode 100644 spec/spec_tests/data/open_telemetry/operation/find.yml create mode 100644 spec/spec_tests/data/open_telemetry/operation/find_retries.yml create mode 100644 spec/spec_tests/data/open_telemetry/operation/insert.yml create mode 100644 spec/spec_tests/data/open_telemetry/transaction/transaction.yml create mode 100644 spec/spec_tests/open_telemetry_spec.rb diff --git a/lib/mongo.rb b/lib/mongo.rb index ba9aa817e6..0e62e92415 100644 --- a/lib/mongo.rb +++ b/lib/mongo.rb @@ -33,6 +33,7 @@ autoload :CGI, 'cgi' require 'bson' +require 'opentelemetry-api' require 'mongo/id' require 'mongo/bson' diff --git a/lib/mongo/tracing/open_telemetry/command_tracer.rb b/lib/mongo/tracing/open_telemetry/command_tracer.rb index 714b2c03b6..9157c97b78 100644 --- a/lib/mongo/tracing/open_telemetry/command_tracer.rb +++ b/lib/mongo/tracing/open_telemetry/command_tracer.rb @@ -65,6 +65,7 @@ def span_attributes(message, connection) 'db.namespace' => database(message), 'db.collection.name' => collection_name(message), 'db.command.name' => command_name(message), + 'db.query.summary' => command_span_name(message), 'server.port' => connection.address.port, 'server.address' => connection.address.host, 'network.transport' => connection.transport.to_s, diff --git a/lib/mongo/tracing/open_telemetry/operation_tracer.rb b/lib/mongo/tracing/open_telemetry/operation_tracer.rb index a06b4cc551..1db2b68a73 100644 --- a/lib/mongo/tracing/open_telemetry/operation_tracer.rb +++ b/lib/mongo/tracing/open_telemetry/operation_tracer.rb @@ -58,7 +58,7 @@ def trace_operation(operation, operation_context) private def operation_name(operation) - operation.class.name.split('::').last + operation.class.name.split('::').last.downcase end def span_attributes(operation) diff --git a/spec/runners/unified.rb b/spec/runners/unified.rb index 042b1c3947..a58a469d9c 100644 --- a/spec/runners/unified.rb +++ b/spec/runners/unified.rb @@ -99,6 +99,7 @@ def define_unified_spec_tests(base_path, paths, expect_failure: false) test.run test.assert_outcome test.assert_events + test.assert_tracing_messages test.cleanup end end diff --git a/spec/runners/unified/assertions.rb b/spec/runners/unified/assertions.rb index 69cb2282ad..98850dcd56 100644 --- a/spec/runners/unified/assertions.rb +++ b/spec/runners/unified/assertions.rb @@ -140,9 +140,14 @@ def assert_documents_match(actual, expected) end end - def assert_document_matches(actual, expected, msg) - unless actual == expected - raise Error::ResultMismatch, "#{msg} does not match" + def assert_document_matches(actual, expected, msg, as_root: false) + if !as_root && actual.keys.to_set != expected.keys.to_set + raise Error::ResultMismatch, "#{msg} keys do not match: expected #{expected.keys}, actual #{actual.keys}" + end + expected.each do |key, expected_value| + raise Error::ResultMismatch, "#{msg} has no key #{key}" unless actual.key?(key) + actual_value = actual[key] + assert_value_matches(actual_value, expected_value, "#{msg} key #{key}") end end @@ -383,6 +388,14 @@ def assert_value_matches(actual, expected, msg) if actual.nil? || actual >= expected_v raise Error::ResultMismatch, "Actual value #{actual} should be less than #{expected_v}" end + when '$$matchAsDocument' + actual_v = BSON::ExtJSON.parse(actual) + match_as_root = false + if expected_v.keys.first == '$$matchAsRoot' + expected_v = expected_v.values.first + match_as_root = true + end + assert_document_matches(actual_v, expected_v, msg, as_root: match_as_root) else raise NotImplementedError, "Unknown operator #{operator}" end @@ -392,5 +405,44 @@ def assert_value_matches(actual, expected, msg) end end end + + def assert_tracing_messages + return unless @expected_tracing_messages + @expected_tracing_messages.each do |spec| + spec = UsingHash[spec] + client_id = spec.use!('client') + client = entities.get(:client, client_id) + tracer = @tracers.fetch(client) + expected_spans = spec.use!('spans') + ignore_extra_spans = if ignore = spec.use('ignoreExtraSpans') + # Ruby treats 0 as truthy, whereas the spec tests use it as falsy. + ignore == 0 ? false : ignore + else + false + end + actual_spans = tracer.span_hierarchy + if (!ignore_extra_spans && actual_spans.length != expected_spans.length) || + (ignore_extra_spans && actual_spans.length < expected_spans.length) + raise Error::ResultMismatch, "Span count mismatch: expected #{expected_spans.length}, actual #{actual_spans.length}\nExpected: #{expected_spans}\nActual: #{actual_spans}" + end + expected_spans.each_with_index do |expected, i| + assert_span_matches(actual_spans[i], expected) + end + end + end + + def assert_span_matches(actual, expected) + assert_eq(actual.name, expected.use!('name'), 'Span name does not match') + expected_attributes = UsingHash[expected.use!('tags')] + expected_attributes.each do |key, value| + actual_value = actual.attributes[key] + assert_value_matches(actual_value, value, "Span attribute #{key}") + end + + expected_nested_spans = expected.use('nested') || [] + expected_nested_spans.each_with_index do |nested_expected, i| + assert_span_matches(actual.nested[i], nested_expected) + end + end end end diff --git a/spec/runners/unified/test.rb b/spec/runners/unified/test.rb index ffec2bb242..8c40a37579 100644 --- a/spec/runners/unified/test.rb +++ b/spec/runners/unified/test.rb @@ -37,6 +37,7 @@ def initialize(spec, **opts) @description = @test_spec.use('description') @outcome = @test_spec.use('outcome') @expected_events = @test_spec.use('expectEvents') + @expected_tracing_messages = @test_spec.use('expectTracingMessages') @expected_spans = @test_spec.use('expectSpans') @skip_reason = @test_spec.use('skipReason') if req = @test_spec.use('runOnRequirements') @@ -55,6 +56,7 @@ def initialize(spec, **opts) end @test_spec.freeze @subscribers = {} + @tracers = {} @observe_sensitive = {} @options = opts end @@ -196,17 +198,24 @@ def generate_entities(es) end end - observe_spans = spec.use('observeSpans') - if observe_spans + observe_tracing_messages = spec.use('observeTracingMessages') + tracer = ::Tracing::Tracer.new + if observe_tracing_messages opts[:tracing] = { enabled: true, - # tracer: tracer, + tracer: tracer, } + if observe_tracing_messages['enableCommandPayload'] + # Set the maximum length of the query text to reasonably high + # value so that we can capture the full query text + opts[:tracing][:query_text_max_length] = 4096 + end end create_client(**opts).tap do |client| @observe_sensitive[id] = spec.use('observeSensitiveCommands') @subscribers[client] ||= subscriber + @tracers[client] ||= tracer end when 'database' client = entities.get(:client, spec.use!('client')) @@ -611,9 +620,5 @@ def bson_error BSON::String.const_get(:IllegalKey) : BSON::Error end - - def tracer - @tracer ||= ::Tracing::Tracer.new - end end end diff --git a/spec/spec_tests/data/crud_unified/find-comment.yml b/spec/spec_tests/data/crud_unified/find-comment.yml index 70cc968f7f..905241ad0e 100644 --- a/spec/spec_tests/data/crud_unified/find-comment.yml +++ b/spec/spec_tests/data/crud_unified/find-comment.yml @@ -6,7 +6,6 @@ createEntities: - client: id: &client0 client0 observeEvents: [ commandStartedEvent ] - observeSpans: true - database: id: &database0 database0 client: *client0 @@ -48,15 +47,6 @@ tests: find: *collection0Name filter: *filter comment: "comment" - expectSpans: - - client: *client0 - spans: - - name: "find" - attributes: - mongodb.collection: *collection0Name - mongodb.database: *database0Name - mongodb.filter: *filter - mongodb.comment: "comment" - description: "find with document comment" runOnRequirements: diff --git a/spec/spec_tests/data/open_telemetry/README.md b/spec/spec_tests/data/open_telemetry/README.md new file mode 100644 index 0000000000..c3d14f6c93 --- /dev/null +++ b/spec/spec_tests/data/open_telemetry/README.md @@ -0,0 +1,34 @@ +# OpenTelemetry Tests + +______________________________________________________________________ + +## Testing + +### Automated Tests + +The YAML and JSON files in this directory are platform-independent tests meant to exercise a driver's implementation of +the OpenTelemetry specification. These tests utilize the +[Unified Test Format](../../unified-test-format/unified-test-format.md). + +For each test, create a MongoClient, configure it to enable tracing. + +```yaml +createEntities: + - client: + id: client0 + observeTracingMessages: + enableCommandPayload: true +``` + +These tests require the ability to collect tracing [spans](../open-telemetry.md#span) data in a structured form as +described in the +[Unified Test Format specification.expectTracingMessages](../../unified-test-format/unified-test-format.md#expectTracingMessages). +For example the Java driver uses [Micrometer](https://jira.mongodb.org/browse/JAVA-5732) to collect tracing spans. + +```yaml +expectTracingMessages: + client: client0 + ignoreExtraSpans: false + spans: + ... +``` diff --git a/spec/spec_tests/data/open_telemetry/cursor/cursor.yml b/spec/spec_tests/data/open_telemetry/cursor/cursor.yml new file mode 100644 index 0000000000..54675a8a67 --- /dev/null +++ b/spec/spec_tests/data/open_telemetry/cursor/cursor.yml @@ -0,0 +1,131 @@ +description: cursor retrieval +schemaVersion: '1.26' +createEntities: + - client: + id: &client0 client0 + useMultipleMongoses: false + observeTracingMessages: + enableCommandPayload: true + - database: + id: &database0 database0 + client: *client0 + databaseName: cursor + - collection: + id: &collection0 collection0 + database: *database0 + collectionName: test +initialData: + - collectionName: test + databaseName: cursor + documents: + - { _id: 1 } + - { _id: 2 } + - { _id: 3 } + - { _id: 4 } + - { _id: 5 } + - { _id: 6 } +tests: + - description: find with a cursor + operations: + - name: find + object: *collection0 + arguments: + filter: { _id: { $gt: 1 } } + batchSize: 2 + expectResult: + - { _id: 2 } + - { _id: 3 } + - { _id: 4 } + - { _id: 5 } + - { _id: 6 } + expectTracingMessages: + client: *client0 + ignoreExtraSpans: false + spans: + - name: find cursor.test + tags: + db.system: mongodb + db.namespace: cursor + db.collection.name: test + db.operation.name: find + db.operation.summary: find cursor.test + nested: + - name: command find + tags: + db.system: mongodb + db.namespace: cursor + db.collection.name: cursor.$cmd + db.command.name: find + network.transport: tcp + db.mongodb.cursor_id: { $$exists: false } + db.response.status_code: { $$exists: false } + exception.message: { $$exists: false } + exception.type: { $$exists: false } + exception.stacktrace: { $$exists: false } + server.address: { $$type: string } + server.port: { $$type: ['int', 'long'] } + server.type: { $$type: string } + db.query.summary: find + db.query.text: + $$matchAsDocument: + $$matchAsRoot: + find: test + filter: { _id: { $gt: 1 } } + batchSize: 2 + db.mongodb.server_connection_id: + $$type: [ 'int', 'long' ] + db.mongodb.driver_connection_id: + $$type: [ 'int', 'long' ] + + - name: command getMore + tags: + db.system: mongodb + db.namespace: cursor + db.collection.name: cursor.$cmd + db.command.name: getMore + network.transport: tcp + db.mongodb.cursor_id: { $$type: [ 'int', 'long' ] } + db.response.status_code: { $$exists: false } + exception.message: { $$exists: false } + exception.type: { $$exists: false } + exception.stacktrace: { $$exists: false } + server.address: { $$type: string } + server.port: { $$type: ['int', 'long'] } + server.type: { $$type: string } + db.query.summary: getMore + db.query.text: + $$matchAsDocument: + $$matchAsRoot: + getMore: { $$type: long } + collection: test + batchSize: 2 + db.mongodb.server_connection_id: + $$type: [ 'int', 'long' ] + db.mongodb.driver_connection_id: + $$type: [ 'int', 'long' ] + - name: command getMore + tags: + db.system: mongodb + db.namespace: cursor + db.collection.name: cursor.$cmd + db.command.name: getMore + network.transport: tcp + db.mongodb.cursor_id: { $$type: [ 'int', 'long' ] } + db.response.status_code: { $$exists: false } + exception.message: { $$exists: false } + exception.type: { $$exists: false } + exception.stacktrace: { $$exists: false } + server.address: { $$type: string } + server.port: { $$type: ['int', 'long'] } + server.type: { $$type: string } + db.query.summary: getMore + db.query.text: + $$matchAsDocument: + $$matchAsRoot: + getMore: { $$type: long } + collection: test + batchSize: 2 + db.mongodb.server_connection_id: + $$type: [ 'int', 'long' ] + db.mongodb.driver_connection_id: + $$type: [ 'int', 'long' ] diff --git a/spec/spec_tests/data/open_telemetry/operation/find.yml b/spec/spec_tests/data/open_telemetry/operation/find.yml new file mode 100644 index 0000000000..292c641755 --- /dev/null +++ b/spec/spec_tests/data/open_telemetry/operation/find.yml @@ -0,0 +1,64 @@ +description: operation find +schemaVersion: '1.26' +createEntities: + - client: + id: &client0 client0 + useMultipleMongoses: false + observeTracingMessages: + enableCommandPayload: true + - database: + id: &database0 database0 + client: *client0 + databaseName: operation-find + - collection: + id: &collection0 collection0 + database: database0 + collectionName: &collection0Name test +initialData: + - collectionName: test + databaseName: operation-find + documents: [] +tests: + - description: find an element + operations: + - name: find + object: *collection0 + arguments: { filter: { x: 1 } } + + expectTracingMessages: + - client: *client0 + ignoreExtraSpans: false + spans: + - name: find operation-find.test + tags: + db.system: mongodb + db.namespace: operation-find + db.collection.name: test + db.operation.name: find + db.operation.summary: find operation-find.test + nested: + - name: find operation-find.test + tags: + db.system: mongodb + db.namespace: operation-find + db.collection.name: *collection0Name + db.command.name: find + network.transport: tcp + db.mongodb.cursor_id: { $$exists: false } + db.response.status_code: { $$exists: false } + exception.message: { $$exists: false } + exception.type: { $$exists: false } + exception.stacktrace: { $$exists: false } + server.address: { $$type: string } + server.port: { $$type: ['int', 'long'] } + db.query.summary: find operation-find.test + db.query.text: + $$matchAsDocument: + $$matchAsRoot: + find: test + filter: + x: 1 + db.mongodb.server_connection_id: + $$type: [ 'int', 'long' ] + db.mongodb.driver_connection_id: + $$type: [ 'int', 'long' ] diff --git a/spec/spec_tests/data/open_telemetry/operation/find_retries.yml b/spec/spec_tests/data/open_telemetry/operation/find_retries.yml new file mode 100644 index 0000000000..5daebe5fdb --- /dev/null +++ b/spec/spec_tests/data/open_telemetry/operation/find_retries.yml @@ -0,0 +1,104 @@ +description: operation find retrying failed command +schemaVersion: '1.26' +createEntities: + - client: + id: &client0 client0 + useMultipleMongoses: false + observeTracingMessages: + enableCommandPayload: true + - database: + id: &database0 database0 + client: *client0 + databaseName: operation-find-retries + - collection: + id: &collection0 collection0 + database: *database0 + collectionName: test +initialData: + - collectionName: test + databaseName: operation-find-retries + documents: [] +tests: + - description: find an element + operations: + - name: failPoint + object: testRunner + arguments: + client: *client0 + failPoint: + configureFailPoint: failCommand + mode: { times: 1 } + data: + failCommands: [ find ] + errorCode: 89 + errorLabels: [ RetryableWriteError ] + + - name: find + object: *collection0 + arguments: + filter: { x: 1 } + expectTracingMessages: + client: *client0 + ignoreExtraSpans: true + spans: + - name: find operation-find-retries.test + tags: + db.system: mongodb + db.namespace: operation-find-retries + db.collection.name: test + db.operation.name: find + db.operation.summary: find operation-find-retries.test + nested: + - name: command find + tags: + db.system: mongodb + db.namespace: operation-find-retries + db.collection.name: operation-find-retries.$cmd + db.command.name: find + network.transport: tcp + db.mongodb.cursor_id: { $$exists: false } + db.response.status_code: { $$exists: false } + exception.message: { $$type: string } + exception.type: { $$type: string } + exception.stacktrace: { $$type: string } + server.address: { $$type: string } + server.port: { $$type: ['long', 'string'] } + server.type: { $$type: string } + db.query.summary: find + db.query.text: + $$matchAsDocument: + $$matchAsRoot: + find: test + filter: + x: 1 + db.mongodb.server_connection_id: + $$type: [ 'int', 'long' ] + db.mongodb.driver_connection_id: + $$type: [ 'int', 'long' ] + - name: command find + tags: + db.system: mongodb + db.namespace: operation-find-retries + db.collection.name: operation-find-retries.$cmd + db.command.name: find + network.transport: tcp + db.mongodb.cursor_id: { $$exists: false } + db.response.status_code: { $$exists: false } + exception.message: { $$exists: false } + exception.type: { $$exists: false } + exception.stacktrace: { $$exists: false } + server.address: { $$type: string } + server.port: { $$type: ['int', 'long'] } + server.type: { $$type: string } + db.query.summary: find + db.query.text: + $$matchAsDocument: + $$matchAsRoot: + find: test + filter: + x: 1 + db.mongodb.server_connection_id: + $$type: [ 'int', 'long' ] + db.mongodb.driver_connection_id: + $$type: [ 'int', 'long' ] + diff --git a/spec/spec_tests/data/open_telemetry/operation/insert.yml b/spec/spec_tests/data/open_telemetry/operation/insert.yml new file mode 100644 index 0000000000..0a02a6128b --- /dev/null +++ b/spec/spec_tests/data/open_telemetry/operation/insert.yml @@ -0,0 +1,61 @@ +description: operation insert +schemaVersion: '1.26' +createEntities: + - client: + id: &client0 client0 + useMultipleMongoses: false + observeTracingMessages: + enableCommandPayload: true + - database: + id: &database0 database0 + client: *client0 + databaseName: operation-insert + - collection: + id: &collection0 collection0 + database: *database0 + collectionName: test +initialData: + - collectionName: test + databaseName: operation-insert + documents: [ ] +tests: + - description: insert one element + operations: + - object: *collection0 + name: insertOne + arguments: { document: { _id: 1 } } + + expectTracingMessages: + client: *client0 + ignoreExtraSpans: false + spans: + - name: insert operation-insert.test + tags: + db.system: mongodb + db.namespace: operation-insert + db.collection.name: test + db.operation.name: insert + db.operation.summary: insert operation-insert.test + nested: + - name: command insert + tags: + db.system: mongodb + db.namespace: operation-insert + server.address: { $$type: string } + server.port: { $$type: [ 'long', 'string' ] } + server.type: { $$type: string } + db.query.summary: insert + db.query.text: + $$matchAsDocument: + $$matchAsRoot: + insert: test + ordered: true + txnNumber: 1 + documents: + - _id: 1 + + outcome: + - collectionName: test + databaseName: operation-insert + documents: + - _id: 1 diff --git a/spec/spec_tests/data/open_telemetry/transaction/transaction.yml b/spec/spec_tests/data/open_telemetry/transaction/transaction.yml new file mode 100644 index 0000000000..eb76cc5fd7 --- /dev/null +++ b/spec/spec_tests/data/open_telemetry/transaction/transaction.yml @@ -0,0 +1,112 @@ +description: transaction spans +schemaVersion: '1.26' +runOnRequirements: + - minServerVersion: '4.0' + topologies: + - replicaset + - minServerVersion: '4.1.8' + topologies: + - sharded + - load-balanced +createEntities: + - client: + id: &client0 client0 + useMultipleMongoses: false + observeTracingMessages: + enableCommandPayload: true + - database: + id: &database0 database0 + client: *client0 + databaseName: transaction-tests + - collection: + id: &collection0 collection0 + database: *database0 + collectionName: test + - session: + id: &session0 session0 + client: client0 +initialData: + - collectionName: test + databaseName: transaction-tests + documents: [] +tests: + - description: observeTracingMessages around transaction + operations: + - object: *session0 + name: startTransaction + - object: *collection0 + name: insertOne + arguments: + session: *session0 + document: + _id: 1 + - object: *session0 + name: commitTransaction + - name: find + object: *collection0 + arguments: { filter: { x: 1 } } + + expectTracingMessages: + client: *client0 + ignoreExtraSpans: false + spans: + - name: transaction + tags: + db.system: mongodb + nested: + - name: insert transaction-tests.test + tags: + db.system: mongodb + db.namespace: transaction-tests + db.collection.name: test + db.operation.name: insert + db.operation.summary: insert transaction-tests.test + nested: + - name: command insert + tags: + db.system: mongodb + db.namespace: transaction-tests + server.address: { $$type: string } + server.port: { $$type: ['long', 'string'] } + server.type: { $$type: string } + db.query.summary: insert + db.query.text: + $$matchAsDocument: + $$matchAsRoot: + insert: test + ordered: true + txnNumber: 1 + startTransaction: true + autocommit: false + documents: + - _id: 1 + db.mongodb.lsid: { $$sessionLsid: *session0 } + - name: commitTransaction admin.$cmd + tags: + db.system: mongodb + nested: + - name: command commitTransaction + tags: + db.system: mongodb + db.query.text: + $$matchAsDocument: + $$matchAsRoot: + commitTransaction: 1 + txnNumber: 1 + autocommit: false + - name: find transaction-tests.test + tags: {} + nested: + - name: command find + tags: + db.system: mongodb + db.namespace: transaction-tests + server.address: { $$type: string } + server.port: { $$type: ['long', 'string'] } + server.type: { $$type: string } + db.query.summary: find + outcome: + - collectionName: test + databaseName: transaction-tests + documents: + - _id: 1 diff --git a/spec/spec_tests/open_telemetry_spec.rb b/spec/spec_tests/open_telemetry_spec.rb new file mode 100644 index 0000000000..040574597d --- /dev/null +++ b/spec/spec_tests/open_telemetry_spec.rb @@ -0,0 +1,13 @@ +# frozen_string_literal: true +# rubocop:todo all + +require 'spec_helper' + +require 'runners/unified' + +base = "#{CURRENT_PATH}/spec_tests/data/open_telemetry" +OTEL_UNIFIED_TESTS = Dir.glob("#{base}/**/*.yml").sort + +describe 'OpenTelemetry unified spec tests' do + define_unified_spec_tests(base, OTEL_UNIFIED_TESTS) +end diff --git a/spec/support/tracing.rb b/spec/support/tracing.rb index 0eefb1ad46..ec542bafc0 100644 --- a/spec/support/tracing.rb +++ b/spec/support/tracing.rb @@ -1,19 +1,23 @@ # frozen_string_literal: true module Tracing + Error = Class.new(StandardError) + class Span - attr_reader :name, :attributes, :events, :with_parent, :kind, :finished + attr_reader :tracer, :name, :attributes, :events, :with_parent, :kind, :finished, :nested attr_accessor :status - def initialize(name, attributes = {}, with_parent: nil, kind: :internal) + def initialize(tracer, name, attributes = {}, with_parent: nil, kind: :internal) + @tracer = tracer @name = name @attributes = attributes @events = [] @with_parent = with_parent @kind = kind @finished = false + @nested = [] end def set_attribute(key, value) @@ -37,7 +41,9 @@ def record_exception(exception, attributes: nil) end def finish + raise Tracing::Error, 'Span already finished' if @finished @finished = true + tracer.finish_span(self) end end @@ -47,18 +53,38 @@ class Tracer def initialize @spans = [] - end - def in_span(name, attributes: {}, kind: :internal) - span = Span.new(name, attributes, kind: kind) - @spans << span - context = Object.new - yield(span, context) if block_given? + @stack = [] end def start_span(name, attributes: {}, with_parent: nil, kind: :internal) - Span.new(name, attributes, with_parent: with_parent, kind: kind).tap do |span| + parent = if with_parent.nil? + @stack.last + else + with_parent + end + Span.new(self, name, attributes, with_parent: parent, kind: kind).tap do |span| @spans << span + @stack << span + end + end + + def finish_span(span) + raise Error, 'Span not found' unless @spans.include?(span) + @stack.pop if @stack.last == span + end + + def span_hierarchy + hierarchy = {} + @spans.each do |span| + if span.with_parent.nil? + hierarchy[span.object_id] = span + elsif (parent = hierarchy[span.with_parent.object_id]) + parent.nested << span + else + raise Error, "Parent span not found for span #{span.name}" + end end + hierarchy.values end end end