From b4be9d09e43229b360f2c5f4c695c29b80755e3a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Danii=C5=82=20M=2E?= Date: Wed, 18 Feb 2026 11:15:40 +0100 Subject: [PATCH 1/4] refactor(community-manager): introduce batch processing for user validation to avoid OOM --- backend/community_manager/actions/chat.py | 26 +++++++++++ backend/community_manager/tasks/chat.py | 6 +-- backend/core/src/core/services/chat/user.py | 31 +++++++++++++ .../unit/core/services/chat/test_user.py | 44 +++++++++++++++++++ 4 files changed, 102 insertions(+), 5 deletions(-) create mode 100644 backend/tests/unit/core/services/chat/test_user.py diff --git a/backend/community_manager/actions/chat.py b/backend/community_manager/actions/chat.py index e808515..6da499c 100644 --- a/backend/community_manager/actions/chat.py +++ b/backend/community_manager/actions/chat.py @@ -1124,6 +1124,32 @@ def __init__(self, db_session: Session): self.authorization_action = AuthorizationAction(db_session) # self.bot_api_service = TelegramBotApiService() + async def check_chat_members_compliance(self, chat_id: int) -> int: + """ + Iterates over all members of a chat in batches and kicks ineligible members. + + :param chat_id: The ID of the chat to check. + :return: The total number of members processed. + """ + logger.info(f"Starting to check chat members for chat {chat_id=!r}.") + + total_processed = 0 + for chat_members_chunk in self.telegram_chat_user_service.yield_all_for_chat( + chat_id=chat_id, + batch_size=100, + ): + await self.kick_ineligible_chat_members(chat_members=chat_members_chunk) + total_processed += len(chat_members_chunk) + logger.info( + f"Processed chunk of {len(chat_members_chunk)} users for chat {chat_id=!r}. " + f"Total processed: {total_processed}" + ) + + logger.info( + f"Finished checking members for chat {chat_id=!r}. Total: {total_processed}" + ) + return total_processed + async def kick_chat_member(self, chat_member: TelegramChatUser) -> None: """ Kicks a specified chat member from the chat. It ensures that the bot diff --git a/backend/community_manager/tasks/chat.py b/backend/community_manager/tasks/chat.py index 872143a..7d7390e 100644 --- a/backend/community_manager/tasks/chat.py +++ b/backend/community_manager/tasks/chat.py @@ -44,11 +44,7 @@ async def check_target_chat_members(chat_id: int) -> None: with DBService().db_session() as db_session: # BotAPI does not need a telethon client action = CommunityManagerUserChatAction(db_session) - chat_members = action.telegram_chat_user_service.get_all( - chat_ids=[chat_id], with_wallet_details=True - ) - logger.info(f"Found {len(chat_members)} chat members for chat {chat_id=!r}.") - await action.kick_ineligible_chat_members(chat_members=chat_members) + await action.check_chat_members_compliance(chat_id=chat_id) @app.task( diff --git a/backend/core/src/core/services/chat/user.py b/backend/core/src/core/services/chat/user.py index bcf205c..c67c64d 100644 --- a/backend/core/src/core/services/chat/user.py +++ b/backend/core/src/core/services/chat/user.py @@ -170,6 +170,37 @@ def get_all( return query.all() + def yield_all_for_chat( + self, chat_id: int, batch_size: int = 100 + ) -> Iterable[list[TelegramChatUser]]: + """ + Yields all users for a given chat in batches, using keyset pagination. + This is useful for processing large chats without loading all users into memory. + """ + last_seen_user_id = 0 + while True: + stmt = ( + select(TelegramChatUser) + .where( + TelegramChatUser.chat_id == chat_id, + TelegramChatUser.user_id > last_seen_user_id, + ) + .order_by(TelegramChatUser.user_id.asc()) + .limit(batch_size) + .options( + joinedload(TelegramChatUser.wallet_link).options( + joinedload(TelegramChatUserWallet.wallet), + ) + ) + ) + users = self.db_session.execute(stmt).scalars().all() + + if not users: + break + + yield users + last_seen_user_id = users[-1].user_id + def get_all_by_linked_wallet(self, addresses: list[str]) -> list[TelegramChatUser]: query = self.db_session.query(TelegramChatUser) query = query.join( diff --git a/backend/tests/unit/core/services/chat/test_user.py b/backend/tests/unit/core/services/chat/test_user.py new file mode 100644 index 0000000..2f8ec80 --- /dev/null +++ b/backend/tests/unit/core/services/chat/test_user.py @@ -0,0 +1,44 @@ +import pytest +from core.models.chat import TelegramChatUser +from core.models.user import User +from core.services.chat.user import TelegramChatUserService + + +@pytest.mark.asyncio +async def test_yield_all_for_chat_batching(db_session, chat_factory): + # Setup + chat = chat_factory() + service = TelegramChatUserService(db_session) + + # Create 25 users + users = [] + for i in range(25): + user = User(telegram_id=1000 + i) + db_session.add(user) + db_session.flush() + + chat_user = TelegramChatUser( + chat_id=chat.id, user_id=user.id, is_admin=False, is_managed=True + ) + db_session.add(chat_user) + users.append(chat_user) + + db_session.commit() + + # Test + batches = [] + for batch in service.yield_all_for_chat(chat.id, batch_size=10): + batches.append(batch) + + # Verify + assert len(batches) == 3 + assert len(batches[0]) == 10 + assert len(batches[1]) == 10 + assert len(batches[2]) == 5 + + all_yielded_users = [u for batch in batches for u in batch] + assert len(all_yielded_users) == 25 + + # Verify order + user_ids = [u.user_id for u in all_yielded_users] + assert user_ids == sorted(user_ids) From e500115d6431f8e3c986cf70081223cf71059e12 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Danii=C5=82=20M=2E?= Date: Wed, 18 Feb 2026 11:21:34 +0100 Subject: [PATCH 2/4] fix(tests): use TelegramChatFactory and UserFactory in test_user.py --- .../unit/core/services/chat/test_user.py | 19 ++++++------------- 1 file changed, 6 insertions(+), 13 deletions(-) diff --git a/backend/tests/unit/core/services/chat/test_user.py b/backend/tests/unit/core/services/chat/test_user.py index 2f8ec80..9bed78d 100644 --- a/backend/tests/unit/core/services/chat/test_user.py +++ b/backend/tests/unit/core/services/chat/test_user.py @@ -1,30 +1,23 @@ import pytest -from core.models.chat import TelegramChatUser -from core.models.user import User from core.services.chat.user import TelegramChatUserService +from tests.factories import TelegramChatFactory, TelegramChatUserFactory, UserFactory @pytest.mark.asyncio -async def test_yield_all_for_chat_batching(db_session, chat_factory): +async def test_yield_all_for_chat_batching(db_session): # Setup - chat = chat_factory() + chat = TelegramChatFactory.with_session(db_session).create() service = TelegramChatUserService(db_session) # Create 25 users users = [] for i in range(25): - user = User(telegram_id=1000 + i) - db_session.add(user) - db_session.flush() - - chat_user = TelegramChatUser( - chat_id=chat.id, user_id=user.id, is_admin=False, is_managed=True + user = UserFactory.with_session(db_session).create(telegram_id=1000 + i) + chat_user = TelegramChatUserFactory.with_session(db_session).create( + chat=chat, user=user, is_admin=False, is_managed=True ) - db_session.add(chat_user) users.append(chat_user) - db_session.commit() - # Test batches = [] for batch in service.yield_all_for_chat(chat.id, batch_size=10): From d23db6fe5c70792c6c5ac9f433f76e52d4b6642c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Danii=C5=82=20M=2E?= Date: Wed, 18 Feb 2026 11:22:59 +0100 Subject: [PATCH 3/4] fix(tests): add proper typing to test_user.py --- backend/tests/unit/core/services/chat/test_user.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/backend/tests/unit/core/services/chat/test_user.py b/backend/tests/unit/core/services/chat/test_user.py index 9bed78d..e17916d 100644 --- a/backend/tests/unit/core/services/chat/test_user.py +++ b/backend/tests/unit/core/services/chat/test_user.py @@ -1,10 +1,13 @@ import pytest +from sqlalchemy.orm import Session + +from core.models.chat import TelegramChatUser from core.services.chat.user import TelegramChatUserService from tests.factories import TelegramChatFactory, TelegramChatUserFactory, UserFactory @pytest.mark.asyncio -async def test_yield_all_for_chat_batching(db_session): +async def test_yield_all_for_chat_batching(db_session: Session) -> None: # Setup chat = TelegramChatFactory.with_session(db_session).create() service = TelegramChatUserService(db_session) @@ -19,7 +22,7 @@ async def test_yield_all_for_chat_batching(db_session): users.append(chat_user) # Test - batches = [] + batches: list[list[TelegramChatUser]] = [] for batch in service.yield_all_for_chat(chat.id, batch_size=10): batches.append(batch) From 7cf1d17758fe124e493dfcb4ccec174be52918a5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Danii=C5=82=20M=2E?= Date: Wed, 18 Feb 2026 11:24:19 +0100 Subject: [PATCH 4/4] fix(core): add .unique() to yield_all_for_chat query to resolve InvalidRequestError --- backend/core/src/core/services/chat/user.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/backend/core/src/core/services/chat/user.py b/backend/core/src/core/services/chat/user.py index c67c64d..3a27dd3 100644 --- a/backend/core/src/core/services/chat/user.py +++ b/backend/core/src/core/services/chat/user.py @@ -193,7 +193,7 @@ def yield_all_for_chat( ) ) ) - users = self.db_session.execute(stmt).scalars().all() + users = self.db_session.execute(stmt).scalars().unique().all() if not users: break