diff --git a/bec_lib/bec_lib/endpoints.py b/bec_lib/bec_lib/endpoints.py index 72f073085..46472ba8f 100644 --- a/bec_lib/bec_lib/endpoints.py +++ b/bec_lib/bec_lib/endpoints.py @@ -1769,3 +1769,71 @@ def atlas_deployment_info(deployment_name: str): message_type=messages.VariableMessage, message_op=MessageOp.SET_PUBLISH, ) + + @staticmethod + def message_service_queue(): + """ + Endpoint for message service queue. This endpoint is used by clients of messaging services to + send messages using a messages.MessagingServiceMessage message. + + Returns: + EndpointInfo: Endpoint for message service queue. + """ + endpoint = f"{EndpointType.INTERNAL.value}/message_service/queue" + return EndpointInfo( + endpoint=endpoint, + message_type=messages.MessagingServiceMessage, + message_op=MessageOp.STREAM, + ) + + @staticmethod + def message_service_scopes(): + """ + Endpoint for message service scopes. This endpoint is used by clients of messaging services to + listen for scope changes using a messages.MessagingServiceScopeMessage message. + + Returns: + EndpointInfo: Endpoint for message service scopes. + """ + endpoint = f"{EndpointType.INTERNAL.value}/message_service/scopes" + return EndpointInfo( + endpoint=endpoint, + message_type=messages.MessagingServiceScopes, + message_op=MessageOp.STREAM, + ) + + @staticmethod + def message_service_ingest(deployment_name: str): + """ + Endpoint for ingesting messages for a particular deployment's messaging service. + + Returns: + EndpointInfo: Endpoint for message service ingest. + """ + endpoint = ( + f"{EndpointType.INTERNAL.value}/deployment/{deployment_name}/message_service/ingest" + ) + return EndpointInfo( + endpoint=endpoint, + message_type=messages.MessagingServiceMessage, + message_op=MessageOp.STREAM, + ) + + @staticmethod + def available_logbooks(realm_name: str): + """ + Endpoint for available logbooks. This endpoint is used to publish the available logbooks + using an AvailableResourceMessage. + + Args: + realm_name (str): Realm name. + + Returns: + EndpointInfo: Endpoint for available logbooks. + """ + endpoint = f"{EndpointType.INTERNAL.value}/realm/{realm_name}/info/available_logbooks" + return EndpointInfo( + endpoint=endpoint, + message_type=messages.AvailableResourceMessage, + message_op=MessageOp.KEY_VALUE, + ) diff --git a/bec_lib/bec_lib/messages.py b/bec_lib/bec_lib/messages.py index 66b99d320..7aa2256ab 100644 --- a/bec_lib/bec_lib/messages.py +++ b/bec_lib/bec_lib/messages.py @@ -1380,3 +1380,49 @@ def check_macro(cls, values): if values.update_type == "add" and not values.file_path: raise ValueError("file_path must be provided for add actions") return values + + +class _MessagingServiceContent(BaseModel): + """ + Content of the message for messaging services + + """ + + content_type: Literal["text", "file", "tags"] + content: str | dict | list[str] + + +class MessagingServiceMessage(BECMessage): + """ + Message for communicating with messaging services such as Signal, Teams or SciLog + + Args: + service_name (Literal["signal", "teams", "scilog"]): Name of the messaging service + message (list[_MessagingServiceContent]): Message content + channel (str | list[str]): Channel or recipient to send the message to + metadata (dict, optional): Additional metadata. Defaults to None. + """ + + msg_type: ClassVar[str] = "messaging_service_message" + service_name: Literal["signal", "teams", "scilog"] + message: list[_MessagingServiceContent] + channel: str | list[str] | None = None + metadata: dict | None = Field(default_factory=dict) + + +class MessagingServiceScopes(BECMessage): + """ + Message for communicating available scopes for messaging services such as Signal, Teams or SciLog + + Args: + service_name (Literal["signal", "teams", "scilog"]): Name of the messaging service + scopes (list[str]): List of available scopes + enabled (bool): True if the service is enabled + metadata (dict, optional): Additional metadata. Defaults to None. + """ + + msg_type: ClassVar[str] = "messaging_service_scopes_message" + service_name: Literal["signal", "teams", "scilog"] + scopes: list[str] + enabled: bool + metadata: dict | None = Field(default_factory=dict) diff --git a/bec_lib/bec_lib/messaging_services.py b/bec_lib/bec_lib/messaging_services.py new file mode 100644 index 000000000..0d9247453 --- /dev/null +++ b/bec_lib/bec_lib/messaging_services.py @@ -0,0 +1,214 @@ +from __future__ import annotations + +import os +from abc import ABC +from typing import TYPE_CHECKING, Self + +from bec_lib import messages +from bec_lib.endpoints import MessageEndpoints + +if TYPE_CHECKING: + from bec_lib.redis_connector import RedisConnector + + +class MessageServiceObject: + """ + A class representing a message object for a messaging service. + """ + + def __init__(self, service: MessagingService) -> None: + self._service = service + self._content = [] + + def add_text(self, text: str) -> Self: + """ + Add text to the message object. + + Args: + text (str): The text to add. + + Returns: + MessageObject: The updated message object. + """ + # Implementation to add text to the message + self._content.append(messages.MessagingServiceContent(content_type="text", content=text)) + return self + + def add_attachment(self, file_path: str) -> Self: + """ + Add an attachment to the message object. The file is read from + the given file path and included in the message, including its + metadata such as filename and MIME type. + + Please note that the maximum allowed file size for attachments is 5 MB. + + Args: + file_path (str): The file path of the attachment to add. + + Raises: + FileNotFoundError: If the attachment file does not exist. + ValueError: If the attachment file size exceeds the maximum limit of 5 MB. + + Returns: + MessageObject: The updated message object. + """ + + if not os.path.isfile(file_path): + raise FileNotFoundError(f"Attachment file not found: {file_path}") + + file_size = os.path.getsize(file_path) + max_size = 5 * 1024 * 1024 # 5 MB + if file_size > max_size: + raise ValueError( + f"Attachment file size exceeds the maximum limit of 5 MB: {file_size} bytes" + ) + + with open(file_path, "rb") as f: + file_data = f.read() + + filename = os.path.basename(file_path) + file_extension = os.path.splitext(filename)[1].lower() + if file_extension == ".txt": + mime_type = "text/plain" + elif file_extension in [".jpg", ".jpeg"]: + mime_type = "image/jpeg" + elif file_extension == ".png": + mime_type = "image/png" + elif file_extension == ".pdf": + mime_type = "application/pdf" + else: + mime_type = "application/octet-stream" + + attachment = {"filename": filename, "mime_type": mime_type, "data": file_data} + self._content.append( + messages.MessagingServiceContent(content_type="file", content=attachment) + ) + + return self + + def send(self) -> None: + """ + Send the message using the associated messaging service. + """ + self._service.send(self) + + +class MessagingService(ABC): + """ + Abstract base class for messaging services. + Inherit from this class to implement specific messaging services. + At minimum, override the _SERVICE_NAME attribute. + """ + + _SERVICE_NAME = "generic" + _MESSAGE_OBJECT_CLASS = MessageServiceObject + + def __init__(self, redis_connector: RedisConnector) -> None: + self._redis_connector = redis_connector + self._scopes = set() + self._enabled = False + self._redis_connector.register( + MessageEndpoints.message_service_scopes(), cb=self._on_new_scope_change_msg, parent=self + ) + + @staticmethod + def _on_new_scope_change_msg( + message: dict[str, messages.MessagingServiceScopes], parent: MessagingService + ) -> None: + """ + Callback for scope changes. Currently a placeholder for future functionality. + + Args: + message (dict[str, MessagingServiceScopes]): The scope change message. + parent (MessagingService): The parent messaging service instance. + """ + msg = message["data"] + parent._update_scopes(msg) + + def _update_scopes(self, scope_info: messages.MessagingServiceScopes) -> None: + """ + Update the scopes for the messaging service. + + Args: + scope_info (messages.MessagingServiceScopes): The new scope information. + """ + if scope_info.service_name != self._SERVICE_NAME: + return + self._scopes = scope_info.scopes + self._enabled = scope_info.enabled + + def new(self) -> MessageServiceObject: + """ + Create a new message object associated with this messaging service. + + Returns: + MessageServiceObject: A new message object. + """ + if not self._enabled: + raise RuntimeError(f"Messaging service '{self._SERVICE_NAME}' is not enabled.") + return MessageServiceObject(self) + + def send(self, message: MessageServiceObject) -> None: + """ + Send a message using the messaging service. + + Args: + message (MessageServiceObject): The message to send. + """ + if not self._enabled: + raise RuntimeError(f"Messaging service '{self._SERVICE_NAME}' is not enabled.") + bec_message = messages.MessagingServiceMessage( + service_name=self._SERVICE_NAME, # type: ignore + message=message._content, # pylint: disable=protected-access + ) + self._redis_connector.xadd(MessageEndpoints.message_service_queue(), {"data": bec_message}) + + +class SciLogMessageServiceObject(MessageServiceObject): + """ + A class representing a message object for the SciLog messaging service. + """ + + def add_tags(self, tags: str | list[str]) -> Self: + """ + Add tags to the SciLog message object. + + Args: + tags (str | list[str]): The tag or list of tags to add. + + Returns: + SciLogMessageServiceObject: The updated message object. + """ + if isinstance(tags, str): + tags = [tags] + self._content.append(messages.MessagingServiceContent(content_type="tags", content=tags)) + return self + + +class SciLogMessagingService(MessagingService): + """Messaging service for SciLog platform.""" + + _SERVICE_NAME = "scilog" + + def new(self) -> SciLogMessageServiceObject: + """ + Create a new SciLog message object associated with this messaging service. + + Returns: + SciLogMessageServiceObject: A new SciLog message object. + """ + if not self._enabled: + raise RuntimeError(f"Messaging service '{self._SERVICE_NAME}' is not enabled.") + return SciLogMessageServiceObject(self) + + +class TeamsMessagingService(MessagingService): + """Messaging service for Microsoft Teams platform.""" + + _SERVICE_NAME = "teams" + + +class SignalMessagingService(MessagingService): + """Messaging service for Signal platform.""" + + _SERVICE_NAME = "signal" diff --git a/bec_lib/tests/test_messaging_service.py b/bec_lib/tests/test_messaging_service.py new file mode 100644 index 000000000..578d51e98 --- /dev/null +++ b/bec_lib/tests/test_messaging_service.py @@ -0,0 +1,110 @@ +from bec_lib.endpoints import MessageEndpoints +from bec_lib.messaging_services import MessageServiceObject, SciLogMessagingService + + +def test_scilog_messaging_service_new(connected_connector): + service = SciLogMessagingService(connected_connector) + message = service.new() + assert isinstance(message, MessageServiceObject) + assert message._service == service # pylint: disable=protected-access + assert message._content == [] # pylint: disable=protected-access + + +def test_scilog_messaging_service_send(connected_connector): + service = SciLogMessagingService(connected_connector) + message = service.new() + message.add_text("Test message") + + message.send() + out = connected_connector.xread( + MessageEndpoints.message_service_queue(), from_start=True, count=1 + ) + assert len(out) == 1 + out = out[0]["data"] + assert out.service_name == "scilog" + assert len(out.message) == 1 + assert out.message[0].content_type == "text" + assert out.message[0].content == "Test message" + + +def test_scilog_messaging_service_send_with_attachment(tmp_path, connected_connector): + # Create a temporary file to use as an attachment + file_path = tmp_path / "test.txt" + file_content = "This is a test file." + with open(file_path, "w") as f: + f.write(file_content) + + service = SciLogMessagingService(connected_connector) + message = service.new() + message.add_text("Test message with attachment") + message.add_attachment(str(file_path)) + + message.send() + out = connected_connector.xread( + MessageEndpoints.message_service_queue(), from_start=True, count=1 + ) + assert len(out) == 1 + out = out[0]["data"] + assert out.service_name == "scilog" + assert len(out.message) == 2 + + # Check text part + assert out.message[0].content_type == "text" + assert out.message[0].content == "Test message with attachment" + + # Check attachment part + assert out.message[1].content_type == "file" + attachment = out.message[1].content + assert attachment["filename"] == "test.txt" + assert attachment["mime_type"] == "text/plain" + assert attachment["data"] == file_content.encode() + + +def test_scilog_messaging_service_send_image_attachment(tmp_path, connected_connector): + # Create a temporary image file to use as an attachment + file_path = tmp_path / "image.png" + with open(file_path, "wb") as f: + f.write(b"\x89PNG\r\n\x1a\n") # Write minimal PNG header + + service = SciLogMessagingService(connected_connector) + message = service.new() + message.add_text("Test message with image attachment") + message.add_attachment(str(file_path)) + + message.send() + out = connected_connector.xread( + MessageEndpoints.message_service_queue(), from_start=True, count=1 + ) + assert len(out) == 1 + out = out[0]["data"] + assert out.service_name == "scilog" + assert len(out.message) == 2 + + # Check text part + assert out.message[0].content_type == "text" + assert out.message[0].content == "Test message with image attachment" + + # Check attachment part + assert out.message[1].content_type == "file" + attachment = out.message[1].content + assert attachment["filename"] == "image.png" + assert attachment["mime_type"] == "image/png" + assert attachment["data"] == b"\x89PNG\r\n\x1a\n" + + +def test_scilog_messaging_service_add_tags(connected_connector): + service = SciLogMessagingService(connected_connector) + message = service.new() + message.add_text("Test message with tags") + message.add_tags(["tag1", "tag2"]) + + message.send() + out = connected_connector.xread(MessageEndpoints.message_service_queue(), from_start=True) + assert len(out) == 1 + out = out[0]["data"] + assert out.service_name == "scilog" + assert len(out.message) == 2 + assert out.message[0].content_type == "text" + assert out.message[0].content == "Test message with tags" + assert out.message[1].content_type == "tags" + assert out.message[1].content == ["tag1", "tag2"] diff --git a/bec_server/bec_server/scihub/atlas/atlas_connector.py b/bec_server/bec_server/scihub/atlas/atlas_connector.py index 1b9c97c0f..101f5d9a6 100644 --- a/bec_server/bec_server/scihub/atlas/atlas_connector.py +++ b/bec_server/bec_server/scihub/atlas/atlas_connector.py @@ -98,6 +98,22 @@ def ingest_data(self, data: dict) -> None: MessageEndpoints.atlas_deployment_ingest(self.deployment_name), data, max_size=1000 ) + def ingest_message(self, msg: dict) -> None: + """ + Ingest a message into Atlas to be consumed by a messaging service such as SciLog, Teams or Signal. + """ + if not self.connected_to_atlas: + logger.warning("Not connected to Atlas. Cannot ingest message.") + return + + if self.redis_atlas is None: + logger.error("Redis Atlas connection is not initialized.") + return + + self.redis_atlas.xadd( + MessageEndpoints.message_service_ingest(self.deployment_name), msg, max_size=1000 + ) + def update_acls(self): """ Update the ACLs from Atlas. This is done by reading the ACLs from the Atlas diff --git a/bec_server/bec_server/scihub/atlas/atlas_metadata_handler.py b/bec_server/bec_server/scihub/atlas/atlas_metadata_handler.py index c1bd465a5..a9c5d595c 100644 --- a/bec_server/bec_server/scihub/atlas/atlas_metadata_handler.py +++ b/bec_server/bec_server/scihub/atlas/atlas_metadata_handler.py @@ -31,6 +31,7 @@ def __init__(self, atlas_connector: AtlasConnector) -> None: self._start_account_subscription() self._start_scan_subscription() self._start_scan_history_subscription() + self._start_messaging_subscription() def _start_account_subscription(self): self.atlas_connector.connector.register( @@ -53,8 +54,13 @@ def _start_scan_history_subscription(self): MessageEndpoints.scan_history(), cb=self._handle_scan_history, parent=self ) + def _start_messaging_subscription(self): + self.atlas_connector.connector.register( + MessageEndpoints.message_service_queue(), cb=self._handle_messaging, parent=self + ) + @staticmethod - def _handle_atlas_account_update(msg, *, parent, **_kwargs) -> None: + def _handle_atlas_account_update(msg, *, parent: AtlasMetadataHandler, **_kwargs) -> None: if not isinstance(msg, dict) or "data" not in msg: logger.error(f"Invalid account message received from Atlas: {msg}") return @@ -74,7 +80,7 @@ def _update_local_account(self, account: str) -> None: logger.info(f"Updated local account to: {account}") @staticmethod - def _handle_account_info(msg, *, parent, **_kwargs) -> None: + def _handle_account_info(msg, *, parent: AtlasMetadataHandler, **_kwargs) -> None: if not isinstance(msg, dict) or "data" not in msg: logger.error(f"Invalid account message received: {msg}") return @@ -84,7 +90,7 @@ def _handle_account_info(msg, *, parent, **_kwargs) -> None: logger.info(f"Updated account to: {parent._account}") @staticmethod - def _handle_scan_status(msg, *, parent, **_kwargs) -> None: + def _handle_scan_status(msg, *, parent: AtlasMetadataHandler, **_kwargs) -> None: msg = msg.value try: parent.send_atlas_update({"scan_status": msg}) @@ -94,7 +100,7 @@ def _handle_scan_status(msg, *, parent, **_kwargs) -> None: logger.exception(f"Failed to update scan status: {content}") @staticmethod - def _handle_scan_history(msg, *, parent, **_kwargs) -> None: + def _handle_scan_history(msg, *, parent: AtlasMetadataHandler, **_kwargs) -> None: msg = msg["data"] try: parent.send_atlas_update({"scan_history": msg}) @@ -103,6 +109,15 @@ def _handle_scan_history(msg, *, parent, **_kwargs) -> None: content = traceback.format_exc() logger.exception(f"Failed to update scan history: {content}") + @staticmethod + def _handle_messaging(msg, *, parent: AtlasMetadataHandler, **_kwargs) -> None: + try: + parent.atlas_connector.ingest_message(msg) + # pylint: disable=broad-except + except Exception: + content = traceback.format_exc() + logger.exception(f"Failed to update messaging data: {content}") + def send_atlas_update(self, msg: dict) -> None: """ Update the scan status in Atlas