Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions src/common/options/rgw.yaml.in
Original file line number Diff line number Diff line change
Expand Up @@ -3692,3 +3692,11 @@ options:
default: tank
services:
- rgw
- name: rgw_enable_dedup_threads
type: bool
level: advanced
desc: Use dedup function in rados gateway
default: true
services:
- rgw
with_legacy: true
17 changes: 17 additions & 0 deletions src/osd/HitSet.h
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,8 @@ class HitSet {
virtual void dump(ceph::Formatter *f) const = 0;
virtual Impl* clone() const = 0;
virtual void seal() {}
// Default string-insert hook: does nothing, so implementations that do not
// override it silently ignore string inserts.
virtual void insert_string(const std::string& o) {}
// Default string-membership query: always reports "not present" unless an
// implementation overrides it.
virtual bool contains_string(const std::string& o) const { return false; }
virtual ~Impl() {}
};

Expand Down Expand Up @@ -148,6 +150,15 @@ class HitSet {
return impl->contains(o);
}

/// insert a string value into the set
void insert_string(const std::string& o) {
// Forwards to the active implementation; the base Impl version is a no-op,
// so only implementations that override insert_string actually record o.
impl->insert_string(o);
}
/// query whether a string value is in the set
bool contains_string(const std::string& o) const {
// Forwards to the active implementation; the base Impl version always
// returns false.
return impl->contains_string(o);
}

unsigned insert_count() const {
return impl->insert_count();
}
Expand Down Expand Up @@ -418,6 +429,12 @@ class BloomHitSet : public HitSet::Impl {
bool contains(const hobject_t& o) const override {
return bloom.contains(o.get_hash());
}
// Adds the string itself to the bloom filter. Note that the hobject_t query
// above looks up o.get_hash() rather than a string, so string entries and
// object entries are keyed differently.
void insert_string(const std::string& o) override {
bloom.insert(o);
}
// Returns whatever the bloom filter reports for the string o.
// NOTE(review): bloom filters normally allow false positives — confirm callers
// treat a true result as "probably present".
bool contains_string(const std::string& o) const override {
return bloom.contains(o);
}
unsigned insert_count() const override {
return bloom.element_count();
}
Expand Down
4 changes: 3 additions & 1 deletion src/osd/PrimaryLogPG.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2545,7 +2545,9 @@ PrimaryLogPG::cache_result_t PrimaryLogPG::maybe_handle_manifest_detail(
op.op == CEPH_OSD_OP_TIER_PROMOTE ||
op.op == CEPH_OSD_OP_TIER_FLUSH ||
op.op == CEPH_OSD_OP_TIER_EVICT ||
op.op == CEPH_OSD_OP_ISDIRTY) {
op.op == CEPH_OSD_OP_ISDIRTY ||
op.op == CEPH_OSD_OP_GETXATTR ||
op.op == CEPH_OSD_OP_GETXATTRS) {
return cache_result_t::NOOP;
}
}
Expand Down
9 changes: 8 additions & 1 deletion src/rgw/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,12 @@ set(librgw_common_srcs
rgw_lua_data_filter.cc
rgw_bucket_encryption.cc
rgw_tracer.cc
rgw_lua_background.cc)
rgw_lua_background.cc
rgw_dedup.cc
rgw_fp_manager.cc
rgw_dedup_manager.cc
rgw_dedup_worker.cc
rgw_dedup_iotracker.cc)

list(APPEND librgw_common_srcs
store/immutable_config/store.cc
Expand Down Expand Up @@ -230,6 +235,8 @@ target_link_libraries(rgw_common
cls_timeindex_client
cls_user_client
cls_version_client
cls_cas_client
cls_cas_internal
librados
rt
fmt::fmt
Expand Down
2 changes: 1 addition & 1 deletion src/rgw/rgw_admin.cc
Original file line number Diff line number Diff line change
Expand Up @@ -4361,7 +4361,6 @@ int main(int argc, const char **argv)
bool need_gc = (gc_ops_list.find(opt_cmd) != gc_ops_list.end()) && !bypass_gc;

StoreManager::Config cfg = StoreManager::get_config(true, g_ceph_context);

auto config_store_type = g_conf().get_val<std::string>("rgw_config_store");
cfgstore = StoreManager::create_config_store(dpp(), config_store_type);
if (!cfgstore) {
Expand All @@ -4382,6 +4381,7 @@ int main(int argc, const char **argv)
false,
false,
false,
false,
need_cache && g_conf()->rgw_cache_enabled,
need_gc);
}
Expand Down
1 change: 1 addition & 0 deletions src/rgw/rgw_appmain.cc
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,7 @@ void rgw::AppMain::init_storage()
run_quota,
run_sync,
g_conf().get_val<bool>("rgw_dynamic_resharding"),
g_conf()->rgw_enable_dedup_threads,
g_conf()->rgw_cache_enabled);

} /* init_storage */
Expand Down
59 changes: 59 additions & 0 deletions src/rgw/rgw_dedup.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab ft=cpp

#include "rgw_dedup.h"

#define dout_subsys ceph_subsys_rgw

using namespace std;


/// Remember the context and store handles, then build and initialize the
/// dedup manager. This object is handed to the manager as its first
/// constructor argument.
void RGWDedup::initialize(CephContext* _cct, rgw::sal::RadosStore* _store)
{
  // Members are assigned first because the manager is constructed from them.
  cct = _cct;
  store = _store;

  dedup_manager = std::make_unique<RGWDedupManager>(this, cct, store);
  (*dedup_manager).initialize();
}

/// Destroy the owned dedup manager (if any) and leave the pointer empty.
void RGWDedup::finalize()
{
  dedup_manager = nullptr;
}

void RGWDedup::start_dedup_manager()
{
assert(dedup_manager.get());
dedup_manager->set_down_flag(false);
dedup_manager->create("dedup_manager");
}

/// Stop, join and finalize the dedup manager if it exists and is not already
/// flagged as down. Does nothing when no manager has been created.
void RGWDedup::stop_dedup_manager()
{
  // The null check must come first: the original tested get_down_flag()
  // through the pointer before checking that the pointer was set.
  if (!dedup_manager || dedup_manager->get_down_flag()) {
    return;
  }

  dedup_manager->stop();
  dedup_manager->join();
  dedup_manager->finalize();
}

/// Shut the manager down (when one exists), release it, and log completion.
RGWDedup::~RGWDedup()
{
  const bool have_manager = static_cast<bool>(dedup_manager);
  if (have_manager) {
    stop_dedup_manager();
  }
  finalize();

  // NOTE(review): this log goes through get_cct(), which returns nullptr when
  // initialize() was never called — confirm that case cannot reach here.
  ldpp_dout(this, 2) << "stop RGWDedup done" << dendl;
}

// Reports the logging subsystem for this prefix provider: the value of
// dout_subsys, which this file defines as ceph_subsys_rgw.
unsigned RGWDedup::get_subsys() const
{
return dout_subsys;
}


// Forwards the object to the dedup manager's trace_obj().
// NOTE(review): dedup_manager is dereferenced without a null check, so this
// presumably requires initialize() to have been called — confirm with callers.
void RGWDedup::trace_obj(rgw_obj obj)
{
dedup_manager->trace_obj(obj);
}
43 changes: 43 additions & 0 deletions src/rgw/rgw_dedup.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab ft=cpp

#ifndef CEPH_RGW_DEDUP_H
#define CEPH_RGW_DEDUP_H


#include <string>
#include <atomic>
#include <sstream>

#include "include/types.h"
#include "common/Cond.h"
#include "common/Thread.h"
#include "rgw_dedup_manager.h"

using namespace std;

class RGWDedup : public DoutPrefixProvider
{
CephContext* cct;
rgw::sal::RadosStore* store;
unique_ptr<RGWDedupManager> dedup_manager;

public:
RGWDedup() : cct(nullptr), store(nullptr) {}
~RGWDedup() override;

void initialize(CephContext* _cct, rgw::sal::RadosStore* _store);
void finalize();

void start_dedup_manager();
void stop_dedup_manager();

CephContext* get_cct() const override { return cct; }
unsigned get_subsys() const override;
std::ostream& gen_prefix(std::ostream& out) const override { return out << "RGWDedup: "; }

void trace_obj(rgw_obj obj);
};

#endif

126 changes: 126 additions & 0 deletions src/rgw/rgw_dedup_iotracker.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab ft=cpp

#include "rgw_dedup_iotracker.h"

// Prepares the tracker for use: creates a fresh active HitSet (which also
// records its start timestamp) and empties the map of archived HitSets.
// No lock is taken here.
void RGWIOTracker::initialize()
{
create_hit_set();
hit_set_map.clear();
}

/// Drop the archived HitSet with the smallest timestamp key, if any.
/// This function takes no lock itself.
void RGWIOTracker::remove_oldest_hit_set()
{
  if (hit_set_map.empty()) {
    return;
  }
  // The map is keyed by start timestamp, so begin() is the oldest entry.
  hit_set_map.erase(hit_set_map.begin());
}

void RGWIOTracker::create_hit_set()
{
utime_t now = ceph_clock_now();
BloomHitSet* bl_hitset = new BloomHitSet(hit_set_target_size,
hit_set_fpp,
now.sec());
hit_set.reset(new HitSet(bl_hitset));
hit_set_start_stamp = now;
}

/// Rotate the active HitSet when it is full or older than hit_set_period.
///
/// On rotation the active HitSet is archived in hit_set_map under its start
/// stamp and sealed, a fresh active HitSet is created, and the oldest archived
/// entries are dropped until fewer than hit_set_count remain in the map.
///
/// @param now      current time, compared against the active set's start stamp
/// @param is_full  caller's observation that the active HitSet is full
void RGWIOTracker::check_hit_set_valid(utime_t now, bool is_full)
{
  assert(hit_set_start_stamp > utime_t());
  assert(hit_set_count > 0);

  // With a single HitSet there is only the active one; nothing is archived.
  if (hit_set_count == 1) {
    return;
  }

  if (is_full || hit_set_start_stamp + hit_set_period < now) {
    // Archive, re-create and trim under one exclusive lock. The original
    // released the lock before trimming, so hit_set_map was erased from
    // without synchronization. remove_oldest_hit_set() takes no lock itself,
    // so calling it here cannot self-deadlock.
    std::unique_lock<std::shared_mutex> lock(iotracker_lock);

    hit_set_map.emplace(hit_set_start_stamp, hit_set);
    hit_set_map.rbegin()->second->seal();
    create_hit_set();

    while (hit_set_map.size() >= hit_set_count) {
      remove_oldest_hit_set();
    }
  }
}

/// Record an access to obj in the active HitSet, then give the tracker a
/// chance to rotate HitSets. Objects with an empty bucket id or an empty oid
/// are ignored.
void RGWIOTracker::insert(rgw_obj obj)
{
  assert(hit_set.get());

  utime_t now = ceph_clock_now();
  if (obj.bucket.bucket_id == "" || obj.get_oid() == "") {
    return;
  }

  bool active_is_full = false;
  {
    // Exclusive lock covers only the mutation and the fullness probe.
    std::unique_lock<std::shared_mutex> guard(iotracker_lock);
    hit_set->insert_string(obj.bucket.bucket_id + ":" + obj.get_oid());
    active_is_full = hit_set->is_full();
  }
  check_hit_set_valid(now, active_is_full);
}

/// Report whether obj was recorded recently.
///
/// First gives the tracker a chance to rotate HitSets, then looks for the key
/// "<bucket_id>:<oid>" in the active HitSet and, newest first, in the archived
/// HitSets that are not older than hit_set_count * hit_set_period.
///
/// @return true if any consulted HitSet reports the key; false otherwise,
///         including when the bucket id or oid is empty.
bool RGWIOTracker::estimate_temp(rgw_obj obj)
{
  assert(hit_set.get());

  utime_t now = ceph_clock_now();
  // Must run before the shared lock below is taken: rotation acquires the
  // same mutex exclusively.
  check_hit_set_valid(now);

  if (obj.bucket.bucket_id != "" && obj.get_oid() != "") {
    // Build the lookup key once instead of once per probe.
    const std::string key = obj.bucket.bucket_id + ":" + obj.get_oid();

    // Hold the shared lock for the archived scan too. The original dropped it
    // after the active-set probe and then iterated hit_set_map while
    // rotation could insert into or erase from it.
    std::shared_lock<std::shared_mutex> lock(iotracker_lock);

    if (hit_set->contains_string(key)) {
      ldpp_dout(dpp, 10) << obj << " found in active HitSet" << dendl;
      return true;
    }

    // search from the latest
    for (auto p = hit_set_map.rbegin(); p != hit_set_map.rend(); ++p) {
      // ignore too old HitSets; every remaining entry is older still
      if (p->first + (hit_set_count * hit_set_period) < now) {
        break;
      }
      if (p->second->contains_string(key)) {
        ldpp_dout(dpp, 10) << obj << " found in hit_set_map" << dendl;
        return true;
      }
    }
  }
  ldpp_dout(dpp, 10) << obj << " not exists" << dendl;
  return false;
}

// Releases all tracking state: empties the archived HitSet map and drops the
// active HitSet. No lock is taken here.
// NOTE(review): insert() and estimate_temp() assert that hit_set is non-null,
// so they presumably must not be called after this — confirm with callers.
void RGWIOTracker::finalize()
{
hit_set_map.clear();
hit_set.reset();
}

// Sets how many HitSets the tracker keeps; must be greater than zero
// (asserted). A value of 1 means only the active HitSet is used, since
// check_hit_set_valid() returns early in that case.
// The assignment is made without taking iotracker_lock.
void RGWIOTracker::set_hit_set_count(const uint32_t new_count)
{
assert(new_count > 0);
hit_set_count = new_count;
}

// Sets the age after which the active HitSet is rotated; must be greater than
// zero (asserted). Units are presumably seconds — TODO confirm.
// The assignment is made without taking iotracker_lock.
void RGWIOTracker::set_hit_set_period(const uint32_t new_period)
{
assert(new_period > 0);
hit_set_period = new_period;
}

// Sets the target size passed to BloomHitSet when the next HitSet is created;
// must be greater than zero (asserted). The currently active HitSet is not
// rebuilt by this call.
void RGWIOTracker::set_hit_set_target_size(const uint32_t new_target_size)
{
assert(new_target_size > 0);
hit_set_target_size = new_target_size;
}
Loading