diff --git a/src/Coordination/KeeperServer.cpp b/src/Coordination/KeeperServer.cpp index f4787a66d7d4..b2232706aa8a 100644 --- a/src/Coordination/KeeperServer.cpp +++ b/src/Coordination/KeeperServer.cpp @@ -90,6 +90,24 @@ std::string checkAndGetSuperdigest(const Poco::Util::AbstractConfiguration & con return user_and_digest; } +int getIsUseRocksdbFromConfig(const Poco::Util::AbstractConfiguration & config) +{ + if (!config.has("keeper_server.rocksdb_isuse")) + return 0; + return config.getInt("keeper_server.rocksdb_isuse"); +} + +std::string getRocksdbPathFromConfig(const Poco::Util::AbstractConfiguration & config, bool standalone_keeper) +{ + if (config.has("keeper_server.keeper_rocksdb_path")) + return config.getString("keeper_server.keeper_rocksdb_path"); + + if (standalone_keeper) + return std::filesystem::path{config.getString("path", KEEPER_DEFAULT_PATH)} / "rocksdb"; + else + return std::filesystem::path{config.getString("path", DBMS_DEFAULT_PATH)} / "rocksdb"; +} + } KeeperServer::KeeperServer( @@ -105,7 +123,9 @@ KeeperServer::KeeperServer( responses_queue_, snapshots_queue_, getSnapshotsPathFromConfig(config, standalone_keeper), coordination_settings, - checkAndGetSuperdigest(config))) + checkAndGetSuperdigest(config), + getRocksdbPathFromConfig(config, standalone_keeper), + getIsUseRocksdbFromConfig(config))) , state_manager(nuraft::cs_new(server_id, "keeper_server", config, coordination_settings, standalone_keeper)) , log(&Poco::Logger::get("KeeperServer")) { diff --git a/src/Coordination/KeeperSnapshotManager.cpp b/src/Coordination/KeeperSnapshotManager.cpp index be6d4db42190..cf01e0d50237 100644 --- a/src/Coordination/KeeperSnapshotManager.cpp +++ b/src/Coordination/KeeperSnapshotManager.cpp @@ -55,7 +55,7 @@ namespace return "/"; } - void writeNode(const KeeperStorage::Node & node, WriteBuffer & out) + void writeNode(const KeeperStorageNode & node, WriteBuffer & out) { writeBinary(node.data, out); @@ -78,7 +78,7 @@ namespace writeBinary(node.seq_num, 
out); } - void readNode(KeeperStorage::Node & node, ReadBuffer & in, SnapshotVersion version, ACLMap & acl_map) + void readNode(KeeperStorageNode & node, ReadBuffer & in, SnapshotVersion version, ACLMap & acl_map) { readBinary(node.data, in); @@ -268,7 +268,7 @@ void KeeperStorageSnapshot::deserialize(SnapshotDeserializationResult & deserial { std::string path; readBinary(path, in); - KeeperStorage::Node node{}; + KeeperStorageNode node{}; readNode(node, in, current_version, storage.acl_map); storage.container.insertOrReplace(path, node); if (node.stat.ephemeralOwner != 0) @@ -276,15 +276,26 @@ void KeeperStorageSnapshot::deserialize(SnapshotDeserializationResult & deserial current_size++; } - - for (const auto & itr : storage.container) + + std::vector vecKey; + storage.container.rocksdbIter(vecKey); + for (const auto & itr : vecKey) { - if (itr.key != "/") + if (itr != "/") { - auto parent_path = parentPath(itr.key); - storage.container.updateValue(parent_path, [&path = itr.key] (KeeperStorage::Node & value) { value.children.insert(getBaseName(path)); }); + auto parent_path = parentPath(itr); + storage.container.updateValue(parent_path, [&path = itr] (KeeperStorageNode & value) { value.children.insert(getBaseName(path)); }); } } + + // for (const auto & itr : storage.container) + // { + // if (itr.key != "/") + // { + // auto parent_path = parentPath(itr.key); + // storage.container.updateValue(parent_path, [&path = itr.key] (KeeperStorageNode & value) { value.children.insert(getBaseName(path)); }); + // } + // } size_t active_sessions_size; readBinary(active_sessions_size, in); @@ -368,12 +379,15 @@ KeeperStorageSnapshot::~KeeperStorageSnapshot() KeeperSnapshotManager::KeeperSnapshotManager( const std::string & snapshots_path_, size_t snapshots_to_keep_, bool compress_snapshots_zstd_, - const std::string & superdigest_, size_t storage_tick_time_) + const std::string & superdigest_, size_t storage_tick_time_, + const std::string & rocksdbpath_, int 
isuse_rocksdb_) : snapshots_path(snapshots_path_) , snapshots_to_keep(snapshots_to_keep_) , compress_snapshots_zstd(compress_snapshots_zstd_) , superdigest(superdigest_) , storage_tick_time(storage_tick_time_) + , rocksdbpath(rocksdbpath_) + , isuse_rocksdb(isuse_rocksdb_) { namespace fs = std::filesystem; @@ -496,7 +510,7 @@ SnapshotDeserializationResult KeeperSnapshotManager::deserializeSnapshotFromBuff compressed_reader = std::make_unique(*reader); SnapshotDeserializationResult result; - result.storage = std::make_unique(storage_tick_time, superdigest); + result.storage = std::make_unique(storage_tick_time, superdigest, rocksdbpath, isuse_rocksdb); KeeperStorageSnapshot::deserialize(result, *compressed_reader); return result; } diff --git a/src/Coordination/KeeperSnapshotManager.h b/src/Coordination/KeeperSnapshotManager.h index 2889ec493df0..c70049563882 100644 --- a/src/Coordination/KeeperSnapshotManager.h +++ b/src/Coordination/KeeperSnapshotManager.h @@ -89,7 +89,8 @@ class KeeperSnapshotManager public: KeeperSnapshotManager( const std::string & snapshots_path_, size_t snapshots_to_keep_, - bool compress_snapshots_zstd_ = true, const std::string & superdigest_ = "", size_t storage_tick_time_ = 500); + bool compress_snapshots_zstd_ = true, const std::string & superdigest_ = "", size_t storage_tick_time_ = 500, + const std::string & rocksdbpath_ = "", int isuse_rocksdb_ = 0); /// Restore storage from latest available snapshot SnapshotDeserializationResult restoreFromLatestSnapshot(); @@ -143,6 +144,8 @@ class KeeperSnapshotManager const std::string superdigest; /// Storage sessions timeout check interval (also for deserializatopn) size_t storage_tick_time; + const std::string rocksdbpath; + int isuse_rocksdb; }; /// Keeper create snapshots in background thread. 
KeeperStateMachine just create diff --git a/src/Coordination/KeeperStateMachine.cpp b/src/Coordination/KeeperStateMachine.cpp index 62820b417311..1607ba661896 100644 --- a/src/Coordination/KeeperStateMachine.cpp +++ b/src/Coordination/KeeperStateMachine.cpp @@ -45,24 +45,28 @@ KeeperStateMachine::KeeperStateMachine( SnapshotsQueue & snapshots_queue_, const std::string & snapshots_path_, const CoordinationSettingsPtr & coordination_settings_, - const std::string & superdigest_) + const std::string & superdigest_, + const std::string & rocksdbpath_, + int isuse_rocksdb_) : coordination_settings(coordination_settings_) , snapshot_manager( snapshots_path_, coordination_settings->snapshots_to_keep, coordination_settings->compress_snapshots_with_zstd_format, superdigest_, - coordination_settings->dead_session_check_period_ms.totalMicroseconds()) + coordination_settings->dead_session_check_period_ms.totalMicroseconds(), rocksdbpath_, isuse_rocksdb_) , responses_queue(responses_queue_) , snapshots_queue(snapshots_queue_) , last_committed_idx(0) , log(&Poco::Logger::get("KeeperStateMachine")) , superdigest(superdigest_) + , rocksdbpath(rocksdbpath_) + , isuse_rocksdb(isuse_rocksdb_) { } void KeeperStateMachine::init() { /// Do everything without mutexes, no other threads exist. 
- LOG_DEBUG(log, "Totally have {} snapshots", snapshot_manager.totalSnapshots()); + LOG_DEBUG(log, "Totally have {} snapshots.rocksdbpath{} .isuse rocksdb={}", snapshot_manager.totalSnapshots(), rocksdbpath, isuse_rocksdb); bool loaded = false; bool has_snapshots = snapshot_manager.totalSnapshots() != 0; /// Deserialize latest snapshot from disk @@ -102,7 +106,7 @@ void KeeperStateMachine::init() } if (!storage) - storage = std::make_unique(coordination_settings->dead_session_check_period_ms.totalMilliseconds(), superdigest); + storage = std::make_unique(coordination_settings->dead_session_check_period_ms.totalMilliseconds(), superdigest, rocksdbpath, isuse_rocksdb); } nuraft::ptr KeeperStateMachine::commit(const uint64_t log_idx, nuraft::buffer & data) diff --git a/src/Coordination/KeeperStateMachine.h b/src/Coordination/KeeperStateMachine.h index 5af3dff73b5e..9a6e78b5b1a7 100644 --- a/src/Coordination/KeeperStateMachine.h +++ b/src/Coordination/KeeperStateMachine.h @@ -22,7 +22,7 @@ class KeeperStateMachine : public nuraft::state_machine KeeperStateMachine( ResponsesQueue & responses_queue_, SnapshotsQueue & snapshots_queue_, const std::string & snapshots_path_, const CoordinationSettingsPtr & coordination_settings_, - const std::string & superdigest_ = ""); + const std::string & superdigest_ = "", const std::string & rocksdbpath_ = "", int isuse_rocksdb_ = 0); /// Read state from the latest snapshot void init(); @@ -125,6 +125,8 @@ class KeeperStateMachine : public nuraft::state_machine /// Special part of ACL system -- superdigest specified in server config. 
const std::string superdigest; + const std::string rocksdbpath; + int isuse_rocksdb; }; } diff --git a/src/Coordination/KeeperStorage.cpp b/src/Coordination/KeeperStorage.cpp index b6938da958b5..650482f547c6 100644 --- a/src/Coordination/KeeperStorage.cpp +++ b/src/Coordination/KeeperStorage.cpp @@ -188,11 +188,14 @@ static KeeperStorage::ResponsesForSessions processWatchesImpl(const String & pat return result; } -KeeperStorage::KeeperStorage(int64_t tick_time_ms, const String & superdigest_) +KeeperStorage::KeeperStorage(int64_t tick_time_ms, const String & superdigest_, const std::string & rocksdbpath_, int isuse_rocksdb_) : session_expiry_queue(tick_time_ms) , superdigest(superdigest_) + , rocksdbpath(rocksdbpath_) + , isuse_rocksdb(isuse_rocksdb_) { - container.insert("/", Node()); + container.rocksdbOpen(rocksdbpath_, isuse_rocksdb_); + container.insert("/", KeeperStorageNode()); } using Undo = std::function; @@ -246,11 +249,11 @@ struct KeeperStorageCreateRequestProcessor final : public KeeperStorageRequestPr auto & container = storage.container; auto parent_path = parentPath(zk_request->getPath()); - auto it = container.find(parent_path); - if (it == container.end()) + auto it = container.find(parent_path, false); + if (!it.active_in_map) return true; - const auto & node_acls = storage.acl_map.convertNumber(it->value.acl_id); + const auto & node_acls = storage.acl_map.convertNumber(it.value.acl_id); if (node_acls.empty()) return true; @@ -267,16 +270,15 @@ struct KeeperStorageCreateRequestProcessor final : public KeeperStorageRequestPr Undo undo; Coordination::ZooKeeperCreateResponse & response = dynamic_cast(*response_ptr); Coordination::ZooKeeperCreateRequest & request = dynamic_cast(*zk_request); - + auto parent_path = parentPath(request.path); auto it = container.find(parent_path); - - if (it == container.end()) + if (!it.active_in_map) { response.error = Coordination::Error::ZNONODE; return { response_ptr, undo }; } - else if 
(it->value.stat.ephemeralOwner != 0) + else if (it.value.stat.ephemeralOwner != 0) { response.error = Coordination::Error::ZNOCHILDRENFOREPHEMERALS; return { response_ptr, undo }; @@ -284,7 +286,7 @@ struct KeeperStorageCreateRequestProcessor final : public KeeperStorageRequestPr std::string path_created = request.path; if (request.is_sequential) { - auto seq_num = it->value.seq_num; + auto seq_num = it.value.seq_num; std::stringstream seq_num_str; // STYLE_CHECK_ALLOW_STD_STRING_STREAM seq_num_str.exceptions(std::ios::failbit); @@ -306,7 +308,7 @@ struct KeeperStorageCreateRequestProcessor final : public KeeperStorageRequestPr auto & session_auth_ids = storage.session_and_auth[session_id]; - KeeperStorage::Node created_node; + KeeperStorageNode created_node; Coordination::ACLs node_acls; if (!fixupACL(request.acls, session_auth_ids, node_acls, !request.restored_from_zookeeper_log)) @@ -334,7 +336,7 @@ struct KeeperStorageCreateRequestProcessor final : public KeeperStorageRequestPr int64_t prev_parent_zxid; int32_t prev_parent_cversion; container.updateValue(parent_path, [child_path, zxid, &prev_parent_zxid, - parent_cversion, &prev_parent_cversion] (KeeperStorage::Node & parent) + parent_cversion, &prev_parent_cversion] (KeeperStorageNode & parent) { parent.children.insert(child_path); @@ -368,7 +370,7 @@ struct KeeperStorageCreateRequestProcessor final : public KeeperStorageRequestPr if (is_ephemeral) storage.ephemerals[session_id].erase(path_created); - storage.container.updateValue(parent_path, [child_path, prev_parent_zxid, prev_parent_cversion] (KeeperStorage::Node & undo_parent) + storage.container.updateValue(parent_path, [child_path, prev_parent_zxid, prev_parent_cversion] (KeeperStorageNode & undo_parent) { --undo_parent.stat.numChildren; --undo_parent.seq_num; @@ -389,11 +391,11 @@ struct KeeperStorageGetRequestProcessor final : public KeeperStorageRequestProce bool checkAuth(KeeperStorage & storage, int64_t session_id) const override { auto & container 
= storage.container; - auto it = container.find(zk_request->getPath()); - if (it == container.end()) + auto it = container.find(zk_request->getPath(), false); + if (!it.active_in_map) return true; - const auto & node_acls = storage.acl_map.convertNumber(it->value.acl_id); + const auto & node_acls = storage.acl_map.convertNumber(it.value.acl_id); if (node_acls.empty()) return true; @@ -410,14 +412,14 @@ struct KeeperStorageGetRequestProcessor final : public KeeperStorageRequestProce Coordination::ZooKeeperGetRequest & request = dynamic_cast(*zk_request); auto it = container.find(request.path); - if (it == container.end()) + if (!it.active_in_map) { response.error = Coordination::Error::ZNONODE; } else { - response.stat = it->value.stat; - response.data = it->value.data; + response.stat = it.value.stat; + response.data = it.value.data; response.error = Coordination::Error::ZOK; } @@ -432,9 +434,9 @@ namespace { auto parent_path = parentPath(child_path); auto parent_it = container.find(parent_path); - if (parent_it != container.end()) + if (parent_it.active_in_map) { - container.updateValue(parent_path, [zxid](KeeperStorage::Node & parent) + container.updateValue(parent_path, [zxid](KeeperStorageNode & parent) { if (parent.stat.pzxid < zxid) parent.stat.pzxid = zxid; @@ -448,11 +450,11 @@ struct KeeperStorageRemoveRequestProcessor final : public KeeperStorageRequestPr bool checkAuth(KeeperStorage & storage, int64_t session_id) const override { auto & container = storage.container; - auto it = container.find(parentPath(zk_request->getPath())); - if (it == container.end()) + auto it = container.find(parentPath(zk_request->getPath()), false); + if (!it.active_in_map) return true; - const auto & node_acls = storage.acl_map.convertNumber(it->value.acl_id); + const auto & node_acls = storage.acl_map.convertNumber(it.value.acl_id); if (node_acls.empty()) return true; @@ -472,17 +474,17 @@ struct KeeperStorageRemoveRequestProcessor final : public KeeperStorageRequestPr Undo 
undo; auto it = container.find(request.path); - if (it == container.end()) + if (!it.active_in_map) { if (request.restored_from_zookeeper_log) updateParentPzxid(request.path, zxid, container); response.error = Coordination::Error::ZNONODE; } - else if (request.version != -1 && request.version != it->value.stat.version) + else if (request.version != -1 && request.version != it.value.stat.version) { response.error = Coordination::Error::ZBADVERSION; } - else if (it->value.stat.numChildren) + else if (it.value.stat.numChildren) { response.error = Coordination::Error::ZNOTEMPTY; } @@ -491,7 +493,7 @@ struct KeeperStorageRemoveRequestProcessor final : public KeeperStorageRequestPr if (request.restored_from_zookeeper_log) updateParentPzxid(request.path, zxid, container); - auto prev_node = it->value; + auto prev_node = it.value; if (prev_node.stat.ephemeralOwner != 0) { auto ephemerals_it = ephemerals.find(prev_node.stat.ephemeralOwner); @@ -502,8 +504,8 @@ struct KeeperStorageRemoveRequestProcessor final : public KeeperStorageRequestPr storage.acl_map.removeUsage(prev_node.acl_id); - auto child_basename = getBaseName(it->key); - container.updateValue(parentPath(request.path), [&child_basename] (KeeperStorage::Node & parent) + auto child_basename = getBaseName(it.key); + container.updateValue(parentPath(request.path), [&child_basename] (KeeperStorageNode & parent) { --parent.stat.numChildren; ++parent.stat.cversion; @@ -522,7 +524,7 @@ struct KeeperStorageRemoveRequestProcessor final : public KeeperStorageRequestPr storage.acl_map.addUsage(prev_node.acl_id); storage.container.insert(path, prev_node); - storage.container.updateValue(parentPath(path), [&child_basename] (KeeperStorage::Node & parent) + storage.container.updateValue(parentPath(path), [&child_basename] (KeeperStorageNode & parent) { ++parent.stat.numChildren; --parent.stat.cversion; @@ -552,9 +554,9 @@ struct KeeperStorageExistsRequestProcessor final : public KeeperStorageRequestPr 
Coordination::ZooKeeperExistsRequest & request = dynamic_cast(*zk_request); auto it = container.find(request.path); - if (it != container.end()) + if (it.active_in_map) { - response.stat = it->value.stat; + response.stat = it.value.stat; response.error = Coordination::Error::ZOK; } else @@ -571,11 +573,11 @@ struct KeeperStorageSetRequestProcessor final : public KeeperStorageRequestProce bool checkAuth(KeeperStorage & storage, int64_t session_id) const override { auto & container = storage.container; - auto it = container.find(zk_request->getPath()); - if (it == container.end()) + auto it = container.find(zk_request->getPath(), false); + if (!it.active_in_map) return true; - const auto & node_acls = storage.acl_map.convertNumber(it->value.acl_id); + const auto & node_acls = storage.acl_map.convertNumber(it.value.acl_id); if (node_acls.empty()) return true; @@ -594,16 +596,16 @@ struct KeeperStorageSetRequestProcessor final : public KeeperStorageRequestProce Undo undo; auto it = container.find(request.path); - if (it == container.end()) + if (!it.active_in_map) { response.error = Coordination::Error::ZNONODE; } - else if (request.version == -1 || request.version == it->value.stat.version) + else if (request.version == -1 || request.version == it.value.stat.version) { - auto prev_node = it->value; + auto prev_node = it.value; - auto itr = container.updateValue(request.path, [zxid, request] (KeeperStorage::Node & value) + auto itr = container.updateValue(request.path, [zxid, request] (KeeperStorageNode & value) { value.data = request.data; value.stat.version++; @@ -613,18 +615,18 @@ struct KeeperStorageSetRequestProcessor final : public KeeperStorageRequestProce value.data = request.data; }); - container.updateValue(parentPath(request.path), [] (KeeperStorage::Node & parent) + container.updateValue(parentPath(request.path), [] (KeeperStorageNode & parent) { parent.stat.cversion++; }); - response.stat = itr->value.stat; + response.stat = itr.stat; response.error = 
Coordination::Error::ZOK; undo = [prev_node, &container, path = request.path] { - container.updateValue(path, [&prev_node] (KeeperStorage::Node & value) { value = prev_node; }); - container.updateValue(parentPath(path), [] (KeeperStorage::Node & parent) + container.updateValue(path, [&prev_node] (KeeperStorageNode & value) { value = prev_node; }); + container.updateValue(parentPath(path), [] (KeeperStorageNode & parent) { parent.stat.cversion--; }); @@ -649,11 +651,11 @@ struct KeeperStorageListRequestProcessor final : public KeeperStorageRequestProc bool checkAuth(KeeperStorage & storage, int64_t session_id) const override { auto & container = storage.container; - auto it = container.find(zk_request->getPath()); - if (it == container.end()) + auto it = container.find(zk_request->getPath(), false); + if (!it.active_in_map) return true; - const auto & node_acls = storage.acl_map.convertNumber(it->value.acl_id); + const auto & node_acls = storage.acl_map.convertNumber(it.value.acl_id); if (node_acls.empty()) return true; @@ -669,7 +671,7 @@ struct KeeperStorageListRequestProcessor final : public KeeperStorageRequestProc Coordination::ZooKeeperListResponse & response = dynamic_cast(*response_ptr); Coordination::ZooKeeperListRequest & request = dynamic_cast(*zk_request); auto it = container.find(request.path); - if (it == container.end()) + if (!it.active_in_map) { response.error = Coordination::Error::ZNONODE; } @@ -679,9 +681,8 @@ struct KeeperStorageListRequestProcessor final : public KeeperStorageRequestProc if (path_prefix.empty()) throw DB::Exception("Logical error: path cannot be empty", ErrorCodes::LOGICAL_ERROR); - response.names.insert(response.names.end(), it->value.children.begin(), it->value.children.end()); - - response.stat = it->value.stat; + response.names.insert(response.names.end(), it.value.children.begin(), it.value.children.end()); + response.stat = it.value.stat; response.error = Coordination::Error::ZOK; } @@ -694,11 +695,11 @@ struct 
KeeperStorageCheckRequestProcessor final : public KeeperStorageRequestPro bool checkAuth(KeeperStorage & storage, int64_t session_id) const override { auto & container = storage.container; - auto it = container.find(zk_request->getPath()); - if (it == container.end()) + auto it = container.find(zk_request->getPath(), false); + if (!it.active_in_map) return true; - const auto & node_acls = storage.acl_map.convertNumber(it->value.acl_id); + const auto & node_acls = storage.acl_map.convertNumber(it.value.acl_id); if (node_acls.empty()) return true; @@ -715,11 +716,11 @@ struct KeeperStorageCheckRequestProcessor final : public KeeperStorageRequestPro Coordination::ZooKeeperCheckResponse & response = dynamic_cast(*response_ptr); Coordination::ZooKeeperCheckRequest & request = dynamic_cast(*zk_request); auto it = container.find(request.path); - if (it == container.end()) + if (!it.active_in_map) { response.error = Coordination::Error::ZNONODE; } - else if (request.version != -1 && request.version != it->value.stat.version) + else if (request.version != -1 && request.version != it.value.stat.version) { response.error = Coordination::Error::ZBADVERSION; } @@ -738,11 +739,11 @@ struct KeeperStorageSetACLRequestProcessor final : public KeeperStorageRequestPr bool checkAuth(KeeperStorage & storage, int64_t session_id) const override { auto & container = storage.container; - auto it = container.find(zk_request->getPath()); - if (it == container.end()) + auto it = container.find(zk_request->getPath(), false); + if (!it.active_in_map) return true; - const auto & node_acls = storage.acl_map.convertNumber(it->value.acl_id); + const auto & node_acls = storage.acl_map.convertNumber(it.value.acl_id); if (node_acls.empty()) return true; @@ -760,11 +761,11 @@ struct KeeperStorageSetACLRequestProcessor final : public KeeperStorageRequestPr Coordination::ZooKeeperSetACLResponse & response = dynamic_cast(*response_ptr); Coordination::ZooKeeperSetACLRequest & request = 
dynamic_cast(*zk_request); auto it = container.find(request.path); - if (it == container.end()) + if (!it.active_in_map) { response.error = Coordination::Error::ZNONODE; } - else if (request.version != -1 && request.version != it->value.stat.aversion) + else if (request.version != -1 && request.version != it.value.stat.aversion) { response.error = Coordination::Error::ZBADVERSION; } @@ -782,13 +783,13 @@ struct KeeperStorageSetACLRequestProcessor final : public KeeperStorageRequestPr uint64_t acl_id = storage.acl_map.convertACLs(node_acls); storage.acl_map.addUsage(acl_id); - storage.container.updateValue(request.path, [acl_id] (KeeperStorage::Node & node) + storage.container.updateValue(request.path, [acl_id] (KeeperStorageNode & node) { node.acl_id = acl_id; ++node.stat.aversion; }); - response.stat = it->value.stat; + response.stat = it.value.stat; response.error = Coordination::Error::ZOK; } @@ -802,11 +803,11 @@ struct KeeperStorageGetACLRequestProcessor final : public KeeperStorageRequestPr bool checkAuth(KeeperStorage & storage, int64_t session_id) const override { auto & container = storage.container; - auto it = container.find(zk_request->getPath()); - if (it == container.end()) + auto it = container.find(zk_request->getPath(), false); + if (!it.active_in_map) return true; - const auto & node_acls = storage.acl_map.convertNumber(it->value.acl_id); + const auto & node_acls = storage.acl_map.convertNumber(it.value.acl_id); if (node_acls.empty()) return true; @@ -823,14 +824,14 @@ struct KeeperStorageGetACLRequestProcessor final : public KeeperStorageRequestPr Coordination::ZooKeeperGetACLRequest & request = dynamic_cast(*zk_request); auto & container = storage.container; auto it = container.find(request.path); - if (it == container.end()) + if (!it.active_in_map) { response.error = Coordination::Error::ZNONODE; } else { - response.stat = it->value.stat; - response.acl = storage.acl_map.convertNumber(it->value.acl_id); + response.stat = it.value.stat; + 
response.acl = storage.acl_map.convertNumber(it.value.acl_id); } return {response_ptr, {}}; @@ -1005,6 +1006,7 @@ void KeeperStorage::finalize() list_watches.clear(); sessions_and_watchers.clear(); session_expiry_queue.clear(); + container.rocksdbClose(); } @@ -1089,7 +1091,7 @@ KeeperStorage::ResponsesForSessions KeeperStorage::processRequest(const Coordina for (const auto & ephemeral_path : it->second) { container.erase(ephemeral_path); - container.updateValue(parentPath(ephemeral_path), [&ephemeral_path] (KeeperStorage::Node & parent) + container.updateValue(parentPath(ephemeral_path), [&ephemeral_path] (KeeperStorageNode & parent) { --parent.stat.numChildren; ++parent.stat.cversion; diff --git a/src/Coordination/KeeperStorage.h b/src/Coordination/KeeperStorage.h index bc9a81bc4841..3b91545c37f7 100644 --- a/src/Coordination/KeeperStorage.h +++ b/src/Coordination/KeeperStorage.h @@ -7,6 +7,7 @@ #include #include #include +#include #include #include #include @@ -22,6 +23,7 @@ using ChildrenSet = std::unordered_set; using SessionAndTimeout = std::unordered_map; struct KeeperStorageSnapshot; +struct KeeperStorageNode; /// Keeper state machine almost equal to the ZooKeeper's state machine. /// Implements all logic of operations, data changes, sessions allocation. 
@@ -31,16 +33,6 @@ class KeeperStorage public: int64_t session_id_counter{1}; - struct Node - { - String data; - uint64_t acl_id = 0; /// 0 -- no ACL by default - bool is_sequental = false; - Coordination::Stat stat{}; - int32_t seq_num = 0; - ChildrenSet children{}; - }; - struct ResponseForSession { int64_t session_id; @@ -68,7 +60,7 @@ class KeeperStorage using RequestsForSessions = std::vector; - using Container = SnapshotableHashTable; + using Container = SnapshotableRocksdbInfo;//SnapshotableHashTable; using Ephemerals = std::unordered_map>; using SessionAndWatcher = std::unordered_map>; using SessionIDs = std::vector; @@ -114,9 +106,11 @@ class KeeperStorage } const String superdigest; + const String rocksdbpath; + int isuse_rocksdb; public: - KeeperStorage(int64_t tick_time_ms, const String & superdigest_); + KeeperStorage(int64_t tick_time_ms, const String & superdigest_, const std::string & rocksdbpath_ = "", int isuse_rocksdb_ = 0); /// Allocate new session id with the specified timeouts int64_t getSessionID(int64_t session_timeout_ms) diff --git a/src/Coordination/SnapshotableHashTable.h b/src/Coordination/SnapshotableHashTable.h index 454d11101963..02e7081f2def 100644 --- a/src/Coordination/SnapshotableHashTable.h +++ b/src/Coordination/SnapshotableHashTable.h @@ -132,6 +132,7 @@ class SnapshotableHashTable auto map_it = map.find(key); if (map_it != map.end()) return map_it->second; + return list.end(); } diff --git a/src/Coordination/SnapshotableRocksdbInfo.h b/src/Coordination/SnapshotableRocksdbInfo.h new file mode 100644 index 000000000000..afe4a9f81b3e --- /dev/null +++ b/src/Coordination/SnapshotableRocksdbInfo.h @@ -0,0 +1,625 @@ +#pragma once +#include +#include +#include +#include +#include +#include +#include "rocksdb/db.h" +#include "rocksdb/options.h" +#include "rocksdb/slice.h" +#include "rocksdb/env.h" +#include "clickhouse_grpc.grpc.pb.h" + +using GRPCSnapshotableDataInfo = clickhouse::grpc::SnapshotableDataInfo; + +namespace DB +{ + 
+struct KeeperStorageNode +{ + String data; + uint64_t acl_id = 0; /// 0 -- no ACL by default + bool is_sequental = false; + Coordination::Stat stat{}; + int32_t seq_num = 0; + std::unordered_set children{}; +}; + +struct ListRocksDbNode +{ + std::string key; + KeeperStorageNode value; + bool active_in_map; +}; + +class SnapshotableRocksdbInfo +{ +private: + using List = std::list; + using IndexMap = std::unordered_map; + + List list; + IndexMap map; + bool snapshot_mode{false}; + + String rocksdb_path; + bool isopen_rocksdb{false}; + bool isuse_rocksdb{false}; + Poco::Logger * log; + rocksdb::DB* db; + rocksdb::Options options; + size_t snapshot_container_size; +public: + SnapshotableRocksdbInfo() + { + rocksdb_path = "./racksdb"; + snapshot_container_size = 0; + log = &Poco::Logger::get("SnapshotableRocksdbInfo"); + LOG_DEBUG(log, "SnapshotableRocksdbInfo:SnapshotableRocksdbInfo snapshot_container_size = {} snapshots", snapshot_container_size); + } + + //RocksDb + void rocksdbSerializeToString(const KeeperStorageNode & value, std::string &buff) + { + GRPCSnapshotableDataInfo rsp; + rsp.set_data(value.data); + rsp.set_acl_id(value.acl_id); + rsp.set_is_sequental(value.is_sequental); + rsp.set_seq_num(value.seq_num); + auto stat_info = rsp.mutable_stat(); + stat_info->set_czxid(value.stat.czxid); + stat_info->set_mzxid(value.stat.mzxid); + stat_info->set_ctime(value.stat.ctime); + stat_info->set_mtime(value.stat.mtime); + stat_info->set_version(value.stat.version); + stat_info->set_aversion(value.stat.aversion); + stat_info->set_ephemeralowner(value.stat.ephemeralOwner); + stat_info->set_datalength(value.stat.dataLength); + stat_info->set_numchildren(value.stat.numChildren); + stat_info->set_pzxid(value.stat.pzxid); + + rsp.SerializeToString(&buff); + } + + bool rocksdbParseFromString(const std::string &buff, const std::string & key, KeeperStorageNode & value) const + { + GRPCSnapshotableDataInfo rsp; + if (!rsp.ParseFromString(buff)) + { + LOG_DEBUG(log, 
"SnapshotableRocksdbInfo ParseFromString is error!!!!!!!!!!!! key = {}.", key); + return false; + } + + value.data = rsp.data(); + value.acl_id = rsp.acl_id(); + value.is_sequental = rsp.is_sequental(); + value.seq_num = rsp.seq_num(); + auto statInfo = rsp.stat(); + value.stat.czxid = statInfo.czxid(); + value.stat.mzxid = statInfo.mzxid(); + value.stat.ctime = statInfo.ctime(); + value.stat.mtime = statInfo.mtime(); + value.stat.version = statInfo.version(); + value.stat.aversion = statInfo.aversion(); + value.stat.ephemeralOwner = statInfo.ephemeralowner(); + value.stat.dataLength = statInfo.datalength(); + value.stat.numChildren = statInfo.numchildren(); + value.stat.pzxid = statInfo.pzxid(); + return true; + } + + bool rocksdbOpen(const std::string & rocksdbpath_, int isuse_rocksdb_) + { + rocksdb_path = rocksdbpath_; + if (isuse_rocksdb_ > 0) + { + isuse_rocksdb = true; + } + + if (isopen_rocksdb) + return true; + + options.create_if_missing = true; + rocksdb::Status status = rocksdb::DB::Open(options, rocksdb_path, &db); + if(!status.ok()){ + LOG_ERROR(log, "SnapshotableRocksdbInfo: rocksdbOpen status is fail------path={} .isuse={}",rocksdb_path, isuse_rocksdb); + return true; + } + + isopen_rocksdb = true; + LOG_DEBUG(log, "SnapshotableRocksdbInfo: rocksdbOpen is success------path={} .isuse={}",rocksdb_path, isuse_rocksdb); + return true; + } + + bool rocksdbPut(const std::string & key, const KeeperStorageNode & value) + { + if (!isopen_rocksdb || db == nullptr) + { + LOG_ERROR(log, "SnapshotableRocksdbInfo rocksdbPut isopen_rocksdb is error"); + return false; + } + + std::string tempStr; + rocksdbSerializeToString(value,tempStr); + rocksdb::Status status = db->Put(rocksdb::WriteOptions(), key, tempStr); + if(status.ok()){ + return true; + } + + LOG_ERROR(log, "SnapshotableRocksdbInfo rocksdbPut status is error"); + return false; + } + + ListRocksDbNode rocksdbGet(const std::string & key, bool isneed_children) const + { + if (!isopen_rocksdb || db == 
nullptr) + { + LOG_ERROR(log, "SnapshotableRocksdbInfo rocksdbGet isopen_rocksdb is error"); + ListRocksDbNode elem; + elem.active_in_map = false; + return elem; + } + + String get_value; + rocksdb::Status status = db->Get(rocksdb::ReadOptions(), key, &get_value); + if(status.ok()) + { + KeeperStorageNode tempValue; + if (rocksdbParseFromString(get_value, key, tempValue)) + { + if (isneed_children) + { + tempValue.children.clear(); + auto iter = db->NewIterator(rocksdb::ReadOptions()); + for (iter->Seek(key); iter->Valid(); iter->Next()) + { + std::string tempStr = iter->key().ToString(); + if (tempStr == "/" || tempStr == key) + { + continue; + } + + if (key == parentPath(tempStr)) + { + tempValue.children.insert(getBaseName(tempStr)); + } + else if (tempStr.find(key) == tempStr.npos) + { + break; + } + + } + delete iter; + } + + ListRocksDbNode elem; + elem.key = key; + elem.value = tempValue; + elem.active_in_map = true; + return elem; + } + } + + LOG_ERROR(log, "SnapshotableRocksdbInfo rocksdbGet fail error -----"); + ListRocksDbNode elem; + elem.active_in_map = false; + return elem; + } + + bool rocksdbContains(const std::string & key) const + { + if (!isopen_rocksdb || db == nullptr) + return false; + + String get_value; + rocksdb::Status status = db->Get(rocksdb::ReadOptions(), key, &get_value); + if(status.ok()){ + return true; + } + + return false; + } + + bool rocksdbDelete(const std::string & key) + { + if (!isopen_rocksdb || db == nullptr) + return false; + + rocksdb::Status status = db->Delete(rocksdb::WriteOptions(), key); + if(status.ok()) + { + LOG_DEBUG(log, "SnapshotableRocksdbInfo rocksdbDelete!!!{}", key); + if (!snapshot_mode) + { + snapshot_container_size--; + } + return true; + } + + return false; + } + + bool rocksdbIter(std::vector& vecKey) + { + if (!isopen_rocksdb || db == nullptr) + { + return false; + } + + rocksdb::Iterator* iter = db->NewIterator(rocksdb::ReadOptions()); + for (iter->SeekToFirst();iter->Valid();iter->Next()) + { + 
/// Returns the parent of a '/'-separated node path.
///
/// Everything before the last '/' is the parent. The root "/" is returned when the
/// last '/' is the first character, and also when the path contains no '/' at all
/// (previously such a path was returned unchanged, i.e. as its own parent).
static std::string parentPath(const std::string & path)
{
    const auto last_slash = path.rfind('/');
    if (last_slash == std::string::npos || last_slash == 0)
        return "/";
    return path.substr(0, last_slash);
}

/// Returns the component after the last '/' of `path`.
///
/// A path without any '/' is returned whole; a path ending in '/' yields an empty string.
static std::string getBaseName(const std::string & path)
{
    const auto last_slash = path.rfind('/');
    if (last_slash == std::string::npos)
        return path;
    return path.substr(last_slash + 1);
}
map.emplace(itr->key, itr); + return true; + } + } + + return false; + } + + + void insertOrReplace(const std::string & key, const KeeperStorageNode & value) + { + if (isuse_rocksdb) + { + if (rocksdbPut(key,value)) + { + if (snapshot_mode || !contains(key)) + { + snapshot_container_size++; + } + } + } + else + { + auto it = map.find(key); + if (it == map.end()) + { + ListRocksDbNode elem{key, value, true}; + auto itr = list.insert(list.end(), elem); + map.emplace(itr->key, itr); + } + else + { + auto list_itr = it->second; + if (snapshot_mode) + { + ListRocksDbNode elem{key, value, true}; + list_itr->active_in_map = false; + auto new_list_itr = list.insert(list.end(), elem); + map.erase(it); + map.emplace(new_list_itr->key, new_list_itr); + } + else + { + list_itr->value = value; + } + } + } + } + + bool erase(const std::string & key) + { + if (isuse_rocksdb) + { + rocksdbDelete(key); + } + else + { + auto it = map.find(key); + if (it == map.end()) + return false; + + auto list_itr = it->second; + if (snapshot_mode) + { + list_itr->active_in_map = false; + map.erase(it); + } + else + { + map.erase(it); + list.erase(list_itr); + } + } + return true; + } + + bool contains(const std::string & key) const + { + if (isuse_rocksdb) + { + return rocksdbContains(key); + } + else + { + return map.find(key) != map.end(); + } + } + + KeeperStorageNode updateValue(const std::string & key, ValueUpdater updater) + { + if (isuse_rocksdb) + { + KeeperStorageNode tempValue; + if (!isopen_rocksdb || db == nullptr) + { + LOG_ERROR(log, "SnapshotableRocksdbInfo updateValue isopen_rocksdb os error"); + return tempValue; + } + + String get_value; + rocksdb::Status status = db->Get(rocksdb::ReadOptions(), key, &get_value); + if(status.ok()) + { + if (rocksdbParseFromString(get_value, key, tempValue)) + { + updater(tempValue); + rocksdbPut(key,tempValue); + if (snapshot_mode) + { + snapshot_container_size++; + } + return tempValue; + } + } + else + { + LOG_ERROR(log, 
"SnapshotableRocksdbInfo updateValue status os error"); + } + return tempValue; + } + else + { + auto it = map.find(key); + assert(it != map.end()); + if (snapshot_mode) + { + auto list_itr = it->second; + auto elem_copy = *(list_itr); + list_itr->active_in_map = false; + map.erase(it); + updater(elem_copy.value); + auto itr = list.insert(list.end(), elem_copy); + map.emplace(itr->key, itr); + return itr->value; + } + else + { + auto list_itr = it->second; + updater(list_itr->value); + return list_itr->value; + } + } + } + + ListRocksDbNode find(const std::string & key,bool isneed_children = true) const + { + if (isuse_rocksdb) + { + return rocksdbGet(key, isneed_children); + } + else + { + auto map_it = map.find(key); + if (map_it != map.end()) + return *map_it->second; + + KeeperStorageNode value; + ListRocksDbNode tempElem{key, value, true}; + tempElem.active_in_map = false; + return tempElem; + } + } + + KeeperStorageNode getValue(const std::string & key) const + { + if (isuse_rocksdb) + { + KeeperStorageNode tempValue; + if (!isopen_rocksdb || db == nullptr) + { + LOG_ERROR(log, "SnapshotableRocksdbInfo getValue isopen_rocksdb is error"); + return tempValue; + } + + String get_value; + rocksdb::Status status = db->Get(rocksdb::ReadOptions(), key, &get_value); + if(status.ok()) + { + if (rocksdbParseFromString(get_value, key, tempValue)) + { + tempValue.children.clear(); + auto iter = db->NewIterator(rocksdb::ReadOptions()); + for (iter->Seek(key); iter->Valid(); iter->Next()) + { + std::string tempStr = iter->key().ToString(); + if (tempStr == "/" || tempStr == key) + { + continue; + } + + if (key == parentPath(tempStr)) + { + tempValue.children.insert(getBaseName(tempStr)); + } + else if (tempStr.find(key) == tempStr.npos) + { + break; + } + + } + delete iter; + } + else + { + LOG_ERROR(log, "SnapshotableRocksdbInfo getValue rocksdbParseFromString is error"); + } + + } + return tempValue; + } + else + { + auto it = map.find(key); + assert(it != map.end()); + 
return it->second->value; + } + } + + void clearOutdatedNodes() + { + if (isuse_rocksdb) + { + snapshot_container_size = 0; + } + else + { + auto start = list.begin(); + auto end = list.end(); + for (auto itr = start; itr != end;) + { + if (!itr->active_in_map) + itr = list.erase(itr); + else + itr++; + } + } + } + + void clear() + { + list.clear(); + map.clear(); + snapshot_container_size = 0; + } + + void enableSnapshotMode() + { + snapshot_mode = true; + } + + void disableSnapshotMode() + { + snapshot_mode = false; + } + + size_t size() const + { + return map.size(); + } + + size_t snapshotSize() const + { + if (isuse_rocksdb) + { + return snapshot_container_size; + } + else + { + return list.size(); + } + } + + + iterator begin() { return list.begin(); } + const_iterator begin() const { return list.cbegin(); } + iterator end() { return list.end(); } + const_iterator end() const { return list.cend(); } + + reverse_iterator rbegin() { return list.rbegin(); } + const_reverse_iterator rbegin() const { return list.crbegin(); } + reverse_iterator rend() { return list.rend(); } + const_reverse_iterator rend() const { return list.crend(); } +}; + + +} diff --git a/src/Coordination/ZooKeeperDataReader.cpp b/src/Coordination/ZooKeeperDataReader.cpp index 4f71274291b9..b905a3cfdc31 100644 --- a/src/Coordination/ZooKeeperDataReader.cpp +++ b/src/Coordination/ZooKeeperDataReader.cpp @@ -108,7 +108,7 @@ int64_t deserializeStorageData(KeeperStorage & storage, ReadBuffer & in, Poco::L size_t count = 0; while (path != "/") { - KeeperStorage::Node node{}; + KeeperStorageNode node{}; Coordination::read(node.data, in); Coordination::read(node.acl_id, in); @@ -143,15 +143,26 @@ int64_t deserializeStorageData(KeeperStorage & storage, ReadBuffer & in, Poco::L LOG_INFO(log, "Deserialized nodes from snapshot: {}", count); } - for (const auto & itr : storage.container) + std::vector vecKey; + storage.container.rocksdbIter(vecKey); + for (const auto & itr : vecKey) { - if (itr.key != 
"/") + if (itr != "/") { - auto parent_path = parentPath(itr.key); - storage.container.updateValue(parent_path, [&path = itr.key] (KeeperStorage::Node & value) { value.children.insert(getBaseName(path)); value.stat.numChildren++; }); + auto parent_path = parentPath(itr); + storage.container.updateValue(parent_path, [&path = itr] (KeeperStorageNode & value) { value.children.insert(getBaseName(path)); value.stat.numChildren++;}); } } + // for (const auto & itr : storage.container) + // { + // if (itr.key != "/") + // { + // auto parent_path = parentPath(itr.key); + // storage.container.updateValue(parent_path, [&path = itr.key] (KeeperStorageNode & value) { value.children.insert(getBaseName(path)); value.stat.numChildren++; }); + // } + // } + return max_zxid; } diff --git a/src/Coordination/tests/gtest_coordination.cpp b/src/Coordination/tests/gtest_coordination.cpp index b04af1422dc9..61811ca70cc8 100644 --- a/src/Coordination/tests/gtest_coordination.cpp +++ b/src/Coordination/tests/gtest_coordination.cpp @@ -923,7 +923,7 @@ TEST_P(CoordinationTest, SnapshotableHashMapTrySnapshot) void addNode(DB::KeeperStorage & storage, const std::string & path, const std::string & data, int64_t ephemeral_owner=0) { - using Node = DB::KeeperStorage::Node; + using Node = DB::KeeperStorageNode; Node node{}; node.data = data; node.stat.ephemeralOwner = ephemeral_owner; diff --git a/src/Server/grpc_protos/clickhouse_grpc.proto b/src/Server/grpc_protos/clickhouse_grpc.proto index c6cafaf6e404..6f4c4f1c9eab 100644 --- a/src/Server/grpc_protos/clickhouse_grpc.proto +++ b/src/Server/grpc_protos/clickhouse_grpc.proto @@ -60,6 +60,29 @@ message Compression { CompressionLevel level = 2; } +message CoordinationStat { + int64 czxid = 1; + int64 mzxid = 2; + int64 ctime = 3; + int64 mtime = 4; + int32 version = 5; + int32 cversion = 6; + int32 aversion = 7; + int64 ephemeralOwner = 8; + int32 dataLength = 9; + int32 numChildren = 10; + int64 pzxid = 11; +} + +message SnapshotableDataInfo { + 
string data = 1; + uint64 acl_id = 2; + bool is_sequental = 3; + CoordinationStat stat = 4; + int32 seq_num = 5; + repeated string children = 6; +} + // Information about a query which a client sends to a ClickHouse server. // The first QueryInfo can set any of the following fields. Extra QueryInfos only add extra data. // In extra QueryInfos only `input_data`, `external_tables`, `next_query_info` and `cancel` fields can be set.