
feat: implement streaming image decoding and config-driven feature mappings#86

Merged
zhexuany merged 56 commits into main from feat/test-bag-conversion
Feb 7, 2026

Conversation


zhexuany commented on Feb 3, 2026

Summary

Enhances the bag-to-LeRobot conversion pipeline with image decoding support, config-driven feature naming, and improved distributed processing reliability.

Changes

Image Decoding

  • feat: Integrate image decoder into streaming converter
    • Enable image-decode feature by default for JPEG/PNG decoding
    • Handle Array<UInt8> format for encoded images in MCAP messages
    • Add decoder config to StreamingConfig defaults
    • Fix decoder to use Cursor for seekable input (image crate compatibility)
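A minimal sketch of the Cursor-based decode path, assuming the image crate's ImageReader API (image 0.25; older releases expose it as image::io::Reader). The repo's decoder backend wraps this differently:

```rust
use std::io::Cursor;
use image::ImageReader;

/// Decode a JPEG/PNG byte buffer into RGB8 pixels.
/// Cursor supplies the Seek impl that the image crate's format probing
/// requires; a plain &[u8] reader implements Read/BufRead but not Seek.
fn decode_to_rgb8(encoded: &[u8]) -> Result<(u32, u32, Vec<u8>), image::ImageError> {
    let img = ImageReader::new(Cursor::new(encoded))
        .with_guessed_format()? // sniff JPEG/PNG from magic bytes
        .decode()?;
    let rgb = img.to_rgb8();
    Ok((rgb.width(), rgb.height(), rgb.into_raw()))
}
```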

Config-Driven Feature Naming

  • refactor: Remove hardcoded "observation.images" prefix assumptions
    • camera_mappings(), state_mappings(), action_mappings() now filter by MappingType enum (see the sketch after this list)
    • camera_key() returns full feature path by default (config-driven)
    • Video directories, metadata, and upload paths use full feature paths directly
    • Supports any naming convention (e.g., obsv.images.cam_r, my.camera)
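A hypothetical sketch of the config-driven lookup described above; the actual FeatureMapping and MappingType types in this repo differ in detail:

```rust
#[derive(Clone, Copy, PartialEq, Eq)]
enum MappingType { Camera, State, Action }

struct FeatureMapping {
    /// Full feature path as written to the dataset, e.g. "obsv.images.cam_r".
    key: String,
    mapping_type: MappingType,
}

struct ConversionConfig { mappings: Vec<FeatureMapping> }

impl ConversionConfig {
    /// Filter by MappingType instead of matching a hardcoded
    /// "observation.images" prefix, so any naming convention works.
    fn camera_mappings(&self) -> impl Iterator<Item = &FeatureMapping> {
        self.mappings.iter().filter(|m| m.mapping_type == MappingType::Camera)
    }
}

impl FeatureMapping {
    /// The camera key is the full, config-provided feature path;
    /// video directories, metadata, and upload paths reuse it verbatim.
    fn camera_key(&self) -> &str { &self.key }
}
```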

Distributed Processing

  • fix: Extract S3 object key from cloud URLs in distributed conversion
  • fix: Correct path validation to avoid false positives on valid local paths
  • fix: Handle Result from LocalStorage::full_path after path traversal fix
  • feat: Add async storage trait with security improvements
  • fix: Use ShutdownHandler's broadcast channel for graceful worker shutdown
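The general shape of the broadcast-based shutdown, sketched with tokio primitives (illustrative only; the actual ShutdownHandler wiring differs):

```rust
use tokio::sync::broadcast;

/// Hypothetical worker loop that stops pulling new work once a shutdown
/// signal is broadcast (e.g. from a SIGTERM handler).
async fn worker_loop(mut shutdown: broadcast::Receiver<()>) {
    loop {
        tokio::select! {
            _ = shutdown.recv() => break,
            _ = async { /* poll and process one work unit */ } => {}
        }
    }
    // Flush/checkpoint here so the job can be resumed later.
}

#[tokio::main]
async fn main() {
    let (tx, rx) = broadcast::channel::<()>(1);
    let worker = tokio::spawn(worker_loop(rx));
    // In the real ShutdownHandler, SIGTERM/Ctrl+C triggers this send.
    let _ = tx.send(());
    let _ = worker.await;
}
```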

Build & Development

  • chore: Add docker-compose targets to Makefile (dev-up, dev-down, dev-logs, dev-ps, dev-restart, dev-clean)
  • fix: Adopt robocodec v0.3 breaking API changes

Test Plan

  • Test local bag file conversion with image decoding
  • Test config with custom feature naming (non-standard prefixes)
  • Test distributed conversion with S3 input/output
  • Verify graceful shutdown on worker SIGTERM

Update robocodec dependency from rev 5a8e11b to 8adde9f and adapt
to the new decoded() API:

- Replace decode_messages_with_timestamp() with decoded()
- Handle Option<u64> timestamps with unwrap_or(0)
- Add require_feature("observation.state") for LeRobot streaming
- Fix LeRobot writer action forward-fill for missing data
- Detect action dimension from actual data instead of state dim
- Unify robocodec to workspace dependency across crates
- Update tests to use new API (remove ReaderBuilder, read_parallel)
- Fix ChannelInfo references to use public API path

This ensures BAG to LeRobot conversion works with the latest
robocodec library.
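The shape of the timestamp/channel change, sketched here with stand-in types (the real robocodec v0.3 structs and iterator signatures differ):

```rust
/// Stand-ins mirroring the shapes described in this PR, not the real types.
struct ChannelInfo { topic: String }
struct TimestampedDecodedMessage {
    log_time: Option<u64>, // was a plain u64 in v0.2
    channel: ChannelInfo,  // was returned alongside the message in v0.2
    data: Vec<u8>,
}

fn handle(topic: &str, log_time: u64, data: &[u8]) {
    let _ = (topic, log_time, data);
}

/// v0.2 yielded (TimestampedMessage, ChannelInfo) tuples from
/// decode_messages_with_timestamp(); v0.3's decoded() yields a single
/// struct with the channel embedded and the timestamp made optional.
fn process(messages: impl Iterator<Item = TimestampedDecodedMessage>) {
    for msg in messages {
        let log_time = msg.log_time.unwrap_or(0); // fall back to 0 when absent
        handle(&msg.channel.topic, log_time, &msg.data);
    }
}
```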

greptile-apps bot commented Feb 3, 2026

Greptile Overview

Greptile Summary

Updated robocodec dependency from v0.2 (rev 5a8e11b) to v0.3 (rev 8adde9f) and adapted codebase to breaking API changes.

Key Changes:

  • Replaced decode_messages_with_timestamp() with decoded() API across all readers
  • Handled Option<u64> timestamps using .unwrap_or(0) fallback throughout
  • Unified robocodec to workspace dependency in roboflow-core and roboflow-pipeline Cargo.toml files
  • Removed deprecated ReaderBuilder from public API exports
  • Fixed ChannelInfo import paths from mcap::reader to io module
  • Added require_feature("observation.state") for LeRobot streaming converters
  • Improved LeRobot writer with action forward-fill logic and dimension detection from actual data

Issues Found:

  • Critical: ReaderStage::run() implementation is incomplete - it reads messages but never sends chunks to chunks_sender, breaking the pipeline compression stage
  • Test files now set message data field to empty vectors due to API limitations, reducing test coverage for message content integrity

Confidence Score: 2/5

  • This PR has a critical bug in the reader stage that breaks the hyper pipeline functionality
  • While most API migrations were done correctly, the ReaderStage::run() implementation is fundamentally broken - it iterates through messages but never sends chunks to the compression stage, which will cause pipeline failures
  • Pay critical attention to crates/roboflow-pipeline/src/stages/reader.rs - the reader stage needs to actually send chunks via chunks_sender

Important Files Changed

| Filename | Overview |
| --- | --- |
| crates/roboflow-dataset/src/streaming/converter.rs | Updated to decoded() API, added require_feature for observation.state, handled Option timestamps |
| crates/roboflow-pipeline/src/dataset_converter/dataset_converter.rs | Updated to decoded() API, handled Option timestamps throughout KPS and LeRobot conversion |
| crates/roboflow-dataset/src/lerobot/writer.rs | Added forward-fill for missing actions, fixed action dimension detection from actual data instead of state dim |
| crates/roboflow-pipeline/src/stages/reader.rs | Replaced parallel reading with decoded() API, but implementation is incomplete: no chunks sent to compression stage |
| tests/pipeline_round_trip_tests.rs | Replaced parallel reading with decoded() API; raw data field set to empty vec (limitation of new API) |
| tests/sequential_parallel_comparison_tests.rs | Replaced parallel reading with decoded() API; raw data field set to empty vec (limitation of new API) |

Sequence Diagram

```mermaid
sequenceDiagram
    participant Client
    participant RoboReader
    participant Old as decode_messages_with_timestamp()
    participant New as decoded()
    participant Message as TimestampedDecodedMessage
    
    Note over Client,Message: Old API (robocodec v0.2)
    Client->>RoboReader: decode_messages_with_timestamp()
    RoboReader->>Old: iterate
    Old-->>Client: (TimestampedMessage, ChannelInfo)
    Note over Client: log_time: u64<br/>channel in tuple
    
    Note over Client,Message: New API (robocodec v0.3)
    Client->>RoboReader: decoded()
    RoboReader->>New: iterate
    New-->>Client: TimestampedDecodedMessage
    Note over Client: log_time: Option<u64><br/>channel embedded in struct
    Client->>Client: unwrap_or(0) for timestamp
    Client->>Message: access msg.channel
    
    Note over Client,Message: Breaking Changes
    Note over Client: - Timestamps now Option<u64><br/>- ChannelInfo in message struct<br/>- ReaderBuilder removed<br/>- ParallelReader API removed
```


greptile-apps bot left a comment


6 files reviewed, 3 comments


- Add AsyncStorage trait for clean async operations without nested runtimes
- Add AsyncOssStorage implementation using object_store directly
- Refactor OssStorage as sync wrapper with smart runtime handling

Security fixes:
- Add path traversal protection in LocalStorage::full_path()
- Make HTTP opt-in via allow_http config (defaults to false)
- Redact credentials in OssConfig Debug impl
- Add bucket name validation per S3/OSS naming rules

Testing:
- Add 6 new integration tests for OSS storage behavior
- Add security tests for path traversal attempts
- Add bucket name validation tests

Code quality:
- Fix all clippy warnings (or_else → or, collapsible if, etc.)
- Test suite: 112 tests passing
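Of these, the credential redaction is the easiest to illustrate. A minimal sketch of a hand-written Debug impl (field names are assumptions, not the actual OssConfig layout):

```rust
use std::fmt;

struct OssConfig {
    endpoint: String,
    bucket: String,
    access_key_id: String,
    access_key_secret: String,
}

// Manual Debug impl so credentials never end up in logs or error output.
impl fmt::Debug for OssConfig {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("OssConfig")
            .field("endpoint", &self.endpoint)
            .field("bucket", &self.bucket)
            .field("access_key_id", &"<redacted>")
            .field("access_key_secret", &"<redacted>")
            .finish()
    }
}
```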
…paths

- Add normalize_root() helper to strip leading ./ for consistent comparison
- Fix Path::starts_with comparison by normalizing both paths
- Update StreamingDatasetConverter to pass filename only for LocalStorage
- Fix test_parse_storage_url_file to use path without leading ./

The previous path traversal validation was correctly blocking malicious
paths like "../etc/passwd" but also rejecting valid local paths due
to string comparison issues between "./tests/fixtures" and
"tests/fixtures/robocodec_test_14.mcap".

This fix uses Path::starts_with with normalized paths, ensuring
that paths like "./tests" and "tests" are treated as equivalent
during validation while still blocking actual traversal attempts.
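A minimal sketch of the normalized validation described here (the real LocalStorage::full_path() differs in detail and may also canonicalize):

```rust
use std::path::{Component, Path, PathBuf};

/// Strip "." components so "./tests" and "tests" compare equal.
fn normalize_root(root: &Path) -> PathBuf {
    root.components()
        .filter(|c| !matches!(c, Component::CurDir))
        .collect()
}

/// Reject keys that escape the storage root (e.g. "../etc/passwd")
/// while accepting ordinary relative paths beneath it.
fn full_path(root: &Path, key: &str) -> Result<PathBuf, String> {
    let root = normalize_root(root);
    let joined = normalize_root(&root.join(key));
    // Lexical check: any ".." component, or a join that leaves the root
    // (e.g. an absolute key), is treated as a traversal attempt.
    let has_parent_dir = Path::new(key)
        .components()
        .any(|c| matches!(c, Component::ParentDir));
    if has_parent_dir || !joined.starts_with(&root) {
        return Err(format!("path traversal attempt rejected: {key}"));
    }
    Ok(joined)
}
```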
Add JPEG/PNG decoding capability to the streaming dataset converter,
enabling automatic decompression of CompressedImage messages during
bag file processing.

Key changes:
- Add image module with Clean Architecture (backend trait, factory, format detection)
- Integrate ImageDecoderFactory into FrameAlignmentBuffer
- Add decoder_config option to StreamingConfig
- Update memory estimation to account for decoded RGB images
- Support CPU (image crate), Apple hardware acceleration, and GPU (nvJPEG) backends
- Format detection from magic bytes with fallback to ROS format strings

The decoder is configured via StreamingConfig::with_decoder_config() and
automatically decodes compressed images when available, storing RGB data
in the output dataset for faster training data loading.
When processing cloud storage inputs (s3:// or oss:// URLs), the converter
was passing the full URL as the object key instead of extracting just the
key portion. This caused "object not found" errors because storage backends
expect the key (e.g., "file.bag") not the full URL (e.g., "s3://bucket/file.bag").

Changes:
- Add extract_cloud_key() helper to parse s3://bucket/key and oss://bucket/key URLs
- Add create_cloud_storage() method to create OssStorage from cloud URLs
- Modify convert() to detect cloud URLs and extract keys for storage operations
- Enable HTTP support for MinIO endpoints in factory.rs

This fix enables distributed workers to properly download and convert bag
files stored in S3-compatible object storage (e.g., MinIO).
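A minimal sketch of the URL-to-key split (illustrative; the real extract_cloud_key() in this repo handles more cases):

```rust
/// Split "s3://bucket/path/to/file.bag" (or "oss://...") into the object
/// key that storage backends expect ("path/to/file.bag").
fn extract_cloud_key(url: &str) -> Option<&str> {
    let rest = url
        .strip_prefix("s3://")
        .or_else(|| url.strip_prefix("oss://"))?;
    // Everything after the first '/' past the bucket name is the key.
    let (_bucket, key) = rest.split_once('/')?;
    if key.is_empty() { None } else { Some(key) }
}

#[test]
fn extracts_key_not_full_url() {
    assert_eq!(extract_cloud_key("s3://bucket/file.bag"), Some("file.bag"));
    assert_eq!(extract_cloud_key("oss://bucket/dir/file.bag"), Some("dir/file.bag"));
    assert_eq!(extract_cloud_key("s3://bucket-only"), None);
}
```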
This commit makes the LeRobot conversion pipeline more flexible and fixes
several bugs that prevented proper video generation:

- Fix: Enable image-decode feature by default for JPEG/PNG decoding
- Fix: Handle Array<UInt8> format for encoded images in MCAP messages
- Fix: Use ShutdownHandler's broadcast channel for graceful worker shutdown
- Refactor: Remove hardcoded "observation.images" prefix assumptions
  - camera_mappings(), state_mappings(), action_mappings() now filter by MappingType
  - camera_key() returns full feature path instead of deriving from prefix
  - Video directories, metadata, and upload paths use full feature paths
- Fix: Reduce verbose debug output (log data size instead of raw bytes)
- Style: Apply clippy fixes (needless borrows, collapsible if, map_clone)
zhexuany changed the title from "fix: adopt robocodec v0.3 breaking API changes" to "feat: implement streaming image decoding and config-driven feature mappings" on Feb 4, 2026
Fix multiple issues identified during comprehensive code review:

**Critical fixes:**
- Add mutex poison recovery in worker.rs checkpoint callbacks
- Fix RGB8 format dimension calculation (removed incorrect sqrt() assumption)
- Consolidate duplicate DecodedImage and ImageFormat definitions

**Error handling improvements:**
- Add async storage tests with bucket validation
- Add image decode error path tests (truncated data, invalid headers)
- Include bucket/endpoint in cloud storage error messages

**Code quality:**
- Add DecodedImage getter methods for better encapsulation
- Add DecodedImage::from_rgb8() with dimension validation
- Change GPU decoder fallback from warn! to info! level

**Testing:**
- Add tests for corrupted/truncated JPEG data handling
- Add tests for invalid format error cases
- Add tests for RGB8 dimension requirement
- Add async storage config validation tests
Replace pattern match on is_some() followed by unwrap() with
if let Some() for more idiomatic Rust code. Fixes clippy warning.
Fix clippy needless_range_loop warning by using enumerate()
iterator instead of indexing.
Make jobs internally use the batch system as single-file batches,
providing a unified architecture where all work goes through the
Kubernetes-style declarative batch controller.

Changes:
- Refactor submit.rs to create BatchSpec for single files
- Add dry run support to manifest submission
- Fix atomic batch submission (phase index in transaction)
- Add error logging for fallback summary retrieval
- Add warning for missing user identity env vars
- Add logging for deserialization failures
- Remove unused variables and imports

Fixes issues found in PR review:
- CRITICAL: Non-atomic transaction causing orphaned batches
- CRITICAL: Missing dry run support in manifest submission
- IMPORTANT: Silent fallback without error logging
- IMPORTANT: Inconsistent error handling between submit paths
This commit adds comprehensive test coverage for the roboflow
distributed system as part of Phase 2 (Testing Infrastructure):

- Add storage layer tests (storage_tests.rs)
  - Local storage read/write operations
  - Path handling and edge cases
  - StorageError conversion and display

- Add dataset writer error tests (dataset_writer_error_tests.rs)
  - Invalid configuration handling
  - Dimension mismatch handling
  - I/O error handling
  - State validation errors
  - Empty/incomplete data handling

- Add upload coordinator tests (upload_coordinator_tests.rs)
  - Worker thread spawning and management
  - Queue management and bounded channel behavior
  - Statistics collection and reporting
  - Checkpoint tracking for completed uploads
  - Graceful shutdown and cleanup
  - Retry logic with exponential backoff

- Expand worker integration tests (worker_integration_tests.rs)
  - JobRecord state transitions
  - WorkerMetrics atomic operations
  - ConfigRecord validation
  - LockRecord grace period expiration
  - HeartbeatRecord liveness tracking
  - CheckpointState progress calculation
  - ShutdownHandler graceful termination

Phase 2.3: Worker Integration Tests (40 hours)
Total Phase 2: ~137 new tests
Split the 1572-line writer.rs into focused modules:
- mod.rs: Main writer struct and orchestration (758 lines)
- encoding.rs: Video encoding logic (336 lines)
- parquet.rs: Parquet file writing (206 lines)
- upload.rs: Cloud storage uploads (153 lines)
- stats.rs: Episode statistics (48 lines)
- frame.rs: LerobotFrame data structure (35 lines)

All 29 lerobot lib tests and 11 integration tests pass.
The tests were adding only images, with no frames carrying state/action data.
Updated tests now create proper AlignedFrame objects with:
- observation.state (joint positions)
- action (target positions)
- images

This ensures frames are written to Parquet and counted correctly.
- test_decoded_image_validation: Use from_rgb8() which returns Result
  instead of new() which panics in debug mode
- test_with_custom_alignment: Remove validate_alignment() assertion
  since standard Vec allocator doesn't guarantee custom alignment
This commit completes Phase 4 of the technical debt remediation plan:
- Simplify DatasetWriter trait architecture
- Implement type-safe builder pattern for writers
- Remove unnecessary dead code suppressions

DatasetWriter trait changes:
- Remove initialize(&mut self, config: &dyn Any) method
- Change finalize(&mut self) to take no parameters
- Configuration now provided via builder pattern

Writer implementation changes:
- Add LerobotWriterBuilder with fluent API
- Add ParquetWriterBuilder for KPS writers
- new_local() and new() now create fully-initialized writers
- Deprecate initialize_with_config() as a no-op

Dead code removal:
- Reduce #[allow(dead_code)] occurrences by 64% (45→16)
- Prefix truly unused struct fields with underscore
- Remove suppressions for actively used functions

All tests pass (201 tests total).
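A hypothetical sketch of the builder shape described above (names and options are illustrative, not the actual LerobotWriterBuilder API):

```rust
use std::path::PathBuf;

struct LerobotWriter { fps: u32, output_dir: PathBuf }

#[derive(Default)]
struct LerobotWriterBuilder {
    fps: Option<u32>,
    output_dir: Option<PathBuf>,
}

impl LerobotWriterBuilder {
    fn fps(mut self, fps: u32) -> Self { self.fps = Some(fps); self }
    fn output_dir(mut self, dir: impl Into<PathBuf>) -> Self {
        self.output_dir = Some(dir.into());
        self
    }
    /// Validation happens at build time, so a successfully built writer is
    /// always fully initialized -- no separate initialize(&dyn Any) step.
    fn build(self) -> Result<LerobotWriter, String> {
        Ok(LerobotWriter {
            fps: self.fps.unwrap_or(30),
            output_dir: self.output_dir.ok_or("output_dir is required")?,
        })
    }
}

// Usage: LerobotWriterBuilder::default().fps(30).output_dir("out").build()?;
```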
- Remove 12 dead code items (unused functions, methods, structs)
- Fix 11 unwrap() calls with proper error context
- Add safety documentation to unsafe blocks
- Reduce lint suppressions from 35 to 14 (-60%)
- Resolve TODO comments with proper documentation
- Improve mutex poisoning recovery in cached storage
- Fix type annotations in image decoder factory

Debt score reduced from 620 to 120. All changes verified with cargo clippy.
The scanner was incorrectly extracting paths from S3/OSS URLs by
looking for the "second slash" which found the second / of ://
instead of the slash after the bucket name.

For s3://bucket/file.mcap, it was extracting bucket/file.mcap
instead of file.mcap, causing S3 to look for keys with duplicated
bucket names that didn't exist.

Fixed by finding :// first, then locating the slash after the bucket
name. Also added comprehensive test coverage for URL path extraction.
The scanner's create_job function was passing only the filename
(metadata.path) instead of the full S3 URL to JobRecord. This caused
workers to look for local files instead of downloading from S3.

Fixed by passing the source_url parameter to create_job and using it
when constructing the JobRecord.
- Pass full S3/OSS URLs to converter instead of stripping prefix
- Add comprehensive tracing logs for job polling, claiming, and processing
- Improve batch controller logging for WorkUnit creation and URL discovery
- Fix code formatting in CLI commands
Update robocodec dependency and add match arm for the new RRD
format variant in the hyper pipeline orchestrator.
- Add named constant DEFAULT_ACTION_DIMENSION for magic number 14
  in parquet writer (common for dual-arm robotics setups)
- Document reader stage limitation: chunks_sender unused because
  decoded() API doesn't provide raw bytes for MessageChunkData
- Add TODO comments for future implementation
- Use ParallelMcapReader and ParallelBagReader directly to access read_parallel()
- No longer uses decoded() which doesn't provide raw bytes for chunk construction
- Properly sends MessageChunkData to compression stage via channel
- Removes TODO comments - implementation complete
Gracefully handle the case where ffmpeg is not available by checking
the error message and still asserting frame_count() works correctly.
- Describe distributed data transformation pipeline (not just format converter)
- Update workspace structure with all 6 crates
- Add architecture diagram with TiKV, Scanner, Worker components
- Document CLI commands: submit, worker, scanner, jobs
- Add environment variables table
- Update Rust version requirement to 1.80+
- Link to robocodec as I/O dependency
CRITICAL:
- Fix Finalizer checking BatchPhase::Processing (non-existent) → BatchPhase::Running
- Add ZombieReaper to unified deployment (was defined but never started)
- Add BatchController.run() background task to unified deployment

HIGH:
- Remove Scanner component from Role enum and help text (per design decision)
- Remove scanner-related integration tests
- Fix duplicate Finalizer constructors (removed with_defaults)

Error handling improvements:
- Add error logging to all spawned tasks in run_unified
- Fix Ctrl+C handler to handle errors gracefully instead of expect()
- Log when mark_batch_failed fails (batch stuck in Running state)
- Log when get_batch_spec fails or returns None
- Add warning logs for invalid FinalizerConfig env var values

Other fixes:
- Remove unused cancel token from run_worker (Worker has own shutdown)
- Fix robocodec API usage: McapReader → RoboReader in camera_params.rs
…ture

This change completes the migration from the legacy JobRecord-based system
to a unified WorkUnit-based batch processing architecture.

## Removed

- JobRecord and JobStatus from schema.rs
- Job-related TikvClient methods (get_job, put_job, claim_job, etc.)
- Worker's WorkItem enum and job processing logic
- JobKeys and related key structures
- JobsCommand enum variants from deprecated CLI command

## Updated

- Worker: now only processes WorkUnits directly
- Reaper: reclaims orphaned WorkUnits instead of Jobs
- Scanner: creates WorkUnits with proper BatchPhase::Running
- Finalizer: checks for BatchPhase::Running (was Processing)
- All tests to use WorkUnit-based patterns
- Coordinator trait to remove job-specific methods

## Impact

- Simplified data model: single WorkUnit abstraction
- Reduced code: ~2200 lines removed
- Consistent terminology throughout codebase
Simplify worker configuration by removing redundant storage_prefix.
The output path is now correctly read from the Batch/WorkUnit created
during submit, eliminating the need for separate worker-side storage configuration.

## Changes
- Remove storage_prefix from WorkerConfig and Worker
- Remove with_storage_prefix() builder method
- Update build_output_path() to use unit.output_path from Batch
- Keep output_prefix as fallback only for backward compatibility
- Remove WORKER_STORAGE_PREFIX from CLI documentation
- Update tests to remove storage_prefix references
- Add distributed feature to default features in main crate

## Impact
- Submit command now fully controls input/output paths
- Worker configuration is simpler - no redundant storage_prefix
- distributed feature is enabled by default
The distributed feature flag was confusing because:
1. TiKV dependencies are always included (not feature-gated)
2. The system is designed to be distributed by default
3. The feature only gated test modules, not core functionality

Changes:
- Remove `distributed = []` feature from Cargo.toml files
- Remove all `#[cfg(feature = "distributed")]` guards
- Remove all `#[cfg(not(feature = "distributed"))]` stub methods
- Update documentation to remove `--features distributed` references

The distributed coordination is now always enabled.
Phase 1 consolidation: remove leader election from merge path by using
compare-and-swap on batch status instead of distributed locks.

Changes:
- Add Merging phase to BatchPhase state machine
- Remove LockManager dependency from MergeCoordinator
- Replace lock-based try_claim_merge() with CAS pattern (Running → Merging)
- Add NotFound variant to MergeResult enum
- Remove unused legacy wrapper methods

This makes the merge path stateless and idempotent - any instance can
attempt to claim merge, only the first to transition the batch status wins.
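The claim step reduces to a compare-and-swap on the batch phase, roughly as below (hypothetical store interface; the real code goes through the TiKV-backed coordinator):

```rust
#[derive(Clone, Copy, PartialEq, Eq, Debug)]
enum BatchPhase { Pending, Running, Merging, Completed, Failed }

/// Hypothetical CAS primitive over a batch record's phase field.
trait BatchStore {
    /// Atomically set the phase to `new` only if it currently equals
    /// `expected`; returns true if this caller performed the transition.
    fn compare_and_swap_phase(&self, batch_id: &str, expected: BatchPhase, new: BatchPhase) -> bool;
}

/// Any instance may attempt the merge claim; only the first CAS
/// (Running -> Merging) succeeds, so no distributed lock is needed and
/// repeated attempts are idempotent.
fn try_claim_merge(store: &dyn BatchStore, batch_id: &str) -> bool {
    store.compare_and_swap_phase(batch_id, BatchPhase::Running, BatchPhase::Merging)
}
```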
Remove LockManager reference from Key Patterns table and replace with
BatchPhase state machine documentation to reflect Phase 1 changes.
The Swatinem/rust-cache action doesn't properly invalidate when git dependencies
are updated. This causes CI to use a stale cached version of robocodec
that doesn't have the modules (mcap, bag, types) that the code expects.

Add cache-key based on Cargo.lock hash to force cache invalidation
when robocodec or any dependency changes.
Remove Cargo.lock from .gitignore and commit it to ensure consistent
dependency versions across all environments (local, CI, contributors).

Root cause: CI was using a stale cached Cargo.lock with an old robocodec
commit (dbc2cde) while local had the latest (3c679b4e), causing import
errors for modules that didn't exist in the old version.
The lld linker crashes with a 'Bus error' in the CI environment when linking
large test binaries. Use the system linker (gcc/cc), which is more stable.

This only affects Linux x86_64 builds (the CI environment), not macOS
or other platforms.
Use eprintln! to print help text directly instead of embedding
in error string, which was causing \n escape sequences to
display literally instead of as actual newlines.
Add get_run_help() function and handle --help/-h flags in run
command parsing to display help instead of starting the service.
Remove PyO3 bindings and all Python-related code to focus on Rust-only
implementation. This includes:

- Remove python/, examples/python/, and src/python/ directories
- Remove PyO3 dependency and python/extension-module features
- Remove pyproject.toml packaging config
- Update CI/CD workflows to remove Python testing and PyPI publishing
- Update Makefile to remove Python targets
- Update CLAUDE.md documentation

The project now focuses exclusively on Rust implementation.
- Fix ambiguous AsMut trait calls in buffer_pool tests using turbofish syntax
- Change expect(dead_code) to allow(dead_code) for record_state_dimension
  (method is used in tests but appears dead during normal compilation)
zhexuany force-pushed the feat/test-bag-conversion branch from a2f0289 to ed0cc4f on February 6, 2026 at 16:29
Remove all Python binding references from documentation files now that
Python support has been removed from the codebase:

- docs/README.md: Remove python/ from directory structure
- examples/rust/README.md: Remove link to non-existent Python examples
- CONTRIBUTING.md: Remove Python setup instructions and feature flag
- CONTRIBUTING_zh.md: Remove Python references from Chinese translation
- docs/ARCHITECTURE.md: Remove Python bindings section and API examples
- .gitignore: Remove all Python-related ignore patterns

These files now reflect the Rust-only implementation.
The standard Vec allocator doesn't guarantee page alignment.
This test was asserting that validate_alignment() always returns true,
but that's only true when the allocator happens to return aligned memory.
Now the test accepts both aligned and non-aligned outcomes.
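For reference, the alignment property the test probes comes down to a pointer-modulus check, and the global allocator behind Vec<u8> only promises the element type's alignment (1 byte), not page alignment:

```rust
/// True if the buffer's start address is aligned to `align` bytes.
fn is_aligned(buf: &[u8], align: usize) -> bool {
    (buf.as_ptr() as usize) % align == 0
}

fn main() {
    let buf = vec![0u8; 4096];
    // Either outcome is possible; the test now accepts both.
    println!("page-aligned (4096): {}", is_aligned(&buf, 4096));
}
```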
zhexuany merged commit abb6ef9 into main on Feb 7, 2026
6 checks passed
zhexuany deleted the feat/test-bag-conversion branch on February 7, 2026 at 01:39