From 087b01a328044886aae1560d70632c4eb0eb59cd Mon Sep 17 00:00:00 2001 From: AivanF Date: Fri, 27 Feb 2026 16:41:28 +0300 Subject: [PATCH] MockGoogleAPIClient --- gslides_api/mock/__init__.py | 8 + gslides_api/mock/batch_processor.py | 258 ++++++++++++++++ gslides_api/mock/client.py | 327 ++++++++++++++++++++ gslides_api/mock/snapshots.py | 72 +++++ tests/test_mock_client.py | 451 ++++++++++++++++++++++++++++ 5 files changed, 1116 insertions(+) create mode 100644 gslides_api/mock/__init__.py create mode 100644 gslides_api/mock/batch_processor.py create mode 100644 gslides_api/mock/client.py create mode 100644 gslides_api/mock/snapshots.py create mode 100644 tests/test_mock_client.py diff --git a/gslides_api/mock/__init__.py b/gslides_api/mock/__init__.py new file mode 100644 index 0000000..74e0881 --- /dev/null +++ b/gslides_api/mock/__init__.py @@ -0,0 +1,8 @@ +from gslides_api.mock.client import MockGoogleAPIClient +from gslides_api.mock.snapshots import dump_presentation_snapshot, load_presentation_snapshot + +__all__ = [ + "MockGoogleAPIClient", + "dump_presentation_snapshot", + "load_presentation_snapshot", +] diff --git a/gslides_api/mock/batch_processor.py b/gslides_api/mock/batch_processor.py new file mode 100644 index 0000000..0db0e06 --- /dev/null +++ b/gslides_api/mock/batch_processor.py @@ -0,0 +1,258 @@ +"""Batch request processor for MockGoogleAPIClient. + +Processes Google Slides API batch request dicts and applies mutations +to in-memory presentation state. Returns reply dicts matching the +real Google API response format. +""" + +import copy +import logging +from typing import Any, Callable, Dict, List, Optional + +logger = logging.getLogger(__name__) + +# Request types that create objects and return objectId in the reply +_CREATE_REQUEST_TYPES = { + "createSlide", + "createShape", + "createImage", + "createTable", + "createLine", + "createVideo", + "createSheetsChart", +} + + +def process_batch_requests( + requests: List[List[Dict[str, Any]]], + presentation_id: str, + presentations: Dict[str, dict], + generate_id: Callable[[], str], +) -> Dict[str, Any]: + """Process a list of batch request dicts against in-memory state. + + Args: + requests: List of request lists from GSlidesAPIRequest.to_request(). + Each inner list typically contains one dict like [{"createSlide": {...}}]. + presentation_id: The target presentation ID. + presentations: The shared in-memory presentations store. + generate_id: Callable that returns a new unique ID string. + + Returns: + A dict matching Google Slides API batchUpdate response format: + {"presentationId": "...", "replies": [{...}, ...]} + """ + presentation = presentations.get(presentation_id) + if presentation is None: + raise KeyError( + f"Presentation '{presentation_id}' not found in mock store" + ) + + replies = [] + for request_group in requests: + for request_dict in request_group: + request_type = list(request_dict.keys())[0] + request_body = request_dict[request_type] + reply = _process_single_request( + request_type, request_body, presentation, generate_id + ) + replies.append(reply) + + return {"presentationId": presentation_id, "replies": replies} + + +def _process_single_request( + request_type: str, + body: Dict[str, Any], + presentation: dict, + generate_id: Callable[[], str], +) -> Dict[str, Any]: + """Process a single request and return the reply dict.""" + handler = _HANDLERS.get(request_type) + if handler is not None: + return handler(body, presentation, generate_id) + + # Passthrough: record but don't mutate state + return {} + + +def _handle_create_slide( + body: Dict[str, Any], presentation: dict, generate_id: Callable[[], str] +) -> Dict[str, Any]: + object_id = body.get("objectId") or generate_id() + notes_id = generate_id() + speaker_notes_id = generate_id() + + slide = { + "objectId": object_id, + "pageElements": [], + "slideProperties": { + "layoutObjectId": body.get("slideLayoutReference", {}).get("layoutId"), + "masterObjectId": None, + "notesPage": { + "objectId": notes_id, + "pageElements": [], + "notesProperties": {"speakerNotesObjectId": speaker_notes_id}, + "pageType": "NOTES", + }, + }, + "pageType": "SLIDE", + } + + slides = presentation.setdefault("slides", []) + insertion_index = body.get("insertionIndex") + if insertion_index is not None and insertion_index < len(slides): + slides.insert(insertion_index, slide) + else: + slides.append(slide) + + return {"createSlide": {"objectId": object_id}} + + +def _handle_create_element( + element_type: str, + body: Dict[str, Any], + presentation: dict, + generate_id: Callable[[], str], +) -> Dict[str, Any]: + """Generic handler for element creation requests (shape, image, table, etc.).""" + object_id = body.get("objectId") or generate_id() + + element = { + "objectId": object_id, + "transform": body.get("elementProperties", {}).get("transform", {}), + "size": body.get("elementProperties", {}).get("size", {}), + } + + # Find the target page and add the element + page_id = body.get("elementProperties", {}).get("pageObjectId") + if page_id: + page = _find_page(presentation, page_id) + if page is not None: + page.setdefault("pageElements", []).append(element) + + # The API key in the reply matches the request type + return {element_type: {"objectId": object_id}} + + +def _handle_create_shape(body, presentation, generate_id): + return _handle_create_element("createShape", body, presentation, generate_id) + + +def _handle_create_image(body, presentation, generate_id): + return _handle_create_element("createImage", body, presentation, generate_id) + + +def _handle_create_table(body, presentation, generate_id): + return _handle_create_element("createTable", body, presentation, generate_id) + + +def _handle_create_line(body, presentation, generate_id): + return _handle_create_element("createLine", body, presentation, generate_id) + + +def _handle_create_video(body, presentation, generate_id): + return _handle_create_element("createVideo", body, presentation, generate_id) + + +def _handle_duplicate_object( + body: Dict[str, Any], presentation: dict, generate_id: Callable[[], str] +) -> Dict[str, Any]: + source_id = body["objectId"] + id_mapping = body.get("objectIds") or {} + + # Try to find as a slide first + slides = presentation.get("slides", []) + source_slide = next((s for s in slides if s.get("objectId") == source_id), None) + + if source_slide is not None: + new_slide = copy.deepcopy(source_slide) + new_id = id_mapping.get(source_id) or generate_id() + new_slide["objectId"] = new_id + + # Remap element IDs within the duplicated slide + for element in new_slide.get("pageElements", []): + old_element_id = element.get("objectId") + if old_element_id and old_element_id in id_mapping: + element["objectId"] = id_mapping[old_element_id] + elif old_element_id: + element["objectId"] = generate_id() + + # Insert after the source slide + source_index = slides.index(source_slide) + slides.insert(source_index + 1, new_slide) + return {"duplicateObject": {"objectId": new_id}} + + # Try to find as a page element + for slide in slides: + for element in slide.get("pageElements", []): + if element.get("objectId") == source_id: + new_element = copy.deepcopy(element) + new_id = id_mapping.get(source_id) or generate_id() + new_element["objectId"] = new_id + slide["pageElements"].append(new_element) + return {"duplicateObject": {"objectId": new_id}} + + # Object not found — still return a reply with a generated ID + new_id = id_mapping.get(source_id) or generate_id() + return {"duplicateObject": {"objectId": new_id}} + + +def _handle_delete_object( + body: Dict[str, Any], presentation: dict, generate_id: Callable[[], str] +) -> Dict[str, Any]: + target_id = body["objectId"] + + # Try removing as a slide + slides = presentation.get("slides", []) + presentation["slides"] = [s for s in slides if s.get("objectId") != target_id] + + # Try removing as a page element from all slides + for slide in presentation.get("slides", []): + elements = slide.get("pageElements", []) + slide["pageElements"] = [ + e for e in elements if e.get("objectId") != target_id + ] + + return {} + + +def _handle_update_slides_position( + body: Dict[str, Any], presentation: dict, generate_id: Callable[[], str] +) -> Dict[str, Any]: + slide_ids = set(body.get("slideObjectIds", [])) + insertion_index = body.get("insertionIndex", 0) + + slides = presentation.get("slides", []) + moving = [s for s in slides if s.get("objectId") in slide_ids] + remaining = [s for s in slides if s.get("objectId") not in slide_ids] + + # Clamp insertion index + insertion_index = min(insertion_index, len(remaining)) + presentation["slides"] = ( + remaining[:insertion_index] + moving + remaining[insertion_index:] + ) + return {} + + +def _find_page(presentation: dict, page_id: str) -> Optional[dict]: + """Find a page (slide, layout, or master) by its objectId.""" + for page_list_key in ("slides", "layouts", "masters"): + for page in presentation.get(page_list_key, []): + if page.get("objectId") == page_id: + return page + return None + + +# Handler registry +_HANDLERS = { + "createSlide": _handle_create_slide, + "createShape": _handle_create_shape, + "createImage": _handle_create_image, + "createTable": _handle_create_table, + "createLine": _handle_create_line, + "createVideo": _handle_create_video, + "duplicateObject": _handle_duplicate_object, + "deleteObject": _handle_delete_object, + "updateSlidesPosition": _handle_update_slides_position, +} diff --git a/gslides_api/mock/client.py b/gslides_api/mock/client.py new file mode 100644 index 0000000..3ea0b5e --- /dev/null +++ b/gslides_api/mock/client.py @@ -0,0 +1,327 @@ +"""Mock Google API client for integration testing without real credentials. + +Provides MockGoogleAPIClient, a subclass of GoogleAPIClient that stores +presentations and Drive files in memory. Drop-in replacement wherever +GoogleAPIClient is accepted. + +Usage: + from gslides_api.mock import MockGoogleAPIClient + from gslides_api import Presentation + + mock = MockGoogleAPIClient() + pres = Presentation.create_blank("Test", api_client=mock) + loaded = Presentation.from_id(pres.presentationId, api_client=mock) +""" + +import copy +import logging +import os +from typing import Any, Dict, Optional + +from gslides_api.client import GoogleAPIClient +from gslides_api.domain.domain import ThumbnailProperties +from gslides_api.mock.batch_processor import process_batch_requests +from gslides_api.request.parent import GSlidesAPIRequest +from gslides_api.request.request import DeleteObjectRequest, DuplicateObjectRequest +from gslides_api.response import ImageThumbnail + +logger = logging.getLogger(__name__) + +# Default page size matching Google Slides default (widescreen 16:9) +_DEFAULT_PAGE_SIZE = { + "width": {"magnitude": 9144000, "unit": "EMU"}, + "height": {"magnitude": 5143500, "unit": "EMU"}, +} + + +class MockGoogleAPIClient(GoogleAPIClient): + """In-memory mock of GoogleAPIClient for testing. + + Stores presentations and Drive files in memory. Supports all + GoogleAPIClient methods without requiring Google credentials. + + The batch system (batch_update / flush_batch_update) works identically + to the real client — requests are queued and flushed — but flush + processes them against in-memory state instead of calling Google. + + Extra test helpers: + seed_presentation(id, json_dict) — preload a presentation snapshot + get_batch_log() — inspect all processed batch requests + """ + + def __init__( + self, + auto_flush: bool = True, + *, + _shared_state: Optional[dict] = None, + ) -> None: + # Initialize parent's batch state and attributes. + # Pass n_backoffs=0 so the backoff decorator is a no-op if somehow invoked. + super().__init__(auto_flush=auto_flush, initial_wait_s=0, n_backoffs=0) + + if _shared_state is not None: + # Child client: share stores with parent + self._state = _shared_state + else: + self._state = { + "presentations": {}, # id -> presentation JSON dict + "files": {}, # id -> file metadata dict + "id_counter": 0, + "batch_log": [], # list of (presentation_id, request_dicts) + } + + def _generate_id(self) -> str: + self._state["id_counter"] += 1 + return f"mock_id_{self._state['id_counter']}" + + # ── Properties ────────────────────────────────────────────────────── + + @property + def is_initialized(self) -> bool: + return True + + # ── Credential methods (no-ops) ───────────────────────────────────── + + def set_credentials(self, credentials=None) -> None: + pass + + def initialize_credentials(self, credential_location: str) -> None: + pass + + # ── Child client ──────────────────────────────────────────────────── + + def create_child_client(self, auto_flush: bool = False) -> "MockGoogleAPIClient": + return MockGoogleAPIClient( + auto_flush=auto_flush, + _shared_state=self._state, + ) + + # ── Batch system ──────────────────────────────────────────────────── + # batch_update() is inherited from GoogleAPIClient — it queues + # requests and calls flush_batch_update() when appropriate. + + def flush_batch_update(self) -> Dict[str, Any]: + if not self.pending_batch_requests: + return {} + + re_requests = [r.to_request() for r in self.pending_batch_requests] + presentation_id = self.pending_presentation_id + + # Log for test assertions + self._state["batch_log"].append( + {"presentation_id": presentation_id, "requests": re_requests} + ) + + result = process_batch_requests( + requests=re_requests, + presentation_id=presentation_id, + presentations=self._state["presentations"], + generate_id=self._generate_id, + ) + + self.pending_batch_requests = [] + self.pending_presentation_id = None + return result + + # ── Slides API methods ────────────────────────────────────────────── + + def create_presentation(self, config: dict) -> str: + self.flush_batch_update() + pres_id = self._generate_id() + + # Build a minimal but valid presentation JSON + default_slide_id = self._generate_id() + notes_id = self._generate_id() + speaker_notes_id = self._generate_id() + + presentation = { + "presentationId": pres_id, + "title": config.get("title", "Untitled"), + "pageSize": config.get("pageSize", copy.deepcopy(_DEFAULT_PAGE_SIZE)), + "slides": config.get("slides", [ + { + "objectId": default_slide_id, + "pageElements": [], + "slideProperties": { + "layoutObjectId": None, + "masterObjectId": None, + "notesPage": { + "objectId": notes_id, + "pageElements": [], + "notesProperties": { + "speakerNotesObjectId": speaker_notes_id + }, + "pageType": "NOTES", + }, + }, + "pageType": "SLIDE", + } + ]), + "layouts": config.get("layouts", []), + "masters": config.get("masters", []), + } + + self._state["presentations"][pres_id] = presentation + return pres_id + + def get_presentation_json(self, presentation_id: str) -> Dict[str, Any]: + self.flush_batch_update() + presentation = self._state["presentations"].get(presentation_id) + if presentation is None: + raise KeyError( + f"Presentation '{presentation_id}' not found in mock store" + ) + return copy.deepcopy(presentation) + + def get_slide_json(self, presentation_id: str, slide_id: str) -> Dict[str, Any]: + self.flush_batch_update() + presentation = self._state["presentations"].get(presentation_id) + if presentation is None: + raise KeyError( + f"Presentation '{presentation_id}' not found in mock store" + ) + + for slide in presentation.get("slides", []): + if slide.get("objectId") == slide_id: + return copy.deepcopy(slide) + + raise KeyError( + f"Slide '{slide_id}' not found in presentation '{presentation_id}'" + ) + + def duplicate_object( + self, + object_id: str, + presentation_id: str, + id_map: Dict[str, str] | None = None, + ) -> str: + request = DuplicateObjectRequest(objectId=object_id, objectIds=id_map) + + if id_map is not None and object_id in id_map: + self.batch_update([request], presentation_id, flush=False) + return id_map[object_id] + + out = self.batch_update([request], presentation_id, flush=True) + new_object_id = out["replies"][-1]["duplicateObject"]["objectId"] + return new_object_id + + def delete_object(self, object_id: str, presentation_id: str) -> None: + request = DeleteObjectRequest(objectId=object_id) + self.batch_update([request], presentation_id, flush=False) + + def slide_thumbnail( + self, presentation_id: str, slide_id: str, properties: ThumbnailProperties + ) -> ImageThumbnail: + self.flush_batch_update() + # Verify the presentation and slide exist + self.get_slide_json(presentation_id, slide_id) + return ImageThumbnail( + contentUrl=f"https://mock.test/thumbnail/{presentation_id}/{slide_id}", + width=1600, + height=900, + ) + + # ── Drive API methods ─────────────────────────────────────────────── + + def copy_presentation(self, presentation_id, copy_title, folder_id=None): + self.flush_batch_update() + source = self._state["presentations"].get(presentation_id) + if source is None: + raise KeyError( + f"Presentation '{presentation_id}' not found in mock store" + ) + + new_id = self._generate_id() + new_pres = copy.deepcopy(source) + new_pres["presentationId"] = new_id + new_pres["title"] = copy_title + self._state["presentations"][new_id] = new_pres + + file_meta = {"id": new_id, "name": copy_title} + if folder_id: + file_meta["parents"] = [folder_id] + self._state["files"][new_id] = file_meta + + return file_meta + + def create_folder(self, folder_name, parent_folder_id=None, ignore_existing=False): + self.flush_batch_update() + + if ignore_existing: + for file_meta in self._state["files"].values(): + if ( + file_meta.get("name") == folder_name + and file_meta.get("mimeType") == "application/vnd.google-apps.folder" + ): + parent_match = ( + parent_folder_id is None + or parent_folder_id in file_meta.get("parents", []) + ) + if parent_match: + return {"id": file_meta["id"], "name": file_meta["name"]} + + folder_id = self._generate_id() + file_meta = { + "id": folder_id, + "name": folder_name, + "mimeType": "application/vnd.google-apps.folder", + } + if parent_folder_id: + file_meta["parents"] = [parent_folder_id] + self._state["files"][folder_id] = file_meta + return {"id": folder_id, "name": folder_name} + + def delete_file(self, file_id): + self.flush_batch_update() + self._state["files"].pop(file_id, None) + self._state["presentations"].pop(file_id, None) + return {} + + def upload_image_to_drive(self, image_path, folder_id: str | None = None) -> str: + supported_formats = {".png", ".jpg", ".jpeg", ".gif"} + ext = os.path.splitext(image_path)[1].lower() + if ext not in supported_formats: + raise ValueError( + f"Unsupported image format '{ext}'. " + f"Supported formats are: {', '.join(supported_formats)}" + ) + + file_id = self._generate_id() + file_meta = { + "id": file_id, + "name": os.path.basename(image_path), + "mimeType": f"image/{ext.lstrip('.')}", + } + if folder_id: + file_meta["parents"] = [folder_id] + self._state["files"][file_id] = file_meta + return f"https://drive.google.com/uc?id={file_id}" + + # ── Test helpers ──────────────────────────────────────────────────── + + def seed_presentation(self, presentation_id: str, json_dict: dict) -> None: + """Load a presentation JSON dict into the mock store. + + Useful for seeding the mock with a snapshot captured from a real + Google Slides API response. + + Args: + presentation_id: The ID to use for storage (overrides any + presentationId in the dict). + json_dict: A presentation JSON dict as returned by + get_presentation_json() on a real client. + """ + data = copy.deepcopy(json_dict) + data["presentationId"] = presentation_id + self._state["presentations"][presentation_id] = data + + def get_batch_log(self) -> list: + """Return the log of all processed batch requests. + + Each entry is a dict with: + - "presentation_id": str + - "requests": the raw request dicts sent to batchUpdate + + Useful for asserting that the correct API requests were generated. + """ + return list(self._state["batch_log"]) diff --git a/gslides_api/mock/snapshots.py b/gslides_api/mock/snapshots.py new file mode 100644 index 0000000..44c67c7 --- /dev/null +++ b/gslides_api/mock/snapshots.py @@ -0,0 +1,72 @@ +"""Utilities for saving and loading presentation snapshots. + +Snapshots are JSON files captured from real Google Slides API responses. +They can be loaded into a MockGoogleAPIClient for reproducible testing. + +Usage: + # Capture a snapshot from a real presentation: + from gslides_api import GoogleAPIClient + from gslides_api.mock.snapshots import dump_presentation_snapshot + + client = GoogleAPIClient() + client.initialize_credentials("/path/to/creds") + dump_presentation_snapshot(client, "real-presentation-id", "tests/snapshots/my_pres.json") + + # Load into a mock for testing: + from gslides_api.mock import MockGoogleAPIClient, load_presentation_snapshot + + mock = MockGoogleAPIClient() + load_presentation_snapshot(mock, "tests/snapshots/my_pres.json") +""" + +import json +from typing import Optional + +from gslides_api.client import GoogleAPIClient +from gslides_api.mock.client import MockGoogleAPIClient + + +def dump_presentation_snapshot( + client: GoogleAPIClient, + presentation_id: str, + output_path: str, +) -> None: + """Fetch a presentation from Google and save its JSON to a file. + + Args: + client: An initialized GoogleAPIClient (real, with credentials). + presentation_id: The ID of the presentation to snapshot. + output_path: File path to write the JSON to. + """ + data = client.get_presentation_json(presentation_id) + with open(output_path, "w") as f: + json.dump(data, f, indent=2) + + +def load_presentation_snapshot( + mock_client: MockGoogleAPIClient, + snapshot_path: str, + presentation_id: Optional[str] = None, +) -> str: + """Load a JSON snapshot file into a MockGoogleAPIClient. + + Args: + mock_client: The mock client to load the snapshot into. + snapshot_path: Path to the JSON snapshot file. + presentation_id: Override the presentation ID. If None, uses the + presentationId from the snapshot JSON. + + Returns: + The presentation ID used for storage. + """ + with open(snapshot_path) as f: + data = json.load(f) + + pres_id = presentation_id or data.get("presentationId") + if pres_id is None: + raise ValueError( + "No presentation_id provided and snapshot JSON has no 'presentationId' field" + ) + + mock_client.seed_presentation(pres_id, data) + return pres_id diff --git a/tests/test_mock_client.py b/tests/test_mock_client.py new file mode 100644 index 0000000..91cb1de --- /dev/null +++ b/tests/test_mock_client.py @@ -0,0 +1,451 @@ +"""Tests for MockGoogleAPIClient.""" + +import json +import os +import tempfile + +import pytest + +from gslides_api.mock import ( + MockGoogleAPIClient, + dump_presentation_snapshot, + load_presentation_snapshot, +) +from gslides_api.mock.batch_processor import process_batch_requests +from gslides_api.presentation import Presentation +from gslides_api.request.request import ( + CreateSlideRequest, + DeleteObjectRequest, + DuplicateObjectRequest, + InsertTextRequest, + UpdateTextStyleRequest, +) + + +# ── Basic client operations ───────────────────────────────────────────── + + +class TestMockClientBasics: + def test_is_initialized(self): + mock = MockGoogleAPIClient() + assert mock.is_initialized is True + + def test_set_credentials_noop(self): + mock = MockGoogleAPIClient() + mock.set_credentials(None) + assert mock.is_initialized is True + + def test_initialize_credentials_noop(self): + mock = MockGoogleAPIClient() + mock.initialize_credentials("/nonexistent/path") + assert mock.is_initialized is True + + def test_create_presentation(self): + mock = MockGoogleAPIClient() + pres_id = mock.create_presentation({"title": "Test Presentation"}) + assert pres_id is not None + assert isinstance(pres_id, str) + + def test_get_presentation_json(self): + mock = MockGoogleAPIClient() + pres_id = mock.create_presentation({"title": "My Deck"}) + data = mock.get_presentation_json(pres_id) + assert data["presentationId"] == pres_id + assert data["title"] == "My Deck" + assert len(data["slides"]) == 1 # default blank slide + + def test_get_presentation_not_found(self): + mock = MockGoogleAPIClient() + with pytest.raises(KeyError, match="not found"): + mock.get_presentation_json("nonexistent") + + def test_get_slide_json(self): + mock = MockGoogleAPIClient() + pres_id = mock.create_presentation({"title": "Test"}) + pres = mock.get_presentation_json(pres_id) + slide_id = pres["slides"][0]["objectId"] + + slide = mock.get_slide_json(pres_id, slide_id) + assert slide["objectId"] == slide_id + + def test_get_slide_not_found(self): + mock = MockGoogleAPIClient() + pres_id = mock.create_presentation({"title": "Test"}) + with pytest.raises(KeyError, match="not found"): + mock.get_slide_json(pres_id, "nonexistent_slide") + + def test_create_presentation_returns_unique_ids(self): + mock = MockGoogleAPIClient() + id1 = mock.create_presentation({"title": "A"}) + id2 = mock.create_presentation({"title": "B"}) + assert id1 != id2 + + +# ── Drive operations ──────────────────────────────────────────────────── + + +class TestMockDriveOperations: + def test_copy_presentation(self): + mock = MockGoogleAPIClient() + pres_id = mock.create_presentation({"title": "Original"}) + result = mock.copy_presentation(pres_id, "Copy of Original") + + assert "id" in result + assert result["name"] == "Copy of Original" + + # The copy should be loadable + copy_data = mock.get_presentation_json(result["id"]) + assert copy_data["title"] == "Copy of Original" + + def test_copy_presentation_not_found(self): + mock = MockGoogleAPIClient() + with pytest.raises(KeyError): + mock.copy_presentation("nonexistent", "copy") + + def test_create_folder(self): + mock = MockGoogleAPIClient() + result = mock.create_folder("My Folder") + assert "id" in result + assert result["name"] == "My Folder" + + def test_create_folder_ignore_existing(self): + mock = MockGoogleAPIClient() + result1 = mock.create_folder("Shared") + result2 = mock.create_folder("Shared", ignore_existing=True) + assert result1["id"] == result2["id"] + + def test_create_folder_no_ignore(self): + mock = MockGoogleAPIClient() + result1 = mock.create_folder("Shared") + result2 = mock.create_folder("Shared", ignore_existing=False) + assert result1["id"] != result2["id"] + + def test_delete_file(self): + mock = MockGoogleAPIClient() + pres_id = mock.create_presentation({"title": "To Delete"}) + mock.delete_file(pres_id) + with pytest.raises(KeyError): + mock.get_presentation_json(pres_id) + + def test_upload_image_to_drive(self): + mock = MockGoogleAPIClient() + # Just checks the URL format and validation — doesn't need a real file + url = mock.upload_image_to_drive("/fake/path/image.png") + assert url.startswith("https://drive.google.com/uc?id=") + + def test_upload_image_unsupported_format(self): + mock = MockGoogleAPIClient() + with pytest.raises(ValueError, match="Unsupported"): + mock.upload_image_to_drive("/fake/path/file.bmp") + + +# ── Batch operations ──────────────────────────────────────────────────── + + +class TestMockBatchOperations: + def test_create_slide_via_batch(self): + mock = MockGoogleAPIClient() + pres_id = mock.create_presentation({"title": "Test"}) + + request = CreateSlideRequest() + result = mock.batch_update([request], pres_id) + + assert "replies" in result + new_slide_id = result["replies"][0]["createSlide"]["objectId"] + assert new_slide_id is not None + + # Verify the slide was added + pres = mock.get_presentation_json(pres_id) + slide_ids = [s["objectId"] for s in pres["slides"]] + assert new_slide_id in slide_ids + + def test_duplicate_object(self): + mock = MockGoogleAPIClient() + pres_id = mock.create_presentation({"title": "Test"}) + pres = mock.get_presentation_json(pres_id) + slide_id = pres["slides"][0]["objectId"] + + new_id = mock.duplicate_object(slide_id, pres_id) + assert new_id != slide_id + + # Should now have 2 slides + pres = mock.get_presentation_json(pres_id) + assert len(pres["slides"]) == 2 + + def test_duplicate_object_with_id_map(self): + mock = MockGoogleAPIClient() + pres_id = mock.create_presentation({"title": "Test"}) + pres = mock.get_presentation_json(pres_id) + slide_id = pres["slides"][0]["objectId"] + + id_map = {slide_id: "my_custom_id"} + new_id = mock.duplicate_object(slide_id, pres_id, id_map=id_map) + assert new_id == "my_custom_id" + + def test_delete_object(self): + mock = MockGoogleAPIClient() + pres_id = mock.create_presentation({"title": "Test"}) + + # Add a second slide, then delete the first + request = CreateSlideRequest() + result = mock.batch_update([request], pres_id) + new_slide_id = result["replies"][0]["createSlide"]["objectId"] + + pres = mock.get_presentation_json(pres_id) + original_slide_id = pres["slides"][0]["objectId"] + + mock.delete_object(original_slide_id, pres_id) + mock.flush_batch_update() + + pres = mock.get_presentation_json(pres_id) + assert len(pres["slides"]) == 1 + assert pres["slides"][0]["objectId"] == new_slide_id + + def test_passthrough_requests_recorded_in_log(self): + mock = MockGoogleAPIClient() + pres_id = mock.create_presentation({"title": "Test"}) + + request = InsertTextRequest( + objectId="some_element", text="Hello", insertionIndex=0 + ) + mock.batch_update([request], pres_id) + + log = mock.get_batch_log() + assert len(log) > 0 + # Find the entry with our InsertText request + last_entry = log[-1] + assert last_entry["presentation_id"] == pres_id + + def test_auto_flush_false(self): + mock = MockGoogleAPIClient(auto_flush=False) + pres_id = mock.create_presentation({"title": "Test"}) + + request = CreateSlideRequest() + result = mock.batch_update([request], pres_id) + # With auto_flush=False, batch_update returns empty dict + assert result == {} + assert len(mock.pending_batch_requests) == 1 + + # Manual flush + result = mock.flush_batch_update() + assert "replies" in result + assert len(mock.pending_batch_requests) == 0 + + +# ── Child client ──────────────────────────────────────────────────────── + + +class TestMockChildClient: + def test_child_shares_state(self): + parent = MockGoogleAPIClient() + pres_id = parent.create_presentation({"title": "Shared"}) + + child = parent.create_child_client(auto_flush=False) + assert isinstance(child, MockGoogleAPIClient) + + # Child can see parent's presentations + data = child.get_presentation_json(pres_id) + assert data["title"] == "Shared" + + def test_child_has_isolated_batch_state(self): + parent = MockGoogleAPIClient(auto_flush=False) + pres_id = parent.create_presentation({"title": "Test"}) + + child = parent.create_child_client(auto_flush=False) + + request = CreateSlideRequest() + child.batch_update([request], pres_id) + assert len(child.pending_batch_requests) == 1 + assert len(parent.pending_batch_requests) == 0 + + def test_child_mutations_visible_to_parent(self): + parent = MockGoogleAPIClient() + pres_id = parent.create_presentation({"title": "Test"}) + + child = parent.create_child_client(auto_flush=True) + child.batch_update([CreateSlideRequest()], pres_id) + + # Parent should see the new slide + pres = parent.get_presentation_json(pres_id) + assert len(pres["slides"]) == 2 + + +# ── Integration with Presentation model ───────────────────────────────── + + +class TestMockWithPresentation: + def test_create_blank_presentation(self): + mock = MockGoogleAPIClient() + pres = Presentation.create_blank("Integration Test", api_client=mock) + assert pres.presentationId is not None + assert pres.title == "Integration Test" + + def test_from_id_round_trip(self): + mock = MockGoogleAPIClient() + pres = Presentation.create_blank("Round Trip", api_client=mock) + loaded = Presentation.from_id(pres.presentationId, api_client=mock) + assert loaded.presentationId == pres.presentationId + assert loaded.title == "Round Trip" + + def test_copy_via_drive(self): + mock = MockGoogleAPIClient() + pres = Presentation.create_blank("Original", api_client=mock) + copy_pres = pres.copy_via_drive("The Copy", api_client=mock) + assert copy_pres.title == "The Copy" + assert copy_pres.presentationId != pres.presentationId + + def test_delete_slide(self): + mock = MockGoogleAPIClient() + pres = Presentation.create_blank("Delete Test", api_client=mock) + + # Add a second slide + request = CreateSlideRequest() + result = mock.batch_update([request], pres.presentationId) + new_slide_id = result["replies"][0]["createSlide"]["objectId"] + + original_slide_id = pres.slides[0].objectId + pres.delete_slide(original_slide_id, api_client=mock) + mock.flush_batch_update() + + reloaded = Presentation.from_id(pres.presentationId, api_client=mock) + assert len(reloaded.slides) == 1 + assert reloaded.slides[0].objectId == new_slide_id + + +# ── Snapshot utilities ────────────────────────────────────────────────── + + +class TestSnapshots: + def test_seed_presentation(self): + mock = MockGoogleAPIClient() + snapshot = { + "presentationId": "original_id", + "title": "Snapshot Test", + "pageSize": { + "width": {"magnitude": 9144000, "unit": "EMU"}, + "height": {"magnitude": 5143500, "unit": "EMU"}, + }, + "slides": [ + { + "objectId": "slide_1", + "pageElements": [], + "slideProperties": { + "notesPage": { + "objectId": "notes_1", + "pageElements": [], + "notesProperties": { + "speakerNotesObjectId": "speaker_1" + }, + "pageType": "NOTES", + } + }, + } + ], + "layouts": [], + "masters": [], + } + + mock.seed_presentation("my_pres", snapshot) + data = mock.get_presentation_json("my_pres") + assert data["title"] == "Snapshot Test" + assert data["presentationId"] == "my_pres" + assert len(data["slides"]) == 1 + + def test_load_and_dump_snapshot(self): + # Create a mock with data, "dump" it, then load into a fresh mock + source_mock = MockGoogleAPIClient() + pres_id = source_mock.create_presentation({"title": "Snapshot Source"}) + + with tempfile.NamedTemporaryFile( + mode="w", suffix=".json", delete=False + ) as f: + tmp_path = f.name + + try: + # dump_presentation_snapshot works with any client that has + # get_presentation_json — including MockGoogleAPIClient + dump_presentation_snapshot(source_mock, pres_id, tmp_path) + + # Verify JSON was written + with open(tmp_path) as f: + saved = json.load(f) + assert saved["title"] == "Snapshot Source" + + # Load into a fresh mock + target_mock = MockGoogleAPIClient() + loaded_id = load_presentation_snapshot(target_mock, tmp_path) + assert loaded_id == pres_id + + data = target_mock.get_presentation_json(loaded_id) + assert data["title"] == "Snapshot Source" + finally: + os.unlink(tmp_path) + + def test_load_snapshot_with_custom_id(self): + source_mock = MockGoogleAPIClient() + pres_id = source_mock.create_presentation({"title": "Custom ID"}) + + with tempfile.NamedTemporaryFile( + mode="w", suffix=".json", delete=False + ) as f: + tmp_path = f.name + + try: + dump_presentation_snapshot(source_mock, pres_id, tmp_path) + + target_mock = MockGoogleAPIClient() + loaded_id = load_presentation_snapshot( + target_mock, tmp_path, presentation_id="custom_id" + ) + assert loaded_id == "custom_id" + data = target_mock.get_presentation_json("custom_id") + assert data["presentationId"] == "custom_id" + finally: + os.unlink(tmp_path) + + +# ── Batch processor edge cases ────────────────────────────────────────── + + +class TestBatchProcessorEdgeCases: + def test_delete_nonexistent_object(self): + """Deleting a nonexistent object should not raise.""" + mock = MockGoogleAPIClient() + pres_id = mock.create_presentation({"title": "Test"}) + mock.delete_object("nonexistent", pres_id) + mock.flush_batch_update() # should not raise + + def test_update_slides_position(self): + mock = MockGoogleAPIClient() + pres_id = mock.create_presentation({"title": "Test"}) + pres = mock.get_presentation_json(pres_id) + first_slide_id = pres["slides"][0]["objectId"] + + # Add a second slide + result = mock.batch_update([CreateSlideRequest()], pres_id) + second_slide_id = result["replies"][0]["createSlide"]["objectId"] + + # Move second slide to position 0 + from gslides_api.request.request import UpdateSlidesPositionRequest + + request = UpdateSlidesPositionRequest( + slideObjectIds=[second_slide_id], insertionIndex=0 + ) + mock.batch_update([request], pres_id) + + pres = mock.get_presentation_json(pres_id) + assert pres["slides"][0]["objectId"] == second_slide_id + assert pres["slides"][1]["objectId"] == first_slide_id + + def test_slide_thumbnail_stub(self): + mock = MockGoogleAPIClient() + pres_id = mock.create_presentation({"title": "Test"}) + pres = mock.get_presentation_json(pres_id) + slide_id = pres["slides"][0]["objectId"] + + from gslides_api.domain.domain import ThumbnailProperties + + thumb = mock.slide_thumbnail(pres_id, slide_id, ThumbnailProperties()) + assert thumb.contentUrl.startswith("https://mock.test/thumbnail/") + assert thumb.width > 0 + assert thumb.height > 0