-
-
Couldn't load subscription status.
- Fork 6
first attempt at making a smooth stream of model output -- need to re… #329
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Conversation
…move "removal 1" comments
|
@92jpascal is attempting to deploy a commit to the QCX-MAIN Team on Vercel. A member of the Team first needs to authorize it. |
|
|
PR Compliance Guide 🔍Below is a summary of compliance checks for this PR:
Compliance status legend🟢 - Fully Compliant🟡 - Partial Compliant 🔴 - Not Compliant ⚪ - Requires Further Human Verification 🏷️ - Compliance label |
|||||||||||||||||||
WalkthroughParallelizes answer generation by introducing two concurrent stages and awaiting both via Promise.allSettled. Updates researcher agent to accept a dynamic system prompt and adjust UI streaming behavior. Allows search tool to accept an injected stream. Adds new concurrency utilities for limiting tasks, budgeting with timeouts, and token gating. Changes
Sequence Diagram(s)sequenceDiagram
autonumber
actor U as User
participant A as app/actions.tsx
participant R as Researcher
participant Q as QuerySuggestor
participant S as Subsequent Logic
U->>A: Trigger action
note over A: Create AbortController<br/>Start two stages in parallel
par Stage 1
A->>R: researcher(dynamicSystemPrompt, streams, messages,...)
R-->>A: Streamed text + done()
and Stage 2
A->>Q: querySuggestor(...)
Q-->>A: Suggestions or error (caught)
end
A->>A: Promise.allSettled([R, Q])
alt Both settled
A->>S: Run follow-up (API outcomes, length checks)
S-->>U: Final response
else Abort/timeout
A-->>U: Terminate/handle error
end
Estimated code review effort🎯 3 (Moderate) | ⏱️ ~25 minutes Possibly related PRs
Suggested reviewers
Poem
Pre-merge checks and finishing touches❌ Failed checks (1 warning, 1 inconclusive)
✅ Passed checks (1 passed)
✨ Finishing touches
🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
PR Code Suggestions ✨Explore these optional code suggestions:
|
|||||||||||||||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 6
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (5)
app/actions.tsx (4)
370-389: Writer fallback should use the locally mutated messages (with tool calls/results), not aiState.get().messages.Using aiState drops tool context (you removed in-loop AI state updates). Use the local
messages(mutated by researcher) so writer has full context.- const modifiedMessages = aiState - .get() - .messages.map(msg => - msg.role === 'tool' - ? { - ...msg, - role: 'assistant', - content: JSON.stringify(msg.content), - type: 'tool' - } - : msg - ) as CoreMessage[] - const latestMessages = modifiedMessages.slice(maxMessages * -1) + // Use the locally built conversation including tool calls/results from researcher + const latestMessages = (messages as CoreMessage[]).slice(maxMessages * -1) answer = await writer( currentSystemPrompt, uiStream, streamText, latestMessages )
390-391: Avoid double-ending the same stream.
researcher()already callsstreamText.done(). This extra call is redundant and can cause no-op warnings.- } else { - streamText.done() - } + }
339-359: Persist tool outputs to AI state once (post-stage) to retain UI on reload/share.You removed real-time tool appends. Without persisting tool results, Search/Retrieve/Video sections won’t reconstruct from saved state. After both stages settle, flush
toolOutputsintoaiState.await Promise.allSettled([stage1, stage2]) + + // Persist tool outputs so UI can be reconstructed from AI state later. + if (toolOutputs.length > 0) { + for (const output of toolOutputs) { + aiState.update({ + ...aiState.get(), + messages: [ + ...aiState.get().messages, + { + id: groupeId, + role: 'tool', + content: JSON.stringify(output.result), + name: output.toolName, + type: 'tool' + } + ] + }) + } + }Also applies to: 367-368
294-295: Finish isGenerating with a boolean.Be consistent: set to false.
- isGenerating.done() + isGenerating.done(false)lib/agents/tools/search.tsx (1)
49-53: Fix missing closing quote and complete the stream on errors to avoid a hanging section.- fullResponse += `\nAn error occurred while searching for "${query}.` + fullResponse += `\nAn error occurred while searching for "${query}".` uiStream.update( <Card className="p-4 mt-2 text-sm"> {`An error occurred while searching for "${query}".`} </Card> ) + // Complete the injected or local stream so UI doesn't hang + try { streamResults.done(JSON.stringify({ error: true })) } catch {} return searchResult
📜 Review details
Configuration used: CodeRabbit UI
Review profile: ASSERTIVE
Plan: Pro
📒 Files selected for processing (4)
app/actions.tsx(3 hunks)lib/agents/researcher.tsx(4 hunks)lib/agents/tools/search.tsx(1 hunks)lib/utils/concurrency.ts(1 hunks)
🧰 Additional context used
🧬 Code graph analysis (2)
app/actions.tsx (1)
lib/agents/query-suggestor.tsx (1)
querySuggestor(8-50)
lib/agents/researcher.tsx (4)
lib/agents/writer.tsx (1)
writer(7-51)lib/agents/inquire.tsx (1)
inquire(12-54)lib/agents/tools/index.tsx (1)
ToolProps(7-11)lib/agents/tools/geospatial.tsx (1)
uiStream(150-337)
🔇 Additional comments (1)
app/actions.tsx (1)
140-140: Fix Markdown link formatting (extra space breaks the link).[suggest_minor_issue]
- : `QCX-Terra is a model garden of pixel level precision geospatial foundational models for efficient land feature predictions from satellite imagery. Available for our Pro and Enterprise customers. [QCX Pricing] (https://www.queue.cx/#pricing)`; + : `QCX-Terra is a model garden of pixel level precision geospatial foundational models for efficient land feature predictions from satellite imagery. Available for our Pro and Enterprise customers. [QCX Pricing](https://www.queue.cx/#pricing)`;
| ) { | ||
| */ | ||
| // Split and stage producers | ||
| const abortController = new AbortController() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🧹 Nitpick | 🔵 Trivial
Remove or wire AbortController.
Declared but unused. Either pass its signal into a budget/timeout helper or remove it.
- const abortController = new AbortController()
+ // If cancellation will be supported, pass AbortSignal into withBudget/agents; otherwise omit.📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| const abortController = new AbortController() | |
| // If cancellation will be supported, pass AbortSignal into withBudget/agents; otherwise omit. |
🤖 Prompt for AI Agents
In app/actions.tsx around line 325, an AbortController is declared but never
used; either remove the unused AbortController declaration or wire it into the
async operation that needs cancellation by passing abortController.signal to the
fetch/budget/timeout helper (or storing it to call abort() where appropriate).
If you keep it, thread the signal into the request/timeouter function and ensure
you handle AbortError in the catch; otherwise delete the AbortController
declaration to eliminate the unused variable.
| const stage2 = (async () => { | ||
| try { | ||
| await querySuggestor(uiStream, messages) | ||
| } catch {} | ||
| })() | ||
|
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🛠️ Refactor suggestion | 🟠 Major
Don’t run querySuggestor twice; capture stage2 result and reuse to avoid duplicate “Related” UI and extra API calls.
Stage2 already runs querySuggestor, but it’s called again later. Capture the stage2 result and only call again if stage2 failed.
- const stage2 = (async () => {
- try {
- await querySuggestor(uiStream, messages)
- } catch {}
- })()
+ let relatedFromStage2: RelatedQueries | null = null
+ const stage2 = (async () => {
+ try {
+ relatedFromStage2 = await querySuggestor(uiStream, messages)
+ } catch (err) {
+ console.warn('querySuggestor failed (stage2):', err)
+ }
+ })()
await Promise.allSettled([stage1, stage2])
- if (!errorOccurred) {
- const relatedQueries = await querySuggestor(uiStream, messages)
+ if (!errorOccurred) {
+ // If stage2 already produced related queries (and UI), reuse them; otherwise fetch once.
+ const relatedQueries = relatedFromStage2 ?? await querySuggestor(uiStream, messages)
uiStream.append(
<Section title="Follow-up">
<FollowupPanel />
</Section>
)
@@
{
id: groupeId,
role: 'assistant',
content: JSON.stringify(relatedQueries),
type: 'related'
},Also applies to: 393-427
🤖 Prompt for AI Agents
In app/actions.tsx around lines 361-366 (and similarly 393-427), the current
code immediately invokes an async IIFE that calls querySuggestor and then later
calls querySuggestor again, causing duplicate "Related" UI and extra API calls;
change the pattern to capture the stage2 promise (e.g., assign the invoked
promise to a variable) and reuse its result where querySuggestor is called
later: await the captured stage2 promise when you need its outcome, and only
call querySuggestor again as a fallback if that awaited promise rejected or
returned an explicit failure. Ensure you preserve existing try/catch semantics
around the initial invocation and the later consumption so duplicate requests
are avoided.
| try { | ||
| await querySuggestor(uiStream, messages) | ||
| } catch {} | ||
| })() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🧹 Nitpick | 🔵 Trivial
Don’t swallow stage2 errors silently.
Log at least a warning so failures aren’t invisible.
- try {
- await querySuggestor(uiStream, messages)
- } catch {}
+ try {
+ await querySuggestor(uiStream, messages)
+ } catch (err) {
+ console.warn('querySuggestor failed:', err)
+ }📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| try { | |
| await querySuggestor(uiStream, messages) | |
| } catch {} | |
| })() | |
| try { | |
| await querySuggestor(uiStream, messages) | |
| } catch (err) { | |
| console.warn('querySuggestor failed:', err) | |
| } | |
| })() |
🤖 Prompt for AI Agents
In app/actions.tsx around lines 362 to 365, the catch block after await
querySuggestor(uiStream, messages) is silently swallowing errors; change it to
catch the error and log at least a warning with the error details (e.g.,
processLogger.warn or console.warn with a contextual message and the error) so
stage2 failures are visible for debugging; preserve existing control flow after
logging.
| // removal 1: // Remove the spinner | ||
| // removal 1: // uiStream.update(null) | ||
| // Append the answer section immediately to avoid gating on first token | ||
| uiStream.append(answerSection) | ||
|
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Clear the spinner before appending the answer section to prevent a stuck spinner.
The spinner is set in app/actions.tsx via uiStream.update(). Call update(null) here to remove it once streaming begins.
- // Append the answer section immediately to avoid gating on first token
- uiStream.append(answerSection)
+ // Replace spinner with the answer section once streaming starts
+ uiStream.update(null)
+ uiStream.append(answerSection)📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| // removal 1: // Remove the spinner | |
| // removal 1: // uiStream.update(null) | |
| // Append the answer section immediately to avoid gating on first token | |
| uiStream.append(answerSection) | |
| // removal 1: // Remove the spinner | |
| // removal 1: // uiStream.update(null) | |
| // Replace spinner with the answer section once streaming starts | |
| uiStream.update(null) | |
| uiStream.append(answerSection) |
🤖 Prompt for AI Agents
In lib/agents/researcher.tsx around lines 83 to 87, the spinner set earlier via
uiStream.update(<Spinner />) is not being cleared before appending the answer
section which can leave the spinner visible; call uiStream.update(null)
immediately before uiStream.append(answerSection) to remove the spinner, then
append the answerSection so streaming begins without a stuck spinner.
| if (signal) { | ||
| if (signal.aborted) throw new Error('aborted'); | ||
| signal.addEventListener('abort', () => { | ||
| if (timeoutId) clearTimeout(timeoutId); | ||
| }, { once: true }); | ||
| } | ||
|
|
||
| try { | ||
| return await Promise.race([task(), timeoutPromise]); | ||
| } finally { | ||
| if (timeoutId) clearTimeout(timeoutId); | ||
| } | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
withBudget can hang on AbortSignal; reject on abort and clear timeout.
Currently abort clears the timeout but doesn’t reject, potentially hanging forever. Include an abortPromise in the race.
if (signal) {
- if (signal.aborted) throw new Error('aborted');
- signal.addEventListener('abort', () => {
- if (timeoutId) clearTimeout(timeoutId);
- }, { once: true });
+ if (signal.aborted) throw new DOMException('Aborted', 'AbortError');
+ // Ensure we reject promptly on abort and clear timeout
+ // Note: listeners are one-shot
+ var abortPromise = new Promise<never>((_, reject) => {
+ signal.addEventListener(
+ 'abort',
+ () => {
+ if (timeoutId) clearTimeout(timeoutId);
+ reject(new DOMException('Aborted', 'AbortError'));
+ },
+ { once: true }
+ )
+ })
}
try {
- return await Promise.race([task(), timeoutPromise]);
+ // Include abortPromise in the race if provided
+ return await Promise.race(
+ typeof abortPromise !== 'undefined'
+ ? [task(), timeoutPromise, abortPromise]
+ : [task(), timeoutPromise]
+ );
} finally {
if (timeoutId) clearTimeout(timeoutId);
}📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| if (signal) { | |
| if (signal.aborted) throw new Error('aborted'); | |
| signal.addEventListener('abort', () => { | |
| if (timeoutId) clearTimeout(timeoutId); | |
| }, { once: true }); | |
| } | |
| try { | |
| return await Promise.race([task(), timeoutPromise]); | |
| } finally { | |
| if (timeoutId) clearTimeout(timeoutId); | |
| } | |
| } | |
| if (signal) { | |
| if (signal.aborted) throw new DOMException('Aborted', 'AbortError'); | |
| // Ensure we reject promptly on abort and clear timeout | |
| // Note: listeners are one-shot | |
| var abortPromise = new Promise<never>((_, reject) => { | |
| signal.addEventListener( | |
| 'abort', | |
| () => { | |
| if (timeoutId) clearTimeout(timeoutId); | |
| reject(new DOMException('Aborted', 'AbortError')); | |
| }, | |
| { once: true } | |
| ); | |
| }); | |
| } | |
| try { | |
| // Include abortPromise in the race if provided | |
| return await Promise.race( | |
| typeof abortPromise !== 'undefined' | |
| ? [task(), timeoutPromise, abortPromise] | |
| : [task(), timeoutPromise] | |
| ); | |
| } finally { | |
| if (timeoutId) clearTimeout(timeoutId); | |
| } | |
| } |
🤖 Prompt for AI Agents
In lib/utils/concurrency.ts around lines 32 to 44, the current abort handler
only clears the timeout and doesn't reject, which can cause withBudget to hang;
add an abortPromise that rejects when signal.aborted (or on 'abort' event) and
include it in the Promise.race alongside task() and timeoutPromise, ensure the
abort listener also clears the timeout and rejects with an appropriate
AbortError, and finally keep the existing finally cleanup to clear the timeout
and remove the abort listener so no handlers leak.
| export async function tokenGate(start: () => number, threshold: number, fn: () => Promise<void>): Promise<void> { | ||
| if (start() >= threshold) { | ||
| await fn(); | ||
| return; | ||
| } | ||
| // Poll lightly until threshold reached | ||
| await new Promise<void>((resolve) => { | ||
| const id = setInterval(async () => { | ||
| if (start() >= threshold) { | ||
| clearInterval(id); | ||
| resolve(); | ||
| } | ||
| }, 50); | ||
| }); | ||
| await fn(); | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🧹 Nitpick | 🔵 Trivial
Optional: tokenGate can wait indefinitely; consider max wait or external signal.
To avoid unbounded waits, support a max wait or AbortSignal.
-export async function tokenGate(start: () => number, threshold: number, fn: () => Promise<void>): Promise<void> {
+export async function tokenGate(start: () => number, threshold: number, fn: () => Promise<void>, options?: { maxMs?: number; signal?: AbortSignal }): Promise<void> {
if (start() >= threshold) {
await fn();
return;
}
// Poll lightly until threshold reached
- await new Promise<void>((resolve) => {
+ await new Promise<void>((resolve, reject) => {
const id = setInterval(async () => {
if (start() >= threshold) {
clearInterval(id);
resolve();
}
- }, 50);
+ }, 50);
+ if (options?.signal) {
+ options.signal.addEventListener('abort', () => { clearInterval(id); reject(new DOMException('Aborted', 'AbortError')) }, { once: true })
+ }
+ if (options?.maxMs) {
+ setTimeout(() => { clearInterval(id); reject(new Error('token_gate_timeout')) }, options.maxMs)
+ }
});
await fn();
}📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| export async function tokenGate(start: () => number, threshold: number, fn: () => Promise<void>): Promise<void> { | |
| if (start() >= threshold) { | |
| await fn(); | |
| return; | |
| } | |
| // Poll lightly until threshold reached | |
| await new Promise<void>((resolve) => { | |
| const id = setInterval(async () => { | |
| if (start() >= threshold) { | |
| clearInterval(id); | |
| resolve(); | |
| } | |
| }, 50); | |
| }); | |
| await fn(); | |
| } | |
| export async function tokenGate( | |
| start: () => number, | |
| threshold: number, | |
| fn: () => Promise<void>, | |
| options?: { maxMs?: number; signal?: AbortSignal } | |
| ): Promise<void> { | |
| if (start() >= threshold) { | |
| await fn() | |
| return | |
| } | |
| // Poll lightly until threshold reached | |
| await new Promise<void>((resolve, reject) => { | |
| const id = setInterval(async () => { | |
| if (start() >= threshold) { | |
| clearInterval(id) | |
| resolve() | |
| } | |
| }, 50) | |
| if (options?.signal) { | |
| options.signal.addEventListener( | |
| 'abort', | |
| () => { | |
| clearInterval(id) | |
| reject(new DOMException('Aborted', 'AbortError')) | |
| }, | |
| { once: true } | |
| ) | |
| } | |
| if (options?.maxMs) { | |
| setTimeout(() => { | |
| clearInterval(id) | |
| reject(new Error('token_gate_timeout')) | |
| }, options.maxMs) | |
| } | |
| }) | |
| await fn() | |
| } |
🤖 Prompt for AI Agents
In lib/utils/concurrency.ts around lines 46 to 61, tokenGate currently can block
forever waiting for the threshold; change its signature to accept an optional
maxWaitMs (number) and/or an AbortSignal, and update the wait logic to race the
interval check against a timeout and an abort signal: start the interval checker
as now, also start a timeout that clears the interval and rejects/returns if
maxWaitMs elapses (or treat as a resolved no-op depending on desired behavior),
and attach an abort listener that clears the interval and rejects immediately
when signalled; ensure all timers/listeners are cleaned up before calling fn or
returning so fn is only invoked when threshold reached and callers receive a
clear error/abort when waiting stops.
|
The latest updates on your projects. Learn more about Vercel for GitHub.
|
|
Thanks for the pull request @92jpascal |
|
Tip For best results, initiate chat on the files or code changes. 🧩 Analysis chain🏁 Script executed: Length of output: 15905 🏁 Script executed: Length of output: 7049 🏁 Script executed: Length of output: 6235 🏁 Script executed: Length of output: 3627 🏁 Script executed: Length of output: 5149 @ngoiyaeric, thank you for the detailed feedback! I've analyzed the PR and identified several critical issues that explain the problems you're experiencing. Let me break them down: 🔴 Critical Issues Found1. Duplicate
|
|
@codetabbitai can you generalize the methodology to enhance performance through concurrency across the entire codebase. |
User description
…move "removal 1" comments
PR Type
Enhancement
Description
Refactored streaming model output to run researcher and query suggester concurrently
Added concurrency utilities for task management and token-based gating
Modified answer section rendering to display immediately without waiting for first token
Injected stream handling into search tool for better control
Diagram Walkthrough
File Walkthrough
concurrency.ts
Add concurrency control utilities for async operationslib/utils/concurrency.ts
runWithConcurrencyLimitfunction for parallel task executionwith limits
withBudgetfor time-constrained async operations withabort support
tokenGateutility for threshold-based polling and executionactions.tsx
Refactor to concurrent stage-based execution modelapp/actions.tsx
Promise.allSettledasync operations
researcher.tsx
Optimize answer section rendering for immediate displaylib/agents/researcher.tsx
immediately
streamText.done()call to properly close the streamsearch.tsx
Support stream injection in search toollib/agents/tools/search.tsx
injectedStreamparameter to search tool executefunction
Summary by CodeRabbit