From 585aceb8aba1a3bd8456a09774ca6045caafa382 Mon Sep 17 00:00:00 2001 From: hungpdn Date: Thu, 15 Jan 2026 10:55:40 +0700 Subject: [PATCH 1/2] wal: reverse reading --- README.md | 11 ++-- reader.go | 28 ++++---- wal.go | 185 +++++++++++++++++++++++++++++++++++++++------------- wal_test.go | 52 +-------------- 4 files changed, 163 insertions(+), 113 deletions(-) diff --git a/README.md b/README.md index 9f92f27..f81fe68 100644 --- a/README.md +++ b/README.md @@ -23,17 +23,18 @@ A high-performance, concurrent-safe, and crash-resilient **Write-Ahead Log (WAL) Each segment file consists of a sequence of binary encoded entries. ```Plaintext -+-------------------+-------------------+-------------------+----------------------+ -| CRC32 (4 bytes) | Size (8 bytes) | SeqID (8 bytes) | Payload (N bytes) | -+-------------------+-------------------+-------------------+----------------------+ -| Checksum of Data | Length of Payload | Monotonic ID | The actual data | -+-------------------+-------------------+-------------------+----------------------+ ++-------------------+-------------------+-------------------+----------------------+-------------------+ +| CRC32 (4 bytes) | Size (8 bytes) | SeqID (8 bytes) | Payload (N bytes) | Size (8 bytes) | ++-------------------+-------------------+-------------------+----------------------+-------------------+ +| Checksum of Data | Length of Payload | Monotonic ID | The actual data | Backward Pointer | ++-------------------+-------------------+-------------------+----------------------+-------------------+ ``` - CRC (Cyclic Redundancy Check): Ensures data integrity. - Size: Enable fast reading without parsing the entire file. - SeqID: Global Sequence ID - Payload: The actual data. +- Size (Footer): Enable fast reverse reading (Startup Optimization). 
## Installation diff --git a/reader.go b/reader.go index 8f1ae6a..6ec4b55 100644 --- a/reader.go +++ b/reader.go @@ -64,7 +64,6 @@ func (it *Iterator) Next() bool { // Loop to handle switching between segment files for { - // If file is not opened or we've read all of the old file -> Open new file if it.currentFile == nil { if it.currentIdx >= len(it.segmentPaths) { return false // All files have been read @@ -73,15 +72,11 @@ func (it *Iterator) Next() bool { path := it.segmentPaths[it.currentIdx] f, err := os.Open(path) if err != nil { - // If file is deleted (e.g. retention policy), we report error - // or we could skip it using it.currentIdx++ (data loss scenario) - // For strict consistency, we return error. it.err = err return false } it.currentFile = f - // Create a buffer reader for faster reading br := bufio.NewReader(f) reader := io.Reader(br) it.currentReader = &reader @@ -92,14 +87,12 @@ func (it *Iterator) Next() bool { _, err := io.ReadFull(*it.currentReader, header) if err != nil { - // If EOF is encountered, close the current file and increment the index so that the next iteration opens a new file if err == io.EOF { it.currentFile.Close() it.currentFile = nil it.currentIdx++ continue } - // If unexpected error occurs (UnexpectedEOF) -> Report error it.err = err return false } @@ -114,6 +107,12 @@ func (it *Iterator) Next() bool { return false } + // Skip Footer (8 bytes) + if _, err := io.CopyN(io.Discard, *it.currentReader, int64(footerSize)); err != nil { + it.err = err + return false + } + neededSize := sizeSize + seqIDSize + len(payload) if cap(it.verifyBuf) < neededSize { it.verifyBuf = make([]byte, neededSize) @@ -143,7 +142,6 @@ func (it *Iterator) Index() uint64 { // Value: Get the data of the current log entry func (it *Iterator) Value() []byte { - // Return a copy for safety, or return the original slice if you want zero-allocation (be careful) out := make([]byte, len(it.currentEntry)) copy(out, it.currentEntry) return out @@ -164,10 +162,8 
@@ func (it *Iterator) Close() error { } // Seek: Fast-forward to the record with ID >= startID -// It will skip reading the payload and verifying the CRC of older records func (it *Iterator) Seek(startID uint64) bool { for { - if it.currentFile == nil { if it.currentIdx >= len(it.segmentPaths) { return false @@ -200,20 +196,26 @@ func (it *Iterator) Seek(startID uint64) bool { seqID := binary.BigEndian.Uint64(header[crcSize+sizeSize : headerSize]) if seqID < startID { - // Use Discard to jump over byte 'size' without copying the data - if _, err := io.CopyN(io.Discard, *it.currentReader, int64(size)); err != nil { + // Skip payload + footer efficiently + if _, err := io.CopyN(io.Discard, *it.currentReader, int64(size)+int64(footerSize)); err != nil { it.err = err return false } continue } else { - + // Read Payload payload := make([]byte, size) if _, err := io.ReadFull(*it.currentReader, payload); err != nil { it.err = err return false } + // Read and discard footer + if _, err := io.CopyN(io.Discard, *it.currentReader, int64(footerSize)); err != nil { + it.err = err + return false + } + verifyBuf := make([]byte, sizeSize+seqIDSize+len(payload)) binary.BigEndian.PutUint64(verifyBuf[:sizeSize], size) binary.BigEndian.PutUint64(verifyBuf[sizeSize:sizeSize+seqIDSize], seqID) diff --git a/wal.go b/wal.go index b442330..4215d70 100644 --- a/wal.go +++ b/wal.go @@ -19,16 +19,17 @@ import ( WAL (Write-Ahead Log) Format: Each segment file consists of a sequence of binary encoded entries. 
-+-------------------+-------------------+-------------------+----------------------+ -| CRC32 (4 bytes) | Size (8 bytes) | SeqID (8 bytes) | Payload (N bytes) | -+-------------------+-------------------+-------------------+----------------------+ -| Checksum of Data | Length of Payload | Monotonic ID | The actual data | -+-------------------+-------------------+-------------------+----------------------+ ++-------------------+-------------------+-------------------+----------------------+-------------------+ +| CRC32 (4 bytes) | Size (8 bytes) | SeqID (8 bytes) | Payload (N bytes) | Size (8 bytes) | ++-------------------+-------------------+-------------------+----------------------+-------------------+ +| Checksum of Data | Length of Payload | Monotonic ID | The actual data | Backward Pointer | ++-------------------+-------------------+-------------------+----------------------+-------------------+ - CRC (Cyclic Redundancy Check): Ensures data integrity. -- Size: Enable fast reading without parsing the entire file. +- Size (Header): Enable fast forward reading. - SeqID: Global Sequence ID - Payload: The actual data. +- Size (Footer): Enable fast reverse reading (Startup Optimization). 
*/ const ( @@ -37,6 +38,7 @@ const ( sizeSize = 8 seqIDSize = 8 headerSize = crcSize + sizeSize + seqIDSize + footerSize = sizeSize // New Footer ) var ( @@ -97,7 +99,8 @@ func Open(dir string, cfg *Config) (*WAL, error) { stopSync: make(chan struct{}), bufPool: sync.Pool{ New: func() interface{} { - return make([]byte, 4*KB+headerSize) + // Allocate buffer for Header + Payload + Footer + return make([]byte, 4*KB+headerSize+footerSize) }, }, } @@ -214,6 +217,35 @@ func (w *WAL) scanSegmentForLastID(path string) (uint64, error) { } defer f.Close() + // Optimized: Try reverse scan first + stat, err := f.Stat() + if err != nil { + return 0, err + } + fileSize := stat.Size() + if fileSize >= int64(headerSize+footerSize) { + // Read last 8 bytes (Footer) + footer := make([]byte, footerSize) + if _, err := f.ReadAt(footer, fileSize-int64(footerSize)); err == nil { + lastPayloadSize := binary.BigEndian.Uint64(footer) + // Calculate the start of the last record + recordSize := int64(headerSize) + int64(lastPayloadSize) + int64(footerSize) + if fileSize >= recordSize { + // Read Header + header := make([]byte, headerSize) + if _, err := f.ReadAt(header, fileSize-recordSize); err == nil { + seqID := binary.BigEndian.Uint64(header[crcSize+sizeSize : headerSize]) + // Ideally we should verify CRC here too, but for simplicity we assume closed segments are valid + return seqID, nil + } + } + } + } + + // Fallback to sequential scan + if _, err := f.Seek(0, io.SeekStart); err != nil { + return 0, err + } reader := bufio.NewReader(f) var lastID uint64 @@ -229,21 +261,76 @@ func (w *WAL) scanSegmentForLastID(path string) (uint64, error) { seqID := binary.BigEndian.Uint64(header[crcSize+sizeSize : headerSize]) lastID = seqID - // Skip payload - if _, err := reader.Discard(int(size)); err != nil { + // Skip payload AND footer + if _, err := reader.Discard(int(size) + footerSize); err != nil { return 0, err } } return lastID, nil } -// repairActiveSegment: Repair the active segment 
file if corrupted -// Simple logic: Read through the active file to find the last valid point -// In real production, we would do tail reading to optimize. Here we use scanning for clarity. +// repairActiveSegment: Repair the active segment file if corrupted using Reverse Scan. func (w *WAL) repairActiveSegment() error { - f := w.activeSegment.file - // Read from the beginning + stat, err := f.Stat() + if err != nil { + return err + } + fileSize := stat.Size() + + // Empty file is valid + if fileSize == 0 { + w.activeSegment.size = 0 + w.lastSeqID = 0 // Will be fixed by loadSegments if previous segments exist + return nil + } + + // 1. Attempt Reverse Recovery (Optimization) + if fileSize >= int64(headerSize+footerSize) { + // Read Footer (Last 8 bytes) + footer := make([]byte, footerSize) + if _, err := f.ReadAt(footer, fileSize-int64(footerSize)); err == nil { + payloadSize := binary.BigEndian.Uint64(footer) + totalRecordSize := int64(headerSize) + int64(payloadSize) + int64(footerSize) + + if fileSize >= totalRecordSize { + // Jump back to the start of this record + offset := fileSize - totalRecordSize + + // Read Header + Payload + data := make([]byte, headerSize+int64(payloadSize)) + if _, err := f.ReadAt(data, offset); err == nil { + + // Parse Header + readCrc := binary.BigEndian.Uint32(data[:crcSize]) + size := binary.BigEndian.Uint64(data[crcSize : crcSize+sizeSize]) + seqID := binary.BigEndian.Uint64(data[crcSize+sizeSize : headerSize]) + + if size == payloadSize { + // Verify CRC + verifyBuf := make([]byte, sizeSize+seqIDSize+int(payloadSize)) + binary.BigEndian.PutUint64(verifyBuf[:sizeSize], size) + binary.BigEndian.PutUint64(verifyBuf[sizeSize:sizeSize+seqIDSize], seqID) + copy(verifyBuf[sizeSize+seqIDSize:], data[headerSize:]) + + if calculateCRC(verifyBuf) == readCrc { + // SUCCESS: The last record is valid. + // No truncation needed. 
+ w.activeSegment.size = fileSize + w.activeSegment.writer = bufio.NewWriterSize(f, w.bufferSize) + w.lastSeqID = seqID + log.Printf("WAL: Startup optimized. Verified last record at offset %d (SeqID: %d)", offset, seqID) + return nil + } + } + } + } + } + } + + // 2. Fallback: If reverse check fails (corrupted tail or file too small), do full scan + log.Println("WAL: Fast startup failed or corruption detected. Falling back to sequential repair...") + if _, err := f.Seek(0, io.SeekStart); err != nil { return err } @@ -253,7 +340,7 @@ func (w *WAL) repairActiveSegment() error { var maxSeqID uint64 = 0 for { - // Read Header (CRC + Size + Offset) + // Read Header header := make([]byte, headerSize) if _, err := io.ReadFull(reader, header); err != nil { if err == io.EOF || err == io.ErrUnexpectedEOF { @@ -262,7 +349,6 @@ func (w *WAL) repairActiveSegment() error { return err } - // Parse Header readCrc := binary.BigEndian.Uint32(header[:crcSize]) size := binary.BigEndian.Uint64(header[crcSize : crcSize+sizeSize]) seqID := binary.BigEndian.Uint64(header[crcSize+sizeSize : headerSize]) @@ -273,6 +359,19 @@ func (w *WAL) repairActiveSegment() error { break } + // Read Footer + footer := make([]byte, footerSize) + if _, err := io.ReadFull(reader, footer); err != nil { + break + } + footerSizeVal := binary.BigEndian.Uint64(footer) + + // Basic integrity check: Footer size must match Header size + if footerSizeVal != size { + log.Println("⚠️ WAL: Corrupted record (Size mismatch between Header and Footer)") + break + } + // Verify CRC verifyBuf := make([]byte, sizeSize+seqIDSize+len(payload)) binary.BigEndian.PutUint64(verifyBuf[:sizeSize], size) @@ -280,11 +379,11 @@ func (w *WAL) repairActiveSegment() error { copy(verifyBuf[sizeSize+seqIDSize:], payload) if calculateCRC(verifyBuf) != readCrc { - log.Println("⚠️ WAL: Corrupted tail detected, truncating...") + log.Println("⚠️ WAL: Corrupted tail detected (CRC mismatch), truncating...") break } - validOffset += int64(headerSize 
+ size) + validOffset += int64(headerSize + size + footerSize) maxSeqID = seqID } @@ -298,8 +397,8 @@ func (w *WAL) repairActiveSegment() error { return err } w.activeSegment.size = validOffset - w.activeSegment.writer = bufio.NewWriterSize(f, w.bufferSize) // Reset buffer writer - w.lastSeqID = maxSeqID // Update global state + w.activeSegment.writer = bufio.NewWriterSize(f, w.bufferSize) + w.lastSeqID = maxSeqID return nil } @@ -331,10 +430,8 @@ func (w *WAL) createActiveSegment(idx uint64) error { } // Sync parent directory - // The file has been synced, but its entry in the parent directory may not have been synced to disk - // If the power goes out at the moment the new file is created, that file may disappear completely if dir, err := os.Open(w.dir); err == nil { - dir.Sync() // Ensure that the entry for the new file is written to the directory table + dir.Sync() dir.Close() } @@ -366,13 +463,11 @@ func (w *WAL) backgroundSync(interval time.Duration) { } case SyncStrategyOSCache: - // Minimized Lock Contention w.mu.Lock() if w.activeSegment == nil || w.activeSegment.file == nil { w.mu.Unlock() continue } - // Copy file pointer to local variable f := w.activeSegment.file w.mu.Unlock() @@ -394,7 +489,8 @@ func (w *WAL) backgroundSync(interval time.Duration) { func (w *WAL) bufferWrite(payload []byte) error { w.lastSeqID++ - pktSize := int64(headerSize + len(payload)) + // Header + Payload + Footer + pktSize := int64(headerSize + len(payload) + footerSize) payloadLen := uint64(len(payload)) currentSeqID := w.lastSeqID @@ -407,12 +503,23 @@ func (w *WAL) bufferWrite(payload []byte) error { buf = make([]byte, pktSize) } - // [CRC][Size][SeqID][Payload] + // 1. Write Header: [CRC][Size][SeqID] binary.BigEndian.PutUint64(buf[crcSize:crcSize+sizeSize], payloadLen) binary.BigEndian.PutUint64(buf[crcSize+sizeSize:headerSize], currentSeqID) + + // 2. Write Payload copy(buf[headerSize:], payload) - checksum := calculateCRC(buf[crcSize:]) + // 3. 
Write Footer: [Size] + // Position = headerSize + len(payload) + binary.BigEndian.PutUint64(buf[headerSize+len(payload):], payloadLen) + + // Calculate CRC over [Size][SeqID][Payload] only; the Footer is deliberately excluded so the checksum stays compatible with the pre-footer record format + // We calculate CRC over [Size][SeqID][Payload] just like before + // The buffer contains [CRC][Size][SeqID][Payload][Footer] + // CRC is stored at buf[0:4]. It covers buf[4 : headerSize+len(payload)] + checksum := calculateCRC(buf[crcSize : headerSize+len(payload)]) binary.BigEndian.PutUint32(buf[:crcSize], checksum) if _, err := w.activeSegment.writer.Write(buf); err != nil { @@ -439,7 +546,7 @@ func (w *WAL) Write(payload []byte) error { w.mu.Lock() defer w.mu.Unlock() - pktSize := int64(headerSize + len(payload)) + pktSize := int64(headerSize + len(payload) + footerSize) if w.activeSegment.size+pktSize > w.segmentSize { if err := w.createActiveSegment(w.activeSegment.idx + 1); err != nil { return err } @@ -454,14 +561,13 @@ func (w *WAL) Write(payload []byte) error { } // WriteBatch writes a batch of entries to the WAL efficiently. -// It acquires the lock once and syncs once (if needed) for the whole batch. 
func (w *WAL) WriteBatch(payloads [][]byte) error { w.mu.Lock() defer w.mu.Unlock() var batchSize int64 for _, p := range payloads { - batchSize += int64(headerSize + len(p)) + batchSize += int64(headerSize + len(p) + footerSize) } if w.activeSegment.size+batchSize > w.segmentSize { @@ -480,7 +586,6 @@ func (w *WAL) WriteBatch(payloads [][]byte) error { } // sync: Flush buffer and fsync to disk -// Internal method, caller must hold the lock func (w *WAL) sync() error { if w.activeSegment == nil { return nil @@ -492,7 +597,6 @@ func (w *WAL) sync() error { } // Sync: Public method to sync data to disk -// Caller does not need to hold the lock func (w *WAL) Sync() error { w.mu.Lock() defer w.mu.Unlock() @@ -518,7 +622,7 @@ func (w *WAL) Close() error { w.mu.Unlock() return err } - w.mu.Unlock() // <--- IMPORTANT: Release the lock so that BackgroundSync can finish running (if it's stuck). + w.mu.Unlock() w.StopSync() @@ -539,8 +643,6 @@ func (w *WAL) GetLastSegmentIdx() uint64 { } // TruncateFront deletes all closed segments with an index less than the provided index. -// This is used to free up disk space after a snapshot has been taken. -// Note: This does not affect the active segment. func (w *WAL) TruncateFront(segmentIdx uint64) error { w.mu.Lock() defer w.mu.Unlock() @@ -548,9 +650,7 @@ func (w *WAL) TruncateFront(segmentIdx uint64) error { var keptSegments []*Segment for _, seg := range w.segments { if seg.idx < segmentIdx { - // Remove the file from disk if err := os.Remove(seg.path); err != nil { - // If the file is already gone, ignore the error if !os.IsNotExist(err) { return err } @@ -560,13 +660,11 @@ func (w *WAL) TruncateFront(segmentIdx uint64) error { } } - // Update the segment list w.segments = keptSegments return nil } -// Cleanup removes closed segments that are older than the specified TTL (Time To Live). -// This is useful for time-based retention policies. +// Cleanup removes closed segments that are older than the specified TTL. 
func (w *WAL) CleanupByTTL(ttl time.Duration) error { w.mu.Lock() defer w.mu.Unlock() @@ -575,19 +673,15 @@ func (w *WAL) CleanupByTTL(ttl time.Duration) error { var keptSegments []*Segment for _, seg := range w.segments { - // Get file stats to check modification time info, err := os.Stat(seg.path) if err != nil { - // If file doesn't exist, skip it (it's essentially cleaned) if os.IsNotExist(err) { continue } - // If we can't stat it for some other reason, keep it to be safe keptSegments = append(keptSegments, seg) continue } - // If the file is older than the threshold, delete it if info.ModTime().Before(threshold) { if err := os.Remove(seg.path); err != nil { if !os.IsNotExist(err) { @@ -604,7 +698,6 @@ func (w *WAL) CleanupByTTL(ttl time.Duration) error { } // CleanupBySize removes old segments if the total WAL size exceeds maxSizeBytes. -// It deletes from the oldest segment until the total size is within the limit. func (w *WAL) CleanupBySize(maxSizeBytes int64) error { w.mu.Lock() defer w.mu.Unlock() diff --git a/wal_test.go b/wal_test.go index 46a815e..7311c45 100644 --- a/wal_test.go +++ b/wal_test.go @@ -83,7 +83,7 @@ func TestWAL_WriteBatch(t *testing.T) { cfg := Config{ BufferSize: 4 * 1024, SegmentSize: 10 * 1024 * 1024, - SyncStrategy: SyncStrategyOSCache, + SyncStrategy: SyncStrategyOSCache, // Changed to OSCache to ensure flush } w, err := Open(dir, &cfg) @@ -132,8 +132,7 @@ func TestWAL_LogRotation(t *testing.T) { } // Write enough data to create multiple files - // Header overhead is approx 20 bytes. Payload "data" is 4 bytes. - // Total ~24 bytes per entry. 100 bytes limit -> ~4 entries per file. + // Header 20 + Footer 8 + Payload 4 = 32 bytes/entry. 
100 bytes limit -> ~3 entries for i := 0; i < 20; i++ { w.Write([]byte("data")) } @@ -194,20 +193,6 @@ func TestWAL_Seek(t *testing.T) { if iter.Seek(200) { t.Errorf("Seek(200) should fail for 100 entries") } - - // Test Case 4: Seek backwards (Should rely on Re-creating Reader or just work if implemented) - // Current Seek implementation assumes forward scan from current position OR strictly forward? - // The current Reader implementation scans from current file/position. - // If we want random seek, we usually assume it works if we haven't passed it, - // OR we might need to reset reader. For now let's test a new reader. - iter2, _ := w2.NewReader() - defer iter2.Close() - if !iter2.Seek(10) { - t.Errorf("Seek(10) failed") - } - if iter2.Index() != 10 { - t.Errorf("Expected index 10, got %d", iter2.Index()) - } } func TestWAL_Cleanup(t *testing.T) { @@ -216,7 +201,7 @@ func TestWAL_Cleanup(t *testing.T) { defer cleanUp(dir) // Small segment size to generate many files - cfg := Config{SegmentSize: 500} // ~20 entries per file + cfg := Config{SegmentSize: 500} w, _ := Open(dir, &cfg) // Write 100 entries -> Should create ~5 files @@ -224,11 +209,6 @@ func TestWAL_Cleanup(t *testing.T) { w.Write([]byte("payload")) } - // 1. Test TruncateFront - // Files might be: wal-0000, wal-0001, wal-0002... - // Truncate everything before index 2 (delete 0 and 1) - // Note: TruncateFront param is 'segmentIdx', not log SeqID. - // We need to know segment indices. initialFiles, _ := os.ReadDir(dir) if len(initialFiles) < 3 { t.Skip("Not enough files generated for Cleanup test") @@ -286,62 +266,36 @@ func TestWAL_CorruptionRecovery(t *testing.T) { count := 0 for iter.Next() { count++ - // fmt.Printf("Recovered: %s\n", string(iter.Value())) } if count != 2 { t.Errorf("Expected 2 recovered entries, got %d", count) } } -// TestWAL_SyncLogic checks if Sync can be called without error. -// Hard to verify disk flush in unit test, but ensures no panics. 
-func TestWAL_SyncLogic(t *testing.T) { - dir := "./test_data_sync" - cleanUp(dir) - defer cleanUp(dir) - - w, _ := Open(dir, &Config{SyncStrategy: SyncStrategyAlways}) - w.Write([]byte("data")) - if err := w.Sync(); err != nil { - t.Errorf("Manual Sync failed: %v", err) - } - w.Close() -} - -// TestWAL_CleanupByTTL verifies that old segments are deleted. func TestWAL_CleanupByTTL(t *testing.T) { dir := "./test_data_ttl" cleanUp(dir) defer cleanUp(dir) - // Create segments quickly cfg := Config{SegmentSize: 100} w, _ := Open(dir, &cfg) - // Write a segment for i := 0; i < 10; i++ { w.Write([]byte("old-data")) } - // Force rotation by creating new active segment implicitly via writes - // Wait a bit to simulate "old" time time.Sleep(10 * time.Millisecond) - // We need to modify the modtime of the generated file to make it "old" w.mu.Lock() - // Close active to rotate w.createActiveSegment(w.activeSegment.idx + 1) w.mu.Unlock() - // Hack: Modify ModTime of the first segment entries, _ := os.ReadDir(dir) if len(entries) > 0 { oldFile := filepath.Join(dir, entries[0].Name()) - // Set time to 2 hours ago oldTime := time.Now().Add(-2 * time.Hour) os.Chtimes(oldFile, oldTime, oldTime) } - // Run CleanupByTTL (TTL = 1 hour) if err := w.CleanupByTTL(1 * time.Hour); err != nil { t.Errorf("CleanupByTTL failed: %v", err) } From 8f7909f335115c8b2de54fe5bdb9730c31b6758a Mon Sep 17 00:00:00 2001 From: hungpdn Date: Thu, 15 Jan 2026 15:09:07 +0700 Subject: [PATCH 2/2] wal: update TruncateFront, readme --- README.md | 26 +++++++++------ wal.go | 96 ++++++++++++++++++++++++++++++++++++++--------------- wal_test.go | 17 +++++++--- 3 files changed, 97 insertions(+), 42 deletions(-) diff --git a/README.md b/README.md index f81fe68..4397ce3 100644 --- a/README.md +++ b/README.md @@ -14,7 +14,17 @@ A high-performance, concurrent-safe, and crash-resilient **Write-Ahead Log (WAL) - 🔄 **Log Rotation**: Automatic segment rotation based on configurable size. 
- 💾 **Flexible Sync Strategies**: Choose between Performance (Background), Safety (Always), or Balance (OSCache). - 🔍 **Iterator API**: Memory-efficient sequential reading of logs. -- 🧵 **Crash Safety:** Automatic recovery from power failures. Detects and truncates corrupted log tails (partial writes) on startup. +- ⚡ **Optimized Startup**: Uses reverse scanning to instantly recover the last segment state without reading the whole file. +- 🧹 **Retention Policies**: Automatic cleanup based on TTL (Time-To-Live) or Total Size. + +## Roadmap + +The following features are planned for future releases: + +- [ ] **v0.1.1 - Compression Support**: Add Snappy/Zstd compression for payloads to reduce disk usage. +- [ ] **v0.1.2 - Sparse Indexing**: Implement a sidecar `.idx` file to support O(1) lookup time for `Seek(SeqID)`. +- [ ] **v0.1.3 - Metrics & Observability**: OpenTelemetry / Prometheus integration for monitoring throughput and latency. +- [ ] **v0.2.0 - Replication Hooks**: APIs to support streaming WAL entries to other nodes (Raft/Paxos integration). ## Architecture @@ -31,10 +41,10 @@ Each segment file consists of a sequence of binary encoded entries. ``` - CRC (Cyclic Redundancy Check): Ensures data integrity. -- Size: Enable fast reading without parsing the entire file. +- Size: Enable fast forward reading (skipping payloads). - SeqID: Global Sequence ID - Payload: The actual data. -- Size (Footer): Enable fast reverse reading (Startup Optimization). +- Size (Footer): Enable fast reverse reading for optimized startup recovery. ## Installation @@ -107,17 +117,11 @@ if err := iter.Err(); err != nil { Contributions are welcome! Please fork the repository and open a pull request. -1. Fork the Project. -2. Create your Feature Branch (`git checkout -b feature/AmazingFeature`) -3. Commit your Changes (`git commit -m 'Add some AmazingFeature'`) -4. Push to the Branch (`git push origin feature/AmazingFeature`) -5. Open a Pull Request - ## License MIT License. 
See [LICENSE](LICENSE) file. -## Reference +## References -- [tidwall/wal](https://github.com/tidwall/wal) - [Log: What Every Software Engineer Should Know About Real-Time Data's Unifying Abstraction](https://engineering.linkedin.com/distributed-systems/log-what-every-software-engineer-should-know-about-real-time-datas-unifying) +- [Designing Data-Intensive Applications](https://www.oreilly.com/library/view/designing-data-intensive-applications/9781491903063) diff --git a/wal.go b/wal.go index 4215d70..019ede6 100644 --- a/wal.go +++ b/wal.go @@ -63,11 +63,12 @@ type WAL struct { // Segment: Represents a physical file type Segment struct { - idx uint64 // Number of the segment (0, 1, 2...) - path string // Path to the file - file *os.File // File descriptor (Active only) - writer *bufio.Writer // Buffered writer (Active only) - size int64 // Current size of the file + idx uint64 // Number of the segment (0, 1, 2...) + path string // Path to the file + file *os.File // File descriptor (Active only) + writer *bufio.Writer // Buffered writer (Active only) + size int64 // Current size of the file + startID uint64 // The first SeqID in this segment } type segmentFile struct { @@ -168,11 +169,21 @@ func (w *WAL) loadSegments() error { return err } + // Read the first block to get StartID + var startID uint64 = 0 + if stat.Size() >= int64(headerSize) { + header := make([]byte, headerSize) + if _, err := f.ReadAt(header, 0); err == nil { + startID = binary.BigEndian.Uint64(header[crcSize+sizeSize : headerSize]) + } + } + seg := &Segment{ - idx: fileObj.idx, - path: path, - file: f, - size: stat.Size(), + idx: fileObj.idx, + path: path, + file: f, + size: stat.Size(), + startID: startID, } // If it's the last file, set as active @@ -205,6 +216,12 @@ func (w *WAL) loadSegments() error { } w.lastSeqID = lastID } + + // Ensure active segment has correct StartID if it was empty/new + if w.activeSegment.startID == 0 { + w.activeSegment.startID = w.lastSeqID + 1 + } + return nil 
} @@ -217,7 +234,7 @@ func (w *WAL) scanSegmentForLastID(path string) (uint64, error) { } defer f.Close() - // Optimized: Try reverse scan first + // 1. Reverse Check stat, err := f.Stat() if err != nil { return 0, err @@ -242,7 +259,7 @@ func (w *WAL) scanSegmentForLastID(path string) (uint64, error) { } } - // Fallback to sequential scan + // 2. Sequential Fallback if _, err := f.Seek(0, io.SeekStart); err != nil { return 0, err } @@ -278,14 +295,13 @@ func (w *WAL) repairActiveSegment() error { } fileSize := stat.Size() - // Empty file is valid if fileSize == 0 { w.activeSegment.size = 0 w.lastSeqID = 0 // Will be fixed by loadSegments if previous segments exist return nil } - // 1. Attempt Reverse Recovery (Optimization) + // 1. Reverse Recovery if fileSize >= int64(headerSize+footerSize) { // Read Footer (Last 8 bytes) footer := make([]byte, footerSize) @@ -328,7 +344,7 @@ func (w *WAL) repairActiveSegment() error { } } - // 2. Fallback: If reverse check fails (corrupted tail or file too small), do full scan + // 2. Fallback: Sequential repair log.Println("WAL: Fast startup failed or corruption detected. Falling back to sequential repair...") if _, err := f.Seek(0, io.SeekStart); err != nil { @@ -436,11 +452,12 @@ func (w *WAL) createActiveSegment(idx uint64) error { } w.activeSegment = &Segment{ - idx: idx, - path: path, - file: f, - writer: bufio.NewWriterSize(f, w.bufferSize), - size: 0, + idx: idx, + path: path, + file: f, + writer: bufio.NewWriterSize(f, w.bufferSize), + size: 0, + startID: w.lastSeqID + 1, } return nil } @@ -642,18 +659,43 @@ func (w *WAL) GetLastSegmentIdx() uint64 { return w.activeSegment.idx } -// TruncateFront deletes all closed segments with an index less than the provided index. -func (w *WAL) TruncateFront(segmentIdx uint64) error { +// TruncateFront removes all closed segments that contain NO entries with ID >= minSeqID. +// Example: +// Seg1: Start=1. Seg2: Start=100. Seg3: Start=200. +// TruncateFront(150): +// Seg1 <= 150. 
Candidate. +// Seg2 <= 150. Better Candidate. +// Seg3 > 150. Stop. +// Result: Keep Seg2, Seg3. Delete Seg1. +func (w *WAL) TruncateFront(minSeqID uint64) error { w.mu.Lock() defer w.mu.Unlock() + var pivotIdx int = -1 + for i, seg := range w.segments { + if seg.startID <= minSeqID { + pivotIdx = i + } else { + break + } + } + + // Check if active segment covers minSeqID entirely (meaning all closed segments are old) + if w.activeSegment != nil && w.activeSegment.startID <= minSeqID { + pivotIdx = len(w.segments) + } + + if pivotIdx == -1 { + return nil // All segments are newer than minSeqID (or list empty), nothing to delete + } + + // Delete segments from index 0 to pivotIdx-1 + // KEEP the pivotIdx because it contains logs that start before minSeqID but might end after it var keptSegments []*Segment - for _, seg := range w.segments { - if seg.idx < segmentIdx { - if err := os.Remove(seg.path); err != nil { - if !os.IsNotExist(err) { - return err - } + for i, seg := range w.segments { + if i < pivotIdx { + if err := os.Remove(seg.path); err != nil && !os.IsNotExist(err) { + return err } } else { keptSegments = append(keptSegments, seg) diff --git a/wal_test.go b/wal_test.go index 7311c45..a74ffaa 100644 --- a/wal_test.go +++ b/wal_test.go @@ -214,14 +214,23 @@ func TestWAL_Cleanup(t *testing.T) { t.Skip("Not enough files generated for Cleanup test") } - // Call TruncateFront with index 1 (should keep 1 and greater, delete 0) - err := w.TruncateFront(1) + // Test TruncateFront with SeqID + // We want to remove the first segment (StartID 1) but keep the second (StartID ~21). + // If we call TruncateFront(30), it finds the last segment with StartID <= 30. + // That would be the 2nd segment (StartID ~21). + // So it keeps 2nd segment and deletes the 1st. + err := w.TruncateFront(30) if err != nil { t.Errorf("TruncateFront failed: %v", err) } - // 2. 
Test CleanupBySize - // Limit total size to ~500 bytes (should keep only active + maybe 1 closed) + // Check that we deleted something + remainingFiles, _ := os.ReadDir(dir) + if len(remainingFiles) >= len(initialFiles) { + t.Errorf("Expected fewer files, got %d vs %d", len(remainingFiles), len(initialFiles)) + } + + // CleanupBySize check err = w.CleanupBySize(600) if err != nil { t.Errorf("CleanupBySize failed: %v", err)