Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 23 additions & 8 deletions lib/ClientConnection.cc
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#include <openssl/x509.h>
#include <pulsar/MessageIdBuilder.h>

#include <chrono>
#include <fstream>

#include "AsioDefines.h"
Expand All @@ -31,6 +32,7 @@
#include "ConsumerImpl.h"
#include "ExecutorService.h"
#include "LogUtils.h"
#include "MockServer.h"
#include "OpSendMsg.h"
#include "ProducerImpl.h"
#include "PulsarApi.pb.h"
Expand Down Expand Up @@ -1005,15 +1007,17 @@ Future<Result, BrokerConsumerStatsImpl> ClientConnection::newConsumerStats(uint6
void ClientConnection::newTopicLookup(const std::string& topicName, bool authoritative,
const std::string& listenerName, uint64_t requestId,
const LookupDataResultPromisePtr& promise) {
newLookup(Commands::newLookup(topicName, authoritative, requestId, listenerName), requestId, promise);
newLookup(Commands::newLookup(topicName, authoritative, requestId, listenerName), requestId, "LOOKUP",
promise);
}

void ClientConnection::newPartitionedMetadataLookup(const std::string& topicName, uint64_t requestId,
const LookupDataResultPromisePtr& promise) {
newLookup(Commands::newPartitionMetadataRequest(topicName, requestId), requestId, promise);
newLookup(Commands::newPartitionMetadataRequest(topicName, requestId), requestId, "PARTITIONED_METADATA",
promise);
}

void ClientConnection::newLookup(const SharedBuffer& cmd, uint64_t requestId,
void ClientConnection::newLookup(const SharedBuffer& cmd, uint64_t requestId, const char* requestType,
const LookupDataResultPromisePtr& promise) {
Lock lock(mutex_);
std::shared_ptr<LookupDataResultPtr> lookupDataResult;
Expand Down Expand Up @@ -1042,6 +1046,7 @@ void ClientConnection::newLookup(const SharedBuffer& cmd, uint64_t requestId,
pendingLookupRequests_.insert(std::make_pair(requestId, requestData));
numOfPendingLookupRequest_++;
lock.unlock();
LOG_DEBUG(cnxString_ << "Inserted lookup request " << requestType << " (req_id: " << requestId << ")");
sendCommand(cmd);
}

Expand Down Expand Up @@ -1158,12 +1163,15 @@ void ClientConnection::sendPendingCommands() {
}
}

Future<Result, ResponseData> ClientConnection::sendRequestWithId(const SharedBuffer& cmd, int requestId) {
Future<Result, ResponseData> ClientConnection::sendRequestWithId(const SharedBuffer& cmd, int requestId,
const char* requestType) {
Lock lock(mutex_);

if (isClosed()) {
lock.unlock();
Promise<Result, ResponseData> promise;
LOG_DEBUG(cnxString_ << "Fail " << requestType << "(req_id: " << requestId
<< ") to a closed connection");
promise.setFailed(ResultNotConnected);
return promise.getFuture();
}
Expand All @@ -1182,7 +1190,17 @@ Future<Result, ResponseData> ClientConnection::sendRequestWithId(const SharedBuf
pendingRequests_.insert(std::make_pair(requestId, requestData));
lock.unlock();

sendCommand(cmd);
LOG_DEBUG(cnxString_ << "Inserted request " << requestType << " (req_id: " << requestId << ")");
if (mockingRequests_.load(std::memory_order_acquire)) {
if (mockServer_ == nullptr) {
LOG_WARN(cnxString_ << "Mock server is unexpectedly null when processing " << requestType);
sendCommand(cmd);
} else if (!mockServer_->sendRequest(requestType, requestId)) {
sendCommand(cmd);
}
} else {
sendCommand(cmd);
}
return requestData.promise.getFuture();
}

Expand Down Expand Up @@ -1625,9 +1643,6 @@ void ClientConnection::handleConsumerStatsResponse(

void ClientConnection::handleLookupTopicRespose(
const proto::CommandLookupTopicResponse& lookupTopicResponse) {
LOG_DEBUG(cnxString_ << "Received lookup response from server. req_id: "
<< lookupTopicResponse.request_id());

Lock lock(mutex_);
auto it = pendingLookupRequests_.find(lookupTopicResponse.request_id());
if (it != pendingLookupRequests_.end()) {
Expand Down
22 changes: 20 additions & 2 deletions lib/ClientConnection.h
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ struct ResponseData {

typedef std::shared_ptr<std::vector<std::string>> NamespaceTopicsPtr;

class MockServer;
class PULSAR_PUBLIC ClientConnection : public std::enable_shared_from_this<ClientConnection> {
enum State : uint8_t
{
Expand All @@ -123,6 +124,8 @@ class PULSAR_PUBLIC ClientConnection : public std::enable_shared_from_this<Clien
Ready,
Disconnected
};
using RequestDelayType =
std::unordered_map<std::string /* request type */, long /* delay in milliseconds */>;

public:
typedef std::shared_ptr<ASIO::ip::tcp::socket> SocketPtr;
Expand Down Expand Up @@ -185,7 +188,8 @@ class PULSAR_PUBLIC ClientConnection : public std::enable_shared_from_this<Clien
* Send a request with a specific Id over the connection. The future will be
* triggered when the response for this request is received
*/
Future<Result, ResponseData> sendRequestWithId(const SharedBuffer& cmd, int requestId);
Future<Result, ResponseData> sendRequestWithId(const SharedBuffer& cmd, int requestId,
const char* requestType);

const std::string& brokerAddress() const;

Expand All @@ -208,6 +212,13 @@ class PULSAR_PUBLIC ClientConnection : public std::enable_shared_from_this<Clien
Future<Result, SchemaInfo> newGetSchema(const std::string& topicName, const std::string& version,
uint64_t requestId);

void attachMockServer(const std::shared_ptr<MockServer>& mockServer) {
mockServer_ = mockServer;
// Mark that requests will first go through the mock server, if the mock server cannot process it,
// fall back to the normal logic
mockingRequests_.store(true, std::memory_order_release);
}

private:
struct PendingRequestData {
Promise<Result, ResponseData> promise;
Expand Down Expand Up @@ -264,7 +275,8 @@ class PULSAR_PUBLIC ClientConnection : public std::enable_shared_from_this<Clien
void handleSend(const ASIO_ERROR& err, const SharedBuffer& cmd);
void handleSendPair(const ASIO_ERROR& err);
void sendPendingCommands();
void newLookup(const SharedBuffer& cmd, uint64_t requestId, const LookupDataResultPromisePtr& promise);
void newLookup(const SharedBuffer& cmd, uint64_t requestId, const char* requestType,
const LookupDataResultPromisePtr& promise);

void handleRequestTimeout(const ASIO_ERROR& ec, const PendingRequestData& pendingRequestData);

Expand Down Expand Up @@ -308,6 +320,8 @@ class PULSAR_PUBLIC ClientConnection : public std::enable_shared_from_this<Clien
}
}

void mockSendCommand(const char* requestType, uint64_t requestId, const SharedBuffer& cmd);

std::atomic<State> state_{Pending};
TimeDuration operationsTimeout_;
AuthenticationPtr authentication_;
Expand Down Expand Up @@ -391,6 +405,9 @@ class PULSAR_PUBLIC ClientConnection : public std::enable_shared_from_this<Clien
DeadlineTimerPtr keepAliveTimer_;
DeadlineTimerPtr consumerStatsRequestTimer_;

std::atomic_bool mockingRequests_{false};
std::shared_ptr<MockServer> mockServer_;

void handleConsumerStatsTimeout(const ASIO_ERROR& ec, const std::vector<uint64_t>& consumerStatsRequests);

void startConsumerStatsTimer(std::vector<uint64_t> consumerStatsRequests);
Expand All @@ -405,6 +422,7 @@ class PULSAR_PUBLIC ClientConnection : public std::enable_shared_from_this<Clien

friend class PulsarFriend;
friend class ConsumerTest;
friend class MockServer;

void checkServerError(ServerError error, const std::string& message);

Expand Down
Loading
Loading