# diff --git a/src/common/options/rgw.yaml.in b/src/common/options/rgw.yaml.in
# index 8928e853e64d5..04dcf332430ee 100644
# --- a/src/common/options/rgw.yaml.in
# +++ b/src/common/options/rgw.yaml.in
# Hunk @@ -3850,3 +3850,90 @@ options:
#   (context lines of the preceding option: "services: [rgw]", "with_legacy: true")
#
# New configuration options for the RGW deduplication subsystem.
# Only the desc texts were reworded for clarity; names, types, defaults and
# enum values are unchanged (they are the configuration interface).
- name: rgw_enable_dedup_threads
  type: bool
  level: advanced
  desc: Enable the RGW deduplication threads
  default: false
  services:
  - rgw
  with_legacy: true
- name: rgw_dedup_num_workers
  type: uint
  level: advanced
  desc: Number of dedup/scrub worker threads per RGW instance
  default: 3
  services:
  - rgw
  with_legacy: true
- name: rgw_dedup_chunk_algo
  type: str
  level: advanced
  desc: Chunking algorithm used to split objects for deduplication
  default: fastcdc
  services:
  - rgw
  enum_values:
  - fastcdc
  - fixed
  with_legacy: true
- name: rgw_dedup_chunk_size
  type: uint
  level: advanced
  desc: Target chunk size in bytes for deduplication chunking
  default: 16384
  services:
  - rgw
  with_legacy: true
- name: rgw_dedup_fp_algo
  type: str
  level: advanced
  desc: Fingerprint (hash) algorithm used to identify duplicate chunks
  default: sha1
  services:
  - rgw
  enum_values:
  - sha1
  - sha256
  - sha512
  with_legacy: true
- name: rgw_dedup_threshold
  type: uint
  level: advanced
  desc: Minimum number of occurrences of a chunk before it is deduplicated
  default: 3
  services:
  - rgw
  with_legacy: true
- name: rgw_dedup_scrub_ratio
  type: uint
  level: advanced
  desc: Number of dedup rounds to run between two scrub rounds
  default: 10
  services:
  - rgw
  with_legacy: true
- name: rgw_dedup_cold_pool_name
  type: str
  level: advanced
  desc: Name of the cold pool where deduplicated chunks are stored by RGWDedup
  default: default-cold-pool
  services:
  - rgw
  with_legacy: true
- name: rgw_dedup_fpmanager_memory_limit
  type: uint
  level: advanced
  desc: Maximum memory (bytes) the FPManager fingerprint table may use
  default: 4294967296
  services:
  - rgw
  with_legacy: true
- name: rgw_dedup_fpmanager_low_watermark
  type: uint
  level: advanced
  desc: Low watermark of the fingerprint map in FPManager
  default: 50
  services:
  - rgw
  with_legacy: true
# (seam fragment of the next file header, continued on the following line)
diff --git
a/src/osd/PrimaryLogPG.cc b/src/osd/PrimaryLogPG.cc
index c28184f9c9b69..c1384412a35c6 100644
--- a/src/osd/PrimaryLogPG.cc
+++ b/src/osd/PrimaryLogPG.cc
@@ -2573,6 +2573,11 @@ PrimaryLogPG::cache_result_t PrimaryLogPG::maybe_handle_manifest_detail(
       return cache_result_t::HANDLED_PROXY;
     case object_manifest_t::TYPE_CHUNKED:
       {
+	// Metadata-only ops (a read that touches no object data and does not
+	// write) can be answered without promoting chunk objects.
+	if (op->may_read() && !op->may_read_data() && !op->may_write()) {
+	  return cache_result_t::NOOP;
+	}
+
	if (can_proxy_chunked_read(op, obc)) {
	  map<hobject_t, FlushOpRef>::iterator p = flush_ops.find(obc->obs.oi.soid);
	  if (p != flush_ops.end()) {
diff --git a/src/rgw/CMakeLists.txt b/src/rgw/CMakeLists.txt
index 79bc05a4df13a..1b72f3416505a 100644
--- a/src/rgw/CMakeLists.txt
+++ b/src/rgw/CMakeLists.txt
@@ -148,6 +148,10 @@ set(librgw_common_srcs
   rgw_bucket_encryption.cc
   rgw_tracer.cc
   rgw_lua_background.cc
+  rgw_dedup.cc
+  rgw_fp_manager.cc
+  rgw_dedup_manager.cc
+  rgw_dedup_worker.cc
   driver/rados/cls_fifo_legacy.cc
   driver/rados/rgw_bucket.cc
   driver/rados/rgw_bucket_sync.cc
@@ -261,6 +265,8 @@ target_link_libraries(rgw_common
   cls_timeindex_client
   cls_user_client
   cls_version_client
+  cls_cas_client
+  cls_cas_internal
   librados
   rt
   ICU::uc
diff --git a/src/rgw/driver/rados/rgw_rados.cc b/src/rgw/driver/rados/rgw_rados.cc
index b714cbd566018..a813e64833985 100644
--- a/src/rgw/driver/rados/rgw_rados.cc
+++ b/src/rgw/driver/rados/rgw_rados.cc
@@ -65,6 +65,7 @@
 #include "rgw_gc.h"
 #include "rgw_lc.h"
+#include "rgw_dedup.h"
 #include "rgw_object_expirer_core.h"
 #include "rgw_sync.h"
@@ -1070,6 +1071,13 @@ void RGWRados::finalize()
   if (run_notification_thread) {
     rgw::notify::shutdown();
   }
+
+  // Stop the dedup manager thread (if one was ever created) before the
+  // RADOS handles go away.  shared_ptr converts to bool, so the nested
+  // dedup.get() check was collapsed into one condition.
+  if (use_dedup && dedup) {
+    dedup->finalize();
+    dedup.reset();
+  }
 }
 
 /**
@@ -1197,6 +1205,17 @@ int RGWRados::init_complete(const DoutPrefixProvider *dpp, optional_yield y)
     obj_expirer->start_processor();
   }
 
+  if (use_dedup) {
+    dedup = std::make_shared<RGWDedup>();
+    if (dedup->initialize(cct, this->driver) < 0) {
+      // FIX: typo in the log message ("initialing" -> "initializing")
+      ldpp_dout(dpp, 0) << "initializing RGWDedup failed" << dendl;
+    } else {
+      dedup->start_dedup_manager();
+    }
+  } else {
+    ldpp_dout(dpp, 5) << "note: RGWDedup not initialized" << dendl;
+  }
+
   auto& current_period = svc.zone->get_current_period();
   auto& zonegroup = svc.zone->get_zonegroup();
   auto& zone_params = svc.zone->get_zone_params();
diff --git a/src/rgw/driver/rados/rgw_rados.h b/src/rgw/driver/rados/rgw_rados.h
index 29e0e70cc4903..acac17a2ae29b 100644
--- a/src/rgw/driver/rados/rgw_rados.h
+++ b/src/rgw/driver/rados/rgw_rados.h
@@ -58,6 +58,7 @@ struct RGWZoneGroup;
 struct RGWZoneParams;
 class RGWReshard;
 class RGWReshardWait;
+class RGWDedup;
 
 struct get_obj_data;
 
@@ -369,12 +370,14 @@ class RGWRados
   RGWGC* gc{nullptr};
   RGWLC* lc{nullptr};
   RGWObjectExpirer* obj_expirer{nullptr};
+  std::shared_ptr<RGWDedup> dedup;
   bool use_gc_thread{false};
   bool use_lc_thread{false};
   bool quota_threads{false};
   bool run_sync_thread{false};
   bool run_reshard_thread{false};
   bool run_notification_thread{false};
+  bool use_dedup{false};  // FIX: was uninitialized; brace-init like the sibling flags
 
   RGWMetaNotifier* meta_notifier{nullptr};
   RGWDataNotifier* data_notifier{nullptr};
@@ -523,6 +526,11 @@ class RGWRados
     return *this;
   }
 
+  RGWRados& set_use_dedup(bool _use_dedup) {
+    use_dedup = _use_dedup;
+    return *this;
+  }
+
   librados::IoCtx* get_lc_pool_ctx() {
     return &lc_pool_ctx;
   }
diff --git a/src/rgw/rgw_appmain.cc b/src/rgw/rgw_appmain.cc
index 40373d82cbab4..4185f3aac8fd2 100644
--- a/src/rgw/rgw_appmain.cc
+++ b/src/rgw/rgw_appmain.cc
@@ -243,6 +243,7 @@ int rgw::AppMain::init_storage()
           run_sync,
           g_conf().get_val<bool>("rgw_dynamic_resharding"),
           true, null_yield, // run notification thread
+          g_conf()->rgw_enable_dedup_threads,
           g_conf()->rgw_cache_enabled);
   if (!env.driver) {
     return -EIO;
   }
diff --git a/src/rgw/rgw_dedup.cc b/src/rgw/rgw_dedup.cc
new file mode 100644
index 0000000000000..eea17136d095a
--- /dev/null
+++ b/src/rgw/rgw_dedup.cc
@@ -0,0 +1,58 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8
sw=2 smarttab ft=cpp
+
+#include "rgw_dedup.h"
+
+#define dout_subsys ceph_subsys_rgw
+
+using namespace std;
+
+// Remember context/store and build the manager thread object.
+// The thread is not started here; see start_dedup_manager().
+// Returns 0 on success, -1 if the manager failed to initialize.
+int RGWDedup::initialize(CephContext* _cct, rgw::sal::RadosStore* _store)
+{
+  cct = _cct;
+  store = _store;
+  dedup_manager = make_unique<RGWDedupManager>(this, cct, store);
+  if (dedup_manager->initialize() < 0) {
+    return -1;
+  }
+  return 0;
+}
+
+void RGWDedup::finalize()
+{
+  stop_dedup_manager();
+  dedup_manager.reset();
+}
+
+void RGWDedup::start_dedup_manager()
+{
+  // FIX: use ceph_assert, the project convention (plain assert() is
+  // compiled out of NDEBUG builds).
+  ceph_assert(dedup_manager.get());
+  dedup_manager->set_down_flag(false);
+  dedup_manager->create("dedup_manager");
+}
+
+void RGWDedup::stop_dedup_manager()
+{
+  ceph_assert(dedup_manager.get());
+  // down_flag still true means the manager thread was never started,
+  // so there is nothing to stop or join.
+  if (!dedup_manager->get_down_flag()) {
+    dedup_manager->stop();
+    dedup_manager->join();
+  }
+  ldpp_dout(this, 2) << "stop RGWDedup" << dendl;
+}
+
+unsigned RGWDedup::get_subsys() const
+{
+  return dout_subsys;
+}
+
+CephContext* RGWDedup::get_cct() const
+{
+  return cct;
+}
+
+std::ostream& RGWDedup::gen_prefix(std::ostream& out) const
+{
+  return out << "RGWDedup: ";
+}
diff --git a/src/rgw/rgw_dedup.h b/src/rgw/rgw_dedup.h
new file mode 100644
index 0000000000000..014eaa311e780
--- /dev/null
+++ b/src/rgw/rgw_dedup.h
@@ -0,0 +1,43 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab ft=cpp
+
+#ifndef CEPH_RGW_DEDUP_H
+#define CEPH_RGW_DEDUP_H
+
+// NOTE(review): the original angle-bracket include names were lost in
+// transit of this patch; <memory> is required for std::unique_ptr.
+#include <memory>
+
+#include "include/types.h"
+#include "common/Cond.h"
+#include "common/Thread.h"
+#include "rgw_dedup_manager.h"
+
+using namespace std;  // NOTE(review): using-directives in headers leak into every includer
+
+// Owner of the background deduplication subsystem of one RGW instance;
+// creates and controls the RGWDedupManager thread.
+class RGWDedup : public DoutPrefixProvider
+{
+  CephContext* cct;
+  rgw::sal::RadosStore* store;
+  unique_ptr<RGWDedupManager> dedup_manager;
+
+public:
+  RGWDedup() : cct(nullptr), store(nullptr) {}
+  RGWDedup(const RGWDedup& rhs) = delete;
+  // FIX: the deleted copy assignment took `const RGWDedupManager&` (wrong
+  // class), so RGWDedup's own copy assignment was never actually deleted.
+  RGWDedup& operator=(const RGWDedup& rhs) = delete;
+  virtual ~RGWDedup() override {}
+
+  int initialize(CephContext* _cct, rgw::sal::RadosStore* _store);
+  void finalize();
+
+  void start_dedup_manager();
+  void stop_dedup_manager();
+
+  // DoutPrefixProvider interface
+  CephContext* get_cct() const override;
+  unsigned get_subsys() const override;
+  std::ostream& gen_prefix(std::ostream& out) const override;
+};
+
+#endif
diff --git a/src/rgw/rgw_dedup_manager.cc b/src/rgw/rgw_dedup_manager.cc
new file mode 100644
index 0000000000000..cb240ed20f7cc
--- /dev/null
+++ b/src/rgw/rgw_dedup_manager.cc
@@ -0,0 +1,317 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab ft=cpp
+
+#include "rgw_dedup_manager.h"
+#include "rgw_rados.h"
+#include "services/svc_zone.h"
+#include "include/rados/librados.h"
+
+#define dout_subsys ceph_subsys_rgw
+
+const int DEDUP_INTERVAL = 3;        // seconds between dedup/scrub rounds
+const int MAX_OBJ_SCAN_SIZE = 100;   // max objects per object_list() call
+const int RETRY_SLEEP_PERIOD = 5;    // seconds before retrying service-map lookup
+const uint32_t MAX_CHUNK_REF_SIZE = 10000;
+const uint32_t DEFAULT_DEDUP_SCRUB_RATIO = 5;
+
+// Read all tunables (validated with ceph_assert), grab the RADOS handle,
+// open the cold pool and construct the per-worker objects.
+// Returns 0 on success, a negative errno otherwise.
+int RGWDedupManager::initialize()
+{
+  // initialize dedup parameters from rgw.yaml.in
+  num_workers = cct->_conf->rgw_dedup_num_workers;
+  ceph_assert(num_workers > 0);
+  chunk_algo = cct->_conf->rgw_dedup_chunk_algo;
+  ceph_assert(chunk_algo == "fastcdc" || chunk_algo == "fixed");
+  chunk_size = cct->_conf->rgw_dedup_chunk_size;
+  ceph_assert(chunk_size > 0);
+  fp_algo = cct->_conf->rgw_dedup_fp_algo;
+  ceph_assert(fp_algo == "sha1" || fp_algo == "sha256" || fp_algo == "sha512");
+  dedup_threshold = cct->_conf->rgw_dedup_threshold;
+  ceph_assert(dedup_threshold > 0);
+  dedup_scrub_ratio = cct->_conf->rgw_dedup_scrub_ratio;
+  ceph_assert(dedup_scrub_ratio > 0);
+  cold_pool_name = cct->_conf->rgw_dedup_cold_pool_name;
+  ceph_assert(!cold_pool_name.empty());
+  fpmanager_memory_limit = cct->_conf->rgw_dedup_fpmanager_memory_limit;
+  ceph_assert(fpmanager_memory_limit > 0);
+  fpmanager_low_watermark = cct->_conf->rgw_dedup_fpmanager_low_watermark;
+  ceph_assert(fpmanager_low_watermark > 0);
+
+  rados = store->getRados()->get_rados_handle();
+  ceph_assert(rados);
+
+  // get cold pool ioctx
+  IoCtx cold_ioctx;
+  int ret =
rgw_init_ioctx(dpp, rados, rgw_pool(cold_pool_name), cold_ioctx, false, false);
+  if (ret < 0) {
+    ldpp_dout(dpp, 0) << "ERROR: failed to get pool=" << cold_pool_name << dendl;
+    return ret;
+  }
+
+  // initialize components
+  fpmanager = make_shared<RGWFPManager>(
+    dedup_threshold, fpmanager_memory_limit, fpmanager_low_watermark);
+
+  for (uint32_t i = 0; i < num_workers; ++i) {
+    append_dedup_worker(make_unique<RGWDedupWorker>(
+      dpp, cct, store, i, fpmanager, chunk_algo, chunk_size,
+      fp_algo, dedup_threshold, cold_ioctx));
+    append_scrub_worker(make_unique<RGWChunkScrubWorker>(
+      dpp, cct, store, i, cold_ioctx));
+  }
+  return 0;
+}
+
+// Build a mon/mgr command JSON string: {"prefix": "...", "key": "value", ...}
+string RGWDedupManager::create_cmd(const string& prefix,
+                                   const vector<pair<string, string>>& options)
+{
+  ceph_assert(!prefix.empty());
+  string cmd("{\"prefix\": \"" + prefix + "\"");
+  for (auto& opt : options) {
+    cmd.append(", \"" + opt.first + "\": \"" + opt.second + "\"");
+  }
+  cmd.append("}");
+  return cmd;
+}
+
+// Refresh the set of base (hot) data pools from the zone placement config
+// and hand an IoCtx for each of them to every worker.
+void RGWDedupManager::update_base_pool_info()
+{
+  // FIX: bind by const reference -- the original copied RGWZoneParams and
+  // the whole placement-pool map on every dedup round.
+  const auto& zone_params = store->svc()->zone->get_zone_params();
+  const auto& placement_pools = zone_params.placement_pools;
+  for (const auto& pp : placement_pools) {
+    rgw_pool data_pool = pp.second.get_standard_data_pool();
+    ldpp_dout(dpp, 20) << " placement: " << pp.first << ", data pool: "
+                       << data_pool << dendl;
+    append_ioctxs(data_pool.name);
+  }
+}
+
+// Assign each worker its shard of the global worker space:
+// total workers = rgwdedup_cnt * num_workers, and this daemon's workers get
+// the contiguous gid range starting at cur_rgwdedup_id * num_workers.
+template <typename WorkerType>
+void RGWDedupManager::prepare_worker(vector<unique_ptr<WorkerType>>& workers,
+                                     const int rgwdedup_cnt,
+                                     const int cur_rgwdedup_id)
+{
+  ceph_assert(!workers.empty());
+  for (auto& worker : workers) {
+    ceph_assert(worker.get());
+    // FIX: global id must be cur_rgwdedup_id * num_workers + local id; the
+    // original multiplied by rgwdedup_cnt, which produces colliding and
+    // unused gids whenever daemon count != per-daemon worker count.
+    worker->prepare(rgwdedup_cnt * num_workers,
+                    cur_rgwdedup_id * num_workers + worker->get_id());
+  }
+}
+
+void RGWDedupManager::run_dedup(uint32_t& dedup_worked_cnt)
+{
+  ceph_assert(!dedup_workers.empty());
+  for (auto& worker : dedup_workers) {
+    ceph_assert(worker.get());
+    worker->create(("DedupWorker_" + to_string(worker->get_id())).c_str());
+  }
+  ++dedup_worked_cnt;
+}
+
+void RGWDedupManager::run_scrub(uint32_t& dedup_worked_cnt)
+{
+  ceph_assert(!scrub_workers.empty());
+  for (auto& worker : scrub_workers) {
+    ceph_assert(worker.get());
+    worker->create(("ScrubWorker_" + to_string(worker->get_id())).c_str());
+  }
+  dedup_worked_cnt = 0;
+}
+
+template <typename WorkerType>
+void RGWDedupManager::wait_worker(vector<unique_ptr<WorkerType>>& workers)
+{
+  ceph_assert(!workers.empty());
+  for (auto& worker : workers) {
+    ceph_assert(worker.get());
+    worker->join();
+  }
+}
+
+// Query the mgr service map to learn how many RGW daemons exist
+// (num_rgwdedups) and this daemon's index among them (cur_id).
+// Returns 0 on success, negative if the daemon is not (yet) in the map.
+int RGWDedupManager::get_multi_rgwdedup_info(int& num_rgwdedups, int& cur_id)
+{
+  bufferlist result;
+  vector<pair<string, string>> options;
+  options.emplace_back(make_pair("format", "json"));
+  string cmd = create_cmd("service dump", options);
+
+  ceph_assert(rados);
+  int ret = rados->mgr_command(cmd, bufferlist(), &result, nullptr);
+  if (ret < 0) {
+    ldpp_dout(dpp, 0) << __func__ << " mgr_command " << cmd << " failed" << dendl;
+    return ret;  // FIX: propagate the real error instead of a blanket -EACCES
+  }
+
+  string dump = result.to_str();
+  JSONParser service_parser;
+  if (!service_parser.parse(dump.c_str(), dump.size())) {
+    return -1;
+  }
+
+  JSONFormattable f;
+  try {
+    decode_json_obj(f, &service_parser);
+  } catch (JSONDecoder::err& e) {
+    ldpp_dout(dpp, 2) << __func__ << " Failed to decode JSON object" << dendl;
+  }
+
+  if (!f.exists("services") ||
+      !f["services"].exists("rgw") ||
+      !f["services"]["rgw"].exists("daemons")) {
+    return -1;
+  }
+
+  uint64_t rgw_gid = rados->get_instance_id();
+  num_rgwdedups = f["services"]["rgw"]["daemons"].object().size();
+  int idx = 0;
+  for (const auto& [k, v] : f["services"]["rgw"]["daemons"].object()) {
+    // FIX: also require a "gid" field before calling std::stoull() -- the
+    // original would throw std::invalid_argument on an entry without one.
+    if (!v.exists("metadata") || !v["metadata"].exists("pid") ||
+        !v.exists("gid")) {
+      --num_rgwdedups;
+      continue;
+    }
+
+    if (rgw_gid == std::stoull(v["gid"].val())) {
+      cur_id = idx;
+      break;
+    }
+    ++idx;
+  }
+
+  // current RGWDedup not found in Ceph cluster
+  if (idx == num_rgwdedups) {
+    return -1;
+  }
+  return 0;
+}
+
+// Main loop: every DEDUP_INTERVAL seconds run a dedup round, and after
+// dedup_scrub_ratio dedup rounds run one scrub round instead.
+void* RGWDedupManager::entry()
+{
+  ldpp_dout(dpp, 2) << "RGWDedupManager started" << dendl;
+  uint32_t dedup_worked_cnt = 0;
+  while (!get_down_flag()) {
+    if (perfcounter) {
+      perfcounter->set(l_rgw_dedup_worker_count, num_workers);
+      perfcounter->set(l_rgw_dedup_scrub_ratio, dedup_scrub_ratio);
+
+      if (chunk_algo == "fixed") {
+        perfcounter->set(l_rgw_dedup_chunk_algo, 1);
+      } else if (chunk_algo == "fastcdc") {
+        perfcounter->set(l_rgw_dedup_chunk_algo, 2);
+      }
+      perfcounter->set(l_rgw_dedup_chunk_size, chunk_size);
+
+      if (fp_algo == "sha1") {
+        perfcounter->set(l_rgw_dedup_fp_algo, 1);
+      } else if (fp_algo == "sha256") {
+        perfcounter->set(l_rgw_dedup_fp_algo, 2);
+      } else if (fp_algo == "sha512") {
+        perfcounter->set(l_rgw_dedup_fp_algo, 3);
+      }
+    }
+
+    int num_rgwdedup = 0, cur_rgwdedup_id = -1;  // FIX: initialize locals
+    if (get_multi_rgwdedup_info(num_rgwdedup, cur_rgwdedup_id) < 0) {
+      ldpp_dout(dpp, 2) << "current RGWDedup thread not found yet in Ceph Cluster."
+                        << " Retry a few seconds later." << dendl;
+      sleep(RETRY_SLEEP_PERIOD);
+      continue;
+    }
+    ldpp_dout(dpp, 10) << "num rgwdedup: " << num_rgwdedup << ", cur rgwdedup id: "
+                       << cur_rgwdedup_id << dendl;
+
+    if (dedup_worked_cnt < dedup_scrub_ratio) {
+      // do dedup
+      if (perfcounter) {
+        perfcounter->set(l_rgw_dedup_current_worker_mode, 1);
+      }
+      ceph_assert(fpmanager.get());
+      fpmanager->reset_fpmap();
+
+      update_base_pool_info();
+      prepare_worker(dedup_workers, num_rgwdedup, cur_rgwdedup_id);
+      run_dedup(dedup_worked_cnt);
+      wait_worker(dedup_workers);
+    } else {
+      // do scrub
+      if (perfcounter) {
+        perfcounter->set(l_rgw_dedup_current_worker_mode, 2);
+      }
+
+      prepare_worker(scrub_workers, num_rgwdedup, cur_rgwdedup_id);
+      run_scrub(dedup_worked_cnt);
+      wait_worker(scrub_workers);
+    }
+    sleep(DEDUP_INTERVAL);
+  }
+  return nullptr;
+}
+
+void RGWDedupManager::stop()
+{
+  set_down_flag(true);
+  ldpp_dout(dpp, 2) << "RGWDedupManager is set to be stopped" << dendl;
+}
+
+void RGWDedupManager::finalize()
+{
+  fpmanager->reset_fpmap();
+  fpmanager.reset();
+
+  // FIX: iterate the vector instead of indexing [0, num_workers) -- if
+  // initialize() failed before the workers were created, the original
+  // indexed past the end of an empty vector.
+  for (auto& worker : dedup_workers) {
+    worker->finalize();
+    worker.reset();
+  }
+}
+
+// Open an IoCtx on a base (hot) pool and register it with every worker.
+int RGWDedupManager::append_ioctxs(rgw_pool base_pool)
+{
+  ceph_assert(rados);
+  IoCtx base_ioctx;
+  int ret = rgw_init_ioctx(dpp, rados, base_pool, base_ioctx, false, false);
+  if (ret < 0) {
+    ldpp_dout(dpp, 0) << "ERROR: failed to get pool=" << base_pool.name << dendl;
+    return ret;
+  }
+
+  // both worker vectors are created together in initialize()
+  ceph_assert(dedup_workers.size() == scrub_workers.size());
+  for (uint32_t i = 0; i < dedup_workers.size(); ++i) {
+    dedup_workers[i]->append_base_ioctx(base_ioctx.get_id(), base_ioctx);
+    scrub_workers[i]->append_base_ioctx(base_ioctx.get_id(), base_ioctx);
+  }
+  return 0;
+}
+
+string RGWDedupManager::create_osd_pool_set_cmd(const string prefix, const string base_pool,
+                                                const string var, const string val)
+{
+  vector<pair<string, string>> options;
+  options.emplace_back(make_pair("pool", base_pool));
+  options.emplace_back(make_pair("var", var));
+  options.emplace_back(make_pair("val", val));
+  return create_cmd(prefix, options);
+}
+
+void RGWDedupManager::set_down_flag(bool new_flag)
+{
+  down_flag = new_flag;
+}
+
+bool RGWDedupManager::get_down_flag()
+{
+  return down_flag;
+}
+
+void RGWDedupManager::append_dedup_worker(unique_ptr<RGWDedupWorker>&& new_worker)
+{
+  ceph_assert(new_worker.get());
+  dedup_workers.emplace_back(move(new_worker));
+}
+
+void RGWDedupManager::append_scrub_worker(unique_ptr<RGWChunkScrubWorker>&& new_worker)
+{
+  ceph_assert(new_worker.get());
+  scrub_workers.emplace_back(move(new_worker));
+}
diff --git a/src/rgw/rgw_dedup_manager.h b/src/rgw/rgw_dedup_manager.h
new file mode 100644
index 0000000000000..fa02181a66d0b
--- /dev/null
+++ b/src/rgw/rgw_dedup_manager.h
@@ -0,0 +1,82 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab ft=cpp
+
+#ifndef CEPH_RGW_DEDUP_MANAGER_H
+#define CEPH_RGW_DEDUP_MANAGER_H
+
+#include "include/types.h"
+#include "common/Cond.h"
+#include "common/Thread.h"
+#include "rgw_sal_rados.h"
+#include "rgw_perf_counters.h"
+#include "rgw_fp_manager.h"
+#include "rgw_dedup_worker.h"
+
+using namespace std;       // NOTE(review): using-directives in a header leak into every includer
+using namespace librados;
+
+extern const uint32_t DEFAULT_DEDUP_SCRUB_RATIO;
+
+class
RGWFPManager;
+class RGWDedupWorker;
+class RGWChunkScrubWorker;
+
+#include <atomic>  // FIX: for std::atomic<bool> down_flag (kept here so this hunk is self-contained)
+
+// Background thread that periodically runs dedup rounds (and, every
+// dedup_scrub_ratio rounds, a scrub round) via per-pool worker threads.
+class RGWDedupManager : public Thread
+{
+  const DoutPrefixProvider* dpp;
+  CephContext* cct;
+  rgw::sal::RadosStore* store;
+  // FIX: written by stop() from another thread while entry() polls it in a
+  // loop -- a plain bool is a data race; make it atomic.
+  std::atomic<bool> down_flag;
+  Rados* rados{nullptr};      // FIX: was uninitialized until initialize()
+
+  shared_ptr<RGWFPManager> fpmanager;
+  vector<unique_ptr<RGWDedupWorker>> dedup_workers;
+  vector<unique_ptr<RGWChunkScrubWorker>> scrub_workers;
+
+  string cold_pool_name;
+  string chunk_algo;
+  string fp_algo;
+  // FIX: default-initialize the tunables instead of leaving them
+  // indeterminate until initialize() runs.
+  uint32_t num_workers = 0;
+  uint32_t chunk_size = 0;
+  uint32_t dedup_threshold = 0;
+  uint32_t dedup_scrub_ratio = DEFAULT_DEDUP_SCRUB_RATIO;
+  uint64_t fpmanager_memory_limit = 0;
+  uint32_t fpmanager_low_watermark = 0;
+
+public:
+  RGWDedupManager(const DoutPrefixProvider* _dpp,
+                  CephContext* _cct,
+                  rgw::sal::RadosStore* _store)
+    : dpp(_dpp), cct(_cct), store(_store), down_flag(true) {}
+  RGWDedupManager() = delete;
+  RGWDedupManager(const RGWDedupManager& rhs) = delete;
+  RGWDedupManager& operator=(const RGWDedupManager& rhs) = delete;
+  virtual ~RGWDedupManager() override {}
+  virtual void* entry() override;
+
+  void stop();
+  int initialize();
+  void finalize();
+  void set_down_flag(bool new_flag);
+  bool get_down_flag();
+
+  // WorkerType: RGWDedupWorker or RGWChunkScrubWorker
+  template <typename WorkerType>
+  void prepare_worker(vector<unique_ptr<WorkerType>>& workers, const int num_rgwdedup,
+                      const int rgwdedup_id);
+  template <typename WorkerType>
+  void wait_worker(vector<unique_ptr<WorkerType>>& workers);
+  void run_dedup(uint32_t& dedup_worked_cnt);
+  void run_scrub(uint32_t& dedup_worked_cnt);
+
+  int append_ioctxs(rgw_pool base_pool);
+  void update_base_pool_info();
+  string create_cmd(const string& prefix,
+                    const vector<pair<string, string>>& options);
+  string create_osd_pool_set_cmd(const string prefix, const string base_pool,
+                                 const string var, const string val);
+  int get_multi_rgwdedup_info(int& num_rgwdedups, int& cur_id);
+  void append_dedup_worker(unique_ptr<RGWDedupWorker>&& new_worker);
+  void append_scrub_worker(unique_ptr<RGWChunkScrubWorker>&& new_worker);
+};
+
+#endif
diff --git a/src/rgw/rgw_dedup_worker.cc b/src/rgw/rgw_dedup_worker.cc
new file mode 100644
index 0000000000000..dae32b81ca8b1
--- /dev/null
+++
b/src/rgw/rgw_dedup_worker.cc
@@ -0,0 +1,547 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab ft=cpp
+
+#include "rgw_dedup_worker.h"
+#include "cls/cas/cls_cas_client.h"
+
+#define dout_subsys ceph_subsys_rgw
+
+// bytes per read() call when slurping an object
+// NOTE(review): consider `static`/`constexpr` unless rgw_dedup_worker.h
+// declares this extern.
+unsigned default_obj_read_size = 1 << 26;
+
+int Worker::get_id()
+{
+  return id;
+}
+
+// Record the sharding parameters for the next round: this worker scans
+// shard `new_gid` out of `new_total_workers` global shards.
+void Worker::prepare(const int new_total_workers, const int new_gid)
+{
+  num_total_workers = new_total_workers;
+  gid = new_gid;
+}
+
+void Worker::clear_base_ioctx_map(uint64_t id, IoCtx& ioctx)
+{
+  // NOTE(review): both parameters are ignored; the whole map is dropped.
+  base_ioctx_map.clear();
+}
+
+void Worker::append_base_ioctx(uint64_t id, IoCtx& ioctx)
+{
+  base_ioctx_map.emplace(id, ioctx);
+}
+
+// Load and decode the chunk-reference xattr of a chunk object.
+// Returns 0 and fills `refs` on success, -1 otherwise.
+int Worker::get_chunk_refs(IoCtx& chunk_ioctx,
+                           const string& chunk_oid,
+                           chunk_refs_t& refs)
+{
+  bufferlist bl;
+  if (chunk_ioctx.getxattr(chunk_oid, CHUNK_REFCOUNT_ATTR, bl) < 0) {
+    // non-chunk objects are not targets of a RGWChunkScrubWorker
+    ldpp_dout(dpp, 0) << "object " << chunk_oid << " getxattr failed" << dendl;
+    return -1;
+  }
+  auto p = bl.cbegin();
+  decode(refs, p);
+
+  if (refs.get_type() != chunk_refs_t::TYPE_BY_OBJECT) {
+    ldpp_dout(dpp, 0) << "do not allow other types except for TYPE_BY_OBJECT" << dendl;
+    return -1;
+  }
+  return 0;
+}
+
+
+
+// Name of the archived (whole-object) copy in the cold pool:
+// "<base pool id>:<object name>".
+string RGWDedupWorker::get_archived_obj_name(IoCtx& ioctx, const string obj_name)
+{
+  ceph_assert(ioctx.get_id() > 0);
+  ceph_assert(!obj_name.empty());
+  return to_string(ioctx.get_id()) + ":" + obj_name;
+}
+
+// Classify an object as New / Archived / Deduped by decoding its
+// object_info_t and inspecting the manifest chunk_map (see the state
+// diagram in try_object_dedup()).
+RGWDedupWorker::MetadataObjType RGWDedupWorker::get_metadata_obj_type
+  (object_info_t& oi, IoCtx& ioctx, const string obj_name, const uint32_t data_len)
+{
+  bufferlist bl;
+  // NOTE(review): the xattr key looks garbled ("" here); a failed getxattr
+  // leaves bl empty and the decode below throws, yielding None -- confirm
+  // the intended attribute name against the original patch.
+  ioctx.getxattr(obj_name, "", bl);
+  try {
+    bufferlist::const_iterator bliter = bl.begin();
+    decode(oi, bliter);
+  } catch (...) {
+    ldpp_dout(dpp, 0) << __func__ << " failed to get object_info_t of "
+                      << obj_name << dendl;
+    return MetadataObjType::None;
+  }
+
+  if (!oi.has_manifest()) {
+    return MetadataObjType::New;
+  } else if (oi.has_manifest() && oi.manifest.is_chunked()) {
+    const auto& first_entry = oi.manifest.chunk_map.begin();
+    if (first_entry->first == 0 &&
+        first_entry->second.length == data_len &&
+        first_entry->second.oid.oid == get_archived_obj_name(ioctx, obj_name)) {
+      return MetadataObjType::Archived;
+    } else if (oi.manifest.chunk_map.size() > 1) {
+      return MetadataObjType::Deduped;
+    }
+  }
+  return MetadataObjType::None;
+}
+
+template <typename Iter>
+void RGWDedupWorker::try_object_dedup(IoCtx& base_ioctx, Iter begin, Iter end)
+{
+  // FIX: iterate with a copy of the iterator; `auto& obj = begin` bound a
+  // mutable reference to (and advanced) the parameter itself.
+  for (auto obj = begin; obj != end; ++obj) {
+    list<chunk_t> redundant_chunks;
+    auto target_oid = obj->oid;
+    ldpp_dout(dpp, 20) << "worker_" << id << " oid: " << target_oid << dendl;
+
+    bufferlist data = read_object_data(base_ioctx, target_oid);
+    if (data.length() == 0) {
+      ldpp_dout(dpp, 5) << "Skip dedup object "
+                        << target_oid << ", object data size is 0" << dendl;
+      continue;
+    }
+    auto chunks = do_cdc(data, chunk_algo, chunk_size);
+
+    // check if a chunk is duplicated in sampled objects
+    for (auto& chunk : chunks) {
+      auto& chunk_data = get<0>(chunk);
+      string fingerprint = generate_fingerprint(chunk_data, fp_algo);
+      fpmanager->add(fingerprint);
+      if (fpmanager->find(fingerprint) >= dedup_threshold) {
+        std::pair<uint64_t, uint64_t> chunk_boundary = std::get<1>(chunk);
+        chunk_t chunk_info = {
+          .start = chunk_boundary.first,
+          .size = chunk_boundary.second,
+          .fingerprint = fingerprint,
+          .data = chunk_data
+        };
+        redundant_chunks.push_back(chunk_info);
+      }
+    }
+
+    /**
+     * An object has 3 states and can be transferred to another state.
+     * - New: never scanned before, so it must be transferred to the
+     *   Archived or Deduped state.
+     * - Archived: no duplicated chunk has been found; the entire object is
+     *   evicted to the cold pool.
+     * - Deduped: deduplication has occurred due to the duplicated chunks.
+     *   Duplicated chunks have been evicted to the cold pool.
+     *
+     * example)
+     *   +-----------+-----------+-----------+-----------+
+     *   | object    |    New    |  Archive  |  Deduped  |
+     *   +-----------+-----------+-----------+-----------+
+     *   | base pool | 123456789 | --------- | 1-3456-89 |
+     *   +-----------+-----------+-----------+-----------+
+     *   | cold pool | --------- | 123456789 | -2----7-- |
+     *   +-----------+-----------+-----------+-----------+
+     */
+
+    object_info_t oi;
+    MetadataObjType meta_obj_type
+      = get_metadata_obj_type(oi, base_ioctx, target_oid, data.length());
+    ldpp_dout(dpp, 20) << "oid: " << oi.soid << " state: "
+                       << static_cast<int>(meta_obj_type) << dendl;
+
+    if (meta_obj_type == MetadataObjType::Archived) {
+      if (redundant_chunks.empty()) {
+        ldpp_dout(dpp, 20) << "oid: " << oi.soid << " Archived -> Archived" << dendl;
+        // no need further operations
+        continue;
+      } else {
+        // Archived -> Deduped
+        ldpp_dout(dpp, 20) << "oid: " << oi.soid << " Archived -> Deduped" << dendl;
+        clear_manifest(base_ioctx, target_oid);
+      }
+    } else if (meta_obj_type == MetadataObjType::Deduped) {
+      if (redundant_chunks.empty()) {
+        // Deduped -> Archived
+        ldpp_dout(dpp, 20) << "oid: " << oi.soid << " Deduped -> Archived" << dendl;
+        clear_manifest(base_ioctx, target_oid);
+      }
+    }
+
+    // no duplicate chunks found: archive the whole object instead
+    if (redundant_chunks.empty()) {
+      chunk_t archived_object = {
+        .start = 0,
+        .size = data.length(),
+        .fingerprint = get_archived_obj_name(base_ioctx, target_oid),
+        .data = data
+      };
+      redundant_chunks.push_back(archived_object);
+    }
+
+    do_chunk_dedup(base_ioctx, cold_ioctx, target_oid, redundant_chunks,
+                   oi.manifest.chunk_map);
+    do_data_evict(base_ioctx, target_oid);
+  }
+}
+
+// Scan this worker's shard of every registered base pool and try to dedup
+// each object; the scan direction alternates between rounds.
+void* RGWDedupWorker::entry()
+{
+  ldpp_dout(dpp, 20) << "RGWDedupWorker_" << id << " starts" << dendl;
+  ceph_assert(chunk_algo == "fixed" || chunk_algo == "fastcdc");
+  ceph_assert(chunk_size > 0);
+  ceph_assert(fp_algo == "sha1" || fp_algo == "sha256" || fp_algo == "sha512");
+  ceph_assert(dedup_threshold > 0);
+
+  for (auto& iter : base_ioctx_map) {
+    uint32_t num_objs = 0;
+    IoCtx base_ioctx = iter.second;
+    ObjectCursor pool_begin = base_ioctx.object_list_begin();
+    ObjectCursor pool_end = base_ioctx.object_list_end();
+    ObjectCursor shard_begin, shard_end;
+
+    // get current worker's shard range of the base pool
+    base_ioctx.object_list_slice(pool_begin, pool_end, gid, num_total_workers,
+                                 &shard_begin, &shard_end);
+    ldpp_dout(dpp, 20) << "gid/# total workers: " << gid << "/" << num_total_workers
+                       << ", id: " << id << ", scan dir: " << obj_scan_dir << dendl;
+
+    ObjectCursor obj_cursor = shard_begin;
+    while (obj_cursor < shard_end) {
+      vector<ObjectItem> obj_shard;
+      int ret = base_ioctx.object_list(obj_cursor, shard_end, MAX_OBJ_SCAN_SIZE,
+                                       {}, &obj_shard, &obj_cursor);
+      if (ret < 0) {
+        ldpp_dout(dpp, 0) << "error object_list: " << cpp_strerror(ret) << dendl;
+        return nullptr;
+      }
+      num_objs += obj_shard.size();
+
+      if (obj_scan_dir) {
+        try_object_dedup(base_ioctx, obj_shard.begin(), obj_shard.end());
+      } else {
+        try_object_dedup(base_ioctx, obj_shard.rbegin(), obj_shard.rend());
+      }
+    }
+    ldpp_dout(dpp, 20) << "RGWDedupWorker_" << id << " pool: " << base_ioctx.get_pool_name()
+                       << ", num objs: " << num_objs << dendl;
+  }
+
+  // reverse object scanning direction
+  obj_scan_dir ^= 1;
+  return nullptr;
+}
+
+// Read the whole object, default_obj_read_size bytes per read(); the loop
+// ends when read() returns 0 (EOF). Returns an empty bufferlist on error.
+bufferlist RGWDedupWorker::read_object_data(IoCtx& ioctx, string object_name)
+{
+  bufferlist whole_data;
+  uint64_t offset = 0;
+  int ret = -1;
+
+  while (ret != 0) {
+    bufferlist partial_data;
+    ret = ioctx.read(object_name, partial_data, default_obj_read_size, offset);
+    if (ret < 0) {
+      ldpp_dout(dpp, 1) << "read object error pool_name: " << ioctx.get_pool_name()
+                        << ", object_name: " << object_name << ", offset: " << offset
+                        << ", size: " << default_obj_read_size << ", error:" << cpp_strerror(ret)
+                        << dendl;
+      return bufferlist();
+    }
+    offset += ret;
+    whole_data.claim_append(partial_data);
+  }
+
+  if (perfcounter) {
+    perfcounter->inc(l_rgw_dedup_worker_read, whole_data.length());
+  }
+  return whole_data;
+}
+
+int RGWDedupWorker::write_object_data(IoCtx &ioctx, string object_name, bufferlist &data)
+{
+  ObjectWriteOperation write_op;
+  write_op.write_full(data);
+  int ret = ioctx.operate(object_name, &write_op);
+  if (ret < 0) {
+    ldpp_dout(dpp, 1)
+      << "Failed to write rados object, pool_name: " << ioctx.get_pool_name()
+      << ", object_name: " << object_name << ", ret: " << ret << dendl;
+  }
+
+  // FIX: only count bytes that were actually written; the original bumped
+  // the counter on failed writes as well.
+  if (ret >= 0 && perfcounter) {
+    perfcounter->inc(l_rgw_dedup_worker_write, data.length());
+  }
+  return ret;
+}
+
+int RGWDedupWorker::check_object_exists(IoCtx& ioctx, string object_name)
+{
+  uint64_t size;
+  time_t mtime;
+  return ioctx.stat(object_name, &size, &mtime);
+}
+
+// Turn [start, start+size) of object_name into a reference to the chunk
+// object `fingerprint` in the cold pool.
+int RGWDedupWorker::try_set_chunk(IoCtx& ioctx, IoCtx& cold_ioctx,
+                                  string object_name, chunk_t &chunk)
+{
+  ObjectReadOperation chunk_op;
+  chunk_op.set_chunk(
+    chunk.start,
+    chunk.size,
+    cold_ioctx,
+    chunk.fingerprint,
+    0,
+    CEPH_OSD_OP_FLAG_WITH_REFERENCE);
+  return ioctx.operate(object_name, &chunk_op, nullptr);
+}
+
+// Write each redundant chunk to the cold pool (if absent) and convert the
+// matching extent of the base object into a chunk reference.
+void RGWDedupWorker::do_chunk_dedup(IoCtx& ioctx, IoCtx& cold_ioctx,
+                                    string object_name,
+                                    list<chunk_t> redundant_chunks,
+                                    map<uint64_t, chunk_info_t>& chunk_map)
+{
+  // NOTE(review): redundant_chunks is taken by value (matches the header
+  // declaration); passing by const reference needs a header change too.
+  // FIX: iterate by reference -- `for (auto chunk : ...)` copied every
+  // chunk_t, including its data bufferlist, per iteration.
+  for (auto& chunk : redundant_chunks) {
+    if (check_object_exists(cold_ioctx, chunk.fingerprint) < 0) {
+      int ret = write_object_data(cold_ioctx, chunk.fingerprint, chunk.data);
+      if (ret < 0) {
+        ldpp_dout(dpp, 1)
+          << "Failed to write chunk to cold pool, cold_pool_name: "
+          << cold_ioctx.get_pool_name()
+          << ", fingerprint: " << chunk.fingerprint
+          << ", ret: " << ret << dendl;
+        return;
+      }
+      if (perfcounter) {
+        perfcounter->inc(l_rgw_dedup_chunk_data_size, chunk.data.length());
+      }
+    } else {
+      if (chunk_map.find(chunk.start) != chunk_map.end() &&
+          chunk_map[chunk.start].length == chunk.size &&
+          chunk_map[chunk.start].oid.oid == chunk.fingerprint &&
+          chunk_map[chunk.start].has_reference()) {
+        // this chunk has already been deduped. skip this chunk
+        ldpp_dout(dpp, 0) << chunk.fingerprint << " deduped -> deduped" << dendl;
+        continue;
+      }
+
+      chunk_refs_t refs;
+      if (get_chunk_refs(cold_ioctx, chunk.fingerprint, refs)) {
+        continue;
+      }
+      chunk_refs_by_object_t* chunk_refs
+        = static_cast<chunk_refs_by_object_t*>(refs.r.get());
+
+      // # refs of chunk object must be less than max_chunk_ref_size
+      if (chunk_refs->by_object.size() >= max_chunk_ref_size) {
+        continue;
+      }
+    }
+
+    if (try_set_chunk(ioctx, cold_ioctx, object_name, chunk) >= 0 && perfcounter) {
+      perfcounter->inc(l_rgw_dedup_deduped_data_size, chunk.data.length());
+    }
+  }
+}
+
+void RGWDedupWorker::do_data_evict(IoCtx &ioctx, string object_name)
+{
+  ObjectReadOperation tier_op;
+  tier_op.tier_evict();
+  int ret = ioctx.operate(object_name, &tier_op, nullptr);
+  if (ret < 0) {
+    ldpp_dout(dpp, 1) << "Failed to tier_evict rados object, pool_name: "
+                      << ioctx.get_pool_name() << ", object_name: "
+                      << object_name << ", ret: " << ret << dendl;
+  }
+}
+
+// Promote the object data back, then drop its manifest.
+int RGWDedupWorker::clear_manifest(IoCtx &ioctx, string object_name)
+{
+  ObjectWriteOperation promote_op;
+  promote_op.tier_promote();
+  int ret = ioctx.operate(object_name, &promote_op);
+  if (ret < 0) {
+    ldpp_dout(dpp, 1)
+      << "Failed to tier promote rados object, pool_name: " << ioctx.get_pool_name()
+      << ", object_name: " << object_name << ", ret: " << ret << dendl;
+    return ret;
+  }
+
+  ObjectWriteOperation unset_op;
+  unset_op.unset_manifest();
+  ret = ioctx.operate(object_name, &unset_op);
+  if (ret < 0) {
+    ldpp_dout(dpp, 1)
+      << "Failed to unset_manifest rados object, pool_name: " << ioctx.get_pool_name()
+      << ", object_name: " << object_name << ", ret: " << ret << dendl;
+  }
+  return ret;
+}
+
+int RGWDedupWorker::remove_object(IoCtx &ioctx, string object_name)
+{
+  int ret = ioctx.remove(object_name);
+  if (ret < 0) {
+    ldpp_dout(dpp, 1)
+      << "Failed to remove entire object in pool, pool_name: "
+      << ioctx.get_pool_name() << ", object_name: "
+      << object_name << ", ret: " << ret << dendl;
+  }
+  return ret;
+}
+
+// Split `data` into chunks with the configured algorithm; returns
+// (chunk payload, (offset, length)) tuples.
+vector<tuple<bufferlist, pair<uint64_t, uint64_t>>> RGWDedupWorker::do_cdc(
+  bufferlist &data, string chunk_algo, uint32_t chunk_size)
+{
+  vector<tuple<bufferlist, pair<uint64_t, uint64_t>>> ret;
+  // CDC::create() takes log2 of the target chunk size: cbits(x)-1 == floor(log2(x))
+  unique_ptr<CDC> cdc = CDC::create(chunk_algo, cbits(chunk_size) - 1);
+  vector<pair<uint64_t, uint64_t>> chunks;
+  cdc->calc_chunks(data, &chunks);
+
+  for (auto &p : chunks) {
+    bufferlist chunk;
+    chunk.substr_of(data, p.first, p.second);
+    ret.push_back(make_tuple(chunk, p));
+  }
+  return ret;
+}
+
+string RGWDedupWorker::generate_fingerprint(bufferlist chunk_data, string fp_algo)
+{
+  switch (pg_pool_t::get_fingerprint_from_str(fp_algo)) {
+    case pg_pool_t::TYPE_FINGERPRINT_SHA1:
+      return crypto::digest<crypto::SHA1>(chunk_data).to_str();
+
+    case pg_pool_t::TYPE_FINGERPRINT_SHA256:
+      return crypto::digest<crypto::SHA256>(chunk_data).to_str();
+
+    case pg_pool_t::TYPE_FINGERPRINT_SHA512:
+      return crypto::digest<crypto::SHA512>(chunk_data).to_str();
+
+    default:
+      ceph_assert(0 == "Invalid fp_algo type");
+      break;
+  }
+  return string();
+}
+
+void RGWDedupWorker::finalize()
+{
+  fpmanager.reset();
+}
+
+void RGWDedupWorker::set_max_chunk_ref_size(const uint32_t new_max_size)
+{
+  max_chunk_ref_size = new_max_size;
+}
+
+
+// Drop (chunk_ref_cnt - source_ref_cnt) stale references the chunk object
+// holds on src_obj, one cls_cas_chunk_put_ref() op at a time.
+int RGWChunkScrubWorker::do_chunk_repair(IoCtx& cold_ioctx, const string chunk_obj_name,
+                                         const hobject_t src_obj, int chunk_ref_cnt,
+                                         int source_ref_cnt)
+{
+  ceph_assert(chunk_ref_cnt >= source_ref_cnt);
+
+  while (chunk_ref_cnt != source_ref_cnt) {
+    ObjectWriteOperation op;
+    cls_cas_chunk_put_ref(op, src_obj);
+    --chunk_ref_cnt;
+    int ret = cold_ioctx.operate(chunk_obj_name, &op);
+    if (ret < 0) {
+      return ret;
+    }
+  }
+  return 0;
+}
+
+// How many references does src_obj actually hold on chunk_oid?
+// Returns 0 when the source object (or its pool) is gone or it holds no
+// such reference -- i.e. all of the chunk's references to it are stale.
+int RGWChunkScrubWorker::get_src_ref_cnt(const hobject_t& src_obj,
+                                         const string& chunk_oid)
+{
+  IoCtx src_ioctx;
+  int src_ref_cnt = -1;
+  if (base_ioctx_map.find(src_obj.pool) != base_ioctx_map.end()) {
+    src_ioctx = base_ioctx_map[src_obj.pool];
+  } else {
+    // if base pool not found, try create ioctx
+    Rados* rados = store->getRados()->get_rados_handle();
+    int ret = rados->ioctx_create2(src_obj.pool, src_ioctx);
+    if (ret < 0) {
+      ldpp_dout(dpp, 1) << __func__ << " src pool " << src_obj.pool
+                        << " does not exist" << dendl;
+      // FIX: the original returned the negative errno here; the caller
+      // treats the return value as a reference count, so a negative value
+      // drove do_chunk_repair() to decrement refs below zero. A missing
+      // pool means every reference to this source is stale -> 0.
+      return 0;
+    }
+  }
+
+  // get reference count that src object is pointing a chunk object
+  src_ref_cnt = cls_cas_references_chunk(src_ioctx, src_obj.oid.name, chunk_oid);
+  if (src_ref_cnt < 0) {
+    if (src_ref_cnt == -ENOENT || src_ref_cnt == -EINVAL) {
+      ldpp_dout(dpp, 2) << "chunk (" << chunk_oid << ") is referencing " << src_obj
+                        << ": referencing object missing" << dendl;
+    } else if (src_ref_cnt == -ENOLINK) {
+      ldpp_dout(dpp, 2) << "chunk (" << chunk_oid << ") is referencing " << src_obj
+                        << ": referencing object does not reference this chunk" << dendl;
+    } else {
+      // FIX: strerror() expects a positive errno; use cpp_strerror() on the
+      // negative return value, like the rest of the codebase.
+      ldpp_dout(dpp, 0) << "cls_cas_references_chunk() fail: "
+                        << cpp_strerror(src_ref_cnt) << dendl;
+    }
+    src_ref_cnt = 0;
+  }
+  return src_ref_cnt;
+}
+
+/*
+ * - chunk object: a part of a source object created by a dedup operation on
+ *   it. It has a reference list containing its source objects.
+ * - source object: an original object of its chunk objects. It keeps its
+ *   chunk information in a chunk_map.
+ */
+void* RGWChunkScrubWorker::entry()
+{
+  ldpp_dout(dpp, 20) << "RGWChunkScrubWorker_" << id << " starts" << dendl;
+
+  ObjectCursor shard_begin, shard_end;
+  // get current worker's shard range
+  // FIX: shard by the global worker id (gid) set in prepare(), like
+  // RGWDedupWorker does; sharding by the per-daemon id made the scrub
+  // workers of multiple RGW daemons overlap on the first shards and skip
+  // the rest of the cold pool.
+  cold_ioctx.object_list_slice(cold_ioctx.object_list_begin(),
+                               cold_ioctx.object_list_end(),
+                               gid, num_total_workers, &shard_begin, &shard_end);
+  ObjectCursor obj_cursor = shard_begin;
+  uint32_t num_objs = 0;
+  while (obj_cursor < shard_end) {
+    vector<ObjectItem> obj_shard;
+    if (cold_ioctx.object_list(obj_cursor, shard_end, MAX_OBJ_SCAN_SIZE, {},
+                               &obj_shard, &obj_cursor) < 0) {
+      ldpp_dout(dpp, 0) << "error object_list" << dendl;
+      return nullptr;
+    }
+
+    for (const auto& obj : obj_shard) {
+      auto cold_oid = obj.oid;
+      chunk_refs_t refs;
+      if (get_chunk_refs(cold_ioctx, cold_oid, refs) < 0) {
+        continue;
+      }
+
+      chunk_refs_by_object_t* chunk_refs =
+        static_cast<chunk_refs_by_object_t*>(refs.r.get());
+      set<hobject_t> src_obj_set(chunk_refs->by_object.begin(),
+                                 chunk_refs->by_object.end());
+      for (auto& src_obj : src_obj_set) {
+        // get reference count that chunk object is pointing a src object
+        int chunk_ref_cnt = chunk_refs->by_object.count(src_obj);
+        int src_ref_cnt = get_src_ref_cnt(src_obj, cold_oid);
+
+        ldpp_dout(dpp, 10) << "ScrubWorker_" << id << " chunk obj: " << cold_oid
+                           << ", src obj: " << src_obj.oid.name << ", src pool: " << src_obj.pool
+                           << ", chunk_ref_cnt: " << chunk_ref_cnt << ", src_ref_cnt: " << src_ref_cnt
+                           << dendl;
+
+        if (chunk_ref_cnt != src_ref_cnt) {
+          if (do_chunk_repair(cold_ioctx, cold_oid, src_obj,
+                              chunk_ref_cnt, src_ref_cnt) < 0) {
+            ldpp_dout(dpp, 0) << "do_chunk_repair fail" << dendl;
+            continue;
+          }
+        }
+      }
+    }
+    num_objs += obj_shard.size();
+  }
+  ldpp_dout(dpp, 20) << "RGWChunkScrubWorker_" << id << " pool: "
+                     << cold_ioctx.get_pool_name() << ", num objs: " << num_objs << dendl;
+  return nullptr;
+}
diff --git a/src/rgw/rgw_dedup_worker.h b/src/rgw/rgw_dedup_worker.h
new file mode 100644
index 0000000000000..8a9a552125a31
--- /dev/null
+++
b/src/rgw/rgw_dedup_worker.h @@ -0,0 +1,166 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab ft=cpp + +#ifndef CEPH_RGW_DEDUP_WORKER_H +#define CEPH_RGW_DEDUP_WORKER_H + +#include "cls/cas/cls_cas_internal.h" +#include "include/rados/librados.hpp" +#include "rgw_perf_counters.h" +#include "rgw_fp_manager.h" +#include "rgw_dedup_manager.h" +#include "common/CDC.h" + +extern const int MAX_OBJ_SCAN_SIZE; +extern const uint32_t MAX_CHUNK_REF_SIZE; + +extern const int MAX_OBJ_SCAN_SIZE; + +using namespace std; +using namespace librados; + +class RGWFPManager; +class Worker : public Thread +{ +protected: + const DoutPrefixProvider* dpp; + CephContext* cct; + rgw::sal::RadosStore* store; + + // local worker id of a RGWDedup + int id = -1; + // # workers throughout total RGWDedups (# RGWDedup * # workers) + int num_total_workers = 0; + // global worker id throughout total RGWDedups + int gid = -1; + map base_ioctx_map; + IoCtx cold_ioctx; + + enum class MetadataObjType : int { + New, + Archived, + Deduped, + None + }; + +public: + Worker(const DoutPrefixProvider* _dpp, + CephContext* _cct, + rgw::sal::RadosStore* _store, + int _id, + IoCtx _cold_ioctx) + : dpp(_dpp), cct(_cct), store(_store), id(_id), + cold_ioctx(_cold_ioctx) {} + Worker() = delete; + Worker(const Worker& rhs) = delete; + Worker& operator=(const Worker& rhs) = delete; + virtual ~Worker() {} + + virtual void* entry() = 0; + + int get_id(); + void prepare(const int new_total_workers, const int new_gid); + void clear_base_ioctx_map(uint64_t id, IoCtx& ioctx); + void append_base_ioctx(uint64_t name, IoCtx& ioctx); + + // get references of chunk object + int get_chunk_refs(IoCtx& chunk_ioctx, const string& chunk_oid, chunk_refs_t& refs); +}; + +// > +using ChunkInfoType = tuple>; +class RGWDedupWorker : public Worker +{ + bool obj_scan_dir; // true: scan obj forward, false: scan object reverse + shared_ptr fpmanager; + string chunk_algo; + uint32_t 
chunk_size; + string fp_algo; + uint32_t dedup_threshold; + uint32_t max_chunk_ref_size; + +public: + RGWDedupWorker(const DoutPrefixProvider* _dpp, + CephContext* _cct, + rgw::sal::RadosStore* _store, + int _id, + shared_ptr _fpmanager, + string _chunk_algo, + uint32_t _chunk_size, + string _fp_algo, + uint32_t _dedup_threshold, + IoCtx _cold_ioctx) + : Worker(_dpp, _cct, _store, _id, _cold_ioctx), + obj_scan_dir(true), + fpmanager(_fpmanager), + chunk_algo(_chunk_algo), + chunk_size(_chunk_size), + fp_algo(_fp_algo), + dedup_threshold(_dedup_threshold), + max_chunk_ref_size(MAX_CHUNK_REF_SIZE) {} + RGWDedupWorker() = delete; + RGWDedupWorker(const RGWDedupWorker& rhs) = delete; + RGWDedupWorker& operator=(const RGWDedupWorker& rhs) = delete; + virtual ~RGWDedupWorker() override {} + + struct chunk_t { + size_t start = 0; + size_t size = 0; + string fingerprint = ""; + bufferlist data; + }; + + virtual void* entry() override; + void finalize(); + + template + void try_object_dedup(IoCtx& base_ioctx, Iter begin, Iter end); + bufferlist read_object_data(IoCtx& ioctx, string object_name); + int write_object_data(IoCtx& ioctx, string object_name, bufferlist& data); + int check_object_exists(IoCtx& ioctx, string object_name); + MetadataObjType get_metadata_obj_type(object_info_t& oi, + IoCtx& ioctx, + const string obj_name, + const uint32_t data_len); + void do_chunk_dedup(IoCtx& ioctx, IoCtx& cold_ioctx, string object_name, + list redundant_chunks, + map& chunk_map); + void do_data_evict(IoCtx& ioctx, string object_name); + int remove_object(IoCtx &ioctx, string object_name); + + int try_set_chunk(IoCtx& ioctx, IoCtx& cold_ioctx, string object_name, + chunk_t& chunk); + int clear_manifest(IoCtx& ioctx, string object_name); + vector do_cdc(bufferlist& data, string chunk_algo, + uint32_t chunk_size); + string generate_fingerprint(bufferlist chunk_data, string fp_algo); + string get_archived_obj_name(IoCtx& ioctx, const string obj_name); + void 
set_max_chunk_ref_size(const uint32_t new_max_size); +}; + +class RGWChunkScrubWorker : public Worker +{ +public: + RGWChunkScrubWorker(const DoutPrefixProvider* _dpp, + CephContext* _cct, + rgw::sal::RadosStore* _store, + int _id, + IoCtx _cold_ioctx) + : Worker(_dpp, _cct, _store, _id, _cold_ioctx) {} + RGWChunkScrubWorker() = delete; + RGWChunkScrubWorker(const RGWChunkScrubWorker& rhs) = delete; + RGWChunkScrubWorker& operator=(const RGWChunkScrubWorker& rhs) = delete; + virtual ~RGWChunkScrubWorker() override {} + + virtual void* entry() override; + + // fix mismatched chunk reference + int do_chunk_repair(IoCtx& cold_ioctx, const string chunk_obj_name, + const hobject_t src_obj, int chunk_ref_cnt, + int src_ref_cnt); + + // check whether dedup reference is mismatched (false is mismatched) + int get_src_ref_cnt(const hobject_t& src_obj, const string& chunk_oid); +}; + +#endif diff --git a/src/rgw/rgw_fp_manager.cc b/src/rgw/rgw_fp_manager.cc new file mode 100644 index 0000000000000..cc7f8b2ba65fa --- /dev/null +++ b/src/rgw/rgw_fp_manager.cc @@ -0,0 +1,77 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab ft=cpp + +#include +#include "rgw_fp_manager.h" + +#define dout_subsys ceph_subsys_rgw + +void RGWFPManager::reset_fpmap() +{ + fp_map.clear(); +} + +void RGWFPManager::set_low_watermark(const uint32_t new_low_wm) +{ + ceph_assert(new_low_wm <= 100); + low_watermark = new_low_wm; +} + +size_t RGWFPManager::find(const string& fingerprint) +{ + shared_lock lock(fingerprint_lock); + auto found_item = fp_map.find(fingerprint); + if ( found_item != fp_map.end() ) { + return found_item->second; + } + return 0; +} + +uint32_t RGWFPManager::get_fpmap_memory_size() +{ + if (fp_map.empty()) { + return 0; + } + return fp_map.size() * + (fp_map.begin()->first.length() + sizeof(fp_map.begin()->second)); +} + +size_t RGWFPManager::get_fpmap_size() +{ + return fp_map.size(); +} + +void 
RGWFPManager::check_memory_limit_and_do_evict() +{ + ceph_assert(memory_limit > 0); + if (get_fpmap_memory_size() > memory_limit) { + uint32_t current_dedup_threshold = dedup_threshold; + uint32_t target_fpmap_size = memory_limit * low_watermark / 100; + auto iter = fp_map.begin(); + while (get_fpmap_memory_size() > target_fpmap_size) { + if (iter == fp_map.end()) { + iter = fp_map.begin(); + ++current_dedup_threshold; + } + + if (iter->second < current_dedup_threshold) { + iter = fp_map.erase(iter); + } else { + ++iter; + } + } + } +} + +void RGWFPManager::add(string& fingerprint) +{ + unique_lock lock(fingerprint_lock); + auto found_iter = fp_map.find(fingerprint); + if (found_iter == fp_map.end()) { + check_memory_limit_and_do_evict(); + fp_map.insert({fingerprint, 1}); + } else { + ++found_iter->second; + } +} + diff --git a/src/rgw/rgw_fp_manager.h b/src/rgw/rgw_fp_manager.h new file mode 100644 index 0000000000000..1762e13b6b409 --- /dev/null +++ b/src/rgw/rgw_fp_manager.h @@ -0,0 +1,40 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab ft=cpp + +#ifndef CEPH_RGW_FP_MANAGER_H +#define CEPH_RGW_FP_MANAGER_H + +#include "include/types.h" + +using namespace std; + +class RGWFPManager +{ +public: + RGWFPManager(uint32_t _dedup_threshold, uint64_t _memory_limit, + uint32_t _low_watermark) + : dedup_threshold(_dedup_threshold), + memory_limit(_memory_limit), + low_watermark(_low_watermark) {} + RGWFPManager() = delete; + RGWFPManager(const RGWFPManager& rhs) = delete; + RGWFPManager& operator=(const RGWFPManager& rhs) = delete; + virtual ~RGWFPManager() {} + + void reset_fpmap(); + size_t find(const string& fingerprint); + void add(string& fingerprint); + void check_memory_limit_and_do_evict(); + void set_low_watermark(const uint32_t new_low_wm); + uint32_t get_fpmap_memory_size(); + size_t get_fpmap_size(); + +private: + std::shared_mutex fingerprint_lock; + uint32_t dedup_threshold; + uint64_t memory_limit; + 
uint32_t low_watermark; + unordered_map fp_map; +}; + +#endif diff --git a/src/rgw/rgw_object_expirer.cc b/src/rgw/rgw_object_expirer.cc index 7a49fc8d161ef..0a92dc3b13d9b 100644 --- a/src/rgw/rgw_object_expirer.cc +++ b/src/rgw/rgw_object_expirer.cc @@ -83,7 +83,7 @@ int main(const int argc, const char **argv) DriverManager::Config cfg; cfg.store_name = "rados"; cfg.filter_name = "none"; - driver = DriverManager::get_storage(&dp, g_ceph_context, cfg, false, false, false, false, false, false, null_yield); + driver = DriverManager::get_storage(&dp, g_ceph_context, cfg, false, false, false, false, false, false, null_yield, false); if (!driver) { std::cerr << "couldn't init storage provider" << std::endl; return EIO; diff --git a/src/rgw/rgw_perf_counters.cc b/src/rgw/rgw_perf_counters.cc index fd058ab00a9f2..bb9c164e4e492 100644 --- a/src/rgw/rgw_perf_counters.cc +++ b/src/rgw/rgw_perf_counters.cc @@ -31,6 +31,23 @@ int rgw_perf_start(CephContext *cct) plb.add_u64_counter(l_rgw_cache_hit, "cache_hit", "Cache hits"); plb.add_u64_counter(l_rgw_cache_miss, "cache_miss", "Cache miss"); + plb.add_u64(l_rgw_dedup_original_data_size, "dedup_original_data_size", "Total RGW object size before deduplication"); + plb.add_u64(l_rgw_dedup_archived_data_size, "dedup_archived_data_size", "Total archived object size during deduplication"); + plb.add_u64(l_rgw_dedup_chunk_data_size, "dedup_chunk_data_size", "Total chunk object size during deduplication"); + plb.add_u64(l_rgw_dedup_deduped_data_size, "dedup_deduped_data_size", "Current deduped data size during deduplication"); + plb.add_u64(l_rgw_dedup_current_worker_mode, "dedup_current_worker_mode", "Current worker mode(dedup | scrub)"); + plb.add_u64_counter(l_rgw_dedup_worker_read, "dedup_worker_read", "RGWDedupWorker's read size to base pools"); + plb.add_u64_counter(l_rgw_dedup_worker_write, "dedup_worker_write", "RGWDedupWorker's write size to cold pools"); + plb.add_u64(l_rgw_dedup_worker_count, "dedup_worker_count", 
"Configuration value of RGWDedupWorker count"); + plb.add_u64(l_rgw_dedup_scrub_ratio, "dedup_scrub_ratio", "Configuration value of dedup_scrub_ratio"); + plb.add_u64(l_rgw_dedup_chunk_algo, "dedup_chunk_algo", "Configuration value of chunk_algo"); + plb.add_u64(l_rgw_dedup_chunk_size, "dedup_chunk_size", "Configuration value of chunk_size"); + plb.add_u64(l_rgw_dedup_fp_algo, "dedup_fp_algo", "Configuration value of fp_algo"); + plb.add_u64(l_rgw_dedup_hitset_count, "dedup_hitset_count", "Configuration value of hitset_count"); + plb.add_u64(l_rgw_dedup_hitset_period, "dedup_hitset_period", "Configuration value of hitset_period"); + plb.add_u64(l_rgw_dedup_hitset_target_size, "dedup_hitset_target_size", "Configuration value of hitset_target_size"); + plb.add_u64(l_rgw_dedup_hitset_fpp, "dedup_hitset_fpp", "Configuration value of hitset_target_fpp"); + plb.add_u64_counter(l_rgw_keystone_token_cache_hit, "keystone_token_cache_hit", "Keystone token cache hits"); plb.add_u64_counter(l_rgw_keystone_token_cache_miss, "keystone_token_cache_miss", "Keystone token cache miss"); diff --git a/src/rgw/rgw_perf_counters.h b/src/rgw/rgw_perf_counters.h index 3c4e4e97f023f..29cf386ce3ef6 100644 --- a/src/rgw/rgw_perf_counters.h +++ b/src/rgw/rgw_perf_counters.h @@ -29,6 +29,23 @@ enum { l_rgw_cache_hit, l_rgw_cache_miss, + l_rgw_dedup_original_data_size, + l_rgw_dedup_archived_data_size, + l_rgw_dedup_chunk_data_size, + l_rgw_dedup_deduped_data_size, + l_rgw_dedup_current_worker_mode, + l_rgw_dedup_worker_read, + l_rgw_dedup_worker_write, + l_rgw_dedup_worker_count, + l_rgw_dedup_scrub_ratio, + l_rgw_dedup_chunk_algo, + l_rgw_dedup_chunk_size, + l_rgw_dedup_fp_algo, + l_rgw_dedup_hitset_count, + l_rgw_dedup_hitset_period, + l_rgw_dedup_hitset_target_size, + l_rgw_dedup_hitset_fpp, + l_rgw_keystone_token_cache_hit, l_rgw_keystone_token_cache_miss, diff --git a/src/rgw/rgw_realm_reloader.cc b/src/rgw/rgw_realm_reloader.cc index 4973ec14080d2..7bd6580847474 100644 --- 
a/src/rgw/rgw_realm_reloader.cc +++ b/src/rgw/rgw_realm_reloader.cc @@ -125,6 +125,7 @@ void RGWRealmReloader::reload() cct->_conf->rgw_run_sync_thread, cct->_conf.get_val("rgw_dynamic_resharding"), true, null_yield, // run notification thread + false, cct->_conf->rgw_cache_enabled); } diff --git a/src/rgw/rgw_sal.cc b/src/rgw/rgw_sal.cc index 042eab0be7257..d959f77ef1303 100644 --- a/src/rgw/rgw_sal.cc +++ b/src/rgw/rgw_sal.cc @@ -106,6 +106,7 @@ rgw::sal::Driver* DriverManager::init_storage_provider(const DoutPrefixProvider* bool run_sync_thread, bool run_reshard_thread, bool run_notification_thread, + bool use_dedup, bool use_cache, bool use_gc, optional_yield y) { @@ -124,6 +125,7 @@ rgw::sal::Driver* DriverManager::init_storage_provider(const DoutPrefixProvider* .set_run_sync_thread(run_sync_thread) .set_run_reshard_thread(run_reshard_thread) .set_run_notification_thread(run_notification_thread) + .set_use_dedup(use_dedup) .init_begin(cct, dpp) < 0) { delete driver; return nullptr; diff --git a/src/rgw/rgw_sal.h b/src/rgw/rgw_sal.h index 3c33a6d51e172..efb967ee55b12 100644 --- a/src/rgw/rgw_sal.h +++ b/src/rgw/rgw_sal.h @@ -1590,6 +1590,7 @@ class DriverManager { bool run_sync_thread, bool run_reshard_thread, bool run_notification_thread, optional_yield y, + bool use_dedup_thread, bool use_cache = true, bool use_gc = true) { rgw::sal::Driver* driver = init_storage_provider(dpp, cct, cfg, use_gc_thread, @@ -1598,6 +1599,7 @@ class DriverManager { run_sync_thread, run_reshard_thread, run_notification_thread, + use_dedup_thread, use_cache, use_gc, y); return driver; } @@ -1617,6 +1619,7 @@ class DriverManager { bool run_sync_thread, bool run_reshard_thread, bool run_notification_thread, + bool use_dedup_thread, bool use_metadata_cache, bool use_gc, optional_yield y); /** Initialize a new raw Driver */ diff --git a/src/test/rgw/CMakeLists.txt b/src/test/rgw/CMakeLists.txt index 0f99597c21e23..f4db4d0b4a659 100644 --- a/src/test/rgw/CMakeLists.txt +++ 
b/src/test/rgw/CMakeLists.txt @@ -315,3 +315,14 @@ target_link_libraries(radosgw-cr-test ${rgw_libs} librados OATH::OATH ${CURL_LIBRARIES} ${EXPAT_LIBRARIES} ${BLKID_LIBRARIES} GTest::GTest) + +# unittest_rgw_dedup +add_executable(ceph_test_rgw_dedup test_rgw_dedup.cc) +target_link_libraries(ceph_test_rgw_dedup + ${rgw_libs} + librados + radostest-cxx + global + test_rgw_a + ${UNITTEST_LIBS} +) diff --git a/src/test/rgw/mock_rgw_dedup_worker.h b/src/test/rgw/mock_rgw_dedup_worker.h new file mode 100644 index 0000000000000..918ec454661c4 --- /dev/null +++ b/src/test/rgw/mock_rgw_dedup_worker.h @@ -0,0 +1,23 @@ + +#include "rgw/rgw_dedup_worker.h" + +class MockDedupWorker : public RGWDedupWorker { +public: + void* entry() override { + return nullptr; + } + + MockDedupWorker(int id) + : RGWDedupWorker(nullptr, nullptr, nullptr, id, nullptr, "", 0, "", 1, IoCtx()) {} + virtual ~MockDedupWorker() {} +}; + +class MockScrubWorker : public RGWChunkScrubWorker { +public: + void* entry() override { + return nullptr; + } + + MockScrubWorker(int id) : RGWChunkScrubWorker(nullptr, nullptr, nullptr, id, IoCtx()) {} + virtual ~MockScrubWorker() {} +}; diff --git a/src/test/rgw/test_rgw_dedup.cc b/src/test/rgw/test_rgw_dedup.cc new file mode 100644 index 0000000000000..e135271ab0067 --- /dev/null +++ b/src/test/rgw/test_rgw_dedup.cc @@ -0,0 +1,675 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include "gtest/gtest.h" +#include "test/librados/test_cxx.h" +#include "test/librados/testcase_cxx.h" +#include "test_rgw_common.h" +#include "cls/cas/cls_cas_client.h" + +#include "rgw/rgw_fp_manager.h" +#include "rgw/rgw_dedup_manager.h" +#include "rgw/rgw_dedup_worker.h" +#include "rgw/driver/rados/rgw_sal_rados.h" + +#include "mock_rgw_dedup_worker.h" + +#define dout_subsys ceph_subsys_rgw + +auto cct = new CephContext(CEPH_ENTITY_TYPE_CLIENT); +const DoutPrefix dpp(cct, 1, "test rgw dedup: "); + +static rgw::sal::RadosStore* 
store = nullptr;

// gtest fixture providing two rados pools: the base pool that RadosTestPP
// creates (`pool_name`/`ioctx`) plus a per-test "cold" pool
// (`cold_pool_name`/`cold_ioctx`) that the dedup workers write chunk and
// archived objects into.
class RGWDedupTestWithTwoPools : public RadosTestPP
{
public:
  RGWDedupTestWithTwoPools() {};
  ~RGWDedupTestWithTwoPools() override {};
protected:
  // create/destroy the shared base pool once for the whole test case
  static void SetUpTestCase() {
    pool_name = get_temp_pool_name();
    ASSERT_EQ("", create_one_pool_pp(pool_name, s_cluster));
  }
  static void TearDownTestCase() {
    ASSERT_EQ(destroy_one_pool_pp(pool_name, s_cluster), 0);
  }
  string cold_pool_name;
  IoCtx cold_ioctx;

  // per-test: create a fresh cold pool alongside the base pool
  void SetUp() override {
    RadosTestPP::SetUp();

    cold_pool_name = get_temp_pool_name();
    ASSERT_EQ(s_cluster.pool_create(cold_pool_name.c_str()), 0);
    ASSERT_EQ(cluster.ioctx_create(cold_pool_name.c_str(), cold_ioctx), 0);

    cold_ioctx.application_enable("rados", true);
    cold_ioctx.set_namespace(nspace);
  }
  void TearDown() override {
    // wait for maps to settle before next test
    cluster.wait_for_latest_osdmap();

    RadosTestPP::TearDown();

    // NOTE(review): cold-pool cleanup runs after the base-class TearDown —
    // presumably intentional so base-pool checks run first; confirm.
    cleanup_default_namespace(cold_ioctx);
    cleanup_namespace(cold_ioctx, nspace);

    cold_ioctx.close();
    ASSERT_EQ(s_cluster.pool_delete(cold_pool_name.c_str()), 0);
  }
};


// if there is any redundant chunk, it regards as deduplicated.
+void get_chunk_map(const vector>> chunks, + RGWDedupWorker* worker, + const string fp_algorithm, + unordered_map& chunk_map) +{ + for (auto& chunk : chunks) { + string fp = worker->generate_fingerprint(get<0>(chunk), fp_algorithm); + auto it = chunk_map.find(fp); + if (it != chunk_map.end()) { + it->second = ++chunk_map[fp]; + } else { + chunk_map.emplace(fp, 1); + } + } +} + +void read_deduped_data( + RGWDedupWorker* worker, + IoCtx ioctx, + IoCtx cold_ioctx, + const vector>>& chunks, + unordered_map& chunk_map, + const string metadata_oid, + const uint32_t metadata_obj_size, + const string fp_algo, + bufferlist& deduped_data) +{ + if (chunks.size() == chunk_map.size()) { + // a metadata object has been archived + ASSERT_GT(cold_ioctx.read(worker->get_archived_obj_name(ioctx, metadata_oid), + deduped_data, metadata_obj_size, 0), 0); + } else { + // redundant chunks of a metadata object have been deduped + for (const auto& chunk : chunks) { + string fp = worker->generate_fingerprint(get<0>(chunk), fp_algo); + bufferlist tmpbl; + if (chunk_map[fp] == 1) { + // chunk (fp) is not deduped. read from base-pool + ASSERT_GT(ioctx.read(metadata_oid, tmpbl, get<1>(chunk).second, get<1>(chunk).first), 0); + deduped_data.append(tmpbl); + } else { + // chunk (fp) has been deduped. 
read from cold-pool + ASSERT_GT(cold_ioctx.read(fp, tmpbl, get<1>(chunk).second, 0), 0); + deduped_data.append(tmpbl); + } + } + } +} + +void write_object(string obj_name, bufferlist& data, IoCtx& ioctx) +{ + ObjectWriteOperation wop; + wop.write_full(data); + ASSERT_EQ(ioctx.operate(obj_name, &wop), 0); +} + +void try_dedup_for_all_objs(IoCtx& ioctx, RGWDedupWorker& worker) +{ + ObjectCursor shard_start, shard_end; + ioctx.object_list_slice(ioctx.object_list_begin(), ioctx.object_list_end(), + 0, 1, &shard_start, &shard_end); + ObjectCursor cursor = shard_start; + while (cursor < shard_end) { + vector objs; + ASSERT_GE(ioctx.object_list(cursor, shard_end, 100, {}, &objs, &cursor), 0); + worker.try_object_dedup(ioctx, objs.begin(), objs.end()); + } +} + +// RGWDedupWorker test +TEST_F(RGWDedupTestWithTwoPools, redundant_object_dedup) +{ + uint32_t dedup_threshold = 2; + + shared_ptr fpmanager + = make_shared(dedup_threshold, 16 * 1024, 50); + RGWDedupWorker worker(&dpp, cct, store, 0, fpmanager, "fastcdc", 1024, "sha1", + dedup_threshold, cold_ioctx); + worker.append_base_ioctx(ioctx.get_id(), ioctx); + + // generate data which has redundancy a lot + bufferlist data, tmpbl; + generate_buffer(1024, &tmpbl, clock()); + for (int i = 0; i < 32; ++i) { + data.append(tmpbl); + } + write_object("foo", data, ioctx); + + // redundant chunks will be deduped + try_dedup_for_all_objs(ioctx, worker); + + auto chunks = worker.do_cdc(data, "fastcdc", 1024); + unordered_map chunk_map; + get_chunk_map(chunks, &worker, "sha1", chunk_map); + + map chunk_len_map; + for (const auto& chunk : chunks) { + string fp = worker.generate_fingerprint(get<0>(chunk), "sha1"); + uint64_t chunk_len = get<1>(chunk).second; + chunk_len_map.emplace(fp, chunk_len); + } + + for (const auto& [fp, cnt] : chunk_map) { + bufferlist bl; + int ret = cold_ioctx.read(fp, bl, 32 * 1024, 0); + + // check chunk is deduped and the size of chunk object + if (ret < 0) { + ASSERT_EQ(ret, -2); + } else { + // check 
chunk object size + ASSERT_EQ(ret, chunk_len_map[fp]); + + // check ref cnt + chunk_refs_t refs; + worker.get_chunk_refs(cold_ioctx, fp, refs); + chunk_refs_by_object_t* chunk_refs + = static_cast(refs.r.get()); + + // need to consider dedup threshold + ASSERT_EQ(chunk_refs->by_object.size(), cnt - dedup_threshold + 1); + } + } +} + +TEST_F(RGWDedupTestWithTwoPools, unique_object_archiving) +{ + uint32_t dedup_threshold = 2; + uint32_t obj_size = 32 * 1024; + + shared_ptr fpmanager + = make_shared(dedup_threshold, 16 * 1024, 50); + RGWDedupWorker worker(&dpp, cct, store, 0, fpmanager, "fastcdc", 1024, + "sha1", dedup_threshold, cold_ioctx); + worker.append_base_ioctx(ioctx.get_id(), ioctx); + + // generate data which has no redundancy + bufferlist data; + generate_buffer(obj_size, &data, clock()); + write_object("foo", data, ioctx); + + // the object will be archived as cold object + try_dedup_for_all_objs(ioctx, worker); + + auto chunks = worker.do_cdc(data, "fastcdc", 1024); + unordered_map chunk_map; + get_chunk_map(chunks, &worker, "sha1", chunk_map); + + bufferlist bl; + int ret = cold_ioctx.read(worker.get_archived_obj_name(ioctx, "foo"), bl, obj_size, 0); + + // check chunk object size + ASSERT_EQ(ret, obj_size); + + // check ref cnt + chunk_refs_t refs; + worker.get_chunk_refs(cold_ioctx, worker.get_archived_obj_name(ioctx, "foo"), refs); + chunk_refs_by_object_t* chunk_refs + = static_cast(refs.r.get()); + + // need to consider dedup threshold + ASSERT_EQ(chunk_refs->by_object.size(), 1); +} + +#include "cls/cas/cls_cas_internal.h" +TEST_F(RGWDedupTestWithTwoPools, data_consistency_test_after_dedup) +{ + vector fp_algos{"sha1", "sha256", "sha512"}; + for (auto fp_algo : fp_algos) { + shared_ptr fpmanager + = make_shared(2, 1024 * 1024, 50); + RGWDedupWorker worker(&dpp, cct, store, 0, fpmanager, "fastcdc", 1024, + fp_algo, 2, cold_ioctx); + worker.append_base_ioctx(ioctx.get_id(), ioctx); + + // Test an object not containing any redundancy (archived as a 
cold object) + // generate random data + bufferlist og_data; + string rand_oid = "rand-data-" + fp_algo; + generate_buffer(1024 * 16, &og_data, clock()); + write_object(rand_oid, og_data, ioctx); + try_dedup_for_all_objs(ioctx, worker); + + // get chunk map in order to get a sequence of chunks + auto chunks = worker.do_cdc(og_data, "fastcdc", 1024); + unordered_map chunk_map; + get_chunk_map(chunks, &worker, fp_algo, chunk_map); + + // generate checksum of data before dedup + string metadata_obj_checksum = worker.generate_fingerprint(og_data, fp_algo); + ASSERT_NE(metadata_obj_checksum, string()); + + // read data after try_object_dedup + bufferlist chunk_data; + read_deduped_data(&worker, ioctx, cold_ioctx, chunks, chunk_map, rand_oid, + 1024 * 16, fp_algo, chunk_data); + string chunk_obj_checksum = worker.generate_fingerprint(chunk_data, fp_algo); + ASSERT_NE(chunk_obj_checksum, string()); + ASSERT_EQ(metadata_obj_checksum, chunk_obj_checksum); + + // reset variables + og_data.clear(); + chunk_data.clear(); + chunk_map.clear(); + fpmanager->reset_fpmap(); + + // Test an object containing redundant chunk + // generate redundant data + { + bufferlist tmpbl; + generate_buffer(1024, &tmpbl, clock()); + for (int i = 0; i < 16; ++i) { + og_data.append(tmpbl); + } + } + ASSERT_EQ(og_data.length(), 1024 * 16); + + string dup_oid = "dup-data-" + fp_algo; + write_object(dup_oid, og_data, ioctx); + try_dedup_for_all_objs(ioctx, worker); + + // get chunk map in order to get a sequence of chunks + chunks = worker.do_cdc(og_data, "fastcdc", 1024); + get_chunk_map(chunks, &worker, fp_algo, chunk_map); + + + // generate checksum of data before dedup + metadata_obj_checksum = worker.generate_fingerprint(og_data, fp_algo); + ASSERT_NE(metadata_obj_checksum, string()); + + // read data after try_object_dedup + read_deduped_data(&worker, ioctx, cold_ioctx, chunks, chunk_map, + dup_oid, 1024 * 16, fp_algo, chunk_data); + chunk_obj_checksum = worker.generate_fingerprint(chunk_data, 
fp_algo); + ASSERT_NE(chunk_obj_checksum, string()); + ASSERT_EQ(metadata_obj_checksum, chunk_obj_checksum); + + // clear objects in base-pool and cold-pool + cleanup_default_namespace(ioctx); + cleanup_namespace(ioctx, nspace); + cleanup_default_namespace(cold_ioctx); + cleanup_namespace(cold_ioctx, nspace); + } +} + +TEST_F(RGWDedupTestWithTwoPools, chunk_obj_ref_size) +{ + store = new rgw::sal::RadosStore(); + ASSERT_NE(store, nullptr); + RGWRados* rados = new RGWRados(); + ASSERT_NE(rados, nullptr); + rados->set_context(cct); + rados->init_rados(); + store->setRados(rados); + rados->set_store(store); + + shared_ptr fpmanager + = make_shared(2, 16 * 1024, 50); + RGWDedupWorker worker(&dpp, cct, store, 0, fpmanager, "fastcdc", 1024, + "sha1", 2, cold_ioctx); + worker.append_base_ioctx(ioctx.get_id(), ioctx); + // scale down max_chunk_ref_size not to take too much time + const uint32_t max_chunk_ref_size = 100; + worker.set_max_chunk_ref_size(max_chunk_ref_size); + + // generate duplicated data + bufferlist data, tmpbl; + generate_buffer(1024, &tmpbl); + for (int i = 0; i < 100; ++i) { + data.append(tmpbl); + } + + // create objects which have chunks larger than max_chunk_ref_size + string obj_name = "dup-a-lot-obj-"; + for (int i = 0; i < 2; ++i) { + write_object(obj_name + to_string(i), data, ioctx); + } + + try_dedup_for_all_objs(ioctx, worker); + + ObjectCursor shard_start, shard_end; + cold_ioctx.object_list_slice(cold_ioctx.object_list_begin(), ioctx.object_list_end(), + 0, 1, &shard_start, &shard_end); + ObjectCursor cursor = shard_start; + while (cursor < shard_end) { + vector objs; + ASSERT_GE(cold_ioctx.object_list(cursor, shard_end, 100, {}, &objs, &cursor), 0); + for (const auto& obj : objs) { + chunk_refs_t refs; + ASSERT_EQ(worker.get_chunk_refs(cold_ioctx, obj.oid, refs), 0); + chunk_refs_by_object_t* chunk_refs + = static_cast(refs.r.get()); + ASSERT_LE(chunk_refs->by_object.size(), max_chunk_ref_size); + } + } +} + 
+TEST_F(RGWDedupTestWithTwoPools, multi_base_pool)
+{
+  shared_ptr<RGWFPManager> fpmanager
+    = make_shared<RGWFPManager>(2, 1024, 50);
+  RGWDedupWorker worker(&dpp, cct, store, 0, fpmanager, "fastcdc", 1024,
+                        "sha1", 2, cold_ioctx);
+  worker.append_base_ioctx(ioctx.get_id(), ioctx);
+  worker.prepare(1, 0);
+
+  bufferlist data, tmpbl;
+  generate_buffer(1024, &tmpbl, clock());
+  for (int i = 0; i < 8; ++i) {
+    data.append(tmpbl);
+  }
+
+  IoCtx ioctxs[10];
+  vector<int64_t> pool_ids;
+  for (int i = 0; i < 10; ++i) {
+    string pool_name = get_temp_pool_name();
+    ASSERT_EQ(cluster.pool_create(pool_name.c_str()), 0);
+    ASSERT_EQ(cluster.ioctx_create(pool_name.c_str(), ioctxs[i]), 0);
+    ioctxs[i].application_enable("rados", true);
+
+    worker.append_base_ioctx(ioctxs[i].get_id(), ioctxs[i]);
+    string oid = "pool-" + to_string(i) + "-data";
+    write_object(oid, data, ioctxs[i]);
+    pool_ids.emplace_back(ioctxs[i].get_id());
+  }
+
+  // try dedup for all objects in base-pools
+  worker.entry();
+
+  // get chunk map in order to get a sequence of chunks
+  auto chunks = worker.do_cdc(data, "fastcdc", 1024);
+  unordered_map<string, uint32_t> chunk_map;
+  get_chunk_map(chunks, &worker, "sha1", chunk_map);
+
+  // verify chunk objects' references
+  ObjectCursor shard_start, shard_end;
+  cold_ioctx.object_list_slice(cold_ioctx.object_list_begin(), cold_ioctx.object_list_end(),
+                               0, 1, &shard_start, &shard_end);
+  ObjectCursor cursor = shard_start;
+  while (cursor < shard_end) {
+    vector<ObjectItem> objs;
+    ASSERT_GE(cold_ioctx.object_list(cursor, shard_end, 100, {}, &objs, &cursor), 0);
+    for (const auto& obj: objs) {
+      chunk_refs_t refs;
+      ASSERT_EQ(worker.get_chunk_refs(cold_ioctx, obj.oid, refs), 0);
+      chunk_refs_by_object_t* chunk_refs
+        = static_cast<chunk_refs_by_object_t*>(refs.r.get());
+
+      for (auto ref : chunk_refs->by_object) {
+        EXPECT_NE(find(pool_ids.begin(), pool_ids.end(), ref.pool), pool_ids.end());
+      }
+    }
+  }
+
+  // clear base-pools
+  for (int i = 0; i < 10; ++i) {
+    string pool_name = ioctxs[i].get_pool_name();
+    cleanup_default_namespace(ioctxs[i]);
+    ioctxs[i].close();
+    ASSERT_EQ(s_cluster.pool_delete(pool_name.c_str()), 0);
+  }
+}
+
+
+// RGWFPManager test
+TEST_F(RGWDedupTestWithTwoPools, fpmap_memory_size)
+{
+  // limit fpmanager memory size upto 4096 bytes
+  uint32_t memory_limit = 4096;
+  RGWFPManager fpmanager(2, memory_limit, 50);
+  RGWDedupWorker worker(&dpp, cct, store, 0, nullptr, "fastcdc", 1024, "sha1", 2, cold_ioctx);
+
+  vector<string> fp_algos = {"sha1", "sha256", "sha512"};
+  for (const auto& fp_algo : fp_algos) {
+    fpmanager.reset_fpmap();
+    bufferlist data;
+    generate_buffer(8 * 1024, &data, clock());
+
+    auto dup_chunks = worker.do_cdc(data, "fastcdc", 1024);
+    uint32_t dup_chunk_cnt = dup_chunks.size();
+
+    // make current chunks' fp duplicated
+    for (int i = 0; i < 2; ++i) {
+      for (const auto& chunk : dup_chunks) {
+        string fp = worker.generate_fingerprint(get<0>(chunk), fp_algo);
+        fpmanager.add(fp);
+      }
+    }
+    ASSERT_EQ(fpmanager.get_fpmap_size(), dup_chunk_cnt);
+
+    // add unique objects' chunks
+    for (int i = 1; i <= 5; ++i) {
+      data.clear();
+      generate_buffer(16 * 1024, &data, clock());
+
+      auto unique_chunks = worker.do_cdc(data, "fastcdc", 1024);
+      for (const auto& chunk : unique_chunks) {
+        string fp = worker.generate_fingerprint(get<0>(chunk), fp_algo);
+        fpmanager.add(fp);
+      }
+    }
+
+    // call once again because add() inserts a fp value
+    // into fpmap after check_memory_limit_and_do_evict()
+    fpmanager.check_memory_limit_and_do_evict();
+    ASSERT_LE(fpmanager.get_fpmap_memory_size(), memory_limit);
+  }
+}
+
+TEST_F(RGWDedupTestWithTwoPools, thread_safe_fpmanager)
+{
+  RGWFPManager fpmanager(2, 10 * 1024 * 1024, 50);
+  RGWDedupWorker worker(&dpp, cct, store, 0, nullptr, "fastcdc", 1024, "sha1", 2, cold_ioctx);
+
+  // create redundant metadata object
+  bufferlist data, tmp;
+  generate_buffer(16 * 1024, &tmp, clock());
+  for (int i = 0; i < 4; ++i) {
+    data.append(tmp);
+  }
+
+  // get chunks by do_cdc()
+  auto chunks = worker.do_cdc(data, "fastcdc", 1024);
+
+  // run 10 threads which adds same chunk info simultaneously
+  int num_workers = 10;
+  vector<thread> threads;
+  for (int i = 0; i < num_workers; ++i) {
+    threads.emplace_back(thread( []
+      (int id, RGWDedupWorker* worker,
+       RGWFPManager* fpmanager,
+       vector<tuple<bufferlist, pair<uint64_t, uint64_t>>> chunks) {
+        for (const auto& chunk : chunks) {
+          string fp = worker->generate_fingerprint(get<0>(chunk), "sha1");
+          fpmanager->add(fp);
+        }
+      }, i, &worker, &fpmanager, chunks));
+  }
+
+  // join
+  for (auto& t : threads) {
+    t.join();
+  }
+
+  // when all threads are done, check chunk ref count
+  unordered_map<string, uint32_t> chunk_map;
+  get_chunk_map(chunks, &worker, "sha1", chunk_map);
+  for (auto& [fp, cnt] : chunk_map) {
+    size_t fp_cnt = fpmanager.find(fp);
+    ASSERT_EQ(fp_cnt, cnt * num_workers);
+  }
+}
+
+
+string get_target_chunk_oid(const map<string, uint32_t>& chunk_ref_cnt_map)
+{
+  random_device rd;
+  mt19937 gen(rd());
+  uniform_int_distribution<int> dis(0, chunk_ref_cnt_map.size() - 1);
+  int target_idx = dis(gen);
+  string target_chunk_oid;
+
+  for (const auto& [fp, cnt] : chunk_ref_cnt_map) {
+    if (target_idx-- <= 0) {
+      target_chunk_oid = fp;
+      break;
+    }
+  }
+  return target_chunk_oid;
+}
+
+// RGWScrubWorker test
+TEST_F(RGWDedupTestWithTwoPools, scrub)
+{
+  store = new rgw::sal::RadosStore();
+  ASSERT_NE(store, nullptr);
+  RGWRados* rados = new RGWRados();
+  ASSERT_NE(rados, nullptr);
+  rados->set_context(cct);
+  rados->init_rados();
+  store->setRados(rados);
+  rados->set_store(store);
+
+  RGWChunkScrubWorker scrub_worker(&dpp, cct, store, 0, cold_ioctx);
+  scrub_worker.prepare(1, 0);
+  scrub_worker.append_base_ioctx(ioctx.get_id(), ioctx);
+  shared_ptr<RGWFPManager> fpmanager
+    = make_shared<RGWFPManager>(1, 8 * 1024, 50);
+  RGWDedupWorker dedup_worker(&dpp, cct, store, 0, fpmanager, "fastcdc", 8 * 1024, "sha1", 1, cold_ioctx);
+  dedup_worker.append_base_ioctx(ioctx.get_id(), ioctx);
+
+  // create object
+  bufferlist data;
+  clock_t curtime = clock();
+  generate_buffer(512 * 1024, &data, curtime);
+  write_object("metadata-obj", data, ioctx);
+
+  // dedup objects
+  try_dedup_for_all_objs(ioctx, dedup_worker);
+
+  // check the number of chunk objects' references
+  hobject_t metadata_obj;
+  map<string, uint32_t> chunk_ref_cnt_map;
+  ObjectCursor shard_start, shard_end;
+  cold_ioctx.object_list_slice(cold_ioctx.object_list_begin(),
+                               cold_ioctx.object_list_end(),
+                               0, 1, &shard_start, &shard_end);
+  ObjectCursor cursor = shard_start;
+  while (cursor < shard_end) {
+    vector<ObjectItem> objs;
+    ASSERT_GE(cold_ioctx.object_list(cursor, shard_end, 100, {}, &objs, &cursor), 0);
+    for (const auto& obj : objs) {
+      chunk_refs_t refs;
+      ASSERT_EQ(dedup_worker.get_chunk_refs(cold_ioctx, obj.oid, refs), 0);
+      chunk_refs_by_object_t* chunk_refs
+        = static_cast<chunk_refs_by_object_t*>(refs.r.get());
+      chunk_ref_cnt_map.emplace(obj.oid, chunk_refs->by_object.size());
+
+      if (metadata_obj.oid.name.empty()) {
+        metadata_obj = *(chunk_refs->by_object.begin());
+      }
+    }
+  }
+
+  // inject not available pool fault into chunk object
+  string pool_fault_injected_oid = get_target_chunk_oid(chunk_ref_cnt_map);
+  uint32_t pool_fault_injected_oid_ref_cnt = chunk_ref_cnt_map[pool_fault_injected_oid];
+  uint32_t hash;
+  ASSERT_GE(cold_ioctx.get_object_hash_position2(pool_fault_injected_oid, &hash), 0);
+  hobject_t invalid_pool_ref(sobject_t("invalid-pool-fault-obj", CEPH_NOSNAP),
+                             "", hash, cold_ioctx.get_id() + 1, "");
+  {
+    ObjectWriteOperation wop;
+    cls_cas_chunk_get_ref(wop, invalid_pool_ref);
+    ASSERT_GE(cold_ioctx.operate(pool_fault_injected_oid, &wop), 0);
+  }
+
+  // inject not available oid fault into chunk object
+  string oid_fault_injected_oid = get_target_chunk_oid(chunk_ref_cnt_map);
+  uint32_t oid_fault_injected_oid_ref_cnt = chunk_ref_cnt_map[oid_fault_injected_oid];
+  ASSERT_GE(cold_ioctx.get_object_hash_position2(oid_fault_injected_oid, &hash), 0);
+  hobject_t invalid_oid_ref(sobject_t("invalid-oid-fault-obj", CEPH_NOSNAP),
+                            "", hash, ioctx.get_id(), "");
+  {
+    ObjectWriteOperation wop;
+    cls_cas_chunk_get_ref(wop, invalid_oid_ref);
+    ASSERT_GE(cold_ioctx.operate(oid_fault_injected_oid, &wop), 0);
+  }
+
+  // inject count mismatch fault into chunk object
+  string dummy_ref_injected_oid = get_target_chunk_oid(chunk_ref_cnt_map);
+  uint32_t dummy_ref_injected_oid_ref_cnt = chunk_ref_cnt_map[dummy_ref_injected_oid];
+  ASSERT_GE(cold_ioctx.get_object_hash_position2(dummy_ref_injected_oid, &hash), 0);
+  hobject_t dummy_ref(sobject_t("metadata-obj", CEPH_NOSNAP),
+                      "", hash, ioctx.get_id(), "");
+  {
+    ObjectWriteOperation wop;
+    cls_cas_chunk_get_ref(wop, metadata_obj);
+    ASSERT_GE(cold_ioctx.operate(dummy_ref_injected_oid, &wop), 0);
+  }
+
+  // run chunk scrub
+  scrub_worker.entry();
+
+  // check reference count
+  {
+    chunk_refs_t refs;
+    ASSERT_EQ(dedup_worker.get_chunk_refs(cold_ioctx, pool_fault_injected_oid, refs), 0);
+    chunk_refs_by_object_t* chunk_refs
+      = static_cast<chunk_refs_by_object_t*>(refs.r.get());
+    ASSERT_EQ(pool_fault_injected_oid_ref_cnt, chunk_refs->by_object.size());
+  }
+
+  {
+    chunk_refs_t refs;
+    ASSERT_EQ(dedup_worker.get_chunk_refs(cold_ioctx, oid_fault_injected_oid, refs), 0);
+    chunk_refs_by_object_t* chunk_refs
+      = static_cast<chunk_refs_by_object_t*>(refs.r.get());
+    ASSERT_EQ(oid_fault_injected_oid_ref_cnt, chunk_refs->by_object.size());
+  }
+
+  {
+    chunk_refs_t refs;
+    ASSERT_EQ(dedup_worker.get_chunk_refs(cold_ioctx, dummy_ref_injected_oid, refs), 0);
+    chunk_refs_by_object_t* chunk_refs
+      = static_cast<chunk_refs_by_object_t*>(refs.r.get());
+    ASSERT_EQ(dummy_ref_injected_oid_ref_cnt, chunk_refs->by_object.size());
+  }
+}
+
+// RGWDedupManager test
+TEST_F(RGWDedupTestWithTwoPools, dedup_scrub_ratio)
+{
+  RGWDedupManager manager(&dpp, cct, store);
+  manager.append_dedup_worker(make_unique<RGWDedupWorker>(0));
+  manager.append_scrub_worker(make_unique<RGWChunkScrubWorker>(0));
+
+  uint32_t dedup_worked_cnt = 0;
+  for (uint32_t i = 0; i < DEFAULT_DEDUP_SCRUB_RATIO * 10; ++i) {
+    if (dedup_worked_cnt < DEFAULT_DEDUP_SCRUB_RATIO) {
+      manager.run_dedup(dedup_worked_cnt);
+      ASSERT_LE(dedup_worked_cnt, DEFAULT_DEDUP_SCRUB_RATIO);
+    } else {
+      manager.run_scrub(dedup_worked_cnt);
+      ASSERT_EQ(dedup_worked_cnt, 0);
+    }
+  }
+  sleep(1);
+}