From e7bf5160a071e704440d74837ac35c98ec640a40 Mon Sep 17 00:00:00 2001 From: fanjianye Date: Wed, 15 Mar 2023 16:28:58 +0800 Subject: [PATCH] implement get exactly message by batchIndex and get whole batch messages by id --- .../broker/admin/PersistentTopicsTest.java | 97 +++++++++++++++++++ .../apache/pulsar/client/admin/Topics.java | 64 +++++++++++- .../client/admin/internal/TopicsImpl.java | 76 +++++++++++++-- .../pulsar/admin/cli/PulsarAdminToolTest.java | 5 +- .../pulsar/admin/cli/CmdPersistentTopics.java | 46 ++++++++- .../apache/pulsar/admin/cli/CmdTopics.java | 74 +++++++++++++- 6 files changed, 349 insertions(+), 13 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java index 6949fe931ca5a..46038884582f9 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java @@ -1377,6 +1377,103 @@ public void testGetMessageById() throws Exception { } } + @Test + public void testGetMessageByIdWithBatchIndex() throws Exception { + TenantInfoImpl tenantInfo = new TenantInfoImpl(Set.of("role1", "role2"), Set.of("test")); + admin.tenants().createTenant("tenant-xyz", tenantInfo); + admin.namespaces().createNamespace("tenant-xyz/ns-abc", Set.of("test")); + final String topicName1 = "persistent://tenant-xyz/ns-abc/testGetMessageById1"; + final String topicName2 = "persistent://tenant-xyz/ns-abc/testGetMessageById2"; + admin.topics().createNonPartitionedTopic(topicName1); + admin.topics().createNonPartitionedTopic(topicName2); + + ProducerBase producer = (ProducerBase) pulsarClient.newProducer() + .topic(topicName1) + .enableBatching(true) + .batchingMaxMessages(2) + .batchingMaxBytes(1024 * 1024 * 1024) + .batchingMaxPublishDelay(1, TimeUnit.MINUTES) + .create(); + + List> idFutureList = new ArrayList<>(); + String data1 = "test1"; + idFutureList.add(producer.sendAsync(data1.getBytes())); + Thread.sleep(10); + + String data2 = "test2"; + idFutureList.add(producer.sendAsync(data2.getBytes())); + + Thread.sleep(100); + BatchMessageIdImpl id1 = (BatchMessageIdImpl) idFutureList.get(0).get(); + BatchMessageIdImpl id2 = (BatchMessageIdImpl) idFutureList.get(1).get(); + + Assert.assertEquals(id1.getLedgerId(), id2.getLedgerId()); + Assert.assertEquals(id1.getEntryId(), id2.getEntryId()); + + Message message1 = admin.topics().getMessageById(topicName1, id1.getLedgerId(), id1.getEntryId(), id1.getBatchIndex()); + Assert.assertEquals(message1.getData(), data1.getBytes()); + + Message message2 = admin.topics().getMessageById(topicName1, id2.getLedgerId(), id2.getEntryId(), id2.getBatchIndex()); + Assert.assertEquals(message2.getData(), data2.getBytes()); + + Message message3 = null; + try { + message3 = admin.topics().getMessageById(topicName2, id1.getLedgerId(), id1.getEntryId(), id1.getBatchIndex()); + Assert.fail(); + } catch (Exception e) { + Assert.assertNull(message3); + } + + Message message4 = admin.topics().getMessageById(topicName1, id2.getLedgerId(), id2.getEntryId(), id2.getBatchIndex()+1); + Assert.assertNull(message4); + } + + @Test + public void testGetBatchMessagesById() throws Exception { + TenantInfoImpl tenantInfo = new TenantInfoImpl(Set.of("role1", "role2"), Set.of("test")); + admin.tenants().createTenant("tenant-xyz", tenantInfo); + admin.namespaces().createNamespace("tenant-xyz/ns-abc", Set.of("test")); + final String topicName1 = "persistent://tenant-xyz/ns-abc/testGetMessageById1"; + final String topicName2 = "persistent://tenant-xyz/ns-abc/testGetMessageById2"; + admin.topics().createNonPartitionedTopic(topicName1); + admin.topics().createNonPartitionedTopic(topicName2); + + ProducerBase producer = (ProducerBase) pulsarClient.newProducer() + .topic(topicName1) + .enableBatching(true) + .batchingMaxMessages(2) + .batchingMaxBytes(1024 * 1024 * 1024) + .batchingMaxPublishDelay(1, TimeUnit.MINUTES) + .create(); + + List> idFutureList = new ArrayList<>(); + String data1 = "test1"; + idFutureList.add(producer.sendAsync(data1.getBytes())); + Thread.sleep(10); + + String data2 = "test2"; + idFutureList.add(producer.sendAsync(data2.getBytes())); + + Thread.sleep(100); + MessageIdImpl id1 = (MessageIdImpl) idFutureList.get(0).get(); + MessageIdImpl id2 = (MessageIdImpl) idFutureList.get(1).get(); + + Assert.assertEquals(id1.getLedgerId(), id2.getLedgerId()); + Assert.assertEquals(id1.getEntryId(), id2.getEntryId()); + + List> messages = admin.topics().getBatchMessagesById(topicName1, id1.getLedgerId(), id1.getEntryId()); + Assert.assertEquals(messages.get(0).getData(), data1.getBytes()); + Assert.assertEquals(messages.get(1).getData(), data2.getBytes()); + + List> messages2 = new ArrayList<>(); + try { + messages2 = admin.topics().getBatchMessagesById(topicName2, id1.getLedgerId(), id1.getEntryId()); + Assert.fail(); + } catch (Exception e) { + Assert.assertTrue(messages2.isEmpty()); + } + } + @Test public void testGetMessageIdByTimestamp() throws Exception { TenantInfoImpl tenantInfo = new TenantInfoImpl(Set.of("role1", "role2"), Set.of("test")); diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java index 0d5d14eb73463..e84adbcac59fd 100644 --- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java +++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java @@ -1667,7 +1667,39 @@ void expireMessagesForAllSubscriptions(String topic, long expireTimeInSeconds) * @throws PulsarAdminException * Unexpected error */ - Message getMessageById(String topic, long ledgerId, long entryId) throws PulsarAdminException; + default Message getMessageById(String topic, long ledgerId, long entryId) throws PulsarAdminException { + return getMessageById(topic, ledgerId, entryId, -1); + }; + + /** + * Get a message by its messageId via a topic subscription asynchronously. + * @param topic + * Topic name + * @param ledgerId + * Ledger id + * @param entryId + * Entry id + * @return a future that can be used to track when the message is returned + */ + default CompletableFuture> getMessageByIdAsync(String topic, long ledgerId, long entryId) throws PulsarAdminException { + return getMessageByIdAsync(topic, ledgerId, entryId, -1); + }; + + /** + * Get a message by its messageId via a topic subscription. + * @param topic + * Topic name + * @param ledgerId + * Ledger id + * @param entryId + * Entry id + * @param batchIndex + * Batch Index + * @return the message indexed by the messageId + * @throws PulsarAdminException + * Unexpected error + */ + Message getMessageById(String topic, long ledgerId, long entryId, int batchIndex) throws PulsarAdminException; /** * Get a message by its messageId via a topic subscription asynchronously. @@ -1677,9 +1709,37 @@ void expireMessagesForAllSubscriptions(String topic, long expireTimeInSeconds) * Ledger id * @param entryId * Entry id + * @param batchIndex + * Batch Index * @return a future that can be used to track when the message is returned */ - CompletableFuture> getMessageByIdAsync(String topic, long ledgerId, long entryId); + CompletableFuture> getMessageByIdAsync(String topic, long ledgerId, long entryId, int batchIndex); + + /** + * Get whole batch messages by its messageId via a topic subscription. + * @param topic + * Topic name + * @param ledgerId + * Ledger id + * @param entryId + * Entry id + * @return the whole batch messages indexed by the messageId + * @throws PulsarAdminException + * Unexpected error + */ + List> getBatchMessagesById(String topic, long ledgerId, long entryId) throws PulsarAdminException; + + /** + * Get whole batch messages by its messageId via a topic subscription asynchronously. + * @param topic + * Topic name + * @param ledgerId + * Ledger id + * @param entryId + * Entry id + * @return a future that can be used to track when the batch messages is returned + */ + CompletableFuture>> getBatchMessagesByIdAsync(String topic, long ledgerId, long entryId); /** * Get message ID published at or just after this absolute timestamp (in ms). diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java index 33d1cd1785827..1de59507629ea 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java @@ -963,9 +963,9 @@ public CompletableFuture truncateAsync(String topic) { } @Override - public CompletableFuture> getMessageByIdAsync(String topic, long ledgerId, long entryId) { + public CompletableFuture> getMessageByIdAsync(String topic, long ledgerId, long entryId, int batchIndex) { CompletableFuture> future = new CompletableFuture<>(); - getRemoteMessageById(topic, ledgerId, entryId).handle((r, ex) -> { + getRemoteMessageById(topic, ledgerId, entryId, batchIndex).handle((r, ex) -> { if (ex != null) { if (ex instanceof NotFoundException) { log.warn("Exception '{}' occurred while trying to get message.", ex.getMessage()); @@ -981,7 +981,7 @@ public CompletableFuture> getMessageByIdAsync(String topic, long return future; } - private CompletableFuture> getRemoteMessageById(String topic, long ledgerId, long entryId) { + private CompletableFuture> getRemoteMessageById(String topic, long ledgerId, long entryId, int batchIndex) { TopicName topicName = validateTopic(topic); WebTarget path = topicPath(topicName, "ledger", Long.toString(ledgerId), "entry", Long.toString(entryId)); final CompletableFuture> future = new CompletableFuture<>(); @@ -990,7 +990,22 @@ private CompletableFuture> getRemoteMessageById(String topic, lo @Override public void completed(Response response) { try { - future.complete(getMessagesFromHttpResponse(topicName.toString(), response).get(0)); + if (batchIndex == -1) { + future.complete(getMessagesFromHttpResponse(topicName.toString(), response).get(0)); + } else { + List> messages = getMessagesFromHttpResponse(topicName.toString(), response); + for (Message msg : messages) { + MessageImpl message = (MessageImpl) msg; + if (message.getMessageId() instanceof BatchMessageIdImpl) { + BatchMessageIdImpl msgId = (BatchMessageIdImpl) message.getMessageId(); + if (msgId.getBatchIndex() == batchIndex) { + future.complete(msg); + return; + } + } + } + future.complete(null); + } } catch (Exception e) { future.completeExceptionally(getApiException(e)); } @@ -1005,9 +1020,58 @@ public void failed(Throwable throwable) { } @Override - public Message getMessageById(String topic, long ledgerId, long entryId) + public Message getMessageById(String topic, long ledgerId, long entryId, int batchIndex) + throws PulsarAdminException { + return sync(() -> getMessageByIdAsync(topic, ledgerId, entryId, batchIndex)); + } + + @Override + public List> getBatchMessagesById(String topic, long ledgerId, long entryId) throws PulsarAdminException { - return sync(() -> getMessageByIdAsync(topic, ledgerId, entryId)); + return sync(() -> getBatchMessagesByIdAsync(topic, ledgerId, entryId)); + } + + @Override + public CompletableFuture>> getBatchMessagesByIdAsync(String topic, long ledgerId, long entryId) { + CompletableFuture>> future = new CompletableFuture<>(); + getRemoteBatchMessagesById(topic, ledgerId, entryId).handle((r, ex) -> { + if (ex != null) { + if (ex instanceof NotFoundException) { + log.warn("Exception '{}' occurred while trying to get message.", ex.getMessage()); + future.complete(r); + } else { + future.completeExceptionally(ex); + } + return null; + } + + future.complete(r); + return null; + }); + return future; + } + + private CompletableFuture>> getRemoteBatchMessagesById(String topic, long ledgerId, long entryId) { + TopicName topicName = validateTopic(topic); + WebTarget path = topicPath(topicName, "ledger", Long.toString(ledgerId), "entry", Long.toString(entryId)); + final CompletableFuture>> future = new CompletableFuture<>(); + asyncGetRequest(path, + new InvocationCallback() { + @Override + public void completed(Response response) { + try { + future.complete(getMessagesFromHttpResponse(topicName.toString(), response)); + } catch (Exception e) { + future.completeExceptionally(getApiException(e)); + } + } + + @Override + public void failed(Throwable throwable) { + future.completeExceptionally(getApiException(throwable.getCause())); + } + }); + return future; } @Override diff --git a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java index ccae1b1176527..5e82104a785cc 100644 --- a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java +++ b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java @@ -1798,7 +1798,10 @@ public void topics() throws Exception { verify(mockTopics).setMaxMessageSize("persistent://myprop/clust/ns1/ds1", 99); cmdTopics.run(split("get-message-by-id persistent://myprop/clust/ns1/ds1 -l 10 -e 2")); - verify(mockTopics).getMessageById("persistent://myprop/clust/ns1/ds1", 10,2); + verify(mockTopics).getMessageById("persistent://myprop/clust/ns1/ds1", 10,2, -1); + + cmdTopics.run(split("get-batch-messages-by-id persistent://myprop/clust/ns1/ds1 -l 10 -e 2")); + verify(mockTopics).getBatchMessagesById("persistent://myprop/clust/ns1/ds1", 10,2); cmdTopics.run(split("get-dispatch-rate persistent://myprop/clust/ns1/ds1 -ap")); verify(mockTopics).getDispatchRate("persistent://myprop/clust/ns1/ds1", true); diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdPersistentTopics.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdPersistentTopics.java index 6e59be39c018b..1f9f106e6b20f 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdPersistentTopics.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdPersistentTopics.java @@ -79,6 +79,7 @@ public CmdPersistentTopics(Supplier admin) { jcommander.addCommand("delete-partitioned-topic", new DeletePartitionedCmd()); jcommander.addCommand("peek-messages", new PeekMessages()); jcommander.addCommand("get-message-by-id", new GetMessageById()); + jcommander.addCommand("get-batch-messages-by-id", new GetBatchMessagesById()); jcommander.addCommand("last-message-id", new GetLastMessageId()); jcommander.addCommand("reset-cursor", new ResetCursor()); jcommander.addCommand("terminate", new Terminate()); @@ -629,7 +630,8 @@ void run() throws PulsarAdminException { } } - @Parameters(commandDescription = "Get message by its ledgerId and entryId") + @Parameters(commandDescription = "Get message by its ledgerId and entryId, " + + "or get the exactly batch message by its batchIndex") private class GetMessageById extends CliCommand { @Parameter(description = "persistent://property/cluster/namespace/topic", required = true) private java.util.List params; @@ -644,17 +646,57 @@ private class GetMessageById extends CliCommand { required = true) private long entryId; + @Parameter(names = { "-b", "--batchIndex" }, + description = "batch index pointing to the desired message", + required = false) + private int batchIndex = -1; + @Override void run() throws PulsarAdminException { String persistentTopic = validatePersistentTopic(params); - Message message = getPersistentTopics().getMessageById(persistentTopic, ledgerId, entryId); + Message message = getPersistentTopics().getMessageById(persistentTopic, ledgerId, entryId, batchIndex); ByteBuf date = Unpooled.wrappedBuffer(message.getData()); System.out.println(ByteBufUtil.prettyHexDump(date)); } } + @Parameters(commandDescription = "Get the whole batch messages by its ledgerId and entryId") + private class GetBatchMessagesById extends CliCommand { + @Parameter(description = "persistent://property/cluster/namespace/topic", required = true) + private java.util.List params; + + @Parameter(names = { "-l", "--ledgerId" }, + description = "ledger id pointing to the desired ledger", + required = true) + private long ledgerId; + + @Parameter(names = { "-e", "--entryId" }, + description = "entry id pointing to the desired entry", + required = true) + private long entryId; + + @Override + void run() throws PulsarAdminException { + String persistentTopic = validatePersistentTopic(params); + + List> messages = getPersistentTopics().getBatchMessagesById(persistentTopic, ledgerId, entryId); + + if (messages == null || messages.isEmpty()) { + System.out.println("Cannot find any messages based on ledgerId:" + + ledgerId + " entryId:" + entryId); + } else { + for (Message message : messages) { + if (message != null) { + ByteBuf date = Unpooled.wrappedBuffer(message.getData()); + System.out.println(ByteBufUtil.prettyHexDump(date)); + } + } + } + } + } + @Parameters(commandDescription = "Get last message Id of the topic") private class GetLastMessageId extends CliCommand { diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java index 8b11990dc531b..57a606a12b5b7 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java @@ -129,6 +129,7 @@ public CmdTopics(Supplier admin) { jcommander.addCommand("peek-messages", new PeekMessages()); jcommander.addCommand("examine-messages", new ExamineMessages()); jcommander.addCommand("get-message-by-id", new GetMessageById()); + jcommander.addCommand("get-batch-messages-by-id", new GetBatchMessagesById()); jcommander.addCommand("get-message-id", new GetMessageId()); jcommander.addCommand("reset-cursor", new ResetCursor()); jcommander.addCommand("terminate", new Terminate()); @@ -1292,7 +1293,8 @@ void run() throws PulsarAdminException { } } - @Parameters(commandDescription = "Get message by its ledgerId and entryId") + @Parameters(commandDescription = "Get message by its ledgerId and entryId, " + + "or get the exactly batch message by its batchIndex") private class GetMessageById extends CliCommand { @Parameter(description = "persistent://tenant/namespace/topic", required = true) private java.util.List params; @@ -1307,11 +1309,16 @@ private class GetMessageById extends CliCommand { required = true) private long entryId; + @Parameter(names = { "-b", "--batchIndex" }, + description = "batch index pointing to the desired message", + required = false) + private int batchIndex = -1; + @Override void run() throws PulsarAdminException { String persistentTopic = validatePersistentTopic(params); - MessageImpl message = (MessageImpl) getTopics().getMessageById(persistentTopic, ledgerId, entryId); + MessageImpl message = (MessageImpl) getTopics().getMessageById(persistentTopic, ledgerId, entryId, batchIndex); if (message == null) { System.out.println("Cannot find any messages based on ledgerId:" + ledgerId + " entryId:" + entryId); @@ -1354,6 +1361,69 @@ void run() throws PulsarAdminException { } } + @Parameters(commandDescription = "Get the whole batch messages by its ledgerId and entryId") + private class GetBatchMessagesById extends CliCommand { + @Parameter(description = "persistent://tenant/namespace/topic", required = true) + private java.util.List params; + + @Parameter(names = { "-l", "--ledgerId" }, + description = "ledger id pointing to the desired ledger", + required = true) + private long ledgerId; + + @Parameter(names = { "-e", "--entryId" }, + description = "entry id pointing to the desired entry", + required = true) + private long entryId; + + @Override + void run() throws PulsarAdminException { + String persistentTopic = validatePersistentTopic(params); + + List> messages = getTopics().getBatchMessagesById(persistentTopic, ledgerId, entryId); + if (messages == null || messages.isEmpty()) { + System.out.println("Cannot find any messages based on ledgerId:" + + ledgerId + " entryId:" + entryId); + } else { + for (Message msg : messages) { + MessageImpl message = (MessageImpl) msg; + if (message == null) { + continue; + } else if (message.getMessageId() instanceof BatchMessageIdImpl) { + BatchMessageIdImpl msgId = (BatchMessageIdImpl) message.getMessageId(); + System.out.println("Batch Message ID: " + msgId.getLedgerId() + ":" + msgId.getEntryId() + ":" + msgId.getBatchIndex()); + } else { + MessageIdImpl msgId = (MessageIdImpl) message.getMessageId(); + System.out.println("Message ID: " + msgId.getLedgerId() + ":" + msgId.getEntryId()); + } + + System.out.println("Publish time: " + message.getPublishTime()); + System.out.println("Event time: " + message.getEventTime()); + + if (message.getDeliverAtTime() != 0) { + System.out.println("Deliver at time: " + message.getDeliverAtTime()); + } + + if (message.getBrokerEntryMetadata() != null) { + if (message.getBrokerEntryMetadata().hasBrokerTimestamp()) { + System.out.println("Broker entry metadata timestamp: " + message.getBrokerEntryMetadata().getBrokerTimestamp()); + } + if (message.getBrokerEntryMetadata().hasIndex()) { + System.out.println("Broker entry metadata index: " + message.getBrokerEntryMetadata().getIndex()); + } + } + + if (message.getProperties().size() > 0) { + System.out.println("Properties:"); + print(message.getProperties()); + } + ByteBuf date = Unpooled.wrappedBuffer(message.getData()); + System.out.println(ByteBufUtil.prettyHexDump(date)); + } + } + } + } + @Parameters(commandDescription = "Get message ID") private class GetMessageId extends CliCommand { @Parameter(description = "persistent://tenant/namespace/topic", required = true)