From 2a1c82471bbeb630357e5cbb3abecda889aef42e Mon Sep 17 00:00:00 2001 From: GeiserX <9169332+GeiserX@users.noreply.github.com> Date: Wed, 25 Feb 2026 10:48:47 +0100 Subject: [PATCH] feat: import Telegram Desktop chat exports (#81) New `telegram-archive import` CLI command that reads Telegram Desktop exports (result.json + media folders) and inserts messages, users, and media into the database. Imported chats appear in the web viewer like any other backed-up chat. Supports --dry-run, --skip-media, --merge, --chat-id flags. Handles text, photos, videos, documents, voice, stickers, service messages, forwards, replies, and edited messages. Closes #81 --- README.md | 36 +++ docs/CHANGELOG.md | 14 + pyproject.toml | 2 +- src/__main__.py | 62 +++++ src/telegram_import.py | 426 ++++++++++++++++++++++++++++++ tests/test_telegram_import.py | 479 ++++++++++++++++++++++++++++++++++ 6 files changed, 1018 insertions(+), 1 deletion(-) create mode 100644 src/telegram_import.py create mode 100644 tests/test_telegram_import.py diff --git a/README.md b/README.md index 85e4944..8b88444 100644 --- a/README.md +++ b/README.md @@ -462,8 +462,44 @@ docker compose exec telegram-backup python -m src backup # Re-authenticate (if session expires) docker compose exec -it telegram-backup python -m src auth + +# Import a Telegram Desktop export +docker compose exec telegram-backup python -m src import -p /data/exports/ChatExport ``` +### Importing Telegram Desktop Exports + +You can import chat history exported from Telegram Desktop (Settings > Advanced > Export Telegram data) into Telegram-Archive. The exported chat will appear in the web viewer just like live-backed-up chats. + +```bash +# Basic import (auto-detects chat ID from export) +telegram-archive import -p /path/to/ChatExport_2024-01-15 + +# Import with explicit chat ID (marked format) +telegram-archive import -p /path/to/export -c -1001234567890 + +# Dry run — validate without writing anything +telegram-archive import -p /path/to/export --dry-run + +# Import text only, skip media files +telegram-archive import -p /path/to/export --skip-media + +# Merge into an existing chat (add/update messages) +telegram-archive import -p /path/to/export --merge +``` + +**Flags:** + +| Flag | Description | +|------|-------------| +| `-p, --path` | Path to export folder containing `result.json` (required) | +| `-c, --chat-id` | Override chat ID in marked format (e.g., `-1001234567890`) | +| `--dry-run` | Parse and validate without writing to DB or copying media | +| `--skip-media` | Import only messages and metadata, skip media files | +| `--merge` | Allow importing into a chat that already has messages | + +**Supported content:** Text messages, photos, videos, documents, voice messages, stickers, animations, service messages (pins, group actions), forwarded messages, replies, and edited messages. + ## Data Storage ``` diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md index f828910..1ec183b 100644 --- a/docs/CHANGELOG.md +++ b/docs/CHANGELOG.md @@ -6,6 +6,20 @@ For upgrade instructions, see [Upgrading](#upgrading) at the bottom. ## [Unreleased] +## [6.4.0] - 2026-02-25 + +### Added + +- **Import Telegram Desktop chat exports** — New `telegram-archive import` CLI command reads Telegram Desktop exports (`result.json` + media folders) and inserts them into the database. Imported chats appear in the web viewer like any other backed-up chat. Supports both single-chat and full-account exports. Closes [#81](https://github.com/GeiserX/Telegram-Archive/issues/81). + - `--path` — Path to export folder containing `result.json` + - `--chat-id` — Override chat ID (marked format) + - `--dry-run` — Validate without writing to DB or copying media + - `--skip-media` — Import only messages/metadata + - `--merge` — Allow importing into a chat that already has messages +- Handles text messages, photos, videos, documents, voice messages, stickers, and service messages (pins, group actions, etc.) +- Forwards, replies, and edited messages are preserved with full metadata +- Media files are copied into the standard media directory structure + ## [6.3.2] - 2026-02-17 ### Fixed diff --git a/pyproject.toml b/pyproject.toml index 3bd5f1a..663519e 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta" [project] name = "telegram-archive" -version = "6.3.1" +version = "6.4.0" description = "Automated Telegram backup with Docker. Performs incremental backups of messages and media on a configurable schedule." readme = "README.md" requires-python = ">=3.14" diff --git a/src/__main__.py b/src/__main__.py index 0ddc032..39a02af 100644 --- a/src/__main__.py +++ b/src/__main__.py @@ -33,6 +33,10 @@ def create_parser() -> argparse.ArgumentParser: telegram-archive stats # Show backup statistics telegram-archive export -o file.json # Export to JSON + 4. Import Telegram Desktop exports: + telegram-archive import -p /path/to/export + telegram-archive import -p /path/to/export -c -1001234567890 --merge + LOCAL DEVELOPMENT: Use --data-dir to specify an alternative data location (default: /data): @@ -109,6 +113,26 @@ def create_parser() -> argparse.ArgumentParser: "list-chats", help="List all backed up chats", description="Show a table of all chats in the backup database." ) + # Import command + import_parser = subparsers.add_parser( + "import", + help="Import Telegram Desktop chat export", + description="Import a Telegram Desktop chat export (result.json + media) into the database.", + ) + import_parser.add_argument("-p", "--path", required=True, help="Path to export folder containing result.json") + import_parser.add_argument( + "-c", "--chat-id", type=int, help="Override chat ID (marked format, e.g. -1001234567890)" + ) + import_parser.add_argument( + "--dry-run", action="store_true", help="Parse and validate without writing to DB or copying media" + ) + import_parser.add_argument( + "--skip-media", action="store_true", help="Import only messages/metadata, skip media files" + ) + import_parser.add_argument( + "--merge", action="store_true", help="Allow importing into a chat that already has messages" + ) + return parser @@ -172,6 +196,42 @@ async def run_list_chats(args) -> int: return 1 +async def run_import(args) -> int: + """Run import command.""" + from .config import Config, setup_logging + from .telegram_import import TelegramImporter + + try: + config = Config() + setup_logging(config) + + importer = await TelegramImporter.create(config.media_path) + try: + summary = await importer.run( + export_path=args.path, + chat_id_override=args.chat_id, + dry_run=args.dry_run, + skip_media=args.skip_media, + merge=args.merge, + ) + prefix = "[DRY RUN] " if args.dry_run else "" + print(f"\n{prefix}Import complete:") + print(f" Chats: {summary['chats_imported']}") + print(f" Messages: {summary['total_messages']}") + print(f" Media files: {summary['total_media']}") + for detail in summary["details"]: + print( + f" - {detail['chat_name']} (ID {detail['chat_id']}): " + f"{detail['messages']} messages, {detail['media']} media" + ) + finally: + await importer.close() + return 0 + except Exception as e: + print(f"Import failed: {e}", file=sys.stderr) + return 1 + + def run_auth(args) -> int: """Run authentication setup.""" from .setup_auth import main as auth_main @@ -231,6 +291,8 @@ def main() -> int: return asyncio.run(run_stats(args)) elif args.command == "list-chats": return asyncio.run(run_list_chats(args)) + elif args.command == "import": + return asyncio.run(run_import(args)) else: parser.print_help() return 0 diff --git a/src/telegram_import.py b/src/telegram_import.py new file mode 100644 index 0000000..029ceea --- /dev/null +++ b/src/telegram_import.py @@ -0,0 +1,426 @@ +""" +Import Telegram Desktop chat exports into Telegram-Archive. + +Reads result.json from Telegram Desktop exports (single-chat or full-account) +and inserts messages, users, and media into the existing database schema. +""" + +import json +import logging +import shutil +from datetime import UTC, datetime +from pathlib import Path +from typing import Any + +from .db import DatabaseAdapter, close_database, get_adapter, init_database + +logger = logging.getLogger(__name__) + +BATCH_SIZE = 500 + +CHAT_TYPE_MAP = { + "personal_chat": "user", + "bot_chat": "user", + "saved_messages": "user", + "private_group": "group", + "private_supergroup": "supergroup", + "public_supergroup": "supergroup", + "private_channel": "channel", + "public_channel": "channel", +} + +MEDIA_TYPE_MAP = { + "animation": "animation", + "video_file": "video", + "video_message": "video_note", + "voice_message": "voice", + "audio_file": "audio", + "sticker": "sticker", +} + + +def parse_from_id(from_id: str | None) -> int | None: + """Parse Telegram Desktop's from_id string into a numeric ID. + + Formats: "user123456789", "channel123456789", "group123456789" + """ + if not from_id: + return None + for prefix, multiplier in (("user", 1), ("channel", -1), ("group", -1)): + if from_id.startswith(prefix): + try: + raw = int(from_id[len(prefix) :]) + if prefix == "channel": + return -(1000000000000 + raw) + return raw * multiplier + except ValueError: + return None + return None + + +def derive_chat_id(export_id: int, export_type: str) -> int: + """Derive a marked chat ID from the export's raw id and type.""" + if export_type in ("personal_chat", "bot_chat", "saved_messages"): + return export_id + if export_type == "private_group": + return -export_id + if export_type in ("private_supergroup", "public_supergroup", "private_channel", "public_channel"): + return -(1000000000000 + export_id) + return export_id + + +def flatten_text(text_field: str | list | None) -> str: + """Flatten Telegram Desktop's text field to plain string. + + The field can be a plain string or an array of text entity objects + like [{"type": "plain", "text": "Hello "}, {"type": "bold", "text": "world"}]. + """ + if text_field is None: + return "" + if isinstance(text_field, str): + return text_field + if isinstance(text_field, list): + parts = [] + for item in text_field: + if isinstance(item, str): + parts.append(item) + elif isinstance(item, dict): + parts.append(item.get("text", "")) + return "".join(parts) + return str(text_field) + + +def parse_date(msg: dict) -> datetime | None: + """Parse date from a Telegram Desktop export message.""" + if "date_unixtime" in msg: + try: + return datetime.fromtimestamp(int(msg["date_unixtime"]), tz=UTC).replace(tzinfo=None) + except ValueError, TypeError, OSError: + pass + if "date" in msg: + try: + return datetime.fromisoformat(msg["date"]).replace(tzinfo=None) + except ValueError, TypeError: + pass + return None + + +def parse_edited_date(msg: dict) -> datetime | None: + """Parse edit date from a Telegram Desktop export message.""" + if "edited_unixtime" in msg: + try: + return datetime.fromtimestamp(int(msg["edited_unixtime"]), tz=UTC).replace(tzinfo=None) + except ValueError, TypeError, OSError: + pass + if "edited" in msg: + try: + return datetime.fromisoformat(msg["edited"]).replace(tzinfo=None) + except ValueError, TypeError: + pass + return None + + +def _detect_media(msg: dict, export_path: Path) -> tuple[str | None, str | None, str | None]: + """Detect media type and file path from an export message. + + Returns (media_type, relative_path, original_filename). + """ + if "photo" in msg and msg["photo"]: + rel = msg["photo"] + return "photo", rel, Path(rel).name + + if "file" in msg and msg["file"]: + rel = msg["file"] + fname = msg.get("file_name") or Path(rel).name + media_type = MEDIA_TYPE_MAP.get(msg.get("media_type", ""), "document") + return media_type, rel, fname + + return None, None, None + + +def _build_service_text(msg: dict) -> str: + """Build display text for service messages from action fields.""" + action = msg.get("action", "") + actor = msg.get("actor", "") or msg.get("from", "") + text_parts = [] + + if actor: + text_parts.append(actor) + + action_map = { + "pin_message": "pinned a message", + "phone_call": "made a phone call", + "create_group": "created the group", + "invite_members": "invited members", + "remove_members": "removed members", + "join_group_by_link": "joined the group via invite link", + "join_group_by_request": "joined the group via request", + "migrate_to_supergroup": "upgraded to supergroup", + "migrate_from_group": "migrated from group", + "edit_group_title": "changed the group title", + "edit_group_photo": "changed the group photo", + "delete_group_photo": "removed the group photo", + "score_in_game": "scored in a game", + "custom_action": msg.get("text", "performed an action"), + } + + text_parts.append(action_map.get(action, action.replace("_", " ") if action else "performed an action")) + + if msg.get("title"): + text_parts.append(f'"{msg["title"]}"') + if msg.get("members"): + names = [m if isinstance(m, str) else str(m) for m in msg["members"]] + text_parts.append(", ".join(names)) + + return " ".join(text_parts) + + +class TelegramImporter: + """Import Telegram Desktop exports into Telegram-Archive database.""" + + def __init__(self, db: DatabaseAdapter, media_path: str): + self.db = db + self.media_path = media_path + + @classmethod + async def create(cls, media_path: str) -> TelegramImporter: + await init_database() + db = await get_adapter() + return cls(db, media_path) + + async def close(self) -> None: + await close_database() + + async def run( + self, + export_path: str, + chat_id_override: int | None = None, + dry_run: bool = False, + skip_media: bool = False, + merge: bool = False, + ) -> dict[str, Any]: + """Run the import process. + + Returns a summary dict with counts per chat. + """ + path = Path(export_path) + result_file = path / "result.json" + if not result_file.exists(): + raise FileNotFoundError(f"result.json not found in {path}") + + logger.info(f"Reading {result_file}...") + with open(result_file, encoding="utf-8") as f: + data = json.load(f) + + chats = self._extract_chats(data) + if not chats: + raise ValueError("No chats found in export file") + + summary: dict[str, Any] = {"chats_imported": 0, "total_messages": 0, "total_media": 0, "details": []} + + for chat_data in chats: + chat_id = ( + chat_id_override + if chat_id_override + else derive_chat_id(chat_data.get("id", 0), chat_data.get("type", "personal_chat")) + ) + + if chat_id == 0: + logger.warning(f"Skipping chat with no ID: {chat_data.get('name', 'unknown')}") + continue + + result = await self._import_chat( + chat_data=chat_data, + chat_id=chat_id, + export_path=path, + dry_run=dry_run, + skip_media=skip_media, + merge=merge, + ) + + summary["chats_imported"] += 1 + summary["total_messages"] += result["messages"] + summary["total_media"] += result["media"] + summary["details"].append(result) + + if chat_id_override and len(chats) > 1: + logger.info("--chat-id provided with multi-chat export; only importing first chat") + break + + return summary + + def _extract_chats(self, data: dict) -> list[dict]: + """Extract chat list from either single-chat or full-account export.""" + if "messages" in data: + return [data] + if "chats" in data and isinstance(data["chats"], dict): + chat_list = data["chats"].get("list", []) + if isinstance(chat_list, list): + return chat_list + return [] + + async def _import_chat( + self, + chat_data: dict, + chat_id: int, + export_path: Path, + dry_run: bool, + skip_media: bool, + merge: bool, + ) -> dict[str, Any]: + """Import a single chat from export data.""" + chat_name = chat_data.get("name", "Unknown") + export_type = chat_data.get("type", "personal_chat") + messages = chat_data.get("messages", []) + + logger.info(f"Importing chat '{chat_name}' (ID: {chat_id}, type: {export_type}) - {len(messages)} messages") + + if not merge and not dry_run: + existing = await self.db.get_chat_stats(chat_id) + if existing and existing.get("messages", 0) > 0: + raise ValueError( + f"Chat {chat_id} ('{chat_name}') already has {existing['messages']} messages. " + "Use --merge to import into an existing chat." + ) + + if not dry_run: + await self.db.upsert_chat( + { + "id": chat_id, + "type": CHAT_TYPE_MAP.get(export_type, "unknown"), + "title": chat_name if export_type not in ("personal_chat", "bot_chat") else None, + "first_name": chat_name if export_type in ("personal_chat", "bot_chat") else None, + } + ) + + seen_users: set[int] = set() + msg_count = 0 + media_count = 0 + max_msg_id = 0 + batch: list[dict[str, Any]] = [] + media_batch: list[dict[str, Any]] = [] + + for msg in messages: + msg_id = msg.get("id") + if msg_id is None: + continue + + max_msg_id = max(max_msg_id, msg_id) + msg_type = msg.get("type", "message") + + sender_id = parse_from_id(msg.get("from_id")) + if sender_id and sender_id > 0 and sender_id not in seen_users and not dry_run: + seen_users.add(sender_id) + await self.db.upsert_user( + { + "id": sender_id, + "first_name": msg.get("from", ""), + } + ) + + if msg_type == "service": + text = _build_service_text(msg) + else: + text = flatten_text(msg.get("text")) + + date = parse_date(msg) + if date is None: + logger.warning(f"Skipping message {msg_id}: no valid date") + continue + + raw_data: dict[str, Any] = {} + if msg.get("forwarded_from"): + raw_data["forward_from_name"] = msg["forwarded_from"] + + message_data = { + "id": msg_id, + "chat_id": chat_id, + "sender_id": sender_id, + "date": date, + "text": text, + "reply_to_msg_id": msg.get("reply_to_message_id"), + "forward_from_id": None, + "edit_date": parse_edited_date(msg), + "raw_data": raw_data, + "is_outgoing": 0, + "is_pinned": 0, + } + + batch.append(message_data) + msg_count += 1 + + if not skip_media: + media_type, rel_path, orig_name = _detect_media(msg, export_path) + if media_type and rel_path: + source = export_path / rel_path + if source.exists(): + media_id = f"import_{chat_id}_{msg_id}" + dest_dir = Path(self.media_path) / str(chat_id) + dest_name = f"{media_id}_{orig_name}" if orig_name else f"{media_id}" + dest_file = dest_dir / dest_name + stored_path = f"{chat_id}/{dest_name}" + + media_data = { + "id": media_id, + "message_id": msg_id, + "chat_id": chat_id, + "type": media_type, + "file_name": orig_name, + "file_path": stored_path, + "file_size": source.stat().st_size, + "mime_type": msg.get("mime_type"), + "width": msg.get("width"), + "height": msg.get("height"), + "duration": msg.get("duration_seconds"), + "downloaded": True, + "download_date": datetime.now(UTC).replace(tzinfo=None), + "_source": str(source), + "_dest": str(dest_file), + } + media_batch.append(media_data) + media_count += 1 + else: + logger.warning(f"Media file not found: {source}") + + if len(batch) >= BATCH_SIZE: + if not dry_run: + await self._flush_batch(batch, media_batch) + batch.clear() + media_batch.clear() + logger.info(f" Progress: {msg_count}/{len(messages)} messages") + + if batch and not dry_run: + await self._flush_batch(batch, media_batch) + + if not dry_run and msg_count > 0: + await self.db.update_sync_status(chat_id, max_msg_id, msg_count) + + action = "Would import" if dry_run else "Imported" + logger.info(f"{action} {msg_count} messages and {media_count} media files for '{chat_name}'") + + return { + "chat_id": chat_id, + "chat_name": chat_name, + "messages": msg_count, + "media": media_count, + "max_message_id": max_msg_id, + } + + async def _flush_batch( + self, + messages: list[dict[str, Any]], + media: list[dict[str, Any]], + ) -> None: + """Flush a batch of messages and media to the database.""" + await self.db.insert_messages_batch(messages) + + for m in media: + source = m.pop("_source") + dest = m.pop("_dest") + + dest_path = Path(dest) + dest_path.parent.mkdir(parents=True, exist_ok=True) + if not dest_path.exists(): + shutil.copy2(source, dest) + + await self.db.insert_media(m) diff --git a/tests/test_telegram_import.py b/tests/test_telegram_import.py new file mode 100644 index 0000000..4c64540 --- /dev/null +++ b/tests/test_telegram_import.py @@ -0,0 +1,479 @@ +import asyncio +import json +import os +import shutil +import tempfile +import unittest +from datetime import datetime +from pathlib import Path +from unittest.mock import AsyncMock, MagicMock + +from src.telegram_import import ( + TelegramImporter, + _build_service_text, + _detect_media, + derive_chat_id, + flatten_text, + parse_date, + parse_edited_date, + parse_from_id, +) + + +class TestParseFromId(unittest.TestCase): + def test_user_id(self): + self.assertEqual(parse_from_id("user123456789"), 123456789) + + def test_channel_id(self): + self.assertEqual(parse_from_id("channel1234567890"), -1001234567890) + + def test_group_id(self): + self.assertEqual(parse_from_id("group123456789"), -123456789) + + def test_none(self): + self.assertIsNone(parse_from_id(None)) + + def test_empty_string(self): + self.assertIsNone(parse_from_id("")) + + def test_unknown_prefix(self): + self.assertIsNone(parse_from_id("bot123")) + + def test_invalid_number(self): + self.assertIsNone(parse_from_id("userabc")) + + +class TestDeriveChatId(unittest.TestCase): + def test_personal_chat(self): + self.assertEqual(derive_chat_id(123456, "personal_chat"), 123456) + + def test_bot_chat(self): + self.assertEqual(derive_chat_id(99999, "bot_chat"), 99999) + + def test_saved_messages(self): + self.assertEqual(derive_chat_id(42, "saved_messages"), 42) + + def test_private_group(self): + self.assertEqual(derive_chat_id(123456, "private_group"), -123456) + + def test_private_supergroup(self): + self.assertEqual(derive_chat_id(1234567890, "private_supergroup"), -1001234567890) + + def test_public_supergroup(self): + self.assertEqual(derive_chat_id(1234567890, "public_supergroup"), -1001234567890) + + def test_private_channel(self): + self.assertEqual(derive_chat_id(1234567890, "private_channel"), -1001234567890) + + def test_public_channel(self): + self.assertEqual(derive_chat_id(1234567890, "public_channel"), -1001234567890) + + def test_unknown_type(self): + self.assertEqual(derive_chat_id(42, "unknown_type"), 42) + + +class TestFlattenText(unittest.TestCase): + def test_plain_string(self): + self.assertEqual(flatten_text("Hello world"), "Hello world") + + def test_empty_string(self): + self.assertEqual(flatten_text(""), "") + + def test_none(self): + self.assertEqual(flatten_text(None), "") + + def test_entity_list(self): + entities = [ + {"type": "plain", "text": "Hello "}, + {"type": "bold", "text": "world"}, + {"type": "plain", "text": "!"}, + ] + self.assertEqual(flatten_text(entities), "Hello world!") + + def test_mixed_list(self): + entities = ["plain text", {"type": "link", "text": "http://example.com"}] + self.assertEqual(flatten_text(entities), "plain texthttp://example.com") + + def test_empty_list(self): + self.assertEqual(flatten_text([]), "") + + +class TestParseDate(unittest.TestCase): + def test_unixtime(self): + msg = {"date_unixtime": "1673779800"} + result = parse_date(msg) + self.assertIsInstance(result, datetime) + self.assertEqual(result.year, 2023) + + def test_iso_format(self): + msg = {"date": "2023-01-15T10:30:00"} + result = parse_date(msg) + self.assertIsInstance(result, datetime) + self.assertEqual(result.year, 2023) + self.assertEqual(result.month, 1) + self.assertEqual(result.day, 15) + + def test_prefers_unixtime(self): + msg = {"date_unixtime": "1673779800", "date": "2025-06-01T00:00:00"} + result = parse_date(msg) + self.assertEqual(result.year, 2023) + + def test_no_date(self): + self.assertIsNone(parse_date({})) + + def test_invalid_date(self): + self.assertIsNone(parse_date({"date": "not-a-date"})) + + +class TestParseEditedDate(unittest.TestCase): + def test_edited_unixtime(self): + msg = {"edited_unixtime": "1673780100"} + result = parse_edited_date(msg) + self.assertIsInstance(result, datetime) + + def test_edited_iso(self): + msg = {"edited": "2023-01-15T10:35:00"} + result = parse_edited_date(msg) + self.assertIsInstance(result, datetime) + + def test_no_edited(self): + self.assertIsNone(parse_edited_date({})) + + +class TestDetectMedia(unittest.TestCase): + def test_photo(self): + msg = {"photo": "photos/photo_1.jpg"} + media_type, rel, fname = _detect_media(msg, Path("/tmp")) + self.assertEqual(media_type, "photo") + self.assertEqual(rel, "photos/photo_1.jpg") + self.assertEqual(fname, "photo_1.jpg") + + def test_document(self): + msg = {"file": "files/doc.pdf", "file_name": "document.pdf", "mime_type": "application/pdf"} + media_type, rel, fname = _detect_media(msg, Path("/tmp")) + self.assertEqual(media_type, "document") + self.assertEqual(fname, "document.pdf") + + def test_video(self): + msg = {"file": "videos/vid.mp4", "media_type": "video_file"} + media_type, rel, fname = _detect_media(msg, Path("/tmp")) + self.assertEqual(media_type, "video") + + def test_voice(self): + msg = {"file": "voice/msg.ogg", "media_type": "voice_message"} + media_type, rel, fname = _detect_media(msg, Path("/tmp")) + self.assertEqual(media_type, "voice") + + def test_animation(self): + msg = {"file": "animations/anim.mp4", "media_type": "animation"} + media_type, rel, fname = _detect_media(msg, Path("/tmp")) + self.assertEqual(media_type, "animation") + + def test_no_media(self): + media_type, rel, fname = _detect_media({}, Path("/tmp")) + self.assertIsNone(media_type) + self.assertIsNone(rel) + + def test_photo_takes_precedence(self): + msg = {"photo": "photos/p.jpg", "file": "files/f.pdf"} + media_type, _, _ = _detect_media(msg, Path("/tmp")) + self.assertEqual(media_type, "photo") + + +class TestBuildServiceText(unittest.TestCase): + def test_pin_message(self): + msg = {"action": "pin_message", "from": "Alice"} + self.assertIn("pinned a message", _build_service_text(msg)) + self.assertIn("Alice", _build_service_text(msg)) + + def test_create_group(self): + msg = {"action": "create_group", "actor": "Bob", "title": "My Group"} + result = _build_service_text(msg) + self.assertIn("Bob", result) + self.assertIn("created the group", result) + self.assertIn("My Group", result) + + def test_unknown_action(self): + msg = {"action": "some_new_action", "from": "Charlie"} + result = _build_service_text(msg) + self.assertIn("some new action", result) + + +class TestTelegramImporterExtractChats(unittest.TestCase): + def _make_importer(self): + db = MagicMock() + return TelegramImporter(db, "/tmp/media") + + def test_single_chat_export(self): + data = {"name": "Test Chat", "type": "personal_chat", "id": 123, "messages": []} + importer = self._make_importer() + chats = importer._extract_chats(data) + self.assertEqual(len(chats), 1) + self.assertEqual(chats[0]["name"], "Test Chat") + + def test_full_account_export(self): + data = { + "chats": { + "list": [ + {"name": "Chat 1", "type": "personal_chat", "id": 1, "messages": []}, + {"name": "Chat 2", "type": "private_group", "id": 2, "messages": []}, + ] + } + } + importer = self._make_importer() + chats = importer._extract_chats(data) + self.assertEqual(len(chats), 2) + + def test_empty_data(self): + importer = self._make_importer() + self.assertEqual(importer._extract_chats({}), []) + + +class TestTelegramImporterRun(unittest.TestCase): + def setUp(self): + self.temp_dir = tempfile.mkdtemp() + self.export_dir = os.path.join(self.temp_dir, "export") + os.makedirs(self.export_dir) + + def tearDown(self): + shutil.rmtree(self.temp_dir, ignore_errors=True) + + def _run(self, coro): + loop = asyncio.new_event_loop() + try: + return loop.run_until_complete(coro) + finally: + loop.close() + + def _write_export(self, data): + with open(os.path.join(self.export_dir, "result.json"), "w") as f: + json.dump(data, f) + + def test_dry_run_no_db_writes(self): + self._write_export( + { + "name": "Test", + "type": "personal_chat", + "id": 42, + "messages": [ + { + "id": 1, + "type": "message", + "date": "2024-01-15T10:00:00", + "from": "Alice", + "from_id": "user42", + "text": "Hello", + }, + { + "id": 2, + "type": "message", + "date": "2024-01-15T10:01:00", + "from": "Bob", + "from_id": "user99", + "text": "World", + }, + ], + } + ) + + db = AsyncMock() + importer = TelegramImporter(db, os.path.join(self.temp_dir, "media")) + + summary = self._run(importer.run(self.export_dir, dry_run=True)) + + self.assertEqual(summary["total_messages"], 2) + self.assertEqual(summary["chats_imported"], 1) + db.upsert_chat.assert_not_called() + db.insert_messages_batch.assert_not_called() + + def test_import_with_merge_check(self): + self._write_export( + { + "name": "Existing Chat", + "type": "personal_chat", + "id": 42, + "messages": [ + {"id": 1, "type": "message", "date": "2024-01-15T10:00:00", "text": "Hi"}, + ], + } + ) + + db = AsyncMock() + db.get_chat_stats.return_value = {"messages": 100} + importer = TelegramImporter(db, os.path.join(self.temp_dir, "media")) + + with self.assertRaises(ValueError) as ctx: + self._run(importer.run(self.export_dir, merge=False)) + self.assertIn("already has", str(ctx.exception)) + + def test_import_messages(self): + self._write_export( + { + "name": "Test Chat", + "type": "personal_chat", + "id": 42, + "messages": [ + { + "id": 1, + "type": "message", + "date": "2024-01-15T10:00:00", + "from": "Alice", + "from_id": "user42", + "text": "Hello", + }, + { + "id": 2, + "type": "service", + "date": "2024-01-15T10:05:00", + "from": "Alice", + "from_id": "user42", + "action": "pin_message", + }, + ], + } + ) + + db = AsyncMock() + db.get_chat_stats.return_value = {"messages": 0} + importer = TelegramImporter(db, os.path.join(self.temp_dir, "media")) + + summary = self._run(importer.run(self.export_dir)) + + self.assertEqual(summary["total_messages"], 2) + db.upsert_chat.assert_called_once() + db.insert_messages_batch.assert_called_once() + db.update_sync_status.assert_called_once_with(42, 2, 2) + + def test_import_with_media(self): + photos_dir = os.path.join(self.export_dir, "photos") + os.makedirs(photos_dir) + photo_path = os.path.join(photos_dir, "photo_1.jpg") + with open(photo_path, "wb") as f: + f.write(b"\xff\xd8\xff\xe0" + b"\x00" * 100) + + self._write_export( + { + "name": "Media Chat", + "type": "personal_chat", + "id": 42, + "messages": [ + { + "id": 1, + "type": "message", + "date": "2024-01-15T10:00:00", + "from": "Alice", + "from_id": "user42", + "text": "", + "photo": "photos/photo_1.jpg", + "width": 800, + "height": 600, + }, + ], + } + ) + + media_dir = os.path.join(self.temp_dir, "media") + db = AsyncMock() + db.get_chat_stats.return_value = {"messages": 0} + importer = TelegramImporter(db, media_dir) + + summary = self._run(importer.run(self.export_dir)) + + self.assertEqual(summary["total_media"], 1) + db.insert_media.assert_called_once() + media_call = db.insert_media.call_args[0][0] + self.assertEqual(media_call["type"], "photo") + self.assertEqual(media_call["message_id"], 1) + self.assertTrue(Path(media_dir, "42").exists()) + + def test_skip_media_flag(self): + photos_dir = os.path.join(self.export_dir, "photos") + os.makedirs(photos_dir) + with open(os.path.join(photos_dir, "photo_1.jpg"), "wb") as f: + f.write(b"\x00" * 50) + + self._write_export( + { + "name": "Chat", + "type": "personal_chat", + "id": 42, + "messages": [ + { + "id": 1, + "type": "message", + "date": "2024-01-15T10:00:00", + "text": "", + "photo": "photos/photo_1.jpg", + }, + ], + } + ) + + db = AsyncMock() + db.get_chat_stats.return_value = {"messages": 0} + importer = TelegramImporter(db, os.path.join(self.temp_dir, "media")) + + summary = self._run(importer.run(self.export_dir, skip_media=True)) + + self.assertEqual(summary["total_media"], 0) + db.insert_media.assert_not_called() + + def test_missing_result_json(self): + db = AsyncMock() + importer = TelegramImporter(db, "/tmp/media") + + with self.assertRaises(FileNotFoundError): + self._run(importer.run(self.export_dir)) + + def test_forwarded_message(self): + self._write_export( + { + "name": "Chat", + "type": "personal_chat", + "id": 42, + "messages": [ + { + "id": 1, + "type": "message", + "date": "2024-01-15T10:00:00", + "text": "Forwarded content", + "forwarded_from": "Some Channel", + }, + ], + } + ) + + db = AsyncMock() + db.get_chat_stats.return_value = {"messages": 0} + importer = TelegramImporter(db, os.path.join(self.temp_dir, "media")) + + self._run(importer.run(self.export_dir)) + + call_args = db.insert_messages_batch.call_args[0][0] + self.assertEqual(call_args[0]["raw_data"]["forward_from_name"], "Some Channel") + + def test_chat_id_override(self): + self._write_export( + { + "name": "Chat", + "type": "personal_chat", + "id": 42, + "messages": [ + {"id": 1, "type": "message", "date": "2024-01-15T10:00:00", "text": "Hi"}, + ], + } + ) + + db = AsyncMock() + db.get_chat_stats.return_value = {"messages": 0} + importer = TelegramImporter(db, os.path.join(self.temp_dir, "media")) + + summary = self._run(importer.run(self.export_dir, chat_id_override=-1009999)) + + self.assertEqual(summary["details"][0]["chat_id"], -1009999) + chat_call = db.upsert_chat.call_args[0][0] + self.assertEqual(chat_call["id"], -1009999) + + +if __name__ == "__main__": + unittest.main()