基于 Bitcask 模型的高性能键值存储引擎,Python 实现的教学版本。
- 高性能读写 - 单次磁盘 IO 完成操作
- 内存索引 - 全内存 Key 索引,毫秒级查询
- 原子事务 - 批量写入支持 ACID 特性
- 数据合并 - 自动回收无效数据空间
- 崩溃恢复 - 自动从数据文件重建索引
- 多种索引 - BTree / SkipList 可选
- LRU 缓存 - 热点数据缓存加速
- 数据压缩 - 可选 zlib 压缩节省空间
- 布隆过滤器 - 快速判断 Key 不存在
graph TB
subgraph SangDB引擎
direction TB
subgraph 写入路径
W[写操作<br/>put/delete] --> WAL[WAL缓冲区]
WAL --> DF[数据文件<br/>.data]
DF --> IDX[内存索引<br/>BTree/SkipList]
IDX --> CACHE[LRU缓存]
end
subgraph 读取路径
R[读操作<br/>get] --> CACHE
CACHE -->|命中| RET1[返回数据]
CACHE -->|未命中| IDX
IDX --> POS[获取位置]
POS --> READ[读取磁盘]
READ --> CRC[CRC校验]
CRC --> RET2[返回数据]
end
subgraph 辅助组件
BF[布隆过滤器<br/>BloomFilter]
MD[元数据文件<br/>Metadata]
WB[批量写入<br/>WriteBatch]
MG[数据合并<br/>Merge]
end
DF -.-> MD
IDX -.-> BF
W -.-> WB
DF -.-> MG
end
style W fill:#e1f5ff
style R fill:#fff4e1
style WB fill:#f0f0f0
style MG fill:#f0f0f0
graph LR
subgraph 写入流程
WD[用户数据] --> ENC[LogRecord编码]
ENC --> WCRC[CRC校验]
WCRC --> DISK1[写入磁盘]
DISK1 --> UIDX[更新索引]
UIDX --> UCACHE[更新缓存]
end
subgraph 读取流程
QK[查询Key] --> CHK{检查缓存}
CHK -->|命中| RHIT[返回]
CHK -->|未命中| QIDX[查询索引]
QIDX --> GETPOS[获取位置]
GETPOS --> RDISK[读取磁盘]
RDISK --> RCRC[CRC校验]
RCRC --> RMISS[返回]
end
sangdb/
├── __init__.py 入口文件
├── config.py 配置选项
├── errors.py 错误定义
├── db.py 核心引擎(含 WriteBatch/Merge/Iterator)
├── utils.py 工具集(含 Compressor/BloomFilter/LRUCache/Metadata)
├── log_record.py 日志记录编码
├── data_file.py 数据文件管理
├── io_manager.py IO 抽象层(FileIO/MmapIO)
├── index.py 索引实现(BTree/SkipList)
└── logger.py 日志系统
graph TD
INIT[__init__.py] --> DB[db.py]
INIT --> UTILS[utils.py]
DB --> CONFIG[config.py]
DB --> ERRORS[errors.py]
DB --> LOG_RECORD[log_record.py]
DB --> DATA_FILE[data_file.py]
DB --> INDEX[index.py]
DB --> UTILS
DATA_FILE --> IO_MGR[io_manager.py]
DATA_FILE --> LOG_RECORD
UTILS --> LOGGER[logger.py]
IO_MGR --> LOGGER
DATA_FILE --> LOGGER
DB --> LOGGER
INDEX --> LOGGER
style DB fill:#ffcccc
style UTILS fill:#ccffcc
style DATA_FILE fill:#ccccff
[CRC:4字节][类型:1字节][Key长度:4字节][Value长度:4字节][Key][Value]
- BTree: 基于 SortedDict,有序存储
- SkipList: 跳表实现,概率平衡
- ShardedIndex: 分片索引,降低锁竞争
- Append-Only: 仅追加写入,不修改旧数据
- Log-Structured: 日志结构化存储
- Copy-on-Write: 更新时写入新记录
事务序列号编码: [seq_no:8字节][原始key]
事务提交标记: sangdb-tx-finishsequenceDiagram
participant User
participant DB
participant OldFiles as 旧数据文件
participant MergeDB as 临时合并DB
participant NewFiles as 新数据文件
User->>DB: 调用 merge()
DB->>DB: 设置合并标志
DB->>DB: 刷新 WAL
DB->>OldFiles: 获取需要合并的文件
DB->>MergeDB: 创建临时数据库
loop 遍历旧文件
OldFiles->>DB: 读取记录
DB->>DB: 检查索引中的位置
alt 记录仍然有效
DB->>MergeDB: 写入新记录
MergeDB->>NewFiles: 写入 Hint 索引
else 记录已过期
DB->>DB: 跳过
end
end
MergeDB->>NewFiles: 写入 merge.fina 标记
DB->>OldFiles: 删除旧文件
DB->>DB: 清除合并标志
DB->>User: 合并完成
import sangdb
opts = sangdb.Options()
opts.dir_path = './mydb'
opts.data_file_size = 256 * 1024 * 1024
opts.enable_cache = True
opts.enable_compression = True
db = sangdb.open_db(opts)
db.put(b'name', b'张三')
value = db.get(b'name')
print(value)
db.delete(b'name')
db.close()batch_opts = sangdb.WriteBatchOptions()
batch = sangdb.WriteBatch(db, batch_opts)
for i in range(1000):
batch.put(f'key{i}'.encode(), f'value{i}'.encode())
batch.commit()opts = sangdb.IteratorOptions()
opts.prefix = b'user:'
opts.reverse = False
it = db.new_iterator(opts)
it.rewind()
while it.valid():
print(it.key(), it.value())
it.next()
it.close()sangdb.merge(db)py sangdb/test_basic.py- 写入: ~10,000-50,000 ops/s
- 读取: ~50,000-200,000 ops/s
- 延迟: 单次操作 < 1ms
- 内存: O(key数量)
| 参数 | 说明 | 默认值 |
|---|---|---|
| dir_path | 数据目录 | ~/sangdb_data |
| data_file_size | 单文件大小 | 256MB |
| sync_write | 同步写入 | False |
| index_type | 索引类型 | BTREE |
| enable_cache | 启用缓存 | True |
| cache_size | 缓存容量 | 10000 |
| enable_compression | 启用压缩 | False |
- 简单优先 - 代码清晰易懂,适合教学
- 性能为王 - 单次磁盘 IO,内存索引
- 数据安全 - CRC 校验,原子操作,崩溃恢复
- 可扩展性 - 模块化设计,易于扩展
✅ 适合:
- 读多写少场景
- Key 数量可控(百万级)
- 需要快速查询
- 教学和学习
❌ 不适合:
- 超大规模数据(亿级 Key)
- 需要范围查询优化
- 多节点分布式
- WAL 一致性修复 - 异步模式下立即写入文件,延迟 fsync
- Partial Write 检测 - 确保写入完整性
- 元数据持久化 - 事务序列号持久化,防止重启冲突
- 原子文件替换 - Merge 使用 os.replace 保证原子性
- 零注释代码 - 代码即文档,清晰明了
- v2.0.2 - 文件合并,移除注释,重构架构
- v2.0.1 - 修复 WAL 一致性,添加 utils 模块
- v2.0.0 - 初始教学版本
MIT License