Skip to content

Commit 6833237

Browse files
committed
v0.27
1 parent 5fd0296 commit 6833237

File tree

7 files changed

+354
-5
lines changed

7 files changed

+354
-5
lines changed

LineReader.cpp

Lines changed: 152 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -356,3 +356,155 @@ std::optional<std::string_view> CMappingLineReader::GetNextLine()
356356
assert(foundLineLength > 0 && "result should contain at least LF char");
357357
return result;
358358
}
359+
360+
//////////////////////////////////////////////////////////////////////////
361+
//////////////////////////////////////////////////////////////////////////
362+
//////////////////////////////////////////////////////////////////////////
363+
364+
CLockFreeLineReader::CLockFreeLineReader()
365+
{
366+
this->_buffer1.Allocate(ReadBufferSize);
367+
if (this->_buffer1.ptr != nullptr)
368+
{
369+
this->_buffer2.Allocate(ReadBufferSize);
370+
}
371+
}
372+
373+
bool CLockFreeLineReader::Open(const wchar_t* const filename)
374+
{
375+
if (this->_buffer1.ptr == nullptr || filename == nullptr)
376+
{
377+
return false;
378+
}
379+
assert(this->_buffer2.ptr != nullptr);
380+
this->Close();
381+
382+
const bool bAsyncMode = false;
383+
const bool succeeded = this->_file.Open(filename, bAsyncMode);
384+
if (!succeeded)
385+
{
386+
return false;
387+
}
388+
389+
this->_bufferData = std::string_view(this->_buffer1.ptr, 0);
390+
this->_firstBufferIsActive = true;
391+
392+
const bool initLockFreeOk = this->_file.LockFreecInit();
393+
if (!initLockFreeOk)
394+
{
395+
this->_file.Close();
396+
return false;
397+
}
398+
399+
const bool readStartOk = this->_file.LockFreeReadStart(this->_buffer2.ptr + ReadBufferOffset, ReadChunkSize);
400+
if (!readStartOk)
401+
{
402+
this->_file.LockFreecClean();
403+
this->_file.Close();
404+
return false;
405+
}
406+
407+
return true;
408+
}
409+
410+
void CLockFreeLineReader::Close()
411+
{
412+
this->_file.LockFreecClean();
413+
this->_file.Close();
414+
}
415+
416+
__declspec(noinline) // noinline is added to help CPU profiling in release version
417+
std::optional<std::string_view> CLockFreeLineReader::GetNextLine()
418+
{
419+
if (this->_buffer1.ptr == nullptr)
420+
{
421+
return {};
422+
}
423+
assert(this->_buffer2.ptr != nullptr);
424+
425+
// Find EOL:
426+
size_t eolOffset = this->_bufferData.find('\n');
427+
428+
if (eolOffset == this->_bufferData.npos)
429+
{
430+
// EOL was not found. Make a choice between last line case and reading additional data from functor.
431+
432+
if (this->_bufferData.size() > MaxLogLineLength)
433+
{
434+
// Incomplete line is already too long
435+
return {};
436+
}
437+
438+
CCharBuffer& currentBuffer = this->_firstBufferIsActive ? this->_buffer1 : this->_buffer2;
439+
CCharBuffer& nextBuffer = this->_firstBufferIsActive ? this->_buffer2 : this->_buffer1;
440+
441+
const size_t prefixLength = this->_bufferData.size();
442+
assert(prefixLength <= MaxLogLineLength && "the rest of buffer is too big for moving to beginning");
443+
char* const newDataBufferPtr = nextBuffer.ptr + ReadBufferOffset - prefixLength;
444+
445+
// don't need memmove since the whole high level algorithm will fail if buffers overlap
446+
memcpy(newDataBufferPtr, this->_bufferData.data(), prefixLength);
447+
448+
size_t readBytes = 0;
449+
const bool readCompleteOk = this->_file.LockFreeReadWait(readBytes);
450+
if (!readCompleteOk)
451+
{
452+
// Previous reading failed
453+
return {};
454+
}
455+
456+
// Read missing data:
457+
const bool readOk = this->_file.LockFreeReadStart(currentBuffer.ptr + ReadBufferOffset, ReadChunkSize);
458+
if (!readOk)
459+
{
460+
// New reading failed
461+
return {};
462+
}
463+
464+
this->_bufferData = { newDataBufferPtr, prefixLength + readBytes };
465+
this->_firstBufferIsActive = !this->_firstBufferIsActive;
466+
467+
if (this->_bufferData.empty())
468+
{
469+
assert(readBytes == 0);
470+
// The very last line without LF is not counted
471+
return {};
472+
}
473+
474+
// Search EOL again after reading additional data:
475+
// I expect we read either ReadChunkSize bytes or we read the data chunk in file.
476+
eolOffset = this->_bufferData.find('\n', prefixLength);
477+
if (eolOffset == this->_bufferData.npos)
478+
{
479+
if (this->_bufferData.size() > MaxLogLineLength)
480+
{
481+
// Incomplete line is too long
482+
return {};
483+
}
484+
485+
// Found last line after reading missing data
486+
const std::string_view result = this->_bufferData;
487+
this->_bufferData = { this->_buffer1.ptr, 0 };
488+
assert(!result.empty() && "last line without LF should be not empty");
489+
return result;
490+
}
491+
}
492+
493+
const size_t foundLineLength = eolOffset + 1;
494+
495+
if (foundLineLength > MaxLogLineLength)
496+
{
497+
// Line is too long
498+
return {};
499+
}
500+
501+
const std::string_view result = this->_bufferData.substr(0, foundLineLength);
502+
this->_bufferData.remove_prefix(foundLineLength);
503+
504+
assert(foundLineLength > 0 && "result should contain at least LF char");
505+
return result;
506+
}
507+
508+
//////////////////////////////////////////////////////////////////////////
509+
//////////////////////////////////////////////////////////////////////////
510+
//////////////////////////////////////////////////////////////////////////

LineReader.h

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,3 +70,29 @@ class CMappingLineReader
7070
bool _mappedToMemory = false;
7171
std::string_view _bufferData; // filled part of the current buffer
7272
};
73+
74+
//////////////////////////////////////////////////////////////////////////
75+
76+
class CLockFreeLineReader
77+
{
78+
public:
79+
CLockFreeLineReader();
80+
81+
bool Open(const wchar_t* const filename);
82+
void Close();
83+
84+
// request next matching line; line may contain '\0' and may end with '\n'; return false on error or EOF
85+
// returned line is never empty (it contains at least one '\n' or any other character).
86+
std::optional<std::string_view> GetNextLine();
87+
88+
protected:
89+
CScanFile _file;
90+
// Buffer structure: [ rest_of_previousline|data_read_from_file ]
91+
// [ len = MaxLogLineLength | len = ReadChunkSize ]
92+
bool _firstBufferIsActive = true;
93+
CCharBuffer _buffer1;
94+
CCharBuffer _buffer2;
95+
std::string_view _bufferData; // filled part of the current buffer
96+
};
97+
98+
//////////////////////////////////////////////////////////////////////////

LogReader.h

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -47,14 +47,18 @@ class CLogReader final
4747
}
4848

4949
protected:
50-
#if 1
50+
#if 0
5151
#if 0
5252
CSyncLineReader _lineReader;
5353
#else
54-
CAsyncLineReader _lineReader;
54+
CMappingLineReader _lineReader;
5555
#endif
5656
#else
57-
CMappingLineReader _lineReader;
57+
#if 0
58+
CAsyncLineReader _lineReader;
59+
#else
60+
CLockFreeLineReader _lineReader;
61+
#endif
5862
#endif
5963
CCharBuffer _pattern;
6064
CFnMatch _lineMatcher;

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,6 @@
7575

7676
Как и в условии задачи, основное консольное приложение собрано с отключенными исключениями и заодно без RTTI.
7777

78-
Я притянул часть STL на мой страх и риск. Ту, часть, которая не требует исключений и работает без лишних накладных расходов.
78+
Я притянул часть STL на мой страх и риск. Ту часть, которая не требует исключений и работает без лишних накладных расходов.
7979

8080
---

ScanFile.cpp

Lines changed: 136 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@ bool CScanFile::Open(const wchar_t* const filename, const bool asyncMode)
1818
{
1919
return false;
2020
}
21-
2221
assert(this->_hEvent == nullptr);
2322

2423
const DWORD dwDesiredAccess = FILE_READ_DATA | FILE_READ_ATTRIBUTES; // minimal required rights
@@ -38,6 +37,7 @@ bool CScanFile::Open(const wchar_t* const filename, const bool asyncMode)
3837
return false;
3938
}
4039

40+
// Init data for async IO:
4141
this->_hEvent = CreateEventW(nullptr, TRUE, FALSE, nullptr);
4242

4343
if (this->_hEvent == nullptr)
@@ -56,6 +56,8 @@ bool CScanFile::Open(const wchar_t* const filename, const bool asyncMode)
5656

5757
void CScanFile::Close()
5858
{
59+
this->LockFreecClean();
60+
5961
if (this->_pViewOfFile != nullptr)
6062
{
6163
UnmapViewOfFile(this->_pViewOfFile);
@@ -129,6 +131,8 @@ std::optional<std::string_view> CScanFile::MapToMemory()
129131
return std::string_view(static_cast<const char*>(this->_pViewOfFile), fileSizeAsSizeT);
130132
}
131133

134+
//////////////////////////////////////////////////////////////////////////
135+
132136
__declspec(noinline) // noinline is added to help CPU profiling in release version
133137
bool CScanFile::Read(char* const buffer, const size_t bufferLength, size_t& readBytes)
134138
{
@@ -152,6 +156,8 @@ bool CScanFile::Read(char* const buffer, const size_t bufferLength, size_t& read
152156
return !!succeeded;
153157
}
154158

159+
//////////////////////////////////////////////////////////////////////////
160+
155161
__declspec(noinline) // noinline is added to help CPU profiling in release version
156162
bool CScanFile::AsyncReadStart(char* const buffer, const size_t bufferLength)
157163
{
@@ -210,3 +216,132 @@ bool CScanFile::AsyncReadWait(size_t& readBytes)
210216

211217
return true;
212218
}
219+
220+
//////////////////////////////////////////////////////////////////////////
221+
222+
bool CScanFile::LockFreecInit()
223+
{
224+
if (this->_pThread != nullptr)
225+
{
226+
return false;
227+
}
228+
229+
this->_threadFinishSignal = false;
230+
this->_threadOperationReadStartSignal = false;
231+
this->_threadOperationReadCompletedSignal = false;
232+
{
233+
std::lock_guard<std::mutex> lock(this->_protectThreadData);
234+
this->_pThreadReadBuffer = nullptr;
235+
this->_threadReadBufferSize = 0;
236+
this->_threadActuallyReadBytes = 0;
237+
this->_threadReadSucceeded = false;
238+
}
239+
240+
this->_pThread = std::make_unique<std::thread>(&CScanFile::LockFreeThread, this);
241+
242+
return true;
243+
}
244+
245+
void CScanFile::LockFreecClean()
246+
{
247+
if (this->_pThread != nullptr)
248+
{
249+
this->_threadFinishSignal = true;
250+
this->_pThread->join();
251+
this->_pThread.reset();
252+
}
253+
}
254+
255+
__declspec(noinline) // noinline is added to help CPU profiling in release version
256+
bool CScanFile::LockFreeReadStart(char* const buffer, const size_t bufferLength)
257+
{
258+
if (this->_pThread == nullptr || this->_threadOperationInProgress)
259+
{
260+
return false;
261+
}
262+
263+
this->_threadOperationInProgress = true;
264+
265+
this->_threadOperationReadCompletedSignal = false;
266+
267+
{
268+
std::lock_guard<std::mutex> lock(this->_protectThreadData);
269+
this->_pThreadReadBuffer = buffer;
270+
this->_threadReadBufferSize = bufferLength;
271+
this->_threadActuallyReadBytes = 0;
272+
this->_threadReadSucceeded = false;
273+
}
274+
275+
this->_threadOperationReadStartSignal = true;
276+
277+
return true;
278+
}
279+
280+
__declspec(noinline) // noinline is added to help CPU profiling in release version
281+
bool CScanFile::LockFreeReadWait(size_t& readBytes)
282+
{
283+
if (this->_pThread == nullptr || !this->_threadOperationInProgress)
284+
{
285+
return false;
286+
}
287+
288+
this->_threadOperationInProgress = false;
289+
290+
while (!this->_threadOperationReadCompletedSignal)
291+
{
292+
// nothing
293+
}
294+
295+
{
296+
std::lock_guard<std::mutex> lock(this->_protectThreadData);
297+
readBytes = this->_threadActuallyReadBytes;
298+
if (!this->_threadReadSucceeded)
299+
{
300+
return false;
301+
}
302+
}
303+
304+
return true;
305+
}
306+
307+
void CScanFile::LockFreeThread()
308+
{
309+
while (true)
310+
{
311+
if (this->_threadFinishSignal)
312+
{
313+
// thread exit signal is caught
314+
break;
315+
}
316+
317+
if (!this->_threadOperationReadStartSignal)
318+
{
319+
// nothing to do
320+
continue;
321+
}
322+
323+
this->_threadOperationReadStartSignal = false;
324+
325+
char* buffer = nullptr;
326+
size_t bufferSize = 0;
327+
328+
{
329+
std::lock_guard<std::mutex> lock(this->_protectThreadData);
330+
buffer = this->_pThreadReadBuffer;
331+
bufferSize = this->_threadReadBufferSize;
332+
}
333+
334+
size_t readBytes = 0;
335+
const bool readOk = this->Read(buffer, bufferSize, readBytes);
336+
337+
{
338+
std::lock_guard<std::mutex> lock(this->_protectThreadData);
339+
this->_threadActuallyReadBytes = readBytes;
340+
this->_threadReadSucceeded = readOk;
341+
}
342+
343+
this->_threadOperationReadCompletedSignal = true;
344+
}
345+
}
346+
347+
//////////////////////////////////////////////////////////////////////////

0 commit comments

Comments
 (0)