From 608f4b2d698ad31ec56baf2b6f1c9e2fc4a8d934 Mon Sep 17 00:00:00 2001 From: GeiserX <9169332+GeiserX@users.noreply.github.com> Date: Mon, 16 Feb 2026 21:35:52 +0100 Subject: [PATCH 1/2] fix: checkpoint sync_status after every N batches for crash-safe resume Moves sync_status updates from end-of-chat to after every CHECKPOINT_INTERVAL batch inserts (default: 1). On crash/restart, backup resumes from the last committed batch instead of re-fetching all messages for the current chat. Also extracts duplicated batch-commit logic into _commit_batch() helper, removes the message accumulation list (saves memory on large chats), and adds CHECKPOINT_INTERVAL env var with docs and tests. Closes #76 --- .env.example | 5 ++ README.md | 1 + docker-compose.yml | 1 + src/config.py | 3 + src/telegram_backup.py | 127 ++++++++++++--------------- tests/test_config.py | 38 ++++++++ tests/test_telegram_backup.py | 161 ++++++++++++++++++++++++++++++++++ 7 files changed, 267 insertions(+), 69 deletions(-) diff --git a/.env.example b/.env.example index 7e690ac..dd503de 100644 --- a/.env.example +++ b/.env.example @@ -39,6 +39,11 @@ MAX_MEDIA_SIZE_MB=4096 # Higher values = faster but more memory usage BATCH_SIZE=100 +# How often to save backup progress (every N batch inserts) +# 1 = checkpoint every batch (safest, best crash recovery) +# Higher = fewer DB writes but more re-work on crash/restart +CHECKPOINT_INTERVAL=1 + # Use symlinks to deduplicate identical media files across chats # DEDUPLICATE_MEDIA=true diff --git a/README.md b/README.md index b81ad1e..85e4944 100644 --- a/README.md +++ b/README.md @@ -216,6 +216,7 @@ The **Scope** column shows whether each variable applies to the backup scheduler | `DOWNLOAD_MEDIA` | `true` | B | Download media files (photos, videos, documents) | | `MAX_MEDIA_SIZE_MB` | `100` | B | Skip media files larger than this (MB) | | `BATCH_SIZE` | `100` | B | Messages processed per database batch | +| `CHECKPOINT_INTERVAL` | `1` | B | Save backup progress every N batch inserts (lower = safer resume after crash) | | `DATABASE_TIMEOUT` | `60.0` | B/V | Database operation timeout in seconds | | `SESSION_NAME` | `telegram_backup` | B | Telethon session file name | | `DEDUPLICATE_MEDIA` | `true` | B | Symlink identical media files across chats to save disk space | diff --git a/docker-compose.yml b/docker-compose.yml index 7a181e5..590e09a 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -20,6 +20,7 @@ services: DOWNLOAD_MEDIA: ${DOWNLOAD_MEDIA:-true} MAX_MEDIA_SIZE_MB: ${MAX_MEDIA_SIZE_MB:-100} BATCH_SIZE: ${BATCH_SIZE:-100} + CHECKPOINT_INTERVAL: ${CHECKPOINT_INTERVAL:-1} # DEDUPLICATE_MEDIA: ${DEDUPLICATE_MEDIA:-true} # PRIORITY_CHAT_IDS: ${PRIORITY_CHAT_IDS:-} # SKIP_MEDIA_CHAT_IDS: ${SKIP_MEDIA_CHAT_IDS:-} diff --git a/src/config.py b/src/config.py index 4ac7afc..b909671 100644 --- a/src/config.py +++ b/src/config.py @@ -34,6 +34,9 @@ def __init__(self): # Batch processing configuration self.batch_size = int(os.getenv("BATCH_SIZE", "100")) + # How often to checkpoint sync progress (every N batch inserts) + # Lower = better crash recovery, higher = fewer DB writes + self.checkpoint_interval = max(1, int(os.getenv("CHECKPOINT_INTERVAL", "1"))) # Database Configuration # Timeout for SQLite operations (seconds). diff --git a/src/telegram_backup.py b/src/telegram_backup.py index 7dc5aa9..bcd5c96 100644 --- a/src/telegram_backup.py +++ b/src/telegram_backup.py @@ -613,84 +613,48 @@ async def _backup_dialog(self, dialog, is_archived: bool = False) -> int: # Get last synced message ID for incremental backup last_message_id = await self.db.get_last_message_id(chat_id) - # Fetch new messages - messages = [] - batch_data = [] + # Fetch and process messages in batches with periodic checkpointing. + # sync_status is updated every checkpoint_interval batches so that + # a crash/restart only re-fetches messages since the last checkpoint + # instead of restarting the entire chat from scratch. + batch_data: list[dict] = [] batch_size = self.config.batch_size - total_processed = 0 + checkpoint_interval = self.config.checkpoint_interval + grand_total = 0 + uncheckpointed_count = 0 + batches_since_checkpoint = 0 + running_max_id = last_message_id async for message in self.client.iter_messages(entity, min_id=last_message_id, reverse=True): - messages.append(message) - - # Process message msg_data = await self._process_message(message, chat_id) batch_data.append(msg_data) + running_max_id = max(running_max_id, message.id) - # Batch insert every 50 messages if len(batch_data) >= batch_size: - await self.db.insert_messages_batch(batch_data) - # Insert media records AFTER messages (FK constraint satisfied) - for msg in batch_data: - if msg.get("_media_data"): - await self.db.insert_media(msg["_media_data"]) - # Store reactions for this batch - for msg in batch_data: - if msg.get("reactions"): - reactions_list = [] - for reaction in msg["reactions"]: - # Store each user's reaction separately if we have user info - # Otherwise store as aggregated count - if reaction.get("user_ids") and len(reaction["user_ids"]) > 0: - # We have specific users - store each one - for user_id in reaction["user_ids"]: - reactions_list.append({"emoji": reaction["emoji"], "user_id": user_id, "count": 1}) - # If count is higher than user_ids, add remaining as anonymous - remaining = reaction.get("count", 0) - len(reaction["user_ids"]) - if remaining > 0: - reactions_list.append( - {"emoji": reaction["emoji"], "user_id": None, "count": remaining} - ) - else: - # No user info - store as aggregated count - reactions_list.append( - {"emoji": reaction["emoji"], "user_id": None, "count": reaction.get("count", 1)} - ) - if reactions_list: - await self.db.insert_reactions(msg["id"], chat_id, reactions_list) - total_processed += len(batch_data) - logger.info(f" → Processed {total_processed} messages...") + await self._commit_batch(batch_data, chat_id) + count = len(batch_data) + grand_total += count + uncheckpointed_count += count + batches_since_checkpoint += 1 + logger.info(f" → Processed {grand_total} messages...") + + if batches_since_checkpoint >= checkpoint_interval: + await self.db.update_sync_status(chat_id, running_max_id, uncheckpointed_count) + uncheckpointed_count = 0 + batches_since_checkpoint = 0 + batch_data = [] - # Insert remaining messages + # Flush remaining messages if batch_data: - await self.db.insert_messages_batch(batch_data) - # Insert media records AFTER messages (FK constraint satisfied) - for msg in batch_data: - if msg.get("_media_data"): - await self.db.insert_media(msg["_media_data"]) - # Store reactions for remaining messages - for msg in batch_data: - if msg.get("reactions"): - reactions_list = [] - for reaction in msg["reactions"]: - if reaction.get("user_ids") and len(reaction["user_ids"]) > 0: - for user_id in reaction["user_ids"]: - reactions_list.append({"emoji": reaction["emoji"], "user_id": user_id, "count": 1}) - remaining = reaction.get("count", 0) - len(reaction["user_ids"]) - if remaining > 0: - reactions_list.append({"emoji": reaction["emoji"], "user_id": None, "count": remaining}) - else: - reactions_list.append( - {"emoji": reaction["emoji"], "user_id": None, "count": reaction.get("count", 1)} - ) - if reactions_list: - await self.db.insert_reactions(msg["id"], chat_id, reactions_list) - total_processed += len(batch_data) - - # Update sync status - if messages: - max_message_id = max(msg.id for msg in messages) - await self.db.update_sync_status(chat_id, max_message_id, len(messages)) + await self._commit_batch(batch_data, chat_id) + count = len(batch_data) + grand_total += count + uncheckpointed_count += count + + # Final checkpoint for any un-checkpointed messages + if uncheckpointed_count > 0: + await self.db.update_sync_status(chat_id, running_max_id, uncheckpointed_count) # Sync deletions and edits if enabled (expensive!) if self.config.sync_deletions_edits: @@ -699,7 +663,32 @@ async def _backup_dialog(self, dialog, is_archived: bool = False) -> int: # Always sync pinned messages to keep them up-to-date await self._sync_pinned_messages(chat_id, entity) - return len(messages) + return grand_total + + async def _commit_batch(self, batch_data: list[dict], chat_id: int) -> None: + """Persist a batch of processed messages, their media and reactions to the DB.""" + await self.db.insert_messages_batch(batch_data) + + for msg in batch_data: + if msg.get("_media_data"): + await self.db.insert_media(msg["_media_data"]) + + for msg in batch_data: + if msg.get("reactions"): + reactions_list: list[dict] = [] + for reaction in msg["reactions"]: + if reaction.get("user_ids") and len(reaction["user_ids"]) > 0: + for user_id in reaction["user_ids"]: + reactions_list.append({"emoji": reaction["emoji"], "user_id": user_id, "count": 1}) + remaining = reaction.get("count", 0) - len(reaction["user_ids"]) + if remaining > 0: + reactions_list.append({"emoji": reaction["emoji"], "user_id": None, "count": remaining}) + else: + reactions_list.append( + {"emoji": reaction["emoji"], "user_id": None, "count": reaction.get("count", 1)} + ) + if reactions_list: + await self.db.insert_reactions(msg["id"], chat_id, reactions_list) async def _sync_deletions_and_edits(self, chat_id: int, entity): """ diff --git a/tests/test_config.py b/tests/test_config.py index 0755208..b1d8443 100644 --- a/tests/test_config.py +++ b/tests/test_config.py @@ -302,5 +302,43 @@ def test_skip_media_delete_existing_explicit_true(self): self.assertTrue(config.skip_media_delete_existing) +class TestCheckpointInterval(unittest.TestCase): + """Test CHECKPOINT_INTERVAL configuration for backup progress saving.""" + + def setUp(self): + self.temp_dir = tempfile.mkdtemp() + + def tearDown(self): + shutil.rmtree(self.temp_dir, ignore_errors=True) + + def test_checkpoint_interval_default(self): + """CHECKPOINT_INTERVAL defaults to 1 when not set.""" + env_vars = {"CHAT_TYPES": "private", "BACKUP_PATH": self.temp_dir} + with patch.dict(os.environ, env_vars, clear=True): + config = Config() + self.assertEqual(config.checkpoint_interval, 1) + + def test_checkpoint_interval_custom(self): + """Can configure a custom checkpoint interval.""" + env_vars = {"CHAT_TYPES": "private", "CHECKPOINT_INTERVAL": "5", "BACKUP_PATH": self.temp_dir} + with patch.dict(os.environ, env_vars, clear=True): + config = Config() + self.assertEqual(config.checkpoint_interval, 5) + + def test_checkpoint_interval_minimum_one(self): + """CHECKPOINT_INTERVAL is clamped to minimum of 1.""" + env_vars = {"CHAT_TYPES": "private", "CHECKPOINT_INTERVAL": "0", "BACKUP_PATH": self.temp_dir} + with patch.dict(os.environ, env_vars, clear=True): + config = Config() + self.assertEqual(config.checkpoint_interval, 1) + + def test_checkpoint_interval_negative_clamped(self): + """Negative CHECKPOINT_INTERVAL is clamped to 1.""" + env_vars = {"CHAT_TYPES": "private", "CHECKPOINT_INTERVAL": "-3", "BACKUP_PATH": self.temp_dir} + with patch.dict(os.environ, env_vars, clear=True): + config = Config() + self.assertEqual(config.checkpoint_interval, 1) + + if __name__ == "__main__": unittest.main() diff --git a/tests/test_telegram_backup.py b/tests/test_telegram_backup.py index b191f71..1374a6e 100644 --- a/tests/test_telegram_backup.py +++ b/tests/test_telegram_backup.py @@ -252,5 +252,166 @@ def test_cleanup_db_error_does_not_crash(self): self._run(self.backup._cleanup_existing_media(-1001234567890)) +class TestBackupCheckpointing(unittest.TestCase): + """Test per-batch sync_status checkpointing in _backup_dialog.""" + + def setUp(self): + self.temp_dir = tempfile.mkdtemp() + + self.config = MagicMock() + self.config.batch_size = 2 + self.config.checkpoint_interval = 1 + self.config.skip_media_chat_ids = set() + self.config.skip_media_delete_existing = False + self.config.sync_deletions_edits = False + self.config.media_path = os.path.join(self.temp_dir, "media") + + self.db = AsyncMock() + self.db.get_last_message_id.return_value = 0 + + self.backup = TelegramBackup.__new__(TelegramBackup) + self.backup.config = self.config + self.backup.db = self.db + self.backup.client = MagicMock() + self.backup._cleaned_media_chats = set() + self.backup._get_marked_id = MagicMock(return_value=100) + self.backup._extract_chat_data = MagicMock(return_value={"id": 100}) + self.backup._ensure_profile_photo = AsyncMock() + + def tearDown(self): + shutil.rmtree(self.temp_dir, ignore_errors=True) + + def _run(self, coro): + loop = asyncio.new_event_loop() + try: + return loop.run_until_complete(coro) + finally: + loop.close() + + def _make_dialog(self): + dialog = MagicMock() + dialog.entity = MagicMock() + return dialog + + def _make_message(self, msg_id): + msg = MagicMock() + msg.id = msg_id + return msg + + def test_checkpoint_after_every_batch(self): + """With checkpoint_interval=1, sync_status updates after every batch.""" + messages = [self._make_message(i) for i in range(1, 5)] + + async def fake_iter(*args, **kwargs): + for m in messages: + yield m + + self.backup.client.iter_messages = fake_iter + self.backup._process_message = AsyncMock(side_effect=lambda m, c: {"id": m.id, "chat_id": c}) + self.backup._commit_batch = AsyncMock() + self.backup._sync_pinned_messages = AsyncMock() + + result = self._run(self.backup._backup_dialog(self._make_dialog(), 100)) + + self.assertEqual(result, 4) + # 2 batches of 2 => 2 checkpoints, nothing left uncheckpointed + self.assertEqual(self.db.update_sync_status.await_count, 2) + + def test_checkpoint_interval_greater_than_one(self): + """With checkpoint_interval=2, checkpoint only every 2nd batch.""" + self.config.checkpoint_interval = 2 + messages = [self._make_message(i) for i in range(1, 7)] + + async def fake_iter(*args, **kwargs): + for m in messages: + yield m + + self.backup.client.iter_messages = fake_iter + self.backup._process_message = AsyncMock(side_effect=lambda m, c: {"id": m.id, "chat_id": c}) + self.backup._commit_batch = AsyncMock() + self.backup._sync_pinned_messages = AsyncMock() + + result = self._run(self.backup._backup_dialog(self._make_dialog(), 200)) + + self.assertEqual(result, 6) + # 3 batches of 2, checkpoint_interval=2 => checkpoint at batch 2, then final for batch 3 + self.assertEqual(self.db.update_sync_status.await_count, 2) + + def test_final_flush_gets_checkpointed(self): + """Leftover messages (< batch_size) are flushed and checkpointed.""" + messages = [self._make_message(i) for i in range(1, 4)] + + async def fake_iter(*args, **kwargs): + for m in messages: + yield m + + self.backup.client.iter_messages = fake_iter + self.backup._process_message = AsyncMock(side_effect=lambda m, c: {"id": m.id, "chat_id": c}) + self.backup._commit_batch = AsyncMock() + self.backup._sync_pinned_messages = AsyncMock() + + result = self._run(self.backup._backup_dialog(self._make_dialog(), 300)) + + self.assertEqual(result, 3) + # batch of 2 -> checkpoint, then 1 remaining -> final checkpoint + self.assertEqual(self.db.update_sync_status.await_count, 2) + + def test_no_messages_no_checkpoint(self): + """When there are no new messages, no checkpoint should happen.""" + async def fake_iter(*args, **kwargs): + return + yield # noqa: unreachable - makes this an async generator + + self.backup.client.iter_messages = fake_iter + self.backup._process_message = AsyncMock() + self.backup._commit_batch = AsyncMock() + self.backup._sync_pinned_messages = AsyncMock() + + result = self._run(self.backup._backup_dialog(self._make_dialog(), 400)) + + self.assertEqual(result, 0) + self.db.update_sync_status.assert_not_awaited() + + def test_checkpoint_tracks_max_message_id(self): + """Checkpoint should pass the highest message ID seen so far.""" + messages = [self._make_message(10), self._make_message(20)] + + async def fake_iter(*args, **kwargs): + for m in messages: + yield m + + self.backup.client.iter_messages = fake_iter + self.backup._process_message = AsyncMock(side_effect=lambda m, c: {"id": m.id, "chat_id": c}) + self.backup._commit_batch = AsyncMock() + self.backup._sync_pinned_messages = AsyncMock() + + self._run(self.backup._backup_dialog(self._make_dialog(), 500)) + + call_args = self.db.update_sync_status.call_args + self.assertEqual(call_args[0][1], 20) + + def test_commit_batch_called_correctly(self): + """_commit_batch persists messages, media and reactions.""" + backup = TelegramBackup.__new__(TelegramBackup) + backup.db = AsyncMock() + + batch = [ + {"id": 1, "chat_id": 100, "_media_data": {"file_path": "/a.jpg"}, "reactions": None}, + {"id": 2, "chat_id": 100, "reactions": [ + {"emoji": "👍", "user_ids": [], "count": 3} + ]}, + ] + + loop = asyncio.new_event_loop() + try: + loop.run_until_complete(backup._commit_batch(batch, 100)) + finally: + loop.close() + + backup.db.insert_messages_batch.assert_awaited_once_with(batch) + backup.db.insert_media.assert_awaited_once_with({"file_path": "/a.jpg"}) + backup.db.insert_reactions.assert_awaited_once() + + if __name__ == "__main__": unittest.main() From 3544f517c7d2adcbe2e4561c74c73ea84c9d8c31 Mon Sep 17 00:00:00 2001 From: GeiserX <9169332+GeiserX@users.noreply.github.com> Date: Mon, 16 Feb 2026 21:37:40 +0100 Subject: [PATCH 2/2] style: fix ruff formatting in test file --- tests/test_telegram_backup.py | 82 +++++++++++++++++++++++++++-------- 1 file changed, 65 insertions(+), 17 deletions(-) diff --git a/tests/test_telegram_backup.py b/tests/test_telegram_backup.py index 1374a6e..29ef895 100644 --- a/tests/test_telegram_backup.py +++ b/tests/test_telegram_backup.py @@ -94,8 +94,15 @@ def test_cleanup_deletes_real_files(self): f.write(b"x" * 1024) self.db.get_media_for_chat.return_value = [ - {"id": "m1", "message_id": 1, "chat_id": chat_id, "type": "photo", - "file_path": file_path, "file_size": 1024, "downloaded": True} + { + "id": "m1", + "message_id": 1, + "chat_id": chat_id, + "type": "photo", + "file_path": file_path, + "file_size": 1024, + "downloaded": True, + } ] self.db.delete_media_for_chat.return_value = 1 @@ -121,8 +128,15 @@ def test_cleanup_removes_symlinks_without_counting_freed_bytes(self): os.symlink(rel_path, symlink_path) self.db.get_media_for_chat.return_value = [ - {"id": "m1", "message_id": 1, "chat_id": chat_id, "type": "photo", - "file_path": symlink_path, "file_size": 2048, "downloaded": True} + { + "id": "m1", + "message_id": 1, + "chat_id": chat_id, + "type": "photo", + "file_path": symlink_path, + "file_size": 2048, + "downloaded": True, + } ] self.db.delete_media_for_chat.return_value = 1 @@ -144,8 +158,15 @@ def test_cleanup_removes_empty_chat_directory(self): f.write(b"x" * 512) self.db.get_media_for_chat.return_value = [ - {"id": "m1", "message_id": 1, "chat_id": chat_id, "type": "photo", - "file_path": file_path, "file_size": 512, "downloaded": True} + { + "id": "m1", + "message_id": 1, + "chat_id": chat_id, + "type": "photo", + "file_path": file_path, + "file_size": 512, + "downloaded": True, + } ] self.db.delete_media_for_chat.return_value = 1 @@ -168,8 +189,15 @@ def test_cleanup_keeps_nonempty_directory(self): f.write(b"y" * 256) self.db.get_media_for_chat.return_value = [ - {"id": "m1", "message_id": 1, "chat_id": chat_id, "type": "photo", - "file_path": tracked_file, "file_size": 512, "downloaded": True} + { + "id": "m1", + "message_id": 1, + "chat_id": chat_id, + "type": "photo", + "file_path": tracked_file, + "file_size": 512, + "downloaded": True, + } ] self.db.delete_media_for_chat.return_value = 1 @@ -191,8 +219,15 @@ def test_cleanup_handles_missing_files(self): """Should handle records where file doesn't exist on disk.""" chat_id = -1001234567890 self.db.get_media_for_chat.return_value = [ - {"id": "m1", "message_id": 1, "chat_id": chat_id, "type": "photo", - "file_path": "/nonexistent/path.jpg", "file_size": 1024, "downloaded": True} + { + "id": "m1", + "message_id": 1, + "chat_id": chat_id, + "type": "photo", + "file_path": "/nonexistent/path.jpg", + "file_size": 1024, + "downloaded": True, + } ] self.db.delete_media_for_chat.return_value = 1 @@ -232,10 +267,24 @@ def test_cleanup_mixed_real_and_symlinks(self): os.symlink(rel_path, symlink_path) self.db.get_media_for_chat.return_value = [ - {"id": "m1", "message_id": 1, "chat_id": chat_id, "type": "video", - "file_path": real_file, "file_size": 4096, "downloaded": True}, - {"id": "m2", "message_id": 2, "chat_id": chat_id, "type": "photo", - "file_path": symlink_path, "file_size": 2048, "downloaded": True}, + { + "id": "m1", + "message_id": 1, + "chat_id": chat_id, + "type": "video", + "file_path": real_file, + "file_size": 4096, + "downloaded": True, + }, + { + "id": "m2", + "message_id": 2, + "chat_id": chat_id, + "type": "photo", + "file_path": symlink_path, + "file_size": 2048, + "downloaded": True, + }, ] self.db.delete_media_for_chat.return_value = 2 @@ -358,6 +407,7 @@ async def fake_iter(*args, **kwargs): def test_no_messages_no_checkpoint(self): """When there are no new messages, no checkpoint should happen.""" + async def fake_iter(*args, **kwargs): return yield # noqa: unreachable - makes this an async generator @@ -397,9 +447,7 @@ def test_commit_batch_called_correctly(self): batch = [ {"id": 1, "chat_id": 100, "_media_data": {"file_path": "/a.jpg"}, "reactions": None}, - {"id": 2, "chat_id": 100, "reactions": [ - {"emoji": "👍", "user_ids": [], "count": 3} - ]}, + {"id": 2, "chat_id": 100, "reactions": [{"emoji": "👍", "user_ids": [], "count": 3}]}, ] loop = asyncio.new_event_loop()