Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 10 additions & 3 deletions src/pycrimson/_context.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ class PackageContext:
_packs: dict[str, PackMeta]
_paz_handle_cache: dict
_pack_group_whitelist: list[str] | None
_pthc: PackTextureHeaderCollection | None

def __init__(
self, base_directory: Path, pack_group_whitelist: list[str] | None = None
Expand All @@ -26,14 +27,18 @@ def __init__(
self._packs = {}
self._paz_handle_cache = {}
self._pack_group_whitelist = pack_group_whitelist
self._pthc = None

self._parse_pack_meta()
self._parse_texture_header_collection()

def _parse_texture_header_collection(self):
def _get_texture_header_collection(self) -> PackTextureHeaderCollection | None:
if self._pthc is not None:
return self._pthc

pthc_file = self._base_path / "meta" / "0.pathc"
if pthc_file.exists():
self._pthc = PackTextureHeaderCollection.from_file(pthc_file)
return self._pthc

def _parse_pack_meta(self):
papgt_file = self._base_path / "meta" / "0.papgt"
Expand Down Expand Up @@ -177,7 +182,9 @@ def get_file(self, path: str) -> bytes | None:
and is_dds_file
and entry.compressed_size != entry.uncompressed_size
):
header = self._pthc.get_file_header(path)
pthc = self._get_texture_header_collection()
assert pthc is not None
header = pthc.get_file_header(path)
data = self._handle_partial_texture(data, header)

return data
Expand Down
13 changes: 7 additions & 6 deletions src/pycrimson/_files/_pathc.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
custom,
static_length,
)
from bier.EndianedBinaryIO import EndianedReaderIOBase, EndianedFileIO, EndianedBytesIO
from bier.EndianedBinaryIO import EndianedReaderIOBase, EndianedBytesIO

from .. import _crypto

Expand Down Expand Up @@ -86,13 +86,14 @@ def __init__(self, reader: EndianedReaderIOBase):

@classmethod
def from_file(cls, path: Path):
with EndianedFileIO(path, "rb") as f:
# load entire file before parsing; streaming tiny reads can be very slow on some filesystems
data = path.read_bytes()
with EndianedBytesIO(data) as f:
return cls(f)

def get_file_header(self, path: str) -> bytes:
checksum = _crypto.calculate_checksum(
f"/{path}" if not path.startswith("/") else path
)
normalized_path = f"/{path}" if not path.startswith("/") else path
checksum = _crypto.calculate_checksum(normalized_path)

entry = self._entries.get(checksum)
assert entry is not None
Expand All @@ -101,7 +102,7 @@ def get_file_header(self, path: str) -> bytes:
header = self._headers[entry.texture_header_index]
compressed_block_infos = entry.compressed_block_infos
else:
collision_entry = self._hash_collision_entries.get(path)
collision_entry = self._hash_collision_entries.get(normalized_path)
assert collision_entry is not None

header = self._headers[collision_entry.texture_header_index]
Expand Down
39 changes: 39 additions & 0 deletions tests/test_context.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
import unittest
from pathlib import Path
from unittest.mock import patch

from pycrimson._context import PackageContext


class TestPackageContext(unittest.TestCase):
def test_texture_headers_are_loaded_lazily(self):
with patch.object(PackageContext, "_parse_pack_meta") as parse_pack_meta:
with patch(
"pycrimson._context.PackTextureHeaderCollection.from_file"
) as from_file:
PackageContext(Path("/game"))

parse_pack_meta.assert_called_once_with()
from_file.assert_not_called()

def test_texture_headers_are_cached_after_first_load(self):
ctx = PackageContext.__new__(PackageContext)
ctx._base_path = Path("/game")
ctx._pthc = None

fake_collection = object()
expected_path = Path("/game/meta/0.pathc")

with patch("pathlib.Path.exists", return_value=True):
with patch(
"pycrimson._context.PackTextureHeaderCollection.from_file",
return_value=fake_collection,
) as from_file:
self.assertIs(ctx._get_texture_header_collection(), fake_collection)
self.assertIs(ctx._get_texture_header_collection(), fake_collection)

from_file.assert_called_once_with(expected_path)


if __name__ == "__main__":
unittest.main()
58 changes: 58 additions & 0 deletions tests/test_pathc.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
import unittest
from types import SimpleNamespace

from pycrimson._crypto import calculate_checksum
from pycrimson._files._pathc import (
PackTextureHeaderCollection,
_PackTextureHeaderCollectionCollisionEntry,
_PackTextureHeaderCollectionEntry,
)


class TestPackTextureHeaderCollection(unittest.TestCase):
def _make_collection(self, normalized_path: str) -> PackTextureHeaderCollection:
checksum = calculate_checksum(normalized_path)

collection = PackTextureHeaderCollection.__new__(PackTextureHeaderCollection)
collection._header = SimpleNamespace(header_size=0x80)
collection._headers = [b"A" * 0x80, b"B" * 0x80]
collection._entries = {
checksum: _PackTextureHeaderCollectionEntry(
texture_header_index=0xFFFF,
collision_start_index=0,
collision_end_index=0,
compressed_block_infos=b"\x00" * 16,
)
}
collection._hash_collision_entries = {
normalized_path: _PackTextureHeaderCollectionCollisionEntry(
filename_offset=0,
texture_header_index=1,
unknown0=0,
compressed_block_infos=b"\x00" * 16,
)
}
return collection

def test_collision_lookup_normalizes_leading_slash(self):
path = "ui/texture/image/worldmap/cd_worldmap_road_sdf_32768x32768_6_8.dds"
normalized_path = f"/{path}"
collection = self._make_collection(normalized_path)

header = collection.get_file_header(path)

self.assertEqual(header, b"B" * 0x80)

def test_collision_lookup_accepts_already_normalized_path(self):
normalized_path = (
"/ui/texture/image/worldmap/cd_worldmap_road_sdf_32768x32768_6_8.dds"
)
collection = self._make_collection(normalized_path)

header = collection.get_file_header(normalized_path)

self.assertEqual(header, b"B" * 0x80)


if __name__ == "__main__":
unittest.main()