Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 21 additions & 1 deletion src/Coordination/KeeperServer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,24 @@ std::string checkAndGetSuperdigest(const Poco::Util::AbstractConfiguration & con
return user_and_digest;
}

int getIsUseRocksdbFromConfig(const Poco::Util::AbstractConfiguration & config)
{
if (!config.has("keeper_server.rocksdb_isuse"))
return 0;
return config.getInt("keeper_server.rocksdb_isuse");
}

std::string getRocksdbPathFromConfig(const Poco::Util::AbstractConfiguration & config, bool standalone_keeper)
{
if (config.has("keeper_server.keeper_rocksdb_path"))
return config.getString("keeper_server.keeper_rocksdb_path");

if (standalone_keeper)
return std::filesystem::path{config.getString("path", KEEPER_DEFAULT_PATH)} / "rocksdb";
else
return std::filesystem::path{config.getString("path", DBMS_DEFAULT_PATH)} / "rocksdb";
}

}

KeeperServer::KeeperServer(
Expand All @@ -105,7 +123,9 @@ KeeperServer::KeeperServer(
responses_queue_, snapshots_queue_,
getSnapshotsPathFromConfig(config, standalone_keeper),
coordination_settings,
checkAndGetSuperdigest(config)))
checkAndGetSuperdigest(config),
getRocksdbPathFromConfig(config, standalone_keeper),
getIsUseRocksdbFromConfig(config)))
, state_manager(nuraft::cs_new<KeeperStateManager>(server_id, "keeper_server", config, coordination_settings, standalone_keeper))
, log(&Poco::Logger::get("KeeperServer"))
{
Expand Down
34 changes: 24 additions & 10 deletions src/Coordination/KeeperSnapshotManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ namespace
return "/";
}

void writeNode(const KeeperStorage::Node & node, WriteBuffer & out)
void writeNode(const KeeperStorageNode & node, WriteBuffer & out)
{
writeBinary(node.data, out);

Expand All @@ -78,7 +78,7 @@ namespace
writeBinary(node.seq_num, out);
}

void readNode(KeeperStorage::Node & node, ReadBuffer & in, SnapshotVersion version, ACLMap & acl_map)
void readNode(KeeperStorageNode & node, ReadBuffer & in, SnapshotVersion version, ACLMap & acl_map)
{
readBinary(node.data, in);

Expand Down Expand Up @@ -268,23 +268,34 @@ void KeeperStorageSnapshot::deserialize(SnapshotDeserializationResult & deserial
{
std::string path;
readBinary(path, in);
KeeperStorage::Node node{};
KeeperStorageNode node{};
readNode(node, in, current_version, storage.acl_map);
storage.container.insertOrReplace(path, node);
if (node.stat.ephemeralOwner != 0)
storage.ephemerals[node.stat.ephemeralOwner].insert(path);

current_size++;
}

for (const auto & itr : storage.container)

std::vector<std::string> vecKey;
storage.container.rocksdbIter(vecKey);
for (const auto & itr : vecKey)
{
if (itr.key != "/")
if (itr != "/")
{
auto parent_path = parentPath(itr.key);
storage.container.updateValue(parent_path, [&path = itr.key] (KeeperStorage::Node & value) { value.children.insert(getBaseName(path)); });
auto parent_path = parentPath(itr);
storage.container.updateValue(parent_path, [&path = itr] (KeeperStorageNode & value) { value.children.insert(getBaseName(path)); });
}
}

// for (const auto & itr : storage.container)
// {
// if (itr.key != "/")
// {
// auto parent_path = parentPath(itr.key);
// storage.container.updateValue(parent_path, [&path = itr.key] (KeeperStorageNode & value) { value.children.insert(getBaseName(path)); });
// }
// }

size_t active_sessions_size;
readBinary(active_sessions_size, in);
Expand Down Expand Up @@ -368,12 +379,15 @@ KeeperStorageSnapshot::~KeeperStorageSnapshot()
KeeperSnapshotManager::KeeperSnapshotManager(
const std::string & snapshots_path_, size_t snapshots_to_keep_,
bool compress_snapshots_zstd_,
const std::string & superdigest_, size_t storage_tick_time_)
const std::string & superdigest_, size_t storage_tick_time_,
const std::string & rocksdbpath_, int isuse_rocksdb_)
: snapshots_path(snapshots_path_)
, snapshots_to_keep(snapshots_to_keep_)
, compress_snapshots_zstd(compress_snapshots_zstd_)
, superdigest(superdigest_)
, storage_tick_time(storage_tick_time_)
, rocksdbpath(rocksdbpath_)
, isuse_rocksdb(isuse_rocksdb_)
{
namespace fs = std::filesystem;

Expand Down Expand Up @@ -496,7 +510,7 @@ SnapshotDeserializationResult KeeperSnapshotManager::deserializeSnapshotFromBuff
compressed_reader = std::make_unique<CompressedReadBuffer>(*reader);

SnapshotDeserializationResult result;
result.storage = std::make_unique<KeeperStorage>(storage_tick_time, superdigest);
result.storage = std::make_unique<KeeperStorage>(storage_tick_time, superdigest, rocksdbpath, isuse_rocksdb);
KeeperStorageSnapshot::deserialize(result, *compressed_reader);
return result;
}
Expand Down
5 changes: 4 additions & 1 deletion src/Coordination/KeeperSnapshotManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,8 @@ class KeeperSnapshotManager
public:
KeeperSnapshotManager(
const std::string & snapshots_path_, size_t snapshots_to_keep_,
bool compress_snapshots_zstd_ = true, const std::string & superdigest_ = "", size_t storage_tick_time_ = 500);
bool compress_snapshots_zstd_ = true, const std::string & superdigest_ = "", size_t storage_tick_time_ = 500,
const std::string & rocksdbpath_ = "", int isuse_rocksdb_ = 0);

/// Restore storage from latest available snapshot
SnapshotDeserializationResult restoreFromLatestSnapshot();
Expand Down Expand Up @@ -143,6 +144,8 @@ class KeeperSnapshotManager
const std::string superdigest;
/// Storage sessions timeout check interval (also for deserializatopn)
size_t storage_tick_time;
const std::string rocksdbpath;
int isuse_rocksdb;
};

/// Keeper create snapshots in background thread. KeeperStateMachine just create
Expand Down
12 changes: 8 additions & 4 deletions src/Coordination/KeeperStateMachine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,24 +45,28 @@ KeeperStateMachine::KeeperStateMachine(
SnapshotsQueue & snapshots_queue_,
const std::string & snapshots_path_,
const CoordinationSettingsPtr & coordination_settings_,
const std::string & superdigest_)
const std::string & superdigest_,
const std::string & rocksdbpath_,
int isuse_rocksdb_)
: coordination_settings(coordination_settings_)
, snapshot_manager(
snapshots_path_, coordination_settings->snapshots_to_keep,
coordination_settings->compress_snapshots_with_zstd_format, superdigest_,
coordination_settings->dead_session_check_period_ms.totalMicroseconds())
coordination_settings->dead_session_check_period_ms.totalMicroseconds(), rocksdbpath_, isuse_rocksdb_)
, responses_queue(responses_queue_)
, snapshots_queue(snapshots_queue_)
, last_committed_idx(0)
, log(&Poco::Logger::get("KeeperStateMachine"))
, superdigest(superdigest_)
, rocksdbpath(rocksdbpath_)
, isuse_rocksdb(isuse_rocksdb_)
{
}

void KeeperStateMachine::init()
{
/// Do everything without mutexes, no other threads exist.
LOG_DEBUG(log, "Totally have {} snapshots", snapshot_manager.totalSnapshots());
LOG_DEBUG(log, "Totally have {} snapshots.rocksdbpath{} .isuse rocksdb={}", snapshot_manager.totalSnapshots(), rocksdbpath, isuse_rocksdb);
bool loaded = false;
bool has_snapshots = snapshot_manager.totalSnapshots() != 0;
/// Deserialize latest snapshot from disk
Expand Down Expand Up @@ -102,7 +106,7 @@ void KeeperStateMachine::init()
}

if (!storage)
storage = std::make_unique<KeeperStorage>(coordination_settings->dead_session_check_period_ms.totalMilliseconds(), superdigest);
storage = std::make_unique<KeeperStorage>(coordination_settings->dead_session_check_period_ms.totalMilliseconds(), superdigest, rocksdbpath, isuse_rocksdb);
}

nuraft::ptr<nuraft::buffer> KeeperStateMachine::commit(const uint64_t log_idx, nuraft::buffer & data)
Expand Down
4 changes: 3 additions & 1 deletion src/Coordination/KeeperStateMachine.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ class KeeperStateMachine : public nuraft::state_machine
KeeperStateMachine(
ResponsesQueue & responses_queue_, SnapshotsQueue & snapshots_queue_,
const std::string & snapshots_path_, const CoordinationSettingsPtr & coordination_settings_,
const std::string & superdigest_ = "");
const std::string & superdigest_ = "", const std::string & rocksdbpath_ = "", int isuse_rocksdb_ = 0);

/// Read state from the latest snapshot
void init();
Expand Down Expand Up @@ -125,6 +125,8 @@ class KeeperStateMachine : public nuraft::state_machine

/// Special part of ACL system -- superdigest specified in server config.
const std::string superdigest;
const std::string rocksdbpath;
int isuse_rocksdb;
};

}
Loading