Skip to content

Commit 96b07b8

Browse files
YayBurritoslukebakken
authored andcommitted
Pass basic props to RabbitMQActivitySource.BasicPublish to allow messageid tag to be included in trace span
updated tests for messageid tag corrected broken tests after bad conflict resolution from merging main into feature branch
1 parent 44d8be3 commit 96b07b8

File tree

5 files changed

+48
-235
lines changed

5 files changed

+48
-235
lines changed

projects/RabbitMQ.Client/Impl/Channel.BasicPublish.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ await MaybeEnforceFlowControlAsync(cancellationToken)
6161
var cmd = new BasicPublish(exchange, routingKey, mandatory, default);
6262

6363
using Activity? sendActivity = RabbitMQActivitySource.PublisherHasListeners
64-
? RabbitMQActivitySource.BasicPublish(routingKey, exchange, body.Length)
64+
? RabbitMQActivitySource.BasicPublish(routingKey, exchange, body.Length, basicProperties)
6565
: default;
6666

6767
ulong publishSequenceNumber = 0;
@@ -116,7 +116,7 @@ await MaybeEnforceFlowControlAsync(cancellationToken)
116116
var cmd = new BasicPublishMemory(exchange.Bytes, routingKey.Bytes, mandatory, default);
117117

118118
using Activity? sendActivity = RabbitMQActivitySource.PublisherHasListeners
119-
? RabbitMQActivitySource.BasicPublish(routingKey.Value, exchange.Value, body.Length)
119+
? RabbitMQActivitySource.BasicPublish(routingKey.Value, exchange.Value, body.Length, basicProperties)
120120
: default;
121121

122122
ulong publishSequenceNumber = 0;

projects/RabbitMQ.Client/Impl/RabbitMQActivitySource.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ public static bool UseRoutingKeyAsOperationName
6666
new KeyValuePair<string, object?>(ProtocolVersion, "0.9.1")
6767
};
6868

69-
internal static Activity? BasicPublish(string routingKey, string exchange, int bodySize,
69+
internal static Activity? BasicPublish(string routingKey, string exchange, int bodySize, IReadOnlyBasicProperties basicProperties,
7070
ActivityContext linkedContext = default)
7171
{
7272
if (!s_publisherSource.HasListeners())
@@ -83,7 +83,7 @@ public static bool UseRoutingKeyAsOperationName
8383
ActivityKind.Producer, linkedContext);
8484
if (activity != null && activity.IsAllDataRequested)
8585
{
86-
PopulateMessagingTags(MessagingOperationTypeSend, MessagingOperationNameBasicPublish, routingKey, exchange, 0, bodySize, activity);
86+
PopulateMessagingTags(MessagingOperationTypeSend, MessagingOperationNameBasicPublish, routingKey, exchange, 0, basicProperties, bodySize, activity);
8787
}
8888

8989
return activity;

projects/Test/SequentialIntegration/TestActivitySource.cs

Lines changed: 22 additions & 162 deletions
Original file line numberDiff line numberDiff line change
@@ -75,158 +75,6 @@ void AssertIntTagGreaterThanZero(Activity activity, string name)
7575
Assert.True(activity.GetTagItem(name) is int result && result > 0);
7676
}
7777

78-
[Theory]
79-
[InlineData(true, true)]
80-
[InlineData(true, false)]
81-
[InlineData(false, true)]
82-
[InlineData(false, false)]
83-
public async Task TestPublisherAndConsumerActivityTags(bool useRoutingKeyAsOperationName, bool usePublisherAsParent)
84-
{
85-
RabbitMQActivitySource.UseRoutingKeyAsOperationName = useRoutingKeyAsOperationName;
86-
RabbitMQActivitySource.TracingOptions.UsePublisherAsParent = usePublisherAsParent;
87-
var _activities = new List<Activity>();
88-
using ActivityListener activityListener = StartActivityListener(_activities);
89-
await Task.Delay(500);
90-
string queueName = $"{Guid.NewGuid()}";
91-
QueueDeclareOk q = await _channel.QueueDeclareAsync(queueName);
92-
byte[] sendBody = Encoding.UTF8.GetBytes("hi");
93-
byte[] consumeBody = null;
94-
var consumer = new AsyncEventingBasicConsumer(_channel);
95-
var consumerReceivedTcs =
96-
new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
97-
consumer.ReceivedAsync += (o, a) =>
98-
{
99-
consumeBody = a.Body.ToArray();
100-
consumerReceivedTcs.SetResult(true);
101-
return Task.CompletedTask;
102-
};
103-
104-
string consumerTag = await _channel.BasicConsumeAsync(queueName, autoAck: true, consumer: consumer);
105-
await _channel.BasicPublishAsync("", q.QueueName, true, sendBody);
106-
107-
await consumerReceivedTcs.Task.WaitAsync(TimeSpan.FromSeconds(5));
108-
Assert.True(await consumerReceivedTcs.Task);
109-
110-
await _channel.BasicCancelAsync(consumerTag);
111-
await Task.Delay(500);
112-
AssertActivityData(useRoutingKeyAsOperationName, usePublisherAsParent, queueName, _activities, true);
113-
}
114-
115-
[Theory]
116-
[InlineData(true, true)]
117-
[InlineData(true, false)]
118-
[InlineData(false, true)]
119-
[InlineData(false, false)]
120-
public async Task TestPublisherWithCachedStringsAndConsumerActivityTags(bool useRoutingKeyAsOperationName, bool usePublisherAsParent)
121-
{
122-
RabbitMQActivitySource.UseRoutingKeyAsOperationName = useRoutingKeyAsOperationName;
123-
RabbitMQActivitySource.TracingOptions.UsePublisherAsParent = usePublisherAsParent;
124-
var _activities = new List<Activity>();
125-
using ActivityListener activityListener = StartActivityListener(_activities);
126-
await Task.Delay(500);
127-
string queueName = $"{Guid.NewGuid()}";
128-
QueueDeclareOk q = await _channel.QueueDeclareAsync(queueName);
129-
byte[] sendBody = Encoding.UTF8.GetBytes("hi");
130-
byte[] consumeBody = null;
131-
var consumer = new AsyncEventingBasicConsumer(_channel);
132-
var consumerReceivedTcs =
133-
new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
134-
consumer.ReceivedAsync += (o, a) =>
135-
{
136-
consumeBody = a.Body.ToArray();
137-
consumerReceivedTcs.SetResult(true);
138-
return Task.CompletedTask;
139-
};
140-
141-
string consumerTag = await _channel.BasicConsumeAsync(queueName, autoAck: true, consumer: consumer);
142-
CachedString exchange = new CachedString("");
143-
CachedString routingKey = new CachedString(q.QueueName);
144-
await _channel.BasicPublishAsync(exchange, routingKey, true, sendBody);
145-
146-
await consumerReceivedTcs.Task.WaitAsync(TimeSpan.FromSeconds(5));
147-
Assert.True(await consumerReceivedTcs.Task);
148-
149-
await _channel.BasicCancelAsync(consumerTag);
150-
await Task.Delay(500);
151-
AssertActivityData(useRoutingKeyAsOperationName, usePublisherAsParent, queueName, _activities, true);
152-
}
153-
154-
[Theory]
155-
[InlineData(true, true)]
156-
[InlineData(true, false)]
157-
[InlineData(false, true)]
158-
[InlineData(false, false)]
159-
public async Task TestPublisherWithPublicationAddressAndConsumerActivityTags(bool useRoutingKeyAsOperationName, bool usePublisherAsParent)
160-
{
161-
RabbitMQActivitySource.UseRoutingKeyAsOperationName = useRoutingKeyAsOperationName;
162-
RabbitMQActivitySource.TracingOptions.UsePublisherAsParent = usePublisherAsParent;
163-
var _activities = new List<Activity>();
164-
using ActivityListener activityListener = StartActivityListener(_activities);
165-
await Task.Delay(500);
166-
string queueName = $"{Guid.NewGuid()}";
167-
QueueDeclareOk q = await _channel.QueueDeclareAsync(queueName);
168-
byte[] sendBody = Encoding.UTF8.GetBytes("hi");
169-
byte[] consumeBody = null;
170-
var consumer = new AsyncEventingBasicConsumer(_channel);
171-
var consumerReceivedTcs =
172-
new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
173-
consumer.ReceivedAsync += (o, a) =>
174-
{
175-
consumeBody = a.Body.ToArray();
176-
consumerReceivedTcs.SetResult(true);
177-
return Task.CompletedTask;
178-
};
179-
180-
string consumerTag = await _channel.BasicConsumeAsync(queueName, autoAck: true, consumer: consumer);
181-
PublicationAddress publicationAddress = new PublicationAddress(ExchangeType.Direct, "", q.QueueName);
182-
await _channel.BasicPublishAsync(publicationAddress, new BasicProperties(), sendBody);
183-
184-
await consumerReceivedTcs.Task.WaitAsync(TimeSpan.FromSeconds(5));
185-
Assert.True(await consumerReceivedTcs.Task);
186-
187-
await _channel.BasicCancelAsync(consumerTag);
188-
await Task.Delay(500);
189-
AssertActivityData(useRoutingKeyAsOperationName, usePublisherAsParent, queueName, _activities, true);
190-
}
191-
192-
[Theory]
193-
[InlineData(true, true)]
194-
[InlineData(true, false)]
195-
[InlineData(false, true)]
196-
[InlineData(false, false)]
197-
public async Task TestPublisherAndConsumerActivityTagsAsync(bool useRoutingKeyAsOperationName, bool usePublisherAsParent)
198-
{
199-
RabbitMQActivitySource.UseRoutingKeyAsOperationName = useRoutingKeyAsOperationName;
200-
RabbitMQActivitySource.TracingOptions.UsePublisherAsParent = usePublisherAsParent;
201-
var activities = new List<Activity>();
202-
using ActivityListener activityListener = StartActivityListener(activities);
203-
await Task.Delay(500);
204-
205-
string queueName = $"{Guid.NewGuid()}";
206-
QueueDeclareOk q = await _channel.QueueDeclareAsync(queueName);
207-
byte[] sendBody = Encoding.UTF8.GetBytes("hi");
208-
byte[] consumeBody = null;
209-
var consumer = new AsyncEventingBasicConsumer(_channel);
210-
var consumerReceivedTcs =
211-
new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
212-
consumer.ReceivedAsync += (o, a) =>
213-
{
214-
consumeBody = a.Body.ToArray();
215-
consumerReceivedTcs.SetResult(true);
216-
return Task.CompletedTask;
217-
};
218-
219-
string consumerTag = await _channel.BasicConsumeAsync(queueName, autoAck: true, consumer: consumer);
220-
await _channel.BasicPublishAsync("", q.QueueName, true, sendBody);
221-
222-
await consumerReceivedTcs.Task.WaitAsync(TimeSpan.FromSeconds(5));
223-
Assert.True(await consumerReceivedTcs.Task);
224-
225-
await _channel.BasicCancelAsync(consumerTag);
226-
await Task.Delay(500);
227-
AssertActivityData(useRoutingKeyAsOperationName, usePublisherAsParent, queueName, activities, true);
228-
}
229-
23078
[Theory]
23179
[InlineData(true, true)]
23280
[InlineData(true, false)]
@@ -307,11 +155,15 @@ public async Task TestPublisherWithPublicationAddressAndConsumerActivityTagsAsyn
307155
}
308156

309157
[Theory]
310-
[InlineData(true, true)]
311-
[InlineData(true, false)]
312-
[InlineData(false, true)]
313-
[InlineData(false, false)]
314-
public async Task TestPublisherAndBasicGetActivityTags(bool useRoutingKeyAsOperationName, bool usePublisherAsParent)
158+
[InlineData(true, true, true)]
159+
[InlineData(true, true, false)]
160+
[InlineData(true, false, true)]
161+
[InlineData(true, false, false)]
162+
[InlineData(false, true, true)]
163+
[InlineData(false, true, false)]
164+
[InlineData(false, false, true)]
165+
[InlineData(false, false, false)]
166+
public async Task TestPublisherAndBasicGetActivityTagsAsync(bool useRoutingKeyAsOperationName, bool usePublisherAsParent, bool useMessageId)
315167
{
316168
RabbitMQActivitySource.UseRoutingKeyAsOperationName = useRoutingKeyAsOperationName;
317169
RabbitMQActivitySource.TracingOptions.UsePublisherAsParent = usePublisherAsParent;
@@ -321,18 +173,20 @@ public async Task TestPublisherAndBasicGetActivityTags(bool useRoutingKeyAsOpera
321173
string queue = $"queue-{Guid.NewGuid()}";
322174
const string msg = "for basic.get";
323175

176+
var basicProps = useMessageId ? new BasicProperties() { MessageId = Guid.NewGuid().ToString() } : new BasicProperties();
177+
324178
try
325179
{
326180
await _channel.QueueDeclareAsync(queue, false, false, false, null);
327-
await _channel.BasicPublishAsync("", queue, true, Encoding.UTF8.GetBytes(msg));
181+
await _channel.BasicPublishAsync("", queue, true, basicProps, Encoding.UTF8.GetBytes(msg));
328182
QueueDeclareOk ok = await _channel.QueueDeclarePassiveAsync(queue);
329183
Assert.Equal(1u, ok.MessageCount);
330184
BasicGetResult res = await _channel.BasicGetAsync(queue, true);
331185
Assert.Equal(msg, Encoding.UTF8.GetString(res.Body.ToArray()));
332186
ok = await _channel.QueueDeclarePassiveAsync(queue);
333187
Assert.Equal(0u, ok.MessageCount);
334188
await Task.Delay(500);
335-
AssertActivityData(useRoutingKeyAsOperationName, usePublisherAsParent, queue, activities, false);
189+
AssertActivityData(useRoutingKeyAsOperationName, usePublisherAsParent, queue, activities, false, basicProps.MessageId);
336190
}
337191
finally
338192
{
@@ -345,7 +199,7 @@ public async Task TestPublisherAndBasicGetActivityTags(bool useRoutingKeyAsOpera
345199
[InlineData(true, false)]
346200
[InlineData(false, true)]
347201
[InlineData(false, false)]
348-
public async Task TestPublisherWithCachedStringsAndBasicGetActivityTags(bool useRoutingKeyAsOperationName, bool usePublisherAsParent)
202+
public async Task TestPublisherWithCachedStringsAndBasicGetActivityTagsAsync(bool useRoutingKeyAsOperationName, bool usePublisherAsParent)
349203
{
350204
RabbitMQActivitySource.UseRoutingKeyAsOperationName = useRoutingKeyAsOperationName;
351205
RabbitMQActivitySource.TracingOptions.UsePublisherAsParent = usePublisherAsParent;
@@ -381,7 +235,7 @@ public async Task TestPublisherWithCachedStringsAndBasicGetActivityTags(bool use
381235
[InlineData(true, false)]
382236
[InlineData(false, true)]
383237
[InlineData(false, false)]
384-
public async Task TestPublisherWithPublicationAddressAndBasicGetActivityTags(bool useRoutingKeyAsOperationName, bool usePublisherAsParent)
238+
public async Task TestPublisherWithPublicationAddressAndBasicGetActivityTagsAsync(bool useRoutingKeyAsOperationName, bool usePublisherAsParent)
385239
{
386240
RabbitMQActivitySource.UseRoutingKeyAsOperationName = useRoutingKeyAsOperationName;
387241
RabbitMQActivitySource.TracingOptions.UsePublisherAsParent = usePublisherAsParent;
@@ -427,7 +281,7 @@ private static ActivityListener StartActivityListener(List<Activity> activities)
427281
}
428282

429283
private void AssertActivityData(bool useRoutingKeyAsOperationName, bool usePublisherAsParent, string queueName,
430-
List<Activity> activityList, bool isDeliver = false)
284+
List<Activity> activityList, bool isDeliver = false, string messageId = null)
431285
{
432286
string childName = isDeliver ? "deliver" : "fetch";
433287
Activity[] activities = activityList.ToArray();
@@ -480,6 +334,12 @@ private void AssertActivityData(bool useRoutingKeyAsOperationName, bool usePubli
480334
AssertIntTagGreaterThanZero(sendActivity, RabbitMQActivitySource.MessagingEnvelopeSize);
481335
AssertIntTagGreaterThanZero(sendActivity, RabbitMQActivitySource.MessagingBodySize);
482336
AssertIntTagGreaterThanZero(receiveActivity, RabbitMQActivitySource.MessagingBodySize);
337+
338+
if (messageId is not null)
339+
{
340+
AssertStringTagEquals(sendActivity, RabbitMQActivitySource.MessageId, messageId);
341+
AssertStringTagEquals(receiveActivity, RabbitMQActivitySource.MessageId, messageId);
342+
}
483343
}
484344
}
485345
}

projects/Test/SequentialIntegration/TestHeartbeats.cs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ public override Task InitializeAsync()
5959

6060
[SkippableFact(Timeout = 35000)]
6161
[Trait("Category", "LongRunning")]
62-
public async Task TestThatHeartbeatWriterUsesConfigurableInterval()
62+
public async Task TestThatHeartbeatWriterUsesConfigurableIntervalAsync()
6363
{
6464
Skip.IfNot(LongRunningTestsEnabled(), "RABBITMQ_LONG_RUNNING_TESTS is not set, skipping test");
6565

@@ -72,7 +72,7 @@ public async Task TestThatHeartbeatWriterUsesConfigurableInterval()
7272

7373
[SkippableFact]
7474
[Trait("Category", "LongRunning")]
75-
public async Task TestThatHeartbeatWriterWithTLSEnabled()
75+
public async Task TestThatHeartbeatWriterWithTLSEnabledAsync()
7676
{
7777
Skip.IfNot(LongRunningTestsEnabled(), "RABBITMQ_LONG_RUNNING_TESTS is not set, skipping test");
7878

@@ -94,7 +94,7 @@ public async Task TestThatHeartbeatWriterWithTLSEnabled()
9494

9595
[SkippableFact(Timeout = 90000)]
9696
[Trait("Category", "LongRunning")]
97-
public async Task TestHundredsOfConnectionsWithRandomHeartbeatInterval()
97+
public async Task TestHundredsOfConnectionsWithRandomHeartbeatIntervalAsync()
9898
{
9999
Skip.IfNot(LongRunningTestsEnabled(), "RABBITMQ_LONG_RUNNING_TESTS is not set, skipping test");
100100

0 commit comments

Comments
 (0)