Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/client/Client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -4296,7 +4296,7 @@ int Client::mds_command(
// Construct and send MCommand
MCommand *m = new MCommand(monclient->get_fsid());
m->cmd = cmd;
m->set_data(inbl);
m->set_data(const_cast<bufferlist&>(inbl));
m->set_tid(tid);
conn->send_message(m);
}
Expand Down
33 changes: 27 additions & 6 deletions src/common/buffer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#include "common/safe_io.h"
#include "common/simple_spin.h"
#include "common/strtol.h"
#include "common/likely.h"
#include "include/atomic.h"
#include "common/Mutex.h"
#include "include/types.h"
Expand Down Expand Up @@ -161,6 +162,10 @@ static simple_spinlock_t buffer_debug_lock = SIMPLE_SPINLOCK_INITIALIZER;
bool is_n_page_sized() {
return (len & ~CEPH_PAGE_MASK) == 0;
}
virtual bool sharable() {
// true if safe to claim-hold due to, e.g., special registration
return true;
}
bool get_crc(const pair<size_t, size_t> &fromto,
pair<uint32_t, uint32_t> *crc) const {
Mutex::Locker l(crc_lock);
Expand Down Expand Up @@ -602,6 +607,18 @@ static simple_spinlock_t buffer_debug_lock = SIMPLE_SPINLOCK_INITIALIZER;
return _raw->clone();
}

buffer::ptr& buffer::ptr::clone_nonsharable() {
if (_raw && !_raw->sharable()) {
buffer::raw *tr = _raw;
_raw = tr->clone();
_raw->nref.set(1);
if (unlikely(tr->nref.dec() == 0)) {
delete tr;
}
}
return *this;
}

void buffer::ptr::swap(ptr& other)
{
raw *r = _raw;
Expand Down Expand Up @@ -1170,27 +1187,31 @@ void buffer::list::rebuild_page_aligned()
}

// sort-of-like-assignment-op
void buffer::list::claim(list& bl)
void buffer::list::claim(list& bl, bool clone_nonsharable)
{
// free my buffers
clear();
claim_append(bl);
claim_append(bl, clone_nonsharable);
}

void buffer::list::claim_append(list& bl)
void buffer::list::claim_append(list& bl, bool clone_nonsharable)
{
// steal the other guy's buffers
_len += bl._len;
_buffers.splice( _buffers.end(), bl._buffers );
if (clone_nonsharable)
bl.clone_nonsharable();
_buffers.splice(_buffers.end(), bl._buffers );
bl._len = 0;
bl.last_p = bl.begin();
}

void buffer::list::claim_prepend(list& bl)
void buffer::list::claim_prepend(list& bl, bool clone_nonsharable)
{
// steal the other guy's buffers
_len += bl._len;
_buffers.splice( _buffers.begin(), bl._buffers );
if (clone_nonsharable)
bl.clone_nonsharable();
_buffers.splice(_buffers.begin(), bl._buffers );
bl._len = 0;
bl.last_p = bl.begin();
}
Expand Down
38 changes: 32 additions & 6 deletions src/include/buffer.h
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,7 @@ class CEPH_BUFFER_API buffer {

raw *clone();
void swap(ptr& other);
ptr& clone_nonsharable();

// misc
bool at_buffer_head() const { return _off == 0; }
Expand Down Expand Up @@ -329,12 +330,17 @@ class CEPH_BUFFER_API buffer {
append_buffer.set_length(0); // unused, so far.
}
~list() {}

list(const list& other) : _buffers(other._buffers), _len(other._len), _memcopy_count(other._memcopy_count),last_p(this) { }
list(const list& other) : _buffers(other._buffers), _len(other._len),
_memcopy_count(other._memcopy_count), last_p(this) {
// make deep copy semantics the default
clone_nonsharable();
}
list& operator= (const list& other) {
if (this != &other) {
_buffers = other._buffers;
_len = other._len;
// make deep copy semantics the default
clone_nonsharable();
}
return *this;
}
Expand Down Expand Up @@ -404,10 +410,30 @@ class CEPH_BUFFER_API buffer {
unsigned align_memory);
void rebuild_page_aligned();

// sort-of-like-assignment-op
void claim(list& bl);
void claim_append(list& bl);
void claim_prepend(list& bl);
// assignment-op with move semantics
void claim(list& bl, bool clone_nonsharable=true);
void claim_append(list& bl, bool clone_nonsharable=true);
void claim_prepend(list& bl, bool clone_nonsharable=true);

// clone non-sharable buffers (make sharable)
void clone_nonsharable() {
std::list<buffer::ptr>::iterator pb;
for (pb = _buffers.begin(); pb != _buffers.end(); ++pb) {
(void) pb->clone_nonsharable();
}
}

// copy with explicit volatile-sharing semantics
void share(list& bl)
{
if (this != &bl) {
clear();
std::list<buffer::ptr>::iterator pb;
for (pb = bl._buffers.begin(); pb != bl._buffers.end(); ++pb) {
push_back(*pb);
}
}
}

iterator begin() {
return iterator(this, 0);
Expand Down
12 changes: 6 additions & 6 deletions src/msg/Message.h
Original file line number Diff line number Diff line change
Expand Up @@ -314,33 +314,33 @@ class Message : public RefCountedObject {
void set_payload(bufferlist& bl) {
if (byte_throttler)
byte_throttler->put(payload.length());
payload.claim(bl);
payload.claim(bl, false /* !clone_nonsharable */);
if (byte_throttler)
byte_throttler->take(payload.length());
}

void set_middle(bufferlist& bl) {
if (byte_throttler)
byte_throttler->put(payload.length());
middle.claim(bl);
middle.claim(bl, false /* !clone_nonsharable */);
if (byte_throttler)
byte_throttler->take(payload.length());
}
bufferlist& get_middle() { return middle; }

void set_data(const bufferlist &d) {
void set_data(bufferlist &bl) {
if (byte_throttler)
byte_throttler->put(data.length());
data = d;
data.claim(bl, false /* !clone_nonsharable */);
if (byte_throttler)
byte_throttler->take(data.length());
}

bufferlist& get_data() { return data; }
void claim_data(bufferlist& bl) {
void claim_data(bufferlist& bl, bool clone_nonsharable=true) {
if (byte_throttler)
byte_throttler->put(data.length());
bl.claim(data);
bl.claim(data, clone_nonsharable);
}
off_t get_data_len() { return data.length(); }

Expand Down
2 changes: 1 addition & 1 deletion src/os/FileJournal.cc
Original file line number Diff line number Diff line change
Expand Up @@ -913,7 +913,7 @@ int FileJournal::prepare_single_write(bufferlist& bl, off64_t& queue_pos, uint64
bufferptr bp = buffer::create_static(pre_pad, zero_buf);
bl.push_back(bp);
}
bl.claim_append(ebl);
bl.claim_append(ebl, false /* !clone_nonsharable */); // potential zero-copy

if (h.post_pad) {
bufferptr bp = buffer::create_static(post_pad, zero_buf);
Expand Down
2 changes: 1 addition & 1 deletion src/os/FileJournal.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ class FileJournal : public Journal {
TrackedOpRef tracked_op;
write_item(uint64_t s, bufferlist& b, int al, TrackedOpRef opref) :
seq(s), alignment(al), tracked_op(opref) {
bl.claim(b);
bl.claim(b, false /* !clone_nonsharable */); // potential zero-copy
}
write_item() : seq(0), alignment(0) {}
};
Expand Down