diff --git a/src/main/java/io/nats/streaming/Message.java b/src/main/java/io/nats/streaming/Message.java index bf50492..38657ed 100644 --- a/src/main/java/io/nats/streaming/Message.java +++ b/src/main/java/io/nats/streaming/Message.java @@ -31,6 +31,7 @@ public class Message { private long sequence; private long timestamp; private boolean redelivered; + private int redeliveryCount; private int crc32; private boolean immutable; @@ -46,6 +47,7 @@ public class Message { this.sequence = msgp.getSequence(); this.timestamp = msgp.getTimestamp(); this.redelivered = msgp.getRedelivered(); + this.redeliveryCount = msgp.getRedeliveryCount(); this.crc32 = msgp.getCRC32(); immutable = true; } @@ -186,6 +188,15 @@ public boolean isRedelivered() { return redelivered; } + /** + * Returns number of times this message has been redelivered to this client's connection. + * + * @return number of times this message was redelivered + */ + public int getRedeliveryCount() { + return redeliveryCount; + } + /** * Returns the CRC32 checksum for the message. * @@ -241,8 +252,8 @@ public String toString() { StringBuilder sb = new StringBuilder(); // Date theDate = new Date(TimeUnit.NANOSECONDS.toMillis(getTimestamp())); sb.append(String.format( - "{Timestamp=%d;Sequence=%d;Redelivered=%b;Subject=%s;Reply=%s;Payload=<", - getTimestamp(), getSequence(), isRedelivered(), getSubject(), getReplyTo())); + "{Timestamp=%d;Sequence=%d;Redelivered=%b;RedeliveryCount=%d;Subject=%s;Reply=%s;Payload=<", + getTimestamp(), getSequence(), isRedelivered(), getRedeliveryCount(), getSubject(), getReplyTo())); // dateFormat.format(theDate), getSequence(), isRedelivered(), getSubject(), getReplyTo())); for (int i = 0; i < maxBytes && i < len; i++) { diff --git a/src/main/proto/protocol.proto b/src/main/proto/protocol.proto index 4d1abd9..3679313 100644 --- a/src/main/proto/protocol.proto +++ b/src/main/proto/protocol.proto @@ -50,12 +50,13 @@ message PubAck { // Msg struct. Sequence is assigned for global ordering by // the cluster after the publisher has been acknowledged. message MsgProto { - uint64 sequence = 1; // globally ordered sequence number for the subject's channel - string subject = 2; // subject - string reply = 3; // optional reply - bytes data = 4; // payload - int64 timestamp = 5; // received timestamp - bool redelivered = 6; // Flag specifying if the message is being redelivered + uint64 sequence = 1; // globally ordered sequence number for the subject's channel + string subject = 2; // subject + string reply = 3; // optional reply + bytes data = 4; // payload + int64 timestamp = 5; // received timestamp + bool redelivered = 6; // Flag specifying if the message is being redelivered + uint32 redeliveryCount = 7; // Number of times the message has been redelivered (count currently not persisted) uint32 CRC32 = 10; // optional IEEE CRC32 } diff --git a/src/test/java/io/nats/streaming/MessageTests.java b/src/test/java/io/nats/streaming/MessageTests.java index d1358e9..fba5a86 100644 --- a/src/test/java/io/nats/streaming/MessageTests.java +++ b/src/test/java/io/nats/streaming/MessageTests.java @@ -49,19 +49,21 @@ public void testMessageMsgProto() { final byte[] data = "Hello World".getBytes(); final long sequence = 1234567890; final boolean redelivered = true; + final int redeliveryCount = 3; final int crc32 = 9898989; long timestamp = System.nanoTime(); MsgProto msgp = MsgProto.newBuilder().setSubject(subject).setReply(reply) .setData(ByteString.copyFrom(data)).setTimestamp(timestamp).setSequence(sequence) - .setRedelivered(redelivered).setCRC32(crc32).build(); + .setRedelivered(redelivered).setRedeliveryCount(redeliveryCount).setCRC32(crc32).build(); Message msg = new Message(msgp); assertEquals(subject, msg.getSubject()); assertEquals(reply, msg.getReplyTo()); assertArrayEquals(data, msg.getData()); assertEquals(sequence, msg.getSequence()); assertEquals(redelivered, msg.isRedelivered()); + assertEquals(redeliveryCount, msg.getRedeliveryCount()); assertEquals(crc32, msg.getCrc32()); assertNotNull(msg.getInstant()); } @@ -89,13 +91,14 @@ public void testImmutable() { final byte[] data = "Hello World".getBytes(); final long sequence = 1234567890; final boolean redelivered = true; + final int redeliveryCount = 3; final int crc32 = 9898989; long timestamp = System.nanoTime(); MsgProto msgp = MsgProto.newBuilder().setSubject(subject).setReply(reply) .setData(ByteString.copyFrom(data)).setTimestamp(timestamp).setSequence(sequence) - .setRedelivered(redelivered).setCRC32(crc32).build(); + .setRedelivered(redelivered).setRedeliveryCount(redeliveryCount).setCRC32(crc32).build(); Message msg = new Message(msgp); boolean exThrown = false; diff --git a/src/test/java/io/nats/streaming/RedeliveryTests.java b/src/test/java/io/nats/streaming/RedeliveryTests.java index 79b0885..3dbc665 100644 --- a/src/test/java/io/nats/streaming/RedeliveryTests.java +++ b/src/test/java/io/nats/streaming/RedeliveryTests.java @@ -218,7 +218,7 @@ public void testHighRedeliveryToQueueSubMoreThanOnce() throws Exception { } @Test - public void testRedeliveredFlag() throws Exception { + public void testRedeliveredFlagAndRedeliveryCount() throws Exception { try (NatsStreamingTestServer srv = new NatsStreamingTestServer(clusterName, false)) { Options options = new Options.Builder().natsUrl(srv.getURI()).build(); try (StreamingConnection sc = NatsStreaming.connect(clusterName, clientName, options)) { @@ -273,6 +273,10 @@ public void testRedeliveredFlag() throws Exception { fail("Expected a redelivered flag to be set on msg: " + msg.getSequence()); } + if ((msg.getSequence() % 2 == 0) && msg.getRedeliveryCount() <= 0) { + fail("Expected a redelivery count to be higher than 0 on msg: " + + msg.getSequence()); + } } } catch (IOException e) { e.printStackTrace();