@@ -43,16 +43,20 @@ public static class RabbitMQActivitySource
43
43
private static readonly ActivitySource s_subscriberSource =
44
44
new ActivitySource ( SubscriberSourceName , AssemblyVersion ) ;
45
45
46
+ private static readonly ActivitySource s_connectionSource =
47
+ new ActivitySource ( ConnectionSourceName , AssemblyVersion ) ;
48
+
46
49
public const string PublisherSourceName = "RabbitMQ.Client.Publisher" ;
47
50
public const string SubscriberSourceName = "RabbitMQ.Client.Subscriber" ;
51
+ public const string ConnectionSourceName = "RabbitMQ.Client.Connection" ;
48
52
49
- public static Action < Activity , IDictionary < string , object ? > > ContextInjector { get ; set ; } = DefaultContextInjector ;
53
+ public static Action < Activity , IDictionary < string , object ? > > ContextInjector { get ; set ; } =
54
+ DefaultContextInjector ;
50
55
51
56
public static Func < IReadOnlyBasicProperties , ActivityContext > ContextExtractor { get ; set ; } =
52
57
DefaultContextExtractor ;
53
58
54
59
public static bool UseRoutingKeyAsOperationName { get ; set ; } = true ;
55
- internal static bool PublisherHasListeners => s_publisherSource . HasListeners ( ) ;
56
60
57
61
internal static readonly IEnumerable < KeyValuePair < string , object ? > > CreationTags = new [ ]
58
62
{
@@ -61,14 +65,24 @@ public static class RabbitMQActivitySource
61
65
new KeyValuePair < string , object ? > ( ProtocolVersion , "0.9.1" )
62
66
} ;
63
67
68
+ internal static Activity ? OpenConnection ( bool isReconnection )
69
+ {
70
+ Activity ? connectionActivity =
71
+ s_connectionSource . StartRabbitMQActivity ( "connection attempt" , ActivityKind . Client ) ;
72
+ connectionActivity ? . SetTag ( "messaging.rabbitmq.connection.is_reconnection" , isReconnection ) ;
73
+ return connectionActivity ;
74
+ }
75
+
76
+ internal static Activity ? OpenTcpConnection ( )
77
+ {
78
+ Activity ? connectionActivity =
79
+ s_connectionSource . StartRabbitMQActivity ( "tcp connection attempt" , ActivityKind . Client ) ;
80
+ return connectionActivity ;
81
+ }
82
+
64
83
internal static Activity ? BasicPublish ( string routingKey , string exchange , int bodySize ,
65
84
ActivityContext linkedContext = default )
66
85
{
67
- if ( ! s_publisherSource . HasListeners ( ) )
68
- {
69
- return null ;
70
- }
71
-
72
86
Activity ? activity = linkedContext == default
73
87
? s_publisherSource . StartRabbitMQActivity (
74
88
UseRoutingKeyAsOperationName ? $ "{ MessagingOperationNameBasicPublish } { routingKey } " : MessagingOperationNameBasicPublish ,
@@ -82,16 +96,10 @@ public static class RabbitMQActivitySource
82
96
}
83
97
84
98
return activity ;
85
-
86
99
}
87
100
88
101
internal static Activity ? BasicGetEmpty ( string queue )
89
102
{
90
- if ( ! s_subscriberSource . HasListeners ( ) )
91
- {
92
- return null ;
93
- }
94
-
95
103
Activity ? activity = s_subscriberSource . StartRabbitMQActivity (
96
104
UseRoutingKeyAsOperationName ? $ "{ MessagingOperationNameBasicGetEmpty } { queue } " : MessagingOperationNameBasicGetEmpty ,
97
105
ActivityKind . Consumer ) ;
@@ -109,11 +117,6 @@ public static class RabbitMQActivitySource
109
117
internal static Activity ? BasicGet ( string routingKey , string exchange , ulong deliveryTag ,
110
118
IReadOnlyBasicProperties readOnlyBasicProperties , int bodySize )
111
119
{
112
- if ( ! s_subscriberSource . HasListeners ( ) )
113
- {
114
- return null ;
115
- }
116
-
117
120
// Extract the PropagationContext of the upstream parent from the message headers.
118
121
Activity ? activity = s_subscriberSource . StartLinkedRabbitMQActivity (
119
122
UseRoutingKeyAsOperationName ? $ "{ MessagingOperationNameBasicGet } { routingKey } " : MessagingOperationNameBasicGet , ActivityKind . Consumer ,
@@ -130,11 +133,6 @@ public static class RabbitMQActivitySource
130
133
internal static Activity ? Deliver ( string routingKey , string exchange , ulong deliveryTag ,
131
134
IReadOnlyBasicProperties basicProperties , int bodySize )
132
135
{
133
- if ( ! s_subscriberSource . HasListeners ( ) )
134
- {
135
- return null ;
136
- }
137
-
138
136
// Extract the PropagationContext of the upstream parent from the message headers.
139
137
Activity ? activity = s_subscriberSource . StartLinkedRabbitMQActivity (
140
138
UseRoutingKeyAsOperationName ? $ "{ MessagingOperationNameBasicDeliver } { routingKey } " : MessagingOperationNameBasicDeliver ,
@@ -197,15 +195,15 @@ private static void PopulateMessagingTags(string operationType, string operation
197
195
198
196
internal static void PopulateMessageEnvelopeSize ( Activity ? activity , int size )
199
197
{
200
- if ( activity != null && activity . IsAllDataRequested && PublisherHasListeners )
198
+ if ( activity ? . IsAllDataRequested ?? false )
201
199
{
202
200
activity . SetTag ( MessagingEnvelopeSize , size ) ;
203
201
}
204
202
}
205
203
206
204
internal static void SetNetworkTags ( this Activity ? activity , IFrameHandler frameHandler )
207
205
{
208
- if ( PublisherHasListeners && activity != null && activity . IsAllDataRequested )
206
+ if ( activity ? . IsAllDataRequested ?? false )
209
207
{
210
208
switch ( frameHandler . RemoteEndPoint . AddressFamily )
211
209
{
@@ -216,15 +214,7 @@ internal static void SetNetworkTags(this Activity? activity, IFrameHandler frame
216
214
activity . SetTag ( "network.type" , "ipv4" ) ;
217
215
break ;
218
216
}
219
-
220
- if ( ! string . IsNullOrEmpty ( frameHandler . Endpoint . HostName ) )
221
- {
222
- activity
223
- . SetTag ( "server.address" , frameHandler . Endpoint . HostName ) ;
224
- }
225
-
226
- activity
227
- . SetTag ( "server.port" , frameHandler . Endpoint . Port ) ;
217
+ activity . SetServerTags ( frameHandler . Endpoint ) ;
228
218
229
219
if ( frameHandler . RemoteEndPoint is IPEndPoint ipEndpoint )
230
220
{
@@ -252,6 +242,18 @@ internal static void SetNetworkTags(this Activity? activity, IFrameHandler frame
252
242
}
253
243
}
254
244
245
+ internal static void SetServerTags ( this Activity activity , AmqpTcpEndpoint endpoint )
246
+ {
247
+ if ( ! string . IsNullOrEmpty ( endpoint . HostName ) )
248
+ {
249
+ activity
250
+ . SetTag ( "server.address" , endpoint . HostName ) ;
251
+ }
252
+
253
+ activity
254
+ . SetTag ( "server.port" , endpoint . Port ) ;
255
+ }
256
+
255
257
private static void DefaultContextInjector ( Activity sendActivity , IDictionary < string , object ? > props )
256
258
{
257
259
DistributedContextPropagator . Current . Inject ( sendActivity , props , DefaultContextSetter ) ;
0 commit comments