diff --git a/pkg/cache/cache.go b/pkg/cache/cache.go
index 031bd166..90e8ee10 100644
--- a/pkg/cache/cache.go
+++ b/pkg/cache/cache.go
@@ -92,6 +92,7 @@ func (c *Client) CacheDir() string {
 
 func (c *Client) cacheKey(key any) (string, error) {
 	hash := sha256.New()
+	hash.Write([]byte("v2"))
 	if err := json.NewEncoder(hash).Encode(key); err != nil {
 		return "", err
 	}
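Not part of the patch: a minimal sketch of why the "v2" prefix above matters. Hashing a version tag ahead of the JSON-encoded key changes every cache key, so entries written before this change (which cached raw []openai.ChatCompletionStreamResponse chunks rather than a types.CompletionMessage, see pkg/openai below) are simply never looked up again instead of being decoded into the wrong type. The helper below is illustrative; the hex encoding and names are assumptions, not the repository's API.

package main

import (
	"crypto/sha256"
	"encoding/hex"
	"encoding/json"
	"fmt"
)

// cacheKey mirrors the shape of the change above: a version tag is hashed
// before the JSON-encoded request, so bumping the tag orphans old entries.
func cacheKey(version string, key any) (string, error) {
	hash := sha256.New()
	hash.Write([]byte(version))
	if err := json.NewEncoder(hash).Encode(key); err != nil {
		return "", err
	}
	return hex.EncodeToString(hash.Sum(nil)), nil
}

func main() {
	req := map[string]any{"model": "gpt-4", "messages": []string{"hi"}}
	oldKey, _ := cacheKey("", req)   // key shape before the patch (no version tag)
	newKey, _ := cacheKey("v2", req) // key shape after the patch
	fmt.Println(oldKey == newKey)    // false: entries in the old format are ignored, not misread
}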
diff --git a/pkg/engine/http.go b/pkg/engine/http.go
index 0742b236..87348ce9 100644
--- a/pkg/engine/http.go
+++ b/pkg/engine/http.go
@@ -76,7 +76,7 @@ func (e *Engine) runHTTP(ctx context.Context, prg *types.Program, tool types.Too
 	}
 
 	for _, env := range e.Env {
-		if strings.HasPrefix(env, "GPTSCRIPT_") {
+		if strings.HasPrefix(env, "GPTSCRIPT_WORKSPACE_") {
 			req.Header.Add("X-GPTScript-Env", env)
 		}
 	}
diff --git a/pkg/openai/client.go b/pkg/openai/client.go
index af518f93..be5c6253 100644
--- a/pkg/openai/client.go
+++ b/pkg/openai/client.go
@@ -9,6 +9,7 @@ import (
 	"slices"
 	"sort"
 	"strings"
+	"time"
 
 	openai "github.com/gptscript-ai/chat-completion-client"
 	"github.com/gptscript-ai/gptscript/pkg/cache"
@@ -212,15 +213,15 @@ func (c *Client) seed(request openai.ChatCompletionRequest) int {
 	return hash.Seed(newRequest)
 }
 
-func (c *Client) fromCache(ctx context.Context, messageRequest types.CompletionRequest, request openai.ChatCompletionRequest) (result []openai.ChatCompletionStreamResponse, _ bool, _ error) {
+func (c *Client) fromCache(ctx context.Context, messageRequest types.CompletionRequest, request openai.ChatCompletionRequest) (result types.CompletionMessage, _ bool, _ error) {
 	if !messageRequest.GetCache() {
-		return nil, false, nil
+		return types.CompletionMessage{}, false, nil
 	}
 	found, err := c.cache.Get(ctx, c.cacheKey(request), &result)
 	if err != nil {
-		return nil, false, err
+		return types.CompletionMessage{}, false, err
 	} else if !found {
-		return nil, false, nil
+		return types.CompletionMessage{}, false, nil
 	}
 	return result, true, nil
 }
@@ -396,11 +397,11 @@ func (c *Client) Call(ctx context.Context, messageRequest types.CompletionReques
 			IncludeUsage: true,
 		}
 	}
-	response, ok, err := c.fromCache(ctx, messageRequest, request)
+	result, ok, err := c.fromCache(ctx, messageRequest, request)
 	if err != nil {
 		return nil, err
 	} else if !ok {
-		response, err = c.call(ctx, request, id, status)
+		result, err = c.call(ctx, request, id, status)
 
 		// If we got back a context length exceeded error, keep retrying and shrinking the message history until we pass.
 		var apiError *openai.APIError
@@ -408,9 +409,8 @@ func (c *Client) Call(ctx context.Context, messageRequest types.CompletionReques
 			// Decrease maxTokens by 10% to make garbage collection more aggressive.
 			// The retry loop will further decrease maxTokens if needed.
 			maxTokens := decreaseTenPercent(messageRequest.MaxTokens)
-			response, err = c.contextLimitRetryLoop(ctx, request, id, maxTokens, status)
+			result, err = c.contextLimitRetryLoop(ctx, request, id, maxTokens, status)
 		}
-
 		if err != nil {
 			return nil, err
 		}
@@ -418,11 +418,6 @@ func (c *Client) Call(ctx context.Context, messageRequest types.CompletionReques
 		cacheResponse = true
 	}
 
-	result := types.CompletionMessage{}
-	for _, response := range response {
-		result = appendMessage(result, response)
-	}
-
 	for i, content := range result.Content {
 		if content.ToolCall != nil && content.ToolCall.ID == "" {
 			content.ToolCall.ID = "call_" + hash.ID(content.ToolCall.Function.Name, content.ToolCall.Function.Arguments)[:8]
@@ -440,7 +435,6 @@ func (c *Client) Call(ctx context.Context, messageRequest types.CompletionReques
 
 	status <- types.CompletionStatus{
 		CompletionID: id,
-		Chunks:       response,
 		Response:     result,
 		Usage:        result.Usage,
 		Cached:       cacheResponse,
@@ -449,9 +443,9 @@ func (c *Client) Call(ctx context.Context, messageRequest types.CompletionReques
 	return &result, nil
 }
 
-func (c *Client) contextLimitRetryLoop(ctx context.Context, request openai.ChatCompletionRequest, id string, maxTokens int, status chan<- types.CompletionStatus) ([]openai.ChatCompletionStreamResponse, error) {
+func (c *Client) contextLimitRetryLoop(ctx context.Context, request openai.ChatCompletionRequest, id string, maxTokens int, status chan<- types.CompletionStatus) (types.CompletionMessage, error) {
 	var (
-		response []openai.ChatCompletionStreamResponse
+		response types.CompletionMessage
 		err      error
 	)
 
@@ -469,10 +463,10 @@ func (c *Client) contextLimitRetryLoop(ctx context.Context, request openai.ChatC
 			maxTokens = decreaseTenPercent(maxTokens)
 			continue
 		}
-		return nil, err
+		return types.CompletionMessage{}, err
 	}
 
-	return nil, err
+	return types.CompletionMessage{}, err
 }
 
 func appendMessage(msg types.CompletionMessage, response openai.ChatCompletionStreamResponse) types.CompletionMessage {
@@ -548,7 +542,7 @@ func override(left, right string) string {
 	return left
 }
 
-func (c *Client) call(ctx context.Context, request openai.ChatCompletionRequest, transactionID string, partial chan<- types.CompletionStatus) (responses []openai.ChatCompletionStreamResponse, _ error) {
+func (c *Client) call(ctx context.Context, request openai.ChatCompletionRequest, transactionID string, partial chan<- types.CompletionStatus) (types.CompletionMessage, error) {
 	streamResponse := os.Getenv("GPTSCRIPT_INTERNAL_OPENAI_STREAMING") != "false"
 
 	partial <- types.CompletionStatus{
@@ -565,56 +559,58 @@ func (c *Client) call(ctx context.Context, request openai.ChatCompletionRequest,
 		request.StreamOptions = nil
 		resp, err := c.c.CreateChatCompletion(ctx, request)
 		if err != nil {
-			return nil, err
+			return types.CompletionMessage{}, err
 		}
-		return []openai.ChatCompletionStreamResponse{
-			{
-				ID:      resp.ID,
-				Object:  resp.Object,
-				Created: resp.Created,
-				Model:   resp.Model,
-				Usage:   resp.Usage,
-				Choices: []openai.ChatCompletionStreamChoice{
-					{
-						Index: resp.Choices[0].Index,
-						Delta: openai.ChatCompletionStreamChoiceDelta{
-							Content:      resp.Choices[0].Message.Content,
-							Role:         resp.Choices[0].Message.Role,
-							FunctionCall: resp.Choices[0].Message.FunctionCall,
-							ToolCalls:    resp.Choices[0].Message.ToolCalls,
-						},
-						FinishReason: resp.Choices[0].FinishReason,
+		return appendMessage(types.CompletionMessage{}, openai.ChatCompletionStreamResponse{
+			ID:      resp.ID,
+			Object:  resp.Object,
+			Created: resp.Created,
+			Model:   resp.Model,
+			Usage:   resp.Usage,
+			Choices: []openai.ChatCompletionStreamChoice{
+				{
+					Index: resp.Choices[0].Index,
+					Delta: openai.ChatCompletionStreamChoiceDelta{
+						Content:      resp.Choices[0].Message.Content,
+						Role:         resp.Choices[0].Message.Role,
+						FunctionCall: resp.Choices[0].Message.FunctionCall,
+						ToolCalls:    resp.Choices[0].Message.ToolCalls,
 					},
+					FinishReason: resp.Choices[0].FinishReason,
 				},
 			},
-		}, nil
+		}), nil
 	}
 
 	stream, err := c.c.CreateChatCompletionStream(ctx, request)
 	if err != nil {
-		return nil, err
+		return types.CompletionMessage{}, err
 	}
 	defer stream.Close()
 
-	var partialMessage types.CompletionMessage
+	var (
+		partialMessage types.CompletionMessage
+		start          = time.Now()
+		last           []string
+	)
 	for {
 		response, err := stream.Recv()
 		if err == io.EOF {
-			return responses, c.cache.Store(ctx, c.cacheKey(request), responses)
+			return partialMessage, c.cache.Store(ctx, c.cacheKey(request), partialMessage)
 		} else if err != nil {
-			return nil, err
-		}
-		if len(response.Choices) > 0 {
-			slog.Debug("stream", "content", response.Choices[0].Delta.Content)
+			return types.CompletionMessage{}, err
 		}
+		partialMessage = appendMessage(partialMessage, response)
 		if partial != nil {
-			partialMessage = appendMessage(partialMessage, response)
-			partial <- types.CompletionStatus{
-				CompletionID:    transactionID,
-				PartialResponse: &partialMessage,
+			if time.Since(start) > 500*time.Millisecond {
+				last = last[:0]
+				partial <- types.CompletionStatus{
+					CompletionID:    transactionID,
+					PartialResponse: &partialMessage,
+				}
+				start = time.Now()
 			}
 		}
-		responses = append(responses, response)
 	}
 }
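Not part of the patch: a self-contained sketch of the streaming pattern call now follows, using simplified stand-in types (the real code folds openai.ChatCompletionStreamResponse deltas into a types.CompletionMessage via appendMessage). Every chunk is accumulated unconditionally, but a partial status is published at most once per interval, which is what the 500ms check above does to keep CompletionStatus updates from flooding the channel on fast streams.

package main

import (
	"fmt"
	"time"
)

// message stands in for types.CompletionMessage; each chunk stands in for one stream delta.
type message struct{ content string }

// throttled accumulates every chunk into the running message, but publishes
// a partial update at most once per interval. The returned message is always
// complete because accumulation does not depend on the throttle.
func throttled(chunks <-chan string, partial chan<- message, interval time.Duration) message {
	var (
		msg   message
		start = time.Now()
	)
	for c := range chunks {
		msg.content += c // accumulate unconditionally (appendMessage in the patch)
		if time.Since(start) > interval {
			partial <- msg
			start = time.Now()
		}
	}
	return msg
}

func main() {
	chunks := make(chan string)
	partial := make(chan message, 16)
	go func() {
		for _, c := range []string{"Hel", "lo", ", ", "wor", "ld"} {
			chunks <- c
			time.Sleep(200 * time.Millisecond)
		}
		close(chunks)
	}()
	final := throttled(chunks, partial, 500*time.Millisecond)
	close(partial)
	for p := range partial {
		fmt.Printf("partial: %q\n", p.content) // far fewer partials than chunks
	}
	fmt.Printf("final:   %q\n", final.content) // "Hello, world"
}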
diff --git a/pkg/types/completion.go b/pkg/types/completion.go
index 6a05effa..2362071f 100644
--- a/pkg/types/completion.go
+++ b/pkg/types/completion.go
@@ -82,7 +82,6 @@ type CompletionStatus struct {
 	Response        any
 	Usage           Usage
 	Cached          bool
-	Chunks          any
 	PartialResponse *CompletionMessage
 }
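Also not part of the patch: the Call changes above keep the existing shrink-and-retry behaviour (per the diff's comment, maxTokens is first cut by 10% and contextLimitRetryLoop shrinks it further), now returning a types.CompletionMessage instead of raw chunks. The sketch below only illustrates that general pattern; the error value, helper names, and bounds are hypothetical and are not taken from contextLimitRetryLoop's body, which this diff does not show.

package main

import (
	"errors"
	"fmt"
)

// errContextLength stands in for the provider's context-length-exceeded error.
var errContextLength = errors.New("context length exceeded")

// shrinkRetry calls once, and on a context-length failure drops the token
// budget to 90% of its current value and tries again until the call succeeds
// or the budget reaches zero. Names and bounds are illustrative only.
func shrinkRetry(maxTokens int, call func(maxTokens int) (string, error)) (string, error) {
	for maxTokens > 0 {
		out, err := call(maxTokens)
		if errors.Is(err, errContextLength) {
			maxTokens = maxTokens * 9 / 10 // decrease by 10%, as the diff's comment describes
			continue
		}
		return out, err
	}
	return "", errContextLength
}

func main() {
	// Pretend the provider only accepts requests of 7000 tokens or fewer.
	result, err := shrinkRetry(10000, func(maxTokens int) (string, error) {
		if maxTokens > 7000 {
			return "", errContextLength
		}
		return fmt.Sprintf("ok with a budget of %d tokens", maxTokens), nil
	})
	fmt.Println(result, err)
}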