Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,14 @@ rock_library(pocolog_cpp
LogFile.cpp
IndexFile.cpp
FileStream.cpp
MappedStream.cpp
MultiFileIndex.cpp
named_vector_helpers.cpp
OwnedValue.cpp
${OPTIONAL_SOURCES}
HEADERS
FileStream.hpp
MappedStream.hpp
Format.hpp
Index.hpp
InputDataStream.hpp
Expand Down
120 changes: 120 additions & 0 deletions src/MappedStream.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
#include "MappedStream.hpp"
#include <base-logging/Logging.hpp>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <cassert>
#include <iostream>
#include <stdexcept>
#include <unistd.h>

#include <sys/mman.h>
#include <cstring>

pocolog_cpp::MappedStream::MappedStream() : fd(-1), goodFlag(false), fileName("")
{

}

pocolog_cpp::MappedStream::MappedStream(const char* __s, std::ios_base::openmode mode)
{
if(!open(__s, mode))
{
throw std::runtime_error(std::string("Error opening file") + __s);
}
}

pocolog_cpp::MappedStream::~MappedStream()
{
if(fd != -1)
close();
}


bool pocolog_cpp::MappedStream::open(const char* fileName, std::ios_base::openmode mode)
{
fd = ::open(fileName, O_NONBLOCK);
if(fd < 0)
{
goodFlag = false;
return false;
}

struct stat stats;
int ret = ::fstat(fd, &stats);
if(ret < 0)
{
goodFlag = false;
return false;
}

fileSize = stats.st_size;
blockSize = stats.st_blksize;

goodFlag = true;
mem_map = static_cast<const char*>(mmap(0, fileSize, PROT_READ, MAP_PRIVATE, fd, 0));
readPos = 0;
writePos = 0;
this->fileName = fileName;

LOG_DEBUG_S << "File opened " << (fd > 0) << " file size " << fileSize << " blk size " << blockSize;

return true;
}


void pocolog_cpp::MappedStream::read(char* buffer, size_t size)
{
LOG_DEBUG_S << "Reading Bytes from pos " << readPos;

std::memcpy(buffer, mem_map + readPos, size);
readPos += size;
}

bool pocolog_cpp::MappedStream::good() const
{
return goodFlag;
}

bool pocolog_cpp::MappedStream::fail() const
{
return !goodFlag;
}

std::streampos pocolog_cpp::MappedStream::seekg(std::streampos pos)
{
readPos = pos;
goodFlag = true;
return readPos;
}

std::streampos pocolog_cpp::MappedStream::seekp(std::streampos pos)
{
writePos = pos;
goodFlag = true;
return writePos;
}

std::streampos pocolog_cpp::MappedStream::tellg()
{
return readPos;
}

std::streampos pocolog_cpp::MappedStream::tellp()
{
return writePos;
}

off_t pocolog_cpp::MappedStream::size() const
{
return fileSize;
}

void pocolog_cpp::MappedStream::close()
{
goodFlag = false;
if(fd > 0)
::close(fd);
fd = -1;
}

71 changes: 71 additions & 0 deletions src/MappedStream.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
#ifndef MAPPEDSTREAM_H
#define MAPPEDSTREAM_H

#include <fstream>
#include <vector>

namespace pocolog_cpp
{

/**
* Reimplementation of the std::fstream class.
*
* This implementation is buffered, meanding that the class
* will keep a read and a writebuffer, and only perform disc
* accesses if neccessary.
*
* This reimplementation performs way better in the usecase,
* that seekp/seekg is called often. The std::fstream implementation
* discards its buffer every time seek is called, resulting in a
* horrible runtime performance in our usecase.
* */
class MappedStream
{
private:
// std::vector<char> readBuffer;
char const* mem_map; // mmap'ed file contents
int fd;
off_t readPos;
off_t writePos;
off_t fileSize;
off_t blockSize;

bool reloadBuffer(off_t position);
bool goodFlag;
std::string fileName;

public:
MappedStream();

MappedStream(const char* __s,
std::ios_base::openmode mode);
~MappedStream();

bool open(const char* fileName, std::ios_base::openmode mode);

void read(char* buffer, size_t size);

std::streampos tellg();
std::streampos tellp();

std::streampos seekg(std::streampos pos);
std::streampos seekp(std::streampos pos);

bool good() const;
bool eof() const
{
return readPos >= fileSize;
}

const std::string getFileName() const
{
return fileName;
}

bool fail() const;
off_t size() const;
void close();

};
}
#endif // MAPPEDSTREAM_H
12 changes: 6 additions & 6 deletions src/Stream.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
#include "Format.hpp"
#include "StreamDescription.hpp"
#include "Index.hpp"
#include "FileStream.hpp"
#include "MappedStream.hpp"

namespace pocolog_cpp
{
Expand All @@ -17,7 +17,7 @@ class Stream
const StreamDescription &desc;
Index &index;

FileStream fileStream;
MappedStream fileStream;
Stream(const StreamDescription &desc, Index &index);

bool loadSampleHeader(std::streampos pos, pocolog_cpp::SampleHeaderData& header);
Expand Down Expand Up @@ -75,10 +75,10 @@ class Stream
return index.getNumSamples();
}

const FileStream& getFileStream() const
{
return fileStream;
}
// const FileStream& getFileStream() const
// {
// return fileStream;
// }

bool getSampleData(std::vector<uint8_t> &result, size_t sampleNr);

Expand Down