-
Notifications
You must be signed in to change notification settings - Fork 2
SSE initial implementation #204
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Conversation
better connection lookup and remove the need for unwrap
- Add critical warning to docker-compose.yaml about SSE single-instance limitation * SSE connections tracked in-memory with DashMap * Must not scale horizontally without Redis Pub/Sub * Warns about symptom: events randomly fail with multiple replicas - Add nginx configuration for /api/sse endpoint * Disable proxy buffering for immediate event streaming * Set 24h read timeout for long-lived SSE connections * Enable chunked transfer encoding * Clear connection header for proper streaming * Add CORS headers for credential support 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <noreply@anthropic.com>
Create standalone SSE crate to avoid circular dependencies between service and web layers. Uses generic types (String for IDs, serde_json::Value for payloads) to remain independent of domain models. Key components: - ConnectionRegistry: Dual-index (connection_id, user_id) architecture using DashMap for O(1) concurrent lookups - Manager: High-level API for connection lifecycle and message routing - Message types: Event enum with action, agreement, goal, and system events - MessageScope: User-targeted and broadcast message delivery Architecture decisions: - In-memory connection tracking (single-instance only) - Generic types to avoid domain dependency - Thread-safe using DashMap and Arc - Tokio channels for event distribution 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <noreply@anthropic.com>
Add SSE manager to the service layer's AppState to enable real-time event distribution throughout the application. The manager is wrapped in Arc for thread-safe sharing across request handlers. Changes: - Add sse dependency to service crate - Add sse_manager: Arc<sse::Manager> field to AppState - Update AppState::new() to accept sse_manager parameter - Make sse_manager publicly accessible via getter This allows controllers to send SSE events by calling app_state.sse_manager.send_message() with appropriate message scope. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <noreply@anthropic.com>
Implement Axum SSE handler that establishes long-lived HTTP connections for server-sent events. One connection per authenticated user, persisting across page navigation. Implementation: - Handler at GET /sse (behind authentication middleware) - Uses async_stream::stream! for event streaming - Registers user connection with SSE manager - Automatic cleanup when connection closes - Returns Sse<impl Stream<Item = Result<Event, Infallible>>> - Keep-alive enabled with default settings Technical details: - Tokio unbounded channel receives events from manager - Stream yields events as they arrive from channel - Connection ID generated server-side for lifecycle tracking - Converts domain::Id to String for SSE layer compatibility 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <noreply@anthropic.com>
Create and pass SSE manager instance to AppState in both the main application and database seeding utility. Changes: - main.rs: Initialize Arc<sse::Manager::new()> and pass to AppState - seed_db.rs: Initialize SSE manager for test data seeding context The manager is created once at startup and shared across all request handlers via Arc for thread-safe access. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <noreply@anthropic.com>
Update three authentication middleware tests to initialize SSE manager when creating AppState, matching the new 3-parameter constructor signature. Fixed tests: - test_require_auth_returns_401_with_no_session - test_require_auth_returns_401_with_invalid_session_cookie - test_require_auth_allows_authenticated_request_to_proceed Each test now creates Arc<sse::Manager::new()> before constructing AppState to maintain test isolation while matching production code. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <noreply@anthropic.com>
Update project documentation to reflect the new SSE (Server-Sent Events) real-time communication infrastructure. Changes: - README.md: Add sse crate to project directory structure - crate_dependency_graph.md: Add sse crate dependencies (web→sse, service→sse) - system_architecture_diagram.md: Add SSE Handler and SSE Manager components with event flow from domain layer - network_flow_diagram.md: Document SSE endpoint configuration and single-instance scaling limitation Key documentation notes: - SSE uses in-memory connection tracking (single-instance only) - Nginx configured for long-lived connections (24h timeout, no buffering) - Generic types used in sse crate to avoid circular dependencies 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <noreply@anthropic.com>
…ling - Change login endpoint from /user_sessions to /login - Use form-encoded data instead of JSON for login requests - Update cookie name from session_id to id throughout codebase - Parse ApiResponse wrapper structure for user data - Improve error messages with response body details 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <noreply@anthropic.com>
- Add x-version: 1.0.0-beta1 header to all API requests - Fix coaching relationships endpoint to include organization_id - Add get_user_organizations method to fetch user's orgs - Parse ApiResponse wrapper for all endpoint responses - Update all cookie headers to use 'id' instead of 'session_id' 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <noreply@anthropic.com>
Add test_connection function that verifies basic SSE connectivity without requiring coaching data. This scenario: - Establishes SSE connections for both users - Waits 2 seconds to verify connections stay alive - Reports success if connections remain stable This allows testing SSE infrastructure without admin permissions. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <noreply@anthropic.com>
- Add ConnectionTest scenario choice to CLI - Add ForceLogoutTest scenario choice to CLI - Make test environment setup conditional (skip for ConnectionTest) - Update All scenario to include ConnectionTest - Improve scenario descriptions with requirements ConnectionTest can run without admin permissions since it doesn't require creating coaching relationships or sessions. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <noreply@anthropic.com>
…rceLogoutTest - Remove unused test_env parameter from test_force_logout function - Skip test environment setup for ForceLogoutTest scenario - Fix force_logout cookie header to use 'id' instead of 'session_id' - Update README with new connection-test scenario documentation - Clarify permission requirements for each test scenario - Add example output for connection test 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <noreply@anthropic.com>
|
@calebbourg I don't seem to be able to run the sse-test-client successfully on my local machine, I might be missing something but I am running with the examples provided under Here's what I'm getting: > cargo run -p sse-test-client -- \
--base-url http://localhost:4000 \
--user1 "user1@example.com:password123" \
--user2 "user2@example.com:password456" \
--scenario connection-test
=== SETUP PHASE ===
→ Authenticating users...
Error: Login failed: 401 Unauthorized - Response: UNAUTHORIZEDI'm guessing that I need to use a real test user from our seeded data for dev. If this is what I'm missing, I recommend updating the documentation in this README.md to just use the actual users instead of non-existent ones since this data isn't for production anyway and poses no security threat. |
| --user1 "user1@example.com:password123" \ | ||
| --user2 "user2@example.com:password456" \ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's use the real seeded dev data here since it poses no security threat and helps a developer trying to use these examples to already have them ready to copy + paste from here.
|
|
||
| /// User 1 credentials (format: email:password) | ||
| #[arg(long)] | ||
| user1: String, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For this test client, do we always need to test with a coach and a coachee?
|
Now I'm seeing the following error once I use two valid users (me as a user, and you as a user) when running the sse-test-client: > cargo run -p sse-test-client -- \
--base-url http://localhost:4000 \
--user1 "jim@refactorcoach.com:password" \
--user2 "calebbourg2@gmail.com:password" \
--scenario action-create
14:49:39 [INFO] summary="SELECT DISTINCT \"organizations\".\"id\", \"organizations\".\"name\", …" db.statement="\n\nSELECT DISTINCT \"organizations\".\"id\", \"organizations\".\"name\", \"organizations\".\"logo\", \"organizations\".\"slug\", \"organizations\".\"created_at\", \"organizations\".\"updated_at\" FROM \"refactor_platform\".\"organizations\" INNER JOIN \"refactor_platform\".\"user_roles\" ON \"organizations\".\"id\" = \"user_roles\".\"organization_id\" WHERE \"user_roles\".\"user_id\" = $1\n" rows_affected=1 rows_returned=1 elapsed=224.417µs elapsed_secs=0.000224417
14:49:39 [DEBUG] (2) sea_orm::driver::sqlx_postgres: SELECT "coaching_relationships"."id", "coaching_relationships"."organization_id", "coaching_relationships"."coach_id", "coaching_relationships"."coachee_id", "coaching_relationships"."slug", "coaching_relationships"."created_at", "coaching_relationships"."updated_at" FROM "refactor_platform"."coaching_relationships" WHERE "coaching_relationships"."organization_id" IN (SELECT "organizations"."id" FROM "refactor_platform"."organizations" WHERE "organizations"."id" = '7254d83b-75bb-4523-89e6-7c17005c4a7e')
14:49:39 [INFO] summary="SELECT \"coaching_relationships\".\"id\", \"coaching_relationships\".\"organization_id\", \"coaching_relationships\".\"coach_id\", …" db.statement="\n\nSELECT \"coaching_relationships\".\"id\", \"coaching_relationships\".\"organization_id\", \"coaching_relationships\".\"coach_id\", \"coaching_relationships\".\"coachee_id\", \"coaching_relationships\".\"slug\", \"coaching_relationships\".\"created_at\", \"coaching_relationships\".\"updated_at\" FROM \"refactor_platform\".\"coaching_relationships\" WHERE \"coaching_relationships\".\"organization_id\" IN (SELECT \"organizations\".\"id\" FROM \"refactor_platform\".\"organizations\" WHERE \"organizations\".\"id\" = $1)\n" rows_affected=5 rows_returned=5 elapsed=2.769666ms elapsed_secs=0.002769666
14:49:39 [ERROR] Coaching relationship already exists for coach: a37817a8-ad03-4f04-a6a4-2fe9ca070172 and coachee: 9b66b36a-62f3-4815-a6bb-0a62756537c2 in organization: 00000000-0000-0000-0000-000000000000
14:49:39 [WARN] EntityErrorKind::Other: Responding with 500 Internal Server Error. Error: Domain(Error { source: Some(Error { source: None, error_kind: ValidationError }), error_kind: Internal(Entity(Other("EntityErrorKind"))) }) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@calebbourg As we discussed in our sync, consider moving sse-test-client into something like a tools directory or test-tools or something like it.
| add_header Content-Type text/plain; | ||
| } | ||
|
|
||
| # SSE endpoint requires special configuration to prevent nginx from |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we add monitoring to this somehow in DO?
Description
This PR implements the foundational infrastructure for Server-Sent Events (SSE) to enable real-time, one-way communication from the backend to authenticated users. This establishes the core architecture needed for future real-time features like action notifications, coaching session updates, and system announcements.
GitHub Issue: [Closes|Fixes|Resolves] #your GitHub issue number here
Changes
ssecrate: Standalone crate with generic types (String IDs, JSON payloads) to avoid circular dependenciesConnectionRegistry: Dual-index architecture using DashMap for O(1) concurrent lookups by connection_id and user_idManager: High-level API for connection lifecycle and message routingMessagetypes: Event enum with action, agreement, goal, and system event variantsMessageScope: User-targeted and broadcast message deliveryweb/src/sse/handler.rs): GET /sse endpoint for establishing long-lived SSE connectionsasync_stream::stream!and Tokio channelssse_manager: Arc<sse::Manager>to service layer's AppStateapp_state.sse_manager.send_message()main.rsandseed_db.rs/api/sseendpoint (24h timeout, no buffering, chunked encoding)sse-test-client): CLI tool for integration testing SSE functionalityto reflect SSE components
Testing Strategy
Concerns
serde_json::Valuefor payloads to avoid circular dependencies - type safety enforced at web layer boundaries