From 1f6dd72b8d90c66eae672b0b52a8314bd5a08cff Mon Sep 17 00:00:00 2001 From: ANUBprad Date: Sat, 21 Mar 2026 09:36:13 +0530 Subject: [PATCH 1/4] Fix duplicate output during resume in extract_text_from_xml --- openverifiablellm/utils.py | 96 +++++++++++++++++++++++--------------- test.xml | 14 ++++++ 2 files changed, 72 insertions(+), 38 deletions(-) create mode 100644 test.xml diff --git a/openverifiablellm/utils.py b/openverifiablellm/utils.py index 47cace0..8d26861 100644 --- a/openverifiablellm/utils.py +++ b/openverifiablellm/utils.py @@ -252,37 +252,45 @@ def _save_checkpoint(checkpoint_path: Path, pages_processed: int, input_identity logger.warning("Failed to save checkpoint: %s", e) tmp.unlink(missing_ok=True) +def count_written_pages(output_path: Path) -> int: + """ + Count number of processed pages based on output file. + Assumes each page is separated by double newline. + """ + if not output_path.exists(): + return 0 -def extract_text_from_xml(input_path, *, write_manifest: bool = False): + with output_path.open("r", encoding="utf-8") as f: + content = f.read().strip() + + if not content: + return 0 + + return len(content.split("\n\n")) + + +def truncate_output_to_pages(output_path: Path, max_pages: int) -> None: """ - Process a Wikipedia XML dump (compressed or uncompressed) into cleaned plain text. - - Each element is parsed, its revision text is extracted, - cleaned using `clean_wikitext()`, and appended to a single - output text file. - - The processed output is saved to: - data/processed/wiki_clean.txt - - Supports resuming interrupted runs via a checkpoint file - (data/processed/wiki_clean.checkpoint.json). If the checkpoint - exists, already-processed pages are skipped and new pages are - appended to the existing output. Delete the checkpoint file to - force a full reprocessing from scratch. - - Parameters - ---------- - input_path : str or Path - Path to the Wikipedia XML dump file. - - Output - ------ - Creates: - data/processed/wiki_clean.txt + Truncate output file to match checkpoint page count. """ + with output_path.open("r", encoding="utf-8") as f: + content = f.read().strip() + + if not content: + return + + pages = content.split("\n\n") + + with output_path.open("w", encoding="utf-8") as f: + if pages[:max_pages]: + f.write("\n\n".join(pages[:max_pages]) + "\n\n") + else: + f.write("") + + +def extract_text_from_xml(input_path, *, write_manifest: bool = False): input_path = Path(input_path) - # Fixed output path project_root = Path.cwd() output_dir = project_root / "data" / "processed" output_dir.mkdir(parents=True, exist_ok=True) @@ -290,14 +298,26 @@ def extract_text_from_xml(input_path, *, write_manifest: bool = False): output_path = output_dir / "wiki_clean.txt" checkpoint_path = _checkpoint_path(output_dir) - # Load checkpoint — tells us how many pages were already written checkpoint = _load_checkpoint(checkpoint_path, input_path, output_path) pages_already_done = checkpoint["pages_processed"] - # If resuming, append to existing output; otherwise start fresh + # ================== FIX FOR ISSUE #76 ================== + written_pages = count_written_pages(output_path) + + if written_pages > pages_already_done: + logger.warning( + "Output file ahead of checkpoint (%d > %d). Truncating...", + written_pages, + pages_already_done, + ) + truncate_output_to_pages(output_path, pages_already_done) + # ====================================================== + write_mode = "a" if pages_already_done > 0 else "w" - # Auto-detect file type using magic bytes separation + # FIX: correct input identity usage + input_identity = _compute_input_identity(input_path) + with open(input_path, "rb") as test_f: is_bz2 = test_f.read(3) == b"BZh" @@ -315,7 +335,6 @@ def extract_text_from_xml(input_path, *, write_manifest: bool = False): if elem.tag.endswith("page"): pages_seen += 1 - # Skip pages already processed in a previous run if pages_seen <= pages_already_done: elem.clear() continue @@ -330,25 +349,26 @@ def extract_text_from_xml(input_path, *, write_manifest: bool = False): pages_written += 1 elem.clear() - # Flush output and save checkpoint periodically - if pages_written % CHECKPOINT_INTERVAL == 0: + # More frequent checkpointing (safer) + if pages_written % 100 == 0: out.flush() - _save_checkpoint(checkpoint_path, pages_written, input_path) + _save_checkpoint(checkpoint_path, pages_written, input_identity) + except KeyboardInterrupt: - _save_checkpoint(checkpoint_path, pages_written, input_path) + _save_checkpoint(checkpoint_path, pages_written, input_identity) logger.warning("Interrupted by user after %d pages. Run again to resume.", pages_written) raise + except Exception: - # Save progress before propagating the exception so the next run can resume - _save_checkpoint(checkpoint_path, pages_written, input_path) + _save_checkpoint(checkpoint_path, pages_written, input_identity) logger.error("Processing interrupted after %d pages. Run again to resume.", pages_written) raise - # Processing finished successfully — remove checkpoint so a fresh - # re-run (if ever needed) starts from the beginning if write_manifest: generate_manifest(input_path, output_path) + checkpoint_path.unlink(missing_ok=True) + logger.info( "Preprocessing complete. %d pages processed. Output saved to %s", pages_written, diff --git a/test.xml b/test.xml new file mode 100644 index 0000000..f14b1a3 --- /dev/null +++ b/test.xml @@ -0,0 +1,14 @@ + + + Test1 + + This is [[sample]] text + + + + Test2 + + Another {{template}} example + + + \ No newline at end of file From fc5209bddee1c11640ce948991d68534982ce2a8 Mon Sep 17 00:00:00 2001 From: ANUBprad Date: Sat, 21 Mar 2026 13:03:03 +0530 Subject: [PATCH 2/4] Use file offset for resume consistency and address review feedback --- openverifiablellm/utils.py | 63 +++++++++++++++++++++++++------------- 1 file changed, 41 insertions(+), 22 deletions(-) diff --git a/openverifiablellm/utils.py b/openverifiablellm/utils.py index 8d26861..b7b9750 100644 --- a/openverifiablellm/utils.py +++ b/openverifiablellm/utils.py @@ -202,7 +202,11 @@ def _compute_input_identity(input_path: Path) -> str: def _load_checkpoint(checkpoint_path: Path, input_path: Path, output_path: Path) -> Dict[str, Any]: """Load checkpoint safely and validate resume conditions.""" if not checkpoint_path.exists(): - return {"pages_processed": 0} + return { + "pages_processed": 0, + "input_identity": _compute_input_identity(input_path), + "file_offset": 0, + } try: with checkpoint_path.open("r", encoding="utf-8") as f: @@ -231,7 +235,7 @@ def _load_checkpoint(checkpoint_path: Path, input_path: Path, output_path: Path) return {"pages_processed": 0} -def _save_checkpoint(checkpoint_path: Path, pages_processed: int, input_identity: str) -> None: +def _save_checkpoint(checkpoint_path: Path, pages_processed: int, input_identity: str, file_offset: int,) -> None: """Atomically save checkpoint with input identity.""" tmp = checkpoint_path.with_suffix(".tmp") @@ -239,6 +243,7 @@ def _save_checkpoint(checkpoint_path: Path, pages_processed: int, input_identity checkpoint_data = { "pages_processed": pages_processed, "input_identity": input_identity, + "file_offset": file_offset, } with tmp.open("w", encoding="utf-8") as f: @@ -298,26 +303,31 @@ def extract_text_from_xml(input_path, *, write_manifest: bool = False): output_path = output_dir / "wiki_clean.txt" checkpoint_path = _checkpoint_path(output_dir) + # Load checkpoint checkpoint = _load_checkpoint(checkpoint_path, input_path, output_path) - pages_already_done = checkpoint["pages_processed"] + pages_already_done = checkpoint.get("pages_processed", 0) + input_identity = checkpoint.get("input_identity") + file_offset = checkpoint.get("file_offset", 0) # ================== FIX FOR ISSUE #76 ================== - written_pages = count_written_pages(output_path) - - if written_pages > pages_already_done: - logger.warning( - "Output file ahead of checkpoint (%d > %d). Truncating...", - written_pages, - pages_already_done, - ) - truncate_output_to_pages(output_path, pages_already_done) + # Ensure output file matches checkpoint state using byte offset + if output_path.exists() and file_offset > 0: + with output_path.open("rb+") as f: + f.seek(0, os.SEEK_END) + current_size = f.tell() + + if current_size > file_offset: + logger.warning( + "Output file ahead of checkpoint (%d > %d). Truncating...", + current_size, + file_offset, + ) + f.truncate(file_offset) # ====================================================== write_mode = "a" if pages_already_done > 0 else "w" - # FIX: correct input identity usage - input_identity = _compute_input_identity(input_path) - + # Detect file type with open(input_path, "rb") as test_f: is_bz2 = test_f.read(3) == b"BZh" @@ -331,6 +341,10 @@ def extract_text_from_xml(input_path, *, write_manifest: bool = False): context = ET.iterparse(f, events=("end",)) with open(output_path, write_mode, encoding="utf-8") as out: + # Move pointer to correct position when resuming + if write_mode == "a": + out.seek(0, os.SEEK_END) + for _, elem in context: if elem.tag.endswith("page"): pages_seen += 1 @@ -345,22 +359,28 @@ def extract_text_from_xml(input_path, *, write_manifest: bool = False): cleaned = clean_wikitext(text_elem.text) if cleaned: out.write(cleaned + "\n\n") + out.flush() + file_offset = out.tell() # 🔥 Track exact byte position pages_written += 1 elem.clear() - # More frequent checkpointing (safer) - if pages_written % 100 == 0: - out.flush() - _save_checkpoint(checkpoint_path, pages_written, input_identity) + # Save checkpoint periodically + if pages_written % CHECKPOINT_INTERVAL == 0: + _save_checkpoint( + checkpoint_path, + pages_written, + input_identity, + file_offset, + ) except KeyboardInterrupt: - _save_checkpoint(checkpoint_path, pages_written, input_identity) + _save_checkpoint(checkpoint_path, pages_written, input_identity, file_offset) logger.warning("Interrupted by user after %d pages. Run again to resume.", pages_written) raise except Exception: - _save_checkpoint(checkpoint_path, pages_written, input_identity) + _save_checkpoint(checkpoint_path, pages_written, input_identity, file_offset) logger.error("Processing interrupted after %d pages. Run again to resume.", pages_written) raise @@ -375,7 +395,6 @@ def extract_text_from_xml(input_path, *, write_manifest: bool = False): output_path, ) - # generate data manifest def generate_manifest(raw_path, processed_path): raw_path = Path(raw_path) From ccd48d7d59afe32ebf42e2a3907a1b4393f31e35 Mon Sep 17 00:00:00 2001 From: ANUBprad Date: Sat, 21 Mar 2026 13:12:36 +0530 Subject: [PATCH 3/4] Fix resume logic using file offsets and address review feedback --- openverifiablellm/utils.py | 51 +++++++++----------------------------- 1 file changed, 12 insertions(+), 39 deletions(-) diff --git a/openverifiablellm/utils.py b/openverifiablellm/utils.py index b7b9750..ccd00d1 100644 --- a/openverifiablellm/utils.py +++ b/openverifiablellm/utils.py @@ -187,7 +187,7 @@ def verify_merkle_proof(chunk_bytes: bytes, proof, merkle_root: str) -> bool: # extract clean wikipage from actual wikipage -CHECKPOINT_INTERVAL = 1_000 # Save checkpoint every N pages +CHECKPOINT_INTERVAL = 100 # Save checkpoint every N pages def _checkpoint_path(output_dir: Path) -> Path: @@ -228,11 +228,19 @@ def _load_checkpoint(checkpoint_path: Path, input_path: Path, output_path: Path) logger.info("Resuming from checkpoint: %d pages already processed", pages_processed) - return data + return { + "pages_processed": pages_processed, + "input_identity": stored_identity, + "file_offset": data.get("file_offset", 0), + } except Exception as e: logger.warning("Checkpoint invalid (%s) — starting fresh.", e) - return {"pages_processed": 0} + return { + "pages_processed": 0, + "input_identity": _compute_input_identity(input_path), + "file_offset": 0, + } def _save_checkpoint(checkpoint_path: Path, pages_processed: int, input_identity: str, file_offset: int,) -> None: @@ -257,41 +265,6 @@ def _save_checkpoint(checkpoint_path: Path, pages_processed: int, input_identity logger.warning("Failed to save checkpoint: %s", e) tmp.unlink(missing_ok=True) -def count_written_pages(output_path: Path) -> int: - """ - Count number of processed pages based on output file. - Assumes each page is separated by double newline. - """ - if not output_path.exists(): - return 0 - - with output_path.open("r", encoding="utf-8") as f: - content = f.read().strip() - - if not content: - return 0 - - return len(content.split("\n\n")) - - -def truncate_output_to_pages(output_path: Path, max_pages: int) -> None: - """ - Truncate output file to match checkpoint page count. - """ - with output_path.open("r", encoding="utf-8") as f: - content = f.read().strip() - - if not content: - return - - pages = content.split("\n\n") - - with output_path.open("w", encoding="utf-8") as f: - if pages[:max_pages]: - f.write("\n\n".join(pages[:max_pages]) + "\n\n") - else: - f.write("") - def extract_text_from_xml(input_path, *, write_manifest: bool = False): input_path = Path(input_path) @@ -342,7 +315,7 @@ def extract_text_from_xml(input_path, *, write_manifest: bool = False): with open(output_path, write_mode, encoding="utf-8") as out: # Move pointer to correct position when resuming - if write_mode == "a": + if write_mode == "a" and output_path.exists(): out.seek(0, os.SEEK_END) for _, elem in context: From 2f13a4e6ae64899ff99e04418b92dcc7b08af1a1 Mon Sep 17 00:00:00 2001 From: ANUBprad Date: Sat, 21 Mar 2026 13:41:26 +0530 Subject: [PATCH 4/4] Fix file offset tracking using actual byte size instead of tell() --- openverifiablellm/utils.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/openverifiablellm/utils.py b/openverifiablellm/utils.py index ccd00d1..3207130 100644 --- a/openverifiablellm/utils.py +++ b/openverifiablellm/utils.py @@ -333,7 +333,7 @@ def extract_text_from_xml(input_path, *, write_manifest: bool = False): if cleaned: out.write(cleaned + "\n\n") out.flush() - file_offset = out.tell() # 🔥 Track exact byte position + file_offset = output_path.stat().st_size # Track exact byte position pages_written += 1 elem.clear()