Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .env.example
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,11 @@ MAX_MEDIA_SIZE_MB=4096
# Higher values = faster but more memory usage
BATCH_SIZE=100

# How often to save backup progress (every N batch inserts)
# 1 = checkpoint every batch (safest, best crash recovery)
# Higher = fewer DB writes but more re-work on crash/restart
CHECKPOINT_INTERVAL=1

# Use symlinks to deduplicate identical media files across chats
# DEDUPLICATE_MEDIA=true

Expand Down
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,7 @@ The **Scope** column shows whether each variable applies to the backup scheduler
| `DOWNLOAD_MEDIA` | `true` | B | Download media files (photos, videos, documents) |
| `MAX_MEDIA_SIZE_MB` | `100` | B | Skip media files larger than this (MB) |
| `BATCH_SIZE` | `100` | B | Messages processed per database batch |
| `CHECKPOINT_INTERVAL` | `1` | B | Save backup progress every N batch inserts (lower = safer resume after crash) |
| `DATABASE_TIMEOUT` | `60.0` | B/V | Database operation timeout in seconds |
| `SESSION_NAME` | `telegram_backup` | B | Telethon session file name |
| `DEDUPLICATE_MEDIA` | `true` | B | Symlink identical media files across chats to save disk space |
Expand Down
1 change: 1 addition & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ services:
DOWNLOAD_MEDIA: ${DOWNLOAD_MEDIA:-true}
MAX_MEDIA_SIZE_MB: ${MAX_MEDIA_SIZE_MB:-100}
BATCH_SIZE: ${BATCH_SIZE:-100}
CHECKPOINT_INTERVAL: ${CHECKPOINT_INTERVAL:-1}
# DEDUPLICATE_MEDIA: ${DEDUPLICATE_MEDIA:-true}
# PRIORITY_CHAT_IDS: ${PRIORITY_CHAT_IDS:-}
# SKIP_MEDIA_CHAT_IDS: ${SKIP_MEDIA_CHAT_IDS:-}
Expand Down
3 changes: 3 additions & 0 deletions src/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,9 @@ def __init__(self):

# Batch processing configuration
self.batch_size = int(os.getenv("BATCH_SIZE", "100"))
# How often to checkpoint sync progress (every N batch inserts)
# Lower = better crash recovery, higher = fewer DB writes
self.checkpoint_interval = max(1, int(os.getenv("CHECKPOINT_INTERVAL", "1")))

# Database Configuration
# Timeout for SQLite operations (seconds).
Expand Down
127 changes: 58 additions & 69 deletions src/telegram_backup.py
Original file line number Diff line number Diff line change
Expand Up @@ -613,84 +613,48 @@ async def _backup_dialog(self, dialog, is_archived: bool = False) -> int:
# Get last synced message ID for incremental backup
last_message_id = await self.db.get_last_message_id(chat_id)

# Fetch new messages
messages = []
batch_data = []
# Fetch and process messages in batches with periodic checkpointing.
# sync_status is updated every checkpoint_interval batches so that
# a crash/restart only re-fetches messages since the last checkpoint
# instead of restarting the entire chat from scratch.
batch_data: list[dict] = []
batch_size = self.config.batch_size
total_processed = 0
checkpoint_interval = self.config.checkpoint_interval
grand_total = 0
uncheckpointed_count = 0
batches_since_checkpoint = 0
running_max_id = last_message_id

async for message in self.client.iter_messages(entity, min_id=last_message_id, reverse=True):
messages.append(message)

# Process message
msg_data = await self._process_message(message, chat_id)
batch_data.append(msg_data)
running_max_id = max(running_max_id, message.id)

# Batch insert every 50 messages
if len(batch_data) >= batch_size:
await self.db.insert_messages_batch(batch_data)
# Insert media records AFTER messages (FK constraint satisfied)
for msg in batch_data:
if msg.get("_media_data"):
await self.db.insert_media(msg["_media_data"])
# Store reactions for this batch
for msg in batch_data:
if msg.get("reactions"):
reactions_list = []
for reaction in msg["reactions"]:
# Store each user's reaction separately if we have user info
# Otherwise store as aggregated count
if reaction.get("user_ids") and len(reaction["user_ids"]) > 0:
# We have specific users - store each one
for user_id in reaction["user_ids"]:
reactions_list.append({"emoji": reaction["emoji"], "user_id": user_id, "count": 1})
# If count is higher than user_ids, add remaining as anonymous
remaining = reaction.get("count", 0) - len(reaction["user_ids"])
if remaining > 0:
reactions_list.append(
{"emoji": reaction["emoji"], "user_id": None, "count": remaining}
)
else:
# No user info - store as aggregated count
reactions_list.append(
{"emoji": reaction["emoji"], "user_id": None, "count": reaction.get("count", 1)}
)
if reactions_list:
await self.db.insert_reactions(msg["id"], chat_id, reactions_list)
total_processed += len(batch_data)
logger.info(f" → Processed {total_processed} messages...")
await self._commit_batch(batch_data, chat_id)
count = len(batch_data)
grand_total += count
uncheckpointed_count += count
batches_since_checkpoint += 1
logger.info(f" → Processed {grand_total} messages...")

if batches_since_checkpoint >= checkpoint_interval:
await self.db.update_sync_status(chat_id, running_max_id, uncheckpointed_count)
uncheckpointed_count = 0
batches_since_checkpoint = 0

batch_data = []

# Insert remaining messages
# Flush remaining messages
if batch_data:
await self.db.insert_messages_batch(batch_data)
# Insert media records AFTER messages (FK constraint satisfied)
for msg in batch_data:
if msg.get("_media_data"):
await self.db.insert_media(msg["_media_data"])
# Store reactions for remaining messages
for msg in batch_data:
if msg.get("reactions"):
reactions_list = []
for reaction in msg["reactions"]:
if reaction.get("user_ids") and len(reaction["user_ids"]) > 0:
for user_id in reaction["user_ids"]:
reactions_list.append({"emoji": reaction["emoji"], "user_id": user_id, "count": 1})
remaining = reaction.get("count", 0) - len(reaction["user_ids"])
if remaining > 0:
reactions_list.append({"emoji": reaction["emoji"], "user_id": None, "count": remaining})
else:
reactions_list.append(
{"emoji": reaction["emoji"], "user_id": None, "count": reaction.get("count", 1)}
)
if reactions_list:
await self.db.insert_reactions(msg["id"], chat_id, reactions_list)
total_processed += len(batch_data)

# Update sync status
if messages:
max_message_id = max(msg.id for msg in messages)
await self.db.update_sync_status(chat_id, max_message_id, len(messages))
await self._commit_batch(batch_data, chat_id)
count = len(batch_data)
grand_total += count
uncheckpointed_count += count

# Final checkpoint for any un-checkpointed messages
if uncheckpointed_count > 0:
await self.db.update_sync_status(chat_id, running_max_id, uncheckpointed_count)

# Sync deletions and edits if enabled (expensive!)
if self.config.sync_deletions_edits:
Expand All @@ -699,7 +663,32 @@ async def _backup_dialog(self, dialog, is_archived: bool = False) -> int:
# Always sync pinned messages to keep them up-to-date
await self._sync_pinned_messages(chat_id, entity)

return len(messages)
return grand_total

async def _commit_batch(self, batch_data: list[dict], chat_id: int) -> None:
    """Write one processed batch (messages, then media, then reactions) to the DB.

    Messages are inserted first so that the media rows' foreign keys resolve;
    reactions are expanded into per-user rows where user info is available,
    with any surplus count stored as a single anonymous aggregate row.
    """
    await self.db.insert_messages_batch(batch_data)

    # Media rows reference message rows, so they must go in afterwards.
    for entry in batch_data:
        media = entry.get("_media_data")
        if media:
            await self.db.insert_media(media)

    def expand(reaction: dict) -> list[dict]:
        """Turn one aggregated reaction dict into per-user + anonymous rows."""
        emoji = reaction["emoji"]
        user_ids = reaction.get("user_ids") or []
        if not user_ids:
            # No per-user info: keep the aggregated count on one anonymous row.
            return [{"emoji": emoji, "user_id": None, "count": reaction.get("count", 1)}]
        rows = [{"emoji": emoji, "user_id": uid, "count": 1} for uid in user_ids]
        leftover = reaction.get("count", 0) - len(user_ids)
        if leftover > 0:
            # Count exceeds known users: remainder is stored anonymously.
            rows.append({"emoji": emoji, "user_id": None, "count": leftover})
        return rows

    for entry in batch_data:
        if entry.get("reactions"):
            rows: list[dict] = []
            for reaction in entry["reactions"]:
                rows.extend(expand(reaction))
            if rows:
                await self.db.insert_reactions(entry["id"], chat_id, rows)

async def _sync_deletions_and_edits(self, chat_id: int, entity):
"""
Expand Down
38 changes: 38 additions & 0 deletions tests/test_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -302,5 +302,43 @@ def test_skip_media_delete_existing_explicit_true(self):
self.assertTrue(config.skip_media_delete_existing)


class TestCheckpointInterval(unittest.TestCase):
    """CHECKPOINT_INTERVAL env var: default value, custom value, clamping to >= 1."""

    def setUp(self):
        # Isolated backup dir so Config never touches a real path.
        self.temp_dir = tempfile.mkdtemp()

    def tearDown(self):
        shutil.rmtree(self.temp_dir, ignore_errors=True)

    def _load_config(self, **extra_env):
        """Build a Config under a minimal, fully-controlled environment."""
        env = {"CHAT_TYPES": "private", "BACKUP_PATH": self.temp_dir, **extra_env}
        with patch.dict(os.environ, env, clear=True):
            return Config()

    def test_checkpoint_interval_default(self):
        """CHECKPOINT_INTERVAL defaults to 1 when not set."""
        self.assertEqual(self._load_config().checkpoint_interval, 1)

    def test_checkpoint_interval_custom(self):
        """Can configure a custom checkpoint interval."""
        self.assertEqual(self._load_config(CHECKPOINT_INTERVAL="5").checkpoint_interval, 5)

    def test_checkpoint_interval_minimum_one(self):
        """CHECKPOINT_INTERVAL is clamped to minimum of 1."""
        self.assertEqual(self._load_config(CHECKPOINT_INTERVAL="0").checkpoint_interval, 1)

    def test_checkpoint_interval_negative_clamped(self):
        """Negative CHECKPOINT_INTERVAL is clamped to 1."""
        self.assertEqual(self._load_config(CHECKPOINT_INTERVAL="-3").checkpoint_interval, 1)


if __name__ == "__main__":
    # Allow running this test module directly (python tests/test_config.py)
    # in addition to discovery via a test runner.
    unittest.main()
Loading
Loading