Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/go.yml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ jobs:
- name: Set up Go
uses: actions/setup-go@v4
with:
go-version: "1.25"
go-version: "1.22"

- name: Test
run: make test
30 changes: 9 additions & 21 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# WAL: High-Performance Write-Ahead Log in Go

![Go Version](https://img.shields.io/badge/go-1.25-blue)
![Go Version](https://img.shields.io/badge/go-1.22-blue)
![License](https://img.shields.io/badge/license-MIT-green)
[![Go Report Card](https://goreportcard.com/badge/github.com/hungpdn/wal)](https://goreportcard.com/report/github.com/hungpdn/wal)

Expand All @@ -24,14 +24,15 @@ Each segment file consists of a sequence of binary encoded entries.

```Plaintext
+-------------------+-------------------+-------------------+----------------------+
| CRC32 (4 bytes) | Size (8 bytes) | Offset (8 bytes) | Payload (N bytes) |
| CRC32 (4 bytes) | Size (8 bytes) | SeqID (8 bytes) | Payload (N bytes) |
+-------------------+-------------------+-------------------+----------------------+
| Checksum of Data | Length of Payload | Logical Position | The actual data |
| Checksum of Data | Length of Payload | Monotonic ID | The actual data |
+-------------------+-------------------+-------------------+----------------------+
```

- CRC (Cyclic Redundancy Check): Ensures data integrity.
- Size & Offset: Enable fast reading without parsing the entire file.
- Size: Enables fast reading without parsing the entire file.
- SeqID: Global Sequence ID
- Payload: The actual data.

## Installation
Expand All @@ -54,12 +55,11 @@ import (

func main() {
cfg := wal.Config{
WALDir: "./wal_data",
SegmentSize: 10 * 1024 * 1024, // 10MB
SyncStrategy: wal.SyncStrategyOSCache,
}

w, _ := wal.New(cfg)
w, _ := wal.Open("./wal_data", &cfg)
defer w.Close()

// Write data
Expand All @@ -71,7 +71,7 @@ func main() {
### Reading Data (Replay)

```go
w, _ := wal.New(cfg) // Auto-recovers on open
w, _ := wal.Open("./wal_data", &cfg) // Auto-recovers on open

iter, _ := w.NewReader()
defer iter.Close()
Expand All @@ -86,15 +86,15 @@ if err := iter.Err(); err != nil {
}
```

### Configuration
### Config

|Field |Type |Default |Description |
|------------|------|----------|----------------------------------------------------|
|WALDir |string|./wal |Directory to store segment files. |
|SegmentSize |int64 |10MB |Max size of a single segment file before rotation. |
|BufferSize |int |4KB |Size of the in-memory buffer. |
|SyncStrategy|int |Background|0: Background, 1: Always (Fsync), 2: OSCache (Recommended).|
|SyncInterval|uint |1000ms |Interval for background sync execution. |
|Mode |int |0 |0: debug, 1: prod. |

### Sync Strategies

Expand All @@ -116,18 +116,6 @@ Contributions are welcome! Please fork the repository and open a pull request.

MIT License. See [LICENSE](LICENSE) file.

## TODO

- [ ] Log level, metrics
- [x] Tests
- [ ] Retention policy (remove, upload to s3, gcs, etc)
- [ ] Index
- [ ] CI
- [ ] Benchmarks
- [ ] Documentation
- [x] Open source template (makefile, license, code of conduct, contributing, etc)
- [ ] Distributed Replication

## Reference

- [tidwall/wal](https://github.com/tidwall/wal)
Expand Down
23 changes: 17 additions & 6 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@ import "hash/crc32"
// Size units and file-system permission bits used by the package.
const (
	// KB is the number of bytes in one kilobyte (1024).
	KB = 1 << 10
	// MB is the number of bytes in one megabyte (1024 * 1024).
	MB = KB << 10

	// PermMkdir is the permission mode rwxr-xr-x.
	// NOTE(review): presumably applied when creating WAL directories — confirm at call sites.
	PermMkdir = 0755
	// PermFileOpen is the permission mode rw-------.
	// NOTE(review): presumably applied when opening segment files — confirm at call sites.
	PermFileOpen = 0600
)

// SyncStrategy defines the synchronization strategy for WAL.
Expand All @@ -25,19 +28,25 @@ const (
SyncStrategyOSCache SyncStrategy = 2
)

// Mode selects how strictly SetDefault validates a Config.
type Mode int

// Supported modes. The numeric values (0 and 1) are part of the documented
// configuration surface, so the order of this block must not change.
const (
	// ModeDebug is the zero value of Mode; SetDefault accepts any non-zero
	// SegmentSize in this mode.
	ModeDebug Mode = iota
	// ModeProd makes SetDefault replace a SegmentSize smaller than 1 MB with
	// the default segment size.
	ModeProd
)

// Config holds the configuration for WAL.
type Config struct {
WALDir string // Directory to store WAL files
BufferSize int // Buffered writes size in bytes (e.g., 4KB)
SegmentSize int64 // Maximum size of each file (e.g., 10MB)
SegmentPrefix string // Prefix for segment file names (e.g., "segment")
SyncStrategy SyncStrategy // Sync strategy
SyncInterval uint // Sync interval in milliseconds for background sync
Mode Mode // Operating mode (ModeDebug or ModeProd); in ModeProd, SetDefault resets a SegmentSize below 1MB to the default
}

// DefaultConfig provides default configuration values for WAL.
var DefaultConfig = Config{
WALDir: "./wal",
BufferSize: 4 * KB, // 4KB
SegmentSize: 10 * MB, // 10MB
SegmentPrefix: "segment",
Expand All @@ -47,15 +56,17 @@ var DefaultConfig = Config{

// SetDefault sets default values for any zero-value fields in the Config.
func (cfg *Config) SetDefault() {
if cfg.WALDir == "" {
cfg.WALDir = DefaultConfig.WALDir
}
if cfg.BufferSize == 0 {
cfg.BufferSize = DefaultConfig.BufferSize
}
if cfg.SegmentSize < MB {
if cfg.SegmentSize == 0 {
cfg.SegmentSize = DefaultConfig.SegmentSize
}
if cfg.Mode == ModeProd {
if cfg.SegmentSize < MB {
cfg.SegmentSize = DefaultConfig.SegmentSize
}
}
if cfg.SegmentPrefix == "" {
cfg.SegmentPrefix = DefaultConfig.SegmentPrefix
}
Expand Down
162 changes: 112 additions & 50 deletions examples/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,96 +3,158 @@ package main
import (
"fmt"
"log"
"time"
"os"

"github.com/hungpdn/wal"
)

func main() {
walDir := "./wal_data"
_ = os.RemoveAll(walDir)

// ==========================================
// SCENARIO 1: NORMAL DATA RECORDING
// ==========================================
fmt.Println("🚀 [Phase 1] Initialize WAL & Write data...")
fmt.Println("🚀 WAL Library - Full Feature Demo")
fmt.Println("==================================")

// 1. Config
// 1. Configuration
// We use a very small SegmentSize (10KB) to demonstrate Log Rotation and Cleanup easily.
cfg := wal.Config{
WALDir: walDir,
BufferSize: 4 * 1024, // 4KB Buffer
SegmentSize: 100 * 1024, // 100KB per file (to test file rotation speed)
SyncStrategy: wal.SyncStrategyOSCache, // Strategy 2 (Recommended)
SyncInterval: 1000, // Sync every 1s
BufferSize: 4 * 1024, // 4KB Buffer
SegmentSize: 10 * 1024, // 10KB (Small for demo purposes)
SegmentPrefix: "wal", // Prefix: wal-0000.wal
SyncStrategy: wal.SyncStrategyOSCache, // Performance + Safety balanced
SyncInterval: 500, // Sync every 500ms
}

// 2. Initialize
w, err := wal.New(cfg)
// ==========================================
// PART 1: WRITING (Basic & Batch)
// ==========================================
fmt.Println("\n📝 [Part 1] Writing Data...")

w, err := wal.Open(walDir, &cfg)
if err != nil {
log.Fatalf("❌ Init failed: %v", err)
}

// 3. Write 5000 log
start := time.Now()
for i := 0; i < 5000; i++ {
payload := fmt.Sprintf("Log-Entry-%d: User A transferred $100 to B", i)
if err := w.Write([]byte(payload)); err != nil {
log.Fatalf("❌ Write error: %v", err)
}
// Simulate small delay to allow background sync to run
if i%1000 == 0 {
time.Sleep(10 * time.Millisecond)
// A. Basic Write
fmt.Println(" -> Writing 500 individual entries...")
for i := 0; i < 500; i++ {
payload := []byte(fmt.Sprintf("Entry-%d", i))
if err := w.Write(payload); err != nil {
log.Fatalf("Write error: %v", err)
}
}
fmt.Printf("✅ Write finished 5000 log in %v\n", time.Since(start))

// 4. Close securely
if err := w.Close(); err != nil {
log.Fatalf("❌ Close error: %v", err)
// B. Batch Write (Higher Throughput)
fmt.Println(" -> Writing 500 entries using WriteBatch...")
var batch [][]byte
for i := 500; i < 1000; i++ {
batch = append(batch, []byte(fmt.Sprintf("BatchEntry-%d", i)))
}
// Writes all 500 entries acquiring the lock only once
if err := w.WriteBatch(batch); err != nil {
log.Fatalf("Batch write error: %v", err)
}
fmt.Println("🔒 WAL has been closed. Data is now safe on disk.")

// Get current segment index to see rotation
lastIdx := w.GetLastSegmentIdx()
fmt.Printf(" ✅ Write complete. Current Active Segment Index: %d\n", lastIdx)

w.Close()

// ==========================================
// SCENARIO 2: RECOVERY & REPLAY
// PART 2: READING (Iterator / Checkpoint / Resume)
// ==========================================
fmt.Println("\n🔄 [Phase 2] Restart & Replay Simulation...")
fmt.Println("\n📖 [Part 2] Reading Data (Replay)...")

// 1. Reopen WAL (Automatic Recovery if errors occur)
w2, err := wal.New(cfg)
wRead, err := wal.Open(walDir, &cfg)
if err != nil {
log.Fatalf("❌ Recovery failed: %v", err)
log.Fatalf("Open failed: %v", err)
}
defer w2.Close()
defer wRead.Close()

// 2. Create Iterator to read
iter, err := w2.NewReader()
iter, err := wRead.NewReader()
if err != nil {
log.Fatalf("❌ Iterator failed: %v", err)
log.Fatalf("Reader failed: %v", err)
}
defer iter.Close()

// 3. Browse logs
count := 0
fmt.Println(" --- Start reading ---")
lastIndex := uint64(0)
for iter.Next() {
data := iter.Value()
// Print the first 3 lines and the last 3 lines as a test.
if count < 3 || count >= 4997 {
fmt.Printf(" [%4d] %s\n", count, string(data))

if count < 5 || count > 995 {
fmt.Printf("ID: %d | Data: %s\n", iter.Index(), string(iter.Value()))
}
if count == 3 {
fmt.Println(" ... (reading) ...")

if iter.Index() != lastIndex+1 {
panic("Missing gap!")
}

lastIndex = iter.Index()

count++
}

// 4. Check for errors
if err := iter.Err(); err != nil {
log.Printf("⚠️ Replay stopped due to error.: %v", err)
log.Printf("⚠️ Iterator stopped with error: %v", err)
}
fmt.Printf(" ✅ Read %d total records from disk.\n", count)

fmt.Println("\n📖 [Part 2.1] Resuming from Checkpoint...")

// Let's assume the app crashed at ID = 500 last time
checkpointID := uint64(500)
fmt.Printf(" -> Seeking to ID: %d ...\n", checkpointID)

iter, err = wRead.NewReader()
if err != nil {
log.Fatalf("Reader failed: %v", err)
}
defer iter.Close()

if found := iter.Seek(checkpointID); !found {
if iter.Err() != nil {
log.Fatalf("Seek failed: %v", iter.Err())
}
fmt.Println(" -> ID not found (end of log).")
} else {
fmt.Printf("✅ Replay successful: Read %d/5000 records.\n", count)
fmt.Printf(" ✅ Resumed! Found ID: %d | Data: %s\n", iter.Index(), string(iter.Value()))

for iter.Next() {
// Process logic...
if iter.Index() < 503 && iter.Index() > 500 {
fmt.Printf("ID: %d | Data: %s\n", iter.Index(), string(iter.Value()))
}
}
}

// ==========================================
// PART 3: RETENTION & CLEANUP
// ==========================================
fmt.Println("\n🧹 [Part 3] Retention & Cleanup...")

// To demonstrate cleanup, we need multiple segments.
// Since we wrote ~1000 entries with small SegmentSize, we should have multiple files.

// A. Cleanup By Size (Keep max 50KB)
fmt.Println(" -> Running CleanupBySize (Max 50KB)...")
// Note: 50KB is roughly 5 segments (since we set SegmentSize=10KB)
if err := wRead.CleanupBySize(50 * 1024); err != nil {
log.Printf("CleanupBySize warning: %v", err)
}

// B. Manual Truncate (Remove everything before Segment 2)
fmt.Println(" -> Running TruncateFront(2)...")
if err := wRead.TruncateFront(2); err != nil {
log.Printf("TruncateFront warning: %v", err)
}

if count == 5000 {
fmt.Println("🎉 DATA INTEGRITY 100%!")
// Check remaining files
entries, _ := os.ReadDir(walDir)
fmt.Printf(" ✅ Cleanup finished. Remaining segment files: %d\n", len(entries))
for _, e := range entries {
fmt.Printf(" - %s\n", e.Name())
}

fmt.Println("\n🎉 Demo Completed Successfully!")
}
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
module github.com/hungpdn/wal

go 1.25.0
go 1.22
Loading
Loading