From 9eb255d5806b79f9bed8bde47d9cc5985ecb4a75 Mon Sep 17 00:00:00 2001 From: hungpdn Date: Wed, 14 Jan 2026 15:37:44 +0700 Subject: [PATCH 1/3] wal: add {truncate, retention, write batch}, improve perf (alloc optimization, dir durability) --- README.md | 14 +- config.go | 28 ++-- examples/main.go | 132 +++++++++------ go.mod | 2 +- reader.go | 147 ++++++++++++++++ wal.go | 425 +++++++++++++++++++++++------------------------ wal_test.go | 7 +- 7 files changed, 459 insertions(+), 296 deletions(-) create mode 100644 reader.go diff --git a/README.md b/README.md index 4afc890..b7fd743 100644 --- a/README.md +++ b/README.md @@ -1,6 +1,6 @@ # WAL: High-Performance Write-Ahead Log in Go -![Go Version](https://img.shields.io/badge/go-1.25-blue) +![Go Version](https://img.shields.io/badge/go-1.22-blue) ![License](https://img.shields.io/badge/license-MIT-green) [![Go Report Card](https://goreportcard.com/badge/github.com/hungpdn/wal)](https://goreportcard.com/report/github.com/hungpdn/wal) @@ -53,13 +53,12 @@ import ( ) func main() { - cfg := wal.Config{ - WALDir: "./wal_data", + opts := wal.Options{ SegmentSize: 10 * 1024 * 1024, // 10MB SyncStrategy: wal.SyncStrategyOSCache, } - w, _ := wal.New(cfg) + w, _ := wal.Open("./wal_data", &opts) defer w.Close() // Write data @@ -71,7 +70,7 @@ func main() { ### Reading Data (Replay) ```go -w, _ := wal.New(cfg) // Auto-recovers on open +w, _ := wal.Open("", &opts) // Auto-recovers on open iter, _ := w.NewReader() defer iter.Close() @@ -86,11 +85,10 @@ if err := iter.Err(); err != nil { } ``` -### Configuration +### Options |Field |Type |Default |Description | |------------|------|----------|----------------------------------------------------| -|WALDir |string|./wal |Directory to store segment files. | |SegmentSize |int64 |10MB |Max size of a single segment file before rotation. | |BufferSize |int |4KB |Size of the in-memory buffer. 
| |SyncStrategy|int |Background|0: Background, 1: Always (Fsync), 2: OSCache (Recm).| @@ -122,7 +120,7 @@ MIT License. See [LICENSE](LICENSE) file. - [x] Tests - [ ] Retention policy (remove, upload to s3, gcs, etc) - [ ] Index -- [ ] CI +- [x] CI - [ ] Benchmarks - [ ] Documentation - [x] Open source template (makefile, license, code of conduct, contributing, etc) diff --git a/config.go b/config.go index 3eb2095..fc62eb4 100644 --- a/config.go +++ b/config.go @@ -5,6 +5,9 @@ import "hash/crc32" const ( KB = 1024 // 1 Kilobyte MB = KB * KB // 1 Megabyte + + PermMkdir = 0755 + PermFileOpen = 0600 ) // SyncStrategy defines the synchronization strategy for WAL. @@ -25,9 +28,8 @@ const ( SyncStrategyOSCache SyncStrategy = 2 ) -// Config holds the configuration for WAL. -type Config struct { - WALDir string // Directory to store WAL files +// Options holds the configuration for WAL. +type Options struct { BufferSize int // Buffered writes size in bytes (e.g., 4KB) SegmentSize int64 // Maximum size of each file (e.g., 10MB) SegmentPrefix string // Prefix for segment file names (e.g., "segment") @@ -35,9 +37,8 @@ type Config struct { SyncInterval uint // Sync interval in milliseconds for background sync } -// DefaultConfig provides default configuration values for WAL. -var DefaultConfig = Config{ - WALDir: "./wal", +// DefaultOptions provides default configuration values for WAL. +var DefaultOptions = Options{ BufferSize: 4 * KB, // 4KB SegmentSize: 10 * MB, // 10MB SegmentPrefix: "segment", @@ -45,22 +46,19 @@ var DefaultConfig = Config{ SyncInterval: 1000, // 1000ms = 1s } -// SetDefault sets default values for any zero-value fields in the Config. -func (cfg *Config) SetDefault() { - if cfg.WALDir == "" { - cfg.WALDir = DefaultConfig.WALDir - } +// SetDefault sets default values for any zero-value fields in the Options. 
+func (cfg *Options) SetDefault() { if cfg.BufferSize == 0 { - cfg.BufferSize = DefaultConfig.BufferSize + cfg.BufferSize = DefaultOptions.BufferSize } if cfg.SegmentSize < MB { - cfg.SegmentSize = DefaultConfig.SegmentSize + cfg.SegmentSize = DefaultOptions.SegmentSize } if cfg.SegmentPrefix == "" { - cfg.SegmentPrefix = DefaultConfig.SegmentPrefix + cfg.SegmentPrefix = DefaultOptions.SegmentPrefix } if cfg.SyncInterval == 0 { - cfg.SyncInterval = DefaultConfig.SyncInterval + cfg.SyncInterval = DefaultOptions.SyncInterval } } diff --git a/examples/main.go b/examples/main.go index 27fb176..eb32063 100644 --- a/examples/main.go +++ b/examples/main.go @@ -3,96 +3,120 @@ package main import ( "fmt" "log" - "time" + "os" "github.com/hungpdn/wal" ) func main() { walDir := "./wal_data" + _ = os.RemoveAll(walDir) + + fmt.Println("๐Ÿš€ WAL Library - Full Feature Demo") + fmt.Println("==================================") + + // 1. Configuration + // We use a very small SegmentSize (10KB) to demonstrate Log Rotation and Cleanup easily. + cfg := wal.Options{ + BufferSize: 4 * 1024, // 4KB Buffer + SegmentSize: 10 * 1024, // 10KB (Small for demo purposes) + SegmentPrefix: "wal", // Prefix: wal-0000.wal + SyncStrategy: wal.SyncStrategyOSCache, // Performance + Safety balanced + SyncInterval: 500, // Sync every 500ms + } // ========================================== - // SCENARIO 1: NORMAL DATA RECORDING + // PART 1: WRITING (Basic & Batch) // ========================================== - fmt.Println("๐Ÿš€ [Phase 1] Initialize WAL & Write data...") - - // 1. Config - cfg := wal.Config{ - WALDir: walDir, - BufferSize: 4 * 1024, // 4KB Buffer - SegmentSize: 100 * 1024, // 100KB per file (to test file rotation speed) - SyncStrategy: wal.SyncStrategyOSCache, // Strategy 2 (Recommended) - SyncInterval: 1000, // Sync every 1s - } + fmt.Println("\n๐Ÿ“ [Part 1] Writing Data...") - // 2. 
Initialize - w, err := wal.New(cfg) + w, err := wal.Open(walDir, &cfg) if err != nil { log.Fatalf("โŒ Init failed: %v", err) } - // 3. Write 5000 log - start := time.Now() - for i := 0; i < 5000; i++ { - payload := fmt.Sprintf("Log-Entry-%d: User A transferred $100 to B", i) - if err := w.Write([]byte(payload)); err != nil { - log.Fatalf("โŒ Write error: %v", err) - } - // Simulate small delay to allow background sync to run - if i%1000 == 0 { - time.Sleep(10 * time.Millisecond) + // A. Basic Write + fmt.Println(" -> Writing 500 individual entries...") + for i := 0; i < 500; i++ { + payload := []byte(fmt.Sprintf("Entry-%d", i)) + if err := w.Write(payload); err != nil { + log.Fatalf("Write error: %v", err) } } - fmt.Printf("โœ… Write finished 5000 log in %v\n", time.Since(start)) - // 4. Close securely - if err := w.Close(); err != nil { - log.Fatalf("โŒ Close error: %v", err) + // B. Batch Write (Higher Throughput) + fmt.Println(" -> Writing 500 entries using WriteBatch...") + var batch [][]byte + for i := 500; i < 1000; i++ { + batch = append(batch, []byte(fmt.Sprintf("BatchEntry-%d", i))) } - fmt.Println("๐Ÿ”’ WAL has been closed. Data is now safe on disk.") + // Writes all 500 entries acquiring the lock only once + if err := w.WriteBatch(batch); err != nil { + log.Fatalf("Batch write error: %v", err) + } + + // Get current segment index to see rotation + lastIdx := w.GetLastSegmentIdx() + fmt.Printf(" โœ… Write complete. Current Active Segment Index: %d\n", lastIdx) + + w.Close() // ========================================== - // SCENARIO 2: RECOVERY & REPLAY + // PART 2: READING (Iterator) // ========================================== - fmt.Println("\n๐Ÿ”„ [Phase 2] Restart & Replay Simulation...") + fmt.Println("\n๐Ÿ“– [Part 2] Reading Data (Replay)...") - // 1. 
Reopen WAL (Automatic Recovery if errors occur) - w2, err := wal.New(cfg) + wRead, err := wal.Open(walDir, &cfg) if err != nil { - log.Fatalf("โŒ Recovery failed: %v", err) + log.Fatalf("Open failed: %v", err) } - defer w2.Close() + defer wRead.Close() - // 2. Create Iterator to read - iter, err := w2.NewReader() + iter, err := wRead.NewReader() if err != nil { - log.Fatalf("โŒ Iterator failed: %v", err) + log.Fatalf("Reader failed: %v", err) } defer iter.Close() - // 3. Browse logs count := 0 - fmt.Println(" --- Start reading ---") for iter.Next() { - data := iter.Value() - // Print the first 3 lines and the last 3 lines as a test. - if count < 3 || count >= 4997 { - fmt.Printf(" [%4d] %s\n", count, string(data)) - } - if count == 3 { - fmt.Println(" ... (reading) ...") - } + // val := iter.Value() + // fmt.Println(string(val)) // Uncomment to see data count++ } - // 4. Check for errors if err := iter.Err(); err != nil { - log.Printf("โš ๏ธ Replay stopped due to error.: %v", err) - } else { - fmt.Printf("โœ… Replay successful: Read %d/5000 records.\n", count) + log.Printf("โš ๏ธ Iterator stopped with error: %v", err) } + fmt.Printf(" โœ… Read %d total records from disk.\n", count) - if count == 5000 { - fmt.Println("๐ŸŽ‰ DATA INTEGRITY 100%!") + // ========================================== + // PART 3: RETENTION & CLEANUP + // ========================================== + fmt.Println("\n๐Ÿงน [Part 3] Retention & Cleanup...") + + // To demonstrate cleanup, we need multiple segments. + // Since we wrote ~1000 entries with small SegmentSize, we should have multiple files. + + // A. Cleanup By Size (Keep max 50KB) + fmt.Println(" -> Running CleanupBySize (Max 50KB)...") + // Note: 50KB is roughly 5 segments (since we set SegmentSize=10KB) + if err := wRead.CleanupBySize(50 * 1024); err != nil { + log.Printf("CleanupBySize warning: %v", err) } + + // B. 
Manual Truncate (Remove everything before Segment 2) + fmt.Println(" -> Running TruncateFront(2)...") + if err := wRead.TruncateFront(2); err != nil { + log.Printf("TruncateFront warning: %v", err) + } + + // Check remaining files + entries, _ := os.ReadDir(walDir) + fmt.Printf(" โœ… Cleanup finished. Remaining segment files: %d\n", len(entries)) + for _, e := range entries { + fmt.Printf(" - %s\n", e.Name()) + } + + fmt.Println("\n๐ŸŽ‰ Demo Completed Successfully!") } diff --git a/go.mod b/go.mod index 0ab82c1..b4341c5 100644 --- a/go.mod +++ b/go.mod @@ -1,3 +1,3 @@ module github.com/hungpdn/wal -go 1.25.0 +go 1.22 diff --git a/reader.go b/reader.go new file mode 100644 index 0000000..f647980 --- /dev/null +++ b/reader.go @@ -0,0 +1,147 @@ +package wal + +import ( + "bufio" + "encoding/binary" + "fmt" + "io" + "os" +) + +// Reader: Interface to read logs sequentially +type Reader interface { + Next() bool // Move to the next log + Value() []byte // Get the data of the current log + Err() error // Get error if any + Close() error // Close the reader +} + +// Iterator: Implementation of Reader +type Iterator struct { + wal *WAL + segmentPaths []string // List of segment file paths + currentIdx int // Index of the current segment file being read + currentFile *os.File // File descriptor currently open + currentReader *io.Reader // Reader wrapper (could be buffered) + currentEntry []byte // Current log entry data + err error // Error if any + closed bool // Whether the iterator is closed +} + +// NewReader: Create an Iterator starting from the oldest segment +func (w *WAL) NewReader() (*Iterator, error) { + w.mu.RLock() + defer w.mu.RUnlock() + + // Combine all segment files (including closed and active files) + var paths []string + for _, seg := range w.segments { + paths = append(paths, seg.path) + } + // Don't forget the active file (the last one) + if w.activeSegment != nil { + paths = append(paths, w.activeSegment.path) + } + + return &Iterator{ + wal: w, + 
segmentPaths: paths, + currentIdx: 0, + currentFile: nil, // Will open lazy in Next() + }, nil +} + +// Next: Read the next entry. Returns true if successful, false if no more data or error. +func (it *Iterator) Next() bool { + if it.err != nil || it.closed { + return false + } + + // Loop to handle switching between segment files + for { + // If file is not opened or we've read all of the old file -> Open new file + if it.currentFile == nil { + if it.currentIdx >= len(it.segmentPaths) { + return false // All files have been read + } + + path := it.segmentPaths[it.currentIdx] + f, err := os.Open(path) + if err != nil { + // If file is deleted (e.g. retention policy), we report error + // or we could skip it using it.currentIdx++ (data loss scenario) + // For strict consistency, we return error. + it.err = err + return false + } + + it.currentFile = f + // Create a buffer reader for faster reading + br := bufio.NewReader(f) + reader := io.Reader(br) + it.currentReader = &reader + } + + // Read Header + header := make([]byte, headerSize) + _, err := io.ReadFull(*it.currentReader, header) + + if err != nil { + // If EOF is encountered, close the current file and increment the index so that the next iteration opens a new file + if err == io.EOF { + it.currentFile.Close() + it.currentFile = nil + it.currentIdx++ + continue + } + // If unexpected error occurs (UnexpectedEOF) -> Report error + it.err = err + return false + } + + readCrc := binary.BigEndian.Uint32(header[:crcSize]) + size := binary.BigEndian.Uint64(header[crcSize : crcSize+sizeSize]) + offset := binary.BigEndian.Uint64(header[crcSize+sizeSize : headerSize]) + + payload := make([]byte, size) + if _, err := io.ReadFull(*it.currentReader, payload); err != nil { + it.err = err + return false + } + + verifyBuf := make([]byte, sizeSize+offsetSize+len(payload)) + binary.BigEndian.PutUint64(verifyBuf[:sizeSize], size) + binary.BigEndian.PutUint64(verifyBuf[sizeSize:sizeSize+offsetSize], offset) + 
copy(verifyBuf[sizeSize+offsetSize:], payload) + + if calculateCRC(verifyBuf) != readCrc { + it.err = fmt.Errorf("wal: corrupted data (crc mismatch) at segment %s", it.segmentPaths[it.currentIdx]) + return false + } + + it.currentEntry = payload + return true + } +} + +// Value: Get the data of the current log entry +func (it *Iterator) Value() []byte { + // Return a copy for safety, or return the original slice if you want zero-allocation (be careful) + out := make([]byte, len(it.currentEntry)) + copy(out, it.currentEntry) + return out +} + +// Err: Returns an error if the browsing process is interrupted +func (it *Iterator) Err() error { + return it.err +} + +// Close: Close the current file and release its resources +func (it *Iterator) Close() error { + it.closed = true + if it.currentFile != nil { + return it.currentFile.Close() + } + return nil +} diff --git a/wal.go b/wal.go index 147b3fe..65aa183 100644 --- a/wal.go +++ b/wal.go @@ -42,47 +42,62 @@ var ( ErrInvalidCRC = errors.New("wal: invalid crc, data corruption detected") ) +var bufPool = sync.Pool{ + New: func() interface{} { + // Allocate a default buffer. It will grow if needed. 
+ return make([]byte, 4096+headerSize) + }, +} + // WAL: Write-Ahead Log structure type WAL struct { - mu sync.RWMutex // Ensures thread safety (Concurrency) + mu sync.RWMutex // Ensures thread safety dir string // Directory to store WAL files bufferSize int // Size of the write buffer - segmentSize int64 // Max size of each segment file + segmentSize int64 // Max size of each segment file, soft limit not hard limit segmentPrefix string // Prefix for segment file names activeSegment *Segment // Current active segment for writing segments []*Segment // List of closed segment files syncStrategy SyncStrategy // Sync strategy - stopSync chan struct{} // Channel to stop background sync goroutine - wg sync.WaitGroup // WaitGroup to wait for goroutines to finish + stopSync chan struct{} // Channel to stop background sync + wg sync.WaitGroup // WaitGroup for background sync } // Segment: Represents a physical file type Segment struct { - idx int // Number of the segment (0, 1, 2...) + idx uint64 // Number of the segment (0, 1, 2...) 
path string // Path to the file - file *os.File // File descriptor - writer *bufio.Writer // Uses buffer to reduce system calls (High Performance) + file *os.File // File descriptor (Active only) + writer *bufio.Writer // Buffered writer (Active only) size int64 // Current size of the file } -// NewWAL: Initialize and recover data -func New(cfg Config) (*WAL, error) { +type segmentFile struct { + name string + idx uint64 +} + +// Open: Initialize and recover data +func Open(dir string, opts *Options) (*WAL, error) { // Ensure WAL directory exists - if err := os.MkdirAll(cfg.WALDir, 0755); err != nil { + if err := os.MkdirAll(dir, PermMkdir); err != nil { return nil, err } // Load default config values if not set - cfg.SetDefault() + if opts == nil { + opts = &DefaultOptions + } + opts.SetDefault() // Initialize WAL structure wal := WAL{ - dir: cfg.WALDir, - bufferSize: cfg.BufferSize, - segmentSize: cfg.SegmentSize, - segmentPrefix: cfg.SegmentPrefix, - syncStrategy: cfg.SyncStrategy, + dir: dir, + bufferSize: opts.BufferSize, + segmentSize: opts.SegmentSize, + segmentPrefix: opts.SegmentPrefix, + syncStrategy: opts.SyncStrategy, stopSync: make(chan struct{}), } @@ -91,11 +106,10 @@ func New(cfg Config) (*WAL, error) { return nil, err } - // Only run background sync if necessary - // If the SyncStrategyAlways, we skip it to save CPU + // Only run background sync if not SyncStrategyAlways if wal.syncStrategy != SyncStrategyAlways { wal.wg.Add(1) - go wal.backgroundSync(time.Duration(cfg.SyncInterval) * time.Millisecond) + go wal.backgroundSync(time.Duration(opts.SyncInterval) * time.Millisecond) } return &wal, nil @@ -110,45 +124,54 @@ func (w *WAL) loadSegments() error { return err } - // Filter segment files based on prefix and suffix - var segFiles []string + // Parse the filename and index it into a temporary struct + var segFiles []segmentFile for _, e := range entries { - if strings.HasPrefix(e.Name(), fmt.Sprintf("%s-", w.segmentPrefix)) && 
strings.HasSuffix(e.Name(), ".wal") { - segFiles = append(segFiles, e.Name()) + if strings.HasPrefix(e.Name(), fmt.Sprintf("%s-", w.segmentPrefix)) && + strings.HasSuffix(e.Name(), ".wal") { + + // Parse ID from file name + var idx uint64 + cleanName := strings.TrimPrefix(e.Name(), w.segmentPrefix+"-") + cleanName = strings.TrimSuffix(cleanName, ".wal") + + // Scan integer number + if _, err := fmt.Sscanf(cleanName, "%d", &idx); err == nil { + segFiles = append(segFiles, segmentFile{name: e.Name(), idx: idx}) + } } } - // Sort to ensure log order - sort.Strings(segFiles) + // Sort by Index (Integer Sort) + sort.Slice(segFiles, func(i, j int) bool { + return segFiles[i].idx < segFiles[j].idx + }) // If no segment files exist, create the first one if len(segFiles) == 0 { return w.createActiveSegment(0) } - // Reopen old segments - for i, name := range segFiles { - path := filepath.Join(w.dir, name) - f, err := os.OpenFile(path, os.O_RDWR|os.O_APPEND, 0600) + for i, fileObj := range segFiles { + path := filepath.Join(w.dir, fileObj.name) + f, err := os.OpenFile(path, os.O_RDWR|os.O_APPEND, PermFileOpen) if err != nil { return err } - // Get current size to determine offset stat, err := f.Stat() if err != nil { return err } - // Create Segment struct seg := &Segment{ - idx: i, + idx: fileObj.idx, path: path, file: f, size: stat.Size(), } - // If it's the last file (Active Segment), we need to check for consistency + // If it's the last file, set as active if i == len(segFiles)-1 { // Seek to the end of the file if _, err := f.Seek(0, io.SeekEnd); err != nil { @@ -159,7 +182,7 @@ func (w *WAL) loadSegments() error { // Important: Repair the last file if corrupted due to crash if err := w.repairActiveSegment(); err != nil { - return fmt.Errorf("corrupted segment repair failed: %v", err) + return err } } else { // Close old segment files @@ -213,7 +236,7 @@ func (w *WAL) repairActiveSegment() error { copy(verifyBuf[sizeSize+offsetSize:], payload) if 
calculateCRC(verifyBuf) != readCrc { - log.Println("โš ๏ธ Detected corrupted data due to crash -> Will truncate.") + log.Println("โš ๏ธ WAL: Corrupted tail detected, truncating...") break } @@ -237,7 +260,7 @@ func (w *WAL) repairActiveSegment() error { } // createActiveSegment: Create a new active segment -func (w *WAL) createActiveSegment(idx int) error { +func (w *WAL) createActiveSegment(idx uint64) error { // sync and close old segment if exists if w.activeSegment != nil { @@ -257,11 +280,19 @@ func (w *WAL) createActiveSegment(idx int) error { path := filepath.Join(w.dir, fileName) // Open file with 0600 permissions (read/write for owner only) -> Security - f, err := os.OpenFile(path, os.O_CREATE|os.O_RDWR|os.O_APPEND, 0600) + f, err := os.OpenFile(path, os.O_CREATE|os.O_RDWR|os.O_APPEND, PermFileOpen) if err != nil { return err } + // Sync parent directory + // The file has been synced, but its entry in the parent directory may not have been synced to disk + // If the power goes out at the moment the new file is created, that file may disappear completely + if dir, err := os.Open(w.dir); err == nil { + dir.Sync() // Ensure that the entry for the new file is written to the directory table + dir.Close() + } + w.activeSegment = &Segment{ idx: idx, path: path, @@ -275,7 +306,6 @@ func (w *WAL) createActiveSegment(idx int) error { // backgroundSync: Periodically sync data based on strategy func (w *WAL) backgroundSync(interval time.Duration) { defer w.wg.Done() - ticker := time.NewTicker(interval) defer ticker.Stop() @@ -291,119 +321,124 @@ func (w *WAL) backgroundSync(interval time.Duration) { } case SyncStrategyOSCache: - // --- OPTIMIZED LOCKING --- - - // Lock only to get the file pointer (Extremely short) + // Minimized Lock Contention w.mu.Lock() if w.activeSegment == nil || w.activeSegment.file == nil { - w.mu.Unlock() // Nothing to sync + w.mu.Unlock() continue } // Copy file pointer to local variable f := w.activeSegment.file - w.mu.Unlock() // <--- 
RELEASE LOCK IMMEDIATELY + w.mu.Unlock() - // Perform heavy I/O outside of Lock - // At this point, other Goroutines (Write) can acquire Lock and write logs normally err := f.Sync() - - // Troubleshooting (Race condition with Rotate/Close) - if err != nil { - // If the file is closed (due to the rotate file closing), we ignore this error - if errors.Is(err, os.ErrClosed) { - continue - } - // Other errors will be logged as warnings - log.Printf("wal: background sync os_cache warning: %v", err) + if err != nil && !errors.Is(err, os.ErrClosed) { + log.Printf("wal: background sync error: %v", err) } - // ------------------------- default: // do nothing } case <-w.stopSync: - log.Println("wal: stopping sync routine...") return } } } -// Write: Append a new entry to the WAL -func (w *WAL) Write(payload []byte) error { - w.mu.Lock() - defer w.mu.Unlock() - - // Prepare Header - // Total size of entry = Header + Payload +func (w *WAL) bufferWrite(payload []byte) error { pktSize := int64(headerSize + len(payload)) - // Check Log Rotation - if w.activeSegment.size+pktSize > w.segmentSize { - if err := w.createActiveSegment(w.activeSegment.idx + 1); err != nil { - return err - } - } - payloadLen := uint64(len(payload)) currentOffset := uint64(w.activeSegment.size) - // Prepare Buffer - buf := make([]byte, pktSize) + var buf []byte + obj := bufPool.Get() + if b, ok := obj.([]byte); ok && int64(cap(b)) >= pktSize { + buf = b[:pktSize] + } else { + buf = make([]byte, pktSize) + } - // Encoding Binary - // Format: [CRC][Size][Offset][Payload] -> Calculate CRC over binary.BigEndian.PutUint64(buf[crcSize:crcSize+sizeSize], payloadLen) binary.BigEndian.PutUint64(buf[crcSize+sizeSize:headerSize], currentOffset) copy(buf[headerSize:], payload) - // Calculate CRC checksum := calculateCRC(buf[crcSize:]) binary.BigEndian.PutUint32(buf[:crcSize], checksum) - // Write to buffered writer first (Go RAM) if _, err := w.activeSegment.writer.Write(buf); err != nil { return err } - // Sync 
based on strategy + bufPool.Put(buf) + w.activeSegment.size += pktSize + return nil +} + +func (w *WAL) runSyncStrategy() error { switch w.syncStrategy { + case SyncStrategyAlways: + return w.sync() + case SyncStrategyOSCache: + return w.activeSegment.writer.Flush() + } + return nil +} - case SyncStrategyAlways: // safest +// Write: Append a new entry to the WAL +func (w *WAL) Write(payload []byte) error { + w.mu.Lock() + defer w.mu.Unlock() - if err := w.sync(); err != nil { + pktSize := int64(headerSize + len(payload)) + if w.activeSegment.size+pktSize > w.segmentSize { + if err := w.createActiveSegment(w.activeSegment.idx + 1); err != nil { return err } + } + + if err := w.bufferWrite(payload); err != nil { + return err + } - case SyncStrategyOSCache: // fast, safe with app crash - // Push to OS - // Syncing is handled by the background goroutine every 1 second - if err := w.activeSegment.writer.Flush(); err != nil { + return w.runSyncStrategy() +} + +// WriteBatch writes a batch of entries to the WAL efficiently. +// It acquires the lock once and syncs once (if needed) for the whole batch. 
+func (w *WAL) WriteBatch(payloads [][]byte) error { + w.mu.Lock() + defer w.mu.Unlock() + + var batchSize int64 + for _, p := range payloads { + batchSize += int64(headerSize + len(p)) + } + + if w.activeSegment.size+batchSize > w.segmentSize { + if err := w.createActiveSegment(w.activeSegment.idx + 1); err != nil { return err } - - case SyncStrategyBackground: // high risk, super fast - // do nothing } - // Update Offset for next write - w.activeSegment.size += pktSize + for _, payload := range payloads { + if err := w.bufferWrite(payload); err != nil { + return err + } + } - return nil + return w.runSyncStrategy() } // sync: Flush buffer and fsync to disk // Internal method, caller must hold the lock func (w *WAL) sync() error { - if w.activeSegment == nil { return nil } - - // Flush buffer Go -> OS if err := w.activeSegment.writer.Flush(); err != nil { return err } - // Fsync OS -> Disk Platter (Mandatory for Durability) return w.activeSegment.file.Sync() } @@ -412,184 +447,146 @@ func (w *WAL) sync() error { func (w *WAL) Sync() error { w.mu.Lock() defer w.mu.Unlock() - return w.sync() } // StopSync stops sync goroutine and WAITS for it to finish func (w *WAL) StopSync() { - // Check the channel to avoid closing on nil or closing twice (if more thorough, use sync.Once) select { case <-w.stopSync: - // The channel is closed, it's not doing anything default: - // If not closed yet, close it to signal stop if w.stopSync != nil { close(w.stopSync) } } - - // Block here until backgroundSync actually returns. w.wg.Wait() } // Close: Safely close the WAL func (w *WAL) Close() error { w.mu.Lock() - // Sync data last time if err := w.sync(); err != nil { w.mu.Unlock() return err } w.mu.Unlock() // <--- IMPORTANT: Release the lock so that BackgroundSync can finish running (if it's stuck). 
- // Stop the background worker (This function has wg.Wait, so it must be called when the lock is not held) w.StopSync() - // Lock the file to close it securely w.mu.Lock() defer w.mu.Unlock() return w.activeSegment.file.Close() } -// Reader: Interface to read logs sequentially -type Reader interface { - Next() bool // Move to the next log - Value() []byte // Get the data of the current log - Err() error // Get error if any - Close() error // Close the reader -} - -// Iterator: Implementation of Reader -type Iterator struct { - wal *WAL - segmentPaths []string // List of segment file paths - currentIdx int // Index of the current segment file being read - currentFile *os.File // File descriptor currently open - currentReader *io.Reader // Reader wrapper (could be buffered) - currentEntry []byte // Current log entry data - err error // Error if any - closed bool // Whether the iterator is closed -} - -// NewReader: Create an Iterator starting from the oldest segment -func (w *WAL) NewReader() (*Iterator, error) { +func (w *WAL) GetLastSegmentIdx() uint64 { w.mu.RLock() defer w.mu.RUnlock() - // Combine all segment files (including closed and active files) - var paths []string - for _, seg := range w.segments { - paths = append(paths, seg.path) - } - // Don't forget the active file (the last one) - if w.activeSegment != nil { - paths = append(paths, w.activeSegment.path) + if w.activeSegment == nil { + return 0 } - return &Iterator{ - wal: w, - segmentPaths: paths, - currentIdx: 0, - currentFile: nil, // Will open lazy in Next() - }, nil + return w.activeSegment.idx } -// Next: Read the next entry. Returns true if successful, false if no more data or error. -func (it *Iterator) Next() bool { - if it.err != nil || it.closed { - return false - } +// TruncateFront deletes all closed segments with an index less than the provided index. +// This is used to free up disk space after a snapshot has been taken. +// Note: This does not affect the active segment. 
+func (w *WAL) TruncateFront(segmentIdx uint64) error { + w.mu.Lock() + defer w.mu.Unlock() - // Loop to handle switching between segment files - for { - // If file is not opened or we've read all of the old file -> Open new file - if it.currentFile == nil { - if it.currentIdx >= len(it.segmentPaths) { - return false // All files have been read + var keptSegments []*Segment + for _, seg := range w.segments { + if seg.idx < segmentIdx { + // Remove the file from disk + if err := os.Remove(seg.path); err != nil { + // If the file is already gone, ignore the error + if !os.IsNotExist(err) { + return err + } } + } else { + keptSegments = append(keptSegments, seg) + } + } - path := it.segmentPaths[it.currentIdx] - f, err := os.Open(path) - if err != nil { - it.err = err - return false - } + // Update the segment list + w.segments = keptSegments + return nil +} - it.currentFile = f - // Create a buffer reader for faster reading - br := bufio.NewReader(f) - reader := io.Reader(br) - it.currentReader = &reader - } +// Cleanup removes closed segments that are older than the specified TTL (Time To Live). +// This is useful for time-based retention policies. 
+func (w *WAL) CleanupByTTL(ttl time.Duration) error { + w.mu.Lock() + defer w.mu.Unlock() - // Read Header - header := make([]byte, headerSize) - _, err := io.ReadFull(*it.currentReader, header) + threshold := time.Now().Add(-ttl) + var keptSegments []*Segment + for _, seg := range w.segments { + // Get file stats to check modification time + info, err := os.Stat(seg.path) if err != nil { - // If EOF is encountered, close the current file and increment the index so that the next iteration opens a new file - if err == io.EOF { - it.currentFile.Close() - it.currentFile = nil - it.currentIdx++ + // If file doesn't exist, skip it (it's essentially cleaned) + if os.IsNotExist(err) { continue } - // If unexpected error occurs (UnexpectedEOF) -> Report error - it.err = err - return false + // If we can't stat it for some other reason, keep it to be safe + keptSegments = append(keptSegments, seg) + continue } - // Parse Header - // Note: This must match the structure used when writing within the Write function - readCrc := binary.BigEndian.Uint32(header[:crcSize]) - size := binary.BigEndian.Uint64(header[crcSize : crcSize+sizeSize]) - // offset := binary.BigEndian.Uint64(header[crcSize+sizeSize : headerSize]) // It can be used for debugging - - // Read Payload - payload := make([]byte, size) - if _, err := io.ReadFull(*it.currentReader, payload); err != nil { - it.err = err - return false + // If the file is older than the threshold, delete it + if info.ModTime().Before(threshold) { + if err := os.Remove(seg.path); err != nil { + if !os.IsNotExist(err) { + return err + } + } + } else { + keptSegments = append(keptSegments, seg) } + } - // Verify CRC - // Create a buffer to recalculate CRC (Size + Offset + Payload) - // Note: When writing, you calculate CRC for (Buf[crcSize:]), which includes both Size and Offset - verifyBuf := make([]byte, sizeSize+offsetSize+len(payload)) - copy(verifyBuf[:sizeSize+offsetSize], header[crcSize:]) // Copy Size + Offset from header - 
copy(verifyBuf[sizeSize+offsetSize:], payload) // Copy Payload + w.segments = keptSegments + return nil +} - // Use Castagnoli table to match config.go - if calculateCRC(verifyBuf) != readCrc { - it.err = fmt.Errorf("wal: corrupted data (crc mismatch) at segment %s", it.segmentPaths[it.currentIdx]) - return false - } +// CleanupBySize removes old segments if the total WAL size exceeds maxSizeBytes. +// It deletes from the oldest segment until the total size is within the limit. +func (w *WAL) CleanupBySize(maxSizeBytes int64) error { + w.mu.Lock() + defer w.mu.Unlock() - // Finish - it.currentEntry = payload - return true + var totalSize int64 + for _, seg := range w.segments { + totalSize += seg.size + } + if w.activeSegment != nil { + totalSize += w.activeSegment.size } -} -// Value: Get the data of the current log entry -func (it *Iterator) Value() []byte { - // Return a copy for safety, or return the original slice if you want zero-allocation (be careful) - out := make([]byte, len(it.currentEntry)) - copy(out, it.currentEntry) - return out -} + if totalSize <= maxSizeBytes { + return nil + } -// Err: Returns an error if the browsing process is interrupted -func (it *Iterator) Err() error { - return it.err -} + var deleteCount int + for _, seg := range w.segments { + if totalSize > maxSizeBytes { + if err := os.Remove(seg.path); err != nil && !os.IsNotExist(err) { + return err + } + totalSize -= seg.size + deleteCount++ + } else { + break + } + } -// Close: Close the current file and release its resources -func (it *Iterator) Close() error { - it.closed = true - if it.currentFile != nil { - return it.currentFile.Close() + if deleteCount > 0 { + w.segments = w.segments[deleteCount:] } return nil } diff --git a/wal_test.go b/wal_test.go index 1f5efc5..63d161c 100644 --- a/wal_test.go +++ b/wal_test.go @@ -12,8 +12,7 @@ func TestWAL_WriteAndRead(t *testing.T) { os.RemoveAll(dir) // Clean up trฦฐแป›c khi test defer os.RemoveAll(dir) // Clean up sau khi test - cfg := 
Config{ - WALDir: dir, + opts := Options{ BufferSize: 4 * 1024, SegmentSize: 1024 * 1024, // 1MB SyncStrategy: SyncStrategyOSCache, @@ -21,7 +20,7 @@ func TestWAL_WriteAndRead(t *testing.T) { } // 2. Init WAL - w, err := New(cfg) + w, err := Open(dir, &opts) if err != nil { t.Fatalf("Failed to init WAL: %v", err) } @@ -41,7 +40,7 @@ func TestWAL_WriteAndRead(t *testing.T) { } // 5. Re-open (Recovery Test) - w2, err := New(cfg) + w2, err := Open(dir, &opts) if err != nil { t.Fatalf("Failed to re-open WAL: %v", err) } From 0f3f21c9a29cd55d032aaef93c6e605431667d0b Mon Sep 17 00:00:00 2001 From: hungpdn Date: Wed, 14 Jan 2026 23:05:16 +0700 Subject: [PATCH 2/3] wal: global sequence id, optimize allocation --- .github/workflows/go.yml | 2 +- README.md | 16 +- config.go | 35 +++-- examples/main.go | 46 +++++- reader.go | 106 +++++++++++-- wal.go | 115 ++++++++++---- wal_test.go | 328 ++++++++++++++++++++++++++++++++++++--- 7 files changed, 557 insertions(+), 91 deletions(-) diff --git a/.github/workflows/go.yml b/.github/workflows/go.yml index 4007e0b..bbe261d 100644 --- a/.github/workflows/go.yml +++ b/.github/workflows/go.yml @@ -15,7 +15,7 @@ jobs: - name: Set up Go uses: actions/setup-go@v4 with: - go-version: "1.25" + go-version: "1.22" - name: Test run: make test diff --git a/README.md b/README.md index b7fd743..457d202 100644 --- a/README.md +++ b/README.md @@ -24,14 +24,15 @@ Each segment file consists of a sequence of binary encoded entries. 
```Plaintext +-------------------+-------------------+-------------------+----------------------+ -| CRC32 (4 bytes) | Size (8 bytes) | Offset (8 bytes) | Payload (N bytes) | +| CRC32 (4 bytes) | Size (8 bytes) | SeqID (8 bytes) | Payload (N bytes) | +-------------------+-------------------+-------------------+----------------------+ -| Checksum of Data | Length of Payload | Logical Position | The actual data | +| Checksum of Data | Length of Payload | Monotonic ID | The actual data | +-------------------+-------------------+-------------------+----------------------+ ``` - CRC (Cyclic Redundancy Check): Ensures data integrity. -- Size & Offset: Enable fast reading without parsing the entire file. +- Size: Enable fast reading without parsing the entire file. +- SeqID: Global Sequence ID - Payload: The actual data. ## Installation @@ -53,12 +54,12 @@ import ( ) func main() { - opts := wal.Options{ + cfg := wal.Config{ SegmentSize: 10 * 1024 * 1024, // 10MB SyncStrategy: wal.SyncStrategyOSCache, } - w, _ := wal.Open("./wal_data", &opts) + w, _ := wal.Open("./wal_data", &cfg) defer w.Close() // Write data @@ -70,7 +71,7 @@ func main() { ### Reading Data (Replay) ```go -w, _ := wal.Open("", &opts) // Auto-recovers on open +w, _ := wal.Open("", &cfg) // Auto-recovers on open iter, _ := w.NewReader() defer iter.Close() @@ -85,7 +86,7 @@ if err := iter.Err(); err != nil { } ``` -### Options +### Config |Field |Type |Default |Description | |------------|------|----------|----------------------------------------------------| @@ -93,6 +94,7 @@ if err := iter.Err(); err != nil { |BufferSize |int |4KB |Size of the in-memory buffer. | |SyncStrategy|int |Background|0: Background, 1: Always (Fsync), 2: OSCache (Recm).| |SyncInterval|uint |1000ms |Interval for background sync execution. | +|Mode |int |0 |0: debug, 1: prod. 
| ### Sync Strategies diff --git a/config.go b/config.go index fc62eb4..58a029c 100644 --- a/config.go +++ b/config.go @@ -28,17 +28,25 @@ const ( SyncStrategyOSCache SyncStrategy = 2 ) -// Options holds the configuration for WAL. -type Options struct { +type Mode int + +const ( + ModeDebug Mode = 0 + ModeProd Mode = 1 +) + +// Config holds the configuration for WAL. +type Config struct { BufferSize int // Buffered writes size in bytes (e.g., 4KB) SegmentSize int64 // Maximum size of each file (e.g., 10MB) SegmentPrefix string // Prefix for segment file names (e.g., "segment") SyncStrategy SyncStrategy // Sync strategy SyncInterval uint // Sync interval in milliseconds for background sync + Mode Mode } -// DefaultOptions provides default configuration values for WAL. -var DefaultOptions = Options{ +// DefaultConfig provides default configuration values for WAL. +var DefaultConfig = Config{ BufferSize: 4 * KB, // 4KB SegmentSize: 10 * MB, // 10MB SegmentPrefix: "segment", @@ -46,19 +54,24 @@ var DefaultOptions = Options{ SyncInterval: 1000, // 1000ms = 1s } -// SetDefault sets default values for any zero-value fields in the Options. -func (cfg *Options) SetDefault() { +// SetDefault sets default values for any zero-value fields in the Config. 
+func (cfg *Config) SetDefault() { if cfg.BufferSize == 0 { - cfg.BufferSize = DefaultOptions.BufferSize + cfg.BufferSize = DefaultConfig.BufferSize + } + if cfg.SegmentSize == 0 { + cfg.SegmentSize = DefaultConfig.SegmentSize } - if cfg.SegmentSize < MB { - cfg.SegmentSize = DefaultOptions.SegmentSize + if cfg.Mode == ModeProd { + if cfg.SegmentSize < MB { + cfg.SegmentSize = DefaultConfig.SegmentSize + } } if cfg.SegmentPrefix == "" { - cfg.SegmentPrefix = DefaultOptions.SegmentPrefix + cfg.SegmentPrefix = DefaultConfig.SegmentPrefix } if cfg.SyncInterval == 0 { - cfg.SyncInterval = DefaultOptions.SyncInterval + cfg.SyncInterval = DefaultConfig.SyncInterval } } diff --git a/examples/main.go b/examples/main.go index eb32063..05e90ba 100644 --- a/examples/main.go +++ b/examples/main.go @@ -17,7 +17,7 @@ func main() { // 1. Configuration // We use a very small SegmentSize (10KB) to demonstrate Log Rotation and Cleanup easily. - cfg := wal.Options{ + cfg := wal.Config{ BufferSize: 4 * 1024, // 4KB Buffer SegmentSize: 10 * 1024, // 10KB (Small for demo purposes) SegmentPrefix: "wal", // Prefix: wal-0000.wal @@ -62,7 +62,7 @@ func main() { w.Close() // ========================================== - // PART 2: READING (Iterator) + // PART 2: READING (Iterator / Checkpoint / Resume) // ========================================== fmt.Println("\n๐Ÿ“– [Part 2] Reading Data (Replay)...") @@ -79,9 +79,19 @@ func main() { defer iter.Close() count := 0 + lastIndex := uint64(0) for iter.Next() { - // val := iter.Value() - // fmt.Println(string(val)) // Uncomment to see data + + if count < 5 || count > 995 { + fmt.Printf("ID: %d | Data: %s\n", iter.Index(), string(iter.Value())) + } + + if iter.Index() != lastIndex+1 { + panic("Missing gap!") + } + + lastIndex = iter.Index() + count++ } @@ -90,6 +100,34 @@ func main() { } fmt.Printf(" โœ… Read %d total records from disk.\n", count) + fmt.Println("\n๐Ÿ“– [Part 2.1] Resuming from Checkpoint...") + + // Let's assume the app crashed at 
ID = 500 last time
+	checkpointID := uint64(500)
+	fmt.Printf("   -> Seeking to ID: %d ...\n", checkpointID)
+
+	iter, err = wRead.NewReader()
+	if err != nil {
+		log.Fatalf("Reader failed: %v", err)
+	}
+	defer iter.Close()
+
+	if found := iter.Seek(checkpointID); !found {
+		if iter.Err() != nil {
+			log.Fatalf("Seek failed: %v", iter.Err())
+		}
+		fmt.Println("   -> ID not found (end of log).")
+	} else {
+		fmt.Printf("   โœ… Resumed! Found ID: %d | Data: %s\n", iter.Index(), string(iter.Value()))
+
+		for iter.Next() {
+			// Process logic...
+			if iter.Index() < 503 && iter.Index() > 500 {
+				fmt.Printf("ID: %d | Data: %s\n", iter.Index(), string(iter.Value()))
+			}
+		}
+	}
+
 	// ==========================================
 	// PART 3: RETENTION & CLEANUP
 	// ==========================================
diff --git a/reader.go b/reader.go
index f647980..8f1ae6a 100644
--- a/reader.go
+++ b/reader.go
@@ -10,10 +10,12 @@ import (
 
 // Reader: Interface to read logs sequentially
 type Reader interface {
-	Next() bool    // Move to the next log
-	Value() []byte // Get the data of the current log
-	Err() error    // Get error if any
-	Close() error  // Close the reader
+	Next() bool          // Move to the next log
+	Value() []byte       // Get the data of the current log
+	Index() uint64       // Get the Global Sequence ID
+	Seek(id uint64) bool // Fast Forward
+	Err() error          // Get error if any
+	Close() error        // Close the reader
 }
 
 // Iterator: Implementation of Reader
@@ -24,8 +26,11 @@ type Iterator struct {
 	currentFile   *os.File   // File descriptor currently open
 	currentReader *io.Reader // Reader wrapper (could be buffered)
 	currentEntry  []byte     // Current log entry data
+	currentSeqID  uint64     // Store current ID
 	err           error      // Error if any
 	closed        bool       // Whether the iterator is closed
+	// Optimization: Reusable buffer for CRC verification to reduce GC pressure
+	verifyBuf []byte
 }
 
 // NewReader: Create an Iterator starting from the oldest segment
@@ -101,7 +106,7 @@ func (it *Iterator) Next() bool {
 		readCrc := 
binary.BigEndian.Uint32(header[:crcSize]) size := binary.BigEndian.Uint64(header[crcSize : crcSize+sizeSize]) - offset := binary.BigEndian.Uint64(header[crcSize+sizeSize : headerSize]) + seqID := binary.BigEndian.Uint64(header[crcSize+sizeSize : headerSize]) payload := make([]byte, size) if _, err := io.ReadFull(*it.currentReader, payload); err != nil { @@ -109,21 +114,33 @@ func (it *Iterator) Next() bool { return false } - verifyBuf := make([]byte, sizeSize+offsetSize+len(payload)) - binary.BigEndian.PutUint64(verifyBuf[:sizeSize], size) - binary.BigEndian.PutUint64(verifyBuf[sizeSize:sizeSize+offsetSize], offset) - copy(verifyBuf[sizeSize+offsetSize:], payload) + neededSize := sizeSize + seqIDSize + len(payload) + if cap(it.verifyBuf) < neededSize { + it.verifyBuf = make([]byte, neededSize) + } else { + it.verifyBuf = it.verifyBuf[:neededSize] + } + + binary.BigEndian.PutUint64(it.verifyBuf[:sizeSize], size) + binary.BigEndian.PutUint64(it.verifyBuf[sizeSize:sizeSize+seqIDSize], seqID) + copy(it.verifyBuf[sizeSize+seqIDSize:], payload) - if calculateCRC(verifyBuf) != readCrc { + if calculateCRC(it.verifyBuf) != readCrc { it.err = fmt.Errorf("wal: corrupted data (crc mismatch) at segment %s", it.segmentPaths[it.currentIdx]) return false } it.currentEntry = payload + it.currentSeqID = seqID return true } } +// Index returns the Global Sequence ID of the current log entry +func (it *Iterator) Index() uint64 { + return it.currentSeqID +} + // Value: Get the data of the current log entry func (it *Iterator) Value() []byte { // Return a copy for safety, or return the original slice if you want zero-allocation (be careful) @@ -145,3 +162,72 @@ func (it *Iterator) Close() error { } return nil } + +// Seek: Fast-forward to the record with ID >= startID +// It will skip reading the payload and verifying the CRC of older records +func (it *Iterator) Seek(startID uint64) bool { + for { + + if it.currentFile == nil { + if it.currentIdx >= len(it.segmentPaths) { + return 
false + } + path := it.segmentPaths[it.currentIdx] + f, err := os.Open(path) + if err != nil { + it.err = err + return false + } + it.currentFile = f + br := bufio.NewReader(f) + reader := io.Reader(br) + it.currentReader = &reader + } + + header := make([]byte, headerSize) + if _, err := io.ReadFull(*it.currentReader, header); err != nil { + if err == io.EOF { + it.currentFile.Close() + it.currentFile = nil + it.currentIdx++ + continue + } + it.err = err + return false + } + + size := binary.BigEndian.Uint64(header[crcSize : crcSize+sizeSize]) + seqID := binary.BigEndian.Uint64(header[crcSize+sizeSize : headerSize]) + + if seqID < startID { + // Use Discard to jump over byte 'size' without copying the data + if _, err := io.CopyN(io.Discard, *it.currentReader, int64(size)); err != nil { + it.err = err + return false + } + continue + } else { + + payload := make([]byte, size) + if _, err := io.ReadFull(*it.currentReader, payload); err != nil { + it.err = err + return false + } + + verifyBuf := make([]byte, sizeSize+seqIDSize+len(payload)) + binary.BigEndian.PutUint64(verifyBuf[:sizeSize], size) + binary.BigEndian.PutUint64(verifyBuf[sizeSize:sizeSize+seqIDSize], seqID) + copy(verifyBuf[sizeSize+seqIDSize:], payload) + + readCrc := binary.BigEndian.Uint32(header[:crcSize]) + if calculateCRC(verifyBuf) != readCrc { + it.err = fmt.Errorf("wal: corrupted data at seek target in segment %s", it.segmentPaths[it.currentIdx]) + return false + } + + it.currentEntry = payload + it.currentSeqID = seqID + return true + } + } +} diff --git a/wal.go b/wal.go index 65aa183..b442330 100644 --- a/wal.go +++ b/wal.go @@ -20,13 +20,14 @@ WAL (Write-Ahead Log) Format: Each segment file consists of a sequence of binary encoded entries. 
+-------------------+-------------------+-------------------+----------------------+ -| CRC32 (4 bytes) | Size (8 bytes) | Offset (8 bytes) | Payload (N bytes) | +| CRC32 (4 bytes) | Size (8 bytes) | SeqID (8 bytes) | Payload (N bytes) | +-------------------+-------------------+-------------------+----------------------+ -| Checksum of Data | Length of Payload | Logical Position | The actual data | +| Checksum of Data | Length of Payload | Monotonic ID | The actual data | +-------------------+-------------------+-------------------+----------------------+ - CRC (Cyclic Redundancy Check): Ensures data integrity. -- Size & Offset: Enable fast reading without parsing the entire file. +- Size: Enable fast reading without parsing the entire file. +- SeqID: Global Sequence ID - Payload: The actual data. */ @@ -34,21 +35,14 @@ const ( // Size of various components in the WAL entry header crcSize = 4 sizeSize = 8 - offsetSize = 8 - headerSize = crcSize + sizeSize + offsetSize + seqIDSize = 8 + headerSize = crcSize + sizeSize + seqIDSize ) var ( ErrInvalidCRC = errors.New("wal: invalid crc, data corruption detected") ) -var bufPool = sync.Pool{ - New: func() interface{} { - // Allocate a default buffer. It will grow if needed. 
- return make([]byte, 4096+headerSize) - }, -} - // WAL: Write-Ahead Log structure type WAL struct { mu sync.RWMutex // Ensures thread safety @@ -61,6 +55,8 @@ type WAL struct { syncStrategy SyncStrategy // Sync strategy stopSync chan struct{} // Channel to stop background sync wg sync.WaitGroup // WaitGroup for background sync + lastSeqID uint64 // Track the global sequence ID + bufPool sync.Pool } // Segment: Represents a physical file @@ -78,7 +74,7 @@ type segmentFile struct { } // Open: Initialize and recover data -func Open(dir string, opts *Options) (*WAL, error) { +func Open(dir string, cfg *Config) (*WAL, error) { // Ensure WAL directory exists if err := os.MkdirAll(dir, PermMkdir); err != nil { @@ -86,19 +82,24 @@ func Open(dir string, opts *Options) (*WAL, error) { } // Load default config values if not set - if opts == nil { - opts = &DefaultOptions + if cfg == nil { + cfg = &DefaultConfig } - opts.SetDefault() + cfg.SetDefault() // Initialize WAL structure wal := WAL{ dir: dir, - bufferSize: opts.BufferSize, - segmentSize: opts.SegmentSize, - segmentPrefix: opts.SegmentPrefix, - syncStrategy: opts.SyncStrategy, + bufferSize: cfg.BufferSize, + segmentSize: cfg.SegmentSize, + segmentPrefix: cfg.SegmentPrefix, + syncStrategy: cfg.SyncStrategy, stopSync: make(chan struct{}), + bufPool: sync.Pool{ + New: func() interface{} { + return make([]byte, 4*KB+headerSize) + }, + }, } // Load existing segments from disk for recovery @@ -109,7 +110,7 @@ func Open(dir string, opts *Options) (*WAL, error) { // Only run background sync if not SyncStrategyAlways if wal.syncStrategy != SyncStrategyAlways { wal.wg.Add(1) - go wal.backgroundSync(time.Duration(opts.SyncInterval) * time.Millisecond) + go wal.backgroundSync(time.Duration(cfg.SyncInterval) * time.Millisecond) } return &wal, nil @@ -190,16 +191,58 @@ func (w *WAL) loadSegments() error { w.segments = append(w.segments, seg) } } + + // SPECIAL CASE: If active segment is empty (newly rotated before crash), + // we 
need to find lastSeqID from the previous closed segment. + if w.activeSegment.size == 0 && len(w.segments) > 0 { + lastClosedSeg := w.segments[len(w.segments)-1] + lastID, err := w.scanSegmentForLastID(lastClosedSeg.path) + if err != nil { + return fmt.Errorf("failed to recover SeqID from previous segment: %v", err) + } + w.lastSeqID = lastID + } return nil } +// scanSegmentForLastID reads a closed segment to find the highest SeqID. +// This is only called once at startup if the active segment is empty. +func (w *WAL) scanSegmentForLastID(path string) (uint64, error) { + f, err := os.Open(path) + if err != nil { + return 0, err + } + defer f.Close() + + reader := bufio.NewReader(f) + var lastID uint64 + + for { + header := make([]byte, headerSize) + if _, err := io.ReadFull(reader, header); err != nil { + if err == io.EOF { + break + } + return 0, err + } + size := binary.BigEndian.Uint64(header[crcSize : crcSize+sizeSize]) + seqID := binary.BigEndian.Uint64(header[crcSize+sizeSize : headerSize]) + lastID = seqID + + // Skip payload + if _, err := reader.Discard(int(size)); err != nil { + return 0, err + } + } + return lastID, nil +} + // repairActiveSegment: Repair the active segment file if corrupted // Simple logic: Read through the active file to find the last valid point // In real production, we would do tail reading to optimize. Here we use scanning for clarity. 
func (w *WAL) repairActiveSegment() error { f := w.activeSegment.file - // Read from the beginning if _, err := f.Seek(0, io.SeekStart); err != nil { return err @@ -207,13 +250,14 @@ func (w *WAL) repairActiveSegment() error { reader := bufio.NewReader(f) var validOffset int64 = 0 + var maxSeqID uint64 = 0 for { // Read Header (CRC + Size + Offset) header := make([]byte, headerSize) if _, err := io.ReadFull(reader, header); err != nil { if err == io.EOF || err == io.ErrUnexpectedEOF { - break // End of file or truncated file -> Stop + break } return err } @@ -221,27 +265,27 @@ func (w *WAL) repairActiveSegment() error { // Parse Header readCrc := binary.BigEndian.Uint32(header[:crcSize]) size := binary.BigEndian.Uint64(header[crcSize : crcSize+sizeSize]) - offset := binary.BigEndian.Uint64(header[crcSize+sizeSize : headerSize]) + seqID := binary.BigEndian.Uint64(header[crcSize+sizeSize : headerSize]) // Read Payload payload := make([]byte, size) if _, err := io.ReadFull(reader, payload); err != nil { - break // Payload read error -> Stop + break } // Verify CRC - verifyBuf := make([]byte, sizeSize+offsetSize+len(payload)) + verifyBuf := make([]byte, sizeSize+seqIDSize+len(payload)) binary.BigEndian.PutUint64(verifyBuf[:sizeSize], size) - binary.BigEndian.PutUint64(verifyBuf[sizeSize:sizeSize+offsetSize], offset) - copy(verifyBuf[sizeSize+offsetSize:], payload) + binary.BigEndian.PutUint64(verifyBuf[sizeSize:sizeSize+seqIDSize], seqID) + copy(verifyBuf[sizeSize+seqIDSize:], payload) if calculateCRC(verifyBuf) != readCrc { log.Println("โš ๏ธ WAL: Corrupted tail detected, truncating...") break } - // Valid record, update offset safely validOffset += int64(headerSize + size) + maxSeqID = seqID } // Truncate file to the last valid position @@ -255,6 +299,7 @@ func (w *WAL) repairActiveSegment() error { } w.activeSegment.size = validOffset w.activeSegment.writer = bufio.NewWriterSize(f, w.bufferSize) // Reset buffer writer + w.lastSeqID = maxSeqID // Update global state 
return nil } @@ -347,20 +392,24 @@ func (w *WAL) backgroundSync(interval time.Duration) { } func (w *WAL) bufferWrite(payload []byte) error { + w.lastSeqID++ + pktSize := int64(headerSize + len(payload)) payloadLen := uint64(len(payload)) - currentOffset := uint64(w.activeSegment.size) + + currentSeqID := w.lastSeqID var buf []byte - obj := bufPool.Get() + obj := w.bufPool.Get() if b, ok := obj.([]byte); ok && int64(cap(b)) >= pktSize { buf = b[:pktSize] } else { buf = make([]byte, pktSize) } + // [CRC][Size][SeqID][Payload] binary.BigEndian.PutUint64(buf[crcSize:crcSize+sizeSize], payloadLen) - binary.BigEndian.PutUint64(buf[crcSize+sizeSize:headerSize], currentOffset) + binary.BigEndian.PutUint64(buf[crcSize+sizeSize:headerSize], currentSeqID) copy(buf[headerSize:], payload) checksum := calculateCRC(buf[crcSize:]) @@ -370,7 +419,7 @@ func (w *WAL) bufferWrite(payload []byte) error { return err } - bufPool.Put(buf) + w.bufPool.Put(buf) w.activeSegment.size += pktSize return nil } diff --git a/wal_test.go b/wal_test.go index 63d161c..46a815e 100644 --- a/wal_test.go +++ b/wal_test.go @@ -3,50 +3,55 @@ package wal import ( "fmt" "os" + "path/filepath" "testing" + "time" ) -func TestWAL_WriteAndRead(t *testing.T) { - // 1. Setup - dir := "./test_wal_data" - os.RemoveAll(dir) // Clean up trฦฐแป›c khi test - defer os.RemoveAll(dir) // Clean up sau khi test +func cleanUp(dir string) { + _ = os.RemoveAll(dir) +} - opts := Options{ - BufferSize: 4 * 1024, - SegmentSize: 1024 * 1024, // 1MB - SyncStrategy: SyncStrategyOSCache, - SyncInterval: 100, +func TestWAL_BasicWriteAndRead(t *testing.T) { + dir := "./test_data_basic" + cleanUp(dir) + defer cleanUp(dir) + + cfg := Config{ + BufferSize: 4 * 1024, + SegmentSize: 1024 * 1024, // 1MB + SyncStrategy: SyncStrategyOSCache, + SegmentPrefix: "wal", } - // 2. 
Init WAL - w, err := Open(dir, &opts) + w, err := Open(dir, &cfg) if err != nil { - t.Fatalf("Failed to init WAL: %v", err) + t.Fatalf("Failed to open WAL: %v", err) } - // 3. Write Data + // 1. Write entries := 100 for i := 0; i < entries; i++ { payload := []byte(fmt.Sprintf("entry-%d", i)) if err := w.Write(payload); err != nil { - t.Fatalf("Write failed at %d: %v", i, err) + t.Fatalf("Write failed: %v", err) } } - // 4. Close - if err := w.Close(); err != nil { - t.Fatalf("Close failed: %v", err) + // Verify LastSeqID + if w.lastSeqID != uint64(entries) { + t.Errorf("Expected LastSeqID %d, got %d", entries, w.lastSeqID) } - // 5. Re-open (Recovery Test) - w2, err := Open(dir, &opts) + w.Close() + + // 2. Read (Re-open) + w2, err := Open(dir, &cfg) if err != nil { t.Fatalf("Failed to re-open WAL: %v", err) } defer w2.Close() - // 6. Verify Data using Iterator iter, err := w2.NewReader() if err != nil { t.Fatalf("Failed to create reader: %v", err) @@ -59,14 +64,287 @@ func TestWAL_WriteAndRead(t *testing.T) { if string(iter.Value()) != expected { t.Errorf("Mismatch at %d: expected %s, got %s", count, expected, string(iter.Value())) } + if iter.Index() != uint64(count+1) { // SeqID starts at 1 + t.Errorf("SeqID mismatch: expected %d, got %d", count+1, iter.Index()) + } count++ } - if err := iter.Err(); err != nil { - t.Fatalf("Iterator error: %v", err) - } - if count != entries { t.Errorf("Expected %d entries, got %d", entries, count) } } + +func TestWAL_WriteBatch(t *testing.T) { + dir := "./test_data_batch" + cleanUp(dir) + defer cleanUp(dir) + + cfg := Config{ + BufferSize: 4 * 1024, + SegmentSize: 10 * 1024 * 1024, + SyncStrategy: SyncStrategyOSCache, + } + + w, err := Open(dir, &cfg) + if err != nil { + t.Fatalf("Init failed: %v", err) + } + defer w.Close() + + // Batch Write + var batch [][]byte + for i := 0; i < 50; i++ { + batch = append(batch, []byte(fmt.Sprintf("batch-%d", i))) + } + + if err := w.WriteBatch(batch); err != nil { + t.Fatalf("WriteBatch 
failed: %v", err) + } + + // Verify + iter, _ := w.NewReader() + defer iter.Close() + count := 0 + for iter.Next() { + count++ + } + if count != 50 { + t.Errorf("Expected 50 entries, got %d", count) + } +} + +func TestWAL_LogRotation(t *testing.T) { + dir := "./test_data_rotation" + cleanUp(dir) + defer cleanUp(dir) + + // Set very small segment size to force rotation + cfg := Config{ + BufferSize: 1024, + SegmentSize: 100, // 100 Bytes per file + SegmentPrefix: "wal", + } + + w, err := Open(dir, &cfg) + if err != nil { + t.Fatalf("Init failed: %v", err) + } + + // Write enough data to create multiple files + // Header overhead is approx 20 bytes. Payload "data" is 4 bytes. + // Total ~24 bytes per entry. 100 bytes limit -> ~4 entries per file. + for i := 0; i < 20; i++ { + w.Write([]byte("data")) + } + w.Close() + + // Check files + entries, _ := os.ReadDir(dir) + walFiles := 0 + for _, e := range entries { + if filepath.Ext(e.Name()) == ".wal" { + walFiles++ + } + } + + if walFiles < 2 { + t.Errorf("Expected multiple segment files, got %d", walFiles) + } +} + +func TestWAL_Seek(t *testing.T) { + dir := "./test_data_seek" + cleanUp(dir) + defer cleanUp(dir) + + cfg := Config{SegmentSize: 10 * 1024 * 1024} + w, _ := Open(dir, &cfg) + + // Write 100 entries (IDs 1 to 100) + for i := 1; i <= 100; i++ { + w.Write([]byte(fmt.Sprintf("val-%d", i))) + } + w.Close() + + // Re-open for reading + w2, _ := Open(dir, &cfg) + defer w2.Close() + iter, _ := w2.NewReader() + defer iter.Close() + + // Test Case 1: Seek to middle (ID 50) + if !iter.Seek(50) { + t.Fatalf("Seek(50) failed") + } + if iter.Index() != 50 { + t.Errorf("Expected index 50, got %d", iter.Index()) + } + if string(iter.Value()) != "val-50" { + t.Errorf("Expected val-50, got %s", iter.Value()) + } + + // Test Case 2: Continue reading from 50 + iter.Next() + if iter.Index() != 51 { + t.Errorf("Expected next index 51, got %d", iter.Index()) + } + + // Test Case 3: Seek to non-existent future ID + if 
iter.Seek(200) { + t.Errorf("Seek(200) should fail for 100 entries") + } + + // Test Case 4: Seek backwards (Should rely on Re-creating Reader or just work if implemented) + // Current Seek implementation assumes forward scan from current position OR strictly forward? + // The current Reader implementation scans from current file/position. + // If we want random seek, we usually assume it works if we haven't passed it, + // OR we might need to reset reader. For now let's test a new reader. + iter2, _ := w2.NewReader() + defer iter2.Close() + if !iter2.Seek(10) { + t.Errorf("Seek(10) failed") + } + if iter2.Index() != 10 { + t.Errorf("Expected index 10, got %d", iter2.Index()) + } +} + +func TestWAL_Cleanup(t *testing.T) { + dir := "./test_data_cleanup" + cleanUp(dir) + defer cleanUp(dir) + + // Small segment size to generate many files + cfg := Config{SegmentSize: 500} // ~20 entries per file + w, _ := Open(dir, &cfg) + + // Write 100 entries -> Should create ~5 files + for i := 0; i < 100; i++ { + w.Write([]byte("payload")) + } + + // 1. Test TruncateFront + // Files might be: wal-0000, wal-0001, wal-0002... + // Truncate everything before index 2 (delete 0 and 1) + // Note: TruncateFront param is 'segmentIdx', not log SeqID. + // We need to know segment indices. + initialFiles, _ := os.ReadDir(dir) + if len(initialFiles) < 3 { + t.Skip("Not enough files generated for Cleanup test") + } + + // Call TruncateFront with index 1 (should keep 1 and greater, delete 0) + err := w.TruncateFront(1) + if err != nil { + t.Errorf("TruncateFront failed: %v", err) + } + + // 2. 
Test CleanupBySize + // Limit total size to ~500 bytes (should keep only active + maybe 1 closed) + err = w.CleanupBySize(600) + if err != nil { + t.Errorf("CleanupBySize failed: %v", err) + } + + w.Close() +} + +func TestWAL_CorruptionRecovery(t *testing.T) { + dir := "./test_data_corrupt" + cleanUp(dir) + defer cleanUp(dir) + + cfg := Config{SegmentSize: 1024 * 1024} + w, _ := Open(dir, &cfg) + + // Write valid data + w.Write([]byte("valid-1")) + w.Write([]byte("valid-2")) + w.Close() + + // Manually corrupt the file + files, _ := os.ReadDir(dir) + lastFile := filepath.Join(dir, files[len(files)-1].Name()) + f, err := os.OpenFile(lastFile, os.O_APPEND|os.O_WRONLY, 0600) + if err != nil { + t.Fatalf("Failed to open file for corruption: %v", err) + } + // Write garbage bytes (partial header or random data) + f.Write([]byte{0xDE, 0xAD, 0xBE, 0xEF}) + f.Close() + + // Re-open WAL -> Should detect and truncate garbage + w2, err := Open(dir, &cfg) + if err != nil { + t.Fatalf("Failed to recover WAL: %v", err) + } + defer w2.Close() + + // Verify data (should have 2 valid entries) + iter, _ := w2.NewReader() + count := 0 + for iter.Next() { + count++ + // fmt.Printf("Recovered: %s\n", string(iter.Value())) + } + if count != 2 { + t.Errorf("Expected 2 recovered entries, got %d", count) + } +} + +// TestWAL_SyncLogic checks if Sync can be called without error. +// Hard to verify disk flush in unit test, but ensures no panics. +func TestWAL_SyncLogic(t *testing.T) { + dir := "./test_data_sync" + cleanUp(dir) + defer cleanUp(dir) + + w, _ := Open(dir, &Config{SyncStrategy: SyncStrategyAlways}) + w.Write([]byte("data")) + if err := w.Sync(); err != nil { + t.Errorf("Manual Sync failed: %v", err) + } + w.Close() +} + +// TestWAL_CleanupByTTL verifies that old segments are deleted. 
+func TestWAL_CleanupByTTL(t *testing.T) { + dir := "./test_data_ttl" + cleanUp(dir) + defer cleanUp(dir) + + // Create segments quickly + cfg := Config{SegmentSize: 100} + w, _ := Open(dir, &cfg) + + // Write a segment + for i := 0; i < 10; i++ { + w.Write([]byte("old-data")) + } + // Force rotation by creating new active segment implicitly via writes + // Wait a bit to simulate "old" time + time.Sleep(10 * time.Millisecond) + + // We need to modify the modtime of the generated file to make it "old" + w.mu.Lock() + // Close active to rotate + w.createActiveSegment(w.activeSegment.idx + 1) + w.mu.Unlock() + + // Hack: Modify ModTime of the first segment + entries, _ := os.ReadDir(dir) + if len(entries) > 0 { + oldFile := filepath.Join(dir, entries[0].Name()) + // Set time to 2 hours ago + oldTime := time.Now().Add(-2 * time.Hour) + os.Chtimes(oldFile, oldTime, oldTime) + } + + // Run CleanupByTTL (TTL = 1 hour) + if err := w.CleanupByTTL(1 * time.Hour); err != nil { + t.Errorf("CleanupByTTL failed: %v", err) + } + + w.Close() +} From 31bebc6b22009f76677780ccd4bf587ed8bea348 Mon Sep 17 00:00:00 2001 From: hungpdn Date: Wed, 14 Jan 2026 23:08:47 +0700 Subject: [PATCH 3/3] wal: update readme --- README.md | 12 ------------ 1 file changed, 12 deletions(-) diff --git a/README.md b/README.md index 457d202..9f92f27 100644 --- a/README.md +++ b/README.md @@ -116,18 +116,6 @@ Contributions are welcome! Please fork the repository and open a pull request. MIT License. See [LICENSE](LICENSE) file. -## TODO - -- [ ] Log level, metrics -- [x] Tests -- [ ] Retention policy (remove, upload to s3, gcs, etc) -- [ ] Index -- [x] CI -- [ ] Benchmarks -- [ ] Documentation -- [x] Open source template (makefile, license, code of conduct, contributing, etc) -- [ ] Distributed Replication - ## Reference - [tidwall/wal](https://github.com/tidwall/wal)