Skip to content

Commit 9ae190f

Browse files
chore: add mandatory hottier for pstats dataset (#1414)
1 parent 5e89972 commit 9ae190f

File tree

1 file changed

+30
-1
lines changed

1 file changed

+30
-1
lines changed

src/hottier.rs

Lines changed: 30 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ use crate::{
2727
catalog::manifest::{File, Manifest},
2828
handlers::http::cluster::INTERNAL_STREAM_NAME,
2929
parseable::PARSEABLE,
30-
storage::{ObjectStorage, ObjectStorageError},
30+
storage::{ObjectStorage, ObjectStorageError, field_stats::DATASET_STATS_STREAM_NAME},
3131
utils::{extract_datetime, human_size::bytes_to_human_size},
3232
validator::error::HotTierValidationError,
3333
};
@@ -252,6 +252,11 @@ impl HotTierManager {
252252

253253
///sync the hot tier files from S3 to the hot tier directory for all streams
254254
async fn sync_hot_tier(&self) -> Result<(), HotTierError> {
255+
// Before syncing, check if pstats stream was created and needs hot tier
256+
if let Err(e) = self.create_pstats_hot_tier().await {
257+
tracing::trace!("Skipping pstats hot tier creation because of error: {e}");
258+
}
259+
255260
let mut sync_hot_tier_tasks = FuturesUnordered::new();
256261
for stream in PARSEABLE.streams.list() {
257262
if self.check_stream_hot_tier_exists(&stream) {
@@ -708,6 +713,30 @@ impl HotTierManager {
708713
Ok(())
709714
}
710715

716+
/// Creates hot tier for pstats internal stream if the stream exists in storage
717+
async fn create_pstats_hot_tier(&self) -> Result<(), HotTierError> {
718+
// Check if pstats hot tier already exists
719+
if !self.check_stream_hot_tier_exists(DATASET_STATS_STREAM_NAME) {
720+
// Check if pstats stream exists in storage by attempting to load it
721+
if PARSEABLE
722+
.check_or_load_stream(DATASET_STATS_STREAM_NAME)
723+
.await
724+
{
725+
let mut stream_hot_tier = StreamHotTier {
726+
version: Some(CURRENT_HOT_TIER_VERSION.to_string()),
727+
size: MIN_STREAM_HOT_TIER_SIZE_BYTES,
728+
used_size: 0,
729+
available_size: MIN_STREAM_HOT_TIER_SIZE_BYTES,
730+
oldest_date_time_entry: None,
731+
};
732+
self.put_hot_tier(DATASET_STATS_STREAM_NAME, &mut stream_hot_tier)
733+
.await?;
734+
}
735+
}
736+
737+
Ok(())
738+
}
739+
711740
/// Get the disk usage for the hot tier storage path. If we have a three disk paritions
712741
/// mounted as follows:
713742
/// 1. /

0 commit comments

Comments
 (0)