diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java index 7939b19283946..c3363f46fd4c9 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java @@ -31,8 +31,12 @@ import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertNotNull; +import static org.testng.Assert.assertThrows; import static org.testng.Assert.assertTrue; import java.lang.reflect.Field; +import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.HashMap; import java.util.HashSet; @@ -91,6 +95,7 @@ import org.apache.pulsar.common.policies.data.RetentionPolicies; import org.apache.pulsar.common.policies.data.TenantInfoImpl; import org.apache.pulsar.common.policies.data.TopicStats; +import org.apache.pulsar.common.util.FutureUtil; import org.apache.pulsar.metadata.api.MetadataStoreException; import org.apache.zookeeper.KeeperException; import org.awaitility.Awaitility; @@ -1381,6 +1386,57 @@ public void testGetMessageById() throws Exception { } } + @Test + public void testGetMessagesById() throws Exception { + TenantInfoImpl tenantInfo = new TenantInfoImpl(Set.of("role1", "role2"), Set.of("test")); + admin.tenants().createTenant("tenant-xyz", tenantInfo); + admin.namespaces().createNamespace("tenant-xyz/ns-abc", Set.of("test")); + final String topicName1 = "persistent://tenant-xyz/ns-abc/testGetMessagesById1"; + final String topicName2 = "persistent://tenant-xyz/ns-abc/testGetMessagesById2"; + admin.topics().createNonPartitionedTopic(topicName1); + admin.topics().createNonPartitionedTopic(topicName2); + + int batchMessagesMaxMessagesPerBatch = 2; + @Cleanup + Producer producer1 = pulsarClient.newProducer() + .topic(topicName1) + .batchingMaxMessages(batchMessagesMaxMessagesPerBatch) + .batchingMaxPublishDelay(2, TimeUnit.SECONDS) + .enableBatching(true) + .create(); + + int numMessages = 10; + List> messageIds = new ArrayList<>(); + for (int i = 0; i < numMessages; i++) { + String s = String.valueOf(i); + messageIds.add(producer1.newMessage().key(s).value(s.getBytes(StandardCharsets.UTF_8)).sendAsync()); + } + FutureUtil.waitForAll(messageIds).get(); + + for (int i = 0; i < numMessages; i++) { + MessageIdImpl id = (MessageIdImpl) messageIds.get(i).get(); + long ledgerId = id.getLedgerId(); + long entryId = id.getEntryId(); + List> messagesById = admin.topics().getMessageById(topicName1, ledgerId, entryId, -1); + assertNotNull(messagesById); + assertEquals(messagesById.size(), batchMessagesMaxMessagesPerBatch); + assertTrue(messagesById.stream().allMatch(n -> { + MessageIdImpl actualMessageId = (MessageIdImpl) n.getMessageId(); + return actualMessageId.getLedgerId() == ledgerId && actualMessageId.getEntryId() == entryId; + })); + for (int batchIndex = 0; batchIndex < batchMessagesMaxMessagesPerBatch; batchIndex++) { + messagesById = admin.topics().getMessageById(topicName1, ledgerId, entryId, batchIndex); + assertEquals(messagesById.size(), 1); + BatchMessageIdImpl actualMessageId = (BatchMessageIdImpl) messagesById.get(0).getMessageId(); + assertEquals(actualMessageId.getBatchIndex(), batchIndex); + assertEquals(actualMessageId.getLedgerId(), ledgerId); + assertEquals(actualMessageId.getEntryId(), entryId); + } + } + + assertThrows(PulsarAdminException.class, () -> admin.topics().getMessageById(topicName2, 2, 1, -1)); + } + @Test public void testGetMessageById4SpecialPropsInMsg() throws Exception { TenantInfoImpl tenantInfo = new TenantInfoImpl(Set.of("role1", "role2"), Set.of("test")); diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java index cace5cda7bd5b..6a09ed003a334 100644 --- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java +++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java @@ -1679,7 +1679,9 @@ void expireMessagesForAllSubscriptions(String topic, long expireTimeInSeconds) * @return the message indexed by the messageId * @throws PulsarAdminException * Unexpected error + * @deprecated Use {@link #getMessageById(String, long, long, int)} instead. */ + @Deprecated Message getMessageById(String topic, long ledgerId, long entryId) throws PulsarAdminException; /** @@ -1691,7 +1693,9 @@ void expireMessagesForAllSubscriptions(String topic, long expireTimeInSeconds) * @param entryId * Entry id * @return a future that can be used to track when the message is returned + * @deprecated Use {@link #getMessageByIdAsync(String, long, long, int)} instead. */ + @Deprecated CompletableFuture> getMessageByIdAsync(String topic, long ledgerId, long entryId); /** @@ -4477,4 +4481,29 @@ default void createShadowTopic(String shadowTopic, String sourceTopic) throws Pu default CompletableFuture createShadowTopicAsync(String shadowTopic, String sourceTopic) { return createShadowTopicAsync(shadowTopic, sourceTopic, null); } + + /** + * Get the message by its messageId via a topic subscription asynchronously. + * + * @param topic Topic name + * @param ledgerId Ledger id + * @param entryId Entry id + * @param batchIndex Batch Index, -1 returns all batch message + * @return a future that can be used to track when the message is returned + */ + CompletableFuture>> getMessageByIdAsync(String topic, long ledgerId, long entryId, + int batchIndex); + + /** + * Get the message by its messageId via a topic subscription. + * + * @param topic Topic name + * @param ledgerId Ledger id + * @param entryId Entry id + * @param batchIndex Batch Index, -1 returns all batch message + * @return the message indexed by the messageId + * @throws PulsarAdminException Unexpected error + */ + List> getMessageById(String topic, long ledgerId, long entryId, int batchIndex) + throws PulsarAdminException; } diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java index 9d09d96073d9e..14ef9be20b60e 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java @@ -879,7 +879,7 @@ private CompletableFuture>> peekNthMessage(String topic, St @Override public void completed(Response response) { try { - future.complete(getMessagesFromHttpResponse(tn.toString(), response)); + future.complete(getMessagesFromHttpResponse(tn.toString(), 0, response)); } catch (Exception e) { future.completeExceptionally(getApiException(e)); } @@ -954,7 +954,7 @@ public CompletableFuture> examineMessageAsync(String topic, Stri @Override public void completed(Response response) { try { - List> messages = getMessagesFromHttpResponse(tn.toString(), response); + List> messages = getMessagesFromHttpResponse(tn.toString(), -1, response); if (messages.size() > 0) { future.complete(messages.get(0)); } else { @@ -986,48 +986,13 @@ public CompletableFuture truncateAsync(String topic) { } @Override + @Deprecated public CompletableFuture> getMessageByIdAsync(String topic, long ledgerId, long entryId) { - CompletableFuture> future = new CompletableFuture<>(); - getRemoteMessageById(topic, ledgerId, entryId).handle((r, ex) -> { - if (ex != null) { - if (ex instanceof NotFoundException) { - log.warn("Exception '{}' occurred while trying to get message.", ex.getMessage()); - future.complete(r); - } else { - future.completeExceptionally(ex); - } - return null; - } - future.complete(r); - return null; - }); - return future; - } - - private CompletableFuture> getRemoteMessageById(String topic, long ledgerId, long entryId) { - TopicName topicName = validateTopic(topic); - WebTarget path = topicPath(topicName, "ledger", Long.toString(ledgerId), "entry", Long.toString(entryId)); - final CompletableFuture> future = new CompletableFuture<>(); - asyncGetRequest(path, - new InvocationCallback() { - @Override - public void completed(Response response) { - try { - future.complete(getMessagesFromHttpResponse(topicName.toString(), response).get(0)); - } catch (Exception e) { - future.completeExceptionally(getApiException(e)); - } - } - - @Override - public void failed(Throwable throwable) { - future.completeExceptionally(getApiException(throwable.getCause())); - } - }); - return future; + return getMessageByIdAsync(topic, ledgerId, entryId, 0).thenApply((r) -> r.get(0)); } @Override + @Deprecated public Message getMessageById(String topic, long ledgerId, long entryId) throws PulsarAdminException { return sync(() -> getMessageByIdAsync(topic, ledgerId, entryId)); @@ -1257,7 +1222,8 @@ private TopicName validateTopic(String topic) { return TopicName.get(topic); } - private List> getMessagesFromHttpResponse(String topic, Response response) throws Exception { + private List> getMessagesFromHttpResponse(String topic, int batchIndex, Response response) + throws Exception { if (response.getStatus() != Status.OK.getStatusCode()) { throw getApiException(response); @@ -1433,7 +1399,8 @@ private List> getMessagesFromHttpResponse(String topic, Response } if (!isEncrypted && response.getHeaderString(BATCH_HEADER) != null) { - return getIndividualMsgsFromBatch(topic, msgId, data, properties, messageMetadata, brokerEntryMetadata); + return getIndividualMsgsFromBatch(topic, msgId, data, properties, messageMetadata, brokerEntryMetadata, + batchIndex); } MessageImpl message = new MessageImpl(topic, msgId, properties, @@ -1446,12 +1413,20 @@ private List> getMessagesFromHttpResponse(String topic, Response } private List> getIndividualMsgsFromBatch(String topic, String msgId, byte[] data, - Map properties, MessageMetadata msgMetadataBuilder, - BrokerEntryMetadata brokerEntryMetadata) { + Map properties, + MessageMetadata msgMetadataBuilder, + BrokerEntryMetadata brokerEntryMetadata, + int batchIndex) { List> ret = new ArrayList<>(); int batchSize = Integer.parseInt(properties.get(BATCH_HEADER)); + if (batchIndex >= batchSize) { + throw new IllegalArgumentException("batchIndex is greater than batchSize"); + } ByteBuf buf = Unpooled.wrappedBuffer(data); for (int i = 0; i < batchSize; i++) { + if (batchIndex != -1 && batchIndex != i) { + continue; + } String batchMsgId = msgId + ":" + i; SingleMessageMetadata singleMessageMetadata = new SingleMessageMetadata(); try { @@ -2782,5 +2757,36 @@ public CompletableFuture createShadowTopicAsync(String shadowTopic, String }); } + @Override + public CompletableFuture>> getMessageByIdAsync(String topic, long ledgerId, long entryId, + int batchIndex) { + TopicName topicName = validateTopic(topic); + WebTarget path = topicPath(topicName, "ledger", Long.toString(ledgerId), "entry", Long.toString(entryId)); + final CompletableFuture>> future = new CompletableFuture<>(); + asyncGetRequest(path, + new InvocationCallback() { + @Override + public void completed(Response response) { + try { + future.complete(getMessagesFromHttpResponse(topicName.toString(), batchIndex, response)); + } catch (Exception e) { + future.completeExceptionally(getApiException(e)); + } + } + + @Override + public void failed(Throwable throwable) { + future.completeExceptionally(getApiException(throwable.getCause())); + } + }); + return future; + } + + @Override + public List> getMessageById(String topic, long ledgerId, long entryId, int batchIndex) + throws PulsarAdminException { + return sync(() -> getMessageByIdAsync(topic, ledgerId, entryId, batchIndex)); + } + private static final Logger log = LoggerFactory.getLogger(TopicsImpl.class); } diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdPersistentTopics.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdPersistentTopics.java index cdfcaefc7f6e4..aedc105e918ca 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdPersistentTopics.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdPersistentTopics.java @@ -632,10 +632,13 @@ private class GetMessageById extends CliCommand { void run() throws PulsarAdminException { String persistentTopic = validatePersistentTopic(params); - Message message = getPersistentTopics().getMessageById(persistentTopic, ledgerId, entryId); + List> messages = getPersistentTopics().getMessageById(persistentTopic, ledgerId, entryId, + -1); - ByteBuf date = Unpooled.wrappedBuffer(message.getData()); - System.out.println(ByteBufUtil.prettyHexDump(date)); + messages.forEach(message -> { + ByteBuf date = Unpooled.wrappedBuffer(message.getData()); + System.out.println(ByteBufUtil.prettyHexDump(date)); + }); } } diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java index 508642e63ae2b..17c24315ed171 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java @@ -1351,45 +1351,49 @@ private class GetMessageById extends CliCommand { void run() throws PulsarAdminException { String persistentTopic = validatePersistentTopic(params); - MessageImpl message = (MessageImpl) getTopics().getMessageById(persistentTopic, ledgerId, entryId); - if (message == null) { + List> messages = getTopics() + .getMessageById(persistentTopic, ledgerId, entryId, -1); + if (messages == null || messages.size() == 0) { System.out.println("Cannot find any messages based on ledgerId:" + ledgerId + " entryId:" + entryId); } else { - if (message.getMessageId() instanceof BatchMessageIdImpl) { - BatchMessageIdImpl msgId = (BatchMessageIdImpl) message.getMessageId(); - System.out.println("Batch Message ID: " + msgId.getLedgerId() + ":" + msgId.getEntryId() + ":" - + msgId.getBatchIndex()); - } else { - MessageIdImpl msgId = (MessageIdImpl) message.getMessageId(); - System.out.println("Message ID: " + msgId.getLedgerId() + ":" + msgId.getEntryId()); - } - - System.out.println("Publish time: " + message.getPublishTime()); - System.out.println("Event time: " + message.getEventTime()); - System.out.println("Redelivery count: " + message.getRedeliveryCount()); + messages.forEach(messageInterface -> { + MessageImpl message = (MessageImpl) messageInterface; + if (message.getMessageId() instanceof BatchMessageIdImpl) { + BatchMessageIdImpl msgId = (BatchMessageIdImpl) message.getMessageId(); + System.out.println("Batch Message ID: " + msgId.getLedgerId() + ":" + msgId.getEntryId() + ":" + + msgId.getBatchIndex()); + } else { + MessageIdImpl msgId = (MessageIdImpl) message.getMessageId(); + System.out.println("Message ID: " + msgId.getLedgerId() + ":" + msgId.getEntryId()); + } - if (message.getDeliverAtTime() != 0) { - System.out.println("Deliver at time: " + message.getDeliverAtTime()); - } + System.out.println("Publish time: " + message.getPublishTime()); + System.out.println("Event time: " + message.getEventTime()); + System.out.println("Redelivery count: " + message.getRedeliveryCount()); - if (message.getBrokerEntryMetadata() != null) { - if (message.getBrokerEntryMetadata().hasBrokerTimestamp()) { - System.out.println("Broker entry metadata timestamp: " - + message.getBrokerEntryMetadata().getBrokerTimestamp()); + if (message.getDeliverAtTime() != 0) { + System.out.println("Deliver at time: " + message.getDeliverAtTime()); } - if (message.getBrokerEntryMetadata().hasIndex()) { - System.out.println("Broker entry metadata index: " - + message.getBrokerEntryMetadata().getIndex()); + + if (message.getBrokerEntryMetadata() != null) { + if (message.getBrokerEntryMetadata().hasBrokerTimestamp()) { + System.out.println("Broker entry metadata timestamp: " + + message.getBrokerEntryMetadata().getBrokerTimestamp()); + } + if (message.getBrokerEntryMetadata().hasIndex()) { + System.out.println("Broker entry metadata index: " + + message.getBrokerEntryMetadata().getIndex()); + } } - } - if (message.getProperties().size() > 0) { - System.out.println("Properties:"); - print(message.getProperties()); - } - ByteBuf date = Unpooled.wrappedBuffer(message.getData()); - System.out.println(ByteBufUtil.prettyHexDump(date)); + if (message.getProperties().size() > 0) { + System.out.println("Properties:"); + print(message.getProperties()); + } + ByteBuf date = Unpooled.wrappedBuffer(message.getData()); + System.out.println(ByteBufUtil.prettyHexDump(date)); + }); } } }