diff --git a/README.md b/README.md index 9e6310e6..916fef73 100644 --- a/README.md +++ b/README.md @@ -76,7 +76,7 @@ api.devices The `icloud` command line interface is organized around top-level subcommands such as `auth`, `account`, `devices`, `calendar`, -`contacts`, `drive`, `photos`, and `hidemyemail`. +`contacts`, `drive`, `photos`, `hidemyemail`, `notes`, and `reminders`. Command options belong on the final command that uses them. For example: @@ -1134,6 +1134,66 @@ rules, and delete flows against a real iCloud account. [`example_reminders_delta.py`](example_reminders_delta.py) is a smaller live validator focused on `sync_cursor()` and `iter_changes(since=...)`. +### Reminders CLI + +The official Typer CLI exposes `icloud reminders ...` for common read and write +flows. + +_List reminder lists and open reminders:_ + +```bash +icloud reminders lists --username you@example.com +icloud reminders list --username you@example.com +icloud reminders list --username you@example.com --list-id INBOX --include-completed +``` + +`icloud reminders list` defaults to open reminders only. Use +`--include-completed` to include completed reminders, and `--list-id` to scope +the query to one list. + +_Get, create, update, and delete reminders:_ + +```bash +icloud reminders get REMINDER_ID --username you@example.com +icloud reminders create --username you@example.com --list-id INBOX --title "Buy milk" +icloud reminders update REMINDER_ID --username you@example.com --title "Buy oat milk" +icloud reminders set-status REMINDER_ID --username you@example.com --completed +icloud reminders delete REMINDER_ID --username you@example.com +``` + +_Inspect snapshots and incremental changes:_ + +```bash +icloud reminders snapshot --username you@example.com --list-id INBOX +icloud reminders changes --username you@example.com --since PREVIOUS_CURSOR +icloud reminders sync-cursor --username you@example.com +``` + +_Work with reminder sub-records:_ + +```bash +icloud reminders alarm add-location REMINDER_ID \ + --username you@example.com \ + --title "Office" \ + --address "1 Infinite Loop, Cupertino, CA" \ + --latitude 37.3318 \ + --longitude -122.0312 + +icloud reminders hashtag create REMINDER_ID errands --username you@example.com +icloud reminders attachment create-url REMINDER_ID \ + --username you@example.com \ + --url https://example.com/checklist +icloud reminders recurrence create REMINDER_ID \ + --username you@example.com \ + --frequency weekly \ + --interval 1 +``` + +The reminder CLI is organized as core commands plus `alarm`, `hashtag`, +`attachment`, and `recurrence` subgroups. Hashtag rename is exposed through +`icloud reminders hashtag update`, but Apple’s web app may still treat hashtag +names as effectively read-only in some live flows. + ## Notes You can access your iCloud Notes through the `notes` property: @@ -1260,6 +1320,54 @@ Notes caveats: - `api.notes.raw` is available for advanced/debug workflows, but it is not the primary Notes API surface. +### Notes CLI + +The official Typer CLI exposes `icloud notes ...` for recent-note inspection, +folder browsing, title-based search, HTML rendering, and note-id-based export. + +_List recent notes, folders, or one folder’s notes:_ + +```bash +icloud notes recent --username you@example.com +icloud notes folders --username you@example.com +icloud notes list --username you@example.com --folder-id FOLDER_ID +icloud notes list --username you@example.com --all --since PREVIOUS_CURSOR +``` + +_Search notes by title:_ + +```bash +icloud notes search --username you@example.com --title "Daily Plan" +icloud notes search --username you@example.com --title-contains "meeting" +``` + +`icloud notes search` is the official title-filter workflow. It uses a +recents-first search strategy and falls back to a full feed scan when needed. + +_Fetch, render, and export one note by id:_ + +```bash +icloud notes get NOTE_ID --username you@example.com --with-attachments +icloud notes render NOTE_ID --username you@example.com --preview-appearance dark +icloud notes export NOTE_ID \ + --username you@example.com \ + --output-dir ./exports/notes_html \ + --export-mode archival \ + --assets-dir ./exports/assets +``` + +`icloud notes export` stays explicit by note id. Title filters are intentionally +handled by `icloud notes search` rather than by bulk export flags. + +_Inspect incremental changes:_ + +```bash +icloud notes changes --username you@example.com --since PREVIOUS_CURSOR +icloud notes sync-cursor --username you@example.com +``` + +### Notes CLI Example + [`examples/notes_cli.py`](examples/notes_cli.py) is a local developer utility built on top of `api.notes`. It is useful for searching notes, inspecting the rendering pipeline, and exporting HTML, but its selection heuristics and debug diff --git a/pyicloud/cli/app.py b/pyicloud/cli/app.py index 9052954a..b8626327 100644 --- a/pyicloud/cli/app.py +++ b/pyicloud/cli/app.py @@ -14,7 +14,9 @@ from pyicloud.cli.commands.devices import app as devices_app from pyicloud.cli.commands.drive import app as drive_app from pyicloud.cli.commands.hidemyemail import app as hidemyemail_app +from pyicloud.cli.commands.notes import app as notes_app from pyicloud.cli.commands.photos import app as photos_app +from pyicloud.cli.commands.reminders import app as reminders_app from pyicloud.cli.context import CLIAbort app = typer.Typer( @@ -87,6 +89,12 @@ def root_callback( invoke_without_command=True, callback=_group_root, ) +app.add_typer( + reminders_app, name="reminders", invoke_without_command=True, callback=_group_root +) +app.add_typer( + notes_app, name="notes", invoke_without_command=True, callback=_group_root +) def main() -> int: diff --git a/pyicloud/cli/commands/notes.py b/pyicloud/cli/commands/notes.py new file mode 100644 index 00000000..ff6748b7 --- /dev/null +++ b/pyicloud/cli/commands/notes.py @@ -0,0 +1,542 @@ +"""Notes commands.""" + +from __future__ import annotations + +from enum import Enum +from itertools import islice +from pathlib import Path +from typing import Optional + +import typer + +from pyicloud.cli.context import CLIAbort, get_state, service_call +from pyicloud.cli.normalize import ( + search_notes_by_title, + select_recent_notes, +) +from pyicloud.cli.options import ( + DEFAULT_LOG_LEVEL, + DEFAULT_OUTPUT_FORMAT, + HttpProxyOption, + HttpsProxyOption, + LogLevelOption, + NoVerifySslOption, + OutputFormatOption, + SessionDirOption, + UsernameOption, + store_command_options, +) +from pyicloud.cli.output import console_table +from pyicloud.services.notes.service import NoteLockedError, NoteNotFound + +app = typer.Typer(help="Inspect, render, and export Notes.") + +NOTES = "Notes" + + +class PreviewAppearance(str, Enum): + """Supported Notes preview appearances.""" + + LIGHT = "light" + DARK = "dark" + + +class ExportMode(str, Enum): + """Supported Notes export modes.""" + + ARCHIVAL = "archival" + LIGHTWEIGHT = "lightweight" + + +def _notes_service(api): + """Return the Notes service with reauthentication handling.""" + + return service_call(NOTES, lambda: api.notes, account_name=api.account_name) + + +def _notes_call(api, fn): + """Wrap Notes service calls with note-specific user-facing errors.""" + + try: + return service_call(NOTES, fn, account_name=api.account_name) + except (NoteNotFound, NoteLockedError) as err: + raise CLIAbort(str(err)) from err + + +def _print_note_rows(state, title: str, rows) -> None: + """Render note summary rows in text mode.""" + + state.console.print( + console_table( + title, + ["ID", "Title", "Folder", "Modified", "Deleted"], + [ + ( + row.id, + row.title, + row.folder_name, + row.modified_at, + getattr(row, "is_deleted", False), + ) + for row in rows + ], + ) + ) + + +@app.command("recent") +def notes_recent( + ctx: typer.Context, + limit: int = typer.Option(10, "--limit", min=1, help="Maximum notes to show."), + include_deleted: bool = typer.Option( + False, + "--include-deleted", + help="Include notes from Recently Deleted.", + ), + username: UsernameOption = None, + session_dir: SessionDirOption = None, + http_proxy: HttpProxyOption = None, + https_proxy: HttpsProxyOption = None, + no_verify_ssl: NoVerifySslOption = False, + output_format: OutputFormatOption = DEFAULT_OUTPUT_FORMAT, + log_level: LogLevelOption = DEFAULT_LOG_LEVEL, +) -> None: + """List recent notes.""" + + store_command_options( + ctx, + username=username, + session_dir=session_dir, + http_proxy=http_proxy, + https_proxy=https_proxy, + no_verify_ssl=no_verify_ssl, + output_format=output_format, + log_level=log_level, + ) + state = get_state(ctx) + api = state.get_api() + notes = _notes_service(api) + payload = _notes_call( + api, + lambda: select_recent_notes( + notes, + limit=limit, + include_deleted=include_deleted, + ), + ) + if state.json_output: + state.write_json(payload) + return + _print_note_rows(state, "Recent Notes", payload) + + +@app.command("folders") +def notes_folders( + ctx: typer.Context, + username: UsernameOption = None, + session_dir: SessionDirOption = None, + http_proxy: HttpProxyOption = None, + https_proxy: HttpsProxyOption = None, + no_verify_ssl: NoVerifySslOption = False, + output_format: OutputFormatOption = DEFAULT_OUTPUT_FORMAT, + log_level: LogLevelOption = DEFAULT_LOG_LEVEL, +) -> None: + """List note folders.""" + + store_command_options( + ctx, + username=username, + session_dir=session_dir, + http_proxy=http_proxy, + https_proxy=https_proxy, + no_verify_ssl=no_verify_ssl, + output_format=output_format, + log_level=log_level, + ) + state = get_state(ctx) + api = state.get_api() + notes = _notes_service(api) + payload = _notes_call(api, lambda: list(notes.folders())) + if state.json_output: + state.write_json(payload) + return + state.console.print( + console_table( + "Note Folders", + ["ID", "Name", "Has Subfolders", "Count"], + [(row.id, row.name, row.has_subfolders, row.count) for row in payload], + ) + ) + + +@app.command("list") +def notes_list( + ctx: typer.Context, + folder_id: Optional[str] = typer.Option(None, "--folder-id", help="Folder id."), + all_notes: bool = typer.Option(False, "--all", help="Iterate all notes."), + since: Optional[str] = typer.Option( + None, + "--since", + help="Incremental sync cursor for --all.", + ), + limit: int = typer.Option(50, "--limit", min=1, help="Maximum notes to show."), + username: UsernameOption = None, + session_dir: SessionDirOption = None, + http_proxy: HttpProxyOption = None, + https_proxy: HttpsProxyOption = None, + no_verify_ssl: NoVerifySslOption = False, + output_format: OutputFormatOption = DEFAULT_OUTPUT_FORMAT, + log_level: LogLevelOption = DEFAULT_LOG_LEVEL, +) -> None: + """List notes.""" + + if folder_id and all_notes: + raise typer.BadParameter("Choose either --folder-id or --all, not both.") + if since and not all_notes: + raise typer.BadParameter("The --since option requires --all.") + + store_command_options( + ctx, + username=username, + session_dir=session_dir, + http_proxy=http_proxy, + https_proxy=https_proxy, + no_verify_ssl=no_verify_ssl, + output_format=output_format, + log_level=log_level, + ) + state = get_state(ctx) + api = state.get_api() + notes = _notes_service(api) + if folder_id: + payload = _notes_call( + api, lambda: list(notes.in_folder(folder_id, limit=limit)) + ) + elif all_notes: + payload = _notes_call( + api, + lambda: list(islice(notes.iter_all(since=since), limit)), + ) + else: + payload = _notes_call( + api, + lambda: select_recent_notes( + notes, + limit=limit, + include_deleted=False, + ), + ) + if state.json_output: + state.write_json(payload) + return + _print_note_rows(state, "Notes", payload) + + +@app.command("search") +def notes_search( + ctx: typer.Context, + title: str = typer.Option("", "--title", help="Exact note title."), + title_contains: str = typer.Option( + "", + "--title-contains", + help="Case-insensitive note title substring.", + ), + limit: int = typer.Option(10, "--limit", min=1, help="Maximum notes to show."), + username: UsernameOption = None, + session_dir: SessionDirOption = None, + http_proxy: HttpProxyOption = None, + https_proxy: HttpsProxyOption = None, + no_verify_ssl: NoVerifySslOption = False, + output_format: OutputFormatOption = DEFAULT_OUTPUT_FORMAT, + log_level: LogLevelOption = DEFAULT_LOG_LEVEL, +) -> None: + """Search notes by title.""" + + if not title.strip() and not title_contains.strip(): + raise CLIAbort("Pass --title or --title-contains to search notes.") + + store_command_options( + ctx, + username=username, + session_dir=session_dir, + http_proxy=http_proxy, + https_proxy=https_proxy, + no_verify_ssl=no_verify_ssl, + output_format=output_format, + log_level=log_level, + ) + state = get_state(ctx) + api = state.get_api() + notes = _notes_service(api) + payload = _notes_call( + api, + lambda: search_notes_by_title( + notes, + title=title, + title_contains=title_contains, + limit=limit, + ), + ) + if state.json_output: + state.write_json(payload) + return + _print_note_rows(state, "Matching Notes", payload) + + +@app.command("get") +def notes_get( + ctx: typer.Context, + note_id: str = typer.Argument(..., help="Note id."), + with_attachments: bool = typer.Option( + False, + "--with-attachments", + help="Include attachment metadata.", + ), + username: UsernameOption = None, + session_dir: SessionDirOption = None, + http_proxy: HttpProxyOption = None, + https_proxy: HttpsProxyOption = None, + no_verify_ssl: NoVerifySslOption = False, + output_format: OutputFormatOption = DEFAULT_OUTPUT_FORMAT, + log_level: LogLevelOption = DEFAULT_LOG_LEVEL, +) -> None: + """Get one note.""" + + store_command_options( + ctx, + username=username, + session_dir=session_dir, + http_proxy=http_proxy, + https_proxy=https_proxy, + no_verify_ssl=no_verify_ssl, + output_format=output_format, + log_level=log_level, + ) + state = get_state(ctx) + api = state.get_api() + notes = _notes_service(api) + note = _notes_call( + api, lambda: notes.get(note_id, with_attachments=with_attachments) + ) + if state.json_output: + state.write_json(note) + return + state.console.print(f"{note.title} [{note.id}]") + if note.folder_name: + state.console.print(f"Folder: {note.folder_name}") + if note.modified_at: + state.console.print(f"Modified: {note.modified_at}") + if note.text: + state.console.print(note.text) + if with_attachments and note.attachments: + state.console.print( + console_table( + "Attachments", + ["ID", "Filename", "UTI", "Size"], + [(att.id, att.filename, att.uti, att.size) for att in note.attachments], + ) + ) + + +@app.command("render") +def notes_render( + ctx: typer.Context, + note_id: str = typer.Argument(..., help="Note id."), + preview_appearance: PreviewAppearance = typer.Option( + PreviewAppearance.LIGHT, + "--preview-appearance", + help="Preview appearance preference.", + ), + pdf_height: int = typer.Option( + 600, + "--pdf-height", + min=1, + help="Embedded PDF height in pixels.", + ), + username: UsernameOption = None, + session_dir: SessionDirOption = None, + http_proxy: HttpProxyOption = None, + https_proxy: HttpsProxyOption = None, + no_verify_ssl: NoVerifySslOption = False, + output_format: OutputFormatOption = DEFAULT_OUTPUT_FORMAT, + log_level: LogLevelOption = DEFAULT_LOG_LEVEL, +) -> None: + """Render a note to HTML.""" + + store_command_options( + ctx, + username=username, + session_dir=session_dir, + http_proxy=http_proxy, + https_proxy=https_proxy, + no_verify_ssl=no_verify_ssl, + output_format=output_format, + log_level=log_level, + ) + state = get_state(ctx) + api = state.get_api() + notes = _notes_service(api) + html = _notes_call( + api, + lambda: notes.render_note( + note_id, + preview_appearance=preview_appearance.value, + pdf_object_height=pdf_height, + ), + ) + if state.json_output: + state.write_json({"note_id": note_id, "html": html}) + return + state.console.print(html, soft_wrap=True) + + +@app.command("export") +def notes_export( + ctx: typer.Context, + note_id: str = typer.Argument(..., help="Note id."), + output_dir: Path = typer.Option(..., "--output-dir", help="Destination directory."), + export_mode: ExportMode = typer.Option( + ExportMode.ARCHIVAL, + "--export-mode", + help="Export mode.", + ), + assets_dir: Path | None = typer.Option( + None, + "--assets-dir", + help="Directory for downloaded assets in archival mode.", + ), + full_page: bool = typer.Option( + True, + "--full-page/--fragment", + help="Wrap exported output in a full HTML page.", + ), + preview_appearance: PreviewAppearance = typer.Option( + PreviewAppearance.LIGHT, + "--preview-appearance", + help="Preview appearance preference.", + ), + pdf_height: int = typer.Option( + 600, + "--pdf-height", + min=1, + help="Embedded PDF height in pixels.", + ), + username: UsernameOption = None, + session_dir: SessionDirOption = None, + http_proxy: HttpProxyOption = None, + https_proxy: HttpsProxyOption = None, + no_verify_ssl: NoVerifySslOption = False, + output_format: OutputFormatOption = DEFAULT_OUTPUT_FORMAT, + log_level: LogLevelOption = DEFAULT_LOG_LEVEL, +) -> None: + """Export a note to disk.""" + + store_command_options( + ctx, + username=username, + session_dir=session_dir, + http_proxy=http_proxy, + https_proxy=https_proxy, + no_verify_ssl=no_verify_ssl, + output_format=output_format, + log_level=log_level, + ) + state = get_state(ctx) + api = state.get_api() + notes = _notes_service(api) + path = _notes_call( + api, + lambda: notes.export_note( + note_id, + str(output_dir), + export_mode=export_mode.value, + assets_dir=str(assets_dir) if assets_dir else None, + full_page=full_page, + preview_appearance=preview_appearance.value, + pdf_object_height=pdf_height, + ), + ) + if state.json_output: + state.write_json({"note_id": note_id, "path": path}) + return + state.console.print(path) + + +@app.command("changes") +def notes_changes( + ctx: typer.Context, + since: str | None = typer.Option(None, "--since", help="Sync cursor."), + limit: int = typer.Option(50, "--limit", min=1, help="Maximum changes to show."), + username: UsernameOption = None, + session_dir: SessionDirOption = None, + http_proxy: HttpProxyOption = None, + https_proxy: HttpsProxyOption = None, + no_verify_ssl: NoVerifySslOption = False, + output_format: OutputFormatOption = DEFAULT_OUTPUT_FORMAT, + log_level: LogLevelOption = DEFAULT_LOG_LEVEL, +) -> None: + """List note changes since a cursor.""" + + store_command_options( + ctx, + username=username, + session_dir=session_dir, + http_proxy=http_proxy, + https_proxy=https_proxy, + no_verify_ssl=no_verify_ssl, + output_format=output_format, + log_level=log_level, + ) + state = get_state(ctx) + api = state.get_api() + notes = _notes_service(api) + payload = _notes_call( + api, + lambda: list(islice(notes.iter_changes(since=since), limit)), + ) + if state.json_output: + state.write_json(payload) + return + state.console.print( + console_table( + "Note Changes", + ["Type", "Note ID", "Folder", "Modified"], + [ + (row.type, row.note.id, row.note.folder_name, row.note.modified_at) + for row in payload + ], + ) + ) + + +@app.command("sync-cursor") +def notes_sync_cursor( + ctx: typer.Context, + username: UsernameOption = None, + session_dir: SessionDirOption = None, + http_proxy: HttpProxyOption = None, + https_proxy: HttpsProxyOption = None, + no_verify_ssl: NoVerifySslOption = False, + output_format: OutputFormatOption = DEFAULT_OUTPUT_FORMAT, + log_level: LogLevelOption = DEFAULT_LOG_LEVEL, +) -> None: + """Print the current Notes sync cursor.""" + + store_command_options( + ctx, + username=username, + session_dir=session_dir, + http_proxy=http_proxy, + https_proxy=https_proxy, + no_verify_ssl=no_verify_ssl, + output_format=output_format, + log_level=log_level, + ) + state = get_state(ctx) + api = state.get_api() + notes = _notes_service(api) + cursor = _notes_call(api, lambda: notes.sync_cursor()) + if state.json_output: + state.write_json({"cursor": cursor}) + return + state.console.print(cursor) diff --git a/pyicloud/cli/commands/reminders.py b/pyicloud/cli/commands/reminders.py new file mode 100644 index 00000000..6020bff0 --- /dev/null +++ b/pyicloud/cli/commands/reminders.py @@ -0,0 +1,1608 @@ +"""Reminders commands.""" + +from __future__ import annotations + +from enum import Enum +from typing import Callable, TypeVar + +import typer +from pydantic import ValidationError + +from pyicloud.cli.context import CLIAbort, get_state, parse_datetime, service_call +from pyicloud.cli.options import ( + DEFAULT_LOG_LEVEL, + DEFAULT_OUTPUT_FORMAT, + HttpProxyOption, + HttpsProxyOption, + LogLevelOption, + NoVerifySslOption, + OutputFormatOption, + SessionDirOption, + UsernameOption, + store_command_options, +) +from pyicloud.cli.output import console_kv_table, console_table, format_color_value +from pyicloud.services.reminders.client import RemindersApiError, RemindersAuthError +from pyicloud.services.reminders.models import ( + AlarmWithTrigger, + ImageAttachment, + RecurrenceFrequency, + Reminder, + URLAttachment, +) +from pyicloud.services.reminders.service import Attachment, Proximity + +app = typer.Typer(help="Inspect and mutate Reminders.") +alarm_app = typer.Typer(help="Work with reminder alarms.") +attachment_app = typer.Typer(help="Work with reminder attachments.") +hashtag_app = typer.Typer(help="Work with reminder hashtags.") +recurrence_app = typer.Typer(help="Work with reminder recurrence rules.") + +REMINDERS = "Reminders" +TRelated = TypeVar("TRelated") + + +class ProximityChoice(str, Enum): + """CLI-facing proximity choice.""" + + ARRIVING = "arriving" + LEAVING = "leaving" + + +class RecurrenceFrequencyChoice(str, Enum): + """CLI-facing recurrence frequency.""" + + DAILY = "daily" + WEEKLY = "weekly" + MONTHLY = "monthly" + YEARLY = "yearly" + + +PROXIMITY_MAP = { + ProximityChoice.ARRIVING: Proximity.ARRIVING, + ProximityChoice.LEAVING: Proximity.LEAVING, +} +RECURRENCE_FREQUENCY_MAP = { + RecurrenceFrequencyChoice.DAILY: RecurrenceFrequency.DAILY, + RecurrenceFrequencyChoice.WEEKLY: RecurrenceFrequency.WEEKLY, + RecurrenceFrequencyChoice.MONTHLY: RecurrenceFrequency.MONTHLY, + RecurrenceFrequencyChoice.YEARLY: RecurrenceFrequency.YEARLY, +} + + +def _group_root(ctx: typer.Context) -> None: + """Show subgroup help when invoked without a subcommand.""" + + if ctx.invoked_subcommand is None: + typer.echo(ctx.get_help()) + raise typer.Exit() + + +app.add_typer( + alarm_app, name="alarm", invoke_without_command=True, callback=_group_root +) +app.add_typer( + hashtag_app, name="hashtag", invoke_without_command=True, callback=_group_root +) +app.add_typer( + attachment_app, + name="attachment", + invoke_without_command=True, + callback=_group_root, +) +app.add_typer( + recurrence_app, + name="recurrence", + invoke_without_command=True, + callback=_group_root, +) + + +def _normalize_prefixed_id(value: str, prefix: str) -> str: + """Return an identifier with the expected record prefix.""" + + normalized = str(value).strip() + if not normalized: + return normalized + token = f"{prefix}/" + if normalized.startswith(token): + return normalized + return f"{token}{normalized}" + + +def _id_matches(record_id: str, query: str) -> bool: + """Return whether a record id matches a full or shorthand query.""" + + normalized = str(query).strip() + if not normalized: + return False + if record_id == normalized: + return True + if "/" in record_id and record_id.split("/", 1)[1] == normalized: + return True + return False + + +def _reminders_service(api): + """Return the Reminders service with reauthentication handling.""" + + return service_call(REMINDERS, lambda: api.reminders, account_name=api.account_name) + + +def _reminders_call(api, fn): + """Wrap reminder calls with reminder-specific user-facing errors.""" + + try: + return service_call(REMINDERS, fn, account_name=api.account_name) + except ( + LookupError, + ValidationError, + RemindersApiError, + RemindersAuthError, + ) as err: + raise CLIAbort(str(err)) from err + + +def _resolve_reminder(api, reminder_id: str) -> Reminder: + """Return one reminder by id.""" + + reminders = _reminders_service(api) + return _reminders_call(api, lambda: reminders.get(reminder_id)) + + +def _list_reminder_rows( + api, + *, + list_id: str | None = None, + include_completed: bool, + limit: int, +) -> list[Reminder]: + """Return reminder rows using compound snapshots to preserve completion filtering.""" + + reminders = _reminders_service(api) + results_limit = max(limit, 200) + if list_id: + snapshot = _reminders_call( + api, + lambda: reminders.list_reminders( + list_id=_normalize_prefixed_id(list_id, "List"), + include_completed=include_completed, + results_limit=results_limit, + ), + ) + return snapshot.reminders[:limit] + + rows: list[Reminder] = [] + seen_ids: set[str] = set() + for reminder_list in _reminders_call(api, lambda: list(reminders.lists())): + snapshot = _reminders_call( + api, + lambda lid=reminder_list.id: reminders.list_reminders( + list_id=lid, + include_completed=include_completed, + results_limit=results_limit, + ), + ) + for reminder in snapshot.reminders: + if reminder.id in seen_ids: + continue + seen_ids.add(reminder.id) + rows.append(reminder) + if len(rows) >= limit: + return rows + return rows + + +def _resolve_related_record( + api, + reminder_id: str, + query: str, + *, + label: str, + fetch_rows: Callable[[Reminder], list[TRelated]], +) -> tuple[Reminder, TRelated]: + """Return one reminder child record matched by full or shorthand id.""" + + reminder = _resolve_reminder(api, reminder_id) + rows = _reminders_call(api, lambda: fetch_rows(reminder)) + for row in rows: + row_id = getattr(row, "id", "") + if _id_matches(row_id, query): + return reminder, row + raise CLIAbort(f"No {label} matched '{query}' for reminder {reminder.id}.") + + +def _attachment_kind(attachment: Attachment) -> str: + """Return a compact attachment type label.""" + + if isinstance(attachment, URLAttachment): + return "url" + if isinstance(attachment, ImageAttachment): + return "image" + return type(attachment).__name__.lower() + + +def _proximity_label(proximity: Proximity | None) -> str | None: + """Return a human-readable proximity label.""" + + if proximity is None: + return None + return proximity.name.lower() + + +def _frequency_label(frequency: RecurrenceFrequency | None) -> str | None: + """Return a human-readable recurrence frequency label.""" + + if frequency is None: + return None + return frequency.name.lower() + + +def _sync_cursor_payload(state, cursor: str) -> None: + """Render a sync cursor in JSON or text mode.""" + + if state.json_output: + state.write_json({"cursor": cursor}) + return + state.console.print(cursor) + + +@app.command("lists") +def reminders_lists( + ctx: typer.Context, + username: UsernameOption = None, + session_dir: SessionDirOption = None, + http_proxy: HttpProxyOption = None, + https_proxy: HttpsProxyOption = None, + no_verify_ssl: NoVerifySslOption = False, + output_format: OutputFormatOption = DEFAULT_OUTPUT_FORMAT, + log_level: LogLevelOption = DEFAULT_LOG_LEVEL, +) -> None: + """List reminder lists.""" + + store_command_options( + ctx, + username=username, + session_dir=session_dir, + http_proxy=http_proxy, + https_proxy=https_proxy, + no_verify_ssl=no_verify_ssl, + output_format=output_format, + log_level=log_level, + ) + state = get_state(ctx) + api = state.get_api() + reminders = _reminders_service(api) + payload = _reminders_call(api, lambda: list(reminders.lists())) + if state.json_output: + state.write_json(payload) + return + state.console.print( + console_table( + "Reminder Lists", + ["ID", "Title", "Color", "Count"], + [ + ( + row.id, + row.title, + format_color_value(row.color), + row.count, + ) + for row in payload + ], + ) + ) + + +@app.command("list") +def reminders_list( + ctx: typer.Context, + list_id: str | None = typer.Option(None, "--list-id", help="Reminder list id."), + include_completed: bool = typer.Option( + False, + "--include-completed", + help="Include completed reminders.", + ), + limit: int = typer.Option(50, "--limit", min=1, help="Maximum reminders to show."), + username: UsernameOption = None, + session_dir: SessionDirOption = None, + http_proxy: HttpProxyOption = None, + https_proxy: HttpsProxyOption = None, + no_verify_ssl: NoVerifySslOption = False, + output_format: OutputFormatOption = DEFAULT_OUTPUT_FORMAT, + log_level: LogLevelOption = DEFAULT_LOG_LEVEL, +) -> None: + """List reminders.""" + + store_command_options( + ctx, + username=username, + session_dir=session_dir, + http_proxy=http_proxy, + https_proxy=https_proxy, + no_verify_ssl=no_verify_ssl, + output_format=output_format, + log_level=log_level, + ) + state = get_state(ctx) + api = state.get_api() + payload = _list_reminder_rows( + api, + list_id=list_id, + include_completed=include_completed, + limit=limit, + ) + if state.json_output: + state.write_json(payload) + return + state.console.print( + console_table( + "Reminders", + ["ID", "Title", "Completed", "Due", "Priority"], + [ + ( + reminder.id, + reminder.title, + reminder.completed, + reminder.due_date, + reminder.priority, + ) + for reminder in payload + ], + ) + ) + + +@app.command("get") +def reminders_get( + ctx: typer.Context, + reminder_id: str = typer.Argument(..., help="Reminder id."), + username: UsernameOption = None, + session_dir: SessionDirOption = None, + http_proxy: HttpProxyOption = None, + https_proxy: HttpsProxyOption = None, + no_verify_ssl: NoVerifySslOption = False, + output_format: OutputFormatOption = DEFAULT_OUTPUT_FORMAT, + log_level: LogLevelOption = DEFAULT_LOG_LEVEL, +) -> None: + """Get one reminder.""" + + store_command_options( + ctx, + username=username, + session_dir=session_dir, + http_proxy=http_proxy, + https_proxy=https_proxy, + no_verify_ssl=no_verify_ssl, + output_format=output_format, + log_level=log_level, + ) + state = get_state(ctx) + api = state.get_api() + reminder = _resolve_reminder(api, reminder_id) + if state.json_output: + state.write_json(reminder) + return + state.console.print( + console_kv_table( + f"Reminder: {reminder.title}", + [ + ("ID", reminder.id), + ("List ID", reminder.list_id), + ("Description", reminder.desc), + ("Completed", reminder.completed), + ("Due Date", reminder.due_date), + ("Priority", reminder.priority), + ("Flagged", reminder.flagged), + ("All Day", reminder.all_day), + ("Time Zone", reminder.time_zone), + ("Parent Reminder", reminder.parent_reminder_id), + ], + ) + ) + + +@app.command("create") +def reminders_create( + ctx: typer.Context, + list_id: str = typer.Option(..., "--list-id", help="Target list id."), + title: str = typer.Option(..., "--title", help="Reminder title."), + desc: str = typer.Option("", "--desc", help="Reminder description."), + completed: bool = typer.Option( + False, + "--completed/--not-completed", + help="Create the reminder as completed or incomplete.", + ), + due_date: str | None = typer.Option(None, "--due-date", help="Due datetime."), + priority: int = typer.Option(0, "--priority", help="Apple priority number."), + flagged: bool = typer.Option( + False, + "--flagged/--not-flagged", + help="Create the reminder flagged or unflagged.", + ), + all_day: bool = typer.Option( + False, + "--all-day/--not-all-day", + help="Create the reminder as all-day or timed.", + ), + time_zone: str | None = typer.Option(None, "--time-zone", help="IANA time zone."), + parent_reminder_id: str | None = typer.Option( + None, + "--parent-reminder-id", + help="Parent reminder id for a subtask.", + ), + username: UsernameOption = None, + session_dir: SessionDirOption = None, + http_proxy: HttpProxyOption = None, + https_proxy: HttpsProxyOption = None, + no_verify_ssl: NoVerifySslOption = False, + output_format: OutputFormatOption = DEFAULT_OUTPUT_FORMAT, + log_level: LogLevelOption = DEFAULT_LOG_LEVEL, +) -> None: + """Create a reminder.""" + + store_command_options( + ctx, + username=username, + session_dir=session_dir, + http_proxy=http_proxy, + https_proxy=https_proxy, + no_verify_ssl=no_verify_ssl, + output_format=output_format, + log_level=log_level, + ) + state = get_state(ctx) + api = state.get_api() + reminders = _reminders_service(api) + reminder = _reminders_call( + api, + lambda: reminders.create( + list_id=_normalize_prefixed_id(list_id, "List"), + title=title, + desc=desc, + completed=completed, + due_date=parse_datetime(due_date), + priority=priority, + flagged=flagged, + all_day=all_day, + time_zone=time_zone, + parent_reminder_id=parent_reminder_id, + ), + ) + if state.json_output: + state.write_json(reminder) + return + state.console.print(reminder.id) + + +@app.command("update") +def reminders_update( + ctx: typer.Context, + reminder_id: str = typer.Argument(..., help="Reminder id."), + title: str | None = typer.Option(None, "--title", help="Reminder title."), + desc: str | None = typer.Option(None, "--desc", help="Reminder description."), + completed: bool | None = typer.Option( + None, + "--completed/--not-completed", + help="Mark the reminder completed or incomplete.", + ), + due_date: str | None = typer.Option(None, "--due-date", help="Due datetime."), + clear_due_date: bool = typer.Option( + False, + "--clear-due-date", + help="Clear the due date.", + ), + priority: int | None = typer.Option(None, "--priority", help="Apple priority."), + flagged: bool | None = typer.Option( + None, + "--flagged/--not-flagged", + help="Flag or unflag the reminder.", + ), + all_day: bool | None = typer.Option( + None, + "--all-day/--not-all-day", + help="Mark as all-day or timed.", + ), + time_zone: str | None = typer.Option(None, "--time-zone", help="IANA time zone."), + clear_time_zone: bool = typer.Option( + False, + "--clear-time-zone", + help="Clear the time zone.", + ), + parent_reminder_id: str | None = typer.Option( + None, + "--parent-reminder-id", + help="Set the parent reminder id.", + ), + clear_parent_reminder: bool = typer.Option( + False, + "--clear-parent-reminder", + help="Clear the parent reminder id.", + ), + username: UsernameOption = None, + session_dir: SessionDirOption = None, + http_proxy: HttpProxyOption = None, + https_proxy: HttpsProxyOption = None, + no_verify_ssl: NoVerifySslOption = False, + output_format: OutputFormatOption = DEFAULT_OUTPUT_FORMAT, + log_level: LogLevelOption = DEFAULT_LOG_LEVEL, +) -> None: + """Update one reminder.""" + + if due_date and clear_due_date: + raise typer.BadParameter( + "Choose either --due-date or --clear-due-date, not both." + ) + if time_zone and clear_time_zone: + raise typer.BadParameter( + "Choose either --time-zone or --clear-time-zone, not both." + ) + if parent_reminder_id and clear_parent_reminder: + raise typer.BadParameter( + "Choose either --parent-reminder-id or --clear-parent-reminder, not both." + ) + + store_command_options( + ctx, + username=username, + session_dir=session_dir, + http_proxy=http_proxy, + https_proxy=https_proxy, + no_verify_ssl=no_verify_ssl, + output_format=output_format, + log_level=log_level, + ) + state = get_state(ctx) + api = state.get_api() + reminders = _reminders_service(api) + reminder = _resolve_reminder(api, reminder_id) + changed = False + + if title is not None: + reminder.title = title + changed = True + if desc is not None: + reminder.desc = desc + changed = True + if completed is not None: + reminder.completed = completed + changed = True + if due_date is not None: + reminder.due_date = parse_datetime(due_date) + changed = True + elif clear_due_date: + reminder.due_date = None + changed = True + if priority is not None: + reminder.priority = priority + changed = True + if flagged is not None: + reminder.flagged = flagged + changed = True + if all_day is not None: + reminder.all_day = all_day + changed = True + if time_zone is not None: + reminder.time_zone = time_zone + changed = True + elif clear_time_zone: + reminder.time_zone = None + changed = True + if parent_reminder_id is not None: + reminder.parent_reminder_id = parent_reminder_id + changed = True + elif clear_parent_reminder: + reminder.parent_reminder_id = None + changed = True + + if not changed: + raise CLIAbort("No reminder updates were requested.") + + _reminders_call(api, lambda: reminders.update(reminder)) + if state.json_output: + state.write_json(reminder) + return + state.console.print(f"Updated {reminder.id}") + + +@app.command("set-status") +def reminders_set_status( + ctx: typer.Context, + reminder_id: str = typer.Argument(..., help="Reminder id."), + completed: bool = typer.Option( + True, + "--completed/--not-completed", + help="Mark the reminder completed or incomplete.", + ), + username: UsernameOption = None, + session_dir: SessionDirOption = None, + http_proxy: HttpProxyOption = None, + https_proxy: HttpsProxyOption = None, + no_verify_ssl: NoVerifySslOption = False, + output_format: OutputFormatOption = DEFAULT_OUTPUT_FORMAT, + log_level: LogLevelOption = DEFAULT_LOG_LEVEL, +) -> None: + """Mark a reminder completed or incomplete.""" + + store_command_options( + ctx, + username=username, + session_dir=session_dir, + http_proxy=http_proxy, + https_proxy=https_proxy, + no_verify_ssl=no_verify_ssl, + output_format=output_format, + log_level=log_level, + ) + state = get_state(ctx) + api = state.get_api() + reminders = _reminders_service(api) + reminder = _resolve_reminder(api, reminder_id) + reminder.completed = completed + _reminders_call(api, lambda: reminders.update(reminder)) + if state.json_output: + state.write_json(reminder) + return + state.console.print(f"Updated {reminder.id}: completed={completed}") + + +@app.command("delete") +def reminders_delete( + ctx: typer.Context, + reminder_id: str = typer.Argument(..., help="Reminder id."), + username: UsernameOption = None, + session_dir: SessionDirOption = None, + http_proxy: HttpProxyOption = None, + https_proxy: HttpsProxyOption = None, + no_verify_ssl: NoVerifySslOption = False, + output_format: OutputFormatOption = DEFAULT_OUTPUT_FORMAT, + log_level: LogLevelOption = DEFAULT_LOG_LEVEL, +) -> None: + """Delete a reminder.""" + + store_command_options( + ctx, + username=username, + session_dir=session_dir, + http_proxy=http_proxy, + https_proxy=https_proxy, + no_verify_ssl=no_verify_ssl, + output_format=output_format, + log_level=log_level, + ) + state = get_state(ctx) + api = state.get_api() + reminders = _reminders_service(api) + reminder = _resolve_reminder(api, reminder_id) + _reminders_call(api, lambda: reminders.delete(reminder)) + if state.json_output: + state.write_json({"reminder_id": reminder.id, "deleted": True}) + return + state.console.print(f"Deleted {reminder.id}") + + +@app.command("snapshot") +def reminders_snapshot( + ctx: typer.Context, + list_id: str = typer.Option(..., "--list-id", help="Reminder list id."), + include_completed: bool = typer.Option( + False, + "--include-completed", + help="Include completed reminders.", + ), + results_limit: int = typer.Option( + 200, + "--results-limit", + min=1, + help="Maximum reminders to request from the compound query.", + ), + username: UsernameOption = None, + session_dir: SessionDirOption = None, + http_proxy: HttpProxyOption = None, + https_proxy: HttpsProxyOption = None, + no_verify_ssl: NoVerifySslOption = False, + output_format: OutputFormatOption = DEFAULT_OUTPUT_FORMAT, + log_level: LogLevelOption = DEFAULT_LOG_LEVEL, +) -> None: + """Fetch a compound reminder snapshot for one list.""" + + store_command_options( + ctx, + username=username, + session_dir=session_dir, + http_proxy=http_proxy, + https_proxy=https_proxy, + no_verify_ssl=no_verify_ssl, + output_format=output_format, + log_level=log_level, + ) + state = get_state(ctx) + api = state.get_api() + reminders = _reminders_service(api) + payload = _reminders_call( + api, + lambda: reminders.list_reminders( + list_id=_normalize_prefixed_id(list_id, "List"), + include_completed=include_completed, + results_limit=results_limit, + ), + ) + if state.json_output: + state.write_json(payload) + return + state.console.print( + console_kv_table( + "Reminder Snapshot", + [ + ("List ID", _normalize_prefixed_id(list_id, "List")), + ("Reminders", len(payload.reminders)), + ("Alarms", len(payload.alarms)), + ("Triggers", len(payload.triggers)), + ("Attachments", len(payload.attachments)), + ("Hashtags", len(payload.hashtags)), + ("Recurrence Rules", len(payload.recurrence_rules)), + ], + ) + ) + state.console.print( + console_table( + "Snapshot Reminders", + ["ID", "Title", "Completed", "Due", "Priority"], + [ + ( + reminder.id, + reminder.title, + reminder.completed, + reminder.due_date, + reminder.priority, + ) + for reminder in payload.reminders + ], + ) + ) + + +@app.command("changes") +def reminders_changes( + ctx: typer.Context, + since: str | None = typer.Option(None, "--since", help="Sync cursor."), + limit: int = typer.Option(50, "--limit", min=1, help="Maximum changes to show."), + username: UsernameOption = None, + session_dir: SessionDirOption = None, + http_proxy: HttpProxyOption = None, + https_proxy: HttpsProxyOption = None, + no_verify_ssl: NoVerifySslOption = False, + output_format: OutputFormatOption = DEFAULT_OUTPUT_FORMAT, + log_level: LogLevelOption = DEFAULT_LOG_LEVEL, +) -> None: + """List reminder changes since a cursor.""" + + store_command_options( + ctx, + username=username, + session_dir=session_dir, + http_proxy=http_proxy, + https_proxy=https_proxy, + no_verify_ssl=no_verify_ssl, + output_format=output_format, + log_level=log_level, + ) + state = get_state(ctx) + api = state.get_api() + reminders = _reminders_service(api) + payload = _reminders_call( + api, + lambda: list(reminders.iter_changes(since=since))[:limit], + ) + if state.json_output: + state.write_json(payload) + return + state.console.print( + console_table( + "Reminder Changes", + ["Type", "Reminder ID", "Title", "Completed"], + [ + ( + event.type, + event.reminder_id, + event.reminder.title if event.reminder else None, + event.reminder.completed if event.reminder else None, + ) + for event in payload + ], + ) + ) + + +@app.command("sync-cursor") +def reminders_sync_cursor( + ctx: typer.Context, + username: UsernameOption = None, + session_dir: SessionDirOption = None, + http_proxy: HttpProxyOption = None, + https_proxy: HttpsProxyOption = None, + no_verify_ssl: NoVerifySslOption = False, + output_format: OutputFormatOption = DEFAULT_OUTPUT_FORMAT, + log_level: LogLevelOption = DEFAULT_LOG_LEVEL, +) -> None: + """Print the current Reminders sync cursor.""" + + store_command_options( + ctx, + username=username, + session_dir=session_dir, + http_proxy=http_proxy, + https_proxy=https_proxy, + no_verify_ssl=no_verify_ssl, + output_format=output_format, + log_level=log_level, + ) + state = get_state(ctx) + api = state.get_api() + reminders = _reminders_service(api) + cursor = _reminders_call(api, lambda: reminders.sync_cursor()) + _sync_cursor_payload(state, cursor) + + +@alarm_app.command("list") +def reminders_alarm_list( + ctx: typer.Context, + reminder_id: str = typer.Argument(..., help="Reminder id."), + username: UsernameOption = None, + session_dir: SessionDirOption = None, + http_proxy: HttpProxyOption = None, + https_proxy: HttpsProxyOption = None, + no_verify_ssl: NoVerifySslOption = False, + output_format: OutputFormatOption = DEFAULT_OUTPUT_FORMAT, + log_level: LogLevelOption = DEFAULT_LOG_LEVEL, +) -> None: + """List alarms for one reminder.""" + + store_command_options( + ctx, + username=username, + session_dir=session_dir, + http_proxy=http_proxy, + https_proxy=https_proxy, + no_verify_ssl=no_verify_ssl, + output_format=output_format, + log_level=log_level, + ) + state = get_state(ctx) + api = state.get_api() + reminders = _reminders_service(api) + reminder = _resolve_reminder(api, reminder_id) + payload = _reminders_call(api, lambda: reminders.alarms_for(reminder)) + if state.json_output: + state.write_json(payload) + return + state.console.print( + console_table( + "Reminder Alarms", + [ + "Alarm ID", + "Trigger ID", + "Title", + "Address", + "Radius", + "Proximity", + ], + [ + ( + row.alarm.id, + row.trigger.id if row.trigger else None, + row.trigger.title if row.trigger else None, + row.trigger.address if row.trigger else None, + row.trigger.radius if row.trigger else None, + _proximity_label(row.trigger.proximity if row.trigger else None), + ) + for row in payload + ], + ) + ) + + +@alarm_app.command("add-location") +def reminders_alarm_add_location( + ctx: typer.Context, + reminder_id: str = typer.Argument(..., help="Reminder id."), + title: str = typer.Option(..., "--title", help="Location title."), + address: str = typer.Option(..., "--address", help="Location address."), + latitude: float = typer.Option(..., "--latitude", help="Location latitude."), + longitude: float = typer.Option(..., "--longitude", help="Location longitude."), + radius: float = typer.Option(100.0, "--radius", min=0.0, help="Radius in meters."), + proximity: ProximityChoice = typer.Option( + ProximityChoice.ARRIVING, + "--proximity", + help="Trigger direction.", + ), + username: UsernameOption = None, + session_dir: SessionDirOption = None, + http_proxy: HttpProxyOption = None, + https_proxy: HttpsProxyOption = None, + no_verify_ssl: NoVerifySslOption = False, + output_format: OutputFormatOption = DEFAULT_OUTPUT_FORMAT, + log_level: LogLevelOption = DEFAULT_LOG_LEVEL, +) -> None: + """Add a location alarm to a reminder.""" + + store_command_options( + ctx, + username=username, + session_dir=session_dir, + http_proxy=http_proxy, + https_proxy=https_proxy, + no_verify_ssl=no_verify_ssl, + output_format=output_format, + log_level=log_level, + ) + state = get_state(ctx) + api = state.get_api() + reminders = _reminders_service(api) + reminder = _resolve_reminder(api, reminder_id) + alarm, trigger = _reminders_call( + api, + lambda: reminders.add_location_trigger( + reminder, + title=title, + address=address, + latitude=latitude, + longitude=longitude, + radius=radius, + proximity=PROXIMITY_MAP[proximity], + ), + ) + payload = AlarmWithTrigger(alarm=alarm, trigger=trigger) + if state.json_output: + state.write_json(payload) + return + state.console.print(f"Created {alarm.id} with trigger {trigger.id}") + + +@hashtag_app.command("list") +def reminders_hashtag_list( + ctx: typer.Context, + reminder_id: str = typer.Argument(..., help="Reminder id."), + username: UsernameOption = None, + session_dir: SessionDirOption = None, + http_proxy: HttpProxyOption = None, + https_proxy: HttpsProxyOption = None, + no_verify_ssl: NoVerifySslOption = False, + output_format: OutputFormatOption = DEFAULT_OUTPUT_FORMAT, + log_level: LogLevelOption = DEFAULT_LOG_LEVEL, +) -> None: + """List hashtags for one reminder.""" + + store_command_options( + ctx, + username=username, + session_dir=session_dir, + http_proxy=http_proxy, + https_proxy=https_proxy, + no_verify_ssl=no_verify_ssl, + output_format=output_format, + log_level=log_level, + ) + state = get_state(ctx) + api = state.get_api() + reminders = _reminders_service(api) + reminder = _resolve_reminder(api, reminder_id) + payload = _reminders_call(api, lambda: reminders.tags_for(reminder)) + if state.json_output: + state.write_json(payload) + return + state.console.print( + console_table( + "Reminder Hashtags", + ["ID", "Name", "Reminder ID"], + [(row.id, row.name, row.reminder_id) for row in payload], + ) + ) + + +@hashtag_app.command("create") +def reminders_hashtag_create( + ctx: typer.Context, + reminder_id: str = typer.Argument(..., help="Reminder id."), + name: str = typer.Argument(..., help="Hashtag name."), + username: UsernameOption = None, + session_dir: SessionDirOption = None, + http_proxy: HttpProxyOption = None, + https_proxy: HttpsProxyOption = None, + no_verify_ssl: NoVerifySslOption = False, + output_format: OutputFormatOption = DEFAULT_OUTPUT_FORMAT, + log_level: LogLevelOption = DEFAULT_LOG_LEVEL, +) -> None: + """Create a hashtag on one reminder.""" + + store_command_options( + ctx, + username=username, + session_dir=session_dir, + http_proxy=http_proxy, + https_proxy=https_proxy, + no_verify_ssl=no_verify_ssl, + output_format=output_format, + log_level=log_level, + ) + state = get_state(ctx) + api = state.get_api() + reminders = _reminders_service(api) + reminder = _resolve_reminder(api, reminder_id) + payload = _reminders_call(api, lambda: reminders.create_hashtag(reminder, name)) + if state.json_output: + state.write_json(payload) + return + state.console.print(payload.id) + + +@hashtag_app.command("update") +def reminders_hashtag_update( + ctx: typer.Context, + reminder_id: str = typer.Argument(..., help="Reminder id."), + hashtag_id: str = typer.Argument(..., help="Hashtag id."), + name: str = typer.Option(..., "--name", help="Updated hashtag name."), + username: UsernameOption = None, + session_dir: SessionDirOption = None, + http_proxy: HttpProxyOption = None, + https_proxy: HttpsProxyOption = None, + no_verify_ssl: NoVerifySslOption = False, + output_format: OutputFormatOption = DEFAULT_OUTPUT_FORMAT, + log_level: LogLevelOption = DEFAULT_LOG_LEVEL, +) -> None: + """Update a hashtag name.""" + + store_command_options( + ctx, + username=username, + session_dir=session_dir, + http_proxy=http_proxy, + https_proxy=https_proxy, + no_verify_ssl=no_verify_ssl, + output_format=output_format, + log_level=log_level, + ) + state = get_state(ctx) + api = state.get_api() + reminders = _reminders_service(api) + _reminder, hashtag = _resolve_related_record( + api, + reminder_id, + hashtag_id, + label="hashtag", + fetch_rows=lambda reminder: reminders.tags_for(reminder), + ) + _reminders_call(api, lambda: reminders.update_hashtag(hashtag, name)) + hashtag.name = name + if state.json_output: + state.write_json(hashtag) + return + state.console.print(f"Updated {hashtag.id}") + + +@hashtag_app.command("delete") +def reminders_hashtag_delete( + ctx: typer.Context, + reminder_id: str = typer.Argument(..., help="Reminder id."), + hashtag_id: str = typer.Argument(..., help="Hashtag id."), + username: UsernameOption = None, + session_dir: SessionDirOption = None, + http_proxy: HttpProxyOption = None, + https_proxy: HttpsProxyOption = None, + no_verify_ssl: NoVerifySslOption = False, + output_format: OutputFormatOption = DEFAULT_OUTPUT_FORMAT, + log_level: LogLevelOption = DEFAULT_LOG_LEVEL, +) -> None: + """Delete a hashtag from one reminder.""" + + store_command_options( + ctx, + username=username, + session_dir=session_dir, + http_proxy=http_proxy, + https_proxy=https_proxy, + no_verify_ssl=no_verify_ssl, + output_format=output_format, + log_level=log_level, + ) + state = get_state(ctx) + api = state.get_api() + reminders = _reminders_service(api) + reminder, hashtag = _resolve_related_record( + api, + reminder_id, + hashtag_id, + label="hashtag", + fetch_rows=lambda row: reminders.tags_for(row), + ) + _reminders_call(api, lambda: reminders.delete_hashtag(reminder, hashtag)) + payload = {"reminder_id": reminder.id, "hashtag_id": hashtag.id, "deleted": True} + if state.json_output: + state.write_json(payload) + return + state.console.print(f"Deleted {hashtag.id}") + + +@attachment_app.command("list") +def reminders_attachment_list( + ctx: typer.Context, + reminder_id: str = typer.Argument(..., help="Reminder id."), + username: UsernameOption = None, + session_dir: SessionDirOption = None, + http_proxy: HttpProxyOption = None, + https_proxy: HttpsProxyOption = None, + no_verify_ssl: NoVerifySslOption = False, + output_format: OutputFormatOption = DEFAULT_OUTPUT_FORMAT, + log_level: LogLevelOption = DEFAULT_LOG_LEVEL, +) -> None: + """List attachments for one reminder.""" + + store_command_options( + ctx, + username=username, + session_dir=session_dir, + http_proxy=http_proxy, + https_proxy=https_proxy, + no_verify_ssl=no_verify_ssl, + output_format=output_format, + log_level=log_level, + ) + state = get_state(ctx) + api = state.get_api() + reminders = _reminders_service(api) + reminder = _resolve_reminder(api, reminder_id) + payload = _reminders_call(api, lambda: reminders.attachments_for(reminder)) + if state.json_output: + state.write_json(payload) + return + state.console.print( + console_table( + "Reminder Attachments", + ["ID", "Type", "URL", "Filename", "UTI", "Size"], + [ + ( + row.id, + _attachment_kind(row), + getattr(row, "url", None), + getattr(row, "filename", None), + row.uti, + getattr(row, "file_size", None), + ) + for row in payload + ], + ) + ) + + +@attachment_app.command("create-url") +def reminders_attachment_create_url( + ctx: typer.Context, + reminder_id: str = typer.Argument(..., help="Reminder id."), + url: str = typer.Option(..., "--url", help="Attachment URL."), + uti: str = typer.Option("public.url", "--uti", help="Attachment UTI."), + username: UsernameOption = None, + session_dir: SessionDirOption = None, + http_proxy: HttpProxyOption = None, + https_proxy: HttpsProxyOption = None, + no_verify_ssl: NoVerifySslOption = False, + output_format: OutputFormatOption = DEFAULT_OUTPUT_FORMAT, + log_level: LogLevelOption = DEFAULT_LOG_LEVEL, +) -> None: + """Create a URL attachment on one reminder.""" + + store_command_options( + ctx, + username=username, + session_dir=session_dir, + http_proxy=http_proxy, + https_proxy=https_proxy, + no_verify_ssl=no_verify_ssl, + output_format=output_format, + log_level=log_level, + ) + state = get_state(ctx) + api = state.get_api() + reminders = _reminders_service(api) + reminder = _resolve_reminder(api, reminder_id) + payload = _reminders_call( + api, + lambda: reminders.create_url_attachment(reminder, url=url, uti=uti), + ) + if state.json_output: + state.write_json(payload) + return + state.console.print(payload.id) + + +@attachment_app.command("update") +def reminders_attachment_update( + ctx: typer.Context, + reminder_id: str = typer.Argument(..., help="Reminder id."), + attachment_id: str = typer.Argument(..., help="Attachment id."), + url: str | None = typer.Option(None, "--url", help="Updated attachment URL."), + uti: str | None = typer.Option(None, "--uti", help="Updated attachment UTI."), + filename: str | None = typer.Option( + None, + "--filename", + help="Updated attachment filename.", + ), + file_size: int | None = typer.Option( + None, + "--file-size", + min=0, + help="Updated attachment size.", + ), + width: int | None = typer.Option( + None, + "--width", + min=0, + help="Updated attachment width.", + ), + height: int | None = typer.Option( + None, + "--height", + min=0, + help="Updated attachment height.", + ), + username: UsernameOption = None, + session_dir: SessionDirOption = None, + http_proxy: HttpProxyOption = None, + https_proxy: HttpsProxyOption = None, + no_verify_ssl: NoVerifySslOption = False, + output_format: OutputFormatOption = DEFAULT_OUTPUT_FORMAT, + log_level: LogLevelOption = DEFAULT_LOG_LEVEL, +) -> None: + """Update one attachment.""" + + if all(value is None for value in (url, uti, filename, file_size, width, height)): + raise CLIAbort("No attachment updates were requested.") + + store_command_options( + ctx, + username=username, + session_dir=session_dir, + http_proxy=http_proxy, + https_proxy=https_proxy, + no_verify_ssl=no_verify_ssl, + output_format=output_format, + log_level=log_level, + ) + state = get_state(ctx) + api = state.get_api() + reminders = _reminders_service(api) + _reminder, attachment = _resolve_related_record( + api, + reminder_id, + attachment_id, + label="attachment", + fetch_rows=lambda reminder: reminders.attachments_for(reminder), + ) + _reminders_call( + api, + lambda: reminders.update_attachment( + attachment, + url=url, + uti=uti, + filename=filename, + file_size=file_size, + width=width, + height=height, + ), + ) + if url is not None and hasattr(attachment, "url"): + attachment.url = url + if uti is not None: + attachment.uti = uti + if filename is not None and hasattr(attachment, "filename"): + attachment.filename = filename + if file_size is not None and hasattr(attachment, "file_size"): + attachment.file_size = file_size + if width is not None and hasattr(attachment, "width"): + attachment.width = width + if height is not None and hasattr(attachment, "height"): + attachment.height = height + if state.json_output: + state.write_json(attachment) + return + state.console.print(f"Updated {attachment.id}") + + +@attachment_app.command("delete") +def reminders_attachment_delete( + ctx: typer.Context, + reminder_id: str = typer.Argument(..., help="Reminder id."), + attachment_id: str = typer.Argument(..., help="Attachment id."), + username: UsernameOption = None, + session_dir: SessionDirOption = None, + http_proxy: HttpProxyOption = None, + https_proxy: HttpsProxyOption = None, + no_verify_ssl: NoVerifySslOption = False, + output_format: OutputFormatOption = DEFAULT_OUTPUT_FORMAT, + log_level: LogLevelOption = DEFAULT_LOG_LEVEL, +) -> None: + """Delete one attachment.""" + + store_command_options( + ctx, + username=username, + session_dir=session_dir, + http_proxy=http_proxy, + https_proxy=https_proxy, + no_verify_ssl=no_verify_ssl, + output_format=output_format, + log_level=log_level, + ) + state = get_state(ctx) + api = state.get_api() + reminders = _reminders_service(api) + reminder, attachment = _resolve_related_record( + api, + reminder_id, + attachment_id, + label="attachment", + fetch_rows=lambda row: reminders.attachments_for(row), + ) + _reminders_call(api, lambda: reminders.delete_attachment(reminder, attachment)) + payload = { + "reminder_id": reminder.id, + "attachment_id": attachment.id, + "deleted": True, + } + if state.json_output: + state.write_json(payload) + return + state.console.print(f"Deleted {attachment.id}") + + +@recurrence_app.command("list") +def reminders_recurrence_list( + ctx: typer.Context, + reminder_id: str = typer.Argument(..., help="Reminder id."), + username: UsernameOption = None, + session_dir: SessionDirOption = None, + http_proxy: HttpProxyOption = None, + https_proxy: HttpsProxyOption = None, + no_verify_ssl: NoVerifySslOption = False, + output_format: OutputFormatOption = DEFAULT_OUTPUT_FORMAT, + log_level: LogLevelOption = DEFAULT_LOG_LEVEL, +) -> None: + """List recurrence rules for one reminder.""" + + store_command_options( + ctx, + username=username, + session_dir=session_dir, + http_proxy=http_proxy, + https_proxy=https_proxy, + no_verify_ssl=no_verify_ssl, + output_format=output_format, + log_level=log_level, + ) + state = get_state(ctx) + api = state.get_api() + reminders = _reminders_service(api) + reminder = _resolve_reminder(api, reminder_id) + payload = _reminders_call(api, lambda: reminders.recurrence_rules_for(reminder)) + if state.json_output: + state.write_json(payload) + return + state.console.print( + console_table( + "Reminder Recurrence Rules", + ["ID", "Frequency", "Interval", "Occurrence Count", "First Day"], + [ + ( + row.id, + _frequency_label(row.frequency), + row.interval, + row.occurrence_count, + row.first_day_of_week, + ) + for row in payload + ], + ) + ) + + +@recurrence_app.command("create") +def reminders_recurrence_create( + ctx: typer.Context, + reminder_id: str = typer.Argument(..., help="Reminder id."), + frequency: RecurrenceFrequencyChoice = typer.Option( + RecurrenceFrequencyChoice.DAILY, + "--frequency", + help="Recurrence frequency.", + ), + interval: int = typer.Option(1, "--interval", min=1, help="Recurrence interval."), + occurrence_count: int = typer.Option( + 0, + "--occurrence-count", + min=0, + help="Occurrence count; 0 means unlimited.", + ), + first_day_of_week: int = typer.Option( + 0, + "--first-day-of-week", + min=0, + max=6, + help="First day of week; 0 is Sunday.", + ), + username: UsernameOption = None, + session_dir: SessionDirOption = None, + http_proxy: HttpProxyOption = None, + https_proxy: HttpsProxyOption = None, + no_verify_ssl: NoVerifySslOption = False, + output_format: OutputFormatOption = DEFAULT_OUTPUT_FORMAT, + log_level: LogLevelOption = DEFAULT_LOG_LEVEL, +) -> None: + """Create a recurrence rule on one reminder.""" + + store_command_options( + ctx, + username=username, + session_dir=session_dir, + http_proxy=http_proxy, + https_proxy=https_proxy, + no_verify_ssl=no_verify_ssl, + output_format=output_format, + log_level=log_level, + ) + state = get_state(ctx) + api = state.get_api() + reminders = _reminders_service(api) + reminder = _resolve_reminder(api, reminder_id) + payload = _reminders_call( + api, + lambda: reminders.create_recurrence_rule( + reminder, + frequency=RECURRENCE_FREQUENCY_MAP[frequency], + interval=interval, + occurrence_count=occurrence_count, + first_day_of_week=first_day_of_week, + ), + ) + if state.json_output: + state.write_json(payload) + return + state.console.print(payload.id) + + +@recurrence_app.command("update") +def reminders_recurrence_update( + ctx: typer.Context, + reminder_id: str = typer.Argument(..., help="Reminder id."), + rule_id: str = typer.Argument(..., help="Recurrence rule id."), + frequency: RecurrenceFrequencyChoice | None = typer.Option( + None, + "--frequency", + help="Recurrence frequency.", + ), + interval: int | None = typer.Option( + None, + "--interval", + min=1, + help="Recurrence interval.", + ), + occurrence_count: int | None = typer.Option( + None, + "--occurrence-count", + min=0, + help="Occurrence count; 0 means unlimited.", + ), + first_day_of_week: int | None = typer.Option( + None, + "--first-day-of-week", + min=0, + max=6, + help="First day of week; 0 is Sunday.", + ), + username: UsernameOption = None, + session_dir: SessionDirOption = None, + http_proxy: HttpProxyOption = None, + https_proxy: HttpsProxyOption = None, + no_verify_ssl: NoVerifySslOption = False, + output_format: OutputFormatOption = DEFAULT_OUTPUT_FORMAT, + log_level: LogLevelOption = DEFAULT_LOG_LEVEL, +) -> None: + """Update one recurrence rule.""" + + if all( + value is None + for value in (frequency, interval, occurrence_count, first_day_of_week) + ): + raise CLIAbort("No recurrence updates were requested.") + + store_command_options( + ctx, + username=username, + session_dir=session_dir, + http_proxy=http_proxy, + https_proxy=https_proxy, + no_verify_ssl=no_verify_ssl, + output_format=output_format, + log_level=log_level, + ) + state = get_state(ctx) + api = state.get_api() + reminders = _reminders_service(api) + _reminder, recurrence_rule = _resolve_related_record( + api, + reminder_id, + rule_id, + label="recurrence rule", + fetch_rows=lambda reminder: reminders.recurrence_rules_for(reminder), + ) + _reminders_call( + api, + lambda: reminders.update_recurrence_rule( + recurrence_rule, + frequency=( + RECURRENCE_FREQUENCY_MAP[frequency] if frequency is not None else None + ), + interval=interval, + occurrence_count=occurrence_count, + first_day_of_week=first_day_of_week, + ), + ) + if frequency is not None: + recurrence_rule.frequency = RECURRENCE_FREQUENCY_MAP[frequency] + if interval is not None: + recurrence_rule.interval = interval + if occurrence_count is not None: + recurrence_rule.occurrence_count = occurrence_count + if first_day_of_week is not None: + recurrence_rule.first_day_of_week = first_day_of_week + if state.json_output: + state.write_json(recurrence_rule) + return + state.console.print(f"Updated {recurrence_rule.id}") + + +@recurrence_app.command("delete") +def reminders_recurrence_delete( + ctx: typer.Context, + reminder_id: str = typer.Argument(..., help="Reminder id."), + rule_id: str = typer.Argument(..., help="Recurrence rule id."), + username: UsernameOption = None, + session_dir: SessionDirOption = None, + http_proxy: HttpProxyOption = None, + https_proxy: HttpsProxyOption = None, + no_verify_ssl: NoVerifySslOption = False, + output_format: OutputFormatOption = DEFAULT_OUTPUT_FORMAT, + log_level: LogLevelOption = DEFAULT_LOG_LEVEL, +) -> None: + """Delete one recurrence rule.""" + + store_command_options( + ctx, + username=username, + session_dir=session_dir, + http_proxy=http_proxy, + https_proxy=https_proxy, + no_verify_ssl=no_verify_ssl, + output_format=output_format, + log_level=log_level, + ) + state = get_state(ctx) + api = state.get_api() + reminders = _reminders_service(api) + reminder, recurrence_rule = _resolve_related_record( + api, + reminder_id, + rule_id, + label="recurrence rule", + fetch_rows=lambda row: reminders.recurrence_rules_for(row), + ) + _reminders_call( + api, + lambda: reminders.delete_recurrence_rule(reminder, recurrence_rule), + ) + payload = { + "reminder_id": reminder.id, + "recurrence_rule_id": recurrence_rule.id, + "deleted": True, + } + if state.json_output: + state.write_json(payload) + return + state.console.print(f"Deleted {recurrence_rule.id}") diff --git a/pyicloud/cli/normalize.py b/pyicloud/cli/normalize.py index acc65557..c087227a 100644 --- a/pyicloud/cli/normalize.py +++ b/pyicloud/cli/normalize.py @@ -2,8 +2,11 @@ from __future__ import annotations +from datetime import datetime, timezone from typing import Any +MAX_NOTES_SEARCH_WINDOW = 5_000 + def normalize_account_summary(api, account) -> dict[str, Any]: """Normalize account summary data.""" @@ -174,3 +177,97 @@ def normalize_alias(alias: dict[str, Any]) -> dict[str, Any]: "label": alias.get("label"), "anonymous_id": alias.get("anonymousId"), } + + +def select_recent_notes( + notes_service: Any, *, limit: int, include_deleted: bool +) -> list[Any]: + """Return recent notes, excluding deleted notes by default.""" + + if limit <= 0: + return [] + if include_deleted: + return list(notes_service.recents(limit=limit)) + + probe_limit = limit + max_probe = min(max(limit, 10) * 8, 500) + while True: + rows = list(notes_service.recents(limit=probe_limit)) + filtered = [row for row in rows if not getattr(row, "is_deleted", False)] + if ( + len(filtered) >= limit + or len(rows) < probe_limit + or probe_limit >= max_probe + ): + return filtered[:limit] + probe_limit = min(probe_limit * 2, max_probe) + + +def search_notes_by_title( + notes_service: Any, + *, + title: str | None = None, + title_contains: str | None = None, + limit: int, +) -> list[Any]: + """Return title-matched notes using recents-first search with full-scan fallback.""" + + if limit <= 0: + return [] + + exact = (title or "").strip() + contains = (title_contains or "").strip().lower() + if not exact and not contains: + return [] + + def matches(note_title: str | None) -> bool: + if not note_title: + return False + if exact and note_title == exact: + return True + if contains and contains in note_title.lower(): + return True + return False + + def dedupe_key(item: Any) -> Any: + return getattr(item, "id", None) or id(item) + + candidates: list[Any] = [] + seen: set[Any] = set() + window = min(MAX_NOTES_SEARCH_WINDOW, max(500, limit * 50)) + + for note in notes_service.recents(limit=window): + if not matches(getattr(note, "title", None)): + continue + key = dedupe_key(note) + if key in seen: + continue + seen.add(key) + candidates.append(note) + if len(candidates) >= limit: + break + + if len(candidates) < limit: + for note in notes_service.iter_all(): + if not matches(getattr(note, "title", None)): + continue + key = dedupe_key(note) + if key in seen: + continue + seen.add(key) + candidates.append(note) + if len(candidates) >= limit: + break + + epoch = datetime(1970, 1, 1, tzinfo=timezone.utc) + + def sort_key(item: Any) -> datetime: + modified_at = getattr(item, "modified_at", None) + if modified_at is None: + return epoch + if modified_at.tzinfo is None: + return modified_at.replace(tzinfo=timezone.utc) + return modified_at + + candidates.sort(key=sort_key, reverse=True) + return candidates[:limit] diff --git a/pyicloud/cli/output.py b/pyicloud/cli/output.py index 9d816da4..77256147 100644 --- a/pyicloud/cli/output.py +++ b/pyicloud/cli/output.py @@ -119,3 +119,37 @@ def print_json_text(console: Console, payload: Any) -> None: """Pretty-print a JSON object in text mode.""" console.print_json(json=to_json_string(payload, indent=2)) + + +def format_color_value(value: Any) -> str: + """Return a compact human-friendly representation of reminder colors.""" + + if not value: + return "" + + payload = value + if isinstance(value, str): + stripped = value.strip() + if not stripped: + return "" + if not stripped.startswith("{"): + return stripped + try: + payload = json.loads(stripped) + except json.JSONDecodeError: + return stripped + + if isinstance(payload, dict): + hex_value = payload.get("daHexString") + symbolic = payload.get("ckSymbolicColorName") or payload.get( + "daSymbolicColorName" + ) + if hex_value and symbolic and symbolic != "custom": + return f"{symbolic} ({hex_value})" + if hex_value: + return str(hex_value) + if symbolic: + return str(symbolic) + return to_json_string(payload) + + return str(payload) diff --git a/pyicloud/services/notes/service.py b/pyicloud/services/notes/service.py index 44fc0007..ad7728a3 100644 --- a/pyicloud/services/notes/service.py +++ b/pyicloud/services/notes/service.py @@ -216,6 +216,10 @@ def iter_all(self, *, since: Optional[str] = None) -> Iterable[NoteSummary]: ``NoteSummary`` instances for full exports, indexing jobs, or local cache refreshes. """ + if self._matches_current_sync_cursor(since): + LOGGER.debug("Skipping Notes full scan because sync token is current") + return + LOGGER.debug("Iterating all notes%s", f" since={since}" if since else "") for zone in self._raw.changes( zone_req=CKZoneChangesZoneReq( @@ -277,7 +281,11 @@ def folders(self) -> Iterable[NoteFolder]: name = self._decode_encrypted( rec.fields.get_value("TitleEncrypted") ) - has_sub_value = rec.fields.get_value(_HAS_SUBFOLDER_FIELD) + has_sub_value = getattr( + rec.fields.get_field(_HAS_SUBFOLDER_FIELD) or (), + "value", + None, + ) has_sub = None if has_sub_value is None else bool(has_sub_value) yield NoteFolder( id=folder_id, name=name, has_subfolders=has_sub, count=None @@ -519,6 +527,10 @@ def iter_changes(self, *, since: Optional[str] = None) -> Iterable[ChangeEvent]: Pass a sync token from ``sync_cursor()`` to process only new changes since a previous run. """ + if self._matches_current_sync_cursor(since): + LOGGER.debug("Skipping Notes change scan because sync token is current") + return + LOGGER.debug("Iterating changes%s", f" since={since}" if since else "") for zone in self._raw.changes( zone_req=CKZoneChangesZoneReq( @@ -604,6 +616,17 @@ def raw(self) -> CloudKitNotesClient: # -------------------------- Internal helpers ----------------------------- + def _matches_current_sync_cursor(self, since: Optional[str]) -> bool: + """Return whether an incremental Notes cursor is already current.""" + if not since: + return False + + try: + return self._raw.current_sync_token(zone_name="Notes") == since + except NotesApiError as exc: + LOGGER.warning("Failed to preflight Notes sync token: %s", exc) + return False + @staticmethod def _coerce_keys(keys: Optional[Iterable[object]]) -> Optional[List[str]]: if keys is None: diff --git a/pyicloud/services/reminders/_mappers.py b/pyicloud/services/reminders/_mappers.py index 0b04e1d8..7c67480e 100644 --- a/pyicloud/services/reminders/_mappers.py +++ b/pyicloud/services/reminders/_mappers.py @@ -11,6 +11,7 @@ from ._protocol import ( _as_raw_id, _decode_attachment_url, + _decode_cloudkit_text_value, _decode_crdt_document, _ref_name, ) @@ -117,36 +118,37 @@ def _reminder_ids_for_list_record(self, rec: CKRecord) -> list[str]: def _coerce_text(self, value: Any, *, field_name: str, record_name: str) -> str: """Normalize CloudKit text-like values into ``str`` for domain models.""" - if value is None: - return "" - if isinstance(value, str): - return value - if isinstance(value, bytes): - try: - return value.decode("utf-8") - except UnicodeDecodeError: - self._logger.warning( - "Field %s on %s was undecodable bytes; replacing invalid UTF-8", - field_name, - record_name, - ) - return value.decode("utf-8", errors="replace") - return str(value) + try: + return _decode_cloudkit_text_value(value) + except UnicodeDecodeError: + self._logger.warning( + "Field %s on %s was undecodable bytes; replacing invalid UTF-8", + field_name, + record_name, + ) + return value.decode("utf-8", errors="replace") def record_to_list(self, rec: CKRecord) -> RemindersList: fields = rec.fields title = fields.get_value("Name") color = fields.get_value("Color") + reminder_ids = self._reminder_ids_for_list_record(rec) + raw_count = fields.get_value("Count") + count = int(raw_count) if raw_count is not None else 0 + if count == 0 and reminder_ids: + # Live list records can carry complete reminder membership while the + # Count field stays at zero. Prefer the membership size in that case. + count = len(reminder_ids) return RemindersList( id=rec.recordName, title=str(title) if title else "Untitled", color=str(color) if color else None, - count=int(fields.get_value("Count") or 0), + count=count, badge_emblem=fields.get_value("BadgeEmblem"), sorting_style=fields.get_value("SortingStyle"), is_group=bool(fields.get_value("IsGroup") or 0), - reminder_ids=self._reminder_ids_for_list_record(rec), + reminder_ids=reminder_ids, record_change_tag=rec.recordChangeTag, ) diff --git a/pyicloud/services/reminders/_protocol.py b/pyicloud/services/reminders/_protocol.py index b5b0b31b..1ec726c3 100644 --- a/pyicloud/services/reminders/_protocol.py +++ b/pyicloud/services/reminders/_protocol.py @@ -84,6 +84,23 @@ def _decode_attachment_url(value: str) -> str: return value +def _encode_cloudkit_text_field(value: str) -> dict[str, str]: + """Encode text for CloudKit fields that store UTF-8 payload bytes.""" + encoded = base64.b64encode((value or "").encode("utf-8")).decode("ascii") + return {"type": "ENCRYPTED_BYTES", "value": encoded} + + +def _decode_cloudkit_text_value(value: object) -> str: + """Decode a CloudKit text field value into plain ``str``.""" + if value is None: + return "" + if isinstance(value, str): + return value + if isinstance(value, bytes): + return value.decode("utf-8") + return str(value) + + def _decode_crdt_document(encrypted_value: str | bytes) -> str: """Decode a CRDT document (TitleDocument or NotesDocument).""" data = encrypted_value diff --git a/pyicloud/services/reminders/_reads.py b/pyicloud/services/reminders/_reads.py index 57027827..2ddd6ad5 100644 --- a/pyicloud/services/reminders/_reads.py +++ b/pyicloud/services/reminders/_reads.py @@ -115,6 +115,10 @@ def reminders(self, list_id: Optional[str] = None) -> Iterable[Reminder]: def sync_cursor(self) -> str: """Return the latest usable sync token for the Reminders zone.""" + query_token = self._get_raw().current_sync_token(zone_id=_REMINDERS_ZONE_REQ) + if query_token: + return query_token + sync_token: Optional[str] = None for zone in self._iter_zone_change_pages( desired_record_types=[], diff --git a/pyicloud/services/reminders/_writes.py b/pyicloud/services/reminders/_writes.py index 6a354cda..c8bf9f2b 100644 --- a/pyicloud/services/reminders/_writes.py +++ b/pyicloud/services/reminders/_writes.py @@ -21,6 +21,7 @@ from ._protocol import ( _as_raw_id, _as_record_name, + _encode_cloudkit_text_field, _encode_crdt_document, _generate_resolution_token_map, ) @@ -736,7 +737,7 @@ def create_hashtag(self, reminder: Reminder, name: str) -> Hashtag: field_name="HashtagIDs", token_field_name="hashtagIDs", child_fields={ - "Name": {"type": "STRING", "value": name}, + "Name": _encode_cloudkit_text_field(name), "Deleted": {"type": "INT64", "value": 0}, "Reminder": { "type": "REFERENCE", @@ -762,7 +763,7 @@ def create_hashtag(self, reminder: Reminder, name: str) -> Hashtag: def update_hashtag(self, hashtag: Hashtag, name: str) -> None: """Update an existing hashtag name.""" fields: dict[str, Any] = { - "Name": {"type": "STRING", "value": name}, + "Name": _encode_cloudkit_text_field(name), } if hashtag.reminder_id: fields["Reminder"] = { diff --git a/pyicloud/services/reminders/client.py b/pyicloud/services/reminders/client.py index 8dd5a2c2..6ac294d6 100644 --- a/pyicloud/services/reminders/client.py +++ b/pyicloud/services/reminders/client.py @@ -187,6 +187,29 @@ def query( "Query response validation failed", payload=data ) from e + def current_sync_token( + self, + *, + zone_id: CKZoneIDReq, + record_type: str = "reminderList", + ) -> str | None: + """Fetch the current zone sync token using a lightweight query first.""" + payload = CKQueryRequest( + query=CKQueryObject(recordType=record_type), + zoneID=zone_id, + resultsLimit=1, + ).model_dump(mode="json", exclude_none=True) + + try: + data = self._http.post("/records/query", payload) + response = self._validate_response(CKQueryResponse, data) + except (RemindersApiError, ValidationError): + return None + + if getattr(response, "syncToken", None): + return str(response.syncToken) + return None + def changes( self, *, diff --git a/pyicloud/services/reminders/service.py b/pyicloud/services/reminders/service.py index d6218846..67e087b6 100644 --- a/pyicloud/services/reminders/service.py +++ b/pyicloud/services/reminders/service.py @@ -92,6 +92,7 @@ def __init__( ) base_params = { "remapEnums": True, + "getCurrentSyncToken": True, **(params or {}), } self._raw = CloudKitRemindersClient( diff --git a/tests/services/test_reminders_cloudkit.py b/tests/services/test_reminders_cloudkit.py index 2ef8b0e9..6d964871 100644 --- a/tests/services/test_reminders_cloudkit.py +++ b/tests/services/test_reminders_cloudkit.py @@ -319,6 +319,35 @@ def test_reminders_client_strict_mode_wraps_validation_error(): assert isinstance(excinfo.value.__cause__, ValidationError) +def test_reminders_client_current_sync_token_uses_query_sync_token(): + session = MagicMock() + session.post.return_value = MagicMock( + status_code=200, + json=lambda: {"records": [], "syncToken": "tok-query"}, + ) + client = CloudKitRemindersClient("https://example.com", session, {}) + + token = client.current_sync_token(zone_id=CKZoneIDReq(zoneName="Reminders")) + + assert token == "tok-query" + query_payload = session.post.call_args.kwargs["json"] + assert query_payload["query"]["recordType"] == "reminderList" + assert query_payload["resultsLimit"] == 1 + + +def test_reminders_client_current_sync_token_returns_none_when_missing(): + session = MagicMock() + session.post.return_value = MagicMock( + status_code=200, + json=lambda: {"records": []}, + ) + client = CloudKitRemindersClient("https://example.com", session, {}) + + token = client.current_sync_token(zone_id=CKZoneIDReq(zoneName="Reminders")) + + assert token is None + + def test_reminders_service_passes_through_validation_override(): service = RemindersService( "https://example.com", @@ -634,6 +663,40 @@ def test_list_parses_inline_reminder_ids_json(self, service): lst = service._record_to_list(rec) assert lst.reminder_ids == ["REM-1", "REM-2"] + assert lst.count == 2 + + def test_list_falls_back_to_reminder_ids_length_when_count_missing(self, service): + rec = _ck_record( + "List", + "LIST-003A", + { + "ReminderIDs": { + "type": "STRING", + "value": '["REM-1","Reminder/REM-2","REM-3"]', + } + }, + ) + + lst = service._record_to_list(rec) + assert lst.reminder_ids == ["REM-1", "REM-2", "REM-3"] + assert lst.count == 3 + + def test_list_falls_back_to_reminder_ids_length_when_count_is_zero(self, service): + rec = _ck_record( + "List", + "LIST-003B", + { + "Count": {"type": "INT64", "value": 0}, + "ReminderIDs": { + "type": "STRING", + "value": '["REM-1","Reminder/REM-2"]', + }, + }, + ) + + lst = service._record_to_list(rec) + assert lst.reminder_ids == ["REM-1", "REM-2"] + assert lst.count == 2 def test_list_parses_asset_backed_reminder_ids_from_downloaded_data(self, service): payload = base64.b64encode(b'["REM-1","Reminder/REM-2"]').decode("ascii") @@ -650,6 +713,7 @@ def test_list_parses_asset_backed_reminder_ids_from_downloaded_data(self, servic lst = service._record_to_list(rec) assert lst.reminder_ids == ["REM-1", "REM-2"] + assert lst.count == 2 service._raw.download_asset_bytes.assert_not_called() def test_list_parses_asset_backed_reminder_ids_from_download_url(self, service): @@ -667,6 +731,7 @@ def test_list_parses_asset_backed_reminder_ids_from_download_url(self, service): lst = service._record_to_list(rec) assert lst.reminder_ids == ["REM-3", "REM-4"] + assert lst.count == 2 service._raw.download_asset_bytes.assert_called_once_with( "https://example.com/reminder-ids.json" ) @@ -1393,7 +1458,9 @@ def test_create_and_delete_hashtag(self): create_ops = svc._raw.modify.call_args.kwargs["operations"] assert len(create_ops) == 2 assert create_ops[1].record.recordType == "Hashtag" - assert create_ops[1].record.fields["Name"].value == "travel" + name_field = create_ops[1].record.fields["Name"].root + assert name_field.type == "ENCRYPTED_BYTES" + assert name_field.value == b"travel" assert svc._raw.modify.call_args.kwargs["atomic"] is True svc._raw.modify.reset_mock() @@ -1877,6 +1944,46 @@ def _side_effect(**kwargs): assert reminder.record_change_tag == "ctag-rem-new" assert hashtag.record_change_tag == "ctag-hash-new" + def test_create_hashtag_name_round_trips_via_mapper(self): + svc = RemindersService("https://ckdatabasews.icloud.com", MagicMock(), {}) + svc._raw = MagicMock() + svc._raw.modify.return_value = self._ok_modify() + + reminder = Reminder( + id="Reminder/REM-TAG-ROUNDTRIP", + list_id="List/LIST-001", + title="Hashtag reminder", + record_change_tag="ctag-rem-old", + hashtag_ids=[], + ) + + svc.create_hashtag(reminder, "travel") + + name_field = ( + svc._raw.modify.call_args.kwargs["operations"][1].record.fields["Name"].root + ) + parsed = svc._record_to_hashtag( + _ck_record( + "Hashtag", + "Hashtag/HASH-ROUNDTRIP", + { + "Name": { + "type": name_field.type, + "value": base64.b64encode(name_field.value).decode("ascii"), + }, + "Reminder": { + "type": "REFERENCE", + "value": { + "recordName": "Reminder/REM-TAG-ROUNDTRIP", + "action": "VALIDATE", + }, + }, + }, + ) + ) + + assert parsed.name == "travel" + def test_create_url_attachment_hydrates_record_change_tags(self): svc = RemindersService("https://ckdatabasews.icloud.com", MagicMock(), {}) svc._raw = MagicMock() @@ -1992,6 +2099,45 @@ def _side_effect(**kwargs): svc.update_recurrence_rule(recurrence_rule, interval=2) assert recurrence_rule.record_change_tag == "new-recurrencerule-tag" + def test_update_hashtag_writes_encoded_name_field(self): + svc = RemindersService("https://ckdatabasews.icloud.com", MagicMock(), {}) + svc._raw = MagicMock() + svc._raw.modify.return_value = self._ok_modify() + + hashtag = Hashtag( + id="Hashtag/H-UPD-ENC", + name="old", + reminder_id="Reminder/REM-UPD-ENC", + record_change_tag="old-hashtag-tag", + ) + svc.update_hashtag(hashtag, "chores") + + operation = svc._raw.modify.call_args.kwargs["operations"][0] + name_field = operation.record.fields["Name"].root + assert name_field.type == "ENCRYPTED_BYTES" + assert name_field.value == b"chores" + + parsed = svc._record_to_hashtag( + _ck_record( + "Hashtag", + "Hashtag/H-UPD-ENC", + { + "Name": { + "type": name_field.type, + "value": base64.b64encode(name_field.value).decode("ascii"), + }, + "Reminder": { + "type": "REFERENCE", + "value": { + "recordName": "Reminder/REM-UPD-ENC", + "action": "VALIDATE", + }, + }, + }, + ) + ) + assert parsed.name == "chores" + class TestReminderReadPaths: """Validate reminders() and list_reminders() query behavior.""" @@ -2661,6 +2807,7 @@ def _changes_response( def test_sync_cursor_returns_final_paged_token(self): svc = RemindersService("https://ckdatabasews.icloud.com", MagicMock(), {}) svc._raw = MagicMock() + svc._raw.current_sync_token.return_value = None svc._raw.changes.side_effect = [ self._changes_response([], sync_token="tok-1", more_coming=True), self._changes_response([], sync_token="tok-2", more_coming=False), @@ -2675,6 +2822,15 @@ def test_sync_cursor_returns_final_paged_token(self): assert first_zone_req.desiredRecordTypes == [] assert first_zone_req.desiredKeys == [] + def test_sync_cursor_prefers_query_sync_token(self): + svc = RemindersService("https://ckdatabasews.icloud.com", MagicMock(), {}) + svc._raw = MagicMock() + svc._raw.current_sync_token.return_value = "tok-query" + + assert svc.sync_cursor() == "tok-query" + svc._raw.current_sync_token.assert_called_once() + svc._raw.changes.assert_not_called() + def test_iter_changes_emits_updated_deleted_and_tombstone_events(self): svc = RemindersService("https://ckdatabasews.icloud.com", MagicMock(), {}) svc._raw = MagicMock() diff --git a/tests/test_cmdline.py b/tests/test_cmdline.py index a34f2d46..d691a89a 100644 --- a/tests/test_cmdline.py +++ b/tests/test_cmdline.py @@ -16,6 +16,30 @@ import click from typer.testing import CliRunner +from pyicloud.services.notes.models import Attachment as NoteAttachment +from pyicloud.services.notes.models import ChangeEvent as NoteChangeEvent +from pyicloud.services.notes.models import ( + Note, + NoteFolder, + NoteSummary, +) +from pyicloud.services.notes.service import NoteLockedError, NoteNotFound +from pyicloud.services.reminders.client import RemindersApiError, RemindersAuthError +from pyicloud.services.reminders.models import ( + Alarm, + AlarmWithTrigger, + Hashtag, + ListRemindersResult, + LocationTrigger, + Proximity, + RecurrenceFrequency, + RecurrenceRule, + Reminder, + ReminderChangeEvent, + RemindersList, + URLAttachment, +) + account_index_module = importlib.import_module("pyicloud.cli.account_index") cli_module = importlib.import_module("pyicloud.cli.app") context_module = importlib.import_module("pyicloud.cli.context") @@ -195,6 +219,614 @@ def delete(self, anonymous_id: str) -> dict[str, Any]: return {"anonymousId": anonymous_id, "deleted": True} +class FakeNotes: + """Notes service fixture.""" + + def __init__(self) -> None: + attachment = NoteAttachment( + id="Attachment/PDF", + filename="agenda.pdf", + uti="com.adobe.pdf", + size=12, + download_url="https://example.com/agenda.pdf", + preview_url="https://example.com/agenda-preview.pdf", + thumbnail_url="https://example.com/agenda-thumb.png", + ) + self.recent_requests: list[int] = [] + self.iter_all_requests: list[str | None] = [] + self.folder_requests: list[tuple[str, int | None]] = [] + self.render_calls: list[dict[str, Any]] = [] + self.export_calls: list[dict[str, Any]] = [] + self.change_requests: list[str | None] = [] + self.folder_rows = [ + NoteFolder( + id="Folder/NOTES", + name="Notes", + has_subfolders=False, + count=1, + ), + NoteFolder( + id="Folder/WORK", + name="Work", + has_subfolders=True, + count=3, + ), + ] + self.recent_rows = [ + NoteSummary( + id="Note/DELETED", + title="Deleted Note", + snippet="Old note", + modified_at=datetime(2026, 3, 5, tzinfo=timezone.utc), + folder_id="Folder/DELETED", + folder_name="Recently Deleted", + is_deleted=True, + is_locked=False, + ), + NoteSummary( + id="Note/DAILY", + title="Daily Plan", + snippet="Ship CLI", + modified_at=datetime(2026, 3, 4, tzinfo=timezone.utc), + folder_id="Folder/NOTES", + folder_name="Notes", + is_deleted=False, + is_locked=False, + ), + NoteSummary( + id="Note/MEETING", + title="Meeting Notes", + snippet="Discuss roadmap", + modified_at=datetime(2026, 3, 3, tzinfo=timezone.utc), + folder_id="Folder/WORK", + folder_name="Work", + is_deleted=False, + is_locked=False, + ), + ] + self.all_rows = [ + self.recent_rows[2], + NoteSummary( + id="Note/FOLLOWUP", + title="Meeting Follow-up", + snippet="Send recap", + modified_at=datetime(2026, 3, 2, tzinfo=timezone.utc), + folder_id="Folder/WORK", + folder_name="Work", + is_deleted=False, + is_locked=False, + ), + self.recent_rows[1], + # Duplicate entry to verify deduplication in search_notes_by_title. + self.recent_rows[2], + ] + self.notes = { + "Note/DAILY": Note( + id="Note/DAILY", + title="Daily Plan", + snippet="Ship CLI", + modified_at=datetime(2026, 3, 4, tzinfo=timezone.utc), + folder_id="Folder/NOTES", + folder_name="Notes", + is_deleted=False, + is_locked=False, + text="Ship CLI", + html="
Ship CLI
", + attachments=[attachment], + ), + "Note/MEETING": Note( + id="Note/MEETING", + title="Meeting Notes", + snippet="Discuss roadmap", + modified_at=datetime(2026, 3, 3, tzinfo=timezone.utc), + folder_id="Folder/WORK", + folder_name="Work", + is_deleted=False, + is_locked=False, + text="Discuss roadmap", + html="Discuss roadmap
", + attachments=[attachment], + ), + "Note/FOLLOWUP": Note( + id="Note/FOLLOWUP", + title="Meeting Follow-up", + snippet="Send recap", + modified_at=datetime(2026, 3, 2, tzinfo=timezone.utc), + folder_id="Folder/WORK", + folder_name="Work", + is_deleted=False, + is_locked=False, + text="Send recap", + html="Send recap
", + attachments=None, + ), + } + self.change_rows = [ + NoteChangeEvent(type="updated", note=self.recent_rows[1]), + NoteChangeEvent(type="deleted", note=self.recent_rows[0]), + ] + self.cursor = "notes-cursor-1" + + @staticmethod + def _matches_id(note_id: str, query: str) -> bool: + return note_id == query or note_id.split("/", 1)[-1] == query + + def recents(self, *, limit: int = 50): + self.recent_requests.append(limit) + return list(self.recent_rows[:limit]) + + def folders(self): + return list(self.folder_rows) + + def in_folder(self, folder_id: str, limit: int | None = None): + self.folder_requests.append((folder_id, limit)) + rows = [row for row in self.all_rows if row.folder_id == folder_id] + return list(rows[:limit] if limit is not None else rows) + + def iter_all(self, *, since: Optional[str] = None): + self.iter_all_requests.append(since) + return iter(self.all_rows) + + def get(self, note_id: str, *, with_attachments: bool = False): + if self._matches_id("Note/LOCKED", note_id): + raise NoteLockedError(f"Note is locked: {note_id}") + for candidate_id, note in self.notes.items(): + if self._matches_id(candidate_id, note_id): + attachments = note.attachments if with_attachments else None + return note.model_copy(update={"attachments": attachments}) + raise NoteNotFound(f"Note not found: {note_id}") + + def render_note(self, note_id: str, **kwargs: Any) -> str: + note = self.get(note_id, with_attachments=False) + self.render_calls.append({"note_id": note.id, **kwargs}) + return note.html or f"{note.id}
" + + def export_note(self, note_id: str, output_dir: str, **kwargs: Any) -> str: + note = self.get(note_id, with_attachments=False) + path = Path(output_dir) / f"{note.id.split('/', 1)[-1].lower()}.html" + self.export_calls.append( + {"note_id": note.id, "output_dir": output_dir, **kwargs} + ) + return str(path) + + def iter_changes(self, *, since: Optional[str] = None): + self.change_requests.append(since) + return iter(self.change_rows) + + def sync_cursor(self) -> str: + return self.cursor + + +class FakeReminders: + """Reminders service fixture.""" + + def __init__(self) -> None: + self.list_rows = { + "List/INBOX": RemindersList( + id="List/INBOX", + title="Inbox", + color='{"daHexString":"#007AFF","ckSymbolicColorName":"blue"}', + count=0, + ), + "List/WORK": RemindersList( + id="List/WORK", + title="Work", + color='{"daHexString":"#34C759","ckSymbolicColorName":"green"}', + count=0, + ), + } + self.reminder_rows = { + "Reminder/A": Reminder( + id="Reminder/A", + list_id="List/INBOX", + title="Buy milk", + desc="2 percent", + completed=False, + due_date=datetime(2026, 3, 31, 9, 0, tzinfo=timezone.utc), + priority=1, + flagged=True, + all_day=False, + time_zone="Europe/Luxembourg", + alarm_ids=["Alarm/A"], + hashtag_ids=["Hashtag/ERRANDS"], + attachment_ids=["Attachment/LINK"], + recurrence_rule_ids=["Recurrence/WEEKLY"], + parent_reminder_id="Reminder/PARENT", + created=datetime(2026, 3, 1, tzinfo=timezone.utc), + modified=datetime(2026, 3, 4, tzinfo=timezone.utc), + ), + "Reminder/B": Reminder( + id="Reminder/B", + list_id="List/INBOX", + title="Pay rent", + desc="", + completed=True, + completed_date=datetime(2026, 3, 2, tzinfo=timezone.utc), + priority=0, + flagged=False, + all_day=False, + created=datetime(2026, 3, 1, tzinfo=timezone.utc), + modified=datetime(2026, 3, 2, tzinfo=timezone.utc), + ), + "Reminder/C": Reminder( + id="Reminder/C", + list_id="List/WORK", + title="Prepare deck", + desc="Slides for review", + completed=False, + priority=5, + flagged=False, + all_day=False, + created=datetime(2026, 3, 3, tzinfo=timezone.utc), + modified=datetime(2026, 3, 4, tzinfo=timezone.utc), + ), + } + self.alarm_rows = { + "Alarm/A": Alarm( + id="Alarm/A", + alarm_uid="alarm-a", + reminder_id="Reminder/A", + trigger_id="Trigger/A", + ) + } + self.trigger_rows = { + "Trigger/A": LocationTrigger( + id="Trigger/A", + alarm_id="Alarm/A", + title="Office", + address="1 Infinite Loop", + latitude=37.3318, + longitude=-122.0312, + radius=150.0, + proximity=Proximity.ARRIVING, + location_uid="office", + ) + } + self.hashtag_rows = { + "Hashtag/ERRANDS": Hashtag( + id="Hashtag/ERRANDS", + name="errands", + reminder_id="Reminder/A", + created=datetime(2026, 3, 1, tzinfo=timezone.utc), + ) + } + self.attachment_rows = { + "Attachment/LINK": URLAttachment( + id="Attachment/LINK", + reminder_id="Reminder/A", + url="https://example.com/checklist", + uti="public.url", + ) + } + self.recurrence_rows = { + "Recurrence/WEEKLY": RecurrenceRule( + id="Recurrence/WEEKLY", + reminder_id="Reminder/A", + frequency=RecurrenceFrequency.WEEKLY, + interval=1, + occurrence_count=0, + first_day_of_week=1, + ) + } + self.snapshot_requests: list[dict[str, Any]] = [] + self.change_requests: list[str | None] = [] + self.cursor = "reminders-cursor-1" + + @staticmethod + def _matches_id(record_id: str, query: str) -> bool: + return record_id == query or record_id.split("/", 1)[-1] == query + + def _find_reminder(self, reminder_id: str) -> Reminder: + for candidate_id, reminder in self.reminder_rows.items(): + if self._matches_id(candidate_id, reminder_id): + return reminder + raise LookupError(f"Reminder not found: {reminder_id}") + + def lists(self): + for row in self.list_rows.values(): + row.count = sum( + 1 + for reminder in self.reminder_rows.values() + if reminder.list_id == row.id and not reminder.deleted + ) + return list(self.list_rows.values()) + + def reminders(self, list_id: Optional[str] = None): + rows = [ + reminder + for reminder in self.reminder_rows.values() + if not reminder.deleted and (list_id is None or reminder.list_id == list_id) + ] + return list(rows) + + def list_reminders( + self, + list_id: str, + include_completed: bool = False, + results_limit: int = 200, + ) -> ListRemindersResult: + normalized = list_id if list_id.startswith("List/") else f"List/{list_id}" + self.snapshot_requests.append( + { + "list_id": normalized, + "include_completed": include_completed, + "results_limit": results_limit, + } + ) + reminders = [ + reminder + for reminder in self.reminder_rows.values() + if reminder.list_id == normalized + and not reminder.deleted + and (include_completed or not reminder.completed) + ][:results_limit] + reminder_ids = {reminder.id for reminder in reminders} + return ListRemindersResult( + reminders=reminders, + alarms={ + alarm_id: alarm + for alarm_id, alarm in self.alarm_rows.items() + if alarm.reminder_id in reminder_ids + }, + triggers={ + trigger_id: trigger + for trigger_id, trigger in self.trigger_rows.items() + if any( + alarm.trigger_id == trigger_id + for alarm in self.alarm_rows.values() + if alarm.reminder_id in reminder_ids + ) + }, + attachments={ + attachment_id: attachment + for attachment_id, attachment in self.attachment_rows.items() + if attachment.reminder_id in reminder_ids + }, + hashtags={ + hashtag_id: hashtag + for hashtag_id, hashtag in self.hashtag_rows.items() + if hashtag.reminder_id in reminder_ids + }, + recurrence_rules={ + rule_id: rule + for rule_id, rule in self.recurrence_rows.items() + if rule.reminder_id in reminder_ids + }, + ) + + def get(self, reminder_id: str) -> Reminder: + return self._find_reminder(reminder_id) + + def create( + self, + list_id: str, + title: str, + desc: str = "", + completed: bool = False, + due_date: Optional[datetime] = None, + priority: int = 0, + flagged: bool = False, + all_day: bool = False, + time_zone: Optional[str] = None, + parent_reminder_id: Optional[str] = None, + ) -> Reminder: + next_id = f"Reminder/CREATED-{len(self.reminder_rows) + 1}" + reminder = Reminder( + id=next_id, + list_id=list_id, + title=title, + desc=desc, + completed=completed, + due_date=due_date, + priority=priority, + flagged=flagged, + all_day=all_day, + time_zone=time_zone, + parent_reminder_id=parent_reminder_id, + created=datetime(2026, 3, 30, tzinfo=timezone.utc), + modified=datetime(2026, 3, 30, tzinfo=timezone.utc), + ) + self.reminder_rows[reminder.id] = reminder + return reminder + + def update(self, reminder: Reminder) -> None: + self.reminder_rows[reminder.id] = reminder + + def delete(self, reminder: Reminder) -> None: + reminder.deleted = True + self.reminder_rows[reminder.id] = reminder + + def add_location_trigger( + self, + reminder: Reminder, + title: str = "", + address: str = "", + latitude: float = 0.0, + longitude: float = 0.0, + radius: float = 100.0, + proximity: Proximity = Proximity.ARRIVING, + ) -> tuple[Alarm, LocationTrigger]: + index = len(self.alarm_rows) + 1 + alarm = Alarm( + id=f"Alarm/{index}", + alarm_uid=f"alarm-{index}", + reminder_id=reminder.id, + trigger_id=f"Trigger/{index}", + ) + trigger = LocationTrigger( + id=f"Trigger/{index}", + alarm_id=alarm.id, + title=title, + address=address, + latitude=latitude, + longitude=longitude, + radius=radius, + proximity=proximity, + location_uid=f"location-{index}", + ) + self.alarm_rows[alarm.id] = alarm + self.trigger_rows[trigger.id] = trigger + reminder.alarm_ids.append(alarm.id) + return alarm, trigger + + def create_hashtag(self, reminder: Reminder, name: str) -> Hashtag: + hashtag = Hashtag( + id=f"Hashtag/{name.upper()}", + name=name, + reminder_id=reminder.id, + created=datetime(2026, 3, 30, tzinfo=timezone.utc), + ) + self.hashtag_rows[hashtag.id] = hashtag + reminder.hashtag_ids.append(hashtag.id) + return hashtag + + def update_hashtag(self, hashtag: Hashtag, name: str) -> None: + hashtag.name = name + + def delete_hashtag(self, reminder: Reminder, hashtag: Hashtag) -> None: + reminder.hashtag_ids = [ + row_id for row_id in reminder.hashtag_ids if row_id != hashtag.id + ] + self.hashtag_rows.pop(hashtag.id, None) + + def create_url_attachment( + self, reminder: Reminder, url: str, uti: str = "public.url" + ) -> URLAttachment: + attachment = URLAttachment( + id=f"Attachment/{len(self.attachment_rows) + 1}", + reminder_id=reminder.id, + url=url, + uti=uti, + ) + self.attachment_rows[attachment.id] = attachment + reminder.attachment_ids.append(attachment.id) + return attachment + + def update_attachment( + self, + attachment: URLAttachment, + *, + url: Optional[str] = None, + uti: Optional[str] = None, + filename: Optional[str] = None, + file_size: Optional[int] = None, + width: Optional[int] = None, + height: Optional[int] = None, + ) -> None: + if url is not None: + attachment.url = url + if uti is not None: + attachment.uti = uti + + def delete_attachment(self, reminder: Reminder, attachment: URLAttachment) -> None: + reminder.attachment_ids = [ + row_id for row_id in reminder.attachment_ids if row_id != attachment.id + ] + self.attachment_rows.pop(attachment.id, None) + + def create_recurrence_rule( + self, + reminder: Reminder, + *, + frequency: RecurrenceFrequency = RecurrenceFrequency.DAILY, + interval: int = 1, + occurrence_count: int = 0, + first_day_of_week: int = 0, + ) -> RecurrenceRule: + rule = RecurrenceRule( + id=f"Recurrence/{len(self.recurrence_rows) + 1}", + reminder_id=reminder.id, + frequency=frequency, + interval=interval, + occurrence_count=occurrence_count, + first_day_of_week=first_day_of_week, + ) + self.recurrence_rows[rule.id] = rule + reminder.recurrence_rule_ids.append(rule.id) + return rule + + def update_recurrence_rule( + self, + recurrence_rule: RecurrenceRule, + *, + frequency: Optional[RecurrenceFrequency] = None, + interval: Optional[int] = None, + occurrence_count: Optional[int] = None, + first_day_of_week: Optional[int] = None, + ) -> None: + if frequency is not None: + recurrence_rule.frequency = frequency + if interval is not None: + recurrence_rule.interval = interval + if occurrence_count is not None: + recurrence_rule.occurrence_count = occurrence_count + if first_day_of_week is not None: + recurrence_rule.first_day_of_week = first_day_of_week + + def delete_recurrence_rule( + self, reminder: Reminder, recurrence_rule: RecurrenceRule + ) -> None: + reminder.recurrence_rule_ids = [ + row_id + for row_id in reminder.recurrence_rule_ids + if row_id != recurrence_rule.id + ] + self.recurrence_rows.pop(recurrence_rule.id, None) + + def alarms_for(self, reminder: Reminder) -> list[AlarmWithTrigger]: + rows = [] + for alarm_id in reminder.alarm_ids: + alarm = self.alarm_rows[alarm_id] + rows.append( + AlarmWithTrigger( + alarm=alarm, + trigger=self.trigger_rows.get(alarm.trigger_id), + ) + ) + return rows + + def tags_for(self, reminder: Reminder) -> list[Hashtag]: + return [ + self.hashtag_rows[row_id] + for row_id in reminder.hashtag_ids + if row_id in self.hashtag_rows + ] + + def attachments_for(self, reminder: Reminder) -> list[URLAttachment]: + return [ + self.attachment_rows[row_id] + for row_id in reminder.attachment_ids + if row_id in self.attachment_rows + ] + + def recurrence_rules_for(self, reminder: Reminder) -> list[RecurrenceRule]: + return [ + self.recurrence_rows[row_id] + for row_id in reminder.recurrence_rule_ids + if row_id in self.recurrence_rows + ] + + def iter_changes(self, *, since: Optional[str] = None): + self.change_requests.append(since) + return iter( + [ + ReminderChangeEvent( + type="updated", + reminder_id="Reminder/A", + reminder=self.reminder_rows["Reminder/A"], + ), + ReminderChangeEvent( + type="deleted", + reminder_id="Reminder/Z", + reminder=None, + ), + ] + ) + + def sync_cursor(self) -> str: + return self.cursor + + class FakeAPI: """Authenticated API fixture.""" @@ -325,6 +957,8 @@ def __init__( all=photo_album, ) self.hidemyemail = FakeHideMyEmail() + self.notes = FakeNotes() + self.reminders = FakeReminders() def _logout( self, @@ -524,6 +1158,8 @@ def test_root_help() -> None: "drive", "photos", "hidemyemail", + "notes", + "reminders", ): assert command in text @@ -550,6 +1186,8 @@ def test_group_help() -> None: "drive", "photos", "hidemyemail", + "notes", + "reminders", ): result = _runner().invoke(app, [command, "--help"]) assert result.exit_code == 0 @@ -567,6 +1205,8 @@ def test_bare_group_invocation_shows_help() -> None: "drive", "photos", "hidemyemail", + "notes", + "reminders", ): result = _runner().invoke(app, [command]) text = _plain_output(result) @@ -575,6 +1215,22 @@ def test_bare_group_invocation_shows_help() -> None: assert "Missing command" not in text +def test_notes_and_reminders_leaf_help() -> None: + """New service groups and reminder subgroups should expose leaf help.""" + + for cli_args in ( + ["notes", "search", "--help"], + ["reminders", "create", "--help"], + ["reminders", "alarm", "--help"], + ["reminders", "alarm", "add-location", "--help"], + ["reminders", "hashtag", "--help"], + ["reminders", "attachment", "--help"], + ["reminders", "recurrence", "--help"], + ): + result = _runner().invoke(app, cli_args) + assert result.exit_code == 0 + + def test_leaf_help_includes_execution_context_options() -> None: """Leaf command help should show the command-local options it supports.""" @@ -1886,458 +2542,682 @@ def test_non_interactive_2sa_does_not_send_verification_code() -> None: """Non-interactive 2SA should fail before sending a verification code.""" fake_api = FakeAPI() - fake_api.requires_2sa = True - fake_api.trusted_devices = [{"deviceName": "Trusted Device", "phoneNumber": "+1"}] - - result = _invoke(fake_api, "auth", "login", interactive=False) - - assert result.exit_code != 0 - assert result.exception.args[0] == ( - "Two-step authentication is required, but interactive prompts are disabled." - ) - fake_api.send_verification_code.assert_not_called() - + fake_api.requires_2fa = True -def test_devices_list_and_show_commands() -> None: - """Devices list and show should expose summary and detailed views.""" + def request_prompt() -> bool: + fake_api.two_factor_delivery_method = "trusted_device" + return True - fake_api = FakeAPI() - list_result = _invoke(fake_api, "devices", "list", "--locate") - show_result = _invoke(fake_api, "devices", "show", "device-1") - raw_result = _invoke( - fake_api, - "devices", - "show", - "device-1", - "--raw", - output_format="json", - ) - assert list_result.exit_code == 0 - assert "Example iPhone" in list_result.stdout - assert show_result.exit_code == 0 - assert "Battery Status" in show_result.stdout - assert raw_result.exit_code == 0 - assert json.loads(raw_result.stdout)["deviceDisplayName"] == "iPhone" + fake_api.request_2fa_code.side_effect = request_prompt + with patch.object(context_module.typer, "prompt", return_value="123456"): + result = _invoke(fake_api, "auth", "login", interactive=True) -def test_devices_show_reports_reauthentication_requirement() -> None: - """Device resolution should collapse reauth failures into a CLIAbort.""" + assert result.exit_code == 0 + assert "Requested a 2FA prompt on your trusted Apple devices." in result.stdout + fake_api.validate_2fa_code.assert_called_once_with("123456") - session_dir = _unique_session_dir("devices-show-reauth") - class ReauthAPI: - def __init__(self) -> None: - self.account_name = "user@example.com" - self.is_china_mainland = False - self.session = SimpleNamespace( - session_path=str(session_dir / "userexamplecom.session"), - cookiejar_path=str(session_dir / "userexamplecom.cookiejar"), - ) - self.get_auth_status = MagicMock( - return_value={ - "authenticated": True, - "trusted_session": True, - "requires_2fa": False, - "requires_2sa": False, - } - ) +def test_notes_commands() -> None: + """Notes commands should expose list, detail, render, export, and sync flows.""" - @property - def devices(self): - raise context_module.PyiCloudFailedLoginException("No password set") + fake_api = FakeAPI() - result = _invoke( - ReauthAPI(), - "devices", - "show", - "Example iPhone", - session_dir=session_dir, - ) + recent_result = _invoke(fake_api, "notes", "recent") + assert recent_result.exit_code == 0 + assert "Daily Plan" in recent_result.stdout + assert "Deleted Note" not in recent_result.stdout - assert result.exit_code != 0 - assert result.exception.args[0] == ( - "Find My requires re-authentication for user@example.com. " - "Run: icloud auth login --username user@example.com" + recent_json_result = _invoke( + fake_api, + "notes", + "recent", + "--include-deleted", + output_format="json", ) + recent_payload = json.loads(recent_json_result.stdout) + assert recent_json_result.exit_code == 0 + assert [row["id"] for row in recent_payload] == [ + "Note/DELETED", + "Note/DAILY", + "Note/MEETING", + ] + folders_result = _invoke(fake_api, "notes", "folders") + assert folders_result.exit_code == 0 + assert "Work" in folders_result.stdout -def test_account_summary_reports_reauthentication_requirement() -> None: - """Account commands should collapse reauth failures into a CLIAbort.""" - - session_dir = _unique_session_dir("account-summary-reauth") - - class ReauthAPI: - def __init__(self) -> None: - self.account_name = "user@example.com" - self.is_china_mainland = False - self.session = SimpleNamespace( - session_path=str(session_dir / "userexamplecom.session"), - cookiejar_path=str(session_dir / "userexamplecom.cookiejar"), - ) - self.get_auth_status = MagicMock( - return_value={ - "authenticated": True, - "trusted_session": True, - "requires_2fa": False, - "requires_2sa": False, - } - ) - - @property - def account(self): - raise context_module.PyiCloudFailedLoginException("No password set") - - result = _invoke( - ReauthAPI(), - "account", - "summary", - session_dir=session_dir, - ) - - assert result.exit_code != 0 - assert result.exception.args[0] == ( - "Account requires re-authentication for user@example.com. " - "Run: icloud auth login --username user@example.com" + folder_list_result = _invoke( + fake_api, + "notes", + "list", + "--folder-id", + "Folder/WORK", + "--limit", + "2", + output_format="json", ) + folder_payload = json.loads(folder_list_result.stdout) + assert folder_list_result.exit_code == 0 + assert [row["id"] for row in folder_payload] == ["Note/MEETING", "Note/FOLLOWUP"] - -def test_devices_mutations_and_export() -> None: - """Device actions should map to the Find My device methods.""" - - fake_api = FakeAPI() - export_path = TEST_ROOT / "device.json" - export_path.parent.mkdir(parents=True, exist_ok=True) - sound_result = _invoke( + all_notes_result = _invoke( fake_api, - "devices", - "sound", - "device-1", - "--subject", - "Ping", + "notes", + "list", + "--all", + "--since", + "notes-prev", + "--limit", + "2", output_format="json", ) - silent_result = _invoke( + all_payload = json.loads(all_notes_result.stdout) + assert all_notes_result.exit_code == 0 + assert fake_api.notes.iter_all_requests[-1] == "notes-prev" + assert [row["id"] for row in all_payload] == ["Note/MEETING", "Note/FOLLOWUP"] + + get_result = _invoke( fake_api, - "devices", - "message", - "device-1", - "Hello", - "--silent", + "notes", + "get", + "Note/DAILY", + "--with-attachments", + output_format="json", ) - lost_result = _invoke( + get_payload = json.loads(get_result.stdout) + assert get_result.exit_code == 0 + assert get_payload["attachments"][0]["id"] == "Attachment/PDF" + + render_result = _invoke( fake_api, - "devices", - "lost-mode", - "device-1", - "--phone", - "123", - "--message", - "Lost", - "--passcode", - "4567", + "notes", + "render", + "Note/DAILY", + "--preview-appearance", + "dark", + "--pdf-height", + "720", + output_format="json", ) + render_payload = json.loads(render_result.stdout) + assert render_result.exit_code == 0 + assert render_payload["html"] == "Ship CLI
" + assert fake_api.notes.render_calls[-1]["preview_appearance"] == "dark" + assert fake_api.notes.render_calls[-1]["pdf_object_height"] == 720 + export_result = _invoke( fake_api, - "devices", + "notes", "export", - "device-1", - "--output", - str(export_path), + "Note/DAILY", + "--output-dir", + str(TEST_ROOT / "notes-export"), + "--export-mode", + "lightweight", + "--fragment", + "--preview-appearance", + "dark", + "--pdf-height", + "480", output_format="json", ) - assert sound_result.exit_code == 0 - assert json.loads(sound_result.stdout)["subject"] == "Ping" - assert fake_api.devices[0].sound_subject == "Ping" - assert silent_result.exit_code == 0 - assert fake_api.devices[0].messages[-1]["sounds"] is False - assert lost_result.exit_code == 0 - assert fake_api.devices[0].lost_mode == { - "number": "123", - "text": "Lost", - "newpasscode": "4567", - } - assert export_result.exit_code == 0 export_payload = json.loads(export_result.stdout) - written_payload = json.loads(export_path.read_text(encoding="utf-8")) - assert export_payload["path"] == str(export_path) - assert export_payload["raw"] is False - assert written_payload["name"] == "Example iPhone" - assert written_payload["display_name"] == "iPhone" - assert "raw_data" in written_payload - assert "deviceDisplayName" not in written_payload - - raw_export_path = TEST_ROOT / "device-raw.json" - raw_export_result = _invoke( + assert export_result.exit_code == 0 + assert export_payload["path"].endswith("daily.html") + assert fake_api.notes.export_calls[-1]["export_mode"] == "lightweight" + assert fake_api.notes.export_calls[-1]["full_page"] is False + assert fake_api.notes.export_calls[-1]["preview_appearance"] == "dark" + assert fake_api.notes.export_calls[-1]["pdf_object_height"] == 480 + + changes_result = _invoke( fake_api, - "devices", - "export", - "device-1", - "--output", - str(raw_export_path), - "--raw", + "notes", + "changes", + "--since", + "notes-prev", + "--limit", + "1", output_format="json", ) - no_raw_export_path = TEST_ROOT / "device-no-raw.json" - no_raw_export_result = _invoke( + changes_payload = json.loads(changes_result.stdout) + assert changes_result.exit_code == 0 + assert fake_api.notes.change_requests[-1] == "notes-prev" + assert changes_payload[0]["type"] == "updated" + + cursor_result = _invoke(fake_api, "notes", "sync-cursor") + assert cursor_result.exit_code == 0 + assert cursor_result.stdout.strip() == "notes-cursor-1" + + +def test_notes_search_uses_recents_first_and_fallback() -> None: + """Notes search should probe recents first, fall back to iter_all, and dedupe.""" + + fake_api = FakeAPI() + + result = _invoke( fake_api, - "devices", - "export", - "device-1", - "--output", - str(no_raw_export_path), - "--no-raw", + "notes", + "search", + "--title-contains", + "Meeting", + "--limit", + "2", output_format="json", ) - assert raw_export_result.exit_code == 0 - assert json.loads(raw_export_result.stdout)["raw"] is True - assert "deviceDisplayName" in json.loads( - raw_export_path.read_text(encoding="utf-8") - ) - assert no_raw_export_result.exit_code == 0 - assert json.loads(no_raw_export_result.stdout)["raw"] is False - assert "display_name" in json.loads(no_raw_export_path.read_text(encoding="utf-8")) + + payload = json.loads(result.stdout) + assert result.exit_code == 0 + assert [row["id"] for row in payload] == ["Note/MEETING", "Note/FOLLOWUP"] + assert fake_api.notes.recent_requests[-1] == 500 + assert fake_api.notes.iter_all_requests == [None] -def test_device_mutation_reports_reauthentication_requirement() -> None: - """Mutating Find My commands should surface a clean reauthentication message.""" +def test_notes_commands_report_errors() -> None: + """Notes commands should surface clean selection and note-specific errors.""" fake_api = FakeAPI() - fake_api.devices[0].play_sound = MagicMock( - side_effect=context_module.PyiCloudFailedLoginException("No password set") + + search_result = _invoke(fake_api, "notes", "search") + assert search_result.exit_code != 0 + assert search_result.exception.args[0] == ( + "Pass --title or --title-contains to search notes." ) - result = _invoke(fake_api, "devices", "sound", "device-1") + missing_result = _invoke(fake_api, "notes", "get", "Note/MISSING") + assert missing_result.exit_code != 0 + assert missing_result.exception.args[0] == "Note not found: Note/MISSING" - assert result.exit_code != 0 - assert result.exception.args[0] == ( - "Find My requires re-authentication for user@example.com. " - "Run: icloud auth login --username user@example.com" - ) + locked_result = _invoke(fake_api, "notes", "get", "Note/LOCKED") + assert locked_result.exit_code != 0 + assert locked_result.exception.args[0] == "Note is locked: Note/LOCKED" -def test_destructive_device_commands_require_unique_match() -> None: - """Lost mode should require an unambiguous device name or an explicit device id.""" +def test_notes_commands_report_reauthentication_and_unavailability() -> None: + """Notes commands should wrap service reauth and service-unavailable failures.""" - fake_api = FakeAPI() - duplicate = FakeDevice() - duplicate.id = "device-2" - duplicate.data["id"] = duplicate.id - fake_api.devices = [fake_api.devices[0], duplicate] + class ReauthNotes: + def recents(self, *, limit: int = 50): + raise context_module.PyiCloudFailedLoginException("No password set") - result = _invoke(fake_api, "devices", "lost-mode", "Example iPhone") + class UnavailableNotes: + def sync_cursor(self) -> str: + raise context_module.PyiCloudServiceUnavailable("temporarily unavailable") - assert result.exit_code != 0 - assert result.exception.args[0] == ( - "Multiple devices matched 'Example iPhone'. Use a device id instead.\n" - " - device-1 (Example iPhone / iPhone)\n" - " - device-2 (Example iPhone / iPhone)" + fake_api = FakeAPI() + fake_api.notes = ReauthNotes() + reauth_result = _invoke(fake_api, "notes", "recent") + assert reauth_result.exit_code != 0 + assert reauth_result.exception.args[0] == ( + "Notes requires re-authentication for user@example.com. " + "Run: icloud auth login --username user@example.com" + ) + + fake_api = FakeAPI() + fake_api.notes = UnavailableNotes() + unavailable_result = _invoke(fake_api, "notes", "sync-cursor") + assert unavailable_result.exit_code != 0 + assert unavailable_result.exception.args[0] == ( + "Notes service unavailable: temporarily unavailable" ) -def test_calendar_and_contacts_commands() -> None: - """Calendar and contacts groups should expose read commands.""" +def test_reminders_core_commands() -> None: + """Reminders core commands should expose list, detail, mutation, and sync flows.""" fake_api = FakeAPI() - calendars = _invoke(fake_api, "calendar", "calendars") - contacts = _invoke(fake_api, "contacts", "me") - assert calendars.exit_code == 0 - assert "Home" in calendars.stdout - assert contacts.exit_code == 0 - assert "John Appleseed" in contacts.stdout + lists_result = _invoke(fake_api, "reminders", "lists") + assert lists_result.exit_code == 0 + assert "Inbox" in lists_result.stdout + assert "blue (#007AFF)" in lists_result.stdout -def test_drive_and_photos_commands() -> None: - """Drive and photos commands should expose listing and download flows.""" + list_result = _invoke(fake_api, "reminders", "list", output_format="json") + list_payload = json.loads(list_result.stdout) + assert list_result.exit_code == 0 + assert [row["id"] for row in list_payload] == ["Reminder/A", "Reminder/C"] + assert all(not row["completed"] for row in list_payload) - fake_api = FakeAPI() - output_path = TEST_ROOT / "photo.bin" - json_output_path = TEST_ROOT / "report.txt" - output_path.parent.mkdir(parents=True, exist_ok=True) - drive_result = _invoke(fake_api, "drive", "list", "/") - photo_result = _invoke( + completed_result = _invoke( fake_api, - "photos", - "download", - "photo-1", - "--output", - str(output_path), + "reminders", + "list", + "--list-id", + "INBOX", + "--include-completed", + output_format="json", ) - json_drive_result = _invoke( + completed_payload = json.loads(completed_result.stdout) + assert completed_result.exit_code == 0 + assert [row["id"] for row in completed_payload] == ["Reminder/A", "Reminder/B"] + assert fake_api.reminders.snapshot_requests[-1]["list_id"] == "List/INBOX" + + get_result = _invoke(fake_api, "reminders", "get", "Reminder/A") + assert get_result.exit_code == 0 + assert "Parent Reminder" in get_result.stdout + + create_result = _invoke( fake_api, - "drive", - "download", - "/report.txt", - "--output", - str(json_output_path), + "reminders", + "create", + "--list-id", + "INBOX", + "--title", + "Call mom", + "--desc", + "Saturday", + "--priority", + "9", + "--flagged", + "--all-day", output_format="json", ) - assert drive_result.exit_code == 0 - assert "report.txt" in drive_result.stdout - assert photo_result.exit_code == 0 - assert output_path.read_bytes() == b"photo-1:original" - assert json_drive_result.exit_code == 0 - assert json.loads(json_drive_result.stdout)["path"] == str(json_output_path) + create_payload = json.loads(create_result.stdout) + created_id = create_payload["id"] + assert create_result.exit_code == 0 + assert create_payload["list_id"] == "List/INBOX" + assert create_payload["flagged"] is True + assert create_payload["all_day"] is True + update_result = _invoke( + fake_api, + "reminders", + "update", + "Reminder/A", + "--title", + "Buy oat milk", + "--not-flagged", + "--clear-time-zone", + "--clear-parent-reminder", + output_format="json", + ) + update_payload = json.loads(update_result.stdout) + assert update_result.exit_code == 0 + assert update_payload["title"] == "Buy oat milk" + assert update_payload["flagged"] is False + assert update_payload["time_zone"] is None + assert update_payload["parent_reminder_id"] is None -def test_drive_missing_paths_report_cli_abort() -> None: - """Drive commands should collapse missing path lookups into CLIAbort errors.""" + status_result = _invoke( + fake_api, + "reminders", + "set-status", + "Reminder/A", + "--completed", + output_format="json", + ) + status_payload = json.loads(status_result.stdout) + assert status_result.exit_code == 0 + assert status_payload["completed"] is True - fake_api = FakeAPI() - output_path = TEST_ROOT / "missing.txt" + snapshot_result = _invoke( + fake_api, + "reminders", + "snapshot", + "--list-id", + "INBOX", + output_format="json", + ) + snapshot_payload = json.loads(snapshot_result.stdout) + assert snapshot_result.exit_code == 0 + assert set(snapshot_payload) == { + "alarms", + "attachments", + "hashtags", + "recurrence_rules", + "reminders", + "triggers", + } - list_result = _invoke(fake_api, "drive", "list", "/missing") - download_result = _invoke( + changes_result = _invoke( fake_api, - "drive", - "download", - "/missing", - "--output", - str(output_path), + "reminders", + "changes", + "--since", + "reminders-prev", + "--limit", + "1", + output_format="json", ) + changes_payload = json.loads(changes_result.stdout) + assert changes_result.exit_code == 0 + assert fake_api.reminders.change_requests[-1] == "reminders-prev" + assert changes_payload[0]["type"] == "updated" - assert list_result.exit_code != 0 - assert list_result.exception.args[0] == "Path not found: /missing" - assert download_result.exit_code != 0 - assert download_result.exception.args[0] == "Path not found: /missing" + cursor_result = _invoke(fake_api, "reminders", "sync-cursor") + assert cursor_result.exit_code == 0 + assert cursor_result.stdout.strip() == "reminders-cursor-1" + delete_result = _invoke( + fake_api, + "reminders", + "delete", + created_id, + output_format="json", + ) + delete_payload = json.loads(delete_result.stdout) + assert delete_result.exit_code == 0 + assert delete_payload["deleted"] is True + assert fake_api.reminders.reminder_rows[created_id].deleted is True -def test_photos_commands_report_reauthentication_requirement() -> None: - """Photos commands should wrap nested service operations in service_call.""" - class ReauthAlbums: - @property - def albums(self): - raise context_module.PyiCloudFailedLoginException("No password set") +def test_reminders_subgroup_commands() -> None: + """Reminder subgroup commands should expose alarm, hashtag, attachment, and recurrence flows.""" fake_api = FakeAPI() - fake_api.photos = ReauthAlbums() - albums_result = _invoke(fake_api, "photos", "albums") + alarm_list_result = _invoke( + fake_api, + "reminders", + "alarm", + "list", + "Reminder/A", + output_format="json", + ) + alarm_list_payload = json.loads(alarm_list_result.stdout) + assert alarm_list_result.exit_code == 0 + assert alarm_list_payload[0]["alarm"]["id"] == "Alarm/A" - assert albums_result.exit_code != 0 - assert albums_result.exception.args[0] == ( - "Photos requires re-authentication for user@example.com. " - "Run: icloud auth login --username user@example.com" + alarm_create_result = _invoke( + fake_api, + "reminders", + "alarm", + "add-location", + "Reminder/C", + "--title", + "Home", + "--address", + "Rue de Example", + "--latitude", + "49.61", + "--longitude", + "6.13", + "--radius", + "75", + "--proximity", + "leaving", + output_format="json", + ) + alarm_create_payload = json.loads(alarm_create_result.stdout) + assert alarm_create_result.exit_code == 0 + assert alarm_create_payload["trigger"]["title"] == "Home" + assert ( + fake_api.reminders.trigger_rows[alarm_create_payload["trigger"]["id"]].proximity + == Proximity.LEAVING ) - class BrokenPhoto(FakePhoto): - def download(self, version: str = "original") -> bytes: - raise context_module.PyiCloudFailedLoginException("No password set") + hashtag_list_result = _invoke( + fake_api, + "reminders", + "hashtag", + "list", + "Reminder/A", + output_format="json", + ) + hashtag_list_payload = json.loads(hashtag_list_result.stdout) + assert hashtag_list_result.exit_code == 0 + assert hashtag_list_payload[0]["id"] == "Hashtag/ERRANDS" - photo_album = FakePhotoAlbum("All Photos", [BrokenPhoto("photo-1", "img.jpg")]) - fake_api = FakeAPI() - fake_api.photos = SimpleNamespace( - albums=FakeAlbumContainer([photo_album]), - all=photo_album, + hashtag_create_result = _invoke( + fake_api, + "reminders", + "hashtag", + "create", + "Reminder/C", + "home", + output_format="json", ) - output_path = TEST_ROOT / "photo-reauth.bin" + hashtag_create_payload = json.loads(hashtag_create_result.stdout) + hashtag_suffix = hashtag_create_payload["id"].split("/", 1)[1] + assert hashtag_create_result.exit_code == 0 - download_result = _invoke( + hashtag_update_result = _invoke( fake_api, - "photos", - "download", - "photo-1", - "--output", - str(output_path), + "reminders", + "hashtag", + "update", + "Reminder/C", + hashtag_suffix, + "--name", + "chores", + output_format="json", ) + hashtag_update_payload = json.loads(hashtag_update_result.stdout) + assert hashtag_update_result.exit_code == 0 + assert hashtag_update_payload["name"] == "chores" - assert download_result.exit_code != 0 - assert download_result.exception.args[0] == ( - "Photos requires re-authentication for user@example.com. " - "Run: icloud auth login --username user@example.com" + hashtag_delete_result = _invoke( + fake_api, + "reminders", + "hashtag", + "delete", + "Reminder/C", + hashtag_suffix, + output_format="json", ) + hashtag_delete_payload = json.loads(hashtag_delete_result.stdout) + assert hashtag_delete_result.exit_code == 0 + assert hashtag_delete_payload["deleted"] is True + attachment_list_result = _invoke( + fake_api, + "reminders", + "attachment", + "list", + "Reminder/A", + output_format="json", + ) + attachment_list_payload = json.loads(attachment_list_result.stdout) + assert attachment_list_result.exit_code == 0 + assert attachment_list_payload[0]["id"] == "Attachment/LINK" -def test_hidemyemail_commands() -> None: - """Hide My Email commands should expose list and generate.""" + attachment_create_result = _invoke( + fake_api, + "reminders", + "attachment", + "create-url", + "Reminder/C", + "--url", + "https://example.com/new", + output_format="json", + ) + attachment_create_payload = json.loads(attachment_create_result.stdout) + attachment_suffix = attachment_create_payload["id"].split("/", 1)[1] + assert attachment_create_result.exit_code == 0 - fake_api = FakeAPI() - list_result = _invoke(fake_api, "hidemyemail", "list") - generate_result = _invoke(fake_api, "hidemyemail", "generate") - assert list_result.exit_code == 0 - assert "Shopping" in list_result.stdout - assert generate_result.exit_code == 0 - assert "generated@privaterelay.appleid.com" in generate_result.stdout + attachment_update_result = _invoke( + fake_api, + "reminders", + "attachment", + "update", + "Reminder/C", + attachment_suffix, + "--url", + "https://example.org/new", + "--uti", + "public.url", + output_format="json", + ) + attachment_update_payload = json.loads(attachment_update_result.stdout) + assert attachment_update_result.exit_code == 0 + assert attachment_update_payload["url"] == "https://example.org/new" + attachment_delete_result = _invoke( + fake_api, + "reminders", + "attachment", + "delete", + "Reminder/C", + attachment_suffix, + output_format="json", + ) + attachment_delete_payload = json.loads(attachment_delete_result.stdout) + assert attachment_delete_result.exit_code == 0 + assert attachment_delete_payload["deleted"] is True -def test_hidemyemail_generate_requires_alias() -> None: - """Generate should fail when the backend returns an empty alias.""" + recurrence_list_result = _invoke( + fake_api, + "reminders", + "recurrence", + "list", + "Reminder/A", + output_format="json", + ) + recurrence_list_payload = json.loads(recurrence_list_result.stdout) + assert recurrence_list_result.exit_code == 0 + assert recurrence_list_payload[0]["id"] == "Recurrence/WEEKLY" - fake_api = FakeAPI() - fake_api.hidemyemail.generate = MagicMock(return_value=None) + recurrence_create_result = _invoke( + fake_api, + "reminders", + "recurrence", + "create", + "Reminder/C", + "--frequency", + "monthly", + "--interval", + "2", + output_format="json", + ) + recurrence_create_payload = json.loads(recurrence_create_result.stdout) + recurrence_suffix = recurrence_create_payload["id"].split("/", 1)[1] + assert recurrence_create_result.exit_code == 0 - result = _invoke(fake_api, "hidemyemail", "generate") + recurrence_update_result = _invoke( + fake_api, + "reminders", + "recurrence", + "update", + "Reminder/C", + recurrence_suffix, + "--frequency", + "yearly", + "--interval", + "3", + "--occurrence-count", + "4", + output_format="json", + ) + recurrence_update_payload = json.loads(recurrence_update_result.stdout) + assert recurrence_update_result.exit_code == 0 + assert recurrence_update_payload["interval"] == 3 + assert recurrence_update_payload["occurrence_count"] == 4 - assert result.exit_code != 0 - assert result.exception.args[0] == ( - "Hide My Email generate returned an empty alias." + recurrence_delete_result = _invoke( + fake_api, + "reminders", + "recurrence", + "delete", + "Reminder/C", + recurrence_suffix, + output_format="json", ) + recurrence_delete_payload = json.loads(recurrence_delete_result.stdout) + assert recurrence_delete_result.exit_code == 0 + assert recurrence_delete_payload["deleted"] is True -def test_hidemyemail_update_omits_note_when_not_provided() -> None: - """Label-only updates should not overwrite notes with a synthetic default.""" +def test_reminders_commands_report_errors() -> None: + """Reminders commands should surface clean validation and lookup errors.""" fake_api = FakeAPI() - update_metadata = MagicMock(return_value={"anonymousId": "alias-1", "label": "New"}) - fake_api.hidemyemail.update_metadata = update_metadata - result = _invoke(fake_api, "hidemyemail", "update", "alias-1", "New") + missing_result = _invoke(fake_api, "reminders", "get", "Reminder/MISSING") + assert missing_result.exit_code != 0 + assert missing_result.exception.args[0] == "Reminder not found: Reminder/MISSING" - assert result.exit_code == 0 - update_metadata.assert_called_once_with("alias-1", "New", None) + update_result = _invoke(fake_api, "reminders", "update", "Reminder/A") + assert update_result.exit_code != 0 + assert update_result.exception.args[0] == "No reminder updates were requested." + hashtag_result = _invoke( + fake_api, + "reminders", + "hashtag", + "delete", + "Reminder/A", + "missing", + ) + assert hashtag_result.exit_code != 0 + assert hashtag_result.exception.args[0] == ( + "No hashtag matched 'missing' for reminder Reminder/A." + ) -def test_hidemyemail_mutations_require_valid_payload() -> None: - """Hide My Email mutators should reject empty success payloads.""" + attachment_result = _invoke( + fake_api, + "reminders", + "attachment", + "update", + "Reminder/A", + "LINK", + ) + assert attachment_result.exit_code != 0 + assert attachment_result.exception.args[0] == ( + "No attachment updates were requested." + ) - fake_api = FakeAPI() - fake_api.hidemyemail.delete = MagicMock(return_value={}) + recurrence_result = _invoke( + fake_api, + "reminders", + "recurrence", + "update", + "Reminder/A", + "WEEKLY", + ) + assert recurrence_result.exit_code != 0 + assert recurrence_result.exception.args[0] == ( + "No recurrence updates were requested." + ) - result = _invoke(fake_api, "hidemyemail", "delete", "alias-1") + class ApiErrorReminders: + def sync_cursor(self) -> str: + raise RemindersApiError("sync failed") - assert result.exit_code != 0 - assert result.exception.args[0] == ( - "Hide My Email delete returned an invalid response: {}" - ) + class AuthErrorReminders: + def sync_cursor(self) -> str: + raise RemindersAuthError("token expired") fake_api = FakeAPI() - fake_api.hidemyemail.reserve = MagicMock(return_value={}) - - result = _invoke( - fake_api, - "hidemyemail", - "reserve", - "alias@example.com", - "Shopping", - ) + fake_api.reminders = ApiErrorReminders() + api_error_result = _invoke(fake_api, "reminders", "sync-cursor") + assert api_error_result.exit_code != 0 + assert api_error_result.exception.args[0] == "sync failed" - assert result.exit_code != 0 - assert result.exception.args[0] == ( - "Hide My Email reserve returned an invalid response: {}" - ) + fake_api = FakeAPI() + fake_api.reminders = AuthErrorReminders() + auth_error_result = _invoke(fake_api, "reminders", "sync-cursor") + assert auth_error_result.exit_code != 0 + assert auth_error_result.exception.args[0] == "token expired" -def test_hidemyemail_list_reports_reauthentication_requirement() -> None: - """Hide My Email iteration errors should be wrapped in a CLIAbort.""" +def test_reminders_commands_report_reauthentication_and_unavailability() -> None: + """Reminders commands should wrap service reauth and service-unavailable failures.""" - class ReauthHideMyEmail: - def __iter__(self): + class ReauthReminders: + def lists(self): raise context_module.PyiCloudFailedLoginException("No password set") - def generate(self) -> str: # pragma: no cover - not used in this test - return "ignored" + class UnavailableReminders: + def sync_cursor(self) -> str: + raise context_module.PyiCloudServiceUnavailable("temporarily unavailable") fake_api = FakeAPI() - fake_api.hidemyemail = ReauthHideMyEmail() - - result = _invoke(fake_api, "hidemyemail", "list") - - assert result.exit_code != 0 - assert result.exception.args[0] == ( - "Hide My Email requires re-authentication for user@example.com. " + fake_api.reminders = ReauthReminders() + reauth_result = _invoke(fake_api, "reminders", "lists") + assert reauth_result.exit_code != 0 + assert reauth_result.exception.args[0] == ( + "Reminders requires re-authentication for user@example.com. " "Run: icloud auth login --username user@example.com" ) + fake_api = FakeAPI() + fake_api.reminders = UnavailableReminders() + unavailable_result = _invoke(fake_api, "reminders", "sync-cursor") + assert unavailable_result.exit_code != 0 + assert unavailable_result.exception.args[0] == ( + "Reminders service unavailable: temporarily unavailable" + ) + def test_main_returns_clean_error_for_user_abort(capsys) -> None: """The entrypoint should not emit a traceback for expected CLI errors.""" diff --git a/tests/test_notes.py b/tests/test_notes.py index 3b4cd327..6155cb03 100644 --- a/tests/test_notes.py +++ b/tests/test_notes.py @@ -257,6 +257,37 @@ def test_notes_service_export_note_uses_lazy_importer(self): self.assertEqual(exported, output_path) mock_export.assert_called_once() + def test_iter_all_skips_changes_when_sync_cursor_is_current(self): + self.service._raw = MagicMock() + self.service._raw.current_sync_token.return_value = "tok-current" + + rows = list(self.service.iter_all(since="tok-current")) + + self.assertEqual(rows, []) + self.service._raw.current_sync_token.assert_called_once_with(zone_name="Notes") + self.service._raw.changes.assert_not_called() + + def test_iter_changes_skips_changes_when_sync_cursor_is_current(self): + self.service._raw = MagicMock() + self.service._raw.current_sync_token.return_value = "tok-current" + + rows = list(self.service.iter_changes(since="tok-current")) + + self.assertEqual(rows, []) + self.service._raw.current_sync_token.assert_called_once_with(zone_name="Notes") + self.service._raw.changes.assert_not_called() + + def test_iter_all_uses_changes_when_sync_cursor_is_not_current(self): + self.service._raw = MagicMock() + self.service._raw.current_sync_token.return_value = "tok-other" + self.service._raw.changes.return_value = [] + + rows = list(self.service.iter_all(since="tok-stale")) + + self.assertEqual(rows, []) + self.service._raw.current_sync_token.assert_called_once_with(zone_name="Notes") + self.service._raw.changes.assert_called_once() + def test_notes_service_attachment_lookup_prefers_canonical_record_names(self): note_record = CKRecord.model_validate( { diff --git a/tests/test_output.py b/tests/test_output.py index 915c11be..34b7b6a2 100644 --- a/tests/test_output.py +++ b/tests/test_output.py @@ -8,6 +8,7 @@ TABLE_TITLE_STYLE, console_kv_table, console_table, + format_color_value, ) @@ -34,3 +35,20 @@ def test_console_kv_table_styles_key_column() -> None: assert table.border_style == TABLE_BORDER_STYLE assert tuple(table.row_styles) == TABLE_ROW_STYLES assert table.columns[0].style == TABLE_KEY_STYLE + + +def test_format_color_value_handles_symbolic_payloads() -> None: + """Reminder color payloads should render as a compact symbolic label.""" + + assert ( + format_color_value('{"daHexString":"#007AFF","ckSymbolicColorName":"blue"}') + == "blue (#007AFF)" + ) + + +def test_format_color_value_handles_plain_values() -> None: + """Plain, empty, and malformed color values should degrade gracefully.""" + + assert format_color_value("#34C759") == "#34C759" + assert format_color_value("") == "" + assert format_color_value("{not-json}") == "{not-json}"