From 6f0c689e6c1f4615441079ba9b4863a2e0fb72e7 Mon Sep 17 00:00:00 2001 From: Nikhil Sinha Date: Wed, 24 Sep 2025 23:10:22 -0700 Subject: [PATCH 1/3] fix: incorrect metric collection of PUT_MULTIPART remove at the async writer add only when put_part() is called and is successful --- src/storage/azure_blob.rs | 14 +++----------- src/storage/gcs.rs | 12 +++--------- src/storage/s3.rs | 14 +++----------- 3 files changed, 9 insertions(+), 31 deletions(-) diff --git a/src/storage/azure_blob.rs b/src/storage/azure_blob.rs index 32b3cb1c5..0d00f39f1 100644 --- a/src/storage/azure_blob.rs +++ b/src/storage/azure_blob.rs @@ -378,13 +378,7 @@ impl BlobStore { let mut file = OpenOptions::new().read(true).open(path).await?; let location = &to_object_store_path(key); - // Track multipart initiation let async_writer = self.client.put_multipart(location).await; - increment_object_store_calls_by_date( - "azure_blob", - "PUT_MULTIPART", - &Utc::now().date_naive().to_string(), - ); let mut async_writer = match async_writer { Ok(writer) => writer, Err(err) => { @@ -445,16 +439,14 @@ impl BlobStore { let part_data = data[start_pos..end_pos].to_vec(); let result = async_writer.put_part(part_data.into()).await; + if result.is_err() { + return Err(result.err().unwrap().into()); + } increment_object_store_calls_by_date( "azure_blob", "PUT_MULTIPART", &Utc::now().date_naive().to_string(), ); - if result.is_err() { - return Err(result.err().unwrap().into()); - } - - // upload_parts.push(part_number as u64 + 1); } // Track multipart completion diff --git a/src/storage/gcs.rs b/src/storage/gcs.rs index 93c7a954f..62e0b82a1 100644 --- a/src/storage/gcs.rs +++ b/src/storage/gcs.rs @@ -322,13 +322,7 @@ impl Gcs { let mut file = OpenOptions::new().read(true).open(path).await?; let location = &to_object_store_path(key); - // Track multipart initiation let async_writer = self.client.put_multipart(location).await; - increment_object_store_calls_by_date( - "gcs", - "PUT_MULTIPART", - &Utc::now().date_naive().to_string(), - ); let mut async_writer = match async_writer { Ok(writer) => writer, Err(err) => { @@ -387,14 +381,14 @@ impl Gcs { // Track individual part upload let result = async_writer.put_part(part_data.into()).await; + if result.is_err() { + return Err(result.err().unwrap().into()); + } increment_object_store_calls_by_date( "gcs", "PUT_MULTIPART", &Utc::now().date_naive().to_string(), ); - if result.is_err() { - return Err(result.err().unwrap().into()); - } } // Track multipart completion diff --git a/src/storage/s3.rs b/src/storage/s3.rs index 853fef8b7..ad72105a2 100644 --- a/src/storage/s3.rs +++ b/src/storage/s3.rs @@ -486,13 +486,7 @@ impl S3 { let mut file = OpenOptions::new().read(true).open(path).await?; let location = &to_object_store_path(key); - // Track multipart initiation let async_writer = self.client.put_multipart(location).await; - increment_object_store_calls_by_date( - "s3", - "PUT_MULTIPART", - &Utc::now().date_naive().to_string(), - ); let mut async_writer = match async_writer { Ok(writer) => writer, Err(err) => { @@ -552,16 +546,14 @@ impl S3 { // Track individual part upload let result = async_writer.put_part(part_data.into()).await; + if result.is_err() { + return Err(result.err().unwrap().into()); + } increment_object_store_calls_by_date( "s3", "PUT_MULTIPART", &Utc::now().date_naive().to_string(), ); - if result.is_err() { - return Err(result.err().unwrap().into()); - } - - // upload_parts.push(part_number as u64 + 1); } // Track multipart completion From 74f6703c29d6fa490e1a3944d7c552d4d2cf0391 Mon Sep 17 00:00:00 2001 From: Nikhil Sinha Date: Wed, 24 Sep 2025 23:41:08 -0700 Subject: [PATCH 2/3] remove provider label from metrics, add metrics for bytes scanned --- src/metrics/mod.rs | 43 ++++++++---- src/storage/azure_blob.rs | 144 ++++++++------------------------------ src/storage/gcs.rs | 83 ++++++++-------------- src/storage/localfs.rs | 91 ++++-------------------- src/storage/s3.rs | 81 +++++++++------------ 5 files changed, 137 insertions(+), 305 deletions(-) diff --git a/src/metrics/mod.rs b/src/metrics/mod.rs index 56f3d98b9..d15270fb9 100644 --- a/src/metrics/mod.rs +++ b/src/metrics/mod.rs @@ -279,7 +279,7 @@ pub static TOTAL_OBJECT_STORE_CALLS_BY_DATE: Lazy = Lazy::new(|| "Total object store calls by date", ) .namespace(METRICS_NAMESPACE), - &["provider", "method", "date"], + &["method", "date"], ) .expect("metric can be created") }); @@ -292,7 +292,20 @@ pub static TOTAL_FILES_SCANNED_IN_OBJECT_STORE_CALLS_BY_DATE: Lazy = + Lazy::new(|| { + IntCounterVec::new( + Opts::new( + "total_bytes_scanned_in_object_store_calls_by_date", + "Total bytes scanned in object store calls by date", + ) + .namespace(METRICS_NAMESPACE), + &["method", "date"], ) .expect("metric can be created") }); @@ -304,7 +317,7 @@ pub static TOTAL_INPUT_LLM_TOKENS_BY_DATE: Lazy = Lazy::new(|| { "Total input LLM tokens used by date", ) .namespace(METRICS_NAMESPACE), - &["provider", "model", "date"], + &["model", "date"], ) .expect("metric can be created") }); @@ -409,6 +422,11 @@ fn custom_metrics(registry: &Registry) { TOTAL_FILES_SCANNED_IN_OBJECT_STORE_CALLS_BY_DATE.clone(), )) .expect("metric can be registered"); + registry + .register(Box::new( + TOTAL_BYTES_SCANNED_IN_OBJECT_STORE_CALLS_BY_DATE.clone(), + )) + .expect("metric can be registered"); registry .register(Box::new(TOTAL_INPUT_LLM_TOKENS_BY_DATE.clone())) .expect("metric can be registered"); @@ -518,23 +536,24 @@ pub fn increment_bytes_scanned_in_query_by_date(bytes: u64, date: &str) { .inc_by(bytes); } -pub fn increment_object_store_calls_by_date(provider: &str, method: &str, date: &str) { +pub fn increment_object_store_calls_by_date(method: &str, date: &str) { TOTAL_OBJECT_STORE_CALLS_BY_DATE - .with_label_values(&[provider, method, date]) + .with_label_values(&[method, date]) .inc(); } -pub fn increment_files_scanned_in_object_store_calls_by_date( - provider: &str, - method: &str, - count: u64, - date: &str, -) { +pub fn increment_files_scanned_in_object_store_calls_by_date(method: &str, count: u64, date: &str) { TOTAL_FILES_SCANNED_IN_OBJECT_STORE_CALLS_BY_DATE - .with_label_values(&[provider, method, date]) + .with_label_values(&[method, date]) .inc_by(count); } +pub fn increment_bytes_scanned_in_object_store_calls_by_date(method: &str, bytes: u64, date: &str) { + TOTAL_BYTES_SCANNED_IN_OBJECT_STORE_CALLS_BY_DATE + .with_label_values(&[method, date]) + .inc_by(bytes); +} + pub fn increment_input_llm_tokens_by_date(provider: &str, model: &str, tokens: u64, date: &str) { TOTAL_INPUT_LLM_TOKENS_BY_DATE .with_label_values(&[provider, model, date]) diff --git a/src/storage/azure_blob.rs b/src/storage/azure_blob.rs index 0d00f39f1..4ffa23c21 100644 --- a/src/storage/azure_blob.rs +++ b/src/storage/azure_blob.rs @@ -51,7 +51,9 @@ use url::Url; use crate::{ metrics::{ - increment_files_scanned_in_object_store_calls_by_date, increment_object_store_calls_by_date, + increment_bytes_scanned_in_object_store_calls_by_date, + increment_files_scanned_in_object_store_calls_by_date, + increment_object_store_calls_by_date, }, parseable::LogStream, }; @@ -212,21 +214,21 @@ pub struct BlobStore { impl BlobStore { async fn _get_object(&self, path: &RelativePath) -> Result { let resp = self.client.get(&to_object_store_path(path)).await; - increment_object_store_calls_by_date( - "azure_blob", - "GET", - &Utc::now().date_naive().to_string(), - ); + increment_object_store_calls_by_date("GET", &Utc::now().date_naive().to_string()); match resp { Ok(resp) => { let body: Bytes = resp.bytes().await?; increment_files_scanned_in_object_store_calls_by_date( - "azure_blob", "GET", 1, &Utc::now().date_naive().to_string(), ); + increment_bytes_scanned_in_object_store_calls_by_date( + "GET", + body.len() as u64, + &Utc::now().date_naive().to_string(), + ); Ok(body) } Err(err) => Err(err.into()), @@ -239,15 +241,10 @@ impl BlobStore { resource: PutPayload, ) -> Result<(), ObjectStorageError> { let resp = self.client.put(&to_object_store_path(path), resource).await; - increment_object_store_calls_by_date( - "azure_blob", - "PUT", - &Utc::now().date_naive().to_string(), - ); + increment_object_store_calls_by_date("PUT", &Utc::now().date_naive().to_string()); match resp { Ok(_) => { increment_files_scanned_in_object_store_calls_by_date( - "azure_blob", "PUT", 1, &Utc::now().date_naive().to_string(), @@ -262,11 +259,7 @@ impl BlobStore { let files_scanned = Arc::new(AtomicU64::new(0)); let files_deleted = Arc::new(AtomicU64::new(0)); let object_stream = self.client.list(Some(&(key.into()))); - increment_object_store_calls_by_date( - "azure_blob", - "LIST", - &Utc::now().date_naive().to_string(), - ); + increment_object_store_calls_by_date("LIST", &Utc::now().date_naive().to_string()); object_stream .for_each_concurrent(None, |x| async { @@ -277,7 +270,6 @@ impl BlobStore { files_deleted.fetch_add(1, Ordering::Relaxed); let delete_resp = self.client.delete(&obj.location).await; increment_object_store_calls_by_date( - "azure_blob", "DELETE", &Utc::now().date_naive().to_string(), ); @@ -296,13 +288,11 @@ impl BlobStore { .await; increment_files_scanned_in_object_store_calls_by_date( - "azure_blob", "LIST", files_scanned.load(Ordering::Relaxed), &Utc::now().date_naive().to_string(), ); increment_files_scanned_in_object_store_calls_by_date( - "azure_blob", "DELETE", files_deleted.load(Ordering::Relaxed), &Utc::now().date_naive().to_string(), @@ -315,11 +305,7 @@ impl BlobStore { .client .list_with_delimiter(Some(&(stream.into()))) .await; - increment_object_store_calls_by_date( - "azure_blob", - "LIST", - &Utc::now().date_naive().to_string(), - ); + increment_object_store_calls_by_date("LIST", &Utc::now().date_naive().to_string()); let resp = match resp { Ok(resp) => resp, @@ -331,7 +317,6 @@ impl BlobStore { let common_prefixes = resp.common_prefixes; increment_files_scanned_in_object_store_calls_by_date( - "azure_blob", "LIST", common_prefixes.len() as u64, &Utc::now().date_naive().to_string(), @@ -351,15 +336,10 @@ impl BlobStore { let bytes = tokio::fs::read(path).await?; let result = self.client.put(&key.into(), bytes.into()).await; - increment_object_store_calls_by_date( - "azure_blob", - "PUT", - &Utc::now().date_naive().to_string(), - ); + increment_object_store_calls_by_date("PUT", &Utc::now().date_naive().to_string()); match result { Ok(_) => { increment_files_scanned_in_object_store_calls_by_date( - "azure_blob", "PUT", 1, &Utc::now().date_naive().to_string(), @@ -392,16 +372,11 @@ impl BlobStore { let mut data = Vec::new(); file.read_to_end(&mut data).await?; let result = self.client.put(location, data.into()).await; - increment_object_store_calls_by_date( - "azure_blob", - "PUT", - &Utc::now().date_naive().to_string(), - ); + increment_object_store_calls_by_date("PUT", &Utc::now().date_naive().to_string()); match result { Ok(_) => { increment_files_scanned_in_object_store_calls_by_date( - "azure_blob", "PUT", 1, &Utc::now().date_naive().to_string(), @@ -443,7 +418,6 @@ impl BlobStore { return Err(result.err().unwrap().into()); } increment_object_store_calls_by_date( - "azure_blob", "PUT_MULTIPART", &Utc::now().date_naive().to_string(), ); @@ -485,14 +459,9 @@ impl ObjectStorage for BlobStore { async fn head(&self, path: &RelativePath) -> Result { let result = self.client.head(&to_object_store_path(path)).await; - increment_object_store_calls_by_date( - "azure_blob", - "HEAD", - &Utc::now().date_naive().to_string(), - ); + increment_object_store_calls_by_date("HEAD", &Utc::now().date_naive().to_string()); if result.is_ok() { increment_files_scanned_in_object_store_calls_by_date( - "azure_blob", "HEAD", 1, &Utc::now().date_naive().to_string(), @@ -545,31 +514,26 @@ impl ObjectStorage for BlobStore { ) .await?; increment_files_scanned_in_object_store_calls_by_date( - "azure_blob", "GET", 1, &Utc::now().date_naive().to_string(), ); - increment_object_store_calls_by_date( - "azure_blob", + increment_bytes_scanned_in_object_store_calls_by_date( "GET", + byts.len() as u64, &Utc::now().date_naive().to_string(), ); + increment_object_store_calls_by_date("GET", &Utc::now().date_naive().to_string()); res.push(byts); } // Record total files scanned increment_files_scanned_in_object_store_calls_by_date( - "azure_blob", "LIST", files_scanned as u64, &Utc::now().date_naive().to_string(), ); - increment_object_store_calls_by_date( - "azure_blob", - "LIST", - &Utc::now().date_naive().to_string(), - ); + increment_object_store_calls_by_date("LIST", &Utc::now().date_naive().to_string()); Ok(res) } @@ -580,11 +544,7 @@ impl ObjectStorage for BlobStore { let mut files_scanned = 0; let mut object_stream = self.client.list(Some(&self.root)); - increment_object_store_calls_by_date( - "azure_blob", - "LIST", - &Utc::now().date_naive().to_string(), - ); + increment_object_store_calls_by_date("LIST", &Utc::now().date_naive().to_string()); while let Some(meta_result) = object_stream.next().await { let meta = match meta_result { @@ -603,7 +563,6 @@ impl ObjectStorage for BlobStore { } // Record total files scanned increment_files_scanned_in_object_store_calls_by_date( - "azure_blob", "LIST", files_scanned as u64, &Utc::now().date_naive().to_string(), @@ -631,14 +590,9 @@ impl ObjectStorage for BlobStore { async fn delete_object(&self, path: &RelativePath) -> Result<(), ObjectStorageError> { let result = self.client.delete(&to_object_store_path(path)).await; - increment_object_store_calls_by_date( - "azure_blob", - "DELETE", - &Utc::now().date_naive().to_string(), - ); + increment_object_store_calls_by_date("DELETE", &Utc::now().date_naive().to_string()); if result.is_ok() { increment_files_scanned_in_object_store_calls_by_date( - "azure_blob", "DELETE", 1, &Utc::now().date_naive().to_string(), @@ -653,14 +607,9 @@ impl ObjectStorage for BlobStore { .client .head(&to_object_store_path(&parseable_json_path())) .await; - increment_object_store_calls_by_date( - "azure_blob", - "HEAD", - &Utc::now().date_naive().to_string(), - ); + increment_object_store_calls_by_date("HEAD", &Utc::now().date_naive().to_string()); if result.is_ok() { increment_files_scanned_in_object_store_calls_by_date( - "azure_blob", "HEAD", 1, &Utc::now().date_naive().to_string(), @@ -680,15 +629,10 @@ impl ObjectStorage for BlobStore { let file = RelativePathBuf::from(&node_filename); let result = self.client.delete(&to_object_store_path(&file)).await; - increment_object_store_calls_by_date( - "azure_blob", - "DELETE", - &Utc::now().date_naive().to_string(), - ); + increment_object_store_calls_by_date("DELETE", &Utc::now().date_naive().to_string()); match result { Ok(_) => { increment_files_scanned_in_object_store_calls_by_date( - "azure_blob", "DELETE", 1, &Utc::now().date_naive().to_string(), @@ -711,16 +655,11 @@ impl ObjectStorage for BlobStore { let common_prefixes = resp.common_prefixes; // get all dirs increment_files_scanned_in_object_store_calls_by_date( - "azure_blob", "LIST", common_prefixes.len() as u64, &Utc::now().date_naive().to_string(), ); - increment_object_store_calls_by_date( - "azure_blob", - "LIST", - &Utc::now().date_naive().to_string(), - ); + increment_object_store_calls_by_date("LIST", &Utc::now().date_naive().to_string()); // return prefixes at the root level let dirs: HashSet<_> = common_prefixes .iter() @@ -735,17 +674,12 @@ impl ObjectStorage for BlobStore { let key = format!("{dir}/{STREAM_METADATA_FILE_NAME}"); let task = async move { let result = self.client.head(&StorePath::from(key)).await; - increment_object_store_calls_by_date( - "azure_blob", - "HEAD", - &Utc::now().date_naive().to_string(), - ); + increment_object_store_calls_by_date("HEAD", &Utc::now().date_naive().to_string()); result.map(|_| ()) }; stream_json_check.push(task); } increment_files_scanned_in_object_store_calls_by_date( - "azure_blob", "HEAD", dirs.len() as u64, &Utc::now().date_naive().to_string(), @@ -769,16 +703,11 @@ impl ObjectStorage for BlobStore { let pre = object_store::path::Path::from(format!("{}/{}/", stream_name, date)); let resp = self.client.list_with_delimiter(Some(&pre)).await?; increment_files_scanned_in_object_store_calls_by_date( - "azure_blob", "LIST", resp.common_prefixes.len() as u64, &Utc::now().date_naive().to_string(), ); - increment_object_store_calls_by_date( - "azure_blob", - "LIST", - &Utc::now().date_naive().to_string(), - ); + increment_object_store_calls_by_date("LIST", &Utc::now().date_naive().to_string()); let hours: Vec = resp .common_prefixes @@ -809,16 +738,11 @@ impl ObjectStorage for BlobStore { let pre = object_store::path::Path::from(format!("{}/{}/{}/", stream_name, date, hour)); let resp = self.client.list_with_delimiter(Some(&pre)).await?; increment_files_scanned_in_object_store_calls_by_date( - "azure_blob", "LIST", resp.common_prefixes.len() as u64, &Utc::now().date_naive().to_string(), ); - increment_object_store_calls_by_date( - "azure_blob", - "LIST", - &Utc::now().date_naive().to_string(), - ); + increment_object_store_calls_by_date("LIST", &Utc::now().date_naive().to_string()); let minutes: Vec = resp .common_prefixes .iter() @@ -870,15 +794,10 @@ impl ObjectStorage for BlobStore { let pre = object_store::path::Path::from("/"); let resp = self.client.list_with_delimiter(Some(&pre)).await; - increment_object_store_calls_by_date( - "azure_blob", - "LIST", - &Utc::now().date_naive().to_string(), - ); + increment_object_store_calls_by_date("LIST", &Utc::now().date_naive().to_string()); let resp = match resp { Ok(resp) => { increment_files_scanned_in_object_store_calls_by_date( - "azure_blob", "LIST", resp.common_prefixes.len() as u64, &Utc::now().date_naive().to_string(), @@ -905,15 +824,10 @@ impl ObjectStorage for BlobStore { ) -> Result, ObjectStorageError> { let prefix = object_store::path::Path::from(relative_path.as_str()); let resp = self.client.list_with_delimiter(Some(&prefix)).await; - increment_object_store_calls_by_date( - "azure_blob", - "LIST", - &Utc::now().date_naive().to_string(), - ); + increment_object_store_calls_by_date("LIST", &Utc::now().date_naive().to_string()); let resp = match resp { Ok(resp) => { increment_files_scanned_in_object_store_calls_by_date( - "azure_blob", "LIST", resp.common_prefixes.len() as u64, &Utc::now().date_naive().to_string(), diff --git a/src/storage/gcs.rs b/src/storage/gcs.rs index 62e0b82a1..2e062e4bb 100644 --- a/src/storage/gcs.rs +++ b/src/storage/gcs.rs @@ -28,7 +28,9 @@ use std::{ use crate::{ metrics::{ - increment_files_scanned_in_object_store_calls_by_date, increment_object_store_calls_by_date, + increment_bytes_scanned_in_object_store_calls_by_date, + increment_files_scanned_in_object_store_calls_by_date, + increment_object_store_calls_by_date, }, parseable::LogStream, }; @@ -177,16 +179,20 @@ pub struct Gcs { impl Gcs { async fn _get_object(&self, path: &RelativePath) -> Result { let resp = self.client.get(&to_object_store_path(path)).await; - increment_object_store_calls_by_date("gcs", "GET", &Utc::now().date_naive().to_string()); + increment_object_store_calls_by_date("GET", &Utc::now().date_naive().to_string()); match resp { Ok(resp) => { let body: Bytes = resp.bytes().await?; increment_files_scanned_in_object_store_calls_by_date( - "gcs", "GET", 1, &Utc::now().date_naive().to_string(), ); + increment_bytes_scanned_in_object_store_calls_by_date( + "GET", + body.len() as u64, + &Utc::now().date_naive().to_string(), + ); Ok(body) } Err(err) => Err(err.into()), @@ -199,11 +205,10 @@ impl Gcs { resource: PutPayload, ) -> Result<(), ObjectStorageError> { let resp = self.client.put(&to_object_store_path(path), resource).await; - increment_object_store_calls_by_date("gcs", "PUT", &Utc::now().date_naive().to_string()); + increment_object_store_calls_by_date("PUT", &Utc::now().date_naive().to_string()); match resp { Ok(_) => { increment_files_scanned_in_object_store_calls_by_date( - "gcs", "PUT", 1, &Utc::now().date_naive().to_string(), @@ -219,7 +224,7 @@ impl Gcs { let files_deleted = Arc::new(AtomicU64::new(0)); // Track LIST operation let object_stream = self.client.list(Some(&(key.into()))); - increment_object_store_calls_by_date("gcs", "LIST", &Utc::now().date_naive().to_string()); + increment_object_store_calls_by_date("LIST", &Utc::now().date_naive().to_string()); object_stream .for_each_concurrent(None, |x| async { files_scanned.fetch_add(1, Ordering::Relaxed); @@ -229,7 +234,6 @@ impl Gcs { files_deleted.fetch_add(1, Ordering::Relaxed); let delete_resp = self.client.delete(&obj.location).await; increment_object_store_calls_by_date( - "gcs", "DELETE", &Utc::now().date_naive().to_string(), ); @@ -248,13 +252,11 @@ impl Gcs { .await; increment_files_scanned_in_object_store_calls_by_date( - "gcs", "LIST", files_scanned.load(Ordering::Relaxed), &Utc::now().date_naive().to_string(), ); increment_files_scanned_in_object_store_calls_by_date( - "gcs", "DELETE", files_deleted.load(Ordering::Relaxed), &Utc::now().date_naive().to_string(), @@ -267,7 +269,7 @@ impl Gcs { .client .list_with_delimiter(Some(&(stream.into()))) .await; - increment_object_store_calls_by_date("gcs", "LIST", &Utc::now().date_naive().to_string()); + increment_object_store_calls_by_date("LIST", &Utc::now().date_naive().to_string()); let resp = match resp { Ok(resp) => resp, @@ -279,7 +281,6 @@ impl Gcs { let common_prefixes = resp.common_prefixes; increment_files_scanned_in_object_store_calls_by_date( - "gcs", "LIST", common_prefixes.len() as u64, &Utc::now().date_naive().to_string(), @@ -299,11 +300,10 @@ impl Gcs { let bytes = tokio::fs::read(path).await?; let result = self.client.put(&key.into(), bytes.into()).await; - increment_object_store_calls_by_date("gcs", "PUT", &Utc::now().date_naive().to_string()); + increment_object_store_calls_by_date("PUT", &Utc::now().date_naive().to_string()); match result { Ok(_) => { increment_files_scanned_in_object_store_calls_by_date( - "gcs", "PUT", 1, &Utc::now().date_naive().to_string(), @@ -338,15 +338,10 @@ impl Gcs { // Track single PUT operation for small files let result = self.client.put(location, data.into()).await; - increment_object_store_calls_by_date( - "gcs", - "PUT", - &Utc::now().date_naive().to_string(), - ); + increment_object_store_calls_by_date("PUT", &Utc::now().date_naive().to_string()); match result { Ok(_) => { increment_files_scanned_in_object_store_calls_by_date( - "gcs", "PUT", 1, &Utc::now().date_naive().to_string(), @@ -385,7 +380,6 @@ impl Gcs { return Err(result.err().unwrap().into()); } increment_object_store_calls_by_date( - "gcs", "PUT_MULTIPART", &Utc::now().date_naive().to_string(), ); @@ -416,11 +410,10 @@ impl ObjectStorage for Gcs { let path = &to_object_store_path(path); let meta = self.client.head(path).await; - increment_object_store_calls_by_date("gcs", "HEAD", &Utc::now().date_naive().to_string()); + increment_object_store_calls_by_date("HEAD", &Utc::now().date_naive().to_string()); let meta = match meta { Ok(meta) => { increment_files_scanned_in_object_store_calls_by_date( - "gcs", "HEAD", 1, &Utc::now().date_naive().to_string(), @@ -447,10 +440,9 @@ impl ObjectStorage for Gcs { async fn head(&self, path: &RelativePath) -> Result { let result = self.client.head(&to_object_store_path(path)).await; - increment_object_store_calls_by_date("gcs", "HEAD", &Utc::now().date_naive().to_string()); + increment_object_store_calls_by_date("HEAD", &Utc::now().date_naive().to_string()); if result.is_ok() { increment_files_scanned_in_object_store_calls_by_date( - "gcs", "HEAD", 1, &Utc::now().date_naive().to_string(), @@ -503,27 +495,26 @@ impl ObjectStorage for Gcs { ) .await?; increment_files_scanned_in_object_store_calls_by_date( - "gcs", "GET", 1, &Utc::now().date_naive().to_string(), ); - increment_object_store_calls_by_date( - "gcs", + increment_bytes_scanned_in_object_store_calls_by_date( "GET", + byts.len() as u64, &Utc::now().date_naive().to_string(), ); + increment_object_store_calls_by_date("GET", &Utc::now().date_naive().to_string()); res.push(byts); } // Record total files scanned increment_files_scanned_in_object_store_calls_by_date( - "gcs", "LIST", files_scanned as u64, &Utc::now().date_naive().to_string(), ); - increment_object_store_calls_by_date("gcs", "LIST", &Utc::now().date_naive().to_string()); + increment_object_store_calls_by_date("LIST", &Utc::now().date_naive().to_string()); Ok(res) } @@ -534,7 +525,7 @@ impl ObjectStorage for Gcs { let mut files_scanned = 0; let mut object_stream = self.client.list(Some(&self.root)); - increment_object_store_calls_by_date("gcs", "LIST", &Utc::now().date_naive().to_string()); + increment_object_store_calls_by_date("LIST", &Utc::now().date_naive().to_string()); while let Some(meta_result) = object_stream.next().await { let meta = match meta_result { @@ -553,7 +544,6 @@ impl ObjectStorage for Gcs { } // Record total files scanned increment_files_scanned_in_object_store_calls_by_date( - "gcs", "LIST", files_scanned as u64, &Utc::now().date_naive().to_string(), @@ -581,10 +571,9 @@ impl ObjectStorage for Gcs { async fn delete_object(&self, path: &RelativePath) -> Result<(), ObjectStorageError> { let result = self.client.delete(&to_object_store_path(path)).await; - increment_object_store_calls_by_date("gcs", "DELETE", &Utc::now().date_naive().to_string()); + increment_object_store_calls_by_date("DELETE", &Utc::now().date_naive().to_string()); if result.is_ok() { increment_files_scanned_in_object_store_calls_by_date( - "gcs", "DELETE", 1, &Utc::now().date_naive().to_string(), @@ -599,11 +588,10 @@ impl ObjectStorage for Gcs { .client .head(&to_object_store_path(&parseable_json_path())) .await; - increment_object_store_calls_by_date("gcs", "HEAD", &Utc::now().date_naive().to_string()); + increment_object_store_calls_by_date("HEAD", &Utc::now().date_naive().to_string()); if result.is_ok() { increment_files_scanned_in_object_store_calls_by_date( - "gcs", "HEAD", 1, &Utc::now().date_naive().to_string(), @@ -623,11 +611,10 @@ impl ObjectStorage for Gcs { let file = RelativePathBuf::from(&node_filename); let result = self.client.delete(&to_object_store_path(&file)).await; - increment_object_store_calls_by_date("gcs", "DELETE", &Utc::now().date_naive().to_string()); + increment_object_store_calls_by_date("DELETE", &Utc::now().date_naive().to_string()); match result { Ok(_) => { increment_files_scanned_in_object_store_calls_by_date( - "gcs", "DELETE", 1, &Utc::now().date_naive().to_string(), @@ -649,12 +636,11 @@ impl ObjectStorage for Gcs { let resp = self.client.list_with_delimiter(None).await?; let common_prefixes = resp.common_prefixes; // get all dirs increment_files_scanned_in_object_store_calls_by_date( - "gcs", "LIST", common_prefixes.len() as u64, &Utc::now().date_naive().to_string(), ); - increment_object_store_calls_by_date("gcs", "LIST", &Utc::now().date_naive().to_string()); + increment_object_store_calls_by_date("LIST", &Utc::now().date_naive().to_string()); // return prefixes at the root level let dirs: HashSet<_> = common_prefixes .iter() @@ -669,17 +655,12 @@ impl ObjectStorage for Gcs { let key = format!("{dir}/{STREAM_METADATA_FILE_NAME}"); let task = async move { let result = self.client.head(&StorePath::from(key)).await; - increment_object_store_calls_by_date( - "gcs", - "HEAD", - &Utc::now().date_naive().to_string(), - ); + increment_object_store_calls_by_date("HEAD", &Utc::now().date_naive().to_string()); result.map(|_| ()) }; stream_json_check.push(task); } increment_files_scanned_in_object_store_calls_by_date( - "gcs", "HEAD", dirs.len() as u64, &Utc::now().date_naive().to_string(), @@ -703,12 +684,11 @@ impl ObjectStorage for Gcs { let pre = object_store::path::Path::from(format!("{}/{}/", stream_name, date)); let resp = self.client.list_with_delimiter(Some(&pre)).await?; increment_files_scanned_in_object_store_calls_by_date( - "gcs", "LIST", resp.common_prefixes.len() as u64, &Utc::now().date_naive().to_string(), ); - increment_object_store_calls_by_date("gcs", "LIST", &Utc::now().date_naive().to_string()); + increment_object_store_calls_by_date("LIST", &Utc::now().date_naive().to_string()); let hours: Vec = resp .common_prefixes @@ -739,12 +719,11 @@ impl ObjectStorage for Gcs { let pre = object_store::path::Path::from(format!("{}/{}/{}/", stream_name, date, hour)); let resp = self.client.list_with_delimiter(Some(&pre)).await?; increment_files_scanned_in_object_store_calls_by_date( - "gcs", "LIST", resp.common_prefixes.len() as u64, &Utc::now().date_naive().to_string(), ); - increment_object_store_calls_by_date("gcs", "LIST", &Utc::now().date_naive().to_string()); + increment_object_store_calls_by_date("LIST", &Utc::now().date_naive().to_string()); let minutes: Vec = resp .common_prefixes .iter() @@ -792,11 +771,10 @@ impl ObjectStorage for Gcs { let pre = object_store::path::Path::from("/"); let resp = self.client.list_with_delimiter(Some(&pre)).await; - increment_object_store_calls_by_date("gcs", "LIST", &Utc::now().date_naive().to_string()); + increment_object_store_calls_by_date("LIST", &Utc::now().date_naive().to_string()); let resp = match resp { Ok(resp) => { increment_files_scanned_in_object_store_calls_by_date( - "gcs", "LIST", resp.common_prefixes.len() as u64, &Utc::now().date_naive().to_string(), @@ -824,11 +802,10 @@ impl ObjectStorage for Gcs { let prefix = object_store::path::Path::from(relative_path.as_str()); let resp = self.client.list_with_delimiter(Some(&prefix)).await; - increment_object_store_calls_by_date("gcs", "LIST", &Utc::now().date_naive().to_string()); + increment_object_store_calls_by_date("LIST", &Utc::now().date_naive().to_string()); let resp = match resp { Ok(resp) => { increment_files_scanned_in_object_store_calls_by_date( - "gcs", "LIST", resp.common_prefixes.len() as u64, &Utc::now().date_naive().to_string(), diff --git a/src/storage/localfs.rs b/src/storage/localfs.rs index 2271cbf80..92b74434b 100644 --- a/src/storage/localfs.rs +++ b/src/storage/localfs.rs @@ -164,16 +164,11 @@ impl ObjectStorage for LocalFS { Ok(x) => { // Record single file accessed successfully increment_files_scanned_in_object_store_calls_by_date( - "localfs", "GET", 1, &Utc::now().date_naive().to_string(), ); - increment_object_store_calls_by_date( - "localfs", - "GET", - &Utc::now().date_naive().to_string(), - ); + increment_object_store_calls_by_date("GET", &Utc::now().date_naive().to_string()); Ok(x.into()) } Err(e) => { @@ -222,16 +217,11 @@ impl ObjectStorage for LocalFS { // Record total files scanned increment_files_scanned_in_object_store_calls_by_date( - "localfs", "LIST", files_scanned, &Utc::now().date_naive().to_string(), ); - increment_object_store_calls_by_date( - "localfs", - "LIST", - &Utc::now().date_naive().to_string(), - ); + increment_object_store_calls_by_date("LIST", &Utc::now().date_naive().to_string()); Ok(path_arr) } @@ -280,13 +270,11 @@ impl ObjectStorage for LocalFS { Ok(file) => { // Record total files scanned increment_files_scanned_in_object_store_calls_by_date( - "localfs", "GET", 1, &Utc::now().date_naive().to_string(), ); increment_object_store_calls_by_date( - "localfs", "GET", &Utc::now().date_naive().to_string(), ); @@ -299,16 +287,11 @@ impl ObjectStorage for LocalFS { } increment_files_scanned_in_object_store_calls_by_date( - "localfs", "LIST", files_scanned as u64, &Utc::now().date_naive().to_string(), ); - increment_object_store_calls_by_date( - "localfs", - "LIST", - &Utc::now().date_naive().to_string(), - ); + increment_object_store_calls_by_date("LIST", &Utc::now().date_naive().to_string()); Ok(res) } @@ -327,16 +310,11 @@ impl ObjectStorage for LocalFS { if res.is_ok() { // Record single file written successfully increment_files_scanned_in_object_store_calls_by_date( - "localfs", "PUT", 1, &Utc::now().date_naive().to_string(), ); - increment_object_store_calls_by_date( - "localfs", - "PUT", - &Utc::now().date_naive().to_string(), - ); + increment_object_store_calls_by_date("PUT", &Utc::now().date_naive().to_string()); } res.map_err(Into::into) @@ -347,11 +325,7 @@ impl ObjectStorage for LocalFS { let result = tokio::fs::remove_dir_all(path).await; if result.is_ok() { - increment_object_store_calls_by_date( - "localfs", - "DELETE", - &Utc::now().date_naive().to_string(), - ); + increment_object_store_calls_by_date("DELETE", &Utc::now().date_naive().to_string()); } result?; Ok(()) @@ -364,16 +338,11 @@ impl ObjectStorage for LocalFS { if result.is_ok() { // Record single file deleted successfully increment_files_scanned_in_object_store_calls_by_date( - "localfs", "DELETE", 1, &Utc::now().date_naive().to_string(), ); - increment_object_store_calls_by_date( - "localfs", - "DELETE", - &Utc::now().date_naive().to_string(), - ); + increment_object_store_calls_by_date("DELETE", &Utc::now().date_naive().to_string()); } result?; @@ -383,11 +352,7 @@ impl ObjectStorage for LocalFS { async fn check(&self) -> Result<(), ObjectStorageError> { let result = fs::create_dir_all(&self.root).await; if result.is_ok() { - increment_object_store_calls_by_date( - "localfs", - "HEAD", - &Utc::now().date_naive().to_string(), - ); + increment_object_store_calls_by_date("HEAD", &Utc::now().date_naive().to_string()); } result.map_err(|e| ObjectStorageError::UnhandledError(e.into())) @@ -398,11 +363,7 @@ impl ObjectStorage for LocalFS { let result = fs::remove_dir_all(path).await; if result.is_ok() { - increment_object_store_calls_by_date( - "localfs", - "DELETE", - &Utc::now().date_naive().to_string(), - ); + increment_object_store_calls_by_date("DELETE", &Utc::now().date_naive().to_string()); } Ok(result?) @@ -413,11 +374,7 @@ impl ObjectStorage for LocalFS { let result = fs::remove_file(path).await; if result.is_ok() { - increment_object_store_calls_by_date( - "localfs", - "DELETE", - &Utc::now().date_naive().to_string(), - ); + increment_object_store_calls_by_date("DELETE", &Utc::now().date_naive().to_string()); } Ok(result?) @@ -435,11 +392,7 @@ impl ObjectStorage for LocalFS { let result = fs::read_dir(&self.root).await; let directories = match result { Ok(read_dir) => { - increment_object_store_calls_by_date( - "localfs", - "LIST", - &Utc::now().date_naive().to_string(), - ); + increment_object_store_calls_by_date("LIST", &Utc::now().date_naive().to_string()); ReadDirStream::new(read_dir) } Err(err) => { @@ -471,11 +424,7 @@ impl ObjectStorage for LocalFS { let result = fs::read_dir(&self.root).await; let directories = match result { Ok(read_dir) => { - increment_object_store_calls_by_date( - "localfs", - "LIST", - &Utc::now().date_naive().to_string(), - ); + increment_object_store_calls_by_date("LIST", &Utc::now().date_naive().to_string()); ReadDirStream::new(read_dir) } Err(err) => { @@ -500,11 +449,7 @@ impl ObjectStorage for LocalFS { let result = fs::read_dir(&self.root).await; let read_dir = match result { Ok(read_dir) => { - increment_object_store_calls_by_date( - "localfs", - "LIST", - &Utc::now().date_naive().to_string(), - ); + increment_object_store_calls_by_date("LIST", &Utc::now().date_naive().to_string()); read_dir } Err(err) => { @@ -564,11 +509,7 @@ impl ObjectStorage for LocalFS { let result = fs::read_dir(&path).await; let read_dir = match result { Ok(read_dir) => { - increment_object_store_calls_by_date( - "localfs", - "LIST", - &Utc::now().date_naive().to_string(), - ); + increment_object_store_calls_by_date("LIST", &Utc::now().date_naive().to_string()); read_dir } Err(err) => { @@ -635,11 +576,7 @@ impl ObjectStorage for LocalFS { let result = fs_extra::file::copy(path, to_path, &op); match result { Ok(_) => { - increment_object_store_calls_by_date( - "localfs", - "PUT", - &Utc::now().date_naive().to_string(), - ); + increment_object_store_calls_by_date("PUT", &Utc::now().date_naive().to_string()); Ok(()) } Err(err) => Err(err.into()), diff --git a/src/storage/s3.rs b/src/storage/s3.rs index ad72105a2..114969071 100644 --- a/src/storage/s3.rs +++ b/src/storage/s3.rs @@ -52,7 +52,9 @@ use tracing::error; use crate::{ metrics::{ - increment_files_scanned_in_object_store_calls_by_date, increment_object_store_calls_by_date, + increment_bytes_scanned_in_object_store_calls_by_date, + increment_files_scanned_in_object_store_calls_by_date, + increment_object_store_calls_by_date, }, parseable::LogStream, }; @@ -339,17 +341,21 @@ pub struct S3 { impl S3 { async fn _get_object(&self, path: &RelativePath) -> Result { let resp = self.client.get(&to_object_store_path(path)).await; - increment_object_store_calls_by_date("s3", "GET", &Utc::now().date_naive().to_string()); + increment_object_store_calls_by_date("GET", &Utc::now().date_naive().to_string()); match resp { Ok(resp) => { let body = resp.bytes().await?; increment_files_scanned_in_object_store_calls_by_date( - "s3", "GET", 1, &Utc::now().date_naive().to_string(), ); + increment_bytes_scanned_in_object_store_calls_by_date( + "GET", + body.len() as u64, + &Utc::now().date_naive().to_string(), + ); Ok(body) } Err(err) => Err(err.into()), @@ -362,11 +368,10 @@ impl S3 { resource: PutPayload, ) -> Result<(), ObjectStorageError> { let resp = self.client.put(&to_object_store_path(path), resource).await; - increment_object_store_calls_by_date("s3", "PUT", &Utc::now().date_naive().to_string()); + increment_object_store_calls_by_date("PUT", &Utc::now().date_naive().to_string()); match resp { Ok(_) => { increment_files_scanned_in_object_store_calls_by_date( - "s3", "PUT", 1, &Utc::now().date_naive().to_string(), @@ -382,7 +387,7 @@ impl S3 { let files_deleted = Arc::new(AtomicU64::new(0)); // Track LIST operation let object_stream = self.client.list(Some(&(key.into()))); - increment_object_store_calls_by_date("s3", "LIST", &Utc::now().date_naive().to_string()); + increment_object_store_calls_by_date("LIST", &Utc::now().date_naive().to_string()); object_stream .for_each_concurrent(None, |x| async { @@ -393,7 +398,6 @@ impl S3 { files_deleted.fetch_add(1, Ordering::Relaxed); let delete_resp = self.client.delete(&obj.location).await; increment_object_store_calls_by_date( - "s3", "DELETE", &Utc::now().date_naive().to_string(), ); @@ -412,13 +416,11 @@ impl S3 { .await; increment_files_scanned_in_object_store_calls_by_date( - "s3", "LIST", files_scanned.load(Ordering::Relaxed), &Utc::now().date_naive().to_string(), ); increment_files_scanned_in_object_store_calls_by_date( - "s3", "DELETE", files_deleted.load(Ordering::Relaxed), &Utc::now().date_naive().to_string(), @@ -431,7 +433,7 @@ impl S3 { .client .list_with_delimiter(Some(&(stream.into()))) .await; - increment_object_store_calls_by_date("s3", "LIST", &Utc::now().date_naive().to_string()); + increment_object_store_calls_by_date("LIST", &Utc::now().date_naive().to_string()); let resp = match resp { Ok(resp) => resp, @@ -443,7 +445,6 @@ impl S3 { let common_prefixes = resp.common_prefixes; increment_files_scanned_in_object_store_calls_by_date( - "s3", "LIST", common_prefixes.len() as u64, &Utc::now().date_naive().to_string(), @@ -463,11 +464,10 @@ impl S3 { let bytes = tokio::fs::read(path).await?; let result = self.client.put(&key.into(), bytes.into()).await; - increment_object_store_calls_by_date("s3", "PUT", &Utc::now().date_naive().to_string()); + increment_object_store_calls_by_date("PUT", &Utc::now().date_naive().to_string()); match result { Ok(_) => { increment_files_scanned_in_object_store_calls_by_date( - "s3", "PUT", 1, &Utc::now().date_naive().to_string(), @@ -502,11 +502,10 @@ impl S3 { // Track single PUT operation for small files let result = self.client.put(location, data.into()).await; - increment_object_store_calls_by_date("s3", "PUT", &Utc::now().date_naive().to_string()); + increment_object_store_calls_by_date("PUT", &Utc::now().date_naive().to_string()); match result { Ok(_) => { increment_files_scanned_in_object_store_calls_by_date( - "s3", "PUT", 1, &Utc::now().date_naive().to_string(), @@ -550,7 +549,6 @@ impl S3 { return Err(result.err().unwrap().into()); } increment_object_store_calls_by_date( - "s3", "PUT_MULTIPART", &Utc::now().date_naive().to_string(), ); @@ -576,11 +574,10 @@ impl ObjectStorage for S3 { ) -> Result { let path = &to_object_store_path(path); let meta = self.client.head(path).await; - increment_object_store_calls_by_date("s3", "HEAD", &Utc::now().date_naive().to_string()); + increment_object_store_calls_by_date("HEAD", &Utc::now().date_naive().to_string()); let meta = match meta { Ok(meta) => { increment_files_scanned_in_object_store_calls_by_date( - "s3", "HEAD", 1, &Utc::now().date_naive().to_string(), @@ -607,10 +604,9 @@ impl ObjectStorage for S3 { async fn head(&self, path: &RelativePath) -> Result { let result = self.client.head(&to_object_store_path(path)).await; - increment_object_store_calls_by_date("s3", "HEAD", &Utc::now().date_naive().to_string()); + increment_object_store_calls_by_date("HEAD", &Utc::now().date_naive().to_string()); if result.is_ok() { increment_files_scanned_in_object_store_calls_by_date( - "s3", "HEAD", 1, &Utc::now().date_naive().to_string(), @@ -663,22 +659,25 @@ impl ObjectStorage for S3 { ) .await?; increment_files_scanned_in_object_store_calls_by_date( - "s3", "GET", 1, &Utc::now().date_naive().to_string(), ); - increment_object_store_calls_by_date("s3", "GET", &Utc::now().date_naive().to_string()); + increment_bytes_scanned_in_object_store_calls_by_date( + "GET", + byts.len() as u64, + &Utc::now().date_naive().to_string(), + ); + increment_object_store_calls_by_date("GET", &Utc::now().date_naive().to_string()); res.push(byts); } // Record total files scanned increment_files_scanned_in_object_store_calls_by_date( - "s3", "LIST", files_scanned as u64, &Utc::now().date_naive().to_string(), ); - increment_object_store_calls_by_date("s3", "LIST", &Utc::now().date_naive().to_string()); + increment_object_store_calls_by_date("LIST", &Utc::now().date_naive().to_string()); Ok(res) } @@ -689,7 +688,7 @@ impl ObjectStorage for S3 { let mut files_scanned = 0; let mut object_stream = self.client.list(Some(&self.root)); - increment_object_store_calls_by_date("s3", "LIST", &Utc::now().date_naive().to_string()); + increment_object_store_calls_by_date("LIST", &Utc::now().date_naive().to_string()); while let Some(meta_result) = object_stream.next().await { let meta = match meta_result { @@ -708,7 +707,6 @@ impl ObjectStorage for S3 { } // Record total files scanned increment_files_scanned_in_object_store_calls_by_date( - "s3", "LIST", files_scanned as u64, &Utc::now().date_naive().to_string(), @@ -736,10 +734,9 @@ impl ObjectStorage for S3 { async fn delete_object(&self, path: &RelativePath) -> Result<(), ObjectStorageError> { let result = self.client.delete(&to_object_store_path(path)).await; - increment_object_store_calls_by_date("s3", "DELETE", &Utc::now().date_naive().to_string()); + increment_object_store_calls_by_date("DELETE", &Utc::now().date_naive().to_string()); if result.is_ok() { increment_files_scanned_in_object_store_calls_by_date( - "s3", "DELETE", 1, &Utc::now().date_naive().to_string(), @@ -754,11 +751,10 @@ impl ObjectStorage for S3 { .client .head(&to_object_store_path(&parseable_json_path())) .await; - increment_object_store_calls_by_date("s3", "HEAD", &Utc::now().date_naive().to_string()); + increment_object_store_calls_by_date("HEAD", &Utc::now().date_naive().to_string()); if result.is_ok() { increment_files_scanned_in_object_store_calls_by_date( - "s3", "HEAD", 1, &Utc::now().date_naive().to_string(), @@ -778,11 +774,10 @@ impl ObjectStorage for S3 { let file = RelativePathBuf::from(&node_filename); let result = self.client.delete(&to_object_store_path(&file)).await; - increment_object_store_calls_by_date("s3", "DELETE", &Utc::now().date_naive().to_string()); + increment_object_store_calls_by_date("DELETE", &Utc::now().date_naive().to_string()); match result { Ok(_) => { increment_files_scanned_in_object_store_calls_by_date( - "s3", "DELETE", 1, &Utc::now().date_naive().to_string(), @@ -804,12 +799,11 @@ impl ObjectStorage for S3 { let resp = self.client.list_with_delimiter(None).await?; let common_prefixes = resp.common_prefixes; // get all dirs increment_files_scanned_in_object_store_calls_by_date( - "s3", "LIST", common_prefixes.len() as u64, &Utc::now().date_naive().to_string(), ); - increment_object_store_calls_by_date("s3", "LIST", &Utc::now().date_naive().to_string()); + increment_object_store_calls_by_date("LIST", &Utc::now().date_naive().to_string()); // return prefixes at the root level let dirs: HashSet<_> = common_prefixes .iter() @@ -824,17 +818,12 @@ impl ObjectStorage for S3 { let key = format!("{dir}/{STREAM_METADATA_FILE_NAME}"); let task = async move { let result = self.client.head(&StorePath::from(key)).await; - increment_object_store_calls_by_date( - "s3", - "HEAD", - &Utc::now().date_naive().to_string(), - ); + increment_object_store_calls_by_date("HEAD", &Utc::now().date_naive().to_string()); result.map(|_| ()) }; stream_json_check.push(task); } increment_files_scanned_in_object_store_calls_by_date( - "s3", "HEAD", dirs.len() as u64, &Utc::now().date_naive().to_string(), @@ -858,12 +847,11 @@ impl ObjectStorage for S3 { let pre = object_store::path::Path::from(format!("{}/{}/", stream_name, date)); let resp = self.client.list_with_delimiter(Some(&pre)).await?; increment_files_scanned_in_object_store_calls_by_date( - "s3", "LIST", resp.common_prefixes.len() as u64, &Utc::now().date_naive().to_string(), ); - increment_object_store_calls_by_date("s3", "LIST", &Utc::now().date_naive().to_string()); + increment_object_store_calls_by_date("LIST", &Utc::now().date_naive().to_string()); let hours: Vec = resp .common_prefixes @@ -894,12 +882,11 @@ impl ObjectStorage for S3 { let pre = object_store::path::Path::from(format!("{}/{}/{}/", stream_name, date, hour)); let resp = self.client.list_with_delimiter(Some(&pre)).await?; increment_files_scanned_in_object_store_calls_by_date( - "s3", "LIST", resp.common_prefixes.len() as u64, &Utc::now().date_naive().to_string(), ); - increment_object_store_calls_by_date("s3", "LIST", &Utc::now().date_naive().to_string()); + increment_object_store_calls_by_date("LIST", &Utc::now().date_naive().to_string()); let minutes: Vec = resp .common_prefixes .iter() @@ -946,11 +933,10 @@ impl ObjectStorage for S3 { async fn list_dirs(&self) -> Result, ObjectStorageError> { let pre = object_store::path::Path::from("/"); let resp = self.client.list_with_delimiter(Some(&pre)).await; - increment_object_store_calls_by_date("s3", "LIST", &Utc::now().date_naive().to_string()); + increment_object_store_calls_by_date("LIST", &Utc::now().date_naive().to_string()); let resp = match resp { Ok(resp) => { increment_files_scanned_in_object_store_calls_by_date( - "s3", "LIST", resp.common_prefixes.len() as u64, &Utc::now().date_naive().to_string(), @@ -978,11 +964,10 @@ impl ObjectStorage for S3 { let prefix = object_store::path::Path::from(relative_path.as_str()); let resp = self.client.list_with_delimiter(Some(&prefix)).await; - increment_object_store_calls_by_date("s3", "LIST", &Utc::now().date_naive().to_string()); + increment_object_store_calls_by_date("LIST", &Utc::now().date_naive().to_string()); let resp = match resp { Ok(resp) => { increment_files_scanned_in_object_store_calls_by_date( - "s3", "LIST", resp.common_prefixes.len() as u64, &Utc::now().date_naive().to_string(), From 2fd53162dc1ac9ac837c078202acbb5f55fe8d27 Mon Sep 17 00:00:00 2001 From: Nikhil Sinha Date: Wed, 24 Sep 2025 23:57:16 -0700 Subject: [PATCH 3/3] add provider label for llm token, remove redundant metrics capture for get_object --- src/metrics/mod.rs | 2 +- src/storage/azure_blob.rs | 11 ----------- src/storage/gcs.rs | 11 ----------- src/storage/s3.rs | 11 ----------- 4 files changed, 1 insertion(+), 34 deletions(-) diff --git a/src/metrics/mod.rs b/src/metrics/mod.rs index d15270fb9..1a5f177dc 100644 --- a/src/metrics/mod.rs +++ b/src/metrics/mod.rs @@ -317,7 +317,7 @@ pub static TOTAL_INPUT_LLM_TOKENS_BY_DATE: Lazy = Lazy::new(|| { "Total input LLM tokens used by date", ) .namespace(METRICS_NAMESPACE), - &["model", "date"], + &["provider", "model", "date"], ) .expect("metric can be created") }); diff --git a/src/storage/azure_blob.rs b/src/storage/azure_blob.rs index 4ffa23c21..96b8f148a 100644 --- a/src/storage/azure_blob.rs +++ b/src/storage/azure_blob.rs @@ -513,17 +513,6 @@ impl ObjectStorage for BlobStore { .map_err(ObjectStorageError::PathError)?, ) .await?; - increment_files_scanned_in_object_store_calls_by_date( - "GET", - 1, - &Utc::now().date_naive().to_string(), - ); - increment_bytes_scanned_in_object_store_calls_by_date( - "GET", - byts.len() as u64, - &Utc::now().date_naive().to_string(), - ); - increment_object_store_calls_by_date("GET", &Utc::now().date_naive().to_string()); res.push(byts); } diff --git a/src/storage/gcs.rs b/src/storage/gcs.rs index 2e062e4bb..acde4cab4 100644 --- a/src/storage/gcs.rs +++ b/src/storage/gcs.rs @@ -494,17 +494,6 @@ impl ObjectStorage for Gcs { .map_err(ObjectStorageError::PathError)?, ) .await?; - increment_files_scanned_in_object_store_calls_by_date( - "GET", - 1, - &Utc::now().date_naive().to_string(), - ); - increment_bytes_scanned_in_object_store_calls_by_date( - "GET", - byts.len() as u64, - &Utc::now().date_naive().to_string(), - ); - increment_object_store_calls_by_date("GET", &Utc::now().date_naive().to_string()); res.push(byts); } diff --git a/src/storage/s3.rs b/src/storage/s3.rs index 114969071..79b93d175 100644 --- a/src/storage/s3.rs +++ b/src/storage/s3.rs @@ -658,17 +658,6 @@ impl ObjectStorage for S3 { .map_err(ObjectStorageError::PathError)?, ) .await?; - increment_files_scanned_in_object_store_calls_by_date( - "GET", - 1, - &Utc::now().date_naive().to_string(), - ); - increment_bytes_scanned_in_object_store_calls_by_date( - "GET", - byts.len() as u64, - &Utc::now().date_naive().to_string(), - ); - increment_object_store_calls_by_date("GET", &Utc::now().date_naive().to_string()); res.push(byts); } // Record total files scanned