Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
251 changes: 147 additions & 104 deletions rust/src/client.rs

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🔴 agent_chat_stream missing API key authentication, only checks JWT token

The agent_chat_stream method (used by both agent_chat and subagent_chat) only checks for a JWT access token when building the Authorization header (rust/src/client.rs:1417-1423). It does not check for an API key via get_api_key(). In contrast, the analogous create_chat_completion_stream method (rust/src/client.rs:1289-1303) correctly prefers the API key over the JWT token. This means users who authenticate via API key (using new_with_api_key or set_api_key) will send agent chat requests without an Authorization header, causing authentication failures on the server.

(Refers to lines 1417-1423)

Open in Devin Review

Was this helpful? React with 👍 or 👎 to provide feedback.

Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ use reqwest::{
};
use serde::{de::DeserializeOwned, Serialize};
use serde_cbor::Value as CborValue;
use serde_json::Value;
use std::sync::{Arc, RwLock};
use uuid::Uuid;

Expand All @@ -25,6 +24,67 @@ pub struct OpenSecretClient {
server_public_key: Arc<RwLock<Option<Vec<u8>>>>, // Store server's public key from attestation
}

fn append_query_param(query: &mut Vec<String>, key: &str, value: impl ToString) {
let encoded = utf8_percent_encode(&value.to_string(), NON_ALPHANUMERIC).to_string();
query.push(format!("{}={}", key, encoded));
}

fn build_agent_items_endpoint(base: &str, params: Option<&AgentItemsListParams>) -> String {
let mut endpoint = base.to_string();
let mut query = Vec::new();

if let Some(params) = params {
if let Some(limit) = params.limit {
append_query_param(&mut query, "limit", limit);
}
if let Some(after) = params.after {
append_query_param(&mut query, "after", after);
}
if let Some(order) = &params.order {
append_query_param(&mut query, "order", order);
}
if let Some(include) = &params.include {
for include_value in include {
append_query_param(&mut query, "include", include_value);
}
}
}

if !query.is_empty() {
endpoint.push('?');
endpoint.push_str(&query.join("&"));
}

endpoint
}

fn build_subagents_endpoint(params: Option<&ListSubagentsParams>) -> String {
let mut endpoint = "/v1/agent/subagents".to_string();
let mut query = Vec::new();

if let Some(params) = params {
if let Some(limit) = params.limit {
append_query_param(&mut query, "limit", limit);
}
if let Some(after) = params.after {
append_query_param(&mut query, "after", after);
}
if let Some(order) = &params.order {
append_query_param(&mut query, "order", order);
}
if let Some(created_by) = &params.created_by {
append_query_param(&mut query, "created_by", created_by);
}
}

if !query.is_empty() {
endpoint.push('?');
endpoint.push_str(&query.join("&"));
}

endpoint
}

impl OpenSecretClient {
pub fn new(base_url: impl Into<String>) -> Result<Self> {
let base_url = base_url.into();
Expand Down Expand Up @@ -1319,13 +1379,9 @@ impl OpenSecretClient {
Ok(Box::pin(event_stream))
}

// Agent API Methods

/// Sends a message to the agent and returns a stream of SSE events.
/// The agent processes the message through a multi-step tool loop and
/// delivers messages via SSE (agent.message, agent.done, agent.error events).
pub async fn agent_chat(
async fn agent_chat_stream(
&self,
endpoint: String,
input: &str,
) -> Result<std::pin::Pin<Box<dyn futures::Stream<Item = Result<AgentSseEvent>> + Send>>> {
use eventsource_stream::Eventsource;
Expand All @@ -1341,7 +1397,7 @@ impl OpenSecretClient {
)
})?;

let url = format!("{}/v1/agent/chat", self.base_url);
let url = format!("{}{}", self.base_url, endpoint);

let json = serde_json::to_string(&request)?;
let encrypted = crypto::encrypt_data(&session.session_key, json.as_bytes())?;
Expand Down Expand Up @@ -1424,6 +1480,22 @@ impl OpenSecretClient {
})),
}
}
"agent.typing" => {
match serde_json::from_str::<AgentTypingEvent>(
&json_str,
) {
Ok(typing) => {
Some(Ok(AgentSseEvent::Typing(typing)))
}
Err(e) => Some(Err(Error::Api {
status: 0,
message: format!(
"Failed to parse agent typing: {}",
e
),
})),
}
}
"agent.done" => {
match serde_json::from_str::<AgentDoneEvent>(&json_str)
{
Expand Down Expand Up @@ -1475,135 +1547,106 @@ impl OpenSecretClient {
Ok(Box::pin(event_stream))
}

/// Gets the agent configuration for the current user
pub async fn get_agent_config(&self) -> Result<AgentConfigResponse> {
self.encrypted_api_call("/v1/agent/config", "GET", None::<()>)
// Agent API Methods

/// Fetches the current user's main agent.
pub async fn get_main_agent(&self) -> Result<MainAgentResponse> {
self.encrypted_api_call("/v1/agent", "GET", None::<()>)
.await
}

/// Updates the agent configuration
pub async fn update_agent_config(
&self,
request: UpdateAgentConfigRequest,
) -> Result<AgentConfigResponse> {
self.encrypted_api_call("/v1/agent/config", "PUT", Some(request))
/// Deletes the current user's main agent and resets shared agent state.
pub async fn delete_main_agent(&self) -> Result<DeletedObjectResponse> {
self.encrypted_api_call("/v1/agent", "DELETE", None::<()>)
.await
}

/// Lists all memory blocks for the current user's agent
pub async fn list_memory_blocks(&self) -> Result<Vec<MemoryBlockResponse>> {
self.encrypted_api_call("/v1/agent/memory/blocks", "GET", None::<()>)
.await
/// Lists items in the main agent conversation.
pub async fn list_main_agent_items(
&self,
params: Option<AgentItemsListParams>,
) -> Result<AgentItemsListResponse> {
let endpoint = build_agent_items_endpoint("/v1/agent/items", params.as_ref());
self.encrypted_api_call(&endpoint, "GET", None::<()>).await
}

/// Gets a specific memory block by label
pub async fn get_memory_block(&self, label: &str) -> Result<MemoryBlockResponse> {
let encoded = utf8_percent_encode(label, NON_ALPHANUMERIC).to_string();
self.encrypted_api_call(
&format!("/v1/agent/memory/blocks/{}", encoded),
"GET",
None::<()>,
)
.await
/// Fetches a single item from the main agent conversation.
pub async fn get_main_agent_item(&self, item_id: Uuid) -> Result<ConversationItem> {
self.encrypted_api_call(&format!("/v1/agent/items/{}", item_id), "GET", None::<()>)
.await
}

/// Updates a specific memory block by label
pub async fn update_memory_block(
/// Sends a message to the main agent and returns a stream of SSE events.
pub async fn agent_chat(
&self,
label: &str,
request: UpdateMemoryBlockRequest,
) -> Result<MemoryBlockResponse> {
let encoded = utf8_percent_encode(label, NON_ALPHANUMERIC).to_string();
self.encrypted_api_call(
&format!("/v1/agent/memory/blocks/{}", encoded),
"PUT",
Some(request),
)
.await
input: &str,
) -> Result<std::pin::Pin<Box<dyn futures::Stream<Item = Result<AgentSseEvent>> + Send>>> {
self.agent_chat_stream("/v1/agent/chat".to_string(), input)
.await
}

/// Inserts a new archival memory entry
pub async fn insert_archival_memory(
/// Creates a new subagent for the current user.
pub async fn create_subagent(
&self,
text: &str,
metadata: Option<Value>,
) -> Result<InsertArchivalResponse> {
let request = InsertArchivalRequest {
text: text.to_string(),
metadata,
};
self.encrypted_api_call("/v1/agent/memory/archival", "POST", Some(request))
request: CreateSubagentRequest,
) -> Result<SubagentResponse> {
self.encrypted_api_call("/v1/agent/subagents", "POST", Some(request))
.await
}

/// Deletes an archival memory entry by UUID
pub async fn delete_archival_memory(&self, id: Uuid) -> Result<DeletedObjectResponse> {
self.encrypted_api_call(
&format!("/v1/agent/memory/archival/{}", id),
"DELETE",
None::<()>,
)
.await
/// Lists subagents for the current user with pagination and filtering.
pub async fn list_subagents(
&self,
params: Option<ListSubagentsParams>,
) -> Result<SubagentListResponse> {
let endpoint = build_subagents_endpoint(params.as_ref());
self.encrypted_api_call(&endpoint, "GET", None::<()>).await
}

/// Searches agent memory (archival + recall)
pub async fn search_agent_memory(
&self,
request: MemorySearchRequest,
) -> Result<MemorySearchResponse> {
self.encrypted_api_call("/v1/agent/memory/search", "POST", Some(request))
/// Fetches a single subagent by UUID.
pub async fn get_subagent(&self, id: Uuid) -> Result<SubagentResponse> {
self.encrypted_api_call(&format!("/v1/agent/subagents/{}", id), "GET", None::<()>)
.await
}

/// Lists agent conversations
pub async fn list_agent_conversations(&self) -> Result<AgentConversationListResponse> {
self.encrypted_api_call("/v1/agent/conversations", "GET", None::<()>)
/// Sends a message to a specific subagent and returns a stream of SSE events.
pub async fn subagent_chat(
&self,
id: Uuid,
input: &str,
) -> Result<std::pin::Pin<Box<dyn futures::Stream<Item = Result<AgentSseEvent>> + Send>>> {
self.agent_chat_stream(format!("/v1/agent/subagents/{}/chat", id), input)
.await
}

/// Gets items from an agent conversation
pub async fn list_agent_conversation_items(
/// Lists items in a subagent conversation.
pub async fn list_subagent_items(
&self,
conversation_id: &str,
limit: Option<i32>,
after: Option<&str>,
order: Option<&str>,
) -> Result<AgentConversationItemsResponse> {
let encoded_id = utf8_percent_encode(conversation_id, NON_ALPHANUMERIC).to_string();
let mut url = format!("/v1/agent/conversations/{}/items", encoded_id);
let mut params = Vec::new();
if let Some(l) = limit {
params.push(format!("limit={}", l));
}
if let Some(a) = after {
params.push(format!(
"after={}",
utf8_percent_encode(a, NON_ALPHANUMERIC)
));
}
if let Some(o) = order {
params.push(format!("order={}", o));
}
if !params.is_empty() {
url.push('?');
url.push_str(&params.join("&"));
}
self.encrypted_api_call(&url, "GET", None::<()>).await
id: Uuid,
params: Option<AgentItemsListParams>,
) -> Result<AgentItemsListResponse> {
let endpoint = build_agent_items_endpoint(
&format!("/v1/agent/subagents/{}/items", id),
params.as_ref(),
);
self.encrypted_api_call(&endpoint, "GET", None::<()>).await
}

/// Deletes an agent conversation
pub async fn delete_agent_conversation(
&self,
conversation_id: &str,
) -> Result<DeletedObjectResponse> {
let encoded_id = utf8_percent_encode(conversation_id, NON_ALPHANUMERIC).to_string();
/// Fetches a single item from a subagent conversation.
pub async fn get_subagent_item(&self, id: Uuid, item_id: Uuid) -> Result<ConversationItem> {
self.encrypted_api_call(
&format!("/v1/agent/conversations/{}", encoded_id),
"DELETE",
&format!("/v1/agent/subagents/{}/items/{}", id, item_id),
"GET",
None::<()>,
)
.await
}

/// Deletes a subagent by UUID.
pub async fn delete_subagent(&self, id: Uuid) -> Result<DeletedObjectResponse> {
self.encrypted_api_call(&format!("/v1/agent/subagents/{}", id), "DELETE", None::<()>)
.await
}
}

#[cfg(test)]
Expand Down
Loading