chore: update metrics collection #1441
Conversation
Walkthrough

Removed provider-specific labels from object-store metrics, added bytes-scanned counters, and made multipart uploads fail-fast with per-part post-success metric increments across storage backends and the metrics registry. Public metric helper signatures changed accordingly.
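The helper names below appear in this review's code-graph analysis; everything else (metric names, label set, prometheus/once_cell types, registration details) is an assumption. A minimal sketch of what providerless, date-labeled counters could look like:

```rust
// Sketch only: helper names are from this review; metric names, the
// ("method", "date") label set, and the prometheus types are assumptions.
// Registration with the metrics registry is omitted.
use once_cell::sync::Lazy;
use prometheus::{opts, IntCounterVec};

static OBJECT_STORE_CALLS: Lazy<IntCounterVec> = Lazy::new(|| {
    IntCounterVec::new(
        opts!("object_store_calls_by_date", "Object store calls by method and date"),
        &["method", "date"], // provider label intentionally removed
    )
    .expect("valid metric definition")
});

static OBJECT_STORE_BYTES_SCANNED: Lazy<IntCounterVec> = Lazy::new(|| {
    IntCounterVec::new(
        opts!("object_store_bytes_scanned_by_date", "Bytes scanned by method and date"),
        &["method", "date"],
    )
    .expect("valid metric definition")
});

pub fn increment_object_store_calls_by_date(method: &str, date: &str) {
    OBJECT_STORE_CALLS.with_label_values(&[method, date]).inc();
}

pub fn increment_bytes_scanned_in_object_store_calls_by_date(bytes: u64, method: &str, date: &str) {
    OBJECT_STORE_BYTES_SCANNED
        .with_label_values(&[method, date])
        .inc_by(bytes);
}
```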
Sequence Diagram(s)

```mermaid
sequenceDiagram
autonumber
participant Client
participant Service as AppStorage
participant Provider as ObjectStore
note over Service,Provider: Multipart upload now fails fast and logs per-part metrics after success
Client->>Service: upload_multipart(parts)
Service->>Provider: initiate_multipart()
Provider-->>Service: upload_id
loop for each part
Service->>Provider: put_part(upload_id, partN)
alt part fails
Provider-->>Service: Error
Service-->>Client: Return Error (early) %% early return on part failure
else part succeeds
Provider-->>Service: OK
rect rgb(220,240,255)
Service->>Service: increment_object_store_calls_by_date("PUT_MULTIPART", date) %% post-success metric
end
end
end
Service->>Provider: complete_multipart(upload_id)
Provider-->>Service: OK
Service-->>Client: Success
```

Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~45 minutes
Pre-merge checks and finishing touches
❌ Failed checks (2 warnings)
✅ Passed checks (1 passed)
Actionable comments posted: 0
🧹 Nitpick comments (6)
src/storage/gcs.rs (2)
384-386: Use `?` / `if let` instead of `is_err()` + `unwrap()`

This is unnecessarily verbose and risks accidental `unwrap()`. Propagate the error directly.

```diff
-        let result = async_writer.put_part(part_data.into()).await;
-        if result.is_err() {
-            return Err(result.err().unwrap().into());
-        }
+        async_writer
+            .put_part(part_data.into())
+            .await
+            .map_err(Into::into)?;
```
325-332: Defer multipart session creation to the large-file branch

Creating the multipart writer before the size check can start/allocate a session that's never used for small files. Initialize it only when needed.

```diff
-    let async_writer = self.client.put_multipart(location).await;
-    let mut async_writer = match async_writer {
-        Ok(writer) => writer,
-        Err(err) => {
-            return Err(err.into());
-        }
-    };
+    // Create multipart writer only for large files (see below)
```

Then inside the `else` branch, at its beginning:

```diff
 } else {
+    let mut async_writer = self
+        .client
+        .put_multipart(location)
+        .await
+        .map_err(Into::into)?;
     let mut data = Vec::new();
     file.read_to_end(&mut data).await?;
```

src/storage/azure_blob.rs (2)
442-444: Use `?` / `if let` instead of `is_err()` + `unwrap()`

Avoid manual error probing; bubble it up cleanly.

```diff
-        let result = async_writer.put_part(part_data.into()).await;
-        if result.is_err() {
-            return Err(result.err().unwrap().into());
-        }
+        async_writer
+            .put_part(part_data.into())
+            .await
+            .map_err(Into::into)?;
```
381-388: Postpone `put_multipart` until confirmed large upload

Starting a multipart upload before checking file size can create an unused session for small files.

```diff
-    let async_writer = self.client.put_multipart(location).await;
-    let mut async_writer = match async_writer {
-        Ok(writer) => writer,
-        Err(err) => {
-            return Err(err.into());
-        }
-    };
+    // Defer creating multipart writer until we know it's a large upload
```

Inside the `else` branch:

```diff
 } else {
+    let mut async_writer = self
+        .client
+        .put_multipart(location)
+        .await
+        .map_err(Into::into)?;
     let mut data = Vec::new();
     file.read_to_end(&mut data).await?;
```

src/storage/s3.rs (2)
549-551: Prefer `?` over manual error check + `unwrap()`

Cleaner and safer error propagation.

```diff
-        // Track individual part upload
-        let result = async_writer.put_part(part_data.into()).await;
-        if result.is_err() {
-            return Err(result.err().unwrap().into());
-        }
+        // Track individual part upload
+        async_writer
+            .put_part(part_data.into())
+            .await
+            .map_err(Into::into)?;
```
489-496: Initialize multipart writer only for large files

Avoid creating a multipart upload session when the file will be uploaded via single PUT.

```diff
-    let async_writer = self.client.put_multipart(location).await;
-    let mut async_writer = match async_writer {
-        Ok(writer) => writer,
-        Err(err) => {
-            return Err(err.into());
-        }
-    };
+    // Defer multipart writer creation; see large-file branch below
```

Inside the `else` branch:

```diff
 } else {
+    let mut async_writer = self
+        .client
+        .put_multipart(location)
+        .await
+        .map_err(Into::into)?;
     let mut data = Vec::new();
     file.read_to_end(&mut data).await?;
```
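Taken together with the walkthrough's fail-fast requirement, the large-file branch could end up shaped roughly like the sketch below. This is a sketch only, not the PR's actual code: the chunk size, date formatting, error type, and the `complete()` call are assumptions; the `put_multipart`/`put_part` call shapes mirror the diffs above, and the metric helper is the one named elsewhere in this review.

```rust
// Sketch of a fail-fast multipart loop with per-part, post-success metrics.
// Assumed: an object_store-style MultipartUpload, a 5 MiB chunk size, and the
// increment_object_store_calls_by_date helper referenced in this review.
use chrono::Utc;
use object_store::{path::Path, ObjectStore};

async fn upload_large_file(
    client: &dyn ObjectStore,
    location: &Path,
    data: &[u8],
) -> Result<(), object_store::Error> {
    let mut async_writer = client.put_multipart(location).await?;
    let date = Utc::now().date_naive().to_string();

    for part_data in data.chunks(5 * 1024 * 1024) {
        // Fail fast: `?` returns immediately on a failed part,
        // so no metric is recorded for it.
        async_writer.put_part(part_data.to_vec().into()).await?;
        // Per-part metric, incremented only after the part succeeded.
        increment_object_store_calls_by_date("PUT_MULTIPART", &date);
    }

    // Finalize the upload (assumed completion call).
    async_writer.complete().await?;
    Ok(())
}
```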
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (3)
- src/storage/azure_blob.rs (1 hunks)
- src/storage/gcs.rs (1 hunks)
- src/storage/s3.rs (1 hunks)
🧰 Additional context used
🧠 Learnings (2)
📚 Learning: 2025-09-18T09:52:07.554Z
Learnt from: nikhilsinhaparseable
PR: parseablehq/parseable#1415
File: src/storage/object_storage.rs:173-177
Timestamp: 2025-09-18T09:52:07.554Z
Learning: In Parseable's upload system (src/storage/object_storage.rs), the update_storage_metrics function can safely use path.metadata().map_err() to fail on local file metadata read failures because parquet validation (validate_uploaded_parquet_file) ensures file integrity before this step, and the system guarantees local staging files remain accessible throughout the upload flow.
Applied to files:
src/storage/azure_blob.rs
src/storage/gcs.rs
src/storage/s3.rs
📚 Learning: 2025-08-18T14:56:18.463Z
Learnt from: nikhilsinhaparseable
PR: parseablehq/parseable#1405
File: src/storage/object_storage.rs:997-1040
Timestamp: 2025-08-18T14:56:18.463Z
Learning: In Parseable's staging upload system (src/storage/object_storage.rs), failed parquet file uploads should remain in the staging directory for retry in the next sync cycle, while successful uploads remove their staged files immediately. Early return on first error in collect_upload_results is correct behavior as concurrent tasks handle their own cleanup and failed files need to stay for retry.
Applied to files:
src/storage/azure_blob.rs
src/storage/gcs.rs
src/storage/s3.rs
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (10)
- GitHub Check: coverage
- GitHub Check: Quest Smoke and Load Tests for Distributed deployments
- GitHub Check: Quest Smoke and Load Tests for Standalone deployments
- GitHub Check: Build Kafka x86_64-unknown-linux-gnu
- GitHub Check: Build Default x86_64-unknown-linux-gnu
- GitHub Check: Build Default aarch64-apple-darwin
- GitHub Check: Build Default x86_64-pc-windows-msvc
- GitHub Check: Build Default aarch64-unknown-linux-gnu
- GitHub Check: Build Default x86_64-apple-darwin
- GitHub Check: Build Kafka aarch64-apple-darwin
74f6703
Actionable comments posted: 3
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (5)
- src/metrics/mod.rs (5 hunks)
- src/storage/azure_blob.rs (20 hunks)
- src/storage/gcs.rs (21 hunks)
- src/storage/localfs.rs (14 hunks)
- src/storage/s3.rs (21 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
- src/storage/s3.rs
🧰 Additional context used
🧠 Learnings (7)
📚 Learning: 2025-08-25T01:31:41.786Z
Learnt from: nikhilsinhaparseable
PR: parseablehq/parseable#1415
File: src/metadata.rs:63-68
Timestamp: 2025-08-25T01:31:41.786Z
Learning: The TOTAL_EVENTS_INGESTED_DATE, TOTAL_EVENTS_INGESTED_SIZE_DATE, and TOTAL_EVENTS_STORAGE_SIZE_DATE metrics in src/metadata.rs and src/storage/object_storage.rs are designed to track total events across all streams, not per-stream. They use labels [origin, parsed_date] to aggregate by format and date, while per-stream metrics use [stream_name, origin, parsed_date] labels.
Applied to files:
src/storage/azure_blob.rs
src/storage/localfs.rs
src/metrics/mod.rs
src/storage/gcs.rs
📚 Learning: 2025-08-25T01:32:25.980Z
Learnt from: nikhilsinhaparseable
PR: parseablehq/parseable#1415
File: src/metrics/mod.rs:163-173
Timestamp: 2025-08-25T01:32:25.980Z
Learning: The TOTAL_EVENTS_INGESTED_DATE, TOTAL_EVENTS_INGESTED_SIZE_DATE, and TOTAL_EVENTS_STORAGE_SIZE_DATE metrics in src/metrics/mod.rs are intentionally designed to track global totals across all streams for a given date, using labels ["format", "date"] rather than per-stream labels. This is the correct design for global aggregation purposes.
Applied to files:
src/storage/azure_blob.rs
src/storage/localfs.rs
src/metrics/mod.rs
src/storage/gcs.rs
📚 Learning: 2025-09-23T10:08:36.368Z
Learnt from: nikhilsinhaparseable
PR: parseablehq/parseable#1415
File: src/storage/localfs.rs:41-43
Timestamp: 2025-09-23T10:08:36.368Z
Learning: In src/storage/localfs.rs, the metrics use "localfs" as the provider label (not "drive" from FSConfig.name()) because it aligns with the object storage implementation naming for the file system. This is intentional and should not be flagged as an inconsistency.
Applied to files:
src/storage/azure_blob.rs
src/storage/localfs.rs
📚 Learning: 2025-08-18T14:56:18.463Z
Learnt from: nikhilsinhaparseable
PR: parseablehq/parseable#1405
File: src/storage/object_storage.rs:997-1040
Timestamp: 2025-08-18T14:56:18.463Z
Learning: In Parseable's staging upload system (src/storage/object_storage.rs), failed parquet file uploads should remain in the staging directory for retry in the next sync cycle, while successful uploads remove their staged files immediately. Early return on first error in collect_upload_results is correct behavior as concurrent tasks handle their own cleanup and failed files need to stay for retry.
Applied to files:
src/storage/azure_blob.rs
src/storage/gcs.rs
📚 Learning: 2025-09-18T09:52:07.554Z
Learnt from: nikhilsinhaparseable
PR: parseablehq/parseable#1415
File: src/storage/object_storage.rs:173-177
Timestamp: 2025-09-18T09:52:07.554Z
Learning: In Parseable's upload system (src/storage/object_storage.rs), the update_storage_metrics function can safely use path.metadata().map_err() to fail on local file metadata read failures because parquet validation (validate_uploaded_parquet_file) ensures file integrity before this step, and the system guarantees local staging files remain accessible throughout the upload flow.
Applied to files:
src/storage/azure_blob.rs
src/storage/localfs.rs
📚 Learning: 2025-09-11T06:35:24.721Z
Learnt from: parmesant
PR: parseablehq/parseable#1424
File: src/storage/azure_blob.rs:736-742
Timestamp: 2025-09-11T06:35:24.721Z
Learning: The Azure Blob Storage client's `list_with_delimiter()` method handles Azure-specific implementation details internally, including proper root listing behavior and path normalization, so manual prefix handling is not needed when delegating to this method.
Applied to files:
src/storage/azure_blob.rs
📚 Learning: 2025-08-21T14:41:55.462Z
Learnt from: nikhilsinhaparseable
PR: parseablehq/parseable#1410
File: src/storage/object_storage.rs:876-916
Timestamp: 2025-08-21T14:41:55.462Z
Learning: In Parseable's object storage system (src/storage/object_storage.rs), date directories (date=YYYY-MM-DD) are only created when there's actual data to store, which means they will always contain corresponding hour and minute subdirectories. There can be no case where a date directory exists without hour or minute subdirectories.
Applied to files:
src/storage/localfs.rs
🧬 Code graph analysis (4)
src/storage/azure_blob.rs (1)
  src/metrics/mod.rs (3)
  - increment_bytes_scanned_in_object_store_calls_by_date (551-555)
  - increment_files_scanned_in_object_store_calls_by_date (545-549)
  - increment_object_store_calls_by_date (539-543)
src/storage/localfs.rs (1)
  src/metrics/mod.rs (1)
  - increment_object_store_calls_by_date (539-543)
src/metrics/mod.rs (1)
  src/storage/metrics_layer.rs (1)
  - new (71-76)
src/storage/gcs.rs (1)
  src/metrics/mod.rs (3)
  - increment_bytes_scanned_in_object_store_calls_by_date (551-555)
  - increment_files_scanned_in_object_store_calls_by_date (545-549)
  - increment_object_store_calls_by_date (539-543)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (10)
- GitHub Check: Quest Smoke and Load Tests for Standalone deployments
- GitHub Check: Quest Smoke and Load Tests for Distributed deployments
- GitHub Check: Build Default x86_64-apple-darwin
- GitHub Check: Build Default x86_64-pc-windows-msvc
- GitHub Check: Build Default aarch64-apple-darwin
- GitHub Check: Build Default x86_64-unknown-linux-gnu
- GitHub Check: Build Default aarch64-unknown-linux-gnu
- GitHub Check: coverage
- GitHub Check: Build Kafka aarch64-apple-darwin
- GitHub Check: Build Kafka x86_64-unknown-linux-gnu
🔇 Additional comments (14)
src/storage/localfs.rs (14)
171-171: Align metric call with providerless counter.
This change lines up LocalFS with the updated increment_object_store_calls_by_date(method, date) signature and still scopes the GET increment to successful reads.

224-224: LIST metric call updated correctly.
Confirmed that the ingestor metadata listing now emits the provider-agnostic LIST metric only on success, matching the refactored metrics API.

294-294: LIST aggregation remains consistent.
The post-scan LIST increment now uses the new two-argument helper and continues to fire only once per directory traversal.

318-318: PUT metric scoped to successful writes.
Good to see the PUT counter updated to the providerless form while still gated on a successful fs::write.

329-329: DELETE prefix metric retains success guard.
The delete-prefix path now calls the provider-agnostic DELETE metric and skips emission on errors as before.

346-346: DELETE object call uses new signature.
The single-file delete continues to double-count file/operation metrics correctly with the revised helper.

356-356: HEAD/check path matches metrics refactor.
check() now reports via the updated HEAD metric signature, preserving the previous success-only behavior.

367-367: Stream deletion metric keeps parity.
The stream cleanup still increments DELETE only on success, now through the provider-free API.

378-378: Node meta deletion aligned.
try_delete_node_meta now emits the updated DELETE metric variant while retaining its existing error handling.

396-396: Stream listing remains tracked.
list_streams switches to the method/date-only counter, maintaining the same call timing as before.

428-428: Old stream listing metric refresh.
The old-streams path follows the same providerless LIST metric approach, keeping metrics consistent across both list variants.

453-453: Directory listing metric signature updated.
list_dirs now records via the modern LIST helper without altering the control flow.

513-513: Date listing metric conforms to new API.
The date enumeration now hits the two-parameter LIST counter, preserving the success-only emission.

580-580: PUT metric on upload_file modernized.
upload_file reports PUT via the updated helper and remains gated on successful copies, matching the broader metrics refactor.
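The comments above all describe the same success-gated pattern. A minimal sketch of what it could look like for a LocalFS-style PUT, where the function shape, path handling, and error type are assumptions and only the helper name and its (method, date) signature come from this review:

```rust
// Sketch of a success-gated PUT metric in a LocalFS-style backend.
// The function signature and tokio::fs usage are assumptions; the metric
// helper is the one referenced throughout this review.
use chrono::Utc;
use std::path::Path;

async fn put_object(path: &Path, payload: &[u8]) -> std::io::Result<()> {
    let result = tokio::fs::write(path, payload).await;
    if result.is_ok() {
        // Incremented only when the write succeeded, with no provider label.
        increment_object_store_calls_by_date("PUT", &Utc::now().date_naive().to_string());
    }
    result
}
```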
Actionable comments posted: 11
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (4)
src/storage/s3.rs (1)
634-634: Ensure LIST calls are counted even on errors in get_objects.

Increment the LIST "calls" counter when the stream is created, not only at the end; otherwise failed streams return early and the call is never counted.

Apply this diff:

```diff
     let mut list_stream = self.client.list(Some(&prefix));
+    increment_object_store_calls_by_date("LIST", &Utc::now().date_naive().to_string());
@@
-    increment_object_store_calls_by_date("LIST", &Utc::now().date_naive().to_string());
```

Also applies to: 669-669
src/storage/azure_blob.rs (2)
489-489: Count LIST calls in get_objects even when the stream errors.

Apply this diff:

```diff
     let mut list_stream = self.client.list(Some(&prefix));
+    increment_object_store_calls_by_date("LIST", &Utc::now().date_naive().to_string());
@@
-    increment_object_store_calls_by_date("LIST", &Utc::now().date_naive().to_string());
```

Also applies to: 525-525
643-651: Record LIST call before `?` in list_old_streams so failures are counted.

Apply this diff:

```diff
-    let resp = self.client.list_with_delimiter(None).await?;
+    let resp = self.client.list_with_delimiter(None).await;
+    increment_object_store_calls_by_date("LIST", &Utc::now().date_naive().to_string());
+    let resp = resp?;
@@
-    increment_object_store_calls_by_date("LIST", &Utc::now().date_naive().to_string());
```

src/storage/gcs.rs (1)
470-470: Count LIST calls in get_objects even on stream errors.

Apply this diff:

```diff
     let mut list_stream = self.client.list(Some(&prefix));
+    increment_object_store_calls_by_date("LIST", &Utc::now().date_naive().to_string());
@@
-    increment_object_store_calls_by_date("LIST", &Utc::now().date_naive().to_string());
```

Also applies to: 506-506
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (4)
- src/metrics/mod.rs (4 hunks)
- src/storage/azure_blob.rs (20 hunks)
- src/storage/gcs.rs (21 hunks)
- src/storage/s3.rs (21 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
- src/metrics/mod.rs
🧰 Additional context used
🧠 Learnings (5)
📚 Learning: 2025-08-25T01:31:41.786Z
Learnt from: nikhilsinhaparseable
PR: parseablehq/parseable#1415
File: src/metadata.rs:63-68
Timestamp: 2025-08-25T01:31:41.786Z
Learning: The TOTAL_EVENTS_INGESTED_DATE, TOTAL_EVENTS_INGESTED_SIZE_DATE, and TOTAL_EVENTS_STORAGE_SIZE_DATE metrics in src/metadata.rs and src/storage/object_storage.rs are designed to track total events across all streams, not per-stream. They use labels [origin, parsed_date] to aggregate by format and date, while per-stream metrics use [stream_name, origin, parsed_date] labels.
Applied to files:
src/storage/gcs.rs
src/storage/s3.rs
src/storage/azure_blob.rs
📚 Learning: 2025-08-25T01:32:25.980Z
Learnt from: nikhilsinhaparseable
PR: parseablehq/parseable#1415
File: src/metrics/mod.rs:163-173
Timestamp: 2025-08-25T01:32:25.980Z
Learning: The TOTAL_EVENTS_INGESTED_DATE, TOTAL_EVENTS_INGESTED_SIZE_DATE, and TOTAL_EVENTS_STORAGE_SIZE_DATE metrics in src/metrics/mod.rs are intentionally designed to track global totals across all streams for a given date, using labels ["format", "date"] rather than per-stream labels. This is the correct design for global aggregation purposes.
Applied to files:
src/storage/gcs.rs
src/storage/s3.rs
src/storage/azure_blob.rs
📚 Learning: 2025-08-18T14:56:18.463Z
Learnt from: nikhilsinhaparseable
PR: parseablehq/parseable#1405
File: src/storage/object_storage.rs:997-1040
Timestamp: 2025-08-18T14:56:18.463Z
Learning: In Parseable's staging upload system (src/storage/object_storage.rs), failed parquet file uploads should remain in the staging directory for retry in the next sync cycle, while successful uploads remove their staged files immediately. Early return on first error in collect_upload_results is correct behavior as concurrent tasks handle their own cleanup and failed files need to stay for retry.
Applied to files:
src/storage/gcs.rs
src/storage/s3.rs
src/storage/azure_blob.rs
📚 Learning: 2025-09-18T09:52:07.554Z
Learnt from: nikhilsinhaparseable
PR: parseablehq/parseable#1415
File: src/storage/object_storage.rs:173-177
Timestamp: 2025-09-18T09:52:07.554Z
Learning: In Parseable's upload system (src/storage/object_storage.rs), the update_storage_metrics function can safely use path.metadata().map_err() to fail on local file metadata read failures because parquet validation (validate_uploaded_parquet_file) ensures file integrity before this step, and the system guarantees local staging files remain accessible throughout the upload flow.
Applied to files:
src/storage/s3.rs
src/storage/azure_blob.rs
📚 Learning: 2025-09-11T06:35:24.721Z
Learnt from: parmesant
PR: parseablehq/parseable#1424
File: src/storage/azure_blob.rs:736-742
Timestamp: 2025-09-11T06:35:24.721Z
Learning: The Azure Blob Storage client's `list_with_delimiter()` method handles Azure-specific implementation details internally, including proper root listing behavior and path normalization, so manual prefix handling is not needed when delegating to this method.
Applied to files:
src/storage/s3.rs
src/storage/azure_blob.rs
🧬 Code graph analysis (3)
src/storage/gcs.rs (1)
  src/metrics/mod.rs (3)
  - increment_bytes_scanned_in_object_store_calls_by_date (551-555)
  - increment_files_scanned_in_object_store_calls_by_date (545-549)
  - increment_object_store_calls_by_date (539-543)
src/storage/s3.rs (1)
  src/metrics/mod.rs (3)
  - increment_bytes_scanned_in_object_store_calls_by_date (551-555)
  - increment_files_scanned_in_object_store_calls_by_date (545-549)
  - increment_object_store_calls_by_date (539-543)
src/storage/azure_blob.rs (1)
  src/metrics/mod.rs (3)
  - increment_bytes_scanned_in_object_store_calls_by_date (551-555)
  - increment_files_scanned_in_object_store_calls_by_date (545-549)
  - increment_object_store_calls_by_date (539-543)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (10)
- GitHub Check: Build Kafka aarch64-apple-darwin
- GitHub Check: coverage
- GitHub Check: Build Default x86_64-apple-darwin
- GitHub Check: Build Default x86_64-unknown-linux-gnu
- GitHub Check: Build Default aarch64-unknown-linux-gnu
- GitHub Check: Build Default x86_64-pc-windows-msvc
- GitHub Check: Build Default aarch64-apple-darwin
- GitHub Check: Build Kafka x86_64-unknown-linux-gnu
- GitHub Check: Quest Smoke and Load Tests for Standalone deployments
- GitHub Check: Quest Smoke and Load Tests for Distributed deployments
🔇 Additional comments (3)
src/storage/s3.rs (1)
343-359: Bytes-scanned metric placement looks correct.
Incrementing bytes after a successful body read aligns with the "GET bytes scanned" objective.
Please confirm downstream dashboards expect “bytes scanned” only on successful GETs (not attempts).
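A minimal sketch of the placement being described. The response-handling calls and the helper's parameter order are assumptions rather than the PR's actual code; only the helper names and the success-only semantics come from this review.

```rust
// Sketch: record bytes scanned only after the GET body was read successfully.
// object_store-style get()/bytes() calls and the helper parameter order are
// assumptions; helper names are from this review.
use chrono::Utc;
use object_store::{path::Path, ObjectStore};

async fn get_object(
    client: &dyn ObjectStore,
    location: &Path,
) -> object_store::Result<bytes::Bytes> {
    let body = client.get(location).await?.bytes().await?;
    let date = Utc::now().date_naive().to_string();
    // Both counters fire only on the success path, after the body is in memory.
    increment_object_store_calls_by_date("GET", &date);
    increment_bytes_scanned_in_object_store_calls_by_date(body.len() as u64, "GET", &date);
    Ok(body)
}
```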
src/storage/azure_blob.rs (1)
510-517: LGTM: Duplicate GET metrics avoided in get_objects.
_get_object owns GET metrics; no extra increments here. Good.

src/storage/gcs.rs (1)
191-196: Bytes-scanned metric usage is correct.
Emitting bytes after a successful GET aligns with the new metric semantics.
My previous suggestion was incorrect. The current code correctly captures metrics only on successful operations.
- remove the metric increment at the async writer
- add it only when put_part() is called and is successful
- remove provider label from metrics
- add metrics for bytes scanned (GET calls)
Summary by CodeRabbit
- Bug Fixes
- Reliability
- New Features
- Chores