Skip to content

feat: connector workbench#640

Closed
altitude wants to merge 2 commits intomainfrom
feat/connector-workbench
Closed

feat: connector workbench#640
altitude wants to merge 2 commits intomainfrom
feat/connector-workbench

Conversation

@altitude
Copy link
Member

@altitude altitude commented Feb 4, 2026

No description provided.

@altitude altitude requested a review from a team as a code owner February 4, 2026 21:32
@coderabbitai
Copy link
Contributor

coderabbitai bot commented Feb 4, 2026

Walkthrough

Adds a new Connector Workbench: CLI command, in-memory engine/storage, HTTP server, debug capture/transport, schema inference and baselining, snapshot/replay tooling, introspection, test generation, task tracking, and a React-based web UI.

Changes

Cohort / File(s) Summary
CLI Integration
cmd/root.go, cmd/workbench.go
Adds top-level workbench Cobra subcommand with provider/config flags, provider listing, config validation, and signal-aware startup/shutdown.
Core Workbench & Orchestration
tools/workbench/workbench.go, tools/workbench/workbench.go (constructors/methods across package)
New Workbench type, lifecycle (Start/Stop/Wait), connector instance management, global debug transport and replayer wiring.
Engine & Runtime
tools/workbench/engine.go, tools/workbench/tasks.go
In-process execution engine replacing Temporal workflows with direct task execution, per-page fetch helpers, fetch-state tracking, task tracker with step-mode and execution history.
In-memory Storage & Responses
tools/workbench/storage.go, tools/workbench/responses.go
Thread-safe MemoryStorage with CRUD and snapshot export/import; API response converters for accounts/payments/balances.
Debugging & Transport
tools/workbench/debug.go, tools/workbench/transport.go
DebugStore capturing logs/HTTP/plugin calls with stats; DebugTransport wrapping RoundTripper to capture requests/responses, redact sensitive headers, and optionally infer schemas.
Schemas & Analysis
tools/workbench/schema.go, tools/workbench/baseline.go
JSON schema inference manager, baselining, schema diffing; Baseline manager for data snapshots, exports/imports, and diffs against live data.
Snapshots & Replay
tools/workbench/snapshots.go, tools/workbench/replay.go
SnapshotManager for HTTP exchange preservation and import/export; Replayer for replaying requests, history, comparisons, dry-run, and curl generation.
Introspection & Test Generation
tools/workbench/introspect.go, tools/workbench/testgen.go
Source introspector (file tree, Go AST symbol extraction, code search); TestGenerator to render fixtures and Go test files from saved snapshots.
Server & HTTP API
tools/workbench/server.go
Chi-based HTTP API exposing connector lifecycle, data endpoints, debug, replay, snapshots, schemas, baselines, introspection, tasks, and optional embedded UI routes.
UI Application & Assets
tools/workbench/ui/*, tools/workbench/ui/src/*
React + Vite app: entry, main App.tsx (multi-tab dashboard), typed client API, styles (index.css, App.css), Vite config, and static index.html; includes UI build config and .gitignore.
Utilities & Misc
tools/workbench/* (transport helpers, test/data utilities)
Various helpers for truncation, header copying, filename sanitization, curl generation, and export/import utilities across workbench subsystems.
Documentation
tools/workbench/README.md
README documenting architecture, usage, components, limitations, CLI and API patterns.

Sequence Diagrams

sequenceDiagram
    participant UI as User Interface
    participant Server as Workbench Server
    participant Engine as Engine
    participant Plugin as Connector Plugin
    participant Storage as MemoryStorage
    participant Debug as DebugStore

    UI->>Server: POST /api/connector/:id/run-cycle
    Server->>Engine: RunOneCycle(ctx)
    activate Engine
    Engine->>Plugin: FetchNextAccounts(pagePayload)
    activate Plugin
    Plugin->>Debug: LogPluginCall(method, input)
    Plugin-->>Engine: accounts page + hasMore
    deactivate Plugin
    Engine->>Storage: StoreAccounts(accounts)
    Storage-->>Engine: ack
    Engine->>Debug: LogPluginResult(callID, result)
    Engine->>Engine: update fetchState, spawn child tasks
    deactivate Engine
    Server->>Storage: GetAccounts()
    Storage-->>Server: accounts list
    Server-->>UI: 200 OK (accounts JSON)
Loading
sequenceDiagram
    participant Client as HTTP Client
    participant Transport as DebugTransport
    participant Remote as Remote API
    participant Debug as DebugStore
    participant Schemas as SchemaManager

    Client->>Transport: RoundTrip(request)
    activate Transport
    Transport->>Debug: LogHTTPRequest(request meta)
    Transport->>Remote: Forward request
    Remote-->>Transport: Response
    Transport->>Schemas: InferFromJSON(operation, method, body) 
    activate Schemas
    Schemas-->>Schemas: parse JSON, update schema
    deactivate Schemas
    Transport->>Debug: LogHTTPRequest(response meta)
    Transport-->>Client: response
    deactivate Transport
Loading

Estimated code review effort

🎯 5 (Critical) | ⏱️ ~120 minutes

Poem

"I nibble logs and chase each trace,
building bridges in memory's space.
Schemas, snapshots, tests in a line—
replaying requests while sipping thyme.
A rabbit's workbench: small, bright, and fine." 🐇✨

🚥 Pre-merge checks | ✅ 1 | ❌ 2
❌ Failed checks (1 warning, 1 inconclusive)
Check name Status Explanation Resolution
Docstring Coverage ⚠️ Warning Docstring coverage is 61.29% which is insufficient. The required threshold is 80.00%. Write docstrings for the functions missing them to satisfy the coverage threshold.
Description check ❓ Inconclusive No pull request description was provided by the author, making it impossible to assess whether descriptive context about the changes was intended. Add a pull request description explaining the workbench purpose, key components, and integration details to provide context for reviewers.
✅ Passed checks (1 passed)
Check name Status Explanation
Title check ✅ Passed The title clearly summarizes the main addition: a new connector workbench feature with comprehensive tooling for development and testing.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing touches
  • 📝 Generate docstrings
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Post copyable unit tests in a comment
  • Commit unit tests in branch feat/connector-workbench

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 10

Note

Due to the large number of review comments, Critical, Major severity comments were prioritized as inline comments.

🤖 Fix all issues with AI agents
In `@tools/workbench/baseline.go`:
- Around line 292-309: In comparePayment, avoid nil pointer deref by checking
base.Amount and curr.Amount before calling Cmp: if both non-nil use Cmp as
before; if one is nil and the other not, append the "amount" Change with
OldValue/NewValue set to a safe string (e.g. "nil" or curr.Amount.String() when
non-nil); if both nil, treat as equal and do nothing. Update the function
comparePayment to perform these nil checks and use a helper-safe conversion
(e.g. conditional Amount.String() calls) so Amount.String() is never called on a
nil pointer.
- Around line 229-247: In compareAccount, the DefaultAsset check currently does
a pointer comparison (base.DefaultAsset != curr.DefaultAsset) which compares
addresses not string values and can misreport changes; update the logic in
compareAccount to mirror the Name handling: first detect nil-vs-non-nil
differences, then if both non-nil compare *base.DefaultAsset and
*curr.DefaultAsset for value inequality, and set OldValue/NewValue accordingly
before appending the Change for Field "default_asset".
- Around line 359-367: In BaselineManager.compareBalance, avoid nil dereference
by first checking models.PSPBalance.Amount on both base and curr (e.g., if
base.Amount == nil && curr.Amount == nil do nothing; if one is nil treat as
change and use a "nil" placeholder for OldValue/NewValue; otherwise use
base.Amount.Cmp(curr.Amount) and Amount.String()), ensuring you only call .Cmp()
and .String() when the *big.Int is non-nil; update the Change for Field "amount"
accordingly.

In `@tools/workbench/debug.go`:
- Around line 87-91: The nextID implementation in DebugStore uses an
unsynchronized idCounter and a second-precision timestamp causing collisions;
change DebugStore.nextID to use an atomic counter (e.g., sync/atomic on
d.idCounter) and include a higher-resolution timestamp (time.Now().UnixNano())
or append the atomic value to the timestamp to guarantee uniqueness and avoid
races; update references to d.idCounter and the nextID method to use
atomic.AddUint64/LoadUint64 (or equivalent) and remove non-atomic increments to
prevent data races and ID collisions.

In `@tools/workbench/engine.go`:
- Around line 624-635: The CreateWebhooks call is using a hard-coded
WebhookBaseUrl ("http://localhost:8080/webhooks") in both e.debug.LogPluginCall
and e.plugin.CreateWebhooks; change these to use the engine's
configuration-derived base URL instead (e.g. build from e.config.ListenAddr or
an injected WebhookBaseURL field), replace the literal with the computed base +
"/webhooks", and propagate this change to any call sites that construct
CreateWebhooksRequest so both LogPluginCall and CreateWebhooks use the same
config-driven URL (refer to e.debug.LogPluginCall, e.plugin.CreateWebhooks and
the CreateWebhooksRequest WebhookBaseUrl field).
- Around line 664-816: The helpers (FetchAccountsOnePage, FetchPaymentsOnePage,
FetchBalancesOnePage) race because they read and mutate fetchState fields
outside the mutex; to fix, serialize per-key fetches by adding an in-flight
guard on fetchState (e.g. state.InFlight) and use e.mu to: 1) lock, check
state.HasMore and state.InFlight, set state.InFlight=true and snapshot
currentState and pageSize into locals, then unlock; 2) call the plugin with the
snapped values; and 3) re-lock to update state.State, state.HasMore,
state.PagesFetched, state.TotalItems and clear state.InFlight (also clear on
error) and then unlock; apply the same pattern to payments and balances and
ensure ResetFetchState interacts safely with state.InFlight.

In `@tools/workbench/server.go`:
- Around line 1067-1095: The snapshot export handler (handleExportSnapshots)
currently accepts arbitrary Directory paths from the request and passes them to
snapshots.ExportToDir; fix this by constraining all file operations to the
server's safe base directory (e.g., s.config.PersistPath or a dedicated
baseDir): resolve the requested path with filepath.Join(baseDir, req.Directory),
call filepath.Abs on both baseDir and the joined path, verify the joined path is
inside the base (e.g., using filepath.Rel and ensuring it does not start with
".."), and reject requests with a 400/403 if the check fails; apply the same
validation pattern to the snapshot import handler (the import-related methods
around 1098-1127), the test generation handlers (around 1148-1177), and anywhere
else file paths are accepted (1651-1679) before calling functions like
snapshots.ExportToDir or similar.
- Around line 536-545: handleHTTPCaptureStatus currently dereferences
transport.MaxBodySize without checking for nil; update
Server.handleHTTPCaptureStatus to safely handle a nil transport returned by
s.workbench.Transport() by computing a maxBodySize variable (default 0 or
appropriate sentinel) only when transport != nil, use transport.IsEnabled() as
now (guarding when transport is nil), and pass that maxBodySize into
s.jsonResponse along with the enabled flag so no panic occurs when Transport()
is nil.

In `@tools/workbench/storage.go`:
- Around line 350-365: The Export method (MemoryStorage.Export) currently
returns live map references causing possible "concurrent map iteration and map
write" panics; fix it by deep-copying each internal map/slice (Accounts,
Payments, Balances, ExternalAccounts, Others, States, TasksTree, WebhookConfigs)
while holding s.mu.RLock() and return the snapshot built from these copies
(preserving ExportedAt = time.Now()); ensure copies are shallow-deep as
appropriate (new maps with copied entries, and cloned nested map values if they
are maps/slices) so the caller/encoder works on independent data after the lock
is released.

In `@tools/workbench/transport.go`:
- Around line 113-134: The schema inference is using the truncated string
(entry.ResponseBody) which can produce invalid JSON; keep using the original
full byte slice read from resp.Body for InferFromJSON. After reading bodyBytes
and restoring resp.Body (in the block that calls truncateBody and sets
entry.ResponseBody), pass the original bodyBytes (not entry.ResponseBody) into
t.Schemas.InferFromJSON (ensuring you still convert to bytes when needed), and
only use truncateBody for logging/storage; reference the functions/variables
truncateBody, entry.ResponseBody, InferFromJSON, resp.Body and the surrounding
logic in transport.go to make this change.
🟡 Minor comments (16)
tools/workbench/README.md-17-23 (1)

17-23: ⚠️ Potential issue | 🟡 Minor

Add language identifiers to fenced code blocks (MD040).

markdownlint flags these fences; adding language tags improves readability and fixes lint.

🔧 Suggested patch
-```
+```text
 payments binary
     │
     ├── payments server      ← Full production mode (Temporal + PostgreSQL)
     │
     └── payments workbench   ← Lightweight dev mode (in-memory)

@@
- +text
┌─────────────────────────────────────────────────────────────────┐
│ Workbench │
@@
└─────────────────────────────────────────────────────────────────┘

@@
-```
+```text
Temporal Workflow
    └─→ Schedule activity → Retry on failure → Persist state

@@
- +text
┌─────────────────────┐
│ pkg/connector │ ← Public API for connectors
@@
└─────────────────────┘

Also applies to: 27-44, 50-53, 161-176

tools/workbench/debug.go-102-109 (1)

102-109: ⚠️ Potential issue | 🟡 Minor

Guard against nil errors in LogError.

Calling LogError with a nil error will panic.

✅ Suggested fix
 func (d *DebugStore) LogError(operation string, err error) {
+	if err == nil {
+		return
+	}
 	d.addEntry(DebugEntry{
 		Type:      DebugEntryTypeError,
 		Operation: operation,
 		Error:     err.Error(),
 	})
 }
tools/workbench/ui/src/App.tsx-564-582 (1)

564-582: ⚠️ Potential issue | 🟡 Minor

URL parsing can throw on malformed URLs.

The new URL(req.url) constructor on Lines 572 and 580 will throw if req.url is not a valid URL string. In a debug transport context, malformed URLs could occur from misconfigured connectors or error states.

🛡️ Proposed fix to handle invalid URLs gracefully
 {httpRequests.map((req) => {
+  let urlPath = req.url;
+  let urlHost = '';
+  try {
+    const parsed = new URL(req.url);
+    urlPath = parsed.pathname;
+    urlHost = parsed.host;
+  } catch {
+    // Fallback for malformed URLs
+  }
   return (
     <div 
       key={req.id}
       className={`request-entry ${req.error || (req.response_status && req.response_status >= 400) ? 'request-error' : ''} ${selectedRequest?.id === req.id ? 'selected' : ''}`}
       onClick={() => setSelectedRequest(req)}
     >
       <div className="request-header">
         <span className={`request-method method-${req.method.toLowerCase()}`}>{req.method}</span>
-        <span className="request-url">{new URL(req.url).pathname}</span>
+        <span className="request-url">{urlPath}</span>
         ...
       </div>
-      <div className="request-host">{new URL(req.url).host}</div>
+      <div className="request-host">{urlHost}</div>
     </div>
   );
 })}
tools/workbench/snapshots.go-262-291 (1)

262-291: ⚠️ Potential issue | 🟡 Minor

Surface import failures instead of silently skipping files.

Read/unmarshal errors are ignored, which can lead to partial imports without feedback. Consider returning the first error (or a summary) alongside the count.

Proposed fix
-	count := 0
+	count := 0
+	var firstErr error
 	for _, entry := range entries {
 		if entry.IsDir() || !strings.HasSuffix(entry.Name(), ".json") {
 			continue
 		}
@@
-		data, err := os.ReadFile(path)
-		if err != nil {
-			continue
-		}
+		data, err := os.ReadFile(path)
+		if err != nil {
+			if firstErr == nil {
+				firstErr = fmt.Errorf("failed to read %s: %w", entry.Name(), err)
+			}
+			continue
+		}
@@
-		if err := json.Unmarshal(data, &snapshot); err != nil {
-			continue
-		}
+		if err := json.Unmarshal(data, &snapshot); err != nil {
+			if firstErr == nil {
+				firstErr = fmt.Errorf("invalid snapshot %s: %w", entry.Name(), err)
+			}
+			continue
+		}
@@
 	}
 
-	return count, nil
+	return count, firstErr
tools/workbench/workbench.go-253-274 (1)

253-274: ⚠️ Potential issue | 🟡 Minor

PrintUsage lists endpoints without the /api prefix.

The server routes are mounted under /api, but the banner shows /status, /fetch/*, etc. Update the text to avoid confusing users.

Proposed fix
-	fmt.Println("║   GET  /status              - Workbench status                ║")
-	fmt.Println("║   POST /install             - Install connector               ║")
-	fmt.Println("║   POST /uninstall           - Uninstall connector             ║")
-	fmt.Println("║   POST /fetch/accounts      - Fetch accounts (one page)       ║")
-	fmt.Println("║   POST /fetch/balances      - Fetch balances                  ║")
-	fmt.Println("║   POST /fetch/payments      - Fetch payments (one page)       ║")
-	fmt.Println("║   POST /fetch/all           - Run full fetch cycle            ║")
-	fmt.Println("║   POST /transfer            - Create a transfer               ║")
-	fmt.Println("║   POST /payout              - Create a payout                 ║")
-	fmt.Println("║   GET  /data/accounts       - List fetched accounts           ║")
-	fmt.Println("║   GET  /data/payments       - List fetched payments           ║")
-	fmt.Println("║   GET  /data/balances       - List fetched balances           ║")
-	fmt.Println("║   GET  /debug/logs          - View debug logs                 ║")
-	fmt.Println("║   GET  /debug/state         - View connector state            ║")
-	fmt.Println("║   GET  /debug/requests      - View HTTP requests to PSP       ║")
+	fmt.Println("║   GET  /api/status              - Workbench status            ║")
+	fmt.Println("║   POST /api/install             - Install connector           ║")
+	fmt.Println("║   POST /api/uninstall           - Uninstall connector         ║")
+	fmt.Println("║   POST /api/fetch/accounts      - Fetch accounts (one page)   ║")
+	fmt.Println("║   POST /api/fetch/balances      - Fetch balances              ║")
+	fmt.Println("║   POST /api/fetch/payments      - Fetch payments (one page)   ║")
+	fmt.Println("║   POST /api/fetch/all           - Run full fetch cycle        ║")
+	fmt.Println("║   POST /api/transfer            - Create a transfer           ║")
+	fmt.Println("║   POST /api/payout              - Create a payout             ║")
+	fmt.Println("║   GET  /api/data/accounts       - List fetched accounts       ║")
+	fmt.Println("║   GET  /api/data/payments       - List fetched payments       ║")
+	fmt.Println("║   GET  /api/data/balances       - List fetched balances       ║")
+	fmt.Println("║   GET  /api/debug/logs          - View debug logs             ║")
+	fmt.Println("║   GET  /api/debug/state         - View connector state        ║")
+	fmt.Println("║   GET  /api/debug/requests      - View HTTP requests to PSP   ║")
tools/workbench/server.go-981-999 (1)

981-999: ⚠️ Potential issue | 🟡 Minor

Avoid empty snapshot names to prevent export collisions.

CreateSnapshot accepts an empty name; export uses the name as filename, so empty/duplicate names can overwrite .json files. Consider requiring name (and operation) or defaulting to a timestamp-based name.

Proposed fix
 	var snapshot Snapshot
 	if err := json.NewDecoder(r.Body).Decode(&snapshot); err != nil {
 		s.errorResponse(w, http.StatusBadRequest, "invalid request: "+err.Error())
 		return
 	}
+	if snapshot.Name == "" {
+		snapshot.Name = fmt.Sprintf("snapshot-%s", time.Now().Format("20060102-150405"))
+	}
+	if snapshot.Operation == "" {
+		s.errorResponse(w, http.StatusBadRequest, "operation is required")
+		return
+	}
tools/workbench/snapshots.go-232-257 (1)

232-257: ⚠️ Potential issue | 🟡 Minor

Prevent snapshot export filename collisions.

sanitizeFilename(s.Name) can be empty or duplicated, which can overwrite .json files. Consider falling back to ID and/or appending the snapshot ID to ensure uniqueness.

Proposed fix
-		filename := sanitizeFilename(s.Name) + ".json"
-		path := filepath.Join(dir, filename)
+		filename := sanitizeFilename(s.Name)
+		if filename == "" {
+			filename = "snapshot"
+		}
+		filename = fmt.Sprintf("%s-%s.json", filename, s.ID)
+		path := filepath.Join(dir, filename)
tools/workbench/server.go-290-307 (1)

290-307: ⚠️ Potential issue | 🟡 Minor

Return 400 on invalid JSON payloads for fetch endpoints.

Decode errors are ignored, so malformed JSON silently becomes an empty from_payload, which is hard to debug. Consider validating request bodies and returning 400; apply the same pattern to the payments and balances handlers.

Proposed fix (apply similarly to payments/balances)
 func (s *Server) handleFetchAccounts(w http.ResponseWriter, r *http.Request) {
 	var req fetchRequest
 	if r.Body != nil {
-		json.NewDecoder(r.Body).Decode(&req)
+		if err := json.NewDecoder(r.Body).Decode(&req); err != nil && err != io.EOF {
+			s.errorResponse(w, http.StatusBadRequest, "invalid request: "+err.Error())
+			return
+		}
 	}

Also applies to: 309-326, 328-345

tools/workbench/server.go-817-834 (1)

817-834: ⚠️ Potential issue | 🟡 Minor

Replay modifications are skipped for chunked requests.

The ContentLength > 0 guard ignores bodies when the length is unknown (chunked), so client modifications are silently dropped. Consider decoding and treating io.EOF as “no modifications”.

Proposed fix
 	// Parse optional modifications
 	var modifications *ReplayRequest
-	if r.ContentLength > 0 {
-		modifications = &ReplayRequest{}
-		if err := json.NewDecoder(r.Body).Decode(modifications); err != nil {
-			s.errorResponse(w, http.StatusBadRequest, "invalid modifications: "+err.Error())
-			return
-		}
-	}
+	if r.Body != nil {
+		var mods ReplayRequest
+		if err := json.NewDecoder(r.Body).Decode(&mods); err == nil {
+			modifications = &mods
+		} else if err != io.EOF {
+			s.errorResponse(w, http.StatusBadRequest, "invalid modifications: "+err.Error())
+			return
+		}
+	}
tools/workbench/server.go-46-57 (1)

46-57: ⚠️ Potential issue | 🟡 Minor

Remove AllowCredentials: true or restrict AllowedOrigins to explicit origins.

The combination of AllowedOrigins: ["*"] and AllowCredentials: true is invalid per the CORS specification—browsers will reject credentialed cross-origin requests when Access-Control-Allow-Origin is a wildcard. Since the workbench UI is embedded and served from the same origin, credentials are not needed. Either remove AllowCredentials: true or, if credentials are truly required, use an explicit allowlist of origins instead of "*".

tools/workbench/schema.go-264-267 (1)

264-267: ⚠️ Potential issue | 🟡 Minor

Same error handling gap in SaveAllBaselines.

Lines 265 and 267 also discard JSON errors. Consider adding error handling or at minimum logging failures, perhaps returning a count of successful saves vs failures.

tools/workbench/replay.go-227-237 (1)

227-237: ⚠️ Potential issue | 🟡 Minor

Returning pointer to slice element may cause data races.

GetReplayByID returns &r.history[i], a pointer into the internal slice. If another goroutine calls Replay while the caller uses this pointer, and addToHistory truncates the slice (Line 205), the pointer may reference invalid memory or a different entry.

Consider returning a copy instead:

🔧 Proposed fix to return a copy
 func (r *Replayer) GetReplayByID(id string) *ReplayResponse {
   r.mu.RLock()
   defer r.mu.RUnlock()

   for i := range r.history {
     if r.history[i].ID == id {
-      return &r.history[i]
+      resp := r.history[i] // copy
+      return &resp
     }
   }
   return nil
 }
tools/workbench/introspect.go-246-254 (1)

246-254: ⚠️ Potential issue | 🟡 Minor

Path traversal check may be bypassable on certain platforms.

The path security check at Line 252 compares against i.basePath, but i.basePath is set via filepath.Abs() in NewIntrospector without canonicalization of symlinks. If fullPath resolves through a symlink that escapes the base directory, the HasPrefix check could pass while accessing files outside the intended directory.

Consider using filepath.EvalSymlinks on both paths before comparison for robust protection:

🛡️ Proposed fix for symlink-aware path validation
 // Security: ensure path doesn't escape connector directory
 fullPath := filepath.Join(i.basePath, relPath)
-fullPath, err := filepath.Abs(fullPath)
+fullPath, err := filepath.EvalSymlinks(fullPath)
 if err != nil {
   return nil, err
 }
-if !strings.HasPrefix(fullPath, i.basePath) {
+basePath, err := filepath.EvalSymlinks(i.basePath)
+if err != nil {
+  return nil, err
+}
+if !strings.HasPrefix(fullPath, basePath) {
   return nil, fmt.Errorf("invalid path")
 }
tools/workbench/schema.go-248-256 (1)

248-256: ⚠️ Potential issue | 🟡 Minor

JSON marshal/unmarshal errors are silently ignored in SaveBaseline.

Lines 249 and 251 perform JSON round-trip for deep copy but discard potential errors. While json.Marshal on these structures is unlikely to fail, ignoring errors masks potential issues and deviates from the pattern used elsewhere in this file.

🔧 Proposed fix to handle errors
 // Deep copy the schema
-data, _ := json.Marshal(schema)
+data, err := json.Marshal(schema)
+if err != nil {
+  return fmt.Errorf("failed to marshal schema: %w", err)
+}
 var baseline InferredSchema
-json.Unmarshal(data, &baseline)
+if err := json.Unmarshal(data, &baseline); err != nil {
+  return fmt.Errorf("failed to unmarshal schema: %w", err)
+}
 baseline.ID = fmt.Sprintf("baseline-%s-%d", operation, time.Now().UnixNano())
tools/workbench/introspect.go-470-470 (1)

470-470: ⚠️ Potential issue | 🟡 Minor

Error from filepath.Rel is silently ignored (same pattern as Line 413).

Consider handling this consistently across the file.

tools/workbench/introspect.go-413-413 (1)

413-413: ⚠️ Potential issue | 🟡 Minor

Error from filepath.Rel is silently ignored.

If filepath.Rel fails, relPath will contain an empty string or garbage value, leading to incorrect file paths in the methods slice.

🔧 Proposed fix to handle the error
-relPath, _ := filepath.Rel(i.basePath, path)
+relPath, err := filepath.Rel(i.basePath, path)
+if err != nil {
+  return nil
+}
🧹 Nitpick comments (8)
cmd/workbench.go (2)

167-178: Non-deterministic output order when listing providers.

Map iteration order in Go is non-deterministic. When users run --list-providers multiple times, the providers may appear in different orders, which could be confusing.

🔧 Proposed fix for consistent ordering
 func printProviders() error {
 	configs := registry.GetConfigs(true) // include debug connectors

 	fmt.Println("Available connector providers:")
 	fmt.Println()

+	// Sort providers for consistent output
+	providers := make([]string, 0, len(configs))
+	for provider := range configs {
+		providers = append(providers, provider)
+	}
+	sort.Strings(providers)
+
-	for provider, config := range configs {
+	for _, provider := range providers {
+		config := configs[provider]
 		fmt.Printf("  %s\n", provider)
 		fmt.Printf("    Config parameters:\n")
 		for paramName, param := range config {

183-185: Empty init function.

The empty init() function with a comment suggests future work. Consider either removing it or adding a TODO comment with more context about what's planned.

tools/workbench/ui/src/App.css (1)

208-212: Fixed viewport height calculations.

Multiple components use height: calc(100vh - 200px) which assumes a fixed header/nav height of 200px. If the header height changes or content wraps differently at various viewport widths, this could cause layout issues.

Consider using CSS custom properties for the header height to make maintenance easier:

:root {
  --header-height: 200px;
}

.data-tab {
  height: calc(100vh - var(--header-height));
}
tools/workbench/ui/src/App.tsx (1)

1-2043: Large single-file component.

This file contains 2000+ lines with multiple components. While functional, consider splitting into separate files for better maintainability:

  • components/Dashboard.tsx
  • components/Debug.tsx
  • components/Tasks.tsx
  • components/Snapshots.tsx
  • components/Analysis.tsx
  • components/Code.tsx

This is a recommended refactor for future iterations, not a blocker for this PR.

tools/workbench/tasks.go (1)

299-341: Child task attachment may be ambiguous with multiple parents.

AddChildTask selects the parent by type only; if the root tree has multiple nodes of the same type, children can attach to the wrong parent. Consider matching by name/from-payload or passing a parent ID to disambiguate.

tools/workbench/introspect.go (1)

381-387: Loop variable shadows outer receiver variable i.

At Line 382, the loop uses i as the index variable, which shadows the method receiver i *Introspector. While this works correctly because the shadowed i is only used within the loop scope, it reduces code clarity and could cause confusion during maintenance.

✏️ Rename loop variable to avoid shadowing
 methods := make([]MethodInfo, len(pluginMethods))
-for i, m := range pluginMethods {
-  methods[i] = MethodInfo{
+for idx, m := range pluginMethods {
+  methods[idx] = MethodInfo{
     Name:        m,
     Implemented: false,
   }
 }
tools/workbench/replay.go (1)

367-392: Method field is not escaped in curl command.

Header values and body are properly escaped with single-quote handling, but req.Method (Line 371) is written directly without escaping. While HTTP methods are typically simple strings, for completeness consider quoting or validating the method.

✏️ Optional: Quote the method for consistency
 func (r *Replayer) CreateCurlCommand(req ReplayRequest) string {
   var b bytes.Buffer
-  b.WriteString("curl -X ")
-  b.WriteString(req.Method)
+  b.WriteString("curl -X '")
+  b.WriteString(strings.ReplaceAll(req.Method, "'", "'\\''"))
+  b.WriteString("'")
tools/workbench/baseline.go (1)

399-408: Use strings.Join from standard library instead of custom implementation.

This function replicates strings.Join from the strings package which is already imported in schema.go but not in this file.

♻️ Replace with standard library

Add to imports:

"strings"

Then replace usage at Line 396:

-return fmt.Sprintf("%s", join(parts, " | "))
+return strings.Join(parts, " | ")

And remove the custom join function (Lines 399-408).

Comment on lines +229 to +247
func (m *BaselineManager) compareAccount(base, curr models.PSPAccount) []Change {
var changes []Change

if base.DefaultAsset != curr.DefaultAsset {
changes = append(changes, Change{Field: "default_asset", OldValue: base.DefaultAsset, NewValue: curr.DefaultAsset})
}
if (base.Name == nil) != (curr.Name == nil) || (base.Name != nil && curr.Name != nil && *base.Name != *curr.Name) {
var oldVal, newVal interface{}
if base.Name != nil {
oldVal = *base.Name
}
if curr.Name != nil {
newVal = *curr.Name
}
changes = append(changes, Change{Field: "name", OldValue: oldVal, NewValue: newVal})
}

return changes
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Pointer comparison for DefaultAsset may cause nil pointer dereference.

At Line 232, comparing base.DefaultAsset != curr.DefaultAsset performs pointer comparison when the underlying types are *string. This compares addresses, not values. If both are non-nil but point to equal strings at different addresses, this will incorrectly report a change. If one is nil and you dereference later, it would panic (though not in this specific code path).

Based on the relevant snippet showing DefaultAsset *string, you need value comparison:

🐛 Proposed fix for proper pointer comparison
 func (m *BaselineManager) compareAccount(base, curr models.PSPAccount) []Change {
   var changes []Change

-  if base.DefaultAsset != curr.DefaultAsset {
-    changes = append(changes, Change{Field: "default_asset", OldValue: base.DefaultAsset, NewValue: curr.DefaultAsset})
+  if (base.DefaultAsset == nil) != (curr.DefaultAsset == nil) ||
+     (base.DefaultAsset != nil && curr.DefaultAsset != nil && *base.DefaultAsset != *curr.DefaultAsset) {
+    var oldVal, newVal interface{}
+    if base.DefaultAsset != nil {
+      oldVal = *base.DefaultAsset
+    }
+    if curr.DefaultAsset != nil {
+      newVal = *curr.DefaultAsset
+    }
+    changes = append(changes, Change{Field: "default_asset", OldValue: oldVal, NewValue: newVal})
   }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
func (m *BaselineManager) compareAccount(base, curr models.PSPAccount) []Change {
var changes []Change
if base.DefaultAsset != curr.DefaultAsset {
changes = append(changes, Change{Field: "default_asset", OldValue: base.DefaultAsset, NewValue: curr.DefaultAsset})
}
if (base.Name == nil) != (curr.Name == nil) || (base.Name != nil && curr.Name != nil && *base.Name != *curr.Name) {
var oldVal, newVal interface{}
if base.Name != nil {
oldVal = *base.Name
}
if curr.Name != nil {
newVal = *curr.Name
}
changes = append(changes, Change{Field: "name", OldValue: oldVal, NewValue: newVal})
}
return changes
}
func (m *BaselineManager) compareAccount(base, curr models.PSPAccount) []Change {
var changes []Change
if (base.DefaultAsset == nil) != (curr.DefaultAsset == nil) ||
(base.DefaultAsset != nil && curr.DefaultAsset != nil && *base.DefaultAsset != *curr.DefaultAsset) {
var oldVal, newVal interface{}
if base.DefaultAsset != nil {
oldVal = *base.DefaultAsset
}
if curr.DefaultAsset != nil {
newVal = *curr.DefaultAsset
}
changes = append(changes, Change{Field: "default_asset", OldValue: oldVal, NewValue: newVal})
}
if (base.Name == nil) != (curr.Name == nil) || (base.Name != nil && curr.Name != nil && *base.Name != *curr.Name) {
var oldVal, newVal interface{}
if base.Name != nil {
oldVal = *base.Name
}
if curr.Name != nil {
newVal = *curr.Name
}
changes = append(changes, Change{Field: "name", OldValue: oldVal, NewValue: newVal})
}
return changes
}
🤖 Prompt for AI Agents
In `@tools/workbench/baseline.go` around lines 229 - 247, In compareAccount, the
DefaultAsset check currently does a pointer comparison (base.DefaultAsset !=
curr.DefaultAsset) which compares addresses not string values and can misreport
changes; update the logic in compareAccount to mirror the Name handling: first
detect nil-vs-non-nil differences, then if both non-nil compare
*base.DefaultAsset and *curr.DefaultAsset for value inequality, and set
OldValue/NewValue accordingly before appending the Change for Field
"default_asset".

Comment on lines +292 to +309
func (m *BaselineManager) comparePayment(base, curr models.PSPPayment) []Change {
var changes []Change

if base.Amount.Cmp(curr.Amount) != 0 {
changes = append(changes, Change{Field: "amount", OldValue: base.Amount.String(), NewValue: curr.Amount.String()})
}
if base.Asset != curr.Asset {
changes = append(changes, Change{Field: "asset", OldValue: base.Asset, NewValue: curr.Asset})
}
if base.Status != curr.Status {
changes = append(changes, Change{Field: "status", OldValue: base.Status, NewValue: curr.Status})
}
if base.Type != curr.Type {
changes = append(changes, Change{Field: "type", OldValue: base.Type, NewValue: curr.Type})
}

return changes
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

Potential nil pointer dereference when comparing payment amounts.

Line 295 calls base.Amount.Cmp(curr.Amount) without nil checks. According to the model definition, Amount *big.Int is a pointer that could be nil. This will panic if either amount is nil.

🐛 Proposed fix to add nil checks
 func (m *BaselineManager) comparePayment(base, curr models.PSPPayment) []Change {
   var changes []Change

-  if base.Amount.Cmp(curr.Amount) != 0 {
-    changes = append(changes, Change{Field: "amount", OldValue: base.Amount.String(), NewValue: curr.Amount.String()})
+  baseAmountNil := base.Amount == nil
+  currAmountNil := curr.Amount == nil
+  if baseAmountNil != currAmountNil || (!baseAmountNil && !currAmountNil && base.Amount.Cmp(curr.Amount) != 0) {
+    var oldVal, newVal interface{}
+    if base.Amount != nil {
+      oldVal = base.Amount.String()
+    }
+    if curr.Amount != nil {
+      newVal = curr.Amount.String()
+    }
+    changes = append(changes, Change{Field: "amount", OldValue: oldVal, NewValue: newVal})
   }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
func (m *BaselineManager) comparePayment(base, curr models.PSPPayment) []Change {
var changes []Change
if base.Amount.Cmp(curr.Amount) != 0 {
changes = append(changes, Change{Field: "amount", OldValue: base.Amount.String(), NewValue: curr.Amount.String()})
}
if base.Asset != curr.Asset {
changes = append(changes, Change{Field: "asset", OldValue: base.Asset, NewValue: curr.Asset})
}
if base.Status != curr.Status {
changes = append(changes, Change{Field: "status", OldValue: base.Status, NewValue: curr.Status})
}
if base.Type != curr.Type {
changes = append(changes, Change{Field: "type", OldValue: base.Type, NewValue: curr.Type})
}
return changes
}
func (m *BaselineManager) comparePayment(base, curr models.PSPPayment) []Change {
var changes []Change
baseAmountNil := base.Amount == nil
currAmountNil := curr.Amount == nil
if baseAmountNil != currAmountNil || (!baseAmountNil && !currAmountNil && base.Amount.Cmp(curr.Amount) != 0) {
var oldVal, newVal interface{}
if base.Amount != nil {
oldVal = base.Amount.String()
}
if curr.Amount != nil {
newVal = curr.Amount.String()
}
changes = append(changes, Change{Field: "amount", OldValue: oldVal, NewValue: newVal})
}
if base.Asset != curr.Asset {
changes = append(changes, Change{Field: "asset", OldValue: base.Asset, NewValue: curr.Asset})
}
if base.Status != curr.Status {
changes = append(changes, Change{Field: "status", OldValue: base.Status, NewValue: curr.Status})
}
if base.Type != curr.Type {
changes = append(changes, Change{Field: "type", OldValue: base.Type, NewValue: curr.Type})
}
return changes
}
🤖 Prompt for AI Agents
In `@tools/workbench/baseline.go` around lines 292 - 309, In comparePayment, avoid
nil pointer deref by checking base.Amount and curr.Amount before calling Cmp: if
both non-nil use Cmp as before; if one is nil and the other not, append the
"amount" Change with OldValue/NewValue set to a safe string (e.g. "nil" or
curr.Amount.String() when non-nil); if both nil, treat as equal and do nothing.
Update the function comparePayment to perform these nil checks and use a
helper-safe conversion (e.g. conditional Amount.String() calls) so
Amount.String() is never called on a nil pointer.

Comment on lines +359 to +367
func (m *BaselineManager) compareBalance(base, curr models.PSPBalance) []Change {
var changes []Change

if base.Amount.Cmp(curr.Amount) != 0 {
changes = append(changes, Change{Field: "amount", OldValue: base.Amount.String(), NewValue: curr.Amount.String()})
}

return changes
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

Same nil pointer dereference risk for balance amounts.

Line 362 has the same issue as comparePayment - calling .Cmp() on potentially nil *big.Int pointers.

🐛 Apply the same nil-safe comparison pattern
 func (m *BaselineManager) compareBalance(base, curr models.PSPBalance) []Change {
   var changes []Change

-  if base.Amount.Cmp(curr.Amount) != 0 {
-    changes = append(changes, Change{Field: "amount", OldValue: base.Amount.String(), NewValue: curr.Amount.String()})
+  baseAmountNil := base.Amount == nil
+  currAmountNil := curr.Amount == nil
+  if baseAmountNil != currAmountNil || (!baseAmountNil && !currAmountNil && base.Amount.Cmp(curr.Amount) != 0) {
+    var oldVal, newVal interface{}
+    if base.Amount != nil {
+      oldVal = base.Amount.String()
+    }
+    if curr.Amount != nil {
+      newVal = curr.Amount.String()
+    }
+    changes = append(changes, Change{Field: "amount", OldValue: oldVal, NewValue: newVal})
   }

   return changes
 }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
func (m *BaselineManager) compareBalance(base, curr models.PSPBalance) []Change {
var changes []Change
if base.Amount.Cmp(curr.Amount) != 0 {
changes = append(changes, Change{Field: "amount", OldValue: base.Amount.String(), NewValue: curr.Amount.String()})
}
return changes
}
func (m *BaselineManager) compareBalance(base, curr models.PSPBalance) []Change {
var changes []Change
baseAmountNil := base.Amount == nil
currAmountNil := curr.Amount == nil
if baseAmountNil != currAmountNil || (!baseAmountNil && !currAmountNil && base.Amount.Cmp(curr.Amount) != 0) {
var oldVal, newVal interface{}
if base.Amount != nil {
oldVal = base.Amount.String()
}
if curr.Amount != nil {
newVal = curr.Amount.String()
}
changes = append(changes, Change{Field: "amount", OldValue: oldVal, NewValue: newVal})
}
return changes
}
🤖 Prompt for AI Agents
In `@tools/workbench/baseline.go` around lines 359 - 367, In
BaselineManager.compareBalance, avoid nil dereference by first checking
models.PSPBalance.Amount on both base and curr (e.g., if base.Amount == nil &&
curr.Amount == nil do nothing; if one is nil treat as change and use a "nil"
placeholder for OldValue/NewValue; otherwise use base.Amount.Cmp(curr.Amount)
and Amount.String()), ensuring you only call .Cmp() and .String() when the
*big.Int is non-nil; update the Change for Field "amount" accordingly.

Comment on lines +87 to +91
// nextID generates the next unique ID.
func (d *DebugStore) nextID() string {
d.idCounter++
return time.Now().Format("20060102-150405") + "-" + string(rune('A'+d.idCounter%26))
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Fix ID collisions and data races in nextID.

IDs currently repeat after 26 events/second and idCounter is unsynchronized, so concurrent logging can race and produce duplicates. This breaks lookups by ID and can corrupt debug correlation.

🔒 Suggested fix (atomic counter + nanosecond timestamp)
 import (
 	"encoding/json"
+	"fmt"
 	"sync"
+	"sync/atomic"
 	"time"
 )
@@
 func (d *DebugStore) nextID() string {
-	d.idCounter++
-	return time.Now().Format("20060102-150405") + "-" + string(rune('A'+d.idCounter%26))
+	n := atomic.AddInt64(&d.idCounter, 1)
+	return fmt.Sprintf("%d-%d", time.Now().UnixNano(), n)
 }

Also applies to: 205-216

🤖 Prompt for AI Agents
In `@tools/workbench/debug.go` around lines 87 - 91, The nextID implementation in
DebugStore uses an unsynchronized idCounter and a second-precision timestamp
causing collisions; change DebugStore.nextID to use an atomic counter (e.g.,
sync/atomic on d.idCounter) and include a higher-resolution timestamp
(time.Now().UnixNano()) or append the atomic value to the timestamp to guarantee
uniqueness and avoid races; update references to d.idCounter and the nextID
method to use atomic.AddUint64/LoadUint64 (or equivalent) and remove non-atomic
increments to prevent data races and ID collisions.

Comment on lines +624 to +635
callID := e.debug.LogPluginCall("CreateWebhooks", models.CreateWebhooksRequest{
FromPayload: fromPayload,
ConnectorID: e.connectorID.String(),
WebhookBaseUrl: "http://localhost:8080/webhooks",
})
start := time.Now()

resp, err := e.plugin.CreateWebhooks(ctx, models.CreateWebhooksRequest{
FromPayload: fromPayload,
ConnectorID: e.connectorID.String(),
WebhookBaseUrl: "http://localhost:8080/webhooks",
})
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Webhook base URL is hard-coded to localhost:8080.

This breaks when ListenAddr is changed, leading to incorrect webhook registrations. Consider injecting the base URL from Config into the engine.

Proposed fix (engine-side; update call sites accordingly)
 type Engine struct {
@@
 	logger      logging.Logger
+	webhookBaseURL string
@@
-func NewEngine(
+func NewEngine(
 	connectorID models.ConnectorID,
 	plugin models.Plugin,
 	storage *MemoryStorage,
 	debug *DebugStore,
 	tasks *TaskTracker,
 	logger logging.Logger,
+	webhookBaseURL string,
 ) *Engine {
 	return &Engine{
@@
-		logger:               logger,
+		logger:               logger,
+		webhookBaseURL:       webhookBaseURL,
@@
 	callID := e.debug.LogPluginCall("CreateWebhooks", models.CreateWebhooksRequest{
@@
-		WebhookBaseUrl: "http://localhost:8080/webhooks",
+		WebhookBaseUrl: e.webhookBaseURL,
 	})
@@
 	resp, err := e.plugin.CreateWebhooks(ctx, models.CreateWebhooksRequest{
@@
-		WebhookBaseUrl: "http://localhost:8080/webhooks",
+		WebhookBaseUrl: e.webhookBaseURL,
 	})
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
callID := e.debug.LogPluginCall("CreateWebhooks", models.CreateWebhooksRequest{
FromPayload: fromPayload,
ConnectorID: e.connectorID.String(),
WebhookBaseUrl: "http://localhost:8080/webhooks",
})
start := time.Now()
resp, err := e.plugin.CreateWebhooks(ctx, models.CreateWebhooksRequest{
FromPayload: fromPayload,
ConnectorID: e.connectorID.String(),
WebhookBaseUrl: "http://localhost:8080/webhooks",
})
callID := e.debug.LogPluginCall("CreateWebhooks", models.CreateWebhooksRequest{
FromPayload: fromPayload,
ConnectorID: e.connectorID.String(),
WebhookBaseUrl: e.webhookBaseURL,
})
start := time.Now()
resp, err := e.plugin.CreateWebhooks(ctx, models.CreateWebhooksRequest{
FromPayload: fromPayload,
ConnectorID: e.connectorID.String(),
WebhookBaseUrl: e.webhookBaseURL,
})
🤖 Prompt for AI Agents
In `@tools/workbench/engine.go` around lines 624 - 635, The CreateWebhooks call is
using a hard-coded WebhookBaseUrl ("http://localhost:8080/webhooks") in both
e.debug.LogPluginCall and e.plugin.CreateWebhooks; change these to use the
engine's configuration-derived base URL instead (e.g. build from
e.config.ListenAddr or an injected WebhookBaseURL field), replace the literal
with the computed base + "/webhooks", and propagate this change to any call
sites that construct CreateWebhooksRequest so both LogPluginCall and
CreateWebhooks use the same config-driven URL (refer to e.debug.LogPluginCall,
e.plugin.CreateWebhooks and the CreateWebhooksRequest WebhookBaseUrl field).

Comment on lines +664 to +816
// FetchAccountsOnePage fetches one page of accounts.
func (e *Engine) FetchAccountsOnePage(ctx context.Context, fromPayload json.RawMessage) (*models.FetchNextAccountsResponse, error) {
e.mu.Lock()
state := e.accountsFetchState
if state == nil {
state = &fetchState{HasMore: true}
e.accountsFetchState = state
}
currentState := state.State
e.mu.Unlock()

if !state.HasMore {
return &models.FetchNextAccountsResponse{HasMore: false}, nil
}

callID := e.debug.LogPluginCall("FetchNextAccounts", models.FetchNextAccountsRequest{
FromPayload: fromPayload,
State: currentState,
PageSize: e.pageSize,
})
start := time.Now()

resp, err := e.plugin.FetchNextAccounts(ctx, models.FetchNextAccountsRequest{
FromPayload: fromPayload,
State: currentState,
PageSize: e.pageSize,
})

e.debug.LogPluginResult(callID, resp, time.Since(start), err)

if err != nil {
return nil, err
}

e.mu.Lock()
state.State = resp.NewState
state.HasMore = resp.HasMore
state.PagesFetched++
state.TotalItems += len(resp.Accounts)
e.mu.Unlock()

if len(resp.Accounts) > 0 {
e.storage.StoreAccounts(resp.Accounts)
}

return &resp, nil
}

// FetchPaymentsOnePage fetches one page of payments.
func (e *Engine) FetchPaymentsOnePage(ctx context.Context, fromPayload json.RawMessage) (*models.FetchNextPaymentsResponse, error) {
key := string(fromPayload)
if key == "" {
key = "_root"
}

e.mu.Lock()
state, ok := e.paymentsFetchState[key]
if !ok {
state = &fetchState{HasMore: true}
e.paymentsFetchState[key] = state
}
currentState := state.State
e.mu.Unlock()

if !state.HasMore {
return &models.FetchNextPaymentsResponse{HasMore: false}, nil
}

callID := e.debug.LogPluginCall("FetchNextPayments", models.FetchNextPaymentsRequest{
FromPayload: fromPayload,
State: currentState,
PageSize: e.pageSize,
})
start := time.Now()

resp, err := e.plugin.FetchNextPayments(ctx, models.FetchNextPaymentsRequest{
FromPayload: fromPayload,
State: currentState,
PageSize: e.pageSize,
})

e.debug.LogPluginResult(callID, resp, time.Since(start), err)

if err != nil {
return nil, err
}

e.mu.Lock()
state.State = resp.NewState
state.HasMore = resp.HasMore
state.PagesFetched++
state.TotalItems += len(resp.Payments)
e.mu.Unlock()

if len(resp.Payments) > 0 {
e.storage.StorePayments(resp.Payments)
}

return &resp, nil
}

// FetchBalancesOnePage fetches one page of balances.
func (e *Engine) FetchBalancesOnePage(ctx context.Context, fromPayload json.RawMessage) (*models.FetchNextBalancesResponse, error) {
key := string(fromPayload)
if key == "" {
key = "_root"
}

e.mu.Lock()
state, ok := e.balancesFetchState[key]
if !ok {
state = &fetchState{HasMore: true}
e.balancesFetchState[key] = state
}
currentState := state.State
e.mu.Unlock()

if !state.HasMore {
return &models.FetchNextBalancesResponse{HasMore: false}, nil
}

callID := e.debug.LogPluginCall("FetchNextBalances", models.FetchNextBalancesRequest{
FromPayload: fromPayload,
State: currentState,
PageSize: e.pageSize,
})
start := time.Now()

resp, err := e.plugin.FetchNextBalances(ctx, models.FetchNextBalancesRequest{
FromPayload: fromPayload,
State: currentState,
PageSize: e.pageSize,
})

e.debug.LogPluginResult(callID, resp, time.Since(start), err)

if err != nil {
return nil, err
}

e.mu.Lock()
state.State = resp.NewState
state.HasMore = resp.HasMore
state.PagesFetched++
state.TotalItems += len(resp.Balances)
e.mu.Unlock()

if len(resp.Balances) > 0 {
e.storage.StoreBalances(resp.Balances)
}

return &resp, nil
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

FetchOnePage helpers are not concurrency-safe.

state fields are read outside the lock, and concurrent calls (or ResetFetchState) can race, causing duplicate page fetches or corrupted state. Please serialize per-key fetches and snapshot pageSize under lock before calling the plugin. Apply the same pattern to the payments and balances helpers.

Proposed fix (pattern; apply to payments/balances)
 type fetchState struct {
 	State    json.RawMessage
 	HasMore  bool
 	PagesFetched int
 	TotalItems   int
+	InFlight     bool
 }
@@
 func (e *Engine) FetchAccountsOnePage(ctx context.Context, fromPayload json.RawMessage) (*models.FetchNextAccountsResponse, error) {
 	e.mu.Lock()
 	state := e.accountsFetchState
 	if state == nil {
 		state = &fetchState{HasMore: true}
 		e.accountsFetchState = state
 	}
-	currentState := state.State
-	e.mu.Unlock()
+	if state.InFlight {
+		e.mu.Unlock()
+		return nil, fmt.Errorf("fetch already in progress")
+	}
+	if !state.HasMore {
+		e.mu.Unlock()
+		return &models.FetchNextAccountsResponse{HasMore: false}, nil
+	}
+	state.InFlight = true
+	currentState := state.State
+	pageSize := e.pageSize
+	e.mu.Unlock()
+
+	defer func() {
+		e.mu.Lock()
+		state.InFlight = false
+		e.mu.Unlock()
+	}()
 
-	if !state.HasMore {
-		return &models.FetchNextAccountsResponse{HasMore: false}, nil
-	}
-
 	callID := e.debug.LogPluginCall("FetchNextAccounts", models.FetchNextAccountsRequest{
 		FromPayload: fromPayload,
 		State:       currentState,
-		PageSize:    e.pageSize,
+		PageSize:    pageSize,
 	})
@@
 	resp, err := e.plugin.FetchNextAccounts(ctx, models.FetchNextAccountsRequest{
 		FromPayload: fromPayload,
 		State:       currentState,
-		PageSize:    e.pageSize,
+		PageSize:    pageSize,
 	})
🤖 Prompt for AI Agents
In `@tools/workbench/engine.go` around lines 664 - 816, The helpers
(FetchAccountsOnePage, FetchPaymentsOnePage, FetchBalancesOnePage) race because
they read and mutate fetchState fields outside the mutex; to fix, serialize
per-key fetches by adding an in-flight guard on fetchState (e.g. state.InFlight)
and use e.mu to: 1) lock, check state.HasMore and state.InFlight, set
state.InFlight=true and snapshot currentState and pageSize into locals, then
unlock; 2) call the plugin with the snapped values; and 3) re-lock to update
state.State, state.HasMore, state.PagesFetched, state.TotalItems and clear
state.InFlight (also clear on error) and then unlock; apply the same pattern to
payments and balances and ensure ResetFetchState interacts safely with
state.InFlight.

Comment on lines +536 to +545
func (s *Server) handleHTTPCaptureStatus(w http.ResponseWriter, r *http.Request) {
transport := s.workbench.Transport()
enabled := false
if transport != nil {
enabled = transport.IsEnabled()
}
s.jsonResponse(w, http.StatusOK, map[string]interface{}{
"enabled": enabled,
"max_body_size": transport.MaxBodySize,
})
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

Guard against nil transport in /debug/http-capture.

If Transport() returns nil, dereferencing transport.MaxBodySize will panic and crash the server.

Proposed fix
 func (s *Server) handleHTTPCaptureStatus(w http.ResponseWriter, r *http.Request) {
 	transport := s.workbench.Transport()
-	enabled := false
-	if transport != nil {
-		enabled = transport.IsEnabled()
-	}
+	enabled := false
+	maxBodySize := 0
+	if transport != nil {
+		enabled = transport.IsEnabled()
+		maxBodySize = transport.MaxBodySize
+	}
 	s.jsonResponse(w, http.StatusOK, map[string]interface{}{
-		"enabled":       enabled,
-		"max_body_size": transport.MaxBodySize,
+		"enabled":       enabled,
+		"max_body_size": maxBodySize,
 	})
 }
🤖 Prompt for AI Agents
In `@tools/workbench/server.go` around lines 536 - 545, handleHTTPCaptureStatus
currently dereferences transport.MaxBodySize without checking for nil; update
Server.handleHTTPCaptureStatus to safely handle a nil transport returned by
s.workbench.Transport() by computing a maxBodySize variable (default 0 or
appropriate sentinel) only when transport != nil, use transport.IsEnabled() as
now (guarding when transport is nil), and pass that maxBodySize into
s.jsonResponse along with the enabled flag so no panic occurs when Transport()
is nil.

Comment on lines 1067 to 1095
func (s *Server) handleExportSnapshots(w http.ResponseWriter, r *http.Request) {
snapshots := s.workbench.Snapshots()
if snapshots == nil {
s.errorResponse(w, http.StatusInternalServerError, "snapshots not available")
return
}

var req struct {
Directory string `json:"directory"`
}
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
s.errorResponse(w, http.StatusBadRequest, "invalid request: "+err.Error())
return
}

if req.Directory == "" {
s.errorResponse(w, http.StatusBadRequest, "directory is required")
return
}

if err := snapshots.ExportToDir(req.Directory); err != nil {
s.errorResponse(w, http.StatusInternalServerError, err.Error())
return
}

s.jsonResponse(w, http.StatusOK, map[string]string{
"status": "exported",
"directory": req.Directory,
})
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Constrain export/import/test paths to a safe base directory.

Directory paths are taken directly from the request and used for read/write. If the server is bound to a non-local address, this enables arbitrary filesystem access. Suggest restricting to Config.PersistPath (or a dedicated base dir) and rejecting paths outside it; apply to snapshot export/import and test generation.

Example guard (apply similarly to import/test generation)
 	if req.Directory == "" {
 		s.errorResponse(w, http.StatusBadRequest, "directory is required")
 		return
 	}
+	baseDir := s.workbench.Config().PersistPath
+	if baseDir == "" {
+		s.errorResponse(w, http.StatusBadRequest, "persist_path is not configured")
+		return
+	}
+	dir := filepath.Clean(req.Directory)
+	if !filepath.IsAbs(dir) {
+		dir = filepath.Join(baseDir, dir)
+	}
+	baseDir = filepath.Clean(baseDir)
+	if dir != baseDir && !strings.HasPrefix(dir, baseDir+string(os.PathSeparator)) {
+		s.errorResponse(w, http.StatusBadRequest, "directory must be within persist_path")
+		return
+	}
 
-	if err := snapshots.ExportToDir(req.Directory); err != nil {
+	if err := snapshots.ExportToDir(dir); err != nil {
 		s.errorResponse(w, http.StatusInternalServerError, err.Error())
 		return
 	}

Also applies to: 1098-1127, 1148-1177, 1651-1679

🤖 Prompt for AI Agents
In `@tools/workbench/server.go` around lines 1067 - 1095, The snapshot export
handler (handleExportSnapshots) currently accepts arbitrary Directory paths from
the request and passes them to snapshots.ExportToDir; fix this by constraining
all file operations to the server's safe base directory (e.g.,
s.config.PersistPath or a dedicated baseDir): resolve the requested path with
filepath.Join(baseDir, req.Directory), call filepath.Abs on both baseDir and the
joined path, verify the joined path is inside the base (e.g., using filepath.Rel
and ensuring it does not start with ".."), and reject requests with a 400/403 if
the check fails; apply the same validation pattern to the snapshot import
handler (the import-related methods around 1098-1127), the test generation
handlers (around 1148-1177), and anywhere else file paths are accepted
(1651-1679) before calling functions like snapshots.ExportToDir or similar.

Comment on lines +350 to +365
// Export exports all storage data as a snapshot.
func (s *MemoryStorage) Export() StorageSnapshot {
s.mu.RLock()
defer s.mu.RUnlock()

return StorageSnapshot{
Accounts: s.accounts,
Payments: s.payments,
Balances: s.balances,
ExternalAccounts: s.externalAccounts,
Others: s.others,
States: s.states,
TasksTree: s.tasksTree,
WebhookConfigs: s.webhookConfigs,
ExportedAt: time.Now(),
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

Export returns live maps and can panic under concurrent writes.

Export() returns internal map references, and handleExport encodes after the lock is released. If any writer updates a map concurrently, JSON encoding can panic with “concurrent map iteration and map write”. Please deep-copy maps under the lock before returning.

Proposed fix (deep-copy maps inside Export)
 func (s *MemoryStorage) Export() StorageSnapshot {
 	s.mu.RLock()
 	defer s.mu.RUnlock()
 
+	accounts := make(map[string]models.PSPAccount, len(s.accounts))
+	for k, v := range s.accounts {
+		accounts[k] = v
+	}
+	payments := make(map[string]models.PSPPayment, len(s.payments))
+	for k, v := range s.payments {
+		payments[k] = v
+	}
+	balances := make(map[string]models.PSPBalance, len(s.balances))
+	for k, v := range s.balances {
+		balances[k] = v
+	}
+	externalAccounts := make(map[string]models.PSPAccount, len(s.externalAccounts))
+	for k, v := range s.externalAccounts {
+		externalAccounts[k] = v
+	}
+	others := make(map[string][]models.PSPOther, len(s.others))
+	for k, v := range s.others {
+		others[k] = append([]models.PSPOther(nil), v...)
+	}
+	states := make(map[string]json.RawMessage, len(s.states))
+	for k, v := range s.states {
+		states[k] = append(json.RawMessage(nil), v...)
+	}
+	webhooks := append([]models.PSPWebhookConfig(nil), s.webhookConfigs...)
+
 	return StorageSnapshot{
-		Accounts:         s.accounts,
-		Payments:         s.payments,
-		Balances:         s.balances,
-		ExternalAccounts: s.externalAccounts,
-		Others:           s.others,
-		States:           s.states,
+		Accounts:         accounts,
+		Payments:         payments,
+		Balances:         balances,
+		ExternalAccounts: externalAccounts,
+		Others:           others,
+		States:           states,
 		TasksTree:        s.tasksTree,
-		WebhookConfigs:   s.webhookConfigs,
+		WebhookConfigs:   webhooks,
 		ExportedAt:       time.Now(),
 	}
 }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
// Export exports all storage data as a snapshot.
func (s *MemoryStorage) Export() StorageSnapshot {
s.mu.RLock()
defer s.mu.RUnlock()
return StorageSnapshot{
Accounts: s.accounts,
Payments: s.payments,
Balances: s.balances,
ExternalAccounts: s.externalAccounts,
Others: s.others,
States: s.states,
TasksTree: s.tasksTree,
WebhookConfigs: s.webhookConfigs,
ExportedAt: time.Now(),
}
// Export exports all storage data as a snapshot.
func (s *MemoryStorage) Export() StorageSnapshot {
s.mu.RLock()
defer s.mu.RUnlock()
accounts := make(map[string]models.PSPAccount, len(s.accounts))
for k, v := range s.accounts {
accounts[k] = v
}
payments := make(map[string]models.PSPPayment, len(s.payments))
for k, v := range s.payments {
payments[k] = v
}
balances := make(map[string]models.PSPBalance, len(s.balances))
for k, v := range s.balances {
balances[k] = v
}
externalAccounts := make(map[string]models.PSPAccount, len(s.externalAccounts))
for k, v := range s.externalAccounts {
externalAccounts[k] = v
}
others := make(map[string][]models.PSPOther, len(s.others))
for k, v := range s.others {
others[k] = append([]models.PSPOther(nil), v...)
}
states := make(map[string]json.RawMessage, len(s.states))
for k, v := range s.states {
states[k] = append(json.RawMessage(nil), v...)
}
webhooks := append([]models.PSPWebhookConfig(nil), s.webhookConfigs...)
return StorageSnapshot{
Accounts: accounts,
Payments: payments,
Balances: balances,
ExternalAccounts: externalAccounts,
Others: others,
States: states,
TasksTree: s.tasksTree,
WebhookConfigs: webhooks,
ExportedAt: time.Now(),
}
}
🤖 Prompt for AI Agents
In `@tools/workbench/storage.go` around lines 350 - 365, The Export method
(MemoryStorage.Export) currently returns live map references causing possible
"concurrent map iteration and map write" panics; fix it by deep-copying each
internal map/slice (Accounts, Payments, Balances, ExternalAccounts, Others,
States, TasksTree, WebhookConfigs) while holding s.mu.RLock() and return the
snapshot built from these copies (preserving ExportedAt = time.Now()); ensure
copies are shallow-deep as appropriate (new maps with copied entries, and cloned
nested map values if they are maps/slices) so the caller/encoder works on
independent data after the lock is released.

Comment on lines +113 to +134
// Capture response body
if resp.Body != nil {
bodyBytes, err := io.ReadAll(resp.Body)
if err == nil {
resp.Body.Close()
// Restore the body for the caller
resp.Body = io.NopCloser(bytes.NewReader(bodyBytes))
entry.ResponseBody = t.truncateBody(bodyBytes)
}
}

t.Debug.LogHTTPRequest(entry)

// Auto-infer schema from JSON responses
if t.Schemas != nil && resp.StatusCode >= 200 && resp.StatusCode < 300 {
contentType := resp.Header.Get("Content-Type")
if strings.Contains(contentType, "application/json") && entry.ResponseBody != "" {
// Extract operation from URL path
operation := extractOperationFromURL(req.URL.Path)
t.Schemas.InferFromJSON(operation, req.URL.Path, req.Method, []byte(entry.ResponseBody))
}
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Schema inference may receive truncated JSON body.

The response body is truncated by truncateBody() on Line 120 before being stored in entry.ResponseBody. However, on Line 132, this potentially truncated body is passed to InferFromJSON(). If the body exceeds MaxBodySize (64KB), the schema inference will receive invalid/incomplete JSON, leading to parsing errors or incorrect schema detection.

🐛 Proposed fix to use original body for schema inference
 	// Capture response body
 	if resp.Body != nil {
 		bodyBytes, err := io.ReadAll(resp.Body)
 		if err == nil {
 			resp.Body.Close()
 			// Restore the body for the caller
 			resp.Body = io.NopCloser(bytes.NewReader(bodyBytes))
 			entry.ResponseBody = t.truncateBody(bodyBytes)
+
+			// Auto-infer schema from JSON responses (use original body, not truncated)
+			if t.Schemas != nil && resp.StatusCode >= 200 && resp.StatusCode < 300 {
+				contentType := resp.Header.Get("Content-Type")
+				if strings.Contains(contentType, "application/json") && len(bodyBytes) > 0 {
+					operation := extractOperationFromURL(req.URL.Path)
+					t.Schemas.InferFromJSON(operation, req.URL.Path, req.Method, bodyBytes)
+				}
+			}
 		}
 	}

 	t.Debug.LogHTTPRequest(entry)

-	// Auto-infer schema from JSON responses
-	if t.Schemas != nil && resp.StatusCode >= 200 && resp.StatusCode < 300 {
-		contentType := resp.Header.Get("Content-Type")
-		if strings.Contains(contentType, "application/json") && entry.ResponseBody != "" {
-			// Extract operation from URL path
-			operation := extractOperationFromURL(req.URL.Path)
-			t.Schemas.InferFromJSON(operation, req.URL.Path, req.Method, []byte(entry.ResponseBody))
-		}
-	}
-
 	return resp, nil
🤖 Prompt for AI Agents
In `@tools/workbench/transport.go` around lines 113 - 134, The schema inference is
using the truncated string (entry.ResponseBody) which can produce invalid JSON;
keep using the original full byte slice read from resp.Body for InferFromJSON.
After reading bodyBytes and restoring resp.Body (in the block that calls
truncateBody and sets entry.ResponseBody), pass the original bodyBytes (not
entry.ResponseBody) into t.Schemas.InferFromJSON (ensuring you still convert to
bytes when needed), and only use truncateBody for logging/storage; reference the
functions/variables truncateBody, entry.ResponseBody, InferFromJSON, resp.Body
and the surrounding logic in transport.go to make this change.

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 3

🤖 Fix all issues with AI agents
In `@tools/workbench/engine.go`:
- Around line 264-267: When WaitForStep() returns false you currently return
immediately leaving the task and stats in a running state; before returning from
each such block (the one shown and the similar blocks at the other locations)
mark the execution/task as stopped/failed or skipped and update the runner stats
and currentTask accordingly (e.tasks, WaitForStep(), e.currentTask and whatever
stats structure e uses) — e.g. set the task status to skipped/failed, increment
the failed/skipped counter, decrement any running counter and clear
e.currentTask — then return the error; apply this change to the shown block and
the other locations listed.
- Around line 98-103: SetPageSize currently allows zero or negative values which
can cause invalid requests or loops; modify Engine.SetPageSize to guard against
non-positive inputs by clamping the incoming size to a safe minimum (e.g., if
size < 1 then size = 1) before assigning e.pageSize while still holding e.mu;
update any callers or tests if they rely on the old behavior. Reference:
Engine.SetPageSize, e.mu, and e.pageSize.

In `@tools/workbench/workbench.go`:
- Around line 244-246: The transport's Schemas field is being overwritten on
each CreateConnector call (w.transport.Schemas = schemas), causing only the last
connector's SchemaManager to be used; change the logic in CreateConnector to
merge the new connector's SchemaManager/schemas into the transport's existing
Schemas instead of replacing them — e.g., retrieve w.transport.Schemas, combine
or append entries from the connector's SchemaManager (or convert to a map keyed
by connector ID) so all connectors' schemas are preserved and available for
auto-inference; update any related initialization in CreateConnector and ensure
SchemaManager merging respects deduplication and connector identity.
🧹 Nitpick comments (10)
tools/workbench/introspect.go (1)

40-46: Ignored error from filepath.Abs.

The error from filepath.Abs at Line 43 is discarded. While unlikely to fail for valid paths, consider logging or handling the error for robustness.

Optional: handle the error
 	for _, p := range possiblePaths {
 		if info, err := os.Stat(p); err == nil && info.IsDir() {
-			basePath, _ = filepath.Abs(p)
+			absPath, err := filepath.Abs(p)
+			if err == nil {
+				basePath = absPath
+			}
 			break
 		}
 	}
tools/workbench/ui/src/api.ts (1)

636-640: Mutable module-level state may cause issues in concurrent contexts.

selectedConnectorId is exported as mutable module state. While this works for single-page apps, it could lead to race conditions if multiple parts of the UI update it simultaneously. Consider using React context or a state management library for cleaner state handling.

tools/workbench/server.go (2)

50-57: Permissive CORS configuration with credentials.

AllowedOrigins: []string{"*"} combined with AllowCredentials: true is overly permissive. While acceptable for a local development tool, ensure the server is only bound to localhost in production usage.

Consider restricting CORS for non-localhost bindings
+	// For development workbench, restrict CORS when not on localhost
+	allowedOrigins := []string{"*"}
+	if !strings.HasPrefix(cfg.ListenAddr, "127.0.0.1") && !strings.HasPrefix(cfg.ListenAddr, "localhost") {
+		allowedOrigins = []string{"http://localhost:*", "http://127.0.0.1:*"}
+	}
 	r.Use(cors.Handler(cors.Options{
-		AllowedOrigins:   []string{"*"},
+		AllowedOrigins:   allowedOrigins,
 		AllowedMethods:   []string{"GET", "POST", "PUT", "DELETE", "OPTIONS"},

482-485: Silently ignoring JSON decode errors.

When r.Body != nil, the decode error is ignored. If the body contains malformed JSON, the request proceeds with zero-value req. Consider returning an error for invalid JSON.

Proposed fix
 	var req fetchRequest
-	if r.Body != nil {
-		json.NewDecoder(r.Body).Decode(&req)
+	if r.Body != nil && r.ContentLength > 0 {
+		if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
+			s.errorResponse(w, http.StatusBadRequest, "invalid request body: "+err.Error())
+			return
+		}
 	}

Also applies to lines 507-510, 532-535, 557-560.

tools/workbench/workbench.go (2)

214-218: Truncated UUID may cause collisions.

uuid.New().String()[:8] produces only 8 hex characters (~4 bytes of entropy). While collision probability is low for typical workbench usage, consider using the full UUID or a longer prefix for robustness.


29-31: Global mutable state for debug transport.

globalDebugTransport is a package-level variable that could complicate testing and concurrent workbench instances. For a development tool this is acceptable, but note this limits running multiple Workbench instances in the same process.

tools/workbench/storage.go (2)

257-263: GetOthers returns internal slice reference.

Returning s.others[name] directly exposes the internal slice to callers who could mutate it. Consider returning a copy for safety.

Proposed fix
 func (s *MemoryStorage) GetOthers(name string) []models.PSPOther {
 	s.mu.RLock()
 	defer s.mu.RUnlock()

-	return s.others[name]
+	src := s.others[name]
+	if src == nil {
+		return nil
+	}
+	return append([]models.PSPOther(nil), src...)
 }

351-357: GetWebhookConfigs returns internal slice reference.

Similar to GetOthers, this returns a reference to the internal slice that callers could mutate.

Proposed fix
 func (s *MemoryStorage) GetWebhookConfigs() []models.PSPWebhookConfig {
 	s.mu.RLock()
 	defer s.mu.RUnlock()

-	return s.webhookConfigs
+	if s.webhookConfigs == nil {
+		return nil
+	}
+	return append([]models.PSPWebhookConfig(nil), s.webhookConfigs...)
 }
tools/workbench/tasks.go (2)

143-149: Avoid exposing the mutable task tree directly.
Returning the internal slice allows callers to mutate nodes without locks, risking data races or state corruption. Prefer returning a copy or a summary view.

Proposed fix (shallow copy)
 func (t *TaskTracker) GetTaskTree() []*TaskNode {
 	t.mu.RLock()
 	defer t.mu.RUnlock()
-
-	return t.rootTree
+	out := make([]*TaskNode, len(t.rootTree))
+	copy(out, t.rootTree)
+	return out
 }

379-405: Consider clearing step signals on reset.
If a step signal is already queued, the next WaitForStep after Reset can auto-advance unintentionally. Recreating or draining stepChan/stopChan here would keep resets clean.

Optional reset tweak
 func (t *TaskTracker) Reset() {
 	t.mu.Lock()
 	defer t.mu.Unlock()
@@
 	t.executions = make([]TaskExecution, 0)
 	t.currentTask = nil
 	t.stats = TaskStats{}
+	t.stepChan = make(chan struct{}, 1)
+	t.stopChan = make(chan struct{})
 }

Comment on lines +98 to +103
// SetPageSize sets the page size for fetching.
func (e *Engine) SetPageSize(size int) {
e.mu.Lock()
defer e.mu.Unlock()
e.pageSize = size
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

Guard against non-positive page sizes.
SetPageSize(0) (or negative) can lead to invalid plugin requests or loops. Consider clamping or rejecting invalid values.

Possible fix
 func (e *Engine) SetPageSize(size int) {
 	e.mu.Lock()
 	defer e.mu.Unlock()
-	e.pageSize = size
+	if size <= 0 {
+		e.pageSize = DefaultPageSize
+		return
+	}
+	e.pageSize = size
 }
🤖 Prompt for AI Agents
In `@tools/workbench/engine.go` around lines 98 - 103, SetPageSize currently
allows zero or negative values which can cause invalid requests or loops; modify
Engine.SetPageSize to guard against non-positive inputs by clamping the incoming
size to a safe minimum (e.g., if size < 1 then size = 1) before assigning
e.pageSize while still holding e.mu; update any callers or tests if they rely on
the old behavior. Reference: Engine.SetPageSize, e.mu, and e.pageSize.

Comment on lines +264 to +267
// Wait for step signal if in step mode
if e.tasks != nil && !e.tasks.WaitForStep() {
return fmt.Errorf("execution stopped")
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

Step-mode stop path leaves tasks “running.”
When WaitForStep() returns false, you return without completing the current task, leaving currentTask and stats in a running state. Consider marking the exec as failed/skipped before returning.

Example fix (apply to each block)
-	if e.tasks != nil && !e.tasks.WaitForStep() {
-		return fmt.Errorf("execution stopped")
-	}
+	if e.tasks != nil && !e.tasks.WaitForStep() {
+		if exec != nil {
+			e.tasks.CompleteTask(exec, 0, false, fmt.Errorf("execution stopped"))
+		}
+		return fmt.Errorf("execution stopped")
+	}

Also applies to: 355-357, 432-434, 509-511, 591-593, 671-674

🤖 Prompt for AI Agents
In `@tools/workbench/engine.go` around lines 264 - 267, When WaitForStep() returns
false you currently return immediately leaving the task and stats in a running
state; before returning from each such block (the one shown and the similar
blocks at the other locations) mark the execution/task as stopped/failed or
skipped and update the runner stats and currentTask accordingly (e.tasks,
WaitForStep(), e.currentTask and whatever stats structure e uses) — e.g. set the
task status to skipped/failed, increment the failed/skipped counter, decrement
any running counter and clear e.currentTask — then return the error; apply this
change to the shown block and the other locations listed.

Comment on lines +244 to +246
// Link schemas to transport for auto-inference
w.transport.Schemas = schemas

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

Only last connector's SchemaManager is linked to transport.

At Line 245, w.transport.Schemas = schemas overwrites the previous assignment on each CreateConnector call. This means only the most recently created connector will have its schemas auto-inferred from HTTP traffic. Consider using a multi-connector aware schema inference approach.

🤖 Prompt for AI Agents
In `@tools/workbench/workbench.go` around lines 244 - 246, The transport's Schemas
field is being overwritten on each CreateConnector call (w.transport.Schemas =
schemas), causing only the last connector's SchemaManager to be used; change the
logic in CreateConnector to merge the new connector's SchemaManager/schemas into
the transport's existing Schemas instead of replacing them — e.g., retrieve
w.transport.Schemas, combine or append entries from the connector's
SchemaManager (or convert to a map keyed by connector ID) so all connectors'
schemas are preserved and available for auto-inference; update any related
initialization in CreateConnector and ensure SchemaManager merging respects
deduplication and connector identity.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also, why is schema stored in the transport if it's also stored in the connector instance?

"syscall"

"github.com/formancehq/go-libs/v3/logging"
"github.com/formancehq/payments/internal/connectors/plugins/registry"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Breaks the CI, can be removed for local import

"github.com/go-chi/cors"
)

//go:embed ui/dist/*
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Payment does not build anymore by itself because we now need to compile the UI, that's not great.

In the workbench README we should probably add:

  • install the dependencies and build the UI with
    cd tools/workbench/ui && npm install && npm run build

@altitude altitude closed this Feb 6, 2026
Copy link
Contributor

@laouji laouji left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't have time to review everything in detail, but I'm not super enthusiastic about the kind of stateful hell that reimplementing a database in memory implies.

There's lots of optimistic locking throughout which is not very concurrency friendly. In a dev environment, I understand this is less of a problem, but if that's the case why have multi connector handling at all?

I fear that having such a stateful application, where state is not very carefully managed, will mean that it'll be hard for a developer to know "is this a bug in my connector?" or "is this a bug in the cache in the workbench?"

Is the motivation for not having an external database that "docker is too difficult"?

Going back to my suggestion about connectors working sort of like "lambdas", can we instead imagine a way to develop connectors that doesn't involve storing state at all?

Maybe if a developer just needs to check that the HTTP requests against the PSP work, we can effectively install/uninstall connectors in each request and instead manage things like cursors as part of request parameters that the developer can manipulate at will.

This stateless input/output approach also would make testing new connectors more simple -> we could design a test suite devs could use to to validate that a newly developed connector works (provided they have the API keys).

"github.com/formancehq/go-libs/v3/logging"
"github.com/formancehq/payments/internal/connectors/plugins/registry"
"github.com/formancehq/payments/internal/models"
pkgregistry "github.com/formancehq/payments/pkg/registry"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This doesn't appear to be a package that exists?


// getConnector retrieves the connector from request context
func (s *Server) getConnector(r *http.Request) *ConnectorInstance {
conn, _ := r.Context().Value(connectorCtxKey).(*ConnectorInstance)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm apprehensive about storing something fetched from an internal cache in context. Context is more designed for a top down approach (eg. the caller can share things with its children).

I don't have a full overview of how you are using connectors between requests, but I think this is likely to cause more headaches and bizarre side effects than it's worth.

I recommend just giving handlers direct access to the cache.

originalTransport http.RoundTripper

// Multi-connector management
connectors map[string]*ConnectorInstance
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any chance you can reuse or expand on the connectors.Manager which is already used to manage loading / unloading connectors into memory from the registry?

Comment on lines +244 to +246
// Link schemas to transport for auto-inference
w.transport.Schemas = schemas

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also, why is schema stored in the transport if it's also stored in the connector instance?

return
}

if err := conn.engine.RunOneCycle(r.Context()); err != nil {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Considering that this is a recursive function it's likely to timeout for test accounts that have a lot of accounts / data in general.

The underlying for loops should probably add ctx.Done checks to be able to exit early in the case that the client stops listening.

}

// TaskTracker tracks task execution for the workbench.
type TaskTracker struct {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The design of this interface seems way too bloated and convoluted.

Seems fair that a task tracker might have a pointer to its own task executions, but then for some reason each task node also has a list of executions attached to it that never seems to be referenced anywhere else?

There is tons of locking and some functions return pointers, which mean that the callers can actually modify in flight data without acquiring the necessary locks.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants