Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 20 additions & 15 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,17 @@ A high-performance, concurrent-safe, and crash-resilient **Write-Ahead Log (WAL)
- 🔄 **Log Rotation**: Automatic segment rotation based on configurable size.
- 💾 **Flexible Sync Strategies**: Choose between Performance (Background), Safety (Always), or Balance (OSCache).
- 🔍 **Iterator API**: Memory-efficient sequential reading of logs.
- 🧵 **Crash Safety**: Automatic recovery from power failures. Detects and truncates corrupted log tails (partial writes) on startup.
- ⚡ **Optimized Startup**: Uses reverse scanning to instantly recover the last segment state without reading the whole file.
- 🧹 **Retention Policies**: Automatic cleanup based on TTL (Time-To-Live) or Total Size.

## Roadmap

The following features are planned for future releases:

- [ ] **v0.1.1 - Compression Support**: Add Snappy/Zstd compression for payloads to reduce disk usage.
- [ ] **v0.1.2 - Sparse Indexing**: Implement a sidecar `.idx` file to support O(1) lookup time for `Seek(SeqID)`.
- [ ] **v0.1.3 - Metrics & Observability**: OpenTelemetry / Prometheus integration for monitoring throughput and latency.
- [ ] **v0.2.0 - Replication Hooks**: APIs to support streaming WAL entries to other nodes (Raft/Paxos integration).

## Architecture

Expand All @@ -23,17 +33,18 @@ A high-performance, concurrent-safe, and crash-resilient **Write-Ahead Log (WAL)
Each segment file consists of a sequence of binary encoded entries.

```Plaintext
+-------------------+-------------------+-------------------+----------------------+
| CRC32 (4 bytes) | Size (8 bytes) | SeqID (8 bytes) | Payload (N bytes) |
+-------------------+-------------------+-------------------+----------------------+
| Checksum of Data | Length of Payload | Monotonic ID | The actual data |
+-------------------+-------------------+-------------------+----------------------+
+-------------------+-------------------+-------------------+----------------------+-------------------+
| CRC32 (4 bytes) | Size (8 bytes) | SeqID (8 bytes) | Payload (N bytes) | Size (8 bytes) |
+-------------------+-------------------+-------------------+----------------------+-------------------+
| Checksum of Data | Length of Payload | Monotonic ID | The actual data | Backward Pointer |
+-------------------+-------------------+-------------------+----------------------+-------------------+
```

- CRC (Cyclic Redundancy Check): Ensures data integrity.
- Size: Enable fast reading without parsing the entire file.
- Size: Enables fast forward reading (skipping payloads).
- SeqID: Monotonically increasing global sequence ID.
- Payload: The actual data.
- Size (Footer): Enables fast reverse reading for optimized startup recovery.

## Installation

Expand Down Expand Up @@ -106,17 +117,11 @@ if err := iter.Err(); err != nil {

Contributions are welcome! Please fork the repository and open a pull request.

1. Fork the Project.
2. Create your Feature Branch (`git checkout -b feature/AmazingFeature`)
3. Commit your Changes (`git commit -m 'Add some AmazingFeature'`)
4. Push to the Branch (`git push origin feature/AmazingFeature`)
5. Open a Pull Request

## License

MIT License. See [LICENSE](LICENSE) file.

## Reference
## References

- [tidwall/wal](https://github.com/tidwall/wal)
- [Log: What Every Software Engineer Should Know About Real-Time Data's Unifying Abstraction](https://engineering.linkedin.com/distributed-systems/log-what-every-software-engineer-should-know-about-real-time-datas-unifying)
- [Designing Data-Intensive Applications](https://www.oreilly.com/library/view/designing-data-intensive-applications/9781491903063)
28 changes: 15 additions & 13 deletions reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,6 @@ func (it *Iterator) Next() bool {

// Loop to handle switching between segment files
for {
// If file is not opened or we've read all of the old file -> Open new file
if it.currentFile == nil {
if it.currentIdx >= len(it.segmentPaths) {
return false // All files have been read
Expand All @@ -73,15 +72,11 @@ func (it *Iterator) Next() bool {
path := it.segmentPaths[it.currentIdx]
f, err := os.Open(path)
if err != nil {
// If file is deleted (e.g. retention policy), we report error
// or we could skip it using it.currentIdx++ (data loss scenario)
// For strict consistency, we return error.
it.err = err
return false
}

it.currentFile = f
// Create a buffer reader for faster reading
br := bufio.NewReader(f)
reader := io.Reader(br)
it.currentReader = &reader
Expand All @@ -92,14 +87,12 @@ func (it *Iterator) Next() bool {
_, err := io.ReadFull(*it.currentReader, header)

if err != nil {
// If EOF is encountered, close the current file and increment the index so that the next iteration opens a new file
if err == io.EOF {
it.currentFile.Close()
it.currentFile = nil
it.currentIdx++
continue
}
// If unexpected error occurs (UnexpectedEOF) -> Report error
it.err = err
return false
}
Expand All @@ -114,6 +107,12 @@ func (it *Iterator) Next() bool {
return false
}

// Skip Footer (8 bytes)
if _, err := io.CopyN(io.Discard, *it.currentReader, int64(footerSize)); err != nil {
it.err = err
return false
}

neededSize := sizeSize + seqIDSize + len(payload)
if cap(it.verifyBuf) < neededSize {
it.verifyBuf = make([]byte, neededSize)
Expand Down Expand Up @@ -143,7 +142,6 @@ func (it *Iterator) Index() uint64 {

// Value: Get the data of the current log entry
func (it *Iterator) Value() []byte {
// Return a copy for safety, or return the original slice if you want zero-allocation (be careful)
out := make([]byte, len(it.currentEntry))
copy(out, it.currentEntry)
return out
Expand All @@ -164,10 +162,8 @@ func (it *Iterator) Close() error {
}

// Seek: Fast-forward to the record with ID >= startID
// It will skip reading the payload and verifying the CRC of older records
func (it *Iterator) Seek(startID uint64) bool {
for {

if it.currentFile == nil {
if it.currentIdx >= len(it.segmentPaths) {
return false
Expand Down Expand Up @@ -200,20 +196,26 @@ func (it *Iterator) Seek(startID uint64) bool {
seqID := binary.BigEndian.Uint64(header[crcSize+sizeSize : headerSize])

if seqID < startID {
// Use Discard to jump over byte 'size' without copying the data
if _, err := io.CopyN(io.Discard, *it.currentReader, int64(size)); err != nil {
// Skip payload + footer efficiently
if _, err := io.CopyN(io.Discard, *it.currentReader, int64(size)+int64(footerSize)); err != nil {
it.err = err
return false
}
continue
} else {

// Read Payload
payload := make([]byte, size)
if _, err := io.ReadFull(*it.currentReader, payload); err != nil {
it.err = err
return false
}

// Read and discard footer
if _, err := io.CopyN(io.Discard, *it.currentReader, int64(footerSize)); err != nil {
it.err = err
return false
}

verifyBuf := make([]byte, sizeSize+seqIDSize+len(payload))
binary.BigEndian.PutUint64(verifyBuf[:sizeSize], size)
binary.BigEndian.PutUint64(verifyBuf[sizeSize:sizeSize+seqIDSize], seqID)
Expand Down
Loading
Loading