Skip to content

Commit 891abe8

Browse files
authored
feat: add support for PinnedByCorrelation consumer strategy in persistent subscriptions (#341)
1 parent 633486e commit 891abe8

File tree

3 files changed

+84
-6
lines changed

3 files changed

+84
-6
lines changed

src/main/java/io/kurrent/dbclient/AbstractCreatePersistentSubscription.java

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
import org.slf4j.Logger;
66
import org.slf4j.LoggerFactory;
77

8+
import java.util.Optional;
89
import java.util.concurrent.CompletableFuture;
910

1011
abstract class AbstractCreatePersistentSubscription<TPos, TSettings extends PersistentSubscriptionSettings> {
@@ -31,6 +32,8 @@ protected Persistent.CreateReq.Settings.Builder createSettings(){
3132
@SuppressWarnings({"unchecked", "deprecation"})
3233
public CompletableFuture execute() {
3334
return this.client.runWithArgs(args -> {
35+
Optional<ServerVersion> serverVersion = args.getServerVersion();
36+
3437
CompletableFuture result = new CompletableFuture();
3538
PersistentSubscriptionsGrpc.PersistentSubscriptionsStub client =
3639
GrpcUtils.configureStub(PersistentSubscriptionsGrpc.newStub(args.getChannel()), this.client.getSettings(), this.options);
@@ -56,8 +59,15 @@ public CompletableFuture execute() {
5659
settingsBuilder.setNamedConsumerStrategy(Persistent.CreateReq.ConsumerStrategy.RoundRobin);
5760
} else if (settings.getNamedConsumerStrategy().isPinned()) {
5861
settingsBuilder.setNamedConsumerStrategy(Persistent.CreateReq.ConsumerStrategy.Pinned);
62+
} else if (settings.getNamedConsumerStrategy().isPinnedByCorrelation()) {
63+
if (serverVersion.get().isGreaterThan(21, 10, 0)) {
64+
settingsBuilder.setConsumerStrategy(settings.getNamedConsumerStrategy().toString());
65+
} else {
66+
logger.error("Consumer strategy: '{}' is only available on server 21.10.1 and above", NamedConsumerStrategy.PINNED_BY_CORRELATION);
67+
throw new UnsupportedFeatureException();
68+
}
5969
} else {
60-
logger.error(String.format("Unsupported named consumer strategy: '%s'", settings.getNamedConsumerStrategy().toString()));
70+
logger.error("Unsupported named consumer strategy: '{}'", settings.getNamedConsumerStrategy().toString());
6171
throw new UnsupportedFeatureException();
6272
}
6373

src/main/java/io/kurrent/dbclient/NamedConsumerStrategy.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,11 @@ public class NamedConsumerStrategy {
2828
*/
2929
public static final NamedConsumerStrategy PINNED = new NamedConsumerStrategy("Pinned");
3030

31+
/**
32+
* This is similar to the Pinned strategy, but instead of using the source stream id to bucket the messages, it distributes the events based on the event's correlationId.
33+
*/
34+
public static final NamedConsumerStrategy PINNED_BY_CORRELATION = new NamedConsumerStrategy("PinnedByCorrelation");
35+
3136
NamedConsumerStrategy(String value) {
3237
this.value = value;
3338
}
@@ -53,6 +58,14 @@ public boolean isPinned() {
5358
return isNamed("Pinned");
5459
}
5560

61+
62+
/**
63+
* Checks if it's a <i>PinnedByCorrelation</i> strategy.
64+
*/
65+
public boolean isPinnedByCorrelation() {
66+
return isNamed("PinnedByCorrelation");
67+
}
68+
5669
/**
5770
* Checks if the strategy's name matches the string passed as a parameter.
5871
*/

src/test/java/io/kurrent/dbclient/persistentsubscriptions/PersistentSubscriptionManagementTests.java

Lines changed: 60 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -3,11 +3,15 @@
33
import io.kurrent.dbclient.*;
44
import com.fasterxml.jackson.databind.json.JsonMapper;
55
import org.junit.jupiter.api.*;
6+
import org.junit.jupiter.api.extension.ExtensionContext;
7+
import org.junit.jupiter.params.ParameterizedTest;
8+
import org.junit.jupiter.params.provider.*;
69

710
import java.util.List;
811
import java.util.Optional;
912
import java.util.concurrent.CompletableFuture;
1013
import java.util.concurrent.TimeUnit;
14+
import java.util.stream.Stream;
1115

1216
@TestMethodOrder(MethodOrderer.OrderAnnotation.class)
1317
@SuppressWarnings("unchecked")
@@ -29,7 +33,7 @@ default void testListPersistentSubscriptions() throws Throwable {
2933
List<PersistentSubscriptionInfo> subs = client.listAll().get();
3034

3135
int count = 0;
32-
for (PersistentSubscriptionInfo info: subs) {
36+
for (PersistentSubscriptionInfo info : subs) {
3337
if (info.getEventSource().equals(streamA) || info.getEventSource().equals(streamB)) {
3438
count++;
3539
}
@@ -122,7 +126,7 @@ default void testGetPersistentSubscriptionInfoToAll() throws Throwable {
122126

123127
@Test
124128
@Order(6)
125-
default void testGetPersistentSubscriptionInfoNotExisting() throws Throwable {
129+
default void testGetPersistentSubscriptionInfoNotExisting() throws Throwable {
126130
KurrentDBPersistentSubscriptionsClient client = getDefaultPersistentSubscriptionClient();
127131
Optional<PersistentSubscriptionToStreamInfo> result = client.getInfoToStream(generateName(), generateName()).get();
128132

@@ -147,6 +151,7 @@ default void testReplayParkedMessages() throws Throwable {
147151

148152
client.subscribeToStream(streamName, groupName, new PersistentSubscriptionListener() {
149153
int count = 0;
154+
150155
@Override
151156
public void onEvent(PersistentSubscription subscription, int retryCount, ResolvedEvent event) {
152157
if (count < 2)
@@ -206,6 +211,7 @@ default void testReplayParkedMessagesToAll() throws Throwable {
206211

207212
client.subscribeToAll(groupName, new PersistentSubscriptionListener() {
208213
int count = 0;
214+
209215
@Override
210216
public void onEvent(PersistentSubscription subscription, int retryCount, ResolvedEvent event) {
211217
if (count < 2 && event.getOriginalEvent().getStreamId().equals(streamName))
@@ -268,9 +274,9 @@ default void testEncoding() throws Throwable {
268274
break;
269275
}
270276

271-
Assertions.assertTrue(info.isPresent());
272-
Assertions.assertEquals(info.get().getEventSource(), streamName);
273-
Assertions.assertEquals(info.get().getGroupName(), groupName);
277+
Assertions.assertTrue(info.isPresent());
278+
Assertions.assertEquals(info.get().getEventSource(), streamName);
279+
Assertions.assertEquals(info.get().getGroupName(), groupName);
274280
}
275281

276282
@Test
@@ -279,4 +285,53 @@ default void testRestartSubsystem() throws Throwable {
279285
KurrentDBPersistentSubscriptionsClient client = getDefaultPersistentSubscriptionClient();
280286
client.restartSubsystem().get();
281287
}
288+
289+
@ParameterizedTest
290+
@ArgumentsSource(NamedConsumerStrategyProvider.class)
291+
default void testCreatePersistentSubscriptionToAllWithConsumerStrategies(NamedConsumerStrategy strategy) throws Throwable {
292+
KurrentDBPersistentSubscriptionsClient client = getDefaultPersistentSubscriptionClient();
293+
String groupName = String.format("/foo/%s/group", generateName());
294+
295+
CreatePersistentSubscriptionToAllOptions options = CreatePersistentSubscriptionToAllOptions.get()
296+
.namedConsumerStrategy(strategy);
297+
298+
client.createToAll(groupName, options).get();
299+
300+
Optional<PersistentSubscriptionToAllInfo> result = client.getInfoToAll(groupName).get();
301+
Assertions.assertTrue(result.isPresent(), "Subscription should be created");
302+
303+
Assertions.assertEquals(groupName, result.get().getGroupName());
304+
Assertions.assertEquals(strategy.toString(), result.get().getSettings().getNamedConsumerStrategy().toString());
305+
}
306+
307+
@ParameterizedTest
308+
@ArgumentsSource(NamedConsumerStrategyProvider.class)
309+
default void testCreatePersistentSubscriptionToStreamWithConsumerStrategies(NamedConsumerStrategy strategy) throws Throwable {
310+
KurrentDBPersistentSubscriptionsClient client = getDefaultPersistentSubscriptionClient();
311+
String streamName = String.format("/foo/%s/stream", generateName());
312+
String groupName = String.format("/foo/%s/group", generateName());
313+
314+
CreatePersistentSubscriptionToStreamOptions options = CreatePersistentSubscriptionToStreamOptions.get()
315+
.namedConsumerStrategy(strategy);
316+
317+
client.createToStream(streamName, groupName, options).get();
318+
319+
Optional<PersistentSubscriptionToStreamInfo> result = client.getInfoToStream(streamName, groupName).get();
320+
Assertions.assertTrue(result.isPresent(), "Subscription should be created");
321+
322+
Assertions.assertEquals(groupName, result.get().getGroupName());
323+
Assertions.assertEquals(strategy.toString(), result.get().getSettings().getNamedConsumerStrategy().toString());
324+
}
282325
}
326+
327+
class NamedConsumerStrategyProvider implements ArgumentsProvider {
328+
@Override
329+
public Stream<? extends Arguments> provideArguments(ExtensionContext context) {
330+
return Stream.of(
331+
Arguments.of(NamedConsumerStrategy.DISPATCH_TO_SINGLE),
332+
Arguments.of(NamedConsumerStrategy.ROUND_ROBIN),
333+
Arguments.of(NamedConsumerStrategy.PINNED),
334+
Arguments.of(NamedConsumerStrategy.PINNED_BY_CORRELATION)
335+
);
336+
}
337+
}

0 commit comments

Comments
 (0)