A memory-efficient streaming data loader designed for LLM pretraining under limited CPU and GPU memory constraints.
Streaming-Dataloader is a high-performance data loading solution that enables training large language models on massive datasets without overwhelming system memory. It uses smart caching, sliding window techniques, and distributed processing to handle terabyte-scale datasets efficiently.
- Memory Efficient: LRU cache mechanism controls memory usage, preventing OOM errors
- Streaming Processing: Processes data chunks on-demand without loading entire datasets
- Sliding Window: Maximizes data utilization through configurable stride patterns
- Dynamic Shifts: Increases training data diversity with randomized sequence shifts
- Distributed Ready: Built-in support for multi-GPU and multi-node training
- Thread Safe: Robust concurrent data loading with multiple workers
- Low Memory Footprint: Works efficiently with <32GB CPU RAM and <24GB GPU memory
- Scalable: Handles TB-scale datasets through intelligent chunking
- Fast Loading: Binary search optimization for rapid chunk location
- High Throughput: Optimized batch processing with shared shift values
```bash
git clone https://github.com/your-username/Streaming-Dataloader.git
cd Streaming-Dataloader
```

This project requires the following packages with tested versions:
- PyTorch: 2.4.0
- CUDA: 12.1
- datasets: 3.5.1
- transformers: 4.51.3
- tqdm: 4.66.5
- numpy: 1.26.4
Please install these dependencies according to your environment setup.
First, prepare your dataset by tokenizing and chunking:
```bash
cd prepare/

# Optional: for users who cannot use HF directly, use HF-mirror
export HF_ENDPOINT="https://hf-mirror.com"

# Prepare the FineWeb-Edu dataset (example)
python fineweb_edu.py \
    --tokenizer gpt2 \
    --data_name sample-10BT \
    --output_path ./data/fineweb-edu-sample-10BT/ \
    --tokens_per_chunk 100000000 \
    --max_samples 1000
```

Parameters:
- `--tokenizer`: Tokenizer model (default: `gpt2`)
- `--data_name`: Dataset name from HuggingFace
- `--output_path`: Output directory for processed chunks
- `--tokens_per_chunk`: Tokens per chunk (default: 100M)
- `--max_samples`: Maximum number of samples to process (optional)
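Based on the chunk directory naming shown later in this README (`chunk_000000/`, `chunk_000001/`, ...), a quick sanity check of the preprocessing output might look like the following sketch; the path is the example output directory from the command above:

```python
import os

# List the chunk directories produced by the preprocessing step.
output_path = "./data/fineweb-edu-sample-10BT"
chunks = sorted(d for d in os.listdir(output_path) if d.startswith("chunk_"))
print(f"{len(chunks)} chunks found, e.g.", chunks[:3])
```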
```python
from dataset import SlidingTokenDataset
from torch.utils.data import DataLoader

# Create the dataset
dataset = SlidingTokenDataset(
    dataset_path="./data/fineweb-edu-sample-10BT",
    split="train",
    split_rate=0.9,
    seq_len=1024,
    stride=512,
    batch_size=16,
    cache_capacity=2
)

# Create the dataloader
dataloader = DataLoader(
    dataset,
    batch_size=16,
    num_workers=4,
    pin_memory=True
)

# Training loop
for epoch in range(10):
    dataset.set_epoch(epoch)  # Important: set the epoch for randomization
    for batch in dataloader:
        input_ids = batch["input_ids"]  # [batch_size, seq_len//shift, shift]
        labels = batch["labels"]        # [batch_size, seq_len//shift, shift]
        shift = batch["shift"]          # shift value used for this batch

        # Your training code here
        loss = model(input_ids, labels=labels)
        loss.backward()
```
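The shape comments above show batches arriving as `[batch_size, seq_len//shift, shift]`. If your model expects the usual flat `[batch_size, seq_len]` layout, one option is to collapse the last two dimensions before the forward pass. This is an illustrative sketch (`flatten_batch` is a hypothetical helper, not part of this library), and the right handling depends on how your model consumes the shift dimension:

```python
import torch

def flatten_batch(batch: dict) -> tuple[torch.Tensor, torch.Tensor]:
    # Collapse [B, seq_len//shift, shift] -> [B, seq_len] for models that expect a
    # flat token dimension; the shift-sized windows are concatenated in order.
    input_ids = batch["input_ids"]
    labels = batch["labels"]
    bsz = input_ids.size(0)
    return input_ids.reshape(bsz, -1), labels.reshape(bsz, -1)

# Usage inside the training loop above:
# input_ids, labels = flatten_batch(batch)
```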
```bash
# Run with torchrun
torchrun --nproc_per_node=4 demo.py \
    --data_path ./data/fineweb-edu-sample-10BT \
    --seq_len 2048 \
    --stride 1024 \
    --batch_size 8 \
    --num_workers 2
```

```bash
# Single GPU
python demo.py --data_path ./data/fineweb-edu-sample-10BT

# Multi-GPU (DDP)
torchrun --nproc_per_node=2 demo.py --data_path ./data/fineweb-edu-sample-10BT
```

| Parameter | Type | Default | Description |
|---|---|---|---|
| `dataset_path` | str | None | Path to processed dataset chunks |
| `split` | str | "train" | Dataset split ("train" or "validation") |
| `split_rate` | float | 1.0 | Train/validation split ratio |
| `seq_len` | int | 1024 | Sequence length |
| `stride` | int | 512 | Sliding window stride |
| `batch_size` | int | 1 | Batch size for shift grouping |
| `m` | int | 1 | Fixed shift value (default 1 for next-token prediction) |
| `seed` | int | 42 | Random seed |
| `rank` | int | 0 | Process rank for distributed training |
| `world_size` | int | 1 | Total number of processes |
| `cache_capacity` | int | 2 | LRU cache capacity (number of chunks) |
| `tokens_per_chunk` | float | 1e8 | Expected tokens per chunk |
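When launching with `torchrun`, the `rank` and `world_size` values in the table are typically taken from the `RANK` and `WORLD_SIZE` environment variables that `torchrun` exports; a minimal sketch of wiring them up:

```python
import os

from dataset import SlidingTokenDataset

# torchrun sets RANK and WORLD_SIZE for every process; default to a single
# process when the variables are absent (e.g. plain `python demo.py`).
rank = int(os.environ.get("RANK", 0))
world_size = int(os.environ.get("WORLD_SIZE", 1))

dataset = SlidingTokenDataset(
    dataset_path="./data/fineweb-edu-sample-10BT",
    seq_len=1024,
    rank=rank,
    world_size=world_size,
)
```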
```
Original Dataset → Tokenize → Split into Chunks → Save to Disk
[Raw Text]       → [Tokens] → [Chunk_0, Chunk_1, ...] → [chunk_000000/, chunk_000001/, ...]
```
```
Sequence: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, ...]
stride=4, seq_len=8, shift=2

Sample 1: input=[0,1,2,3,4,5,6,7],   label=[2,3,4,5,6,7,8,9]
Sample 2: input=[4,5,6,7,8,9,10,11], label=[6,7,8,9,10,11,12,13]
```
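The indexing above can be reproduced with a few lines of plain Python; this is an illustrative sketch of the sliding-window and shift logic, not the library's actual implementation:

```python
def sliding_windows(tokens, seq_len, stride, shift):
    """Yield (input, label) pairs where the label is the input shifted by `shift` tokens."""
    # Each window starts `stride` tokens after the previous one; the label window
    # needs `shift` extra tokens of lookahead, so stop while that lookahead exists.
    for start in range(0, len(tokens) - seq_len - shift + 1, stride):
        inp = tokens[start : start + seq_len]
        lab = tokens[start + shift : start + seq_len + shift]
        yield inp, lab

# Reproduces the worked example above (tokens 0..13, stride=4, seq_len=8, shift=2).
for inp, lab in sliding_windows(list(range(14)), seq_len=8, stride=4, shift=2):
    print(inp, lab)
```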
- LRU Cache: Only keeps recently accessed chunks in memory (sketched below)
- Lazy Loading: Loads chunks only when needed
- Garbage Collection: Automatic cleanup of unused chunks
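A minimal sketch of the LRU chunk-cache idea described in this list, built on `collections.OrderedDict`; `load_chunk` is a hypothetical loader callback, and the real dataset class may differ in how chunks are read from disk:

```python
from collections import OrderedDict

class ChunkCache:
    """Keep at most `capacity` chunks in memory, evicting the least recently used one."""

    def __init__(self, capacity: int, load_chunk):
        self.capacity = capacity
        self.load_chunk = load_chunk      # callable: chunk_index -> token array
        self.cache = OrderedDict()        # chunk_index -> tokens, ordered by recency

    def get(self, chunk_index: int):
        if chunk_index in self.cache:
            self.cache.move_to_end(chunk_index)   # mark as most recently used
            return self.cache[chunk_index]
        tokens = self.load_chunk(chunk_index)     # lazy load on first access
        self.cache[chunk_index] = tokens
        if len(self.cache) > self.capacity:
            self.cache.popitem(last=False)        # evict least recently used chunk
        return tokens
```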
```
┌──────────────────┐     ┌──────────────────┐     ┌──────────────────┐
│    Data Prep     │     │  Sliding Window  │     │    LRU Cache     │
│   (streaming)    │────▶│    Processing    │────▶│    Management    │
│                  │     │                  │     │                  │
└──────────────────┘     └──────────────────┘     └──────────────────┘
         │                        │                        │
         ▼                        ▼                        ▼
┌──────────────────┐     ┌──────────────────┐     ┌──────────────────┐
│ Tokenized Chunks │     │ Batched Samples  │     │ Memory Efficient │
│  chunk_000000/   │     │   with Shifts    │     │     Loading      │
│  chunk_000001/   │     │                  │     │                  │
└──────────────────┘     └──────────────────┘     └──────────────────┘
```
| Component | Memory Impact | Notes |
|---|---|---|
| Chunk Cache | `cache_capacity` × chunk_size | Typically 2 × 100M tokens |
| Batch Buffer | `batch_size` × `seq_len` × dtype | Temporary batch storage |
| Metadata | Minimal | Only stores chunk lengths and indices |

Example: With `cache_capacity=2` and `tokens_per_chunk=1e8`, each chunk is ~400 MB, for a total cache of ~800 MB.
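A quick back-of-the-envelope check of that figure, assuming token ids are stored as 4-byte integers (an assumption about the dtype, not something documented here):

```python
# Rough cache-memory estimate (decimal MB); 4 bytes per token assumes int32/uint32 ids.
tokens_per_chunk = 1e8
bytes_per_token = 4          # assumption
cache_capacity = 2

chunk_mb = tokens_per_chunk * bytes_per_token / 1e6
print(f"~{chunk_mb:.0f} MB per chunk, ~{cache_capacity * chunk_mb:.0f} MB total cache")
# ~400 MB per chunk, ~800 MB total cache
```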
```python
# Use a fixed shift for consistent batching
dataset = SlidingTokenDataset(
    dataset_path="./data/your-dataset",
    seq_len=1024,
    m=256,  # fixed shift value
)
```
```python
# Let the system choose optimal shifts automatically
dataset = SlidingTokenDataset(
    dataset_path="./data/your-dataset",
    seq_len=1024,
    # m=None (default) - uses proper divisors of seq_len
)
```
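Assuming shifts really are drawn from the proper divisors of `seq_len`, as the comment above states, the candidate values for `seq_len=1024` can be listed directly (an illustrative check, not library code):

```python
# Proper divisors of seq_len (all divisors except seq_len itself).
seq_len = 1024
shift_candidates = [d for d in range(1, seq_len) if seq_len % d == 0]
print(shift_candidates)  # [1, 2, 4, 8, 16, 32, 64, 128, 256, 512]
```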
```python
# For memory-constrained environments
dataset = SlidingTokenDataset(
    dataset_path="./data/your-dataset",
    cache_capacity=1,      # minimal cache
    tokens_per_chunk=5e7,  # smaller chunks
)

# For high-memory environments
dataset = SlidingTokenDataset(
    dataset_path="./data/your-dataset",
    cache_capacity=10,     # larger cache
    tokens_per_chunk=2e8,  # bigger chunks
)
```

Out of Memory (OOM):
```python
# Reduce cache capacity
cache_capacity=1

# Reduce batch size
batch_size=8

# Use smaller chunks in preprocessing
tokens_per_chunk=5e7
```

Slow Loading:
```python
# Increase cache capacity (if memory allows)
cache_capacity=5

# Increase the number of workers
num_workers=4

# Use larger chunks
tokens_per_chunk=2e8
```

Data Imbalance in DDP:
```python
# Ensure a proper distributed setup
dataset = SlidingTokenDataset(
    dataset_path="./data/your-dataset",
    rank=rank,              # must set rank
    world_size=world_size,  # must set world_size
)
```

- Optimize Cache Size: Balance between memory usage and I/O operations
- Tune Chunk Size: Larger chunks = fewer files but more memory per chunk
- Use SSD Storage: Significantly improves chunk loading speed
- Pin Memory: Use `pin_memory=True` in the DataLoader for GPU training
- Proper Workers: Set `num_workers` based on CPU cores and I/O capacity
Contributions are welcome! Please feel free to:
- Report bugs and issues
- Suggest new features
- Submit pull requests
- Improve documentation
This project is licensed under the MIT License - see the LICENSE file for details.
- HuggingFace for the datasets library
- PyTorch team for distributed training utilities
- FineWeb-Edu dataset for providing high-quality training data
Built for efficient LLM pretraining