diff --git a/packaging/rhel/xrootd-ceph-buffered.spec.in b/packaging/rhel/xrootd-ceph-buffered.spec.in new file mode 100644 index 00000000..96264c87 --- /dev/null +++ b/packaging/rhel/xrootd-ceph-buffered.spec.in @@ -0,0 +1,179 @@ +#------------------------------------------------------------------------------- +# Helper macros +#------------------------------------------------------------------------------- +%if %{?rhel:1}%{!?rhel:0} + %if %{rhel} >= 7 + %define use_systemd 1 + %else + %define use_systemd 0 + %endif +%else + %if %{?fedora}%{!?fedora:0} >= 19 + %define use_systemd 1 + %else + %define use_systemd 0 + %endif +%endif + +%if %{?fedora}%{!?fedora:0} >= 22 + %define use_libc_semaphore 1 +%else + %define use_libc_semaphore 0 +%endif + +%if %{?_with_ceph11:1}%{!?_with_ceph11:0} + %define _with_ceph 1 +%endif + +%if %{?rhel:1}%{!?rhel:0} + %if %{rhel} > 7 + %define use_cmake3 0 + %else + %define use_cmake3 1 + %endif +%else + %define use_cmake3 0 +%endif + +#------------------------------------------------------------------------------- +# Package definitions +#------------------------------------------------------------------------------- +Name: xrootd-ceph-buffered +Epoch: 1 +Version: __VERSION__ +Release: __RELEASE__%{?dist}%{?_with_clang:.clang} +Summary: CEPH plug-in for XRootD +Group: System Environment/Daemons +License: LGPLv3+ +URL: http://xrootd.org/ + +# git clone http://xrootd.org/repo/xrootd.git xrootd +# cd xrootd +# git-archive master | gzip -9 > ~/rpmbuild/SOURCES/xrootd.tgz +Source0: xrootd-ceph-buffered.tar.gz + +BuildRoot: %{_tmppath}/%{name}-root + +%if %{use_cmake3} +BuildRequires: cmake3 +%else +BuildRequires: cmake +%endif + +%if %{?_with_tests:1}%{!?_with_tests:0} +BuildRequires: cppunit-devel +%endif + +BuildRequires: librados-devel = 2:14.2.22 +BuildRequires: libradosstriper-devel = 2:14.2.22 + +%if %{?_with_clang:1}%{!?_with_clang:0} +BuildRequires: clang +%endif + +#BuildRequires: xrootd-server-devel%{?_isa} = %{epoch}:%{version}-%{release} +#BuildRequires: xrootd-private-devel%{?_isa} = %{epoch}:%{version}-%{release} +#BuildRequires: xrootd-libs%{?_isa} = %{epoch}:%{version}-%{release} +#BuildRequires: xrootd-server-libs%{?_isa} = %{epoch}:%{version}-%{release} +#BuildRequires: xrootd-client-libs%{?_isa} = %{epoch}:%{version}-%{release} + +#Requires: xrootd-server-libs%{?_isa} = %{epoch}:%{version}-%{release} +#Requires: xrootd-client-libs%{?_isa} = %{epoch}:%{version}-%{release} +#Requires: xrootd-libs%{?_isa} = %{epoch}:%{version}-%{release} + +BuildRequires: xrootd-server-devel%{?_isa} >= 1:5.3.3 +BuildRequires: xrootd-private-devel%{?_isa} >= 1:5.3.3 +BuildRequires: xrootd-libs%{?_isa} >= 1:5.3.1 +BuildRequires: xrootd-server-libs%{?_isa} >= 1:5.3.3 +BuildRequires: xrootd-client-libs%{?_isa} >= 1:5.3.3 + +Requires: xrootd-server-libs%{?_isa} >= 1:5.3.3 +Requires: xrootd-client-libs%{?_isa} >= 1:5.3.3 +Requires: xrootd-libs%{?_isa} >= 1:5.3.3 + +%description +The xrootd-ceph-buffered is an OSS layer plug-in for the XRootD server for interfacing +with the Ceph storage platform. + +#------------------------------------------------------------------------------- +# Build instructions +#------------------------------------------------------------------------------- +%prep +%setup -c -n xrootd-ceph-buffered + +%build +cd xrootd-ceph-buffered + +%if %{?_with_clang:1}%{!?_with_clang:0} +export CC=clang +export CXX=clang++ +%endif + +mkdir build +pushd build + +%if %{use_cmake3} +cmake3 \ +%else +cmake \ +%endif + -DCMAKE_INSTALL_PREFIX=/usr -DCMAKE_BUILD_TYPE=RelWithDebInfo \ +%if %{?_with_tests:1}%{!?_with_tests:0} + -DENABLE_TESTS=TRUE \ +%else + -DENABLE_TESTS=FALSE \ +%endif + ../ + +make -i VERBOSE=1 %{?_smp_mflags} +popd + +#------------------------------------------------------------------------------- +# Installation +#------------------------------------------------------------------------------- +%install +rm -rf $RPM_BUILD_ROOT + +#------------------------------------------------------------------------------- +# Install 4.x.y +#------------------------------------------------------------------------------- +pushd xrootd-ceph-buffered +pushd build +make install DESTDIR=$RPM_BUILD_ROOT +popd + +# ceph posix unversioned so +rm -f $RPM_BUILD_ROOT%{_libdir}/libXrdCephPosix.so + + +%clean +rm -rf $RPM_BUILD_ROOT + +#------------------------------------------------------------------------------- +# Files +#------------------------------------------------------------------------------- +%files +%defattr(-,root,root,-) +%{_libdir}/libXrdCeph-5.so +%{_libdir}/libXrdCephXattr-5.so +%{_libdir}/libXrdCephPosix.so* + +%if %{?_with_tests:1}%{!?_with_tests:0} +%files tests +%defattr(-,root,root,-) +%{_libdir}/libXrdCephTests*.so +%endif + +#------------------------------------------------------------------------------- +# Changelog +#------------------------------------------------------------------------------- +%changelog +* Mon Mar 14 2022 Jyothish Thomas +-offline file bug fix +* Wed Dec 16 2020 George Patargias +- updated version for librados-devel and libradosstriper-devel to 14.2.15 following the recent upgrade on external Echo gateways +- fixed version in xrootd-ceph-buffered shared libraries +* Mon Mar 02 2020 Michal Simon +- fixed RPM dependencies +* Thu Mar 08 2018 Michal Simon +- initial release diff --git a/src/XrdCeph.cmake b/src/XrdCeph.cmake index 9f666637..a3636610 100644 --- a/src/XrdCeph.cmake +++ b/src/XrdCeph.cmake @@ -46,7 +46,18 @@ add_library( XrdCeph/XrdCephOss.cc XrdCeph/XrdCephOss.hh XrdCeph/XrdCephOssFile.cc XrdCeph/XrdCephOssFile.hh XrdCeph/XrdCephOssDir.cc XrdCeph/XrdCephOssDir.hh - XrdCeph/XrdCephBulkAioRead.cc XrdCeph/XrdCephBulkAioRead.hh) + XrdCeph/XrdCephBulkAioRead.cc XrdCeph/XrdCephBulkAioRead.hh + XrdCeph/XrdCephOssBufferedFile.cc XrdCeph/XrdCephOssBufferedFile.hh + XrdCeph/XrdCephOssReadVFile.cc XrdCeph/XrdCephOssReadVFile.hh + XrdCeph/XrdCephBuffers/XrdCephBufferDataSimple.cc XrdCeph/XrdCephBuffers/XrdCephBufferDataSimple.hh + XrdCeph/XrdCephBuffers/XrdCephBufferAlgSimple.cc XrdCeph/XrdCephBuffers/XrdCephBufferAlgSimple.hh + XrdCeph/XrdCephBuffers/CephIOAdapterRaw.cc XrdCeph/XrdCephBuffers/CephIOAdapterRaw.hh + XrdCeph/XrdCephBuffers/CephIOAdapterAIORaw.cc XrdCeph/XrdCephBuffers/CephIOAdapterAIORaw.hh + XrdCeph/XrdCephBuffers/BufferUtils.cc XrdCeph/XrdCephBuffers/BufferUtils.hh + XrdCeph/XrdCephBuffers/XrdCephReadVNoOp.cc XrdCeph/XrdCephBuffers/XrdCephReadVNoOp.hh + XrdCeph/XrdCephBuffers/XrdCephReadVBasic.cc XrdCeph/XrdCephBuffers/XrdCephReadVBasic.hh +) + target_link_libraries( ${LIB_XRD_CEPH} diff --git a/src/XrdCeph/XrdCephBuffers/BufferUtils.cc b/src/XrdCeph/XrdCephBuffers/BufferUtils.cc new file mode 100644 index 00000000..a30adfbe --- /dev/null +++ b/src/XrdCeph/XrdCephBuffers/BufferUtils.cc @@ -0,0 +1,169 @@ + +#include "BufferUtils.hh" +#include // std::max + +using namespace XrdCephBuffer; + +#ifdef CEPHBUFDEBUG +// to synchronise logging statements + std::mutex cephbuf_iolock; +#endif + +// ------------------------------------------------------ // +// Extent // + +bool Extent::in_extent(off_t pos) const +{ + return ((pos > begin()) && (pos < end())); +} + +bool Extent::isContiguous(const Extent &rhs) const +{ + // does the rhs connect directly to the end of the first + if (end() != rhs.begin()) + return false; + return true; +} + +bool Extent::allInExtent(off_t pos, size_t len) const +{ + // is all the range in this extent + if ((pos < begin()) || (pos >= end())) + return false; + + if (off_t(pos + len) > end()) + return false; + return true; +} +bool Extent::someInExtent(off_t pos, size_t len) const +{ // is some of the range in this extent + if ((off_t(pos + len) < begin()) || (pos >= end())) + return false; + return true; +} + +Extent Extent::containedExtent(off_t pos, size_t len) const +{ + // return the subset of input range that is in this extent + off_t subbeg = std::max(begin(), pos); + off_t subend = std::min(end(), off_t(pos + len)); + + return Extent(subbeg, subend - subbeg); +} +Extent Extent::containedExtent(const Extent &rhs) const +{ + return containedExtent(rhs.begin(), rhs.len()); +} + +bool Extent::operator<(const Extent &rhs) const +{ + // comparison primarily on begin values + // use end values if begin values are equal. + + if (begin() > rhs.begin()) return false; + if (begin() < rhs.begin()) return true; + if (end() < rhs.end() ) return true; + return false; +} +bool Extent::operator==(const Extent &rhs) const +{ + // equivalence based only on start and end + if (begin() != rhs.begin()) + return false; + if (end() != rhs.end()) + return false; + return true; +} + +// ------------------------------------------------------ // +// ExtentHolder // + +ExtentHolder::ExtentHolder() {} + +ExtentHolder::ExtentHolder(size_t elements) +{ + m_extents.reserve(elements); +} + +ExtentHolder::ExtentHolder(const ExtentContainer &extents) +{ + m_extents.reserve(extents.size()); + for (ExtentContainer::const_iterator vit = m_extents.cbegin(); vit != m_extents.cend(); ++vit) { + push_back(*vit); + } + +} +ExtentHolder::~ExtentHolder() +{ + m_extents.clear(); +} + +void ExtentHolder::push_back(const Extent & in) { + if (size()) { + m_begin = std::min(m_begin, in.begin()); + m_end = std::max(m_end, in.end()); + } else { + m_begin = in.begin(); + m_end = in.end(); + } + return m_extents.push_back(in); +} + + + +Extent ExtentHolder::asExtent() const { + // if (!size()) return Extent(0,0); + // ExtentContainer se = getSortedExtents(); + // off_t b = se.front().begin(); + // off_t e = se.back().end(); + + return Extent(m_begin, m_end-m_begin); + +} + +size_t ExtentHolder::bytesContained() const { + size_t nbytes{0}; + for (ExtentContainer::const_iterator vit = m_extents.cbegin(); vit != m_extents.cend(); ++vit) { + nbytes += vit->len(); + } + return nbytes; +} + +size_t ExtentHolder::bytesMissing() const { + size_t bytesUsed = bytesContained(); + size_t totalRange = asExtent().len(); //might be expensive to call + return totalRange - bytesUsed; +} + + +void ExtentHolder::sort() { + std::sort(m_extents.begin(), m_extents.end()); +} + + +ExtentContainer ExtentHolder::getSortedExtents() const { + ExtentContainer v; + v.assign(m_extents.begin(), m_extents.end() ); + std::sort(v.begin(), v.end()); + return v; +} + +ExtentContainer ExtentHolder::getExtents() const { + ExtentContainer v; + v.assign(m_extents.begin(), m_extents.end() ); + return v; +} + +// ------------------------------------------------------ // +// Timer ns // + +Timer_ns::Timer_ns(long &output) : m_output_val(output) +{ + m_start = std::chrono::steady_clock::now(); +} + +Timer_ns::~Timer_ns() +{ + auto end = std::chrono::steady_clock::now(); + m_output_val = std::chrono::duration_cast(end - m_start).count(); +} diff --git a/src/XrdCeph/XrdCephBuffers/BufferUtils.hh b/src/XrdCeph/XrdCephBuffers/BufferUtils.hh new file mode 100644 index 00000000..8a0c6f0f --- /dev/null +++ b/src/XrdCeph/XrdCephBuffers/BufferUtils.hh @@ -0,0 +1,152 @@ +#ifndef __CEPH_BUFFER_UTILS_HH__ +#define __CEPH_BUFFER_UTILS_HH__ + +// holder of various small utility classes for debugging, profiling, logging, and general stuff + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + + +// basic logging +// #TODO; merge this into the xrootd logging, when xrootd is available +#define CEPHBUFDEBUG 1 +#ifdef CEPHBUFDEBUG +extern std::mutex cephbuf_iolock; +#define BUFLOG(x) {std::unique_lockcephbuf_iolock; std::stringstream _bs; _bs << x; std::clog << _bs.str() << std::endl;} +#else +#define BUFLOG(x) +#endif + +namespace XrdCephBuffer +{ + + + class Timer_ns + { + /** + * @brief RAII based timer information outputing a long value of ns + * Almost trivial class to time something and to pass the duration as a long + * to an output variable (specified in the constructor) at destruction. + * Create the object to start the timer. The timer stops when its destructor is called. + * #TODO improve to template the output type and the time ratio + */ + public: + explicit Timer_ns(long &output_ns); + ~Timer_ns(); + + private: + std::chrono::steady_clock::time_point m_start; + long &m_output_val; //!< reference to the external variable to store the output. + + }; //Timer_ns + + + + class Extent + { + /** + * @brief Ecapsulates an offsets and length, with added functionaliyu + * Class that represents an offset possition and a length. + * Simplest usecase is to avoid passing two values around, however this class + * provides additional funcationality for manipulation of extends (e.g. merging, splitting) + * which may prove useful. + */ + + public: + Extent(off_t offset, size_t len) : m_offset(offset), m_len(len){} + inline off_t offset() const { return m_offset; } + inline size_t len() const { return m_len; } + inline off_t begin() const { return m_offset; } //!< Same as offset, but a bit more stl container like + inline off_t end() const { return m_offset + m_len; } //!< similar to stl vector end. + inline bool empty() const {return m_len == 0;} + + /** + * Does the start of the rhs continue directly from the + * end of this Extent + */ + bool isContiguous(const Extent& rhs) const; + + inline off_t last_pos() const { return m_offset + m_len - 1; } //!< last real position + + bool in_extent(off_t pos) const; //!< is this position within the range of this extent + bool allInExtent(off_t pos, size_t len) const; //!< is all the range in this extent + bool someInExtent(off_t pos, size_t len) const; //!< is some of the range in this extent + + Extent containedExtent(off_t pos, size_t len) const; //!< return the subset of range that is in this extent + Extent containedExtent(const Extent &in) const; //!< + + bool operator<(const Extent &rhs) const; + bool operator==(const Extent &rhs) const; + + + private: + off_t m_offset; + size_t m_len; + }; + + /** + * @brief Container defintion for Extents + * Typedef to provide a container of extents as a simple stl vector container + */ + typedef std::vector ExtentContainer; + + /** + * @brief Designed to hold individual extents, but itself provide Extent-like capabilities + * Useful in cases of combining extends, or needing to hold a range of extends and extract + * information about (or aggregated from) the contained objects. + * Could be useful to inherit from Extent if improvements needed. + * + * + */ + class ExtentHolder { + // holder of a list of extent objects + public: + ExtentHolder(); + explicit ExtentHolder(size_t elements); //!< reserve memory only + explicit ExtentHolder(const ExtentContainer& extents); + ~ExtentHolder(); + + off_t begin() const {return m_begin;} + off_t end() const {return m_end;} + size_t len() const {return m_end - m_begin;} //! Total range in bytes of the extents + + bool empty() const {return m_extents.empty();} + size_t size() const {return m_extents.size();} //!< number of extent elements + + Extent asExtent() const; // return an extent covering the whole range + + + size_t bytesContained() const; // number of bytes across the extent not considering overlaps! + size_t bytesMissing() const; // number of bytes missing across the extent, not considering overlaps! + + void push_back(const Extent & in); + void sort(); //!< inplace sort by offset of contained extents + + const ExtentContainer & extents() const {return m_extents;} + //ExtentContainer & extents() {return m_extents;} + + ExtentContainer getSortedExtents() const; + ExtentContainer getExtents() const; + + + + protected: + ExtentContainer m_extents; + + off_t m_begin{0}; //lowest offset value + off_t m_end{0}; // one past end of last byte used. + + }; + + +} + +#endif diff --git a/src/XrdCeph/XrdCephBuffers/CephIOAdapterAIORaw.cc b/src/XrdCeph/XrdCephBuffers/CephIOAdapterAIORaw.cc new file mode 100644 index 00000000..0ba9caec --- /dev/null +++ b/src/XrdCeph/XrdCephBuffers/CephIOAdapterAIORaw.cc @@ -0,0 +1,184 @@ +#include "CephIOAdapterAIORaw.hh" +#include "../XrdCephPosix.hh" +#include "XrdOuc/XrdOucEnv.hh" + +#include +#include +#include +#include +#include +#include +#include + +using namespace XrdCephBuffer; + +using myclock = std::chrono::steady_clock; +//using myseconds = std::chrono::durationResult = rc; + aiop->doneRead(); + } + static void aioWriteCallback(XrdSfsAio *aiop, size_t rc) + { + aiop->Result = rc; + aiop->doneWrite(); + } + +} // anonymous namespace + +CephBufSfsAio::CephBufSfsAio() : m_lock(m_mutex) +{ +} + +void CephBufSfsAio::doneRead() +{ + //BUFLOG("DoneRead"); + m_dataOpDone = true; + m_lock.unlock(); + m_condVar.notify_all(); +} + +void CephBufSfsAio::doneWrite() +{ + //BUFLOG("DoneWrite"); + m_dataOpDone = true; + m_lock.unlock(); + m_condVar.notify_all(); +} + +CephIOAdapterAIORaw::CephIOAdapterAIORaw(IXrdCephBufferData *bufferdata, int fd) : m_bufferdata(bufferdata), m_fd(fd) +{ +} + +CephIOAdapterAIORaw::~CephIOAdapterAIORaw() +{ + // nothing to specifically to do; just print out some stats + float read_speed{0}, write_speed{0}; + if (m_stats_read_req.load() > 0) { + read_speed = m_stats_read_bytes.load() / m_stats_read_timer.load() * 1e-3; + } + if (m_stats_write_req.load() > 0) { + write_speed = m_stats_write_bytes.load() / m_stats_write_timer.load() * 1e-3; + } + BUFLOG("CephIOAdapterAIORaw::Summary fd:" << m_fd + << " nwrite:" << m_stats_write_req << " byteswritten:" << m_stats_write_bytes << " write_s:" + << m_stats_write_timer * 1e-3 << " writemax_s" << m_stats_write_longest * 1e-3 + << " write_MBs:" << write_speed + << " nread:" << m_stats_read_req << " bytesread:" << m_stats_read_bytes << " read_s:" + << m_stats_read_timer * 1e-3 << " readmax_s:" << m_stats_read_longest * 1e-3 + << " read_MBs:" << read_speed ); +} + +ssize_t CephIOAdapterAIORaw::write(off64_t offset, size_t count) +{ + void *buf = m_bufferdata->raw(); + if (!buf) { + BUFLOG("CephIOAdapterAIORaw::write null buffer was provided.") + return -EINVAL; + } + //BUFLOG("Make aio"); + std::unique_ptr aiop = std::unique_ptr(new CephBufSfsAio()); + aiocb &sfsAio = aiop->sfsAio; + // set the necessary parameters for the read, e.g. buffer pointer, offset and length + sfsAio.aio_buf = buf; + sfsAio.aio_nbytes = count; + sfsAio.aio_offset = offset; + // need the concrete object for the blocking / wait + CephBufSfsAio *ceph_aiop = dynamic_cast(aiop.get()); + + long dt_ns{0}; + ssize_t rc{0}; + { // brace is for timer RAII + XrdCephBuffer::Timer_ns timer(dt_ns); + rc = ceph_aio_write(m_fd, aiop.get(), aioWriteCallback); + + if (rc < 0) { + BUFLOG("CephIOAdapterAIORaw::write ceph_aio_write returned rc:" << rc) + return rc; + } + + while (!ceph_aiop->isDone()) + { + ceph_aiop->m_condVar.wait(ceph_aiop->m_lock, std::bind(&CephBufSfsAio::isDone, ceph_aiop)); + } + } // timer brace + + // cleanup + rc = ceph_aiop->Result; + if (rc < 0) { + BUFLOG("CephIOAdapterAIORaw::write ceph_aiop->Result returned rc:" << rc) + } + + // BUFLOG("CephIOAdapterAIORaw::write fd:" << m_fd << " off:" + // << offset << " len:" << count << " rc:" << rc << " ms:" << dt_ns / 1000000); + + m_stats_write_longest = std::max(m_stats_write_longest, dt_ns / 1000000); + m_stats_write_timer.fetch_add(dt_ns / 1000000); + m_stats_write_bytes.fetch_add(rc); + ++m_stats_write_req; + return rc; +} + +ssize_t CephIOAdapterAIORaw::read(off64_t offset, size_t count) +{ + void *buf = m_bufferdata->raw(); + if (!buf) + { + BUFLOG("CephIOAdapterAIORaw::read null buffer was provided.") + return -EINVAL; + } + + std::unique_ptr aiop = std::unique_ptr(new CephBufSfsAio()); + aiocb &sfsAio = aiop->sfsAio; + // set the necessary parameters for the read, e.g. buffer pointer, offset and length + sfsAio.aio_buf = buf; + sfsAio.aio_nbytes = count; + sfsAio.aio_offset = offset; + // need the concrete object for the blocking / wait + CephBufSfsAio *ceph_aiop = dynamic_cast(aiop.get()); + + long dt_ns{0}; + ssize_t rc{0}; + { // timer brace RAII + XrdCephBuffer::Timer_ns timer(dt_ns); + // no check is made whether the buffer has sufficient capacity + // rc = ceph_posix_pread(m_fd,buf,count,offset); + //BUFLOG("Submit aio read: "); + rc = ceph_aio_read(m_fd, aiop.get(), aioReadCallback); + + if (rc < 0) + return rc; + + // now block until the read is done + // take the lock on the aio object + // while(!ceph_aiop->isDone()) { ceph_aiop->m_condVar.wait(lock,std::bind(&CephBufSfsAio::isDone,ceph_aiop) ); } + while (!ceph_aiop->isDone()) + { + ceph_aiop->m_condVar.wait(ceph_aiop->m_lock, std::bind(&CephBufSfsAio::isDone, ceph_aiop)); + } + } // timer brace + + // cleanup + rc = ceph_aiop->Result; + + m_stats_read_longest = std::max(m_stats_read_longest, dt_ns / 1000000); + m_stats_read_timer.fetch_add(dt_ns * 1e-6); + m_stats_read_bytes.fetch_add(rc); + ++m_stats_read_req; + + // BUFLOG("CephIOAdapterAIORaw::read fd:" << m_fd << " " << offset + // << " " << count << " " << rc << " " << dt_ns * 1e-6); + + if (rc >= 0) + { + m_bufferdata->setLength(rc); + m_bufferdata->setStartingOffset(offset); + m_bufferdata->setValid(true); + } + return rc; +} diff --git a/src/XrdCeph/XrdCephBuffers/CephIOAdapterAIORaw.hh b/src/XrdCeph/XrdCephBuffers/CephIOAdapterAIORaw.hh new file mode 100644 index 00000000..a4d0f0c8 --- /dev/null +++ b/src/XrdCeph/XrdCephBuffers/CephIOAdapterAIORaw.hh @@ -0,0 +1,102 @@ +#ifndef __CEPH_IO_ADAPTER_AIORAW_HH__ +#define __CEPH_IO_ADAPTER_AIORAW_HH__ +//------------------------------------------------------------------------------ +// Interface of the logic part of the buffering +// Intention to be able to abstract the underlying implementation and code against the inteface +// e.g. for different complexities of control. +// Couples loosely to IXrdCepgBufferData and anticipated to be called by XrdCephOssBufferedFile. +// Should managage all of the IO and logic to give XrdCephOssBufferedFile only simple commands to call. +// implementations are likely to use (via callbacks?) CephPosix library code for actual reads and writes. +//------------------------------------------------------------------------------ + +#include +#include "IXrdCephBufferData.hh" +#include "ICephIOAdapter.hh" +#include "BufferUtils.hh" + +#include +#include +#include +#include +#include + +#include "XrdSfs/XrdSfsAio.hh" + + +namespace XrdCephBuffer { + + class CephBufSfsAio : virtual public XrdSfsAio + { + public: + CephBufSfsAio(); + // Method to handle completed reads + // + virtual void doneRead() override; + + // Method to hand completed writes + // + virtual void doneWrite() override; + + // Method to recycle free object + // + virtual void Recycle() override{}; + std::mutex m_mutex; + std::unique_lock m_lock; + std::condition_variable m_condVar; + bool isDone() {return m_dataOpDone;} + + protected: + bool m_dataOpDone {false}; + + }; + +/** + * @brief Implements a non-async read and write to ceph via aio ceph_posix calls + * Using the standard ceph_posix_aio calls do the actual read and write operations. + * No ownership is taken on the buffer that's passed via the constructor + * Although using aio calls, we block here until the data has been read/written + */ +class CephIOAdapterAIORaw: public virtual ICephIOAdapter { + public: + CephIOAdapterAIORaw(IXrdCephBufferData * bufferdata, int fd); + virtual ~CephIOAdapterAIORaw(); + + /** + * @brief Take the data in the buffer and write to ceph at given offset + * Issues a ceph_posix_pwrite for data in the buffer (from pos 0) into + * ceph at position offset with len count. + * Returns -ve on error, else the number of bytes writen. + * + * @param offset + * @param count + * @return ssize_t + */ + virtual ssize_t write(off64_t offset,size_t count) override; + + /** + * @brief Issue a ceph_posix_pread to read to the buffer data from file offset and len count. + * No range checking is currently provided here. The caller must provide sufficient space for the + * max len read. + * Returns -ve errorcode on failure, else the number of bytes returned. + * + * @param offset + * @param count + * @return ssize_t + */ + virtual ssize_t read(off64_t offset,size_t count) override; + + private: + IXrdCephBufferData * m_bufferdata; //!< no ownership of pointer (consider shared ptrs, etc) + int m_fd; + + // timer and counter info + std::atomic< long> m_stats_read_timer{0}, m_stats_write_timer{0}; + std::atomic< long> m_stats_read_bytes{0}, m_stats_write_bytes{0}; + std::atomic< long> m_stats_read_req{0}, m_stats_write_req{0}; + long m_stats_read_longest{0}, m_stats_write_longest{0}; + +}; + +} + +#endif diff --git a/src/XrdCeph/XrdCephBuffers/CephIOAdapterRaw.cc b/src/XrdCeph/XrdCephBuffers/CephIOAdapterRaw.cc new file mode 100644 index 00000000..130fb71a --- /dev/null +++ b/src/XrdCeph/XrdCephBuffers/CephIOAdapterRaw.cc @@ -0,0 +1,96 @@ +#include "CephIOAdapterRaw.hh" +#include "../XrdCephPosix.hh" +#include "XrdOuc/XrdOucEnv.hh" + +#include +#include +#include + +using namespace XrdCephBuffer; + +using myclock = std::chrono::steady_clock; +//using myseconds = std::chrono::duration 0 && m_stats_read_timer.load() > 0 ) { + read_speed = m_stats_read_bytes.load() / m_stats_read_timer.load() * 1e-6; + } + if (m_stats_write_req.load() > 0 && m_stats_read_timer.load() > 0 ) { + write_speed = m_stats_write_bytes.load() / m_stats_write_timer.load() * 1e-6; + } + BUFLOG("CephIOAdapterRaw::Summary fd:" << m_fd + << " nwrite:" << m_stats_write_req << " byteswritten:" << m_stats_write_bytes << " write_s:" + << m_stats_write_timer * 1e-6 << " writemax_s" << m_stats_write_longest * 1e-6 + << " write_MBs:" << write_speed + << " nread:" << m_stats_read_req << " bytesread:" << m_stats_read_bytes << " read_s:" + << m_stats_read_timer * 1e-6 << " readmax_s:" << m_stats_read_longest * 1e-6 + << " read_MBs:" << read_speed + << " striperlessRead: " << m_useStriperlessReads + ); + +} + +ssize_t CephIOAdapterRaw::write(off64_t offset,size_t count) { + const void* buf = m_bufferdata->raw(); + if (!buf) return -EINVAL; + + auto start = std::chrono::steady_clock::now(); + ssize_t rc = ceph_posix_pwrite(m_fd,buf,count,offset); + auto end = std::chrono::steady_clock::now(); + auto int_ns = std::chrono::duration_cast(end-start); + + // BUFLOG("CephIOAdapterRaw::write fd:" << m_fd << " " << rc << " " + // << offset << " " << count << " " << rc << " " << int_ms.count() ); + + if (rc < 0) return rc; + m_stats_write_longest = std::max(m_stats_write_longest,static_cast(int_ns.count())); + m_stats_write_timer.fetch_add(static_cast(int_ns.count())); + m_stats_write_bytes.fetch_add(rc); + ++m_stats_write_req; + return rc; +} + + +ssize_t CephIOAdapterRaw::read(off64_t offset, size_t count) { + void* buf = m_bufferdata->raw(); + if (!buf) { + return -EINVAL; + } + ssize_t rc {0}; + + // no check is made whether the buffer has sufficient capacity + auto start = std::chrono::steady_clock::now(); + rc = ceph_posix_maybestriper_pread(m_fd,buf,count,offset, m_useStriperlessReads); + auto end = std::chrono::steady_clock::now(); + //auto elapsed = end-start; + auto int_ns = std::chrono::duration_cast(end-start); + + if (rc < 0) { + BUFLOG("CephIOAdapterRaw::read: Error in read: " << rc ); + return rc; + } + + m_stats_read_longest = std::max(m_stats_read_longest,static_cast(int_ns.count())); + m_stats_read_timer.fetch_add(static_cast(int_ns.count())); + m_stats_read_bytes.fetch_add(rc); + ++m_stats_read_req; + + // BUFLOG("CephIOAdapterRaw::read fd:" << m_fd << " " << rc << " " << offset + // << " " << count << " " << rc << " " << int_ms.count() ); + + if (rc>=0) { + m_bufferdata->setLength(rc); + m_bufferdata->setStartingOffset(offset); + m_bufferdata->setValid(true); + } + return rc; +} + diff --git a/src/XrdCeph/XrdCephBuffers/CephIOAdapterRaw.hh b/src/XrdCeph/XrdCephBuffers/CephIOAdapterRaw.hh new file mode 100644 index 00000000..55d427f9 --- /dev/null +++ b/src/XrdCeph/XrdCephBuffers/CephIOAdapterRaw.hh @@ -0,0 +1,73 @@ +#ifndef __CEPH_IO_ADAPTER_RAW_HH__ +#define __CEPH_IO_ADAPTER_RAW_HH__ +//------------------------------------------------------------------------------ +// Interface of the logic part of the buffering +// Intention to be able to abstract the underlying implementation and code against the inteface +// e.g. for different complexities of control. +// Couples loosely to IXrdCepgBufferData and anticipated to be called by XrdCephOssBufferedFile. +// Should managage all of the IO and logic to give XrdCephOssBufferedFile only simple commands to call. +// implementations are likely to use (via callbacks?) CephPosix library code for actual reads and writes. +//------------------------------------------------------------------------------ + +#include +#include "IXrdCephBufferData.hh" +#include "ICephIOAdapter.hh" +#include "BufferUtils.hh" + +#include +#include +#include + +namespace XrdCephBuffer { + +/** + * @brief Implements a non-async read and write to ceph via ceph_posix calls + * Using the standard ceph_posix_ calls do the actual read and write operations. + * No ownership is taken on the buffer that's passed via the constructor + */ +class CephIOAdapterRaw: public virtual ICephIOAdapter { + public: + CephIOAdapterRaw(IXrdCephBufferData * bufferdata, int fd, + bool useStriperlessReads); + virtual ~CephIOAdapterRaw(); + + /** + * @brief Take the data in the buffer and write to ceph at given offset + * Issues a ceph_posix_pwrite for data in the buffer (from pos 0) into + * ceph at position offset with len count. + * Returns -ve on error, else the number of bytes writen. + * + * @param offset + * @param count + * @return ssize_t + */ + virtual ssize_t write(off64_t offset,size_t count) override; + + /** + * @brief Issue a ceph_posix_pread to read to the buffer data from file offset and len count. + * No range checking is currently provided here. The caller must provide sufficient space for the + * max len read. + * Returns -ve errorcode on failure, else the number of bytes returned. + * + * @param offset + * @param count + * @return ssize_t + */ + virtual ssize_t read(off64_t offset,size_t count) override; + + private: + IXrdCephBufferData * m_bufferdata; //!< no ownership of pointer (consider shared ptrs, etc) + int m_fd; + bool m_useStriperlessReads {true}; //!< use the striperless read code + + // timer and counter info + std::atomic< long long> m_stats_read_timer{0}, m_stats_write_timer{0}; + std::atomic< long long> m_stats_read_bytes{0}, m_stats_write_bytes{0}; + std::atomic< long long> m_stats_read_req{0}, m_stats_write_req{0}; + long long m_stats_read_longest{0}, m_stats_write_longest{0}; + +}; + +} + +#endif diff --git a/src/XrdCeph/XrdCephBuffers/ICephIOAdapter.hh b/src/XrdCeph/XrdCephBuffers/ICephIOAdapter.hh new file mode 100644 index 00000000..1fb2c363 --- /dev/null +++ b/src/XrdCeph/XrdCephBuffers/ICephIOAdapter.hh @@ -0,0 +1,35 @@ +#ifndef __ICEPH_IO_ADAPTER_HH__ +#define __ICEPH_IO_ADAPTER_HH__ +//------------------------------------------------------------------------------ +// Interface of the logic part of the buffering +// Intention to be able to abstract the underlying implementation and code against the inteface +// e.g. for different complexities of control. +// Couples loosely to IXrdCepgBufferData and anticipated to be called by XrdCephOssBufferedFile. +// Should managage all of the IO and logic to give XrdCephOssBufferedFile only simple commands to call. +// implementations are likely to use (via callbacks?) CephPosix library code for actual reads and writes. +//------------------------------------------------------------------------------ + +#include +#include "IXrdCephBufferData.hh" + +namespace XrdCephBuffer { + +/** + * @brief Manage the actual IO operations that read and write the data into Ceph via librados striper. + * Likely to be provided with a buffer in the concreate implementation's constructor. + * Attempt to decouple the low level IO operations from the buffer implementation. + * However, ight coupling might be strictly necessary, making this class a bit redundant. + * Consider to refactor if this proves to be the case ... + * + */ +class ICephIOAdapter { + public: + virtual ~ICephIOAdapter() {} + virtual ssize_t write(off64_t offset,size_t count) = 0; //!< write from buffer into ceph + virtual ssize_t read(off64_t offset,size_t count) = 0; //!< read from ceph into the buffer + +}; + +} + +#endif diff --git a/src/XrdCeph/XrdCephBuffers/IXrdCephBufferAlg.hh b/src/XrdCeph/XrdCephBuffers/IXrdCephBufferAlg.hh new file mode 100644 index 00000000..432273fa --- /dev/null +++ b/src/XrdCeph/XrdCephBuffers/IXrdCephBufferAlg.hh @@ -0,0 +1,47 @@ +#ifndef __IXRD_CEPH_BUFFER_ALG_HH__ +#define __IXRD_CEPH_BUFFER_ALG_HH__ +//------------------------------------------------------------------------------ +// Interface of the logic part of the buffering +// Intention to be able to abstract the underlying implementation and code against the inteface +// e.g. for different complexities of control. +// Couples loosely to IXrdCepgBufferData and anticipated to be called by XrdCephOssBufferedFile. +// Should managage all of the IO and logic to give XrdCephOssBufferedFile only simple commands to call. +// implementations are likely to use (via callbacks?) CephPosix library code for actual reads and writes. +//------------------------------------------------------------------------------ + +#include +#include "IXrdCephBufferData.hh" +#include "ICephIOAdapter.hh" + +class XrdSfsAio; + +namespace XrdCephBuffer { + +/** + * @brief Interface to a holder of the main logic decisions of the buffering algortithm, decoupled from the buffer resource itself. + * Main work of the buffering is done in the classes that inherit from the interace, of how and when and why to buffer and flush the data + * The physical representation of the buffer is not written here to allow for some flexibility of changing the internals of the buffer if needed. + * Anticipate that a non-async and async will be the main distinct use cases. + */ +class IXrdCephBufferAlg { + public: + virtual ~IXrdCephBufferAlg() {} + + virtual ssize_t read_aio (XrdSfsAio *aoip) = 0; //!< possible aio based code + virtual ssize_t write_aio(XrdSfsAio *aoip) = 0; //!< possible aio based code + + virtual ssize_t read (volatile void *buff, off_t offset, size_t blen) = 0; //!< read data through the buffer + virtual ssize_t write(const void *buff, off_t offset, size_t blen) = 0; //!< write data through the buffer + virtual ssize_t flushWriteCache() = 0; //!< remember to flush the cache on final writes + + + protected: + + + private: + +}; + +} + +#endif diff --git a/src/XrdCeph/XrdCephBuffers/IXrdCephBufferData.hh b/src/XrdCeph/XrdCephBuffers/IXrdCephBufferData.hh new file mode 100644 index 00000000..2b242b0d --- /dev/null +++ b/src/XrdCeph/XrdCephBuffers/IXrdCephBufferData.hh @@ -0,0 +1,46 @@ +#ifndef __IXRD_CEPH_BUFFER_DATA_HH__ +#define __IXRD_CEPH_BUFFER_DATA_HH__ +//------------------------------------------------------------------------------ +// Interface to the actual buffer data object used to store the data +// Intention to be able to abstract the underlying implementation and code against the inteface +// e.g. if choice of buffer data object +//------------------------------------------------------------------------------ + +#include + +namespace XrdCephBuffer { + +/** + * @brief Interface to the Buffer's physical representation. + * Allow an interface to encapsulate the requirements of a buffer's memory, without worrying about the details. + * Various options exist for the specific buffer implemented, and are left to the sub-classes. + */ +class IXrdCephBufferData { + public: + virtual ~IXrdCephBufferData(){} + virtual size_t capacity() const = 0;//! total available space + virtual size_t length() const = 0;//! Currently occupied and valid space, which may be less than capacity + virtual void setLength(size_t len) =0 ;//! Currently occupied and valid space, which may be less than capacity + virtual bool isValid() const =0; + virtual void setValid(bool isValid) =0; + + virtual off_t startingOffset() const = 0; + virtual off_t setStartingOffset(off_t offset) = 0; + + virtual ssize_t invalidate() = 0; //! set cache into an invalid state + + virtual ssize_t readBuffer(void* buf, off_t offset, size_t blen) const = 0; //! copy data from the internal buffer to buf + + virtual ssize_t writeBuffer(const void* buf, off_t offset, size_t blen,off_t externalOffset) = 0; //! write data into the buffer, store the external offset + + virtual const void* raw() const = 0; // const accessor to the 'raw' or underlying object + virtual void* raw() = 0; // accessor to the 'raw' or underlying object + + + protected: + +}; + +} + +#endif diff --git a/src/XrdCeph/XrdCephBuffers/IXrdCephReadVAdapter.hh b/src/XrdCeph/XrdCephBuffers/IXrdCephReadVAdapter.hh new file mode 100644 index 00000000..7e8361aa --- /dev/null +++ b/src/XrdCeph/XrdCephBuffers/IXrdCephReadVAdapter.hh @@ -0,0 +1,45 @@ +#ifndef __IXRD_CEPH_READV_ADAPTER_HH__ +#define __IXRD_CEPH_READV_ADAPTER_HH__ +//------------------------------------------------------------------------------ +// Interface to the actual buffer data object used to store the data +// Intention to be able to abstract the underlying implementation and code against the inteface +// e.g. if choice of buffer data object +//------------------------------------------------------------------------------ + +#include +#include + +#include "BufferUtils.hh" + +#include // #FIXME remove + +namespace XrdCephBuffer +{ + + /** + * @brief Interface to the logic of dealing with readV requests + */ + class IXrdCephReadVAdapter + { + public: + virtual ~IXrdCephReadVAdapter() {} + + /** + * @brief Take in a set of extents representing the readV requests. return a vector of each combined read request. + * Caller translates the readV request into a set of Extents (passed to an ExtentHolder). + * The logic of the specific concrete implementation combines the set of readV requests into merged requests. + * Output is a vector of those requests. Each ExtentHolder element holds the offset and len to be read, and also + * the contained extents of the readVs. + * The index of the readV element is not held, so the caller must ensure to match up appropriately. + * + * @param extentsIn + * @return std::vector + */ + virtual std::vector convert(const ExtentHolder &extentsIn) =0; + + protected: + }; + +} + +#endif diff --git a/src/XrdCeph/XrdCephBuffers/XrdCephBufferAlgSimple.cc b/src/XrdCeph/XrdCephBuffers/XrdCephBufferAlgSimple.cc new file mode 100644 index 00000000..43c355fb --- /dev/null +++ b/src/XrdCeph/XrdCephBuffers/XrdCephBufferAlgSimple.cc @@ -0,0 +1,387 @@ +//------------------------------------------------------------------------------ +//------------------------------------------------------------------------------ + +#include +#include "XrdCephBufferAlgSimple.hh" + +#include "../XrdCephPosix.hh" +#include +#include +#include +#include +#include + +#include "XrdSfs/XrdSfsAio.hh" + + +using namespace XrdCephBuffer; + + +XrdCephBufferAlgSimple::XrdCephBufferAlgSimple(std::unique_ptr buffer, + std::unique_ptr cephio, int fd, + bool useStriperlessReads): +m_bufferdata(std::move(buffer)), m_cephio(std::move(cephio)), m_fd(fd), +m_useStriperlessReads(useStriperlessReads) { + +} + +XrdCephBufferAlgSimple::~XrdCephBufferAlgSimple() { + int prec = std::cout.precision(); + float bytesBuffered = m_stats_bytes_fromceph - m_stats_bytes_bypassed; + float cacheUseFraction = bytesBuffered > 0 ? (1.*(m_stats_bytes_toclient-m_stats_bytes_bypassed)/bytesBuffered) : 1. ; + + BUFLOG("XrdCephBufferAlgSimple::Destructor, fd=" << m_fd + << ", retrieved_bytes=" << m_stats_bytes_fromceph + << ", bypassed_bytes=" << m_stats_bytes_bypassed + << ", delivered_bytes=" << m_stats_bytes_toclient + << std::setprecision(4) + << ", cache_hit_frac=" << cacheUseFraction << std::setprecision(prec)); + m_fd = -1; +} + + +ssize_t XrdCephBufferAlgSimple::read_aio (XrdSfsAio *aoip) { + // Currently this is not supported, and callers using this should recieve the appropriate error code + //return -ENOSYS; + + ssize_t rc(-ENOSYS); + if (!aoip) { + return -EINVAL; + } + + volatile void * buf = aoip->sfsAio.aio_buf; + size_t blen = aoip->sfsAio.aio_nbytes; + off_t offset = aoip->sfsAio.aio_offset; + + // translate the aio read into a simple sync read. + // hopefully don't get too many out of sequence reads to effect the caching + rc = read(buf, offset, blen); + + aoip->Result = rc; + aoip->doneRead(); + + return rc; + +} + +ssize_t XrdCephBufferAlgSimple::write_aio(XrdSfsAio *aoip) { + // Currently this is not supported, and callers using this should recieve the appropriate error code + // return -ENOSYS; + + ssize_t rc(-ENOSYS); + if (!aoip) { + return -EINVAL; + } + + // volatile void * buf = aoip->sfsAio.aio_buf; + // size_t blen = aoip->sfsAio.aio_nbytes; + // off_t offset = aoip->sfsAio.aio_offset; + size_t blen = aoip->sfsAio.aio_nbytes; + off_t offset = aoip->sfsAio.aio_offset; + + rc = write(const_cast(aoip->sfsAio.aio_buf), offset, blen); + aoip->Result = rc; + aoip->doneWrite(); + return rc; + +} + + +ssize_t XrdCephBufferAlgSimple::read(volatile void *buf, off_t offset, size_t blen) { + // Set a lock for any attempt at a simultaneous operation + // Use recursive, as flushCache also calls the lock and don't want to deadlock + // No call to flushCache should happen in a read, but be consistent + // BUFLOG("XrdCephBufferAlgSimple::read: preLock: " << std::hash{}(std::this_thread::get_id()) << " " << offset << " " << blen); + const std::lock_guard lock(m_data_mutex); // + // BUFLOG("XrdCephBufferAlgSimple::read: postLock: " << std::hash{}(std::this_thread::get_id()) << " " << offset << " " << blen); + + // BUFLOG("XrdCephBufferAlgSimple::read status:" + // << "\n\tRead off/len/end: " << offset << "/" << blen << "/(" << (offset+blen) <<")" + // << "\n\tBuffer: start/length/end/cap: " << m_bufferStartingOffset << "/" << m_bufferLength << "/" + // << (m_bufferStartingOffset + m_bufferLength) << "/" << m_bufferdata->capacity() + // ); + if (blen == 0) return 0; + + /** + * If the requested read is larger than the buffer size, just bypass the cache. + * Invalidate the cache in anycase + */ + if (blen >= m_bufferdata->capacity()) { + //BUFLOG("XrdCephBufferAlgSimple::read: Readthrough cache: fd: " << m_fd + // << " " << offset << " " << blen); + // larger than cache, so read through, and invalidate the cache anyway + m_bufferdata->invalidate(); + m_bufferLength =0; // ensure cached data is set to zero length + // #FIXME JW: const_cast is probably a bit poor. + + ssize_t rc = ceph_posix_maybestriper_pread (m_fd, const_cast(buf), blen, offset, m_useStriperlessReads); + if (rc > 0) { + m_stats_bytes_fromceph += rc; + m_stats_bytes_toclient += rc; + m_stats_bytes_bypassed += rc; + } + return rc; + } + + ssize_t rc(-1); + size_t bytesRemaining = blen; // track how many bytes still need to be read + off_t offsetDelta = 0; + size_t bytesRead = 0; + /** + * In principle, only should ever have the first loop, however, in the case a read request + * passes over the boundary of the buffer, two reads will be needed; the first to read + * out the current buffer, and a second, to read the partial data from the refilled buffer + */ + while (bytesRemaining > 0) { + // BUFLOG("In loop: " << " " << offset << " + " << offsetDelta << "; " << blen << " : " << bytesRemaining << " " << m_bufferLength); + + bool loadCache = false; + // run some checks to see if we need to fill the cache. + if (m_bufferLength == 0) { + // no data in buffer + loadCache = true; + } else if (offset < m_bufferStartingOffset) { + // offset before any cache data + loadCache = true; + } else if (offset >= (off_t) (m_bufferStartingOffset + m_bufferLength) ) { + // offset is beyond the stored data + loadCache = true; + } else if ((offset - m_bufferStartingOffset + offsetDelta) >= (off_t)m_bufferLength) { + // we have now read to the end of the buffers data + loadCache = true; + } + + /** + * @brief If we need to load data in the cache, do it here. + * + */ + if (loadCache) { + // BUFLOG("XrdCephBufferAlgSimple::read: preLock: " << std::hash{}(std::this_thread::get_id()) << " " << "Filling the cache"); + m_bufferdata->invalidate(); + m_bufferLength =0; // set lengh of data stored to 0 + rc = m_cephio->read(offset + offsetDelta, m_bufferdata->capacity()); // fill the cache + // BUFLOG("LoadCache ReadToCache: " << rc << " " << offset + offsetDelta << " " << m_bufferdata->capacity() ); + if (rc < 0) { + BUFLOG("LoadCache Error: " << rc); + return rc;// TODO return correct errors + } + m_stats_bytes_fromceph += rc; + m_bufferStartingOffset = offset + offsetDelta; + m_bufferLength = rc; + if (rc == 0) { + // We should be at the end of file, with nothing more to read, and nothing that could be returned + // break out of the loop. + break; + } + } + + + //now read as much data as possible + off_t bufPosition = offset + offsetDelta - m_bufferStartingOffset; + rc = m_bufferdata->readBuffer( (void*) &(((char*)buf)[offsetDelta]) , bufPosition , bytesRemaining); + // BUFLOG("Fill result: " << offsetDelta << " " << bufPosition << " " << bytesRemaining << " " << rc) + if (rc < 0 ) { + BUFLOG("Reading from Cache Failed: " << rc << " " << offset << " " + << offsetDelta << " " << m_bufferStartingOffset << " " + << bufPosition << " " + << bytesRemaining ); + return rc; // TODO return correct errors + } + if (rc == 0) { + // no bytes returned; much be at end of file + //BUFLOG("No bytes returned: " << rc << " " << offset << " + " << offsetDelta << "; " << blen << " : " << bytesRemaining); + break; // leave the loop even though bytesremaing is probably >=0. + //i.e. requested a full buffers worth, but only a fraction of the file is here. + } + m_stats_bytes_toclient += rc; + // BUFLOG("End of loop: " << rc << " " << offset << " + " << offsetDelta << "; " << blen << " : " << bytesRemaining); + offsetDelta += rc; + bytesRemaining -= rc; + bytesRead += rc; + + } // while bytesremaing + + return bytesRead; +} + +ssize_t XrdCephBufferAlgSimple::write (const void *buf, off_t offset, size_t blen) { + // Set a lock for any attempt at a simultaneous operation + // Use recursive, as flushCache also calls the lock and don't want to deadlock + const std::lock_guard lock(m_data_mutex); + + // take the data in buf and put it into the cache; when the cache is full, write to underlying storage + // remember to flush the cache at the end of operations ... + ssize_t rc(-1); + ssize_t bytesWrittenToStorage(0); + + if (blen == 0) { + return 0; // nothing to write; are we done? + } + + /** + * We expect the next write to be in order and well defined. + * Determine the expected offset, and compare against offset provided + * Expected offset is the end of the buffer. + * m_bufferStartingOffset is the represented offset in ceph that buffer[0] represents + */ + off_t expected_offset = (off_t)(m_bufferStartingOffset + m_bufferLength); + + if ((offset != expected_offset) && (m_bufferLength > 0) ) { + // for the moment we just log that there is some non expected offset value + // TODO, might be dangerous to flush the cache on non-aligned writes ... + BUFLOG("Non expected offset: " << rc << " " << offset << " " << expected_offset); + // rc = flushWriteCache(); + // if (rc < 0) { + // return rc; // TODO return correct errors + // } + } // mismatched offset + + //! We should be equally careful if the offset of the buffer start is not aligned sensibly. + //! Log this only for now, but #TODO, this should be come an error condition for over cautitious behaviour. + if ( (m_bufferStartingOffset % m_bufferdata->capacity()) != 0 ) { + BUFLOG(" Non aligned offset?" << m_bufferStartingOffset << " " + << m_bufferdata->capacity() << " " << m_bufferStartingOffset % m_bufferdata->capacity() ); + } + + // Commmented out below. It would be good to pass writes, which are larger than the buffer size, + // straight-through. However if the ranges are not well aligned, this could be an issue. + // And, what then to do about a possible partial filled buffer? + + // if (blen >= m_bufferdata->capacity()) { + // // TODO, might be dangerous to flush the cache on non-aligned writes ... + // // flush the cache now, if needed + // rc = flushWriteCache(); + // if (rc < 0) { + // return rc; // TODO return correct errors + // } + // bytesWrittenToStorage += rc; + + // // Size is larger than the buffer; send the write straight through + // std::clog << "XrdCephBufferAlgSimple::write: Readthrough cache: fd: " << m_fd + // << " " << offset << " " << blen << std::endl; + // // larger than cache, so read through, and invalidate the cache anyway + // m_bufferdata->invalidate(); + // m_bufferLength=0; + // m_bufferStartingOffset=0; + // rc = ceph_posix_pwrite(m_fd, buf, blen, offset); + // if (rc < 0) { + // return rc; // TODO return correct errors + // } + // bytesWrittenToStorage += rc; + // return rc; + // } + + /** + * @brief Provide some sanity checking for the write to the buffer. + * We call an error on this conditions as there is no immediate solution that is satisfactory. + */ + if ((offset != expected_offset) && (m_bufferLength > 0) ) { + BUFLOG("Error trying to write out of order: expeted at: " << expected_offset + << " got offset" << offset << " of len " << blen); + return -EINVAL; + } + if (offset < 0) { + BUFLOG("Got a negative offset: " << offset); + return -EINVAL; + } + + + size_t bytesRemaining = blen; //!< track how many bytes left to write + size_t bytesWritten = 0; + + /** Typically would expect only one loop, i.e. the write request is smaller than the buffer. + * If bigger, or the request stradles the end of the buffer, will need another loop + */ + while (bytesRemaining > 0) { + /** + * If the cache is already full, lets flush to disk now + */ + if (m_bufferLength == m_bufferdata->capacity()) { + rc = flushWriteCache(); + if (rc < 0) { + return rc; + } + bytesWrittenToStorage += rc; + } // at capacity; + + if (m_bufferLength == 0) { + // cache is currently empty, so set the 'reference' to the external offset now + m_bufferStartingOffset = offset + bytesWritten; + } + //add data to the cache from buf, from buf[offsetDelta] to the cache at position m_bufferLength + // make sure to write only as many bytes as left in the cache. + size_t nBytesToWrite = std::min(bytesRemaining, m_bufferdata->capacity()-m_bufferLength); + const void* bufAtOffset = (void*)((char*)buf + bytesWritten); // nasty cast as void* doesn't do arithmetic + if (nBytesToWrite == 0) { + BUFLOG( "Wanting to write 0 bytes; why is that?"); + } + rc = m_bufferdata->writeBuffer(bufAtOffset, m_bufferLength, nBytesToWrite, 0); + if (rc < 0) { + BUFLOG( "WriteBuffer step failed: " << rc << " " << m_bufferLength << " " << blen << " " << offset ); + return rc; // pass the error condidition upwards + } + if (rc != (ssize_t)nBytesToWrite) { + BUFLOG( "WriteBuffer returned unexpected number of bytes: " << rc << " Expected: " << nBytesToWrite << " " + << m_bufferLength << " " << blen << " " << offset ); + return -EBADE; // is bad exchange error best errno here? + } + + // lots of repetition here; #TODO try to reduce + m_bufferLength += rc; + bytesWritten += rc; + bytesRemaining -= rc; + + } // while byteRemaining + + /** + * @brief Check again if we can write data into the storage + */ + if (m_bufferLength == m_bufferdata->capacity()){ + rc = flushWriteCache(); + if (rc < 0) + { + return rc; // TODO return correct errors + } + bytesWrittenToStorage += rc; + } // at capacity; + + //BUFLOG( "WriteBuffer " << bytesWritten << " " << bytesWrittenToStorage << " " << offset << " " << blen << " " ); + return bytesWritten; +} + + + +ssize_t XrdCephBufferAlgSimple::flushWriteCache() { + // Set a lock for any attempt at a simultaneous operation + // Use recursive, as write (and read) also calls the lock and don't want to deadlock + const std::lock_guard lock(m_data_mutex); // + // BUFLOG("flushWriteCache: " << m_bufferStartingOffset << " " << m_bufferLength); + ssize_t rc(-1); + if (m_bufferLength == 0) { + BUFLOG("Empty buffer to flush: "); + rc = 0; // not an issue + } + + if (m_bufferLength > 0) { + rc = m_cephio->write(m_bufferStartingOffset, m_bufferLength); + if (rc < 0) { + BUFLOG("WriteBuffer write step failed: " << rc); + } + } // some bytes to write + + // reset values + m_bufferLength=0; + m_bufferStartingOffset=0; + m_bufferdata->invalidate(); + // return bytes written, or errorcode if failure + return rc; +} + + +ssize_t XrdCephBufferAlgSimple::rawRead (void *buf, off_t offset, size_t blen) { + return -ENOSYS; +} + +ssize_t XrdCephBufferAlgSimple::rawWrite(void *buf, off_t offset, size_t blen) { + return -ENOSYS; +} diff --git a/src/XrdCeph/XrdCephBuffers/XrdCephBufferAlgSimple.hh b/src/XrdCeph/XrdCephBuffers/XrdCephBufferAlgSimple.hh new file mode 100644 index 00000000..e96bd401 --- /dev/null +++ b/src/XrdCeph/XrdCephBuffers/XrdCephBufferAlgSimple.hh @@ -0,0 +1,67 @@ +#ifndef __XRD_CEPH_BUFFER_ALG_SIMPLE_HH__ +#define __XRD_CEPH_BUFFER_ALG_SIMPLE_HH__ +//------------------------------------------------------------------------------ +// Implementation of the logic section of buffer code +//------------------------------------------------------------------------------ + +#include +#include +#include + +#include "IXrdCephBufferAlg.hh" +#include "ICephIOAdapter.hh" +#include "BufferUtils.hh" + + +namespace XrdCephBuffer { + +/** Non-async buffering code for non-aio read operations. + * Create a single buffer of a given size. + * For reads, if data in the buffer read and return the available bytes; + * if no useful data in the buffer fill the full buffer and return the requested read. + * If the data is partially in the buffer for the range requested, return only that subset; + * client should check and make an additional call for the data not returned. + * if 0 bytes are returned, it should be assumed it is at the end of the file. + */ + +class XrdCephBufferAlgSimple : public virtual IXrdCephBufferAlg { + public: + XrdCephBufferAlgSimple(std::unique_ptr buffer, std::unique_ptr cephio, int fd, + bool useStriperlessReads = true ); + virtual ~XrdCephBufferAlgSimple(); + + virtual ssize_t read_aio (XrdSfsAio *aoip) override; + virtual ssize_t write_aio(XrdSfsAio *aoip) override; + + + virtual ssize_t read (volatile void *buff, off_t offset, size_t blen) override; + virtual ssize_t write(const void *buff, off_t offset, size_t blen) override; + virtual ssize_t flushWriteCache() override; + + // #REVIEW + virtual const IXrdCephBufferData *buffer() const {return m_bufferdata.get();} + virtual IXrdCephBufferData *buffer() {return m_bufferdata.get();} + + protected: + virtual ssize_t rawRead (void *buff, off_t offset, size_t blen) ; // read from the storage, at its offset + virtual ssize_t rawWrite(void *buff, off_t offset, size_t blen) ; // write to the storage, to its offset posiiton + + private: + std::unique_ptr m_bufferdata; //! this algorithm takes ownership of the buffer, and will delete it on destruction + std::unique_ptr m_cephio ; // no ownership is taken here + int m_fd = -1; + bool m_useStriperlessReads {true}; + + off_t m_bufferStartingOffset = 0; + size_t m_bufferLength = 0; + + std::recursive_mutex m_data_mutex; // any data access method on the buffer will use this + + long m_stats_bytes_fromceph{0}; //! number of bytes requested from ceph, to fill the buffers, etc. + long m_stats_bytes_bypassed{0}; //! number of bytes specifically bypassed + long m_stats_bytes_toclient{0}; //! number of bytes requested by the client +}; + +} + +#endif diff --git a/src/XrdCeph/XrdCephBuffers/XrdCephBufferDataSimple.cc b/src/XrdCeph/XrdCephBuffers/XrdCephBufferDataSimple.cc new file mode 100644 index 00000000..fc9ed443 --- /dev/null +++ b/src/XrdCeph/XrdCephBuffers/XrdCephBufferDataSimple.cc @@ -0,0 +1,174 @@ +//------------------------------------------------------------------------------ +//! is a simple implementation of IXrdCephBufferData using std::vector representation for the buffer +//------------------------------------------------------------------------------ + +#include "XrdCephBufferDataSimple.hh" +#include "BufferUtils.hh" +//#include "XrdCeph/XrdCephBuffers/IXrdCephBufferData.hh" +#include +#include +#include +#include +#include +#include + + +using namespace XrdCephBuffer; + +std::atomic XrdCephBufferDataSimple::m_total_memory_used {0}; //!< total memory of all these buffers +std::atomic XrdCephBufferDataSimple::m_total_memory_nbuffers {0}; //!< total number of buffers actively open + + + +XrdCephBufferDataSimple::XrdCephBufferDataSimple(size_t bufCapacity): + m_bufferSize(bufCapacity), m_buffer(bufCapacity,0), m_externalOffset(0),m_bufLength(0) { + m_valid = true; + + // update global statistics + m_total_memory_used.fetch_add(bufCapacity); + ++m_total_memory_nbuffers; + BUFLOG("XrdCephBufferDataSimple: Global: " << m_total_memory_nbuffers.load() << " " << m_total_memory_used.load()); +} + +XrdCephBufferDataSimple::~XrdCephBufferDataSimple() { + m_valid = false; + // obtain the actual capacity here, as this is the real number of bytes to be released + auto cap = m_buffer.capacity(); + m_buffer.clear(); + m_buffer.reserve(0); // just to be paranoid and realse memory immediately + + // update global statistics + m_total_memory_used.fetch_add(-cap); + --m_total_memory_nbuffers; + BUFLOG("XrdCephBufferDataSimple~: Global: " << m_total_memory_nbuffers.load() << " " << m_total_memory_used.load()); + +} + + +size_t XrdCephBufferDataSimple::capacity() const { + // return defined buffered size, which might in principle be different + // to the actual size of the buffer allocated in memory + return m_bufferSize; +} + +size_t XrdCephBufferDataSimple::length() const { + return m_bufLength; +} +void XrdCephBufferDataSimple::setLength(size_t len) { + m_bufLength = len; +} +bool XrdCephBufferDataSimple::isValid() const { + return m_valid; +} +void XrdCephBufferDataSimple::setValid(bool isValid) { + m_valid = isValid; +} + + +off_t XrdCephBufferDataSimple::startingOffset() const { + return m_externalOffset; +} +off_t XrdCephBufferDataSimple::setStartingOffset(off_t offset) { + m_externalOffset = offset; + return m_externalOffset; +} + +ssize_t XrdCephBufferDataSimple::invalidate() { + m_externalOffset = 0; + m_bufLength = 0; + m_valid = false; + //m_buffer.clear(); // do we really need to clear the elements ? + return 0; +} + + + +ssize_t XrdCephBufferDataSimple::readBuffer(void* buf, off_t offset, size_t blen) const { + // read from the internal buffer to buf (at pos 0), from offset for blen, or max length possible + // returns -ve value on error, else the actual number of bytes read + + if (!m_valid) { + return -EINVAL; + } + if (offset < 0) { + return -EINVAL; + } + if (offset > (ssize_t) m_bufLength) { + return 0; + } + ssize_t readlength = blen; + if (offset + blen > m_bufLength) { + readlength = m_bufLength - offset; + } + //std::cout << readlength << " " << blen << " " << m_bufLength << " " << offset << std::endl; + if (readlength <0) { + return -EINVAL; + } + + if (readlength == 0) { + return 0; + } + + const char* rawbufstart = m_buffer.data(); + + long int_ns{0}; + {auto t = Timer_ns(int_ns); + // std::copy(rawbufstart + offset, rawbufstart+offset+readlength, reinterpret_cast(buf) ); + memcpy(reinterpret_cast(buf), rawbufstart + offset, readlength); + } // end Timer + // BUFLOG("XrdCephBufferDataSimple::readBuffer: " << offset << " " << readlength << " " << int_ns ); + + return readlength; +} + + +ssize_t XrdCephBufferDataSimple::writeBuffer(const void* buf, off_t offset, size_t blen, off_t externalOffset) { + // write data from buf (from pos 0), with length blen, into the buffer at position offset (local to the internal buffer) + + // #TODO Add test to see if it's in use + //invalidate(); + + if (offset < 0) { + BUFLOG("XrdCephBufferDataSimple::writeBuffer: offset <0"); + return -EINVAL; + } + + ssize_t cap = capacity(); + if ((ssize_t)blen > cap) { + BUFLOG("XrdCephBufferDataSimple::writeBuffer: blen > cap:" << blen << " > " << cap); + return -EINVAL; + } + if ((ssize_t)offset > cap) { + BUFLOG("XrdCephBufferDataSimple::writeBuffer: offset > cap:" << offset << " > " << cap); + return -EINVAL; + } + if (ssize_t(offset + blen) > cap) { + BUFLOG("XrdCephBufferDataSimple::writeBuffer: (offset + blen) > cap: (" << offset << " + " << blen << ") >" << cap); + return -EINVAL; + } + + // std::vector::iterator itstart = m_buffer.begin(); + size_t readBytes = blen; + char* rawbufstart = m_buffer.data(); + + + long int_ns{0}; + {auto t = Timer_ns(int_ns); // brace for timer start/stop scoping + //std::copy((char*)buf, (char*)buf +readBytes ,itstart + offset ); + memcpy(rawbufstart + offset, buf, readBytes); + + } // end Timer + + // BUFLOG("XrdCephBufferDataSimple::writeBuffer: " << offset << " " << readBytes << " " << int_ns); + + + + m_externalOffset = externalOffset; + // Decide to set the length of the maximum value that has be written + // note; unless invalidate is called, then this value may not be correctly set ... + m_bufLength = std::max(offset+blen, m_bufLength); + m_valid = true; + + + return readBytes; +} diff --git a/src/XrdCeph/XrdCephBuffers/XrdCephBufferDataSimple.hh b/src/XrdCeph/XrdCephBuffers/XrdCephBufferDataSimple.hh new file mode 100644 index 00000000..ca3fe8cc --- /dev/null +++ b/src/XrdCeph/XrdCephBuffers/XrdCephBufferDataSimple.hh @@ -0,0 +1,67 @@ +#ifndef __XRD_CEPH_BUFFER_DATA_SIMPLE_HH__ +#define __XRD_CEPH_BUFFER_DATA_SIMPLE_HH__ +//------------------------------------------------------------------------------ +//! is a simple implementation of IXrdCephBufferData using std::vector representation for the buffer +//------------------------------------------------------------------------------ + +#include +#include "IXrdCephBufferData.hh" +#include "BufferUtils.hh" +#include +#include +#include + +namespace XrdCephBuffer { + +/** + * @brief Implementation of a buffer using a simple vector + * Simplest implementation of a buffer using vector for underlying memory. + * Capacity is reserved on construction and released back at destruction. + * Does very little itself, except to provide access methods + * + */ +class XrdCephBufferDataSimple : public virtual IXrdCephBufferData + { + public: + XrdCephBufferDataSimple(size_t bufCapacity); + virtual ~XrdCephBufferDataSimple(); + virtual size_t capacity() const override;//! total available space + virtual size_t length() const override;//! Currently occupied and valid space, which may be less than capacity + virtual void setLength(size_t len) override;//! Currently occupied and valid space, which may be less than capacity + virtual bool isValid() const override; + virtual void setValid(bool isValid) override; + + virtual off_t startingOffset() const override; + virtual off_t setStartingOffset(off_t offset) override; + + + virtual ssize_t readBuffer(void* buf, off_t offset, size_t blen) const override; //! copy data from the internal buffer to buf + + virtual ssize_t invalidate() override; //! set cache into an invalid state; do this before writes to be consistent + virtual ssize_t writeBuffer(const void* buf, off_t offset, size_t blen, off_t externalOffset=0) override; //! write data into the buffer, store the external offset if provided + + virtual const void* raw() const override {return capacity() > 0 ? &(m_buffer[0]) : nullptr;} + virtual void* raw() override {return capacity() > 0 ? &(m_buffer[0]) : nullptr;} + + + protected: + size_t m_bufferSize; //! the buffer size + bool m_valid = false; + std::vector m_buffer; // actual physical buffer + off_t m_externalOffset = 0; //! what does the first byte of the buffer map to for external offsets + size_t m_bufLength = 0; //! length of valid stored data; might be less than the capacity + + // timer and counter info + std::atomic< long> m_stats_read_timer{0}, m_stats_write_timer{0}; + std::atomic< long> m_stats_read_bytes{0}, m_stats_write_bytes{0}; + std::atomic< long> m_stats_read_req{0}, m_stats_write_req{0}; + long m_stats_read_longest{0}, m_stats_write_longest{0}; + + // staric vars to store the total useage of memory across this class + static std::atomic m_total_memory_used; + static std::atomic m_total_memory_nbuffers; + +}; // XrdCephBufferDataSimple + +} // namespace +#endif diff --git a/src/XrdCeph/XrdCephBuffers/XrdCephReadVBasic.cc b/src/XrdCeph/XrdCephBuffers/XrdCephReadVBasic.cc new file mode 100644 index 00000000..2cd578db --- /dev/null +++ b/src/XrdCeph/XrdCephBuffers/XrdCephReadVBasic.cc @@ -0,0 +1,65 @@ + +#include "XrdCephReadVBasic.hh" +#include "BufferUtils.hh" + +using namespace XrdCephBuffer; + + +XrdCephReadVBasic::~XrdCephReadVBasic() { + + size_t totalBytes = m_usedBytes + m_wastedBytes; + float goodFrac_pct = totalBytes > 0 ? m_usedBytes/(totalBytes*100.) : 0; + BUFLOG("XrdCephReadVBasic: Summary: " + << " Used: " << m_usedBytes << " Wasted: " << m_wastedBytes << " goodFrac: " + << goodFrac_pct + ); +} + +std::vector XrdCephReadVBasic::convert(const ExtentHolder &extentsHolderInput) +{ + std::vector outputs; + + const ExtentContainer &extentsIn = extentsHolderInput.extents(); + + ExtentContainer::const_iterator it_l = extentsIn.begin(); + ExtentContainer::const_iterator it_r = extentsIn.begin(); + ExtentContainer::const_iterator it_end = extentsIn.end(); + + // Shortcut the process if range is small + if ((it_end->end() - it_l->begin()) <= m_minSize) { + ExtentHolder tmp(extentsIn); + outputs.push_back(tmp); + BUFLOG("XrdCephReadVBasic: Combine all extents: " + << tmp.size() << " " + << it_l->begin() << " " << it_end->end() ); + return outputs; + } + size_t usedBytes(0); + size_t wastedBytes(0); + + // outer loop over extents + while (it_r != it_end) + { + ExtentHolder tmp; + int counter(0); + it_l = it_r; + // inner loop over each internal extent range + while (it_r != it_end) { + if ((it_r->end() - it_l->begin()) > m_maxSize) break; // start a new holder + tmp.push_back(*it_r); // just put it into an extent + ++it_r; + ++counter; + } + outputs.push_back(tmp); + usedBytes += tmp.bytesContained(); + wastedBytes += tmp.bytesMissing(); + } + m_usedBytes += usedBytes; + m_wastedBytes += wastedBytes; + BUFLOG("XrdCephReadVBasic: In size: " << extentsHolderInput.size() << " " + << extentsHolderInput.extents().size() << " " << outputs.size() << " " + << " useful bytes: " << usedBytes << " wasted bytes:" << wastedBytes); + + + return outputs; +} // convert diff --git a/src/XrdCeph/XrdCephBuffers/XrdCephReadVBasic.hh b/src/XrdCeph/XrdCephBuffers/XrdCephReadVBasic.hh new file mode 100644 index 00000000..662b9931 --- /dev/null +++ b/src/XrdCeph/XrdCephBuffers/XrdCephReadVBasic.hh @@ -0,0 +1,48 @@ +#ifndef __IXRD_CEPH_READV_BASIC_HH__ +#define __IXRD_CEPH_READV_BASIC_HH__ +//------------------------------------------------------------------------------ +// Interface to the actual buffer data object used to store the data +// Intention to be able to abstract the underlying implementation and code against the inteface +// e.g. if choice of buffer data object +//------------------------------------------------------------------------------ + +#include +#include + +#include "BufferUtils.hh" +#include "IXrdCephReadVAdapter.hh" + +namespace XrdCephBuffer +{ + + /** + * @brief Combine requests into single reads accoriding to some basic rules. + * Read a minimum amount of data (2MiB default), keep adding chunks until the used fraction is lower than some threshold, or 64MiB is reached. + * Calling code unraveles the correct ranges for each + */ + + + class XrdCephReadVBasic : virtual public IXrdCephReadVAdapter { + // nothing more than readV in, and readV out + public: + XrdCephReadVBasic() {} + virtual ~XrdCephReadVBasic(); + + virtual std::vector convert(const ExtentHolder &extentsHolderInput) override; + + protected: + ssize_t m_minSize = 2*1024*1024; + ssize_t m_maxSize = 16*1024*1024; + + private: + size_t m_usedBytes = 0; + size_t m_wastedBytes = 0; + + + }; + + + +} + +#endif diff --git a/src/XrdCeph/XrdCephBuffers/XrdCephReadVNoOp.cc b/src/XrdCeph/XrdCephBuffers/XrdCephReadVNoOp.cc new file mode 100644 index 00000000..8c5617f8 --- /dev/null +++ b/src/XrdCeph/XrdCephBuffers/XrdCephReadVNoOp.cc @@ -0,0 +1,22 @@ + +#include "XrdCephReadVNoOp.hh" +#include "BufferUtils.hh" + +using namespace XrdCephBuffer; + +std::vector XrdCephReadVNoOp::convert(const ExtentHolder &extentsHolderInput) +{ + std::vector outputs; + + const ExtentContainer &extentsIn = extentsHolderInput.extents(); + + for (ExtentContainer::const_iterator it = extentsIn.begin(); it != extentsIn.end(); ++it) + { + ExtentHolder tmp; + tmp.push_back(*it); + outputs.push_back(tmp); + } // for + // each element in the output contains one element, the + + return outputs; +} // convert diff --git a/src/XrdCeph/XrdCephBuffers/XrdCephReadVNoOp.hh b/src/XrdCeph/XrdCephBuffers/XrdCephReadVNoOp.hh new file mode 100644 index 00000000..9344d51c --- /dev/null +++ b/src/XrdCeph/XrdCephBuffers/XrdCephReadVNoOp.hh @@ -0,0 +1,38 @@ +#ifndef __IXRD_CEPH_READV_NOOP_HH__ +#define __IXRD_CEPH_READV_NOOP_HH__ +//------------------------------------------------------------------------------ +// Interface to the actual buffer data object used to store the data +// Intention to be able to abstract the underlying implementation and code against the inteface +// e.g. if choice of buffer data object +//------------------------------------------------------------------------------ + +#include +#include + +#include "BufferUtils.hh" +#include "IXrdCephReadVAdapter.hh" + +namespace XrdCephBuffer +{ + + /** + * @brief Passthrough implementation. Convertes the ReadV requests to extents and makes the request. + * Does not change how the readV implementation is done, just implements a version with Extents + * More for functionality testing, or to allow easier access to readV statistics. + */ + class XrdCephReadVNoOp : virtual public IXrdCephReadVAdapter { + // nothing more than readV in, and readV out + public: + XrdCephReadVNoOp() {} + virtual ~XrdCephReadVNoOp() {} + + virtual std::vector convert(const ExtentHolder &extentsHolderInput) override; + + protected: + }; + + + +} + +#endif diff --git a/src/XrdCeph/XrdCephOss.cc b/src/XrdCeph/XrdCephOss.cc index 66ccadc2..4bcd3da9 100644 --- a/src/XrdCeph/XrdCephOss.cc +++ b/src/XrdCeph/XrdCephOss.cc @@ -30,6 +30,7 @@ #include "XrdCeph/XrdCephPosix.hh" #include "XrdOuc/XrdOucEnv.hh" #include "XrdSys/XrdSysError.hh" +#include "XrdSys/XrdSysPlatform.hh" #include "XrdOuc/XrdOucTrace.hh" #include "XrdOuc/XrdOucStream.hh" #include "XrdOuc/XrdOucName2Name.hh" @@ -42,6 +43,8 @@ #include "XrdCeph/XrdCephOss.hh" #include "XrdCeph/XrdCephOssDir.hh" #include "XrdCeph/XrdCephOssFile.hh" +#include "XrdCeph/XrdCephOssBufferedFile.hh" +#include "XrdCeph/XrdCephOssReadVFile.hh" XrdVERSIONINFO(XrdOssGetStorageSystem, XrdCephOss); @@ -68,10 +71,6 @@ static void logwrapper(char *format, va_list argp) { /// used in XrdCephPosix extern XrdOucName2Name *g_namelib; -// -// To-do: find the include file defining MAXPATHLEN -// -#define MAXPATHLEN 4096 /// converts a logical filename to physical one if needed void m_translateFileName(std::string &physName, std::string logName){ @@ -223,16 +222,6 @@ int XrdCephOss::Configure(const char *configfn, XrdSysError &Eroute) { } } - if (!strcmp(var, "ceph.reportingpools")) { - var = Config.GetWord(); - if (var) { - m_configPoolnames = var; - } else { - Eroute.Emsg("Config", "Missing value for ceph.reportingpools in config file", configfn); - return 1; - } - } - int pread_flag_set = !strncmp(var, "ceph.usedefaultpreadalg", 24); int readv_flag_set = !strncmp(var, "ceph.usedefaultreadvalg", 24); if (pread_flag_set or readv_flag_set) { @@ -272,8 +261,112 @@ int XrdCephOss::Configure(const char *configfn, XrdSysError &Eroute) { return 1; } } - } + + if (!strncmp(var, "ceph.usebuffer", 14)) { // allowable values: 0, 1 + var = Config.GetWord(); + if (var) { + unsigned long value = strtoul(var, 0, 10); + if (value <= 1) { + m_configBufferEnable = value; + Eroute.Emsg("Config", "ceph.usebuffer",std::to_string(m_configBufferEnable).c_str()); + } else { + Eroute.Emsg("Config", "Invalid value for ceph.usebuffer in config file (must be 0 or 1)", configfn, var); + return 1; + } + } else { + Eroute.Emsg("Config", "Missing value for ceph.usebuffer in config file", configfn); + return 1; + } + } // usebuffer + if (!strncmp(var, "ceph.buffersize", 15)) { // size in bytes + var = Config.GetWord(); + if (var) { + unsigned long value = strtoul(var, 0, 10); + if (value > 0 and value <= 1000000000L) { + m_configBufferSize = value; + Eroute.Emsg("Config", "ceph.buffersize", std::to_string(m_configBufferSize).c_str() ); + } else { + Eroute.Emsg("Config", "Invalid value for ceph.buffersize in config file; enter in bytes (no units)", configfn, var); + return 1; + } + } else { + Eroute.Emsg("Config", "Missing value for ceph.buffersize in config file", configfn); + return 1; + } + } // buffersize + if (!strncmp(var, "ceph.buffermaxpersimul", 22)) { // size in bytes + var = Config.GetWord(); + if (var) { + unsigned long value = strtoul(var, 0, 10); + if (value > 0 and value <= 1000000000L) { + m_configMaxSimulBufferCount = value; + Eroute.Emsg("Config", "ceph.buffermaxpersimul", std::to_string(m_configMaxSimulBufferCount).c_str() ); + } else { + Eroute.Emsg("Config", "Invalid value for ceph.buffermaxpersimul in config file; enter in bytes (no units)", configfn, var); + return 1; + } + } else { + Eroute.Emsg("Config", "Missing value for ceph.buffermaxpersimul in config file", configfn); + return 1; + } + } // buffersize + if (!strncmp(var, "ceph.usereadv", 13)) { // allowable values: 0, 1 + var = Config.GetWord(); + if (var) { + unsigned long value = strtoul(var, 0, 10); + if (value <= 1) { + m_configReadVEnable = value; + Eroute.Emsg("Config", "ceph.usereadvalg",std::to_string(m_configBufferEnable).c_str()); + } else { + Eroute.Emsg("Config", "Invalid value for ceph.usereadv in config file (must be 0 or 1)", configfn, var); + return 1; + } + } else { + Eroute.Emsg("Config", "Missing value for ceph.usereadv in config file", configfn); + return 1; + } + } // usereadv + if (!strncmp(var, "ceph.readvalgname", 17)) { + var = Config.GetWord(); + // Eroute.Emsg("Config", "readvalgname readvalgname readvalgname readvalgname", var); + if (var) { + // Warn in case parameters were givne + char parms[1040]; + if (!Config.GetRest(parms, sizeof(parms)) || parms[0]) { + Eroute.Emsg("Config", "readvalgname parameters will be ignored"); + } + m_configReadVAlgName = var; + } else { + Eroute.Emsg("Config", "Missing value for ceph.readvalgname in config file", configfn); + return 1; + } + } + if (!strncmp(var, "ceph.bufferiomode", 17)) { + var = Config.GetWord(); + if (var) { + // Warn in case parameters were givne + char parms[1040]; + if (!Config.GetRest(parms, sizeof(parms)) || parms[0]) { + Eroute.Emsg("Config", "readvalgname parameters will be ignored"); + } + m_configBufferIOmode = var; // allowed values would be aio, io + } else { + Eroute.Emsg("Config", "Missing value for ceph.bufferiomode in config file", configfn); + return 1; + } + } + + if (!strcmp(var, "ceph.reportingpools")) { + var = Config.GetWord(); + if (var) { + m_configPoolnames = var; + } else { + Eroute.Emsg("Config", "Missing value for ceph.reportingpools in config file", configfn); + return 1; + } + } + } // Now check if any errors occured during file i/o int retc = Config.LastError(); if (retc) { @@ -313,6 +406,32 @@ int XrdCephOss::Rename(const char *from, return -ENOTSUP; } +/** + * + + * @brief Extract a pool name (string before the first colon ':') from an object ID. + * @param (in) possPool the object ID + * @return pool name or unchanged object ID + * + * Implementation: + * Ian Johnson STFC RAL, ian.johnson@stfc.ac.uk, 2022 + * + */ + +std::string extractPool(std::string possPool) { + + std::string pool; + auto colonPos = possPool.find_first_of(':'); + + if (colonPos > 0) { + pool = possPool.substr(0, colonPos); + } else { + pool = possPool; + } + return pool; +} + + /** * * Populate a struct stat* with information on an object ID. @@ -343,14 +462,22 @@ int XrdCephOss::Stat(const char* path, m_translateFileName(spath,path); if (spath.back() == '/') { // Request to stat the root + + #ifdef STAT_TRACE XrdCephEroute.Say(__FUNCTION__, " - fake a return for stat'ing root element '/'"); #endif + + // special case of a stat made by the locate interface // we intend to then list all files memset(buff, 0, sizeof(*buff)); - buff->st_mode = S_IFDIR | 0700; + + buff->st_mode = S_IFDIR|S_IRWXU; + buff->st_dev = 1; + buff->st_ino = 1; + return XrdOssOK; } @@ -372,12 +499,24 @@ int XrdCephOss::Stat(const char* path, return -EINVAL; } + + } else if (ceph_posix_stat(env, path, buff) == 0) { // Found object ID + +#ifdef STAT_TRACE + XrdCephEroute.Say(__FUNCTION__, " - found object ", spath.c_str(), " via ceph_posix_stat"); +#endif + return XrdOssOK; + } else { + #ifdef STAT_TRACE - XrdCephEroute.Say(__FUNCTION__, " passing to ceph_posix_stat... "); + XrdCephEroute.Say(__FUNCTION__, " - cannot find object '", spath.c_str(), "'"); #endif - return ceph_posix_stat(env, path, buff); - } + return -ENOENT; + + } + + } @@ -438,17 +577,26 @@ int formatStatLSResponse(char *buff, int &blen, const char* cgroup, long long to */ -int XrdCephOss::StatLS(XrdOucEnv &env, const char *path, char *buff, int &blen) + +int XrdCephOss::StatLS(XrdOucEnv &env, const char *charPath, char *buff, int &blen) { - XrdCephEroute.Say(__FUNCTION__, " path = ", path); + XrdCephEroute.Say(__FUNCTION__, " incoming path = ", charPath); + + std::string path({charPath}); + path = extractPool(path); std::string spath {path}; + m_translateFileName(spath,path); +// +// Following test is now redundant as we take the substring up to colonPos +// + if (spath.back() == ':') { spath.pop_back(); } if (m_configPoolnames.find(spath) == std::string::npos) { - XrdCephEroute.Say("Can't report on ", path); + XrdCephEroute.Say("Can't report on ", spath.c_str()); return -EINVAL; } @@ -474,7 +622,7 @@ int XrdCephOss::StatLS(XrdOucEnv &env, const char *path, char *buff, int &blen) freeSpace = totalSpace - usedSpace; blen = formatStatLSResponse(buff, blen, - path, /* "oss.cgroup" */ + spath.c_str(), /* "oss.cgroup" */ totalSpace, /* "oss.space" */ usedSpace, /* "oss.used" */ freeSpace, /* "oss.free" */ @@ -512,6 +660,22 @@ XrdOssDF* XrdCephOss::newDir(const char *tident) { } XrdOssDF* XrdCephOss::newFile(const char *tident) { - return new XrdCephOssFile(this); + + // Depending on the configuration settings stack up the underlying + // XrdCephOssFile instance with decorator objects for readV and Buffering requests + + XrdCephOssFile* xrdCephOssDF = new XrdCephOssFile(this); + + if (m_configReadVEnable) { + xrdCephOssDF = new XrdCephOssReadVFile(this,xrdCephOssDF,m_configReadVAlgName); + } + + if (m_configBufferEnable) { + xrdCephOssDF = new XrdCephOssBufferedFile(this,xrdCephOssDF, m_configBufferSize, + m_configBufferIOmode, m_configMaxSimulBufferCount); + } + + + return xrdCephOssDF; } diff --git a/src/XrdCeph/XrdCephOss.hh b/src/XrdCeph/XrdCephOss.hh index 4bcf7a32..a54040cb 100644 --- a/src/XrdCeph/XrdCephOss.hh +++ b/src/XrdCeph/XrdCephOss.hh @@ -78,6 +78,12 @@ public: int m_useDefaultReadvAlg = 0; private: + bool m_configBufferEnable=false; //! config option for buffering + size_t m_configBufferSize=16*1024*1024L; //! Buffer size + std::string m_configBufferIOmode = "aio"; + bool m_configReadVEnable=false; //! enable readV decorator + std::string m_configReadVAlgName="passthrough"; // readV algorithm type + size_t m_configMaxSimulBufferCount=10; //! max number of buffers in a single Oss instance (.e.g simul. reads) std::string m_configPoolnames; }; diff --git a/src/XrdCeph/XrdCephOssBufferedFile.cc b/src/XrdCeph/XrdCephOssBufferedFile.cc new file mode 100644 index 00000000..113cddcf --- /dev/null +++ b/src/XrdCeph/XrdCephOssBufferedFile.cc @@ -0,0 +1,345 @@ +//------------------------------------------------------------------------------ +// Copyright (c) 2014-2015 by European Organization for Nuclear Research (CERN) +// Author: Sebastien Ponce +//------------------------------------------------------------------------------ +// This file is part of the XRootD software suite. +// +// XRootD is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// XRootD is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with XRootD. If not, see . +// +// In applying this licence, CERN does not waive the privileges and immunities +// granted to it by virtue of its status as an Intergovernmental Organization +// or submit itself to any jurisdiction. +//------------------------------------------------------------------------------ + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "XrdCeph/XrdCephPosix.hh" +#include "XrdOuc/XrdOucEnv.hh" +#include "XrdSys/XrdSysError.hh" +#include "XrdOuc/XrdOucTrace.hh" +#include "XrdSfs/XrdSfsAio.hh" +#include "XrdCeph/XrdCephOssFile.hh" + +#include "XrdCeph/XrdCephOssBufferedFile.hh" +#include "XrdCeph/XrdCephBuffers/XrdCephBufferAlgSimple.hh" +#include "XrdCeph/XrdCephBuffers/XrdCephBufferDataSimple.hh" +#include "XrdCeph/XrdCephBuffers/CephIOAdapterRaw.hh" +#include "XrdCeph/XrdCephBuffers/CephIOAdapterAIORaw.hh" + +#include + +using namespace XrdCephBuffer; +using namespace std::chrono_literals; + +extern XrdSysError XrdCephEroute; +extern XrdOucTrace XrdCephTrace; + + +XrdCephOssBufferedFile::XrdCephOssBufferedFile(XrdCephOss *cephoss,XrdCephOssFile *cephossDF, + size_t buffersize,const std::string& bufferIOmode, + size_t maxNumberSimulBuffers): + XrdCephOssFile(cephoss), m_cephoss(cephoss), m_xrdOssDF(cephossDF), + m_maxCountReadBuffers(maxNumberSimulBuffers), + m_maxBufferRetrySleepTime_ms(1000), + m_bufsize(buffersize), + m_bufferIOmode(bufferIOmode) +{ + +} + +XrdCephOssBufferedFile::~XrdCephOssBufferedFile() { + // XrdCephEroute.Say("XrdCephOssBufferedFile::Destructor"); + + // remember to delete the inner XrdCephOssFile object + if (m_xrdOssDF) { + delete m_xrdOssDF; + m_xrdOssDF = nullptr; + } + +} + + +int XrdCephOssBufferedFile::Open(const char *path, int flags, mode_t mode, XrdOucEnv &env) { + + int rc = m_xrdOssDF->Open(path, flags, mode, env); + if (rc < 0) { + return rc; + } + m_fd = m_xrdOssDF->getFileDescriptor(); + BUFLOG("XrdCephOssBufferedFile::Open got fd: " << m_fd << " " << path); + m_flags = flags; // e.g. for write/read knowledge + m_path = path; // good to keep the path for final stats presentation + + + // start the timer + //m_timestart = std::chrono::steady_clock::now(); + m_timestart = std::chrono::system_clock::now(); + // return the file descriptor + return rc; +} + +int XrdCephOssBufferedFile::Close(long long *retsz) { + // if data is still in the buffer and we are writing, make sure to write it + if (m_bufferAlg && (m_flags & (O_WRONLY|O_RDWR)) != 0) { + ssize_t rc = m_bufferAlg->flushWriteCache(); + if (rc < 0) { + LOGCEPH( "XrdCephOssBufferedFile::Close: flush Error fd: " << m_fd << " rc:" << rc ); + // still try to close the file + ssize_t rc2 = m_xrdOssDF->Close(retsz); + if (rc2 < 0) { + LOGCEPH( "XrdCephOssBufferedFile::Close: Close error after flush Error fd: " << m_fd << " rc:" << rc2 ); + } + return rc; // return the original flush error + } else { + LOGCEPH( "XrdCephOssBufferedFile::Close: Flushed data on close fd: " << m_fd << " rc:" << rc ); + } + } // check for write + const std::chrono::time_point now = + std::chrono::system_clock::now(); + const std::time_t t_s = std::chrono::system_clock::to_time_t(m_timestart); + const std::time_t t_c = std::chrono::system_clock::to_time_t(now); + + auto t_dur = std::chrono::duration_cast(now - m_timestart).count(); + + LOGCEPH("XrdCephOssBufferedFile::Summary: {\"fd\":" << m_fd << ", \"Elapsed_time_ms\":" << t_dur + << ", \"path\":\"" << m_path + << "\", read_B:" << m_bytesRead.load() + << ", readV_B:" << m_bytesReadV.load() + << ", readAIO_B:" << m_bytesReadAIO.load() + << ", writeB:" << m_bytesWrite.load() + << ", writeAIO_B:" << m_bytesWriteAIO.load() + << ", startTime:\"" << std::put_time(std::localtime(&t_s), "%F %T") << "\", endTime:\"" + << std::put_time(std::localtime(&t_c), "%F %T") << "\"" + << ", nBuffersRead:" << m_bufferReadAlgs.size() + << "}"); + + return m_xrdOssDF->Close(retsz); +} + + +ssize_t XrdCephOssBufferedFile::ReadV(XrdOucIOVec *readV, int rnum) { + // don't touch readV in the buffering method + ssize_t rc = m_xrdOssDF->ReadV(readV,rnum); + if (rc > 0) m_bytesReadV.fetch_add(rc); + return rc; +} + +ssize_t XrdCephOssBufferedFile::Read(off_t offset, size_t blen) { + return m_xrdOssDF->Read(offset, blen); +} + +ssize_t XrdCephOssBufferedFile::Read(void *buff, off_t offset, size_t blen) { + size_t thread_id = std::hash{}(std::this_thread::get_id()); + + IXrdCephBufferAlg * buffer{nullptr}; + // check for, and create if needed, a buffer + { + // lock in case need to create a new algorithm instance + const std::lock_guard lock(m_buf_mutex); + auto buffer_itr = m_bufferReadAlgs.find(thread_id); + if (buffer_itr == m_bufferReadAlgs.end()) { + // only create a buffer, if we haven't hit the max buffers yet + auto buffer_ptr = createBuffer(); + if (buffer_ptr) { + buffer = buffer_ptr.get(); + m_bufferReadAlgs[thread_id] = std::move(buffer_ptr); + } else { + // if we can't create a buffer, we just have to pass through the read ... + ssize_t rc = m_xrdOssDF->Read(buff, offset, blen); + if (rc >= 0) { + LOGCEPH( "XrdCephOssBufferedFile::Read buffers and read failed with rc: " << rc ); + } + return rc; + } + } else { + buffer = buffer_itr->second.get(); + } + } // scope of lock + + int retry_counter{m_maxBufferRetries}; + ssize_t rc {0}; + while (retry_counter > 0) { + rc = buffer->read(buff, offset, blen); + if (rc != -EBUSY) break; // either worked, or is a real non busy error + LOGCEPH( "XrdCephOssBufferedFile::Read Recieved EBUSY for fd: " << m_fd << " on try: " << (m_maxBufferRetries-retry_counter) << ". Sleeping .. " + << " rc:" << rc << " off:" << offset << " len:" << blen); + std::this_thread::sleep_for(m_maxBufferRetrySleepTime_ms * 1ms); + --retry_counter; + } + if (retry_counter == 0) { + // reach maximum attempts for ebusy retry; fail the job + LOGCEPH( "XrdCephOssBufferedFile::Read Max attempts for fd: " << m_fd << " on try: " << (m_maxBufferRetries-retry_counter) << ". Terminating with -EIO: " + << " rc:" << rc << " off:" << offset << " len:" << blen ); + // set a permanent error code: + rc = -EIO; + } + if (rc >=0) { + m_bytesRead.fetch_add(rc); + } else { + LOGCEPH( "XrdCephOssBufferedFile::Read: Read error fd: " << m_fd << " rc:" << rc << " off:" << offset << " len:" << blen); + } + // LOGCEPH( "XrdCephOssBufferedFile::Read: Read good fd: " << m_fd << " rc:" << rc << " off:" << offset << " len:" << blen); + return rc; +} + +int XrdCephOssBufferedFile::Read(XrdSfsAio *aiop) { + size_t thread_id = std::hash{}(std::this_thread::get_id()); + IXrdCephBufferAlg * buffer{nullptr}; + // check for, and create if needed, a buffer + { + // lock in case need to create a new algorithm instance + const std::lock_guard lock(m_buf_mutex); + auto buffer_itr = m_bufferReadAlgs.find(thread_id); + if (buffer_itr == m_bufferReadAlgs.end()) { + m_bufferReadAlgs[thread_id] = createBuffer(); + buffer = m_bufferReadAlgs.find(thread_id)->second.get(); + } else { + buffer = buffer_itr->second.get(); + } + } + + // LOGCEPH("XrdCephOssBufferedFile::AIOREAD: fd: " << m_xrdOssDF->getFileDescriptor() << " " << time(nullptr) << " : " + // << aiop->sfsAio.aio_offset << " " + // << aiop->sfsAio.aio_nbytes << " " << aiop->sfsAio.aio_reqprio << " " + // << aiop->sfsAio.aio_fildes ); + ssize_t rc = buffer->read_aio(aiop); + if (rc > 0) { + m_bytesReadAIO.fetch_add(rc); + } else { + LOGCEPH( "XrdCephOssBufferedFile::Read: ReadAIO error fd: " << m_fd << " rc:" << rc + << " off:" << aiop->sfsAio.aio_offset << " len:" << aiop->sfsAio.aio_nbytes ); + } + return rc; +} + +ssize_t XrdCephOssBufferedFile::ReadRaw(void *buff, off_t offset, size_t blen) { + // #TODO; ReadRaw should bypass the buffer ? + return m_xrdOssDF->ReadRaw(buff, offset, blen); +} + +int XrdCephOssBufferedFile::Fstat(struct stat *buff) { + return m_xrdOssDF->Fstat(buff); +} + +ssize_t XrdCephOssBufferedFile::Write(const void *buff, off_t offset, size_t blen) { + + if (!m_bufferAlg) { + m_bufferAlg = createBuffer(); + if (!m_bufferAlg) { + LOGCEPH( "XrdCephOssBufferedFile: Error in creating buffered object"); + return -EINVAL; + } + } + + + int retry_counter{m_maxBufferRetries}; + ssize_t rc {0}; + while (retry_counter > 0) { + rc = m_bufferAlg->write(buff, offset, blen); + if (rc != -EBUSY) break; // either worked, or is a real non busy error + LOGCEPH( "XrdCephOssBufferedFile::Write Recieved EBUSY for fd: " << m_fd << " on try: " << (m_maxBufferRetries-retry_counter) << ". Sleeping .. " + << " rc:" << rc << " off:" << offset << " len:" << blen); + std::this_thread::sleep_for(m_maxBufferRetrySleepTime_ms * 1ms); + --retry_counter; + } + if (retry_counter == 0) { + // reach maximum attempts for ebusy retry; fail the job + LOGCEPH( "XrdCephOssBufferedFile::Write Max attempts for fd: " << m_fd << " on try: " << (m_maxBufferRetries-retry_counter) << ". Terminating with -EIO: " + << " rc:" << rc << " off:" << offset << " len:" << blen ); + // set a permanent error code: + rc = -EIO; + } + if (rc >=0) { + m_bytesWrite.fetch_add(rc); + } else { + LOGCEPH( "XrdCephOssBufferedFile::Write: Write error fd: " << m_fd << " rc:" << rc << " off:" << offset << " len:" << blen); + } + return rc; +} + +int XrdCephOssBufferedFile::Write(XrdSfsAio *aiop) { + if (!m_bufferAlg) { + m_bufferAlg = createBuffer(); + if (!m_bufferAlg) { + LOGCEPH( "XrdCephOssBufferedFile: Error in creating buffered object"); + return -EINVAL; + } + } + + // LOGCEPH("XrdCephOssBufferedFile::AIOWRITE: fd: " << m_xrdOssDF->getFileDescriptor() << " " << time(nullptr) << " : " + // << aiop->sfsAio.aio_offset << " " + // << aiop->sfsAio.aio_nbytes << " " << aiop->sfsAio.aio_reqprio << " " + // << aiop->sfsAio.aio_fildes << " " ); + ssize_t rc = m_bufferAlg->write_aio(aiop); + if (rc > 0) { + m_bytesWriteAIO.fetch_add(rc); + } else { + LOGCEPH( "XrdCephOssBufferedFile::Write: WriteAIO error fd: " << m_fd << " rc:" << rc + << " off:" << aiop->sfsAio.aio_offset << " len:" << aiop->sfsAio.aio_nbytes ); + } + return rc; + +} + +int XrdCephOssBufferedFile::Fsync() { + return m_xrdOssDF->Fsync(); +} + +int XrdCephOssBufferedFile::Ftruncate(unsigned long long len) { + return m_xrdOssDF->Ftruncate(len); +} + + +std::unique_ptr XrdCephOssBufferedFile::createBuffer() { + std::unique_ptr bufferAlg; + + size_t bufferSize {m_bufsize}; // create buffer of default size + if (m_bufferReadAlgs.size() >= m_maxCountReadBuffers) { + BUFLOG("XrdCephOssBufferedFile: buffer reached max number of simul-buffers for this file: creating only 1MiB buffer" ); + bufferSize = 1048576; + } else { + BUFLOG("XrdCephOssBufferedFile: buffer: got " << m_bufferReadAlgs.size() << " buffers already"); + } + + try { + std::unique_ptr cephbuffer = std::unique_ptr(new XrdCephBufferDataSimple(bufferSize)); + std::unique_ptr cephio; + if (m_bufferIOmode == "aio") { + cephio = std::unique_ptr(new CephIOAdapterAIORaw(cephbuffer.get(),m_fd)); + } else if (m_bufferIOmode == "io") { + cephio = std::unique_ptr(new CephIOAdapterRaw(cephbuffer.get(),m_fd, + !m_cephoss->m_useDefaultPreadAlg)); + } else { + BUFLOG("XrdCephOssBufferedFile: buffer mode needs to be one of aio|io " ); + m_xrdOssDF->Close(); + return bufferAlg; // invalid instance; + } + + LOGCEPH( "XrdCephOssBufferedFile::Open: fd: " << m_fd << " Buffer created: " << cephbuffer->capacity() ); + bufferAlg = std::unique_ptr(new XrdCephBufferAlgSimple(std::move(cephbuffer),std::move(cephio),m_fd) ); + } catch (const std::bad_alloc &e) { + BUFLOG("XrdCephOssBufferedFile: Bad memory allocation in buffer: " << e.what() ); + } + + return bufferAlg; + } diff --git a/src/XrdCeph/XrdCephOssBufferedFile.hh b/src/XrdCeph/XrdCephOssBufferedFile.hh new file mode 100644 index 00000000..7371271d --- /dev/null +++ b/src/XrdCeph/XrdCephOssBufferedFile.hh @@ -0,0 +1,95 @@ +//------------------------------------------------------------------------------ +// Copyright (c) 2014-2015 by European Organization for Nuclear Research (CERN) +// Author: Sebastien Ponce +//------------------------------------------------------------------------------ +// This file is part of the XRootD software suite. +// +// XRootD is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// XRootD is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with XRootD. If not, see . +// +// In applying this licence, CERN does not waive the privileges and immunities +// granted to it by virtue of its status as an Intergovernmental Organization +// or submit itself to any jurisdiction. +//------------------------------------------------------------------------------ + +#ifndef __XRD_CEPH_OSS_BUFFERED_FILE_HH__ +#define __XRD_CEPH_OSS_BUFFERED_FILE_HH__ + +#include "XrdOss/XrdOss.hh" +#include "XrdCeph/XrdCephOss.hh" +#include "XrdCeph/XrdCephOssFile.hh" + +#include "XrdCeph/XrdCephBuffers/IXrdCephBufferData.hh" +#include "XrdCeph/XrdCephBuffers/IXrdCephBufferAlg.hh" +#include "XrdCeph/XrdCephBuffers/IXrdCephReadVAdapter.hh" + +#include +#include +#include +#include +#include + +//------------------------------------------------------------------------------ +//! Decorator class XrdCephOssBufferedFile designed to wrap XrdCephOssFile +//! Functionality for buffered access to/from data in Ceph to avoid inefficient +//! small reads / writes from the client side +//------------------------------------------------------------------------------ + +class XrdCephOssBufferedFile : virtual public XrdCephOssFile { // XrdOssDF + +public: + XrdCephOssBufferedFile(XrdCephOss *cephoss,XrdCephOssFile *cephossDF, size_t buffersize, + const std::string& bufferIOmode, + size_t maxNumberSimulBuffers); + //explicit XrdCephOssBufferedFile(size_t buffersize); + virtual ~XrdCephOssBufferedFile(); + virtual int Open(const char *path, int flags, mode_t mode, XrdOucEnv &env); + virtual int Close(long long *retsz=0); + virtual ssize_t Read(off_t offset, size_t blen); + virtual ssize_t Read(void *buff, off_t offset, size_t blen); + virtual int Read(XrdSfsAio *aoip); + virtual ssize_t ReadV(XrdOucIOVec *readV, int rdvcnt); + virtual ssize_t ReadRaw(void *, off_t, size_t); + virtual int Fstat(struct stat *buff); + virtual ssize_t Write(const void *buff, off_t offset, size_t blen); + virtual int Write(XrdSfsAio *aiop); + virtual int Fsync(void); + virtual int Ftruncate(unsigned long long); + +protected: + std::unique_ptr createBuffer(); /// create a new instance of the buffer + + XrdCephOss *m_cephoss = nullptr; + XrdCephOssFile * m_xrdOssDF = nullptr; // holder of the XrdCephOssFile instance + std::unique_ptr m_bufferAlg; + std::map > m_bufferReadAlgs; + std::mutex m_buf_mutex; //! any data access method on the buffer will use this + size_t m_maxCountReadBuffers {10}; //! set the maximum of buffers to open on a single instance (e.g. for simultaneous file reads) + + + int m_maxBufferRetries {5}; //! How many times to retry a ready from a buffer with EBUSY errors + int m_maxBufferRetrySleepTime_ms; //! number of ms to sleep if a retry is requested + + int m_flags = 0; + size_t m_bufsize = 16*1024*1024L; // default 16MiB size + std::string m_bufferIOmode; + std::string m_path; + std::chrono::time_point m_timestart; + std::atomic m_bytesRead = {0}; /// number of bytes read or written + std::atomic m_bytesReadV = {0}; /// number of bytes read or written + std::atomic m_bytesReadAIO = {0}; /// number of bytes read or written + std::atomic m_bytesWrite = {0}; /// number of bytes read or written + std::atomic m_bytesWriteAIO= {0}; /// number of bytes read or written +}; + +#endif /* __XRD_CEPH_OSS_BUFFERED_FILE_HH__ */ diff --git a/src/XrdCeph/XrdCephOssFile.hh b/src/XrdCeph/XrdCephOssFile.hh index 5c716b56..999cfcfd 100644 --- a/src/XrdCeph/XrdCephOssFile.hh +++ b/src/XrdCeph/XrdCephOssFile.hh @@ -49,11 +49,11 @@ //! In case one of the two only has a default, it will be applied for both plugins. //------------------------------------------------------------------------------ -class XrdCephOssFile : public XrdOssDF { +class XrdCephOssFile : virtual public XrdOssDF { public: - XrdCephOssFile(XrdCephOss *cephoss); + explicit XrdCephOssFile(XrdCephOss *cephoss); virtual ~XrdCephOssFile() {}; virtual int Open(const char *path, int flags, mode_t mode, XrdOucEnv &env); virtual int Close(long long *retsz=0); @@ -68,7 +68,8 @@ public: virtual int Fsync(void); virtual int Ftruncate(unsigned long long); -private: + inline virtual int getFileDescriptor() const {return m_fd;} +protected: int m_fd; XrdCephOss *m_cephOss; diff --git a/src/XrdCeph/XrdCephOssReadVFile.cc b/src/XrdCeph/XrdCephOssReadVFile.cc new file mode 100644 index 00000000..4160a4ea --- /dev/null +++ b/src/XrdCeph/XrdCephOssReadVFile.cc @@ -0,0 +1,215 @@ +//------------------------------------------------------------------------------ +// Copyright (c) 2014-2015 by European Organization for Nuclear Research (CERN) +// Author: Sebastien Ponce +//------------------------------------------------------------------------------ +// This file is part of the XRootD software suite. +// +// XRootD is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// XRootD is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with XRootD. If not, see . +// +// In applying this licence, CERN does not waive the privileges and immunities +// granted to it by virtue of its status as an Intergovernmental Organization +// or submit itself to any jurisdiction. +//------------------------------------------------------------------------------ + +#include +#include +#include +#include +#include +#include +#include + +#include "XrdCeph/XrdCephPosix.hh" +#include "XrdOuc/XrdOucEnv.hh" +#include "XrdSys/XrdSysError.hh" +#include "XrdOuc/XrdOucTrace.hh" +#include "XrdSfs/XrdSfsAio.hh" +#include "XrdCeph/XrdCephOssFile.hh" + +#include "XrdCeph/XrdCephOssReadVFile.hh" +#include "XrdCeph/XrdCephBuffers/XrdCephReadVBasic.hh" +#include "XrdCeph/XrdCephBuffers/XrdCephReadVNoOp.hh" + +using namespace XrdCephBuffer; + +extern XrdSysError XrdCephEroute; +extern XrdOucTrace XrdCephTrace; + +XrdCephOssReadVFile::XrdCephOssReadVFile(XrdCephOss *cephoss,XrdCephOssFile *cephossDF,const std::string& algname): +XrdCephOssFile(cephoss), m_cephoss(cephoss), m_xrdOssDF(cephossDF),m_algname(algname) +{ + if (!m_xrdOssDF) XrdCephEroute.Say("XrdCephOssReadVFile::Null m_xrdOssDF"); + + if (m_algname == "passthrough") { // #TODO consider to use a factory method. but this is simple enough for now + m_readVAdapter = std::unique_ptr(new XrdCephBuffer::XrdCephReadVNoOp()); + } else if (m_algname == "basic") { + m_readVAdapter = std::unique_ptr(new XrdCephBuffer::XrdCephReadVBasic()); + } else { + XrdCephEroute.Say("XrdCephOssReadVFile::ERROR Invalid ReadV algorthm passed; defaulting to passthrough"); + m_algname = "passthrough"; + m_readVAdapter = std::unique_ptr(new XrdCephBuffer::XrdCephReadVNoOp()); + } + LOGCEPH("XrdCephOssReadVFile Algorithm type: " << m_algname); +} + +XrdCephOssReadVFile::~XrdCephOssReadVFile() { + if (m_xrdOssDF) { + delete m_xrdOssDF; + m_xrdOssDF = nullptr; + } + +} + +int XrdCephOssReadVFile::Open(const char *path, int flags, mode_t mode, XrdOucEnv &env) { + int rc = m_xrdOssDF->Open(path, flags, mode, env); + if (rc < 0) { + return rc; + } + m_fd = m_xrdOssDF->getFileDescriptor(); + LOGCEPH("XrdCephOssReadVFile::Open: fd: " << m_fd << " " << path ); + return rc; +} + +int XrdCephOssReadVFile::Close(long long *retsz) { + LOGCEPH("XrdCephOssReadVFile::Close: retsz: " << retsz << " Time_ceph_s: " << m_timer_read_ns.load()*1e-9 << " count: " + << m_timer_count.load() << " size_B: " << m_timer_size.load() + << " longest_s:" << m_timer_longest.load()*1e-9); + + return m_xrdOssDF->Close(retsz); +} + + +ssize_t XrdCephOssReadVFile::ReadV(XrdOucIOVec *readV, int rnum) { + int fd = m_xrdOssDF->getFileDescriptor(); + LOGCEPH("XrdCephOssReadVFile::ReadV: fd: " << fd << " " << rnum ); + + std::stringstream msg_extents; + msg_extents << "XrdCephOssReadVFile::Extentslist={\"fd\": " << fd << ", \"EXTENTS\":["; + + ExtentHolder extents(rnum); + for (int i = 0; i < rnum; i++) { + extents.push_back(Extent(readV[i].offset, readV[i].size)); + msg_extents << "[" << readV[i].offset << "," << readV[i].size << "]," ; + } + msg_extents << "]}"; + //XrdCephEroute.Say(msg_extents.str().c_str()); msg_extents.clear(); + if (m_extraLogging) { + // improve this so no wasted calls if logging is disabled + LOGCEPH(msg_extents.str()); + msg_extents.clear(); + } + + LOGCEPH("XrdCephOssReadVFile::Extents: fd: "<< fd << " " << extents.size() << " " << extents.len() << " " + << extents.begin() << " " << extents.end() << " " << extents.bytesContained() + << " " << extents.bytesMissing()); + + // take the input set of extents and return a vector of merged extents (covering the range to read) + std::vector mappedExtents = m_readVAdapter->convert(extents); + + + // counter is the iterator to the original readV elements, and is incremented for each chunk that's returned + int nbytes = 0, curCount = 0, counter(0); + size_t totalBytesRead(0), totalBytesUseful(0); + + // extract the largest range of the extents, and create a buffer. + size_t buffersize{0}; + for (std::vector::const_iterator ehit = mappedExtents.cbegin(); ehit!= mappedExtents.cend(); ++ehit ) { + buffersize = std::max(buffersize, ehit->len()); + } + std::vector buffer; + buffer.reserve(buffersize); + + + //LOGCEPH("mappedExtents: len: " << mappedExtents.size() ); + for (std::vector::const_iterator ehit = mappedExtents.cbegin(); ehit!= mappedExtents.cend(); ++ehit ) { + off_t off = ehit->begin(); + size_t len = ehit->len(); + + //LOGCEPH("outerloop: " << off << " " << len << " " << ehit->end() << " " << " " << ehit->size() ); + + // read the full extent into the buffer + long timed_read_ns{0}; + {Timer_ns ts(timed_read_ns); + curCount = m_xrdOssDF->Read(buffer.data(), off, len); + } // timer scope + ++m_timer_count; + auto l = m_timer_longest.load(); + m_timer_longest.store(max(l,timed_read_ns)); // doesn't quite prevent race conditions + m_timer_read_ns.fetch_add(timed_read_ns); + m_timer_size.fetch_add(curCount); + + // check that the correct amount of data was read. + // std:: clog << "buf Read " << curCount << std::endl; + if (curCount != (ssize_t)len) { + return (curCount < 0 ? curCount : -ESPIPE); + } + totalBytesRead += curCount; + totalBytesUseful += ehit->bytesContained(); + + + // now read out into the original readV requests for each of the held inner extents + const char* data = buffer.data(); + const ExtentContainer& innerExtents = ehit->extents(); + for (ExtentContainer::const_iterator it = innerExtents.cbegin(); it != innerExtents.cend(); ++it) { + off_t innerBegin = it->begin() - off; + off_t innerEnd = it->end() - off; + //LOGCEPH( "innerloop: " << innerBegin << " " << innerEnd << " " << off << " " + // << it->begin() << " " << it-> end() << " " + // << readV[counter].offset << " " << readV[counter].size); + std::copy(data+innerBegin, data+innerEnd, readV[counter].data ); + nbytes += it->len(); + ++counter; // next element + } // inner extents + + } // outer extents + LOGCEPH( "readV returning " << nbytes << " bytes: " << "Read: " <Read(offset,blen); +} + +ssize_t XrdCephOssReadVFile::Read(void *buff, off_t offset, size_t blen) { + return m_xrdOssDF->Read(buff,offset,blen); +} + +int XrdCephOssReadVFile::Read(XrdSfsAio *aiop) { + return m_xrdOssDF->Read(aiop); +} + +ssize_t XrdCephOssReadVFile::ReadRaw(void *buff, off_t offset, size_t blen) { + return m_xrdOssDF->ReadRaw(buff, offset, blen); +} + +int XrdCephOssReadVFile::Fstat(struct stat *buff) { + return m_xrdOssDF->Fstat(buff); +} + +ssize_t XrdCephOssReadVFile::Write(const void *buff, off_t offset, size_t blen) { + return m_xrdOssDF->Write(buff,offset,blen); +} + +int XrdCephOssReadVFile::Write(XrdSfsAio *aiop) { + return m_xrdOssDF->Write(aiop); +} + +int XrdCephOssReadVFile::Fsync() { + return m_xrdOssDF->Fsync(); +} + +int XrdCephOssReadVFile::Ftruncate(unsigned long long len) { + return m_xrdOssDF->Ftruncate(len); +} diff --git a/src/XrdCeph/XrdCephOssReadVFile.hh b/src/XrdCeph/XrdCephOssReadVFile.hh new file mode 100644 index 00000000..a27a439a --- /dev/null +++ b/src/XrdCeph/XrdCephOssReadVFile.hh @@ -0,0 +1,91 @@ +//------------------------------------------------------------------------------ +// Copyright (c) 2014-2015 by European Organization for Nuclear Research (CERN) +// Author: Sebastien Ponce +//------------------------------------------------------------------------------ +// This file is part of the XRootD software suite. +// +// XRootD is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// XRootD is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with XRootD. If not, see . +// +// In applying this licence, CERN does not waive the privileges and immunities +// granted to it by virtue of its status as an Intergovernmental Organization +// or submit itself to any jurisdiction. +//------------------------------------------------------------------------------ + +#ifndef __XRD_CEPH_OSS_READV_FILE_HH__ +#define __XRD_CEPH_OSS_READV_FILE_HH__ + +#include "XrdOss/XrdOss.hh" +#include "XrdCeph/XrdCephOss.hh" +#include "XrdCeph/XrdCephOssFile.hh" + +#include "XrdCeph/XrdCephBuffers/IXrdCephBufferData.hh" +#include "XrdCeph/XrdCephBuffers/IXrdCephBufferAlg.hh" +#include "XrdCeph/XrdCephBuffers/IXrdCephReadVAdapter.hh" + +#include + + +//------------------------------------------------------------------------------ +//! Decorator class XrdCephOssReadVFile designed to wrap XrdCephOssFile +//! Functionality for ReadV access to/from data in Ceph to avoid inefficient +//! small reads / writes from the client side. +//! Initially for monitoring purposes +//------------------------------------------------------------------------------ + +class XrdCephOssReadVFile : virtual public XrdCephOssFile { + +public: + + explicit XrdCephOssReadVFile(XrdCephOss *cephoss, XrdCephOssFile *cephossDF,const std::string& algname); + virtual ~XrdCephOssReadVFile(); + virtual int Open(const char *path, int flags, mode_t mode, XrdOucEnv &env); + virtual int Close(long long *retsz=0); + +//----------------------------------------------------------------------------- +//! Read file bytes as directed by the read vector. +//! +//! @param readV pointer to the array of read requests. +//! @param rdvcnt the number of elements in readV. +//! +//! @return >=0 The numbe of bytes read. +//! @return < 0 -errno or -osserr upon failure (see XrdOssError.hh). +//----------------------------------------------------------------------------- + virtual ssize_t ReadV(XrdOucIOVec *readV, int rdvcnt); + + virtual ssize_t Read(off_t offset, size_t blen); + virtual ssize_t Read(void *buff, off_t offset, size_t blen); + virtual int Read(XrdSfsAio *aoip); + virtual ssize_t ReadRaw(void *, off_t, size_t); + virtual int Fstat(struct stat *buff); + virtual ssize_t Write(const void *buff, off_t offset, size_t blen); + virtual int Write(XrdSfsAio *aiop); + virtual int Fsync(void); + virtual int Ftruncate(unsigned long long); + +protected: + XrdCephOss *m_cephoss = nullptr; + XrdCephOssFile * m_xrdOssDF = nullptr; // holder of the XrdCephOssFile instance + bool m_extraLogging = true; // use verbose logging + std::string m_algname = "passthrough"; + std::unique_ptr m_readVAdapter; + + std::atomic m_timer_read_ns {0}; //! timer for the reads against ceph + std::atomic m_timer_count {0}; //! number of reads + std::atomic m_timer_size {0}; //! number of reads + std::atomic m_timer_longest {0}; //! size read in bytes + + +}; + +#endif /* __XRD_CEPH_OSS_READV_FILE_HH__ */ diff --git a/src/XrdCeph/XrdCephPosix.cc b/src/XrdCeph/XrdCephPosix.cc index 975193a8..910505a6 100644 --- a/src/XrdCeph/XrdCephPosix.cc +++ b/src/XrdCeph/XrdCephPosix.cc @@ -51,9 +51,10 @@ #include "XrdSys/XrdSysPlatform.hh" #include #include "XrdOuc/XrdOucIOVec.hh" - #include "XrdCeph/XrdCephPosix.hh" #include "XrdCeph/XrdCephBulkAioRead.hh" +#include "XrdSfs/XrdSfsFlags.hh" // for the OFFLINE flag status + /// small struct for directory listing struct DirIterator { @@ -107,6 +108,9 @@ XrdSysMutex g_fd_mutex; /// mutex protecting initialization of ceph clusters XrdSysMutex g_init_mutex; +//JW Counter for number of times a given cluster is resolved. +std::map g_idxCntr; + /// Accessor to next ceph pool index /// Note that this is not thread safe, but we do not care /// as we only want a rough load balancing @@ -130,6 +134,8 @@ unsigned int getCephPoolIdxAndIncrease() { nextValue = 0; } g_cephPoolIdx = nextValue; + // JW logging of accesses: + ++g_idxCntr[res]; return res; } @@ -231,6 +237,32 @@ static unsigned int stoui(const std::string &s) { } + +void dumpClusterInfo() { + //JW + // log the current state of the cluster: + // don't want to lock here, so the numbers may not be 100% self-consistent + int n_cluster = g_cluster.size(); + int n_ioCtx = g_ioCtx.size(); + int n_filesOpenForWrite = g_filesOpenForWrite.size(); + int n_fds = g_fds.size(); + int n_stripers = g_radosStripers.size(); + int n_stripers_pool = 0; + for (size_t i = 0; i < g_radosStripers.size(); ++i) { + n_stripers_pool += g_radosStripers.at(i).size(); + } + std::stringstream ss; + ss << "Counts: " << n_cluster << " " << n_ioCtx << " " << n_filesOpenForWrite << " " + << n_fds << " " << n_stripers << " " << n_stripers_pool << " " << n_stripers_pool + << " CountsbyCluster: ["; + for (const auto& el : g_idxCntr) { + ss << el.first << ":" << el.second << ", " ; + } // it + ss<< "], "; + + logwrapper((char*)"dumpClusterInfo : %s", ss.str().c_str()); +} + /// fills the userId of a ceph file struct from a string and an environment /// returns position of first character after the userId static int fillCephUserId(const std::string ¶ms, XrdOucEnv *env, CephFile &file) { @@ -636,12 +668,12 @@ int ceph_posix_open(XrdOucEnv* env, const char *pathname, int flags, mode_t mode struct stat buf; libradosstriper::RadosStriper *striper = getRadosStriper(fr); //Get a handle to the RADOS striper API - if (NULL == striper) { logwrapper((char*)"Cannot create striper"); return -EINVAL; } - + dumpClusterInfo(); // JW enhanced logging + int rc = striper->stat(fr.name, (uint64_t*)&(buf.st_size), &(buf.st_atime)); //Get details about a file @@ -652,6 +684,49 @@ int ceph_posix_open(XrdOucEnv* env, const char *pathname, int flags, mode_t mode if ((flags&O_ACCMODE) == O_RDONLY) { // Access mode is READ if (fileExists) { + librados::bufferlist d_stripeUnit; + librados::bufferlist d_objectSize; + std::string obj_name; + librados::IoCtx *context = getIoCtx(fr); + + // read first stripe of the object for xattr stripe unit and object size + // this will fail if the object was not written in stripes e.g. s3 + // TBD: fallback to direct object (no stripe id appends to filename, + // replace striper metadata with corresponding metadata) + // + try { + obj_name = fr.name + std::string(".0000000000000000"); + } catch (std::bad_alloc&) { + logwrapper((char*)"Can not create object string for file %s)", fr.name.c_str()); + return -ENOMEM; + } + int ret = 0; + ret = context->getxattr(obj_name, "striper.layout.stripe_unit", d_stripeUnit); + ret = std::min(ret,context->getxattr(obj_name, "striper.layout.object_size", d_objectSize)); + //log_func((char*)"size xattr for %s , %llu ,%llu", file_ref->name.c_str(), file_ref->objectSize, file_ref->stripeUnit ); + if (ret<=0){ + logwrapper((char*)"Could not find size or stripe_unit xattr for %s", fr.name.c_str()); + } + else{ + //librados's c_str() method does not return a NULL-terminated string, hence why we need to cleanup here + char cleanStripeUnit[MAXDIGITSIZE]; + char cleanObjectSize[MAXDIGITSIZE]; + unsigned int stripeUnitLength = std::min((unsigned int)MAXDIGITSIZE-1, d_stripeUnit.length()); + unsigned int objectSizeLength = std::min((unsigned int)MAXDIGITSIZE-1, d_objectSize.length()); + (void)strncpy( cleanStripeUnit, d_stripeUnit.c_str(), stripeUnitLength ); + (void)strncpy( cleanObjectSize, d_objectSize.c_str(), objectSizeLength ); + cleanStripeUnit[stripeUnitLength] = '\0'; + cleanObjectSize[objectSizeLength] = '\0'; + //only change defaults if different + if(fr.stripeUnit != std::stoull(cleanStripeUnit)){ + logwrapper((char*)"WARNING: stripe unit of %s does not match defaults. object size is %s", pathname, cleanStripeUnit); + fr.stripeUnit = std::stoull(cleanStripeUnit); + } + if(fr.objectSize != std::stoull(cleanObjectSize)){ + logwrapper((char*)"WARNING: object size of %s does not match defaults. object size is %s",pathname, cleanObjectSize); + fr.objectSize = std::stoull(cleanObjectSize); + } + } int fd = insertFileRef(fr); logwrapper((char*)"File descriptor %d associated to file %s opened in read mode", fd, pathname); return fd; @@ -1022,7 +1097,7 @@ ssize_t ceph_posix_nonstriper_pread(int fd, void *buf, size_t count, off64_t off XrdSysMutexHelper lock(fr->statsMutex); fr->rdcount++; } else { - logwrapper( (char*)"Error while read\n"); + logwrapper( (char*)"Error while read: %d\n", bytes_read); } return bytes_read; } catch (std::bad_alloc&) { @@ -1057,6 +1132,26 @@ ssize_t ceph_posix_pread(int fd, void *buf, size_t count, off64_t offset) { } } +ssize_t ceph_posix_maybestriper_pread(int fd, void *buf, size_t count, off64_t offset, bool allowStriper) { + ssize_t rc {0}; + if (!allowStriper) { + rc = ceph_posix_pread(fd,buf,count,offset); + return rc; + } + rc = ceph_posix_nonstriper_pread(fd, buf, count,offset); + if (-ENOENT == rc || -ENOTSUP == rc) { + //This might be a sparse file or nbstripes > 1, so let's try striper read + rc = ceph_posix_pread(fd, buf, count,offset); + if (rc >= 0) { + char err_str[100]; //99 symbols should be enough for the short message + snprintf(err_str, 100, "WARNING! The file (fd %d) seem to be sparse, this is not expected", fd); + logwrapper(err_str); + } + } + return rc; +} + + static void ceph_aio_read_complete(rados_completion_t c, void *arg) { AioArgs *awa = reinterpret_cast(arg); size_t rc = rados_aio_get_return_value(c); @@ -1447,6 +1542,9 @@ int ceph_posix_truncate(XrdOucEnv* env, const char *pathname, unsigned long long int ceph_posix_unlink(XrdOucEnv* env, const char *pathname) { logwrapper((char*)"ceph_posix_unlink : %s", pathname); + // start the timer + auto timer_start = std::chrono::steady_clock::now(); + // minimal stat : only size and times are filled CephFile file = getCephFile(pathname, env); libradosstriper::RadosStriper *striper = getRadosStriper(file); @@ -1454,7 +1552,15 @@ int ceph_posix_unlink(XrdOucEnv* env, const char *pathname) { return -EINVAL; } int rc = striper->remove(file.name); + auto end = std::chrono::steady_clock::now(); + auto deltime_ms = std::chrono::duration_cast(end - timer_start).count(); + + if (rc == 0) { + logwrapper((char*)"ceph_posix_unlink : %s unlink successful: %d ms", pathname, deltime_ms); + return 0; + } if (rc != -EBUSY) { + logwrapper((char*)"ceph_posix_unlink : %s unlink failed: %d ms; return code %d", pathname, deltime_ms, rc); return rc; } // if EBUSY returned, assume the file is locked; so try to remove the lock @@ -1469,10 +1575,13 @@ int ceph_posix_unlink(XrdOucEnv* env, const char *pathname) { // now try to remove again rc = striper->remove(file.name); + end = std::chrono::steady_clock::now(); + deltime_ms = std::chrono::duration_cast(end - timer_start).count(); + if (rc != 0) { - logwrapper((char*)"ceph_posix_unlink : unlink failed after lock removal %s, %d", pathname, rc); + logwrapper((char*)"ceph_posix_unlink : unlink failed after lock removal %s, %d ms", pathname, deltime_ms); } else { - logwrapper((char*)"ceph_posix_unlink : unlink suceeded after lock removal %s, %d", pathname, rc); + logwrapper((char*)"ceph_posix_unlink : unlink suceeded after lock removal %s, %d ms", pathname, deltime_ms); } return rc; } diff --git a/src/XrdCeph/XrdCephPosix.hh b/src/XrdCeph/XrdCephPosix.hh index b7f5a670..5d01129b 100644 --- a/src/XrdCeph/XrdCephPosix.hh +++ b/src/XrdCeph/XrdCephPosix.hh @@ -33,12 +33,27 @@ #include #include #include +#include #include #include #include "XrdSys/XrdSysPthread.hh" #include "XrdOuc/XrdOucIOVec.hh" +// simple logging for XrdCeph buffering code +#define XRDCEPHLOGLEVEL 1 +#define MAXDIGITSIZE 32 +#ifdef XRDCEPHLOGLEVEL + // ensure that + // extern XrdOucTrace XrdCephTrace; + // is in the cc file where you want to log // << std::endl + //#define LOGCEPH(x) {std::stringstream _s; _s << x; XrdCephTrace.Beg(); std::clog << _s.str() ; XrdCephTrace.End(); _s.clear();} + #define LOGCEPH(x) {std::stringstream _s; _s << x; std::clog << _s.str() << std::endl; _s.clear(); } +#else + #define LOGCEPH(x) +#endif + + class XrdSfsAio; typedef void(AioCB)(XrdSfsAio*, size_t); @@ -57,6 +72,8 @@ ssize_t ceph_striper_readv(int fd, XrdOucIOVec *readV, int n); ssize_t ceph_posix_read(int fd, void *buf, size_t count); ssize_t ceph_posix_nonstriper_pread(int fd, void *buf, size_t count, off64_t offset); ssize_t ceph_posix_pread(int fd, void *buf, size_t count, off64_t offset); +ssize_t ceph_posix_maybestriper_pread(int fd, void *buf, size_t count, off64_t offset, bool allowStriper=true); + ssize_t ceph_aio_read(int fd, XrdSfsAio *aiop, AioCB *cb); int ceph_posix_fstat(int fd, struct stat *buf); int ceph_posix_stat(XrdOucEnv* env, const char *pathname, struct stat *buf);