Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1377,6 +1377,103 @@ public void testGetMessageById() throws Exception {
}
}

@Test
public void testGetMessageByIdWithBatchIndex() throws Exception {
TenantInfoImpl tenantInfo = new TenantInfoImpl(Set.of("role1", "role2"), Set.of("test"));
admin.tenants().createTenant("tenant-xyz", tenantInfo);
admin.namespaces().createNamespace("tenant-xyz/ns-abc", Set.of("test"));
final String topicName1 = "persistent://tenant-xyz/ns-abc/testGetMessageById1";
final String topicName2 = "persistent://tenant-xyz/ns-abc/testGetMessageById2";
admin.topics().createNonPartitionedTopic(topicName1);
admin.topics().createNonPartitionedTopic(topicName2);

ProducerBase<byte[]> producer = (ProducerBase<byte[]>) pulsarClient.newProducer()
.topic(topicName1)
.enableBatching(true)
.batchingMaxMessages(2)
.batchingMaxBytes(1024 * 1024 * 1024)
.batchingMaxPublishDelay(1, TimeUnit.MINUTES)
.create();

List<CompletableFuture<MessageId>> idFutureList = new ArrayList<>();
String data1 = "test1";
idFutureList.add(producer.sendAsync(data1.getBytes()));
Thread.sleep(10);

String data2 = "test2";
idFutureList.add(producer.sendAsync(data2.getBytes()));

Thread.sleep(100);
BatchMessageIdImpl id1 = (BatchMessageIdImpl) idFutureList.get(0).get();
BatchMessageIdImpl id2 = (BatchMessageIdImpl) idFutureList.get(1).get();

Assert.assertEquals(id1.getLedgerId(), id2.getLedgerId());
Assert.assertEquals(id1.getEntryId(), id2.getEntryId());

Message<byte[]> message1 = admin.topics().getMessageById(topicName1, id1.getLedgerId(), id1.getEntryId(), id1.getBatchIndex());
Assert.assertEquals(message1.getData(), data1.getBytes());

Message<byte[]> message2 = admin.topics().getMessageById(topicName1, id2.getLedgerId(), id2.getEntryId(), id2.getBatchIndex());
Assert.assertEquals(message2.getData(), data2.getBytes());

Message<byte[]> message3 = null;
try {
message3 = admin.topics().getMessageById(topicName2, id1.getLedgerId(), id1.getEntryId(), id1.getBatchIndex());
Assert.fail();
} catch (Exception e) {
Assert.assertNull(message3);
}

Message<byte[]> message4 = admin.topics().getMessageById(topicName1, id2.getLedgerId(), id2.getEntryId(), id2.getBatchIndex()+1);
Assert.assertNull(message4);
}

@Test
public void testGetBatchMessagesById() throws Exception {
TenantInfoImpl tenantInfo = new TenantInfoImpl(Set.of("role1", "role2"), Set.of("test"));
admin.tenants().createTenant("tenant-xyz", tenantInfo);
admin.namespaces().createNamespace("tenant-xyz/ns-abc", Set.of("test"));
final String topicName1 = "persistent://tenant-xyz/ns-abc/testGetMessageById1";
final String topicName2 = "persistent://tenant-xyz/ns-abc/testGetMessageById2";
admin.topics().createNonPartitionedTopic(topicName1);
admin.topics().createNonPartitionedTopic(topicName2);

ProducerBase<byte[]> producer = (ProducerBase<byte[]>) pulsarClient.newProducer()
.topic(topicName1)
.enableBatching(true)
.batchingMaxMessages(2)
.batchingMaxBytes(1024 * 1024 * 1024)
.batchingMaxPublishDelay(1, TimeUnit.MINUTES)
.create();

List<CompletableFuture<MessageId>> idFutureList = new ArrayList<>();
String data1 = "test1";
idFutureList.add(producer.sendAsync(data1.getBytes()));
Thread.sleep(10);

String data2 = "test2";
idFutureList.add(producer.sendAsync(data2.getBytes()));

Thread.sleep(100);
MessageIdImpl id1 = (MessageIdImpl) idFutureList.get(0).get();
MessageIdImpl id2 = (MessageIdImpl) idFutureList.get(1).get();

Assert.assertEquals(id1.getLedgerId(), id2.getLedgerId());
Assert.assertEquals(id1.getEntryId(), id2.getEntryId());

List<Message<byte[]>> messages = admin.topics().getBatchMessagesById(topicName1, id1.getLedgerId(), id1.getEntryId());
Assert.assertEquals(messages.get(0).getData(), data1.getBytes());
Assert.assertEquals(messages.get(1).getData(), data2.getBytes());

List<Message<byte[]>> messages2 = new ArrayList<>();
try {
messages2 = admin.topics().getBatchMessagesById(topicName2, id1.getLedgerId(), id1.getEntryId());
Assert.fail();
} catch (Exception e) {
Assert.assertTrue(messages2.isEmpty());
}
}

@Test
public void testGetMessageIdByTimestamp() throws Exception {
TenantInfoImpl tenantInfo = new TenantInfoImpl(Set.of("role1", "role2"), Set.of("test"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1667,7 +1667,39 @@ void expireMessagesForAllSubscriptions(String topic, long expireTimeInSeconds)
* @throws PulsarAdminException
* Unexpected error
*/
Message<byte[]> getMessageById(String topic, long ledgerId, long entryId) throws PulsarAdminException;
default Message<byte[]> getMessageById(String topic, long ledgerId, long entryId) throws PulsarAdminException {
return getMessageById(topic, ledgerId, entryId, -1);
};

/**
* Get a message by its messageId via a topic subscription asynchronously.
* @param topic
* Topic name
* @param ledgerId
* Ledger id
* @param entryId
* Entry id
* @return a future that can be used to track when the message is returned
*/
default CompletableFuture<Message<byte[]>> getMessageByIdAsync(String topic, long ledgerId, long entryId) throws PulsarAdminException {
return getMessageByIdAsync(topic, ledgerId, entryId, -1);
};

/**
* Get a message by its messageId via a topic subscription.
* @param topic
* Topic name
* @param ledgerId
* Ledger id
* @param entryId
* Entry id
* @param batchIndex
* Batch Index
* @return the message indexed by the messageId
* @throws PulsarAdminException
* Unexpected error
*/
Message<byte[]> getMessageById(String topic, long ledgerId, long entryId, int batchIndex) throws PulsarAdminException;

/**
* Get a message by its messageId via a topic subscription asynchronously.
Expand All @@ -1677,9 +1709,37 @@ void expireMessagesForAllSubscriptions(String topic, long expireTimeInSeconds)
* Ledger id
* @param entryId
* Entry id
* @param batchIndex
* Batch Index
* @return a future that can be used to track when the message is returned
*/
CompletableFuture<Message<byte[]>> getMessageByIdAsync(String topic, long ledgerId, long entryId);
CompletableFuture<Message<byte[]>> getMessageByIdAsync(String topic, long ledgerId, long entryId, int batchIndex);

/**
* Get whole batch messages by its messageId via a topic subscription.
* @param topic
* Topic name
* @param ledgerId
* Ledger id
* @param entryId
* Entry id
* @return the whole batch messages indexed by the messageId
* @throws PulsarAdminException
* Unexpected error
*/
List<Message<byte[]>> getBatchMessagesById(String topic, long ledgerId, long entryId) throws PulsarAdminException;

/**
* Get whole batch messages by its messageId via a topic subscription asynchronously.
* @param topic
* Topic name
* @param ledgerId
* Ledger id
* @param entryId
* Entry id
* @return a future that can be used to track when the batch messages is returned
*/
CompletableFuture<List<Message<byte[]>>> getBatchMessagesByIdAsync(String topic, long ledgerId, long entryId);

/**
* Get message ID published at or just after this absolute timestamp (in ms).
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -963,9 +963,9 @@ public CompletableFuture<Void> truncateAsync(String topic) {
}

@Override
public CompletableFuture<Message<byte[]>> getMessageByIdAsync(String topic, long ledgerId, long entryId) {
public CompletableFuture<Message<byte[]>> getMessageByIdAsync(String topic, long ledgerId, long entryId, int batchIndex) {
CompletableFuture<Message<byte[]>> future = new CompletableFuture<>();
getRemoteMessageById(topic, ledgerId, entryId).handle((r, ex) -> {
getRemoteMessageById(topic, ledgerId, entryId, batchIndex).handle((r, ex) -> {
if (ex != null) {
if (ex instanceof NotFoundException) {
log.warn("Exception '{}' occurred while trying to get message.", ex.getMessage());
Expand All @@ -981,7 +981,7 @@ public CompletableFuture<Message<byte[]>> getMessageByIdAsync(String topic, long
return future;
}

private CompletableFuture<Message<byte[]>> getRemoteMessageById(String topic, long ledgerId, long entryId) {
private CompletableFuture<Message<byte[]>> getRemoteMessageById(String topic, long ledgerId, long entryId, int batchIndex) {
TopicName topicName = validateTopic(topic);
WebTarget path = topicPath(topicName, "ledger", Long.toString(ledgerId), "entry", Long.toString(entryId));
final CompletableFuture<Message<byte[]>> future = new CompletableFuture<>();
Expand All @@ -990,7 +990,22 @@ private CompletableFuture<Message<byte[]>> getRemoteMessageById(String topic, lo
@Override
public void completed(Response response) {
try {
future.complete(getMessagesFromHttpResponse(topicName.toString(), response).get(0));
if (batchIndex == -1) {
future.complete(getMessagesFromHttpResponse(topicName.toString(), response).get(0));
} else {
List<Message<byte[]>> messages = getMessagesFromHttpResponse(topicName.toString(), response);
for (Message<byte[]> msg : messages) {
MessageImpl<byte[]> message = (MessageImpl<byte[]>) msg;
if (message.getMessageId() instanceof BatchMessageIdImpl) {
BatchMessageIdImpl msgId = (BatchMessageIdImpl) message.getMessageId();
if (msgId.getBatchIndex() == batchIndex) {
future.complete(msg);
return;
}
}
}
future.complete(null);
}
} catch (Exception e) {
future.completeExceptionally(getApiException(e));
}
Expand All @@ -1005,9 +1020,58 @@ public void failed(Throwable throwable) {
}

@Override
public Message<byte[]> getMessageById(String topic, long ledgerId, long entryId)
public Message<byte[]> getMessageById(String topic, long ledgerId, long entryId, int batchIndex)
throws PulsarAdminException {
return sync(() -> getMessageByIdAsync(topic, ledgerId, entryId, batchIndex));
}

@Override
public List<Message<byte[]>> getBatchMessagesById(String topic, long ledgerId, long entryId)
throws PulsarAdminException {
return sync(() -> getMessageByIdAsync(topic, ledgerId, entryId));
return sync(() -> getBatchMessagesByIdAsync(topic, ledgerId, entryId));
}

@Override
public CompletableFuture<List<Message<byte[]>>> getBatchMessagesByIdAsync(String topic, long ledgerId, long entryId) {
CompletableFuture<List<Message<byte[]>>> future = new CompletableFuture<>();
getRemoteBatchMessagesById(topic, ledgerId, entryId).handle((r, ex) -> {
if (ex != null) {
if (ex instanceof NotFoundException) {
log.warn("Exception '{}' occurred while trying to get message.", ex.getMessage());
future.complete(r);
} else {
future.completeExceptionally(ex);
}
return null;
}

future.complete(r);
return null;
});
return future;
}

private CompletableFuture<List<Message<byte[]>>> getRemoteBatchMessagesById(String topic, long ledgerId, long entryId) {
TopicName topicName = validateTopic(topic);
WebTarget path = topicPath(topicName, "ledger", Long.toString(ledgerId), "entry", Long.toString(entryId));
final CompletableFuture<List<Message<byte[]>>> future = new CompletableFuture<>();
asyncGetRequest(path,
new InvocationCallback<Response>() {
@Override
public void completed(Response response) {
try {
future.complete(getMessagesFromHttpResponse(topicName.toString(), response));
} catch (Exception e) {
future.completeExceptionally(getApiException(e));
}
}

@Override
public void failed(Throwable throwable) {
future.completeExceptionally(getApiException(throwable.getCause()));
}
});
return future;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1798,7 +1798,10 @@ public void topics() throws Exception {
verify(mockTopics).setMaxMessageSize("persistent://myprop/clust/ns1/ds1", 99);

cmdTopics.run(split("get-message-by-id persistent://myprop/clust/ns1/ds1 -l 10 -e 2"));
verify(mockTopics).getMessageById("persistent://myprop/clust/ns1/ds1", 10,2);
verify(mockTopics).getMessageById("persistent://myprop/clust/ns1/ds1", 10,2, -1);

cmdTopics.run(split("get-batch-messages-by-id persistent://myprop/clust/ns1/ds1 -l 10 -e 2"));
verify(mockTopics).getBatchMessagesById("persistent://myprop/clust/ns1/ds1", 10,2);

cmdTopics.run(split("get-dispatch-rate persistent://myprop/clust/ns1/ds1 -ap"));
verify(mockTopics).getDispatchRate("persistent://myprop/clust/ns1/ds1", true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ public CmdPersistentTopics(Supplier<PulsarAdmin> admin) {
jcommander.addCommand("delete-partitioned-topic", new DeletePartitionedCmd());
jcommander.addCommand("peek-messages", new PeekMessages());
jcommander.addCommand("get-message-by-id", new GetMessageById());
jcommander.addCommand("get-batch-messages-by-id", new GetBatchMessagesById());
jcommander.addCommand("last-message-id", new GetLastMessageId());
jcommander.addCommand("reset-cursor", new ResetCursor());
jcommander.addCommand("terminate", new Terminate());
Expand Down Expand Up @@ -629,7 +630,8 @@ void run() throws PulsarAdminException {
}
}

@Parameters(commandDescription = "Get message by its ledgerId and entryId")
@Parameters(commandDescription = "Get message by its ledgerId and entryId, " +
"or get the exactly batch message by its batchIndex")
private class GetMessageById extends CliCommand {
@Parameter(description = "persistent://property/cluster/namespace/topic", required = true)
private java.util.List<String> params;
Expand All @@ -644,17 +646,57 @@ private class GetMessageById extends CliCommand {
required = true)
private long entryId;

@Parameter(names = { "-b", "--batchIndex" },
description = "batch index pointing to the desired message",
required = false)
private int batchIndex = -1;

@Override
void run() throws PulsarAdminException {
String persistentTopic = validatePersistentTopic(params);

Message<byte[]> message = getPersistentTopics().getMessageById(persistentTopic, ledgerId, entryId);
Message<byte[]> message = getPersistentTopics().getMessageById(persistentTopic, ledgerId, entryId, batchIndex);

ByteBuf date = Unpooled.wrappedBuffer(message.getData());
System.out.println(ByteBufUtil.prettyHexDump(date));
}
}

@Parameters(commandDescription = "Get the whole batch messages by its ledgerId and entryId")
private class GetBatchMessagesById extends CliCommand {
@Parameter(description = "persistent://property/cluster/namespace/topic", required = true)
private java.util.List<String> params;

@Parameter(names = { "-l", "--ledgerId" },
description = "ledger id pointing to the desired ledger",
required = true)
private long ledgerId;

@Parameter(names = { "-e", "--entryId" },
description = "entry id pointing to the desired entry",
required = true)
private long entryId;

@Override
void run() throws PulsarAdminException {
String persistentTopic = validatePersistentTopic(params);

List<Message<byte[]>> messages = getPersistentTopics().getBatchMessagesById(persistentTopic, ledgerId, entryId);

if (messages == null || messages.isEmpty()) {
System.out.println("Cannot find any messages based on ledgerId:"
+ ledgerId + " entryId:" + entryId);
} else {
for (Message message : messages) {
if (message != null) {
ByteBuf date = Unpooled.wrappedBuffer(message.getData());
System.out.println(ByteBufUtil.prettyHexDump(date));
}
}
}
}
}


@Parameters(commandDescription = "Get last message Id of the topic")
private class GetLastMessageId extends CliCommand {
Expand Down
Loading