Commit 48f97e7

chore: validate parquet before and after upload to object store (#1432)
After parquet file creation:
1. validate that the file size > FOOTER_SIZE
2. read the file and validate that num_rows > 0

After parquet file upload to the object store:
1. perform a head() to get the metadata of the uploaded file and validate that its size equals the size of the parquet file in the staging directory

Fixes: #1430
1 parent 6fc7cdf commit 48f97e7
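
For reference, below is a minimal, self-contained sketch of the two checks described above, assuming the parquet crate is available. The helper names looks_like_valid_parquet and upload_size_matches, and the staging path in main, are illustrative only; they are not the is_valid_parquet_file / validate_uploaded_parquet_file functions added in the diffs further down.

use std::fs::File;
use std::path::Path;

use parquet::file::{FOOTER_SIZE, reader::FileReader, serialized_reader::SerializedFileReader};

/// Local check: the staged file must be large enough to hold a parquet footer
/// and its metadata must report at least one row.
fn looks_like_valid_parquet(path: &Path) -> bool {
    let Ok(meta) = path.metadata() else { return false };
    if meta.len() < FOOTER_SIZE as u64 {
        return false; // too small to contain a valid parquet footer
    }
    match File::open(path).map(SerializedFileReader::new) {
        Ok(Ok(reader)) => reader.metadata().file_metadata().num_rows() > 0,
        _ => false, // unreadable file or corrupt footer/metadata
    }
}

/// Post-upload check: the size reported by the object store's head() call
/// must match the size of the parquet file in the staging directory.
fn upload_size_matches(staged_size: u64, uploaded_size: u64) -> bool {
    staged_size == uploaded_size
}

fn main() {
    // Hypothetical staging path, for illustration only.
    let staged = Path::new("staging/example.parquet");
    println!("valid locally: {}", looks_like_valid_parquet(staged));
    // After upload, compare the head() size against the staged file's size.
    println!("sizes match: {}", upload_size_matches(1024, 1024));
}

In the actual change, the local check runs both when listing parquet files in staging and right after the ArrowWriter is closed, while the size comparison runs against the metadata returned by store.head() after upload_multipart.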

2 files changed: +125 -16 lines changed


src/parseable/streams.rs

Lines changed: 54 additions & 11 deletions
@@ -34,7 +34,10 @@ use itertools::Itertools;
 use parquet::{
     arrow::ArrowWriter,
     basic::Encoding,
-    file::{FOOTER_SIZE, properties::WriterProperties},
+    file::{
+        FOOTER_SIZE, properties::WriterProperties, reader::FileReader,
+        serialized_reader::SerializedFileReader,
+    },
     format::SortingColumn,
     schema::types::ColumnPath,
 };
@@ -409,7 +412,7 @@ impl Stream {
             .map(|file| file.path())
             .filter(|file| {
                 file.extension().is_some_and(|ext| ext.eq("parquet"))
-                    && std::fs::metadata(file).is_ok_and(|meta| meta.len() > FOOTER_SIZE as u64)
+                    && Self::is_valid_parquet_file(file, &self.stream_name)
             })
             .collect()
     }
@@ -649,7 +652,7 @@ impl Stream {
                 continue;
             }

-            if let Err(e) = self.finalize_parquet_file(&part_path, &parquet_path) {
+            if let Err(e) = std::fs::rename(&part_path, &parquet_path) {
                 error!("Couldn't rename part file: {part_path:?} -> {parquet_path:?}, error = {e}");
             } else {
                 self.cleanup_arrow_files_and_dir(&arrow_files);
@@ -682,12 +685,10 @@ impl Stream {
         }
         writer.close()?;

-        if part_file.metadata().expect("File was just created").len()
-            < parquet::file::FOOTER_SIZE as u64
-        {
+        if !Self::is_valid_parquet_file(part_path, &self.stream_name) {
             error!(
-                "Invalid parquet file {part_path:?} detected for stream {}, removing it",
-                &self.stream_name
+                "Invalid parquet file {part_path:?} detected for stream {stream_name}, removing it",
+                stream_name = &self.stream_name
             );
             remove_file(part_path).expect("File should be removable if it is invalid");
             return Ok(false);
@@ -696,8 +697,47 @@
         Ok(true)
     }

-    fn finalize_parquet_file(&self, part_path: &Path, parquet_path: &Path) -> std::io::Result<()> {
-        std::fs::rename(part_path, parquet_path)
+    /// function to validate parquet files
+    fn is_valid_parquet_file(path: &Path, stream_name: &str) -> bool {
+        // First check file size as a quick validation
+        match path.metadata() {
+            Ok(meta) if meta.len() < FOOTER_SIZE as u64 => {
+                error!(
+                    "Invalid parquet file {path:?} detected for stream {stream_name}, size: {} bytes",
+                    meta.len()
+                );
+                return false;
+            }
+            Err(e) => {
+                error!(
+                    "Cannot read metadata for parquet file {path:?} for stream {stream_name}: {e}"
+                );
+                return false;
+            }
+            _ => {} // File size is adequate, continue validation
+        }
+
+        // Try to open and read the parquet file metadata to verify it's valid
+        match std::fs::File::open(path) {
+            Ok(file) => match SerializedFileReader::new(file) {
+                Ok(reader) => {
+                    if reader.metadata().file_metadata().num_rows() == 0 {
+                        error!("Invalid parquet file {path:?} for stream {stream_name}");
+                        false
+                    } else {
+                        true
+                    }
+                }
+                Err(e) => {
+                    error!("Failed to read parquet file {path:?} for stream {stream_name}: {e}");
+                    false
+                }
+            },
+            Err(e) => {
+                error!("Failed to open parquet file {path:?} for stream {stream_name}: {e}");
+                false
+            }
+        }
     }

     fn cleanup_arrow_files_and_dir(&self, arrow_files: &[PathBuf]) {
@@ -951,7 +991,10 @@ impl Stream {
         shutdown_signal: bool,
     ) -> Result<(), StagingError> {
         let start_flush = Instant::now();
-        self.flush(shutdown_signal);
+        // Force flush for init or shutdown signals to convert all .part files to .arrows
+        // For regular cycles, use false to only flush non-current writers
+        let forced = init_signal || shutdown_signal;
+        self.flush(forced);
         trace!(
             "Flushing stream ({}) took: {}s",
             self.stream_name,

src/storage/object_storage.rs

Lines changed: 71 additions & 5 deletions
@@ -106,6 +106,12 @@ async fn upload_single_parquet_file(
         .to_str()
         .expect("filename is valid string");

+    // Get the local file size for validation
+    let local_file_size = path
+        .metadata()
+        .map_err(|e| ObjectStorageError::Custom(format!("Failed to get local file metadata: {e}")))?
+        .len();
+
     // Upload the file
     store
         .upload_multipart(&RelativePathBuf::from(&stream_relative_path), &path)
@@ -115,6 +121,27 @@
             ObjectStorageError::Custom(format!("Failed to upload {filename}: {e}"))
         })?;

+    // Validate the uploaded file size matches local file
+    let upload_is_valid = validate_uploaded_parquet_file(
+        &store,
+        &stream_relative_path,
+        local_file_size,
+        &stream_name,
+    )
+    .await?;
+
+    if !upload_is_valid {
+        // Upload validation failed, clean up the uploaded file and return error
+        let _ = store
+            .delete_object(&RelativePathBuf::from(&stream_relative_path))
+            .await;
+        error!("Upload size validation failed for file {filename:?}, deleted from object storage");
+        return Ok(UploadResult {
+            file_path: path,
+            manifest_file: None, // Preserve staging file for retry; no manifest created
+        });
+    }
+
     // Update storage metrics
     update_storage_metrics(&path, &stream_name, filename)?;

@@ -177,6 +204,44 @@ async fn calculate_stats_if_enabled(
     }
 }

+/// Validates that a parquet file uploaded to object storage matches the staging file size
+async fn validate_uploaded_parquet_file(
+    store: &Arc<dyn ObjectStorage>,
+    stream_relative_path: &str,
+    expected_size: u64,
+    stream_name: &str,
+) -> Result<bool, ObjectStorageError> {
+    // Verify the file exists and has the expected size
+    match store
+        .head(&RelativePathBuf::from(stream_relative_path))
+        .await
+    {
+        Ok(metadata) => {
+            if metadata.size as u64 != expected_size {
+                warn!(
+                    "Uploaded file size mismatch for stream {}: expected {}, got {}",
+                    stream_name, expected_size, metadata.size
+                );
+                Ok(false)
+            } else {
+                tracing::trace!(
+                    "Uploaded parquet file size validated successfully for stream {}, size: {}",
+                    stream_name,
+                    expected_size
+                );
+                Ok(true)
+            }
+        }
+        Err(e) => {
+            error!(
+                "Failed to get metadata for uploaded file in stream {}: {e}",
+                stream_name
+            );
+            Ok(false)
+        }
+    }
+}
+
 pub trait ObjectStorageProvider: StorageMetrics + std::fmt::Debug + Send + Sync {
     fn get_datafusion_runtime(&self) -> RuntimeEnvBuilder;
     fn construct_client(&self) -> Arc<dyn ObjectStorage>;
@@ -880,14 +945,15 @@ async fn collect_upload_results(
                 if let Some(manifest_file) = upload_result.manifest_file {
                     uploaded_files.push((upload_result.file_path, manifest_file));
                 } else {
-                    // File failed to upload, clean up
-                    if let Err(e) = remove_file(upload_result.file_path) {
-                        warn!("Failed to remove staged file: {e}");
-                    }
+                    // File failed in upload size validation, preserve staging file for retry
+                    error!(
+                        "Parquet file upload size validation failed for {:?}, preserving in staging for retry",
+                        upload_result.file_path
+                    );
                 }
             }
             Ok(Err(e)) => {
-                error!("Error processing parquet file: {e}");
+                error!("Error uploading parquet file: {e}");
                 return Err(e);
             }
             Err(e) => {
