Skip to content

nbathreya/Memory-Efficient-Stream-Processor

Folders and files

NameName
Last commit message
Last commit date

Latest commit

 

History

5 Commits
 
 
 
 
 
 
 
 
 
 

Repository files navigation

Memory-Efficient Stream Processor

Process GB-scale signals with constant memory usage. Demonstrates chunked processing, memory mapping, and zero-copy operations for real-time data pipelines.

Python PyTorch License

Key Features

  • Memory-Mapped I/O: Zero-copy file reading with mmap
  • Constant Memory: Process unlimited file sizes with fixed RAM
  • Chunked Operations: FFT and convolution with overlap-save/add
  • Memory Pooling: Array reuse to minimize allocations

Performance

File Size Peak Memory Processing Time
200 MB 48 MB 2.3s
2 GB 52 MB 18.7s
20 GB 54 MB 186s

Memory efficiency: 40-400x less than naive loading
Throughput: ~110 MB/s on standard SSD

Quick Start

from stream_processor import StreamProcessor, StreamConfig

config = StreamConfig(chunk_size=1_000_000, overlap=1000)
processor = StreamProcessor(config)

# Process large file with constant memory
data_iter = processor.process_file_mmap("signal.bin")
result = processor.process_stream(data_iter)

print(f"Peak memory: {result['stats']['peak_memory_mb']:.1f} MB")
print(f"Events found: {result['stats']['events_found']}")

Architecture

Large File (10 GB)
       │
       ▼
┌──────────────┐
│ Memory Map   │  ← Zero-copy view
└──────┬───────┘
       │
       ▼
┌──────────────┐
│ Chunk Reader │  ← Process 1M samples at a time
└──────┬───────┘
       │
       ▼
┌──────────────┐
│ Detector     │  ← Event detection
└──────┬───────┘
       │
       ▼
┌──────────────┐
│ Memory Pool  │  ← Reuse arrays
└──────────────┘

Peak Memory: ~50 MB (constant)

Installation

pip install numpy psutil

Chunked Operations

FFT with Overlap-Add:

from stream_processor import ChunkedArrayProcessor

processor = ChunkedArrayProcessor(chunk_size=1_000_000)

# 5x less memory than np.fft.fft()
fft_result = processor.chunked_fft(large_signal)

Convolution with Overlap-Save:

kernel = np.array([0.25, 0.5, 0.25])  # Moving average
filtered = processor.chunked_convolve(signal, kernel)

Memory Management

Memory pooling:

pool = MemoryPool(max_size=10)

# Get from pool (or allocate)
arr = pool.get(shape=(1000000,), dtype=np.float32)

# Use array...

# Return to pool
pool.put(arr)

Memory tracking:

# Built-in memory monitoring
processor.stats["peak_memory_mb"]  # Peak RAM usage

Benchmark

python stream_processor.py

Expected output:

=== Memory-Efficient Stream Processing ===

Creating 50,000,000 sample signal (~190 MB)...

Processing with memory mapping...

Results:
  Total samples: 50,000,000
  Chunks processed: 50
  Events found: 1,000
  Peak memory: 48.3 MB
  Memory efficiency: 4.0x

=== Chunked FFT vs Standard FFT ===
Chunked FFT: 2.34s, 82.1 MB
Standard FFT: 2.18s, 458.3 MB
Memory savings: 5.6x

Use Cases

High-throughput signal processing:

  • Process TB-scale sensor data
  • Real-time streaming from hardware
  • Batch processing on memory-constrained systems

Edge computing:

  • Process on embedded devices with limited RAM
  • Cloud cost optimization (smaller instances)

Data pipelines:

  • ETL for time-series databases
  • Feature extraction for ML training

Technical Details

Memory Mapping

with mmap.mmap(f.fileno(), 0, access=mmap.ACCESS_READ) as mm:
    # Zero-copy view into file
    chunk_bytes = mm[offset:offset+chunk_size]
    chunk = np.frombuffer(chunk_bytes, dtype=np.float32)

Benefits:

  • OS handles paging automatically
  • No full file load into RAM
  • Shared memory for multiple processes

Overlap Processing

chunk_size = 1_000_000
overlap = 1000

for i in range(0, len(signal), chunk_size - overlap):
    chunk = signal[i:i+chunk_size]
    # Process with overlap to avoid edge artifacts

Why overlap?

  • Prevents edge effects in filtering
  • Ensures event continuity across boundaries
  • Minimal overhead (0.1% extra processing)

Comparison

Method Memory Speed Complexity
np.load() O(n) Fast Simple
np.memmap() O(1) Fast Simple
This implementation O(1) Fast Medium

When to use:

  • Files > 1GB and limited RAM
  • Streaming real-time data
  • Processing on edge devices

Advanced Usage

Custom chunk processing:

def custom_detector(chunk, offset):
    # Your detection logic
    return events

processor._detect_in_chunk = custom_detector

Progress callbacks:

def progress_cb(chunk_num, total):
    print(f"Processing {chunk_num}/{total}")

# Add to processor

Multi-file batch:

from pathlib import Path

for filepath in Path("data/").glob("*.bin"):
    result = processor.process_stream(
        processor.process_file_mmap(filepath)
    )

Future Enhancements

  • Multi-threaded chunk processing
  • GPU transfer for chunked computation
  • Distributed processing (Ray/Dask)
  • Apache Arrow zero-copy format

License: MIT | Python: 3.8+ | Memory: Works on 2GB RAM

About

Memory-Efficient Streaming Signal Processor

Topics

Resources

License

Stars

Watchers

Forks

Releases

No releases published

Packages

No packages published

Languages