Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 30 additions & 1 deletion application_sdk/server/fastapi/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
HttpWorkflowTrigger,
PreflightCheckRequest,
PreflightCheckResponse,
Subscription,
TestAuthRequest,
TestAuthResponse,
WorkflowConfigRequest,
Expand Down Expand Up @@ -90,12 +91,15 @@ class APIServer(ServerInterface):
workflow_router: APIRouter
dapr_router: APIRouter
events_router: APIRouter
subscription_router: APIRouter
handler: Optional[HandlerInterface]
templates: Jinja2Templates
duckdb_ui: DuckDBUI

docs_directory_path: str = "docs"
docs_export_path: str = "dist"
# List of subscriptions to be registered
subscriptions: List[Subscription] = []

frontend_assets_path: str = "frontend/static"

Expand All @@ -112,6 +116,7 @@ def __init__(
frontend_templates_path: str = "frontend/templates",
ui_enabled: bool = True,
has_configmap: bool = False,
subscriptions: List[Subscription] = [],
):
"""Initialize the FastAPI application.

Expand All @@ -138,7 +143,7 @@ def __init__(
self.workflow_router = APIRouter()
self.dapr_router = APIRouter()
self.events_router = APIRouter()

self.subscriptions = subscriptions
# Set up the application
error_handler = internal_server_error_handler # Store as local variable
self.app.add_exception_handler(
Expand Down Expand Up @@ -205,6 +210,7 @@ def register_routers(self):
- Workflow router (/workflows/v1)
- Pubsub router (/dapr)
- Events router (/events/v1)
- Subscription router (/subscriptions/v1)
"""
# Register all routes first
self.register_routes()
Expand All @@ -215,6 +221,16 @@ def register_routers(self):
self.app.include_router(self.dapr_router, prefix="/dapr")
self.app.include_router(self.events_router, prefix="/events/v1")

# Register subscription routes from subscriptions with handler callbacks
subscription_router = APIRouter()
for subscription in self.subscriptions:
subscription_router.add_api_route(
f"/{subscription.route}",
subscription.handler,
methods=["POST"],
)
self.app.include_router(subscription_router, prefix="/subscriptions/v1")

def fallback_home(self, request: Request) -> HTMLResponse:
return self.templates.TemplateResponse(
"index.html",
Expand Down Expand Up @@ -432,6 +448,19 @@ async def get_dapr_subscriptions(
"""

subscriptions: List[dict[str, Any]] = []
for subscription in self.subscriptions:
subscription_dict: dict[str, Any] = {
"pubsubname": subscription.component_name,
"topic": subscription.topic,
"route": f"/subscriptions/v1/{subscription.route}",
}
if subscription.bulk_config:
subscription_dict["bulkSubscribe"] = (
subscription.bulk_config.model_dump(by_alias=True)
)
if subscription.dead_letter_topic:
Comment thread
firecast marked this conversation as resolved.
subscription_dict["deadLetterTopic"] = subscription.dead_letter_topic
subscriptions.append(subscription_dict)
for event_trigger in self.event_triggers:
filters = [
f"({event_filter.path} {event_filter.operator} '{event_filter.value}')"
Expand Down
63 changes: 62 additions & 1 deletion application_sdk/server/fastapi/models.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# Request/Response DTOs for workflows

from enum import Enum
from typing import Any, Dict, List, Optional, Type
from typing import Any, Callable, Coroutine, Dict, List, Optional, Type, Union

from pydantic import BaseModel, Field, RootModel

Expand Down Expand Up @@ -240,3 +240,64 @@ class EventWorkflowTrigger(WorkflowTrigger):

def should_trigger_workflow(self, event: Event) -> bool:
return True


class Subscription(BaseModel):
"""Subscription configuration for Dapr messaging.

Attributes:
component_name: Name of the Dapr pubsub component
topic: Topic to subscribe to
route: Route path for the message handler endpoint
handler: Required callback function to handle incoming messages
bulk_config: Optional bulk subscribe configuration
dead_letter_topic: Optional dead letter topic for failed messages

Nested Classes:
BulkConfig: Configuration for bulk message processing
MessageStatus: Status codes for handler responses (SUCCESS, RETRY, DROP)
"""

class BulkConfig(BaseModel):
"""Bulk configuration for Dapr messaging.

Attributes:
enabled: Whether bulk subscribe is enabled
max_messages_count: Maximum number of messages to receive in a batch
max_await_duration_ms: Maximum time to wait for messages in milliseconds
"""

enabled: bool = False
max_messages_count: int = Field(
default=100, serialization_alias="maxMessagesCount"
)
max_await_duration_ms: int = Field(
default=40, serialization_alias="maxAwaitDurationMs"
)

class MessageStatus(str, Enum):
"""Status codes for Dapr pub/sub subscription message handler responses.

Used in subscription handler responses to indicate how Dapr should handle the message.
Based on Dapr docs: https://docs.dapr.io/reference/api/pubsub_api/#expected-http-response

Attributes:
SUCCESS: Message was processed successfully.
RETRY: Message processing failed, should be retried.
DROP: Message should be dropped (sent to dead letter topic if configured).
"""

SUCCESS = "SUCCESS"
RETRY = "RETRY"
DROP = "DROP"

model_config = {"arbitrary_types_allowed": True}

component_name: str
topic: str
Comment thread
firecast marked this conversation as resolved.
route: str
handler: Union[
Callable[[Any], Any], Callable[[Any], Coroutine[Any, Any, Any]]
] # Required callback function (sync or async)
bulk_config: Optional[BulkConfig] = None
dead_letter_topic: Optional[str] = None
Comment thread
niteesh-atlan marked this conversation as resolved.
2 changes: 1 addition & 1 deletion components/eventstore.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -22,4 +22,4 @@ spec:
# metadata:
# - name: url
# value: https://atlan.url/eventurl
# value: "https://webhook.site/testhash"
# value: "https://webhook.site/testhash"
41 changes: 41 additions & 0 deletions components/pubsub.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
name: pubsub
spec:
type: pubsub.in-memory
version: v1
# Bulk subscribe for in-memory pubsub has a batch size of 1 as it is not supported.
## Use the below component for Kafka. Kafka supports bulk subscribe.
# apiVersion: dapr.io/v1alpha1
# kind: Component
# metadata:
# name: pubsub
# spec:
# type: pubsub.kafka
# version: v1
# metadata:
# # Kafka broker configuration
# - name: brokers
# value: "localhost:9092"

# # Consumer configuration
# - name: consumerGroup
# value: "message-processor"

# - name: clientId
# value: "message-processor-client"

# # Session timeout
# - name: sessionTimeout
# value: "600s"

# - name: authType
# value: "none"

# # - name: bulkSubscribe
# # value: "true"
# # - name: maxMessagesCount
# # value: "10"
# # - name: maxAwaitDurationMs
# # value: "1000"
87 changes: 86 additions & 1 deletion docs/docs/concepts/server.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,12 @@ This module provides the core server framework for building Atlan applications,
5. **Models (`application_sdk.server.fastapi.models.py`)**:
* **Purpose:** Defines Pydantic models used for request/response validation and serialization for the default API endpoints (e.g., `TestAuthRequest`, `WorkflowResponse`, `PreflightCheckRequest`, `PreflightCheckResponse`).

6. **Subscriptions (`application_sdk.server.fastapi.models.py`)**:
* **Purpose:** Configure Dapr pub/sub message subscriptions for event-driven processing without Temporal workflows.
* `Subscription`: Defines a subscription to a Dapr pubsub topic with a handler callback.
* `Subscription.BulkConfig`: Nested class for bulk message processing configuration.
* `Subscription.MessageStatus`: Nested enum for handler response status codes (`SUCCESS`, `RETRY`, `DROP`).

## Usage Patterns

### 1. Using the Default FastAPI Server
Expand Down Expand Up @@ -249,6 +255,85 @@ api_server = APIServer(
)
```

### 4. Using Subscriptions for Message Processing

For event-driven applications that process messages from Dapr pub/sub without Temporal workflows, you can use `Subscription` to define message handlers.

```python
# In your main server file (e.g., main.py)
import asyncio
from typing import Any, Dict

from application_sdk.server.fastapi import APIServer
from application_sdk.server.fastapi.models import Subscription
from application_sdk.observability.logger_adaptor import get_logger

logger = get_logger(__name__)

# Define a sync message handler
def process_message(message: Dict[str, Any]) -> dict:
"""Process a single message from Dapr pubsub."""
event_data = message.get("data", message)
logger.info(f"Processing message: {event_data}")

# Return status using Subscription.MessageStatus enum
return {"status": Subscription.MessageStatus.SUCCESS}

# Define an async handler for bulk processing
async def process_bulk_messages(request: Dict[str, Any]) -> dict:
"""Process messages in bulk from Dapr pubsub."""
if "entries" in request:
# Bulk format
statuses = []
for entry in request.get("entries", []):
entry_id = entry.get("entryId", "unknown")
# Process each entry
statuses.append({"entryId": entry_id, "status": Subscription.MessageStatus.SUCCESS})
return {"statuses": statuses}
else:
# Single message format
return {"status": Subscription.MessageStatus.SUCCESS}

async def main():
# Define subscriptions with handler callbacks
subscription = Subscription(
component_name="messaging", # Dapr pubsub component name
topic="events-topic", # Topic to subscribe to
route="events-topic", # Route path for the handler endpoint
handler=process_message, # Callback function (sync or async)
dead_letter_topic="events-dlq", # Optional dead letter topic
)

bulk_subscription = Subscription(
component_name="messaging",
topic="bulk-events-topic",
route="bulk-events",
handler=process_bulk_messages,
bulk_config=Subscription.BulkConfig(
enabled=True,
max_messages_count=100, # Max messages per batch
max_await_duration_ms=1000, # Max wait time for batch
),
)

# Create server with subscriptions (no workflow_client needed)
server = APIServer(
subscriptions=[subscription, bulk_subscription],
)

await server.start()

if __name__ == "__main__":
asyncio.run(main())
```

This setup:
* Registers message handler endpoints at `/subscriptions/v1/{route}`
* Configures Dapr subscriptions via `/dapr/subscribe` endpoint
* Supports both sync and async handlers
* Supports bulk message processing with `Subscription.BulkConfig`
* Supports dead letter topics for failed messages

## Summary

The `application_sdk.server` module, especially the `fastapi` sub-package, provides a robust foundation for building web servers that interact with Atlan handlers and Temporal workflows. You can use the default `APIServer` for simple cases, extend it with custom routers for specific API needs, and override handler methods to tailor the behavior of standard API endpoints.
The `application_sdk.server` module, especially the `fastapi` sub-package, provides a robust foundation for building web servers that interact with Atlan handlers and Temporal workflows. You can use the default `APIServer` for simple cases, extend it with custom routers for specific API needs, override handler methods to tailor the behavior of standard API endpoints, and use `Subscription` for event-driven message processing.
Loading
Loading