From 0dbabc24a2583fec030ccf76776d56006ba12135 Mon Sep 17 00:00:00 2001 From: fulmicoton Date: Tue, 2 Sep 2025 15:30:29 +0200 Subject: [PATCH] Changing method prototype in doc processor --- .../src/actors/doc_processor.rs | 24 +++++++++---------- 1 file changed, 11 insertions(+), 13 deletions(-) diff --git a/quickwit/quickwit-indexing/src/actors/doc_processor.rs b/quickwit/quickwit-indexing/src/actors/doc_processor.rs index 407c55ff526..4c850d2106c 100644 --- a/quickwit/quickwit-indexing/src/actors/doc_processor.rs +++ b/quickwit/quickwit-indexing/src/actors/doc_processor.rs @@ -150,15 +150,11 @@ fn try_into_vrl_doc( Ok(vrl_doc) } -fn try_into_json_docs( - input_format: SourceInputFormat, - raw_doc: Bytes, - num_bytes: usize, -) -> JsonDocIterator { +fn try_into_json_docs(input_format: SourceInputFormat, raw_doc: Bytes) -> JsonDocIterator { match input_format { SourceInputFormat::Json => { let json_doc_result = serde_json::from_slice::(&raw_doc) - .map(|json_obj| JsonDoc::new(json_obj, num_bytes)); + .map(|json_obj| JsonDoc::new(json_obj, raw_doc.len())); JsonDocIterator::from(json_doc_result) } SourceInputFormat::OtlpLogsJson => { @@ -182,7 +178,7 @@ fn try_into_json_docs( let mut json_obj = serde_json::Map::with_capacity(1); let key = PLAIN_TEXT.to_string(); json_obj.insert(key, JsonValue::String(value)); - JsonDoc::new(json_obj, num_bytes) + JsonDoc::new(json_obj, raw_doc.len()) }); JsonDocIterator::from(json_doc_result) } @@ -210,10 +206,9 @@ fn parse_raw_doc( fn parse_raw_doc( input_format: SourceInputFormat, raw_doc: Bytes, - num_bytes: usize, _vrl_program_opt: Option<&mut VrlProgram>, ) -> JsonDocIterator { - try_into_json_docs(input_format, raw_doc, num_bytes) + try_into_json_docs(input_format, raw_doc) } enum JsonDocIterator { @@ -462,14 +457,14 @@ impl DocProcessor { } fn process_raw_doc(&mut self, raw_doc: Bytes, processed_docs: &mut Vec) { - let num_bytes = raw_doc.len(); - #[cfg(feature = "vrl")] let transform_opt = self.transform_opt.as_mut(); #[cfg(not(feature = "vrl"))] let transform_opt: Option<&mut VrlProgram> = None; - for json_doc_result in parse_raw_doc(self.input_format, raw_doc, num_bytes, transform_opt) { + let raw_doc_num_bytes = raw_doc.len(); + + for json_doc_result in parse_raw_doc(self.input_format, raw_doc, transform_opt) { let processed_doc_result = json_doc_result.and_then(|json_doc| self.process_json_doc(json_doc)); @@ -485,7 +480,10 @@ impl DocProcessor { source_id = self.counters.source_id, "{error}", ); - self.counters.record_error(error, num_bytes as u64); + // WARN The num bytes value is wrong here. Depending on the source, + // raw_doc could be generating more than one doc. In that case, raw_doc_num_bytes + // is not the size of the faulty doc we just encountered. + self.counters.record_error(error, raw_doc_num_bytes as u64); } } }