feat: convert upload/refresh to sse endpoints #30

Open

elliot14A wants to merge 9 commits into main from feat/sse-endpoints

Conversation

@elliot14A
Contributor

@elliot14A elliot14A commented Dec 4, 2025

Summary by CodeRabbit

  • New Features
    • Real-time Server-Sent Events (SSE) for uploads and refreshes, plus a client SSE utility and new SSE-based mutation hooks to stream progress.
  • UI Changes
    • Persistent progress toasts and dialog-based progress views with spinner, percentage, and incremental updates; some transient success toasts removed.
    • Forms and wizards show a live progress state during dataset/database uploads.
  • Breaking Changes
    • File refresh component now requires a datasetId prop.


@coderabbitai
Contributor

coderabbitai Bot commented Dec 4, 2025

Walkthrough

Replaces synchronous create/refresh flows with Server-Sent Events: server handlers stream status/error/complete events; frontend adds an SSE client, SSE-aware mutation hooks, and UI changes to display persistent progress and handle streamed updates.

Changes

Cohort / File(s) Summary
Server: SSE handlers (database & S3)
server/interfaces/http/routes/source/database/create.go, server/interfaces/http/routes/source/database/refresh.go, server/interfaces/http/routes/source/s3/create.go, server/interfaces/http/routes/source/s3/refresh.go
Convert create/refresh handlers to SSE (text/event-stream). Add SSEEvent/SSEData types, channels, sendEvent/handleFailure helpers, spawn background goroutines to perform work, emit status/error/complete events, manage lifecycle, transactions and resource cleanup; replace immediate HTTP JSON responses with streamed events.
Web: SSE client & mutation hooks
web/src/lib/sse-client.ts, web/src/lib/mutations/dataset/refresh-dataset-database.ts, web/src/lib/mutations/dataset/refresh-dataset.ts, web/src/lib/mutations/dataset/source-database-dataset.ts, web/src/lib/mutations/dataset/source-dataset.ts
Add fetchWithSSE and typed SSEEvent client utility. Add SSE-based hooks (useRefreshDatabaseDatasetSSE, useRefreshDatasetSSE, useSourceDatabaseDatasetSSE, useSourceDatasetSSE) that POST to SSE endpoints, accept onProgress callbacks, and handle auth/org headers; migrate some prior apiClient flows to SSE and deprecate older hooks/interfaces.
Frontend: integrate SSE progress UI
web/src/components/dataset/database-source-form.tsx, web/src/components/dataset/dataset-header.tsx, web/src/components/dataset/dataset-upload-wizard.tsx, web/src/components/dataset/file-refresh-wizard.tsx, web/src/components/dataset/unified-uploader.tsx, web/src/components/dataset/csv-validation-uppy.tsx
Wire SSE onProgress into UI: add persistent progress UI (toasts/dialogs/progress bars), adjust state and error handling, remove some discrete success toasts, consolidate toast flows, and update component props where required (e.g., FileRefreshWizardProps adds datasetId).
Misc / Exports
web/src/lib/sse-client.ts
New exports: typed SSEEvent and fetchWithSSE, with built-in auth/org header handling and streaming parse logic.
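The server-side pattern these summaries describe, a buffered channel drained by the response stream writer plus a helper that marshals each event into the text/event-stream wire format, can be sketched roughly as follows. This is a minimal illustration, not the PR's actual code: the names formatSSE and streamEvents are assumptions, and the real handlers write through Fiber's SetBodyStreamWriter.

```go
package main

import (
	"encoding/json"
	"fmt"
	"strings"
)

// formatSSE renders one event in the text/event-stream wire format:
// an "event:" line, a "data:" line carrying a JSON payload, then a
// blank line terminating the frame.
func formatSSE(event string, payload any) (string, error) {
	data, err := json.Marshal(payload)
	if err != nil {
		return "", fmt.Errorf("marshal SSE payload: %w", err)
	}
	var b strings.Builder
	fmt.Fprintf(&b, "event: %s\n", event)
	fmt.Fprintf(&b, "data: %s\n\n", data)
	return b.String(), nil
}

// streamEvents drains a buffered channel of pre-formatted frames,
// standing in for the loop inside Fiber's SetBodyStreamWriter.
func streamEvents(frames <-chan string, write func(string)) {
	for f := range frames {
		write(f)
	}
}

func main() {
	frames := make(chan string, 10) // buffered, as in the PR's handlers
	go func() {
		defer close(frames)
		for _, msg := range []string{"Validating project...", "Ingesting data..."} {
			if f, err := formatSSE("status_update", map[string]string{"message": msg}); err == nil {
				frames <- f
			}
		}
	}()
	streamEvents(frames, func(f string) { fmt.Print(f) })
}
```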

Sequence Diagram(s)

sequenceDiagram
    participant Client
    participant Handler as HTTP Handler
    participant SSEStream as SSE Stream
    participant Worker as Background Worker
    participant Services as Backend Services

    Client->>Handler: POST /source/{s3|database}/{create|refresh}
    activate Handler
    Handler->>SSEStream: set SSE headers, open stream
    Handler->>Worker: spawn goroutine to perform work
    Handler-->>Client: keep connection open (streaming)
    deactivate Handler

    activate Worker
    Worker->>SSEStream: Emit "status_update" (validating)
    Worker->>Services: validate project/dataset/config
    alt validation fails
        Worker->>SSEStream: Emit "error"
        Worker->>SSEStream: Emit "complete"
        Worker->>SSEStream: Close stream
    else proceed
        Worker->>SSEStream: Emit "status_update" (ingest/refresh)
        Worker->>Services: ingest / DB ops / metrics / summary / schema upload
        Worker->>SSEStream: Emit "status_update" (metrics/summary)
        Worker->>SSEStream: Emit "complete" (dataset + summary)
        Worker->>SSEStream: Close stream
    end
    deactivate Worker

    Client->>Client: fetchWithSSE receives events -> onProgress updates UI

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~50 minutes

  • Verify goroutine/channel lifecycle and that channels are closed on all error/success paths in server handlers.
  • Inspect sendEvent/handleFailure semantics vs. HTTP error semantics and fetchWithSSE parsing.
  • Review transaction begin/commit/rollback and resource cleanup in S3/database handlers.
  • Validate fetchWithSSE header/auth/org handling and streaming JSON parsing edge cases.
  • Confirm frontend state transitions, toast/dialog lifecycle, and component prop updates (notably FileRefreshWizardProps).
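The streaming parse the checklist above refers to amounts to splitting the response body into blank-line-delimited frames of `event:`/`data:` lines. A rough sketch, written in Go for consistency with the server snippets (the actual client in web/src/lib/sse-client.ts is TypeScript; the names here are illustrative):

```go
package main

import (
	"bufio"
	"fmt"
	"strings"
)

// Frame holds one parsed SSE event and its data payload.
type Frame struct {
	Event string
	Data  string
}

// parseSSE splits a text/event-stream body into frames. Frames are
// separated by a blank line; multiple "data:" lines within one frame
// are joined with newlines, per the SSE specification.
func parseSSE(body string) []Frame {
	var frames []Frame
	cur := Frame{}
	var dataLines []string
	flush := func() {
		if cur.Event != "" || len(dataLines) > 0 {
			cur.Data = strings.Join(dataLines, "\n")
			frames = append(frames, cur)
		}
		cur, dataLines = Frame{}, nil
	}
	sc := bufio.NewScanner(strings.NewReader(body))
	for sc.Scan() {
		line := sc.Text()
		switch {
		case line == "":
			flush()
		case strings.HasPrefix(line, "event:"):
			cur.Event = strings.TrimSpace(strings.TrimPrefix(line, "event:"))
		case strings.HasPrefix(line, "data:"):
			dataLines = append(dataLines, strings.TrimSpace(strings.TrimPrefix(line, "data:")))
		}
	}
	flush() // handle a final frame without a trailing blank line
	return frames
}

func main() {
	body := "event: status_update\ndata: {\"message\":\"Validating...\"}\n\nevent: complete\ndata: {}\n\n"
	for _, f := range parseSSE(body) {
		fmt.Printf("%s -> %s\n", f.Event, f.Data)
	}
}
```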

Possibly related PRs

  • factly/gopie#108 — Modifies web/src/components/dataset/dataset-upload-wizard.tsx; touches the same component and may conflict or overlap with SSE-driven toast/stepper changes.

Suggested reviewers

  • Nirlep5252

Poem

🐰 I hopped through lines of code and cheer,

Streams of progress now draw near.
Events of carrots, pings that flow,
Step by step the updates glow.
Hooray — the pipeline hums, we go! ✨

Pre-merge checks and finishing touches

❌ Failed checks (1 warning)

  • Docstring Coverage ⚠️ Warning: Docstring coverage is 20.00%, below the required threshold of 80.00%. Run @coderabbitai generate docstrings to improve coverage.

✅ Passed checks (2 passed)

  • Description Check ✅ Passed: Check skipped because CodeRabbit's high-level summary is enabled.
  • Title Check ✅ Passed: The title accurately captures the main objective of the changeset, which converts multiple upload and refresh endpoints to use Server-Sent Events (SSE) for streaming progress instead of single JSON responses.

@claude

claude Bot commented Dec 4, 2025

Claude finished @elliot14A's task — View job


PR Review: Converting Upload/Refresh to SSE Endpoints

Review Tasks

  • Analyze modified Go files for SSE implementation
  • Check code quality and best practices
  • Identify potential bugs or issues
  • Review performance considerations
  • Examine security concerns
  • Assess test coverage needs

🔍 Code Review Summary

This PR successfully converts upload and refresh endpoints to Server-Sent Events (SSE), providing real-time progress updates for long-running operations. The implementation follows good patterns but has several areas for improvement.

Strengths

  1. Consistent SSE Implementation: All 4 files follow a similar pattern with proper SSE headers and streaming
  2. Good Error Handling: Comprehensive error handling with proper cleanup mechanisms
  3. Resource Cleanup: Well-implemented cleanup functions to handle partial failures
  4. Progress Tracking: Clear status updates throughout the process
  5. Transaction Management: Proper transaction handling in S3 refresh endpoint

🚨 Critical Issues

1. Goroutine Leak Risk (High Priority)

Files: All 4 files (create.go:159, refresh.go:83, etc.)

The SSE implementation starts goroutines that could leak if the client disconnects:

go func() {
    defer close(sseChan)
    // Long-running operations...
}()

Issue: No context cancellation when client disconnects
Impact: Memory leaks and resource exhaustion
Fix: Use ctx.Context() with context.WithCancel and monitor for client disconnection

2. Inconsistent User ID Extraction (High Priority)

Files:

  • s3/create.go:102 - Uses header: ctx.Get(middleware.UserCtxKey)
  • database/create.go:113 - Uses context: ctx.Locals(middleware.UserCtxKey)
  • s3/refresh.go:61 - Uses context: ctx.Locals(middleware.UserCtxKey)

Issue: Inconsistent user ID extraction methods
Impact: Potential authentication/authorization issues
Fix: Standardize on one approach (likely ctx.Locals)

3. Ignored JSON Marshal Errors (Medium Priority)

Files: All files (e.g., database/create.go:145)

payloadBytes, _ := json.Marshal(eventPayload)

Issue: Ignored errors could cause malformed SSE messages
Impact: Client-side parsing errors
Fix: Handle JSON marshal errors properly
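One hedged way to apply this fix is to fall back to a well-formed error payload whenever marshaling fails, so the stream never carries a malformed frame (marshalEvent is an illustrative name, not the PR's helper):

```go
package main

import (
	"encoding/json"
	"fmt"
)

// marshalEvent returns the JSON payload for an SSE "data:" line. If the
// original value cannot be marshaled, it substitutes a well-formed error
// payload rather than emitting a malformed frame.
func marshalEvent(payload any) []byte {
	data, err := json.Marshal(payload)
	if err != nil {
		fallback, _ := json.Marshal(struct {
			Event string `json:"event"`
			Error string `json:"error"`
		}{"error", "internal: failed to encode event payload"})
		return fallback
	}
	return data
}

func main() {
	fmt.Printf("data: %s\n\n", marshalEvent(map[string]string{"message": "ok"}))
	// Channels are not JSON-serializable, so this takes the fallback path.
	fmt.Printf("data: %s\n\n", marshalEvent(make(chan int)))
}
```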

🔧 Other Issues

4. Hardcoded Sleep Statements

Files: Multiple files (e.g., database/create.go:200, s3/refresh.go:168)

time.Sleep(2 * time.Second) // Wait for table creation

Issue: Arbitrary delays, not deterministic
Impact: Slower performance, potential race conditions
Fix: Use proper synchronization or polling mechanisms

5. Missing Input Validation

Files: s3/create.go:100

Issue: Organization ID validation happens after getting from header instead of middleware
Impact: Security risk if header manipulation occurs
Fix: Use middleware validation consistently

6. Resource Cleanup Race Conditions

Files: Database files (database/create.go:44)

Issue: Cleanup operations aren't synchronized
Impact: Potential partial cleanup failures
Fix: Add proper error handling for cleanup operations

7. Large Channel Buffer

Files: All files (sseChan := make(chan SSEData, 10))

Issue: Fixed buffer size might not be optimal
Impact: Potential blocking or memory usage
Fix: Consider unbuffered channel or dynamic sizing

🛡️ Security Concerns

  1. SQL Injection Prevention: Good use of parameterized queries through service layer
  2. Input Validation: Proper validation using struct tags
  3. Error Message Exposure: Consider sanitizing error messages sent to clients
  4. Resource Access Control: Organization ID validation needs improvement

🚀 Performance Considerations

  1. Connection Pooling: Ensure database connections are properly managed
  2. Memory Usage: SSE channels and long-running goroutines could accumulate
  3. Client Timeouts: No client timeout handling for long operations

📋 Test Coverage

Issue: No test files found for these endpoints
Recommendation: Add unit tests for:

  • SSE event formatting
  • Error handling scenarios
  • Resource cleanup logic
  • Timeout scenarios

📝 Recommendations

Immediate Actions:

  1. Fix goroutine leaks with proper context cancellation
  2. Standardize user ID extraction method
  3. Handle JSON marshal errors

Future Improvements:

  1. Add comprehensive test coverage
  2. Implement client timeout handling
  3. Consider using a more robust SSE library
  4. Add metrics/monitoring for long-running operations

Overall Assessment: The SSE implementation is functional and well-structured, but requires fixes for production readiness, particularly around resource management and error handling.

Contributor

@coderabbitai coderabbitai Bot left a comment


Actionable comments posted: 5

♻️ Duplicate comments (1)
server/interfaces/http/routes/source/s3/create.go (1)

17-28: Duplicate SSE types.

These types duplicate those in database/create.go. As suggested there, extract to a shared package.

🧹 Nitpick comments (6)
server/interfaces/http/routes/source/database/create.go (3)

17-28: Extract shared SSE types to a common package.

SSEEvent and SSEData are duplicated in s3/create.go (lines 17-28). Consider extracting these types to a shared package (e.g., interfaces/http/sse/types.go) to avoid duplication and ensure consistency.


158-161: Consider handling client disconnection to avoid wasted resources.

The goroutine continues processing even if the client disconnects (detected at line 394-402 via write/flush errors). For long-running operations like AI generation, this wastes compute resources.

Consider passing a cancellable context to the goroutine and checking ctx.Done() before expensive operations:

+	// Create cancellable context for the async operation
+	asyncCtx, cancel := context.WithCancel(ctx.Context())
+	defer cancel()
+
 	// Start async upload process
 	go func() {
 		defer close(sseChan)
+		// Check for cancellation before expensive operations
+		select {
+		case <-asyncCtx.Done():
+			return
+		default:
+		}
 		sendEvent("status_update", "Validating project...", nil)

Also, signal cancellation when stream writer encounters an error:

 	ctx.Response().SetBodyStreamWriter(func(w *bufio.Writer) {
+		defer cancel() // Cancel goroutine when stream closes
 		for sse := range sseChan {

Also applies to: 380-404


199-200: Fixed sleep is fragile; consider polling or callback.

The 2-second sleep assumes table creation completes within this window. This could fail under load or succeed earlier, wasting time. Consider polling the table existence or using a callback/event from the OLAP service if available.

server/interfaces/http/routes/source/s3/refresh.go (2)

128-139: Transaction defer relies on variable reuse — fragile but functional.

The defer block (lines 128-139) reads the final value of err to decide commit vs. rollback. This works because:

  • On error: early return leaves err set → rollback
  • On success: all operations succeed with err = nil → commit

However, variable reuse throughout the function is fragile. A future edit that shadows err (:= instead of =) could cause incorrect commits.

Consider using explicit error tracking:

+		var txErr error
+		defer func() {
+			if txErr != nil {
+				h.logger.Error("Rolling back transactions due to error", zap.Error(txErr))
+				olapTx.Rollback()
+				storeTx.Rollback(txCtx)
+			} else {
+				h.logger.Info("Committing transactions")
+				olapTx.Commit()
+				storeTx.Commit(txCtx)
+			}
+		}()

Then set txErr = err before each return.


167-168: Fixed sleep for data commit synchronization.

Same concern as noted in other files — polling or an event-based approach would be more robust than a fixed 2-second wait.

server/interfaces/http/routes/source/database/refresh.go (1)

213-214: Fixed sleep for data commit.

Same pattern as other files — consider more robust synchronization mechanisms.

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 99c2a2a and 05d9d62.

📒 Files selected for processing (4)
  • server/interfaces/http/routes/source/database/create.go (4 hunks)
  • server/interfaces/http/routes/source/database/refresh.go (3 hunks)
  • server/interfaces/http/routes/source/s3/create.go (5 hunks)
  • server/interfaces/http/routes/source/s3/refresh.go (3 hunks)
🧰 Additional context used
🧠 Learnings (5)
📓 Common learnings
Learnt from: CR
Repo: factly/gopie PR: 0
File: chat-server/CLAUDE.md:0-0
Timestamp: 2025-12-02T11:48:34.944Z
Learning: Applies to chat-server/app/api/v1/routers/**/*.py : Use Server-Sent Events for streaming real-time query progress responses
📚 Learning: 2025-12-02T11:48:34.944Z
Learnt from: CR
Repo: factly/gopie PR: 0
File: chat-server/CLAUDE.md:0-0
Timestamp: 2025-12-02T11:48:34.944Z
Learning: Applies to chat-server/app/api/v1/routers/**/*.py : Use Server-Sent Events for streaming real-time query progress responses

Applied to files:

  • server/interfaces/http/routes/source/database/refresh.go
  • server/interfaces/http/routes/source/database/create.go
  • server/interfaces/http/routes/source/s3/refresh.go
📚 Learning: 2025-07-29T09:36:39.908Z
Learnt from: CR
Repo: factly/gopie PR: 0
File: CLAUDE.md:0-0
Timestamp: 2025-07-29T09:36:39.908Z
Learning: Applies to server/**/*.go : Integrate DuckDB for OLAP queries in the Go backend server

Applied to files:

  • server/interfaces/http/routes/source/database/refresh.go
  • server/interfaces/http/routes/source/database/create.go
📚 Learning: 2025-07-29T09:36:39.908Z
Learnt from: CR
Repo: factly/gopie PR: 0
File: CLAUDE.md:0-0
Timestamp: 2025-07-29T09:36:39.908Z
Learning: Applies to server/**/*.go : Store dataset files in S3-compatible storage (MinIO) in the Go backend server

Applied to files:

  • server/interfaces/http/routes/source/database/refresh.go
  • server/interfaces/http/routes/source/s3/create.go
  • server/interfaces/http/routes/source/database/create.go
  • server/interfaces/http/routes/source/s3/refresh.go
📚 Learning: 2025-07-29T09:36:39.908Z
Learnt from: CR
Repo: factly/gopie PR: 0
File: CLAUDE.md:0-0
Timestamp: 2025-07-29T09:36:39.908Z
Learning: Applies to server/**/*.go : Use PostgreSQL with SQLC for type-safe queries in the Go backend server

Applied to files:

  • server/interfaces/http/routes/source/database/create.go
🧬 Code graph analysis (4)
server/interfaces/http/routes/source/database/refresh.go (4)
server/interfaces/http/routes/source/database/create.go (2)
  • SSEData (25-28)
  • SSEEvent (18-22)
server/interfaces/http/routes/source/s3/create.go (2)
  • SSEData (25-28)
  • SSEEvent (18-22)
server/infrastructure/postgres/gen/database_sources.sql.go (1)
  • UpdateDatabaseSourceLastUpdatedAtParams (172-175)
server/domain/models/dataset.go (1)
  • UpdateDatasetParams (112-129)
server/interfaces/http/routes/source/s3/create.go (2)
server/interfaces/http/routes/source/database/create.go (2)
  • SSEData (25-28)
  • SSEEvent (18-22)
server/domain/models/dataset.go (1)
  • CreateDatasetParams (84-108)
server/interfaces/http/routes/source/database/create.go (3)
server/interfaces/http/routes/source/s3/create.go (2)
  • SSEData (25-28)
  • SSEEvent (18-22)
server/domain/pkg/utils.go (1)
  • RandomString (40-46)
server/domain/models/database_source.go (1)
  • CreateDatabaseSourceParams (17-29)
server/interfaces/http/routes/source/s3/refresh.go (4)
server/interfaces/http/routes/source/database/create.go (2)
  • SSEData (25-28)
  • SSEEvent (18-22)
server/interfaces/http/routes/source/s3/create.go (2)
  • SSEData (25-28)
  • SSEEvent (18-22)
server/domain/models/dataset.go (1)
  • UpdateDatasetParams (112-129)
server/infrastructure/postgres/gen/datasets.sql.go (1)
  • UpdateDatasetParams (356-367)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
  • GitHub Check: Analyze (go)
🔇 Additional comments (4)
server/interfaces/http/routes/source/database/create.go (1)

135-156: SSE helpers are well-structured.

The buffered channel and helper functions provide a clean abstraction for SSE event emission. The ignored json.Marshal errors (lines 145, 153) are acceptable since the structs are simple and unlikely to fail serialization.

server/interfaces/http/routes/source/s3/create.go (1)

157-293: Async processing flow looks correct.

The goroutine properly handles validation, ingestion, dataset creation, summary generation, and schema upload with appropriate cleanup on errors. The SSE events provide good progress visibility.

server/interfaces/http/routes/source/s3/refresh.go (1)

86-243: Refresh flow implementation is correct.

The async flow properly handles dataset validation, transaction management, OLAP operations, metrics update, summary regeneration, and schema upload. SSE events provide clear progress visibility.

server/interfaces/http/routes/source/database/refresh.go (1)

82-279: Overall refresh logic is well-structured.

The async flow handles project/dataset validation, driver detection, incremental/full refresh modes, metrics updates, and schema upload with appropriate SSE progress events. Error handling is comprehensive.

Comment on lines +345 to 353
summary, err := h.datasetSvc.CreateDatasetSummary(tableName, datasetSummary)
if err != nil {
h.logger.Error("Error creating dataset summary", zap.Error(err))
cleanup.hasSummary = true
h.cleanupResources(cleanup)
handleFailure(fmt.Errorf("error creating dataset summary: %w", err))
return
}
cleanup.hasSummary = true
Contributor


⚠️ Potential issue | 🟡 Minor

Premature cleanup.hasSummary = true on line 348.

cleanup.hasSummary is set to true at line 348 before CreateDatasetSummary returns. If the call fails, cleanup will attempt to delete a summary that wasn't created. Move line 348's assignment after line 352.

 		summary, err := h.datasetSvc.CreateDatasetSummary(tableName, datasetSummary)
 		if err != nil {
 			h.logger.Error("Error creating dataset summary", zap.Error(err))
-			cleanup.hasSummary = true
 			h.cleanupResources(cleanup)
 			handleFailure(fmt.Errorf("error creating dataset summary: %w", err))
 			return
 		}
 		cleanup.hasSummary = true
🤖 Prompt for AI Agents
In server/interfaces/http/routes/source/database/create.go around lines 345 to
353, the code sets cleanup.hasSummary = true before calling CreateDatasetSummary
which causes cleanup to attempt deleting a non-existent summary on failure; move
the assignment so it runs only after CreateDatasetSummary succeeds (i.e., remove
or relocate the line at 348 and place an equivalent cleanup.hasSummary = true
immediately after the success branch following line 352), ensuring the error
path does not mark the summary as created.

Comment on lines +135 to 139
if strings.HasPrefix(source.ConnectionString, "postgres") {
driver = "postgres"
} else {
driver = "mysql"
}
Contributor


⚠️ Potential issue | 🔴 Critical

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
# Find relevant files
fd -e go | grep -E "(database|source)" | head -20

Repository: factly/gopie

Length of output: 894


🏁 Script executed:

#!/bin/bash
# Look for DatabaseSource type definition
rg "type DatabaseSource" -A 15

Repository: factly/gopie

Length of output: 6540


🏁 Script executed:

#!/bin/bash
# Check the context around lines 135-139 in refresh.go
cat -n server/interfaces/http/routes/source/database/refresh.go | sed -n '125,150p'

Repository: factly/gopie

Length of output: 983


🏁 Script executed:

#!/bin/bash
# Look for CreateDatabaseSourceParams
rg "CreateDatabaseSourceParams" -A 10

Repository: factly/gopie

Length of output: 10199


🏁 Script executed:

#!/bin/bash
# Get full DatabaseSource model from domain
cat -n server/domain/models/database_source.go | head -30

Repository: factly/gopie

Length of output: 1779


🏁 Script executed:

#!/bin/bash
# Check if there's any mapping from database to domain model
rg "DatabaseSource" server/infrastructure/postgres/store/database_source/database_source.go -A 2 -B 2 | head -50

Repository: factly/gopie

Length of output: 1750


Add Driver field to the DatabaseSource domain model instead of inferring from connection string.

The driver inference via strings.HasPrefix(source.ConnectionString, "postgres") is fragile. While CreateDatabaseSourceParams includes a Driver field that is persisted to the database, the domain model DatabaseSource (in server/domain/models/database_source.go) is missing this field. This causes the retrieval layer to discard the stored driver value, forcing a workaround via string prefix checking.

Update the DatabaseSource struct to include the Driver field:

type DatabaseSource struct {
	ID               string `json:"id"`
	DatasetID        string `json:"dataset_id"`
	ConnectionString string `json:"connection_string"`
+	Driver           string `json:"driver"`
	OrganizationID   string `json:"organization_id"`
	SQLQuery         string `json:"sql_query"`
	TimestampColumn  string `json:"update_sql_query,omitempty"`
	CreatedAt        string `json:"created_at"`
	LastUpdatedAt    string `json:"last_updated_at"`
	UpdatedAt        string `json:"updated_at"`
}

Then ensure the mapping in server/infrastructure/postgres/store/database_source/database_source.go populates this field when constructing the domain model from the database row. Finally, replace the string prefix check with driver = source.Driver.

Committable suggestion skipped: line range outside the PR's diff.

🤖 Prompt for AI Agents
In server/interfaces/http/routes/source/database/refresh.go around lines
135-139, replace the fragile prefix-based driver detection with the stored
Driver value by using driver = source.Driver; to enable this, add a Driver
string field to the domain model in server/domain/models/database_source.go
(ensure JSON/db tags match existing patterns), update the row->domain mapping in
server/infrastructure/postgres/store/database_source/database_source.go to
populate the new Driver field from the DB row, and then remove the
strings.HasPrefix(...) branch so the route uses source.Driver directly.

Comment on lines +201 to +204
err = h.dbSourceSvc.Update(ctx.Context(), models.UpdateDatabaseSourceLastUpdatedAtParams{
ID: source.ID,
LastUpdatedAt: *lastUpdatedAt,
})
Contributor


⚠️ Potential issue | 🟠 Major

Using ctx.Context() inside goroutine after handler returns is unsafe.

The Fiber context (ctx) may be recycled after the handler returns, making ctx.Context() potentially invalid. This could cause panics or data races when the goroutine calls h.dbSourceSvc.Update(ctx.Context(), ...).

Capture the context before starting the goroutine:

+	// Capture context before starting goroutine
+	reqCtx := ctx.Context()
+
 	// Start async refresh process
 	go func() {
 		defer close(sseChan)

Then use reqCtx instead of ctx.Context():

-				err = h.dbSourceSvc.Update(ctx.Context(), models.UpdateDatabaseSourceLastUpdatedAtParams{
+				err = h.dbSourceSvc.Update(reqCtx, models.UpdateDatabaseSourceLastUpdatedAtParams{

Committable suggestion skipped: line range outside the PR's diff.

🤖 Prompt for AI Agents
In server/interfaces/http/routes/source/database/refresh.go around lines
201-204: capture the request context into a local variable (e.g., reqCtx :=
ctx.Context()) and any loop/closure variables you need (e.g., id := source.ID;
lu := *lastUpdatedAt) before starting the goroutine, then inside the goroutine
call h.dbSourceSvc.Update(reqCtx,
models.UpdateDatabaseSourceLastUpdatedAtParams{ID: id, LastUpdatedAt: lu})
(replace all uses of ctx.Context() and any shared loop variables with the
captured locals).

Comment on lines +248 to +255
sendEvent("status_update", "Saving dataset summary...", nil)

return ctx.Status(fiber.StatusInternalServerError).JSON(fiber.Map{
"error": err.Error(),
"message": "Error creating dataset summary",
"code": fiber.StatusInternalServerError,
})
}
summary, err := h.datasetSvc.CreateDatasetSummary(tableName, datasetSummary)
if err != nil {
h.logger.Error("Error creating dataset summary", zap.Error(err))
handleFailure(fmt.Errorf("error creating dataset summary: %w", err))
return
}
Contributor


⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
# Find CreateDatasetSummary implementation
rg -nA 15 'func.*CreateDatasetSummary' --type go

Repository: factly/gopie

Length of output: 4677


🏁 Script executed:

#!/bin/bash
# Also check the s3/refresh.go pattern for comparison
rg -nB 5 -A 10 'DeleteSummaryWithTx' server/interfaces/http/routes/source/s3/refresh.go

Repository: factly/gopie

Length of output: 618


🏁 Script executed:

#!/bin/bash
# Check if there's any upsert or replace logic in CreateDatasetSummary
rg -n 'func.*CreateDatasetSummary|DELETE|INSERT|UPSERT|UPDATE' --type go | head -50

Repository: factly/gopie

Length of output: 2000


🏁 Script executed:

#!/bin/bash
# Find the SQL file defining createDatasetSummary
find . -name "*.sql" -type f -exec rg -l 'createDatasetSummary|dataset_summary' {} \;

Repository: factly/gopie

Length of output: 247


🏁 Script executed:

#!/bin/bash
# Check the SQL queries file for the CREATE statement
rg -B 5 'INSERT INTO dataset_summary' --type sql

Repository: factly/gopie

Length of output: 38


🏁 Script executed:

#!/bin/bash
# Look at the constraint definitions on dataset_summary table
find . -name "*.sql" -type f | head -20

Repository: factly/gopie

Length of output: 1740


🏁 Script executed:

#!/bin/bash
# Check the SQL query definition
cat server/infrastructure/postgres/sql/queries/dataset_summary.sql

Repository: factly/gopie

Length of output: 357


🏁 Script executed:

#!/bin/bash
# Check the migration to see schema and constraints
cat server/infrastructure/postgres/migrations/20250501023944_dataset_summary.sql

Repository: factly/gopie

Length of output: 369


Delete the old summary before creating a new one.

The CreateDatasetSummary operation is a plain INSERT statement without upsert logic, and the schema has no unique constraint on dataset_name. Calling this without first deleting the existing summary will create duplicate records. Match the pattern used in s3/refresh.go, which calls DeleteSummaryWithTx before creating the new summary:

+		err = h.datasetSvc.DeleteDatasetSummary(tableName)
+		if err != nil {
+			h.logger.Error("Error deleting existing dataset summary", zap.Error(err))
+			handleFailure(fmt.Errorf("error deleting existing dataset summary: %w", err))
+			return
+		}
+
 		sendEvent("status_update", "Saving dataset summary...", nil)
 
 		summary, err := h.datasetSvc.CreateDatasetSummary(tableName, datasetSummary)

Committable suggestion skipped: line range outside the PR's diff.

🤖 Prompt for AI Agents
In server/interfaces/http/routes/source/database/refresh.go around lines 248 to
255, the code calls CreateDatasetSummary which performs a plain INSERT and will
produce duplicate rows because there is no unique constraint; before calling
CreateDatasetSummary, delete the existing summary using the same transactional
pattern as s3/refresh.go (call DeleteSummaryWithTx on the current tx and handle
its error), then proceed to CreateDatasetSummary within the same transaction so
the delete+insert are atomic; ensure errors from DeleteSummaryWithTx are logged
and routed to handleFailure and that you only call CreateDatasetSummary if the
delete succeeds.

// Start async upload process
go func() {
defer close(sseChan)
ctxBg := context.Background()
Contributor


⚠️ Potential issue | 🟡 Minor

Use request context instead of context.Background().

Using context.Background() loses the request's cancellation signal, deadlines, and tracing context. Pass ctx.Context() (or a derived context) to preserve these capabilities and enable cancellation on client disconnect.

-		ctxBg := context.Background()
+		reqCtx := ctx.Context()

Then use reqCtx for IngestS3File and other context-aware calls.

Committable suggestion skipped: line range outside the PR's diff.

🤖 Prompt for AI Agents
In server/interfaces/http/routes/source/s3/create.go around line 160, replace
the use of context.Background() with the incoming request context (e.g., reqCtx
:= req.Context() or a derived context) so the request's cancellation, deadlines,
and tracing are preserved; then pass reqCtx to IngestS3File and any other
context-aware calls instead of ctxBg.

@claude

claude Bot commented Dec 10, 2025

Claude finished @elliot14A's task —— View job


PR Review: Converting Upload/Refresh to SSE Endpoints

Review Tasks

  • Analyze modified Go files for SSE implementation
  • Check code quality and best practices
  • Identify potential bugs or issues
  • Review performance considerations
  • Examine security concerns
  • Assess test coverage needs

🔍 Code Review Summary

This PR successfully converts upload and refresh endpoints to Server-Sent Events (SSE), providing real-time progress updates for long-running operations. The implementation follows good patterns but has several areas for improvement.

Strengths

  1. Consistent SSE Implementation: All 4 files follow a similar pattern with proper SSE headers and streaming
  2. Good Error Handling: Comprehensive error handling with proper cleanup mechanisms
  3. Resource Cleanup: Well-implemented cleanup functions to handle partial failures
  4. Progress Tracking: Clear status updates throughout the process
  5. Transaction Management: Proper transaction handling in S3 refresh endpoint

🚨 Critical Issues

1. Goroutine Leak Risk (High Priority)

Files: All 4 files (create.go:159, refresh.go:83, etc.)

The SSE implementation starts goroutines that could leak if the client disconnects:

go func() {
    defer close(sseChan)
    // Long-running operations...
}()

Issue: No context cancellation when client disconnects
Impact: Memory leaks and resource exhaustion
Fix: Use ctx.Context() with context.WithCancel and monitor for client disconnection
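A minimal, self-contained sketch of that fix: the worker selects on ctx.Done() between steps, so cancelling the request context (as Fiber's ctx.Context() is when the client disconnects) stops the goroutine promptly. runIngest and collectIngest are illustrative stand-ins, not the handlers' actual code.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// runIngest stands in for the long-running upload work. It checks
// ctx.Done() between steps, so it exits promptly when the request
// context is cancelled (e.g. on client disconnect).
func runIngest(ctx context.Context, events chan<- string) {
	defer close(events)
	for step := 1; step <= 5; step++ {
		select {
		case <-ctx.Done():
			events <- "aborted: " + ctx.Err().Error()
			return
		case <-time.After(50 * time.Millisecond): // simulated work
			events <- fmt.Sprintf("step %d done", step)
		}
	}
}

// collectIngest drives runIngest and cancels after the first event,
// simulating a client that disconnects mid-stream.
func collectIngest() []string {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	events := make(chan string, 8)
	go runIngest(ctx, events)

	msgs := []string{<-events} // first progress event
	cancel()                   // "client disconnected"
	for m := range events {
		msgs = append(msgs, m)
	}
	return msgs
}

func main() {
	for _, m := range collectIngest() {
		fmt.Println(m) // prints "step 1 done", then "aborted: context canceled"
	}
}
```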

2. Inconsistent User ID Extraction (High Priority)

Files:

  • s3/create.go:102 - Uses header: ctx.Get(middleware.UserCtxKey)
  • database/create.go:113 - Uses context: ctx.Locals(middleware.UserCtxKey)
  • s3/refresh.go:61 - Uses context: ctx.Locals(middleware.UserCtxKey)

Issue: Inconsistent user ID extraction methods
Impact: Potential authentication/authorization issues
Fix: Standardize on one approach (likely ctx.Locals)

3. Ignored JSON Marshal Errors (Medium Priority)

Files: All files (e.g., database/create.go:145)

payloadBytes, _ := json.Marshal(eventPayload)

Issue: Ignored errors could cause malformed SSE messages
Impact: Client-side parsing errors
Fix: Handle JSON marshal errors properly
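A sketch of a sendEvent-style helper that surfaces marshal failures instead of discarding them: on error it falls back to a minimal, guaranteed-marshalable error event so the client never receives a truncated frame. The payload shape here is illustrative, not the handlers' exact struct.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// formatEvent builds one SSE frame. If the payload cannot be marshaled,
// it emits a well-formed fallback error event rather than silently
// dropping the error (as payloadBytes, _ := json.Marshal(...) does).
func formatEvent(eventType, message string, data any) string {
	b, err := json.Marshal(map[string]any{
		"type":    eventType,
		"message": message,
		"data":    data,
	})
	if err != nil {
		// This fallback payload contains only strings, so it always marshals.
		b, _ = json.Marshal(map[string]string{
			"type":    "error",
			"message": "internal error: could not encode event payload",
		})
	}
	return fmt.Sprintf("data: %s\n\n", b)
}

func main() {
	fmt.Print(formatEvent("status_update", "Saving dataset summary...", nil))
	// Channels are not JSON-marshalable, so this exercises the fallback path.
	fmt.Print(formatEvent("complete", "done", make(chan int)))
}
```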

🔧 Other Issues

4. Hardcoded Sleep Statements

Files: Multiple files (e.g., database/create.go:200, s3/refresh.go:168)

time.Sleep(2 * time.Second) // Wait for table creation

Issue: Arbitrary delays, not deterministic
Impact: Slower performance, potential race conditions
Fix: Use proper synchronization or polling mechanisms

5. Missing Input Validation

Files: s3/create.go:100

Issue: The organization ID is read from a request header and validated in the handler rather than by middleware
Impact: Security risk if the header can be manipulated
Fix: Validate the organization ID in middleware consistently

6. Resource Cleanup Race Conditions

Files: Database files (database/create.go:44)

Issue: Cleanup operations aren't synchronized, and their errors go unchecked
Impact: Partial cleanup can fail silently
Fix: Synchronize cleanup and handle cleanup errors explicitly

7. Large Channel Buffer

Files: All files (sseChan := make(chan SSEData, 10))

Issue: Fixed buffer size might not be optimal
Impact: Potential blocking or memory usage
Fix: Consider unbuffered channel or dynamic sizing
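To illustrate the blocking concern: once the fixed buffer fills, a plain send blocks the producer goroutine until a consumer drains an event. A non-blocking send makes that failure mode visible. Sketch only; the real handlers use chan SSEData.

```go
package main

import "fmt"

// trySend is a non-blocking send: it reports false instead of blocking
// the producer goroutine when the channel's fixed buffer is full.
func trySend(ch chan string, msg string) bool {
	select {
	case ch <- msg:
		return true
	default:
		return false // buffer full; a plain send would block here
	}
}

func main() {
	ch := make(chan string, 2) // small fixed buffer, like make(chan SSEData, 10)
	fmt.Println(trySend(ch, "status: step 1")) // true
	fmt.Println(trySend(ch, "status: step 2")) // true
	fmt.Println(trySend(ch, "status: step 3")) // false: buffer is full
	<-ch                                       // a consumer drains one event
	fmt.Println(trySend(ch, "status: step 3")) // true again
}
```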

🛡️ Security Concerns

  1. SQL Injection Prevention: Good use of parameterized queries through service layer
  2. Input Validation: Proper validation using struct tags
  3. Error Message Exposure: Consider sanitizing error messages sent to clients
  4. Resource Access Control: Organization ID validation needs improvement

🚀 Performance Considerations

  1. Connection Pooling: Ensure database connections are properly managed
  2. Memory Usage: SSE channels and long-running goroutines could accumulate
  3. Client Timeouts: No client timeout handling for long operations

📋 Test Coverage

Issue: No test files found for these endpoints
Recommendation: Add unit tests for:

  • SSE event formatting
  • Error handling scenarios
  • Resource cleanup logic
  • Timeout scenarios

📝 Recommendations

Immediate Actions:

  1. Fix goroutine leaks with proper context cancellation
  2. Standardize user ID extraction method
  3. Handle JSON marshal errors

Future Improvements:

  1. Add comprehensive test coverage
  2. Implement client timeout handling
  3. Consider using a more robust SSE library
  4. Add metrics/monitoring for long-running operations

Overall Assessment: The SSE implementation is functional and well-structured, but requires fixes for production readiness, particularly around resource management and error handling.


@coderabbitai coderabbitai Bot left a comment

Actionable comments posted: 3

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (2)
web/src/components/dataset/file-refresh-wizard.tsx (1)

58-67: Props interface includes datasetId but component destructures and shadows it from useParams.

The FileRefreshWizardProps interface declares datasetId: string, but the component only destructures projectId and onRefreshComplete from props (line 66-67), then immediately shadows datasetId by extracting it from useParams() (line 70-73). This creates confusion about which source of datasetId is authoritative.

Either remove datasetId from props if it should always come from route params, or use the prop value instead of extracting from params:

 export interface FileRefreshWizardProps {
   projectId: string;
-  datasetId: string;
   onRefreshComplete: () => void;
 }
web/src/components/dataset/dataset-upload-wizard.tsx (1)

138-138: Remove unused mutation.

The sourceDataset mutation appears to be unused after the SSE migration. Since startUpload (from useSourceDatasetSSE) is now used in handleCreateDataset (line 449), this declaration should be removed.

-  const sourceDataset = useSourceDataset();
🧹 Nitpick comments (6)
web/src/lib/mutations/dataset/refresh-dataset-database.ts (1)

66-93: Consider adding explicit return type annotation for better type safety.

The hook works correctly, but the return type is implicitly Promise<any>. Adding an explicit return type would improve developer experience and catch type errors earlier.

-export const useRefreshDatabaseDatasetSSE = () => {
+export const useRefreshDatabaseDatasetSSE = (): ((params: {
+  projectId: string;
+  datasetName: string;
+  refreshType: "full" | "incremental";
+  onProgress: (event: SSEEvent) => void;
+}) => Promise<RefreshDatabaseDatasetResponse>) => {
   const { accessToken } = useAuth();
web/src/lib/sse-client.ts (1)

21-36: Minor: Inconsistent indentation at line 22.

The const isAuthEnabled declaration has inconsistent leading whitespace compared to surrounding code.

   // Add Auth & Org Headers (Reusing logic from api-client)
-const isAuthEnabled = String(process.env.NEXT_PUBLIC_ENABLE_AUTH).trim() === "true";
+  const isAuthEnabled = String(process.env.NEXT_PUBLIC_ENABLE_AUTH).trim() === "true";
   const orgId = getGlobalOrganizationId();
web/src/lib/mutations/dataset/refresh-dataset.ts (1)

3-5: Redundant import: SSEEvent is imported twice.

SSEEvent is imported on line 3 via fetchWithSSE and again separately on line 5. Consolidate to a single import statement.

-import { fetchWithSSE } from "@/lib/sse-client";
-import { useAuth } from "@/hooks/use-auth";
-import { SSEEvent } from "@/lib/sse-client";
+import { fetchWithSSE, SSEEvent } from "@/lib/sse-client";
+import { useAuth } from "@/hooks/use-auth";
web/src/components/dataset/file-refresh-wizard.tsx (1)

482-497: Minor: Unnecessary fragment wrapper around RefreshFileUploader.

The curly braces { ... } around RefreshFileUploader create an unnecessary expression context. This is harmless but can be simplified.

-              {
-                <RefreshFileUploader
-                  ref={uploaderRef}
-                  existingSchema={existingSchemaData?.schema as ColumnInfo[]}
-                  onValidationStateChange={handleValidationStateChange}
-                  onUploadSuccess={handleUploadComplete}
-                  onUploadError={(msg) => {
-                    // Update toast if active, otherwise just set error
-                    toast.error("Upload Failed", {
-                      id: REFRESH_TOAST_ID,
-                      description: msg,
-                    });
-                    setApiError(msg);
-                  }}
-                />
-              }
+              <RefreshFileUploader
+                ref={uploaderRef}
+                existingSchema={existingSchemaData?.schema as ColumnInfo[]}
+                onValidationStateChange={handleValidationStateChange}
+                onUploadSuccess={handleUploadComplete}
+                onUploadError={(msg) => {
+                  toast.error("Upload Failed", {
+                    id: REFRESH_TOAST_ID,
+                    description: msg,
+                  });
+                  setApiError(msg);
+                }}
+              />
web/src/components/dataset/dataset-header.tsx (1)

147-147: Consider renaming the sonner import instead of the hook result.

Aliasing toast to toasty works but is less intuitive. Consider renaming the sonner import for clarity:

-import { toast } from "sonner"; 
+import { toast as sonnerToast } from "sonner"; 
 ...
-  const { toast : toasty } = useToast();
+  const { toast } = useToast();

Then use sonnerToast for the SSE progress toasts and toast for the standard notifications. This keeps the hook's natural API intact.

web/src/lib/mutations/dataset/source-dataset.ts (1)

93-94: Remove the inline comment.

The comment // Ensure path matches your API router prefix appears to be a development note that should be removed before merging.

     return await fetchWithSSE(
-      '/source/s3/upload', // Ensure path matches your API router prefix
+      '/source/s3/upload',
       {
📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 05d9d62 and be7a2c9.

📒 Files selected for processing (11)
  • web/src/components/dataset/csv-validation-uppy.tsx (0 hunks)
  • web/src/components/dataset/database-source-form.tsx (6 hunks)
  • web/src/components/dataset/dataset-header.tsx (12 hunks)
  • web/src/components/dataset/dataset-upload-wizard.tsx (8 hunks)
  • web/src/components/dataset/file-refresh-wizard.tsx (10 hunks)
  • web/src/components/dataset/unified-uploader.tsx (0 hunks)
  • web/src/lib/mutations/dataset/refresh-dataset-database.ts (2 hunks)
  • web/src/lib/mutations/dataset/refresh-dataset.ts (2 hunks)
  • web/src/lib/mutations/dataset/source-database-dataset.ts (2 hunks)
  • web/src/lib/mutations/dataset/source-dataset.ts (2 hunks)
  • web/src/lib/sse-client.ts (1 hunks)
💤 Files with no reviewable changes (2)
  • web/src/components/dataset/csv-validation-uppy.tsx
  • web/src/components/dataset/unified-uploader.tsx
🧰 Additional context used
📓 Path-based instructions (4)
web/src/**/*.{ts,tsx}

📄 CodeRabbit inference engine (web/CLAUDE.md)

web/src/**/*.{ts,tsx}: Use Next.js 15 with App Router and React 19 for the web frontend framework
Use TypeScript as the language for the web frontend
Generate and use types from API responses for type safety
Use dynamic imports and code splitting for optimal performance
Load DuckDB WASM on-demand and use web workers for client-side SQL execution

Files:

  • web/src/lib/mutations/dataset/refresh-dataset-database.ts
  • web/src/lib/mutations/dataset/refresh-dataset.ts
  • web/src/components/dataset/dataset-header.tsx
  • web/src/lib/sse-client.ts
  • web/src/lib/mutations/dataset/source-dataset.ts
  • web/src/components/dataset/file-refresh-wizard.tsx
  • web/src/components/dataset/database-source-form.tsx
  • web/src/components/dataset/dataset-upload-wizard.tsx
  • web/src/lib/mutations/dataset/source-database-dataset.ts
web/src/lib/{queries,mutations}/**/*.{ts,tsx}

📄 CodeRabbit inference engine (web/CLAUDE.md)

Use React Query (TanStack Query) for server state management

Files:

  • web/src/lib/mutations/dataset/refresh-dataset-database.ts
  • web/src/lib/mutations/dataset/refresh-dataset.ts
  • web/src/lib/mutations/dataset/source-dataset.ts
  • web/src/lib/mutations/dataset/source-database-dataset.ts
web/src/components/**/*.{ts,tsx}

📄 CodeRabbit inference engine (web/CLAUDE.md)

web/src/components/**/*.{ts,tsx}: Use TailwindCSS with Radix UI components for styling instead of custom CSS
Use React Hook Form with Zod validation for form handling
Use pre-configured queries and mutations from lib/queries and lib/mutations for API calls
Wrap features in error boundaries to handle errors gracefully
Always handle loading and error states in UI components
Use Next.js Image component for image optimization

Files:

  • web/src/components/dataset/dataset-header.tsx
  • web/src/components/dataset/file-refresh-wizard.tsx
  • web/src/components/dataset/database-source-form.tsx
  • web/src/components/dataset/dataset-upload-wizard.tsx
web/src/components/dataset/**upload**.{ts,tsx}

📄 CodeRabbit inference engine (web/CLAUDE.md)

Use Uppy for dataset file uploads in upload components

Files:

  • web/src/components/dataset/dataset-upload-wizard.tsx
🧠 Learnings (10)
📓 Common learnings
Learnt from: CR
Repo: factly/gopie PR: 0
File: chat-server/CLAUDE.md:0-0
Timestamp: 2025-12-02T11:48:34.944Z
Learning: Applies to chat-server/app/api/v1/routers/**/*.py : Use Server-Sent Events for streaming real-time query progress responses
📚 Learning: 2025-12-02T11:48:50.398Z
Learnt from: CR
Repo: factly/gopie PR: 0
File: web/CLAUDE.md:0-0
Timestamp: 2025-12-02T11:48:50.398Z
Learning: Applies to web/src/components/dataset/**upload**.{ts,tsx} : Use Uppy for dataset file uploads in upload components

Applied to files:

  • web/src/lib/mutations/dataset/refresh-dataset-database.ts
  • web/src/lib/mutations/dataset/refresh-dataset.ts
  • web/src/components/dataset/dataset-header.tsx
  • web/src/lib/mutations/dataset/source-dataset.ts
  • web/src/components/dataset/file-refresh-wizard.tsx
  • web/src/components/dataset/database-source-form.tsx
  • web/src/components/dataset/dataset-upload-wizard.tsx
  • web/src/lib/mutations/dataset/source-database-dataset.ts
📚 Learning: 2025-12-02T11:48:50.398Z
Learnt from: CR
Repo: factly/gopie PR: 0
File: web/CLAUDE.md:0-0
Timestamp: 2025-12-02T11:48:50.398Z
Learning: Applies to web/src/components/**/*.{ts,tsx} : Use pre-configured queries and mutations from lib/queries and lib/mutations for API calls

Applied to files:

  • web/src/lib/mutations/dataset/refresh-dataset-database.ts
  • web/src/lib/mutations/dataset/refresh-dataset.ts
  • web/src/components/dataset/dataset-header.tsx
  • web/src/lib/mutations/dataset/source-dataset.ts
  • web/src/components/dataset/file-refresh-wizard.tsx
  • web/src/components/dataset/dataset-upload-wizard.tsx
  • web/src/lib/mutations/dataset/source-database-dataset.ts
📚 Learning: 2025-12-02T11:48:50.398Z
Learnt from: CR
Repo: factly/gopie PR: 0
File: web/CLAUDE.md:0-0
Timestamp: 2025-12-02T11:48:50.398Z
Learning: Applies to web/src/lib/{queries,mutations}/**/*.{ts,tsx} : Use React Query (TanStack Query) for server state management

Applied to files:

  • web/src/lib/mutations/dataset/refresh-dataset-database.ts
  • web/src/lib/mutations/dataset/refresh-dataset.ts
  • web/src/lib/mutations/dataset/source-dataset.ts
  • web/src/lib/mutations/dataset/source-database-dataset.ts
📚 Learning: 2025-07-29T09:36:39.908Z
Learnt from: CR
Repo: factly/gopie PR: 0
File: CLAUDE.md:0-0
Timestamp: 2025-07-29T09:36:39.908Z
Learning: Applies to web/**/*.{js,jsx,ts,tsx} : Integrate DuckDB WASM for client-side SQL execution in the Next.js frontend

Applied to files:

  • web/src/lib/mutations/dataset/refresh-dataset-database.ts
  • web/src/components/dataset/dataset-header.tsx
  • web/src/components/dataset/dataset-upload-wizard.tsx
  • web/src/lib/mutations/dataset/source-database-dataset.ts
📚 Learning: 2025-12-02T11:48:50.398Z
Learnt from: CR
Repo: factly/gopie PR: 0
File: web/CLAUDE.md:0-0
Timestamp: 2025-12-02T11:48:50.398Z
Learning: Applies to web/src/components/dataset/sql-editor.{ts,tsx} : Use Monaco Editor for SQL editing with syntax highlighting in SQL editor components

Applied to files:

  • web/src/components/dataset/dataset-header.tsx
  • web/src/components/dataset/dataset-upload-wizard.tsx
  • web/src/lib/mutations/dataset/source-database-dataset.ts
📚 Learning: 2025-07-29T09:36:39.908Z
Learnt from: CR
Repo: factly/gopie PR: 0
File: CLAUDE.md:0-0
Timestamp: 2025-07-29T09:36:39.908Z
Learning: Applies to web/**/*.{js,jsx,ts,tsx} : Use Monaco Editor for SQL editing in the Next.js frontend

Applied to files:

  • web/src/components/dataset/dataset-header.tsx
📚 Learning: 2025-12-02T11:48:50.398Z
Learnt from: CR
Repo: factly/gopie PR: 0
File: web/CLAUDE.md:0-0
Timestamp: 2025-12-02T11:48:50.398Z
Learning: Applies to web/src/**/*.{ts,tsx} : Load DuckDB WASM on-demand and use web workers for client-side SQL execution

Applied to files:

  • web/src/components/dataset/dataset-header.tsx
  • web/src/components/dataset/dataset-upload-wizard.tsx
📚 Learning: 2025-12-02T11:48:50.398Z
Learnt from: CR
Repo: factly/gopie PR: 0
File: web/CLAUDE.md:0-0
Timestamp: 2025-12-02T11:48:50.398Z
Learning: Applies to web/src/lib/stores/**/*.{ts,tsx} : Use Zustand for global state management (auth, chat, SQL, visualization stores)

Applied to files:

  • web/src/components/dataset/dataset-header.tsx
📚 Learning: 2025-07-29T09:36:39.908Z
Learnt from: CR
Repo: factly/gopie PR: 0
File: CLAUDE.md:0-0
Timestamp: 2025-07-29T09:36:39.908Z
Learning: Applies to web/**/*.{js,jsx,ts,tsx} : Use Zustand for global state management and React Query for server state in the Next.js frontend

Applied to files:

  • web/src/components/dataset/file-refresh-wizard.tsx
🧬 Code graph analysis (7)
web/src/lib/mutations/dataset/refresh-dataset-database.ts (1)
web/src/lib/sse-client.ts (1)
  • fetchWithSSE (10-90)
web/src/lib/mutations/dataset/refresh-dataset.ts (1)
web/src/lib/sse-client.ts (1)
  • fetchWithSSE (10-90)
web/src/components/dataset/dataset-header.tsx (2)
web/src/lib/mutations/dataset/refresh-dataset-database.ts (1)
  • useRefreshDatabaseDatasetSSE (66-93)
web/src/lib/queries/dataset/check-timestamp-column.ts (1)
  • useCheckTimestampColumn (16-42)
web/src/lib/mutations/dataset/source-dataset.ts (1)
web/src/lib/sse-client.ts (1)
  • fetchWithSSE (10-90)
web/src/components/dataset/file-refresh-wizard.tsx (1)
web/src/components/dataset/refresh-file-uploader.tsx (1)
  • RefreshFileUploader (64-402)
web/src/components/dataset/database-source-form.tsx (1)
web/src/lib/mutations/dataset/source-database-dataset.ts (1)
  • useSourceDatabaseDatasetSSE (17-41)
web/src/components/dataset/dataset-upload-wizard.tsx (1)
web/src/lib/mutations/dataset/source-dataset.ts (1)
  • useSourceDatasetSSE (59-103)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
  • GitHub Check: Analyze (go)
🔇 Additional comments (18)
web/src/lib/mutations/dataset/refresh-dataset-database.ts (1)

3-4: LGTM!

Imports for SSE client utilities and auth hook are correctly added to support the new SSE-based mutation.

web/src/lib/sse-client.ts (1)

3-8: LGTM!

The SSEEvent interface is well-defined with appropriate event types that match the server-side SSE implementation.

web/src/lib/mutations/dataset/refresh-dataset.ts (1)

73-106: LGTM!

The SSE hook implementation is correct and follows the same pattern as useRefreshDatabaseDatasetSSE. The nullish coalescing for ignoreErrors (line 98) properly defaults to true when undefined.

web/src/components/dataset/file-refresh-wizard.tsx (3)

195-230: LGTM!

The SSE progress flow is well-implemented with a persistent toast ID that bridges the upload and refresh phases. The progress incrementing (capped at 90%) provides smooth UX feedback even when exact progress isn't available from the server.


271-291: LGTM!

The upload completion handler correctly bridges the S3 upload phase to the SSE refresh phase, maintaining toast continuity with matching progress values.


293-322: LGTM!

Toast initialization with duration: Infinity is appropriate for the persistent progress flow. Error handling correctly maintains toast continuity.

web/src/components/dataset/dataset-header.tsx (4)

56-60: LGTM!

New imports for SSE-based refresh functionality are correctly added, including Progress component for visual feedback and SSE types for type safety.


182-195: LGTM!

State and hook setup for SSE-based refresh is well-structured. The strict equality check hasTimestampColumn === true correctly handles loading and error states by defaulting to disallowing incremental refresh.


313-405: LGTM!

The dual-path refresh handler (dialog vs toast) is well-implemented. Using Promise.all for query invalidation improves efficiency. The error handling correctly cleans up state for both UI paths.


969-1056: LGTM!

The dialog implementation handles the in-progress state well:

  • Prevents accidental closing during refresh via the onOpenChange guard
  • Dynamically switches between progress view and action buttons
  • Provides clear feedback with "Please wait..." when refresh is in progress
web/src/lib/mutations/dataset/source-dataset.ts (1)

59-103: LGTM! SSE-based hook for streaming progress.

The implementation correctly uses fetchWithSSE with authentication and progress callbacks, aligning with the project's SSE pattern for real-time progress updates. Based on learnings, this approach is appropriate for streaming real-time query progress responses.

web/src/lib/mutations/dataset/source-database-dataset.ts (1)

17-41: LGTM! Consistent SSE implementation.

The hook follows the same pattern as useSourceDatasetSSE, correctly separating the onProgress callback from the request body and passing authentication via the SSE client.

web/src/components/dataset/database-source-form.tsx (3)

85-92: Closure variable works correctly here.

The currentProgress variable is captured by closure but since it's updated synchronously on each callback invocation before setUploadProgress, this pattern works as intended. The progress caps at 90% to leave room for the final completion state.


114-119: Good error recovery pattern.

Resetting uploadProgress to null on error correctly returns the user to the form UI, allowing them to retry without reopening the dialog. The inline comment explains the intentional choice to not call onError to keep the dialog open.


144-164: Well-structured progress UI.

The progress UI provides good user feedback with a spinner, descriptive message, percentage, and progress bar. The conditional rendering cleanly separates the progress and form states.

web/src/components/dataset/dataset-upload-wizard.tsx (3)

458-475: SSE progress callback correctly updates the toast.

The callback properly increments progress (capped at 90%), updates the toast message from SSE events, and reuses the same toastId for a smooth single-toast experience throughout the upload flow.


248-270: Good toast lifecycle management.

Using a single toastId for the entire dataset creation flow provides a clean UX where the toast transforms through states (loading → progress updates → success/error) rather than spawning multiple toasts.


478-480: Response structure is correct.

The response from fetchWithSSE for the /source/s3/upload endpoint returns event.data from the 'complete' event, which has the structure { dataset, summary } (flat structure, not nested in a data wrapper). Accessing res?.dataset?.id is correct and aligns with the backend implementation in server/interfaces/http/routes/source/s3/create.go line 289.

Comment on lines +1285 to +1299
{/* {uploadProgress.status === 'processing' && (
<div className="space-y-3 p-4 border rounded-lg bg-secondary/10">
<div className="flex items-center justify-between text-sm">
<span className="font-medium flex items-center gap-2">
<Loader2 className="h-4 w-4 animate-spin text-primary" />
{uploadProgress.message}
</span>
<span className="text-muted-foreground">{uploadProgress.percentage}%</span>
</div>
<Progress value={uploadProgress.percentage} className="h-2" />
<p className="text-xs text-muted-foreground">
Processing your data. Large datasets may take a moment.
</p>
</div>
)} */}

🛠️ Refactor suggestion | 🟠 Major

Remove commented-out code.

This commented-out progress UI block should be removed. It appears to be leftover from development and adds noise to the codebase.

-              {/* {uploadProgress.status === 'processing' && (
-      <div className="space-y-3 p-4 border rounded-lg bg-secondary/10">
-        <div className="flex items-center justify-between text-sm">
-          <span className="font-medium flex items-center gap-2">
-            <Loader2 className="h-4 w-4 animate-spin text-primary" />
-            {uploadProgress.message}
-          </span>
-          <span className="text-muted-foreground">{uploadProgress.percentage}%</span>
-        </div>
-        <Progress value={uploadProgress.percentage} className="h-2" />
-        <p className="text-xs text-muted-foreground">
-          Processing your data. Large datasets may take a moment.
-        </p>
-      </div>
-    )} */}
-                           <Button
+              <Button

Committable suggestion skipped: line range outside the PR's diff.

🤖 Prompt for AI Agents
In web/src/components/dataset/dataset-upload-wizard.tsx around lines 1285 to
1299, there is a commented-out progress UI block; remove the entire commented
block (from the opening {/* to the closing */}) so no commented JSX remains,
ensure no stray braces or trailing commas are left behind and run project
lint/format to keep file formatting consistent.

Comment thread web/src/lib/sse-client.ts
Comment on lines +54 to +87
while (true) {
const { done, value } = await reader.read();
if (done) break;

const chunk = decoder.decode(value, { stream: true });
const lines = chunk.split('\n');

for (const line of lines) {
if (line.startsWith('data: ')) {
try {
const jsonStr = line.slice(6);
if (!jsonStr.trim()) continue;

const event = JSON.parse(jsonStr);

// Map backend event types to frontend expected types if necessary
// Server sends: "status_update", "complete", "error"
onEvent(event);

if (event.type === 'complete') {
resultData = event.data;
}
if (event.type === 'error') {
throw new Error(event.message);
}
} catch (e) {
if (e instanceof Error && e.message !== "Unexpected end of JSON input") {
console.error("SSE Parse Error", e);
}
if (line.includes('"type":"error"')) throw e; // Re-throw actual errors
}
}
}
}

⚠️ Potential issue | 🟠 Major

SSE messages split across chunks may be lost.

The current implementation splits each chunk by \n and parses lines individually. However, SSE messages can be split across TCP chunks. If a data: line is partially received in one chunk and completed in the next, the partial JSON will fail to parse and be silently dropped (or logged), and the continuation in the next chunk won't form a valid line either.

Consider buffering incomplete lines across chunks:

   const reader = response.body.getReader();
   const decoder = new TextDecoder();
   let resultData: T | undefined;
+  let buffer = '';

   while (true) {
     const { done, value } = await reader.read();
     if (done) break;

-    const chunk = decoder.decode(value, { stream: true });
-    const lines = chunk.split('\n');
+    buffer += decoder.decode(value, { stream: true });
+    const lines = buffer.split('\n');
+    // Keep the last potentially incomplete line in the buffer
+    buffer = lines.pop() || '';

     for (const line of lines) {
       if (line.startsWith('data: ')) {

Committable suggestion skipped: line range outside the PR's diff.

🤖 Prompt for AI Agents
In web/src/lib/sse-client.ts around lines 54 to 87, SSE event data can be split
across underlying chunks and the current per-chunk split-by-newline parsing
drops or misparses partial messages; fix by maintaining a persistent buffer
string (e.g., "remainder") outside the loop, prepend it to each decoded chunk,
then split into lines, process all complete lines, and keep the last fragment
(if the chunk does not end with a newline) in remainder to be prepended to the
next chunk; ensure you only attempt JSON.parse on fully-formed data lines and
handle the remainder at stream end.
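The carry-buffer idea is language-agnostic; sketched here in Go for brevity (feedChunks is a hypothetical helper, and the chunks simulate arbitrary TCP splits).

```go
package main

import (
	"fmt"
	"strings"
)

// feedChunks reassembles SSE "data:" lines from arbitrarily split chunks
// by carrying the trailing partial line between reads, so a frame split
// across two network reads is still parsed once it completes.
func feedChunks(chunks []string) []string {
	var events []string
	buffer := ""
	for _, chunk := range chunks {
		buffer += chunk
		lines := strings.Split(buffer, "\n")
		buffer = lines[len(lines)-1] // keep the incomplete tail for the next chunk
		for _, line := range lines[:len(lines)-1] {
			if strings.HasPrefix(line, "data: ") {
				events = append(events, strings.TrimPrefix(line, "data: "))
			}
		}
	}
	return events
}

func main() {
	// One event split across two reads, followed by a complete one.
	chunks := []string{
		"data: {\"type\":\"status_update\",\"mess",
		"age\":\"ingesting\"}\n\ndata: {\"type\":\"complete\"}\n\n",
	}
	for _, e := range feedChunks(chunks) {
		fmt.Println(e)
	}
	// prints:
	// {"type":"status_update","message":"ingesting"}
	// {"type":"complete"}
}
```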

Comment thread web/src/lib/sse-client.ts
}
}

return resultData as T;

⚠️ Potential issue | 🟡 Minor

Potential issue: resultData may be undefined if no 'complete' event is received.

If the SSE stream ends without emitting a complete event, resultData will be undefined but cast to T, which could cause runtime errors in consumers expecting a valid result.

Consider adding validation:

-  return resultData as T;
+  if (resultData === undefined) {
+    throw new Error('SSE stream ended without a complete event');
+  }
+  return resultData;
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
return resultData as T;
if (resultData === undefined) {
throw new Error('SSE stream ended without a complete event');
}
return resultData;
🤖 Prompt for AI Agents
In web/src/lib/sse-client.ts around line 89, the function unconditionally
returns resultData as T but resultData can be undefined if no 'complete' event
was received; add validation after stream ends to check if resultData is defined
and, if not, throw a descriptive error or reject the Promise (or return an
explicit default/optional type) so callers don't get an unsafe undefined cast;
ensure the function signature and callers reflect the chosen behavior
(throw/reject or optional return) and include a clear error message indicating
the SSE stream ended without a complete event.

@claude

claude Bot commented Dec 11, 2025

Claude finished @elliot14A's task —— View job


PR Review: Converting Upload/Refresh to SSE Endpoints

Review Tasks

  • Analyze modified Go files for SSE implementation
  • Check code quality and best practices
  • Identify potential bugs or issues
  • Review performance considerations
  • Examine security concerns
  • Assess test coverage needs

🔍 Code Review Summary

This PR successfully converts upload and refresh endpoints to Server-Sent Events (SSE), providing real-time progress updates for long-running operations. The implementation follows good patterns but has several areas for improvement.

Strengths

  1. Consistent SSE Implementation: All 4 files follow a similar pattern with proper SSE headers and streaming
  2. Good Error Handling: Comprehensive error handling with proper cleanup mechanisms
  3. Resource Cleanup: Well-implemented cleanup functions to handle partial failures
  4. Progress Tracking: Clear status updates throughout the process
  5. Transaction Management: Proper transaction handling in S3 refresh endpoint
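
For reference, the streaming shape all four handlers share can be sketched with the Go standard library (net/http and httptest stand in for Fiber here, and the event payloads are illustrative, not the repository's actual structs):

```go
package main

import (
	"fmt"
	"io"
	"net/http"
	"net/http/httptest"
)

// sseHandler streams two status frames and a final complete frame,
// flushing after each one so the client sees progress immediately.
func sseHandler(w http.ResponseWriter, r *http.Request) {
	w.Header().Set("Content-Type", "text/event-stream")
	w.Header().Set("Cache-Control", "no-cache")
	flusher, ok := w.(http.Flusher)
	if !ok {
		http.Error(w, "streaming unsupported", http.StatusInternalServerError)
		return
	}
	for _, msg := range []string{"validating input", "ingesting rows"} {
		fmt.Fprintf(w, "data: {\"type\":\"status_update\",\"message\":%q}\n\n", msg)
		flusher.Flush()
	}
	fmt.Fprint(w, "data: {\"type\":\"complete\"}\n\n")
	flusher.Flush()
}

// fetchEvents spins up a test server, hits it once, and returns the
// raw SSE body the client would have received.
func fetchEvents() string {
	srv := httptest.NewServer(http.HandlerFunc(sseHandler))
	defer srv.Close()
	resp, err := http.Get(srv.URL)
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()
	body, err := io.ReadAll(resp.Body)
	if err != nil {
		panic(err)
	}
	return string(body)
}

func main() {
	fmt.Print(fetchEvents())
}
```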

🚨 Critical Issues

1. Goroutine Leak Risk (High Priority)

Files: All 4 files (create.go:159, refresh.go:83, etc.)

The SSE implementation starts goroutines that could leak if the client disconnects:

go func() {
    defer close(sseChan)
    // Long-running operations...
}()

Issue: No context cancellation when client disconnects
Impact: Memory leaks and resource exhaustion
Fix: Use ctx.Context() with context.WithCancel and monitor for client disconnection

2. Inconsistent User ID Extraction (High Priority)

Files:

  • s3/create.go:102 - Uses header: ctx.Get(middleware.UserCtxKey)
  • database/create.go:113 - Uses context: ctx.Locals(middleware.UserCtxKey)
  • s3/refresh.go:61 - Uses context: ctx.Locals(middleware.UserCtxKey)

Issue: Inconsistent user ID extraction methods
Impact: Potential authentication/authorization issues
Fix: Standardize on one approach (likely ctx.Locals)

3. Ignored JSON Marshal Errors (Medium Priority)

Files: All files (e.g., database/create.go:145)

payloadBytes, _ := json.Marshal(eventPayload)

Issue: Ignored errors could cause malformed SSE messages
Impact: Client-side parsing errors
Fix: Handle JSON marshal errors properly
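
One way to surface the error instead of discarding it, sketched with an illustrative event type rather than the repository's actual SSEData definition:

```go
package main

import (
	"encoding/json"
	"fmt"
)

// sseEvent is an illustrative payload shape, not the repo's definition.
type sseEvent struct {
	Type    string `json:"type"`
	Message string `json:"message"`
}

// formatSSE marshals the event into an SSE frame and surfaces marshal
// failures to the caller instead of silently emitting a broken frame.
func formatSSE(ev sseEvent) (string, error) {
	b, err := json.Marshal(ev)
	if err != nil {
		return "", fmt.Errorf("marshal sse event: %w", err)
	}
	return fmt.Sprintf("data: %s\n\n", b), nil
}

func main() {
	frame, err := formatSSE(sseEvent{Type: "status_update", Message: "ingesting rows"})
	if err != nil {
		// Fall back to a hand-written frame that cannot fail to encode.
		frame = "data: {\"type\":\"error\",\"message\":\"internal encoding error\"}\n\n"
	}
	fmt.Print(frame)
}
```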

🔧 Other Issues

4. Hardcoded Sleep Statements

Files: Multiple files (e.g., database/create.go:200, s3/refresh.go:168)

time.Sleep(2 * time.Second) // Wait for table creation

Issue: Arbitrary delays, not deterministic
Impact: Slower performance, potential race conditions
Fix: Use proper synchronization or polling mechanisms

5. Missing Input Validation

Files: s3/create.go:100

Issue: The organization ID is validated after being read from the header rather than by middleware
Impact: Security risk if header manipulation occurs
Fix: Use middleware validation consistently

6. Resource Cleanup Race Conditions

Files: Database files (database/create.go:44)

Issue: Cleanup operations aren't synchronized
Impact: Potential partial cleanup failures
Fix: Add proper error handling for cleanup operations

7. Large Channel Buffer

Files: All files (sseChan := make(chan SSEData, 10))

Issue: Fixed buffer size might not be optimal
Impact: Potential blocking or memory usage
Fix: Consider unbuffered channel or dynamic sizing

🛡️ Security Concerns

  1. SQL Injection Prevention: Good use of parameterized queries through service layer
  2. Input Validation: Proper validation using struct tags
  3. Error Message Exposure: Consider sanitizing error messages sent to clients
  4. Resource Access Control: Organization ID validation needs improvement

🚀 Performance Considerations

  1. Connection Pooling: Ensure database connections are properly managed
  2. Memory Usage: SSE channels and long-running goroutines could accumulate
  3. Client Timeouts: No client timeout handling for long operations

📋 Test Coverage

Issue: No test files found for these endpoints
Recommendation: Add unit tests for:

  • SSE event formatting
  • Error handling scenarios
  • Resource cleanup logic
  • Timeout scenarios

📝 Recommendations

Immediate Actions:

  1. Fix goroutine leaks with proper context cancellation
  2. Standardize user ID extraction method
  3. Handle JSON marshal errors

Future Improvements:

  1. Add comprehensive test coverage
  2. Implement client timeout handling
  3. Consider using a more robust SSE library
  4. Add metrics/monitoring for long-running operations

Overall Assessment: The SSE implementation is functional and well-structured, but requires fixes for production readiness, particularly around resource management and error handling.

Contributor

@coderabbitai coderabbitai Bot left a comment


Actionable comments posted: 2

♻️ Duplicate comments (3)
web/src/components/dataset/dataset-upload-wizard.tsx (1)

1283-1298: Remove commented-out code.

This commented-out progress UI block should be removed as it adds noise to the codebase.

-              {/* {uploadProgress.status === 'processing' && (
-      <div className="space-y-3 p-4 border rounded-lg bg-secondary/10">
-        <div className="flex items-center justify-between text-sm">
-          <span className="font-medium flex items-center gap-2">
-            <Loader2 className="h-4 w-4 animate-spin text-primary" />
-            {uploadProgress.message}
-          </span>
-          <span className="text-muted-foreground">{uploadProgress.percentage}%</span>
-        </div>
-        <Progress value={uploadProgress.percentage} className="h-2" />
-        <p className="text-xs text-muted-foreground">
-          Processing your data. Large datasets may take a moment.
-        </p>
-      </div>
-    )} */}
-                           <Button
+              <Button
web/src/lib/sse-client.ts (2)

60-93: SSE messages split across chunks may be lost.

The current implementation splits each chunk by \n and parses lines individually. However, SSE messages can be split across TCP chunks. If a data: line is partially received in one chunk and completed in the next, the partial JSON will fail to parse.

Buffer incomplete lines across chunks:

   const reader = response.body.getReader();
   const decoder = new TextDecoder();
   let resultData: T | undefined;
+  let buffer = '';

   while (true) {
     const { done, value } = await reader.read();
     if (done) break;

-    const chunk = decoder.decode(value, { stream: true });
-    const lines = chunk.split('\n');
+    buffer += decoder.decode(value, { stream: true });
+    const lines = buffer.split('\n');
+    // Keep the last potentially incomplete line in the buffer
+    buffer = lines.pop() || '';

     for (const line of lines) {
       if (line.startsWith('data: ')) {

95-96: resultData may be undefined if no 'complete' event is received.

If the SSE stream ends without emitting a complete event (e.g., due to network interruption or server error), resultData will be undefined but cast to T, causing runtime errors in consumers.

Add validation before returning:

-  return resultData as T;
+  if (resultData === undefined) {
+    throw new Error('SSE stream ended without a complete event');
+  }
+  return resultData;
🧹 Nitpick comments (2)
web/src/components/dataset/dataset-upload-wizard.tsx (2)

321-322: Polling with hardcoded delay is fragile.

Using a fixed 2-second delay to wait for the store to update is a race condition waiting to happen. If the store takes longer to update, the URL won't be found; if it's faster, it adds unnecessary latency.

Consider subscribing to store changes or using a callback from triggerUpload that resolves with the upload response directly, rather than polling the store after a timeout.

-          // Wait a bit longer for the store to update with the upload response
-          await new Promise((resolve) => setTimeout(resolve, 2000));
+          // TODO: Refactor triggerUpload to return the upload response directly
+          // instead of relying on store polling with arbitrary timeouts
+          await new Promise((resolve) => setTimeout(resolve, 2000));

456-473: Consider using server-provided progress when available.

The SSEEvent interface includes a progress field, but the callback ignores it and uses artificial increments instead. This may result in inaccurate progress representation if the server sends actual progress percentages.

         onProgress: (event: SSEEvent) => {
           if (event.type === 'status_update') {
-            // Increment progress artificially (capped at 90%)
-            currentProgress = Math.min(currentProgress + 10, 90);
+            // Use server progress if available, otherwise increment artificially
+            if (event.progress !== undefined) {
+              currentProgress = Math.min(event.progress, 90);
+            } else {
+              currentProgress = Math.min(currentProgress + 10, 90);
+            }
             
             // 6. UPDATE TOAST with SSE Message
             toast.loading(
📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between be7a2c9 and 41192ad.

📒 Files selected for processing (2)
  • web/src/components/dataset/dataset-upload-wizard.tsx (8 hunks)
  • web/src/lib/sse-client.ts (1 hunks)
🧰 Additional context used
📓 Path-based instructions (3)
web/src/**/*.{ts,tsx}

📄 CodeRabbit inference engine (web/CLAUDE.md)

web/src/**/*.{ts,tsx}: Use Next.js 15 with App Router and React 19 for the web frontend framework
Use TypeScript as the language for the web frontend
Generate and use types from API responses for type safety
Use dynamic imports and code splitting for optimal performance
Load DuckDB WASM on-demand and use web workers for client-side SQL execution

Files:

  • web/src/components/dataset/dataset-upload-wizard.tsx
  • web/src/lib/sse-client.ts
web/src/components/**/*.{ts,tsx}

📄 CodeRabbit inference engine (web/CLAUDE.md)

web/src/components/**/*.{ts,tsx}: Use TailwindCSS with Radix UI components for styling instead of custom CSS
Use React Hook Form with Zod validation for form handling
Use pre-configured queries and mutations from lib/queries and lib/mutations for API calls
Wrap features in error boundaries to handle errors gracefully
Always handle loading and error states in UI components
Use Next.js Image component for image optimization

Files:

  • web/src/components/dataset/dataset-upload-wizard.tsx
web/src/components/dataset/**upload**.{ts,tsx}

📄 CodeRabbit inference engine (web/CLAUDE.md)

Use Uppy for dataset file uploads in upload components

Files:

  • web/src/components/dataset/dataset-upload-wizard.tsx
🧠 Learnings (9)
📓 Common learnings
Learnt from: CR
Repo: factly/gopie PR: 0
File: chat-server/CLAUDE.md:0-0
Timestamp: 2025-12-02T11:48:34.955Z
Learning: Applies to chat-server/app/api/v1/routers/**/*.py : Use Server-Sent Events for streaming real-time query progress responses
📚 Learning: 2025-12-02T11:48:50.411Z
Learnt from: CR
Repo: factly/gopie PR: 0
File: web/CLAUDE.md:0-0
Timestamp: 2025-12-02T11:48:50.411Z
Learning: Applies to web/src/components/dataset/**upload**.{ts,tsx} : Use Uppy for dataset file uploads in upload components

Applied to files:

  • web/src/components/dataset/dataset-upload-wizard.tsx
📚 Learning: 2025-12-02T11:48:50.411Z
Learnt from: CR
Repo: factly/gopie PR: 0
File: web/CLAUDE.md:0-0
Timestamp: 2025-12-02T11:48:50.411Z
Learning: Applies to web/src/components/dataset/sql-editor.{ts,tsx} : Use Monaco Editor for SQL editing with syntax highlighting in SQL editor components

Applied to files:

  • web/src/components/dataset/dataset-upload-wizard.tsx
📚 Learning: 2025-12-02T11:48:50.411Z
Learnt from: CR
Repo: factly/gopie PR: 0
File: web/CLAUDE.md:0-0
Timestamp: 2025-12-02T11:48:50.411Z
Learning: Applies to web/src/components/**/*.{ts,tsx} : Use pre-configured queries and mutations from lib/queries and lib/mutations for API calls

Applied to files:

  • web/src/components/dataset/dataset-upload-wizard.tsx
📚 Learning: 2025-07-29T09:36:39.908Z
Learnt from: CR
Repo: factly/gopie PR: 0
File: CLAUDE.md:0-0
Timestamp: 2025-07-29T09:36:39.908Z
Learning: Applies to web/**/*.{js,jsx,ts,tsx} : Use Zustand for global state management and React Query for server state in the Next.js frontend

Applied to files:

  • web/src/components/dataset/dataset-upload-wizard.tsx
📚 Learning: 2025-07-29T09:36:39.908Z
Learnt from: CR
Repo: factly/gopie PR: 0
File: CLAUDE.md:0-0
Timestamp: 2025-07-29T09:36:39.908Z
Learning: Applies to web/**/*.{js,jsx,ts,tsx} : Integrate DuckDB WASM for client-side SQL execution in the Next.js frontend

Applied to files:

  • web/src/components/dataset/dataset-upload-wizard.tsx
📚 Learning: 2025-12-02T11:48:50.411Z
Learnt from: CR
Repo: factly/gopie PR: 0
File: web/CLAUDE.md:0-0
Timestamp: 2025-12-02T11:48:50.411Z
Learning: Applies to web/src/lib/{queries,mutations}/**/*.{ts,tsx} : Use React Query (TanStack Query) for server state management

Applied to files:

  • web/src/components/dataset/dataset-upload-wizard.tsx
📚 Learning: 2025-07-29T09:36:39.908Z
Learnt from: CR
Repo: factly/gopie PR: 0
File: CLAUDE.md:0-0
Timestamp: 2025-07-29T09:36:39.908Z
Learning: Applies to web/**/*.{js,jsx,ts,tsx} : Use Monaco Editor for SQL editing in the Next.js frontend

Applied to files:

  • web/src/components/dataset/dataset-upload-wizard.tsx
📚 Learning: 2025-12-02T11:48:50.411Z
Learnt from: CR
Repo: factly/gopie PR: 0
File: web/CLAUDE.md:0-0
Timestamp: 2025-12-02T11:48:50.411Z
Learning: Applies to web/src/**/*.{ts,tsx} : Load DuckDB WASM on-demand and use web workers for client-side SQL execution

Applied to files:

  • web/src/components/dataset/dataset-upload-wizard.tsx
🧬 Code graph analysis (1)
web/src/lib/sse-client.ts (1)
web/src/lib/api-client.ts (1)
  • getGlobalOrganizationId (42-44)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
  • GitHub Check: claude-review
🔇 Additional comments (4)
web/src/components/dataset/dataset-upload-wizard.tsx (3)

43-45: LGTM!

The new imports for SSE support (useSourceDatasetSSE, Progress, SSEEvent) are correctly added to support the streaming progress functionality.


78-78: LGTM!

The SSE-based mutation hook is correctly initialized, aligning with the learnings that recommend using Server-Sent Events for streaming real-time progress responses.


476-493: No issues found. The response fields res.dataset and res.dataset.alias are correct for the SSE endpoint implementation. The fetchWithSSE function extracts event.data from the 'complete' event and returns it directly. The backend sends the event data as {"dataset": {...}, "summary": {...}}, so accessing res.dataset is the proper pattern here, not a typical REST wrapper like res.data.dataset.

web/src/lib/sse-client.ts (1)

1-8: LGTM!

The SSEEvent interface is well-typed with appropriate event types and optional fields for data and progress.

Comment thread web/src/lib/sse-client.ts
Comment on lines +21 to +36
// Add Auth & Org Headers (Reusing logic from api-client)
const isAuthEnabled = String(process.env.NEXT_PUBLIC_ENABLE_AUTH).trim() === "true";
const orgId = getGlobalOrganizationId();

// Token
if (accessToken) {
headers["Authorization"] = `Bearer ${accessToken}`;
}

// Org / User handling
if (!isAuthEnabled) {
headers["x-user-id"] = "system";
headers["x-organization-id"] = "system";
} else if (orgId) {
headers["x-organization-id"] = orgId;
}
Contributor


⚠️ Potential issue | 🔴 Critical

🧩 Analysis chain

🏁 Script executed:

# Search for SSE endpoint handlers in the backend
find . -type f -name "*.go" | head -20

Repository: factly/gopie

Length of output: 834


🏁 Script executed:

# Look for SSE-related endpoints and organization header requirements
rg "sse|SSE|streaming" --type go -l | head -20

Repository: factly/gopie

Length of output: 634


🏁 Script executed:

# Check x-organization-id header handling across the codebase
rg "x-organization-id" -c --type go | head -30

Repository: factly/gopie

Length of output: 153


🏁 Script executed:

# Examine the auth middleware to see how x-organization-id is handled
cat -n server/interfaces/http/middleware/auth.go

Repository: factly/gopie

Length of output: 3752


🏁 Script executed:

# Examine dataset_access middleware
cat -n server/interfaces/http/middleware/dataset_access.go

Repository: factly/gopie

Length of output: 7304


🏁 Script executed:

# Look for SSE/streaming endpoint definitions
rg "sse|stream" server/interfaces/http/routes -i -l

Repository: factly/gopie

Length of output: 472


🏁 Script executed:

# Check the chats/create.go which likely handles SSE/streaming
cat -n server/interfaces/http/routes/api/chats/create.go | head -100

Repository: factly/gopie

Length of output: 4695


🏁 Script executed:

# Look for route definitions to see which middleware is applied to SSE endpoints
rg "chats.*create\|streaming\|sse" server/interfaces/http -A5 -B5 -i | head -60

Repository: factly/gopie

Length of output: 38


SSE requests will fail with 401 when auth is enabled but orgId is unavailable.

The backend middleware (AuthorizeHeaders, APIAuth) requires the x-organization-id header for all authenticated requests. The current sse-client.ts logic has a gap: when auth is enabled but orgId is null, no x-organization-id header is sent, causing the request to fail with a 401 Unauthorized response from the middleware (lines 63-69, 88-95 in auth.go).

The SSE endpoint handler in server/interfaces/http/routes/api/chats/create.go expects orgID to be populated in the context (line 49), which requires the auth middleware to have validated and set it. Requests without this header will be rejected before reaching the handler.

Either ensure orgId is always available when auth is enabled, or modify the frontend logic to handle missing orgId appropriately.

Comment thread web/src/lib/sse-client.ts
Comment on lines +85 to +90
} catch (e) {
if (e instanceof Error && e.message !== "Unexpected end of JSON input") {
console.error("SSE Parse Error", e);
}
if (line.includes('"type":"error"')) throw e; // Re-throw actual errors
}
Contributor


⚠️ Potential issue | 🟡 Minor

Fragile error re-throw logic.

The catch block re-throws based on string matching (line.includes('"type":"error"')), but e at this point is likely a SyntaxError from failed JSON parsing, not the actual SSE error. This means if an error event's JSON is malformed, you'll throw a parse error instead of the actual error message.

Consider restructuring to handle parse errors separately:

         } catch (e) {
-          if (e instanceof Error && e.message !== "Unexpected end of JSON input") {
-             console.error("SSE Parse Error", e);
-          }
-          if (line.includes('"type":"error"')) throw e; // Re-throw actual errors
+          // Only log parse errors, don't re-throw them
+          // Partial JSON will be handled by buffering (see above fix)
+          if (e instanceof Error && e.message !== "Unexpected end of JSON input") {
+            console.error("SSE Parse Error:", line, e);
+          }
         }

Committable suggestion skipped: line range outside the PR's diff.

🤖 Prompt for AI Agents
web/src/lib/sse-client.ts around lines 85-90: the catch currently tests
line.includes('"type":"error"') after a JSON parse failure and re-throws the
parse exception instead of the actual SSE error; change the flow so you first
attempt to parse the line inside try, and in the catch distinguish a JSON
SyntaxError from an SSE error by inspecting the raw line: if the raw line
indicates an SSE error (e.g., contains "type":"error") construct and throw an
Error containing the raw line or the parsed error payload if you can parse it
safely, otherwise log the parse failure with the raw line and do not re-throw
the parse SyntaxError as the SSE error; ensure only genuine SSE error objects
(parsed or reconstructed from the raw line) are re-thrown.

@claude

claude Bot commented Dec 11, 2025

Claude finished @elliot14A's task.


PR Review: Converting Upload/Refresh to SSE Endpoints

Review Tasks

  • Read repository guidelines (CLAUDE.md)
  • Analyze modified Go files for SSE implementation
  • Check code quality and best practices
  • Identify potential bugs or issues
  • Review performance considerations
  • Examine security concerns
  • Assess test coverage needs

🔍 Code Review Summary

This PR successfully converts upload and refresh endpoints to Server-Sent Events (SSE), providing real-time progress updates for long-running operations. The implementation follows good patterns but has several areas for improvement.

Strengths

  1. Consistent SSE Implementation: All 4 files follow a similar pattern with proper SSE headers and streaming
  2. Good Error Handling: Comprehensive error handling with proper cleanup mechanisms
  3. Resource Cleanup: Well-implemented cleanup functions to handle partial failures
  4. Progress Tracking: Clear status updates throughout the process
  5. Transaction Management: Proper transaction handling in S3 refresh endpoint

🚨 Critical Issues

1. Goroutine Leak Risk (High Priority)

Files: All 4 files (create.go:159, refresh.go:83, etc.)

The SSE implementation starts goroutines that could leak if the client disconnects:

go func() {
    defer close(sseChan)
    // Long-running operations...
}()

Issue: No context cancellation when client disconnects
Impact: Memory leaks and resource exhaustion
Fix: Use ctx.Context() with context.WithCancel and monitor for client disconnection

2. Inconsistent User ID Extraction (High Priority)

Files:

  • s3/create.go:102 - Uses header: ctx.Get(middleware.UserCtxKey)
  • database/create.go:113 - Uses context: ctx.Locals(middleware.UserCtxKey)
  • s3/refresh.go:61 - Uses context: ctx.Locals(middleware.UserCtxKey)

Issue: Inconsistent user ID extraction methods
Impact: Potential authentication/authorization issues
Fix: Standardize on one approach (likely ctx.Locals)

3. Ignored JSON Marshal Errors (Medium Priority)

Files: All files (e.g., database/create.go:145)

payloadBytes, _ := json.Marshal(eventPayload)

Issue: Ignored errors could cause malformed SSE messages
Impact: Client-side parsing errors
Fix: Handle JSON marshal errors properly

🔧 Other Issues

4. Hardcoded Sleep Statements

Files: Multiple files (e.g., database/create.go:200, s3/refresh.go:168)

time.Sleep(2 * time.Second) // Wait for table creation

Issue: Arbitrary delays, not deterministic
Impact: Slower performance, potential race conditions
Fix: Use proper synchronization or polling mechanisms

5. Missing Input Validation

Files: s3/create.go:100

Issue: The organization ID is validated after being read from the header rather than by middleware
Impact: Security risk if header manipulation occurs
Fix: Use middleware validation consistently

6. Resource Cleanup Race Conditions

Files: Database files (database/create.go:44)

Issue: Cleanup operations aren't synchronized
Impact: Potential partial cleanup failures
Fix: Add proper error handling for cleanup operations

7. Large Channel Buffer

Files: All files (sseChan := make(chan SSEData, 10))

Issue: Fixed buffer size might not be optimal
Impact: Potential blocking or memory usage
Fix: Consider unbuffered channel or dynamic sizing

🛡️ Security Concerns

  1. SQL Injection Prevention: Good use of parameterized queries through service layer
  2. Input Validation: Proper validation using struct tags
  3. Error Message Exposure: Consider sanitizing error messages sent to clients
  4. Resource Access Control: Organization ID validation needs improvement

🚀 Performance Considerations

  1. Connection Pooling: Ensure database connections are properly managed
  2. Memory Usage: SSE channels and long-running goroutines could accumulate
  3. Client Timeouts: No client timeout handling for long operations

📋 Test Coverage

Issue: No test files found for these endpoints
Recommendation: Add unit tests for:

  • SSE event formatting
  • Error handling scenarios
  • Resource cleanup logic
  • Timeout scenarios

📝 Frontend Issues

TypeScript SSE Client (web/src/lib/sse-client.ts)

  1. Message Fragmentation: SSE messages can be split across TCP chunks, causing JSON parse failures
  2. Undefined Result Data: Function returns undefined if no 'complete' event received
  3. Auth Header Issue: Missing x-organization-id header when auth enabled but orgId unavailable

UI Components

  1. Commented Code: Remove commented-out progress UI blocks in dataset-upload-wizard.tsx
  2. Polling with Timeouts: Replace hardcoded delays with proper event-driven patterns
  3. Progress Handling: Consider using server-provided progress values instead of artificial increments

📝 Recommendations

Immediate Actions:

  1. Fix goroutine leaks with proper context cancellation
  2. Standardize user ID extraction method
  3. Handle JSON marshal errors
  4. Fix SSE client message fragmentation

Future Improvements:

  1. Add comprehensive test coverage
  2. Implement client timeout handling
  3. Consider using a more robust SSE library
  4. Add metrics/monitoring for long-running operations

Overall Assessment: The SSE implementation is functional and well-structured, but requires fixes for production readiness, particularly around resource management and error handling.


Contributor

@coderabbitai coderabbitai Bot left a comment


Actionable comments posted: 0

♻️ Duplicate comments (1)
web/src/components/dataset/dataset-upload-wizard.tsx (1)

1284-1299: Remove legacy commented‑out uploadProgress JSX.

The large commented block referencing uploadProgress is now obsolete given the new toast‑based SSE progress flow, and it still mentions state that no longer exists in the component. It adds noise and could confuse future readers.

Please delete the entire commented section and keep only the Create Dataset button:

-              {/* {uploadProgress.status === 'processing' && (
-      <div className="space-y-3 p-4 border rounded-lg bg-secondary/10">
-        <div className="flex items-center justify-between text-sm">
-          <span className="font-medium flex items-center gap-2">
-            <Loader2 className="h-4 w-4 animate-spin text-primary" />
-            {uploadProgress.message}
-          </span>
-          <span className="text-muted-foreground">{uploadProgress.percentage}%</span>
-        </div>
-        <Progress value={uploadProgress.percentage} className="h-2" />
-        <p className="text-xs text-muted-foreground">
-          Processing your data. Large datasets may take a moment.
-        </p>
-      </div>
-    )} */}
-                           <Button
+              <Button
                 size="sm"
                 onClick={handleCreateDataset}
                 disabled={
🧹 Nitpick comments (3)
web/src/components/dataset/dataset-upload-wizard.tsx (3)

445-447: Consider using event.progress / more SSE types instead of only synthetic increments.

Right now onProgress only handles event.type === 'status_update' and bumps currentProgress by a fixed 10%, ignoring potential progress, complete, or error events on SSEEvent. If the backend ever sends real numeric progress (event.type === 'progress' with event.progress) or richer terminal events, the UI here won’t reflect it.

You could prefer explicit progress when available and fall back to synthetic increments otherwise, e.g.:

-        onProgress: (event: SSEEvent) => {
-          if (event.type === 'status_update') {
-            currentProgress = Math.min(currentProgress + 10, 90);
+        onProgress: (event: SSEEvent) => {
+          if (event.type === 'progress' && typeof event.progress === 'number') {
+            currentProgress = Math.min(event.progress, 90);
+          } else if (event.type === 'status_update') {
+            currentProgress = Math.min(currentProgress + 10, 90);
           }

That keeps the UI future‑proof if the SSE payloads evolve.

Also applies to: 457-475


247-251: Verify Sonner toast duration behavior when repeatedly updating the same toastId.

The initial toast.loading sets { id: toastId, duration: Infinity }, but subsequent toast.loading calls with the same id omit duration. Depending on Sonner’s merge behavior, this may reset the toast to the default duration and allow it to auto‑dismiss while the SSE flow is still running.

To avoid a flaky progress UX, consider either:

  • Always passing duration: Infinity for all toast.loading(..., { id }) updates in this flow, or
  • Confirming via docs/runtime that options are merged and the original duration: Infinity is preserved on update.

Example tweak:

-      toast.loading(
+      toast.loading(
         <div className="flex flex-col gap-2 w-full min-w-[250px]">
           {/* ... */}
         </div>,
-        { id: toastId }
+        { id: toastId, duration: Infinity }
       );

Also applies to: 263-269, 292-301, 310-321, 463-472


448-456: Tighten typing instead of as { dataset: Dataset } at the SSE call site.

The cast as { dataset: Dataset } hides mismatches between what the SSE endpoint returns and what the UI expects. Given you already import Dataset from the API client, it’d be safer to let the hook expose that shape directly and drop the assertion.

For example, in useSourceDatasetSSE you could type the return as:

type SourceDatasetResponse = { dataset: Dataset };

export function useSourceDatasetSSE() {
  // ...
  const startUpload = async (params: Params): Promise<SourceDatasetResponse> => {
    // ...
  };
  return startUpload;
}

Then here you can just:

const res = await startUpload({ /* ... */ });
// res.dataset is strongly typed

That keeps the contract in one place and leverages the generated API types end‑to‑end.

Also applies to: 475-485

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 41192ad and 6531ed2.

📒 Files selected for processing (1)
  • web/src/components/dataset/dataset-upload-wizard.tsx (8 hunks)

Applied to files:

  • web/src/components/dataset/dataset-upload-wizard.tsx
🧬 Code graph analysis (1)
web/src/components/dataset/dataset-upload-wizard.tsx (2)
web/src/lib/mutations/dataset/source-dataset.ts (1)
  • useSourceDatasetSSE (59-103)
web/src/lib/sse-client.ts (1)
  • SSEEvent (3-8)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (2)
  • GitHub Check: claude-review
  • GitHub Check: Analyze (go)
🔇 Additional comments (1)
web/src/components/dataset/dataset-upload-wizard.tsx (1)

43-47: SSE upload wiring and shared toast flow look good.

Using useSourceDatasetSSE, SSEEvent, and Dataset together with a single toastId to track all phases (init → S3 upload → SSE processing → completion) is a clean design. It keeps progress UX centralized while still leaning on lib/mutations/sse-client instead of ad‑hoc fetches, which matches the repo’s guidance around shared mutations.

Based on learnings, this correctly uses preconfigured mutations and SSE utilities from @/lib/mutations and @/lib/sse-client rather than custom API calls.

Also applies to: 79-79, 241-251

@claude

claude Bot commented Dec 11, 2025

Claude finished @elliot14A's task —— View job


PR Review: Converting Upload/Refresh to SSE Endpoints

Review Tasks

  • Read repository guidelines (CLAUDE.md)
  • Analyze modified Go files for SSE implementation
  • Review frontend TypeScript SSE client implementation
  • Check code quality and best practices
  • Identify potential bugs or issues
  • Review performance considerations
  • Examine security concerns
  • Assess test coverage needs

🔍 Code Review Summary

This PR successfully converts upload and refresh endpoints to Server-Sent Events (SSE), providing real-time progress updates for long-running operations. The implementation follows good patterns but has several areas for improvement.

Strengths

  1. Consistent SSE Implementation: All 4 files follow a similar pattern with proper SSE headers and streaming
  2. Good Error Handling: Comprehensive error handling with proper cleanup mechanisms
  3. Resource Cleanup: Well-implemented cleanup functions to handle partial failures
  4. Progress Tracking: Clear status updates throughout the process
  5. Transaction Management: Proper transaction handling in S3 refresh endpoint

🚨 Critical Issues

1. Goroutine Leak Risk (High Priority)

Files: All 4 files (create.go:159, refresh.go:83, etc.)

The SSE implementation starts goroutines that could leak if the client disconnects:

go func() {
    defer close(sseChan)
    // Long-running operations...
}()

Issue: No context cancellation when client disconnects
Impact: Memory leaks and resource exhaustion
Fix: Use ctx.Context() with context.WithCancel and monitor for client disconnection

2. Inconsistent User ID Extraction (High Priority)

Files:

  • s3/create.go:102 - Uses header: ctx.Get(middleware.UserCtxKey)
  • database/create.go:113 - Uses context: ctx.Locals(middleware.UserCtxKey)
  • s3/refresh.go:61 - Uses context: ctx.Locals(middleware.UserCtxKey)

Issue: Inconsistent user ID extraction methods
Impact: Potential authentication/authorization issues
Fix: Standardize on one approach (likely ctx.Locals)

3. Ignored JSON Marshal Errors (Medium Priority)

Files: All files (e.g., database/create.go:145)

payloadBytes, _ := json.Marshal(eventPayload)

Issue: Ignored errors could cause malformed SSE messages
Impact: Client-side parsing errors
Fix: Handle JSON marshal errors properly

🔧 Other Issues

4. Hardcoded Sleep Statements

Files: Multiple files (e.g., database/create.go:200, s3/refresh.go:168)

time.Sleep(2 * time.Second) // Wait for table creation

Issue: Arbitrary delays, not deterministic
Impact: Slower performance, potential race conditions
Fix: Use proper synchronization or polling mechanisms

5. Missing Input Validation

Files: s3/create.go:100

Issue: Organization ID validation happens after getting from header instead of middleware
Impact: Security risk if header manipulation occurs
Fix: Use middleware validation consistently

6. Resource Cleanup Race Conditions

Files: Database files (database/create.go:44)

Issue: Cleanup operations aren't synchronized
Impact: Potential partial cleanup failures
Fix: Add proper error handling for cleanup operations

7. Large Channel Buffer

Files: All files (sseChan := make(chan SSEData, 10))

Issue: Fixed buffer size might not be optimal
Impact: Potential blocking or memory usage
Fix: Consider unbuffered channel or dynamic sizing

🛡️ Security Concerns

  1. SQL Injection Prevention: Good use of parameterized queries through service layer
  2. Input Validation: Proper validation using struct tags
  3. Error Message Exposure: Consider sanitizing error messages sent to clients
  4. Resource Access Control: Organization ID validation needs improvement

🚀 Performance Considerations

  1. Connection Pooling: Ensure database connections are properly managed
  2. Memory Usage: SSE channels and long-running goroutines could accumulate
  3. Client Timeouts: No client timeout handling for long operations

📋 Test Coverage

Issue: No test files found for these endpoints
Recommendation: Add unit tests for:

  • SSE event formatting
  • Error handling scenarios
  • Resource cleanup logic
  • Timeout scenarios

📝 Frontend Issues

TypeScript SSE Client (web/src/lib/sse-client.ts)

  1. Message Fragmentation: SSE messages can be split across TCP chunks, causing JSON parse failures
  2. Undefined Result Data: Function returns undefined if no 'complete' event received
  3. Auth Header Issue: The x-organization-id header is missing when auth is enabled but orgId is unavailable

UI Components

  1. Commented Code: Remove commented-out progress UI blocks in dataset-upload-wizard.tsx
  2. Polling with Timeouts: Replace hardcoded delays with proper event-driven patterns
  3. Progress Handling: Consider using server-provided progress values instead of artificial increments

📝 Recommendations

Immediate Actions:

  1. Fix goroutine leaks with proper context cancellation
  2. Standardize user ID extraction method
  3. Handle JSON marshal errors
  4. Fix SSE client message fragmentation

Future Improvements:

  1. Add comprehensive test coverage
  2. Implement client timeout handling
  3. Consider using a more robust SSE library
  4. Add metrics/monitoring for long-running operations

Overall Assessment: The SSE implementation is functional and well-structured, but requires fixes for production readiness, particularly around resource management and error handling.


Contributor

@coderabbitai coderabbitai Bot left a comment


Actionable comments posted: 0

♻️ Duplicate comments (3)
server/interfaces/http/routes/source/database/refresh.go (3)

136-140: Driver inference via string prefix is still fragile.

This issue was flagged in a previous review. The domain model should include a Driver field that is populated from the database, eliminating the need for string prefix checking.


251-256: Delete old summary before creating new one to prevent duplicates.

This issue was flagged in a previous review. The CreateDatasetSummary operation performs a plain INSERT without upsert logic and will create duplicate records. The pattern in s3/refresh.go deletes the existing summary first.


202-205: Capture request context before goroutine to enable cancellation.

Using context.Background() prevents panics but also prevents cancellation when the client disconnects. This wastes resources on abandoned operations.

Apply this diff to capture the context:

+	// Capture context before starting goroutine
+	reqCtx := ctx.Context()
+
 	// Start async refresh process
 	go func() {

Then use the captured context:

-			err = h.dbSourceSvc.Update(context.Background(), models.UpdateDatabaseSourceLastUpdatedAtParams{
+			err = h.dbSourceSvc.Update(reqCtx, models.UpdateDatabaseSourceLastUpdatedAtParams{
🧹 Nitpick comments (3)
server/interfaces/http/routes/source/database/refresh.go (3)

42-58: Consider starting SSE stream before validation for consistency.

Early validation errors return JSON responses while later errors use SSE format. While this works (SSE can't start without a valid request), it creates an inconsistent error format for clients.

If you want a purely SSE-based endpoint, consider starting the SSE stream first and sending validation errors as SSE events. However, the current approach is acceptable.


61-61: Consider increasing SSE channel buffer size.

The buffer size of 10 may be insufficient given the number of events sent during refresh (15+ status updates). While blocking is unlikely due to continuous consumption, a larger buffer (e.g., 50) provides more headroom and prevents potential blocking if the client consumes slowly.


215-215: Replace hard-coded sleep with a more robust wait mechanism.

The 2-second sleep is fragile and may cause issues if the OLAP commit takes longer or shorter than expected. Consider implementing polling with exponential backoff or a callback mechanism to detect when data is committed.

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 6531ed2 and f323c83.

📒 Files selected for processing (1)
  • server/interfaces/http/routes/source/database/refresh.go (3 hunks)
🧰 Additional context used
🧠 Learnings (3)
📓 Common learnings
Learnt from: CR
Repo: factly/gopie PR: 0
File: chat-server/CLAUDE.md:0-0
Timestamp: 2025-12-02T11:48:34.955Z
Learning: Applies to chat-server/app/api/v1/routers/**/*.py : Use Server-Sent Events for streaming real-time query progress responses
📚 Learning: 2025-12-02T11:48:34.955Z
Learnt from: CR
Repo: factly/gopie PR: 0
File: chat-server/CLAUDE.md:0-0
Timestamp: 2025-12-02T11:48:34.955Z
Learning: Applies to chat-server/app/api/v1/routers/**/*.py : Use Server-Sent Events for streaming real-time query progress responses

Applied to files:

  • server/interfaces/http/routes/source/database/refresh.go
📚 Learning: 2025-07-29T09:36:39.908Z
Learnt from: CR
Repo: factly/gopie PR: 0
File: CLAUDE.md:0-0
Timestamp: 2025-07-29T09:36:39.908Z
Learning: Applies to server/**/*.go : Store dataset files in S3-compatible storage (MinIO) in the Go backend server

Applied to files:

  • server/interfaces/http/routes/source/database/refresh.go
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (2)
  • GitHub Check: claude-review
  • GitHub Check: Analyze (go)
🔇 Additional comments (2)
server/interfaces/http/routes/source/database/refresh.go (2)

4-6: LGTM!

The added imports (bufio, context, encoding/json) are necessary for SSE streaming and event marshaling.


282-308: LGTM!

The SSE streaming implementation is correctly structured:

  • Proper SSE headers (Content-Type, Cache-Control, Connection, Transfer-Encoding)
  • Buffered writer with flush after each event
  • Error handling during streaming
  • Clean loop over channel until closed
