4 changes: 3 additions & 1 deletion Cargo.toml
@@ -10,11 +10,13 @@ documentation = "https://docs.rs/cloudllm/latest/cloudllm/"

 [dependencies]
 tokio = { version = "1.47.1", features = ["full"] }
+tokio-stream = "0.1"

 # This is my own fork of the OpenAI Rust client, which has a few improvements over the original.
 # https://github.com/gubatron/openai-rust
 openai-rust2 = { version = "1.6.0" }

 async-trait = "0.1.88"
 log = "0.4.27"
-env_logger = "0.11.8"
+env_logger = "0.11.8"
+futures-util = "0.3.27"
18 changes: 18 additions & 0 deletions README.md
@@ -9,6 +9,7 @@ CloudLLM is a Rust library designed to seamlessly bridge applications with remot
## Features

- **Unified Interface**: Interact with multiple LLMs using a single, consistent API.
- **Streaming Support**: Real-time token delivery for lower perceived latency and a better UX (see [STREAMING.md](STREAMING.md)).
- **Pay-as-you-go Integration**: Designed to work efficiently with pay-as-you-go LLM platforms.
- **Extendable**: Easily add new LLM platform clients as they emerge.
- **Asynchronous Support**: Built with async operations for non-blocking calls.
@@ -41,6 +42,23 @@ cloudllm = "0.2.12" # Use the latest version

Refer to the `examples/` directory to see how you can set up sessions and interact with various LLM platforms using CloudLLM.

### Streaming Responses

CloudLLM supports real-time streaming, which sharply reduces perceived latency: tokens are delivered as soon as they arrive, so users can start reading the response immediately:

```rust
use cloudllm::ClientWrapper;
use futures_util::StreamExt;

let mut stream = client.send_message_stream(messages, None).await?;
while let Some(chunk) = stream.next().await {
let chunk = chunk?;
print!("{}", chunk.content); // Display immediately!
}
```

For complete streaming documentation and examples, see [STREAMING.md](STREAMING.md).

## Contributing

Contributions to CloudLLM are always welcome! Whether it's feature suggestions, bug reporting, or code improvements, all contributions are appreciated.
215 changes: 215 additions & 0 deletions STREAMING.md
@@ -0,0 +1,215 @@
# Streaming Support in CloudLLM

CloudLLM now provides first-class streaming support for real-time token delivery, dramatically reducing perceived latency in user interfaces.

## Overview

Streaming allows you to receive and display tokens as soon as they arrive from the LLM provider, rather than waiting for the complete response. This creates a better user experience as users can start reading the response immediately.

## Features

- **Low Latency**: Tokens are delivered as soon as they arrive from the provider
- **Compatible API**: Streaming methods work alongside existing non-streaming methods
- **Provider Support**: Works with OpenAI, Grok, Claude, and other providers that support streaming
- **Simple Interface**: Easy-to-use Stream interface with `MessageChunk` items

## Usage

### Basic Streaming with ClientWrapper

```rust
use cloudllm::client_wrapper::Role;
use cloudllm::clients::openai::OpenAIClient;
use cloudllm::ClientWrapper;
use futures_util::StreamExt;

#[tokio::main]
async fn main() {
    let secret_key = std::env::var("OPEN_AI_SECRET")
        .expect("Please set the OPEN_AI_SECRET environment variable!");

    let client = OpenAIClient::new_with_model_enum(
        &secret_key,
        cloudllm::clients::openai::Model::GPT5Nano,
    );

    let messages = vec![cloudllm::client_wrapper::Message {
        role: Role::User,
        content: "Tell me a short story".to_string(),
    }];

    // Get streaming response
    let mut stream = client.send_message_stream(messages, None).await.unwrap();

    // Process chunks as they arrive
    while let Some(chunk_result) = stream.next().await {
        match chunk_result {
            Ok(chunk) => {
                print!("{}", chunk.content); // Display immediately!

                if chunk.is_final {
                    break;
                }
            }
            Err(e) => {
                eprintln!("Error: {}", e);
                break;
            }
        }
    }
}
```

### Streaming with LLMSession

```rust
use cloudllm::client_wrapper::Role;
use cloudllm::LLMSession;
use futures_util::StreamExt;

// `client` is any ClientWrapper implementation, e.g. an OpenAIClient.
let mut session = LLMSession::new(
    std::sync::Arc::new(client),
    "You are a helpful assistant.".to_string(),
    4096,
);

let mut stream = session
    .send_message_stream(Role::User, "Write a poem".to_string(), None)
    .await
    .unwrap();

let mut full_response = String::new();
while let Some(chunk_result) = stream.next().await {
    let chunk = chunk_result.unwrap();
    print!("{}", chunk.content);
    full_response.push_str(&chunk.content);
}
```

## API Reference

### `MessageChunk`

Represents a chunk of streaming response:

```rust
pub struct MessageChunk {
    /// The incremental content in this chunk
    pub content: String,
    /// Whether this is the final chunk in the stream
    pub is_final: bool,
}
```
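
Since `is_final` marks the end of the stream, a consumer can fold chunks back into the complete response. A minimal sketch (the struct is re-declared locally so the snippet stands alone; `collect_chunks` is an illustrative helper, not part of the CloudLLM API):

```rust
// Mirrors the MessageChunk definition above.
pub struct MessageChunk {
    pub content: String,
    pub is_final: bool,
}

// Concatenate chunk contents, stopping once the final chunk is seen.
pub fn collect_chunks(chunks: impl IntoIterator<Item = MessageChunk>) -> String {
    let mut full = String::new();
    for chunk in chunks {
        full.push_str(&chunk.content);
        if chunk.is_final {
            break;
        }
    }
    full
}
```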

### `send_message_stream()`

Available on all `ClientWrapper` implementations:

```rust
async fn send_message_stream(
    &self,
    messages: Vec<Message>,
    optional_search_parameters: Option<SearchParameters>,
) -> Result<Pin<Box<dyn Stream<Item = Result<MessageChunk, SendError>>>>, Box<dyn Error>>
```

Also available on `LLMSession`:

```rust
pub async fn send_message_stream(
    &mut self,
    role: Role,
    content: String,
    optional_search_parameters: Option<SearchParameters>,
) -> Result<Pin<Box<dyn Stream<Item = Result<MessageChunk, SendError>>>>, Box<dyn Error>>
```

## Important Notes

### Token Usage Tracking

**Token usage tracking is NOT available for streaming responses.** The OpenAI streaming API does not provide usage information in real-time. If you need token usage tracking, use the non-streaming `send_message()` method instead.
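
Where an approximate count is acceptable during streaming, a rough client-side heuristic can be applied to the accumulated text. A sketch, assuming the common ~4-characters-per-token rule of thumb for English prose (`estimate_tokens` is hypothetical, not a CloudLLM API; use a real tokenizer or the non-streaming method when accuracy matters):

```rust
// Very rough token estimate (~4 characters per token for English text).
// This is a heuristic only; for exact counts use the non-streaming
// send_message() method or a proper tokenizer.
pub fn estimate_tokens(text: &str) -> usize {
    (text.chars().count() + 3) / 4
}
```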

### Conversation History with LLMSession

When using `send_message_stream()` with `LLMSession`, the assistant's response is **NOT** automatically added to the conversation history. If you want to maintain conversation context with streaming, you must:

1. Collect all chunks into a complete message
2. Manually add the message to the session's conversation history

Example:

```rust
let mut stream = session.send_message_stream(Role::User, prompt, None).await?;
let mut full_response = String::new();

while let Some(chunk) = stream.next().await {
    let chunk = chunk?;
    full_response.push_str(&chunk.content);
}

// Manually add to history if needed
// (Note: You'll need to access the internal conversation_history field,
// or use send_message() for automatic history management)
```

For most use cases with conversation history, consider using the non-streaming `send_message()` method which handles history automatically.

### Thread Safety

The streaming API returns streams that are not `Send` due to limitations in the underlying HTTP client. This means:

- The stream must be consumed in the same task that creates it
- You cannot send the stream across threads
- This is typically not an issue for web servers or CLI applications

## Examples

See the examples directory for complete working examples:

- `examples/streaming_example.rs` - Basic streaming with ClientWrapper
- `examples/streaming_session_example.rs` - Streaming with LLMSession

Run examples:
```bash
OPEN_AI_SECRET=your-key cargo run --example streaming_example
OPEN_AI_SECRET=your-key cargo run --example streaming_session_example
```

## Supported Providers

All providers that delegate to OpenAI-compatible APIs support streaming:

- ✅ OpenAI (GPT-4, GPT-5, etc.)
- ✅ Grok (xAI)
- ✅ Claude (Anthropic) - via OpenAI-compatible endpoint
- ✅ Gemini - via OpenAI-compatible endpoint

## Benefits

### Dramatically Reduced Perceived Latency

Instead of waiting 5-10 seconds for a complete response, users see the assistant "typing" almost immediately - typically within 200-500ms. This creates a much more responsive and natural user experience.

### Better User Engagement

Users can start reading and processing the response while it's still being generated, leading to:
- Higher user satisfaction
- More natural conversation flow
- Better perceived performance

## Performance Comparison

**Without Streaming:**
```
User sends message -> [5-10 second wait] -> Complete response appears
Time to first token: 5000ms
```

**With Streaming:**
```
User sends message -> [200-500ms] -> First tokens appear -> More tokens arrive continuously
Time to first token: 300ms ⚡
```

The total time to receive the complete message is similar, but the user experience is dramatically better with streaming.
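
The timings above can be measured with a simple time-to-first-token (TTFT) pattern. A self-contained sketch that simulates the stream with a plain iterator (`timing_stats` is illustrative; in real code the chunks come from `send_message_stream()`):

```rust
use std::time::{Duration, Instant};

// Returns (time to first chunk, total time, full text) for a chunk sequence.
pub fn timing_stats(
    chunks: impl IntoIterator<Item = String>,
) -> (Option<Duration>, Duration, String) {
    let start = Instant::now();
    let mut ttft = None;
    let mut full = String::new();
    for chunk in chunks {
        if ttft.is_none() {
            ttft = Some(start.elapsed()); // first token arrived
        }
        full.push_str(&chunk);
    }
    (ttft, start.elapsed(), full)
}
```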
73 changes: 73 additions & 0 deletions examples/streaming_example.rs
@@ -0,0 +1,73 @@
use std::env;
use std::io::{self, Write};

use cloudllm::client_wrapper::Role;
use cloudllm::clients::openai::OpenAIClient;
use cloudllm::ClientWrapper;
use futures_util::StreamExt;

// Run from the root folder of the repo as follows:
// OPEN_AI_SECRET=your-open-ai-key-here cargo run --example streaming_example

#[tokio::main]
async fn main() {
    // Read OPEN_AI_SECRET from environment variable
    let secret_key =
        env::var("OPEN_AI_SECRET").expect("Please set the OPEN_AI_SECRET environment variable!");

    // Instantiate the OpenAI client
    let client = OpenAIClient::new_with_model_enum(
        &secret_key,
        cloudllm::clients::openai::Model::GPT5Nano,
    );

    // Create messages
    let messages = vec![
        cloudllm::client_wrapper::Message {
            role: Role::System,
            content: "You are a helpful assistant.".to_string(),
        },
        cloudllm::client_wrapper::Message {
            role: Role::User,
            content: "Tell me a short story about a robot learning to paint.".to_string(),
        },
    ];

    println!("Streaming response from AI:");
    println!("----------------------------");

    // Send message and get streaming response
    match client.send_message_stream(messages, None).await {
        Ok(mut stream) => {
            let mut full_content = String::new();

            // Process chunks as they arrive
            while let Some(chunk_result) = stream.next().await {
                match chunk_result {
                    Ok(chunk) => {
                        // Print the chunk immediately (this is what gives us low latency!)
                        print!("{}", chunk.content);
                        io::stdout().flush().unwrap();

                        full_content.push_str(&chunk.content);

                        if chunk.is_final {
                            println!("\n\n[Stream complete]");
                            break;
                        }
                    }
                    Err(e) => {
                        eprintln!("\nError in stream: {}", e);
                        break;
                    }
                }
            }

            println!("\n----------------------------");
            println!("Total characters received: {}", full_content.len());
        }
        Err(e) => {
            eprintln!("Error starting stream: {}", e);
        }
    }
}