diff --git a/src/llm/index.ts b/src/llm/index.ts
index 5fe0307..2a3fbc4 100644
--- a/src/llm/index.ts
+++ b/src/llm/index.ts
@@ -111,8 +111,83 @@ export class OllamaAPI extends LLMService {
   }
 
   public async completionStream(params: LLMSettings): Promise<any> {
-    // eslint-disable-next-line @typescript-eslint/no-unused-vars
-    return {};
+    if (DEBUG) {
+      console.log("Ollama stream completion");
+      console.log("Model: " + params.model);
+    }
+    await this.api.setModel(params.model);
+
+    // Create a queue to handle chunks as they arrive
+    const chunks: string[] = [];
+    let pendingResolve: ((value: IteratorResult<{ response: string }>) => void) | null = null;
+    let isComplete = false;
+    let error: any = null;
+
+    // Start streaming immediately
+    this.api.streamingGenerate(
+      params.prompt,
+      // responseOutput callback - called for each chunk as it arrives
+      (chunk: string) => {
+        // Queue the chunk first, then resolve if someone is waiting
+        chunks.push(chunk);
+        if (pendingResolve) {
+          // Someone is waiting - give them the chunk immediately
+          const nextChunk = chunks.shift()!;
+          pendingResolve({ value: { response: nextChunk }, done: false });
+          pendingResolve = null;
+        }
+      },
+      // contextOutput callback
+      null,
+      // fullResponseOutput callback
+      null,
+      // statsOutput callback
+      null
+    ).then(() => {
+      isComplete = true;
+      // If someone is waiting, notify them we're done
+      if (pendingResolve) {
+        pendingResolve({ value: undefined, done: true });
+        pendingResolve = null;
+      }
+    }).catch((err: any) => {
+      error = err;
+      isComplete = true;
+      // Clear the queue on error to avoid returning stale chunks
+      chunks.splice(0);
+      if (pendingResolve) {
+        pendingResolve({ value: undefined, done: true });
+        pendingResolve = null;
+      }
+    });
+
+    // Return an async iterator that yields chunks as they arrive
+    return {
+      [Symbol.asyncIterator]: () => ({
+        next: (): Promise<IteratorResult<{ response: string }>> => {
+          // If there's an error, throw it immediately (no chunks should be returned after error)
+          if (error) {
+            throw error;
+          }
+
+          // If we have queued chunks, return one immediately
+          if (chunks.length > 0) {
+            const chunk = chunks.shift()!;
+            return Promise.resolve({ value: { response: chunk }, done: false });
+          }
+
+          // If streaming is complete, we're done
+          if (isComplete) {
+            return Promise.resolve({ value: undefined, done: true });
+          }
+
+          // Otherwise, wait for the next chunk
+          return new Promise((resolve) => {
+            pendingResolve = resolve;
+          });
+        }
+      })
+    };
   }
 }
 
diff --git a/src/loz.ts b/src/loz.ts
index f20bfe5..ed03ee0 100644
--- a/src/loz.ts
+++ b/src/loz.ts
@@ -272,7 +272,7 @@ export class Loz {
     let curCompleteText = "";
     const api = this.checkAPI();
 
-    if (api === "openai" || api === "github-copilot") {
+    if (api === "openai" || api === "github-copilot" || api === "ollama") {
       let stream: any;
       try {
         stream = await this.llmAPI.completionStream(params);
@@ -347,11 +347,20 @@ export class Loz {
        }
      }
 
+      process.stdout.write("\n");
+    } else if (api === "ollama") {
+      // Handle Ollama streaming
+      for await (const data of stream) {
+        if (data === null) break; // Break instead of return to ensure newline is written
+        const streamData = data.response || "";
+        curCompleteText += streamData;
+        process.stdout.write(streamData);
+      }
       process.stdout.write("\n");
     } else {
       // OpenAI streaming
       for await (const data of stream) {
-        if (data === null) return;
+        if (data === null) break; // Break instead of return to ensure newline is written
         const streamData = data.choices[0]?.delta?.content || "";
         curCompleteText += streamData;
         process.stdout.write(streamData);
@@ -395,7 +404,7 @@ export class Loz {
      }
      cli.prompt();
    });
 
-    cli.start(true);
+    cli.start();
  });
 }
 
diff --git a/src/prompt/index.ts b/src/prompt/index.ts
index edb9dd3..67d81e6 100644
--- a/src/prompt/index.ts
+++ b/src/prompt/index.ts
@@ -7,7 +7,6 @@ interface PromptInterface {
 export class CommandLinePrompt implements PromptInterface {
   private rl: readline.Interface;
   private callback: (input: string) => Promise<void>;
-  private timer: any;
 
   constructor(callback: (input: string) => Promise<void>) {
     // ...
@@ -22,26 +21,13 @@ export class CommandLinePrompt implements PromptInterface {
     this.rl.prompt();
   }
 
-  public async start(blinking?: boolean): Promise<void> {
+  public async start(): Promise<void> {
     // Set the prompt to display before each input
     this.rl.setPrompt("> ");
 
     // Show the cursor and prompt the user for input
     this.rl.prompt();
 
-    // Set the terminal to raw mode to allow for cursor manipulation
-    process.stdin.setRawMode(true);
-
-    // Display a blinking cursor
-    if (blinking) {
-      this.timer = setInterval(() => {
-        process.stdout.write("\x1B[?25h");
-        setTimeout(() => {
-          process.stdout.write("\x1B[?25l");
-        }, 500);
-      }, 1000);
-    }
-
     // Listen for user input
     this.rl.on("line", async (input) => {
       this.rl.prompt();
@@ -50,19 +36,11 @@ export class CommandLinePrompt implements PromptInterface {
    }
     // Handle CTRL+C to exit the program
     this.rl.on("SIGINT", () => {
-      clearInterval(this.timer);
       this.rl.close();
     });
   }
 
   public exit(): void {
-    clearInterval(this.timer);
-    // Show the cursor
-    process.stdout.write("\x1B[?25h");
-    // Try to enable cursor blinking
-    process.stdout.write("\x1B[?12h");
-    // Reset the terminal to the normal mode
-    process.stdin.setRawMode(false);
     this.rl.close();
   }
 }