diff --git a/business_objects/user.py b/business_objects/user.py index 8fa65dd..b7164ff 100644 --- a/business_objects/user.py +++ b/business_objects/user.py @@ -26,6 +26,42 @@ def get_user_cached_if_not_admin(user_id: str) -> Optional[User]: return user +# TODO use new roles form kratos +def get_admin_users() -> List[User]: + kernai_admins = ( + session.query(User) + .filter(User.email.ilike("%@kern.ai"), User.verified == True) + .all() + ) + + query = """ + SELECT email FROM global.full_admin_access + """ + + result = general.execute_all(query) + full_admin_emails = [row[0].lower() for row in result] if result else [] + + if full_admin_emails: + full_admins = ( + session.query(User).filter(User.email.in_(full_admin_emails)).all() + ) + else: + full_admins = [] + + admin_users = {user.id: user for user in kernai_admins + full_admins} + return list(admin_users.values()) + + +def get_engineer_users(org_id: str) -> List[User]: + engineers = ( + session.query(User) + .filter(User.role == enums.UserRoles.ENGINEER.value) + .filter(User.organization_id == org_id) + .all() + ) + return engineers + + @TTLCacheDecorator(CacheEnum.USER, 5, "user_id") def get_user_cached(user_id: str) -> User: user = get(user_id) @@ -52,6 +88,23 @@ def get_all( return query.all() +def get_all_users_by_users_team(user_id: str) -> List[User]: + if not user_id: + return [] + teams_subquery = ( + session.query(TeamMember.team_id) + .filter(TeamMember.user_id == user_id) + .subquery() + ) + query = ( + session.query(User) + .join(TeamMember, TeamMember.user_id == User.id) + .filter(TeamMember.team_id.in_(sql.select(teams_subquery))) + .distinct(User.id) + ) + return query.all() + + def get_all_team_members_by_project(project_id: str) -> List[User]: query = ( session.query(TeamMember) diff --git a/enums.py b/enums.py index 4dbceca..0d5c4a0 100644 --- a/enums.py +++ b/enums.py @@ -180,6 +180,9 @@ class Tablenames(Enum): TIMED_EXECUTIONS = "timed_executions" CONVERSATION_SHARE = "conversation_share" CONVERSATION_GLOBAL_SHARE = "conversation_global_share" + INBOX_MAIL = "inbox_mail" + INBOX_MAIL_THREAD = "inbox_mail_thread" + INBOX_MAIL_THREAD_ASSOCIATION = "inbox_mail_thread_association" def snake_case_to_pascal_case(self): # the type name (written in PascalCase) of a table is needed to create backrefs @@ -1025,3 +1028,10 @@ class MessageType(Enum): class TimedExecutionKey(Enum): LAST_RESET_USER_MESSAGE_COUNT = "LAST_RESET_USER_MESSAGE_COUNT" + + +class InboxMailThreadSupportProgressState(Enum): + PENDING = "PENDING" + IN_PROGRESS = "IN_PROGRESS" + RESOLVED = "RESOLVED" + FAILED = "FAILED" diff --git a/global_objects/inbox_mail.py b/global_objects/inbox_mail.py new file mode 100644 index 0000000..37845bf --- /dev/null +++ b/global_objects/inbox_mail.py @@ -0,0 +1,388 @@ +from concurrent.futures import thread +from typing import Dict, List, Optional, Any +from submodules.model.util import sql_alchemy_to_dict + +from ..session import session +from sqlalchemy import cast, String, func, desc, asc + +from submodules.model.business_objects import ( + general, + user as user_bo, + organization as org_bo, +) +from submodules.model.cognition_objects import ( + project as cognition_project, + conversation, +) +from submodules.model.models import ( + InboxMail, + InboxMailThread, + InboxMailThreadAssociation, +) +from submodules.model.enums import InboxMailThreadSupportProgressState +from sqlalchemy.orm.attributes import flag_modified + + +def get(inbox_mail_id: str) -> InboxMail: + return session.query(InboxMail).filter(InboxMail.id == inbox_mail_id).first() + + +def get_new_inbox_mails( + org_id: str, user_id: str, user_is_admin: bool +) -> Dict[str, int]: + count_query = ( + session.query( + InboxMailThreadAssociation.thread_id, + func.sum(InboxMailThreadAssociation.unread_mail_count).label( + "total_unread_count" + ), + ) + .join( + InboxMailThread, + InboxMailThread.id == InboxMailThreadAssociation.thread_id, + ) + .filter( + InboxMailThread.organization_id == org_id, + InboxMailThreadAssociation.user_id == user_id, + ) + .group_by(InboxMailThreadAssociation.thread_id) + ) + + total_unread_count = 0 + for _, thread_unread_count in count_query: + total_unread_count += thread_unread_count + + if user_is_admin: + admin_unread_count = 0 + admin_threads = ( + session.query(InboxMailThread) + .filter( + InboxMailThread.is_admin_support_thread == True, + ) + .all() + ) + for thread in admin_threads: + meta_data = thread.meta_data or {} + admin_unread_count += meta_data.get("unreadMailCountAdmin", 0) + total_unread_count += admin_unread_count + return total_unread_count + + +def get_by_thread( + org_id: str, user_id: str, thread_id: str, user_is_admin: bool = False +) -> List[InboxMail]: + + # Regular users: only threads they participate in. + # Admins: threads they participate in (normal), plus all admin support threads. + query = session.query(InboxMail).join( + InboxMailThread, InboxMail.thread_id == InboxMailThread.id + ) + + if user_is_admin: + participant_thread_ids = ( + session.query(InboxMailThreadAssociation.thread_id) + .filter(InboxMailThreadAssociation.user_id == user_id) + .subquery() + ) + query = query.filter( + (InboxMailThread.id == thread_id) + & ( + (InboxMailThread.id.in_(participant_thread_ids.select())) + | (InboxMailThread.is_admin_support_thread == True) + ) + ) + else: + query = query.join( + InboxMailThreadAssociation, + InboxMailThreadAssociation.thread_id == InboxMailThread.id, + ).filter( + InboxMailThreadAssociation.user_id == user_id, + InboxMailThread.id == thread_id, + ) + + inbox_mail_entities = query.order_by(asc(InboxMail.created_at)).all() + return inbox_mail_entities + + +def get_inbox_mail_thread_length(thread_id: str) -> int: + count = session.query(InboxMail).filter(InboxMail.thread_id == thread_id).count() + return count + + +def delete(inbox_mail_id: str, with_commit: bool = True) -> None: + session.query(InboxMail).filter(InboxMail.id == inbox_mail_id).delete() + general.flush_or_commit(with_commit) + + +def delete_thread_by_id(thread_id: str, with_commit: bool = True) -> None: + session.query(InboxMailThread).filter(InboxMailThread.id == thread_id).delete() + general.flush_or_commit(with_commit) + + +def get_overview_by_threads( + org_id: str, + user_id: str, + page: int = 1, + limit: int = 10, + user_is_admin: bool = False, +) -> Dict[str, Any]: + + # Regular users: only threads they participate in. + # Admins: threads they participate in (normal), plus all admin support threads. + + base_query = session.query(InboxMailThread) + + if user_is_admin: + participant_thread_ids = ( + session.query(InboxMailThreadAssociation.thread_id) + .filter(InboxMailThreadAssociation.user_id == user_id) + .subquery() + ) + query = base_query.filter( + (InboxMailThread.id.in_(participant_thread_ids.select())) + | (InboxMailThread.is_admin_support_thread == True) + ) + else: + query = base_query.join( + InboxMailThreadAssociation, + InboxMailThreadAssociation.thread_id == InboxMailThread.id, + ).filter(InboxMailThreadAssociation.user_id == str(user_id)) + + total_threads = query.count() + + threads = ( + query.order_by(desc(InboxMailThread.created_at)) + .offset((page - 1) * limit) + .limit(limit) + .all() + ) + + thread_ids = [t.id for t in threads] + + participants = ( + session.query( + InboxMailThreadAssociation.thread_id, InboxMailThreadAssociation.user_id + ) + .filter(InboxMailThreadAssociation.thread_id.in_(thread_ids)) + .all() + ) + + participants_map = {} + for thread_id, participant_user_id in participants: + participants_map.setdefault(str(thread_id), []).append(str(participant_user_id)) + + unread_counts = ( + session.query( + InboxMailThreadAssociation.thread_id, + InboxMailThreadAssociation.unread_mail_count, + ) + .filter( + InboxMailThreadAssociation.thread_id.in_(thread_ids), + InboxMailThreadAssociation.user_id == user_id, + ) + .all() + ) + unread_count_map = { + str(thread_id): unread_mail_count + for thread_id, unread_mail_count in unread_counts + } + + thread_dicts = [] + for thread in threads: + thread_dict = __extend_thread_dict(thread, participants_map, unread_count_map) + if user_is_admin and thread_dict.get("is_admin_support_thread"): + __enrich_admin_metadata( + thread_dict.get("meta_data", {}), + ) + thread_dicts.append(thread_dict) + + return { + "total_threads": total_threads, + "page": page, + "limit": limit, + "threads": thread_dicts, + } + + +def __extend_thread_dict(thread, participants_map, unread_count_map): + thread_dict = sql_alchemy_to_dict(thread) + thread_dict["latest_mail"] = sql_alchemy_to_dict(get_last_in_thread(thread.id)) + thread_dict["participant_ids"] = participants_map.get(str(thread.id), []) + thread_dict["unread_mail_count"] = unread_count_map.get(str(thread.id), 0) + thread_dict["organization_name"] = ( + org_bo.get(thread.organization_id).name if thread.organization_id else None + ) + return thread_dict + + +def __enrich_admin_metadata(meta_data): + + project_id = meta_data.get("projectId") + if project_id: + project_entity = cognition_project.get(project_id) + meta_data["projectName"] = ( + project_entity.name if project_entity else "" + ) + conversation_id = meta_data.get("conversationId") + if conversation_id: + conversation_entity = conversation.get(project_id, conversation_id) + meta_data["conversationHeader"] = ( + conversation_entity.header + if conversation_entity + else "" + ) + + +def get_inbox_mail_thread_by_id(thread_id: str) -> InboxMailThread: + thread_entity = ( + session.query(InboxMailThread).filter(InboxMailThread.id == thread_id).first() + ) + if not thread_entity: + raise ValueError("Inbox mail thread not found") + return thread_entity + + +def get_inbox_mail_thread_association_by_thread_id_and_user_id( + thread_id: str, user_id: str +) -> Optional[InboxMailThreadAssociation]: + association_entity = ( + session.query(InboxMailThreadAssociation) + .filter( + InboxMailThreadAssociation.thread_id == thread_id, + InboxMailThreadAssociation.user_id == user_id, + ) + .first() + ) + return association_entity + + +def get_inbox_mail_thread_associations_by_thread_id( + thread_id: str, +) -> List[InboxMailThreadAssociation]: + association_entities = ( + session.query(InboxMailThreadAssociation) + .filter( + InboxMailThreadAssociation.thread_id == thread_id, + ) + .all() + ) + return association_entities + + +def create_by_thread( + org_id: str, + sender_id: str, + content: str, + recipient_ids: Optional[List[str]] = None, + subject: Optional[str] = None, + meta_data: Optional[Dict] = None, + thread_id: Optional[str] = None, + is_important: bool = False, + is_admin_support_thread: bool = False, + created_by: Optional[str] = None, + with_commit: bool = True, +) -> List[InboxMail]: + + if thread_id is None: + if is_admin_support_thread: + meta_data = meta_data or {} + meta_data["unreadMailCountAdmin"] = 1 + thread_entity = InboxMailThread( + created_by=sender_id if not created_by else created_by, + organization_id=org_id, + subject=subject, + meta_data=meta_data or {}, + is_important=is_important, + is_admin_support_thread=is_admin_support_thread, + ) + general.add(thread_entity) + + participant_ids = [sender_id] + recipient_ids + thread_association_entities = [] + for user_id in participant_ids: + thread_association_entity = InboxMailThreadAssociation( + thread_id=thread_entity.id, + user_id=user_id, + unread_mail_count=1 if user_id != sender_id else 0, + ) + thread_association_entities.append(thread_association_entity) + general.add_all(thread_association_entities) + else: + thread_entity = get_inbox_mail_thread_by_id(thread_id) + # Only update unread counts if the sender is also the issue creator + if is_admin_support_thread and thread_entity.created_by == sender_id: + meta_data = thread_entity.meta_data or {} + meta_data["unreadMailCountAdmin"] = ( + meta_data.get("unreadMailCountAdmin", 0) + 1 + ) + thread_entity.meta_data = meta_data + association_entities = get_inbox_mail_thread_associations_by_thread_id( + thread_id + ) + for assoc in association_entities: + if assoc.user_id != sender_id: + assoc.unread_mail_count += 1 + general.flush_or_commit(with_commit) + + inbox_mail_entitiy = InboxMail( + content=content, sender_id=sender_id, thread_id=thread_entity.id + ) + + general.add(inbox_mail_entitiy) + + if with_commit: + general.commit() + + return inbox_mail_entitiy + + +def get_first_in_thread(thread_id: str) -> Optional[InboxMail]: + inbox_mail_entity = ( + session.query(InboxMail) + .filter(InboxMail.thread_id == thread_id) + .order_by(asc(InboxMail.created_at)) + .first() + ) + return inbox_mail_entity + + +def get_last_in_thread(thread_id: str) -> Optional[InboxMail]: + inbox_mail_entity = ( + session.query(InboxMail) + .filter(InboxMail.thread_id == thread_id) + .order_by(desc(InboxMail.created_at)) + .first() + ) + return inbox_mail_entity + + +def get_participant_ids_by_thread_id(thread_id: str) -> List[str]: + associations = ( + session.query(InboxMailThreadAssociation) + .filter(InboxMailThreadAssociation.thread_id == thread_id) + .all() + ) + participant_ids = [assoc.user_id for assoc in associations] + return participant_ids + + +def update_thread_progress( + user_id: str, + thread_id: str, + progress_state: str, + user_name: str, + with_commit: bool = True, +) -> Dict[str, Any]: + thread_entity = get_inbox_mail_thread_by_id(thread_id) + thread_entity.progress_state = progress_state + if progress_state == InboxMailThreadSupportProgressState.IN_PROGRESS.value: + thread_entity.support_owner_id = user_id + meta_data = thread_entity.meta_data or {} + meta_data["supportOwnerName"] = user_name + thread_entity.meta_data = meta_data + flag_modified(thread_entity, "meta_data") + + if progress_state == InboxMailThreadSupportProgressState.PENDING.value: + thread_entity.support_owner_id = None + + general.flush_or_commit(with_commit) diff --git a/models.py b/models.py index 0ba844f..00d09b4 100644 --- a/models.py +++ b/models.py @@ -2561,3 +2561,71 @@ class ConversationGlobalShare(Base): index=True, ) created_at = Column(DateTime, default=sql.func.now()) + + +class InboxMailThread(Base): + __tablename__ = Tablenames.INBOX_MAIL_THREAD.value + __table_args__ = {"schema": "global"} + id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4) + created_by = Column( + UUID(as_uuid=True), + ForeignKey(f"{Tablenames.USER.value}.id", ondelete="CASCADE"), + index=True, + ) + organization_id = Column( + UUID(as_uuid=True), + ForeignKey(f"{Tablenames.ORGANIZATION.value}.id", ondelete="CASCADE"), + index=True, + ) + created_at = Column(DateTime, default=sql.func.now()) + subject = Column(String) + meta_data = Column(JSON) + is_important = Column(Boolean, default=False) + progress_state = Column( + String + ) # of type enums. InboxMailThreadSupportProgressState *.value + support_owner_id = Column( + UUID(as_uuid=True), + ForeignKey(f"{Tablenames.USER.value}.id", ondelete="SET NULL"), + index=True, + ) + is_admin_support_thread = Column(Boolean, default=False) + + +class InboxMail(Base): + __tablename__ = Tablenames.INBOX_MAIL.value + __table_args__ = {"schema": "global"} + id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4) + created_at = Column(DateTime, default=sql.func.now()) + sender_id = Column( + UUID(as_uuid=True), + ForeignKey(f"{Tablenames.USER.value}.id", ondelete="SET NULL"), + index=True, + ) + thread_id = Column( + UUID(as_uuid=True), + ForeignKey( + f"global.{Tablenames.INBOX_MAIL_THREAD.value}.id", ondelete="CASCADE" + ), + index=True, + ) + content = Column(String) + + +class InboxMailThreadAssociation(Base): + __tablename__ = Tablenames.INBOX_MAIL_THREAD_ASSOCIATION.value + __table_args__ = {"schema": "global"} + id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4) + thread_id = Column( + UUID(as_uuid=True), + ForeignKey( + f"global.{Tablenames.INBOX_MAIL_THREAD.value}.id", ondelete="CASCADE" + ), + index=True, + ) + user_id = Column( + UUID(as_uuid=True), + ForeignKey(f"{Tablenames.USER.value}.id", ondelete="CASCADE"), + index=True, + ) + unread_mail_count = Column(Integer, default=0)