[Phase 2] Implement PrefetchQueue for I/O overlap #76

@zhexuany

Description

Summary

Implement PrefetchQueue to hide I/O latency by downloading the next job while processing the current one. This is a critical performance component from DISTRIBUTED_DESIGN.md.

Context

From DISTRIBUTED_DESIGN.md:

Prefetch Pipeline: Download next job while processing current (hides I/O latency)

The design calls for 2 prefetch slots per worker to achieve a 40% throughput improvement from I/O overlap.

Current State

  • Workers process one job at a time sequentially
  • Each job requires S3/OSS download before processing starts
  • I/O wait time adds directly to total processing time

Target State

┌──────────────────────────────────────────────────────────────────┐
│                   Worker Internal Architecture                    │
│                                                                   │
│  ┌─────────────────────────────────────────────────────────┐     │
│  │                    Prefetch Pipeline                     │     │
│  │  ┌─────────────┐     ┌─────────────┐                    │     │
│  │  │  Slot 1     │     │  Slot 2     │                    │     │
│  │  │ (downloading│     │ (queued)    │                    │     │
│  │  │  next job)  │     │             │                    │     │
│  │  └──────┬──────┘     └─────────────┘                    │     │
│  └─────────┼───────────────────────────────────────────────┘     │
│            │                                                      │
│  ┌─────────▼───────────────────────────────────────────────┐     │
│  │                  Active Job Processing                   │     │
│  └─────────────────────────────────────────────────────────┘     │
└──────────────────────────────────────────────────────────────────┘

Tasks

1. Define PrefetchQueue Structure

pub struct PrefetchQueue {
    /// Slots for prefetching (default: 2)
    slots: Vec<Option<PrefetchSlot>>,
    
    /// Background downloader task
    downloader: Arc<Downloader>,
    
    /// Available jobs to prefetch
    pending: Receiver<JobRecord>,
    
    /// Storage for downloads
    storage: Arc<Storage>,
}

struct PrefetchSlot {
    job: JobRecord,
    local_path: PathBuf,
    status: PrefetchStatus,
}

enum PrefetchStatus {
    Downloading,
    Ready,
    Failed(String),
}

2. Implement Parallel Range Downloader

pub struct ParallelDownloader {
    /// Number of parallel connections (default: 16)
    connections: usize,
    
    /// Chunk size for range requests (default: 8MB)
    chunk_size: usize,
}

impl ParallelDownloader {
    /// Download file using parallel range requests
    pub async fn download(&self, url: &str, dest: &Path) -> Result<()>;
    
    /// Resume from existing partial download
    pub async fn resume(&self, url: &str, dest: &Path) -> Result<()>;
}
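
A minimal sketch of how the range-request download could work, assuming reqwest for HTTP, tokio for concurrency, and anyhow for errors (the crate's actual Result/error types may differ); retry with exponential backoff and the memory-mapped write path are omitted here:

use std::path::Path;
use std::sync::Arc;

use anyhow::{Context, Result};

impl ParallelDownloader {
    /// Sketch: download `url` to `dest` using parallel HTTP range requests.
    pub async fn download(&self, url: &str, dest: &Path) -> Result<()> {
        let client = Arc::new(reqwest::Client::new());

        // Discover the object size with a HEAD request.
        let total: u64 = client
            .head(url)
            .send()
            .await?
            .headers()
            .get(reqwest::header::CONTENT_LENGTH)
            .context("missing Content-Length")?
            .to_str()?
            .parse()?;

        // Pre-allocate the destination so each chunk can be written at its offset.
        let file = Arc::new(std::fs::File::create(dest)?);
        file.set_len(total)?;

        // Fetch fixed-size ranges concurrently, at most `connections` in flight.
        let limit = Arc::new(tokio::sync::Semaphore::new(self.connections));
        let mut tasks = tokio::task::JoinSet::new();
        for start in (0..total).step_by(self.chunk_size) {
            let end = (start + self.chunk_size as u64).min(total) - 1;
            let (client, file, url) = (client.clone(), file.clone(), url.to_string());
            let permit = limit.clone().acquire_owned().await?;
            tasks.spawn(async move {
                let _permit = permit; // released when this chunk finishes
                let body = client
                    .get(&url)
                    .header(reqwest::header::RANGE, format!("bytes={start}-{end}"))
                    .send()
                    .await?
                    .error_for_status()?
                    .bytes()
                    .await?;
                // Positional write (Unix); a memory-mapped file would slot in here.
                std::os::unix::fs::FileExt::write_all_at(&*file, &body, start)?;
                Ok::<_, anyhow::Error>(())
            });
        }
        while let Some(res) = tasks.join_next().await {
            res??;
        }
        Ok(())
    }
}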

3. Implement PrefetchQueue Logic

  1. Initialization: Start with 2 slots and queue the first 2 jobs for download
  2. Get Next Job (see the sketch after this list):
    • Return ready slot if available
    • Wait for download to complete if downloading
    • Queue next job download in background
  3. Download Strategy:
    • Use parallel range requests (16 connections)
    • Memory-mapped file for large downloads
    • Retry on failure with exponential backoff
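
A rough sketch of the next_ready() flow from item 2, assuming a download_finished: tokio::sync::Notify field that background download tasks signal on completion, and a refill_slots() helper that starts downloads for empty slots; both names are illustrative and not part of the struct above:

impl PrefetchQueue {
    /// Sketch: hand out the next slot whose data is already on local disk,
    /// waiting on an in-flight download if none is ready yet.
    pub async fn next_ready(&mut self) -> anyhow::Result<PrefetchSlot> {
        loop {
            // Keep every slot occupied: start a background download for each
            // empty slot (hypothetical helper that pulls from `self.pending`).
            self.refill_slots().await?;

            // Hand back a slot whose download has already finished.
            if let Some(idx) = self.slots.iter().position(|s| {
                matches!(s, Some(PrefetchSlot { status: PrefetchStatus::Ready, .. }))
            }) {
                return Ok(self.slots[idx].take().expect("slot checked above"));
            }

            // Failed slots would be drained and retried here (omitted).

            // Otherwise block until a background download signals completion
            // (assumes a `download_finished: tokio::sync::Notify` field).
            self.download_finished.notified().await;
        }
    }
}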

4. Integrate with Worker

// In Worker::run()
let prefetch = PrefetchQueue::new(storage, config);

loop {
    // Get next ready job (blocks until download complete)
    let job = prefetch.next_ready().await?;
    
    // Process job (I/O already done)
    let result = self.process_job(&job).await?;
    
    // Notify prefetch to queue another download
    prefetch.notify_complete();
}

5. Add Configuration

pub struct PrefetchConfig {
    /// Number of prefetch slots
    pub slots: usize,  // default: 2
    
    /// Parallel download connections
    pub download_connections: usize,  // default: 16
    
    /// Range request chunk size
    pub chunk_size: usize,  // default: 8MB
    
    /// Local buffer directory
    pub buffer_dir: PathBuf,
}
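
For reference, a possible Default impl wiring in the defaults noted above; the buffer directory path is only a placeholder:

impl Default for PrefetchConfig {
    fn default() -> Self {
        Self {
            slots: 2,
            download_connections: 16,
            chunk_size: 8 * 1024 * 1024, // 8 MB
            // Placeholder; real deployments would point at a scratch volume.
            buffer_dir: std::path::PathBuf::from("/tmp/roboflow-prefetch"),
        }
    }
}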

6. Add Metrics

  • roboflow_prefetch_queue_depth (Gauge)
  • roboflow_prefetch_download_duration_seconds (Histogram)
  • roboflow_prefetch_bytes_downloaded_total (Counter)
  • roboflow_prefetch_errors_total (Counter)
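
A sketch of registering the metrics above, assuming the prometheus crate is the metrics layer in use (if the repo uses something else, only the registration boilerplate changes, the names carry over):

use prometheus::{Histogram, HistogramOpts, IntCounter, IntGauge, Registry};

pub struct PrefetchMetrics {
    pub queue_depth: IntGauge,
    pub download_duration: Histogram,
    pub bytes_downloaded: IntCounter,
    pub errors: IntCounter,
}

impl PrefetchMetrics {
    pub fn register(registry: &Registry) -> prometheus::Result<Self> {
        let queue_depth = IntGauge::new(
            "roboflow_prefetch_queue_depth",
            "Jobs currently held in prefetch slots",
        )?;
        let download_duration = Histogram::with_opts(HistogramOpts::new(
            "roboflow_prefetch_download_duration_seconds",
            "Time spent downloading a prefetched job",
        ))?;
        let bytes_downloaded = IntCounter::new(
            "roboflow_prefetch_bytes_downloaded_total",
            "Total bytes downloaded by the prefetcher",
        )?;
        let errors = IntCounter::new(
            "roboflow_prefetch_errors_total",
            "Total prefetch download failures",
        )?;
        registry.register(Box::new(queue_depth.clone()))?;
        registry.register(Box::new(download_duration.clone()))?;
        registry.register(Box::new(bytes_downloaded.clone()))?;
        registry.register(Box::new(errors.clone()))?;
        Ok(Self { queue_depth, download_duration, bytes_downloaded, errors })
    }
}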

Performance Target

Metric        Before              After               Improvement
I/O Wait      3-8 sec             Hidden (overlap)    ~100%
Throughput    ~536 Mbps/worker    ~750 Mbps/worker    +40%

Dependencies

  • Requires: Streaming S3 reader (exists)
  • Enables: 10 Gbps throughput target

Files to Create

  • crates/roboflow-distributed/src/prefetch.rs
  • crates/roboflow-distributed/src/prefetch/queue.rs
  • crates/roboflow-distributed/src/prefetch/downloader.rs

Files to Modify

  • crates/roboflow-distributed/src/worker.rs
  • crates/roboflow-distributed/src/lib.rs

Acceptance Criteria

  • PrefetchQueue with 2 slots implemented
  • Parallel range-request downloader (16 connections)
  • Background download while processing
  • Memory-mapped file handling for large downloads
  • Worker integration works
  • 40% throughput improvement measured
  • No memory leaks from buffer files
  • Tests pass with local and S3 storage

Metadata

Assignees

No one assigned

Labels

  • area/pipeline (Pipeline processing)
  • enhancement (New feature or request)
  • priority/critical (Must be done first, blocks other work)
  • size/L (Large: 1-2 weeks)
