Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
79 changes: 77 additions & 2 deletions src/llm/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -111,8 +111,83 @@ export class OllamaAPI extends LLMService {
}

public async completionStream(params: LLMSettings): Promise<any> {
  if (DEBUG) {
    console.log("Ollama stream completion");
    console.log("Model: " + params.model);
  }
  await this.api.setModel(params.model);

  // FIFO queue of chunks delivered by the Ollama callback but not yet
  // consumed through the async iterator below.
  const chunks: string[] = [];
  // Resolver/rejecter of a next() promise that is currently parked
  // waiting for data (at most one consumer at a time is supported).
  let pendingResolve:
    | ((value: IteratorResult<{ response: string }>) => void)
    | null = null;
  let pendingReject: ((reason?: any) => void) | null = null;
  let isComplete = false;
  let error: any = null;

  // Kick off streaming immediately; chunks are queued as they arrive.
  // NOTE(review): streamingGenerate's callback signature is assumed from
  // usage here (responseOutput, contextOutput, fullResponseOutput,
  // statsOutput) — confirm against the ollama-node API.
  this.api
    .streamingGenerate(
      params.prompt,
      // responseOutput callback — invoked once per streamed chunk.
      (chunk: string) => {
        chunks.push(chunk);
        if (pendingResolve) {
          // A consumer is already waiting — hand over the oldest chunk.
          const nextChunk = chunks.shift()!;
          pendingResolve({ value: { response: nextChunk }, done: false });
          pendingResolve = null;
          pendingReject = null;
        }
      },
      null, // contextOutput callback
      null, // fullResponseOutput callback
      null // statsOutput callback
    )
    .then(() => {
      isComplete = true;
      // If a consumer is parked, tell it the stream is finished.
      if (pendingResolve) {
        pendingResolve({ value: undefined, done: true });
        pendingResolve = null;
        pendingReject = null;
      }
    })
    .catch((err: any) => {
      error = err;
      isComplete = true;
      // Drop queued chunks so nothing stale is yielded after an error.
      chunks.splice(0);
      // Propagate the failure to a waiting consumer instead of silently
      // ending the iteration with done: true.
      if (pendingReject) {
        pendingReject(err);
        pendingResolve = null;
        pendingReject = null;
      }
    });

  // Minimal async iterable yielding { response } objects as chunks arrive,
  // shaped so the caller can `for await (const data of stream)`.
  return {
    [Symbol.asyncIterator]: () => ({
      next: (): Promise<IteratorResult<{ response: string }>> => {
        // Surface a stored error by rejecting, per the async-iteration
        // protocol, rather than throwing synchronously out of next().
        if (error) {
          return Promise.reject(error);
        }

        // Drain queued chunks first, preserving arrival order.
        if (chunks.length > 0) {
          const chunk = chunks.shift()!;
          return Promise.resolve({ value: { response: chunk }, done: false });
        }

        // Nothing queued and the producer has finished — iteration is done.
        if (isComplete) {
          return Promise.resolve({ value: undefined, done: true });
        }

        // Otherwise park until the next chunk, completion, or error.
        return new Promise((resolve, reject) => {
          pendingResolve = resolve;
          pendingReject = reject;
        });
      },
    }),
  };
}
}

Expand Down
15 changes: 12 additions & 3 deletions src/loz.ts
Original file line number Diff line number Diff line change
Expand Up @@ -272,7 +272,7 @@ export class Loz {
let curCompleteText = "";
const api = this.checkAPI();

if (api === "openai" || api === "github-copilot") {
if (api === "openai" || api === "github-copilot" || api === "ollama") {
let stream: any;
try {
stream = await this.llmAPI.completionStream(params);
Expand Down Expand Up @@ -347,11 +347,20 @@ export class Loz {
}
}

process.stdout.write("\n");
} else if (api === "ollama") {
// Handle Ollama streaming
for await (const data of stream) {
if (data === null) break; // Break instead of return to ensure newline is written
const streamData = data.response || "";
curCompleteText += streamData;
process.stdout.write(streamData);
}
process.stdout.write("\n");
} else {
// OpenAI streaming
for await (const data of stream) {
if (data === null) return;
if (data === null) break; // Break instead of return to ensure newline is written
const streamData = data.choices[0]?.delta?.content || "";
curCompleteText += streamData;
process.stdout.write(streamData);
Expand Down Expand Up @@ -395,7 +404,7 @@ export class Loz {
}
cli.prompt();
});
cli.start(true);
cli.start();
});
}

Expand Down
24 changes: 1 addition & 23 deletions src/prompt/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ interface PromptInterface {
export class CommandLinePrompt implements PromptInterface {
private rl: readline.Interface;
private callback: (input: string) => Promise<void>;
private timer: any;

constructor(callback: (input: string) => Promise<void>) {
// ...
Expand All @@ -22,26 +21,13 @@ export class CommandLinePrompt implements PromptInterface {
this.rl.prompt();
}

public async start(blinking?: boolean): Promise<void> {
public async start(): Promise<void> {
// Set the prompt to display before each input
this.rl.setPrompt("> ");

// Show the cursor and prompt the user for input
this.rl.prompt();

// Set the terminal to raw mode to allow for cursor manipulation
process.stdin.setRawMode(true);

// Display a blinking cursor
if (blinking) {
this.timer = setInterval(() => {
process.stdout.write("\x1B[?25h");
setTimeout(() => {
process.stdout.write("\x1B[?25l");
}, 500);
}, 1000);
}

// Listen for user input
this.rl.on("line", async (input) => {
this.rl.prompt();
Expand All @@ -50,19 +36,11 @@ export class CommandLinePrompt implements PromptInterface {

// Handle CTRL+C to exit the program
this.rl.on("SIGINT", () => {
clearInterval(this.timer);
this.rl.close();
});
}

public exit(): void {
  // The blinking-cursor timer, raw mode, and cursor escape sequences were
  // removed along with the `timer` field, so the only teardown left is
  // closing the readline interface.
  this.rl.close();
}
}
Loading