From a82ab6a24c23d97bf65b39b034a1253c0eeee735 Mon Sep 17 00:00:00 2001 From: Ivan Karpan Date: Sat, 18 Apr 2026 00:09:57 +0100 Subject: [PATCH] defer pathc loading until partial dds extraction and buffer pathc reads --- src/pycrimson/_context.py | 13 ++++++-- src/pycrimson/_files/_pathc.py | 13 ++++---- tests/test_context.py | 39 +++++++++++++++++++++++ tests/test_pathc.py | 58 ++++++++++++++++++++++++++++++++++ 4 files changed, 114 insertions(+), 9 deletions(-) create mode 100644 tests/test_context.py create mode 100644 tests/test_pathc.py diff --git a/src/pycrimson/_context.py b/src/pycrimson/_context.py index cbbfb9f..d3931c7 100644 --- a/src/pycrimson/_context.py +++ b/src/pycrimson/_context.py @@ -18,6 +18,7 @@ class PackageContext: _packs: dict[str, PackMeta] _paz_handle_cache: dict _pack_group_whitelist: list[str] | None + _pthc: PackTextureHeaderCollection | None def __init__( self, base_directory: Path, pack_group_whitelist: list[str] | None = None @@ -26,14 +27,18 @@ def __init__( self._packs = {} self._paz_handle_cache = {} self._pack_group_whitelist = pack_group_whitelist + self._pthc = None self._parse_pack_meta() - self._parse_texture_header_collection() - def _parse_texture_header_collection(self): + def _get_texture_header_collection(self) -> PackTextureHeaderCollection | None: + if self._pthc is not None: + return self._pthc + pthc_file = self._base_path / "meta" / "0.pathc" if pthc_file.exists(): self._pthc = PackTextureHeaderCollection.from_file(pthc_file) + return self._pthc def _parse_pack_meta(self): papgt_file = self._base_path / "meta" / "0.papgt" @@ -177,7 +182,9 @@ def get_file(self, path: str) -> bytes | None: and is_dds_file and entry.compressed_size != entry.uncompressed_size ): - header = self._pthc.get_file_header(path) + pthc = self._get_texture_header_collection() + assert pthc is not None + header = pthc.get_file_header(path) data = self._handle_partial_texture(data, header) return data diff --git a/src/pycrimson/_files/_pathc.py b/src/pycrimson/_files/_pathc.py index fae3a81..927faa2 100644 --- a/src/pycrimson/_files/_pathc.py +++ b/src/pycrimson/_files/_pathc.py @@ -11,7 +11,7 @@ custom, static_length, ) -from bier.EndianedBinaryIO import EndianedReaderIOBase, EndianedFileIO, EndianedBytesIO +from bier.EndianedBinaryIO import EndianedReaderIOBase, EndianedBytesIO from .. import _crypto @@ -86,13 +86,14 @@ def __init__(self, reader: EndianedReaderIOBase): @classmethod def from_file(cls, path: Path): - with EndianedFileIO(path, "rb") as f: + # load entire file before parsing; streaming tiny reads can be very slow on some filesystems + data = path.read_bytes() + with EndianedBytesIO(data) as f: return cls(f) def get_file_header(self, path: str) -> bytes: - checksum = _crypto.calculate_checksum( - f"/{path}" if not path.startswith("/") else path - ) + normalized_path = f"/{path}" if not path.startswith("/") else path + checksum = _crypto.calculate_checksum(normalized_path) entry = self._entries.get(checksum) assert entry is not None @@ -101,7 +102,7 @@ def get_file_header(self, path: str) -> bytes: header = self._headers[entry.texture_header_index] compressed_block_infos = entry.compressed_block_infos else: - collision_entry = self._hash_collision_entries.get(path) + collision_entry = self._hash_collision_entries.get(normalized_path) assert collision_entry is not None header = self._headers[collision_entry.texture_header_index] diff --git a/tests/test_context.py b/tests/test_context.py new file mode 100644 index 0000000..1bc2261 --- /dev/null +++ b/tests/test_context.py @@ -0,0 +1,39 @@ +import unittest +from pathlib import Path +from unittest.mock import patch + +from pycrimson._context import PackageContext + + +class TestPackageContext(unittest.TestCase): + def test_texture_headers_are_loaded_lazily(self): + with patch.object(PackageContext, "_parse_pack_meta") as parse_pack_meta: + with patch( + "pycrimson._context.PackTextureHeaderCollection.from_file" + ) as from_file: + PackageContext(Path("/game")) + + parse_pack_meta.assert_called_once_with() + from_file.assert_not_called() + + def test_texture_headers_are_cached_after_first_load(self): + ctx = PackageContext.__new__(PackageContext) + ctx._base_path = Path("/game") + ctx._pthc = None + + fake_collection = object() + expected_path = Path("/game/meta/0.pathc") + + with patch("pathlib.Path.exists", return_value=True): + with patch( + "pycrimson._context.PackTextureHeaderCollection.from_file", + return_value=fake_collection, + ) as from_file: + self.assertIs(ctx._get_texture_header_collection(), fake_collection) + self.assertIs(ctx._get_texture_header_collection(), fake_collection) + + from_file.assert_called_once_with(expected_path) + + +if __name__ == "__main__": + unittest.main() diff --git a/tests/test_pathc.py b/tests/test_pathc.py new file mode 100644 index 0000000..8331c73 --- /dev/null +++ b/tests/test_pathc.py @@ -0,0 +1,58 @@ +import unittest +from types import SimpleNamespace + +from pycrimson._crypto import calculate_checksum +from pycrimson._files._pathc import ( + PackTextureHeaderCollection, + _PackTextureHeaderCollectionCollisionEntry, + _PackTextureHeaderCollectionEntry, +) + + +class TestPackTextureHeaderCollection(unittest.TestCase): + def _make_collection(self, normalized_path: str) -> PackTextureHeaderCollection: + checksum = calculate_checksum(normalized_path) + + collection = PackTextureHeaderCollection.__new__(PackTextureHeaderCollection) + collection._header = SimpleNamespace(header_size=0x80) + collection._headers = [b"A" * 0x80, b"B" * 0x80] + collection._entries = { + checksum: _PackTextureHeaderCollectionEntry( + texture_header_index=0xFFFF, + collision_start_index=0, + collision_end_index=0, + compressed_block_infos=b"\x00" * 16, + ) + } + collection._hash_collision_entries = { + normalized_path: _PackTextureHeaderCollectionCollisionEntry( + filename_offset=0, + texture_header_index=1, + unknown0=0, + compressed_block_infos=b"\x00" * 16, + ) + } + return collection + + def test_collision_lookup_normalizes_leading_slash(self): + path = "ui/texture/image/worldmap/cd_worldmap_road_sdf_32768x32768_6_8.dds" + normalized_path = f"/{path}" + collection = self._make_collection(normalized_path) + + header = collection.get_file_header(path) + + self.assertEqual(header, b"B" * 0x80) + + def test_collision_lookup_accepts_already_normalized_path(self): + normalized_path = ( + "/ui/texture/image/worldmap/cd_worldmap_road_sdf_32768x32768_6_8.dds" + ) + collection = self._make_collection(normalized_path) + + header = collection.get_file_header(normalized_path) + + self.assertEqual(header, b"B" * 0x80) + + +if __name__ == "__main__": + unittest.main()