From a2ee06eb222aec6cd29a86843d4c4328d86d8d1e Mon Sep 17 00:00:00 2001 From: Sungmin Lee Date: Tue, 8 Nov 2022 15:53:44 +0900 Subject: [PATCH 01/15] rgw/dedup: RGWDedup initial commit This commit adds an instance of RGWDedup and a skeleton code of DedupManager. RGWDedup - an instance that controlls RGWDedupManager during its lifecycle. DedupManager - a thread that manages whole deduplication routine. Signed-off-by: Sungmin Lee sung_min.lee@samsung.com --- src/common/options/rgw.yaml.in | 8 +++++ src/rgw/CMakeLists.txt | 4 ++- src/rgw/rgw_admin.cc | 2 +- src/rgw/rgw_appmain.cc | 1 + src/rgw/rgw_dedup.cc | 53 ++++++++++++++++++++++++++++++++++ src/rgw/rgw_dedup.h | 44 ++++++++++++++++++++++++++++ src/rgw/rgw_dedup_manager.cc | 35 ++++++++++++++++++++++ src/rgw/rgw_dedup_manager.h | 38 ++++++++++++++++++++++++ src/rgw/rgw_object_expirer.cc | 2 +- src/rgw/rgw_rados.cc | 14 +++++++++ src/rgw/rgw_rados.h | 8 +++++ src/rgw/rgw_realm_reloader.cc | 1 + src/rgw/rgw_sal.cc | 4 ++- src/rgw/rgw_sal.h | 3 ++ 14 files changed, 213 insertions(+), 4 deletions(-) create mode 100644 src/rgw/rgw_dedup.cc create mode 100644 src/rgw/rgw_dedup.h create mode 100644 src/rgw/rgw_dedup_manager.cc create mode 100644 src/rgw/rgw_dedup_manager.h diff --git a/src/common/options/rgw.yaml.in b/src/common/options/rgw.yaml.in index 33fe0a607948a..937e96e6b99d8 100644 --- a/src/common/options/rgw.yaml.in +++ b/src/common/options/rgw.yaml.in @@ -3692,3 +3692,11 @@ options: default: tank services: - rgw +- name: rgw_enable_dedup_threads + type: bool + level: advanced + desc: Use dedup function in rados gateway + default: true + services: + - rgw + with_legacy: true diff --git a/src/rgw/CMakeLists.txt b/src/rgw/CMakeLists.txt index 02c1fa575583a..dd854a1c787b2 100644 --- a/src/rgw/CMakeLists.txt +++ b/src/rgw/CMakeLists.txt @@ -173,7 +173,9 @@ set(librgw_common_srcs rgw_lua_data_filter.cc rgw_bucket_encryption.cc rgw_tracer.cc - rgw_lua_background.cc) + rgw_lua_background.cc + rgw_dedup.cc + 
rgw_dedup_manager.cc) list(APPEND librgw_common_srcs store/immutable_config/store.cc diff --git a/src/rgw/rgw_admin.cc b/src/rgw/rgw_admin.cc index 796c5a707f677..dbcf5d28ef7d3 100644 --- a/src/rgw/rgw_admin.cc +++ b/src/rgw/rgw_admin.cc @@ -4361,7 +4361,6 @@ int main(int argc, const char **argv) bool need_gc = (gc_ops_list.find(opt_cmd) != gc_ops_list.end()) && !bypass_gc; StoreManager::Config cfg = StoreManager::get_config(true, g_ceph_context); - auto config_store_type = g_conf().get_val("rgw_config_store"); cfgstore = StoreManager::create_config_store(dpp(), config_store_type); if (!cfgstore) { @@ -4382,6 +4381,7 @@ int main(int argc, const char **argv) false, false, false, + false, need_cache && g_conf()->rgw_cache_enabled, need_gc); } diff --git a/src/rgw/rgw_appmain.cc b/src/rgw/rgw_appmain.cc index 5a9c1641da808..04b0bd0726125 100644 --- a/src/rgw/rgw_appmain.cc +++ b/src/rgw/rgw_appmain.cc @@ -219,6 +219,7 @@ void rgw::AppMain::init_storage() run_quota, run_sync, g_conf().get_val("rgw_dynamic_resharding"), + g_conf()->rgw_enable_dedup_threads, g_conf()->rgw_cache_enabled); } /* init_storage */ diff --git a/src/rgw/rgw_dedup.cc b/src/rgw/rgw_dedup.cc new file mode 100644 index 0000000000000..c1e76ae4c638f --- /dev/null +++ b/src/rgw/rgw_dedup.cc @@ -0,0 +1,53 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab ft=cpp + +#include "rgw_dedup.h" + +#define dout_subsys ceph_subsys_rgw + +using namespace std; + + +void RGWDedup::initialize(CephContext* _cct, rgw::sal::Store* _store) +{ + cct = _cct; + store = _store; + dedup_manager = make_unique(this, cct, store); + dedup_manager->initialize(); +} + +void RGWDedup::finalize() +{ + dedup_manager.reset(); +} + +void RGWDedup::start_dedup_manager() +{ + assert(dedup_manager.get()); + dedup_manager->set_down_flag(false); + dedup_manager->create("dedup_manager"); +} + +void RGWDedup::stop_dedup_manager() +{ + if (!dedup_manager->get_down_flag() && 
dedup_manager.get()) { + dedup_manager->stop(); + dedup_manager->join(); + dedup_manager->finalize(); + } +} + +RGWDedup::~RGWDedup() +{ + if (dedup_manager.get()) { + stop_dedup_manager(); + } + finalize(); + + ldpp_dout(this, 2) << "stop RGWDedup done" << dendl; +} + +unsigned RGWDedup::get_subsys() const +{ + return dout_subsys; +} diff --git a/src/rgw/rgw_dedup.h b/src/rgw/rgw_dedup.h new file mode 100644 index 0000000000000..c914156d10202 --- /dev/null +++ b/src/rgw/rgw_dedup.h @@ -0,0 +1,44 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab ft=cpp + +#ifndef CEPH_RGW_DEDUP_H +#define CEPH_RGW_DEDUP_H + + +#include +#include +#include + +#include "include/types.h" +#include "common/Cond.h" +#include "common/Thread.h" +#include "rgw_sal.h" +#include "rgw_dedup_manager.h" + + +using namespace std; + + +class RGWDedup : public DoutPrefixProvider +{ + CephContext* cct; + rgw::sal::Store* store; + unique_ptr dedup_manager; + +public: + RGWDedup() : cct(nullptr), store(nullptr) {} + ~RGWDedup() override; + + void initialize(CephContext* _cct, rgw::sal::Store* _store); + void finalize(); + + void start_dedup_manager(); + void stop_dedup_manager(); + + CephContext* get_cct() const override { return cct; } + unsigned get_subsys() const override; + std::ostream& gen_prefix(std::ostream& out) const override { return out << "RGWDedup: "; } +}; + +#endif + diff --git a/src/rgw/rgw_dedup_manager.cc b/src/rgw/rgw_dedup_manager.cc new file mode 100644 index 0000000000000..4ef347711cd60 --- /dev/null +++ b/src/rgw/rgw_dedup_manager.cc @@ -0,0 +1,35 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab ft=cpp + +#include "rgw_dedup_manager.h" + +#define dout_subsys ceph_subsys_rgw + +void RGWDedupManager::initialize() +{ + // TODO: initialize member variables of RGWDedupManager +} + +void* RGWDedupManager::entry() +{ + ldpp_dout(dpp, 2) << "RGWDedupManager started" << dendl; + + 
while (!get_down_flag()) { + sleep(3); + } + + ldpp_dout(dpp, 2) << "RGWDedupManager is going down" << dendl; + return nullptr; +} + +void RGWDedupManager::stop() +{ + set_down_flag(true); + ldpp_dout(dpp, 2) << "RGWDedupManager is set to be stopped" << dendl; +} + +void RGWDedupManager::finalize() +{ + // TODO: finalize member variables of RGWDedupManager +} + diff --git a/src/rgw/rgw_dedup_manager.h b/src/rgw/rgw_dedup_manager.h new file mode 100644 index 0000000000000..ccdb814b3ccc8 --- /dev/null +++ b/src/rgw/rgw_dedup_manager.h @@ -0,0 +1,38 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab ft=cpp + +#ifndef CEPH_RGW_DEDUP_PROC_H +#define CEPH_RGW_DEDUP_PROC_H + +#include "include/types.h" +#include "common/Cond.h" +#include "common/Thread.h" +#include "rgw_sal.h" + + +using namespace std; + +class RGWDedupManager : public Thread +{ + const DoutPrefixProvider* dpp; + CephContext* cct; + rgw::sal::Store* store; + bool down_flag; + +public: + RGWDedupManager(const DoutPrefixProvider* _dpp, + CephContext* _cct, + rgw::sal::Store* _store) + : dpp(_dpp), cct(_cct), store(_store), down_flag(true) {} + ~RGWDedupManager() {} + void* entry() override; + void stop(); + void finalize(); + bool going_down(); + void initialize(); + + void set_down_flag(bool new_flag) { down_flag = new_flag; } + bool get_down_flag() { return down_flag; } +}; + +#endif diff --git a/src/rgw/rgw_object_expirer.cc b/src/rgw/rgw_object_expirer.cc index 2e1249b85f714..9b15ed3d6c202 100644 --- a/src/rgw/rgw_object_expirer.cc +++ b/src/rgw/rgw_object_expirer.cc @@ -83,7 +83,7 @@ int main(const int argc, const char **argv) StoreManager::Config cfg; cfg.store_name = "rados"; cfg.filter_name = "none"; - store = StoreManager::get_storage(&dp, g_ceph_context, cfg, false, false, false, false, false); + store = StoreManager::get_storage(&dp, g_ceph_context, cfg, false, false, false, false, false, false); if (!store) { std::cerr << "couldn't init storage 
provider" << std::endl; return EIO; diff --git a/src/rgw/rgw_rados.cc b/src/rgw/rgw_rados.cc index 63992e9bc1ee4..8cea44b48a0b8 100644 --- a/src/rgw/rgw_rados.cc +++ b/src/rgw/rgw_rados.cc @@ -1112,6 +1112,12 @@ void RGWRados::finalize() delete reshard; delete index_completion_manager; + if (use_dedup) { + if (dedup.get()) { + dedup.reset(); + } + } + rgw::notify::shutdown(); } @@ -1240,6 +1246,14 @@ int RGWRados::init_complete(const DoutPrefixProvider *dpp) obj_expirer->start_processor(); } + if (use_dedup) { + dedup = std::make_shared(); + dedup->initialize(cct, this->store); + dedup->start_dedup_manager(); + } else { + ldpp_dout(dpp, 5) << "note: RGWDedup not initialized" << dendl; + } + auto& current_period = svc.zone->get_current_period(); auto& zonegroup = svc.zone->get_zonegroup(); auto& zone_params = svc.zone->get_zone_params(); diff --git a/src/rgw/rgw_rados.h b/src/rgw/rgw_rados.h index 933c3e4a3dd8a..62fe2d371c599 100644 --- a/src/rgw/rgw_rados.h +++ b/src/rgw/rgw_rados.h @@ -39,6 +39,7 @@ #include "common/ceph_mutex.h" #include "rgw_cache.h" #include "rgw_sal_fwd.h" +#include "rgw_dedup.h" struct D3nDataCache; @@ -362,11 +363,13 @@ class RGWRados RGWGC *gc = nullptr; RGWLC *lc; RGWObjectExpirer *obj_expirer; + std::shared_ptr dedup; bool use_gc_thread; bool use_lc_thread; bool quota_threads; bool run_sync_thread; bool run_reshard_thread; + bool use_dedup; RGWMetaNotifier *meta_notifier; RGWDataNotifier *data_notifier; @@ -513,6 +516,11 @@ class RGWRados return *this; } + RGWRados& set_use_dedup(bool _use_dedup) { + use_dedup = _use_dedup; + return *this; + } + librados::IoCtx* get_lc_pool_ctx() { return &lc_pool_ctx; } diff --git a/src/rgw/rgw_realm_reloader.cc b/src/rgw/rgw_realm_reloader.cc index 1bcfeb15ac821..3ef767ad94f91 100644 --- a/src/rgw/rgw_realm_reloader.cc +++ b/src/rgw/rgw_realm_reloader.cc @@ -116,6 +116,7 @@ void RGWRealmReloader::reload() cct->_conf->rgw_enable_quota_threads, cct->_conf->rgw_run_sync_thread, 
cct->_conf.get_val("rgw_dynamic_resharding"), + false, cct->_conf->rgw_cache_enabled); ldpp_dout(&dp, 1) << "Creating new store" << dendl; diff --git a/src/rgw/rgw_sal.cc b/src/rgw/rgw_sal.cc index 90786ac49c996..52421dbcb73a0 100644 --- a/src/rgw/rgw_sal.cc +++ b/src/rgw/rgw_sal.cc @@ -101,7 +101,8 @@ rgw::sal::Store* StoreManager::init_storage_provider(const DoutPrefixProvider* d bool run_sync_thread, bool run_reshard_thread, bool use_cache, - bool use_gc) + bool use_gc, + bool use_dedup) { rgw::sal::Store* store{nullptr}; @@ -117,6 +118,7 @@ rgw::sal::Store* StoreManager::init_storage_provider(const DoutPrefixProvider* d .set_run_quota_threads(quota_threads) .set_run_sync_thread(run_sync_thread) .set_run_reshard_thread(run_reshard_thread) + .set_use_dedup(use_dedup) .init_begin(cct, dpp) < 0) { delete store; return nullptr; diff --git a/src/rgw/rgw_sal.h b/src/rgw/rgw_sal.h index c0ea2e631963d..7a5518289a5c8 100644 --- a/src/rgw/rgw_sal.h +++ b/src/rgw/rgw_sal.h @@ -1576,6 +1576,7 @@ class StoreManager { bool quota_threads, bool run_sync_thread, bool run_reshard_thread, + bool use_dedup_thread, bool use_cache = true, bool use_gc = true) { rgw::sal::Store* store = init_storage_provider(dpp, cct, cfg, use_gc_thread, @@ -1583,6 +1584,7 @@ class StoreManager { quota_threads, run_sync_thread, run_reshard_thread, + use_dedup_thread, use_cache, use_gc); return store; } @@ -1601,6 +1603,7 @@ class StoreManager { bool quota_threads, bool run_sync_thread, bool run_reshard_thread, + bool use_dedup_thread, bool use_metadata_cache, bool use_gc); /** Initialize a new raw Store */ From 22a7e63afb3807aa23804e2ea674ebe3819c5cf1 Mon Sep 17 00:00:00 2001 From: Sungmin Lee Date: Fri, 18 Nov 2022 14:51:22 +0900 Subject: [PATCH 02/15] Add get_objects() in RGWDedupManager get_object() collects all the rados object a zone which current RGW belongs. 
--- src/rgw/rgw_dedup.cc | 2 +- src/rgw/rgw_dedup.h | 7 +-- src/rgw/rgw_dedup_manager.cc | 89 ++++++++++++++++++++++++++++++++++++ src/rgw/rgw_dedup_manager.h | 13 +++--- src/rgw/rgw_rados.cc | 1 + src/rgw/rgw_rados.h | 5 +- 6 files changed, 103 insertions(+), 14 deletions(-) diff --git a/src/rgw/rgw_dedup.cc b/src/rgw/rgw_dedup.cc index c1e76ae4c638f..177d2406f75b8 100644 --- a/src/rgw/rgw_dedup.cc +++ b/src/rgw/rgw_dedup.cc @@ -8,7 +8,7 @@ using namespace std; -void RGWDedup::initialize(CephContext* _cct, rgw::sal::Store* _store) +void RGWDedup::initialize(CephContext* _cct, rgw::sal::RadosStore* _store) { cct = _cct; store = _store; diff --git a/src/rgw/rgw_dedup.h b/src/rgw/rgw_dedup.h index c914156d10202..c3f2a3cb5fab9 100644 --- a/src/rgw/rgw_dedup.h +++ b/src/rgw/rgw_dedup.h @@ -12,24 +12,21 @@ #include "include/types.h" #include "common/Cond.h" #include "common/Thread.h" -#include "rgw_sal.h" #include "rgw_dedup_manager.h" - using namespace std; - class RGWDedup : public DoutPrefixProvider { CephContext* cct; - rgw::sal::Store* store; + rgw::sal::RadosStore* store; unique_ptr dedup_manager; public: RGWDedup() : cct(nullptr), store(nullptr) {} ~RGWDedup() override; - void initialize(CephContext* _cct, rgw::sal::Store* _store); + void initialize(CephContext* _cct, rgw::sal::RadosStore* _store); void finalize(); void start_dedup_manager(); diff --git a/src/rgw/rgw_dedup_manager.cc b/src/rgw/rgw_dedup_manager.cc index 4ef347711cd60..a767a462e19f0 100644 --- a/src/rgw/rgw_dedup_manager.cc +++ b/src/rgw/rgw_dedup_manager.cc @@ -2,9 +2,13 @@ // vim: ts=8 sw=2 smarttab ft=cpp #include "rgw_dedup_manager.h" +#include "rgw_rados.h" #define dout_subsys ceph_subsys_rgw +const int MAX_OBJ_SCAN_SIZE = 100; +const int MAX_BUCKET_SCAN_SIZE = 100; + void RGWDedupManager::initialize() { // TODO: initialize member variables of RGWDedupManager @@ -15,6 +19,8 @@ void* RGWDedupManager::entry() ldpp_dout(dpp, 2) << "RGWDedupManager started" << dendl; while (!get_down_flag()) { + 
int ret = get_rados_objects(); + sleep(3); } @@ -33,3 +39,86 @@ void RGWDedupManager::finalize() // TODO: finalize member variables of RGWDedupManager } +int RGWDedupManager::get_rados_objects() +{ + void* handle = nullptr; + bool bucket_trunc = true; + int ret = 0; + int total_obj_cnt = 0; + + ret = store->meta_list_keys_init(dpp, "bucket", string(), &handle); + if (ret < 0) { + ldpp_dout(dpp, 0) << "ERROR: meta_list_keys_init() failed" << dendl; + return ret; + } + + while (bucket_trunc) { + list bucket_list; + ret = store->meta_list_keys_next(dpp, handle, MAX_BUCKET_SCAN_SIZE, + bucket_list, &bucket_trunc); + if (ret < 0) { + ldpp_dout(dpp, 0) << "ERROR: meta_list_keys_netx() failed" << dendl; + return ret; + } + else { + for (auto bucket_name : bucket_list){ + unique_ptr bkt; + ret = store->get_bucket(dpp, nullptr, "", bucket_name, &bkt, null_yield); + if (ret < 0) { + ldpp_dout(dpp, 0) << "ERROR: get_bucket() failed" << dendl; + return ret; + } + + rgw::sal::Bucket::ListParams params; + rgw::sal::Bucket::ListResults results; + bool obj_trunc = true; + const std::string bucket_id = bkt->get_key().get_key(); + + while (obj_trunc) { + ret = bkt->list(dpp, params, MAX_OBJ_SCAN_SIZE, results, null_yield); + if (ret < 0) { + ldpp_dout(dpp, 0) << "ERROR: list() failed" << dendl; + return ret; + } + + for (auto obj : results.objs) { + ldpp_dout(dpp, 0) << "rgw_obj name: " << obj.key.name << dendl; + + RGWObjectCtx obj_ctx(store); + unique_ptr rgw_obj = bkt->get_object(obj.key); + RGWRados::Object op_target(store->getRados(), bkt.get(), obj_ctx, rgw_obj.get()); + RGWRados::Object::Stat stat_op(&op_target); + ret = stat_op.stat_async(dpp); + if (ret < 0) { + ldpp_dout(dpp, -1) << "ERROR: stat_async() returned error: " << + cpp_strerror(-ret) << dendl; + return ret; + } + ret = stat_op.wait(dpp); + if (ret < 0) { + if (ret != -ENOENT) { + ldpp_dout(dpp, -1) << "ERROR: stat_async() returned error: " << + cpp_strerror(-ret) << dendl; + } + return ret; + } + + 
RGWObjManifest& manifest = *stat_op.result.manifest; + RGWObjManifest::obj_iterator miter; + for (miter = manifest.obj_begin(dpp); miter != manifest.obj_end(dpp); ++miter) { + const rgw_raw_obj& loc = miter.get_location().get_raw_obj(static_cast(store)); + rados_objs.emplace_back(loc.oid); + ldpp_dout(dpp, 0) << " rados_oid name: " << loc.oid << dendl; + } + } + + obj_trunc = results.is_truncated; + total_obj_cnt += results.objs.size(); + } + } + } + } + store->meta_list_keys_complete(handle); + + return total_obj_cnt; +} \ No newline at end of file diff --git a/src/rgw/rgw_dedup_manager.h b/src/rgw/rgw_dedup_manager.h index ccdb814b3ccc8..74b791d9fb157 100644 --- a/src/rgw/rgw_dedup_manager.h +++ b/src/rgw/rgw_dedup_manager.h @@ -1,14 +1,13 @@ // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- // vim: ts=8 sw=2 smarttab ft=cpp -#ifndef CEPH_RGW_DEDUP_PROC_H -#define CEPH_RGW_DEDUP_PROC_H +#ifndef CEPH_RGW_DEDUP_MANAGER_H +#define CEPH_RGW_DEDUP_MANAGER_H #include "include/types.h" #include "common/Cond.h" #include "common/Thread.h" -#include "rgw_sal.h" - +#include "rgw_sal_rados.h" using namespace std; @@ -16,13 +15,14 @@ class RGWDedupManager : public Thread { const DoutPrefixProvider* dpp; CephContext* cct; - rgw::sal::Store* store; + rgw::sal::RadosStore* store; bool down_flag; + vector rados_objs; public: RGWDedupManager(const DoutPrefixProvider* _dpp, CephContext* _cct, - rgw::sal::Store* _store) + rgw::sal::RadosStore* _store) : dpp(_dpp), cct(_cct), store(_store), down_flag(true) {} ~RGWDedupManager() {} void* entry() override; @@ -33,6 +33,7 @@ class RGWDedupManager : public Thread void set_down_flag(bool new_flag) { down_flag = new_flag; } bool get_down_flag() { return down_flag; } + int get_rados_objects(); }; #endif diff --git a/src/rgw/rgw_rados.cc b/src/rgw/rgw_rados.cc index 8cea44b48a0b8..7a81c36707736 100644 --- a/src/rgw/rgw_rados.cc +++ b/src/rgw/rgw_rados.cc @@ -64,6 +64,7 @@ #include "rgw_gc.h" #include "rgw_lc.h" +#include 
"rgw_dedup.h" #include "rgw_object_expirer_core.h" #include "rgw_sync.h" diff --git a/src/rgw/rgw_rados.h b/src/rgw/rgw_rados.h index 62fe2d371c599..5cff58c2429c5 100644 --- a/src/rgw/rgw_rados.h +++ b/src/rgw/rgw_rados.h @@ -39,7 +39,6 @@ #include "common/ceph_mutex.h" #include "rgw_cache.h" #include "rgw_sal_fwd.h" -#include "rgw_dedup.h" struct D3nDataCache; @@ -58,6 +57,8 @@ struct RGWZoneGroup; struct RGWZoneParams; class RGWReshard; class RGWReshardWait; +class RGWDedup; +class RGWDedupManager; struct get_obj_data; @@ -343,7 +344,7 @@ class RGWRados friend class rgw::sal::MPRadosSerializer; friend class rgw::sal::LCRadosSerializer; friend class rgw::sal::RadosStore; - + /** Open the pool used as root for this gateway */ int open_root_pool_ctx(const DoutPrefixProvider *dpp); int open_gc_pool_ctx(const DoutPrefixProvider *dpp); From 3f634ead95978fda7441fee0798aedb2ac92b138 Mon Sep 17 00:00:00 2001 From: Sungmin Lee Date: Mon, 21 Nov 2022 16:44:19 +0900 Subject: [PATCH 03/15] Add some prepare routine required for triggering DedupWorkers - Change function name from get_rados_objects() to prepare_dedup_work() - Add append_ioctxs() to get base, chunk, and cold pools from existing data pools of storage_classes - Add set_dedup_tier() to declare dedup_tier between base-pools and chunk-pools. 
--- src/rgw/rgw_dedup_manager.cc | 175 +++++++++++++++++++++++++++++------ src/rgw/rgw_dedup_manager.h | 56 +++++++++-- 2 files changed, 199 insertions(+), 32 deletions(-) diff --git a/src/rgw/rgw_dedup_manager.cc b/src/rgw/rgw_dedup_manager.cc index a767a462e19f0..8b308a0e97c69 100644 --- a/src/rgw/rgw_dedup_manager.cc +++ b/src/rgw/rgw_dedup_manager.cc @@ -1,6 +1,8 @@ // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- // vim: ts=8 sw=2 smarttab ft=cpp +#include + #include "rgw_dedup_manager.h" #include "rgw_rados.h" @@ -8,6 +10,12 @@ const int MAX_OBJ_SCAN_SIZE = 100; const int MAX_BUCKET_SCAN_SIZE = 100; +const string DEFAULT_CHUNK_POOL_POSTFIX = "_chunk"; +const string DEFAULT_COLD_POOL_POSTFIX = "_cold"; +const string DEFAULT_CHUNK_SIZE = "16384"; +const string DEFAULT_CHUNK_ALGO = "fastcdc"; +const string DEFAULT_FP_ALGO = "sha1"; +const string DEFAULT_HITSET_TYPE = "bloom"; void RGWDedupManager::initialize() { @@ -19,8 +27,16 @@ void* RGWDedupManager::entry() ldpp_dout(dpp, 2) << "RGWDedupManager started" << dendl; while (!get_down_flag()) { - int ret = get_rados_objects(); + int ret = 0; + assert(prepare_dedup_work() >= 0); + if (ret == 0) { + ldpp_dout(dpp, 2) << "not a single rados object has been found. 
do retry" << dendl; + sleep(3); + continue; + } + // TODO: do dedup work + sleep(3); } @@ -39,7 +55,98 @@ void RGWDedupManager::finalize() // TODO: finalize member variables of RGWDedupManager } -int RGWDedupManager::get_rados_objects() +librados::IoCtx RGWDedupManager::get_or_create_ioctx(rgw_pool pool) +{ + librados::IoCtx ioctx; + rgw_init_ioctx(dpp, store->getRados()->get_rados_handle(), pool, + ioctx, true, false); + return ioctx; +} + +void RGWDedupManager::append_ioctxs(rgw_pool base_pool) +{ + string base_pool_name = base_pool.name; + librados::IoCtx base_ioctx = get_or_create_ioctx(base_pool); + + string chunk_pool_name = base_pool_name + chunk_pool_postfix; + librados::IoCtx chunk_ioctx = get_or_create_ioctx(rgw_pool(chunk_pool_name)); + + string cold_pool_name = base_pool_name + cold_pool_postfix; + librados::IoCtx cold_ioctx = get_or_create_ioctx(rgw_pool(cold_pool_name)); + + dedup_ioctx_set pool_set{base_ioctx, chunk_ioctx, cold_ioctx}; + ioctx_map.insert({base_pool_name, pool_set}); +} + +void RGWDedupManager::set_dedup_tier(string base_pool_name) +{ + string chunk_pool_name = ioctx_map[base_pool_name].chunk_pool_ctx.get_pool_name(); + librados::Rados* rados = store->getRados()->get_rados_handle(); + bufferlist inbl; + int ret = rados->mon_command( + "{\"prefix\": \"osd pool set\", \"pool\": \"" + base_pool_name + + "\",\"var\": \"dedup_tier\", \"val\": \"" + chunk_pool_name + + "\"}", inbl, nullptr, nullptr); + if (ret < 0) { + ldpp_dout(dpp, 0) << "ERROR: failed to set dedup_tier" << dendl; + } + + ret = rados->mon_command( + "{\"prefix\": \"osd pool set\", \"pool\": \"" + base_pool_name + + "\",\"var\": \"dedup_chunk_algorithm\", \"val\": \"" + chunk_algo + + "\"}", inbl, nullptr, nullptr); + if (ret < 0) { + ldpp_dout(dpp, 0) << "ERROR: failed to set dedup_chunk_algorithm" << dendl; + } + + ret = rados->mon_command( + "{\"prefix\": \"osd pool set\", \"pool\": \"" + base_pool_name + + "\",\"var\": \"dedup_cdc_chunk_size\", \"val\": \"" + 
chunk_size + + "\"}", inbl, nullptr, nullptr); + if (ret < 0) { + ldpp_dout(dpp, 0) << "ERROR: failed to set dedup_cdc_chunk_size" << dendl; + } + + ret = rados->mon_command( + "{\"prefix\": \"osd pool set\", \"pool\": \"" + base_pool_name + + "\",\"var\": \"fingerprint_algorithm\", \"val\": \"" + fp_algo + + "\"}", inbl, nullptr, nullptr); + + if (ret < 0) { + ldpp_dout(dpp, 0) << "ERROR: failed to set fingerprint_algorithm" << dendl; + } + + ret = rados->mon_command( + "{\"prefix\": \"osd pool set\", \"pool\": \"" + base_pool_name + + "\",\"var\": \"hit_set_type\", \"val\": \"" + hitset_type + + "\"}", inbl, nullptr, nullptr); + + if (ret < 0) { + ldpp_dout(dpp, 0) << "ERROR: failed to set hit_set_type" << dendl; + } +} + +int RGWDedupManager::get_rados_objects(RGWRados::Object::Stat& stat_op) +{ + int ret = 0; + ret = stat_op.stat_async(dpp); + if (ret < 0) { + ldpp_dout(dpp, 0) << "ERROR: stat_async() returned error: " << + cpp_strerror(-ret) << dendl; + return ret; + } + ret = stat_op.wait(dpp); + if (ret < 0) { + if (ret != -ENOENT) { + ldpp_dout(dpp, 0) << "ERROR: stat_async() returned error: " << + cpp_strerror(-ret) << dendl; + } + return ret; + } + return ret; +} + +int RGWDedupManager::prepare_dedup_work() { void* handle = nullptr; bool bucket_trunc = true; @@ -61,9 +168,9 @@ int RGWDedupManager::get_rados_objects() return ret; } else { - for (auto bucket_name : bucket_list){ - unique_ptr bkt; - ret = store->get_bucket(dpp, nullptr, "", bucket_name, &bkt, null_yield); + for (auto bucket_name : bucket_list) { + unique_ptr bucket; + ret = store->get_bucket(dpp, nullptr, "", bucket_name, &bucket, null_yield); if (ret < 0) { ldpp_dout(dpp, 0) << "ERROR: get_bucket() failed" << dendl; return ret; @@ -72,43 +179,59 @@ int RGWDedupManager::get_rados_objects() rgw::sal::Bucket::ListParams params; rgw::sal::Bucket::ListResults results; bool obj_trunc = true; - const std::string bucket_id = bkt->get_key().get_key(); + const string bucket_id = 
bucket->get_key().get_key(); while (obj_trunc) { - ret = bkt->list(dpp, params, MAX_OBJ_SCAN_SIZE, results, null_yield); + ret = bucket->list(dpp, params, MAX_OBJ_SCAN_SIZE, results, null_yield); if (ret < 0) { ldpp_dout(dpp, 0) << "ERROR: list() failed" << dendl; return ret; } for (auto obj : results.objs) { - ldpp_dout(dpp, 0) << "rgw_obj name: " << obj.key.name << dendl; + ldpp_dout(dpp, 10) << "rgw_obj name: " << obj.key.name << dendl; RGWObjectCtx obj_ctx(store); - unique_ptr rgw_obj = bkt->get_object(obj.key); - RGWRados::Object op_target(store->getRados(), bkt.get(), obj_ctx, rgw_obj.get()); + unique_ptr rgw_obj = bucket->get_object(obj.key); + RGWRados::Object op_target(store->getRados(), bucket.get(), + obj_ctx, rgw_obj.get()); RGWRados::Object::Stat stat_op(&op_target); - ret = stat_op.stat_async(dpp); - if (ret < 0) { - ldpp_dout(dpp, -1) << "ERROR: stat_async() returned error: " << - cpp_strerror(-ret) << dendl; - return ret; - } - ret = stat_op.wait(dpp); + ret = get_rados_objects(stat_op); if (ret < 0) { - if (ret != -ENOENT) { - ldpp_dout(dpp, -1) << "ERROR: stat_async() returned error: " << - cpp_strerror(-ret) << dendl; - } return ret; } RGWObjManifest& manifest = *stat_op.result.manifest; RGWObjManifest::obj_iterator miter; - for (miter = manifest.obj_begin(dpp); miter != manifest.obj_end(dpp); ++miter) { - const rgw_raw_obj& loc = miter.get_location().get_raw_obj(static_cast(store)); - rados_objs.emplace_back(loc.oid); - ldpp_dout(dpp, 0) << " rados_oid name: " << loc.oid << dendl; + for (miter = manifest.obj_begin(dpp); + miter != manifest.obj_end(dpp); + ++miter) { + const rgw_raw_obj& rados_obj + = miter.get_location() + .get_raw_obj(static_cast(store)); + + // do not allow duplicated objects in rados_objs + bool is_exist = false; + for (auto& it : rados_objs) { + if (it.object_name == rados_obj.oid) { + ldpp_dout(dpp, 20) << "get_raw_obj() got duplicated rados object (" + << rados_obj.oid << ")" << dendl; + is_exist = true; + continue; + } + 
} + if (!is_exist) { + target_rados_object obj{rados_obj.oid, rados_obj.pool.name}; + rados_objs.emplace_back(obj); + ldpp_dout(dpp, 10) << " rados_oid name: " << rados_obj.oid + << ", pool.name: " << rados_obj.pool.name << dendl; + } + + string base_pool_name = rados_obj.pool.name; + if (ioctx_map.find(base_pool_name) == ioctx_map.end()) { + append_ioctxs(rados_obj.pool); + set_dedup_tier(base_pool_name); + } } } @@ -121,4 +244,4 @@ int RGWDedupManager::get_rados_objects() store->meta_list_keys_complete(handle); return total_obj_cnt; -} \ No newline at end of file +} diff --git a/src/rgw/rgw_dedup_manager.h b/src/rgw/rgw_dedup_manager.h index 74b791d9fb157..47c3e294e5f66 100644 --- a/src/rgw/rgw_dedup_manager.h +++ b/src/rgw/rgw_dedup_manager.h @@ -11,29 +11,73 @@ using namespace std; +extern const string DEFAULT_CHUNK_POOL_POSTFIX; +extern const string DEFAULT_COLD_POOL_POSTFIX; +extern const string DEFAULT_CHUNK_SIZE; +extern const string DEFAULT_CHUNK_ALGO; +extern const string DEFAULT_FP_ALGO; +extern const string DEFAULT_HITSET_TYPE; + +struct target_rados_object { + string object_name; + string pool_name; +}; + class RGWDedupManager : public Thread { const DoutPrefixProvider* dpp; CephContext* cct; rgw::sal::RadosStore* store; bool down_flag; - vector rados_objs; + vector rados_objs; + + string chunk_pool_postfix; + string cold_pool_postfix; + string chunk_size; + string chunk_algo; + string fp_algo; + string hitset_type; + + /** + * There is a data_pool which is regarded as base-pool for a storage_classes. + * For dedup, a chunk-pool and a cold-pool are required for each base-pool. + * struct dedup_ioctx_set indicates the IoCtxs of the pools related to each other. 
+ */ + struct dedup_ioctx_set { + librados::IoCtx base_pool_ctx; + librados::IoCtx chunk_pool_ctx; + librados::IoCtx cold_pool_ctx; + }; + // sc data pool (base-pool name) : ioctx_set + map ioctx_map; public: RGWDedupManager(const DoutPrefixProvider* _dpp, CephContext* _cct, rgw::sal::RadosStore* _store) - : dpp(_dpp), cct(_cct), store(_store), down_flag(true) {} - ~RGWDedupManager() {} - void* entry() override; + : dpp(_dpp), cct(_cct), store(_store), down_flag(true), + chunk_pool_postfix(DEFAULT_CHUNK_POOL_POSTFIX), + cold_pool_postfix(DEFAULT_COLD_POOL_POSTFIX), + chunk_size(DEFAULT_CHUNK_SIZE), + chunk_algo(DEFAULT_CHUNK_ALGO), + fp_algo(DEFAULT_FP_ALGO), + hitset_type(DEFAULT_HITSET_TYPE) {} + virtual ~RGWDedupManager() override {} + virtual void* entry() override; + void stop(); void finalize(); bool going_down(); void initialize(); - void set_down_flag(bool new_flag) { down_flag = new_flag; } bool get_down_flag() { return down_flag; } - int get_rados_objects(); + +protected: + void set_dedup_tier(string base_pool_name); + int prepare_dedup_work(); + librados::IoCtx get_or_create_ioctx(rgw_pool pool); + void append_ioctxs(rgw_pool base_pool); + int get_rados_objects(RGWRados::Object::Stat& stat_op); }; #endif From ec479f116e01174bd010645d496a17ea5a8ca782 Mon Sep 17 00:00:00 2001 From: Sungmin Lee Date: Wed, 23 Nov 2022 18:07:26 +0900 Subject: [PATCH 04/15] Add skeleton codes of RGWDedupWorker and RGWChunkScrubWorker --- src/rgw/CMakeLists.txt | 3 +- src/rgw/rgw_dedup_manager.cc | 116 ++++++++++++++++++++++++++++++++--- src/rgw/rgw_dedup_manager.h | 34 ++++++++-- src/rgw/rgw_dedup_worker.cc | 81 ++++++++++++++++++++++++ src/rgw/rgw_dedup_worker.h | 85 +++++++++++++++++++++++++ 5 files changed, 304 insertions(+), 15 deletions(-) create mode 100644 src/rgw/rgw_dedup_worker.cc create mode 100644 src/rgw/rgw_dedup_worker.h diff --git a/src/rgw/CMakeLists.txt b/src/rgw/CMakeLists.txt index dd854a1c787b2..2ec8960993469 100644 --- a/src/rgw/CMakeLists.txt +++ 
b/src/rgw/CMakeLists.txt @@ -175,7 +175,8 @@ set(librgw_common_srcs rgw_tracer.cc rgw_lua_background.cc rgw_dedup.cc - rgw_dedup_manager.cc) + rgw_dedup_manager.cc + rgw_dedup_worker.cc) list(APPEND librgw_common_srcs store/immutable_config/store.cc diff --git a/src/rgw/rgw_dedup_manager.cc b/src/rgw/rgw_dedup_manager.cc index 8b308a0e97c69..c4e36a405c88d 100644 --- a/src/rgw/rgw_dedup_manager.cc +++ b/src/rgw/rgw_dedup_manager.cc @@ -8,8 +8,11 @@ #define dout_subsys ceph_subsys_rgw +const int DEFAULT_NUM_WORKERS = 3; +const int DEFAULT_SAMPLING_RATIO = 50; const int MAX_OBJ_SCAN_SIZE = 100; const int MAX_BUCKET_SCAN_SIZE = 100; +const int DEFAULT_DEDUP_SCRUB_RATIO = 10; const string DEFAULT_CHUNK_POOL_POSTFIX = "_chunk"; const string DEFAULT_COLD_POOL_POSTFIX = "_cold"; const string DEFAULT_CHUNK_SIZE = "16384"; @@ -19,7 +22,59 @@ const string DEFAULT_HITSET_TYPE = "bloom"; void RGWDedupManager::initialize() { - // TODO: initialize member variables of RGWDedupManager + for (int i = 0; i < num_workers; i++) { + auto worker = make_unique(dpp, cct, store, i); + workers.emplace_back(move(worker)); + } +} + +void RGWDedupManager::reset_workers(bool need_scrub) +{ + for (int i = 0; i < num_workers; i++) { + if (need_scrub) { + workers[i].reset(new RGWChunkScrubWorker(dpp, cct, store, i, num_workers)); + } + workers[i].reset(new RGWDedupWorker(dpp, cct, store, i)); + } +} + +vector RGWDedupManager::sample_rados_objects() +{ + size_t num_objs = get_num_rados_obj(); + vector sampled_indexes(num_objs); + // fill out vector to get sampled indexes + for (size_t i = 0; i < num_objs; i++) { + sampled_indexes[i] = i; + } + + unsigned seed = chrono::system_clock::now().time_since_epoch().count(); + shuffle(sampled_indexes.begin(), sampled_indexes.end(), default_random_engine(seed)); + size_t sampling_count = num_objs * sampling_ratio / 100; + sampled_indexes.resize(sampling_count); + + return sampled_indexes; +} + +void RGWDedupManager::hand_out_objects(vector sampled_indexes) +{ 
+ size_t num_objs_per_worker = sampled_indexes.size() / num_workers; + int remain_objs = sampled_indexes.size() % num_workers; + for (auto& worker: workers) { + worker->clear_objs(); + } + + vector>::iterator it = workers.begin(); + for (auto idx : sampled_indexes) { + (*it)->append_obj(rados_objs[idx]); + if ((*it)->get_num_objs() >= num_objs_per_worker) { + // append remain object for even distribution if remain_objs exists + if (remain_objs > 0) { + --remain_objs; + continue; + } + ++it; + } + } } void* RGWDedupManager::entry() @@ -28,14 +83,47 @@ void* RGWDedupManager::entry() while (!get_down_flag()) { int ret = 0; - assert(prepare_dedup_work() >= 0); - if (ret == 0) { - ldpp_dout(dpp, 2) << "not a single rados object has been found. do retry" << dendl; - sleep(3); - continue; + if (dedup_worked_cnt < dedup_scrub_ratio) { + ldpp_dout(dpp, 2) << "RGWDedupWorkers start" << dendl; + + assert(prepare_dedup_work() >= 0); + if (ret == 0 && get_num_rados_obj() == 0) { + ldpp_dout(dpp, 2) << "not a single rados object has been found. 
do retry" << dendl; + sleep(60); + continue; + } + + vector sampled_indexes = sample_rados_objects(); + hand_out_objects(sampled_indexes); + ++dedup_worked_cnt; + } + else { + ldpp_dout(dpp, 2) << "RGWChunkScrubWorkers start" << dendl; + + reset_workers(dedup_worked_cnt == dedup_scrub_ratio); + for (auto& worker : workers) { + worker->initialize(); + } + } + + // trigger RGWDedupWorkers + for (auto& worker : workers) + { + worker->set_run(true); + string name = worker->get_id(); + worker->create(name.c_str()); } - // TODO: do dedup work + // all RGWDedupWorkers synchronozed here + for (auto& w: workers) + { + w->join(); + } + + if (dedup_worked_cnt == dedup_scrub_ratio) { + dedup_worked_cnt = 0; + reset_workers(dedup_worked_cnt == dedup_scrub_ratio); + } sleep(3); } @@ -52,7 +140,10 @@ void RGWDedupManager::stop() void RGWDedupManager::finalize() { - // TODO: finalize member variables of RGWDedupManager + for (auto& worker : workers) { + worker.reset(); + } + workers.clear(); } librados::IoCtx RGWDedupManager::get_or_create_ioctx(rgw_pool pool) @@ -245,3 +336,12 @@ int RGWDedupManager::prepare_dedup_work() return total_obj_cnt; } + +int RGWDedupManager::set_sampling_ratio(int new_sampling_ratio) +{ + if (new_sampling_ratio <= 0 || new_sampling_ratio > 100) { + return -1; + } + sampling_ratio = new_sampling_ratio; + return 0; +} diff --git a/src/rgw/rgw_dedup_manager.h b/src/rgw/rgw_dedup_manager.h index 47c3e294e5f66..818e60e0df0f6 100644 --- a/src/rgw/rgw_dedup_manager.h +++ b/src/rgw/rgw_dedup_manager.h @@ -8,8 +8,10 @@ #include "common/Cond.h" #include "common/Thread.h" #include "rgw_sal_rados.h" +#include "rgw_dedup_worker.h" using namespace std; +using namespace librados; extern const string DEFAULT_CHUNK_POOL_POSTFIX; extern const string DEFAULT_COLD_POOL_POSTFIX; @@ -17,12 +19,16 @@ extern const string DEFAULT_CHUNK_SIZE; extern const string DEFAULT_CHUNK_ALGO; extern const string DEFAULT_FP_ALGO; extern const string DEFAULT_HITSET_TYPE; +extern const int 
DEFAULT_NUM_WORKERS; +extern const int DEFAULT_SAMPLING_RATIO; +extern const int DEFAULT_DEDUP_SCRUB_RATIO; struct target_rados_object { string object_name; string pool_name; }; +class Worker; class RGWDedupManager : public Thread { const DoutPrefixProvider* dpp; @@ -30,6 +36,7 @@ class RGWDedupManager : public Thread rgw::sal::RadosStore* store; bool down_flag; vector rados_objs; + vector> workers; string chunk_pool_postfix; string cold_pool_postfix; @@ -37,6 +44,10 @@ class RGWDedupManager : public Thread string chunk_algo; string fp_algo; string hitset_type; + int num_workers; + int sampling_ratio; + int dedup_scrub_ratio; + int dedup_worked_cnt; /** * There is a data_pool which is regarded as base-pool for a storage_classes. @@ -44,9 +55,9 @@ class RGWDedupManager : public Thread * struct dedup_ioctx_set indicates the IoCtxs of the pools related to each other. */ struct dedup_ioctx_set { - librados::IoCtx base_pool_ctx; - librados::IoCtx chunk_pool_ctx; - librados::IoCtx cold_pool_ctx; + IoCtx base_pool_ctx; + IoCtx chunk_pool_ctx; + IoCtx cold_pool_ctx; }; // sc data pool (base-pool name) : ioctx_set map ioctx_map; @@ -61,7 +72,11 @@ class RGWDedupManager : public Thread chunk_size(DEFAULT_CHUNK_SIZE), chunk_algo(DEFAULT_CHUNK_ALGO), fp_algo(DEFAULT_FP_ALGO), - hitset_type(DEFAULT_HITSET_TYPE) {} + hitset_type(DEFAULT_HITSET_TYPE), + num_workers(DEFAULT_NUM_WORKERS), + sampling_ratio(DEFAULT_SAMPLING_RATIO), + dedup_scrub_ratio(DEFAULT_DEDUP_SCRUB_RATIO), + dedup_worked_cnt(0) {} virtual ~RGWDedupManager() override {} virtual void* entry() override; @@ -72,12 +87,19 @@ class RGWDedupManager : public Thread void set_down_flag(bool new_flag) { down_flag = new_flag; } bool get_down_flag() { return down_flag; } -protected: + void reset_workers(bool need_scrub); void set_dedup_tier(string base_pool_name); int prepare_dedup_work(); - librados::IoCtx get_or_create_ioctx(rgw_pool pool); + IoCtx get_or_create_ioctx(rgw_pool pool); void append_ioctxs(rgw_pool 
base_pool); int get_rados_objects(RGWRados::Object::Stat& stat_op); + vector sample_rados_objects(); + void hand_out_objects(vector sampled_indexes); + + int set_num_workers(int new_num_workers); + int set_sampling_ratio(int new_sampling_ratio); + void append_rados_obj(target_rados_object new_obj) { rados_objs.emplace_back(new_obj); } + size_t get_num_rados_obj() { return rados_objs.size(); } }; #endif diff --git a/src/rgw/rgw_dedup_worker.cc b/src/rgw/rgw_dedup_worker.cc new file mode 100644 index 0000000000000..ba548061c7978 --- /dev/null +++ b/src/rgw/rgw_dedup_worker.cc @@ -0,0 +1,81 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab ft=cpp + +#include "rgw_dedup_worker.h" + +#define dout_subsys ceph_subsys_rgw + +void Worker::clear_objs() +{ + rados_objs.clear(); +} + +void Worker::append_obj(target_rados_object new_obj) +{ + rados_objs.emplace_back(new_obj); +} + +size_t Worker::get_num_objs() +{ + return rados_objs.size(); +} + +void Worker::set_run(bool run_status) +{ + is_run = run_status; +} + + +string RGWDedupWorker::get_id() +{ + return "DedupWorker_" + to_string(id); +} + +void RGWDedupWorker::initialize() +{ + +} + +void* RGWDedupWorker::entry() +{ + + return nullptr; +} + +void RGWDedupWorker::stop() +{ + +} + +void RGWDedupWorker::finalize() +{ + +} + + +string RGWChunkScrubWorker::get_id() +{ + return "ScrubWorker_" + to_string(id); +} + +void RGWChunkScrubWorker::initialize() +{ + +} + +void* RGWChunkScrubWorker::entry() +{ + + return nullptr; +} + +void RGWChunkScrubWorker::stop() +{ + +} + +void RGWChunkScrubWorker::finalize() +{ + +} + diff --git a/src/rgw/rgw_dedup_worker.h b/src/rgw/rgw_dedup_worker.h new file mode 100644 index 0000000000000..2be11e36b65eb --- /dev/null +++ b/src/rgw/rgw_dedup_worker.h @@ -0,0 +1,85 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab ft=cpp + +#ifndef CEPH_RGW_DEDUP_WORKER_H +#define CEPH_RGW_DEDUP_WORKER_H 
+ +#include "include/rados/librados.hpp" +#include "rgw_dedup_manager.h" + +using namespace std; +using namespace librados; + +struct target_rados_object; +class Worker : public Thread +{ +protected: + const DoutPrefixProvider* dpp; + CephContext* cct; + rgw::sal::RadosStore* store; + int id; + vector rados_objs; + bool is_run; + +public: + Worker(const DoutPrefixProvider* _dpp, + CephContext* _cct, + rgw::sal::RadosStore* _store, + int _id) + : dpp(_dpp), cct(_cct), store(_store), id(_id), is_run(false) {} + virtual ~Worker() {} + + virtual void* entry() = 0; + virtual void stop() = 0; + virtual void finalize() = 0; + virtual void initialize() = 0; + + void clear_objs(); + void append_obj(target_rados_object new_obj); + virtual string get_id() = 0; + size_t get_num_objs(); + void set_run(bool run_status); +}; + +class RGWDedupWorker : public Worker +{ + +public: + RGWDedupWorker(const DoutPrefixProvider* _dpp, + CephContext* _cct, + rgw::sal::RadosStore* _store, + int _id) + : Worker(_dpp, _cct, _store, _id) {} + virtual ~RGWDedupWorker() override {} + + virtual void* entry() override; + virtual void stop() override; + virtual void finalize() override; + virtual void initialize() override; + + virtual string get_id() override; +}; + +class RGWChunkScrubWorker : public Worker +{ + int num_threads; + +public: + RGWChunkScrubWorker(const DoutPrefixProvider* _dpp, + CephContext* _cct, + rgw::sal::RadosStore* _store, + int _id, + int _num_threads) + : Worker(_dpp, _cct, _store, _id), + num_threads(_num_threads) {} + virtual ~RGWChunkScrubWorker() override {} + + virtual void* entry() override; + virtual void stop() override; + virtual void initialize() override; + virtual void finalize() override; + + virtual string get_id() override; +}; + +#endif From 0f848a464439c1322adcb4f2ad3a28cd35aa9608 Mon Sep 17 00:00:00 2001 From: "daegon.yang" Date: Mon, 28 Nov 2022 15:22:41 +0900 Subject: [PATCH 05/15] Add unit tests for RGWDedupManager --- src/test/rgw/CMakeLists.txt | 16 
+++++++ src/test/rgw/test_rgw_dedup.cc | 82 ++++++++++++++++++++++++++++++++++ 2 files changed, 98 insertions(+) create mode 100644 src/test/rgw/test_rgw_dedup.cc diff --git a/src/test/rgw/CMakeLists.txt b/src/test/rgw/CMakeLists.txt index 55921737665d5..1643a8856c679 100644 --- a/src/test/rgw/CMakeLists.txt +++ b/src/test/rgw/CMakeLists.txt @@ -241,3 +241,19 @@ add_executable(unittest_rgw_lua test_rgw_lua.cc) add_ceph_unittest(unittest_rgw_lua) target_link_libraries(unittest_rgw_lua ${rgw_libs}) +# unittest_rgw_dedup +add_executable(unittest_rgw_dedup test_rgw_dedup.cc) +add_ceph_unittest(unittest_rgw_dedup) +target_link_libraries( + unittest_rgw_dedup + test_rgw_a + radostest-cxx + ${UNITTEST_LIBS} + ${rgw_libs} + cls_rgw_client + librados + global + ${CURL_LIBRARIES} + ${EXPAT_LIBRARIES} + ${CMAKE_DL_LIBS} +) \ No newline at end of file diff --git a/src/test/rgw/test_rgw_dedup.cc b/src/test/rgw/test_rgw_dedup.cc new file mode 100644 index 0000000000000..bd7d1cebae598 --- /dev/null +++ b/src/test/rgw/test_rgw_dedup.cc @@ -0,0 +1,82 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include "gtest/gtest.h" +#include "test/librados/test_cxx.h" +#include "test_rgw_common.h" +#include "rgw/rgw_dedup_manager.h" +#include "rgw/rgw_dedup_worker.h" +#include "rgw/rgw_sal_rados.h" +#include "common/dout.h" + + +auto cct = new CephContext(CEPH_ENTITY_TYPE_CLIENT); +const DoutPrefix dp(cct, 1, "test rgw dedup: "); + +class RGWDedupTest : public ::testing::Test +{ +protected: + rgw::sal::RadosStore store; + + static void SetUpTestCase() {} + static void TearDownTestCase() {} + + void SetUp() override {} + void TearDown() override {} + +public: + RGWDedupTest() {} + ~RGWDedupTest() override {} +}; + +TEST_F(RGWDedupTest, set_sampling_ratio) +{ + RGWDedupManager dedupmanager(&dp, cct, &store); + + EXPECT_EQ(0, dedupmanager.set_sampling_ratio(1)); + EXPECT_EQ(0, dedupmanager.set_sampling_ratio(100)); + EXPECT_EQ(-1, 
dedupmanager.set_sampling_ratio(0)); + EXPECT_EQ(-1, dedupmanager.set_sampling_ratio(101)); + EXPECT_EQ(-1, dedupmanager.set_sampling_ratio(-1000)); + EXPECT_EQ(-1, dedupmanager.set_sampling_ratio(1000)); +} + +TEST_F(RGWDedupTest, sample_objects) +{ + RGWDedupManager dedup_manager(&dp, cct, &store); + + int num_objs = 10; + for (int i = 0; i < num_objs; ++i) { + string oid = "obj_" + to_string(i); + target_rados_object obj{oid, "test_pool"}; + dedup_manager.append_rados_obj(obj); + } + EXPECT_EQ(num_objs, dedup_manager.get_num_rados_obj()); + + int sampling_ratio = 30; + EXPECT_EQ(0, dedup_manager.set_sampling_ratio(sampling_ratio)); + vector sampled_idx = dedup_manager.sample_rados_objects(); + EXPECT_EQ(num_objs * sampling_ratio / 100, sampled_idx.size()); + + sampling_ratio = 100; + EXPECT_EQ(0, dedup_manager.set_sampling_ratio(sampling_ratio)); + sampled_idx.clear(); + sampled_idx = dedup_manager.sample_rados_objects(); + EXPECT_EQ(num_objs * sampling_ratio / 100, sampled_idx.size()); +} + + +TEST_F(RGWDedupTest, get_worker_id) +{ + RGWDedupWorker dedupworker(&dp, cct, &store, 1234); + EXPECT_EQ("DedupWorker_1234", dedupworker.get_id()); + + RGWChunkScrubWorker scrubworker(&dp, cct, &store, 1234, 12345); + EXPECT_EQ("ScrubWorker_1234", scrubworker.get_id()); +} + + +int main (int argc, char** argv) { + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} From ec8ebb89b8b64677e427f1fd86d7b00a26aa830c Mon Sep 17 00:00:00 2001 From: Sungmin Lee Date: Fri, 16 Dec 2022 14:59:53 +0900 Subject: [PATCH 06/15] Remove unnecessary code --- src/rgw/rgw_dedup_manager.cc | 76 +++++++++++++----------------------- src/rgw/rgw_dedup_manager.h | 11 +----- src/rgw/rgw_dedup_worker.cc | 36 ++++++++--------- src/rgw/rgw_dedup_worker.h | 16 ++++---- 4 files changed, 53 insertions(+), 86 deletions(-) diff --git a/src/rgw/rgw_dedup_manager.cc b/src/rgw/rgw_dedup_manager.cc index c4e36a405c88d..a4e3dacf8196d 100644 --- a/src/rgw/rgw_dedup_manager.cc +++ 
b/src/rgw/rgw_dedup_manager.cc @@ -13,12 +13,10 @@ const int DEFAULT_SAMPLING_RATIO = 50; const int MAX_OBJ_SCAN_SIZE = 100; const int MAX_BUCKET_SCAN_SIZE = 100; const int DEFAULT_DEDUP_SCRUB_RATIO = 10; -const string DEFAULT_CHUNK_POOL_POSTFIX = "_chunk"; const string DEFAULT_COLD_POOL_POSTFIX = "_cold"; const string DEFAULT_CHUNK_SIZE = "16384"; const string DEFAULT_CHUNK_ALGO = "fastcdc"; const string DEFAULT_FP_ALGO = "sha1"; -const string DEFAULT_HITSET_TYPE = "bloom"; void RGWDedupManager::initialize() { @@ -28,16 +26,6 @@ void RGWDedupManager::initialize() } } -void RGWDedupManager::reset_workers(bool need_scrub) -{ - for (int i = 0; i < num_workers; i++) { - if (need_scrub) { - workers[i].reset(new RGWChunkScrubWorker(dpp, cct, store, i, num_workers)); - } - workers[i].reset(new RGWDedupWorker(dpp, cct, store, i)); - } -} - vector RGWDedupManager::sample_rados_objects() { size_t num_objs = get_num_rados_obj(); @@ -95,36 +83,40 @@ void* RGWDedupManager::entry() vector sampled_indexes = sample_rados_objects(); hand_out_objects(sampled_indexes); + + // trigger RGWDedupWorkers + for (auto& worker : dedup_workers) { + worker->set_run(true); + string name = worker->get_id(); + worker->create(name.c_str()); + } + + // all RGWDedupWorkers synchronized here + for (auto& worker : dedup_workers) { + worker->join(); + } ++dedup_worked_cnt; } else { ldpp_dout(dpp, 2) << "RGWChunkScrubWorkers start" << dendl; - reset_workers(dedup_worked_cnt == dedup_scrub_ratio); - for (auto& worker : workers) { + for (auto& worker : scrub_workers) { worker->initialize(); } - } - - // trigger RGWDedupWorkers - for (auto& worker : workers) - { - worker->set_run(true); - string name = worker->get_id(); - worker->create(name.c_str()); - } - // all RGWDedupWorkers synchronozed here - for (auto& w: workers) - { - w->join(); - } + // trigger RGWChunkScrubWorkers + for (auto& worker : scrub_workers) { + worker->set_run(true); + string name = worker->get_id(); + worker->create(name.c_str()); + 
} - if (dedup_worked_cnt == dedup_scrub_ratio) { + // all RGWChunkScrubWorkers synchronozed here + for (auto& worker : scrub_workers) { + worker->join(); + } dedup_worked_cnt = 0; - reset_workers(dedup_worked_cnt == dedup_scrub_ratio); - } - + } sleep(3); } @@ -159,24 +151,21 @@ void RGWDedupManager::append_ioctxs(rgw_pool base_pool) string base_pool_name = base_pool.name; librados::IoCtx base_ioctx = get_or_create_ioctx(base_pool); - string chunk_pool_name = base_pool_name + chunk_pool_postfix; - librados::IoCtx chunk_ioctx = get_or_create_ioctx(rgw_pool(chunk_pool_name)); - string cold_pool_name = base_pool_name + cold_pool_postfix; librados::IoCtx cold_ioctx = get_or_create_ioctx(rgw_pool(cold_pool_name)); - dedup_ioctx_set pool_set{base_ioctx, chunk_ioctx, cold_ioctx}; + dedup_ioctx_set pool_set{base_ioctx, cold_ioctx}; ioctx_map.insert({base_pool_name, pool_set}); } void RGWDedupManager::set_dedup_tier(string base_pool_name) { - string chunk_pool_name = ioctx_map[base_pool_name].chunk_pool_ctx.get_pool_name(); + string cold_pool_name = ioctx_map[base_pool_name].cold_pool_ctx.get_pool_name(); librados::Rados* rados = store->getRados()->get_rados_handle(); bufferlist inbl; int ret = rados->mon_command( "{\"prefix\": \"osd pool set\", \"pool\": \"" + base_pool_name - + "\",\"var\": \"dedup_tier\", \"val\": \"" + chunk_pool_name + + "\",\"var\": \"dedup_tier\", \"val\": \"" + cold_pool_name + "\"}", inbl, nullptr, nullptr); if (ret < 0) { ldpp_dout(dpp, 0) << "ERROR: failed to set dedup_tier" << dendl; @@ -206,15 +195,6 @@ void RGWDedupManager::set_dedup_tier(string base_pool_name) if (ret < 0) { ldpp_dout(dpp, 0) << "ERROR: failed to set fingerprint_algorithm" << dendl; } - - ret = rados->mon_command( - "{\"prefix\": \"osd pool set\", \"pool\": \"" + base_pool_name - + "\",\"var\": \"hit_set_type\", \"val\": \"" + hitset_type - + "\"}", inbl, nullptr, nullptr); - - if (ret < 0) { - ldpp_dout(dpp, 0) << "ERROR: failed to set hit_set_type" << dendl; - } } int 
RGWDedupManager::get_rados_objects(RGWRados::Object::Stat& stat_op) @@ -308,7 +288,7 @@ int RGWDedupManager::prepare_dedup_work() ldpp_dout(dpp, 20) << "get_raw_obj() got duplicated rados object (" << rados_obj.oid << ")" << dendl; is_exist = true; - continue; + break; } } if (!is_exist) { diff --git a/src/rgw/rgw_dedup_manager.h b/src/rgw/rgw_dedup_manager.h index 818e60e0df0f6..a8c55ad1507a6 100644 --- a/src/rgw/rgw_dedup_manager.h +++ b/src/rgw/rgw_dedup_manager.h @@ -13,12 +13,10 @@ using namespace std; using namespace librados; -extern const string DEFAULT_CHUNK_POOL_POSTFIX; extern const string DEFAULT_COLD_POOL_POSTFIX; extern const string DEFAULT_CHUNK_SIZE; extern const string DEFAULT_CHUNK_ALGO; extern const string DEFAULT_FP_ALGO; -extern const string DEFAULT_HITSET_TYPE; extern const int DEFAULT_NUM_WORKERS; extern const int DEFAULT_SAMPLING_RATIO; extern const int DEFAULT_DEDUP_SCRUB_RATIO; @@ -38,12 +36,10 @@ class RGWDedupManager : public Thread vector rados_objs; vector> workers; - string chunk_pool_postfix; string cold_pool_postfix; string chunk_size; string chunk_algo; string fp_algo; - string hitset_type; int num_workers; int sampling_ratio; int dedup_scrub_ratio; @@ -51,12 +47,11 @@ class RGWDedupManager : public Thread /** * There is a data_pool which is regarded as base-pool for a storage_classes. - * For dedup, a chunk-pool and a cold-pool are required for each base-pool. - * struct dedup_ioctx_set indicates the IoCtxs of the pools related to each other. + * For a deduplication, a cold-pool is required for each base-pool. + * dedup_ioctx_set indicates the IoCtxs of the pools related to each other. 
*/ struct dedup_ioctx_set { IoCtx base_pool_ctx; - IoCtx chunk_pool_ctx; IoCtx cold_pool_ctx; }; // sc data pool (base-pool name) : ioctx_set @@ -67,12 +62,10 @@ class RGWDedupManager : public Thread CephContext* _cct, rgw::sal::RadosStore* _store) : dpp(_dpp), cct(_cct), store(_store), down_flag(true), - chunk_pool_postfix(DEFAULT_CHUNK_POOL_POSTFIX), cold_pool_postfix(DEFAULT_COLD_POOL_POSTFIX), chunk_size(DEFAULT_CHUNK_SIZE), chunk_algo(DEFAULT_CHUNK_ALGO), fp_algo(DEFAULT_FP_ALGO), - hitset_type(DEFAULT_HITSET_TYPE), num_workers(DEFAULT_NUM_WORKERS), sampling_ratio(DEFAULT_SAMPLING_RATIO), dedup_scrub_ratio(DEFAULT_DEDUP_SCRUB_RATIO), diff --git a/src/rgw/rgw_dedup_worker.cc b/src/rgw/rgw_dedup_worker.cc index ba548061c7978..cfecdcbbb2714 100644 --- a/src/rgw/rgw_dedup_worker.cc +++ b/src/rgw/rgw_dedup_worker.cc @@ -5,24 +5,15 @@ #define dout_subsys ceph_subsys_rgw -void Worker::clear_objs() -{ - rados_objs.clear(); -} - -void Worker::append_obj(target_rados_object new_obj) -{ - rados_objs.emplace_back(new_obj); -} -size_t Worker::get_num_objs() +void Worker::set_run(bool run_status) { - return rados_objs.size(); + is_run = run_status; } -void Worker::set_run(bool run_status) +void Worker::stop() { - is_run = run_status; + is_run = false; } @@ -42,14 +33,24 @@ void* RGWDedupWorker::entry() return nullptr; } -void RGWDedupWorker::stop() +void RGWDedupWorker::finalize() { } -void RGWDedupWorker::finalize() +void RGWDedupWorker::clear_objs() { + rados_objs.clear(); +} +void RGWDedupWorker::append_obj(target_rados_object new_obj) +{ + rados_objs.emplace_back(new_obj); +} + +size_t RGWDedupWorker::get_num_objs() +{ + return rados_objs.size(); } @@ -69,11 +70,6 @@ void* RGWChunkScrubWorker::entry() return nullptr; } -void RGWChunkScrubWorker::stop() -{ - -} - void RGWChunkScrubWorker::finalize() { diff --git a/src/rgw/rgw_dedup_worker.h b/src/rgw/rgw_dedup_worker.h index 2be11e36b65eb..f6a3f8e8c7edf 100644 --- a/src/rgw/rgw_dedup_worker.h +++ 
b/src/rgw/rgw_dedup_worker.h @@ -10,7 +10,6 @@ using namespace std; using namespace librados; -struct target_rados_object; class Worker : public Thread { protected: @@ -18,7 +17,6 @@ CephContext* cct; rgw::sal::RadosStore* store; int id; - vector rados_objs; bool is_run; public: @@ -27,22 +25,21 @@ rgw::sal::RadosStore* _store, int _id) : dpp(_dpp), cct(_cct), store(_store), id(_id), is_run(false) {} - virtual ~Worker() {} + virtual ~Worker() {} virtual void* entry() = 0; - virtual void stop() = 0; virtual void finalize() = 0; virtual void initialize() = 0; + void stop(); - void clear_objs(); - void append_obj(target_rados_object new_obj); virtual string get_id() = 0; - size_t get_num_objs(); void set_run(bool run_status); }; +struct target_rados_object; class RGWDedupWorker : public Worker { + vector rados_objs; public: RGWDedupWorker(const DoutPrefixProvider* _dpp, @@ -53,11 +50,13 @@ virtual ~RGWDedupWorker() override {} virtual void* entry() override; - virtual void stop() override; virtual void finalize() override; virtual void initialize() override; virtual string get_id() override; + void clear_objs(); + void append_obj(target_rados_object new_obj); + size_t get_num_objs(); }; class RGWChunkScrubWorker : public Worker @@ -75,7 +74,6 @@ virtual ~RGWChunkScrubWorker() override {} virtual void* entry() override; - virtual void stop() override; virtual void initialize() override; virtual void finalize() override; From ced67b9e0ffb767aa76da670010007f0f01c6bb8 Mon Sep 17 00:00:00 2001 From: Sungmin Lee Date: Fri, 16 Dec 2022 15:47:12 +0900 Subject: [PATCH 07/15] Add prepare_scrub_work() which passes each scrub worker its own shard boundaries. 
--- src/rgw/rgw_dedup_manager.cc | 78 ++++++++++++++++++++++++++++++++---- src/rgw/rgw_dedup_manager.h | 9 +++-- 2 files changed, 76 insertions(+), 11 deletions(-) diff --git a/src/rgw/rgw_dedup_manager.cc b/src/rgw/rgw_dedup_manager.cc index a4e3dacf8196d..5add741a12176 100644 --- a/src/rgw/rgw_dedup_manager.cc +++ b/src/rgw/rgw_dedup_manager.cc @@ -5,6 +5,7 @@ #include "rgw_dedup_manager.h" #include "rgw_rados.h" +#include "include/rados/librados.h" #define dout_subsys ceph_subsys_rgw @@ -18,11 +19,14 @@ const string DEFAULT_CHUNK_SIZE = "16384"; const string DEFAULT_CHUNK_ALGO = "fastcdc"; const string DEFAULT_FP_ALGO = "sha1"; + void RGWDedupManager::initialize() { for (int i = 0; i < num_workers; i++) { - auto worker = make_unique(dpp, cct, store, i); - workers.emplace_back(move(worker)); + auto dedup_worker = make_unique(dpp, cct, store, i); + dedup_workers.emplace_back(move(dedup_worker)); + auto scrub_worker = make_unique(dpp, cct, store, i, num_workers); + scrub_workers.emplace_back(move(scrub_worker)); } } @@ -47,11 +51,11 @@ void RGWDedupManager::hand_out_objects(vector sampled_indexes) { size_t num_objs_per_worker = sampled_indexes.size() / num_workers; int remain_objs = sampled_indexes.size() % num_workers; - for (auto& worker: workers) { + for (auto& worker: dedup_workers) { worker->clear_objs(); } - vector>::iterator it = workers.begin(); + vector>::iterator it = dedup_workers.begin(); for (auto idx : sampled_indexes) { (*it)->append_obj(rados_objs[idx]); if ((*it)->get_num_objs() >= num_objs_per_worker) { @@ -65,6 +69,61 @@ void RGWDedupManager::hand_out_objects(vector sampled_indexes) } } +struct cold_pool_info_t; +/* + * append cold pool information which is required to get chunk objects + * in order that each RGWChunkScrubWorker can get their own objects in cold pool + */ +int RGWDedupManager::prepare_scrub_work() +{ + int ret = 0; + Rados* rados = store->getRados()->get_rados_handle(); + cold_pool_info_t cold_pool_info; + list cold_pool_names; + 
map cold_pool_stats; + map cold_to_base; // cold_pool_name : base_pool_name + + for (const auto& [base_pool_name, ioctxs] : ioctx_map) { + string cold_pool_name = ioctxs.cold_pool_ctx.get_pool_name(); + cold_pool_names.emplace_back(cold_pool_name); + cold_to_base[cold_pool_name] = base_pool_name; + } + + ret = rados->get_pool_stats(cold_pool_names, cold_pool_stats); + if (ret < 0) { + ldpp_dout(dpp, 0) << "error fetching pool stats: " << cpp_strerror(ret) << dendl; + return ret; + } + + for (const auto& [cold_pool_name, pool_stat] : cold_pool_stats) { + if (pool_stat.num_objects <= 0) { + ldpp_dout(dpp, 2) << "cold pool (" << cold_pool_name << ") is empty" << dendl; + continue; + } + + cold_pool_info_t cold_pool_info; + ObjectCursor pool_begin, pool_end; + string base_pool_name = cold_to_base[cold_pool_name]; + + IoCtx cold_ioctx = ioctx_map[base_pool_name].cold_pool_ctx; + pool_begin = cold_ioctx.object_list_begin(); + pool_end = cold_ioctx.object_list_end(); + cold_pool_info.ioctx = cold_ioctx; + cold_pool_info.num_objs = pool_stat.num_objects; + + for (int i = 0; i < num_workers; ++i) { + ObjectCursor shard_begin, shard_end; + cold_ioctx.object_list_slice(pool_begin, pool_end, i, num_workers, + &shard_begin, &shard_end); + cold_pool_info.shard_begin = shard_begin; + cold_pool_info.shard_end = shard_end; + + scrub_workers[i]->append_cold_pool_info(cold_pool_info); + } + } + return ret; +} + void* RGWDedupManager::entry() { ldpp_dout(dpp, 2) << "RGWDedupManager started" << dendl; @@ -101,8 +160,9 @@ void* RGWDedupManager::entry() ldpp_dout(dpp, 2) << "RGWChunkScrubWorkers start" << dendl; for (auto& worker : scrub_workers) { - worker->initialize(); + worker->clear_chunk_pool_info(); } + prepare_scrub_work(); // trigger RGWChunkScrubWorkers for (auto& worker : scrub_workers) { @@ -132,10 +192,12 @@ void RGWDedupManager::stop() void RGWDedupManager::finalize() { - for (auto& worker : workers) { - worker.reset(); + for (int i = 0; i < num_workers; ++i) { + 
dedup_workers[i].reset(); + scrub_workers[i].reset(); } - workers.clear(); + dedup_workers.clear(); + scrub_workers.clear(); } librados::IoCtx RGWDedupManager::get_or_create_ioctx(rgw_pool pool) diff --git a/src/rgw/rgw_dedup_manager.h b/src/rgw/rgw_dedup_manager.h index a8c55ad1507a6..a94da000222ae 100644 --- a/src/rgw/rgw_dedup_manager.h +++ b/src/rgw/rgw_dedup_manager.h @@ -26,7 +26,8 @@ struct target_rados_object { string pool_name; }; -class Worker; +class RGWDedupWorker; +class RGWChunkScrubWorker; class RGWDedupManager : public Thread { const DoutPrefixProvider* dpp; @@ -34,7 +35,9 @@ class RGWDedupManager : public Thread rgw::sal::RadosStore* store; bool down_flag; vector rados_objs; - vector> workers; + + vector> dedup_workers; + vector> scrub_workers; string cold_pool_postfix; string chunk_size; @@ -80,7 +83,6 @@ class RGWDedupManager : public Thread void set_down_flag(bool new_flag) { down_flag = new_flag; } bool get_down_flag() { return down_flag; } - void reset_workers(bool need_scrub); void set_dedup_tier(string base_pool_name); int prepare_dedup_work(); IoCtx get_or_create_ioctx(rgw_pool pool); @@ -93,6 +95,7 @@ class RGWDedupManager : public Thread int set_sampling_ratio(int new_sampling_ratio); void append_rados_obj(target_rados_object new_obj) { rados_objs.emplace_back(new_obj); } size_t get_num_rados_obj() { return rados_objs.size(); } + int prepare_scrub_work(); }; #endif From 56b7196d490200e8bbd7ec44737aa251d52f4e15 Mon Sep 17 00:00:00 2001 From: ssdohammer Date: Mon, 19 Dec 2022 01:06:01 +0900 Subject: [PATCH 08/15] Add searching chunk objects and repairing objects with mismatched references. 
--- src/rgw/CMakeLists.txt | 2 + src/rgw/rgw_dedup_manager.cc | 2 +- src/rgw/rgw_dedup_worker.cc | 128 ++++++++++++++++++++++++++++++++++- src/rgw/rgw_dedup_worker.h | 25 +++++-- 4 files changed, 151 insertions(+), 6 deletions(-) diff --git a/src/rgw/CMakeLists.txt b/src/rgw/CMakeLists.txt index 2ec8960993469..9e2c4bee96ad1 100644 --- a/src/rgw/CMakeLists.txt +++ b/src/rgw/CMakeLists.txt @@ -233,6 +233,8 @@ target_link_libraries(rgw_common cls_timeindex_client cls_user_client cls_version_client + cls_cas_client + cls_cas_internal librados rt fmt::fmt diff --git a/src/rgw/rgw_dedup_manager.cc b/src/rgw/rgw_dedup_manager.cc index 5add741a12176..f9d1a88727128 100644 --- a/src/rgw/rgw_dedup_manager.cc +++ b/src/rgw/rgw_dedup_manager.cc @@ -35,7 +35,7 @@ vector RGWDedupManager::sample_rados_objects() size_t num_objs = get_num_rados_obj(); vector sampled_indexes(num_objs); // fill out vector to get sampled indexes - for (size_t i = 0; i < num_objs; i++) { + for (size_t i = 0; i < num_objs; ++i) { sampled_indexes[i] = i; } diff --git a/src/rgw/rgw_dedup_worker.cc b/src/rgw/rgw_dedup_worker.cc index cfecdcbbb2714..fd479201dde66 100644 --- a/src/rgw/rgw_dedup_worker.cc +++ b/src/rgw/rgw_dedup_worker.cc @@ -2,6 +2,8 @@ // vim: ts=8 sw=2 smarttab ft=cpp #include "rgw_dedup_worker.h" +#include "cls/cas/cls_cas_internal.h" +#include "cls/cas/cls_cas_client.h" #define dout_subsys ceph_subsys_rgw @@ -64,9 +66,129 @@ void RGWChunkScrubWorker::initialize() } -void* RGWChunkScrubWorker::entry() +int RGWChunkScrubWorker::do_chunk_repair(IoCtx& cold_ioctx, + const string chunk_obj_name, + const hobject_t src_obj, + int chunk_ref_cnt, + int source_ref_cnt) { + int ret = 0; + assert(chunk_ref_cnt >= source_ref_cnt); + + while (chunk_ref_cnt != source_ref_cnt) { + ObjectWriteOperation op; + cls_cas_chunk_put_ref(op, src_obj); + --chunk_ref_cnt; + ret = cold_ioctx.operate(chunk_obj_name, &op); + if (ret < 0) { + return ret; + } + } + return ret; +} +/* + * - chunk object: A part of the 
source object that is created by doing deup operation on it. + * It has a reference list containing its' source objects. + * - source object: An original object of its' chunk objects. It has its chunk information + * in a chunk_map. + */ +void* RGWChunkScrubWorker::entry() +{ + Rados* rados = store->getRados()->get_rados_handle(); + ldpp_dout(dpp, 10) << "ScrubWorker_" << id << " starts with " << cold_pool_info.size() + << " infos" << dendl; + + // get sharded chunk objects from all cold pools + for (auto& cold_pool : cold_pool_info) { + ldpp_dout(dpp, 10) << "cold pool (" << cold_pool.ioctx.get_pool_name() + << ") has " << cold_pool.num_objs << " objects" << dendl; + + IoCtx cold_ioctx = cold_pool.ioctx; + ObjectCursor obj_cursor = cold_pool.shard_begin; + while (obj_cursor < cold_pool.shard_end) { + vector obj_shard; + int ret = cold_ioctx.object_list(obj_cursor, cold_pool.shard_end, MAX_OBJ_SCAN_SIZE, {}, + &obj_shard, &obj_cursor); + if (ret < 0) { + ldpp_dout(dpp, 0) << "error object_list: " << cpp_strerror(ret) << dendl; + return nullptr; + } + + for (const auto& obj : obj_shard) { + auto cold_oid = obj.oid; + + chunk_refs_t refs; + bufferlist bl; + ret = cold_ioctx.getxattr(cold_oid, CHUNK_REFCOUNT_ATTR, bl); + if (ret < 0) { + // non-chunk objects are not targets of a RGWChunkScrubWorker + ldpp_dout(dpp, 0) << "object " << cold_oid << " getxattr failed" << dendl; + continue; + } + auto p = bl.cbegin(); + decode(refs, p); + + // do not allow other types except for TYPE_BY_OBJECT + if (refs.get_type() != chunk_refs_t::TYPE_BY_OBJECT) { + continue; + } + + chunk_refs_by_object_t* chunk_refs = + static_cast(refs.r.get()); + + set src_obj_set(chunk_refs->by_object.begin(), + chunk_refs->by_object.end()); + for (auto& src_obj : src_obj_set) { + IoCtx src_ioctx; + // get reference count that chunk object is pointing a src object + int chunk_ref_cnt = chunk_refs->by_object.count(src_obj); + int src_ref_cnt = -1; + + ret = rados->ioctx_create2(src_obj.pool, 
src_ioctx); + if (ret < 0) { + ldpp_dout(dpp, 0) << cold_oid << " reference " << src_obj + << ": referencing pool does not exist" << dendl; + src_ref_cnt = 0; + } + else { + // get reference count that src object is pointing a chunk object + src_ref_cnt = cls_cas_references_chunk(src_ioctx, src_obj.oid.name, cold_oid); + if (src_ref_cnt < 0) { + if (src_ref_cnt == -ENOENT || src_ref_cnt == -EINVAL) { + ldpp_dout(dpp, 2) << "chunk (" << cold_oid << ") is referencing " << src_obj + << ": referencing object missing" << dendl; + src_ref_cnt = 0; + } + else if (src_ref_cnt == -ENOLINK) { + ldpp_dout(dpp, 2) << "chunk (" << cold_oid << ") is referencing " << src_obj + << ": referencing object does not reference this chunk" << dendl; + src_ref_cnt = 0; + } + else { + ldpp_dout(dpp, 2) << "cls_cas_references_chunk() fail: " + << strerror(src_ref_cnt) << dendl; + continue; + } + } + } + + ldpp_dout(dpp, 10) << "ScrubWorker_" << id << " chunk obj: " << cold_oid + << ", src obj: " << src_obj.oid.name << ", src pool: " << src_obj.pool + << ", chunk_ref_cnt: " << chunk_ref_cnt << ", src_ref_cnt: " << src_ref_cnt + << dendl; + + if (chunk_ref_cnt != src_ref_cnt) { + ret = do_chunk_repair(cold_ioctx, cold_oid, src_obj, chunk_ref_cnt, src_ref_cnt); + if (ret < 0) { + ldpp_dout(dpp, 0) << "do_chunk_repair fail: " << cpp_strerror(ret) << dendl; + continue; + } + } + } + } + } + } return nullptr; } @@ -75,3 +197,7 @@ void RGWChunkScrubWorker::finalize() } +void RGWChunkScrubWorker::append_cold_pool_info(cold_pool_info_t new_pool_info) +{ + cold_pool_info.emplace_back(new_pool_info); +} diff --git a/src/rgw/rgw_dedup_worker.h b/src/rgw/rgw_dedup_worker.h index f6a3f8e8c7edf..981d6b3cc3c0f 100644 --- a/src/rgw/rgw_dedup_worker.h +++ b/src/rgw/rgw_dedup_worker.h @@ -7,9 +7,13 @@ #include "include/rados/librados.hpp" #include "rgw_dedup_manager.h" +extern const int MAX_OBJ_SCAN_SIZE; + using namespace std; using namespace librados; +struct target_rados_object; +struct dedup_ioctx_set; 
class Worker : public Thread { protected: @@ -36,16 +40,15 @@ class Worker : public Thread void set_run(bool run_status); }; -struct target_rados_object; class RGWDedupWorker : public Worker { vector rados_objs; public: RGWDedupWorker(const DoutPrefixProvider* _dpp, - CephContext* _cct, - rgw::sal::RadosStore* _store, - int _id) + CephContext* _cct, + rgw::sal::RadosStore* _store, + int _id) : Worker(_dpp, _cct, _store, _id) {} virtual ~RGWDedupWorker() override {} @@ -59,9 +62,18 @@ class RGWDedupWorker : public Worker size_t get_num_objs(); }; +struct cold_pool_info_t +{ + IoCtx ioctx; + uint64_t num_objs; + ObjectCursor shard_begin; + ObjectCursor shard_end; +}; + class RGWChunkScrubWorker : public Worker { int num_threads; + vector cold_pool_info; public: RGWChunkScrubWorker(const DoutPrefixProvider* _dpp, @@ -78,6 +90,11 @@ class RGWChunkScrubWorker : public Worker virtual void finalize() override; virtual string get_id() override; + void append_cold_pool_info(cold_pool_info_t cold_pool_info); + void clear_chunk_pool_info() {cold_pool_info.clear(); } + int do_chunk_repair(IoCtx& cold_ioctx, const string chunk_obj_name, + const hobject_t src_obj, int chunk_ref_cnt, + int src_ref_cnt); }; #endif From 772288b81ee74a99a91ca5b96498d374d937953e Mon Sep 17 00:00:00 2001 From: Sungmin Lee Date: Tue, 17 Jan 2023 13:44:27 +0900 Subject: [PATCH 09/15] Fix hand_out_objects() bug --- src/rgw/rgw_dedup_manager.cc | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/src/rgw/rgw_dedup_manager.cc b/src/rgw/rgw_dedup_manager.cc index f9d1a88727128..f260d4120e09b 100644 --- a/src/rgw/rgw_dedup_manager.cc +++ b/src/rgw/rgw_dedup_manager.cc @@ -58,7 +58,7 @@ void RGWDedupManager::hand_out_objects(vector sampled_indexes) vector>::iterator it = dedup_workers.begin(); for (auto idx : sampled_indexes) { (*it)->append_obj(rados_objs[idx]); - if ((*it)->get_num_objs() >= num_objs_per_worker) { + if ((*it)->get_num_objs() == num_objs_per_worker) { // append remain object 
for even distribution if remain_objs exists if (remain_objs > 0) { --remain_objs; @@ -66,6 +66,9 @@ void RGWDedupManager::hand_out_objects(vector sampled_indexes) } ++it; } + else if ((*it)->get_num_objs() > num_objs_per_worker) { + ++it; + } } } From dd23590c192fb11371a10285d4ca22b5797edbd5 Mon Sep 17 00:00:00 2001 From: Sungmin Lee Date: Wed, 18 Jan 2023 14:22:19 +0900 Subject: [PATCH 10/15] Add RGWIOTracker --- src/osd/HitSet.h | 17 +++++ src/rgw/CMakeLists.txt | 3 +- src/rgw/rgw_dedup.cc | 6 ++ src/rgw/rgw_dedup.h | 2 + src/rgw/rgw_dedup_iotracker.cc | 126 +++++++++++++++++++++++++++++++++ src/rgw/rgw_dedup_iotracker.h | 67 ++++++++++++++++++ src/rgw/rgw_dedup_manager.cc | 17 +++++ src/rgw/rgw_dedup_manager.h | 4 ++ src/test/rgw/test_rgw_dedup.cc | 42 +++++++++++ 9 files changed, 283 insertions(+), 1 deletion(-) create mode 100644 src/rgw/rgw_dedup_iotracker.cc create mode 100644 src/rgw/rgw_dedup_iotracker.h diff --git a/src/osd/HitSet.h b/src/osd/HitSet.h index dedc45ed471dd..b10163b27b028 100644 --- a/src/osd/HitSet.h +++ b/src/osd/HitSet.h @@ -70,6 +70,8 @@ class HitSet { virtual void dump(ceph::Formatter *f) const = 0; virtual Impl* clone() const = 0; virtual void seal() {} + virtual void insert_string(const std::string& o) {} + virtual bool contains_string(const std::string& o) const { return false; } virtual ~Impl() {} }; @@ -148,6 +150,15 @@ class HitSet { return impl->contains(o); } + /// insert a hash of string into to set + void insert_string(const std::string& o) { + impl->insert_string(o); + } + /// query whether a string value is in the set + bool contains_string(const std::string& o) const { + return impl->contains_string(o); + } + unsigned insert_count() const { return impl->insert_count(); } @@ -418,6 +429,12 @@ class BloomHitSet : public HitSet::Impl { bool contains(const hobject_t& o) const override { return bloom.contains(o.get_hash()); } + void insert_string(const std::string& o) override { + bloom.insert(o); + } + bool 
contains_string(const std::string& o) const override { + return bloom.contains(o); + } unsigned insert_count() const override { return bloom.element_count(); } diff --git a/src/rgw/CMakeLists.txt b/src/rgw/CMakeLists.txt index 9e2c4bee96ad1..984feab690264 100644 --- a/src/rgw/CMakeLists.txt +++ b/src/rgw/CMakeLists.txt @@ -176,7 +176,8 @@ set(librgw_common_srcs rgw_lua_background.cc rgw_dedup.cc rgw_dedup_manager.cc - rgw_dedup_worker.cc) + rgw_dedup_worker.cc + rgw_dedup_iotracker.cc) list(APPEND librgw_common_srcs store/immutable_config/store.cc diff --git a/src/rgw/rgw_dedup.cc b/src/rgw/rgw_dedup.cc index 177d2406f75b8..8a2321bc0609f 100644 --- a/src/rgw/rgw_dedup.cc +++ b/src/rgw/rgw_dedup.cc @@ -51,3 +51,9 @@ unsigned RGWDedup::get_subsys() const { return dout_subsys; } + + +void RGWDedup::trace_obj(rgw_obj obj) +{ + dedup_manager->trace_obj(obj); +} \ No newline at end of file diff --git a/src/rgw/rgw_dedup.h b/src/rgw/rgw_dedup.h index c3f2a3cb5fab9..c3775b3bdce05 100644 --- a/src/rgw/rgw_dedup.h +++ b/src/rgw/rgw_dedup.h @@ -35,6 +35,8 @@ class RGWDedup : public DoutPrefixProvider CephContext* get_cct() const override { return cct; } unsigned get_subsys() const override; std::ostream& gen_prefix(std::ostream& out) const override { return out << "RGWDedup: "; } + + void trace_obj(rgw_obj obj); }; #endif diff --git a/src/rgw/rgw_dedup_iotracker.cc b/src/rgw/rgw_dedup_iotracker.cc new file mode 100644 index 0000000000000..95c3cd01b6418 --- /dev/null +++ b/src/rgw/rgw_dedup_iotracker.cc @@ -0,0 +1,126 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab ft=cpp + +#include "rgw_dedup_iotracker.h" + +void RGWIOTracker::initialize() +{ + create_hit_set(); + hit_set_map.clear(); +} + +void RGWIOTracker::remove_oldest_hit_set() +{ + if (!hit_set_map.empty()) { + map::iterator it = hit_set_map.begin(); + hit_set_map.erase(it); + } +} + +void RGWIOTracker::create_hit_set() +{ + utime_t now = ceph_clock_now(); + 
BloomHitSet* bl_hitset = new BloomHitSet(hit_set_target_size, + hit_set_fpp, + now.sec()); + hit_set.reset(new HitSet(bl_hitset)); + hit_set_start_stamp = now; +} + +void RGWIOTracker::check_hit_set_valid(utime_t now, bool is_full) +{ + assert(hit_set_start_stamp > utime_t()); + assert(hit_set_count> 0); + + // active HitSet only case. hit_set_map doesn't need. + if (hit_set_count == 1) { + return; + } + + if (is_full || hit_set_start_stamp + hit_set_period < now) { + { + std::unique_lock lock(iotracker_lock); + hit_set_map.emplace(hit_set_start_stamp, hit_set); + hit_set_map.rbegin()->second->seal(); + create_hit_set(); + } + + while (hit_set_map.size() >= hit_set_count) { + remove_oldest_hit_set(); + } + } +} + +void RGWIOTracker::insert(rgw_obj obj) +{ + assert(hit_set.get()); + + utime_t now = ceph_clock_now(); + if (obj.bucket.bucket_id != "" && obj.get_oid() != "") { + bool is_full = false; + { + std::unique_lock lock(iotracker_lock); + hit_set->insert_string(obj.bucket.bucket_id + ":" + obj.get_oid()); + is_full = hit_set->is_full(); + } + check_hit_set_valid(now, is_full); + } +} + +bool RGWIOTracker::estimate_temp(rgw_obj obj) +{ + assert(hit_set.get()); + + utime_t now = ceph_clock_now(); + check_hit_set_valid(now); + + if (obj.bucket.bucket_id != "" && obj.get_oid() != "") { + { + std::shared_lock lock(iotracker_lock); + if (hit_set->contains_string(obj.bucket.bucket_id+ ":" + obj.get_oid())) { + ldpp_dout(dpp, 10) << obj << " found in active HitSet" << dendl; + return true; + } + } + + // serach from the latest + for (map::reverse_iterator p = hit_set_map.rbegin(); + p != hit_set_map.rend(); + ++p) { + // ignore too old HitSets + if (p->first + (hit_set_count * hit_set_period) < now) { + break; + } + if (p->second->contains_string(obj.bucket.bucket_id + ":" + obj.get_oid())) { + ldpp_dout(dpp, 10) << obj << " found in hit_set_map" << dendl; + return true; + } + } + } + ldpp_dout(dpp, 10) << obj << " not exists" << dendl; + return false; +} + +void 
RGWIOTracker::finalize() +{ + hit_set_map.clear(); + hit_set.reset(); +} + +void RGWIOTracker::set_hit_set_count(const uint32_t new_count) +{ + assert(new_count > 0); + hit_set_count = new_count; +} + +void RGWIOTracker::set_hit_set_period(const uint32_t new_period) +{ + assert(new_period > 0); + hit_set_period = new_period; +} + +void RGWIOTracker::set_hit_set_target_size(const uint32_t new_target_size) +{ + assert(new_target_size > 0); + hit_set_target_size = new_target_size; +} diff --git a/src/rgw/rgw_dedup_iotracker.h b/src/rgw/rgw_dedup_iotracker.h new file mode 100644 index 0000000000000..6c0a90d16a202 --- /dev/null +++ b/src/rgw/rgw_dedup_iotracker.h @@ -0,0 +1,67 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#ifndef CEPH_RGW_DEDUP_IOTRACKER_H +#define CEPH_RGW_DEDUP_IOTRACKER_H + +#include +#include + +#include "osd/HitSet.h" +#include "include/utime.h" +#include "rgw_common.h" + +extern const uint32_t DEFAULT_HITSET_COUNT; +extern const uint32_t DEFAULT_HITSET_PERIOD; +extern const uint32_t DEFAULT_HITSET_TARGET_SIZE; +extern const double DEFAULT_HITSET_FPP; + +using namespace std; + +class RGWIOTracker +{ +protected: + const DoutPrefixProvider* dpp; + + HitSetRef hit_set; // an active HitSet + utime_t hit_set_start_stamp; // creation time of an active HitSet + uint32_t hit_set_count; // num of in-memory HitSets + uint32_t hit_set_period; // active period of HitSet + uint32_t hit_set_target_size; // # allowed objects in a HitSet + double hit_set_fpp; // false positive rate of a HitSet + map hit_set_map; // hot objects + + std::shared_mutex iotracker_lock; // rw-lock (read: estimate, write: insert, trim) + +public: + RGWIOTracker(const DoutPrefixProvider* _dpp) + : dpp(_dpp), hit_set_count(DEFAULT_HITSET_COUNT), + hit_set_period(DEFAULT_HITSET_PERIOD), + hit_set_target_size(DEFAULT_HITSET_TARGET_SIZE), + hit_set_fpp(DEFAULT_HITSET_FPP) {} + ~RGWIOTracker() {} + + void initialize(); + void 
finalize(); + + // add object to active HitSet + void insert(rgw_obj obj); + + // check if HitSet contains a rgw_obj + bool estimate_temp(rgw_obj obj); + + // discard expired in-memory HitSet + void remove_oldest_hit_set(); + + // create and reset a new active HitSet + void create_hit_set(); + + // deactivate active HitSet if needed + void check_hit_set_valid(utime_t now, bool is_full = false); + + void set_hit_set_count(const uint32_t new_count); + void set_hit_set_period(const uint32_t new_period); + void set_hit_set_target_size(const uint32_t new_target_size); +}; + +#endif diff --git a/src/rgw/rgw_dedup_manager.cc b/src/rgw/rgw_dedup_manager.cc index f260d4120e09b..43e354fdf4782 100644 --- a/src/rgw/rgw_dedup_manager.cc +++ b/src/rgw/rgw_dedup_manager.cc @@ -18,10 +18,17 @@ const string DEFAULT_COLD_POOL_POSTFIX = "_cold"; const string DEFAULT_CHUNK_SIZE = "16384"; const string DEFAULT_CHUNK_ALGO = "fastcdc"; const string DEFAULT_FP_ALGO = "sha1"; +const uint32_t DEFAULT_HITSET_COUNT = 3; +const uint32_t DEFAULT_HITSET_PERIOD = 10; +const uint32_t DEFAULT_HITSET_TARGET_SIZE = 1000; +const double DEFAULT_HITSET_FPP = 0.05; void RGWDedupManager::initialize() { + io_tracker = make_unique(dpp); + io_tracker->initialize(); + for (int i = 0; i < num_workers; i++) { auto dedup_worker = make_unique(dpp, cct, store, i); dedup_workers.emplace_back(move(dedup_worker)); @@ -201,6 +208,9 @@ void RGWDedupManager::finalize() } dedup_workers.clear(); scrub_workers.clear(); + + io_tracker->finalize(); + io_tracker.reset(); } librados::IoCtx RGWDedupManager::get_or_create_ioctx(rgw_pool pool) @@ -390,3 +400,10 @@ int RGWDedupManager::set_sampling_ratio(int new_sampling_ratio) sampling_ratio = new_sampling_ratio; return 0; } + + +void RGWDedupManager::trace_obj(rgw_obj obj) +{ + assert(io_tracker.get()); + io_tracker->insert(obj); +} diff --git a/src/rgw/rgw_dedup_manager.h b/src/rgw/rgw_dedup_manager.h index a94da000222ae..c334d2ced60ab 100644 --- a/src/rgw/rgw_dedup_manager.h 
+++ b/src/rgw/rgw_dedup_manager.h @@ -9,6 +9,7 @@ #include "common/Thread.h" #include "rgw_sal_rados.h" #include "rgw_dedup_worker.h" +#include "rgw_dedup_iotracker.h" using namespace std; using namespace librados; @@ -38,6 +39,7 @@ class RGWDedupManager : public Thread vector> dedup_workers; vector> scrub_workers; + unique_ptr io_tracker; string cold_pool_postfix; string chunk_size; @@ -96,6 +98,8 @@ class RGWDedupManager : public Thread void append_rados_obj(target_rados_object new_obj) { rados_objs.emplace_back(new_obj); } size_t get_num_rados_obj() { return rados_objs.size(); } int prepare_scrub_work(); + + void trace_obj(rgw_obj obj); }; #endif diff --git a/src/test/rgw/test_rgw_dedup.cc b/src/test/rgw/test_rgw_dedup.cc index bd7d1cebae598..44b893ec4f7a5 100644 --- a/src/test/rgw/test_rgw_dedup.cc +++ b/src/test/rgw/test_rgw_dedup.cc @@ -6,6 +6,7 @@ #include "test_rgw_common.h" #include "rgw/rgw_dedup_manager.h" #include "rgw/rgw_dedup_worker.h" +#include "rgw/rgw_dedup_iotracker.h" #include "rgw/rgw_sal_rados.h" #include "common/dout.h" @@ -75,6 +76,47 @@ TEST_F(RGWDedupTest, get_worker_id) EXPECT_EQ("ScrubWorker_1234", scrubworker.get_id()); } +// RGWIOTracker test +TEST_F(RGWDedupTest, iotracker) +{ + IOTracker iotracker(&dp); + iotracker.set_hit_set_count(2); + iotracker.set_hit_set_period(2); + iotracker.set_hit_set_target_size(2); + iotracker.initialize(); + + string bucket_id = "test_bucket_id"; + rgw_bucket bucket("tenant", "test_bucket", bucket_id); + + } + rgw_obj obj_01(bucket, "test_obj_01"); + rgw_obj obj_02(bucket, "test_obj_02"); + rgw_obj obj_03(bucket, "test_obj_03"); + rgw_obj obj_04(bucket, "test_obj_04"); + rgw_obj obj_05(bucket, "test_obj_05"); + + EXPECT_EQ(false, iotracker.estimate_temp(rgw_obj())); + EXPECT_EQ(false, iotracker.estimate_temp(obj_01)); + + // spacial locality test + iotracker.insert(obj_01); + EXPECT_EQ(true, iotracker.estimate_temp(obj_01)); + iotracker.insert(obj_02); + EXPECT_EQ(true, iotracker.estimate_temp(obj_02)); 
+ iotracker.insert(obj_03); + EXPECT_EQ(true, iotracker.estimate_temp(obj_03)); + iotracker.insert(obj_04); + EXPECT_EQ(true, iotracker.estimate_temp(obj_04)); + iotracker.insert(obj_05); + EXPECT_EQ(true, iotracker.estimate_temp(obj_05)); + EXPECT_EQ(false, iotracker.estimate_temp(obj_01)); + + // temporal locality test + iotracker.insert(obj_01); + sleep(5); + EXPECT_EQ(false, iotracker.estimate_temp(obj_01)); +} + int main (int argc, char** argv) { ::testing::InitGoogleTest(&argc, argv); From 4eba0092416e4d344582d8d636a1843d4d9bd1b4 Mon Sep 17 00:00:00 2001 From: Sungmin Lee Date: Wed, 18 Jan 2023 17:18:15 +0900 Subject: [PATCH 11/15] Insert rgw_obj to RGWIOTracker --- src/rgw/rgw_process.cc | 22 ++++++++++++++++++++++ src/rgw/rgw_rados.h | 8 ++++++++ src/test/rgw/test_rgw_dedup.cc | 5 ++--- 3 files changed, 32 insertions(+), 3 deletions(-) diff --git a/src/rgw/rgw_process.cc b/src/rgw/rgw_process.cc index 2c115444c41d6..35f2a75780ca2 100644 --- a/src/rgw/rgw_process.cc +++ b/src/rgw/rgw_process.cc @@ -20,6 +20,7 @@ #include "rgw_lua_request.h" #include "rgw_tracer.h" #include "rgw_ratelimit.h" +#include "rgw_dedup.h" #include "services/svc_zone_utils.h" @@ -472,5 +473,26 @@ int process_request(rgw::sal::Store* const store, << " ======" << dendl; + auto rados_store = dynamic_cast(store); + // if a type of store is not RadosStore, do nothing + if (rados_store) { + RGWRados* rgw_rados = rados_store->getRados(); + assert(rgw_rados); + + // use IOTracker if RGW uses dedup + if (rgw_rados->get_use_dedup()) { + std::shared_ptr rgw_dedup(rgw_rados->get_dedup()); + assert(rgw_dedup.get()); + + if (s->op_type == RGW_OP_GET_OBJ || s->op_type == RGW_OP_PUT_OBJ) { + rgw_obj rgwobj = s->object->get_obj(); + ldpp_dout(op, 5) << "insert rgw_obj(" << rgwobj << ") into RGWIOTracker" << dendl; + if (rgwobj.bucket.bucket_id != "" && rgwobj.key.name != "") { + rgw_dedup->trace_obj(rgwobj); + } + } + } + } + return (ret < 0 ? 
ret : s->err.ret); } /* process_request */ diff --git a/src/rgw/rgw_rados.h b/src/rgw/rgw_rados.h index 5cff58c2429c5..ccd433bda52e3 100644 --- a/src/rgw/rgw_rados.h +++ b/src/rgw/rgw_rados.h @@ -492,6 +492,14 @@ class RGWRados return gc; } + std::shared_ptr get_dedup() { + return dedup; + } + + bool get_use_dedup() { + return use_dedup; + } + RGWRados& set_run_gc_thread(bool _use_gc_thread) { use_gc_thread = _use_gc_thread; return *this; diff --git a/src/test/rgw/test_rgw_dedup.cc b/src/test/rgw/test_rgw_dedup.cc index 44b893ec4f7a5..493a1be9b47ac 100644 --- a/src/test/rgw/test_rgw_dedup.cc +++ b/src/test/rgw/test_rgw_dedup.cc @@ -79,7 +79,7 @@ TEST_F(RGWDedupTest, get_worker_id) // RGWIOTracker test TEST_F(RGWDedupTest, iotracker) { - IOTracker iotracker(&dp); + RGWIOTracker iotracker(&dp); iotracker.set_hit_set_count(2); iotracker.set_hit_set_period(2); iotracker.set_hit_set_target_size(2); @@ -88,7 +88,6 @@ TEST_F(RGWDedupTest, iotracker) string bucket_id = "test_bucket_id"; rgw_bucket bucket("tenant", "test_bucket", bucket_id); - } rgw_obj obj_01(bucket, "test_obj_01"); rgw_obj obj_02(bucket, "test_obj_02"); rgw_obj obj_03(bucket, "test_obj_03"); @@ -113,7 +112,7 @@ TEST_F(RGWDedupTest, iotracker) // temporal locality test iotracker.insert(obj_01); - sleep(5); + sleep(6); EXPECT_EQ(false, iotracker.estimate_temp(obj_01)); } From 0c04af6df42b122687d11224731fc591fd6e4e44 Mon Sep 17 00:00:00 2001 From: Sungmin Lee Date: Wed, 18 Jan 2023 17:39:20 +0900 Subject: [PATCH 12/15] Find rgw_obj from RGWIOTracker --- src/rgw/rgw_dedup_manager.cc | 17 ++++++++++------- src/rgw/rgw_process.cc | 2 +- 2 files changed, 11 insertions(+), 8 deletions(-) diff --git a/src/rgw/rgw_dedup_manager.cc b/src/rgw/rgw_dedup_manager.cc index 43e354fdf4782..864a4c3822ef5 100644 --- a/src/rgw/rgw_dedup_manager.cc +++ b/src/rgw/rgw_dedup_manager.cc @@ -335,12 +335,12 @@ int RGWDedupManager::prepare_dedup_work() } for (auto obj : results.objs) { - ldpp_dout(dpp, 10) << "rgw_obj name: " << 
obj.key.name << dendl; + ldpp_dout(dpp, 10) << "rgw::sal::RadosObject name: " << obj.key.name << dendl; RGWObjectCtx obj_ctx(store); - unique_ptr rgw_obj = bucket->get_object(obj.key); + unique_ptr rgw_sal_obj = bucket->get_object(obj.key); RGWRados::Object op_target(store->getRados(), bucket.get(), - obj_ctx, rgw_obj.get()); + obj_ctx, rgw_sal_obj.get()); RGWRados::Object::Stat stat_op(&op_target); ret = get_rados_objects(stat_op); if (ret < 0) { @@ -348,6 +348,7 @@ int RGWDedupManager::prepare_dedup_work() } RGWObjManifest& manifest = *stat_op.result.manifest; + rgw_obj rgwobj = manifest.get_obj(); RGWObjManifest::obj_iterator miter; for (miter = manifest.obj_begin(dpp); miter != manifest.obj_end(dpp); @@ -367,10 +368,12 @@ int RGWDedupManager::prepare_dedup_work() } } if (!is_exist) { - target_rados_object obj{rados_obj.oid, rados_obj.pool.name}; - rados_objs.emplace_back(obj); - ldpp_dout(dpp, 10) << " rados_oid name: " << rados_obj.oid - << ", pool.name: " << rados_obj.pool.name << dendl; + if (!io_tracker->estimate_temp(rgwobj)) { + target_rados_object obj{rados_obj.oid, rados_obj.pool.name}; + rados_objs.emplace_back(obj); + ldpp_dout(dpp, 10) << " rados_oid name: " << rados_obj.oid + << ", pool.name: " << rados_obj.pool.name << dendl; + } } string base_pool_name = rados_obj.pool.name; diff --git a/src/rgw/rgw_process.cc b/src/rgw/rgw_process.cc index 35f2a75780ca2..d906f19319800 100644 --- a/src/rgw/rgw_process.cc +++ b/src/rgw/rgw_process.cc @@ -486,7 +486,7 @@ int process_request(rgw::sal::Store* const store, if (s->op_type == RGW_OP_GET_OBJ || s->op_type == RGW_OP_PUT_OBJ) { rgw_obj rgwobj = s->object->get_obj(); - ldpp_dout(op, 5) << "insert rgw_obj(" << rgwobj << ") into RGWIOTracker" << dendl; + ldpp_dout(s, 5) << "insert rgw_obj(" << rgwobj << ") into RGWIOTracker" << dendl; if (rgwobj.bucket.bucket_id != "" && rgwobj.key.name != "") { rgw_dedup->trace_obj(rgwobj); } From be30cdebf3188bcc634bcbb46a96f879c43bd5db Mon Sep 17 00:00:00 2001 From: 
"daegon.yang" Date: Wed, 18 Jan 2023 21:12:10 +0900 Subject: [PATCH 13/15] rgw/dedup: RGWFPManager initial commit RGWFPManager is a RGWDedup component that stores chunks stat collected by RGWDedupWorker. This commit adds RGWFPManager and test codes. Signed-off-by: daegon.yang --- src/rgw/CMakeLists.txt | 1 + src/rgw/rgw_dedup_manager.cc | 5 ++- src/rgw/rgw_dedup_manager.h | 4 ++ src/rgw/rgw_fp_manager.cc | 77 ++++++++++++++++++++++++++++++++++ src/rgw/rgw_fp_manager.h | 35 ++++++++++++++++ src/test/rgw/test_rgw_dedup.cc | 55 +++++++++++++++++++++++- 6 files changed, 174 insertions(+), 3 deletions(-) create mode 100644 src/rgw/rgw_fp_manager.cc create mode 100644 src/rgw/rgw_fp_manager.h diff --git a/src/rgw/CMakeLists.txt b/src/rgw/CMakeLists.txt index 984feab690264..b756d22293f70 100644 --- a/src/rgw/CMakeLists.txt +++ b/src/rgw/CMakeLists.txt @@ -175,6 +175,7 @@ set(librgw_common_srcs rgw_tracer.cc rgw_lua_background.cc rgw_dedup.cc + rgw_fp_manager.cc rgw_dedup_manager.cc rgw_dedup_worker.cc rgw_dedup_iotracker.cc) diff --git a/src/rgw/rgw_dedup_manager.cc b/src/rgw/rgw_dedup_manager.cc index 864a4c3822ef5..48aec5c4c9f40 100644 --- a/src/rgw/rgw_dedup_manager.cc +++ b/src/rgw/rgw_dedup_manager.cc @@ -26,13 +26,15 @@ const double DEFAULT_HITSET_FPP = 0.05; void RGWDedupManager::initialize() { + fpmanager = make_shared(chunk_algo, stoi(chunk_size), fp_algo); io_tracker = make_unique(dpp); io_tracker->initialize(); for (int i = 0; i < num_workers; i++) { auto dedup_worker = make_unique(dpp, cct, store, i); dedup_workers.emplace_back(move(dedup_worker)); - auto scrub_worker = make_unique(dpp, cct, store, i, num_workers); + auto scrub_worker = make_unique( + dpp, cct, store, i, num_workers); scrub_workers.emplace_back(move(scrub_worker)); } } @@ -155,6 +157,7 @@ void* RGWDedupManager::entry() // trigger RGWDedupWorkers for (auto& worker : dedup_workers) { + fpmanager->reset_fpmap(); worker->set_run(true); string name = worker->get_id(); worker->create(name.c_str()); 
diff --git a/src/rgw/rgw_dedup_manager.h b/src/rgw/rgw_dedup_manager.h index c334d2ced60ab..6d71668e8f23e 100644 --- a/src/rgw/rgw_dedup_manager.h +++ b/src/rgw/rgw_dedup_manager.h @@ -8,6 +8,7 @@ #include "common/Cond.h" #include "common/Thread.h" #include "rgw_sal_rados.h" +#include "rgw_fp_manager.h" #include "rgw_dedup_worker.h" #include "rgw_dedup_iotracker.h" @@ -27,8 +28,10 @@ struct target_rados_object { string pool_name; }; +class RGWFPManager; class RGWDedupWorker; class RGWChunkScrubWorker; + class RGWDedupManager : public Thread { const DoutPrefixProvider* dpp; @@ -37,6 +40,7 @@ class RGWDedupManager : public Thread bool down_flag; vector rados_objs; + shared_ptr fpmanager; vector> dedup_workers; vector> scrub_workers; unique_ptr io_tracker; diff --git a/src/rgw/rgw_fp_manager.cc b/src/rgw/rgw_fp_manager.cc new file mode 100644 index 0000000000000..d51df24d9d766 --- /dev/null +++ b/src/rgw/rgw_fp_manager.cc @@ -0,0 +1,77 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab ft=cpp + +#include +#include "rgw_fp_manager.h" + + +#define dout_subsys ceph_subsys_rgw + + +string RGWFPManager::get_chunk_algo() +{ + return chunk_algo; +} + +void RGWFPManager::set_chunk_algo(string chunk_algo) +{ + ceph_assert(chunk_algo == "fixed" || chunk_algo == "fastcdc"); + chunk_algo = chunk_algo; + return; +} + +ssize_t RGWFPManager::get_chunk_size() +{ + return chunk_size; +} + +void RGWFPManager::set_chunk_size(ssize_t chunk_size) +{ + ceph_assert(chunk_size > 0); + chunk_size = chunk_size; + return; +} + +string RGWFPManager::get_fp_algo() +{ + return fp_algo; +} + +void RGWFPManager::set_fp_algo(string fp_algo) +{ + ceph_assert(fp_algo == "sha1" || fp_algo == "sha256" || fp_algo == "sha512"); + fp_algo = fp_algo; + return; +} + +void RGWFPManager::reset_fpmap() +{ + fp_map.clear(); + return; +} + +ssize_t RGWFPManager::get_fpmap_size() +{ + return fp_map.size(); +} + +bool RGWFPManager::find(string& fingerprint) +{ + 
shared_lock lock(fingerprint_lock); + auto found_item = fp_map.find(fingerprint); + return found_item != fp_map.end(); +} + +void RGWFPManager::add(string& fingerprint) +{ + unique_lock lock(fingerprint_lock); + auto found_iter = fp_map.find(fingerprint); + + if (found_iter == fp_map.end()) { + fp_map.insert({fingerprint, 1}); + } else { + ++found_iter->second; + } + + return; +} \ No newline at end of file diff --git a/src/rgw/rgw_fp_manager.h b/src/rgw/rgw_fp_manager.h new file mode 100644 index 0000000000000..583ed706109b9 --- /dev/null +++ b/src/rgw/rgw_fp_manager.h @@ -0,0 +1,35 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab ft=cpp + +#ifndef CEPH_RGW_FP_MANAGER_H +#define CEPH_RGW_FP_MANAGER_H + +#include "include/types.h" + +using namespace std; + + +class RGWFPManager +{ +public: + bool find(string& fingerprint); + void add(string& fingerprint); + RGWFPManager(string _chunk_algo, ssize_t _chunk_size, string _fp_algo) + : chunk_algo(_chunk_algo), chunk_size(_chunk_size), fp_algo(_fp_algo) {} + string get_chunk_algo(); + void set_chunk_algo(string chunk_algo); + ssize_t get_chunk_size(); + void set_chunk_size(ssize_t chunk_size); + string get_fp_algo(); + void set_fp_algo(string fp_algo); + void reset_fpmap(); + ssize_t get_fpmap_size(); +private: + std::shared_mutex fingerprint_lock; + string chunk_algo; + ssize_t chunk_size; + string fp_algo; + unordered_map fp_map; +}; + +#endif \ No newline at end of file diff --git a/src/test/rgw/test_rgw_dedup.cc b/src/test/rgw/test_rgw_dedup.cc index 493a1be9b47ac..8fb4bc0acff21 100644 --- a/src/test/rgw/test_rgw_dedup.cc +++ b/src/test/rgw/test_rgw_dedup.cc @@ -4,12 +4,13 @@ #include "gtest/gtest.h" #include "test/librados/test_cxx.h" #include "test_rgw_common.h" +#include "rgw/rgw_fp_manager.h" #include "rgw/rgw_dedup_manager.h" #include "rgw/rgw_dedup_worker.h" #include "rgw/rgw_dedup_iotracker.h" #include "rgw/rgw_sal_rados.h" #include "common/dout.h" - 
+#define dout_subsys ceph_subsys_rgw auto cct = new CephContext(CEPH_ENTITY_TYPE_CLIENT); const DoutPrefix dp(cct, 1, "test rgw dedup: "); @@ -66,7 +67,6 @@ TEST_F(RGWDedupTest, sample_objects) EXPECT_EQ(num_objs * sampling_ratio / 100, sampled_idx.size()); } - TEST_F(RGWDedupTest, get_worker_id) { RGWDedupWorker dedupworker(&dp, cct, &store, 1234); @@ -116,6 +116,57 @@ TEST_F(RGWDedupTest, iotracker) EXPECT_EQ(false, iotracker.estimate_temp(obj_01)); } +TEST_F(RGWDedupTest, fpmanager_add) +{ + RGWFPManager *fpmanager = new RGWFPManager("testchunkalgo", 1234, "testfpalgo"); + + string teststring1 = "1234"; + string teststring2 = "5678"; + + EXPECT_EQ(0, fpmanager->get_fpmap_size()); + + fpmanager->add(teststring1); + EXPECT_EQ(1, fpmanager->get_fpmap_size()); + + fpmanager->add(teststring1); + EXPECT_EQ(1, fpmanager->get_fpmap_size()); + + fpmanager->add(teststring2); + EXPECT_EQ(2, fpmanager->get_fpmap_size()); + + fpmanager->add(teststring1); + EXPECT_EQ(2, fpmanager->get_fpmap_size()); +} + +TEST_F(RGWDedupTest, fpmanager_find) +{ + RGWFPManager *fpmanager = new RGWFPManager("testchunkalgo", 1234, "testfpalgo"); + string teststring1 = "1234"; + string teststring2 = "5678"; + string teststring3 = "asdf"; + + fpmanager->add(teststring1); + fpmanager->add(teststring2); + + EXPECT_EQ(true, fpmanager->find(teststring1)); + EXPECT_EQ(true, fpmanager->find(teststring2)); + EXPECT_EQ(false, fpmanager->find(teststring3)); +} + +TEST_F(RGWDedupTest, reset_fpmap) +{ + RGWFPManager *fpmanager = new RGWFPManager("testchunkalgo", 1234, "testfpalgo"); + string teststring1 = "1234"; + string teststring2 = "5678"; + + fpmanager->add(teststring1); + fpmanager->add(teststring2); + + EXPECT_EQ(2, fpmanager->get_fpmap_size()); + + fpmanager->reset_fpmap(); + EXPECT_EQ(0, fpmanager->get_fpmap_size()); +} int main (int argc, char** argv) { ::testing::InitGoogleTest(&argc, argv); From 4b15fdf82be4df138c30c25d91b433cbe4703122 Mon Sep 17 00:00:00 2001 From: "daegon.yang" Date: Wed, 18 
Jan 2023 21:48:22 +0900 Subject: [PATCH 14/15] rgw/dedup: RGWDedupWorker's dedup logic implement RGWDedupWorker chunks an RADOS Object and stores Chunk's fingerprint in FPManager. If the fingerprint is already stored in the FPManager, chunk dedup is performed. This commit implements RGWDedupWorker's dedup logic. Signed-off-by: daegon.yang --- src/rgw/rgw_dedup_manager.cc | 3 +- src/rgw/rgw_dedup_worker.cc | 280 +++++++++++++++++++++++++++++++-- src/rgw/rgw_dedup_worker.h | 33 +++- src/test/rgw/test_rgw_dedup.cc | 70 ++++++++- 4 files changed, 366 insertions(+), 20 deletions(-) diff --git a/src/rgw/rgw_dedup_manager.cc b/src/rgw/rgw_dedup_manager.cc index 48aec5c4c9f40..5e859a5121cad 100644 --- a/src/rgw/rgw_dedup_manager.cc +++ b/src/rgw/rgw_dedup_manager.cc @@ -31,7 +31,8 @@ void RGWDedupManager::initialize() io_tracker->initialize(); for (int i = 0; i < num_workers; i++) { - auto dedup_worker = make_unique(dpp, cct, store, i); + auto dedup_worker = make_unique( + dpp, cct, store, i, fpmanager); dedup_workers.emplace_back(move(dedup_worker)); auto scrub_worker = make_unique( dpp, cct, store, i, num_workers); diff --git a/src/rgw/rgw_dedup_worker.cc b/src/rgw/rgw_dedup_worker.cc index fd479201dde66..58aaf32443e78 100644 --- a/src/rgw/rgw_dedup_worker.cc +++ b/src/rgw/rgw_dedup_worker.cc @@ -7,6 +7,7 @@ #define dout_subsys ceph_subsys_rgw +unsigned default_op_size = 1 << 26; void Worker::set_run(bool run_status) { @@ -19,11 +20,6 @@ void Worker::stop() } -string RGWDedupWorker::get_id() -{ - return "DedupWorker_" + to_string(id); -} - void RGWDedupWorker::initialize() { @@ -31,6 +27,168 @@ void RGWDedupWorker::initialize() void* RGWDedupWorker::entry() { + ldpp_dout(dpp, 20) << "RGWDedupWorker_" << id << " start" << dendl; + string chunk_algo = fpmanager->get_chunk_algo(); + ceph_assert(chunk_algo == "fixed" || chunk_algo == "fastcdc"); + ssize_t chunk_size = fpmanager->get_chunk_size(); + ceph_assert(chunk_size > 0); + string fp_algo = fpmanager->get_fp_algo(); + 
ceph_assert(fp_algo == "sha1" || fp_algo == "sha256" || fp_algo == "sha512"); + + map> ioctxs; + + for(auto rados_object : rados_objs) { + librados::Rados* rados = store->getRados()->get_rados_handle(); + IoCtx ioctx; + IoCtx cold_ioctx; + int ret = 0; + + // get ioctx + if (ioctxs.find(rados_object.pool_name) != ioctxs.end()) { + ioctx = ioctxs.find(rados_object.pool_name)->second.first; + cold_ioctx = ioctxs.find(rados_object.pool_name)->second.second; + } + + else { + rados->ioctx_create(rados_object.pool_name.c_str(), ioctx); + rados->ioctx_create((rados_object.pool_name + DEFAULT_COLD_POOL_POSTFIX).c_str(), + cold_ioctx); + ioctxs.insert({rados_object.pool_name, {ioctx, cold_ioctx}}); + } + + list redundant_chunks; + + bufferlist data = read_object_data(ioctx, rados_object.object_name); + + if (data.length() == 0) { + ldpp_dout(dpp, 5) << "Skip dedup object " + << rados_object.object_name << ", object data size is 0" << dendl; + continue; + } + + auto chunks = do_cdc(data, chunk_algo, chunk_size); + + // check if a chunk is duplicated in sampled objects + for(auto &chunk : chunks) { + auto &chunk_data = get<0>(chunk); + string fingerprint = generate_fingerprint(chunk_data, fp_algo); + + if (fpmanager->find(fingerprint)) { + std::pair chunk_boundary = std::get<1>(chunk); + chunk_t chunk_info = { + .start = chunk_boundary.first, + .size = chunk_boundary.second, + .fingerprint = fingerprint, + .data = chunk_data + }; + + redundant_chunks.push_back(chunk_info); + } + + fpmanager->add(fingerprint); + } + + // move data(new <-> chunked <-> entire object) according to policy + ret = check_object_exists(cold_ioctx, rados_object.object_name); + if (redundant_chunks.size() > 0) { + if (ret != -ENOENT) { // entire -> chunked + ObjectWriteOperation promote_op; + promote_op.tier_promote(); + ret = ioctx.operate(rados_object.object_name, &promote_op); + if (ret < 0) { + ldpp_dout(dpp, 1) + << "Failed to promote rados object, pool_name: " << rados_object.pool_name + << ", 
oid: " << rados_object.object_name + << ", ret: " << ret << dendl; + continue; + } + + ObjectWriteOperation unset_op; + unset_op.unset_manifest(); + ret = ioctx.operate(rados_object.object_name, &unset_op); + if (ret < 0) { + ldpp_dout(dpp, 1) + << "Failed to unset_manifest rados object, pool_name: " << rados_object.pool_name + << ", oid: " << rados_object.object_name + << ", ret: " << ret << dendl; + continue; + } + + for (auto chunk : redundant_chunks) { + if (check_object_exists(cold_ioctx, chunk.fingerprint) < 0) { + ret = write_object_data(cold_ioctx, chunk.fingerprint, chunk.data); + if (ret < 0) { + ldpp_dout(dpp, 1) + << "Failed to write chunk to cold pool, cold_pool_name: " + << rados_object.pool_name << DEFAULT_COLD_POOL_POSTFIX + << ", fingerprint: " << chunk.fingerprint + << ", ret: " << ret << dendl; + continue; + } + } + try_set_chunk(ioctx, cold_ioctx, rados_object.object_name, chunk); + } + } else if (ret == -ENOENT) { // new, chunked -> chunked + for (auto chunk : redundant_chunks) { + if (check_object_exists(cold_ioctx, chunk.fingerprint) == -ENOENT) { + ret = write_object_data(cold_ioctx, chunk.fingerprint, chunk.data); + if (ret < 0) { + ldpp_dout(dpp, 1) + << "Failed to write chunk to cold pool, cold_pool_name: " + << rados_object.pool_name << DEFAULT_COLD_POOL_POSTFIX + << ", fingerprint: " << chunk.fingerprint + << ", ret: " << ret << dendl; + continue; + } + } + try_set_chunk(ioctx, cold_ioctx, rados_object.object_name, chunk); + } + } + } else if (redundant_chunks.size() <= 0) { // new, whole -> whole, chunked -> chunked + chunk_t chunk = { + .start = 0, + .size = data.length(), + .fingerprint = rados_object.object_name, + .data = data + }; + + if (ret == -ENOENT) { // new -> whole + ret = write_object_data(cold_ioctx, rados_object.object_name, data); + if (ret < 0) { + ldpp_dout(dpp, 1) + << "Failed to write entire object to cold pool, cold_pool_name: " + << rados_object.pool_name << DEFAULT_COLD_POOL_POSTFIX + << ", oid: " << 
rados_object.object_name + << ", ret: " << ret << dendl; + continue; + } + + ret = try_set_chunk(ioctx, cold_ioctx, rados_object.object_name, chunk); + if (ret == -EOPNOTSUPP) { // chunked -> chunked + ret = cold_ioctx.remove(rados_object.object_name); + if (ret < 0) { + ldpp_dout(dpp, 1) + << "Failed to remove entire object in cold pool, cold_pool_name: " + << rados_object.pool_name << DEFAULT_COLD_POOL_POSTFIX + << ", oid: " << rados_object.object_name + << ", ret: " << ret << dendl; + continue; + } + } + } + } + + ObjectReadOperation tier_op; + tier_op.tier_evict(); + ret = ioctx.operate(rados_object.object_name, &tier_op, nullptr); + if (ret < 0) { + ldpp_dout(dpp, 1) << "Failed to tier_evict rados object, pool_name: " + << rados_object.pool_name << ", oid: " + << rados_object.object_name + << ", ret: " << ret << dendl; + continue; + } + } return nullptr; } @@ -40,19 +198,123 @@ void RGWDedupWorker::finalize() } +void RGWDedupWorker::append_obj(target_rados_object new_obj) +{ + rados_objs.emplace_back(new_obj); +} + +size_t RGWDedupWorker::get_num_objs() +{ + return rados_objs.size(); +} + void RGWDedupWorker::clear_objs() { rados_objs.clear(); } -void RGWDedupWorker::append_obj(target_rados_object new_obj) +string RGWDedupWorker::get_id() { - rados_objs.emplace_back(new_obj); + return "DedupWorker_" + to_string(id); } -size_t RGWDedupWorker::get_num_objs() +bufferlist RGWDedupWorker::read_object_data(IoCtx& ioctx, string oid) { - return rados_objs.size(); + bufferlist whole_data; + size_t offset = 0; + int ret = -1; + + while (ret != 0) { + bufferlist partial_data; + ret = ioctx.read(oid, partial_data, default_op_size, offset); + if(ret < 0) + { + ldpp_dout(dpp, 1) << "read object error " << oid << ", offset: " << offset + << ", size: " << default_op_size << ", error:" << cpp_strerror(ret) + << dendl; + bufferlist empty_buf; + return empty_buf; + } + offset += ret; + whole_data.claim_append(partial_data); + } + + return whole_data; +} + +vector>> 
RGWDedupWorker::do_cdc( + bufferlist &data, string chunk_algo, ssize_t chunk_size) +{ + vector>> ret; + + unique_ptr cdc = CDC::create(chunk_algo, cbits(chunk_size) - 1); + vector> chunks; + cdc->calc_chunks(data, &chunks); + + for (auto &p : chunks) { + bufferlist chunk; + chunk.substr_of(data, p.first, p.second); + ret.push_back(make_tuple(chunk, p)); + } + + return ret; +} + +string RGWDedupWorker::generate_fingerprint( + bufferlist chunk_data, string fp_algo) +{ + string ret; + + switch (pg_pool_t::get_fingerprint_from_str(fp_algo)) { + case pg_pool_t::TYPE_FINGERPRINT_SHA1: + ret = crypto::digest(chunk_data).to_str(); + break; + + case pg_pool_t::TYPE_FINGERPRINT_SHA256: + ret = crypto::digest(chunk_data).to_str(); + break; + + case pg_pool_t::TYPE_FINGERPRINT_SHA512: + ret = crypto::digest(chunk_data).to_str(); + break; + + default: + ceph_assert(0 == "Invalid fp_algo type"); + break; + } + + return ret; +} + +int RGWDedupWorker::check_object_exists(IoCtx& ioctx, string object_name) { + uint64_t size; + time_t mtime; + + int result = ioctx.stat(object_name, &size, &mtime); + + return result; +} + +int RGWDedupWorker::try_set_chunk(IoCtx& ioctx, IoCtx &cold_ioctx, string object_name, chunk_t &chunk) { + ObjectReadOperation chunk_op; + chunk_op.set_chunk( + chunk.start, + chunk.size, + cold_ioctx, + chunk.fingerprint, + 0, + CEPH_OSD_OP_FLAG_WITH_REFERENCE); + int result = ioctx.operate(object_name, &chunk_op, nullptr); + + return result; +} + +int RGWDedupWorker::write_object_data(IoCtx &ioctx, string object_name, bufferlist &data) { + ObjectWriteOperation write_op; + write_op.write_full(data); + int result = ioctx.operate(object_name, &write_op); + + return result; } diff --git a/src/rgw/rgw_dedup_worker.h b/src/rgw/rgw_dedup_worker.h index 981d6b3cc3c0f..859461d923f51 100644 --- a/src/rgw/rgw_dedup_worker.h +++ b/src/rgw/rgw_dedup_worker.h @@ -5,7 +5,9 @@ #define CEPH_RGW_DEDUP_WORKER_H #include "include/rados/librados.hpp" +#include "rgw_fp_manager.h" 
#include "rgw_dedup_manager.h" +#include "common/CDC.h" extern const int MAX_OBJ_SCAN_SIZE; @@ -14,6 +16,9 @@ using namespace librados; struct target_rados_object; struct dedup_ioctx_set; + +class RGWFPManager; + class Worker : public Thread { protected: @@ -40,26 +45,42 @@ class Worker : public Thread void set_run(bool run_status); }; +struct chunk_t { + size_t start = 0; + size_t size = 0; + string fingerprint = ""; + bufferlist data; +}; class RGWDedupWorker : public Worker { + shared_ptr fpmanager; vector rados_objs; public: RGWDedupWorker(const DoutPrefixProvider* _dpp, CephContext* _cct, rgw::sal::RadosStore* _store, - int _id) - : Worker(_dpp, _cct, _store, _id) {} + int _id, + shared_ptr _fpmanager) + : Worker(_dpp, _cct, _store, _id), fpmanager(_fpmanager) {} virtual ~RGWDedupWorker() override {} + virtual void initialize() override; virtual void* entry() override; virtual void finalize() override; - virtual void initialize() override; - - virtual string get_id() override; - void clear_objs(); + void append_obj(target_rados_object new_obj); size_t get_num_objs(); + void clear_objs(); + + virtual string get_id() override; + + bufferlist read_object_data(IoCtx &ioctx, string oid); + vector>> do_cdc(bufferlist &data, string chunk_algo, ssize_t chunk_size); + string generate_fingerprint(bufferlist chunk_data, string fp_algo); + int check_object_exists(IoCtx& ioctx, string object_name); + int try_set_chunk(IoCtx& ioctx, IoCtx &cold_ioctx, string object_name, chunk_t &chunk); + int write_object_data(IoCtx &ioctx, string object_name, bufferlist &data); }; struct cold_pool_info_t diff --git a/src/test/rgw/test_rgw_dedup.cc b/src/test/rgw/test_rgw_dedup.cc index 8fb4bc0acff21..26e04bbde6a96 100644 --- a/src/test/rgw/test_rgw_dedup.cc +++ b/src/test/rgw/test_rgw_dedup.cc @@ -69,7 +69,9 @@ TEST_F(RGWDedupTest, sample_objects) TEST_F(RGWDedupTest, get_worker_id) { - RGWDedupWorker dedupworker(&dp, cct, &store, 1234); + + shared_ptr fpmanager = 
make_shared("testchunkalgo", 16384, "testfpalgo"); + RGWDedupWorker dedupworker(&dp, cct, &store, 1234, fpmanager); EXPECT_EQ("DedupWorker_1234", dedupworker.get_id()); RGWChunkScrubWorker scrubworker(&dp, cct, &store, 1234, 12345); @@ -118,7 +120,7 @@ TEST_F(RGWDedupTest, iotracker) TEST_F(RGWDedupTest, fpmanager_add) { - RGWFPManager *fpmanager = new RGWFPManager("testchunkalgo", 1234, "testfpalgo"); + shared_ptr fpmanager = make_shared("testchunkalgo", 16384, "testfpalgo"); string teststring1 = "1234"; string teststring2 = "5678"; @@ -140,7 +142,8 @@ TEST_F(RGWDedupTest, fpmanager_add) TEST_F(RGWDedupTest, fpmanager_find) { - RGWFPManager *fpmanager = new RGWFPManager("testchunkalgo", 1234, "testfpalgo"); + shared_ptr fpmanager = make_shared("testchunkalgo", 16384, "testfpalgo"); + string teststring1 = "1234"; string teststring2 = "5678"; string teststring3 = "asdf"; @@ -155,7 +158,8 @@ TEST_F(RGWDedupTest, fpmanager_find) TEST_F(RGWDedupTest, reset_fpmap) { - RGWFPManager *fpmanager = new RGWFPManager("testchunkalgo", 1234, "testfpalgo"); + shared_ptr fpmanager = make_shared("testchunkalgo", 16384, "testfpalgo"); + string teststring1 = "1234"; string teststring2 = "5678"; @@ -168,6 +172,64 @@ TEST_F(RGWDedupTest, reset_fpmap) EXPECT_EQ(0, fpmanager->get_fpmap_size()); } +TEST_F(RGWDedupTest, do_cdc) +{ + shared_ptr fpmanager = make_shared("testchunkalgo", 16384, "testfpalgo"); + RGWDedupWorker dedupworker(&dp, cct, &store, 1234, fpmanager); + + bufferlist bl; + generate_buffer(4*1024*1024, &bl); + + vector> fixed_expected = { {0, 262144}, {262144, 262144}, {524288, 262144}, {786432, 262144}, {1048576, 262144}, {1310720, 262144}, {1572864, 262144}, {1835008, 262144}, {2097152, 262144}, {2359296, 262144}, {2621440, 262144}, {2883584, 262144}, {3145728, 262144}, {3407872, 262144}, {3670016, 262144}, {3932160, 262144} }; + vector> fastcdc_expected = { {0, 151460}, {151460, 441676}, {593136, 407491}, {1000627, 425767}, {1426394, 602875}, {2029269, 327307}, {2356576, 
155515}, {2512091, 159392}, {2671483, 829416}, {3500899, 539667}, {4040566, 153738} }; + + auto fixed_chunks = dedupworker.do_cdc(bl, "fixed", 262144); + ASSERT_EQ(fixed_expected.size(), fixed_chunks.size()); + for (size_t i = 0; i < fixed_chunks.size(); i++) { + EXPECT_EQ(fixed_expected[i], std::get<1>(fixed_chunks[i])); + } + + auto fastcdc_chunks = dedupworker.do_cdc(bl, "fastcdc", 262144); + ASSERT_EQ(fastcdc_expected.size(), fastcdc_chunks.size()); + for (size_t i = 0; i < fastcdc_chunks.size(); i++) { + EXPECT_EQ(fastcdc_expected[i], std::get<1>(fastcdc_chunks[i])); + } + +} + +TEST_F(RGWDedupTest, generate_fingerprint) +{ + shared_ptr fpmanager = make_shared("testchunkalgo", 16384, "testfpalgo"); + RGWDedupWorker dedupworker(&dp, cct, &store, 1234, fpmanager); + + bufferlist data1; + data1.append(""); + EXPECT_EQ("da39a3ee5e6b4b0d3255bfef95601890afd80709", + dedupworker.generate_fingerprint(data1, "sha1")); + EXPECT_EQ("e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855", + dedupworker.generate_fingerprint(data1, "sha256")); + EXPECT_EQ("cf83e1357eefb8bdf1542850d66d8007d620e4050b5715dc83f4a921d36ce9ce47d0d13c5d85f2b0ff8318d2877eec2f63b931bd47417a81a538327af927da3e", + dedupworker.generate_fingerprint(data1, "sha512")); + + bufferlist data2; + data2.append("1234"); + EXPECT_EQ("7110eda4d09e062aa5e4a390b0a572ac0d2c0220", + dedupworker.generate_fingerprint(data2, "sha1")); + EXPECT_EQ("03ac674216f3e15c761ee1a5e255f067953623c8b388b4459e13f978d7c846f4", + dedupworker.generate_fingerprint(data2, "sha256")); + EXPECT_EQ("d404559f602eab6fd602ac7680dacbfaadd13630335e951f097af3900e9de176b6db28512f2e000b9d04fba5133e8b1c6e8df59db3a8ab9d60be4b97cc9e81db", + dedupworker.generate_fingerprint(data2, "sha512")); + + bufferlist data3; + data3.append("1234!@#$qwerQWER"); + EXPECT_EQ("4a8a52f40333d4a0a6b252eea92a157f655c0368", + dedupworker.generate_fingerprint(data3, "sha1")); + EXPECT_EQ("2b1f6dcffcc7cf39bb3b6a202e694699a57caadfa77236360e7934abb760a374", + 
dedupworker.generate_fingerprint(data3, "sha256")); + EXPECT_EQ("40b4d8d9a012f401488b0d3175cda012310e544dca3697f72554986d3acdbb2afd045370547b8438e9f66c9bf2b52043ff9616da251632d178916f5e9f4b0a65", + dedupworker.generate_fingerprint(data3, "sha512")); +} + int main (int argc, char** argv) { ::testing::InitGoogleTest(&argc, argv); return RUN_ALL_TESTS(); From ac0549df789228052c4c8647ff12e688b7687edf Mon Sep 17 00:00:00 2001 From: Sungmin Lee Date: Thu, 19 Jan 2023 14:50:03 +0900 Subject: [PATCH 15/15] Fix object promotion when CEPH_OSD_OP_GETXATTR, and XATTRS op is called on a deduplicated metadata object --- src/osd/PrimaryLogPG.cc | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/osd/PrimaryLogPG.cc b/src/osd/PrimaryLogPG.cc index 6ff3dc295e516..0f732769f597a 100644 --- a/src/osd/PrimaryLogPG.cc +++ b/src/osd/PrimaryLogPG.cc @@ -2545,7 +2545,9 @@ PrimaryLogPG::cache_result_t PrimaryLogPG::maybe_handle_manifest_detail( op.op == CEPH_OSD_OP_TIER_PROMOTE || op.op == CEPH_OSD_OP_TIER_FLUSH || op.op == CEPH_OSD_OP_TIER_EVICT || - op.op == CEPH_OSD_OP_ISDIRTY) { + op.op == CEPH_OSD_OP_ISDIRTY || + op.op == CEPH_OSD_OP_GETXATTR || + op.op == CEPH_OSD_OP_GETXATTRS) { return cache_result_t::NOOP; } }