Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,12 @@
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertThrows;
import static org.testng.Assert.assertTrue;
import java.lang.reflect.Field;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
Expand Down Expand Up @@ -91,6 +95,7 @@
import org.apache.pulsar.common.policies.data.RetentionPolicies;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.apache.pulsar.common.policies.data.TopicStats;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.metadata.api.MetadataStoreException;
import org.apache.zookeeper.KeeperException;
import org.awaitility.Awaitility;
Expand Down Expand Up @@ -1381,6 +1386,57 @@ public void testGetMessageById() throws Exception {
}
}

@Test
public void testGetMessagesById() throws Exception {
TenantInfoImpl tenantInfo = new TenantInfoImpl(Set.of("role1", "role2"), Set.of("test"));
admin.tenants().createTenant("tenant-xyz", tenantInfo);
admin.namespaces().createNamespace("tenant-xyz/ns-abc", Set.of("test"));
final String topicName1 = "persistent://tenant-xyz/ns-abc/testGetMessagesById1";
final String topicName2 = "persistent://tenant-xyz/ns-abc/testGetMessagesById2";
admin.topics().createNonPartitionedTopic(topicName1);
admin.topics().createNonPartitionedTopic(topicName2);

int batchMessagesMaxMessagesPerBatch = 2;
@Cleanup
Producer<byte[]> producer1 = pulsarClient.newProducer()
.topic(topicName1)
.batchingMaxMessages(batchMessagesMaxMessagesPerBatch)
.batchingMaxPublishDelay(2, TimeUnit.SECONDS)
.enableBatching(true)
.create();

int numMessages = 10;
List<CompletableFuture<MessageId>> messageIds = new ArrayList<>();
for (int i = 0; i < numMessages; i++) {
String s = String.valueOf(i);
messageIds.add(producer1.newMessage().key(s).value(s.getBytes(StandardCharsets.UTF_8)).sendAsync());
}
FutureUtil.waitForAll(messageIds).get();

for (int i = 0; i < numMessages; i++) {
MessageIdImpl id = (MessageIdImpl) messageIds.get(i).get();
long ledgerId = id.getLedgerId();
long entryId = id.getEntryId();
List<Message<byte[]>> messagesById = admin.topics().getMessageById(topicName1, ledgerId, entryId, -1);
assertNotNull(messagesById);
assertEquals(messagesById.size(), batchMessagesMaxMessagesPerBatch);
assertTrue(messagesById.stream().allMatch(n -> {
MessageIdImpl actualMessageId = (MessageIdImpl) n.getMessageId();
return actualMessageId.getLedgerId() == ledgerId && actualMessageId.getEntryId() == entryId;
}));
for (int batchIndex = 0; batchIndex < batchMessagesMaxMessagesPerBatch; batchIndex++) {
messagesById = admin.topics().getMessageById(topicName1, ledgerId, entryId, batchIndex);
assertEquals(messagesById.size(), 1);
BatchMessageIdImpl actualMessageId = (BatchMessageIdImpl) messagesById.get(0).getMessageId();
assertEquals(actualMessageId.getBatchIndex(), batchIndex);
assertEquals(actualMessageId.getLedgerId(), ledgerId);
assertEquals(actualMessageId.getEntryId(), entryId);
}
}

assertThrows(PulsarAdminException.class, () -> admin.topics().getMessageById(topicName2, 2, 1, -1));
}

@Test
public void testGetMessageById4SpecialPropsInMsg() throws Exception {
TenantInfoImpl tenantInfo = new TenantInfoImpl(Set.of("role1", "role2"), Set.of("test"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1679,7 +1679,9 @@ void expireMessagesForAllSubscriptions(String topic, long expireTimeInSeconds)
* @return the message indexed by the messageId
* @throws PulsarAdminException
* Unexpected error
* @deprecated Use {@link #getMessageById(String, long, long, int)} instead.
*/
@Deprecated
Message<byte[]> getMessageById(String topic, long ledgerId, long entryId) throws PulsarAdminException;

/**
Expand All @@ -1691,7 +1693,9 @@ void expireMessagesForAllSubscriptions(String topic, long expireTimeInSeconds)
* @param entryId
* Entry id
* @return a future that can be used to track when the message is returned
* @deprecated Use {@link #getMessageByIdAsync(String, long, long, int)} instead.
*/
@Deprecated
CompletableFuture<Message<byte[]>> getMessageByIdAsync(String topic, long ledgerId, long entryId);

/**
Expand Down Expand Up @@ -4477,4 +4481,29 @@ default void createShadowTopic(String shadowTopic, String sourceTopic) throws Pu
default CompletableFuture<Void> createShadowTopicAsync(String shadowTopic, String sourceTopic) {
return createShadowTopicAsync(shadowTopic, sourceTopic, null);
}

/**
* Get the message by its messageId via a topic subscription asynchronously.
*
* @param topic Topic name
* @param ledgerId Ledger id
* @param entryId Entry id
* @param batchIndex Batch Index, -1 returns all batch message
* @return a future that can be used to track when the message is returned
*/
CompletableFuture<List<Message<byte[]>>> getMessageByIdAsync(String topic, long ledgerId, long entryId,
int batchIndex);

/**
* Get the message by its messageId via a topic subscription.
*
* @param topic Topic name
* @param ledgerId Ledger id
* @param entryId Entry id
* @param batchIndex Batch Index, -1 returns all batch message
* @return the message indexed by the messageId
* @throws PulsarAdminException Unexpected error
*/
List<Message<byte[]>> getMessageById(String topic, long ledgerId, long entryId, int batchIndex)
throws PulsarAdminException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -879,7 +879,7 @@ private CompletableFuture<List<Message<byte[]>>> peekNthMessage(String topic, St
@Override
public void completed(Response response) {
try {
future.complete(getMessagesFromHttpResponse(tn.toString(), response));
future.complete(getMessagesFromHttpResponse(tn.toString(), 0, response));
} catch (Exception e) {
future.completeExceptionally(getApiException(e));
}
Expand Down Expand Up @@ -954,7 +954,7 @@ public CompletableFuture<Message<byte[]>> examineMessageAsync(String topic, Stri
@Override
public void completed(Response response) {
try {
List<Message<byte[]>> messages = getMessagesFromHttpResponse(tn.toString(), response);
List<Message<byte[]>> messages = getMessagesFromHttpResponse(tn.toString(), -1, response);
if (messages.size() > 0) {
future.complete(messages.get(0));
} else {
Expand Down Expand Up @@ -986,48 +986,13 @@ public CompletableFuture<Void> truncateAsync(String topic) {
}

@Override
@Deprecated
public CompletableFuture<Message<byte[]>> getMessageByIdAsync(String topic, long ledgerId, long entryId) {
CompletableFuture<Message<byte[]>> future = new CompletableFuture<>();
getRemoteMessageById(topic, ledgerId, entryId).handle((r, ex) -> {
if (ex != null) {
if (ex instanceof NotFoundException) {
log.warn("Exception '{}' occurred while trying to get message.", ex.getMessage());
future.complete(r);
} else {
future.completeExceptionally(ex);
}
return null;
}
future.complete(r);
return null;
});
return future;
}

private CompletableFuture<Message<byte[]>> getRemoteMessageById(String topic, long ledgerId, long entryId) {
TopicName topicName = validateTopic(topic);
WebTarget path = topicPath(topicName, "ledger", Long.toString(ledgerId), "entry", Long.toString(entryId));
final CompletableFuture<Message<byte[]>> future = new CompletableFuture<>();
asyncGetRequest(path,
new InvocationCallback<Response>() {
@Override
public void completed(Response response) {
try {
future.complete(getMessagesFromHttpResponse(topicName.toString(), response).get(0));
} catch (Exception e) {
future.completeExceptionally(getApiException(e));
}
}

@Override
public void failed(Throwable throwable) {
future.completeExceptionally(getApiException(throwable.getCause()));
}
});
return future;
return getMessageByIdAsync(topic, ledgerId, entryId, 0).thenApply((r) -> r.get(0));
}

@Override
@Deprecated
public Message<byte[]> getMessageById(String topic, long ledgerId, long entryId)
throws PulsarAdminException {
return sync(() -> getMessageByIdAsync(topic, ledgerId, entryId));
Expand Down Expand Up @@ -1257,7 +1222,8 @@ private TopicName validateTopic(String topic) {
return TopicName.get(topic);
}

private List<Message<byte[]>> getMessagesFromHttpResponse(String topic, Response response) throws Exception {
private List<Message<byte[]>> getMessagesFromHttpResponse(String topic, int batchIndex, Response response)
throws Exception {

if (response.getStatus() != Status.OK.getStatusCode()) {
throw getApiException(response);
Expand Down Expand Up @@ -1433,7 +1399,8 @@ private List<Message<byte[]>> getMessagesFromHttpResponse(String topic, Response
}

if (!isEncrypted && response.getHeaderString(BATCH_HEADER) != null) {
return getIndividualMsgsFromBatch(topic, msgId, data, properties, messageMetadata, brokerEntryMetadata);
return getIndividualMsgsFromBatch(topic, msgId, data, properties, messageMetadata, brokerEntryMetadata,
batchIndex);
}

MessageImpl message = new MessageImpl(topic, msgId, properties,
Expand All @@ -1446,12 +1413,20 @@ private List<Message<byte[]>> getMessagesFromHttpResponse(String topic, Response
}

private List<Message<byte[]>> getIndividualMsgsFromBatch(String topic, String msgId, byte[] data,
Map<String, String> properties, MessageMetadata msgMetadataBuilder,
BrokerEntryMetadata brokerEntryMetadata) {
Map<String, String> properties,
MessageMetadata msgMetadataBuilder,
BrokerEntryMetadata brokerEntryMetadata,
int batchIndex) {
List<Message<byte[]>> ret = new ArrayList<>();
int batchSize = Integer.parseInt(properties.get(BATCH_HEADER));
if (batchIndex >= batchSize) {
throw new IllegalArgumentException("batchIndex is greater than batchSize");
}
ByteBuf buf = Unpooled.wrappedBuffer(data);
for (int i = 0; i < batchSize; i++) {
if (batchIndex != -1 && batchIndex != i) {
continue;
}
String batchMsgId = msgId + ":" + i;
SingleMessageMetadata singleMessageMetadata = new SingleMessageMetadata();
try {
Expand Down Expand Up @@ -2782,5 +2757,36 @@ public CompletableFuture<Void> createShadowTopicAsync(String shadowTopic, String
});
}

@Override
public CompletableFuture<List<Message<byte[]>>> getMessageByIdAsync(String topic, long ledgerId, long entryId,
int batchIndex) {
TopicName topicName = validateTopic(topic);
WebTarget path = topicPath(topicName, "ledger", Long.toString(ledgerId), "entry", Long.toString(entryId));
final CompletableFuture<List<Message<byte[]>>> future = new CompletableFuture<>();
asyncGetRequest(path,
new InvocationCallback<Response>() {
@Override
public void completed(Response response) {
try {
future.complete(getMessagesFromHttpResponse(topicName.toString(), batchIndex, response));
} catch (Exception e) {
future.completeExceptionally(getApiException(e));
}
}

@Override
public void failed(Throwable throwable) {
future.completeExceptionally(getApiException(throwable.getCause()));
}
});
return future;
}

@Override
public List<Message<byte[]>> getMessageById(String topic, long ledgerId, long entryId, int batchIndex)
throws PulsarAdminException {
return sync(() -> getMessageByIdAsync(topic, ledgerId, entryId, batchIndex));
}

private static final Logger log = LoggerFactory.getLogger(TopicsImpl.class);
}
Original file line number Diff line number Diff line change
Expand Up @@ -632,10 +632,13 @@ private class GetMessageById extends CliCommand {
void run() throws PulsarAdminException {
String persistentTopic = validatePersistentTopic(params);

Message<byte[]> message = getPersistentTopics().getMessageById(persistentTopic, ledgerId, entryId);
List<Message<byte[]>> messages = getPersistentTopics().getMessageById(persistentTopic, ledgerId, entryId,
-1);

ByteBuf date = Unpooled.wrappedBuffer(message.getData());
System.out.println(ByteBufUtil.prettyHexDump(date));
messages.forEach(message -> {
ByteBuf date = Unpooled.wrappedBuffer(message.getData());
System.out.println(ByteBufUtil.prettyHexDump(date));
});
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1351,45 +1351,49 @@ private class GetMessageById extends CliCommand {
void run() throws PulsarAdminException {
String persistentTopic = validatePersistentTopic(params);

MessageImpl message = (MessageImpl) getTopics().getMessageById(persistentTopic, ledgerId, entryId);
if (message == null) {
List<Message<byte[]>> messages = getTopics()
.getMessageById(persistentTopic, ledgerId, entryId, -1);
if (messages == null || messages.size() == 0) {
System.out.println("Cannot find any messages based on ledgerId:"
+ ledgerId + " entryId:" + entryId);
} else {
if (message.getMessageId() instanceof BatchMessageIdImpl) {
BatchMessageIdImpl msgId = (BatchMessageIdImpl) message.getMessageId();
System.out.println("Batch Message ID: " + msgId.getLedgerId() + ":" + msgId.getEntryId() + ":"
+ msgId.getBatchIndex());
} else {
MessageIdImpl msgId = (MessageIdImpl) message.getMessageId();
System.out.println("Message ID: " + msgId.getLedgerId() + ":" + msgId.getEntryId());
}

System.out.println("Publish time: " + message.getPublishTime());
System.out.println("Event time: " + message.getEventTime());
System.out.println("Redelivery count: " + message.getRedeliveryCount());
messages.forEach(messageInterface -> {
MessageImpl<byte[]> message = (MessageImpl<byte[]>) messageInterface;
if (message.getMessageId() instanceof BatchMessageIdImpl) {
BatchMessageIdImpl msgId = (BatchMessageIdImpl) message.getMessageId();
System.out.println("Batch Message ID: " + msgId.getLedgerId() + ":" + msgId.getEntryId() + ":"
+ msgId.getBatchIndex());
} else {
MessageIdImpl msgId = (MessageIdImpl) message.getMessageId();
System.out.println("Message ID: " + msgId.getLedgerId() + ":" + msgId.getEntryId());
}

if (message.getDeliverAtTime() != 0) {
System.out.println("Deliver at time: " + message.getDeliverAtTime());
}
System.out.println("Publish time: " + message.getPublishTime());
System.out.println("Event time: " + message.getEventTime());
System.out.println("Redelivery count: " + message.getRedeliveryCount());

if (message.getBrokerEntryMetadata() != null) {
if (message.getBrokerEntryMetadata().hasBrokerTimestamp()) {
System.out.println("Broker entry metadata timestamp: "
+ message.getBrokerEntryMetadata().getBrokerTimestamp());
if (message.getDeliverAtTime() != 0) {
System.out.println("Deliver at time: " + message.getDeliverAtTime());
}
if (message.getBrokerEntryMetadata().hasIndex()) {
System.out.println("Broker entry metadata index: "
+ message.getBrokerEntryMetadata().getIndex());

if (message.getBrokerEntryMetadata() != null) {
if (message.getBrokerEntryMetadata().hasBrokerTimestamp()) {
System.out.println("Broker entry metadata timestamp: "
+ message.getBrokerEntryMetadata().getBrokerTimestamp());
}
if (message.getBrokerEntryMetadata().hasIndex()) {
System.out.println("Broker entry metadata index: "
+ message.getBrokerEntryMetadata().getIndex());
}
}
}

if (message.getProperties().size() > 0) {
System.out.println("Properties:");
print(message.getProperties());
}
ByteBuf date = Unpooled.wrappedBuffer(message.getData());
System.out.println(ByteBufUtil.prettyHexDump(date));
if (message.getProperties().size() > 0) {
System.out.println("Properties:");
print(message.getProperties());
}
ByteBuf date = Unpooled.wrappedBuffer(message.getData());
System.out.println(ByteBufUtil.prettyHexDump(date));
});
}
}
}
Expand Down