diff --git a/src/common/options/rgw.yaml.in b/src/common/options/rgw.yaml.in
index f802b42b07509..e7c990c6adfcd 100644
--- a/src/common/options/rgw.yaml.in
+++ b/src/common/options/rgw.yaml.in
@@ -3838,4 +3838,90 @@ options:
   flags:
   - startup
   with_legacy: true
-
+- name: rgw_enable_dedup_threads
+  type: bool
+  level: advanced
+  desc: Enable RGW deduplication
+  default: false
+  services:
+  - rgw
+  with_legacy: true
+- name: rgw_dedup_num_workers
+  type: uint
+  level: advanced
+  desc: Number of deduplication worker threads
+  default: 3
+  services:
+  - rgw
+  with_legacy: true
+- name: rgw_dedup_chunk_algo
+  type: str
+  level: advanced
+  desc: Chunking algorithm for deduplication
+  default: fastcdc
+  services:
+  - rgw
+  enum_values:
+  - fastcdc
+  - fixed
+  with_legacy: true
+- name: rgw_dedup_chunk_size
+  type: uint
+  level: advanced
+  desc: Chunk size in bytes for deduplication
+  default: 16384
+  services:
+  - rgw
+  with_legacy: true
+- name: rgw_dedup_fp_algo
+  type: str
+  level: advanced
+  desc: Fingerprint algorithm for deduplication
+  default: sha1
+  services:
+  - rgw
+  enum_values:
+  - sha1
+  - sha256
+  - sha512
+  with_legacy: true
+- name: rgw_dedup_threshold
+  type: uint
+  level: advanced
+  desc: Number of times a chunk fingerprint must be seen before the chunk is deduplicated
+  default: 3
+  services:
+  - rgw
+  with_legacy: true
+- name: rgw_dedup_scrub_ratio
+  type: uint
+  level: advanced
+  desc: Number of dedup rounds to run between scrub rounds
+  default: 10
+  services:
+  - rgw
+  with_legacy: true
+- name: rgw_dedup_cold_pool_name
+  type: str
+  level: advanced
+  desc: Name of the cold pool that stores archived and deduplicated chunk objects
+  default: default-cold-pool
+  services:
+  - rgw
+  with_legacy: true
+- name: rgw_dedup_fpmanager_memory_limit
+  type: uint
+  level: advanced
+  desc: Memory limit in bytes for the FPManager fingerprint map
+  default: 4294967296
+  services:
+  - rgw
+  with_legacy: true
+- name: rgw_dedup_fpmanager_low_watermark
+  type: uint
+  level: advanced
+  desc: Low watermark, as a percentage of the memory limit, to which the FPManager fingerprint map is evicted
+  default: 50
+  services:
+  - rgw
+  with_legacy: true
diff --git a/src/osd/PrimaryLogPG.cc b/src/osd/PrimaryLogPG.cc
index c28184f9c9b69..c1384412a35c6 100644
--- a/src/osd/PrimaryLogPG.cc
+++ b/src/osd/PrimaryLogPG.cc
@@ -2573,6 +2573,11 @@ PrimaryLogPG::cache_result_t PrimaryLogPG::maybe_handle_manifest_detail(
     return cache_result_t::HANDLED_PROXY;
   case object_manifest_t::TYPE_CHUNKED:
     {
+      // metadata-handling ops don't need to promote chunk objects
+      if (op->may_read() && !op->may_read_data() && !op->may_write()) {
+        return cache_result_t::NOOP;
+      }
+
       if (can_proxy_chunked_read(op, obc)) {
         map<hobject_t, FlushOpRef>::iterator p = flush_ops.find(obc->obs.oi.soid);
         if (p != flush_ops.end()) {
diff --git a/src/rgw/CMakeLists.txt b/src/rgw/CMakeLists.txt
index 79bc05a4df13a..1b72f3416505a 100644
--- a/src/rgw/CMakeLists.txt
+++ b/src/rgw/CMakeLists.txt
@@ -148,6 +148,10 @@ set(librgw_common_srcs
   rgw_bucket_encryption.cc
   rgw_tracer.cc
   rgw_lua_background.cc
+  rgw_dedup.cc
+  rgw_fp_manager.cc
+  rgw_dedup_manager.cc
+  rgw_dedup_worker.cc
   driver/rados/cls_fifo_legacy.cc
   driver/rados/rgw_bucket.cc
   driver/rados/rgw_bucket_sync.cc
@@ -261,6 +265,8 @@ target_link_libraries(rgw_common
   cls_timeindex_client
   cls_user_client
   cls_version_client
+  cls_cas_client
+  cls_cas_internal
   librados
   rt
   ICU::uc
diff --git a/src/rgw/driver/rados/rgw_rados.cc b/src/rgw/driver/rados/rgw_rados.cc
index b714cbd566018..a813e64833985 100644
--- a/src/rgw/driver/rados/rgw_rados.cc
+++ b/src/rgw/driver/rados/rgw_rados.cc
@@ -65,6 +65,7 @@
 #include "rgw_gc.h"
 #include "rgw_lc.h"
+#include "rgw_dedup.h"
 #include "rgw_object_expirer_core.h"
 #include "rgw_sync.h"
@@ -1070,6 +1071,13 @@ void RGWRados::finalize()
   if (run_notification_thread) {
     rgw::notify::shutdown();
   }
+
+  if (use_dedup) {
+    if (dedup.get()) {
+      dedup->finalize();
+      dedup.reset();
+    }
+  }
 }
 /**
@@ -1197,6 +1205,17 @@ int RGWRados::init_complete(const DoutPrefixProvider *dpp, optional_yield y)
     obj_expirer->start_processor();
   }
+  if (use_dedup) {
+    dedup = std::make_shared<RGWDedup>();
+    if (dedup->initialize(cct, this->driver) < 0) {
+      ldpp_dout(dpp, 0) << "initializing RGWDedup failed" << dendl;
+    } else {
+      dedup->start_dedup_manager();
+    }
+  } else {
+    ldpp_dout(dpp, 5) << "note: RGWDedup not initialized" << dendl;
+  }
+
   auto& current_period = svc.zone->get_current_period();
   auto& zonegroup = svc.zone->get_zonegroup();
   auto& zone_params = svc.zone->get_zone_params();
diff --git a/src/rgw/driver/rados/rgw_rados.h b/src/rgw/driver/rados/rgw_rados.h
index 29e0e70cc4903..acac17a2ae29b 100644
--- a/src/rgw/driver/rados/rgw_rados.h
+++ b/src/rgw/driver/rados/rgw_rados.h
@@ -58,6 +58,7 @@ struct RGWZoneGroup;
 struct RGWZoneParams;
 class RGWReshard;
 class RGWReshardWait;
+class RGWDedup;
 struct get_obj_data;
@@ -369,12 +370,14 @@ class RGWRados
   RGWGC* gc{nullptr};
   RGWLC* lc{nullptr};
   RGWObjectExpirer* obj_expirer{nullptr};
+  std::shared_ptr<RGWDedup> dedup;
   bool use_gc_thread{false};
   bool use_lc_thread{false};
   bool quota_threads{false};
   bool run_sync_thread{false};
   bool run_reshard_thread{false};
   bool run_notification_thread{false};
+  bool use_dedup;
   RGWMetaNotifier* meta_notifier{nullptr};
   RGWDataNotifier* data_notifier{nullptr};
@@ -523,6 +526,11 @@ class RGWRados
     return *this;
   }
+  RGWRados& set_use_dedup(bool _use_dedup) {
+    use_dedup = _use_dedup;
+    return *this;
+  }
+
   librados::IoCtx* get_lc_pool_ctx() {
     return &lc_pool_ctx;
   }
diff --git a/src/rgw/rgw_appmain.cc b/src/rgw/rgw_appmain.cc
index 40373d82cbab4..4185f3aac8fd2 100644
--- a/src/rgw/rgw_appmain.cc
+++ b/src/rgw/rgw_appmain.cc
@@ -243,6 +243,7 @@ int rgw::AppMain::init_storage()
           run_sync,
           g_conf().get_val<bool>("rgw_dynamic_resharding"),
           true, null_yield, // run notification thread
+          g_conf()->rgw_enable_dedup_threads,
           g_conf()->rgw_cache_enabled);
   if (!env.driver) {
     return -EIO;
diff --git a/src/rgw/rgw_dedup.cc b/src/rgw/rgw_dedup.cc
new file mode 100644
index 0000000000000..eea17136d095a
--- /dev/null
+++ b/src/rgw/rgw_dedup.cc
@@ -0,0 +1,58 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab ft=cpp
+
+#include "rgw_dedup.h"
+
+#define dout_subsys ceph_subsys_rgw
+
+using namespace std;
+
+
+int RGWDedup::initialize(CephContext* _cct, rgw::sal::RadosStore* _store)
+{
+  cct = _cct;
+  store = _store;
+  dedup_manager = make_unique<RGWDedupManager>(this, cct, store);
+  if (dedup_manager->initialize() < 0) {
+    return -1;
+  }
+  return 0;
+}
+
+void RGWDedup::finalize()
+{
+  stop_dedup_manager();
+  dedup_manager.reset();
+}
+
+void RGWDedup::start_dedup_manager()
+{
+  assert(dedup_manager.get());
+  dedup_manager->set_down_flag(false);
+  dedup_manager->create("dedup_manager");
+}
+
+void RGWDedup::stop_dedup_manager()
+{
+  assert(dedup_manager.get());
+  if (!dedup_manager->get_down_flag()) {
+    dedup_manager->stop();
+    dedup_manager->join();
+  }
+  ldpp_dout(this, 2) << "stop RGWDedup" << dendl;
+}
+
+unsigned RGWDedup::get_subsys() const
+{
+  return dout_subsys;
+}
+
+CephContext* RGWDedup::get_cct() const
+{
+  return cct;
+}
+
+std::ostream& RGWDedup::gen_prefix(std::ostream& out) const
+{
+  return out << "RGWDedup: ";
+}
diff --git a/src/rgw/rgw_dedup.h b/src/rgw/rgw_dedup.h
new file mode 100644
index 
0000000000000..014eaa311e780 --- /dev/null +++ b/src/rgw/rgw_dedup.h @@ -0,0 +1,43 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab ft=cpp + +#ifndef CEPH_RGW_DEDUP_H +#define CEPH_RGW_DEDUP_H + + +#include +#include +#include + +#include "include/types.h" +#include "common/Cond.h" +#include "common/Thread.h" +#include "rgw_dedup_manager.h" + +using namespace std; + +class RGWDedup : public DoutPrefixProvider +{ + CephContext* cct; + rgw::sal::RadosStore* store; + unique_ptr dedup_manager; + +public: + RGWDedup() : cct(nullptr), store(nullptr) {} + RGWDedup(const RGWDedup& rhs) = delete; + RGWDedup& operator=(const RGWDedupManager& rhs) = delete; + virtual ~RGWDedup() override {} + + int initialize(CephContext* _cct, rgw::sal::RadosStore* _store); + void finalize(); + + void start_dedup_manager(); + void stop_dedup_manager(); + + CephContext* get_cct() const override; + unsigned get_subsys() const override; + std::ostream& gen_prefix(std::ostream& out) const override; +}; + +#endif + diff --git a/src/rgw/rgw_dedup_manager.cc b/src/rgw/rgw_dedup_manager.cc new file mode 100644 index 0000000000000..0e678ad5a20ff --- /dev/null +++ b/src/rgw/rgw_dedup_manager.cc @@ -0,0 +1,295 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab ft=cpp + +#include "rgw_dedup_manager.h" +#include "rgw_rados.h" +#include "services/svc_zone.h" +#include "include/rados/librados.h" + +#define dout_subsys ceph_subsys_rgw + +const int DEDUP_INTERVAL = 3; +const int MAX_OBJ_SCAN_SIZE = 100; +const int RETRY_SLEEP_PERIOD = 5; +const uint32_t MAX_CHUNK_REF_SIZE = 10000; + +int RGWDedupManager::initialize() +{ + // initialize dedup parameters from rgw.yaml.in + num_workers = cct->_conf->rgw_dedup_num_workers; + ceph_assert(num_workers > 0); + chunk_algo = cct->_conf->rgw_dedup_chunk_algo; + ceph_assert(chunk_algo == "fastcdc" || chunk_algo == "fixed"); + chunk_size = cct->_conf->rgw_dedup_chunk_size; + ceph_assert(chunk_size > 0); + fp_algo = cct->_conf->rgw_dedup_fp_algo; + ceph_assert(fp_algo == "sha1" || fp_algo == "sha256" || fp_algo == "sha512"); + dedup_threshold = cct->_conf->rgw_dedup_threshold; + ceph_assert(dedup_threshold > 0); + dedup_scrub_ratio = cct->_conf->rgw_dedup_scrub_ratio; + ceph_assert(dedup_scrub_ratio > 0); + cold_pool_name = cct->_conf->rgw_dedup_cold_pool_name; + ceph_assert(!cold_pool_name.empty()); + fpmanager_memory_limit = cct->_conf->rgw_dedup_fpmanager_memory_limit; + ceph_assert(fpmanager_memory_limit > 0); + fpmanager_low_watermark = cct->_conf->rgw_dedup_fpmanager_low_watermark; + ceph_assert(fpmanager_low_watermark > 0); + + rados = store->getRados()->get_rados_handle(); + ceph_assert(rados); + + // get cold pool ioctx + IoCtx cold_ioctx; + int ret = rgw_init_ioctx(dpp, rados, rgw_pool(cold_pool_name), cold_ioctx, false, false); + if (ret < 0) { + ldpp_dout(dpp, 0) << "ERROR: failed to get pool=" << cold_pool_name << dendl; + return ret; + } + + // initialize components + fpmanager = make_shared( + dedup_threshold, fpmanager_memory_limit, fpmanager_low_watermark); + + for (uint32_t i = 0; i < num_workers; ++i) { + dedup_workers.emplace_back( + make_unique(dpp, cct, store, i, fpmanager, chunk_algo, + chunk_size, fp_algo, dedup_threshold, cold_ioctx)); + scrub_workers.emplace_back( + make_unique(dpp, cct, store, i, cold_ioctx)); + } + return 0; +} + +string RGWDedupManager::create_cmd(const string& prefix, + const vector>& options) +{ + ceph_assert(!prefix.empty()); + string 
cmd("{\"prefix\": \"" + prefix + "\""); + for (auto& opt : options) { + cmd.append(", \"" + opt.first + "\": \"" + opt.second + "\""); + } + cmd.append("}"); + return cmd; +} + +void RGWDedupManager::update_base_pool_info() +{ + RGWZoneParams zone_params = store->svc()->zone->get_zone_params(); + map placement_pools = zone_params.placement_pools; + for (auto& pp : placement_pools) { + rgw_pool data_pool = pp.second.get_standard_data_pool(); + ldpp_dout(dpp, 20) << " placement: " << pp.first << ", data pool: " + << data_pool << dendl; + append_ioctxs(data_pool.name); + } +} + +template +void RGWDedupManager::prepare_worker(vector& workers, + const int rgwdedup_cnt, + const int cur_rgwdedup_id) +{ + ceph_assert(!workers.empty()); + for (auto& worker : workers) { + ceph_assert(worker.get()); + worker->prepare(rgwdedup_cnt * num_workers, + cur_rgwdedup_id * rgwdedup_cnt + worker->get_id()); + } +} + +template +void RGWDedupManager::run_worker(vector& workers, string tname_prefix) +{ + ceph_assert(!workers.empty()); + for (auto& worker : workers) { + ceph_assert(worker.get()); + worker->create((tname_prefix + to_string(worker->get_id())).c_str()); + } +} + +template +void RGWDedupManager::wait_worker(vector& workers) +{ + ceph_assert(!workers.empty()); + for (auto& worker : workers) { + ceph_assert(worker.get()); + worker->join(); + } +} + +int RGWDedupManager::get_multi_rgwdedup_info(int& num_rgwdedups, int& cur_id) +{ + bufferlist result; + vector> options; + options.emplace_back(make_pair("format", "json")); + string cmd = create_cmd("service dump", options); + + Rados* rados = store->getRados()->get_rados_handle(); + if (rados->mgr_command(cmd, bufferlist(), &result, nullptr) < 0) { + ldpp_dout(dpp, 0) << __func__ << " mgr_command " << cmd << " failed" << dendl; + return -EACCES; + } + + string dump = result.to_str(); + JSONParser service_parser; + if (!service_parser.parse(dump.c_str(), dump.size())) { + return -1; + } + + JSONFormattable f; + try { + decode_json_obj(f, &service_parser); + } catch (JSONDecoder::err& e) { + ldpp_dout(dpp, 2) << __func__ << " Failed to decode JSON object" << dendl; + } + + if (!f.exists("services")) { + return -1; + } + if (!f["services"].exists("rgw")) { + return -1; + } + if (!f["services"]["rgw"].exists("daemons")) { + return -1; + } + + uint64_t rgw_gid = rados->get_instance_id(); + num_rgwdedups = f["services"]["rgw"]["daemons"].object().size(); + int idx = 0; + for (const auto& [k, v] : f["services"]["rgw"]["daemons"].object()) { + if (!v.exists("metadata") || !v["metadata"].exists("pid")) { + --num_rgwdedups; + continue; + } + + if (rgw_gid == std::stoull(k)) { + cur_id = idx; + } + ++idx; + } + + // current RGWDedup not found in Ceph cluster + if (cur_id == num_rgwdedups) { + return -1; + } + return 0; +} + +void* RGWDedupManager::entry() +{ + ldpp_dout(dpp, 2) << "RGWDedupManager started" << dendl; + uint32_t dedup_worked_cnt = 0; + while (!get_down_flag()) { + if (perfcounter) { + perfcounter->set(l_rgw_dedup_worker_count, num_workers); + perfcounter->set(l_rgw_dedup_scrub_ratio, dedup_scrub_ratio); + + if (chunk_algo == "fixed") { + perfcounter->set(l_rgw_dedup_chunk_algo, 1); + } else if (chunk_algo == "fastcdc") { + perfcounter->set(l_rgw_dedup_chunk_algo, 2); + } + perfcounter->set(l_rgw_dedup_chunk_size, chunk_size); + + if (fp_algo == "sha1") { + perfcounter->set(l_rgw_dedup_fp_algo, 1); + } else if (fp_algo == "sha256") { + perfcounter->set(l_rgw_dedup_fp_algo, 2); + } else if (fp_algo == "sha512") { + perfcounter->set(l_rgw_dedup_fp_algo, 
3); + } + } + + int num_rgwdedup, cur_rgwdedup_id; + if (get_multi_rgwdedup_info(num_rgwdedup, cur_rgwdedup_id) < 0) { + ldpp_dout(dpp, 2) << "current RGWDedup thread not found yet in Ceph Cluster." + << " Retry a few seconds later." << dendl; + sleep(RETRY_SLEEP_PERIOD); + continue; + } + ldpp_dout(dpp, 10) << "num rgwdedup: " << num_rgwdedup << ", cur rgwdedup id: " + << cur_rgwdedup_id << dendl; + + if (dedup_worked_cnt < dedup_scrub_ratio) { + // do dedup + if (perfcounter) { + perfcounter->set(l_rgw_dedup_current_worker_mode, 1); + } + ceph_assert(fpmanager.get()); + fpmanager->reset_fpmap(); + + update_base_pool_info(); + prepare_worker(dedup_workers, num_rgwdedup, cur_rgwdedup_id); + run_worker(dedup_workers, "DedupWorker_"); + wait_worker(dedup_workers); + ++dedup_worked_cnt; + } else { + // do scrub + if (perfcounter) { + perfcounter->set(l_rgw_dedup_current_worker_mode, 2); + } + + prepare_worker(scrub_workers, num_rgwdedup, cur_rgwdedup_id); + run_worker(scrub_workers, "ScrubWorker_"); + wait_worker(scrub_workers); + dedup_worked_cnt = 0; + } + sleep(DEDUP_INTERVAL); + } + return nullptr; +} + +void RGWDedupManager::stop() +{ + set_down_flag(true); + ldpp_dout(dpp, 2) << "RGWDedupManager is set to be stopped" << dendl; +} + +void RGWDedupManager::finalize() +{ + fpmanager->reset_fpmap(); + fpmanager.reset(); + + for (uint32_t i = 0; i < num_workers; ++i) { + dedup_workers[i]->finalize(); + dedup_workers[i].reset(); + } +} + +int RGWDedupManager::append_ioctxs(rgw_pool base_pool) +{ + ceph_assert(rados); + IoCtx base_ioctx; + int ret = rgw_init_ioctx(dpp, rados, base_pool, base_ioctx, false, false); + if (ret < 0) { + ldpp_dout(dpp, 0) << "ERROR: failed to get pool=" << base_pool.name << dendl; + return ret; + } + + for (uint32_t i = 0; i < num_workers; ++i) { + dedup_workers[i]->append_base_ioctx(base_ioctx.get_id(), base_ioctx); + scrub_workers[i]->append_base_ioctx(base_ioctx.get_id(), base_ioctx); + } + return 0; +} + +string RGWDedupManager::create_osd_pool_set_cmd(const string prefix, const string base_pool, + const string var, const string val) +{ + vector> options; + options.emplace_back(make_pair("pool", base_pool)); + options.emplace_back(make_pair("var", var)); + options.emplace_back(make_pair("val", val)); + return create_cmd(prefix, options); +} + +void RGWDedupManager::set_down_flag(bool new_flag) +{ + down_flag = new_flag; +} + +bool RGWDedupManager::get_down_flag() +{ + return down_flag; +} + diff --git a/src/rgw/rgw_dedup_manager.h b/src/rgw/rgw_dedup_manager.h new file mode 100644 index 0000000000000..797bff2e4f080 --- /dev/null +++ b/src/rgw/rgw_dedup_manager.h @@ -0,0 +1,78 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab ft=cpp + +#ifndef CEPH_RGW_DEDUP_MANAGER_H +#define CEPH_RGW_DEDUP_MANAGER_H + +#include "include/types.h" +#include "common/Cond.h" +#include "common/Thread.h" +#include "rgw_sal_rados.h" +#include "rgw_perf_counters.h" +#include "rgw_fp_manager.h" +#include "rgw_dedup_worker.h" + +using namespace std; +using namespace librados; + +class RGWFPManager; +class RGWDedupWorker; +class RGWChunkScrubWorker; +class RGWDedupManager : public Thread +{ + const DoutPrefixProvider* dpp; + CephContext* cct; + rgw::sal::RadosStore* store; + bool down_flag; + Rados* rados; + + shared_ptr fpmanager; + vector> dedup_workers; + vector> scrub_workers; + + string cold_pool_name; + string chunk_algo; + string fp_algo; + uint32_t num_workers; + uint32_t chunk_size; + uint32_t dedup_threshold; + uint32_t 
dedup_scrub_ratio; + uint64_t fpmanager_memory_limit; + uint32_t fpmanager_low_watermark; + +public: + RGWDedupManager(const DoutPrefixProvider* _dpp, + CephContext* _cct, + rgw::sal::RadosStore* _store) + : dpp(_dpp), cct(_cct), store(_store), down_flag(true) {} + RGWDedupManager() = delete; + RGWDedupManager(const RGWDedupManager& rhs) = delete; + RGWDedupManager& operator=(const RGWDedupManager& rhs) = delete; + virtual ~RGWDedupManager() override {} + virtual void* entry() override; + + void stop(); + int initialize(); + void finalize(); + void set_down_flag(bool new_flag); + bool get_down_flag(); + + // WorkerType: RGWDedupWorker or RGWChunkScrubWorker + template + void prepare_worker(vector& workers, const int num_rgwdedup, + const int rgwdedup_id); + template + void run_worker(vector& workers, string tname_prefix); + template + void wait_worker(vector& workers); + + int append_ioctxs(rgw_pool base_pool); + void update_base_pool_info(); + string create_cmd(const string& prefix, + const vector>& options); + string create_osd_pool_set_cmd(const string prefix, const string base_pool, + const string var, const string val); + int get_multi_rgwdedup_info(int& num_rgwdedups, int& cur_id); +}; + +#endif diff --git a/src/rgw/rgw_dedup_worker.cc b/src/rgw/rgw_dedup_worker.cc new file mode 100644 index 0000000000000..edf00599b9433 --- /dev/null +++ b/src/rgw/rgw_dedup_worker.cc @@ -0,0 +1,541 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab ft=cpp + +#include "rgw_dedup_worker.h" +#include "cls/cas/cls_cas_client.h" + +#define dout_subsys ceph_subsys_rgw + +unsigned default_obj_read_size = 1 << 26; + +int Worker::get_id() +{ + return id; +} + +void Worker::prepare(const int new_total_workers, const int new_gid) +{ + num_total_workers = new_total_workers; + gid = new_gid; +} + +void Worker::clear_base_ioctx_map(uint64_t id, IoCtx& ioctx) +{ + base_ioctx_map.clear(); +} + +void Worker::append_base_ioctx(uint64_t id, IoCtx& ioctx) +{ + base_ioctx_map.emplace(id, ioctx); +} + +int Worker::get_chunk_refs(IoCtx& chunk_ioctx, + const string& chunk_oid, + chunk_refs_t& refs) +{ + bufferlist bl; + if (chunk_ioctx.getxattr(chunk_oid, CHUNK_REFCOUNT_ATTR, bl) < 0) { + // non-chunk objects are not targets of a RGWChunkScrubWorker + ldpp_dout(dpp, 0) << "object " << chunk_oid << " getxattr failed" << dendl; + return -1; + } + auto p = bl.cbegin(); + decode(refs, p); + + if (refs.get_type() != chunk_refs_t::TYPE_BY_OBJECT) { + ldpp_dout(dpp, 0) << "do not allow other types except for TYPE_BY_OBJECT" << dendl; + return -1; + } + return 0; +} + + + +string RGWDedupWorker::get_archived_obj_name(IoCtx& ioctx, const string obj_name) +{ + ceph_assert(ioctx.get_id() > 0); + ceph_assert(!obj_name.empty()); + return to_string(ioctx.get_id()) + ":" + obj_name; +} + +RGWDedupWorker::MetadataObjType RGWDedupWorker::get_metadata_obj_type + (object_info_t& oi, IoCtx& ioctx, const string obj_name, const uint32_t data_len) +{ + bufferlist bl; + ioctx.getxattr(obj_name, "", bl); + try { + bufferlist::const_iterator bliter = bl.begin(); + decode(oi, bliter); + } catch (...) 
{
+    ldpp_dout(dpp, 0) << __func__ << " failed to get object_info_t of "
+      << obj_name << dendl;
+    return MetadataObjType::None;
+  }
+
+  if (!oi.has_manifest()) {
+    return MetadataObjType::New;
+  } else if (oi.has_manifest() && oi.manifest.is_chunked()) {
+    const auto& first_entry = oi.manifest.chunk_map.begin();
+    if (first_entry->first == 0 &&
+        first_entry->second.length == data_len &&
+        first_entry->second.oid.oid == get_archived_obj_name(ioctx, obj_name)) {
+      return MetadataObjType::Archived;
+    } else if (oi.manifest.chunk_map.size() > 1) {
+      return MetadataObjType::Deduped;
+    }
+  }
+  return MetadataObjType::None;
+}
+
+template <typename Iter>
+void RGWDedupWorker::try_object_dedup(IoCtx& base_ioctx, Iter begin, Iter end)
+{
+  for (auto& obj = begin; obj != end; ++obj) {
+    list<chunk_t> redundant_chunks;
+    auto target_oid = obj->oid;
+    ldpp_dout(dpp, 20) << "worker_" << id << " oid: " << target_oid << dendl;
+
+    bufferlist data = read_object_data(base_ioctx, target_oid);
+    if (data.length() == 0) {
+      ldpp_dout(dpp, 5) << "Skip dedup object "
+        << target_oid << ", object data size is 0" << dendl;
+      continue;
+    }
+    auto chunks = do_cdc(data, chunk_algo, chunk_size);
+
+    // check if a chunk is duplicated in sampled objects
+    for (auto& chunk : chunks) {
+      auto &chunk_data = get<0>(chunk);
+      string fingerprint = generate_fingerprint(chunk_data, fp_algo);
+      fpmanager->add(fingerprint);
+      if (fpmanager->find(fingerprint) >= dedup_threshold) {
+        std::pair<uint64_t, uint64_t> chunk_boundary = std::get<1>(chunk);
+        chunk_t chunk_info = {
+          .start = chunk_boundary.first,
+          .size = chunk_boundary.second,
+          .fingerprint = fingerprint,
+          .data = chunk_data
+        };
+        redundant_chunks.push_back(chunk_info);
+      }
+    }
+
+    /**
+     * An object has 3 states and can move between them.
+     * - New: Never scanned before, so it will be transferred to the Archived or Deduped state.
+     * - Archived: No duplicated chunk has been found; the entire object is evicted to the cold pool.
+     * - Deduped: Deduplication has occurred due to the duplicated chunks.
Duplicated chunks + * have been evicted to cold pool + * + * example) + * +-----------+-----------+-----------+-----------+ + * | object | New | Archive | Deduped | + * +-----------+-----------+-----------+-----------+ + * | base pool | 123456789 | --------- | 1-3456-89 | + * +-----------+-----------+-----------+-----------+ + * | cold pool | --------- | 123456789 | -2----7-- | + * +-----------+-----------+-----------+-----------+ + */ + + object_info_t oi; + MetadataObjType meta_obj_type + = get_metadata_obj_type(oi, base_ioctx, target_oid, data.length()); + ldpp_dout(dpp, 20) << "oid: " << oi.soid << " state: " + << static_cast(meta_obj_type) << dendl; + + if (meta_obj_type == MetadataObjType::Archived) { + if (redundant_chunks.empty()) { + ldpp_dout(dpp, 20) << "oid: " << oi.soid << " Archived -> Archived" << dendl; + // no need further operations + continue; + } else { + // Archived -> Deduped + ldpp_dout(dpp, 20) << "oid: " << oi.soid << " Archived -> Deduped" << dendl; + clear_manifest(base_ioctx, target_oid); + } + } else if (meta_obj_type == MetadataObjType::Deduped) { + if (redundant_chunks.empty()) { + // Deduped -> Archived + ldpp_dout(dpp, 20) << "oid: " << oi.soid << " Deduped -> Archived" << dendl; + clear_manifest(base_ioctx, target_oid); + } + } + + if (redundant_chunks.empty()) { + chunk_t archived_object = { + .start = 0, + .size = data.length(), + .fingerprint = get_archived_obj_name(base_ioctx, target_oid), + .data = data + }; + redundant_chunks.push_back(archived_object); + } + + do_chunk_dedup(base_ioctx, cold_ioctx, target_oid, redundant_chunks, + oi.manifest.chunk_map); + do_data_evict(base_ioctx, target_oid); + } +} + +void* RGWDedupWorker::entry() +{ + ldpp_dout(dpp, 20) << "RGWDedupWorker_" << id << " starts" << dendl; + ceph_assert(chunk_algo == "fixed" || chunk_algo == "fastcdc"); + ceph_assert(chunk_size > 0); + ceph_assert(fp_algo == "sha1" || fp_algo == "sha256" || fp_algo == "sha512"); + ceph_assert(dedup_threshold > 0); + + for (auto& iter : base_ioctx_map) { + uint32_t num_objs = 0; + IoCtx base_ioctx = iter.second; + ObjectCursor pool_begin = base_ioctx.object_list_begin(); + ObjectCursor pool_end = base_ioctx.object_list_end(); + ObjectCursor shard_begin, shard_end; + + // get current worker's shard range of the base pool + base_ioctx.object_list_slice(pool_begin, pool_end, gid, num_total_workers, + &shard_begin, &shard_end); + ldpp_dout(dpp, 20) << "gid/# total workers: " << gid << "/" << num_total_workers + << ", id: " << id << ", scan dir: " << obj_scan_dir << dendl; + + ObjectCursor obj_cursor = shard_begin; + while (obj_cursor < shard_end) { + vector obj_shard; + int ret = base_ioctx.object_list(obj_cursor, shard_end, MAX_OBJ_SCAN_SIZE, + {}, &obj_shard, &obj_cursor); + if (ret < 0) { + ldpp_dout(dpp, 0) << "error object_list: " << cpp_strerror(ret) << dendl; + return nullptr; + } + num_objs += obj_shard.size(); + + if (obj_scan_dir) { + try_object_dedup(base_ioctx, obj_shard.begin(), obj_shard.end()); + } else { + try_object_dedup(base_ioctx, obj_shard.rbegin(), obj_shard.rend()); + } + } + ldpp_dout(dpp, 20) << "RGWDedupWorker_" << id << " pool: " << base_ioctx.get_pool_name() + << ", num objs: " << num_objs << dendl; + } + + // reverse object scanning direction + obj_scan_dir ^= 1; + return nullptr; +} + +bufferlist RGWDedupWorker::read_object_data(IoCtx& ioctx, string object_name) +{ + bufferlist whole_data; + uint64_t offset = 0; + int ret = -1; + + while (ret != 0) { + bufferlist partial_data; + ret = ioctx.read(object_name, partial_data, 
default_obj_read_size, offset); + if (ret < 0) { + ldpp_dout(dpp, 1) << "read object error pool_name: " << ioctx.get_pool_name() + << ", object_name: " << object_name << ", offset: " << offset + << ", size: " << default_obj_read_size << ", error:" << cpp_strerror(ret) + << dendl; + return bufferlist(); + } + offset += ret; + whole_data.claim_append(partial_data); + } + + if (perfcounter) { + perfcounter->inc(l_rgw_dedup_worker_read, whole_data.length()); + } + return whole_data; +} + +int RGWDedupWorker::write_object_data(IoCtx &ioctx, string object_name, bufferlist &data) +{ + ObjectWriteOperation write_op; + write_op.write_full(data); + int ret = ioctx.operate(object_name, &write_op); + if (ret < 0) { + ldpp_dout(dpp, 1) + << "Failed to write rados object, pool_name: "<< ioctx.get_pool_name() + << ", object_name: " << object_name << ", ret: " << ret << dendl; + } + + if (perfcounter) { + perfcounter->inc(l_rgw_dedup_worker_write, data.length()); + } + return ret; +} + +int RGWDedupWorker::check_object_exists(IoCtx& ioctx, string object_name) +{ + uint64_t size; + time_t mtime; + return ioctx.stat(object_name, &size, &mtime); +} + +int RGWDedupWorker::try_set_chunk(IoCtx& ioctx, IoCtx& cold_ioctx, + string object_name, chunk_t &chunk) +{ + ObjectReadOperation chunk_op; + chunk_op.set_chunk( + chunk.start, + chunk.size, + cold_ioctx, + chunk.fingerprint, + 0, + CEPH_OSD_OP_FLAG_WITH_REFERENCE); + return ioctx.operate(object_name, &chunk_op, nullptr); +} + +void RGWDedupWorker::do_chunk_dedup(IoCtx& ioctx, IoCtx& cold_ioctx, + string object_name, + list redundant_chunks, + map& chunk_map) +{ + for (auto chunk : redundant_chunks) { + if (check_object_exists(cold_ioctx, chunk.fingerprint) < 0) { + int ret = write_object_data(cold_ioctx, chunk.fingerprint, chunk.data); + if (ret < 0) { + ldpp_dout(dpp, 1) + << "Failed to write chunk to cold pool, cold_pool_name: " + << cold_ioctx.get_pool_name() + << ", fingerprint: " << chunk.fingerprint + << ", ret: " << ret << dendl; + return; + } + if (perfcounter) { + perfcounter->inc(l_rgw_dedup_chunk_data_size, chunk.data.length()); + } + } else { + if (chunk_map.find(chunk.start) != chunk_map.end() && + chunk_map[chunk.start].length == chunk.size && + chunk_map[chunk.start].oid.oid == chunk.fingerprint && + chunk_map[chunk.start].has_reference()) { + // this chunk has already been deduped. 
skip this chunk + ldpp_dout(dpp, 0) << chunk.fingerprint << " deduped -> deduped" << dendl; + continue; + } + + chunk_refs_t refs; + if (get_chunk_refs(cold_ioctx, chunk.fingerprint, refs)) { + continue; + } + chunk_refs_by_object_t* chunk_refs + = static_cast(refs.r.get()); + + // # refs of chunk object must be less than max_chunk_ref_size + if (chunk_refs->by_object.size() >= max_chunk_ref_size) { + continue; + } + } + + if (try_set_chunk(ioctx, cold_ioctx, object_name, chunk) >= 0 && perfcounter) { + perfcounter->inc(l_rgw_dedup_deduped_data_size, chunk.data.length()); + } + } +} + +void RGWDedupWorker::do_data_evict(IoCtx &ioctx, string object_name) +{ + ObjectReadOperation tier_op; + tier_op.tier_evict(); + int ret = ioctx.operate(object_name, &tier_op, nullptr); + if (ret < 0) { + ldpp_dout(dpp, 1) << "Failed to tier_evict rados object, pool_name: " + << ioctx.get_pool_name() << ", object_name: " + << object_name << ", ret: " << ret << dendl; + } +} + +int RGWDedupWorker::clear_manifest(IoCtx &ioctx, string object_name) +{ + ObjectWriteOperation promote_op; + promote_op.tier_promote(); + int ret = ioctx.operate(object_name, &promote_op); + if (ret < 0) { + ldpp_dout(dpp, 1) + << "Failed to tier promote rados object, pool_name: " << ioctx.get_pool_name() + << ", object_name: " << object_name << ", ret: " << ret << dendl; + return ret; + } + + ObjectWriteOperation unset_op; + unset_op.unset_manifest(); + ret = ioctx.operate(object_name, &unset_op); + if (ret < 0) { + ldpp_dout(dpp, 1) + << "Failed to unset_manifest rados object, pool_name: " << ioctx.get_pool_name() + << ", object_name: " << object_name << ", ret: " << ret << dendl; + } + return ret; +} + +int RGWDedupWorker::remove_object(IoCtx &ioctx, string object_name) +{ + int ret = ioctx.remove(object_name); + if (ret < 0) { + ldpp_dout(dpp, 1) + << "Failed to remove entire object in pool, pool_name: " + << ioctx.get_pool_name() << ", object_name: " + << object_name << ", ret: " << ret << dendl; + } + return ret; +} + +vector RGWDedupWorker::do_cdc(bufferlist &data, string chunk_algo, + uint32_t chunk_size) +{ + vector ret; + unique_ptr cdc = CDC::create(chunk_algo, cbits(chunk_size) - 1); + vector> chunks; + cdc->calc_chunks(data, &chunks); + + for (auto &p : chunks) { + bufferlist chunk; + chunk.substr_of(data, p.first, p.second); + ret.push_back(make_tuple(chunk, p)); + } + return ret; +} + +string RGWDedupWorker::generate_fingerprint(bufferlist chunk_data, string fp_algo) +{ + switch (pg_pool_t::get_fingerprint_from_str(fp_algo)) { + case pg_pool_t::TYPE_FINGERPRINT_SHA1: + return crypto::digest(chunk_data).to_str(); + + case pg_pool_t::TYPE_FINGERPRINT_SHA256: + return crypto::digest(chunk_data).to_str(); + + case pg_pool_t::TYPE_FINGERPRINT_SHA512: + return crypto::digest(chunk_data).to_str(); + + default: + ceph_assert(0 == "Invalid fp_algo type"); + break; + } + return string(); +} + +void RGWDedupWorker::finalize() +{ + fpmanager.reset(); +} + +int RGWChunkScrubWorker::do_chunk_repair(IoCtx& cold_ioctx, const string chunk_obj_name, + const hobject_t src_obj, int chunk_ref_cnt, + int source_ref_cnt) +{ + ceph_assert(chunk_ref_cnt >= source_ref_cnt); + + while (chunk_ref_cnt != source_ref_cnt) { + ObjectWriteOperation op; + cls_cas_chunk_put_ref(op, src_obj); + --chunk_ref_cnt; + int ret = cold_ioctx.operate(chunk_obj_name, &op); + if (ret < 0) { + return ret; + } + } + return 0; +} + +int RGWChunkScrubWorker::get_src_ref_cnt(const hobject_t& src_obj, + const string& chunk_oid) +{ + IoCtx src_ioctx; + int src_ref_cnt = -1; 
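+  // Look up the source object's pool: reuse the IoCtx registered via
+  // append_base_ioctx() when available, otherwise fall back to
+  // ioctx_create2(). The count obtained below is later compared with the
+  // chunk object's own reference count; when the two differ,
+  // do_chunk_repair() above drops the surplus references one
+  // cls_cas_chunk_put_ref() call at a time until they match.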
+ if (base_ioctx_map.find(src_obj.pool) != base_ioctx_map.end()) { + src_ioctx = base_ioctx_map[src_obj.pool]; + } else { + // if base pool not found, try create ioctx + Rados* rados = store->getRados()->get_rados_handle(); + int ret = rados->ioctx_create2(src_obj.pool, src_ioctx); + if (ret < 0) { + ldpp_dout(dpp, 1) << __func__ << " src pool " << src_obj.pool + << " does not exist" << dendl; + return ret; + } + } + + // get reference count that src object is pointing a chunk object + src_ref_cnt = cls_cas_references_chunk(src_ioctx, src_obj.oid.name, chunk_oid); + if (src_ref_cnt < 0) { + if (src_ref_cnt == -ENOENT || src_ref_cnt == -EINVAL) { + ldpp_dout(dpp, 2) << "chunk (" << chunk_oid << ") is referencing " << src_obj + << ": referencing object missing" << dendl; + } else if (src_ref_cnt == -ENOLINK) { + ldpp_dout(dpp, 2) << "chunk (" << chunk_oid << ") is referencing " << src_obj + << ": referencing object does not reference this chunk" << dendl; + } else { + ldpp_dout(dpp, 0) << "cls_cas_references_chunk() fail: " + << strerror(src_ref_cnt) << dendl; + } + src_ref_cnt = 0; + } + return src_ref_cnt; +} + +/* + * - chunk object: A part of the source object that is created by doing deup + * operation on it. It has a reference list containing its' source objects. + * - source object: An original object of its' chunk objects. It has its chunk + * information in a chunk_map. + */ +void* RGWChunkScrubWorker::entry() +{ + ldpp_dout(dpp, 20) << "RGWChunkScrubWorker_" << id << " starts" << dendl; + + ObjectCursor shard_begin, shard_end; + // get current worker's shard range + cold_ioctx.object_list_slice(cold_ioctx.object_list_begin(), + cold_ioctx.object_list_end(), + id, num_total_workers, &shard_begin, &shard_end); + ObjectCursor obj_cursor = shard_begin; + uint32_t num_objs = 0; + while (obj_cursor < shard_end) { + vector obj_shard; + if (cold_ioctx.object_list(obj_cursor, shard_end, MAX_OBJ_SCAN_SIZE, {}, + &obj_shard, &obj_cursor) < 0) { + ldpp_dout(dpp, 0) << "error object_list" << dendl; + return nullptr; + } + + for (const auto& obj : obj_shard) { + auto cold_oid = obj.oid; + chunk_refs_t refs; + if (get_chunk_refs(cold_ioctx, cold_oid, refs) < 0) { + continue; + } + + chunk_refs_by_object_t* chunk_refs = + static_cast(refs.r.get()); + set src_obj_set(chunk_refs->by_object.begin(), + chunk_refs->by_object.end()); + for (auto& src_obj : src_obj_set) { + // get reference count that chunk object is pointing a src object + int chunk_ref_cnt = chunk_refs->by_object.count(src_obj); + int src_ref_cnt = get_src_ref_cnt(src_obj, cold_oid); + + ldpp_dout(dpp, 10) << "ScrubWorker_" << id << " chunk obj: " << cold_oid + << ", src obj: " << src_obj.oid.name << ", src pool: " << src_obj.pool + << ", chunk_ref_cnt: " << chunk_ref_cnt << ", src_ref_cnt: " << src_ref_cnt + << dendl; + + if (chunk_ref_cnt != src_ref_cnt) { + if (do_chunk_repair(cold_ioctx, cold_oid, src_obj, + chunk_ref_cnt, src_ref_cnt) < 0) { + ldpp_dout(dpp, 0) << "do_chunk_repair fail" << dendl; + continue; + } + } + } + } + num_objs += obj_shard.size(); + } + ldpp_dout(dpp, 20) << "RGWChunkScrubWorker_" << id << " pool: " + << cold_ioctx.get_pool_name() << ", num objs: " << num_objs << dendl; + return nullptr; +} + diff --git a/src/rgw/rgw_dedup_worker.h b/src/rgw/rgw_dedup_worker.h new file mode 100644 index 0000000000000..b4c5698856e66 --- /dev/null +++ b/src/rgw/rgw_dedup_worker.h @@ -0,0 +1,165 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab ft=cpp + +#ifndef 
CEPH_RGW_DEDUP_WORKER_H +#define CEPH_RGW_DEDUP_WORKER_H + +#include "cls/cas/cls_cas_internal.h" +#include "include/rados/librados.hpp" +#include "rgw_perf_counters.h" +#include "rgw_fp_manager.h" +#include "rgw_dedup_manager.h" +#include "common/CDC.h" + +extern const int MAX_OBJ_SCAN_SIZE; +extern const uint32_t MAX_CHUNK_REF_SIZE; + +extern const int MAX_OBJ_SCAN_SIZE; + +using namespace std; +using namespace librados; + +class RGWFPManager; +class Worker : public Thread +{ +protected: + const DoutPrefixProvider* dpp; + CephContext* cct; + rgw::sal::RadosStore* store; + + // local worker id of a RGWDedup + int id = -1; + // # workers throughout total RGWDedups (# RGWDedup * # workers) + int num_total_workers = 0; + // global worker id throughout total RGWDedups + int gid = -1; + map base_ioctx_map; + IoCtx cold_ioctx; + + enum class MetadataObjType : int { + New, + Archived, + Deduped, + None + }; + +public: + Worker(const DoutPrefixProvider* _dpp, + CephContext* _cct, + rgw::sal::RadosStore* _store, + int _id, + IoCtx _cold_ioctx) + : dpp(_dpp), cct(_cct), store(_store), id(_id), + cold_ioctx(_cold_ioctx) {} + Worker() = delete; + Worker(const Worker& rhs) = delete; + Worker& operator=(const Worker& rhs) = delete; + virtual ~Worker() {} + + virtual void* entry() = 0; + + int get_id(); + void prepare(const int new_total_workers, const int new_gid); + void clear_base_ioctx_map(uint64_t id, IoCtx& ioctx); + void append_base_ioctx(uint64_t name, IoCtx& ioctx); + + // get references of chunk object + int get_chunk_refs(IoCtx& chunk_ioctx, const string& chunk_oid, chunk_refs_t& refs); +}; + +// > +using ChunkInfoType = tuple>; +class RGWDedupWorker : public Worker +{ + bool obj_scan_dir; // true: scan obj forward, false: scan object reverse + shared_ptr fpmanager; + string chunk_algo; + uint32_t chunk_size; + string fp_algo; + uint32_t dedup_threshold; + uint32_t max_chunk_ref_size; + +public: + RGWDedupWorker(const DoutPrefixProvider* _dpp, + CephContext* _cct, + rgw::sal::RadosStore* _store, + int _id, + shared_ptr _fpmanager, + string _chunk_algo, + uint32_t _chunk_size, + string _fp_algo, + uint32_t _dedup_threshold, + IoCtx _cold_ioctx) + : Worker(_dpp, _cct, _store, _id, _cold_ioctx), + obj_scan_dir(true), + fpmanager(_fpmanager), + chunk_algo(_chunk_algo), + chunk_size(_chunk_size), + fp_algo(_fp_algo), + dedup_threshold(_dedup_threshold), + max_chunk_ref_size(MAX_CHUNK_REF_SIZE) {} + RGWDedupWorker() = delete; + RGWDedupWorker(const RGWDedupWorker& rhs) = delete; + RGWDedupWorker& operator=(const RGWDedupWorker& rhs) = delete; + virtual ~RGWDedupWorker() override {} + + struct chunk_t { + size_t start = 0; + size_t size = 0; + string fingerprint = ""; + bufferlist data; + }; + + virtual void* entry() override; + void finalize(); + + template + void try_object_dedup(IoCtx& base_ioctx, Iter begin, Iter end); + bufferlist read_object_data(IoCtx& ioctx, string object_name); + int write_object_data(IoCtx& ioctx, string object_name, bufferlist& data); + int check_object_exists(IoCtx& ioctx, string object_name); + MetadataObjType get_metadata_obj_type(object_info_t& oi, + IoCtx& ioctx, + const string obj_name, + const uint32_t data_len); + void do_chunk_dedup(IoCtx& ioctx, IoCtx& cold_ioctx, string object_name, + list redundant_chunks, + map& chunk_map); + void do_data_evict(IoCtx& ioctx, string object_name); + int remove_object(IoCtx &ioctx, string object_name); + + int try_set_chunk(IoCtx& ioctx, IoCtx& cold_ioctx, string object_name, + chunk_t& chunk); + int clear_manifest(IoCtx& ioctx, 
string object_name); + vector do_cdc(bufferlist& data, string chunk_algo, + uint32_t chunk_size); + string generate_fingerprint(bufferlist chunk_data, string fp_algo); + string get_archived_obj_name(IoCtx& ioctx, const string obj_name); +}; + +class RGWChunkScrubWorker : public Worker +{ +public: + RGWChunkScrubWorker(const DoutPrefixProvider* _dpp, + CephContext* _cct, + rgw::sal::RadosStore* _store, + int _id, + IoCtx _cold_ioctx) + : Worker(_dpp, _cct, _store, _id, _cold_ioctx) {} + RGWChunkScrubWorker() = delete; + RGWChunkScrubWorker(const RGWChunkScrubWorker& rhs) = delete; + RGWChunkScrubWorker& operator=(const RGWChunkScrubWorker& rhs) = delete; + virtual ~RGWChunkScrubWorker() override {} + + virtual void* entry() override; + + // fix mismatched chunk reference + int do_chunk_repair(IoCtx& cold_ioctx, const string chunk_obj_name, + const hobject_t src_obj, int chunk_ref_cnt, + int src_ref_cnt); + + // check whether dedup reference is mismatched (false is mismatched) + int get_src_ref_cnt(const hobject_t& src_obj, const string& chunk_oid); +}; + +#endif diff --git a/src/rgw/rgw_fp_manager.cc b/src/rgw/rgw_fp_manager.cc new file mode 100644 index 0000000000000..4dd7e9988d49c --- /dev/null +++ b/src/rgw/rgw_fp_manager.cc @@ -0,0 +1,76 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab ft=cpp + +#include +#include "rgw_fp_manager.h" + +#define dout_subsys ceph_subsys_rgw + +void RGWFPManager::reset_fpmap() +{ + fp_map.clear(); +} + +void RGWFPManager::set_low_watermark(const uint32_t new_low_wm) +{ + ceph_assert(new_low_wm <= 100); + low_watermark = new_low_wm; +} + +size_t RGWFPManager::find(string& fingerprint) +{ + shared_lock lock(fingerprint_lock); + auto found_item = fp_map.find(fingerprint); + if ( found_item != fp_map.end() ) { + return found_item->second; + } + return 0; +} + +uint32_t RGWFPManager::get_fpmap_memory_size() +{ + if (fp_map.empty()) { + return 0; + } + return fp_map.size() * + (fp_map.begin()->first.length() + sizeof(fp_map.begin()->second)); +} + +size_t RGWFPManager::get_fpmap_size() +{ + return fp_map.size(); +} + +void RGWFPManager::check_memory_limit_and_do_evict() +{ + ceph_assert(memory_limit > 0); + if (get_fpmap_memory_size() > memory_limit) { + uint32_t current_dedup_threshold = dedup_threshold; + uint32_t target_fpmap_size = memory_limit * low_watermark / 100; + auto iter = fp_map.begin(); + while (get_fpmap_memory_size() > target_fpmap_size) { + if (iter == fp_map.end()) { + iter = fp_map.begin(); + ++current_dedup_threshold; + } + + if (iter->second < current_dedup_threshold) { + iter = fp_map.erase(iter); + } else { + ++iter; + } + } + } +} + +void RGWFPManager::add(string& fingerprint) +{ + unique_lock lock(fingerprint_lock); + auto found_iter = fp_map.find(fingerprint); + if (found_iter == fp_map.end()) { + check_memory_limit_and_do_evict(); + fp_map.insert({fingerprint, 1}); + } else { + ++found_iter->second; + } +} diff --git a/src/rgw/rgw_fp_manager.h b/src/rgw/rgw_fp_manager.h new file mode 100644 index 0000000000000..a34ae07dd9add --- /dev/null +++ b/src/rgw/rgw_fp_manager.h @@ -0,0 +1,40 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab ft=cpp + +#ifndef CEPH_RGW_FP_MANAGER_H +#define CEPH_RGW_FP_MANAGER_H + +#include "include/types.h" + +using namespace std; + +class RGWFPManager +{ +public: + RGWFPManager(uint32_t _dedup_threshold, uint64_t _memory_limit, + uint32_t _low_watermark) + : dedup_threshold(_dedup_threshold), + 
memory_limit(_memory_limit), + low_watermark(_low_watermark) {} + RGWFPManager() = delete; + RGWFPManager(const RGWFPManager& rhs) = delete; + RGWFPManager& operator=(const RGWFPManager& rhs) = delete; + virtual ~RGWFPManager() {} + + void reset_fpmap(); + size_t find(string& fingerprint); + void add(string& fingerprint); + void check_memory_limit_and_do_evict(); + void set_low_watermark(const uint32_t new_low_wm); + uint32_t get_fpmap_memory_size(); + size_t get_fpmap_size(); + +private: + std::shared_mutex fingerprint_lock; + uint32_t dedup_threshold; + uint64_t memory_limit; + uint32_t low_watermark; + unordered_map fp_map; +}; + +#endif diff --git a/src/rgw/rgw_object_expirer.cc b/src/rgw/rgw_object_expirer.cc index 7a49fc8d161ef..0a92dc3b13d9b 100644 --- a/src/rgw/rgw_object_expirer.cc +++ b/src/rgw/rgw_object_expirer.cc @@ -83,7 +83,7 @@ int main(const int argc, const char **argv) DriverManager::Config cfg; cfg.store_name = "rados"; cfg.filter_name = "none"; - driver = DriverManager::get_storage(&dp, g_ceph_context, cfg, false, false, false, false, false, false, null_yield); + driver = DriverManager::get_storage(&dp, g_ceph_context, cfg, false, false, false, false, false, false, null_yield, false); if (!driver) { std::cerr << "couldn't init storage provider" << std::endl; return EIO; diff --git a/src/rgw/rgw_perf_counters.cc b/src/rgw/rgw_perf_counters.cc index fd058ab00a9f2..bb9c164e4e492 100644 --- a/src/rgw/rgw_perf_counters.cc +++ b/src/rgw/rgw_perf_counters.cc @@ -31,6 +31,23 @@ int rgw_perf_start(CephContext *cct) plb.add_u64_counter(l_rgw_cache_hit, "cache_hit", "Cache hits"); plb.add_u64_counter(l_rgw_cache_miss, "cache_miss", "Cache miss"); + plb.add_u64(l_rgw_dedup_original_data_size, "dedup_original_data_size", "Total RGW object size before deduplication"); + plb.add_u64(l_rgw_dedup_archived_data_size, "dedup_archived_data_size", "Total archived object size during deduplication"); + plb.add_u64(l_rgw_dedup_chunk_data_size, "dedup_chunk_data_size", "Total chunk object size during deduplication"); + plb.add_u64(l_rgw_dedup_deduped_data_size, "dedup_deduped_data_size", "Current deduped data size during deduplication"); + plb.add_u64(l_rgw_dedup_current_worker_mode, "dedup_current_worker_mode", "Current worker mode(dedup | scrub)"); + plb.add_u64_counter(l_rgw_dedup_worker_read, "dedup_worker_read", "RGWDedupWorker's read size to base pools"); + plb.add_u64_counter(l_rgw_dedup_worker_write, "dedup_worker_write", "RGWDedupWorker's write size to cold pools"); + plb.add_u64(l_rgw_dedup_worker_count, "dedup_worker_count", "Configuration value of RGWDedupWorker count"); + plb.add_u64(l_rgw_dedup_scrub_ratio, "dedup_scrub_ratio", "Configuration value of dedup_scrub_ratio"); + plb.add_u64(l_rgw_dedup_chunk_algo, "dedup_chunk_algo", "Configuration value of chunk_algo"); + plb.add_u64(l_rgw_dedup_chunk_size, "dedup_chunk_size", "Configuration value of chunk_size"); + plb.add_u64(l_rgw_dedup_fp_algo, "dedup_fp_algo", "Configuration value of fp_algo"); + plb.add_u64(l_rgw_dedup_hitset_count, "dedup_hitset_count", "Configuration value of hitset_count"); + plb.add_u64(l_rgw_dedup_hitset_period, "dedup_hitset_period", "Configuration value of hitset_period"); + plb.add_u64(l_rgw_dedup_hitset_target_size, "dedup_hitset_target_size", "Configuration value of hitset_target_size"); + plb.add_u64(l_rgw_dedup_hitset_fpp, "dedup_hitset_fpp", "Configuration value of hitset_target_fpp"); + plb.add_u64_counter(l_rgw_keystone_token_cache_hit, "keystone_token_cache_hit", "Keystone token cache hits"); 
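+  // Note: the dedup gauges registered above are updated by RGWDedupManager::entry():
+  //   dedup_current_worker_mode: 1 = dedup round, 2 = scrub round
+  //   dedup_chunk_algo:          1 = fixed, 2 = fastcdc
+  //   dedup_fp_algo:             1 = sha1, 2 = sha256, 3 = sha512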
plb.add_u64_counter(l_rgw_keystone_token_cache_miss, "keystone_token_cache_miss", "Keystone token cache miss"); diff --git a/src/rgw/rgw_perf_counters.h b/src/rgw/rgw_perf_counters.h index 3c4e4e97f023f..29cf386ce3ef6 100644 --- a/src/rgw/rgw_perf_counters.h +++ b/src/rgw/rgw_perf_counters.h @@ -29,6 +29,23 @@ enum { l_rgw_cache_hit, l_rgw_cache_miss, + l_rgw_dedup_original_data_size, + l_rgw_dedup_archived_data_size, + l_rgw_dedup_chunk_data_size, + l_rgw_dedup_deduped_data_size, + l_rgw_dedup_current_worker_mode, + l_rgw_dedup_worker_read, + l_rgw_dedup_worker_write, + l_rgw_dedup_worker_count, + l_rgw_dedup_scrub_ratio, + l_rgw_dedup_chunk_algo, + l_rgw_dedup_chunk_size, + l_rgw_dedup_fp_algo, + l_rgw_dedup_hitset_count, + l_rgw_dedup_hitset_period, + l_rgw_dedup_hitset_target_size, + l_rgw_dedup_hitset_fpp, + l_rgw_keystone_token_cache_hit, l_rgw_keystone_token_cache_miss, diff --git a/src/rgw/rgw_realm_reloader.cc b/src/rgw/rgw_realm_reloader.cc index 4973ec14080d2..7bd6580847474 100644 --- a/src/rgw/rgw_realm_reloader.cc +++ b/src/rgw/rgw_realm_reloader.cc @@ -125,6 +125,7 @@ void RGWRealmReloader::reload() cct->_conf->rgw_run_sync_thread, cct->_conf.get_val("rgw_dynamic_resharding"), true, null_yield, // run notification thread + false, cct->_conf->rgw_cache_enabled); } diff --git a/src/rgw/rgw_sal.cc b/src/rgw/rgw_sal.cc index 042eab0be7257..d959f77ef1303 100644 --- a/src/rgw/rgw_sal.cc +++ b/src/rgw/rgw_sal.cc @@ -106,6 +106,7 @@ rgw::sal::Driver* DriverManager::init_storage_provider(const DoutPrefixProvider* bool run_sync_thread, bool run_reshard_thread, bool run_notification_thread, + bool use_dedup, bool use_cache, bool use_gc, optional_yield y) { @@ -124,6 +125,7 @@ rgw::sal::Driver* DriverManager::init_storage_provider(const DoutPrefixProvider* .set_run_sync_thread(run_sync_thread) .set_run_reshard_thread(run_reshard_thread) .set_run_notification_thread(run_notification_thread) + .set_use_dedup(use_dedup) .init_begin(cct, dpp) < 0) { delete driver; return nullptr; diff --git a/src/rgw/rgw_sal.h b/src/rgw/rgw_sal.h index 3c33a6d51e172..efb967ee55b12 100644 --- a/src/rgw/rgw_sal.h +++ b/src/rgw/rgw_sal.h @@ -1590,6 +1590,7 @@ class DriverManager { bool run_sync_thread, bool run_reshard_thread, bool run_notification_thread, optional_yield y, + bool use_dedup_thread, bool use_cache = true, bool use_gc = true) { rgw::sal::Driver* driver = init_storage_provider(dpp, cct, cfg, use_gc_thread, @@ -1598,6 +1599,7 @@ class DriverManager { run_sync_thread, run_reshard_thread, run_notification_thread, + use_dedup_thread, use_cache, use_gc, y); return driver; } @@ -1617,6 +1619,7 @@ class DriverManager { bool run_sync_thread, bool run_reshard_thread, bool run_notification_thread, + bool use_dedup_thread, bool use_metadata_cache, bool use_gc, optional_yield y); /** Initialize a new raw Driver */ diff --git a/src/test/rgw/CMakeLists.txt b/src/test/rgw/CMakeLists.txt index 0f99597c21e23..82f7f04054f9b 100644 --- a/src/test/rgw/CMakeLists.txt +++ b/src/test/rgw/CMakeLists.txt @@ -315,3 +315,4 @@ target_link_libraries(radosgw-cr-test ${rgw_libs} librados OATH::OATH ${CURL_LIBRARIES} ${EXPAT_LIBRARIES} ${BLKID_LIBRARIES} GTest::GTest) + diff --git a/src/test/rgw/rgw_dedup_integration_test.py b/src/test/rgw/rgw_dedup_integration_test.py new file mode 100644 index 0000000000000..6dcc55ee3ef61 --- /dev/null +++ b/src/test/rgw/rgw_dedup_integration_test.py @@ -0,0 +1,1744 @@ +import os +import random +import sys +import time +import json +import threading +from subprocess import Popen, PIPE +from 
threading import Thread + + +VERBOSE = False +BASE_POOL = "default.rgw.buckets.data" +COLD_POOL = "default-cold-pool" +DOWNLOAD_POSTFIX = "_download" +DEFAULT_CHUNK_SIZE = 16 * 1024 * 4 * 2 +MULTIPART_CHUNK_SIZE = 15 * 1024 * 1024 +CEPH_RADOS_OBJECT_SIZE = 4 * 1024 * 1024 +MAX_CHUNK_REF_SIZE = 10000 +RGW_INSTANCE_NUM = 3 +DEDUP_WORKER_THREAD_NUM = 3 +THREAD_EXIT_SIGNAL = False +THREAD_SUCCESS = True +TEST_SUCCESS = True + +IMAGE_NAME = "10.10.40.35:5000/testimage" + +BOOTSTRAP_NODE_IP = "10.10.40.35" +BOOTSTRAP_NODE_PW = "" + +OSD_NODE_1_IP = "10.10.40.40" +OSD_NODE_1_PW = "" +OSD_NODE_1_DISK_1 = "/dev/nvme21n1" +OSD_NODE_1_DISK_2 = "/dev/nvme22n1" +OSD_NODE_1_DISK_3 = "/dev/nvme23n1" + +OSD_NODE_2_IP = "10.10.40.41" +OSD_NODE_2_PW = "" +OSD_NODE_2_DISK_1 = "/dev/nvme21n1" +OSD_NODE_2_DISK_2 = "/dev/nvme22n1" +OSD_NODE_2_DISK_3 = "/dev/nvme23n1" + +OSD_NODE_3_IP = "10.10.40.42" +OSD_NODE_3_PW = "" +OSD_NODE_3_DISK_1 = "/dev/nvme21n1" +OSD_NODE_3_DISK_2 = "/dev/nvme22n1" +OSD_NODE_3_DISK_3 = "/dev/nvme23n1" + +RGW_NODE_IP = "10.10.40.39" +RGW_NODE_PW = "" +RGW_VIRTUAL_IP = "10.10.40.100" + +OSD_NODE_1_HOSTNAME = "testosdnode1" +OSD_NODE_2_HOSTNAME = "testosdnode2" +OSD_NODE_3_HOSTNAME = "testosdnode3" +RGW_NODE_HOSTNAME = "testrgwnode" + + +def log(msg): + global VERBOSE + if VERBOSE: + print(msg) + +def bootstrap_ceph(): + print("Bootstrap Ceph cluster START") + + # bootstrap + cmd = "sudo ./cephadm --image {} bootstrap --mon-ip {} --allow-mismatched-release --fsid 12341234-1234-1234-1234-123412341234 --initial-dashboard-password 12341234 --dashboard-password-noupdate".format(IMAGE_NAME, BOOTSTRAP_NODE_IP) + log("bootstrap_ceph() cmd: {}".format(cmd)) + p = Popen(cmd, stdout=PIPE, stderr=PIPE, shell=True) + out, err = p.communicate() + + cmd = "sudo ceph -s" + log("bootstrap_ceph() cmd: {}".format(cmd)) + p = Popen(cmd, stdout=PIPE, stderr=PIPE, shell=True) + out, err = p.communicate() + + # discover hostname & add hosts + cmd = "sudo sshpass -p{} ssh-copy-id -f -i /etc/ceph/ceph.pub root@{}".format(OSD_NODE_1_PW, OSD_NODE_1_IP) + log("bootstrap_ceph() cmd: {}".format(cmd)) + p = Popen(cmd, stdout=PIPE, stderr=PIPE, shell=True) + out, err = p.communicate() + + cmd = "sudo sshpass -p{} ssh-copy-id -f -i /etc/ceph/ceph.pub root@{}".format(OSD_NODE_2_PW, OSD_NODE_2_IP) + log("bootstrap_ceph() cmd: {}".format(cmd)) + p = Popen(cmd, stdout=PIPE, stderr=PIPE, shell=True) + out, err = p.communicate() + + cmd = "sudo sshpass -p{} ssh-copy-id -f -i /etc/ceph/ceph.pub root@{}".format(OSD_NODE_3_PW, OSD_NODE_3_IP) + log("bootstrap_ceph() cmd: {}".format(cmd)) + p = Popen(cmd, stdout=PIPE, stderr=PIPE, shell=True) + out, err = p.communicate() + + cmd = "sudo sshpass -p{} ssh-copy-id -f -i /etc/ceph/ceph.pub root@{}".format(RGW_NODE_PW, RGW_NODE_IP) + log("bootstrap_ceph() cmd: {}".format(cmd)) + p = Popen(cmd, stdout=PIPE, stderr=PIPE, shell=True) + out, err = p.communicate() + + global OSD_NODE_1_HOSTNAME + cmd = "sudo cat /etc/hosts | grep {} | awk '{{print $2}}'".format(OSD_NODE_1_IP) + log("bootstrap_ceph() cmd: {}".format(cmd)) + p = Popen(cmd, stdout=PIPE, stderr=PIPE, shell=True) + OSD_NODE_1_HOSTNAME = p.communicate()[0].decode().strip() + + cmd = "sudo ceph orch host add {} {}".format(OSD_NODE_1_HOSTNAME, OSD_NODE_1_IP) + log("bootstrap_ceph() cmd: {}".format(cmd)) + p = Popen(cmd, stdout=PIPE, stderr=PIPE, shell=True) + out, err = p.communicate() + + global OSD_NODE_2_HOSTNAME + cmd = "sudo cat /etc/hosts | grep {} | awk '{{print $2}}'".format(OSD_NODE_2_IP) + log("bootstrap_ceph() cmd: 
{}".format(cmd)) + p = Popen(cmd, stdout=PIPE, stderr=PIPE, shell=True) + OSD_NODE_2_HOSTNAME = p.communicate()[0].decode().strip() + + cmd = "sudo ceph orch host add {} {}".format(OSD_NODE_2_HOSTNAME, OSD_NODE_2_IP) + log("bootstrap_ceph() cmd: {}".format(cmd)) + p = Popen(cmd, stdout=PIPE, stderr=PIPE, shell=True) + out, err = p.communicate() + + global OSD_NODE_3_HOSTNAME + cmd = "sudo cat /etc/hosts | grep {} | awk '{{print $2}}'".format(OSD_NODE_3_IP) + log("bootstrap_ceph() cmd: {}".format(cmd)) + p = Popen(cmd, stdout=PIPE, stderr=PIPE, shell=True) + OSD_NODE_3_HOSTNAME = p.communicate()[0].decode().strip() + + cmd = "sudo ceph orch host add {} {}".format(OSD_NODE_3_HOSTNAME, OSD_NODE_3_IP) + log("bootstrap_ceph() cmd: {}".format(cmd)) + p = Popen(cmd, stdout=PIPE, stderr=PIPE, shell=True) + out, err = p.communicate() + + global RGW_NODE_HOSTNAME + cmd = "sudo cat /etc/hosts | grep {} | awk '{{print $2}}'".format(RGW_NODE_IP) + log("bootstrap_ceph() cmd: {}".format(cmd)) + p = Popen(cmd, stdout=PIPE, stderr=PIPE, shell=True) + RGW_NODE_HOSTNAME = p.communicate()[0].decode().strip() + + cmd = "sudo ceph orch host add {} {}".format(RGW_NODE_HOSTNAME, RGW_NODE_IP) + log("bootstrap_ceph() cmd: {}".format(cmd)) + p = Popen(cmd, stdout=PIPE, stderr=PIPE, shell=True) + out, err = p.communicate() + + time.sleep(30) + + # add OSDs + cmd = "sudo ceph orch daemon add osd {}:{}".format(OSD_NODE_1_HOSTNAME, OSD_NODE_1_DISK_1) + log("bootstrap_ceph() cmd: {}".format(cmd)) + p = Popen(cmd, stdout=PIPE, stderr=PIPE, shell=True) + out, err = p.communicate() + + cmd = "sudo ceph orch daemon add osd {}:{}".format(OSD_NODE_1_HOSTNAME, OSD_NODE_1_DISK_2) + log("bootstrap_ceph() cmd: {}".format(cmd)) + p = Popen(cmd, stdout=PIPE, stderr=PIPE, shell=True) + out, err = p.communicate() + + cmd = "sudo ceph orch daemon add osd {}:{}".format(OSD_NODE_1_HOSTNAME, OSD_NODE_1_DISK_3) + log("bootstrap_ceph() cmd: {}".format(cmd)) + p = Popen(cmd, stdout=PIPE, stderr=PIPE, shell=True) + out, err = p.communicate() + + cmd = "sudo ceph orch daemon add osd {}:{}".format(OSD_NODE_2_HOSTNAME, OSD_NODE_2_DISK_1) + log("bootstrap_ceph() cmd: {}".format(cmd)) + p = Popen(cmd, stdout=PIPE, stderr=PIPE, shell=True) + out, err = p.communicate() + + cmd = "sudo ceph orch daemon add osd {}:{}".format(OSD_NODE_2_HOSTNAME, OSD_NODE_2_DISK_2) + log("bootstrap_ceph() cmd: {}".format(cmd)) + p = Popen(cmd, stdout=PIPE, stderr=PIPE, shell=True) + out, err = p.communicate() + + cmd = "sudo ceph orch daemon add osd {}:{}".format(OSD_NODE_2_HOSTNAME, OSD_NODE_2_DISK_3) + log("bootstrap_ceph() cmd: {}".format(cmd)) + p = Popen(cmd, stdout=PIPE, stderr=PIPE, shell=True) + out, err = p.communicate() + + cmd = "sudo ceph orch daemon add osd {}:{}".format(OSD_NODE_3_HOSTNAME, OSD_NODE_3_DISK_1) + log("bootstrap_ceph() cmd: {}".format(cmd)) + p = Popen(cmd, stdout=PIPE, stderr=PIPE, shell=True) + out, err = p.communicate() + + cmd = "sudo ceph orch daemon add osd {}:{}".format(OSD_NODE_3_HOSTNAME, OSD_NODE_3_DISK_2) + log("bootstrap_ceph() cmd: {}".format(cmd)) + p = Popen(cmd, stdout=PIPE, stderr=PIPE, shell=True) + out, err = p.communicate() + + cmd = "sudo ceph orch daemon add osd {}:{}".format(OSD_NODE_3_HOSTNAME, OSD_NODE_3_DISK_3) + log("bootstrap_ceph() cmd: {}".format(cmd)) + p = Popen(cmd, stdout=PIPE, stderr=PIPE, shell=True) + out, err = p.communicate() + + # add RGWs + cmd = "sudo ceph orch apply rgw testrgw \"--placement=3 {}\" --port=8080".format(RGW_NODE_HOSTNAME) + log("bootstrap_ceph() cmd: {}".format(cmd)) + p = Popen(cmd, 
stdout=PIPE, stderr=PIPE, shell=True) + out, err = p.communicate() + + time.sleep(30) + + # add LB + cmd = "sudo cat << EOF > ingress.yaml\nservice_type: ingress\nservice_id: rgw.testrgw\nservice_name: ingress.rgw.testrgw\nplacement:\n hosts:\n - {}\nspec:\n backend_service: rgw.testrgw\n frontend_port: 80\n monitor_port: 1967\n virtual_ip: {}\nEOF".format(RGW_NODE_HOSTNAME, RGW_VIRTUAL_IP) + log("bootstrap_ceph() cmd: {}".format(cmd)) + p = Popen(cmd, stdout=PIPE, stderr=PIPE, shell=True) + out, err = p.communicate() + + cmd = "sudo ceph orch apply -i ingress.yaml" + log("bootstrap_ceph() cmd: {}".format(cmd)) + p = Popen(cmd, stdout=PIPE, stderr=PIPE, shell=True) + out, err = p.communicate() + + cmd = "sudo rm ingress.yaml" + log("bootstrap_ceph() cmd: {}".format(cmd)) + p = Popen(cmd, stdout=PIPE, stderr=PIPE, shell=True) + out, err = p.communicate() + + # add s3 user + cmd = "sudo radosgw-admin user create --uid=\"1234\" --display-name=\"1234\" --access-key=\"0555b35654ad1656d804\" --secret-key=\"h7GhxuBLTrlhVUyxSPUKUV8r/2EI4ngqJxD7iBdBYLhwluN30JaT3Q==\"" + log("bootstrap_ceph() cmd: {}".format(cmd)) + p = Popen(cmd, stdout=PIPE, stderr=PIPE, shell=True) + out, err = p.communicate() + + # set dedup config + time.sleep(10) + cmd = "sudo ceph config set global rgw_enable_dedup_threads true" + log("bootstrap_ceph() cmd: {}".format(cmd)) + p = Popen(cmd, stdout=PIPE, stderr=PIPE, shell=True) + out, err = p.communicate() + + time.sleep(10) + cmd = "sudo ceph orch restart rgw.testrgw" + log("bootstrap_ceph() cmd: {}".format(cmd)) + p = Popen(cmd, stdout=PIPE, stderr=PIPE, shell=True) + out, err = p.communicate() + + time.sleep(60) + print("Bootstrap Ceph cluster END") + +def destroy_ceph(): + # cleanup + cmd = "sudo sshpass -p{} ssh root@{} ./cephadm --image {} rm-cluster --force --fsid 12341234-1234-1234-1234-123412341234 --zap-osds".format(BOOTSTRAP_NODE_PW, BOOTSTRAP_NODE_IP, IMAGE_NAME) + log("bootstrap_ceph() cmd: {}".format(cmd)) + p = Popen(cmd, stdout=PIPE, stderr=PIPE, shell=True) + out, err = p.communicate() + + cmd = "sudo sshpass -p{} ssh root@{} ./cephadm --image {} rm-cluster --force --fsid 12341234-1234-1234-1234-123412341234 --zap-osds".format(OSD_NODE_1_PW, OSD_NODE_1_IP, IMAGE_NAME) + log("bootstrap_ceph() cmd: {}".format(cmd)) + p = Popen(cmd, stdout=PIPE, stderr=PIPE, shell=True) + out, err = p.communicate() + + cmd = "sudo sshpass -p{} ssh root@{} ./cephadm --image {} rm-cluster --force --fsid 12341234-1234-1234-1234-123412341234 --zap-osds".format(OSD_NODE_2_PW, OSD_NODE_2_IP, IMAGE_NAME) + log("bootstrap_ceph() cmd: {}".format(cmd)) + p = Popen(cmd, stdout=PIPE, stderr=PIPE, shell=True) + out, err = p.communicate() + + cmd = "sudo sshpass -p{} ssh root@{} ./cephadm --image {} rm-cluster --force --fsid 12341234-1234-1234-1234-123412341234 --zap-osds".format(OSD_NODE_3_PW, OSD_NODE_3_IP, IMAGE_NAME) + log("bootstrap_ceph() cmd: {}".format(cmd)) + p = Popen(cmd, stdout=PIPE, stderr=PIPE, shell=True) + out, err = p.communicate() + + cmd = "sudo sshpass -p{} ssh root@{} ./cephadm --image {} rm-cluster --force --fsid 12341234-1234-1234-1234-123412341234 --zap-osds".format(RGW_NODE_PW, RGW_NODE_IP, IMAGE_NAME) + log("bootstrap_ceph() cmd: {}".format(cmd)) + p = Popen(cmd, stdout=PIPE, stderr=PIPE, shell=True) + out, err = p.communicate() + + +def create_file(mode, file_name, file_size): + if mode == "zero": + cmd = "dd if=/dev/zero of=/tmp/{} bs={} count=1".format(file_name, file_size) + elif mode == "random": + cmd = "dd if=/dev/urandom of=/tmp/{} bs={} 
count=1".format(file_name, file_size) + elif mode == "random-with-hole": + create_file("random", file_name, file_size) + cmd = "dd if=/dev/zero of=/tmp/{} bs=1 count={} seek={} conv=notrunc".format(file_name, DEFAULT_CHUNK_SIZE, 0) + cmd += "; dd if=/dev/zero of=/tmp/{} bs=1 count={} seek={} conv=notrunc".format(file_name, DEFAULT_CHUNK_SIZE, get_file_size(file_name) // 2) + cmd += "; dd if=/dev/zero of=/tmp/{} bs=1 count={} seek={} conv=notrunc".format(file_name, DEFAULT_CHUNK_SIZE, get_file_size(file_name) - DEFAULT_CHUNK_SIZE) + elif mode == "big-random": + cmd = "dd if=/dev/urandom of=/tmp/{} bs={} count=5".format(file_name, file_size) + else: + raise Exception("mode is not valid") + + log("create_file() cmd: {}".format(cmd)) + p = Popen(cmd, stdout=PIPE, stderr=PIPE, shell=True) + out, err = p.communicate() + if p.returncode != 0: + raise Exception("create_file() failed: {}".format(err)) + +def get_file_size(file_name): + cmd = "stat -c %s /tmp/{}".format(file_name) + log("get_file_size() cmd: {}".format(cmd)) + p = Popen(cmd, stdout=PIPE, stderr=PIPE, shell=True) + out, err = p.communicate() + if p.returncode != 0: + raise Exception("get_file_size() failed: {}".format(err)) + log("get_file_size() out: {}".format(out)) + return int(out) + +# return file's rados object(4MB) count +def get_file_rados_object_count(file_size): + log("get_file_rados_object_count() file_size: {}".format(file_size)) + base = file_size // MULTIPART_CHUNK_SIZE + remain = file_size % MULTIPART_CHUNK_SIZE + if remain == 0: + log("get_file_rados_object_count() return: {}".format(base * 4)) + return int(base * 4) + elif 0 < remain <= CEPH_RADOS_OBJECT_SIZE: + log("get_file_rados_object_count() return: {}".format(base * 4 + 1)) + return int(base * 4 + 1) + elif CEPH_RADOS_OBJECT_SIZE < remain <= CEPH_RADOS_OBJECT_SIZE * 2: + log("get_file_rados_object_count() return: {}".format(base * 4 + 2)) + return int(base * 4 + 2) + elif CEPH_RADOS_OBJECT_SIZE * 2 < remain <= CEPH_RADOS_OBJECT_SIZE * 3: + log("get_file_rados_object_count() return: {}".format(base * 4 + 3)) + return int(base * 4 + 3) + +def get_file_checksum(file_name): + cmd = "md5sum /tmp/{}".format(file_name) + log("get_checksum() cmd: {}".format(cmd)) + p = Popen(cmd, stdout=PIPE, stderr=PIPE, shell=True) + out, err = p.communicate() + if p.returncode != 0: + raise Exception("get_checksum() failed: {}".format(err)) + log("get_file_checksum() out: {}".format(out.decode().split()[0])) + return out.decode().split()[0] + +def delete_file(file_name): + cmd = "rm -rf /tmp/{}".format(file_name) + log("delete_file() cmd: {}".format(cmd)) + p = Popen(cmd, stdout=PIPE, stderr=PIPE, shell=True) + out, err = p.communicate() + if p.returncode != 0: + raise Exception("delete_file() failed: {}".format(err)) + + +def create_rgw_bucket(bucket_name): + cmd = "s3cmd mb --host={} s3://{}".format(RGW_VIRTUAL_IP, bucket_name) + log("create_rgw_bucket() cmd: {}".format(cmd)) + p = Popen(cmd, stdout=PIPE, stderr=PIPE, shell=True) + out, err = p.communicate() + if p.returncode != 0: + raise Exception("create_rgw_bucket() failed: {}".format(err)) + +def upload_rgw_object(bucket_name, file_name): + cmd = "s3cmd put --host={} /tmp/{} s3://{}/".format(RGW_VIRTUAL_IP, file_name, bucket_name) + log("upload_rgw_object() cmd: {}".format(cmd)) + p = Popen(cmd, stdout=PIPE, stderr=PIPE, shell=True) + out, err = p.communicate() + if p.returncode != 0: + raise Exception("upload_rgw_object() failed: {}".format(err)) + +def download_rgw_object(bucket_name, file_name): + cmd = "s3cmd get 
--host={} s3://{}/{} /tmp/{} --force".format(RGW_VIRTUAL_IP, bucket_name, file_name, file_name + DOWNLOAD_POSTFIX) + log("download_rgw_object() cmd: {}".format(cmd)) + p = Popen(cmd, stdout=PIPE, stderr=PIPE, shell=True) + out, err = p.communicate() + if p.returncode != 0: + raise Exception("download_rgw_object() failed: {}".format(err)) + +def delete_rgw_object(bucket_name, file_name): + cmd = "s3cmd rm --host={} s3://{}/{}".format(RGW_VIRTUAL_IP, bucket_name, file_name) + log("delete_rgw_object() cmd: {}".format(cmd)) + p = Popen(cmd, stdout=PIPE, stderr=PIPE, shell=True) + out, err = p.communicate() + if p.returncode != 0: + raise Exception("delete_rgw_object() failed: {}".format(err)) + +def delete_rgw_bucket(bucket_name): + cmd = "s3cmd rb --host={} s3://{} --recursive".format(RGW_VIRTUAL_IP, bucket_name) + log("delete_rgw_bucket() cmd: {}".format(cmd)) + p = Popen(cmd, stdout=PIPE, stderr=PIPE, shell=True) + out, err = p.communicate() + if p.returncode != 0: + raise Exception("delete_rgw_bucket() failed: {}".format(err)) + + +def upload_rados_object(pool_name, file_name): + cmd = "rados -p {} put {} /tmp/{}".format(pool_name, file_name, file_name) + log("upload_rados_object() cmd: {}".format(cmd)) + p = Popen(cmd, stdout=PIPE, stderr=PIPE, shell=True) + out, err = p.communicate() + if p.returncode != 0: + raise Exception("upload_rados_object() failed: {}".format(err)) + +def download_rados_object(pool_name, object_name): + cmd = "rados -p {} get {} /tmp/{}".format(pool_name, object_name, object_name + DOWNLOAD_POSTFIX) + log("download_rados_object() cmd: {}".format(cmd)) + p = Popen(cmd, stdout=PIPE, stderr=PIPE, shell=True) + out, err = p.communicate() + if p.returncode != 0: + raise Exception("download_rados_object() failed: {}".format(err)) + +def list_rados_pool(pool_name): + cmd = "rados -p {} ls".format(pool_name) + log("list_rados_pool() cmd: {}".format(cmd)) + p = Popen(cmd, stdout=PIPE, stderr=PIPE, shell=True) + out, err = p.communicate() + if p.returncode != 0: + raise Exception("list_rados_pool() failed: {}".format(err)) + return out.decode().split() + +def stat_rados_object(pool_name, object_name): + cmd = "rados -p {} stat {}".format(pool_name, object_name) + log("stat_rados_object() cmd: {}".format(cmd)) + p = Popen(cmd, stdout=PIPE, stderr=PIPE, shell=True) + out, err = p.communicate() + return out.decode() + +def delete_rados_object(pool_name, object_name): + cmd = "rados -p {} rm {}".format(pool_name, object_name) + log("delete_rados_object() cmd: {}".format(cmd)) + p = Popen(cmd, stdout=PIPE, stderr=PIPE, shell=True) + out, err = p.communicate() + if p.returncode != 0: + raise Exception("delete_rados_object() failed: {}".format(err)) + + +def clear_rgw(): + cmd = "radosgw-admin gc process --include-all" + log("clear_rgw() cmd: {}".format(cmd)) + p = Popen(cmd, stdout=PIPE, stderr=PIPE, shell=True) + out, err = p.communicate() + if p.returncode != 0: + raise Exception("clear_rgw() failed: {}".format(err)) + +def clear_cold_pool(): + cmd = "rados purge {} --yes-i-really-really-mean-it".format(COLD_POOL) + log("clear_cold_pool() cmd: {}".format(cmd)) + p = Popen(cmd, stdout=PIPE, stderr=PIPE, shell=True) + out, err = p.communicate() + if p.returncode != 0: + raise Exception("clear_cold_pool() failed: {}".format(err)) + + +def create_and_upload_file_thread(thread_id, bucket_name, num_files, mode, file_size): + log("create_and_upload_file_thread {} start".format(thread_id)) + for i in range(num_files): + file_name = "testthread{}_testfile{}".format(thread_id, i) + 
create_file(mode, file_name, file_size) + upload_rgw_object(bucket_name, file_name) + log("create_and_upload_file_thread {} end".format(thread_id)) + +def download_and_compare_file_thread(thread_id, bucket_name, num_files): + global THREAD_SUCCESS + log("download_and_compare_file_thread {} start".format(thread_id)) + for i in range(num_files): + file_name = "testthread{}_testfile{}".format(thread_id, i) + download_rgw_object(bucket_name, file_name) + if get_file_checksum(file_name) != get_file_checksum(file_name + DOWNLOAD_POSTFIX): + THREAD_SUCCESS = False + log("download_and_compare_file_thread {} end".format(thread_id)) + +def delete_file_thread(thread_id, bucket_name, num_files): + log("delete_file_thread {} start".format(thread_id)) + for i in range(num_files): + file_name = "testthread{}_testfile{}".format(thread_id, i) + delete_rgw_object(bucket_name, file_name) + delete_file(file_name) + delete_file(file_name + DOWNLOAD_POSTFIX) + log("delete_file_thread {} end".format(thread_id)) + + +# threads for workload generation +def read_load_prepare_thread(thread_id, bucket_name, num_files): + log("read_load_prepare_thread {} start".format(thread_id)) + for i in range(num_files): + file_name = "testthread{}_testreadfile{}".format(thread_id, i) + create_file("random", file_name, "10M") + upload_rgw_object(bucket_name, file_name) + log("read_load_prepare_thread {} end".format(thread_id)) + +def write_load_prepare_thread(thread_id, num_files): + log("write_load_prepare_thread {} start".format(thread_id)) + for i in range(num_files): + file_name = "testthread{}_testwritefile{}".format(thread_id, i) + create_file("random", file_name, "10M") + log("write_load_prepare_thread {} end".format(thread_id)) + +def read_load_thread(thread_id, bucket_name, num_files): + log("read_load_thread {} start".format(thread_id)) + + while not THREAD_EXIT_SIGNAL: + file_name = "testthread{}_testreadfile{}".format(thread_id, random.randint(0, num_files - 1)) + download_rgw_object(bucket_name, file_name) + + log("read_load_thread {} end".format(thread_id)) + +def write_load_thread(thread_id, bucket_name, num_files): + log("write_load_thread {} start".format(thread_id)) + + while not THREAD_EXIT_SIGNAL: + file_name = "testthread{}_testwritefile{}".format(thread_id, random.randint(0, num_files - 1)) + upload_rgw_object(bucket_name, file_name) + + log("write_load_thread {} end".format(thread_id)) + +def delete_read_load_thread(thread_id, bucket_name, num_read_files): + log("delete_read_load_thread {} start".format(thread_id)) + for i in range(num_read_files): + read_file_name = "testthread{}_testreadfile{}".format(thread_id, i) + delete_rgw_object(bucket_name, read_file_name) + delete_file(read_file_name) + delete_file(read_file_name + DOWNLOAD_POSTFIX) + log("delete_read_load_thread {} end".format(thread_id)) + +def delete_write_load_thread(thread_id, bucket_name, num_write_files): + log("delete_write_load_thread {} start".format(thread_id)) + for i in range(num_write_files): + write_file_name = "testthread{}_testwritefile{}".format(thread_id, i) + delete_rgw_object(bucket_name, write_file_name) + delete_file(write_file_name) + log("delete_write_load_thread {} end".format(thread_id)) + + +def preflight_check(): + print("Preflight check start") + print("1. checking ceph... 
", end="") + cmd = "ceph -s" + log("preflight_check() cmd: {}".format(cmd)) + p = Popen(cmd, stdout=PIPE, stderr=PIPE, shell=True) + out, err = p.communicate() + if p.returncode != 0: + print("failed!") + raise Exception("ceph -s failed: {}".format(err)) + print("OK!") + + print("2. checking pools exists... ", end="") + cmd = "rados lspools" + log("preflight_check() cmd: {}".format(cmd)) + p = Popen(cmd, stdout=PIPE, stderr=PIPE, shell=True) + out, err = p.communicate() + if p.returncode != 0: + print("failed!") + raise Exception("rados lspools failed: {}".format(err)) + if BASE_POOL not in out.decode(): + print("failed!") + raise Exception("base pool not found: {}".format(BASE_POOL)) + if COLD_POOL not in out.decode(): + print("failed!") + raise Exception("cold pool not found: {}".format(COLD_POOL)) + print("OK!") + + print("3. checking s3cmd... ", end="") + cmd = "s3cmd --host=" + RGW_VIRTUAL_IP + " ls" + log("preflight_check() cmd: {}".format(cmd)) + p = Popen(cmd, stdout=PIPE, stderr=PIPE, shell=True) + out, err = p.communicate() + if p.returncode != 0: + print("failed!") + raise Exception("s3cmd ls failed: {}".format(err)) + print("OK!") + + print("4. checking ceph-dedup-tool... ", end="") + cmd = "ceph-dedup-tool --help" + log("preflight_check() cmd: {}".format(cmd)) + p = Popen(cmd, stdout=PIPE, stderr=PIPE, shell=True) + out, err = p.communicate() + if p.returncode != 0: + print("failed!") + raise Exception("ceph-dedup-tool --help failed: {}".format(err)) + print("OK!") + print("Preflight check end\n") + + +# 01. archive small file with no data duplication +def test01(): + global TEST_SUCCESS + + bucket_name = "testbucket" + create_rgw_bucket(bucket_name) + + file_mode = "random" + file_name = "testfile" + file_size = "4M" + create_file(file_mode, file_name, file_size) + + upload_rgw_object(bucket_name, file_name) + + while True: + if len(list_rados_pool(COLD_POOL)) == 0: + log("waiting for cold object creation...") + time.sleep(1) + elif len(list_rados_pool(COLD_POOL)) == 1: + archieve_object_name = list_rados_pool(COLD_POOL)[0] + log("cold object created, cold object name: {}".format(archieve_object_name)) + break + else: + raise Exception("cold object more than 1, expected: 1, actual: {}".format(len(list_rados_pool(COLD_POOL)))) + + stat_rados_object(COLD_POOL, archieve_object_name) + + download_rados_object(COLD_POOL, archieve_object_name) + + log("checksum of uploaded file: {}".format(get_file_checksum(file_name))) + log("checksum of downloaded file: {}".format(get_file_checksum(archieve_object_name + DOWNLOAD_POSTFIX))) + if get_file_checksum(file_name) == get_file_checksum(archieve_object_name + DOWNLOAD_POSTFIX): + print("TEST01 passed!") + else: + print("TEST01 failed!") + TEST_SUCCESS = False + + delete_rgw_object(bucket_name, file_name) + delete_file(file_name) + delete_file(archieve_object_name + DOWNLOAD_POSTFIX) + delete_rgw_bucket(bucket_name) + clear_rgw() + clear_cold_pool() + + +# 02. 
dedup small file with many data duplication +def test02(): + global TEST_SUCCESS + + bucket_name = "testbucket" + create_rgw_bucket(bucket_name) + + file_mode = "zero" + file_name = "testfile" + file_size = "1M" + create_file(file_mode, file_name, file_size) + + upload_rgw_object(bucket_name, file_name) + + while True: + if len(list_rados_pool(COLD_POOL)) == 0: + log("waiting for cold object creation...") + time.sleep(1) + elif len(list_rados_pool(COLD_POOL)) == 1: + cold_object_name = list_rados_pool(COLD_POOL)[0] + log("cold object created, cold object name: {}".format(cold_object_name)) + break + else: + raise Exception("cold object more than 1, expected: 1, actual: {}".format(len(list_rados_pool(COLD_POOL)))) + + stat_rados_object(COLD_POOL, cold_object_name) + + download_rados_object(COLD_POOL, cold_object_name) + + if (len(list_rados_pool(BASE_POOL)) == 1): + base_object_name = list_rados_pool(BASE_POOL)[0] + log("base object name: {}".format(base_object_name)) + stat_rados_object(BASE_POOL, base_object_name) + download_rados_object(BASE_POOL, base_object_name) + else: + raise Exception("base object mismatch, expected: 1, actual: {}".format(len(list_rados_pool(BASE_POOL)))) + + log("checksum of uploaded file: {}".format(get_file_checksum(file_name))) + log("checksum of downloaded file: {}".format(get_file_checksum(cold_object_name + DOWNLOAD_POSTFIX))) + log("size of downloaded cold object: {}".format(get_file_size(cold_object_name + DOWNLOAD_POSTFIX))) + if get_file_checksum(file_name) == get_file_checksum(base_object_name + DOWNLOAD_POSTFIX) and get_file_size(cold_object_name + DOWNLOAD_POSTFIX) == 65536: + print("TEST02 passed!") + else: + print("TEST02 failed!") + TEST_SUCCESS = False + + delete_rgw_object(bucket_name, file_name) + delete_file(file_name) + delete_file(cold_object_name + DOWNLOAD_POSTFIX) + delete_file(base_object_name + DOWNLOAD_POSTFIX) + delete_rgw_bucket(bucket_name) + clear_rgw() + clear_cold_pool() + + +# 03. 
dedup small file with partial data duplication +def test03(): + global TEST_SUCCESS + + bucket_name = "testbucket" + create_rgw_bucket(bucket_name) + + file_mode = "random-with-hole" + file_name = "testfile" + file_size = "1M" + create_file(file_mode, file_name, file_size) + + upload_rgw_object(bucket_name, file_name) + + while True: + if len(list_rados_pool(COLD_POOL)) == 0: + log("waiting for cold object creation...") + time.sleep(1) + elif len(list_rados_pool(COLD_POOL)) == 1: + chunk_object_name = list_rados_pool(COLD_POOL)[0] + log("cold object created, cold object name: {}".format(chunk_object_name)) + break + else: + raise Exception("cold object more than 1, expected: 1, actual: {}".format(len(list_rados_pool(COLD_POOL)))) + + stat_rados_object(COLD_POOL, chunk_object_name) + + download_rados_object(COLD_POOL, chunk_object_name) + + if (len(list_rados_pool(BASE_POOL)) == 1): + base_object_name = list_rados_pool(BASE_POOL)[0] + log("base object name: {}".format(base_object_name)) + stat_rados_object(BASE_POOL, base_object_name) + download_rados_object(BASE_POOL, base_object_name) + else: + raise Exception("base object mismatch, expected: 1, actual: {}".format(len(list_rados_pool(BASE_POOL)))) + + log("checksum of uploaded file: {}".format(get_file_checksum(file_name))) + log("checksum of downloaded file: {}".format(get_file_checksum(base_object_name + DOWNLOAD_POSTFIX))) + log("size of downloaded cold object: {}".format(get_file_size(chunk_object_name + DOWNLOAD_POSTFIX))) + if get_file_checksum(file_name) == get_file_checksum(base_object_name + DOWNLOAD_POSTFIX) and get_file_size(chunk_object_name + DOWNLOAD_POSTFIX) == 65536: + print("TEST03 passed!") + else: + print("TEST03 failed!") + TEST_SUCCESS = False + + delete_rgw_object(bucket_name, file_name) + delete_file(file_name) + delete_file(base_object_name + DOWNLOAD_POSTFIX) + delete_file(chunk_object_name + DOWNLOAD_POSTFIX) + delete_rgw_bucket(bucket_name) + clear_rgw() + clear_cold_pool() + + +# 04. archive many small files with no data duplication +def test04(): + global TEST_SUCCESS + + bucket_name = "testbucket" + create_rgw_bucket(bucket_name) + + file_count = 1000 + file_mode = "random" + file_name = "testfile" + file_size = "4M" + for i in range(file_count): + create_file(file_mode, file_name + str(i), file_size) + + for i in range(file_count): + upload_rgw_object(bucket_name, file_name + str(i)) + + while True: + if len(list_rados_pool(COLD_POOL)) < file_count: + log("waiting for cold object creation...") + time.sleep(1) + elif len(list_rados_pool(COLD_POOL)) == file_count: + log("cold object created, cold object count: {}".format(len(list_rados_pool(COLD_POOL)))) + break + else: + raise Exception("cold object more than {}, expected: {}, actual: {}".format(file_count, file_count, len(list_rados_pool(COLD_POOL)))) + + failed = False + for i in range(file_count): + download_rgw_object(bucket_name, file_name + str(i)) + if get_file_checksum(file_name + str(i)) != get_file_checksum(file_name + str(i) + DOWNLOAD_POSTFIX): + failed = True + + if not failed: + print("TEST04 passed!") + else: + print("TEST04 failed!") + TEST_SUCCESS = False + + for i in range(file_count): + delete_rgw_object(bucket_name, file_name + str(i)) + delete_file(file_name + str(i)) + delete_file(file_name + str(i) + DOWNLOAD_POSTFIX) + delete_rgw_bucket(bucket_name) + clear_rgw() + clear_cold_pool() + + +# 05. 
archive big file with no data duplication +def test05(): + global TEST_SUCCESS + + bucket_name = "testbucket" + create_rgw_bucket(bucket_name) + + file_mode = "random" + file_name = "testfile" + file_size = "1G" + + create_file(file_mode, file_name, file_size) + + upload_rgw_object(bucket_name, file_name) + + while True: + if len(list_rados_pool(COLD_POOL)) < get_file_rados_object_count(get_file_size(file_name)): + log("waiting for base object creation...") + time.sleep(1) + elif len(list_rados_pool(COLD_POOL)) == get_file_rados_object_count(get_file_size(file_name)): + log("archive object created, archive object count: {}".format(len(list_rados_pool(COLD_POOL)))) + break + else: + raise Exception("cold object more than {}, expected: {}, actual: {}".format(get_file_rados_object_count(get_file_size(file_name)), get_file_rados_object_count(get_file_size(file_name)), len(list_rados_pool(COLD_POOL)))) + + download_rgw_object(bucket_name, file_name) + + log("checksum of uploaded file: {}".format(get_file_checksum(file_name))) + log("checksum of downloaded file: {}".format(get_file_checksum(file_name + DOWNLOAD_POSTFIX))) + if get_file_checksum(file_name) == get_file_checksum(file_name + DOWNLOAD_POSTFIX): + print("TEST05 passed!") + else: + print("TEST05 failed!") + TEST_SUCCESS = False + + delete_rgw_object(bucket_name, file_name) + delete_file(file_name) + delete_file(file_name + DOWNLOAD_POSTFIX) + delete_rgw_bucket(bucket_name) + clear_rgw() + clear_cold_pool() + + +# 06. archive big file with many data duplication without chunk ref flooding +def test06(): + global TEST_SUCCESS + + bucket_name = "testbucket" + file_mode = "zero" + file_name = "testfile" + file_size = "2G" + + create_rgw_bucket(bucket_name) + + create_file(file_mode, file_name, file_size) + + upload_rgw_object(bucket_name, file_name) + + while True: + if len(list_rados_pool(COLD_POOL)) < 1: + log("waiting for base object creation...") + time.sleep(1) + elif len(list_rados_pool(COLD_POOL)) == 1: + cold_object_name = list_rados_pool(COLD_POOL)[0] + log("cold object created, cold object count: {}".format(len(list_rados_pool(COLD_POOL)))) + break + else: + raise Exception("cold object more than 1, expected: 1, actual: {}".format(len(list_rados_pool(COLD_POOL)))) + + log("sleep 300 seconds for more dedup...") + time.sleep(300) + + json_data = os.popen("ceph-dedup-tool --op dump-chunk-refs --chunk-pool default-cold-pool --object {}".format(cold_object_name)).read() + log("cmd: ceph-dedup-tool --op dump-chunk-refs --chunk-pool default-cold-pool --object {}".format(cold_object_name)) + count = json.loads(json_data)["count"] + refs_count = len(json.loads(json_data)["refs"]) + log("count: {}, refs_count: {}".format(count, refs_count)) + + if count >= MAX_CHUNK_REF_SIZE - 10 and count <= MAX_CHUNK_REF_SIZE + 10 and refs_count >= MAX_CHUNK_REF_SIZE - 10 and refs_count <= MAX_CHUNK_REF_SIZE + 10: + print("TEST06 passed!") + else: + print("TEST06 failed!") + TEST_SUCCESS = False + + delete_rgw_object(bucket_name, file_name) + delete_file(file_name) + delete_rgw_bucket(bucket_name) + clear_rgw() + clear_cold_pool() + + +# 07. 
FPManager entry eviction when fingerprint entries exceed the memory target +def test07(): + global TEST_SUCCESS + + # change fpmanager_memory target to 4MB + original_fpmanager_memory_target = int(os.popen("ceph config get client.rgw rgw_dedup_fpmanager_memory_limit").read()) + log("original_fpmanager_memory target: {}".format(original_fpmanager_memory_target)) + + os.system("ceph config set client.rgw rgw_dedup_fpmanager_memory_limit 4194304") + log("set fpmanager_memory target to 4MB") + log("cmd: ceph config set client.rgw rgw_dedup_fpmanager_memory_limit 4194304") + + rgw_service_name = os.popen("ceph orch ls | grep rgw. | grep -v ingress | awk '{print $1}'").read().strip() + log("rgw_service_name: {}".format(rgw_service_name)) + + os.system("ceph orch restart {}".format(rgw_service_name)) + log("cmd: ceph orch restart {}".format(rgw_service_name)) + time.sleep(60) + + + bucket_name = "testbucket" + create_rgw_bucket(bucket_name) + + file_count = 10 + file_mode = "random" + file_name = "testfile" + file_size = "2G" + for i in range(file_count): + create_file(file_mode, file_name + str(i), file_size) + upload_rgw_object(bucket_name, file_name + str(i)) + + time.sleep(300) + + chunk_file_mode = "zero" + chunk_file_name = "testchunkfile" + chunk_file_size = "4M" + create_file(chunk_file_mode, chunk_file_name, chunk_file_size) + upload_rgw_object(bucket_name, chunk_file_name) + + time.sleep(300) + + if stat_rados_object(COLD_POOL, "1adc95bebe9eea8c112d40cd04ab7a8d75c4f961"): + print("TEST07 passed!") + else: + print("TEST07 failed!") + TEST_SUCCESS = False + + for i in range(file_count): + delete_rgw_object(bucket_name, file_name + str(i)) + delete_file(file_name + str(i)) + delete_rgw_object(bucket_name, chunk_file_name) + delete_file(chunk_file_name) + delete_rgw_bucket(bucket_name) + clear_rgw() + clear_cold_pool() + + # revert fpmanager_memory target config to original value + os.system("ceph config set client.rgw rgw_dedup_fpmanager_memory_limit {}".format(original_fpmanager_memory_target)) + log("set fpmanager_memory target to original_fpmanager_memory_target") + log("cmd: ceph config set client.rgw rgw_dedup_fpmanager_memory_limit {}".format(original_fpmanager_memory_target)) + + os.system("ceph orch restart {}".format(rgw_service_name)) + log("cmd: ceph orch restart {}".format(rgw_service_name)) + time.sleep(60) + + +# 08. 
scrub mismatched chunk +def test08(): + global TEST_SUCCESS + + bucket_name = "testbucket" + file_mode = "random-with-hole" + file_name = "testfile" + file_size = "1M" + + create_rgw_bucket(bucket_name) + + create_file(file_mode, file_name, file_size) + + upload_rgw_object(bucket_name, file_name) + + while True: + if len(list_rados_pool(COLD_POOL)) == 0: + log("waiting for cold object creation...") + time.sleep(1) + elif len(list_rados_pool(COLD_POOL)) == 1: + cold_object_name = list_rados_pool(COLD_POOL)[0] + log("cold object created, cold object name: {}".format(cold_object_name)) + break + else: + raise Exception("cold object more than 1, expected: 1, actual: {}".format(len(list_rados_pool(COLD_POOL)))) + + log("cmd: ceph-dedup-tool --op dump-chunk-refs --chunk-pool {} --object {} | grep count | awk '{{print $2}}'".format(COLD_POOL, cold_object_name)) + cold_object_ref_count = os.popen("ceph-dedup-tool --op dump-chunk-refs --chunk-pool {} --object {} | grep count | awk '{{print $2}}'".format(COLD_POOL, cold_object_name)).read().strip()[:-1] + log("cold object ref count: {}".format(cold_object_ref_count)) + + # use dummy file to make reference mismatch + dummy_file_mode = "zero" + dummy_file_name = "testdummyfile" + dummy_file_size = "1" + create_file(dummy_file_mode, dummy_file_name, dummy_file_size) + + upload_rados_object(BASE_POOL, dummy_file_name) + + stat_rados_object(BASE_POOL, dummy_file_name) + + log("cmd: ceph osd pool ls detail | grep {} | awk '{{print $2}}'".format(BASE_POOL)) + base_pool_id = os.popen("ceph osd pool ls detail | grep {} | awk '{{print $2}}'".format(BASE_POOL)).read().strip() + log("base pool id: {}".format(base_pool_id)) + + log("cmd: ceph-dedup-tool --op chunk-get-ref --chunk-pool {} --object {} --target-ref-pool-id {} --target-ref {}".format(COLD_POOL, cold_object_name, base_pool_id, dummy_file_name)) + os.system("ceph-dedup-tool --op chunk-get-ref --chunk-pool {} --object {} --target-ref-pool-id {} --target-ref {}".format(COLD_POOL, cold_object_name, base_pool_id, dummy_file_name)) + + log("cmd: ceph-dedup-tool --op dump-chunk-refs --chunk-pool {} --object {} | grep count | awk '{{print $2}}'".format(COLD_POOL, cold_object_name)) + cold_object_ref_count_after = os.popen("ceph-dedup-tool --op dump-chunk-refs --chunk-pool {} --object {} | grep count | awk '{{print $2}}'".format(COLD_POOL, cold_object_name)).read().strip()[:-1] + log("cold_object_ref_count_after: {}".format(cold_object_ref_count_after)) + + if int(cold_object_ref_count_after) != int(cold_object_ref_count) + 1: + raise Exception("cold object ref count is not increased, before: {}, after: {}".format(cold_object_ref_count, cold_object_ref_count_after)) + + # check that the reference mismatch is fixed by scrubbing + start_time = time.time() + while True: + log("cmd: ceph-dedup-tool --op dump-chunk-refs --chunk-pool {} --object {} | grep count | awk '{{print $2}}'".format(COLD_POOL, cold_object_name)) + cold_object_ref_count_after = os.popen("ceph-dedup-tool --op dump-chunk-refs --chunk-pool {} --object {} | grep count | awk '{{print $2}}'".format(COLD_POOL, cold_object_name)).read().strip()[:-1] + log("cold_object_ref_count_after: {}".format(cold_object_ref_count_after)) + + if int(cold_object_ref_count_after) == int(cold_object_ref_count): + log("now scrub finished, cold object ref count is {}".format(cold_object_ref_count_after)) + print("TEST08 passed!") + break + elif time.time() - start_time > 300: + print("TEST08 failed!") + TEST_SUCCESS = False + break + else: + log("waiting for cold object ref count to be {}, cold object 
ref count: {}".format(cold_object_ref_count, cold_object_ref_count_after)) + time.sleep(5) + + delete_rgw_object(bucket_name, file_name) + delete_file(file_name) + delete_rados_object(BASE_POOL, dummy_file_name) + delete_file(dummy_file_name) + delete_rgw_bucket(bucket_name) + clear_rgw() + clear_cold_pool() + + +# 09. check dedup_worker thread count in RGW node +def test09(): + global TEST_SUCCESS + + bucket_name = "testbucket" + create_rgw_bucket(bucket_name) + + file_count = 100 + file_mode = "random" + file_name = "testfile" + file_size = "100M" + for i in range(file_count): + create_file(file_mode, file_name + str(i), file_size) + upload_rgw_object(bucket_name, file_name + str(i)) + + cmd = "sshpass -pRoot0215! ssh root@{} ps -eLa | grep DedupWorker | wc -l".format(RGW_VIRTUAL_IP) + log("cmd: {}".format(cmd)) + dedup_worker_total_thread = int(os.popen(cmd).read().strip()) + log("dedup_worker_total_thread: {}".format(dedup_worker_total_thread)) + + if dedup_worker_total_thread == RGW_INSTANCE_NUM * DEDUP_WORKER_THREAD_NUM: + print("TEST09 passed!") + else: + print("TEST09 failed!") + TEST_SUCCESS = False + + # 파일 삭제 + for i in range(file_count): + delete_rgw_object(bucket_name, file_name + str(i)) + delete_file(file_name + str(i)) + delete_rgw_bucket(bucket_name) + clear_rgw() + clear_cold_pool() + + +# 10. data consistency when Restart RGW instance while dedup +def test10(): + global TEST_SUCCESS + + bucket_name = "testbucket" + + create_rgw_bucket(bucket_name) + + thread_count = 100 + file_count = 100 + file_mode = "random" + file_size = "10M" + threads = [] + for i in range(thread_count): + t = threading.Thread(target=create_and_upload_file_thread, args=(i, bucket_name, file_count, file_mode, file_size)) + threads.append(t) + t.start() + + for t in threads: + t.join() + + rgw_service_name = os.popen("ceph orch ls | grep rgw. | grep -v ingress | awk '{print $1}'").read().strip() + log("rgw_service_name: {}".format(rgw_service_name)) + rgw_service_num = int(os.popen("ceph orch ls | grep rgw. | grep -v ingress | awk '{print $3}'").read().strip()[0]) + log("rgw_service_num: {}".format(rgw_service_num)) + + # stop rgw instances + os.system("ceph orch stop {}".format(rgw_service_name)) + + # waiting for rgw instances to be stop + start_time = time.time() + while True: + cmd = "ceph orch ls | grep rgw. | grep -v ingress | awk '{print $3}'" + log("cmd: {}".format(cmd)) + rgw_running = int(os.popen(cmd).read().strip()[0]) + log("rgw_running: {}".format(rgw_running)) + + if rgw_running == 0: + log("rgw_running: {}".format(rgw_running)) + break + elif time.time() - start_time > 300: + raise Exception("rgw instance stop timeout") + else: + log("waiting for rgw instance to be stop, now: {}".format(rgw_running)) + time.sleep(5) + + time.sleep(10) + + # start rgw instances + cmd = "ceph orch start {}".format(rgw_service_name) + log("cmd: {}".format(cmd)) + os.system(cmd) + + # waiting for rgw instances to be start + start_time = time.time() + while True: + cmd = "ceph orch ls | grep rgw. 
| grep -v ingress | awk '{print $3}'" + log("cmd: {}".format(cmd)) + rgw_running = int(os.popen(cmd).read().strip()[0]) + log("rgw_running: {}".format(rgw_running)) + + if rgw_running == rgw_service_num: + log("rgw_running: {}".format(rgw_running)) + break + elif time.time() - start_time > 300: + raise Exception("rgw instance start timeout") + else: + log("waiting for rgw instance to be start, now: {}".format(rgw_running)) + time.sleep(5) + + time.sleep(300) + + global THREAD_SUCCESS + THREAD_SUCCESS = True + + threads = [] + for i in range(thread_count): + t = threading.Thread(target=download_and_compare_file_thread, args=(i, bucket_name, file_count)) + threads.append(t) + t.start() + + for t in threads: + t.join() + + if THREAD_SUCCESS: + print("TEST10 passed!") + else: + print("TEST10 failed!") + TEST_SUCCESS = False + + threads = [] + for i in range(thread_count): + t = threading.Thread(target=delete_file_thread, args=(i, bucket_name, file_count)) + threads.append(t) + t.start() + + for t in threads: + t.join() + + delete_rgw_bucket(bucket_name) + clear_rgw() + clear_cold_pool() + + +# 11. object resharding when RGW instance failure +def test11(): + global TEST_SUCCESS + + bucket_name = "testbucket" + create_rgw_bucket(bucket_name) + + thread_count = 100 + file_count = 100 + file_mode = "random" + file_size = "10M" + + threads = [] + for i in range(thread_count): + t = threading.Thread(target=create_and_upload_file_thread, args=(i, bucket_name, file_count, file_mode, file_size)) + threads.append(t) + t.start() + + for t in threads: + t.join() + + rgw_service_name = os.popen("ceph orch ls | grep rgw. | grep -v ingress | awk '{print $1}'").read().strip() + log("rgw_service_name: {}".format(rgw_service_name)) + rgw_service_num = int(os.popen("ceph orch ls | grep rgw. | grep -v ingress | awk '{print $3}'").read().strip()[0]) + log("rgw_service_num: {}".format(rgw_service_num)) + + # stop 1 rgw instance + cmd = "ceph orch apply rgw {} \"--placement={} {}\" --port=8080".format(rgw_service_name.split(".")[1], rgw_service_num - 1, RGW_NODE_HOSTNAME) + log("cmd: {}".format(cmd)) + os.system(cmd) + + start_time = time.time() + while True: + cmd = "ceph orch ls | grep rgw. | grep -v ingress | awk '{print $3}'" + log("cmd: {}".format(cmd)) + rgw_running = int(os.popen(cmd).read().strip()[0]) + log("rgw_running: {}".format(rgw_running)) + + if rgw_running == rgw_service_num - 1: + log("rgw_running: {}".format(rgw_running)) + break + elif time.time() - start_time > 300: + raise Exception("rgw instance apply timeout") + else: + log("waiting for rgw instance to be apply, now: {}".format(rgw_running)) + time.sleep(5) + + time.sleep(10) + + cmd = "sshpass -pRoot0215! ssh root@{} ps -eLa | grep DedupWorker | wc -l".format(RGW_VIRTUAL_IP) + log("cmd: {}".format(cmd)) + dedup_worker_total_thread = int(os.popen(cmd).read().strip()) + log("dedup_worker_total_thread: {}".format(dedup_worker_total_thread)) + + if dedup_worker_total_thread == DEDUP_WORKER_THREAD_NUM * (rgw_service_num - 1): + log("dedup_worker_total_thread: {}".format(dedup_worker_total_thread)) + print("TEST11 passed!") + else: + print("TEST11 failed!") + TEST_SUCCESS = False + + # recover rgw instance + cmd = "ceph orch apply rgw {} \"--placement={} {}\" --port=8080".format(rgw_service_name.split(".")[1], rgw_service_num, RGW_NODE_HOSTNAME) + log("cmd: {}".format(cmd)) + os.system(cmd) + + start_time = time.time() + while True: + cmd = "ceph orch ls | grep rgw. 
| grep -v ingress | awk '{print $3}'" + log("cmd: {}".format(cmd)) + rgw_running = int(os.popen(cmd).read().strip()[0]) + log("rgw_running: {}".format(rgw_running)) + + if rgw_running == rgw_service_num: + log("rgw_running: {}".format(rgw_running)) + break + elif time.time() - start_time > 300: + raise Exception("rgw instance apply timeout") + else: + log("waiting for rgw instance to be apply, now: {}".format(rgw_running)) + time.sleep(5) + time.sleep(10) + + threads = [] + for i in range(thread_count): + t = threading.Thread(target=delete_file_thread, args=(i, bucket_name, file_count)) + threads.append(t) + t.start() + + for t in threads: + t.join() + + delete_rgw_bucket(bucket_name) + clear_rgw() + clear_cold_pool() + + +# 12. heavy load (object read/write) +def test12(): + global TEST_SUCCESS + bucket_name = "testbucket" + create_rgw_bucket(bucket_name) + + thread_count = 100 + file_count = 100 + file_mode = "random" + file_size = "10M" + + threads = [] + for i in range(thread_count): + t = threading.Thread(target=create_and_upload_file_thread, args=(i, bucket_name, file_count, file_mode, file_size)) + threads.append(t) + t.start() + + for t in threads: + t.join() + + time.sleep(60) + + read_thread_count = 100 + write_thread_count = 100 + read_file_count = 100 + write_file_count = 100 + + # load prepare + read_threads = [] + write_threads = [] + + for i in range(read_thread_count): + rt = threading.Thread(target=read_load_prepare_thread, args=(i, bucket_name, read_file_count)) + read_threads.append(rt) + rt.start() + + for rt in read_threads: + rt.join() + + for i in range(write_thread_count): + wt = threading.Thread(target=write_load_prepare_thread, args=(i, write_file_count)) + write_threads.append(wt) + wt.start() + + for wt in write_threads: + wt.join() + + # load start + read_threads = [] + for i in range(read_thread_count): + rt = threading.Thread(target=read_load_thread, args=(i, bucket_name, read_file_count)) + read_threads.append(rt) + rt.start() + + write_threads = [] + for i in range(write_thread_count): + wt = threading.Thread(target=write_load_thread, args=(i, bucket_name, write_file_count)) + write_threads.append(wt) + wt.start() + + time.sleep(100) + + global THREAD_SUCCESS + THREAD_SUCCESS = True + + threads = [] + for i in range(thread_count): + t = threading.Thread(target=download_and_compare_file_thread, args=(i, bucket_name, file_count)) + threads.append(t) + t.start() + + for t in threads: + t.join() + + if THREAD_SUCCESS: + print("TEST12 passed!") + else: + print("TEST12 failed!") + TEST_SUCCESS = False + + # load stop + global THREAD_EXIT_SIGNAL + THREAD_EXIT_SIGNAL = True + + for rt in read_threads: + rt.join() + + for wt in write_threads: + wt.join() + + read_threads = [] + for i in range(read_thread_count): + rt = threading.Thread(target=delete_read_load_thread, args=(i, bucket_name, read_file_count)) + read_threads.append(rt) + rt.start() + + for rt in read_threads: + rt.join() + + write_threads = [] + for i in range(write_thread_count): + wt = threading.Thread(target=delete_write_load_thread, args=(i, bucket_name, write_file_count)) + write_threads.append(wt) + wt.start() + + for wt in write_threads: + wt.join() + + threads = [] + for i in range(thread_count): + t = threading.Thread(target=delete_file_thread, args=(i, bucket_name, file_count)) + threads.append(t) + t.start() + + for t in threads: + t.join() + + delete_rgw_bucket(bucket_name) + clear_rgw() + clear_cold_pool() + + +# 13. 
big object consistency when RGW restart +def test13(): + global TEST_SUCCESS + + bucket_name = "testbucket" + create_rgw_bucket(bucket_name) + + thread_count = 10 + file_count = 1 + file_mode = "big-random" + file_size = "10G" + + threads = [] + for i in range(thread_count): + t = threading.Thread(target=create_and_upload_file_thread, args=(i, bucket_name, file_count, file_mode, file_size)) + threads.append(t) + t.start() + + for t in threads: + t.join() + + time.sleep(60) + + rgw_service_name = os.popen("ceph orch ls | grep rgw. | grep -v ingress | awk '{print $1}'").read().strip() + log("rgw_service_name: {}".format(rgw_service_name)) + rgw_service_num = int(os.popen("ceph orch ls | grep rgw. | grep -v ingress | awk '{print $3}'").read().strip()[0]) + log("rgw_service_num: {}".format(rgw_service_num)) + + cmd = "ceph orch stop {}".format(rgw_service_name) + log("cmd: {}".format(cmd)) + os.system(cmd) + + start_time = time.time() + while True: + rgw_running = int(os.popen("ceph orch ls | grep rgw. | grep -v ingress | awk '{print $3}'").read().strip()[0]) + if rgw_running == 0: + log("all rgw instance stopped") + break + elif time.time() - start_time > 300: + log("rgw stop timeout") + break + else: + log("waiting for rgw stop, current rgw instance num: {}/{}".format(rgw_running, rgw_service_num)) + time.sleep(5) + + time.sleep(60) + + cmd = "ceph orch start {}".format(rgw_service_name) + log("cmd: {}".format(cmd)) + os.system(cmd) + + start_time = time.time() + while True: + rgw_running = int(os.popen("ceph orch ls | grep rgw. | grep -v ingress | awk '{print $3}'").read().strip()[0]) + if rgw_running == rgw_service_num: + log("all rgw instance started") + break + elif time.time() - start_time > 300: + log("rgw start timeout") + break + else: + log("waiting for rgw start, current rgw instance num: {}/{}".format(rgw_running, rgw_service_num)) + time.sleep(5) + + time.sleep(60) + + global THREAD_SUCCESS + THREAD_SUCCESS = True + threads = [] + for i in range(thread_count): + t = threading.Thread(target=download_and_compare_file_thread, args=(i, bucket_name, file_count)) + threads.append(t) + t.start() + + for t in threads: + t.join() + + if THREAD_SUCCESS: + print("TEST13 passed!") + else: + print("TEST13 failed!") + TEST_SUCCESS = False + + threads = [] + for i in range(thread_count): + t = threading.Thread(target=delete_file_thread, args=(i, bucket_name, file_count)) + threads.append(t) + t.start() + + for t in threads: + t.join() + + delete_rgw_bucket(bucket_name) + clear_rgw() + clear_cold_pool() + + +# 14. 
round sync when lopsided load without HAProxy +def test14(): + global TEST_SUCCESS + + bucket_name = "testbucket" + create_rgw_bucket(bucket_name) + + thread_count = 100 + file_count = 100 + file_mode = "random" + file_size = "10M" + + threads = [] + for i in range(thread_count): + t = threading.Thread(target=create_and_upload_file_thread, args=(i, bucket_name, file_count, file_mode, file_size)) + threads.append(t) + t.start() + + for t in threads: + t.join() + + read_thread_count = 100 + write_thread_count = 100 + read_file_count = 100 + write_file_count = 100 + + read_threads = [] + for i in range(read_thread_count): + rt = threading.Thread(target=read_load_prepare_thread, args=(i, bucket_name, read_file_count)) + read_threads.append(rt) + rt.start() + + for rt in read_threads: + rt.join() + + write_threads = [] + for i in range(write_thread_count): + wt = threading.Thread(target=write_load_prepare_thread, args=(i, write_file_count)) + write_threads.append(wt) + wt.start() + + for wt in write_threads: + wt.join() + + global RGW_VIRTUAL_IP + RGW_HOST_WITH_HAPROXY = RGW_VIRTUAL_IP + RGW_VIRTUAL_IP = RGW_NODE_IP + ":8080" + + # reset the exit signal left set by test12 so the load threads actually run + global THREAD_EXIT_SIGNAL + THREAD_EXIT_SIGNAL = False + + read_threads = [] + for i in range(read_thread_count): + rt = threading.Thread(target=read_load_thread, args=(i, bucket_name, read_file_count)) + read_threads.append(rt) + rt.start() + + write_threads = [] + for i in range(write_thread_count): + wt = threading.Thread(target=write_load_thread, args=(i, bucket_name, write_file_count)) + write_threads.append(wt) + wt.start() + + time.sleep(60) + + cmd = "sshpass -pRoot0215! ssh root@{} ps -eLa | grep DedupWorker | wc -l".format(RGW_HOST_WITH_HAPROXY) + log("cmd: {}".format(cmd)) + dedup_worker_total_thread = int(os.popen(cmd).read().strip()) + log("dedup_worker_total_thread: {}".format(dedup_worker_total_thread)) + + if dedup_worker_total_thread == RGW_INSTANCE_NUM * DEDUP_WORKER_THREAD_NUM: + print("TEST14 passed!") + else: + print("TEST14 failed!") + TEST_SUCCESS = False + + + THREAD_EXIT_SIGNAL = True + + for rt in read_threads: + rt.join() + + for wt in write_threads: + wt.join() + + RGW_VIRTUAL_IP = RGW_HOST_WITH_HAPROXY + + read_threads = [] + for i in range(read_thread_count): + rt = threading.Thread(target=delete_read_load_thread, args=(i, bucket_name, read_file_count)) + read_threads.append(rt) + rt.start() + + for rt in read_threads: + rt.join() + + write_threads = [] + for i in range(write_thread_count): + wt = threading.Thread(target=delete_write_load_thread, args=(i, bucket_name, write_file_count)) + write_threads.append(wt) + wt.start() + + for wt in write_threads: + wt.join() + + threads = [] + for i in range(thread_count): + t = threading.Thread(target=delete_file_thread, args=(i, bucket_name, file_count)) + threads.append(t) + t.start() + + for t in threads: + t.join() + + delete_rgw_bucket(bucket_name) + clear_rgw() + clear_cold_pool() + + +# 15. 
dedup data consistency during flapping OSD +def test15(): + global TEST_SUCCESS + + bucket_name = "testbucket" + create_rgw_bucket(bucket_name) + + thread_count = 100 + file_count = 100 + file_mode = "random" + file_size = "10M" + + threads = [] + for i in range(thread_count): + t = threading.Thread(target=create_and_upload_file_thread, args=(i, bucket_name, file_count, file_mode, file_size)) + threads.append(t) + t.start() + + for t in threads: + t.join() + + cmd = "ceph orch daemon stop osd.0" + log("cmd: {}".format(cmd)) + os.system(cmd) + time.sleep(3) + + cmd = "ceph orch daemon stop osd.1" + log("cmd: {}".format(cmd)) + os.system(cmd) + time.sleep(3) + + cmd = "ceph orch daemon stop osd.2" + log("cmd: {}".format(cmd)) + os.system(cmd) + time.sleep(3) + + for i in range(10): + cmd = "ceph orch daemon stop osd.3" + log("cmd: {}".format(cmd)) + os.system(cmd) + time.sleep(10) + cmd = "ceph orch daemon stop osd.4" + log("cmd: {}".format(cmd)) + os.system(cmd) + time.sleep(10) + cmd = "ceph orch daemon stop osd.5" + log("cmd: {}".format(cmd)) + os.system(cmd) + time.sleep(10) + cmd = "ceph orch daemon start osd.3" + log("cmd: {}".format(cmd)) + os.system(cmd) + time.sleep(10) + cmd = "ceph orch daemon start osd.4" + log("cmd: {}".format(cmd)) + os.system(cmd) + time.sleep(10) + cmd = "ceph orch daemon start osd.5" + log("cmd: {}".format(cmd)) + os.system(cmd) + time.sleep(10) + + cmd = "ceph orch daemon start osd.0" + log("cmd: {}".format(cmd)) + os.system(cmd) + time.sleep(10) + + cmd = "ceph orch daemon start osd.1" + log("cmd: {}".format(cmd)) + os.system(cmd) + time.sleep(10) + + cmd = "ceph orch daemon start osd.2" + log("cmd: {}".format(cmd)) + os.system(cmd) + time.sleep(10) + + global THREAD_SUCCESS + THREAD_SUCCESS = True + + threads = [] + for i in range(thread_count): + t = threading.Thread(target=download_and_compare_file_thread, args=(i, bucket_name, file_count)) + threads.append(t) + t.start() + + for t in threads: + t.join() + + if THREAD_SUCCESS: + print("TEST15 passed!") + else: + print("TEST15 failed!") + TEST_SUCCESS = False + + threads = [] + for i in range(thread_count): + t = threading.Thread(target=delete_file_thread, args=(i, bucket_name, file_count)) + threads.append(t) + t.start() + + for t in threads: + t.join() + + delete_rgw_bucket(bucket_name) + clear_rgw() + clear_cold_pool() + + +# main +if __name__ == "__main__": + if len(sys.argv) < 22 or "-h" in sys.argv or "--help" in sys.argv: + print("Usage: python3 rgw_dedup_integration_test.py [image name] " \ + "[bootstrap node ip] [bootstrap node pw] " \ + "[osd node 1 ip] [osd node 1 pw] [osd node 1 disk 1] [osd node 1 disk 2] [osd node 1 disk 3] " \ + "[osd node 2 ip] [osd node 2 pw] [osd node 2 disk 1] [osd node 2 disk 2] [osd node 2 disk 3] " \ + "[osd node 3 ip] [osd node 3 pw] [osd node 3 disk 1] [osd node 3 disk 2] [osd node 3 disk 3] " \ + "[rgw node ip] [rgw node pw] [rgw virtual ip] " \ + "[-v / --verbose]") + sys.exit(1) + + if "-v" in sys.argv or "--verbose" in sys.argv: + VERBOSE = True + + IMAGE_NAME = sys.argv[1] + BOOTSTRAP_NODE_IP = sys.argv[2] + BOOTSTRAP_NODE_PW = sys.argv[3] + OSD_NODE_1_IP = sys.argv[4] + OSD_NODE_1_PW = sys.argv[5] + OSD_NODE_1_DISK_1 = sys.argv[6] + OSD_NODE_1_DISK_2 = sys.argv[7] + OSD_NODE_1_DISK_3 = sys.argv[8] + OSD_NODE_2_IP = sys.argv[9] + OSD_NODE_2_PW = sys.argv[10] + OSD_NODE_2_DISK_1 = sys.argv[11] + OSD_NODE_2_DISK_2 = sys.argv[12] + OSD_NODE_2_DISK_3 = sys.argv[13] + OSD_NODE_3_IP = sys.argv[14] + OSD_NODE_3_PW = sys.argv[15] + OSD_NODE_3_DISK_1 = sys.argv[16] 
+ OSD_NODE_3_DISK_2 = sys.argv[17] + OSD_NODE_3_DISK_3 = sys.argv[18] + RGW_NODE_IP = sys.argv[19] + RGW_NODE_PW = sys.argv[20] + RGW_VIRTUAL_IP = sys.argv[21] + + destroy_ceph() + bootstrap_ceph() + preflight_check() + + test01() + test02() + test03() + test04() + test05() + test06() + test07() + test08() + test09() + test10() + test11() + test12() + test13() + test14() + test15() + + destroy_ceph() + + if TEST_SUCCESS: + print("All tests passed!") + sys.exit(0) + else: + print("Some tests failed!") + sys.exit(1) \ No newline at end of file
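For reference, every helper in the test script above drives the cluster through the same subprocess boilerplate: build a shell string, run it with Popen(..., shell=True), call communicate(), and check returncode. Below is a minimal sketch of that pattern factored into a single helper; the name run_cmd and its check flag are illustrative assumptions, not part of the patch.

    import subprocess

    def run_cmd(cmd, check=True):
        # Run a shell command the way the helpers above do: capture stdout and
        # stderr, and optionally raise when the command exits non-zero.
        p = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True)
        out, err = p.communicate()
        if check and p.returncode != 0:
            raise Exception("command failed ({}): {}".format(cmd, err.decode()))
        return out.decode()

    # usage, mirroring get_file_checksum():
    # checksum = run_cmd("md5sum /tmp/testfile").split()[0]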