diff --git a/application_sdk/server/fastapi/__init__.py b/application_sdk/server/fastapi/__init__.py index c2b536a06..72f682acf 100644 --- a/application_sdk/server/fastapi/__init__.py +++ b/application_sdk/server/fastapi/__init__.py @@ -42,6 +42,7 @@ HttpWorkflowTrigger, PreflightCheckRequest, PreflightCheckResponse, + Subscription, TestAuthRequest, TestAuthResponse, WorkflowConfigRequest, @@ -90,12 +91,15 @@ class APIServer(ServerInterface): workflow_router: APIRouter dapr_router: APIRouter events_router: APIRouter + subscription_router: APIRouter handler: Optional[HandlerInterface] templates: Jinja2Templates duckdb_ui: DuckDBUI docs_directory_path: str = "docs" docs_export_path: str = "dist" + # List of subscriptions to be registered + subscriptions: List[Subscription] = [] frontend_assets_path: str = "frontend/static" @@ -112,6 +116,7 @@ def __init__( frontend_templates_path: str = "frontend/templates", ui_enabled: bool = True, has_configmap: bool = False, + subscriptions: List[Subscription] = [], ): """Initialize the FastAPI application. @@ -138,7 +143,7 @@ def __init__( self.workflow_router = APIRouter() self.dapr_router = APIRouter() self.events_router = APIRouter() - + self.subscriptions = subscriptions # Set up the application error_handler = internal_server_error_handler # Store as local variable self.app.add_exception_handler( @@ -205,6 +210,7 @@ def register_routers(self): - Workflow router (/workflows/v1) - Pubsub router (/dapr) - Events router (/events/v1) + - Subscription router (/subscriptions/v1) """ # Register all routes first self.register_routes() @@ -215,6 +221,16 @@ def register_routers(self): self.app.include_router(self.dapr_router, prefix="/dapr") self.app.include_router(self.events_router, prefix="/events/v1") + # Register subscription routes from subscriptions with handler callbacks + subscription_router = APIRouter() + for subscription in self.subscriptions: + subscription_router.add_api_route( + f"/{subscription.route}", + subscription.handler, + methods=["POST"], + ) + self.app.include_router(subscription_router, prefix="/subscriptions/v1") + def fallback_home(self, request: Request) -> HTMLResponse: return self.templates.TemplateResponse( "index.html", @@ -432,6 +448,19 @@ async def get_dapr_subscriptions( """ subscriptions: List[dict[str, Any]] = [] + for subscription in self.subscriptions: + subscription_dict: dict[str, Any] = { + "pubsubname": subscription.component_name, + "topic": subscription.topic, + "route": f"/subscriptions/v1/{subscription.route}", + } + if subscription.bulk_config: + subscription_dict["bulkSubscribe"] = ( + subscription.bulk_config.model_dump(by_alias=True) + ) + if subscription.dead_letter_topic: + subscription_dict["deadLetterTopic"] = subscription.dead_letter_topic + subscriptions.append(subscription_dict) for event_trigger in self.event_triggers: filters = [ f"({event_filter.path} {event_filter.operator} '{event_filter.value}')" diff --git a/application_sdk/server/fastapi/models.py b/application_sdk/server/fastapi/models.py index 14805c76b..7fdc91e70 100644 --- a/application_sdk/server/fastapi/models.py +++ b/application_sdk/server/fastapi/models.py @@ -1,7 +1,7 @@ # Request/Response DTOs for workflows from enum import Enum -from typing import Any, Dict, List, Optional, Type +from typing import Any, Callable, Coroutine, Dict, List, Optional, Type, Union from pydantic import BaseModel, Field, RootModel @@ -240,3 +240,64 @@ class EventWorkflowTrigger(WorkflowTrigger): def should_trigger_workflow(self, event: Event) -> bool: return True + + +class Subscription(BaseModel): + """Subscription configuration for Dapr messaging. + + Attributes: + component_name: Name of the Dapr pubsub component + topic: Topic to subscribe to + route: Route path for the message handler endpoint + handler: Required callback function to handle incoming messages + bulk_config: Optional bulk subscribe configuration + dead_letter_topic: Optional dead letter topic for failed messages + + Nested Classes: + BulkConfig: Configuration for bulk message processing + MessageStatus: Status codes for handler responses (SUCCESS, RETRY, DROP) + """ + + class BulkConfig(BaseModel): + """Bulk configuration for Dapr messaging. + + Attributes: + enabled: Whether bulk subscribe is enabled + max_messages_count: Maximum number of messages to receive in a batch + max_await_duration_ms: Maximum time to wait for messages in milliseconds + """ + + enabled: bool = False + max_messages_count: int = Field( + default=100, serialization_alias="maxMessagesCount" + ) + max_await_duration_ms: int = Field( + default=40, serialization_alias="maxAwaitDurationMs" + ) + + class MessageStatus(str, Enum): + """Status codes for Dapr pub/sub subscription message handler responses. + + Used in subscription handler responses to indicate how Dapr should handle the message. + Based on Dapr docs: https://docs.dapr.io/reference/api/pubsub_api/#expected-http-response + + Attributes: + SUCCESS: Message was processed successfully. + RETRY: Message processing failed, should be retried. + DROP: Message should be dropped (sent to dead letter topic if configured). + """ + + SUCCESS = "SUCCESS" + RETRY = "RETRY" + DROP = "DROP" + + model_config = {"arbitrary_types_allowed": True} + + component_name: str + topic: str + route: str + handler: Union[ + Callable[[Any], Any], Callable[[Any], Coroutine[Any, Any, Any]] + ] # Required callback function (sync or async) + bulk_config: Optional[BulkConfig] = None + dead_letter_topic: Optional[str] = None diff --git a/components/eventstore.yaml b/components/eventstore.yaml index 17ba81da5..268b57531 100644 --- a/components/eventstore.yaml +++ b/components/eventstore.yaml @@ -22,4 +22,4 @@ spec: # metadata: # - name: url # value: https://atlan.url/eventurl -# value: "https://webhook.site/testhash" \ No newline at end of file +# value: "https://webhook.site/testhash" diff --git a/components/pubsub.yaml b/components/pubsub.yaml new file mode 100644 index 000000000..08f3f71d4 --- /dev/null +++ b/components/pubsub.yaml @@ -0,0 +1,41 @@ +apiVersion: dapr.io/v1alpha1 +kind: Component +metadata: + name: pubsub +spec: + type: pubsub.in-memory + version: v1 +# Bulk subscribe for in-memory pubsub has a batch size of 1 as it is not supported. +## Use the below component for Kafka. Kafka supports bulk subscribe. +# apiVersion: dapr.io/v1alpha1 +# kind: Component +# metadata: +# name: pubsub +# spec: +# type: pubsub.kafka +# version: v1 +# metadata: +# # Kafka broker configuration +# - name: brokers +# value: "localhost:9092" + +# # Consumer configuration +# - name: consumerGroup +# value: "message-processor" + +# - name: clientId +# value: "message-processor-client" + +# # Session timeout +# - name: sessionTimeout +# value: "600s" + +# - name: authType +# value: "none" + +# # - name: bulkSubscribe +# # value: "true" +# # - name: maxMessagesCount +# # value: "10" +# # - name: maxAwaitDurationMs +# # value: "1000" diff --git a/docs/docs/concepts/server.md b/docs/docs/concepts/server.md index 7d6c6e362..8f108cbbe 100644 --- a/docs/docs/concepts/server.md +++ b/docs/docs/concepts/server.md @@ -30,6 +30,12 @@ This module provides the core server framework for building Atlan applications, 5. **Models (`application_sdk.server.fastapi.models.py`)**: * **Purpose:** Defines Pydantic models used for request/response validation and serialization for the default API endpoints (e.g., `TestAuthRequest`, `WorkflowResponse`, `PreflightCheckRequest`, `PreflightCheckResponse`). +6. **Subscriptions (`application_sdk.server.fastapi.models.py`)**: + * **Purpose:** Configure Dapr pub/sub message subscriptions for event-driven processing without Temporal workflows. + * `Subscription`: Defines a subscription to a Dapr pubsub topic with a handler callback. + * `Subscription.BulkConfig`: Nested class for bulk message processing configuration. + * `Subscription.MessageStatus`: Nested enum for handler response status codes (`SUCCESS`, `RETRY`, `DROP`). + ## Usage Patterns ### 1. Using the Default FastAPI Server @@ -249,6 +255,85 @@ api_server = APIServer( ) ``` +### 4. Using Subscriptions for Message Processing + +For event-driven applications that process messages from Dapr pub/sub without Temporal workflows, you can use `Subscription` to define message handlers. + +```python +# In your main server file (e.g., main.py) +import asyncio +from typing import Any, Dict + +from application_sdk.server.fastapi import APIServer +from application_sdk.server.fastapi.models import Subscription +from application_sdk.observability.logger_adaptor import get_logger + +logger = get_logger(__name__) + +# Define a sync message handler +def process_message(message: Dict[str, Any]) -> dict: + """Process a single message from Dapr pubsub.""" + event_data = message.get("data", message) + logger.info(f"Processing message: {event_data}") + + # Return status using Subscription.MessageStatus enum + return {"status": Subscription.MessageStatus.SUCCESS} + +# Define an async handler for bulk processing +async def process_bulk_messages(request: Dict[str, Any]) -> dict: + """Process messages in bulk from Dapr pubsub.""" + if "entries" in request: + # Bulk format + statuses = [] + for entry in request.get("entries", []): + entry_id = entry.get("entryId", "unknown") + # Process each entry + statuses.append({"entryId": entry_id, "status": Subscription.MessageStatus.SUCCESS}) + return {"statuses": statuses} + else: + # Single message format + return {"status": Subscription.MessageStatus.SUCCESS} + +async def main(): + # Define subscriptions with handler callbacks + subscription = Subscription( + component_name="messaging", # Dapr pubsub component name + topic="events-topic", # Topic to subscribe to + route="events-topic", # Route path for the handler endpoint + handler=process_message, # Callback function (sync or async) + dead_letter_topic="events-dlq", # Optional dead letter topic + ) + + bulk_subscription = Subscription( + component_name="messaging", + topic="bulk-events-topic", + route="bulk-events", + handler=process_bulk_messages, + bulk_config=Subscription.BulkConfig( + enabled=True, + max_messages_count=100, # Max messages per batch + max_await_duration_ms=1000, # Max wait time for batch + ), + ) + + # Create server with subscriptions (no workflow_client needed) + server = APIServer( + subscriptions=[subscription, bulk_subscription], + ) + + await server.start() + +if __name__ == "__main__": + asyncio.run(main()) +``` + +This setup: +* Registers message handler endpoints at `/subscriptions/v1/{route}` +* Configures Dapr subscriptions via `/dapr/subscribe` endpoint +* Supports both sync and async handlers +* Supports bulk message processing with `Subscription.BulkConfig` +* Supports dead letter topics for failed messages + ## Summary -The `application_sdk.server` module, especially the `fastapi` sub-package, provides a robust foundation for building web servers that interact with Atlan handlers and Temporal workflows. You can use the default `APIServer` for simple cases, extend it with custom routers for specific API needs, and override handler methods to tailor the behavior of standard API endpoints. \ No newline at end of file +The `application_sdk.server` module, especially the `fastapi` sub-package, provides a robust foundation for building web servers that interact with Atlan handlers and Temporal workflows. You can use the default `APIServer` for simple cases, extend it with custom routers for specific API needs, override handler methods to tailor the behavior of standard API endpoints, and use `Subscription` for event-driven message processing. \ No newline at end of file diff --git a/tests/unit/server/fastapi/test_fastapi.py b/tests/unit/server/fastapi/test_fastapi.py index af0324c24..893351fff 100644 --- a/tests/unit/server/fastapi/test_fastapi.py +++ b/tests/unit/server/fastapi/test_fastapi.py @@ -4,6 +4,7 @@ import pytest from httpx import ASGITransport, AsyncClient from hypothesis import HealthCheck, given, settings +from pydantic import ValidationError from application_sdk.handlers import HandlerInterface from application_sdk.server.fastapi import ( @@ -12,6 +13,7 @@ PreflightCheckRequest, PreflightCheckResponse, ) +from application_sdk.server.fastapi.models import Subscription from application_sdk.test_utils.hypothesis.strategies.server.fastapi import ( payload_strategy, ) @@ -22,6 +24,151 @@ class SampleWorkflow(WorkflowInterface): pass +class TestSubscriptionModel: + """Test suite for Subscription model validation.""" + + def test_subscription_with_required_fields(self): + """Test Subscription creation with all required fields.""" + + def sync_handler(message: Dict[str, Any]) -> dict: + return {"status": "processed"} + + subscription = Subscription( + component_name="pubsub", + topic="test-topic", + route="test-route", + handler=sync_handler, + ) + + assert subscription.component_name == "pubsub" + assert subscription.topic == "test-topic" + assert subscription.route == "test-route" + assert subscription.handler == sync_handler + assert subscription.bulk_config is None + assert subscription.dead_letter_topic is None + + def test_subscription_with_async_handler(self): + """Test Subscription creation with an async message handler.""" + + async def async_handler(message: Dict[str, Any]) -> dict: + return {"status": "processed"} + + subscription = Subscription( + component_name="pubsub", + topic="test-topic", + route="test-route", + handler=async_handler, + ) + + assert subscription.handler == async_handler + + def test_subscription_with_bulk_config(self): + """Test Subscription creation with bulk configuration.""" + + def handler(message: Dict[str, Any]) -> dict: + return {"status": "processed"} + + bulk_config = Subscription.BulkConfig( + enabled=True, + max_messages_count=50, + max_await_duration_ms=100, + ) + + subscription = Subscription( + component_name="pubsub", + topic="test-topic", + route="test-route", + handler=handler, + bulk_config=bulk_config, + ) + + assert subscription.bulk_config is not None + assert subscription.bulk_config.enabled is True + assert subscription.bulk_config.max_messages_count == 50 + assert subscription.bulk_config.max_await_duration_ms == 100 + + def test_subscription_with_dead_letter_topic(self): + """Test Subscription creation with dead letter topic.""" + + def handler(message: Dict[str, Any]) -> dict: + return {"status": "processed"} + + subscription = Subscription( + component_name="pubsub", + topic="test-topic", + route="test-route", + handler=handler, + dead_letter_topic="test-dlq", + ) + + assert subscription.dead_letter_topic == "test-dlq" + + def test_subscription_with_all_optional_fields(self): + """Test Subscription creation with all optional fields configured.""" + + def handler(message: Dict[str, Any]) -> dict: + return {"status": "processed"} + + bulk_config = Subscription.BulkConfig( + enabled=True, + max_messages_count=200, + max_await_duration_ms=50, + ) + + subscription = Subscription( + component_name="my-pubsub", + topic="orders", + route="process-orders", + handler=handler, + bulk_config=bulk_config, + dead_letter_topic="orders-dlq", + ) + + assert subscription.component_name == "my-pubsub" + assert subscription.topic == "orders" + assert subscription.route == "process-orders" + assert subscription.bulk_config is not None + assert subscription.bulk_config.enabled is True + assert subscription.bulk_config.max_messages_count == 200 + assert subscription.dead_letter_topic == "orders-dlq" + + def test_subscription_missing_required_field(self): + """Test Subscription validation fails when required fields are missing.""" + + def handler(message: Dict[str, Any]) -> dict: + return {"status": "processed"} + + with pytest.raises(ValidationError) as exc_info: + Subscription( + component_name="pubsub", + topic="test-topic", + # Missing route + handler=handler, + ) + + assert "route" in str(exc_info.value) + + def test_subscription_bulk_config_defaults(self): + """Test Subscription.BulkConfig model default values.""" + bulk_config = Subscription.BulkConfig() + + assert bulk_config.enabled is False + assert bulk_config.max_messages_count == 100 + assert bulk_config.max_await_duration_ms == 40 + + def test_subscription_bulk_config_with_custom_values(self): + """Test Subscription.BulkConfig model with custom values.""" + bulk_config = Subscription.BulkConfig( + enabled=True, + max_messages_count=500, + max_await_duration_ms=200, + ) + + assert bulk_config.enabled is True + assert bulk_config.max_messages_count == 500 + assert bulk_config.max_await_duration_ms == 200 + + class TestServer: @pytest.fixture(autouse=True) def setup_method(self): @@ -190,3 +337,575 @@ async def test_event_trigger_conditions(self): # Assert assert response.status_code == 404 + + +class TestMessagingRouterRegistration: + """Test suite for messaging router registration with valid subscriptions.""" + + @pytest.fixture(autouse=True) + def setup_method(self): + """Setup method that runs before each test method.""" + self.mock_handler = Mock(spec=HandlerInterface) + self.mock_handler.preflight_check = AsyncMock() + self.app = APIServer(handler=self.mock_handler) + + @pytest.mark.asyncio + async def test_messaging_router_registration_with_sync_handler(self): + """Test messaging router registration with a sync message handler.""" + + def sync_message_handler(message: Dict[str, Any]) -> dict: + return {"status": "processed", "data": message} + + subscription = Subscription( + component_name="pubsub", + topic="test-topic", + route="process-message", + handler=sync_message_handler, + ) + + self.app.subscriptions = [subscription] + self.app.register_routers() + + # Verify the route is registered + transport = ASGITransport(app=self.app.app) + async with AsyncClient(transport=transport, base_url="http://test") as ac: + response = await ac.post( + "/subscriptions/v1/process-message", + json={"test": "data"}, + ) + + assert response.status_code == 200 + response_data = response.json() + assert response_data["status"] == "processed" + + @pytest.mark.asyncio + async def test_messaging_router_registration_with_async_handler(self): + """Test messaging router registration with an async message handler.""" + + async def async_message_handler(message: Dict[str, Any]) -> dict: + return {"status": "async_processed", "data": message} + + subscription = Subscription( + component_name="pubsub", + topic="test-topic", + route="async-process", + handler=async_message_handler, + ) + + self.app.subscriptions = [subscription] + self.app.register_routers() + + transport = ASGITransport(app=self.app.app) + async with AsyncClient(transport=transport, base_url="http://test") as ac: + response = await ac.post( + "/subscriptions/v1/async-process", + json={"test": "async_data"}, + ) + + assert response.status_code == 200 + response_data = response.json() + assert response_data["status"] == "async_processed" + + @pytest.mark.asyncio + async def test_messaging_router_registration_with_multiple_subscriptions(self): + """Test messaging router registration with multiple subscriptions.""" + + def handler_one(message: Dict[str, Any]) -> dict: + return {"handler": "one"} + + def handler_two(message: Dict[str, Any]) -> dict: + return {"handler": "two"} + + subscriptions = [ + Subscription( + component_name="pubsub", + topic="topic-one", + route="route-one", + handler=handler_one, + ), + Subscription( + component_name="pubsub", + topic="topic-two", + route="route-two", + handler=handler_two, + ), + ] + + self.app.subscriptions = subscriptions + self.app.register_routers() + + transport = ASGITransport(app=self.app.app) + async with AsyncClient(transport=transport, base_url="http://test") as ac: + # Test first route + response_one = await ac.post( + "/subscriptions/v1/route-one", + json={}, + ) + assert response_one.status_code == 200 + assert response_one.json()["handler"] == "one" + + # Test second route + response_two = await ac.post( + "/subscriptions/v1/route-two", + json={}, + ) + assert response_two.status_code == 200 + assert response_two.json()["handler"] == "two" + + @pytest.mark.asyncio + async def test_messaging_router_not_registered_when_no_subscriptions(self): + """Test that no messaging routes are registered when subscriptions list is empty.""" + self.app.subscriptions = [] + self.app.register_routers() + + transport = ASGITransport(app=self.app.app) + async with AsyncClient(transport=transport, base_url="http://test") as ac: + response = await ac.post( + "/subscriptions/v1/some-route", + json={}, + ) + + # Should return 404 since route is not registered + assert response.status_code == 404 + + +class TestDaprSubscriptionEndpointGeneration: + """Test suite for Dapr subscription endpoint generation.""" + + @pytest.fixture(autouse=True) + def setup_method(self): + """Setup method that runs before each test method.""" + self.mock_handler = Mock(spec=HandlerInterface) + self.mock_handler.preflight_check = AsyncMock() + self.app = APIServer(handler=self.mock_handler) + + @pytest.mark.asyncio + async def test_dapr_subscriptions_with_basic_subscription(self): + """Test Dapr subscription generation with basic subscription configuration.""" + + def handler(message: Dict[str, Any]) -> dict: + return {"status": "ok"} + + subscription = Subscription( + component_name="my-pubsub", + topic="my-topic", + route="handle-message", + handler=handler, + ) + + self.app.subscriptions = [subscription] + self.app.event_triggers = [] + + subscriptions = await self.app.get_dapr_subscriptions() + + assert len(subscriptions) == 1 + assert subscriptions[0]["pubsubname"] == "my-pubsub" + assert subscriptions[0]["topic"] == "my-topic" + assert subscriptions[0]["route"] == "/subscriptions/v1/handle-message" + assert "bulkSubscribe" not in subscriptions[0] + assert "deadLetterTopic" not in subscriptions[0] + + @pytest.mark.asyncio + async def test_dapr_subscriptions_with_bulk_subscribe_enabled(self): + """Test Dapr subscription generation with bulk subscribe enabled.""" + + def handler(message: Dict[str, Any]) -> dict: + return {"status": "ok"} + + bulk_config = Subscription.BulkConfig( + enabled=True, + max_messages_count=250, + max_await_duration_ms=75, + ) + + subscription = Subscription( + component_name="pubsub", + topic="bulk-topic", + route="bulk-handler", + handler=handler, + bulk_config=bulk_config, + ) + + self.app.subscriptions = [subscription] + self.app.event_triggers = [] + + subscriptions = await self.app.get_dapr_subscriptions() + + assert len(subscriptions) == 1 + assert "bulkSubscribe" in subscriptions[0] + assert subscriptions[0]["bulkSubscribe"]["enabled"] is True + assert subscriptions[0]["bulkSubscribe"]["maxMessagesCount"] == 250 + assert subscriptions[0]["bulkSubscribe"]["maxAwaitDurationMs"] == 75 + + @pytest.mark.asyncio + async def test_dapr_subscriptions_with_bulk_subscribe_disabled(self): + """Test Dapr subscription generation with bulk subscribe disabled.""" + + def handler(message: Dict[str, Any]) -> dict: + return {"status": "ok"} + + bulk_config = Subscription.BulkConfig( + enabled=False, + max_messages_count=100, + max_await_duration_ms=40, + ) + + subscription = Subscription( + component_name="pubsub", + topic="topic", + route="handler", + handler=handler, + bulk_config=bulk_config, + ) + + self.app.subscriptions = [subscription] + self.app.event_triggers = [] + + subscriptions = await self.app.get_dapr_subscriptions() + + assert len(subscriptions) == 1 + # bulkSubscribe should be included even when enabled=False + assert "bulkSubscribe" in subscriptions[0] + assert subscriptions[0]["bulkSubscribe"]["enabled"] is False + assert subscriptions[0]["bulkSubscribe"]["maxMessagesCount"] == 100 + assert subscriptions[0]["bulkSubscribe"]["maxAwaitDurationMs"] == 40 + + @pytest.mark.asyncio + async def test_dapr_subscriptions_with_dead_letter_topic(self): + """Test Dapr subscription generation with dead letter topic configured.""" + + def handler(message: Dict[str, Any]) -> dict: + return {"status": "ok"} + + subscription = Subscription( + component_name="pubsub", + topic="main-topic", + route="main-handler", + handler=handler, + dead_letter_topic="main-topic-dlq", + ) + + self.app.subscriptions = [subscription] + self.app.event_triggers = [] + + subscriptions = await self.app.get_dapr_subscriptions() + + assert len(subscriptions) == 1 + assert "deadLetterTopic" in subscriptions[0] + assert subscriptions[0]["deadLetterTopic"] == "main-topic-dlq" + + @pytest.mark.asyncio + async def test_dapr_subscriptions_with_all_options(self): + """Test Dapr subscription generation with all options configured.""" + + def handler(message: Dict[str, Any]) -> dict: + return {"status": "ok"} + + bulk_config = Subscription.BulkConfig( + enabled=True, + max_messages_count=500, + max_await_duration_ms=150, + ) + + subscription = Subscription( + component_name="kafka-pubsub", + topic="orders-topic", + route="process-orders", + handler=handler, + bulk_config=bulk_config, + dead_letter_topic="orders-dlq", + ) + + self.app.subscriptions = [subscription] + self.app.event_triggers = [] + + subscriptions = await self.app.get_dapr_subscriptions() + + assert len(subscriptions) == 1 + sub = subscriptions[0] + assert sub["pubsubname"] == "kafka-pubsub" + assert sub["topic"] == "orders-topic" + assert sub["route"] == "/subscriptions/v1/process-orders" + assert sub["bulkSubscribe"]["enabled"] is True + assert sub["bulkSubscribe"]["maxMessagesCount"] == 500 + assert sub["bulkSubscribe"]["maxAwaitDurationMs"] == 150 + assert sub["deadLetterTopic"] == "orders-dlq" + + @pytest.mark.asyncio + async def test_dapr_subscriptions_with_multiple_subscriptions(self): + """Test Dapr subscription generation with multiple subscriptions.""" + + def handler_one(message: Dict[str, Any]) -> dict: + return {"status": "one"} + + def handler_two(message: Dict[str, Any]) -> dict: + return {"status": "two"} + + subscriptions_list = [ + Subscription( + component_name="pubsub-a", + topic="topic-a", + route="handler-a", + handler=handler_one, + ), + Subscription( + component_name="pubsub-b", + topic="topic-b", + route="handler-b", + handler=handler_two, + dead_letter_topic="topic-b-dlq", + ), + ] + + self.app.subscriptions = subscriptions_list + self.app.event_triggers = [] + + subscriptions = await self.app.get_dapr_subscriptions() + + assert len(subscriptions) == 2 + + # First subscription + assert subscriptions[0]["pubsubname"] == "pubsub-a" + assert subscriptions[0]["topic"] == "topic-a" + assert subscriptions[0]["route"] == "/subscriptions/v1/handler-a" + + # Second subscription + assert subscriptions[1]["pubsubname"] == "pubsub-b" + assert subscriptions[1]["topic"] == "topic-b" + assert subscriptions[1]["route"] == "/subscriptions/v1/handler-b" + assert subscriptions[1]["deadLetterTopic"] == "topic-b-dlq" + + @pytest.mark.asyncio + async def test_dapr_subscriptions_endpoint_via_http(self): + """Test the /dapr/subscribe endpoint returns correct subscription config.""" + + def handler(message: Dict[str, Any]) -> dict: + return {"status": "ok"} + + subscription = Subscription( + component_name="test-pubsub", + topic="test-topic", + route="test-handler", + handler=handler, + bulk_config=Subscription.BulkConfig(enabled=True, max_messages_count=100), + ) + + self.app.subscriptions = [subscription] + self.app.event_triggers = [] + self.app.register_routers() + + transport = ASGITransport(app=self.app.app) + async with AsyncClient(transport=transport, base_url="http://test") as ac: + response = await ac.get("/dapr/subscribe") + + assert response.status_code == 200 + subscriptions = response.json() + assert len(subscriptions) == 1 + assert subscriptions[0]["pubsubname"] == "test-pubsub" + assert subscriptions[0]["topic"] == "test-topic" + assert subscriptions[0]["bulkSubscribe"]["enabled"] is True + + +class TestMessageHandlerCallbackInvocation: + """Test suite for message handler callback invocation.""" + + @pytest.fixture(autouse=True) + def setup_method(self): + """Setup method that runs before each test method.""" + self.mock_handler = Mock(spec=HandlerInterface) + self.mock_handler.preflight_check = AsyncMock() + self.app = APIServer(handler=self.mock_handler) + + @pytest.mark.asyncio + async def test_sync_handler_receives_correct_data(self): + """Test that sync handler receives the correct request data.""" + received_data = [] + + def sync_handler(message: Dict[str, Any]) -> dict: + received_data.append(message) + return {"received": True} + + subscription = Subscription( + component_name="pubsub", + topic="test-topic", + route="sync-route", + handler=sync_handler, + ) + + self.app.subscriptions = [subscription] + self.app.register_routers() + + test_payload = {"message": "hello", "value": 123} + + transport = ASGITransport(app=self.app.app) + async with AsyncClient(transport=transport, base_url="http://test") as ac: + response = await ac.post( + "/subscriptions/v1/sync-route", + json=test_payload, + ) + + assert response.status_code == 200 + assert response.json()["received"] is True + assert len(received_data) == 1 + assert received_data[0]["message"] == "hello" + assert received_data[0]["value"] == 123 + + @pytest.mark.asyncio + async def test_async_handler_receives_correct_data(self): + """Test that async handler receives the correct request data.""" + received_data = [] + + async def async_handler(message: Dict[str, Any]) -> dict: + received_data.append(message) + return {"async_received": True} + + subscription = Subscription( + component_name="pubsub", + topic="test-topic", + route="async-route", + handler=async_handler, + ) + + self.app.subscriptions = [subscription] + self.app.register_routers() + + test_payload = {"message": "async_hello", "value": 456} + + transport = ASGITransport(app=self.app.app) + async with AsyncClient(transport=transport, base_url="http://test") as ac: + response = await ac.post( + "/subscriptions/v1/async-route", + json=test_payload, + ) + + assert response.status_code == 200 + assert response.json()["async_received"] is True + assert len(received_data) == 1 + assert received_data[0]["message"] == "async_hello" + assert received_data[0]["value"] == 456 + + @pytest.mark.asyncio + async def test_handler_with_call_tracking(self): + """Test handler invocation with call tracking to verify invocation.""" + call_count = [0] + + def tracked_handler(message: Dict[str, Any]) -> dict: + call_count[0] += 1 + return {"call_count": call_count[0]} + + subscription = Subscription( + component_name="pubsub", + topic="test-topic", + route="tracked-route", + handler=tracked_handler, + ) + + self.app.subscriptions = [subscription] + self.app.register_routers() + + transport = ASGITransport(app=self.app.app) + async with AsyncClient(transport=transport, base_url="http://test") as ac: + # First call + response1 = await ac.post( + "/subscriptions/v1/tracked-route", + json={"test": "data"}, + ) + assert response1.status_code == 200 + assert call_count[0] == 1 + + # Second call + response2 = await ac.post( + "/subscriptions/v1/tracked-route", + json={"test": "more_data"}, + ) + assert response2.status_code == 200 + assert call_count[0] == 2 + + @pytest.mark.asyncio + async def test_handler_error_propagation(self): + """Test that handler errors are properly propagated.""" + + def error_handler(message: Dict[str, Any]) -> dict: + raise ValueError("Handler error occurred") + + subscription = Subscription( + component_name="pubsub", + topic="test-topic", + route="error-route", + handler=error_handler, + ) + + self.app.subscriptions = [subscription] + self.app.register_routers() + + transport = ASGITransport(app=self.app.app, raise_app_exceptions=False) + async with AsyncClient(transport=transport, base_url="http://test") as ac: + response = await ac.post( + "/subscriptions/v1/error-route", + json={}, + ) + + # FastAPI returns 500 for unhandled exceptions + assert response.status_code == 500 + + @pytest.mark.asyncio + async def test_async_handler_error_propagation(self): + """Test that async handler errors are properly propagated.""" + + async def async_error_handler(message: Dict[str, Any]) -> dict: + raise RuntimeError("Async handler error occurred") + + subscription = Subscription( + component_name="pubsub", + topic="test-topic", + route="async-error-route", + handler=async_error_handler, + ) + + self.app.subscriptions = [subscription] + self.app.register_routers() + + transport = ASGITransport(app=self.app.app, raise_app_exceptions=False) + async with AsyncClient(transport=transport, base_url="http://test") as ac: + response = await ac.post( + "/subscriptions/v1/async-error-route", + json={}, + ) + + # FastAPI returns 500 for unhandled exceptions + assert response.status_code == 500 + + @pytest.mark.asyncio + async def test_handler_returns_custom_response(self): + """Test that handler can return custom response data.""" + + def custom_response_handler(message: Dict[str, Any]) -> dict: + return { + "status": "SUCCESS", + "processed_at": "2024-01-01T00:00:00Z", + "items_count": 42, + "metadata": {"source": "test"}, + } + + subscription = Subscription( + component_name="pubsub", + topic="test-topic", + route="custom-response-route", + handler=custom_response_handler, + ) + + self.app.subscriptions = [subscription] + self.app.register_routers() + + transport = ASGITransport(app=self.app.app) + async with AsyncClient(transport=transport, base_url="http://test") as ac: + response = await ac.post( + "/subscriptions/v1/custom-response-route", + json={}, + ) + + assert response.status_code == 200 + response_data = response.json() + assert response_data["status"] == "SUCCESS" + assert response_data["items_count"] == 42 + assert response_data["metadata"]["source"] == "test"