Skip to content

Commit 166751d

Browse files
authored
Add TryQueryOffset API (#413)
* Add TryQueryOffset API --------- Signed-off-by: Gabriele Santomaggio <G.santomaggio@gmail.com>
1 parent 98dd6ac commit 166751d

13 files changed

+216
-9
lines changed

Directory.Packages.props

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,8 @@
1818
<PackageVersion Include="coverlet.collector" Version="3.2.0" />
1919
<!-- docs/**/*.csproj -->
2020
<PackageVersion Include="K4os.Compression.LZ4.Streams" Version="1.2.16" />
21-
<PackageVersion Include="Microsoft.Extensions.Logging" Version="7.0.0" />
22-
<PackageVersion Include="Microsoft.Extensions.Logging.Console" Version="7.0.0" />
21+
<PackageVersion Include="Microsoft.Extensions.Logging" Version="8.0.0" />
22+
<PackageVersion Include="Microsoft.Extensions.Logging.Console" Version="8.0.0" />
2323
</ItemGroup>
2424
<ItemGroup Label="net8.0 specific" Condition="'$(TargetFramework)' == 'net8.0'">
2525
<!-- RabbitMQ.Stream.Client -->
@@ -37,4 +37,4 @@
3737
<!-- RabbitMQ.Stream.Client.PerfTest -->
3838
<PackageVersion Include="FSharp.Core" Version="9.0.201" />
3939
</ItemGroup>
40-
</Project>
40+
</Project>

README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ The library requires .NET 6, .NET 7 or .NET 8.
3636
- [Best practices to write a reliable client](https://github.com/rabbitmq/rabbitmq-stream-dotnet-client/tree/main/docs/ReliableClient/)
3737
- [Super Stream example](https://github.com/rabbitmq/rabbitmq-stream-dotnet-client/blob/main/docs/SuperStream)
3838
- [Stream Performance Test](https://github.com/rabbitmq/rabbitmq-stream-dotnet-client/tree/main/RabbitMQ.Stream.Client.PerfTest)
39+
- [Single Active consumer for a stream](https://github.com/rabbitmq/rabbitmq-stream-dotnet-client/tree/main/docs/SingleActiveConsumer)
3940

4041

4142

RabbitMQ.Stream.Client/PublicAPI.Unshipped.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -342,6 +342,7 @@ RabbitMQ.Stream.Client.StreamSystem.StoreOffset(string reference, string stream,
342342
RabbitMQ.Stream.Client.StreamSystem.StreamInfo(string streamName) -> System.Threading.Tasks.Task<RabbitMQ.Stream.Client.StreamInfo>
343343
RabbitMQ.Stream.Client.StreamSystem.StreamStats(string stream) -> System.Threading.Tasks.Task<RabbitMQ.Stream.Client.StreamStats>
344344
RabbitMQ.Stream.Client.StreamSystem.SuperStreamExists(string superStream) -> System.Threading.Tasks.Task<bool>
345+
RabbitMQ.Stream.Client.StreamSystem.TryQueryOffset(string reference, string stream) -> System.Threading.Tasks.Task<ulong?>
345346
RabbitMQ.Stream.Client.StreamSystem.UpdateSecret(string newSecret) -> System.Threading.Tasks.Task
346347
RabbitMQ.Stream.Client.StreamSystemConfig.AuthMechanism.get -> RabbitMQ.Stream.Client.AuthMechanism
347348
RabbitMQ.Stream.Client.StreamSystemConfig.AuthMechanism.set -> void

RabbitMQ.Stream.Client/RawConsumer.cs

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -590,10 +590,22 @@ private async Task Init()
590590
{
591591
// in this case the StoredOffsetSpec is overridden by the ConsumerUpdateListener
592592
// since the user decided to override the default behavior
593-
_config.StoredOffsetSpec = await _config.ConsumerUpdateListener(
594-
_config.Reference,
595-
_config.Stream,
596-
promotedAsActive).ConfigureAwait(false);
593+
594+
try
595+
{
596+
_config.StoredOffsetSpec = await _config.ConsumerUpdateListener(
597+
_config.Reference,
598+
_config.Stream,
599+
promotedAsActive).ConfigureAwait(false);
600+
}
601+
catch (Exception e)
602+
{
603+
Logger?.LogError(e,
604+
"Error while calling the ConsumerUpdateListener. OffsetTypeNext will be used. {EntityInfo}",
605+
DumpEntityConfiguration());
606+
// in this case the default behavior is to use the OffsetTypeNext
607+
_config.StoredOffsetSpec = new OffsetTypeNext();
608+
}
597609
}
598610

599611
// Here we set the promotion status

RabbitMQ.Stream.Client/StreamSystem.cs

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -478,10 +478,29 @@ private static void MaybeThrowQueryException(string reference, string stream)
478478
/// <param name="stream">Stream name</param>
479479
/// <returns></returns>
480480
public async Task<ulong> QueryOffset(string reference, string stream)
481+
{
482+
var offset = await TryQueryOffset(reference, stream).ConfigureAwait(false);
483+
return offset ??
484+
throw new OffsetNotFoundException($"QueryOffset stream: {stream}, reference: {reference}");
485+
}
486+
487+
/// <summary>
488+
/// TryQueryOffset tries to retrieve the last consumer offset stored
489+
/// given a consumer name and stream name.
490+
/// Returns null if the offset is not found.
491+
/// </summary>
492+
public async Task<ulong?> TryQueryOffset(string reference, string stream)
481493
{
482494
MaybeThrowQueryException(reference, stream);
483495

484496
var response = await _client.QueryOffset(reference, stream).ConfigureAwait(false);
497+
498+
// Offset do not exist so just return null. There is no need to throw an OffsetNotFoundException and capture it.
499+
if (response.ResponseCode == ResponseCode.OffsetNotFound)
500+
{
501+
return null;
502+
}
503+
485504
ClientExceptions.MaybeThrowException(response.ResponseCode,
486505
$"QueryOffset stream: {stream}, reference: {reference}");
487506
return response.Offset;

Tests/RawConsumerSystemTests.cs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -518,11 +518,18 @@ public async Task ConsumerQueryOffset()
518518
Assert.Equal((ulong)(NumberOfMessagesToStore - 1),
519519
await system.QueryOffset(Reference, stream));
520520

521+
Assert.Equal((ulong)(NumberOfMessagesToStore - 1),
522+
await system.TryQueryOffset(Reference, stream));
523+
521524
// this has to raise OffsetNotFoundException in case the offset
522525
// does not exist like in this case.
523526
await Assert.ThrowsAsync<OffsetNotFoundException>(() =>
524527
system.QueryOffset("reference_does_not_exist", stream));
525528

529+
Assert.Null(await system.TryQueryOffset("reference_does_not_exist", stream));
530+
await Assert.ThrowsAsync<GenericProtocolException>(() =>
531+
(system.TryQueryOffset(Reference, "stream_does_not_exist")));
532+
526533
await rawConsumer.Close();
527534
await system.DeleteStream(stream);
528535
await system.Close();
@@ -579,8 +586,10 @@ public async Task ShouldConsumeFromStoredOffset()
579586

580587
// new consumer that should start from stored offset
581588
var offset = await system.QueryOffset(Reference, stream);
589+
var tryOffset = await system.TryQueryOffset(Reference, stream);
582590
// the offset received must be the same from the last stored
583591
Assert.Equal(offset, await storedOffset.Task);
592+
Assert.Equal(offset, tryOffset);
584593
var messagesConsumed = new TaskCompletionSource<ulong>();
585594
var rawConsumerWithOffset = await system.CreateRawConsumer(
586595
new RawConsumerConfig(stream)

Tests/SystemTests.cs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -238,6 +238,7 @@ await Assert.ThrowsAsync<ArgumentException>(
238238
await Assert.ThrowsAsync<QueryException>(
239239
async () => { await system.QueryPartition("stream_does_not_exist"); }
240240
);
241+
241242
await system.Close();
242243
}
243244

@@ -297,7 +298,8 @@ public async Task CloseProducerConsumerAfterForceCloseShouldNotRaiseError()
297298
var system = await StreamSystem.Create(config);
298299
await system.CreateStream(new StreamSpec(stream));
299300
var producer =
300-
await system.CreateRawProducer(new RawProducerConfig(stream) { ClientProvidedName = clientProvidedName });
301+
await system.CreateRawProducer(
302+
new RawProducerConfig(stream) { ClientProvidedName = clientProvidedName });
301303
await SystemUtils.WaitAsync();
302304
var consumer = await system.CreateRawConsumer(
303305
new RawConsumerConfig(stream) { ClientProvidedName = clientProvidedName });

docs/SingleActiveConsumer/Program.cs

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
// See https://aka.ms/new-console-template for more information
2+
3+
using SingleActiveConsumer;
4+
5+
internal class Program
6+
{
7+
static async Task Main(string[] args)
8+
{
9+
if (args.Length > 0)
10+
{
11+
switch (args[0])
12+
{
13+
case "--producer":
14+
await SaCProducer.Start().ConfigureAwait(false);
15+
break;
16+
case "--consumer":
17+
await SacConsumer.Start().ConfigureAwait(false);
18+
break;
19+
default:
20+
Console.WriteLine("Unknown option, valid options: --producer / --consumer");
21+
break;
22+
}
23+
}
24+
}
25+
}
Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
// This source code is dual-licensed under the Apache License, version
2+
// 2.0, and the Mozilla Public License, version 2.0.
3+
// Copyright (c) 2017-2023 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries.
4+
5+
using Microsoft.Extensions.Logging;
6+
7+
namespace SingleActiveConsumer;
8+
9+
using System.Text;
10+
using RabbitMQ.Stream.Client;
11+
using RabbitMQ.Stream.Client.Reliable;
12+
13+
public class SaCProducer
14+
{
15+
public static async Task Start()
16+
{
17+
var loggerFactory = LoggerFactory.Create(builder =>
18+
{
19+
builder.AddSimpleConsole();
20+
builder.AddFilter("RabbitMQ.Stream", LogLevel.Information);
21+
});
22+
23+
var loggerProducer = loggerFactory.CreateLogger<Producer>();
24+
var loggerMain = loggerFactory.CreateLogger<StreamSystem>();
25+
26+
27+
var streamSystem = await StreamSystem.Create(new StreamSystemConfig(), loggerMain).ConfigureAwait(false);
28+
await streamSystem.CreateStream(new StreamSpec("my-sac-stream")).ConfigureAwait(false);
29+
var producer = await Producer.Create(new ProducerConfig(streamSystem, "my-sac-stream"), loggerProducer)
30+
.ConfigureAwait(false);
31+
for (var i = 0; i < 5000; i++)
32+
{
33+
var body = Encoding.UTF8.GetBytes($"Message #{i}");
34+
var message = new Message(body);
35+
await producer.Send(message).ConfigureAwait(false);
36+
Thread.Sleep(2000);
37+
loggerProducer.LogInformation($"Message {i} sent");
38+
}
39+
40+
Console.WriteLine("Sending 50 messages to my-sac-stream");
41+
await producer.Close().ConfigureAwait(false);
42+
await streamSystem.Close().ConfigureAwait(false);
43+
}
44+
}
Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
// This source code is dual-licensed under the Apache License, version
2+
// 2.0, and the Mozilla Public License, version 2.0.
3+
// Copyright (c) 2017-2023 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries.
4+
5+
using System.Buffers;
6+
using System.Text;
7+
using Microsoft.Extensions.Logging;
8+
using RabbitMQ.Stream.Client;
9+
using RabbitMQ.Stream.Client.Reliable;
10+
11+
namespace SingleActiveConsumer;
12+
13+
public class SacConsumer
14+
{
15+
public static async Task Start()
16+
{
17+
var loggerFactory = LoggerFactory.Create(builder =>
18+
{
19+
builder.AddSimpleConsole();
20+
builder.AddFilter("RabbitMQ.Stream", LogLevel.Information);
21+
});
22+
23+
var loggerConsumer = loggerFactory.CreateLogger<Consumer>();
24+
var loggerMain = loggerFactory.CreateLogger<StreamSystem>();
25+
26+
27+
var streamSystem = await StreamSystem.Create(new StreamSystemConfig(), loggerMain).ConfigureAwait(false);
28+
var consumer = await Consumer.Create(new ConsumerConfig(streamSystem, "my-sac-stream")
29+
{
30+
Reference = "sac_consumer",
31+
OffsetSpec = new OffsetTypeFirst(),
32+
IsSingleActiveConsumer = true,
33+
MessageHandler = async (_, consumer, context, message) =>
34+
{
35+
var text = Encoding.UTF8.GetString(message.Data.Contents.ToArray());
36+
loggerConsumer.LogInformation($"The message {text} was received");
37+
38+
// Store the offset of the message.
39+
// store offset for each message is not a good practice
40+
// here is only for demo purpose
41+
await consumer.StoreOffset(context.Offset).ConfigureAwait(false);
42+
43+
await Task.CompletedTask.ConfigureAwait(false);
44+
},
45+
ConsumerUpdateListener = async (consumerRef, stream, isActive) =>
46+
{
47+
var status = isActive ? "active" : "inactive";
48+
loggerConsumer.LogInformation($"Consumer {consumerRef} is {status} on stream {stream}");
49+
if (!isActive) return new OffsetTypeNext();
50+
51+
var offset = await streamSystem.TryQueryOffset(consumerRef, stream).ConfigureAwait(false);
52+
if (offset != null)
53+
{
54+
loggerConsumer.LogInformation($"The offset for {consumerRef} on stream {stream} is {offset}");
55+
return new OffsetTypeOffset(offset.Value);
56+
}
57+
58+
loggerConsumer.LogWarning(
59+
$"The offset for {consumerRef} on stream {stream} is not available, OffsetNext will be used.");
60+
return new OffsetTypeNext();
61+
62+
}
63+
}, loggerConsumer).ConfigureAwait(false);
64+
Console.WriteLine("Consumer is running. Press [enter] to exit.");
65+
Console.ReadLine();
66+
await consumer.Close().ConfigureAwait(false);
67+
await streamSystem.Close().ConfigureAwait(false);
68+
}
69+
}
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
<Project Sdk="Microsoft.NET.Sdk">
2+
3+
<PropertyGroup>
4+
<OutputType>Exe</OutputType>
5+
<TargetFramework>net8.0</TargetFramework>
6+
<ImplicitUsings>enable</ImplicitUsings>
7+
<Nullable>enable</Nullable>
8+
</PropertyGroup>
9+
10+
<ItemGroup>
11+
<ProjectReference Include="..\..\RabbitMQ.Stream.Client\RabbitMQ.Stream.Client.csproj" />
12+
</ItemGroup>
13+
14+
<ItemGroup>
15+
<PackageReference Include="Microsoft.Extensions.Logging" />
16+
<PackageReference Include="Microsoft.Extensions.Logging.Console" />
17+
</ItemGroup>
18+
19+
</Project>

docs/asciidoc/api.adoc

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -938,6 +938,8 @@ include::{test-examples}/ConsumerUsage.cs[tag=manual-tracking-defaults]
938938
<2> Store the current offset on some condition
939939

940940
The snippet above uses `consumer.StoreOffset(context.Offset)` to store at the offset of the current message.
941+
It is possible to store the offset in a more generic way with `StreamSystem.StoreOffset(reference,stream, offsetValue)`
942+
941943

942944
====== Considerations On Offset Tracking
943945

docs/asciidoc/query-stream.adoc

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,11 @@ The following methods are available:
1717

1818

1919
|`QueryOffset(string reference, string stream)`
20-
|Retrieves retrieves the last consumer offset stored for a given consumer Reference and stream. Useful for as consumer wants to know the last stored offset.
20+
|Retrieves the last consumer offset stored for a given consumer Reference and stream. Useful for as consumer wants to know the last stored offset.
21+
| Stream
22+
23+
|`TryQueryOffset(string reference, string stream)`
24+
|Like `QueryOffset` but returns `null` if the offset was not found.
2125
| Stream
2226

2327
|`QueryPartition(string superStream)`

0 commit comments

Comments
 (0)