refactor(mcap): integrate mcap crate for S3 streaming #53
Merged
Conversation
Integrate Foxglove's mcap crate for MCAP parsing while keeping robocodec's custom Rayon-based parallel decompression for 6-8x performance on local files.

Changes:
- Add McapS3Adapter wrapping mcap::LinearReader for S3 streaming
- Replace StreamingMcapParser with McapS3Adapter in S3Reader
- Deprecate StreamingMcapParser (use McapS3Adapter instead)
- Make s3_adapter crate-private (not part of public API)
- Fix silent failures: add logging for MCAP parse errors

This reduces custom MCAP parsing code from ~1600 lines to ~460 lines while maintaining parallel decompression performance.
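For context on the performance path being kept, here is a minimal sketch of Rayon-based parallel chunk decompression; the decompressor below is an illustrative stand-in, not robocodec's actual implementation.

```rust
use rayon::prelude::*;

// Stand-in for the real lz4/zstd chunk decompressor (illustrative only).
fn decompress_chunk(compressed: &[u8]) -> Vec<u8> {
    compressed.to_vec() // a real implementation would decompress here
}

// Rayon fans compressed MCAP chunks out across a thread pool;
// collect() preserves the original chunk order.
fn decompress_all(chunks: &[Vec<u8>]) -> Vec<Vec<u8>> {
    chunks.par_iter().map(|c| decompress_chunk(c)).collect()
}
```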
Greptile Overview

Greptile Summary

Refactored MCAP S3 streaming to use Foxglove's `mcap` crate for record parsing while keeping robocodec's custom Rayon-based parallel decompression for local files.

Key Changes:
- Add `McapS3Adapter` wrapping `mcap::LinearReader` for S3 streaming
- Replace `StreamingMcapParser` with `McapS3Adapter` in `S3Reader`
- Deprecate `StreamingMcapParser` and make the `s3_adapter` module crate-private
- Add explicit logging where MCAP parse errors were previously discarded

Critical Issue Found:
- Buffer bounds bug in `parse_message()` in src/io/formats/mcap/s3_adapter.rs (lines 276-290); see the review comment below

Positive Improvements:
- Custom MCAP parsing code shrinks from ~1600 to ~460 lines
- Silent parse failures are now logged with context
| Filename | Overview |
|---|---|
| src/io/formats/mcap/s3_adapter.rs | New adapter integrating mcap::LinearReader for S3 streaming, but contains a critical buffer bounds bug in parse_message() (lines 276-290) |
| src/io/s3/reader.rs | Replaces StreamingMcapParser with McapS3Adapter, adds proper error logging for silent failures, refactors parsing logic with cleaner structure |
| src/io/formats/mcap/stream.rs | Adds deprecation warning for StreamingMcapParser directing users to McapS3Adapter |
| src/io/formats/mcap/mod.rs | Adds crate-private s3_adapter module and updates documentation to reflect hybrid architecture |
Sequence Diagram
```mermaid
sequenceDiagram
    participant S3 as S3 Storage
    participant Reader as S3Reader
    participant Adapter as McapS3Adapter
    participant LinearReader as mcap::LinearReader
    participant Stream as S3MessageStream

    Note over Reader: Initialization Phase
    Reader->>S3: fetch_range(footer)
    S3-->>Reader: footer data
    Reader->>Reader: parse_mcap_footer()
    alt Has Summary Section
        Reader->>S3: fetch_range(summary)
        S3-->>Reader: summary data
        Reader->>Reader: scan_summary_for_channels()
    else No Summary
        Reader->>S3: fetch_range(0, 10MB)
        S3-->>Reader: initial data
        Reader->>Adapter: new()
        Adapter->>LinearReader: new()
        Reader->>Adapter: process_chunk(data)
        Adapter->>LinearReader: insert() + notify_read()
        loop Process Events
            LinearReader-->>Adapter: Record{opcode, data}
            alt Schema Record
                Adapter->>Adapter: parse_schema()
            else Channel Record
                Adapter->>Adapter: parse_channel()
            end
        end
        Adapter-->>Reader: channels discovered
    end

    Note over Stream: Streaming Phase
    Stream->>Adapter: new()
    loop While not EOF
        Stream->>S3: fetch_range(position, chunk_size)
        S3-->>Stream: chunk data
        Stream->>Adapter: process_chunk(chunk_data)
        Adapter->>LinearReader: insert() + notify_read()
        loop Process Events
            LinearReader-->>Adapter: Record{opcode, data}
            alt Message Record
                Adapter->>Adapter: parse_message()
                Adapter-->>Stream: MessageRecord
            end
        end
        Stream-->>Stream: pending_messages.push()
        Stream-->>Reader: (ChannelInfo, data)
    end
```
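For readers unfamiliar with the sans-IO pattern the diagram shows, here is a self-contained sketch of the `insert()` / `notify_read()` event loop. The `SansIoReader` trait and `Event` enum below are local stand-ins defined for illustration, not the `mcap` crate's actual types; only the method names and the `Record{opcode, data}` event come from this PR.

```rust
use std::io::{self, Read};

// Stand-in event type (illustrative, not the mcap crate's enum).
enum Event {
    /// The reader needs at least this many more bytes.
    NeedBytes(usize),
    /// A complete record decoded from previously inserted bytes.
    Record { opcode: u8, data: Vec<u8> },
}

// Stand-in trait capturing the insert()/notify_read() contract.
trait SansIoReader {
    /// Borrow an internal buffer with room for `n` bytes to be filled.
    fn insert(&mut self, n: usize) -> &mut [u8];
    /// Report how many of those bytes were actually written.
    fn notify_read(&mut self, n: usize);
    /// Pull the next event, or None once all inserted bytes are consumed.
    fn next_event(&mut self) -> Option<io::Result<Event>>;
}

fn drain<R: SansIoReader>(reader: &mut R, src: &mut impl Read) -> io::Result<()> {
    while let Some(event) = reader.next_event() {
        match event? {
            Event::NeedBytes(n) => {
                // Fill the buffer the reader hands us, then tell it how
                // many bytes arrived (may be fewer than requested).
                let written = src.read(reader.insert(n))?;
                reader.notify_read(written);
            }
            Event::Record { opcode, data } => {
                // Dispatch on opcode: Schema -> parse_schema, Channel ->
                // parse_channel, Message -> parse_message (per the diagram).
                let _ = (opcode, data);
            }
        }
    }
    Ok(())
}
```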
src/io/formats/mcap/s3_adapter.rs, comment on lines 276 to 290 (outdated):
```rust
const MESSAGE_HEADER_LEN: usize = 20;

if body.len() < MESSAGE_HEADER_LEN {
    return Err(FatalError::invalid_format(
        "MCAP Message record",
        body[..body.len().min(10)].to_vec(),
    ));
}

let channel_id = u16::from_le_bytes(body[0..2].try_into().unwrap());
let sequence = u64::from_le_bytes(body[2..10].try_into().unwrap());
let log_time = u64::from_le_bytes(body[10..18].try_into().unwrap());
let publish_time = u64::from_le_bytes(body[18..26].try_into().unwrap());

let data = body[20..].to_vec();
```
`MESSAGE_HEADER_LEN` is 20, but line 288 accesses `body[18..26]`, which requires at least 26 bytes, and line 290 should use `body[26..]`, not `body[20..]`, for the data slice.
Suggested change

```diff
-const MESSAGE_HEADER_LEN: usize = 20;
+const MESSAGE_HEADER_LEN: usize = 26;
 if body.len() < MESSAGE_HEADER_LEN {
     return Err(FatalError::invalid_format(
         "MCAP Message record",
         body[..body.len().min(10)].to_vec(),
     ));
 }
 let channel_id = u16::from_le_bytes(body[0..2].try_into().unwrap());
 let sequence = u64::from_le_bytes(body[2..10].try_into().unwrap());
 let log_time = u64::from_le_bytes(body[10..18].try_into().unwrap());
 let publish_time = u64::from_le_bytes(body[18..26].try_into().unwrap());
-let data = body[20..].to_vec();
+let data = body[26..].to_vec();
```
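Beyond bumping the constant, a defensive variant keeps the bounds check and the reads from drifting apart by using checked slicing. A hedged sketch, with field widths mirroring the PR's code above (not a claim about the MCAP spec):

```rust
// Checked slicing: a stale header-length constant can't cause an
// out-of-bounds panic, because every read carries its own bound.
fn parse_message(body: &[u8]) -> Option<(u16, u64, u64, u64, Vec<u8>)> {
    let channel_id = u16::from_le_bytes(body.get(0..2)?.try_into().ok()?);
    let sequence = u64::from_le_bytes(body.get(2..10)?.try_into().ok()?);
    let log_time = u64::from_le_bytes(body.get(10..18)?.try_into().ok()?);
    let publish_time = u64::from_le_bytes(body.get(18..26)?.try_into().ok()?);
    let data = body.get(26..)?.to_vec();
    Some((channel_id, sequence, log_time, publish_time, data))
}
```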
Implement unified transport and streaming parser architecture for consistent data access across local files, S3, and HTTP sources.

**Transport Layer:**
- Add Transport trait with poll-based async I/O
- Implement LocalTransport, S3Transport, HttpTransport, MemoryTransport
- Add TransportExt with convenience methods (read, seek, read_to_end)

**Streaming Parser Unification:**
- Consolidate StreamingParser trait in io/streaming/parser.rs
- Add Clone + Send bounds to Message associated type
- All formats implement StreamingParser (MCAP, BAG, RRD)

**Format Readers:**
- Add open_from_transport() method to FormatReader trait
- Add McapTransportReader for transport-based MCAP reading
- Add RrdTransportReader for transport-based RRD reading

**RoboReader Enhancements:**
- Support s3:// URLs (delegates to S3Transport)
- Support http:// and https:// URLs (delegates to HttpTransport)
- Unified path/URL parsing with automatic transport selection

**Bug Fixes:**
- Fix BAG StreamingParser infinite recursion (use fully qualified syntax)
- Remove duplicate s3/parser.rs (consolidated to io/streaming/parser.rs)
- Fix duplicate #[test] attribute in reader/mod.rs

**New Files:**
- src/io/streaming/ - Unified streaming parser module
- src/io/transport/transport.rs - Core Transport trait
- src/io/transport/http/ - HTTP transport implementation
- src/io/transport/memory/ - In-memory transport for testing
- src/io/formats/mcap/streaming.rs - Unified MCAP streaming parser
- src/io/formats/mcap/transport_reader.rs - Transport-based MCAP reader
- src/io/formats/mcap/adaptive.rs - Adaptive reader strategy
- docs/ - Design documentation

All 1905 tests pass.
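A rough shape for the poll-based Transport trait described above, as a hedged sketch; the trait name and the convenience methods come from the commit text, while the exact signatures are assumptions, not the crate's verified API.

```rust
use std::io;
use std::pin::Pin;
use std::task::{Context, Poll};

// Hedged sketch of a poll-based async transport (signatures assumed).
pub trait Transport: Send {
    /// Poll a read into `buf`; Ready(Ok(0)) signals end of stream.
    fn poll_read(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut [u8],
    ) -> Poll<io::Result<usize>>;

    /// Poll a seek to an absolute byte offset, returning the new position.
    fn poll_seek(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        pos: u64,
    ) -> Poll<io::Result<u64>>;

    /// Total length in bytes, if known (local files and S3 objects know it).
    fn len(&self) -> Option<u64>;
}
```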
Add documentation for the new HTTP/HTTPS URL reading feature that was introduced in the transport layer unification.

- Add "Read from HTTP/HTTPS" section to README.md
- Add "从 HTTP/HTTPS 读取" (Read from HTTP/HTTPS) section to README_zh.md
This adds comprehensive HTTP/HTTPS support for both reading and writing robotics data files.

**HTTP Read Authentication:**
- Add HttpAuthConfig to ReaderConfig for Bearer token and Basic auth
- Support authentication via WriterConfig or URL query parameters
- Examples: ?bearer_token=xxx or ?basic_auth=user:pass

**HTTP Write Support:**
- Add HttpWriter for uploading files via HTTP PUT
- Support three upload strategies:
  - SinglePut: entire file in one request (small files)
  - ChunkedPut: chunked upload with Range headers (default)
  - ChunkedEncoding: streaming upload
- Add configurable buffer size, chunk size, and retry logic
- Support authentication via WriterConfig

**New Components:**
- HttpWriter in src/io/transport/http/writer.rs
- HttpUploadStrategy enum in src/io/transport/http/upload_strategy.rs
- HttpAuthConfig in src/io/writer/builder.rs
- URL detection for HTTP/HTTPS in RoboWriter::create_with_config

**Configuration:**
- ReaderConfig::with_http_bearer_token()
- ReaderConfig::with_http_basic_auth()
- WriterConfig::http_bearer_token()
- WriterConfig::http_basic_auth()
- WriterConfig::http_upload_chunk_size()
- WriterConfig::http_max_retries()

**Tests:**
- 1950 tests pass (43 new tests added)
- Coverage for HTTP auth, upload strategies, retry logic
- Error handling and edge cases
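A hedged usage sketch of the two authentication routes this commit describes. Only `with_http_bearer_token` and the `?bearer_token=` query form are named in the commit text; the import paths, `ReaderConfig::default()`, `open_with_config`, and the URL are illustrative assumptions.

```rust
// Assumed crate-root re-exports (illustrative).
use robocodec::{ReaderConfig, RoboReader};

fn open_with_auth() -> Result<(), Box<dyn std::error::Error>> {
    // Route 1: explicit reader configuration.
    let config = ReaderConfig::default().with_http_bearer_token("s3cr3t");
    let _reader = RoboReader::open_with_config("https://example.com/log.mcap", config)?;

    // Route 2: credentials inline in the URL query string.
    let _reader = RoboReader::open("https://example.com/log.mcap?bearer_token=s3cr3t")?;
    Ok(())
}
```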
…ples

This commit addresses the top priority technical debt items identified in the codebase analysis:

## Dead Code Removal
- Removed unused `seek_future` field from S3Transport
- Removed unused `log_time()` method from ParsedMessage enum
- Removed associated test for deleted method
- Prefixed unused `_path` parameter with underscore

## Development Examples Fixed
- test_decode_debug.rs: Accept path via CLI arg or BAG_PATH env var
- test_bag_decode_small.rs: Same portable path handling
- test_bag_dump.rs: Same portable path handling
- test_decode_trace.rs: Same portable path handling

All examples now work on any system without hardcoded paths.

## Clippy Warnings Fixed (16 total)
- Collapsible if statements using let-chains (see the sketch after this message)
- Needless borrow in Context::from_waker
- Redundant guard in Poll pattern
- std::io::Error::other() simplification
- Derivable Default impl with #[default] attribute
- Iterator::last on DoubleEndedIterator optimization
- Renamed transport/transport.rs to core.rs to avoid module conflict
- Explicit auto-deref removal
- Added is_empty() method to Transport trait
- Simplified map_or expression

## Code Organization
- Renamed transport/transport.rs to transport/core.rs for clarity
- Updated all imports to use new module path

## Impact
- Removed ~360 lines of dead/unused code
- All 1949 tests passing
- Zero Clippy warnings with strict -D warnings
- Examples now portable across different systems

Related to: code quality cleanup initiative
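An illustrative shape of the "collapsible if via let-chains" fix (let-chains are available with edition 2024, which a later commit says the workspace uses); this is not the project's actual code.

```rust
fn handle(opt: Option<i32>) {
    // Before (collapsible nested ifs):
    // if let Some(x) = opt {
    //     if x > 0 {
    //         println!("positive: {x}");
    //     }
    // }
    // After, flattened with a let-chain:
    if let Some(x) = opt
        && x > 0
    {
        println!("positive: {x}");
    }
}
```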
This commit completes all remaining technical debt elimination tasks:

## Task 1: Remove Deprecated stream.rs (1636 lines deleted)
- Removed deprecated `src/io/formats/mcap/stream.rs`
- Updated imports in `src/io/s3/mod.rs` to use `streaming` module
- Updated tests to use new `StreamingParser` trait
- Added `StreamingMcapParser` type alias for backward compatibility
- Total net change: -1636 lines of code

## Task 2: Resolve TODO Comments (4 TODOs)
- tests/round_trip_tests.rs:505 - Updated comment to reflect that the RrdReader limitation is fundamental to the RRF2 format design, not implementation debt
- src/io/transport/http/writer.rs:430 - Added comprehensive documentation explaining why ChunkedEncoding falls back to SinglePut
- src/io/transport/http/writer.rs:453 - Created GitHub issue #54 for exponential backoff feature and updated TODO to reference it
- src/io/formats/bag/parallel.rs:59 - Created GitHub issue #55 for config-driven compression options

## Task 3: Audit and Document All Unsafe Blocks (14 blocks)
Added `# Safety` documentation to all unsafe blocks:

**Memory Mapping (9 blocks)**: `memmap2::Mmap::map`
- src/io/formats/mcap/two_pass.rs
- src/io/formats/mcap/parallel.rs (2 blocks)
- src/io/formats/mcap/sequential.rs
- src/io/formats/mcap/reader.rs (2 blocks)
- src/io/formats/rrd/parallel.rs (2 blocks)
- src/io/formats/bag/parser.rs

**Raw Pointer Access (3 blocks)**: Self-referential futures
- src/io/transport/core.rs (3 blocks in ReadFuture, ReadExactFuture)
- Fixed doc comment formatting (use // not /// on expressions)

**Pin::new_unchecked (1 block)**: Trait object pinning
- src/io/formats/mcap/transport_reader.rs

**Transmute (1 block)**: Lifetime extension
- src/io/formats/bag/sequential.rs

## Impact Summary
- **Net lines removed**: 1,713 deleted, 319 added = -1,394 lines
- **Deprecated code removed**: 1,636 lines from stream.rs
- **All TODOs resolved**: 4 TODOs either implemented or tracked as issues
- **All unsafe blocks documented**: 14 blocks with safety explanations
- **All 1,893 tests passing**

## Technical Debt Elimination Complete
All high and medium priority technical debt items have been addressed:
- ✅ Dead code and unused variables removed
- ✅ Development examples fixed (portable paths)
- ✅ All Clippy warnings resolved
- ✅ TODO comments resolved or tracked
- ✅ Deprecated code removed
- ✅ Unsafe blocks documented

Related to: code quality cleanup initiative
Issues: #54, #55
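A hedged example of the `# Safety` commenting style this commit adds, applied to the `memmap2::Mmap::map` call the commit lists; the surrounding file handling is illustrative.

```rust
use std::fs::File;

fn map_file(path: &str) -> std::io::Result<memmap2::Mmap> {
    let file = File::open(path)?;
    // SAFETY: the file is opened read-only and the map borrows no Rust
    // references; callers must not truncate or mutate the file while the
    // map is alive (doing so is undefined behavior).
    unsafe { memmap2::Mmap::map(&file) }
}
```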
## Task 1: Quick Fixes

### 1.1 Doc Link Fixes (2 links)
- src/io/transport/http/mod.rs: Fixed [`Transport`] link to use full path
- src/io/transport/memory/mod.rs: Fixed [`Transport`] link to use full path

### 1.2 URL Formatting Fixes (2 URLs)
- src/io/formats/rrd/constants.rs: Formatted URL as hyperlink
- src/io/formats/rrd/arrow_msg.rs: Formatted URL as hyperlink

### 1.3 CLI Test Helper Warnings
- tests/cli_tests.rs: Added #[allow(dead_code)] to helper functions; the functions are used in a conditionally compiled tests module

## Task 2: Refactor Tests to Use Public API
Updated test files to use RoboReader/RoboWriter instead of internal types:

### tests/rrd_roundtrip_test.rs
- Replaced RrdReader/RrdWriter with RoboReader/RoboWriter
- Fixed path handling for RoboReader::open (takes &str, not &String)
- All tests pass with public API

### tests/bag_rewriter_tests.rs
- Updated to use RoboReader::open for verification
- All tests pass with public API

### tests/s3_tests.rs
- Updated golden_tests to use RoboReader instead of McapReader/SequentialBagReader
- All tests pass with public API

### tests/two_pass_mcap_tests.rs
- Added test_public_api_robo_reader() to verify public API works
- Original tests remain for internal TwoPassMcapReader implementation

### tests/test_mcap_stream.rs
- Added test_public_api_robo_reader_mcap() to verify public API works
- Original tests remain for internal StreamingMcapParser implementation

## Verification
- All tests passing (1893 tests)
- Zero Clippy warnings
- Zero doc warnings
- Public API contract now tested end-to-end

Related to: code quality cleanup initiative
This commit removes dead code and unused abstractions identified through
comprehensive analysis of public API call chains.
## Dead Code Removed (~1,330 lines)
### Deleted Files (5 modules, ~1,200 lines)
1. **src/encoding/registry.rs** (280 lines)
- CodecRegistry struct (unused, only tests)
- CodecProviderFactory trait (unused)
- Codec trait (registry version, unused)
- global_registry() function (unused)
- Entire registry pattern never used in production code
2. **src/schema/descriptor.rs** (133 lines)
- SchemaDescriptor trait (no external consumers)
- FieldInfo struct (only test usage)
3. **src/io/formats/mcap/adaptive.rs** (268 lines)
- AdaptiveMcapReader enum (never re-exported or used)
- ReadStrategy enum (only used within module)
4. **src/io/s3/async_source.rs** (227 lines)
- S3StreamConfig struct (never referenced)
- S3ByteSource struct (never referenced)
### Code Removed from Existing Files (~130 lines)
5. **src/io/traits.rs** - Removed unused traits:
- RawMessageStream trait (no trait bound usage)
- DecodedMessageStream trait (no trait bound usage)
- FormatReaderBuilder trait (no implementations)
- FormatWriterBuilder trait (no implementations)
6. **src/io/transport/mod.rs** - Removed unused sync transport:
- ByteStream trait (superseded by async Transport)
- ByteStreamExt trait (only used by ByteStream)
- ChunkIterator struct (only used by ByteStreamExt)
7. **src/encoding/codec.rs** - Removed MessageCodec trait:
- Only DynCodec is used in production
- No implementations of MessageCodec exist
8. **src/encoding/transform.rs** - Removed TransformResult:
- Only used in tests, not production code
9. **src/encoding/mod.rs** - Updated exports:
- Removed registry module
- Removed MessageCodec, TransformResult re-exports
10. **src/schema/mod.rs** - Updated exports:
- Removed descriptor module
- Removed SchemaDescriptor, FieldInfo re-exports
## Analysis Performed
- Traced all public API call chains (RoboReader, RoboWriter, RoboRewriter)
- Identified code never reached from public API
- Verified each removal against test suite
- Checked for external usage via re-exports
## Impact
- **~1,330 lines of dead code removed**
- Cleaner, more maintainable codebase
- Zero breaking changes to public API
- All 80 tests passing
Related to: code quality cleanup initiative
This commit removes technical debt identified through systematic analysis of each package/module in the codebase.

## Dead Code Removed (~315 lines)

### CRITICAL
1. **src/core/registry.rs** - Removed duplicate `Encoding` enum (144 lines)
   - The canonical version is in `core/mod.rs` (exported via lib.rs)
   - Removed duplicate enum definition and tests
   - Eliminates confusion and maintenance burden

### HIGH
2. **src/io/detection.rs** - Removed dead format detection helpers (76 lines)
   - Removed `FormatDetector` trait (never used)
   - Removed `DefaultFormatDetector` struct (never used)
   - Removed `is_mcap_file()`, `is_bag_file()`, `is_rrd_file()` functions
   - All were marked with `#[allow(dead_code)]` and never called
3. **src/io/mod.rs** - Removed commented-out code (1 line)
   - Removed `// pub mod channel_iterator;` comment
   - Cleaned up artifact from removed module

### MEDIUM
4. **src/io/formats/mcap/writer.rs** - Removed unused code (41 lines)
   - Removed unused `COMPRESSION_NONE` and `COMPRESSION_LZ4` constants
   - Removed unused `write_summary_offsets()` function
   - Removed unused `write_summary_offset_for()` function
   - Removed unused import `OP_SUMMARY_OFFSET`
5. **src/io/formats/bag/writer.rs** - Fixed incorrect dead code annotation (3 lines)
   - Removed incorrect `#[allow(dead_code)]` from `path` field
   - The field is actually used by the `FormatWriter` trait
6. **src/encoding/protobuf/codec.rs** - Removed unused method (26 lines)
   - Removed `get_descriptor_by_name()` method
   - Marked with `#[allow(dead_code)]` and never called
7. **src/cli/output.rs** - Removed unused terminal functions (26 lines)
   - Removed `is_stdout_terminal()` function
   - Removed `is_stderr_terminal()` function
   - Removed associated tests for these functions
   - Removed unused `std::io::IsTerminal` import

## Verification
- All 80 unit tests pass
- All doctests pass
- Zero Clippy warnings (`cargo clippy --all-features -- -D warnings`)
- Build succeeds with all features

## Analysis Coverage
This cleanup was the result of systematic analysis of:
- src/core (9 files)
- src/io (47 files)
- src/encoding (19 files)
- src/schema (14 files)
- src/transform (9 files)
- src/cli (8 files)
- src/types (4 files)
- src/rewriter (6 files)

Related to: code quality cleanup initiative
The `s3` feature gate was misleading, as it also controlled HTTP/HTTPS support. Renamed it to `remote` to better reflect its broader scope.

Changes:
- Renamed feature from `s3` to `remote` in Cargo.toml
- Updated all #[cfg(feature = "s3")] to #[cfg(feature = "remote")]
- Gated streaming and transport modules appropriately
- Updated ARCHITECTURE.md to document remote storage architecture
- Added HTTP authentication examples to documentation

The `remote` feature now correctly gates:
- S3 transport (AWS S3, MinIO, R2)
- HTTP/HTTPS transport with authentication
- Streaming parser interfaces
- Memory transport for testing

All 1,856+ tests pass.
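A hedged illustration of the rename: code previously gated on the old `s3` feature now compiles under `remote` instead (the module name below is illustrative).

```rust
// Before: #[cfg(feature = "s3")]
// After:
#[cfg(feature = "remote")]
pub mod transport; // S3, HTTP/HTTPS, and memory transports live here
```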
The examples/ directory previously contained test-like utilities that:
- Used internal APIs (BagFormat, McapFormat directly)
- Had hardcoded paths like /tmp/leju_bag.mcap
- Were named with a test_ prefix, indicating they were tests, not examples

These have been replaced with proper examples demonstrating the public API (RoboReader, RoboWriter, RoboRewriter):
- read_file.rs: Basic file inspection and metadata display
- convert_format.rs: Format conversion between MCAP and ROS1 bag
- decode_messages.rs: Message decoding with timestamps
- README.md: Documentation for all examples

The old test utilities were moved to scripts/ and then deleted, as they were no longer needed for development.
- Add #[must_use] to CodecError constructor methods
- Add #[must_use] to CodecValue type checking and conversion methods
- Add #[must_use] to PrimitiveType methods
- Add #[must_use] to DecodedMessageResult, ChannelInfo, RawMessage, and FileInfo methods
- Add #[must_use] to FileFormat methods
- Add #[must_use] to FormatReader::file_info and FormatReader::duration
- Add #[must_use] to ParallelReaderConfig and MessageChunkData methods
- Add #[must_use] to HttpAuthConfig methods
- Add #[must_use] to ReaderConfig and ReaderConfigBuilder builder methods
- Add #[must_use] to WriterConfig and WriterConfigBuilder builder methods

This addresses clippy::must_use_candidate warnings for core public API types. The changes ensure important return values are not accidentally ignored.

Progress: Reduced warnings from 380 to 291 (89 warnings fixed)
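A hedged illustration of the pattern; `ReaderConfigBuilder` is named in the commit, but this simplified shape is a stand-in, not the real type.

```rust
pub struct ReaderConfigBuilder {
    buffer_size: usize,
}

impl ReaderConfigBuilder {
    /// Builder setters return `Self` by value; dropping the result is
    /// almost always a bug, which #[must_use] now turns into a warning.
    #[must_use]
    pub fn buffer_size(mut self, bytes: usize) -> Self {
        self.buffer_size = bytes;
        self
    }
}
```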
- Add #[must_use] to RewriteOptions::with_transforms
- Add #[must_use] to RewriteOptions::has_transforms
- Add #[must_use] to RewriteStats::new

Progress: Total 92 must_use warnings fixed (380 -> 288 remaining)

The remaining warnings are primarily in:
- Internal format-specific implementations (mcap, bag, rrd)
- Transport layer implementations (S3, HTTP)
- Transform pipeline implementations
- Encoding layer internals

These were prioritized lower, as they are not part of the core public API.
Reduced clippy warnings from 1,618 to 25 (98.5% reduction).

Code Quality:
- Added #[must_use] to 300+ public/internal methods
- Eliminated ~90 lines of code duplication in DecodedMessageIter
- Fixed similar variable names (decoder vs decoded)
- Added # Errors sections to 31+ Result-returning functions
- Refactored downcasting to trait-based approach (DecodedMessageIterator trait)

Testing Infrastructure:
- Added Criterion benchmark suite (reader_bench, decoder_bench, rewriter_bench, large_file_bench)
- Added 93 property-based tests with proptest
- Added fuzzing infrastructure with 5 targets (mcap, bag, rrd, cdr, schema parsers)

Architecture:
- Separated CLI into robocodec-cli workspace member
- Library no longer contains CLI dependencies (clap, indicatif, human-size)
- Converted to Cargo workspace with edition 2024

Documentation:
- Added comprehensive examples to public API (FormatReader, FormatWriter, TransformBuilder)
- Added TECHNICAL_DEBT.md analysis and remediation plan
- Added FUZZING.md documentation

All 1,856 tests pass.
- Remove all HTTP/HTTPS read/write support (transport layer, auth configs)
- Replace .unwrap() with .expect() in production code for better error messages
- Move CLI tests from library to CLI crate
- Fix writer builder test to use create() instead of build()
- Remove unused WriteStrategy::resolve() method
Add example showing how to read directly from S3-compatible storage using RoboReader::open() with s3:// URLs.
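A hedged sketch of what such an example likely looks like; `RoboReader::open` with an s3:// URL is exactly what the commit describes, while the crate-root import path, bucket, and key are illustrative.

```rust
use robocodec::RoboReader;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Opens an object on S3-compatible storage the same way as a local path.
    let _reader = RoboReader::open("s3://my-bucket/recordings/run42.mcap")?;
    // From here, inspect metadata or stream messages as with a local file.
    Ok(())
}
```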
- Replace --features cli with --package robocodec-cli
- Replace --features s3 with --features remote (the actual feature name)
- Fix coverage build to use the workspace with the remote feature
The fixture_path function now handles both scenarios:
- When run from the workspace root: uses workspace-root/tests/fixtures
- When run from the CLI crate: looks in the parent directory for fixtures

This fixes test_inspect_multiple_formats failing in CI when run with workspace-level coverage commands.
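A hedged sketch of the dual-location lookup described above; the actual helper in tests/cli_tests.rs may differ in detail.

```rust
use std::path::PathBuf;

fn fixture_path(name: &str) -> PathBuf {
    // When run from the workspace root, fixtures sit under tests/fixtures;
    // when run from the CLI crate, they sit one directory up.
    let local = PathBuf::from("tests/fixtures").join(name);
    if local.exists() {
        local
    } else {
        PathBuf::from("../tests/fixtures").join(name)
    }
}
```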
Description
Integrate Foxglove's `mcap` crate for MCAP record parsing while keeping robocodec's custom Rayon-based parallel decompression for 6-8x performance on local files.

Type of Change
Related Issues
Related to #52 (enhancement for S3MessageStream metadata)
Changes Made
- Add `McapS3Adapter` (src/io/formats/mcap/s3_adapter.rs, ~460 lines): wraps `mcap::LinearReader` for event-driven MCAP parsing
- Update `S3Reader` (src/io/s3/reader.rs): replaces `StreamingMcapParser` with `McapS3Adapter`
- Deprecate `StreamingMcapParser` (src/io/formats/mcap/stream.rs): directs users to `McapS3Adapter`
- Update module exports (src/io/formats/mcap/mod.rs): makes `s3_adapter` crate-private

Reduces custom MCAP parsing code from ~1600 lines to ~460 lines.
Testing
Test Commands Used:
cargo test
cargo clippy --all-features -- -D warnings
cargo fmt --check

Checklist
Additional Notes
Hybrid Architecture
This PR uses a hybrid approach:
- `mcap::LinearReader` for robust event-driven record parsing
- robocodec's custom Rayon-based parallel decompression, kept for 6-8x performance on local files

Silent Failure Fixes
Three locations where parse errors were silently discarded now log explicitly:
- scan_mcap_for_metadata() - logs MCAP parse errors during channel discovery
- parse_mcap_header() - logs MCAP parse errors during header parsing
- S3MessageStream::parse_chunk() - logs MCAP parse errors during streaming

All use tracing::warn! with context (location, offset, error details).
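A hedged sketch of that warn-with-context pattern; the `tracing::warn!` usage is the standard macro, while the helper shape and field names are illustrative, not the PR's actual code.

```rust
use std::fmt::Display;

// Log an MCAP parse error with the context fields the PR describes.
fn log_parse_error(location: &str, offset: u64, err: impl Display) {
    tracing::warn!(
        location,      // which parser hit the error
        offset,        // byte offset within the object/file
        error = %err,  // the underlying MCAP parse error
        "MCAP parse error; record skipped"
    );
}
```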